Search code examples
angular rxjs deferred observable

Defer Observables


I am using a standard WebSocket which sends messages and eventually receives a response. Previously, I have used $q.defer, stored the deferred object and returned a promise. The response handler would look up the stored deferred object and resolve it using a value.

Is this possible using observables?

// As part of the websocket setup
// Message handler: reads the uuid from the incoming message and calls
// `create` on the entry stored under that uuid in `this._observables`.
// NOTE(review): inside a `function` expression `this` is bound by the caller
// (presumably the websocket), not by the enclosing class — confirm.
websocket.onmessage = function(message) {
    // `uuid` is assigned here without a visible declaration.
    uuid = message.someResponse.uuid;
    this._observables[uuid].create(function(observer) {
        // NOTE(review): `response` is not defined anywhere in this snippet —
        // presumably the received message is meant; confirm.
        observer.onNext(response);
        observer.onCompleted();
    });
}

/**
 * Sends the request and returns a newly created Observable for it.
 *
 * The request is passed to `this.sendMessage` first; afterwards a new
 * Observable is constructed, stored in `this._observables` and returned.
 * The storage key is the `uuid` property of the value found under the
 * request's first own key (`Object.keys(request)[0]`).
 *
 * @param request the request object to send
 * @returns the Observable that was stored for this request
 */
public sendRequest(request : any) : Observable<any> {
    this.sendMessage(request);
    // Assignment expression: stores the new Observable and returns it in one step.
    return this._observables[request[Object.keys(request)[0]].uuid] = new Observable();
}

// Inside some requesting method
// Send the request and subscribe to the Observable returned for it.
var observable = this.sendRequest(request);
observable.subscribe(
    // next: log each emitted response
    response => console.log(response),
    // error: log the `error` property of the value passed to the error callback
    response => console.log(response.error),
    // complete: remove the stored entry for this request
    () => {
        // NOTE(review): this deletes from `_callbacks`, while sendRequest stores
        // into `_observables`, and `uuid` is not defined in this snippet —
        // confirm which names are intended.
        delete this._callbacks[uuid];
    }
);

Solution

  • I would refactor your code this way:

    /**
     * Wraps the websocket's `onmessage` handler in an Observable.
     *
     * The handler is installed when the returned Observable is subscribed to;
     * every message received afterwards is forwarded to the subscriber's
     * `next` callback.
     *
     * @returns an Observable emitting each message received on the websocket
     */
    public initialize() : Observable<any> {
      return Observable.create(observer => {
        // Forward every incoming message to the subscriber. The previous
        // assignment to an undeclared, unused `uuid` was dropped: it does not
        // compile in TypeScript and throws in strict mode.
        const handler = message => {
          observer.next(message);
        };
        websocket.onmessage = handler;

        // Teardown: detach the handler on unsubscribe, but only if it is still
        // the one installed by this subscription.
        return () => {
          if (websocket.onmessage === handler) {
            websocket.onmessage = null;
          }
        };
      });
    }
    

    and the way to send and receive messages as well:

    // `initialize` is declared as a class method, so it is called on `this`,
    // consistent with `this.sendRequest` below.
    var observable = this.initialize();
    
    var request = (...)
    this.sendRequest(request);
    
    observable.subscribe(
      message => {
        // called each time a message is received
        console.log(message)
      },
      response => {
        // called if the observable signals an error
        console.log(response.error)
      },
      () => {
        // called when the observable completes; nothing to do here
      }
    );
    

    Contrary to promises, observables need to be initialized only once since they support events. Each time a message is received, the event callback (the first parameter of the subscribe method) will be called with the message as parameter.

    For more details you could have a look at the following article in section "Event-based support":