By subscribing to getOrderBook
I am getting data via Netty and Websocket.
The first time a full OrderBook
arrives, and then incremental updates follow. As a result, I get the complete OrderBook
with all the changes.
Please tell me how can I create another Observable so that I can receive separately the data that comes in the update (incrementalUpdateData
) through a subscription?
private final Map<Instrument, OrderBook> orderBookMap = new HashMap<>();
public Observable<OrderBook> getOrderBook(Instrument instrument) {
return service.subscribeChannel(instrument).flatMap(jsonNode -> {
if (jsonNode.get("action").asText().equalsIgnoreCase("snapshot")) { //first update - full snapshot
OrderBook orderBook = mapper.treeToValue(jsonNode.get("data"),
mapper.getTypeFactory().constructCollectionType(List.class, Orderbook.class));
orderBookMap.put(instrument, orderBook);
return Observable.just(orderBook);
} else { //second update and later - incremental update
OrderBook orderBook = orderBookMap.getOrDefault(instrument, null);
PublicOrder incrementalUpdateData = mapper.treeToValue(jsonNode.get("data").get(0).get("asks"),
mapper.getTypeFactory().constructCollectionType(List.class, PublicOrder.class));
orderBook.update(incrementalUpdateData);
return Observable.just(orderBook);
}
});
}
What do I expect.
getOrderBook.subscribe(some instrument)
- get full orderBook
getOrderBookUpdate.subscribe(some instrument)
- get only incremental data
Done via publishsubject
Result code looks like:
private final Map<Instrument, PublishSubject<OrderBookUpdate>> orderBookUpdatesSubscriptions;<br/>
private final Map<Instrument, OrderBook> orderBookMap = new HashMap<>();
public Observable<OrderBook> getOrderBook(Instrument instrument) {
return service.subscribeChannel(instrument).flatMap(jsonNode -> {
if (jsonNode.get("action").asText().equalsIgnoreCase("snapshot")) { //first update - full snapshot
OrderBook orderBook = mapper.treeToValue(jsonNode.get("data"),
mapper.getTypeFactory().constructCollectionType(List.class, Orderbook.class));
orderBookMap.put(instrument, orderBook);
return Observable.just(orderBook);
} else { //second update and later - incremental update
OrderBook orderBook = orderBookMap.getOrDefault(instrument, null);
PublicOrder incrementalUpdateData = mapper.treeToValue(jsonNode.get("data").get(0).get("asks"),
mapper.getTypeFactory().constructCollectionType(List.class, PublicOrder.class));
orderBookUpdatesSubscriptions.computeIfPresent(instrument, (a,b) -> b.onNext(new OrderBookUpdate(incrementalUpdateData)));
orderBook.update(incrementalUpdateData);
return Observable.just(orderBook);
}
}); }
public Observable<OrderBookUpdate> getOrderBookUpdates(Instrument instrument) {
return orderBookUpdatesSubscriptions.computeIfAbsent(instrument, v -> PublishSubject.create());
}