Replay a tplog file by appending messages to a partitioned table in kdb+


I'm trying to replay messages from a tickerplant log (tplog) directly into a partitioned table on disk by appending them, since the tplog is much larger than my available memory.

The tplog looks like this:

q)9 2 sublist get `:/Users/uts/Desktop/repos/ktick/tick/sym2020.05.23
`upd `trade (,0D22:38:00.083960000;,`MSFT.O;,45.15104;,710)              
`upd `quote (,0D22:38:01.082882000;,`VOD.L;,341.2765;,341.3056;,732;,481)

I'm using the 'upsert' approach below to append these tplog messages to the partitioned table, but it fails with a type error on upsert:

quote:([]time:`timespan$();sym:`symbol$();bid:`float$();ask:`float$();bsize:`int$();asize:`int$());
trade:([]time:`timespan$();sym:`symbol$();price:`float$();size:`int$());


`:/Users/uts/db/2020.05.23/quote/ set .Q.en[`:/Users/uts/db;]quote;
`:/Users/uts/db/2020.05.23/trade/ set .Q.en[`:/Users/uts/db;]trade;


upd:{[t;d]
    if[`trade~t;[show raze d;`:/Users/uts/db/2020.05.23/trade/ upsert .Q.en[`:/Users/uts/db;]enlist (cols trade)!raze d]];
    };

-11!`:/Users/uts/Desktop/repos/ktick/tick/sym2020.05.23

Error:

    'type
  [1]  upd:{[t;d]
    if[`trade~t;[show raze d;`:/Users/utsav/db/2020.05.23/trade/ upsert .Q.en[`:/Users/utsav/db;]enlist (cols trade)!raze d]];
                                                                 ^
    }

But if I append the message to the partitioned table manually, it works fine:

`:/Users/uts/db/2020.05.23/trade/ upsert .Q.en[`:/Users/uts/db;]enlist (cols trade)!raze (enlist 0D22:39:00.083960000;enlist `MSFT.O;enlist 45.15104; enlist 710)

I'm not sure why 'upsert' fails inside the upd function when it is called via -11!.

Please share details or links if there is a better way (there most probably is) to replay tplogs directly to disk without using much memory.


Solution

  • I'm not sure this will solve your exact problem, but here are a few suggestions:

    1. Your code assumes that every tickerplant log record holds a single row. That may not be the case: many tickerplants log multiple rows in a single update. If so, your enlist (cols trade)!raze d code would not work (though I would expect a length error rather than a type error in that case). A more general alternative, which builds a one-row table when the record holds atoms and a multi-row table when it holds lists, is:
    $[0>type first d;enlist cols[trade]!d;flip cols[trade]!d]
    
    2. You should not write to disk for every single upd record in a tickerplant log. That is too many disk writes in a short space of time; it is inefficient and can run into disk I/O limits. It is better to insert in memory until the table reaches a certain size, then write the batch to disk and empty the table. I would suggest something like:
    / append the in-memory table named x to the trade partition, then empty it
    write:{`:/Users/uts/db/2020.05.23/trade/ upsert .Q.en[`:/Users/uts/db;value x];delete from x};
    upd:{[t;d]
        / only trade is handled, since write targets the trade directory
        if[`trade~t;
            t insert d;
            if[10000<count value t;write[t]]];
        };
    

    Then your replay would look like:

    -11!`:/Users/uts/Desktop/repos/ktick/tick/sym2020.05.23;
    if[0<count trade;write[`trade]]; / write the leftover rows still in memory
    `sym`time xasc `:/Users/uts/db/2020.05.23/trade/; / sort the splayed table on disk
    @[`:/Users/uts/db/2020.05.23/trade/;`sym;`p#]; / apply the parted attribute to sym
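
The two suggestions above could be combined into one handler that covers both trade and quote. The following is only a sketch and has not been run against the asker's log. The names dbroot, dt, maxrows, tabs, pdir, toRows, flush and replay are illustrative and do not come from the original post. It assumes the column types in the log match the schemas (for example, that size is logged as an int) and that the sym file lives directly under dbroot.

```q
/ illustrative names, not from the original post
dbroot:`:/Users/uts/db      / HDB root, as in the question
dt:2020.05.23               / partition being rebuilt
maxrows:10000               / buffer size that triggers a disk write
tabs:`trade`quote           / tables to replay

/ empty in-memory buffers, schemas as in the question
quote:([]time:`timespan$();sym:`symbol$();bid:`float$();ask:`float$();bsize:`int$();asize:`int$())
trade:([]time:`timespan$();sym:`symbol$();price:`float$();size:`int$())

/ splayed directory for table t, e.g. `:/Users/uts/db/2020.05.23/trade/
pdir:{[t] ` sv dbroot,(`$string dt),t,`}

/ one log record -> table: atoms mean a single row, lists mean a batch
toRows:{[t;d] $[0>type first d;enlist cols[t]!d;flip cols[t]!d]}

/ append buffer t to disk, enumerating symbols against dbroot, then empty it
flush:{[t]
  pdir[t] upsert .Q.en[dbroot;value t];
  delete from t;
  }

/ called by -11! for every record; buffers rows and flushes in batches
upd:{[t;d]
  if[t in tabs;
    t insert toRows[t;d];
    if[maxrows<=count value t;flush t]];
  }

/ stream the log, write leftovers, then sort and part each table that was written
replay:{[logfile]
  -11!logfile;
  {if[0<count value x;flush x]} each tabs;
  {d:pdir x;if[0<count key d;`sym`time xasc d;@[d;`sym;`p#]]} each tabs;
  }
```

Under those assumptions, the whole replay would be started with replay `:/Users/uts/Desktop/repos/ktick/tick/sym2020.05.23. Because -11! streams records one at a time, memory use stays bounded by roughly maxrows rows per table; the final on-disk sort is still needed because rows arrive in time order rather than sym order.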