Tags: scala, spring-boot, spring-cloud-stream, spring-cloud-dataflow, spring-cloud-stream-binder-kafka

Spring Cloud Data Flow Custom Scala Processor unable to send/receive data from Starter Apps (SCDF 2.5.1 & Spring Boot 2.2.6)


I have been working on a simple custom processor in Scala for Spring Cloud Data Flow and have run into issues sending data to and receiving data from the starter applications. I have been unable to see any messages propagating through the stream. The stream definition is time --trigger.time-unit=SECONDS | pass-through-log | log, where pass-through-log is my custom processor.
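
For reference, this is roughly how the stream gets registered and deployed from the SCDF shell; the Docker image URI and stream name below are placeholders, not my actual values:

dataflow:> app register --name pass-through-log --type processor --uri docker:my-registry/pass-through-log:0.0.1
dataflow:> stream create --name demo-stream --definition "time --trigger.time-unit=SECONDS | pass-through-log | log"
dataflow:> stream deploy --name demo-stream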

I am using Spring Cloud Data Flow 2.5.1 and Spring Boot 2.2.6.

Here is the code used for the processor - I am using the functional model.

import java.util.function.Function

import org.springframework.boot.SpringApplication
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.context.annotation.Bean

@SpringBootApplication
class PassThroughLog {

  // Functional-style processor: Spring Cloud Stream binds this java.util.function.Function
  // to the bindings passthroughlog-in-0 and passthroughlog-out-0 by default.
  @Bean
  def passthroughlog(): Function[String, String] = {
    input: String => {
      println(s"Received input `$input`")
      input
    }
  }
}

object PassThroughLog {
  def main(args: Array[String]): Unit = SpringApplication.run(classOf[PassThroughLog], args: _*)
}
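
As a side note, the function wiring itself can be exercised without Kafka by running against the in-memory test binder. Below is a minimal sketch, assuming the spring-cloud-stream test-binder (test-jar) artifact is on the classpath; the object name and payload are just illustrative.

import org.springframework.boot.builder.SpringApplicationBuilder
import org.springframework.cloud.stream.binder.test.{InputDestination, OutputDestination, TestChannelBinderConfiguration}
import org.springframework.messaging.support.MessageBuilder

// Illustrative smoke test (not part of the deployed app): boots PassThroughLog against
// Spring Cloud Stream's in-memory test binder, pushes one message in and reads the reply.
object PassThroughLogSmokeTest {
  def main(args: Array[String]): Unit = {
    val context = new SpringApplicationBuilder(
      TestChannelBinderConfiguration.getCompleteConfiguration(classOf[PassThroughLog]): _*
    ).run()

    val input  = context.getBean(classOf[InputDestination])
    val output = context.getBean(classOf[OutputDestination])

    // Message goes to passthroughlog-in-0 (mapped to `input`), reply comes from passthroughlog-out-0
    input.send(MessageBuilder.withPayload("hello".getBytes).build())
    val reply = output.receive(1000)
    println(s"Processor produced: ${new String(reply.getPayload)}")

    context.close()
  }
}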

application.yml

spring:
  cloud:
    stream:
      function:
        bindings:
          passthroughlog-in-0: input
          passthroughlog-out-0: output
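
One thing to note about the block above: it only renames the functional bindings passthroughlog-in-0/passthroughlog-out-0 to input and output. The actual Kafka topics come from the destination properties that SCDF injects when it deploys the stream; for a standalone local run against my own Kafka, something like the following pins them explicitly (the topic names are made up):

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: pass-through-log-in   # example topic, local testing only
        output:
          destination: pass-through-log-out  # SCDF sets the real destinations at deployment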

build.gradle.kts

// scala
implementation("org.scala-lang:scala-library:2.12.10")

// spring
implementation(platform("org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR5"))
implementation(platform("org.springframework.cloud:spring-cloud-stream-dependencies:Horsham.SR5"))
implementation("org.springframework.boot:spring-boot-starter")
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("org.springframework.cloud:spring-cloud-starter-function-web:3.0.7.RELEASE")
implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka:3.0.5.RELEASE")

I have posted the entire project to GitHub in case the code samples here are lacking; I also posted the logs there, as they are quite long.

When I bootstrap a local Kafka cluster and push arbitrary data to the input topic, I am able to see data flowing through the processor. However, when I deploy the application on Spring Cloud Data Flow, this is not the case. I am deploying the app via Docker in Kubernetes.

Additionally, when I deploy a stream with the definition time --trigger.time-unit=SECONDS | log, I see messages in the log sink. This has convinced me the problem lies with the custom processor.

Am I missing something simple like a dependency or extra configuration? Any help is greatly appreciated.


Solution

  • It turns out the problem was with my Dockerfile. For ease of configuration, I had a build argument to specify the jar file used in the ENTRYPOINT, and to accomplish this I used the shell form of ENTRYPOINT. Switching to the exec form of ENTRYPOINT solved the issue.

    The shell form of ENTRYPOINT does not play well with image arguments (docker run <image> <args>): Docker wraps the command in /bin/sh -c and ignores anything passed after the image name, so SCDF could not pass the appropriate binding arguments to the container (see the sketch after the Dockerfiles below).

    Changing my Dockerfile from:

    FROM openjdk:11.0.5-jdk-slim as build
    ARG JAR
    ENV JAR $JAR
    ADD build/libs/$JAR .
    ENTRYPOINT java -jar $JAR
    

    to

    FROM openjdk:11.0.5-jdk-slim as build
    ARG JAR
    ADD build/libs/$JAR program.jar
    ENTRYPOINT ["java", "-jar", "program.jar"]
    

    fixed the problem.
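
    To make the difference concrete, here is a rough sketch of what each form does with arguments appended at run time (the image tag and property are just illustrative examples of what SCDF passes):

    # Shell form: Docker wraps the command in `/bin/sh -c "java -jar $JAR"`, and anything
    # after the image name is ignored, so the property below never reaches the JVM.
    docker run pass-through-log:latest --spring.cloud.stream.bindings.output.destination=demo.topic

    # Exec form: the same invocation becomes
    #   java -jar program.jar --spring.cloud.stream.bindings.output.destination=demo.topic
    # so the configuration appended by SCDF is picked up by the application.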