1 |
|
|
2 |
|
|
3 |
|
|
4 |
|
package ca.spaz.cron.datasource.sql.USDAsr17; |
5 |
|
|
6 |
|
import java.io.*; |
7 |
|
import java.net.*; |
8 |
|
import java.sql.*; |
9 |
|
import java.util.*; |
10 |
|
import java.util.zip.*; |
11 |
|
|
12 |
|
import org.apache.log4j.Logger; |
13 |
|
|
14 |
|
import ca.spaz.task.Task; |
15 |
|
import ca.spaz.util.*; |
16 |
|
|
17 |
|
public class USDAImporter implements Task { |
18 |
|
|
19 |
|
|
20 |
|
|
21 |
|
private static final int BATCH_SIZE = 100; |
22 |
|
|
23 |
|
private static final boolean TEST_MODE = false; |
24 |
|
|
25 |
|
|
26 |
|
|
27 |
|
|
28 |
|
private static final int CONVERSION_PROGRESS_PORTION = 60; |
29 |
|
|
30 |
|
|
31 |
|
|
32 |
|
|
33 |
|
private static final int DOWNLOAD_PROGRESS_PORTION = 35; |
34 |
|
|
35 |
|
|
36 |
|
|
37 |
|
|
38 |
|
private static final int FD_GROUP_INDEX = 3; |
39 |
|
|
40 |
|
|
41 |
|
|
42 |
|
|
43 |
|
private static final int WEIGHT_INDEX = 2; |
44 |
|
|
45 |
|
|
46 |
|
|
47 |
|
|
48 |
|
private static final int NUT_DATA_INDEX = 1; |
49 |
|
|
50 |
|
|
51 |
|
|
52 |
|
|
53 |
|
private static final int FOOD_DES_INDEX = 0; |
54 |
|
|
55 |
|
|
56 |
|
|
57 |
|
|
58 |
0 |
private static final Logger logger = Logger.getLogger(USDAImporter.class); |
59 |
|
|
60 |
0 |
private HashMap groups = new HashMap(); |
61 |
0 |
private HashMap foods = new HashMap(); |
62 |
|
|
63 |
|
private Connection con; |
64 |
|
|
65 |
|
private static final int HASH_MAX = 50; |
66 |
|
|
67 |
|
private URL sourceURL; |
68 |
|
|
69 |
|
private InputStream sourceStream; |
70 |
|
|
71 |
|
private PrintStream out; |
72 |
|
|
73 |
|
private HashMap insertStmts; |
74 |
|
|
75 |
|
private HashMap updateStmts; |
76 |
|
|
77 |
|
private Set added_foods; |
78 |
|
|
79 |
0 |
/**
 * Creates an importer that writes into the given database connection.
 *
 * @param conn connection that receives the imported rows
 * @param out  stream for status messages; when null, System.out is used
 */
public USDAImporter(Connection conn, OutputStream out) {
    this.added_foods = new HashSet();
    this.con = conn;
    this.out = (out != null) ? new PrintStream(out) : System.out;
}
88 |
|
|
89 |
|
public InputStream getSourceStream() { |
90 |
0 |
return sourceStream; |
91 |
|
} |
92 |
|
|
93 |
|
public void setSourceStream(InputStream sourceStream) { |
94 |
0 |
this.sourceStream = sourceStream; |
95 |
0 |
} |
96 |
|
|
97 |
|
public URL getSourceURL() { |
98 |
0 |
return sourceURL; |
99 |
|
} |
100 |
|
|
101 |
|
public void setSourceURL(URL sourceURL) { |
102 |
0 |
this.sourceURL = sourceURL; |
103 |
0 |
} |
104 |
|
|
105 |
|
|
106 |
|
|
107 |
|
|
108 |
|
public void run() { |
109 |
0 |
abort = false; |
110 |
0 |
curTask = "Importing USDA sr17"; |
111 |
0 |
if (null == sourceURL && class="keyword">null == sourceStream) { |
112 |
0 |
return; |
113 |
|
} |
114 |
0 |
if (null == sourceStream) { |
115 |
|
|
116 |
|
try { |
117 |
0 |
File tempDir = new File(System.getProperty("java.io.tmpdir")); |
118 |
0 |
tempDir = new File(tempDir, "usda_dl"); |
119 |
0 |
if (tempDir.exists()) { |
120 |
0 |
out.print("Clearing old temp dir... "); |
121 |
0 |
out.flush(); |
122 |
0 |
ToolBox.deleteDir(tempDir); |
123 |
0 |
out.println("Done!"); |
124 |
0 |
out.flush(); |
125 |
|
} |
126 |
0 |
out.println("Creating temp directory."); |
127 |
0 |
out.flush(); |
128 |
0 |
if (!tempDir.mkdir()) { |
129 |
|
|
130 |
0 |
return; |
131 |
|
} |
132 |
0 |
File tempFile = new File(tempDir, "usda_sr17_temp.zip"); |
133 |
0 |
if (!downloadFile(sourceURL, tempFile, DOWNLOAD_PROGRESS_PORTION)) { |
134 |
0 |
return; |
135 |
|
} |
136 |
0 |
if (abort) return; |
137 |
0 |
ZipFile zif = new ZipFile(tempFile); |
138 |
0 |
ZipEntry[] relevantEntries = getZipEntries(zif); |
139 |
|
|
140 |
|
|
141 |
0 |
long nbytes = getExtractedBytes(relevantEntries); |
142 |
|
|
143 |
0 |
progress = DOWNLOAD_PROGRESS_PORTION; |
144 |
|
InputStream ins; |
145 |
0 |
int idx = FD_GROUP_INDEX; |
146 |
0 |
int baseProgress = progress; |
147 |
0 |
ins = zif.getInputStream(relevantEntries[idx]); |
148 |
0 |
parseInputStream(ins, "food groups", relevantEntries[idx].getSize(), nbytes, baseProgress, idx); |
149 |
|
try { |
150 |
0 |
PreparedStatement ps = con |
151 |
|
.prepareStatement("INSERT INTO FoodGroup VALUES (?)"); |
152 |
0 |
for (Iterator iter = groups.values().iterator(); iter.hasNext();) { |
153 |
0 |
String grp = (String) iter.next(); |
154 |
0 |
ps.clearParameters(); |
155 |
0 |
ps.setString(1, grp); |
156 |
0 |
ps.execute(); |
157 |
0 |
} |
158 |
0 |
} catch (SQLException e) { |
159 |
0 |
out.println("[ERROR] Could not import food groups: " + e.getMessage()); |
160 |
0 |
} |
161 |
0 |
if (!abort) { |
162 |
0 |
idx = FOOD_DES_INDEX; |
163 |
0 |
baseProgress = progress; |
164 |
0 |
ins = zif.getInputStream(relevantEntries[idx]); |
165 |
0 |
parseInputStream(ins, "food descriptions", relevantEntries[idx].getSize(), nbytes, baseProgress, idx); |
166 |
|
} |
167 |
0 |
if (!abort) { |
168 |
0 |
idx = WEIGHT_INDEX; |
169 |
0 |
baseProgress = progress; |
170 |
0 |
ins = zif.getInputStream(relevantEntries[idx]); |
171 |
0 |
parseInputStream(ins, "measures", relevantEntries[idx].getSize(), nbytes, baseProgress, idx); |
172 |
|
} |
173 |
0 |
if (!abort) { |
174 |
0 |
idx = NUT_DATA_INDEX; |
175 |
0 |
baseProgress = progress; |
176 |
0 |
ins = zif.getInputStream(relevantEntries[idx]); |
177 |
0 |
parseInputStream(ins, "nutrients", relevantEntries[idx].getSize(), nbytes, baseProgress, idx); |
178 |
|
} |
179 |
0 |
zif.close(); |
180 |
0 |
if (ToolBox.deleteDir(tempDir)) { |
181 |
0 |
out.println("Succeeded in clearing temp"); |
182 |
0 |
out.flush(); |
183 |
0 |
} else { |
184 |
0 |
out.println("Failed to clear temp - delete " + tempFile + " manually."); |
185 |
0 |
out.flush(); |
186 |
|
} |
187 |
0 |
} catch (FileNotFoundException e) { |
188 |
0 |
e.printStackTrace(); |
189 |
0 |
return; |
190 |
0 |
} catch (IOException e) { |
191 |
0 |
e.printStackTrace(); |
192 |
0 |
return; |
193 |
0 |
} |
194 |
|
} else { |
195 |
|
|
196 |
0 |
out.println("Reading data from local input stream"); |
197 |
|
} |
198 |
0 |
progress = 100; |
199 |
0 |
return; |
200 |
|
} |
201 |
|
|
202 |
|
private void parseInputStream(InputStream ins, String streamName, long streamSize, class="keyword">long totalSize, int baseProgress, int parseType) throws IOException { |
203 |
0 |
int progressSize = (class="keyword">int) (CONVERSION_PROGRESS_PORTION * (double) streamSize / totalSize); |
204 |
0 |
out.print("Reading in " + streamName + "..."); |
205 |
0 |
out.flush(); |
206 |
0 |
CountableInputStream cbis = cbis = new CountableInputStream(ins); |
207 |
0 |
BufferedReader brin = new BufferedReader(class="keyword">new InputStreamReader(cbis)); |
208 |
0 |
String line = brin.readLine(); |
209 |
0 |
int linenumber = 0; |
210 |
0 |
while (line != null) { |
211 |
0 |
if (abort) return; |
212 |
0 |
switch (parseType) { |
213 |
|
case FD_GROUP_INDEX: |
214 |
0 |
parseFoodGroupLine(line); |
215 |
0 |
break; |
216 |
|
case FOOD_DES_INDEX: |
217 |
0 |
USDAFood food = new USDAFood(line, groups); |
218 |
0 |
foods.put(food.ndb_id, food); |
219 |
|
if (!TEST_MODE) { |
220 |
0 |
food.addToDB(con); |
221 |
|
} |
222 |
0 |
break; |
223 |
|
case WEIGHT_INDEX: |
224 |
0 |
USDAWeight w = new USDAWeight(line); |
225 |
|
if (!TEST_MODE) { |
226 |
0 |
w.addToDB(con, foods); |
227 |
|
} |
228 |
0 |
break; |
229 |
|
case NUT_DATA_INDEX: |
230 |
0 |
String[] parts = line.split("\\^"); |
231 |
0 |
for (int i = 0; i < parts.length; i++) { |
232 |
0 |
parts[i] = parts[i].replaceAll("^~", ""); |
233 |
0 |
parts[i] = parts[i].replaceAll("~$", ""); |
234 |
|
} |
235 |
0 |
double amount = Double.parseDouble(parts[2]); |
236 |
|
|
237 |
|
try { |
238 |
0 |
USDANutrientInfo inf = (USDANutrientInfo) USDANutrientInfo.getNutrientMap().get(parts[1]); |
239 |
0 |
if (null == inf) { |
240 |
0 |
break; |
241 |
|
} |
242 |
0 |
USDAFood f = (USDAFood) foods.get(parts[0]); |
243 |
0 |
if (!added_foods.contains(inf.table + "," + Integer.toHexString(f.hashCode()))) { |
244 |
|
|
245 |
0 |
PreparedStatement stmt = getCreateNutrientRow(inf.table); |
246 |
0 |
stmt.clearParameters(); |
247 |
0 |
stmt.setInt(1, f.ID); |
248 |
0 |
stmt.addBatch(); |
249 |
0 |
added_foods.add(inf.table + "," + Integer.toHexString(f.hashCode())); |
250 |
|
} |
251 |
0 |
PreparedStatement stmt = getAddNutrient(inf.table, inf.tag); |
252 |
0 |
stmt.clearParameters(); |
253 |
0 |
stmt.setDouble(1, amount); |
254 |
0 |
stmt.setInt(2, f.ID); |
255 |
0 |
stmt.addBatch(); |
256 |
0 |
} catch (SQLException e) { |
257 |
0 |
throw new IllegalStateException(e); |
258 |
0 |
} |
259 |
0 |
if (linenumber % BATCH_SIZE == 0) { |
260 |
0 |
executeBatches(); |
261 |
0 |
} |
262 |
|
break; |
263 |
|
default: |
264 |
0 |
throw new IllegalArgumentException("Invalid parse type: " + parseType); |
265 |
|
} |
266 |
0 |
progress = (int) (baseProgress + (progressSize * ((double) cbis.getBytesRead() / streamSize))); |
267 |
0 |
line = brin.readLine(); |
268 |
0 |
linenumber++; |
269 |
0 |
} |
270 |
0 |
executeBatches(); |
271 |
0 |
out.println("Done."); |
272 |
0 |
} |
273 |
|
|
274 |
|
private PreparedStatement getCreateNutrientRow(String table) { |
275 |
0 |
if (null == insertStmts) { |
276 |
0 |
insertStmts = new HashMap(); |
277 |
|
} |
278 |
0 |
PreparedStatement ret = (PreparedStatement) insertStmts.get(table); |
279 |
0 |
if (null == ret) { |
280 |
|
try { |
281 |
0 |
ret = con.prepareStatement("INSERT INTO " + table + " (FID) VALUES (?)"); |
282 |
0 |
if (logger.isInfoEnabled()) { |
283 |
0 |
logger.info("prepared: 'INSERT INTO " + table + " (FID) VALUES (?)'"); |
284 |
|
} |
285 |
0 |
} catch (SQLException e) { |
286 |
0 |
throw new IllegalStateException(e); |
287 |
0 |
} |
288 |
0 |
insertStmts.put(table, ret); |
289 |
|
} |
290 |
0 |
return ret; |
291 |
|
} |
292 |
|
|
293 |
|
private PreparedStatement getAddNutrient(String table, String tag) { |
294 |
0 |
if (null == updateStmts) { |
295 |
0 |
updateStmts = new HashMap(); |
296 |
|
} |
297 |
0 |
PreparedStatement ret = (PreparedStatement) updateStmts.get(table + "," + tag); |
298 |
0 |
if (null == ret) { |
299 |
|
try { |
300 |
0 |
ret = con.prepareStatement("UPDATE " + table + " SET " + tag + " = ? WHERE FID = ?"); |
301 |
0 |
if (logger.isInfoEnabled()) { |
302 |
0 |
logger.info("prepared: 'UPDATE " + table + " SET " + tag + " = ? WHERE FID = ?'"); |
303 |
|
} |
304 |
0 |
} catch (SQLException e) { |
305 |
0 |
throw new IllegalStateException(e); |
306 |
0 |
} |
307 |
0 |
updateStmts.put(table + "," + tag, ret); |
308 |
|
} |
309 |
0 |
return ret; |
310 |
|
} |
311 |
|
|
312 |
|
private void executeBatches() { |
313 |
0 |
if (abort) return; |
314 |
0 |
if (null != insertStmts) { |
315 |
0 |
for (Iterator iter = insertStmts.values().iterator(); iter.hasNext();) { |
316 |
0 |
PreparedStatement stmt = (PreparedStatement) iter.next(); |
317 |
|
try { |
318 |
0 |
stmt.executeBatch(); |
319 |
0 |
} catch (SQLException e) { |
320 |
0 |
throw new IllegalStateException(e); |
321 |
0 |
} |
322 |
0 |
} |
323 |
|
} |
324 |
0 |
if (null != updateStmts) { |
325 |
0 |
for (Iterator iter = updateStmts.values().iterator(); iter.hasNext();) { |
326 |
0 |
PreparedStatement stmt = (PreparedStatement) iter.next(); |
327 |
|
try { |
328 |
0 |
stmt.executeBatch(); |
329 |
0 |
} catch (SQLException e) { |
330 |
0 |
throw new IllegalStateException(e); |
331 |
0 |
} |
332 |
0 |
} |
333 |
|
} |
334 |
0 |
} |
335 |
|
|
336 |
|
|
337 |
|
|
338 |
|
|
339 |
|
private void parseFoodGroupLine(String line) { |
340 |
0 |
String[] parts = line.split("\\^"); |
341 |
0 |
for (int i = 0; i < parts.length; i++) { |
342 |
0 |
parts[i] = parts[i].replaceAll("^~", ""); |
343 |
0 |
parts[i] = parts[i].replaceAll("~$", ""); |
344 |
|
} |
345 |
0 |
groups.put(parts[0], parts[1]); |
346 |
0 |
} |
347 |
|
|
348 |
|
|
349 |
|
|
350 |
|
|
351 |
|
|
352 |
|
private long getExtractedBytes(ZipEntry[] relevantEntries) { |
353 |
0 |
long nbytes = 0; |
354 |
0 |
for (int i = 0; i < relevantEntries.length; i++) { |
355 |
0 |
nbytes += relevantEntries[i].getSize(); |
356 |
|
} |
357 |
0 |
if (nbytes < 0) { |
358 |
0 |
nbytes = Long.MAX_VALUE; |
359 |
|
} |
360 |
0 |
return nbytes; |
361 |
|
} |
362 |
|
|
363 |
|
|
364 |
|
|
365 |
|
|
366 |
|
|
367 |
|
private ZipEntry[] getZipEntries(ZipFile zif) { |
368 |
0 |
ZipEntry[] relevantEntries = new ZipEntry[4]; |
369 |
0 |
for(Enumeration e = zif.entries(); e.hasMoreElements();) { |
370 |
0 |
ZipEntry went = (ZipEntry) e.nextElement(); |
371 |
0 |
if (went.getName().equalsIgnoreCase("food_des.txt")) { |
372 |
0 |
relevantEntries[FOOD_DES_INDEX] = went; |
373 |
0 |
} else if (went.getName().equalsIgnoreCase("nut_data.txt")) { |
374 |
0 |
relevantEntries[NUT_DATA_INDEX] = went; |
375 |
0 |
} else if (went.getName().equalsIgnoreCase("weight.txt")) { |
376 |
0 |
relevantEntries[WEIGHT_INDEX] = went; |
377 |
0 |
} else if (went.getName().equalsIgnoreCase("fd_group.txt")) { |
378 |
0 |
relevantEntries[FD_GROUP_INDEX] = went; |
379 |
|
} else { |
380 |
|
|
381 |
|
} |
382 |
0 |
} |
383 |
0 |
return relevantEntries; |
384 |
|
} |
385 |
|
|
386 |
|
|
387 |
|
|
388 |
|
|
389 |
|
|
390 |
|
|
391 |
|
|
392 |
|
|
393 |
|
|
394 |
|
private boolean downloadFile(URL url, File tempFile, int progressPortion) throws FileNotFoundException, IOException { |
395 |
0 |
FileOutputStream fos = new FileOutputStream(tempFile); |
396 |
0 |
out.println("Created temp file: " + tempFile); |
397 |
0 |
out.flush(); |
398 |
0 |
HttpURLConnection uconn = (HttpURLConnection) url.openConnection(); |
399 |
0 |
out.println("Getting data from " + url); |
400 |
0 |
out.flush(); |
401 |
0 |
InputStream raw = uconn.getInputStream(); |
402 |
0 |
InputStream in = new BufferedInputStream(raw); |
403 |
0 |
int contentLength = uconn.getContentLength(); |
404 |
0 |
byte[] data = new byte[contentLength]; |
405 |
0 |
int read = 0; |
406 |
0 |
int offs = 0; |
407 |
0 |
while (offs < contentLength && !abort) { |
408 |
0 |
read = in.read(data, offs, data.length - offs); |
409 |
0 |
if (read < 0) { |
410 |
0 |
break; |
411 |
|
} |
412 |
0 |
offs += read; |
413 |
0 |
progress = (int)(progressPortion * (offs/(double)contentLength)); |
414 |
0 |
} |
415 |
0 |
in.close(); |
416 |
0 |
if (offs != contentLength) { |
417 |
0 |
out.println("Error downloading " + url); |
418 |
0 |
out.flush(); |
419 |
0 |
return false; |
420 |
|
} |
421 |
0 |
out.print("Completed downloading " + url + ", writing..."); |
422 |
0 |
out.flush(); |
423 |
0 |
fos.write(data); |
424 |
0 |
fos.close(); |
425 |
0 |
out.println("Done!"); |
426 |
0 |
out.flush(); |
427 |
0 |
return true; |
428 |
|
} |
429 |
|
|
430 |
0 |
private volatile int progress = 0; |
431 |
0 |
private volatile boolean abort = false; |
432 |
|
private volatile String curTask; |
433 |
|
|
434 |
|
public int getTaskProgress() { |
435 |
0 |
return progress; |
436 |
|
} |
437 |
|
|
438 |
|
public void abortTask() { |
439 |
0 |
abort = true; |
440 |
0 |
} |
441 |
|
|
442 |
|
public boolean canAbortTask() { |
443 |
0 |
return true; |
444 |
|
} |
445 |
|
|
446 |
|
public String getTaskDescription() { |
447 |
0 |
return curTask; |
448 |
|
} |
449 |
|
|
450 |
|
} |