Menu

Howto sync only current (last and next X) entries

Help
Matthias
2017-12-04
2018-01-04
  • Matthias

    Matthias - 2017-12-04

    Hello there,

    I need help with the following trigger: synchronize always the next and last X entries of my table.

    To clarify, I have a table with planned events, but I do not want to synchronize the complete table. Only current events shall be synchronized. This shall happen on a regular basis, also when no data is changed.

    Is this possible?

    Sync table row only if in:

    SELECT event_timestamp, UTC_TIMESTAMP()-event_timestamp as recent FROM mytable WHERE UTC_TIMESTAMP()-event_timestamp >= 0 order by recent ASC limit 2

    or in:

    SELECT event_timestamp, event_timestamp-UTC_TIMESTAMP() as recent FROM mytable WHERE event_timestamp-UTC_TIMESTAMP() >= 0 order by recent ASC limit 2

    Thank you

     
  • Tom

    Tom - 2017-12-05

    Hello Matthias,

    if I get it right what do you want to achieve, I am not sure that you can do it in symmetric directly, because of this order and limit. But in your case you can use SYM_TRIGGER.SYNC_ON_UPDATE_CONDITION and set there some condition that send on update only that records with some ID (or other column), that have their ID (I suppose that mytable has some ID column as primary key), in something you select, like:

    (select MT.ID from MYTABLE MT where MT.ID in (SELECT ID FROM mytable WHERE UTC_TIMESTAMP()-event_timestamp >= 0 order by recent ASC limit 2 ) and MT.ID = new.ID)
    

    or probably maybe something like this with highest and lowerst event_timestamp, this code will be probably somewhere little wrong, but:

    ((select MT.ID from MYTABLE MT where MT.event_timestamp in (SELECT event_timestamp FROM mytable WHERE UTC_TIMESTAMP()-event_timestamp >= 0 order by recent ASC limit 2 ) and MT.ID = new.ID) 
    OR
    (select MT.ID from MYTABLE MT where MT.event_timestamp-UTC_TIMESTAMP() in (SELECT event_timestamp-UTC_TIMESTAMP() FROM mytable WHERE UTC_TIMESTAMP()-event_timestamp >= 0 order by recent ASC limit 2 ) and MT.ID = new.ID))
    

    Maybe you will come with some easier condition, hope this will help you to start somewhere. This will not be probably very fast if mytable is huge, but it should work in this way.

    Next..
    This shall happen on a regular basis, also when no data is changed.
    Well I suppose that you can do some automatic event (e.g. in cron) that will update only selected records with lowest and highest event_timestamp and symmetric will find them on regular symm trigger for this table. Or you can sent some automatic reload event (insert reload event in sym_data also from e.g. cron with suitable condition to reload only some rows with these event_timestamps) .

     
  • Matthias

    Matthias - 2017-12-11

    Thank you for the hint of the SYNC_ON_UPDATE_CONDITION - it was exactly what I was looking for.
    But concerning the regular updates, I do not like the idea of overwriting the cells (with identical values). So maybe I need to add a "to_sync"-flag in the table. The idea is to have a program that sets the flag for all rows in the timeframe and to unset old entries. I suppose that it is possible to delete rows in the "stores" when the flag is changed from True to False at the "corp" node?

     
  • Jean-Francois Lambert

    I would tackle this differently: eager materialized view (i.e. table) with a UNION for the two records you're targeting. Then no special SymmetricDS black magic.

     
  • Mark Michalek

    Mark Michalek - 2017-12-13

    Matthias, if I understand correctly, you want to sync the 2 most recent events on a schedule, whether they have changed or not. If this understanding is correct, it would be possible to periodically insert a reload event for those 2 rows using Symmetric. In SymmetricDS 3.9 (to be released very soon), you could do a custom SQL job in the sym_job table. In SymmetricDS 3.8, you could do a heartbeat listenter extension, so your sync would run in conjunction with the periodic heartbeat. The SQL to queue up the reload would be something like this:

    insert into sym_data (node_list, table_name, event_type, row_data,
                           trigger_hist_id, channel_id, create_time) (
        select '00001', t.source_table_name, 'R', 'event_timestamp-UTC_TIMESTAMP() >= 0 order by recent ASC limit 2',
                h.trigger_hist_id, t.channel_id, current_timestamp
            from sym_trigger t inner join sym_trigger_router tr on
                t.trigger_id=tr.trigger_id inner join sym_trigger_hist h on
                h.trigger_hist_id=(select max(trigger_hist_id) from sym_trigger_hist
                    where trigger_id=t.trigger_id)
        where channel_id='sale_transaction' and
            tr.router_id like 'store_corp_identity' and
            (t.source_table_name like 'table_%')
        order by tr.initial_load_order asc);
    

    Here is a starting point of what a heartbeat extention would look like in 3.8:

    import java.util.Date;
    import org.jumpmind.symmetric.ISymmetricEngine;
    import org.jumpmind.symmetric.SymmetricException;
    import org.jumpmind.symmetric.ext.IHeartbeatListener;
    import org.jumpmind.symmetric.ext.ISymmetricEngineAware;
    import org.jumpmind.symmetric.model.Lock;
    import org.jumpmind.symmetric.model.Node;
    import org.jumpmind.symmetric.service.ClusterConstants;
    import org.jumpmind.util.FormatUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class SyncOnHeartbeatExtension implements IHeartbeatListener, ISymmetricEngineAware {
    
        // Configuration for this extension... -----------------------------------------------------------------
        final String TABLE_NAME = "table"; 
        final String CHANNEL_ID = "channel_name_here"; 
        final String ROUTER_ID = "router_name_here"; 
        // End Configuration. ----------------------------------------------------------------------------------
    
        private final Logger log = LoggerFactory.getLogger(getClass());
    
        private ISymmetricEngine engine;
    
        final String INSERT_RELOAD_DATA_TEMPLATE = 
                "insert into sym_data ( table_name, event_type, row_data, trigger_hist_id, channel_id, transaction_id, create_time)" + 
                "    select  t.source_table_name, 'R', 'event_timestamp-UTC_TIMESTAMP() >= 0 order by recent ASC limit 2' + 
                "            h.trigger_hist_id, t.channel_id, '1', current_timestamp" + 
                "        from sym_trigger t inner join sym_trigger_router tr on" + 
                "            t.trigger_id=tr.trigger_id inner join sym_trigger_hist h on" + 
                "            h.trigger_hist_id=(select max(trigger_hist_id) from sym_trigger_hist" + 
                "                where trigger_id=t.trigger_id)" + 
                "    where channel_id=? and" + 
                "        tr.router_id like ? and" + 
                "        (t.source_table_name = ?)" + 
                "    order by tr.initial_load_order asc;";
    
        @Override
        public void heartbeat(Node me) {
            int updatedCount = engine.getSqlTemplate().update(INSERT_RELOAD_DATA_TEMPLATE, CHANNEL_ID, ROUTER_ID, TABLE_NAME);
            if (updatedCount != 1) {
                throw new SymmetricException("SyncOnHeartbeatExtension is designed to insert exactly 1 sym_data row.  Instead inserted " + 
                        updatedCount + ". Check TABLE_NAME, CHANNEL_ID, ROUTER_ID parameters in the extension itself.");
            }
        }
    
        @Override
        public long getTimeBetweenHeartbeatsInSeconds() {
            return 0;
        }
        @Override
        public void setSymmetricEngine(ISymmetricEngine engine) {
            this.engine = engine;
        }
    }
    
     

    Last edit: Mark Michalek 2017-12-13
  • Matthias

    Matthias - 2017-12-14

    Fantastic, that was a valuable hint. I absolutely want that feature :)
    http://www.symmetricds.org/issues/view.php?id=2684

     
  • Mark Michalek

    Mark Michalek - 2017-12-15

    Matthias, it will be in 3.9. If all goes well that will be released today.

    Mark

     
  • Matthias

    Matthias - 2018-01-04

    Works like a charm.
    I added a TO_SYNC column and the sym_job is updating its status regularly.

    Thank you for your help!

     

Log in to post a comment.