Search code examples
plotlyplotly-dashplotly-pythoncandlestick-chart

how to use the dash plotly extenddata api with go.candlestick plots?


I'm trying to plot a candlestick chart from a live stream of data by extending it upon receiving new data, and it's not working. Currently, Chrome's DevTools console displays the following error -

Error: attribute type must be an array of length equal to indices array length

Below is my code. It uses the dash extension library to receive data instead of using interval because at the same time the actual program also updates a tick stream.

I speculate it either has something to do with plotly candlestick data formatting, or go.Candlestick simply doesn't support extendData. Any idea how to fix it?

import json
import dash_html_components as html
import dash_core_components as dcc
import plotly.graph_objects as go
from gevent import sleep
from dash import Dash
from dash.dependencies import Input, Output, State
from dash_extensions import WebSocket
from dash_extensions.websockets import SocketPool, run_server

import threading, queue, time
from datetime import datetime
from pytz import timezone
x = 0
def genData(q):
    """Push one synthetic candle onto ``q`` every second, forever.

    Each queued item is ``{'values': {...}}`` with fixed high/low/open/close
    numbers, two constant status fields and the current Europe/London
    wall-clock time formatted as ``HH:MM:SS``.  The module-level counter ``x``
    is bumped by 0.1 per item.  This function never returns.
    """
    global x
    while True:
        # Key order is kept stable so the JSON sent downstream looks the same.
        candle = {
            'high': 10,
            'low': 5,
            'open': 6,
            'close': 9,
            'CHANGE': 0,
            'MARKET_STATE': 'edit',
            'UPDATE_TIME': datetime.now(timezone('Europe/London')).time().strftime('%H:%M:%S'),
        }
        q.put({'values': candle})
        x += 0.1
        sleep(1)

# This block runs asynchronously.
def ws_handler(ws, q):
    """Forward every item placed on ``q`` to the websocket as JSON, forever.

    Args:
        ws: connection object; only its ``send(str)`` method is used.
        q: a ``queue.Queue`` of JSON-serialisable items.

    The loop never returns.  When the queue is empty it sleeps briefly instead
    of spinning, so an idle handler does not burn CPU or starve other work.
    """
    while True:
        try:
            # Non-blocking fetch: avoids copying the whole queue just to test
            # for emptiness, and cannot block if another consumer got there first.
            data = q.get_nowait()
        except queue.Empty:
            sleep(0.1)  # nothing to send yet; yield before polling again
            continue
        sleep(0.1)  # delay between data events
        ws.send(json.dumps(data))
        q.task_done()


# Queue shared by the data generator thread and the websocket handler.
q = queue.Queue()

# Example app: a single, initially empty candlestick graph plus a websocket
# component whose messages trigger the graph callback.
app = Dash(prevent_initial_callbacks=True)

# The pool hands each connection to ws_handler together with the shared queue.
socket_pool = SocketPool(app, handler=lambda x: ws_handler(x, q))

app.layout = html.Div(
    [
        dcc.Graph(
            id="graph",
            figure=go.Figure(
                go.Candlestick(x=[], open=[], close=[], high=[], low=[])
            ),
        ),
        WebSocket(id="ws"),
    ]
)


@app.callback(Output("graph", "extendData"), [Input("ws", "message")], [State("graph", "figure")])
def update_graph(msg, figure):
    """Turn one websocket message into an ``extendData`` update for the graph.

    ``msg['data']`` is a JSON string whose ``'values'`` object carries the
    candle fields.  Returns a 3-tuple: the update dict, the trace index list
    ``[0]`` and the integer ``10`` (presumably extendData's point cap -- see
    the Dash docs).  ``figure`` is received as State but is not used.
    """
    candle = json.loads(msg['data'])['values']
    # Only a time of day is parsed, so strptime fills in its default date.
    stamp = datetime.strptime(candle['UPDATE_TIME'], '%H:%M:%S')
    update = {
        'x': [[stamp]],
        'open': [[candle['open']]],
        'close': [[candle['close']]],
        'high': [[candle['high']]],
        'low': [[candle['low']]],
        # NOTE: unlike the entries above this one is a plain string, not a
        # per-trace list; it is the attribute named in the browser error.
        'type': 'candlestick',
    }
    trace_indices = [0]
    max_points = 10
    return update, trace_indices, max_points

if __name__ == '__main__':
    # Background producer that keeps feeding the shared queue.
    threading.Thread(target=genData, daemon=True, args=(q,)).start()

    # Serve the app from its own thread on port 5000.
    server_thread = threading.Thread(target=run_server, daemon=True, args=(app,), kwargs={'port': 5000})
    server_thread.start()

    # Both workers are daemon threads, so the interpreter would exit as soon
    # as the main thread reached the end of the script.  Block on the server
    # thread to keep the process (and therefore the app) alive.
    server_thread.join()
    

Solution

  • The error you are receiving is telling you that the key 'type' in the dict you are returning does not map to an array whose length equals the length of the trace-indices array, unlike the other keys in the dict (here 'type' is a plain string, while every other value is a list with one entry per trace index). The solution is simply to remove the key 'type' from the dict, and your callback should work. Here is the corrected code, with a comment marking the change:

    @app.callback(Output("graph", "extendData"), [Input("ws", "message")], [State("graph", "figure")])
    def update_graph(msg, figure):
        """Build the ``extendData`` payload for one incoming websocket message.

        Returns ``(update, [0], 10)``, where ``update`` maps each candlestick
        attribute to a list holding one single-point list.  ``figure`` is
        accepted as State but is not used.
        """
        values = json.loads(msg['data'])['values']
        # 'type' key removed: the update dict now contains only list-valued
        # entries (x, open, close, high, low).
        update = {
            'x': [[datetime.strptime(values['UPDATE_TIME'], '%H:%M:%S')]],
            'open': [[values['open']]],
            'close': [[values['close']]],
            'high': [[values['high']]],
            'low': [[values['low']]],
        }
        return update, [0], 10
    

    Here is a quick example I used successfully:

    @app.callback(
        Output('indicators', 'extendData'),
        Input('live_interval', 'n_intervals')
    )
    def update_rates(n_intervals):
        """Return an ``extendData`` update built from the last row of ``candles``.

        The last row's ``time`` is shifted by ``delta * n_intervals`` and sent
        under the key ``x``; every value is wrapped as ``[[value]]``.  The
        update is paired with the trace index list ``[1]``.

        ``candles`` and ``delta`` are defined outside this snippet (presumably
        a DataFrame and a time step -- confirm against the surrounding code).
        """
        # Take the most recent candle and move its timestamp forward.
        latest = candles[['time', 'high', 'low', 'open', 'close']].iloc[-1]
        latest['time'] = latest['time'] + delta*n_intervals
        row = latest.to_dict()

        # Rename 'time' -> 'x'.
        row['x'] = row.pop('time')

        # Wrap each value as a one-point list inside a one-trace list.
        payload = {field: [[value]] for field, value in row.items()}

        return payload, [1]