I have a Spring Cloud Data Flow (SCDF)
server running on Kubernetes cluster
with Kafka
as the message broker. Now I am trying to launch a Spring Cloud Task (SCT)
that writes to a topic in Kafka
. I would like the SCT to use the same Kafka
that SCDF
is using. This brings up two questions that I have and hope they can be answered:
- How to configure the SCT to use the same Kafka as SCDF?
- Is it possible to configure the SCT so that the Kafka server uri can be passed to the SCT automatically when it launches, similar to the data source properties that get passed to the SCT at launch?
As I could not find any examples on how to achieve this, help is very appreciated.
Edit: My own answer
This is how I get it working for my case. My SCT requires spring.kafka.bootstrap-servers
to be supplied. From SCDF's shell, I provide it as an argument --spring.kafka.bootstrap-servers=${KAFKA_SERVICE_HOST}:${KAFKA_SERVICE_PORT}
, where KAFKA_SERVICE_HOST
and KAFKA_SERVICE_PORT
are environment variables created by SCDF's k8s setup script.
This is how to launch the task within SCDF's shell
dataflow:>task launch --name sample-task --arguments "--spring.kafka.bootstrap-servers=${KAFKA_SERVICE_HOST}:${KAFKA_SERVICE_PORT}"
You may want to review the Spring Cloud Task Events
section in the reference guide.
The expectation is that you'd choose the binder of choice and pack that library in the Task application's classpath. With that dependency, you'd then configure the application with Spring Cloud Stream's Kafka binder properties such as the spring.cloud.stream.kafka.binder.brokers
and others that are relevant to connect to the existing Kafka cluster.
Upon launching the Task application (from SCDF) with these configurations, you'd be able to publish or receive events in your Task app.
Alternatively, with the Kafka-binder in the classpath of the Task application, you can define the Kafka binder properties to all the Task's launched by SCDF via global configuration. See Common Application Properties
in the ref. guide for more information. In this model, you don't have to configure each of the Task application with Kafka properties explicitly, but instead, SCDF would propagate them automatically when it launches the Tasks. Keep in mind that these properties would be supplied to all the Task launches.