Search code examples
c#system.reactivereactive-programmingsodium

Switching streams in RX: Sodium's equivalent of merge and switch in RX


How can the television channel problem as explained in this talk at 31th minute be solved by RX ?

The problem expressed in Rx is as follows:

The are two television channels (channel1 and channel2) which transmit a stream of images, plus a stream of fuzz which represents no channel or white noise.

There are two buttons which send events eButton1 and eButton2 when they are pressed.

These button presses should result in the respective channels being sent to the screen.

Each button press should be projected (mapped) into the respective channel, and then all channels combined into a selection stream as stream of streams which starts with the fuzz stream. Finally a switch operator sends the selected stream to the screen.

Question

What is the equivalent of Sodiums'switch and merge in RX?

Is it possible to solve it with pure higher order functions ? I.e. without using closures ? I don't see how that would be possible.

enter image description here


Solution

  • Switch and Merge both exist in the core Rx library, so happily the code in the slides actually translates almost verbatim line for line into Rx.

    The Switch operator works on a stream of streams - in Rx this is a type IObservable<IObservable<T>>.

    Switch flattens this stream of streams sending only the most recent stream to it's output, so you end up with an IObservable<T>.

    See the C# sample below. I've re-used the variable names in the talk as far as possible, so this should be easy to follow.

    The only thing that's (very slightly) different is the hold function is replaced with the Rx equivalent StartWith.

    Include nuget package Rx-Main and run this as a console app. The code subscribes to the screen stream and starts rendering frames from the "Fuzz" channel to the console. It will prompt you for a channel number. Enter 1 or 2 and you'll see the output switch to frames from the corresponding channel.

    // helper method to create channels
    private static IObservable<string> CreateChannelStream(
        string name, CompositeDisposable disposables)
    {
        // this hacks together a demo channel stream -
        // a stream of "frames" for the channel
        // for simplicity rather than using images, I use a string
        // message for each frame
        // how it works isn't important, just know you'll get a
        // message event every second
        var channel = Observable.Interval(TimeSpan.FromSeconds(1))
                                .Select(x => name + " Frame: " + x)
                                .Publish();
        disposables.Add(channel.Connect());
        return channel;
    }
    
    public static void Main()
    {       
        // for cleaning up the hot channel streams
        var disposable = new CompositeDisposable();
    
        // some channels
        var fuzz = CreateChannelStream("Fuzz", disposable);
        var channel1 = CreateChannelStream("Channel1", disposable);
        var channel2 = CreateChannelStream("Channel2", disposable);
    
        // the button press event streams
        var eButton1 = new Subject<Unit>();
        var eButton2 = new Subject<Unit>();
    
        // the button presses are projected to
        // the respective channel streams
        // note, you could obtain the channel via a function call here
        // if you wanted to - to keep it close to the slides I'm not.
        var eChan1 = eButton1.Select(_ => channel1);
        var eChan2 = eButton2.Select(_ => channel2);
    
        // create the selection "stream of streams"
        // an IObservable<IObservable<string>> here
        // that starts with "fuzz"
        var sel = Observable.Merge(eChan1, eChan2).StartWith(fuzz);
    
        // flatten and select the most recent stream with Switch
        var screen = sel.Switch();
    
        // subscribe to the screen and print the frames
        // it will start with "fuzz"
        disposable.Add(screen.Subscribe(Console.WriteLine));
    
        bool quit = false;
    
        // a little test loop
        // entering 1 or 2 will switch
        // to that channel
        while(!quit)
        {
            var chan = Console.ReadLine();
            switch (chan.ToUpper())
            {
                case "1":
                    // raise a button 1 event
                    eButton1.OnNext(Unit.Default);
                    break;
                case "2":
                    // raise a button 2 event
                    eButton2.OnNext(Unit.Default);
                    break;  
                case "Q":
                    quit = true;
                    break;                
            }
        }         
    
        disposable.Dispose();
    }