kubernetes delayed-job

How to constrain delayed_job processing based on kubernetes cluster


I am looking for a way to control which of my review environments process which jobs.

We are using delayed_job and are running several Kubernetes alias clusters based on a master cluster.

Is this at all possible? I found a simple way to prefix the worker's name, but I can't find a way to pass that prefix on to the actual job.

Any help is appreciated.

The way I figured it should work is something like this.

I'm not sure this is the right way to go. Perhaps the same thing could be achieved with the lifecycle events: add a column, use the lifecycle events to fill it in, and then query on it?

Crossposted to collectiveidea/delayed_job/issues/1125


Solution

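  • Eventually, I ended up with the following solution. Add a varchar column named cluster to the delayed_jobs table, then reopen the ActiveRecord backend so that every job is stamped with the configured cluster when it is saved and workers only pick up jobs from their own cluster. Works like a charm.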

    require 'delayed/backend/active_record'
    
    module Delayed
      module Backend
        module ActiveRecord
          class Configuration
            attr_accessor :cluster
          end
    
          # A job object that is persisted to the database.
          # Contains the work object as a YAML field.
          class Job < ::ActiveRecord::Base
            READY_SQL = <<~SQL.squish.freeze
              ((cluster = ? AND run_at <= ? AND (locked_at IS NULL OR locked_at < ?)) OR locked_by = ?) AND failed_at IS NULL
            SQL
    
            # Stamp every job with the enqueuing process's cluster before it is persisted.
            before_save :set_cluster
    
            def self.ready_to_run(worker_name, max_run_time)
              where(READY_SQL, cluster, db_time_now, db_time_now - max_run_time, worker_name)
            end
    
            # When a worker is exiting, make sure we don't have any locked jobs.
            def self.clear_locks!(worker_name)
              where(cluster: cluster, locked_by: worker_name)
                .update_all(locked_by: nil, locked_at: nil) # rubocop:disable Rails/SkipsModelValidations
            end
    
            # The cluster this process belongs to, read from the backend configuration.
            def self.cluster
              Delayed::Backend::ActiveRecord.configuration.cluster
            end
    
            def set_cluster
              self.cluster ||= self.class.cluster
            end
          end
        end
      end
    end
    
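    # If CLUSTER is unset, the configured cluster stays nil and `cluster = NULL` matches no rows,
    # so a worker would only see jobs it has already locked.
    Delayed::Backend::ActiveRecord.configuration.cluster = ENV['CLUSTER'] if ENV['CLUSTER']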
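
To make the effect of the extra `cluster = ?` condition easier to see, here is a small plain-Ruby model of the READY_SQL predicate. It is only a sketch for reasoning about the query, not part of delayed_job: `JobRow`, `ready?`, the cluster names and the worker name are all hypothetical, and the real filtering happens in the database.

```ruby
# Plain-Ruby model of the READY_SQL condition above, for illustration only.
# JobRow and ready? are hypothetical names; they are not part of delayed_job.
JobRow = Struct.new(:cluster, :run_at, :locked_at, :locked_by, :failed_at, keyword_init: true)

# Mirrors: ((cluster = ? AND run_at <= ? AND (locked_at IS NULL OR locked_at < ?))
#           OR locked_by = ?) AND failed_at IS NULL
def ready?(job, cluster:, worker_name:, now:, max_run_time:)
  # In SQL, "cluster = NULL" is never true, so a nil cluster on either side matches nothing.
  same_cluster = !cluster.nil? && job.cluster == cluster
  lock_free    = job.locked_at.nil? || job.locked_at < now - max_run_time
  runnable     = same_cluster && job.run_at <= now && lock_free
  (runnable || job.locked_by == worker_name) && job.failed_at.nil?
end

now = Time.now
jobs = [
  JobRow.new(cluster: "review-1", run_at: now - 60),
  JobRow.new(cluster: "review-2", run_at: now - 60),
  JobRow.new(cluster: nil,        run_at: now - 60) # row saved without a cluster
]

# A worker configured for the (made-up) cluster "review-1".
picked = jobs.select do |job|
  ready?(job, cluster: "review-1", worker_name: "host:a pid:1", now: now, max_run_time: 4 * 3600)
end
puts picked.map(&:cluster).inspect
```

Only the `review-1` row is selected. Rows from other clusters and rows with no cluster at all are skipped, unless the worker already holds their lock. In a Rails app the column itself would typically come from a migration along the lines of `add_column :delayed_jobs, :cluster, :string`; the exact migration is an assumption, since the answer does not show it.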