I'm very new to the reactive world and struggling to understand how to accomplish a task. I'm working on a legacy project where I must implement an interface that has many methods to query various objects from redis. Sometimes the queries are as simple as querying a hash by ID, so it's a single call to redis to get the hash. Other times, I may need to lookup the ID from a redis set first based some parameters, then get the hash with the resultant ID. I'm using Reactor 3.1.0.M3 with Lettuce 5.0.0.RC1 in a Spring Boot application.
My existing code to for these two example methods look like this:
public <T extends CatalogInfo> T get(String id, Class<T> clazz) {
String result = (String)repository.getRedisHashRepository().getHashById(CatalogUtils.root(clazz).getSimpleName(), id);
if (null != result) {
return serializer.fromData(result, clazz);
}
return null;
}
public <T extends CatalogInfo> T get(String attName, String attValue, Class<T> clazz) {
String attKey = CatalogUtils.buildKey(CatalogUtils.root(clazz), attName, attValue);
String id = CatalogUtils.getIdFromSet(repository.getRedisSetRepository().getSetMembers(attKey));
if (id == null) {
return null;
}
return get(id, clazz);
}
There are some utility functions in there that just help me build the keys I want to use for redis from the Class name and to make sure the ID stored in the redis set is a single value. As you can see, in the second get method, I call the first get method with the result from the set.
Implementing the first method with Lettuce/Reactor was easy enough:
public <T extends CatalogInfo> Mono<String> getReactive(String id, Class<T> clazz, Publisher<String>... publisher) {
Mono<String> mono = Mono.just(id).flatMap(new Function<String, Mono<String>>() {
@Override
public Mono<String> apply(String id) {
return hashCommands.hget(CatalogUtils.root(clazz).getSimpleName(), id);
}
});
return mono;
}
At that point, I can call mono.block() to get the resulting value. I can fairly easily get the functionality of the second get method working by chaining together two flatMaps/functions:
public <T extends CatalogInfo> Flux<String> getIdFromSetReactive(String attName, String attValue, Class<T> clazz) {
String attKey = CatalogUtils.buildKey(CatalogUtils.root(clazz), attName, attValue);
Flux<String> flux = Flux.just(attKey).flatMap(new Function<String, Flux<String>>() {
@Override
public Flux<String> apply(String attKey) {
return commands.smembers(attKey);
}
}).flatMap(new Function<String, Publisher<String>>() {
@Override
public Publisher<String> apply(String id) {
return hashCommands.hget(CatalogUtils.root(clazz).getSimpleName(), id);
}
});
return flux;
}
I have many different types of methods that may require up to 8 calls to redis to complete. In my original code, I could re-use each method and call one method from another, but I can't figure out how to do this with reactor.
I would like to be able to call a method that builds a Flux to get the ID from a redis set (let's call it fluxA), then call another method that builds a Flux to query a redis hash based on ID (fluxB), etc.
I think I might need to define each function I might need as member variables like this:
private Function<String, Flux<String>> getIdFromSetFunction = new Function<String, Flux<String>>() {
@Override
public Flux<String> apply(String attKey) {
return commands.smembers(attKey);
}
};
and then make calls like
return Flux.just(attKey).flatMap(getIdFromSetFunction).flatMap(getHash);
The only issue is the code that gets executed in those functions need the Class information that is currently made available in my method calls. But I'm not sure that is the right approach.
Any advice would be greatly appreciated!
conceptually, you aren't "composing mono objects" as much as "creating a new one for each step".
Mono<String> a = Mono.just("something");
Mono<String> b = a.flatMap( s -> goDoSomethingElseThatReturnsAMono(s));
String result = b.block();
you can keep chaining like that as much as you want. (or go into the Flux
world with Mono.flatMapMany
if you'll have multiple data items you want to process individually).
nothing happens until block
is called, as that subscribes to b
and blocks the current thread until a result is available.