from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
# DataFrame for kafka
schema = StructType() \
.add("sensor_id", StringType(), True) \
.add("create_dt", StringType(), True) \
.add("collect_time", StringType(), True) \
.add("avg", StringType(), True) \
.add("data_type", StringType(), True) \
.add("max", StringType(), True) \
.add("min", StringType(), True)
###################################### Source Code ######################################
# Spark Bridge local to spark_master
spark = SparkSession.builder \
.master(_connSession)\
.appName("Spark_Streaming+kafka+cassandra") \
.config('spark.cassandra.connection.host', _connCassandraHost) \
.config('spark.cassandra.connection.port', _connCassandraPort) \
.getOrCreate()
# readStream From DATA_SOC at DB Server
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", _connKafka) \
.option('startingOffsets','earliest') \
.option("subscribe", _topic) \
.load() \
.select(from_json(col("value").cast("String"), schema).alias("parsed_value")) \
.select("parsed_value.*")
df.withColumn("create_dt", lit("collect_time"[0:7]))
df.printSchema()
# writeStream at cassandra in Spark master VM
ds = df.writeStream \
.trigger(processingTime='15 seconds') \
.format("org.apache.spark.sql.cassandra") \
.option("checkpointLocation","/home/jeju/jeju_sensor_collector/src/checkPoint") \
.options(table=_table,keyspace=_keySpace) \
.outputMode('append') \
.start()
ds.awaitTermination()
root
|-- sensor_id: string (nullable = true)
|-- create_dt: string (nullable = true)
|-- collect_time: string (nullable = true)
|-- avg: string (nullable = true)
|-- data_type: string (nullable = true)
|-- max: string (nullable = true)
|-- min: string (nullable = true)
com.datastax.spark.connector.writer.NullKeyColumnException: Invalid null value for key column create_dt
create_dt value error so I tried
df.withColumn("create_dt", lit("collect_time"[0:7]))
but still error..... how can i insert value in readstream
create_dt value must be collect_time[0:7]
collect_time value is 2022-10-26 14:30:11.000. and create_dt mute be 202210 and sensor_id&create_dt,collect_time is primary_eky
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
# DataFrame for kafka
schema = StructType() \
.add("sensor_id", StringType(), True) \
.add("collect_time", StringType(), True) \
.add("avg", StringType(), True) \
.add("data_type", StringType(), True) \
.add("max", StringType(), True) \
.add("min", StringType(), True)
###################################### Source Code ######################################
# Spark Bridge local to spark_master
spark = SparkSession.builder \
.master(_connSession)\
.appName("Spark_Streaming+kafka+cassandra") \
.config('spark.cassandra.connection.host', _connCassandraHost) \
.config('spark.cassandra.connection.port', _connCassandraPort) \
.getOrCreate()
# readStream From DATA_SOC at DB Server
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", _connKafka) \
.option('startingOffsets','earliest') \
.option("subscribe", _topic) \
.load() \
.select(from_json(col("value").cast("String"), schema).alias("parsed_value")) \
.select("parsed_value.*")
df = df.withColumn("create_dt", lit("collect_time"[0:7]))
df.printSchema()
# writeStream at cassandra in Spark master VM
ds = df.writeStream \
.trigger(processingTime='15 seconds') \
.format("org.apache.spark.sql.cassandra") \
.option("checkpointLocation","/home/jeju/jeju_sensor_collector/src/checkPoint") \
.options(table=_table,keyspace=_keySpace) \
.outputMode('append') \
.start()
ds.awaitTermination()
I was delete .add("create_dt", StringType(), True)
cause df.withColumn
will make create_dt. and first time I only typing df.withColumn("create_dt", lit("collect_time"[0:7]))
but that is not apllied df. so change df = df.withColumn("create_dt", lit("collect_time"[0:7]))
finally i can apllied