Multiple projects connect/make updates to the same DB. "Project A" is tasked with making updates to the DB, "Project B" is the client interface, tasked with showing updated data.
When Project A is scheduled to update records, Project B will create a background Job to run for 15 minutes to catch the updates made, updating the interface every 15 seconds via ActionCable (webhooks).
ActiveJob is stuck on "3 remaining records
" without noticing any DB changes. If the Model is called directly (outside of the Job), everything works as expected; DB changes are noticed, client interface gets updated, and everyone is happy.
But as soon as it's called from the Job (to run the task in the background), it gets stuck.
class BatchWatchJob < ApplicationJob
queue_as :default
def perform(**args)
if args[:batch_number]
p "=== Begin - BatchWatch for batch_number: #{args[:batch_number]} ==="
begin
# Set 15 minute timeout
# ReImport.watch_batch() not updating
# it's count when DB data changes :/
# BUT if I call the method directly (not from this Job), it works
Timeout.timeout(900) { ReImport.watch_batch(args[:batch_number]) }
rescue Timeout::Error
puts '=== TIMEOUT ERROR ==='
puts 'BatchWatch job failed to finish in the requisite amount of time (15 minutes)'
end
else
p "=== Unable to start BatchWatch, batch_number not specified ==="
end
end
end
class ReImport < ApplicationRecord
[...]
def self.watch_batch(num)
batch = ReImport.where(batch: num)
total = batch.count
remaining = batch.where(completed: false) # NOTE: starts with 3 remaining
remaining_count = remaining.count
p "initial remaining: #{remaining_count}"
while !remaining.empty? do
p 'wait 10 seconds...'
sleep(10) # wait 10 seconds
p '...seconds finished...'
# batch.reload # doesn't work
# remaining.reload # doesn't work
batch = ReImport.where(batch: num)
remaining = batch.where(completed: false) # NOTE: this should now be 2
p "remaining_count: #{remaining_count}"
p "remaining . count: #{remaining.count}"
p "(remaining_count > remaining.count): #{(remaining_count > remaining.count)}"
if remaining_count > remaining.count
p '=== WATCH_BATCH: update found! ==='
# Update count
remaining_count = remaining.count
# Broadcast DataTables to update records
ReImportBatchChannel.broadcast_to("re_import_batch_#{num}", {update: true})
else
p '=== WATCH_BATCH: nothing changed yet ==='
end
end
p '=== WATCH_BATCH COMPLETED (or timeout reached) ==='
end
[...]
end
> BatchWatchJob.perform_later(batch_number: 7)
Enqueued BatchWatchJob (Job ID: 7d1beeaf-c4d8-4489-885c-13b44d1037cf) to Async(default) with arguments: {:batch_number=>7}
=> #<BatchWatchJob:0x000000055c2100 @arguments=[{:batch_number=>7}], @job_id="7d1beeaf-c4d8-4489-885c-13b44d1037cf", @queue_name="default", @priority=nil, @executions=0, @provider_job_id="011e004c-34f5-4c7a-9925-052df1aa1774">
Performing BatchWatchJob (Job ID: 7d1beeaf-c4d8-4489-885c-13b44d1037cf) from Async(default) with arguments: {:batch_number=>7}
"=== Begin - BatchWatch for batch_number: 7 ==="
(2.1ms) SET @@SESSION.sql_mode = CONCAT(CONCAT(@@sql_mode, ',STRICT_ALL_TABLES'), ',NO_AUTO_VALUE_ON_ZERO'), @@SESSION.sql_auto_is_null = 0, @@SESSION.wait_timeout = 2147483
(0.9ms) SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7
(0.7ms) SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0
"initial remaining: 3"
ReImport Exists (0.5ms) SELECT 1 AS one FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 LIMIT 1
"wait 10 seconds..."
"...seconds finished..."
"remaining_count: 3"
CACHE (0.0ms) SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 [["batch", 7], ["completed", 0]]
"remaining . count: 3"
CACHE (0.0ms) SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 [["batch", 7], ["completed", 0]]
"(remaining_count > remaining.count): false"
CACHE (0.0ms) SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 [["batch", 7], ["completed", 0]]
"=== WATCH_BATCH: nothing changed yet ==="
CACHE ReImport Exists (0.0ms) SELECT 1 AS one FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 LIMIT 1 [["batch", 7], ["completed", 0], ["LIMIT", 1]]
"wait 10 seconds..."
"...seconds finished..."
"remaining_count: 3"
CACHE (0.0ms) SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 [["batch", 7], ["completed", 0]]
"remaining . count: 3"
CACHE (0.0ms) SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 [["batch", 7], ["completed", 0]]
"(remaining_count > remaining.count): false"
CACHE (0.0ms) SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 [["batch", 7], ["completed", 0]]
"=== WATCH_BATCH: nothing changed yet ==="
CACHE ReImport Exists (0.0ms) SELECT 1 AS one FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 LIMIT 1 [["batch", 7], ["completed", 0], ["LIMIT", 1]]
"wait 10 seconds..."
[...]
Performed BatchWatchJob (Job ID: 7d1beeaf-c4d8-4489-885c-13b44d1037cf) from Async(default) in 22863.44ms
I'm not just noticing CACHE ReImport Exists
, trying to figure out how to turn that off...
I found that the problem is SQL Caching, and fixed it by using ActiveRecords uncached method ReImport.uncached do
within the while loop like so:
class ReImport < ApplicationRecord
[...]
def self.watch_batch(num)
batch = ReImport.where(batch: num)
total = batch.count
remaining = batch.where(completed: false)
remaining_count = remaining.count
p "initial remaining: #{remaining_count}"
while !remaining.empty? do
ReImport.uncached do # <--- UNCACHE SQL QUERIES
p 'wait 10 seconds...'
sleep(10) # wait 10 seconds
p '...seconds finished...'
batch = ReImport.where(batch: num)
remaining = batch.where(completed: false)
p "remaining_count: #{remaining_count}"
p "remaining . count: #{remaining.count}"
p "(remaining_count > remaining.count): #{(remaining_count > remaining.count)}"
if remaining_count > remaining.count
p '=== WATCH_BATCH: update found! ==='
# Update count
remaining_count = remaining.count
# Broadcast DataTables to update records
ReImportBatchChannel.broadcast_to("re_import_batch_#{num}", {update: true})
else
p '=== WATCH_BATCH: nothing changed yet ==='
end
end
end
p '=== WATCH_BATCH COMPLETED (or timeout reached) ==='
end
[...]
end