Tags: c#, .net, azure-functions, azure-durable-functions, dataverse

Using ConcurrentQueue in Azure Durable Function with Dataverse interaction


First of all, I'm new to Durable Functions and don't understand all of their concepts yet, especially the dos and don'ts of sharing variables between methods. I'm building a Function that is supposed to run periodically, get some data from Microsoft Dataverse, run it through some logic and update the entity rows. I'm trying to use a Durable Function for cost optimization, and because the amount of data to be handled is going to be big and we've had problems before with long-running "basic" Functions.

As far as I understand it, I can't simply pass the data I receive as "Entity" objects from my orchestrator to the Activity Functions, because Entity isn't a basic data type and can't be (de)serialized. So I'm trying to use a static ConcurrentQueue in the class, which I fill in the orchestrator and dequeue from in the activity. Since the queue size keeps getting smaller, I assume this part works, even though it's probably not the best possible solution. I don't know whether the activities actually do anything useful. What I do know is that once the parallel activity calls are done, the Function hangs at "await Task.WhenAll" without visibly doing anything.

Please help me understand all the things going wrong in my implementation attempt.

    public class StatusSetter
    {
        private readonly IDataverseService _dataverseService;
        private readonly IDataverseClientFactory _dataverseClientFactory;
        public static Dictionary<string, List<Entity>> entities = new Dictionary<string, List<Entity>>();
        public static Guid someGuid = Guid.Empty;
        public static EntitySetting currentSetting;
        public static int cnt = 0;
        public static ConcurrentQueue<Entity> entQueue = new ConcurrentQueue<Entity>();

        public StatusSetter(IDataverseService dataverseService, IDataverseClientFactory dataverseClientFactory)
        {
            _dataverseService = dataverseService;
            _dataverseClientFactory = dataverseClientFactory;
        }

        [Function(nameof(StatusSetter))]
        public async Task RunOrchestrator(
            [OrchestrationTrigger] TaskOrchestrationContext context)
        {
            ILogger logger = context.CreateReplaySafeLogger(nameof(StatusSetter));

            var serviceClient = _dataverseService.GetClient();

            currentSetting = new EntitySetting { logicalname = "contact", columns = new List<string> { "statuscode", "createdon" } };
            await context.CallActivityAsync<bool>(nameof(RetrieveEntity));
            // some more of the above with changing settings

            if (!context.IsReplaying)
                foreach (var contact in entities["contact"])
                {
                    entQueue.Enqueue(contact);
                }

            var parallelTasks = new List<Task<string>>();

            int count = entQueue.Count;
            for (int i = 0; i < count; i++)
            {
                Task<string> task = context.CallActivityAsync<string>(nameof(HandleContactBatch), "");
                parallelTasks.Add(task);
            }

            await Task.WhenAll(parallelTasks);

            // never reached
        }
        [Function(nameof(RetrieveEntity))]
        public bool RetrieveEntity([ActivityTrigger] FunctionContext executionContext)
        {
            var serviceClient = _dataverseService.GetClient();
            EntitySetting setting = currentSetting;
            entities.Add(setting.logicalname, new List<Entity>());

            EntityCollection collection;
            QueryExpression query = new QueryExpression(setting.logicalname);
            query.ColumnSet = new ColumnSet(setting.columns.ToArray());
            query.PageInfo.Count = 5000;
            query.PageInfo.PageNumber = 1;
            query.PageInfo.PagingCookie = null;
            do
            {
                collection = serviceClient.RetrieveMultiple(query);
                entities[setting.logicalname].AddRange(collection.Entities);
                query.PageInfo.PageNumber += 1;
                query.PageInfo.PagingCookie = collection.PagingCookie;
            }
            while (collection.MoreRecords);

            return true;
        }
        [Function(nameof(HandleContactBatch))]
        public string HandleContactBatch([ActivityTrigger] string dummy, FunctionContext executionContext)
        {
            var serviceClient = _dataverseService.GetClient();
            ILogger logger = executionContext.GetLogger(nameof(HandleContactBatch));

            logger.LogInformation("Start handling " + cnt++ + " remaining " + entQueue.Count); // cnt increases, entQueue.count decreases

            Entity contact;
            bool success = entQueue.TryDequeue(out contact);

            if (!success)
            {
                return "";
            }

            // do stuff with serviceClient; not sure whether it actually happens, e.g. serviceClient.Update()

Host builder:

    var host = new HostBuilder()
        .ConfigureFunctionsWorkerDefaults()
        .ConfigureCustomFunctionsWorker()
        .Build();

    host.Run();

If you need any more code, please let me know!


Solution

  • Summary

    It's hard to rewrite your code to make it work, because it's not designed quite right, but I've had a go. More usefully, I've tried to explain how you need to rethink it.

    Really you probably need to do some simpler Durable Functions tutorials first, and then have another go at writing it and posting for help. Maybe start with the Microsoft Durable Functions overview documentation^1

    Some things you've misunderstood

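    A Durable Orchestration doesn't keep running while it awaits an Activity. The orchestrator function stops, and when the Activity completes the framework runs it again from the top ("replay"), feeding in the recorded results of the Activities that have already finished. Meanwhile the Activity itself may run on a different worker instance, or after the host has been recycled. This means you can't rely on any of your class's member variables, even static ones, keeping their values between the orchestrator and its Activities. Taking a snippet of your code: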

    [Function(nameof(StatusSetter))]
    public async Task RunOrchestrator([OrchestrationTrigger] TaskOrchestrationContext context)
    {
        var serviceClient = _dataverseService.GetClient();
    
        // So even though you give currentSetting a value here:
        currentSetting = new EntitySetting { logicalname = "contact", columns = new List<string> { "statuscode", "createdon" } };
                
        // ...as soon as you await this activity, the orchestrator stops running.
        // From here on, your static variables can't be relied on.
        await context.CallActivityAsync<bool>(nameof(RetrieveEntity));
    

    So the orchestrator stops, and the framework runs your RetrieveEntity Activity, possibly on a different worker instance:

    [Function(nameof(RetrieveEntity))]
    public bool RetrieveEntity([ActivityTrigger] FunctionContext executionContext)
    {
        var serviceClient = _dataverseService.GetClient();
    
        // currentSetting may be null here, because this Activity
        // can run in a different process from the orchestrator that set it.
        EntitySetting setting = currentSetting; // possibly null.
        entities.Add(setting.logicalname, new List<Entity>());
    

    And worse still, the reverse is also true. Your Activity is doing entities.Add into that static variable, on the last line above. But when your Orchestration resumes, it is replayed from the start, possibly in a fresh process where the static variables are empty again.

    Put another way: when your Orchestration code resumes, the entities dictionary on this line may be empty again:

    if (!context.IsReplaying)
    {
        // ...meaning that this "entities" may be empty.
        foreach (var contact in entities["contact"])
        {
            entQueue.Enqueue(contact);
        }
    }
    

    The key point to understand is: you can't rely on any variable persisting between an Orchestration and its Activities, in either direction.
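    If the "stops and starts again" idea still feels abstract, here is a deliberately simplified, hypothetical model of replay in plain C#. It uses no Durable Functions packages; RunOrchestrator, CallActivity and history are invented for illustration. Each time an activity finishes, the "framework" loop runs the orchestrator again from the top, and calls that already have a recorded result get it from history instead of re-running the activity. So everything before an await runs again on every replay, and the only values that reliably survive are activity results.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified model of orchestration replay (not the real framework).
var history = new List<string>();   // recorded activity results, in call order
int orchestratorRuns = 0;

// Returns the recorded result of the n-th activity call, or null if that
// activity hasn't completed yet (meaning the orchestrator must stop and wait).
string CallActivity(int callIndex, string activityName) =>
    callIndex < history.Count ? history[callIndex] : null;

// The orchestrator body: every run starts again from the first line.
bool RunOrchestrator()
{
    orchestratorRuns++;
    string entities = CallActivity(0, "RetrieveEntity");
    if (entities == null) return false;   // yield until RetrieveEntity completes
    string result = CallActivity(1, "HandleContactBatch");
    if (result == null) return false;     // yield until HandleContactBatch completes
    return true;                          // orchestration finished
}

// The "framework": run the orchestrator, execute the next pending activity,
// record its result in history, then replay the orchestrator from the top.
var activityResults = new[] { "3 contacts", "batch ok" };
while (!RunOrchestrator())
{
    history.Add(activityResults[history.Count]);
}

// Prints 3: the first run plus one replay per completed activity in this sequential example.
Console.WriteLine(orchestratorRuns);
```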

    This is why you need to pass objects into your Activity, and then return objects back out again into the Orchestration. There is an example here which uses strings, but you could equally use any other .NET class that you've written, as long as it can be serialized to and from JSON: https://learn.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-sequence?tabs=csharp
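    To see why a Dataverse Entity is an awkward thing to pass between functions, here is a small sketch using only System.Text.Json, which I'm assuming is the serializer your isolated worker uses by default. It does not reference the Dataverse SDK; the Dictionary<string, object> below only mimics the way Entity.Attributes stores its values as object. After a JSON round trip those values come back as JsonElement rather than the original int and DateTime, whereas a strongly-typed shape keeps its types.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Mimics an Entity-style attribute bag where every value is typed as object.
// (Illustration only: this is not the Dataverse Entity class.)
var attributeBag = new Dictionary<string, object>
{
    ["statuscode"] = 1,
    ["createdon"] = new DateTime(2024, 1, 15, 0, 0, 0, DateTimeKind.Utc),
};

// Simulate an Activity input/output being serialized and deserialized.
string json = JsonSerializer.Serialize(attributeBag);
var roundTripped = JsonSerializer.Deserialize<Dictionary<string, object>>(json)!;

// The values are no longer int/DateTime: System.Text.Json hands back JsonElement.
Console.WriteLine(roundTripped["statuscode"].GetType().Name); // JsonElement
Console.WriteLine(roundTripped["createdon"].GetType().Name);  // JsonElement

// A strongly-typed shape (here simply Dictionary<string, int>) keeps its types.
var statusById = new Dictionary<string, int> { ["contact-1"] = 1 };
var typedRoundTrip = JsonSerializer.Deserialize<Dictionary<string, int>>(
    JsonSerializer.Serialize(statusById))!;
Console.WriteLine(typedRoundTrip["contact-1"] + 1); // 2
```

    In practice that usually means mapping each Entity to a small class with typed properties (Id, status code, created-on date) inside the Activity that talks to Dataverse, and passing those between functions instead.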

    TL;DR: you can't use static or member variables to pass data between an Orchestration and its Activities.

    A rough re-write of your code

    Like I said earlier, it's hard to fix your code because it's just not structured right. But to give you a sense of how to do this, try this:

    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;
    using Microsoft.Azure.Functions.Worker;
    using Microsoft.DurableTask;
    using Microsoft.Xrm.Sdk;
    using Microsoft.Xrm.Sdk.Query; // QueryExpression/ColumnSet, used by the snipped paging loop

    // IDataverseService and EntitySetting are your own types from the question.
    namespace MyDurableFunctions
    {
        public class StatusSetter
        {
            private readonly IDataverseService _dataverseService;
    
            public StatusSetter(IDataverseService dataverseService)
            {
                _dataverseService = dataverseService;
            }
    
            [Function(nameof(StatusSetter))]
            public async Task RunOrchestrator([OrchestrationTrigger] TaskOrchestrationContext context)
            {
                // No Dataverse client here: orchestrator code must be deterministic,
                // so all I/O (including creating the service client) belongs in Activities.
                var currentSetting = new EntitySetting { logicalname = "contact", columns = new List<string> { "statuscode", "createdon" } };
                
                // We can pass the currentSetting directly into the Activity here.
                // And then also retrieve a strongly-typed object back,
                // rather than the "entities" static you were trying to use.
                var entities = await context.CallActivityAsync<List<NamedEntities>>(nameof(RetrieveEntity), currentSetting);
    
                // It's not clear what you wanted to do next, but it feels like
                // you were trying to pass each of those entity lists to another function;
                // one list at a time?
                // You don't say what this Activity should return, either.
                // But taking some assumptions, perhaps this is what you want:
                var parallelActivities = entities
                    .Select(entityList => context.CallActivityAsync<MyHandleBatchResult>(nameof(HandleContactBatch), entityList))
                    .ToArray();
    
                // Now await all those Activities in parallel (fan-out, fan-in).
                await Task.WhenAll(parallelActivities);
            }
    
            /// <summary>
            /// Takes an EntitySetting from the Orchestration,
            /// and returns a list of NamedEntities to the Orchestration.
            /// </summary>
            [Function(nameof(RetrieveEntity))]
            public List<NamedEntities> RetrieveEntity([ActivityTrigger] EntitySetting setting)
            {
                var serviceClient = _dataverseService.GetClient();
    
                var entities = new List<NamedEntities>(); // I invented this type.
                entities.Add(new NamedEntities(setting.logicalname, new List<Entity>()));
    
                // (this part snipped for brevity - it's the same paging loop as your code,
                // but adding each page to entities[0].List instead of a static dictionary.)
    
                return entities;
            }
    
            /// <summary>
            /// Takes an Entity list, which originally came from the RetrieveEntity activity,
            /// and returns some result object (that you haven't mentioned, so I'm guessing)
            /// </summary>
            [Function(nameof(HandleContactBatch))]
            public MyHandleBatchResult HandleContactBatch([ActivityTrigger] NamedEntities namedEntity)
            {
                var serviceClient = _dataverseService.GetClient();
    
                // Some dummy example; it's not clear what you want to return here.
                MyHandleBatchResult result = new() { Success = true };
    
                return result;
            }
        }
    
        // Just a small record definition to make things easier to follow.
        // Note: Entity stores its attribute values as object, so it may not round-trip
        // cleanly through JSON; mapping to a plain class with typed properties is safer.
        public record NamedEntities(string LogicalName, List<Entity> List);

        // Placeholder result type; replace it with whatever your batch handler should report.
        public record MyHandleBatchResult
        {
            public bool Success { get; init; }
        }
    }
    

    You'll notice it's actually much neater now, too!
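    Since you mentioned the amount of data is going to be big: scheduling one Activity per contact adds a lot of events to the orchestration history, which the orchestrator has to replay, so it's common to fan out over batches instead. Here is a minimal sketch of just the batching step in plain .NET 6+ using Enumerable.Chunk. The batch size of 500 and the string IDs are my assumptions; in the real orchestrator each batch would be the input of one CallActivityAsync call.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Pretend these are the contact IDs returned by a RetrieveEntity-style activity.
List<string> contactIds = Enumerable.Range(1, 1234).Select(i => $"contact-{i}").ToList();

// Assumed batch size; tune it to what one Activity can process comfortably.
const int batchSize = 500;

// Enumerable.Chunk (.NET 6+) splits the list into arrays of at most batchSize items.
// Each batch would become the input of one HandleContactBatch Activity call.
string[][] batches = contactIds.Chunk(batchSize).ToArray();

Console.WriteLine(batches.Length);     // 3
Console.WriteLine(batches[^1].Length); // 234
```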

    Other things to watch out for

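    • I noticed you're checking IsReplaying. That's probably a sign that you're using Durable Functions in the wrong way (unless you're doing something advanced; replay-safe logging is already handled by CreateReplaySafeLogger). In your code it's also a likely cause of the hang: the queue is only filled on the first execution, so on replay entQueue.Count has a different value and the orchestrator schedules a different number of Activities from the ones recorded in its history. Orchestrator code has to make the same calls in the same order on every replay. For someone new to Durable Functions, an IsReplaying check is a good flag to step back and re-think.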

    • You're trying to coordinate the parallel work through a shared ConcurrentQueue, which is a traditional in-process concurrency technique, but it isn't how you share work in a Durable Functions pattern. Instead, you use the native Durable Functions parallelisation (fan-out, fan-in): give each Activity its own input and collect the return values with Task.WhenAll. I've changed your code to show an example.

    • The above was done to show you how to use Durable Functions, but I had no regard for the Dataverse task itself. I'm assuming you're already OK with that, so you can modify it as required.
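    To make the replay problem with the if (!context.IsReplaying) check in your orchestrator concrete, here is a hypothetical plain-C# simulation. It uses no Durable Functions packages, and CountActivitiesToSchedule is an invented stand-in for the part of your orchestrator that decides how many Activities to start. The queue is filled only on the first run, the "activities" drain it, and on replay the same code arrives at a different count from the one it recorded the first time.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Stand-ins for your static fields (illustration only).
var entQueue = new ConcurrentQueue<string>();
var contacts = new List<string> { "contact-1", "contact-2", "contact-3" };

// Mimics the part of your orchestrator that decides how many activities to schedule.
int CountActivitiesToSchedule(bool isReplaying)
{
    if (!isReplaying)
    {
        foreach (var contact in contacts)
        {
            entQueue.Enqueue(contact);
        }
    }
    return entQueue.Count; // depends on shared mutable state, so it is not deterministic
}

// First execution: the queue is filled, so 3 activities are scheduled (and recorded in history).
int firstRun = CountActivitiesToSchedule(isReplaying: false);

// The activities run and drain the queue (in your code, via TryDequeue).
while (entQueue.TryDequeue(out _)) { }

// Replay: the same orchestrator code now sees an empty queue and schedules 0 activities,
// which no longer matches the 3 calls recorded in history.
int replayRun = CountActivitiesToSchedule(isReplaying: true);

Console.WriteLine($"{firstRun} vs {replayRun}"); // 3 vs 0
```

    Passing each Activity its input directly, as in the re-write above, removes this shared state, so every replay computes the same list of calls.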

    Hope that helps!