View Javadoc

1   /*
2    * Created on 19-Mar-2005
3    */
4   package ca.spaz.cron.datasource.sql.USDAsr17;
5   
6   import java.io.*;
7   import java.net.*;
8   import java.sql.*;
9   import java.util.*;
10  import java.util.zip.*;
11  
12  import org.apache.log4j.Logger;
13  
14  import ca.spaz.task.Task;
15  import ca.spaz.util.*;
16  
17  public class USDAImporter implements Task {
18     /***
19      * 
20      */
21     private static final int BATCH_SIZE = 100;
22  
23     private static final boolean TEST_MODE = false;
24     
25     /***
26      * 
27      */
28     private static final int CONVERSION_PROGRESS_PORTION = 60;
29  
30     /***
31      * 
32      */
33     private static final int DOWNLOAD_PROGRESS_PORTION = 35;
34  
35     /***
36      * 
37      */
38     private static final int FD_GROUP_INDEX = 3;
39  
40     /***
41      * 
42      */
43     private static final int WEIGHT_INDEX = 2;
44  
45     /***
46      * 
47      */
48     private static final int NUT_DATA_INDEX = 1;
49  
50     /***
51      * 
52      */
53     private static final int FOOD_DES_INDEX = 0;
54  
55     /***
56      * Logger for this class
57      */
58     private static final Logger logger = Logger.getLogger(USDAImporter.class);
59  
60     private HashMap groups = new HashMap();
61     private HashMap foods = new HashMap();
62  
63     private Connection con;
64  
65     private static final int HASH_MAX = 50;
66  
67     private URL sourceURL;
68  
69     private InputStream sourceStream;
70  
71     private PrintStream out;
72  
73     private HashMap insertStmts;
74  
75     private HashMap updateStmts;
76  
77     private Set added_foods;
78     
79     public USDAImporter(Connection conn, OutputStream out) {
80        this.con = conn;
81        this.added_foods = new HashSet();
82        if (null == out) {
83           this.out = System.out;
84        } else {
85           this.out = new PrintStream(out);
86        }
87     }
88  
89  public InputStream getSourceStream() {
90        return sourceStream;
91     }
92  
93     public void setSourceStream(InputStream sourceStream) {
94        this.sourceStream = sourceStream;
95     }
96  
97     public URL getSourceURL() {
98        return sourceURL;
99     }
100 
101    public void setSourceURL(URL sourceURL) {
102       this.sourceURL = sourceURL;
103    }
104 
105    /* (non-Javadoc)
106     * @see com.sun.swingutils.SwingWorker#construct()
107     */
108    public void run() {
109       abort = false;
110       curTask = "Importing USDA sr17";
111       if (null == sourceURL && null == sourceStream) {         
112          return;
113       }
114       if (null == sourceStream) {
115 //       From a source URL -- this will probably also be a zip.
116          try {
117             File tempDir = new File(System.getProperty("java.io.tmpdir"));
118             tempDir = new File(tempDir, "usda_dl");
119             if (tempDir.exists()) {
120                out.print("Clearing old temp dir... ");
121                out.flush();
122                ToolBox.deleteDir(tempDir);
123                out.println("Done!");
124                out.flush();
125             }
126             out.println("Creating temp directory.");
127             out.flush();
128             if (!tempDir.mkdir()) {
129                // Indicate that it aborted, somehow.
130                return;
131             }
132             File tempFile = new File(tempDir, "usda_sr17_temp.zip");
133             if (!downloadFile(sourceURL, tempFile, DOWNLOAD_PROGRESS_PORTION)) {
134                return; // Indicate failure.
135             }
136             if (abort) return;
137             ZipFile zif = new ZipFile(tempFile);
138             ZipEntry[] relevantEntries = getZipEntries(zif);
139             // Prepare the progress counts, starting with 50%, for the number of bytes
140             // in each zip entry.
141             long nbytes = getExtractedBytes(relevantEntries);
142 
143             progress = DOWNLOAD_PROGRESS_PORTION;
144             InputStream ins;
145             int idx = FD_GROUP_INDEX;
146             int baseProgress = progress;
147             ins = zif.getInputStream(relevantEntries[idx]);
148             parseInputStream(ins, "food groups", relevantEntries[idx].getSize(), nbytes, baseProgress, idx);
149             try {
150                PreparedStatement ps = con
151                      .prepareStatement("INSERT INTO FoodGroup VALUES (?)");
152                for (Iterator iter = groups.values().iterator(); iter.hasNext();) {
153                   String grp = (String) iter.next();
154                   ps.clearParameters();
155                   ps.setString(1, grp);
156                   ps.execute();
157                }
158             } catch (SQLException e) {
159                out.println("[ERROR] Could not import food groups: " + e.getMessage());
160             }
161             if (!abort) {
162                idx = FOOD_DES_INDEX;
163                baseProgress = progress;
164                ins = zif.getInputStream(relevantEntries[idx]);
165                parseInputStream(ins, "food descriptions", relevantEntries[idx].getSize(), nbytes, baseProgress, idx);
166             }
167             if (!abort) {
168                idx = WEIGHT_INDEX;
169                baseProgress = progress;
170                ins = zif.getInputStream(relevantEntries[idx]);
171                parseInputStream(ins, "measures", relevantEntries[idx].getSize(), nbytes, baseProgress, idx);
172             }
173             if (!abort) {
174                idx = NUT_DATA_INDEX;
175                baseProgress = progress;
176                ins = zif.getInputStream(relevantEntries[idx]);
177                parseInputStream(ins, "nutrients", relevantEntries[idx].getSize(), nbytes, baseProgress, idx);
178             }
179             zif.close();
180             if (ToolBox.deleteDir(tempDir)) {
181                out.println("Succeeded in clearing temp");
182                out.flush();
183             } else {
184                out.println("Failed to clear temp - delete " + tempFile + " manually.");
185                out.flush();
186             }
187          } catch (FileNotFoundException e) {
188             e.printStackTrace();
189             return;
190          } catch (IOException e) {
191             e.printStackTrace();
192             return;
193          }
194       } else {
195 //       From an input stream.  Assume that it is the zip.
196          out.println("Reading data from local input stream");
197       }
198       progress = 100;
199       return;
200    }
201    
202    private void parseInputStream(InputStream ins, String streamName, long streamSize, long totalSize, int baseProgress, int parseType) throws IOException {
203       int progressSize = (int) (CONVERSION_PROGRESS_PORTION * (double) streamSize / totalSize);
204       out.print("Reading in " + streamName + "...");
205       out.flush();
206       CountableInputStream cbis = cbis = new CountableInputStream(ins);
207       BufferedReader brin = new BufferedReader(new InputStreamReader(cbis));
208       String line = brin.readLine();
209       int linenumber = 0;
210       while (line != null) {
211          if (abort) return;
212          switch (parseType) {
213          case FD_GROUP_INDEX:
214             parseFoodGroupLine(line);
215             break;
216          case FOOD_DES_INDEX:
217             USDAFood food = new USDAFood(line, groups);
218             foods.put(food.ndb_id, food);
219             if (!TEST_MODE) {
220                food.addToDB(con);
221             }
222             break;
223          case WEIGHT_INDEX:
224             USDAWeight w = new USDAWeight(line);            
225             if (!TEST_MODE) {
226                w.addToDB(con, foods);
227             }  
228             break;
229          case NUT_DATA_INDEX:
230             String[] parts = line.split("//^");
231             for (int i = 0; i < parts.length; i++) {
232                parts[i] = parts[i].replaceAll("^~", "");
233                parts[i] = parts[i].replaceAll("~$", "");
234             }
235             double amount = Double.parseDouble(parts[2]);
236             //if (amount <= 0) continue;
237             try {
238                USDANutrientInfo inf = (USDANutrientInfo) USDANutrientInfo.getNutrientMap().get(parts[1]);
239                if (null == inf) {
240                   break;
241                }
242                USDAFood f = (USDAFood) foods.get(parts[0]);
243                if (!added_foods.contains(inf.table + "," + Integer.toHexString(f.hashCode()))) {
244                   // Need to add the food
245                   PreparedStatement stmt = getCreateNutrientRow(inf.table);
246                   stmt.clearParameters();
247                   stmt.setInt(1, f.ID);
248                   stmt.addBatch();
249                   added_foods.add(inf.table + "," + Integer.toHexString(f.hashCode()));
250                }
251                PreparedStatement stmt = getAddNutrient(inf.table, inf.tag);
252                stmt.clearParameters();
253                stmt.setDouble(1, amount);
254                stmt.setInt(2, f.ID);
255                stmt.addBatch();
256             } catch (SQLException e) {
257                throw new IllegalStateException(e);
258             }
259             if (linenumber % BATCH_SIZE == 0) {
260                executeBatches();
261             }
262             break;
263          default:
264             throw new IllegalArgumentException("Invalid parse type: " + parseType);
265          }
266          progress = (int) (baseProgress + (progressSize * ((double) cbis.getBytesRead() / streamSize)));
267          line = brin.readLine();
268          linenumber++;
269       }
270       executeBatches();
271       out.println("Done.");
272    }
273    
274    private PreparedStatement getCreateNutrientRow(String table) {
275       if (null == insertStmts) {
276          insertStmts = new HashMap();
277       }
278       PreparedStatement ret = (PreparedStatement) insertStmts.get(table);
279       if (null == ret) {
280          try {
281             ret = con.prepareStatement("INSERT INTO " + table + " (FID) VALUES (?)");
282             if (logger.isInfoEnabled()) {
283                logger.info("prepared: 'INSERT INTO " + table + " (FID) VALUES (?)'");
284             }
285          } catch (SQLException e) {
286             throw new IllegalStateException(e);
287          }
288          insertStmts.put(table, ret);
289       }
290       return ret;
291    }
292    
293    private PreparedStatement getAddNutrient(String table, String tag) {
294       if (null == updateStmts) {
295          updateStmts = new HashMap();
296       }
297       PreparedStatement ret = (PreparedStatement) updateStmts.get(table + "," + tag);
298       if (null == ret) {
299          try {
300             ret = con.prepareStatement("UPDATE " + table + " SET " + tag + " = ? WHERE FID = ?");
301             if (logger.isInfoEnabled()) {
302                logger.info("prepared: 'UPDATE " + table + " SET " + tag + " = ? WHERE FID = ?'");
303             }
304          } catch (SQLException e) {
305             throw new IllegalStateException(e);
306          }
307          updateStmts.put(table + "," + tag, ret);
308       }
309       return ret;
310    }
311    
312    private void executeBatches() {
313       if (abort) return;
314       if (null != insertStmts) {
315          for (Iterator iter = insertStmts.values().iterator(); iter.hasNext();) {
316             PreparedStatement stmt = (PreparedStatement) iter.next();
317             try {
318                stmt.executeBatch();
319             } catch (SQLException e) {
320                throw new IllegalStateException(e);
321             }
322          }
323       }
324       if (null != updateStmts) {
325          for (Iterator iter = updateStmts.values().iterator(); iter.hasNext();) {
326             PreparedStatement stmt = (PreparedStatement) iter.next();
327             try {
328                stmt.executeBatch();
329             } catch (SQLException e) {
330                throw new IllegalStateException(e);
331             }
332          }
333       }
334    }
335 
336    /***
337     * @param line
338     */
339    private void parseFoodGroupLine(String line) {
340       String[] parts = line.split("//^");
341       for (int i = 0; i < parts.length; i++) {
342          parts[i] = parts[i].replaceAll("^~", "");
343          parts[i] = parts[i].replaceAll("~$", "");
344       }
345       groups.put(parts[0], parts[1]);
346    }
347 
348    /***
349     * @param relevantEntries
350     * @return
351     */
352    private long getExtractedBytes(ZipEntry[] relevantEntries) {
353       long nbytes = 0;
354       for (int i = 0; i < relevantEntries.length; i++) {
355          nbytes += relevantEntries[i].getSize();
356       }
357       if (nbytes < 0) {
358          nbytes = Long.MAX_VALUE;
359       }
360       return nbytes;
361    }
362 
363    /***
364     * @param zif
365     * @return
366     */
367    private ZipEntry[] getZipEntries(ZipFile zif) {
368       ZipEntry[] relevantEntries = new ZipEntry[4];
369       for(Enumeration e = zif.entries(); e.hasMoreElements();) {
370          ZipEntry went = (ZipEntry) e.nextElement();
371          if (went.getName().equalsIgnoreCase("food_des.txt")) {
372             relevantEntries[FOOD_DES_INDEX] = went;
373          } else if (went.getName().equalsIgnoreCase("nut_data.txt")) {
374             relevantEntries[NUT_DATA_INDEX] = went;
375          } else if (went.getName().equalsIgnoreCase("weight.txt")) {
376             relevantEntries[WEIGHT_INDEX] = went;
377          } else if (went.getName().equalsIgnoreCase("fd_group.txt")) {
378             relevantEntries[FD_GROUP_INDEX] = went;
379          } else {
380             // Irrelevant file.
381          }
382       }
383       return relevantEntries;
384    }
385 
386    /***
387     * Download a URL into a file.  Takes the provided portion of the overall progress.
388     * @param url an URL to download.
389     * @param tempFile a file to save its contents in.
390     * @param progressPortion The portion of the overall progress that this method takes.
391     * @throws FileNotFoundException if the destination file cannot be found.
392     * @throws IOException for all other I/O errors.
393     */
394    private boolean downloadFile(URL url, File tempFile, int progressPortion) throws FileNotFoundException, IOException {
395       FileOutputStream fos = new FileOutputStream(tempFile);
396       out.println("Created temp file: " + tempFile);
397       out.flush();
398       HttpURLConnection uconn = (HttpURLConnection) url.openConnection();
399       out.println("Getting data from " + url);
400       out.flush();
401       InputStream raw = uconn.getInputStream();
402       InputStream in = new BufferedInputStream(raw);
403       int contentLength = uconn.getContentLength();
404       byte[] data = new byte[contentLength];
405       int read = 0;
406       int offs = 0;
407       while (offs < contentLength && !abort) {
408          read = in.read(data, offs, data.length - offs);
409          if (read < 0) {
410             break;
411          }
412          offs += read;
413          progress = (int)(progressPortion * (offs/(double)contentLength));
414       }
415       in.close();
416       if (offs != contentLength) {
417          out.println("Error downloading " + url);
418          out.flush();
419          return false;
420       }
421       out.print("Completed downloading " + url + ", writing...");
422       out.flush();
423       fos.write(data);
424       fos.close();
425       out.println("Done!");
426       out.flush();
427       return true;
428    }
429 
430    private volatile int progress = 0;
431    private volatile boolean abort = false;
432    private volatile String curTask;
433    
434    public int getTaskProgress() {
435       return progress;
436    }
437 
438    public void abortTask() {
439       abort = true;
440    }
441 
442    public boolean canAbortTask() {
443       return true;
444    }
445 
446    public String getTaskDescription() {
447       return curTask;
448    }
449 
450 }