Search code examples
c++reactive-programmingrxcpp

rxcpp: nested while loop or similar "classic" imperative structure for program


I have a device that streams some events. I want to use reactive extensions to model the following behavior:

  1. Detect when a user connects a dongle (my program checks for events for dongle connected).
  2. Start to capture a stream of data from the dongle after the dongle is connected.
  3. Be able to detect when the dongle got disconnected and come back to 1., later if user connects the dongle again, I want to go to 2. In the state where I stream data if keyboard is hit, then the program finishes.

I know how to wait for dongle connection (1.):

auto waitForDongle = events.take_while([](auto const & event) {
      return event == DongleConnected
      }).subscribe([](auto) {});

And I know how capture the stream (2.):

auto streamMotionData = events.take_while([](auto const &) { return !keyboardPressed(); })
    .map([](auto const & evt) -> boost::optional<std::vector<double>> {
            ...
            return data;
        }).subscribe([](vector<double> const &) { ...});

My problem is that I do not know how to combine streams in order to come back to 1. and later 2. I just know how to do this once and sequentially. But I want the behavior described above.


Solution

  • This is related to the common UX drag example in Rx. In this case, the press is a dongle connection and the release is a dongle removal.

    This solution requires that only one dongle may be connected at a time (a removal is expected before the next connection). There would have to be more information in the event to allow multiple connections to overlap.

    Here is the core of the solution. The whole program is below.

        auto DatasFromConnectedDongle = DongleConnects. // when connected
            map([=](DongleEvent const & event){
                assert(event == DongleEvent::Connected);
                cout << "Connected - " << flush;
                return DongleDatas. // return all the datas
                    take_until(DongleRemoves). // stop when removed
                    finally([](){
                        cout << "- Removed" << endl;
                    });
            }).
            switch_on_next(). // only listen to datas from the most recent connected dongle
            take_until(Exits); // stop everything when key is pressed
    

    I did end up using repeat(), but only to produce the test event data.

    #include <rxcpp/rx.hpp>
    
    namespace Rx {
    using namespace rxcpp;
    using namespace rxcpp::sources;
    using namespace rxcpp::operators;
    using namespace rxcpp::util;
    }
    using namespace Rx;
    
    #include <cassert>
    using namespace std;
    using namespace std::chrono;
    
    int main()
    {
        //
        // test code
        //
    
        auto keyboardPressed = [](){
            return false;
        };
    
        enum class DongleEvent {
            Connected,
            Removed,
            Data,
            Other
        };
        auto events = from(
                DongleEvent::Data, DongleEvent::Other, 
                DongleEvent::Connected, DongleEvent::Data, 
                DongleEvent::Other, DongleEvent::Other, 
                DongleEvent::Data, DongleEvent::Removed, 
                DongleEvent::Other, DongleEvent::Data).
            repeat(5). // send the above events five times over
            zip(take_at<0>(), interval(milliseconds(200))). // pace our test data
            publish().
            ref_count(); // publish and ref_count make the events sharable
    
        //
        // the solution
        //
    
        // fires when connected
        auto DongleConnects = events.
            filter([](DongleEvent const & event) {
                return event == DongleEvent::Connected;
            });
    
        // fires when data arrives
        auto DongleDatas = events.
            filter([](DongleEvent const & event) {
                return event == DongleEvent::Data;
            });
    
        // fires when removed    
        auto DongleRemoves = events.
            filter([](DongleEvent const & event) {
                return event == DongleEvent::Removed;
            });
    
        // fires when key pressed    
        auto Exits = interval(milliseconds(200)).
            filter([=](long){
                return keyboardPressed();
            });
    
        auto DatasFromConnectedDongle = DongleConnects.
            map([=](DongleEvent const & event){
                assert(event == DongleEvent::Connected);
                cout << "Connected - " << flush;
                return DongleDatas. // return all the datas
                    take_until(DongleRemoves). // stop when removed
                    finally([](){
                        cout << "- Removed" << endl;
                    });
            }).
            switch_on_next(). // only listen to datas from the most recent connected dongle
            take_until(Exits); // stop everything when key is pressed
    
        DatasFromConnectedDongle.subscribe([](DongleEvent const & event){
            assert(event == DongleEvent::Data);
            cout << "Data " << flush;
        });
    
        return 0;
    }
    

    produces

    ~/source/rxcpp/Rx/v2/examples/dongle (master)$ cmake .
    ...
    ~/source/rxcpp/Rx/v2/examples/dongle (master)$ make
    Scanning dependencies of target dongle
    [ 50%] Building CXX object CMakeFiles/dongle.dir/main.cpp.o
    [100%] Linking CXX executable dongle
    [100%] Built target dongle
    ~/source/rxcpp/Rx/v2/examples/dongle (master)$ ./dongle 
    Connected - Data Data - Removed
    Connected - Data Data - Removed
    Connected - Data Data - Removed
    Connected - Data Data - Removed
    Connected - Data Data - Removed
    ~/source/rxcpp/Rx/v2/examples/dongle (master)$