c#, .net, system.reactive, http-streaming

How do I receive live data from a long lived HTTP connection in C#?


I want to process events from a long-lived HTTP connection as they arrive. The server I am integrating with returns one line of JSON (\n delimited) per "event" I wish to process.

Given an instance of Stream assigned to the variable changeStream that represents bytes from the open HTTP connection, here's an extracted example of what I'm doing:

(request is an instance of WebRequest, configured to open a connection to the server I am integrating with.)

var response = request.GetResponse();
var changeStream = response.GetResponseStream();

var lineByLine = Observable.Using(
    () => new StreamReader(changeStream),
    (streamReader) => streamReader.ReadLineAsync().ToObservable()
);

lineByLine.Subscribe((string line) =>
{
    System.Console.WriteLine($"JSON! ---------=> {line}");
});

With the code above, I receive the first line that the server sends but none after that: neither the remaining lines from the initial response nor new ones generated by real-time activity.

  • For the purposes of my question, assume this connection will remain open indefinitely.
  • How do I get System.Reactive to trigger a callback for each line as it arrives?

Please note: this scenario is not a candidate for switching to SignalR.


Solution

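  • The code in the question yields only one line because ReadLineAsync() returns a single Task<string>, and ToObservable() turns that task into an observable that emits one value and then completes. The read has to be repeated until the stream ends. Even though this looks more complicated, it is better to use the built-in operators to make this work.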

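    // Each Using disposes its resource (response, stream, reader) when the query ends.
    IObservable<string> query =
        Observable
            .FromAsync(() => request.GetResponseAsync())
            .SelectMany(
                r => Observable.Using(
                    () => r,
                    r2 => Observable.Using(
                        () => r2.GetResponseStream(),
                        rs => Observable.Using(
                            () => new StreamReader(rs),
                            sr =>
                                Observable
                                    // Read one line per subscription, on the default scheduler.
                                    .Defer(() => Observable.Start(() => sr.ReadLine()))
                                    // Resubscribe after each line to read the next one.
                                    .Repeat()
                                    // A null line means the stream has ended.
                                    .TakeWhile(w => w != null)))));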
    

    It's untested, but it should be close.
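
To see the difference without a live server, here is a minimal sketch that runs both approaches against an in-memory stream. It assumes the System.Reactive NuGet package is referenced. The names LineStreamDemo, MakeStream, SingleRead and AllLines are hypothetical, and the MemoryStream with three JSON lines is only a stand-in for the HTTP response stream.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Text;

public static class LineStreamDemo
{
    // Stand-in for the HTTP response stream: three newline-delimited JSON events.
    public static Stream MakeStream() =>
        new MemoryStream(Encoding.UTF8.GetBytes("{\"id\":1}\n{\"id\":2}\n{\"id\":3}\n"));

    // The question's approach: a single ReadLineAsync task becomes
    // an observable that emits one value and then completes.
    public static IObservable<string> SingleRead(Stream stream) =>
        Observable.Using(
            () => new StreamReader(stream),
            reader => reader.ReadLineAsync().ToObservable());

    // The answer's approach: Defer + Repeat performs a fresh read on every
    // resubscription, and TakeWhile stops once ReadLine returns null.
    public static IObservable<string> AllLines(Stream stream) =>
        Observable.Using(
            () => new StreamReader(stream),
            reader => Observable
                .Defer(() => Observable.Start(() => reader.ReadLine()))
                .Repeat()
                .TakeWhile(line => line != null));

    public static void Main()
    {
        // ToList().Wait() blocks until the sequence completes. That is fine for
        // this finite in-memory stream, but not for an open-ended connection.
        IList<string> single = SingleRead(MakeStream()).ToList().Wait();
        IList<string> all = AllLines(MakeStream()).ToList().Wait();

        Console.WriteLine($"single read: {single.Count} line(s)");   // 1 line
        Console.WriteLine($"repeated read: {all.Count} line(s)");    // 3 lines
    }
}
```

Against the real connection you would subscribe rather than block, for example query.Subscribe(line => Console.WriteLine(line)), and keep the returned IDisposable so the connection can be closed by disposing it.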