c++, reactive-programming, rxcpp

Scheduling and Timeout handling with rxcpp


I'm new to using rxcpp and trying to get something functional together in the following scenario:

I have a data source that retrieves commands from a separate source, and the code I'm writing feeds these commands into an rxcpp observable. There is one special condition: if no command has been received within a certain period of time, the subscriber's onError function should be run instead of onNext. The timeout can only happen before the first command is received. After the first command has arrived, no timeout can occur, no matter how long it takes to receive any further commands.

I'm trying to accomplish this with something like this:

auto timeout = rxcpp::observable<>::timer(std::chrono::steady_clock::now() + timeout,
                             rxcpp::observe_on_event_loop()).map([](int val) // Note, converts the value type of the timer observable and converts timeouts to errors
{
    std::cout << "TIMED OUT!" << std::endl;
    throw std::runtime_error("timeout");
    return command_type();
});
auto commands = timeout.amb(rxcpp::observe_on_event_loop(), createCommandSource(event_loop_scheduler, ...));

The problem is that the timeout fires before any commands are received, even though they are inserted well before the timeout expires. I have experimented with timeouts from 1000 ms to 5000 ms, and it makes no difference. If I remove the timeout code, however, the command is received immediately. I suspect that I have simply misunderstood how to use the schedulers in rxcpp, so I'm wondering how this could be accomplished.


Solution

  • I wrote a simple createCommandSource. This worked for me:

    #include <chrono>
    #include <cstdio>
    #include <iostream>
    #include <stdexcept>
    #include <thread>
    
    #include "rxcpp/rx.hpp"
    using namespace rxcpp;
    using namespace rxcpp::sources;
    using namespace rxcpp::util;
    
    using namespace std;
    
    struct command_type {};
    
    int main()
    {
        auto eventloop = rxcpp::observe_on_event_loop();
        auto createCommandSource = [=]() {
            return rxcpp::observable<>::interval(std::chrono::seconds(1), eventloop).map([](long) {return command_type(); });
        };
        // The map converts the timer's value type to command_type and turns the timeout into an error.
        auto timeout = rxcpp::observable<>::timer(eventloop.now() + std::chrono::seconds(2), eventloop).map([](long)
        {
            std::cout << "TIMED OUT!" << std::endl;
            throw std::runtime_error("timeout");
            return command_type(); // never reached; only gives the lambda a command_type return type
        });
        auto commands = timeout.amb(eventloop, createCommandSource().take(5));
    
        commands
            .as_blocking().subscribe(
            [](command_type) {printf("command\n"); },
            [](std::exception_ptr) {printf("exception\n"); });
    
        std::this_thread::sleep_for(std::chrono::seconds(2));
    
        return 0;
    }
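
    For comparison, the behaviour the question asks for (a timeout that guards only the first command) can be sketched without rxcpp, using just the standard library. This is only an illustration of the intended semantics, not how rxcpp implements amb or timer. The command_queue class, its consume method, and the id field on command_type are hypothetical names made up for this sketch.

```cpp
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <stdexcept>

// Hypothetical stand-in for the question's command_type.
struct command_type { int id; };

// Hypothetical thread-safe command queue; not part of rxcpp.
class command_queue {
public:
    void push(command_type c) {
        { std::lock_guard<std::mutex> lock(m_); items_.push_back(c); }
        cv_.notify_one();
    }

    // Signals that no further commands will arrive.
    void close() {
        { std::lock_guard<std::mutex> lock(m_); closed_ = true; }
        cv_.notify_all();
    }

    // Delivers commands to on_next until the queue is closed and drained.
    // Only the wait for the FIRST command (or close) is bounded by first_timeout.
    // Returns true on normal completion, false if the first wait timed out.
    bool consume(std::chrono::milliseconds first_timeout,
                 const std::function<void(const command_type&)>& on_next,
                 const std::function<void(const std::exception&)>& on_error) {
        std::unique_lock<std::mutex> lock(m_);
        auto ready = [this] { return !items_.empty() || closed_; };

        if (!cv_.wait_for(lock, first_timeout, ready)) {
            lock.unlock();
            on_error(std::runtime_error("timeout"));
            return false;
        }

        for (;;) {
            cv_.wait(lock, ready);             // unbounded wait from here on
            if (items_.empty()) return true;   // closed and fully drained
            command_type c = items_.front();
            items_.pop_front();
            lock.unlock();                     // do not hold the lock in the callback
            on_next(c);
            lock.lock();
        }
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<command_type> items_;
    bool closed_ = false;
};
```

    A producer thread would call push for each command and close when it is done, while the consumer calls consume once with the initial timeout. Once the first wait succeeds, the loop only ever blocks without a deadline, which matches the "no timeout after the first command" requirement.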