I am trying to convert RDD to DataFrame in Spark Streaming. I am following below process.
socket_stream = ssc.socketTextStream("localhost", 9999)
def convert_to_df(rdd):
schema = StructType([StructField("text", StringType(), True)])
df =spark.createDataFrame(rdd, schema = schema)
df.show(10)
socket_stream.foreachRDD(convert_to_df)
I am providing input through socket nc -lk 9999
If I give "hello world" as my input it is showing me below error
StructType can not accept object 'hello world' in type <class 'str'>
expected output
+-------=-+
|text |
+---------+
hello world
+---------+
Since you use RDD[str]
you should either provide a matching type. For an atomic value it is either a corresponding AtomicType
from pyspark.sql.types import StringType, StructField, StructType
rdd = sc.parallelize(["hello world"])
spark.createDataFrame(rdd, StringType())
or its string description:
spark.createDataFrame(rdd, "string")
If you want to use StructType
convert data to tuples
first:
schema = StructType([StructField("text", StringType(), True)])
spark.createDataFrame(rdd.map(lambda x: (x, )), schema)
Of course if you're going to just convert each batch to DataFrame
it makes much more sense to use Structured Streaming all the way:
lines = (spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load())