I have a multiline flat file which I wish to convert to a 4-column DataFrame, or an RDD array, via PySpark. The Spark Scala code is:
#from pyspark.sql import SparkSession # Scala equivalent
#from pyspark import SparkContext # Scala equivalent
import org.apache.spark.mllib.rdd.RDDFunctions._
path = '/mypath/file'
spark = SparkSession.builder.appName('findApp').getOrCreate()
rdd = spark.sparkContext.textFile(path).sliding(4, 4).toDF("x", "y", "z", "a")
There is no sliding() function in PySpark. What is the equivalent? The input is
A
B
C
D
A2
B2
C2
D2
The desired output is
x | y | z | a |
---|---|---|---|
A | B | C | D |
A2 | B2 | C2 | D2 |
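For reference, a minimal RDD-side sketch of what a sliding(4, 4)-style grouping might look like in PySpark, using zipWithIndex instead of sliding(); it is untested at scale and assumes every record really is exactly 4 lines and that zipWithIndex reflects the original line order, so treat it as a sketch rather than a proper equivalent:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('findApp').getOrCreate()
path = '/mypath/file'

# Group every 4 consecutive lines into one 4-tuple (assumes the line count is a
# multiple of 4 and that zipWithIndex preserves the file's line order)
rdd = (spark.sparkContext.textFile(path)
       .zipWithIndex()                                     # (line, position)
       .map(lambda kv: (kv[1] // 4, (kv[1] % 4, kv[0])))   # key = block of 4 lines
       .groupByKey()
       .map(lambda kv: tuple(v for _, v in sorted(kv[1]))))
df = rdd.toDF(["x", "y", "z", "a"])
df.show()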
I should add that the data sets are around 50 million records per data set, and there are a couple of hundred data sets, so it's over 2 terabytes of data in total because one column holds >300 features. I like the pandas code by @GoodMan
Feel free to modernize it. Since I am not aware of how your data is sorted, feel free to change the window function. You can also change the number of columns if required.
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import *

# Create a SparkSession
spark = SparkSession.builder.appName("Data Transformation").getOrCreate()

data = ["A", "B", "C", "D", "A2", "B2", "C2", "D2"]
schema = "col1 string, id int"

# Parallelize the sample values and attach a dummy id so the tuples match the schema
data_rdd = spark.sparkContext.parallelize(data)
new_rdd = data_rdd.map(lambda x: (x, 1))
df = new_rdd.toDF(schema)

# Number every value, then assign each block of 4 consecutive values to one output row
mywindow = Window.orderBy("col1")
df_row = df.withColumn("column", row_number().over(mywindow)) \
           .withColumn("row", expr("ceiling(column/4)"))
df_row.createOrReplaceTempView("data")

# Spread each value into the output column that matches its position within its block of 4
df_final = spark.sql("""
select row,
       case when column % 4 = 1 then col1 else null end as col1,
       case when column % 4 = 2 then col1 else null end as col2,
       case when column % 4 = 3 then col1 else null end as col3,
       case when column % 4 = 0 then col1 else null end as col4
from data order by col1
""")

# Collapse the four sparse columns of each block into a single row
df_final2 = df_final.groupBy("row").agg(first("col1", ignorenulls=True).alias("x"),
                                        first("col2", ignorenulls=True).alias("y"),
                                        first("col3", ignorenulls=True).alias("z"),
                                        first("col4", ignorenulls=True).alias("a"))

print(data)
df_final2.show()
spark.stop()
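One caveat at your scale: a Window.orderBy without a partitionBy pulls every row into a single partition to compute row_number, and ordering by col1 sorts the values alphabetically rather than by their position in the file (with the sample above that puts A, A2, B, B2 on the same output row). If you want the grouping shown in the question, a hedged variation is to read the real file and order by the original line position instead; the zipWithIndex step, the "id long" schema, and the path below are my assumptions, not part of your original setup:
# Drop-in replacement for the parallelize() step above: read the actual flat file
# and keep each line's original position as the ordering key (assumed path)
lines = spark.sparkContext.textFile("/mypath/file")
df = lines.zipWithIndex() \
          .map(lambda kv: (kv[0], kv[1] + 1)) \
          .toDF("col1 string, id long")       # (line, 1-based line number)

mywindow = Window.orderBy("id")               # order by file position, not value
df_row = df.withColumn("column", row_number().over(mywindow)) \
           .withColumn("row", expr("ceiling(column/4)"))
# ...the rest of the pipeline (temp view, spark.sql, groupBy/first) is unchanged.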