Tags: c#, sql-server, azure, azure-webjobs, azure-queues

How to modify connection string based on queue message in Azure WebJobs?


I have an Azure WebJob that needs to change which DB it connects to based on the queue message it is processing:

This queue message needs to access database CustomerOneDb:

{"FormId":1, "DbName":"CustomerOneDb"}

This queue message needs to access database CustomerTwoDb:

{"FormId":2, "DbName":"CustomerTwoDb"}
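
For reference, here is a minimal sketch of how payloads shaped like the two above could be turned into a typed object with System.Text.Json. The names FormMessage and FormMessageParser are hypothetical and do not come from the WebJobs SDK; only the FormId and DbName fields are taken from the sample messages.

```csharp
using System;
using System.Text.Json;

// Hypothetical type that mirrors the sample payloads (FormId, DbName).
public sealed class FormMessage
{
    public int FormId { get; set; }
    public string DbName { get; set; } = string.Empty;
}

public static class FormMessageParser
{
    // Parses the JSON text of a queue message.
    // Throws JsonException for malformed JSON and FormatException
    // when the payload carries no usable DbName.
    public static FormMessage Parse(string json)
    {
        var message = JsonSerializer.Deserialize<FormMessage>(json);
        if (message is null || string.IsNullOrWhiteSpace(message.DbName))
        {
            throw new FormatException("Queue message does not contain a DbName.");
        }
        return message;
    }
}
```

The JSON text would come from whatever the trigger binds to. If QueueMessage here is Azure.Storage.Queues.Models.QueueMessage (an assumption), its MessageText property is one likely source.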

I assign the connection string when the DbContext is registered, and I want to make it dynamic:

hostBuilder.ConfigureServices(services =>
{
    services.AddDbContext<DbContext.MyDbContext>(
        (serviceProvider, cfg) =>
        {
            if (!cfg.IsConfigured)
            {
                var cnString = "server=localhost;Initial Catalog={DbName};Integrated Security=True;";

                // TODO: Get DB name from queue message
                const string dbName = "CustomerOneDb";

                cnString = cnString.Replace("{DbName}", dbName);

                cfg.UseSqlServer(cnString);
            }
        }
    );
});

I haven't found any way to access the queue message from the context that calls UseSqlServer, so I'm looking for places where I can access the queue message, update some sort of variable, and then access that from inside AddDbContext.

FAILED ATTEMPT #1: Get it from the standard queue handler.

[FunctionName("ProcessQueueMessage")]
public async Task ProcessQueueMessage([QueueTrigger("my-queue")] QueueMessage queueMessage)
{
  // Process queue message
  // This runs *after* the code that calls UseSqlServer, so it's too late
}

FAILED ATTEMPT #2: Intercept the queue message by implementing IQueueProcessorFactory:

public class QueueData
{
    public string DatabaseName { get; set; } = string.Empty;
}

public class MyQueueProcessorFactory : Microsoft.Azure.WebJobs.Host.Queues.IQueueProcessorFactory
{
    private readonly IServiceProvider _serviceProvider;

    public MyQueueProcessorFactory(IServiceProvider serviceProvider)
    {
        _serviceProvider = serviceProvider;
    }

    public QueueProcessor Create(QueueProcessorOptions context)
    {
        return new MyQueueProcessor(_serviceProvider, context);
    }
}

public class MyQueueProcessor : QueueProcessor
{
    private readonly IServiceProvider _serviceProvider;

    public MyQueueProcessor(IServiceProvider serviceProvider, QueueProcessorOptions context) : base(context)
    {
        _serviceProvider = serviceProvider;
        Context = context;
    }

    internal QueueProcessorOptions Context { get; private set; }

    protected override Task<bool> BeginProcessingMessageAsync(QueueMessage message, CancellationToken cancellationToken)
    {
        var queueData = _serviceProvider.GetRequiredService<QueueData>();
        queueData.DatabaseName = "CustomerOneDb"; // TODO: Set variable based on value inside queue message

        return base.BeginProcessingMessageAsync(message, cancellationToken);
    }
}

QueueData is a scoped service:

hostBuilder.ConfigureServices(services =>
{
    // ... other config omitted
    services.AddSingleton<IQueueProcessorFactory, MyQueueProcessorFactory>();
    services.AddScoped<QueueData>();
});

The function BeginProcessingMessageAsync runs before my call to UseSqlServer, which is great! However, when I retrieve the QueueData instance later, its DatabaseName is always an empty string:

services.AddDbContext<DbContext.MyDbContext>(
    (serviceProvider, cfg) =>
    {
        if (!cfg.IsConfigured)
        {
            // ...
            var queueData = serviceProvider.GetRequiredService<QueueData>();
            var dbName = queueData.DatabaseName; // Always an empty string (unless QueueData is configured as a Singleton)
            // ...
        }
    }
);

FAILED ATTEMPT #3: Change QueueData to be a singleton:

services.AddSingleton<QueueData>();

This works! I can assign the DB name from BeginProcessingMessageAsync and retrieve it from within AddDbContext. BUT... This is not a viable solution because the data is no longer scoped to the message that triggered the invocation. That is, two messages coming in at the same time may compete with each other to set/retrieve the DB name on the same instance.

POSSIBLE SOLUTIONS: I can achieve my goal of dynamically setting the connection string based on info in the queue message if I can achieve any of these:

  • Access the queue message from inside AddDbContext.
  • Adjust the scope of QueueData so that when I assign it a value from BeginProcessingMessageAsync it is accessible later in AddDbContext.
  • Change QueueData to hold a collection of DbName values that are identified by a unique ID that I can then look up later from inside AddDbContext. The queue message ID would work but is not accessible from inside AddDbContext.

I'm stumped on how to do any of these options, so I'm asking for help here...


Solution

  • I was able to resolve this by updating the connection string from the queue handler.

    [FunctionName("ProcessQueueMessage")]
    public async Task ProcessQueueMessage([QueueTrigger("my-queue")] QueueMessage queueMessage)
    {
      // DatabaseName presumably stands for the "DbName" value read from the message body
      _repositoryService.SetFecContextDatabase(queueMessage.DatabaseName);
      // ...rest of implementation...
    }
    

    RepositoryService:

    public class RepositoryService : IRepositoryService
    {
        private readonly FecContext _context;
        
        public RepositoryService(FecContext context)
        {
            _context = context;
        }
    
        public void SetFecContextDatabase(string dbName)
        {
            string currentCnString = _context.Database.GetDbConnection().ConnectionString;
            var cnString = currentCnString.Replace("{DbName}", dbName);
            _context.Database.GetDbConnection().ConnectionString = cnString;
        }
    }
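
    One caveat with the Replace call above: it only has an effect while the connection string still contains the {DbName} placeholder, so a second call on the same context instance leaves the first database name in place. A minimal sketch of an alternative is to build the full string from a fixed template each time. The ConnectionStrings helper and its name check are hypothetical. The template is copied from the question, and the allowed-character rule is an assumption about how the databases are named.

```csharp
using System;
using System.Text.RegularExpressions;

// Hypothetical helper, not part of EF Core or the WebJobs SDK.
public static class ConnectionStrings
{
    // Template taken from the question; {DbName} is the placeholder.
    public const string Template =
        "server=localhost;Initial Catalog={DbName};Integrated Security=True;";

    // Assumption: database names contain only letters, digits, and underscores.
    // Rejecting anything else keeps a message from injecting extra
    // connection string keywords such as ";Password=...".
    private static readonly Regex SafeName = new Regex(@"\A[A-Za-z0-9_]+\z");

    // Returns a complete connection string for the given database name.
    public static string ForDatabase(string dbName)
    {
        if (dbName is null || !SafeName.IsMatch(dbName))
        {
            throw new ArgumentException("Unexpected database name.", nameof(dbName));
        }
        return Template.Replace("{DbName}", dbName);
    }
}
```

    SetFecContextDatabase could then assign ConnectionStrings.ForDatabase(dbName) directly instead of editing the current string. On EF Core 5.0 and later, the SetConnectionString extension method on _context.Database may be a tidier way to apply it, though that has not been tried against this setup.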