Reactive Extensions operators for pattern matching


I'd like to subscribe to a stream and get a notification when a certain sequence of events is observed.

Something like this, in rough pseudocode: .WhenType<MessageHandled>().ThenType<TransactionCommitted>().ThenAnyNumberOfAnyType().ThenType<QueueStopped>().

Like a regex, but filtering on events/types in the stream rather than a string.

What's the best way to accomplish something like that?


Solution

  • I tried using Observable Joins (see Guide to System.Reactive.Joins), but couldn't get a working solution.

    I did, however, try a state machine option and got something that works.

    I started by creating an enum that represents your event types. I know you're using types, but the enum makes my example clearer.

    public enum Value
    {
        MessageHandled,
        TransactionCommitted,
        QueueStopped,
        X,
        Y,
    }
    

    I then created an observable that returns various combinations of these values.

    IObservable<Value> source = new[]
    {
        /* invalid */ Value.TransactionCommitted, Value.TransactionCommitted, Value.MessageHandled, Value.X,
        /* valid */ Value.MessageHandled, Value.TransactionCommitted, Value.X, Value.Y, Value.QueueStopped,
        /* invalid */ Value.X, Value.X,
        /* valid */ Value.MessageHandled, Value.TransactionCommitted, Value.QueueStopped,
        /* invalid */ Value.Y, Value.MessageHandled, Value.Y, Value.TransactionCommitted, Value.QueueStopped,
    }.ToObservable();
    

    I then created an enum for the state machine's states, plus two tables: the values that start a match (with the state each one enters) and the transitions between states.

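    // A: MessageHandled seen; B: TransactionCommitted seen (plus any further values); Z: match complete.
    public enum State { A, B, Z }
    
    // Values that start a new candidate match, and the state each one enters.
    var initial = new (Value value, State state)[]
    {
        (Value.MessageHandled, State.A),
    };
    
    // From `current`, a value that satisfies `rule` moves the candidate to `next`.
    var transitions = new (State current, Func<Value, bool> rule, State next)[]
    {
        (State.A, t => t == Value.TransactionCommitted, State.B),
        (State.B, t => t != Value.QueueStopped, State.B), // any number of other values
        (State.B, t => t == Value.QueueStopped, State.Z), // final state
    };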
    

    And finally, here's the query that runs the state machine:

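    IObservable<Value[]> query =
        source
            // Carry every in-progress candidate: its current state and the values seen so far.
            .Scan(
                new List<(State state, Value[] values)>(),
                (accumulator, value) =>
                (
                    Enumerable
                        .Concat(
                            // Advance each candidate that has a transition accepting this value;
                            // candidates with no matching transition are dropped.
                            from a in accumulator
                            from t in transitions
                            where a.state == t.current
                            where t.rule(value)
                            select (t.next, a.values.Append(value).ToArray()),
                            // Start a new candidate when this value is an initial one.
                            from i in initial
                            where i.value == value
                            select (i.state, new[] { value }))
                ).ToList())
            // Emit the values of every candidate that has reached the final state.
            .SelectMany(x => x.Where(y => y.state == State.Z), (x, y) => y.values);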
    

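    That produced two arrays, one for each run marked valid in the source:

    MessageHandled, TransactionCommitted, X, Y, QueueStopped

    MessageHandled, TransactionCommitted, QueueStopped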

    And, based on your description, that's what I think you wanted.
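
Since your events are types rather than enum values, the same state machine can be sketched with type checks as the rules. This is only a minimal sketch under assumptions: the event classes below (including `Other`) are hypothetical stand-ins for your real types, `TypedSequenceMatcher` and `Match` are names made up for illustration, and the stream is assumed to be an `IObservable<object>`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

// Hypothetical event types standing in for the ones named in the question.
public sealed class MessageHandled { }
public sealed class TransactionCommitted { }
public sealed class QueueStopped { }
public sealed class Other { }

public enum State { A, B, Z }

public static class TypedSequenceMatcher
{
    // Events that start a new candidate match, and the state each one enters.
    private static readonly (Func<object, bool> rule, State state)[] Initial =
    {
        (e => e is MessageHandled, State.A),
    };

    // From `current`, an event that satisfies `rule` moves the candidate to `next`.
    private static readonly (State current, Func<object, bool> rule, State next)[] Transitions =
    {
        (State.A, e => e is TransactionCommitted, State.B),
        (State.B, e => !(e is QueueStopped), State.B),
        (State.B, e => e is QueueStopped, State.Z),
    };

    // Emits one array per completed match, holding the events that formed it.
    public static IObservable<object[]> Match(IObservable<object> source) =>
        source
            .Scan(
                new List<(State state, object[] values)>(),
                (accumulator, value) =>
                {
                    var next = new List<(State state, object[] values)>();

                    // Advance candidates that accept this event; the rest are dropped.
                    foreach (var a in accumulator)
                        foreach (var t in Transitions)
                            if (a.state == t.current && t.rule(value))
                                next.Add((t.next, a.values.Append(value).ToArray()));

                    // Start a new candidate if this event is an initial one.
                    foreach (var i in Initial)
                        if (i.rule(value))
                            next.Add((i.state, new[] { value }));

                    return next;
                })
            .SelectMany(x => x.Where(y => y.state == State.Z), (x, y) => y.values);
}
```

With that in place, something like `TypedSequenceMatcher.Match(events).Subscribe(match => Console.WriteLine(match.Length))` should notify you once per completed sequence. The rules are plain predicates, so they could test properties of an event as well as its type.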