How can I set a schema for a streaming DataFrame in PySpark?
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
# Import data types
from pyspark.sql.types import *
spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()
# Create DataFrame representing the stream of input lines from the socket at 192.168.0.113:5560
lines = spark \
    .readStream \
    .format('socket') \
    .option('host', '192.168.0.113') \
    .option('port', 5560) \
    .load()
For example, I need a table like:
Name, lastName, PhoneNumber
Bob, Dylan, 123456
Jack, Ma, 789456
....
How can I set the header/schema to ['Name', 'lastName', 'PhoneNumber'] with their data types?
Also, is it possible to display this table continuously, say the top 20 rows of the DataFrame? When I tried it I got the error:
"pyspark.sql.utils.AnalysisException: 'Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;\nProject"
TextSocketSource doesn't provide any integrated parsing options. It can only produce one of two formats:

timestamp and text, if includeTimestamp is set to true, with the following schema:

StructType([
    StructField("value", StringType()),
    StructField("timestamp", TimestampType())
])

text only, if includeTimestamp is set to false, with the schema shown below:

StructType([StructField("value", StringType())])
If you want to change this format you'll have to transform the stream to extract the fields of interest, for example with regular expressions:

from pyspark.sql.functions import regexp_extract
from functools import partial

# Extract the three comma-separated fields from each raw "value" line
fields = partial(
    regexp_extract, str="value", pattern=r"^(\w*)\s*,\s*(\w*)\s*,\s*([0-9]*)$"
)

parsed = lines.select(
    fields(idx=1).alias("name"),
    fields(idx=2).alias("last_name"),
    fields(idx=3).alias("phone_number")
)
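As a quick sanity check outside Spark, the same pattern can be exercised with Python's built-in re module against the sample rows from the question. (regexp_extract uses Java regex syntax, but this particular pattern behaves identically in both engines.)

```python
import re

# The same pattern passed to regexp_extract above, tested against
# the question's sample rows with Python's re module.
pattern = r"^(\w*)\s*,\s*(\w*)\s*,\s*([0-9]*)$"

for line in ["Bob, Dylan, 123456", "Jack, Ma, 789456"]:
    match = re.match(pattern, line)
    print(match.groups())
# ('Bob', 'Dylan', '123456')
# ('Jack', 'Ma', '789456')
```

If a line doesn't match (e.g. a malformed row), regexp_extract returns an empty string for each group rather than failing, so the stream keeps running.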