Search code examples
optaplanneroptapy

Solving integer network flow problem using OptaPlanner


I am new to OptaPlanner and would like to test its capabilities on an integer network flow problem.

The problem I am trying to model is relatively simple:

  • We are given a directed graph with nodes and edges (where edges are (node from, node to) pairs).
  • All nodes and edges have lower bounds and upper bounds on flows, and per-unit flow costs.
  • All node and edge flows are integer.
  • Constraints:
    • Flow bounds: both node and edge flows must be between their respective lower and upper bounds (hard penalty),
    • Flow conservation: for every non-source and non-sink node, the total in-flow must equal the total out-flow (hard penalty).
    • Cost minimization: we wish to minimize the per-unit flow cost for both node and edge flows (soft penalty).

I started by modeling the problem facts as follows using OptaPy:

from optapy import problem_fact, planning_id

@problem_fact
class Node:
    def __init__(self, id, lb, ub, cost):
        self.id = id
        self.lb = lb
        self.ub = ub
        self.cost = cost

    @planning_id
    def get_id(self):
        return self.id

    def __str__(self):
        return f'Node(id={self.id},lb={self.lb},ub={self.ub},cost={self.cost})'

@problem_fact
class Edge:
    def __init__(self, id, node_from, node_to, cost):
        self.id = id
        self.node_from = node_from
        self.node_to = node_to
        self.cost = cost

    @planning_id
    def get_id(self):
        return self.id

    def __str__(self):
        return f'Node(id={self.id},node_from={self.node_from.id},node_to={self.node_to.id},cost={self.cost})'

I am guessing that the planning entities and variables are done analogously?

from optapy import planning_entity, planning_variable

@planning_entity
class NodeFlow:
    def __init__(self, id, node, flow=None):
        self.id = id
        self.node = node
        self.flow = flow

    @planning_id
    def get_id(self):
        return self.id

    @planning_variable(Node, ['nodeFlowRange'])
    def get_flow(self):
        return self.flow

    def set_flow(self, new_flow):
        self.flow = new_flow

@planning_entity
class EdgeFlow:
    def __init__(self, id, edge, flow=None):
        self.id = id
        self.edge = edge
        self.flow = flow

    @planning_id
    def get_id(self):
        return self.id

    @planning_variable(Node, ['edgeFlowRange'])
    def get_flow(self):
        return self.flow

    def set_flow(self, new_flow):
        self.flow = new_flow

However, now I am stuck modeling the constraints. It's not clear to me how to model, e.g., flow conservation. My code skeleton is as follows:

from optapy import constraint_provider
from optapy.types import Joiners, HardSoftScore

@constraint_provider
def define_constraints(constraint_factory):
    return [
        # Hard constraints
        flow_conservation(constraint_factory)
    ]

def flow_conservation(constraint_factory):
    return constraint_factory \
        .for_each(Node) \
        #.group_by(?) # To sum in-flows and out-flows per node?
        .join(Node,
              # ?
              ) \
        .penalize('Flow conservation conflict', HardSoftScore.ONE_HARD)

Is this the correct way to model such a problem? How do I model e.g. flow conservation?

Thank you for your help!


Solution

  • "flow_bounds" can be modelled as a "built-in" constraint by using "@value_range_provider" on a "@planning_entity" (as described in https://www.optapy.org/docs/latest/planner-configuration/planner-configuration.html#valueRangeProviderOnPlanningEntity), but it appears there is a bug I need to fix with @value_range_provider on @planning_entity) (will add how to do it once a build with the fix is released).

    Since "flow_conservation" only apply to non_sources/non_sinks, I recommend adding a kind field to Node to determine if it a source or sink (or just a normal node). From your NodeFlow and EdgeFlow classes, flow is declared to be a Node, but from your problem description, it should be an integer. I am confused what NodeFlow represent (is it outgoing - incoming for a given and hence should always be 0 except for source and sink nodes)?

    "cost_minimization" can be split into node_cost_minimization and edge_cost_minimization, which sums the product item.flow * item.cost and penalize by it. Putting all the constraints together, it should look something like this:

    from optapy import constraint_provider
    from optapy.constraint import Joiners, ConstraintCollectors
    from optapy.score import HardSoftScore
    from domain import Node, Edge, NodeFlow, EdgeFlow
    
    @constraint_provider
    def flow_constraints(constraint_factory):
        return [
            flow_conservation(constraint_factory),
            minimize_node_cost(constraint_factory),
            minimize_edge_cost(constraint_factory),
            outgoing_bounds(constraint_factory),
            incoming_bounds(constraint_factory),
        ]
    
    
    def outgoing_bounds(constraint_factory):
        return (
            constraint_factory.for_each(Node)
                 # Join with outgoing edge flows
                 .join(EdgeFlow, Joiners.equal(lambda node: node.id, lambda edge_flow: edge_flow.edge.node_from.id))
                 # Get total outgoing flow
                 .group_by(lambda node, outgoing_edge: node,
                           ConstraintCollectors.sum(lambda node, outgoing_edge: outgoing_edge.flow))
                 # Only penalize if outgoing_flow is not in bounds for node
                 .filter(lambda node, outgoing_flow: outgoing_flow < node.lb or outgoing_flow > node.ub)
                 .penalize('source_bounds', HardSoftScore.ONE_HARD, lambda node, outgoing_flow: 1000 * max(node.lb - outgoing_flow, outgoing_flow - node.ub))
        )
    
    
    def incoming_bounds(constraint_factory):
        return (
            constraint_factory.for_each(Node)
                 # Join with incoming edge flows
                 .join(EdgeFlow, Joiners.equal(lambda node: node.id,
                                               lambda edge_flow: edge_flow.edge.node_to.id))
                 # Get total incoming flow
                 .group_by(lambda node, incoming_edge: node,
                           ConstraintCollectors.sum(lambda node, incoming_edge: incoming_edge.flow))
                 # Only penalize if outgoing_flow is not in bounds for node
                 .filter(lambda node, incoming_flow: incoming_flow < node.lb or incoming_flow > node.ub)
                 .penalize('sink_bounds', HardSoftScore.ONE_HARD, lambda node, incoming_flow: 1000 * max(node.lb - incoming_flow, incoming_flow - node.ub))
        )
    
    
    def flow_conservation(constraint_factory):
        return (
            constraint_factory.for_each(Node)
                 # Do not include source and sink nodes
                 .filter(lambda node: node.kind != 'source' and node.kind != 'sink')
                 # Join with incoming edge flows
                 .join(EdgeFlow, Joiners.equal(lambda node: node.id,
                                               lambda edge_flow: edge_flow.edge.node_from.id))
                 # Get total incoming flow
                 .group_by(lambda node, incoming_edge: node,
                           ConstraintCollectors.sum(lambda node, incoming_edge: incoming_edge.flow))
                 # Join with outgoing edge flows
                 .join(EdgeFlow, Joiners.equal(lambda node, incoming_flow: node.id,
                                               lambda edge_flow: edge_flow.edge.node_to.id))
                 # Get total outgoing flow
                 .group_by(lambda node, incoming_flow, outgoing_edge: node,
                           lambda node, incoming_flow, outgoing_edge: incoming_flow,
                           ConstraintCollectors.sum(lambda node, incoming_flow, outgoing_edge: outgoing_edge.flow))
                 # Only penalize if incoming flow != outgoing flow
                 .filter(lambda node, incoming_flow, outgoing_flow: incoming_flow != outgoing_flow)
                 .penalize('flow_conservation', HardSoftScore.ONE_HARD, lambda node, incoming_flow, outgoing_flow: abs(incoming_flow - outgoing_flow))
        )
    
    
    def minimize_node_cost(constraint_factory):
        return (
            constraint_factory.for_each(NodeFlow)
                .group_by(ConstraintCollectors.sum(lambda node_flow: node_flow.node.cost * node_flow.flow))
                .penalize('node flow cost', HardSoftScore.ONE_SOFT, lambda cost: cost)
        )
    
    
    def minimize_edge_cost(constraint_factory):
        return (
            constraint_factory.for_each(EdgeFlow)
                .group_by(ConstraintCollectors.sum(lambda edge_flow: edge_flow.edge.cost * edge_flow.flow))
                .penalize('edge flow cost', HardSoftScore.ONE_SOFT, lambda cost: cost)
        )
    

    The domain objects I used:

    from optapy import problem_fact, planning_id
    
    
    @problem_fact
    class Node:
        def __init__(self, id, lb, ub, cost, kind):
            self.id = id
            self.lb = lb
            self.ub = ub
            self.kind = kind
            self.cost = cost
    
        @planning_id
        def get_id(self):
            return self.id
    
        def __str__(self):
            return f'Node(id={self.id},lb={self.lb},ub={self.ub},cost={self.cost})'
    
    
    @problem_fact
    class Edge:
        def __init__(self, id, node_from, node_to, lb, ub, cost):
            self.id = id
            self.node_from = node_from
            self.node_to = node_to
            self.lb = lb
            self.ub = ub
            self.cost = cost
    
        @planning_id
        def get_id(self):
            return self.id
    
        def __str__(self):
            return f'Node(id={self.id},node_from={self.node_from.id},node_to={self.node_to.id},cost={self.cost})'
    
    
    from optapy import planning_entity, planning_variable, planning_solution, planning_score, \
    planning_entity_collection_property, problem_fact_collection_property, value_range_provider
    
    from optapy.score import HardSoftScore
    
    
    @planning_entity
    class NodeFlow:
        def __init__(self, id, node, flow=None):
            self.id = id
            self.node = node
            self.flow = flow
    
        @planning_id
        def get_id(self):
            return self.id
    
        @planning_variable(object, ['nodeFlowRange'])
        def get_flow(self):
            return self.flow
    
        def set_flow(self, new_flow):
            self.flow = new_flow
    
        def __str__(self):
            return f'{self.node}: {self.flow}'
    
    
    
    @planning_entity
    class EdgeFlow:
        def __init__(self, id, edge, flow=None):
            self.id = id
            self.edge = edge
            self.flow = flow
    
        @planning_id
        def get_id(self):
            return self.id
    
        @planning_variable(int, ['edgeFlowRange'])
        def get_flow(self):
            return self.flow
    
        def set_flow(self, new_flow):
            self.flow = new_flow
    
        def __str__(self):
            return f'{self.edge}: {self.flow}'
    
    
    
    @planning_solution
    class NetworkFlow:
        def __init__(self, nodes, edges, node_flows, edge_flows, score):
            self.nodes = nodes
            self.edges = edges
            self.node_flows = node_flows
            self.edge_flows = edge_flows
            self.score = score
    
        @problem_fact_collection_property(Node)
        def get_nodes(self):
            return self.nodes
    
        @problem_fact_collection_property(Edge)
        def get_edges(self):
            return self.edges
    
        @planning_entity_collection_property(NodeFlow)
        def get_node_flows(self):
            return self.node_flows
    
        @planning_entity_collection_property(EdgeFlow)
        def get_edge_flows(self):
            return self.edge_flows
    
        @planning_score(HardSoftScore)
        def get_score(self):
            return self.score
    
        @problem_fact_collection_property(int)
        @value_range_provider('edgeFlowRange')
        def get_edge_flow_range(self):
            if len(self.edges) == 0:
                return [0]
            else:
                lb = min(map(lambda edge: edge.lb, self.edges))
                ub = max(map(lambda edge: edge.ub, self.edges))
                return [i for i in range(lb, ub + 1)]
    
        @problem_fact_collection_property(int)
        @value_range_provider('nodeFlowRange')
        def get_node_flow_range(self):
            if len(self.nodes) == 0:
                return [0]
            else:
                lb = min(map(lambda node: node.lb, self.nodes))
                ub = max(map(lambda node: node.ub, self.nodes))
                return [i for i in range(lb, ub + 1)]
    
        def set_score(self, score):
            self.score = score
    
        def __str__(self):
            return (f'Nodes:\n' +
                    f'\n'.join(list(map(str, self.node_flows))) +
                    f'\nEdges:\n' +
                    f'\n'.join(list(map(str, self.edge_flows))) +
                    f'\nScore: {self.score}'
            )
    
    
    def generate_problem():
        nodes = [
            Node('source', 10, 10, 1, 'source'),
            Node('sink', 10, 10, 1, 'sink'),
            Node('a', 0, 2, 1, 'node'),
            Node('b', 0, 8, 2, 'node'),
            Node('c', 0, 3, 1, 'node'),
        ]
        edges = [
            Edge('source-a', nodes[0], nodes[2], 0, 2, 1),
            Edge('source-b', nodes[0], nodes[3], 0, 10, 2),
            Edge('b-c', nodes[3], nodes[4], 0, 3, 1),
            Edge('a-sink', nodes[2], nodes[1], 0, 2, 1),
            Edge('b-sink', nodes[3], nodes[1], 0, 5, 2),
            Edge('c-sink', nodes[4], nodes[1], 0, 3, 1),
        ]
        node_flows = []
        edge_flows = list(map(lambda edge: EdgeFlow(edge.id, edge), edges))
        return NetworkFlow(nodes, edges, node_flows, edge_flows, None)
    

    It can be simpler (for instance, you can put the planning variable "flow" directly into Node and Edge and remove NodeFlow and EdgeFlow). Do note OptaPy is significantly slower than OptaPlanner currently, but we are working on improving its performance (hopefully in the next release).