python, apache-spark, apache-spark-sql, spark-streaming

Convert RDD to DataFrame in Spark Streaming Python


I am trying to convert an RDD to a DataFrame in Spark Streaming. I am following the process below.

from pyspark.sql.types import StructType, StructField, StringType

socket_stream = ssc.socketTextStream("localhost", 9999)

def convert_to_df(rdd):
    schema = StructType([StructField("text", StringType(), True)])
    df = spark.createDataFrame(rdd, schema=schema)
    df.show(10)

socket_stream.foreachRDD(convert_to_df)

I am providing input through a socket with nc -lk 9999.

If I give "hello world" as my input, it shows me the error below:

StructType can not accept object 'hello world' in type <class 'str'>

Expected output:

+-----------+
|text       |
+-----------+
|hello world|
+-----------+

Solution

  • Since you use RDD[str] you should provide a matching type. For an atomic value that is either a corresponding AtomicType:

    from pyspark.sql.types import StringType, StructField, StructType
    
    rdd = sc.parallelize(["hello world"])
    spark.createDataFrame(rdd, StringType())
    

    or its string description:

    spark.createDataFrame(rdd, "string")
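
    The DataFrame built from an atomic type has a single column named value. If you want the text column shown in the expected output, you can rename it, e.g. with toDF (a small optional sketch, not required for the conversion itself):

    # The atomic-type schema yields a column named "value";
    # toDF("text") renames it to match the expected output
    spark.createDataFrame(rdd, "string").toDF("text").show(truncate=False)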
    

    If you want to use StructType, convert the data to tuples first:

    schema = StructType([StructField("text", StringType(), True)])
    
    spark.createDataFrame(rdd.map(lambda x: (x, )), schema)
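
    Applied back to the streaming code from the question (this assumes the socket_stream and spark objects defined there), a minimal sketch of the corrected handler could look like this; the isEmpty check is just an optional guard against empty micro-batches:

    def convert_to_df(rdd):
        # Skip empty micro-batches
        if not rdd.isEmpty():
            # Wrap each line in a tuple so it matches the single-field schema above
            df = spark.createDataFrame(rdd.map(lambda x: (x, )), schema)
            df.show(10)

    socket_stream.foreachRDD(convert_to_df)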
    

    Of course, if you're just going to convert each batch to a DataFrame, it makes much more sense to use Structured Streaming all the way:

    lines = (spark
        .readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", 9999)
        .load())
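
    To actually see the batches you would then attach a sink and start the query. A minimal sketch (the socket source exposes a single string column named value, renamed here to text; the console sink is just for demonstration):

    query = (lines
        .withColumnRenamed("value", "text")
        .writeStream
        .format("console")
        .start())

    query.awaitTermination()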