I couldn't start my Kafka Streams application. It ran fine when I was using Confluent Cloud, but after switching to a local Kafka running on Docker it no longer starts.
docker-compose:
# https://docs.confluent.io/current/installation/docker/config-reference.html
# https://github.com/confluentinc/cp-docker-images
version: "3"
services:
  zookeeper:
    container_name: local-zookeeper
    image: confluentinc/cp-zookeeper:5.5.1
    ports:
      - 2181:2181
    hostname: zookeeper
    networks:
      - local_kafka_network
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181
  kafka:
    container_name: local-kafka
    image: confluentinc/cp-kafka:5.5.1
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
      - 29092:29092
    hostname: kafka
    networks:
      - local_kafka_network
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
  schema-registry:
    container_name: local-schema-registry
    image: confluentinc/cp-schema-registry:5.5.1
    depends_on:
      - kafka
    ports:
      - 8081:8081
    hostname: schema-registry
    networks:
      - local_kafka_network
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
      - SCHEMA_REGISTRY_DEBUG=true
    command:
      - /bin/bash
      - -c
      - |
        # install jq
        curl -sL https://github.com/stedolan/jq/releases/download/jq-1.6/jq-linux64 -o /usr/local/bin/jq && chmod u+x /usr/local/bin/jq
        # start
        /etc/confluent/docker/run
  schema-registry-ui:
    container_name: local-schema-registry-ui
    image: landoop/schema-registry-ui:latest
    depends_on:
      - schema-registry
    ports:
      - 8001:8000
    hostname: schema-registry-ui
    networks:
      - local_kafka_network
    environment:
      - SCHEMAREGISTRY_URL=http://schema-registry:8081
      - PROXY=true
  kafka-rest:
    container_name: local-kafka-rest
    image: confluentinc/cp-kafka-rest:5.5.1
    depends_on:
      - kafka
      - schema-registry
    ports:
      - 8082:8082
    hostname: kafka-rest
    networks:
      - local_kafka_network
    environment:
      - KAFKA_REST_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_REST_LISTENERS=http://kafka-rest:8082
      - KAFKA_REST_SCHEMA_REGISTRY_URL=http://schema-registry:8081
      - KAFKA_REST_HOST_NAME=kafka-rest
  kafka-ui:
    container_name: local-kafka-ui
    image: landoop/kafka-topics-ui:latest
    depends_on:
      - kafka-rest
    ports:
      - 8000:8000
    hostname: kafka-ui
    networks:
      - local_kafka_network
    environment:
      - KAFKA_REST_PROXY_URL=http://kafka-rest:8082
      - PROXY=true
  # https://github.com/confluentinc/ksql/blob/4.1.3-post/docs/tutorials/docker-compose.yml#L85
  ksql-server:
    container_name: local-ksql-server
    # TODO update 5.5.1
    image: confluentinc/cp-ksql-server:5.4.2
    depends_on:
      - kafka
      - schema-registry
    ports:
      - 8088:8088
    hostname: ksql-server
    networks:
      - local_kafka_network
    environment:
      - KSQL_BOOTSTRAP_SERVERS=kafka:29092
      - KSQL_LISTENERS=http://ksql-server:8088
      - KSQL_KSQL_SCHEMA_REGISTRY_URL=http://schema-registry:8081
      - KSQL_KSQL_SERVICE_ID=local-ksql-server
  ksql-cli:
    container_name: local-ksql-cli
    # TODO update 5.5.1
    image: confluentinc/cp-ksql-cli:5.4.2
    depends_on:
      - ksql-server
    hostname: ksql-cli
    networks:
      - local_kafka_network
    entrypoint: /bin/sh
    tty: true
  # distributed mode
  kafka-connect:
    container_name: local-kafka-connect
    image: confluentinc/cp-kafka-connect:5.5.1
    depends_on:
      - kafka
      - schema-registry
    ports:
      - 8083:8083
    hostname: kafka-connect
    networks:
      - local_kafka_network
    environment:
      - CONNECT_BOOTSTRAP_SERVERS=kafka:29092
      - CONNECT_REST_ADVERTISED_HOST_NAME=kafka-connect
      - CONNECT_REST_PORT=8083
      - CONNECT_GROUP_ID=local-connect-group
      - CONNECT_CONFIG_STORAGE_TOPIC=local-connect-configs
      - CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1
      - CONNECT_OFFSET_FLUSH_INTERVAL_MS=10000
      - CONNECT_OFFSET_STORAGE_TOPIC=local-connect-offsets
      - CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1
      - CONNECT_STATUS_STORAGE_TOPIC=local-connect-status
      - CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1
      - CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
      - CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
      - CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
      - CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
      - CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - CONNECT_PLUGIN_PATH=/usr/share/java
    volumes:
      - "./local/connect/data:/data"
    command:
      - /bin/bash
      - -c
      - |
        # install unzip
        apt-get update && apt-get install unzip -y
        # install plugin
        unzip /data/jcustenborder-kafka-connect-spooldir-*.zip 'jcustenborder-kafka-connect-spooldir-*/lib/*' -d /usr/share/java/kafka-connect-spooldir/
        mv /usr/share/java/kafka-connect-spooldir/*/lib/* /usr/share/java/kafka-connect-spooldir
        ls -la /usr/share/java
        # setup spooldir plugin
        mkdir -p /tmp/error /tmp/finished
        # start
        /etc/confluent/docker/run
  kafka-connect-ui:
    container_name: local-kafka-connect-ui
    image: landoop/kafka-connect-ui:latest
    depends_on:
      - kafka-connect
    ports:
      - 8002:8000
    hostname: kafka-connect-ui
    networks:
      - local_kafka_network
    environment:
      - CONNECT_URL=http://kafka-connect:8083
networks:
  local_kafka_network:
Main method:
package io.confluent.developer.time.solution;
import io.confluent.developer.StreamsUtils;
import io.confluent.developer.avro.ElectronicOrder;
import io.confluent.developer.time.TopicLoader;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.processor.TimestampExtractor;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
public class StreamsTimestampExtractor {

    static class OrderTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
            ElectronicOrder order = (ElectronicOrder) record.value();
            System.out.println("Extracting time of " + order.getTime() + " from " + order);
            return order.getTime();
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        final Properties streamsProps = StreamsUtils.loadProperties();
        streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "extractor-windowed-streams");

        StreamsBuilder builder = new StreamsBuilder();
        final String inputTopic = streamsProps.getProperty("extractor.input.topic");
        final String outputTopic = streamsProps.getProperty("extractor.output.topic");
        final Map<String, Object> configMap = StreamsUtils.propertiesToMap(streamsProps);

        final SpecificAvroSerde<ElectronicOrder> electronicSerde =
                StreamsUtils.getSpecificAvroSerde(configMap);

        final KStream<String, ElectronicOrder> electronicStream =
                builder.stream(inputTopic,
                               Consumed.with(Serdes.String(), electronicSerde)
                                       .withTimestampExtractor(new OrderTimestampExtractor()))
                       .peek((key, value) -> System.out.println("Incoming record - key " + key + " value " + value));

        electronicStream.groupByKey().windowedBy(TimeWindows.of(Duration.ofHours(1)))
                .aggregate(() -> 0.0,
                        (key, order, total) -> total + order.getPrice(),
                        Materialized.with(Serdes.String(), Serdes.Double()))
                .toStream()
                .map((wk, value) -> KeyValue.pair(wk.key(), value))
                .peek((key, value) -> System.out.println("Outgoing record - key " + key + " value " + value))
                .to(outputTopic, Produced.with(Serdes.String(), Serdes.Double()));

        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsProps);
        TopicLoader.runProducer();
        kafkaStreams.start();
    }
}
Running the code on my machine produces records, but the application exits immediately.
Note that I was able to process the continuous stream of data when running this exact code against Confluent Cloud.
To reproduce locally, all you need to do is get the code from this Confluent tutorial, modify the properties file to point to the local Kafka broker (a sketch of the relevant properties is below), and use the docker-compose file I provided to set up Kafka.
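For reference, with the docker-compose above the properties end up pointing at the host-mapped listeners. This is only a sketch: apart from the standard Kafka keys, the property names mirror what the tutorial's code reads (extractor.input.topic, extractor.output.topic), and the topic names are placeholders, not the tutorial's actual values.

# hypothetical excerpt of the Streams properties file for the local Docker setup
bootstrap.servers=localhost:9092
schema.registry.url=http://localhost:8081
# topic names are placeholders - use whatever the tutorial defines
extractor.input.topic=extractor-input-topic
extractor.output.topic=extractor-output-topic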
Adding a shutdown hook and an uncaught exception handler helped me diagnose and fix the issue:
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsProps);
TopicLoader.runProducer();
kafkaStreams.setUncaughtExceptionHandler(e -> {
    log.error("unhandled streams exception, shutting down.", e);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
});
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    log.info("Runtime shutdown hook, state={}", kafkaStreams.state());
    if (kafkaStreams.state().isRunningOrRebalancing()) {
        log.info("Shutting down started.");
        kafkaStreams.close(Duration.ofMinutes(2));
        log.info("Shutting down completed.");
    }
}));
kafkaStreams.start();
It turns out I had configured a replication factor of 1 on the broker while my properties file had 3, so the exception was: Caused by: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.
So the solution for me was to decrease replication.factor from 3 to 1 in my properties file.
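In other words, for the single-broker compose file above the fix is a one-line change in the Streams properties (replication.factor is the standard Kafka Streams setting that controls how many replicas the application's internal topics get):

# only one local broker, so topics created by the app can have at most one replica
replication.factor=1

Alternatively the factor of 3 could be kept by running three brokers locally, but a single replica is enough for local development.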