
PySpark equivalent of Spark sliding() function


I have a multiline flat file that I want to convert, via PySpark, into a 4-column DataFrame (or an RDD of 4-element arrays). The Spark Scala code is:

#from pyspark.sql import SparkSession # Scala equivalent
#from pyspark import SparkContext # Scala equivalent
import org.apache.spark.mllib.rdd.RDDFunctions._
path = '/mypath/file'
spark = SparkSession.builder.appName('findApp').getOrCreate()
rdd = spark.sparkContext.textFile(path).sliding(4, 4).toDF("x", "y", "z", "a")

There is no sliding() function in PySpark. What is the equivalent? The input is

A
B
C
D
A2
B2
C2
D2

The desired output is

x y z a
A B C D
A2 B2 C2 D2

I'd better add that the data sets are around 50 million records each, and there are a couple of hundred data sets, so it's over 2 terabytes of data in total because one column holds >300 features. I like the pandas code by @GoodMan


Solution

  • Here is one approach using a window function and row_number(); feel free to modernize it. Since I don't know how your data is actually sorted, change the window function as needed. You can also change the number of columns if required.

    from pyspark.sql import SparkSession
    from pyspark.sql import Window
    from pyspark.sql.functions import row_number, expr, first
    
    # Create a SparkSession
    spark = SparkSession.builder.appName("Data Transformation").getOrCreate()
    
    # Sample input: one value per line; every 4 consecutive values form one output row
    data = ["A", "B", "C", "D", "A2", "B2", "C2", "D2"]
    
    # Two-column schema: the value itself plus a dummy id used only to build the DataFrame
    schema = "col1 string, id int"
    
    data_rdd = spark.sparkContext.parallelize(data)
    
    # Attach a constant 1 so each element matches the two-field schema
    new_rdd = data_rdd.map(lambda x: (x, 1))
    
    df = new_rdd.toDF(schema)
    
    # Number the rows, then give every block of 4 consecutive rows the same "row" id
    mywindow = Window.orderBy("col1")
    
    df_row = df.withColumn("column", row_number().over(mywindow)) \
               .withColumn("row", expr("ceiling(column/4)"))
    
    df_row.createOrReplaceTempView("data")
    
    # Spread each value into the output column that matches its position within its block of 4
    df_final = spark.sql("""
    select row,
    case when column % 4 = 1 then col1 else null end as col1,
    case when column % 4 = 2 then col1 else null end as col2,
    case when column % 4 = 3 then col1 else null end as col3,
    case when column % 4 = 0 then col1 else null end as col4
    from data order by col1
    """)
    
    # Collapse each block to a single output row by taking the non-null value per column
    df_final2 = df_final.groupBy("row").agg(first("col1", ignorenulls=True).alias("x"),
                                            first("col2", ignorenulls=True).alias("y"),
                                            first("col3", ignorenulls=True).alias("z"),
                                            first("col4", ignorenulls=True).alias("a"))
    
    print(data)
    
    df_final2.show()
    
    spark.stop()
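
For the non-overlapping case in the question (sliding(4, 4), i.e. fixed chunks of 4 lines), an RDD-only alternative is to index the lines with zipWithIndex() and group by line_index // 4. This avoids the single-partition sort that Window.orderBy() without a partitionBy forces, which matters at the 50-million-record scale mentioned above. This is a minimal sketch, assuming the file's physical line order is the grouping order; the path and column names are taken from the question.

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder.appName("findApp").getOrCreate()
    
    path = "/mypath/file"  # path from the question
    
    # Index every line, key it by its chunk id (line_index // 4), and rebuild
    # each chunk in order. sliding(4, 4) has no overlap, so plain grouping works.
    rdd = (
        spark.sparkContext.textFile(path)
        .zipWithIndex()                                      # (value, global line index)
        .map(lambda vi: (vi[1] // 4, (vi[1] % 4, vi[0])))    # (chunk id, (position, value))
        .groupByKey()
        .map(lambda kv: [v for _, v in sorted(kv[1])])       # restore in-chunk order
        .filter(lambda row: len(row) == 4)                   # drop a trailing partial chunk
    )
    
    df = rdd.toDF(["x", "y", "z", "a"])
    df.show()

Both approaches produce one output row per 4 input lines; which one fits depends on whether the data must be reordered first (the window solution) or can be taken in file order (the zipWithIndex sketch).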