Search code examples
javajerseyjacksonrx-javajersey-client

No serializer found for class rx.Observable using Jersey RxJava client


I'm creating an API resource that uses the Jersey RxJava client to aggregate other resources into a single response. However, I'm getting an error returned which is a bit puzzling. The object being returned is a JSONArray but I'm getting this:

No serializer found for class rx.Observable and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) ) (through reference chain: org.json.simple.JSONArray[0])

and method:

@Override
public void getUsersDashboard(String token, String userId, @Suspended final AsyncResponse async) {
    List<JSONObject> list = //query persistence layer

    final Queue<String> errors = new ConcurrentLinkedQueue<>();

    final List<Observable> observables = list.stream()
            .map(jsonObject -> (String) jsonObject.get("href"))
            .map(link -> dashboard(token, link, errors))
            .collect(Collectors.toList());

    Observable.just(new JSONArray())
            .zipWith(observables, (jsonArray, resultFromObservable) -> {
                jsonArray.add(resultFromObservable);
                return jsonArray;
            })
            .subscribe(async::resume, async::resume);
}

Any ideas what would be causing this?


Solution

  • So, your observables list is of type List<Observable>. From this I assume that the dashboard method returns an Observable<?>; further assuming that it returns a Observable<T extends JSONNode>, I would rewrite your method as follows:

    @Override
    public void getUsersDashboard(String token, String userId, @Suspended final AsyncResponse async) {
        List<JSONObject> list = //query persistence layer
    
        final Queue<String> errors = new ConcurrentLinkedQueue<>();
    
        Observable
        .fromIterable(list)
        .map(jsonObject -> (String) jsonObject.get("href"))
        .flatMap(link -> dashboard(token, link, errors))
        .collect(JSONArray::new, JSONArray::add)
        .subscribe(async::resume, async::resume);
    }
    

    The zip method would also kinda work if you used the following line:

    .zipWith(Observable.merge(observables), (jsonArray, resultFromObservable) -> {
    

    However, zip() takes one element out of each observable, meaning that you would end up with an array of size one.