RXCPP: Timeout on blocking function


Consider a blocking function: this_thread::sleep_for(milliseconds(3000));

I'm trying to get the following behavior:

Trigger Blocking Function               

|---------------------------------------------X

I want to trigger the blocking function and, if it takes too long (more than two seconds), have it time out.

I've done the following:

my_connection = observable<>::create<int>([](subscriber<int> s) {
    auto s2 = observable<>::just(1, observe_on_new_thread()) |
    subscribe<int>([&](auto x) {
        this_thread::sleep_for(milliseconds(3000));
        s.on_next(1);
    });
}) |
timeout(seconds(2), observe_on_new_thread());

I can't get this to work. For starters, I suspect that s.on_next can't be called from a different thread.

So my question is, what is the correct reactive way of doing this? How can I wrap a blocking function in rxcpp and add a timeout to it?

Subsequently, I want to get an RX stream that behaves like this:

Trigger                Cleanup

|------------------------X
                           (Delay)   Trigger           Cleanup
                                       |-----------------X

Solution

  • Great question! The above is pretty close.

    Here is an example of how to adapt blocking operations to rxcpp. It uses libcurl polling to make HTTP requests.

    The following should do what you intended.

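    #include <chrono>
    #include <iostream>
    #include <thread>
    #include <rxcpp/rx.hpp>

    using namespace std;
    using namespace std::chrono;
    using namespace rxcpp;
    using namespace rxcpp::operators;

    int main() {
        // Scheduler whose threads can be shared; the timeout timer runs here.
        auto sharedThreads = observe_on_event_loop();

        auto my_connection = observable<>::create<int>([](subscriber<int> s) {
                // Blocking work; runs on the thread chosen by subscribe_on below.
                this_thread::sleep_for(milliseconds(3000));
                s.on_next(1);
                s.on_completed();
            }) |
            subscribe_on(observe_on_new_thread()) |
            //start_with(0) | // workaround bug in timeout
            timeout(seconds(2), sharedThreads);
            //skip(1); // workaround bug in timeout

        // Block until the stream terminates so main() does not exit early.
        my_connection.as_blocking().subscribe(
            [](int){},
            [](exception_ptr ep){cout << "timed out" << endl;}
        );
    }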
    
    • subscribe_on runs the create function on a dedicated thread, so create is allowed to block that thread.
    • timeout runs its timer on a different thread, which can be shared with other work, and transfers all on_next/on_error/on_completed calls to that same thread.
    • as_blocking makes sure that subscribe does not return until the stream has completed. It is only used here to prevent main() from exiting, which is mostly needed in test or example programs.
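
    To make the division of labor in the bullets above concrete, here is a minimal sketch of the same idea in plain C++11, without rxcpp. It is not how rxcpp implements subscribe_on or timeout; run_with_timeout is a hypothetical helper invented for this illustration. A dedicated thread absorbs the blocking call while the caller waits only until a deadline.

```cpp
#include <cassert>
#include <chrono>
#include <exception>
#include <functional>
#include <future>
#include <memory>
#include <thread>

// Hypothetical helper (not part of rxcpp): runs `work` on its own thread and
// waits at most `limit` for it to finish. Returns true and stores the result
// in `out` if the work finished in time, false if the deadline passed first.
bool run_with_timeout(std::function<int()> work,
                      std::chrono::milliseconds limit,
                      int& out) {
    assert(limit.count() >= 0);

    // A shared_ptr keeps the promise alive even if the caller gives up early.
    auto result = std::make_shared<std::promise<int>>();
    std::future<int> done = result->get_future();

    // Dedicated thread: the blocking call only ever blocks this thread.
    std::thread([work, result]() {
        try {
            result->set_value(work());
        } catch (...) {
            // Forward any exception to the waiting side instead of terminating.
            result->set_exception(std::current_exception());
        }
    }).detach();

    // The caller plays the role of the timer: wait, but only until the deadline.
    if (done.wait_for(limit) != std::future_status::ready) {
        return false;  // timed out; the worker keeps running in the background
    }
    out = done.get();  // rethrows here if `work` threw
    return true;
}
```

    With the numbers from the question, a `work` that sleeps for 3000 ms and a `limit` of 2000 ms would make the helper return false. Note that, as in the rxcpp version, timing out does not interrupt the blocking call itself; the worker thread simply finishes later and its result is dropped.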

    EDIT: added workaround for bug in timeout. At the moment, it does not schedule the first timeout until the first value arrives.

    EDIT-2: The timeout bug has been fixed, so the workaround is no longer needed.