pyspark, apache-spark-sql, snappydata

Fastest way to create Dictionary from pyspark DF


I'm using SnappyData with pyspark to run my SQL queries and convert the output DF into a dictionary so I can bulk insert it into Mongo. I've gone through many similar questions to test the conversion of a Spark DF to a dictionary.

Currently I'm using map(lambda row: row.asDict(), x.collect()) to convert my bulk DF to a list of dictionaries, and it takes 2-3 seconds for 10K records.

I've shown below how I implement my idea:

x = snappySession.sql("select * from test")
df = map(lambda row: row.asDict(), x.collect())
db.collection.insert_many(df)

Is there any faster way?


Solution

  • I'd recommend using foreachPartition, which lets each executor write its own partition straight to Mongo instead of collecting every row to the driver first:

    (snappySession
        .sql("select * from test")
        .foreachPartition(insert_to_mongo))
    

    where insert_to_mongo looks something like this (a runnable sketch follows below):

    def insert_to_mongo(rows):
        # open one client per partition so connection setup is amortized
        # across all of that partition's rows; connection details elided
        client = ...
        db = ...
        db.collection.insert_many((row.asDict() for row in rows))
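
    In case it helps, here is a minimal runnable sketch of insert_to_mongo using pymongo; the connection URI and the mydb/test database and collection names are placeholders, and the empty-partition guard matters because insert_many rejects an empty document list:

    from pymongo import MongoClient

    def insert_to_mongo(rows):
        # placeholder connection details -- swap in your own URI, db, collection
        client = MongoClient("mongodb://localhost:27017")
        db = client["mydb"]
        docs = [row.asDict() for row in rows]
        if docs:  # insert_many rejects an empty document list
            db.test.insert_many(docs, ordered=False)
        client.close()

    This keeps the work on the executors: nothing is collected to the driver, partitions are written in parallel, and ordered=False lets MongoDB continue inserting past an individual failed document instead of stopping at the first error.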