1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.jumpmind.symmetric.service.impl;
22
23 import java.io.File;
24 import java.net.MalformedURLException;
25 import java.net.URL;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.Date;
29 import java.util.HashMap;
30 import java.util.List;
31 import java.util.Map;
32
33 import org.apache.commons.lang.StringUtils;
34 import org.apache.commons.logging.Log;
35 import org.apache.commons.logging.LogFactory;
36 import org.apache.ddlutils.model.Table;
37 import org.jumpmind.symmetric.Version;
38 import org.jumpmind.symmetric.common.Constants;
39 import org.jumpmind.symmetric.common.ParameterConstants;
40 import org.jumpmind.symmetric.db.IDbDialect;
41 import org.jumpmind.symmetric.db.SqlScript;
42 import org.jumpmind.symmetric.model.Channel;
43 import org.jumpmind.symmetric.model.DataEventAction;
44 import org.jumpmind.symmetric.model.DataEventType;
45 import org.jumpmind.symmetric.model.Node;
46 import org.jumpmind.symmetric.model.NodeGroupLink;
47 import org.jumpmind.symmetric.model.Trigger;
48 import org.jumpmind.symmetric.model.TriggerHistory;
49 import org.jumpmind.symmetric.model.TriggerReBuildReason;
50 import org.jumpmind.symmetric.service.IBootstrapService;
51 import org.jumpmind.symmetric.service.IClusterService;
52 import org.jumpmind.symmetric.service.IConfigurationService;
53 import org.jumpmind.symmetric.service.IDataService;
54 import org.jumpmind.symmetric.service.INodeService;
55 import org.jumpmind.symmetric.service.IRegistrationService;
56 import org.jumpmind.symmetric.service.IUpgradeService;
57 import org.jumpmind.symmetric.service.LockAction;
58 import org.jumpmind.symmetric.util.AppUtils;
59 import org.springframework.transaction.annotation.Transactional;
60
61 public class BootstrapService extends AbstractService implements IBootstrapService {
62
63 static final Log logger = LogFactory.getLog(BootstrapService.class);
64
65 private IDbDialect dbDialect;
66
67 private String tablePrefix;
68
69 private IConfigurationService configurationService;
70
71 private IClusterService clusterService;
72
73 private INodeService nodeService;
74
75 private IDataService dataService;
76
77 private IUpgradeService upgradeService;
78
79 private IRegistrationService registrationService;
80
81 private List<Channel> defaultChannels;
82
83 private boolean initialized = false;
84
85 private Map<Integer,Trigger> triggerCache;
86
87 public void setupDatabase() {
88 setupDatabase(false);
89 }
90
91 public void setupDatabase(boolean force) {
92 if (!initialized || force) {
93 if (parameterService.is(ParameterConstants.AUTO_CONFIGURE_DATABASE) || force) {
94 logger.info("Initializing SymmetricDS database.");
95 dbDialect.initConfigDb();
96 if (defaultChannels != null) {
97 logger.info("Setting up " + defaultChannels.size() + " default channels");
98 for (Channel defaultChannel : defaultChannels) {
99 configurationService.saveChannel(defaultChannel);
100 }
101 }
102 parameterService.rereadParameters();
103 logger.info("Done initializing SymmetricDS database.");
104 } else {
105 logger.info("SymmetricDS is not configured to auto create the database.");
106 }
107
108 if (upgradeService.isUpgradeNecessary()) {
109 if (parameterService.is(ParameterConstants.AUTO_UPGRADE)) {
110 try {
111 upgradeService.upgrade();
112 } catch (RuntimeException ex) {
113 logger
114 .fatal(
115 "The upgrade failed. The system may be unstable. Please resolve the problem manually.",
116 ex);
117 throw ex;
118 }
119 } else {
120 throw new RuntimeException("Upgrade of node is necessary. "
121 + "Please set auto.upgrade property to true for an automated upgrade.");
122 }
123 }
124 initialized = true;
125
126 }
127
128
129 clusterService.initLockTable();
130 }
131
132 /***
133 * This is done periodically throughout the day (so it needs to be
134 * efficient). If the trigger is created for the first time (no previous
135 * trigger existed), then should we auto-resync data?
136 */
137 public void syncTriggers() {
138 if (clusterService.lock(LockAction.SYNCTRIGGERS)) {
139 try {
140 logger.info("Synchronizing triggers");
141 removeInactiveTriggers();
142 updateOrCreateSymTriggers();
143 } finally {
144 clusterService.unlock(LockAction.SYNCTRIGGERS);
145 logger.info("Done synchronizing triggers.");
146 }
147 }
148 }
149
150 private void removeInactiveTriggers() {
151 List<Trigger> triggers = configurationService.getInactiveTriggersForSourceNodeGroup(parameterService
152 .getString(ParameterConstants.NODE_GROUP_ID));
153 for (Trigger trigger : triggers) {
154 TriggerHistory history = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId());
155 logger.info("About to remove triggers for inactivated table: " + history.getSourceTableName());
156 if (history != null) {
157 dbDialect.removeTrigger(history.getSourceCatalogName(), history.getSourceSchemaName(), history
158 .getNameForInsertTrigger(), trigger.getSourceTableName(), history);
159 dbDialect.removeTrigger(history.getSourceCatalogName(), history.getSourceSchemaName(), history
160 .getNameForDeleteTrigger(), trigger.getSourceTableName(), history);
161 dbDialect.removeTrigger(history.getSourceCatalogName(), history.getSourceSchemaName(), history
162 .getNameForUpdateTrigger(), trigger.getSourceTableName(), history);
163 configurationService.inactivateTriggerHistory(history);
164 } else {
165 logger.info("A trigger was inactivated that had not yet been built. Taking no action.");
166 }
167 }
168 }
169
170 /***
171 * Create triggers on SymmetricDS tables so changes to configuration can be
172 * synchronized.
173 */
174 private List<Trigger> getConfigurationTriggers() {
175 List<Trigger> triggers = new ArrayList<Trigger>();
176 Node me = nodeService.findIdentity();
177 if (parameterService.is(ParameterConstants.AUTO_SYNC_CONFIGURATION) && me != null) {
178 List<NodeGroupLink> links = configurationService.getGroupLinksFor(me.getNodeGroupId());
179 for (NodeGroupLink nodeGroupLink : links) {
180 if (nodeGroupLink.getDataEventAction().equals(DataEventAction.WAIT_FOR_POLL)) {
181 triggers.addAll(configurationService.getConfigurationTriggers(nodeGroupLink.getSourceGroupId(),
182 nodeGroupLink.getTargetGroupId(), false));
183 }
184 }
185 } else {
186 logger
187 .info("Auto syncing of configuration is currently off. Configuration triggers will not be generated.");
188 }
189 return triggers;
190 }
191
192 public Map<Integer,Trigger> getCachedTriggers(boolean refreshCache) {
193 if (triggerCache == null || refreshCache) {
194 synchronized (this) {
195 triggerCache = new HashMap<Integer, Trigger>();
196 List<Trigger> triggers = new ArrayList<Trigger>();
197 triggers.addAll(getConfigurationTriggers());
198 triggers.addAll(configurationService.getActiveTriggersForSourceNodeGroup(parameterService
199 .getString(ParameterConstants.NODE_GROUP_ID)));
200 for (Trigger trigger : triggers) {
201 triggerCache.put(trigger.getTriggerId(), trigger);
202 }
203 }
204 }
205 return triggerCache;
206 }
207
208
209
210 private void updateOrCreateSymTriggers() {
211 Collection<Trigger> triggers = getCachedTriggers(true).values();
212
213 for (Trigger trigger : triggers) {
214
215 String schemaPlusTriggerName = (trigger.getSourceSchemaName() != null ? trigger.getSourceSchemaName() + "."
216 : "")
217 + trigger.getSourceTableName();
218
219 try {
220
221 TriggerReBuildReason reason = TriggerReBuildReason.NEW_TRIGGERS;
222
223 Table table = dbDialect.getMetaDataFor(trigger.getSourceCatalogName(), trigger.getSourceSchemaName(),
224 trigger.getSourceTableName(), false);
225
226 if (table != null) {
227 TriggerHistory latestHistoryBeforeRebuild = configurationService.getLatestHistoryRecordFor(trigger
228 .getTriggerId());
229
230 boolean forceRebuildOfTriggers = false;
231 if (latestHistoryBeforeRebuild == null) {
232 reason = TriggerReBuildReason.NEW_TRIGGERS;
233 forceRebuildOfTriggers = true;
234
235 } else if (TriggerHistory.calculateTableHashFor(table) != latestHistoryBeforeRebuild.getTableHash()) {
236 reason = TriggerReBuildReason.TABLE_SCHEMA_CHANGED;
237 forceRebuildOfTriggers = true;
238
239 } else if (trigger.hasChangedSinceLastTriggerBuild(latestHistoryBeforeRebuild.getCreateTime())
240 || trigger.getHashedValue() != latestHistoryBeforeRebuild.getTriggerRowHash()) {
241 reason = TriggerReBuildReason.TABLE_SYNC_CONFIGURATION_CHANGED;
242 forceRebuildOfTriggers = true;
243 }
244
245
246
247
248
249 TriggerHistory newestHistory = rebuildTriggerIfNecessary(forceRebuildOfTriggers, trigger,
250 DataEventType.DELETE, reason, latestHistoryBeforeRebuild, rebuildTriggerIfNecessary(
251 forceRebuildOfTriggers, trigger, DataEventType.UPDATE, reason,
252 latestHistoryBeforeRebuild, rebuildTriggerIfNecessary(forceRebuildOfTriggers,
253 trigger, DataEventType.INSERT, reason, latestHistoryBeforeRebuild, null,
254 trigger.isSyncOnInsert(), table), trigger.isSyncOnUpdate(), table), trigger
255 .isSyncOnDelete(), table);
256
257 if (latestHistoryBeforeRebuild != null && newestHistory != null) {
258 configurationService.inactivateTriggerHistory(latestHistoryBeforeRebuild);
259 }
260
261 } else {
262 logger.error("The configured table does not exist in the datasource that is configured: "
263 + schemaPlusTriggerName);
264 }
265 } catch (Exception ex) {
266 logger.error("Failed to synchronize trigger for " + schemaPlusTriggerName, ex);
267 }
268
269 }
270 }
271
272 @Deprecated
273 public void register() {
274 validateConfiguration();
275 }
276
277 public void validateConfiguration() {
278 Node node = nodeService.findIdentity();
279 if (node == null && !configurationService.isRegistrationServer()) {
280 if (!parameterService.is(ParameterConstants.START_PULL_JOB)) {
281 registrationService.registerWithServer();
282 }
283 } else if (node != null && parameterService.getExternalId().equals(node.getExternalId())
284 && parameterService.getNodeGroupId().equals(node.getNodeGroupId())) {
285 heartbeat();
286 } else if (node == null) {
287 if (!loadFromScriptIfProvided()) {
288 logger
289 .info("Could not find my identity in the database and this node is configured as a registration server. We are auto inserting the required rows to begin operation.");
290
291
292 }
293 } else {
294 throw new IllegalStateException(
295 "The configured state does not match recorded database state. The recorded external id is "
296 + node.getExternalId() + " while the configured external id is "
297 + parameterService.getExternalId() + ". The recorded node group id is "
298 + node.getNodeGroupId() + " while the configured node group id is "
299 + parameterService.getNodeGroupId());
300 }
301 }
302
303 /***
304 * Give the end use the option to provide a script that will load a
305 * registration server with an initial SymmetricDS setup.
306 *
307 * Look first on the file system, then in the classpath for the SQL file.
308 *
309 * @return true if the script was executed
310 */
311 private boolean loadFromScriptIfProvided() {
312 boolean loaded = false;
313 String sqlScript = parameterService.getString(ParameterConstants.AUTO_CONFIGURE_REGISTRATION_SERVER_SQL_SCRIPT);
314 if (!StringUtils.isBlank(sqlScript)) {
315 File file = new File(sqlScript);
316 URL fileUrl = null;
317 if (file.isFile()) {
318 try {
319 fileUrl = file.toURL();
320 } catch (MalformedURLException e) {
321 throw new RuntimeException(e);
322 }
323 } else {
324 fileUrl = getClass().getResource(sqlScript);
325 }
326
327 if (fileUrl != null) {
328 new SqlScript(fileUrl, jdbcTemplate.getDataSource(), true).execute();
329 loaded = true;
330 }
331 }
332 return loaded;
333 }
334
335 @Transactional
336 public void heartbeat() {
337 Node node = nodeService.findIdentity();
338 if (node != null) {
339 logger.info("Updating my node information and heartbeat time.");
340 node.setHeartbeatTime(new Date());
341 node.setTimezoneOffset(AppUtils.getTimezoneOffset());
342 node.setDatabaseType(dbDialect.getName());
343 node.setDatabaseVersion(dbDialect.getVersion());
344 node.setSchemaVersion(parameterService.getString(ParameterConstants.SCHEMA_VERSION));
345 node.setExternalId(parameterService.getExternalId());
346 node.setNodeGroupId(parameterService.getNodeGroupId());
347 node.setSymmetricVersion(Version.version());
348 if (!StringUtils.isBlank(parameterService.getMyUrl())) {
349 node.setSyncURL(parameterService.getMyUrl());
350 } else {
351 node.setSyncURL(Constants.PROTOCOL_NONE + "://" + AppUtils.getServerId());
352 }
353 nodeService.updateNode(node);
354 logger.info("Done updating my node information and heartbeat time.");
355 if (!configurationService.isRegistrationServer()) {
356 dataService.insertHeartbeatEvent(node);
357 }
358 }
359 }
360
361 private TriggerHistory rebuildTriggerIfNecessary(boolean forceRebuild, Trigger trigger, DataEventType dmlType,
362 TriggerReBuildReason reason, TriggerHistory oldhist, TriggerHistory hist, boolean create, Table table) {
363
364 boolean triggerExists = false;
365
366 TriggerHistory newTriggerHist = new TriggerHistory(table, trigger, reason);
367 int maxTriggerNameLength = dbDialect.getMaxTriggerNameLength();
368 String triggerPrefix = parameterService.getString(ParameterConstants.RUNTIME_CONFIG_TRIGGER_PREFIX);
369 newTriggerHist.setNameForInsertTrigger(dbDialect.getTriggerName(DataEventType.INSERT, triggerPrefix, maxTriggerNameLength,
370 trigger, hist).toUpperCase());
371 newTriggerHist.setNameForUpdateTrigger(dbDialect.getTriggerName(DataEventType.UPDATE, triggerPrefix, maxTriggerNameLength,
372 trigger, hist).toUpperCase());
373 newTriggerHist.setNameForDeleteTrigger(dbDialect.getTriggerName(DataEventType.DELETE, triggerPrefix, maxTriggerNameLength,
374 trigger, hist).toUpperCase());
375
376 String oldTriggerName = null;
377 String oldSourceSchema = null;
378 String oldCatalogName = null;
379 if (oldhist != null) {
380 oldTriggerName = oldhist.getTriggerNameForDmlType(dmlType);
381 oldSourceSchema = oldhist.getSourceSchemaName();
382 oldCatalogName = oldhist.getSourceCatalogName();
383 triggerExists = dbDialect.doesTriggerExist(oldCatalogName, oldSourceSchema, oldhist.getSourceTableName(),
384 oldTriggerName);
385 } else {
386
387
388
389 oldTriggerName = newTriggerHist.getTriggerNameForDmlType(dmlType);
390 oldSourceSchema = trigger.getSourceSchemaName();
391 oldCatalogName = trigger.getSourceCatalogName();
392 triggerExists = dbDialect.doesTriggerExist(oldCatalogName, oldSourceSchema, trigger.getSourceTableName(),
393 oldTriggerName);
394 }
395
396 if (!triggerExists && forceRebuild) {
397 reason = TriggerReBuildReason.TRIGGERS_MISSING;
398 }
399
400 if ((forceRebuild || !create) && triggerExists) {
401 dbDialect.removeTrigger(oldCatalogName, oldSourceSchema, oldTriggerName, trigger.getSourceTableName(), oldhist);
402 triggerExists = false;
403 }
404
405 boolean isDeadTrigger = !trigger.isSyncOnInsert() && !trigger.isSyncOnUpdate() && !trigger.isSyncOnDelete();
406
407 if (hist == null && (oldhist == null || (!triggerExists && create) || (isDeadTrigger && forceRebuild))) {
408 configurationService.insert(newTriggerHist);
409 hist = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId());
410 }
411
412 if (!triggerExists && create) {
413 dbDialect.initTrigger(dmlType, trigger, hist, tablePrefix, table);
414 }
415
416 return hist;
417 }
418
419 public void setConfigurationService(IConfigurationService configurationService) {
420 this.configurationService = configurationService;
421 }
422
423 public void setDbDialect(IDbDialect dbDialect) {
424 this.dbDialect = dbDialect;
425 }
426
427 public void setNodeService(INodeService nodeService) {
428 this.nodeService = nodeService;
429 }
430
431 public void setTablePrefix(String tablePrefix) {
432 this.tablePrefix = tablePrefix;
433 }
434
435 public void setDataService(IDataService dataService) {
436 this.dataService = dataService;
437 }
438
439 public void setUpgradeService(IUpgradeService upgradeService) {
440 this.upgradeService = upgradeService;
441 }
442
443 public void setClusterService(IClusterService clusterService) {
444 this.clusterService = clusterService;
445 }
446
447 public void setRegistrationService(IRegistrationService registrationService) {
448 this.registrationService = registrationService;
449 }
450
451 public void setDefaultChannels(List<Channel> defaultChannels) {
452 this.defaultChannels = defaultChannels;
453 }
454
455 }