Search code examples
Tags: python, reinforcement-learning, amazon-sagemaker, ray

How to make the inputs and model have the same shape (RLlib Ray Sagemaker reinforcement learning)


I have a mismatch in shapes between inputs and the model of my reinforcement learning project.

I have been closely following the AWS examples, specifically the cartpole example. However, I have built my own custom environment. What I am struggling to understand is how to change my environment so that it works with the prebuilt Ray RLEstimator.

Here is the code for the environment:

from enum import Enum
import math

import gym
from gym import error, spaces, utils, wrappers
from gym.utils import seeding
from gym.envs.registration import register
from gym.spaces import Discrete, Box


import numpy as np

# from float_space import FloatSpace


def sigmoid_price_fun(x, maxcust, gamma):
    """Scale ``maxcust`` by a logistic-style factor of the price ``x``.

    Prices below zero are clamped to zero first, so every ``x <= 0`` yields
    ``maxcust / 2``.
    """
    clamped_price = max(0, x)
    denominator = 1 + math.exp(gamma * clamped_price)
    return maxcust / denominator


class Actions(Enum):
    """Pricing moves available to the agent.

    The integer values line up with the indices of the environment's
    ``Discrete(3)`` action space, so ``Actions(action)`` converts a raw action.
    """
    DECREASE_PRICE = 0
    INCREASE_PRICE = 1
    HOLD = 2


# Amount added to the current price for each action (see ArrivalSim._take_action).
PRICE_ADJUSTMENT = {
    Actions.DECREASE_PRICE: -0.25,
    Actions.INCREASE_PRICE: 0.25,
    Actions.HOLD: 0
}


class ArrivalSim(gym.Env):
    """ Simple environment for price optimising RL learner.

    The state is the current price; each action moves it by the amount in
    PRICE_ADJUSTMENT and the reward is a Poisson-sampled count times the price.
    """


    def __init__(self, price):
        """
        Parameters
        ----------
        price : float
            The initial price to use.
        """
        super().__init__()
        self.price = price
        self.revenue = 0
        self.action_space = Discrete(3)  # [0, 1, 2]  #increase or decrease
        # NOTE(review): both bounds are 0-d arrays, so this Box has shape ().
        # This is the space the model's first dense layer rejects
        # ("expected min_ndim=2, found ndim=1").
        self.observation_space = Box(np.array(0.0),np.array(1000))
#         self.observation_space = FloatSpace(price)

    def step(self, action):
        """ Enacts the specified action in the environment.

        Returns the new price, reward, whether we're finished and an empty dict for compatibility with Gym's
        interface. """

        self._take_action(Actions(action))
        next_state = self.price
#         next_state = self.observation_space.sample()
        # NOTE(review): _get_reward is declared with a required ``price``
        # argument, so this zero-argument call raises TypeError as written.
        reward = self._get_reward()
        done = False

        # The episode ends once the price drops below zero or a step earns nothing.
        if next_state < 0 or reward == 0:
            done = True

        print(next_state, reward, done, {})

        # np.array() of a scalar gives a 0-d array, matching the shape-() Box above.
        return np.array(next_state), reward, done, {}

    def reset(self):
        """ Resets the environment, selecting a random initial price. Returns the price. """

#         self.observation_space.value = np.random.rand()
#         return self.observation_space.sample()
        # np.random.rand() with no arguments returns a plain float in [0, 1).
        self.price = np.random.rand()
        return self.price

    def _take_action(self, action):
        """ Shifts the current price by the adjustment mapped to ``action``. """
#         self.observation_space.value += PRICE_ADJUSTMENT[action]
        self.price += PRICE_ADJUSTMENT[action]

    def _get_reward(self,price):
        """ Samples revenue for the current price.

        ``price`` is never read; the body uses ``self.price`` instead.
        """
#         price = self.observation_space.value
#         return max(np.random.poisson(sigmoid_price_fun(price, 50, 0.5)) * price, 0)
        # NOTE(review): the two lines below make independent Poisson draws, so
        # the stored self.revenue generally differs from the returned reward.
        self.revenue = max(np.random.poisson(sigmoid_price_fun(self.price, 50, 0.5)) * self.price, 0)
        return max(np.random.poisson(sigmoid_price_fun(self.price, 50, 0.5)) * self.price, 0)


#     def render(self, mode='human'):
#         super().render(mode)

def testEnv():
    """Register the environment with gym and drive it with 20 random actions."""
    registration = dict(
        id='ArrivalSim-v0',
        entry_point='env:ArrivalSim',
        kwargs={'price': 40},
    )
    register(**registration)
    env = gym.make('ArrivalSim-v0')

    env.reset()
    steps_taken = 0
    while steps_taken < 20:
        action = env.action_space.sample()
        print(action)
        print(env.observation_space)
        env.step(action)  # take a random action
        steps_taken += 1
    env.close()



# Run the smoke test only when this file is executed as a script.
if __name__ =='__main__':

    testEnv()

Here is the training script

import json
import os

import gym
import ray
from ray.tune import run_experiments
from ray.tune.registry import register_env
from gym.envs.registration import register

from sagemaker_rl.ray_launcher import SageMakerRayLauncher


def create_environment(env_config):
    """Register ``ArrivalSim-v0`` with gym and return a new instance of it.

    ``env_config`` is accepted to match the creator signature but is not read;
    the initial price is fixed at 40 through the registration kwargs.
    """
    import gym
#     from gym.spaces import Space
    from gym.envs.registration import register

    # This import must happen inside the method so that worker processes import this code
    # NOTE(review): register() runs on every call — presumably gym rejects a
    # duplicate id if this creator is invoked twice in one process; confirm.
    register(
        id='ArrivalSim-v0',
        entry_point='env:ArrivalSim',
        kwargs= {'price' : 40}
    )
    return gym.make('ArrivalSim-v0')



class MyLauncher(SageMakerRayLauncher):
    """SageMaker Ray launcher that trains PPO on the ``ArrivalSim-v0`` environment."""

    def register_env_creator(self):
        """Make ``create_environment`` available to Ray under ``ArrivalSim-v0``."""
        register_env("ArrivalSim-v0", create_environment)

    def get_experiment_config(self):
        """Return the experiment spec: PPO until the mean episode reward reaches 5000."""
        # Algorithm settings; worker and GPU counts come from the launcher instance.
        ppo_settings = {
            "gamma": 0.995,
            "kl_coeff": 1.0,
            "num_sgd_iter": 10,
            "lr": 0.0001,
            "sgd_minibatch_size": 32768,
            "train_batch_size": 320000,
            "monitor": False,  # Record videos.
            "model": {"free_log_std": False},
            "use_gae": False,
            "num_workers": (self.num_cpus - 1),
            "num_gpus": self.num_gpus,
            "batch_mode": "complete_episodes",
        }
        experiment = {
            "env": "ArrivalSim-v0",
            "run": "PPO",
            "stop": {"episode_reward_mean": 5000},
            "config": ppo_settings,
        }
        return {"training": experiment}

# Entry point used when the training script is run directly.
if __name__ == "__main__":
    MyLauncher().train_main()

Here is the code I run in Jupyter:

# Notebook cell: configure an RLEstimator for the Ray toolkit and run it in local mode.
metric_definitions = RLEstimator.default_metric_definitions(RLToolkit.RAY)
# NOTE(review): neither `environment` nor `env` is passed to RLEstimator below.
environment = env = {
    'SAGEMAKER_REQUIREMENTS': 'requirements.txt', # path relative to `source_dir` below.
}

estimator = RLEstimator(entry_point="train.py",
                        source_dir='.',
                        toolkit=RLToolkit.RAY,
                        toolkit_version='0.6.5',
                        framework=RLFramework.TENSORFLOW,
                        dependencies=["sagemaker_rl"],
#                         image_name='price-response-ray-cpu',
                        role=role,
#                         train_instance_type="ml.c5.2xlarge",
                        train_instance_type='local',
                        train_instance_count=1,
#                         output_path=s3_output_path,
#                         base_job_name=job_name_prefix,
                        metric_definitions=metric_definitions
#                         hyperparameters={
                          # Attention scientists!  You can override any Ray algorithm parameter here:
                          #"rl.training.config.horizon": 5000,
                          #"rl.training.config.num_sgd_iter": 10,
                        #}
                    )

# Block until the job finishes, then report its name.
estimator.fit(wait=True)
job_name = estimator.latest_training_job.job_name
print("Training job: %s" % job_name)

The error message I have been receiving has been the following:

algo-1-dxwxx_1  | == Status ==
algo-1-dxwxx_1  | Using FIFO scheduling algorithm.
algo-1-dxwxx_1  | Resources requested: 0/3 CPUs, 0/0 GPUs
algo-1-dxwxx_1  | Memory usage on this node: 1.1/4.1 GB
algo-1-dxwxx_1  | 
algo-1-dxwxx_1  | == Status ==
algo-1-dxwxx_1  | Using FIFO scheduling algorithm.
algo-1-dxwxx_1  | Resources requested: 2/3 CPUs, 0/0 GPUs
algo-1-dxwxx_1  | Memory usage on this node: 1.4/4.1 GB
algo-1-dxwxx_1  | Result logdir: /opt/ml/output/intermediate/training
algo-1-dxwxx_1  | Number of trials: 1 ({'RUNNING': 1})
algo-1-dxwxx_1  | RUNNING trials:
algo-1-dxwxx_1  |  - PPO_ArrivalSim-v0_0:   RUNNING
algo-1-dxwxx_1  | 
algo-1-dxwxx_1  | (pid=72) 2019-08-30 09:35:13,030  WARNING ppo.py:172 -- FYI: By default, the value function will not share layers with the policy model ('vf_share_layers': False).
algo-1-dxwxx_1  | 2019-08-30 09:35:13,063   ERROR trial_runner.py:460 -- Error processing event.
algo-1-dxwxx_1  | Traceback (most recent call last):
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/tune/trial_runner.py", line 409, in _process_trial
algo-1-dxwxx_1  |     result = self.trial_executor.fetch_result(trial)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/tune/ray_trial_executor.py", line 314, in fetch_result
algo-1-dxwxx_1  |     result = ray.get(trial_future[0])
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/worker.py", line 2316, in get
algo-1-dxwxx_1  |     raise value
algo-1-dxwxx_1  | ray.exceptions.RayTaskError: ray_worker (pid=72, host=b9b15d495b68)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/rllib/models/model.py", line 83, in __init__
algo-1-dxwxx_1  |     restored, num_outputs, options)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/rllib/models/model.py", line 135, in _build_layers_v2
algo-1-dxwxx_1  |     raise NotImplementedError
algo-1-dxwxx_1  | NotImplementedError
algo-1-dxwxx_1  | 
algo-1-dxwxx_1  | During handling of the above exception, another exception occurred:
algo-1-dxwxx_1  | 
algo-1-dxwxx_1  | ray_worker (pid=72, host=b9b15d495b68)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/rllib/agents/agent.py", line 276, in __init__
algo-1-dxwxx_1  |     Trainable.__init__(self, config, logger_creator)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/tune/trainable.py", line 88, in __init__
algo-1-dxwxx_1  |     self._setup(copy.deepcopy(self.config))
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/rllib/agents/agent.py", line 373, in _setup
algo-1-dxwxx_1  |     self._init()
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/rllib/agents/ppo/ppo.py", line 77, in _init
algo-1-dxwxx_1  |     self.env_creator, self._policy_graph)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/rllib/agents/agent.py", line 506, in make_local_evaluator
algo-1-dxwxx_1  |     extra_config or {}))
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/rllib/agents/agent.py", line 714, in _make_evaluator
algo-1-dxwxx_1  |     async_remote_worker_envs=config["async_remote_worker_envs"])
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/rllib/evaluation/policy_evaluator.py", line 288, in __init__
algo-1-dxwxx_1  |     self._build_policy_map(policy_dict, policy_config)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/rllib/evaluation/policy_evaluator.py", line 661, in _build_policy_map
algo-1-dxwxx_1  |     policy_map[name] = cls(obs_space, act_space, merged_conf)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/rllib/agents/ppo/ppo_policy_graph.py", line 176, in __init__
algo-1-dxwxx_1  |     seq_lens=existing_seq_lens)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/rllib/models/catalog.py", line 215, in get_model
algo-1-dxwxx_1  |     seq_lens)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/rllib/models/catalog.py", line 255, in _get_model
algo-1-dxwxx_1  |     num_outputs, options)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/rllib/models/model.py", line 86, in __init__
algo-1-dxwxx_1  |     input_dict["obs"], num_outputs, options)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/ray/rllib/models/fcnet.py", line 37, in _build_layers
algo-1-dxwxx_1  |     scope=label)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/tensorflow/contrib/framework/python/ops/arg_scope.py", line 182, in func_with_args
algo-1-dxwxx_1  |     return func(*args, **current_args)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/tensorflow/contrib/layers/python/layers/layers.py", line 1854, in fully_connected
algo-1-dxwxx_1  |     outputs = layer.apply(inputs)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/keras/engine/base_layer.py", line 817, in apply
algo-1-dxwxx_1  |     return self.__call__(inputs, *args, **kwargs)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/layers/base.py", line 374, in __call__
algo-1-dxwxx_1  |     outputs = super(Layer, self).__call__(inputs, *args, **kwargs)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/keras/engine/base_layer.py", line 730, in __call__
algo-1-dxwxx_1  |     self._assert_input_compatibility(inputs)
algo-1-dxwxx_1  |   File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/keras/engine/base_layer.py", line 1493, in _assert_input_compatibility
algo-1-dxwxx_1  |     str(x.shape.as_list()))
algo-1-dxwxx_1  | ValueError: Input 0 of layer default/fc1 is incompatible with the layer: : expected min_ndim=2, found ndim=1. Full shape received: [None]
algo-1-dxwxx_1  | 
algo-1-dxwxx_1  | 2019-08-30 09:35:13,064   INFO ray_trial_executor.py:178 -- Destroying actor for trial PPO_ArrivalSim-v0_0. If your trainable is slow to initialize, consider setting reuse_actors=True to reduce actor creation overheads.
algo-1-dxwxx_1  | 2019-08-30 09:35:13,076   INFO trial_runner.py:497 -- Attempting to recover trial state from last checkpoint.
algo-1-dxwxx_1  | (pid=72) 2019-08-30 09:35:13,041  INFO policy_evaluator.py:278 -- Creating policy evaluation worker 0 on CPU (please ignore any CUDA init errors)

I am not sure how to change the input the environment gives to the model, or the model's setup itself. The documentation seems quite obscure. I have a hunch that the problem lies with the observation and action spaces.

Here is the reference to the original aws project example: https://github.com/awslabs/amazon-sagemaker-examples/tree/master/reinforcement_learning/rl_roboschool_ray


Solution

  • Possible reason:

    The error message:

    ValueError: Input 0 of layer default/fc1 is incompatible with the layer: : expected min_ndim=2, found ndim=1. Full shape received: [None]

    Your original environment obs space is self.observation_space = Box(np.array(0.0),np.array(1000)).

    Displaying the shape of your environment obs space gives:

    print(Box(np.array(0.0), np.array(1000), dtype=np.float32).shape)  # prints: ()

    This is likely what "Full shape received: [None]" in the error message is indicating.

    If you pass the shape (1,1) into np.zeros, you get the expected min_ndim=2:

    x = np.zeros((1, 1))
    print(x)       # [[0.]]
    print(x.ndim)  # 2

    Suggested solution:

    I assume that you want your environment obs space to range from 0.0 to 1000.0 as indicated by the self.price = np.random.rand() in your reset function.

    Try using the following for your environment obs space:

    self.observation_space = Box(0.0, 1000.0, shape=(1,1), dtype=np.float32)

    I hope that setting the Box with an explicit shape helps.

    EDIT (20190910):

    To show that it works, truncated output from Sagemaker (Jupyter notebook instance):

    .
    .
    .
    algo-1-y2ayw_1  | price b = 0.439261780930142
    algo-1-y2ayw_1  | price a = 0.439261780930142
    algo-1-y2ayw_1  | (self.price).shape = (1,)
    algo-1-y2ayw_1  | [0.43926178] 10.103020961393266 False {}
    algo-1-y2ayw_1  | price b = 0.439261780930142
    algo-1-y2ayw_1  | price a = 0.439261780930142
    algo-1-y2ayw_1  | (self.price).shape = (1,)
    algo-1-y2ayw_1  | [0.43926178] 9.663759180463124 False {}
    algo-1-y2ayw_1  | price b = 0.439261780930142
    algo-1-y2ayw_1  | price a = 0.189261780930142
    algo-1-y2ayw_1  | (self.price).shape = (1,)
    algo-1-y2ayw_1  | [0.18926178] 5.67785342790426 False {}
    algo-1-y2ayw_1  | price b = 0.189261780930142
    algo-1-y2ayw_1  | price a = -0.06073821906985799
    algo-1-y2ayw_1  | (self.price).shape = (1,)
    algo-1-y2ayw_1  | [-0.06073822] 0 True {}
    algo-1-y2ayw_1  | Result for PPO_ArrivalSim-v0_0:
    algo-1-y2ayw_1  |   date: 2019-09-10_11-51-13
    algo-1-y2ayw_1  |   done: true
    algo-1-y2ayw_1  |   episode_len_mean: 126.72727272727273
    algo-1-y2ayw_1  |   episode_reward_max: 15772.677709596366
    algo-1-y2ayw_1  |   episode_reward_mean: 2964.4609668691965
    algo-1-y2ayw_1  |   episode_reward_min: 0.0
    algo-1-y2ayw_1  |   episodes: 5
    algo-1-y2ayw_1  |   experiment_id: 5d3b9f2988854a0db164a2e5e9a7550f
    algo-1-y2ayw_1  |   hostname: 2dae585dcc65
    algo-1-y2ayw_1  |   info:
    algo-1-y2ayw_1  |     cur_lr: 4.999999873689376e-05
    algo-1-y2ayw_1  |     entropy: 1.0670874118804932
    algo-1-y2ayw_1  |     grad_time_ms: 1195.066
    algo-1-y2ayw_1  |     kl: 3.391784191131592
    algo-1-y2ayw_1  |     load_time_ms: 44.725
    algo-1-y2ayw_1  |     num_steps_sampled: 463
    algo-1-y2ayw_1  |     num_steps_trained: 463
    algo-1-y2ayw_1  |     policy_loss: -0.05383850634098053
    algo-1-y2ayw_1  |     sample_time_ms: 621.282
    algo-1-y2ayw_1  |     total_loss: 2194493.5
    algo-1-y2ayw_1  |     update_time_ms: 145.352
    algo-1-y2ayw_1  |     vf_explained_var: -5.519390106201172e-05
    algo-1-y2ayw_1  |     vf_loss: 2194492.5
    algo-1-y2ayw_1  |   iterations_since_restore: 2
    algo-1-y2ayw_1  |   node_ip: 172.18.0.2
    algo-1-y2ayw_1  |   pid: 77
    algo-1-y2ayw_1  |   policy_reward_mean: {}
    algo-1-y2ayw_1  |   time_since_restore: 4.55129861831665
    algo-1-y2ayw_1  |   time_this_iter_s: 1.3484764099121094
    algo-1-y2ayw_1  |   time_total_s: 4.55129861831665
    algo-1-y2ayw_1  |   timestamp: 1568116273
    algo-1-y2ayw_1  |   timesteps_since_restore: 463
    algo-1-y2ayw_1  |   timesteps_this_iter: 234
    algo-1-y2ayw_1  |   timesteps_total: 463
    algo-1-y2ayw_1  |   training_iteration: 2
    algo-1-y2ayw_1  |
    algo-1-y2ayw_1  | A worker died or was killed while executing task 00000000781a7b5b94a203683f8f789e593abbb1.
    algo-1-y2ayw_1  | A worker died or was killed while executing task 00000000d3507bc6b41ee1c9fc36292eeae69557.
    algo-1-y2ayw_1  | == Status ==
    algo-1-y2ayw_1  | Using FIFO scheduling algorithm.
    algo-1-y2ayw_1  | Resources requested: 0/3 CPUs, 0/0 GPUs
    algo-1-y2ayw_1  | Result logdir: /opt/ml/output/intermediate/training
    algo-1-y2ayw_1  | TERMINATED trials:
    algo-1-y2ayw_1  |  - PPO_ArrivalSim-v0_0:   TERMINATED [pid=77], 4 s, 2 iter, 463 ts, 2.96e+03 rew
    algo-1-y2ayw_1  |
    algo-1-y2ayw_1  | Saved model configuration.
    algo-1-y2ayw_1  | Saved the checkpoint file /opt/ml/output/intermediate/training/PPO_ArrivalSim-v0_0_2019-09-10_11-50-53vd32vlux/checkpoint-2.extra_data as /opt/ml/model/checkpoint.extra_data
    algo-1-y2ayw_1  | Saved the checkpoint file /opt/ml/output/intermediate/training/PPO_ArrivalSim-v0_0_2019-09-10_11-50-53vd32vlux/checkpoint-2.tune_metadata as /opt/ml/model/checkpoint.tune_metadata
    algo-1-y2ayw_1  | Created LogSyncer for /root/ray_results/PPO_ArrivalSim-v0_2019-09-10_11-51-13xdn_5i34 -> None
    algo-1-y2ayw_1  | 2019-09-10 11:51:13.941718: I tensorflow/core/common_runtime/process_util.cc:71] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance.
    algo-1-y2ayw_1  | reset -> (self.price).shape =  (1,)
    algo-1-y2ayw_1  | LocalMultiGPUOptimizer devices ['/cpu:0']
    algo-1-y2ayw_1  | reset -> (self.price).shape =  (1,)
    algo-1-y2ayw_1  | INFO:tensorflow:No assets to save.
    algo-1-y2ayw_1  | No assets to save.
    algo-1-y2ayw_1  | INFO:tensorflow:No assets to write.
    algo-1-y2ayw_1  | No assets to write.
    algo-1-y2ayw_1  | INFO:tensorflow:SavedModel written to: /opt/ml/model/1/saved_model.pb
    algo-1-y2ayw_1  | SavedModel written to: /opt/ml/model/1/saved_model.pb
    algo-1-y2ayw_1  | Saved TensorFlow serving model!
    algo-1-y2ayw_1  | A worker died or was killed while executing task 00000000f352d985b807ca399460941fe2264899.
    
    algo-1-y2ayw_1  | 2019-09-10 11:51:20,075 sagemaker-containers INFO
        
     Reporting training SUCCESS
    
    tmpwwb4b358_algo-1-y2ayw_1 exited with code 0
    
    Aborting on container exit...
    Failed to delete: /tmp/tmpwwb4b358/algo-1-y2ayw Please remove it manually.
    
    ===== Job Complete =====
    

    This time I made edits in all three files: your environment, the training script, and the Jupyter notebook. It turns out that there is no need to define a custom model for your custom environment, although that remains a viable option. And you're right: the main cause of the issue is still the obs space.

    I set self.price to be a 1D numpy array so that it works better with Ray RLlib. The custom environment is now created in the training script in a simpler way, as shown below. As for the notebook, I used version 0.5.3 instead of 0.6.5 for toolkit_version, and the training is done in local mode (in the docker container on the SageMaker Jupyter notebook instance, still on AWS) with CPU only. However, it will also work with any ML instance (e.g. ml.m4.xlarge) with GPU.

    The entire package along with all dependencies is in here.

    The edited env:

    # new
    from __future__ import absolute_import
    from __future__ import division
    from __future__ import print_function
    # end new
    
    
    from enum import Enum
    import math
    
    import gym
    from gym import error, spaces, utils, wrappers
    from gym.utils import seeding
    from gym.envs.registration import register
    from gym.spaces import Discrete, Box
    
    import numpy as np
    
    
    def sigmoid_price_fun(x, maxcust, gamma):
        """Scale ``maxcust`` by a logistic-style factor of the price ``x``.

        Parameters
        ----------
        x : float
            Price. Values below zero are clamped to zero.
        maxcust : float
            Scale of the result; ``maxcust / 2`` is returned for any ``x <= 0``.
        gamma : float
            Steepness of the decay as the price grows.

        Returns
        -------
        float
            ``maxcust / (1 + exp(gamma * max(0, x)))``, or ``0.0`` when the
            exponential is too large to represent.
        """
        try:
            return maxcust / (1 + math.exp(gamma * max(0, x)))
        except OverflowError:
            # exp() only overflows for a huge positive exponent, where the
            # quotient tends to zero; return that limit instead of crashing.
            return 0.0
    
    
    class Actions(Enum):
        """Pricing moves available to the agent.

        The integer values line up with the indices of the environment's
        ``Discrete(3)`` action space, so ``Actions(action)`` converts a raw action.
        """
        DECREASE_PRICE = 0
        INCREASE_PRICE = 1
        HOLD = 2
    
    
    # Amount added to the current price for each action (see ArrivalSim._take_action).
    PRICE_ADJUSTMENT = {
        Actions.DECREASE_PRICE: -0.25,
        Actions.INCREASE_PRICE: 0.25,
        Actions.HOLD: 0
    }
    
    
    class ArrivalSim(gym.Env):
        """Simple environment for a price-optimising RL learner.

        The state is a single price kept in a 1-D array of shape ``(1,)``. Each
        action moves the price by the amount in ``PRICE_ADJUSTMENT``; the reward
        is a Poisson-sampled count multiplied by the price. An episode ends when
        the price drops below zero or a step earns no reward.
        """

        def __init__(self, price):
            """
            Parameters
            ----------
            price : float
                The initial price to use. ``reset()`` replaces it with a random one.
            """
            super().__init__()

            # Hold the price as a 1-D array from the start so that step() also
            # works when it is called before reset().
            self.price = np.array([price], dtype=np.float64)
            self.revenue = 0
            self.action_space = Discrete(3)  # [0, 1, 2]: decrease, increase or hold
            # obs space in the original question (shape (), rejected by the model):
            #self.observation_space = Box(np.array(0.0), np.array(1000))
            # obs space initially suggested:
            #self.observation_space = Box(0.0, 1000.0, shape=(1,1), dtype=np.float32)
            # obs space suggested in this edit:
            self.observation_space = spaces.Box(np.array([0.0]), np.array([1000.0]), dtype=np.float32)

        def _observation(self):
            """Return the current price as a fresh array in the observation dtype."""
            # astype() copies, so later in-place price updates cannot alter an
            # observation that was already handed to the caller.
            return self.price.astype(self.observation_space.dtype)

        def step(self, action):
            """ Enacts the specified action in the environment.

            Returns the new price (shape ``(1,)``), the reward, whether we're
            finished and an empty dict for compatibility with Gym's interface. """

            self._take_action(Actions(action))

            print('(self.price).shape =', (self.price).shape)
            #next_state = self.observation_space.sample()

            reward = self._get_reward()
            # NOTE(review): the terminal observation can be slightly negative,
            # i.e. below the Box's lower bound of 0.0 — confirm the RLlib
            # version in use tolerates this.
            done = bool(self.price[0] < 0 or reward == 0)

            print(self.price, reward, done, {})

            return self._observation(), reward, done, {}

        def reset(self):
            """ Resets the environment, selecting a random initial price. Returns the price. """
            #self.observation_space.value = np.random.rand()
            #return self.observation_space.sample()

            self.price = np.random.rand(1)

            print('reset -> (self.price).shape = ', (self.price).shape)

            return self._observation()

        def _take_action(self, action):
            """ Shifts the current price in place by the adjustment for ``action``. """
            print('price b =', self.price[0])
            self.price[0] += PRICE_ADJUSTMENT[action]
            print('price a =', self.price[0])

        def _get_reward(self):
            """ Samples the revenue for the current price, stores it and returns it. """
            price = self.price[0]
            # Draw once so that self.revenue and the returned reward are the
            # same number rather than two independent samples.
            arrivals = np.random.poisson(sigmoid_price_fun(price, 50, 0.5))
            self.revenue = max(arrivals * price, 0)
            return self.revenue
    
    #     def render(self, mode='human'):
    #         super().render(mode)
    
    def testEnv():
        """Smoke-test ``ArrivalSim`` directly (no gym registration) with five random actions.

        The environment is reset whenever an episode ends, so ``step()`` is never
        called on an episode that has already finished.
        """
        env = ArrivalSim(30.0)

        val = env.reset()
        print('val.shape = ', val.shape)

        for _ in range(5):
            print('env.observation_space =', env.observation_space)
            act = env.action_space.sample()
            print('\nact =', act)
            next_state, reward, done, _ = env.step(act)  # take a random action
            print('next_state = ', next_state)
            if done:
                # Start a new episode instead of stepping a finished one.
                env.reset()
        env.close()
    
    
    
    # Run the smoke test only when this file is executed as a script.
    if __name__ =='__main__':
    
        testEnv()
    

    The edited training script:

    import json
    import os
    
    import gym
    import ray
    from ray.tune import run_experiments
    import ray.rllib.agents.a3c as a3c
    import ray.rllib.agents.ppo as ppo
    from ray.tune.registry import register_env
    from mod_op_env import ArrivalSim
    
    from sagemaker_rl.ray_launcher import SageMakerRayLauncher
            
    """
    def create_environment(env_config):
        import gym
    #     from gym.spaces import Space
        from gym.envs.registration import register
    
        # This import must happen inside the method so that worker processes import this code
        register(
            id='ArrivalSim-v0',
            entry_point='env:ArrivalSim',
            kwargs= {'price' : 40}
        )
        return gym.make('ArrivalSim-v0')
    """
    def create_environment(env_config):
        """Build an ``ArrivalSim`` instance for a Ray worker.

        Parameters
        ----------
        env_config : dict or None
            Environment settings passed to the creator. An optional ``"price"``
            entry sets the initial price; it defaults to 30.0.

        Returns
        -------
        ArrivalSim
            A new environment instance.
        """
        # This import must happen inside the method so that worker processes import this code
        from mod_op_env import ArrivalSim

        price = (env_config or {}).get("price", 30.0)
        return ArrivalSim(price)
    
    
    class MyLauncher(SageMakerRayLauncher):
        """SageMaker Ray launcher that trains PPO on the ``ArrivalSim-v0`` environment."""

        def __init__(self):
            """Read the GPU count and host list from the ``SM_*`` environment variables."""
            super(MyLauncher, self).__init__()
            self.num_gpus = int(os.environ.get("SM_NUM_GPUS", 0))
            # SM_RESOURCE_CONFIG may be unset when the script runs outside a
            # SageMaker container; fall back to an empty host list rather than
            # failing inside json.loads(None).
            resource_config = json.loads(os.environ.get("SM_RESOURCE_CONFIG") or "{}")
            self.hosts_info = resource_config.get("hosts", [])
            self.num_total_gpus = self.num_gpus * len(self.hosts_info)

        def register_env_creator(self):
            """Make ``create_environment`` available to Ray under ``ArrivalSim-v0``."""
            register_env("ArrivalSim-v0", create_environment)

        def get_experiment_config(self):
            """Return the experiment spec: PPO for three training iterations.

            Checkpoints are written under ``/opt/ml/model/`` every third iteration.
            """
            # self.num_cpus is not set in this class — presumably provided by
            # SageMakerRayLauncher; confirm. At least one worker is always requested.
            return {
              "training": {
                "env": "ArrivalSim-v0",
                "run": "PPO",
                "stop": {
                  "training_iteration": 3,
                },

                "local_dir": "/opt/ml/model/",
                "checkpoint_freq" : 3,

                "config": {
                  #"num_workers": max(self.num_total_gpus-1, 1),
                  "num_workers": max(self.num_cpus-1, 1),
                  #"use_gpu_for_workers": False,
                  "train_batch_size": 128, #5,
                  "sample_batch_size": 32, #1,
                  "gpu_fraction": 0.3,
                  "optimizer": {
                    "grads_per_step": 10
                  },
                },
                #"trial_resources": {"cpu": 1, "gpu": 0, "extra_gpu": max(self.num_total_gpus-1, 1), "extra_cpu": 0},
                #"trial_resources": {"cpu": 1, "gpu": 0, "extra_gpu": max(self.num_total_gpus-1, 0),
                #                    "extra_cpu": max(self.num_cpus-1, 1)},
                "trial_resources": {"cpu": 1,
                                    "extra_cpu": max(self.num_cpus-1, 1)},
              }
            }
    
    if __name__ == "__main__":
        # Set the locale and Ray environment variables before the launcher starts.
        os.environ["LC_ALL"] = "C.UTF-8"
        os.environ["LANG"] = "C.UTF-8"
        os.environ["RAY_USE_XRAY"] = "1"
        # Print PPO's default configuration for reference.
        print(ppo.DEFAULT_CONFIG)
        MyLauncher().train_main()
    
    

    The notebook code:

    !/bin/bash ./setup.sh
    
    # Notebook cell: look up the execution role and default S3 bucket, then
    # configure and start an RLEstimator training job in local mode.
    from time import gmtime, strftime
    import sagemaker 
    role = sagemaker.get_execution_role()
    
    sage_session = sagemaker.session.Session()
    s3_bucket = sage_session.default_bucket()  
    s3_output_path = 's3://{}/'.format(s3_bucket)
    print("S3 bucket path: {}".format(s3_output_path))
    
    job_name_prefix = 'ArrivalSim'
    
    from sagemaker.rl import RLEstimator, RLToolkit, RLFramework
    
    estimator = RLEstimator(entry_point="mod_op_train.py", # Our launcher code
                            source_dir='src', # Directory where the supporting files are. All of this will be
                                              # copied into the container.
                            dependencies=["common/sagemaker_rl"], # Some other utility files.
                            toolkit=RLToolkit.RAY, # Run with the Ray toolkit against the Ray container image.
                            framework=RLFramework.TENSORFLOW, # The code uses the TensorFlow backend.
                            toolkit_version='0.5.3', # Toolkit version. This will also choose an appropriate tf version.
                            #toolkit_version='0.6.5', # Toolkit version. This will also choose an appropriate tf version.
                            role=role, # The IAM execution role obtained above.
                            #train_instance_type="ml.m4.xlarge", # Alternative: train on a managed ML instance.
                            train_instance_type="local", # Local mode: train in a container on the notebook instance.
                            train_instance_count=1, # A single instance is used here.
                            output_path=s3_output_path, # The path where we can expect our trained model.
                            base_job_name=job_name_prefix, # The prefix set above, used to track our job.
                            hyperparameters = {      # Some hyperparameters for Ray toolkit to operate.
                              "s3_bucket": s3_bucket,
                              "rl.training.stop.training_iteration": 2, # Number of iterations.
                              "rl.training.checkpoint_freq": 2,
                            },
                            #metric_definitions=metric_definitions, # This will bring all the logs out into the notebook.
                        )
    
    estimator.fit()