Search code examples
kdbpykx

Writing a subscriber in PYKX


I know how to write a subscriber for the following code in kdb+/q, but I'm puzzled by how I could get the value of the table quotes from pykx. Here's my

publisher.q

tg:{[n] // random table generator
    ([]
        time: n?(.z.P-365*1D);
        sym: n?`A`B`C`D`E;
        price: n?100.0
    )
 }

quotes:tg 3

\l tick/u.q
.u.init[];

\t 50
.z.ts:{
    .u.pub[`quotes; tg 3]; // publish quotes
 };

\p 5011

and here's my attempt at writing a subscriber in python using pykx.

subscriber.py

import pykx as kx
import time

# Open a connection to the defined process
q = kx.QConnection('localhost', 5011)
while True:
    bars = q('.u.sub[`quotes;`]')
    print(bars)
    time.sleep(1)

which outputs...

`quotes
+`time`sym`price!(`timestamp$();`g#`symbol$();`float$())
WARN: Discarding unexpected async message from handle: 9
WARN: Discarding unexpected async message from handle: 9

Solution

  • There is an example detailed and downloadable on:

    https://code.kx.com/pykx/2.5/examples/subscriber/readme.html

    Applied to your example would be similar to:

    (Note: this is draft/untested code. Use the working subscriber.py example from above as your starting point.)

    import pykx as kx
    import asyncio
    import datetime
    
    quotes = kx.q('flip', {
        'time': [kx.TimestampAtom(datetime.datetime.now())],
        'sym': [kx.SymbolAtom('')],
        'price': [kx.FloatAtom(0.0)]
    })
    
    
    async def main_loop(q, tab):
        while True:
            await asyncio.sleep(0.5)
            result = q.poll_recv()
            tab = kx.q.upsert(tab, result[2])
            print(tab)  # usable result
    
    
    async def main():
        global quotes
        async with kx.RawQConnection(port=5011) as q:
            await q('.u.sub', 'quotes', '')
    
            await main_loop(q, quotes)
    
    
    if __name__ == '__main__':
        asyncio.run(main())