View Javadoc

1   /*
2    * SymmetricDS is an open source database synchronization solution.
3    *   
4    * Copyright (C) Eric Long <erilong@users.sourceforge.net>,
5    *               Chris Henson <chenson42@users.sourceforge.net>
6    *
7    * This library is free software; you can redistribute it and/or
8    * modify it under the terms of the GNU Lesser General Public
9    * License as published by the Free Software Foundation; either
10   * version 3 of the License, or (at your option) any later version.
11   *
12   * This library is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this library; if not, see
19   * <http://www.gnu.org/licenses/>.
20   */
21  
22  package org.jumpmind.symmetric.load.csv;
23  
24  import java.io.BufferedReader;
25  import java.io.IOException;
26  import java.util.List;
27  import java.util.Map;
28  
29  import org.apache.commons.lang.ArrayUtils;
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.jumpmind.symmetric.common.ParameterConstants;
33  import org.jumpmind.symmetric.common.csv.CsvConstants;
34  import org.jumpmind.symmetric.db.BinaryEncoding;
35  import org.jumpmind.symmetric.db.IDbDialect;
36  import org.jumpmind.symmetric.load.DataLoaderContext;
37  import org.jumpmind.symmetric.load.DataLoaderStatistics;
38  import org.jumpmind.symmetric.load.IColumnFilter;
39  import org.jumpmind.symmetric.load.IDataLoader;
40  import org.jumpmind.symmetric.load.IDataLoaderContext;
41  import org.jumpmind.symmetric.load.IDataLoaderFilter;
42  import org.jumpmind.symmetric.load.IDataLoaderStatistics;
43  import org.jumpmind.symmetric.load.TableTemplate;
44  import org.jumpmind.symmetric.model.Node;
45  import org.jumpmind.symmetric.model.Trigger;
46  import org.jumpmind.symmetric.service.IConfigurationService;
47  import org.jumpmind.symmetric.service.INodeService;
48  import org.jumpmind.symmetric.service.IParameterService;
49  import org.springframework.dao.DataIntegrityViolationException;
50  import org.springframework.jdbc.core.JdbcTemplate;
51  
52  import com.csvreader.CsvReader;
53  
54  public class CsvLoader implements IDataLoader {
55  
56      static final Log logger = LogFactory.getLog(CsvLoader.class);
57  
58      protected JdbcTemplate jdbcTemplate;
59  
60      protected IDbDialect dbDialect;
61  
62      protected IParameterService parameterService;
63  
64      protected IConfigurationService configurationService;
65  
66      protected INodeService nodeService;
67  
68      protected CsvReader csvReader;
69  
70      protected DataLoaderContext context;
71  
72      protected DataLoaderStatistics stats;
73  
74      protected List<IDataLoaderFilter> filters;
75  
76      protected Map<String, IColumnFilter> columnFilters;
77  
78      public void open(BufferedReader reader) throws IOException {
79          csvReader = new CsvReader(reader);
80          csvReader.setEscapeMode(CsvReader.ESCAPE_MODE_BACKSLASH);
81          csvReader.setSafetySwitch(false);
82          context = new DataLoaderContext();
83          stats = new DataLoaderStatistics();
84      }
85  
86      public void open(BufferedReader reader, List<IDataLoaderFilter> filters, Map<String, IColumnFilter> columnFilters)
87              throws IOException {
88          open(reader);
89          this.filters = filters;
90          this.columnFilters = columnFilters;
91      }
92  
93      public boolean hasNext() throws IOException {
94          while (csvReader.readRecord()) {
95              String[] tokens = csvReader.getValues();
96  
97              if (tokens[0].equals(CsvConstants.BATCH)) {
98                  context.setBatchId(new Long(tokens[1]));
99                  stats = new DataLoaderStatistics();
100                 return true;
101             } else if (tokens[0].equals(CsvConstants.NODEID)) {
102                 context.setNodeId(tokens[1]);
103             } else if (tokens[0].equals(CsvConstants.VERSION)) {
104                 context.setVersion(tokens[1] + "." + tokens[2] + "." + tokens[3]);
105             } else if (isMetaTokenParsed(tokens)) {
106                 continue;
107             } else {
108                 throw new RuntimeException("Unexpected token '" + tokens[0] + "' while parsing for next batch");
109             }
110         }
111         return false;
112     }
113 
114     public void skip() throws IOException {
115         context.setSkipping(true);
116         load();
117         // skipping is reset when a new batch_id is set
118     }
119 
120     public void load() throws IOException {
121         try {
122             prepareTableForDataLoad();
123             BinaryEncoding encoding = BinaryEncoding.NONE;
124             while (csvReader.readRecord()) {
125                 String[] tokens = csvReader.getValues();
126                 stats.incrementLineCount();
127                 if (tokens != null && tokens.length > 0 && tokens[0] != null) {
128                     stats.incrementByteCount(csvReader.getRawRecord().length());
129 
130                     if (tokens[0].equals(CsvConstants.INSERT)) {
131                         if (!context.getTableTemplate().isIgnoreThisTable() && !context.isSkipping()) {
132                             insert(tokens, encoding);
133                         }
134                     } else if (tokens[0].equals(CsvConstants.UPDATE)) {
135                         if (!context.getTableTemplate().isIgnoreThisTable() && !context.isSkipping()) {
136                             update(tokens, encoding);
137                         }
138                     } else if (tokens[0].equals(CsvConstants.DELETE)) {
139                         if (!context.getTableTemplate().isIgnoreThisTable() && !context.isSkipping()) {
140                             delete(tokens);
141                         }
142                     } else if (tokens[0].equals(CsvConstants.OLD)) {
143                         context.setOldData((String[]) ArrayUtils.subarray(tokens, 1, tokens.length));
144                     } else if (isMetaTokenParsed(tokens)) {
145                         continue;
146                     } else if (tokens[0].equals(CsvConstants.COMMIT)) {
147                         break;
148                     } else if (tokens[0].equals(CsvConstants.SQL)) {
149                         if ((context.getTableTemplate() == null || !context.getTableTemplate().isIgnoreThisTable()) && !context.isSkipping()) {
150                             runSql(tokens[1]);
151                         }
152                     } else if (tokens[0].equals(CsvConstants.CREATE)) {
153                         if (!context.isSkipping()) {
154                             runDdl(tokens[1]);
155                         }
156                     } else if (tokens[0].equals(CsvConstants.BINARY)) {
157                         try {
158                             encoding = BinaryEncoding.valueOf(tokens[1]);
159                         } catch (Exception ex) {
160                             logger.warn("Unsupported binary encoding value of " + tokens[1]);
161                         }
162                     } else {
163                         throw new RuntimeException("Unexpected token '" + tokens[0] + "' on line "
164                                 + stats.getLineCount() + " of batch " + context.getBatchId());
165                     }
166                 }
167             }
168         } finally {
169             cleanupAfterDataLoad();
170         }
171     }
172 
173     protected boolean isMetaTokenParsed(String[] tokens) {
174         boolean isMetaTokenParsed = true;
175         if (tokens[0].equals(CsvConstants.TABLE)) {
176             setTable(tokens[1]);
177         } else if (tokens[0].equals(CsvConstants.KEYS)) {
178             context.setKeyNames((String[]) ArrayUtils.subarray(tokens, 1, tokens.length));
179         } else if (tokens[0].equals(CsvConstants.COLUMNS)) {
180             context.setColumnNames((String[]) ArrayUtils.subarray(tokens, 1, tokens.length));
181         } else {
182             isMetaTokenParsed = false;
183         }
184         return isMetaTokenParsed;
185     }
186 
187     protected void setTable(String tableName) {
188         cleanupAfterDataLoad();
189         context.setTableName(tableName);
190 
191         if (context.getTableTemplate() == null) {
192             String fullTableName = tableName;
193 
194             if (parameterService.is(ParameterConstants.DATA_LOADER_LOOKUP_TARGET_SCHEMA)) {
195                 Node sourceNode = nodeService.findNode(context.getNodeId());
196                 if (sourceNode != null) {
197                     Trigger trigger = configurationService.getTriggerFor(tableName, sourceNode.getNodeGroupId());
198                     if (trigger != null && trigger.getTargetSchemaName() != null) {
199                         fullTableName = trigger.getTargetSchemaName() + "." + tableName;
200                     }
201                 }
202             }
203 
204             context.setTableTemplate(new TableTemplate(jdbcTemplate, dbDialect, fullTableName,
205                     this.columnFilters != null ? this.columnFilters.get(tableName) : null, parameterService
206                             .is(ParameterConstants.DATA_LOADER_NO_KEYS_IN_UPDATE)));
207         }
208         prepareTableForDataLoad();
209     }
210 
211     protected void prepareTableForDataLoad() {
212         if (context != null && context.getTableTemplate() != null) {
213             dbDialect.prepareTableForDataLoad(context.getTableTemplate().getTable());
214         }
215     }
216 
217     protected void cleanupAfterDataLoad() {
218         if (context != null && context.getTableTemplate() != null) {
219             dbDialect.cleanupAfterDataLoad(context.getTableTemplate().getTable());
220         }
221     }
222 
223     protected int insert(String[] tokens, BinaryEncoding encoding) {
224         stats.incrementStatementCount();
225         String[] columnValues = parseColumns(tokens, 1);
226         int rows = 0;
227 
228         boolean continueToLoad = true;
229         if (filters != null) {
230             stats.startTimer();
231             for (IDataLoaderFilter filter : filters) {
232                 continueToLoad &= filter.filterInsert(context, columnValues);
233             }
234             stats.incrementFilterMillis(stats.endTimer());
235         }
236 
237         if (continueToLoad) {
238             boolean enableFallbackUpdate = parameterService.is(ParameterConstants.DATA_LOADER_ENABLE_FALLBACK_UPDATE);
239             Object savepoint = null;
240             try {
241                 stats.startTimer();
242                 if (enableFallbackUpdate) {
243                     savepoint = dbDialect.createSavepointForFallback();
244                 }
245                 rows = context.getTableTemplate().insert(context, columnValues, encoding);
246                 dbDialect.releaseSavepoint(savepoint);
247             } catch (DataIntegrityViolationException e) {
248                 // TODO: modify sql-error-codes.xml for unique constraint vs
249                 // foreign key
250                 if (enableFallbackUpdate) {
251                     dbDialect.rollbackToSavepoint(savepoint);
252                     if (logger.isDebugEnabled()) {
253                         logger.debug("Unable to insert into " + context.getTableName() + ", updating instead: "
254                                 + ArrayUtils.toString(tokens));
255                     }
256                     String keyValues[] = parseKeys(tokens, 1);
257                     stats.incrementFallbackUpdateCount();
258                     rows = context.getTableTemplate().update(context, columnValues, keyValues, encoding);
259                     if (rows == 0) {
260                         throw new RuntimeException("Unable to update " + context.getTableName() + ": "
261                                 + ArrayUtils.toString(tokens), e);
262                     }
263                 } else {
264                     // TODO: log the PK information as an ERROR level.
265                     throw e;
266                 }
267             } finally {
268                 stats.incrementDatabaseMillis(stats.endTimer());
269             }
270         }
271         return rows;
272     }
273 
274     protected int update(String[] tokens, BinaryEncoding encoding) {
275         stats.incrementStatementCount();
276         String columnValues[] = parseColumns(tokens, 1);
277         String keyValues[] = parseKeys(tokens, 1 + columnValues.length);
278         int rows = 0;
279         boolean continueToLoad = true;
280         if (filters != null) {
281             stats.startTimer();
282             for (IDataLoaderFilter filter : filters) {
283                 continueToLoad &= filter.filterUpdate(context, columnValues, keyValues);
284             }
285             stats.incrementFilterMillis(stats.endTimer());
286         }
287 
288         if (continueToLoad) {
289             boolean enableFallbackInsert = parameterService.is(ParameterConstants.DATA_LOADER_ENABLE_FALLBACK_INSERT);
290             stats.startTimer();
291             rows = context.getTableTemplate().update(context, columnValues, keyValues, encoding);
292             if (rows == 0) {
293                 if (enableFallbackInsert) {
294                     if (logger.isDebugEnabled()) {
295                         logger.debug("Unable to update " + context.getTableName() + ", inserting instead: "
296                                 + ArrayUtils.toString(tokens));
297                     }
298                     stats.incrementFallbackInsertCount();
299                     rows = context.getTableTemplate().insert(context, columnValues, encoding);
300                 } else {
301                     // TODO: log the PK information as an ERROR level.
302                     stats.incrementDatabaseMillis(stats.endTimer());
303                     throw new RuntimeException("Unable to update " + context.getTableName() + ": "
304                             + ArrayUtils.toString(tokens));
305                 }
306             } else if (rows > 1) {
307                 logger.warn("Too many rows (" + rows + ") updated for " + context.getTableName() + ": "
308                         + ArrayUtils.toString(tokens));
309             }
310             stats.incrementDatabaseMillis(stats.endTimer());
311         }
312         return rows;
313     }
314 
315     protected int delete(String[] tokens) {
316         stats.incrementStatementCount();
317         String keyValues[] = parseKeys(tokens, 1);
318         int rows = 0;
319         boolean continueToLoad = true;
320 
321         if (filters != null) {
322             stats.startTimer();
323             for (IDataLoaderFilter filter : filters) {
324                 continueToLoad &= filter.filterDelete(context, keyValues);
325             }
326             stats.incrementFilterMillis(stats.endTimer());
327         }
328 
329         if (continueToLoad) {
330             boolean allowMissingDelete = parameterService.is(ParameterConstants.DATA_LOADER_ALLOW_MISSING_DELETE);
331             stats.startTimer();
332             rows = context.getTableTemplate().delete(context, keyValues);
333             stats.incrementDatabaseMillis(stats.endTimer());
334             if (rows == 0) {
335                 if (allowMissingDelete) {
336                     logger.warn("Delete of " + context.getTableName() + " affected no rows: "
337                             + ArrayUtils.toString(tokens));
338                     stats.incrementMissingDeleteCount();
339                 } else {
340                     throw new RuntimeException("Delete of " + context.getTableName() + " affected no rows: "
341                             + ArrayUtils.toString(tokens));
342                 }
343             }
344         }
345         return rows;
346     }
347 
348     protected void runSql(String sql) {
349         stats.incrementStatementCount();
350         if (logger.isDebugEnabled()) {
351             logger.debug("Running SQL: " + sql);
352         }
353         jdbcTemplate.execute(sql);
354         if (context.getTableTemplate() != null) {
355             context.getTableTemplate().resetMetaData();
356         }
357     }
358 
359     protected void runDdl(String xml) {
360         stats.incrementStatementCount();
361         if (logger.isDebugEnabled()) {
362             logger.debug("Running DDL: " + xml);
363         }
364         dbDialect.createTables(xml);
365         context.getTableTemplate().resetMetaData();
366     }
367 
368     protected String[] parseKeys(String[] tokens, int startIndex) {
369         if (context.getTableTemplate().getKeyNames() == null) {
370             throw new RuntimeException("Key names were not specified for table "
371                     + context.getTableTemplate().getTableName());
372         }
373         int keyLength = context.getTableTemplate().getKeyNames().length;
374         return parseValues("key", tokens, startIndex, startIndex + keyLength);
375     }
376 
377     protected String[] parseColumns(String[] tokens, int startIndex) {
378         if (context.getTableTemplate().getColumnNames() == null) {
379             throw new RuntimeException("Column names were not specified for table "
380                     + context.getTableTemplate().getTableName());
381         }
382         int columnLength = context.getTableTemplate().getColumnNames().length;
383         return parseValues("column", tokens, startIndex, startIndex + columnLength);
384     }
385 
386     protected String[] parseValues(String name, String[] tokens, int startIndex, int endIndex) {
387         if (tokens.length < endIndex) {
388             throw new RuntimeException("Expected to have " + (endIndex - startIndex) + " " + name + " values for "
389                     + context.getTableTemplate().getTableName() + ": " + ArrayUtils.toString(tokens));
390         }
391         return (String[]) ArrayUtils.subarray(tokens, startIndex, endIndex);
392     }
393 
394     public IDataLoader clone() {
395         CsvLoader dataLoader = new CsvLoader();
396         dataLoader.setJdbcTemplate(jdbcTemplate);
397         dataLoader.setDbDialect(dbDialect);
398         dataLoader.setParameterService(parameterService);
399         dataLoader.setConfigurationService(configurationService);
400         dataLoader.setNodeService(nodeService);
401         return dataLoader;
402     }
403 
404     public void close() {
405         if (csvReader != null) {
406             csvReader.close();
407         }
408     }
409 
410     public IDataLoaderContext getContext() {
411         return context;
412     }
413 
414     public IDataLoaderStatistics getStatistics() {
415         return stats;
416     }
417 
418     public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
419         this.jdbcTemplate = jdbcTemplate;
420     }
421 
422     public void setDbDialect(IDbDialect dbDialect) {
423         this.dbDialect = dbDialect;
424     }
425 
426     public void setParameterService(IParameterService parameterService) {
427         this.parameterService = parameterService;
428     }
429 
430     public void setConfigurationService(IConfigurationService configurationService) {
431         this.configurationService = configurationService;
432     }
433 
434     public void setNodeService(INodeService nodeService) {
435         this.nodeService = nodeService;
436     }
437 
438 }