I have a randomly failing unit test which I'm at a loss to explain. This involves an observable sequence using Rx.NET and an extension method that I made to transform the sequence. First, let me show how the test fails:
Machine.Specifications.SpecificationException: Expected: System.Collections.Generic.List`1[System.Int32]: { [8], [10], [11] } But was: System.Collections.Generic.List`1[System.Int32]: { [8], [10], [11], [8], [10], [11] }
OK, so you see, I get the entire sequence twice instead of once. Here's the test:
[Subject(typeof(ObservableExtensions), "Shutter Current Readings")]
internal class when_a_shutter_current_reading_is_received
{
Establish context = () => source = "Z8\nZ10\nZ11\n".ToObservable();
Because of = () => source
.ShutterCurrentReadings().Trace("Unbelievable")
.SubscribeAndWaitForCompletion(item => elementHistory.Add(item));
It should_receive_the_current_readings = () => elementHistory.ShouldEqual(expectedElements);
static List<int> elementHistory = new List<int>();
static List<int> expectedElements = new List<int> {8, 10, 11};
static IObservable<char> source;
}
SubscribeAndWaitForCompletion()
is an extension method defined as follows:
public static void SubscribeAndWaitForCompletion<T>(this IObservable<T> sequence, Action<T> observer)
{
var sequenceComplete = new ManualResetEvent(false);
var subscription = sequence.Subscribe(
onNext: observer,
onCompleted: () => sequenceComplete.Set()
);
sequenceComplete.WaitOne();
subscription.Dispose();
sequenceComplete.Dispose();
}
You'll notice there's a .Trace()
call in there, and another one inside the extension method, and this produces logging about the observable sequence via NLog, Here's the trace output:
20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: Subscribe() 20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: Subscribe() 20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: OnNext(8) 20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: OnNext(8) 20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: OnNext(10) 20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: OnNext(10) 20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: OnNext(11) 20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: OnNext(11) 20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: OnCompleted() 20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: OnCompleted() 20:43:43.1547|DEBUG|c__DisplayClass0_1`1|Unbelievable[1]: Dispose() 20:43:43.1547|DEBUG|c__DisplayClass0_1`1|ShutterCurrent[1]: Dispose() Child test failed
This is pretty much what I would expect. I get one trace output from inside my extension method, then another on the transformed sequence outside the extension method. Each element in the sequence flows through the system exactly once, just as expected. And yet, I get the entire sequence captured twice in my test.
I had better provide the extension method so we can see what it does. Here it is:
public static IObservable<int> ShutterCurrentReadings(this IObservable<char> source)
{
const string shutterCurrentPattern = @"^Z(?<Current>\d{1,2})[^0-9]";
var shutterCurrentRegex =
new Regex(shutterCurrentPattern, RegexOptions.Compiled | RegexOptions.ExplicitCapture);
var buffers = source.Publish(s => s.BufferByPredicates(p => p == 'Z', q => !char.IsDigit(q)));
var shutterCurrentValues = from buffer in buffers
let message = new string(buffer.ToArray())
let patternMatch = shutterCurrentRegex.Match(message)
where patternMatch.Success
let shutterCurrent = int.Parse(patternMatch.Groups["Current"].Value)
select shutterCurrent;
return shutterCurrentValues.Trace("ShutterCurrent");
}
So the intent here is to pick out readings from a current sensor from within the data stream. The readings are in the format Znn (literal 'Z' followed by one or two decimal digits followed by a newline. The extension method transforms the raw input character sequence into a sequence of integers representing current readings. The filter uses the Rx Buffer
operator to buffer up characters that it thinks might be valid sensor readings. The buffer is opened when a 'Z' character is seen and closed when a non-digit character is seen. This is double checked by matching and parsing in a regular expression and then if the result passes all that it is converted to an integer and emitted in the output sequence.
Can anyone see why I might be getting double-data in my results?
Update: additional code relevant to the investigation.
public static IObservable<IList<char>> BufferByPredicates(this IObservable<char> source,
Predicate<char> bufferOpening, Predicate<char> bufferClosing)
{
return source.Buffer(source.Where(c => bufferOpening(c)), x => source.Where(c => bufferClosing(c)));
}
The Trace
extension method is found in NuGet package TA.ASCOM.ReactiveCommunications
(one of mine) but here's the source:
public static IObservable<TSource> Trace<TSource>(this IObservable<TSource> source, string name)
{
var log = LogManager.GetLogger(name);
var id = 0;
return Observable.Create<TSource>(observer =>
{
var idClosure = ++id;
Action<string, object> trace = (m, v) => log.Debug("{0}[{1}]: {2}({3})", name, idClosure, m, v);
trace("Subscribe", "");
var disposable = source.Subscribe(
v =>
{
trace("OnNext", v);
observer.OnNext(v);
},
e =>
{
trace("OnError", "");
observer.OnError(e);
},
() =>
{
trace("OnCompleted", "");
observer.OnCompleted();
});
return () =>
{
trace("Dispose", "");
disposable.Dispose();
};
});
}
I suspect I may have copied this code from someone else but I don't seem to have made a note of who.
EDIT:
Here's a way to mock the problem up in LinqPad, without using the MSpec/NChrunch (?) runner:
void Main()
{
//static initializers
List<int> expectedElements = new List<int> { 8, 10, 11 };
List<int> elementHistory = new List<int>();
IObservable<char> source;
//simulated continuous running of MSpec test
for (int i = 0; i < 20; i++)
{
//establish
source = "Z8\nZ10\nZ11\n".ToObservable();
//because
source
.ShutterCurrentReadings()
.Trace("Unbelievable")
.SubscribeAndWaitForCompletion(item => elementHistory.Add(item));
//it
elementHistory.Dump(i.ToString()); //Linqpad
if(elementHistory.Count > 3)
throw new Exception("Assert.ShouldNotHappen");
}
}
public static class Extensions
{
public static IObservable<int> ShutterCurrentReadings(this IObservable<char> source)
{
const string shutterCurrentPattern = @"^Z(?<Current>\d{1,2})[^0-9]";
var shutterCurrentRegex =
new Regex(shutterCurrentPattern, RegexOptions.Compiled | RegexOptions.ExplicitCapture);
var buffers = source.Publish(s => s.BufferByPredicates(p => p == 'Z', q => !char.IsDigit(q)));
var shutterCurrentValues = from buffer in buffers
let message = new string(buffer.ToArray())
let patternMatch = shutterCurrentRegex.Match(message)
where patternMatch.Success
let shutterCurrent = int.Parse(patternMatch.Groups["Current"].Value)
select shutterCurrent;
return shutterCurrentValues.Trace("ShutterCurrent");
}
public static void SubscribeAndWaitForCompletion<T>(this IObservable<T> sequence, Action<T> observer)
{
var sequenceComplete = new ManualResetEvent(false);
var subscription = sequence.Subscribe(
onNext: observer,
onCompleted: () => sequenceComplete.Set()
);
sequenceComplete.WaitOne();
subscription.Dispose();
sequenceComplete.Dispose();
}
public static IObservable<TSource> Trace<TSource>(this IObservable<TSource> source, string name)
{
var log = LogManager.GetLogger(name);
var id = 0;
return Observable.Create<TSource>(observer =>
{
var idClosure = ++id;
Action<string, object> trace = (m, v) => log.Debug("{0}[{1}]: {2}({3})", name, idClosure, m, v);
trace("Subscribe", "");
var disposable = source.Subscribe(
v =>
{
trace("OnNext", v);
observer.OnNext(v);
},
e =>
{
trace("OnError", "");
observer.OnError(e);
},
() =>
{
trace("OnCompleted", "");
observer.OnCompleted();
});
return () =>
{
trace("Dispose", "");
disposable.Dispose();
};
});
}
public static IObservable<IList<char>> BufferByPredicates(this IObservable<char> source,
Predicate<char> bufferOpening, Predicate<char> bufferClosing)
{
return source.Buffer(source.Where(c => bufferOpening(c)), x => source.Where(c => bufferClosing(c)));
}
}
This fails, like your scenario.
My best suggestion for fixing it would be to move initialization of elementHistory
to the Establish
step. You can also move the source
variable away from the establish, so your test would look like this:
internal class when_a_shutter_current_reading_is_received
{
Establish context = () => elementHistory = new List<int>();
Because of = () => "Z8\nZ10\nZ11\n".ToObservable()
.ShutterCurrentReadings()
.Trace("Unbelievable")
.SubscribeAndWaitForCompletion(item => elementHistory.Add(item));
It should_receive_the_current_readings = () => elementHistory.ShouldEqual(expectedElements);
static List<int> elementHistory;
static List<int> expectedElements = new List<int> { 8, 10, 11 };
}
You may also want to look at Microsoft.Reactive.Testing
which provides some more robust testing on Rx queries, though it wouldn't read as simply as your tests.
Old Answer:
I can't compile your code due to missing Trace
, ShouldEqual
and BufferByPredicates
functions. If they come from an external source, please document where.
I'm guessing the problem stems from the BufferByPredicates
implementation, the Trace
implementation, the lack of Connect
after the Publish
, or the static elementHistory
.
My best guess is the static elementHistory
: If that test is run twice concurrently, you have a race condition and it's possible you would end up with double results (Establish
runs twice, then Because
runs twice, then It
would fail).