
Spark 3.x Integration with Kafka in Python


Kafka with Spark Streaming throws an error:

from pyspark.streaming.kafka import KafkaUtils
ImportError: No module named kafka

I have already set up a Kafka broker and a working Spark environment with one master and one worker.

import os

os.environ['PYSPARK_PYTHON'] = '/usr/bin/python2.7'
import findspark
findspark.init('/usr/spark/spark-3.0.0-preview2-bin-hadoop2.7')
import pyspark
import sys
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils



if __name__=="__main__":
    sc = SparkContext(appName="SparkStreamAISfromKAFKA")
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc,1)
    kvs = KafkaUtils.createStream(ssc, "my-kafka-broker", "raw-event-streaming-consumer", {'enriched_ais_messages': 1})
    lines = kvs.map(lambda x: x[1])
    lines.count().map(lambda x: 'Messages AIS: %s' % x).pprint()
    ssc.start()
    ssc.awaitTermination()

I assume from the error that something related to Kafka is missing, specifically a version mismatch. Can anyone help with this?

Spark version: 3.0.0-preview2

I execute with:

/usr/spark/spark-3.0.0-preview2-bin-hadoop2.7/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.1 --jars spark-streaming-kafka-0-10_2.11 spark_streamer.py spark://mysparkip:7077

Solution

  • According to the Spark Streaming + Kafka Integration Guide:

    "Kafka 0.8 support is deprecated as of Spark 2.3.0."

    In addition, the compatibility table in that guide shows that Python is not supported for Kafka 0.10 (and higher) in the DStream API.


    In your case you would have to use Spark 2.4 in order to get this DStream code running.
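
    If you stay on Spark 2.4, the packages passed to spark-submit must also match that release and its Scala 2.11 build. A sketch (the artifact version is an assumption; adapt it to your exact 2.4.x release):

    spark-submit \
      --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.5 \
      spark_streamer.py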

    PySpark supports Structured Streaming

    If you plan to use the latest version of Spark (e.g. 3.x) and still want to integrate Spark with Kafka in Python, you can use Structured Streaming. You will find detailed instructions on how to use the Python API in the Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher):

    Reading Data from Kafka

    # Subscribe to 1 topic
    df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
      .option("subscribe", "topic1") \
      .load()
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    

    Writing Data to Kafka

    # Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    ds = df \
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
      .writeStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
      .option("topic", "topic1") \
      .start()
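
    Note that, unlike the console sink, a streaming query that writes to Kafka must also be given a checkpoint directory via .option("checkpointLocation", "/path/to/checkpoint/dir") on the writeStream, otherwise the query will fail to start.

    Finally, the spark-submit call has to ship the Structured Streaming Kafka package instead of the DStream artifacts. A sketch, assuming a Scala 2.12 build of Spark (match the artifact version to your exact Spark release, e.g. 3.0.0-preview2):

    /usr/spark/spark-3.0.0-preview2-bin-hadoop2.7/bin/spark-submit \
      --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0-preview2 \
      spark_streamer.py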