azure-service-fabric

What impact does changing an IReliableQueue to an IReliableConcurrentQueue have in an existing deployment?


I am working on a Service Fabric application that uses IReliableQueue. For the use cases of this system, IReliableConcurrentQueue makes sense, and some local testing (basically just changing the code to use IReliableConcurrentQueue instead of IReliableQueue; the queue name does not change) shows great performance improvements. However, I am worried about the impact of making this change in a production system (i.e. during an upgrade). I can't find any docs or online questions (unless I just missed them) about these considerations. For example, in this system the existing IReliableQueue will almost always have items. So what happens to that data when I upgrade the SF application? Will it be available to dequeue from the IReliableConcurrentQueue, or would the data be lost? I know I could "just try it", but I wanted to see whether someone out there has done the same or could offer pointers to existing resources. Thanks!


Solution

  • Sorry for the late answer (you probably don't need it anymore, but still).

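    When we call the GetOrAddAsync method on IReliableStateManager, we aren't retrieving an interface to store values; when the name does not exist yet, we are actually creating an instance of a reliable collection of the type implied by that interface, and later calls with the same name return that same instance. This means the type of the interface we specify is very important.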

    Taking this into account, if we do this:

    Service v. 1.0

    // Somewhere in RunAsync for example
    await this.StateManager.GetOrAddAsync<IReliableQueue<long>>("MyCollection");
    

    Then doing this in the next version:

    Service v. 1.1

    // Somewhere in RunAsync for example
    await this.StateManager.GetOrAddAsync<IReliableConcurrentQueue<long>>("MyCollection");
    

    will throw an exception:

    Returned reliable object of type Microsoft.ServiceFabric.Data.Collections.DistributedQueue`1[System.Int64] cannot be casted to requested type Microsoft.ServiceFabric.Data.Collections.IReliableConcurrentQueue`1[System.Int64]

    and then:

    System.ExecutionEngineException: 'Exception of type 'System.ExecutionEngineException' was thrown.'

    The above exception looks like a bug, so I have filed one.


    UPDATE 2019.06.28

    It turned out that the appearance of System.ExecutionEngineException isn't a bug but rather undocumented behavior of the Environment.FailFast method in combination with the Visual Studio debugger.

    Please see my comment on the above issue.


    This is what would happen.
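To see the same behaviour without a cluster, here is a small toy model in plain C# (not the Service Fabric API). The hypothetical `GetOrAdd` helper and the `collections` dictionary stand in for `IReliableStateManager`, and `Queue<long>` / `ConcurrentQueue<long>` merely play the roles of `IReliableQueue<long>` and `IReliableConcurrentQueue<long>`. It assumes C# 9 or later (top-level statements). It only illustrates the point above: the first call for a name fixes the collection's type, and a later call asking for a different type fails instead of converting the existing data.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Toy stand-in for the state manager's name -> collection map (NOT Service Fabric).
var collections = new Dictionary<string, object>();

// Hypothetical helper mimicking GetOrAddAsync semantics in memory.
T GetOrAdd<T>(string name) where T : new()
{
    if (!collections.TryGetValue(name, out var existing))
    {
        // First call for this name: the requested type decides what gets created.
        var created = new T();
        collections[name] = created;
        return created;
    }

    // Later calls: the stored object keeps its original type.
    if (existing is T typed)
    {
        return typed;
    }

    // Asking for a different type does not convert the data; it fails.
    throw new InvalidCastException(
        $"Returned object of type {existing.GetType()} cannot be cast to requested type {typeof(T)}");
}

// "v1.0": the collection is created as a plain queue and filled with data.
var v1 = GetOrAdd<Queue<long>>("MyCollection");
v1.Enqueue(1);
v1.Enqueue(2);

// "v1.1": same name, different type.
string? error = null;
try
{
    GetOrAdd<ConcurrentQueue<long>>("MyCollection");
}
catch (InvalidCastException ex)
{
    error = ex.Message;
}

Console.WriteLine(error ?? "no error");
```

Running it prints the cast error, while the two values enqueued in "v1.0" stay in the original `Queue<long>`; nothing is migrated automatically, which is why an explicit migration step is needed.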

    There are plenty of ways to overcome this.

    Here is the most obvious one:

    Example

    var migrate = false; // Set to true when the old queue still has to be migrated.
    var migrateValues = new List<long>();
    
    var applicationFlags = await this.StateManager
        .GetOrAddAsync<IReliableDictionary<string, bool>>("application-flags");
    using (var transaction = this.StateManager.CreateTransaction())
    {
        var flag = await applicationFlags
            .TryGetValueAsync(transaction, "queue-to-concurrent-queue-migration");
        if (!flag.HasValue || !flag.Value)
        {
            // Read everything from the old IReliableQueue. This transaction is
            // never committed, so the dequeues are aborted on Dispose; the old
            // collection is removed as a whole below.
            var queue = await this.StateManager
                .GetOrAddAsync<IReliableQueue<long>>("value-collection");
            for (;;)
            {
                var c = await queue.TryDequeueAsync(transaction);
                if (!c.HasValue)
                {
                    break;
                }
    
                migrateValues.Add(c.Value);
            }
            migrate = true;
        }
    }
    
    if (migrate)
    {
        // Drop the old IReliableQueue so the name can be reused with a new type.
        // Note: from here until the commit below the values exist only in
        // memory (migrateValues); a failover in between would lose them.
        await this.StateManager.RemoveAsync("value-collection");
    
        using (var transaction = this.StateManager.CreateTransaction())
        {
            var concurrentQueue = await this.StateManager
                .GetOrAddAsync<IReliableConcurrentQueue<long>>("value-collection");
    
            foreach (var i in migrateValues)
            {
                await concurrentQueue.EnqueueAsync(transaction, i);
            }
    
            // Mark the migration as done so later runs skip it.
            await applicationFlags.AddOrUpdateAsync(
                transaction,
                "queue-to-concurrent-queue-migration",
                true,
                (s, b) => true);
    
            // Commit inside the using block, while the transaction is still in scope.
            await transaction.CommitAsync();
        }
    }
    

    Please note that this code is just an illustrative example and should be properly tested before applying it to a real-life application.