1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.jumpmind.symmetric.service.impl;
23
24 import java.io.BufferedWriter;
25 import java.io.IOException;
26 import java.sql.Connection;
27 import java.sql.PreparedStatement;
28 import java.sql.ResultSet;
29 import java.sql.SQLException;
30 import java.util.ArrayList;
31 import java.util.Date;
32 import java.util.List;
33
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.jumpmind.symmetric.Version;
37 import org.jumpmind.symmetric.common.Constants;
38 import org.jumpmind.symmetric.db.IDbDialect;
39 import org.jumpmind.symmetric.extract.DataExtractorContext;
40 import org.jumpmind.symmetric.extract.IDataExtractor;
41 import org.jumpmind.symmetric.extract.IExtractorFilter;
42 import org.jumpmind.symmetric.model.BatchType;
43 import org.jumpmind.symmetric.model.Data;
44 import org.jumpmind.symmetric.model.DataEventType;
45 import org.jumpmind.symmetric.model.Node;
46 import org.jumpmind.symmetric.model.NodeChannel;
47 import org.jumpmind.symmetric.model.OutgoingBatch;
48 import org.jumpmind.symmetric.model.OutgoingBatchHistory;
49 import org.jumpmind.symmetric.model.Trigger;
50 import org.jumpmind.symmetric.model.TriggerHistory;
51 import org.jumpmind.symmetric.model.OutgoingBatch.Status;
52 import org.jumpmind.symmetric.service.IConfigurationService;
53 import org.jumpmind.symmetric.service.IDataExtractorService;
54 import org.jumpmind.symmetric.service.IExtractListener;
55 import org.jumpmind.symmetric.service.IOutgoingBatchService;
56 import org.jumpmind.symmetric.transport.IOutgoingTransport;
57 import org.springframework.beans.BeansException;
58 import org.springframework.beans.factory.BeanFactory;
59 import org.springframework.beans.factory.BeanFactoryAware;
60 import org.springframework.dao.DataAccessException;
61 import org.springframework.jdbc.core.ConnectionCallback;
62 import org.springframework.jdbc.support.JdbcUtils;
63
64 public class DataExtractorService extends AbstractService implements IDataExtractorService, BeanFactoryAware {
65
66 protected static final Log logger = LogFactory.getLog(DataExtractorService.class);
67
68 private IOutgoingBatchService outgoingBatchService;
69
70 private IConfigurationService configurationService;
71
72 private IDbDialect dbDialect;
73
74 private BeanFactory beanFactory;
75
76 private DataExtractorContext context;
77
78 private List<IExtractorFilter> extractorFilters;
79
80 private String tablePrefix;
81
82 public OutgoingBatch extractNodeIdentityFor(Node node, IOutgoingTransport transport) {
83 String tableName = tablePrefix + "_node_identity";
84 OutgoingBatch batch = new OutgoingBatch(node, Constants.CHANNEL_CONFIG, BatchType.INITIAL_LOAD);
85 outgoingBatchService.insertOutgoingBatch(batch);
86 try {
87 BufferedWriter writer = transport.open();
88 IDataExtractor dataExtractor = getDataExtractor(node.getSymmetricVersion());
89 DataExtractorContext ctxCopy = context.copy(dataExtractor);
90 dataExtractor.init(writer, ctxCopy);
91 dataExtractor.begin(batch, writer);
92 TriggerHistory audit = new TriggerHistory(tableName, "node_id", "node_id");
93 Data data = new Data(1, null, node.getNodeId(), DataEventType.INSERT, tableName, null, audit);
94 dataExtractor.write(writer, data, ctxCopy);
95 dataExtractor.commit(batch, writer);
96 return batch;
97 } catch (IOException e) {
98 throw new RuntimeException(e);
99 }
100
101 }
102
103 private IDataExtractor getDataExtractor(String version) {
104 String beanName = Constants.DATA_EXTRACTOR;
105 if (version != null) {
106 int[] versions = Version.parseVersion(version);
107
108 if (versions[0] == 1 && versions[1] <= 1) {
109 beanName += "10";
110 } else if (versions[0] == 1 && versions[1] <= 3) {
111 beanName += "13";
112 }
113 }
114 return (IDataExtractor) beanFactory.getBean(beanName);
115 }
116
117 public OutgoingBatch extractInitialLoadFor(Node node, final Trigger trigger, final IOutgoingTransport transport) {
118
119 OutgoingBatch batch = new OutgoingBatch(node, trigger.getChannelId(), BatchType.INITIAL_LOAD);
120 outgoingBatchService.insertOutgoingBatch(batch);
121 OutgoingBatchHistory history = new OutgoingBatchHistory(batch);
122 writeInitialLoad(node, trigger, transport, batch, null);
123 history.setStatus(OutgoingBatchHistory.Status.SE);
124 history.setEndTime(new Date());
125 outgoingBatchService.insertOutgoingBatchHistory(history);
126 return batch;
127 }
128
129 public void extractInitialLoadWithinBatchFor(Node node, final Trigger trigger, final IOutgoingTransport transport,
130 DataExtractorContext ctx) {
131 writeInitialLoad(node, trigger, transport, null, ctx);
132 }
133
134 protected void writeInitialLoad(Node node, final Trigger trigger, final IOutgoingTransport transport,
135 final OutgoingBatch batch, final DataExtractorContext ctx) {
136
137 final String sql = dbDialect.createInitalLoadSqlFor(node, trigger);
138 final TriggerHistory audit = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId());
139 final IDataExtractor dataExtractor = ctx != null ? ctx.getDataExtractor() : getDataExtractor(node
140 .getSymmetricVersion());
141
142 jdbcTemplate.execute(new ConnectionCallback() {
143 public Object doInConnection(Connection conn) throws SQLException, DataAccessException {
144 try {
145 PreparedStatement st = null;
146 ResultSet rs = null;
147 try {
148 st = conn.prepareStatement(sql, java.sql.ResultSet.TYPE_FORWARD_ONLY,
149 java.sql.ResultSet.CONCUR_READ_ONLY);
150 st.setFetchSize(dbDialect.getStreamingResultsFetchSize());
151 rs = st.executeQuery();
152 final BufferedWriter writer = transport.open();
153 final DataExtractorContext ctxCopy = ctx == null ? context.copy(dataExtractor) : ctx;
154 if (batch != null) {
155 dataExtractor.init(writer, ctxCopy);
156 dataExtractor.begin(batch, writer);
157 }
158 while (rs.next()) {
159 dataExtractor.write(writer, new Data(0, null, rs.getString(1), DataEventType.INSERT, audit
160 .getSourceTableName(), null, audit), ctxCopy);
161 }
162 if (batch != null) {
163 dataExtractor.commit(batch, writer);
164 }
165 } finally {
166 JdbcUtils.closeResultSet(rs);
167 JdbcUtils.closeStatement(st);
168 }
169 return null;
170 } catch (Exception e) {
171 throw new RuntimeException("Error during SQL: " + sql, e);
172 }
173 }
174 });
175 }
176
177 public boolean extract(Node node, IOutgoingTransport transport) throws Exception {
178 IDataExtractor dataExtractor = getDataExtractor(node.getSymmetricVersion());
179 ExtractStreamHandler handler = new ExtractStreamHandler(dataExtractor, transport);
180 return extract(node, handler);
181 }
182
183 /***
184 * Allow a handler callback to do the work so we can route the extracted
185 * data to other types of handlers for processing.
186 */
187 public boolean extract(Node node, final IExtractListener handler) throws Exception {
188
189 List<NodeChannel> channels = configurationService.getChannels();
190
191 for (NodeChannel nodeChannel : channels) {
192 outgoingBatchService.buildOutgoingBatches(node.getNodeId(), nodeChannel);
193 }
194
195 List<OutgoingBatch> batches = outgoingBatchService.getOutgoingBatches(node.getNodeId());
196
197 if (batches != null && batches.size() > 0) {
198 OutgoingBatchHistory history = null;
199 try {
200 handler.init();
201 for (final OutgoingBatch batch : batches) {
202 history = new OutgoingBatchHistory(batch);
203 handler.startBatch(batch);
204 selectEventDataToExtract(handler, batch);
205 handler.endBatch(batch);
206 history.setStatus(OutgoingBatchHistory.Status.SE);
207 history.setEndTime(new Date());
208 outgoingBatchService.insertOutgoingBatchHistory(history);
209 }
210 } catch (RuntimeException e) {
211 SQLException se = unwrapSqlException(e);
212 if (history != null) {
213 if (se != null) {
214 history.setSqlState(se.getSQLState());
215 history.setSqlCode(se.getErrorCode());
216 history.setSqlMessage(se.getMessage());
217 } else {
218 history.setSqlMessage(e.getMessage());
219 }
220 history.setStatus(OutgoingBatchHistory.Status.SE);
221 history.setEndTime(new Date());
222 outgoingBatchService.setBatchStatus(history.getBatchId(), Status.ER);
223 outgoingBatchService.insertOutgoingBatchHistory(history);
224 } else {
225 logger.error(
226 "Could not log the outgoing batch status because the batch history has not been created.",
227 e);
228 }
229 throw e;
230 } finally {
231 handler.done();
232 }
233 return true;
234 }
235 return false;
236 }
237
238 public boolean extractBatchRange(IOutgoingTransport transport, String startBatchId, String endBatchId)
239 throws Exception {
240 IDataExtractor dataExtractor = getDataExtractor(null);
241 ExtractStreamHandler handler = new ExtractStreamHandler(dataExtractor, transport);
242 return extractBatchRange(handler, startBatchId, endBatchId);
243 }
244
245 public boolean extractBatchRange(final IExtractListener handler, String startBatchId, String endBatchId)
246 throws Exception {
247
248 List<OutgoingBatch> batches = outgoingBatchService.getOutgoingBatchRange(startBatchId, endBatchId);
249
250 if (batches != null && batches.size() > 0) {
251 try {
252 handler.init();
253 for (final OutgoingBatch batch : batches) {
254 handler.startBatch(batch);
255 selectEventDataToExtract(handler, batch);
256 handler.endBatch(batch);
257 }
258 } finally {
259 handler.done();
260 }
261 return true;
262 }
263 return false;
264 }
265
266 private void selectEventDataToExtract(final IExtractListener handler, final OutgoingBatch batch) {
267 jdbcTemplate.execute(new ConnectionCallback() {
268 public Object doInConnection(Connection conn) throws SQLException, DataAccessException {
269 PreparedStatement ps = conn.prepareStatement(getSql("selectEventDataToExtractSql"),
270 ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
271 ps.setFetchSize(dbDialect.getStreamingResultsFetchSize());
272 ps.setString(1, batch.getNodeId());
273 ps.setLong(2, batch.getBatchId());
274 ResultSet rs = ps.executeQuery();
275 try {
276 while (rs.next()) {
277 try {
278 handler.dataExtracted(next(rs));
279 } catch (RuntimeException e) {
280 throw e;
281 } catch (Exception e) {
282 throw new RuntimeException(e);
283 }
284 }
285 } finally {
286 JdbcUtils.closeResultSet(rs);
287 JdbcUtils.closeStatement(ps);
288 }
289 return null;
290 }
291 });
292 }
293
294 private Data next(ResultSet results) throws SQLException {
295 long dataId = results.getLong(1);
296 String tableName = results.getString(2);
297 DataEventType eventType = DataEventType.getEventType(results.getString(3));
298 String rowData = results.getString(4);
299 String pk = results.getString(5);
300 Date created = results.getDate(7);
301 TriggerHistory audit = configurationService.getHistoryRecordFor(results.getInt(8));
302 return new Data(dataId, pk, rowData, eventType, tableName, created, audit);
303 }
304
305 public void setOutgoingBatchService(IOutgoingBatchService batchBuilderService) {
306 this.outgoingBatchService = batchBuilderService;
307 }
308
309 public void setContext(DataExtractorContext context) {
310 this.context = context;
311 }
312
313 public void setDbDialect(IDbDialect dialect) {
314 this.dbDialect = dialect;
315 }
316
317 public void setConfigurationService(IConfigurationService configurationService) {
318 this.configurationService = configurationService;
319 }
320
321 class ExtractStreamHandler implements IExtractListener {
322
323 IOutgoingTransport transport;
324
325 IDataExtractor dataExtractor;
326
327 DataExtractorContext context;
328
329 BufferedWriter writer;
330
331 ExtractStreamHandler(IDataExtractor dataExtractor, IOutgoingTransport transport) throws Exception {
332 this.transport = transport;
333 this.dataExtractor = dataExtractor;
334 }
335
336 public void dataExtracted(Data data) throws Exception {
337 if (extractorFilters != null) {
338 for (IExtractorFilter filter : extractorFilters) {
339 if (!filter.filterData(data, context)) {
340
341 return;
342 }
343 }
344 }
345 dataExtractor.write(writer, data, context);
346 }
347
348 public void done() throws IOException {
349 }
350
351 public void endBatch(OutgoingBatch batch) throws Exception {
352 dataExtractor.commit(batch, writer);
353 }
354
355 public void init() throws Exception {
356 this.writer = transport.open();
357 this.context = DataExtractorService.this.context.copy(dataExtractor);
358 dataExtractor.init(writer, context);
359 }
360
361 public void startBatch(OutgoingBatch batch) throws Exception {
362 context.setBatch(batch);
363 dataExtractor.begin(batch, writer);
364 }
365
366 }
367
368 public void setTablePrefix(String tablePrefix) {
369 this.tablePrefix = tablePrefix;
370 }
371
372 public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
373 this.beanFactory = beanFactory;
374 }
375
376 public void addExtractorFilter(IExtractorFilter extractorFilter) {
377 if (this.extractorFilters == null) {
378 this.extractorFilters = new ArrayList<IExtractorFilter>();
379 }
380 this.extractorFilters.add(extractorFilter);
381 }
382
383 public void setExtractorFilters(List<IExtractorFilter> extractorFilters) {
384 this.extractorFilters = extractorFilters;
385 }
386
387 }