Search code examples
c#.netxsockets.net

Synchronising a collection over a websocket connection


I'm working on a client-server system at the moment, and I'm trying to get a collection to synchronise across a websocket. Everything is in C# + .Net 4.5, and I was wondering if there was a particular best practise for synchronising data over a websocket. It's a one way sync:

Server: BindingCollection< MyClass > ----- Websocket -----> Client: BindingCollection< MyClass >

The collection could be up to 1000 objects with 20 fields each so sending the whole lot each time seems a little wasteful.


Solution

  • I would use a observer pattern and only send the changed object to be synced.

    So I finally took the time to write a small example. I am using a in-memory generic repository that invokes events on changes. The changes is then sent to all clients so that you do not have to send the complete list/collection.

    A simple model to monitor

    using System;
    
    namespace SynchronizingCollection.Common.Model
    {
        public class MyModel
        {
            public Guid Id { get; set; }
            public string Name { get; set; }
            public int Age { get; set; }
        }
    }
    

    A Generic Repository

    Notice the event OnChange that is called when something is added/updated/removed. The event is "subscribed" to in a XSockets long running controller (a singleton) See the "RepoMonitor" class

    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;
    
    namespace SynchronizingCollection.Server.Repository
    {
        /// <summary>
        /// A static generic thread-safe repository for in-memory storage
        /// </summary>
        /// <typeparam name="TK">Key Type</typeparam>
        /// <typeparam name="T">Value Type</typeparam>
        public static class Repository<TK, T>
        {
            /// <summary>
            /// When something changes
            /// </summary>
            public static event EventHandler<OnChangedArgs<TK,T>> OnChange;
    
            private static ConcurrentDictionary<TK, T> Container { get; set; }
    
            static Repository()
            {
                Container = new ConcurrentDictionary<TK, T>();
            }
    
            /// <summary>
            /// Adds or updates the entity T with key TK
            /// </summary>
            /// <param name="key"></param>
            /// <param name="entity"></param>
            /// <returns></returns>
            public static T AddOrUpdate(TK key, T entity)
            {
                var obj = Container.AddOrUpdate(key, entity, (s, o) => entity);
                if(OnChange != null)
                    OnChange.Invoke(null,new OnChangedArgs<TK, T>(){Key = key,Value = entity, Operation =  Operation.AddUpdate});
                return obj;
            }
    
            /// <summary>
            /// Removes the entity T with key TK
            /// </summary>
            /// <param name="key"></param>
            /// <returns></returns>
            public static bool Remove(TK key)
            {
                T entity;
                var result = Container.TryRemove(key, out entity);
                if (result)
                {
                    if (OnChange != null)
                        OnChange.Invoke(null, new OnChangedArgs<TK, T>() { Key = key, Value = entity, Operation = Operation.Remove});
                }
                return result;
            }
    
            /// <summary>
            /// Removes all entities matching the expression f
            /// </summary>
            /// <param name="f"></param>
            /// <returns></returns>
            public static int Remove(Func<T, bool> f)
            {
                return FindWithKeys(f).Count(o => Remove(o.Key));
            }        
    
            /// <summary>
            /// Find all entities T matching the expression f
            /// </summary>
            /// <param name="f"></param>
            /// <returns></returns>
            public static IEnumerable<T> Find(Func<T, bool> f)
            {
                return Container.Values.Where(f);
            }
    
            /// <summary>
            /// Find all entities T matching the expression f and returns a Dictionary TK,T
            /// </summary>
            /// <param name="f"></param>
            /// <returns></returns>
            public static IDictionary<TK, T> FindWithKeys(Func<T, bool> f)
            {
                var y = from x in Container
                        where f.Invoke(x.Value)
                        select x;
                return y.ToDictionary(x => x.Key, x => x.Value);
            }
    
            /// <summary>
            /// Returns all entities as a Dictionary TK,T
            /// </summary>
            /// <returns></returns>
            public static IDictionary<TK, T> GetAllWithKeys()
            {
                return Container;
            }
    
            /// <summary>
            /// Returns all entities T from the repository
            /// </summary>
            /// <returns></returns>
            public static IEnumerable<T> GetAll()
            {
                return Container.Values;
            }
    
            /// <summary>
            /// Get a single entity T with the key TK
            /// </summary>
            /// <param name="key"></param>
            /// <returns></returns>
            public static T GetById(TK key)
            {
                return Container.ContainsKey(key) ? Container[key] : default(T);
            }
    
            /// <summary>
            /// Get a single entity T as a KeyValuePair TK,T with the key TK
            /// </summary>
            /// <param name="key"></param>
            /// <returns></returns>
            public static KeyValuePair<TK, T> GetByIdWithKey(TK key)
            {
                return Container.ContainsKey(key) ? new KeyValuePair<TK, T>(key, Container[key]) : new KeyValuePair<TK, T>(key, default(T));
            }
    
            /// <summary>
            /// Checks if the repository has a key TK
            /// </summary>
            /// <param name="key"></param>
            /// <returns></returns>
            public static bool ContainsKey(TK key)
            {
                return Container.ContainsKey(key);
            }
        }
    }
    

    Event argument and an enum to know what change just happend

    using System;
    
    namespace SynchronizingCollection.Server.Repository
    {
        /// <summary>
        /// To send changes in the repo
        /// </summary>
        /// <typeparam name="TK"></typeparam>
        /// <typeparam name="T"></typeparam>
        public class OnChangedArgs<TK,T> : EventArgs
        {
            public Operation Operation { get; set; }
            public TK Key { get; set; }
            public T Value { get; set; }
        }
    }
    
    namespace SynchronizingCollection.Server.Repository
    {
        /// <summary>
        /// What kind of change was performed
        /// </summary>
        public enum Operation
        {
            AddUpdate,
            Remove
        }
    }
    

    The Controller that send changes to the clients...

    using System;
    using SynchronizingCollection.Common.Model;
    using SynchronizingCollection.Server.Repository;
    using XSockets.Core.XSocket;
    using XSockets.Core.XSocket.Helpers;
    using XSockets.Plugin.Framework;
    using XSockets.Plugin.Framework.Attributes;
    
    namespace SynchronizingCollection.Server
    {
        /// <summary>
        /// Long running controller that will send information to clients about the collection changes
        /// </summary>
        [XSocketMetadata(PluginRange = PluginRange.Internal, PluginAlias = "RepoMonitor")]
        public class RepositoryMonitor : XSocketController
        {
            public RepositoryMonitor()
            {
                Repository<Guid, MyModel>.OnChange += RepositoryOnChanged;
            }
    
            private void RepositoryOnChanged(object sender, OnChangedArgs<Guid, MyModel> e)
            {
                switch (e.Operation)
                {
                    case Operation.Remove:
                        this.InvokeTo<Demo>(p => p.SendUpdates, e.Value,"removed");
                    break;                    
                    case Operation.AddUpdate:
                        this.InvokeTo<Demo>(p => p.SendUpdates, e.Value, "addorupdated");
                    break;                    
                }
            }       
        }
    }
    

    The XSockets controller that clients call to add/remove/update the collection.

    using System;
    using SynchronizingCollection.Common.Model;
    using SynchronizingCollection.Server.Repository;
    using XSockets.Core.XSocket;
    
    namespace SynchronizingCollection.Server
    {
        public class Demo : XSocketController
        {
            public bool SendUpdates { get; set; }
    
            public Demo()
            {
                //By default all clients get updates
                SendUpdates = true;
            }
    
            public void AddOrUpdateModel(MyModel model)
            {
                Repository<Guid, MyModel>.AddOrUpdate(model.Id, model);
            }
    
            public void RemoveModel(MyModel model)
            {
                Repository<Guid, MyModel>.Remove(model.Id);
            }        
        }
    }
    

    And a demo client in C# that adds and removed 10 different objects... But it would be easy to use the JavaScript API as well. Especially with knockoutjs for manipulating the collection on the client.

    using System;
    using System.Threading;
    using SynchronizingCollection.Common.Model;
    using XSockets.Client40;
    
    namespace SynchronizingCollection.Client
    {
        class Program
        {
            static void Main(string[] args)
            {
                var c = new XSocketClient("ws://127.0.0.1:4502","http://localhost","demo");
    
                c.Controller("demo").OnOpen += (sender, connectArgs) => Console.WriteLine("Demo OPEN");
    
                c.Controller("demo").On<MyModel>("addorupdated", model => Console.WriteLine("Updated " + model.Name));
                c.Controller("demo").On<MyModel>("removed", model => Console.WriteLine("Removed " + model.Name));
    
                c.Open();
    
                for (var i = 0; i < 10; i++)
                {
                    var m = new MyModel() {Id = Guid.NewGuid(), Name = "Person Nr" + i, Age = i};
                    c.Controller("demo").Invoke("AddOrUpdateModel", m);
    
                    Thread.Sleep(2000);
    
                    c.Controller("demo").Invoke("RemoveModel", m);
                    Thread.Sleep(2000);
                }
    
                Console.ReadLine();
            }
        }
    }
    

    You can download the project from my dropbox: https://www.dropbox.com/s/5ljbedovx6ufkww/SynchronizingCollection.zip?dl=0

    Regards Uffe