Tags: powershell, pipeline, producer-consumer, throttling

Powershell: Pipeline Producer and Consumer communication


Environment: Win10 + Powershell 5.1

I have a producer function and a consumer function, and I used a pipeline to connect them. The producer function creates a new object every 5 seconds, while the consumer function might take longer than 5 seconds to consume an object. The consumer function can handle up to 3 objects at a time. Is there a way for the consumer to let the producer know that it needs to wait until the consumer has additional capacity?

This is my code; Populate-NextIcon is the producer and Launch-Icon is the consumer:

Populate-NextIcon -List $list | ForEach-Object { $_ | Launch-Icon }

function Populate-NextIcon
{
  param ([System.Collections.ArrayList]$list)

  foreach ($i in $list)
  {
    New-Object PSObject -Property @{ Name = $i.name; x = $i.x; y = $i.y }
    Start-Sleep -Seconds 5  # emit a new object every 5 seconds
  }
}

function Launch-Icon
{
 [CmdletBinding()]
    Param(
            [Parameter(ValueFromPipelineByPropertyName )]
                [String]$name,
            [Parameter(ValueFromPipelineByPropertyName )]
                [int]$x,
            [Parameter(ValueFromPipelineByPropertyName )]
                [int]$y              
        )

 # ... do something to launch the icon
}


Solution

  • There's nothing you need to do to throttle the upstream command in a pipeline (assuming it is a PowerShell command) - PowerShell will automatically block the producer (the upstream cmdlet) until the consumer (the downstream cmdlet) is ready to accept more input.

    Consider the following example:

    & { while ($true) { (++$i) } } | ForEach-Object { "[$_]"; Start-Sleep 1 }
    

    This produces an infinite sequence of numbers, outputting a new number every second.

    See the bottom section for details.


    Note that PowerShell's common -OutBuffer parameter only controls buffering on the producer side (and therefore does not apply here); that is, it allows a cmdlet to accumulate a batch of output objects before PowerShell sends them through the pipeline - whereas by default each object is sent immediately when output (see the bottom section for details).
    Note that, perhaps surprisingly, -OutBuffer <n> means that output starts only once the (<n>+1)-th object has been created. In other words: the default behavior is equivalent to -OutBuffer 0, and -OutBuffer 1 means that the objects are sent only once a 2nd object has been created.

    While this alters the timing of when objects are sent through the pipeline, the basic mechanism remains the same: once a batch of objects is ready, the objects are sent one by one.

    This parameter is rarely used, but it can be useful in cases where a slow consumer (downstream cmdlet) would unduly throttle the producer and you want the producer to produce batches of objects ahead of time.
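As an illustrative sketch (not from the original answer, and using ForEach-Object stand-ins for producer and consumer), the following records the order in which objects are produced and consumed; with -OutBuffer 1, the producer should get to run one object ahead of the consumer, per the batching behavior described above:

```powershell
# Record the order in which objects are produced and consumed.
$events = [System.Collections.Generic.List[string]]::new()

1..4 |
  ForEach-Object -OutBuffer 1 { $events.Add("produced $_"); $_ } |
  ForEach-Object { $events.Add("consumed $_") }

# With -OutBuffer 1, the producer emits objects in batches of 2, so
# "produced 2" should be recorded *before* "consumed 1".
$events
```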


    As for what you tried:

    As Mathias R. Jessen points out, there is no reason to use the ForEach-Object cmdlet in a pipeline connecting two pipeline-processing cmdlets - just use
    Populate-NextIcon -List $list | Launch-Icon

    However, this assumes that Launch-Icon uses a process { ... } block for per-input-object processing, the absence of which is most likely the problem in your case.

    Without such a block, your function body implicitly behaves like an end { ... } block, which means that it won't be invoked until after all input has been received - and, assuming input actually ends, your pipeline-binding parameter variables will at that point reflect only the last input object - see about_Functions_Advanced.
    In effect - if the (implied) end block is ever called, which with an infinite producer will not happen - this omission limits your function to processing at most a single object received from the pipeline.
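To illustrate the fix, here is a sketch of Launch-Icon with the missing process block added; the launch logic itself is a hypothetical placeholder:

```powershell
function Launch-Icon {
  [CmdletBinding()]
  param(
    [Parameter(ValueFromPipelineByPropertyName)] [string] $Name,
    [Parameter(ValueFromPipelineByPropertyName)] [int] $x,
    [Parameter(ValueFromPipelineByPropertyName)] [int] $y
  )
  process {
    # This block now runs once for *each* pipeline input object.
    # ... do something to launch the icon; placeholder output:
    "Launching icon '$Name' at ($x, $y)"
  }
}
```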

    You bypassed the problem with the missing process block with your ForEach-Object call, by effectively creating a nested, single-input-object pipeline, whose only input object is the single ForEach-Object input object at hand.

    While this doesn't impede functionality per se, it slows down your command needlessly and noticeably.


    How data flows through the PowerShell pipeline:

    Traditional shell pipelines, such as in POSIX-like shells on Unix, are based on:

    • inter-process communication (programs running in child processes communicate)
    • sending raw byte data, with data being passed in chunks of bytes (buffers of a fixed size, typically 64KB).

    By contrast, PowerShell's pipeline is based on:

    • in-process method calls on specialized .NET classes called cmdlets (which equally applies to binary cmdlets typically compiled from C# and written-in-PowerShell cmdlets a.k.a. advanced functions / scripts)
    • sending data as objects (instances of .NET types), which are always passed one by one:
      • as soon as they're being emitted, by default
      • with -OutBuffer optionally controlling how many objects are produced first

    Note: When external programs participate in a PowerShell pipeline:

    • they run in child processes, of necessity

    • only text-based communication (strings) is supported: lines of text constitute the objects that flow through the pipeline one by one. This applies both to data received from an external program and to data that PowerShell commands send to an external program.

    • Caveat:

      • Only PowerShell (Core) v7.4+ has support for sending raw byte data through the pipeline; Windows PowerShell and PowerShell (Core) up to v7.3.x invariably communicate with and between external programs as text - see this answer for background.

      • Sending a single string to an external program invariably causes a trailing newline to be appended - see GitHub issue #5974.
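You can verify the lines-as-strings behavior with any external program; the sketch below uses hostname, on the assumption that it is available on your platform (it ships with both Windows and typical Unix systems):

```powershell
# Each output line from an external program arrives in the pipeline
# as a [string] object; hostname emits a single line here.
hostname | ForEach-Object { $_.GetType().FullName }   # typically: System.String
```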


    Communication between cmdlets in the pipeline:

    Binary cmdlets receive pipeline input - object by object - via the Cmdlet.ProcessRecord() method (note the unexpected use of the term record in the name, which, however, cannot be changed anymore).

    In PowerShell code, the equivalent is:

    • in advanced functions / scripts: the process { ... } block inside the function / script body.

    • in (the rarely used) filter functions: the entire function body

    Sending pipeline output typically happens in the same method / block (via the .WriteObject() method in binary cmdlets, and via either implicit output or (rarely needed) Write-Output calls in functions), although cmdlets that must aggregate all of their input before they can produce output, such as Sort-Object, must defer output until their .EndProcessing() method / end block is called.
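As a minimal sketch of the function-side equivalents (an illustration, not part of the original answer): a filter's entire body acts as its process block, while an advanced function spells the block out explicitly; the hypothetical Double / Double2 functions below behave identically:

```powershell
# A filter: the whole body is the process block, run once per input object.
filter Double { $_ * 2 }

# The equivalent advanced function, with an explicit process block.
function Double2 {
  [CmdletBinding()]
  param([Parameter(ValueFromPipeline)] [int] $Number)
  process { $Number * 2 }
}

1..3 | Double    # -> 2, 4, 6
1..3 | Double2   # -> 2, 4, 6
```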

    When two PowerShell commands are connected in a pipeline - let's call them producer and consumer (from a given command's perspective, the general terms for referring to commands that come earlier / later in a pipeline are upstream / downstream commands), their specific interaction is as follows:

    • When the producer outputs (sends) an object (which may not happen instantly if -OutBuffer is involved), the consumer's .ProcessRecord method / process block is called.

    • This happens synchronously, i.e. the producer's execution is blocked until the receiver's method / block call finishes.

    This means that pacing the producer - so as to avoid runaway output production that a receiver cannot keep up with - is built into PowerShell's infrastructure (assuming intra-command code that executes synchronously, as is typical).

    Here's a minimal example that uses a simple function as the producer and a filter function as the receiver; however, the same principles apply to cmdlets:

    # A simple producer function that produces a number sequence indefinitely.
    # Write-Output is equivalent to calling .WriteObject() from a binary cmdlet.
    # (It is used here explicitly to highlight when output happens,
    #  though note that explicit Write-Output use is rarely necessary.)
    function Invoke-Producer { 
      while ($true) { Write-Output (++$i); Write-Verbose -vb 'back in producer' } 
    }
    
    # A simple receiver filter function that echoes each input object and
    # waits for user input before continuing.
    # The filter's body acts like a ProcessRecord() / process block implementation.
    filter Invoke-Receiver { "`nReceived: $_"; Read-Host 'Press Enter to continue' }
    
    Invoke-Producer | Invoke-Receiver
    

    You'll see the following (the lines after the first prompt appear once you press Enter), which shows that control isn't returned to the producer until the receiver has finished processing each object:

    Received: 1
    Press Enter to continue:
    
    VERBOSE: back in producer
    
    Received: 2
    Press Enter to continue:
    

    Caveat: This automatic pacing does not apply when an external program acts as the producer and a PowerShell command acts as the receiver (as of PowerShell 7.1):

    PowerShell buffers output lines from external programs if the receiver cannot keep up, which means that at least hypothetically you could run out of memory.

    You can verify this behavior as follows on Unix (macOS or Linux); make sure that no find process is currently running:

    $notified = $false
    
    # Using `find`, list the paths of all file-system items in PowerShell's
    # home directory and echo them one by one, waiting for half a second after each.
    find $PSHOME | ForEach-Object { 
      "[$_]"
      if (-not $notified -and -not (Get-Process -ea Ignore find)) { 
        Write-Verbose -vb "FIND HAS EXITED."
        $notified = $true
      }
      Start-Sleep -MilliSeconds 500
    }
    

    You'll see something like the following, which implies that PowerShell has buffered all of find's output behind the scenes (as a result of which find then exited), before the ForEach-Object command has processed it all:

    [/Users/jdoe/.powershell]
    [/Users/jdoe/.powershell/Microsoft.CSharp.dll]
    VERBOSE: FIND HAS EXITED.
    [/Users/jdoe/.powershell/Microsoft.PowerShell.Commands.Management.dll]
    ...
    

    Note that there are scenarios where even a PowerShell receiver could run out of memory with an infinite producer, namely:

    • if the PowerShell command needs to aggregate all input first.

    • if the PowerShell command is a simple function / script that accesses pipeline input via the automatic $input variable, whose use means that PowerShell buffers all input up front.

    For instance, the following command will run out of memory, because processing of the simple foo function's body doesn't start until all input has been collected, which by definition never happens here:

    # Simple function that, due to use of $input, relies on PowerShell
    # to collect all its pipeline input *up front*.
    # Note: The body of a simple function / script is implicitly the
    #       equivalent of the `.EndProcessing()` method / `end` block of a
    #       cmdlet.
    function foo { $input | % { "[$_]" } }
    
    # !! Never produces output, and eventually runs out of memory.
    & { while ($true) { (++$i) } } | foo