Tags: python, apache-spark, apache-kafka, spark-structured-streaming

Unable to use Kafka jars in a Jupyter notebook


I'm using Spark Structured Streaming to read data from a single-node Kafka broker, running the setup below locally on a Mac. I can read via spark-submit, but the same code does not work in a Jupyter notebook.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col,from_json
from pyspark.sql.types import  StructType, StructField, StringType, LongType, DoubleType, IntegerType, ArrayType

import time

# Stop any session left over from a previous cell before rebuilding it
spark.stop()
time.sleep(30)

spark = SparkSession.builder\
    .appName('KafkaIntegration6')\
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4")\
    .getOrCreate()
print('done')

kafka_stream_df = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("subscribe", "TestTopic2")\
    .option("startingOffsets", "earliest")\
    .load()

Error: AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of Structured Streaming + Kafka Integration Guide.

Question: I can run the same code and read from Kafka via spark-submit by passing spark-sql-kafka-0-10_2.12:3.4 with --packages. However, when I run it from a Jupyter notebook, adding the Kafka package during session creation, I get the error above. Looking at the Spark web UI at http://localhost:4040/environment/, I can see org.apache.spark:spark-sql-kafka-0-10_2.12:3.4 listed under spark.jars.packages.
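One thing worth double-checking (my own suggestion, not from the original post): the package version in the coordinate above is 3.4, but as far as I can tell Spark only publishes full three-part versions (3.4.0, 3.4.1, ...) to Maven Central, and the coordinate should match `pyspark.__version__` exactly. A small sketch that builds and sanity-checks the coordinate (the helper name is hypothetical):

```python
import re

def kafka_package_coordinate(spark_version, scala_version="2.12"):
    """Build the spark-sql-kafka Maven coordinate, rejecting truncated versions."""
    # Only full x.y.z versions (e.g. 3.4.1) appear to be published artifacts,
    # so catch a bare "3.4" before it reaches Ivy resolution.
    if not re.fullmatch(r"\d+\.\d+\.\d+", spark_version):
        raise ValueError(f"expected a full version like 3.4.1, got {spark_version!r}")
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"

# In the notebook: kafka_package_coordinate(pyspark.__version__)
```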


Solution

  • This alternative method still seems to work

    import pyspark
    from pyspark.sql import SparkSession
    import os
    
    scala_version = '2.12'  # TODO: Ensure this is correct
    spark_version = pyspark.__version__
    
    packages = [
        f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
        'org.apache.kafka:kafka-clients:3.5.1'
    ]
    
    args = os.environ.get('PYSPARK_SUBMIT_ARGS', '')
    if not args:
        args = f'--packages {",".join(packages)}'
        print('Using packages', packages)
        os.environ['PYSPARK_SUBMIT_ARGS'] = f'{args} pyspark-shell'
    else:
        print(f'Found existing args: {args}') 
    
    spark = SparkSession.builder\
       .master("local")\
       .appName("kafka-example")\
       .getOrCreate()
    

    In the logs I now see the Ivy resolution output below, which did not appear when using spark.jars.packages (which, according to the docs, should also work):

    Using packages ['org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1', 'org.apache.kafka:kafka-clients:3.5.1']
    :: loading settings :: url = jar:file:/private/tmp/venv/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
    Ivy Default Cache set to: /Users/jomoore/.ivy2/cache
    The jars for the packages stored in: /Users/jomoore/.ivy2/jars
    org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
    org.apache.kafka#kafka-clients added as a dependency
    :: resolving dependencies :: org.apache.spark#spark-submit-parent-27575234-cd3f-4e23-9c25-4ea0f1309b35;1.0
        confs: [default]
        found org.apache.spark#spark-sql-kafka-0-10_2.12;3.4.1 in central
        found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.1 in central
        found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
        found org.apache.hadoop#hadoop-client-api;3.3.4 in central
        found org.xerial.snappy#snappy-java;1.1.10.1 in local-m2-cache
        found org.slf4j#slf4j-api;2.0.6 in central
        found commons-logging#commons-logging;1.1.3 in local-m2-cache
        found com.google.code.findbugs#jsr305;3.0.0 in local-m2-cache
        found org.apache.commons#commons-pool2;2.11.1 in local-m2-cache
        found org.apache.kafka#kafka-clients;3.5.1 in local-m2-cache
        found com.github.luben#zstd-jni;1.5.5-1 in local-m2-cache
        found org.lz4#lz4-java;1.8.0 in local-m2-cache
    downloading https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.4.1/spark-sql-kafka-0-10_2.12-3.4.1.jar ...
        [SUCCESSFUL ] org.apache.spark#spark-sql-kafka-0-10_2.12;3.4.1!spark-sql-kafka-0-10_2.12.jar (195ms)
    downloading file:/Users/jomoore/.m2/repository/org/apache/kafka/kafka-clients/3.5.1/kafka-clients-3.5.1.jar ...
        [SUCCESSFUL ] org.apache.kafka#kafka-clients;3.5.1!kafka-clients.jar (11ms)
    downloading https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.4.1/spark-token-provider-kafka-0-10_2.12-3.4.1.jar ...
        [SUCCESSFUL ] org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.1!spark-token-provider-kafka-0-10_2.12.jar (117ms)
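To confirm the new session really picked up the jars (a hedged sketch; the helper name is my own), you can inspect the submit args the kernel will use, then reuse the question's readStream code unchanged:

```python
import os

def jvm_launch_args(env=None):
    """Return the args pyspark will use (or used) to launch the JVM in this kernel."""
    env = os.environ if env is None else env
    return env.get("PYSPARK_SUBMIT_ARGS", "")

# In the notebook, this should now include both --packages coordinates:
# print(jvm_launch_args())
#
# With the session from the answer above, the original code works as-is
# (requires a broker on localhost:9092):
# kafka_stream_df = (spark.readStream.format("kafka")
#     .option("kafka.bootstrap.servers", "localhost:9092")
#     .option("subscribe", "TestTopic2")
#     .option("startingOffsets", "earliest")
#     .load())
```

The key point is that PYSPARK_SUBMIT_ARGS must be set before the first SparkSession starts the JVM in the kernel; once a JVM is up, getOrCreate() returns the existing session and new package settings are ignored.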