View Javadoc

1   /*
2    * SymmetricDS is an open source database synchronization solution.
3    *   
4    * Copyright (C) Chris Henson <chenson42@users.sourceforge.net>
5    *
6    * This library is free software; you can redistribute it and/or
7    * modify it under the terms of the GNU Lesser General Public
8    * License as published by the Free Software Foundation; either
9    * version 3 of the License, or (at your option) any later version.
10   *
11   * This library is distributed in the hope that it will be useful,
12   * but WITHOUT ANY WARRANTY; without even the implied warranty of
13   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14   * Lesser General Public License for more details.
15   *
16   * You should have received a copy of the GNU Lesser General Public
17   * License along with this library; if not, see
18   * <http://www.gnu.org/licenses/>.
19   */
20  
21  package org.jumpmind.symmetric.transport.internal;
22  
23  import java.io.IOException;
24  import java.io.InputStream;
25  import java.io.OutputStream;
26  import java.io.OutputStreamWriter;
27  import java.io.PipedInputStream;
28  import java.io.PipedOutputStream;
29  import java.io.PrintWriter;
30  import java.util.List;
31  
32  import org.apache.commons.io.IOUtils;
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.jumpmind.symmetric.SymmetricEngine;
36  import org.jumpmind.symmetric.common.Constants;
37  import org.jumpmind.symmetric.model.BatchInfo;
38  import org.jumpmind.symmetric.model.IncomingBatchHistory;
39  import org.jumpmind.symmetric.model.Node;
40  import org.jumpmind.symmetric.model.NodeSecurity;
41  import org.jumpmind.symmetric.service.IAcknowledgeService;
42  import org.jumpmind.symmetric.service.IDataExtractorService;
43  import org.jumpmind.symmetric.service.IDataLoaderService;
44  import org.jumpmind.symmetric.service.IDataService;
45  import org.jumpmind.symmetric.service.INodeService;
46  import org.jumpmind.symmetric.service.IParameterService;
47  import org.jumpmind.symmetric.service.IRegistrationService;
48  import org.jumpmind.symmetric.transport.AbstractTransportManager;
49  import org.jumpmind.symmetric.transport.IIncomingTransport;
50  import org.jumpmind.symmetric.transport.IOutgoingTransport;
51  import org.jumpmind.symmetric.transport.IOutgoingWithResponseTransport;
52  import org.jumpmind.symmetric.transport.ITransportManager;
53  import org.springframework.beans.factory.BeanFactory;
54  
55  /***
56   * Coordinates interaction between two symmetric engines in the same JVM.
57   */
58  public class InternalTransportManager extends AbstractTransportManager implements ITransportManager {
59  
60      static final Log logger = LogFactory.getLog(InternalTransportManager.class);
61  
62      private IParameterService parameterServer;
63  
64      private INodeService nodeService;
65  
66      public InternalTransportManager(INodeService nodeService, IParameterService config) {
67          this.parameterServer = config;
68          this.nodeService = nodeService;
69      }
70  
71      public IIncomingTransport getPullTransport(final Node remote, final Node local) throws IOException {
72          final PipedOutputStream respOs = new PipedOutputStream();
73          final PipedInputStream respIs = new PipedInputStream(respOs);
74  
75          runAtClient(remote.getSyncURL(), null, respOs, new IClientRunnable() {
76              public void run(BeanFactory factory, InputStream is, OutputStream os) throws Exception {
77                  // TODO this is duplicated from the Pull Servlet. It should be
78                  // consolidated somehow!
79                  INodeService nodeService = (INodeService) factory.getBean(Constants.NODE_SERVICE);
80                  NodeSecurity security = nodeService.findNodeSecurity(local.getNodeId());
81                  if (security.isInitialLoadEnabled()) {
82                      ((IDataService) factory.getBean(Constants.DATA_SERVICE)).insertReloadEvent(local);
83                  }
84                  IDataExtractorService extractor = (IDataExtractorService) factory
85                          .getBean(Constants.DATAEXTRACTOR_SERVICE);
86                  IOutgoingTransport transport = new InternalOutgoingTransport(respOs);
87                  extractor.extract(local, transport);
88                  transport.close();
89              }
90          });
91          return new InternalIncomingTransport(respIs);
92      }
93  
94      public IOutgoingWithResponseTransport getPushTransport(final Node remote, final Node local) throws IOException {
95  
96          final PipedOutputStream pushOs = new PipedOutputStream();
97          final PipedInputStream pushIs = new PipedInputStream(pushOs);
98  
99          final PipedOutputStream respOs = new PipedOutputStream();
100         final PipedInputStream respIs = new PipedInputStream(respOs);
101 
102         runAtClient(remote.getSyncURL(), pushIs, respOs, new IClientRunnable() {
103             public void run(BeanFactory factory, InputStream is, OutputStream os) throws Exception {
104                 // This should be basically what the push servlet does ...
105                 IDataLoaderService service = (IDataLoaderService) factory.getBean(Constants.DATALOADER_SERVICE);
106                 service.loadData(pushIs, respOs);
107             }
108         });
109         return new InternalOutgoingWithResponseTransport(pushOs, respIs);
110     }
111 
112     public IIncomingTransport getRegisterTransport(final Node client) throws IOException {
113 
114         final PipedOutputStream respOs = new PipedOutputStream();
115         final PipedInputStream respIs = new PipedInputStream(respOs);
116 
117         runAtClient(parameterServer.getRegistrationUrl(), null, respOs, new IClientRunnable() {
118             public void run(BeanFactory factory, InputStream is, OutputStream os) throws Exception {
119                 // This should be basically what the registration servlet does
120                 // ...
121                 IRegistrationService service = (IRegistrationService) factory.getBean(Constants.REGISTRATION_SERVICE);
122                 service.registerNode(client, os, false);
123             }
124         });
125         return new InternalIncomingTransport(respIs);
126     }
127 
128     public boolean sendAcknowledgement(Node remote, List<IncomingBatchHistory> list, Node local) throws IOException {
129         try {
130             if (list != null && list.size() > 0) {
131                 SymmetricEngine remoteEngine = getTargetEngine(remote.getSyncURL());
132 
133                 String ackData = getAcknowledgementData(local.getNodeId(), list);
134                 List<BatchInfo> batches = readAcknowledgement(ackData);
135                 IAcknowledgeService service = (IAcknowledgeService) remoteEngine.getApplicationContext().getBean(
136                         Constants.ACKNOWLEDGE_SERVICE);
137                 for (BatchInfo batchInfo : batches) {
138                     service.ack(batchInfo);
139                 }
140 
141             }
142             return true;
143         } catch (Exception ex) {
144             logger.error(ex, ex);
145             return false;
146         }
147     }
148 
149     public void writeAcknowledgement(OutputStream out, List<IncomingBatchHistory> list) throws IOException {
150         PrintWriter pw = new PrintWriter(new OutputStreamWriter(out, ENCODING), true);
151         pw.println(getAcknowledgementData(nodeService.findIdentity().getNodeId(), list));
152         pw.close();
153     }
154 
155     private void runAtClient(final String url, final InputStream is, final OutputStream os,
156             final IClientRunnable runnable) {
157         new Thread() {
158             public void run() {
159                 try {
160                     SymmetricEngine engine = getTargetEngine(url);
161                     runnable.run(engine.getApplicationContext(), is, os);
162                 } catch (Exception e) {
163                     logger.error(e, e);
164                 } finally {
165                     IOUtils.closeQuietly(is);
166                     IOUtils.closeQuietly(os);
167                 }
168             }
169         }.start();
170     }
171 
172     private SymmetricEngine getTargetEngine(String url) {
173         SymmetricEngine engine = SymmetricEngine.findEngineByUrl(url);
174         if (engine == null) {
175             throw new NullPointerException("Could not find the engine reference for the following url: " + url);
176         } else {
177             return engine;
178         }
179     }
180 
181     interface IClientRunnable {
182         public void run(BeanFactory factory, InputStream is, OutputStream os) throws Exception;
183     }
184 
185 }