I am hoping to map each message to an object with schema and payload fields inside it during Spark Structured Streaming. This is my original code:
import org.apache.spark.sql.functions.{struct, to_json}
import org.apache.spark.sql.types.{DoubleType, StructType}

val input_schema = new StructType()
  .add("timestamp", DoubleType)
  .add("current", DoubleType)
  .add("voltage", DoubleType)
  .add("temperature", DoubleType)

val df = spark.readStream
  .schema(input_schema)
  .option("maxFilesPerTrigger", 1)
  .parquet("s3a://my-bucket/my-folder/")
  .select(to_json(struct("*")).alias("value"))

val query = df.writeStream
  .format("kafka")
  .option(
    "kafka.bootstrap.servers",
    "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
  )
  .option("topic", "my-topic")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()
This outputs messages in this format when writing to Kafka:
{
  "timestamp": 1682556571.14622,
  "current": 2.0172032595808242,
  "voltage": 19.34080877806074,
  "temperature": 37.461518565900434
}
However, I hope to add a schema field and move the current content under a payload field, so that later I can sink the data to Postgres with a JDBC sink connector such as Aiven's JDBC Sink and Source Connectors. Based on this doc, I think I should use "decimal" as each field's type. So this is the Kafka message format I hope to generate:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "decimal",
        "optional": false,
        "field": "timestamp"
      },
      {
        "type": "decimal",
        "optional": true,
        "field": "current"
      },
      {
        "type": "decimal",
        "optional": true,
        "field": "voltage"
      },
      {
        "type": "decimal",
        "optional": true,
        "field": "temperature"
      }
    ]
  },
  "payload": {
    "timestamp": 1682556571.14622,
    "current": 2.0172032595808242,
    "voltage": 19.34080877806074,
    "temperature": 37.461518565900434
  }
}
I tried to update my Spark code to:
import org.apache.spark.sql.functions.{col, lit, struct, to_json}
import org.apache.spark.sql.types.{DoubleType, StructType}

val input_schema = new StructType()
  .add("timestamp", DoubleType)
  .add("current", DoubleType, nullable = true)
  .add("voltage", DoubleType, nullable = true)
  .add("temperature", DoubleType, nullable = true)

val output_schema = new StructType()
  .add("timestamp", "decimal")
  .add("current", "decimal", nullable = true)
  .add("voltage", "decimal", nullable = true)
  .add("temperature", "decimal", nullable = true)

val df = spark.readStream
  .schema(input_schema)
  .option("maxFilesPerTrigger", 1)
  .parquet("s3a://my-bucket/my-folder/")
  .select(
    to_json(struct("*")).alias("payload")
  )
  .withColumn(
    "schema",
    to_json(struct(
      lit("struct").alias("type"),
      lit(output_schema.fields.map(field => struct(
        lit(field.dataType).alias("type"), // fails: lit() cannot build a literal from a DataType object (IngestFromS3ToKafka.scala:46 in the stack trace below)
        lit(field.nullable).alias("optional"),
        lit(field.name).alias("field")
      ))).alias("fields")
    ))
  )
  .select(
    to_json(struct(
      col("schema"),
      col("payload")
    )).alias("value")
  )

val query = df.writeStream
  .format("kafka")
  .option(
    "kafka.bootstrap.servers",
    "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
  )
  .option("topic", "my-topic")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()
But when I run spark-submit, I get this error:
Exception in thread "main" org.apache.spark.SparkRuntimeException: The feature is not supported: literal for 'DecimalType(10,0)' of class org.apache.spark.sql.types.DecimalType.
at org.apache.spark.sql.errors.QueryExecutionErrors$.literalTypeUnsupportedError(QueryExecutionErrors.scala:296)
at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:101)
at org.apache.spark.sql.functions$.lit(functions.scala:125)
at com.hongbomiao.IngestFromS3ToKafka$.$anonfun$main$1(IngestFromS3ToKafka.scala:46)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
at com.hongbomiao.IngestFromS3ToKafka$.main(IngestFromS3ToKafka.scala:45)
at com.hongbomiao.IngestFromS3ToKafka.main(IngestFromS3ToKafka.scala)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
I have a feeling that StructType is causing the type to come back as DecimalType(10,0). Maybe in this case I should not use StructType? I am not sure how to generate exactly "decimal" in the output message. Any guidance would be appreciated, thanks!
I think my original guess was correct. StructType is special and should not be used here: the type string "decimal" is the same as DecimalType(10,0), which is why it shows that error. In my case, I should use a plain struct instead.
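Here is a minimal sketch (assuming a spark-shell session) that reproduces the problem:

import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StructType

// The type string "decimal" is parsed as DecimalType(10,0)...
val s = new StructType().add("timestamp", "decimal")
println(s("timestamp").dataType) // DecimalType(10,0)

// ...and lit() cannot build a column literal from a DataType object,
// which is exactly the SparkRuntimeException in the stack trace above:
// lit(s("timestamp").dataType) // throws: literal for 'DecimalType(10,0)' ... not supported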
Also, I found that the downstream Aiven JDBC sink connector's documentation may have an issue: I tried decimal, Decimal, and DECIMAL, and they all failed when running the connector. Since the default value.converter is org.apache.kafka.connect.json.JsonConverter, I should use double (or float) in my case, based on the JsonSchema source code.
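For reference, the JsonConverter parses this schema-and-payload envelope when value.converter.schemas.enable is true (the default). A sketch of the sink connector config, assuming Aiven's io.aiven.connect.jdbc.JdbcSinkConnector class; the connector name and connection.url here are hypothetical:

{
  "name": "my-postgres-sink",
  "config": {
    "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://my-postgres:5432/my-db",
    "topics": "my-topic",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "auto.create": "true"
  }
}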
So the final working version is:
import org.apache.spark.sql.functions.{array, lit, struct, to_json}
import org.apache.spark.sql.types.{DoubleType, StructType}

val input_schema = new StructType()
  .add("timestamp", DoubleType)
  .add("current", DoubleType, nullable = true)
  .add("voltage", DoubleType, nullable = true)
  .add("temperature", DoubleType, nullable = true)

val df = spark.readStream
  .schema(input_schema)
  .option("maxFilesPerTrigger", 1)
  .parquet("s3a://my-bucket/my-folder/")
  .select(
    // Kafka Connect JSON envelope: a "schema" describing each payload field,
    // built from plain string literals instead of Spark DataType objects
    struct(
      lit("struct").alias("type"),
      array(
        struct(
          lit("double").alias("type"),
          lit(false).alias("optional"),
          lit("timestamp").alias("field")
        ),
        struct(
          lit("double").alias("type"),
          lit(true).alias("optional"),
          lit("current").alias("field")
        ),
        struct(
          lit("double").alias("type"),
          lit(true).alias("optional"),
          lit("voltage").alias("field")
        ),
        struct(
          lit("double").alias("type"),
          lit(true).alias("optional"),
          lit("temperature").alias("field")
        )
      ).alias("fields")
    ).alias("schema"),
    struct("*").alias("payload")
  )
  .select(to_json(struct("*")).alias("value"))

val query = df.writeStream
  .format("kafka")
  .option(
    "kafka.bootstrap.servers",
    "hm-kafka-kafka-bootstrap.hm-kafka.svc:9092"
  )
  .option("topic", "my-topic")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()
This produces messages in this format:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "double",
        "optional": false,
        "field": "timestamp"
      },
      {
        "type": "double",
        "optional": true,
        "field": "current"
      },
      {
        "type": "double",
        "optional": true,
        "field": "voltage"
      },
      {
        "type": "double",
        "optional": true,
        "field": "temperature"
      }
    ]
  },
  "payload": {
    "timestamp": 1682556571.14622,
    "current": 2.0172032595808242,
    "voltage": 19.34080877806074,
    "temperature": 37.461518565900434
  }
}
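As a side note, the hand-written fields array could also be derived from input_schema. This is just a sketch, not tested against the connector: it relies on Spark's DataType.typeName returning "double" for DoubleType, which happens to match the Kafka Connect type name (other types, e.g. IntegerType vs. int32, would need an explicit mapping), and "optional" mirrors the nullable flags in input_schema, so timestamp would need to be added there with nullable = false to come out as "optional": false:

val fields = input_schema.fields.map(field =>
  struct(
    lit(field.dataType.typeName).alias("type"), // a plain string like "double", not a DataType object
    lit(field.nullable).alias("optional"),
    lit(field.name).alias("field")
  )
)

val df = spark.readStream
  .schema(input_schema)
  .option("maxFilesPerTrigger", 1)
  .parquet("s3a://my-bucket/my-folder/")
  .select(
    struct(
      lit("struct").alias("type"),
      array(fields: _*).alias("fields")
    ).alias("schema"),
    struct("*").alias("payload")
  )
  .select(to_json(struct("*")).alias("value"))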