Tags: c#, unity-game-engine, reactivex

How to use UniRx Observable.Timeout to set a timeout?


I'm trying to use Observable.Timeout to set a timeout, but the result is not what I expected.
The code looks like this:

using UniRx;
using UnityEngine;

public class ObservableTest : MonoBehaviour {

    // Use this for initialization
    void Start () {
        var subj = new Subject<int> ();
        new System.Threading.Thread (() => {
            Debug.Log("thread id - " + System.Threading.Thread.CurrentThread.ManagedThreadId);
            var i = 0;
            while(i < 10) {
                i += 1;
                // The pause grows by one second per element: 1 s, 2 s, ..., 10 s.
                System.Threading.Thread.Sleep(i * 1000);
                subj.OnNext(i);
            }
        }).Start();


        var timeout = subj.Timeout (System.TimeSpan.FromSeconds (4));
        timeout.Subscribe (x => Debug.Log("x - " + x), ex => Debug.LogError(ex), () => Debug.Log("completed"));
    }

}

Console output:

x - 1
x - 2
...
x - 10

Why is the sequence not interrupted before x - 5? How do I set a timeout on an observable?

UPDATE

Sometimes the output is interrupted at x - 8. I don't really understand how this works.


Solution

  • I looked into the problem using the Spy() extension from this post: How can I see what my reactive extensions query is doing?

    The problem might be that the calling thread isn't handled correctly. The code below works for me: the sequence is terminated after x - 2.

    var subj = new Subject<int>();
    new Thread(() => {
        Console.WriteLine("thread id - " + Thread.CurrentThread.ManagedThreadId);
        var i = 0;
        while (i < 4)
        {
            i += 1;
            Thread.Sleep(i * 1000);
            subj.OnNext(i);
        }
        subj.OnCompleted();
    }).Start();
    
    //var timeout = subj.Timeout(TimeSpan.FromSeconds(0.5));
    var timeout = subj.Timeout(TimeSpan.FromSeconds(2));
    IDisposable disp = timeout.Spy().Subscribe(x => Debug.WriteLine("x - " + x), ex => Debug.WriteLine(ex.Message), () => Debug.WriteLine("completed"));
    // Block the calling thread long enough for the sequence to finish. This might be the missing piece.
    Thread.CurrentThread.Join(10500);
    disp.Dispose();
    

    Output:

    Debug Trace:
    IObservable: Observable obtained on Thread: 14, 03:05:24.28
    IObservable: Subscribed to on Thread: 14, 03:05:24.289
    IObservable: Subscription completed. 03:05:24.296
    IObservable: OnNext(1) on Thread: 17, 03:05:25.258 //<- i = 1. One sec wait.
    x - 1
    IObservable: OnNext(2) on Thread: 17, 03:05:27.261 //<- i = 2. Two sec wait. It's correct.
    x - 2
    IObservable: OnError(System.TimeoutException: The operation has timed out.) on Thread: 13, 03:05:29.264 //<- i = 3. Two sec timeout comes into play. Correct. Next step would need 3 sec to finish.
    The operation has timed out.
    IObservable: Cleaned up on Thread: 13, 03:05:29.266
    

    On another run I got a different output, which is also correct:

    Debug Trace:
    IObservable: Observable obtained on Thread: 14, 03:20:17.714
    IObservable: Subscribed to on Thread: 14, 03:20:17.723
    IObservable: Subscription completed. 03:20:17.73
    IObservable: OnNext(1) on Thread: 17, 03:20:18.693 //<- 1 sec passed since subscription. correct
    x - 1
    IObservable: OnError(System.TimeoutException: The operation has timed out.) on Thread: 12, 03:20:20.698 //<- 2 sec passed, which is the timeout. OnNext(2) and the timeout raced, and the timeout won. Correct.
    The operation has timed out.
    IObservable: Cleaned up on Thread: 12, 03:20:20.7
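
Both traces fit a simple model: Timeout restarts its timer after every delivered element and signals a TimeoutException once the wait for the next element exceeds the limit. The sketch below works through that arithmetic in plain C# with no Rx dependency. TimeoutModel, Outcome, and Classify are hypothetical names introduced here for illustration; they are not part of UniRx or Rx.NET, and the model deliberately ignores scheduler and threading effects.

```csharp
using System.Collections.Generic;

// Hypothetical helper for illustration only; not part of UniRx or Rx.NET.
public static class TimeoutModel
{
    public enum Outcome { Delivered, Race, TimedOut }

    // gapsSeconds[k] is the pause before the (k + 1)-th OnNext.
    // timeoutSeconds is the value passed to Timeout.
    // Assumption: the timer restarts after every delivered element.
    public static List<Outcome> Classify(IReadOnlyList<int> gapsSeconds, int timeoutSeconds)
    {
        var outcomes = new List<Outcome>();
        foreach (int gap in gapsSeconds)
        {
            if (gap < timeoutSeconds)
            {
                // The element arrives before the timer fires.
                outcomes.Add(Outcome.Delivered);
            }
            else if (gap == timeoutSeconds)
            {
                // Element and timer are due at the same moment; either may win.
                // The model optimistically continues as if the element had won.
                outcomes.Add(Outcome.Race);
            }
            else
            {
                // The timer fires first; nothing after this point is delivered.
                outcomes.Add(Outcome.TimedOut);
                break;
            }
        }
        return outcomes;
    }
}
```

With the values from the answer, TimeoutModel.Classify(new[] { 1, 2, 3, 4 }, 2) yields Delivered, Race, TimedOut, which covers both traces: x - 2 only shows up when it wins the race. With the values from the question (pauses of 1 to 10 seconds and a 4 second timeout) it yields Delivered, Delivered, Delivered, Race, TimedOut, so under this model the error would be expected no later than right after x - 4.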