
The IEnumerable approach pipeline


Code from https://ayende.com/blog/3082/pipes-and-filters-the-ienumerable-appraoch

Question

Three operations are registered. The first gets all processes in the system, the second filters them, and the third writes the process names.

But yield is used, and so is GetEnumerator.

The line current = operation.Execute(current); is executed three times, and the output of the first operation becomes the input of the second. And there is only one list (operations). I don't understand how, and what, is stored in private readonly List<IOperation<T>> operations = new List<IOperation<T>>(); given that the foreach runs three times. Is it some kind of delegate which is executed when enumerator.MoveNext() is executed? OK, but the foreach runs three times before GetEnumerator is used, so shouldn't the list "operations" be overridden...? :) I would like to understand the mechanism: how are these "delegates" stored before enumerator.MoveNext() is called, and when are they used?

using System;
using System.Collections.Generic;
using System.Diagnostics;

class Program
    {
        static void Main(string[] args)
        {
            var trivialProcess = new Pipeline<Process>();

            trivialProcess.Register(new GetAllProcesses());
            trivialProcess.Register(new LimitByWorkingSetSize());
            trivialProcess.Register(new PrintProcessName());

            trivialProcess.Execute();
        }
    }


    interface IOperation<T>
    {
        IEnumerable<T> Execute(IEnumerable<T> input);

    }

    class GetAllProcesses : IOperation<Process>
    {
        public IEnumerable<Process> Execute(IEnumerable<Process> input)
        {
            Debug.WriteLine("GetAllProcesses Execute");
            return Process.GetProcesses();
        }
    }

    class LimitByWorkingSetSize : IOperation<Process>
    {
        public IEnumerable<Process> Execute(IEnumerable<Process> input)
        {
            int maxSizeBytes = 50 * 1024 * 1024;
            Debug.WriteLine("LimitByWorkingSetSize Enter");
            foreach (Process process in input)
            {
                Debug.WriteLine("LimitByWorkingSetSize foreach");
                if (process.WorkingSet64 > maxSizeBytes)
                {
                    Debug.WriteLine("LimitByWorkingSetSize yield");
                    yield return process;
                }
            }

        }
    }

    class PrintProcessName : IOperation<Process>
    {
        public IEnumerable<Process> Execute(IEnumerable<Process> input)
        {
            Debug.WriteLine("PrintProcessName Enter");
            foreach (Process process in input)
            {
                Debug.WriteLine("PrintProcessName  print");
                Console.WriteLine(process.ProcessName);
            }
            Debug.WriteLine("PrintProcessName break");
            yield break;
        }
    }

    class Pipeline<T>
    {
        private readonly List<IOperation<T>> operations = new List<IOperation<T>>();

        public Pipeline<T> Register(IOperation<T> operation)
        {
            operations.Add(operation);
            return this;
        }

        public void Execute()
        {
            IEnumerable<T> current = new List<T>();

            foreach (IOperation<T> operation in operations)
            {
                current = operation.Execute(current);
            }
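            // Enumerating the last stage pulls values through the whole chain;
            // the using block disposes the enumerator (and the iterators it wraps).
            using (IEnumerator<T> enumerator = current.GetEnumerator())
            {
                while (enumerator.MoveNext()) { }
            }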
        }
    }

Solution

  • I don't understand how and what is stored in private readonly List<IOperation<T>> operations = new List<IOperation<T>>();

    It's a List of IOperation<T>; in this case it holds one instance each of the GetAllProcesses, LimitByWorkingSetSize and PrintProcessName classes. All these classes implement the IOperation<T> interface, so they can be treated as an IOperation<T> and are permitted to be stored in the list.

    as foreach has three executions

    Yes, but that loop only sets up the chain of calls for LimitByWorkingSetSize and PrintProcessName, because their Execute methods are iterators (they use yield): their bodies only run when they are enumerated. The line current = operation.Execute(current); runs GetAllProcesses.Execute right away, because that method doesn't use yield (though it could be modified so that it did). The code in PrintProcessName, which pulls from LimitByWorkingSetSize, only runs when that final MoveNext() is called. PrintProcessName then keeps control until its input is exhausted: it consumes every element inside that single MoveNext() call and never yields anything itself, so the call returns false.
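A minimal sketch of that difference, assuming ints in place of Process objects and a List<string> log in place of Debug.WriteLine so the result is deterministic (GetAll, LimitAbove50 and DeferredDemo are hypothetical names, not from the original code). Building the chain with the same foreach as Pipeline<T>.Execute runs the body of the non-yield stage immediately, while the yield-based stage's body has not started until the first MoveNext:

```csharp
using System.Collections.Generic;

// Hypothetical int-based stand-ins for the original classes; a List<string> log
// replaces Debug.WriteLine so the order of events can be checked.
interface IOperation<T>
{
    IEnumerable<T> Execute(IEnumerable<T> input);
}

// Eager, like GetAllProcesses: no yield, so the body runs when Execute is called.
class GetAll : IOperation<int>
{
    private readonly List<string> log;
    public GetAll(List<string> log) => this.log = log;

    public IEnumerable<int> Execute(IEnumerable<int> input)
    {
        log.Add("GetAll Execute");
        return new[] { 10, 60, 20, 70 };
    }
}

// Lazy, like LimitByWorkingSetSize: uses yield, so calling Execute only creates an iterator.
class LimitAbove50 : IOperation<int>
{
    private readonly List<string> log;
    public LimitAbove50(List<string> log) => this.log = log;

    public IEnumerable<int> Execute(IEnumerable<int> input)
    {
        log.Add("Limit Enter");
        foreach (int n in input)
            if (n > 50)
                yield return n;
    }
}

static class DeferredDemo
{
    // Builds the chain the same way as Pipeline<T>.Execute, then snapshots the log
    // before and after the first MoveNext on the last stage.
    public static (List<string> afterBuild, List<string> afterFirstMoveNext) Run()
    {
        var log = new List<string>();
        var operations = new List<IOperation<int>> { new GetAll(log), new LimitAbove50(log) };

        IEnumerable<int> current = new List<int>();
        foreach (IOperation<int> operation in operations)
            current = operation.Execute(current);   // only wires the stages together

        var afterBuild = new List<string>(log);

        using IEnumerator<int> e = current.GetEnumerator();
        e.MoveNext();                                // LimitAbove50's body starts here
        return (afterBuild, new List<string>(log));
    }
}
```

DeferredDemo.Run() reports a log of just "GetAll Execute" after the chain is built, and "GetAll Execute", "Limit Enter" after the first MoveNext.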

    delegate which is executed when enumerator.MoveNext() is executed?

    I suspect that's delving a bit deep into the mechanics of yield for this context, but you ask about it again below, so I've covered it at the end.

    OK, but we have Foreach, which is executed?

    There are several foreach loops in the code sample. Executing PrintProcessName's Execute runs a foreach, which enumerates the output of the previous stage in the chain (LimitByWorkingSetSize's Execute), and that in turn contains a foreach of its own. If you single-step it in the debugger with a breakpoint on the { under every foreach, you'll see execution jumping back and forth between PrintProcessName and LimitByWorkingSetSize: on some loop passes LimitByWorkingSetSize produces an item, which PrintProcessName then consumes. PrintProcessName doesn't yield anything to a further stage, so control passes back and forth between these two until LimitByWorkingSetSize has nothing left to give, at which point PrintProcessName's foreach ends.
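The back-and-forth can also be recorded instead of single-stepped. This sketch uses hypothetical int-based stand-ins for LimitByWorkingSetSize and PrintProcessName that log each step to a list; the input values and the threshold of 50 are made up for illustration:

```csharp
using System.Collections.Generic;

// Hypothetical int-based stand-ins for LimitByWorkingSetSize and PrintProcessName;
// each step is appended to a log instead of going to Debug.WriteLine.
static class PingPongDemo
{
    static IEnumerable<int> Limit(IEnumerable<int> input, List<string> log)
    {
        log.Add("Limit Enter");
        foreach (int n in input)
        {
            log.Add($"Limit foreach {n}");
            if (n > 50)
            {
                log.Add($"Limit yield {n}");
                yield return n;                  // hand n to Print and pause here
            }
        }
    }

    static IEnumerable<int> Print(IEnumerable<int> input, List<string> log)
    {
        log.Add("Print Enter");
        foreach (int n in input)                 // each pass resumes Limit for one value
            log.Add($"Print print {n}");
        log.Add("Print break");
        yield break;                             // Print never yields a value to its caller
    }

    public static List<string> Run()
    {
        var log = new List<string>();
        IEnumerable<int> chain = Print(Limit(new[] { 10, 60, 20, 70 }, log), log);

        using IEnumerator<int> e = chain.GetEnumerator();
        while (e.MoveNext()) { }                 // the first call drives the whole chain and returns false
        return log;
    }
}
```

For the input 10, 60, 20, 70, PingPongDemo.Run() returns the log Print Enter, Limit Enter, Limit foreach 10, Limit foreach 60, Limit yield 60, Print print 60, Limit foreach 20, Limit foreach 70, Limit yield 70, Print print 70, Print break. All of it happens inside the single MoveNext call, which then returns false because Print never yields.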

    So list "operations" should be overridden ... ?

    I suspect that's a typo for overwritten. Nothing in the code overwrites the operations list; it's current that gets overwritten. It merely serves to remember the output of the previous pass so it can become the input of the next one. It wasn't strictly necessary to initialize it to a new List at the outset, because the first Execute in this chain (GetAllProcesses) ignores its input.
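Put another way, the loop in Pipeline<T>.Execute is a left fold over the registered operations: each pass wraps the previous current in one more stage. A sketch, assuming plain Func<IEnumerable<int>, IEnumerable<int>> delegates instead of IOperation<T> and a hypothetical Doubled stage so there is something to return, builds the same chain three ways: with the loop, as nested calls, and with LINQ's Aggregate:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sketch: each stage is a Func instead of an IOperation<T>.
static class FoldDemo
{
    // Like GetAllProcesses: ignores its input.
    static IEnumerable<int> Source(IEnumerable<int> _) => new[] { 10, 60, 20, 70 };

    static IEnumerable<int> Above50(IEnumerable<int> input)
    {
        foreach (int n in input)
            if (n > 50) yield return n;
    }

    static IEnumerable<int> Doubled(IEnumerable<int> input)
    {
        foreach (int n in input) yield return n * 2;
    }

    public static (List<int> viaLoop, List<int> viaNesting, List<int> viaAggregate) Run()
    {
        var stages = new List<Func<IEnumerable<int>, IEnumerable<int>>> { Source, Above50, Doubled };

        // 1. The Pipeline<T> loop: 'current' is reassigned, 'stages' never changes.
        IEnumerable<int> current = Enumerable.Empty<int>();
        foreach (var stage in stages)
            current = stage(current);

        // 2. What the loop builds, written out as nested calls.
        IEnumerable<int> nested = Doubled(Above50(Source(Enumerable.Empty<int>())));

        // 3. The same fold expressed with LINQ's Aggregate.
        IEnumerable<int> folded = stages.Aggregate(Enumerable.Empty<int>(), (acc, stage) => stage(acc));

        return (current.ToList(), nested.ToList(), folded.ToList());
    }
}
```

All three results are [120, 140]. The design point is that the list of stages is only read; the chain itself lives in the nested iterator objects that current ends up referring to.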

    How are these "delegates" stored before enumerator.MoveNext() is called, and when are they used?

    I think you're essentially asking how yield works. The short version is that, for each iterator method, the compiler writes a class for you that implements an enumerator, and that enumerator generates values according to the logic surrounding the yield keyword. The long answer is fairly horrific and not something you'd want to write yourself, which is why we're usually more than happy to let the compiler generate it and not look under the hood. Here is the program above as decompiled by ILSpy (this is not valid C# source; names such as <>1__state are compiler-generated):

    #define DEBUG
    using System;
    using System.Collections;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Reflection;
    using System.Runtime.CompilerServices;
    using System.Security;
    using System.Security.Permissions;
    
    [assembly: CompilationRelaxations(8)]
    [assembly: RuntimeCompatibility(WrapNonExceptionThrows = true)]
    [assembly: Debuggable(DebuggableAttribute.DebuggingModes.Default | DebuggableAttribute.DebuggingModes.DisableOptimizations | DebuggableAttribute.DebuggingModes.IgnoreSymbolStoreSequencePoints | DebuggableAttribute.DebuggingModes.EnableEditAndContinue)]
    [assembly: SecurityPermission(SecurityAction.RequestMinimum, SkipVerification = true)]
    [assembly: AssemblyVersion("0.0.0.0")]
    [module: UnverifiableCode]
    namespace ConsoleApp7net5
    {
        internal class Program
        {
            private static void Main(string[] args)
            {
                Pipeline<Process> pipeline = new Pipeline<Process>();
                pipeline.Register(new GetAllProcesses());
                pipeline.Register(new LimitByWorkingSetSize());
                pipeline.Register(new PrintProcessName());
                pipeline.Execute();
            }
        }
        internal interface IOperation<T>
        {
            IEnumerable<T> Execute(IEnumerable<T> input);
        }
        internal class GetAllProcesses : IOperation<Process>
        {
            public IEnumerable<Process> Execute(IEnumerable<Process> input)
            {
                Debug.WriteLine("GetAllProcesses Execute");
                return Process.GetProcesses();
            }
        }
        internal class LimitByWorkingSetSize : IOperation<Process>
        {
            [CompilerGenerated]
            private sealed class <Execute>d__0 : IEnumerable<Process>, IEnumerable, IEnumerator<Process>, IDisposable, IEnumerator
            {
                private int <>1__state;
    
                private Process <>2__current;
    
                private int <>l__initialThreadId;
    
                private IEnumerable<Process> input;
    
                public IEnumerable<Process> <>3__input;
    
                public LimitByWorkingSetSize <>4__this;
    
                private int <maxSizeBytes>5__1;
    
                private IEnumerator<Process> <>s__2;
    
                private Process <process>5__3;
    
                Process IEnumerator<Process>.Current
                {
                    [DebuggerHidden]
                    get
                    {
                        return <>2__current;
                    }
                }
    
                object IEnumerator.Current
                {
                    [DebuggerHidden]
                    get
                    {
                        return <>2__current;
                    }
                }
    
                [DebuggerHidden]
                public <Execute>d__0(int <>1__state)
                {
                    this.<>1__state = <>1__state;
                    <>l__initialThreadId = Environment.CurrentManagedThreadId;
                }
    
                [DebuggerHidden]
                void IDisposable.Dispose()
                {
                    int num = <>1__state;
                    if (num == -3 || num == 1)
                    {
                        try
                        {
                        }
                        finally
                        {
                            <>m__Finally1();
                        }
                    }
                }
    
                private bool MoveNext()
                {
                    try
                    {
                        int num = <>1__state;
                        if (num != 0)
                        {
                            if (num != 1)
                            {
                                return false;
                            }
                            <>1__state = -3;
                            goto IL_00bb;
                        }
                        <>1__state = -1;
                        <maxSizeBytes>5__1 = 52428800;
                        Debug.WriteLine("LimitByWorkingSetSize Enter");
                        <>s__2 = input.GetEnumerator();
                        <>1__state = -3;
                        goto IL_00c3;
                        IL_00bb:
                        <process>5__3 = null;
                        goto IL_00c3;
                        IL_00c3:
                        if (<>s__2.MoveNext())
                        {
                            <process>5__3 = <>s__2.Current;
                            Debug.WriteLine("LimitByWorkingSetSize foreach");
                            if (<process>5__3.WorkingSet64 > <maxSizeBytes>5__1)
                            {
                                Debug.WriteLine("LimitByWorkingSetSize yield");
                                <>2__current = <process>5__3;
                                <>1__state = 1;
                                return true;
                            }
                            goto IL_00bb;
                        }
                        <>m__Finally1();
                        <>s__2 = null;
                        return false;
                    }
                    catch
                    {
                        //try-fault
                        ((IDisposable)this).Dispose();
                        throw;
                    }
                }
    
                bool IEnumerator.MoveNext()
                {
                    //ILSpy generated this explicit interface implementation from .override directive in MoveNext
                    return this.MoveNext();
                }
    
                private void <>m__Finally1()
                {
                    <>1__state = -1;
                    if (<>s__2 != null)
                    {
                        <>s__2.Dispose();
                    }
                }
    
                [DebuggerHidden]
                void IEnumerator.Reset()
                {
                    throw new NotSupportedException();
                }
    
                [DebuggerHidden]
                IEnumerator<Process> IEnumerable<Process>.GetEnumerator()
                {
                    <Execute>d__0 <Execute>d__;
                    if (<>1__state == -2 && <>l__initialThreadId == Environment.CurrentManagedThreadId)
                    {
                        <>1__state = 0;
                        <Execute>d__ = this;
                    }
                    else
                    {
                        <Execute>d__ = new <Execute>d__0(0);
                        <Execute>d__.<>4__this = <>4__this;
                    }
                    <Execute>d__.input = <>3__input;
                    return <Execute>d__;
                }
    
                [DebuggerHidden]
                IEnumerator IEnumerable.GetEnumerator()
                {
                    return ((IEnumerable<Process>)this).GetEnumerator();
                }
            }
    
            [IteratorStateMachine(typeof(<Execute>d__0))]
            public IEnumerable<Process> Execute(IEnumerable<Process> input)
            {
                <Execute>d__0 <Execute>d__ = new <Execute>d__0(-2);
                <Execute>d__.<>4__this = this;
                <Execute>d__.<>3__input = input;
                return <Execute>d__;
            }
        }
        internal class PrintProcessName : IOperation<Process>
        {
            [CompilerGenerated]
            private sealed class <Execute>d__0 : IEnumerable<Process>, IEnumerable, IEnumerator<Process>, IDisposable, IEnumerator
            {
                private int <>1__state;
    
                private Process <>2__current;
    
                private int <>l__initialThreadId;
    
                private IEnumerable<Process> input;
    
                public IEnumerable<Process> <>3__input;
    
                public PrintProcessName <>4__this;
    
                private IEnumerator<Process> <>s__1;
    
                private Process <process>5__2;
    
                Process IEnumerator<Process>.Current
                {
                    [DebuggerHidden]
                    get
                    {
                        return <>2__current;
                    }
                }
    
                object IEnumerator.Current
                {
                    [DebuggerHidden]
                    get
                    {
                        return <>2__current;
                    }
                }
    
                [DebuggerHidden]
                public <Execute>d__0(int <>1__state)
                {
                    this.<>1__state = <>1__state;
                    <>l__initialThreadId = Environment.CurrentManagedThreadId;
                }
    
                [DebuggerHidden]
                void IDisposable.Dispose()
                {
                }
    
                private bool MoveNext()
                {
                    if (<>1__state != 0)
                    {
                        return false;
                    }
                    <>1__state = -1;
                    Debug.WriteLine("PrintProcessName Enter");
                    <>s__1 = input.GetEnumerator();
                    try
                    {
                        while (<>s__1.MoveNext())
                        {
                            <process>5__2 = <>s__1.Current;
                            Debug.WriteLine("PrintProcessName  print");
                            Console.WriteLine(<process>5__2.ProcessName);
                            <process>5__2 = null;
                        }
                    }
                    finally
                    {
                        if (<>s__1 != null)
                        {
                            <>s__1.Dispose();
                        }
                    }
                    <>s__1 = null;
                    Debug.WriteLine("PrintProcessName break");
                    return false;
                }
    
                bool IEnumerator.MoveNext()
                {
                    //ILSpy generated this explicit interface implementation from .override directive in MoveNext
                    return this.MoveNext();
                }
    
                [DebuggerHidden]
                void IEnumerator.Reset()
                {
                    throw new NotSupportedException();
                }
    
                [DebuggerHidden]
                IEnumerator<Process> IEnumerable<Process>.GetEnumerator()
                {
                    <Execute>d__0 <Execute>d__;
                    if (<>1__state == -2 && <>l__initialThreadId == Environment.CurrentManagedThreadId)
                    {
                        <>1__state = 0;
                        <Execute>d__ = this;
                    }
                    else
                    {
                        <Execute>d__ = new <Execute>d__0(0);
                        <Execute>d__.<>4__this = <>4__this;
                    }
                    <Execute>d__.input = <>3__input;
                    return <Execute>d__;
                }
    
                [DebuggerHidden]
                IEnumerator IEnumerable.GetEnumerator()
                {
                    return ((IEnumerable<Process>)this).GetEnumerator();
                }
            }
    
            [IteratorStateMachine(typeof(<Execute>d__0))]
            public IEnumerable<Process> Execute(IEnumerable<Process> input)
            {
                <Execute>d__0 <Execute>d__ = new <Execute>d__0(-2);
                <Execute>d__.<>4__this = this;
                <Execute>d__.<>3__input = input;
                return <Execute>d__;
            }
        }
        internal class Pipeline<T>
        {
            private readonly List<IOperation<T>> operations = new List<IOperation<T>>();
    
            public Pipeline<T> Register(IOperation<T> operation)
            {
                operations.Add(operation);
                return this;
            }
    
            public void Execute()
            {
                IEnumerable<T> enumerable = null;
                List<IOperation<T>>.Enumerator enumerator = operations.GetEnumerator();
                try
                {
                    while (enumerator.MoveNext())
                    {
                        IOperation<T> current = enumerator.Current;
                        enumerable = current.Execute(enumerable);
                    }
                }
                finally
                {
                    ((IDisposable)enumerator).Dispose();
                }
                IEnumerator<T> enumerator2 = enumerable.GetEnumerator();
                while (enumerator2.MoveNext())
                {
                    int num = 0;
                }
            }
        }
    }
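If the decompiled output is too noisy, the core idea fits in a much smaller class. This is a hand-written, simplified imitation of the generated <Execute>d__0 for a LimitByWorkingSetSize-style filter over ints (AboveThresholdIterator is a hypothetical name). It keeps the captured input, the hidden foreach enumerator and a state field, and it leaves out the thread check, the enumerator reuse logic and the try/fault handling that the compiler adds:

```csharp
#nullable enable
using System;
using System.Collections;
using System.Collections.Generic;

// Simplified, hand-written imitation of a compiler-generated iterator class.
sealed class AboveThresholdIterator : IEnumerable<int>, IEnumerator<int>
{
    private readonly IEnumerable<int> input;   // captured parameter (<>3__input / input)
    private readonly int threshold;            // captured local (<maxSizeBytes>5__1)
    private IEnumerator<int>? source;          // the hidden foreach enumerator (<>s__2)
    private int state;                         // 0 = not started, 1 = paused at yield, -1 = done

    public AboveThresholdIterator(IEnumerable<int> input, int threshold)
    {
        this.input = input;
        this.threshold = threshold;
    }

    public int Current { get; private set; }   // <>2__current
    object IEnumerator.Current => Current;

    public bool MoveNext()
    {
        switch (state)
        {
            case 0:                            // first call: run the code before the foreach
                source = input.GetEnumerator();
                state = 1;
                break;
            case 1:                            // later calls: resume just after the last "yield return"
                break;
            default:
                return false;
        }

        while (source != null && source.MoveNext())
        {
            int n = source.Current;
            if (n > threshold)
            {
                Current = n;                   // "yield return n": store the value...
                return true;                   // ...and go back to the caller, leaving state == 1
            }
        }

        Dispose();                             // fell out of the end of the loop
        return false;
    }

    public void Dispose()
    {
        state = -1;
        source?.Dispose();
        source = null;
    }

    public void Reset() => throw new NotSupportedException();

    // Simplification: the real class only hands out itself on the first call from the same thread.
    public IEnumerator<int> GetEnumerator() => this;
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
```

foreach (int n in new AboveThresholdIterator(new[] { 10, 60, 20, 70 }, 50)) yields 60 and then 70. Because GetEnumerator returns this, an instance can only be enumerated once, which is one of the things the real generated code guards against.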
    

    And for my money I'd probably leave all this alone and just do:

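    using System.Diagnostics;
    using System.Linq;
    
    var names = Process.GetProcesses()
      .Where(p => p.WorkingSet64 > 50L * 1024 * 1024) // same 50 MB filter as LimitByWorkingSetSize
      .Select(p => p.ProcessName)
      .ToList();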
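This LINQ version is deferred in the same way: Where and Select are lazy, and nothing runs until ToList enumerates the query. A small sketch with ints standing in for processes (LinqDeferredDemo is a hypothetical name) counts predicate calls to show it:

```csharp
using System.Collections.Generic;
using System.Linq;

// Ints stand in for Process objects; the counter records when the Where predicate runs.
static class LinqDeferredDemo
{
    public static (int callsBeforeToList, int callsAfterToList, List<int> result) Run()
    {
        int predicateCalls = 0;

        IEnumerable<int> query = new[] { 10, 60, 20, 70 }
            .Where(n => { predicateCalls++; return n > 50; })
            .Select(n => n * 2);

        int before = predicateCalls;          // still 0: the query has only been described
        List<int> result = query.ToList();    // enumeration happens here
        return (before, predicateCalls, result);
    }
}
```

Run() reports 0 predicate calls before ToList, 4 after it, and the result [120, 140].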
    

    Here's a bit on yield:

    When a method returns something using yield, it is possible to get back into the method and carry on from where we left off. Look at this method that doesn't yield:

    int FibonacciSequence(){
      return 1;
      return 2;
      return 3;
      return 5;
    }
    

    When we call this, we'll only ever get 1. The first return is a hard stop; we could call it a million times and only the first return ever does anything. All the other code is totally unreachable.

    Now let's change it so it yields:

    IEnumerable<int> FibonacciSequence(){
      yield return 1;
      yield return 2;
      yield return 3;
      yield return 5;
    }
    

    It's an enumerable now, so we call it as part of something that enumerates over it:

    foreach(var v in FibonacciSequence())
      Console.WriteLine(v);
    

    The compiler transforms your foreach into a while loop that gets an enumerator and calls MoveNext over and over, but the crux of it is that when you enumerate over this yielding method, it:

    • returns 1 to the foreach loop
    • 1 is printed
    • MoveNext is called invisibly by the foreach, which makes C# go back into the method just after the "yield return 1", and this time it returns 2
    • 2 is printed
    • go back into the method after the yield return 2, return 3
    • 3 is printed
    • back in, return 5
    • print 5
    • back in, fall out of the end of the method, stopping the enumeration
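That lowering can be written out by hand. This sketch shows roughly what the compiler turns foreach (var v in FibonacciSequence()) Console.WriteLine(v); into, with a list standing in for Console.WriteLine (ForeachExpansionDemo is a hypothetical name, and the real lowering differs in detail):

```csharp
using System.Collections.Generic;

static class ForeachExpansionDemo
{
    static IEnumerable<int> FibonacciSequence()
    {
        yield return 1;
        yield return 2;
        yield return 3;
        yield return 5;
    }

    public static List<int> Run()
    {
        var printed = new List<int>();          // stands in for Console.WriteLine

        // The using declaration plays the role of the try/finally that disposes the enumerator.
        using IEnumerator<int> e = FibonacciSequence().GetEnumerator();
        while (e.MoveNext())                    // runs the method up to its next yield return
        {
            int v = e.Current;                  // the value that was just yielded
            printed.Add(v);
        }
        return printed;
    }
}
```

Run() returns [1, 2, 3, 5], the same values the foreach would print.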

    So when we yield, it's as if the method places a marker at that point and goes back to the caller; when we come back into the method (by moving the iterator along one), we resume at the marker where we left off, rather than at the first line of the method.
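The marker behavior is easy to observe by logging on both sides. In this sketch (Numbers and ResumeDemo are hypothetical names), calling Numbers(log) and GetEnumerator() runs none of the method body; "start" only appears once MoveNext is first called:

```csharp
using System.Collections.Generic;

static class ResumeDemo
{
    static IEnumerable<int> Numbers(List<string> log)
    {
        log.Add("start");                // runs on the first MoveNext, not on the call
        yield return 1;
        log.Add("resumed after 1");      // runs on the second MoveNext
        yield return 2;
        log.Add("resumed after 2");      // runs on the third MoveNext, which then returns false
    }

    public static List<string> Run()
    {
        var log = new List<string>();
        using IEnumerator<int> e = Numbers(log).GetEnumerator();

        log.Add("before first MoveNext");
        while (e.MoveNext())
            log.Add($"caller got {e.Current}");
        log.Add("enumeration finished");
        return log;
    }
}
```

ResumeDemo.Run() returns: before first MoveNext, start, caller got 1, resumed after 1, caller got 2, resumed after 2, enumeration finished.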

    You don't have to hard code your yields; you can write a method that you can enumerate forever:

    IEnumerable<int> FibonacciSequence(){
      int prevprev = 0;
      int prev = 1;
    
      while(true){
        int toReturn = prevprev + prev;
        prevprev = prev;
        prev = toReturn;
        yield return toReturn;
      }
    }
    

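    Every time it hits yield, C# goes back to the enumerating code with the new value. Because this is in a loop, it will keep generating Fibonacci numbers until something stops it: with checked arithmetic on, the addition throws an OverflowException once the values pass int.MaxValue; otherwise int silently wraps around, so collecting the results (with ToList, say) eventually runs out of memory, and simply printing them goes on forever, showing wrapped-around nonsense. For the method itself to end the enumeration, it has to either return without yielding (fall out of the end of the method) or execute yield break; the caller can also simply stop asking for values, for example with break or Take.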
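Both ways of stopping can be sketched. The caller can take a finite prefix of the infinite generator with LINQ's Take, or the generator can end itself with yield break. FibonacciUntilOverflow is a hypothetical variant that checks whether the next addition would exceed int.MaxValue before doing it:

```csharp
using System.Collections.Generic;
using System.Linq;

static class FibonacciDemo
{
    // The infinite generator from above: the caller decides when to stop.
    public static IEnumerable<int> FibonacciSequence()
    {
        int prevprev = 0;
        int prev = 1;
        while (true)
        {
            int toReturn = prevprev + prev;
            prevprev = prev;
            prev = toReturn;
            yield return toReturn;
        }
    }

    // Hypothetical variant that ends itself with "yield break" before int overflows.
    public static IEnumerable<int> FibonacciUntilOverflow()
    {
        int prevprev = 0;
        int prev = 1;
        while (true)
        {
            if (prev > int.MaxValue - prevprev)   // prevprev + prev would exceed int.MaxValue
                yield break;
            int toReturn = prevprev + prev;
            prevprev = prev;
            prev = toReturn;
            yield return toReturn;
        }
    }
}
```

FibonacciSequence().Take(6) gives 1, 2, 3, 5, 8, 13, and FibonacciUntilOverflow() stops by itself after 45 values, the last being 1836311903.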