1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.jumpmind.symmetric.service.impl;
23
24 import java.sql.Connection;
25 import java.sql.PreparedStatement;
26 import java.sql.ResultSet;
27 import java.sql.SQLException;
28 import java.sql.Types;
29 import java.util.ArrayList;
30 import java.util.Collections;
31 import java.util.Comparator;
32 import java.util.Date;
33 import java.util.HashSet;
34 import java.util.List;
35 import java.util.Set;
36
37 import org.apache.commons.lang.StringUtils;
38 import org.apache.commons.logging.Log;
39 import org.apache.commons.logging.LogFactory;
40 import org.jumpmind.symmetric.common.ParameterConstants;
41 import org.jumpmind.symmetric.db.IDbDialect;
42 import org.jumpmind.symmetric.db.SequenceIdentifier;
43 import org.jumpmind.symmetric.model.BatchType;
44 import org.jumpmind.symmetric.model.NodeChannel;
45 import org.jumpmind.symmetric.model.NodeSecurity;
46 import org.jumpmind.symmetric.model.OutgoingBatch;
47 import org.jumpmind.symmetric.model.OutgoingBatchHistory;
48 import org.jumpmind.symmetric.model.OutgoingBatch.Status;
49 import org.jumpmind.symmetric.service.IConfigurationService;
50 import org.jumpmind.symmetric.service.INodeService;
51 import org.jumpmind.symmetric.service.IOutgoingBatchService;
52 import org.jumpmind.symmetric.statistic.IStatisticManager;
53 import org.jumpmind.symmetric.statistic.StatisticName;
54 import org.jumpmind.symmetric.util.MaxRowsStatementCreator;
55 import org.springframework.dao.DataAccessException;
56 import org.springframework.jdbc.core.ConnectionCallback;
57 import org.springframework.jdbc.core.PreparedStatementCallback;
58 import org.springframework.jdbc.core.RowMapper;
59 import org.springframework.jdbc.support.JdbcUtils;
60 import org.springframework.transaction.annotation.Transactional;
61
62 public class OutgoingBatchService extends AbstractService implements IOutgoingBatchService {
63
64 final static Log logger = LogFactory.getLog(OutgoingBatchService.class);
65
66 private INodeService nodeService;
67
68 private IDbDialect dbDialect;
69
70 private IStatisticManager statisticManager;
71
72 private IConfigurationService configurationService;
73
74 /***
75 * Create a batch and mark events as tied to that batch. We iterate through
76 * all the events so we can find a transaction boundary to stop on. <p/>
77 * This method is currently non-transactional because of the fear of having
78 * to deal with large numbers of events as part of the same batch. This
79 * shouldn't be an issue in most cases other than possibly leaving a batch
80 * row w/out data every now and then or leaving a batch w/out the associated
81 * history row.
82 */
83 @Transactional
84 @Deprecated
85 public void buildOutgoingBatches(final String nodeId, final List<NodeChannel> channels) {
86 for (NodeChannel nodeChannel : channels) {
87 buildOutgoingBatches(nodeId, nodeChannel);
88 }
89 }
90
91 @Transactional
92 public void buildOutgoingBatches(final String nodeId, final NodeChannel channel) {
93
94 if (channel.isSuspended()) {
95 logger.warn(channel.getId() + " channel for " + nodeId + " is currently suspended.");
96 } else if (channel.isEnabled()) {
97 long dataEventCount = jdbcTemplate.queryForLong(getSql("selectEventsToBatchCountSql"), new Object[] { 0,
98 nodeId, channel.getId() });
99
100 if (dataEventCount > channel.getMaxBatchSize()) {
101 buildOutgoingBatchesPeekAhead(nodeId, channel);
102 } else if (dataEventCount > 0) {
103 OutgoingBatch newBatch = new OutgoingBatch();
104 newBatch.setBatchType(BatchType.EVENTS);
105 newBatch.setChannelId(channel.getId());
106 newBatch.setNodeId(nodeId);
107
108 if (channel.isIgnored()) {
109 newBatch.setStatus(Status.OK);
110 }
111
112 long startTime = System.currentTimeMillis();
113 insertOutgoingBatch(newBatch);
114 dataEventCount = jdbcTemplate.update(getSql("updateBatchedEventsMultiSql"), new Object[] {
115 newBatch.getBatchId(), 1, 0, nodeId, newBatch.getChannelId() });
116 long databaseMillis = System.currentTimeMillis() - startTime;
117
118 OutgoingBatchHistory history = new OutgoingBatchHistory(newBatch);
119 history.setEndTime(new Date());
120 history.setDataEventCount(dataEventCount);
121 history.setDatabaseMillis(databaseMillis);
122 insertOutgoingBatchHistory(history);
123 statisticManager.getStatistic(StatisticName.OUTGOING_MS_PER_EVENT_BATCHED).add(databaseMillis,
124 dataEventCount);
125 statisticManager.getStatistic(StatisticName.OUTGOING_EVENTS_PER_BATCH).add(dataEventCount, 1);
126 }
127 }
128 }
129
130 @Transactional
131 private void buildOutgoingBatchesPeekAhead(final String nodeId, final NodeChannel channel) {
132
133 final int batchSizePeekAhead = parameterService.getInt(ParameterConstants.OUTGOING_BATCH_PEEK_AHEAD_WINDOW);
134
135 jdbcTemplate.execute(new ConnectionCallback() {
136 public Object doInConnection(Connection conn) throws SQLException, DataAccessException {
137
138 PreparedStatement update = null;
139 try {
140 update = conn.prepareStatement(getSql("updateBatchedEventsSql"));
141
142 update.setQueryTimeout(jdbcTemplate.getQueryTimeout());
143
144 if (channel.isSuspended()) {
145 logger.warn(channel.getId() + " channel for " + nodeId + " is currently suspended.");
146 } else if (channel.isEnabled()) {
147
148
149 PreparedStatement select = null;
150 ResultSet results = null;
151
152 try {
153
154 select = conn.prepareStatement(getSql("selectEventsToBatchSql"));
155
156 select.setQueryTimeout(jdbcTemplate.getQueryTimeout());
157
158 select.setInt(1, 0);
159 select.setString(2, nodeId);
160 select.setString(3, channel.getId());
161 results = select.executeQuery();
162
163 int count = 0;
164 long databaseMillis = 0;
165 int dataEventCount = 0;
166 boolean peekAheadMode = false;
167 int peekAheadCountDown = batchSizePeekAhead;
168 Set<String> transactionIds = new HashSet<String>();
169
170 OutgoingBatch newBatch = new OutgoingBatch();
171 newBatch.setBatchType(BatchType.EVENTS);
172 newBatch.setChannelId(channel.getId());
173 newBatch.setNodeId(nodeId);
174
175
176
177 if (channel.isIgnored()) {
178 newBatch.setStatus(Status.OK);
179 }
180
181 if (results.next()) {
182
183 databaseMillis = 0;
184 dataEventCount = 0;
185 insertOutgoingBatch(newBatch);
186 OutgoingBatchHistory history = new OutgoingBatchHistory(newBatch);
187
188 do {
189 String trxId = results.getString(1);
190
191 if (!peekAheadMode
192 || (peekAheadMode && (trxId != null && transactionIds.contains(trxId)))) {
193 peekAheadCountDown = batchSizePeekAhead;
194
195 if (trxId != null) {
196 transactionIds.add(trxId);
197 }
198
199 int dataId = results.getInt(2);
200
201 update.clearParameters();
202 update.setLong(1, Long.valueOf(newBatch.getBatchId()));
203 update.setInt(2, 1);
204 update.setString(3, nodeId);
205 update.setLong(4, dataId);
206 update.addBatch();
207
208 count++;
209 dataEventCount++;
210
211 } else {
212 peekAheadCountDown--;
213 }
214
215 if (count > channel.getMaxBatchSize()) {
216 peekAheadMode = true;
217 }
218
219
220
221 if (count
222 % parameterService
223 .getInt(ParameterConstants.OUTGOING_BATCH_PEEK_AHEAD_BATCH_COMMIT_SIZE) == 0) {
224 long startTime = System.currentTimeMillis();
225 update.executeBatch();
226 databaseMillis += (System.currentTimeMillis() - startTime);
227 }
228
229 } while (results.next() && peekAheadCountDown != 0);
230
231 long startTime = System.currentTimeMillis();
232 update.executeBatch();
233 databaseMillis += (System.currentTimeMillis() - startTime);
234
235 history.setEndTime(new Date());
236 history.setDataEventCount(dataEventCount);
237 history.setDatabaseMillis(databaseMillis);
238 insertOutgoingBatchHistory(history);
239 statisticManager.getStatistic(StatisticName.OUTGOING_MS_PER_EVENT_BATCHED).add(
240 databaseMillis, dataEventCount);
241 statisticManager.getStatistic(StatisticName.OUTGOING_EVENTS_PER_BATCH).add(
242 dataEventCount, 1);
243
244 }
245
246 } finally {
247 JdbcUtils.closeResultSet(results);
248 JdbcUtils.closeStatement(select);
249 }
250
251 }
252 } finally {
253 JdbcUtils.closeStatement(update);
254 }
255 return null;
256 }
257 });
258 }
259
260 public void insertOutgoingBatch(final OutgoingBatch outgoingBatch) {
261 long batchId = dbDialect.insertWithGeneratedKey(getSql("createBatchSql"), SequenceIdentifier.OUTGOING_BATCH,
262 new PreparedStatementCallback() {
263 public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException {
264 ps.setString(1, outgoingBatch.getNodeId());
265 ps.setString(2, outgoingBatch.getChannelId());
266 ps.setString(3, outgoingBatch.getStatus().name());
267 ps.setString(4, outgoingBatch.getBatchType().getCode());
268 return null;
269 }
270 });
271 outgoingBatch.setBatchId(batchId);
272 }
273
274 /***
275 * Select batches to process. Batches that are NOT in error will be returned
276 * first. They will be ordered by batch id as the batches will have already
277 * been created by {@link #buildOutgoingBatches(String)} in channel priority
278 * order.
279 */
280 @SuppressWarnings("unchecked")
281 public List<OutgoingBatch> getOutgoingBatches(String nodeId) {
282 List<OutgoingBatch> list = (List<OutgoingBatch>) jdbcTemplate.query(getSql("selectOutgoingBatchSql"),
283 new Object[] { nodeId, OutgoingBatch.Status.NE.toString(), OutgoingBatch.Status.SE.toString(),
284 OutgoingBatch.Status.ER.toString() }, new OutgoingBatchMapper());
285 final HashSet<String> errorChannels = new HashSet<String>();
286 for (OutgoingBatch batch : list) {
287 if (batch.getStatus().equals(OutgoingBatch.Status.ER)) {
288 errorChannels.add(batch.getChannelId());
289 }
290 }
291
292 List<NodeChannel> channels = configurationService.getChannels();
293 Collections.sort(channels, new Comparator<NodeChannel>() {
294 public int compare(NodeChannel b1, NodeChannel b2) {
295 boolean isError1 = errorChannels.contains(b1.getId());
296 boolean isError2 = errorChannels.contains(b2.getId());
297 if (isError1 == isError2) {
298 return b1.getProcessingOrder() < b2.getProcessingOrder() ? -1 : 1;
299 } else if (!isError1 && isError2) {
300 return -1;
301 } else {
302 return 1;
303 }
304 }
305 });
306
307
308 return filterMaxNumberOfOutgoingBatchesByChannel(list, channels);
309 }
310
311 /***
312 * Filter out the maximum number of batches to send.
313 */
314 private List<OutgoingBatch> filterMaxNumberOfOutgoingBatchesByChannel(List<OutgoingBatch> batches, List<NodeChannel> channels) {
315 if (batches != null && batches.size() > 0) {
316 List<OutgoingBatch> filtered = new ArrayList<OutgoingBatch>(batches.size());
317 for (NodeChannel channel : channels) {
318 int max = channel.getMaxBatchToSend();
319 int count = 0;
320 for (OutgoingBatch outgoingBatch : batches) {
321 if (channel.getId().equals(outgoingBatch.getChannelId()) && count < max) {
322 filtered.add(outgoingBatch);
323 count++;
324 }
325 }
326 }
327 return filtered;
328 } else {
329 return batches;
330 }
331 }
332
333 @SuppressWarnings("unchecked")
334 public List<OutgoingBatch> getOutgoingBatchRange(String startBatchId, String endBatchId) {
335 return (List<OutgoingBatch>) jdbcTemplate.query(getSql("selectOutgoingBatchRangeSql"), new Object[] {
336 startBatchId, endBatchId }, new OutgoingBatchMapper());
337 }
338
339 @SuppressWarnings("unchecked")
340 public List<OutgoingBatch> getOutgoingBatcheErrors(int maxRows) {
341 return (List<OutgoingBatch>) jdbcTemplate.query(new MaxRowsStatementCreator(
342 getSql("selectOutgoingBatchErrorsSql"), maxRows), new OutgoingBatchMapper());
343 }
344
345
346 @Deprecated
347 public void markOutgoingBatchSent(OutgoingBatch batch) {
348 setBatchStatus(batch.getBatchId(), batch.getStatus());
349 }
350
351 @Deprecated
352 public void setBatchStatus(long batchId, Status status) {
353 jdbcTemplate.update(getSql("changeBatchStatusSql"), new Object[] { status.name(), batchId });
354 }
355
356
357 @SuppressWarnings("unchecked")
358 public boolean isInitialLoadComplete(String nodeId) {
359
360 NodeSecurity security = nodeService.findNodeSecurity(nodeId);
361 if (security == null || security.isInitialLoadEnabled()) {
362 return false;
363 }
364
365 List<String> statuses = (List<String>) jdbcTemplate.queryForList(getSql("initialLoadStatusSql"),
366 new Object[] { nodeId }, String.class);
367 if (statuses == null || statuses.size() == 0) {
368 throw new RuntimeException("The initial load has not been started for " + nodeId);
369 }
370
371 for (String status : statuses) {
372 if (!Status.OK.name().equals(status)) {
373 return false;
374 }
375 }
376 return true;
377 }
378
379 public void insertOutgoingBatchHistory(OutgoingBatchHistory history) {
380 jdbcTemplate.update(getSql("insertOutgoingBatchHistorySql"), new Object[] { history.getBatchId(),
381 history.getNodeId(), history.getStatus().toString(), history.getNetworkMillis(),
382 history.getFilterMillis(), history.getDatabaseMillis(), history.getHostName(), history.getByteCount(),
383 history.getDataEventCount(), history.getFailedDataId(), history.getStartTime(), history.getEndTime(),
384 history.getSqlState(), history.getSqlCode(), StringUtils.abbreviate(history.getSqlMessage(), 50) },
385 new int[] { Types.INTEGER, Types.VARCHAR, Types.CHAR, Types.INTEGER, Types.INTEGER, Types.INTEGER,
386 Types.VARCHAR, Types.INTEGER, Types.INTEGER, Types.INTEGER, Types.TIMESTAMP, Types.TIMESTAMP,
387 Types.VARCHAR, Types.INTEGER, Types.VARCHAR });
388 }
389
390 @SuppressWarnings("unchecked")
391 public List<OutgoingBatchHistory> findOutgoingBatchHistory(long batchId, String nodeId) {
392 return (List<OutgoingBatchHistory>) jdbcTemplate.query(getSql("findOutgoingBatchHistorySql"), new Object[] {
393 batchId, nodeId }, new OutgoingBatchHistoryMapper());
394 }
395
396 class OutgoingBatchMapper implements RowMapper {
397 public Object mapRow(ResultSet rs, int num) throws SQLException {
398 OutgoingBatch batch = new OutgoingBatch();
399 batch.setBatchId(rs.getLong(1));
400 batch.setNodeId(rs.getString(2));
401 batch.setChannelId(rs.getString(3));
402 batch.setStatus(rs.getString(4));
403 batch.setBatchType(rs.getString(5));
404 batch.setCreateTime(rs.getTimestamp(6));
405 return batch;
406 }
407 }
408
409 class OutgoingBatchHistoryMapper implements RowMapper {
410 public Object mapRow(ResultSet rs, int num) throws SQLException {
411 OutgoingBatchHistory history = new OutgoingBatchHistory();
412 history.setBatchId(rs.getLong(1));
413 history.setNodeId(rs.getString(2));
414 history.setStatus(OutgoingBatchHistory.Status.valueOf(rs.getString(3)));
415 history.setNetworkMillis(rs.getLong(4));
416 history.setFilterMillis(rs.getLong(5));
417 history.setDatabaseMillis(rs.getLong(6));
418 history.setHostName(rs.getString(7));
419 history.setByteCount(rs.getLong(8));
420 history.setDataEventCount(rs.getLong(9));
421 history.setFailedDataId(rs.getLong(10));
422 history.setStartTime(rs.getTime(11));
423 history.setEndTime(rs.getTime(12));
424 history.setSqlState(rs.getString(13));
425 history.setSqlCode(rs.getInt(14));
426 history.setSqlMessage(rs.getString(15));
427 return history;
428 }
429 }
430
431 public void setDbDialect(IDbDialect dbDialect) {
432 this.dbDialect = dbDialect;
433 }
434
435 public void setNodeService(INodeService nodeService) {
436 this.nodeService = nodeService;
437 }
438
439 public void setStatisticManager(IStatisticManager statisticManager) {
440 this.statisticManager = statisticManager;
441 }
442
443 public void setConfigurationService(IConfigurationService configurationService) {
444 this.configurationService = configurationService;
445 }
446
447 }