Tags: apache-spark, pyspark, apache-kafka, spark-structured-streaming, spark-kafka-integration

Connecting PySpark with Kafka


I'm having trouble understanding how to connect Kafka and PySpark.

I have a Kafka installation on Windows 10 with a topic that streams data nicely. I've installed PySpark, which runs properly - I'm able to create a test DataFrame without a problem.

But when I try to connect to the Kafka stream, it gives me this error:

AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".

The Spark documentation is not really helpful - it says: ... groupId = org.apache.spark artifactId = spark-sql-kafka-0-10_2.12 version = 3.2.0 ...

For Python applications, you need to add this above library and its dependencies when deploying your application. See the Deploying subsection below.

And then when you go to the Deploying section, it says:

As with any Spark applications, spark-submit is used to launch your application. spark-sql-kafka-0-10_2.12 and its dependencies can be directly added to spark-submit using --packages, such as, ./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 ...

I'm developing an app; I don't want to deploy it. Where and how do I add these dependencies if I'm developing a PySpark app?

I tried several tutorials and ended up being more confused.

I saw an answer saying:

"You need to add the kafka-clients JAR to your --packages" (so-answer)

A few more steps would be useful, because for someone who is new this is unclear.

versions:

  • kafka 2.13-2.8.1
  • spark 3.1.2
  • java 11.0.12

All environment variables and paths are correctly set.

EDIT

I've loaded:

   os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.apache.kafka:kafka-clients:2.8.1'

as suggested, but I'm still getting the same error. I've triple-checked the Kafka, Scala, and Spark versions and tried various combinations, but it didn't work - I'm still getting the same error:

AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".

EDIT 2

I installed the latest Spark 3.2.0 and Hadoop 3.3.1, and Kafka version kafka_2.12-2.8.1. I changed all environment variables and tested Spark and Kafka - both work properly.

My environment variable looks like this now:

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0,org.apache.kafka:kafka-clients:2.8.1'

Still no luck, I get the same error :(


Solution

  • Spark documentation is not really helpful - it says ... artifactId = spark-sql-kafka-0-10_2.12 version = 3.2.0 ...

    Yes, that is correct... but you're looking at the documentation for the latest version of Spark.

    Instead, you've mentioned

    versions:

    • spark 3.1.2

    Have you tried looking at the version-specific docs?

    In other words, you want the matching spark-sql-kafka version of 3.1.2.

    bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2

    Or in Python,

    from pyspark.sql import SparkSession

    scala_version = '2.12'
    spark_version = '3.1.2'
    # TODO: Ensure the above values match your actual Scala and Spark versions
    packages = [
        f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
        'org.apache.kafka:kafka-clients:3.2.1'
    ]
    spark = SparkSession.builder \
        .master("local") \
        .appName("kafka-example") \
        .config("spark.jars.packages", ",".join(packages)) \
        .getOrCreate()
    

    Or with an env-var

    import os
    
    spark_version = '3.1.2'
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:{}'.format(spark_version)
    
    # init spark here
    
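    One caveat (an assumption on my part about why the error may persist): PYSPARK_SUBMIT_ARGS is only read when PySpark launches its JVM, so it has to be set before the first SparkSession/SparkContext is created in that Python process - in a notebook, restart the kernel if a session already exists. In many local setups the value also needs to end with pyspark-shell. A minimal sketch under those assumptions:

    import os
    from pyspark.sql import SparkSession

    spark_version = '3.1.2'
    # Must run before any SparkSession/SparkContext exists in this process;
    # the trailing 'pyspark-shell' is what PySpark's launcher expects to run.
    os.environ['PYSPARK_SUBMIT_ARGS'] = (
        f'--packages org.apache.spark:spark-sql-kafka-0-10_2.12:{spark_version} pyspark-shell'
    )

    spark = SparkSession.builder.master("local").appName("kafka-example").getOrCreate()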

    need to add this above library and its dependencies

    As you found in my previous answer, also append the kafka-clients package using a comma-separated list.

    --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.apache.kafka:kafka-clients:2.8.1
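    Once the connector packages resolve (with either approach above), reading the topic from the session is a standard structured-streaming call. A minimal sketch - the broker address localhost:9092 and the topic name my-topic are placeholders for your own:

    # Read the Kafka topic as a streaming DataFrame.
    df = (spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")  # placeholder broker
        .option("subscribe", "my-topic")                       # placeholder topic
        .option("startingOffsets", "earliest")
        .load())

    # Kafka keys and values arrive as binary; cast them to strings to inspect.
    query = (df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .writeStream
        .format("console")
        .start())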


    I'm developing app, I don't want to deploy it.

    "Deploy" is Spark terminology. Running locally is still a "deployment"