I'd like to subscribe to a stream and get a notification if a certain sequence of events is observed.
Something like this, in very much pseudocode: .WhenType<MessageHandled>().ThenType<TransactionCommitted>().ThenAnyNumberOfAnyType().ThenType<QueueStopped>()
Like a regex, but filtering on events/types in the stream rather than a string.
What's the best way to accomplish something like that?
I tried using Observable Joins (see Guide to System.Reactive.Joins), but couldn't get a working solution.
I did, however, try a state machine option and got something that works.
I started by creating an enum that represents your events. I know you're using types, but the enum makes my example clearer.
    public enum Value
    {
        MessageHandled,
        TransactionCommitted,
        QueueStopped,
        X,
        Y,
    }
I then created an observable that returns various combinations of these values.
    IObservable<Value> source = new[]
    {
        /* invalid */ Value.TransactionCommitted, Value.TransactionCommitted, Value.MessageHandled, Value.X,
        /* valid   */ Value.MessageHandled, Value.TransactionCommitted, Value.X, Value.Y, Value.QueueStopped,
        /* invalid */ Value.X, Value.X,
        /* valid   */ Value.MessageHandled, Value.TransactionCommitted, Value.QueueStopped,
        /* invalid */ Value.Y, Value.MessageHandled, Value.Y, Value.TransactionCommitted, Value.QueueStopped,
    }.ToObservable();
Next, I created an enum for the states of the state machine, along with definitions for its initial states and its transitions.
    public enum State { A, B, Z }

    var initial = new (Value value, State state)[]
    {
        (Value.MessageHandled, State.A),
    };

    var transitions = new (State current, Func<Value, bool> rule, State next)[]
    {
        (State.A, t => t == Value.TransactionCommitted, State.B),
        (State.B, t => t != Value.QueueStopped, State.B),
        (State.B, t => t == Value.QueueStopped, State.Z),
    };
And finally, here's the query that runs the state machine:
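    IObservable<Value[]> query =
        source
            .Scan(
                // Every partial match in progress: its current state and the values seen so far.
                new List<(State state, Value[] values)>(),
                (accumulator, value) =>
                (
                    Enumerable
                        .Concat(
                            // Advance each partial match whose transition rule accepts the new value.
                            from a in accumulator
                            from t in transitions
                            where a.state == t.current
                            where t.rule(value)
                            select (t.next, a.values.Append(value).ToArray()),
                            // Start a new partial match if the value is an initial one.
                            from i in initial
                            where i.value == value
                            select (i.state, new[] { value }))
                ).ToList())
            // Emit the values of every match that has reached the final state.
            .SelectMany(x => x.Where(y => y.state == State.Z), (x, y) => y.values);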
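That produced two matches:
    MessageHandled, TransactionCommitted, X, Y, QueueStopped
    MessageHandled, TransactionCommitted, QueueStopped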
And, based on your description, that's what I think you wanted.
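If your events really are distinct types rather than enum values, the same tables should carry over by turning each rule into a type check. The following is only a sketch under some assumptions: the record types (including the catch-all `Other`), the `TypeSequenceMatcher` class, and the `Matches` method are hypothetical names I made up for illustration, the stream is assumed to be an `IObservable<object>`, and the `is not` pattern needs C# 9 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq; // NuGet package: System.Reactive

// Hypothetical event types standing in for the real ones.
public record MessageHandled;
public record TransactionCommitted;
public record QueueStopped;
public record Other(string Name);

public enum State { A, B, Z }

public static class TypeSequenceMatcher
{
    // An event accepted by one of these rules starts a new partial match.
    static readonly (Func<object, bool> rule, State state)[] Initial =
    {
        (e => e is MessageHandled, State.A),
    };

    // Same transition table as the enum version, with type checks as rules.
    static readonly (State current, Func<object, bool> rule, State next)[] Transitions =
    {
        (State.A, e => e is TransactionCommitted, State.B),
        (State.B, e => e is not QueueStopped, State.B),
        (State.B, e => e is QueueStopped, State.Z),
    };

    // Emits the events of every match that reaches State.Z.
    public static IObservable<object[]> Matches(IObservable<object> source) =>
        source
            .Scan(
                new List<(State state, object[] events)>(),
                (candidates, ev) =>
                    Enumerable
                        .Concat(
                            // Advance partial matches that accept this event.
                            from c in candidates
                            from t in Transitions
                            where c.state == t.current && t.rule(ev)
                            select (t.next, c.events.Append(ev).ToArray()),
                            // Start a new partial match if this event is an initial one.
                            from i in Initial
                            where i.rule(ev)
                            select (i.state, new[] { ev }))
                        .ToList())
            .SelectMany(cs => cs.Where(c => c.state == State.Z), (cs, c) => c.events);

    public static void Main()
    {
        var source = new object[]
        {
            new Other("X"), new MessageHandled(), new TransactionCommitted(),
            new Other("Y"), new QueueStopped(),
        }.ToObservable();

        // Prints: MessageHandled, TransactionCommitted, Other, QueueStopped
        Matches(source).Subscribe(m =>
            Console.WriteLine(string.Join(", ", m.Select(e => e.GetType().Name))));
    }
}
```

Keeping the rules as `Func<object, bool>` means the query itself does not change shape; only the tables do. If your events share a base class or interface, you could use that instead of `object`.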