Project Reactor: Enriching the result of a Flux with the result of a Mono


I am working to introduce reactive programming to my company. I'm in the process of building a simple demo of an activity recommendation system to prove the performance benefits.

One of the challenges I have come up against is enriching the results in one stream with the results of another. I have a working example, shown below, but I am not sure whether this approach has any issues. Could somebody take a look and suggest improvements?

    public Flux<Integer> getRecommendedActivities(Long userId, String location, Integer limit) {
        Flux<ActivityData> activities = activityDatabaseService.getByLocation(location);
        Mono<Map<String,BigInteger>> userCategoryScores = userScoresDatabaseService.get(userId);

        return activities
            .zipWith(userCategoryScores.cache().repeat(), this::scoreActivitiesBasedOnUserCategoryScores)
            .sort(compareActivityScoreStrength)
            .map(ScoredActivityData::getActivityId)
            .take(limit);
    }

    private ScoredActivityData scoreActivitiesBasedOnUserCategoryScores(ActivityData deal, Map<String, BigInteger> categoryScores) {
        //This method combines the deal score and user category scores to come up with a final score
    }

Thanks, Carl


Solution

  • Nothing inherently wrong with the code you have there. A few stylistic points which may or may not be helpful:

    • The "norm" with reactive programming is to use a fluent style and inline everything throughout, rather than declare separate local variables at the top of your method and use those in the reactive chain.
    • The x.zipWith(y.cache().repeat()) pattern will work just fine, but I find it a bit ugly if it can be avoided: zipWith() implies two real Flux sources to me, rather than a Mono that has been arbitrarily cached and repeated, so the behaviour is less obvious than it could be. Instead, I prefer y.flatMapMany(v -> x.map(...)), which makes it clearer that you're taking one value and applying it to many values coming from a real Flux. In your case, that might look something like:

      userScoresDatabaseService.get(userId)
              .flatMapMany(c -> activityDatabaseService.getByLocation(location)
                      .map(a -> scoreActivitiesBasedOnUserCategoryScores(a, c))
              )
              .sort(compareActivityScoreStrength) // etc.
      
    • Flux.sort() should really be a "last resort" operation, especially since you cite performance benefits as a reason for exploring reactive. Reading the entire data set in from a database service, then sorting it, then taking only the first n values screams inefficiency; you'd be much better off sorting and limiting the values in the data layer. Bear in mind that Flux.sort() has to wait for the entire source Flux to complete before it can sort and emit anything, storing every value in memory as it goes, so it loses many of the benefits of using a Flux in the first place. Pushing the sort and limit into the data layer also makes your reactive chain shorter and simpler, since the chain no longer needs to worry about either.
    • Nit: Some of your method names & variable names appear to be quite long. I'd shorten them if possible - I find this makes things much easier to read in the context of a reactive chain.
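
To make the memory cost in the Flux.sort() point concrete: if the sort and limit cannot be pushed into the data layer, one option is to keep only the best `limit` elements while consuming the source, rather than buffering everything and sorting it. The following is a minimal plain-Java sketch of that idea using a bounded heap. It is not a Reactor operator, and TopN, topN and bestFirst are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical helper, not part of Reactor: keeps only the best `limit` elements seen so far.
final class TopN {

    // Returns up to `limit` elements, best first, where `bestFirst` orders better elements earlier.
    static <T> List<T> topN(Iterable<T> source, int limit, Comparator<T> bestFirst) {
        if (limit <= 0) {
            return new ArrayList<>();
        }
        // Reversing the comparator makes the heap's head the worst element currently kept.
        PriorityQueue<T> heap = new PriorityQueue<>(limit, bestFirst.reversed());
        for (T item : source) {
            if (heap.size() < limit) {
                heap.add(item);
            } else if (bestFirst.compare(item, heap.peek()) < 0) {
                // The new item beats the worst kept element, so it takes its place.
                heap.poll();
                heap.add(item);
            }
        }
        List<T> result = new ArrayList<>(heap);
        result.sort(bestFirst);
        return result;
    }

    public static void main(String[] args) {
        List<Integer> scores = List.of(5, 1, 9, 3, 7);
        // Highest score first, keep two: prints [9, 7]
        System.out.println(topN(scores, 2, Comparator.reverseOrder()));
    }
}
```

This holds at most `limit` elements at any time, whereas a full sort holds all of them. Sorting and limiting in the database remains the simpler choice where it is available.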

    With the above points, your entire getRecommendedActivities() could read something like:

    scoresDb.get(userId)
            .flatMapMany(c -> activityDb.getByLocation(location, limit, comparator)
                    .map(a -> score(a, c).getActivityId())
            )
    

    ...which, to me at least, reads as much shorter and simpler.
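
For reference, here is a self-contained sketch of the flatMapMany approach that can be run as-is, assuming reactor-core is on the classpath and Java 16 or later for records. The in-memory userScores() and activities() methods stand in for the two database services, and the Activity and Scored types and the scoring rule are invented for illustration; none of them come from the original code. The sort is kept here only because the stand-in data source cannot sort or limit for us.

```java
import java.math.BigInteger;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Assumes reactor-core is available. All types and data below are hypothetical stand-ins.
final class EnrichDemo {

    record Activity(int id, String category, long baseScore) {}

    record Scored(int id, BigInteger score) {}

    // Stand-in for userScoresDatabaseService.get(userId).
    static Mono<Map<String, BigInteger>> userScores() {
        return Mono.just(Map.of("hiking", BigInteger.TEN, "museum", BigInteger.TWO));
    }

    // Stand-in for activityDatabaseService.getByLocation(location).
    static Flux<Activity> activities() {
        return Flux.just(
                new Activity(1, "museum", 3),
                new Activity(2, "hiking", 2),
                new Activity(3, "cinema", 5));
    }

    // Hypothetical scoring rule: base score times the user's category score (1 if absent).
    static Scored score(Activity a, Map<String, BigInteger> scores) {
        BigInteger weight = scores.getOrDefault(a.category(), BigInteger.ONE);
        return new Scored(a.id(), BigInteger.valueOf(a.baseScore()).multiply(weight));
    }

    // The single Mono value is resolved once, then applied to every element of the Flux.
    static Flux<Integer> recommend(int limit) {
        return userScores()
                .flatMapMany(c -> activities().map(a -> score(a, c)))
                .sort(Comparator.comparing(Scored::score).reversed())
                .map(Scored::id)
                .take(limit);
    }

    public static void main(String[] args) {
        // block() is used only to print the result in this demo: prints [2, 1]
        System.out.println(recommend(2).collectList().block());
    }
}
```

With these made-up values, activity 2 scores 20, activity 1 scores 6 and activity 3 scores 5, so the two recommended ids are 2 and 1.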