I'm creating an API resource that uses the Jersey RxJava client to aggregate other resources into a single response. However, I'm getting a puzzling error. The object being returned is a JSONArray, but I get this:
No serializer found for class rx.Observable and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) ) (through reference chain: org.json.simple.JSONArray[0])
Here is the method:
    @Override
    public void getUsersDashboard(String token, String userId, @Suspended final AsyncResponse async) {
        List<JSONObject> list = //query persistence layer
        final Queue<String> errors = new ConcurrentLinkedQueue<>();
        final List<Observable> observables = list.stream()
            .map(jsonObject -> (String) jsonObject.get("href"))
            .map(link -> dashboard(token, link, errors))
            .collect(Collectors.toList());
        Observable.just(new JSONArray())
            .zipWith(observables, (jsonArray, resultFromObservable) -> {
                jsonArray.add(resultFromObservable);
                return jsonArray;
            })
            .subscribe(async::resume, async::resume);
    }
Any ideas what would be causing this?
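So, your observables list is of type List<Observable>. From this I assume that the dashboard method returns an Observable<?>. zipWith(Iterable, ...) pairs the JSONArray with the elements of that list, which are the Observable objects themselves rather than the values they emit, so an Observable ends up inside the JSONArray and the serializer fails on it. Assuming further that dashboard returns an Observable<T extends JSONNode>, I would rewrite your method as follows: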
    @Override
    public void getUsersDashboard(String token, String userId, @Suspended final AsyncResponse async) {
        List<JSONObject> list = //query persistence layer
        final Queue<String> errors = new ConcurrentLinkedQueue<>();
        Observable
            // fromIterable is the RxJava 2 name; with RxJava 1 (rx.Observable) use Observable.from(list)
            .fromIterable(list)
            .map(jsonObject -> (String) jsonObject.get("href"))
            // flatMap subscribes to each inner Observable and emits its values
            .flatMap(link -> dashboard(token, link, errors))
            // gather every emitted value into one JSONArray
            .collect(JSONArray::new, JSONArray::add)
            .subscribe(async::resume, async::resume);
    }
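To make the shape of that pipeline concrete without pulling in RxJava, here is a rough JDK-only analogy using CompletableFuture. It is a sketch of the same fan-out-then-collect idea, not RxJava code: fetch is a hypothetical stand-in for dashboard, and the class and method names are invented for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// JDK-only analogy of the fan-out-then-collect pattern; not RxJava code.
class AggregateSketch {
    // Hypothetical stand-in for dashboard(token, link, errors): resolves a link asynchronously.
    static CompletableFuture<String> fetch(String link) {
        return CompletableFuture.supplyAsync(() -> "result:" + link);
    }

    // Starts one call per link, then gathers every result into a single list.
    static CompletableFuture<List<String>> aggregate(List<String> links) {
        List<CompletableFuture<String>> calls = links.stream()
                .map(AggregateSketch::fetch)
                .collect(Collectors.toList());
        // allOf completes once every call has completed, so join() below does not block.
        return CompletableFuture.allOf(calls.toArray(new CompletableFuture[0]))
                .thenApply(done -> calls.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        // prints [result:/a, result:/b]
        System.out.println(aggregate(Arrays.asList("/a", "/b")).join());
    }
}
```

One difference worth keeping in mind: this sketch returns results in input order, whereas flatMap emits values in whatever order the inner observables produce them.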
The zip approach would also more or less work if you used the following line:
    .zipWith(Observable.merge(observables), (jsonArray, resultFromObservable) -> {
However, zip() takes one element from each observable and stops as soon as either side completes. Because Observable.just(new JSONArray()) emits only one item, you would end up with an array of size one.
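The size-one outcome is easy to see with a small JDK-only analogy of zip over plain lists. This is a sketch of the pairing rule only, not RxJava code; ZipSketch and its zip helper are names made up for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

// JDK-only analogy of zip semantics; this is not RxJava code.
class ZipSketch {
    // Pairs elements positionally and stops as soon as either side runs out,
    // which is the same termination rule zip() follows.
    static <A, B, R> List<R> zip(List<A> left, List<B> right, BiFunction<A, B, R> combiner) {
        List<R> out = new ArrayList<>();
        Iterator<A> l = left.iterator();
        Iterator<B> r = right.iterator();
        while (l.hasNext() && r.hasNext()) {
            out.add(combiner.apply(l.next(), r.next()));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> array = new ArrayList<>();
        // One seed element, playing the role of Observable.just(new JSONArray()).
        List<List<String>> seed = Collections.singletonList(array);
        // Three results, playing the role of the merged observables.
        List<String> results = Arrays.asList("a", "b", "c");
        List<List<String>> zipped = zip(seed, results, (arr, item) -> {
            arr.add(item);
            return arr;
        });
        // prints [[a]]: only the first result was ever paired with the seed
        System.out.println(zipped);
    }
}
```

The seed side has a single element, so "b" and "c" are never consumed. That is why collecting (or reducing) over the merged stream is the better fit here.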