
RxJava + Websocket - How to add Observable to Websocket listener?


I have a ViewModel that observes an RxJava Observable exposed by my MainRepo class. I want the WebSocketListener in MainRepo to emit events through that Observable, but I'm unsure how to do so.

MainRepo class:

private WebSocket ws;

public void createWsConnection() {
    Request request = new Request.Builder()
            .url(Constants.WEBSOCKET_ENDPOINT)
            .addHeader(Constants.WEBSOCKET_HEADERS_KEY, Constants.USER_ID)
            .build();

    OkHttpClient client = new OkHttpClient.Builder()
            .pingInterval(30, TimeUnit.SECONDS)
            .build();

    this.ws = client.newWebSocket(request, webSocketListener);
}

This is where I'm confused. I don't know how I would use the websocket with the RxJava observable.

public Observable<String> createListener() {
    return Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> emitter) {
            // I don't know what to put here in order to emit messages
            // back to my ViewModel class using the websocket listener
        }
    });
}

The websocket listener:

 private WebSocketListener webSocketListener = new WebSocketListener() {

        @Override
        public void onOpen(@NotNull WebSocket webSocket, Response response) {
            Timber.d("Ws connection opened: %s", response);
        }

        @Override
        public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
            Timber.d("Ws connection closing...");
        }

        @Override
        public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
            Timber.d("Ws connection closed...");
        }

        @Override
        public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
            Timber.d("Ws incoming message.");

        }

        @Override
        public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, Response response) {
            Timber.e(t, "Ws connection failure: %s", response);

        }
    };

A function in the ViewModel class that is observing the Observable in my MainRepo class:

public void connectToWs() {
    mainRepo.createListener()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                    Timber.d("Subscribed");
                }

                @Override
                public void onNext(@NonNull String s) {
                    Timber.d("Message: " + s);
                }

                @Override
                public void onError(@NonNull Throwable e) {
                    Timber.e(e, "Something went wrong.");
                }

                @Override
                public void onComplete() {
                    Timber.d("On complete.");
                }
            });
}

Solution

  • Create a PublishSubject and change your createListener method to return it:

    private final PublishSubject<String> publishSubject = PublishSubject.create();
    
    public Observable<String> createListener(){
        return publishSubject;
    }
    

    PublishSubject is an Observable, so the method signature does not need to change, though I'd suggest renaming the method to something like observeMessages.

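    Then, in your WebSocket listener, emit each incoming message to the PublishSubject with onNext. You should also call onComplete in onClosed and onError in onFailure. Both are terminal events: once the subject has completed or errored it emits nothing further, so reconnecting requires a fresh subject.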

    private final WebSocketListener webSocketListener = new WebSocketListener() {

        @Override
        public void onOpen(@NotNull WebSocket webSocket, Response response) {
            Timber.d("Ws connection opened: %s", response);
        }

        @Override
        public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
            Timber.d("Ws connection closing...");
        }

        @Override
        public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
            Timber.d("Ws connection closed...");

            // Terminal event: tells subscribers that no more messages will arrive.
            publishSubject.onComplete();
        }

        @Override
        public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
            Timber.d("Ws incoming message.");

            // Forward each text frame to whoever is subscribed.
            publishSubject.onNext(text);
        }

        @Override
        public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, Response response) {
            // response may be null here, so pass it as a format argument
            // instead of calling response.toString().
            Timber.e(t, "Ws connection failure: %s", response);

            // Terminal event: propagates the failure to the subscriber's onError.
            publishSubject.onError(t);
        }
    };
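
To follow the path from listener callback to subscriber without an Android project, here is a minimal plain-Java sketch. MiniSubject and FakeListener are hypothetical stand-ins written only for this illustration (they are not RxJava or OkHttp classes). They mimic just two PublishSubject behaviours relevant to the answer above: values emitted before anyone subscribes are not replayed, and nothing is delivered after onComplete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

final class SubjectBridgeSketch {

    /** Hypothetical stand-in for PublishSubject<String>; not an RxJava class. */
    static final class MiniSubject {
        private final List<Consumer<String>> observers = new CopyOnWriteArrayList<>();
        private volatile boolean terminated = false;

        void subscribe(Consumer<String> onNext) {
            if (!terminated) {
                observers.add(onNext);
            }
        }

        void onNext(String value) {
            if (terminated) {
                return; // values after a terminal event are dropped
            }
            for (Consumer<String> observer : observers) {
                observer.accept(value);
            }
        }

        void onComplete() {
            terminated = true;
            observers.clear();
        }
    }

    /** Hypothetical stand-in for the WebSocketListener callbacks. */
    static final class FakeListener {
        private final MiniSubject subject;

        FakeListener(MiniSubject subject) {
            this.subject = subject;
        }

        void onMessage(String text) {
            subject.onNext(text);
        }

        void onClosed() {
            subject.onComplete();
        }
    }

    /** Simulates a short socket session and returns what the subscriber saw. */
    static List<String> run() {
        MiniSubject subject = new MiniSubject();
        FakeListener listener = new FakeListener(subject);
        List<String> received = new ArrayList<>();

        listener.onMessage("early");       // no subscriber yet, so this is lost
        subject.subscribe(received::add);  // the ViewModel subscribes here
        listener.onMessage("hello");
        listener.onMessage("world");
        listener.onClosed();               // terminal event
        listener.onMessage("late");        // ignored after completion
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [hello, world]
    }
}
```

If the same ordering applies in the real code, the ViewModel would need to subscribe before createWsConnection() is called in order not to miss the first messages.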