
Queue async operations to run one at a time in an MVVM application


I'm currently building an MVVM application. One of my view models uses a service registered with dependency injection. This service runs PowerShell cmdlets or HTTP REST commands against various third-party applications, which do not cope well with receiving several requests at the same time.

That's why I'd like to be able to trigger several operations from the UI (without blocking it) while making sure that the service processes only one at a time. In the meantime, my UI elements will show whether they are working or waiting.

I tried to implement a TPL Dataflow ActionBlock, but so far all my operations run at the same time. The only way I found to get them to run as a queue blocks the UI until all the tasks are done.

Here’s what I've done:

My view model contains an ObservableCollection of elements, each of which contains two lists (one nested in the other). In the UI it looks like a list of items, each of which can be expanded to show a small tree view.

What I want is that each time I expand an item, all sub-items in the tree view check their status in the third-party application through the service. The method in the UI sub-item looks like this:

private async Task<bool> UpdateSubItemsStatus()
    {
        foreach (var item in connectorsMenuItems)
        {
            await parent.Library.EnqueueConnectorOperations(Connectors.service.OperationType.CheckPresence, parent.CI, AssetId, item.ConnectorID, parent.ConnectorInterfaces.Single(c => c.ItemId == AssetId).ItemsConnectorPresence.Single(i => i.ConnectorId == item.ConnectorID));
        }
        return true;
    }

Here, "parent" is the first-level item and "parent.Library" is the main view model hosting everything.

On the view model, the method that receives this call is the following:

public async Task EnqueueConnectorOperations(OperationType operationType, ConfigurationItem ci, Guid itemId, Guid ConnectorID, ItemConnectorPresence itemPresence)
    {
        logManager.WriteLog($"Library : Received connector operation for item {itemId}, the name of the item is {itemPresence.ItemName}", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogManagement.LogLevel.Information);
        //Set requestor UI item in working state in the UI
        if(ci.CIType == EnumCIType.Application)
        {
            LibraryItems.Single(l => l.CI.ID == ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == ConnectorID).IsWorking = true;
            LibraryItems.Single(l => l.CI.ID == ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == ConnectorID).Status = LibraryItemState.UpdatingStatus;
            LibraryItems.Single(l => l.CI.ID == ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == ConnectorID).StatusString = "Checking Presence";
        }

        ActionBlock<OperationType> actionBlock = new ActionBlock<OperationType>(async _operationType =>
        {
            logManager.WriteLog($"Library : Sending the operation to connector service : item {itemId}, the name of the item is {itemPresence.ItemName}", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogManagement.LogLevel.Information);
            await connectorService.EnqueueConnectorOperations(operationType, ci, itemId, Settings.Default.ProjectLocalPath + @"\" + ci.ID.ToString(), ConnectorID, Settings.Default.DisplayLanguage, Settings.Default.ProjectLocalPath, itemPresence).ConfigureAwait(false);
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 1,
            CancellationToken = new CancellationTokenSource(TimeSpan.FromMinutes(5)).Token,
        });

        actionBlock.Post(operationType);
        actionBlock.Complete();
        actionBlock.Completion.Wait();
    }

Then the service, named here "connectorService", does its job.

Here, in the last line, if I use actionBlock.Completion.Wait(), all tasks run sequentially but my UI is blocked.

If I instead use await actionBlock.Completion, the UI isn't blocked but everything runs in parallel.

So if someone has any advice, it would be great!

UPDATE :

I adapted JSteward's answer to fit my needs:

I declared the ActionBlock as a private member of my view model, as you recommended. But when I did exactly as you said and expanded an item, its operations were queued correctly; if I then expanded another item, its operations (which were also queued among themselves) ran in parallel with the operations of the first item. That is not the behavior I expect: only one operation should run at a time, no matter how many items are requesting.

So I made the following changes: the ActionBlock is initialized once and for all in the view model's constructor:

public ViewModelCtor()
{
 actionBlock = new ActionBlock<ConnectorOperationArgWrapper>(async _connectorOperationArgWrapper =>
            {
                logManager.WriteLog($"Library : Sending the operation to connector service for {_connectorOperationArgWrapper.itemPresence.ItemName} on connector {connectorService.GetConnectorName(_connectorOperationArgWrapper.itemPresence.ConnectorId)}", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogLevel.Information);
                LibraryItems.Single(l => l.CI.ID == _connectorOperationArgWrapper.ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == _connectorOperationArgWrapper.itemPresence.itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == _connectorOperationArgWrapper.itemPresence.ConnectorId).StatusString = "Checking Presence";
                LibraryItems.Single(l => l.CI.ID == _connectorOperationArgWrapper.ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == _connectorOperationArgWrapper.itemPresence.itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == _connectorOperationArgWrapper.itemPresence.ConnectorId).Status = LibraryItemState.UpdatingStatus;
                await connectorService.EnqueueConnectorOperations(_connectorOperationArgWrapper.operationType, _connectorOperationArgWrapper.ci, _connectorOperationArgWrapper.itemPresence.itemId, Settings.Default.ProjectLocalPath + @"\" + _connectorOperationArgWrapper.ci.ID.ToString(), _connectorOperationArgWrapper.itemPresence.ConnectorId, Settings.Default.DisplayLanguage, Settings.Default.ProjectLocalPath, _connectorOperationArgWrapper.itemPresence).ConfigureAwait(false);
            }, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 1,
            });
}

And so the method called by the items that get expanded now looks like this:

public async Task EnqueueConnectorOperations(ConnectorOperationArgWrapper _args)
    {

        logManager.WriteLog($"Library : Received operation request for {_args.itemPresence.ItemName} on connector {connectorService.GetConnectorName(_args.itemPresence.ConnectorId)}", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogLevel.Information);

        if (_args.ci.CIType == EnumCIType.Application)
        {
            LibraryItems.Single(l => l.CI.ID == _args.ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == _args.itemPresence.itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == _args.itemPresence.ConnectorId).IsWorking = true;
            LibraryItems.Single(l => l.CI.ID == _args.ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == _args.itemPresence.itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == _args.itemPresence.ConnectorId).Status = LibraryItemState.NeedsAttention;
            LibraryItems.Single(l => l.CI.ID == _args.ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == _args.itemPresence.itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == _args.itemPresence.ConnectorId).StatusString = "Waiting";
        }

        logManager.WriteLog($"Library : post actionblock", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogLevel.Information);
        await actionBlock.SendAsync(_args);

        //actionBlock.Complete();
        //await actionBlock.Completion;
    }

I commented out the calls to actionBlock.Complete() and actionBlock.Completion because I want the block to be able to receive and enqueue requests at any time, possibly several times per item.

So far it seems to work. Is this the right approach, or am I going to run into trouble with it?
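
One thing to watch with a block that is never completed: awaiting SendAsync only tells you the message was accepted, not that the operation has finished, and an exception escaping the delegate faults the block so that it stops accepting further messages. The following is a minimal sketch of one way to handle both, assuming each request carries its own TaskCompletionSource. The names QueuedOperation and OperationQueue are hypothetical and not part of the code above, and on .NET Framework the System.Threading.Tasks.Dataflow NuGet package is assumed to be referenced.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical message type: the work to run plus a completion source
// that lets the caller await this one operation.
public sealed class QueuedOperation
{
    public QueuedOperation(Func<Task> work) { Work = work; }

    public Func<Task> Work { get; }

    public TaskCompletionSource<bool> Done { get; } =
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
}

// Hypothetical wrapper around a single long-lived ActionBlock.
public sealed class OperationQueue
{
    private readonly ActionBlock<QueuedOperation> block;

    public OperationQueue()
    {
        block = new ActionBlock<QueuedOperation>(async op =>
        {
            try
            {
                await op.Work().ConfigureAwait(false);
                op.Done.TrySetResult(true);
            }
            catch (Exception ex)
            {
                // Hand the failure to the caller instead of letting it
                // fault the block, which would stop the whole queue.
                op.Done.TrySetException(ex);
            }
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
    }

    // Completes when this particular operation has finished running.
    public async Task RunAsync(Func<Task> work)
    {
        var op = new QueuedOperation(work);

        // SendAsync returns false if the block has been completed or faulted.
        if (!await block.SendAsync(op).ConfigureAwait(false))
            throw new InvalidOperationException("The queue no longer accepts operations.");

        await op.Done.Task.ConfigureAwait(false);
    }
}
```

With something like this, a sub-item could await RunAsync around its call to the connector service and reset its IsWorking flag afterwards, while the block itself stays open for later requests.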


Solution

  • Right now you're creating a new ActionBlock for every operation. MaxDegreeOfParallelism = 1 only limits concurrency within one block, so separate blocks still run side by side. An ActionBlock has an internal queue; you should send all messages to a single ActionBlock and let it run them sequentially. By rearranging things and making the ActionBlock a class member, you'll be able to control it much better and await each group of subview items.

    private ActionBlock<OperationType> actionBlock;
    
    public void OnTreeViewExpand()
    {
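        //Re-initialize the actionblock for a new set of operations
        //Note: ci, itemId, ConnectorID and itemPresence are not in scope here as written;
        //in real code they have to travel with the message (e.g. in a wrapper type sent to the block)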
        actionBlock = new ActionBlock<OperationType>(async _operationType =>
        {
            logManager.WriteLog($"Library : Sending the operation to connector service : item {itemId}, the name of the item is {itemPresence.ItemName}", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogManagement.LogLevel.Information);
            await connectorService.EnqueueConnectorOperations(operationType, ci, itemId, Settings.Default.ProjectLocalPath + @"\" + ci.ID.ToString(), ConnectorID, Settings.Default.DisplayLanguage, Settings.Default.ProjectLocalPath, itemPresence).ConfigureAwait(false);
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 1,
            CancellationToken = new CancellationTokenSource(TimeSpan.FromMinutes(5)).Token,
        });
    }
    
    private async Task<bool> UpdateSubItemsStatus()
    {
        foreach (var item in connectorsMenuItems)
        {
            await parent.Library.EnqueueConnectorOperations(Connectors.service.OperationType.CheckPresence, parent.CI, AssetId, item.ConnectorID, parent.ConnectorInterfaces.Single(c => c.ItemId == AssetId).ItemsConnectorPresence.Single(i => i.ConnectorId == item.ConnectorID));
        }
    
        //All items sent, signal completion
        actionBlock.Complete();
        await actionBlock.Completion;
        return true;
    }
    
    public Task EnqueueConnectorOperations(OperationType operationType, ConfigurationItem ci, Guid itemId, Guid ConnectorID, ItemConnectorPresence itemPresence)
    {
        logManager.WriteLog($"Library : Received connector operation for item {itemId}, the name of the item is {itemPresence.ItemName}", System.Threading.Thread.CurrentThread.ManagedThreadId.ToString(), LogManagement.LogLevel.Information);
        //Set requestor UI item in working state in the UI
        if (ci.CIType == EnumCIType.Application)
        {
            LibraryItems.Single(l => l.CI.ID == ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == ConnectorID).IsWorking = true;
            LibraryItems.Single(l => l.CI.ID == ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == ConnectorID).Status = LibraryItemState.UpdatingStatus;
            LibraryItems.Single(l => l.CI.ID == ci.ID).DeployableAssetMenuItems.Single(d => d.AssetId == itemId).ConnectorsMenuItems.Single(c => c.ConnectorID == ConnectorID).StatusString = "Checking Presence";
        }
        return actionBlock.SendAsync(operationType);
    }
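
To see the effect of sharing one block between several senders, here is a small self-contained sketch (a console program, not the view model above). It assumes two producers standing in for two expanded items, with Task.Delay standing in for the call to the connector service; SingleBlockDemo and its members are made-up names for illustration. On .NET Framework the System.Threading.Tasks.Dataflow NuGet package is assumed to be referenced.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class SingleBlockDemo
{
    // Returns the highest number of operations observed running at once.
    public static async Task<int> RunAsync()
    {
        var gate = new object();
        int running = 0;
        int peak = 0;

        // One shared block: the next message is not started until the
        // Task returned by the async delegate has completed.
        var block = new ActionBlock<int>(async id =>
        {
            lock (gate) { running++; peak = Math.Max(peak, running); }
            await Task.Delay(20); // stand-in for the real service call
            lock (gate) { running--; }
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

        // Each producer plays the role of one expanded item sending its sub-items.
        async Task ProduceAsync(int firstId)
        {
            for (int i = 0; i < 5; i++)
                await block.SendAsync(firstId + i);
        }

        await Task.WhenAll(
            Task.Run(() => ProduceAsync(0)),
            Task.Run(() => ProduceAsync(100)));

        // Only done here so the demo can finish; a long-lived queue would stay open.
        block.Complete();
        await block.Completion;
        return peak;
    }

    public static async Task Main()
    {
        Console.WriteLine($"Peak concurrency: {await RunAsync()}"); // prints "Peak concurrency: 1"
    }
}
```

Both producers post concurrently, yet the operations never overlap, because they all go through the same queue. Creating one block per producer instead would let the two groups run side by side.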