Tags: scala, apache-spark, apache-kafka, spark-structured-streaming, spark-kafka-integration

How to map a message to an object with `schema` and `payload` in Spark Structured Streaming correctly?


I am hoping to map each message to an object with schema and payload fields inside during Spark Structured Streaming.

This is my original code:

import org.apache.spark.sql.functions.{struct, to_json}
import org.apache.spark.sql.types.{DoubleType, StructType}

val input_schema = new StructType()
  .add("timestamp", DoubleType)
  .add("current", DoubleType)
  .add("voltage", DoubleType)
  .add("temperature", DoubleType)

val df = spark.readStream
  .schema(input_schema)
  .option("maxFilesPerTrigger", 1)
  .parquet("s3a://my-bucket/my-folder/")
  .select(to_json(struct("*")).alias("value"))

val query = df.writeStream
  .format("kafka")
  .option(
    "kafka.bootstrap.servers",
    "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
  )
  .option("topic", "my-topic")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()

This will output messages in this format when writing to Kafka:

{
  "timestamp": 1682556571.14622,
  "current": 2.0172032595808242,
  "voltage": 19.34080877806074,
  "temperature": 37.461518565900434
}

However, I hope to add a schema field and move the current fields into a payload field, so that later I can sink the data to Postgres by a JDBC sink connector such as Aiven's JDBC Sink and Source Connectors.

Based on this doc, I think I should use "decimal" as each field's type.

So this is the Kafka message format I hope to generate:

{
  "schema":{
    "type": "struct",
    "fields":[
      {
        "type": "decimal",
        "optional": false,
        "field": "timestamp"
      },
      {
        "type": "decimal",
        "optional": true,
        "field": "current"
      },
      {
        "type": "decimal",
        "optional": true,
        "field": "voltage"
      },
      {
        "type": "decimal",
        "optional": true,
        "field": "temperature"
      }
    ]
  },
  "payload":{
    "timestamp": 1682556571.14622,
    "current": 2.0172032595808242,
    "voltage": 19.34080877806074,
    "temperature": 37.461518565900434
  }
}

I tried to update my Spark code to:

import org.apache.spark.sql.functions.{col, lit, struct, to_json}
import org.apache.spark.sql.types.{DoubleType, StructType}

val input_schema = new StructType()
  .add("timestamp", DoubleType)
  .add("current", DoubleType, nullable = true)
  .add("voltage", DoubleType, nullable = true)
  .add("temperature", DoubleType, nullable = true)

val output_schema = new StructType()
  .add("timestamp", "decimal")
  .add("current", "decimal", nullable = true)
  .add("voltage", "decimal", nullable = true)
  .add("temperature", "decimal", nullable = true)

val df = spark.readStream
  .schema(input_schema)
  .option("maxFilesPerTrigger", 1)
  .parquet("s3a://my-bucket/my-folder/")
  .select(
    to_json(struct("*")).alias("payload")
  )
  .withColumn(
    "schema",
    to_json(struct(
      lit("struct").alias("type"),
      lit(output_schema.fields.map(field => struct(
        lit(field.dataType).alias("type"),
        lit(field.nullable).alias("optional"),
        lit(field.name).alias("field")
      ))).alias("fields")
    ))
  )
  .select(
    to_json(struct(
      col("schema"),
      col("payload")
    )).alias("value")
  )

val query = df.writeStream
  .format("kafka")
  .option(
    "kafka.bootstrap.servers",
    "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
  )
  .option("topic", "my-topic")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()

But when I run spark-submit, I get this error:

Exception in thread "main" org.apache.spark.SparkRuntimeException: The feature is not supported: literal for 'DecimalType(10,0)' of class org.apache.spark.sql.types.DecimalType.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.literalTypeUnsupportedError(QueryExecutionErrors.scala:296)
    at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:101)
    at org.apache.spark.sql.functions$.lit(functions.scala:125)
    at com.hongbomiao.IngestFromS3ToKafka$.$anonfun$main$1(IngestFromS3ToKafka.scala:46)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
    at com.hongbomiao.IngestFromS3ToKafka$.main(IngestFromS3ToKafka.scala:45)
    at com.hongbomiao.IngestFromS3ToKafka.main(IngestFromS3ToKafka.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I have a feeling that using StructType is what causes it to return DecimalType(10,0). Maybe in this case I should not use StructType?

I am not sure how to generate exactly "decimal" in the output message. Any guidance would be appreciated, thanks!


Solution

  • I think my original guess was correct: StructType is special and should not be used here. The type string "decimal" is the same as DecimalType(10,0), and lit() cannot create a literal from a DataType, which is why it shows that error.
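
    A quick way to reproduce this in spark-shell (a minimal sketch of the failure, not part of the pipeline):

    import org.apache.spark.sql.functions.lit
    import org.apache.spark.sql.types.StructType

    // The type string "decimal" parses to DecimalType(10,0)
    val s = new StructType().add("timestamp", "decimal")
    println(s.fields(0).dataType) // DecimalType(10,0)

    // lit() expects a value, not a DataType, so this throws
    // SparkRuntimeException: The feature is not supported:
    // literal for 'DecimalType(10,0)' ...
    lit(s.fields(0).dataType)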

    In my case, I should use a plain struct instead.

    Also, I found that the downstream Aiven JDBC sink connector's documentation may have an issue:

    [screenshot of Aiven's JDBC sink connector documentation]

    I tried decimal, Decimal, and DECIMAL; all failed when running the connector.

    As the default value.converter is org.apache.kafka.connect.json.JsonConverter, I should use double (or float) in my case, based on the JsonSchema source code.
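
    For reference, this is roughly the sink connector configuration such an envelope targets. This is only a sketch with placeholder connection details, assuming Aiven's io.aiven.connect.jdbc.JdbcSinkConnector class and the stock JsonConverter:

    {
      "name": "my-jdbc-sink",
      "config": {
        "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
        "topics": "my-topic",
        "connection.url": "jdbc:postgresql://my-postgres:5432/my-database",
        "connection.user": "my-user",
        "connection.password": "my-password",
        "auto.create": "true",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true"
      }
    }

    The value.converter.schemas.enable=true setting is what makes JsonConverter expect the schema/payload envelope in each message.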

    So the final working version is:

    import org.apache.spark.sql.functions.{array, lit, struct, to_json}
    import org.apache.spark.sql.types.{DoubleType, StructType}

    val input_schema = new StructType()
      .add("timestamp", DoubleType)
      .add("current", DoubleType, nullable = true)
      .add("voltage", DoubleType, nullable = true)
      .add("temperature", DoubleType, nullable = true)
    
    val df = spark.readStream
      .schema(input_schema)
      .option("maxFilesPerTrigger", 1)
      .parquet("s3a://my-bucket/my-folder/")
      .select(
        struct(
          lit("struct").alias("type"),
          array(
            struct(
              lit("double").alias("type"),
              lit(false).alias("optional"),
              lit("timestamp").alias("field")
            ),
            struct(
              lit("double").alias("type"),
              lit(true).alias("optional"),
              lit("current").alias("field")
            ),
            struct(
              lit("double").alias("type"),
              lit(true).alias("optional"),
              lit("voltage").alias("field")
            ),
            struct(
              lit("double").alias("type"),
              lit(true).alias("optional"),
              lit("temperature").alias("field")
            )
          ).alias("fields")
        ).alias("schema"),
        struct("*").alias("payload")
      )
      .select(to_json(struct("*")).alias("value"))
    
    val query = df.writeStream
      .format("kafka")
      .option(
        "kafka.bootstrap.servers",
        "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
      )
      .option("topic", "my-topic")
      .option("checkpointLocation", "/tmp/checkpoint")
      .start()
    

    This will produce messages in this format:

    {
      "schema":{
        "type": "struct",
        "fields":[
          {
            "type": "double",
            "optional": false,
            "field": "timestamp"
          },
          {
            "type": "double",
            "optional": true,
            "field": "current"
          },
          {
            "type": "double",
            "optional": true,
            "field": "voltage"
          },
          {
            "type": "double",
            "optional": true,
            "field": "temperature"
          }
        ]
      },
      "payload":{
        "timestamp": 1682556571.14622,
        "current": 2.0172032595808242,
        "voltage": 19.34080877806074,
        "temperature": 37.461518565900434
      }
    }
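
    As an aside, the hand-written fields array above could be generated from a StructType, which is what my failed attempt was going for. The fix is to map each Spark type to its Connect type name as a plain string, and only ever pass strings and booleans to lit. A rough sketch (the connectTypeName and connectSchema helpers are my own names, and the type mapping covers only the types used here):

    import org.apache.spark.sql.Column
    import org.apache.spark.sql.functions.{array, lit, struct}
    import org.apache.spark.sql.types.{DataType, DoubleType, FloatType, StructType}

    // Map a Spark type to its Kafka Connect JSON schema type name.
    def connectTypeName(dataType: DataType): String = dataType match {
      case DoubleType => "double"
      case FloatType  => "float"
      case other      => throw new IllegalArgumentException(s"unsupported type: $other")
    }

    // Build the "schema" column from a StructType, passing only plain
    // strings and booleans to lit (never a DataType).
    def connectSchema(schema: StructType): Column =
      struct(
        lit("struct").alias("type"),
        array(
          schema.fields.map(field =>
            struct(
              lit(connectTypeName(field.dataType)).alias("type"),
              lit(field.nullable).alias("optional"),
              lit(field.name).alias("field")
            )
          ): _*
        ).alias("fields")
      )

    Then the first select becomes connectSchema(input_schema).alias("schema") plus struct("*").alias("payload"). Note that .add("timestamp", DoubleType) defaults to nullable = true, so the schema would need .add("timestamp", DoubleType, nullable = false) to produce "optional": false for timestamp.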