I am looking to inject a Kafka producer as a singleton in my app. Disposing the instance currently takes two steps: first you must flush the buffer, then call Dispose. For performance, this should happen only once messages are no longer being processed.
My solution for ASP.NET Core is to use the AddSingleton() method in DI and then use IApplicationLifetime.ApplicationStopping.Register to register a callback that will flush and dispose the producer. I followed the tutorial found here: https://andrewlock.net/four-ways-to-dispose-idisposables-in-asp-net-core/
Putting together a quick test, I did the following in my Startup class:
public void ConfigureServices(IServiceCollection services)
{
    var producerConfig = new Dictionary<string, object>
    {
        { "bootstrap.servers", "192.168.99.100:9092" },
        { "client.id", Dns.GetHostName() },
        { "api.version.request", true }
    };
    services.AddSingleton(new Producer<Null, string>(producerConfig, null, new StringSerializer(Encoding.UTF8)));
    services.AddMvc();
}

public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory, IApplicationLifetime lifetime)
{
    loggerFactory.AddConsole();
    app.UseMvc();
    app.UseWebSockets();
    if (env.IsDevelopment())
    {
        app.UseDeveloperExceptionPage();
    }
    lifetime.ApplicationStopping.Register(flushAndDispose, app.ApplicationServices.GetRequiredService<Producer>());
}
But when it runs, I get the following error:
An exception of type 'System.InvalidOperationException' occurred in Microsoft.Extensions.DependencyInjection.Abstractions.dll but was not handled in user code: 'No service for type 'Confluent.Kafka.Producer' has been registered.'
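You did not explicitly register a Producer with the service collection, so the provider does not know how to resolve it. AddSingleton(instance) registers the service under the type of the instance you pass in, which here is Producer<Null, string>, while Configure asks for Producer. Either request Producer<Null, string> from the provider, or, assuming Producer<T1,T2> is derived from Producer, register it under the base type: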
services.AddSingleton<Producer>(
    c => new Producer<Null, string>(producerConfig, null,
        new StringSerializer(Encoding.UTF8)));
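The question does not show the flushAndDispose callback itself. Below is a minimal, self-contained sketch of how such a callback could look and how the state argument reaches it. It makes several assumptions: FakeProducer is a hypothetical stand-in for the Kafka producer (it only records calls), a CancellationTokenSource stands in for IApplicationLifetime.ApplicationStopping (which is a CancellationToken), and the real flush call and its timeout arguments depend on the Confluent.Kafka version in use.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for the Kafka producer: it only records the calls it receives.
public sealed class FakeProducer : IDisposable
{
    public List<string> Calls { get; } = new List<string>();

    // Stand-in for the real flush step, which would block until buffered messages are sent.
    public void Flush() => Calls.Add("flush");

    public void Dispose() => Calls.Add("dispose");
}

public static class ShutdownDemo
{
    // Matches the Action<object> shape accepted by CancellationToken.Register.
    // The state argument is whatever was passed as the second argument to Register.
    public static void FlushAndDispose(object state)
    {
        var producer = (FakeProducer)state;
        producer.Flush();    // first drain the buffer
        producer.Dispose();  // then release the producer
    }

    public static List<string> Run()
    {
        var producer = new FakeProducer();

        // Stand-in for the ApplicationStopping token provided by the host.
        using (var stopping = new CancellationTokenSource())
        {
            stopping.Token.Register(FlushAndDispose, producer);
            stopping.Cancel(); // simulates the host signalling shutdown
        }

        return producer.Calls;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "flush, dispose"
    }
}
```

In the real Startup class, the cast inside the callback would target whichever type was registered with the container, and the second argument to Register would be the instance resolved from app.ApplicationServices.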