Tags: rabbitmq, akka, scalability, high-availability, task-queue

Frameworks for a distributed, high-scale scheduling task queue


I need to build a system that creates/schedules at least a hundred thousand jobs a day. I looked into multiple task queues, but there isn't much support for distributed scheduling along the lines of Quartz. Celery seems to be a good solution, but I am a Java developer and cannot use Python.

Akka looks like a good solution (scalable, persistent, etc.), but its scheduling seems to have a limitation: I need to schedule 100,000 jobs at different times of the day, as requested by users. I am new to Akka, so please correct me if I am wrong.

I don't require any distributed locks for scheduling; rather, I need concurrency in an asynchronous fashion. Please suggest alternatives. I am open to Scala, Java, or JavaScript.

Quartz is not scalable (it has some serious limitations). I have to send around 300 million messages a day, which will be delivered via 100,000 jobs.
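For a sense of the load these numbers imply, here are the simple averages, assuming messages are spread evenly over the day and across jobs (real peaks will be higher if deliveries cluster at certain times):

```latex
\frac{300 \times 10^{6}\ \text{messages/day}}{86\,400\ \text{s/day}} \approx 3\,472\ \text{messages/s},
\qquad
\frac{300 \times 10^{6}\ \text{messages}}{10^{5}\ \text{jobs}} = 3\,000\ \text{messages/job}
```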


Solution

  • If you are worried about the accuracy of the Akka scheduler: I once found myself in the same situation, so here is my solution. There may be better practices for this kind of scenario, but this one worked nicely for me, and I would appreciate any improvements and suggestions. Instead of scheduling work over very long time intervals, I created a scheduling actor that sends ticks to itself in order to know when the work should be executed. Suppose you want to schedule some work from date A to date B and execute it every T time units (anywhere from milliseconds to years). On every tick, the actor schedules (using the normal Akka scheduler) one new tick exactly halfway between the current time and the time the next work item should execute. This is essentially a binary search. This way the actor does not tick constantly, yet it executes the work very close to the right time: with the thresholds in the code, ticks are never more than 6 hours or less than 1 second apart, so (assuming ticks are delivered on time) work runs less than MIN_TICK_DELTA (1 second) after its checkpoint. See the Java code below for more information:

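    import akka.actor.ActorRef;
    import akka.actor.Props;
    import akka.actor.UntypedActor;
    import akka.event.Logging;
    import akka.event.LoggingAdapter;
    import akka.japi.Creator;
    import scala.concurrent.duration.FiniteDuration;

    import java.io.Serializable;
    import java.util.Date;
    import java.util.concurrent.TimeUnit;

    public class WorkScheduler extends UntypedActor {

        public static Props props(final Date from, final Date to, final long every, final TimeUnit unit) {
            return Props.create(new Creator<WorkScheduler>() {
                private static final long serialVersionUID = 1L;

                @Override
                public WorkScheduler create() throws Exception {
                    return new WorkScheduler(from, to, every, unit);
                }
            });
        }

        // Thresholds to avoid ticking at very long (or very short) intervals
        private static final long MIN_TICK_DELTA = 1000L;     // 1 sec.
        private static final long MAX_TICK_DELTA = 21600000L; // 6 hours

        // Self-addressed message that wakes the actor up
        private static final class Tick implements Serializable {
            private static final long serialVersionUID = 1L;
        }

        private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

        private final long from;                     // first execution time (epoch millis)
        private final long to;                       // last allowed execution time (epoch millis)
        private final long dt;                       // period between executions (millis)
        private long checkpoint;                     // next execution time (epoch millis)
        private long lastExecuted = Long.MIN_VALUE;  // latest checkpoint already handled

        public WorkScheduler(Date from, Date to, long every, TimeUnit unit) {
            this.from = from.getTime();
            this.to = to.getTime();
            this.dt = unit.toMillis(every);
        }

        @Override
        public void preStart() throws Exception {
            scheduleNextTick(); // The first tick
        }

        private void scheduleNextTick() {
            long t = System.currentTimeMillis();

            // Next checkpoint: the first from + k * dt that is >= t
            if (t < from) {
                checkpoint = from;
            } else {
                long k = (t - from) / dt;
                if ((t - from) % dt != 0) ++k;
                checkpoint = from + k * dt;
            }
            // Never run the same checkpoint twice (t may still equal the one just executed)
            if (checkpoint <= lastExecuted) checkpoint = lastExecuted + dt;

            if (checkpoint > to) { // All work executed. Shut down.
                getContext().stop(getSelf());
            } else { // Tick again halfway to the checkpoint, clamped to [MIN, MAX]
                long delta = Math.max(MIN_TICK_DELTA, Math.min((checkpoint - t) / 2, MAX_TICK_DELTA));
                getContext().system().scheduler().scheduleOnce(
                        FiniteDuration.apply(delta, TimeUnit.MILLISECONDS),
                        getSelf(),
                        new Tick(),
                        getContext().dispatcher(),
                        ActorRef.noSender());
            }
        }

        @Override
        public void onReceive(Object msg) throws Exception {
            if (msg instanceof Tick) {
                long t = System.currentTimeMillis();
                if (t >= checkpoint) {
                    // Checkpoints skipped because the tick came too late,
                    // due to some external delay (like a system restart)
                    long missed = (t - checkpoint) / dt;
                    if (missed > 0) {
                        log.warning("Tick came late; skipping {} missed checkpoint(s)", missed);
                    }
                    lastExecuted = checkpoint + missed * dt;

                    // Execute the work here, preferably by spawning a new actor
                    // responsible for doing the work asynchronously.
                }
                scheduleNextTick();
            } else {
                unhandled(msg);
            }
        }
    }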
    

  Hope it helps.
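To check the tick arithmetic without starting an actor system, the scheduling rule can be modeled in plain Java. This is a minimal sketch, not part of the original answer: the class `TickPlan` and its methods `nextCheckpoint`, `tickDelay` and `tickDelays` are hypothetical names, and the simulation assumes every tick is delivered exactly on time, which a real scheduler does not guarantee. It uses the same checkpoint formula and the same clamped halving rule as `WorkScheduler`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of WorkScheduler's tick arithmetic (no Akka needed).
// Assumes ticks arrive exactly when scheduled.
public class TickPlan {
    public static final long MIN_TICK_DELTA = 1_000L;      // 1 sec.
    public static final long MAX_TICK_DELTA = 21_600_000L; // 6 hours

    // First checkpoint at or after t: from + ceil((t - from) / dt) * dt
    public static long nextCheckpoint(long from, long dt, long t) {
        if (t < from) return from;
        long k = (t - from) / dt;
        if ((t - from) % dt != 0) ++k;
        return from + k * dt;
    }

    // Delay before the next tick: half the remaining gap, clamped to [MIN, MAX]
    public static long tickDelay(long now, long checkpoint) {
        return Math.max(MIN_TICK_DELTA, Math.min((checkpoint - now) / 2, MAX_TICK_DELTA));
    }

    // Delays of all ticks from 'now' until a tick lands at or after 'checkpoint'
    public static List<Long> tickDelays(long now, long checkpoint) {
        List<Long> delays = new ArrayList<>();
        long t = now;
        while (t < checkpoint) {
            long d = tickDelay(t, checkpoint); // always >= MIN_TICK_DELTA, so the loop ends
            delays.add(d);
            t += d;
        }
        return delays;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        // Job starting at t = 0 and repeating every 24 hours; "now" is 14 hours in.
        long cp = nextCheckpoint(0L, 24 * hour, 14 * hour);
        List<Long> delays = tickDelays(14 * hour, cp);
        System.out.println("checkpoint = " + cp + " ms, ticks = " + delays.size());
        // prints "checkpoint = 86400000 ms, ticks = 17"
    }
}
```

In this example the next checkpoint is 10 hours away; the halving rule reaches it in 17 ticks, and the last tick lands 901 ms after the checkpoint. That lateness is bounded by `MIN_TICK_DELTA`: a smaller minimum gives tighter timing at the cost of a few more wake-ups per job.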