Tags: java, concurrency, promise, ratpack

Ratpack's Promise.cache with multiple downstream promises in ParallelBatch


I'm running into a NullPointerException in the guts of Ratpack when using Ratpack's Promise.cache in combination with multiple downstream promises and ParallelBatch, and it's not clear to me from the documentation whether my usage is incorrect, or if this represents a bug in Ratpack.

Here's a reduced test case that demonstrates the problem:

@Test
public void foo() throws Exception {
    List<Promise<Integer>> promises = new ArrayList<>();

    for (int i = 0; i < 25; i++) {
        Promise<Integer> p = Promise.value(12);
        p = p.cache();
        promises.add(p.map(v -> v + 1));
        promises.add(p.map(v -> v + 2));
    }

    final List<Integer> results = ExecHarness.yieldSingle(c ->
            ParallelBatch.of(promises).yield()
    ).getValueOrThrow();
}

Running this test 10000 times locally results in a failure rate of about 10 / 10000, with a NullPointerException that looks like this:

java.lang.NullPointerException
    at ratpack.exec.internal.CachingUpstream.yield(CachingUpstream.java:93)
    at ratpack.exec.internal.CachingUpstream.tryDrain(CachingUpstream.java:65)
    at ratpack.exec.internal.CachingUpstream.lambda$connect$0(CachingUpstream.java:116)
    at ratpack.exec.internal.CachingUpstream$$Lambda$58/1438461739.connect(Unknown Source)
    at ratpack.exec.internal.DefaultExecution.lambda$null$2(DefaultExecution.java:122)
    at ratpack.exec.internal.DefaultExecution$$Lambda$33/2092087501.execute(Unknown Source)
    at ratpack.exec.internal.DefaultExecution$SingleEventExecStream.exec(DefaultExecution.java:489)
    at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:216)
    at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:209)
    at ratpack.exec.internal.DefaultExecution.drain(DefaultExecution.java:179)
    at ratpack.exec.internal.DefaultExecution.<init>(DefaultExecution.java:92)
    at ratpack.exec.internal.DefaultExecController$1.lambda$start$0(DefaultExecController.java:195)
    at ratpack.exec.internal.DefaultExecController$1$$Lambda$7/1411892748.call(Unknown Source)
    at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
    at ratpack.exec.internal.DefaultExecController$ExecControllerBindingThreadFactory.lambda$newThread$0(DefaultExecController.java:136)
    at ratpack.exec.internal.DefaultExecController$ExecControllerBindingThreadFactory$$Lambda$8/1157058691.run(Unknown Source)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:745)

Not using cache() in this test case makes the issue go away, as does subscribing to each cached promise only once.
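
For example, both of these variations make the failure go away for me (a sketch of the two changes just described, replacing only the loop body of the test above):

for (int i = 0; i < 25; i++) {
    // Variant 1: drop cache() entirely -- each subscription simply re-runs the upstream
    Promise<Integer> p1 = Promise.value(12);
    promises.add(p1.map(v -> v + 1));
    promises.add(p1.map(v -> v + 2));

    // Variant 2: keep cache(), but subscribe to the cached promise only once
    Promise<Integer> p2 = Promise.value(12).cache();
    promises.add(p2.map(v -> v + 1));
}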

My question is: is this incorrect usage of Ratpack's API, or does it represent a bug in the framework? If the former, can you point me towards something in the docs that explains why this usage is wrong?


Solution

  • Even though your example is not the best use case for caching promises (recreating and caching a promise that holds the same value on every iteration does not make much sense), you have actually found a race condition bug in the CachingUpstream<T> class.

    I did some experiments to figure out what is happening; here are my findings. First, I created a promise of the value 12 that provides a custom (more verbose) implementation of the CachingUpstream<T> object. I took the body of Promise.value(12) and overrode inline the cacheResultIf(Predicate<? super ExecResult<T>> shouldCache) method, which by default returns a CachingUpstream<T> instance:

    Promise<Integer> p = new DefaultPromise<Integer>(down -> DefaultExecution.require().delimit(down::error, continuation ->
            continuation.resume(() -> down.success(12))
    )) {
        @Override
        public Promise<Integer> cacheResultIf(Predicate<? super ExecResult<Integer>> shouldCache) {
            return transform(up -> {
                return new TestCachingUpstream<>(up, shouldCache.function(Duration.ofSeconds(-1), Duration.ZERO));
            });
        }
    };
    

    Next, I created a TestCachingUpstream<T> class by copying the body of the original class and adding a few things:

    • I gave every TestCachingUpstream<T> instance an internal ID (a random UUID) to make it easier to trace the execution of a promise.
    • I added verbose log messages at the points where specific things happen during promise execution.

    I did not change the implementation of any method; I only wanted to trace the execution flow while keeping the original implementation as is. My custom class looked like this:

    private static class TestCachingUpstream<T> implements Upstream<T> {
        private final String id = UUID.randomUUID().toString();
    
        private Upstream<? extends T> upstream;
    
        private final Clock clock;
        private final AtomicReference<TestCachingUpstream.Cached<? extends T>> ref = new AtomicReference<>();
        private final Function<? super ExecResult<T>, Duration> ttlFunc;
    
        private final AtomicBoolean pending = new AtomicBoolean();
        private final AtomicBoolean draining = new AtomicBoolean();
        private final Queue<Downstream<? super T>> waiting = PlatformDependent.newMpscQueue();
    
        public TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl) {
            this(upstream, ttl, Clock.systemUTC());
        }
    
        @VisibleForTesting
        TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl, Clock clock) {
            this.upstream = upstream;
            this.ttlFunc = ttl;
            this.clock = clock;
        }
    
        private void tryDrain() {
            if (draining.compareAndSet(false, true)) {
                try {
                    TestCachingUpstream.Cached<? extends T> cached = ref.get();
                    if (needsFetch(cached)) {
                        if (pending.compareAndSet(false, true)) {
                            Downstream<? super T> downstream = waiting.poll();
    
                            System.out.printf("[%s] [%s] no pending execution and downstream is %s and cached is %s...%n", id, Thread.currentThread().getName(), downstream == null ? "null" : "not null", cached);
    
                            if (downstream == null) {
                                pending.set(false);
                            } else {
                                try {
                                    yield(downstream);
                                } catch (Throwable e) {
                                    System.out.printf("[%s] [%s] calling receiveResult after catching exception %s%n", id, Thread.currentThread().getName(), e.getClass());
                                    receiveResult(downstream, ExecResult.of(Result.error(e)));
                                }
                            }
                        }
                    } else {
                        System.out.printf("[%s] [%s] upstream does not need fetching...%n", id, Thread.currentThread().getName());
                        Downstream<? super T> downstream = waiting.poll();
                        while (downstream != null) {
                            downstream.accept(cached.result);
                            downstream = waiting.poll();
                        }
                    }
                } finally {
                    draining.set(false);
                }
            }
    
            if (!waiting.isEmpty() && !pending.get() && needsFetch(ref.get())) {
                tryDrain();
            }
        }
    
        private boolean needsFetch(TestCachingUpstream.Cached<? extends T> cached) {
            return cached == null || (cached.expireAt != null && cached.expireAt.isBefore(clock.instant()));
        }
    
        private void yield(final Downstream<? super T> downstream) throws Exception {
            System.out.printf("[%s] [%s] calling yield... %s %n", id, Thread.currentThread().getName(), upstream == null ? "upstream is null..." : "");
            upstream.connect(new Downstream<T>() {
                public void error(Throwable throwable) {
                    System.out.printf("[%s] [%s] upstream.connect.error%n", id, Thread.currentThread().getName());
                    receiveResult(downstream, ExecResult.of(Result.<T>error(throwable)));
                }
    
                @Override
                public void success(T value) {
                    System.out.printf("[%s] [%s] upstream.connect.success%n", id, Thread.currentThread().getName());
                    receiveResult(downstream, ExecResult.of(Result.success(value)));
                }
    
                @Override
                public void complete() {
                    System.out.printf("[%s] [%s] upstream.connect.complete%n", id, Thread.currentThread().getName());
                    receiveResult(downstream, CompleteExecResult.get());
                }
            });
        }
    
        @Override
        public void connect(Downstream<? super T> downstream) throws Exception {
            TestCachingUpstream.Cached<? extends T> cached = this.ref.get();
            if (needsFetch(cached)) {
                Promise.<T>async(d -> {
                    waiting.add(d);
                    tryDrain();
                }).result(downstream::accept);
            } else {
                downstream.accept(cached.result);
            }
        }
    
        private void receiveResult(Downstream<? super T> downstream, ExecResult<T> result) {
            Duration ttl = Duration.ofSeconds(0);
            try {
                ttl = ttlFunc.apply(result);
            } catch (Throwable e) {
                if (result.isError()) {
                    //noinspection ThrowableResultOfMethodCallIgnored
                    result.getThrowable().addSuppressed(e);
                } else {
                    result = ExecResult.of(Result.error(e));
                }
            }
    
            Instant expiresAt;
            if (ttl.isNegative()) {
                expiresAt = null; // eternal
                System.out.printf("[%s] [%s] releasing upstream... (%s) %n", id, Thread.currentThread().getName(), result.toString());
                upstream = null; // release
            } else if (ttl.isZero()) {
                expiresAt = clock.instant().minus(Duration.ofSeconds(1));
            } else {
                expiresAt = clock.instant().plus(ttl);
            }
    
            ref.set(new TestCachingUpstream.Cached<>(result, expiresAt));
            pending.set(false);
    
            downstream.accept(result);
    
            tryDrain();
        }
    
        static class Cached<T> {
            final ExecResult<T> result;
            final Instant expireAt;
    
            Cached(ExecResult<T> result, Instant expireAt) {
                this.result = result;
                this.expireAt = expireAt;
            }
        }
    }
    

    I reduced the number of iterations in the for-loop from 25 to 3 to keep the console output concise.

    Successful test execution (no race condition)

    Let's take a look at what the flow of a correct execution looks like:

    [c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] no pending execution and downstream is not null and cached is null...
    [c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] calling yield...  
    [c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] upstream.connect.success
    [c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
    [c9a70043-36b8-44f1-b8f3-dd8ce30ca0ef] [ratpack-compute-22-2] upstream does not need fetching...
    [5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] no pending execution and downstream is not null and cached is null...
    [5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] calling yield...  
    [5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] upstream.connect.success
    [5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
    [5c740555-3638-4f3d-8a54-162d37bcb695] [ratpack-compute-22-4] upstream does not need fetching...
    [c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] no pending execution and downstream is not null and cached is null...
    [c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] calling yield...  
    [c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] upstream.connect.success
    [c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
    [c47a8f8a-5f93-4d2f-ac18-63ed76848b9f] [ratpack-compute-22-6] upstream does not need fetching...
    

    As you can see, the cached promise created in each iteration produces 5 console log lines.

    • When the tryDrain() method gets called for the first time, there is no cached result yet, so execution falls through to the yield(downstream) call.
    • yield(downstream) completes successfully, and receiveResult(downstream, ExecResult.of(Result.success(value))) is called from inside the success callback.
    • Promise.cache() uses an infinite expiration date by passing a negative duration, which is why receiveResult() releases the upstream object by setting it to null (see the TTL sketch below).
    • Before completing, receiveResult() stores the cached result in the internal ref and calls tryDrain() right before exiting the method.
    • That tryDrain() call sees the previously cached result for the next subscription to the cached promise (p.map(v -> v + 2)), so it passes the cached result directly to the downstream.

    This scenario repeats for all 3 promises created inside the for-loop.
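
    As a quick aside, the TTL branches in receiveResult() explain why the upstream gets released and never fetched again: a negative TTL maps to a null ("eternal") expireAt, so needsFetch() never requests a re-fetch once a result is cached, while a zero TTL maps to an already-expired instant and forces a re-fetch every time. Here is a tiny standalone check of that mapping (an illustration only, mirroring the copied code above):

    import java.time.Clock;
    import java.time.Duration;
    import java.time.Instant;

    // Illustration only: the same TTL branches as receiveResult() and the same
    // rule needsFetch() applies to a non-null cached entry.
    public class TtlSketch {
        static Instant expiresAt(Duration ttl, Clock clock) {
            if (ttl.isNegative()) {
                return null;                                         // eternal: the upstream can be released
            } else if (ttl.isZero()) {
                return clock.instant().minus(Duration.ofSeconds(1)); // already expired: always re-fetch
            } else {
                return clock.instant().plus(ttl);                    // expires after the given ttl
            }
        }

        static boolean needsFetch(Instant expireAt, Clock clock) {
            return expireAt != null && expireAt.isBefore(clock.instant());
        }

        public static void main(String[] args) {
            Clock clock = Clock.systemUTC();
            System.out.println(needsFetch(expiresAt(Duration.ofSeconds(-1), clock), clock)); // false -> served from cache forever
            System.out.println(needsFetch(expiresAt(Duration.ZERO, clock), clock));          // true  -> re-fetched every time
            System.out.println(needsFetch(expiresAt(Duration.ofSeconds(60), clock), clock)); // false -> until it expires
        }
    }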

    Failed test execution (race condition)

    Running the test with those System.out.printf() calls in place made it fail somewhat less often, mostly because the I/O consumes some CPU cycles and gives the unsynchronized part of the code a few more cycles to avoid the race. However, it still happens, so let's take a look at what the output of a failed test looks like:

    [088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] no pending execution and downstream is not null and cached is null...
    [088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] calling yield...  
    [088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] upstream.connect.success
    [088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
    [088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] no pending execution and downstream is not null and cached is null...
    [088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] calling yield... upstream is null... 
    [4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] no pending execution and downstream is not null and cached is null...
    [4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] calling yield...  
    [4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] upstream.connect.success
    [4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
    [4f00c50a-4706-4d22-b905-096934b7c374] [ratpack-compute-786-4] upstream does not need fetching...
    [8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] no pending execution and downstream is not null and cached is null...
    [8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] calling yield...  
    [8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] upstream.connect.success
    [8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
    [8b27d16f-dc91-4341-b630-cf5c959c45e8] [ratpack-compute-786-6] upstream does not need fetching...
    [088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] calling receiveResult after catching exception class java.lang.NullPointerException
    [088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-3] releasing upstream... (ExecResult{complete=false, error=java.lang.NullPointerException, value=null}) 
    
    java.lang.NullPointerException
        at app.AnotherPromiseTest$TestCachingUpstream.yield(AnotherPromiseTest.java:120)
        at app.AnotherPromiseTest$TestCachingUpstream.tryDrain(AnotherPromiseTest.java:89)
        at app.AnotherPromiseTest$TestCachingUpstream.lambda$connect$0(AnotherPromiseTest.java:146)
        at ratpack.exec.internal.DefaultExecution.lambda$null$2(DefaultExecution.java:122)
        at ratpack.exec.internal.DefaultExecution$SingleEventExecStream.exec(DefaultExecution.java:489)
        at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:216)
        at ratpack.exec.internal.DefaultExecution.exec(DefaultExecution.java:209)
        at ratpack.exec.internal.DefaultExecution.drain(DefaultExecution.java:179)
        at ratpack.exec.internal.DefaultExecution.<init>(DefaultExecution.java:92)
        at ratpack.exec.internal.DefaultExecController$1.lambda$start$0(DefaultExecController.java:195)
        at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:309)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
        at ratpack.exec.internal.DefaultExecController$ExecControllerBindingThreadFactory.lambda$newThread$0(DefaultExecController.java:136)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
    

    This is the output of a failed test. I ran it inside IntelliJ IDEA with the test configured to repeat until failure. It took some time to make the test fail, but after running it a couple of times it finally failed at around iteration 1500. In this case the race condition hit the first promise created in the for-loop. You can see that after the upstream object was released inside the receiveResult() method

    [088a234e-17d0-4f3a-bb7c-ec6e4a464fa2] [ratpack-compute-786-2] releasing upstream... (ExecResult{complete=false, error=null, value=12}) 
    

    and tryDrain() was called right before that method returned, the next execution of the cached promise did not yet see the previously cached result and ran down to the yield(downstream) method again, even though the upstream object had already been released by setting it to null. yield(downstream) expects the upstream object to be initialized, so it throws the NullPointerException.
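
    To make the interleaving easier to follow, here is a deterministic, single-threaded re-enactment of one ordering that is consistent with the log above (an illustration only, not Ratpack code; the field names merely mirror TestCachingUpstream): the second subscriber snapshots ref at the top of tryDrain() before the first result is published, the first subscriber then releases upstream, publishes the result and clears pending, and the second subscriber carries on with its stale snapshot:

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicReference;

    public class CacheRaceSketch {
        static Object upstream = new Object();                     // released after the first (eternal) result
        static final AtomicReference<Object> ref = new AtomicReference<>();
        static final AtomicBoolean pending = new AtomicBoolean();

        public static void main(String[] args) {
            pending.set(true);                                     // subscriber A starts the only real fetch
            Object cachedSeenByB = ref.get();                      // subscriber B's tryDrain() snapshots ref -> still null

            // subscriber A's receiveResult() runs with a negative TTL
            upstream = null;                                       // release the upstream
            ref.set("ExecResult{value=12}");                       // publish the cached result
            pending.set(false);                                    // allow the next drain

            // subscriber B continues with its stale snapshot: needsFetch() looks true,
            // the pending flag is free again, and yield() dereferences a null upstream
            if (cachedSeenByB == null && pending.compareAndSet(false, true)) {
                System.out.println("B would call upstream.connect(...) on: " + upstream); // prints null -> NPE in the real code
            }
        }
    }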

    I also tried to debug this method:

    private boolean needsFetch(TestCachingUpstream.Cached<? extends T> cached) {
        return cached == null || (cached.expireAt != null && cached.expireAt.isBefore(clock.instant()));
    }
    

    This is the method that decides whether the cached promise needs to be fetched. However, whenever I added any logging statements there, it started causing a StackOverflowError. I'm guessing that in rare cases cached.expireAt.isBefore(clock.instant()) returns false; the cached object itself comes from an AtomicReference, so it should be passed correctly between method executions.

    And here is the full test class I used in my experiments:

    import com.google.common.annotations.VisibleForTesting;
    import io.netty.util.internal.PlatformDependent;
    import org.junit.Test;
    import ratpack.exec.*;
    import ratpack.exec.internal.CompleteExecResult;
    import ratpack.exec.internal.DefaultExecution;
    import ratpack.exec.internal.DefaultPromise;
    import ratpack.exec.util.ParallelBatch;
    import ratpack.func.Function;
    import ratpack.func.Predicate;
    import ratpack.test.exec.ExecHarness;
    
    import java.time.Clock;
    import java.time.Duration;
    import java.time.Instant;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Queue;
    import java.util.UUID;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicReference;
    
    public class AnotherPromiseTest {
    
        @Test
        public void foo() throws Exception {
            List<Promise<Integer>> promises = new ArrayList<>();
    
            for (int i = 0; i < 3; i++) {
                Promise<Integer> p = new DefaultPromise<Integer>(down -> DefaultExecution.require().delimit(down::error, continuation ->
                        continuation.resume(() -> down.success(12))
                )) {
                    @Override
                    public Promise<Integer> cacheResultIf(Predicate<? super ExecResult<Integer>> shouldCache) {
                        return transform(up -> {
                            return new TestCachingUpstream<>(up, shouldCache.function(Duration.ofSeconds(-1), Duration.ZERO));
                        });
                    }
                };
    
                p = p.cache();
                promises.add(p.map(v -> v + 1));
                promises.add(p.map(v -> v + 2));
            }
    
            ExecHarness.yieldSingle(c -> ParallelBatch.of(promises).yield()).getValueOrThrow();
        }
    
        private static class TestCachingUpstream<T> implements Upstream<T> {
            private final String id = UUID.randomUUID().toString();
    
            private Upstream<? extends T> upstream;
    
            private final Clock clock;
            private final AtomicReference<TestCachingUpstream.Cached<? extends T>> ref = new AtomicReference<>();
            private final Function<? super ExecResult<T>, Duration> ttlFunc;
    
            private final AtomicBoolean pending = new AtomicBoolean();
            private final AtomicBoolean draining = new AtomicBoolean();
            private final Queue<Downstream<? super T>> waiting = PlatformDependent.newMpscQueue();
    
            public TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl) {
                this(upstream, ttl, Clock.systemUTC());
            }
    
            @VisibleForTesting
            TestCachingUpstream(Upstream<? extends T> upstream, Function<? super ExecResult<T>, Duration> ttl, Clock clock) {
                this.upstream = upstream;
                this.ttlFunc = ttl;
                this.clock = clock;
            }
    
            private void tryDrain() {
                if (draining.compareAndSet(false, true)) {
                    try {
                        TestCachingUpstream.Cached<? extends T> cached = ref.get();
                        if (needsFetch(cached)) {
                            if (pending.compareAndSet(false, true)) {
                                Downstream<? super T> downstream = waiting.poll();
    
                                System.out.printf("[%s] [%s] no pending execution and downstream is %s and cached is %s...%n", id, Thread.currentThread().getName(), downstream == null ? "null" : "not null", cached);
    
                                if (downstream == null) {
                                    pending.set(false);
                                } else {
                                    try {
                                        yield(downstream);
                                    } catch (Throwable e) {
                                        System.out.printf("[%s] [%s] calling receiveResult after catching exception %s%n", id, Thread.currentThread().getName(), e.getClass());
                                        receiveResult(downstream, ExecResult.of(Result.error(e)));
                                    }
                                }
                            }
                        } else {
                            System.out.printf("[%s] [%s] upstream does not need fetching...%n", id, Thread.currentThread().getName());
                            Downstream<? super T> downstream = waiting.poll();
                            while (downstream != null) {
                                downstream.accept(cached.result);
                                downstream = waiting.poll();
                            }
                        }
                    } finally {
                        draining.set(false);
                    }
                }
    
                if (!waiting.isEmpty() && !pending.get() && needsFetch(ref.get())) {
                    tryDrain();
                }
            }
    
            private boolean needsFetch(TestCachingUpstream.Cached<? extends T> cached) {
                return cached == null || (cached.expireAt != null && cached.expireAt.isBefore(clock.instant()));
            }
    
            private void yield(final Downstream<? super T> downstream) throws Exception {
                System.out.printf("[%s] [%s] calling yield... %s %n", id, Thread.currentThread().getName(), upstream == null ? "upstream is null..." : "");
                upstream.connect(new Downstream<T>() {
                    public void error(Throwable throwable) {
                        System.out.printf("[%s] [%s] upstream.connect.error%n", id, Thread.currentThread().getName());
                        receiveResult(downstream, ExecResult.of(Result.<T>error(throwable)));
                    }
    
                    @Override
                    public void success(T value) {
                        System.out.printf("[%s] [%s] upstream.connect.success%n", id, Thread.currentThread().getName());
                        receiveResult(downstream, ExecResult.of(Result.success(value)));
                    }
    
                    @Override
                    public void complete() {
                        System.out.printf("[%s] [%s] upstream.connect.complete%n", id, Thread.currentThread().getName());
                        receiveResult(downstream, CompleteExecResult.get());
                    }
                });
            }
    
            @Override
            public void connect(Downstream<? super T> downstream) throws Exception {
                TestCachingUpstream.Cached<? extends T> cached = this.ref.get();
                if (needsFetch(cached)) {
                    Promise.<T>async(d -> {
                        waiting.add(d);
                        tryDrain();
                    }).result(downstream::accept);
                } else {
                    downstream.accept(cached.result);
                }
            }
    
            private void receiveResult(Downstream<? super T> downstream, ExecResult<T> result) {
                Duration ttl = Duration.ofSeconds(0);
                try {
                    ttl = ttlFunc.apply(result);
                } catch (Throwable e) {
                    if (result.isError()) {
                        //noinspection ThrowableResultOfMethodCallIgnored
                        result.getThrowable().addSuppressed(e);
                    } else {
                        result = ExecResult.of(Result.error(e));
                    }
                }
    
                Instant expiresAt;
                if (ttl.isNegative()) {
                    expiresAt = null; // eternal
                    System.out.printf("[%s] [%s] releasing upstream... (%s) %n", id, Thread.currentThread().getName(), result.toString());
                    upstream = null; // release
                } else if (ttl.isZero()) {
                    expiresAt = clock.instant().minus(Duration.ofSeconds(1));
                } else {
                    expiresAt = clock.instant().plus(ttl);
                }
    
                ref.set(new TestCachingUpstream.Cached<>(result, expiresAt));
                pending.set(false);
    
                downstream.accept(result);
    
                tryDrain();
            }
    
            static class Cached<T> {
                final ExecResult<T> result;
                final Instant expireAt;
    
                Cached(ExecResult<T> result, Instant expireAt) {
                    this.result = result;
                    this.expireAt = expireAt;
                }
            }
        }
    }
    

    Hope it helps.