Search code examples
javaapache-kafkadocker-composeconfluent-schema-registrytestcontainers

Testcontainers SchemaRegistry can't connect to Kafka container


I want to run integration tests that test my kafka listener and avro serialization. this requires a Kafka and a Schema regsitry (transitively also a Zookeeper).

When testing I currently have to a docker-compose.yml, but I want to reduce user error by building the required containers via testcontainers. The Kafka and Zookeeper instances get started neatly and seem to work just fine - my application can create the required topics and the listener is subscribed as well, I can even send messages via kafka console producer.

What does not work is the SchemaRegistry. The container starts, apparently connects to the ZK but cannot establish a connection to the broker. It retries connecting for some time until it times out and subsequently the container is stopped. I therefore cannot register and read my avro schematas for (De-)Serialization in my test which fail because of this.

I can't find the reason why the SR can apparently connect to the ZK but cant find my broker.

Did someone run into this problem as well? Did you manage to get this running? If so, how so? I need Kafka and the Schema Registry testcontainers to be fully available for my tests, so omitting any of them is not an option.

I could also keep using the docker-compose.yml but I would really like to setup my test environment fully programmatically.

The schema registry container logs the following:

2023-02-08 16:56:09 [2023-02-08 15:56:09,556] INFO Session establishment complete on server zookeeper/192.168.144.2:2181, session id = 0x1000085b81e0003, negotiated timeout = 40000 (org.apache.zookeeper.ClientCnxn)
2023-02-08 16:56:09 [2023-02-08 15:56:09,696] INFO Session: 0x1000085b81e0003 closed (org.apache.zookeeper.ZooKeeper)
2023-02-08 16:56:09 [2023-02-08 15:56:09,696] INFO EventThread shut down for session: 0x1000085b81e0003 (org.apache.zookeeper.ClientCnxn)
2023-02-08 16:56:09 [2023-02-08 15:56:09,787] INFO AdminClientConfig values:
/*  Omitted for brevity  */
(org.apache.kafka.clients.admin.AdminClientConfig)
2023-02-08 16:56:10 [2023-02-08 15:56:10,284] INFO Kafka version: 7.3.1-ccs (org.apache.kafka.common.utils.AppInfoParser)
2023-02-08 16:56:10 [2023-02-08 15:56:10,284] INFO Kafka commitId: 8628b0341c3c4676 (org.apache.kafka.common.utils.AppInfoParser)
2023-02-08 16:56:10 [2023-02-08 15:56:10,284] INFO Kafka startTimeMs: 1675871770281 (org.apache.kafka.common.utils.AppInfoParser)
2023-02-08 16:56:10 [2023-02-08 15:56:10,308] INFO [AdminClient clientId=adminclient-1] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient)
2023-02-08 16:56:10 [2023-02-08 15:56:10,313] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:54776) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
/*  These lines repeat a few times until the container times out and exits.  */
2023-02-08 16:56:50 [2023-02-08 15:56:50,144] INFO [AdminClient clientId=adminclient-1] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient)
2023-02-08 16:56:50 [2023-02-08 15:56:50,144] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:54776) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
2023-02-08 16:56:50 [2023-02-08 15:56:50,298] ERROR Error while getting broker list. (io.confluent.admin.utils.ClusterStatus)
2023-02-08 16:56:50 java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: listNodes
2023-02-08 16:56:50     at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
2023-02-08 16:56:50     at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
2023-02-08 16:56:50     at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
2023-02-08 16:56:50     at io.confluent.admin.utils.ClusterStatus.isKafkaReady(ClusterStatus.java:147)
2023-02-08 16:56:50     at io.confluent.admin.utils.cli.KafkaReadyCommand.main(KafkaReadyCommand.java:149)
2023-02-08 16:56:50 Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: listNodes
2023-02-08 16:56:51 [2023-02-08 15:56:51,103] INFO [AdminClient clientId=adminclient-1] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient)
2023-02-08 16:56:51 [2023-02-08 15:56:51,103] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:54776) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
2023-02-08 16:56:51 [2023-02-08 15:56:51,300] INFO Expected 1 brokers but found only 0. Trying to query Kafka for metadata again ... (io.confluent.admin.utils.ClusterStatus)
2023-02-08 16:56:51 [2023-02-08 15:56:51,300] ERROR Expected 1 brokers but found only 0. Brokers found []. (io.confluent.admin.utils.ClusterStatus)
2023-02-08 16:56:51 Using log4j config /etc/schema-registry/log4j.properties

My base test class. ITs that need Kafka extend this class

@Testcontainers
@SpringBootTest
@Slf4j
public class AbstractIT {

  private static final Network network = Network.newNetwork();

  protected static GenericContainer ZOOKEEPER = new GenericContainer<>(
      DockerImageName.parse("confluentinc/cp-zookeeper:7.2.0"))
      .withNetwork(network)
      .withNetworkAliases("zookeeper")
      .withEnv(Map.of(
          "ZOOKEEPER_CLIENT_PORT", "2181",
          "ZOOKEEPER_TICK_TIME", "2000"));
  
  protected static final KafkaContainer KAFKA = new KafkaContainer(
      DockerImageName.parse("confluentinc/cp-kafka"))
      .withExternalZookeeper("zookeeper:2181")
      .dependsOn(ZOOKEEPER)
      .withNetwork(network)
      .withNetworkAliases("broker");

  protected static final GenericContainer SCHEMAREGSISTRY = new GenericContainer<>(
      DockerImageName.parse("confluentinc/cp-schema-registry"))
      .dependsOn(ZOOKEEPER, KAFKA)
      .withEnv(Map.of(
          "SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL", "zookeeper:2181",
          "SCHEMA_REGISTRY_HOST_NAME", "schemaregistry",
          "SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8085",
          "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "broker:9092"))
      .withNetwork(network)
      .withNetworkAliases("schemaregistry");

  @DynamicPropertySource
  static void registerPgProperties(DynamicPropertyRegistry registry) {
    registry.add("bootstrap.servers", KAFKA::getBootstrapServers);
    registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers);
    registry.add("spring.kafka.consumer.auto-offset-reset", () -> "earliest");
    registry.add("spring.data.mongodb.uri", MONGODB::getConnectionString);
    registry.add("spring.data.mongodb.database", () ->"test");
  }

//container startup, shutdown as well as topic creation omitted for brevity

}

My docker-compose.yml that I want to replicate with testcontainers

version: "3.5"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.2.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.2.0
    hostname: broker
    container_name: broker
    restart: always
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_SCHEMA_REGISTRY_URL: "schemaregistry:8085"


  schemaregistry:
    container_name: schemaregistry
    hostname: schemaregistry
    image: confluentinc/cp-schema-registry:5.1.2
    restart: always
    depends_on:
      - zookeeper
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:2181"
      SCHEMA_REGISTRY_HOST_NAME: schemaregistry
      SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8085"
    ports:
      - "8085:8085"
    volumes:
      - "./src/main/avro/:/etc/schema"

Solution

  • Here is working setup of Kafka & Schema Registry for Testcontainers. It could help to find the issue in your setup

    Schema Registry

    public class SchemaRegistryContainer extends GenericContainer<SchemaRegistryContainer> {
        public static final String SCHEMA_REGISTRY_IMAGE =
                "confluentinc/cp-schema-registry";
        public static final int SCHEMA_REGISTRY_PORT = 8081;
    
        public SchemaRegistryContainer() {
            this(CONFLUENT_PLATFORM_VERSION);
        }
    
        public SchemaRegistryContainer(String version) {
            super(SCHEMA_REGISTRY_IMAGE + ":" + version);
    
            waitingFor(Wait.forHttp("/subjects").forStatusCode(200));
            withExposedPorts(SCHEMA_REGISTRY_PORT);
        }
    
        public SchemaRegistryContainer withKafka(KafkaContainer kafka) {
            return withKafka(kafka.getNetwork(), kafka.getNetworkAliases().get(0) + ":9092");
        }
    
        public SchemaRegistryContainer withKafka(Network network, String bootstrapServers) {
            withNetwork(network);
            withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry");
            withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081");
            withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://" + bootstrapServers);
            return self();
        }
    }
    

    Kafka + Schema Registry

        public static final String CONFLUENT_PLATFORM_VERSION = "5.5.1";
    
        private static final Network KAFKA_NETWORK = Network.newNetwork();
        private static final DockerImageName KAFKA_IMAGE = DockerImageName.parse("confluentinc/cp-kafka")
                .withTag(CONFLUENT_PLATFORM_VERSION);
        private static final KafkaContainer KAFKA = new KafkaContainer(KAFKA_IMAGE)
                .withNetwork(KAFKA_NETWORK)
                .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
                .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1");
    
        private static final SchemaRegistryContainer SCHEMA_REGISTRY =
                new SchemaRegistryContainer(CONFLUENT_PLATFORM_VERSION);
        
        @BeforeAll
        static void startKafkaContainer() {
            KAFKA.start();
            SCHEMA_REGISTRY.withKafka(KAFKA).start();
    
            // init kafka properties for consumer or producer
            ....
            kafkaProperties.setBootstrapServers(KAFKA.getBootstrapServers());
            kafkaProperties.setSchemaRegistryUrl("http://" + SCHEMA_REGISTRY.getHost() + ":" + SCHEMA_REGISTRY.getFirstMappedPort());
        }