Search code examples
c#uwpreactiveuireactivexasp.net-dynamic-data

How should I use Rx + DynamicData to periodically check for updates from many online services?


I have a basic Calendar/Agenda app that will list the most recent events from a series of accounts and calendars. For example, let's say I have 3 accounts: two different Microsoft accounts and one Google account. I am currently storing these as a SourceCache<Account, string> called Accounts in a service (AccountsService).

SourceCache<T1,T2> is part of DynamicData… which basically makes Reactive Collections. I want it reactive so that when I add or remove an account, everything in the app (the settings page, the calendar pages, etc) will update automatically.

Now, each account can have multiple Calendars. And for each of those Calendars, I want to download all upcoming CalendarEvents. The catch is that I need to do this at regular intervals to see if new events have been added or if the events have been changed.

Here's how I'm currently doing it, but I'm afraid it's probably really bad Rx.

var calendarSet = this.accountsService.Accounts.Connect()
    .ObserveOn(RxApp.TaskpoolScheduler)
    .TransformMany(x =>
    {
        ReadOnlyObservableCollection<Models.Calendar> subCalendars;
        x.CalendarService.Calendars.Connect()
            .AutoRefreshOnObservable(calendar => calendar.IsEnabledChanged)
            .AutoRefreshOnObservable(calendar => calendar.IsColorChanged)
            .Filter(calendar=>calendar.IsEnabled)
            .Bind(out subCalendars)
            .Subscribe();
        return subCalendars;
     }, x => x.CacheKey)
     .ObserveOnDispatcher()
     .Publish();

calendarSet
    .Bind(out calendars)
    .Subscribe();


var eventSet = calendarSet
    .ObserveOn(RxApp.TaskpoolScheduler)
    .Transform( calendar =>
    {
        var events = new List<Models.CalendarEvent>();
        Debug.WriteLine(calendar.Name);
        calendar.CalendarService.CalendarEventsObservable(calendar).Subscribe(items =>
        {
            events.AddRange(items);
        });
        return events;
    })
    .TransformMany(x => x, x => x.Key)
    .Filter(x => x.EndDateTime > DateTimeOffset.Now)
    .Sort(new Models.CalendarEventSorter())
    .ObserveOnDispatcher()
    .Bind(out calendarEvents)
    .Subscribe();

calendarSet.Connect();

The big part is where the events are also loading through a subscription to an observable as well. That is where I put the timer that lets me control how often to check the online services. It looks like this (20 seconds is only for testing!):

public IObservable<List<Models.CalendarEvent>> CalendarEventsObservable(Models.Calendar calendar)
    {
        var obs = Observable.Interval(TimeSpan.FromSeconds(20)).SelectMany(async x =>
        {
            var items = await GetAllEventsForCalendarAsync(calendar);
            return items;
        });

        return obs;
    }

This seems to work! I can enable/disable certain calendars and the events will appear or disappear from my bound list. I can see that the events are getting updated at regular intervals … and I assume that since I'm using TransformMany with a key tied to the CalenderEvents online id (which is fixed), the newly downloaded events just replace the old ones in the cache. I don't see any flickering on the UI.

**Correction: It seems to work because of a hack I accidentally left in from another trial. On the original accounts SourceCache, I am running a timer that calls Accounts.Refresh(). If I take this out, nothing works.

Is this the correct way to do this? Please, enlighten me... I am struggling a bit with Rx and DynamicData. There are so many operators I don't know what to do with yet.

Thanks!


Solution

  • I have rewritten the code a little and it seems to work a lot better. It's even working without the hack where I manually call Refresh() on the original Account SourceCache.

    I realized I'd been misusing the Publish method. Also, subscribing to an observable in the transform statement wasn't going to work because it was not a DynamicData observable (with changesets). Instead, I decided to do a SubscribeMany to subscribe to all of the CalendarEventObservables from each Calendar and in the subscription's Action logic, I would populate a new SourceCache of CalendarEvents. No events would get deleted, but duplicate events would just overwrite the old ones because of the cache key and I could filter out events from caledars that were not selected.

     private SourceCache<Models.CalendarEvent, string> calendarEventCache;
     public IObservableCache<Models.CalendarEvent, string> CalendarEventCache => calendarEventCache.Connect().AutoRefreshOnObservable(x=>x.Parent.IsEnabledChanged).Filter(x=>x.Parent.IsEnabled).AsObservableCache();
    
    //////////////////
    ///IN CONSTRUCTOR:
    
            calendarEventCache = new SourceCache<Models.CalendarEvent, string>(x => x.Key);
    
            calendarsCache = this.accountsService.Accounts.Connect()
                .ObserveOn(RxApp.TaskpoolScheduler)
                .TransformMany(x =>
                {
                    ReadOnlyObservableCollection<Models.Calendar> subCalendars;
                    x.CalendarService.Calendars.Connect()
                                .AutoRefreshOnObservable(calendar => calendar.IsEnabledChanged)
                                .AutoRefreshOnObservable(calendar => calendar.IsColorChanged)
                                .Filter(calendar => calendar.IsEnabled)
                                .Bind(out subCalendars)
                                .Subscribe();
                    return subCalendars;
                }, x => x.CacheKey)
                .ObserveOnDispatcher()
                .AsObservableCache();
    
            calendarsCache.Connect()
                .ObserveOn(RxApp.TaskpoolScheduler)
                .SubscribeMany(calendar =>
                {
                    return calendar.CalendarService.CalendarEventsObservable(calendar).Subscribe(calendarEvent =>
                   {
                       calendarEventCache.AddOrUpdate(calendarEvent);
                   });
                })
                .Subscribe();
    

    Now, in my viewmodel for the UI, I subscribe to the SourceCache<CalendarEvent,string>;