c#, system.reactive, udpclient

Merging Deferred Observables from UDP


I'm trying to merge a set of deferred observables (each originating from a UdpClient.ReceiveAsync call) into one observable I can subscribe to. I'm new to Reactive Extensions and I'm sure I'm doing something wrong with the deferring.

Log.Information("InboundUdpListener is starting");

IObservable<UdpReceiveResult> receiveStream = null;

IList<IObservable<UdpReceiveResult>> receivingStreams = new List<IObservable<UdpReceiveResult>>();

foreach (var devicePortMapping in _deviceTypeMapper.GetDeviceTypes())
{
      Log.Information("InboundUdpListener is starting for DeviceType: {DeviceType}, Port: {Port}",
      devicePortMapping.DeviceType, devicePortMapping.Port);

      var client = new UdpClient(devicePortMapping.Port);

      receivingStreams.Add(Observable.Defer(() => client
          .ReceiveAsync()
          .ToObservable())
          .Repeat());

      _clients.Add(client);
}

receiveStream = receivingStreams.Merge();

_listener = receiveStream.Subscribe(async r =>
{
    Log.Information("InboundUdpListener received {BytesReceived} bytes from IPAddress : {IPAddress}, Port : {Port}", r.Buffer.Length, r.RemoteEndPoint.Address.MapToIPv4(),r.RemoteEndPoint.Port);

    var message = new IncomingMessage(r.RemoteEndPoint, r.Buffer);

    var deviceTypeMap = _deviceTypeMapper.GetDeviceType(message);

    message.DeviceType = deviceTypeMap?.DeviceType ?? DeviceTypeEnum.UnIdentified;

    Log.Information("InboundUdpListener is publishing message {@Message}", message);

    await _messagePublisher.Publish(message);
});

Log.Information("InboundUdpListener is started");

Solution

  • You are definitely thinking in a procedural manner, rather than functional.

    You need to try to keep your processing within the observable and avoid foreach and temporary lists of observables.

    You are also using a UdpClient to produce your values - this object is disposable so your observable should manage its lifetime for you. You do that with Observable.Using.
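
To illustrate the lifetime point, here is a minimal sketch of how Observable.Using ties a disposable resource to a subscription. FakeClient is a hypothetical stand-in for UdpClient (so the example needs no sockets), and Observable.Never is used only to keep the stream open until it is unsubscribed.

```csharp
using System;
using System.Reactive.Linq;

public static class UsingSketch
{
    // Hypothetical stand-in for UdpClient that records whether it was disposed.
    public sealed class FakeClient : IDisposable
    {
        public bool Disposed { get; private set; }
        public void Dispose() => Disposed = true;
    }

    public static bool DisposesWithSubscription()
    {
        FakeClient created = null;

        // The resource factory runs once per subscription, not when the query is built.
        IObservable<int> stream = Observable.Using(
            () => created = new FakeClient(),
            client => Observable.Never<int>());

        IDisposable subscription = stream.Subscribe();
        bool aliveWhileSubscribed = created != null && !created.Disposed;

        // Unsubscribing disposes the resource that Using created.
        subscription.Dispose();

        return aliveWhileSubscribed && created.Disposed;
    }
}
```

With a real UdpClient the same shape should mean the socket is closed when the subscription is disposed or the stream ends, so a separate list of clients would no longer be needed.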

    Also, async methods can be consumed with Observable.FromAsync so you should be using that too.

    So, given all of that your receiveStream should probably look like this:

    IObservable<UdpReceiveResult> receiveStream =
        from devicePortMapping in _deviceTypeMapper.GetDeviceTypes().ToObservable()
        from stream in
            Observable
                .Using(
                    () => new UdpClient(devicePortMapping.Port),
                    client =>
                        Observable
                            .FromAsync(() => client.ReceiveAsync())
                            .Repeat())
        select stream;
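
For what it's worth, the two from clauses in that query compile to SelectMany, which is what does the merging here. The sketch below compares the query form with an explicit Merge over a list of streams. FakeListener and the port numbers are made up for illustration and simply emit a few labelled values instead of reading from a socket.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class MergeSketch
{
    static readonly int[] Ports = { 5000, 5001 };

    // Hypothetical stand-in for one UDP listener: emits three labelled values.
    static IObservable<string> FakeListener(int port) =>
        Observable.Range(1, 3).Select(i => $"{port}:{i}");

    // Query syntax: two 'from' clauses become a SelectMany over the ports.
    public static IList<string> ViaQuery()
    {
        var query =
            from port in Ports.ToObservable()
            from item in FakeListener(port)
            select item;

        return query.ToList().Wait();
    }

    // Method syntax: build one stream per port, then merge them explicitly.
    public static IList<string> ViaMerge()
    {
        var merged = Ports.Select(p => FakeListener(p)).Merge();
        return merged.ToList().Wait();
    }
}
```

Both forms should yield the same set of items. The relative order of items from different listeners is not something to rely on in either form.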
    

    Now, given what I see in your subscribe call, you can probably go one step further and do this:

    IObservable<IncomingMessage> receiveStream =
        from devicePortMapping in _deviceTypeMapper.GetDeviceTypes().ToObservable()
        from stream in
            Observable
                .Using(
                    () => new UdpClient(devicePortMapping.Port),
                    client =>
                        Observable
                            .FromAsync(() => client.ReceiveAsync())
                            .Repeat())
        select new IncomingMessage(stream.RemoteEndPoint, stream.Buffer)
        {
            DeviceType = devicePortMapping.DeviceType
        };
    

    This now means that you have access to the original device type in the query itself, so no need to look it up - if I have correctly understood what your code is doing.

    If you do need to do the lookup then you should do it as part of the query. Try doing it this way:

    IObservable<IncomingMessage> receiveStream =
        from devicePortMapping in _deviceTypeMapper.GetDeviceTypes().ToObservable()
        from stream in
            Observable
                .Using(
                    () => new UdpClient(devicePortMapping.Port),
                    client =>
                        Observable
                            .FromAsync(() => client.ReceiveAsync())
                            .Repeat())
        from message in Observable.Start(() =>
        {
            // Build the message from the received datagram, then look up its device type.
            var incoming = new IncomingMessage(stream.RemoteEndPoint, stream.Buffer);
            var deviceTypeMap = _deviceTypeMapper.GetDeviceType(incoming);
            incoming.DeviceType = deviceTypeMap?.DeviceType ?? DeviceTypeEnum.UnIdentified;
            return incoming;
        })
        select message;
    

    It's probably best to use the second query if you can.
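
One more thing you might consider: the async lambda passed to Subscribe in the question is effectively async void, so exceptions from Publish never reach the observable's error handler. A possible sketch of moving the publish step into the pipeline follows. PublishAsync is a hypothetical stand-in for _messagePublisher.Publish, and plain strings stand in for IncomingMessage.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class PublishSketch
{
    // Records what was "published" so the behaviour can be inspected.
    public static readonly List<string> Published = new List<string>();

    // Hypothetical stand-in for _messagePublisher.Publish(message).
    static async Task PublishAsync(string message)
    {
        await Task.Yield();
        lock (Published) Published.Add(message);
    }

    // Wrap each publish in a cold observable and run them one at a time, in order.
    public static IObservable<Unit> PublishAll(IObservable<string> messages) =>
        messages
            .Select(m => Observable.FromAsync(() => PublishAsync(m)))
            .Concat();
}
```

You would then subscribe with something like `PublishSketch.PublishAll(source).Subscribe(_ => { }, ex => Console.Error.WriteLine(ex))`, so a failed publish surfaces through OnError. Concat keeps the publishes sequential; Merge would be the alternative if ordering does not matter.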