java apache-spark apache-kafka apache-spark-sql spark-structured-streaming

How to write selected columns to Kafka topic?


I am using spark-sql 2.4.1 with Java 1.8. For Kafka I use spark-sql-kafka-0-10_2.11 (2.4.3) and kafka-clients (0.10.0.0).

StreamingQuery queryComapanyRecords =
    comapanyRecords
        .writeStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BROKER)
        .option("topic", "in_topic")
        .option("auto.create.topics.enable", "false")
        .option("key.serializer", "org.apache.kafka.common.serialization.StringDeserializer")
        .option("value.serializer", "com.spgmi.ca.prescore.serde.MessageRecordSerDe")
        .option("checkpointLocation", "/app/chkpnt/")
        .outputMode("append")
        .start();

queryComapanyRecords.awaitTermination();

This gives the following error:

Caused by: org.apache.spark.sql.AnalysisException: Required attribute 'value' not found;
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$6.apply(KafkaWriter.scala:71)
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$6.apply(KafkaWriter.scala:71)
    at scala.Option.getOrElse(Option.scala:121)

I tried to fix it as shown below, but I am still unable to send the value, which is a Java bean in my case.

StreamingQuery queryComapanyRecords =
    comapanyRecords
        .selectExpr("CAST(company_id AS STRING) AS key", "to_json(struct(\"company_id\",\"fiscal_year\",\"fiscal_quarter\")) AS value")
        .writeStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BROKER)
        .option("topic", "in_topic")
        .start();

So is there any way in Java to send this value (i.e. a Java bean as the record)?


Solution

  • The Kafka data source requires a specific schema for reading (loading) and writing (saving) datasets.

    Quoting the official documentation (highlighting the most important field / column):

    Each row in the source has the following schema:

    ...

    value binary

    ...

    In other words, when you read from a Kafka topic the Kafka records arrive in the value column, and when you write to a Kafka topic you have to make your data available in the value column as well.

    Whatever is, or is going to be, in Kafka lives in the value column. The value column is where you "store" business records (the data).
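
    To make the requirement concrete, here is a simplified model of the check behind the error in the question. It is not Spark's code: KafkaSinkSchemaCheck and validate are hypothetical names, and column types are plain strings. It assumes the rules described in the Spark 2.4 Kafka integration guide: value is required and must be string or binary, key is optional, and a topic column is only needed when the topic option is not set.

    ```java
    import java.util.LinkedHashMap;
    import java.util.Map;

    // Hypothetical stand-in for the schema check the Kafka sink performs before writing.
    // Column types are plain strings here ("string", "binary", "int", ...).
    final class KafkaSinkSchemaCheck {

        // Returns null when the columns are acceptable, otherwise an error message.
        static String validate(Map<String, String> columns, boolean topicOptionSet) {
            String value = columns.get("value");
            if (value == null) {
                return "Required attribute 'value' not found";
            }
            if (!isStringOrBinary(value)) {
                return "value must be string or binary, got " + value;
            }
            String key = columns.get("key"); // the key column is optional
            if (key != null && !isStringOrBinary(key)) {
                return "key must be string or binary, got " + key;
            }
            String topic = columns.get("topic"); // only needed without the topic option
            if (topic == null && !topicOptionSet) {
                return "no topic: set the topic option or add a topic column";
            }
            if (topic != null && !topic.equals("string")) {
                return "topic must be string, got " + topic;
            }
            return null;
        }

        private static boolean isStringOrBinary(String type) {
            return type.equals("string") || type.equals("binary");
        }

        public static void main(String[] args) {
            // The question's situation: business columns only, no value column.
            Map<String, String> raw = new LinkedHashMap<>();
            raw.put("company_id", "int");
            raw.put("fiscal_year", "int");
            System.out.println(validate(raw, true));

            // After packing the business columns into key and value.
            Map<String, String> packed = new LinkedHashMap<>();
            packed.put("key", "string");
            packed.put("value", "string");
            System.out.println(validate(packed, true));
        }
    }
    ```

    The first call reports the missing value column, the second one passes. Any other columns in the dataset play no role in the check.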

    On to your question:

    How to write selected columns to Kafka topic?

    You should "pack" the selected columns together so that they all become part of the value column. The to_json standard function is a good fit: the selected columns end up as a single JSON message.

    Example

    Let me give you an example.

    Don't forget to start a Spark application or spark-shell with the Kafka data source. Mind the versions of Scala (2.11 or 2.12) and Spark (e.g. 2.4.4).

    spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4
    

    Let's start by creating a sample dataset. Any multiple-field dataset would work.

    val ns = Seq((0, "zero")).toDF("id", "name")
    scala> ns.show
    +---+----+
    | id|name|
    +---+----+
    |  0|zero|
    +---+----+
    

    If we tried to write the dataset to a Kafka topic as it is, the write would fail because the value column is missing. That is the error you faced initially.

    scala> ns.write.format("kafka").option("topic", "in_topic").save
    org.apache.spark.sql.AnalysisException: Required attribute 'value' not found;
      at org.apache.spark.sql.kafka010.KafkaWriter$.$anonfun$validateQuery$6(KafkaWriter.scala:71)
      at scala.Option.getOrElse(Option.scala:138)
      ...
    

    You have to come up with a way to "pack" multiple fields (columns) together and make the result available as the value column. The struct and to_json standard functions will do it.

    val vs = ns.withColumn("value", to_json(struct("id", "name")))
    scala> vs.show(truncate = false)
    +---+----+----------------------+
    |id |name|value                 |
    +---+----+----------------------+
    |0  |zero|{"id":0,"name":"zero"}|
    +---+----+----------------------+
    

    Saving to a Kafka topic should now be a breeze.

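    // The broker address below is an assumption; use your own.
    vs.write.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("topic", "in_topic").save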
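
    Since the question is about Java: the same packing should work there through selectExpr in a streaming query. The sketch below only builds the two expression strings; KafkaValueExpr, keyExpr and valueExpr are hypothetical helper names, not part of Spark. The column names are deliberately left unquoted. As far as I know, Spark SQL treats double-quoted tokens as string literals by default, so struct(\"company_id\", ...) packs the literal names rather than the column values. The commented usage assumes the comapanyRecords dataset, the KAFKA_BROKER constant and the checkpoint path from the question.

    ```java
    import java.util.StringJoiner;

    // Hypothetical helper that builds the expressions passed to Dataset.selectExpr.
    final class KafkaValueExpr {

        // Builds e.g. "CAST(company_id AS STRING) AS key".
        static String keyExpr(String column) {
            return "CAST(" + column + " AS STRING) AS key";
        }

        // Builds e.g. "to_json(struct(a, b)) AS value".
        // Column names are emitted without quotes so they are read as column references.
        static String valueExpr(String... columns) {
            StringJoiner cols = new StringJoiner(", ", "to_json(struct(", ")) AS value");
            for (String column : columns) {
                cols.add(column);
            }
            return cols.toString();
        }

        public static void main(String[] args) {
            System.out.println(keyExpr("company_id"));
            System.out.println(valueExpr("company_id", "fiscal_year", "fiscal_quarter"));
        }

        // Usage with Spark (needs Spark SQL and the Kafka data source on the classpath):
        //
        //   StreamingQuery query =
        //       comapanyRecords
        //           .selectExpr(
        //               KafkaValueExpr.keyExpr("company_id"),
        //               KafkaValueExpr.valueExpr("company_id", "fiscal_year", "fiscal_quarter"))
        //           .writeStream()
        //           .format("kafka")
        //           .option("kafka.bootstrap.servers", KAFKA_BROKER)
        //           .option("topic", "in_topic")
        //           .option("checkpointLocation", "/app/chkpnt/")
        //           .start();
    }
    ```

    If the records start out as Java beans, a dataset created with Encoders.bean exposes the bean properties as columns, so the same two expressions apply to it.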