I am working to introduce reactive programming to my company. I'm in the process of building a simple demo of an activity recommendation system to prove the performance benefits.
One of the challenges I have come up against is enriching the results in one stream with the results of another. I have a working example, shown below, but I am not sure whether this approach has any issues. Could somebody take a look and suggest any potential improvements?

    public Flux<Integer> getRecommendedActivities(Long userId, String location, Integer limit) {
        Flux<ActivityData> activities = activityDatabaseService.getByLocation(location);
        Mono<Map<String, BigInteger>> userCategoryScores = userScoresDatabaseService.get(userId);
        return activities
                .zipWith(userCategoryScores.cache().repeat(), this::scoreActivitiesBasedOnUserCategoryScores)
                .sort(compareActivityScoreStrength)
                .map(ScoredActivityData::getActivityId)
                .take(limit);
    }

    private ScoredActivityData scoreActivitiesBasedOnUserCategoryScores(ActivityData deal, Map<String, BigInteger> categoryScores) {
        // This method combines the deal score and user category scores to come up with a final score
    }

Thanks, Carl
Nothing inherently wrong with the code you have there. A few stylistic points which may or may not be helpful:
The x.zipWith(y.cache().repeat()) pattern will work just fine, but I find it a bit ugly if it can be avoided (zipWith() implies two real Flux of data in my head, rather than a Mono that's been arbitrarily cached and repeated, so the behaviour is not necessarily as "stand out" obvious as it could be). Instead, I prefer y.flatMapMany(x): it makes it clearer that you're taking one value and applying a transformation to many values through a real Flux. So in your case, that might look something like:

    userScoresDatabaseService.get(userId)
            .flatMapMany(c -> activityDatabaseService.getByLocation(location)
                    .map(a -> scoreActivitiesBasedOnUserCategoryScores(a, c))
            )
            .sort() //etc.

Flux.sort() should really be a "last resort" operation, especially since you cite performance benefits as a reason for exploring reactive. (Reading the entire data set in from a database service, then sorting it, then only taking the first n values screams inefficiency; you'd be much better off sorting and limiting the values in the data layer.) Bear in mind that Flux.sort() has to wait for the entire source Flux to finish before sorting and returning the values, storing every value in memory as it goes, so using it loses many of the benefits of the Flux in the first place. Sorting and limiting in the data layer also makes your reactive chain shorter and simpler, since the chain no longer needs to worry about either.

With the above points, your entire getRecommendedActivities() could read something like:

    scoresDb.get(userId)
            .flatMapMany(c -> activityDb.getByLocation(location, limit, comparator)
                    .map(a -> score(a, c).getActivityId())
            )

...which to me at least, reads much shorter & simpler.
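
To make the point about sort() buffering its whole input a little more concrete, here is a minimal sketch that uses java.util.stream as a stand-in for a Flux, so it runs without Reactor on the classpath. This is only an analogy: Stream.sorted() is a blocking, in-memory barrier in much the same way as described for Flux.sort() above, but the two are different APIs. The class name SortBarrierDemo, the SCORES list and both method names are hypothetical and exist purely for illustration.

```java
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

class SortBarrierDemo {

    // Hypothetical stand-in for scored activities; plain integers keep the sketch short.
    static final List<Integer> SCORES = List.of(42, 7, 93, 15, 68, 3, 77, 51, 29, 84);

    /** Counts how many source elements are read when the pipeline itself sorts, then limits. */
    static int pulledWhenSortingInPipeline(int limit) {
        AtomicInteger pulled = new AtomicInteger();
        SCORES.stream()
                .peek(s -> pulled.incrementAndGet())  // counts every element read from the source
                .sorted(Comparator.reverseOrder())    // barrier: must see the whole source first
                .limit(limit)
                .collect(Collectors.toList());
        return pulled.get();
    }

    /** Counts how many elements are read when the source already arrives in the desired order. */
    static int pulledWhenSourceIsPreSorted(int limit) {
        // Assumption: this list plays the role of a data layer that has already sorted its rows.
        List<Integer> preSorted = SCORES.stream()
                .sorted(Comparator.reverseOrder())
                .collect(Collectors.toList());
        AtomicInteger pulled = new AtomicInteger();
        preSorted.stream()
                .peek(s -> pulled.incrementAndGet())  // counts every element read from the source
                .limit(limit)                         // short-circuits once enough elements passed
                .collect(Collectors.toList());
        return pulled.get();
    }

    public static void main(String[] args) {
        System.out.println("sort in pipeline:  read " + pulledWhenSortingInPipeline(3) + " elements");
        System.out.println("pre-sorted source: read " + pulledWhenSourceIsPreSorted(3) + " elements");
    }
}
```

With sorting inside the pipeline, every element has to be read and held before the first result can come out; with a pre-sorted source, the pipeline can stop as soon as it has its first few elements. That difference is the reason for pushing the sort and limit down to the data layer where possible.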