Search code examples
postgresqlasp.net-coresignalrsignalr-hubasp.net-core-signalr

How can I listen to a PostgreSQL database with SignalR Core in a .NET Core project?


I'm working on a .NET Core web application. I want to listen to my PostgreSQL database, and whenever a table changes, I need to be notified of the change.

According to my research, I have to use SignalR Core. I have built some sample applications with SignalR, such as a chat app, but none of them listens to a database, and I couldn't find any example of this.

- Does there have to be a trigger on the PostgreSQL database?

- Does there have to be a listener on the code side?

-How can I use SignalR Core?

Please show me a way.

Thanks a lot.


Solution

  • This example works with ASP.NET Core 3.0+. The full code is below.

    Step 1. Create a trigger on PostgreSQL that fires on table changes (the trigger function from Step 2 must already exist, so run Step 2 before this statement)

     -- Row-level AFTER trigger on public.alarm_speed: every inserted, updated or
     -- deleted row invokes the trigger function alarm_speedf().
     create trigger any_after_alarm_speed
         after insert or update or delete
         on public.alarm_speed
         for each row
         execute procedure alarm_speedf();
    

    Step 2. Create the trigger function on PostgreSQL

    -- Trigger function: publishes one message on the 'notifyalarmspeed' channel
    -- for the row being processed. The payload has the form
    -- '<OPERATION> <alarm_speed_id> <alarm_speed_date>'.
    CREATE OR REPLACE FUNCTION public.alarm_speedf()
        RETURNS trigger
        LANGUAGE plpgsql
    AS $function$
    BEGIN
        CASE TG_OP
            WHEN 'INSERT' THEN
                -- An inserted row is reported with its new values.
                PERFORM pg_notify(
                    'notifyalarmspeed',
                    format('INSERT %s %s', NEW.alarm_speed_id, NEW.alarm_speed_date));
            WHEN 'UPDATE' THEN
                -- An updated row is reported with its pre-update (OLD) values.
                PERFORM pg_notify(
                    'notifyalarmspeed',
                    format('UPDATE %s %s', OLD.alarm_speed_id, OLD.alarm_speed_date));
            WHEN 'DELETE' THEN
                -- A deleted row is reported with the values it had before removal.
                PERFORM pg_notify(
                    'notifyalarmspeed',
                    format('DELETE %s %s', OLD.alarm_speed_id, OLD.alarm_speed_date));
            ELSE
                -- Any other operation publishes nothing.
                NULL;
        END CASE;

        RETURN NULL;
    END;
    $function$;
    

    Step 3. Create Hub

      /// <summary>
      /// SignalR hub that sends the current speed-alarm list (a JSON string) to all
      /// connected clients through the "ReceiveMessage" client method.
      /// </summary>
      public class speedalarmhub : Hub
      {
          // 0 = no PostgreSQL listener running, 1 = a listener has been started.
          // Static so that the flag is shared by every hub instance in the process.
          private static int s_listenerStarted;

          private readonly IMemoryCache _cache;
          private readonly IHubContext<speedalarmhub> _hubContext;

          /// <summary>Creates the hub with the shared cache and hub context.</summary>
          /// <param name="cache">Cache holding the serialized alarm list under "SpeedAlarm".</param>
          /// <param name="hubContext">Hub context handed to the <c>SpeedListener</c>.</param>
          public speedalarmhub(IMemoryCache cache, IHubContext<speedalarmhub> hubContext)
          {
              _cache = cache;
              _hubContext = hubContext;
          }

          /// <summary>
          /// Makes sure the database listener is running, then sends the alarm list to
          /// all clients. The list comes from the cache when present; otherwise it is
          /// loaded through <c>SpeedListener.GetAlarmList()</c>, which also caches it.
          /// </summary>
          public async Task SendMessage()
          {
              EnsureListenerStarted();

              if (!_cache.TryGetValue("SpeedAlarm", out string response))
              {
                  response = new SpeedListener(_hubContext, _cache).GetAlarmList();
              }

              await Clients.All.SendAsync("ReceiveMessage", response);
          }

          /// <summary>
          /// Starts <c>SpeedListener.ListenForAlarmNotifications()</c> once, on its own
          /// long-running task. That method loops forever, so calling it inline would
          /// keep the hub invocation from ever completing.
          /// </summary>
          private void EnsureListenerStarted()
          {
              // Only the caller that flips the flag from 0 to 1 starts a listener.
              if (System.Threading.Interlocked.CompareExchange(ref s_listenerStarted, 1, 0) != 0)
              {
                  return;
              }

              // The lambdas capture only this local, never the hub instance itself.
              var listener = new SpeedListener(_hubContext, _cache);

              Task.Factory
                  .StartNew(
                      () => listener.ListenForAlarmNotifications(),
                      System.Threading.CancellationToken.None,
                      TaskCreationOptions.LongRunning,
                      TaskScheduler.Default)
                  .ContinueWith(
                      finished =>
                      {
                          // Observe the failure (if any) and clear the flag so that the
                          // next SendMessage call can start a fresh listener.
                          _ = finished.Exception;
                          System.Threading.Interlocked.Exchange(ref s_listenerStarted, 0);
                      },
                      TaskScheduler.Default);
          }
      }
    

    Step 4. Create the listener class (it connects to PostgreSQL and forwards notifications to the hub's clients)

     /// <summary>
     /// Listens on the PostgreSQL "notifyalarmspeed" channel and pushes the alarm list
     /// (serialized to JSON) to all SignalR clients through "ReceiveMessage".
     /// </summary>
     /// <remarks>
     /// NOTE(review): this type derives from Controller although it is used as a plain
     /// helper; its public methods may therefore be exposed as MVC actions by
     /// conventional routing - confirm, and consider dropping the base class.
     /// </remarks>
     public class SpeedListener :Controller
     {
         private const string AlarmCacheKey = "SpeedAlarm";
         private const string ReceiveMethod = "ReceiveMessage";

         private readonly IHubContext<speedalarmhub> _hubContext;
         private readonly IMemoryCache _cache;

         /// <summary>Creates a listener that publishes through the given hub context.</summary>
         /// <param name="hubContext">Used to send messages to all connected clients.</param>
         /// <param name="cache">Receives the latest serialized alarm list.</param>
         public SpeedListener(IHubContext<speedalarmhub> hubContext,IMemoryCache cache)
         {
             _hubContext = hubContext;
             _cache = cache;
         }

         /// <summary>Builds the Npgsql connection string from the placeholder settings below.</summary>
         static string GetConnectionString()
         {
             var csb = new NpgsqlConnectionStringBuilder
             {
                 Host = "yourip",
                 Database = "yourdatabase",
                 Username = "yourusername",
                 Password = "yourpassword",
                 Port = 5432,
                 KeepAlive = 30
             };
             return csb.ConnectionString;
         }

         /// <summary>
         /// Opens a dedicated connection, issues LISTEN on "notifyalarmspeed", sends the
         /// current alarm list once, and then waits for notifications forever.
         /// This method blocks its caller and only ends when an exception is thrown;
         /// the connection is disposed in that case.
         /// </summary>
         public void ListenForAlarmNotifications()
         {
             using (NpgsqlConnection conn = new NpgsqlConnection(GetConnectionString()))
             {
                 conn.StateChange += conn_StateChange;
                 // Attach the handler before LISTEN so that no notification arriving
                 // right after the command is missed.
                 conn.Notification += PostgresNotificationReceived;
                 conn.Open();

                 using (var listenCommand = conn.CreateCommand())
                 {
                     listenCommand.CommandText = "listen notifyalarmspeed;";
                     listenCommand.ExecuteNonQuery();
                 }

                 // Initial snapshot. The first argument must be the client method name;
                 // the JSON is the payload. The send is intentionally not awaited.
                 _hubContext.Clients.All.SendAsync(ReceiveMethod, GetAlarmList());

                 while (true)
                 {
                     // Blocks until a notification arrives; handlers run from here.
                     conn.Wait();
                 }
             }
         }

         /// <summary>
         /// Notification handler: whatever operation the payload names (INSERT, UPDATE
         /// or DELETE), the full alarm list is re-read and sent to all clients.
         /// </summary>
         private void PostgresNotificationReceived(object sender, NpgsqlNotificationEventArgs e)
         {
             _hubContext.Clients.All.SendAsync(ReceiveMethod, GetAlarmList());
         }

         /// <summary>
         /// Reads the alarm rows via the "sp_alarm_speed_process_get" stored procedure,
         /// serializes them to JSON and stores the JSON in the cache under "SpeedAlarm".
         /// </summary>
         /// <returns>
         /// The JSON string, or null when serialization failed (nothing is cached then).
         /// </returns>
         public string GetAlarmList()
         {
             var alarms = new List<AlarmSpeedViewModel>();

             // using blocks release connection, command and reader even on failure.
             using (var conn = new NpgsqlConnection(GetConnectionString()))
             using (var sqlCmd = new NpgsqlCommand("sp_alarm_speed_process_get", conn))
             {
                 sqlCmd.CommandType = CommandType.StoredProcedure;
                 conn.Open();

                 using (NpgsqlDataReader reader = sqlCmd.ExecuteReader())
                 {
                     while (reader.Read())
                     {
                         AlarmSpeedViewModel model = new AlarmSpeedViewModel();
                         model.alarm_speed_id = reader.GetInt32(0);
                         // you must fill your model items
                         alarms.Add(model);
                     }
                 }
             }

             // Return the freshly built string directly instead of reading it back from
             // the cache, which dereferenced a possibly missing entry.
             string json = SerializeObjectToJson(alarms);
             if (json != null)
             {
                 _cache.Set(AlarmCacheKey, json);
             }
             return json;
         }

         /// <summary>Serializes <paramref name="alarmspeed"/> to JSON.</summary>
         /// <returns>The JSON text, or null when serialization throws.</returns>
         public String SerializeObjectToJson(Object alarmspeed)
         {
             // NOTE(review): JavaScriptSerializer belongs to .NET Framework's
             // System.Web.Extensions; confirm which serializer this project references
             // (System.Text.Json is the usual choice on ASP.NET Core 3.0+).
             try
             {
                 var jss = new JavaScriptSerializer();
                 return jss.Serialize(alarmspeed);
             }
             catch (Exception) { return null; }
         }

         /// <summary>
         /// Reports connection state transitions to all clients via the
         /// "ConnectionStateChanged" client method; the state description is the payload.
         /// </summary>
         private void conn_StateChange(object sender, System.Data.StateChangeEventArgs e)
         {
             string description = "Current State: " + e.CurrentState.ToString() + " Original State: " + e.OriginalState.ToString();
             _hubContext.Clients.All.SendAsync("ConnectionStateChanged", description);
         }
     }
    

    Step 5. Call the hub from the client page

    <!-- SignalR browser client library, followed by the page script that uses it. -->
    <script src="~/lib/signalr.js"></script>

    <script type="text/javascript">
    // Build the connection to the hub endpoint.
    var connection = new signalR.HubConnectionBuilder()
        .withUrl('/speedalarmhub')
        .build();

    // Handler for messages the server sends under the name 'ReceiveMessage'.
    connection.on('ReceiveMessage', function (message) {
        var encodedMsg = message;
        // Add the message to the page.
    });

    // Transport fallback functionality is now built into start.
    connection.start()
        .then(function () {
            console.log('connection started');
            // Return the invoke promise so that a failed server call is also
            // reported by the catch handler below.
            return connection.invoke('SendMessage');
        })
        .catch(function (error) {
            console.error(error.message);
        });
    </script>
    

    Step 6. Add the code below to the ConfigureServices method in Startup

    /// <summary>
    /// Registers the services this example relies on: the in-memory cache,
    /// SignalR, and MVC controllers with views.
    /// </summary>
    /// <param name="services">The application's service collection.</param>
    public void ConfigureServices(IServiceCollection services)
    {
        // These registrations are independent of one another.
        services.AddMemoryCache();
        services.AddSignalR();
        services.AddControllersWithViews();
    }
    

    Step 7. Add the code below to the Configure method

    // Endpoint routing: the conventional MVC route plus the SignalR hub, which the
    // browser client reaches at "/speedalarmhub".
    app.UseEndpoints(routes =>
    {
        routes.MapControllerRoute("default", "{controller=Home}/{action=Index}/{id?}");
        routes.MapHub<speedalarmhub>("/speedalarmhub");
    });