View Javadoc

1   /*
2    * SymmetricDS is an open source database synchronization solution.
3    *   
4    * Copyright (C) Eric Long <erilong@users.sourceforge.net>,
5    *               Chris Henson <chenson42@users.sourceforge.net>
6    *
7    * This library is free software; you can redistribute it and/or
8    * modify it under the terms of the GNU Lesser General Public
9    * License as published by the Free Software Foundation; either
10   * version 3 of the License, or (at your option) any later version.
11   *
12   * This library is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this library; if not, see
19   * <http://www.gnu.org/licenses/>.
20   */
21  
22  package org.jumpmind.symmetric.load.csv;
23  
24  import java.io.BufferedReader;
25  import java.io.IOException;
26  import java.util.List;
27  import java.util.Map;
28  
29  import org.apache.commons.lang.ArrayUtils;
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.jumpmind.symmetric.common.ParameterConstants;
33  import org.jumpmind.symmetric.common.csv.CsvConstants;
34  import org.jumpmind.symmetric.db.BinaryEncoding;
35  import org.jumpmind.symmetric.db.IDbDialect;
36  import org.jumpmind.symmetric.load.DataLoaderContext;
37  import org.jumpmind.symmetric.load.DataLoaderStatistics;
38  import org.jumpmind.symmetric.load.IColumnFilter;
39  import org.jumpmind.symmetric.load.IDataLoader;
40  import org.jumpmind.symmetric.load.IDataLoaderContext;
41  import org.jumpmind.symmetric.load.IDataLoaderFilter;
42  import org.jumpmind.symmetric.load.IDataLoaderStatistics;
43  import org.jumpmind.symmetric.load.TableTemplate;
44  import org.jumpmind.symmetric.model.Node;
45  import org.jumpmind.symmetric.model.Trigger;
46  import org.jumpmind.symmetric.service.IConfigurationService;
47  import org.jumpmind.symmetric.service.INodeService;
48  import org.jumpmind.symmetric.service.IParameterService;
49  import org.springframework.dao.DataIntegrityViolationException;
50  import org.springframework.jdbc.core.JdbcTemplate;
51  
52  import com.csvreader.CsvReader;
53  
54  public class CsvLoader implements IDataLoader {
55  
56      static final Log logger = LogFactory.getLog(CsvLoader.class);
57  
58      protected JdbcTemplate jdbcTemplate;
59  
60      protected IDbDialect dbDialect;
61  
62      protected IParameterService parameterService;
63  
64      protected IConfigurationService configurationService;
65  
66      protected INodeService nodeService;
67  
68      protected CsvReader csvReader;
69  
70      protected DataLoaderContext context;
71  
72      protected DataLoaderStatistics stats;
73  
74      protected List<IDataLoaderFilter> filters;
75  
76      protected Map<String, IColumnFilter> columnFilters;
77  
78      public void open(BufferedReader reader) throws IOException {
79          csvReader = new CsvReader(reader);
80          csvReader.setEscapeMode(CsvReader.ESCAPE_MODE_BACKSLASH);
81          csvReader.setSafetySwitch(false);
82          context = new DataLoaderContext();
83          stats = new DataLoaderStatistics();
84      }
85  
86      public void open(BufferedReader reader, List<IDataLoaderFilter> filters, Map<String, IColumnFilter> columnFilters)
87              throws IOException {
88          open(reader);
89          this.filters = filters;
90          this.columnFilters = columnFilters;
91      }
92  
93      public boolean hasNext() throws IOException {
94          while (csvReader.readRecord()) {
95              String[] tokens = csvReader.getValues();
96  
97              if (tokens[0].equals(CsvConstants.BATCH)) {
98                  context.setBatchId(new Long(tokens[1]));
99                  stats = new DataLoaderStatistics();
100                 prepareTableForDataLoad();
101                 return true;
102             } else if (tokens[0].equals(CsvConstants.NODEID)) {
103                 context.setNodeId(tokens[1]);
104             } else if (tokens[0].equals(CsvConstants.VERSION)) {
105                 context.setVersion(tokens[1] + "." + tokens[2] + "." + tokens[3]);
106             } else if (isMetaTokenParsed(tokens)) {
107                 continue;
108             } else {
109                 throw new RuntimeException("Unexpected token '" + tokens[0] + "' while parsing for next batch");
110             }
111         }
112         return false;
113     }
114 
115     public void skip() throws IOException {
116         context.setSkipping(true);
117         load();
118         // skipping is reset when a new batch_id is set
119     }
120 
121     public void load() throws IOException {
122         BinaryEncoding encoding = BinaryEncoding.NONE;
123         while (csvReader.readRecord()) {
124             String[] tokens = csvReader.getValues();
125             stats.incrementLineCount();
126             if (tokens != null && tokens.length > 0 && tokens[0] != null) {
127                 stats.incrementByteCount(csvReader.getRawRecord().length());
128 
129                 if (tokens[0].equals(CsvConstants.INSERT)) {
130                     if (!context.getTableTemplate().isIgnoreThisTable() && !context.isSkipping()) {
131                         insert(tokens, encoding);
132                     }
133                 } else if (tokens[0].equals(CsvConstants.UPDATE)) {
134                     if (!context.getTableTemplate().isIgnoreThisTable() && !context.isSkipping()) {
135                         update(tokens, encoding);
136                     }
137                 } else if (tokens[0].equals(CsvConstants.DELETE)) {
138                     if (!context.getTableTemplate().isIgnoreThisTable() && !context.isSkipping()) {
139                         delete(tokens);
140                     }
141                 } else if (isMetaTokenParsed(tokens)) {
142                     continue;
143                 } else if (tokens[0].equals(CsvConstants.COMMIT)) {
144                     cleanupAfterDataLoad();
145                     break;
146                 } else if (tokens[0].equals(CsvConstants.SQL)) {
147                     if (!context.getTableTemplate().isIgnoreThisTable() && !context.isSkipping()) {
148                         runSql(tokens[1]);
149                     }
150                 } else if (tokens[0].equals(CsvConstants.CREATE)) {
151                     if (!context.isSkipping()) {
152                         runDdl(tokens[1]);
153                     }
154                 } else if (tokens[0].equals(CsvConstants.BINARY)) {
155                     try {
156                         encoding = BinaryEncoding.valueOf(tokens[1]);
157                     } catch (Exception ex) {
158                         logger.warn("Unsupported binary encoding value of " + tokens[1]);
159                     }
160                 } else {
161                     throw new RuntimeException("Unexpected token '" + tokens[0] + "' on line " + stats.getLineCount()
162                             + " of batch " + context.getBatchId());
163                 }
164             }
165         }
166     }
167 
168     protected boolean isMetaTokenParsed(String[] tokens) {
169         boolean isMetaTokenParsed = true;
170         if (tokens[0].equals(CsvConstants.TABLE)) {
171             setTable(tokens[1]);
172         } else if (tokens[0].equals(CsvConstants.KEYS)) {
173             context.setKeyNames((String[]) ArrayUtils.subarray(tokens, 1, tokens.length));
174         } else if (tokens[0].equals(CsvConstants.COLUMNS)) {
175             context.setColumnNames((String[]) ArrayUtils.subarray(tokens, 1, tokens.length));
176         } else {
177             isMetaTokenParsed = false;
178         }
179         return isMetaTokenParsed;
180     }
181 
182     protected void setTable(String tableName) {
183         cleanupAfterDataLoad();
184         context.setTableName(tableName);
185 
186         if (context.getTableTemplate() == null) {
187             String fullTableName = tableName;
188 
189             if (parameterService.is(ParameterConstants.DATA_LOADER_LOOKUP_TARGET_SCHEMA)) {
190                 Node sourceNode = nodeService.findNode(context.getNodeId());
191                 if (sourceNode != null) {
192                     Trigger trigger = configurationService.getTriggerFor(tableName, sourceNode.getNodeGroupId());
193                     if (trigger != null && trigger.getTargetSchemaName() != null) {
194                         fullTableName = trigger.getTargetSchemaName() + "." + tableName;
195                     }
196                 }
197             }
198 
199             context.setTableTemplate(new TableTemplate(jdbcTemplate, dbDialect, fullTableName,
200                     this.columnFilters != null ? this.columnFilters.get(tableName) : null, parameterService
201                             .is(ParameterConstants.DATA_LOADER_NO_KEYS_IN_UPDATE)));
202         }
203 
204         prepareTableForDataLoad();
205     }
206     
207     protected void prepareTableForDataLoad() {
208         if (context != null && context.getTableTemplate() != null) {
209             dbDialect.prepareTableForDataLoad(context.getTableTemplate().getTable());
210         }
211     }
212 
213     protected void cleanupAfterDataLoad() {
214         if (context != null && context.getTableTemplate() != null) {
215             dbDialect.cleanupAfterDataLoad(context.getTableTemplate().getTable());
216         }
217     }
218 
219     protected int insert(String[] tokens, BinaryEncoding encoding) {
220         stats.incrementStatementCount();
221         String[] columnValues = parseColumns(tokens, 1);
222         int rows = 0;
223 
224         boolean continueToLoad = true;
225         if (filters != null) {
226             stats.startTimer();
227             for (IDataLoaderFilter filter : filters) {
228                 continueToLoad &= filter.filterInsert(context, columnValues);
229             }
230             stats.incrementFilterMillis(stats.endTimer());
231         }
232 
233         if (continueToLoad) {
234             boolean enableFallbackUpdate = parameterService.is(ParameterConstants.DATA_LOADER_ENABLE_FALLBACK_UPDATE);
235             Object savepoint = null;
236             try {
237                 stats.startTimer();
238                 if (enableFallbackUpdate) {
239                     savepoint = dbDialect.createSavepointForFallback();
240                 }
241                 rows = context.getTableTemplate().insert(context, columnValues, encoding);
242                 dbDialect.releaseSavepoint(savepoint);
243             } catch (DataIntegrityViolationException e) {
244                 // TODO: modify sql-error-codes.xml for unique constraint vs
245                 // foreign key
246                 if (enableFallbackUpdate) {
247                     dbDialect.rollbackToSavepoint(savepoint);
248                     if (logger.isDebugEnabled()) {
249                         logger.debug("Unable to insert into " + context.getTableName() + ", updating instead: "
250                                 + ArrayUtils.toString(tokens));
251                     }
252                     String keyValues[] = parseKeys(tokens, 1);
253                     stats.incrementFallbackUpdateCount();
254                     rows = context.getTableTemplate().update(context, columnValues, keyValues, encoding);
255                     if (rows == 0) {
256                         throw new RuntimeException("Unable to update " + context.getTableName() + ": "
257                                 + ArrayUtils.toString(tokens), e);
258                     }
259                 } else {
260                     // TODO: log the PK information as an ERROR level.
261                     throw e;
262                 }
263             } finally {
264                 stats.incrementDatabaseMillis(stats.endTimer());
265             }
266         }
267         return rows;
268     }
269 
270     protected int update(String[] tokens, BinaryEncoding encoding) {
271         stats.incrementStatementCount();
272         String columnValues[] = parseColumns(tokens, 1);
273         String keyValues[] = parseKeys(tokens, 1 + columnValues.length);
274         int rows = 0;
275         boolean continueToLoad = true;
276         if (filters != null) {
277             stats.startTimer();
278             for (IDataLoaderFilter filter : filters) {
279                 continueToLoad &= filter.filterUpdate(context, columnValues, keyValues);
280             }
281             stats.incrementFilterMillis(stats.endTimer());
282         }
283 
284         if (continueToLoad) {
285             boolean enableFallbackInsert = parameterService.is(ParameterConstants.DATA_LOADER_ENABLE_FALLBACK_INSERT);
286             stats.startTimer();
287             rows = context.getTableTemplate().update(context, columnValues, keyValues, encoding);
288             if (rows == 0) {
289                 if (enableFallbackInsert) {
290                     if (logger.isDebugEnabled()) {
291                         logger.debug("Unable to update " + context.getTableName() + ", inserting instead: "
292                                 + ArrayUtils.toString(tokens));
293                     }
294                     stats.incrementFallbackInsertCount();
295                     rows = context.getTableTemplate().insert(context, columnValues, encoding);
296                 } else {
297                     // TODO: log the PK information as an ERROR level.
298                     stats.incrementDatabaseMillis(stats.endTimer());
299                     throw new RuntimeException("Unable to update " + context.getTableName() + ": "
300                             + ArrayUtils.toString(tokens));
301                 }
302             } else if (rows > 1) {
303                 logger.warn("Too many rows (" + rows + ") updated for " + context.getTableName() + ": "
304                         + ArrayUtils.toString(tokens));
305             }
306             stats.incrementDatabaseMillis(stats.endTimer());
307         }
308         return rows;
309     }
310 
311     protected int delete(String[] tokens) {
312         stats.incrementStatementCount();
313         String keyValues[] = parseKeys(tokens, 1);
314         int rows = 0;
315         boolean continueToLoad = true;
316 
317         if (filters != null) {
318             stats.startTimer();
319             for (IDataLoaderFilter filter : filters) {
320                 continueToLoad &= filter.filterDelete(context, keyValues);
321             }
322             stats.incrementFilterMillis(stats.endTimer());
323         }
324 
325         if (continueToLoad) {
326             boolean allowMissingDelete = parameterService.is(ParameterConstants.DATA_LOADER_ALLOW_MISSING_DELETE);
327             stats.startTimer();
328             rows = context.getTableTemplate().delete(context, keyValues);
329             stats.incrementDatabaseMillis(stats.endTimer());
330             if (rows == 0) {
331                 if (allowMissingDelete) {
332                     logger.warn("Delete of " + context.getTableName() + " affected no rows: "
333                             + ArrayUtils.toString(tokens));
334                     stats.incrementMissingDeleteCount();
335                 } else {
336                     throw new RuntimeException("Delete of " + context.getTableName() + " affected no rows: "
337                             + ArrayUtils.toString(tokens));
338                 }
339             }
340         }
341         return rows;
342     }
343 
344     protected void runSql(String sql) {
345         stats.incrementStatementCount();
346         if (logger.isDebugEnabled()) {
347             logger.debug("Running SQL: " + sql);
348         }
349         jdbcTemplate.execute(sql);
350     }
351 
352     protected void runDdl(String xml) {
353         stats.incrementStatementCount();
354         if (logger.isDebugEnabled()) {
355             logger.debug("Running DDL: " + xml);
356         }
357         dbDialect.createTables(xml);
358         context.getTableTemplate().resetMetaData();
359     }
360 
361     protected String[] parseKeys(String[] tokens, int startIndex) {
362         if (context.getTableTemplate().getKeyNames() == null) {
363             throw new RuntimeException("Key names were not specified for table "
364                     + context.getTableTemplate().getTableName());
365         }
366         int keyLength = context.getTableTemplate().getKeyNames().length;
367         return parseValues("key", tokens, startIndex, startIndex + keyLength);
368     }
369 
370     protected String[] parseColumns(String[] tokens, int startIndex) {
371         if (context.getTableTemplate().getColumnNames() == null) {
372             throw new RuntimeException("Column names were not specified for table "
373                     + context.getTableTemplate().getTableName());
374         }
375         int columnLength = context.getTableTemplate().getColumnNames().length;
376         return parseValues("column", tokens, startIndex, startIndex + columnLength);
377     }
378 
379     protected String[] parseValues(String name, String[] tokens, int startIndex, int endIndex) {
380         if (tokens.length < endIndex) {
381             throw new RuntimeException("Expected to have " + (endIndex - startIndex) + " " + name + " values for "
382                     + context.getTableTemplate().getTableName() + ": " + ArrayUtils.toString(tokens));
383         }
384         return (String[]) ArrayUtils.subarray(tokens, startIndex, endIndex);
385     }
386 
387     public IDataLoader clone() {
388         CsvLoader dataLoader = new CsvLoader();
389         dataLoader.setJdbcTemplate(jdbcTemplate);
390         dataLoader.setDbDialect(dbDialect);
391         dataLoader.setParameterService(parameterService);
392         dataLoader.setConfigurationService(configurationService);
393         dataLoader.setNodeService(nodeService);
394         return dataLoader;
395     }
396 
397     public void close() {
398 
399         if (csvReader != null) {
400             csvReader.close();
401         }
402 
403     }
404 
405     public IDataLoaderContext getContext() {
406         return context;
407     }
408 
409     public IDataLoaderStatistics getStatistics() {
410         return stats;
411     }
412 
413     public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
414         this.jdbcTemplate = jdbcTemplate;
415     }
416 
417     public void setDbDialect(IDbDialect dbDialect) {
418         this.dbDialect = dbDialect;
419     }
420 
421     public void setParameterService(IParameterService parameterService) {
422         this.parameterService = parameterService;
423     }
424 
425     public void setConfigurationService(IConfigurationService configurationService) {
426         this.configurationService = configurationService;
427     }
428 
429     public void setNodeService(INodeService nodeService) {
430         this.nodeService = nodeService;
431     }
432 
433 }