Tags: c#, c++, exception, fibers

Using managed threads and fibers in CLR


Okay, the article linked below carries a warning that the discussion uses unsupported and undocumented APIs. I'm trying to use its code sample anyway, and it mostly works. Any ideas about the specific exception-related issue described below?

http://msdn.microsoft.com/en-us/magazine/cc164086.aspx

FYI, I made one improvement over the original sample, which maintained a pointer to the "previousfiber". The updated sample below instead uses a "mainfiber" pointer that is passed to every fiber class, so each fiber always yields back to the main fiber. That lets the main fiber handle scheduling for all the other fibers.

The reason for posting this question has to do with throwing exceptions inside a fiber. According to the article, by using the CorBindToRuntime API together with CreateLogicalThreadState(), SwitchOutLogicalThreadState(), etc., the framework will create a managed thread for each fiber and handle exceptions properly.

However, the code examples included below contain an NUnit test which experiments with throwing a managed exception within a fiber and catching it within the same fiber. That sort of works. But after the exception is handled by logging a message, the stack seems to be in a bad state: if the fiber then calls any other method, even an empty one, the whole application crashes.

This implies to me that SwitchOutLogicalThreadState() and SwitchInLogicalThreadState() either aren't being used properly or aren't doing their job.

NOTE: One clue to the problem is that the managed code logs out the Thread.CurrentThread.ManagedThreadId and it is the same for every fiber. This suggests that the CreateLogicalThreadState() method didn't really create a new managed thread as advertised.

To analyze this better, I made a pseudocode listing of the order in which the low-level APIs are called to handle the fibers. Remember that the fibers all run on the same thread, so nothing happens concurrently; the logic is linear. The necessary trick, of course, is to save and restore the stack, and that is where it seems to be having trouble.

It starts out as simply a thread so then it converts to a fiber:

  1. ConvertThreadToFiber(objptr);
  2. CreateFiber() // create several win32 fibers.

Now invoke a fiber for the first time; its startup method does this:

  1. corhost->SwitchOutLogicalThreadState(&cookie); // the main cookie is held on the stack
  2. SwitchToFiber(); // first time calls the fiber startup method
  3. corhost->CreateLogicalThreadState();
  4. run the fiber's main() method, which calls the abstract Run().

Eventually the fiber needs to yield back to the main fiber:

  1. corhost->SwitchOutLogicalThreadState(&cookie);
  2. SwitchToFiber(mainfiber);
  3. corhost->SwitchInLogicalThreadState(&cookie); // the main fiber cookie, right?

Also the main fiber will resume a preexisting fiber:

  1. corhost->SwitchOutLogicalThreadState(&cookie);
  2. SwitchToFiber(fiber);
  3. corhost->SwitchInLogicalThreadState(&cookie); // the main fiber cookie, right?
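
On the "main fiber cookie, right?" question in the two listings above: in CorSwitchToFiber() below, cookie is a local variable, so it lives on the stack of whichever fiber made the call. When that fiber is resumed later, SwitchToFiber() returns on that same stack, and the cookie passed to SwitchInLogicalThreadState() is therefore that fiber's own, not necessarily the main fiber's. The following is a minimal, portable sketch of that bookkeeping only. MockHost, kNoState and run_trace are hypothetical names invented for illustration; the mock merely assumes the contract that a cookie handed out by switch-out is later handed back to switch-in, and it does not reproduce what the CLR does internally.

```cpp
#include <cassert>
#include <vector>

// Hypothetical marker meaning "no logical thread state is attached".
const int kNoState = 0;

// Hypothetical stand-in for the runtime host. It only tracks which logical
// thread state is attached to the OS thread. It is NOT the CLR's behavior.
class MockHost {
public:
    // Models CreateLogicalThreadState(): a fresh state becomes current.
    void create() {
        assert(current_ == kNoState);
        current_ = next_id_++;
    }

    // Models SwitchOutLogicalThreadState(&cookie): detach the current state
    // and hand it to the caller as a cookie.
    void switch_out(int* cookie) {
        *cookie = current_;
        current_ = kNoState;
    }

    // Models SwitchInLogicalThreadState(cookie): reattach a saved state.
    void switch_in(int cookie) {
        assert(current_ == kNoState);
        current_ = cookie;
    }

    int current() const { return current_; }

private:
    int current_ = 1;  // state 1 is assumed to belong to the main fiber
    int next_id_ = 2;  // the first created worker state gets id 2
};

// Linearized trace of the listings: the comments mark where SwitchToFiber()
// would transfer control. Returns the state that is current after each hop.
std::vector<int> run_trace() {
    MockHost host;
    std::vector<int> seen;
    int main_cookie = kNoState;    // would live on the main fiber's stack
    int worker_cookie = kNoState;  // would live on the worker fiber's stack

    // Main fiber resumes the worker for the first time.
    host.switch_out(&main_cookie);
    // SwitchToFiber(worker): the worker starts in its fiber procedure.
    host.create();
    seen.push_back(host.current());

    // Worker yields back to the main fiber.
    host.switch_out(&worker_cookie);
    // SwitchToFiber(mainfiber): main continues after its own SwitchToFiber.
    host.switch_in(main_cookie);
    seen.push_back(host.current());

    // Main fiber resumes the existing worker.
    host.switch_out(&main_cookie);
    // SwitchToFiber(worker): the worker continues after its own SwitchToFiber.
    host.switch_in(worker_cookie);
    seen.push_back(host.current());

    return seen;
}
```

Calling run_trace() yields the worker's state, then the main state, then the worker's state again. In this model each switch-in restores the state of the fiber that is resuming, which is what the per-stack cookie local gives you.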

The following is fibers.cpp which wraps the fiber api for managed code.

#define _WIN32_WINNT 0x400

#using <mscorlib.dll>
#include <windows.h>
#include <mscoree.h>
#include <iostream>
using namespace std;

#if defined(Yield)
#undef Yield
#endif

#define CORHOST

namespace Fibers {

typedef System::Runtime::InteropServices::GCHandle GCHandle;

VOID CALLBACK unmanaged_fiberproc(PVOID pvoid);

__gc private struct StopFiber {};

enum FiberStateEnum {
    FiberCreated, FiberRunning, FiberStopPending, FiberStopped
};

#pragma unmanaged

#if defined(CORHOST)
ICorRuntimeHost *corhost;

void initialize_corhost() {
    CorBindToCurrentRuntime(0, CLSID_CorRuntimeHost,
        IID_ICorRuntimeHost, (void**) &corhost);
}

#endif

void CorSwitchToFiber(void *fiber) {
#if defined(CORHOST)
    DWORD *cookie;
    corhost->SwitchOutLogicalThreadState(&cookie);
#endif
    SwitchToFiber(fiber);
#if defined(CORHOST)
    corhost->SwitchInLogicalThreadState(cookie);
#endif
}

#pragma managed

__gc __abstract public class Fiber : public System::IDisposable {
public:
#if defined(CORHOST)
    static Fiber() { initialize_corhost(); }
#endif
    Fiber() : state(FiberCreated) {
        void *objptr = (void*) GCHandle::op_Explicit(GCHandle::Alloc(this));
        fiber = ConvertThreadToFiber(objptr);
        mainfiber = fiber;
        //System::Console::WriteLine( S"Created main fiber.");
    }

    Fiber(Fiber *_mainfiber) : state(FiberCreated) {
        void *objptr = (void*) GCHandle::op_Explicit(GCHandle::Alloc(this));
        fiber = CreateFiber(0, unmanaged_fiberproc, objptr);
        mainfiber = _mainfiber->fiber;
        //System::Console::WriteLine(S"Created worker fiber");
    }

    __property bool get_IsRunning() {
        return state != FiberStopped;
    }

    int GetHashCode() {
        return (int) fiber;
    }


    bool Resume() {
        if(!fiber || state == FiberStopped) {
            return false;
        }
        if( state == FiberStopPending) {
            Dispose();
            return false;
        }
        void *current = GetCurrentFiber();
        if(fiber == current) {
            return false;
        }
        CorSwitchToFiber(fiber);
        return true;
    }

    void Dispose() {
        if(fiber) {
            void *current = GetCurrentFiber();
            if(fiber == current) {
                state = FiberStopPending;
                CorSwitchToFiber(mainfiber);
            }
            state = FiberStopped;
            System::Console::WriteLine( S"\nDeleting Fiber.");
            DeleteFiber(fiber);
            fiber = 0;
        }
    }
protected:
    virtual void Run() = 0;


    void Yield() {
        CorSwitchToFiber(mainfiber);
        if(state == FiberStopPending)
            throw new StopFiber;
    }
private:
    void *fiber, *mainfiber;
    FiberStateEnum state;

private public:
    void main() {
        state = FiberRunning;
        try {
            Run();
        } catch(System::Object *x) {
            System::Console::Error->WriteLine(
                S"\nFIBERS.DLL: main Caught {0}", x);
        }
        Dispose();
    }
};

void fibermain(void* objptr) {
    //System::Console::WriteLine(   S"\nfibermain()");
    System::IntPtr ptr = (System::IntPtr) objptr;
    GCHandle g = GCHandle::op_Explicit(ptr);
    Fiber *fiber = static_cast<Fiber*>(g.Target);
    g.Free();
    fiber->main();
    System::Console::WriteLine( S"\nfibermain returning");
}

#pragma unmanaged

VOID CALLBACK unmanaged_fiberproc(PVOID objptr) {
#if defined(CORHOST)
    corhost->CreateLogicalThreadState();
#endif
    fibermain(objptr);
#if defined(CORHOST)
    corhost->DeleteLogicalThreadState();
#endif
}

}

The above fibers.cpp file is the only source file in the Visual C++ project. It is built as a DLL with CLR support using the /clr:oldSyntax switch.

using System;
using System.Threading;
using Fibers;
using NUnit.Framework;

namespace TickZoom.Utilities
{
    public class FiberTask : Fiber 
    {
        public FiberTask()
        {

        }
        public FiberTask(FiberTask mainTask)
            : base(mainTask)
        {

        }

        protected override void Run()
        {
            while (true)
            {
                Console.WriteLine("Top of worker loop.");
                try
                {
                    Work();
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Exception: " + ex.Message);
                }
                Console.WriteLine("After the exception.");
                Work();
            }
        }

        private void Work()
        {
            Console.WriteLine("Doing work on fiber: " + GetHashCode() + ", thread id: " + Thread.CurrentThread.ManagedThreadId);
            ++counter;
            Console.WriteLine("Incremented counter " + counter);
            if (counter == 2)
            {
                Console.WriteLine("Throwing an exception.");
                throw new InvalidCastException("Just a test exception.");
            }
            Yield();
        }

        public static int counter;
    }

    [TestFixture]
    public class TestingFibers
    {
        [Test]
        public void TestIdeas()
        {
            var fiberTasks = new System.Collections.Generic.List<FiberTask>();
            var mainFiber = new FiberTask();
            for( var i=0; i< 5; i++)
            {
                fiberTasks.Add(new FiberTask(mainFiber));
            }
            for (var i = 0; i < fiberTasks.Count; i++)
            {
                Console.WriteLine("Resuming " + i);
                var fiberTask = fiberTasks[i];
                if( !fiberTask.Resume())
                {
                    Console.WriteLine("Fiber " + i + " was disposed.");
                    fiberTasks.RemoveAt(i);
                    i--;
                }
            }
            for (var i = 0; i < fiberTasks.Count; i++)
            {
                Console.WriteLine("Disposing " + i);
                fiberTasks[i].Dispose();
            }
        }
    }
}

The above unit test gives the following output and then crashes badly:

Resuming 0
Top of worker loop.
Doing work on fiber: 476184704, thread id: 7
Incremented counter 1
Resuming 1
Top of worker loop.
Doing work on fiber: 453842656, thread id: 7
Incremented counter 2
Throwing an exception.
Exception: Just a test exception.
After the exception.

Solution

  • Some time ago I ran into the same problem: I tried to use the code snippet on .NET 3.5 (and later on 4.0) and it crashed. That convinced me to turn away from the "hacky" solution. The truth is that .NET is missing a generic co-routine concept. Some people simulate co-routines with enumerators and the yield keyword (see http://fxcritic.blogspot.com/2008/05/lightweight-fibercoroutines.html). However, this has clear disadvantages to me: it is not as intuitive to use as good old Win32 fibers, and it requires you to use IEnumerable as the return type of each co-routine.

    Maybe this article is interesting for you: http://msdn.microsoft.com/en-us/vstudio/gg316360. Microsoft is about to introduce a new async keyword, and a community technology preview (CTP) is offered for download. I guess it should be possible to develop a clean co-routine implementation on top of those async extensions.
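
    To make the enumerator idea concrete, here is a rough sketch in portable C++ of the kind of hand-written state machine that a C# iterator method is compiled into, driven by a round-robin loop that plays the role of the main fiber in the question. CounterTask, move_next and run_all are hypothetical names chosen for illustration and do not come from the article or the linked blog post. Because every step runs on the ordinary thread stack, an exception thrown and caught inside a step unwinds normally, and the task can keep running afterwards.

```cpp
#include <cstddef>
#include <exception>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical cooperative task. Each call to move_next() runs one slice of
// work and then returns to the scheduler, similar to MoveNext() on a C#
// iterator. All state that must survive a "yield" is kept in member fields.
class CounterTask {
public:
    // throw_at is the step on which a test exception is raised (0 = never).
    CounterTask(int id, int steps, int throw_at)
        : id_(id), steps_(steps), throw_at_(throw_at) {}

    // Runs one step. Returns false once the task has no work left.
    bool move_next(std::vector<std::string>& log) {
        if (done_ == steps_) {
            return false;
        }
        ++done_;
        const std::string name = "task " + std::to_string(id_);
        try {
            if (done_ == throw_at_) {
                throw std::runtime_error("test exception");
            }
            log.push_back(name + " step " + std::to_string(done_));
        } catch (const std::exception& ex) {
            // Caught inside the same task, on the normal thread stack.
            log.push_back(name + " caught: " + ex.what());
        }
        return true;  // "yield" back to the scheduler
    }

private:
    int id_;
    int steps_;
    int throw_at_;
    int done_ = 0;
};

// Round-robin scheduler: resumes every task in turn and drops finished ones,
// much like the Resume() loop in the unit test above.
std::vector<std::string> run_all(std::vector<CounterTask> tasks) {
    std::vector<std::string> log;
    while (!tasks.empty()) {
        for (std::size_t i = 0; i < tasks.size();) {
            if (tasks[i].move_next(log)) {
                ++i;
            } else {
                tasks.erase(tasks.begin() + static_cast<std::ptrdiff_t>(i));
            }
        }
    }
    return log;
}
```

    For example, run_all({CounterTask(1, 2, 2), CounterTask(2, 1, 0)}) interleaves the two tasks and records the exception caught by task 1 on its second step. The trade-off is the one described above: the control flow has to be cut into explicit steps instead of yielding from an arbitrary call depth as a real fiber can.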