Tags: ruby-on-rails, rails-activejob

Rails ActiveJob showing outdated DB data


Problem I'm trying to solve:

Multiple projects connect/make updates to the same DB. "Project A" is tasked with making updates to the DB, "Project B" is the client interface, tasked with showing updated data.

Solution:

When Project A is scheduled to update records, Project B creates a background Job that runs for up to 15 minutes to catch those updates, refreshing the interface every 15 seconds via ActionCable (WebSockets).

Issue:

ActiveJob is stuck on "3 remaining records" without noticing any DB changes. If the Model is called directly (outside of the Job), everything works as expected; DB changes are noticed, client interface gets updated, and everyone is happy.

But as soon as it's called from the Job (to run the task in the background), it gets stuck.


Job that calls the Model logic:

class BatchWatchJob < ApplicationJob
    queue_as :default

    def perform(**args)
        if args[:batch_number]
            p "=== Begin - BatchWatch for batch_number: #{args[:batch_number]} ==="
            begin
                # Set 15 minute timeout
                # ReImport.watch_batch() is not updating
                # its count when DB data changes :/
                #   BUT if I call the method directly (not from this Job), it works
                Timeout.timeout(900) { ReImport.watch_batch(args[:batch_number]) }
            rescue Timeout::Error
                puts '=== TIMEOUT ERROR ==='
                puts 'BatchWatch job failed to finish in the requisite amount of time (15 minutes)'
            end
        else
            p "=== Unable to start BatchWatch, batch_number not specified ==="
        end
    end
end

Model containing my logic:

class ReImport < ApplicationRecord

        [...]

        def self.watch_batch(num)
                batch = ReImport.where(batch: num)
                total = batch.count
                remaining = batch.where(completed: false) # NOTE: starts with 3 remaining
                remaining_count = remaining.count
                p "initial remaining: #{remaining_count}"
                while !remaining.empty? do
                        p 'wait 10 seconds...'

                        sleep(10) # wait 10 seconds

                        p '...seconds finished...'
                        # batch.reload # doesn't work
                        # remaining.reload # doesn't work
                        batch = ReImport.where(batch: num)
                        remaining = batch.where(completed: false) # NOTE: this should now be 2
                        p "remaining_count: #{remaining_count}"
                        p "remaining . count: #{remaining.count}"
                        p "(remaining_count > remaining.count): #{(remaining_count > remaining.count)}"
                        if remaining_count > remaining.count
                                p '=== WATCH_BATCH: update found! ==='
                                # Update count
                                remaining_count = remaining.count

                                # Broadcast DataTables to update records
                                ReImportBatchChannel.broadcast_to("re_import_batch_#{num}", {update: true})
                        else
                                p '=== WATCH_BATCH: nothing changed yet ==='
                        end 
                end 
                p '=== WATCH_BATCH COMPLETED (or timeout reached) ==='
        end

        [...]

end

Rails Console - Output:

 > BatchWatchJob.perform_later(batch_number: 7)

Enqueued BatchWatchJob (Job ID: 7d1beeaf-c4d8-4489-885c-13b44d1037cf) to Async(default) with arguments: {:batch_number=>7}

 => #<BatchWatchJob:0x000000055c2100 @arguments=[{:batch_number=>7}], @job_id="7d1beeaf-c4d8-4489-885c-13b44d1037cf", @queue_name="default", @priority=nil, @executions=0, @provider_job_id="011e004c-34f5-4c7a-9925-052df1aa1774"> 

Performing BatchWatchJob (Job ID: 7d1beeaf-c4d8-4489-885c-13b44d1037cf) from Async(default) with arguments: {:batch_number=>7}

"=== Begin - BatchWatch for batch_number: 7 ==="
     (2.1ms)  SET  @@SESSION.sql_mode = CONCAT(CONCAT(@@sql_mode, ',STRICT_ALL_TABLES'), ',NO_AUTO_VALUE_ON_ZERO'),  @@SESSION.sql_auto_is_null = 0, @@SESSION.wait_timeout = 2147483
     (0.9ms)  SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7
     (0.7ms)  SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0
"initial remaining: 3"
    ReImport Exists (0.5ms)  SELECT  1 AS one FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 LIMIT 1
"wait 10 seconds..."
"...seconds finished..."

"remaining_count: 3"
    CACHE  (0.0ms)  SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0  [["batch", 7], ["completed", 0]]
"remaining . count: 3"
    CACHE  (0.0ms)  SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0  [["batch", 7], ["completed", 0]]
"(remaining_count > remaining.count): false"
    CACHE  (0.0ms)  SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0  [["batch", 7], ["completed", 0]]
"=== WATCH_BATCH: nothing changed yet ==="
    CACHE ReImport Exists (0.0ms)  SELECT  1 AS one FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 LIMIT 1  [["batch", 7], ["completed", 0], ["LIMIT", 1]]

"wait 10 seconds..."
"...seconds finished..."
"remaining_count: 3"
    CACHE  (0.0ms)  SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0  [["batch", 7], ["completed", 0]]
"remaining . count: 3"
    CACHE  (0.0ms)  SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0  [["batch", 7], ["completed", 0]]
"(remaining_count > remaining.count): false"
    CACHE  (0.0ms)  SELECT COUNT(*) FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0  [["batch", 7], ["completed", 0]]
"=== WATCH_BATCH: nothing changed yet ==="
    CACHE ReImport Exists (0.0ms)  SELECT  1 AS one FROM `re_imports` WHERE `re_imports`.`batch` = 7 AND `re_imports`.`completed` = 0 LIMIT 1  [["batch", 7], ["completed", 0], ["LIMIT", 1]]

"wait 10 seconds..."
[...]
Performed BatchWatchJob (Job ID: 7d1beeaf-c4d8-4489-885c-13b44d1037cf) from Async(default) in 22863.44ms

I'm only now noticing the CACHE ReImport Exists lines in the log; trying to figure out how to turn that caching off...
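The CACHE prefix in the log marks results answered from ActiveRecord's query cache rather than by MySQL. Rails typically enables that cache around units of work such as requests and jobs, which would explain why the same method sees fresh data when called directly. The following is a toy model in plain Ruby of how such a cache keeps returning a stale count, and how bypassing it helps. ToyConnection and its methods are hypothetical names for illustration, not ActiveRecord's implementation.

```ruby
# Toy model of a per-connection query cache (NOT ActiveRecord's real code).
class ToyConnection
  attr_reader :db_hits

  def initialize(table)
    @table = table        # shared, mutable stand-in for the database rows
    @cache = {}           # query key => cached result
    @cache_enabled = true
    @db_hits = 0          # how many times the "database" was actually read
  end

  # Count rows that are not completed, using the cache when it is enabled.
  def remaining_count
    key = 'COUNT completed = false'
    return @cache[key] if @cache_enabled && @cache.key?(key)

    @db_hits += 1
    result = @table.count { |row| !row[:completed] }
    @cache[key] = result if @cache_enabled
    result
  end

  # Bypass the cache for the duration of the block, then restore the setting.
  def uncached
    previous = @cache_enabled
    @cache_enabled = false
    yield
  ensure
    @cache_enabled = previous
  end
end

rows = [{ completed: false }, { completed: false }, { completed: false }]
conn = ToyConnection.new(rows)

first = conn.remaining_count                   # real read
rows[0][:completed] = true                     # another process updates a row
stale = conn.remaining_count                   # answered from the cache
fresh = conn.uncached { conn.remaining_count } # cache bypassed, real read
after = conn.remaining_count                   # cache is back on, stale again

puts "first=#{first} stale=#{stale} fresh=#{fresh} after=#{after} db_hits=#{conn.db_hits}"
```

In this model the cached entry survives the uncached block, so any query issued outside the block can still be answered with the old value.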


Solution

  • I found that the problem is SQL query caching, and fixed it by running the queries inside ActiveRecord's uncached block (ReImport.uncached do ... end), like so:

    Model

    class ReImport < ApplicationRecord
    
        [...]
    
        def self.watch_batch(num)
            batch = ReImport.where(batch: num)
            total = batch.count
            remaining = batch.where(completed: false)
            remaining_count = remaining.count
            p "initial remaining: #{remaining_count}"
            # Wrap the whole loop, condition included: a query cached before
            # an uncached block can be served from the cache again after it.
            ReImport.uncached do # <--- UNCACHE SQL QUERIES
                until remaining.empty?
                    p 'wait 10 seconds...'
    
                    sleep(10) # wait 10 seconds
    
                    p '...seconds finished...'
                    batch = ReImport.where(batch: num)
                    remaining = batch.where(completed: false)
                    current_count = remaining.count # one fresh COUNT per pass
                    p "remaining_count: #{remaining_count}"
                    p "remaining . count: #{current_count}"
                    p "(remaining_count > remaining.count): #{remaining_count > current_count}"
                    if remaining_count > current_count
                        p '=== WATCH_BATCH: update found! ==='
                        # Update count
                        remaining_count = current_count
    
                        # Broadcast DataTables to update records
                        ReImportBatchChannel.broadcast_to("re_import_batch_#{num}", {update: true})
                    else
                        p '=== WATCH_BATCH: nothing changed yet ==='
                    end
                end
            end
            p '=== WATCH_BATCH COMPLETED (or timeout reached) ==='
        end
    
        [...]
    
    end
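The watch loop above combines polling, change detection, and the time limit in one method. A minimal sketch of the same idea as a standalone helper, written in plain Ruby so it can be run without Rails, might look like the following. The name watch_remaining and all of its parameters are hypothetical; the count lookup and the broadcast are passed in as lambdas, and the deadline check stands in for the Timeout.timeout(900) call in the job.

```ruby
# Hypothetical helper: poll fetch_count until it reaches zero or time runs out.
# Returns :completed or :timeout. on_change fires whenever the count drops.
def watch_remaining(fetch_count:, on_change:, interval: 10, timeout: 900,
                    sleeper: ->(seconds) { sleep(seconds) })
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  last = fetch_count.call

  while last > 0
    return :timeout if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline

    sleeper.call(interval)
    current = fetch_count.call
    next unless current < last # nothing changed yet

    on_change.call(current)
    last = current
  end

  :completed
end

# Demo with canned counts instead of a database and a no-op sleeper.
counts = [3, 3, 2, 2, 0]
seen = []
result = watch_remaining(
  fetch_count: -> { counts.shift },
  on_change: ->(remaining) { seen << remaining },
  sleeper: ->(_seconds) {}
)

puts "result=#{result} seen=#{seen.inspect}"
```

In the Rails app, fetch_count could be something like -> { ReImport.uncached { ReImport.where(batch: num, completed: false).count } } and on_change could call ReImportBatchChannel.broadcast_to, so that every count is read without the query cache.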
    
