I'm trying to achieve next:
private beginTransaction(): Observable() {
..
}
private test(): void {
this.beginTransaction().subscribe((): void => {
this.commitTransaction();
});
this.beginTransaction().subscribe((): void => {
this.commitTransaction();
});
}
beginTransaction can be called concurrently, but should delay the observable until first or only one beginTransaction finished.
In order words: Only one transaction can be in progress at any time.
What have I tried:
private transactionInProgress: boolean = false;
private canBeginTransaction: Subject<void> = new Subject<void>();
private bla3(): void {
this.beginTransaction().subscribe((): void => {
console.log('beginTransaction 1');
this.commitTransaction();
});
this.beginTransaction().subscribe((): void => {
console.log('beginTransaction 2');
this.commitTransaction();
});
this.beginTransaction().subscribe((): void => {
console.log('beginTransaction 3');
this.commitTransaction();
});
}
private commitTransaction(): void {
this.transactionInProgress = false;
this.canBeginTransaction.next();
}
private beginTransaction(): Observable<void> {
if(this.transactionInProgress) {
return of(undefined)
.pipe(
skipUntil(this.canBeginTransaction),
tap((): void => {
console.log('begin transaction');
})
);
}
this.transactionInProgress = true;
return of(undefined);
}
What you've asked about is pretty vague and general. Without a doubt, a more constrained scenario could probably look a whole lot simpler.
Regardless, here I create a pipeline that only lets transaction(): Observable
be subscribed to once at a time.
Here's how that might look:
/****
* Represents what each transaction does. Isn't concerned about
* order/timing/'transactionInProgress' or anything like that.
*
* Here is a fake transaction that just takes 3-5 seconds to emit
* the string: `Hello ${name}`
****/
function transaction(args): Observable<string> {
const name = args?.message;
const duration = 3000 + (Math.random() * 2000);
return of("Hello").pipe(
tap(_ => console.log("starting transaction")),
switchMap(v => timer(duration).pipe(
map(_ => `${v} ${name}`)
)),
tap(_ => console.log("Ending transation"))
);
}
// Track transactions
let currentTransactionId = 0;
// Start transactions
const transactionSubj = new Subject<any>();
// Perform transaction: concatMap ensures we only start a new one if
// there isn't a current transaction underway
const transaction$ = transactionSubj.pipe(
concatMap(({id, args}) => transaction(args).pipe(
map(payload => ({id, payload}))
)),
shareReplay(1)
);
/****
* Begin a new transaction, we give it an ID since transactions are
* "hot" and we don't want to return the wrong (earlier) transactions,
* just the current one started with this call.
****/
function beginTransaction(args): Observable<any> {
return defer(() => {
const currentId = currentTransactionId++;
transactionSubj.next({id: currentId, args});
return transaction$.pipe(
first(({id}) => id === currentId),
map(({payload}) => payload)
);
})
}
// Queue up 3 transactions, each one will wait for the previous
// one to complete before it will begin.
beginTransaction({message: "Dave"}).subscribe(console.log);
beginTransaction({message: "Tom"}).subscribe(console.log);
beginTransaction({message: "Tim"}).subscribe(console.log);
The current setup requires transactions to be asynchronous, or you risk losing the first one. The workaround for that is not simple, so I've built an operator that subscribes, then calls a function as soon as possible after that.
Here it is:
function initialize<T>(fn: () => void): MonoTypeOperatorFunction<T> {
return s => new Observable(observer => {
const bindOn = name => observer[name].bind(observer);
const sub = s.subscribe({
next: bindOn("next"),
error: bindOn("error"),
complete: bindOn("complete")
});
fn();
return {
unsubscribe: () => sub.unsubscribe
};
});
}
and here it is in use:
function beginTransaction(args): Observable<any> {
return defer(() => {
const currentId = currentTransactionId++;
return transaction$.pipe(
initialize(() => transactionSubj.next({id: currentId, args})),
first(({id}) => id === currentId),
map(({payload}) => payload)
);
})
}
defer
?Consider re-writting beginTransaction:
function beginTransaction(args): Observable<any> {
const currentId = currentTransactionId++;
return transaction$.pipe(
initialize(() => transactionSubj.next({id: currentId, args})),
first(({id}) => id === currentId),
map(({payload}) => payload)
);
}
In this case, the ID is set at the moment you invoke beginTransaction
.
// The ID is set here, but it won't be used until subscribed
const preppedTransaction = beginTransaction({message: "Dave"});
// 10 seconds later, that ID gets used.
setTimeout(
() => preppedTransaction.subscribe(console.log),
10000
);
If transactionSubj.next
is called without the initialize operator, then this problem gets even worse as transactionSubj.next
would also get called 10 seconds before the observable is subscribed to (You're sure to miss the output)
The problems continue:
What if you want to subscribe to the same observable twice?
const preppedTransaction = beginTransaction({message: "Dave"});
preppedTransaction.subscribe(
value => console.log("First Subscribe: ", value)
);
preppedTransaction.subscribe(
value => console.log("Second Subscribe: ", value)
);
I would expect the output to be:
First Subscribe: Hello Dave
Second Subscribe: Hello Dave
Instead, you get
First Subscribe: Hello Dave
First Subscribe: Hello Dave
Second Subscribe: Hello Dave
Second Subscribe: Hello Dave
Because you don't get a new ID on subscribing, the two subscriptions share one ID. defer
fixes this problem by not assigning an id until subscription. This becomes seriously important when managing errors in streams (letting you re-try an observable after it errors).