Search code examples
spring-cloudspring-cloud-streamspring-cloud-dataflow

Spring Cloud Stream AggregateApplication with local Data Flow Server


I am trying to create Spring Cloud Stream Aggregate Application which runs with Data Flow Web Server to be able to manage application via Web UI.

Application runner class:

@SpringBootApplication
public class Runner {

public static void main(String[] args) {
    new AggregateApplicationBuilder(args).web(true)
            .from(JSONFileSourceApplication.class).args("--fixedDelay=5000")
            .via(ProcessorOne.class)
            .to(LoggingSinkApplication.class).run(args);
}

This works OK. Now trying to add Dataflow Server. Create a class:

@SpringBootApplication
@EnableDataFlowServer
public class WebServer {}

And set it as parent configuration of AggregateApplicationBuilder:

...
    new AggregateApplicationBuilder(WebServer.class, args).web(true)
...

If I run it, the following exception occurs:

BeanCreationException: Error creating bean with name 'initH2TCPServer' ... 
Factory method 'initH2TCPServer' threw exception ... Exception opening port "19092" (port may be in use)

Looks like AggregateApplicationBuilder process tries to create another H2 server instead of using one from parent configuration.

If I replace @SpringBootApplication annotation with @Configuration in my JSONFileSourceApplication, ProcessorOne and LoggingSinkApplication classes - stream application starts, web server starts (http://localhost:9393/dashboard), but I don't see my stream components, all tabs in web UI are empty.

How to run Spring Cloud Stream AggregateApplication with Web UI enabled?


Solution

  • SCDF as it stands today, it does not support the concept of aggregate applications.

    The primary reason to this is the fact that SCDF assumes apps to be of know channel types; it is either input/output or both (for processors). However, when using AggregateApplicationBuilder, there are a variety of ways you can compose the channels and it gets blurry in the DSL/UI to be able to discover and bind to the channels in an automated manner.

    That said, in the upcoming release,

    1) We are planning to introduce the concept of "function-chaining". This allows composition of "multiple" small functions (e.g., filterNulls, transformToUppercase, splitByHypen, ..) into a single stream application at runtime. As a developer, you'd focus on developing/testing the functions standalone and register them with SCDF. Once available in the registry, you will have new DSL primitives to compose them into a single unit, which internally is chained (at runtime) by SCDF.

    2) We have plans to elevate the visibility of queues/topics. There will be DSL primitives to interact and create data pipelines with them directly. Given this flexibility, it will be easier for composition like use-cases.