Search code examples
c# asp.net-core autofac

ASP.NET Core 3.1 IOptions - Cannot Get Config Initialised From File In Helper class


I have the following helper class that I am using to perform a single asynchronous initialisation task (CreateKafkaTopic) before creating a test server via WebApplicationFactory.

Helper class to create Kafka Topic from Config via method CreateKafkaTopic

using System;
using System.Threading.Tasks;

using Autofac;
using Autofac.Builder;
using Autofac.Extensions.DependencyInjection;
using Extensions.Hosting.AsyncInitialization;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

using WebApp.Data.Serializers.AutofacModule;
using WebApp.Kafka.AutofacModule;
using WebApp.Kafka.Admin;
using WebApp.Kafka.Config;
using WebApp.Mqtt.Config;
using WebApp.S3.Config;


namespace WebApp.Testing.Fixtures
{
    /// <summary>
    /// Shared test helpers for building configuration, a service collection and an Autofac container
    /// </summary>
    public class CoreTestUtils
    {
        /// <summary>
        /// Build a configuration root from "{fileName}.json" under the base path, followed by environment variables
        /// </summary>
        /// <param name="basePath">Directory used as the base path for the json file</param>
        /// <param name="fileName">Json file name without the ".json" extension</param>
        /// <returns>The built configuration root; the json file is registered as optional</returns>
        /// <exception cref="ArgumentNullException">Thrown when <paramref name="basePath"/> is null</exception>
        public static IConfigurationRoot GetIConfigurationRoot(string basePath, string fileName = "appsettings")
        {
            if (basePath == null)
            {
                throw new ArgumentNullException(nameof(basePath));
            }

            var jsonFile = $"{fileName}.json";

            IConfigurationBuilder builder = new ConfigurationBuilder();
            builder = builder.SetBasePath(basePath);
            builder = builder.AddJsonFile(jsonFile, optional: true);
            builder = builder.AddEnvironmentVariables();

            return builder.Build();
        }

        /// <summary>
        /// Create a service collection holding the Kafka, S3 and Mqtt option bindings together with
        /// logging, SignalR, options, the Kafka admin factory and the Kafka admin async initializer
        /// </summary>
        /// <param name="configBasePath">Directory used as the base path for the json file</param>
        /// <param name="fileName">Json file name without extension, defaults to 'appsettings'</param>
        /// <returns>Populated service collection</returns>
        /// <exception cref="ArgumentNullException">Thrown when <paramref name="configBasePath"/> is null</exception>
        public static IServiceCollection ConfigureServices(string configBasePath, string fileName = "appsettings")
        {
            if (configBasePath == null)
            {
                throw new ArgumentNullException(nameof(configBasePath));
            }

            IConfigurationRoot configuration = GetIConfigurationRoot(configBasePath, fileName);
            var services = new ServiceCollection();

            // Each options type is bound from its own configuration section
            services.Configure<KafkaConfig>(kafka =>
            {
                configuration.GetSection(KafkaConfig.SectionName).Bind(kafka);
            });
            services.Configure<S3Config>(s3 =>
            {
                configuration.GetSection(S3Config.SectionName).Bind(s3);
            });
            services.Configure<MqttConfig>(mqtt =>
            {
                configuration.GetSection(MqttConfig.SectionName).Bind(mqtt);
            });

            // Framework services first, then the Kafka admin pieces
            services.AddLogging();
            services.AddSignalR();
            services.AddOptions();
            services.AddSingleton<IKafkaAdminFactory, KafkaAdminFactory>();
            services.AddAsyncInitializer<KafkaAdminService>();

            return services;
        }

        /// <summary>
        /// Build the container from the config file, resolve the Kafka admin factory and create an
        /// admin client, writing progress to the console. This version only creates the client;
        /// it does not issue a create-topic request.
        /// </summary>
        /// <param name="configBasePath">Base path where config file exists</param>
        /// <param name="fileName">Config file name without extension, defaults to 'appsettings'</param>
        /// <exception cref="ArgumentNullException">Thrown when <paramref name="configBasePath"/> is null</exception>
        public static async Task CreateKafkaTopic(string configBasePath, string fileName = "appsettings")
        {
            Console.WriteLine($"Trying to create a Kafka Topic with basePath:{configBasePath} and file:{fileName}");

            // environment variables are set at this point

            IServiceCollection services = ConfigureServices(configBasePath, fileName);
            var modules = new Autofac.Module[] { new SerializerModule(), new KafkaModule() };

            using (IContainer container = ConfigureContainer(services, modules))
            using (ILifetimeScope scope = container.BeginLifetimeScope())
            {
                Console.WriteLine("About to create the factory");
                IKafkaAdminFactory factory = scope.Resolve<IKafkaAdminFactory>();
                Console.WriteLine("Have created the factory");

                var client = factory.CreateAdminClient();
                Console.Write($"client is null? => {client == null}");

                await Task.CompletedTask;
            }
        }


        /// <summary>
        /// Build an Autofac container from the given modules and service collection
        /// </summary>
        /// <param name="services">Services to populate the container with</param>
        /// <param name="modules">Autofac modules, registered before the services are populated</param>
        /// <returns>Container built with startable components ignored</returns>
        /// <exception cref="ArgumentNullException">
        /// Thrown when <paramref name="services"/> or <paramref name="modules"/> is null
        /// </exception>
        public static IContainer ConfigureContainer(IServiceCollection services, Autofac.Module[] modules)
        {
            if (services == null)
            {
                throw new ArgumentNullException(nameof(services));
            }

            if (modules == null)
            {
                throw new ArgumentNullException(nameof(modules));
            }

            var builder = new ContainerBuilder();

            for (var index = 0; index < modules.Length; index++)
            {
                builder.RegisterModule(modules[index]);
            }

            builder.Populate(services);

            return builder.Build(ContainerBuildOptions.IgnoreStartableComponents);
        }
    }
}

The initialisation task creates a Kafka topic (CreateKafkaTopic) from config, using the service KafkaAdminService (see listing below). The service depends on a factory class (IKafkaAdminFactory) that creates a Confluent Kafka IAdminClient based upon an IOptions dependency.

I have been struggling to get the IOptions dependency initialised using my helper class. The IOptions dependency does not seem to populate my POCO class from the file, and an exception is raised saying that a required section is missing. Does anyone have any idea why the POCO config is not being initialised from the appsettings file by the CreateKafkaTopic method of my helper class?

KafkaAdminFactory

 /// <summary>
    /// Creates Kafka admin clients whose bootstrap servers come from the injected <see cref="KafkaConfig"/> options
    /// </summary>
    public class KafkaAdminFactory : IKafkaAdminFactory
    {
        private KafkaConfig _Config { get; }

        /// <summary>
        /// Capture the Kafka configuration from the options wrapper
        /// </summary>
        /// <param name="options">Options wrapper around the Kafka configuration</param>
        /// <exception cref="ArgumentNullException">
        /// Thrown when <paramref name="options"/> or its value is null
        /// </exception>
        public KafkaAdminFactory(IOptions<KafkaConfig> options)
        {
            // Null-conditional access so a null wrapper raises ArgumentNullException
            // instead of a NullReferenceException on options.Value
            _Config = options?.Value ?? throw new ArgumentNullException(nameof(options));
        }

        /// <summary>
        /// Build an admin client that connects to the configured consumer bootstrap servers
        /// </summary>
        /// <returns>A newly built admin client; the caller is responsible for disposing it</returns>
        /// <remarks>
        /// NOTE(review): dereferences <c>_Config.Consumer</c> without a check, so this fails with a
        /// NullReferenceException if that property was never populated - confirm the section is bound.
        /// </remarks>
        public IAdminClient CreateAdminClient()
        {
            var clientConfig = new AdminClientConfig
            {
                BootstrapServers = _Config.Consumer.BootstrapServers
            };

            return new AdminClientBuilder(clientConfig).Build();
        }
    }

KafkaAdminService

using System;
using System.Threading.Tasks;

using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Extensions.Hosting.AsyncInitialization;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

using WebApp.Kafka.Config;


namespace WebApp.Kafka.Admin
{
    /// <summary>
    /// Signature of a factory method that creates an <see cref="IAdminClient"/> from a
    /// <see cref="KafkaConfig"/>; creating the client through a factory method increases testability
    /// </summary>
    /// <param name="config">Kafka configuration passed to the factory method</param>
    /// <returns>The admin client produced by the factory method</returns>
    public delegate IAdminClient KafkaAdminFactoryD(KafkaConfig config);

    /// <summary>
    /// Async initializer that asks Kafka to create the configured topic, treating an
    /// already-existing topic as success
    /// </summary>
    public class KafkaAdminService : IAsyncInitializer
    {
        private IKafkaAdminFactory _Factory { get; }
        private ILogger<KafkaAdminService> _Logger { get; }
        private KafkaConfig _Config { get; }


        /// <summary>
        /// Capture the Kafka configuration, admin client factory and logger
        /// </summary>
        /// <param name="config">Options wrapper around the Kafka config POCO</param>
        /// <param name="clientFactory">Factory used to create the admin client, see <see cref="KafkaAdminFactory"/></param>
        /// <param name="logger">Logger instance</param>
        /// <exception cref="ArgumentNullException">
        /// Thrown when any argument is null, or when <paramref name="config"/> wraps a null value
        /// </exception>
        public KafkaAdminService(
            IOptions<KafkaConfig> config,
            IKafkaAdminFactory clientFactory,
            ILogger<KafkaAdminService> logger)
        {
            // Validation order (factory, config, logger) matches the original guard clauses
            _Factory = clientFactory ?? throw new ArgumentNullException(nameof(clientFactory));
            _Config = config?.Value ?? throw new ArgumentNullException(nameof(config));
            _Logger = logger ?? throw new ArgumentNullException(nameof(logger));
        }


        /// <summary>
        /// Create the configured Kafka topic if it does not already exist
        /// </summary>
        /// <exception cref="CreateTopicsException">
        /// Rethrown when topic creation fails for any reason other than the topic already existing
        /// </exception>
        public async Task InitializeAsync()
        {
            using (var client = _Factory.CreateAdminClient())
            {
                try
                {
                    _Logger.LogInformation("Admin service trying to create Kafka Topic...");
                    _Logger.LogInformation($"Topic::{_Config.Topic.Name}, ReplicationCount::{_Config.Topic.ReplicationCount}, PartitionCount::{_Config.Topic.PartitionCount}");
                    _Logger.LogInformation($"Bootstrap Servers::{_Config.Consumer.BootstrapServers}");

                    await client.CreateTopicsAsync(new TopicSpecification[] {
                        new TopicSpecification {
                            Name = _Config.Topic.Name,
                            NumPartitions = _Config.Topic.PartitionCount,
                            ReplicationFactor = _Config.Topic.ReplicationCount
                        }
                    }, null);

                    _Logger.LogInformation($"Admin service successfully created topic {_Config.Topic.Name}");
                }
                catch (CreateTopicsException e)
                {
                    // Exactly one topic is requested above, so only the first result is inspected
                    var error = e.Results[0].Error;

                    if (error.Code != ErrorCode.TopicAlreadyExists)
                    {
                        // Logged as an error with the exception attached so the failure is visible
                        _Logger.LogError(e, $"An error occured creating topic {_Config.Topic.Name}: {error.Reason}");

                        // Bare rethrow keeps the original stack trace ("throw e;" would reset it)
                        throw;
                    }

                    _Logger.LogInformation($"Topic {_Config.Topic.Name} already exists");
                }
            }

            _Logger.LogInformation("Kafka Consumer thread started");
        }
    }
}

Solution

  • Solved it. I was using the wrong file path to my config file and was also reading the settings file as optional. I refactored my code as follows to help perform an asynchronous initialisation task when using WebApplicationFactory:

    using System;
    using System.IO;
    using System.Threading.Tasks;
    
    using Autofac;
    using Autofac.Builder;
    using Autofac.Extensions.DependencyInjection;
    using Extensions.Hosting.AsyncInitialization;
    using Microsoft.Extensions.Configuration;
    using Microsoft.Extensions.DependencyInjection;
    
    using WebApp.Kafka.AutofacModule;
    using WebApp.Kafka.Admin;
    using WebApp.Kafka.Config;
    using WebApp.Mqtt.Config;
    using WebApp.S3.Config;
    
    
    namespace WebApp.Testing.Fixtures
    {
        /// <summary>
        /// Core helper testing utilities
        /// </summary>
        public class CoreTestUtils
        {
            /// <summary>
            /// Create a configuration object from a json settings file and environment variables
            /// </summary>
            /// <param name="basePath">Directory containing the json file</param>
            /// <param name="fileName">Name of the json file including its extension, defaults to 'appsettings.json'</param>
            /// <param name="optional">
            /// True when the json config file is optional, so a missing file is skipped;
            /// false when the file is mandatory, so a wrong path surfaces as an error when the configuration is built
            /// </param>
            /// <returns>Configuration root built from the json file followed by environment variables</returns>
            /// <exception cref="ArgumentNullException">
            /// Thrown when <paramref name="basePath"/> or <paramref name="fileName"/> is null
            /// </exception>
            public static IConfigurationRoot GetIConfigurationRoot(string basePath, string fileName = "appsettings.json", bool optional = true)
            {
                if (basePath == null)
                    throw new ArgumentNullException(nameof(basePath));

                if (fileName == null)
                    throw new ArgumentNullException(nameof(fileName));

                string fullPath = Path.Combine(basePath, fileName);
                return new ConfigurationBuilder()
                    .AddJsonFile(fullPath, optional: optional)
                    .AddEnvironmentVariables()
                    .Build();
            }

            /// <summary>
            /// Configure services for logging, signalR, config options and the Kafka admin initializer
            /// </summary>
            /// <param name="configBasePath">Directory containing the json file</param>
            /// <param name="fileName">Name of the json file including its extension, defaults to 'appsettings.json'</param>
            /// <param name="optional">True when the json config file is optional, false when it is mandatory</param>
            /// <returns>Populated service collection</returns>
            /// <exception cref="ArgumentNullException">Thrown when <paramref name="configBasePath"/> is null</exception>
            public static IServiceCollection ConfigureServices(string configBasePath, string fileName = "appsettings.json", bool optional = true)
            {
                if (configBasePath == null)
                    throw new ArgumentNullException(nameof(configBasePath));

                var services = new ServiceCollection();

                // Bind each options type from its own section of the loaded configuration
                IConfigurationRoot configuration = GetIConfigurationRoot(configBasePath, fileName, optional);
                services.Configure<KafkaConfig>(options =>
                    configuration.GetSection(KafkaConfig.SectionName).Bind(options));
                services.Configure<S3Config>(options =>
                    configuration.GetSection(S3Config.SectionName).Bind(options));
                services.Configure<MqttConfig>(options =>
                    configuration.GetSection(MqttConfig.SectionName).Bind(options));

                // Populate service provider with usual services
                services.AddLogging();
                services.AddSignalR();
                services.AddOptions();
                services.AddSingleton<IKafkaAdminFactory, KafkaAdminFactory>();
                services.AddAsyncInitializer<KafkaAdminService>();

                return services;
            }

            /// <summary>
            /// Create topic on Kafka cluster based on settings in config file, by resolving the
            /// registered async initializer from an Autofac container and running it
            /// </summary>
            /// <param name="configBasePath">Base path where config file exists</param>
            /// <param name="fileName">The config file that exists in the base path</param>
            /// <param name="optional">True when the json config file is optional, false when it is mandatory</param>
            /// <exception cref="ArgumentNullException">Thrown when <paramref name="configBasePath"/> is null</exception>
            /// <exception cref="Autofac.Core.Registration.ComponentNotRegisteredException">
            /// Thrown by Autofac when no async initializer is registered in the container
            /// </exception>
            public static async Task CreateKafkaTopic(string configBasePath, string fileName = "appsettings.json", bool optional = true)
            {
                var services = ConfigureServices(configBasePath, fileName, optional);

                using (var container = ConfigureContainer(services, new Autofac.Module[] { new KafkaModule() }))
                using (var scope = container.BeginLifetimeScope())
                {
                    // Resolve throws for a missing registration and never returns null,
                    // so no null fallback is needed here
                    var kafkaSvc = scope.Resolve<IAsyncInitializer>();
                    await kafkaSvc.InitializeAsync();
                }
            }

            /// <summary>
            /// Build Autofac container with the modules and services provided
            /// </summary>
            /// <param name="services">Services to populate the container with</param>
            /// <param name="modules">Autofac modules, registered before the services are populated</param>
            /// <returns>Container built with startable components ignored</returns>
            /// <exception cref="ArgumentNullException">
            /// Thrown when <paramref name="services"/> or <paramref name="modules"/> is null
            /// </exception>
            public static IContainer ConfigureContainer(IServiceCollection services, Autofac.Module[] modules)
            {
                if (services == null)
                    throw new ArgumentNullException(nameof(services));

                if (modules == null)
                    throw new ArgumentNullException(nameof(modules));

                var containerBuilder = new ContainerBuilder();

                foreach (var module in modules)
                {
                    containerBuilder.RegisterModule(module);
                }

                containerBuilder.Populate(services);

                return containerBuilder.Build(ContainerBuildOptions.IgnoreStartableComponents);
            }
        }
    }