I'm trying to stream data using Apache Kafka and Spark, but I get an error on line 24 of my code: Cannot resolve method "createStream" in "KafkaUtils". I also tried KafkaUtils.createDirectStream, but that didn't work either. How do I resolve this error? Below are the files I am using.
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import scala.Tuple2;

public class NumbersConsumer {
    public static final String BOOTSTRAP_SERVER = "localhost:9092";
    public static final String GROUP_ID = "spark-group";
    public static final Map<String, Integer> TOPICS = new HashMap<>();
    public static final int BATCH_DURATION = 5000;

    public static void main(String[] args) throws Exception {
        TOPICS.put("Demo1", 1);
        SparkConf sparkConf = new SparkConf().setAppName("demo");
        JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, new Duration(BATCH_DURATION));
        JavaPairDStream<String, String> messages = KafkaUtils.createStream(streamingContext, BOOTSTRAP_SERVER, GROUP_ID, TOPICS);
        JavaPairDStream<String, String> percentage = messages.mapToPair(
            num -> new Tuple2<>("Percentage: ", num._2 + "%")
        );
        percentage.print();
        streamingContext.start();
        streamingContext.awaitTermination();
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>testVideoKafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.13</artifactId>
            <version>3.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.13</artifactId>
            <version>3.3.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.13</artifactId>
            <version>3.3.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.3.1</version>
        </dependency>
    </dependencies>
    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
</project>
You need to remove the provided scope from the spark-streaming-kafka-0-10 dependency. You should add it to the plain spark-streaming dependency instead.
Note from the docs: "Do not manually add dependencies on org.apache.kafka artifacts (e.g. kafka-clients)." So remove that dependency as well.
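createDirectStream is correct: the KafkaUtils class in the kafka010 package has no createStream method, and createDirectStream takes different arguments (a location strategy and a consumer strategy rather than a server, group ID, and topic map). See https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html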
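As a rough sketch of what the createDirectStream version of the question's consumer might look like, following the pattern in the linked integration guide: the class name NumbersConsumerDirect and the kafkaParams() helper are made up for illustration, and the auto.offset.reset and enable.auto.commit values are assumptions you should adjust to your setup. It also assumes the master URL is supplied by spark-submit, as in the original code.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

public class NumbersConsumerDirect {
    public static final String BOOTSTRAP_SERVER = "localhost:9092";
    public static final String GROUP_ID = "spark-group";
    public static final int BATCH_DURATION = 5000;

    // Hypothetical helper: Kafka consumer settings passed to the consumer strategy.
    static Map<String, Object> kafkaParams() {
        Map<String, Object> params = new HashMap<>();
        params.put("bootstrap.servers", BOOTSTRAP_SERVER);
        params.put("key.deserializer", StringDeserializer.class);
        params.put("value.deserializer", StringDeserializer.class);
        params.put("group.id", GROUP_ID);
        // Assumed values, taken from the style of the integration guide.
        params.put("auto.offset.reset", "latest");
        params.put("enable.auto.commit", false);
        return params;
    }

    public static void main(String[] args) throws Exception {
        // No master is set here; it is expected to come from spark-submit.
        SparkConf sparkConf = new SparkConf().setAppName("demo");
        JavaStreamingContext streamingContext =
            new JavaStreamingContext(sparkConf, new Duration(BATCH_DURATION));

        // The 0-10 API takes a plain collection of topic names.
        Collection<String> topics = Arrays.asList("Demo1");

        JavaInputDStream<ConsumerRecord<String, String>> messages =
            KafkaUtils.createDirectStream(
                streamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams()));

        // Each element is a ConsumerRecord, so use value() instead of Tuple2._2.
        JavaPairDStream<String, String> percentage = messages.mapToPair(
            record -> new Tuple2<>("Percentage: ", record.value() + "%"));

        percentage.print();
        streamingContext.start();
        streamingContext.awaitTermination();
    }
}
```

The StringDeserializer and ConsumerRecord imports come from the Kafka client that spark-streaming-kafka-0-10 pulls in transitively, which is why the explicit kafka-clients dependency is not needed.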
Also, the Spark Streaming module for Kafka is largely considered deprecated, with Structured Streaming suggested instead (spark-sql-kafka-0-10).
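For comparison, here is a minimal sketch of the same consumer written with Structured Streaming. It assumes you swap the streaming dependencies for spark-sql and spark-sql-kafka-0-10 (matching your Scala and Spark versions); the class name NumbersConsumerStructured, the kafkaOptions() helper, and the console sink are illustrative choices, not something from the question.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class NumbersConsumerStructured {
    // Hypothetical helper: options for the "kafka" source.
    static Map<String, String> kafkaOptions() {
        Map<String, String> options = new HashMap<>();
        options.put("kafka.bootstrap.servers", "localhost:9092");
        options.put("subscribe", "Demo1");
        return options;
    }

    public static void main(String[] args) throws Exception {
        // No master is set here; it is expected to come from spark-submit.
        SparkSession spark = SparkSession.builder().appName("demo").getOrCreate();

        // Each row from the Kafka source has binary key and value columns.
        Dataset<Row> messages = spark.readStream()
            .format("kafka")
            .options(kafkaOptions())
            .load();

        // Decode the value and append "%", mirroring the original mapToPair step.
        Dataset<Row> percentage = messages.selectExpr(
            "concat(CAST(value AS STRING), '%') AS percentage");

        // Print each micro-batch to the console, similar to DStream.print().
        StreamingQuery query = percentage.writeStream()
            .format("console")
            .outputMode("append")
            .start();

        query.awaitTermination();
    }
}
```

There is no batch duration in this version; by default, Structured Streaming starts the next micro-batch as soon as the previous one finishes, and a trigger can be set on the writer if you want a fixed interval.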