View Javadoc

1   /*
2    * SymmetricDS is an open source database synchronization solution.
3    *   
4    * Copyright (C) Chris Henson <chenson42@users.sourceforge.net>,
5    *               Eric Long <erilong@users.sourceforge.net>
6    *
7    * This library is free software; you can redistribute it and/or
8    * modify it under the terms of the GNU Lesser General Public
9    * License as published by the Free Software Foundation; either
10   * version 3 of the License, or (at your option) any later version.
11   *
12   * This library is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this library; if not, see
19   * <http://www.gnu.org/licenses/>.
20   */
21  
22  package org.jumpmind.symmetric.service.impl;
23  
24  import java.io.BufferedWriter;
25  import java.io.IOException;
26  import java.sql.Connection;
27  import java.sql.PreparedStatement;
28  import java.sql.ResultSet;
29  import java.sql.SQLException;
30  import java.util.ArrayList;
31  import java.util.Date;
32  import java.util.List;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.jumpmind.symmetric.Version;
37  import org.jumpmind.symmetric.common.Constants;
38  import org.jumpmind.symmetric.db.IDbDialect;
39  import org.jumpmind.symmetric.extract.DataExtractorContext;
40  import org.jumpmind.symmetric.extract.IDataExtractor;
41  import org.jumpmind.symmetric.extract.IExtractorFilter;
42  import org.jumpmind.symmetric.model.BatchType;
43  import org.jumpmind.symmetric.model.Data;
44  import org.jumpmind.symmetric.model.DataEventType;
45  import org.jumpmind.symmetric.model.Node;
46  import org.jumpmind.symmetric.model.NodeChannel;
47  import org.jumpmind.symmetric.model.OutgoingBatch;
48  import org.jumpmind.symmetric.model.OutgoingBatchHistory;
49  import org.jumpmind.symmetric.model.Trigger;
50  import org.jumpmind.symmetric.model.TriggerHistory;
51  import org.jumpmind.symmetric.model.OutgoingBatch.Status;
52  import org.jumpmind.symmetric.service.IConfigurationService;
53  import org.jumpmind.symmetric.service.IDataExtractorService;
54  import org.jumpmind.symmetric.service.IExtractListener;
55  import org.jumpmind.symmetric.service.IOutgoingBatchService;
56  import org.jumpmind.symmetric.transport.IOutgoingTransport;
57  import org.springframework.beans.BeansException;
58  import org.springframework.beans.factory.BeanFactory;
59  import org.springframework.beans.factory.BeanFactoryAware;
60  import org.springframework.dao.DataAccessException;
61  import org.springframework.jdbc.core.ConnectionCallback;
62  import org.springframework.jdbc.support.JdbcUtils;
63  
64  public class DataExtractorService extends AbstractService implements IDataExtractorService, BeanFactoryAware {
65  
66      protected static final Log logger = LogFactory.getLog(DataExtractorService.class);
67  
68      private IOutgoingBatchService outgoingBatchService;
69  
70      private IConfigurationService configurationService;
71  
72      private IDbDialect dbDialect;
73  
74      private BeanFactory beanFactory;
75  
76      private DataExtractorContext context;
77  
78      private List<IExtractorFilter> extractorFilters;
79  
80      private String tablePrefix;
81  
82      public OutgoingBatch extractNodeIdentityFor(Node node, IOutgoingTransport transport) {
83          String tableName = tablePrefix + "_node_identity";
84          OutgoingBatch batch = new OutgoingBatch(node, Constants.CHANNEL_CONFIG, BatchType.INITIAL_LOAD);
85          outgoingBatchService.insertOutgoingBatch(batch);
86          try {
87              BufferedWriter writer = transport.open();
88              IDataExtractor dataExtractor = getDataExtractor(node.getSymmetricVersion());
89              DataExtractorContext ctxCopy = context.copy(dataExtractor);
90              dataExtractor.init(writer, ctxCopy);
91              dataExtractor.begin(batch, writer);
92              TriggerHistory audit = new TriggerHistory(tableName, "node_id", "node_id");
93              Data data = new Data(1, null, node.getNodeId(), DataEventType.INSERT, tableName, null, audit);
94              dataExtractor.write(writer, data, ctxCopy);
95              dataExtractor.commit(batch, writer);
96              return batch;
97          } catch (IOException e) {
98              throw new RuntimeException(e);
99          }
100 
101     }
102 
103     private IDataExtractor getDataExtractor(String version) {
104         String beanName = Constants.DATA_EXTRACTOR;
105         if (version != null) {
106             int[] versions = Version.parseVersion(version);
107             // TODO: this should be versions[1] == 0 for 1.2 release
108             if (versions[0] == 1 && versions[1] <= 1) {
109                 beanName += "10";
110             } else if (versions[0] == 1 && versions[1] <= 3) {
111                 beanName += "13";
112             }
113         }
114         return (IDataExtractor) beanFactory.getBean(beanName);
115     }
116 
117     public OutgoingBatch extractInitialLoadFor(Node node, final Trigger trigger, final IOutgoingTransport transport) {
118 
119         OutgoingBatch batch = new OutgoingBatch(node, trigger.getChannelId(), BatchType.INITIAL_LOAD);
120         outgoingBatchService.insertOutgoingBatch(batch);
121         OutgoingBatchHistory history = new OutgoingBatchHistory(batch);
122         writeInitialLoad(node, trigger, transport, batch, null);
123         history.setStatus(OutgoingBatchHistory.Status.SE);
124         history.setEndTime(new Date());
125         outgoingBatchService.insertOutgoingBatchHistory(history);
126         return batch;
127     }
128 
129     public void extractInitialLoadWithinBatchFor(Node node, final Trigger trigger, final IOutgoingTransport transport,
130             DataExtractorContext ctx) {
131         writeInitialLoad(node, trigger, transport, null, ctx);
132     }
133 
134     protected void writeInitialLoad(Node node, final Trigger trigger, final IOutgoingTransport transport,
135             final OutgoingBatch batch, final DataExtractorContext ctx) {
136 
137         final String sql = dbDialect.createInitalLoadSqlFor(node, trigger);
138         final TriggerHistory audit = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId());
139         final IDataExtractor dataExtractor = ctx != null ? ctx.getDataExtractor() : getDataExtractor(node
140                 .getSymmetricVersion());
141 
142         jdbcTemplate.execute(new ConnectionCallback() {
143             public Object doInConnection(Connection conn) throws SQLException, DataAccessException {
144                 try {
145                     PreparedStatement st = null;
146                     ResultSet rs = null;
147                     try {
148                         st = conn.prepareStatement(sql, java.sql.ResultSet.TYPE_FORWARD_ONLY,
149                                 java.sql.ResultSet.CONCUR_READ_ONLY);
150                         st.setFetchSize(dbDialect.getStreamingResultsFetchSize());
151                         rs = st.executeQuery();
152                         final BufferedWriter writer = transport.open();
153                         final DataExtractorContext ctxCopy = ctx == null ? context.copy(dataExtractor) : ctx;
154                         if (batch != null) {
155                             dataExtractor.init(writer, ctxCopy);
156                             dataExtractor.begin(batch, writer);
157                         }
158                         while (rs.next()) {
159                             dataExtractor.write(writer, new Data(0, null, rs.getString(1), DataEventType.INSERT, audit
160                                     .getSourceTableName(), null, audit), ctxCopy);
161                         }
162                         if (batch != null) {
163                             dataExtractor.commit(batch, writer);
164                         }
165                     } finally {
166                         JdbcUtils.closeResultSet(rs);
167                         JdbcUtils.closeStatement(st);
168                     }
169                     return null;
170                 } catch (Exception e) {
171                     throw new RuntimeException("Error during SQL: " + sql, e);
172                 }
173             }
174         });
175     }
176 
177     public boolean extract(Node node, IOutgoingTransport transport) throws Exception {
178         IDataExtractor dataExtractor = getDataExtractor(node.getSymmetricVersion());
179         ExtractStreamHandler handler = new ExtractStreamHandler(dataExtractor, transport);
180         return extract(node, handler);
181     }
182 
183     /***
184      * Allow a handler callback to do the work so we can route the extracted
185      * data to other types of handlers for processing.
186      */
187     public boolean extract(Node node, final IExtractListener handler) throws Exception {
188 
189         List<NodeChannel> channels = configurationService.getChannels();
190 
191         for (NodeChannel nodeChannel : channels) {
192             outgoingBatchService.buildOutgoingBatches(node.getNodeId(), nodeChannel);
193         }
194 
195         List<OutgoingBatch> batches = outgoingBatchService.getOutgoingBatches(node.getNodeId());
196 
197         if (batches != null && batches.size() > 0) {
198             OutgoingBatchHistory history = null;
199             try {
200                 handler.init();
201                 for (final OutgoingBatch batch : batches) {
202                     history = new OutgoingBatchHistory(batch);
203                     handler.startBatch(batch);
204                     selectEventDataToExtract(handler, batch);
205                     handler.endBatch(batch);
206                     history.setStatus(OutgoingBatchHistory.Status.SE);
207                     history.setEndTime(new Date());
208                     outgoingBatchService.insertOutgoingBatchHistory(history);
209                 }
210             } catch (RuntimeException e) {
211                 SQLException se = unwrapSqlException(e);
212                 if (history != null) {
213                     if (se != null) {
214                         history.setSqlState(se.getSQLState());
215                         history.setSqlCode(se.getErrorCode());
216                         history.setSqlMessage(se.getMessage());
217                     } else {
218                         history.setSqlMessage(e.getMessage());
219                     }
220                     history.setStatus(OutgoingBatchHistory.Status.SE);
221                     history.setEndTime(new Date());
222                     outgoingBatchService.setBatchStatus(history.getBatchId(), Status.ER);
223                     outgoingBatchService.insertOutgoingBatchHistory(history);
224                 } else {
225                     logger.error(
226                             "Could not log the outgoing batch status because the batch history has not been created.",
227                             e);
228                 }
229                 throw e;
230             } finally {
231                 handler.done();
232             }
233             return true;
234         }
235         return false;
236     }
237     
238     public boolean extractBatchRange(IOutgoingTransport transport, String startBatchId, String endBatchId)
239             throws Exception {
240         IDataExtractor dataExtractor = getDataExtractor(null);
241         ExtractStreamHandler handler = new ExtractStreamHandler(dataExtractor, transport);
242         return extractBatchRange(handler, startBatchId, endBatchId);
243     }
244 
245     public boolean extractBatchRange(final IExtractListener handler, String startBatchId, String endBatchId)
246             throws Exception {
247 
248         List<OutgoingBatch> batches = outgoingBatchService.getOutgoingBatchRange(startBatchId, endBatchId);
249 
250         if (batches != null && batches.size() > 0) {
251             try {
252                 handler.init();
253                 for (final OutgoingBatch batch : batches) {
254                     handler.startBatch(batch);
255                     selectEventDataToExtract(handler, batch);
256                     handler.endBatch(batch);
257                 }
258             } finally {
259                 handler.done();
260             }
261             return true;
262         }
263         return false;
264     }
265 
266     private void selectEventDataToExtract(final IExtractListener handler, final OutgoingBatch batch) {
267         jdbcTemplate.execute(new ConnectionCallback() {
268             public Object doInConnection(Connection conn) throws SQLException, DataAccessException {
269                 PreparedStatement ps = conn.prepareStatement(getSql("selectEventDataToExtractSql"),
270                         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
271                 ps.setFetchSize(dbDialect.getStreamingResultsFetchSize());
272                 ps.setString(1, batch.getNodeId());
273                 ps.setLong(2, batch.getBatchId());
274                 ResultSet rs = ps.executeQuery();
275                 try {
276                     while (rs.next()) {
277                         try {
278                             handler.dataExtracted(next(rs));
279                         } catch (RuntimeException e) {
280                             throw e;
281                         } catch (Exception e) {
282                             throw new RuntimeException(e);
283                         }
284                     }
285                 } finally {
286                     JdbcUtils.closeResultSet(rs);
287                     JdbcUtils.closeStatement(ps);
288                 }
289                 return null;
290             }
291         });
292     }
293 
294     private Data next(ResultSet results) throws SQLException {
295         long dataId = results.getLong(1);
296         String tableName = results.getString(2);
297         DataEventType eventType = DataEventType.getEventType(results.getString(3));
298         String rowData = results.getString(4);
299         String pk = results.getString(5);
300         Date created = results.getDate(7);
301         TriggerHistory audit = configurationService.getHistoryRecordFor(results.getInt(8));
302         return new Data(dataId, pk, rowData, eventType, tableName, created, audit);
303     }
304 
305     public void setOutgoingBatchService(IOutgoingBatchService batchBuilderService) {
306         this.outgoingBatchService = batchBuilderService;
307     }
308 
309     public void setContext(DataExtractorContext context) {
310         this.context = context;
311     }
312 
313     public void setDbDialect(IDbDialect dialect) {
314         this.dbDialect = dialect;
315     }
316 
317     public void setConfigurationService(IConfigurationService configurationService) {
318         this.configurationService = configurationService;
319     }
320 
321     class ExtractStreamHandler implements IExtractListener {
322 
323         IOutgoingTransport transport;
324 
325         IDataExtractor dataExtractor;
326 
327         DataExtractorContext context;
328 
329         BufferedWriter writer;
330 
331         ExtractStreamHandler(IDataExtractor dataExtractor, IOutgoingTransport transport) throws Exception {
332             this.transport = transport;
333             this.dataExtractor = dataExtractor;
334         }
335 
336         public void dataExtracted(Data data) throws Exception {
337             if (extractorFilters != null) {
338                 for (IExtractorFilter filter : extractorFilters) {
339                     if (!filter.filterData(data, context)) {
340                         // short circuit the extract if instructed
341                         return;
342                     }
343                 }
344             }
345             dataExtractor.write(writer, data, context);
346         }
347 
348         public void done() throws IOException {
349         }
350 
351         public void endBatch(OutgoingBatch batch) throws Exception {
352             dataExtractor.commit(batch, writer);
353         }
354 
355         public void init() throws Exception {
356             this.writer = transport.open();
357             this.context = DataExtractorService.this.context.copy(dataExtractor);
358             dataExtractor.init(writer, context);
359         }
360 
361         public void startBatch(OutgoingBatch batch) throws Exception {
362             context.setBatch(batch);
363             dataExtractor.begin(batch, writer);
364         }
365 
366     }
367 
368     public void setTablePrefix(String tablePrefix) {
369         this.tablePrefix = tablePrefix;
370     }
371 
372     public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
373         this.beanFactory = beanFactory;
374     }
375 
376     public void addExtractorFilter(IExtractorFilter extractorFilter) {
377         if (this.extractorFilters == null) {
378             this.extractorFilters = new ArrayList<IExtractorFilter>();
379         }
380         this.extractorFilters.add(extractorFilter);
381     }
382 
383     public void setExtractorFilters(List<IExtractorFilter> extractorFilters) {
384         this.extractorFilters = extractorFilters;
385     }
386 
387 }