I am writing a class library which connects to the Twitter Streaming API and processes the continuous JSON stream in real-time. I want to raise an event every time a new tweet is received from the API so that I can make use of a lambda method in the caller class like so:
    var stream = new Stream(customerKey, customerSecret, accessToken, accessTokenSecret);
    stream.Start();
    // Handler
    stream.TweetReceivedEvent += (sender, tweetargs) =>
    {
        Console.WriteLine(tweetargs.Tweet.ToString());
    };
However, I am unsure of how to do this. At the moment, I have created a class called Stream that holds the logic for connecting to the Twitter Streaming API (only the Start() method is shown below, for brevity):
    public async Task Start()
    {
        //Twitter Streaming API
        string stream_url = "https://stream.twitter.com/1.1/statuses/filter.json";
        string trackKeywords = "twitter";
        string followUserId = "";
        string locationCoord = "";
        string postparameters = (trackKeywords.Length == 0 ? string.Empty : "&track=" + trackKeywords) +
                                (followUserId.Length == 0 ? string.Empty : "&follow=" + followUserId) +
                                (locationCoord.Length == 0 ? string.Empty : "&locations=" + locationCoord);
        if (!string.IsNullOrEmpty(postparameters))
        {
            if (postparameters.IndexOf('&') == 0)
                postparameters = postparameters.Remove(0, 1).Replace("#", "%23");
        }
        //Connect
        webRequest = (HttpWebRequest) WebRequest.Create(stream_url);
        webRequest.Timeout = -1;
        webRequest.Headers.Add("Authorization", GetAuthHeader(stream_url + "?" + postparameters));
        Encoding encode = Encoding.GetEncoding("utf-8");
        if (postparameters.Length > 0)
        {
            webRequest.Method = "POST";
            webRequest.ContentType = "application/x-www-form-urlencoded";
            byte[] _twitterTrack = encode.GetBytes(postparameters);
            webRequest.ContentLength = _twitterTrack.Length;
            var _twitterPost = webRequest.GetRequestStream();
            _twitterPost.Write(_twitterTrack, 0, _twitterTrack.Length);
            _twitterPost.Close();
        }
        webRequest.BeginGetResponse(ar =>
        {
            var req = (WebRequest)ar.AsyncState;
            using (var response = req.EndGetResponse(ar))
            {
                using (var reader = new StreamReader(response.GetResponseStream()))
                {
                    while (!reader.EndOfStream)
                    {
                        // Deserialize the JSON obj to type Tweet
                        var jsonObj = JsonConvert.DeserializeObject<Tweet>(reader.ReadLine(), new JsonSerializerSettings());
                        Console.WriteLine(jsonObj.Text);
                        Raise(TweetReceivedEvent, new TweetReceivedEventArgs(jsonObj));
                    }
                }
            }
        }, webRequest);
    }
Inside of the same class, I have created an event and a delegate like so:
    public event TweetReceivedHandler TweetReceivedEvent;
    public delegate void TweetReceivedHandler(TwitterStreamClient s, TweetEventArgs e);
And the Raise method calls the event handler that is defined in my tester class:
    public void Raise(TweetReceivedHandler handler, TweetEventArgs e)
    {
        if (handler != null)
        {
            handler(this, e);
        }
    }
However, when I debug and step through the Raise method, the handler is always null. What am I missing here? As you can see, I've taken some steps towards making this method async and returning a Task, although I am unsure that this is the correct course of action.
If you need any clarification then feel free to ask, and I will be eternally grateful to you if you could explain what it is I need to do! Apologies in advance if I've got the wrong end of the stick completely!
A (slightly older) version of this class exists at https://github.com/adaam2/APoorMansTwitterStreamingClient/blob/master/TwitterClient/Infrastructure/Utility/TwitterStreamClient.cs if you want to check out the full code.
As Hans suggests, the key to seeing the event that is raised is to make sure you have subscribed to it before it's possible for the event to be raised. Otherwise, it could wind up being raised before you get a chance to subscribe to it. E.g.:
    var stream = new Stream(customerKey, customerSecret, accessToken, accessTokenSecret);
    // Handler
    stream.TweetReceivedEvent += (sender, tweetargs) =>
    {
        Console.WriteLine(tweetargs.Tweet.ToString());
    };
    stream.Start();
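To see why the order matters, here is a minimal, self-contained sketch. FakeTweetSource is a hypothetical stand-in for the real client (it is not part of the question's code) and raises its event synchronously inside Start(), which is the simplest case in which a late subscriber misses everything:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the Twitter client: raises one event per "tweet".
public class FakeTweetSource
{
    public event EventHandler<string> TweetReceived;

    public void Start()
    {
        foreach (var text in new[] { "first", "second" })
        {
            // With no subscribers the event delegate is null, so nothing is invoked.
            TweetReceived?.Invoke(this, text);
        }
    }
}

public static class SubscriptionOrderDemo
{
    public static List<string> SubscribeThenStart()
    {
        var received = new List<string>();
        var source = new FakeTweetSource();
        source.TweetReceived += (sender, text) => received.Add(text);
        source.Start();
        return received;
    }

    public static List<string> StartThenSubscribe()
    {
        var received = new List<string>();
        var source = new FakeTweetSource();
        source.Start(); // events are raised here, before anyone is listening
        source.TweetReceived += (sender, text) => received.Add(text);
        return received;
    }

    public static void Main()
    {
        Console.WriteLine(SubscribeThenStart().Count); // prints 2
        Console.WriteLine(StartThenSubscribe().Count); // prints 0
    }
}
```

With a real network stream the window is a race rather than a certainty, but subscribing first removes the race entirely.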
As far as fixing your Start() method goes, the key to writing an async method is to make sure you use await in it. Ideally, all asynchronous operations initiated in the method will be awaitable, so that you can use the simplified await syntax throughout.
In your example, I would recommend something more like this:
    public async Task Start()
    {
        //Twitter Streaming API
        string stream_url = "https://stream.twitter.com/1.1/statuses/filter.json";
        string trackKeywords = "twitter";
        string followUserId = "";
        string locationCoord = "";
        string postparameters = (trackKeywords.Length == 0 ? string.Empty : "&track=" + trackKeywords) +
                                (followUserId.Length == 0 ? string.Empty : "&follow=" + followUserId) +
                                (locationCoord.Length == 0 ? string.Empty : "&locations=" + locationCoord);
        if (!string.IsNullOrEmpty(postparameters))
        {
            if (postparameters.IndexOf('&') == 0)
                postparameters = postparameters.Remove(0, 1).Replace("#", "%23");
        }
        //Connect
        webRequest = (HttpWebRequest) WebRequest.Create(stream_url);
        webRequest.Timeout = -1;
        webRequest.Headers.Add("Authorization", GetAuthHeader(stream_url + "?" + postparameters));
        Encoding encode = Encoding.GetEncoding("utf-8");
        if (postparameters.Length > 0)
        {
            webRequest.Method = "POST";
            webRequest.ContentType = "application/x-www-form-urlencoded";
            byte[] _twitterTrack = encode.GetBytes(postparameters);
            webRequest.ContentLength = _twitterTrack.Length;
            // using guarantees the request stream is closed even if the write throws
            using (var _twitterPost = await webRequest.GetRequestStreamAsync())
            {
                await _twitterPost.WriteAsync(_twitterTrack, 0, _twitterTrack.Length);
            }
        }
        using (var response = await webRequest.GetResponseAsync())
        {
            using (var reader = new StreamReader(response.GetResponseStream()))
            {
                string line;
                // ReadLineAsync() returns null at end of stream; testing EndOfStream
                // instead can block the thread with a synchronous read.
                while ((line = await reader.ReadLineAsync()) != null)
                {
                    // The streaming API sends blank keep-alive lines; skip them.
                    if (line.Length == 0)
                        continue;
                    // Deserialize the JSON obj to type Tweet
                    var jsonObj = JsonConvert.DeserializeObject<Tweet>(line, new JsonSerializerSettings());
                    Console.WriteLine(jsonObj.Text);
                    Raise(TweetReceivedEvent, new TweetReceivedEventArgs(jsonObj));
                }
            }
        }
    }
Note that I used an ...Async() method in four places in the above: getting the request stream, writing to the request stream, getting the response, and processing the response stream. In this way, the logic of the method can be written in a direct, step-by-step way while still allowing for asynchronous operation. That is, the method returns to its caller and the current thread can carry on with other work while these asynchronous operations are in progress; the method resumes execution later, when they complete.
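The "returns now, resumes later" behaviour can be observed directly with a small sketch. Everything here (AwaitOrderDemo, WorkAsync, the gate task) is hypothetical and exists only for illustration; a TaskCompletionSource stands in for a network operation that has not finished yet:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AwaitOrderDemo
{
    public static readonly List<string> Log = new List<string>();

    // Runs synchronously until the first await on an incomplete task,
    // then hands control back to the caller.
    static async Task WorkAsync(Task gate)
    {
        Log.Add("work: started");
        await gate;
        Log.Add("work: resumed");
    }

    public static async Task RunAsync()
    {
        Log.Clear();
        var gate = new TaskCompletionSource<bool>();

        Task work = WorkAsync(gate.Task);   // returns at the await, not at the end
        Log.Add("caller: continues");

        gate.SetResult(true);               // the pretend I/O completes
        await work;                         // completes only after WorkAsync has finished
        Log.Add("caller: work finished");
    }

    public static void Main()
    {
        RunAsync().GetAwaiter().GetResult();
        foreach (var entry in Log)
            Console.WriteLine(entry);
    }
}
```

The log shows the caller continuing before the awaited work resumes, and the final entry appearing only once the returned Task has completed.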
Most importantly, doing it this way ensures that the Task object returned by the method does not itself complete until the whole operation has completed. You can therefore rely on the Task object itself to signal that the stream has ended (or failed), rather than adding a separate event for that; the per-tweet event still delivers each tweet as it arrives. Your call site then looks like this instead:
    var stream = new Stream(customerKey, customerSecret, accessToken, accessTokenSecret);
    // Handler
    stream.TweetReceivedEvent += (sender, tweetargs) =>
    {
        Console.WriteLine(tweetargs.Tweet.ToString());
    };
    // Completes only when the stream ends or fails
    await stream.Start();
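To illustrate how the returned Task relates to the per-line work, here is a minimal sketch that reads from an in-memory stream instead of the network. LineSource and LineReceived are hypothetical names, and the input data is made up; the point is only that the Task completes after the last line has been processed:

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading.Tasks;

// Hypothetical reader with the same shape as Start(): work is done per line,
// and the returned Task completes only when the underlying stream ends.
public class LineSource
{
    public event EventHandler<string> LineReceived;

    public async Task StartAsync(Stream input)
    {
        using (var reader = new StreamReader(input, Encoding.UTF8))
        {
            string line;
            while ((line = await reader.ReadLineAsync()) != null)
            {
                if (line.Length == 0)
                    continue; // skip blank lines
                LineReceived?.Invoke(this, line);
            }
        }
    }
}

public static class LineSourceDemo
{
    public static async Task<List<string>> RunAsync()
    {
        var received = new List<string>();
        var source = new LineSource();
        source.LineReceived += (sender, line) => received.Add(line);

        var bytes = Encoding.UTF8.GetBytes("one\n\ntwo\n");
        await source.StartAsync(new MemoryStream(bytes)); // resumes after end of stream
        return received;
    }

    public static void Main()
    {
        var lines = RunAsync().GetAwaiter().GetResult();
        Console.WriteLine(string.Join(",", lines)); // prints one,two
    }
}
```

Against a live streaming endpoint the await would not return until the connection closes or an exception is thrown, so it is best suited to detecting the end of the stream or a failure.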
Finally, one more suggestion: I strongly recommend that you use a name other than Stream for your class. It collides with System.IO.Stream, so the name is practically guaranteed to cause confusion when trying to read and maintain the code.