I'm trying to create an Rx-based WebRequestFactory to consume any generic API.
I've got a basic GET request more or less working:
public static IObservable<T> GetData<T>(Uri uri, Func<string, T> generator)
{
    System.Diagnostics.Debug.WriteLine(uri);
    return (from request in Observable.Return(CreateWebRequest(uri))
            from response in Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)()
            select generator(new StreamReader(response.GetResponseStream()).ReadToEnd())).ObserveOnDispatcher();
}

private static WebRequest CreateWebRequest(Uri uri)
{
    var ret = (HttpWebRequest)WebRequest.Create(uri);
    ret.AllowReadStreamBuffering = false;
    return ret;
}
This works well, but I'm unsure how exception handling is done in Rx. Is it done only in the Subscribe() method, i.e. Subscribe(onNext: response => handleResponse(response), onError: error => handleError(error))? Or can I catch it earlier on?
I'm also having difficulty writing a POST version of GetData. I currently have:
public static IObservable<T> PostData<T>(Uri uri, Func<string, T> generator, String postData)
{
    System.Diagnostics.Debug.WriteLine(uri);
    byte[] buffer = Encoding.UTF8.GetBytes(postData);
    var request = CreatePOSTWebRequest(uri);
    var obs1 = Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)();
    return null;
}

private static WebRequest CreatePOSTWebRequest(Uri uri)
{
    var ret = (HttpWebRequest)WebRequest.Create(uri);
    ret.Method = "POST";
    ret.AllowReadStreamBuffering = false;
    return ret;
}
But I have no idea how to connect sending the POST data with retrieving the server response.
It's my first time using async HTTP requests, so any help would be much appreciated.
Edit: (I hope this is the Stack Overflow recommended way of adding to my question)
I think I have solved it by using a WebClient instead:
public static IObservable<T> PostData<T>(Uri uri, Func<string, T> generator, String postData)
{
    System.Diagnostics.Debug.WriteLine(uri);
    WebClient wc = CreatePostWebClient(uri, postData);
    return (from e in Observable.FromEvent<UploadStringCompletedEventArgs>(wc, "UploadStringCompleted")
            select generator(e.EventArgs.Result)).ObserveOnDispatcher();
}

public static WebClient CreatePostWebClient(Uri uri, string postData)
{
    var wc = new WebClient();
    wc.AllowReadStreamBuffering = false;
    wc.UploadStringAsync(uri, postData);
    return wc;
}
But I'm a little concerned that this has a race condition, since I start the upload before I begin observing for the response.
You are correct: error handling is done by sending an OnError message down the pipeline and unsubscribing from the source. There are operators that can change this behavior, like Catch, which can provide a new sequence to use instead (for example, returning a default value using Observable.Return).
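As a rough illustration of catching an error before it reaches Subscribe, here is a minimal sketch. It assumes the System.Reactive package and its System.Reactive.Linq namespace (older Rx builds expose the same operators under different namespaces), and FailingRequest is a hypothetical stand-in for a web request that fails, not part of the code in the question.

```csharp
using System;
using System.Reactive.Linq;

static class CatchSketch
{
    // Hypothetical stand-in for a request pipeline that fails with an exception.
    static IObservable<string> FailingRequest()
    {
        return Observable.Throw<string>(
            new InvalidOperationException("simulated failure"));
    }

    static void Main()
    {
        FailingRequest()
            // Swap the failed sequence for a one-element fallback sequence.
            .Catch((InvalidOperationException ex) => Observable.Return("default"))
            .Subscribe(
                value => Console.WriteLine("OnNext: " + value),
                error => Console.WriteLine("OnError: " + error.Message),
                () => Console.WriteLine("OnCompleted"));
    }
}
```

Because Catch replaces the failed sequence, the subscriber here should see the fallback value followed by completion, and the onError handler is not invoked. Exceptions of a type other than the one named in the Catch lambda would still flow through to onError.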
As for posting data, it shouldn't be too difficult unless you plan on asynchronously buffering the request data so as not to block the client (in which case it becomes significantly more complex). As long as it's just post data, it should be something like:
// buffer is the UTF-8 encoded postData, as in the question's PostData
return (from request in Observable.Return(CreatePOSTWebRequest(uri))
        from requestStream in Observable.FromAsyncPattern<Stream>(
            request.BeginGetRequestStream, request.EndGetRequestStream)()
        from response in PostAndGetResponse(request, requestStream, buffer)
        select generator(new StreamReader(response.GetResponseStream()).ReadToEnd())
       ).ObserveOnDispatcher();

private static IObservable<WebResponse> PostAndGetResponse(
    WebRequest request, Stream requestStream, byte[] buffer)
{
    // Write the POST body, then close the stream before asking for the response
    using (requestStream)
    {
        requestStream.Write(buffer, 0, buffer.Length);
    }
    return Observable.FromAsyncPattern<WebResponse>(
        request.BeginGetResponse, request.EndGetResponse)();
}
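On the ordering concern raised in the question's WebClient edit, one possible approach is to start the upload only once a subscriber has attached to the completion event. The sketch below is an assumption-laden illustration, not the WebClient API: FakeUploader, ResultEventArgs and Upload are hypothetical names, and FakeUploader raises its event synchronously so the example runs without a network. It assumes the System.Reactive package.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical event args carrying the "server" result.
class ResultEventArgs : EventArgs
{
    public ResultEventArgs(string result) { Result = result; }
    public string Result { get; private set; }
}

// Hypothetical stand-in for WebClient: raises Completed as soon as Start is called.
class FakeUploader
{
    public event EventHandler<ResultEventArgs> Completed;

    public void Start(string data)
    {
        var handler = Completed;
        if (handler != null) handler(this, new ResultEventArgs("echo:" + data));
    }
}

static class DeferredUploadSketch
{
    static IObservable<string> Upload(string data)
    {
        // Nothing happens until someone subscribes to the returned observable.
        return Observable.Create<string>(observer =>
        {
            var uploader = new FakeUploader();
            // Attach to the completion event first...
            var subscription = Observable
                .FromEventPattern<ResultEventArgs>(
                    h => uploader.Completed += h,
                    h => uploader.Completed -= h)
                .Select(e => e.EventArgs.Result)
                .Take(1) // a single upload yields a single result
                .Subscribe(observer);
            // ...and only then start the upload, so the event cannot be missed.
            uploader.Start(data);
            return subscription;
        });
    }

    static void Main()
    {
        Upload("hello").Subscribe(value => Console.WriteLine(value));
    }
}
```

The design choice is that the side effect (starting the upload) lives inside the subscription logic, so each subscriber triggers its own upload and the event handler is always in place before the upload begins.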