c# .net .net-core ignite

IgniteQueue in Apache Ignite.NET


We are using Ignite.NET and don't have the option to use the Ignite Java API (team skills, technology affinity, etc.). We want to build a queuing mechanism so that we can process messages in a distributed fashion. IgniteQueue looks like the most suitable data structure, but it doesn't seem to be available in Ignite.NET. Could someone please suggest a solution for this scenario? Multiple producers each enqueue unique work items, and each item must be processed reliably by only one consumer.

E.g. producers P1 and P2 (on different machines) put T1, T2, T3 on the queue, and we have consumers C1, C2, C3 (on different machines). T1 should be processed by ONLY one of C1, C2, C3, and likewise T2 and T3 should each be processed only once, by a single consumer.


Solution

  • IgniteQueue is built on top of Ignite Cache, so yes, you can replicate the same functionality in .NET:

    1. Create a cache
    2. Use Continuous Query as a consumer, call ICache.Remove to ensure that every item is processed only once
    3. Add data to cache on producers with Data Streamers or just use ICache.Put / PutAll
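
    As an illustration of step 3, here is a minimal producer sketch that uses ICache.Put and ICache.PutAll with Guid keys. The WorkQueueProducer class and its method names are hypothetical helpers, not part of Ignite.NET, and the cache is assumed to have been obtained elsewhere, for example with ignite.GetOrCreateCache<Guid, string>("workQueue"), where "workQueue" is a made-up name.

```csharp
using System;
using System.Collections.Generic;
using Apache.Ignite.Core.Cache;

// Hypothetical producer helper; class and method names are illustrative only.
public static class WorkQueueProducer
{
    // Adds one work item under a fresh unique key and returns that key.
    public static Guid Enqueue(ICache<Guid, string> cache, string workItem)
    {
        var key = Guid.NewGuid();
        cache.Put(key, workItem);
        return key;
    }

    // Adds several work items with a single PutAll call.
    public static void EnqueueAll(ICache<Guid, string> cache, IEnumerable<string> workItems)
    {
        var batch = new Dictionary<Guid, string>();

        foreach (var workItem in workItems)
        {
            batch[Guid.NewGuid()] = workItem;
        }

        cache.PutAll(batch);
    }
}
```

    Every producer node can call these helpers against the same cache name; each Put creates a new entry, which is what the consumer side reacts to.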

    Below is the code for the continuous query listener:

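    using System.Collections.Generic;
    using Apache.Ignite.Core;
    using Apache.Ignite.Core.Cache;
    using Apache.Ignite.Core.Cache.Event;
    using Apache.Ignite.Core.Resource;

    class CacheEventListener<TK, TV> : ICacheEntryEventListener<TK, TV>
    {
        private readonly string _cacheName;
    
        [InstanceResource]  // Injected automatically.
        private readonly IIgnite _ignite = null;
    
        private ICache<TK, TV> _cache;
    
        public CacheEventListener(string cacheName)
        {
            _cacheName = cacheName;
        }
    
        public void OnEvent(IEnumerable<ICacheEntryEvent<TK, TV>> events)
        {
            // Resolve the cache lazily: _ignite is only available after injection.
            _cache = _cache ?? _ignite.GetCache<TK, TV>(_cacheName);
    
            foreach (var entryEvent in events)
            {
                // Remove returns true only for the caller that actually removed the entry,
                // so each new item is consumed once.
                if (entryEvent.EventType == CacheEntryEventType.Created && _cache.Remove(entryEvent.Key))
                {
                    // Run consumer logic here - use another thread for heavy processing.
                    Consume(entryEvent.Value);
                }
            }
        }

        private void Consume(TV value)
        {
            // Placeholder: application-specific processing goes here.
        }
    }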
    

    Then we deploy this to every node with a single call:

    var consumer = new CacheEventListener<Guid, string>(cache.Name);
    var continuousQuery = new ContinuousQuery<Guid, string>(consumer);

    // Keep the returned handle alive; disposing it stops the continuous query.
    var queryHandle = cache.QueryContinuous(continuousQuery);
    

    As a result, OnEvent is called once per entry, on the primary node for that entry, so there is one consumer per Ignite node. We can increase the effective number of consumers per node by offloading the actual consumer logic to other threads, for example with a BlockingCollection.
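
    The offloading idea can be sketched with a bounded BlockingCollection drained by a few worker tasks. WorkerPool is a hypothetical helper written for this illustration; it is not part of Ignite.NET, and the worker count and capacity are arbitrary parameters.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper (not part of Ignite.NET): hands work items from the
// listener thread to a fixed number of background worker tasks.
public sealed class WorkerPool<T> : IDisposable
{
    private readonly BlockingCollection<T> _items;
    private readonly Task[] _workers;

    public WorkerPool(int workerCount, int capacity, Action<T> handler)
    {
        _items = new BlockingCollection<T>(capacity);
        _workers = new Task[workerCount];

        for (var i = 0; i < workerCount; i++)
        {
            // Each worker blocks until an item arrives and exits once
            // CompleteAdding has been called and the buffer is drained.
            _workers[i] = Task.Factory.StartNew(() =>
            {
                foreach (var item in _items.GetConsumingEnumerable())
                {
                    handler(item);
                }
            }, TaskCreationOptions.LongRunning);
        }
    }

    // Intended to be called from OnEvent; blocks while the bounded buffer is full.
    public void Enqueue(T item) => _items.Add(item);

    // Stops accepting items and waits for the queued work to finish.
    public void Dispose()
    {
        _items.CompleteAdding();
        Task.WaitAll(_workers);
        _items.Dispose();
    }
}
```

    With such a pool, the Consume(entryEvent.Value) call in the listener would become something like pool.Enqueue(entryEvent.Value). Note that an item sitting in this in-memory buffer has already been removed from the cache, so it is lost if the process stops before a worker handles it.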

    One last thing: we have to come up with a unique cache key for every new entry. The simplest option is Guid.NewGuid(), but we can also use an AtomicSequence.
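
    For the AtomicSequence option, a minimal sketch could look like the following. QueueKeys and the sequence name "queueKeys" are hypothetical; with long keys, the cache and the listener would be typed as <long, string> rather than <Guid, string>.

```csharp
using Apache.Ignite.Core;
using Apache.Ignite.Core.DataStructures;

// Hypothetical helper: produces cluster-wide unique long keys.
public static class QueueKeys
{
    public static long NextKey(IIgnite ignite)
    {
        // The last argument (create: true) creates the sequence, starting at 0,
        // if it does not exist yet; otherwise the existing one is returned.
        IAtomicSequence sequence = ignite.GetAtomicSequence("queueKeys", 0, true);

        // Increment returns the new value.
        return sequence.Increment();
    }
}
```

    Keys obtained this way are unique across nodes, but they should not be assumed to be gap-free or strictly ordered across different nodes.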