I am new to the big data ecosystem and am just getting started.
I have read several articles about reading a Kafka topic using Spark Streaming, but I would like to know whether it is possible to read from Kafka using a Spark batch job instead of streaming. If so, could you point me to some articles or code snippets that would get me started?
The second part of my question is about writing to HDFS in Parquet format. Once I read from Kafka, I assume I will have an RDD. I would convert this RDD into a DataFrame and then write the DataFrame as a Parquet file. Is this the right approach?
Any help appreciated.
To read data from Kafka and write it to HDFS in Parquet format with a Spark batch job rather than a streaming job, you can use the Kafka source that ships with Spark Structured Streaming. It supports batch queries as well as streaming ones.
Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the Dataset/DataFrame API in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write Ahead Logs. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.
It comes with Kafka as a built-in source, i.e., we can poll data from Kafka. It’s compatible with Kafka broker versions 0.10.0 or higher.
For pulling the data from Kafka in batch mode, you can create a Dataset/DataFrame for a defined range of offsets.
// Subscribe to 1 topic; defaults to the earliest and latest offsets
import spark.implicits._ // encoders needed by .as[(String, String)]
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
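In the offset JSON used above, -2 refers to the earliest offset of a partition and -1 to the latest. If you would rather not hand-write that JSON, here is a minimal sketch of a helper that renders it from a Scala Map. `KafkaOffsets` and `toJson` are hypothetical names of my own, not part of Spark, and the sketch assumes topic names need no JSON escaping.

```scala
// Hypothetical helper (not a Spark API): renders per-partition offsets as the
// JSON string expected by the "startingOffsets" / "endingOffsets" options.
object KafkaOffsets {
  val Earliest: Long = -2L // Kafka source convention for "earliest"
  val Latest: Long = -1L   // Kafka source convention for "latest"

  // Input shape: topic -> (partition -> offset).
  // Topics and partitions are sorted so the output is deterministic.
  def toJson(offsets: Map[String, Map[Int, Long]]): String =
    offsets.toSeq.sortBy(_._1).map { case (topic, partitions) =>
      val body = partitions.toSeq.sortBy(_._1)
        .map { case (partition, offset) => s""""$partition":$offset""" }
        .mkString(",")
      s""""$topic":{$body}"""
    }.mkString("{", ",", "}")
}
```

The result can then be passed straight to the reader, e.g. `.option("startingOffsets", KafkaOffsets.toJson(Map("topic1" -> Map(0 -> 23L, 1 -> KafkaOffsets.Earliest))))`.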
Each row in the source has the following schema:
| Column | Type |
| --- | --- |
| key | binary |
| value | binary |
| topic | string |
| partition | int |
| offset | long |
| timestamp | timestamp |
| timestampType | int |
The batch read already returns a DataFrame, so there is no RDD to convert. To write the data to HDFS in Parquet format, call write.parquet on that DataFrame with an HDFS path.
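A minimal sketch of the write step, assuming `kafkaDf` is a DataFrame loaded from the Kafka source as in the snippets above. `KafkaToParquet`, `writeAsParquet` and the output path are placeholders of my own, and `SaveMode.Append` is just one reasonable choice for repeated batch runs.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

object KafkaToParquet {
  // Decodes key/value from binary to string, keeps some Kafka metadata columns,
  // and writes the result as Parquet files under outputPath.
  // outputPath is a placeholder, e.g. "hdfs://namenode:8020/data/topic1".
  def writeAsParquet(kafkaDf: DataFrame, outputPath: String): Unit =
    kafkaDf
      .selectExpr(
        "CAST(key AS STRING) AS key",
        "CAST(value AS STRING) AS value",
        "topic", "partition", "offset", "timestamp")
      .write
      .mode(SaveMode.Append) // add new files instead of failing if the path exists
      .parquet(outputPath)
}
```

You would call it as `KafkaToParquet.writeAsParquet(df, "hdfs://...")`, where `df` is the result of `spark.read.format("kafka")...load()`. Keeping the topic, partition and offset columns makes it easier to work out where the next batch run should start.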
For more information on Spark Structured Streaming + Kafka, please refer to the Structured Streaming + Kafka Integration Guide in the Spark documentation.
I hope it helps!