View Javadoc

1   /*
2    * SymmetricDS is an open source database synchronization solution.
3    *   
4    * Copyright (C) Chris Henson <chenson42@users.sourceforge.net>
5    *
6    * This library is free software; you can redistribute it and/or
7    * modify it under the terms of the GNU Lesser General Public
8    * License as published by the Free Software Foundation; either
9    * version 3 of the License, or (at your option) any later version.
10   *
11   * This library is distributed in the hope that it will be useful,
12   * but WITHOUT ANY WARRANTY; without even the implied warranty of
13   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14   * Lesser General Public License for more details.
15   *
16   * You should have received a copy of the GNU Lesser General Public
17   * License along with this library; if not, see
18   * <http://www.gnu.org/licenses/>.
19   */
20  package org.jumpmind.symmetric.test;
21  
22  import java.math.BigDecimal;
23  import java.sql.ResultSet;
24  import java.sql.SQLException;
25  import java.sql.Timestamp;
26  import java.sql.Types;
27  import java.text.ParseException;
28  import java.util.Date;
29  import java.util.List;
30  import java.util.Map;
31  
32  import org.apache.commons.lang.ArrayUtils;
33  import org.apache.commons.lang.StringUtils;
34  import org.apache.commons.lang.time.DateUtils;
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.ddlutils.model.Table;
38  import org.jumpmind.symmetric.common.Constants;
39  import org.jumpmind.symmetric.common.ParameterConstants;
40  import org.jumpmind.symmetric.db.IDbDialect;
41  import org.jumpmind.symmetric.db.db2.Db2DbDialect;
42  import org.jumpmind.symmetric.model.OutgoingBatch;
43  import org.jumpmind.symmetric.service.IConfigurationService;
44  import org.jumpmind.symmetric.service.INodeService;
45  import org.jumpmind.symmetric.service.IOutgoingBatchService;
46  import org.jumpmind.symmetric.service.IParameterService;
47  import org.jumpmind.symmetric.statistic.IStatisticManager;
48  import org.jumpmind.symmetric.test.ParameterizedSuite.ParameterExcluder;
49  import org.junit.Assert;
50  import org.junit.Test;
51  import org.springframework.jdbc.core.RowMapper;
52  
53  public class SimpleIntegrationTest extends AbstractIntegrationTest {
54  
55      static final Log logger = LogFactory.getLog(SimpleIntegrationTest.class);
56  
57      static final String insertOrderHeaderSql = "insert into test_order_header (order_id, customer_id, status, deliver_date) values(?,?,?,?)";
58  
59      static final String updateOrderHeaderStatusSql = "update test_order_header set status = ? where order_id = ?";
60  
61      static final String selectOrderHeaderSql = "select order_id, customer_id, status, deliver_date from test_order_header where order_id = ?";
62  
63      static final String insertOrderDetailSql = "insert into test_order_detail (order_id, line_number, item_type, item_id, quantity, price) values(?,?,?,?,?,?)";
64  
65      static final String insertCustomerSql = "insert into test_customer (customer_id, name, is_active, address, city, state, zip, entry_time, notes, icon) values(?,?,?,?,?,?,?,?,?,?)";
66  
67      static final String insertTestTriggerTableSql = "insert into test_triggers_table (id, string_one_value, string_two_value) values(?,?,?)";
68  
69      static final String updateTestTriggerTableSql = "update test_triggers_table set string_one_value=?";
70  
71      static final String insertStoreStatusSql = "insert into test_store_status (store_id, register_id, status) values(?,?,?)";
72  
73      static final String updateStoreStatusSql = "update test_store_status set status = ? where store_id = ? and register_id = ?";
74  
75      static final String selectStoreStatusSql = "select status from test_store_status where store_id = ? and register_id = ?";
76  
77      static final String enableKeyWordTriggerSql = "update sym_trigger set sync_on_insert = 1, sync_on_update = 1, sync_on_delete = 1 where source_table_name = 'test_key_word'";
78  
79      static final String alterKeyWordSql = "alter table test_key_word add \"key word\" char(1)";
80  
81      static final String alterKeyWordSql2 = "alter table test_key_word add \"case\" char(1)";
82  
83      static final String insertKeyWordSql = "insert into test_key_word (id, \"key word\", \"case\") values (?, ?, ?)";
84  
85      static final String updateKeyWordSql = "update test_key_word set \"key word\" = ?, \"case\" = ? where id = ?";
86  
87      static final String selectKeyWordSql = "select \"key word\", \"case\" from test_key_word where id = ?";
88  
89      static final String nullSyncColumnLevelSql = "update test_sync_column_level set string_value = null, time_value = null, date_value = null, bigint_value = null, decimal_value = null where id = ?";
90  
91      static final String deleteSyncColumnLevelSql = "delete from test_sync_column_level where id = ?";
92  
93      static final String updateSyncColumnLevelSql = "update test_sync_column_level set $(column) = ? where id = ?";
94  
95      static final String selectSyncColumnLevelSql = "select count(*) from test_sync_column_level where id = ? and $(column) = ?";
96  
97      static final String isRegistrationClosedSql = "select count(*) from sym_node_security where registration_enabled=0 and node_id=?";
98  
99      static final byte[] BINARY_DATA = new byte[] { 0x01, 0x02, 0x03 };
100 
101     public SimpleIntegrationTest() throws Exception {
102     }
103 
104     public SimpleIntegrationTest(String client, String root) throws Exception {
105         super(client, root);
106     }
107 
108     @Test(timeout = 30000)
109     public void registerClientWithRoot() {
110         getRootEngine().openRegistration(TestConstants.TEST_CLIENT_NODE_GROUP, TestConstants.TEST_CLIENT_EXTERNAL_ID);
111         getClientEngine().start();
112         Assert.assertTrue("The client did not register.", getClientEngine().isRegistered());
113         IStatisticManager statMgr = (IStatisticManager) getClientEngine().getApplicationContext().getBean(
114                 Constants.STATISTIC_MANAGER);
115         statMgr.flush();
116     }
117 
118     @Test(timeout = 30000)
119     public void initialLoad() {
120         IDbDialect rootDialect = getRootDbDialect();
121         rootJdbcTemplate.update(insertCustomerSql, new Object[] { 301, "Linus", "1", "42 Blanket Street",
122                 "Santa Claus", "IN", 90009, new Date(), "This is a test", BINARY_DATA });
123         insertIntoTestTriggerTable(rootDialect, new Object[] { 1, "wow", "mom" });
124         insertIntoTestTriggerTable(rootDialect, new Object[] { 2, "mom", "wow" });
125         INodeService nodeService = (INodeService) getRootEngine().getApplicationContext().getBean(
126                 Constants.NODE_SERVICE);
127         String nodeId = nodeService.findNodeByExternalId(TestConstants.TEST_CLIENT_NODE_GROUP,
128                 TestConstants.TEST_CLIENT_EXTERNAL_ID).getNodeId();
129         getRootEngine().reloadNode(nodeId);
130         IOutgoingBatchService outgoingBatchService = (IOutgoingBatchService) getRootEngine().getApplicationContext()
131                 .getBean(Constants.OUTGOING_BATCH_SERVICE);
132         Assert.assertFalse(outgoingBatchService.isInitialLoadComplete(nodeId));
133         getClientEngine().pull();
134         Assert.assertTrue(outgoingBatchService.isInitialLoadComplete(nodeId));
135         assertEquals(clientJdbcTemplate.queryForInt("select count(*) from sym_incoming_batch where status='ER'"), 0,
136                 "The initial load errored out." + printRootAndClientDatabases());
137         assertEquals(clientJdbcTemplate.queryForInt("select count(*) from test_triggers_table"), 2,
138                 "test_triggers_table on the client did not contain the expected number of rows");
139         assertEquals(clientJdbcTemplate.queryForInt("select count(*) from test_customer"), 2,
140                 "test_customer on the client did not contain the expected number of rows");
141         assertEquals(clientJdbcTemplate
142                 .queryForInt("select count(*) from sym_node_security where initial_load_enabled=1"), 0,
143                 "Initial load was not successful according to the client");
144         assertEquals(rootJdbcTemplate
145                 .queryForInt("select count(*) from sym_node_security where initial_load_enabled=1"), 0,
146                 "Initial load was not successful accordign to the root");
147     }
148 
149     private void insertIntoTestTriggerTable(IDbDialect dialect, Object[] values) {
150         Table testTriggerTable = dialect.getMetaDataFor(null, null, "test_triggers_table", true);
151         try {
152             dialect.prepareTableForDataLoad(testTriggerTable);
153             dialect.getJdbcTemplate().update(insertTestTriggerTableSql, values);
154         } finally {
155             dialect.cleanupAfterDataLoad(testTriggerTable);
156         }
157     }
158 
159     @Test(timeout = 30000)
160     public void syncToClient() {
161         // test pulling no data
162         getClientEngine().pull();
163 
164         final byte[] BIG_BINARY = new byte[200];
165         for (int i = 0; i < BIG_BINARY.length; i++) {
166             BIG_BINARY[i] = 0x01;
167         }
168 
169         // now change some data that should be sync'd
170         rootJdbcTemplate.update(insertCustomerSql, new Object[] { 101, "Charlie Brown", "1", "300 Grub Street",
171                 "New Yorl", "NY", 90009, new Date(), "This is a test", BIG_BINARY });
172 
173         getClientEngine().pull();
174         assertEquals(clientJdbcTemplate.queryForInt("select count(*) from test_customer where customer_id=101"), 1,
175                 "The customer was not sync'd to the client." + printRootAndClientDatabases());
176 
177         if (getRootDbDialect().isClobSyncSupported()) {
178             assertEquals(clientJdbcTemplate.queryForObject("select notes from test_customer where customer_id=101",
179                     String.class), "This is a test", "The CLOB notes field on customer was not sync'd to the client.");
180         }
181 
182         if (getRootDbDialect().isBlobSyncSupported()) {
183             byte[] data = (byte[]) clientJdbcTemplate.queryForObject(
184                     "select icon from test_customer where customer_id=101", new RowMapper() {
185                         public Object mapRow(ResultSet rs, int rowNum) throws SQLException {
186                             return rs.getBytes(1);
187                         }
188                     });
189             Assert.assertTrue("The BLOB icon field on customer was not sync'd to the client.", ArrayUtils.isEquals(
190                     data, BIG_BINARY));
191         }
192 
193     }
194 
195     @Test(timeout = 30000)
196     public void syncToRootAutoGeneratedPrimaryKey() {
197         final String NEW_VALUE = "unique new value one value";
198         IDbDialect clientDialect = getClientDbDialect();
199         insertIntoTestTriggerTable(clientDialect, new Object[] { 3, "value one", "value \" two" });
200         getClientEngine().push();
201         clientJdbcTemplate.update(updateTestTriggerTableSql, new Object[] { NEW_VALUE });
202         getClientEngine().push();
203         int syncCount = rootJdbcTemplate.queryForInt(
204                 "select count(*) from test_triggers_table where string_one_value=?", new Object[] { NEW_VALUE });
205         assertEquals(syncCount, 3, syncCount + " of the rows were updated");
206     }
207 
208     @Test(timeout = 30000)
209     public void reopenRegistration() {
210         getRootEngine().reOpenRegistration(TestConstants.TEST_CLIENT_EXTERNAL_ID);
211         getClientEngine().pull();
212         Assert.assertEquals(1, getRootDbDialect().getJdbcTemplate().queryForInt(isRegistrationClosedSql,
213                 new Object[] { TestConstants.TEST_CLIENT_EXTERNAL_ID }, new int[] { Types.VARCHAR }));
214     }
215 
216     private void assertEquals(Object actual, Object expected, String failureMessage) {
217         Assert.assertEquals(failureMessage, expected, actual);
218     }
219 
220     private boolean turnOnNoKeysInUpdateParameter(boolean newValue) {
221         IParameterService clientParameterService = (IParameterService) getClientEngine().getApplicationContext()
222                 .getBean(Constants.PARAMETER_SERVICE);
223         IParameterService rootParameterService = (IParameterService) getRootEngine().getApplicationContext().getBean(
224                 Constants.PARAMETER_SERVICE);
225         Assert.assertEquals(clientParameterService.is(ParameterConstants.DATA_LOADER_NO_KEYS_IN_UPDATE),
226                 rootParameterService.is(ParameterConstants.DATA_LOADER_NO_KEYS_IN_UPDATE));
227         boolean oldValue = clientParameterService.is(ParameterConstants.DATA_LOADER_NO_KEYS_IN_UPDATE);
228         clientParameterService.saveParameter(ParameterConstants.DATA_LOADER_NO_KEYS_IN_UPDATE, newValue);
229         rootParameterService.saveParameter(ParameterConstants.DATA_LOADER_NO_KEYS_IN_UPDATE, newValue);
230         return oldValue;
231     }
232 
233     @Test(timeout = 30000)
234     public void syncToRoot() throws ParseException {
235         turnOnNoKeysInUpdateParameter(true);
236         Date date = DateUtils.parseDate("2007-01-03", new String[] { "yyyy-MM-dd" });
237         clientJdbcTemplate.update(insertOrderHeaderSql, new Object[] { "10", 100, null, date }, new int[] {
238                 Types.VARCHAR, Types.INTEGER, Types.CHAR, Types.DATE });
239         clientJdbcTemplate.update(insertOrderDetailSql, new Object[] { "10", 1, "STK", "110000065", 3, 3.33 });
240         getClientEngine().push();
241     }
242 
243     @Test(timeout = 30000)
244     public void syncInsertCondition() throws ParseException {
245         // Should not sync when status = null
246         Date date = DateUtils.parseDate("2007-01-02", new String[] { "yyyy-MM-dd" });
247         rootJdbcTemplate.update(insertOrderHeaderSql, new Object[] { "11", 100, null, date }, new int[] {
248                 Types.VARCHAR, Types.INTEGER, Types.CHAR, Types.DATE });
249         getClientEngine().pull();
250 
251         IOutgoingBatchService outgoingBatchService = findOnRoot(Constants.OUTGOING_BATCH_SERVICE);
252         List<OutgoingBatch> batches = outgoingBatchService.getOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID);
253         assertEquals(batches.size(), 0, "There should be no outgoing batches, yet I found some.");
254 
255         assertEquals(clientJdbcTemplate.queryForList(selectOrderHeaderSql, new Object[] { "11" }).size(), 0,
256                 "The order record was sync'd when it should not have been.");
257 
258         // Should sync when status = C
259         rootJdbcTemplate.update(insertOrderHeaderSql, new Object[] { "12", 100, "C", date }, new int[] { Types.VARCHAR,
260                 Types.INTEGER, Types.CHAR, Types.DATE });
261         getClientEngine().pull();
262         assertEquals(clientJdbcTemplate.queryForList(selectOrderHeaderSql, new Object[] { "12" }).size(), 1,
263                 "The order record was not sync'd when it should have been.");
264         // TODO: make sure event did not fire
265     }
266 
267     @Test(timeout = 30000)
268     public void oneColumnTableWithPrimaryKeyUpdate() throws Exception {
269         boolean oldValue = turnOnNoKeysInUpdateParameter(true);
270         rootJdbcTemplate.update("insert into ONE_COLUMN_TABLE values(1)");
271         Assert
272                 .assertTrue(clientJdbcTemplate
273                         .queryForInt("select count(*) from ONE_COLUMN_TABLE where MY_ONE_COLUMN=1") == 0);
274         getClientEngine().pull();
275         Assert
276                 .assertTrue(clientJdbcTemplate
277                         .queryForInt("select count(*) from ONE_COLUMN_TABLE where MY_ONE_COLUMN=1") == 1);
278         rootJdbcTemplate.update("update ONE_COLUMN_TABLE set MY_ONE_COLUMN=1 where MY_ONE_COLUMN=1");
279         getClientEngine().pull();
280         IOutgoingBatchService outgoingBatchService = findOnRoot(Constants.OUTGOING_BATCH_SERVICE);
281         List<OutgoingBatch> batches = outgoingBatchService.getOutgoingBatches(TestConstants.TEST_CLIENT_EXTERNAL_ID);
282         assertEquals(batches.size(), 0, "There should be no outgoing batches, yet I found some.");
283         turnOnNoKeysInUpdateParameter(oldValue);
284     }
285 
286     @Test(timeout = 30000)
287     @SuppressWarnings("unchecked")
288     public void syncUpdateCondition() {
289         rootJdbcTemplate.update(updateOrderHeaderStatusSql, new Object[] { "I", "1" });
290         getClientEngine().pull();
291         assertEquals(clientJdbcTemplate.queryForList(selectOrderHeaderSql, new Object[] { "1" }).size(), 0,
292                 "The order record was sync'd when it should not have been.");
293 
294         rootJdbcTemplate.update(updateOrderHeaderStatusSql, new Object[] { "C", "1" });
295         getClientEngine().pull();
296         List list = clientJdbcTemplate.queryForList(selectOrderHeaderSql, new Object[] { "1" });
297         assertEquals(list.size(), 1, "The order record should exist.");
298         Map map = (Map) list.get(0);
299         assertEquals(map.get("status"), "C", "Status should be complete");
300         // TODO: make sure event did not fire
301     }
302 
303     @Test(timeout = 30000)
304     public void ignoreNodeChannel() {
305         INodeService nodeService = (INodeService) getRootEngine().getApplicationContext().getBean("nodeService");
306         IConfigurationService configService = (IConfigurationService) getRootEngine().getApplicationContext().getBean(
307                 "configurationService");
308         nodeService.ignoreNodeChannelForExternalId(true, TestConstants.TEST_CHANNEL_ID,
309                 TestConstants.TEST_ROOT_NODE_GROUP, TestConstants.TEST_ROOT_EXTERNAL_ID);
310         configService.flushChannels();
311         rootJdbcTemplate.update(insertCustomerSql, new Object[] { 201, "Charlie Dude", "1", "300 Grub Street",
312                 "New Yorl", "NY", 90009, new Date(), "This is a test", BINARY_DATA });
313         getClientEngine().pull();
314         assertEquals(clientJdbcTemplate.queryForInt("select count(*) from test_customer where customer_id=201"), 0,
315                 "The customer was sync'd to the client.");
316         nodeService.ignoreNodeChannelForExternalId(false, TestConstants.TEST_CHANNEL_ID,
317                 TestConstants.TEST_ROOT_NODE_GROUP, TestConstants.TEST_ROOT_EXTERNAL_ID);
318         configService.flushChannels();
319     }
320 
321     @Test(timeout = 30000)
322     public void syncUpdateWithEmptyKey() {
323         if (getClientDbDialect().isEmptyStringNulled()) {
324             return;
325         }
326         clientJdbcTemplate.update(insertStoreStatusSql, new Object[] { "00001", "", 1 });
327         getClientEngine().push();
328 
329         clientJdbcTemplate.update(updateStoreStatusSql, new Object[] { 2, "00001", "" });
330         getClientEngine().push();
331 
332         int status = rootJdbcTemplate.queryForInt(selectStoreStatusSql, new Object[] { "00001", "   " });
333         assertEquals(status, 2, "Wrong store status");
334     }
335 
336     @Test(timeout = 30000)
337     public void testPurge() throws Exception {
338         // do an extra pull to make sure we have events cleared out
339         getClientEngine().pull();
340         Thread.sleep(1000);
341         getRootEngine().purge();
342         getClientEngine().purge();
343         Assert.assertTrue("Expected most data rows to have been purged.", rootJdbcTemplate
344                 .queryForInt("select count(*) from " + TestConstants.TEST_PREFIX + "data") < 5);
345         Assert.assertTrue("Expected most data rows to have been purged.", clientJdbcTemplate
346                 .queryForInt("select count(*) from " + TestConstants.TEST_PREFIX + "data") < 5);
347     }
348 
349     @Test
350     public void testHeartbeat() throws Exception {
351         long ts = System.currentTimeMillis();
352         Thread.sleep(1000);
353         getClientEngine().heartbeat();
354         getClientEngine().push();
355         Date time = (Date) rootJdbcTemplate.queryForObject("select heartbeat_time from " + TestConstants.TEST_PREFIX
356                 + "node where external_id='" + TestConstants.TEST_CLIENT_EXTERNAL_ID + "'", Timestamp.class);
357         Assert.assertTrue("The client node was not sync'd to the root as expected.", time != null
358                 && time.getTime() > ts);
359     }
360 
361     @Test(timeout = 30000)
362     public void testVirtualTransactionId() {
363         rootJdbcTemplate.update("insert into test_very_long_table_name_1234 values('42')");
364         if (getRootDbDialect().isTransactionIdOverrideSupported()) {
365             assertEquals(rootJdbcTemplate.queryForObject(
366                     "select transaction_id from sym_data_event where data_id in (select max(data_id) from sym_data)",
367                     String.class), "42", "The hardcoded transaction id was not found.");
368             Assert.assertEquals(rootJdbcTemplate.update("delete from test_very_long_table_name_1234 where id='42'"), 1);
369             assertEquals(rootJdbcTemplate.queryForObject(
370                     "select transaction_id from sym_data_event where data_id in (select max(data_id) from sym_data)",
371                     String.class), "42", "The hardcoded transaction id was not found.");
372         }
373     }
374 
375     @Test(timeout = 30000)
376     public void testCaseSensitiveTableNames() {
377         rootJdbcTemplate.update("insert into TEST_ALL_CAPS values(1, 'HELLO')");
378         getClientEngine().pull();
379         assertEquals(clientJdbcTemplate.queryForInt("select count(*) from TEST_ALL_CAPS where ALL_CAPS_ID = 1"), 1,
380                 "Table name in all caps was not synced");
381         rootJdbcTemplate.update("insert into Test_Mixed_Case values(1, 'Hello')");
382         getClientEngine().pull();
383         assertEquals(clientJdbcTemplate.queryForInt("select count(*) from Test_Mixed_Case where Mixed_Case_Id = 1"), 1,
384                 "Table name in mixed case was not synced");
385     }
386 
387     /***
388      * TODO test on MSSQL
389      */
390     @Test(timeout = 30000)
391     @ParameterExcluder("mssql")
392     public void testNoPrimaryKeySync() {
393         rootJdbcTemplate.update("insert into NO_PRIMARY_KEY_TABLE values(1, 2, 'HELLO')");
394         getClientEngine().pull();
395         assertEquals(clientJdbcTemplate.queryForInt("select TWO_COLUMN from NO_PRIMARY_KEY_TABLE where ONE_COLUMN=1"),
396                 2, "Table was not synced");
397         rootJdbcTemplate.update("update NO_PRIMARY_KEY_TABLE set TWO_COLUMN=3 where ONE_COLUMN=1");
398         getClientEngine().pull();
399         assertEquals(clientJdbcTemplate.queryForInt("select TWO_COLUMN from NO_PRIMARY_KEY_TABLE where ONE_COLUMN=1"),
400                 3, "Table was not updated");
401         rootJdbcTemplate.update("delete from NO_PRIMARY_KEY_TABLE");
402         getClientEngine().pull();
403         assertEquals(clientJdbcTemplate.queryForInt("select count(*) from NO_PRIMARY_KEY_TABLE"), 0,
404                 "Table was not deleted from");
405     }
406 
407     @SuppressWarnings("unchecked")
408     @Test(timeout = 30000)
409     public void testReservedColumnNames() {
410         if (getRootDbDialect() instanceof Db2DbDialect || getClientDbDialect() instanceof Db2DbDialect) {
411             return;
412         }
413         // alter the table to have column names that are not usually allowed
414         String rquote = getRootDbDialect().getIdentifierQuoteString();
415         String cquote = getClientDbDialect().getIdentifierQuoteString();
416         rootJdbcTemplate.update(alterKeyWordSql.replaceAll("\"", rquote));
417         rootJdbcTemplate.update(alterKeyWordSql2.replaceAll("\"", rquote));
418         clientJdbcTemplate.update(alterKeyWordSql.replaceAll("\"", cquote));
419         clientJdbcTemplate.update(alterKeyWordSql2.replaceAll("\"", cquote));
420 
421         // enable the trigger for the table and update the client with
422         // configuration
423         rootJdbcTemplate.update(enableKeyWordTriggerSql);
424         getRootEngine().syncTriggers();
425         getRootEngine().reOpenRegistration(TestConstants.TEST_CLIENT_EXTERNAL_ID);
426         getClientEngine().pull();
427 
428         rootJdbcTemplate.update(insertKeyWordSql.replaceAll("\"", rquote), new Object[] { 1, "x", "a" });
429         getClientEngine().pull();
430 
431         rootJdbcTemplate.update(updateKeyWordSql.replaceAll("\"", rquote), new Object[] { "y", "b", 1 });
432         getClientEngine().pull();
433 
434         List rowList = clientJdbcTemplate.queryForList(selectKeyWordSql.replaceAll("\"", cquote),
435                 new Object[] { 1 });
436         Map columnMap = (Map) rowList.get(0);
437         assertEquals(columnMap.get("key word"), "y", "Wrong key word value in table");
438         assertEquals(columnMap.get("case"), "b", "Wrong case value in table");
439     }
440 
441     @Test(timeout = 30000)
442     public void testSyncColumnLevel() throws ParseException {
443         int id = 1;
444         String[] columns = { "id", "string_value", "time_value", "date_value", "bigint_value", "decimal_value" };
445         Object[] values = new Object[] { id, "moredata", getDate("2008-01-02 03:04:05"),
446                 getDate("2008-02-01 05:03:04"), 600, new BigDecimal("34.10") };
447 
448         // Null out columns, change each column and sync one at a time
449         clientJdbcTemplate.update(nullSyncColumnLevelSql, new Object[] { id });
450 
451         for (int i = 1; i < columns.length; i++) {
452             rootJdbcTemplate.update(replace("column", columns[i], updateSyncColumnLevelSql), new Object[] { values[i],
453                     id });
454             getClientEngine().pull();
455             assertEquals(clientJdbcTemplate.queryForInt(replace("column", columns[i], selectSyncColumnLevelSql),
456                     new Object[] { id, values[i] }), 1, "Table was not updated for column " + columns[i]);
457         }
458     }
459 
460     @Test(timeout = 30000)
461     public void testSyncColumnLevelTogether() throws ParseException {
462         int id = 1;
463         String[] columns = { "id", "string_value", "time_value", "date_value", "bigint_value", "decimal_value" };
464         Object[] values = new Object[] { id, "moredata", getDate("2008-01-02 03:04:05"),
465                 getDate("2008-02-01 05:03:04"), 600, new BigDecimal("34.10") };
466 
467         // Null out columns, change all columns, sync all together
468         rootJdbcTemplate.update(nullSyncColumnLevelSql, new Object[] { id });
469 
470         for (int i = 1; i < columns.length; i++) {
471             rootJdbcTemplate.update(replace("column", columns[i], updateSyncColumnLevelSql), new Object[] { values[i],
472                     id });
473         }
474         getClientEngine().pull();
475     }
476 
477     @Test(timeout = 30000)
478     public void testSyncColumnLevelFallback() throws ParseException {
479         int id = 1;
480         String[] columns = { "id", "string_value", "time_value", "date_value", "bigint_value", "decimal_value" };
481         Object[] values = new Object[] { id, "fallback on insert", getDate("2008-01-02 03:04:05"),
482                 getDate("2008-02-01 05:03:04"), 600, new BigDecimal("34.10") };
483 
484         // Force a fallback of an update to insert the row
485         clientJdbcTemplate.update(deleteSyncColumnLevelSql, new Object[] { id });
486         rootJdbcTemplate.update(replace("column", "string_value", updateSyncColumnLevelSql), new Object[] { values[1],
487                 id });
488         getClientEngine().pull();
489 
490         for (int i = 1; i < columns.length; i++) {
491             assertEquals(clientJdbcTemplate.queryForInt(replace("column", columns[i], selectSyncColumnLevelSql),
492                     new Object[] { id, values[i] }), 1, "Table was not updated for column " + columns[i]);
493         }
494     }
495 
496     @Test(timeout = 30000)
497     public void testSyncColumnLevelNoChange() throws ParseException {
498         int id = 1;
499 
500         // Change a column to the same value, which on some systems will be
501         // captured
502         rootJdbcTemplate.update(replace("column", "string_value", updateSyncColumnLevelSql),
503                 new Object[] { "same", id });
504         rootJdbcTemplate.update(replace("column", "string_value", updateSyncColumnLevelSql),
505                 new Object[] { "same", id });
506         clientJdbcTemplate.update(deleteSyncColumnLevelSql, new Object[] { id });
507         getClientEngine().pull();
508     }
509 
510     @Test(timeout = 30000)
511     public void cleanupAfterTests() {
512         getClientEngine().pull();
513         getClientEngine().purge();
514         getRootEngine().purge();
515     }
516 
517     private String replace(String prop, String replaceWith, String sourceString) {
518         return StringUtils.replace(sourceString, "$(" + prop + ")", replaceWith);
519     }
520 
521     private Date getDate(String dateString) throws ParseException {
522         if (!getClientDbDialect().isDateOverrideToTimestamp() || !getRootDbDialect().isDateOverrideToTimestamp()) {
523             return DateUtils.parseDate(dateString.split(" ")[0], new String[] { "yyyy-MM-dd" });
524         } else {
525             return DateUtils.parseDate(dateString, new String[] { "yyyy-MM-dd HH:mm:ss" });
526         }
527     }
528 
529     protected void testDeletes() {
530     }
531 
532     protected void testMultiRowInsert() {
533     }
534 
535     protected void testMultipleChannels() {
536     }
537 
538     protected void testChannelInError() {
539     }
540 
541     protected void testTableSyncConfigChangeForRoot() {
542     }
543 
544     protected void testTableSyncConfigChangeForClient() {
545     }
546 
547     protected void testDataChangeTableChangeDataChangeThenSync() {
548     }
549 
550     protected void testTransactionalCommit() {
551     }
552 
553     protected void testTransactionalCommitPastBatchBoundary() {
554     }
555 
556     protected void testSyncingGlobalParametersFromRoot() {
557     }
558 
559     protected void testRejectedRegistration() {
560     }
561 
562 }