Search code examples
spring-integrationspring-integration-dsl

Multiple ThreadStatePropagationChannelInterceptors not possible


I have a Spring Integration setup where I have configured a SecurityContextPropagationChannelInterceptor which propagates the spring security context async channel boundaries. This works well.

I also have some custom ThreadLocal data in addition to the security ThreadLocal context data that I would like to propagate too. I have implemented a custom ThreadStatePropagationChannelInterceptor in a similar manner to how the SecurityContextPropagationChannelInterceptor is implemented, but I have discovered that it is not possible to have more than one ThreadStatePropagationChannelInterceptor as it implements a MessageWithThreadState that has one state object. This state object is the value stored by the first ThreadStatePropagationChannelInterceptor registered and so the second ThreadStatePropagationChannelInterceptor instance which wants to register different data gets a ClassCastException.

What is a better way to propagate this custom data as it seems the ThreadStatePropagationChannelInterceptor is not designed to have more than one piece of state?

2023-09-07 09:51:16,643 [task-scheduler-6] ERROR org.springframework.integration.handler.LoggingHandler - org.springframework.messaging.MessagingException: nested exception is java.lang.ClassCastException: class org.springframework.security.authentication.UsernamePasswordAuthenticationToken cannot be cast to class com....origin.OriginData (org.springframework.security.authentication.UsernamePasswordAuthenticationToken and com....origin.OriginData are in unnamed module of loader org.apache.catalina.loader.ParallelWebappClassLoader @3070f3e6)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:426)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:347)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:340)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:95)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassCastException: class org.springframework.security.authentication.UsernamePasswordAuthenticationToken cannot be cast to class com....origin.OriginData (org.springframework.security.authentication.UsernamePasswordAuthenticationToken and com....origin.OriginData are in unnamed module of loader org.apache.catalina.loader.ParallelWebappClassLoader @3070f3e6)
    at com....origin.OriginDataPropagationChannelInterceptor.populatePropagatedContext(OriginDataPropagationChannelInterceptor.java:25)
    at org.springframework.integration.channel.interceptor.ThreadStatePropagationChannelInterceptor.postReceive(ThreadStatePropagationChannelInterceptor.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.postReceive(AbstractMessageChannel.java:522)
    at org.springframework.integration.channel.AbstractPollableChannel.receive(AbstractPollableChannel.java:108)
    at org.springframework.integration.endpoint.PollingConsumer.receiveMessage(PollingConsumer.java:217)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:443)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:412)
    ... 13 more


Solution

  • So, I have a confirmation that this is a problem. And here is the fix: https://github.com/spring-projects/spring-integration/pull/8735.

    Essentially, you are right: when we use several ThreadStatePropagationChannelInterceptor instances in the same channel, we got the problem restoring contexts populated by them. Just because they are stacking their states, but they are still called from the channel in the order they are provided.

    As a workaround I suggest to look into intercepting different channels: do not use them in the same channel. Or implement a composite thread state to take both values into a single container and then restore them respectively.