Search code examples
pythonmultithreadingwebsocketmultiprocessing

Python websocket multithread real time data question


I'm trying to build a terminal application that send request to a server and receive real time data back via websocket(data is in JSON format). Meanwhile, I want to save the data in a list. The above functions is in one thread. The second thread will loop through the list and print the data out(it's supposed to update my database but for simplicity sake I'm just printing it out now). But somehow the second thread doesn't seem to be starting or the data are not appended to the list so that there is nothing to print. The following is my code:

from functools import partial
import sys
import time
import getopt
import requests
import socket
import json
import websocket
import threading
from multiprocessing import Process




def process_data(dataBuffer):
    if len(dataBuffer) > 0:
        for data in dataBuffer:
           
            if data['Key']['Name'] == "CNY=X":
               print("CNY: " , data['Fields']['BID'], data['Fields']['ASK'], data['Fields']['PCTCHNG'], data['Fields']['NETCHNG_1'], data['Fields']['PRIMACT_1'])
        if data['Key']['Name'] == "TWD=X":
            print("TWD: " , data['Fields']['BID'], data['Fields']['ASK'], data['Fields']['PCTCHNG'], data['Fields']['NETCHNG_1'], data['Fields']['PRIMACT_1'])
                #if obj['Key']['Name'] == "CNY=X":
                #    print("CNY: " , obj['Fields']['BID'], obj['Fields']['ASK'], obj['Fields']['PCTCHNG'], obj['Fields']['NETCHNG_1'], obj['Fields']['PRIMACT_1'])
                #if obj['Key']['Name'] == "TWD=X":
                #    print("TWD: " , obj['Fields']['BID'], obj['Fields']['ASK'], obj['Fields']['PCTCHNG'], obj['Fields']['NETCHNG_1'], obj['Fields']['PRIMACT_1'])

def process_message(message_json):
    """ Parse at high level and output JSON of message """
    message_type = message_json['Type']
    global dataBuffer
    if message_type == "Refresh":
        if 'Domain' in message_json:
            message_domain = message_json['Domain']
            if message_domain == "Login":
                process_login_response(message_json)
            
        dataBuffer.append(message_json)
    elif message_type == "Ping":
        pong_json = {'Type': 'Pong'}
        web_socket_app.send(json.dumps(pong_json))
        print("SENT:")
        print(json.dumps(pong_json, sort_keys=True, indent=2, separators=(',', ':')))
    elif message_json['Type'] == "Update":
         dataBuffer.append(message_json)
    
    
def process_login_response(message_json):
    """ Send item request """
    send_market_price_request(ric)


def send_market_price_request(ric_name):
    """ Create and send simple Market Price request """
    mp_req_json = {
        'ID': 2,
        'Key': {
            'Name': ric_name,
            'Service': service
        },
    }
    web_socket_app.send(json.dumps(mp_req_json))
    print("SENT:")
    print(json.dumps(mp_req_json, sort_keys=True, indent=2, separators=(',', ':')))


def send_login_request(auth_token, is_refresh_token):
    """
        Send login request with authentication token.
        Used both for the initial login and subsequent reissues to update the authentication token
"""
    
    web_socket_app.send(json.dumps(login_json))
    print("SENT:")
    print(json.dumps(login_json, sort_keys=True, indent=2, separators=(',', ':')))


def on_message( _, message):
    """ Called when message received, parse message into JSON for processing """
    #global dataBuffer
    
    print("RECEIVED: ")
    message_json = json.loads(message)
    print(json.dumps(message_json, sort_keys=True, indent=2, separators=(',', ':')))
    for singleMsg in message_json:
        process_message(singleMsg)
       
 

def on_error(_, error):
    """ Called when websocket error has occurred """
    print(error)


def on_close(_, close_status_code, close_msg):
    """ Called when websocket is closed """
    


def on_open(_):
    """ Called when handshake is complete and websocket is open, send login """

    print("WebSocket successfully connected!")
    global web_socket_open
    web_socket_open = True
    send_login_request(sts_token, False)


def get_sts_token(current_refresh_token, url=None):
    """
        Retrieves an authentication token.
        :param current_refresh_token: Refresh token retrieved from a previous authentication, used to retrieve a
        subsequent access token. If not provided (i.e. on the initial authentication), the password is used.
    """


###
user = 'xxx'
password = 'xxx'
clientid = 'xxx'

###

# Global Default Variables
app_id = '256'
auth_url = 'https://api.refinitiv.com:443/auth/oauth2/v1/token'
#hostname = 'ap-southeast-1-aws-1-lrg.optimized-pricing-api.refinitiv.net'
hostname ='ap-northeast-1-aws-1-sm.optimized-pricing-api.refinitiv.net'
port = '443'
scope = 'trapi.streaming.pricing.read'

position = ''
sts_token = ''
refresh_token = ''
client_secret = ''
service = 'ELEKTRON_DD'

# Global Variables
web_socket_app = None
web_socket_open = False
logged_in = False
original_expire_time = '0';
dataBuffer = []


ric = ["CNY=X", "TWD=X"]

###

position_host = socket.gethostname()
position = socket.gethostbyname(position_host) + "/" + position_host
sts_token, refresh_token, expire_time = get_sts_token(None)

original_expire_time = expire_time
ws_address = "wss://{}:{}/WebSocket".format(hostname, port)
web_socket_app = websocket.WebSocketApp(ws_address, on_message=on_message,on_error=on_error,on_close=on_close,subprotocols=['tr_json2'])
web_socket_app.on_open = on_open
expire_time = 60
wst = threading.Thread(target=web_socket_app.run_forever, kwargs={'sslopt': {'check_hostname':
False}})
t2 = threading.Thread(target=process_data , args=(dataBuffer,))
#wst = Process(target=web_socket_app.run_forever, kwargs={'sslopt': {'check_hostname': False}})
#t2 = Process(target=process_data, args=(dataBuffer,))
    ###
    ###
wst.start()
t2.start()

  

The following is the first json message received:

[
    {
      "Fields":{
        "ACTIV_DATE":"2023-08-22",
        "ACT_FLAG1":" ",
        "ACT_FLAG2":" ",
        "ACT_FLAG3":" ",
        "ACT_TP_1":"B\u21e7",
        "ACT_TP_2":"B\u21e7",
        "ACT_TP_3":"B\u21e7",
        "ACVOL_1":null,
        "ASIA_CLOSE":null,
        "ASIA_CL_DT":null,
        "ASIA_HIGH":null,
        "ASIA_HI_TM":null,
        "ASIA_LOW":null,
        "ASIA_LW_TM":null,
        "ASIA_NETCH":null,
        "ASIA_OPEN":null,
        "ASIA_OP_TM":null,
        "ASK":7.2929,
        "ASKSIZE":null,
        "ASK_1":7.2925,
        "ASK_2":7.2885,
        "ASK_HIGH_1":7.2932,
        "ASK_HI_TME":"08:26:00",
        "ASK_LOW_1":7.268,
        "ASK_LO_TME":"01:34:00",
        "ASK_SPREAD":null,
        "BCAST_REF":"CNY=",
        "BCKGRNDPAG":"    ",
        "BEST_ASIZ1":null,
        "BEST_ASIZ2":null,
        "BEST_ASIZ3":null,
        "BEST_ASIZ4":null,
        "BEST_ASIZ5":null,
        "BEST_ASK1":null,
        "BEST_ASK2":null,
        "BEST_ASK3":null,
        "BEST_ASK4":null,
        "BEST_ASK5":null,
        "BEST_BID1":null,
        "BEST_BID2":null,
        "BEST_BID3":null,
        "BEST_BID4":null,
        "BEST_BID5":null,
        "BEST_BSIZ1":null,
        "BEST_BSIZ2":null,
        "BEST_BSIZ3":null,
        "BEST_BSIZ4":null,
        "BEST_BSIZ5":null,
        "BID":7.2923,
        "BIDSIZE":null,
        "BID_1":7.2911,
        "BID_2":7.2882,
        "BID_HIGH_1":7.2926,
        "BID_LOW_1":7.267,
        "BID_NET_CH":0.0058,
        "BID_SPREAD":null,
        "BID_TICK_1":"\u21e7",
        "BKGD_REF":"Chinese Yuan",
        "BOND_TYPE":"   ",
        "BPV":null,
        "B_ASK1_TIM":null,
        "B_ASK2_TIM":null,
        "B_ASK3_TIM":null,
        "B_ASK4_TIM":null,
        "B_ASK5_TIM":null,
        "B_BID1_TIM":null,
        "B_BID2_TIM":null,
        "B_BID3_TIM":null,
        "B_BID4_TIM":null,
        "B_BID5_TIM":null,
        "CALL_PRC":null,
        "CCY_NAME":"                              ",
        "CLOSE_ASK":7.289,
        "CLOSE_BID":7.2865,
        "CLOSE_TYPE":"B ",
        "CNTCT_ID":null,
        "CONTEXT_ID":3284,
        "CONVEXITY":null,
        "CROSS_SC":"1E+00",
        "CTBLOC_ID1":"   ",
        "CTBLOC_ID2":"   ",
        "CTBLOC_ID3":"   ",
        "CTBTR_1":"Refinitiv",
        "CTBTR_2":"            ",
        "CTBTR_3":"            ",
        "CTBTR_BKGD":null,
        "CTBTR_ID1":"    ",
        "CTBTR_ID2":"    ",
        "CTBTR_ID3":"    ",
        "CTB_LOC1":"   ",
        "CTB_LOC2":"   ",
        "CTB_LOC3":"   ",
        "CTB_PAGE1":"    ",
        "CTB_PAGE2":"    ",
        "CTB_PAGE3":"    ",
        "CURRENCY":"CNY",
        "DAYS_MAT":null,
        "DEALT_VL1":null,
        "DEALT_VL2":null,
        "DEALT_VL3":null,
        "DELTA":null,
        "DLG_CODE1":"      ",
        "DLG_CODE2":"      ",
        "DLG_CODE3":"      ",
        "DSO_ID":16416,
        "DSPLY_NAME":"Refinitiv",
        "DVOL1_SC":" ",
        "DVOL2_SC":" ",
        "DVOL3_SC":" ",
        "EMAIL_ADRS":"                                        ",
        "EURO_CLOSE":null,
        "EURO_CL_DT":null,
        "EURO_HIGH":null,
        "EURO_HI_TM":null,
        "EURO_LOW":null,
        "EURO_LW_TM":null,
        "EURO_NETCH":null,
        "EURO_OPEN":null,
        "EURO_OP_TM":null,
        "FIX_DATE":null,
        "FWD1_PRICE":null,
        "FWD2_PRICE":null,
        "GEN_TEXT16":"                ",
        "GEN_VAL1":null,
        "GEN_VAL10":null,
        "GEN_VAL2":null,
        "GEN_VAL3":null,
        "GEN_VAL4":null,
        "GEN_VAL5":null,
        "GEN_VAL6":null,
        "GEN_VAL7":null,
        "GEN_VAL8":null,
        "GEN_VAL9":null,
        "GN_TXT16_2":"                ",
        "GN_TXT24_1":"                        ",
        "GN_TXT32_1":"                                ",
        "GN_TXT32_2":"                                ",
        "GV10_TEXT":"      ",
        "GV1_DATE":null,
        "GV1_FLAG":" ",
        "GV1_TEXT":"SPOT  ",
        "GV1_TIME":null,
        "GV2A_RTIM1":null,
        "GV2A_RTIM2":null,
        "GV2A_RTIM3":null,
        "GV2B_RTIM1":null,
        "GV2B_RTIM2":null,
        "GV2B_RTIM3":null,
        "GV2_DATE":null,
        "GV2_FLAG":" ",
        "GV2_TEXT":"USDCNY",
        "GV2_TIME":null,
        "GV3_FLAG":" ",
        "GV3_TEXT":"      ",
        "GV4_FLAG":" ",
        "GV4_TEXT":"      ",
        "GV5_TEXT":"      ",
        "GV6_TEXT":"      ",
        "GV7_TEXT":"      ",
        "GV8_TEXT":"      ",
        "GV9_TEXT":"      ",
        "HIGHTP_1":"B",
        "HIGH_1":7.2926,
        "HIGH_2":7.2911,
        "HIGH_3":7.2909,
        "HIGH_4":7.2909,
        "HIGH_5":7.2909,
        "HIGH_TIME":"08:26:00",
        "HIGH_TIME2":"08:19:00",
        "HIGH_TIME3":"07:30:00",
        "HIGH_TIME4":"07:30:00",
        "HIGH_TIME5":"07:30:00",
        "HIGH_YLD":null,
        "HSTCLBDDAT":"2023-08-21",
        "HSTCLSDATE":"2023-08-21",
        "HST_CLOSE":7.2865,
        "HST_CLSBID":7.2865,
        "HST_CLSYLD":null,
        "IND_NEWS":"          ",
        "INST_DESC":"                              ",
        "IRGFID":null,
        "IRGPRC":0.0796,
        "IRGVAL":null,
        "LONGLINK1":null,
        "LOWTP_1":"A",
        "LOW_1":7.268,
        "LOW_2":7.268,
        "LOW_3":7.268,
        "LOW_4":7.268,
        "LOW_5":7.268,
        "LOW_TIME":"01:34:00",
        "LOW_TIME2":"01:34:00",
        "LOW_TIME3":"01:34:00",
        "LOW_TIME4":"01:34:00",
        "LOW_TIME5":"01:34:00",
        "LOW_YLD":null,
        "MATUR_DATE":null,
        "MID_1":null,
        "MID_2":null,
        "MID_3":null,
        "MID_NET_CH":null,
        "MID_PRICE":null,
        "MID_SPREAD":null,
        "MONTH_HIGH":null,
        "MONTH_LOW":null,
        "MTD":null,
        "MTHHI_DT":null,
        "MTHLO_DT":null,
        "NETCHNG_1":0.0058,
        "NEWS":"    ",
        "NEWSTM_MS":null,
        "NEWS_TIME":null,
        "NOMINAL":null,
        "NUM_BIDS":null,
        "NUM_MOVES":null,
        "OFFCL_CODE":"MHSH        ",
        "OFFC_CODE2":"            ",
        "OFF_CD_IN2":"   ",
        "OFF_CD_IND":"   ",
        "OPEN_BID":7.267,
        "OPEN_PRC":7.267,
        "OPEN_TIME":"01:34:00",
        "OPEN_TYPE":"B ",
        "OPEN_YLD":null,
        "PCTCHG_3M":null,
        "PCTCHG_6M":null,
        "PCTCHG_INC":null,
        "PCTCHG_MTD":null,
        "PCTCHG_TRT":null,
        "PCTCHG_YTD":null,
        "PCTCHNG":0.08,
        "PREF_DISP":153,
        "PREMIUM":null,
        "PREV_DISP":null,
        "PRIMACT_1":7.2923,
        "PRIMACT_2":7.2911,
        "PRIMACT_3":7.2882,
        "PROD_PERM":363,
        "PROV_SYMB":"                                ",
        "PR_FREQ":"      ",
        "PUTCALLIND":"CALL",
        "P_C_IND1":"    ",
        "QUOTE_TYPE":"   ",
        "QUOTIM_MS":null,
        "RCS_AS_CLA":"   ",
        "RDNDISPLAY":153,
        "RDN_EXCHD2":"NY$",
        "RDN_EXCHID":"   ",
        "RECORDTYPE":209,
        "REL_SPEEDG":null,
        "RIC_DESC":"        ",
        "RT_YIELD_1":null,
        "SALTIM_MS":null,
        "SCALING":"1       ",
        "SC_ACT_TP1":"  ",
        "SC_ACT_TP2":"  ",
        "SC_ACT_TP3":"  ",
        "SC_AFLAG1":" ",
        "SC_AFLAG2":" ",
        "SC_AFLAG3":" ",
        "SEC_ACT_1":7.2929,
        "SEC_ACT_2":7.2925,
        "SEC_ACT_3":7.2885,
        "SEC_HIGH":null,
        "SEC_HI_TP":" ",
        "SEC_LOW":null,
        "SEC_LO_TP":" ",
        "SEC_VOL1":null,
        "SEQNUM":null,
        "SETTLEDATE":null,
        "SPS_SP_RIC":".[SPSEVAI-VAH10-P4",
        "START_DT":null,
        "TERM":null,
        "TIMACT":"08:29:00",
        "TIMCOR":null,
        "TIMCOR_MS":null,
        "TRADE_DATE":null,
        "TRDPRC_1":null,
        "TRDTIM_1":null,
        "TRDTIM_MS":null,
        "TRDVOL_1":null,
        "TRD_UNITS":"4DP ",
        "US_CLOSE":null,
        "US_CL_DT":null,
        "US_HIGH":null,
        "US_HI_TM":null,
        "US_LOW":null,
        "US_LW_TM":null,
        "US_NETCH":null,
        "US_OPEN":null,
        "US_OP_TM":null,
        "VALUE_DT1":"2023-08-22",
        "VALUE_DT2":"2023-08-22",
        "VALUE_DT3":"2023-08-22",
        "VALUE_TS1":"08:29:57",
        "VALUE_TS2":"08:19:04",
        "VALUE_TS3":"08:09:39",
        "WEB_ADRS":"                                                                                ",
        "WEEK_HIGH":null,
        "WEEK_LOW":null,
        "WEIGHTING":null,
        "WKHI_DT":null,
        "WKLO_DT":null,
        "WTD_AVE1":null,
        "YLD_NETCHG":null,
        "YRBDHI_IND":" ",
        "YRBDLO_IND":" ",
        "YRBIDHIGH":7.317,
        "YRBIDLOW":6.6905,
        "YRHIGH":7.317,
        "YRHIGHDAT":null,
        "YRLOW":6.6915,
        "YRLOWDAT":null,
        "YTD":null
      },
      "ID":2,
      "Key":{
        "Name":"CNY=X",
        "Service":"ELEKTRON_DD"
      },
      "PermData":"AwEBNjw=",
      "Qos":{
        "Rate":"JitConflated",
        "Timeliness":"Realtime"
      },
      "SeqNumber":6334,
      "State":{
        "Data":"Ok",
        "Stream":"Open"
      },
      "Type":"Refresh"
    }
  ]

The following is the subsequent json message received:

[
  {
    "DoNotConflate":true,
    "Fields":{
      "ACTIV_DATE":"2023-08-22",
      "ACT_TP_1":"B\u21e9",
      "ACT_TP_2":"B\u21e7",
      "ACT_TP_3":"B\u21e7",
      "ASK":7.292,
      "ASK_HIGH_1":7.2938,
      "ASK_HI_TME":"08:31:00",
      "ASK_LOW_1":7.268,
      "ASK_LO_TME":"01:34:00",
      "BID":7.2908,
      "BID_HIGH_1":7.2933,
      "BID_LOW_1":7.267,
      "BID_NET_CH":0.0043,
      "BID_TICK_1":"\u21e9",
      "CLOSE_ASK":7.289,
      "CLOSE_BID":7.2865,
      "CLOSE_TYPE":"B ",
      "GV2_TEXT":"USDCNY",
      "HIGHTP_1":"B",
      "HIGH_1":7.2933,
      "HIGH_TIME":"08:34:00",
      "HSTCLBDDAT":"2023-08-21",
      "HSTCLSDATE":"2023-08-21",
      "HST_CLOSE":7.2865,
      "HST_CLSBID":7.2865,
      "IRGPRC":0.059,
      "LOWTP_1":"A",
      "LOW_1":7.268,
      "LOW_TIME":"01:34:00",
      "NETCHNG_1":0.0043,
      "OPEN_BID":7.267,
      "OPEN_PRC":7.267,
      "OPEN_TIME":"01:34:00",
      "OPEN_TYPE":"B ",
      "PCTCHNG":0.06,
      "PRIMACT_1":7.2908,
      "SCALING":"1       ",
      "SEC_ACT_1":7.292,
      "TIMACT":"08:39:00",
      "VALUE_DT1":"2023-08-22",
      "VALUE_TS1":"08:39:04",
      "YRBIDHIGH":7.317,
      "YRBIDLOW":6.6905,
      "YRHIGH":7.317,
      "YRLOW":6.6915
    },
    "ID":2,
    "Key":{
      "Name":"CNY=X",
      "Service":"ELEKTRON_DD"
    },
    "SeqNumber":6350,
    "Type":"Update",
    "UpdateType":"Unspecified"
  }
]

The above code is what I tried. Please help. I've been googling for weeks and still couldn't figure it out....


Solution

  • I just saw your comment/question. To be sure I am notified of your new comment, prefix it with @Booboo.

    The list might eventually have data but it could be too late. If process_data finds an empty list the function terminates and when it does the thread terminates. If there is data, your code will always print out all the elements of the list that meet your criteria and then terminate. If the other thread is constantly adding data to this list, you would prefer to never terminate and print out new, relevant data. A queue contains "messages." The receiver can make a call to get the next item on the queue and the item is then automatically removed from the queue so it will never be processed again by the receiver. Moreover, if there is no message on the queue, the receiver will block until there is a message. This removes all timing considerations.

    As long as you are using multithreading rather than multiprocessing, the simplest change is to modify the code to define a global queue:

    from functools import partial
    import sys
    import time
    import getopt
    import requests
    import socket
    import json
    import websocket
    import threading
    from multiprocessing import Process
    
    from queue import Queue
    
    queue = Queue()
    ...
    

    If you change to use multiprocessing and you are running under a platform that uses the fork method to create child processes, (e.g. Linux), then you want:

    ...
    from multiprocessing import Process, Queue
    
    queue = Queue()
    

    Then process_data is modifed as follows:

    def process_data(dataBuffer):
        while True: # Loop forever
            data = queue.get() # Blocks until there is something on the queue
               
            if data['Key']['Name'] == "CNY=X":
                ... # etc.
    

    Then get rid of all references to dataBuffer. Wherever you were appending to dataBuffer, for example, dataBuffer.append(message_json), change the code to be queue.put(message_json).