I am trying to integrate Spark and Kafka in a Jupyter notebook using PySpark. Here is my working environment.
Spark version: Spark 2.2.1
Kafka version: Kafka_2.11-0.8.2.2
Spark Streaming Kafka jar: spark-streaming-kafka-0-8-assembly_2.11-2.2.1.jar
I added the Spark Streaming Kafka assembly jar to my spark-defaults.conf file.
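The line I added looks roughly like this (the path is illustrative, not my exact one):

spark.jars /usr/local/spark/jars/spark-streaming-kafka-0-8-assembly_2.11-2.2.1.jar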
When I start the StreamingContext for PySpark streaming, an error appears saying that the Kafka version can't be read from MANIFEST.MF.
Here is my code.
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import sys
import os
from kafka import KafkaProducer
# Receive data handler
def handler(message):
    records = message.collect()
    for record in records:
        print(record)
        #producer.send('receive', str(res))
        #producer.flush()
producer = KafkaProducer(bootstrap_servers='slave02:9092')
sc = SparkContext(appName="SparkwithKafka")
ssc = StreamingContext(sc, 1)
# Create the Kafka stream
zkQuorum = 'slave02:2181'
topic = 'send'
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic:1})
kvs.foreachRDD(handler)
ssc.start()
Sorry for posting this in Scala.
Spark 2.2.1 with Scala 2.11 and the Kafka 0.10 integration do work together, though the integration is marked as experimental.
The proper way to create a stream when using those libraries is:
val kStream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent,
  Subscribe[String, String](Array("weblogs-text"), kafkaParams, fromOffsets))
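kafkaParams and fromOffsets are not shown above; a minimal sketch of the surrounding code, reusing the broker and group id from your question and the topic name from the snippet (all just placeholders for your own values), would look something like this. If you don't need explicit starting offsets, the fromOffsets argument can simply be omitted:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Consumer configuration passed straight through to the Kafka client
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "slave02:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "spark-streaming-consumer",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// Without explicit offsets, Subscribe starts according to auto.offset.reset
val kStream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent,
  Subscribe[String, String](Array("weblogs-text"), kafkaParams))

// Print each (key, value) pair of every micro-batch
kStream.map(record => (record.key, record.value)).print()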
Pay attention to the dependencies: the Kafka integration jars are specific to both the Kafka client version and the Spark version. For example:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.2.1</version>
    <scope>provided</scope>
</dependency>
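Since you are launching PySpark from Jupyter rather than building a Scala job, one alternative to copying the assembly jar by hand (a sketch, assuming a standard pyspark install) is to let Spark resolve the connector as a package, either in spark-defaults.conf:

# 0-8 connector, which is the one that exposes the Python KafkaUtils API
spark.jars.packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.1

or on the command line:

pyspark --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.1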