I have an Azure function where I want to get the messages from a Service bus Subscription on a Topic, and then resend all the message to another service bus
But when I try this I get all kind of exception and it never gets any of the messages send to the other service bus.
Here is how I do it in my azure function
public PartStatusFunction(ILogger<PartStatusFunction> log, IEnvironmentVariableLoader environmentVariableLoader)
{
_logger = log;
_environmentVariableLoader = environmentVariableLoader;
}
[FunctionName("PartStatusFunction")]
public async Task Run([ServiceBusTrigger("partstatus", "linktestest", Connection = "smartadmin.messagebus.subscription.name")] ServiceBusReceivedMessage message)
{
_logger.LogInformation($"C# ServiceBus topic trigger function processed message: {message}");
try
{
ServiceBusSender serviceBusSender = _environmentVariableLoader.GetServiceBusClient().CreateSender("partstatus");
ServiceBusMessage serviceMessage = new ServiceBusMessage(message);
await serviceBusSender.SendMessageAsync(serviceMessage);
}
catch (Exception e)
{
_logger.LogInformation($"Failed to send message to linktest azure bus: " + message.ToString() + " Exception " + e.ToString());
throw;
}
}
I have created an EnvironmentVariableLoader class that is created in a singleton instance on my startup class
public class Startup : FunctionsStartup
{
public override void Configure(IFunctionsHostBuilder builder)
{
builder.Services.AddSingleton<IEnvironmentVariableLoader, EnvironmentVariableLoader>();
}
}
The EnvironmentVariableLoader then creates a the ServiceBusClient like this
private ServiceBusClient _serviceBusClient;
public EnvironmentVariableLoader()
{
_serviceBusClient = new ServiceBusClient("Endpoint=xxxxxxxxxxxx",
new ServiceBusClientOptions()
{
TransportType = ServiceBusTransportType.AmqpTcp,
});
}
public ServiceBusClient GetServiceBusClient()
{
return _serviceBusClient;
}
When I try to do this in a console application it works.
_serviceBusClientReceive = new ServiceBusClient("Endpoint=yyyyyyyyy",
new ServiceBusClientOptions()
{
TransportType = ServiceBusTransportType.AmqpTcp
});
_serviceBusClientSend = new ServiceBusClient("Endpoint=xxxxxxxx",
new ServiceBusClientOptions()
{
TransportType = ServiceBusTransportType.AmqpTcp
});
while (true)
{
try
{
var serviceBusReceiver = _serviceBusClientReceive.CreateReceiver("partstatus", "linktestest");
var receivedMessage= await serviceBusReceiver.ReceiveMessageAsync();
ServiceBusSender serviceBusSender = _serviceBusClientSend.CreateSender("partstatus");
ServiceBusMessage serviceMessage = new ServiceBusMessage(receivedMessage);
await serviceBusSender.SendMessageAsync(serviceMessage);
}
catch (Exception e)
{
}
}
Is it not possible to create a connection to another service bus in an Azure Function?
Any help is appreciated.
If you're using Azure Functions, the solution could be significantly simpler than the original suggestion if you use an Azure Service Bus output binding to complement the trigger. In essence, your Functions code becomes a bridge between the incoming queue and the outgoing queue.
public class Functions
{
private readonly ILogger<Functions> logger;
public Functions(ILogger<Functions> logger)
{
this.logger = logger;
}
[ServiceBusAccount("ServiceBusConnection")]
[FunctionName(nameof(PartStatusFunction))]
[return: ServiceBus("%OutputQueue%")]
public Task<ServiceBusMessage> PartStatusFunction([ServiceBusTrigger("%InputQueue%")] ServiceBusReceivedMessage message)
{
logger.LogInformation("Sending message with ID {MessageId} to the destination queue", message.MessageId);
return Task.FromResult(new ServiceBusMessage(message));
}
}
The configuration would require values for the following.
InputQueue
OutputQueue
ServiceBusConnection
Update
In case you need to move messages between two different namespaces, two connection strings are required and the code will look slightly different at the attributes level.
[FunctionName(nameof(PartStatusFunction))]
[return: ServiceBus("%OutputQueue%", Connection = "OutputServiceBusConnection")]
public Task<ServiceBusMessage> PartStatusFunction([ServiceBusTrigger("%InputQueue%", Connection = "InputServiceBusConnection")] ServiceBusReceivedMessage message)
{
logger.LogInformation("Sending message with ID {MessageId} to the destination queue", message.MessageId);
return Task.FromResult(new ServiceBusMessage(message));
}
Note that this is going to work with In-Process SDK. For Isolated Worker SDK, the code will look slightly different but still follow the same idea.