Tags: c#, sql-server, race-condition, azure-webjobssdk, azure-storage-queues

How can I ensure I don't run into this race condition?


My current process works as follows:

1.) A user enters a URL in the front end app to be analyzed

2.) The front end validates the URL and creates a record for it in a table that stores the URL's attributes

3.) The front end creates/updates a row in a table that keeps track of which stage of processing the URL is in (Each URL has its own internal ID)

3.A) The status code is set to "queued" (a sketch of this update follows the table definition below)

---- Table definition:

ID INT PRIMARY KEY,
StatusCode INT,
StatusDescription VARCHAR(MAX),
IsInitial BIT,
LastUpdated DATETIME
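
For steps 3 and 3.A, here is a minimal sketch of the front end's upsert, reusing the EntityFactory/ProcessStatus pattern shown later in the question; the ProcessStatus entity name and ProcessStatusEnum.Queued are assumptions:

    using System;
    using System.Data.Entity;
    using System.Linq;

    // Hypothetical front-end helper: create or update the status row and mark it "queued".
    public static void MarkQueued(int id)
    {
        using (var db = EntityFactory.GetInstance())
        {
            var item = db.ProcessStatus.FirstOrDefault(x => x.ID == id);

            if (item == null)
            {
                // First submission: LastUpdated stays null so the webjob treats it as new (see step 6).
                item = new ProcessStatus { ID = id, IsInitial = true, LastUpdated = null };
                db.ProcessStatus.Add(item);
            }
            else
            {
                db.Entry(item).State = EntityState.Modified;   // same pattern as IsReadyToParse below
            }

            item.StatusCode = (int)ProcessStatusEnum.Queued;   // assumed enum member
            item.StatusDescription = "Queued for processing";
            db.SaveChanges();
        }
    }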

4.) The front end sends a message to an Azure Storage Queue containing the submitted URL's internal ID (the enqueue call is sketched after this step)

After the first message is sent to the queue ------------>

4.A) An object is created in the UI for the user to click on (to "refresh" the data)

4.B) The user will very likely click on that object as soon as it appears (immediately after validation)

4.C) Another message is sent to the queue containing the URL's ID

<--------------------------------
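
A rough sketch of the enqueue call used in steps 4 and 4.C, assuming the classic WindowsAzure.Storage client, an illustrative queue name of "url-processing", and that QueueItem is a simple POCO with an ID property (the WebJobs SDK deserializes a JSON message body into it):

    using Microsoft.WindowsAzure.Storage;
    using Microsoft.WindowsAzure.Storage.Queue;
    using Newtonsoft.Json;

    // Hypothetical helper used by the front end in steps 4 and 4.C.
    public static void EnqueueUrlForProcessing(int urlId, string storageConnectionString)
    {
        var account = CloudStorageAccount.Parse(storageConnectionString);
        var queue = account.CreateCloudQueueClient().GetQueueReference("url-processing"); // assumed name
        queue.CreateIfNotExists();

        // The body only carries the internal ID, serialized as JSON so the
        // [QueueTrigger] binding can deserialize it into a QueueItem.
        var body = JsonConvert.SerializeObject(new { ID = urlId });
        queue.AddMessage(new CloudQueueMessage(body));
    }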

5.) An Azure WebJob (background task) running continuously picks these messages up and begins processing

6.) The webjob determines if this URL is ready to be processed

..... it is ready to start processing if

  • It is new (LastUpdated field is null)
  • Status code attached to the item represents an error
  • It has been 15 minutes since LastUpdated

..... It exits if

  • The ID in the message is invalid
  • The status code attached says it is currently being processed already
  • It has been less than 15 minutes since it was LastUpdated

Once it is deemed OK to continue ...

  • If it is new, the webjob will update LastUpdated to DateTime.Now
  • At the beginning of each step of the process, the status code is updated to reflect this
  • At the very end of the process, LastUpdated is updated to the current time

  • A try catch surrounds the process

a.) If the process errors out, the status code is updated to reflect that

b.) A new message is pushed onto the queue for a retry (sketched below)
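
For steps a) and b), a hedged sketch of what the try/catch inside the Agent might look like; _id, _log, _connectionString, UpdateStatus and the non-Error ProcessStatusEnum values are all hypothetical, and EnqueueUrlForProcessing is the helper sketched in step 4:

    // Inside the question's Agent class (requires System, System.Data.Entity, System.Linq).
    public void StartProcessing()
    {
        try
        {
            UpdateStatus(_id, ProcessStatusEnum.Downloading);   // assumed value: each step records its stage
            // ... the actual analysis work ...
            UpdateStatus(_id, ProcessStatusEnum.Done);          // assumed value: also refreshes LastUpdated
        }
        catch (Exception ex)
        {
            _log.WriteLine("Processing failed for {0}: {1}", _id, ex.Message);
            UpdateStatus(_id, ProcessStatusEnum.Error);         // a) record the failure
            EnqueueUrlForProcessing(_id, _connectionString);    // b) push a new message for a retry
        }
    }

    private void UpdateStatus(int id, ProcessStatusEnum status)
    {
        using (var db = EntityFactory.GetInstance())
        {
            var item = db.ProcessStatus.FirstOrDefault(x => x.ID == id);
            if (item == null) return;

            item.StatusCode = (int)status;
            item.LastUpdated = DateTime.Now;
            db.Entry(item).State = EntityState.Modified;
            db.SaveChanges();
        }
    }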

Function to determine if the URL is ready to be parsed:

    private bool IsReadyToParse(int id)
    {
        using (var db = EntityFactory.GetInstance())
        {
        var item = db.ProcessStatus.FirstOrDefault(x => x.ID == id);

            if (item == null || item.StatusCode > 1)
            {
                return false;
            }

            if (item.StatusCode == (int)ProcessStatusEnum.Error || item.LastUpdated == null)
            {
                item.LastUpdated = DateTime.Now;
                db.Entry(item).State = EntityState.Modified;
                db.SaveChanges();
                return true;
            }

            return ((DateTime)item.LastUpdated).AddMinutes(15) < DateTime.Now;
        }
    }

Queue message enters through this function:

    // This function is triggered/executed when a new message is written to the Azure queue
    public static void ProcessQueueMessage([QueueTrigger("[queue]")] QueueItem item, TextWriter log)
    {
        Console.WriteLine("Item found! Starting services [Id: {0}]", item.ID);

        Agent agent = new Agent(item.ID, log);
        agent.StartProcessing();

        log.WriteLine("Finished processing item {0}", item.ID);
    }

... Now the problem is that this continuously running webjob can pick up more than one message at a time (and I want to scale this out to a couple more webjobs reading from the same queue down the road)

How can I be sure that the function IsReadyToParse() actually reflects the current state of processing?

What if one thread is just about to update the status code to "in process", but another thread has already read the old status code and also decided it was OK to go ahead with processing?


Solution

  • Here is a possible approach that is similar to what the WebJobs SDK does internally to prevent more than one webjob function from processing the same blob trigger simultaneously.

    When a function picks up a message from the queue, create a blob named after the ID in the message. The content of the blob is the processing status (Done or InProgress). When a function wants to process the message with that ID, it must take a lease on that blob - that guarantees thread safety (a C# sketch follows the list below). Then:

    • If it cannot acquire the lease, someone else is processing the message => discard the queue message.
    • If it acquires the lease but the status is "Done", someone already processed the message => discard the queue message.
    • If it acquires the lease and the status is "InProgress", someone tried to process the message but couldn't complete it => pick up the message and process it again.
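
    A C# sketch of that check, assuming the classic WindowsAzure.Storage client, an illustrative container named "processing-locks", and a 60-second lease; the names here are assumptions, not the SDK's internal implementation:

        using System;
        using Microsoft.WindowsAzure.Storage;
        using Microsoft.WindowsAzure.Storage.Blob;

        // Returns true if this worker acquired the lease and the message still needs processing.
        public static bool TryBeginProcessing(int id, string connectionString, out string leaseId)
        {
            leaseId = null;

            var account = CloudStorageAccount.Parse(connectionString);
            var container = account.CreateCloudBlobClient().GetContainerReference("processing-locks");
            container.CreateIfNotExists();

            var blob = container.GetBlockBlobReference(id.ToString());
            if (!blob.Exists())
            {
                blob.UploadText("InProgress");   // create the marker the first time this ID is seen
            }

            try
            {
                // Only one worker at a time can hold the lease on this blob.
                leaseId = blob.AcquireLease(TimeSpan.FromSeconds(60), null);
            }
            catch (StorageException)
            {
                return false;                    // lease already held => someone else is processing
            }

            if (blob.DownloadText() == "Done")
            {
                // Already processed => release the lease and discard the queue message.
                blob.ReleaseLease(AccessCondition.GenerateLeaseCondition(leaseId));
                return false;
            }

            return true;                         // "InProgress" => a previous attempt didn't finish
        }

    When the work succeeds, the worker overwrites the blob content with "Done" (passing AccessCondition.GenerateLeaseCondition(leaseId) so the write is permitted under the lease) and then releases the lease.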

    If processing a message can take more than 60 seconds, you will need some extra code that renews the blob lease (sketched below); otherwise it will simply expire and someone else can pick it up.
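
    A minimal renewal sketch, assuming the same blob and leaseId from the acquisition step above; the 30-second interval and the helper name are illustrative:

        using System;
        using System.Threading;
        using Microsoft.WindowsAzure.Storage;
        using Microsoft.WindowsAzure.Storage.Blob;

        // Renews the 60-second lease every 30 seconds while the (potentially slow) work runs.
        public static void ProcessWithLeaseRenewal(CloudBlockBlob blob, string leaseId, Action doWork)
        {
            var lease = AccessCondition.GenerateLeaseCondition(leaseId);

            // Error handling around RenewLease is omitted for brevity.
            using (new Timer(_ => blob.RenewLease(lease), null,
                             TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(30)))
            {
                doWork();                        // the actual message processing
            }

            blob.ReleaseLease(lease);
        }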