In short, i have a data source that provides a shared_ptr type. This pointer seems to go directly out of scope in source_node's operator overload. I have added a fully simplified example demonstrating the issue.
My question: what is an elegant why to overcome this problem?
#include <exception>
#include <tbb/concurrent_queue.h>
#include <tbb/flow_graph.h>
#include <condition_variable>
#include <thread>
#include <chrono>
#include <memory>
#include <iostream>
using namespace tbb::flow;
// Plain Data Object
class Data
{
public:
size_t _data;
Data(size_t d) : _data(d){}
};
// simpel counting node
class CountingNode
{
private:
size_t _count;
public:
CountingNode():_count(0){}
size_t operator()(std::shared_ptr<Data> data)
{
if (static_cast<bool>(data)) // <-- PROBLEM always false
_count += data->_data;
std::cout << _count << std::endl;
return _count;
}
size_t count() { return _count;}
};
// Source,
// exhausted when a empty/null shared_ptr is encountered
// our datasource provides a shared_ptr to a data object
class SourceNode
{
public:
std::shared_ptr<tbb::concurrent_bounded_queue<std::shared_ptr<Data>>> _queue;
std::shared_ptr<std::mutex> _mtx;
bool _started;
SourceNode() : _started(false)
{
_queue = std::shared_ptr<tbb::concurrent_bounded_queue<std::shared_ptr<Data>>>(new tbb::concurrent_bounded_queue<std::shared_ptr<Data>>());
_mtx = std::shared_ptr<std::mutex>(new std::mutex());
}
SourceNode(const SourceNode& other)
{
_queue = other._queue;
_mtx = other._mtx;
_started = other._started;
}
void push(std::shared_ptr<Data> data)
{
_queue->push(data);
}
bool operator ()(std::shared_ptr<Data> data)
{
std::unique_lock<std::mutex> ul(*_mtx);
{
if (!_started)
{
_started = true;
for(size_t idx = 0; idx != 10; ++idx)
_queue->push(std::shared_ptr<Data>(new Data(idx)));
_queue->push(std::shared_ptr<Data>()); // no more data
}
}
_queue->pop(data);
return static_cast<bool>(data);
}
void close()
{
_queue->push(std::shared_ptr<Data>());
}
};
int main(int argc, char* argv[])
{
graph g;
source_node<std::shared_ptr<Data>> source(g,SourceNode(),false);
function_node<std::shared_ptr<Data>,int> sink(g,tbb::flow::serial, CountingNode());
make_edge(source, sink );
source.activate();
g.wait_for_all();
return 0;
}
If more details need to be provided just let me know.
Kind Regards Auke-Dirk
You should accept data
argument passed toSourceNode
by reference to return updated value to the caller:
class SourceNode
{
...
bool operator ()(std::shared_ptr<Data>& data)
{
...
_queue->pop(data);
return static_cast<bool>(data);
}
}
Otherwise, only local copy of data
parameter if updated by operator and further actions are performed on default constructed (empty) value.