Creating a Reinforcement Learning Model with TensorFlow

Aryan Jha
6 min read · Dec 10, 2021

So I was on YouTube, and I saw this amazing video about AI playing hide and seek! (It’s only a few minutes long, and it’s really interesting.) I was curious about how it worked, and found out that it was built with reinforcement learning.

Reinforcement learning is an interesting part of machine learning. Instead of just predicting numbers or recognizing images, it actually interacts with the environment. A reinforcement learning model can do tasks around your house or even play games!

I decided to look more into reinforcement learning, and came across this tutorial that builds a reinforcement learning model using TensorFlow. The tutorial wasn’t really that clear, though, and had a lot of unnecessary parts that could be confusing. So, I rewrote the tutorial with only the necessary parts and the stuff I thought was cool.

I made a quick video about reinforcement learning; check it out here!

I ran everything in Google Colab, because I found some issues while running it locally. You can try running it in a Jupyter Notebook, but it might not work as some of the commands only work with Linux distributions that have the apt package manager (like ones based on Ubuntu). If you want the exact notebook I used, it’s here.

Enough talking though, let’s get into the coding!

First, let’s install everything we need.

!sudo apt-get update
!sudo apt-get install -y xvfb ffmpeg freeglut3-dev
!pip install 'imageio==2.4.0'
!pip install pyvirtualdisplay
!pip install tf-agents[reverb]
!pip install pyglet

Now, let’s import everything we need.

from __future__ import absolute_import, division, print_function

import base64
import imageio
import IPython
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import PIL.Image
import pyvirtualdisplay
import reverb

import tensorflow as tf

from tf_agents.agents.dqn import dqn_agent
from tf_agents.drivers import py_driver
from tf_agents.environments import suite_gym
from tf_agents.environments import tf_py_environment
from tf_agents.eval import metric_utils
from tf_agents.metrics import tf_metrics
from tf_agents.networks import sequential
from tf_agents.policies import py_tf_eager_policy
from tf_agents.policies import random_tf_policy
from tf_agents.replay_buffers import reverb_replay_buffer
from tf_agents.replay_buffers import reverb_utils
from tf_agents.trajectories import trajectory
from tf_agents.specs import tensor_spec
from tf_agents.utils import common

Let’s set up a virtual display so we can see the environment.

display = pyvirtualdisplay.Display(visible=0, size=(1400, 900)).start()

Here, we set up the parameters.

# basically the amount of epochs
num_iterations = 2000 # @param {type:"integer"}

initial_collect_steps = 100  # @param {type:"integer"}
collect_steps_per_iteration = 1  # @param {type:"integer"}
replay_buffer_max_length = 100000  # @param {type:"integer"}

batch_size = 64  # @param {type:"integer"}
learning_rate = 1e-3  # @param {type:"number"}
log_interval = 200  # @param {type:"integer"}

num_eval_episodes = 10  # @param {type:"integer"}
eval_interval = 1000  # @param {type:"integer"}

The “@param” comments are Colab form annotations: they turn each parameter into an editable form field, which makes tweaking the values easier.

Now, let’s load the environment. The environment has a cart, which will be controlled by the AI, and a pole, which will lean to a side as the cart moves. The AI’s job is to balance the pole by moving the cart.

env_name = 'CartPole-v0'
env = suite_gym.load(env_name)

This just renders the environment so you can see it.

env.reset()
PIL.Image.fromarray(env.render())

The observation spec is an array of four values: cart position, cart velocity, pole angle, and pole angular velocity.

print('Observation Spec:')
print(env.time_step_spec().observation)
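Since the observation is just an array of four numbers, you can also unpack it to look at the values individually. This is just an illustration (the variable names are mine, not from the tutorial):

cart_pos, cart_vel, pole_angle, pole_ang_vel = env.reset().observation
print(cart_pos, cart_vel, pole_angle, pole_ang_vel)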

The reward spec is a float containing the reward the AI receives at each step.

print('Reward Spec:')
print(env.time_step_spec().reward)

The action spec has two possible values: move left (0) or move right (1).

print('Action Spec:')
print(env.action_spec())

A time_step bundles all of these values for a single frame: the step type, the reward, the discount, and the observation.

time_step = env.reset()
print('Time step:')
print(time_step)
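A time step is a named tuple, so you can also print its fields one at a time if you want to see what’s inside (just for illustration):

print(time_step.step_type)   # 0 = FIRST, 1 = MID, 2 = LAST
print(time_step.reward)
print(time_step.discount)
print(time_step.observation) # the four observation values described above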

This just sets an action (1, which means move right) to use in the next line.

action = np.array(1, dtype=np.int32)

And this applies that action and prints the resulting time step (the next frame).

next_time_step = env.step(action)
print('Next time step:')
print(next_time_step)

Here, we set up the training environment and the testing environment.

train_py_env = suite_gym.load(env_name)
eval_py_env = suite_gym.load(env_name)

This wraps the environments, which are written in plain Python and store their values in NumPy arrays, so they become compatible with TensorFlow and our agent.

train_env = tf_py_environment.TFPyEnvironment(train_py_env)
eval_env = tf_py_environment.TFPyEnvironment(eval_py_env)
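If you’re curious what actually changed, the wrapped environments now return TensorFlow tensors with a batch dimension instead of plain NumPy arrays. A quick check (just for illustration):

# The observation is now a tf.Tensor of shape (1, 4) rather than a plain array.
print(train_env.reset().observation)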

The agent is the algorithm that actually does the learning. Here, we create a DQN agent, which uses a neural network (the Q-network) to estimate how good each action (move left or right) is for the current observation, and then picks an action based on those estimates.
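To give a rough idea of what that means, here’s a tiny illustration of the Q-value idea (not part of the actual code): the Q-network outputs one estimated return per action, and the greedy policy just picks the biggest one.

import numpy as np

# Hypothetical Q-values for the two CartPole actions: [move left, move right].
q_values = np.array([0.7, 1.2])
action = int(np.argmax(q_values))  # 1, so the agent would move right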

Here, we convert the action spec into a tensor spec and set a variable for the number of actions.

fc_layer_params = (100, 50)

action_tensor_spec = tensor_spec.from_spec(env.action_spec())
num_actions = action_tensor_spec.maximum - action_tensor_spec.minimum + 1

Now, we create a function that creates Keras dense layers with the correct configuration when called.

def dense_layer(num_units):
   return tf.keras.layers.Dense(
       num_units,
       activation=tf.keras.activations.relu,
       kernel_initializer=tf.keras.initializers.VarianceScaling(
           scale=2.0, mode='fan_in', distribution='truncated_normal'))

Here, we create the neural network. It’s made of the dense layers from above plus an output layer that generates one value per available action (move left or right) for each frame.

dense_layers = [dense_layer(num_units) for num_units in fc_layer_params]

q_values_layer = tf.keras.layers.Dense(
    num_actions,
    activation=None,
    kernel_initializer=tf.keras.initializers.RandomUniform(
        minval=-0.03, maxval=0.03),
    bias_initializer=tf.keras.initializers.Constant(-0.2))

q_net = sequential.Sequential(dense_layers + [q_values_layer])

We create a variable called optimizer to make the code look cleaner later.

optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

We actually create and initialize the agent here.

train_step_counter = tf.Variable(0)

agent = dqn_agent.DqnAgent(
    train_env.time_step_spec(),
    train_env.action_spec(),
    q_network=q_net,
    optimizer=optimizer,
    td_errors_loss_fn=common.element_wise_squared_loss,
    train_step_counter=train_step_counter)

agent.initialize()
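The agent comes with two policies that we’ll use later on: agent.policy is the greedy policy we’ll evaluate and record videos with, and agent.collect_policy is an exploring (epsilon-greedy) policy that the driver uses to gather training data. Just to name them:

eval_policy = agent.policy            # greedy policy, used for evaluation
collect_policy = agent.collect_policy # exploring policy, used to collect experience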

The return is the sum of all rewards earned while running a policy. We now create a function to calculate the average return over a number of episodes (it defaults to 20 here, but we’ll call it with num_eval_episodes, which is 10). We start by setting total_return to 0 and looping over the episodes.

def compute_avg_return(environment, policy, num_episodes=20):
   total_return = 0.0
   for _ in range(num_episodes):

For each episode, we reset the environment so previous runs of the code don’t mess up this one.

      time_step = environment.reset()

We reset episode_return for the same reason.

      episode_return = 0.0

Then we keep looping until we reach the last frame of the episode.

      while not time_step.is_last():

We set action_step, which holds the action the policy chose for that time_step.

         action_step = policy.action(time_step)

time_step now holds the state of the environment after taking that action.

         time_step = environment.step(action_step.action)

We set episode_return to episode_return + reward.

         episode_return += time_step.reward

Once the episode ends, we add episode_return to total_return.

      total_return += episode_return

Finally, after all the episodes, we compute the average return and return it as a plain number.

   avg_return = total_return / num_episodes
   return avg_return.numpy()[0]
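The original tutorial also uses this function to get a baseline from a completely random policy before any training happens (that’s what the random_tf_policy import is for). Something like this should print a pretty low average return:

# A policy that picks left or right at random, used as a pre-training baseline.
random_policy = random_tf_policy.RandomTFPolicy(train_env.time_step_spec(),
                                                train_env.action_spec())

print(compute_avg_return(eval_env, random_policy, num_eval_episodes))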

Here, we create a Reverb table, which acts as the replay buffer that stores all the experience the agent collects.

table_name = 'uniform_table'

replay_buffer_signature = tensor_spec.from_spec(agent.collect_data_spec)
replay_buffer_signature = tensor_spec.add_outer_dim(replay_buffer_signature)

table = reverb.Table(
    table_name,
    max_size=replay_buffer_max_length,
    sampler=reverb.selectors.Uniform(),
    remover=reverb.selectors.Fifo(),
    rate_limiter=reverb.rate_limiters.MinSize(1),
    signature=replay_buffer_signature)

reverb_server = reverb.Server([table])

replay_buffer = reverb_replay_buffer.ReverbReplayBuffer(
    agent.collect_data_spec,
    table_name=table_name,
    sequence_length=2,
    local_server=reverb_server)

rb_observer = reverb_utils.ReverbAddTrajectoryObserver(
    replay_buffer.py_client,
    table_name,
    sequence_length=2)
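One step from the original tutorial that’s easy to miss: before training, it seeds the replay buffer with initial_collect_steps of experience from the random policy (that’s what that parameter is for). Reusing the random_policy from the baseline sketch above, it looks roughly like this:

# Fill the replay buffer with some random experience before training starts.
py_driver.PyDriver(
    env,
    py_tf_eager_policy.PyTFEagerPolicy(random_policy, use_tf_function=True),
    [rb_observer],
    max_steps=initial_collect_steps).run(train_py_env.reset())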

Here, we create a dataset from the replay buffer, so the agent can sample batches of past experience.

dataset = replay_buffer.as_dataset(
    num_parallel_calls=3,
    sample_batch_size=batch_size,
    num_steps=2).prefetch(3)

Now, we create an iterator over the dataset, which is what we’ll use to feed batches of experience to the agent.

iterator = iter(dataset)

This is where we start the training of the agent.

The %%time magic just measures how long the cell takes to run, so it shouldn’t affect the model itself. That said, when I removed it the training cell stopped working for me, so I suggest keeping it in.

try:
  %%time
except:
  pass

First, we have to reset train_step. We also evaluate the policy once before any training happens and start a returns list, which the training loop appends to later.

agent.train_step_counter.assign(0)

# Evaluate the policy once before training so we have a starting point.
avg_return = compute_avg_return(eval_env, agent.policy, num_eval_episodes)
returns = [avg_return]

We have to reset the environment before training so our previous actions don’t affect anything.

time_step = train_py_env.reset()

Here, we create a driver, which runs the collect policy in the environment and passes the experience it gathers to the replay buffer.

collect_driver = py_driver.PyDriver(
    env,
    py_tf_eager_policy.PyTFEagerPolicy(
        agent.collect_policy, use_tf_function=True),
    [rb_observer],
    max_steps=collect_steps_per_iteration)

Now, we run a loop for the amount of iterations that we specified earlier.

for _ in range(num_iterations):

Here, we collect a step (collect_steps_per_iteration of them, to be exact) and save it to the replay buffer.

     time_step, _ = collect_driver.run(time_step)

Now, we update the network with the new data.

     experience, unused_info = next(iterator)
     train_loss = agent.train(experience).loss

     step = agent.train_step_counter.numpy()

We display metrics as we go: the loss every log_interval steps (200 here) and the average return every eval_interval steps (1000 here), saving each average return to the returns list.

     if step % log_interval == 0:
          print('step = {0}: loss = {1}'.format(step, train_loss))

     if step % eval_interval == 0:
          avg_return = compute_avg_return(eval_env, agent.policy, num_eval_episodes)
          print('step = {0}: Average Return = {1}'.format(step, avg_return))
          returns.append(avg_return)

And if you run it, the agent will train, printing the loss and average return as it goes!
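If you want to see the progress visually, the original tutorial also plots the average returns at the end (that’s what matplotlib was imported for). Something like this works, assuming the returns list from the training code above:

# Plot the average return from each evaluation against the training iteration.
iterations = range(0, num_iterations + 1, eval_interval)
plt.plot(iterations, returns)
plt.ylabel('Average Return')
plt.xlabel('Iterations')
plt.show()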

Another interesting thing in the tutorial was creating a video to visualise what the model is actually doing.

This helper just embeds an mp4 file in the notebook so you can watch it inline.

def embed_mp4(filename):
  video = open(filename, 'rb').read()
  b64 = base64.b64encode(video)
  tag = '''
  <video width="640" height="480" controls>
    <source src="data:video/mp4;base64,{0}" type="video/mp4">
    Your browser does not support the video tag.
  </video>'''.format(b64.decode())

  return IPython.display.HTML(tag)

Now, all we need to do is render each frame and add it to the mp4 file.

def create_policy_eval_video(policy, filename, num_episodes=5, fps=30):
  filename = filename + ".mp4"
  with imageio.get_writer(filename, fps=fps) as video:
    for _ in range(num_episodes):
      time_step = eval_env.reset()
      video.append_data(eval_py_env.render())
      while not time_step.is_last():
        action_step = policy.action(time_step)
        time_step = eval_env.step(action_step.action)
        video.append_data(eval_py_env.render())
  return embed_mp4(filename)

create_policy_eval_video(agent.policy, "trained-agent")
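For comparison, the tutorial also records the random policy doing the same task, which makes it really obvious how much the agent has learned:

create_policy_eval_video(random_policy, "random-agent")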

Congrats! You did it! You now have made a reinforcement learning model that can play this cart pole game. The applications of this kind of machine learning are endless, and that’s why I am so interested in it. I can’t wait to move on to some other more advanced projects.

Stay tuned for some more articles about reinforcement learning and just general AI stuff too.
