Search code examples
c#.net-corekestrelsystem.io.pipelines

How to create a responding TCP listener with the System.IO.Pipelines package?


I want to create a TCP Listener using Kestrel and the System.IO.Pipelines package. The messages I receive will always be HL7 messages. An example message could be

MSH|^~&|MegaReg|XYZHospC|SuperOE|XYZImgCtr|20060529090131-0500||ADT^A01^ADT_A01|01052901|P|2.5 EVN||200605290901||||200605290900 PID|||56782445^^^UAReg^PI||KLEINSAMPLE^BARRY^Q^JR||19620910|M||2028-9^^HL70005^RA99113^^XYZ|260 GOODWIN CREST DRIVE^^BIRMINGHAM^AL^35209^^M~NICKELL’S PICKLES^10000 W 100TH AVE^BIRMINGHAM^AL^35200^^O|||||||0105I30001^^^99DEF^AN PV1||I|W^389^1^UABH^^^^3||||12345^MORGAN^REX^J^^^MD^0010^UAMC^L||67890^GRAINGER^LUCY^X^^^MD^0010^UAMC^L|MED|||||A0||13579^POTTER^SHERMAN^T^^^MD^0010^UAMC^L|||||||||||||||||||||||||||200605290900 OBX|1|NM|^Body Height||1.80|m^Meter^ISO+|||||F OBX|2|NM|^Body Weight||79|kg^Kilogram^ISO+|||||F AL1|1||^ASPIRIN DG1|1||786.50^CHEST PAIN, UNSPECIFIED^I9|||A

The only important thing to note is that each incoming HL7 message starts with a vertical tab character so you know where the message begins. Each HL7 message contains multiple segments so I think I will have to loop through each segment. After processing the request I want to send back a HL7 message as a response. First of all I came up with this

internal class HL7Listener : ConnectionHandler
{
    public override async Task OnConnectedAsync(ConnectionContext connection)
    {
        IDuplexPipe pipe = connection.Transport;

        await FillPipe(pipe.Output);
        await ReadPipe(pipe.Input);
    }

    private async Task FillPipe(PipeWriter pipeWriter)
    {
        const int minimumBufferSize = 512;

        while (true)
        {
            Memory<byte> memory = pipeWriter.GetMemory(minimumBufferSize);
            
            try
            {
                int bytesRead = 32; // not sure what to do here
                
                if (bytesRead == 0)
                {
                    break;
                }
                
                pipeWriter.Advance(bytesRead);
            }
            catch (Exception ex)
            {
                // ... something failed ...

                break;
            }

            FlushResult result = await pipeWriter.FlushAsync();

            if (result.IsCompleted)
            {
                break;
            }
        }

        pipeWriter.Complete();
    }

    private async Task ReadPipe(PipeReader pipeReader)
    {
        while (true)
        {
            ReadResult result = await pipeReader.ReadAsync();

            ReadOnlySequence<byte> buffer = result.Buffer;
            SequencePosition? position;

            do
            {
                position = buffer.PositionOf((byte)'\v');

                if (position != null)
                {
                    ReadOnlySequence<byte> line = buffer.Slice(0, position.Value);

                    // ... Process the line ...

                    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
                }
            }
            while (position != null);

            pipeReader.AdvanceTo(buffer.Start, buffer.End);

            if (result.IsCompleted)
            {
                break;
            }
        }

        pipeReader.Complete();
    }
}

Unfortunately I'm struggling with some things:

  • The part int bytesRead = 32;, how do I get to know how many bytes have been read? Or how to read with the writer instance?
  • Currently the debugger does not hit the code at // ... Process the line .... Basically I have to extract the whole HL7 message so I can use my HL7 parser to convert the message string.
  • Where do I have to respond? After calling await ReadPipe(pipe.Input);? By using await connection.Transport.Output.WriteAsync(/* the HL7 message to send back */);?

Solution

  • Have you seen David Fowler's TcpEcho example? I'd say that's fairly canonical as he's the one that posted the devblogs System.IO.Pipelines announcement.

    His example deals with raw sockets. I've adapted it to the ConnectionHandler API and HL7 messages (however, I know very little about HL7):

    internal class HL7Listener : ConnectionHandler
    {
        public override async Task OnConnectedAsync(ConnectionContext connection)
        {
            while (true)
            {
                var result = await connection.Transport.Input.ReadAsync();
                var buffer = result.Buffer;
    
                while (TryReadMessage(ref buffer, out ReadOnlySequence<byte> hl7Message))
                {
                    // Process the line.
                    var response = ProcessMessage(hl7Message);
                    await connection.Transport.Output.WriteAsync(response);
                }
    
                if (result.IsCompleted)
                {
                    break;
                }
    
                connection.Transport.Input.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    
        public static bool TryReadMessage(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> hl7Message)
        {
            var endOfMessage = buffer.PositionOf((byte)0x1C);
    
            if (endOfMessage == null || !TryMatchNextByte(ref buffer, endOfMessage.Value, 0x0D, out var lastBytePosition))
            {
                hl7Message = default;
                return false;
            }
    
            var messageBounds = buffer.GetPosition(1, lastBytePosition.Value); // Slice() is exclusive on the upper bound
            hl7Message = buffer.Slice(0, messageBounds);
            buffer = buffer.Slice(messageBounds); // remove message from buffer
            return true;
        }
    
        /// <summary>
        /// Does the next byte after currentPosition match the provided value?
        /// </summary>
        private static bool TryMatchNextByte(ref ReadOnlySequence<byte> buffer, SequencePosition currentPosition, byte value, out SequencePosition? nextPosition)
        {
            nextPosition = buffer.Slice(currentPosition).PositionOf(value);
            if(nextPosition == null || !nextPosition.Value.Equals(buffer.GetPosition(1, currentPosition)))
            {
                nextPosition = null;
                return false;
            }
            return true;
        }
    
        private ReadOnlyMemory<byte> ProcessMessage(ReadOnlySequence<byte> hl7Message)
        {
            var incomingMessage = Encoding.UTF8.GetString(hl7Message.ToArray());
            // do something with the message and generate your response. I'm using UTF8 here
            // but not sure if that's valid for HL7.
            return Encoding.UTF8.GetBytes("Response message: OK!");
        }
    }
    

    Update: Added the most recent information about HL7 messages structure.