
How to read a csv and process rows using dask?


I want to read a 28 GB CSV file and print its contents. However, my code:

import json
import sys
from datetime import datetime
from hashlib import md5

import dask.dataframe as dd
import dask.multiprocessing
import pandas as pd

from kyotocabinet import *


class IndexInKyoto:

    def hash_string(self, string):
        return md5(string.encode('utf-8')).hexdigest()

    def dbproc(self, db):
        db[self.hash_string(self.row)] = self.row

    def index_row(self, row):
        self.row = row
        DB.process(self.dbproc, "index.kch")

start_time = datetime.utcnow()
row_counter = 0
ob = IndexInKyoto()
df = dd.read_csv("/Users/aviralsrivastava/dev/levelsdb-learning/10gb.csv", blocksize=1000000)
df = df.compute(scheduler='processes')     # convert to pandas
df = df.to_dict(orient='records')
for row in df:
    ob.index_row(row)
print("Total time:")
print(datetime.utcnow() - start_time)

is not working. When I run htop I can see Dask running, but there is no output whatsoever, nor is any index.kch file created. I ran the same thing without Dask and it worked fine; I was using the pandas streaming API (chunksize), but it was too slow, and hence I want to use Dask.


Solution

  • df = df.compute(scheduler='processes')     # convert to pandas
    

    Do not do this!

    You are loading the pieces in separate processes, and then transferring all the data to be stitched into a single data-frame in the main process. This will only add overhead to your processing, and create copies of the data in memory.

    If all you want to do is (for some reason) print every row to the console, then the pandas streaming CSV reader (pd.read_csv(chunksize=..)) works perfectly well. You could instead use Dask's chunking, and maybe get a speedup, if you do the printing in the workers which read the data:

    import dask
    import dask.dataframe as dd
    
    df = dd.read_csv(..)
    
    # function to apply to each sub-dataframe
    @dask.delayed
    def print_a_block(d):
        for row in d:
            print(row)
    
    dask.compute(*[print_a_block(d) for d in df.to_delayed()])
    

    Note that iterating over a dataframe (for row in ...) actually gets you the column names; you probably wanted iterrows, or maybe you actually wanted to process your data somehow.
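    To illustrate that last point, here is a minimal sketch with a small toy DataFrame (the column names are made up for the example): iterating the frame itself yields the column labels, while iterrows() yields (index, row) pairs.

    ```python
    import pandas as pd

    # a small stand-in for one chunk of the CSV
    df = pd.DataFrame({"user": ["a", "b"], "score": [1, 2]})

    # iterating the frame itself yields the column labels
    cols = [x for x in df]  # ['user', 'score']

    # iterrows() yields (index, Series) pairs, i.e. the actual rows
    rows = [row.to_dict() for _, row in df.iterrows()]
    # [{'user': 'a', 'score': 1}, {'user': 'b', 'score': 2}]
    ```

    The same distinction applies inside each delayed function above, since each block is an ordinary pandas DataFrame.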