Search code examples
pythonmachine-learningjupyter-notebooksimulationreinforcement-learning

Why I am getting AttributeError: 'process_simulation' object has no attribute 'reset'?


I have a class process_simulation. In that, I have a method sample. I am trying to run it from within the same object context. Please have a look at the code:

import simpy
import random

class process_simulation():
    
    def __init__(self, simulation_duration=60, time_step=1, setup_time=0, reward_value=0):
        
        self.simulation_duration = simulation_duration
        self.time_step = time_step
        self.next_time_stop = 0
        
        self.setup_time = setup_time
        self.buffer_capacity = 99999
        self.initial_buffer = 0
        self.initial_reward = reward_value
        
        self.process_time_of_product_a = [5,4,3,3,1]
        self.process_time_of_product_b = [1,2,3,4,5]
        self.process_time_of_product_c = [3,5,1,4,2]
        
        self.default_standard_deviation = .0
        
        
        self.fifo_list = []
        
        
        self.time_to_get_raw_material = 1
        self.cost_for_machining = 0
        self.cost_for_storage = 1
        self.cost_for_raw_material = 50
        
        
        self.finished_jobs_machine1 = 0
        self.finished_jobs_machine2 = 0
        self.finished_jobs_machine3 = 0
        self.finished_jobs_machine4 = 0
        self.finished_jobs_machine5 = 0
        
        
        self.consecutive_production_of_product_a = 0
        self.consecutive_production_of_product_b = 0
        self.consecutive_production_of_product_c = 0
        self.consecutive_switches_to_product_a = 0
        self.consecutive_switches_to_product_b = 0
        self.consecutive_switches_to_product_c = 0
        self.consecutive_idle_states = 0
        
        self.actions = [0,1,2,3]
        self.reward = self.initial_reward
        self.state = dict()
        
        self.observation_size = 3
        self.action_size = len(self.actions)
        
        
    def _get_next_job(self,finished_jobs):
        return self.fifo_list[finished_jobs]
    
    def _apply_variability(self,process_time):
        return random.gauss(process_time,process_time*self.default_standard_deviation)
    
            
        # RF1: Simply return a default value for finished goods
        # return 300 

        # RF2: Value mix of products in terms of the over-all reward
        #'''
        # Case 1: selection of product type A for manufacturing
    
    def _calculate_sales_value(self,product_type):
        
        if product_type=='A':
            self.consecutive_production_of_product_a += 1
            self.consecutive_production_of_product_b = 0
            self.consecutive_production_of_product_c = 0
            if self.consecutive_production_of_product_a ==1:
                self.consecutive_switches_to_product_a +=1
                return min(400, 200*(1.5**(self.consecutive_switches_to_product_a)))
            else:
                self.consecutive_switches_to_product_a = 0
                return max (100, 200*(.75**(self.consecutive_switches_to_product_a-2)))
            
        elif product_type=='B':
            self.consecutive_production_of_product_b += 1
            self.consecutive_production_of_product_a = 0
            self.consecutive_production_of_product_c = 0
            if self.consecutive_production_of_product_b ==1:
                self.consecutive_switches_to_product_b +=1
                return min(400, 200*(1.5**(self.consecutive_switches_to_product_b)))
            else:
                self.consecutive_switches_to_product_a = 0
                return max (100, 200*(.75**(self.consecutive_switches_to_product_b-1)))
            
        elif product_type=='C':
            self.consecutive_production_of_product_c += 1
            self.consecutive_production_of_product_a = 0
            self.consecutive_production_of_product_b = 0
            if self.consecutive_production_of_product_c ==1:
                self.consecutive_switches_to_product_c +=1
                return min(400, 200*(1.5**(self.consecutive_switches_to_product_c)))
            else:
                self.consecutive_switches_to_product_a = 0
                return max (100, 200*(.75**(self.consecutive_switches_to_product_c)))
        
        
        else:
            raise valueerror('{} --> requested type is not in the list of allowed product types [A, B , C]'.format(product_type))
            
            
    def _run_machine_1(self):
        while True:
            jobs = len(self.fifo_list)
            if (jobs==0) or (jobs==self.finished_jobs_machine1):
                yield self.env.timeout(self.time_step)
                
            else:
                next_job = self._get_next_job(self.finished_jobs_machine1)
                if job==1:
                    yield self.env.timeout(self.setup_time)
                elif self.fifo_list[self.finished_jobs_machine1-1] != next_job:
                    yield self.timeout(self.setup_time)
                
                if next_job=='A':
                    yield self.env.timeout(self.time_to_get_raw_material)
                    yield self.env.raw_material_of_product_a.get(1)
                    self.reward -= self.cost_for_machining*self.process_time_of_product_a[0]
                    yield self.env.timeout(self._apply_variability(self.process_time_of_product_a[0]))
                    yield self.env.raw_material_machine1_product_a.put(1)
                    self.finished_jobs_machine1 +=1
                    
                elif next_job=='B':
                    yield self.env.timeout(self.time_to_get_raw_material)
                    yield self.env.raw_material_of_product_b.get(1)
                    self.reward -= self.cost_for_machining*self.process_time_of_product_b[0]
                    yield self.env.timeout(self._apply_variability(self.process_time_of_product_b[0]))
                    yield self.env.raw_material_machine1_product_b.put(1)
                    self.finished_jobs_machine1 +=1
                    
                elif next_job=='C':
                    yield self.env.timeout(self.time_to_get_raw_material)
                    yield self.env.raw_material_of_product_c.get(1)
                    self.reward -= self.cost_for_machining*self.process_time_of_product_c[0]
                    yield self.env.timeout(self._apply_variability(self.process_time_of_product_c[0]))
                    yield self.env.raw_material_machine1_product_c.put(1)
                    self.finished_jobs_machine1 +=1
                    
                else:
                    raise valueerror('{} --> requested type is not in the list of allowed product types [A, B, C]'.format(product_type))
                    
    def _run_machine_2(self):
        while True:
            jobs = len(self.fifo_list)
            if (jobs==0) or (jobs==self.finished_jobs_machine2):
                yield self.env.timeout(self.time_step)
                
            else:
                next_job = self._get_next_job(self.finished_jobs_machine2)
                if job==1:
                    yield self.env.timeout(self.setup_time)
                elif self.fifo_list[self.finished_jobs_machine2-1] != next_job:
                    yield self.timeout(self.setup_time)
                
                if next_job=='A':
                    yield self.env.timeout(self.time_to_get_raw_material)
                    yield self.env.raw_material_machine1_product_a
                    self.reward -= self.cost_for_machining*self.process_time_of_product_a[0]
                    yield self.env.timeout(self._apply_variability(self.process_time_of_product_a[0]))
                    yield self.env.raw_material_machine2_product_a.put(1)
                    self.finished_jobs_machine2 +=1
                    
                elif next_job=='B':
                    yield self.env.timeout(self.time_to_get_raw_material)
                    yield self.env.raw_material_machine1_product_b.get(1)
                    self.reward -= self.cost_for_machining*self.process_time_of_product_b[0]
                    yield self.env.timeout(self._apply_variability(self.process_time_of_product_b[0]))
                    yield self.env.raw_material_machine2_product_b.put(1)
                    self.finished_jobs_machine2 +=1
                    
                elif next_job=='C':
                    yield self.env.timeout(self.time_to_get_raw_material)
                    yield self.env.raw_material_machine1_product_c.get(1)
                    self.reward -= self.cost_for_machining*self.process_time_of_product_c[0]
                    yield self.env.timeout(self._apply_variability(self.process_time_of_product_c[0]))
                    yield self.env.raw_material_machine2_product_c.put(1)
                    self.finished_jobs_machine2 +=1
                    
                else:
                    raise valueerror('{} --> requested type is not in the list of allowed product types [A, B, C]'.format(product_type))
                    
        
    def _run_machine_3(self):
        while True:
            jobs = len(self.fifo_list)
            if (jobs==0) or (jobs==self.finished_jobs_machine3):
                yield self.env.timeout(self.time_step)
                
            else:
                next_job = self._get_next_job(self.finished_jobs_machine3)
                if job==1:
                    yield self.env.timeout(self.setup_time)
                elif self.fifo_list[self.finished_jobs_machine3-1] != next_job:
                    yield self.timeout(self.setup_time)
                
                if next_job=='A':
                    yield self.env.timeout(self.time_to_get_raw_material)
                    yield self.env.raw_material_machine2_product_a.get(1)
                    self.reward -= self.cost_for_machining*self.process_time_of_product_a[0]
                    yield self.env.timeout(self._apply_variability(self.process_time_of_product_a[0]))
                    yield self.env.raw_material_machine3_product_a.put(1)
                    self.finished_jobs_machine3 +=1
                    
                elif next_job=='B':
                    yield self.env.timeout(self.time_to_get_raw_material)
                    yield self.env.raw_material_machine2_product_b.get(1)
                    self.reward -= self.cost_for_machining*self.process_time_of_product_b[0]
                    yield self.env.timeout(self._apply_variability(self.process_time_of_product_b[0]))
                    yield self.env.raw_material_machine3_product_b.put(1)
                    self.finished_jobs_machine3 +=1
                    
                elif next_job=='C':
                    yield self.env.timeout(self.time_to_get_raw_material)
                    yield self.env.raw_material_machine2_product_c.get(1)
                    self.reward -= self.cost_for_machining*self.process_time_of_product_c[0]
                    yield self.env.timeout(self._apply_variability(self.process_time_of_product_c[0]))
                    yield self.env.raw_material_machine3_product_c.put(1)
                    self.finished_jobs_machine3 +=1
                    
                else:
                    raise valueerror('{} --> requested type is not in the list of allowed product types [A, B, C]'.format(product_type))
        
        
    def _run_machine_4(self):
        while True:
            jobs = len(self.fifo_list)
            if (jobs==0) or (jobs==self.finished_jobs_machine4):
                yield self.env.timeout(self.time_step)
                
            else:
                next_job = self._get_next_job(self.finished_jobs_machine4)
                if job==1:
                    yield self.env.timeout(self.setup_time)
                elif self.fifo_list[self.finished_jobs_machine4-1] != next_job:
                    yield self.timeout(self.setup_time)
                
                if next_job=='A':
                    yield self.env.timeout(self.time_to_get_raw_material)
                    yield self.env.raw_material_machine3_product_a.get(1)
                    self.reward -= self.cost_for_machining*self.process_time_of_product_a[0]
                    yield self.env.timeout(self._apply_variability(self.process_time_of_product_a[0]))
                    yield self.env.raw_material_machine4_product_a.put(1)
                    self.finished_jobs_machine4 +=1
                    
                elif next_job=='B':
                    yield self.env.timeout(self.time_to_get_raw_material)
                    yield self.env.raw_material_machine3_product_b.get(1)
                    self.reward -= self.cost_for_machining*self.process_time_of_product_b[0]
                    yield self.env.timeout(self._apply_variability(self.process_time_of_product_b[0]))
                    yield self.env.raw_material_machine4_product_b.put(1)
                    self.finished_jobs_machine4 +=1
                    
                elif next_job=='C':
                    yield self.env.timeout(self.time_to_get_raw_material)
                    yield self.env.raw_material_machine3_product_c.get(1)
                    self.reward -= self.cost_for_machining*self.process_time_of_product_c[0]
                    yield self.env.timeout(self._apply_variability(self.process_time_of_product_c[0]))
                    yield self.env.raw_material_machine4_product_c.put(1)
                    self.finished_jobs_machine4 +=1
                    
                else:
                    raise valueerror('{} --> requested type is not in the list of allowed product types [A, B, C]'.format(product_type))
                    
                    
    def _run_machine_5(self):
        while True:
            jobs = len(self.fifo_list)
            if (jobs==0) or (jobs==self.finished_jobs_machine5):
                yield self.env.timeout(self.time_step)
                
            else:
                next_job = self._get_next_job(self.finished_jobs_machine5)
                if job==1:
                    yield self.env.timeout(self.setup_time)
                elif self.fifo_list[self.finished_jobs_machine5-1] != next_job:
                    yield self.timeout(self.setup_time)
                
                if next_job=='A':
                    yield self.env.timeout(self.time_to_get_raw_material)
                    yield self.env.raw_material_machine4_product_a.get(1)
                    self.reward -= self.cost_for_machining*self.process_time_of_product_a[0]
                    yield self.env.timeout(self._apply_variability(self.process_time_of_product_a[0]))
                    yield self.env.raw_material_machine5_product_a.put(1)
                    self.finished_jobs_machine5 +=1
                    
                elif next_job=='B':
                    yield self.env.timeout(self.time_to_get_raw_material)
                    yield self.env.raw_material_machine4_product_b.get(1)
                    self.reward -= self.cost_for_machining*self.process_time_of_product_b[0]
                    yield self.env.timeout(self._apply_variability(self.process_time_of_product_b[0]))
                    yield self.env.raw_material_machine5_product_b.put(1)
                    self.finished_jobs_machine5 +=1
                    
                elif next_job=='C':
                    yield self.env.timeout(self.time_to_get_raw_material)
                    yield self.env.raw_material_machine4_product_c.get(1)
                    self.reward -= self.cost_for_machining*self.process_time_of_product_c[0]
                    yield self.env.timeout(self._apply_variability(self.process_time_of_product_c[0]))
                    yield self.env.raw_material_machine5_product_c.put(1)
                    self.finished_jobs_machine5 +=1
                    
                else:
                    raise valueerror('{} --> requested type is not in the list of allowed product types [A, B, C]'.format(product_type))
 
    
 
    def _get_buffer_level(self):
        return[self.env.raw_material_of_product_a.level,self.env.raw_material_of_product_b.level,self.env.raw_material_of_product_c.level,
               self.env.raw_material_machine1_product_a.level,self.env.raw_material_machine1_product_b.level,self.env.raw_material_machine1_product_c.level,
               self.env.raw_material_machine2_product_a.level,self.env.raw_material_machine2_product_b.level,self.env.raw_material_machine2_product_c.level,
               self.env.raw_material_machine3_product_a.level,self.env.raw_material_machine3_product_b.level,self.env.raw_material_machine3_product_c.level,
               self.env.raw_material_machine4_product_a.level,self.env.raw_material_machine4_product_b.level,self.env.raw_material_machine4_product_c.level,
               self.env.raw_material_machine5_product_a.level,self.env.raw_material_machine5_product_b.level,self.env.raw_material_machine5_product_c.level]
    
    def _get_observations_of_the_buffer_level(self):
        observations = self._get_buffer_level()
        return observations[:3]
    
    
    def _islegal(self,action):
        if action not in self.action:
            raise ValueError('{} --> requested action not in list of allowed actions [0, 1, 2, 3'.format(action))
            
    
    def _release_raw_material(self,action):
        if action==0:
            yield self.env.raw_material_of_product_a.put(1)
            self.reward -= self.cost_for_raw_material
            
        elif action==1:
            yield self.env.raw_material_of_product_b.put(1)
            self.reward -= self.cost_for_raw_material
            
        elif action==2:
            yield self.env.raw_material_of_product_c.put(1)
            self.reward -= self.cost_for_raw_material
            
        else:
            pass
        
        
    def _perform_action(self,action):
        if action == 0:
            self.fifo_list +=['A']
            self.env.process(self._release_raw_material(action))
        
        elif action == 1:
            self.fifo_list +=['B']
            self.env.process(self._release_raw_material(action))
            
        elif action == 2:
            self.fifo_list +=['C']
            self.env.process(self._release_raw_material(action))
            
        elif action ==3:
            self.reward -= 0
                    
            
            
    
    
    def _reset_simulation(self):
        self.env = simpy.Environment()
        self.env.raw_material_of_product_a = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
        self.env.raw_material_of_product_b = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
        self.env.raw_material_of_product_c = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
        self.env.raw_material_machine1_product_a = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
        self.env.raw_material_machine1_product_b = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
        self.env.raw_material_machine1_product_c = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
        self.env.raw_material_machine2_product_a = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
        self.env.raw_material_machine2_product_b = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
        self.env.raw_material_machine2_product_c = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
        self.env.raw_material_machine3_product_a = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
        self.env.raw_material_machine3_product_b = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
        self.env.raw_material_machine3_product_c = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
        self.env.raw_material_machine4_product_a = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
        self.env.raw_material_machine4_product_b = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
        self.env.raw_material_machine4_product_c = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
        self.env.raw_material_machine5_product_a = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
        self.env.raw_material_machine5_product_b = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
        self.env.raw_material_machine5_product_c = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
        
        
        self.finished_jobs_machine1=0
        self.finished_jobs_machine2=0
        self.finished_jobs_machine3=0
        self.finished_jobs_machine4=0
        self.finished_jobs_machine5=0
        
        self.consecutive_production_of_product_a=0
        self.consecutive_production_of_product_b=0
        self.consecutive_production_of_product_c=0
        self.consecutive_switches_to_product_a=0
        self.consecutive_switches_to_product_b=0
        self.consecutive_switches_to_product_c=0
        self.consecutive_idle_states=0
        
        self.next_time_stop=0
        self.fifo_list=[]
        self.reward=self.initial_reward
        
        self.env.process(self._run_machine_1())
        self.env.process(self._run_machine_2())
        self.env.process(self._run_machine_3())
        self.env.process(self._run_machine_4())
        self.env.process(self._run_machine_5())
        
        observations=self._get_observations_of_the_buffer_level()
        return observations
    
    
    def step(self,action):
        self.reward=self.initial_reward
        self._islegal(action)
        self._perform_action(action)
        if False:
            print('t=' + str(self.env.now) + ' ' + str(self._get_observations()) + ' ' + str(self.reward))
            
            
        self.next_time_stop +=self.time_step
        self.env.run(untill=self.next_time_stop)
        
        observations = self._get_observations_of_the_buffer_level()
        cost_for_storage=sum(self._get_buffer_level()[:3])
        
        if action!=3:
            self.consecutive_idle_states = 0
        else: 
            self.consecutive_idle_states += 1
            
        reward = self.reward - cost_for_storage - (self.consecutive_idle_states*5)
        terminal= True if (self.env.now >= self.simulation_duration) else False
        info = dict()
        return(observations,reward,terminal,info)

Normally the simulation runs but when I am calling this function to another simulation it is showing this error.


Solution

  • You tried to do something like

    sim1 = process_simulation()
    sim1.reset()
    

    or

    sim1 = process_simulation()
    var1 = sim1.reset
    

    which you obviously neither define reset method or initialize reset attribute in your init.

    That question does NOT provide enough info and it's a really terrible question. Like @buran said in the comment.