Search code examples
pythonsimpy

How to implement batch resource using SimPy


I spent some time looking for a solution to this seemingly simple problem but still could not find one. The problem is to simulate a process in which some events can be processed concurrently by a limited number of resources, while other events must first be aggregated and then processed as a batch. Here is a simple code example of a ferry transporting people:

import simpy
import matplotlib.pyplot as plt


class Person:
    """A simulated traveller that keeps a record of what happened to it.

    ``name`` is the label supplied at construction; ``timeline`` is the list
    of ``(event, time)`` tuples collected through :meth:`log`, in call order.
    """

    def __init__(self, name):
        """Remember ``name`` and start with an empty timeline."""
        self.timeline = []
        self.name = name

    def log(self, event, time):
        """Append the pair ``(event, time)`` to the end of the timeline."""
        entry = (event, time)
        self.timeline.append(entry)


class BatchResource(simpy.Resource):
    """Resource subclass that tries to serve its users in fixed-size batches.

    Every request is created by ``simpy.Resource.request`` and additionally
    recorded in ``batch_queue``. As soon as ``batch_size`` requests are
    recorded, a ``process_batch`` process is started for them.

    NOTE(review): the request object still comes from the base class, which
    presumably grants it on its own as soon as one of the ``capacity`` slots
    is free -- confirm against the SimPy ``Resource`` documentation. If so,
    callers are not held back until the batch is full, and the
    ``req.succeed()`` loop in ``process_batch`` only touches requests that
    were still pending after the crossing time.
    """

    def __init__(self, env, capacity, batch_size, process_time):
        """Initialise the base resource and the batching state.

        Args:
            env: Forwarded to ``simpy.Resource.__init__``.
            capacity: Forwarded to ``simpy.Resource.__init__``.
            batch_size: Number of recorded requests that starts a batch.
            process_time: Delay handed to ``timeout`` for each batch.
        """
        super().__init__(env, capacity)
        self.batch_size = batch_size
        self.process_time = process_time
        # requests recorded by request() and not yet taken by process_batch()
        self.batch_queue = []

    def request(self):
        """Create a request via the base class and record it for batching.

        Returns:
            The request object returned by ``simpy.Resource.request``.
        """
        req = super().request()
        self.batch_queue.append(req)

        # Start processing only if a full batch is ready
        # (self._env is not assigned in this class; presumably set by the base class)
        if len(self.batch_queue) >= self.batch_size:
            self._env.process(self.process_batch())

        return req

    def process_batch(self):
        """Generator process: take one batch, wait ``process_time``, succeed it.

        The first ``batch_size`` recorded requests are removed from
        ``batch_queue``; after the timeout every one of them that has not been
        triggered yet is succeeded.
        """
        batch = self.batch_queue[: self.batch_size]  # Take the batch
        self.batch_queue = self.batch_queue[self.batch_size :]  # Remove from queue

        print(f"[{self._env.now}] Ferry is full! Departing with {len(batch)} people.")
        yield self._env.timeout(self.process_time)  # Simulate crossing time

        print(f"[{self._env.now}] Ferry has crossed the river.")

        # Release all passengers in the batch
        for req in batch:
            if not req.triggered:  # Ensure it hasn't already been granted
                req.succeed()


def person(env, person, ferry, casses_before, casses_after):
    """Walk one person through the three stages of the trip, in order.

    For each stage the matching resource is requested inside a ``with``
    block; once the request is yielded back, the start label is logged, a
    one-unit ``env.timeout`` is yielded, and the end label is logged.

    Args:
        env: Provides ``now`` and ``timeout``.
        person: Object whose ``log(event, time)`` records each step.
        ferry: Resource requested for the middle ("wait") stage.
        casses_before: Resource requested for the first stage.
        casses_after: Resource requested for the last stage.
    """
    # (resource, label logged on entry, label logged on exit), in visit order
    itinerary = (
        (casses_before, "start_before", "end_before"),
        (ferry, "start_wait", "end_wait"),
        (casses_after, "start_after", "end_after"),
    )

    for station, entered, left in itinerary:
        with station.request() as slot:
            yield slot
            person.log(entered, env.now)
            yield env.timeout(1)
            person.log(left, env.now)


def setup(env, num_people, capacity, people):
    """Create the shared resources and feed people into the simulation.

    Two plain resources of capacity 3 and one ``BatchResource`` (capacity and
    batch size both equal to ``capacity``, process time 5) are created once.
    Then ``num_people`` people are created one at a time; each is appended to
    ``people`` and started as a ``person`` process, followed by a 0.5 delay.

    Args:
        env: Simulation environment used for resources, processes, timeouts.
        num_people: How many people to create.
        capacity: Capacity and batch size of the ferry resource.
        people: List that receives every created ``Person`` (mutated in place).
    """
    desks_in = simpy.Resource(env, capacity=3)
    desks_out = simpy.Resource(env, capacity=3)
    boat = BatchResource(env, capacity=capacity, batch_size=capacity, process_time=5)

    for index in range(num_people):
        newcomer = Person(f"Person {index}")
        people.append(newcomer)
        env.process(person(env, newcomer, boat, desks_in, desks_out))
        # spacing between consecutive arrivals
        yield env.timeout(0.5)


# Setup and start the simulation
env = simpy.Environment()
people = []
env.process(setup(env, num_people=30, capacity=10, people=people))
env.run(until=50)

# Plot Gantt-like chart: one row per person, one bar per logged stage
fig, ax = plt.subplots(figsize=(10, 8))

colors = {"before": "tab:blue", "wait": "tab:orange", "after": "tab:green"}

# The loop variable is deliberately not named `person`: at module level that
# would rebind the `person` process function defined earlier in this script.
for i, traveller in enumerate(people):
    for event, time in traveller.timeline:
        if "start" in event:
            # remember when the stage opened; the bar is drawn when it closes
            start_time = time
        elif "end" in event:
            end_time = time
            # event labels look like "end_<stage>"; the stage picks the colour
            stage = event.split("_")[1]
            ax.broken_barh(
                [(start_time, end_time - start_time)],
                (i * 10, 9),
                facecolors=(colors[stage]),
            )

# one tick per row, centred on that row's bar
ax.set_yticks([i * 10 + 5 for i in range(len(people))])
ax.set_yticklabels([p.name for p in people])
ax.set_xlabel("Time")
ax.set_ylabel("Person")
ax.grid(True)

plt.show()

It results in the following Gantt chart of the process: enter image description here

But I am looking for the case where the ferry (orange blocks) waits for people to fill it completely and only then does its work.

Can someone please suggest a way to implement it?

Thanks!


Solution

  • I assume you are looking for a chart like this?

    enter image description here

    You need to think of the ferry as a process, not a resource. There are two ways to do this. The first is to break the process into two processes. The first process does the pre-processing and puts the person into a queue. When the queue is big enough, the batch is popped off the queue and the ferry process is started. The pre-processing process works at the person level, while the ferry process works at the batch level and iterates over each person in the batch at event time. The second way is for the ferry to publish depart and arrive events that the people subscribe to. Everyone in a batch is listening to the same events and reacts at the same time when the events fire. I have listed the code for both approaches below.

        import simpy
        import matplotlib.pyplot as plt
        
        """
        Ferry Simulation using a batching queue
        
        We use a chain of processes to move the passengers.
        The first process does the sign in;
        the second process, when the queue is big enough, moves the passengers.
        
        Augmented by: Michael R. Gibbs
        """
        
        
        class Person:
            """
            One passenger, identified by ``name``.

            ``timeline`` collects ``(event, time)`` tuples in the order
            ``log`` is called.
            """

            def __init__(self, name):
                self.name, self.timeline = name, []

            def log(self, event, time):
                """Record that ``event`` happened at ``time``."""
                self.timeline += [(event, time)]
        
        
            
        # not used anymore
        # class BatchResource(simpy.Resource):
        #     def __init__(self, env, capacity, batch_size, process_time):
        #         super().__init__(env, capacity)
        #         self.batch_size = batch_size
        #         self.process_time = process_time
        #         self.batch_queue = []
        
        #     def request(self):
        #         req = super().request()
        #         self.batch_queue.append(req)
        
        #         # Start processing only if a full batch is ready
        #         if len(self.batch_queue) >= self.batch_size:
        #             self._env.process(self.process_batch())
        
        #         return req
        
        #     def process_batch(self):
        #         batch = self.batch_queue[: self.batch_size]  # Take the batch
        #         self.batch_queue = self.batch_queue[self.batch_size :]  # Remove from queue
        
        #         print(f"[{self._env.now}] Ferry is full! Departing with {len(batch)} people.")
        #         yield self._env.timeout(self.process_time)  # Simulate crossing time
        
        #         print(f"[{self._env.now}] Ferry has crossed the river.")
        
        #         # Release all passengers in the batch
        #         for req in batch:
        #             if not req.triggered:  # Ensure it hasn't already been granted
        #                 req.succeed()
        
        
        
        # updated to use a batching queue
        def sign_in(env, batch_q, person, batch_size, ferry_time=5):
            """
            Pre-ferry stage for one person: sign in, then queue for the ferry.

            The person spends one time unit signing in, is appended to
            ``batch_q`` and starts waiting.  If that append brings the queue
            up to ``batch_size``, a ``send_ferry`` process is started for the
            batch.

            Args:
                env: Provides ``now``, ``timeout`` and ``process``.
                batch_q: Shared list of waiting people (mutated in place).
                person: Object whose ``log(event, time)`` records each step.
                batch_size: Queue length at which a ferry is sent.
                ferry_time: Crossing time handed to ``send_ferry``.  Defaults
                    to 5, the value that used to be hard-coded here.
            """

            # Process before crossing the river
            person.log("start_before", env.now)
            yield env.timeout(1)
            person.log("end_before", env.now)

            # start waiting for the ferry in a queue
            batch_q.append(person)

            person.log("start_wait", env.now)

            # whoever completes the batch launches the ferry for everyone in it
            if len(batch_q) >= batch_size:
                env.process(send_ferry(env, batch_q, batch_size, ferry_time))
        
        def send_ferry(env, batch_q, batch_size, ferry_time):
            """
            Run one ferry trip for the people at the front of the queue.

            The first ``batch_size`` entries are removed from ``batch_q`` in
            place and become the passengers.  Each passenger gets "end_wait"
            and "start_after" logged at departure, then "end_after" once the
            ``ferry_time`` timeout has been yielded.
            """
            # board: split the front of the queue off, shrinking batch_q in place
            on_board = batch_q[:batch_size]
            batch_q[:batch_size] = []

            # departure: the wait ends and the crossing starts at the same instant
            for rider in on_board:
                rider.log("end_wait", env.now)
                rider.log("start_after", env.now)

            # crossing
            yield env.timeout(ferry_time)

            # arrival
            for rider in on_board:
                rider.log("end_after", env.now)
        
        
        
        def setup(env, batch_q, num_people, capacity, people):
            """
            Arrival generator: create people and start their sign-in processes.

            ``num_people`` people are created one at a time.  Each is appended
            to ``people`` and started as a ``sign_in`` process that shares
            ``batch_q`` and uses ``capacity`` as the batch size; a 0.5 delay
            follows every arrival.
            """
            for index in range(num_people):
                newcomer = Person(f"Person {index}")
                people.append(newcomer)

                # every person signs in independently; only the queue is shared
                env.process(sign_in(env, batch_q, newcomer, capacity))

                # spacing between consecutive arrivals
                yield env.timeout(0.5)
        
        
        # Run the simulation

        # the batching queue shared by every person is just a list
        batch_q = []
        people = []

        env = simpy.Environment()
        env.process(setup(env, batch_q, num_people=30, capacity=10, people=people))
        env.run(until=60)

        # Draw a Gantt-like chart: one row per person, one bar per logged stage
        fig, ax = plt.subplots(figsize=(10, 8))

        colors = {"before": "tab:blue", "wait": "tab:orange", "after": "tab:green"}

        for row, rider in enumerate(people):
            band = (row * 10, 9)  # (y offset, bar height) of this person's row
            for label, stamp in rider.timeline:
                if "start" in label:
                    # remember when the stage opened; the bar is drawn when it closes
                    opened = stamp
                elif "end" in label:
                    # labels look like "end_<stage>"; the stage picks the colour
                    phase = label.split("_")[1]
                    ax.broken_barh([(opened, stamp - opened)], band, facecolors=(colors[phase]))

        # one tick per row, centred on that row's bar
        tick_positions = [row * 10 + 5 for row in range(len(people))]
        tick_labels = [rider.name for rider in people]
        ax.set_yticks(tick_positions)
        ax.set_yticklabels(tick_labels)
        ax.set_xlabel("Time")
        ax.set_ylabel("Person")
        ax.grid(True)

        plt.show()
    

    Next example

    import simpy
    import matplotlib.pyplot as plt
    
    """
    Ferry Simulation using events to manage the processing
    
    Augmented by: Michael R. Gibbs
    """
    
    
    class Person:
        """
        A named passenger with a chronological event log.

        ``timeline`` holds one ``(event, time)`` tuple per ``log`` call.
        """

        def __init__(self, name):
            self.timeline = []
            self.name = name

        def log(self, event, time):
            """Add ``(event, time)`` to the end of ``timeline``."""
            record = (event, time)
            self.timeline.append(record)
    
    class Ferry:
        '''
        A single ferry trip, modelled as an agent rather than a resource.

        People are added to the ferry until some caller starts
        ``send_ferry``.  The ferry owns two events that passengers can wait
        on: one is succeeded when the ferry departs, the other when it
        arrives.  How a passenger reacts to either event is up to the
        passenger.
        '''

        # number of ferries created so far; the running value doubles as a unique id
        boat_cnt = 0

        def __init__(self, env, batch_size, ferry_time):
            """
                Store the settings, create the two trip events, take an id.

                env:        provides ``event``, ``timeout`` and ``now``
                batch_size: stored as-is; not enforced by this class
                ferry_time: duration of the timeout between depart and arrive
            """
            self.env = env
            self.batch_size = batch_size
            self.ferry_time = ferry_time
            self.passengers = []

            # events that the passengers can subscribe to
            self.depart_event = env.event()
            self.arrive_event = env.event()

            # bump the shared counter and use the new value as this ferry's id
            owner = type(self)
            owner.boat_cnt += 1
            self.id = owner.boat_cnt

        def add_passenger(self, person):
            """
                Announce the boarding on stdout, then add ``person`` to
                ``passengers``.
            """
            message = f"[{self.env.now}] Person {person.name} has arrived on boat {self.id}"
            print(message)

            self.passengers.append(person)

        def get_depart_event(self):
            """
                Return the event that is succeeded when this ferry departs.
            """
            return self.depart_event

        def get_arrive_event(self):
            """
                Return the event that is succeeded when this ferry arrives.
            """
            return self.arrive_event

        def send_ferry(self):
            """
                Generator process for the trip itself: announce the
                departure, succeed the depart event, yield a ``ferry_time``
                timeout, then succeed the arrive event.
            """
            head_count = len(self.passengers)
            print(f"[{self.env.now}] Ferry {self.id} as departed with {head_count} passengers")

            # departure: wake everyone waiting on the depart event
            self.depart_event.succeed()

            # the crossing
            yield self.env.timeout(self.ferry_time)

            # arrival: wake everyone waiting on the arrive event
            self.arrive_event.succeed()
    
        
    # not used anymore
    # class BatchResource(simpy.Resource):
    #     def __init__(self, env, capacity, batch_size, process_time):
    #         super().__init__(env, capacity)
    #         self.batch_size = batch_size
    #         self.process_time = process_time
    #         self.batch_queue = []
    
    #     def request(self):
    #         req = super().request()
    #         self.batch_queue.append(req)
    
    #         # Start processing only if a full batch is ready
    #         if len(self.batch_queue) >= self.batch_size:
    #             self._env.process(self.process_batch())
    
    #         return req
    
    #     def process_batch(self):
    #         batch = self.batch_queue[: self.batch_size]  # Take the batch
    #         self.batch_queue = self.batch_queue[self.batch_size :]  # Remove from queue
    
    #         print(f"[{self._env.now}] Ferry is full! Departing with {len(batch)} people.")
    #         yield self._env.timeout(self.process_time)  # Simulate crossing time
    
    #         print(f"[{self._env.now}] Ferry has crossed the river.")
    
    #         # Release all passengers in the batch
    #         for req in batch:
    #             if not req.triggered:  # Ensure it hasn't already been granted
    #                 req.succeed()
    
    
    
    # updated to use new ferry
    def person(env, person, ferry):
        """
        Life cycle of one person: pre-processing, waiting, crossing.

        After one time unit of pre-processing the person boards ``ferry``.
        If boarding brings the passenger count up to ``ferry.batch_size``,
        this person starts the ferry's ``send_ferry`` process.  The person
        then waits on the ferry's depart event and, after that, on its
        arrive event, logging the start and end of every stage.
        """
        # Stage 1: processing before the river
        person.log("start_before", env.now)
        yield env.timeout(1)
        person.log("end_before", env.now)

        # Stage 2: board, then wait for the ferry to depart
        departure = ferry.get_depart_event()
        ferry.add_passenger(person)

        boarded = len(ferry.passengers)
        print(boarded, ferry.batch_size)
        if boarded >= ferry.batch_size:
            # this person filled the ferry, so they are the one to launch it
            env.process(ferry.send_ferry())

        person.log("start_wait", env.now)
        yield departure
        person.log("end_wait", env.now)

        # Stage 3: the crossing itself
        arrival = ferry.get_arrive_event()
        person.log("start_after", env.now)
        yield arrival
        person.log("end_after", env.now)
    
    
    def setup(env, num_people, capacity, people):
        """
        Arrival generator: create people and assign each one to a ferry.

        ``num_people`` people are created one at a time, appended to
        ``people`` and started as ``person`` processes bound to the current
        ``Ferry`` (batch size ``capacity``, ferry time 5).  Once as many
        people as the ferry's ``batch_size`` have been assigned to it, a new
        ferry is created for later arrivals.  A 0.5 delay follows every
        arrival.
        """
        current_ferry = Ferry(env, capacity, 5)
        assigned = 0

        for index in range(num_people):
            newcomer = Person(f"Person {index}")
            people.append(newcomer)

            env.process(person(env, newcomer, current_ferry))

            # this ferry has its full allocation: later arrivals get a new one
            assigned += 1
            if assigned >= current_ferry.batch_size:
                current_ferry = Ferry(env, capacity, 5)
                assigned = 0

            # spacing between consecutive arrivals
            yield env.timeout(0.5)
    
    
    # Setup and start the simulation
    env = simpy.Environment()
    people = []
    env.process(setup(env, num_people=30, capacity=10, people=people))
    env.run(until=60)

    # Plot Gantt-like chart: one row per person, one bar per logged stage
    fig, ax = plt.subplots(figsize=(10, 8))

    colors = {"before": "tab:blue", "wait": "tab:orange", "after": "tab:green"}

    # The loop variable is deliberately not named `person`: at module level that
    # would rebind the `person` process function defined earlier in this script.
    for i, traveller in enumerate(people):
        for event, time in traveller.timeline:
            if "start" in event:
                # remember when the stage opened; the bar is drawn when it closes
                start_time = time
            elif "end" in event:
                end_time = time
                # event labels look like "end_<stage>"; the stage picks the colour
                stage = event.split("_")[1]
                ax.broken_barh(
                    [(start_time, end_time - start_time)],
                    (i * 10, 9),
                    facecolors=(colors[stage]),
                )

    # one tick per row, centred on that row's bar
    ax.set_yticks([i * 10 + 5 for i in range(len(people))])
    ax.set_yticklabels([p.name for p in people])
    ax.set_xlabel("Time")
    ax.set_ylabel("Person")
    ax.grid(True)

    plt.show()