Thread Locking doesn't seem to work consistently


Update (Explanation of output)

I have multiple tasks running that all try to do the same job; however, I only ever want one task to be doing, or waiting to do, that job.

In one scenario, if the job is in-progress, other tasks that attempt to do that same job will see that it's already in-progress and skip attempting to do it and move on immediately because they have no need to know if or when it actually finishes.

In another scenario, if the job is in-progress, other tasks should wait for the job to finish before continuing but then not attempt to do the job themselves because it was just completed by the task they were waiting on. Once the job is complete, all the tasks that were waiting may move on but none of them should attempt to do that job that was just done.

Original Question

My goal is to create an async Task extension method that will generate results similar to this: (not exactly because SKIP and WAIT-TO-SKIP won't be in exactly the same order every time)

When run with one thread:

[Task 1] LOCKED
[Task 1] WORK-START
[Task 1] WORK-END
[Task 1] UNLOCKED

When run with five threads:

[Task 1] LOCKED
[Task 1] WORK-START
[Task 2] WAIT-TO-SKIP
[Task 3] WAIT-TO-SKIP
[Task 4] WAIT-TO-SKIP
[Task 5] WAIT-TO-SKIP
[Task 1] WORK-END
[Task 1] UNLOCKED
[Task 2] SKIP
[Task 3] SKIP
[Task 4] SKIP
[Task 5] SKIP

It should also have a parameter that allows it to behave like this:

[Task 1] LOCKED
[Task 1] WORK-START
[Task 2] SKIP
[Task 3] SKIP
[Task 4] SKIP
[Task 5] SKIP
[Task 1] WORK-END
[Task 1] UNLOCKED

Here's the method I came up with:

public static async Task FirstThreadAsync(IFirstThread obj, Func<Task> action, TaskCompletionSource? waitTaskSource, string threadName = "")
{
    if (obj.Locked)
    {
        if (waitTaskSource != null && !waitTaskSource.Task.IsCompleted)
        {
            Log.Debug(Logger, $"[{threadName}] WAIT-TO-SKIP");
            await waitTaskSource.Task;
        }
        Log.Debug(Logger, $"[{threadName}] SKIP-1");
        return;
    }
    var lockWasTaken = false;
    var temp = obj;
    try
    {
        if (waitTaskSource == null || waitTaskSource.Task.IsCompleted == false)
        {
            Monitor.TryEnter(temp, ref lockWasTaken);
            if (lockWasTaken) obj.Locked = true;
        }
    }
    finally
    {
        if (lockWasTaken) Monitor.Exit(temp);
    }
    if (waitTaskSource?.Task.IsCompleted == true)
    {
        Log.Debug(Logger, $"[{threadName}] SKIP-3");
        return;
    }
    if (waitTaskSource != null && !lockWasTaken)
    {
        if (!waitTaskSource.Task.IsCompleted)
        {
            Log.Debug(Logger, $"[{threadName}] WAIT-TO-SKIP (LOCKED)");
            await waitTaskSource.Task;
        }
        Log.Debug(Logger, $"[{threadName}] SKIP-2");
        return;
    }
    Log.Debug(Logger, $"[{threadName}] LOCKED");
    try
    {
        Log.Debug(Logger, $"[{threadName}] WORK-START");
        await action.Invoke().ConfigureAwait(false);
        Log.Debug(Logger, $"[{threadName}] WORK-END");
    }
    catch (Exception ex)
    {
        waitTaskSource?.TrySetException(ex);
        throw;
    }
    finally
    {
        obj.Locked = false;
        Log.Debug(Logger, $"[{threadName}] UNLOCKED");
        waitTaskSource?.TrySetResult();
    }
}

Here's the interface and the Example class:

public interface IFirstThread
{
    bool Locked { get; set; }
}

public class Example : IFirstThread
{
    public bool Locked { get; set; }

    public async Task DoWorkAsync() // no parameter, so it matches the Func<Task> passed to FirstThreadAsync
    {
        for (var i = 0; i < 10; i++)
        {
            await Task.Delay(5);
        }
    }
}

Here's a unit test where the Task1 - Task5 are all trying to run at the same time and Task End runs after they all completed:

[TestMethod]
public async Task DoWorkOnce_AsyncX()
{
    var waitTaskSource = new TaskCompletionSource();
    var example = new Methods.Example();
    var tasks = new List<Task>
    {
        FirstThreadAsync(example, example.DoWorkAsync, waitTaskSource, "Task 1"),
        FirstThreadAsync(example, example.DoWorkAsync, waitTaskSource, "Task 2"),
        FirstThreadAsync(example, example.DoWorkAsync, waitTaskSource, "Task 3"),
        FirstThreadAsync(example, example.DoWorkAsync, waitTaskSource, "Task 4"),
        FirstThreadAsync(example, example.DoWorkAsync, waitTaskSource, "Task 5"),
    };

    await Task.WhenAll(tasks);
    await FirstThreadAsync(example, example.DoWorkAsync, waitTaskSource, "Task End");

    //code to get and compare the output
}

The problem I'm having is that 99% of the time it works as expected, but sometimes it allows two threads to run simultaneously, and I can't figure out why or how to stop it. Here's an example of the unit test output from one of the rare cases when it allows two threads to run simultaneously:

[Task 5] LOCKED,
[Task 4] WAIT-TO-SKIP,
[Task 2] LOCKED,
[Task 3] WAIT-TO-SKIP,
[Task 1] WAIT-TO-SKIP,
[Task 5] WORK-START,
[Task 2] WORK-START,
[Task 2] WORK-END,
[Task 5] WORK-END,
[Task 5] UNLOCKED,
[Task 2] UNLOCKED,
[Task 1] SKIP-1,
[Task 4] SKIP-1,
[Task 3] SKIP-1,
[Task End] LOCKED,
[Task End] WORK-START,
[Task End] WORK-END,
[Task End] UNLOCKED

As you can see, Task 5 and Task 2 both get locked when only one of them should. Task End is only part of the test to verify that the class is left in an unlocked state when the simultaneous calls are done. Also, the threadName parameter is not necessary; it is only included so I can tell which task is which for testing purposes.


Solution

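  • You call Monitor.Exit much too early: the lock is released before the work even starts, so it protects only the assignment to Locked. If one thread executes

    Monitor.TryEnter(temp, ref lockWasTaken);

    after another thread has already executed

    if (lockWasTaken) Monitor.Exit(temp);

    then both threads can end up with lockWasTaken set to true. This happens when the second thread passed the initial if (obj.Locked) check before the first thread set Locked to true.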
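To illustrate what an atomic check-and-set looks like, here is a minimal sketch that uses Interlocked.CompareExchange. This is not the approach recommended below (that one uses SemaphoreSlim); it only shows that when the test and the update are a single atomic step, exactly one contender can win. The names AtomicGate, TryTake, Release and AtomicGateDemo are hypothetical and do not appear in the question.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only.
public sealed class AtomicGate
{
    private int _taken; // 0 = free, 1 = taken

    // Atomically flips 0 -> 1. Only one caller can observe the old value 0,
    // so there is no window between "check" and "set".
    public bool TryTake() => Interlocked.CompareExchange(ref _taken, 1, 0) == 0;

    // Makes the gate available again.
    public void Release() => Volatile.Write(ref _taken, 0);
}

public static class AtomicGateDemo
{
    // Lets several contenders race for the gate and counts how many got it.
    public static int CountWinners(int contenders)
    {
        var gate = new AtomicGate();
        var winners = 0;
        Parallel.For(0, contenders, _ =>
        {
            if (gate.TryTake()) Interlocked.Increment(ref winners);
        });
        return winners;
    }
}
```

Because the gate is never released in CountWinners, the result is 1 regardless of how many contenders there are or how they are scheduled.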

    You don't need the Locked property on the object you use as the protected resource. The Monitor class sets an internal marker on the object directly, in an atomic manner. You can simplify your logic a lot by relying only on the Monitor functionality, without separate flags of your own.


    Also, as noted by another user, we don't see you creating explicit separate threads for the tasks. The tasks can run on the same thread, and because a Monitor lock is reentrant for the thread that owns it, Monitor.TryEnter will then let multiple tasks enter at the same time.
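The reentrancy mentioned above can be seen in a small sketch. The class and method names (MonitorReentrancyDemo, Run) are made up for this illustration. The same thread enters the lock twice, while a second thread is refused.

```csharp
using System.Threading;

public static class MonitorReentrancyDemo
{
    public static (bool First, bool SameThreadAgain, bool OtherThread) Run()
    {
        var gate = new object();
        bool first = false, sameThreadAgain = false, otherThread = false;

        // First acquisition on the current thread.
        Monitor.TryEnter(gate, ref first);
        // Second acquisition on the SAME thread: succeeds, Monitor is reentrant.
        Monitor.TryEnter(gate, ref sameThreadAgain);

        // A different thread cannot enter while the lock is held.
        var worker = new Thread(() =>
        {
            Monitor.TryEnter(gate, ref otherThread);
            if (otherThread) Monitor.Exit(gate);
        });
        worker.Start();
        worker.Join();

        // Release once per successful acquisition.
        if (sameThreadAgain) Monitor.Exit(gate);
        if (first) Monitor.Exit(gate);

        return (first, sameThreadAgain, otherThread);
    }
}
```

Run() returns (true, true, false). Two tasks whose synchronous parts happen to execute on the same thread are therefore not kept apart by Monitor.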

    See https://softwareengineering.stackexchange.com/questions/340414/when-is-it-safe-to-use-monitor-lock-with-task on why you should use SemaphoreSlim instead of Monitor in async programming.


    Here is some sample code solving the task with SemaphoreSlim. I don't see the deeper purpose though ;)

    private async void button1_Click(object sender, EventArgs e)
    {
        TaskCompletionSource? waitTaskSource = null; // or  new TaskCompletionSource();
        var example = new Example();
        var semaphore = new SemaphoreSlim(1);
        var tasks = new List<Task>
        {
            FirstThreadAsync(semaphore, example.DoWorkAsync, waitTaskSource, "Task 1"),
            FirstThreadAsync(semaphore, example.DoWorkAsync, waitTaskSource, "Task 2"),
            FirstThreadAsync(semaphore, example.DoWorkAsync, waitTaskSource, "Task 3"),
            FirstThreadAsync(semaphore, example.DoWorkAsync, waitTaskSource, "Task 4"),
            FirstThreadAsync(semaphore, example.DoWorkAsync, waitTaskSource, "Task 5"),
        };

        await Task.WhenAll(tasks);
        await FirstThreadAsync(semaphore, example.DoWorkAsync, waitTaskSource, "Task End");
    }
    
    public static async Task FirstThreadAsync(SemaphoreSlim semaphore, Func<Task> action, TaskCompletionSource? waitTaskSource, string threadName = "")
    {
        // Try to acquire lock
        bool lockAcquired = await semaphore.WaitAsync(TimeSpan.Zero);
    
        if (lockAcquired)
        {
            // Lock acquired -> Do Work
            Debug.WriteLine($"[{threadName}] LOCKED");
    
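            try
            {
                Debug.WriteLine($"[{threadName}] WORK-START");
                await action.Invoke();
                Debug.WriteLine($"[{threadName}] WORK-END");
            }
            catch (Exception ex)
            {
                // Pass the failure on to waiting tasks, then rethrow to the caller.
                waitTaskSource?.TrySetException(ex);
                throw;
            }
            finally
            {
                // Always release, even if the work throws.
                semaphore.Release();
                Debug.WriteLine($"[{threadName}] UNLOCKED");
                // No-op if TrySetException has already completed the source.
                waitTaskSource?.TrySetResult();
            }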
        }
        else // No lock acquired
        {
            // When source is specified, await it.
            if(waitTaskSource != null)
            {
                Debug.WriteLine($"[{threadName}] WAIT-TO-SKIP");
                await waitTaskSource.Task;
            }
    
            Debug.WriteLine($"[{threadName}] SKIP");
        }
    }
    
    
    
    public class Example
    {
        public async Task DoWorkAsync()
        {
            for (var i = 0; i < 10; i++)
            {
                await Task.Delay(5);
            }
        }
    }
    

    I have just seen your requirement that the work package should be done only once overall. To achieve that, you can simply skip releasing the semaphore.
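A minimal sketch of that run-once variant could look as follows. It assumes that every caller shares the same SemaphoreSlim and the same TaskCompletionSource. The names RunOnceSketch, RunOnceAsync and DemoAsync are hypothetical, and the logging is left out for brevity.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class RunOnceSketch
{
    // Returns true for the single caller that ran the action, false for all others.
    // The semaphore is deliberately never released, so the work cannot run twice.
    public static async Task<bool> RunOnceAsync(
        SemaphoreSlim semaphore, Func<Task> action, TaskCompletionSource? done)
    {
        if (!await semaphore.WaitAsync(TimeSpan.Zero))
        {
            // Someone else owns the job: optionally wait for it, then skip.
            if (done != null) await done.Task;
            return false;
        }

        try
        {
            await action();
        }
        catch (Exception ex)
        {
            done?.TrySetException(ex);
            throw;
        }
        finally
        {
            // No-op if the source was already completed with an exception.
            done?.TrySetResult();
        }
        return true;
    }

    // Five concurrent callers plus one late caller; returns how often the work ran.
    public static async Task<int> DemoAsync()
    {
        var semaphore = new SemaphoreSlim(1);
        var done = new TaskCompletionSource();
        var runs = 0;
        Func<Task> work = async () =>
        {
            Interlocked.Increment(ref runs);
            await Task.Delay(20);
        };

        await Task.WhenAll(Enumerable.Range(0, 5)
            .Select(_ => RunOnceAsync(semaphore, work, done)));
        await RunOnceAsync(semaphore, work, done); // the late "Task End" caller
        return runs;
    }
}
```

With this variant the late caller skips as well, so DemoAsync returns 1. The trade-off is that a failed job is never retried, because the semaphore stays taken and the exception is handed to every waiter.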

    For further discussion, we need to know more details about the actual requirements. Your current set of requirements seems a bit strange. Why have multiple tasks but only one work package?