Tags: java, redis, reactive-programming, vert.x

Unclear about how to structure vertx-redis-client code


I'm trying to implement a compare-and-delete (CAD) function using the vertx-redis-client library.

I'm following this answer on how to do this. I've already implemented the Lua script method, and that works fine. However, since some Redis installations may prohibit running the EVAL command with arbitrary Lua scripts, I think I must also implement this functionality using the WATCH/GET/MULTI/compare/DEL/UNWATCH sequence of commands.

However, I'm having a hard time wrapping my head around the correct way to do this in the Vert.x reactive style of programming.

Here's my code that implements both of the methods to do a CAD operation:

    private Future<Boolean> deleteLua(final String key, final String value) {
        final RedisAPI redis = RedisAPI.api(client);

        final Promise<Boolean> promise = Promise.promise();
        redis.eval(List.of("if redis.call('get', KEYS[1]) == ARGV[1] then redis.call('del', KEYS[1]); return 1; else return 0; end", "1", key, value))
                .onSuccess(res -> {
                    System.out.println("Success: " + res);
                    promise.complete(res.toInteger() == 1);
                })
                .onFailure(err -> {
                    System.out.println("Error: " + err);
                    promise.fail(err);
                });

        return promise.future();
    }

    private Future<Boolean> deleteMulti(final String key, final String value) {
        final RedisAPI redis = RedisAPI.api(client);

        final Promise<Boolean> promise = Promise.promise();
        redis.watch(List.of(key))
                .onSuccess(watchResult -> {
                    redis.get(key)
                            .onSuccess(getResult -> {
                                redis.multi()
                                        .onSuccess(multiResult -> {
                                            if (value.equals(String.valueOf(getResult))) {
                                                redis.del(List.of(key))
                                                        .onSuccess(delResult -> {
                                                            System.out.println("key deleted");
                                                        });
                                            } else {
                                                System.out.println("value did not match, key not deleted");
                                                promise.complete(false);
                                            }
                                            redis.exec().onSuccess(execResult -> {
                                                System.out.println("execResult: " + execResult);
                                                redis.unwatch().onSuccess(unwatchResult -> {
                                                    System.out.println("unwatched key");
                                                    promise.complete(true);
                                                });
                                            });
                                        })
                                        .onFailure(multiErr -> {
                                            System.out.println("The multi failed: " + multiErr);
                                            promise.fail(multiErr);
                                        });
                            });
                });
        return Future.succeededFuture(true);
    }

Both seem to work, but I'm really uncertain about the deleteMulti method. First of all, it just looks ugly: it's so endlessly nested! I can probably use compose to get around that to some extent, but every time I've tried, I've run into walls. For example, I need the getResult inside the onSuccess block of the multi, but when I try to use the compose structure, everything gets a bit messed up and I'm unclear on which result I have in each compose block.

Another thing I'm very unclear about is how Redis transactions work here. Is my redis object keeping track of the session, so that my call to watch will be "active" for any subsequent Redis calls I make using that same object?

Also, how do I need to treat unwatch? I'm currently only calling it within onSuccess for the watch->get->multi->exec block, but what if the get or multi fail? Do I need to unwatch on failures as well, or will that just be handled by the session getting invalidated somehow? That actually seems unlikely, since this is all reactive and the method where the RedisAPI redis object is actually created will probably return before all the operations have finished.

So yeah... quite confused.

Any help in getting this code block into shape would be very much appreciated.


Solution

  • Using onSuccess() and onFailure() is a very tedious way of working with futures. I suggest you familiarize yourself with the combinator methods, most notably map() and flatMap(). These are what I would call operators of sequential composition; that is, they let you express "first do A, then do B". map() takes a synchronous function (it transforms the result directly), while flatMap() takes an asynchronous one (a function that itself returns a Future). (Also note that compose() is the same as flatMap(), so it's a matter of style. I prefer flatMap(), so that's what you see below.) There are other operators for error handling etc., but these two are the most important in my opinion.
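
    To make the map()/flatMap() distinction concrete, here is a minimal sketch. It is an analogy rather than Vert.x code: it uses the JDK's CompletableFuture so that it runs without any dependencies, with thenApply() standing in for map() and thenCompose() standing in for flatMap(). The names CombinatorSketch, lookupLength and pipeline are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

class CombinatorSketch {

    // Hypothetical asynchronous step: resolves to the length of the input.
    static CompletableFuture<Integer> lookupLength(String s) {
        return CompletableFuture.supplyAsync(s::length);
    }

    static CompletableFuture<String> pipeline(String input) {
        return CompletableFuture.completedFuture(input)
                // Like map(): a synchronous function applied to the result.
                .thenApply(String::trim)
                // Like flatMap()/compose(): the function returns another future,
                // and the chain continues once that future completes.
                .thenCompose(CombinatorSketch::lookupLength)
                // Another synchronous transformation of the final value.
                .thenApply(len -> "length=" + len);
    }

    public static void main(String[] args) {
        System.out.println(pipeline("  hello ").join()); // prints "length=5"
    }
}
```

    With Vert.x futures the chain has the same shape, with map() where the sketch uses thenApply() and flatMap() (or compose()) where it uses thenCompose().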

    That said, here's my implementation:

    private Future<Boolean> compareAndDelete(Redis client, String key, String value) {
        return client.connect().flatMap(connection -> {
            RedisAPI redis = RedisAPI.api(connection);
            return redis.watch(List.of(key))
                    .flatMap(ignored -> redis.get(key))
                    .flatMap(response -> {
                        if (response == null) {
                            // key does not exist
                            return Future.succeededFuture(false);
                        }
                        if (value.equals(response.toString())) {
                            return redis.multi()
                                    .flatMap(ignored -> redis.del(List.of(key)))
                                    .flatMap(ignored -> redis.exec())
                                    // null reply means transaction aborted
                                    .map(execResult -> execResult != null);
                        } else {
                            return Future.succeededFuture(false);
                        }
                    })
                    .eventually(connection::close);
        });
    }
    

    You'll note that I don't do any error handling here. Instead, I structure the code so that whenever an error happens (that is, a Future completes with an error), it gets propagated through the chain. The method returns a future which completes with true when the "compare and delete" operation succeeded or with false when the operation failed (the key does not exist, or someone changed the value in the middle of the transaction). The future may complete with a failure in case of some error.
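
    The propagation behaviour described above can be illustrated with a small sketch. To keep it dependency-free, it uses the JDK's CompletableFuture as a stand-in for Vert.x futures, so treat it as an analogy and not as the Vert.x API; PropagationSketch, chain and stepsRun are hypothetical names. When the first step fails, none of the later steps run and the failure surfaces at the end of the chain.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;

class PropagationSketch {

    // Counts how many downstream steps actually ran.
    static final AtomicInteger stepsRun = new AtomicInteger();

    static CompletableFuture<Boolean> chain(boolean failFirstStep) {
        // Simulated first step: either a value or an error (requires Java 9+).
        CompletableFuture<String> first = failFirstStep
                ? CompletableFuture.failedFuture(new IllegalStateException("connection lost"))
                : CompletableFuture.completedFuture("value");
        return first
                // Asynchronous step: skipped entirely if 'first' failed.
                .thenCompose(v -> {
                    stepsRun.incrementAndGet();
                    return CompletableFuture.completedFuture(v.length());
                })
                // Synchronous step: also skipped on failure.
                .thenApply(len -> {
                    stepsRun.incrementAndGet();
                    return len > 0;
                });
    }

    public static void main(String[] args) {
        System.out.println(chain(false).join()); // prints "true"
        try {
            chain(true).join();
        } catch (CompletionException e) {
            // The original error arrives here without any per-step handlers.
            System.out.println("failed: " + e.getCause().getMessage());
        }
    }
}
```

    No step in the chain contains error-handling code, yet the caller still sees the original error. This is the same structure the compareAndDelete method relies on.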

    For testing, I also wrote this simple function:

    private Future<Void> delay(long timeout) {
        Promise<Void> promise = ((ContextInternal) vertx.getOrCreateContext()).promise();
        vertx.setTimer(timeout, ignored -> {
            promise.complete();
        });
        return promise.future();
    }
    

    With this, I can insert an arbitrary delay into the chain to be able to easily trigger the situation when someone else changes the value in the middle of the transaction. For example, I can insert this line

                                    .flatMap(ignored -> delay(10_000))
    

    in between the calls to redis.del() and redis.exec().

    In addition to using future combinators instead of success/failure callbacks, there's one other important difference. The first thing my implementation does is obtain a connection from the Redis client. The Vert.x Redis client manages a connection pool, and if you use the Redis object for sending commands, each command may be executed on a different connection. When you use a RedisConnection object, on the other hand, a single connection is used to send all the commands, which is necessary for Redis transactions to work correctly. The last thing my method does, as you can see, is "close" the connection (which in fact just returns the connection to the pool).
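
    Why the single connection matters can be shown with a toy model. This is not Redis or Vert.x code, just a heavily simplified in-memory sketch built on the assumption that WATCH state is tracked per connection and that EXEC aborts when a watched key has changed since WATCH. All names (WatchSketch, ToyServer, ToyConnection) are hypothetical, and the model ignores details such as how a real server would treat a DEL sent outside MULTI.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class WatchSketch {

    // Toy stand-in for a server: values plus a per-key modification counter.
    static class ToyServer {
        final Map<String, String> data = new HashMap<>();
        final Map<String, Integer> versions = new HashMap<>();

        void set(String key, String value) {
            data.put(key, value);
            versions.merge(key, 1, Integer::sum);
        }

        void del(String key) {
            data.remove(key);
            versions.merge(key, 1, Integer::sum);
        }
    }

    // Toy stand-in for one connection: watch and queue state live here.
    static class ToyConnection {
        final ToyServer server;
        final Map<String, Integer> watched = new HashMap<>();
        final List<Runnable> queued = new ArrayList<>();

        ToyConnection(ToyServer server) { this.server = server; }

        // Remember the key's version at the time of the watch.
        void watch(String key) {
            watched.put(key, server.versions.getOrDefault(key, 0));
        }

        void queueDel(String key) {
            queued.add(() -> server.del(key));
        }

        // Returns false (like a nil EXEC reply) if a watched key changed.
        boolean exec() {
            boolean unchanged = watched.entrySet().stream().allMatch(
                    e -> server.versions.getOrDefault(e.getKey(), 0).equals(e.getValue()));
            if (unchanged) {
                queued.forEach(Runnable::run);
            }
            watched.clear(); // exec always ends the watch in this model
            queued.clear();
            return unchanged;
        }
    }

    // Watch and exec on the same connection: the concurrent write is detected.
    static boolean sameConnection() {
        ToyServer server = new ToyServer();
        server.set("k", "v1");
        ToyConnection a = new ToyConnection(server);
        a.watch("k");
        server.set("k", "v2"); // someone else changes the value
        a.queueDel("k");
        return a.exec();
    }

    // Watch on one connection, exec on another: the guard is silently lost.
    static boolean differentConnections() {
        ToyServer server = new ToyServer();
        server.set("k", "v1");
        ToyConnection a = new ToyConnection(server);
        ToyConnection b = new ToyConnection(server);
        a.watch("k");
        server.set("k", "v2"); // someone else changes the value
        b.queueDel("k");
        return b.exec();
    }

    public static void main(String[] args) {
        System.out.println(sameConnection());       // prints "false"
        System.out.println(differentConnections()); // prints "true"
    }
}
```

    In the second scenario the delete goes through even though the value changed after the watch. That is the kind of lost guard you risk when each command may be sent on a different pooled connection.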