I have been working on a simple custom processor in Scala for Spring Cloud Data Flow and have been running into issues sending data to and receiving data from the starter applications. I have been unable to see any messages propagating through the stream. The stream definition is time --trigger.time-unit=SECONDS | pass-through-log | log, where pass-through-log is my custom processor.
I am using Spring Cloud Data Flow 2.5.1 and Spring Boot 2.2.6.
Here is the code used for the processor - I am using the functional model.
import java.util.function.Function

import org.springframework.boot.SpringApplication
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.context.annotation.Bean

@SpringBootApplication
class PassThroughLog {

  // Functional model: the bean name "passthroughlog" produces the default
  // binding names passthroughlog-in-0 and passthroughlog-out-0.
  @Bean
  def passthroughlog(): Function[String, String] = { input: String =>
    println(s"Received input `$input`")
    input
  }
}

object PassThroughLog {
  def main(args: Array[String]): Unit =
    SpringApplication.run(classOf[PassThroughLog], args: _*)
}
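As an aside, the function can be exercised without any broker using Spring Cloud Stream's test binder. A minimal sketch (assuming the spring-cloud-stream test jar is on the test classpath; the classes below are from the test binder API):

import org.springframework.boot.builder.SpringApplicationBuilder
import org.springframework.cloud.stream.binder.test.{InputDestination, OutputDestination, TestChannelBinderConfiguration}
import org.springframework.messaging.support.GenericMessage

object PassThroughLogSmokeTest {
  def main(args: Array[String]): Unit = {
    // Boot the application against the in-memory test binder instead of Kafka.
    val context = new SpringApplicationBuilder(
      TestChannelBinderConfiguration.getCompleteConfiguration(classOf[PassThroughLog]): _*
    ).run()

    val input  = context.getBean(classOf[InputDestination])
    val output = context.getBean(classOf[OutputDestination])

    // Send a payload to passthroughlog-in-0 and expect it unchanged on -out-0.
    input.send(new GenericMessage[Array[Byte]]("hello".getBytes))
    println(new String(output.receive().getPayload))

    context.close()
  }
}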
application.yml
spring:
  cloud:
    stream:
      function:
        bindings:
          passthroughlog-in-0: input
          passthroughlog-out-0: output
build.gradle.kts
dependencies {
    // scala
    implementation("org.scala-lang:scala-library:2.12.10")

    // spring
    implementation(platform("org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR5"))
    implementation(platform("org.springframework.cloud:spring-cloud-stream-dependencies:Horsham.SR5"))
    implementation("org.springframework.boot:spring-boot-starter")
    implementation("org.springframework.boot:spring-boot-starter-actuator")
    implementation("org.springframework.cloud:spring-cloud-starter-function-web:3.0.7.RELEASE")
    implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka:3.0.5.RELEASE")
}
I have posted the entire project to GitHub in case the code samples here are lacking. I also posted the logs there, as they are quite long.
When I bootstrap a local Kafka cluster and push arbitrary data to the input topic, I am able to see data flowing through the processor. However, when I deploy the application on Spring Cloud Data Flow, this is not the case. I am deploying the app via Docker in Kubernetes.
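For reference, the local check amounted to producing records to the processor's input topic. A minimal producer sketch (the topic name input and broker localhost:9092 are assumptions matching the binding name above; a kafka-console-producer does the same job):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object PushTestRecord {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    // Produce a single record to the processor's input topic.
    val producer = new KafkaProducer[String, String](props)
    producer.send(new ProducerRecord[String, String]("input", "hello"))
    producer.close()
  }
}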
Additionally, when I deploy a stream with the definition time --trigger.time-unit=SECONDS | log, I see messages in the log sink. This has convinced me the problem lies with the custom processor.
Am I missing something simple like a dependency or extra configuration? Any help is greatly appreciated.
It turns out the problem was with my Dockerfile. For ease of configuration, I had a build argument to specify the jar file used in the ENTRYPOINT, and to accomplish this I used the shell form of ENTRYPOINT. Changing my ENTRYPOINT to the exec form solved the issue.
The shell form of ENTRYPOINT does not play well with image arguments (docker run <image> <args>): Docker runs the command through /bin/sh -c and the trailing arguments are dropped, so SCDF could not pass the appropriate configuration to the container.
Changing my Dockerfile from:
FROM openjdk:11.0.5-jdk-slim as build
ARG JAR
ENV JAR $JAR
ADD build/libs/$JAR .
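# Shell form: runs under /bin/sh -c, so arguments from `docker run <image> <args>` are dropped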
ENTRYPOINT java -jar $JAR
to:
FROM openjdk:11.0.5-jdk-slim as build
ARG JAR
ADD build/libs/$JAR program.jar
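# Exec form: arguments from `docker run <image> <args>` are appended to this command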
ENTRYPOINT ["java", "-jar", "program.jar"]
fixed the problem.