How to implement dynamic subscribers with KDB?


I'm trying to implement dynamic subscribers in a kdb-tick system, whereby a subset of the events is passed to a given consumer based on a query supplied by that consumer.

For instance, given a batch of events such as:

flip `source`channel`value!(10?`a`b`c;10?`a`b`c;10?10)

source channel value
--------------------
a      a       4
b      b       5
a      c       4
b      a       2
c      c       7
c      b       8
c      a       5
a      c       6
b      a       4
b      a       1

The tickerplant should send only the events whose channel is not `c, i.e.:

source channel value
--------------------
a      a       4
b      b       5
b      a       2
c      b       8
c      a       5
b      a       4
b      a       1

I have tried to implement this by parsing a dynamic conditional as follows:

q).tp.subscribers

hp        | host isOpen port   h   subs
----------| --------------------------------------------------
:test:5000| test 0      5000   6   "enlist(not;(in;`channel;enlist`c))"

Here subs is the conditional (where-clause) argument to a functional select statement, which is used in the following code:

.tp.send:{neg[x] y};

.tp.reval:{[batch;subscriber]
  .tp.send[raze subscriber`h] reval[parse["?[batch;",raze[subscriber`subs],";0b;()]"]]
  };

// Called with event batch 
.tp.broadcast:{[batch]
   .tp.reval[batch]'[select from .tp.subscribers where isOpen] 
  };

This fails because batch is a local variable of .tp.reval, whereas the parsed functional select looks the name up in the global context. How might this functionality be achieved effectively?
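
A minimal sketch of the failure, assuming a hypothetical function f and a stand-in one-column table (neither is part of the tickerplant code above). parse turns the name batch into a symbol reference, and reval, like eval, resolves that reference among the globals rather than among the locals of f:

```q
/ f is a hypothetical stand-in for .tp.reval; batch is local to f
f:{[batch] reval parse "?[batch;();0b;()]"}

/ trap the error so the script keeps running; the handler returns the error text
/ assumption: no global variable named batch exists in the session
r:@[f;([]a:1 2 3);{x}]
r   / should show "batch", the name that could not be resolved
```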

Could anyone advise me on, or point me to information about, a solution to this problem? Your advice would be very much appreciated. Apologies if this is a newbie question. Thanks.


Solution

  • I think part of your problem is that you're expecting the conditional argument in string form: that in turn requires you to parse a stringified functional select, and the parsed expression assumes batch is a global.

    Why not expect a list form of the conditional argument instead? Then there's no need to parse and you can create a local functional select. E.g.

    .tp.subscribers:([hp:1#`:test:5000]subs:enlist(not;(in;`channel;1#`c)))
    
    q){[batch] reval ?[batch;(0!.tp.subscribers)`subs;0b;()]}flip `source`channel`value!(10?`a`b`c;10?`a`b`c;10?10)
    source channel value
    --------------------
    a      a       4
    b      b       5
    b      a       2
    c      b       8
    c      a       5
    b      a       4
    b      a       1
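
    As a rough sketch of how the list form might plug into a per-subscriber broadcast: the helper .tp.filter, the variable events and the second subscriber row are hypothetical, each subs cell is assumed to hold a list of where-constraints, and the actual send is left out so the example runs without open handles.

    ```q
    / hypothetical subscriber table: each subs cell holds a list of where-constraints
    .tp.subscribers:([hp:`:test:5000`:test:5001] isOpen:11b; subs:(enlist(not;(in;`channel;enlist`c));enlist(=;`source;enlist`a)))

    / hypothetical helper: batch is an ordinary local argument here, so no parse is needed
    .tp.filter:{[batch;c] ?[batch;c;0b;()]}

    / fixed batch copied from the question, so the results are reproducible
    events:flip `source`channel`value!(`a`b`a`b`c`c`c`a`b`b;`a`b`c`a`c`b`a`c`a`a;4 5 4 2 7 8 5 6 4 1)

    / one filtered table per open subscriber; a real tickerplant would send each one to that subscriber's handle
    out:.tp.filter[events] each exec subs from .tp.subscribers where isOpen
    count each out   / 7 3
    ```

    The first subscriber receives the seven rows whose channel is not `c, and the second, hypothetical one receives the three rows whose source is `a.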
    

    Or have the user specify a lambda and run that (though I guess you would lose the ability to use reval in that case):

    .tp.subscribers:([hp:1#`:test:5000]subs:enlist{select from x where not channel=`c})
    
    q){[batch] @[first(0!.tp.subscribers)`subs;batch;()]}flip `source`channel`value!(10?`a`b`c;10?`a`b`c;10?10)
    source channel value
    --------------------
    a      b       9
    c      b       0
    b      b       0
    b      a       9
    b      a       3
    b      a       9