I'm looking to process the results of a long-lived HTTP connection from a server I am integrating with as they happen. This server returns one line of JSON (\n
delimited) per "event" I wish to process.
Given an instance of Stream
assigned to the variable changeStream
that represents bytes from the open HTTP connection, here's an extracted example of what I'm doing:
(request
is an instance of WebRequest
, configured to open a connection to the server I am integrating with.)
var response = request.GetResponse();
var changeStream = response.GetResponseStream();
var lineByLine = Observable.Using(
() => new StreamReader(changeStream),
(streamReader) => streamReader.ReadLineAsync().ToObservable()
);
lineByLine.Subscribe((string line) =>
{
System.Console.WriteLine($"JSON! ---------=> {line}");
});
Using the code above, what ends up happening is I receive the first line that the server sends, but then none after that. Neither ones from the initial response, nor new ones generated by real time activity.
Please note: This scenario is not a candidate for switching to SignalR
Even though this looks more complicated, it is better to use the built-in operators to make this work.
IObservable<string> query =
Observable
.FromAsync(() => request.GetResponseAsync())
.SelectMany(
r => Observable.Using(
() => r,
r2 => Observable.Using(
() => r2.GetResponseStream(),
rs => Observable.Using(
() => new StreamReader(rs),
sr =>
Observable
.Defer(() => Observable.Start(() => sr.ReadLine()))
.Repeat()
.TakeWhile(w => w != null)))));
It's untested, but it should be close.