We are using Ignite.NET and cannot use the Ignite Java API (team skills, technology affinity, etc.). We want to build a queuing mechanism so that messages can be processed in a distributed fashion. The IgniteQueue data structure looks like the best fit, but it does not seem to be available in Ignite.NET. Could someone please suggest a solution for this scenario? Multiple producers each enqueue unique work items, and each item must be processed reliably by only one consumer.
For example, producers P1 and P2 (on different machines) put T1, T2 and T3 on the queue, and there are consumers C1, C2 and C3 (on different machines). T1 should be processed by only one of C1, C2, C3, and likewise T2 and T3 should each be processed exactly once by a single consumer.
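IgniteQueue is built on top of Ignite Cache, so yes, you can replicate the same functionality in .NET: producers add work items to a cache with ICache.Put / PutAll, and consumers pick them up through a continuous query, calling ICache.Remove so that every item is processed only once.
Below is the code for the continuous query listener: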
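using System.Collections.Generic;
using Apache.Ignite.Core;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Event;
using Apache.Ignite.Core.Resource;

class CacheEventListener<TK, TV> : ICacheEntryEventListener<TK, TV>
{
    private readonly string _cacheName;

    [InstanceResource] // Injected automatically.
    private readonly IIgnite _ignite = null;

    private ICache<TK, TV> _cache;

    public CacheEventListener(string cacheName)
    {
        _cacheName = cacheName;
    }

    public void OnEvent(IEnumerable<ICacheEntryEvent<TK, TV>> events)
    {
        // Resolve the cache lazily, once the Ignite instance has been injected.
        _cache = _cache ?? _ignite.GetCache<TK, TV>(_cacheName);

        foreach (var entryEvent in events)
        {
            // Remove returns true only for the caller that actually removed the entry,
            // so each item is consumed at most once.
            if (entryEvent.EventType == CacheEntryEventType.Created && _cache.Remove(entryEvent.Key))
            {
                // Run consumer logic here - use another thread for heavy processing.
                // Consume is a placeholder for your own processing method.
                Consume(entryEvent.Value);
            }
        }
    }
}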
Then we deploy this to every node with a single call:
var consumer = new CacheEventListener<Guid, string>(cache.Name);
var continuousQuery = new ContinuousQuery<Guid, string>(consumer);
// Keep the returned handle alive: disposing it stops the continuous query.
var queryHandle = cache.QueryContinuous(continuousQuery);
As a result, OnEvent is called once per entry on the primary node for that entry. So there is one consumer per Ignite node. We can increase the effective number of consumers per node by offloading the actual consumer logic to other threads, using BlockingCollection and so on.
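To illustrate the offloading idea, here is a minimal sketch of a worker pool built on BlockingCollection. The WorkerPool class, its constructor parameters and the Enqueue method are hypothetical names made up for this example, not part of Ignite.NET. The listener would call Enqueue instead of running the consumer logic inline.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper: a bounded in-process buffer drained by several worker tasks.
public sealed class WorkerPool<T> : IDisposable
{
    private readonly BlockingCollection<T> _items;
    private readonly Task[] _workers;

    public WorkerPool(int workerCount, int capacity, Action<T> handler)
    {
        _items = new BlockingCollection<T>(capacity);
        _workers = new Task[workerCount];

        for (var i = 0; i < workerCount; i++)
        {
            _workers[i] = Task.Factory.StartNew(() =>
            {
                // Blocks until an item arrives; the loop ends once CompleteAdding
                // has been called and the buffer is empty.
                foreach (var item in _items.GetConsumingEnumerable())
                {
                    handler(item);
                }
            }, TaskCreationOptions.LongRunning);
        }
    }

    // Intended to be called from OnEvent after a successful Remove.
    // Blocks the caller while the buffer is full.
    public void Enqueue(T item)
    {
        _items.Add(item);
    }

    // Stops accepting items and waits for the workers to drain the buffer.
    public void Dispose()
    {
        _items.CompleteAdding();
        Task.WaitAll(_workers);
        _items.Dispose();
    }
}
```

With something like `var pool = new WorkerPool<string>(4, 1000, Consume);` the listener body becomes `pool.Enqueue(entryEvent.Value);`. Keep in mind that once an entry is removed from the cache it lives only in this in-memory buffer, so a node failure before processing would lose it. If that matters, consider removing the entry only after the handler completes.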
One last thing: we have to come up with a unique cache key for every new entry. The simplest option is Guid.NewGuid(), but we can also use AtomicSequence.
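For completeness, the producer side could look roughly like the sketch below. It assumes the Apache.Ignite NuGet package and a node started with the default configuration. The cache name "workQueue" and the Producer class are placeholders invented for this example.

```csharp
using System;
using System.Collections.Generic;
using Apache.Ignite.Core;

public static class Producer
{
    public static void Main()
    {
        // Starts a node with default settings; a real deployment would pass its own configuration.
        using (var ignite = Ignition.Start())
        {
            // "workQueue" is a hypothetical cache name; it must match the one the consumers listen on.
            var cache = ignite.GetOrCreateCache<Guid, string>("workQueue");

            // Single work item with a freshly generated unique key.
            cache.Put(Guid.NewGuid(), "T1");

            // Several work items in one call.
            var batch = new Dictionary<Guid, string>
            {
                { Guid.NewGuid(), "T2" },
                { Guid.NewGuid(), "T3" }
            };
            cache.PutAll(batch);
        }
    }
}
```

Guid keys need no coordination between producers. A cluster-wide atomic sequence would give ordered long keys instead, at the cost of an extra distributed call per key.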