Search code examples
javascripttypescriptecmascript-6producer-consumerrxjs5

How to subscribe exactly once to an element from AsyncSubject (consumer pattern)


In rxjs5, I have an AsyncSubject and want to subscribe to it multiple times, but only ONE subscriber should ever receive the next() event. All others (if they are not yet unsubscribed) should immediately get the complete() event without next().

Example:

let fired = false;
let as = new AsyncSubject();

const setFired = () => {
    if (fired == true) throw new Error("Multiple subscriptions executed");
    fired = true;
}

let subscription1 = as.subscribe(setFired);
let subscription2 = as.subscribe(setFired);

// note that subscription1/2 could be unsubscribed from in the future
// and still only a single subscriber should be triggered

setTimeout(() => {
    as.next(undefined);
    as.complete();
}, 500);

Solution

  • The simples way is to wrap your AsyncSubject in another object that handles the logic of calling 1 subscriber only. Assuming you want to invoke the 1st subscriber only, the code below should be a good starting point

    let as = new AsyncSubject();
    
    const createSingleSubscriberAsyncSubject = as => {
        // define empty array for subscribers
        let subscribers = [];
    
        const subscribe = callback => {
            if (typeof callback !== 'function') throw new Error('callback provided is not a function');
    
            subscribers.push(callback);
    
            // return a function to unsubscribe
            const unsubscribe = () => { subscribers = subscribers.filter(cb => cb !== callback); };
            return unsubscribe;
        };
    
        // the main subscriber that will listen to the original AS
        const mainSubscriber = (...args) => {
            // you can change this logic to invoke the last subscriber
            if (subscribers[0]) {
                subscribers[0](...args);
            }
        };
    
        as.subscribe(mainSubscriber);
    
        return {
            subscribe,
            // expose original AS methods as needed
            next: as.next.bind(as),
            complete: as.complete.bind(as),
        };
    };
    
    // use
    
    const singleSub = createSingleSubscriberAsyncSubject(as);
    
    // add 3 subscribers
    const unsub1 = singleSub.subscribe(() => console.log('subscriber 1'));
    const unsub2 = singleSub.subscribe(() => console.log('subscriber 2'));
    const unsub3 = singleSub.subscribe(() => console.log('subscriber 3'));
    
    // remove first subscriber
    unsub1();
    
    setTimeout(() => {
        as.next(undefined);
        as.complete();
        // only 'subscriber 2' is printed
    }, 500);