
Is it possible to pass the Reactor Context to a Flux created from a Sinks?


We're currently using ReactiveSecurityContextHolder, which provides our correct auth details and is used along the Flux stream.

Now we want to decouple things. In the first iteration we use a Sinks as an intermediate 'event hub': an endpoint produces an item to the Sinks.Many.

A listener consumes events from this Sinks and does the heavy work. In this consumer I'd like to use the context that is available on the producing side. I know one can use deferContextual to pass the current context on to another Flux. But is it possible to pass the context to the Flux that results from the Sinks?
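
For reference, the deferContextual pattern mentioned above looks roughly like the sketch below. The "user" key and the class name are placeholders for illustration only; they stand in for whatever ReactiveSecurityContextHolder stores and are not part of our real code.

```java
import reactor.core.publisher.Flux;
import reactor.util.context.Context;

class DeferContextualSketch {

    // Builds a Flux whose content depends on the subscriber's Context.
    // "user" is a hypothetical key used only for this illustration.
    static Flux<String> greeting() {
        return Flux.deferContextual(ctx ->
                Flux.just("hello " + ctx.getOrDefault("user", "anonymous")));
    }

    // Writes the hypothetical key into the Context and reads it back upstream.
    static String demo() {
        return greeting()
                .contextWrite(Context.of("user", "alice"))
                .blockFirst();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "hello alice"
    }
}
```

This works because the Context travels from the subscriber up through a single chain. With a Sinks in between, the producing chain and the consuming chain are separate subscriptions, which is where the question comes from.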

Thanks in advance.

Alex


Solution

  • There is currently no API that exposes the Context on arbitrary Sinks. The challenge with Sinks is that many of them multicast to several Subscribers, and the Context is defined on each Subscriber.

    There is a hack though: Sinks.Many<T> is Scannable, and most concrete implementations should expose their current collection of subscribers through the Stream<Scannable> inners() method. In the case of a unicast sink, scan(Attr.ACTUAL) would also work.

    Two big caveats:

    • these APIs only expose Scannables, which don't give direct access to the Context
    • if the implementation's inner subscriber isn't Scannable, it is replaced in the stream by the Scannable#NOT_SCANNABLE constant

    Most if not all of reactor-core CoreSubscribers are Scannable, but if you connect a custom subscriber which isn't Scannable, even though it has a Context, you won't be able to see it.

    Multicast Sinks in reactor-core tend to wrap downstream subscribers in their own Scannable inner tracker, which would make this approach work.

    Unicast Sinks are a bit different as they directly attach to the downstream Subscriber. So if it is a CoreSubscriber but somehow not a Scannable, you won't be able to see it as a CoreSubscriber and access its Context.
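
For the unicast case, a minimal sketch of the scan(Attr.ACTUAL) route could look like this. The class name, the helper names, and the "user" key are made up for illustration, and the whole thing leans on internal behaviour of reactor-core rather than on a documented contract, so treat it as an experiment and check it against your Reactor version.

```java
import java.util.Optional;

import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.core.publisher.Sinks;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

class UnicastContextSketch {

    // Returns the Context of the sink's single subscriber, if it can be seen.
    // Empty when nobody has subscribed yet or the subscriber is not a
    // Scannable CoreSubscriber.
    static Optional<ContextView> subscriberContext(Sinks.Many<?> sink) {
        Scannable actual = sink.scan(Scannable.Attr.ACTUAL);
        if (actual instanceof CoreSubscriber) {
            return Optional.of(((CoreSubscriber<?>) actual).currentContext());
        }
        return Optional.empty();
    }

    // Subscribes once with a hypothetical "user" entry and looks it up via the sink.
    static Object demo() {
        Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
        sink.asFlux()
            .contextWrite(Context.of("user", "alice"))
            .subscribe();
        return subscriberContext(sink)
                .map(ctx -> ctx.getOrDefault("user", (Object) "unknown"))
                .orElse("no visible subscriber");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that what you get is the Context of whoever subscribed to the sink's Flux, so the consumer has to be the one carrying the relevant entries.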

    To sum up the approach:

    1. call sink.inners() to get a Stream<Scannable>
    2. ensure values are instances of CoreSubscriber (that's the part where things can go wrong)
    3. cast values to CoreSubscriber and call currentContext()
    4. somehow reconcile the various Contexts you got to extract the relevant key-value pair(s)
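
The steps above can be sketched roughly as follows for a multicast sink. All names here (SinkContextSketch, contextOf, valuesFor, the "user" key) are invented for the example. One assumption to flag loudly: when an inner tracker is not itself a CoreSubscriber, the sketch falls back to scanning Attr.ACTUAL on it to reach the wrapped subscriber. That matches my understanding of the reactor-core internals, but it is not a documented guarantee.

```java
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.core.publisher.Sinks;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

class SinkContextSketch {

    // Steps 2 and 3: find a CoreSubscriber behind an inner and read its Context.
    // Returns null when no CoreSubscriber is visible (e.g. NOT_SCANNABLE).
    static ContextView contextOf(Scannable inner) {
        if (inner instanceof CoreSubscriber) {
            return ((CoreSubscriber<?>) inner).currentContext();
        }
        if (inner.isScanAvailable()) {
            // Assumption: the tracker exposes the real subscriber as ACTUAL.
            Scannable actual = inner.scan(Scannable.Attr.ACTUAL);
            if (actual instanceof CoreSubscriber) {
                return ((CoreSubscriber<?>) actual).currentContext();
            }
        }
        return null;
    }

    // Step 1: walk sink.inners() and collect every Context that can be seen.
    static List<ContextView> subscriberContexts(Sinks.Many<?> sink) {
        return sink.inners()
                .map(SinkContextSketch::contextOf)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    // Step 4, one possible reconciliation: the distinct values found for a key.
    static List<Object> valuesFor(Sinks.Many<?> sink, Object key) {
        return subscriberContexts(sink).stream()
                .filter(ctx -> ctx.hasKey(key))
                .map(ctx -> ctx.<Object>get(key))
                .distinct()
                .collect(Collectors.toList());
    }

    // Two subscribers, each with its own hypothetical "user" entry.
    static List<Object> demo() {
        Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
        sink.asFlux().contextWrite(Context.of("user", "alice")).subscribe();
        sink.asFlux().contextWrite(Context.of("user", "bob")).subscribe();
        return valuesFor(sink, "user");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

How to reconcile several Contexts is left open on purpose: returning the distinct values is only one option, and with more than one subscriber there may be no single "right" value.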