c#, parallel-processing, queue, task

Execution of parallel tasks while waiting for selected ones in C#


I have built an MQTT client that listens for certain status data. For each message I run a method that can take a while (up to 1 second). Since many messages can arrive at once, I want to process them in parallel. My problem: when I receive a message for topic A, I want to make sure the previous task for topic A has finished before I start the new one. While I am waiting for that task to finish, I still need to be able to receive new messages and add them to a queue if necessary. Of course, if the new message belongs to topic B, I don't care about the state of task A and can run that method call in parallel.

My idea is that this could be solved with some kind of dictionary that holds a separate queue per topic.
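A minimal sketch of that dictionary-of-queues idea, assuming .NET Core 3.0 or later (for `System.Threading.Channels` and `await foreach`). Each topic gets its own queue with a single consumer, so messages for one topic are handled one at a time in arrival order while different topics run in parallel. `TopicQueues`, `Enqueue`, and the string payload are hypothetical names chosen for illustration, not part of any MQTT library; shutdown and cleanup of idle topics are left out.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper: one queue and one consumer loop per topic.
public sealed class TopicQueues
{
    // Lazy<> ensures the queue (and its consumer) is created once per topic,
    // even if GetOrAdd runs the value factory concurrently.
    private readonly ConcurrentDictionary<string, Lazy<Channel<string>>> _queues =
        new ConcurrentDictionary<string, Lazy<Channel<string>>>();

    private readonly Func<string, string, Task> _handler;

    public TopicQueues(Func<string, string, Task> handler)
    {
        _handler = handler;
    }

    // Call this from the message-received callback; it returns immediately.
    public void Enqueue(string topic, string payload)
    {
        var queue = _queues
            .GetOrAdd(topic, t => new Lazy<Channel<string>>(() => StartQueue(t)))
            .Value;
        queue.Writer.TryWrite(payload); // an unbounded channel accepts every write
    }

    private Channel<string> StartQueue(string topic)
    {
        var channel = Channel.CreateUnbounded<string>(
            new UnboundedChannelOptions { SingleReader = true });

        // Single consumer: the next message starts only after the previous one finished.
        _ = Task.Run(async () =>
        {
            await foreach (var payload in channel.Reader.ReadAllAsync())
            {
                try
                {
                    await _handler(topic, payload);
                }
                catch (Exception ex)
                {
                    // Keep the loop alive so one failure does not stall the topic.
                    Console.Error.WriteLine($"Handler failed for {topic}: {ex.Message}");
                }
            }
        });

        return channel;
    }
}
```

The receive callback would then only call `Enqueue(topic, payload)`, so it never waits on the slow method itself.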


Solution

  • What about using a lock on an object tied to the topic? When a new item comes in, you can retrieve (or create) a lock object from a ConcurrentDictionary and use that object to serialize execution for the topic. Something like this:

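    using System.Collections.Concurrent;

    static ConcurrentDictionary<string, object> _locksByCategory =
        new ConcurrentDictionary<string, object>();

    void ProcessItem(ItemType item) {
        // Get the lock object for this category, creating it on first use.
        var lockObject = _locksByCategory.GetOrAdd(item.Category, _ => new object());
        // Items of the same category run one at a time; other categories are not blocked.
        lock (lockObject) {
            // your code (synchronous only: await is not allowed inside a lock block)
        }
    }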
    

    This isn't a production-ready solution, but it could help as a starting point.
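If the per-message method is asynchronous, the same per-topic idea can be sketched with `SemaphoreSlim` instead of `lock`, since `lock` blocks the calling thread and does not allow `await` in its body. This is only a sketch under assumptions: `TopicSerializer` and `RunAsync` are hypothetical names, and `SemaphoreSlim` does not document a guaranteed order in which waiters are released, so it ensures that only one task per topic runs at a time but not strict arrival order.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: at most one running task per topic, topics independent of each other.
public sealed class TopicSerializer
{
    // SemaphoreSlim(1, 1) acts as a mutex that can be awaited.
    private readonly ConcurrentDictionary<string, SemaphoreSlim> _gates =
        new ConcurrentDictionary<string, SemaphoreSlim>();

    public async Task RunAsync(string topic, Func<Task> work)
    {
        var gate = _gates.GetOrAdd(topic, _ => new SemaphoreSlim(1, 1));

        await gate.WaitAsync(); // waits without blocking a thread
        try
        {
            await work();
        }
        finally
        {
            gate.Release(); // always release, even if work() throws
        }
    }
}
```

A call such as `_ = serializer.RunAsync(topic, () => HandleAsync(message));` (with `HandleAsync` standing in for your own method) lets the receive callback return immediately while work for the same topic is still serialized.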