Search code examples
javaspring-bootjava-8spring-cloudfunctional-interface

How to configure the bindings of a function in Spring Cloud Stream to bind its input to a web endpoint and its output to a Kafka topic


I have a regular java Function; which I am trying to bind:

  1. Its input to a web endpoint
  2. Its output to a kafka topic.

When I use my function in the context of the web, it always returns the resulting value of the Function back to the web client alone. Can I do something like this?:

spring.cloud.stream.bindings.input.binder=web
spring.cloud.stream.bindings.output.binder=kafka

I'm currently even trying to split the Function into 2:

  • One with its input bound to the web client and its output dynamically bound to the second function (using spring.cloud.stream.sendto.destination)
  • Another function with its output bound to a kafka binding.

Still this approach doesn't work either. The dynamic routing (spring.cloud.stream.sendto.destination) shows up back on the web client; but no Message is sent to the kafka binding itself. Here is the code I am using in this second approach (2 functions) in the hopes to simply get a Spring functional app to bind its input to a web endpoint and output to a kafka topic.

WebToKafkaApp.java

@SpringBootApplication
public class WebToKafkaApp {
    public static void main(String[] args) {
        SpringApplication.run(WebToKafkaApp.class, args);
    }

    @Bean
    public Function<String, Message<String>> webFunction() {
        return payload -> createPayloadMapperToMessage("kafkaFunction").apply(payload);
    }

    @Bean
    public Function<Flux<Message<String>>, Flux<Message<String>>> kafkaFunction() {
        return flux -> flux.map(msg -> createPayloadMapperToMessage("").apply(msg.getPayload()));
    }

    private Function<String, Message<String>> createPayloadMapperToMessage(String destination) {
        return payload -> MessageBuilder
                .withPayload(payload.toUpperCase())
                .setHeader("spring.cloud.stream.sendto.destination", destination)
                .build();
    }
}

application.yml

spring.cloud.stream.bindings.webFunction-in-0:
  destination: webFunctionIN
  contentType: application/json
spring.cloud.stream.bindings.webFunction-out-0:
  destination: webFunctionOUT
  contentType: application/json
spring.cloud.stream.bindings.kafkaFunction-in-0:
  destination: kafkaFunctionIN
  contentType: application/json
  binder: kafka
spring.cloud.stream.bindings.kafkaFunction-out-0:
  destination: kafkaFunctionOUT
  contentType: application/json
  binder: kafka

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092

spring.cloud.stream.function.routing.enabled: true
spring.cloud.function.definition: webFunction

build.gradle

plugins {
    id 'org.springframework.boot' version '2.2.1.RELEASE'
    id 'io.spring.dependency-management' version '1.0.8.RELEASE'
    id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'
repositories {
    mavenCentral()
}
ext {
    set('springCloudVersion', "Hoxton.RELEASE")
}
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.cloud:spring-cloud-starter-function-web'
    implementation 'org.springframework.cloud:spring-cloud-starter-function-webflux'
    implementation 'org.springframework.cloud:spring-cloud-stream'
    implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
}
dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}
test {
    useJUnitPlatform()
}

Any help would be appreciated.


Solution

  • Thanks to Oleg for posting the idea behind this solution. Essentially, I enhanced his proposal to generically handle a bridge between:

    1. A functional web controller; which could receive the web request.
    2. A stream supplier; which could forward any message to a messaging infrastructure.

    This solution encapsulates the concerns described in Oleg example, inside a custom implementation of a Supplier. Such implementation exposes an API to trigger the Supplier to emit a message passed as parameter. Such a class would look like the following:

    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    
    import java.util.function.Supplier;
    
    import reactor.core.publisher.EmitterProcessor;
    import reactor.core.publisher.Flux;
    
    public class StreamSupplier implements Supplier<Flux<?>> {
    
        private static final String SPRING_CLOUD_STREAM_SENDTO_DESTINATION =
                "spring.cloud.stream.sendto.destination";
    
        public static <T> Message<?> createMessage(T payload, String destination) {
            MessageBuilder<T> builder = MessageBuilder.withPayload(payload);
            if (destination != null && !destination.isEmpty())
                builder.setHeader(SPRING_CLOUD_STREAM_SENDTO_DESTINATION, destination);
            return builder.build();
        }
    
        private String defaultDestination;
        private EmitterProcessor<? super Object> processor = EmitterProcessor.create();
    
        public StreamSupplier() {
            this(null);
        }
    
        public StreamSupplier(String defaultDestination) {
            this.defaultDestination = defaultDestination;
        }
    
        // SEND APIs
    
        public <T> Message<?> sendMessage(T payload) {
            return sendMessage(payload, defaultDestination);
        }
    
        public <T> Message<?> sendMessage(T payload, String destination) {
            return sendBody(createMessage(payload, destination));
        }
    
        public <T> T sendBody(T body) {
            processor.onNext(body);
            return body;
        }
    
        /**
         * Returns {@link EmitterProcessor} used internally to programmatically publish messages onto
         * the output binding associated with this {@link Supplier}. Such programmatic publications
         * are available through the {@code sendXXX} API methods available in this class.
         */
        @Override
        public Flux<?> get() {
            return processor;
        }
    }
    

    Then a developer only has to:

    1. Register an instance of this particular Supplier implementation as a bean in a Spring application; and let spring-cloud-function scan this bean into the FunctionCatalog.
    2. Create a web function that forwards any message to a streaming infrastructure using the previously registered Supplier - which can be configured using all the bells and whistles of spring-cloud-stream.

    The following example demonstrate this:

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Controller;
    
    import java.util.function.Function;
    import java.util.function.Supplier;
    
    import reactor.core.publisher.Flux;
    
    @SpringBootApplication
    @Controller
    public class MyApp {
    
        public static void main(String[] args) {
            SpringApplication.run(MyApp.class,
                    "--spring.cloud.function.definition=streamSupplierFunction;webToStreamFunction");
        }
    
        // Functional Web Controller
        @Bean
        public Function<String, String> webToStreamFunction() {
            return msg -> streamSupplier().sendBody(msg);
        }
    
        // Functional Stream Supplier
        @Bean
        public Supplier<Flux<?>> streamSupplierFunction() {
            return new StreamSupplier();
        }
    
        // DOUBLE REGISTRATION TO AVOID POLLABLE CONFIGURATION
        // LIMITATION OF SPRING-CLOUD-FUNCTION
        @Bean
        public StreamSupplier streamSupplier() {
            return (StreamSupplier) streamSupplierFunction();
        }
    }
    

    Again, I want to thanks Oleg for providing the required details I was looking for to build this comprehensive solution.

    Complete code on GitHub