rabbitmq autofac masstransit

What is the intended MassTransit middleware construct for setting multi-tenant scope values upon each message received?


I'm using MassTransit with Autofac and need a way to inspect inbound messages for a tenant Id, which will be used to dynamically construct a connection string. The logical steps would be as follows:

  1. A message is received

  2. The tenant Id is retrieved from the message and a per-message-scoped component is used to set the tenant Id

  3. A message consumer is resolved by the container which uses the per-message-scoped component to construct dependencies accordingly.
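As a rough sketch of what steps 2 and 3 imply, the per-message-scoped component can be a small holder object, and a dependency of the consumer can read from it. The names `TenantContext` and `TenantConnectionStringProvider`, and the connection string format, are hypothetical and only illustrate the shape; they assume both types are registered once per lifetime scope (for example with Autofac's `InstancePerLifetimeScope()`).

```csharp
using System;

// Hypothetical per-message component: one instance per message scope.
public class TenantContext
{
    public string TenantId { get; set; }
}

// Hypothetical consumer dependency: builds the connection string from
// whatever tenant Id was set earlier in the same scope.
public class TenantConnectionStringProvider
{
    private readonly TenantContext _tenant;

    public TenantConnectionStringProvider(TenantContext tenant)
    {
        _tenant = tenant;
    }

    public string GetConnectionString()
    {
        if (string.IsNullOrEmpty(_tenant.TenantId))
            throw new InvalidOperationException("No tenant Id was set for this message.");

        // Server name and database naming scheme are made up for illustration.
        return $"Server=db.example.local;Database=Tenant_{_tenant.TenantId};Integrated Security=true";
    }
}
```

This only works if whatever sets `TenantId` and the consumer's dependencies share the same scope, which is the crux of the question.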

MassTransit has an example of how to create message filters. At first glance this seems like a potential solution, but it doesn't appear that a new filter instance is resolved for each message from the same lifetime scope that is used to resolve the consumer.

So my question is this: Can message filters be used for this purpose? If so, please provide an example or point to an example showing something similar (e.g. anything where a DI container that supports scopes is used to access per-message lifetime resources). If not, what, if any, is the intended way of facilitating this need with MassTransit?


Solution

  • Based on looking at the code, you should be able to create a new ILifetimeScope at the ConsumeContext level and add it to the context as a payload; the consumer should then be resolved from that scope. An example of the filter logic is included below.

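    using System.Threading.Tasks;
    using Autofac;
    using GreenPipes; // assumed home of IFilter/IPipe/ProbeContext; varies by MassTransit version
    using MassTransit;

    public class LifetimeScopeFilter :
        IFilter<ConsumeContext>
    {
        readonly ILifetimeScope _mainScope;

        public LifetimeScopeFilter(ILifetimeScope mainScope)
        {
            _mainScope = mainScope;
        }

        public void Probe(ProbeContext context)
        {
            context.CreateFilterScope("lifetimeScope");
        }

        public async Task Send(ConsumeContext context, IPipe<ConsumeContext> next)
        {
            // Stays null if the context already carries an ILifetimeScope payload.
            ILifetimeScope scope = null;

            try
            {
                // Attach a per-message scope so later pipeline stages can pick it up.
                context.GetOrAddPayload<ILifetimeScope>(() => scope = _mainScope.BeginLifetimeScope());

                await next.Send(context).ConfigureAwait(false);
            }
            finally
            {
                // Dispose only the scope this filter created.
                scope?.Dispose();
            }
        }
    }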
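
To tie this back to the tenant Id in the question, a variation of the same filter could set the tenant on a scoped component before passing the message on. This is a minimal sketch under several assumptions: `ITenantMessage`, `TenantContext`, and `TenantScopeFilter` are hypothetical names, `TenantContext` is assumed to be registered with `InstancePerLifetimeScope()`, and the `GreenPipes` namespace is assumed for the pipeline types (this depends on the MassTransit version). Whether the consumer is actually resolved from the payload scope also depends on the version of the Autofac integration, so treat it as something to verify rather than a guarantee.

```csharp
using System.Threading.Tasks;
using Autofac;
using GreenPipes; // assumption: pipeline types live here in the version in use
using MassTransit;

// Hypothetical contract implemented by every message that carries a tenant Id.
public interface ITenantMessage
{
    string TenantId { get; }
}

// Hypothetical per-message holder, assumed registered InstancePerLifetimeScope().
public class TenantContext
{
    public string TenantId { get; set; }
}

public class TenantScopeFilter :
    IFilter<ConsumeContext>
{
    readonly ILifetimeScope _mainScope;

    public TenantScopeFilter(ILifetimeScope mainScope)
    {
        _mainScope = mainScope;
    }

    public void Probe(ProbeContext context)
    {
        context.CreateFilterScope("tenantScope");
    }

    public async Task Send(ConsumeContext context, IPipe<ConsumeContext> next)
    {
        // Set only when this filter creates the scope, so it disposes only its own.
        ILifetimeScope created = null;

        try
        {
            // Reuse an existing payload scope, or create one for this message.
            var scope = context.GetOrAddPayload<ILifetimeScope>(
                () => created = _mainScope.BeginLifetimeScope());

            // Copy the tenant Id into the scoped holder if the message carries one.
            if (context.TryGetMessage<ITenantMessage>(out var tenantMessage))
                scope.Resolve<TenantContext>().TenantId = tenantMessage.Message.TenantId;

            await next.Send(context).ConfigureAwait(false);
        }
        finally
        {
            created?.Dispose();
        }
    }
}
```

A filter like this would typically be added to the bus or receive endpoint pipeline with `UseFilter(new TenantScopeFilter(container))`, where `container` is the root Autofac container.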