python - 具有离散动作空间的软演员评论家
问题描述
我正在尝试为离散动作空间实现软演员评论家算法,但我在损失函数方面遇到了麻烦。
作为参考,以下是 OpenAI Spinning Up 中针对连续动作空间的 SAC 说明:https://spinningup.openai.com/en/latest/algorithms/sac.html
我不知道我做错了什么。
问题是网络在 cartpole 环境中没有学到任何东西。
GitHub 上的完整代码:https://github.com/tk2232/sac_discrete/blob/master/sac_discrete.py
这是我的想法如何计算离散动作的损失。
价值网络
class ValueNet:
    """State-value network: two ReLU hidden layers and a scalar linear output.

    Every graph node is created under ``tf.variable_scope(name)``. Training
    minimises half the mean squared difference between the predicted values
    and the fed-in targets with Adam (learning rate 0.001), restricted to the
    variables returned by ``_params(name)``.
    """

    def __init__(self, sess, state_size, hidden_dim, name):
        """Build the graph.

        Args:
            sess: session used by ``get_value`` and ``update`` to run the graph.
            state_size: width of one state vector.
            hidden_dim: number of units in each of the two hidden layers.
            name: variable scope name; also passed to ``_params``.
        """
        self.sess = sess
        with tf.variable_scope(name):
            self.states = tf.placeholder(
                dtype=tf.float32, shape=[None, state_size], name='value_states')
            self.targets = tf.placeholder(
                dtype=tf.float32, shape=[None, 1], name='value_targets')

            hidden = self.states
            for _ in range(2):
                hidden = Dense(units=hidden_dim, activation='relu')(hidden)
            self.values = Dense(units=1, activation=None)(hidden)

            # The targets are treated as constants in the regression loss.
            error = self.values - tf.stop_gradient(self.targets)
            loss = 0.5 * tf.reduce_mean(error ** 2)
            self.train_op = tf.train.AdamOptimizer(0.001).minimize(
                loss, var_list=_params(name))

    def get_value(self, s):
        """Return the predicted values for the batch of states ``s``."""
        feed = {self.states: s}
        return self.sess.run(self.values, feed_dict=feed)

    def update(self, s, targets):
        """Run one optimiser step regressing the values of ``s`` onto ``targets``."""
        feed = {self.states: s, self.targets: targets}
        self.sess.run(self.train_op, feed_dict=feed)
在 Q 网络中,我根据实际采取的动作索引,从网络输出中取出(gather)对应的 Q 值。
例子
# Q-values for a batch of two states: one row per state, one column per action.
q_out = [[0.5533, 0.4444], [0.2222, 0.6666]]
# Index of the action that was taken in each state.
collected_actions = [0, 1]
# Keep, for every row, only the Q-value of the action taken in that row.
gather = [[row[taken]] for row, taken in zip(q_out, collected_actions)]
gather 函数
def gather_tensor(params, idx):
    """Select one entry per row of ``params``.

    Row ``i`` of the result is ``params[i, idx[i, 0]]``: the row number is
    paired with the first column of ``idx`` and the pairs are looked up with
    ``tf.gather_nd``.

    Args:
        params: tensor to read from (assumed to be at least 2-D, batch first
            -- TODO confirm against callers).
        idx: integer tensor whose first column holds the column index to
            pick in each row.

    Returns:
        The gathered tensor, with one entry per row of ``idx``.
    """
    row_ids = tf.range(tf.shape(idx)[0])
    col_ids = idx[:, 0]
    pairs = tf.stack([row_ids, col_ids], axis=-1)
    return tf.gather_nd(params, pairs)
Q网络
class SoftQNetwork:
    """Q-network with one output per action, exposing the Q-value of a fed-in action.

    The network has two ReLU hidden layers and a linear layer of width
    ``action_size``. ``self.q`` holds, for each row, the output selected by
    the corresponding entry of ``self.actions``, reshaped to one column.
    Training minimises half the mean squared difference between ``self.q``
    and the fed-in targets with Adam (learning rate 0.001), restricted to the
    variables returned by ``_params(name)``.
    """

    def __init__(self, sess, state_size, action_size, hidden_dim, name):
        """Build the graph.

        Args:
            sess: session used by ``update`` and ``get_q`` to run the graph.
            state_size: width of one state vector.
            action_size: number of outputs of the final layer.
            hidden_dim: number of units in each of the two hidden layers.
            name: variable scope name; also passed to ``_params``.
        """
        self.sess = sess
        with tf.variable_scope(name):
            self.states = tf.placeholder(
                dtype=tf.float32, shape=[None, state_size], name='q_states')
            self.targets = tf.placeholder(
                dtype=tf.float32, shape=[None, 1], name='q_targets')
            self.actions = tf.placeholder(
                dtype=tf.int32, shape=[None, 1], name='q_actions')

            hidden = self.states
            for _ in range(2):
                hidden = Dense(units=hidden_dim, activation='relu')(hidden)
            all_q = Dense(units=action_size, activation=None)(hidden)

            # Keep only the output belonging to the fed-in action of each row.
            chosen_q = gather_tensor(all_q, self.actions)
            self.q = tf.reshape(chosen_q, shape=(-1, 1))

            # The targets are treated as constants in the regression loss.
            error = self.q - tf.stop_gradient(self.targets)
            loss = 0.5 * tf.reduce_mean(error ** 2)
            self.train_op = tf.train.AdamOptimizer(0.001).minimize(
                loss, var_list=_params(name))

    def update(self, s, a, target):
        """Run one optimiser step regressing Q(s, a) onto ``target``."""
        feed = {self.states: s, self.actions: a, self.targets: target}
        self.sess.run(self.train_op, feed_dict=feed)

    def get_q(self, s, a):
        """Return the Q-values of actions ``a`` in states ``s`` as a column."""
        feed = {self.states: s, self.actions: a}
        return self.sess.run(self.q, feed_dict=feed)
策略网络
class PolicyNet:
    """Categorical policy over discrete actions.

    Two ReLU hidden layers feed a linear layer producing ``action_size``
    logits, which parameterise a ``Categorical`` distribution. The graph is
    built under the ``'policy_net'`` variable scope and trained with Adam
    (learning rate 0.001) on the variables returned by
    ``_params('policy_net')``.
    """

    def __init__(self, sess, state_size, action_size, hidden_dim):
        """Build the graph.

        Args:
            sess: session used by the other methods to run the graph.
            state_size: width of one state vector.
            action_size: number of discrete actions (logits).
            hidden_dim: number of units in each of the two hidden layers.
        """
        self.sess = sess
        with tf.variable_scope('policy_net'):
            self.states = tf.placeholder(
                dtype=tf.float32, shape=[None, state_size], name='policy_states')
            self.targets = tf.placeholder(
                dtype=tf.float32, shape=[None, 1], name='policy_targets')
            self.actions = tf.placeholder(
                dtype=tf.int32, shape=[None, 1], name='policy_actions')

            hidden = self.states
            for _ in range(2):
                hidden = Dense(units=hidden_dim, activation='relu')(hidden)
            self.logits = Dense(units=action_size, activation=None)(hidden)
            dist = Categorical(logits=self.logits)

            # Sampling path: a fresh action and its log-probability.
            self.new_action = dist.sample()
            self.new_log_prob = dist.log_prob(self.new_action)

            # Training path: log-probability of the fed-in actions.
            # NOTE(review): `targets` is fed through a placeholder, so only the
            # `- 0.2 * log_prob` term depends on the policy variables.
            taken_log_prob = dist.log_prob(tf.squeeze(self.actions))
            per_sample = tf.squeeze(self.targets) - 0.2 * taken_log_prob
            loss = tf.reduce_mean(per_sample)
            self.train_op = tf.train.AdamOptimizer(0.001).minimize(
                loss, var_list=_params('policy_net'))

    def get_action(self, s):
        """Sample one action for the single state ``s`` (a batch axis is added)."""
        batch_of_one = s[np.newaxis, :]
        sampled = self.sess.run(self.new_action, feed_dict={self.states: batch_of_one})
        return sampled[0]

    def get_next_action(self, s):
        """Sample actions for the batch ``s``; return (actions, log_probs) as columns."""
        sampled, log_prob = self.sess.run(
            [self.new_action, self.new_log_prob], feed_dict={self.states: s})
        return sampled.reshape((-1, 1)), log_prob.reshape((-1, 1))

    def update(self, s, a, target):
        """Run one optimiser step on states ``s``, actions ``a`` and ``target``."""
        feed = {self.states: s, self.actions: a, self.targets: target}
        self.sess.run(self.train_op, feed_dict=feed)
更新函数
def soft_q_update(batch_size, frame_idx):
    """Sample a replay batch and compute the Q, V and policy targets.

    As shown here the function only computes the three targets; it neither
    returns them nor applies any network update, and ``frame_idx`` is unused.

    Args:
        batch_size: number of transitions requested from ``replay_buffer``.
        frame_idx: not read by the code shown.
    """
    gamma, alpha = 0.99, 0.2

    state, action, reward, next_state, done = replay_buffer.sample(batch_size)
    # Turn the per-transition vectors into column vectors.
    action, reward, done = (
        column.reshape((-1, 1)) for column in (action, reward, done))

    # Q_target
    next_value = value_net_target.get_value(next_state)
    q_target = reward + (1 - done) * gamma * next_value

    # V_target
    sampled_action, sampled_log_prob = policy_net.get_next_action(state)
    min_q_sampled = np.minimum(
        soft_q_net_1.get_q(state, sampled_action),
        soft_q_net_2.get_q(state, sampled_action))
    v_target = min_q_sampled - alpha * sampled_log_prob

    # Policy_target
    policy_target = np.minimum(
        soft_q_net_1.get_q(state, action),
        soft_q_net_2.get_q(state, action))
解决方案
由于该算法对离散和连续策略都是通用的,因此关键思想是我们需要一个可重参数化的离散分布。这样,扩展到离散动作只需对连续 SAC 的代码做最小的修改——只需更换策略所用的分布类。
有一种这样的分布——Gumbel-Softmax 分布。PyTorch 没有可以直接使用的内置实现,因此我从一个具有正确 rsample() 的近亲类(RelaxedOneHotCategorical)继承,并补上了正确的对数概率(log prob)计算方法。由于能够计算重参数化的动作及其对数概率,SAC 只需极少的额外代码就能很好地处理离散动作,如下所示。
def calc_log_prob_action(self, action_pd, reparam=False):
    '''
    Calculate log_probs and actions with option to reparametrize from paper eq. 11

    @param action_pd: action distribution providing sample(), rsample() and log_prob()
    @param reparam: draw with rsample() (reparametrized) when True, otherwise with sample()
    @returns (log_probs, actions)
    '''
    samples = action_pd.rsample() if reparam else action_pd.sample()
    if self.body.is_discrete:  # this is straightforward using GumbelSoftmax
        actions = samples
        log_probs = action_pd.log_prob(actions)
    else:
        mus = samples
        squashed = torch.tanh(mus)
        actions = self.scale_action(squashed)
        # paper Appendix C. Enforcing Action Bounds for continuous actions
        # The correction term is log(1 - tanh(u)^2), so it must be computed from the tanh
        # output rather than from the scaled action: once scale_action maps outside
        # [-1, 1], 1 - actions^2 turns negative and the log becomes NaN.
        # Any Jacobian term of scale_action itself is not included here.
        log_probs = action_pd.log_prob(mus) - torch.log(1 - squashed.pow(2) + 1e-6).sum(1)
    return log_probs, actions
# ... for discrete action, GumbelSoftmax distribution
class GumbelSoftmax(distributions.RelaxedOneHotCategorical):
    '''
    A differentiable Categorical distribution using reparametrization trick with Gumbel-Softmax
    Explanation http://amid.fish/assets/gumbel.html
    NOTE: use this in place PyTorch's RelaxedOneHotCategorical distribution since its log_prob is not working right (returns positive values)
    Papers:
    [1] The Concrete Distribution: A Continuous Relaxation of Discrete Random Variables (Maddison et al, 2017)
    [2] Categorical Reparametrization with Gumbel-Softmax (Jang et al, 2017)
    '''

    def sample(self, sample_shape=torch.Size()):
        '''
        Gumbel-max sampling of discrete action indices. Note rsample is inherited from RelaxedOneHotCategorical

        @param sample_shape: extra leading dimensions to draw; the default draws one index per row of logits
        @returns integer tensor of shape sample_shape + logits.shape[:-1]
        '''
        logits = self.logits
        shape = torch.Size(sample_shape) + logits.shape
        # Keep u strictly positive so the inner log never sees 0.
        tiny = torch.finfo(logits.dtype).tiny
        u = torch.rand(shape, device=logits.device, dtype=logits.dtype).clamp_(min=tiny)
        gumbel_noise = -torch.log(-torch.log(u))
        return torch.argmax(logits + gumbel_noise, dim=-1)

    def log_prob(self, value):
        '''
        Log-probability of value under softmax(logits).

        @param value: one-hot or relaxed (trailing dimensions equal to logits.shape), or action
            indices, which are converted to one-hot. Extra leading sample dimensions are
            broadcast against logits.
        @returns sum over the last axis of value * log_softmax(logits)
        '''
        logits_rank = self.logits.dim()
        # Same shape-based rule as before, extended to extra leading sample dimensions:
        # a value whose trailing dimensions match logits is taken as one-hot / relaxed.
        is_one_hot = value.dim() >= logits_rank and value.shape[-logits_rank:] == self.logits.shape
        if not is_one_hot:
            value = F.one_hot(value.long(), self.logits.shape[-1]).float()
        assert value.shape[-logits_rank:] == self.logits.shape, \
            f'value shape {tuple(value.shape)} does not match logits shape {tuple(self.logits.shape)}'
        return torch.sum(value * F.log_softmax(self.logits, -1), -1)
这是 LunarLander 的结果。SAC 学会了快速解决它。
完整的实现代码见 SLM Lab:https://github.com/kengz/SLM-Lab/blob/master/slm_lab/agent/algorithm/sac.py
Roboschool(连续)和 LunarLander(离散)的 SAC 基准测试结果见:https://github.com/kengz/SLM-Lab/pull/399
推荐阅读
- javascript - 在 JavaScript 中这个输入的输出顺序是什么?为什么?
- javascript - 我将如何在 selenium webdriver 中自动执行此操作?
- python - 获取 / 遍历 xml 文件 (.h5) 文件的树视图,直到使用 h5py 生成最终分支
- angular - 如何检查用户是否已登录。Angular 12.1.2
- reactjs - React typescript 将数组识别为对象
- java - 读取文件时 Scanner 和 hasNextLine() 的 Java 问题
- c# - 我被要求创建一个打印 pdf 文件的 Windows 服务,下面的代码在我在控制台应用程序上运行但不在服务上运行时有效
- python - 在 previus 完成后,Celery Django 运行定期任务。[django 芹菜节拍]
- android - Flutter 从 GetStorage 列表中获取数据值
- laravel - 在 Nginx 中从多个域/子域提供单个应用程序?