I'm using Spark Structured Streaming to read data from a single-node Kafka broker, running the setup below locally on a Mac. Reading works via spark-submit, but the same code does not work in a Jupyter notebook.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, IntegerType, ArrayType
import time

# Stop the session left over from a previous cell so the new package config is picked up
spark.stop()
time.sleep(30)

spark = SparkSession.builder \
    .appName('KafkaIntegration6') \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4") \
    .getOrCreate()
print('done')
kafka_stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "TestTopic2") \
    .option("startingOffsets", "earliest") \
    .load()
Error: AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of Structured Streaming + Kafka Integration Guide.
Question: I'm able to run the same code and read from Kafka via spark-submit by passing spark-sql-kafka-0-10_2.12:3.4 to --packages. However, when I run it from a Jupyter notebook, adding the Kafka package during session creation, I get the error above. Looking at the Spark web UI at http://localhost:4040/environment/, I can see org.apache.spark:spark-sql-kafka-0-10_2.12:3.4 listed under spark.jars.packages.
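For what it's worth, the same value can also be checked programmatically (a quick sketch, assuming spark is the session created above):

# Confirm what the running session reports for spark.jars.packages;
# 'not set' is returned if the key is absent.
print(spark.sparkContext.getConf().get('spark.jars.packages', 'not set'))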
This alternative method, setting PYSPARK_SUBMIT_ARGS before creating the session, does still work:
import pyspark
from pyspark.sql import SparkSession
import os

scala_version = '2.12'  # TODO: Ensure this is correct
spark_version = pyspark.__version__
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.5.1'
]

# PYSPARK_SUBMIT_ARGS is read when the JVM gateway launches,
# so it must be set before the first SparkSession is created.
args = os.environ.get('PYSPARK_SUBMIT_ARGS', '')
if not args:
    args = f'--packages {",".join(packages)}'
    print('Using packages', packages)
    os.environ['PYSPARK_SUBMIT_ARGS'] = f'{args} pyspark-shell'
else:
    print(f'Found existing args: {args}')

spark = SparkSession.builder \
    .master("local") \
    .appName("kafka-example") \
    .getOrCreate()
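To confirm the Kafka source resolves with this session, I re-run the same read (a minimal check, reusing the broker and topic from the first snippet):

kafka_stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "TestTopic2") \
    .option("startingOffsets", "earliest") \
    .load()
print(kafka_stream_df.isStreaming)  # True once the kafka source loads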
In the logs I see the following, which did not appear when using spark.jars.packages (which, according to the docs, should also work):
Using packages ['org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1', 'org.apache.kafka:kafka-clients:3.5.1']
:: loading settings :: url = jar:file:/private/tmp/venv/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/jomoore/.ivy2/cache
The jars for the packages stored in: /Users/jomoore/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.kafka#kafka-clients added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-27575234-cd3f-4e23-9c25-4ea0f1309b35;1.0
confs: [default]
found org.apache.spark#spark-sql-kafka-0-10_2.12;3.4.1 in central
found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.1 in central
found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
found org.apache.hadoop#hadoop-client-api;3.3.4 in central
found org.xerial.snappy#snappy-java;1.1.10.1 in local-m2-cache
found org.slf4j#slf4j-api;2.0.6 in central
found commons-logging#commons-logging;1.1.3 in local-m2-cache
found com.google.code.findbugs#jsr305;3.0.0 in local-m2-cache
found org.apache.commons#commons-pool2;2.11.1 in local-m2-cache
found org.apache.kafka#kafka-clients;3.5.1 in local-m2-cache
found com.github.luben#zstd-jni;1.5.5-1 in local-m2-cache
found org.lz4#lz4-java;1.8.0 in local-m2-cache
downloading https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.4.1/spark-sql-kafka-0-10_2.12-3.4.1.jar ...
[SUCCESSFUL ] org.apache.spark#spark-sql-kafka-0-10_2.12;3.4.1!spark-sql-kafka-0-10_2.12.jar (195ms)
downloading file:/Users/jomoore/.m2/repository/org/apache/kafka/kafka-clients/3.5.1/kafka-clients-3.5.1.jar ...
[SUCCESSFUL ] org.apache.kafka#kafka-clients;3.5.1!kafka-clients.jar (11ms)
downloading https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.4.1/spark-token-provider-kafka-0-10_2.12-3.4.1.jar ...
[SUCCESSFUL ] org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.1!spark-token-provider-kafka-0-10_2.12.jar (117ms)
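For context, downstream I just cast and print the messages for now; a minimal console-sink sketch of that step (no schema parsing yet, kafka_stream_df as created above):

# Cast the raw Kafka key/value bytes to strings and print each micro-batch
query = kafka_stream_df \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .option("truncate", "false") \
    .start()
query.awaitTermination()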