1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.jumpmind.symmetric.service.impl;
23
24 import java.io.IOException;
25 import java.io.OutputStream;
26 import java.net.ConnectException;
27 import java.sql.Types;
28 import java.util.List;
29
30 import org.apache.commons.lang.time.DateUtils;
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.jumpmind.symmetric.common.Constants;
34 import org.jumpmind.symmetric.common.ParameterConstants;
35 import org.jumpmind.symmetric.db.IDbDialect;
36 import org.jumpmind.symmetric.model.Node;
37 import org.jumpmind.symmetric.model.NodeSecurity;
38 import org.jumpmind.symmetric.model.OutgoingBatch;
39 import org.jumpmind.symmetric.model.Trigger;
40 import org.jumpmind.symmetric.service.IAcknowledgeService;
41 import org.jumpmind.symmetric.service.IClusterService;
42 import org.jumpmind.symmetric.service.IConfigurationService;
43 import org.jumpmind.symmetric.service.IDataExtractorService;
44 import org.jumpmind.symmetric.service.IDataLoaderService;
45 import org.jumpmind.symmetric.service.IDataService;
46 import org.jumpmind.symmetric.service.INodeService;
47 import org.jumpmind.symmetric.service.IRegistrationService;
48 import org.jumpmind.symmetric.transport.IOutgoingTransport;
49 import org.jumpmind.symmetric.transport.ITransportManager;
50 import org.jumpmind.symmetric.transport.internal.InternalOutgoingTransport;
51 import org.jumpmind.symmetric.util.RandomTimeSlot;
52
53
54
55 public class RegistrationService extends AbstractService implements IRegistrationService {
56
57 protected static final Log logger = LogFactory.getLog(RegistrationService.class);
58
59 private INodeService nodeService;
60
61 private IDataExtractorService dataExtractorService;
62
63 private IAcknowledgeService acknowledgeService;
64
65 private IConfigurationService configurationService;
66
67 private IClusterService clusterService;
68
69 private IDataService dataService;
70
71 private IDataLoaderService dataLoaderService;
72
73 private ITransportManager transportManager;
74
75 private IDbDialect dbDialect;
76
77 /***
78 * Register a node for the given domain name and domain ID if the
79 * registration is open.
80 * @param isRequestedRegistration An indicator that registration has been requested by the remote client
81 */
82 public boolean registerNode(Node node, OutputStream out, boolean isRequestedRegistration) throws IOException {
83 if (!configurationService.isRegistrationServer()) {
84
85 NodeSecurity security = nodeService.findNodeSecurity(nodeService.findIdentity().getNodeId());
86 if (security != null && security.getInitialLoadTime() == null) {
87 logger.warn("Registration is not allowed until this node has an initial load");
88 return false;
89 }
90 }
91 String nodeId = findNodeToRegister(node.getNodeGroupId(), node.getExternalId());
92 if (nodeId == null && parameterService.is(ParameterConstants.AUTO_REGISTER_ENABLED)) {
93 Node existingNode = nodeService.findNodeByExternalId(node.getNodeGroupId(), node.getExternalId());
94 if (existingNode != null) {
95 nodeId = existingNode.getNodeId();
96 } else {
97 openRegistration(node.getNodeGroupId(), node.getExternalId());
98 nodeId = findNodeToRegister(node.getNodeGroupId(), node.getExternalId());
99 }
100 }
101 if (nodeId == null) {
102 return false;
103 }
104 node.setNodeId(nodeId);
105 jdbcTemplate.update(getSql("registerNodeSecuritySql"), new Object[] { node.getNodeId() });
106 jdbcTemplate.update(getSql("registerNodeSql"), new Object[] { node.getSyncURL(), node.getSchemaVersion(),
107 node.getDatabaseType(), node.getDatabaseVersion(), node.getSymmetricVersion(), node.getNodeId() },
108 new int[] { Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR });
109 boolean success = writeConfiguration(node, out);
110 if (success && parameterService.is(ParameterConstants.AUTO_RELOAD_ENABLED)) {
111
112 NodeSecurity security = nodeService.findNodeSecurity(node.getNodeId());
113 if ((security != null && security.getInitialLoadTime() == null) || isRequestedRegistration) {
114 dataService.reloadNode(node.getNodeId());
115 }
116 }
117 return success;
118 }
119
120 private String findNodeToRegister(String nodeGroupId, String externald) {
121 return (String) jdbcTemplate.queryForObject(getSql("findNodeToRegisterSql"), new Object[] { nodeGroupId,
122 externald }, String.class);
123 }
124
125 private void sleepBeforeRegistrationRetry() {
126 try {
127 RandomTimeSlot randomSleepTimeSlot = new RandomTimeSlot(parameterService
128 .getString(ParameterConstants.EXTERNAL_ID), 60);
129 long sleepTimeInMs = DateUtils.MILLIS_PER_SECOND * randomSleepTimeSlot.getRandomValueSeededByDomainId();
130 logger.warn("Could not register. Sleeping for " + sleepTimeInMs + "ms before attempting again.");
131 Thread.sleep(sleepTimeInMs);
132 } catch (InterruptedException e) {
133 }
134 }
135
136 public boolean isRegisteredWithServer() {
137 return nodeService.findIdentity() != null;
138 }
139
140 public void registerWithServer() {
141 boolean registered = isRegisteredWithServer();
142
143
144 while (!registered) {
145 try {
146 logger.info("Attempting to register with " + parameterService.getRegistrationUrl());
147 registered = dataLoaderService.loadData(transportManager.getRegisterTransport(new Node(
148 this.parameterService, dbDialect)));
149 } catch (ConnectException e) {
150 logger.warn("Connection failed while registering.");
151 } catch (Exception e) {
152 logger.error(e, e);
153 }
154
155 if (!registered) {
156 sleepBeforeRegistrationRetry();
157 } else {
158 Node node = nodeService.findIdentity();
159 if (node != null) {
160 logger.info("Successfully registered node [id=" + node.getNodeId() + "]");
161 } else {
162 logger.error("Node registration is unavailable");
163 }
164 }
165 }
166 }
167
168 /***
169 * Synchronize node configuration.
170 */
171 protected boolean writeConfiguration(Node node, OutputStream out) throws IOException {
172 boolean written = false;
173 IOutgoingTransport transport = new InternalOutgoingTransport(out);
174 List<String> tableNames = configurationService.getRootConfigChannelTableNames();
175 if (tableNames != null && tableNames.size() > 0) {
176 for (String tableName : tableNames) {
177 Trigger trigger = configurationService.getTriggerForTarget(tableName,
178 parameterService.getNodeGroupId(), node.getNodeGroupId(), Constants.CHANNEL_CONFIG);
179 if (trigger != null) {
180 OutgoingBatch batch = dataExtractorService.extractInitialLoadFor(node, trigger, transport);
181
182
183
184 acknowledgeService.ack(batch.getBatchInfo());
185 }
186 }
187 acknowledgeService.ack(dataExtractorService.extractNodeIdentityFor(node, transport).getBatchInfo());
188 written = true;
189 } else {
190 logger
191 .error("There were no configuration tables to return to the node. There is a good chance that the system is configured incorrectly.");
192 }
193 transport.close();
194 return written;
195 }
196
197 /***
198 * Re-open registration for a single node that already exists in the
199 * database. A new password is generated and the registration_enabled flag
200 * is turned on. The next node to try registering for this node group and
201 * external ID will be given this information.
202 */
203 public void reOpenRegistration(String nodeId) {
204 String password = nodeService.generatePassword();
205 Node node = nodeService.findNode(nodeId);
206 if (node != null) {
207 int updateCount = jdbcTemplate.update(getSql("reopenRegistrationSql"), new Object[] { password, nodeId });
208 if (updateCount == 0) {
209
210
211
212 jdbcTemplate.update(getSql("openRegistrationNodeSecuritySql"), new Object[] { nodeId, password });
213 }
214 }
215 }
216
217 /***
218 * Open registration for a single new node given a node group (f.e.,
219 * "STORE") and external ID (f.e., "00001"). The unique node ID and password
220 * are generated and stored in the node and node_security tables with the
221 * registration_enabled flag turned on. The next node to try registering for
222 * this node group and external ID will be given this information.
223 */
224 public void openRegistration(String nodeGroup, String externalId) {
225 String nodeId = nodeService.generateNodeId(nodeGroup, externalId);
226 String password = nodeService.generatePassword();
227 jdbcTemplate.update(getSql("openRegistrationNodeSql"), new Object[] { nodeId, nodeGroup, externalId });
228 jdbcTemplate.update(getSql("openRegistrationNodeSecuritySql"), new Object[] { nodeId, password });
229 clusterService.initLockTableForNode(nodeService.findNode(nodeId));
230 }
231
232 public void setNodeService(INodeService nodeService) {
233 this.nodeService = nodeService;
234 }
235
236 public void setDataExtractorService(IDataExtractorService dataExtractorService) {
237 this.dataExtractorService = dataExtractorService;
238 }
239
240 public void setConfigurationService(IConfigurationService configurationService) {
241 this.configurationService = configurationService;
242 }
243
244 public void setAcknowledgeService(IAcknowledgeService acknowledgeService) {
245 this.acknowledgeService = acknowledgeService;
246 }
247
248 public void setClusterService(IClusterService clusterService) {
249 this.clusterService = clusterService;
250 }
251
252 public void setDataService(IDataService dataService) {
253 this.dataService = dataService;
254 }
255
256 public boolean isAutoRegistration() {
257 return parameterService.is(ParameterConstants.AUTO_REGISTER_ENABLED);
258 }
259
260 public void setDataLoaderService(IDataLoaderService dataLoaderService) {
261 this.dataLoaderService = dataLoaderService;
262 }
263
264 public void setTransportManager(ITransportManager transportManager) {
265 this.transportManager = transportManager;
266 }
267
268 public void setDbDialect(IDbDialect dbDialect) {
269 this.dbDialect = dbDialect;
270 }
271
272 }