1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.jumpmind.symmetric.transport.internal;
22
23 import java.io.IOException;
24 import java.io.InputStream;
25 import java.io.OutputStream;
26 import java.io.OutputStreamWriter;
27 import java.io.PipedInputStream;
28 import java.io.PipedOutputStream;
29 import java.io.PrintWriter;
30 import java.util.List;
31
32 import org.apache.commons.io.IOUtils;
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.jumpmind.symmetric.SymmetricEngine;
36 import org.jumpmind.symmetric.common.Constants;
37 import org.jumpmind.symmetric.model.BatchInfo;
38 import org.jumpmind.symmetric.model.IncomingBatchHistory;
39 import org.jumpmind.symmetric.model.Node;
40 import org.jumpmind.symmetric.model.NodeSecurity;
41 import org.jumpmind.symmetric.service.IAcknowledgeService;
42 import org.jumpmind.symmetric.service.IDataExtractorService;
43 import org.jumpmind.symmetric.service.IDataLoaderService;
44 import org.jumpmind.symmetric.service.IDataService;
45 import org.jumpmind.symmetric.service.INodeService;
46 import org.jumpmind.symmetric.service.IParameterService;
47 import org.jumpmind.symmetric.service.IRegistrationService;
48 import org.jumpmind.symmetric.transport.AbstractTransportManager;
49 import org.jumpmind.symmetric.transport.IIncomingTransport;
50 import org.jumpmind.symmetric.transport.IOutgoingTransport;
51 import org.jumpmind.symmetric.transport.IOutgoingWithResponseTransport;
52 import org.jumpmind.symmetric.transport.ITransportManager;
53 import org.springframework.beans.factory.BeanFactory;
54
55 /***
56 * Coordinates interaction between two symmetric engines in the same JVM.
57 */
58 public class InternalTransportManager extends AbstractTransportManager implements ITransportManager {
59
60 static final Log logger = LogFactory.getLog(InternalTransportManager.class);
61
62 private IParameterService parameterServer;
63
64 private INodeService nodeService;
65
66 public InternalTransportManager(INodeService nodeService, IParameterService config) {
67 this.parameterServer = config;
68 this.nodeService = nodeService;
69 }
70
71 public IIncomingTransport getPullTransport(final Node remote, final Node local) throws IOException {
72 final PipedOutputStream respOs = new PipedOutputStream();
73 final PipedInputStream respIs = new PipedInputStream(respOs);
74
75 runAtClient(remote.getSyncURL(), null, respOs, new IClientRunnable() {
76 public void run(BeanFactory factory, InputStream is, OutputStream os) throws Exception {
77
78
79 INodeService nodeService = (INodeService) factory.getBean(Constants.NODE_SERVICE);
80 NodeSecurity security = nodeService.findNodeSecurity(local.getNodeId());
81 if (security.isInitialLoadEnabled()) {
82 ((IDataService) factory.getBean(Constants.DATA_SERVICE)).insertReloadEvent(local);
83 }
84 IDataExtractorService extractor = (IDataExtractorService) factory
85 .getBean(Constants.DATAEXTRACTOR_SERVICE);
86 IOutgoingTransport transport = new InternalOutgoingTransport(respOs);
87 extractor.extract(local, transport);
88 transport.close();
89 }
90 });
91 return new InternalIncomingTransport(respIs);
92 }
93
94 public IOutgoingWithResponseTransport getPushTransport(final Node remote, final Node local) throws IOException {
95
96 final PipedOutputStream pushOs = new PipedOutputStream();
97 final PipedInputStream pushIs = new PipedInputStream(pushOs);
98
99 final PipedOutputStream respOs = new PipedOutputStream();
100 final PipedInputStream respIs = new PipedInputStream(respOs);
101
102 runAtClient(remote.getSyncURL(), pushIs, respOs, new IClientRunnable() {
103 public void run(BeanFactory factory, InputStream is, OutputStream os) throws Exception {
104
105 IDataLoaderService service = (IDataLoaderService) factory.getBean(Constants.DATALOADER_SERVICE);
106 service.loadData(pushIs, respOs);
107 }
108 });
109 return new InternalOutgoingWithResponseTransport(pushOs, respIs);
110 }
111
112 public IIncomingTransport getRegisterTransport(final Node client) throws IOException {
113
114 final PipedOutputStream respOs = new PipedOutputStream();
115 final PipedInputStream respIs = new PipedInputStream(respOs);
116
117 runAtClient(parameterServer.getRegistrationUrl(), null, respOs, new IClientRunnable() {
118 public void run(BeanFactory factory, InputStream is, OutputStream os) throws Exception {
119
120
121 IRegistrationService service = (IRegistrationService) factory.getBean(Constants.REGISTRATION_SERVICE);
122 service.registerNode(client, os, false);
123 }
124 });
125 return new InternalIncomingTransport(respIs);
126 }
127
128 public boolean sendAcknowledgement(Node remote, List<IncomingBatchHistory> list, Node local) throws IOException {
129 try {
130 if (list != null && list.size() > 0) {
131 SymmetricEngine remoteEngine = getTargetEngine(remote.getSyncURL());
132
133 String ackData = getAcknowledgementData(local.getNodeId(), list);
134 List<BatchInfo> batches = readAcknowledgement(ackData);
135 IAcknowledgeService service = (IAcknowledgeService) remoteEngine.getApplicationContext().getBean(
136 Constants.ACKNOWLEDGE_SERVICE);
137 for (BatchInfo batchInfo : batches) {
138 service.ack(batchInfo);
139 }
140
141 }
142 return true;
143 } catch (Exception ex) {
144 logger.error(ex, ex);
145 return false;
146 }
147 }
148
149 public void writeAcknowledgement(OutputStream out, List<IncomingBatchHistory> list) throws IOException {
150 PrintWriter pw = new PrintWriter(new OutputStreamWriter(out, Constants.ENCODING), true);
151 pw.println(getAcknowledgementData(nodeService.findIdentity().getNodeId(), list));
152 pw.close();
153 }
154
155 private void runAtClient(final String url, final InputStream is, final OutputStream os,
156 final IClientRunnable runnable) {
157 new Thread() {
158 public void run() {
159 try {
160 SymmetricEngine engine = getTargetEngine(url);
161 runnable.run(engine.getApplicationContext(), is, os);
162 } catch (Exception e) {
163 logger.error(e, e);
164 } finally {
165 IOUtils.closeQuietly(is);
166 IOUtils.closeQuietly(os);
167 }
168 }
169 }.start();
170 }
171
172 private SymmetricEngine getTargetEngine(String url) {
173 SymmetricEngine engine = SymmetricEngine.findEngineByUrl(url);
174 if (engine == null) {
175 throw new NullPointerException("Could not find the engine reference for the following url: " + url);
176 } else {
177 return engine;
178 }
179 }
180
181 interface IClientRunnable {
182 public void run(BeanFactory factory, InputStream is, OutputStream os) throws Exception;
183 }
184
185 }