I am reading a CSV file into a Dataset and then sending its rows to Kafka. The spark-submit job starts fine, but when the program writes to Kafka it throws an exception. Here is the exception:
FileStreamSource
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.spark.sql.AnalysisException: Required attribute 'value' not found;
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$6.apply(KafkaWriter.scala:72)
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$6.apply(KafkaWriter.scala:72)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.kafka010.KafkaWriter$.validateQuery(KafkaWriter.scala:71)
    at org.apache.spark.sql.kafka010.KafkaWriter$.write(KafkaWriter.scala:87)
    at org.apache.spark.sql.kafka010.KafkaSink.addBatch(KafkaSink.scala:38)
Below is my code:
System.setProperty("hadoop.home.dir", "C:\\hadoop-2.7.3\\");
Logger.getLogger("org").setLevel(Level.OFF);
Logger.getLogger("akka").setLevel(Level.OFF);
SparkSession spark = SparkSession
    .builder()
    .config("spark.sql.session.timeZone", "UTC")
    .config("spark.sql.streaming.checkpointLocation", "D:\\Workspac\\checkpoint")
    .appName("StructuredStreamingAverage")
    .master("local")
    .getOrCreate();
StructType userSchema = new StructType()
    .add("startdate", "string")
    .add("accountname", "string")
    .add("eventdate", "string")/*.add("u_lastlogin", "string")*//*.add("u_firstName", "string")*/;
Dataset<Row> dataset = spark
    .readStream()
    .option("header", true)
    .option("sep", ",")
    .schema(userSchema)
    .csv("D:\\Workspac\\sophos");
Dataset<Row> df_DateConverted = dataset.withColumn("eventdate", from_unixtime(col("eventdate").divide(1000)).cast(DataTypes.TimestampType));
if (df_DateConverted.isStreaming()) {
    try {
        df_DateConverted
            .select("CAST(key AS STRING) AS key", "to_json(struct(*)) AS value")
            .writeStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("topic", "rawEventTopic")
            .start().awaitTermination();
    } catch (StreamingQueryException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}
As the exception clearly says:
Caused by: org.apache.spark.sql.AnalysisException: Required attribute 'value' not found;
So maybe the issue is with the .select("CAST(key AS STRING) AS key", "to_json(struct(*)) AS value") call, but I don't know what I should write here. Thanks.
I also tried:
df_DateConverted
    .select(col("key").cast("string"), from_json(col("value").cast("string"), userSchema))
    .writeStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "rawEventTopic")
    .start().awaitTermination();
but then I get the exception below:
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`key`' given input columns: [startdate, accountname, eventdate];;
'Project [unresolvedalias(cast('key as string), None), jsontostructs(StructField(startdate,StringType,true), StructField(accountname,StringType,true), StructField(eventdate,StringType,true), cast('value as string), Some(UTC)) AS jsontostructs(CAST(value AS STRING))#10]
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5f3ddc86,csv,List(),Some(StructType(StructField(startdate,StringType,true), StructField(accountname,StringType,true), StructField(eventdate,StringType,true))),List(),None,Map(sep -> ,, header -> true, path -> D:\cybernetizWorkspace\sophos),None), FileSource[D:\cybernetizWorkspace\sophos], [startdate#0, accountname#1, eventdate#2]
The output of df_DateConverted.printSchema() is:
root
|-- startdate: string (nullable = true)
|-- accountname: string (nullable = true)
|-- eventdate: timestamp (nullable = true)
As you can see from the df_DateConverted schema, there is no key column, hence the error when calling col("key").cast("string"):
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`key`' given input columns: [startdate, accountname, eventdate];;
You can simply drop the key when writing to Kafka, because the key is optional there. The Spark documentation on Kafka integration lists key as an optional column.
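To make the rule concrete, here is a minimal plain-Java sketch of the kind of column check that produces the first exception. It is a simplified, hypothetical model (the class KafkaSinkColumns and its validate method are invented for illustration and are not Spark's code); the real check also looks at column types, which this sketch ignores.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Simplified, hypothetical model of the Kafka sink's column check; not Spark's implementation. */
final class KafkaSinkColumns {

    /** Returns a description of the problem, or an empty Optional if the columns are acceptable. */
    static Optional<String> validate(List<String> columns, boolean topicOptionSet) {
        // "value" is the only column that must always be present.
        if (!columns.contains("value")) {
            return Optional.of("Required attribute 'value' not found");
        }
        // A topic must come from either the "topic" option or a "topic" column.
        if (!topicOptionSet && !columns.contains("topic")) {
            return Optional.of("no 'topic' column and no topic option set");
        }
        // "key" is optional, so its absence is not an error.
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Columns of the CSV-based Dataset from the question: rejected, no "value".
        List<String> csvColumns = Arrays.asList("startdate", "accountname", "eventdate");
        System.out.println(validate(csvColumns, true));

        // A single "value" column with the topic option set: accepted.
        System.out.println(validate(Arrays.asList("value"), true));
    }
}
```

In this model the Dataset from the question fails only because value is missing; the absence of key is never reported.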
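In your original implementation, the call .select("CAST(key AS STRING) AS key", "to_json(struct(*)) AS value") is wrong: select with string arguments expects plain column names, not SQL expressions (those go through selectExpr), and that is why you are getting the error on value.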
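You should do something like:
df_DateConverted
    // col, struct and to_json are static imports from org.apache.spark.sql.functions.
    // Pack the three columns into a single JSON string column named "value".
    .select(to_json(struct(col("startdate"), col("accountname"), col("eventdate"))).alias("value"))
    .writeStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "rawEventTopic")
    .start().awaitTermination();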
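If you would rather keep SQL expression strings like the ones in the question, they have to go through selectExpr, which parses SQL expressions, whereas select with string arguments only takes column names. The sketch below only builds the expression strings, so it runs without Spark on the classpath. The helper KafkaSelectExprs is hypothetical, choosing accountname as the key is purely an assumption for illustration, and it presumes a Spark version in which to_json is available as a SQL function.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that builds SQL expression strings intended for Dataset.selectExpr. */
final class KafkaSelectExprs {

    /**
     * @param keyColumn existing column to use as the Kafka key, or null to send no key
     * @return expressions yielding an optional "key" column and a JSON "value" column
     */
    static String[] build(String keyColumn) {
        List<String> exprs = new ArrayList<>();
        if (keyColumn != null) {
            // The key must refer to a column that actually exists in the Dataset.
            exprs.add("CAST(" + keyColumn + " AS STRING) AS key");
        }
        // Serialize every column of the row into one JSON string named "value".
        exprs.add("to_json(struct(*)) AS value");
        return exprs.toArray(new String[0]);
    }

    public static void main(String[] args) {
        for (String expr : build("accountname")) {
            System.out.println(expr);
        }
    }
}
```

With Spark available, the result would be passed as df_DateConverted.selectExpr(KafkaSelectExprs.build("accountname")) before writeStream(); passing null instead of a column name produces only the value column.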