Search code examples

Optimisation algorithm, constraints and score calculation configurations with Optapy

I am using the Optapy library in python, and I am using the school timetabling instance on GitHub as a base. I have few questions regarding the library configurations:

  • How do I choose the optimisation algorithm (e.g. tabu search or simulated annealing)?
  • How do Optapy calculate the score of a solution? Do I have the option to change the score calculation type in python?
  • How can I decide the weights for each constraint, except hard or soft constraint?

I was looking at OptaPlanner User Guide, but I am not sure how to implement it on python.

Guidance appreciated.


  • OptaPy can be configured using the programmatic API. The config classes can be found in the optapy.config package. In particular, you choose the optimisation algorithm via withPhases:

    import optapy.config
    solver_config = optapy.config.solver.SolverConfig().withEntityClasses(get_class(Lesson)) \
        .withSolutionClass(get_class(TimeTable)) \
        .withConstraintProviderClass(get_class(define_constraints)) \
        .withTerminationSpentLimit(Duration.ofSeconds(30)) \

    (the above configures simulated annealing).

    Recently added was the @easy_score_calculator and @incremental_score_calculator decorators, which allows you to define an EasyScoreCalculator or IncrementalScoreCalculator respectively. For example, (EasyScoreCalculator, maximize value):

    def my_score_calculator(solution: Solution):
        total_score = 0
        for entity in solution.entity_list:
            total_score += 0 if entity.value is None else entity.value
        return optapy.score.SimpleScore.of(total_score)
    solver_config = optapy.config.solver.SolverConfig()
    termination_config = optapy.config.solver.termination.TerminationConfig()
    solver_config.withSolutionClass(optapy.get_class(Solution)) \
        .withEntityClasses(optapy.get_class(Entity)) \
        .withEasyScoreCalculatorClass(optapy.get_class(my_score_calculator)) \

    or with an IncrementalScoreCalculator (NQueens):

    class IncrementalScoreCalculator:
        score: int
        row_index_map: dict
        ascending_diagonal_index_map: dict
        descending_diagonal_index_map: dict
        def resetWorkingSolution(self, working_solution: Solution):
            n = working_solution.n
            self.row_index_map = dict()
            self.ascending_diagonal_index_map = dict()
            self.descending_diagonal_index_map = dict()
            for i in range(n):
                self.row_index_map[i] = list()
                self.ascending_diagonal_index_map[i] = list()
                self.descending_diagonal_index_map[i] = list()
                if i != 0:
                    self.ascending_diagonal_index_map[n - 1 + i] = list()
                    self.descending_diagonal_index_map[-i] = list()
            self.score = 0
            for queen in working_solution.queen_list:
        def beforeEntityAdded(self, entity: any):
        def afterEntityAdded(self, entity: any):
        def beforeVariableChanged(self, entity: any, variableName: str):
        def afterVariableChanged(self, entity: any, variableName: str):
        def beforeEntityRemoved(self, entity: any):
        def afterEntityRemoved(self, entity: any):
        def insert(self, queen: Queen):
            row = queen.row
            if row is not None:
                row_index = queen.row
                row_index_list = self.row_index_map[row_index]
                self.score -= len(row_index_list)
                ascending_diagonal_index_list = self.ascending_diagonal_index_map[queen.getAscendingDiagonalIndex()]
                self.score -= len(ascending_diagonal_index_list)
                descending_diagonal_index_list = self.descending_diagonal_index_map[queen.getDescendingDiagonalIndex()]
                self.score -= len(descending_diagonal_index_list)
        def retract(self, queen: Queen):
            row = queen.row
            if row is not None:
                row_index = queen.row
                row_index_list = self.row_index_map[row_index]
                self.score += len(row_index_list)
                ascending_diagonal_index_list = self.ascending_diagonal_index_map[queen.getAscendingDiagonalIndex()]
                self.score += len(ascending_diagonal_index_list)
                descending_diagonal_index_list = self.descending_diagonal_index_map[queen.getDescendingDiagonalIndex()]
                self.score += len(descending_diagonal_index_list)
        def calculateScore(self) -> optapy.score.SimpleScore:
            return optapy.score.SimpleScore.of(self.score)
    solver_config = optapy.config.solver.SolverConfig()
    termination_config = optapy.config.solver.termination.TerminationConfig()
    solver_config.withSolutionClass(optapy.get_class(Solution)) \
        .withEntityClasses(optapy.get_class(Queen)) \
        .withScoreDirectorFactory(optapy.config.score.director.ScoreDirectorFactoryConfig() \
                                  .withIncrementalScoreCalculatorClass(optapy.get_class(IncrementalScoreCalculator))) \

    If by weights you mean ConstraintConfiguration (which allows you to define custom constraint weights per problem), that is not exposed via OptaPy yet. If you mean how to make a constraint weight more/less, either change the second parameter to penalize/reward (if constant), or add a third parameter that computes the constraint multiplier (which the second parameter will be multiplied by), like so:

    def undesired_day_for_employee(constraint_factory: ConstraintFactory):
        return constraint_factory.forEach(shift_class) \
            .join(availability_class, [Joiners.equal(lambda shift: shift.employee,
                                                     lambda availability: availability.employee),
                                       Joiners.equal(lambda shift:,
                                                     lambda availability:
                                       ]) \
            .filter(lambda shift, availability: availability.availability_type == AvailabilityType.UNDESIRED) \
            .penalize('Undesired day for employee', HardSoftScore.ofSoft(2),
                      lambda shift, availability: get_shift_duration_in_minutes(shift))

    (this constraint penalizes by 2 soft for every minute an employee works on an UNDESIRED day)