I need to copy data from one source to another, in parallel and in batches.
I did this:
Flux.generate((SynchronousSink<String> sink) -> {
    try {
        String val = dataSource.getNextItem();
        if (val == null) {
            sink.complete();
            return;
        }
        sink.next(val);
    } catch (InterruptedException e) {
        sink.error(e);
    }
})
.parallel(4)
.runOn(Schedulers.parallel())
.doOnNext(dataTarget::write)
.sequential()
.blockLast();

class DataSource {
    public Item getNextItem() {
        //...
    }
}

class DataTarget {
    public void write(List<Item> items) {
        //...
    }
}
It receives data in parallel, but writes one at a time.
I need to collect them into batches (e.g., 10 items each) and then write each batch.
How can I do that?
UPDATE:
The main idea is that the source is a messaging system (e.g. RabbitMQ or NATS) that is well suited to delivering messages one by one, while the target is a database, which is more efficient at inserting a whole batch.
So the final result should be: I receive messages in parallel until the buffer fills up, then I write the whole buffer into the database in one shot.
It's easy to do in plain Java, but with reactive streams I don't see how to do it: how do I buffer the data, and how do I pause the reader until the writer is ready for the next part?
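For reference, this is roughly what I mean by "easy in plain Java" — a blocking sketch with placeholder types (these are not my real classes):

import java.util.ArrayList;
import java.util.List;

// Placeholder types standing in for the real source/target classes.
interface Item {}
interface DataSource { Item getNextItem() throws InterruptedException; }
interface DataTarget { void write(List<Item> batch); }

class PlainJavaBatchCopy {
    static final int BATCH_SIZE = 10;

    static void copy(DataSource dataSource, DataTarget dataTarget) throws InterruptedException {
        List<Item> buffer = new ArrayList<>(BATCH_SIZE);
        Item item;
        while ((item = dataSource.getNextItem()) != null) {
            buffer.add(item);
            if (buffer.size() == BATCH_SIZE) {
                dataTarget.write(buffer); // the reader is naturally "paused" until the write returns
                buffer = new ArrayList<>(BATCH_SIZE);
            }
        }
        if (!buffer.isEmpty()) {
            dataTarget.write(buffer); // flush the last, possibly smaller batch
        }
    }
}

I want the same behaviour from the reactive pipeline.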
All you need is the Flux#buffer(int maxSize) operator:
Flux.generate((SynchronousSink<String> sink) -> {
    try {
        String val = dataSource.getNextItem();
        if (val == null) {
            sink.complete();
            return;
        }
        sink.next(val);
    } catch (InterruptedException e) {
        sink.error(e);
    }
})
.buffer(10) // Flux<List<String>>
.flatMap(dataTarget::write)
.blockLast();

class DataTarget {
    public Mono<Void> write(List<String> items) {
        return reactiveDbClient.insert(items);
    }
}
Here, buffer collects items into multiple Lists of 10 items each (batches). You do not need to use a parallel scheduler; flatMap will run these write operations asynchronously. See Understanding Reactive’s .flatMap() Operator.
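If you also want to cap how many batch inserts run at the same time, and to flush a partially filled buffer when the source goes quiet for a while, a variation along these lines should work (the 1-second timeout, the concurrency of 4, and boundedElastic are my own choices here, not something from your question; Duration is java.time.Duration):

Flux.generate((SynchronousSink<String> sink) -> {
    try {
        String val = dataSource.getNextItem();
        if (val == null) {
            sink.complete();
            return;
        }
        sink.next(val);
    } catch (InterruptedException e) {
        sink.error(e);
    }
})
// getNextItem() blocks, so subscribe on a scheduler intended for blocking work
.subscribeOn(Schedulers.boundedElastic())
// emit a batch when 10 items are collected OR 1 second has passed, whichever comes first
.bufferTimeout(10, Duration.ofSeconds(1))
// at most 4 batch writes subscribed at a time; upstream demand is throttled accordingly
.flatMap(dataTarget::write, 4)
.blockLast();

This also covers the "how to pause the reader" part: Flux.generate only invokes the generator when there is downstream demand, so slow writes naturally slow the reads down.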