I'm using SnappyData with PySpark to run my SQL queries and convert the output DataFrame into a dictionary so I can bulk insert it into Mongo. I've gone through many similar questions to test the conversion of a Spark DataFrame to a dictionary.
Currently I'm converting the whole DataFrame with map(lambda row: row.asDict(), x.collect()), and it takes 2-3 seconds for 10K records.
Here is how I've implemented it:
x = snappySession.sql("select * from test")
df = map(lambda row: row.asDict(), x.collect())
db.collection.insert_many(df)
Is there any faster way?
I'd recommend using foreachPartition instead, so each executor writes its own partition directly to Mongo rather than collecting everything back to the driver first:
(snappySession
.sql("select * from test")
.foreachPartition(insert_to_mongo))
where insert_to_mongo looks something like:
def insert_to_mongo(rows):
    # one connection per partition; fill in your own connection details
    client = ...
    db = ...
    docs = [row.asDict() for row in rows]
    if docs:  # insert_many raises InvalidOperation on an empty partition
        db.collection.insert_many(docs)
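For reference, here is a minimal self-contained sketch of insert_to_mongo using pymongo; the connection string, database name mydb and collection name collection are hypothetical, so adjust them to your setup:

from pymongo import MongoClient

def insert_to_mongo(rows):
    # hypothetical connection details; replace with your own
    client = MongoClient("mongodb://localhost:27017")
    db = client["mydb"]
    docs = [row.asDict() for row in rows]
    if docs:
        # ordered=False lets Mongo batch the inserts without stopping on the first error
        db["collection"].insert_many(docs, ordered=False)
    client.close()

Passing ordered=False is optional, but unordered bulk inserts are usually a bit faster when you don't care about insert order.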