
Why does my thread ID change after reading asynchronously from a stream?


I have these two methods:

private static bool loaded = false;
private static bool replaying = false;
private static string wIndex = String.Empty;
private static WorldData wData;
private static ConcurrentDictionary<int, List<long>> streamPosition
    = new ConcurrentDictionary<int, List<long>>();
private static ConcurrentDictionary<int, List<string>> collectionNames
    = new ConcurrentDictionary<int, List<string>>();
private static async void StartReplay()
{
    try
    {
        Stopwatch st = new Stopwatch();
        while (loaded)
        {
            while (replaying)
            {
                st.Start();
                for (int i = 0; i < collectionNames.Count; i++)
                {
                    XLogger.Log(toConsole.Debug, Thread.CurrentThread.ManagedThreadId
                        .ToString());
                    wData.CopyCollection(await DeserializeListFromStreamAsync(
                        wData.GetCollectionByName(collectionNames[Thread.CurrentThread
                        .ManagedThreadId][i]), i, new CancellationToken()));
                }
                st.Stop();
                int sleepTime = DebriefingManager.replayRate
                    - (int)st.ElapsedMilliseconds;
                if (sleepTime > 0)
                {
                    Thread.Sleep(sleepTime);
                }
                else
                {
                    XLogger.Log(toConsole.Bad, "Debriefing is slow, video may lag.");
                    XLogger.Log(toFile.System, "Debriefing is slow, video may lag.");
                }
                st.Reset();
            }
        }
    }
    catch (Exception e)
    {
        XLogger.Log(toConsole.Bad, e.ToString());
        XLogger.Log(toFile.Error, e.ToString());
    }
}

private static async Task<ConcurrentDictionary<string, T>>
    DeserializeListFromStreamAsync<T>(
    ConcurrentDictionary<string, T> coll, int i, CancellationToken cancellationToken)
{
    var dataStructures = new ConcurrentDictionary<string, T>();
    using (FileStream stream = File.OpenRead(DebriefingManager
        .GetReadingStreamByCollection(coll)))
    {
        stream.Position = streamPosition[Thread.CurrentThread.ManagedThreadId][i];
        using (var streamReader = new MessagePackStreamReader(stream))
        {
            XLogger.Log(toConsole.Debug,
                $"{Thread.CurrentThread.ManagedThreadId} --- test 1");
            ReadOnlySequence<byte>? msgpack = await streamReader
                .ReadAsync(cancellationToken);
            XLogger.Log(toConsole.Debug,
                $"{Thread.CurrentThread.ManagedThreadId} --- test 2");
            if (msgpack is null) return null;
            dataStructures = MessagePackSerializer
                .Deserialize<ConcurrentDictionary<string, T>>(
                (ReadOnlySequence<byte>)msgpack, cancellationToken: cancellationToken);
        }
        streamPosition[Thread.CurrentThread.ManagedThreadId][i] = stream.Position;
    }

    return dataStructures;
}

StartReplay is run by three different threads. I need to have a unique id for each thread as I need the List<long> and List<string> to be unique for each one. So I thought about using ConcurrentDictionaries and the Thread.CurrentThread.ManagedThreadId as a key.

The first thing I tried was to use Thread.CurrentThread.ManagedThreadId, but I discovered that after this line: ReadOnlySequence<byte>? msgpack = await streamReader.ReadAsync(cancellationToken); the ID changed. Not knowing that it should be immutable, I thought nothing of it and tried to use the [ThreadStatic] attribute, but after that same line the value of the tagged variable was reset to 0. Using the Threads debug window, I found out that the threads that ran my code were "killed" after that line and new ones were used to continue the code.

My question then is: why does this happen? And how do I prevent it? Might this be impacting performance?

EDIT: I should also add that the method is a modified version of the one in the MessagePack documentation, in the "Multiple MessagePack structures on a single Stream" section.


Solution

    Why does this happen?

    Because this is the nature of the beast (asynchrony). The completion of an asynchronous operation usually happens on a different thread than the one that initiated the operation. This is especially true for Console applications, which are not equipped with any specialized mechanism that restores the original thread after the await. It would be different if you had, for example, a Windows Forms app. These applications start by installing a specialized scheduler on the UI thread, called WindowsFormsSynchronizationContext, which intervenes after the await and schedules the continuation back on the UI thread. You don't have such a thing in a Console application, so you are experiencing the effects of asynchrony in its purest form.
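
    The behavior described above can be observed with a small console sketch. The class and method names here are hypothetical, and Task.Delay merely stands in for an asynchronous read. In a console application the two IDs will typically differ, although nothing guarantees that on any given run:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AwaitThreadDemo
{
    // Captures the managed thread ID before and after an await.
    public static async Task<(int Before, int After)> CaptureThreadIdsAsync()
    {
        int before = Environment.CurrentManagedThreadId;

        // Stand-in for an asynchronous I/O operation such as ReadAsync.
        await Task.Delay(100);

        // With no SynchronizationContext installed, the continuation
        // runs on whichever thread-pool thread completes the operation.
        int after = Environment.CurrentManagedThreadId;
        return (before, after);
    }

    public static async Task Main()
    {
        string context = SynchronizationContext.Current?.GetType().Name ?? "none";
        Console.WriteLine($"SynchronizationContext: {context}");

        var ids = await CaptureThreadIdsAsync();
        Console.WriteLine($"Before await: {ids.Before}, after await: {ids.After}");
    }
}
```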

    How do I prevent it?

    By not having asynchronous execution flows. Just wait synchronously for all the asynchronous operations, and you'll be on the same thread from start to finish:

    ReadOnlySequence<byte>? msgpack = streamReader
        .ReadAsync(cancellationToken).GetAwaiter().GetResult();
    

    If you find it tiresome to write .GetAwaiter().GetResult() everywhere, you can shorten it to .Wait2() with these two extension methods:

    public static void Wait2(this Task task) => task.GetAwaiter().GetResult();
    public static TResult Wait2<TResult>(this Task<TResult> task) => task.GetAwaiter().GetResult();
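
    As a rough illustration of the blocking approach, the sketch below places the two extension methods in a static class (which extension methods require) and checks that a dedicated thread keeps its ID across a blocking wait. ReadValueAsync is a hypothetical stand-in for the real asynchronous read, and the class names are assumptions:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TaskBlockingExtensions
{
    public static void Wait2(this Task task) => task.GetAwaiter().GetResult();
    public static TResult Wait2<TResult>(this Task<TResult> task) => task.GetAwaiter().GetResult();
}

public static class BlockingDemo
{
    // Hypothetical stand-in for an asynchronous read.
    private static async Task<int> ReadValueAsync()
    {
        await Task.Delay(50);
        return 42;
    }

    // Runs the blocking wait on a dedicated thread and reports
    // whether the thread ID was the same before and after it.
    public static bool RunsOnSameThread()
    {
        bool same = false;
        var worker = new Thread(() =>
        {
            int before = Environment.CurrentManagedThreadId;
            int value = ReadValueAsync().Wait2(); // blocks this thread until the task completes
            int after = Environment.CurrentManagedThreadId;
            same = value == 42 && before == after;
        });
        worker.Start();
        worker.Join();
        return same;
    }

    public static void Main() => Console.WriteLine(RunsOnSameThread());
}
```

    Because the caller never yields, the code after the wait necessarily runs on the thread that started it. The cost is that this thread sits idle for the duration of the operation.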
    

    Might this be impacting performance?

    It might impact the memory efficiency. Your threads will be blocked during the asynchronous operations, so your program might need more threads than usual. This could have some temporary effects on performance if the ThreadPool becomes saturated and needs to spawn more threads. The thread-injection heuristics are a bit conservative, injecting at most one new thread per second. If this is a problem, you can configure the ThreadPool in advance with the ThreadPool.SetMinThreads method.
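
    A minimal sketch of that configuration follows. The helper name and the figure of 16 threads are assumptions for illustration; a suitable value depends on how many threads the program expects to have blocked at once:

```csharp
using System;
using System.Threading;

public static class ThreadPoolConfig
{
    // Raises the minimum number of worker threads to at least
    // expectedBlockedThreads, leaving the I/O minimum unchanged.
    // Returns false if the ThreadPool rejected the new values.
    public static bool RaiseMinWorkerThreads(int expectedBlockedThreads)
    {
        ThreadPool.GetMinThreads(out int minWorker, out int minIo);
        int target = Math.Max(minWorker, expectedBlockedThreads);
        return ThreadPool.SetMinThreads(target, minIo);
    }

    public static void Main()
    {
        // 16 is a hypothetical figure, not a recommendation.
        bool applied = RaiseMinWorkerThreads(16);
        ThreadPool.GetMinThreads(out int worker, out int io);
        Console.WriteLine($"Applied: {applied}, min worker threads: {worker}, min I/O threads: {io}");
    }
}
```

    This would normally be called once at startup, before the blocking work begins.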

    Note: Blocking the current thread with .GetAwaiter().GetResult() is not a good way to write code in general. It violates the common wisdom of not blocking on async code. Here I am just directly answering your question about how to prevent the thread from changing. I am not advising you to actually do it. If you asked for my advice, I would say to rethink everything that you have done so far, and maybe restart your project from scratch.