apache-spark, pyspark, spark-avro

How to write a Spark dataframe to a single file on the local system without using coalesce


I want to generate an Avro file from a PySpark dataframe, and currently I am using coalesce as below:

df = df.coalesce(1)
df.write.format('avro').save('file:///mypath')

But this is now leading to memory issues, because all the data is pulled onto a single node before writing, and my data size is growing steadily every day. So I want to write the data partition by partition, so that it is written to disk in chunks and does not raise OOM issues. I found that toLocalIterator helps with this, but I am not sure how to use it. I tried the usage below and it returns all rows:

iter = df.toLocalIterator()
for i in iter:
    print('writing some data')
    # write the data into disk/file

The iterator yields individual rows rather than partitions. How should I do this?


Solution

  • When you do df = df.coalesce(1), all the data is collected onto one of the worker nodes. If that node cannot handle such a large amount of data due to resource constraints, the job will fail with an OOM error.

    As per the Spark documentation, toLocalIterator() returns an iterator that contains all of the rows in the Dataset, and the maximum memory it can consume is equivalent to the largest partition in the Dataset.

    How does toLocalIterator work?

    The first partition is sent to the driver. When you keep iterating and reach the end of the first partition, the second partition is sent to the driver node, and so on until the last partition. That is why the maximum memory it can occupy equals the size of the largest partition, so make sure your driver node has sufficient RAM and disk (a quick way to gauge partition sizes is sketched below).

    The iterator's next() method pulls the next partition's records only once the previous partition has been processed.
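
    A rough way to check how many rows land in each partition, and therefore whether the driver can comfortably hold the largest one, is sketched below. This check is an addition for illustration and not part of the original answer; it uses the standard spark_partition_id function.

        from pyspark.sql.functions import spark_partition_id

        # count the rows in every partition, largest partitions first
        (df.groupBy(spark_partition_id().alias("partition_id"))
           .count()
           .orderBy("count", ascending=False)
           .show())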

    What you can do is batch the rows as you iterate, as in the rough Python sketch below (the batch size of 1000 and the write_batch helper are placeholders; one possible implementation of write_batch follows the sketch):
    
        # batch rows, e.g. 1000 rows per batch
        batch = []
        for row in df.toLocalIterator():
            batch.append(row.asDict())
            if len(batch) >= 1000:
                # serialize the batch and write it to a local file, then reset
                write_batch(batch)
                batch = []
        if batch:
            # flush whatever is left in the final, partially filled batch
            write_batch(batch)
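
    One possible way to fill in the write_batch placeholder is with the fastavro library; the schema, field names, and output path below are illustrative assumptions rather than part of the original answer, so adapt them to your dataframe.

        from fastavro import writer, parse_schema

        # hypothetical schema: replace the fields with those of your dataframe
        schema = parse_schema({
            "type": "record",
            "name": "MyRecord",
            "fields": [
                {"name": "id", "type": "long"},
                {"name": "name", "type": "string"},
            ],
        })

        def write_batch(records, path="/tmp/output.avro"):
            # open in append mode so every batch is added to the same Avro file
            with open(path, "a+b") as out:
                writer(out, schema, records)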
    

    Note: make sure to cache your parent dataframe; otherwise, in some scenarios, every partition may be recomputed while iterating (a minimal sketch follows).
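
    A minimal caching sketch (the storage level here is an assumption; pick one that fits your cluster):

        from pyspark import StorageLevel

        df = df.persist(StorageLevel.MEMORY_AND_DISK)
        # ... iterate with df.toLocalIterator() and write the batches ...
        df.unpersist()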