Can't read from and write to a Kafka topic using Spark (Scala)


I am writing a sample Spark Structured Streaming program that reads messages from an input Kafka topic and writes them both to the console and to a second, output Kafka topic. There is no error or exception, but no messages show up in the console or in the output Kafka topic. Can anyone tell me what I am missing?

This is my code:


    import org.apache.log4j.Level
    import org.apache.spark.sql.SparkSession
    import org.slf4j.{Logger, LoggerFactory}

    object Test extends App {
      val logger: Logger = LoggerFactory.getLogger(Test.getClass)
      val kafkalogger = org.apache.log4j.Logger.getLogger("kafka")
      // Set the level before the first INFO call so it is not filtered out
      kafkalogger.setLevel(Level.INFO)
      kafkalogger.info("Running Pipeline")
      val spark = SparkSession.builder().getOrCreate()

      // Streaming source: subscribe to input-topic from the earliest offset
      val dfStream = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "*.*.*.*:9092")
        .option("subscribe", "input-topic")
        .option("startingOffsets", "earliest")
        .load()

      dfStream.printSchema()

      // Kafka delivers key and value as binary; cast the value to a string
      val messageDF = dfStream.selectExpr("CAST(value AS STRING)")

      messageDF.printSchema()

      kafkalogger.info("Before writing messages to console")

      // First query: print each micro-batch to the driver's stdout
      messageDF.writeStream.outputMode("append").format("console").start()

      kafkalogger.info("After writing to console")

      // Second query: write the same values to output-topic
      val writeToKafka = dfStream
        //.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .selectExpr("CAST(value AS STRING)")
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "*.*.*.*:9092")
        .option("topic", "output-topic")
        .option("checkpointLocation", "/tmp/jsp_checkpointdir")
        .start()

      kafkalogger.info("After writing to topic")
      // Block the main thread so the streaming queries keep running
      writeToKafka.awaitTermination()
    }
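
Because the job runs without any error, one way to tell whether either query is actually pulling rows from Kafka is to log per-batch progress. The following is a minimal sketch, not part of the original program: the object name `ProgressLogging` and the helper `summarize` are hypothetical, and it assumes the Spark SQL library is on the classpath. It relies on Spark's `StreamingQueryListener`, whose progress events carry the number of input rows read in each micro-batch.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{
  QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent
}

// Hypothetical helper, for illustration only.
object ProgressLogging {

  // Turns one micro-batch's counters into a readable line.
  def summarize(batchId: Long, numInputRows: Long): String =
    if (numInputRows == 0) s"batch $batchId: no rows read from source"
    else s"batch $batchId: $numInputRows rows read"

  // Registers a listener on the session; it fires for every streaming
  // query started from this SparkSession.
  def register(spark: SparkSession): Unit =
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit =
        println(s"query started: id=${event.id}")

      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        val p = event.progress
        println(summarize(p.batchId, p.numInputRows))
      }

      override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
        println(s"query terminated: id=${event.id} exception=${event.exception}")
    })
}
```

Calling `ProgressLogging.register(spark)` before the first `start()` would print one line per progress update to the driver's stdout. If every line reports no rows, the source is not returning data, which points at the Kafka side (connectivity, topic, offsets) rather than at the sinks.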

I can see the schema of the DataFrame, but not the actual messages.

21/11/08 07:02:00 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@2b99c937{/static/sql,null,AVAILABLE,@Spark}
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

root
 |-- value: string (nullable = true)

21/11/08 07:02:03 INFO kafka: Before writing messages to console
21/11/08 07:02:03 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(memoryOverhead -> name: memoryOverhead, amount: 512, script: , vendor: , cores -> name: cores, amount: 2, script: , vendor: , memory -> name: memory, amount: 4096, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
21/11/08 07:02:03 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
21/11/08 07:02:03 INFO ServerInfo: Adding filter to /StreamingQuery: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
21/11/08 07:02:03 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@40fd1a78{/StreamingQuery,null,AVAILABLE,@Spark}
21/11/08 07:02:03 INFO ServerInfo: Adding filter to /StreamingQuery/json: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
21/11/08 07:02:03 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@32430075{/StreamingQuery/json,null,AVAILABLE,@Spark}
21/11/08 07:02:03 INFO ServerInfo: Adding filter to /StreamingQuery/statistics: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
21/11/08 07:02:03 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@60f1f95b{/StreamingQuery/statistics,null,AVAILABLE,@Spark}
21/11/08 07:02:03 INFO ServerInfo: Adding filter to /StreamingQuery/statistics/json: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
21/11/08 07:02:03 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@6f926d01{/StreamingQuery/statistics/json,null,AVAILABLE,@Spark}
21/11/08 07:02:03 INFO ServerInfo: Adding filter to /static/sql: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
21/11/08 07:02:03 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@10e4ce98{/static/sql,null,AVAILABLE,@Spark}
21/11/08 07:02:03 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
21/11/08 07:02:03 WARN StreamingQueryManager: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
21/11/08 07:02:03 INFO MicroBatchExecution: Checkpoint root /mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f resolved to hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f.
21/11/08 07:02:03 INFO CheckpointFileManager: Writing atomically to hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f/metadata using temp file hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f/.metadata.2a9256a4-a174-4ebb-b671-a1f011b78158.tmp
21/11/08 07:02:03 INFO CheckpointFileManager: Renamed temp file hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f/.metadata.2a9256a4-a174-4ebb-b671-a1f011b78158.tmp to hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f/metadata
21/11/08 07:02:03 INFO MicroBatchExecution: Starting [id = 3393cbc3-9899-4053-8cb6-8c7654d8db28, runId = 23729550-4fc3-4500-8fcf-fa76da0ed3cf]. Use hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/mnt/tmp/temporary-666cc7f1-7209-411d-b550-bc9b9e2ed79f to store the query checkpoint.
21/11/08 07:02:03 INFO kafka: After writing to console
21/11/08 07:02:03 WARN StreamingQueryManager: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
21/11/08 07:02:03 INFO MicroBatchExecution: Checkpoint root /tmp/jsp_checkpointdir resolved to hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/tmp/jsp_checkpointdir.
21/11/08 07:02:03 INFO MicroBatchExecution: Reading table [org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@43309170] from DataSourceV2 named 'kafka' [org.apache.spark.sql.kafka010.KafkaSourceProvider@d19db03]
21/11/08 07:02:03 INFO MicroBatchExecution: Starting new streaming query.
21/11/08 07:02:03 INFO MicroBatchExecution: Stream started from {}
21/11/08 07:02:03 INFO MicroBatchExecution: Starting [id = 3ae88b1d-c4bb-4a7c-be5c-4a75fda4903d, runId = 0d089f20-c110-465b-add9-080e213dd72d]. Use hdfs://ip-10-0-2-229.ap-south-1.compute.internal:8020/tmp/jsp_checkpointdir to store the query checkpoint.
21/11/08 07:02:03 INFO kafka: After writing to topic
21/11/08 07:02:03 INFO MicroBatchExecution: Reading table [org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@43309170] from DataSourceV2 named 'kafka' [org.apache.spark.sql.kafka010.KafkaSourceProvider@d19db03]
21/11/08 07:02:03 INFO MicroBatchExecution: Starting new streaming query.
21/11/08 07:02:03 INFO MicroBatchExecution: Stream started from {}
21/11/08 07:02:03 INFO ConsumerConfig: ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest

These are the messages in the input Kafka topic:

[root@ip-*.*.*.* kafka]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic input-topic --from-beginning
test message
message2
message3
message4
message4
message5
message6

There are no messages in output-topic either:

[root@ip-*.*.*.* kafka]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output-topic --from-beginning

Solution

  • The issue was resolved after I reverted my changes to the config/server.properties file and went back to the default configuration. I still cannot consume from the beginning of the topic when the messages were written to it before the Spark streaming job started, but that is a separate issue.
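
For the second issue, one thing worth ruling out (the post does not confirm it as the cause) is the checkpoint. In Structured Streaming, `startingOffsets` applies only when a query starts for the first time; a query that finds an existing checkpoint resumes from the offsets recorded there and ignores `"earliest"`. The sketch below reuses the topic names and checkpoint path from the question but appends a random suffix to the path, so each run starts as a new query. The object `ReplayFromEarliest`, the helper `freshCheckpoint`, and the `bootstrap` parameter are hypothetical names introduced for illustration.

```scala
import java.util.UUID
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical helper, for illustration only.
object ReplayFromEarliest {

  // Builds a checkpoint path that no earlier run has used, so the query
  // is treated as new and startingOffsets is honored.
  def freshCheckpoint(base: String): String = s"$base-${UUID.randomUUID()}"

  // Starts a Kafka-to-Kafka query that reads input-topic from the
  // earliest available offset and writes the values to output-topic.
  def start(spark: SparkSession, bootstrap: String): StreamingQuery =
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrap)
      .option("subscribe", "input-topic")
      .option("startingOffsets", "earliest")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrap)
      .option("topic", "output-topic")
      .option("checkpointLocation", freshCheckpoint("/tmp/jsp_checkpointdir"))
      .start()
}
```

A fresh checkpoint per run is only reasonable while debugging: it gives up resuming after a restart and replays the whole topic into the output every time. For a long-running job, a stable checkpoint directory is the usual choice.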