I have two source Flux streams that emit all keywords and all dictionaries:
Flux<Keyword> keywordFlux = keywordRepository.findAll();
Flux<Dictionary> dictionaryFlux = dictionaryRepository.findAll();
Keyword has a reference to a Dictionary object, like the following:
public class Keyword {
    private String id;
    private String dictionaryId;
}
The goal is to have a transformed Flux<DictionaryTO> that holds all properties of Dictionary plus a list of the keywords that belong to the dictionary:
public class DictionaryTO {
    private String id;
    private Collection<KeywordTO> keywords;
}

public class KeywordTO {
    private String id;
}
The question is how to zip/merge these 2 Flux streams in a reactive way without blocking any of the source streams.
Note that keywordFlux contains all keywords, so some filtering should be applied based on Keyword.dictionaryId.
As suggested by boris-the-spider, I ended up using .flatMap() and .zipWith().
1. Collect a Mono<Map> of keywords (grouped by dictionaryId) and cache it, because it will be used multiple times later.
2. flatMap the Flux of dictionaries and zip each single dictionary with the above map of keywords. Then map the "tuple of dictionary and keywords map" to a dictionary with keywords.
Full solution:
Flux<Keyword> keywordFlux = keywordRepository.findAll();
Flux<Dictionary> dictionaryFlux = dictionaryRepository.findAll();

// Group all keywords by dictionaryId once; cache() replays the resulting map to every subscriber
Mono<Map<String, Collection<KeywordTO>>> keywordsMapMono = keywordFlux
        .collectMultimap(Keyword::getDictionaryId, k -> keywordTOMapper.map(k))
        .cache();

Flux<DictionaryTO> dictionaryTOFlux = dictionaryFlux
        .map(dictionaryTOMapper::map)
        // Pair each dictionary with the cached keywords map
        .flatMap(dic -> Mono.just(dic).zipWith(keywordsMapMono))
        .map(tuple -> {
            DictionaryTO dic = tuple.getT1();
            // Note: get() returns null for a dictionary that has no keywords
            Collection<KeywordTO> keywordsForDic = tuple.getT2().get(dic.getId());
            dic.setKeywords(keywordsForDic);
            return dic;
        });
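
To make the shape of the intermediate map easier to see, here is a plain, non-reactive sketch of the same two steps on in-memory lists. It is not a replacement for the Reactor pipeline: Collectors.groupingBy merely stands in for collectMultimap, and the nested Keyword and Dictionary classes, the KeywordJoinSketch name, and the sample ids are hypothetical stand-ins rather than the real entities. It also assumes that a dictionary without keywords should end up with an empty list, which is one possible way to handle the missing-key case.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class KeywordJoinSketch {

    // Hypothetical stand-in for the Keyword entity
    static class Keyword {
        final String id;
        final String dictionaryId;
        Keyword(String id, String dictionaryId) {
            this.id = id;
            this.dictionaryId = dictionaryId;
        }
    }

    // Hypothetical stand-in for the Dictionary entity
    static class Dictionary {
        final String id;
        Dictionary(String id) {
            this.id = id;
        }
    }

    // Step 1: group keyword ids by dictionaryId (in-memory analogue of collectMultimap)
    static Map<String, List<String>> groupByDictionary(Collection<Keyword> keywords) {
        return keywords.stream().collect(Collectors.groupingBy(
                k -> k.dictionaryId,
                Collectors.mapping(k -> k.id, Collectors.toList())));
    }

    // Step 2: look up the keywords of each dictionary; assumption: missing key -> empty list
    static Map<String, List<String>> join(Collection<Dictionary> dictionaries,
                                          Collection<Keyword> keywords) {
        Map<String, List<String>> byDictionary = groupByDictionary(keywords);
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (Dictionary d : dictionaries) {
            result.put(d.id, byDictionary.getOrDefault(d.id, Collections.<String>emptyList()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Dictionary> dictionaries = Arrays.asList(new Dictionary("d1"), new Dictionary("d2"));
        List<Keyword> keywords = Arrays.asList(new Keyword("k1", "d1"), new Keyword("k2", "d1"));
        // prints {d1=[k1, k2], d2=[]}
        System.out.println(join(dictionaries, keywords));
    }
}
```

In the reactive version, the lookup in step 2 is the tuple.getT2().get(dic.getId()) call; if null collections are a problem there, getOrDefault with an empty collection would be the equivalent adjustment.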