View Javadoc

1   /*
2    * SymmetricDS is an open source database synchronization solution.
3    *   
4    * Copyright (C) Eric Long <erilong@users.sourceforge.net>,
5    *               Chris Henson <chenson42@users.sourceforge.net>
6    *
7    * This library is free software; you can redistribute it and/or
8    * modify it under the terms of the GNU Lesser General Public
9    * License as published by the Free Software Foundation; either
10   * version 3 of the License, or (at your option) any later version.
11   *
12   * This library is distributed in the hope that it will be useful,
13   * but WITHOUT ANY WARRANTY; without even the implied warranty of
14   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15   * Lesser General Public License for more details.
16   *
17   * You should have received a copy of the GNU Lesser General Public
18   * License along with this library; if not, see
19   * <http://www.gnu.org/licenses/>.
20   */
21  
22  package org.jumpmind.symmetric.service.impl;
23  
24  import java.io.IOException;
25  import java.io.OutputStream;
26  import java.net.ConnectException;
27  import java.sql.Types;
28  import java.util.List;
29  
30  import org.apache.commons.lang.time.DateUtils;
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.jumpmind.symmetric.common.Constants;
34  import org.jumpmind.symmetric.common.ParameterConstants;
35  import org.jumpmind.symmetric.db.IDbDialect;
36  import org.jumpmind.symmetric.model.Node;
37  import org.jumpmind.symmetric.model.NodeSecurity;
38  import org.jumpmind.symmetric.model.OutgoingBatch;
39  import org.jumpmind.symmetric.model.Trigger;
40  import org.jumpmind.symmetric.service.IAcknowledgeService;
41  import org.jumpmind.symmetric.service.IClusterService;
42  import org.jumpmind.symmetric.service.IConfigurationService;
43  import org.jumpmind.symmetric.service.IDataExtractorService;
44  import org.jumpmind.symmetric.service.IDataLoaderService;
45  import org.jumpmind.symmetric.service.IDataService;
46  import org.jumpmind.symmetric.service.INodeService;
47  import org.jumpmind.symmetric.service.IRegistrationService;
48  import org.jumpmind.symmetric.transport.IOutgoingTransport;
49  import org.jumpmind.symmetric.transport.ITransportManager;
50  import org.jumpmind.symmetric.transport.internal.InternalOutgoingTransport;
51  import org.jumpmind.symmetric.util.RandomTimeSlot;
52  
53  // TODO: NodeService already does all this DML. Should use NodeService or move
54  // methods to there.
55  public class RegistrationService extends AbstractService implements IRegistrationService {
56  
57      protected static final Log logger = LogFactory.getLog(RegistrationService.class);
58  
59      private INodeService nodeService;
60  
61      private IDataExtractorService dataExtractorService;
62  
63      private IAcknowledgeService acknowledgeService;
64  
65      private IConfigurationService configurationService;
66  
67      private IClusterService clusterService;
68  
69      private IDataService dataService;
70  
71      private IDataLoaderService dataLoaderService;
72  
73      private ITransportManager transportManager;
74  
75      private IDbDialect dbDialect;
76  
77      /***
78       * Register a node for the given domain name and domain ID if the
79       * registration is open.
80       * @param isRequestedRegistration An indicator that registration has been requested by the remote client
81       */
82      public boolean registerNode(Node node, OutputStream out, boolean isRequestedRegistration) throws IOException {
83          if (!configurationService.isRegistrationServer()) {
84              // registration is not allowed until this node has an initial load
85              NodeSecurity security = nodeService.findNodeSecurity(nodeService.findIdentity().getNodeId());
86              if (security != null && security.getInitialLoadTime() == null) {
87                  logger.warn("Registration is not allowed until this node has an initial load");
88                  return false;
89              }
90          }
91          String nodeId = findNodeToRegister(node.getNodeGroupId(), node.getExternalId());
92          if (nodeId == null && parameterService.is(ParameterConstants.AUTO_REGISTER_ENABLED)) {
93              Node existingNode = nodeService.findNodeByExternalId(node.getNodeGroupId(), node.getExternalId());
94              if (existingNode != null) {
95                  nodeId = existingNode.getNodeId();
96              } else {
97                  openRegistration(node.getNodeGroupId(), node.getExternalId());
98                  nodeId = findNodeToRegister(node.getNodeGroupId(), node.getExternalId());
99              }
100         }
101         if (nodeId == null) {
102             return false;
103         }
104         node.setNodeId(nodeId);
105         jdbcTemplate.update(getSql("registerNodeSecuritySql"), new Object[] { node.getNodeId() });
106         jdbcTemplate.update(getSql("registerNodeSql"), new Object[] { node.getSyncURL(), node.getSchemaVersion(),
107                 node.getDatabaseType(), node.getDatabaseVersion(), node.getSymmetricVersion(), node.getNodeId() },
108                 new int[] { Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR });
109         boolean success = writeConfiguration(node, out);
110         if (success && parameterService.is(ParameterConstants.AUTO_RELOAD_ENABLED)) {
111             // only send automatic initial load once or if the client is really re-registering
112             NodeSecurity security = nodeService.findNodeSecurity(node.getNodeId());
113             if ((security != null && security.getInitialLoadTime() == null) || isRequestedRegistration) {
114                 dataService.reloadNode(node.getNodeId());
115             }
116         }
117         return success;
118     }
119 
120     private String findNodeToRegister(String nodeGroupId, String externald) {
121         return (String) jdbcTemplate.queryForObject(getSql("findNodeToRegisterSql"), new Object[] { nodeGroupId,
122                 externald }, String.class);
123     }
124 
125     private void sleepBeforeRegistrationRetry() {
126         try {
127             RandomTimeSlot randomSleepTimeSlot = new RandomTimeSlot(parameterService
128                     .getString(ParameterConstants.EXTERNAL_ID), 60);
129             long sleepTimeInMs = DateUtils.MILLIS_PER_SECOND * randomSleepTimeSlot.getRandomValueSeededByDomainId();
130             logger.warn("Could not register.  Sleeping for " + sleepTimeInMs + "ms before attempting again.");
131             Thread.sleep(sleepTimeInMs);
132         } catch (InterruptedException e) {
133         }
134     }
135 
136     public boolean isRegisteredWithServer() {
137         return nodeService.findIdentity() != null;
138     }
139 
140     public void registerWithServer() {
141         boolean registered = isRegisteredWithServer();
142         // If we cannot contact the server to register, we simply must wait and
143         // try again.
144         while (!registered) {
145             try {
146                 logger.info("Attempting to register with " + parameterService.getRegistrationUrl());
147                 registered = dataLoaderService.loadData(transportManager.getRegisterTransport(new Node(
148                         this.parameterService, dbDialect)));
149             } catch (ConnectException e) {
150                 logger.warn("Connection failed while registering.");
151             } catch (Exception e) {
152                 logger.error(e, e);
153             }
154 
155             if (!registered) {
156                 sleepBeforeRegistrationRetry();
157             } else {
158                 Node node = nodeService.findIdentity();
159                 if (node != null) {
160                     logger.info("Successfully registered node [id=" + node.getNodeId() + "]");
161                 } else {
162                     logger.error("Node registration is unavailable");
163                 }
164             }
165         }
166     }
167 
168     /***
169      * Synchronize node configuration.
170      */
171     protected boolean writeConfiguration(Node node, OutputStream out) throws IOException {
172         boolean written = false;
173         IOutgoingTransport transport = new InternalOutgoingTransport(out);
174         List<String> tableNames = configurationService.getRootConfigChannelTableNames();
175         if (tableNames != null && tableNames.size() > 0) {
176             for (String tableName : tableNames) {
177                 Trigger trigger = configurationService.getTriggerForTarget(tableName,
178                         parameterService.getNodeGroupId(), node.getNodeGroupId(), Constants.CHANNEL_CONFIG);
179                 if (trigger != null) {
180                     OutgoingBatch batch = dataExtractorService.extractInitialLoadFor(node, trigger, transport);
181                     // acknowledge right away, because the acknowledgment is not
182                     // build into the registration
183                     // protocol.
184                     acknowledgeService.ack(batch.getBatchInfo());
185                 }
186             }
187             acknowledgeService.ack(dataExtractorService.extractNodeIdentityFor(node, transport).getBatchInfo());
188             written = true;
189         } else {
190             logger
191                     .error("There were no configuration tables to return to the node.  There is a good chance that the system is configured incorrectly.");
192         }
193         transport.close();
194         return written;
195     }
196 
197     /***
198      * Re-open registration for a single node that already exists in the
199      * database. A new password is generated and the registration_enabled flag
200      * is turned on. The next node to try registering for this node group and
201      * external ID will be given this information.
202      */
203     public void reOpenRegistration(String nodeId) {
204         String password = nodeService.generatePassword();
205         Node node = nodeService.findNode(nodeId);
206         if (node != null) {
207             int updateCount = jdbcTemplate.update(getSql("reopenRegistrationSql"), new Object[] { password, nodeId });
208             if (updateCount == 0) {
209                 // if the update count was 0, then we probably have a row in the
210                 // node table, but not in node security.
211                 // lets go ahead and try to insert into node security.
212                 jdbcTemplate.update(getSql("openRegistrationNodeSecuritySql"), new Object[] { nodeId, password });
213             }
214         }
215     }
216 
217     /***
218      * Open registration for a single new node given a node group (f.e.,
219      * "STORE") and external ID (f.e., "00001"). The unique node ID and password
220      * are generated and stored in the node and node_security tables with the
221      * registration_enabled flag turned on. The next node to try registering for
222      * this node group and external ID will be given this information.
223      */
224     public void openRegistration(String nodeGroup, String externalId) {
225         String nodeId = nodeService.generateNodeId(nodeGroup, externalId);
226         String password = nodeService.generatePassword();
227         jdbcTemplate.update(getSql("openRegistrationNodeSql"), new Object[] { nodeId, nodeGroup, externalId });
228         jdbcTemplate.update(getSql("openRegistrationNodeSecuritySql"), new Object[] { nodeId, password });
229         clusterService.initLockTableForNode(nodeService.findNode(nodeId));
230     }
231 
232     public void setNodeService(INodeService nodeService) {
233         this.nodeService = nodeService;
234     }
235 
236     public void setDataExtractorService(IDataExtractorService dataExtractorService) {
237         this.dataExtractorService = dataExtractorService;
238     }
239 
240     public void setConfigurationService(IConfigurationService configurationService) {
241         this.configurationService = configurationService;
242     }
243 
244     public void setAcknowledgeService(IAcknowledgeService acknowledgeService) {
245         this.acknowledgeService = acknowledgeService;
246     }
247 
248     public void setClusterService(IClusterService clusterService) {
249         this.clusterService = clusterService;
250     }
251 
252     public void setDataService(IDataService dataService) {
253         this.dataService = dataService;
254     }
255 
256     public boolean isAutoRegistration() {
257         return parameterService.is(ParameterConstants.AUTO_REGISTER_ENABLED);
258     }
259 
260     public void setDataLoaderService(IDataLoaderService dataLoaderService) {
261         this.dataLoaderService = dataLoaderService;
262     }
263 
264     public void setTransportManager(ITransportManager transportManager) {
265         this.transportManager = transportManager;
266     }
267 
268     public void setDbDialect(IDbDialect dbDialect) {
269         this.dbDialect = dbDialect;
270     }
271 
272 }