Fast / Efficient method to retrieve value of specified field with BLPAPI


I am asking this question because of the way Bloomberg sends its data via BLPAPI. Following on from this post, I want to establish an efficient method of obtaining the value of a specific field. A single session.nextEvent() can contain multiple messages (msgs), and Bloomberg sends surplus data, so more data arrives than was requested. Is there a known efficient way of handling this? With the techniques and methods I have used so far, for 60 securities and 5 subscriptions the data is never live: it lags behind, and I believe the reason is how I manage the incoming data. Below is an example subscription for one security. Given that MKTDATA_EVENT_TYPE and MKTDATA_EVENT_SUBTYPE can differ from message to message, I am struggling to find an effective way to do this.

My aim is to avoid for loops where possible and instead use dictionaries that direct me to the value wanted.

import blpapi
from bloomberg import BloombergSessionHandler

# session = blpapi.Session()

host='localhost'
port=8194

session_options = blpapi.SessionOptions()
session_options.setServerHost(host)
session_options.setServerPort(port)
session_options.setSlowConsumerWarningHiWaterMark(0.05)
session_options.setSlowConsumerWarningLoWaterMark(0.02)

session = blpapi.Session(session_options)
if not session.start():
    print("Failed to start Bloomberg session.")


subscriptions = blpapi.SubscriptionList()
fields = ['BID','ASK','TRADE','LAST_PRICE','LAST_TRADE']
subscriptions.add('GB00BLPK7110 @UKRB Corp', fields)
session.subscribe(subscriptions)  # the session was already started above, so no second start() is needed


while True:
    event = session.nextEvent()
    print("Event type:", event.eventType())

    if event.eventType() == blpapi.Event.SUBSCRIPTION_DATA:
        for i, msg in enumerate(event):
            print("This is msg ", i)
            print("\n", "msg is ", msg, "\n")
            print("  Message type:", msg.messageType())
            eltMsg = msg.asElement()
            msgType = eltMsg.getElement('MKTDATA_EVENT_TYPE').getValueAsString()
            msgSubType = eltMsg.getElement('MKTDATA_EVENT_SUBTYPE').getValueAsString()
            print(" ", msgType, msgSubType)

            for fld in fields:
                print(" Fields are :", fields)
                if eltMsg.hasElement(fld):
                    print("    ", fld, eltMsg.getElement(fld).getValueAsFloat())
    else:
        for msg in event:
            print("  Message type:",msg.messageType())

I tried obtaining the values for the fields I subscribed to, but my code was too slow to meet the requirement of displaying live data.

    def process_subscription_data1(self, session):
        while True:
            event = session.nextEvent()
            print(f"The event is {event}")
            if event.eventType() == blpapi.Event.SUBSCRIPTION_DATA:
                print(f"The event type is: {event.eventType()}")
                for msg in event:
                    print(f"The msg is: {msg}")
                    data = {'instrument': msg.correlationIds()[0].value()}
                    print(f"The data is: {data}")
                    # Processing fields efficiently
                    for field in self.fields:
                        print("field is ", field, " ", self.fields)
                        element = msg.getElement(field) if msg.hasElement(field) else None
                        print("element is ", element)
                        data[field] = element.getValueAsString() if element and not element.isNull() else 'N/A'
                    print(f"Emitting data for {data}")
                    self.data_signal.emit(data)  # Emit data immediately for each message

^^ Code I have tried, which was far too slow (even without the print statements; they are only there to show how convoluted the code is).


Solution

  • One way to "throttle" the rate at which real-time ticks arrive from Bloomberg is to specify an interval when adding a subscription:

    subs = blpapi.SubscriptionList()
    flds = ['BID','ASK','TRADE','LAST_TRADE','LAST_PRICE']
    tickers = ['RXA Comdty','GB00BLPK7110 @UKRB Corp']
    
    nId = 0
    for t in tickers:
        subs.add(t,flds,options={'interval':1},correlationId=blpapi.CorrelationId(nId))
        nId += 1
    
    session.subscribe(subs)
    

    This restricts the messages to 1 second intervals (in this example), and each message will contain a 'summary' of all the data for the given ticker. Each message will be larger as it will contain everything and not just the fields you specify. You can specify different intervals for each subscription if some tickers are less active than others.
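As a rough illustration of per-ticker intervals, here is a minimal sketch. The helper `build_subscription_specs`, the `DEFAULT_INTERVAL` constant and the interval values are hypothetical and not part of blpapi. The helper only prepares the arguments, which would then be passed to `subs.add(...)` in the same way as in the snippet above.

```python
# Hypothetical helper (not part of blpapi): prepares the arguments for
# SubscriptionList.add() so that each ticker can carry its own interval.
DEFAULT_INTERVAL = 1  # seconds; assumed default, for illustration only


def build_subscription_specs(tickers, fields, intervals, default=DEFAULT_INTERVAL):
    """Return one dict of add() arguments per ticker, in list order."""
    specs = []
    for cid, ticker in enumerate(tickers):
        specs.append({
            "topic": ticker,
            "fields": list(fields),
            "options": {"interval": intervals.get(ticker, default)},
            "cid": cid,  # integer to be wrapped in blpapi.CorrelationId later
        })
    return specs


flds = ['BID', 'ASK', 'TRADE', 'LAST_TRADE', 'LAST_PRICE']
tickers = ['RXA Comdty', 'GB00BLPK7110 @UKRB Corp']
intervals = {'GB00BLPK7110 @UKRB Corp': 5}  # illustrative: a slower feed for a quiet bond

specs = build_subscription_specs(tickers, flds, intervals)

# With a live session, the specs would be consumed along these lines:
# for s in specs:
#     subs.add(s["topic"], s["fields"], options=s["options"],
#              correlationId=blpapi.CorrelationId(s["cid"]))
```

Keeping the intervals in a plain dictionary means a ticker can be retuned without touching the subscription loop.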

    If you are building a gui, then this will be event-driven and have a message loop. It could make sense to move to the asynchronous/callback method of handling Bloomberg events. Here the messages are handled in a separate worker thread, and you only need to alert the gui if an event is of interest.

    import blpapi
    import threading
    from queue import Queue
    
    def processEvent(evt,session):
        et = evt.eventType()
    
        if et == blpapi.Event.SESSION_STATUS:
            print('Session Status event')
            for msg in evt:
                print('   ',msg.messageType())
                if msg.messageType() == 'SessionStarted':
                    sessionReady.set()
            return
        if et == blpapi.Event.SUBSCRIPTION_STATUS:
            print('Subscription Status event')
            for msg in evt:
                cId = msg.correlationId()
                print('   ',msg.messageType(),'for ticker:',tickers[cId.value()])
    
            return
        if et == blpapi.Event.SUBSCRIPTION_DATA:
            for msg in evt:
                cId = msg.correlationId()
                tkr = tickers[cId.value()]
                eltMsg = msg.asElement()
    
                for f in flds:
                    if eltMsg.hasElement(f):
                        v = eltMsg.getElement(f).getValueAsFloat()
                        tick = (tkr,{f:v})
                        qTicks.put(tick)
      
            return
    
    qTicks = Queue()
    sessionReady = threading.Event() 
    
    session = blpapi.Session(eventHandler=processEvent)
    
    print('Waiting for session to start')
    session.startAsync()
    sessionReady.wait()
    print('Session started')
    
    subs = blpapi.SubscriptionList()
    flds = ['BID','ASK','TRADE','LAST_TRADE','LAST_PRICE']
    tickers = ['EUR Curncy','RXA Comdty','GB00BLPK7110 @UKRB Corp']
    
    nId = 0
    for t in tickers:
        subs.add(t,flds,correlationId=blpapi.CorrelationId(nId))
        nId += 1
    
    session.subscribe(subs)
    
    while True:
        try:
            tick = qTicks.get(True)  # block until the worker thread queues a tick
            print(tick)
        except KeyboardInterrupt:  # Ctrl-C ends the loop
            print('Terminating')
            break
    
    session.stopAsync()
    

    One wrinkle is that startAsync returns immediately, so your Session may not be ready when you try to add subscriptions. One way around this is to wait for the SessionStarted message and use it to set a Python threading.Event. Before you start to work with the Session, wait on this event.
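The wait on the event can also be given a timeout, so the program does not hang forever if SessionStarted never arrives. The sketch below is illustrative only: `wait_for_session` is a hypothetical helper, and a `threading.Timer` stands in for the blpapi worker thread that would normally set the event.

```python
import threading

sessionReady = threading.Event()


def wait_for_session(ready, timeout_s):
    """Return True if the event was set within timeout_s seconds, else False."""
    return ready.wait(timeout=timeout_s)


# Stand-in for the blpapi callback that sets the event on 'SessionStarted'.
threading.Timer(0.05, sessionReady.set).start()

started = wait_for_session(sessionReady, timeout_s=5.0)
if not started:
    raise SystemExit('Session did not start in time')
print('Session started')
```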

    I don't know what kind of message loop the GUI might have. In this example I am using a Python Queue to send the tick data from the worker thread to the main thread (Ctrl-C will generate an exception and end the loop). An alternative is to 'post' messages to the main thread's message queue. The processEvent callback can be used to decide when to fire an event to the main thread. For example, the Bloomberg message contains (inconsistently named) last update times for each field, and these could be used to determine whether the message's data has changed since the last tick. You would then signal only new data ticks, cutting down the amount of screen updating in the GUI.
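One possible way to signal only new data is sketched below. Note that it swaps in a simpler technique: it compares the field values themselves rather than Bloomberg's last-update-time fields, because those field names vary. The class `ChangeFilter` and the sample prices are hypothetical and not part of blpapi.

```python
_MISSING = object()  # sentinel meaning "no value seen yet"


class ChangeFilter:
    """Remember the last value per (ticker, field) and report only changes."""

    def __init__(self):
        self._last = {}  # (ticker, field) -> last value seen

    def changed(self, ticker, values):
        """Return the subset of `values` that differs from the last call."""
        delta = {}
        for field, value in values.items():
            key = (ticker, field)
            if self._last.get(key, _MISSING) != value:
                self._last[key] = value
                delta[field] = value
        return delta


flt = ChangeFilter()
first = flt.changed('EUR Curncy', {'BID': 1.0850, 'ASK': 1.0852})
second = flt.changed('EUR Curncy', {'BID': 1.0850, 'ASK': 1.0853})
third = flt.changed('EUR Curncy', {'BID': 1.0850, 'ASK': 1.0853})
print(first)   # both fields are new on the first call
print(second)  # only ASK moved
print(third)   # nothing changed, so the dict is empty
```

Inside processEvent, the values read from a message could be passed through `changed(...)` and queued only when the returned dictionary is non-empty. The dictionary lookup also avoids rescanning earlier ticks to find the latest value for a field.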