Search code examples
pythonsimpy

Why does the timeout() in SimPy 4 wait indefinitely? Could you please tell me what's going wrong?


I would like to create a simple simulation of a port, where "process" represents the various events occurring during the circulation of containers at the port, and "store" is used to hold the various resources. First, I create a ship service process (there could be many ships involved), and then within this process, I create the various event processes required for a single ship service.

Each process event takes time to consume, but I'm wondering why event T0, after acquiring the needed resources, waits indefinitely at the timeout() point, and subsequently does not produce resources. This leads to other events also being stuck in an endless loop, requesting resources they cannot obtain, because every other event depends on the first event to produce resources first. This is a procedural issue.

If I comment out the timeout(), the code runs normally and prints out the processes of various events consuming and producing resources.

def transition_processes(env, pid, tid, place_store, PN_Matrix, myT_attribute, Place_List, Ships, transition_flag, StopFlag, process_state_store, ships_state_store):

    print(f"Transition {tid} in process {pid} is running.")

    temp = yield process_state_store.get(lambda x: x['pid'] == pid)
    temp['state'][tid] = 'waiting'
    yield process_state_store.put(temp)
    index = int(tid[1:])
    from_place = []
    to_place = []
    cnt = 0
    for arc in PN_Matrix[index]:
        int_arc = [0, 0]
        if arc[0] == 'X':
            int_arc[0] = Ships[pid]['X']
        elif arc[0] == '-X':
            int_arc[0] = -Ships[pid]['X']
        elif arc[0] == 'Y':
            int_arc[0] = Ships[pid]['Y']
        elif arc[0] == '-Y':
            int_arc[0] = -Ships[pid]['Y']
        else:
            int_arc[0] = int(arc[0])

        int_arc[1] = int(arc[1])

        if int_arc[0] < 0:
            # Record the position/index of Pj that the transition needs to consume, as well as the arc weight and type
            from_place.append([cnt, int_arc[0], int_arc[1]])
        elif int_arc[0] > 0:
            # Record the destination Pj position/index of the token produced by the transition, as well as the arc weight and type
            to_place.append([cnt, int_arc[0], int_arc[1]])
        cnt += 1

    while True:
        temp = yield process_state_store.get(lambda x: x['pid'] == pid)
        yield process_state_store.put(temp)
        if temp['state'].get(tid, 'not start') != 'over':

            fire = True
            from_place_tokens = []
            for i in from_place:
                temp = yield place_store.get(lambda x: x['Place_id'] == i[0])
                from_place_tokens.append(temp)

            # Read the global Place tokens information
            for i, j in zip(from_place, from_place_tokens):
                # Check if there are enough resources to consume
                if i[2] != 0 and j['Tokens'].get(pid + '_' + str(i[2]), 0) < -i[1]:
                    fire = False
                    break
                elif i[2] == 0 and j['Tokens'].get('pub', 0) < -i[1]:
                    fire = False
                    break
                elif i[2] == 0 and j['Tokens'].get('pub', 0) == 1:
                    # If there is only 1 resource left in this pubPlace, give priority to the smaller numbered t event in the PN process chart
                    for t in range(len(PN_Matrix)):
                        if PN_Matrix[t][i[0]][0] != '0' and t < int(tid[1:]):
                            fire = False
                            print(f'{pid} {tid} requests the last pub_token of {i[0]}th Place failed, no priority, continue to wait\n')
                            break
                if not fire:
                    break

            # Execute the event
            if fire:
                # Consume resources
                for ind, i in enumerate(from_place):
                    if i[2] == 0:
                        from_place_tokens[ind]['Tokens']['pub'] += i[1]
                        print(f'{pid} {tid} consumes {-i[1]} class {i[2]} resources from {i[0]}th Place')
                    else:
                        from_place_tokens[ind]['Tokens'][pid + '_' + str(i[2])] = from_place_tokens[ind]['Tokens'].get(pid + '_' + str(i[2]), 0) + i[1]
                        print(f'{pid} {tid} consumes {-i[1]} class {i[2]} resources from {i[0]}th Place')

                # Simulate time consumption
                try:
                    print(f'{pid} {tid} timeout start, now:{env.now}')
                    yield env.timeout(int(myT_attribute[0]))
                    print(f'{pid} {tid} has finished the timeout')
                except Exception as e:
                    print(f'An error occurred: {e}')

                # Release all locks
                for i in from_place_tokens:
                    yield place_store.put(i)

                # Produce tokens
                for i in to_place:
                    if i[2] == 0:  # Public resources
                        temp = yield place_store.get(lambda x: x['Place_id'] == i[0])
                        temp['Tokens']['pub'] += i[1]
                        yield place_store.put(temp)
                        print(f'{pid} {tid} produces {i[1]} class {i[2]} resources to {i[0]}th Place')

                    else:  # Non-public resources with arc type
                        temp = yield place_store.get(lambda x: x['Place_id'] == i[0])
                        temp['Tokens'][pid + '_' + str(i[2])] = temp['Tokens'].get(pid + '_' + str(i[2]), 0) + i[1]
                        yield place_store.put(temp)
                        print(f'{pid} {tid} produces {i[1]} class {i[2]} resources to {i[0]}th Place')

                if tid in transition_flag:  # Record the moment for transitions that need moment recording
                    temp = yield process_state_store.get(lambda x: x['pid'] == pid)
                    yield process_state_store.put(temp)
                    if temp['state'][tid] != 'over':

                        val = transition_flag[tid]
                        temp = yield ships_state_store.get(lambda x: x['pid'] == pid)
                        temp['state'][val] = datetime.fromtimestamp(env.now).strftime('%Y-%m-%d %H:%M:%S')
                        yield ships_state_store.put(temp)

                        # If all are recorded, they can be put into the output queue
                        time_over_ship = True
                        try:
                            temp = yield ships_state_store.get(lambda x: x['pid'] == pid)
                            yield ships_state_store.put(temp)

                            for k, v in temp['state'].items():
                                if v == 'null':
                                    time_over_ship = False
                                    break
                            if time_over_ship:
                                yield over_ships_queue.put(temp)  # Put into queue
                                print(f'{pid} enqueued')
                        except Exception as e:
                            print(f'An error occurred: {e} \nRecording moments for certain transitions')

                # After the event is executed, it defaults back to the waiting state, special T becomes over
                if myT_attribute[1] == 1:
                    temp = yield process_state_store.get(lambda x: x['pid'] == pid)
                    temp['state'][tid] = 'over'
                    print(f'{pid} {tid} over')
                    yield process_state_store.put(temp)
                else:
                    temp = yield process_state_store.get(lambda x: x['pid'] == pid)
                    temp['state'][tid] = 'waiting'
                    yield process_state_store.put(temp)

                # Check after executing an event
                time_to_stop = True
                temp = yield process_state_store.get(lambda x: x['pid'] == pid)
                yield process_state_store.put(temp)

                for k, v in StopFlag.items():
                    if temp['state'].get(k, 'not start') != v:
                        time_to_stop = False
                        break

                if time_to_stop:
                    temp = yield process_state_store.get(lambda x: x['pid'] == pid)

                    for k, v in temp['state'].items():
                        temp['state'][k] = 'over'
                    yield process_state_store.put(temp)

                    print(f'{pid} shut down')

            # Release all locks
            else:
                for i in from_place_tokens:
                    yield place_store.put(i)

        else:
            break


def ship_processes(env, pid, place_store, PN_Matrix, TransitionAttribute, Place_List, Ships, transition_flag, StopFlag, process_state_store, ships_state_store, init_done_event):
    yield init_done_event
    item = {'pid': pid, 'state': {}}
    yield process_state_store.put(item)
    item = {'pid': pid, 'state': {}}
    yield ships_state_store.put(item)
    for k, v in transition_flag.items():
        temp = yield ships_state_store.get(lambda x: x['pid'] == pid)
        temp['state'][v] = 'null'
        yield ships_state_store.put(temp)
    for i in range(len(TransitionAttribute)):
        tid = f'T{i}'
        env.process(transition_processes(env, pid, tid, place_store, PN_Matrix, TransitionAttribute[i], Place_List, Ships, transition_flag, StopFlag, process_state_store, ships_state_store))

all code: It's a bit ugly, sorry, I'm a newbie.

def read_xlsx():  # Read the file to get the initial input data structure
    df_pp = pd.read_excel("./input_xlsx/PortPetriNet.xlsx")
    df_ta = pd.read_excel("./input_xlsx/TransitionAttribute.xlsx")
    df_pa = pd.read_excel("./input_xlsx/PlaceAttribute.xlsx")
    df_s = pd.read_excel("./input_xlsx/Ships.xlsx", index_col='Ship Number')
    df_tf = pd.read_excel("./input_xlsx/TransitionFlag.xlsx")
    df_sf = pd.read_excel("./input_xlsx/StopFlag.xlsx")

    # Delete the first column (column label)
    df_pp = df_pp.iloc[:, 1:]
    # Initialize a two-dimensional array
    pn_matrix = []
    # Iterate through each row of the DataFrame, set the elements in the list of each link in the matrix to str type for subsequent judgment of ±X/Y
    for index, row in df_pp.iterrows():
        # Initialize the list for the current row
        row_list = []
        # Iterate through each cell
        for item in row:
            # If the cell is not NaN (i.e., not empty)
            if pd.notnull(item):
                # Assuming the cell content is a string, split it into a list
                # Note: Here it is assumed that the cell content is already in string format '[X,1]', if not, it needs to be converted to a string str(item)
                split_items = str(item).strip('[]').split(',')
                # Add the split list to the current row list
                row_list.append(split_items)
            else:
                # If the cell is empty, add ['0', '0']
                row_list.append(['0', '0'])
        # Add the current row list to the two-dimensional array
        pn_matrix.append(row_list)
    # The final data type of pn_matrix is: [[str, str], ......]

    transition_attribute_values = df_ta.iloc[0]
    # Initialize the final list
    transition_attribute = []
    # Iterate through each element in the row (skipping the first one, as it is the name of the row)
    for item in transition_attribute_values:
        # Convert a list in string form (like '[600,1]') into a list of integers
        int_list = [int(x) for x in item.strip('[]').split(',')]
        # Add the list of integers to the final list
        transition_attribute.append(int_list)
    # The final type of transition_attribute is: [[int, int], ...]

    df_s.rename(columns={'Import Containers X': 'X', 'Export Containers Y': 'Y'}, inplace=True)
    df_s.index = df_s.index.astype(str)  # Convert the key part to a string
    ships = df_s.to_dict(orient='index')
    # The final type of ships: {str: {'X': int, 'Y': int}, ......}

    '''
    In Python 3.7 and above, dictionaries are iterated in insertion order. This is because from Python 3.7 onwards, dictionaries have been officially declared as ordered, meaning when you iterate through a dictionary, the key-value pairs are returned in the order they were added to the dictionary.
    In Python 3.6, this feature was introduced as a detail of the dictionary implementation and was guaranteed in CPython (the default implementation of Python), but it was not formally included in the language specification until version 3.7.
    In Python 3.5 and below, dictionaries are unordered, and if you need an ordered data structure, you should use collections.OrderedDict.
    '''

    # Initialize the result list
    place_attribute = []
    # Iterate through each row of the DataFrame
    for index, row in df_pa.iterrows():
        # Iterate through each cell
        for item in row:
            # Check if the cell is empty
            if not pd.isnull(item):
                # Otherwise, safely parse the string into a dictionary using ast.literal_eval
                place_attribute.append(ast.literal_eval(item))
    # The final type of place_attribute: [{str:int}, ......]

    # Convert the DataFrame to a dictionary, where the first column is the value and the second column is the key, note that this is the opposite of the stop_flag below
    transition_flag = pd.Series(df_tf.iloc[:, 0].values, index=df_tf.iloc[:, 1]).to_dict()
    # Data type: {str:str, ......}
    # Convert the DataFrame to a dictionary, where the first column is the value and the second column is the key
    stop_flag = pd.Series(df_sf.iloc[:, 1].values, index=df_sf.iloc[:, 0]).to_dict()

    # Check for consistency in the dimensions of the data structures
    if len(pn_matrix) != len(transition_attribute):
        print('The number of rows in the PN matrix does not match the number of transition_attribute, please check.\n')
    elif len(pn_matrix[0]) != len(place_attribute):
        print('The number of columns in the PN matrix does not match the number of place_attribute, please check.\n')
    else:
        print(f'PetriNet Matrix ({len(pn_matrix)} rows by {len(pn_matrix[0])} columns):')
        for row in pn_matrix:
            print(row)
        print(f'\nEach Transition duration and its over condition ({len(transition_attribute)} items):\n', transition_attribute)
        print(f'\nEach Place attributes and initial token quantities ({len(place_attribute)} items):\n', place_attribute)
        print(f'\nShip requests summary ({len(ships)} ships) (Ship Number: Import, Export):\n', ships)
        print('\nTransitions that require moment recording:\n', transition_flag)
        print(f'\nFlags marking the end of service for a ship:\n{stop_flag}\n')

        return pn_matrix, transition_attribute, place_attribute, ships, transition_flag, stop_flag


def initial_time():  # Set the simulation start time and time speed factor
    while True:
        time_str = input("Enter the simulation start time, format example (2023-12-01 14:30:00):\n")
        try:
            initial_start_time = datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S').timestamp()
            print("The entered date and time is: ", datetime.fromtimestamp(initial_start_time).strftime('%Y-%m-%d %H:%M:%S'))
            break
        except ValueError:
            print("The format of the entered date and time is incorrect. Please enter in the format '2023-12-01 14:30:00'.\n")
    while True:
        speed_factor = float(input("Enter the time speed factor (positive real number)\n"))
        if speed_factor >= 0:
            print("The entered time speed factor is: ", speed_factor)
            break
        else:
            print("Incorrect format, please enter a positive real number.\n")
    return initial_start_time, speed_factor


def initialize_places(env, place_list, place_store, init_done_event):
    """Add initial resources"""
    for i, tokens in enumerate(place_list):  # place_attribute type: [{str:int}, ......]
        item = {'Place_id': i, 'Tokens': tokens}  # i is of int type
        yield place_store.put(item)
    # Signal that initialization is complete
    #print('141ok')
    init_done_event.succeed()


def transition_processes(env, pid, tid, place_store, PN_Matrix, myT_attribute, Place_List, Ships, transition_flag, StopFlag, process_state_store, ships_state_store):

    print(f"Transition {tid} in process {pid} is running.")

    temp = yield process_state_store.get(lambda x: x['pid'] == pid)
    temp['state'][tid] = 'waiting'
    yield process_state_store.put(temp)
    index = int(tid[1:])
    from_place = []
    to_place = []
    cnt = 0
    for arc in PN_Matrix[index]:
        int_arc = [0, 0]
        if arc[0] == 'X':
            int_arc[0] = Ships[pid]['X']
        elif arc[0] == '-X':
            int_arc[0] = -Ships[pid]['X']
        elif arc[0] == 'Y':
            int_arc[0] = Ships[pid]['Y']
        elif arc[0] == '-Y':
            int_arc[0] = -Ships[pid]['Y']
        else:
            int_arc[0] = int(arc[0])

        int_arc[1] = int(arc[1])

        if int_arc[0] < 0:
            # Record the position/index of Pj that the transition needs to consume, as well as the arc weight and type
            from_place.append([cnt, int_arc[0], int_arc[1]])
        elif int_arc[0] > 0:
            # Record the destination Pj position/index of the token produced by the transition, as well as the arc weight and type
            to_place.append([cnt, int_arc[0], int_arc[1]])
        cnt += 1

    while True:
        temp = yield process_state_store.get(lambda x: x['pid'] == pid)
        yield process_state_store.put(temp)
        if temp['state'].get(tid, 'not start') != 'over':

            fire = True
            from_place_tokens = []
            for i in from_place:
                temp = yield place_store.get(lambda x: x['Place_id'] == i[0])
                from_place_tokens.append(temp)

            # Read the global Place tokens information
            for i, j in zip(from_place, from_place_tokens):
                # Check if there are enough resources to consume
                if i[2] != 0 and j['Tokens'].get(pid + '_' + str(i[2]), 0) < -i[1]:
                    fire = False
                    break
                elif i[2] == 0 and j['Tokens'].get('pub', 0) < -i[1]:
                    fire = False
                    break
                elif i[2] == 0 and j['Tokens'].get('pub', 0) == 1:
                    # If there is only 1 resource left in this pubPlace, give priority to the smaller numbered t event in the PN process chart
                    for t in range(len(PN_Matrix)):
                        if PN_Matrix[t][i[0]][0] != '0' and t < int(tid[1:]):
                            fire = False
                            print(f'{pid} {tid} requests the last pub_token of {i[0]}th Place failed, no priority, continue to wait\n')
                            break
                if not fire:
                    break

            # Execute the event
            if fire:
                # Consume resources
                for ind, i in enumerate(from_place):
                    if i[2] == 0:
                        from_place_tokens[ind]['Tokens']['pub'] += i[1]
                        print(f'{pid} {tid} consumes {-i[1]} class {i[2]} resources from {i[0]}th Place')
                    else:
                        from_place_tokens[ind]['Tokens'][pid + '_' + str(i[2])] = from_place_tokens[ind]['Tokens'].get(pid + '_' + str(i[2]), 0) + i[1]
                        print(f'{pid} {tid} consumes {-i[1]} class {i[2]} resources from {i[0]}th Place')

                # Simulate time consumption
                try:
                    print(f'{pid} {tid} timeout start, now:{env.now}')
                    yield env.timeout(int(myT_attribute[0]))
                    print(f'{pid} {tid} has finished the timeout')
                except Exception as e:
                    print(f'An error occurred: {e}')

                # Release all locks
                for i in from_place_tokens:
                    yield place_store.put(i)

                # Produce tokens
                for i in to_place:
                    if i[2] == 0:  # Public resources
                        temp = yield place_store.get(lambda x: x['Place_id'] == i[0])
                        temp['Tokens']['pub'] += i[1]
                        yield place_store.put(temp)
                        print(f'{pid} {tid} produces {i[1]} class {i[2]} resources to {i[0]}th Place')

                    else:  # Non-public resources with arc type
                        temp = yield place_store.get(lambda x: x['Place_id'] == i[0])
                        temp['Tokens'][pid + '_' + str(i[2])] = temp['Tokens'].get(pid + '_' + str(i[2]), 0) + i[1]
                        yield place_store.put(temp)
                        print(f'{pid} {tid} produces {i[1]} class {i[2]} resources to {i[0]}th Place')

                if tid in transition_flag:  # Record the moment for transitions that need moment recording
                    temp = yield process_state_store.get(lambda x: x['pid'] == pid)
                    yield process_state_store.put(temp)
                    if temp['state'][tid] != 'over':

                        val = transition_flag[tid]
                        temp = yield ships_state_store.get(lambda x: x['pid'] == pid)
                        temp['state'][val] = datetime.fromtimestamp(env.now).strftime('%Y-%m-%d %H:%M:%S')
                        yield ships_state_store.put(temp)

                        # If all are recorded, they can be put into the output queue
                        time_over_ship = True
                        try:
                            temp = yield ships_state_store.get(lambda x: x['pid'] == pid)
                            yield ships_state_store.put(temp)

                            for k, v in temp['state'].items():
                                if v == 'null':
                                    time_over_ship = False
                                    break
                            if time_over_ship:
                                yield over_ships_queue.put(temp)  # Put into queue
                                print(f'{pid} enqueued')
                        except Exception as e:
                            print(f'An error occurred: {e} \nRecording moments for certain transitions')

                # After the event is executed, it defaults back to the waiting state, special T becomes over
                if myT_attribute[1] == 1:
                    temp = yield process_state_store.get(lambda x: x['pid'] == pid)
                    temp['state'][tid] = 'over'
                    print(f'{pid} {tid} over')
                    yield process_state_store.put(temp)
                else:
                    temp = yield process_state_store.get(lambda x: x['pid'] == pid)
                    temp['state'][tid] = 'waiting'
                    yield process_state_store.put(temp)

                # Check after executing an event
                time_to_stop = True
                temp = yield process_state_store.get(lambda x: x['pid'] == pid)
                yield process_state_store.put(temp)

                for k, v in StopFlag.items():
                    if temp['state'].get(k, 'not start') != v:
                        time_to_stop = False
                        break

                if time_to_stop:
                    temp = yield process_state_store.get(lambda x: x['pid'] == pid)

                    for k, v in temp['state'].items():
                        temp['state'][k] = 'over'
                    yield process_state_store.put(temp)

                    print(f'{pid} shut down')

            # Release all locks
            else:
                for i in from_place_tokens:
                    yield place_store.put(i)

        else:
            break


def ship_processes(env, pid, place_store, PN_Matrix, TransitionAttribute, Place_List, Ships, transition_flag, StopFlag, process_state_store, ships_state_store, init_done_event):
    yield init_done_event
    item = {'pid': pid, 'state': {}}
    yield process_state_store.put(item)
    item = {'pid': pid, 'state': {}}
    yield ships_state_store.put(item)
    for k, v in transition_flag.items():
        temp = yield ships_state_store.get(lambda x: x['pid'] == pid)
        temp['state'][v] = 'null'
        yield ships_state_store.put(temp)
    for i in range(len(TransitionAttribute)):
        tid = f'T{i}'
        env.process(transition_processes(env, pid, tid, place_store, PN_Matrix, TransitionAttribute[i], Place_List, Ships, transition_flag, StopFlag, process_state_store, ships_state_store))

if __name__ == '__main__':
    # Read the file to obtain the initial matrix
    PN_Matrix, TransitionAttribute, Place_List, Ships, TransitionFlag, StopFlag = read_xlsx()
    # Set the simulation start time and time speed factor
    start_time, speed_factor = initial_time()
    # Initialize the simulation environment (commented out line allows setting a start time)
    #env = simpy.Environment(initial_time=start_time)
    env = simpy.Environment()
    # Place_List needs to be set as a shared resource; the rest are read-only, so pass them directly
    # Initialize the state of each Place resource
    place_store = simpy.FilterStore(env)
    init_done_event = env.event()
    env.process(initialize_places(env, Place_List, place_store, init_done_event))

    # GUI process is responsible for displaying output in real-time; the program will output the following:
    # process_state = {}  # Output the status of various processes {pid1: {tid1: ready/working/waiting, tid2...}, pid2: {...}...}
    process_state_store = simpy.FilterStore(env)
    # ships_state = {}  # Save the state of ship service processes {pid1: {application time, entry time, departure time}, pid2: {...}...}
    ships_state_store = simpy.FilterStore(env)
    over_ships_queue = simpy.Store(env)  # Once over_ship has a departure time, put it in the store for GUI output (using the store as a queue)

    place_store_plt_data = {}  # {place_1: [save a sum_tokens for each time period (index)], place_2: [], ....}
    process_state_store_plt_data = {}   # {pid1: {tid1: [save the status t for each time period (index)]}, pidn: {tidn: []...}, .... }


    for k, v in Ships.items():
        pid = k
        env.process(ship_processes(env, pid, place_store, PN_Matrix, TransitionAttribute, Place_List, Ships, TransitionFlag, StopFlag, process_state_store, ships_state_store, init_done_event))

    env.run()



Solution

  • I think I know why now. When determining whether a process can execute, if the evaluation fails, it will return to continue checking the execution conditions. After a failed judgment, we need to implement a timeout. Otherwise, if the event cannot proceed, it will endlessly loop through the judgment uninterrupted, preventing other events from proceeding.

    After adding this timeout, at least the code is running smoothly, and it more closely reflects the actual situation.

        while True:
            ......
            ......
            # Execute the event
            if fire:
               ......
               yield env.timeout(...)
               ......
    
            # Release all locks
            else:
                ......
                yield env.timeout(...)