My current process runs as follows:
1.) A user enters a URL in the front-end app to be analyzed
2.) The front end validates the URL and creates a record for it in a table of URL attributes
3.) The front end creates/updates a row in a table that tracks which stage of processing the URL is in (each URL has its own internal ID)
3.A) The status code is updated to "queued"
---- Table definition:
ID INT PRIMARY KEY,
StatusCode INT,
StatusDescription VARCHAR(MAX),
IsInitial BIT,
LastUpdated DATETIME
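The actual StatusCode values are not listed here; for illustration, a sketch of what the ProcessStatusEnum referenced later might look like. The specific values are assumptions, chosen only to be consistent with the `StatusCode > 1` check in IsReadyToParse:

```csharp
// Hypothetical values - the question only implies that "queued" maps to a
// code <= 1, that an Error status exists, and that IsReadyToParse treats
// StatusCode > 1 as "already past queued".
public enum ProcessStatusEnum
{
    Error = -1,    // assumption: errors sort below "queued" so they can be retried
    Queued = 1,
    InProcess = 2,
    Done = 3
}
```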
4.) The front end sends a message to an Azure Storage Queue containing the submitted URL's internal ID
After the first message is sent to the queue ------------>
4.A) An object is created in the UI for the user to click on (to "refresh" the data)
4.B) The user clicks the created object once it appears (high probability this will happen, and instantly if the URL validated)
4.C) Another message is sent to the queue containing the URL's ID
<--------------------------------
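Steps 4 and 4.C can be sketched with the classic Azure Storage SDK (WindowsAzure.Storage). The queue name and helper name here are placeholders, not from the question:

```csharp
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Queue;

public static class QueueSender
{
    // Hypothetical helper enqueuing the URL's internal ID (steps 4 / 4.C).
    // "url-processing" is a placeholder queue name.
    public static void EnqueueUrlId(string connectionString, int id)
    {
        CloudStorageAccount account = CloudStorageAccount.Parse(connectionString);
        CloudQueueClient client = account.CreateCloudQueueClient();
        CloudQueue queue = client.GetQueueReference("url-processing");
        queue.CreateIfNotExists();

        // The message body is just the URL's internal ID.
        queue.AddMessage(new CloudQueueMessage(id.ToString()));
    }
}
```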
5.) An Azure WebJob (background task), running continuously, picks these messages up and begins processing
6.) The WebJob determines whether this URL is ready to be processed
..... it is ready to start processing if
..... it exits if
Once it is deemed OK to continue ...
At the very end of the process, LastUpdated is updated to the current time
A try/catch surrounds the process:
a.) If the process errors out, the status code is updated to reflect that
b.) A new message is pushed onto the queue for a retry
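The try/catch wrapper (steps a and b) might look roughly like this. ProcessUrl, UpdateStatus, and EnqueueRetry are illustrative stub names, not methods from the question:

```csharp
using System;

public class Agent
{
    private readonly int id;

    public Agent(int id)
    {
        this.id = id;
    }

    public void StartProcessing()
    {
        try
        {
            ProcessUrl();        // the actual parsing work
            UpdateStatus("Done"); // LastUpdated stamped at the very end
        }
        catch (Exception ex)
        {
            // a.) record the failure in the status table ...
            UpdateStatus("Error", ex.Message);
            // b.) ... and push a new message onto the queue for a retry
            EnqueueRetry();
        }
    }

    // Stubs standing in for the real implementations described above.
    private void ProcessUrl() { }
    private void UpdateStatus(string status, string message = null) { }
    private void EnqueueRetry() { }
}
```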
Function to determine if the URL is ready to be parsed:
private bool IsReadyToParse(int id)
{
    using (var db = EntityFactory.GetInstance())
    {
        // No row, or a status past "queued" (> 1), means nothing to do.
        var item = db.ProcessStatus.FirstOrDefault(x => x.ID == id);
        if (item == null || item.StatusCode > 1)
        {
            return false;
        }

        // An errored item, or one that has never been touched, is fair game;
        // stamp LastUpdated immediately so other readers see it as claimed.
        if (item.StatusCode == (int)ProcessStatusEnum.Error || item.LastUpdated == null)
        {
            item.LastUpdated = DateTime.Now;
            db.Entry(item).State = EntityState.Modified;
            db.SaveChanges();
            return true;
        }

        // Otherwise only proceed if the last update is older than 15 minutes.
        return ((DateTime)item.LastUpdated).AddMinutes(15) < DateTime.Now;
    }
}
Queue message enters through this function:
// This function will get triggered/executed when a new message is written
// on an Azure Queue
public static void ProcessQueueMessage([QueueTrigger("[queue]")] QueueItem item, TextWriter log)
{
    Console.WriteLine("Item found! Starting services [Id: {0}]", item.ID);
    var agent = new Agent(item.ID, log);
    agent.StartProcessing();
    log.WriteLine("Finished processing item {0}", item.ID);
}
... Now the problem is that this continuously running WebJob can pick up more than one message at a time (and down the road I want to scale out to a few more WebJobs reading from the same queue).
How can I be sure that IsReadyToParse() actually reflects the current state of processing?
What if the database is just about to update the status code to "in process", but another thread has already read the old status code and also given the OK to go ahead?
Here is a possible approach, similar to what the WebJobs SDK does internally to prevent more than one WebJob function from processing the same blob trigger simultaneously.
When a function picks up a message from the queue, create a blob whose name is the ID in the message. The content of the blob is the processing status (Done or InProgress). Before a function processes the message with that ID, it must acquire a lease on that blob; the lease is what guarantees mutual exclusion.
If processing a message can take more than 60 seconds, you will need some extra code to renew the blob lease periodically; otherwise it will simply expire and another worker can pick the message up.
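A minimal sketch of that lease pattern with the classic WindowsAzure.Storage SDK. The container, helper name, and timings are my own assumptions; only blob leases themselves are what the answer prescribes:

```csharp
using System;
using System.Threading;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;

public static class BlobLeaseGate
{
    // Hypothetical helper: run "process" only if we can lease the blob
    // named after the message's ID.
    public static void TryProcess(CloudBlobContainer container, int id, Action process)
    {
        CloudBlockBlob blob = container.GetBlockBlobReference(id.ToString());

        // Create the blob if it does not exist yet; its content is the status.
        if (!blob.Exists())
        {
            blob.UploadText("InProgress");
        }

        string leaseId;
        try
        {
            // Leases must be 15-60 seconds (or infinite); only one worker
            // can hold the lease at a time.
            leaseId = blob.AcquireLease(TimeSpan.FromSeconds(60), null);
        }
        catch (StorageException)
        {
            // Someone else holds the lease - skip; the queue will retry.
            return;
        }

        var lease = AccessCondition.GenerateLeaseCondition(leaseId);

        // Renew the lease periodically in case processing exceeds 60 seconds.
        using (new Timer(_ => blob.RenewLease(lease), null,
                         TimeSpan.FromSeconds(40), TimeSpan.FromSeconds(40)))
        {
            try
            {
                process();
                blob.UploadText("Done", accessCondition: lease);
            }
            finally
            {
                blob.ReleaseLease(lease);
            }
        }
    }
}
```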