Tags: rx-java, rx-java2

Split a Flowable in 2, process 2 streams, but one depends on the other?


I have the following situation: I need to process a stream that I receive as a Flowable. Every item on the stream carries a piece of data, but only the first item contains metadata. The function that processes the data stream needs that metadata to do so.

Something like:

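// Stream items look like this
class StreamItem {
   Meta meta;   // only set on the first item of the stream
   Data data;

   Meta getMeta() { return meta; }
   Data getData() { return data; }
}

// Processor looks like this
Single<Result> processStream(Meta meta, Flowable<Data> data);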

I receive a Flowable<StreamItem>. I've tried to do something like:

Flowable<StreamItem> input = ...

ConnectableFlowable<StreamItem> multi = input.publish();

Single<Meta> streamMeta = multi.firstOrError().map(StreamItem::getMeta);

Flowable<Data> streamData = multi.map(StreamItem::getData);

multi.connect();

Single<Result> result = streamMeta.flatMap(meta ->  processStream(meta,streamData));

After that I just return result.ignoreElement() (we need the side effects of the processing, not the Result object itself), and the client, which is the entry point, maps that Completable into the standard response for the call. I'm not sure whether that last part is relevant.

I have also tried:

Flowable<Result> res = input.publish(
   flow -> {
     Single<Meta> meta = flow.firstOrError().map(StreamItem::getMeta);
     Flowable<Data> data = flow.map(StreamItem::getData);
     return meta.flatMap(met -> processStream(met,data)).toFlowable();
   });

and then return res.ignoreElements() for the same Completable process described above.

I have been able to process either the Meta alone, or stub out the Meta and process the data stream, but as soon as I wire in both as described above, it seems no processing gets done. I suspect the problem is that I'm nesting processing of the same stream. I'm probably misunderstanding how all this works (I'm pretty new to Rx), so if anybody has a better idea of how to achieve this, I'd love to hear it!


Solution

  • Changing some things around a little (here the processor handles one data value at a time rather than a whole Flowable), I think you can leverage Flowable::withLatestFrom( Flowable, BiFunction ).

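    // Imports assumed for RxJava 2.x and JUnit 4
    import io.reactivex.Flowable;
    import io.reactivex.Maybe;
    import java.util.UUID;
    import java.util.concurrent.atomic.AtomicBoolean;
    import org.junit.Test;

    // Stream items look like this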
    class StreamItem
    {
        String meta;
        Integer data;
    
        public String getMeta()
        {
            return meta;
        }
    
        public Integer getData()
        {
            return data;
        }
    }
    
    // Processor looks like this
    interface Processor
    {
        String processStream( String meta, Integer data );
    }
    
    @Test
    public void testFlowable()
    {
        // Set up mock input:
        AtomicBoolean first = new AtomicBoolean( true );
    
        Flowable<StreamItem> input = Flowable.generate( emitter -> {
    
            StreamItem item = new StreamItem();
            item.data = (int)( Math.random() * 100 );
    
            if ( first.getAndSet( false )) {
                item.meta = UUID.randomUUID().toString();
            }
    
            emitter.onNext( item );
        } );
    
        // Mock processor:
        Processor processor = ( meta, data ) -> meta + " : " + data;
    
        // Set up rx pipeline:
        Flowable<StreamItem> multi = input.share();
        Maybe<String> streamMeta = multi.firstElement().map( StreamItem::getMeta );
    
        Flowable<String> result = multi.map( StreamItem::getData )
            .withLatestFrom( streamMeta.toFlowable(),
                    ( data, meta ) -> processor.processStream( meta, data ));
    
        // Subscribe:
        result.take( 5 ).blockingSubscribe( System.out::println );
    }
    

    Output:

    3fba00bd-027b-4802-8b7d-674497d72052 : 14
    3fba00bd-027b-4802-8b7d-674497d72052 : 72
    3fba00bd-027b-4802-8b7d-674497d72052 : 47
    3fba00bd-027b-4802-8b7d-674497d72052 : 14
    3fba00bd-027b-4802-8b7d-674497d72052 : 93
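
For checking what this pipeline is meant to compute, independent of subscription timing, the pairing can also be written as a plain loop. This is only a reference sketch in plain Java (so it runs without RxJava on the classpath); `FirstMetaPairing`, `Item`, and `pairWithFirstMeta` are hypothetical names and are not part of RxJava or of the code above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

class FirstMetaPairing {

    // Hypothetical stand-in for StreamItem: meta is non-null only on the first item.
    static final class Item {
        final String meta;
        final int data;

        Item(String meta, int data) {
            this.meta = meta;
            this.data = data;
        }
    }

    // Pairs every data value with the metadata carried by the first item.
    static List<String> pairWithFirstMeta(List<Item> items,
                                          BiFunction<String, Integer, String> processor) {
        List<String> out = new ArrayList<>();
        if (items.isEmpty()) {
            return out; // no first item, so no metadata and nothing to process
        }
        String meta = items.get(0).meta;
        for (Item item : items) {
            out.add(processor.apply(meta, item.data));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Item> items = Arrays.asList(
                new Item("m-1", 14), new Item(null, 72), new Item(null, 47));
        // Same mock processor shape as above: meta + " : " + data
        System.out.println(pairWithFirstMeta(items, (meta, data) -> meta + " : " + data));
        // prints [m-1 : 14, m-1 : 72, m-1 : 47]
    }
}
```

A list like the one returned here can serve as the expected value in a unit test of the reactive version, assuming the test input is finite and deterministic.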
    

    Update based on feedback:

    This seems to do the trick if you really need your data Flowable alongside a concrete metadata object:

    // Stream items look like this
    class StreamItem
    {
        String meta;
        Integer data;
    
        public String getMeta()
        {
            return meta;
        }
    
        public Integer getData()
        {
            return data;
        }
    }
    
    // Processor looks like this
    interface Processor
    {
        String processStream( String meta, Flowable<Integer> data );
    }
    
    @Test
    public void testFlowable()
    {
        // Set up mock input:
        AtomicBoolean first = new AtomicBoolean( true );
    
        Flowable<StreamItem> input = Flowable.generate( emitter -> {
    
            StreamItem item = new StreamItem();
            item.data = (int)( Math.random() * 100 );
    
            if ( first.getAndSet( false )) {
                item.meta = UUID.randomUUID().toString();
            }
    
            emitter.onNext( item );
        } );
    
        // Mock processor:
        Processor processor = ( meta, data ) -> {
            System.out.println( meta );
            data.subscribe( System.out::println );
            return meta;
        };
    
        // Set up rx pipeline:
        Flowable<StreamItem> multi = input.take( 5 ).share();
        Maybe<String> streamMeta = multi.firstElement().map( StreamItem::getMeta );
    
        streamMeta.map( meta ->
            processor.processStream( meta, multi.map( StreamItem::getData )))
        .subscribe();
    }
    

    Output:

    3421c5f6-8554-43ce-aa69-e6cef9c1ed89
    47
    46
    74
    59
    57
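
The shape of this second variant (read the metadata from the first item, then hand the processor the metadata together with the whole data sequence) can be modelled in plain Java with an `Iterator` standing in for the `Flowable`. This is a sketch under that assumption, not RxJava code: `HeaderThenBody`, `Item`, and `process` are hypothetical names. The source is read exactly once, the first item's data value is included, and an empty source fails, much like `firstOrError()` in the question.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

class HeaderThenBody {

    // Hypothetical stand-in for StreamItem: meta is non-null only on the first item.
    static final class Item {
        final String meta;
        final int data;

        Item(String meta, int data) {
            this.meta = meta;
            this.data = data;
        }
    }

    // Mirrors processStream(meta, dataStream), with Iterator standing in for Flowable.
    interface Processor<R> {
        R processStream(String meta, Iterator<Integer> data);
    }

    static <R> R process(Iterator<Item> input, Processor<R> processor) {
        if (!input.hasNext()) {
            throw new NoSuchElementException("empty input: no metadata item");
        }
        Item first = input.next();
        // Lazy view over the data: the first item's value, then the rest of the input.
        Iterator<Integer> data = new Iterator<Integer>() {
            private boolean firstPending = true;

            @Override
            public boolean hasNext() {
                return firstPending || input.hasNext();
            }

            @Override
            public Integer next() {
                if (firstPending) {
                    firstPending = false;
                    return first.data;
                }
                return input.next().data;
            }
        };
        return processor.processStream(first.meta, data);
    }

    public static void main(String[] args) {
        List<Item> items = Arrays.asList(
                new Item("m-1", 47), new Item(null, 46), new Item(null, 74));
        int sum = process(items.iterator(), (meta, data) -> {
            int total = 0;
            while (data.hasNext()) {
                total += data.next();
            }
            return total;
        });
        System.out.println("m-1 -> " + sum); // prints m-1 -> 167
    }
}
```

Comparing a reactive implementation against a model like this on a small fixed input is one way to confirm that no data value is dropped or processed twice.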