Search code examples
javaapache-kafkaspring-cloud-streamspring-kafka

Consumer does not receive messages after kafka producer/consumer restart


We have one producer & one consumer & one partition. Both consumer/producer are spring boot applications. The consumer app runs on my local machine while producer along with kafka & zookeeper on a remote machine.

During development, I redeployed my producer application with some changes. But after that my consumer is not receiving any messages. I tried restarting the consumer, but no luck. What can be the issue and/or how can it be solved?

Consumer Config:

spring:
  cloud:
    stream:
      defaultBinder: kafka
      bindings:
        input:
          destination: sales
          content-type: application/json
      kafka:
        binder:
          brokers: ${SERVICE_REGISTRY_HOST:127.0.0.1}
          zkNodes: ${SERVICE_REGISTRY_HOST:127.0.0.1}
          defaultZkPort: 2181
          defaultBrokerPort: 9092
server:
  port: 0

Producer Config:

cloud:
stream:
  defaultBinder: kafka
  bindings:
    output:
      destination: sales
      content-type: application/json
  kafka:
    binder:
      brokers: ${SERVICE_REGISTRY_HOST:127.0.0.1}
      zkNodes: ${SERVICE_REGISTRY_HOST:127.0.0.1}
      defaultZkPort: 2181
      defaultBrokerPort: 9092

EDIT2:

After 5 minutes the consumer app dies with following exception:

2017-09-12 18:14:47,254 ERROR main o.s.c.s.b.k.p.KafkaTopicProvisioner:253 - Cannot initialize Binder
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
2017-09-12 18:14:47,255  WARN main o.s.b.c.e.AnnotationConfigEmbeddedWebApplicationContext:550 - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'inputBindingLifecycle'; nested exception is org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder:
2017-09-12 18:14:47,256  INFO main o.s.i.m.IntegrationMBeanExporter:449 - Unregistering JMX-exposed beans on shutdown
2017-09-12 18:14:47,257  INFO main o.s.i.m.IntegrationMBeanExporter:241 - Unregistering JMX-exposed beans
2017-09-12 18:14:47,257  INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: input
2017-09-12 18:14:47,257  INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: nullChannel
2017-09-12 18:14:47,258  INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: errorChannel

Solution

  • Well, it looks like there is already a bug reported with spring-cloud-stream-binder-kafka stating the resetOffset property has no effect. Hence, on the consumer always requested messages with the offset as latest.

    As mentioned on the git issue, the only workaround is to fix this via the kafka consumer CLI tool.