Search code examples
multithreadingpowershellparallel-processing

How do I add an atomic counter to a powershell ForEach -Parallel loop


In this question, it was explained how to add to a concurrent ThreadSafe collection Powershell: How to add Result to an Array (ForEach-Object -Parallel)

I have a simpler use case , where I would just like to increment a single value. (Integer).

Is it possible to do in Powershell using some sort of Atomic Integer data type?

$myAtomicCounter = 0

$myItems | ForEach-Object -Parallel {
    #...other work

    $myAtomicCounter.ThreadSafeAdd(2)

    # .. some more work using counter
}

Write-Host($myAtomicCounter)

Solution

  • In PowerShell when updating a single value from multiple threads you must use a locking mechanism, for example Mutex, SemaphoreSlim or even Monitor.Enter otherwise the updating operation will not be thread safe. A synchronized hashtable does not ensure that updating the key values is thread safe.

    Below is a simple demo that proves what is stated above:

    $sync = [hashtable]::Synchronized(@{ })
    $iteration = 0
    
    do {
        $sync['Value'] = 0
        $iteration++
        0..10 | ForEach-Object -Parallel {
            $sync = $using:sync
            Start-Sleep -Milliseconds 200
            $sync['Value']++
        } -ThrottleLimit 11
    }
    while ($sync['Value'] -eq 11)
    
    "On iteration $iteration the value was $($sync['Value'])"
    

    Interlocked.Increment is also not thread-safe in PowerShell as PSReference is not the same as a C# by ref value. Again, simple example to prove it:

    $iteration = 0
    
    do {
        $iteration++
        $i = [ref] 0
    
        0..10 | ForEach-Object -Parallel {
            $i = $using:i
            # [System.Threading.Monitor]::Enter($i) => makes this thread safe
            Start-Sleep -Milliseconds 200
            $null = [System.Threading.Interlocked]::Increment($i)
            # [System.Threading.Monitor]::Exit($i)  => makes this thread safe
        }
    }
    until ($i.Value -ne 11)
    
    "On iteration $iteration the value was $($i.Value)"
    

    A few examples on how we can increment the value in a thread safely manner:

    1. Using a Mutex:
    $processedItems = [hashtable]::Synchronized(@{
        Lock    = [System.Threading.Mutex]::new()
        Counter = 0
    })
    
    0..10 | ForEach-Object -Parallel {
        # using sleep as to emulate doing something here
        Start-Sleep -Milliseconds (Get-Random -Maximum 200)
    
        # bring the local variable to this scope
        $ref = $using:processedItems
        # lock this thread until I can write
        if ($ref['Lock'].WaitOne()) {
            # when I can write, update the value
            $ref['Counter']++
            # and realease this lock so others threads can write
            $ref['Lock'].ReleaseMutex()
        }
    }
    
    # Should be True:
    $processedItems['Counter'] -eq 11
    
    1. Another example of tread safe incrementing a counter using Monitor.Enter and a custom function that tries to resemble the C# lock statement:
    function lock {
        param(
            [Parameter(Mandatory)]
            [object] $Object,
    
            [Parameter(Mandatory)]
            [scriptblock] $ScriptBlock
        )
    
        try {
            [System.Threading.Monitor]::Enter($Object)
            & $ScriptBlock
        }
        finally {
            [System.Threading.Monitor]::Exit($Object)
        }
    }
    
    $utils = [hashtable]::Synchronized(@{
        LockFunc = $function:lock.ToString()
        Counter  = @(0)
    })
    
    0..10 | ForEach-Object -Parallel {
        # bring the utils var to this scope
        $utils = $using:utils
        # define the `lock` function here
        $function:lock = $utils['LockFunc']
    
        Start-Sleep -Milliseconds (Get-Random -Maximum 200)
    
        # lock the counter array
        lock($utils['Counter'].SyncRoot) {
            # increment and release when done
            $utils['Counter'][0]++
        }
    }
    
    # Should be True:
    $utils['Counter'][0] -eq 11
    
    1. Using a SemaphoreSlim:
    $utils = [hashtable]::Synchronized(@{
        Semaphore = [System.Threading.SemaphoreSlim]::new(1, 1)
        Counter  = @(0)
    })
    
    
    0..10 | ForEach-Object -Parallel {
        # bring the utils var to this scope
        $utils = $using:utils
    
        Start-Sleep -Milliseconds (Get-Random -Maximum 200)
    
        # wait here until we can write
        $utils['Semaphore'].Wait()
        # once i can write, increment the value
        $utils['Counter'][0]++
        # and release so other thread can write
        $null = $utils['Semaphore'].Release()
    }
    
    # Should be True:
    $utils['Counter'][0] -eq 11
    
    1. A much simpler approach in PowerShell would be to output from your parallel loop into a linear loop where you can safely update the counter without having to care about thread safety, basically let PowerShell handle the synchronization for you:
    $counter = 0
    
    0..10 | ForEach-Object -Parallel {
        # using sleep as to emulate doing something here
        Start-Sleep -Milliseconds (Get-Random -Maximum 200)
    
        # when this thread is done, output from the parallel
        $_
    } | ForEach-Object {
        # then the output from the parallel loop is received in this linear
        # thread safe loop where we can update the counter
        $counter++
    }
    
    # Should be True:
    $counter -eq 11