
SQL Server Service Broker notification queue stops firing after I wait on it with a SQL query


I want to be notified every time a specific table changes, for example when new rows are inserted. It works as long as only my program monitors the queue and I do not also query the queue from SSMS.

I created a new service and queue like this:

CREATE QUEUE SQLDependencyQueue;
CREATE SERVICE NamesService
ON QUEUE SQLDependencyQueue
([http://schemas.microsoft.com/SQL/Notifications/PostQueryNotification]) ;

When I now listen to the queue with a small C# program, it works like a charm.

 private void InitializGetData()
        {
            String connectionString = "Server=DESKTOP-N943R2B\\SQLEXPRESS01;Database = DBDependency; User Id = sa;Password = admin; ";

            if (conn == null)
            {
                conn = new SqlConnection(connectionString);
            }

            if(command == null)
            {
                command = new SqlCommand(@"SELECT [id], [lade_nr], [vg_lfd_nr], [status] FROM [dbo].[t3_auftr_log_ll] WHERE [status] = 0", conn);
            }

            if (dataToWatch == null)
            {
                dataToWatch = new DataSet();
            }

            notification = new SqlNotificationRequestRegister("WAITFOR(RECEIVE * FROM SQLDependencyQueue);", strServiceName, timeOut, connectionString);
            notification.OnChanged += NotificationOnChanged;

            getData();
        }

        void getData()
        {
            if(notification == null)
            {
                return;
            }

            dataToWatch.Clear();

            command.Notification = null;
            command.Notification = notification.NotificationRequest;

            //notification.StartSqlNotification();

            using (SqlDataAdapter adapter = new SqlDataAdapter(command))
            {
                // Fill under the same table name that the loop body reads from.
                adapter.Fill(dataToWatch, "t3_Auftr_log_ll");

                for(int i= 0; i <= dataToWatch.Tables["t3_Auftr_log_ll"].Rows.Count - 1; i++)
                {
                    Console.WriteLine(String.Format("Row: {0}", dataToWatch.Tables["t3_Auftr_log_ll"].Rows[i]["vg_lfd_nr"].ToString())); //, dataToWatch.Tables["t3_Auftr_log_ll"].Rows[i]["status"].ToString()

                    using (SqlCommand cmd = new SqlCommand("update t3_auftr_log_ll set status = 1 where id = @id",conn))
                    {
                        if (conn.State != System.Data.ConnectionState.Open)
                        {
                            conn.Open();
                        }
                        cmd.Parameters.AddWithValue("@id", dataToWatch.Tables["t3_Auftr_log_ll"].Rows[i]["id"].ToString());
                        cmd.ExecuteNonQuery();
                    }
                }
            }

            notification.StartSqlNotification();
        }
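
        // The members below presumably belong to the SqlNotificationRequestRegister class instantiated above.
        private void RegisterSqlNotificationRequest()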
        {
            request = new SqlNotificationRequest();
            // Guid.NewGuid() yields a unique value; new Guid() would always be all zeros.
            request.UserData = Guid.NewGuid().ToString();
            request.Options = String.Format("Service={0}", strServiceName);
            request.Timeout = intNotificationTimeout;

            if (OnChanged != null)
            {
                OnChanged(this, null);
            }
        }

        public void Listen()
        {
            using (SqlConnection conn = new SqlConnection(strConnectionString))
            {
                using (cmd = new SqlCommand("WAITFOR(RECEIVE * FROM SQLDependencyQueue);", conn))
                {
                    if (conn.State != System.Data.ConnectionState.Open)
                    {
                        conn.Open();
                    }

                    cmd.CommandTimeout = intNotificationTimeout + 120;

                    using (SqlDataReader reader = cmd.ExecuteReader())
                    {
                        while (reader.Read())
                        {
                            for (int i = 0; i <= reader.FieldCount - 1; i++)
                                Debug.WriteLine(reader[i].ToString());
                        }
                    }
                }
            }

            RegisterSqlNotificationRequest();
        }

But if I start the program and then run a query in SSMS to see what the queue contains, like this:

WAITFOR(RECEIVE * FROM SQLDependencyQueue);

or like this:

RECEIVE conversation_handle, message_type_name, message_body  
FROM SQLDependencyQueue;

The queue stops working. My program keeps waiting for a notification, but SQL Server no longer creates one when I insert something into the table.

When I check whether the queue is active like this:

select is_receive_enabled
from sys.service_queues
where name = N'SQLDependencyQueue';

It shows "1" (active), but I still don't get a notification.

Only when I restart the program and subscribe to the queue again does it work again.

Edit:

When looking at the subscriptions with

select * from sys.dm_qn_subscriptions

there are two entries when the program starts (without a manual wait). When I additionally wait manually for a notification, I expected to see three rows, but instead the program stops working and only one row is left in the subscription table, even though I stopped the manual wait.



Solution

  • If you run a RECEIVE manually and get a message back it means you have consumed the notification. In your C# code, after consuming a notification you subscribe again. If you consume it from SSMS, then your program won't subscribe again, so you'll end up with the program waiting forever for a notification that was already consumed from the queue, and nobody will re-subscribe.

    You can check sys.dm_qn_subscriptions for the status of your query notification subscriptions.

    What most people miss about Query Notifications is that you only get one notification, then you must subscribe again. Many expect to subscribe once and then get a stream of notifications, one for every change. The feature is designed for cache invalidation (you get the data in the cache with a query, then you get notified when to refresh the cache). It does not work for monitoring a table for changes (which is what you seem to be trying).
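
To look at the queue from SSMS without consuming anything, a plain SELECT can be used instead of RECEIVE, because SELECT only reads the messages and leaves them in place for the program. This is a minimal sketch using the queue name from the question. The NOLOCK hint and the cast to XML are conveniences I added, not something the setup requires, and the cast assumes the message bodies are XML, as query notification messages are.

```sql
-- Peek at pending messages; unlike RECEIVE, SELECT does not remove them from the queue.
SELECT conversation_handle,
       message_type_name,
       CAST(message_body AS XML) AS message_body_xml  -- assumption: body is XML
FROM SQLDependencyQueue WITH (NOLOCK);

-- List the query notification subscriptions currently registered on the server.
SELECT id, database_id, object_id, created, timeout, status
FROM sys.dm_qn_subscriptions;
```

If the second query returns no rows, no subscription is registered, so no further notification will arrive until the program subscribes again.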