Tags: apache-spark, hadoop, spark-streaming

Spark Streaming: Text data source supports only a single column


I am consuming data from Kafka and streaming it to HDFS.

The data stored in the Kafka topic trial looks like this:

hadoop
hive
hive
kafka
hive

However, when I submit my code, it fails with:

Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Text data source supports only a single column, and you have 7 columns.;
=== Streaming Query ===
Identifier: [id = 2f3c7433-f511-49e6-bdcf-4275b1f1229a, runId = 9c0f7a35-118a-469c-990f-af00f55d95fb]
Current Committed Offsets: {KafkaSource[Subscribe[trial]]: {"trial":{"2":13,"1":13,"3":12,"0":13}}}
Current Available Offsets: {KafkaSource[Subscribe[trial]]: {"trial":{"2":13,"1":13,"3":12,"0":14}}}

My question is: as shown above, the data stored in Kafka consists of only ONE column, so why does the program say there are 7 columns?

Any help is appreciated.


My Spark Streaming code:

import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  val spark = SparkSession
    .builder.master("local[4]")
    .appName("SpeedTester")
    .config("spark.driver.memory", "3g")
    .getOrCreate()

  val ds = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "192.168.95.20:9092")
    .option("subscribe", "trial")
    .option("startingOffsets", "earliest")
    .load()
    .writeStream
    .format("text")
    .option("path", "hdfs://192.168.95.21:8022/tmp/streaming/fixed")
    .option("checkpointLocation", "/tmp/checkpoint")
    .start()
    .awaitTermination()
}

Solution

  • That is explained in the Structured Streaming + Kafka Integration Guide:

    Each row in the source has the following schema:

    Column          Type
    key             binary
    value           binary
    topic           string
    partition       int
    offset          long
    timestamp       timestamp
    timestampType   int

    That gives exactly seven columns. The text sink accepts only a single string column, so if you want to write only the payload (value), select it and cast it to a string:

    spark.readStream
      ...
      .load()
      .selectExpr("CAST(value as string)")
      .writeStream
      ...
      .awaitTermination()
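    To see the mechanism without a running Kafka broker, here is a minimal local sketch (batch mode, assuming only spark-sql on the classpath). It builds a DataFrame with the same seven column names as the Kafka source, fills it with hypothetical sample rows mirroring the topic contents above, shows that the text writer rejects it, and then writes only the decoded value. The object name KafkaValueToText and the run helper are illustrative, not part of any Spark API. The exact error text differs between Spark versions, so the sketch only checks that the seven-column write fails.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.sql.Timestamp

import scala.util.Try

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

// Hypothetical demo object: mimics the Kafka source schema with a local batch DataFrame.
object KafkaValueToText {
  // Same seven columns the Kafka source produces (key and value are binary).
  val kafkaLikeSchema: StructType = StructType(Seq(
    StructField("key", BinaryType),
    StructField("value", BinaryType),
    StructField("topic", StringType),
    StructField("partition", IntegerType),
    StructField("offset", LongType),
    StructField("timestamp", TimestampType),
    StructField("timestampType", IntegerType)))

  // Returns (number of source columns, whether the 7-column text write failed, lines written).
  def run(spark: SparkSession): (Int, Boolean, Seq[String]) = {
    // Hypothetical sample messages standing in for the records in topic "trial".
    val words = Seq("hadoop", "hive", "hive", "kafka", "hive")
    val rows = words.zipWithIndex.map { case (w, i) =>
      Row(null, w.getBytes(StandardCharsets.UTF_8), "trial", 0, i.toLong, new Timestamp(0L), 0)
    }
    val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), kafkaLikeSchema)

    val outDir = Files.createTempDirectory("kafka-text-demo")

    // Writing all seven columns with the text format is rejected by Spark.
    val rejected = Try(df.write.text(outDir.resolve("all-columns").toString)).isFailure

    // Keep only the payload, decoded from binary to a single string column.
    val payload = df.selectExpr("CAST(value AS STRING) AS value")
    val valueDir = outDir.resolve("value-only").toString
    payload.write.text(valueDir)

    // Read the written lines back; sort them because output file order is not guaranteed.
    val lines = spark.read.textFile(valueDir).collect().sorted.toSeq
    (df.columns.length, rejected, lines)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[1]").appName("KafkaValueToText").getOrCreate()
    val (numColumns, rejected, lines) = run(spark)
    println(s"columns from source: $numColumns")
    println(s"seven-column text write failed: $rejected")
    println(s"lines written: ${lines.mkString(", ")}")
    spark.stop()
  }
}
```

    In the streaming job the fix is the same single selectExpr call placed between .load() and .writeStream; the cast matters because value arrives as binary, while the text format expects a string column.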