Search code examples
javaproject-reactorflux

How to zip multiple Flux streams with filtering


I have 2 source Flux streams which return streams of all keywords and all dictionaries:

Flux<Keyword> keywordFlux = keywordRepository.findAll();
Flux<Dictionary> dictionaryFlux = dictionaryRepository.findAll();

Keyword has a reference to a Dictionary object like the following:

public class Keyword {
    private String id;
    private String dictionaryId;
}

The goal is to have a transformed Flux<DictionaryTO> which inside holds all properties of Dictionary plus a list of keywords that belong to the dictionary:

public class DictionaryTO {
    private String id;
    private Collection<KeywordTO> keywords;
}
public class KeywordTO {
    private String id;
}

The question is how to zip/merge these 2 Flux streams in a reactive way without blocking any of the source streams.

Note that keywordFlux contains all keywords, so some filtering should be applied based on Keyword.dictionaryId.


Solution

  • As suggested by boris-the-spider, I ended up using .flatMap() and .zipWith().

    1. Create a Mono<Map> of keywords (grouped by dictionaryId) and cache it because it will be used multiple times later.
    2. flatMap the Flux of dictionaries and zip single dictionary with the above map of keywords. Then map "tuple of dictionary and keywords map" to a dictionary with keywords.

    Full solution:

    Flux<Keyword> keywordFlux = keywordRepository.findAll();
    Flux<Dictionary> dictionaryFlux = dictionaryRepository.findAll();
    
    Mono<Map<String, Collection<KeywordTO>>> keywordsMapMono = keywordFlux
        .collectMultimap(KeywordTO::getDictionaryId, k -> keywordTOMapper.map(k))
        .cache(); 
    
    Flux<DictionaryTO> dictionaryTOFlux = dictionaryFlux
        .map(dictionaryTOMapper:map) 
        .flatMap(dic -> Mono.just(dic).zipWith(keywordsMapMono))
        .map(tuple -> {
            Collection<KeywordTO> keywordsForDic = tuple.getT2().get(dic.getId())
            DictionaryTO dic = tuple.getT1();
            dic.setKeywords(keywordsForDic);
            return dic;
        });