I need to get the camera IDs from the server, then get the events of each camera by ID, and finally get the image by ID. All of this then needs to be combined into a UI post: {Camera name - image url - its events}. For example, we might have 5 camera names, 30 event IDs (6 per camera name) and 30 URLs. I tried to use zip to combine the Observable API calls that (separately) get the camera name, event ID and image URL. But as I understand it, this does not work properly because zip cannot correctly match 5 names with 30 IDs. What can I use for this instead?
GetApiMethods getApiMethods = NetworkService.getInstance().createService(GetApiMethods.class);
Observable<Camera> cameras = getCamera();
Observable<Event> events = getEvents();
Observable<Post> combined = Observable.zip(cameras, events, (camera, event) -> new Post(camera.getDisplayName(), "null", event.getType()));

public LiveData<String> CameraRequests() {
    combined.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .concatMap(Observable::fromArray)
            .subscribe(new Observer<Post>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                }

                @Override
                public void onNext(@NonNull Post post) {
                }

                @Override
                public void onError(@NonNull Throwable e) {
                }

                @Override
                public void onComplete() {
                }
            });
    return liveData;
}

public Observable<Event> getEvents() {
    return Observable.create(allEvents -> getApiMethods.getCameras()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .concatMap(Observable::fromIterable)
            .concatMap(Observable::fromIterable)
            .concatMap(camera -> getApiMethods.getEvents(camera.getAccessPoint().replace("hosts", "")))
            .concatMap(Observable::fromIterable)
            .map(Event::getId)
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                }

                @Override
                public void onNext(@NonNull String event) {
                }

                @Override
                public void onError(@NonNull Throwable e) {
                    Log.d("RXJAVA1", String.valueOf(e));
                }

                @Override
                public void onComplete() {
                }
            })
    );
}

public Observable<Camera> getCamera() {
    return Observable.create(cameras1 -> getApiMethods.getCameras()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .concatMap(Observable::fromIterable)
            .map(Camera::getDisplayName)
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                }

                @Override
                public void onNext(@NonNull String s) {
                }

                @Override
                public void onError(@NonNull Throwable e) {
                    Log.d("RXJAVA", String.valueOf(e));
                }

                @Override
                public void onComplete() {
                }
            })
    );
}
This seems pretty straightforward with flatMap:
Observable<Post> getPostsObservable() {
    return getCameras().flatMap(camera ->
            // for each camera, build observable to get events
            getEvents(camera.accessPoint).flatMap(event ->
                    // for each event, build observable to get imageUrl
                    getImageUrl(event.id).map(imageUrl ->
                            // combine camera, event and imageUrl in current rx-chain
                            new Post(camera, event, imageUrl)
                    )
            )
    );
}
You may have to tweak your code a little more, for example by simplifying your getCameras and getEvents methods so that they return the mapped API observable directly instead of subscribing to it inside Observable.create. The demo below mimics your app; use it to better understand how the chain works and what to rework. When you call the getPosts method, you should see in logcat that the Posts are created and received correctly.
import android.util.Log;
import androidx.annotation.NonNull;
import java.util.ArrayList;
import java.util.List;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
public class RxDemo {

    // your Camera class
    public static class Camera {
        private final String accessPoint;

        public Camera(String accessPoint) {
            this.accessPoint = accessPoint;
        }

        public String getAccessPoint() {
            return accessPoint;
        }
    }

    // your Event class
    public static class Event {
        private final String id;

        public Event(String id) {
            this.id = id;
        }

        public String getId() {
            return id;
        }
    }

    // your Post class
    public static class Post {
        private final Camera camera;
        private final Event event;
        private final String imageUrl;

        public Post(Camera camera, Event event, String imageUrl) {
            this.camera = camera;
            this.event = event;
            this.imageUrl = imageUrl;
        }

        public Camera getCamera() {
            return camera;
        }

        public Event getEvent() {
            return event;
        }

        public String getImageUrl() {
            return imageUrl;
        }

        @NonNull
        @Override
        public String toString() {
            return "camera: " + camera.getAccessPoint() + "; event: " + event.getId() + "; imageUrl: " + imageUrl;
        }
    }

    // mocked getApiMethods.getCameras(), returns 5 mocked cameras
    private Observable<List<Camera>> apiGetCameras() {
        List<Camera> cameras = new ArrayList<>();
        cameras.add(new Camera("cam_1"));
        cameras.add(new Camera("cam_2"));
        cameras.add(new Camera("cam_3"));
        cameras.add(new Camera("cam_4"));
        cameras.add(new Camera("cam_5"));
        return Observable.just(cameras);
    }

    // mocked getApiMethods.getEvents, returns 6 mocked events with the cameraId as postfix
    private Observable<List<Event>> apiGetEvents(String cameraId) {
        List<Event> events = new ArrayList<>();
        events.add(new Event("event_1_" + cameraId));
        events.add(new Event("event_2_" + cameraId));
        events.add(new Event("event_3_" + cameraId));
        events.add(new Event("event_4_" + cameraId));
        events.add(new Event("event_5_" + cameraId));
        events.add(new Event("event_6_" + cameraId));
        return Observable.just(events);
    }

    // mocked getApiMethods.getImageUrl
    private Observable<String> apiGetImageUrl(String eventId) {
        return Observable.just("http://my.domain.com/" + eventId + ".png");
    }

    // flatten the list of cameras into single Camera emissions
    public Observable<Camera> getCameras() {
        return apiGetCameras().concatMapIterable(cam -> cam);
    }

    // flatten the list of events of one camera into single Event emissions
    public Observable<Event> getEvents(String cameraId) {
        return apiGetEvents(cameraId).concatMapIterable(event -> event);
    }

    // build observable that emits posts
    private Observable<Post> getPostsObservable() {
        return getCameras().flatMap(camera ->
                // for each camera, build observable to get events
                getEvents(camera.accessPoint).flatMap(event ->
                        // for each event, build observable to get imageUrl
                        apiGetImageUrl(event.id).map(imageUrl ->
                                // combine camera, event and imageUrl in current rx-chain
                                new Post(camera, event, imageUrl)
                        )
                )
        );
    }

    // call this in your activity/fragment/viewModel to get the posts
    public void getPosts() {
        getPostsObservable()
                .subscribeOn(Schedulers.io())
                .subscribe(new Observer<Post>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                        // keep d if you need to cancel the request later
                    }

                    @Override
                    public void onNext(@NonNull Post post) {
                        // one call per (camera, event, imageUrl) combination
                        Log.d("AAAA", post.toString());
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        // log failures instead of silently dropping them
                        Log.e("AAAA", "getPosts failed", e);
                    }

                    @Override
                    public void onComplete() {
                    }
                });
    }
}
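As for why zip could not do this: zip pairs items purely by position (first with first, second with second) and stops when the shortest source runs out, so 5 cameras zipped with 30 events can give at most 5 pairs, and those pairs are not matched by camera. The sketch below illustrates the difference without any dependencies. It is only an approximation: it uses java.util.stream as a stand-in for RxJava, a manual loop to mimic zip's index pairing, and Stream.flatMap in the role of Observable.flatMap. The class name ZipVsFlatMapSketch and the helpers cameras() and eventsFor() are made up for this illustration and mirror the mocked data above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ZipVsFlatMapSketch {

    // Hypothetical stand-in for the cameras response: 5 camera ids.
    static List<String> cameras() {
        return IntStream.rangeClosed(1, 5)
                .mapToObj(i -> "cam_" + i)
                .collect(Collectors.toList());
    }

    // Hypothetical stand-in for the events response: 6 event ids per camera.
    static List<String> eventsFor(String camera) {
        return IntStream.rangeClosed(1, 6)
                .mapToObj(i -> "event_" + i + "_" + camera)
                .collect(Collectors.toList());
    }

    // Mimics zip: pair the i-th camera with the i-th event of one flat
    // event list and stop as soon as the shorter list is exhausted.
    static List<String> zipped() {
        List<String> cams = cameras();
        List<String> allEvents = cams.stream()
                .flatMap(c -> eventsFor(c).stream())
                .collect(Collectors.toList()); // 30 events in one flat list
        int n = Math.min(cams.size(), allEvents.size());
        List<String> posts = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            posts.add(cams.get(i) + " -> " + allEvents.get(i));
        }
        return posts;
    }

    // Mimics nested flatMap: the camera stays in scope of the inner lambda,
    // so every event is combined with the camera it was requested for.
    static List<String> nested() {
        return cameras().stream()
                .flatMap(c -> eventsFor(c).stream().map(e -> c + " -> " + e))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(zipped().size()); // prints 5
        System.out.println(zipped().get(1)); // prints cam_2 -> event_2_cam_1
        System.out.println(nested().size()); // prints 30
    }
}
```

Note the mismatched pair in the zipped output: cam_2 ends up next to an event that belongs to cam_1. One difference from this stream sketch: with real asynchronous calls, RxJava's flatMap may interleave results from different cameras, so use concatMap in its place if the posts must arrive in camera order.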