Search code examples
multithreadingc++11boostconcurrencyfibers

problem with understanding example of code from Boost.Fiber library


I am currently trying (in order to learn) to understand example of code from the Boost.Fiber library : https://www.boost.org/doc/libs/1_71_0/libs/fiber/examples/work_sharing.cpp

//          Copyright Nat Goodspeed + Oliver Kowalke 2015.
// Distributed under the Boost Software License, Version 1.0.
//    (See accompanying file LICENSE_1_0.txt or copy at
//          http://www.boost.org/LICENSE_1_0.txt)

#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>

#include <boost/assert.hpp>

#include <boost/fiber/all.hpp>

#include <boost/fiber/detail/thread_barrier.hpp>

static std::size_t fiber_count{ 0 };
static std::mutex mtx_count{};
static boost::fibers::condition_variable_any cnd_count{};
typedef std::unique_lock< std::mutex > lock_type;

/*****************************************************************************
 *   example fiber function
 *****************************************************************************/
//[fiber_fn_ws
void whatevah( char me) {
    try {
        std::thread::id my_thread = std::this_thread::get_id(); /**< get ID of initial thread >*/
        {
            std::ostringstream buffer;
            buffer << "fiber " << me << " started on thread " << my_thread << '\n';
            std::cout << buffer.str() << std::flush;
        }
        for ( unsigned i = 0; i < 10; ++i) { /**< loop ten times >*/
            boost::this_fiber::yield(); /**< yield to other fibers >*/
            std::thread::id new_thread = std::this_thread::get_id(); /**< get ID of current thread >*/
            if ( new_thread != my_thread) { /**< test if fiber was migrated to another thread >*/
                my_thread = new_thread;
                std::ostringstream buffer;
                buffer << "fiber " << me << " switched to thread " << my_thread << '\n';
                std::cout << buffer.str() << std::flush;
            }
        }
    } catch ( ... ) {
    }
    lock_type lk( mtx_count);
    if ( 0 == --fiber_count) { /**< Decrement fiber counter for each completed fiber. >*/
        lk.unlock();
        cnd_count.notify_all(); /**< Notify all fibers waiting on `cnd_count`. >*/
    }
}
//]

/*****************************************************************************
 *   example thread function
 *****************************************************************************/
//[thread_fn_ws
void thread( boost::fibers::detail::thread_barrier * b) {
    std::ostringstream buffer;
    buffer << "thread started " << std::this_thread::get_id() << std::endl;
    std::cout << buffer.str() << std::flush;
    boost::fibers::use_scheduling_algorithm< boost::fibers::algo::shared_work >(); /**<
                                                                                    Install the scheduling algorithm `boost::fibers::algo::shared_work` in order to
                                                                                    join the work sharing.
                                                                                    >*/
    b->wait(); /**< sync with other threads: allow them to start processing >*/
    lock_type lk( mtx_count);
    cnd_count.wait( lk, [](){ return 0 == fiber_count; } ); /**<
                                                             Suspend main fiber and resume worker fibers in the meanwhile.
                                                             Main fiber gets resumed (e.g returns from `condition_variable_any::wait()`)
                                                             if all worker fibers are complete.
                                                             >*/
    BOOST_ASSERT( 0 == fiber_count);
}
//]

/*****************************************************************************
 *   main()
 *****************************************************************************/
int main( int argc, char *argv[]) {
    std::cout << "main thread started " << std::this_thread::get_id() << std::endl;
    //[main_ws
    boost::fibers::use_scheduling_algorithm< boost::fibers::algo::shared_work >(); /*<
                                                                                    Install the scheduling algorithm `boost::fibers::algo::shared_work` in the main thread
                                                                                    too, so each new fiber gets launched into the shared pool.
                                                                                    >*/

    for ( char c : std::string("abcdefghijklmnopqrstuvwxyz")) { /*<
                                                                 Launch a number of worker fibers; each worker fiber picks up a character
                                                                 that is passed as parameter to fiber-function `whatevah`.
                                                                 Each worker fiber gets detached.
                                                                 >*/
        boost::fibers::fiber([c](){ whatevah( c); }).detach();
        ++fiber_count; /*< Increment fiber counter for each new fiber. >*/
    }
    boost::fibers::detail::thread_barrier b( 4);
    std::thread threads[] = { /*<
                               Launch a couple of threads that join the work sharing.
                               >*/
        std::thread( thread, & b),
        std::thread( thread, & b),
        std::thread( thread, & b)
    };
    b.wait(); /*< sync with other threads: allow them to start processing >*/
    {
        lock_type/*< `lock_type` is typedef'ed as __unique_lock__< [@http://en.cppreference.com/w/cpp/thread/mutex `std::mutex`] > >*/ lk( mtx_count);
        cnd_count.wait( lk, [](){ return 0 == fiber_count; } ); /*<
                                                                 Suspend main fiber and resume worker fibers in the meanwhile.
                                                                 Main fiber gets resumed (e.g returns from `condition_variable_any::wait()`)
                                                                 if all worker fibers are complete.
                                                                 >*/
    } /*<
       Releasing lock of mtx_count is required before joining the threads, otherwise
       the other threads would be blocked inside condition_variable::wait() and
       would never return (deadlock).
       >*/
    BOOST_ASSERT( 0 == fiber_count);
    for ( std::thread & t : threads) { /*< wait for threads to terminate >*/
        t.join();
    }
    //]
    std::cout << "done." << std::endl;
    return EXIT_SUCCESS;
}

I have a problem with to understand how the fibers can continue their execution, in the different thread when all the thread are waiting for the notify_all(), because when all the thread are blocking because of the wait function : cnd_count.wait( lk, [](){ return 0 == fiber_count; } ). So if there are all blocked by the wait function, how is it possible that the fibers continue to execute, I thought that the fibers were executed by the thread with the help of their own scheduling manager. I have read that the threads are just execution unit that the fibers use in order to run their own callable or function in their. So why all the fibers continue to run when all the threads are blocking ?


Solution

  • See: https://www.boost.org/doc/libs/1_71_0/libs/fiber/doc/html/fiber/scheduling.html

    Whenever a thread is suspended the scheduler run the next ready fiber, so whatevah got executed when cnd_count.wait is called.

    You can try removing the line boost::fibers::use_scheduling_algorithm< boost::fibers::algo::shared_work >(); on thread_fn_ws, and you will find all the fiber are ran in the main thread, because scheduler is not installed and thus fiber won't be ran.