I am trying to send streaming output from Apache Spark 2.3.1 to Apache Kafka using the Kafka sink:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.udf
import org.apache.kafka.clients
import org.apache.spark.streaming
import java.sql.Timestamp
import java.util.Properties

object CQ3D {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder
      .appName("test")
      .getOrCreate()

    // Schema for the incoming CSV files
    val predictionStreamSchema = new StructType()
      .add("production_id", "long")
      .add("type", "string")

    val lines = spark
      .readStream
      .option("sep", ",")
      .schema(predictionStreamSchema)
      .csv("/path/to/directory/")

    val query = lines
      .selectExpr("CAST(production_id AS STRING) AS key", "type AS value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "test")
      .option("checkpointLocation", "/local/directory")
      .outputMode("complete")
      .start()

    query.awaitTermination()
  }
}
My build.sbt file looks like:
name := "CQ3D"
version := "0.1"
scalaVersion := "2.11.8"
val sparkVersion = "2.3.1"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
)
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion
My code gives the right output with the console sink, but no output is generated or sent to the Kafka topic when using the Kafka sink. My ZooKeeper and Kafka server are running on the same machine. For reference, the console-sink variant that works is sketched below.
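(A minimal sketch of the debugging version; only the sink changes, and I am assuming append output mode and the default console options here:)

// Console sink used for debugging -- prints each micro-batch to stdout
val debugQuery = lines
  .selectExpr("CAST(production_id AS STRING) AS key", "type AS value")
  .writeStream
  .format("console")
  .outputMode("append")           // mode assumed for the console run
  .option("truncate", "false")    // show full column values
  .start()

debugQuery.awaitTermination()

The console messages when I submit the Kafka-sink version are as follows: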
./spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 --class CQ3D --master local[4] /home/salman/Development/SparkStreaming/Scala/target/scala-2.11/cq3d_2.11-0.1.jar
Ivy Default Cache set to: /home/salman/.ivy2/cache
The jars for the packages stored in: /home/salman/.ivy2/jars
:: loading settings :: url = jar:file:/home/salman/spark-2.3.1-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-18e5a4df-cae8-4cf2-92bb-e02af7673888;1.0
confs: [default]
found org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.1 in spark-list
found org.apache.kafka#kafka-clients;0.10.0.1 in spark-list
found net.jpountz.lz4#lz4;1.3.0 in spark-list
found org.xerial.snappy#snappy-java;1.1.2.6 in spark-list
found org.slf4j#slf4j-api;1.7.21 in central
found org.spark-project.spark#unused;1.0.0 in spark-list
:: resolution report :: resolve 247ms :: artifacts dl 4ms
:: modules in use:
net.jpountz.lz4#lz4;1.3.0 from spark-list in [default]
org.apache.kafka#kafka-clients;0.10.0.1 from spark-list in [default]
org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.1 from spark-list in [default]
org.slf4j#slf4j-api;1.7.21 from central in [default]
org.spark-project.spark#unused;1.0.0 from spark-list in [default]
org.xerial.snappy#snappy-java;1.1.2.6 from spark-list in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 6 | 0 | 0 | 0 || 6 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-18e5a4df-cae8-4cf2-92bb-e02af7673888
confs: [default]
0 artifacts copied, 6 already retrieved (0kB/5ms)
2018-09-14 20:14:58 WARN Utils:66 - Your hostname, salman-ubuntu-desktop resolves to a loopback address: 127.0.1.1; using 150.82.219.122 instead (on interface enp4s0)
2018-09-14 20:14:58 WARN Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address
2018-09-14 20:14:59 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2018-09-14 20:14:59 INFO SparkContext:54 - Running Spark version 2.3.1
2018-09-14 20:14:59 INFO SparkContext:54 - Submitted application: CQ3D
2018-09-14 20:14:59 INFO SecurityManager:54 - Changing view acls to: salman
2018-09-14 20:14:59 INFO SecurityManager:54 - Changing modify acls to: salman
2018-09-14 20:14:59 INFO SecurityManager:54 - Changing view acls groups to:
2018-09-14 20:14:59 INFO SecurityManager:54 - Changing modify acls groups to:
2018-09-14 20:14:59 INFO SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(salman); groups with view permissions: Set(); users with modify permissions: Set(salman); groups with modify permissions: Set()
2018-09-14 20:14:59 INFO Utils:54 - Successfully started service 'sparkDriver' on port 36805.
Am I using the correct imports and/or libraryDependencies? Sometimes while compiling I get the following warnings:
[warn] There may be incompatibilities among your library dependencies.
[warn] Run 'evicted' to see detailed eviction warnings
However, the code still compiles using "sbt package". When I execute it with the following command, I get no output in the Kafka topic:
./spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 --class testClass --master local[4] /home/user/Dev/Scala/target/scala-2.11/testClass_2.11-0.1.jar
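(The topic can be checked with the console consumer that ships with Kafka, for example:)

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning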
In the Spark documentation, it is mentioned that for streaming from the local filesystem, files must be atomically moved into the source directory (for example, written elsewhere and then moved in), so that Spark never picks up a partially written file. There may also be an option for reading files that already exist in the directory, but I cannot recall it.
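A minimal sketch of such an atomic move using java.nio (the paths are hypothetical placeholders):

import java.nio.file.{Files, Paths, StandardCopyOption}

// Write the file outside the watched directory first, then move it
// into place in one atomic step so the stream never sees a partial file.
Files.move(
  Paths.get("/tmp/staging/data.csv"),       // staging location (placeholder)
  Paths.get("/path/to/directory/data.csv"), // directory watched by readStream
  StandardCopyOption.ATOMIC_MOVE
)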
In the comments, I mentioned Kafka Connect, which is a built-in framework for transferring data to Kafka; you just need to build the linked project and run Kafka Connect.
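As a sketch, a standalone file-source connector config looks roughly like the example that ships with Kafka (the file and topic values here are placeholders matching the question):

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/path/to/directory/data.csv
topic=test

and is started with something like bin/connect-standalone.sh config/connect-standalone.properties connect-file-source.properties.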
Otherwise, tools I suggest are Flume if you're already using Hadoop, or Filebeat / Fluentd if you have Elasticsearch, for shipping files into Kafka. Basically, Spark is too much overhead for such a simple program: reading individual files from the local filesystem does not require any parallelism.