I am using ColdFusion event gateways to fire and forget a large number of actions. To do this, I loop through a query and call SendGatewayMessage() at the end of each iteration.
However, the query that I loop through can get extremely large (100,000+ records).
To prevent actions from being lost, I increased the queue size and the number of threads.
Because actions still got lost, I included a loop before the SendGatewayMessage()
like so:
<cfloop condition="#gatewayService.getQueueSize()# GTE #gatewayService.getMaxQueueSize()#">
<cfset guardianCount = guardianCount+1>
</cfloop>
<cflog file="gatewayGuardian" text="#i# waited for #guardianCount# iterations. Queuesize:#gatewayService.getQueueSize()#">
<cfset SendGatewayMessage("EventGateway",eventData)>
(More info on the gatewayService class here)
This is more or less acceptable, since I can increase the request timeout to a few hours(!), but I am still looking for a more effective way to slow down the sending of messages to the queue in the hope that the overall process will go faster with less pressure on the resources of the server.
Any suggestions? Any thoughts on the consequences of increasing the queue size even further?
Right now, I use application variables to keep track of the total number of records in the job, the number of batches already processed and the number of records processed. At the start of the job, a piece of code initializes all these variables like so:
<cfif not structKeyExists(application,"batchNumber") or application.batchNumber
eq 0 or application.batchNumber eq "">
<cfset application.batchNumber = 0>
<cfset application.recordsToDo = 0>
<cfset application.recordsDone = 0>
<cfset application.recordsDoneErrors = 0>
</cfif>
After that, I load all the records into a query and determine which rows of that query belong to the current batch. The number of records per batch is derived from the total number of records and the maximum queue size. This way a batch never exceeds the maximum queue size, and it takes up about half of the queue when the whole job is smaller than the queue. This keeps the job from crowding out other operations or jobs and prevents the initial request from timing out.
<cfset application.recordsToDo = qryRecords.recordcount>
<cfif not structKeyExists(application,"recordsPerBatch") or application.recordsPerBatch eq "" or application.recordsPerBatch eq 0>
<cfset application.recordsPerBatch = ceiling(application.recordsToDo/(ceiling(application.recordsToDo/gatewayService.getMaxQueueSize())+1))>
</cfif>
<cfset startRow = (application.recordsPerBatch*application.batchNumber)+1>
<cfset endRow = startRow + application.recordsPerBatch-1>
<cfif endRow gt application.recordsToDo>
<cfset endRow = application.recordsToDo>
</cfif>
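To make the arithmetic concrete, here is a small worked example of the same formulas. The values 100000 and 25000 are hypothetical; they stand in for qryRecords.recordcount and gatewayService.getMaxQueueSize(), and the local variable names are only for illustration.

```cfml
<!--- Hypothetical inputs, for illustration only --->
<cfset recordsToDo = 100000>
<cfset maxQueueSize = 25000>

<!--- ceiling(100000 / 25000) = 4 queue-sized chunks, plus one = 5 batches --->
<cfset recordsPerBatch = ceiling(recordsToDo / (ceiling(recordsToDo / maxQueueSize) + 1))>

<!--- Walk through the five batches (batchNumber 0 to 4) --->
<cfloop from="0" to="4" index="batchNumber">
    <cfset startRow = (recordsPerBatch * batchNumber) + 1>
    <!--- Clamp the last row to the end of the query --->
    <cfset endRow = min(startRow + recordsPerBatch - 1, recordsToDo)>
    <cfoutput>Batch #batchNumber#: rows #startRow# to #endRow#<br></cfoutput>
</cfloop>
```

With these numbers recordsPerBatch comes out at 20000, so the job runs as five batches: rows 1 to 20000, 20001 to 40000, and so on up to 80001 to 100000. Each batch fills 20000 of the 25000 queue slots.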
Then I loop through the query with a from/to loop to fire off the gateway events. I kept the guardian so there will never be a record lost because the queue is full.
<cfloop from="#startRow#" to="#endRow#" index="i">
<cfset guardianCount = 0>
<!--- load all values from the record into a struct --->
<cfset stRecordData = structNew()>
<cfloop list="#qryRecords.columnlist#" index="columnlabel">
<cfset stRecordData[columnlabel] = trim(qryRecords[columnlabel][i])>
</cfloop>
<cfset eventData = structNew()>
<cfset eventData.stData = stRecordData>
<cfset eventData.action = "bigJob">
<cfloop condition="#gatewayService.getQueueSize()# GTE #gatewayService.getMaxQueueSize()#">
<cfset guardianCount++>
</cfloop>
<cfset SendGatewayMessage("eventGateway",eventData)>
</cfloop>
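The guardian loop spins without pausing, so it burns CPU while the queue drains. A minimal sketch of a gentler variant follows, assuming the sleep() function is available (ColdFusion 8 and later) and that gatewayService and eventData are already set up as in the loop above. The pauseMs value is a hypothetical tuning knob, not a recommendation.

```cfml
<!--- Assumes gatewayService and eventData exist as in the loop above. --->
<!--- pauseMs is a hypothetical value; tune it for your own server. --->
<cfset pauseMs = 100>
<cfset guardianCount = 0>

<!--- No pound signs in the condition, so it is re-evaluated on every pass --->
<cfloop condition="gatewayService.getQueueSize() GTE gatewayService.getMaxQueueSize()">
    <!--- Pause the request thread instead of spinning while the queue is full --->
    <cfset sleep(pauseMs)>
    <cfset guardianCount++>
</cfloop>

<cfset SendGatewayMessage("eventGateway", eventData)>
```

Here guardianCount counts the number of pauses rather than raw loop iterations, so guardianCount times pauseMs gives a rough idea of how long a record waited for a free slot.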
Whenever a record is done, a function compares the number of records done with the number of records to do. When they are the same, the job is finished. Otherwise we may need to start a new batch. Notice that the check to see if we're done is inside a cflock, but the actual event post is not. Otherwise you might get a deadlock when the event you posted cannot read the variables you use inside the lock.
I hope this is of use to someone, or that someone else has a better idea still.
<!--- Default to false so the cfif after the lock never hits an undefined variable --->
<cfset doEventAnnounce = false>
<cflock timeout="30" name="jobResult">
<cfset application.recordsDone++>
<cfif application.recordsDone eq application.recordsToDo>
<!--- We are done. Set all the application variables we used back to zero, so they do not get in the way when we start the job again --->
<cfset application.batchNumber = 0>
<cfset application.recordsToDo = 0>
<cfset application.recordsDone = 0>
<cfset application.recordsPerBatch = 0>
<cfset application.recordsDoneErrors = 0>
<cfset application.JobStarted = 0>
<!--- If the number of records we have done is the same as the number of records in a batch times the current batchnumber plus one, we are done with the batch. --->
<cfelseif application.recordsDone eq application.recordsPerBatch*(application.batchNumber+1)
and application.recordsDone neq application.recordsToDo>
<cfset application.batchNumber++>
<cfset doEventAnnounce = true>
</cfif>
</cflock>
<cfif doEventAnnounce>
<!--- Fire off the event that starts the next batch. All the info it needs is in the application scope. --->
<cfhttp url="#URURLHERE#/index.cfm" method="post">
<cfhttpparam type="url" name="event" value="startBigJob">
</cfhttp>
</cfif>