I am reproducing my Rx issue with a simplified test case below. The test below hangs. I am sure it is a small, but fundamental, thing that I am missing, but can't put my finger on it.
public class Service
{
private ISubject<double> _subject = new Subject<double>();
public void Reset()
{
_subject.OnNext(0.0);
}
public IObservable<double> GetProgress()
{
return _subject;
}
}
public class ObTest
{
[Fact]
private async Task SimpleTest()
{
var service = new Service();
var result = service.GetProgress().Take(1);
var task = Task.Run(async () =>
{
service.Reset();
});
await result;
}
}
UPDATE
My attempt above was to simplify the problem a little and understand it. In my case GetProgress()
is a merge of various Observables
that publish the download progress, one of these Observables
is a Subject<double>
that publishes 0
everytime somebody calls a method to delete the download.
The race condition identified by Enigmativity and Theodor Zoulias may(??) happen in real life. I display a view which attempts to get the progress, however, quick fingers delete it just in time.
What I need to understand a bit more is if the download is started again (subscription has taken place by now, by virtue of displaying a view, which has already made the subscription) and somebody again deletes it.
public class Service
{
private ISubject<double> _deleteSubject = new Subject<double>();
public void Reset()
{
_deleteSubject.OnNext(0.0);
}
public IObservable<double> GetProgress()
{
return _deleteSubject.Merge(downloadProgress);
}
}
Your code isn't hanging. It's awaiting an observable that sometimes never gets a value.
You have a race condition.
The Task.Run
is sometimes executing to completion before the await result
creates the subscription to the observable - so it never sees the value.
Try this code instead:
private async Task SimpleTest()
{
var service = new Service();
var result = service.GetProgress().Take(1);
var awaiter = result.GetAwaiter();
var task = Task.Run(() =>
{
service.Reset();
});
await awaiter;
}