In rxjs5
, I have an AsyncSubject and want to subscribe to it multiple times, but only ONE subscriber should ever receive the next()
event. All others (if they are not yet unsubscribed) should immediately get the complete()
event without next()
.
Example:
let fired = false;
let as = new AsyncSubject();
const setFired = () => {
if (fired == true) throw new Error("Multiple subscriptions executed");
fired = true;
}
let subscription1 = as.subscribe(setFired);
let subscription2 = as.subscribe(setFired);
// note that subscription1/2 could be unsubscribed from in the future
// and still only a single subscriber should be triggered
setTimeout(() => {
as.next(undefined);
as.complete();
}, 500);
The simples way is to wrap your AsyncSubject in another object that handles the logic of calling 1 subscriber only. Assuming you want to invoke the 1st subscriber only, the code below should be a good starting point
let as = new AsyncSubject();
const createSingleSubscriberAsyncSubject = as => {
// define empty array for subscribers
let subscribers = [];
const subscribe = callback => {
if (typeof callback !== 'function') throw new Error('callback provided is not a function');
subscribers.push(callback);
// return a function to unsubscribe
const unsubscribe = () => { subscribers = subscribers.filter(cb => cb !== callback); };
return unsubscribe;
};
// the main subscriber that will listen to the original AS
const mainSubscriber = (...args) => {
// you can change this logic to invoke the last subscriber
if (subscribers[0]) {
subscribers[0](...args);
}
};
as.subscribe(mainSubscriber);
return {
subscribe,
// expose original AS methods as needed
next: as.next.bind(as),
complete: as.complete.bind(as),
};
};
// use
const singleSub = createSingleSubscriberAsyncSubject(as);
// add 3 subscribers
const unsub1 = singleSub.subscribe(() => console.log('subscriber 1'));
const unsub2 = singleSub.subscribe(() => console.log('subscriber 2'));
const unsub3 = singleSub.subscribe(() => console.log('subscriber 3'));
// remove first subscriber
unsub1();
setTimeout(() => {
as.next(undefined);
as.complete();
// only 'subscriber 2' is printed
}, 500);