Search code examples
pythonschedulingor-toolscp-satoperations-research

Schedule non-overlapping tasks between two people


I am using Google OR-Tools to solve a maintenance scheduling problem. I have five machines which are all initially broken. I need to schedule tasks for two technicians to fix the machines to minimize the total lost output. Each technicial can fix any of the machines. However, the time taken to fix each machine is different (but known for each machine beforehand). The output from each machine is the same. Thus, the optimal solution is to do the quickest tasks (machine fixes) first and so as many machines are up and running as soon as possible. (This is a toy problem to get me started on something more complex.)

I've hacked around the job shop problem to solve this for one technician (see working python script below) but I am stuck trying to apply it to two technicians as I can't work out how to handle the no overlap condition between two sets of technicians' tasks.

from ortools.sat.python import cp_model
import pandas as pd

model = cp_model.CpModel()

horizon = 12 # just put a large enough horizon (time endpoint) here for now

#start and end times for each task as int variables
t00 = model.NewIntVar(0, horizon, 't00')
t01 = model.NewIntVar(0, horizon, 't01')

t10 = model.NewIntVar(0, horizon, 't10') # task 1 start
t11 = model.NewIntVar(0, horizon, 't11') # task 1 end

t20 = model.NewIntVar(0, horizon, 't20') # task 2 start
t21 = model.NewIntVar(0, horizon, 't21') # task 2 end

t30 = model.NewIntVar(0, horizon, 't30')
t31 = model.NewIntVar(0, horizon, 't31')

t40 = model.NewIntVar(0, horizon, 't40')
t41 = model.NewIntVar(0, horizon, 't41')

#create intervals for each task with given durations
i0 = model.NewIntervalVar(t00, 3, t01, 'i0')
i1 = model.NewIntervalVar(t10, 1, t11, 'i1')
i2 = model.NewIntervalVar(t20, 1, t21, 'i2')
i3 = model.NewIntervalVar(t30, 2, t31, 'i3')
i4 = model.NewIntervalVar(t40, 1, t41, 'i4')

#only one technician so no overlap between any of the task intervals
model.AddNoOverlap([i0, i1, i2, i3, i4])

#minimize sum of all start points (equivalent to minimizing total machine downtime)
model.Minimize(t00+t10+t20+t30+t40)

solver = cp_model.CpSolver()
status = solver.Solve(model)

#present results in a schedule of tasks
df=pd.DataFrame()
df['task']=[0,1,2,3,4]
df['start']=[solver.Value(x) for x in [t00,t10,t20,t30,t40]]
df['end']=[solver.Value(x) for x in [t01,t11,t21,t31,t41]]
df=df.sort_values(by='start').reset_index(drop=True)
print(df)

In my successful code (for one technician) I have scheduled five tasks labelled 0,1,2,3,4 with durations 3,1,1,2,1 units respectively. The one-technician script correctly optimizes by putting the tasks in the order 1,2,4,3,0.

As far as I can see from some other posts (e.g. Nurse scheduling example: preventing overlapping shifts) I think I need to introduce two boolean variables for each machine to indicate which technician (call them a and b) carries out each task. E.g. I need for task 0 (fixing machine 0):

bool_0a=model.NewBoolVar('bool_0a') # if True means task 0 done by technician a
bool_0b=model.NewBoolVar('bool_0b') # if True means task 0 done by technician b

Then I need to introduce model.AddBoolOr([bool_0a, bool_0b]) to prevent two technicians from fixing the same machine.

However I now get stuck with is how to handle the AddNoOverlaps condition. Previously it was model.AddNoOverlap([i0, i1, i2, i3, i4]) except now I need to apply it to two sets of intervals (one for each technician) and I don't know which tasks are in which set until I solve the problem.

Please could someone suggest how I can do this. Or maybe I am using the wrong ideas to move to the two-technician case which is somehow not a simple extension of the one-technician case.

Added: working code following comments and answer

Below is a working code for the problem following the answers and comments. Every task-technician pair is given a boolean variable tech_for_task. Those tasks which are carried out by a technician are True otherwise False. For each task, a model.AddBoolOr is applied to the list of booleans in tech_for_task to ensure it is done by one technician only. No overlaps are applied per-technician to their set of intervals.

One thing which works but I am not sure is the best practice: I minimize the lost output objective function which sums the end times of all tasks including the physically meaningless ones which have tech_for_task=False. Looking at the solution, ends=0 for these cases so they don't contribute to the objective function. However, it would be nice to not include these in the sum in the first place. A weighted sum as suggested here (Google OR tools - train scheduling problem) would seem good but in my code it appears to introduce a non-linearity which is verboten.

The code seems to give sensible results. The ten-task case with multiple technicians takes a while to run but the results show each technicians' tasks are sorted in ascending order which seems sensible. Thanks everyone.

from ortools.sat.python import cp_model
import pandas as pd

n_techs = 2                                   # number technicians

#different scenarios for testing {machine:duration to fix}
durations = {0: 3, 1: 1, 2: 1, 3: 2, 4: 1}
# durations = {0: 10, 1: 9, 2: 8, 3: 7, 4: 6,5: 5, 6: 4, 7: 3, 8: 2, 9: 1, 10:1}


model = cp_model.CpModel()
n_tasks=len(durations)
all_techs=range(n_techs)
all_tasks=range(n_tasks)

#horizon is sum of all durations (conservative)
horizon=sum(durations.values())

#boolean variable for each combination of technician and task.  if True then technician works on this task
tech_for_task={}
for tech in all_techs:
    for task in all_tasks:
        tech_for_task[(tech,task)]=model.NewBoolVar('tech_{0}_on_task_{1}'.format(tech,task))
    
    
#for each task, apply sum(tech_for_task)==1 to ensure one tech works on each task only
for task in all_tasks:
    model.Add(sum([tech_for_task[(tech,task)] for tech in all_techs])==1)
    
#boolean or condition to ensure that only one tech works on each task (replaced by sum==1 following comment)
# for task in all_tasks:
    # model.AddBoolOr([tech_for_task[(tech,task)] for tech in all_techs])


#start,end and interval for each task
starts = {}
ends = {}
intervals = {}
for tech in all_techs:
    for task in all_tasks:
        
        starts[(tech,task)] = model.NewIntVar(0, horizon, 'tech_{0}_task_{1}_start'.format(tech,task))
        ends[(tech,task)] = model.NewIntVar(0, horizon, 'tech_{0}_task_{1}_end'.format(tech,task))
        
        #this optional interval is only live when a technician is working on a task (tech_for_task=True)
        intervals[(tech,task)]=model.NewOptionalIntervalVar(starts[(tech,task)], 
                                               durations[task], 
                                               ends[(tech,task)], 
                                               tech_for_task[(tech,task)], 
                                               'tech_{0}_task_{1}_opt_interval'.format(tech,task))


#the tasks for each technician should not overlap
for tech in all_techs:
    model.AddNoOverlap([intervals[(tech,task)] for task in all_tasks])

        
# minimize sum of end time points for all tasks.  ends=0 when tech_for_task=False so not cause problem
obj_fun=[]
for tech in all_techs:
    for task in all_tasks:
        obj_fun.append(ends[(tech,task)])
model.Minimize(sum(obj_fun))

#thought this would minimize the endpoints of active intervals but its nonlinear so doesnt work
# model.Minimize(sum( ends[(tech,task)] * tech_for_task[(tech,task)] for tech in all_techs for task in all_tasks ))

solver = cp_model.CpSolver()
status = solver.Solve(model)

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
    
    print('solved ok')
    
    c_starts={}
    c_ends={}
    c_tech_for_task={}
    c_tech={}
    c_task={}
    for tech in all_techs:
        for task in all_tasks:
            tt=(tech,task)
            c_starts[tt]=solver.Value(starts[tt])
            c_ends[tt]=solver.Value(ends[tt])
            c_tech_for_task[tt]=solver.Value(tech_for_task[tt])
            c_tech[tt]=tech
            c_task[tt]=task

    df=pd.DataFrame()
    df['task']=c_task.values()
    df['tech']=c_tech.values()
    df['start']=c_starts.values()
    df['end']=c_ends.values()
    df['tech_for_task']=c_tech_for_task.values()

    #sort chronologically    
    df=df.sort_values(by='start').reset_index(drop=True)
    df['duration']=df['task'].map(durations)
    
    #get only the tasks which were done by a tech (tech_for_task=0 are not meaningful)
    df=df[df.tech_for_task==1]
    print(df)

elif status==cp_model.MODEL_INVALID:
    print('Model invalid :-(')

Solution

  • You could model it by creating an array of variables technicianForTask[task] which indicate which technician is doing each task. Then add no-overlap constraints for each pair of intervals, but only enforced if the technician is the same for both the tasks.

    I don't have a working Python installation, but the equivalent c# code would look like this:

                int nTasks = 10;
                int nTechnicians = 3;
    
                IntVar[] technicianForTask = new IntVar[nTasks];
                IntVar[] start = new IntVar[nTasks];
                IntVar[] end = new IntVar[nTasks];
                IntervalVar[] interval = new IntervalVar[nTasks];
                for (int i = 0; i < nTasks; i++)
                {
                    technicianForTask[i] = model.NewIntVar(0, nTechnicians - 1, $"Technician for task {i}");
                    start[i] = model.NewIntVar(0, horizon, $"Start of task {i}");
                    end[i] = model.NewIntVar(0, horizon, $"End of task {i}");
                    interval[i] = model.NewIntervalVar(start[i], 3, end[i], $"Interval for task {i}"); // You'll have to put the right duration in here
                }
    
                for (int i = 0; i < nTasks - 1; i++)
                {
                    for (int j = i + 1; j < nTasks; j++)
                    {
                        IntVar sameTechnician = model.NewBoolVar($"Job {i} and {j} have the same technician.");
                        model.Add(technicianForTask[i] == technicianForTask[j]).OnlyEnforceIf(sameTechnician);
                        model.Add(technicianForTask[i] != technicianForTask[j]).OnlyEnforceIf(sameTechnician.Not());
                        model.AddNoOverlap(new List<IntervalVar>() { interval[i], interval[j] }).OnlyEnforceIf(sameTechnician);
                    }
                }
    

    I'm sure you can convert to the equivalent Python code.

    You'll have to redefine your objective function with the sum of the array of start times.