We continue from the previous article. Last time we finished the critic's design; now let's complete the actor.
actor
The actor's job in the algorithm is the mapping observation(1x3) -> actor -> act(1x3).

Like the critic, the actor is an MLP; it differs only in the output dimension. The critic outputs a single value, while the actor outputs a 1x3 array.

We again call the mlp function written earlier to build the actor net. Handling the actor, i.e. the policy, is a bit more involved than the critic. For continuous action spaces the policy is usually defined as a Gaussian, i.e. actions are sampled from a Gaussian distribution. So if we open up the actor (the policy), the internal flow is observation(1x3) -> mu_net -> mean -> Normal(mean, std) -> sample -> act(1x3).

Some readers may ask whether we could simply map observation to act with an MLP, exactly like the critic. I don't have a definitive answer to this; if you do, please leave a comment.
Now let's implement the process described above.
import numpy as np
import torch
from torch.distributions.normal import Normal

# First define std, the standard deviation of the Gaussian. Why parameterize it as
# log_std instead of std directly? Because std is also optimized as a network parameter,
# and the log parameterization keeps std = exp(log_std) positive no matter what value
# the optimizer pushes log_std to. With log_std = -0.5, the initial std is exp(-0.5) ≈ 0.61.
# For more background, see this article; the basic idea is the same:
# https://blog.csdn.net/bornfree5511/article/details/115017192?spm=1001.2014.3001.5501
log_std = -0.5 * np.ones(act_dim, dtype=np.float32)
self.log_std = torch.nn.Parameter(torch.as_tensor(log_std))

# Then define the network that produces the mean: we map obs directly to the mean output.
# As for why this is done, interested readers are welcome to discuss in the comments.
self.mu_net = mlp([obs_dim] + list(hidden_sizes) + [act_dim], activation)

mu = self.mu_net(obs)
std = torch.exp(self.log_std)
pi = Normal(mu, std)
act = pi.sample()

# As for why we compute logp_a, see this article:
# https://blog.csdn.net/bornfree5511/article/details/115017192?spm=1001.2014.3001.5501
logp_a = pi.log_prob(act).sum(axis=-1)
Let's write a test case to see what the actor actually does.
import torch
import torch.nn as nn
from torch.distributions.normal import Normal
import numpy as np


def mlp(sizes, activation, output_activation=nn.Identity):
    layers = []
    for j in range(len(sizes) - 1):
        act = activation if j < len(sizes) - 2 else output_activation
        layers += [nn.Linear(sizes[j], sizes[j + 1]), act()]
    return nn.Sequential(*layers)


class MLPActor(nn.Module):
    def __init__(self, obs, obs_dim, hidden_sizes, act_dim, activation):
        super().__init__()
        log_std = -0.5 * np.ones(act_dim, dtype=np.float32)
        self.log_std = torch.nn.Parameter(torch.as_tensor(log_std))
        self.mu_net = mlp([obs_dim] + list(hidden_sizes) + [act_dim], activation)
        self.obs = obs

    def _distribution(self):
        mu = self.mu_net(self.obs)
        std = torch.exp(self.log_std)
        self.pi = Normal(mu, std)
        return self.pi

    def _get_action(self):
        self.act = self.pi.sample()
        return self.act

    def _log_prob_of_act_from_distribution(self):
        # note: this must use self.pi (the distribution built above), not a bare pi
        logp_a = self.pi.log_prob(self.act).sum(axis=-1)
        return logp_a


obs_dim = 3
act_dim = 3
observation = torch.as_tensor([0.5, 0.1, 0], dtype=torch.float32)
hidden_sizes = [64, 64]
activation = nn.Tanh

actor = MLPActor(observation, obs_dim, hidden_sizes, act_dim, activation)
pi = actor._distribution()
act = actor._get_action()
logp_a = actor._log_prob_of_act_from_distribution()
print('actor={},\npi={},\nact={},\nlogp_a={}'.format(actor, pi, act, logp_a))
Output:
actor=MLPActor(
  (mu_net): Sequential(
    (0): Linear(in_features=3, out_features=64, bias=True)
    (1): Tanh()
    (2): Linear(in_features=64, out_features=64, bias=True)
    (3): Tanh()
    (4): Linear(in_features=64, out_features=3, bias=True)
    (5): Identity()
  )
),
pi=Normal(loc: torch.Size([3]), scale: torch.Size([3])),
act=tensor([ 0.9923, -0.1982,  0.0383]),
logp_a=-2.321655035018921
This matches the workflow we expected.
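As a quick sanity check (my own addition, not part of the original code), logp_a is just the sum over the three action dimensions of the Gaussian log-density, so we can reproduce it by hand from mu, std, and act. A minimal sketch, reusing the actor, observation, act, and logp_a objects from the test above:

# Sanity check (my own sketch): recompute the log-probability from the closed-form Gaussian density
mu = actor.mu_net(observation)          # Gaussian mean, shape (3,)
std = torch.exp(actor.log_std)          # Gaussian std, shape (3,)
manual_logp = (-0.5 * ((act - mu) / std) ** 2
               - torch.log(std)
               - 0.5 * float(np.log(2 * np.pi))).sum()
print(torch.allclose(manual_logp, logp_a))   # expected: True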
With both the actor and the critic defined, we need to define their loss functions and compute the losses in order to train them. Before computing any loss, though, we need one more piece: the buffer. Training only runs once enough data has been collected; everything before that is experience collection.

The buffer itself is simple to define; let's walk through the implementation. (Mainly we are walking through Spinning Up's implementation, to be honest.)
import numpy as np
import torch


def combined_shape(length, shape=None):
    if shape is None:
        return (length,)
    return (length, shape) if np.isscalar(shape) else (length, *shape)


class PPOBuffer:
    """
    A buffer for storing trajectories experienced by a PPO agent interacting
    with the environment, and using Generalized Advantage Estimation (GAE-Lambda)
    for calculating the advantages of state-action pairs.
    """

    def __init__(self, obs_dim, act_dim, size, gamma=0.99, lam=0.95):
        # combined_shape is defined above (in Spinning Up it lives in core.py,
        # hence the core.combined_shape calls in the original source)
        self.obs_buf = np.zeros(combined_shape(size, obs_dim), dtype=np.float32)
        self.act_buf = np.zeros(combined_shape(size, act_dim), dtype=np.float32)
        self.adv_buf = np.zeros(size, dtype=np.float32)
        self.rew_buf = np.zeros(size, dtype=np.float32)
        self.ret_buf = np.zeros(size, dtype=np.float32)
        self.val_buf = np.zeros(size, dtype=np.float32)
        self.logp_buf = np.zeros(size, dtype=np.float32)
        self.gamma, self.lam = gamma, lam
        self.ptr, self.path_start_idx, self.max_size = 0, 0, size
The code above initializes the buffer. Personally I think the combined_shape() helper is optional (you could write the shapes out by hand); the rest is self-explanatory.
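To make that remark concrete (my own illustration, not from Spinning Up), combined_shape just prepends the buffer length to the per-item shape, which is why the shapes could also be written manually:

print(combined_shape(10))          # (10,)       e.g. the reward buffer
print(combined_shape(10, 3))       # (10, 3)     e.g. the obs buffer with obs_dim=3
print(combined_shape(10, (3, 4)))  # (10, 3, 4)  e.g. hypothetical image-like observations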
    def store(self, obs, act, rew, val, logp):
        """
        Append one timestep of agent-environment interaction to the buffer.
        """
        assert self.ptr < self.max_size     # buffer has to have room so you can store
        self.obs_buf[self.ptr] = obs
        self.act_buf[self.ptr] = act
        self.rew_buf[self.ptr] = rew
        self.val_buf[self.ptr] = val
        self.logp_buf[self.ptr] = logp
        self.ptr += 1
The store function above simply writes one timestep of interaction into the buffer and advances the pointer; a rough sketch of how it gets called is shown below. After that come the two more interesting functions, finish_path and get.
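Here is that sketch (my own, with made-up numbers standing in for real environment transitions and actor/critic outputs, and assuming the combined_shape and PPOBuffer definitions above with numpy imported). The point is only that store is called once per timestep until the buffer is full:

buf = PPOBuffer(obs_dim=3, act_dim=3, size=4)
for t in range(4):
    obs = np.random.randn(3).astype(np.float32)   # pretend observation from the env
    act = np.random.randn(3).astype(np.float32)   # pretend action sampled by the actor
    rew, val, logp = 1.0, 0.5, -2.3               # pretend reward, V(s_t), log pi(a_t|s_t)
    buf.store(obs, act, rew, val, logp)
print(buf.ptr)   # 4: the buffer is full, so another store() would trip the assert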
    def finish_path(self, last_val=0):
        """
        Call this at the end of a trajectory, or when one gets cut off
        by an epoch ending. This looks back in the buffer to where the
        trajectory started, and uses rewards and value estimates from
        the whole trajectory to compute advantage estimates with GAE-Lambda,
        as well as compute the rewards-to-go for each state, to use as
        the targets for the value function.

        The "last_val" argument should be 0 if the trajectory ended
        because the agent reached a terminal state (died), and otherwise
        should be V(s_T), the value function estimated for the last state.
        This allows us to bootstrap the reward-to-go calculation to account
        for timesteps beyond the arbitrary episode horizon (or epoch cutoff).
        """
        path_slice = slice(self.path_start_idx, self.ptr)
        rews = np.append(self.rew_buf[path_slice], last_val)
        vals = np.append(self.val_buf[path_slice], last_val)

        # the next two lines implement GAE-Lambda advantage calculation
        # (core.discount_cumsum comes from Spinning Up's core module; see the sketch further below)
        deltas = rews[:-1] + self.gamma * vals[1:] - vals[:-1]
        self.adv_buf[path_slice] = core.discount_cumsum(deltas, self.gamma * self.lam)

        # the next line computes rewards-to-go, to be targets for the value function
        self.ret_buf[path_slice] = core.discount_cumsum(rews, self.gamma)[:-1]

        self.path_start_idx = self.ptr

    def get(self):
        """
        Call this at the end of an epoch to get all of the data from
        the buffer, with advantages appropriately normalized (shifted to have
        mean zero and std one). Also, resets some pointers in the buffer.
        """
        assert self.ptr == self.max_size    # buffer has to be full before you can get
        self.ptr, self.path_start_idx = 0, 0
        # the next two lines implement the advantage normalization trick
        # (mpi_statistics_scalar comes from spinup.utils.mpi_tools; it returns the mean
        # and std of adv_buf, aggregated across MPI processes)
        adv_mean, adv_std = mpi_statistics_scalar(self.adv_buf)
        self.adv_buf = (self.adv_buf - adv_mean) / adv_std
        data = dict(obs=self.obs_buf, act=self.act_buf, ret=self.ret_buf,
                    adv=self.adv_buf, logp=self.logp_buf)
        return {k: torch.as_tensor(v, dtype=torch.float32) for k, v in data.items()}
These two functions are the important ones. finish_path computes GAE to estimate the advantage function, and it also computes the reward-to-go, which serves as the target for the value-function loss. The algorithmic details of GAE and reward-to-go will be covered in a later post, but the sketch below gives the gist.
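Here is a minimal stand-in I wrote for core.discount_cumsum (Spinning Up's own version computes the same cumulative discounted sum): for each index t it returns x[t] + d*x[t+1] + d^2*x[t+2] + .... Applying it to the TD residuals delta_t = r_t + gamma*V(s_{t+1}) - V(s_t) with discount gamma*lam gives the GAE advantage, and applying it to the rewards with discount gamma gives the reward-to-go, exactly as finish_path does. The trajectory numbers below are made up for illustration:

def discount_cumsum(x, discount):
    # stand-in for core.discount_cumsum: out[t] = x[t] + discount*x[t+1] + discount^2*x[t+2] + ...
    out = np.zeros_like(x, dtype=np.float32)
    running = 0.0
    for t in reversed(range(len(x))):
        running = x[t] + discount * running
        out[t] = running
    return out

# A tiny worked example of what finish_path computes for a 3-step trajectory with last_val = 0
gamma, lam = 0.99, 0.95
rews = np.array([1.0, 0.0, 2.0, 0.0], dtype=np.float32)   # rewards, with last_val appended
vals = np.array([0.5, 0.4, 0.3, 0.0], dtype=np.float32)   # value estimates, with last_val appended

deltas = rews[:-1] + gamma * vals[1:] - vals[:-1]          # TD residuals delta_t
adv = discount_cumsum(deltas, gamma * lam)                 # GAE-Lambda advantage estimates
ret = discount_cumsum(rews, gamma)[:-1]                    # rewards-to-go (value-function targets)
print(adv, ret)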
That is the whole buffer and actor implementation. Next we will implement their loss functions and the update step that adjusts the parameters.
You may also like:
- Ubuntu Helper: one-click software installation and one-click system configuration
- Deep Reinforcement Learning Series, 1: The state of the field
- Deep Reinforcement Learning Series, 2: Implementing DQN from scratch for CartPole control
- Deep Reinforcement Learning Series, 3: Implementing a first-order inverted pendulum
- Deep Reinforcement Learning Series, 4: Distributed computing with ray
- Deep Reinforcement Learning Series, 5: Tuning RL hyperparameters with ray's tune component
- Deep Reinforcement Learning Series, 6: Reinforcement learning training with RLLib and ray
- Deep Reinforcement Learning Series, 7: Implementing PPO for a robot arm reach task (1)
- pybullet notes: fitting the camera-to-world coordinate transform with deep learning (1)
- pybullet notes: fitting the camera-to-world coordinate transform with deep learning (2)
- A summary of motor control in pybullet
- Part 1 – Creating a custom gym environment
- Part 1.1 – Registering a custom Gym environment
- Part 1.2 – Implementing a gym environment for tic-tac-toe
- Part 1.3 – Getting familiar with PyBullet
- Part 1.4 – Creating a Gym environment for PyBullet