java, observable, rx-java2

How to create an Observable inside another Observable to receive part of the data?


Subscribing to getOrderBook gives me data over a WebSocket (via Netty).
The first message is a full OrderBook snapshot; every later message is an incremental update. The subscriber therefore always receives the complete OrderBook with all changes applied.

How can I create a second Observable that emits only the data carried by each incremental update (incrementalUpdateData), so that it can be subscribed to separately?

  private final Map<Instrument, OrderBook> orderBookMap = new HashMap<>();

  public Observable<OrderBook> getOrderBook(Instrument instrument) {
    return service.subscribeChannel(instrument).flatMap(jsonNode -> {
      if (jsonNode.get("action").asText().equalsIgnoreCase("snapshot")) { // first message: full snapshot
        // Assumption: the book is the first element of "data", as in the incremental branch below.
        OrderBook orderBook = mapper.treeToValue(jsonNode.get("data").get(0), OrderBook.class);
        orderBookMap.put(instrument, orderBook);
        return Observable.just(orderBook);
      } else { // later messages: incremental update
        OrderBook orderBook = orderBookMap.get(instrument);
        // constructCollectionType(List.class, ...) deserializes to a List, so the variable must be a List
        // (assumes OrderBook.update accepts a List<PublicOrder>).
        List<PublicOrder> incrementalUpdateData = mapper.treeToValue(jsonNode.get("data").get(0).get("asks"),
                mapper.getTypeFactory().constructCollectionType(List.class, PublicOrder.class));
        orderBook.update(incrementalUpdateData);
        return Observable.just(orderBook);
      }
    });
  }

What I expect:
getOrderBook(instrument).subscribe(...) - receives the full OrderBook
getOrderBookUpdate(instrument).subscribe(...) - receives only the incremental data


Solution

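  • Solved with a PublishSubject: getOrderBook pushes each incremental update into a per-instrument subject, and getOrderBookUpdates exposes that subject as an Observable. Updates are emitted only while something is subscribed to getOrderBook, and only for instruments on which getOrderBookUpdates has already been called.
    The resulting code: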

      private final Map<Instrument, PublishSubject<OrderBookUpdate>> orderBookUpdatesSubscriptions = new ConcurrentHashMap<>();
      private final Map<Instrument, OrderBook> orderBookMap = new HashMap<>();

      public Observable<OrderBook> getOrderBook(Instrument instrument) {
        return service.subscribeChannel(instrument).flatMap(jsonNode -> {
          if (jsonNode.get("action").asText().equalsIgnoreCase("snapshot")) { // first message: full snapshot
            // Assumption: the book is the first element of "data", as in the incremental branch below.
            OrderBook orderBook = mapper.treeToValue(jsonNode.get("data").get(0), OrderBook.class);
            orderBookMap.put(instrument, orderBook);
            return Observable.just(orderBook);
          } else { // later messages: incremental update
            OrderBook orderBook = orderBookMap.get(instrument);
            // constructCollectionType(List.class, ...) deserializes to a List, so the variable must be a List
            // (assumes OrderBook.update and the OrderBookUpdate constructor accept a List<PublicOrder>).
            List<PublicOrder> incrementalUpdateData = mapper.treeToValue(jsonNode.get("data").get(0).get("asks"),
                    mapper.getTypeFactory().constructCollectionType(List.class, PublicOrder.class));
            // onNext returns void, so it cannot serve as the remapping function of computeIfPresent;
            // look the subject up and forward the update only if someone asked for this instrument.
            PublishSubject<OrderBookUpdate> updates = orderBookUpdatesSubscriptions.get(instrument);
            if (updates != null) {
              updates.onNext(new OrderBookUpdate(incrementalUpdateData));
            }
            orderBook.update(incrementalUpdateData);
            return Observable.just(orderBook);
          }
        });
      }

      public Observable<OrderBookUpdate> getOrderBookUpdates(Instrument instrument) {
        return orderBookUpdatesSubscriptions.computeIfAbsent(instrument, v -> PublishSubject.create());
      }
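
The same pattern can be tried in isolation. The following is only a minimal sketch, assuming RxJava 2 (io.reactivex) is on the classpath: the class OrderBookFeedSketch, the methods getBook and getUpdates, the String instrument key, and the Integer "updates" are hypothetical stand-ins for the real Instrument, OrderBook, and OrderBookUpdate types, and the channel is passed in as a parameter instead of coming from service.subscribeChannel.

```java
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for the class in the answer above.
class OrderBookFeedSketch {

    // One subject per instrument; toSerialized() makes onNext safe to call from several threads.
    private final Map<String, Subject<Integer>> updateSubjects = new ConcurrentHashMap<>();
    // The "book" per instrument, reduced here to the list of updates applied so far.
    private final Map<String, List<Integer>> books = new ConcurrentHashMap<>();

    /** Emits the full book after every message and forwards each raw update to the subject. */
    Observable<List<Integer>> getBook(String instrument, Observable<Integer> channel) {
        return channel.map(update -> {
            List<Integer> book = books.computeIfAbsent(instrument, k -> new ArrayList<>());
            book.add(update);
            // Forward the raw update only if getUpdates was already called for this instrument.
            Subject<Integer> subject = updateSubjects.get(instrument);
            if (subject != null) {
                subject.onNext(update);
            }
            // Hand out a copy so subscribers never observe later mutations of the shared list.
            List<Integer> copy = new ArrayList<>(book);
            return copy;
        });
    }

    /** Emits only the incremental updates; hide() keeps callers from casting back to Subject. */
    Observable<Integer> getUpdates(String instrument) {
        return updateSubjects
                .computeIfAbsent(instrument, k -> PublishSubject.<Integer>create().toSerialized())
                .hide();
    }

    public static void main(String[] args) {
        OrderBookFeedSketch feed = new OrderBookFeedSketch();
        List<Integer> seenUpdates = new ArrayList<>();
        List<List<Integer>> seenBooks = new ArrayList<>();

        // Subscribe to the updates first: a PublishSubject does not replay earlier items.
        feed.getUpdates("BTC-USD").subscribe(seenUpdates::add);
        feed.getBook("BTC-USD", Observable.just(1, 2, 3)).subscribe(seenBooks::add);

        System.out.println(seenUpdates); // [1, 2, 3]
        System.out.println(seenBooks);   // [[1], [1, 2], [1, 2, 3]]
    }
}
```

Two properties of this design are worth keeping in mind. A PublishSubject only delivers items emitted after a subscriber attaches, so anyone subscribing to the update stream late misses earlier updates. The update stream is also driven entirely by the book stream: nothing is emitted until something subscribes to getBook, and each additional getBook subscriber on the same instrument would push every update into the subject again.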