Tags: java, rest, rx-java, jersey-client, hateoas

Jersey RxJava client and collection of resource endpoints


I'm very new to RxJava and still trying to understand the data flow, but I want to know whether this is possible before heading too far down the rabbit hole.

The goal: to decouple API resources and allow them to be distributed, as well as providing flexibility to the clients (web/mobile) to fetch data based on the href links provided to only a few 'known' API endpoints.

I've got a collection of JSON objects from one API, where each object has an 'href' field linking to the API resource from which the full details can be retrieved. For example:

[
   {
      "name": "assassins_creed",
      "href": "games/assassins_creedId123"
   },
   {
      "name": "bioshock",
      "href": "games/bioshockId456"
   },
   {
      "name": "clean_code",
      "href": "books/clean_codeId789"
   },
   {
      "name": "christmas",
      "href": "events/xmas001"
   }
]

After retrieving this collection, I'd like to make a call to the appropriate resource using that href value and store the result in a collection such as a JSON array.

I'm mapping the href strings from the list of JSON objects into another list, then attempting to wrap them in an Observable so that I can retrieve each resource asynchronously and combine the results into the same response.

I've been using the Jersey example as a reference, and I'm concerned that it's not able to take dynamic href links and return the response I would like.

So far I stopped at:

@Override
public void getUsersDashboard(TokenModel token, String userId, @Suspended final AsyncResponse async) {
    List<JSONObject> list = //Database request to get items for userId
    List<String> hrefLinks = list.stream()
            .map(e -> (String) e.get("href"))
            .collect(Collectors.toList());

    final Queue<String> errors = new ConcurrentLinkedQueue<>();

    Observable.just(new JSONArray())
            .zipWith(dashboard(token.getAccessToken(),"plhUrl", errors), (array, objects) -> {
                array.add(objects);
                return array;
            })
            .subscribe(response -> {
                //errors?
                async.resume(response);
            }, async::resume);
}

private Observable<List<JSONObject>> dashboard(String access, String urlFragment, Queue<String> errors) {
     Client client = ClientBuilder.newClient();
     return RxObservable.from(client).target(urlFragment).request()
            .header("Authorization", access)
            .rx()
            .get(new GenericType<List<JSONObject>>(){})
            .onErrorReturn(throwable -> {
                errors.offer(throwable.getMessage());
                return Collections.emptyList();
            });
}

I was hesitant to put this into a for loop that creates an Observable for each entry in hrefLinks.

How can I get the responses from multiple resources into a single coherent response for the client?


Solution

  • The flatMap operator is what you are looking for. Emit the href links as a stream, and inside flatMap convert each link into a new observable whose result is merged back into the original stream. I think this is exactly the behavior you described.

    EDIT: The Retrofit library provides a nice interface for creating observables from links. To elaborate on your example: first transform the collection object into some kind of Iterable using the map operator. Once you have an Iterable, use flatMap with Observable.from() to turn it into an observable that emits each link as a separate item. Then use flatMap again to transform each link into an observable for the network request, whose emission is forwarded back into the original stream.

    getCollectionObjectObservable()
      .map(result -> iterable) // transform the object into an Iterable (i.e. a List)
      .flatMap(iterable -> Observable.from(iterable)) // emit each link separately
      .flatMap(link -> createRequestObservable(link)) // one request per link
      .subscribe(result -> { /* process the result of each item here */ });
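
    The pipeline above hands results to the subscriber one at a time, while the question asks for a single combined response. As a rough illustration of that fan-out and collect shape, here is a minimal sketch that uses only the JDK's CompletableFuture instead of RxJava, so it runs without extra dependencies. The class name DashboardAggregator, the method fetchAll, and the fetcher function are hypothetical stand-ins for the real HTTP call; they do not come from Jersey or RxJava.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Jersey or RxJava.
class DashboardAggregator {

    // Starts one asynchronous fetch per href, then combines the results into a
    // single list in the same order as the input links. A failed fetch is
    // recorded in `errors` and left out of the result.
    static List<String> fetchAll(List<String> hrefs,
                                 Function<String, String> fetcher,
                                 Queue<String> errors) {
        // Fan out: every link becomes its own pending request.
        List<CompletableFuture<String>> pending = hrefs.stream()
                .map(href -> CompletableFuture
                        .supplyAsync(() -> fetcher.apply(href))
                        .exceptionally(t -> {
                            // Async failures may arrive wrapped; report the cause if present.
                            Throwable cause = t.getCause() != null ? t.getCause() : t;
                            errors.offer(href + ": " + cause.getMessage());
                            return null;
                        }))
                .collect(Collectors.toList());

        // Fan in: wait for each request and keep only the successful bodies.
        return pending.stream()
                .map(CompletableFuture::join)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Queue<String> errors = new ConcurrentLinkedQueue<>();
        // Stub fetcher (assumption): a real one would issue the HTTP request.
        Function<String, String> fetcher = href -> "{\"href\": \"" + href + "\"}";
        List<String> bodies = fetchAll(
                Arrays.asList("games/bioshockId456", "books/clean_codeId789"),
                fetcher, errors);
        System.out.println(bodies);
        System.out.println(errors);
    }
}
```

    In the RxJava pipeline itself, the analogous step would be to add toList() before subscribe, so that the subscriber receives one list once every request has completed and can pass it to async.resume in a single call. Note that flatMap merges results as they arrive, so the order of that list is not guaranteed to match the order of the links.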