Search code examples
spring-cloud-stream

Using stream bridge to send message from one consumer to another consumer


I had seen issue - StreamBridge not sending acknowledgment header and corresponding code as below for GH--2563.

if (ObjectUtils.containsElement(consumerBindingNames, destinationName)) { //GH-2563
                    logger.warn("You seem to be sending data to the input binding.  It is not "
                            + "recommended, since you are bypassing the binder and this the messaging system exposed by the binder.");
                }

My use case is falling in same category but I am not sure what is correct way of doing it. I have below properties in applicaiton.properties

spring.cloud.function.definition=startTransformation;auditLog
spring.cloud.stream.function.bindings.startTransformation-in-0=transformer
spring.cloud.stream.bindings.transformer.group=transformer-group
spring.cloud.stream.bindings.transformer.consumer.concurrency=3
spring.cloud.stream.bindings.transformer.consumer.maxAttempts = 1

spring.cloud.stream.function.bindings.auditLog-in-0=audit
spring.cloud.stream.bindings.audit.group=audit-group
spring.cloud.stream.bindings.audit.consumer.concurrency=1
spring.cloud.stream.bindings.audit.consumer.maxAttempts=1

Explaining with diagram enter image description here

pseudo-code

public void startTransformation(Message etlMessage) {
        String message = "Request received for transformation " + etlMessage;
        streamBridge.send("audit",messageReceivedEvent);
        //complex logic.
        streamBridge.send("audit",messageCompletedEvent);
    }

public void rss(Message ingestion) {
            String message = "ingesting message";
            streamBridge.send("audit",messageReceivedEvent);
            //complex logic.
            streamBridge.send("audit",messageCompletedEvent);
           streamBridge.send("transformer",messageCompletedEvent);

        }

While sending message I am getting warning and I understand why because the destination and consumer binding name are same.

For resolution, I saw comment "You need an output binding with inbound-stream as its destination.". I am not able to understand it.

The above scenario works if the consumers are at different JVM. I am able to send message without warnings to input bounding of a consumer running as a separate JVM application (i think as it does not have any information about the binding name of other jvm).


Solution

  • Based on the updates and the clarifications you provided in the comments, I assume that you are not providing this audit configuration in the rss app. That may be why you are not seeing the warning in the rss app as this app does not know anything about it's (audit) binding status, and only in the second Boot app.

    Based on the diagram you have above, here is something you can try:

    Instead of sending to a destination (audit) that is same as the binding name, can you use a different destination for this binding? For example:

    
    public void startTransformation(Message etlMessage) {
            String message = "Request received for transformation " + etlMessage;
            streamBridge.send("audit-destination",messageReceivedEvent);
            //complex logic.
            streamBridge.send("audit-destination",messageCompletedEvent);
        }
    
    public void rss(Message ingestion) {
                String message = "ingesting message";
                streamBridge.send("audit-destination",messageReceivedEvent);
                //complex logic.
                streamBridge.send("audit-destination",messageCompletedEvent);
               streamBridge.send("transformer",messageCompletedEvent);
    
            }
    

    Then add this to the configuration in the second app:

    spring.cloud.stream.function.bindings.auditLog-in-0=audit
    spring.cloud.stream.bindings.audit.destination=audit-destination
    spring.cloud.stream.bindings.audit.group=audit-group
    spring.cloud.stream.bindings.audit.consumer.concurrency=1
    spring.cloud.stream.bindings.audit.consumer.maxAttempts=1
    
    

    audit-destination is an arbitrary name and it can be anything.