I have built an MQTT client that listens for certain status data. For each message I run a method that can take a while (up to 1 second). Since a lot of messages can arrive at once, I want to process them in parallel. My problem is this: when I receive a message belonging to topic A, I want to make sure that the previous task for topic A has finished before I start the new one. But I also need to keep receiving new messages while I am waiting for task A to finish, and add them to a queue if necessary. Of course, if the new message belongs to topic B, I don't care about the status of task A and can run that method call in parallel.
I imagine this could be solved with some kind of dictionary that holds a separate queue for each topic.
What about using a lock on an object related to the topic? When a new item comes into the system, you could retrieve or create a lock object from a ConcurrentDictionary and then use that object to lock the execution. Something like this:
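using System.Collections.Concurrent;

// One lock object per category (topic), created on first use.
static ConcurrentDictionary<string, object> _locksByCategory =
    new ConcurrentDictionary<string, object>();

void ProcessItem(ItemType item) {
    // GetOrAdd returns the existing lock object for this category or adds a new one.
    var lockObject = _locksByCategory.GetOrAdd(item.Category, _ => new object());
    lock (lockObject) {
        // your code (synchronous only: await is not allowed inside a lock block)
    }
}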
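This isn't a production-ready solution, but it could help you get started. Keep in mind that lock blocks the waiting thread and does not guarantee that waiting threads enter in arrival order.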
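If the order of messages within a topic matters, the "dictionary of queues" idea from the question can also be sketched by chaining tasks per topic instead of locking. The following is only a minimal sketch under assumptions: KeyedSerialExecutor and Enqueue are hypothetical names made up for illustration, not part of any MQTT library, and the dictionary is never cleaned up, so it grows with the number of distinct topics.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: work items with the same key run one after another,
// in the order they were enqueued; different keys can run in parallel.
public sealed class KeyedSerialExecutor
{
    private readonly object _gate = new object();

    // Last scheduled task per key (the "tail" of that key's queue).
    private readonly Dictionary<string, Task> _tails = new Dictionary<string, Task>();

    public Task Enqueue(string key, Func<Task> work)
    {
        lock (_gate) // held only while chaining, never while the work itself runs
        {
            if (!_tails.TryGetValue(key, out var previous))
            {
                previous = Task.CompletedTask;
            }

            // Start the new work only after the previous task for this key has
            // finished (successfully or not). The continuation is scheduled on
            // the thread pool, so the caller is not blocked.
            var next = previous
                .ContinueWith(
                    _ => work(),
                    CancellationToken.None,
                    TaskContinuationOptions.None,
                    TaskScheduler.Default)
                .Unwrap();

            _tails[key] = next;
            return next;
        }
    }
}
```

In the message handler of the MQTT client you could then call something like executor.Enqueue(topic, () => Task.Run(() => HandleStatus(payload))), where HandleStatus stands in for your slow method. The handler returns immediately, so new messages can still be received while earlier ones for the same topic are being processed.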