Search code examples
rrcpparmadillorcpparmadillorcppparallel

Why does my RcppParallel implementation of a user-defined function crash unexpectedly?


I have developed a dual chain markov monte carlo model designed to forecast loan portfolios in the excellent package Rcpp but have run into an issue trying to implement a parallelised version of these functions with RcppParallel.

I have based my attempts this far on this vignette (https://gallery.rcpp.org/articles/parallel-distance-matrix/) and this stackoverflow thread (How to call user-defined function in RcppParallel?).

All of the UDFs underlying this logic are implemented using Armadillo type objects, which I understand are threadsafe, and the writing of data between functions and pre-allocated outputs should be working smoothly as I have this same logic implemented successfully in serial functions. It's also true that the function portfolio_simulation_rating_model_rs_ts works well with the inputs used outside of the RcppParallel wrapper and there are no compilation errors or warnings when I source this code and the underlying functions. However, once I get to running the dcmcmc_portfolio_rating_model_parallel function in R, my session crashes only saying that there has been a fatal error.

Clearly I am missing something in the parallelisation, so any help/suggestions would be greatly appreciated.

    // [[Rcpp::depends(RcppArmadillo, RcppParallel)]]
    #include <string>
    #include <algorithm>
    #include <vector>
    #include <math.h>
    #include <RcppArmadillo.h>
    #include <RcppParallel.h>
    using namespace arma;
    using namespace RcppParallel;
    using namespace Rcpp;
    using namespace std;

struct dcmcmc_portfolio_rating_model_worker : public Worker {

  // Input Values
  const int n_loans ;
  const int n_regime ;
  const int n_matrix ;
  const int n_amort ;
  const RVector<double> loan_ids ;
  const RVector<double> starting_balances ;
  const RVector<double> starting_positions ;
  const RVector<double> cprs ;
  const RVector<double> sim_regime_indices ;
  const RVector<double> loan_regime_indices ;
  const RVector<double> starting_periods ;
  const RVector<double> regime_matrix_indices ;
  const RVector<double> matrix_indices ;
  const RVector<double> matrix_elements ;
  const int nrow ;
  const int ncol ;
  const RMatrix<double> amortisation_schedules ;
  const int periods ;
  const int iterations ;

  // Output Matrix
  RMatrix<double> output_mx ;

  dcmcmc_portfolio_rating_model_worker(
    const int& n_loans,
    const int& n_regime,
    const int& n_matrix,
    const int& n_amort,
    const NumericVector& loan_ids,
    const NumericVector& starting_balances,
    const NumericVector& starting_positions,
    const NumericVector& cprs,
    const NumericVector& sim_regime_indices,
    const NumericVector& loan_regime_indices,
    const NumericVector& starting_periods,
    const NumericVector& regime_matrix_indices,
    const NumericVector& matrix_indices,
    const NumericVector& matrix_elements,
    const int& nrow,
    const int& ncol,
    const NumericMatrix& amortisation_schedules,
    const int& periods,
    const int& iterations,
    NumericMatrix& output_mx)
    : n_loans(n_loans),
      n_regime(n_regime),
      n_matrix(n_matrix),
      n_amort(n_amort),
      loan_ids(loan_ids),
      starting_balances(starting_balances),
      starting_positions(starting_positions),
      cprs(cprs),
      sim_regime_indices(sim_regime_indices),
      loan_regime_indices(loan_regime_indices),
      starting_periods(starting_periods),
      regime_matrix_indices(regime_matrix_indices),
      matrix_indices(matrix_indices),
      matrix_elements(matrix_elements),
      nrow(nrow),
      ncol(ncol),
      amortisation_schedules(amortisation_schedules),
      periods(periods),
      iterations(iterations),
      output_mx(output_mx) {}
  
  // Setting up functions to convert inputs to arma
  
  arma::vec convert_input_vector(RVector<double> input_vector, int length)
  {RVector<double> tmp_input_vector = input_vector ;
   arma::vec input_vector_ts(tmp_input_vector.begin(), length, false) ;
   return input_vector_ts ;}
  
  arma::mat convert_input_matrix(RMatrix<double> input_matrix, int rows, int cols)
  {RMatrix<double> tmp_input_matrix = input_matrix ;
   arma::mat input_matrix_ts(tmp_input_matrix.begin(), rows, cols, false) ;
   return input_matrix_ts ;}
  
  // Function to iterate

  void operator()(std::size_t begin, std::size_t end){
    
    arma::vec loan_ids_ts = convert_input_vector(loan_ids, n_loans) ;
    arma::vec starting_balances_ts = convert_input_vector(starting_balances, n_loans) ;
    arma::vec starting_positions_ts  = convert_input_vector(starting_positions, n_loans) ;
    arma::vec cprs_ts = convert_input_vector(cprs, n_loans) ;
    arma::vec sim_regime_indices_ts = convert_input_vector(sim_regime_indices, n_regime);
    arma::vec loan_regime_indices_ts = convert_input_vector(loan_regime_indices, n_regime) ;
    arma::vec starting_periods_ts  = convert_input_vector(starting_periods, n_regime) ;
    arma::vec regime_matrix_indices_ts  = convert_input_vector(regime_matrix_indices, n_regime);
    arma::vec matrix_indices_ts = convert_input_vector(matrix_indices, n_matrix) ;
    arma::vec matrix_elements_ts = convert_input_vector(matrix_elements, n_matrix) ;
    arma::mat amortisation_schedules_ts = convert_input_matrix(amortisation_schedules, n_amort, 3) ;

    for(unsigned int i = begin; i < end; i++){

      arma::vec i_sim_regime_indices = allwhich_ts(sim_regime_indices_ts,
                                                   i) ;

      int sim_begin = as_scalar(i_sim_regime_indices.head(1)) ;
      int sim_end = as_scalar(i_sim_regime_indices.tail(1)) ;
      
      arma::vec i_loan_regime_indices = loan_regime_indices_ts.subvec(sim_begin, sim_end) ;

      arma::vec i_starting_periods = starting_periods_ts.subvec(sim_begin, sim_end) ;

      arma::vec i_regime_matrix_indices = regime_matrix_indices_ts.subvec(sim_begin, sim_end) ;

      arma::mat pf_simulation = portfolio_simulation_rating_model_rs_ts(
        loan_ids_ts,
        starting_balances_ts,
        starting_positions_ts,
        cprs_ts,
        i_loan_regime_indices,
        i_starting_periods,
        i_regime_matrix_indices,
        matrix_indices_ts,
        matrix_elements_ts,
        nrow,
        ncol,
        amortisation_schedules_ts,
        periods
      ) ;

      int sim_rows = pf_simulation.n_rows ;

      int sim_cols = pf_simulation.n_cols ;

      for(int c = 0; c < sim_cols; c++){

        for(int r = 0; r < sim_rows; r++){

          output_mx((n_loans*periods*i + r), c) = pf_simulation(r, c) ;

        }

      }

      for(int r = 0; r < sim_rows; r++){

        output_mx((n_loans*periods*i + r), 7) = (i + 1) ;

      }

    }

  }

};

//[[Rcpp::export]]

NumericMatrix dcmcmc_portfolio_rating_model_parallel(
    const NumericVector& loan_ids,
    const NumericVector& starting_balances,
    const NumericVector& starting_positions,
    const NumericVector& cprs,
    const NumericVector& sim_regime_indices,
    const NumericVector& loan_regime_indices,
    const NumericVector& starting_periods,
    const NumericVector& regime_matrix_indices,
    const NumericVector& matrix_indices,
    const NumericVector& matrix_elements,
    int nrow,
    int ncol,
    const NumericMatrix& amortisation_schedules,
    int periods,
    int iterations
  ){

  int n_loans = loan_ids.size() ;
  
  int n_regime = sim_regime_indices.size() ;
  
  int n_matrix = matrix_indices.size() ;
  
  int n_amort = amortisation_schedules.nrow() ;

  NumericMatrix output_mx(n_loans*periods*iterations, 8) ;

  // Creating Worker object

  dcmcmc_portfolio_rating_model_worker DCMCMC(
      n_loans,
      n_regime,
      n_matrix,
      n_amort,
      loan_ids,
      starting_balances,
      starting_positions,
      cprs,
      sim_regime_indices,
      loan_regime_indices,
      starting_periods,
      regime_matrix_indices,
      matrix_indices,
      matrix_elements,
      nrow,
      ncol,
      amortisation_schedules,
      periods,
      iterations,
      output_mx
  ) ;

  // Call parellised worker
  
  parallelFor(0, iterations, DCMCMC) ;

  return(output_mx) ;

}

EDIT:

I have produced a minimum reproducible example, trying to incorporate the helpful comments recieved on this post so far. The example sets up trivial functions designed to mimic the structure of my modelling functions. The final function causing a crash takes three vectors, vec1, vec2, and vec_ind. It applies a worker which attempts to take chunks of equal size (indentified by indices stored in vec_ind) of vec1 and vec2, add these subvector chunks, and store the results in the relevant portions of an output vector.

I have reproduced the example below using both arma::vec and std::vector types and experience the crashing behaviour in both. I present the std::vector code below, further to Dirk's suggestion that the RcppArmadillo types may be relying on R memory, and I have removed all namespace inclusions other than RcppParallel to avoid conflicts, as per onyambu's remark.

Here is the Rcpp

// [[Rcpp::depends(RcppArmadillo, RcppParallel)]]
#include <string>
#include <algorithm>
#include <vector>
#include <math.h>
#include <RcppArmadillo.h>
#include <RcppParallel.h>
using namespace RcppParallel;

//[[Rcpp::export]]

std::vector<double> allwhich_ts(std::vector<double> vector, double value){
  
  int length = vector.size() ;
  
  std::vector<double> values(0) ;
  
  for(int i = 0; i < length; i++){
    
    bool match = vector[i] == value;
    
    if(match){
      
      values.push_back(i);
      
    }
  }
  
  return(values);
  
}

//[[Rcpp::export]]

std::vector<double> vector_addition(std::vector<double> vector1, std::vector<double> vector2){
  
  int n_elements = vector1.size() ;
  
  std::vector<double> output_vec = std::vector<double>(n_elements) ;
  
  for(int i = 0; i < n_elements; i++){
    
    output_vec[i] = vector1[i] + vector2[i] ;
    
  }
  
  return(output_vec) ;
  
}

struct vector_addition_worker : public Worker {
  
  const RVector<double> vector1 ;
  const RVector<double> vector2 ;
  const RVector<double> vector_indices ;
  const int vector_length ;
  
  RVector<double> output_vec ;
  
  vector_addition_worker(
    const Rcpp::NumericVector& vector1,
    const Rcpp::NumericVector& vector2,
    const Rcpp::NumericVector& vector_indices,
    const int& vector_length,
    Rcpp::NumericVector& output_vec
  ) : vector1(vector1),
      vector2(vector2),
      vector_indices(vector_indices),
      vector_length(vector_length),
      output_vec(output_vec) {}
  
  std::vector<double> convert_input_vec(RVector<double> input_vector, int vec_length){
    RVector<double> tmp_vector = input_vector ;
    std::vector<double> input_vector_ts(tmp_vector.begin(), tmp_vector.end()) ;
    return(input_vector_ts) ;
  }
  
  void operator()(std::size_t begin, std::size_t end){
    
    std::vector<double> vector1_ts = convert_input_vec(vector1, vector_length) ;
    std::vector<double> vector2_ts = convert_input_vec(vector2, vector_length) ;
    std::vector<double> vector_indices_ts = convert_input_vec(vector_indices, vector_length) ;

    for(unsigned int i = begin; i < end; i++){
      
      std::vector<double> indices = allwhich_ts(vector_indices_ts, i) ;
      
      int values_begin = indices.at(1) ;
      int values_end = indices.at(std::distance(indices.begin(), indices.end())) ;
      
      std::vector<double> values1(vector1_ts.begin() + values_begin, vector1_ts.begin() + values_end) ;
      std::vector<double> values2(vector2_ts.begin() + values_begin, vector2_ts.begin() + values_end) ;
      
      std::vector<double> interim_op = vector_addition(values1, values2) ;
      
      int op_size = interim_op.size() ;
      
      for(int n = 0; n < op_size; n++){
        
        output_vec[i*op_size + n] = interim_op[n] ;
        
      }
      
    }
    
  }
  
};

//[[Rcpp::export]]

Rcpp::NumericVector vector_addition_parallel(Rcpp::NumericVector vec1,
                                             Rcpp::NumericVector vec2,
                                             Rcpp::NumericVector vec_ind){
  
  int vec_length = vec1.size() ;
  
  double n_indices = *std::max_element(vec_ind.begin(), vec_ind.end()) ;
  
  Rcpp::NumericVector op_vec(vec_length);
  
  vector_addition_worker vec_add_worker(
    vec1,
    vec2,
    vec_ind,
    vec_length,
    op_vec
  ) ;
  
  parallelFor(0, n_indices, vec_add_worker) ;
  
  return(op_vec) ;
  
}

Here is the R code which tests for expected behaviour

library(Rcpp)
library(RcppParallel)
library(RcppArmadillo)

# Setting up dummy data

vec1 = rep(1, 500)
vec2 = rep(1, 500)
vec_inds = sort(rep(1:20, 25))

length(vec1);length(vec2);length(vec_inds)

## Checking that allwhich_ts is working as expected

allwhich_ts(vec_inds, 1)

# Checking that vector_addition is working as expected

vector_addition(vec1, vec2)

# Checking that the same logic can be applied serially (mainly to verify data handling method)

r_repro_function <- function(vec1, vec2, vec_inds){
  
  op_vec = numeric(length(vec1))
  
  for(i in unique(vec_inds)){
    tmp1 = vec1[vec_inds == i]
    tmp2 = vec2[vec_inds == i]
    
    tmp_op = tmp1 + tmp2
    
    for(n in 1:length(tmp1)){
      
      op_vec[(i - 1)*length(tmp1) + n] = tmp_op[n]
      
    }
    
  }
  
  op_vec
  
}

r_repro_function(vec1, vec2, vec_inds)

vector_addition_parallel(vec1 = vec1,
                         vec2 = vec2,
                         vec_ind = vec_inds)

Solution

  • So following Dirk's suggestion I am posting an answer with a pared back example to illustrate the problem I had and the solution I arrived at with his help.

    The mistake I made was actually in how I treated the begin and end variables within my worker. In contrast to the articles in the RcppParallel gallery, I was not using begin/end to guide iterators to the relevant portions of the calculation, but rather trying to use them to index the relevant part of my input dataset for each portion.

    This caused dimension errors, which on my machine simply crashed the R session.

    The solution to this mistake would be to either (1) ensure any UDFs you are applying deal in iterators rather than vector values or (2) to bridge the begin/end variables correctly to the vectors you are trying to index.

    Given that all of my modelling functions are already in the business of taking vector indices, I have applied the second approach and create a unique_indices vector within my function which the begin/end values can simply select values from. The current solution makes some assumptions about how the input indices will work (i.e. simply integer values from smallest to largest in the argument vector).

    Apologies if this is still considered verbose, but I thought it worth keeping the data-handling logic as it was in the problem statement because that is where the problem arose. That is where a submatrix is identified by an index and used as the arguments to some calculation. The key differences to the example above are on lines 48-52 and 62-65

    Where (1) each i between begin and end is used to select an index as so int index_value = unique_indices[i] ; which then identifies the relevant input data and (2) the unique_indices vector is defined by the characteristics of the vector of indices vec_ind

    :

    // [[Rcpp::depends(RcppArmadillo, RcppParallel)]]
    #include <string>
    #include <algorithm>
    #include <vector>
    #include <math.h>
    #include <RcppArmadillo.h>
    #include <RcppParallel.h>
    using namespace RcppParallel;
    
    //[[Rcpp::export]]
    
    std::vector<double> allwhich_ts(std::vector<double> vector, double value){
      int length = vector.size() ;
      std::vector<double> values(length) ;
      int matches = 0;
      for(int i = 0; i < length; i++){
        bool match = vector[i] == value;
        if(match){values[matches] = i;
          matches++ ;}}
      std::vector<double> op(values.begin(), values.begin() + matches) ;
      return(op);
    }
    
    struct vector_double_worker : public Worker {
      // Defining worker arguments
      const RVector<double> vector1 ;
      const RVector<double> vector_indices ;
      const RVector<double> unique_indices ;
      const int vector_length ;
      RVector<double> output_vec ;
      // Initialising function argument values
      vector_double_worker(
        const Rcpp::NumericVector& vector1, const Rcpp::NumericVector& vector_indices,
        const Rcpp::NumericVector& unique_indices, const int& vector_length, Rcpp::NumericVector& output_vec
      ) : vector1(vector1),vector_indices(vector_indices),unique_indices(unique_indices),
          vector_length(vector_length),output_vec(output_vec) {}
      // Setting up conversion function so that UDFs can deal in std:: types
      std::vector<double> convert_input_vec(RVector<double> input_vector, int vec_length){
        std::vector<double> input_vector_ts(input_vector.begin(), input_vector.end()) ;
        return(input_vector_ts) ;}
      // Defining operator ranges which will breakdown the task into partitions
      void operator()(std::size_t begin, std::size_t end){
      // Converting input vectors to std types
        std::vector<double> vector1_ts = convert_input_vec(vector1, vector_length) ;
        std::vector<double> vector_indices_ts = convert_input_vec(vector_indices, vector_length) ;
      // For loop to perform calculations for each element in a given partition
        for(unsigned int i = begin; i < end; i++){
          int index_value = unique_indices[i] ; // begin and end now used to index the vector of input indices defined outside of the function
          std::vector<double> indices = allwhich_ts(vector_indices_ts, index_value) ; // identifying sub-vector indices
          int values_begin = indices.at(0) ;
          int values_end = indices.at(std::distance(indices.begin(), indices.end()) - 1) ; // - 1 was added to avoid dimension error
          std::vector<double> values1(vector1_ts.begin() + values_begin, vector1_ts.begin() + values_end + 1) ; // + 1 was added to avoid dimension error
          int op_size = values1.size() ;
          for(int n = 0; n < op_size; n++){output_vec[i*op_size + n] = values1[n] * 2 ;} // Trivial example calculation
        }}};
    
    //[[Rcpp::export]]
    
    Rcpp::NumericVector vector_double_parallel(Rcpp::NumericVector vec1, Rcpp::NumericVector vec_ind){
      int vec_length = vec1.size() ; // Setting up output vector
      Rcpp::NumericVector op_vec(vec_length);
      double n_indices = *std::max_element(vec_ind.begin(), vec_ind.end()) ; // Identifying unique index values
      double min_indices = *std::min_element(vec_ind.begin(), vec_ind.end()) ;
      Rcpp::NumericVector unique_indices(n_indices) ;
      std::iota(unique_indices.begin(), unique_indices.end(), min_indices);
      vector_double_worker vec_2_worker(vec1,vec_ind,unique_indices,vec_length,op_vec) ; // Setting up parallel worker
      parallelFor(0, n_indices, vec_2_worker) ; // Populating output vector with results
      return(op_vec) ;}