• 欢迎访问开心洋葱网站,在线教程,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站,欢迎加入开心洋葱 QQ群
  • 为方便开心洋葱网用户,开心洋葱官网已经开启复制功能!
  • 欢迎访问开心洋葱网站,手机也能访问哦~欢迎加入开心洋葱多维思维学习平台 QQ群
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏开心洋葱吧~~~~~~~~~~~~~!
  • 由于近期流量激增,小站的ECS没能经的起亲们的访问,本站依然没有盈利,如果各位看如果觉着文字不错,还请看官给小站打个赏~~~~~~~~~~~~~!

[强化学习实战]DQN算法实战-小车上山(MountainCar-v0)

人工智能 柯南404 1429次浏览 0个评论

DQN算法实战-小车上山

  • 案例分析
    • 实验环境
    • 用线性近似求解最优策略
    • 用深度Q学习求解最优策略
  • 参考

 

案例分析

如图1所示,一个小车在一段范围内行驶。在任一时刻,在水平方向看,小车位置的范围是[-1.2,0.6],速度的范围是[-0.07,0.07]。在每个时刻,智能体可以对小车施加3种动作中的一种:向左施力、不施力、向右施力。智能体施力和小车的水平位置会共同决定小车下一时刻的速度。当某时刻小车的水平位置大于0.5时,控制目标成功达成,回合结束。控制的目标是让小车以尽可能少的步骤达到目标。一般认为,如果智能体在连续100个回合中的平均步数≤110,就认为问题解决了。

小车上山

在绝大多数情况下,智能体简单向右施力并不足以让小车成功越过目标。假设智能体并不知道环境确定小车位置和速度的数学表达式。事实上,小车的位置和速度是有数学表达式的。记第t时刻(t=0,1,2,…)小车的位置为X t ( X t ∈ [ − 1.2 , 0.6 ] ),速度为V t ( V t ∈ [ − 0.07 , 0.07 ] ) ,智能体施力为A t ∈ 0 , 1 , 2 ,初始状态X 0 ∈ [ − 0.6 , − 0.4 ) , V 0 = 0 。从t时刻到t + 1时刻的更新式为

在这里插入图片描述

其中限制函数clip()限制了位置和速度的范围:

在这里插入图片描述

实验环境

Gym库内置的环境’MountainCar-v0’已经实现了小车上山环境。在这个环境中,每一步的奖励都是-1,回合的回报的值就是总步数的负数。导入这个环境,并查看其状态空间和动作空间,以及位置和速度的参数。

import numpy as np
np.random.seed(0)
import pandas as pd
import matplotlib.pyplot as plt
import gym
import tensorflow.compat.v2 as tf
tf.random.set_seed(0)
from tensorflow import keras

env = gym.make('MountainCar-v0')
env.seed(0)
print('观测空间 = {}'.format(env.observation_space))
print('动作空间 = {}'.format(env.action_space))
print('位置范围 = {}'.format((env.unwrapped.min_position,
        env.unwrapped.max_position)))
print('速度范围 = {}'.format((-env.unwrapped.max_speed,
        env.unwrapped.max_speed)))
print('目标位置 = {}'.format(env.unwrapped.goal_position))

使用这个环境。在代码清单2中的策略总是试图向右对小车施力。程序运行结果表明,仅仅简单地向右施力,是不可能让小车达到目标的。为了避免程序无穷尽地运行下去,这里限制了回合最大的步数为200。 

positions, velocities = [], []
observation = env.reset()
while True:
    positions.append(observation[0])
    velocities.append(observation[1])
    next_observation, reward, done, _ = env.step(2)
    if done:
        break
    observation = next_observation

if next_observation[0] > 0.5:
    print('成功到达')
else:
    print('失败退出')

# 绘制位置和速度图像
fig, ax = plt.subplots()
ax.plot(positions, label='position')
ax.plot(velocities, label='velocity')
ax.legend()
plt.show()

用线性近似求解最优策略

本节我们将用形如q ( s , a ) = [ x ( s , a ) ] T w 的线性组合来近似动作价值函数,求解最优策略。
在这个问题中,位置和速度都是连续的变量。要从连续空间中导出数目有限的特征,最简单的方法是采用独热编码(one-hot coding)。如图a所示:在二维的“位置–速度”空间中,我们可将其划分为许多小格。位置轴范围总长是l 位 置 ,每个小格的宽度是δ 位 置 ,那么位置轴有b 位 置 = [ l 位 置 ÷ δ 速 度 ] 个小格;同理,速度范围总长l速度,每个小格长度δ 速度,b 速 度 = [ l 速 度 ÷ δ 速 度 ] 个小格。这样,整个空间有b 位 置 b 速 度 个小格。每个小格对应一个特征:当位置速度对位于某个小格时,那个小格对应的特征为1,其他小格对应的特征均为0。这样,独热编码就从连续的空间中提取出了b 位 置 b 速 度 个特征。采用独热编码后得到的价值函数,对于同一网格内的所有位置速度对,其价值函数的估计都是相同的。所以这只是一种近似。如果要让近似更准确,就要让每个小格的长度δ 位 置 和 δ 速 度更小。但是,这样会增大特征的数目b 位 置 b 速 度  。

在这里插入图片描述

砖瓦编码(tile coding)可以在精度相同的情况下减少特征数目。如图b所示,砖瓦编码引入了多层大网格。本节用的m层砖瓦编码,每层的大网格都是原来独热编码小格的m位宽、m位长。在相邻两层之间,在两个维度上都偏移一个独热编码的小格。对于任意的位置速度对,它在每一层都会落在某个大网格里。这样,我们可以让每层中大网格对应的特征为1,其他特征为0。综合考虑所有层,总共大致有b 位 置 b 速 度 / m 个特征,特征数大大减小。

TileCoder类实现了砖瓦编码。构造TileCoder类需要两个参数:参数layers表示要用几层砖瓦编码;参数features表示砖瓦编码应该得到多少特征,即x(s,a)的维度,它也是w的维度。构造TileCoder类对象后,就可以调用这个对象找到每个数据激活了哪些特征。调用的参数floats输入[0,1]间的浮点数的tuple,参数ints输入int元素的tuple(不参与砖瓦编码);返回int型列表,表示激活的参数指标。

class TileCoder:
    def __init__(self, layers, features):
        self.layers = layers
        self.features = features
        self.codebook = {}
    
    def get_feature(self, codeword):
        if codeword in self.codebook:
            return self.codebook[codeword]
        count = len(self.codebook)
        if count >= self.features: # 冲突处理
            return hash(codeword) % self.features
        self.codebook[codeword] = count
        return count
    
    def __call__(self, floats=(), ints=()):
        dim = len(floats)
        scaled_floats = tuple(f * self.layers * self.layers for f in floats)
        features = []
        for layer in range(self.layers):
            codeword = (layer,) + tuple(int((f + (1 + dim * i) * layer) /
                    self.layers) for i, f in enumerate(scaled_floats)) + ints
            feature = self.get_feature(codeword)
            features.append(feature)
        return features

在小车上山任务中,如果我们对观测空间选取8层的砖瓦编码,那么观测空间第0层有8×8=64个砖瓦,剩下8-1=7层有(8+1)×(8+1)=81个砖瓦,一共有64+7×81=631个砖瓦。再考虑到动作有3种可能的取值,那么总共有631×3=1893个特征。接下来,我们运用砖瓦编码来实现函数近似的智能体。以下是函数近似SARSA算法的智能体类SARSAAgent和函数近似SARSA(λ)的智能体类SARSALambdaAgent。

class SARSAAgent:
    def __init__(self, env, layers=8, features=1893, gamma=1.,
                learning_rate=0.03, epsilon=0.001):
        self.action_n = env.action_space.n # 动作数
        self.obs_low = env.observation_space.low
        self.obs_scale = env.observation_space.high - \
                env.observation_space.low # 观测空间范围
        self.encoder = TileCoder(layers, features) # 砖瓦编码器
        self.w = np.zeros(features) # 权重
        self.gamma = gamma # 折扣
        self.learning_rate = learning_rate # 学习率
        self.epsilon = epsilon # 探索
        
    def encode(self, observation, action): # 编码
        states = tuple((observation - self.obs_low) / self.obs_scale)
        actions = (action,)
        return self.encoder(states, actions)
    
    def get_q(self, observation, action): # 动作价值
        features = self.encode(observation, action)
        return self.w[features].sum()
    
    def decide(self, observation): # 判决
        if np.random.rand() < self.epsilon:
            return np.random.randint(self.action_n)
        else:
            qs = [self.get_q(observation, action) for action in
                    range(self.action_n)]
            return np.argmax(qs)
        
    def learn(self, observation, action, reward,
            next_observation, done, next_action): # 学习
        u = reward + (1. - done) * self.gamma * \
                self.get_q(next_observation, next_action)
        td_error = u - self.get_q(observation, action)
        features = self.encode(observation, action)
        self.w[features] += (self.learning_rate * td_error)

class SARSALambdaAgent(SARSAAgent):
    def __init__(self, env, layers=8, features=1893, gamma=1.,
            learning_rate=0.03, epsilon=0.001, lambd=0.9):
        super().__init__(env=env, layers=layers, features=features,
                gamma=gamma, learning_rate=learning_rate, epsilon=epsilon)
        self.lambd = lambd
        self.z = np.zeros(features) # 初始化资格迹
        
    def learn(self, observation, action, reward, next_observation, done,
            next_action):
        u = reward
        if not done:
            u += (self.gamma * self.get_q(next_observation, next_action))
            self.z *= (self.gamma * self.lambd)
            features = self.encode(observation, action)
            self.z[features] = 1. # 替换迹
        td_error = u - self.get_q(observation, action)
        self.w += (self.learning_rate * td_error * self.z)
        if done:
            self.z = np.zeros_like(self.z) # 为下一回合初始化资格迹

运用环境对象env和构造好的智能体对象agent,我们就可以用函数play_sarsa()训练智能体。对于训练了300个回合的SARSAAgent,平均回合奖励可以达到-121左右;对于训练了150个回合的SARSALambdaAgent,平均回合奖励可以达到-107左右。在这个实现中,SARSA(λ)算法比SARSA算法更为高效。事实上,SARSA(λ)算法是针对小车上山这个任务最有效的方法之一。

用深度Q学习求解最优策略

首先我们来看经验回放。代码清单中的类DQNReplayer实现了经验回放。构造这个类的参数中有个int型的参数capacity,表示存储空间最多可以存储几条经验。当要存储的经验数超过capacity时,会用最新的经验覆盖最早存入的经验。

class DQNReplayer:
    def __init__(self, capacity):
        self.memory = pd.DataFrame(index=range(capacity),
                columns=['observation', 'action', 'reward',
                'next_observation', 'done'])
        self.i = 0
        self.count = 0
        self.capacity = capacity
    
    def store(self, *args):
        self.memory.loc[self.i] = args
        self.i = (self.i + 1) % self.capacity
        self.count = min(self.count + 1, self.capacity)
        
    def sample(self, size):
        indices = np.random.choice(self.count, size=size)
        return (np.stack(self.memory.loc[indices, field]) for field in
                self.memory.columns)

接下来我们来看函数近似部分。函数近似采用了矢量形式的近似函数q ( s ; w ) , s ∈ ( S ) q(s;w),s∈(\mathcal{S})q(s;w),s∈(S),近似函数的形式为全连接神经网络。以下分别实现了带目标网络的深度Q学习智能体和双重Q学习智能体。它们和play_qlearning()函数结合,就实现了带目标网络的深度Q学习算法和双重Q学习算法。

 

class DQNAgent:
    def __init__(self, env, net_kwargs={}, gamma=0.99, epsilon=0.001,
             replayer_capacity=10000, batch_size=64):
        observation_dim = env.observation_space.shape[0]
        self.action_n = env.action_space.n
        self.gamma = gamma
        self.epsilon = epsilon
        
        self.batch_size = batch_size
        self.replayer = DQNReplayer(replayer_capacity) # 经验回放
         
        self.evaluate_net = self.build_network(input_size=observation_dim,
                output_size=self.action_n, **net_kwargs) # 评估网络
        self.target_net = self.build_network(input_size=observation_dim,
                output_size=self.action_n, **net_kwargs) # 目标网络

        self.target_net.set_weights(self.evaluate_net.get_weights())

    def build_network(self, input_size, hidden_sizes, output_size,
                activation=tf.nn.relu, output_activation=None,
                learning_rate=0.01): # 构建网络
        model = keras.Sequential()
        for layer, hidden_size in enumerate(hidden_sizes):
            kwargs = dict(input_shape=(input_size,)) if not layer else {}
            model.add(keras.layers.Dense(units=hidden_size,
                    activation=activation, **kwargs))
        model.add(keras.layers.Dense(units=output_size,
                activation=output_activation)) # 输出层
        optimizer = tf.optimizers.Adam(lr=learning_rate)
        model.compile(loss='mse', optimizer=optimizer)
        return model
        
    def learn(self, observation, action, reward, next_observation, done):
        self.replayer.store(observation, action, reward, next_observation,
                done) # 存储经验

        observations, actions, rewards, next_observations, dones = \
                self.replayer.sample(self.batch_size) # 经验回放

        next_qs = self.target_net.predict(next_observations)
        next_max_qs = next_qs.max(axis=-1)
        us = rewards + self.gamma * (1. - dones) * next_max_qs
        targets = self.evaluate_net.predict(observations)
        targets[np.arange(us.shape[0]), actions] = us
        self.evaluate_net.fit(observations, targets, verbose=0)

        if done: # 更新目标网络
            self.target_net.set_weights(self.evaluate_net.get_weights())

    def decide(self, observation): # epsilon贪心策略
        if np.random.rand() < self.epsilon:
            return np.random.randint(self.action_n)
        qs = self.evaluate_net.predict(observation[np.newaxis])
        return np.argmax(qs)

def play_qlearning(env, agent, train=False, render=False):
    episode_reward = 0
    observation = env.reset()
    while True:
        if render:
            env.render()
        action = agent.decide(observation)
        next_observation, reward, done, _ = env.step(action)
        episode_reward += reward
        if train:
            agent.learn(observation, action, reward, next_observation,
                    done)
        if done:
            break
        observation = next_observation
    return episode_reward

class DoubleDQNAgent(DQNAgent):
    def learn(self, observation, action, reward, next_observation, done):
        self.replayer.store(observation, action, reward, next_observation,
                done) # 存储经验
        observations, actions, rewards, next_observations, dones = \
                self.replayer.sample(self.batch_size) # 经验回放
        next_eval_qs = self.evaluate_net.predict(next_observations)
        next_actions = next_eval_qs.argmax(axis=-1)
        next_qs = self.target_net.predict(next_observations)
        next_max_qs = next_qs[np.arange(next_qs.shape[0]), next_actions] 
        us = rewards + self.gamma * next_max_qs * (1. - dones)
        targets = self.evaluate_net.predict(observations)
        targets[np.arange(us.shape[0]), actions] = us
        self.evaluate_net.fit(observations, targets, verbose=0)

        if done:
            self.target_net.set_weights(self.evaluate_net.get_weights())

代码使用TensorFlow来实现,并同时兼容TensorFlow 1.X的最新稳定版本和TensorFlow 2.X的最新稳定版本。对于基于TensorFlow的程序,即使已经设置了随机数的种子,也不能保证完全复现。所以,运行结果有差异是正常现象。

参考

原理的介绍可以参考我之前的文章

函数近似方法与原理

线性近似与函数近似的收敛性

DQN算法原理


开心洋葱 , 版权所有丨如未注明 , 均为原创丨未经授权请勿修改 , 转载请注明[强化学习实战]DQN算法实战-小车上山(MountainCar-v0)
喜欢 (0)

您必须 登录 才能发表评论!

加载中……