java, apache-spark, apache-kafka, spark-structured-streaming

How can I check whether I am receiving data from Kafka in Spark Structured Streaming with Java?


I'm trying to get data from Kafka into Spark Structured Streaming, but I can't tell whether it is working or not. I want to print the data from Kafka to the console, but nothing appears there. It may be because of the large volume of data coming from Kafka, but I have no idea.

I am using Windows 10. I confirmed that the connection to Kafka is established with "netstat -an | findstr TARGET_IP", where TARGET_IP is the Kafka producer's IP. Using the PID from that output, I ran "tasklist /FI "PID eq 5406"". PID 5406 belongs to java.exe, and the memory used by that process is continuously increasing.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;

public static void main( String[] args ) {
    SparkSession spark = SparkSession.builder()
            .master("local")
            .appName("App").getOrCreate();

    // Streaming read from Kafka; key and value arrive as binary columns.
    Dataset<Row> df = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "TARGET_IP:TARGET_PORT")
            .option("subscribe", "TARGET_TOPIC")
            .option("startingOffsets", "earliest")
            .load();
    df.printSchema();

    // Write each micro-batch (triggered every second) to the console sink.
    StreamingQuery queryone = df.writeStream()
            .trigger(Trigger.ProcessingTime(1000))
            .format("console")
            .start();
    try {
        queryone.awaitTermination();
    } catch (StreamingQueryException e) {
        e.printStackTrace();
    }
}
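
One thing worth noting about the code above: the Kafka source exposes key and value as binary, so even when rows do arrive, the console output is hard to read. A sketch of the same query with the columns cast to strings (it reuses the question's TARGET_IP:TARGET_PORT and TARGET_TOPIC placeholders, and needs a running Kafka broker to actually print anything):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;

public class KafkaConsolePreview {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("KafkaConsolePreview")
                .getOrCreate();

        Dataset<Row> df = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "TARGET_IP:TARGET_PORT") // placeholder from the question
                .option("subscribe", "TARGET_TOPIC")                        // placeholder from the question
                .option("startingOffsets", "earliest")
                .load();

        // Cast the binary key/value columns to STRING so the console
        // sink prints readable text instead of raw bytes.
        Dataset<Row> decoded = df.selectExpr(
                "CAST(key AS STRING)", "CAST(value AS STRING)",
                "topic", "partition", "offset");

        StreamingQuery query = decoded.writeStream()
                .trigger(Trigger.ProcessingTime(1000))
                .format("console")
                .option("truncate", false) // don't cut off long message bodies
                .start();
        query.awaitTermination();
    }
}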

Solution

  • I tested your code, and it does print.

    First, check your Kafka topic and make sure there are messages in it.

    Then check your Spark app and make sure it can connect to your Kafka broker.
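
To verify both points at once, a one-shot batch read against the topic (read() instead of readStream()) will tell you whether the broker is reachable and whether there is anything to consume. This is a sketch using the same TARGET_IP:TARGET_PORT and TARGET_TOPIC placeholders as the question:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class KafkaTopicCheck {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("KafkaTopicCheck")
                .getOrCreate();

        // read() (not readStream()) performs a bounded batch scan of the topic.
        Dataset<Row> batch = spark.read()
                .format("kafka")
                .option("kafka.bootstrap.servers", "TARGET_IP:TARGET_PORT") // placeholder
                .option("subscribe", "TARGET_TOPIC")                        // placeholder
                .option("startingOffsets", "earliest")
                .option("endingOffsets", "latest")
                .load();

        long count = batch.count();
        System.out.println("Messages currently in topic: " + count);
        // count == 0 means the producer side is the problem, not your Spark job.
        spark.stop();
    }
}

If this fails with a connection error, the broker address is wrong or unreachable; if it succeeds with a count of zero, the streaming query has nothing to print yet.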