
Using RxJava to make Quickblox API requests


I'm trying to combine the Quickblox Android SDK with RxJava to make chained and time-based Quickblox API calls. The API already has async methods. Here are two of them:

Create a session:

QBAuth.createSession(new QBEntityCallbackImpl<QBSession>() {

            @Override
            public void onSuccess(QBSession session, Bundle params) {


            }

            @Override
            public void onError(List<String> errors) {


            }
        });

Create a dialog (chat room):

QBChatService.getInstance().getGroupChatManager().createDialog(qbDialog,
            new QBEntityCallbackImpl<QBDialog>() {

                @Override
                public void onSuccess(QBDialog dialog, Bundle args) {


                }

                @Override
                public void onError(List<String> errors) {


                }
            });

As shown above, every API call takes a callback implementation as a method parameter. The problem is that I have to make several chained API calls, for example:

  1. Create empty session, then
  2. Sign in, then
  3. Retrieve user info, then
  4. Login to chat service, then
  5. ....

I read a little about RxJava and saw that it is well suited to this kind of problem, because you can apply filters and other operators before making the next API call. The problem is that I don't know how to adapt this API to work with RxJava.

  • Should the API call with callbacks be inside the Observable call() method?

  • Should the onError() and the onSuccess() methods of the Quickblox API call the onError() and the onNext()/onCompleted() methods of the Subscriber?

Could anyone provide an example that uses RxJava to create a session and then create a dialog, using the create-session and create-dialog API methods shown above?


Solution

  • Here's an example of how to create observables using Observable.create() in your case:

    Observable<QBSession> connectionObservable = Observable.create(new OnSubscribe<QBSession>() {
        @Override
        public void call(final Subscriber<? super QBSession> subscriber) {
            if (!subscriber.isUnsubscribed()) {
                QBAuth.createSession(new QBEntityCallbackImpl<QBSession>() {
    
                    @Override
                    public void onSuccess(QBSession session, Bundle params) {
                        subscriber.onNext(session);
                        subscriber.onCompleted();
                    }
    
                    @Override
                    public void onError(List<String> errors) {
                        // One option: wrap the Quickblox error messages in an exception.
                        subscriber.onError(new RuntimeException(String.valueOf(errors)));
                    }
                });
            }
        }
    });
    
    Observable<QBDialog> dialogCreationObservable = Observable.create(new OnSubscribe<QBDialog>() {
        @Override
        public void call(final Subscriber<? super QBDialog> subscriber) {
            if (!subscriber.isUnsubscribed()) {
                QBChatService.getInstance().getGroupChatManager().createDialog(qbDialog,
                        new QBEntityCallbackImpl<QBDialog>() {
    
                            @Override
                            public void onSuccess(QBDialog dialog, Bundle args) {
                                subscriber.onNext(dialog);
                                subscriber.onCompleted();
                            }
    
                            @Override
                            public void onError(List<String> errors) {
                                // One option: wrap the Quickblox error messages in an exception.
                                subscriber.onError(new RuntimeException(String.valueOf(errors)));
                            }
                        });
            }
        }
    });
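
    The two observables above still have to be chained so that the dialog is only requested once the session call has succeeded. Below is a rough, dependency-free sketch of that mechanic. It is not RxJava and not the Quickblox SDK: Callback, Source, createSession(), createDialog() and run() are hypothetical stand-ins invented for this illustration, and the fake calls complete synchronously with strings instead of QBSession/QBDialog objects.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

class CallbackChainSketch {

    /** Hypothetical stand-in for a Quickblox-style callback (not the real QBEntityCallbackImpl). */
    interface Callback<T> {
        void onSuccess(T result);
        void onError(List<String> errors);
    }

    /** Hypothetical stand-in for an Observable that emits a single item. */
    interface Source<T> {
        void subscribe(Callback<T> callback);

        /** Runs this source, then starts the next call with its result; an error skips the next call. */
        default <R> Source<R> flatMap(final Function<T, Source<R>> next) {
            return downstream -> subscribe(new Callback<T>() {
                @Override
                public void onSuccess(T result) {
                    // The first call succeeded: only now start the second one.
                    next.apply(result).subscribe(downstream);
                }

                @Override
                public void onError(List<String> errors) {
                    // The first call failed: forward the error, never start the second call.
                    downstream.onError(errors);
                }
            });
        }
    }

    /** Fake "create session" call; a real app would wrap QBAuth.createSession(...) here. */
    static Source<String> createSession() {
        return callback -> callback.onSuccess("session-1");
    }

    /** Fake "create dialog" call that can be told to fail, to exercise the error path. */
    static Source<String> createDialog(final String session, final boolean fail) {
        return callback -> {
            if (fail) {
                callback.onError(Collections.singletonList("dialog creation failed"));
            } else {
                callback.onSuccess("dialog-for-" + session);
            }
        };
    }

    /** Chains both calls and records what the final subscriber receives. */
    static List<String> run(final boolean failDialog) {
        final List<String> log = new ArrayList<>();
        createSession()
                .flatMap(session -> createDialog(session, failDialog))
                .subscribe(new Callback<String>() {
                    @Override
                    public void onSuccess(String dialog) {
                        log.add("success: " + dialog);
                    }

                    @Override
                    public void onError(List<String> errors) {
                        log.add("error: " + errors);
                    }
                });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // happy path
        System.out.println(run(true));  // dialog creation fails
    }
}
```

    With RxJava 1.x the same shape would presumably be connectionObservable.flatMap(...) returning dialogCreationObservable, followed by a single subscribe(...) that receives either the QBDialog or the first error from either call.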