Search code examples
javascriptnode.jspromiserxjsnode-amqp

RxJS Observable with asynchronous subscriber function


I'm trying to do something that feels like it should be straightforward, but is proving surprisingly difficult.

I have a function to subscribe to a RabbitMQ queue. Concretely, this is the Channel.consume function here: http://www.squaremobius.net/amqp.node/channel_api.html#channel_consume

It returns a promise which is resolved with a subscription id - which is needed to unsubscribe later - and also has a callback argument to invoke when messages are pulled off the queue.

When I want to unsubscribe from the queue, I'd need to cancel the consumer using the Channel.cancel function here: http://www.squaremobius.net/amqp.node/channel_api.html#channel_cancel. This takes the previously returned subscription id.

I want to wrap all of this stuff in an Observable that subscribes to the queue when the observable is subscribed to, and cancels the subscription when the observable is unsubscribed from. However, this is proving somewhat hard due to the 'double-asynchronous' nature of the calls (I mean to say that they have both a callback AND return a promise).

Ideally, the code I'd like to be able to write is:

return new Rx.Observable(async (subscriber) => {
  var consumeResult = await channel.consume(queueName, (message) => subscriber.next(message));
  return async () => {
    await channel.cancel(consumeResult.consumerTag);
  };
});

However, this isn't possible as this constructor doesn't support async subscriber functions or teardown logic.

I've not been able to figure this one out. Am I missing something here? Why is this so hard?

Cheers, Alex


Solution

  • The created observable does not need to wait for the channel.consume promise to resolve, as the observer (it's an observer that's passed, not a subscriber) is only called from within the function you provide.

    However, the unsubscribe function that you return will have to wait for that promise to resolve. And it can do that internally, like this:

    return new Rx.Observable((observer) => {
      var consumeResult = channel.consume(queueName, (message) => observer.next(message));
      return () => {
        consumeResult.then(() => channel.cancel(consumeResult.consumerTag));
      };
    });