c# twitter linq-to-twitter

How to raise a new event on Twitter so that clients can receive notification of the event?


I am retrieving tweets with LINQ to Twitter 4.1.0 by searching for a particular hashtag, e.g. #abc. Now I want to be notified whenever anyone tweets in my account using that same hashtag. Can anyone suggest how to do this?


Solution

  • If you're using the REST API, you can run a search query periodically, using SinceID and MaxID to make sure you don't retrieve tweets you've already seen. It works like this:

        static async Task DoPagedSearchAsync(TwitterContext twitterCtx)
        {
            const int MaxSearchEntriesToReturn = 100;
    
            string searchTerm = "twitter";
    
            // newest id you already have for this search term;
            // only tweets with a higher id are returned
            ulong sinceID = 1;
    
            // used after the first query to track current session
            ulong maxID; 
    
            var combinedSearchResults = new List<Status>();
    
            List<Status> searchResponse =
                await
                (from search in twitterCtx.Search
                 where search.Type == SearchType.Search &&
                       search.Query == searchTerm &&
                       search.Count == MaxSearchEntriesToReturn &&
                       search.SinceID == sinceID
                 select search.Statuses)
                .SingleOrDefaultAsync();
    
            ulong previousMaxID = ulong.MaxValue;
    
            // page backwards until a query comes back empty; checking first
            // avoids calling Min() on a null or empty result
            while (searchResponse != null && searchResponse.Any())
            {
                combinedSearchResults.AddRange(searchResponse);
    
                // one less than the oldest (lowest) id you've just queried
                maxID = searchResponse.Min(status => status.StatusID) - 1;
    
                Debug.Assert(maxID < previousMaxID);
                previousMaxID = maxID;
    
                searchResponse =
                    await
                    (from search in twitterCtx.Search
                     where search.Type == SearchType.Search &&
                           search.Query == searchTerm &&
                           search.Count == MaxSearchEntriesToReturn &&
                           search.MaxID == maxID &&
                           search.SinceID == sinceID
                     select search.Statuses)
                    .SingleOrDefaultAsync();
            }
    
            combinedSearchResults.ForEach(tweet =>
                Console.WriteLine(
                    "\n  User: {0} ({1})\n  Tweet: {2}",
                    tweet.User.ScreenNameResponse,
                    tweet.User.UserIDResponse,
                    tweet.Text));
        }
    

    In this example, SinceID is set to 1 so that the search is not limited by any earlier tweet. In practice, you should pass it in as a parameter, having kept the newest (highest) tweet ID from the previous query, because SinceID only returns tweets with a higher ID.
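
    The bookkeeping between polling runs could look like the sketch below. SinceIdTracker and its members are hypothetical names, not part of LINQ to Twitter, and the sketch leaves the search call itself out. It relies only on the fact that SinceID returns tweets with a higher ID, so the value worth keeping is the highest ID seen so far.

    ```csharp
    using System;
    using System.Collections.Generic;

    // Hypothetical helper (not part of LINQ to Twitter): remembers the highest
    // tweet ID seen so far so that the next poll can pass it as SinceID.
    public sealed class SinceIdTracker
    {
        // 1 means "nothing seen yet", matching the search example.
        public ulong SinceID { get; private set; } = 1;

        // Call once per completed poll with the IDs that poll returned.
        public void Update(IEnumerable<ulong> statusIDs)
        {
            foreach (ulong id in statusIDs)
            {
                if (id > SinceID)
                    SinceID = id;
            }
        }
    }
    ```

    After each run you would call something like tracker.Update(combinedSearchResults.Select(s => s.StatusID)) and pass tracker.SinceID into the next search.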

    Since you didn't post any code showing what you wanted, I'm assuming that you're polling the REST API in some way, not too different from the example above. However, you can also use the streaming API, which returns a matching tweet within seconds of it being tweeted. Here's an example of using a Filter stream:

        static async Task DoFilterStreamAsync(TwitterContext twitterCtx)
        {
            Console.WriteLine("\nStreamed Content: \n");
            int count = 0;
            var cancelTokenSrc = new CancellationTokenSource();
    
            try
            {
                await
                    (from strm in twitterCtx.Streaming
                                            .WithCancellation(cancelTokenSrc.Token)
                     where strm.Type == StreamingType.Filter &&
                           strm.Track == "twitter"
                     select strm)
                    .StartAsync(async strm =>
                    {
                        await HandleStreamResponse(strm);
    
                        if (count++ >= 5)
                            cancelTokenSrc.Cancel();
                    });
            }
            catch (IOException ex)
            {
                // Twitter might have closed the stream,
                // which they do sometimes. You should
                // restart the stream, but be sure to
                // read Twitter documentation on stream
                // back-off strategies to prevent your
                // app from being blocked.
                Console.WriteLine(ex.ToString());
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Stream cancelled.");
            }
        }
    

    HandleStreamResponse() is a method you would write to handle the notification. My suggestion is to use a message queue or something similar so that you don't block the stream if the term you're using is trending or getting high traffic. Just be quick about handling the message.
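
    As a rough illustration of the queue idea, here is a minimal in-process sketch built on BlockingCollection from the .NET base class library. TweetQueue, its members, and the capacity of 10000 are assumptions made for the example; a real application might use an external message queue instead.

    ```csharp
    using System;
    using System.Collections.Concurrent;
    using System.Threading.Tasks;

    // Hypothetical hand-off queue: the stream callback only enqueues, and a
    // separate consumer task does the slow work (database writes, e-mail, etc.).
    public sealed class TweetQueue : IDisposable
    {
        private readonly BlockingCollection<string> queue =
            new BlockingCollection<string>(boundedCapacity: 10000);

        // Call from the stream callback. Returns false instead of blocking
        // when the queue is full, so the stream is never held up.
        public bool TryEnqueue(string tweetText)
        {
            return queue.TryAdd(tweetText);
        }

        // Starts a background consumer that runs until CompleteAdding is called.
        public Task StartConsumer(Action<string> process)
        {
            return Task.Run(() =>
            {
                foreach (string tweetText in queue.GetConsumingEnumerable())
                    process(tweetText);
            });
        }

        // Signals that no more items will arrive; the consumer drains and exits.
        public void CompleteAdding()
        {
            queue.CompleteAdding();
        }

        public void Dispose()
        {
            queue.Dispose();
        }
    }
    ```

    With this in place, the stream callback would do little more than call TryEnqueue with the tweet text, and decide what to do (drop, log, count) when it returns false.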

    If you prefer reactive programming, you can do that like this:

        static async Task DoRxObservableStreamAsync(TwitterContext twitterCtx)
        {
            Console.WriteLine("\nStreamed Content: \n");
            int count = 0;
            var cancelTokenSrc = new CancellationTokenSource();
    
            try
            {
                var observable =
                    await
                        (from strm in twitterCtx.Streaming
                                                .WithCancellation(cancelTokenSrc.Token)
                         where strm.Type == StreamingType.Filter &&
                               strm.Track == "twitter"
                         select strm)
                        .ToObservableAsync();
    
                observable.Subscribe(
                    async strm =>
                    {
                        await HandleStreamResponse(strm);
    
                        if (count++ >= 5)
                            cancelTokenSrc.Cancel();
                    },
                    ex => Console.WriteLine(ex.ToString()),
                    () => Console.WriteLine("Completed"));
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Stream cancelled.");
            }
        }
    

    In both the callback and reactive scenarios, monitor the stream to see whether it was closed (e.g. an OperationCanceledException). Then you must create a brand new stream instance to restart. If that happens, keep track of the last tweet ID you saw and use the REST API to search for all the tweets between then and the time the new stream starts.
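
    A restart loop with a simple doubling back-off might be sketched as follows. StreamRestarter and the runStream delegate are hypothetical, and the delays are placeholders rather than Twitter's recommended values, so check the Twitter documentation on back-off strategies before relying on them.

    ```csharp
    using System;
    using System.IO;
    using System.Threading;
    using System.Threading.Tasks;

    // Hypothetical restart helper; not part of LINQ to Twitter.
    public static class StreamRestarter
    {
        // Doubles the delay, capped at maxDelay.
        public static TimeSpan NextDelay(TimeSpan current, TimeSpan maxDelay)
        {
            long doubled = current.Ticks * 2;
            return doubled >= maxDelay.Ticks ? maxDelay : TimeSpan.FromTicks(doubled);
        }

        // runStream is assumed to build a brand new stream each time it is
        // called and to complete, or throw IOException, when the stream closes.
        // The loop ends with an OperationCanceledException once cancelToken
        // is cancelled.
        public static async Task RunWithRestartAsync(
            Func<Task> runStream,
            TimeSpan initialDelay,
            TimeSpan maxDelay,
            CancellationToken cancelToken)
        {
            TimeSpan delay = initialDelay;

            while (true)
            {
                cancelToken.ThrowIfCancellationRequested();

                bool failed = false;
                try
                {
                    await runStream();
                }
                catch (IOException)
                {
                    failed = true;
                }

                // Wait before reconnecting; throws if cancelled meanwhile.
                await Task.Delay(delay, cancelToken);

                // Back off further after a failure, reset after a clean close.
                delay = failed ? NextDelay(delay, maxDelay) : initialDelay;
            }
        }
    }
    ```

    Inside runStream you would first run the paged REST search with SinceID set to the last tweet ID you saw, then start the new stream. Note that this sketch increases the delay after every IOException, even if the stream had been healthy for hours beforehand.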

    Update

    In each demo, there's a HandleStreamResponse method, which is async. You can download the demo source code and step through it to get a feel for how it works. Essentially, the StreamContent type has an EntityType property that tells you what type the response is. Because the demos in the source code use different types of streams, the switch statement accounts for all the possible message types. However, in the case of a Filter stream like the ones above, the only responses will be StreamEntityType.Status, which simplifies your code because you can eliminate the other cases. Once the type is known, you can do the conversion with the as operator on the Entity property, like var status = strm.Entity as Status, and then read the properties of the status variable for the info you need.

        static async Task<int> HandleStreamResponse(StreamContent strm)
        {
            switch (strm.EntityType)
            {
                case StreamEntityType.Control:
                    var control = strm.Entity as Control;
                    Console.WriteLine("Control URI: {0}", control.URL);
                    break;
                case StreamEntityType.Delete:
                    var delete = strm.Entity as Delete;
                    Console.WriteLine("Delete - User ID: {0}, Status ID: {1}", delete.UserID, delete.StatusID);
                    break;
                case StreamEntityType.DirectMessage:
                    var dm = strm.Entity as DirectMessage;
                    Console.WriteLine("Direct Message - Sender: {0}, Text: {1}", dm.Sender, dm.Text);
                    break;
                case StreamEntityType.Disconnect:
                    var disconnect = strm.Entity as Disconnect;
                    Console.WriteLine("Disconnect - {0}", disconnect.Reason);
                    break;
                case StreamEntityType.Event:
                    var evt = strm.Entity as Event;
                    Console.WriteLine("Event - Event Name: {0}", evt.EventName);
                    break;
                case StreamEntityType.ForUser:
                    var user = strm.Entity as ForUser;
                    Console.WriteLine("For User - User ID: {0}, # Friends: {1}", user.UserID, user.Friends.Count);
                    break;
                case StreamEntityType.FriendsList:
                    var friends = strm.Entity as FriendsList;
                    Console.WriteLine("Friends List - # Friends: {0}", friends.Friends.Count);
                    break;
                case StreamEntityType.GeoScrub:
                    var scrub = strm.Entity as GeoScrub;
                    Console.WriteLine("GeoScrub - User ID: {0}, Up to Status ID: {1}", scrub.UserID, scrub.UpToStatusID);
                    break;
                case StreamEntityType.Limit:
                    var limit = strm.Entity as Limit;
                    Console.WriteLine("Limit - Track: {0}", limit.Track);
                    break;
                case StreamEntityType.Stall:
                    var stall = strm.Entity as Stall;
                    Console.WriteLine("Stall - Code: {0}, Message: {1}, % Full: {2}", stall.Code, stall.Message, stall.PercentFull);
                    break;
                case StreamEntityType.Status:
                    var status = strm.Entity as Status;
                    Console.WriteLine("Status - @{0}: {1}", status.User.ScreenNameResponse, status.Text);
                    break;
                case StreamEntityType.StatusWithheld:
                    var statusWithheld = strm.Entity as StatusWithheld;
                    Console.WriteLine("Status Withheld - Status ID: {0}, # Countries: {1}", statusWithheld.StatusID, statusWithheld.WithheldInCountries.Count);
                    break;
                case StreamEntityType.TooManyFollows:
                    var follows = strm.Entity as TooManyFollows;
                    Console.WriteLine("Too Many Follows - Message: {0}", follows.Message);
                    break;
                case StreamEntityType.UserWithheld:
                    var userWithheld = strm.Entity as UserWithheld;
                    Console.WriteLine("User Withheld - User ID: {0}, # Countries: {1}", userWithheld.UserID, userWithheld.WithheldInCountries.Count);
                    break;
                case StreamEntityType.ParseError:
                    var unparsedJson = strm.Entity as string;
                    Console.WriteLine("Parse Error - {0}", unparsedJson);
                    break;
                case StreamEntityType.Unknown:
                default:
                    Console.WriteLine("Unknown - " + strm.Content + "\n");
                    break;
            }
    
            return await Task.FromResult(0);
        }
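
    To tie this back to the original question, the Status case is a natural place to raise an ordinary .NET event that clients subscribe to. The sketch below uses plain strings so that it stands on its own; TweetNotifier, TweetReceivedEventArgs, and their members are hypothetical names, not LINQ to Twitter types.

    ```csharp
    using System;

    // Hypothetical event payload; the property names are illustrative.
    public sealed class TweetReceivedEventArgs : EventArgs
    {
        public TweetReceivedEventArgs(string screenName, string text)
        {
            ScreenName = screenName;
            Text = text;
        }

        public string ScreenName { get; }
        public string Text { get; }
    }

    // Hypothetical notifier that clients subscribe to.
    public sealed class TweetNotifier
    {
        public event EventHandler<TweetReceivedEventArgs> TweetReceived;

        // Intended to be called from the StreamEntityType.Status case.
        public void Publish(string screenName, string text)
        {
            // ?. invokes the handlers only if at least one is subscribed.
            TweetReceived?.Invoke(this, new TweetReceivedEventArgs(screenName, text));
        }
    }
    ```

    In the Status case of the handler, the call would be along the lines of notifier.Publish(status.User.ScreenNameResponse, status.Text), and clients would attach with notifier.TweetReceived += (sender, e) => ... to receive each matching tweet.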