Search code examples
pythonpython-asyncio

Python async data processing function


I want to run the runBacktest() function asynchronously. Is this possible?

import pandas as pd
from pathlib import Path
from datetime import datetime

from indicators import *


# Load both CSV exports from the directory that contains this script.
# The paths are joined with pathlib rather than a hard-coded "\" separator:
# "\C" and "\I" are invalid string escapes, and a backslash is not a path
# separator outside Windows, so the old f-strings only resolved on Windows.
dataDir = Path(__file__).parent.resolve()
dfCryptoCap = pd.read_csv(dataDir / "CRYPTOCAP_TOTAL, 720_b2571.csv")
dfBtcUsd = pd.read_csv(dataDir / "INDEX_BTCUSD, 720_17561.csv")

# Add a column with the Unix timestamp (seconds) converted to datetime.
dfCryptoCap['timeiso'] = pd.to_datetime(dfCryptoCap['time'],unit='s')
dfBtcUsd['timeiso'] = pd.to_datetime(dfBtcUsd['time'],unit='s')

# Heikin-Ashi versions of both frames (generateHeikinAshi comes from the
# project's indicators module).
dfCryptoCapHA = generateHeikinAshi(dfCryptoCap)
dfBtcUsdHA = generateHeikinAshi(dfBtcUsd)


# Summary line of every backtest run that beat the best PNL seen so far;
# runBacktest() appends to this list.
results = []
def runBacktest(lenSmooth1, winningLenSmooth1, winningPNL):
    """Backtest one EMA length and compare its outcome with the best run so far.

    The open/close columns of the module-level ``dfCryptoCapHA`` frame are
    smoothed with ``calculateEMA(..., lenSmooth1)``. A bar whose smoothed close
    is above its smoothed open, right after a bar where it was at or below,
    records the BTC close of that timestamp (from ``dfBtcUsd``) as the entry
    price. The opposite flip settles the trade at the BTC close of that
    timestamp and compounds the percentage move into a portfolio that starts
    at 1000.

    Args:
        lenSmooth1: Smoothing length handed to ``calculateEMA``.
        winningLenSmooth1: Smoothing length of the best run seen so far.
        winningPNL: Final portfolio value of the best run seen so far.

    Returns:
        ``[winningLenSmooth1, winningPNL]``, replaced by this run's values when
        its final portfolio value is strictly greater than ``winningPNL``.

    Side effects:
        Appends a summary line to the module-level ``results`` list when this
        run becomes the new winner.
    """
    # Only the bar time and the smoothed open/close are read below, so the
    # high/low EMAs (never used by the signals) are no longer computed and the
    # remaining columns are not copied.
    dfCryptoCapEMA = dfCryptoCapHA[['time']].copy()
    dfCryptoCapEMA['open'] = calculateEMA(dfCryptoCapHA['open'], lenSmooth1)
    dfCryptoCapEMA['close'] = calculateEMA(dfCryptoCapHA['close'], lenSmooth1)

    # Pull the columns out once: indexing plain lists is far cheaper than
    # building a row Series with .iloc[i] several times per bar.
    emaOpen = dfCryptoCapEMA['open'].tolist()
    emaClose = dfCryptoCapEMA['close'].tolist()
    barTimes = dfCryptoCapEMA['time'].tolist()

    # One timestamp-to-BTC-close lookup instead of scanning the whole BTC frame
    # on every signal. keep='first' mirrors the former ".iloc[0]" choice when a
    # timestamp occurs more than once.
    btcRows = dfBtcUsd.drop_duplicates(subset='time', keep='first')
    btcCloseByTime = dict(zip(btcRows['time'].tolist(), btcRows['close'].tolist()))

    portfoliosize = 1000
    entryPrice = 0.0  # 0.0 doubles as "no priced entry yet"
    for i in range(1, len(barTimes)):
        turnedUp = emaClose[i] > emaOpen[i] and emaClose[i - 1] <= emaOpen[i - 1]
        turnedDown = emaClose[i] < emaOpen[i] and emaClose[i - 1] >= emaOpen[i - 1]
        if not (turnedUp or turnedDown):
            continue

        btcClose = btcCloseByTime.get(barTimes[i])

        if turnedUp:
            # Without a BTC bar at this timestamp the entry cannot be priced
            # (this used to raise IndexError). Clear the entry so the next
            # exit is not scored against a stale price.
            entryPrice = 0.0 if btcClose is None else btcClose
            continue

        # Exit signal: nothing to settle without a priced entry and exit.
        # These are the cases the former bare "except:" silently absorbed.
        if btcClose is None or entryPrice == 0:
            continue

        traderesult = (btcClose - entryPrice) / entryPrice * 100
        # A NaN result compares False both ways and leaves the balance as is.
        if traderesult > 0 or traderesult < 0:
            portfoliosize = portfoliosize * (1 + traderesult / 100)

    result = f"Round - lenSmooth1 = {lenSmooth1} | PNL = {round(portfoliosize,2)} || currentWinner = {winningLenSmooth1} | currentWinnerPNL = {round(winningPNL,2)}"

    if portfoliosize > winningPNL:
        results.append(result)
        winningPNL = portfoliosize
        winningLenSmooth1 = lenSmooth1

    return [winningLenSmooth1, winningPNL]


# Sweep the smoothing lengths 1..150, handing each run the best
# (length, PNL) pair found so far.
result = [0, 0]
for x in range(1, 151):
    result = runBacktest(x, result[0], result[1])
    if x != 1:
        # The first round only seeds the winner; from the second round on the
        # most recent winner summary is echoed after every run.
        print(results[-1])

Currently, the backtest runs synchronously, and with larger datasets each iteration takes up to a minute. I want to speed up the process by running runBacktest() asynchronously with different lenSmooth1 values and reviewing the results at the end.

I tried adding the following to my script, but I don't see any improvement in the run time:

import asyncio

async def run_tasks(executor=None):
    """Run runBacktest(x, 0, 0) for x = 1..150 on an executor and gather the results.

    runBacktest is a plain (non-async) function. Calling it directly inside the
    list comprehension ran every backtest serially and then handed plain lists
    to asyncio.gather(), which only accepts awaitables. Each call is therefore
    submitted to an executor, which yields awaitable futures.

    Args:
        executor: A concurrent.futures executor to run the backtests on. With
            the default (None) the event loop's thread pool is used. Threads
            keep the shared ``results`` list working but are limited by the GIL
            for pure-Python CPU work. Pass a
            concurrent.futures.ProcessPoolExecutor to spread the runs over
            several cores; in that case the module-level ``results`` list is
            filled inside the worker processes, so rely on the return value,
            and keep the script entry point behind an
            ``if __name__ == "__main__":`` guard on platforms that spawn
            workers.

    Returns:
        The values returned by runBacktest, in order of x.
    """
    loop = asyncio.get_event_loop()
    pending = [
        loop.run_in_executor(executor, runBacktest, x, 0, 0)
        for x in range(1, 151)
    ]
    return await asyncio.gather(*pending)

def main():
    """Run run_tasks() to completion on a fresh event loop.

    The loop is closed in a ``finally`` clause so that it is released even
    when run_tasks() raises; previously a failure skipped loop.close().
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(run_tasks())
    finally:
        loop.close()

# Only start the sweep when the file is executed as a script. Worker processes
# that re-import this module (multiprocessing "spawn" start method) must not
# start it again.
if __name__ == "__main__":
    main()
    print(results)

Solution

  • Async will improve performance when you are working with IO operations (e.g. waiting for a response over a network, loading a file, etc.) but it won't do much to help with CPU-bound processes.

    This article does a great job of breaking down different means of achieving concurrency in Python. What you're most likely looking for is the multiprocessing library, which runs CPU-bound work in parallel across separate processes.