I have two micro services with rabbitmq and masstransit v8 latest version that I send request from micro A to micro B and get data from database then response the result. In application every thing work great but in integration test with inmemorytestharness I get this error MassTransit.RequestFaultException : The EventBus.Messages.Contracts.Services.Discounts.Currencies.Query.GetCurrencyByName.GetCurrencyByNameRequest request faulted: Cannot access a disposed object.
Message for request:
public class GetCurrencyByNameRequest
{
public string Name { get; set; }
}
Message for response:
public class GetCurrencyByNameResponse
{
public string Id { get; set; }
public string Name { get; set; }
}
Consumer:
public class GetCurrencyByNameConsumer : IConsumer<GetCurrencyByNameRequest>
{
private readonly IUnitOfWork _uow;
public GetCurrencyByNameConsumer(IUnitOfWork uow)
{
_uow = uow;
}
public async Task Consume(ConsumeContext<GetCurrencyByNameRequest> context)
{
var filter = Builders<Currency>.Filter.Eq(x => x.CurrencyName, new CurrencyName(context.Message.Name));
var currency = await _uow.GenericRepository<Currency>()
.GetSingleDocumentByFilterAsync(filter, context.CancellationToken);
if (currency == null)
{
await context.RespondAsync(new GetCurrencyByNameResponse
{
Id = string.Empty,
Name = string.Empty
});
}
else
{
await context.RespondAsync(new GetCurrencyByNameResponse
{
Id = currency.Id,
Name = currency.CurrencyName
});
}
}
}
Masstransit configuration in micro A:
services.AddMassTransit(x =>
{
var entryAssembly = AppDomain.CurrentDomain.GetAssemblies()
.FirstOrDefault(x => x.FullName.Contains("Payment.Application"));
x.SetKebabCaseEndpointNameFormatter();
x.AddConsumers(entryAssembly);
x.UsingRabbitMq((context, cfg) =>
{
//cfg.Host(new Uri(configuration["RabbitMqSettings:Uri"]));
cfg.Host(configuration["RabbitMqSettings:Host"], configuration["RabbitMqSettings:VirtualHost"], h =>
{
h.Username(configuration["RabbitMqSettings:Username"]);
h.Password(configuration["RabbitMqSettings:Password"]);
});
cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
cfg.UseMessageRetry(r => r.Immediate(5));
cfg.UseInMemoryOutbox();
cfg.ConfigureEndpoints(context);
});
});
Masstransit configuration in micro B
services.AddMassTransit(x =>
{
var entryAssembly = AppDomain.CurrentDomain.GetAssemblies()
.FirstOrDefault(x => x.FullName.Contains("Discount.Application"));
x.SetKebabCaseEndpointNameFormatter();
x.AddConsumers(entryAssembly);
x.UsingRabbitMq((context, cfg) =>
{
// cfg.Host(new Uri(configuration["RabbitMqSettings:Uri"]));
cfg.Host(configuration["RabbitMqSettings:Host"], configuration["RabbitMqSettings:VirtualHost"], h =>
{
h.Username(configuration["RabbitMqSettings:Username"]);
h.Password(configuration["RabbitMqSettings:Password"]);
});
cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
cfg.UseMessageRetry(r => r.Immediate(5));
cfg.UseInMemoryOutbox();
cfg.ConfigureEndpoints(context);
});
});
Masstransit inmemorytestharness configuration
public static InMemoryTestHarness TestHarness { get; set; } = default!;
public static IBus TestHarnessBus { get; set; } = default!;
public static async Task UseHarnessAsync()
{
await using var provider = new ServiceCollection()
.AddScoped<IUnitOfWork, UnitOfWork>()
.AddMassTransitInMemoryTestHarness(x =>
{
x.SetKebabCaseEndpointNameFormatter();
x.AddConsumer<GetCurrencyByNameConsumer>();
// .Endpoint(x=>x.Name = "get-currency-by-name-request");
x.AddConsumerTestHarness<GetCurrencyByNameConsumer>();
})
.AddGenericRequestClient()
.BuildServiceProvider(true);
TestHarness = provider.GetRequiredService<InMemoryTestHarness>();
await TestHarness.Start().ConfigureAwait(false);
TestHarnessBus = provider.GetRequiredService<IBus>();
}
IntegrationTest
[Test]
public async Task CreatePackageCommand_InsertToDb_ReturnPackage()
{
IRequestClient<GetCurrencyByNameRequest>? client = TestHarnessBus.CreateRequestClient<GetCurrencyByNameRequest>();
await Task.Delay(20000);
var req = new GetCurrencyByNameRequest
{
Name = "Dollar"
};
//error occurs in this line
var response = await client.GetResponse<GetCurrencyByNameResponse>(req);
(await TestHarness.Sent.Any<GetCurrencyByNameRequest>()).Should().BeTrue();
(await TestHarness.Consumed.Any<GetCurrencyByNameResponse>()).Should().BeTrue();
var consumerHarness = GetConsumerTestHarness<GetCurrencyByNameConsumer>();
(await consumerHarness.Consumed.Any<GetCurrencyByNameRequest>()).Should().BeTrue();
var command = new CreatePackageCommand
{
IsActive = true,
Description = new Translation
{
Arabic = StringHelper.RandomString(10),
English = StringHelper.RandomString(10),
Persian = StringHelper.RandomString(10),
},
Sort = 1,
Name = new Translation
{
Arabic = StringHelper.RandomString(10),
English = StringHelper.RandomString(10),
Persian = StringHelper.RandomString(10),
},
Price = 999,
CurrencyName = "Dollar"
};
var packageId = await SendAsync(command);
}
Well.
public static async Task UseHarnessAsync()
{
await using var provider = new ServiceCollection()
As soon as that method exits, it's going to dispose of the service provider.