Tags: apache-spark, pyspark, apache-spark-sql, spark-structured-streaming

Is it possible to register a Spark Structured Streaming DataFrame as a SQL temporary view?


I am reading data from a Kafka topic using Spark Structured Streaming, and I want to run SQL queries on this streaming data.

Following is the code:

from pyspark.sql import SparkSession

def process_batch(df, id):
    # here I want to run SQL queries on the batch DataFrame,
    # but it fails with "Table or view not found"
    spark = spark_session()
    df.createOrReplaceTempView("x")
    spark.sql("select * from x")

def spark_session():
    spark = SparkSession \
        .builder \
        .appName("Python kafka Spark example") \
        .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1') \
        .getOrCreate()

    return spark

def main():

    spark = spark_session()

    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "test") \
        .option("startingOffsets", "earliest") \
        .load() \
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    query = df.writeStream.foreachBatch(process_batch).start()


    query.awaitTermination()


if __name__ == "__main__":
    main()

Error: org.apache.spark.sql.AnalysisException: Table or view not found: x;
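
This happens because foreachBatch hands process_batch a DataFrame bound to a session cloned for the streaming query, while getOrCreate() inside spark_session() returns the original global session; a temp view registered on one session is not visible from the other. As an alternative to the solution below, a minimal sketch of the usual fix is to query through the session attached to the batch DataFrame itself (assuming PySpark 2.x, matching the 2.4.1 package above; PySpark 3.3+ exposes the same session directly as df.sparkSession):

def process_batch(df, epoch_id):
    # Register the view on the session the batch DataFrame belongs to,
    # then run the query on that same session instead of a new one.
    df.createOrReplaceTempView("x")
    df.sql_ctx.sparkSession.sql("select * from x").show()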


Solution

  • Created a new DataFrame from the existing batch DataFrame and ran SQL queries over it; this approach solved the problem.

    from pyspark.sql import SparkSession
    from pyspark.sql.types import *
    
    def process_batch(df, id):
        df.show()
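        # collect() brings every row of this micro-batch to the driver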
        df1 = df.collect()
    
        spark = spark_session()
    
        schemaString = "key value"
    
        fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
        schema = StructType(fields)
    
        df2 = spark.createDataFrame(df1, schema)
        df2.createOrReplaceTempView("x")
        spark.sql("SELECT value FROM x limit 2").show()
    
    def spark_session():
        spark = SparkSession \
            .builder \
            .appName("Python kafka Spark example") \
            .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1') \
            .getOrCreate()
    
        return spark
    
    
    def main():
    
        spark = spark_session()
    
        df = spark \
            .readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "localhost:9092") \
            .option("subscribe", "test") \
            .option("startingOffsets", "earliest") \
            .load() \
            .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    
        # query = df \
        #             .writeStream \
        #                     .outputMode("append") \
        #                     .format("console") \
        #                     .start()
    
        #spark.sql("select * from test").show()
    
        query = df.writeStream.foreachBatch(process_batch).start()
    
    
        query.awaitTermination()
    
    
    if __name__ == "__main__":
        main()
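
A caveat with this approach: df.collect() materializes the whole micro-batch on the driver, so it only works well for small batches. Since the DataFrame passed to foreachBatch is a plain, non-streaming DataFrame, the same query can usually stay distributed by using the DataFrame API directly; a minimal sketch of an equivalent process_batch:

    def process_batch(df, epoch_id):
        # The micro-batch df is a static DataFrame, so ordinary
        # DataFrame operations apply; no collect()/createDataFrame
        # round trip is needed.
        df.select("value").limit(2).show()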