作者: Jacob Chapman 和 Mathias Lechner
建立日期 2020/05/23
上次修改日期 2024/03/16
說明: 使用深度 Q 網路玩 Atari 打磚塊。
此腳本展示在 BreakoutNoFrameskip-v4
環境中實作深度 Q 學習。
當代理採取行動並在環境中移動時,它會學習將觀察到的環境狀態對應到一個行動。代理會根據「Q 值」選擇給定狀態下的行動,Q 值是根據預期的最高長期回報加權的獎勵。Q 學習代理學習執行其任務,使建議的行動最大化潛在的未來回報。此方法被認為是一種「離策略」方法,表示它的 Q 值更新時假設已選擇了最佳行動,即使實際上並未選擇最佳行動。
在此環境中,一個板子沿著螢幕底部移動,返回一顆球,球會破壞螢幕頂部的磚塊。遊戲的目標是移除所有磚塊並突破關卡。代理必須學習控制板子,向左和向右移動,返回球並移除所有磚塊,而不會讓球通過板子。
Deepmind 的論文訓練了「總共 5000 萬個影格(也就是說,總共約 38 天的遊戲體驗)」。但是,此腳本將在大約 1000 萬個影格中獲得良好的結果,這些影格在現代機器上處理的時間不到 24 小時。
您可以將 max_episodes
變數設定為大於 0 的值,來控制回合數。
import os
os.environ["KERAS_BACKEND"] = "tensorflow"
import keras
from keras import layers
import gymnasium as gym
from gymnasium.wrappers import AtariPreprocessing, FrameStack
import numpy as np
import tensorflow as tf
# Configuration parameters for the whole setup
seed = 42
gamma = 0.99 # Discount factor for past rewards
epsilon = 1.0 # Epsilon greedy parameter
epsilon_min = 0.1 # Minimum epsilon greedy parameter
epsilon_max = 1.0 # Maximum epsilon greedy parameter
epsilon_interval = (
epsilon_max - epsilon_min
) # Rate at which to reduce chance of random action being taken
batch_size = 32 # Size of batch taken from replay buffer
max_steps_per_episode = 10000
max_episodes = 10 # Limit training episodes, will run until solved if smaller than 1
# Use the Atari environment
# Specify the `render_mode` parameter to show the attempts of the agent in a pop up window.
env = gym.make("BreakoutNoFrameskip-v4") # , render_mode="human")
# Environment preprocessing
env = AtariPreprocessing(env)
# Stack four frames
env = FrameStack(env, 4)
env.seed(seed)
A.L.E: Arcade Learning Environment (version 0.8.1+unknown)
[Powered by Stella]
Game console created:
ROM file: /Users/luca/mambaforge/envs/keras-io/lib/python3.9/site-packages/AutoROM/roms/breakout.bin
Cart Name: Breakout - Breakaway IV (1978) (Atari)
Cart MD5: f34f08e5eb96e500e851a80be3277a56
Display Format: AUTO-DETECT ==> NTSC
ROM Size: 2048
Bankswitch Type: AUTO-DETECT ==> 2K
Running ROM file...
Random seed is -975249067
Game console created:
ROM file: /Users/luca/mambaforge/envs/keras-io/lib/python3.9/site-packages/AutoROM/roms/breakout.bin
Cart Name: Breakout - Breakaway IV (1978) (Atari)
Cart MD5: f34f08e5eb96e500e851a80be3277a56
Display Format: AUTO-DETECT ==> NTSC
ROM Size: 2048
Bankswitch Type: AUTO-DETECT ==> 2K
Running ROM file...
Random seed is -1625411987
(3444837047, 2669555309)
此網路學習 Q 表的近似值,這是一種代理將採取的狀態和行動之間的對應。對於每個狀態,我們將有四個可採取的行動。環境提供狀態,而行動是通過選擇輸出層中預測的四個 Q 值中較大的值來選擇的。
num_actions = 4
def create_q_model():
# Network defined by the Deepmind paper
return keras.Sequential(
[
layers.Lambda(
lambda tensor: keras.ops.transpose(tensor, [0, 2, 3, 1]),
output_shape=(84, 84, 4),
input_shape=(4, 84, 84),
),
# Convolutions on the frames on the screen
layers.Conv2D(32, 8, strides=4, activation="relu", input_shape=(4, 84, 84)),
layers.Conv2D(64, 4, strides=2, activation="relu"),
layers.Conv2D(64, 3, strides=1, activation="relu"),
layers.Flatten(),
layers.Dense(512, activation="relu"),
layers.Dense(num_actions, activation="linear"),
]
)
# The first model makes the predictions for Q-values which are used to
# make a action.
model = create_q_model()
# Build a target model for the prediction of future rewards.
# The weights of a target model get updated every 10000 steps thus when the
# loss between the Q-values is calculated the target Q-value is stable.
model_target = create_q_model()
# In the Deepmind paper they use RMSProp however then Adam optimizer
# improves training time
optimizer = keras.optimizers.Adam(learning_rate=0.00025, clipnorm=1.0)
# Experience replay buffers
action_history = []
state_history = []
state_next_history = []
rewards_history = []
done_history = []
episode_reward_history = []
running_reward = 0
episode_count = 0
frame_count = 0
# Number of frames to take random action and observe output
epsilon_random_frames = 50000
# Number of frames for exploration
epsilon_greedy_frames = 1000000.0
# Maximum replay length
# Note: The Deepmind paper suggests 1000000 however this causes memory issues
max_memory_length = 100000
# Train the model after 4 actions
update_after_actions = 4
# How often to update the target network
update_target_network = 10000
# Using huber loss for stability
loss_function = keras.losses.Huber()
while True:
observation, _ = env.reset()
state = np.array(observation)
episode_reward = 0
for timestep in range(1, max_steps_per_episode):
frame_count += 1
# Use epsilon-greedy for exploration
if frame_count < epsilon_random_frames or epsilon > np.random.rand(1)[0]:
# Take random action
action = np.random.choice(num_actions)
else:
# Predict action Q-values
# From environment state
state_tensor = keras.ops.convert_to_tensor(state)
state_tensor = keras.ops.expand_dims(state_tensor, 0)
action_probs = model(state_tensor, training=False)
# Take best action
action = keras.ops.argmax(action_probs[0]).numpy()
# Decay probability of taking random action
epsilon -= epsilon_interval / epsilon_greedy_frames
epsilon = max(epsilon, epsilon_min)
# Apply the sampled action in our environment
state_next, reward, done, _, _ = env.step(action)
state_next = np.array(state_next)
episode_reward += reward
# Save actions and states in replay buffer
action_history.append(action)
state_history.append(state)
state_next_history.append(state_next)
done_history.append(done)
rewards_history.append(reward)
state = state_next
# Update every fourth frame and once batch size is over 32
if frame_count % update_after_actions == 0 and len(done_history) > batch_size:
# Get indices of samples for replay buffers
indices = np.random.choice(range(len(done_history)), size=batch_size)
# Using list comprehension to sample from replay buffer
state_sample = np.array([state_history[i] for i in indices])
state_next_sample = np.array([state_next_history[i] for i in indices])
rewards_sample = [rewards_history[i] for i in indices]
action_sample = [action_history[i] for i in indices]
done_sample = keras.ops.convert_to_tensor(
[float(done_history[i]) for i in indices]
)
# Build the updated Q-values for the sampled future states
# Use the target model for stability
future_rewards = model_target.predict(state_next_sample)
# Q value = reward + discount factor * expected future reward
updated_q_values = rewards_sample + gamma * keras.ops.amax(
future_rewards, axis=1
)
# If final frame set the last value to -1
updated_q_values = updated_q_values * (1 - done_sample) - done_sample
# Create a mask so we only calculate loss on the updated Q-values
masks = keras.ops.one_hot(action_sample, num_actions)
with tf.GradientTape() as tape:
# Train the model on the states and updated Q-values
q_values = model(state_sample)
# Apply the masks to the Q-values to get the Q-value for action taken
q_action = keras.ops.sum(keras.ops.multiply(q_values, masks), axis=1)
# Calculate loss between new Q-value and old Q-value
loss = loss_function(updated_q_values, q_action)
# Backpropagation
grads = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(grads, model.trainable_variables))
if frame_count % update_target_network == 0:
# update the the target network with new weights
model_target.set_weights(model.get_weights())
# Log details
template = "running reward: {:.2f} at episode {}, frame count {}"
print(template.format(running_reward, episode_count, frame_count))
# Limit the state and reward history
if len(rewards_history) > max_memory_length:
del rewards_history[:1]
del state_history[:1]
del state_next_history[:1]
del action_history[:1]
del done_history[:1]
if done:
break
# Update running reward to check condition for solving
episode_reward_history.append(episode_reward)
if len(episode_reward_history) > 100:
del episode_reward_history[:1]
running_reward = np.mean(episode_reward_history)
episode_count += 1
if running_reward > 40: # Condition to consider the task solved
print("Solved at episode {}!".format(episode_count))
break
if (
max_episodes > 0 and episode_count >= max_episodes
): # Maximum number of episodes reached
print("Stopped at episode {}!".format(episode_count))
break
訓練前:
訓練初期:
訓練後期: