c# dependency-injection asp.net-core asp.net-core-webapi asp.net-core-signalr

How to invoke SignalR Clients.All.InvokeAsync() in places other than the Controller?


I'm able to access my IHubContext<MyHub> fine and dandy in my .NET Core WebAPI's Controller through DI in the constructor, but I want to access it elsewhere too.

Specifically, when I consume a message from RabbitMQ, sometimes I want to update clients through _myHubContext.Clients.All.InvokeAsync(), but I just can't figure out how to get it.

I'm also having issues finding documentation for doing this kind of thing outside of the controller.

Any help would be appreciated.

EDIT:

To add some detail, and possibly the cause of my problem: I'm trying to access the IHubContext (and some of my own services registered in ConfigureServices) within my Startup class, specifically during the IApplicationLifetime ApplicationStarted and ApplicationStopping events, which call a RabbitMQ consumer's methods to connect and disconnect.

Am I correct in guessing that I'm unable to access registered services in the Startup class? If so, how would I go about starting these services?

Update:

Moving services.AddSignalR() and some of the services that are called at startup one level up to the WebHost.ConfigureServices in Program.cs solved some of my problems, but of course there are more.

I wasn't getting any messages on my JS client when I received a message from RabbitMQ, but my client was connecting successfully. "Weird..." I thought. To get more info, I wired up a GET action in my controller to send some content through the SignalR Hub. Whenever I call that GET, it works: the message goes out through the IHubContext<MyHub>. I get the hubContext through the constructor in my RabbitMQ listener, just like I do with the controller.

The new question: are things injected differently in the controller than they are into services that I register myself at startup? How so, and how do I overcome that?

Some code to go with it...

Excerpt from Program.cs

public static IWebHost BuildWebHost(string[] args) =>
    WebHost.CreateDefaultBuilder(args)
        .UseKestrel()
        .UseIISIntegration()
        .ConfigureServices(services => {
            services.AddSignalR();
            services.AddTransient<ISubscriber, Subscriber>();
            services.AddTransient<IDataService, DataService>();
            services.AddTransient<IHealthCheckProcessor, HealthCheckProcessor>();
            services.AddTransient<INodeProcessor, NodeProcessor>();
        })
        .UseStartup<Startup>()
        .Build();

From Startup.cs

public class Startup
{
    public Startup(IConfiguration _configuration, ISubscriber _subscriber)
    {
        configuration = _configuration;
        subscriber = _subscriber;
    }
    public IConfiguration configuration { get; }
    public ISubscriber subscriber { get; }

    public void ConfigureServices(IServiceCollection services)
    {
        services.AddCors();
        services.AddMvc();
    }

    public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime applicationLifetime)
    {
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }
        app.UseCors(builder => builder
            /* CORS stuff */);

        app.UseSignalR(routes =>
        {
            routes.MapHub<StatusHub>("Status");
        });
        app.UseMvc();
        applicationLifetime.ApplicationStarted.Register(OnStartup);
        applicationLifetime.ApplicationStopping.Register(OnShutdown);
    }

    private void OnStartup() {
        // MessageBroker stuff
        subscriber.Start(messageBroker);
    }

    private void OnShutdown() {
        subscriber.Stop();
    }
}

From Subscriber.cs

public class Subscriber : ISubscriber {
    public static IConnection connection;
    public static IModel channel;
    public IHubContext<StatusHub> hubContext;

    public static IHealthCheckProcessor healthCheckProcessor;
    public static INodeProcessor nodeProcessor;

    public Subscriber(IHubContext<StatusHub> _hubContext, INodeProcessor _nodeProcessor, IHealthCheckProcessor _healthCheckProcessor)
    {
        connection = new ConnectionFactory().CreateConnection();
        channel = connection.CreateModel();
        hubContext = _hubContext;
        nodeProcessor = _nodeProcessor;
        healthCheckProcessor = _healthCheckProcessor;
    }

    public void Start(MessageBroker messageBroker)
    {
        var factory = new ConnectionFactory() { HostName = messageBroker.URL }.CreateConnection();

        foreach (Queue queue in messageBroker.Queues)
        {
            channel.QueueDeclare(
                queue: queue.Name,
                durable: queue.Durable,
                exclusive: queue.Exclusive,
                autoDelete: queue.AutoDelete,
                arguments: null
            );

            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

            consumer.Received += (model, ea) =>
            {
                byte[] body = ea.Body;
                string message = Encoding.UTF8.GetString(body);

                RouteMessage(queue, message);
            };

            channel.BasicConsume(
                    queue: queue.Name,
                    autoAck: queue.AutoAck,
                    consumer: consumer
                );
            hubContext.Clients.All.InvokeAsync("Send", "It worked - from the subscriber");
        }
    }

    public void RouteMessage(Queue queue, string message) {
        if(queue.Name == "discovery") {
            nodeProcessor.Process(message);
        }
        if(queue.Name == "health") {
            healthCheckProcessor.Process(message);
        }
    }

    public void Stop()
    {
        Console.WriteLine("Terminating connection to RabbitMQ instance.");
        channel.Close(200, "Goodbye");
        connection.Close();
    }
}

From HealthCheckProcessor.cs

public class HealthCheckProcessor : IHealthCheckProcessor {
    private IDataService dataService;
    private IHubContext<StatusHub> hubContext;

    public HealthCheckProcessor(IDataService _dataService, IHubContext<StatusHub> _hubContext)
    {
        dataService = _dataService;
        hubContext = _hubContext;
    }
    public void Process(string message) {
        HealthCheck health = JsonConvert.DeserializeObject<HealthCheck>(message);
        Node node = dataService.GetSingle(health.NodeId);
        node.Health = health;

        dataService.Update(node);
        Console.WriteLine("It's sending.");
        hubContext.Clients.All.InvokeAsync("Send", "It worked - from the processor");
    }
}

From the Controller:

[Route("api/[controller]")]
public class MyController: Controller
{
    private IDataService _dataService;
    private readonly IConfiguration configuration;
    private static IHubContext<StatusHub> hubContext;

    public MyController(IConfiguration config, IDataService dataService, IHubContext<StatusHub> _hubContext)
    {
        _dataService = dataService;
        configuration = config;
        hubContext = _hubContext;
    }

    [HttpGet]
    public string Get()
    {
        hubContext.Clients.All.InvokeAsync("Send", "Blarg!");
        return "Well, I tried.";
    }
}

Solution

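  • You are trying to access services that are not available at the time you request them. The Startup constructor runs before the application's service container is built, so nothing injected there can come from the services registered in Startup.ConfigureServices. Registering the services on the host builder gets around that, but the subscriber injected into Startup is then most likely resolved from the host's own container rather than the application's. Its IHubContext<StatusHub> would not be the instance the connected clients are attached to, which would explain why the controller's messages arrive and the subscriber's do not.

    Configure is called after ConfigureServices, so every service registered there can be resolved by that point, either as a Configure parameter or through an injected IServiceProvider.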

    public class Startup {
        public Startup(IConfiguration _configuration) {
            configuration = _configuration;
        }
    
        public IConfiguration configuration { get; }
    
        public void ConfigureServices(IServiceCollection services) {        
            services.AddCors();
            services.AddMvc();
    
            services.AddSignalR();
            services.AddTransient<ISubscriber, Subscriber>();
            services.AddTransient<IDataService, DataService>();
            services.AddTransient<IHealthCheckProcessor, HealthCheckProcessor>();
            services.AddTransient<INodeProcessor, NodeProcessor>();
        }
    
        public void Configure(
            IApplicationBuilder app, 
            IHostingEnvironment env, 
            IApplicationLifetime applicationLifetime, 
            IServiceProvider sp
        ) {
            if (env.IsDevelopment()) {
                app.UseDeveloperExceptionPage();
            }
                app.UseCors(builder => builder
                    /* CORS stuff */);
    
            app.UseMvc();
    
            app.UseSignalR(routes => {
                routes.MapHub<StatusHub>("Status");
            });
    
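            // At this point all the necessary dependencies have been registered and configured.
            // GetRequiredService<T>() is an extension method from Microsoft.Extensions.DependencyInjection;
            // unlike GetService<T>(), it throws instead of returning null when the service is missing.
            var subscriber = sp.GetRequiredService<ISubscriber>();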
    
            applicationLifetime.ApplicationStarted.Register(() => OnStartup(subscriber));
            applicationLifetime.ApplicationStopping.Register(() => OnShutdown(subscriber));
        }
    
        private void OnStartup(ISubscriber subscriber) {
            // MessageBroker stuff
            subscriber.Start(messageBroker);
        }
    
        private void OnShutdown(ISubscriber subscriber) {
            subscriber.Stop();
        }
    }
    

    You should now be able to remove the ConfigureServices call from the host builder.

    public static IWebHost BuildWebHost(string[] args) =>
        WebHost.CreateDefaultBuilder(args)
            .UseKestrel()
            .UseIISIntegration()
            .UseStartup<Startup>()
            .Build();
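
As an alternative to registering lifetime callbacks in Configure, the subscriber could be started and stopped by a hosted service, which the host resolves from the application's container (the same one the controller's IHubContext<StatusHub> comes from). This is only a minimal sketch and not part of the original answer: SubscriberHostedService is a hypothetical name, the ISubscriber and MessageBroker declarations are stand-ins inferred from the question's code, and it assumes a MessageBroker instance has been registered in ConfigureServices.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;

// Stand-ins inferred from the question's code; drop them if the real types are in scope.
public class MessageBroker
{
    public string URL { get; set; }
}

public interface ISubscriber
{
    void Start(MessageBroker messageBroker);
    void Stop();
}

// Hypothetical wrapper that ties the subscriber's lifetime to the host's lifetime.
public class SubscriberHostedService : IHostedService
{
    private readonly ISubscriber subscriber;
    private readonly MessageBroker messageBroker;

    // Both dependencies are supplied by constructor injection.
    // Assumption: a MessageBroker instance is registered in ConfigureServices.
    public SubscriberHostedService(ISubscriber subscriber, MessageBroker messageBroker)
    {
        this.subscriber = subscriber;
        this.messageBroker = messageBroker;
    }

    // Called by the host when the application starts.
    public Task StartAsync(CancellationToken cancellationToken)
    {
        subscriber.Start(messageBroker);
        return Task.CompletedTask;
    }

    // Called by the host during graceful shutdown.
    public Task StopAsync(CancellationToken cancellationToken)
    {
        subscriber.Stop();
        return Task.CompletedTask;
    }
}
```

With this in place, Startup.ConfigureServices would add services.AddSingleton<IHostedService, SubscriberHostedService>() next to the existing registrations (ASP.NET Core 2.1 and later also offer services.AddHostedService<SubscriberHostedService>()), and the ApplicationStarted and ApplicationStopping callbacks in Configure would no longer be needed.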