
Spark Streaming + Kafka integration


I am trying to integrate Spark and Kafka in a Jupyter notebook using PySpark. Here is my environment:

  • Spark version: Spark 2.2.1
  • Kafka version: Kafka_2.11-0.8.2.2
  • Spark Streaming Kafka jar: spark-streaming-kafka-0-8-assembly_2.11-2.2.1.jar

I added the Spark Streaming Kafka assembly jar to the spark-defaults.conf file.
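In a Jupyter kernel, another way to hand the integration jar to Spark, sketched here as an alternative to editing spark-defaults.conf, is to set the PYSPARK_SUBMIT_ARGS environment variable before the first SparkContext is created. The helper name `set_pyspark_submit_args` is hypothetical. `--jars`, `--packages` and the trailing `pyspark-shell` token are the usual spark-submit/PySpark conventions for this.

```python
import os


def set_pyspark_submit_args(jars=(), packages=()):
    """Point the next SparkContext at extra jars/packages from inside Python.

    Hypothetical helper. It must run before the first SparkContext is created
    in the kernel. When PySpark is started from plain Python (as in Jupyter),
    the value is expected to end with 'pyspark-shell'.
    """
    args = []
    if jars:
        args += ["--jars", ",".join(jars)]          # local jar files
    if packages:
        args += ["--packages", ",".join(packages)]  # Maven coordinates
    args.append("pyspark-shell")
    value = " ".join(args)
    os.environ["PYSPARK_SUBMIT_ARGS"] = value
    return value
```

For this setup, a call such as `set_pyspark_submit_args(packages=["org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.1"])` at the top of the notebook would let Spark fetch the matching 0-8 integration instead of relying on a local jar path.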

When I start the StreamingContext in PySpark, an error appears saying that it can't read the Kafka version from MANIFEST.MF.


Here is my code.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

from kafka import KafkaProducer

# Receive-data handler: runs on the driver once per micro-batch
def handler(rdd):
    records = rdd.collect()  # pull the batch's (key, message) pairs to the driver
    for record in records:
        print(record)
        # kafka-python expects bytes unless a value_serializer is configured
        #producer.send('receive', str(record).encode('utf-8'))
        #producer.flush()

producer = KafkaProducer(bootstrap_servers='slave02:9092')
sc = SparkContext(appName="SparkwithKafka")
ssc = StreamingContext(sc, 1)  # 1-second batch interval

# Create a receiver-based Kafka stream that connects through ZooKeeper
zkQuorum = 'slave02:2181'
topic = 'send'
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})  # 1 consumer thread
kvs.foreachRDD(handler)

ssc.start()
ssc.awaitTermination()  # block until the streaming context is stopped
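The commented-out producer call in the handler above would need the record turned into bytes. A minimal sketch, assuming kafka-python's KafkaProducer, of a handler that forwards each record: `KafkaUtils.createStream` yields `(key, message)` pairs, and `KafkaProducer.send()` expects bytes unless a `value_serializer` is configured, so the message is UTF-8 encoded first. `record_to_payload`, `forward_batch` and the injected `send` parameter are hypothetical names. `send` is passed in so the logic can be exercised without a broker.

```python
def record_to_payload(record):
    """Encode one (key, message) pair from KafkaUtils.createStream as bytes.

    kafka-python's KafkaProducer.send() needs bytes for the value unless a
    value_serializer is configured, so the message text is UTF-8 encoded.
    """
    _key, message = record  # the key is often None and is ignored here
    return str(message).encode("utf-8")


def forward_batch(rdd, send):
    """foreachRDD callback: collect the micro-batch on the driver and forward it.

    `send` is a hypothetical injected callable, for example
    lambda payload: producer.send('receive', payload)
    """
    for record in rdd.collect():
        send(record_to_payload(record))
```

In the notebook it could be wired up as `kvs.foreachRDD(lambda rdd: forward_batch(rdd, lambda p: producer.send('receive', p)))`, with a `producer.flush()` afterwards when delivery has to be confirmed.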

Solution

  • Sorry for posting this in Scala.

    Spark 2.2.1 with Scala 2.11 and Kafka 0.10 all work together, even though the Kafka 0.10 integration is marked as experimental.

    With the libraries above, the proper way to create a stream is:

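    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    // kafkaParams: Map[String, Object] of consumer settings; fromOffsets: Map[TopicPartition, Long]
    val kStream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent,
      Subscribe[String, String](Array("weblogs-text"), kafkaParams, fromOffsets))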

    Pay attention to the dependencies: the Spark Kafka integration jars are specific to both the Kafka client version and the Spark (and Scala) version, for example:

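    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.2.1</version>
        <!-- not shipped with Spark: bundle it into the application jar (or pass it via packages) rather than marking it provided -->
    </dependency>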
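The version coupling described above can be checked mechanically. The following is a hypothetical helper sketch. It parses the integration jar name and the Kafka distribution name used in the question, then applies three checks: the jar's Spark and Scala versions must match the Spark build, and the broker must meet the minimum version listed in the Spark 2.2 Kafka integration guide (0.8.2.1 or higher for the 0-8 integration, 0.10.0 or higher for 0-10). The file-naming patterns it expects are an assumption based on the artifact names in this post.

```python
import re

# Minimum broker versions from the Spark 2.2 Kafka integration guide.
MIN_BROKER = {"0-8": (0, 8, 2, 1), "0-10": (0, 10, 0)}

# e.g. spark-streaming-kafka-0-8-assembly_2.11-2.2.1.jar
JAR_RE = re.compile(
    r"spark-streaming-kafka-(0-8|0-10)(?:-assembly)?_(\d+\.\d+)-(\d+\.\d+\.\d+)\.jar$")
# e.g. kafka_2.11-0.8.2.2 (Scala build of the broker, then the broker version)
DIST_RE = re.compile(r"kafka_\d+\.\d+-(\d+(?:\.\d+)*)", re.IGNORECASE)


def parse_version(text):
    """'0.8.2.2' -> (0, 8, 2, 2) so versions compare numerically."""
    return tuple(int(part) for part in text.split("."))


def check_compat(spark_version, spark_scala, kafka_dist, integration_jar):
    """Return a list of problems; an empty list means no mismatch was found."""
    jar = JAR_RE.search(integration_jar)
    dist = DIST_RE.search(kafka_dist)
    if jar is None or dist is None:
        return ["could not parse artifact names"]
    api, jar_scala, jar_spark = jar.groups()
    broker = dist.group(1)
    problems = []
    if jar_spark != spark_version:
        problems.append(f"jar targets Spark {jar_spark}, cluster runs {spark_version}")
    if jar_scala != spark_scala:
        problems.append(f"jar targets Scala {jar_scala}, Spark uses {spark_scala}")
    if parse_version(broker) < MIN_BROKER[api]:
        problems.append(f"kafka-{api} integration needs a newer broker than {broker}")
    return problems
```

With the question's setup (Spark 2.2.1 on Scala 2.11, Kafka_2.11-0.8.2.2, the 0-8 assembly jar) it reports no problems. Swapping in the 0-10 jar against the same 0.8.2.2 broker is flagged, because the 0-10 integration needs a 0.10+ broker.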