No output to Kafka topic: Spark Structured Streaming and Kafka Integration


I am trying to send streaming output from Apache Spark 2.3.1 to Apache Kafka using the Kafka sink:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.udf
import org.apache.kafka.clients
import org.apache.spark.streaming
import java.sql.Timestamp
import java.util.Properties

object CQ3D {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder
      .appName("test")
      .getOrCreate()

    // Schema of the incoming CSV files
    val predictionStreamSchema = new StructType()
      .add("production_id", "long")
      .add("type", "string")

    // Stream CSV files as they appear in the source directory
    val lines = spark
      .readStream
      .option("sep", ",")
      .schema(predictionStreamSchema)
      .csv("/path/to/directory/")

    // The Kafka sink expects a "value" column and an optional "key" column
    val query = lines
      .selectExpr("CAST(production_id AS STRING) AS key", "type AS value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "test")
      .option("checkpointLocation", "/local/directory")
      .outputMode("complete")
      .start()

    query.awaitTermination()
  }
}

My build.sbt file looks like:

name := "CQ3D"
version := "0.1"
scalaVersion := "2.11.8"
val sparkVersion = "2.3.1"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion,
"org.apache.spark" %% "spark-streaming" % sparkVersion,
"org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
)

libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion

My code gives the right output with the console sink, but with the Kafka sink nothing is generated or sent to the Kafka topic. ZooKeeper and the Kafka server are running on the same machine. The console messages are as follows:

./spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 --class CQ3D --master local[4] /home/salman/Development/SparkStreaming/Scala/target/scala-2.11/cq3d_2.11-0.1.jar
Ivy Default Cache set to: /home/salman/.ivy2/cache
The jars for the packages stored in: /home/salman/.ivy2/jars
:: loading settings :: url = jar:file:/home/salman/spark-2.3.1-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-18e5a4df-cae8-4cf2-92bb-e02af7673888;1.0
    confs: [default]
    found org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.1 in spark-list
    found org.apache.kafka#kafka-clients;0.10.0.1 in spark-list
    found net.jpountz.lz4#lz4;1.3.0 in spark-list
    found org.xerial.snappy#snappy-java;1.1.2.6 in spark-list
    found org.slf4j#slf4j-api;1.7.21 in central
    found org.spark-project.spark#unused;1.0.0 in spark-list
:: resolution report :: resolve 247ms :: artifacts dl 4ms
    :: modules in use:
    net.jpountz.lz4#lz4;1.3.0 from spark-list in [default]
    org.apache.kafka#kafka-clients;0.10.0.1 from spark-list in [default]
    org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.1 from spark-list in [default]
    org.slf4j#slf4j-api;1.7.21 from central in [default]
    org.spark-project.spark#unused;1.0.0 from spark-list in [default]
    org.xerial.snappy#snappy-java;1.1.2.6 from spark-list in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   6   |   0   |   0   |   0   ||   6   |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-18e5a4df-cae8-4cf2-92bb-e02af7673888
    confs: [default]
    0 artifacts copied, 6 already retrieved (0kB/5ms)
2018-09-14 20:14:58 WARN  Utils:66 - Your hostname, salman-ubuntu-desktop resolves to a loopback address: 127.0.1.1; using 150.82.219.122 instead (on interface enp4s0)
2018-09-14 20:14:58 WARN  Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address
2018-09-14 20:14:59 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2018-09-14 20:14:59 INFO  SparkContext:54 - Running Spark version 2.3.1
2018-09-14 20:14:59 INFO  SparkContext:54 - Submitted application: CQ3D
2018-09-14 20:14:59 INFO  SecurityManager:54 - Changing view acls to: salman
2018-09-14 20:14:59 INFO  SecurityManager:54 - Changing modify acls to: salman
2018-09-14 20:14:59 INFO  SecurityManager:54 - Changing view acls groups to: 
2018-09-14 20:14:59 INFO  SecurityManager:54 - Changing modify acls groups to: 
2018-09-14 20:14:59 INFO  SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(salman); groups with view permissions: Set(); users  with modify permissions: Set(salman); groups with modify permissions: Set()
2018-09-14 20:14:59 INFO  Utils:54 - Successfully started service 'sparkDriver' on port 36805.

Am I using the correct import and/or libraryDependencies?

I sometimes get the following warnings while compiling:

[warn] There may be incompatibilities among your library dependencies.
[warn] Run 'evicted' to see detailed eviction warnings

However, the code still compiles using "sbt package". When I execute the code using the following command, I do not get any output in the Kafka topic:

./spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 --class testClass --master local[4] /home/user/Dev/Scala/target/scala-2.11/testClass_2.11-0.1.jar

Solution

  • The Spark documentation states that, when streaming from a directory on the local filesystem, files must be atomically moved into the source folder. There might be a configuration option to read files that already exist there, but I cannot recall it.

    In the comments, I mentioned Kafka Connect, which is a built-in framework for transferring data into Kafka; you just need to build the linked project and run Kafka Connect.

    Otherwise, the tools I suggest to others for getting files into Kafka are Flume if you're already using Hadoop, or Filebeat / Fluentd if you have Elasticsearch. Basically, Spark is too much overhead for such a simple program: reading each file from the local filesystem does not require any parallelism.
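
To illustrate the point about atomically moving files, here is a minimal sketch of how a producer of CSV files could do it from Scala using only java.nio.file. The object name AtomicDrop, the method publish, and the staging directory are hypothetical names introduced for this example; they are not part of Spark or of the original question. The idea is to write the file somewhere the stream does not watch and then move it into the watched directory in a single step.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

// Hypothetical helper, not part of Spark: it only shows the write-then-move pattern.
object AtomicDrop {
  // Write the file into a staging directory first, then move it into the
  // watched directory in one step, so a reader never sees a half-written file.
  // Assumption: both directories are on the same filesystem; otherwise
  // ATOMIC_MOVE fails with AtomicMoveNotSupportedException.
  def publish(stagingDir: Path, watchedDir: Path, fileName: String, content: String): Path = {
    val staged = stagingDir.resolve(fileName)
    Files.write(staged, content.getBytes(StandardCharsets.UTF_8))
    // Returns the path of the file at its final location.
    Files.move(staged, watchedDir.resolve(fileName), StandardCopyOption.ATOMIC_MOVE)
  }
}
```

For example, AtomicDrop.publish(Paths.get("/tmp/staging"), Paths.get("/path/to/directory"), "batch-001.csv", "1,typeA\n") would drop one file into the directory the streaming query reads from (the staging path and file name are made up, and Paths needs to be imported from java.nio.file). If a file with the same name already exists in the target directory, the behavior of ATOMIC_MOVE depends on the platform, so unique file names are the safer choice.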