We've written some routing slips within MassTransit and currently, it's all using in-memory broker so that we can test it all which is perfectly fine. However, when we move it to using Azure Service Bus when the routing slip is executed then the process just log out trying to create topics.
Below is a sample code for the in-memory which works.
using MassTransit;
using Contracts;
var build = Host.CreateDefaultBuilder(args)
.ConfigureLogging(builder => builder.SetMinimumLevel(LogLevel.Debug))
.ConfigureServices((hostContext, services) =>
{
services.AddMassTransit(x =>
{
x.SetKebabCaseEndpointNameFormatter();
x.AddConsumer<StartMessageConsumer>();
x.AddExecuteActivity<Activity1, Activity1Args>();
x.AddExecuteActivity<Activity2, Activity2Args>();
x.UsingInMemory((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
});
});
}).Build();
var task = build.RunAsync();
await Task.Delay(1000);
Console.ReadKey(false);
var publishEndpoint = build.Services.GetRequiredService<IBus>();
Console.WriteLine("Message pub");
await publishEndpoint.Publish(new StartMessage());
await task;
public class StartMessageConsumer : IConsumer<StartMessage>
{
public async Task Consume(ConsumeContext<StartMessage> context)
{
Console.WriteLine("StartMessageConsumer");
var builder = new RoutingSlipBuilder(NewId.NextGuid());
builder.AddActivity("Activity1", new Uri("topic:activity1_execute"));
builder.AddActivity("Activity2", new Uri("topic:activity2_execute"));
var routingSlip = builder.Build();
await context.Execute(routingSlip);
}
}
public class Activity1 : IExecuteActivity<Activity1Args>
{
public async Task<ExecutionResult> Execute(ExecuteContext<Activity1Args> context)
{
await Task.Delay(500);
Console.WriteLine("Done Activity 1");
return context.Completed();
}
}
public class Activity2 : IExecuteActivity<Activity2Args>
{
public async Task<ExecutionResult> Execute(ExecuteContext<Activity2Args> context)
{
await Task.Delay(500);
Console.WriteLine("Done Activity 2");
return context.Completed();
}
}
namespace Contracts
{
public record Activity2Args;
public record StartMessage;
public record Activity1Args;
}
Then if we change the AddMassTransit
to be the following:
services.AddMassTransit(x =>
{
x.SetKebabCaseEndpointNameFormatter();
x.AddConsumer<StartMessageConsumer>();
x.AddExecuteActivity<Activity1, Activity1Args>();
x.AddExecuteActivity<Activity2, Activity2Args>();
x.UsingAzureServiceBus((context, cfg) =>
{
cfg.Host("Endpoint=sb://test-test-test-test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxx");
cfg.ConfigureEndpoints(context);
});
});
We get the following output in the console from the logs:
info: MassTransit[0]
Bus started: sb://test-test-test-test.servicebus.windows.net/
Message pub
dbug: MassTransit[0]
Create send transport: sb://test-test-test-test.servicebus.windows.net/Contracts/StartMessage?type=topic
dbug: MassTransit[0]
Topic: Contracts/StartMessage ()
dbug: MassTransit.Messages[0]
SEND sb://test-test-test-test.servicebus.windows.net/Contracts/StartMessage?type=topic 68230000-5877-2c33-fdb9-08db58732b38 Contracts.StartMessage
StartMessageConsumer
dbug: MassTransit[0]
Create send transport: sb://test-test-test-test.servicebus.windows.net/activity1_execute?type=topic
dbug: MassTransit[0]
Create topic: activity1_execute
dbug: MassTransit[0]
Create topic: activity1_execute
dbug: MassTransit[0]
Create topic: activity1_execute
dbug: MassTransit[0]
Create topic: activity1_execute
dbug: MassTransit[0]
Create topic: activity1_execute
dbug: MassTransit[0]
Create topic: activity1_execute
dbug: MassTransit[0]
Create topic: activity1_execute
dbug: MassTransit[0]
Create topic: activity1_execute
dbug: MassTransit[0]
Create topic: activity1_execute
dbug: MassTransit[0]
Create topic: activity1_execute
dbug: MassTransit[0]
Create topic: activity1_execute
dbug: MassTransit[0]
Create topic: activity1_execute
dbug: MassTransit[0]
Create topic: activity1_execute
dbug: MassTransit[0]
Create topic: activity1_execute
dbug: MassTransit[0]
Create topic: activity1_execute
This seems to work with exchange:
for RabbitMQ but Azure Service Bus seems to try to create a topic continuously. Is this support or is it because it's Azure Service Bus?
You don't send messages to topics, you send them to queues.
builder.AddActivity("Activity1", new Uri("queue:activity1_execute"));
builder.AddActivity("Activity2", new Uri("queue:activity2_execute"));