How to connect two functions returning observables?


I wrote a function that returns a pair of QDateTime values as an observable:

rxcpp::observable<std::tuple<QDateTime, QDateTime>> experimentOne(const QDateTimeAxis * const axis)
{
   return rxcpp::observable<>::create<std::tuple<QDateTime, QDateTime>>(
     [axis](rxcpp::subscriber<std::tuple<QDateTime, QDateTime>> s) {

       auto rangeCallback = [s](QDateTime minv, QDateTime maxv) {

          if (s.is_subscribed()) {

              // send to the subscriber
              s.on_next(std::make_tuple<QDateTime, QDateTime>(std::move(minv), std::move(maxv)));
          }

       };

       QObject::connect(axis, &QDateTimeAxis::rangeChanged, rangeCallback);
   }); 
}

With this function I can subscribe to changes in the date range on an axis of a QChart.

I also wrote another function that, given two dates, returns an observable with values coming from a SQLite database:

rxcpp::observable<std::tuple<double, double>> Database::getValueRange(const std::string& table, unsigned long start, unsigned long end)
{

   return rxcpp::observable<>::create<std::tuple<double, double>>(
      [this, table, start, end](rxcpp::subscriber<std::tuple<double, double>> s) {

    // get the prepared statement for the query 1, i.e. ohlcv values
    // within a date range
    sqlite3_stmt *stmt = this->m_query3_stms[table].get();

    // bind first parameter, the start timestamp
    int rc = sqlite3_bind_int64(stmt, 1, start);
    checkSqliteCode(rc, m_db.get());

    // bind the second parameter, the end timestamp
    rc = sqlite3_bind_int64(stmt, 2, end);
    checkSqliteCode(rc, m_db.get());

    // step through the query results
    while ( sqlite3_step(stmt)==SQLITE_ROW && s.is_subscribed() ) {

        // extract the min and max values from the current result row
        // (double, not float, so the column values are not truncated)
        double minv = sqlite3_column_double(stmt, 0);
        double maxv = sqlite3_column_double(stmt, 1);

        // send to the subscriber
        s.on_next(std::make_tuple(minv, maxv));
    }

    // reset the statement for reuse
    sqlite3_reset(stmt);

    // send complete to the subscriber
    s.on_completed();

   });
}

How can I pipe the values from the first function (two dates) into the second function idiomatically in RxCpp, so that at the end of the pipeline I can subscribe to the values coming from the database for the input dates?


Solution

  • The canonical way to create a new value range for each new pair of date values is to use map followed by one of the flattening operators:

    auto valueRanges = experimentOne(/*params*/).
        map(rxcpp::util::apply_to([](QDateTime d1, QDateTime d2){
          return getValueRange(/*params*/).
              map(rxcpp::util::apply_to([=](double db1, double db2){ 
                  return std::make_tuple(d1, d2, db1, db2); 
              }));
        })).
        /* switch_on_next() or merge() or concat() */
        /* this will produce std::tuple<QDateTime, QDateTime, double, double> */
    
    • switch_on_next will cancel the previous value range and start the new value range.
    • merge will produce all the value ranges as soon as possible.
    • concat will produce the value ranges one at a time, in order.

    If the value ranges run on different threads, a thread-safe coordination must be passed to merge so that the value ranges are interleaved safely.
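To make the difference between the three flattening operators concrete, here is a minimal sketch that models their ordering rules with the standard library only. It is not RxCpp code and does not call any RxCpp API: the `Inner` struct, the tick-based timing, and the `modelMerge`, `modelConcat` and `modelSwitch` functions are all hypothetical names invented for illustration. Each inner value range is assumed to arrive at a given tick and to emit its values a fixed number of ticks after it is subscribed to.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical model type, not part of RxCpp: one inner sequence (a "value
// range"). It is handed to the flattening operator at tick `arrival` and
// emits each value `offset` ticks after it is subscribed to.
struct Inner {
    int arrival;
    std::vector<std::pair<int, std::string>> emissions;  // (offset, value)
};

using Timed = std::pair<int, std::string>;

// Model of merge: every inner is subscribed to as soon as it arrives, so the
// output is simply all values ordered by their absolute emission time.
std::vector<std::string> modelMerge(const std::vector<Inner>& inners) {
    std::vector<Timed> timed;
    for (const auto& in : inners)
        for (const auto& e : in.emissions)
            timed.emplace_back(in.arrival + e.first, e.second);
    std::stable_sort(timed.begin(), timed.end(),
                     [](const Timed& a, const Timed& b) { return a.first < b.first; });
    std::vector<std::string> out;
    for (const auto& t : timed) out.push_back(t.second);
    return out;
}

// Model of concat: the next inner is subscribed to only after the previous
// one has completed, so values come out grouped per inner, in arrival order.
std::vector<std::string> modelConcat(const std::vector<Inner>& inners) {
    std::vector<std::string> out;
    for (const auto& in : inners)
        for (const auto& e : in.emissions) out.push_back(e.second);
    return out;
}

// Model of switch_on_next: a newly arrived inner cancels the previous one, so
// anything the previous inner would emit at or after that arrival is dropped.
std::vector<std::string> modelSwitch(const std::vector<Inner>& inners) {
    std::vector<std::string> out;
    for (std::size_t i = 0; i < inners.size(); ++i) {
        // The model assumes the inners are listed in arrival order.
        assert(i == 0 || inners[i - 1].arrival <= inners[i].arrival);
        for (const auto& e : inners[i].emissions) {
            const bool cancelled = i + 1 < inners.size() &&
                                   inners[i].arrival + e.first >= inners[i + 1].arrival;
            if (!cancelled) out.push_back(e.second);
        }
    }
    return out;
}
```

As a usage example, take a range A that arrives at tick 0 and emits a1, a2, a3 at offsets 1, 3, 5, and a range B that arrives at tick 2 and emits b1, b2 at offsets 2, 4. The merge model yields a1, a2, b1, a3, b2; the concat model yields a1, a2, a3, b1, b2; the switch model yields a1, b1, b2, because A is cancelled when B arrives. In real RxCpp the interleaving under merge depends on the schedulers involved, so the model only shows the ordering rule, not actual timing.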

    To select out a particular range, use filter(). If you want to be able to split out the ranges into separate expressions, use publish() to share the ranges first.

    auto hotValueRanges = valueRanges.
        publish().ref_count();
    
    auto aDateRange = hotValueRanges.
        filter(rxcpp::util::apply_to([](QDateTime d1, QDateTime d2, double, double){
            return isADesiredDate(d1, d2);
        })).
        subscribe(/*use the range*/);
    
    auto anotherDateRange = hotValueRanges.
        filter(rxcpp::util::apply_to([](QDateTime d1, QDateTime d2, double, double){
            return isAnotherDesiredDate(d1, d2);
        })).
        subscribe(/*use the range*/);