Search code examples
c++ multithreading tbb tbb-flow-graph

how to abort execution of a node and its children in tbb flowgraph


I'm currently testing the flow graph feature of TBB. To use it, I must be able to abort execution of a particular node in the graph together with all children that depend on it, while leaving the children that do not depend on it executing. Throwing an exception from the body or calling task::cancel_group_execution() aborts the execution of all nodes.

#include <chrono>
#include <cstdio>
#include <string>
#include <thread>

#include "tbb/flow_graph.h"

using namespace tbb::flow;

/// Node body for the demonstration graph: every node prints its name after a
/// one-second pause, except the node named "B", which cancels the task group
/// it is running in.
struct body
{
    std::string my_name;  ///< Label printed when the node runs.

    /// @param name Label for this node; copied into my_name.
    body( const char *name ) : my_name(name)
    {
    }

    /// Called when the node fires; the incoming message carries no data.
    void operator()( continue_msg ) const
    {
        if (my_name == "B")
        {
            // Cancels the group of the currently running task. According to the
            // question text, this aborts every node, not only B's dependents.
            tbb::task::self().group()->cancel_group_execution();
            return;
        }

        // Simulate one second of work. std::this_thread::sleep_for is standard
        // C++, whereas POSIX sleep() needs <unistd.h> and does not exist on
        // every platform.
        std::this_thread::sleep_for(std::chrono::seconds(1));
        printf("%s\n", my_name.c_str());
    }
};

// Builds the demonstration graph and triggers it three times.
//
//   start -+-> A -+-> E
//          |      |
//          |      +-> C -> D
//          |      |
//          +-> B -+
int main()
{
    graph g;

    // Entry point plus one continue_node per labelled task.
    broadcast_node< continue_msg > start(g);
    continue_node<continue_msg> a( g, body("A"));
    continue_node<continue_msg> b( g, body("B"));
    continue_node<continue_msg> c( g, body("C"));
    continue_node<continue_msg> d( g, body("D"));
    continue_node<continue_msg> e( g, body("E"));

    // Wire the dependencies shown in the diagram above.
    make_edge( start, a );
    make_edge( start, b );
    make_edge( a, c );
    make_edge( b, c );
    make_edge( c, d );
    make_edge( a, e );

    for (int run = 0; run < 3; ++run)
    {
        try
        {
            // Fire the graph once and block until it has drained.
            start.try_put( continue_msg() );
            g.wait_for_all();
        }
        catch (...)
        {
            printf("Caught exception\n");
        }
    }
    return 0;
}

Solution

  • You can represent the abort status with a bool instead of continue_msg. Each process_node receives the status of its predecessor, runs its task only when that status is true (available), and sends the updated abort status on to its successor nodes.

    /// Node body that propagates an "available" flag instead of cancelling.
    ///
    /// A node whose input flag is false skips its work and forwards false.
    /// The node named "B" simulates a failure by forwarding false. Every other
    /// node pauses for one second, prints its name and forwards true.
    struct body
    {
        std::string my_name;  ///< Label printed when the node runs.

        /// @param name Label for this node; copied into my_name.
        body( const char *name ) : my_name(name)
        {
        }

        /// @param avail false when a predecessor failed or was skipped.
        /// @return The flag to hand to successor nodes: false if this node was
        ///         skipped or failed, true if it ran.
        bool operator()( bool avail ) const
        {
            if (!avail)
            {
                // A predecessor did not complete, so do no work here.
                printf("%s skipped\n", my_name.c_str());
                return false;
            }

            if (my_name == "B")
            {
                // Simulated failure: report it and mark dependents unavailable.
                printf("%s fail\n", my_name.c_str());
                return false;
            }

            // Simulate one second of work. std::this_thread::sleep_for is
            // standard C++, whereas POSIX sleep() needs <unistd.h> and does
            // not exist on every platform.
            std::this_thread::sleep_for(std::chrono::seconds(1));
            printf("%s\n", my_name.c_str());
            return true;
        }
    };
    
    // Builds the status-propagating graph and triggers it three times with an
    // initial status of true.
    int main()
    {
        using process_node = function_node<bool, bool>;
        using bool_pair = std::tuple<bool, bool>;

        graph g;

        broadcast_node< bool > start(g);
        process_node a( g, unlimited, body("A"));
        process_node b( g, unlimited, body("B"));
        process_node c( g, unlimited, body("C"));

        // C has two predecessors: pair up their flags, then AND them so that
        // C is told "available" only when both A and B reported true.
        join_node<bool_pair> join_c(g);
        auto both_available = [](const bool_pair& flags) -> bool {
            const bool first = std::get<0>(flags);
            const bool second = std::get<1>(flags);
            return first && second;
        };
        function_node<bool_pair, bool> and_c(g, unlimited, both_available);

        process_node d( g, unlimited, body("D"));
        process_node e( g, unlimited, body("E"));

        /*
         * start -+-> A -+-> E
         *        |       \
         *        |        \
         *        |         join_c -> and_c -> C -> D
         *        |        /
         *        |       /
         *        +-> B --
         */
        make_edge( start, a );
        make_edge( start, b );
        make_edge( a, input_port<0>(join_c) );
        make_edge( b, input_port<1>(join_c) );
        make_edge( join_c, and_c );
        make_edge( and_c, c );
        make_edge( c, d );
        make_edge( a, e );

        for (int run = 0; run < 3; ++run)
        {
            try
            {
                // Fire the graph once and block until it has drained.
                start.try_put( true );
                g.wait_for_all();
            }
            catch (...)
            {
                printf("Caught exception\n");
            }
        }
        return 0;
    }