I am doing a small task of reading access_logs file using a kafka topic, then i count the status and send the count of Status to another kafka topic. But i keep getting errors like, while i use no output mode or append mode:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
When using complete mode:
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: requirement failed: KafkaTable does not support Complete mode.
This is my code: structuredStreaming.scala
package com.spark.sparkstreaming
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.log4j._
import org.apache.spark.sql.functions._
import java.util.regex.Pattern
import java.util.regex.Matcher
import java.text.SimpleDateFormat
import java.util.Locale
import Utilities._
object structuredStreaming {
case class LogEntry(ip:String, client:String, user:String, dateTime:String, request:String, status:String, bytes:String, referer:String, agent:String)
val logPattern = apacheLogPattern()
val datePattern = Pattern.compile("\\[(.*?) .+]")
def parseDateField(field: String): Option[String] = {
val dateMatcher = datePattern.matcher(field)
if (dateMatcher.find) {
val dateString = dateMatcher.group(1)
val dateFormat = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH)
val date = (dateFormat.parse(dateString))
val timestamp = new java.sql.Timestamp(date.getTime());
return Option(timestamp.toString())
} else {
None
}
}
def parseLog(x:Row) : Option[LogEntry] = {
val matcher:Matcher = logPattern.matcher(x.getString(0));
if (matcher.matches()) {
val timeString = matcher.group(4)
return Some(LogEntry(
matcher.group(1),
matcher.group(2),
matcher.group(3),
parseDateField(matcher.group(4)).getOrElse(""),
matcher.group(5),
matcher.group(6),
matcher.group(7),
matcher.group(8),
matcher.group(9)
))
} else {
return None
}
}
def main(args: Array[String]) {
val spark = SparkSession
.builder
.appName("StructuredStreaming")
.master("local[*]")
.config("spark.sql.streaming.checkpointLocation", "/home/UDHAV.MAHATA/Documents/Checkpoints")
.getOrCreate()
setupLogging()
// val rawData = spark.readStream.text("/home/UDHAV.MAHATA/Documents/Spark/logs")
val rawData = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "testing")
.load()
import spark.implicits._
val structuredData = rawData.flatMap(parseLog).select("status")
val windowed = structuredData.groupBy($"status").count()
//val query = windowed.writeStream.outputMode("complete").format("console").start()
val query = windowed
.writeStream
.outputMode("complete")
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "sink")
.start()
query.awaitTermination()
spark.stop()
}
}
Utilities.scala
package com.spark.sparkstreaming
import org.apache.log4j.Level
import java.util.regex.Pattern
import java.util.regex.Matcher
object Utilities {
def setupLogging() = {
import org.apache.log4j.{Level, Logger}
val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.ERROR)
}
def apacheLogPattern():Pattern = {
val ddd = "\\d{1,3}"
val ip = s"($ddd\\.$ddd\\.$ddd\\.$ddd)?"
val client = "(\\S+)"
val user = "(\\S+)"
val dateTime = "(\\[.+?\\])"
val request = "\"(.*?)\""
val status = "(\\d{3})"
val bytes = "(\\S+)"
val referer = "\"(.*?)\""
val agent = "\"(.*?)\""
val regex = s"$ip $client $user $dateTime $request $status $bytes $referer $agent"
Pattern.compile(regex)
}
}
Can anyone help me with where am i doing mistake?
As the error message is suggesting, you need to add a watermark to your grouping.
Replace this line
val windowed = structuredData.groupBy($"status").count()
with
import org.apache.spark.sql.functions.{window, col}
val windowed = structuredData.groupBy(window(col("dateTime"), "10 minutes"), "status").count()
It is important that the column dateTime
is of type timestamp
which you parse from the Kafka source anyway if I understood your code correctly.
Without the window, Spark will not know how much data to be aggregated.