
How to dynamically add items to a PowerShell ArrayList and process them recursively using Runspace pool?


I have a for loop that iterates over an ArrayList and, while doing so, adds more items to the list, which are then processed as well. I am trying to convert this loop to run concurrently using a RunspacePool.

Here is the sequential code, without runspaces:

$array = [System.Collections.ArrayList]@(1, 2, 3, 4, 5)
Write-Host "Number of items in array before loop: $($array.Count)"
for ($i = 0; $i -lt $array.Count; $i++) {
    Write-Host "Counter: $i`tArray: $array"
    if ($array[$i] -in @(1, 2, 3, 4, 5)) {
        $array.Add($array[$i] + 3) | Out-Null
    }
}
Write-Host "Array: $array"
Write-Host "Number of items in array after loop: $($array.Count)"

Output is:

Number of items in array before loop: 5
Counter: 0      Array: 1 2 3 4 5
Counter: 1      Array: 1 2 3 4 5 4
Counter: 2      Array: 1 2 3 4 5 4 5
Counter: 3      Array: 1 2 3 4 5 4 5 6
Counter: 4      Array: 1 2 3 4 5 4 5 6 7
Counter: 5      Array: 1 2 3 4 5 4 5 6 7 8
Counter: 6      Array: 1 2 3 4 5 4 5 6 7 8 7
Counter: 7      Array: 1 2 3 4 5 4 5 6 7 8 7 8
Counter: 8      Array: 1 2 3 4 5 4 5 6 7 8 7 8
Counter: 9      Array: 1 2 3 4 5 4 5 6 7 8 7 8
Counter: 10     Array: 1 2 3 4 5 4 5 6 7 8 7 8
Counter: 11     Array: 1 2 3 4 5 4 5 6 7 8 7 8
Array: 1 2 3 4 5 4 5 6 7 8 7 8
Number of items in array after loop: 12

Here is the Runspace function that I am trying to implement:

$pool = [RunspaceFactory]::CreateRunspacePool(1, 10)
$pool.Open()
$runspaces = @()

$scriptblock = {
    Param ($i, $array)
    # Start-Sleep 1 # <------ Output varies significantly if this is enabled
    Write-Output "Current value: $i`tArray: $array"
    if ($i -in @(1, 2, 3, 4, 5)) {
        $array.Add($i + 3) | Out-Null
    }
}

$array = [System.Collections.ArrayList]::Synchronized(([System.Collections.ArrayList]$(1, 2, 3, 4, 5)))
Write-Host "Number of items in array before loop: $($array.Count)"
for ($i = 0; $i -lt $array.Count; $i++) {
    $runspace = [PowerShell]::Create().AddScript($scriptblock).AddArgument($array[$i]).AddArgument($array)
    $runspace.RunspacePool = $pool
    $runspaces += [PSCustomObject]@{ Pipe = $runspace; Status = $runspace.BeginInvoke() }
}

while ($runspaces.Status -ne $null) {
    $completed = $runspaces | Where-Object { $_.Status.IsCompleted -eq $true }
    foreach ($runspace in $completed) {
        $runspace.Pipe.EndInvoke($runspace.Status)
        $runspace.Status = $null
    }
}
Write-Host "array: $array"
Write-Host "Number of items in array after loop: $($array.Count)"
$pool.Close()
$pool.Dispose()

Output without the Start-Sleep call is as expected:

Number of items in array before loop: 5
Current value: 1        Array: 1 2 3 4 5
Current value: 2        Array: 1 2 3 4 5 4
Current value: 3        Array: 1 2 3 4 5 4 5
Current value: 4        Array: 1 2 3 4 5 4 5 6
Current value: 5        Array: 1 2 3 4 5 4 5 6 7
Current value: 4        Array: 1 2 3 4 5 4 5 6 7 8
Current value: 5        Array: 1 2 3 4 5 4 5 6 7 8 7
Current value: 6        Array: 1 2 3 4 5 4 5 6 7 8 7
Current value: 7        Array: 1 2 3 4 5 4 5 6 7 8 7
Current value: 8        Array: 1 2 3 4 5 4 5 6 7 8 7
Current value: 7        Array: 1 2 3 4 5 4 5 6 7 8 7 8
Current value: 8        Array: 1 2 3 4 5 4 5 6 7 8 7 8
Array: 1 2 3 4 5 4 5 6 7 8 7 8
Number of items in array after loop: 12

Output with Sleep:

Number of items in array before loop: 5
Current value: 1        Array: 1 2 3 4 5
Current value: 2        Array: 1 2 3 4 5 4
Current value: 3        Array: 1 2 3 4 5 4 5
Current value: 4        Array: 1 2 3 4 5 4 5 6
Current value: 5        Array: 1 2 3 4 5 4 5 6 7
Array: 1 2 3 4 5 4 5 6 7 8
Number of items in array after loop: 10

I understand that this happens because the for loop finishes before the sleep completes, so only the first 5 items are ever submitted to the runspace pool.

Is there a way to add more items to the ArrayList dynamically and still process them concurrently using runspaces?


Solution

  • This answer attempts to provide a better solution using a BlockingCollection<T>, which provides an implementation of the producer/consumer pattern.

    To clarify on the issue with my previous answer, as OP has noted in a comment:

    If the starting count of the queue (say, 2) is less than the max number of threads (say 5), then only that many (2, in this case) threads remain active no matter how many ever items are added to the queue later. Only the starting number of threads process the rest of the items in the queue. In my case, the starting count is usually one. Then I make a irm (alias for Invoke-RestMethod) request, and add some 10~20 items. These are processed by only the first thread. The other threads go to Completed state right at the start. Is there a solution to this?

    For this example, the runspaces will use the TryTake(T, TimeSpan) method overload, which blocks the thread and waits up to the specified timeout for an item to become available. On each loop iteration, each runspace also updates a Synchronized Hashtable with its TryTake(..) result.
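
    To make the TryTake(T, TimeSpan) behavior concrete, here is a minimal single-threaded sketch. The names $queue and $item and the 500 ms timeout are illustrative assumptions and are not part of the demo further down.

    ```powershell
    # Illustrative sketch only: $queue, $item and the 500 ms timeout are assumptions
    $queue = [System.Collections.Concurrent.BlockingCollection[int]]::new()
    $queue.Add(42)

    [ref] $item = $null
    $timeout = [timespan]::FromMilliseconds(500)

    # An item is available, so TryTake returns $true without waiting
    $first      = $queue.TryTake($item, $timeout)
    $firstValue = $item.Value

    # The collection is now empty: TryTake blocks for up to the timeout,
    # then gives up and returns $false
    $second = $queue.TryTake($item, $timeout)

    "First take:  $first (value $firstValue)"
    "Second take: $second"

    $queue.Dispose()
    ```

    A $false result only means that no item arrived within the timeout. It does not mean that no item will ever arrive, which is why the workers report the result to the main thread rather than exiting on their own.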

    The main thread will use the Synchronized Hashtable to wait until all runspaces have reported a $false status. When this happens, an exit signal is sent to the threads with .CompleteAdding().
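
    The signaling idea can be sketched in isolation as follows. This is a simplified single-threaded illustration: the keys 'worker-1' and 'worker-2' are hypothetical stand-ins for the runspace instance ids, and one item is deliberately left in the collection to show the difference between IsAddingCompleted and IsCompleted.

    ```powershell
    # Illustrative sketch only: 'worker-1' / 'worker-2' are hypothetical keys
    $queue  = [System.Collections.Concurrent.BlockingCollection[int]]::new()
    $status = [hashtable]::Synchronized(@{})
    $queue.Add(1)

    # Each worker stores the result of its last TryTake call
    $status['worker-1'] = $true
    $status['worker-2'] = $false
    $anyBusy = $status.ContainsValue($true)       # one worker is still busy

    $status['worker-1'] = $false
    $allIdle = -not $status.ContainsValue($true)  # every worker timed out

    # Once all workers are idle, the main thread marks the collection as closed
    $queue.CompleteAdding()
    $addingDone  = $queue.IsAddingCompleted       # no more Add() calls allowed
    $beforeDrain = $queue.IsCompleted             # an item is still queued

    [ref] $item = $null
    $null = $queue.TryTake($item)
    $afterDrain = $queue.IsCompleted              # closed and empty

    "Busy: $anyBusy | Idle: $allIdle | AddingCompleted: $addingDone"
    "IsCompleted before drain: $beforeDrain | after drain: $afterDrain"

    $queue.Dispose()
    ```

    IsCompleted only becomes $true once the collection is both marked as complete and empty, so a loop of the form while(-not $bc.IsCompleted) keeps consuming any remaining items before it ends.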

    Even though not perfect, this solves the problem where some of the threads might exit early from the loop and attempts to ensure that all threads end at the same time (when there are no more items in the collection).

    The producer logic will be very similar to the previous answer. However, in this case each thread will wait a random amount of time between $timeout.Seconds - 5 and $timeout.Seconds + 5 on each loop iteration.
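
    As a quick check of the delay range, the sketch below samples the same expression with a 5 second timeout. The variable names $rng, $samples and $stats are assumptions for illustration. Note that the upper bound of Random.Next(min, max) is exclusive.

    ```powershell
    # Illustrative sketch only: samples the delay range for a 5 second timeout
    $timeout = [timespan]::FromSeconds(5)
    $min = $timeout.Seconds - 5    # 0
    $max = $timeout.Seconds + 5    # 10 (exclusive upper bound)

    $rng     = [random]::new()
    $samples = foreach ($n in 1..1000) { $rng.Next($min, $max) }
    $stats   = $samples | Measure-Object -Minimum -Maximum

    "Delays range from {0} to {1} seconds" -f $stats.Minimum, $stats.Maximum
    ```

    With a 5 second timeout the delays therefore fall between 0 and 9 seconds. Keep in mind that .Seconds is only the seconds component of the TimeSpan; for timeouts of a minute or more, .TotalSeconds would be the value to use, and a timeout under 5 seconds could produce a negative lower bound, which Start-Sleep may reject.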

    The results one can expect from this demo can be found on this gist.

    using namespace System.Management.Automation.Runspaces
    using namespace System.Collections.Concurrent
    using namespace System.Threading
    
    try {
        $threads = 20
        $bc      = [BlockingCollection[int]]::new()
        $status  = [hashtable]::Synchronized(@{ TotalCount = 0 })
    
        # set a timer, all threads will wait for it before exiting
        # this timespan should be tweaked depending on the task at hand
        $timeout = [timespan]::FromSeconds(5)
    
        foreach($i in 1, 2, 3, 4, 5) {
            $bc.Add($i)
        }
    
    
        $scriptblock = {
            param([timespan] $timeout, [int] $threads)
    
            $id = [runspace]::DefaultRunspace
            $status[$id.InstanceId] = $true
            $syncRoot = $status.SyncRoot
            $release  = {
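                # PulseAll must be called while the lock is still held,
                # otherwise it throws a SynchronizationLockException
                [Threading.Monitor]::PulseAll($syncRoot)
                [Threading.Monitor]::Exit($syncRoot)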
            }
    
            # will use this to simulate random delays
            $min = $timeout.Seconds - 5
            $max = $timeout.Seconds + 5
    
            [ref] $target = $null
            while(-not $bc.IsCompleted) {
                # NOTE from `Hashtable.Synchronized(Hashtable)` MS Docs:
                #
                #    The Synchronized method is thread safe for multiple readers and writers.
                #    Furthermore, the synchronized wrapper ensures that there is only
                #    one writer writing at a time.
                #
                #    Enumerating through a collection is intrinsically not a
                #    thread-safe procedure. Even when a collection is synchronized,
                #    other threads can still modify the collection, which causes the
                #    enumerator to throw an exception.
    
                # Mainly doing this (lock on the sync hash) to get the Active Count
                # Not really needed and only for demo purposes
    
                # if we can't lock on this object in 200ms go next iteration
                if(-not [Threading.Monitor]::TryEnter($syncRoot, 200)) {
                    continue
                }
    
                # if there are no items in queue, send `$false` to the main thread
                if(-not ($status[$id.InstanceId] = $bc.TryTake($target, $timeout))) {
                    # release the lock and signal the threads they can get a handle
                    & $release
                    # and go next iteration
                    continue
                }
    
                # if there was an item in queue, get the active count
                $active = @($status.Values -eq $true).Count
                # add 1 to the total count
                $status['TotalCount'] += 1
                # and release the lock
                & $release
    
                Write-Host (
                    ('Target Value: {0}' -f $target.Value).PadRight(20) + '|'.PadRight(5) +
                    ('Items in Queue: {0}' -f $bc.Count).PadRight(20)   + '|'.PadRight(5) +
                    ('Runspace Id: {0}' -f $id.Id).PadRight(20)         + '|'.PadRight(5) +
                    ('Active Runspaces [{0:D2} / {1:D2}]' -f $active, $threads)
                )
    
                $ran = [random]::new()
                # start a simulated delay
                Start-Sleep $ran.Next($min, $max)
    
                # get a random number between 0 and 10
                $ran = $ran.Next(11)
                # if the number is greater than the Dequeued Item
                if ($ran -gt $target.Value) {
                    # enumerate starting from `$ran - 2` up to `$ran`
                    foreach($i in ($ran - 2)..$ran) {
                        # enqueue each item
                        $bc.Add($i)
                    }
                }
    
                # Send 1 to the Success Stream, this will help us check
                # if the test succeeded later on
                1
            }
        }
    
        $iss    = [initialsessionstate]::CreateDefault2()
        $rspool = [runspacefactory]::CreateRunspacePool(1, $threads, $iss, $Host)
        $rspool.ApartmentState = [ApartmentState]::STA
        $rspool.ThreadOptions  = [PSThreadOptions]::UseNewThread
        $rspool.InitialSessionState.Variables.Add([SessionStateVariableEntry[]]@(
            [SessionStateVariableEntry]::new('bc', $bc, 'Producer Consumer Collection')
            [SessionStateVariableEntry]::new('status', $status, 'Monitoring hash for signaling `.CompleteAdding()`')
        ))
        $rspool.Open()
    
        $params = @{
            Timeout = $timeout
            Threads = $threads
        }
    
        $rs = for($i = 0; $i -lt $threads; $i++) {
            $ps = [powershell]::Create($iss).AddScript($scriptblock).AddParameters($params)
            $ps.RunspacePool = $rspool
    
            @{
                Instance    = $ps
                AsyncResult = $ps.BeginInvoke()
            }
        }
    
        while($status.ContainsValue($true)) {
            Start-Sleep -Milliseconds 200
        }
    
        # send signal to stop
        $bc.CompleteAdding()
    
        [int[]] $totalCount = foreach($r in $rs) {
            try {
                $r.Instance.EndInvoke($r.AsyncResult)
                $r.Instance.Dispose()
            }
            catch {
                Write-Error $_
            }
        }
        Write-Host ("`nTotal Count [ IN {0} / OUT {1} ]" -f $totalCount.Count, $status['TotalCount'])
        Write-Host ("Items in Queue: {0}" -f $bc.Count)
        Write-Host ("Test Succeeded: {0}" -f (
            [Linq.Enumerable]::Sum($totalCount) -eq $status['TotalCount'] -and
            $bc.Count -eq 0
        ))
    }
    finally {
        ($bc, $rspool).ForEach('Dispose')
    }