Tags: java, spring-boot, spring-cloud-stream, spring-cloud-dataflow, spring-cloud-function

How to log around Spring Cloud Stream functions?


I have a simple Spring Cloud Stream function:

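package com.example;

import java.util.function.Function;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;

@Configuration
public class Processor {

    @Bean
    public Function<Message<String>, Message<String>> mapper() {
        return msg -> {
            // do stuff, then return the resulting message
            return msg;
        };
    }

}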

I want to log the input message and the output message transparently. I tried @GlobalChannelInterceptor exactly as suggested in this answer, but with no luck: both the input and output messages are intercepted in the preSend, postSend, and afterSendCompletion methods only after the function has completed.

I also tried using AspectJ as follows:

@Slf4j
@Aspect
@Component
public class MyAspect {

    @Before("execution(* com.example.Processor.*(..)) && args(message,..)")
    public void logBefore(JoinPoint joinPoint, Message<?> message) {
        log.info("before {}", message);
    }

    @Around("execution(* com.example.Processor.*(..)) && args(message,..)")
    public Object logAround(ProceedingJoinPoint joinPoint, Message<?> message) throws Throwable {
        log.info("around {}", message);
        return joinPoint.proceed();
    }

    @After("execution(* com.example.Processor.*(..)) && args(message,..)")
    public void logAfter(JoinPoint joinPoint, Message<?> message) {
        log.info("after {}", message);
    }

}

But these hooks are never called. No logs are shown, and I can't hit them with a debugger.

Any suggestions on how I can log around Spring Cloud Functions?


Solution

  • I think the issue you see with the global interceptor is that you're only intercepting the output channel. The function is invoked via the input channel.

    You can also achieve this with function composition. Simply define another function called log and then compose it before and after the actual function.

    For example:

    @Bean
    public Function<Message<?>, Message<?>> log() {
        return m -> {
            System.out.println("Logging " + m);
            return m;
        };
    }
    ...
    spring.cloud.function.definition=log|uppercase|log
    

    You can then also map the generated binding names to simpler, more manageable ones:

    spring.cloud.stream.function.bindings.log|uppercase|log-in-0=input
    spring.cloud.stream.function.bindings.log|uppercase|log-out-0=output
    

    And you'll get the following in your logs:

    Logging GenericMessage [payload=byte[5], headers={amqp_receivedDeliveryMode=PERSISTENT...
    Logging GenericMessage [payload=HELLO, headers={amqp_receivedDeliveryMode=PERSISTENT,...
    

    The benefit of this approach is that you can easily control when to log and when not to, simply by changing properties. It is also per function (not global).

    That said, feel free to create an issue so we can make this more configurable.
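
    To make the ordering behind log|uppercase|log concrete, here is a minimal plain-Java sketch that uses only java.util.function and no Spring at all. It is an illustration of the idea, not how Spring Cloud Function composes beans internally. The class name LoggingCompositionSketch and the helpers log(...) and withLogging(...) are hypothetical names introduced for this example, and plain strings stand in for Message<?> payloads.

```java
import java.util.function.Consumer;
import java.util.function.Function;

// Plain-Java sketch (no Spring): shows the call order that a definition
// such as log|uppercase|log is meant to produce.
class LoggingCompositionSketch {

    // Hypothetical helper: a pass-through function that reports its input
    // to the given sink and returns it unchanged, like the log() bean above.
    static <T> Function<T, T> log(Consumer<Object> sink) {
        return value -> {
            sink.accept(value);
            return value;
        };
    }

    // Hypothetical helper: composes log -> target -> log, so both the input
    // and the output of the target function are reported to the sink.
    static <I, O> Function<I, O> withLogging(Function<I, O> target, Consumer<Object> sink) {
        return LoggingCompositionSketch.<I>log(sink)
                .andThen(target)
                .andThen(LoggingCompositionSketch.<O>log(sink));
    }

    public static void main(String[] args) {
        // Stand-in for the "uppercase" function from the example above.
        Function<String, String> uppercase = s -> s.toUpperCase();

        Function<String, String> composed =
                withLogging(uppercase, v -> System.out.println("Logging " + v));

        // Logs the input first, then the transformed output.
        composed.apply("hello");
    }
}
```

    The first log step sees the value before the function runs and the second sees the result, which is why the sample log output above shows the raw payload first and HELLO second.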