I have two dataframes that need to be cross joined on a 20-node cluster. However, because of their size, a plain cross join is failing. I would like to partition the data and perform the cross join per partition, and I am looking for an efficient way to do it. My current plan:
1. Manually split file f1 into three pieces and read them into dataframes: df1A, df1B, df1C.
2. Manually split file f2 into four pieces and read them into dataframes: df2A, df2B, df2C, df2D.
3. Cross join every pair: df1A X df2A, df1A X df2B, ..., df1A X df2D, ..., df1C X df2D.
4. Save each cross join to a file and manually combine all the files.
This way Spark can perform each cross join in parallel, and the whole job should complete fairly quickly.
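Roughly, the manual plan above would look like the sketch below; the split file names and output paths are made-up placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("piecewise-crossjoin").getOrCreate()

// read the manually split pieces (placeholder file names)
val df1Parts = Seq("file1_a.txt", "file1_b.txt", "file1_c.txt").map(p => spark.read.csv(p))
val df2Parts = Seq("file2_a.txt", "file2_b.txt", "file2_c.txt", "file2_d.txt").map(p => spark.read.csv(p))

// cross join every piece of f1 with every piece of f2 and write each result to its own folder
for ((d1, i) <- df1Parts.zipWithIndex; (d2, j) <- df2Parts.zipWithIndex) {
  d1.crossJoin(d2).write.csv(s"out/cross_${i}_${j}")
}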
Is there a more efficient way to accomplish this: read both files into two dataframes, partition them into 3 and 4 "pieces" respectively, and cross join each partition of one dataframe with every partition of the other?
A DataFrame can be partitioned either by range or by hash.
import spark.implicits._ // needed for the $"k" column syntax

val df1 = spark.read.csv("file1.txt")
val df2 = spark.read.csv("file2.txt")
// range-partition each dataframe on the join key column k
val partitionedByRange1 = df1.repartitionByRange(3, $"k")
val partitionedByRange2 = df2.repartitionByRange(4, $"k")
val result = partitionedByRange1.crossJoin(partitionedByRange2)
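If you prefer hash partitioning instead, a minimal sketch (assuming the same key column k) uses repartition with a column expression, which hash-partitions on that column:

// hash-partition each dataframe on the key column k
val partitionedByHash1 = df1.repartition(3, $"k")
val partitionedByHash2 = df2.repartition(4, $"k")
val resultByHash = partitionedByHash1.crossJoin(partitionedByHash2)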
NOTE: set the property spark.sql.crossJoin.enabled=true
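One way to set that property, assuming you configure it from code rather than via spark-defaults.conf or --conf on spark-submit:

// enable cross joins for the current session
spark.conf.set("spark.sql.crossJoin.enabled", "true")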