Tags: rx-java, future, blocking, vert.x

How to go from a blocking Vert.x future to an RxJava Observable


I saw code like this in our repo.

    public Observable<Optional<DeviceInfo>> getDeviceInfo(final String userAgent) {
        final ObservableFuture<Optional<DeviceInfo>> observable = RxHelper.observableFuture();
        vertx.executeBlocking(future -> {
            try {
                final Optional<Device> device = Optional.ofNullable(engine.get().getDeviceForRequest(userAgent));
                if (device.isPresent()) {
                    future.complete(Optional.of(new DeviceInfo()));
                } else {
                    future.complete(Optional.empty());
                }
            } catch (final RuntimeException e) {
                LOGGER.error("Unable to get the UA device info {}, reason {}", userAgent, e.getMessage());
                future.fail(e.getMessage());
            }
        }, observable.toHandler());

        return observable.single();
    }

It seems strange to me to write this much code just to run a blocking call and map the resulting future to a single-item Observable.

Is there an easier way to do exactly this, e.g. a convenience factory method?


Solution

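  • With the Vert.x API for RxJava and Optional.map. This assumes vertx is the Rx-ified Vertx wrapper from the Vert.x RxJava module rather than the core io.vertx.core.Vertx. The method now returns a Single instead of an Observable; call toObservable() on the result if callers still need an Observable: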

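    public Single<Optional<DeviceInfo>> getDeviceInfo(final String userAgent) {
      // rxExecuteBlocking runs the handler on a worker thread and exposes the result as a Single.
      return vertx.rxExecuteBlocking(future -> {
        try {
          final Optional<Device> device = Optional.ofNullable(engine.get().getDeviceForRequest(userAgent));
          // Optional.map replaces the isPresent()/else branching: an empty Optional stays empty.
          future.complete(device.map(d -> new DeviceInfo()));
        } catch (final RuntimeException e) {
          LOGGER.error("Unable to get the UA device info {}, reason {}", userAgent, e.getMessage());
          // fail(String) forwards only the message; fail(e) would forward the exception itself.
          future.fail(e.getMessage());
        }
      });
    }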
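
The Optional.map part of this simplification can be checked without Vert.x. The sketch below uses only the JDK. Device and DeviceInfo are empty hypothetical stand-ins for the types in the question, and the class and method names are made up for illustration. It compares the explicit isPresent() branching from the question with the Optional.map form used in the solution.

```java
import java.util.Optional;

final class OptionalMapSketch {

    // Hypothetical stand-ins for the question's Device and DeviceInfo types.
    static final class Device {}
    static final class DeviceInfo {}

    // Explicit branching, mirroring the original code in the question.
    static Optional<DeviceInfo> withBranching(Device rawDevice) {
        final Optional<Device> device = Optional.ofNullable(rawDevice);
        if (device.isPresent()) {
            return Optional.of(new DeviceInfo());
        } else {
            return Optional.empty();
        }
    }

    // Same outcome with Optional.map: a present value is mapped,
    // an empty Optional stays empty without any branching.
    static Optional<DeviceInfo> withMap(Device rawDevice) {
        return Optional.ofNullable(rawDevice).map(d -> new DeviceInfo());
    }

    public static void main(String[] args) {
        // Compare both variants for a present and an absent device.
        System.out.println(withBranching(new Device()).isPresent() == withMap(new Device()).isPresent());
        System.out.println(withBranching(null).isPresent() == withMap(null).isPresent());
    }
}
```

Both variants agree for a present and an absent device, so the if/else inside the blocking handler can be replaced by a single map call.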