Tags: python, optimization, finance, quantitative-finance, candlestick-chart

Generate CandleSticks from ticker data


I have an algorithm that converts stock market ticker data into candlesticks. My code calls this function many, many times, so I'm trying to optimize it to run faster. I would like you to read the code and suggest ways to speed it up.

For the sake of this question, you can think of the market ticker as two lists: a list of the prices of a certain stock

stock_price = [ 5, 5.1, 5, 4.9 , ... ]

and a list of the timestamps associated with each price.

timestamps  = [ 1534339504.36133 , 1534339704.36133, 1534339804.36133, 1534340504.36133, ... ]

You will notice that the sampling rate is variable: sometimes samples are a few seconds apart, sometimes several minutes. The input lists are sorted by increasing timestamp.

I specify a number N of candles to compute, each covering a duration T. If I ask for 10 candles of 5 minutes each and I don't have enough timestamps, the first candles will be NaN. On the other hand, if I have a large number of timestamps from the last few weeks, only the most recent samples are used to compute the last 10 candles, and the rest are ignored.

There is another detail: I compute the candles in a slightly different manner. Normally they are aligned to UTC; instead, I take the last element of my lists as the closing price and time of my last candle.

In the end, I need lists or numpy arrays with the candle's open, high, low, close prices as well as the time for N candles of a time interval of T
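To make the anchoring concrete, here is a minimal sketch of how a sample maps to a candle when counting back from the last timestamp. The helper name `candle_index_from_end` is hypothetical, and the example assumes the timestamps are in seconds and uses only the four values shown above.

```python
def candle_index_from_end(timestamp, last_time, time_interval):
    """Negative list index of the candle a sample falls into (-1 = newest candle)."""
    return -int((last_time - timestamp) / time_interval) - 1


# The four example timestamps from above (assumed to be in seconds)
timestamps = [1534339504.36133, 1534339704.36133, 1534339804.36133, 1534340504.36133]
T = 5 * 60  # 5-minute candles

indices = [candle_index_from_end(t, timestamps[-1], T) for t in timestamps]
print(indices)  # [-4, -3, -3, -1]
```

With 4 candles of 5 minutes, no sample lands in candle -2, so that candle would stay NaN.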

To convert these two lists into candlestick charts, I do the following:

import numpy as np

# time_interval is the candle size (1, 5, 10... minutes, hours, etc.), in the same units as the timestamps
# nb_candles is the number of candles to extract (for example, the last 5 candles)
def convert_samples_to_candles( stock_price , times , time_interval , nb_candles=-1 ):
    #If no data return NaNs        
    if( len(stock_price) == 0 or len(times) == 0 ):
        NO_RESPONSE = [np.nan]  # np.NaN was removed in NumPy 2.0; np.nan works in all versions
        return NO_RESPONSE, NO_RESPONSE, NO_RESPONSE, NO_RESPONSE, NO_RESPONSE

    last_time = times[-1]
    last_val  = stock_price[-1]

    #if nb_candles is not specified compute all the candles
    if( nb_candles==-1 ):
        nb_candles = int((last_time - times[0])/time_interval) + 1

    candles_open  = [np.nan]*nb_candles
    candles_close = [np.nan]*nb_candles
    candles_high  = [np.nan]*nb_candles
    candles_low   = [np.nan]*nb_candles
    candles_time  = [np.nan]*nb_candles

    k=1
    last_candle = -1

    #Initialize the last candles with the last value
    candles_open[-1]  = last_val
    candles_close[-1] = last_val
    candles_high[-1]  = last_val
    candles_low[-1]   = last_val
    candles_time[-1]  = last_time

    #Iterate and fill each candle from the last one to the first one
    nb_times = len(times)
    # k runs up to and including nb_times so that the oldest sample, times[0], is also visited
    while( k <= nb_times and times[-1*k] + nb_candles*time_interval >  last_time ):

        a_last       = stock_price[-1*k]
        a_timestamp  = times[-1*k]
        candle_index = (-1*int((last_time - a_timestamp)/time_interval) -1)

        if( candle_index > -1 ):
            k += 1
            continue

        if( candle_index < last_candle ):
            candles_time[ candle_index ]  = a_timestamp
            candles_close[ candle_index ] = a_last
            candles_high[ candle_index ]  = a_last
            candles_low[ candle_index ]   = a_last
            candles_open[ candle_index ]  = a_last

            last_candle = candle_index

        else:
            #print candle_index, candles_open 
            candles_open[ candle_index ]  = a_last

            if( candles_high[ candle_index ]  < a_last ):
                candles_high[ candle_index ]  = a_last

            if( candles_low[ candle_index ]   > a_last ):
                candles_low[ candle_index ]   = a_last

        k += 1


    return candles_open, candles_close, candles_high, candles_low, candles_time

Thank you very much for your time!


Solution

  • After some research, I tried a different approach to computing the candles.

    I defined a Candle_Handler class into which I insert samples one at a time, updating the candles as I go.

    This code is slightly faster than the one in the question when you iteratively recompute the candles.

    import numpy as np

    class Candle_Handler:
    
        def __init__(self, time_interval, nb_candles=5 ):
    
            self.nb_candles    = nb_candles
            self.time_interval = time_interval
    
            self.times  = []
            self.values = []
    
            self.candles_t = [ [] for _ in range(nb_candles) ]
            self.candles_v = [ [] for _ in range(nb_candles) ]       
    
    
        def insert_sample( self, value, time ):
            self.candles_t[-1].append(time)
            self.candles_v[-1].append(value)
    
            for i in range( self.nb_candles ):
    
                candle_index = -1*(i+1)
                if( len(self.candles_t[candle_index]) == 0 ): continue
    
                candle_time_interval = (i+1)*self.time_interval
    
                if( i + 1 == self.nb_candles ):
                    while( len(self.candles_t[candle_index])> 0 and  time - self.candles_t[candle_index][0] > candle_time_interval ):
                        del self.candles_t[candle_index][0]
                        del self.candles_v[candle_index][0]
    
                else:
    
                    while( len(self.candles_t[candle_index])> 0 and  time - self.candles_t[candle_index][0] > candle_time_interval ):
                        ltime  = self.candles_t[candle_index].pop(0)
                        lvalue = self.candles_v[candle_index].pop(0)
    
                        self.candles_t[candle_index-1].append( ltime )
                        self.candles_v[candle_index-1].append( lvalue )
    
    
        def get_all_candles(self, delta=1.0 ):
    
            last_time = self.candles_t[-1][-1]
    
            # np.nan instead of np.NAN: the upper-case alias was removed in NumPy 2.0
            candles_open  = [ c[0]   if len(c)>0 else np.nan for c in self.candles_v ]
            candles_close = [ c[-1]  if len(c)>0 else np.nan for c in self.candles_v ]
            candles_high  = [ max(c) if len(c)>0 else np.nan for c in self.candles_v ]
            candles_low   = [ min(c) if len(c)>0 else np.nan for c in self.candles_v ]
            #candles_time  = [ c[-1]  if len(c)>0 else np.nan for c in self.candles_t ]
            candles_time  = [ last_time - (self.nb_candles - (c+1) )*self.time_interval for c in range(self.nb_candles) ]         
    
    
            for i in range( 1, self.nb_candles ):
                if( np.isnan( candles_close[i-1] ) ): continue
    
                if( np.isnan( candles_open[i] ) ):
                    candles_open[i]  = candles_close[i-1]
                    candles_close[i] = candles_close[i-1]
                    candles_high[i]  = candles_close[i-1]
                    candles_low[i]   = candles_close[i-1]
    
    
            if( not delta == 1.0 ):
                candles_close[-1] = candles_close[-1]*delta
                if( candles_high[-1] < candles_close[-1] ):
                    candles_high[-1] = candles_close[-1]
                if( candles_low[-1]  > candles_close[-1] ):
                    candles_low[-1]  = candles_close[-1]
    
                if( len(self.candles_v[-1]) == 1 ):
                    candles_open[-1] = candles_close[-1]
    
            return candles_open, candles_close, candles_high, candles_low, candles_time
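If the whole history is available as lists, as in the question, another option is to drop the per-sample Python loop and let NumPy do the grouping. The following is only a sketch under assumptions, not a benchmarked replacement: the function name `candles_vectorized` is made up for illustration, it assumes the timestamps are sorted in increasing order and `nb_candles` is given explicitly, and it is intended to follow the same candle definition as the question (candles anchored on the last sample, empty candles left as NaN).

```python
import numpy as np


def candles_vectorized(prices, times, time_interval, nb_candles):
    """Hypothetical NumPy sketch: OHLC candles anchored on the last sample."""
    prices = np.asarray(prices, dtype=float)
    times = np.asarray(times, dtype=float)
    nan = np.full(max(nb_candles, 0), np.nan)
    o, c, h, l, t = nan.copy(), nan.copy(), nan.copy(), nan.copy(), nan.copy()
    if prices.size == 0 or nb_candles < 1:
        return o, c, h, l, t

    # Candle age counted back from the last sample (0 = newest candle)
    age = ((times[-1] - times) / time_interval).astype(int)
    keep = age < nb_candles
    prices, times, age = prices[keep], times[keep], age[keep]

    # Position of each sample's candle in the output; non-decreasing for sorted times
    slot = nb_candles - 1 - age

    # First and last sample of every run of samples sharing a candle
    starts = np.flatnonzero(np.concatenate(([True], slot[1:] != slot[:-1])))
    ends = np.concatenate((starts[1:], [slot.size])) - 1
    idx = slot[starts]

    o[idx] = prices[starts]                        # oldest sample in the candle
    c[idx] = prices[ends]                          # newest sample in the candle
    h[idx] = np.maximum.reduceat(prices, starts)   # per-run maximum
    l[idx] = np.minimum.reduceat(prices, starts)   # per-run minimum
    t[idx] = times[ends]                           # time of the newest sample
    return o, c, h, l, t


example_times = [1534339504.36133, 1534339704.36133, 1534339804.36133, 1534340504.36133]
o, c, h, l, t = candles_vectorized([5, 5.1, 5, 4.9], example_times, 300, 4)
```

Here `np.maximum.reduceat` and `np.minimum.reduceat` reduce each contiguous run of samples belonging to one candle, so the work per call happens inside NumPy rather than in a `while` loop. Whether this is actually faster depends on the size of the input, so it is worth timing against both versions above on real data.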