View Javadoc

1   /*
2    * SymmetricDS is an open source database synchronization solution.
3    *   
4    * Copyright (C) Chris Henson <chenson42@users.sourceforge.net>
5    * Copyright (C) Eric Long <erilong@users.sourceforge.net>
6    *
7    * This library is free software; you can redistribute it and/or
8    * modify it under the terms of the GNU Lesser General Public
9    * License as published by the Free Software Foundation; either
10   * version 3 of the License, or (at your option) any later version.
11   *
12   * This library is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this library; if not, see
19   * <http://www.gnu.org/licenses/>.
20   */
21  
22  package org.jumpmind.symmetric.service.impl;
23  
24  import java.sql.Connection;
25  import java.sql.PreparedStatement;
26  import java.sql.ResultSet;
27  import java.sql.SQLException;
28  import java.sql.Types;
29  import java.util.ArrayList;
30  import java.util.Collections;
31  import java.util.Comparator;
32  import java.util.Date;
33  import java.util.HashSet;
34  import java.util.List;
35  import java.util.Set;
36  
37  import org.apache.commons.lang.StringUtils;
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.jumpmind.symmetric.common.ParameterConstants;
41  import org.jumpmind.symmetric.db.IDbDialect;
42  import org.jumpmind.symmetric.db.SequenceIdentifier;
43  import org.jumpmind.symmetric.model.BatchType;
44  import org.jumpmind.symmetric.model.NodeChannel;
45  import org.jumpmind.symmetric.model.NodeSecurity;
46  import org.jumpmind.symmetric.model.OutgoingBatch;
47  import org.jumpmind.symmetric.model.OutgoingBatchHistory;
48  import org.jumpmind.symmetric.model.OutgoingBatch.Status;
49  import org.jumpmind.symmetric.service.IConfigurationService;
50  import org.jumpmind.symmetric.service.INodeService;
51  import org.jumpmind.symmetric.service.IOutgoingBatchService;
52  import org.jumpmind.symmetric.statistic.IStatisticManager;
53  import org.jumpmind.symmetric.statistic.StatisticName;
54  import org.jumpmind.symmetric.util.MaxRowsStatementCreator;
55  import org.springframework.dao.DataAccessException;
56  import org.springframework.jdbc.core.ConnectionCallback;
57  import org.springframework.jdbc.core.PreparedStatementCallback;
58  import org.springframework.jdbc.core.RowMapper;
59  import org.springframework.jdbc.support.JdbcUtils;
60  import org.springframework.transaction.annotation.Transactional;
61  
62  public class OutgoingBatchService extends AbstractService implements IOutgoingBatchService {
63  
64      final static Log logger = LogFactory.getLog(OutgoingBatchService.class);
65  
66      private INodeService nodeService;
67  
68      private IDbDialect dbDialect;
69  
70      private IStatisticManager statisticManager;
71      
72      private IConfigurationService configurationService;
73  
74      /***
75       * Create a batch and mark events as tied to that batch. We iterate through
76       * all the events so we can find a transaction boundary to stop on. <p/>
77       * This method is currently non-transactional because of the fear of having
78       * to deal with large numbers of events as part of the same batch. This
79       * shouldn't be an issue in most cases other than possibly leaving a batch
80       * row w/out data every now and then or leaving a batch w/out the associated
81       * history row.
82       */
83      @Transactional
84      @Deprecated
85      public void buildOutgoingBatches(final String nodeId, final List<NodeChannel> channels) {
86          for (NodeChannel nodeChannel : channels) {
87              buildOutgoingBatches(nodeId, nodeChannel);
88          }
89      }
90  
91      @Transactional
92      public void buildOutgoingBatches(final String nodeId, final NodeChannel channel) {
93  
94          if (channel.isSuspended()) {
95              logger.warn(channel.getId() + " channel for " + nodeId + " is currently suspended.");
96          } else if (channel.isEnabled()) {
97              long dataEventCount = jdbcTemplate.queryForLong(getSql("selectEventsToBatchCountSql"), new Object[] { 0,
98                      nodeId, channel.getId() });
99  
100             if (dataEventCount > channel.getMaxBatchSize()) {
101                 buildOutgoingBatchesPeekAhead(nodeId, channel);
102             } else if (dataEventCount > 0) {
103                 OutgoingBatch newBatch = new OutgoingBatch();
104                 newBatch.setBatchType(BatchType.EVENTS);
105                 newBatch.setChannelId(channel.getId());
106                 newBatch.setNodeId(nodeId);
107 
108                 if (channel.isIgnored()) {
109                     newBatch.setStatus(Status.OK);
110                 }
111 
112                 long startTime = System.currentTimeMillis();
113                 insertOutgoingBatch(newBatch);
114                 dataEventCount = jdbcTemplate.update(getSql("updateBatchedEventsMultiSql"), new Object[] {
115                         newBatch.getBatchId(), 1, 0, nodeId, newBatch.getChannelId() });
116                 long databaseMillis = System.currentTimeMillis() - startTime;
117 
118                 OutgoingBatchHistory history = new OutgoingBatchHistory(newBatch);
119                 history.setEndTime(new Date());
120                 history.setDataEventCount(dataEventCount);
121                 history.setDatabaseMillis(databaseMillis);
122                 insertOutgoingBatchHistory(history);
123                 statisticManager.getStatistic(StatisticName.OUTGOING_MS_PER_EVENT_BATCHED).add(databaseMillis,
124                         dataEventCount);
125                 statisticManager.getStatistic(StatisticName.OUTGOING_EVENTS_PER_BATCH).add(dataEventCount, 1);
126             }
127         }
128     }
129 
130     @Transactional
131     private void buildOutgoingBatchesPeekAhead(final String nodeId, final NodeChannel channel) {
132 
133         final int batchSizePeekAhead = parameterService.getInt(ParameterConstants.OUTGOING_BATCH_PEEK_AHEAD_WINDOW);
134 
135         jdbcTemplate.execute(new ConnectionCallback() {
136             public Object doInConnection(Connection conn) throws SQLException, DataAccessException {
137 
138                 PreparedStatement update = null;
139                 try {
140                     update = conn.prepareStatement(getSql("updateBatchedEventsSql"));
141 
142                     update.setQueryTimeout(jdbcTemplate.getQueryTimeout());
143 
144                     if (channel.isSuspended()) {
145                         logger.warn(channel.getId() + " channel for " + nodeId + " is currently suspended.");
146                     } else if (channel.isEnabled()) {
147                         // determine which transactions will be part of this
148                         // batch on this channel
149                         PreparedStatement select = null;
150                         ResultSet results = null;
151 
152                         try {
153 
154                             select = conn.prepareStatement(getSql("selectEventsToBatchSql"));
155 
156                             select.setQueryTimeout(jdbcTemplate.getQueryTimeout());
157 
158                             select.setInt(1, 0);
159                             select.setString(2, nodeId);
160                             select.setString(3, channel.getId());
161                             results = select.executeQuery();
162 
163                             int count = 0;
164                             long databaseMillis = 0;
165                             int dataEventCount = 0;
166                             boolean peekAheadMode = false;
167                             int peekAheadCountDown = batchSizePeekAhead;
168                             Set<String> transactionIds = new HashSet<String>();
169 
170                             OutgoingBatch newBatch = new OutgoingBatch();
171                             newBatch.setBatchType(BatchType.EVENTS);
172                             newBatch.setChannelId(channel.getId());
173                             newBatch.setNodeId(nodeId);
174 
175                             // node channel is setup to ignore, just mark the
176                             // batch as already processed.
177                             if (channel.isIgnored()) {
178                                 newBatch.setStatus(Status.OK);
179                             }
180 
181                             if (results.next()) {
182 
183                                 databaseMillis = 0;
184                                 dataEventCount = 0;
185                                 insertOutgoingBatch(newBatch);
186                                 OutgoingBatchHistory history = new OutgoingBatchHistory(newBatch);
187 
188                                 do {
189                                     String trxId = results.getString(1);
190 
191                                     if (!peekAheadMode
192                                             || (peekAheadMode && (trxId != null && transactionIds.contains(trxId)))) {
193                                         peekAheadCountDown = batchSizePeekAhead;
194 
195                                         if (trxId != null) {
196                                             transactionIds.add(trxId);
197                                         }
198 
199                                         int dataId = results.getInt(2);
200 
201                                         update.clearParameters();
202                                         update.setLong(1, Long.valueOf(newBatch.getBatchId()));
203                                         update.setInt(2, 1);
204                                         update.setString(3, nodeId);
205                                         update.setLong(4, dataId);
206                                         update.addBatch();
207 
208                                         count++;
209                                         dataEventCount++;
210 
211                                     } else {
212                                         peekAheadCountDown--;
213                                     }
214 
215                                     if (count > channel.getMaxBatchSize()) {
216                                         peekAheadMode = true;
217                                     }
218 
219                                     // put this in so we don't build up too many
220                                     // statements to send to the server.
221                                     if (count
222                                             % parameterService
223                                                     .getInt(ParameterConstants.OUTGOING_BATCH_PEEK_AHEAD_BATCH_COMMIT_SIZE) == 0) {
224                                         long startTime = System.currentTimeMillis();
225                                         update.executeBatch();
226                                         databaseMillis += (System.currentTimeMillis() - startTime);
227                                     }
228 
229                                 } while (results.next() && peekAheadCountDown != 0);
230 
231                                 long startTime = System.currentTimeMillis();
232                                 update.executeBatch();
233                                 databaseMillis += (System.currentTimeMillis() - startTime);
234 
235                                 history.setEndTime(new Date());
236                                 history.setDataEventCount(dataEventCount);
237                                 history.setDatabaseMillis(databaseMillis);
238                                 insertOutgoingBatchHistory(history);
239                                 statisticManager.getStatistic(StatisticName.OUTGOING_MS_PER_EVENT_BATCHED).add(
240                                         databaseMillis, dataEventCount);
241                                 statisticManager.getStatistic(StatisticName.OUTGOING_EVENTS_PER_BATCH).add(
242                                         dataEventCount, 1);
243 
244                             }
245 
246                         } finally {
247                             JdbcUtils.closeResultSet(results);
248                             JdbcUtils.closeStatement(select);
249                         }
250 
251                     }
252                 } finally {
253                     JdbcUtils.closeStatement(update);
254                 }
255                 return null;
256             }
257         });
258     }
259 
260     public void insertOutgoingBatch(final OutgoingBatch outgoingBatch) {
261         long batchId = dbDialect.insertWithGeneratedKey(getSql("createBatchSql"), SequenceIdentifier.OUTGOING_BATCH,
262                 new PreparedStatementCallback() {
263                     public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException {
264                         ps.setString(1, outgoingBatch.getNodeId());
265                         ps.setString(2, outgoingBatch.getChannelId());
266                         ps.setString(3, outgoingBatch.getStatus().name());
267                         ps.setString(4, outgoingBatch.getBatchType().getCode());
268                         return null;
269                     }
270                 });
271         outgoingBatch.setBatchId(batchId);
272     }
273 
274     /***
275      * Select batches to process. Batches that are NOT in error will be returned
276      * first. They will be ordered by batch id as the batches will have already
277      * been created by {@link #buildOutgoingBatches(String)} in channel priority
278      * order.
279      */
280     @SuppressWarnings("unchecked")
281     public List<OutgoingBatch> getOutgoingBatches(String nodeId) {
282         List<OutgoingBatch> list = (List<OutgoingBatch>) jdbcTemplate.query(getSql("selectOutgoingBatchSql"),
283                 new Object[] { nodeId, OutgoingBatch.Status.NE.toString(), OutgoingBatch.Status.SE.toString(),
284                         OutgoingBatch.Status.ER.toString() }, new OutgoingBatchMapper());
285         final HashSet<String> errorChannels = new HashSet<String>();
286         for (OutgoingBatch batch : list) {
287             if (batch.getStatus().equals(OutgoingBatch.Status.ER)) {
288                 errorChannels.add(batch.getChannelId());
289             }
290         }
291         
292         List<NodeChannel> channels = configurationService.getChannels();
293         Collections.sort(channels, new Comparator<NodeChannel>() {
294             public int compare(NodeChannel b1, NodeChannel b2) {
295                 boolean isError1 = errorChannels.contains(b1.getId());
296                 boolean isError2 = errorChannels.contains(b2.getId());
297                 if (isError1 == isError2) {
298                     return b1.getProcessingOrder() < b2.getProcessingOrder() ? -1 : 1;
299                 } else if (!isError1 && isError2) {
300                     return -1;
301                 } else {
302                     return 1;
303                 }
304             }
305         });
306         
307        
308         return filterMaxNumberOfOutgoingBatchesByChannel(list, channels);
309     }
310     
311     /***
312      * Filter out the maximum number of batches to send.
313      */
314     private List<OutgoingBatch> filterMaxNumberOfOutgoingBatchesByChannel(List<OutgoingBatch> batches, List<NodeChannel> channels) {
315         if (batches != null && batches.size() > 0) {
316             List<OutgoingBatch> filtered = new ArrayList<OutgoingBatch>(batches.size());
317             for (NodeChannel channel : channels) {
318                 int max = channel.getMaxBatchToSend();
319                 int count = 0;
320                 for (OutgoingBatch outgoingBatch : batches) {
321                     if (channel.getId().equals(outgoingBatch.getChannelId()) && count < max) {
322                         filtered.add(outgoingBatch);
323                         count++;
324                     }
325                 }
326             }
327             return filtered;
328         } else {
329             return batches;
330         }
331     }
332 
333     @SuppressWarnings("unchecked")
334     public List<OutgoingBatch> getOutgoingBatchRange(String startBatchId, String endBatchId) {
335         return (List<OutgoingBatch>) jdbcTemplate.query(getSql("selectOutgoingBatchRangeSql"), new Object[] {
336                 startBatchId, endBatchId }, new OutgoingBatchMapper());
337     }
338 
339     @SuppressWarnings("unchecked")
340     public List<OutgoingBatch> getOutgoingBatcheErrors(int maxRows) {
341         return (List<OutgoingBatch>) jdbcTemplate.query(new MaxRowsStatementCreator(
342                 getSql("selectOutgoingBatchErrorsSql"), maxRows), new OutgoingBatchMapper());
343     }
344 
345     // Moving away from SENT status to reduce updates to outgoing_batch table
346     @Deprecated
347     public void markOutgoingBatchSent(OutgoingBatch batch) {
348         setBatchStatus(batch.getBatchId(), batch.getStatus());
349     }
350 
351     @Deprecated
352     public void setBatchStatus(long batchId, Status status) {
353         jdbcTemplate.update(getSql("changeBatchStatusSql"), new Object[] { status.name(), batchId });
354     }
355 
356     // TODO Should this move to DataService?
357     @SuppressWarnings("unchecked")
358     public boolean isInitialLoadComplete(String nodeId) {
359 
360         NodeSecurity security = nodeService.findNodeSecurity(nodeId);
361         if (security == null || security.isInitialLoadEnabled()) {
362             return false;
363         }
364 
365         List<String> statuses = (List<String>) jdbcTemplate.queryForList(getSql("initialLoadStatusSql"),
366                 new Object[] { nodeId }, String.class);
367         if (statuses == null || statuses.size() == 0) {
368             throw new RuntimeException("The initial load has not been started for " + nodeId);
369         }
370 
371         for (String status : statuses) {
372             if (!Status.OK.name().equals(status)) {
373                 return false;
374             }
375         }
376         return true;
377     }
378 
379     public void insertOutgoingBatchHistory(OutgoingBatchHistory history) {
380         jdbcTemplate.update(getSql("insertOutgoingBatchHistorySql"), new Object[] { history.getBatchId(),
381                 history.getNodeId(), history.getStatus().toString(), history.getNetworkMillis(),
382                 history.getFilterMillis(), history.getDatabaseMillis(), history.getHostName(), history.getByteCount(),
383                 history.getDataEventCount(), history.getFailedDataId(), history.getStartTime(), history.getEndTime(),
384                 history.getSqlState(), history.getSqlCode(), StringUtils.abbreviate(history.getSqlMessage(), 50) },
385                 new int[] { Types.INTEGER, Types.VARCHAR, Types.CHAR, Types.INTEGER, Types.INTEGER, Types.INTEGER,
386                         Types.VARCHAR, Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.TIMESTAMP, Types.TIMESTAMP,
387                         Types.VARCHAR, Types.INTEGER, Types.VARCHAR });
388     }
389 
390     @SuppressWarnings("unchecked")
391     public List<OutgoingBatchHistory> findOutgoingBatchHistory(long batchId, String nodeId) {
392         return (List<OutgoingBatchHistory>) jdbcTemplate.query(getSql("findOutgoingBatchHistorySql"), new Object[] {
393                 batchId, nodeId }, new OutgoingBatchHistoryMapper());
394     }
395 
396     class OutgoingBatchMapper implements RowMapper {
397         public Object mapRow(ResultSet rs, int num) throws SQLException {
398             OutgoingBatch batch = new OutgoingBatch();
399             batch.setBatchId(rs.getLong(1));
400             batch.setNodeId(rs.getString(2));
401             batch.setChannelId(rs.getString(3));
402             batch.setStatus(rs.getString(4));
403             batch.setBatchType(rs.getString(5));
404             batch.setCreateTime(rs.getTimestamp(6));
405             return batch;
406         }
407     }
408 
409     class OutgoingBatchHistoryMapper implements RowMapper {
410         public Object mapRow(ResultSet rs, int num) throws SQLException {
411             OutgoingBatchHistory history = new OutgoingBatchHistory();
412             history.setBatchId(rs.getLong(1));
413             history.setNodeId(rs.getString(2));
414             history.setStatus(OutgoingBatchHistory.Status.valueOf(rs.getString(3)));
415             history.setNetworkMillis(rs.getLong(4));
416             history.setFilterMillis(rs.getLong(5));
417             history.setDatabaseMillis(rs.getLong(6));
418             history.setHostName(rs.getString(7));
419             history.setByteCount(rs.getLong(8));
420             history.setDataEventCount(rs.getLong(9));
421             history.setFailedDataId(rs.getLong(10));
422             history.setStartTime(rs.getTime(11));
423             history.setEndTime(rs.getTime(12));
424             history.setSqlState(rs.getString(13));
425             history.setSqlCode(rs.getInt(14));
426             history.setSqlMessage(rs.getString(15));
427             return history;
428         }
429     }
430 
431     public void setDbDialect(IDbDialect dbDialect) {
432         this.dbDialect = dbDialect;
433     }
434 
435     public void setNodeService(INodeService nodeService) {
436         this.nodeService = nodeService;
437     }
438 
439     public void setStatisticManager(IStatisticManager statisticManager) {
440         this.statisticManager = statisticManager;
441     }
442 
443     public void setConfigurationService(IConfigurationService configurationService) {
444         this.configurationService = configurationService;
445     }
446 
447 }