Search code examples
c#c++multithreadingc++-clistddeque

Avoiding std::deque iterator not dereferencable error between unmanaged c++, c++/cli and c# code


I have a VS2015 solution comprised of unmanaged c++ code (to do some CPU intensive simulation computations), a c++/cli wrapper around this code and a c# project which calls the c++/cli wrappers via a DLL. The following example is a simplified version of the full code, sorry for the quantity of code in advance, but it is required for a complete picture of what is going on.

Unmanaged C++ Code

class diffusion_limited_aggregate {
public:
    diffusion_limited_aggregate() 
        : aggregate_map(), attractor_set(), batch_queue() {}
    std::size_t size() const noexcept { return aggregate_map.size(); }
    std::queue<std::pair<int,int>>& batch_queue_handle() noexcept { return batch_queue; }
    void generate(std::size_t n) {
        initialise_attractor_structure(); // set up initial attractor seed points
        std::size_t count = 0U;
        std::pair<int,int> current = std::make_pair(0,0);
        std::pair<int,int> prev = current;
        bool has_next_spawned = false;
        while (size() < n) {
            if (!has_next_spawned) {
                // => call function to spawn particle setting current 
                has_next_spawned = true;
            }
            prev = current;
            // => call function to update random walking particle position
            // => call function to check for lattice boundary collision
            if (aggregate_collision(current, prev, count)) has_next_spawned = false;
        }
    }
    void initialise_attractor_structure() {
        attractor_set.clear();
        attractor_set.insert(std::make_pair(0,0));
    }
    void push_particle(const std::pair<int,int>& p, std::size_t count) {
        aggregate_map.insert(std::make_pair(p, count));
        batch_queue.push(p);
    }
    bool aggregate_collision(const std::pair<int,int>& current,
        const std::pair<int,int>& prev, std::size_t& count) {
        if (aggregate_map.find(current) != aggregate_map.end() 
            || attractor_set.find(current) != attractor_set.end()) {
            push_particle(previous, ++count);
            return true;
        }
        return false;
    }
private:
    std::unordered_map<std::pair<int,int>, 
        std::size_t,
        utl::tuple_hash> aggregate_map;
    std::unordered_set<std::pair<int,int>, utl::tuple_hash> attractor_set;
    std::queue<std::pair<int,int>> batch_queue; // holds buffer of aggregate points
};

Where utl::tuple_hash is a hashing function object for std::pair and, more generally, std::tuple instances, defined as:

namespace utl {
    template<class Tuple, std::size_t N>
    struct tuple_hash_t {
        static std::size_t tuple_hash_compute(const Tuple& t) {
            using type = typename std::tuple_element<N-1, Tuple>::type;
            return tuple_hash_t<Tuple,N-1>::tuple_hash_compute(t)
                + std::hash<type>()(std::get<N-1>(t));
        }
    };
    // base
    template<class Tuple>
    struct tuple_hash_t<Tuple, 1> {
        static std::size_t tuple_hash_compute(const Tuple& t) {
            using type = typename std::tuple_element<0,Tuple>::type;
            return 51U + std::hash<type>()(std::get<0>(t))*51U;
        }
    };
    struct tuple_hash {
        template<class... Args>
        std::size_t operator()(const std::tuple<Args...>& t) const {
            return tuple_hash_t<std::tuple<Args...>,sizeof...(Args)>::tuple_hash_compute(t);
        }
        template<class Ty1, class Ty2>
        std::size_t operator()(const std::pair<Ty1, Ty2>& p) const {
            return tuple_hash_t<std::pair<Ty1,Ty2>,2>::tuple_hash_compute(p);
        }
    };
}

Managed C++/CLI Wrapper

The following is a wrapper in c++/cli around the class diffusion_limited_aggregate, the important method in this case is ProcessBatchQueue. This method is where the std::deque iterator not dereferencable error must occur as it is the only place in which the batch_queue contents are being accessed and popped.

public ref class ManagedDLA2DContainer {
private:
    diffusion_limited_aggregate* native_dla_2d_ptr;
    System::Object^ lock_obj = gcnew System::Object();
public:
    ManagedDLA2DContainer() : native_dla_2d_ptr(new diffusion_limited_aggregate()) {}
    ~ManagedDLA2DContainer() { delete native_dla_2d_ptr; }
    std::size_t Size() { return native_dla_2d_ptr->size(); }
    void Generate(std::size_t n) { native_dla_2d_ptr->generate(n); }
    System::Collections::Concurrent::BlockingCollection<
        System::Collections::Generic::KeyValuePair<int,int>
    >^ ProcessBatchQueue() {
        // store particles in blocking queue configuration
        System::Collections::Concurrent::BlockingCollection<
            System::Collections::Generic::KeyValuePair<int,int>>^ blocking_queue =
            gcnew System::Collections::Concurrent::BlockingCollection<
                System::Collections::Generic::KeyValuePair<int,int>
            >();
        System::Threading::Monitor::Enter(lock_obj); // define critical section start
        try {
            // get ref to batch_queue
            std::queue<std::pair<int,int>>& bq_ref = native_dla_2d_ptr->batch_queue_handle();
            // loop over bq transferring particles to blocking_queue
            while (!bq_ref.empty()) {
                auto front = std::move(bq_ref.front());
                blocking_queue->Add(System::Collections::Generic::KeyValuePair<int,int>(front.first,front.second));
                bq_ref.pop();
            }
        }
        finally { System::Threading::Monitor::Exit(lock_obj); }
        return blocking_queue;
    }
}

C# Code

Finally, I have the following c# code which uses ManagedDLA2DContainer to produce aggregates and display them on an interface.

public partial class MainWindow : Window {
    private static readonly System.object locker = new object();
    private readonly ManagedDLA2DContainer dla_2d;
    public MainWindow() {
        InitializeComponent();
        dla_2d = new ManagedDLA2DContainer();
    }
    private void GenerateAggregate(uint n) {
        // start asynchronous task to perform aggregate simulation computations
        Task.Run(() => CallNativeCppAggregateGenerators(n));
        System.Threading.Thread.Sleep(5);
        // start asynchronous task to perform rendering
        Task.Run(() => AggregateUpdateListener(n));
    }
    private void CallNativeCppAggregateGenerators(uint n) {
        dla_2d.Generate(n);
    }
    private void AggregateUpdateListener(uint n) {
        const double interval = 10.0;
        Timer timer = new Timer(interval);
        timer.Elapsed += Update2DAggregateOnTimedEvent;
        timer.AutoReset = true;
        timer.Enabled = true;
    }
    private void Update2DAggregateOnTimedEvent(object source, ElapsedEventArgs e) {
        lock(locker) {
            BlockingCollection<KeyValuePair<int,int>> bq = dla_2d.ProcessBatchQueue();
            while(bq.Count != 0) {
                KeyValuePair<int,int> p = bq.Take();
                Point3D pos = new Point3D(p.Key, p.Value, 0.0);
                // => do stuff with pos, sending to another class method for rendering
                // using Dispatcher.Invoke(() => { ... }); to render in GUI
            }
        }
    }
}

The method GenerateAggregate is only called once per aggregate execution, it is called via a button handler method as I have a Generate method on the interface with a OnGenerateButtonClicked event handler function which calls GenerateAggreate. Both CallNativeCppAggregateGenerators and AggregateUpdateListener are not called anywhere else in the code either.


The Issue

As mentioned in the managed wrapper section, when executing this code I occasionally get the run-time assertion error,

std::deque iterator not dereferencable.

This tends to occur when first executing but it does also occur in the middle of an ongoing aggregate generation process too, so the start-up code for generating the aggregate is likely not a culprit here.

How could I go about in resolving this issue? Hopefully it's a simple case of some logic error in my critical section code or similar, but I haven't been able to pinpoint the exact problem yet.

As pointed out in the comments, the issue could be that elements are continuously being added batch_queue whilst the C# thread calling ProcessBatchQueue is consuming the queue elements thereby possibly invalidating batch_queue's iterators. Is there a typical producer-consumer design pattern that could be applied to this use case?

Edit: It would be nice if the downvoter could give their reasons so that I can improve the question.


Solution

  • I arrived at a solution for this problem which will be detailed below. As suggested in the question, the issue was that when processing the batch_queue its' iterators would occasionally be invalidated due to continuously pushing elements to the queue in the aggregate generation process.

    This solution uses slightly more memory than the previous batch_queue based implementation, however it is safe as far as iterator validity is concerned. I replaced the batch_queue with a std::vector<std::pair<int,int>> buffer of aggregate particles in the native c++ code:

    class diffusion_limited_aggregate {
    public:
    //...
        const std::vector<std::pair<int,int>>& aggregate_buffer() const noexcept { return buffer; }
    private:
    //...
        std::vector<std::pair<int,int>> buffer;
    };
    

    Then ManagedDLA2DContainer::ProcessBatchQueue was replaced with ManagedDLA2DContainer::ConsumeBuffer which reads up to a marked index and pushes the most recent batch of aggregate particles to a c# List<KeyValuePair<int,int>>:

    System::Collections::Generic::List<System::Collections::Generic::KeyValuePair<int, int>>^ ConsumeBuffer(std::size_t marked_index) {
            System::Collections::Generic::List<System::Collections::Generic::KeyValuePair<int, int>>^ buffer =
                gcnew System::Collections::Generic::List<System::Collections::Generic::KeyValuePair<int, int>>();
            if (native_dla_2d_ptr->aggregate_buffer().empty()) return buffer;
            System::Threading::Monitor::Enter(lock_obj);    // define critical section start
            try {   // execute critical section
                // read from last marked buffer index up to size of buffer and write these data to batch list
                for (int i = marked_index; i < native_dla_2d_ptr->aggregate_buffer().size(); ++i) {
                    buffer->Add(System::Collections::Generic::KeyValuePair<int, int>(
                        native_dla_2d_ptr->aggregate_buffer()[i].first,
                        native_dla_2d_ptr->aggregate_buffer()[i].second
                        )
                    );
                }
            }
            finally { System::Threading::Monitor::Exit(lock_obj); } // exit critical section by releasing exclusive lock
            return buffer;
    }
    

    And finally the code in the c# MainWindow::Update2DAggregateOnTimedEvent method was altered to reflect these changes in the c++/cli code:

    private void Update2DAggregateOnTimedEvent(object source, ElapsedEventArgs e, uint n) {
        lock (locker) {
            List<KeyValuePair<int,int>> buffer = dla_2d.ConsumeBuffer(
                (current_particles == 0) ? 0 : current_particles-1); // fetch batch list
            foreach (var p in buffer) {
                // => add p co-ords to GUI manager...
                ++current_particles;
                // => render aggregate...
            }
        }
    }