1
2
3
4 package ca.spaz.cron.datasource.sql.USDAsr17;
5
6 import java.io.*;
7 import java.net.*;
8 import java.sql.*;
9 import java.util.*;
10 import java.util.zip.*;
11
12 import org.apache.log4j.Logger;
13
14 import ca.spaz.task.Task;
15 import ca.spaz.util.*;
16
17 public class USDAImporter implements Task {
18 /***
19 *
20 */
21 private static final int BATCH_SIZE = 100;
22
23 private static final boolean TEST_MODE = false;
24
25 /***
26 *
27 */
28 private static final int CONVERSION_PROGRESS_PORTION = 60;
29
30 /***
31 *
32 */
33 private static final int DOWNLOAD_PROGRESS_PORTION = 35;
34
35 /***
36 *
37 */
38 private static final int FD_GROUP_INDEX = 3;
39
40 /***
41 *
42 */
43 private static final int WEIGHT_INDEX = 2;
44
45 /***
46 *
47 */
48 private static final int NUT_DATA_INDEX = 1;
49
50 /***
51 *
52 */
53 private static final int FOOD_DES_INDEX = 0;
54
55 /***
56 * Logger for this class
57 */
58 private static final Logger logger = Logger.getLogger(USDAImporter.class);
59
60 private HashMap groups = new HashMap();
61 private HashMap foods = new HashMap();
62
63 private Connection con;
64
65 private static final int HASH_MAX = 50;
66
67 private URL sourceURL;
68
69 private InputStream sourceStream;
70
71 private PrintStream out;
72
73 private HashMap insertStmts;
74
75 private HashMap updateStmts;
76
77 private Set added_foods;
78
/**
 * Creates an importer that works on the given database connection.
 *
 * @param conn connection used for every statement this importer issues.
 * @param out  stream that receives progress messages; when it is
 *             {@code null} the messages are written to {@code System.out}.
 */
public USDAImporter(Connection conn, OutputStream out) {
    this.out = (out != null) ? new PrintStream(out) : System.out;
    this.added_foods = new HashSet();
    this.con = conn;
}
88
89 public InputStream getSourceStream() {
90 return sourceStream;
91 }
92
93 public void setSourceStream(InputStream sourceStream) {
94 this.sourceStream = sourceStream;
95 }
96
97 public URL getSourceURL() {
98 return sourceURL;
99 }
100
101 public void setSourceURL(URL sourceURL) {
102 this.sourceURL = sourceURL;
103 }
104
105
106
107
108 public void run() {
109 abort = false;
110 curTask = "Importing USDA sr17";
111 if (null == sourceURL && null == sourceStream) {
112 return;
113 }
114 if (null == sourceStream) {
115
116 try {
117 File tempDir = new File(System.getProperty("java.io.tmpdir"));
118 tempDir = new File(tempDir, "usda_dl");
119 if (tempDir.exists()) {
120 out.print("Clearing old temp dir... ");
121 out.flush();
122 ToolBox.deleteDir(tempDir);
123 out.println("Done!");
124 out.flush();
125 }
126 out.println("Creating temp directory.");
127 out.flush();
128 if (!tempDir.mkdir()) {
129
130 return;
131 }
132 File tempFile = new File(tempDir, "usda_sr17_temp.zip");
133 if (!downloadFile(sourceURL, tempFile, DOWNLOAD_PROGRESS_PORTION)) {
134 return;
135 }
136 if (abort) return;
137 ZipFile zif = new ZipFile(tempFile);
138 ZipEntry[] relevantEntries = getZipEntries(zif);
139
140
141 long nbytes = getExtractedBytes(relevantEntries);
142
143 progress = DOWNLOAD_PROGRESS_PORTION;
144 InputStream ins;
145 int idx = FD_GROUP_INDEX;
146 int baseProgress = progress;
147 ins = zif.getInputStream(relevantEntries[idx]);
148 parseInputStream(ins, "food groups", relevantEntries[idx].getSize(), nbytes, baseProgress, idx);
149 try {
150 PreparedStatement ps = con
151 .prepareStatement("INSERT INTO FoodGroup VALUES (?)");
152 for (Iterator iter = groups.values().iterator(); iter.hasNext();) {
153 String grp = (String) iter.next();
154 ps.clearParameters();
155 ps.setString(1, grp);
156 ps.execute();
157 }
158 } catch (SQLException e) {
159 out.println("[ERROR] Could not import food groups: " + e.getMessage());
160 }
161 if (!abort) {
162 idx = FOOD_DES_INDEX;
163 baseProgress = progress;
164 ins = zif.getInputStream(relevantEntries[idx]);
165 parseInputStream(ins, "food descriptions", relevantEntries[idx].getSize(), nbytes, baseProgress, idx);
166 }
167 if (!abort) {
168 idx = WEIGHT_INDEX;
169 baseProgress = progress;
170 ins = zif.getInputStream(relevantEntries[idx]);
171 parseInputStream(ins, "measures", relevantEntries[idx].getSize(), nbytes, baseProgress, idx);
172 }
173 if (!abort) {
174 idx = NUT_DATA_INDEX;
175 baseProgress = progress;
176 ins = zif.getInputStream(relevantEntries[idx]);
177 parseInputStream(ins, "nutrients", relevantEntries[idx].getSize(), nbytes, baseProgress, idx);
178 }
179 zif.close();
180 if (ToolBox.deleteDir(tempDir)) {
181 out.println("Succeeded in clearing temp");
182 out.flush();
183 } else {
184 out.println("Failed to clear temp - delete " + tempFile + " manually.");
185 out.flush();
186 }
187 } catch (FileNotFoundException e) {
188 e.printStackTrace();
189 return;
190 } catch (IOException e) {
191 e.printStackTrace();
192 return;
193 }
194 } else {
195
196 out.println("Reading data from local input stream");
197 }
198 progress = 100;
199 return;
200 }
201
202 private void parseInputStream(InputStream ins, String streamName, long streamSize, long totalSize, int baseProgress, int parseType) throws IOException {
203 int progressSize = (int) (CONVERSION_PROGRESS_PORTION * (double) streamSize / totalSize);
204 out.print("Reading in " + streamName + "...");
205 out.flush();
206 CountableInputStream cbis = cbis = new CountableInputStream(ins);
207 BufferedReader brin = new BufferedReader(new InputStreamReader(cbis));
208 String line = brin.readLine();
209 int linenumber = 0;
210 while (line != null) {
211 if (abort) return;
212 switch (parseType) {
213 case FD_GROUP_INDEX:
214 parseFoodGroupLine(line);
215 break;
216 case FOOD_DES_INDEX:
217 USDAFood food = new USDAFood(line, groups);
218 foods.put(food.ndb_id, food);
219 if (!TEST_MODE) {
220 food.addToDB(con);
221 }
222 break;
223 case WEIGHT_INDEX:
224 USDAWeight w = new USDAWeight(line);
225 if (!TEST_MODE) {
226 w.addToDB(con, foods);
227 }
228 break;
229 case NUT_DATA_INDEX:
230 String[] parts = line.split("//^");
231 for (int i = 0; i < parts.length; i++) {
232 parts[i] = parts[i].replaceAll("^~", "");
233 parts[i] = parts[i].replaceAll("~$", "");
234 }
235 double amount = Double.parseDouble(parts[2]);
236
237 try {
238 USDANutrientInfo inf = (USDANutrientInfo) USDANutrientInfo.getNutrientMap().get(parts[1]);
239 if (null == inf) {
240 break;
241 }
242 USDAFood f = (USDAFood) foods.get(parts[0]);
243 if (!added_foods.contains(inf.table + "," + Integer.toHexString(f.hashCode()))) {
244
245 PreparedStatement stmt = getCreateNutrientRow(inf.table);
246 stmt.clearParameters();
247 stmt.setInt(1, f.ID);
248 stmt.addBatch();
249 added_foods.add(inf.table + "," + Integer.toHexString(f.hashCode()));
250 }
251 PreparedStatement stmt = getAddNutrient(inf.table, inf.tag);
252 stmt.clearParameters();
253 stmt.setDouble(1, amount);
254 stmt.setInt(2, f.ID);
255 stmt.addBatch();
256 } catch (SQLException e) {
257 throw new IllegalStateException(e);
258 }
259 if (linenumber % BATCH_SIZE == 0) {
260 executeBatches();
261 }
262 break;
263 default:
264 throw new IllegalArgumentException("Invalid parse type: " + parseType);
265 }
266 progress = (int) (baseProgress + (progressSize * ((double) cbis.getBytesRead() / streamSize)));
267 line = brin.readLine();
268 linenumber++;
269 }
270 executeBatches();
271 out.println("Done.");
272 }
273
274 private PreparedStatement getCreateNutrientRow(String table) {
275 if (null == insertStmts) {
276 insertStmts = new HashMap();
277 }
278 PreparedStatement ret = (PreparedStatement) insertStmts.get(table);
279 if (null == ret) {
280 try {
281 ret = con.prepareStatement("INSERT INTO " + table + " (FID) VALUES (?)");
282 if (logger.isInfoEnabled()) {
283 logger.info("prepared: 'INSERT INTO " + table + " (FID) VALUES (?)'");
284 }
285 } catch (SQLException e) {
286 throw new IllegalStateException(e);
287 }
288 insertStmts.put(table, ret);
289 }
290 return ret;
291 }
292
293 private PreparedStatement getAddNutrient(String table, String tag) {
294 if (null == updateStmts) {
295 updateStmts = new HashMap();
296 }
297 PreparedStatement ret = (PreparedStatement) updateStmts.get(table + "," + tag);
298 if (null == ret) {
299 try {
300 ret = con.prepareStatement("UPDATE " + table + " SET " + tag + " = ? WHERE FID = ?");
301 if (logger.isInfoEnabled()) {
302 logger.info("prepared: 'UPDATE " + table + " SET " + tag + " = ? WHERE FID = ?'");
303 }
304 } catch (SQLException e) {
305 throw new IllegalStateException(e);
306 }
307 updateStmts.put(table + "," + tag, ret);
308 }
309 return ret;
310 }
311
312 private void executeBatches() {
313 if (abort) return;
314 if (null != insertStmts) {
315 for (Iterator iter = insertStmts.values().iterator(); iter.hasNext();) {
316 PreparedStatement stmt = (PreparedStatement) iter.next();
317 try {
318 stmt.executeBatch();
319 } catch (SQLException e) {
320 throw new IllegalStateException(e);
321 }
322 }
323 }
324 if (null != updateStmts) {
325 for (Iterator iter = updateStmts.values().iterator(); iter.hasNext();) {
326 PreparedStatement stmt = (PreparedStatement) iter.next();
327 try {
328 stmt.executeBatch();
329 } catch (SQLException e) {
330 throw new IllegalStateException(e);
331 }
332 }
333 }
334 }
335
336 /***
337 * @param line
338 */
339 private void parseFoodGroupLine(String line) {
340 String[] parts = line.split("//^");
341 for (int i = 0; i < parts.length; i++) {
342 parts[i] = parts[i].replaceAll("^~", "");
343 parts[i] = parts[i].replaceAll("~$", "");
344 }
345 groups.put(parts[0], parts[1]);
346 }
347
348 /***
349 * @param relevantEntries
350 * @return
351 */
352 private long getExtractedBytes(ZipEntry[] relevantEntries) {
353 long nbytes = 0;
354 for (int i = 0; i < relevantEntries.length; i++) {
355 nbytes += relevantEntries[i].getSize();
356 }
357 if (nbytes < 0) {
358 nbytes = Long.MAX_VALUE;
359 }
360 return nbytes;
361 }
362
363 /***
364 * @param zif
365 * @return
366 */
367 private ZipEntry[] getZipEntries(ZipFile zif) {
368 ZipEntry[] relevantEntries = new ZipEntry[4];
369 for(Enumeration e = zif.entries(); e.hasMoreElements();) {
370 ZipEntry went = (ZipEntry) e.nextElement();
371 if (went.getName().equalsIgnoreCase("food_des.txt")) {
372 relevantEntries[FOOD_DES_INDEX] = went;
373 } else if (went.getName().equalsIgnoreCase("nut_data.txt")) {
374 relevantEntries[NUT_DATA_INDEX] = went;
375 } else if (went.getName().equalsIgnoreCase("weight.txt")) {
376 relevantEntries[WEIGHT_INDEX] = went;
377 } else if (went.getName().equalsIgnoreCase("fd_group.txt")) {
378 relevantEntries[FD_GROUP_INDEX] = went;
379 } else {
380
381 }
382 }
383 return relevantEntries;
384 }
385
386 /***
387 * Download a URL into a file. Takes the provided portion of the overall progress.
388 * @param url an URL to download.
389 * @param tempFile a file to save its contents in.
390 * @param progressPortion The portion of the overall progress that this method takes.
391 * @throws FileNotFoundException if the destination file cannot be found.
392 * @throws IOException for all other I/O errors.
393 */
394 private boolean downloadFile(URL url, File tempFile, int progressPortion) throws FileNotFoundException, IOException {
395 FileOutputStream fos = new FileOutputStream(tempFile);
396 out.println("Created temp file: " + tempFile);
397 out.flush();
398 HttpURLConnection uconn = (HttpURLConnection) url.openConnection();
399 out.println("Getting data from " + url);
400 out.flush();
401 InputStream raw = uconn.getInputStream();
402 InputStream in = new BufferedInputStream(raw);
403 int contentLength = uconn.getContentLength();
404 byte[] data = new byte[contentLength];
405 int read = 0;
406 int offs = 0;
407 while (offs < contentLength && !abort) {
408 read = in.read(data, offs, data.length - offs);
409 if (read < 0) {
410 break;
411 }
412 offs += read;
413 progress = (int)(progressPortion * (offs/(double)contentLength));
414 }
415 in.close();
416 if (offs != contentLength) {
417 out.println("Error downloading " + url);
418 out.flush();
419 return false;
420 }
421 out.print("Completed downloading " + url + ", writing...");
422 out.flush();
423 fos.write(data);
424 fos.close();
425 out.println("Done!");
426 out.flush();
427 return true;
428 }
429
430 private volatile int progress = 0;
431 private volatile boolean abort = false;
432 private volatile String curTask;
433
434 public int getTaskProgress() {
435 return progress;
436 }
437
438 public void abortTask() {
439 abort = true;
440 }
441
442 public boolean canAbortTask() {
443 return true;
444 }
445
446 public String getTaskDescription() {
447 return curTask;
448 }
449
450 }