We have one producer & one consumer & one partition. Both consumer/producer are spring boot applications. The consumer app runs on my local machine while producer along with kafka & zookeeper on a remote machine.
During development, I redeployed my producer application with some changes. But after that my consumer is not receiving any messages. I tried restarting the consumer, but no luck. What can be the issue and/or how can it be solved?
Consumer Config:
spring:
cloud:
stream:
defaultBinder: kafka
bindings:
input:
destination: sales
content-type: application/json
kafka:
binder:
brokers: ${SERVICE_REGISTRY_HOST:127.0.0.1}
zkNodes: ${SERVICE_REGISTRY_HOST:127.0.0.1}
defaultZkPort: 2181
defaultBrokerPort: 9092
server:
port: 0
Producer Config:
cloud:
stream:
defaultBinder: kafka
bindings:
output:
destination: sales
content-type: application/json
kafka:
binder:
brokers: ${SERVICE_REGISTRY_HOST:127.0.0.1}
zkNodes: ${SERVICE_REGISTRY_HOST:127.0.0.1}
defaultZkPort: 2181
defaultBrokerPort: 9092
EDIT2:
After 5 minutes the consumer app dies with following exception:
2017-09-12 18:14:47,254 ERROR main o.s.c.s.b.k.p.KafkaTopicProvisioner:253 - Cannot initialize Binder
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
2017-09-12 18:14:47,255 WARN main o.s.b.c.e.AnnotationConfigEmbeddedWebApplicationContext:550 - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'inputBindingLifecycle'; nested exception is org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder:
2017-09-12 18:14:47,256 INFO main o.s.i.m.IntegrationMBeanExporter:449 - Unregistering JMX-exposed beans on shutdown
2017-09-12 18:14:47,257 INFO main o.s.i.m.IntegrationMBeanExporter:241 - Unregistering JMX-exposed beans
2017-09-12 18:14:47,257 INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: input
2017-09-12 18:14:47,257 INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: nullChannel
2017-09-12 18:14:47,258 INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: errorChannel
Well, it looks like there is already a bug reported with spring-cloud-stream-binder-kafka
stating the resetOffset
property has no effect. Hence, on the consumer always requested messages with the offset as latest
.
As mentioned on the git issue, the only workaround is to fix this via the kafka consumer CLI tool.