Tags: java, caching, rx-java, reactive-streams, backpressure

RxJava multiple consumers of one publisher


I'm writing a middleware HTTP proxy with a cache. The workflow is:

  1. A client requests a resource from the proxy.
  2. If the resource exists in the cache, the proxy returns it.
  3. If the resource isn't in the cache, the proxy fetches it from the remote server and returns it to the client, saving it to the cache while the data is loading.

My interfaces use Publisher<ByteBuffer> throughout: the remote resource exposes one, the cache accepts one to save, and the client connection accepts one as the response:

import io.reactivex.Completable;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;

// remote resource
interface Resource {
  Publisher<ByteBuffer> fetch();
}

// cache
interface Cache {
  Completable save(Publisher<ByteBuffer> data);
}

// client response connection
interface Connection {
  Completable send(Publisher<ByteBuffer> data);
}

My problem is that I need to lazily save this stream of byte buffers to the cache while sending the response to the client, so the client, not the cache, should be responsible for requesting ByteBuffer chunks from the remote resource.

I tried the Flowable::cache operator, but it isn't a good fit: it keeps every received item in memory, which is unacceptable because a cached resource may be several GB in size.

As a workaround, I created a Subject and fed it every item received from the Resource:

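private final Cache cache;
private final Connection out;

Completable proxy(Resource res) {
  Subject<ByteBuffer> mirror = PublishSubject.create();
  return Completable.mergeArray(
    // subscribe the cache to the mirror first, because PublishSubject
    // drops items that arrive before anyone is subscribed
    cache.save(mirror.toFlowable(BackpressureStrategy.BUFFER)),
    // Publisher has no doOnNext, so wrap it in a Flowable; forward the
    // terminal events too, otherwise save() never completes
    out.send(Flowable.fromPublisher(res.fetch())
        .doOnNext(mirror::onNext)
        .doOnError(mirror::onError)
        .doOnComplete(mirror::onComplete))
  );
}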

Is it possible to reuse the same Publisher without caching its items in memory, so that only one subscriber is responsible for requesting items from it?


Solution

  • I might be missing something (I added a comment noting that my version of the Publisher interface is different).

    But here's how I would approach this conceptually.

    I'll simplify the interfaces to work with Integers:

    // remote resource
    interface Resource {
      ConnectableObservable<Integer> fetch();
    }
    
    // cache
    interface Cache {
      Completable save(Integer data);
    }
    
    // client response connection
    interface Connection {
      Completable send(Integer data);
    }
    

    I'd use Observable::publish to create a ConnectableObservable and establish two subscriptions:

    import io.reactivex.Completable;
    import io.reactivex.Observable;
    import io.reactivex.observables.ConnectableObservable;
    import io.reactivex.plugins.RxJavaPlugins;
    import io.reactivex.schedulers.Schedulers;
    import io.reactivex.schedulers.TestScheduler;
    import java.util.concurrent.TimeUnit;
    import org.junit.Test;
    
    @Test
    public void testProxy()
    {
        // Override schedulers so observeOn() and delay() run on a virtual clock:
        TestScheduler s = new TestScheduler();
        
        RxJavaPlugins.setIoSchedulerHandler(
                scheduler -> s );
        RxJavaPlugins.setComputationSchedulerHandler(
                scheduler -> s );
        
        // Mock interfaces:
        Resource resource = () -> Observable.range( 1, 100 )
                .publish();
        
        Cache cache = data -> Completable.fromObservable( Observable.just( data )
                    .delay( 100, TimeUnit.MILLISECONDS )
                    .doOnNext( __ -> System.out.println( String.format( "Caching %d", data ))));
        
        Connection connection = data -> Completable.fromObservable( Observable.just( data )
                    .delay( 500, TimeUnit.MILLISECONDS )
                    .doOnNext( __ -> System.out.println( String.format( "Sending %d", data ))));
        
        // Subscribe to resource:
        ConnectableObservable<Integer> observable = resource.fetch();
        
        observable
            .observeOn( Schedulers.io() )
            .concatMapCompletable( data -> connection.send( data ))
            .subscribe();
        
        observable
            .observeOn( Schedulers.computation() )
            .concatMapCompletable( data -> cache.save( data ))
            .subscribe();
        
        // Start emitting only after both subscriptions exist:
        observable.connect();
        
        // Simulate passage of time:
        s.advanceTimeBy( 10, TimeUnit.SECONDS );
        
        // Restore the default schedulers for other tests:
        RxJavaPlugins.reset();
    }
    

    Output:

    Caching 1
    Caching 2
    Caching 3
    Caching 4
    Sending 1
    Caching 5
    Caching 6
    Caching 7
    Caching 8
    Caching 9
    Sending 2
    Caching 10
    . . . 
    

    Update

    Based on your comments, it sounds like respecting backpressure is important in your case.

    Assuming you have a Publisher that honors backpressure, you can wrap it in a Flowable as follows:

    Flowable<T> flowable = Flowable.fromPublisher( publisher );
    

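    Once you have a Flowable, you can share it between multiple subscribers by calling flowable.publish(), which returns a ConnectableFlowable. It holds a single subscription to the Publisher and requests items from it only as fast as the slowest of its subscribers consumes them, buffering at most a bounded prefetch (Flowable.bufferSize(), 128 items by default) instead of the whole stream. Because nothing is emitted until you call connect(), no subscriber misses events while the subscriptions are being set up.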
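    One consequence of publish() worth checking before relying on it: the shared upstream is driven by the slowest subscriber, not by the fastest. Below is a small sketch, assuming RxJava 2.x (`io.reactivex`) on the classpath; `Flowable.range` stands in for the remote Publisher and the class name `PublishPacingDemo` is made up for illustration. Two `TestSubscriber`s start with zero demand and request by hand: the fast one asks for 10, the slow one for 2 and then 3, so both should end up with exactly the items 1 to 5, while the source only sees a bounded amount of demand.

```java
import io.reactivex.Flowable;
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.subscribers.TestSubscriber;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class PublishPacingDemo {

    // Total demand the source has received, summed over all request(n) calls.
    static final AtomicLong requestedFromSource = new AtomicLong();

    // Returns the items received by the fast and by the slow subscriber.
    static List<List<Integer>> run() {
        requestedFromSource.set(0);

        ConnectableFlowable<Integer> shared = Flowable.range(1, 1_000)
                .doOnRequest(requestedFromSource::addAndGet)  // record upstream demand
                .publish();                                   // single upstream subscription

        // Both subscribers start with zero demand, so requests are driven by hand.
        TestSubscriber<Integer> fast = new TestSubscriber<>(0L);
        TestSubscriber<Integer> slow = new TestSubscriber<>(0L);
        shared.subscribe(fast);
        shared.subscribe(slow);
        shared.connect();  // subscribes to the source and prefetches a bounded batch

        fast.request(10);  // nothing is delivered: slow has no outstanding demand
        slow.request(2);   // both receive 1 and 2
        slow.request(3);   // both receive 3, 4 and 5; fast still has 5 unmet requests

        List<Integer> fastValues = new ArrayList<>(fast.values());
        List<Integer> slowValues = new ArrayList<>(slow.values());
        return Arrays.asList(fastValues, slowValues);
    }

    public static void main(String[] args) {
        List<List<Integer>> received = run();
        System.out.println("fast received: " + received.get(0));
        System.out.println("slow received: " + received.get(1));
        System.out.println("demand seen by source: " + requestedFromSource.get()
                + " (prefetch size is " + Flowable.bufferSize() + ")");
    }
}
```

    For the proxy this means the cache and the client connection throttle each other: memory stays bounded, but a slow cache write also slows the response, and vice versa, so neither consumer alone controls demand.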


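    ConnectableFlowable<ByteBuffer> flowable = Flowable.fromPublisher( publisher ).publish();
    // send() and save() return lazy Completables; assuming each one subscribes to
    // `flowable` when it is itself subscribed, this attaches both consumers:
    Completable.mergeArray( out.send( flowable ), cache.save( flowable ) )
        .subscribe();
    flowable.connect();   // begins emitting values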
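    Putting this together, here is a minimal sketch of the proxy method from the question, assuming RxJava 2.x and the Resource, Cache and Connection interfaces as declared in the question. Instead of calling connect() by hand, it uses publish().autoConnect(2), which connects once two subscribers have arrived, and wraps everything in Completable.defer so each subscription to the returned Completable fetches the resource once. The class name `SharedProxyDemo` and the stub implementations inside run() are hypothetical, only there to make it executable.

```java
import io.reactivex.Completable;
import io.reactivex.Flowable;
import org.reactivestreams.Publisher;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class SharedProxyDemo {

    // The interfaces from the question.
    interface Resource { Publisher<ByteBuffer> fetch(); }
    interface Cache { Completable save(Publisher<ByteBuffer> data); }
    interface Connection { Completable send(Publisher<ByteBuffer> data); }

    static Completable proxy(Resource res, Connection out, Cache cache) {
        // defer: each subscriber of the returned Completable triggers its own fetch
        return Completable.defer(() -> {
            Flowable<ByteBuffer> shared = Flowable.fromPublisher(res.fetch())
                    .publish()
                    .autoConnect(2);  // connect once both consumers have subscribed
            return Completable.mergeArray(out.send(shared), cache.save(shared));
        });
    }

    // What the hypothetical stub consumers observed.
    static final List<Integer> sent = new ArrayList<>();
    static final List<Integer> cached = new ArrayList<>();

    static void run() {
        sent.clear();
        cached.clear();
        // Hypothetical stub: the "remote resource" emits five one-byte buffers.
        Resource res = () -> Flowable.range(1, 5)
                .map(i -> ByteBuffer.wrap(new byte[] { i.byteValue() }));
        // Hypothetical stubs: each consumer records the byte it saw; the
        // absolute get(0) leaves the shared buffer's position untouched.
        Connection out = data -> Flowable.fromPublisher(data)
                .doOnNext(b -> sent.add((int) b.get(0)))
                .ignoreElements();
        Cache cache = data -> Flowable.fromPublisher(data)
                .doOnNext(b -> cached.add((int) b.get(0)))
                .ignoreElements();
        proxy(res, out, cache).blockingAwait();
    }

    public static void main(String[] args) {
        run();
        System.out.println("sent:   " + sent);
        System.out.println("cached: " + cached);
    }
}
```

    This relies on send() and save() each subscribing exactly once to the Publisher they are given. If either implementation never subscribes, autoConnect(2) never connects and the returned Completable never completes. Note also that both consumers see the same ByteBuffer instances, so neither should consume them with relative reads that move the buffer's position.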