1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.jumpmind.symmetric.load.csv;
23
24 import java.io.BufferedReader;
25 import java.io.IOException;
26 import java.util.List;
27 import java.util.Map;
28
29 import org.apache.commons.lang.ArrayUtils;
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.jumpmind.symmetric.common.ParameterConstants;
33 import org.jumpmind.symmetric.common.csv.CsvConstants;
34 import org.jumpmind.symmetric.db.BinaryEncoding;
35 import org.jumpmind.symmetric.db.IDbDialect;
36 import org.jumpmind.symmetric.load.DataLoaderContext;
37 import org.jumpmind.symmetric.load.DataLoaderStatistics;
38 import org.jumpmind.symmetric.load.IColumnFilter;
39 import org.jumpmind.symmetric.load.IDataLoader;
40 import org.jumpmind.symmetric.load.IDataLoaderContext;
41 import org.jumpmind.symmetric.load.IDataLoaderFilter;
42 import org.jumpmind.symmetric.load.IDataLoaderStatistics;
43 import org.jumpmind.symmetric.load.TableTemplate;
44 import org.jumpmind.symmetric.model.Node;
45 import org.jumpmind.symmetric.model.Trigger;
46 import org.jumpmind.symmetric.service.IConfigurationService;
47 import org.jumpmind.symmetric.service.INodeService;
48 import org.jumpmind.symmetric.service.IParameterService;
49 import org.springframework.dao.DataIntegrityViolationException;
50 import org.springframework.jdbc.core.JdbcTemplate;
51
52 import com.csvreader.CsvReader;
53
54 public class CsvLoader implements IDataLoader {
55
56 static final Log logger = LogFactory.getLog(CsvLoader.class);
57
58 protected JdbcTemplate jdbcTemplate;
59
60 protected IDbDialect dbDialect;
61
62 protected IParameterService parameterService;
63
64 protected IConfigurationService configurationService;
65
66 protected INodeService nodeService;
67
68 protected CsvReader csvReader;
69
70 protected DataLoaderContext context;
71
72 protected DataLoaderStatistics stats;
73
74 protected List<IDataLoaderFilter> filters;
75
76 protected Map<String, IColumnFilter> columnFilters;
77
78 public void open(BufferedReader reader) throws IOException {
79 csvReader = new CsvReader(reader);
80 csvReader.setEscapeMode(CsvReader.ESCAPE_MODE_BACKSLASH);
81 csvReader.setSafetySwitch(false);
82 context = new DataLoaderContext();
83 stats = new DataLoaderStatistics();
84 }
85
86 public void open(BufferedReader reader, List<IDataLoaderFilter> filters, Map<String, IColumnFilter> columnFilters)
87 throws IOException {
88 open(reader);
89 this.filters = filters;
90 this.columnFilters = columnFilters;
91 }
92
93 public boolean hasNext() throws IOException {
94 while (csvReader.readRecord()) {
95 String[] tokens = csvReader.getValues();
96
97 if (tokens[0].equals(CsvConstants.BATCH)) {
98 context.setBatchId(new Long(tokens[1]));
99 stats = new DataLoaderStatistics();
100 prepareTableForDataLoad();
101 return true;
102 } else if (tokens[0].equals(CsvConstants.NODEID)) {
103 context.setNodeId(tokens[1]);
104 } else if (tokens[0].equals(CsvConstants.VERSION)) {
105 context.setVersion(tokens[1] + "." + tokens[2] + "." + tokens[3]);
106 } else if (isMetaTokenParsed(tokens)) {
107 continue;
108 } else {
109 throw new RuntimeException("Unexpected token '" + tokens[0] + "' while parsing for next batch");
110 }
111 }
112 return false;
113 }
114
115 public void skip() throws IOException {
116 context.setSkipping(true);
117 load();
118
119 }
120
121 public void load() throws IOException {
122 BinaryEncoding encoding = BinaryEncoding.NONE;
123 while (csvReader.readRecord()) {
124 String[] tokens = csvReader.getValues();
125 stats.incrementLineCount();
126 if (tokens != null && tokens.length > 0 && tokens[0] != null) {
127 stats.incrementByteCount(csvReader.getRawRecord().length());
128
129 if (tokens[0].equals(CsvConstants.INSERT)) {
130 if (!context.getTableTemplate().isIgnoreThisTable() && !context.isSkipping()) {
131 insert(tokens, encoding);
132 }
133 } else if (tokens[0].equals(CsvConstants.UPDATE)) {
134 if (!context.getTableTemplate().isIgnoreThisTable() && !context.isSkipping()) {
135 update(tokens, encoding);
136 }
137 } else if (tokens[0].equals(CsvConstants.DELETE)) {
138 if (!context.getTableTemplate().isIgnoreThisTable() && !context.isSkipping()) {
139 delete(tokens);
140 }
141 } else if (isMetaTokenParsed(tokens)) {
142 continue;
143 } else if (tokens[0].equals(CsvConstants.COMMIT)) {
144 cleanupAfterDataLoad();
145 break;
146 } else if (tokens[0].equals(CsvConstants.SQL)) {
147 if (!context.getTableTemplate().isIgnoreThisTable() && !context.isSkipping()) {
148 runSql(tokens[1]);
149 }
150 } else if (tokens[0].equals(CsvConstants.CREATE)) {
151 if (!context.isSkipping()) {
152 runDdl(tokens[1]);
153 }
154 } else if (tokens[0].equals(CsvConstants.BINARY)) {
155 try {
156 encoding = BinaryEncoding.valueOf(tokens[1]);
157 } catch (Exception ex) {
158 logger.warn("Unsupported binary encoding value of " + tokens[1]);
159 }
160 } else {
161 throw new RuntimeException("Unexpected token '" + tokens[0] + "' on line " + stats.getLineCount()
162 + " of batch " + context.getBatchId());
163 }
164 }
165 }
166 }
167
168 protected boolean isMetaTokenParsed(String[] tokens) {
169 boolean isMetaTokenParsed = true;
170 if (tokens[0].equals(CsvConstants.TABLE)) {
171 setTable(tokens[1]);
172 } else if (tokens[0].equals(CsvConstants.KEYS)) {
173 context.setKeyNames((String[]) ArrayUtils.subarray(tokens, 1, tokens.length));
174 } else if (tokens[0].equals(CsvConstants.COLUMNS)) {
175 context.setColumnNames((String[]) ArrayUtils.subarray(tokens, 1, tokens.length));
176 } else {
177 isMetaTokenParsed = false;
178 }
179 return isMetaTokenParsed;
180 }
181
182 protected void setTable(String tableName) {
183 cleanupAfterDataLoad();
184 context.setTableName(tableName);
185
186 if (context.getTableTemplate() == null) {
187 String fullTableName = tableName;
188
189 if (parameterService.is(ParameterConstants.DATA_LOADER_LOOKUP_TARGET_SCHEMA)) {
190 Node sourceNode = nodeService.findNode(context.getNodeId());
191 if (sourceNode != null) {
192 Trigger trigger = configurationService.getTriggerFor(tableName, sourceNode.getNodeGroupId());
193 if (trigger != null && trigger.getTargetSchemaName() != null) {
194 fullTableName = trigger.getTargetSchemaName() + "." + tableName;
195 }
196 }
197 }
198
199 context.setTableTemplate(new TableTemplate(jdbcTemplate, dbDialect, fullTableName,
200 this.columnFilters != null ? this.columnFilters.get(tableName) : null, parameterService
201 .is(ParameterConstants.DATA_LOADER_NO_KEYS_IN_UPDATE)));
202 }
203
204 prepareTableForDataLoad();
205 }
206
207 protected void prepareTableForDataLoad() {
208 if (context != null && context.getTableTemplate() != null) {
209 dbDialect.prepareTableForDataLoad(context.getTableTemplate().getTable());
210 }
211 }
212
213 protected void cleanupAfterDataLoad() {
214 if (context != null && context.getTableTemplate() != null) {
215 dbDialect.cleanupAfterDataLoad(context.getTableTemplate().getTable());
216 }
217 }
218
219 protected int insert(String[] tokens, BinaryEncoding encoding) {
220 stats.incrementStatementCount();
221 String[] columnValues = parseColumns(tokens, 1);
222 int rows = 0;
223
224 boolean continueToLoad = true;
225 if (filters != null) {
226 stats.startTimer();
227 for (IDataLoaderFilter filter : filters) {
228 continueToLoad &= filter.filterInsert(context, columnValues);
229 }
230 stats.incrementFilterMillis(stats.endTimer());
231 }
232
233 if (continueToLoad) {
234 boolean enableFallbackUpdate = parameterService.is(ParameterConstants.DATA_LOADER_ENABLE_FALLBACK_UPDATE);
235 Object savepoint = null;
236 try {
237 stats.startTimer();
238 if (enableFallbackUpdate) {
239 savepoint = dbDialect.createSavepointForFallback();
240 }
241 rows = context.getTableTemplate().insert(context, columnValues, encoding);
242 dbDialect.releaseSavepoint(savepoint);
243 } catch (DataIntegrityViolationException e) {
244
245
246 if (enableFallbackUpdate) {
247 dbDialect.rollbackToSavepoint(savepoint);
248 if (logger.isDebugEnabled()) {
249 logger.debug("Unable to insert into " + context.getTableName() + ", updating instead: "
250 + ArrayUtils.toString(tokens));
251 }
252 String keyValues[] = parseKeys(tokens, 1);
253 stats.incrementFallbackUpdateCount();
254 rows = context.getTableTemplate().update(context, columnValues, keyValues, encoding);
255 if (rows == 0) {
256 throw new RuntimeException("Unable to update " + context.getTableName() + ": "
257 + ArrayUtils.toString(tokens), e);
258 }
259 } else {
260
261 throw e;
262 }
263 } finally {
264 stats.incrementDatabaseMillis(stats.endTimer());
265 }
266 }
267 return rows;
268 }
269
270 protected int update(String[] tokens, BinaryEncoding encoding) {
271 stats.incrementStatementCount();
272 String columnValues[] = parseColumns(tokens, 1);
273 String keyValues[] = parseKeys(tokens, 1 + columnValues.length);
274 int rows = 0;
275 boolean continueToLoad = true;
276 if (filters != null) {
277 stats.startTimer();
278 for (IDataLoaderFilter filter : filters) {
279 continueToLoad &= filter.filterUpdate(context, columnValues, keyValues);
280 }
281 stats.incrementFilterMillis(stats.endTimer());
282 }
283
284 if (continueToLoad) {
285 boolean enableFallbackInsert = parameterService.is(ParameterConstants.DATA_LOADER_ENABLE_FALLBACK_INSERT);
286 stats.startTimer();
287 rows = context.getTableTemplate().update(context, columnValues, keyValues, encoding);
288 if (rows == 0) {
289 if (enableFallbackInsert) {
290 if (logger.isDebugEnabled()) {
291 logger.debug("Unable to update " + context.getTableName() + ", inserting instead: "
292 + ArrayUtils.toString(tokens));
293 }
294 stats.incrementFallbackInsertCount();
295 rows = context.getTableTemplate().insert(context, columnValues, encoding);
296 } else {
297
298 stats.incrementDatabaseMillis(stats.endTimer());
299 throw new RuntimeException("Unable to update " + context.getTableName() + ": "
300 + ArrayUtils.toString(tokens));
301 }
302 } else if (rows > 1) {
303 logger.warn("Too many rows (" + rows + ") updated for " + context.getTableName() + ": "
304 + ArrayUtils.toString(tokens));
305 }
306 stats.incrementDatabaseMillis(stats.endTimer());
307 }
308 return rows;
309 }
310
311 protected int delete(String[] tokens) {
312 stats.incrementStatementCount();
313 String keyValues[] = parseKeys(tokens, 1);
314 int rows = 0;
315 boolean continueToLoad = true;
316
317 if (filters != null) {
318 stats.startTimer();
319 for (IDataLoaderFilter filter : filters) {
320 continueToLoad &= filter.filterDelete(context, keyValues);
321 }
322 stats.incrementFilterMillis(stats.endTimer());
323 }
324
325 if (continueToLoad) {
326 boolean allowMissingDelete = parameterService.is(ParameterConstants.DATA_LOADER_ALLOW_MISSING_DELETE);
327 stats.startTimer();
328 rows = context.getTableTemplate().delete(context, keyValues);
329 stats.incrementDatabaseMillis(stats.endTimer());
330 if (rows == 0) {
331 if (allowMissingDelete) {
332 logger.warn("Delete of " + context.getTableName() + " affected no rows: "
333 + ArrayUtils.toString(tokens));
334 stats.incrementMissingDeleteCount();
335 } else {
336 throw new RuntimeException("Delete of " + context.getTableName() + " affected no rows: "
337 + ArrayUtils.toString(tokens));
338 }
339 }
340 }
341 return rows;
342 }
343
344 protected void runSql(String sql) {
345 stats.incrementStatementCount();
346 if (logger.isDebugEnabled()) {
347 logger.debug("Running SQL: " + sql);
348 }
349 jdbcTemplate.execute(sql);
350 }
351
352 protected void runDdl(String xml) {
353 stats.incrementStatementCount();
354 if (logger.isDebugEnabled()) {
355 logger.debug("Running DDL: " + xml);
356 }
357 dbDialect.createTables(xml);
358 context.getTableTemplate().resetMetaData();
359 }
360
361 protected String[] parseKeys(String[] tokens, int startIndex) {
362 if (context.getTableTemplate().getKeyNames() == null) {
363 throw new RuntimeException("Key names were not specified for table "
364 + context.getTableTemplate().getTableName());
365 }
366 int keyLength = context.getTableTemplate().getKeyNames().length;
367 return parseValues("key", tokens, startIndex, startIndex + keyLength);
368 }
369
370 protected String[] parseColumns(String[] tokens, int startIndex) {
371 if (context.getTableTemplate().getColumnNames() == null) {
372 throw new RuntimeException("Column names were not specified for table "
373 + context.getTableTemplate().getTableName());
374 }
375 int columnLength = context.getTableTemplate().getColumnNames().length;
376 return parseValues("column", tokens, startIndex, startIndex + columnLength);
377 }
378
379 protected String[] parseValues(String name, String[] tokens, int startIndex, int endIndex) {
380 if (tokens.length < endIndex) {
381 throw new RuntimeException("Expected to have " + (endIndex - startIndex) + " " + name + " values for "
382 + context.getTableTemplate().getTableName() + ": " + ArrayUtils.toString(tokens));
383 }
384 return (String[]) ArrayUtils.subarray(tokens, startIndex, endIndex);
385 }
386
387 public IDataLoader clone() {
388 CsvLoader dataLoader = new CsvLoader();
389 dataLoader.setJdbcTemplate(jdbcTemplate);
390 dataLoader.setDbDialect(dbDialect);
391 dataLoader.setParameterService(parameterService);
392 dataLoader.setConfigurationService(configurationService);
393 dataLoader.setNodeService(nodeService);
394 return dataLoader;
395 }
396
397 public void close() {
398
399 if (csvReader != null) {
400 csvReader.close();
401 }
402
403 }
404
405 public IDataLoaderContext getContext() {
406 return context;
407 }
408
409 public IDataLoaderStatistics getStatistics() {
410 return stats;
411 }
412
413 public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
414 this.jdbcTemplate = jdbcTemplate;
415 }
416
417 public void setDbDialect(IDbDialect dbDialect) {
418 this.dbDialect = dbDialect;
419 }
420
421 public void setParameterService(IParameterService parameterService) {
422 this.parameterService = parameterService;
423 }
424
425 public void setConfigurationService(IConfigurationService configurationService) {
426 this.configurationService = configurationService;
427 }
428
429 public void setNodeService(INodeService nodeService) {
430 this.nodeService = nodeService;
431 }
432
433 }