I am new to using TPL. I've been able to implement a simple post / receive transaction on a BufferBlock but when I try to go asynchronous, I get hung.
Here is a simplified version of what I am trying.
Declare the message buffer (made it global)
Dim msgBuffer As New BufferBlock(Of String)
A simple function to post the messages which is just a list of file names in a directory
Private Sub PostMessagesToBuffer()
filesList = Directory.GetFiles(txtBoxSrcFilesDir.Text, fileFilter).ToList
For Each file In filesList
msgBuffer.Post(file)
Next
msgBuffer.Complete()
End Sub
I created this function for processing the messages:
Private Async Function ProcessMessagesAsync() As Task(Of Integer)
Dim msgsProcessed = 0
While Await msgBuffer.OutputAvailableAsync
Dim msg = msgBuffer.Receive
Console.WriteLine(msg)
msgsProcessed += 1
End While
Return msgsProcessed
End Function
Then just this set of calls to run it all.
Dim msgProcessor = ProcessMessagesAsync()
PostMessagesToBuffer()
msgProcessor.Wait()
Console.writeLine("Processed " & msgProcessor.Result & " Messages.")
I can debug and see the messages getting added to the buffer but the "While Await" never gets the signal that a message is available on the queue. It just sits there, never getting into the loop to do the work on the message. Am I missing something rather simple here?
You are mixing synchronous and asynchronous code. Deadlocks are pretty common when blocking on async code. Instead of Wait
:
msgProcessor.Wait()
...it is safer to Await
:
Await msgProcessor
Also the DataflowBlock.Receive
is a blocking method. The proper method to use after awaiting the DataflowBlock.OutputAvailableAsync
is the method DataflowBlock.TryReceive
.
I should also note that retrieving manually the elements of a DataflowBlock
is feasible but uncommon. Usually the last block in a dataflow pipeline is an ActionBlock
, that doesn't generate output.