RxJs - how to make observable behave like queue


I'm trying to achieve the following:

private beginTransaction(): Observable<void> {
  ..
}

private test(): void {
  this.beginTransaction().subscribe((): void => {
    this.commitTransaction();
  });

  this.beginTransaction().subscribe((): void => {
    this.commitTransaction();
  });
}

beginTransaction can be called concurrently, but the observable returned by each call should be delayed until the previous transaction has finished.

In other words: only one transaction can be in progress at any time.

What I have tried:

private transactionInProgress: boolean = false;
private canBeginTransaction: Subject<void> = new Subject<void>();

private bla3(): void {

  this.beginTransaction().subscribe((): void => {
    console.log('beginTransaction 1');
    this.commitTransaction();
  });

  this.beginTransaction().subscribe((): void => {
    console.log('beginTransaction 2');
    this.commitTransaction();
  });

  this.beginTransaction().subscribe((): void => {
    console.log('beginTransaction 3');
    this.commitTransaction();
  });
}

private commitTransaction(): void {
  this.transactionInProgress = false;
  this.canBeginTransaction.next();
}

private beginTransaction(): Observable<void> {

  if(this.transactionInProgress) {
    return of(undefined)
    .pipe(
      skipUntil(this.canBeginTransaction),
      tap((): void => {
        console.log('begin transaction');
      })
    );
  }
  this.transactionInProgress = true;
  return of(undefined);
}

Solution

  • What you've asked for is fairly general. A more constrained scenario could probably be solved far more simply.

    Regardless, here is a pipeline that ensures only one subscription to transaction(): Observable is active at a time.

    Here's how that might look:

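    // Assumes RxJS 6 import paths
    import { Observable, Subject, MonoTypeOperatorFunction, defer, of, timer } from "rxjs";
    import { concatMap, first, map, shareReplay, switchMap, tap } from "rxjs/operators";

    /****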
     * Represents what each transaction does. Isn't concerned about
     * order/timing/'transactionInProgress' or anything like that.
     * 
     * Here is a fake transaction that just takes 3-5 seconds to emit
     * the string: `Hello ${name}`
     ****/
    function transaction(args): Observable<string> {
      const name = args?.message;
      const duration = 3000 + (Math.random() * 2000);
      return of("Hello").pipe(
        tap(_ => console.log("starting transaction")),
        switchMap(v => timer(duration).pipe(
          map(_ => `${v} ${name}`)
        )),
        tap(_ => console.log("Ending transaction"))
      );
    }
    
    // Track transactions
    let currentTransactionId = 0;
    // Start transactions
    const transactionSubj = new Subject<any>();
    // Perform transaction: concatMap ensures we only start a new one if 
    // there isn't a current transaction underway
    const transaction$ = transactionSubj.pipe(
      concatMap(({id, args}) => transaction(args).pipe(
        map(payload => ({id, payload}))
      )),
      shareReplay(1)
    );
    
    /****
     * Begin a new transaction, we give it an ID since transactions are
     * "hot" and we don't want to return the wrong (earlier) transactions,
     * just the current one started with this call.
     ****/
    function beginTransaction(args): Observable<any> {
      return defer(() => {
        const currentId = currentTransactionId++;
        transactionSubj.next({id: currentId, args});
        return transaction$.pipe(
          first(({id}) => id === currentId),
          map(({payload}) => payload)
        );
      })
    }
    
    // Queue up 3 transactions, each one will wait for the previous 
    // one to complete before it will begin.
    beginTransaction({message: "Dave"}).subscribe(console.log);
    beginTransaction({message: "Tom"}).subscribe(console.log);
    beginTransaction({message: "Tim"}).subscribe(console.log);
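
    To make the queueing behavior concrete, here is a minimal, library-free sketch of the idea concatMap provides above: a new task starts only once the running one has signalled completion. This is not RxJS and not how concatMap is implemented; Task, SerialQueue, and finishCurrent are hypothetical names chosen for illustration, and the fake transactions finish only when told to.

```typescript
// Hypothetical names: Task and SerialQueue are illustrative, not RxJS APIs.
// A task receives a `done` callback and calls it when its work has finished.
type Task = (done: () => void) => void;

class SerialQueue {
  private pending: Task[] = [];
  private busy = false;

  // Add a task; it starts immediately only if nothing else is running.
  enqueue(task: Task): void {
    this.pending.push(task);
    this.drain();
  }

  // Start the next pending task, if any, unless one is still running.
  private drain(): void {
    if (this.busy) return;
    const task = this.pending.shift();
    if (task === undefined) return;
    this.busy = true;
    task(() => {
      this.busy = false;
      this.drain();
    });
  }
}

const eventLog: string[] = [];
const finishers: Array<() => void> = [];
const serialQueue = new SerialQueue();

// Queue three fake transactions; each one ends only when its finisher runs.
for (const who of ["Dave", "Tom", "Tim"]) {
  serialQueue.enqueue(done => {
    eventLog.push(`start ${who}`);
    finishers.push(() => {
      eventLog.push(`end ${who}`);
      done();
    });
  });
}

// Complete whichever transaction is currently running.
function finishCurrent(): void {
  const finish = finishers.shift();
  if (finish !== undefined) finish();
}
```

    Right after queuing, only "start Dave" has been logged. Each call to finishCurrent() ends the running transaction and lets the next one begin, which is the same ordering guarantee the concatMap pipeline gives.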
    

    Asynchronous Transactions

    The current setup requires transactions to be asynchronous; with a synchronous transaction you risk losing the first one. The workaround is not simple, so I've built an operator that subscribes to the source and then calls a function as soon as possible afterwards.

    Here it is:

    function initialize<T>(fn: () => void): MonoTypeOperatorFunction<T> {
      return s => new Observable(observer => {
        const bindOn = name => observer[name].bind(observer);
        const sub = s.subscribe({
          next: bindOn("next"),
          error: bindOn("error"),
          complete: bindOn("complete")
        });
        fn();
        return {
          // Call unsubscribe; returning the method reference would leak the subscription
          unsubscribe: () => sub.unsubscribe()
        };
      });
    }
    

    and here it is in use:

    function beginTransaction(args): Observable<any> {
      return defer(() => {
        const currentId = currentTransactionId++;
        return transaction$.pipe(
          initialize(() => transactionSubj.next({id: currentId, args})),
          first(({id}) => id === currentId),
          map(({payload}) => payload)
        );
      })
    }
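
    Why the subscribe-then-emit order matters can be seen in isolation. A plain Subject does not replay values to late subscribers, so anything emitted before the subscription exists is simply dropped. The sketch below uses a hypothetical TinySubject class as a stand-in (it is not the RxJS Subject, only the same no-replay behavior in a few lines).

```typescript
// Hypothetical stand-in for a Subject: no buffering, no replay.
class TinySubject<T> {
  private listeners: Array<(value: T) => void> = [];

  subscribe(listener: (value: T) => void): void {
    this.listeners.push(listener);
  }

  next(value: T): void {
    this.listeners.forEach(listener => listener(value));
  }
}

const received: string[] = [];

// Emit first, subscribe second: nobody is listening yet, so "early" is dropped.
const emitFirst = new TinySubject<string>();
emitFirst.next("early");
emitFirst.subscribe(value => {
  received.push(`emitFirst got ${value}`);
});

// Subscribe first, emit second: the order that initialize is meant to enforce.
const subscribeFirst = new TinySubject<string>();
subscribeFirst.subscribe(value => {
  received.push(`subscribeFirst got ${value}`);
});
subscribeFirst.next("on time");

// received now holds a single entry: "subscribeFirst got on time"
```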
    

    Aside: Why Use defer?

    Consider rewriting beginTransaction without defer:

    function beginTransaction(args): Observable<any> {
      const currentId = currentTransactionId++;
      return transaction$.pipe(
        initialize(() => transactionSubj.next({id: currentId, args})),
        first(({id}) => id === currentId),
        map(({payload}) => payload)
      );
    }
    

    In this case, the ID is set at the moment you invoke beginTransaction.

    // The ID is set here, but it won't be used until subscribed
    const preppedTransaction = beginTransaction({message: "Dave"});
    
    // 10 seconds later, that ID gets used.
    setTimeout(
      () => preppedTransaction.subscribe(console.log),
      10000
    );
    

    If transactionSubj.next were called without the initialize operator, the problem would get even worse: transactionSubj.next would also be called 10 seconds before the observable is subscribed to, so you would be sure to miss the output.

    The problems continue:

    What if you want to subscribe to the same observable twice?

    const preppedTransaction = beginTransaction({message: "Dave"});
    preppedTransaction.subscribe(
      value => console.log("First Subscribe: ", value)
    );
    preppedTransaction.subscribe(
      value => console.log("Second Subscribe: ", value)
    );
    

    I would expect the output to be:

    First Subscribe: Hello Dave
    Second Subscribe: Hello Dave
    

    Instead, you get

    First Subscribe: Hello Dave
    First Subscribe: Hello Dave
    Second Subscribe: Hello Dave
    Second Subscribe: Hello Dave
    

    Because you don't get a new ID on subscribing, the two subscriptions share one ID. defer fixes this problem by not assigning an ID until subscription. This becomes seriously important when managing errors in streams, since it lets you retry an observable after it errors.
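
    The eager-versus-deferred distinction can be shown without RxJS at all. In this sketch, IdSource is a hypothetical stand-in for an observable whose subscribe hands the callback an ID; eagerBegin and deferredBegin are illustrative names, not part of the code above.

```typescript
// Hypothetical stand-in for an observable: subscribing hands the callback an ID.
type IdSource = { subscribe(onId: (id: number) => void): void };

let nextTransactionId = 0;

// Eager: the ID is taken when the function is called, then reused by every subscriber.
function eagerBegin(): IdSource {
  const id = nextTransactionId++;
  return { subscribe: onId => onId(id) };
}

// Deferred: the ID is taken inside subscribe, so each subscription gets its own.
function deferredBegin(): IdSource {
  return { subscribe: onId => onId(nextTransactionId++) };
}

const eagerIds: number[] = [];
const eager = eagerBegin();
eager.subscribe(id => {
  eagerIds.push(id);
});
eager.subscribe(id => {
  eagerIds.push(id);
});

const deferredIds: number[] = [];
const deferred = deferredBegin();
deferred.subscribe(id => {
  deferredIds.push(id);
});
deferred.subscribe(id => {
  deferredIds.push(id);
});

// eagerIds is [0, 0]: both subscriptions share one ID.
// deferredIds is [1, 2]: each subscription received a fresh ID.
```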