Tags: c#, twitter, stream, linq-to-twitter, rx.net

Rx Twitter Stream stops on entering the third search topic


I tried to rebuild the Twitter example by Jonathan Worthington from this video: https://www.youtube.com/watch?v=_VdIQTtRkb8

It starts out working fine, but after a while the stream stops: once I enter the third search topic, the app no longer receives any tweets, and I have no idea why. It does not seem to depend on time, because if I let it run without changing the search topic it keeps going. Can anyone help?

Here is the code of the main window:

public ObservableCollection<string> PositiveTweets = new ObservableCollection<string>();
public ObservableCollection<string> NegativeTweets = new ObservableCollection<string>();

public MainWindow()
{
    InitializeComponent();
    FocusManager.SetFocusedElement(this, SearchTextBox);

    PositiveListBox.ItemsSource = PositiveTweets;
    NegativeListBox.ItemsSource = NegativeTweets;

    var keywords = Keywords.Create(new List<string> {"cool"}, new List<string> {"sucks"});
    var sa = new SentimentAnalysis(keywords);

    var topics = Observable
        .FromEventPattern<TextChangedEventArgs>(SearchTextBox, "TextChanged")
        .Select(e => ((TextBox) e.Sender).Text)
        .Throttle(TimeSpan.FromSeconds(1));

    var tweets = topics
        .Select(Twitter.AllTweetsAbout)
        .Switch()
        .Select(sa.Score)
        .Publish();

    tweets.Connect();

    var format = new Func<string, ScoredTweet, string>((topic, st) => String.Format("[user: @{0} | topic: {1} | score: {3}]\r\n{2}\r\n", st.Tweet.User, topic, st.Tweet.Text, st.Score));

    var addToList = new Action<string, ObservableCollection<string>>((item, list) =>
    {
        if (list.Count == 4)
            list.RemoveAt(3);
        list.Insert(0, item);
    });

    tweets
        .Where(x => x.Score >= 0)
        .Sample(TimeSpan.FromSeconds(1))
        .ObserveOnDispatcher()
        .Subscribe(x => addToList(format(x.Tweet.Topic, x), PositiveTweets));

    tweets
        .Where(x => x.Score < 0)
        .Sample(TimeSpan.FromSeconds(1))
        .ObserveOnDispatcher()
        .Subscribe(x => addToList(format(x.Tweet.Topic, x), NegativeTweets));
}

This is the XAML code:

<StackPanel Margin="10">
    <DockPanel>
        <Label DockPanel.Dock="Left" Content="Search:" Margin="0,0,10,0" FontSize="20"/>
        <TextBox Name="SearchTextBox" FontSize="20" Focusable="True"/>
    </DockPanel>
    <Label Content="positive" FontSize="20"/>
    <ListBox Name="PositiveListBox" Height="250" FontSize="16"/>
    <Label Content="negative" FontSize="20"/>
    <ListBox Name="NegativeListBox" Height="250" FontSize="16"/>
</StackPanel>

This is how the IObservable is created:

readonly static SingleUserAuthorizer Auth = new SingleUserAuthorizer
{
    CredentialStore = new InMemoryCredentialStore
    {
        ConsumerKey = ConfigurationManager.AppSettings["consumerKey"],
        ConsumerSecret = ConfigurationManager.AppSettings["consumerSecret"],
        OAuthToken = ConfigurationManager.AppSettings["authtoken"],
        OAuthTokenSecret = ConfigurationManager.AppSettings["authtokensecret"],
    }
};

public static IObservable<Tweet> AllTweetsAbout(string topic)
{
    return Observable.Create<Tweet>(o =>
    {
        var twitterCtx = new TwitterContext(Auth);

        var query = from s in twitterCtx.Streaming
            where s.Type == StreamingType.Filter &&
                    s.Track == topic
            select s;

        var disposed = false;

        query.StartAsync(s =>
        {
            if(disposed)
                s.CloseStream();
            else
                o.OnNext(Tweet.Parse(s.Content, topic));

            return Task.FromResult(true);
        });

        return Disposable.Create(() => disposed = true);
    });
}

Finally the Sentiment Analysis:

public class ScoredTweet
{
    public Tweet Tweet { get; set; }
    public int Score { get; set; }
}

public class SentimentAnalysis
{
    private readonly Keywords _keywords;

    public SentimentAnalysis(Keywords keywords)
    {
        _keywords = keywords;
    }

    public ScoredTweet Score(Tweet tweet)
    {
        return new ScoredTweet
        {
            Tweet = tweet,
            Score = _keywords.Positive.Count(x => tweet.Text.Contains(x)) - _keywords.Negative.Count(x => tweet.Text.Contains(x))
        };
    }
}

public class Keywords
{
    public List<string> Positive { get; private set; }
    public List<string> Negative { get; private set; }

    public static Keywords Create(List<string> positive, List<string> negative)
    {
        return new Keywords
        {
            Positive = positive,
            Negative = negative
        };
    }
}

Solution

  • I found the solution to the problem. The Twitter class must not be static, and the Twitter context should be created only once, when the Twitter class is instantiated.

    In the code that I posted, the AllTweetsAbout method is static and a new Twitter context is created every time that method is called. This doesn't work, probably because the Twitter API blocks the connection when too many sign-in operations occur within a given time, or something like that.
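
    A minimal sketch of what that change might look like, reusing only the LINQ to Twitter calls already shown in the question. The constructor parameter and the field name _twitterCtx are assumptions made for illustration, and Tweet is the question's own class (not shown there), so this is not necessarily the exact code that was used:

```csharp
using System;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading.Tasks;
using LinqToTwitter;

// Non-static variant of the question's Twitter class.
public class Twitter
{
    // Assumption: one context is created per Twitter instance and
    // shared by every topic stream, instead of one per subscription.
    private readonly TwitterContext _twitterCtx;

    // Hypothetical constructor; pass in the SingleUserAuthorizer
    // that the question keeps in the static Auth field.
    public Twitter(IAuthorizer auth)
    {
        _twitterCtx = new TwitterContext(auth);
    }

    public IObservable<Tweet> AllTweetsAbout(string topic)
    {
        return Observable.Create<Tweet>(o =>
        {
            // Same streaming query as in the question, but built on
            // the shared context.
            var query = from s in _twitterCtx.Streaming
                        where s.Type == StreamingType.Filter &&
                              s.Track == topic
                        select s;

            var disposed = false;

            query.StartAsync(s =>
            {
                // As in the question, the stream is only closed when the
                // next item arrives after the subscription was disposed.
                if (disposed)
                    s.CloseStream();
                else
                    o.OnNext(Tweet.Parse(s.Content, topic));

                return Task.FromResult(true);
            });

            return Disposable.Create(() => disposed = true);
        });
    }
}
```

    In the main window, the instance would then be created once and its method passed to Select, for example var twitter = new Twitter(auth); followed by topics.Select(twitter.AllTweetsAbout), where auth stands for the authorizer from the question.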