apache-spark pyspark apache-spark-sql spark-structured-streaming

Spark Structured Streaming using sockets: set a schema and display the DataFrame in the console


How can I set a schema for a streaming DataFrame in PySpark?

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
# Import data types
from pyspark.sql.types import *

spark = SparkSession\
    .builder\
    .appName("StructuredNetworkWordCount")\
    .getOrCreate()

# Create a DataFrame representing the stream of input lines from the socket at 192.168.0.113:5560
lines = spark\
   .readStream\
   .format('socket')\
   .option('host', '192.168.0.113')\
   .option('port', 5560)\
   .load()

For example, I need a table like this:

Name,  lastName,   PhoneNumber    
Bob, Dylan, 123456    
Jack, Ma, 789456
....

How can I set the header/schema to ['Name','lastName','PhoneNumber'] with their data types?

Also, is it possible to display this table continuously, or, say, the top 20 rows of the DataFrame? When I tried, I got this error:

"pyspark.sql.utils.AnalysisException: 'Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;\nProject"


Solution

  • TextSocketSource doesn't provide any integrated parsing options. It is only possible to use one of the two formats:

    • timestamp and text if includeTimestamp is set to true with the following schema:

      StructType([
          StructField("value", StringType()),
          StructField("timestamp", TimestampType())
      ])
      
    • text only if includeTimestamp is set to false with the schema as shown below:

      StructType([StructField("value", StringType())])
      

    If you want to change this format, you'll have to transform the stream to extract the fields of interest, for example with regular expressions:

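    from functools import partial

    from pyspark.sql.functions import regexp_extract

    # Pre-bind the source column and the pattern; only the capture group index varies.
    # The raw string keeps \w and \s from being read as Python string escapes.
    fields = partial(
        regexp_extract, str="value", pattern=r"^(\w*)\s*,\s*(\w*)\s*,\s*([0-9]*)$"
    )

    # regexp_extract yields an empty string for lines that do not match the pattern.
    parsed = lines.select(
        fields(idx=1).alias("name"),
        fields(idx=2).alias("last_name"),
        fields(idx=3).alias("phone_number")
    )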
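
For the second part of the question, the Complete-mode error appears because the query has no streaming aggregation. A query without aggregation can be written to the console sink in append mode instead. The following is only a sketch under stated assumptions: `parse_line` and `start_console_query` are hypothetical helper names, and `lines` is assumed to be the socket DataFrame from the question with its single `value` column. The `re`-based check approximates what Spark's Java regex engine does for this simple pattern, so it is meant as a quick local sanity check rather than a guarantee.

```python
import re

# Same pattern as in the answer above, written as a raw string.
PATTERN = r"^(\w*)\s*,\s*(\w*)\s*,\s*([0-9]*)$"


def parse_line(line):
    """Local check of PATTERN with Python's re module (no Spark needed).

    Returns (name, last_name, phone_number), or None when the line does not
    match. Spark's regexp_extract returns empty strings in that case instead.
    """
    match = re.match(PATTERN, line)
    return match.groups() if match else None


def start_console_query(lines, num_rows=20):
    """Hypothetical helper: parse the socket stream and print each micro-batch.

    `lines` is assumed to be a streaming DataFrame with one string
    column named "value", as produced by the socket source.
    """
    # Imported lazily so parse_line can be used without PySpark installed.
    from pyspark.sql.functions import regexp_extract

    parsed = lines.select(
        regexp_extract("value", PATTERN, 1).alias("name"),
        regexp_extract("value", PATTERN, 2).alias("last_name"),
        regexp_extract("value", PATTERN, 3).alias("phone_number"),
    )
    return (
        parsed.writeStream
        .outputMode("append")         # no aggregation, so append instead of complete
        .format("console")
        .option("numRows", num_rows)  # rows printed per micro-batch
        .option("truncate", False)    # do not shorten long values
        .start()
    )


if __name__ == "__main__":
    print(parse_line("Bob, Dylan, 123456"))  # ('Bob', 'Dylan', '123456')
    print(parse_line("not a record"))        # None
```

With the `lines` DataFrame from the question, `query = start_console_query(lines)` followed by `query.awaitTermination()` would keep printing newly arrived rows. Note that the pattern as written does not tolerate trailing whitespace after the phone number, so such lines would come out as empty strings in Spark. If a field needs a type other than string, `Column.cast` can be applied to the extracted column.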