
Why does my Rx.NET observable appear to produce its entire sequence twice?


I have an intermittently failing unit test that I'm at a loss to explain. It involves an observable sequence using Rx.NET and an extension method that I wrote to transform the sequence. First, let me show how the test fails:

Machine.Specifications.SpecificationException:   Expected: System.Collections.Generic.List`1[System.Int32]:
{
  [8],
  [10],
  [11]
}

  But was:  System.Collections.Generic.List`1[System.Int32]:
{
  [8],
  [10],
  [11],
  [8],
  [10],
  [11]
}

OK, so you see, I get the entire sequence twice instead of once. Here's the test:

[Subject(typeof(ObservableExtensions), "Shutter Current Readings")]
internal class when_a_shutter_current_reading_is_received
    {
    Establish context = () => source = "Z8\nZ10\nZ11\n".ToObservable();
    Because of = () => source
        .ShutterCurrentReadings().Trace("Unbelievable")
        .SubscribeAndWaitForCompletion(item => elementHistory.Add(item));
    It should_receive_the_current_readings = () => elementHistory.ShouldEqual(expectedElements);
    static List<int> elementHistory = new List<int>();
    static List<int> expectedElements = new List<int> {8, 10, 11};
    static IObservable<char> source;

    }

SubscribeAndWaitForCompletion() is an extension method defined as follows:

public static void SubscribeAndWaitForCompletion<T>(this IObservable<T> sequence, Action<T> observer)
    {
    var sequenceComplete = new ManualResetEvent(false);
    var subscription = sequence.Subscribe(
        onNext: observer,
        onCompleted: () => sequenceComplete.Set()
        );
    sequenceComplete.WaitOne();
    subscription.Dispose();
    sequenceComplete.Dispose();
    }

You'll notice a .Trace() call in the test and another one inside the ShutterCurrentReadings() extension method (shown further down); these log the observable sequence via NLog. Here's the trace output:

20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: Subscribe()
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: Subscribe()
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: OnNext(8)
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: OnNext(8)
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: OnNext(10)
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: OnNext(10)
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: OnNext(11)
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: OnNext(11)
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: OnCompleted()
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: OnCompleted()
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: Dispose()
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: Dispose()
Child test failed

This is pretty much what I would expect. I get one trace output from inside my extension method, then another on the transformed sequence outside the extension method. Each element in the sequence flows through the system exactly once, just as expected. And yet, I get the entire sequence captured twice in my test.

I had better provide the extension method so we can see what it does. Here it is:

    public static IObservable<int> ShutterCurrentReadings(this IObservable<char> source)
        {
        const string shutterCurrentPattern = @"^Z(?<Current>\d{1,2})[^0-9]";
        var shutterCurrentRegex =
            new Regex(shutterCurrentPattern, RegexOptions.Compiled | RegexOptions.ExplicitCapture);
        var buffers = source.Publish(s => s.BufferByPredicates(p => p == 'Z', q => !char.IsDigit(q)));
        var shutterCurrentValues = from buffer in buffers
                                   let message = new string(buffer.ToArray())
                                   let patternMatch = shutterCurrentRegex.Match(message)
                                   where patternMatch.Success
                                   let shutterCurrent = int.Parse(patternMatch.Groups["Current"].Value)
                                   select shutterCurrent;
        return shutterCurrentValues.Trace("ShutterCurrent");
        }

The intent here is to pick out current-sensor readings from the data stream. The readings have the format Znn (a literal 'Z', followed by one or two decimal digits, followed by a newline). The extension method transforms the raw input character sequence into a sequence of integers representing current readings. It uses the Rx Buffer operator to collect characters that might form a valid sensor reading: a buffer is opened when a 'Z' character is seen and closed when a non-digit character is seen. Each buffer is then double-checked against a regular expression, and if it matches, the captured digits are parsed as an integer and emitted in the output sequence.
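
To make the regular-expression step concrete, here is a minimal sketch that applies the same pattern to a few strings standing in for buffer contents. The class name, the TryParseReading helper and the sample inputs are hypothetical, and the sketch assumes each buffer includes its closing non-digit character, which the pattern requires.

```csharp
using System;
using System.Text.RegularExpressions;

public static class ShutterCurrentParsing
{
    // Same pattern and options as in ShutterCurrentReadings.
    private static readonly Regex ShutterCurrentRegex = new Regex(
        @"^Z(?<Current>\d{1,2})[^0-9]",
        RegexOptions.Compiled | RegexOptions.ExplicitCapture);

    // Hypothetical helper: returns the reading, or null if the text is not a valid message.
    public static int? TryParseReading(string message)
    {
        var match = ShutterCurrentRegex.Match(message);
        return match.Success ? int.Parse(match.Groups["Current"].Value) : (int?)null;
    }

    public static void Main()
    {
        // The first three are the messages from the test; "Z\n" has no digits
        // and "X42\n" has the wrong prefix, so neither matches.
        foreach (var message in new[] { "Z8\n", "Z10\n", "Z11\n", "Z\n", "X42\n" })
        {
            var reading = TryParseReading(message);
            Console.WriteLine(reading.HasValue ? reading.Value.ToString() : "(no match)");
        }
    }
}
```

A message with three digits, such as "Z123\n", is also rejected, because the pattern demands a non-digit after at most two digits.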

Can anyone see why I might be getting double-data in my results?

Update: additional code relevant to the investigation.

    public static IObservable<IList<char>> BufferByPredicates(this IObservable<char> source,
        Predicate<char> bufferOpening, Predicate<char> bufferClosing)
        {
        return source.Buffer(source.Where(c => bufferOpening(c)), x => source.Where(c => bufferClosing(c)));
        }

The Trace extension method is found in NuGet package TA.ASCOM.ReactiveCommunications (one of mine) but here's the source:

    public static IObservable<TSource> Trace<TSource>(this IObservable<TSource> source, string name)
        {
        var log = LogManager.GetLogger(name);
        var id = 0;
        return Observable.Create<TSource>(observer =>
            {
            var idClosure = ++id;
            Action<string, object> trace = (m, v) => log.Debug("{0}[{1}]: {2}({3})", name, idClosure, m, v);
            trace("Subscribe", "");
            var disposable = source.Subscribe(
                v =>
                    {
                    trace("OnNext", v);
                    observer.OnNext(v);
                    },
                e =>
                    {
                    trace("OnError", "");
                    observer.OnError(e);
                    },
                () =>
                    {
                    trace("OnCompleted", "");
                    observer.OnCompleted();
                    });
            return () =>
                {
                trace("Dispose", "");
                disposable.Dispose();
                };
            });
        }

I suspect I may have copied this code from someone else but I don't seem to have made a note of who.


Solution

  • EDIT:

    Here's a way to mock up the problem in LINQPad, without using the MSpec/NCrunch runner:

    void Main()
    {
        //static initializers
        List<int> expectedElements = new List<int> { 8, 10, 11 };
        List<int> elementHistory = new List<int>();
        IObservable<char> source;
    
        //simulated continuous running of MSpec test
        for (int i = 0; i < 20; i++)
        {
    
            //establish
            source = "Z8\nZ10\nZ11\n".ToObservable();
    
            //because
            source
                .ShutterCurrentReadings()
                .Trace("Unbelievable")
                .SubscribeAndWaitForCompletion(item => elementHistory.Add(item));
    
            //it
            elementHistory.Dump(i.ToString()); //Linqpad
            if(elementHistory.Count > 3)
                throw new Exception("Assert.ShouldNotHappen");
        }
    }
    
    public static class Extensions
    {
        public static IObservable<int> ShutterCurrentReadings(this IObservable<char> source)
        {
            const string shutterCurrentPattern = @"^Z(?<Current>\d{1,2})[^0-9]";
            var shutterCurrentRegex =
                new Regex(shutterCurrentPattern, RegexOptions.Compiled | RegexOptions.ExplicitCapture);
            var buffers = source.Publish(s => s.BufferByPredicates(p => p == 'Z', q => !char.IsDigit(q)));
            var shutterCurrentValues = from buffer in buffers
                                       let message = new string(buffer.ToArray())
                                       let patternMatch = shutterCurrentRegex.Match(message)
                                       where patternMatch.Success
                                       let shutterCurrent = int.Parse(patternMatch.Groups["Current"].Value)
                                       select shutterCurrent;
            return shutterCurrentValues.Trace("ShutterCurrent");
        }
    
        public static void SubscribeAndWaitForCompletion<T>(this IObservable<T> sequence, Action<T> observer)
        {
            var sequenceComplete = new ManualResetEvent(false);
            var subscription = sequence.Subscribe(
                onNext: observer,
                onCompleted: () => sequenceComplete.Set()
                );
            sequenceComplete.WaitOne();
            subscription.Dispose();
            sequenceComplete.Dispose();
        }
    
        public static IObservable<TSource> Trace<TSource>(this IObservable<TSource> source, string name)
        {
            var log = LogManager.GetLogger(name);
            var id = 0;
            return Observable.Create<TSource>(observer =>
                {
                    var idClosure = ++id;
                    Action<string, object> trace = (m, v) => log.Debug("{0}[{1}]: {2}({3})", name, idClosure, m, v);
                    trace("Subscribe", "");
                    var disposable = source.Subscribe(
                        v =>
                            {
                                trace("OnNext", v);
                                observer.OnNext(v);
                            },
                        e =>
                            {
                                trace("OnError", "");
                                observer.OnError(e);
                            },
                        () =>
                            {
                                trace("OnCompleted", "");
                                observer.OnCompleted();
                            });
                    return () =>
                        {
                            trace("Dispose", "");
                            disposable.Dispose();
                        };
                });
        }
    
        public static IObservable<IList<char>> BufferByPredicates(this IObservable<char> source,
            Predicate<char> bufferOpening, Predicate<char> bufferClosing)
        {
            return source.Buffer(source.Where(c => bufferOpening(c)), x => source.Where(c => bufferClosing(c)));
        }
    }
    

    This fails, like your scenario.

    My best suggestion for fixing it is to move the initialization of elementHistory into the Establish step, so that every run starts with a fresh list. You can also drop the source field and build the observable inline in Because, so your test would look like this:

    internal class when_a_shutter_current_reading_is_received
    {
        Establish context = () => elementHistory = new List<int>();
        Because of = () => "Z8\nZ10\nZ11\n".ToObservable()
            .ShutterCurrentReadings()
            .Trace("Unbelievable")
            .SubscribeAndWaitForCompletion(item => elementHistory.Add(item));
        It should_receive_the_current_readings = () => elementHistory.ShouldEqual(expectedElements);
        static List<int> elementHistory;
        static List<int> expectedElements = new List<int> { 8, 10, 11 };
    
    }
    

    You may also want to look at the Microsoft.Reactive.Testing package, which provides more robust tools for testing Rx queries, though the resulting tests would not read as simply as yours.
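
    As a rough illustration of that approach, the sketch below drives a query with a TestScheduler and records its output in a fresh observer on each run, so no state is shared between runs. It assumes the Microsoft.Reactive.Testing and System.Reactive NuGet packages are referenced; the class name and the digit filter are stand-ins for illustration, not the query from the question.

    ```csharp
    using System;
    using System.Linq;
    using System.Reactive;
    using System.Reactive.Linq;
    using Microsoft.Reactive.Testing;

    public static class TestSchedulerSketch
    {
        public static void Main()
        {
            var scheduler = new TestScheduler();

            // Cold source: the characters of "Z8\n", timed in virtual ticks
            // relative to the moment of subscription.
            var source = scheduler.CreateColdObservable(
                ReactiveTest.OnNext(10, 'Z'),
                ReactiveTest.OnNext(20, '8'),
                ReactiveTest.OnNext(30, '\n'),
                ReactiveTest.OnCompleted<char>(40));

            // Start subscribes to the query, runs the virtual clock to the end
            // and returns an observer that recorded every notification.
            var observer = scheduler.Start(() => source.Where(c => char.IsDigit(c)));

            // Keep only the OnNext payloads; the recorded times are ignored here.
            var values = observer.Messages
                .Where(m => m.Value.Kind == NotificationKind.OnNext)
                .Select(m => m.Value.Value)
                .ToList();

            Console.WriteLine(string.Join(",", values));
        }
    }
    ```

    Because the recording observer is created inside Start, the captured values cannot leak from one run into the next.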


    Old Answer:

    I can't compile your code due to missing Trace, ShouldEqual and BufferByPredicates functions. If they come from an external source, please document where.

    I'm guessing the problem stems from the BufferByPredicates implementation, the Trace implementation, the lack of Connect after the Publish, or the static elementHistory.

    My best guess is the static elementHistory: if that test is run twice concurrently, you have a race condition and could end up with doubled results (Establish runs twice, then Because runs twice, and then It fails).
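
    The effect of the shared list can be sketched without Rx or MSpec. This is a sequential simplification rather than a true concurrent race, and the class and method names are hypothetical. A list that is created once and never reset accumulates the readings from every run, while a list recreated at the start of each run does not.

    ```csharp
    using System;
    using System.Collections.Generic;

    public static class StaticStateDemo
    {
        // Simulates `runs` executions of the test and returns how many
        // elements the history list holds after the last run.
        public static int CountAfterRuns(int runs, bool resetInEstablish)
        {
            // Plays the role of the static field initializer: it runs only once.
            var history = new List<int>();

            for (var i = 0; i < runs; i++)
            {
                // The suggested fix: Establish creates a fresh list for each run.
                if (resetInEstablish)
                    history = new List<int>();

                // Stand-in for the Because step recording the three readings.
                history.AddRange(new[] { 8, 10, 11 });
            }

            return history.Count;
        }

        public static void Main()
        {
            Console.WriteLine(StaticStateDemo.CountAfterRuns(2, resetInEstablish: false)); // prints 6
            Console.WriteLine(StaticStateDemo.CountAfterRuns(2, resetInEstablish: true));  // prints 3
        }
    }
    ```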