I have the following situation: I need to process a stream that I receive as a Flowable. Every item on the stream carries a piece of data, but only the first element carries metadata. The function that processes the data stream needs that metadata to do so.
Something like:
// Stream items look like this
class StreamItem {
    Meta meta;   // only populated on the first item
    Data data;
}
// Processor looks like this
Single<Result> processStream(Meta meta, Flowable<Data> data);
I receive a Flowable<StreamItem>. I've tried to do something like:
Flowable<StreamItem> input = ...
ConnectableFlowable<StreamItem> multi = input.publish();
Single<Meta> streamMeta = multi.firstOrError().map(StreamItem::getMeta);
Flowable<Data> streamData = multi.map(StreamItem::getData);
multi.connect();
Single<Result> result = streamMeta.flatMap(meta -> processStream(meta,streamData));
After that I just return result.ignoreElement() (as we need the side effects of the processing but not really the result object), and from the client (which is the entry point) we just map that Completable into the standard response for the call. Not sure if that last part is relevant.
I have also tried:
Flowable<Result> res = input.publish(
    flow -> {
        Single<Meta> meta = flow.firstOrError().map(StreamItem::getMeta);
        Flowable<Data> data = flow.map(StreamItem::getData);
        return meta.flatMap(met -> processStream(met, data)).toFlowable();
    });
and then return res.ignoreElements() for the same Completable process described above.
I have been able to process either the Meta, or stub out the Meta and process the data stream, but as soon as I wire in both as described above, it seems no processing gets done. I think it might be that I'm nesting processing of the same stream? Anyway, I'm probably misunderstanding how all this works (I'm pretty new to Rx), so if anybody has a better idea of how to achieve this, I'd love to hear it!
Changing some things around a little, I think you can leverage Flowable::withLatestFrom( Flowable, BiFunction ).
// Imports assume RxJava 2 (io.reactivex) and JUnit 4; adjust the packages
// for your versions (RxJava 3 uses io.reactivex.rxjava3.core).
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Test;

// Stream items look like this
class StreamItem
{
    String meta;
    Integer data;

    public String getMeta()
    {
        return meta;
    }

    public Integer getData()
    {
        return data;
    }
}

// Processor looks like this
interface Processor
{
    String processStream( String meta, Integer data );
}

@Test
public void testFlowable()
{
    // Set up mock input: only the first generated item carries metadata.
    AtomicBoolean first = new AtomicBoolean( true );
    Flowable<StreamItem> input = Flowable.generate( emitter -> {
        StreamItem item = new StreamItem();
        item.data = (int)( Math.random() * 100 );
        if ( first.getAndSet( false )) {
            item.meta = UUID.randomUUID().toString();
        }
        emitter.onNext( item );
    } );

    // Mock processor:
    Processor processor = ( meta, data ) -> meta + " : " + data;

    // Set up rx pipeline:
    Flowable<StreamItem> multi = input.share();
    Maybe<String> streamMeta = multi.firstElement().map( StreamItem::getMeta );
    Flowable<String> result = multi.map( StreamItem::getData )
        .withLatestFrom( streamMeta.toFlowable(),
            ( data, meta ) -> processor.processStream( meta, data ));

    // Subscribe:
    result.take( 5 ).blockingSubscribe( System.out::println );
}
Output:
3fba00bd-027b-4802-8b7d-674497d72052 : 14
3fba00bd-027b-4802-8b7d-674497d72052 : 72
3fba00bd-027b-4802-8b7d-674497d72052 : 47
3fba00bd-027b-4802-8b7d-674497d72052 : 14
3fba00bd-027b-4802-8b7d-674497d72052 : 93
Update based on feedback:
This seems to do the trick if you really need your data Flowable alongside a concrete metadata object:
// Stream items look like this
class StreamItem
{
    String meta;
    Integer data;

    public String getMeta()
    {
        return meta;
    }

    public Integer getData()
    {
        return data;
    }
}

// Processor looks like this
interface Processor
{
    String processStream( String meta, Flowable<Integer> data );
}

@Test
public void testFlowable()
{
    // Set up mock input:
    AtomicBoolean first = new AtomicBoolean( true );
    Flowable<StreamItem> input = Flowable.generate( emitter -> {
        StreamItem item = new StreamItem();
        item.data = (int)( Math.random() * 100 );
        if ( first.getAndSet( false )) {
            item.meta = UUID.randomUUID().toString();
        }
        emitter.onNext( item );
    } );

    // Mock processor:
    Processor processor = ( meta, data ) -> {
        System.out.println( meta );
        data.subscribe( System.out::println );
        return meta;
    };

    // Set up rx pipeline:
    Flowable<StreamItem> multi = input.take( 5 ).share();
    Maybe<String> streamMeta = multi.firstElement().map( StreamItem::getMeta );
    streamMeta.map( meta ->
        processor.processStream( meta, multi.map( StreamItem::getData )))
        .subscribe();
}
Output:
3421c5f6-8554-43ce-aa69-e6cef9c1ed89
47
46
74
59
57
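For checking any Rx wiring against the intended behaviour, it may help to have a library-free model of the contract from the question: read the metadata off the first item, then hand the processor that metadata together with the data of every item, the first one included (the question says every item carries data). The following is only a minimal sketch in plain Java using Iterator rather than Flowable. The names HeadTailSketch and processWithMeta are hypothetical, and the String/Integer types simply mirror the mock StreamItem above.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.BiFunction;

public class HeadTailSketch {

    // Mirrors the mock StreamItem: only the first item has non-null meta.
    static final class StreamItem {
        final String meta;
        final Integer data;

        StreamItem(String meta, Integer data) {
            this.meta = meta;
            this.data = data;
        }
    }

    // Hypothetical helper: takes the metadata from the first item, then gives
    // the processor a lazy iterator over the data of ALL items (head included).
    static <R> R processWithMeta(Iterator<StreamItem> items,
                                 BiFunction<String, Iterator<Integer>, R> processor) {
        if (!items.hasNext()) {
            throw new NoSuchElementException("empty stream: no metadata available");
        }
        final StreamItem head = items.next();
        Iterator<Integer> data = new Iterator<Integer>() {
            private boolean headPending = true;

            @Override
            public boolean hasNext() {
                return headPending || items.hasNext();
            }

            @Override
            public Integer next() {
                if (headPending) {
                    headPending = false;
                    return head.data; // the first item's data is not dropped
                }
                return items.next().data;
            }
        };
        return processor.apply(head.meta, data);
    }

    public static void main(String[] args) {
        List<StreamItem> input = Arrays.asList(
            new StreamItem("meta-1", 14),
            new StreamItem(null, 72),
            new StreamItem(null, 47));

        String result = processWithMeta(input.iterator(), (meta, data) -> {
            StringBuilder sb = new StringBuilder(meta);
            while (data.hasNext()) {
                sb.append(' ').append(data.next());
            }
            return sb.toString();
        });

        System.out.println(result); // prints: meta-1 14 72 47
    }
}
```

A reactive pipeline for this problem can be compared against this model: the processor should see the metadata exactly once, and the data sequence it receives should start with the first item's data.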