How can the television channel problem as explained in this talk at 31th minute be solved by RX ?
The problem expressed in Rx is as follows:
The are two television channels (channel1
and channel2
) which transmit a stream of images, plus a stream of fuzz
which represents no channel or white noise.
There are two buttons which send events eButton1
and eButton2
when they are pressed.
These button presses should result in the respective channels being sent to the screen.
Each button press should be projected (mapped) into the respective channel, and then all channels combined into a selection stream as stream of streams which starts with the fuzz
stream. Finally a switch operator sends the selected stream to the screen
.
What is the equivalent of Sodiums'switch and merge in RX?
Is it possible to solve it with pure higher order functions ? I.e. without using closures ? I don't see how that would be possible.
Switch and Merge both exist in the core Rx library, so happily the code in the slides actually translates almost verbatim line for line into Rx.
The Switch
operator works on a stream of streams - in Rx this is a type IObservable<IObservable<T>>
.
Switch flattens this stream of streams sending only the most recent stream to it's output, so you end up with an IObservable<T>
.
See the C# sample below. I've re-used the variable names in the talk as far as possible, so this should be easy to follow.
The only thing that's (very slightly) different is the hold
function is replaced with the Rx equivalent StartWith
.
Include nuget package Rx-Main
and run this as a console app. The code subscribes to the screen
stream and starts rendering frames from the "Fuzz" channel to the console. It will prompt you for a channel number. Enter 1 or 2 and you'll see the output switch to frames from the corresponding channel.
// helper method to create channels
private static IObservable<string> CreateChannelStream(
string name, CompositeDisposable disposables)
{
// this hacks together a demo channel stream -
// a stream of "frames" for the channel
// for simplicity rather than using images, I use a string
// message for each frame
// how it works isn't important, just know you'll get a
// message event every second
var channel = Observable.Interval(TimeSpan.FromSeconds(1))
.Select(x => name + " Frame: " + x)
.Publish();
disposables.Add(channel.Connect());
return channel;
}
public static void Main()
{
// for cleaning up the hot channel streams
var disposable = new CompositeDisposable();
// some channels
var fuzz = CreateChannelStream("Fuzz", disposable);
var channel1 = CreateChannelStream("Channel1", disposable);
var channel2 = CreateChannelStream("Channel2", disposable);
// the button press event streams
var eButton1 = new Subject<Unit>();
var eButton2 = new Subject<Unit>();
// the button presses are projected to
// the respective channel streams
// note, you could obtain the channel via a function call here
// if you wanted to - to keep it close to the slides I'm not.
var eChan1 = eButton1.Select(_ => channel1);
var eChan2 = eButton2.Select(_ => channel2);
// create the selection "stream of streams"
// an IObservable<IObservable<string>> here
// that starts with "fuzz"
var sel = Observable.Merge(eChan1, eChan2).StartWith(fuzz);
// flatten and select the most recent stream with Switch
var screen = sel.Switch();
// subscribe to the screen and print the frames
// it will start with "fuzz"
disposable.Add(screen.Subscribe(Console.WriteLine));
bool quit = false;
// a little test loop
// entering 1 or 2 will switch
// to that channel
while(!quit)
{
var chan = Console.ReadLine();
switch (chan.ToUpper())
{
case "1":
// raise a button 1 event
eButton1.OnNext(Unit.Default);
break;
case "2":
// raise a button 2 event
eButton2.OnNext(Unit.Default);
break;
case "Q":
quit = true;
break;
}
}
disposable.Dispose();
}