Tags: docker, apache-kafka, apache-kafka-streams

Kafka Streams application doesn't start up


I can't start my Kafka Streams application. It ran fine when it was pointed at Confluent Cloud, but after switching to Kafka running locally on Docker it no longer starts.

docker-compose:

# https://docs.confluent.io/current/installation/docker/config-reference.html
# https://github.com/confluentinc/cp-docker-images

version: "3"

services:

  zookeeper:
    container_name: local-zookeeper
    image: confluentinc/cp-zookeeper:5.5.1
    ports:
      - 2181:2181
    hostname: zookeeper
    networks:
      - local_kafka_network
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181

  kafka:
    container_name: local-kafka
    image: confluentinc/cp-kafka:5.5.1
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
      - 29092:29092
    hostname: kafka
    networks:
      - local_kafka_network
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1

  schema-registry:
    container_name: local-schema-registry
    image: confluentinc/cp-schema-registry:5.5.1
    depends_on:
      - kafka
    ports:
      - 8081:8081
    hostname: schema-registry
    networks:
      - local_kafka_network
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
      - SCHEMA_REGISTRY_DEBUG=true
    command:
      - /bin/bash
      - -c
      - |
        # install jq
        curl -sL https://github.com/stedolan/jq/releases/download/jq-1.6/jq-linux64 -o /usr/local/bin/jq && chmod u+x /usr/local/bin/jq
        # start
        /etc/confluent/docker/run

  schema-registry-ui:
    container_name: local-schema-registry-ui
    image: landoop/schema-registry-ui:latest
    depends_on:
      - schema-registry
    ports:
      - 8001:8000
    hostname: schema-registry-ui
    networks:
      - local_kafka_network
    environment:
      - SCHEMAREGISTRY_URL=http://schema-registry:8081
      - PROXY=true

  kafka-rest:
    container_name: local-kafka-rest
    image: confluentinc/cp-kafka-rest:5.5.1
    depends_on:
      - kafka
      - schema-registry
    ports:
      - 8082:8082
    hostname: kafka-rest
    networks:
      - local_kafka_network
    environment:
      - KAFKA_REST_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_REST_LISTENERS=http://kafka-rest:8082
      - KAFKA_REST_SCHEMA_REGISTRY_URL=http://schema-registry:8081
      - KAFKA_REST_HOST_NAME=kafka-rest

  kafka-ui:
    container_name: local-kafka-ui
    image: landoop/kafka-topics-ui:latest
    depends_on:
      - kafka-rest
    ports:
      - 8000:8000
    hostname: kafka-ui
    networks:
      - local_kafka_network
    environment:
      - KAFKA_REST_PROXY_URL=http://kafka-rest:8082
      - PROXY=true

  # https://github.com/confluentinc/ksql/blob/4.1.3-post/docs/tutorials/docker-compose.yml#L85
  ksql-server:
    container_name: local-ksql-server
    # TODO update 5.5.1
    image: confluentinc/cp-ksql-server:5.4.2
    depends_on:
      - kafka
      - schema-registry
    ports:
      - 8088:8088
    hostname: ksql-server
    networks:
      - local_kafka_network
    environment:
      - KSQL_BOOTSTRAP_SERVERS=kafka:29092
      - KSQL_LISTENERS=http://ksql-server:8088
      - KSQL_KSQL_SCHEMA_REGISTRY_URL=http://schema-registry:8081
      - KSQL_KSQL_SERVICE_ID=local-ksql-server

  ksql-cli:
    container_name: local-ksql-cli
    # TODO update 5.5.1
    image: confluentinc/cp-ksql-cli:5.4.2
    depends_on:
      - ksql-server
    hostname: ksql-cli
    networks:
      - local_kafka_network
    entrypoint: /bin/sh
    tty: true

  # distributed mode
  kafka-connect:
    container_name: local-kafka-connect
    image: confluentinc/cp-kafka-connect:5.5.1
    depends_on:
      - kafka
      - schema-registry
    ports:
      - 8083:8083
    hostname: kafka-connect
    networks:
      - local_kafka_network
    environment:
      - CONNECT_BOOTSTRAP_SERVERS=kafka:29092
      - CONNECT_REST_ADVERTISED_HOST_NAME=kafka-connect
      - CONNECT_REST_PORT=8083
      - CONNECT_GROUP_ID=local-connect-group
      - CONNECT_CONFIG_STORAGE_TOPIC=local-connect-configs
      - CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1
      - CONNECT_OFFSET_FLUSH_INTERVAL_MS=10000
      - CONNECT_OFFSET_STORAGE_TOPIC=local-connect-offsets
      - CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1
      - CONNECT_STATUS_STORAGE_TOPIC=local-connect-status
      - CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1
      - CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
      - CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
      - CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
      - CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
      - CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - CONNECT_PLUGIN_PATH=/usr/share/java
    volumes:
      - "./local/connect/data:/data"
    command:
      - /bin/bash
      - -c
      - |
        # install unzip
        apt-get update && apt-get install unzip -y
        # install plugin
        unzip /data/jcustenborder-kafka-connect-spooldir-*.zip 'jcustenborder-kafka-connect-spooldir-*/lib/*' -d /usr/share/java/kafka-connect-spooldir/
        mv /usr/share/java/kafka-connect-spooldir/*/lib/* /usr/share/java/kafka-connect-spooldir
        ls -la /usr/share/java
        # setup spooldir plugin
        mkdir -p /tmp/error /tmp/finished
        # start
        /etc/confluent/docker/run

  kafka-connect-ui:
    container_name: local-kafka-connect-ui
    image: landoop/kafka-connect-ui:latest
    depends_on:
      - kafka-connect
    ports:
      - 8002:8000
    hostname: kafka-connect-ui
    networks:
      - local_kafka_network
    environment:
      - CONNECT_URL=http://kafka-connect:8083

networks:
  local_kafka_network:
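
With this compose file the broker advertises two listeners: kafka:29092 for clients inside the Docker network (Schema Registry, KSQL, Connect) and localhost:9092 for clients running on the host, such as the Streams application. A quick way to confirm the host-side listener is reachable before starting the app is a small AdminClient check; this is only a sketch, and the class name LocalBrokerCheck is made up for illustration:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class LocalBrokerCheck {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // From the host, use the advertised PLAINTEXT_HOST listener;
        // from inside the Docker network you would use kafka:29092 instead.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            System.out.println("Brokers visible: " + admin.describeCluster().nodes().get());
        }
    }
}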

Main method:

package io.confluent.developer.time.solution;

import io.confluent.developer.StreamsUtils;
import io.confluent.developer.avro.ElectronicOrder;
import io.confluent.developer.time.TopicLoader;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.processor.TimestampExtractor;

import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;

public class StreamsTimestampExtractor {

    static class OrderTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
            ElectronicOrder order = (ElectronicOrder)record.value();
            System.out.println("Extracting time of " + order.getTime() + " from " + order);
            return order.getTime();
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {

        final Properties streamsProps = StreamsUtils.loadProperties();
        streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "extractor-windowed-streams");

        StreamsBuilder builder = new StreamsBuilder();
        final String inputTopic = streamsProps.getProperty("extractor.input.topic");
        final String outputTopic = streamsProps.getProperty("extractor.output.topic");
        final Map<String, Object> configMap = StreamsUtils.propertiesToMap(streamsProps);

        final SpecificAvroSerde<ElectronicOrder> electronicSerde =
                StreamsUtils.getSpecificAvroSerde(configMap);

        final KStream<String, ElectronicOrder> electronicStream =
                builder.stream(inputTopic,
                Consumed.with(Serdes.String(), electronicSerde)
                        .withTimestampExtractor(new OrderTimestampExtractor()))
                        .peek((key, value) -> System.out.println("Incoming record - key " +key +" value " + value));

        electronicStream.groupByKey().windowedBy(TimeWindows.of(Duration.ofHours(1)))
                .aggregate(() -> 0.0,
                        (key, order, total) -> total + order.getPrice(),
                        Materialized.with(Serdes.String(), Serdes.Double()))
                .toStream()
                .map((wk, value) -> KeyValue.pair(wk.key(),value))
                .peek((key, value) -> System.out.println("Outgoing record - key " +key +" value " + value))
                .to(outputTopic, Produced.with(Serdes.String(), Serdes.Double()));

        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsProps);
        TopicLoader.runProducer();
        kafkaStreams.start();

    }
}

Running the code on my machine produces records, but the application exits immediately.

Note that this exact code processed the continuous stream of data without issue when it ran against Confluent Cloud.

To reproduce locally, all you need to do is get the code from this Confluent tutorial, modify the properties file to point to the local Kafka broker, and use the docker-compose file I provided to set up Kafka.
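
For reference, this is roughly what the switch from Confluent Cloud to the local broker looks like if the overrides are applied in code right after StreamsUtils.loadProperties() in the main method above, rather than in the properties file. It is only a sketch and assumes the standard client keys (bootstrap.servers, schema.registry.url); the localhost endpoints match the docker-compose above:

// Hypothetical overrides pointing the tutorial at the local docker-compose endpoints.
streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsProps.put("schema.registry.url", "http://localhost:8081");
// No SASL/API-key settings are needed against the local PLAINTEXT listener.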


Solution

  • Adding a shutdown hook and an uncaught exception handler helped me diagnose and fix the issue:

            KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsProps);
            TopicLoader.runProducer();
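            // 'log' is assumed to be an SLF4J Logger declared elsewhere in the class.
            // setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler) is available in
            // kafka-streams 2.8+ and must be registered before kafkaStreams.start().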
            kafkaStreams.setUncaughtExceptionHandler(e -> {
                log.error("unhandled streams exception, shutting down.", e);
                return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
            });
    
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                log.info("Runtime shutdown hook, state={}", kafkaStreams.state());
                if (kafkaStreams.state().isRunningOrRebalancing()) {
                    log.info("Shutting down started.");
                    kafkaStreams.close(Duration.ofMinutes(2));
                    log.info("Shutting down completed.");
                }
            }));
            kafkaStreams.start();
    

    It turned out I had configured a replication factor of 1 on the broker while my properties file had 3, so the exception was: Caused by: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1. The fix for me was to decrease replication.factor from 3 to 1 in my properties file.
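
    The same fix can also be applied programmatically; a minimal sketch, assuming the properties are assembled in the main method shown above (StreamsConfig.REPLICATION_FACTOR_CONFIG is the constant behind replication.factor):

        // Topics, including the internal changelog/repartition topics Kafka Streams creates,
        // cannot request more replicas than there are brokers, and the compose file runs a single broker.
        streamsProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);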