Tags: python, apache-spark, apache-kafka, cassandra, spark-structured-streaming

How to insert a value into a Spark readStream


Source Code

from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Schema for the Kafka JSON payload
schema = StructType() \
        .add("sensor_id", StringType(), True) \
        .add("create_dt", StringType(), True) \
        .add("collect_time", StringType(), True) \
        .add("avg", StringType(), True) \
        .add("data_type", StringType(), True) \
        .add("max", StringType(), True) \
        .add("min", StringType(), True)

###################################### Source Code ######################################

# Spark Bridge local to spark_master
spark = SparkSession.builder \
    .master(_connSession)\
    .appName("Spark_Streaming+kafka+cassandra") \
    .config('spark.cassandra.connection.host', _connCassandraHost) \
    .config('spark.cassandra.connection.port', _connCassandraPort) \
    .getOrCreate()

# readStream from DATA_SOC on the DB server
df = spark.readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", _connKafka) \
  .option('startingOffsets','earliest') \
  .option("subscribe", _topic) \
  .load() \
  .select(from_json(col("value").cast("String"), schema).alias("parsed_value")) \
  .select("parsed_value.*")

df.withColumn("create_dt", lit("collect_time"[0:7]))

df.printSchema()

# writeStream to Cassandra on the Spark master VM
ds = df.writeStream \
  .trigger(processingTime='15 seconds') \
  .format("org.apache.spark.sql.cassandra") \
  .option("checkpointLocation","/home/jeju/jeju_sensor_collector/src/checkPoint") \
  .options(table=_table,keyspace=_keySpace) \
  .outputMode('append') \
  .start()

ds.awaitTermination()

Output of df.printSchema():

root
 |-- sensor_id: string (nullable = true)
 |-- create_dt: string (nullable = true)
 |-- collect_time: string (nullable = true)
 |-- avg: string (nullable = true)
 |-- data_type: string (nullable = true)
 |-- max: string (nullable = true)
 |-- min: string (nullable = true)

Error

com.datastax.spark.connector.writer.NullKeyColumnException: Invalid null value for key column create_dt
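
One way to confirm that create_dt is the column arriving as null is to point the same parsed stream at Spark's built-in console sink instead of Cassandra (a minimal debugging sketch; df is the parsed stream from the code above):

# Debug sketch: print a few micro-batches to the console instead of writing
# to Cassandra, so null columns (here: create_dt) become visible
debug = df.writeStream \
  .format("console") \
  .option("truncate", "false") \
  .outputMode("append") \
  .start()

debug.awaitTermination()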

The error is about the create_dt value, so I tried:

df.withColumn("create_dt", lit("collect_time"[0:7]))

but the error persists. How can I insert a value into a readStream DataFrame?

The create_dt value must be derived from collect_time[0:7].

For example: if the collect_time value is 2022-10-26 14:30:11.000, then create_dt must be 202210, and (sensor_id, create_dt, collect_time) is the primary key.
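
Note that in plain Python, "collect_time"[0:7] slices the literal string itself before Spark ever sees it, and even slicing the timestamp string keeps the dash. A quick check:

# What the attempted expression actually evaluates to in Python
print("collect_time"[0:7])               # collect  (a constant, the same for every row)
print("2022-10-26 14:30:11.000"[0:7])    # 2022-10  (not 202210; the dash must also go)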

A screenshot of sample rows successfully written to Cassandra was attached here.


Solution

  • Solved Code

    from pyspark.sql import SparkSession, SQLContext
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    
    # Schema for the Kafka JSON payload (create_dt removed; it is derived with withColumn below)
    schema = StructType() \
            .add("sensor_id", StringType(), True) \
            .add("collect_time", StringType(), True) \
            .add("avg", StringType(), True) \
            .add("data_type", StringType(), True) \
            .add("max", StringType(), True) \
            .add("min", StringType(), True)
    
    ###################################### Source Code ######################################
    
    # Spark Bridge local to spark_master
    spark = SparkSession.builder \
        .master(_connSession)\
        .appName("Spark_Streaming+kafka+cassandra") \
        .config('spark.cassandra.connection.host', _connCassandraHost) \
        .config('spark.cassandra.connection.port', _connCassandraPort) \
        .getOrCreate()
    
    # readStream from DATA_SOC on the DB server
    df = spark.readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", _connKafka) \
      .option('startingOffsets','earliest') \
      .option("subscribe", _topic) \
      .load() \
      .select(from_json(col("value").cast("String"), schema).alias("parsed_value")) \
      .select("parsed_value.*")
    
    # Derive create_dt from collect_time, e.g. 2022-10-26 14:30:11.000 -> 202210
    df = df.withColumn("create_dt", date_format(to_timestamp(col("collect_time")), "yyyyMM"))
    
    
    df.printSchema()
    
    # writeStream to Cassandra on the Spark master VM
    ds = df.writeStream \
      .trigger(processingTime='15 seconds') \
      .format("org.apache.spark.sql.cassandra") \
      .option("checkpointLocation","/home/jeju/jeju_sensor_collector/src/checkPoint") \
      .options(table=_table,keyspace=_keySpace) \
      .outputMode('append') \
      .start()
    
    ds.awaitTermination()
    

    I deleted .add("create_dt", StringType(), True) from the schema: the Kafka messages do not carry that field, so from_json filled it with null, which is what triggered the NullKeyColumnException; df.withColumn creates the column instead. At first I wrote only df.withColumn("create_dt", ...), but DataFrames are immutable, so the new column was never applied; it has to be reassigned with df = df.withColumn(...). Finally, lit("collect_time"[0:7]) slices the Python string literal (a constant "collect" for every row) rather than the column value, so the column expression date_format(to_timestamp(col("collect_time")), "yyyyMM") is used to derive 202210 from 2022-10-26 14:30:11.000.
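
    As a quick sanity check, the same derivation can be verified in batch mode on one hard-coded row matching the example above (a minimal sketch, separate from the streaming job):

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, to_timestamp, date_format

    spark = SparkSession.builder.master("local[1]").appName("create_dt_check").getOrCreate()

    # One sample row with the collect_time value from the question
    sample = spark.createDataFrame(
        [("sensor-01", "2022-10-26 14:30:11.000")],
        ["sensor_id", "collect_time"],
    )

    # Same derivation as in the streaming job: yyyyMM from the timestamp string
    sample = sample.withColumn(
        "create_dt",
        date_format(to_timestamp(col("collect_time")), "yyyyMM"),
    )

    sample.show(truncate=False)   # create_dt shows 202210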