Search code examples
pythonintersectionsimpytraffic-simulation

Traffic simulation using Simpy


I have been working with SimPy to design a traffic simulation of a signalized intersection (three approaches). The attached code describes cars driving on a single-lane road and arriving at the intersection from one direction only. The traffic light switches between green and red intervals. My question is: how can I modify the code to simulate more than one traffic signal, each with a different arrival rate, working in sequence at the same intersection? My goal is to connect more than one road to the intersection and control the traffic movement with a traffic signal.

Thanks in advance.

# Section 1: Import from modules and define a utility class.

from collections import deque # double-ended queue
from numpy import random
import simpy
from simpy.util import start_delayed


class Struct(object):
   """
   Minimal attribute bag.  Each keyword argument given to the constructor
   becomes an instance attribute of the same name; attributes can be
   reassigned (or new ones added) at any later point.
   """
   def __init__(self, **kwargs):
      # Bind every keyword/value pair as an attribute on this instance.
      for attr_name, attr_value in kwargs.items():
         setattr(self, attr_name, attr_value)


# Section 2: Initializations.

# Fixed seed so that every run reproduces the same random draws.
random.seed([1, 2, 3])

# Simulation horizon, in seconds:
end_time= 200.0

# Cars arrive at the traffic light according to a Poisson process with an
# average rate of 0.2 per second, i.e. a mean interarrival time of 5 seconds:
arrival_rate= 0.2
t_interarrival_mean= 1.0 / arrival_rate

# Duration of the green and red phases of the traffic light, in seconds:
t_green= 30.0
t_red= 40.0

# Time for the car at the head of the queue to depart (clear the
# intersection): triangular distribution given as (minimum, mode, maximum).
t_depart_left, t_depart_mode, t_depart_right= 1.6, 2.0, 2.4

# Cars waiting at the light; empty at the start of the simulation:
queue= deque()

# Running counts of cars:
arrival_count= 0
departure_count= 0

# Accumulators for the queue-length samples and for the waiting times:
Q_stats= Struct(count=0, cars_waiting=0)
W_stats= Struct(count=0, waiting_time=0.0)


# Section 3: Arrival event.

def arrival():
   """
   Generator that drives the car-arrival process.

   Each pass through the loop handles one arriving car and then waits for an
   exponentially distributed interarrival time with mean

      t_interarrival_mean= 1.0 / arrival_rate

   so that arrivals form a Poisson process with rate `arrival_rate`.
   """
   # Only `arrival_count` is rebound here; `env`, `light`, `queue` and
   # `W_stats` are only read or mutated in place.
   global arrival_count

   while True:
      arrival_count+= 1
      car_number= arrival_count

      if light != 'red' and not queue:

         # Green light and nobody waiting.  ==> The car drives straight
         # through the intersection.
         print("Car #%d arrived to a green light with no cars waiting at time "
           "%.3f." % (car_number, env.now))

         # The car waited zero seconds: count it, but leave the cumulative
         # waiting time untouched.
         W_stats.count+= 1

      else:

         # Red light, or cars already waiting.  ==> The car joins the back of
         # the queue as a (car number, arrival time) tuple.
         queue.append((car_number, env.now))
         print("Car #%d arrived and joined the queue at position %d at time "
           "%.3f." % (car_number, len(queue), env.now))

      # Wait until the next car arrives:
      t_next= random.exponential(t_interarrival_mean)
      yield env.timeout(t_next)


# Section 4: Define event functions.

# Section 4.1: Departure event.

def departure():
   """
   Generator that releases queued cars through the intersection.

   Every pass removes the car at the head of the queue, reports it, and adds
   its waiting time to `W_stats`.  The generator keeps going, one triangular
   departure delay at a time, for as long as the light is not red and cars
   remain in the queue; otherwise it finishes.
   """
   more_departures= True

   while more_departures:

      # The head-of-queue car clears the intersection:
      car_number, t_arrival= queue.popleft()
      cars_left= len(queue)
      print("Car #%d departed at time %.3f, leaving %d cars in the queue."
        % (car_number, env.now, cars_left))

      # Waiting time statistics:
      W_stats.count+= 1
      W_stats.waiting_time+= env.now - t_arrival

      # Another departure follows only while the light is not red and
      # somebody is still waiting.  The delay is drawn only in that case.
      more_departures= light != 'red' and cars_left > 0
      if more_departures:
         yield env.timeout(random.triangular(left=t_depart_left,
           mode=t_depart_mode, right=t_depart_right))


# Section 4.2: Light change-of-state event.

def light():
   """
   Generator that cycles the traffic light: green for `t_green` seconds, then
   red for `t_red` seconds, forever.  There is no yellow phase.

   NOTE: the current state is stored in the module-level name `light`, i.e.
   the very name of this function.  Once the generator has started running,
   `light` refers to the string 'green' or 'red', not to this function.
   """
   global light

   while True:

      # Green phase.
      light= 'green'
      print("\nThe light turned green at time %.3f." % env.now)

      # Waiting cars start to leave after one triangular departure delay:
      if queue:
         first_delay= random.triangular(left=t_depart_left,
           mode=t_depart_mode, right=t_depart_right)
         start_delayed(env, departure(), delay=first_delay)

      yield env.timeout(t_green)

      # Red phase.
      light= 'red'
      print("\nThe light turned red at time %.3f."   % env.now)

      yield env.timeout(t_red)


# Section 4.3: Schedule event that collects Q_stats.

def monitor():
   """
   Generator that samples the queue length once per simulated second and
   accumulates the samples in `Q_stats`.  (With Poisson arrivals one could
   instead rely on the PASTA property, 'Poisson Arrivals See Time Averages',
   and sample the queue just before each arrival.)
   """
   sampling_interval= 1.0

   while True:
      # One more sample: add the current number of waiting cars.
      Q_stats.cars_waiting+= len(queue)
      Q_stats.count+= 1
      yield env.timeout(sampling_interval)


# Section 5: Schedule initial events and run the simulation.  Note: The first
# change of the traffic light, first arrival of a car, and first statistical
# monitoring event are scheduled by invoking `env.process`.  Subsequent changes
# will be scheduled by invoking the `timeout` method.  With this scheme, there
# is only one event of each of these types scheduled at any time; this keeps the
# event queue short, which is good for both memory utilization and running time.

# Banner printed once before any simulation output.
print("\nSimulation of Cars Arriving at Intersection Controlled by a Traffic "
  "Light\n\n")

# Initialize environment:
env= simpy.Environment()

# Schedule first change of the traffic light.  `light` still names the
# generator function at this point; once the process runs, the generator
# rebinds the global `light` to the strings 'green' / 'red', so this call
# must happen before the simulation starts.
env.process(light())

# Schedule first arrival of a car after one exponential interarrival time:
t_first_arrival= random.exponential(t_interarrival_mean)
start_delayed(env, arrival(), delay=t_first_arrival)

# Schedule first statistical monitoring event:
env.process(monitor())

# Let the simulation run for specified time (`end_time` simulated seconds):
env.run(until=end_time)


# Section 6: Report statistics.

print("\n\n      *** Statistics ***\n\n")

# Each mean is a ratio of an accumulated total to a sample count.  A count of
# zero (for example, no car was ever counted in `W_stats` during a very short
# run) would raise ZeroDivisionError, so report NaN in that case instead.
# `float()` keeps the division a true division on Python 2 as well.
mean_cars_waiting= (Q_stats.cars_waiting / float(Q_stats.count)
  if Q_stats.count else float('nan'))
mean_waiting_time= (W_stats.waiting_time / float(W_stats.count)
  if W_stats.count else float('nan'))

print("Mean number of cars waiting: %.3f" % mean_cars_waiting)

print("Mean waiting time (seconds): %.3f" % mean_waiting_time)

Solution

  • You need a master light controller to manage all your lights. I used the publish/subscribe pattern, where each lane subscribes to the light it is interested in with a callback function. The light uses these callback functions to publish when it turns on. I treat red and green lights as separate lights. The master light controller rotates through all the lights, calling each light's subscribers' callback functions when that light turns on.

    This models a crossroads. If you want to model a tee intersection, just comment out any one lane.

    """
        Simulation of a traffic intersection
        
        Has four main lanes of traffic: North, South, East, and West.
        For each direction there is also a turn lane: North left, etc.
    
        Assumes opposing lanes (such as North, and South) 
        will have the same light color
    
        Lights are managed by a light controller
    
        Lanes "subscribe" to the light color they are interested in
        with a callback function
        When the light color comes on, the light control calls the callback
        function of the lanes that subscribed to that light color
    
        When a lane has a green light, it sends cars through the intersection,
        otherwise arriving cars accumulate in a queue
    
        Programmer: Michael R. Gibbs
    """
    
    import simpy
    import random
    
    class LightControl():
        """
            Controls the color and timing of the lights

            Lanes register zero-argument callbacks in the public
            subscription lists (green_ns_subs, red_ns_subs, ...).  The
            controller cycles through four phases forever
            (ns left -> ns -> ew left -> ew) and, at each phase change,
            calls the red callbacks of the phase that ends and the green
            callbacks of the phase that begins.

            Parameters:
                env:          simulation environment; must provide `now`,
                              `timeout()` and `process()`
                ns_left_time: green duration of the north/south left turn
                ns_time:      green duration of north/south
                ew_left_time: green duration of the east/west left turn
                ew_time:      green duration of east/west
        """

        def __init__(self, env, ns_left_time=60, ns_time=180,
                     ew_left_time=60, ew_time=120):

            self.env = env

            # green duration of each phase (defaults are the original
            # hard-coded values)
            self.ns_left_time = ns_left_time
            self.ns_time = ns_time
            self.ew_left_time = ew_left_time
            self.ew_time = ew_time

            # subscription lists for each color and direction combo
            self.red_ns_subs = []
            self.red_ns_left_subs = []

            self.red_ew_subs = []
            self.red_ew_left_subs = []

            self.green_ns_subs = []
            self.green_ns_left_subs = []

            self.green_ew_subs = []
            self.green_ew_left_subs = []

            # start the lights
            self.env.process(self.run_lights())

        def _broadcast(self, subscribers):
            """
                utility to call the callback funcs for each subscriber
            """

            for callback in subscribers:
                callback()

        # old, misspelled name kept as an alias so existing callers still work
        _braodcast = _broadcast

        def run_lights(self):
            """
                controls the lights with a simple loop
                note different lights have different times

                generator meant to be run as a simulation process: yields
                one timeout per phase and never returns
            """

            while True:

                print(f'{self.env.now} ns left is green')
                self._broadcast(self.green_ns_left_subs)

                yield self.env.timeout(self.ns_left_time)

                self._broadcast(self.red_ns_left_subs)
                self._broadcast(self.green_ns_subs)
                print(f'{self.env.now} ns is green')

                yield self.env.timeout(self.ns_time)

                self._broadcast(self.red_ns_subs)
                self._broadcast(self.green_ew_left_subs)
                print(f'{self.env.now} ew left is green')

                yield self.env.timeout(self.ew_left_time)

                self._broadcast(self.red_ew_left_subs)
                self._broadcast(self.green_ew_subs)
                print(f'{self.env.now} ew is green')

                yield self.env.timeout(self.ew_time)

                self._broadcast(self.red_ew_subs)
    
    class Lane():
        """
            lane of traffic

            accumulates cars,
            waits for green light,
            sends cars through intersection, until red light

            note if a car is already in the intersection when
            the light turns red, it will finish its trip and
            continue through

            a car that arrives while the light is green and no car is
            crossing starts moving right away; at most one move_cars
            process is active per lane at any time
        """

        def __init__(self, name, env):

            self.name = name
            self.env = env
            self.lane_q = []
            self.green = False

            # True from the moment a move_cars process is scheduled until
            # it finishes; prevents starting a second, concurrent mover
            self._moving = False

        def add_car(self, car):
            """
            car arrives at lane

            if the light is green and no car is currently crossing,
            the car starts moving immediately instead of waiting for
            the next green light
            """

            self.lane_q.append(car)
            self._start_moving()

        def _start_moving(self):
            """
            starts the move_cars process when the light is green, cars
            are waiting and no move_cars process is already active
            """

            if self.green and self.lane_q and not self._moving:
                # set the flag now: the process body only starts running
                # later, and another call may arrive before that
                self._moving = True
                self.env.process(self.move_cars())

        def move_cars(self):
            """
            move cars through the intersection until light turns red

            generator meant to be run as a simulation process; each car
            takes 10 time units to cross
            """

            self._moving = True
            try:
                while self.green and self.lane_q:

                    self.lane_q.pop(0)
                    yield self.env.timeout(10)

                    print(f'{self.env.now} {self.name} sent car through')
            finally:
                # allow a later green light or arrival to restart the flow
                self._moving = False

        def green_light(self):
            """
            callback telling the lane that the light is green,
            starts moving cars through the intersection
            """

            self.green = True

            self._start_moving()

        def red_light(self):
            """
            callback telling the lane the light is red
            sets stop flag
            """

            self.green = False
    
    
    def gen_cars(env, lane, delay_func):
        """
        endless arrival process for one lane

        repeatedly waits for the time returned by delay_func()
        and then adds a new car (a plain object) to the lane
        """

        while True:
            wait_time = delay_func()
            yield env.timeout(wait_time)

            new_car = object()
            lane.add_car(new_car)
    
    # init sim
    env = simpy.Environment()
    lights = LightControl(env)


    def _make_lane(name, green_subs, red_subs):
        """
        creates a lane and subscribes its callbacks to the given
        green and red subscription lists
        """

        lane = Lane(name, env)
        green_subs.append(lane.green_light)
        red_subs.append(lane.red_light)
        return lane


    # make lanes and subscribe each one to its lights
    # (creation order fixes the order of the callbacks in each list)
    north = _make_lane(
        'North lane', lights.green_ns_subs, lights.red_ns_subs)
    north_left = _make_lane(
        'North left lane', lights.green_ns_left_subs, lights.red_ns_left_subs)
    south = _make_lane(
        'South lane', lights.green_ns_subs, lights.red_ns_subs)
    south_left = _make_lane(
        'South left lane', lights.green_ns_left_subs, lights.red_ns_left_subs)
    east = _make_lane(
        'East lane', lights.green_ew_subs, lights.red_ew_subs)
    east_left = _make_lane(
        'East left lane', lights.green_ew_left_subs, lights.red_ew_left_subs)
    west = _make_lane(
        'West lane', lights.green_ew_subs, lights.red_ew_subs)
    west_left = _make_lane(
        'West left lane', lights.green_ew_left_subs, lights.red_ew_left_subs)

    # interarrival time distribution for each lane, as the
    # (low, high, mode) arguments of random.triangular
    arrival_specs = [
        (north, (5, 15, 10)),
        (north_left, (10, 30, 20)),
        (south, (5, 15, 10)),
        (south_left, (12, 35, 20)),
        (east, (10, 40, 30)),
        (east_left, (15, 45, 30)),
        (west, (11, 45, 35)),
        (west_left, (15, 50, 40)),
    ]

    # generate cars for each lane
    for lane, (low, high, mode) in arrival_specs:
        # bind the loop values as defaults so each lane keeps its own numbers
        env.process(gen_cars(
            env,
            lane,
            lambda low=low, high=high, mode=mode:
                random.triangular(low, high, mode),
        ))

    env.run(1200)