java, apache-kafka, apache-kafka-streams, slf4j

My Kafka Streams application just exits with code 0, doing nothing


In order to try Kafka Streams, I did this:

public static void main(String[] args) {

        final StreamsBuilder builder = new StreamsBuilder();

        final Properties streamsConfiguration = new Properties();

        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "generic-avro-integration-test");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.BOOTSTRAP_SERVER);
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringDeserializer.class);
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, KafkaAvroDeserializer.class);
        streamsConfiguration.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, Utils.SCHEMA_REGISTRY_URL);
        
        builder.stream(Utils.ALL_FX_EVENTS_TOPIC).foreach((key, value) -> System.out.println(key));
        
        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
        kafkaStreams.start();
    }

But when I run it locally I only get this:

[screenshot: the run ends immediately with "Process finished with exit code 0"]

Basically I run it from my IDE, and after about a second it just stops, when it should be waiting for new events pushed to the topic.

I don't understand.

The Kafka topic is on another machine, but I also coded a very simple consumer and was able to read the messages from this remote topic.
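
That consumer was essentially the standard poll loop, along these lines (a sketch reconstructed from memory; the group id is just a placeholder, the other constants come from the same Utils class):

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.BOOTSTRAP_SERVER);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "simple-consumer-test");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, Utils.SCHEMA_REGISTRY_URL);

try (KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList(Utils.ALL_FX_EVENTS_TOPIC));
    while (true) {
        // poll blocks for up to a second; this happily printed the remote messages
        ConsumerRecords<String, Object> records = consumer.poll(Duration.ofSeconds(1));
        records.forEach(r -> System.out.println(r.key() + " -> " + r.value()));
    }
}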

For some reason this very simple Kafka Streams app exits with code 0. There is not much to go on. Any idea?

Since the issue seems to be related to the slf4j dependencies, here is the pom:

<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<name>Ingestor :: Bigdata :: Ingestor</name>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>

<parent>
    <groupId>com.parent-pom</groupId>
    <artifactId>parent-pom</artifactId>
    <version>4.0.2</version>
</parent>

<groupId>com</groupId>
<artifactId>ingestor</artifactId>
<version>1.0.1-SNAPSHOT</version>


<properties>
    
    <sq.artifact.type>internal</sq.artifact.type>
    <maven-compiler-plugin.version>3.1</maven-compiler-plugin.version>
    <maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version>
    <maven.compiler.source>7</maven.compiler.source>
    <maven.compiler.target>7</maven.compiler.target>
    <revision>1.0.0-SNAPSHOT</revision>
    <sq.scs>fx-dan</sq.scs>
</properties>

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>3.1.0</version>
    </dependency>
    <dependency>
        <groupId>com</groupId>
        <artifactId>libs-schemas</artifactId>
        <version>1.0.6-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>7.0.1</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-schema-registry-client</artifactId>
        <version>7.0.1</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>common-config</artifactId>
        <version>7.0.1</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>common-utils</artifactId>
        <version>7.0.1</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-schema-serializer</artifactId>
        <version>7.0.1</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>31.0.1-jre</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>
</dependencies>

Update:

Besides the missing log4j properties file, the error was due to a misconfiguration of the Serdes.

The updated code is shown in the Solution section below.

I don't have the exception stack anymore, but this link helped me fix it:

proper guide for java kafka stream with avro schema registry


Solution

  • I fixed my issues; here is a summary.

    The first issue was that I hadn't set up slf4j properly. Kafka expects slf4j to be available, because that is what it uses to print the errors it encounters. Without it, my program was failing silently.

    To resolve this, you need to add a log4j.properties file to your resources folder (src/main/resources in a Maven project).

    Mine looked like this (it doesn't print DEBUG logs):

    log4j.rootLogger=INFO, stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
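
    A quick way to check that the binding and the properties file are picked up (a minimal sketch; the class name is just an example):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingSmokeTest {
        private static final Logger log = LoggerFactory.getLogger(LoggingSmokeTest.class);

        public static void main(String[] args) {
            // With slf4j-log4j12 and log4j.properties on the classpath, this prints
            // a line formatted by the ConversionPattern above. Without a binding,
            // slf4j warns "Failed to load class org.slf4j.impl.StaticLoggerBinder"
            // and everything else stays silent.
            log.info("slf4j + log4j binding is working");
        }
    }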
    

    Once I had the logs, I got a stack trace about a serialization error. I don't have the logs anymore, but the other SO post that led me to the fix is:

    Use Kafka Streams with Avro Schema Registry

    Basically, I was using the wrong kind of object for the Serde configuration.
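
    The point is that DEFAULT_KEY_SERDE_CLASS_CONFIG and DEFAULT_VALUE_SERDE_CLASS_CONFIG expect classes implementing org.apache.kafka.common.serialization.Serde, not plain Deserializers. Side by side:

    // Wrong (my original code): KafkaAvroDeserializer is only a Deserializer
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, KafkaAvroDeserializer.class);

    // Right: SpecificAvroSerde implements Serde (it bundles serializer and deserializer)
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);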

    The updated code looks like this:

    public static void main(String[] args) {

        final StreamsBuilder builder = new StreamsBuilder();

        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "generic-avro-integration-test");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.BOOTSTRAP_SERVER);
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
        streamsConfiguration.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, Utils.SCHEMA_REGISTRY_URL);

        final Serde<String> stringSerde = Serdes.String();
        final Serde<AllTypesFxEvents> specificAvroSerde = new SpecificAvroSerde<>();

        // isKeySerde = false configures the serde for record values; it also needs
        // the schema registry URL so it can fetch the writer schema.
        final boolean isKeySerde = false;
        specificAvroSerde.configure(
                Collections.singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, Utils.SCHEMA_REGISTRY_URL),
                isKeySerde);

        builder.stream(Utils.ALL_FX_EVENTS_TOPIC).foreach((key, value) -> System.out.println(key));

        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
        kafkaStreams.cleanUp();  // wipes local state; fine for a test, not for production
        kafkaStreams.start();
    }
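
    One more remark: as written above, the configured specificAvroSerde is never handed to the topology; the stream works only because the default serde config covers it. A variant (a sketch, untested against my setup; main has to declare throws Exception because of the latch) passes the serdes explicitly via Consumed.with and blocks the main thread the way the official Kafka Streams examples do:

    builder.stream(Utils.ALL_FX_EVENTS_TOPIC, Consumed.with(stringSerde, specificAvroSerde))
            .foreach((key, value) -> System.out.println(key));

    KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
    final CountDownLatch latch = new CountDownLatch(1);

    // Close the streams app cleanly on Ctrl+C / SIGTERM and release the latch.
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        kafkaStreams.close();
        latch.countDown();
    }));

    kafkaStreams.cleanUp();
    kafkaStreams.start();
    latch.await(); // keep main blocked until the shutdown hook fires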