DQN算法实战-小车上山
- 案例分析
-
- 实验环境
- 用线性近似求解最优策略
- 用深度Q学习求解最优策略
- 参考
案例分析
如图1所示,一个小车在一段范围内行驶。在任一时刻,在水平方向看,小车位置的范围是[-1.2,0.6],速度的范围是[-0.07,0.07]。在每个时刻,智能体可以对小车施加3种动作中的一种:向左施力、不施力、向右施力。智能体施力和小车的水平位置会共同决定小车下一时刻的速度。当某时刻小车的水平位置大于0.5时,控制目标成功达成,回合结束。控制的目标是让小车以尽可能少的步骤达到目标。一般认为,如果智能体在连续100个回合中的平均步数≤110,就认为问题解决了。
在绝大多数情况下,智能体简单向右施力并不足以让小车成功越过目标。假设智能体并不知道环境确定小车位置和速度的数学表达式。事实上,小车的位置和速度是有数学表达式的。记第t时刻(t=0,1,2,…)小车的位置为X t ( X t ∈ [ − 1.2 , 0.6 ] ),速度为V t ( V t ∈ [ − 0.07 , 0.07 ] ) ,智能体施力为A t ∈ 0 , 1 , 2 ,初始状态X 0 ∈ [ − 0.6 , − 0.4 ) , V 0 = 0 。从t时刻到t + 1时刻的更新式为
其中限制函数clip()限制了位置和速度的范围:
实验环境
Gym库内置的环境’MountainCar-v0’已经实现了小车上山环境。在这个环境中,每一步的奖励都是-1,回合的回报的值就是总步数的负数。导入这个环境,并查看其状态空间和动作空间,以及位置和速度的参数。
import numpy as np
np.random.seed(0)
import pandas as pd
import matplotlib.pyplot as plt
import gym
import tensorflow.compat.v2 as tf
tf.random.set_seed(0)
from tensorflow import keras
env = gym.make('MountainCar-v0')
env.seed(0)
print('观测空间 = {}'.format(env.observation_space))
print('动作空间 = {}'.format(env.action_space))
print('位置范围 = {}'.format((env.unwrapped.min_position,
env.unwrapped.max_position)))
print('速度范围 = {}'.format((-env.unwrapped.max_speed,
env.unwrapped.max_speed)))
print('目标位置 = {}'.format(env.unwrapped.goal_position))
使用这个环境。在代码清单2中的策略总是试图向右对小车施力。程序运行结果表明,仅仅简单地向右施力,是不可能让小车达到目标的。为了避免程序无穷尽地运行下去,这里限制了回合最大的步数为200。
positions, velocities = [], []
observation = env.reset()
while True:
positions.append(observation[0])
velocities.append(observation[1])
next_observation, reward, done, _ = env.step(2)
if done:
break
observation = next_observation
if next_observation[0] > 0.5:
print('成功到达')
else:
print('失败退出')
# 绘制位置和速度图像
fig, ax = plt.subplots()
ax.plot(positions, label='position')
ax.plot(velocities, label='velocity')
ax.legend()
plt.show()
用线性近似求解最优策略
本节我们将用形如q ( s , a ) = [ x ( s , a ) ] T w 的线性组合来近似动作价值函数,求解最优策略。
在这个问题中,位置和速度都是连续的变量。要从连续空间中导出数目有限的特征,最简单的方法是采用独热编码(one-hot coding)。如图a所示:在二维的“位置–速度”空间中,我们可将其划分为许多小格。位置轴范围总长是l 位 置 ,每个小格的宽度是δ 位 置 ,那么位置轴有b 位 置 = [ l 位 置 ÷ δ 速 度 ] 个小格;同理,速度范围总长l速度,每个小格长度δ 速度,b 速 度 = [ l 速 度 ÷ δ 速 度 ] 个小格。这样,整个空间有b 位 置 b 速 度 个小格。每个小格对应一个特征:当位置速度对位于某个小格时,那个小格对应的特征为1,其他小格对应的特征均为0。这样,独热编码就从连续的空间中提取出了b 位 置 b 速 度 个特征。采用独热编码后得到的价值函数,对于同一网格内的所有位置速度对,其价值函数的估计都是相同的。所以这只是一种近似。如果要让近似更准确,就要让每个小格的长度δ 位 置 和 δ 速 度更小。但是,这样会增大特征的数目b 位 置 b 速 度 。
砖瓦编码(tile coding)可以在精度相同的情况下减少特征数目。如图b所示,砖瓦编码引入了多层大网格。本节用的m层砖瓦编码,每层的大网格都是原来独热编码小格的m位宽、m位长。在相邻两层之间,在两个维度上都偏移一个独热编码的小格。对于任意的位置速度对,它在每一层都会落在某个大网格里。这样,我们可以让每层中大网格对应的特征为1,其他特征为0。综合考虑所有层,总共大致有b 位 置 b 速 度 / m 个特征,特征数大大减小。
TileCoder类实现了砖瓦编码。构造TileCoder类需要两个参数:参数layers表示要用几层砖瓦编码;参数features表示砖瓦编码应该得到多少特征,即x(s,a)的维度,它也是w的维度。构造TileCoder类对象后,就可以调用这个对象找到每个数据激活了哪些特征。调用的参数floats输入[0,1]间的浮点数的tuple,参数ints输入int元素的tuple(不参与砖瓦编码);返回int型列表,表示激活的参数指标。
class TileCoder:
def __init__(self, layers, features):
self.layers = layers
self.features = features
self.codebook = {}
def get_feature(self, codeword):
if codeword in self.codebook:
return self.codebook[codeword]
count = len(self.codebook)
if count >= self.features: # 冲突处理
return hash(codeword) % self.features
self.codebook[codeword] = count
return count
def __call__(self, floats=(), ints=()):
dim = len(floats)
scaled_floats = tuple(f * self.layers * self.layers for f in floats)
features = []
for layer in range(self.layers):
codeword = (layer,) + tuple(int((f + (1 + dim * i) * layer) /
self.layers) for i, f in enumerate(scaled_floats)) + ints
feature = self.get_feature(codeword)
features.append(feature)
return features
在小车上山任务中,如果我们对观测空间选取8层的砖瓦编码,那么观测空间第0层有8×8=64个砖瓦,剩下8-1=7层有(8+1)×(8+1)=81个砖瓦,一共有64+7×81=631个砖瓦。再考虑到动作有3种可能的取值,那么总共有631×3=1893个特征。接下来,我们运用砖瓦编码来实现函数近似的智能体。以下是函数近似SARSA算法的智能体类SARSAAgent和函数近似SARSA(λ)的智能体类SARSALambdaAgent。
class SARSAAgent:
def __init__(self, env, layers=8, features=1893, gamma=1.,
learning_rate=0.03, epsilon=0.001):
self.action_n = env.action_space.n # 动作数
self.obs_low = env.observation_space.low
self.obs_scale = env.observation_space.high - \
env.observation_space.low # 观测空间范围
self.encoder = TileCoder(layers, features) # 砖瓦编码器
self.w = np.zeros(features) # 权重
self.gamma = gamma # 折扣
self.learning_rate = learning_rate # 学习率
self.epsilon = epsilon # 探索
def encode(self, observation, action): # 编码
states = tuple((observation - self.obs_low) / self.obs_scale)
actions = (action,)
return self.encoder(states, actions)
def get_q(self, observation, action): # 动作价值
features = self.encode(observation, action)
return self.w[features].sum()
def decide(self, observation): # 判决
if np.random.rand() < self.epsilon:
return np.random.randint(self.action_n)
else:
qs = [self.get_q(observation, action) for action in
range(self.action_n)]
return np.argmax(qs)
def learn(self, observation, action, reward,
next_observation, done, next_action): # 学习
u = reward + (1. - done) * self.gamma * \
self.get_q(next_observation, next_action)
td_error = u - self.get_q(observation, action)
features = self.encode(observation, action)
self.w[features] += (self.learning_rate * td_error)
class SARSALambdaAgent(SARSAAgent):
def __init__(self, env, layers=8, features=1893, gamma=1.,
learning_rate=0.03, epsilon=0.001, lambd=0.9):
super().__init__(env=env, layers=layers, features=features,
gamma=gamma, learning_rate=learning_rate, epsilon=epsilon)
self.lambd = lambd
self.z = np.zeros(features) # 初始化资格迹
def learn(self, observation, action, reward, next_observation, done,
next_action):
u = reward
if not done:
u += (self.gamma * self.get_q(next_observation, next_action))
self.z *= (self.gamma * self.lambd)
features = self.encode(observation, action)
self.z[features] = 1. # 替换迹
td_error = u - self.get_q(observation, action)
self.w += (self.learning_rate * td_error * self.z)
if done:
self.z = np.zeros_like(self.z) # 为下一回合初始化资格迹
运用环境对象env和构造好的智能体对象agent,我们就可以用函数play_sarsa()训练智能体。对于训练了300个回合的SARSAAgent,平均回合奖励可以达到-121左右;对于训练了150个回合的SARSALambdaAgent,平均回合奖励可以达到-107左右。在这个实现中,SARSA(λ)算法比SARSA算法更为高效。事实上,SARSA(λ)算法是针对小车上山这个任务最有效的方法之一。
用深度Q学习求解最优策略
首先我们来看经验回放。代码清单中的类DQNReplayer实现了经验回放。构造这个类的参数中有个int型的参数capacity,表示存储空间最多可以存储几条经验。当要存储的经验数超过capacity时,会用最新的经验覆盖最早存入的经验。
class DQNReplayer:
def __init__(self, capacity):
self.memory = pd.DataFrame(index=range(capacity),
columns=['observation', 'action', 'reward',
'next_observation', 'done'])
self.i = 0
self.count = 0
self.capacity = capacity
def store(self, *args):
self.memory.loc[self.i] = args
self.i = (self.i + 1) % self.capacity
self.count = min(self.count + 1, self.capacity)
def sample(self, size):
indices = np.random.choice(self.count, size=size)
return (np.stack(self.memory.loc[indices, field]) for field in
self.memory.columns)
接下来我们来看函数近似部分。函数近似采用了矢量形式的近似函数q ( s ; w ) , s ∈ ( S ) q(s;w),s∈(\mathcal{S})q(s;w),s∈(S),近似函数的形式为全连接神经网络。以下分别实现了带目标网络的深度Q学习智能体和双重Q学习智能体。它们和play_qlearning()函数结合,就实现了带目标网络的深度Q学习算法和双重Q学习算法。
class DQNAgent:
def __init__(self, env, net_kwargs={}, gamma=0.99, epsilon=0.001,
replayer_capacity=10000, batch_size=64):
observation_dim = env.observation_space.shape[0]
self.action_n = env.action_space.n
self.gamma = gamma
self.epsilon = epsilon
self.batch_size = batch_size
self.replayer = DQNReplayer(replayer_capacity) # 经验回放
self.evaluate_net = self.build_network(input_size=observation_dim,
output_size=self.action_n, **net_kwargs) # 评估网络
self.target_net = self.build_network(input_size=observation_dim,
output_size=self.action_n, **net_kwargs) # 目标网络
self.target_net.set_weights(self.evaluate_net.get_weights())
def build_network(self, input_size, hidden_sizes, output_size,
activation=tf.nn.relu, output_activation=None,
learning_rate=0.01): # 构建网络
model = keras.Sequential()
for layer, hidden_size in enumerate(hidden_sizes):
kwargs = dict(input_shape=(input_size,)) if not layer else {}
model.add(keras.layers.Dense(units=hidden_size,
activation=activation, **kwargs))
model.add(keras.layers.Dense(units=output_size,
activation=output_activation)) # 输出层
optimizer = tf.optimizers.Adam(lr=learning_rate)
model.compile(loss='mse', optimizer=optimizer)
return model
def learn(self, observation, action, reward, next_observation, done):
self.replayer.store(observation, action, reward, next_observation,
done) # 存储经验
observations, actions, rewards, next_observations, dones = \
self.replayer.sample(self.batch_size) # 经验回放
next_qs = self.target_net.predict(next_observations)
next_max_qs = next_qs.max(axis=-1)
us = rewards + self.gamma * (1. - dones) * next_max_qs
targets = self.evaluate_net.predict(observations)
targets[np.arange(us.shape[0]), actions] = us
self.evaluate_net.fit(observations, targets, verbose=0)
if done: # 更新目标网络
self.target_net.set_weights(self.evaluate_net.get_weights())
def decide(self, observation): # epsilon贪心策略
if np.random.rand() < self.epsilon:
return np.random.randint(self.action_n)
qs = self.evaluate_net.predict(observation[np.newaxis])
return np.argmax(qs)
def play_qlearning(env, agent, train=False, render=False):
episode_reward = 0
observation = env.reset()
while True:
if render:
env.render()
action = agent.decide(observation)
next_observation, reward, done, _ = env.step(action)
episode_reward += reward
if train:
agent.learn(observation, action, reward, next_observation,
done)
if done:
break
observation = next_observation
return episode_reward
class DoubleDQNAgent(DQNAgent):
def learn(self, observation, action, reward, next_observation, done):
self.replayer.store(observation, action, reward, next_observation,
done) # 存储经验
observations, actions, rewards, next_observations, dones = \
self.replayer.sample(self.batch_size) # 经验回放
next_eval_qs = self.evaluate_net.predict(next_observations)
next_actions = next_eval_qs.argmax(axis=-1)
next_qs = self.target_net.predict(next_observations)
next_max_qs = next_qs[np.arange(next_qs.shape[0]), next_actions]
us = rewards + self.gamma * next_max_qs * (1. - dones)
targets = self.evaluate_net.predict(observations)
targets[np.arange(us.shape[0]), actions] = us
self.evaluate_net.fit(observations, targets, verbose=0)
if done:
self.target_net.set_weights(self.evaluate_net.get_weights())
代码使用TensorFlow来实现,并同时兼容TensorFlow 1.X的最新稳定版本和TensorFlow 2.X的最新稳定版本。对于基于TensorFlow的程序,即使已经设置了随机数的种子,也不能保证完全复现。所以,运行结果有差异是正常现象。
参考
原理的介绍可以参考我之前的文章
函数近似方法与原理
线性近似与函数近似的收敛性
DQN算法原理