1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.jumpmind.symmetric.load.csv;
23
24 import java.io.BufferedReader;
25 import java.io.IOException;
26 import java.util.List;
27 import java.util.Map;
28
29 import org.apache.commons.lang.ArrayUtils;
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.jumpmind.symmetric.common.ParameterConstants;
33 import org.jumpmind.symmetric.common.csv.CsvConstants;
34 import org.jumpmind.symmetric.db.BinaryEncoding;
35 import org.jumpmind.symmetric.db.IDbDialect;
36 import org.jumpmind.symmetric.load.DataLoaderContext;
37 import org.jumpmind.symmetric.load.DataLoaderStatistics;
38 import org.jumpmind.symmetric.load.IColumnFilter;
39 import org.jumpmind.symmetric.load.IDataLoader;
40 import org.jumpmind.symmetric.load.IDataLoaderContext;
41 import org.jumpmind.symmetric.load.IDataLoaderFilter;
42 import org.jumpmind.symmetric.load.IDataLoaderStatistics;
43 import org.jumpmind.symmetric.load.TableTemplate;
44 import org.jumpmind.symmetric.model.Node;
45 import org.jumpmind.symmetric.model.Trigger;
46 import org.jumpmind.symmetric.service.IConfigurationService;
47 import org.jumpmind.symmetric.service.INodeService;
48 import org.jumpmind.symmetric.service.IParameterService;
49 import org.springframework.dao.DataIntegrityViolationException;
50 import org.springframework.jdbc.core.JdbcTemplate;
51
52 import com.csvreader.CsvReader;
53
54 public class CsvLoader implements IDataLoader {
55
56 static final Log logger = LogFactory.getLog(CsvLoader.class);
57
58 protected JdbcTemplate jdbcTemplate;
59
60 protected IDbDialect dbDialect;
61
62 protected IParameterService parameterService;
63
64 protected IConfigurationService configurationService;
65
66 protected INodeService nodeService;
67
68 protected CsvReader csvReader;
69
70 protected DataLoaderContext context;
71
72 protected DataLoaderStatistics stats;
73
74 protected List<IDataLoaderFilter> filters;
75
76 protected Map<String, IColumnFilter> columnFilters;
77
78 public void open(BufferedReader reader) throws IOException {
79 csvReader = new CsvReader(reader);
80 csvReader.setEscapeMode(CsvReader.ESCAPE_MODE_BACKSLASH);
81 csvReader.setSafetySwitch(false);
82 context = new DataLoaderContext();
83 stats = new DataLoaderStatistics();
84 }
85
86 public void open(BufferedReader reader, List<IDataLoaderFilter> filters, Map<String, IColumnFilter> columnFilters)
87 throws IOException {
88 open(reader);
89 this.filters = filters;
90 this.columnFilters = columnFilters;
91 }
92
93 public boolean hasNext() throws IOException {
94 while (csvReader.readRecord()) {
95 String[] tokens = csvReader.getValues();
96
97 if (tokens[0].equals(CsvConstants.BATCH)) {
98 context.setBatchId(new Long(tokens[1]));
99 stats = new DataLoaderStatistics();
100 return true;
101 } else if (tokens[0].equals(CsvConstants.NODEID)) {
102 context.setNodeId(tokens[1]);
103 } else if (tokens[0].equals(CsvConstants.VERSION)) {
104 context.setVersion(tokens[1] + "." + tokens[2] + "." + tokens[3]);
105 } else if (isMetaTokenParsed(tokens)) {
106 continue;
107 } else {
108 throw new RuntimeException("Unexpected token '" + tokens[0] + "' while parsing for next batch");
109 }
110 }
111 return false;
112 }
113
114 public void skip() throws IOException {
115 context.setSkipping(true);
116 load();
117
118 }
119
120 public void load() throws IOException {
121 try {
122 prepareTableForDataLoad();
123 BinaryEncoding encoding = BinaryEncoding.NONE;
124 while (csvReader.readRecord()) {
125 String[] tokens = csvReader.getValues();
126 stats.incrementLineCount();
127 if (tokens != null && tokens.length > 0 && tokens[0] != null) {
128 stats.incrementByteCount(csvReader.getRawRecord().length());
129
130 if (tokens[0].equals(CsvConstants.INSERT)) {
131 if (!context.getTableTemplate().isIgnoreThisTable() && !context.isSkipping()) {
132 insert(tokens, encoding);
133 }
134 } else if (tokens[0].equals(CsvConstants.UPDATE)) {
135 if (!context.getTableTemplate().isIgnoreThisTable() && !context.isSkipping()) {
136 update(tokens, encoding);
137 }
138 } else if (tokens[0].equals(CsvConstants.DELETE)) {
139 if (!context.getTableTemplate().isIgnoreThisTable() && !context.isSkipping()) {
140 delete(tokens);
141 }
142 } else if (tokens[0].equals(CsvConstants.OLD)) {
143 context.setOldData((String[]) ArrayUtils.subarray(tokens, 1, tokens.length));
144 } else if (isMetaTokenParsed(tokens)) {
145 continue;
146 } else if (tokens[0].equals(CsvConstants.COMMIT)) {
147 break;
148 } else if (tokens[0].equals(CsvConstants.SQL)) {
149 if ((context.getTableTemplate() == null || !context.getTableTemplate().isIgnoreThisTable()) && !context.isSkipping()) {
150 runSql(tokens[1]);
151 }
152 } else if (tokens[0].equals(CsvConstants.CREATE)) {
153 if (!context.isSkipping()) {
154 runDdl(tokens[1]);
155 }
156 } else if (tokens[0].equals(CsvConstants.BINARY)) {
157 try {
158 encoding = BinaryEncoding.valueOf(tokens[1]);
159 } catch (Exception ex) {
160 logger.warn("Unsupported binary encoding value of " + tokens[1]);
161 }
162 } else {
163 throw new RuntimeException("Unexpected token '" + tokens[0] + "' on line "
164 + stats.getLineCount() + " of batch " + context.getBatchId());
165 }
166 }
167 }
168 } finally {
169 cleanupAfterDataLoad();
170 }
171 }
172
173 protected boolean isMetaTokenParsed(String[] tokens) {
174 boolean isMetaTokenParsed = true;
175 if (tokens[0].equals(CsvConstants.TABLE)) {
176 setTable(tokens[1]);
177 } else if (tokens[0].equals(CsvConstants.KEYS)) {
178 context.setKeyNames((String[]) ArrayUtils.subarray(tokens, 1, tokens.length));
179 } else if (tokens[0].equals(CsvConstants.COLUMNS)) {
180 context.setColumnNames((String[]) ArrayUtils.subarray(tokens, 1, tokens.length));
181 } else {
182 isMetaTokenParsed = false;
183 }
184 return isMetaTokenParsed;
185 }
186
187 protected void setTable(String tableName) {
188 cleanupAfterDataLoad();
189 context.setTableName(tableName);
190
191 if (context.getTableTemplate() == null) {
192 String fullTableName = tableName;
193
194 if (parameterService.is(ParameterConstants.DATA_LOADER_LOOKUP_TARGET_SCHEMA)) {
195 Node sourceNode = nodeService.findNode(context.getNodeId());
196 if (sourceNode != null) {
197 Trigger trigger = configurationService.getTriggerFor(tableName, sourceNode.getNodeGroupId());
198 if (trigger != null && trigger.getTargetSchemaName() != null) {
199 fullTableName = trigger.getTargetSchemaName() + "." + tableName;
200 }
201 }
202 }
203
204 context.setTableTemplate(new TableTemplate(jdbcTemplate, dbDialect, fullTableName,
205 this.columnFilters != null ? this.columnFilters.get(tableName) : null, parameterService
206 .is(ParameterConstants.DATA_LOADER_NO_KEYS_IN_UPDATE)));
207 }
208 prepareTableForDataLoad();
209 }
210
211 protected void prepareTableForDataLoad() {
212 if (context != null && context.getTableTemplate() != null) {
213 dbDialect.prepareTableForDataLoad(context.getTableTemplate().getTable());
214 }
215 }
216
217 protected void cleanupAfterDataLoad() {
218 if (context != null && context.getTableTemplate() != null) {
219 dbDialect.cleanupAfterDataLoad(context.getTableTemplate().getTable());
220 }
221 }
222
223 protected int insert(String[] tokens, BinaryEncoding encoding) {
224 stats.incrementStatementCount();
225 String[] columnValues = parseColumns(tokens, 1);
226 int rows = 0;
227
228 boolean continueToLoad = true;
229 if (filters != null) {
230 stats.startTimer();
231 for (IDataLoaderFilter filter : filters) {
232 continueToLoad &= filter.filterInsert(context, columnValues);
233 }
234 stats.incrementFilterMillis(stats.endTimer());
235 }
236
237 if (continueToLoad) {
238 boolean enableFallbackUpdate = parameterService.is(ParameterConstants.DATA_LOADER_ENABLE_FALLBACK_UPDATE);
239 Object savepoint = null;
240 try {
241 stats.startTimer();
242 if (enableFallbackUpdate) {
243 savepoint = dbDialect.createSavepointForFallback();
244 }
245 rows = context.getTableTemplate().insert(context, columnValues, encoding);
246 dbDialect.releaseSavepoint(savepoint);
247 } catch (DataIntegrityViolationException e) {
248
249
250 if (enableFallbackUpdate) {
251 dbDialect.rollbackToSavepoint(savepoint);
252 if (logger.isDebugEnabled()) {
253 logger.debug("Unable to insert into " + context.getTableName() + ", updating instead: "
254 + ArrayUtils.toString(tokens));
255 }
256 String keyValues[] = parseKeys(tokens, 1);
257 stats.incrementFallbackUpdateCount();
258 rows = context.getTableTemplate().update(context, columnValues, keyValues, encoding);
259 if (rows == 0) {
260 throw new RuntimeException("Unable to update " + context.getTableName() + ": "
261 + ArrayUtils.toString(tokens), e);
262 }
263 } else {
264
265 throw e;
266 }
267 } finally {
268 stats.incrementDatabaseMillis(stats.endTimer());
269 }
270 }
271 return rows;
272 }
273
274 protected int update(String[] tokens, BinaryEncoding encoding) {
275 stats.incrementStatementCount();
276 String columnValues[] = parseColumns(tokens, 1);
277 String keyValues[] = parseKeys(tokens, 1 + columnValues.length);
278 int rows = 0;
279 boolean continueToLoad = true;
280 if (filters != null) {
281 stats.startTimer();
282 for (IDataLoaderFilter filter : filters) {
283 continueToLoad &= filter.filterUpdate(context, columnValues, keyValues);
284 }
285 stats.incrementFilterMillis(stats.endTimer());
286 }
287
288 if (continueToLoad) {
289 boolean enableFallbackInsert = parameterService.is(ParameterConstants.DATA_LOADER_ENABLE_FALLBACK_INSERT);
290 stats.startTimer();
291 rows = context.getTableTemplate().update(context, columnValues, keyValues, encoding);
292 if (rows == 0) {
293 if (enableFallbackInsert) {
294 if (logger.isDebugEnabled()) {
295 logger.debug("Unable to update " + context.getTableName() + ", inserting instead: "
296 + ArrayUtils.toString(tokens));
297 }
298 stats.incrementFallbackInsertCount();
299 rows = context.getTableTemplate().insert(context, columnValues, encoding);
300 } else {
301
302 stats.incrementDatabaseMillis(stats.endTimer());
303 throw new RuntimeException("Unable to update " + context.getTableName() + ": "
304 + ArrayUtils.toString(tokens));
305 }
306 } else if (rows > 1) {
307 logger.warn("Too many rows (" + rows + ") updated for " + context.getTableName() + ": "
308 + ArrayUtils.toString(tokens));
309 }
310 stats.incrementDatabaseMillis(stats.endTimer());
311 }
312 return rows;
313 }
314
315 protected int delete(String[] tokens) {
316 stats.incrementStatementCount();
317 String keyValues[] = parseKeys(tokens, 1);
318 int rows = 0;
319 boolean continueToLoad = true;
320
321 if (filters != null) {
322 stats.startTimer();
323 for (IDataLoaderFilter filter : filters) {
324 continueToLoad &= filter.filterDelete(context, keyValues);
325 }
326 stats.incrementFilterMillis(stats.endTimer());
327 }
328
329 if (continueToLoad) {
330 boolean allowMissingDelete = parameterService.is(ParameterConstants.DATA_LOADER_ALLOW_MISSING_DELETE);
331 stats.startTimer();
332 rows = context.getTableTemplate().delete(context, keyValues);
333 stats.incrementDatabaseMillis(stats.endTimer());
334 if (rows == 0) {
335 if (allowMissingDelete) {
336 logger.warn("Delete of " + context.getTableName() + " affected no rows: "
337 + ArrayUtils.toString(tokens));
338 stats.incrementMissingDeleteCount();
339 } else {
340 throw new RuntimeException("Delete of " + context.getTableName() + " affected no rows: "
341 + ArrayUtils.toString(tokens));
342 }
343 }
344 }
345 return rows;
346 }
347
348 protected void runSql(String sql) {
349 stats.incrementStatementCount();
350 if (logger.isDebugEnabled()) {
351 logger.debug("Running SQL: " + sql);
352 }
353 jdbcTemplate.execute(sql);
354 if (context.getTableTemplate() != null) {
355 context.getTableTemplate().resetMetaData();
356 }
357 }
358
359 protected void runDdl(String xml) {
360 stats.incrementStatementCount();
361 if (logger.isDebugEnabled()) {
362 logger.debug("Running DDL: " + xml);
363 }
364 dbDialect.createTables(xml);
365 context.getTableTemplate().resetMetaData();
366 }
367
368 protected String[] parseKeys(String[] tokens, int startIndex) {
369 if (context.getTableTemplate().getKeyNames() == null) {
370 throw new RuntimeException("Key names were not specified for table "
371 + context.getTableTemplate().getTableName());
372 }
373 int keyLength = context.getTableTemplate().getKeyNames().length;
374 return parseValues("key", tokens, startIndex, startIndex + keyLength);
375 }
376
377 protected String[] parseColumns(String[] tokens, int startIndex) {
378 if (context.getTableTemplate().getColumnNames() == null) {
379 throw new RuntimeException("Column names were not specified for table "
380 + context.getTableTemplate().getTableName());
381 }
382 int columnLength = context.getTableTemplate().getColumnNames().length;
383 return parseValues("column", tokens, startIndex, startIndex + columnLength);
384 }
385
386 protected String[] parseValues(String name, String[] tokens, int startIndex, int endIndex) {
387 if (tokens.length < endIndex) {
388 throw new RuntimeException("Expected to have " + (endIndex - startIndex) + " " + name + " values for "
389 + context.getTableTemplate().getTableName() + ": " + ArrayUtils.toString(tokens));
390 }
391 return (String[]) ArrayUtils.subarray(tokens, startIndex, endIndex);
392 }
393
394 public IDataLoader clone() {
395 CsvLoader dataLoader = new CsvLoader();
396 dataLoader.setJdbcTemplate(jdbcTemplate);
397 dataLoader.setDbDialect(dbDialect);
398 dataLoader.setParameterService(parameterService);
399 dataLoader.setConfigurationService(configurationService);
400 dataLoader.setNodeService(nodeService);
401 return dataLoader;
402 }
403
404 public void close() {
405 if (csvReader != null) {
406 csvReader.close();
407 }
408 }
409
410 public IDataLoaderContext getContext() {
411 return context;
412 }
413
414 public IDataLoaderStatistics getStatistics() {
415 return stats;
416 }
417
418 public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
419 this.jdbcTemplate = jdbcTemplate;
420 }
421
422 public void setDbDialect(IDbDialect dbDialect) {
423 this.dbDialect = dbDialect;
424 }
425
426 public void setParameterService(IParameterService parameterService) {
427 this.parameterService = parameterService;
428 }
429
430 public void setConfigurationService(IConfigurationService configurationService) {
431 this.configurationService = configurationService;
432 }
433
434 public void setNodeService(INodeService nodeService) {
435 this.nodeService = nodeService;
436 }
437
438 }