I have a class process_simulation. In that, I have a method sample. I am trying to run it from within the same object context. Please have a look at the code:
import simpy
import random
class process_simulation():
def __init__(self, simulation_duration=60, time_step=1, setup_time=0, reward_value=0):
self.simulation_duration = simulation_duration
self.time_step = time_step
self.next_time_stop = 0
self.setup_time = setup_time
self.buffer_capacity = 99999
self.initial_buffer = 0
self.initial_reward = reward_value
self.process_time_of_product_a = [5,4,3,3,1]
self.process_time_of_product_b = [1,2,3,4,5]
self.process_time_of_product_c = [3,5,1,4,2]
self.default_standard_deviation = .0
self.fifo_list = []
self.time_to_get_raw_material = 1
self.cost_for_machining = 0
self.cost_for_storage = 1
self.cost_for_raw_material = 50
self.finished_jobs_machine1 = 0
self.finished_jobs_machine2 = 0
self.finished_jobs_machine3 = 0
self.finished_jobs_machine4 = 0
self.finished_jobs_machine5 = 0
self.consecutive_production_of_product_a = 0
self.consecutive_production_of_product_b = 0
self.consecutive_production_of_product_c = 0
self.consecutive_switches_to_product_a = 0
self.consecutive_switches_to_product_b = 0
self.consecutive_switches_to_product_c = 0
self.consecutive_idle_states = 0
self.actions = [0,1,2,3]
self.reward = self.initial_reward
self.state = dict()
self.observation_size = 3
self.action_size = len(self.actions)
def _get_next_job(self,finished_jobs):
return self.fifo_list[finished_jobs]
def _apply_variability(self,process_time):
return random.gauss(process_time,process_time*self.default_standard_deviation)
# RF1: Simply return a default value for finished goods
# return 300
# RF2: Value mix of products in terms of the over-all reward
#'''
# Case 1: selection of product type A for manufacturing
def _calculate_sales_value(self,product_type):
if product_type=='A':
self.consecutive_production_of_product_a += 1
self.consecutive_production_of_product_b = 0
self.consecutive_production_of_product_c = 0
if self.consecutive_production_of_product_a ==1:
self.consecutive_switches_to_product_a +=1
return min(400, 200*(1.5**(self.consecutive_switches_to_product_a)))
else:
self.consecutive_switches_to_product_a = 0
return max (100, 200*(.75**(self.consecutive_switches_to_product_a-2)))
elif product_type=='B':
self.consecutive_production_of_product_b += 1
self.consecutive_production_of_product_a = 0
self.consecutive_production_of_product_c = 0
if self.consecutive_production_of_product_b ==1:
self.consecutive_switches_to_product_b +=1
return min(400, 200*(1.5**(self.consecutive_switches_to_product_b)))
else:
self.consecutive_switches_to_product_a = 0
return max (100, 200*(.75**(self.consecutive_switches_to_product_b-1)))
elif product_type=='C':
self.consecutive_production_of_product_c += 1
self.consecutive_production_of_product_a = 0
self.consecutive_production_of_product_b = 0
if self.consecutive_production_of_product_c ==1:
self.consecutive_switches_to_product_c +=1
return min(400, 200*(1.5**(self.consecutive_switches_to_product_c)))
else:
self.consecutive_switches_to_product_a = 0
return max (100, 200*(.75**(self.consecutive_switches_to_product_c)))
else:
raise valueerror('{} --> requested type is not in the list of allowed product types [A, B , C]'.format(product_type))
def _run_machine_1(self):
while True:
jobs = len(self.fifo_list)
if (jobs==0) or (jobs==self.finished_jobs_machine1):
yield self.env.timeout(self.time_step)
else:
next_job = self._get_next_job(self.finished_jobs_machine1)
if job==1:
yield self.env.timeout(self.setup_time)
elif self.fifo_list[self.finished_jobs_machine1-1] != next_job:
yield self.timeout(self.setup_time)
if next_job=='A':
yield self.env.timeout(self.time_to_get_raw_material)
yield self.env.raw_material_of_product_a.get(1)
self.reward -= self.cost_for_machining*self.process_time_of_product_a[0]
yield self.env.timeout(self._apply_variability(self.process_time_of_product_a[0]))
yield self.env.raw_material_machine1_product_a.put(1)
self.finished_jobs_machine1 +=1
elif next_job=='B':
yield self.env.timeout(self.time_to_get_raw_material)
yield self.env.raw_material_of_product_b.get(1)
self.reward -= self.cost_for_machining*self.process_time_of_product_b[0]
yield self.env.timeout(self._apply_variability(self.process_time_of_product_b[0]))
yield self.env.raw_material_machine1_product_b.put(1)
self.finished_jobs_machine1 +=1
elif next_job=='C':
yield self.env.timeout(self.time_to_get_raw_material)
yield self.env.raw_material_of_product_c.get(1)
self.reward -= self.cost_for_machining*self.process_time_of_product_c[0]
yield self.env.timeout(self._apply_variability(self.process_time_of_product_c[0]))
yield self.env.raw_material_machine1_product_c.put(1)
self.finished_jobs_machine1 +=1
else:
raise valueerror('{} --> requested type is not in the list of allowed product types [A, B, C]'.format(product_type))
def _run_machine_2(self):
while True:
jobs = len(self.fifo_list)
if (jobs==0) or (jobs==self.finished_jobs_machine2):
yield self.env.timeout(self.time_step)
else:
next_job = self._get_next_job(self.finished_jobs_machine2)
if job==1:
yield self.env.timeout(self.setup_time)
elif self.fifo_list[self.finished_jobs_machine2-1] != next_job:
yield self.timeout(self.setup_time)
if next_job=='A':
yield self.env.timeout(self.time_to_get_raw_material)
yield self.env.raw_material_machine1_product_a
self.reward -= self.cost_for_machining*self.process_time_of_product_a[0]
yield self.env.timeout(self._apply_variability(self.process_time_of_product_a[0]))
yield self.env.raw_material_machine2_product_a.put(1)
self.finished_jobs_machine2 +=1
elif next_job=='B':
yield self.env.timeout(self.time_to_get_raw_material)
yield self.env.raw_material_machine1_product_b.get(1)
self.reward -= self.cost_for_machining*self.process_time_of_product_b[0]
yield self.env.timeout(self._apply_variability(self.process_time_of_product_b[0]))
yield self.env.raw_material_machine2_product_b.put(1)
self.finished_jobs_machine2 +=1
elif next_job=='C':
yield self.env.timeout(self.time_to_get_raw_material)
yield self.env.raw_material_machine1_product_c.get(1)
self.reward -= self.cost_for_machining*self.process_time_of_product_c[0]
yield self.env.timeout(self._apply_variability(self.process_time_of_product_c[0]))
yield self.env.raw_material_machine2_product_c.put(1)
self.finished_jobs_machine2 +=1
else:
raise valueerror('{} --> requested type is not in the list of allowed product types [A, B, C]'.format(product_type))
def _run_machine_3(self):
while True:
jobs = len(self.fifo_list)
if (jobs==0) or (jobs==self.finished_jobs_machine3):
yield self.env.timeout(self.time_step)
else:
next_job = self._get_next_job(self.finished_jobs_machine3)
if job==1:
yield self.env.timeout(self.setup_time)
elif self.fifo_list[self.finished_jobs_machine3-1] != next_job:
yield self.timeout(self.setup_time)
if next_job=='A':
yield self.env.timeout(self.time_to_get_raw_material)
yield self.env.raw_material_machine2_product_a.get(1)
self.reward -= self.cost_for_machining*self.process_time_of_product_a[0]
yield self.env.timeout(self._apply_variability(self.process_time_of_product_a[0]))
yield self.env.raw_material_machine3_product_a.put(1)
self.finished_jobs_machine3 +=1
elif next_job=='B':
yield self.env.timeout(self.time_to_get_raw_material)
yield self.env.raw_material_machine2_product_b.get(1)
self.reward -= self.cost_for_machining*self.process_time_of_product_b[0]
yield self.env.timeout(self._apply_variability(self.process_time_of_product_b[0]))
yield self.env.raw_material_machine3_product_b.put(1)
self.finished_jobs_machine3 +=1
elif next_job=='C':
yield self.env.timeout(self.time_to_get_raw_material)
yield self.env.raw_material_machine2_product_c.get(1)
self.reward -= self.cost_for_machining*self.process_time_of_product_c[0]
yield self.env.timeout(self._apply_variability(self.process_time_of_product_c[0]))
yield self.env.raw_material_machine3_product_c.put(1)
self.finished_jobs_machine3 +=1
else:
raise valueerror('{} --> requested type is not in the list of allowed product types [A, B, C]'.format(product_type))
def _run_machine_4(self):
while True:
jobs = len(self.fifo_list)
if (jobs==0) or (jobs==self.finished_jobs_machine4):
yield self.env.timeout(self.time_step)
else:
next_job = self._get_next_job(self.finished_jobs_machine4)
if job==1:
yield self.env.timeout(self.setup_time)
elif self.fifo_list[self.finished_jobs_machine4-1] != next_job:
yield self.timeout(self.setup_time)
if next_job=='A':
yield self.env.timeout(self.time_to_get_raw_material)
yield self.env.raw_material_machine3_product_a.get(1)
self.reward -= self.cost_for_machining*self.process_time_of_product_a[0]
yield self.env.timeout(self._apply_variability(self.process_time_of_product_a[0]))
yield self.env.raw_material_machine4_product_a.put(1)
self.finished_jobs_machine4 +=1
elif next_job=='B':
yield self.env.timeout(self.time_to_get_raw_material)
yield self.env.raw_material_machine3_product_b.get(1)
self.reward -= self.cost_for_machining*self.process_time_of_product_b[0]
yield self.env.timeout(self._apply_variability(self.process_time_of_product_b[0]))
yield self.env.raw_material_machine4_product_b.put(1)
self.finished_jobs_machine4 +=1
elif next_job=='C':
yield self.env.timeout(self.time_to_get_raw_material)
yield self.env.raw_material_machine3_product_c.get(1)
self.reward -= self.cost_for_machining*self.process_time_of_product_c[0]
yield self.env.timeout(self._apply_variability(self.process_time_of_product_c[0]))
yield self.env.raw_material_machine4_product_c.put(1)
self.finished_jobs_machine4 +=1
else:
raise valueerror('{} --> requested type is not in the list of allowed product types [A, B, C]'.format(product_type))
def _run_machine_5(self):
while True:
jobs = len(self.fifo_list)
if (jobs==0) or (jobs==self.finished_jobs_machine5):
yield self.env.timeout(self.time_step)
else:
next_job = self._get_next_job(self.finished_jobs_machine5)
if job==1:
yield self.env.timeout(self.setup_time)
elif self.fifo_list[self.finished_jobs_machine5-1] != next_job:
yield self.timeout(self.setup_time)
if next_job=='A':
yield self.env.timeout(self.time_to_get_raw_material)
yield self.env.raw_material_machine4_product_a.get(1)
self.reward -= self.cost_for_machining*self.process_time_of_product_a[0]
yield self.env.timeout(self._apply_variability(self.process_time_of_product_a[0]))
yield self.env.raw_material_machine5_product_a.put(1)
self.finished_jobs_machine5 +=1
elif next_job=='B':
yield self.env.timeout(self.time_to_get_raw_material)
yield self.env.raw_material_machine4_product_b.get(1)
self.reward -= self.cost_for_machining*self.process_time_of_product_b[0]
yield self.env.timeout(self._apply_variability(self.process_time_of_product_b[0]))
yield self.env.raw_material_machine5_product_b.put(1)
self.finished_jobs_machine5 +=1
elif next_job=='C':
yield self.env.timeout(self.time_to_get_raw_material)
yield self.env.raw_material_machine4_product_c.get(1)
self.reward -= self.cost_for_machining*self.process_time_of_product_c[0]
yield self.env.timeout(self._apply_variability(self.process_time_of_product_c[0]))
yield self.env.raw_material_machine5_product_c.put(1)
self.finished_jobs_machine5 +=1
else:
raise valueerror('{} --> requested type is not in the list of allowed product types [A, B, C]'.format(product_type))
def _get_buffer_level(self):
return[self.env.raw_material_of_product_a.level,self.env.raw_material_of_product_b.level,self.env.raw_material_of_product_c.level,
self.env.raw_material_machine1_product_a.level,self.env.raw_material_machine1_product_b.level,self.env.raw_material_machine1_product_c.level,
self.env.raw_material_machine2_product_a.level,self.env.raw_material_machine2_product_b.level,self.env.raw_material_machine2_product_c.level,
self.env.raw_material_machine3_product_a.level,self.env.raw_material_machine3_product_b.level,self.env.raw_material_machine3_product_c.level,
self.env.raw_material_machine4_product_a.level,self.env.raw_material_machine4_product_b.level,self.env.raw_material_machine4_product_c.level,
self.env.raw_material_machine5_product_a.level,self.env.raw_material_machine5_product_b.level,self.env.raw_material_machine5_product_c.level]
def _get_observations_of_the_buffer_level(self):
observations = self._get_buffer_level()
return observations[:3]
def _islegal(self,action):
if action not in self.action:
raise ValueError('{} --> requested action not in list of allowed actions [0, 1, 2, 3'.format(action))
def _release_raw_material(self,action):
if action==0:
yield self.env.raw_material_of_product_a.put(1)
self.reward -= self.cost_for_raw_material
elif action==1:
yield self.env.raw_material_of_product_b.put(1)
self.reward -= self.cost_for_raw_material
elif action==2:
yield self.env.raw_material_of_product_c.put(1)
self.reward -= self.cost_for_raw_material
else:
pass
def _perform_action(self,action):
if action == 0:
self.fifo_list +=['A']
self.env.process(self._release_raw_material(action))
elif action == 1:
self.fifo_list +=['B']
self.env.process(self._release_raw_material(action))
elif action == 2:
self.fifo_list +=['C']
self.env.process(self._release_raw_material(action))
elif action ==3:
self.reward -= 0
def _reset_simulation(self):
self.env = simpy.Environment()
self.env.raw_material_of_product_a = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
self.env.raw_material_of_product_b = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
self.env.raw_material_of_product_c = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
self.env.raw_material_machine1_product_a = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
self.env.raw_material_machine1_product_b = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
self.env.raw_material_machine1_product_c = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
self.env.raw_material_machine2_product_a = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
self.env.raw_material_machine2_product_b = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
self.env.raw_material_machine2_product_c = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
self.env.raw_material_machine3_product_a = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
self.env.raw_material_machine3_product_b = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
self.env.raw_material_machine3_product_c = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
self.env.raw_material_machine4_product_a = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
self.env.raw_material_machine4_product_b = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
self.env.raw_material_machine4_product_c = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
self.env.raw_material_machine5_product_a = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
self.env.raw_material_machine5_product_b = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
self.env.raw_material_machine5_product_c = simpy.container(self.env, capacity=self.buffer_capacity, init=self.initial_buffer)
self.finished_jobs_machine1=0
self.finished_jobs_machine2=0
self.finished_jobs_machine3=0
self.finished_jobs_machine4=0
self.finished_jobs_machine5=0
self.consecutive_production_of_product_a=0
self.consecutive_production_of_product_b=0
self.consecutive_production_of_product_c=0
self.consecutive_switches_to_product_a=0
self.consecutive_switches_to_product_b=0
self.consecutive_switches_to_product_c=0
self.consecutive_idle_states=0
self.next_time_stop=0
self.fifo_list=[]
self.reward=self.initial_reward
self.env.process(self._run_machine_1())
self.env.process(self._run_machine_2())
self.env.process(self._run_machine_3())
self.env.process(self._run_machine_4())
self.env.process(self._run_machine_5())
observations=self._get_observations_of_the_buffer_level()
return observations
def step(self,action):
self.reward=self.initial_reward
self._islegal(action)
self._perform_action(action)
if False:
print('t=' + str(self.env.now) + ' ' + str(self._get_observations()) + ' ' + str(self.reward))
self.next_time_stop +=self.time_step
self.env.run(untill=self.next_time_stop)
observations = self._get_observations_of_the_buffer_level()
cost_for_storage=sum(self._get_buffer_level()[:3])
if action!=3:
self.consecutive_idle_states = 0
else:
self.consecutive_idle_states += 1
reward = self.reward - cost_for_storage - (self.consecutive_idle_states*5)
terminal= True if (self.env.now >= self.simulation_duration) else False
info = dict()
return(observations,reward,terminal,info)
Normally the simulation runs but when I am calling this function to another simulation it is showing this error.
You tried to do something like
sim1 = process_simulation()
sim1.reset()
or
sim1 = process_simulation()
var1 = sim1.reset
which you obviously neither define reset method or initialize reset attribute in your init.
That question does NOT provide enough info and it's a really terrible question. Like @buran said in the comment.