Search code examples
c++c++11reactive-programmingrxcpp

How to handle a request/response stream in rxcpp


I need to implement a camera sampling system in rxcpp.. The way I imagined this is passing a requestStream as param and receiving a responseStream

Every time requestSample is called, a new camera session is created and when on_complete() is called for the requestStream, the camera session terminates

observable<ImageSample> requestSampleStream(observable<ImageRequest> requestStream$) {
  auto response$ = rxcpp::observable<>::create<ImageSample>(
  [&](rxcpp::subscriber<ImageSample> s){
    auto request_next = [&](ImageRequest imageRequest) {
      cout << "image request next" << endl;
      SampleImage vsi;
      s.on_next(vsi);
    };

    auto request_completed = [&] {
      cout << "no more samples needed.. close camera" << endl;
      s.on_completed();
    };

    auto request_error = [&](std::exception_ptr e) {
      try { rethrow_exception(e); }
      catch (const exception &ex) {
        cerr << "error happened on request stream.. close the camera and send error on return stream" << endl << ex.what() << endl;
      }

      s.on_error(e);
    };

    requestStream$.subscribe(request_next,
                              request_error,
                              request_completed
    );
  });

  return response$;
}

The problem here is that by the time that requestStream$.subscribe(request_next.. is called, I get an EXC_BAD_ACCESS


Solution

  • s is a local stack variable, but the lambdas are capturing a reference to s. This reference is invalid by the time the lambdas are called. change [&] to [=] and this should work!

    Another option is to use the existing map algo.

    observable<ImageSample> requestSampleStream(observable<ImageRequest> requestStream$) {
        return requestStream$ |
            map([](ImageRequest imageRequest){
                cout << "image request next" << endl;
                SampleImage vsi;
                s.on_next(vsi);
            })
            // optional
            | tap([&](std::exception_ptr e) {
                try { rethrow_exception(e); }
                catch (const exception &ex) {
                    cerr << "error happened on request stream.. close the camera and send error on return stream" << endl << ex.what() << endl;
                }
            },
            [](){
                cout << "no more samples needed.. close camera" << endl;
            })
            ;
    }