Tags: .net, multithreading, task-parallel-library, tpl-dataflow

.NET - Block main thread until there are any available threads


I have a process in which the main thread reads a file and splits it into parts, which then need further processing. I would like to use any available threads so that the downstream processing uses as much CPU (as many cores) as possible. I don't want the main thread to build up an excessive backlog, so it should wait to add to the queue until a thread is available.

I see many articles like "VB.NET 4.0: Looking to execute multiple threads, but wait until all threads are completed before resuming", but they wait for all threads to complete, whereas I just need any thread to be available.

Is this something I can tackle with the Task Parallel Library, or should I be manually creating threads and monitoring a threadpool?

Using Reader As New StreamReader(FileName)
    Do
        CurrentBlockSize = Reader.ReadBlock(CurrentBuffer, 0, BufferSize)

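        'Only append the characters actually read; the last block may be shorter than the buffer
        RunningBuffer &= New String(CurrentBuffer, 0, CurrentBlockSize)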

        If RunningBuffer.Contains(RowDelimiter) Then
            LineParts = RunningBuffer.Split(RowDelimiter)

            For I As Integer = 0 To LineParts.Count - 1
                If I < LineParts.Count - 1 Then

                    'Make a synchronous call that blocks until
                    'another thread is available to process the line
                    AddLineToTheProcessingQueue(LineParts(I))

                Else
                    RunningBuffer = LineParts(I)
                End If
            Next
        End If

    Loop While CurrentBlockSize = BufferSize
End Using

Solution

  • Paste this code into a new Console Application.

    Imports System.Threading
    
    Module Module1
    
        ' 6 is an arbitrary choice; for CPU-bound work, Environment.ProcessorCount is a common starting point
        ' worker threads run queued work items; completion port threads service asynchronous I/O completions
        Const MaxWorkerThreads As Integer = 6
        Const MaxCompletionPortThreads As Integer = 6
    
        Sub Main()
    
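            ' SetMaxThreads returns False (and changes nothing) if either value is below the processor count
            If Not ThreadPool.SetMaxThreads(MaxWorkerThreads, MaxCompletionPortThreads) Then
                Console.WriteLine("SetMaxThreads rejected the requested limits; the pool keeps its current maximums.")
            End If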
    
            Dim availableWorkerThreads As Integer
            Dim availableCompletionPortThreads As Integer
    
            For i As Integer = 0 To 100
    
                ' GetAvailableThreads returns results via output parameters
                ThreadPool.GetAvailableThreads(availableWorkerThreads, availableCompletionPortThreads)
    
                Dim tries As Integer = 0
    
                Do While (availableWorkerThreads = 0)
                    ' this loop does not execute if there are available threads
                    ' you may want to add a fail-safe to check "tries" in case the child threads get stuck
                    tries += 1
    
                    Console.WriteLine(String.Format("waiting to start item {0}, attempt {1}, available threads: {2}, {3}", i, tries, availableWorkerThreads, availableCompletionPortThreads))
    
                    ' without Sleep this loop would spin at full speed and burn a core while waiting
                    Thread.Sleep(1000)
    
                    ' call GetAvailableThreads again for the next test at the top of the loop
                    ThreadPool.GetAvailableThreads(availableWorkerThreads, availableCompletionPortThreads)
                Loop
    
                ' this is how you pass parameters to a work item queued through QueueUserWorkItem
                Dim parameters As Object() = {i}
                ThreadPool.QueueUserWorkItem(AddressOf DoWork, parameters)
                ' Pool threads are background threads and will not keep the process alive after Main returns.
                ' This pause gives the pool time to start the item so the next GetAvailableThreads call reflects it.
                Thread.Sleep(500)
    
            Next
    
        End Sub
    
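        ' Signature matches the WaitCallback delegate, which passes the state as a single Object
        Sub DoWork(state As Object)
            Dim parameters As Object() = DirectCast(state, Object())
            Dim itemNumber As Integer = CInt(parameters(0))
            Dim sleepLength As Integer = itemNumber * 1000
            Console.WriteLine(String.Format("Item: {0} - sleeping for {1} milliseconds.", itemNumber, sleepLength))
            Thread.Sleep(sleepLength)
            Console.WriteLine(String.Format("Item: {0} - done sleeping.", itemNumber))
        End Sub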
        End Sub
    
    End Module
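
  • As an alternative to polling GetAvailableThreads, here is a minimal sketch that makes AddLineToTheProcessingQueue itself block, using a SemaphoreSlim as a pool of "slots" and Task.Run (.NET 4.5 or later) to do the work. This is a different technique from the code above, not a drop-in from any library: MaxInFlight, Slots, ProcessLine, WaitForOutstandingWork and ProcessedCount are all names I made up for illustration, and ProcessLine only simulates work with a Sleep.

```vbnet
Imports System.Threading
Imports System.Threading.Tasks

Module ThrottledQueueSketch

    ' Assumption: one in-flight line per logical core; tune this for your workload.
    Private ReadOnly MaxInFlight As Integer = Environment.ProcessorCount
    Private ReadOnly Slots As New SemaphoreSlim(MaxInFlight, MaxInFlight)

    ' Illustration only: counts finished lines so the result can be checked.
    Public ProcessedCount As Integer = 0

    ' Blocks the caller until a slot is free, then hands the line to the thread pool.
    Sub AddLineToTheProcessingQueue(line As String)
        Slots.Wait()
        Task.Run(Sub()
                     Try
                         ProcessLine(line)
                     Finally
                         ' Free the slot even if processing throws
                         Slots.Release()
                     End Try
                 End Sub)
    End Sub

    ' Blocks until every queued line has finished, by taking all slots back.
    Sub WaitForOutstandingWork()
        For n As Integer = 1 To MaxInFlight
            Slots.Wait()
        Next
        ' Hand the slots back so the queue can be used again
        Slots.Release(MaxInFlight)
    End Sub

    ' Hypothetical stand-in for the real per-line work.
    Sub ProcessLine(line As String)
        Thread.Sleep(200)
        Interlocked.Increment(ProcessedCount)
        Console.WriteLine("Processed: " & line)
    End Sub

    Sub Main()
        For i As Integer = 1 To 20
            AddLineToTheProcessingQueue("line " & i.ToString())
        Next
        WaitForOutstandingWork()
        Console.WriteLine("Lines processed: " & ProcessedCount.ToString())
    End Sub

End Module
```

    The reading loop from the question would call AddLineToTheProcessingQueue for each complete line and WaitForOutstandingWork once after the Using block. The main thread then never gets more than MaxInFlight lines ahead of the workers, and there is no need to change the process-wide thread pool limits.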