I have a Spring Cloud Stream project with Actuator and the Kafka binder. I am exploring the bindings
actuator endpoint and, as an exercise, am trying to stop a producer. I make the following POST request via curl:
curl -v 'localhost:8081/actuator/bindings/producer-out-0' -H 'content-type: application/json' -d '{"state": "STOPPED"}'
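For reference, the same request can be sketched in plain Java with the JDK's built-in HTTP client (Java 11+). This is only an illustration of the call shown in the curl command; the class and method names (`BindingStateClient`, `stateRequest`, `send`) are hypothetical and not part of Spring Cloud Stream, and the base URL assumes the local setup described in this question.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper, not part of Spring Cloud Stream or Actuator.
class BindingStateClient {

    // Builds the POST request that asks the bindings endpoint to change a binding's state.
    static HttpRequest stateRequest(String baseUrl, String bindingName, String state) {
        String body = "{\"state\": \"" + state + "\"}";
        return HttpRequest.newBuilder(URI.create(baseUrl + "/actuator/bindings/" + bindingName))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    // Sends the request and returns the HTTP status code, discarding any response body.
    static int send(HttpRequest request) throws Exception {
        HttpResponse<Void> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.discarding());
        return response.statusCode();
    }
}
```

Calling `BindingStateClient.send(BindingStateClient.stateRequest("http://localhost:8081", "producer-out-0", "STOPPED"))` against the running application would issue the same request as the curl command.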
Actual Results:
The request returns 204. The state of the producer (seen from GET /actuator/bindings/producer-out-0) is now stopped. The producer is still producing messages, however, which can be seen from both logging and consumer activity on the topic.
Expected Results: I expected the producer to stop producing messages. (I have also tried using the PAUSED state, which also returns 204, but error logs indicate that this producer cannot be paused.)
Do I misunderstand how this actuator works? When a producer is stopped, is it expected that S.C.S. will continue to poll that producer? The only documentation I am aware of is here, but it doesn't answer my questions as far as I can tell.
Background:
I am using spring-boot-starter-parent 2.5.3 and have starter-web and starter-actuator listed as dependencies. I don't think I'm missing any.
This is the producer/consumer pair. As you can see I am using a pollable supplier.
@Configuration
@Profile("numbers")
public class NumberHandlers {

    private static final Logger LOGGER = LoggerFactory.getLogger(NumberHandlers.class);

    @Bean
    public Supplier<Integer> producer() {
        // Needed an effectively-final mutable integer. Side-bar comments welcome. :P
        var counter = new AtomicInteger();
        return () -> {
            var n = counter.getAndIncrement();
            LOGGER.info("Producing number: " + n);
            return n;
        };
    }

    @Bean
    public Consumer<Integer> consumer() {
        return it -> LOGGER.info("Consuming number: " + it);
    }
}
These are active when I pass in the numbers profile. My configurations are below.
application.yml:
server:
  port: 8081
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: ${env.kafka.bootstrapservers:localhost}
management:
  endpoints:
    web:
      exposure:
        include: 'bindings'
... and application-numbers.yml:
spring:
  cloud:
    stream:
      poller:
        fixedDelay: 5000
      bindings:
        producer-out-0:
          destination: numbers-raw
          producer:
            partitionCount: 3
        consumer-in-0:
          destination: numbers-raw
      kafka:
        bindings:
          producer-out-0:
            producer:
              topic.properties:
                # These look weird because they're done as an exercise.
                retention.bytes: 10000
                retention.ms: 172800000
    function:
      definition: producer;consumer
I am testing in a localhost environment using a docker-compose kafka and zookeeper on the host network.
Thanks!
Lifecycle control of producer bindings is not currently supported, only consumer bindings.
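If the goal is simply to stop emitting messages, one possible application-level workaround is to gate the supplier itself. The sketch below is plain Java and makes an assumption that should be verified against the version in use: that the framework sends nothing when a polled supplier returns null. `GatedSupplier` and `setEnabled` are hypothetical names introduced for illustration, not a Spring Cloud Stream API, and the switch would have to be toggled by your own code (for example a custom endpoint) rather than by the bindings actuator.

```java
import java.util.function.Supplier;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical wrapper: delegates to the real supplier while enabled, returns null otherwise.
class GatedSupplier<T> implements Supplier<T> {

    private final Supplier<T> delegate;
    private final AtomicBoolean enabled = new AtomicBoolean(true);

    GatedSupplier(Supplier<T> delegate) {
        this.delegate = delegate;
    }

    // Turns production on or off; safe to call from another thread.
    void setEnabled(boolean value) {
        enabled.set(value);
    }

    boolean isEnabled() {
        return enabled.get();
    }

    @Override
    public T get() {
        // While disabled, the delegate is not invoked at all, so no value is consumed.
        // Assumption: a null result from a polled supplier means "no message this cycle".
        return enabled.get() ? delegate.get() : null;
    }
}
```

In the question's configuration, the `producer()` bean could return a `GatedSupplier<Integer>` wrapping the existing lambda. The poller would still fire every `fixedDelay` milliseconds, but under the stated assumption nothing would be published while the gate is closed.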