View Javadoc

1   /*
2    * SymmetricDS is an open source database synchronization solution.
3    *   
4    * Copyright (C) Chris Henson <chenson42@users.sourceforge.net>,
5    *               Eric Long <erilong@users.sourceforge.net>
6    *
7    * This library is free software; you can redistribute it and/or
8    * modify it under the terms of the GNU Lesser General Public
9    * License as published by the Free Software Foundation; either
10   * version 3 of the License, or (at your option) any later version.
11   *
12   * This library is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this library; if not, see
19   * <http://www.gnu.org/licenses/>.
20   */
21  package org.jumpmind.symmetric.service.impl;
22  
23  import java.io.File;
24  import java.net.MalformedURLException;
25  import java.net.URL;
26  import java.util.ArrayList;
27  import java.util.Collection;
28  import java.util.Date;
29  import java.util.HashMap;
30  import java.util.List;
31  import java.util.Map;
32  
33  import org.apache.commons.lang.StringUtils;
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.ddlutils.model.Table;
37  import org.jumpmind.symmetric.Version;
38  import org.jumpmind.symmetric.common.Constants;
39  import org.jumpmind.symmetric.common.ParameterConstants;
40  import org.jumpmind.symmetric.db.IDbDialect;
41  import org.jumpmind.symmetric.db.SqlScript;
42  import org.jumpmind.symmetric.model.Channel;
43  import org.jumpmind.symmetric.model.DataEventAction;
44  import org.jumpmind.symmetric.model.DataEventType;
45  import org.jumpmind.symmetric.model.Node;
46  import org.jumpmind.symmetric.model.NodeGroupLink;
47  import org.jumpmind.symmetric.model.Trigger;
48  import org.jumpmind.symmetric.model.TriggerHistory;
49  import org.jumpmind.symmetric.model.TriggerReBuildReason;
50  import org.jumpmind.symmetric.service.IBootstrapService;
51  import org.jumpmind.symmetric.service.IClusterService;
52  import org.jumpmind.symmetric.service.IConfigurationService;
53  import org.jumpmind.symmetric.service.IDataService;
54  import org.jumpmind.symmetric.service.INodeService;
55  import org.jumpmind.symmetric.service.IRegistrationService;
56  import org.jumpmind.symmetric.service.IUpgradeService;
57  import org.jumpmind.symmetric.service.LockAction;
58  import org.jumpmind.symmetric.util.AppUtils;
59  import org.springframework.transaction.annotation.Transactional;
60  
61  public class BootstrapService extends AbstractService implements IBootstrapService {
62  
63      static final Log logger = LogFactory.getLog(BootstrapService.class);
64  
65      private IDbDialect dbDialect;
66  
67      private String tablePrefix;
68  
69      private IConfigurationService configurationService;
70  
71      private IClusterService clusterService;
72  
73      private INodeService nodeService;
74  
75      private IDataService dataService;
76  
77      private IUpgradeService upgradeService;
78  
79      private IRegistrationService registrationService;
80  
81      private List<Channel> defaultChannels;
82  
83      private boolean initialized = false;
84  
85      private Map<Integer,Trigger> triggerCache;
86      
87      public void setupDatabase() {
88          setupDatabase(false);
89      }
90      
91      public void setupDatabase(boolean force) {
92          if (!initialized || force) {
93              if (parameterService.is(ParameterConstants.AUTO_CONFIGURE_DATABASE) || force) {
94                  logger.info("Initializing SymmetricDS database.");
95                  dbDialect.initConfigDb();
96                  if (defaultChannels != null) {
97                      logger.info("Setting up " + defaultChannels.size() + " default channels");
98                      for (Channel defaultChannel : defaultChannels) {
99                          configurationService.saveChannel(defaultChannel);
100                     }
101                 }
102                 parameterService.rereadParameters();
103                 logger.info("Done initializing SymmetricDS database.");
104             } else {
105                 logger.info("SymmetricDS is not configured to auto create the database.");
106             }
107 
108             if (upgradeService.isUpgradeNecessary()) {
109                 if (parameterService.is(ParameterConstants.AUTO_UPGRADE)) {
110                     try {
111                         upgradeService.upgrade();
112                     } catch (RuntimeException ex) {
113                         logger
114                                 .fatal(
115                                         "The upgrade failed. The system may be unstable.  Please resolve the problem manually.",
116                                         ex);
117                         throw ex;
118                     }
119                 } else {
120                     throw new RuntimeException("Upgrade of node is necessary.  "
121                             + "Please set auto.upgrade property to true for an automated upgrade.");
122                 }
123             }
124             initialized = true;
125 
126         }
127 
128         // lets do this every time init is called.
129         clusterService.initLockTable();
130     }
131 
132     /***
133      * This is done periodically throughout the day (so it needs to be
134      * efficient). If the trigger is created for the first time (no previous
135      * trigger existed), then should we auto-resync data?
136      */
137     public void syncTriggers() {
138         if (clusterService.lock(LockAction.SYNCTRIGGERS)) {
139             try {
140                 logger.info("Synchronizing triggers");
141                 removeInactiveTriggers();
142                 updateOrCreateSymTriggers();
143             } finally {
144                 clusterService.unlock(LockAction.SYNCTRIGGERS);
145                 logger.info("Done synchronizing triggers.");
146             }
147         }
148     }
149 
150     private void removeInactiveTriggers() {
151         List<Trigger> triggers = configurationService.getInactiveTriggersForSourceNodeGroup(parameterService
152                 .getString(ParameterConstants.NODE_GROUP_ID));
153         for (Trigger trigger : triggers) {
154             TriggerHistory history = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId());
155             logger.info("About to remove triggers for inactivated table: " + history.getSourceTableName());
156             if (history != null) {
157                 dbDialect.removeTrigger(history.getSourceCatalogName(), history.getSourceSchemaName(), history
158                         .getNameForInsertTrigger(), trigger.getSourceTableName(), history);
159                 dbDialect.removeTrigger(history.getSourceCatalogName(), history.getSourceSchemaName(), history
160                         .getNameForDeleteTrigger(), trigger.getSourceTableName(), history);
161                 dbDialect.removeTrigger(history.getSourceCatalogName(), history.getSourceSchemaName(), history
162                         .getNameForUpdateTrigger(), trigger.getSourceTableName(), history);
163                 configurationService.inactivateTriggerHistory(history);
164             } else {
165                 logger.info("A trigger was inactivated that had not yet been built.  Taking no action.");
166             }
167         }
168     }
169 
170     /***
171      * Create triggers on SymmetricDS tables so changes to configuration can be
172      * synchronized.
173      */
174     private List<Trigger> getConfigurationTriggers() {
175         List<Trigger> triggers = new ArrayList<Trigger>();
176         Node me = nodeService.findIdentity();
177         if (parameterService.is(ParameterConstants.AUTO_SYNC_CONFIGURATION) && me != null) {
178             List<NodeGroupLink> links = configurationService.getGroupLinksFor(me.getNodeGroupId());
179             for (NodeGroupLink nodeGroupLink : links) {
180                 if (nodeGroupLink.getDataEventAction().equals(DataEventAction.WAIT_FOR_POLL)) {
181                     triggers.addAll(configurationService.getConfigurationTriggers(nodeGroupLink.getSourceGroupId(),
182                             nodeGroupLink.getTargetGroupId(), false));
183                 }
184             }
185         } else {
186             logger
187                     .info("Auto syncing of configuration is currently off.  Configuration triggers will not be generated.");
188         }
189         return triggers;
190     }
191     
192     public Map<Integer,Trigger> getCachedTriggers(boolean refreshCache) {
193         if (triggerCache == null || refreshCache) {
194             synchronized (this) {
195                 triggerCache = new HashMap<Integer, Trigger>();
196                 List<Trigger> triggers = new ArrayList<Trigger>();
197                 triggers.addAll(getConfigurationTriggers());
198                 triggers.addAll(configurationService.getActiveTriggersForSourceNodeGroup(parameterService
199                         .getString(ParameterConstants.NODE_GROUP_ID)));
200                 for (Trigger trigger : triggers) {
201                     triggerCache.put(trigger.getTriggerId(), trigger);
202                 }               
203             }
204         }
205         return triggerCache;
206     }
207 
208     
209 
210     private void updateOrCreateSymTriggers() {
211         Collection<Trigger> triggers = getCachedTriggers(true).values();
212 
213         for (Trigger trigger : triggers) {
214 
215             String schemaPlusTriggerName = (trigger.getSourceSchemaName() != null ? trigger.getSourceSchemaName() + "."
216                     : "")
217                     + trigger.getSourceTableName();
218 
219             try {
220 
221                 TriggerReBuildReason reason = TriggerReBuildReason.NEW_TRIGGERS;
222 
223                 Table table = dbDialect.getMetaDataFor(trigger.getSourceCatalogName(), trigger.getSourceSchemaName(),
224                         trigger.getSourceTableName(), false);
225 
226                 if (table != null) {
227                     TriggerHistory latestHistoryBeforeRebuild = configurationService.getLatestHistoryRecordFor(trigger
228                             .getTriggerId());
229 
230                     boolean forceRebuildOfTriggers = false;
231                     if (latestHistoryBeforeRebuild == null) {
232                         reason = TriggerReBuildReason.NEW_TRIGGERS;
233                         forceRebuildOfTriggers = true;
234 
235                     } else if (TriggerHistory.calculateTableHashFor(table) != latestHistoryBeforeRebuild.getTableHash()) {
236                         reason = TriggerReBuildReason.TABLE_SCHEMA_CHANGED;
237                         forceRebuildOfTriggers = true;
238 
239                     } else if (trigger.hasChangedSinceLastTriggerBuild(latestHistoryBeforeRebuild.getCreateTime())
240                             || trigger.getHashedValue() != latestHistoryBeforeRebuild.getTriggerRowHash()) {
241                         reason = TriggerReBuildReason.TABLE_SYNC_CONFIGURATION_CHANGED;
242                         forceRebuildOfTriggers = true;
243                     }
244 
245                     // TODO should probably check to see if the time stamp on
246                     // the symmetric-dialects.xml is newer than the
247                     // create time on the hist record.
248 
249                     TriggerHistory newestHistory = rebuildTriggerIfNecessary(forceRebuildOfTriggers, trigger,
250                             DataEventType.DELETE, reason, latestHistoryBeforeRebuild, rebuildTriggerIfNecessary(
251                                     forceRebuildOfTriggers, trigger, DataEventType.UPDATE, reason,
252                                     latestHistoryBeforeRebuild, rebuildTriggerIfNecessary(forceRebuildOfTriggers,
253                                             trigger, DataEventType.INSERT, reason, latestHistoryBeforeRebuild, null,
254                                             trigger.isSyncOnInsert(), table), trigger.isSyncOnUpdate(), table), trigger
255                                     .isSyncOnDelete(), table);
256 
257                     if (latestHistoryBeforeRebuild != null && newestHistory != null) {
258                         configurationService.inactivateTriggerHistory(latestHistoryBeforeRebuild);
259                     }
260 
261                 } else {
262                     logger.error("The configured table does not exist in the datasource that is configured: "
263                             + schemaPlusTriggerName);
264                 }
265             } catch (Exception ex) {
266                 logger.error("Failed to synchronize trigger for " + schemaPlusTriggerName, ex);
267             }
268 
269         }
270     }
271 
272     @Deprecated
273     public void register() {
274         validateConfiguration();
275     }
276 
277     public void validateConfiguration() {
278         Node node = nodeService.findIdentity();
279         if (node == null && !configurationService.isRegistrationServer()) {
280             if (!parameterService.is(ParameterConstants.START_PULL_JOB)) {
281                 registrationService.registerWithServer();
282             }
283         } else if (node != null && parameterService.getExternalId().equals(node.getExternalId())
284                 && parameterService.getNodeGroupId().equals(node.getNodeGroupId())) {
285             heartbeat();
286         } else if (node == null) {
287             if (!loadFromScriptIfProvided()) {
288                 logger
289                         .info("Could not find my identity in the database and this node is configured as a registration server.  We are auto inserting the required rows to begin operation.");
290                 // TODO
291                 // nodeService.insertIdentity();
292             }
293         } else {
294             throw new IllegalStateException(
295                     "The configured state does not match recorded database state.  The recorded external id is "
296                             + node.getExternalId() + " while the configured external id is "
297                             + parameterService.getExternalId() + ".  The recorded node group id is "
298                             + node.getNodeGroupId() + " while the configured node group id is "
299                             + parameterService.getNodeGroupId());
300         }
301     }
302 
303     /***
304      * Give the end use the option to provide a script that will load a
305      * registration server with an initial SymmetricDS setup.
306      * 
307      * Look first on the file system, then in the classpath for the SQL file.
308      * 
309      * @return true if the script was executed
310      */
311     private boolean loadFromScriptIfProvided() {
312         boolean loaded = false;
313         String sqlScript = parameterService.getString(ParameterConstants.AUTO_CONFIGURE_REGISTRATION_SERVER_SQL_SCRIPT);
314         if (!StringUtils.isBlank(sqlScript)) {
315             File file = new File(sqlScript);
316             URL fileUrl = null;
317             if (file.isFile()) {
318                 try {
319                     fileUrl = file.toURL();
320                 } catch (MalformedURLException e) {
321                     throw new RuntimeException(e);
322                 }
323             } else {
324                 fileUrl = getClass().getResource(sqlScript);
325             }
326 
327             if (fileUrl != null) {
328                 new SqlScript(fileUrl, jdbcTemplate.getDataSource(), true).execute();
329                 loaded = true;
330             }
331         }
332         return loaded;
333     }
334 
335     @Transactional
336     public void heartbeat() {
337         Node node = nodeService.findIdentity();
338         if (node != null) {
339             logger.info("Updating my node information and heartbeat time.");
340             node.setHeartbeatTime(new Date());
341             node.setTimezoneOffset(AppUtils.getTimezoneOffset());
342             node.setDatabaseType(dbDialect.getName());
343             node.setDatabaseVersion(dbDialect.getVersion());
344             node.setSchemaVersion(parameterService.getString(ParameterConstants.SCHEMA_VERSION));
345             node.setExternalId(parameterService.getExternalId());
346             node.setNodeGroupId(parameterService.getNodeGroupId());
347             node.setSymmetricVersion(Version.version());
348             if (!StringUtils.isBlank(parameterService.getMyUrl())) {
349                 node.setSyncURL(parameterService.getMyUrl());
350             } else {
351                 node.setSyncURL(Constants.PROTOCOL_NONE + "://" + AppUtils.getServerId());
352             }
353             nodeService.updateNode(node);
354             logger.info("Done updating my node information and heartbeat time.");
355             if (!configurationService.isRegistrationServer()) {
356                 dataService.insertHeartbeatEvent(node);
357             }
358         }
359     }
360 
361     private TriggerHistory rebuildTriggerIfNecessary(boolean forceRebuild, Trigger trigger, DataEventType dmlType,
362             TriggerReBuildReason reason, TriggerHistory oldhist, TriggerHistory hist, boolean create, Table table) {
363 
364         boolean triggerExists = false;
365 
366         TriggerHistory newTriggerHist = new TriggerHistory(table, trigger, reason);
367         int maxTriggerNameLength = dbDialect.getMaxTriggerNameLength();
368         String triggerPrefix = parameterService.getString(ParameterConstants.RUNTIME_CONFIG_TRIGGER_PREFIX);
369         newTriggerHist.setNameForInsertTrigger(dbDialect.getTriggerName(DataEventType.INSERT, triggerPrefix, maxTriggerNameLength,
370                 trigger, hist).toUpperCase());
371         newTriggerHist.setNameForUpdateTrigger(dbDialect.getTriggerName(DataEventType.UPDATE, triggerPrefix, maxTriggerNameLength,
372                 trigger, hist).toUpperCase());
373         newTriggerHist.setNameForDeleteTrigger(dbDialect.getTriggerName(DataEventType.DELETE, triggerPrefix, maxTriggerNameLength,
374                 trigger, hist).toUpperCase());
375         
376         String oldTriggerName = null;
377         String oldSourceSchema = null;
378         String oldCatalogName = null;
379         if (oldhist != null) {
380             oldTriggerName = oldhist.getTriggerNameForDmlType(dmlType);
381             oldSourceSchema = oldhist.getSourceSchemaName();
382             oldCatalogName = oldhist.getSourceCatalogName();
383             triggerExists = dbDialect.doesTriggerExist(oldCatalogName, oldSourceSchema, oldhist.getSourceTableName(),
384                     oldTriggerName);
385         } else {
386             // We had no trigger_hist row, lets validate that the trigger as
387             // defined in the trigger
388             // does not exist as well.
389             oldTriggerName = newTriggerHist.getTriggerNameForDmlType(dmlType);
390             oldSourceSchema = trigger.getSourceSchemaName();
391             oldCatalogName = trigger.getSourceCatalogName();
392             triggerExists = dbDialect.doesTriggerExist(oldCatalogName, oldSourceSchema, trigger.getSourceTableName(),
393                     oldTriggerName);
394         }
395 
396         if (!triggerExists && forceRebuild) {
397             reason = TriggerReBuildReason.TRIGGERS_MISSING;
398         }
399 
400         if ((forceRebuild || !create) && triggerExists) {
401             dbDialect.removeTrigger(oldCatalogName, oldSourceSchema, oldTriggerName, trigger.getSourceTableName(), oldhist);
402             triggerExists = false;
403         }
404 
405         boolean isDeadTrigger = !trigger.isSyncOnInsert() && !trigger.isSyncOnUpdate() && !trigger.isSyncOnDelete();
406 
407         if (hist == null && (oldhist == null || (!triggerExists && create) || (isDeadTrigger && forceRebuild))) {
408             configurationService.insert(newTriggerHist);
409             hist = configurationService.getLatestHistoryRecordFor(trigger.getTriggerId());
410         }
411 
412         if (!triggerExists && create) {
413             dbDialect.initTrigger(dmlType, trigger, hist, tablePrefix, table);
414         }
415 
416         return hist;
417     }
418 
419     public void setConfigurationService(IConfigurationService configurationService) {
420         this.configurationService = configurationService;
421     }
422 
423     public void setDbDialect(IDbDialect dbDialect) {
424         this.dbDialect = dbDialect;
425     }
426 
427     public void setNodeService(INodeService nodeService) {
428         this.nodeService = nodeService;
429     }
430 
431     public void setTablePrefix(String tablePrefix) {
432         this.tablePrefix = tablePrefix;
433     }
434 
435     public void setDataService(IDataService dataService) {
436         this.dataService = dataService;
437     }
438 
439     public void setUpgradeService(IUpgradeService upgradeService) {
440         this.upgradeService = upgradeService;
441     }
442 
443     public void setClusterService(IClusterService clusterService) {
444         this.clusterService = clusterService;
445     }
446 
447     public void setRegistrationService(IRegistrationService registrationService) {
448         this.registrationService = registrationService;
449     }
450 
451     public void setDefaultChannels(List<Channel> defaultChannels) {
452         this.defaultChannels = defaultChannels;
453     }
454 
455 }