
initial_load_select expression doesn't work

Forum: Help
Started by Shen Flik on 2014-11-26 · last updated 2014-12-04
  • Shen Flik

    Shen Flik - 2014-11-26

    Hi folks,

    I'm trying to sync customer information between a central database and regional databases according to the region code each customer belongs to. Each region node's external_id is configured to be the same as its region code. Normal data changes are captured and synced properly; however, the initial load doesn't work as expected. To ensure that only the customers belonging to a region are loaded into that region, the following initial_load_select is set. addr_code is a column of the customer table, and its first 6 digits represent the region code.

    INSERT INTO sym_trigger_router (trigger_id,router_id,initial_load_order,initial_load_select,last_update_time,create_time)
    VALUES('global_customer','CENTRAL_2_REGION_column',700,'left(addr_code, 6)=''$(externalId)''',current_timestamp,current_timestamp);
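
    As far as I understand, this clause is ANDed into the WHERE clause of the extraction query, with $(externalId) replaced by the target node's external id. So for the region whose external id is 310104, the extraction should behave roughly like this (just a sketch, not the exact internal SQL):

    SELECT *
    FROM customer
    WHERE left(addr_code, 6) = '310104';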
    

    Does initial_load_select support SQL functions or sub-queries? Any suggestions?

    I'm using SymmetricDS 3.6.11 with MySQL 5.6.

    Thanks in advance.
    Flik

     
  • Eric Long

    Eric Long - 2014-11-28

    That looks correct. It will add that SQL to the select statement during the initial load. Remember that initial load and change capture are different parts of the synchronization. Changes will go through your router, and the default router sends all changes. I wonder if you want to do that filtering in the router instead. If you use a bsh router, I think it would be 'if (ADDR_CODE == null) return false; else return ADDR_CODE.substring(0, 6);'
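
    For example, a bsh router could be configured like this. Just a sketch: the router_id is a placeholder, and it assumes your region node ids equal the six-digit region codes (the bsh router routes by node id via the targetNodes variable), so adapt it to your setup:

    INSERT INTO sym_router (router_id,source_node_group_id,target_node_group_id,router_type,router_expression,create_time,last_update_time)
    VALUES('CENTRAL_2_REGION_bsh','CENTRAL','REGION','bsh','if (ADDR_CODE == null) return false; targetNodes.add(ADDR_CODE.substring(0, 6));',current_timestamp,current_timestamp);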

     
  • Shen Flik

    Shen Flik - 2014-12-02

    Hi Eric,

    Many thanks for your reply.
    My trigger with an external select, and my router (a column match router), are as follows.

    INSERT INTO sym_trigger (trigger_id,source_catalog_name,source_schema_name,source_table_name,channel_id,external_select,excluded_column_names,last_update_time,create_time)
    VALUES('global_customer','centralDB',null,'customer','customer','SELECT left($(curTriggerValue).$(curColumnPrefix)addr_code, 6)','service_type,has_doc,desc',current_timestamp,current_timestamp);
    
    INSERT INTO sym_router (router_id,target_catalog_name,target_schema_name,source_node_group_id,target_node_group_id,router_type,router_expression,use_source_catalog_schema,create_time,last_update_time)
    VALUES('CENTRAL_2_REGION_column','localDB',null,'CENTRAL','REGION','column','EXTERNAL_DATA=:EXTERNAL_ID',0,current_timestamp,current_timestamp);
    

    However, even though changes are captured and replicated properly, the initial load doesn't load any data into the region. I also saw 212 rows in about 20 batches during the initial load. How can I find out exactly what data is processed by this approach, and why it ultimately loads nothing into the remote region?
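
    One way I can think of to see what is being processed is to query the runtime tables on the central node directly, e.g. (a rough sketch; filter by node and channel as needed):

    SELECT batch_id, node_id, channel_id, status, data_event_count
    FROM sym_outgoing_batch
    ORDER BY batch_id DESC;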

    Regards,
    Flik

     
  • Shen Flik

    Shen Flik - 2014-12-02

    Hi Eric,

    Do you think the router_expression could be the problem? Since the expression is set to "EXTERNAL_DATA=:EXTERNAL_ID", change capture works fine, because the external data column is populated by the external select. However, during the initial load nothing may be populated into the external data column, in which case no row will match.
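
    One way to check this guess might be to look at the reload event in sym_data. If I remember correctly, the initial load inserts an event with event_type 'R', so we could see whether its external_data is empty (a sketch):

    SELECT data_id, event_type, external_data
    FROM sym_data
    WHERE table_name = 'customer' AND event_type = 'R'
    ORDER BY data_id DESC;
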
    Is it possible to configure a new trigger that is used only for RELOAD?
    What do you think?

    Regards,
    Flik

     
  • Shen Flik

    Shen Flik - 2014-12-03

    Hi,

    I added another router for the initial load from the central database to the region database, as follows (the sync_on_insert/update/delete flags are all 0, so this router is not used for change capture).

    INSERT INTO sym_router (router_id,target_catalog_name,target_schema_name,source_node_group_id,target_node_group_id,router_type,router_expression,sync_on_update,sync_on_insert,sync_on_delete,use_source_catalog_schema,create_time,last_update_time)
    VALUES('CENTRAL_2_REGION_reload','centralDB',null,'CENTRAL','REGION','default',null,0,0,0,0,current_timestamp,current_timestamp);
    
    INSERT INTO sym_trigger_router (trigger_id,router_id,initial_load_order,initial_load_select,last_update_time,create_time)
    VALUES('global_customer','CENTRAL_2_REGION_reload',700,'left(addr_code, 6)=''$(externalId)''',current_timestamp,current_timestamp);
    

    I also enabled the runtime parameter initial.load.delete.first to ensure the target table is cleaned up before the load.
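
    For reference, one way to set that parameter is through sym_parameter (it can also go in the engine properties file); 'ALL','ALL' here means it applies to all nodes and node groups:

    INSERT INTO sym_parameter (external_id,node_group_id,param_key,param_value,create_time,last_update_time)
    VALUES('ALL','ALL','initial.load.delete.first','true',current_timestamp,current_timestamp);
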
    It seems the data is finally sent out to the region, but the following errors occurred even though all the tables had been truncated. Is the conflict caused by having two routers?

    2014-12-03 16:07:55,763 INFO [REGION-310104] [IncomingBatchService] [region-310104-pull-1] Retrying batch central-118
    2014-12-03 16:07:55,850 INFO [REGION-310104] [DefaultDatabaseWriter] [region-310104-pull-1] Failed to process a update event in batch 118.
    Failed pk data was: "9505"
    Failed row data was: "9505","","","1","baba..","0","","0","avatar/2012/11/0d74c577eb63c7f17ea71bf2f50f4590.jpg","0","","bababa...","1","1981-10-21","90","0","310104013004","1","54209425","2012-11-01 11:27:03","97","343","1","2013-11-18 16:00:00","212","310230198110210059","0","bababa...","baba..","13661616903","","0",",1,"

    2014-12-03 16:07:55,855 INFO [REGION-310104] [DefaultDatabaseWriter] [region-310104-pull-1] Failed to process a update event in batch 118.
    Failed pk data was: "9505"
    Failed row data was: "9505","","","1","baba..","0","","0","avatar/2012/11/0d74c577eb63c7f17ea71bf2f50f4590.jpg","0","","bababa...","1","1981-10-21","90","0","310104013004","1","54209425","2012-11-01 11:27:03","97","343","1","2013-11-18 16:00:00","212","310230198110210059","0","bababa...","baba..","13661616903","","0",",1,"

    2014-12-03 16:07:55,858 INFO [REGION-310104] [DefaultDatabaseWriter] [region-310104-pull-1] Failed to process a update event in batch 118.
    Failed pk data was: "9505"
    Failed row data was: "9505","","","1","baba..","0","","0","avatar/2012/11/0d74c577eb63c7f17ea71bf2f50f4590.jpg","0","","bababa...","1","1981-10-21","90","0","310104013004","1","54209425","2012-11-01 11:27:03","97","343","1","2013-11-18 16:00:00","212","310230198110210059","0","bababa...","baba..","13661616903","","0",",1,"

    2014-12-03 16:07:55,865 ERROR [REGION-310104] [DataLoaderService] [region-310104-pull-1] Failed to load batch central-118 because: Detected conflict while executing UPDATE on centralDB.customer. The primary key data was: {cust_id=9505}.
    org.jumpmind.symmetric.io.data.writer.ConflictException: Detected conflict while executing UPDATE on centralDB.customer. The primary key data was: {cust_id=9505}.
    at org.jumpmind.symmetric.io.data.writer.AbstractDatabaseWriter.write(AbstractDatabaseWriter.java:176)
    at org.jumpmind.symmetric.io.data.writer.DefaultTransformWriterConflictResolver.performFallbackToUpdate(DefaultTransformWriterConflictResolver.java:83)
    at org.jumpmind.symmetric.io.data.writer.AbstractDatabaseWriterConflictResolver.needsResolved(AbstractDatabaseWriterConflictResolver.java:92)
    at org.jumpmind.symmetric.io.data.writer.AbstractDatabaseWriter.write(AbstractDatabaseWriter.java:174)
    at org.jumpmind.symmetric.io.data.writer.AbstractDatabaseWriter.write(AbstractDatabaseWriter.java:131)
    at org.jumpmind.symmetric.io.data.writer.NestedDataWriter.write(NestedDataWriter.java:64)
    at org.jumpmind.symmetric.model.ProcessInfoDataWriter.write(ProcessInfoDataWriter.java:65)
    at org.jumpmind.symmetric.io.data.writer.TransformWriter.write(TransformWriter.java:223)
    at org.jumpmind.symmetric.io.data.DataProcessor.forEachDataInTable(DataProcessor.java:199)
    at org.jumpmind.symmetric.io.data.DataProcessor.forEachTableInBatch(DataProcessor.java:169)
    at org.jumpmind.symmetric.io.data.DataProcessor.process(DataProcessor.java:115)
    at org.jumpmind.symmetric.service.impl.DataLoaderService$LoadIntoDatabaseOnArrivalListener.end(DataLoaderService.java:802)
    at org.jumpmind.symmetric.io.data.writer.StagingDataWriter.notifyEndBatch(StagingDataWriter.java:75)
    at org.jumpmind.symmetric.io.data.writer.AbstractProtocolDataWriter.end(AbstractProtocolDataWriter.java:220)
    at org.jumpmind.symmetric.io.data.DataProcessor.process(DataProcessor.java:129)
    at org.jumpmind.symmetric.service.impl.DataLoaderService.loadDataFromTransport(DataLoaderService.java:431)
    at org.jumpmind.symmetric.service.impl.DataLoaderService.loadDataFromPull(DataLoaderService.java:269)
    at org.jumpmind.symmetric.service.impl.PullService.execute(PullService.java:133)
    at org.jumpmind.symmetric.service.impl.NodeCommunicationService$2.run(NodeCommunicationService.java:307)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:662)
    2014-12-03 16:07:55,955 INFO [REGION-310104] [PullService] [region-310104-pull-1] There was a failure while pulling data from CENTRAL:central:central. 3 rows and 1 batches were processed

    Regards,
    Flik

     
  • Shen Flik

    Shen Flik - 2014-12-03

    Hi,

    I tried another approach: instead of creating a separate reload router, I configured two conditions in the router expression of the original column match router, as follows.

    INSERT INTO sym_router (router_id,target_catalog_name,target_schema_name,source_node_group_id,target_node_group_id,router_type,router_expression,use_source_catalog_schema,create_time,last_update_time)
    VALUES('CENTRAL_2_REGION_column','centralDB',null,'CENTRAL','REGION','column','EXTERNAL_DATA=NULL or EXTERNAL_DATA=:EXTERNAL_ID',0,current_timestamp,current_timestamp);
    

    However, the same issue occurred.
    Still, this is better than having two routers and being stopped by the same issue.

    Regards,
    Flik

     
  • Shen Flik

    Shen Flik - 2014-12-04

    Hi,

    I have a transform that adds an additional column with a constant value for global customers into the target database. I found that the root cause of the above error is the transform attribute "update_first", which I had set to TRUE. It led the following block of logic in class DefaultDatabaseWriter to conclude there was a conflict: since the target database was empty, the UPDATE statement affected no rows, so the count is 0 and the load status becomes CONFLICT.

    try {
        long count = execute(data, values);
        statistics.get(batch)
                .increment(DataWriterStatisticConstants.UPDATECOUNT, count);
        if (count > 0) {
            return LoadStatus.SUCCESS;
        } else {
            context.put(CUR_DATA, getCurData(transaction));
            return LoadStatus.CONFLICT;
        }
    } catch (SqlException ex) {
    

    After disabling the "update_first" attribute, the data could be sent to the region database successfully.
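
    For anyone hitting the same thing, the change is a one-liner in the transform configuration; 'global_customer_transform' below is a made-up id, so use your actual transform_id:

    UPDATE sym_transform_table
    SET update_first = 0, last_update_time = current_timestamp
    WHERE transform_id = 'global_customer_transform';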

    Best regards,
    Flik

     
