Tags: python, pyspark, spark-streaming

Turn Spark Stream from Socket into DataFrame


I established a socket connection to my SparkSession that sends the rows of a .csv file to my stream.

So far my (PySpark) code looks like this:

stream = spark.readStream.format('socket').option('host', 'localhost').option('port', 5555).load()

stream.writeStream.format('console').start().awaitTermination()

This prints the lines of the .csv file in a single column, like this:

+-----------------+
|            value|
+-----------------+
|[2, C4653, C5030]|
+-----------------+

But what I would actually like to have is this:

+-----+-----+-----+
| col1| col2| col3|
+-----+-----+-----+
|    2|C4653|C5030|
+-----+-----+-----+

I would like to use this as a DataFrame to feed an ML pipeline.

How can I process the incoming stream data?


Solution

  • You already have a streaming DataFrame; it just needs its single value column split into a proper schema.

    Add this transformation after the load() call, splitting on the delimiter your CSV actually uses (',' here):

     stream = stream.selectExpr(
         "split(value, ',')[0] as col1",
         "split(value, ',')[1] as col2",
         "split(value, ',')[2] as col3",
     )