Search code examples
javareactive-programmingspring-webfluxproject-reactor

Wrapping blocking client with Mono and execute sequence


I am trying to make a reactive application which needs to execute ssh commands.

Currently, there is an SSH client (based on sshd mina) which is blocking (maybe there is a way to use it in a non-blocking way, but I don't know it). My idea is to create a wrapper around this blocking client, so that I can transform the blocking calls into Mono as in the code below.

public class SshReactiveClient extends AutoCloseable {
  private final SshClient sshClient;

  public SshReactiveClient(final SshClient sshClient) {
    this.sshClient = sshClient;
  }

  public Mono<SshResponse> open(final Duration timeout) {
    return Mono.fromCallable(() -> sshClient.open(timeout))
        .subscribeOn(Schedulers.boundedElastic());
  }

  public Mono<SshResponse> execCommand(final SshCommand command, final Duration timeout) {
    return Mono.fromCallable(() -> sshClient.execCommand(command, timeout))
        .subscribeOn(Schedulers.boundedElastic());
  }

  @Override
  public void close() throws Exception {
    sshClient.close();
  }
}

First, is it a good idea to proceed like that or not? What would be better?

The second point is how to write the code so that I can execute a sequence of ssh command using the responses from the previous commands to execute the next one?


Solution

  • Your understanding is correct. You need to wrap blocking or sync code and run it on a separate Scheduler. The better way would be if client supports async interface.

    To execute commands in a sequence you need to build a flow using reactive API.

    execCommand(command1)
            .flatMap(res -> 
                 execCommand(getCommand2(res))
            )
            .flatMap(res -> 
                 execCommand(getCommand3(res))
            )
    

    There are many other options depending on your requirements. For example, if you need results from command1 & command2 to execute command3, you could just "shift" flow one level down

    execCommand(command1)
            .flatMap(res1 -> 
                 execCommand(getCommand2(res1))
                      .flatMap(res2 -> 
                            execCommand(getCommand3(res1, res2))
                      )
            )
            
    

    As an alternative, you could apply builder pattern to the reactor flow to collect responses in a sequential flow Builder pattern with Reactor mono

    You could also execute command1 and command2 in parallel and use responses from both

    Mono.zip(command1, command2)
         .flatMap(tuple -> 
               execCommand(getCommand3(tuple.getT1(), tuple.getT2()))
         )