7个流行的Python强化学习算法及代码实现详解
作者:deephub 发布时间:2021-07-06 08:38:03
目前流行的强化学习算法包括 Q-learning、SARSA、DDPG、A2C、PPO、DQN 和 TRPO。 这些算法已被用于在游戏、机器人和决策制定等各种应用中,并且这些流行的算法还在不断发展和改进,本文我们将对其做一个简单的介绍。
1、Q-learning
Q-learning:Q-learning 是一种无模型、非策略的强化学习算法。 它使用 Bellman 方程估计最佳动作值函数,该方程迭代地更新给定状态动作对的估计值。 Q-learning 以其简单性和处理大型连续状态空间的能力而闻名。
下面是一个使用 Python 实现 Q-learning 的简单示例:
importnumpyasnp
# Define the Q-table and the learning rate
Q=np.zeros((state_space_size, action_space_size))
alpha=0.1
# Define the exploration rate and discount factor
epsilon=0.1
gamma=0.99
forepisodeinrange(num_episodes):
current_state=initial_state
whilenotdone:
# Choose an action using an epsilon-greedy policy
ifnp.random.uniform(0, 1) <epsilon:
action=np.random.randint(0, action_space_size)
else:
action=np.argmax(Q[current_state])
# Take the action and observe the next state and reward
next_state, reward, done=take_action(current_state, action)
# Update the Q-table using the Bellman equation
Q[current_state, action] =Q[current_state, action] +alpha* (reward+gamma*np.max(Q[next_state]) -Q[current_state, action])
current_state=next_state
上面的示例中,state_space_size 和 action_space_size 分别是环境中的状态数和动作数。 num_episodes 是要为运行算法的轮次数。 initial_state 是环境的起始状态。 take_action(current_state, action) 是一个函数,它将当前状态和一个动作作为输入,并返回下一个状态、奖励和一个指示轮次是否完成的布尔值。
在 while 循环中,使用 epsilon-greedy 策略根据当前状态选择一个动作。 使用概率 epsilon选择一个随机动作,使用概率 1-epsilon选择对当前状态具有最高 Q 值的动作。
采取行动后,观察下一个状态和奖励,使用Bellman方程更新q。 并将当前状态更新为下一个状态。这只是 Q-learning 的一个简单示例,并未考虑 Q-table 的初始化和要解决的问题的具体细节。
2、SARSA
SARSA:SARSA 是一种无模型、基于策略的强化学习算法。 它也使用Bellman方程来估计动作价值函数,但它是基于下一个动作的期望值,而不是像 Q-learning 中的最优动作。 SARSA 以其处理随机动力学问题的能力而闻名。
importnumpyasnp
# Define the Q-table and the learning rate
Q=np.zeros((state_space_size, action_space_size))
alpha=0.1
# Define the exploration rate and discount factor
epsilon=0.1
gamma=0.99
forepisodeinrange(num_episodes):
current_state=initial_state
action=epsilon_greedy_policy(epsilon, Q, current_state)
whilenotdone:
# Take the action and observe the next state and reward
next_state, reward, done=take_action(current_state, action)
# Choose next action using epsilon-greedy policy
next_action=epsilon_greedy_policy(epsilon, Q, next_state)
# Update the Q-table using the Bellman equation
Q[current_state, action] =Q[current_state, action] +alpha* (reward+gamma*Q[next_state, next_action] -Q[current_state, action])
current_state=next_state
action=next_action
state_space_size和action_space_size分别是环境中的状态和操作的数量。num_episodes是您想要运行SARSA算法的轮次数。Initial_state是环境的初始状态。take_action(current_state, action)是一个将当前状态和作为操作输入的函数,并返回下一个状态、奖励和一个指示情节是否完成的布尔值。
在while循环中,使用在单独的函数epsilon_greedy_policy(epsilon, Q, current_state)中定义的epsilon-greedy策略来根据当前状态选择操作。使用概率 epsilon选择一个随机动作,使用概率 1-epsilon对当前状态具有最高 Q 值的动作。
上面与Q-learning相同,但是采取了一个行动后,在观察下一个状态和奖励时它然后使用贪心策略选择下一个行动。并使用Bellman方程更新q表。
3、DDPG
DDPG 是一种用于连续动作空间的无模型、非策略算法。 它是一种actor-critic算法,其中actor网络用于选择动作,而critic网络用于评估动作。 DDPG 对于机器人控制和其他连续控制任务特别有用。
importnumpyasnp
fromkeras.modelsimportModel, Sequential
fromkeras.layersimportDense, Input
fromkeras.optimizersimportAdam
# Define the actor and critic models
actor=Sequential()
actor.add(Dense(32, input_dim=state_space_size, activation='relu'))
actor.add(Dense(32, activation='relu'))
actor.add(Dense(action_space_size, activation='tanh'))
actor.compile(loss='mse', optimizer=Adam(lr=0.001))
critic=Sequential()
critic.add(Dense(32, input_dim=state_space_size, activation='relu'))
critic.add(Dense(32, activation='relu'))
critic.add(Dense(1, activation='linear'))
critic.compile(loss='mse', optimizer=Adam(lr=0.001))
# Define the replay buffer
replay_buffer= []
# Define the exploration noise
exploration_noise=OrnsteinUhlenbeckProcess(size=action_space_size, theta=0.15, mu=0, sigma=0.2)
forepisodeinrange(num_episodes):
current_state=initial_state
whilenotdone:
# Select an action using the actor model and add exploration noise
action=actor.predict(current_state)[0] +exploration_noise.sample()
action=np.clip(action, -1, 1)
# Take the action and observe the next state and reward
next_state, reward, done=take_action(current_state, action)
# Add the experience to the replay buffer
replay_buffer.append((current_state, action, reward, next_state, done))
# Sample a batch of experiences from the replay buffer
batch=sample(replay_buffer, batch_size)
# Update the critic model
states=np.array([x[0] forxinbatch])
actions=np.array([x[1] forxinbatch])
rewards=np.array([x[2] forxinbatch])
next_states=np.array([x[3] forxinbatch])
target_q_values=rewards+gamma*critic.predict(next_states)
critic.train_on_batch(states, target_q_values)
# Update the actor model
action_gradients=np.array(critic.get_gradients(states, actions))
actor.train_on_batch(states, action_gradients)
current_state=next_state
在本例中,state_space_size和action_space_size分别是环境中的状态和操作的数量。num_episodes是轮次数。Initial_state是环境的初始状态。Take_action (current_state, action)是一个函数,它接受当前状态和操作作为输入,并返回下一个操作。
4、A2C
A2C(Advantage Actor-Critic)是一种有策略的actor-critic算法,它使用Advantage函数来更新策略。 该算法实现简单,可以处理离散和连续的动作空间。
importnumpyasnp
fromkeras.modelsimportModel, Sequential
fromkeras.layersimportDense, Input
fromkeras.optimizersimportAdam
fromkeras.utilsimportto_categorical
# Define the actor and critic models
state_input=Input(shape=(state_space_size,))
actor=Dense(32, activation='relu')(state_input)
actor=Dense(32, activation='relu')(actor)
actor=Dense(action_space_size, activation='softmax')(actor)
actor_model=Model(inputs=state_input, outputs=actor)
actor_model.compile(loss='categorical_crossentropy', optimizer=Adam(lr=0.001))
state_input=Input(shape=(state_space_size,))
critic=Dense(32, activation='relu')(state_input)
critic=Dense(32, activation='relu')(critic)
critic=Dense(1, activation='linear')(critic)
critic_model=Model(inputs=state_input, outputs=critic)
critic_model.compile(loss='mse', optimizer=Adam(lr=0.001))
forepisodeinrange(num_episodes):
current_state=initial_state
done=False
whilenotdone:
# Select an action using the actor model and add exploration noise
action_probs=actor_model.predict(np.array([current_state]))[0]
action=np.random.choice(range(action_space_size), p=action_probs)
# Take the action and observe the next state and reward
next_state, reward, done=take_action(current_state, action)
# Calculate the advantage
target_value=critic_model.predict(np.array([next_state]))[0][0]
advantage=reward+gamma*target_value-critic_model.predict(np.array([current_state]))[0][0]
# Update the actor model
action_one_hot=to_categorical(action, action_space_size)
actor_model.train_on_batch(np.array([current_state]), advantage*action_one_hot)
# Update the critic model
critic_model.train_on_batch(np.array([current_state]), reward+gamma*target_value)
current_state=next_state
在这个例子中,actor模型是一个神经网络,它有2个隐藏层,每个隐藏层有32个神经元,具有relu激活函数,输出层具有softmax激活函数。critic模型也是一个神经网络,它有2个隐含层,每层32个神经元,具有relu激活函数,输出层具有线性激活函数。
使用分类交叉熵损失函数训练actor模型,使用均方误差损失函数训练critic模型。动作是根据actor模型预测选择的,并添加了用于探索的噪声。
5、PPO
PPO(Proximal Policy Optimization)是一种策略算法,它使用信任域优化的方法来更新策略。 它在具有高维观察和连续动作空间的环境 * 别有用。 PPO 以其稳定性和高样品效率而著称。
importnumpyasnp
fromkeras.modelsimportModel, Sequential
fromkeras.layersimportDense, Input
fromkeras.optimizersimportAdam
# Define the policy model
state_input=Input(shape=(state_space_size,))
policy=Dense(32, activation='relu')(state_input)
policy=Dense(32, activation='relu')(policy)
policy=Dense(action_space_size, activation='softmax')(policy)
policy_model=Model(inputs=state_input, outputs=policy)
# Define the value model
value_model=Model(inputs=state_input, outputs=Dense(1, activation='linear')(policy))
# Define the optimizer
optimizer=Adam(lr=0.001)
forepisodeinrange(num_episodes):
current_state=initial_state
whilenotdone:
# Select an action using the policy model
action_probs=policy_model.predict(np.array([current_state]))[0]
action=np.random.choice(range(action_space_size), p=action_probs)
# Take the action and observe the next state and reward
next_state, reward, done=take_action(current_state, action)
# Calculate the advantage
target_value=value_model.predict(np.array([next_state]))[0][0]
advantage=reward+gamma*target_value-value_model.predict(np.array([current_state]))[0][0]
# Calculate the old and new policy probabilities
old_policy_prob=action_probs[action]
new_policy_prob=policy_model.predict(np.array([next_state]))[0][action]
# Calculate the ratio and the surrogate loss
ratio=new_policy_prob/old_policy_prob
surrogate_loss=np.minimum(ratio*advantage, np.clip(ratio, 1-epsilon, 1+epsilon) *advantage)
# Update the policy and value models
policy_model.trainable_weights=value_model.trainable_weights
policy_model.compile(optimizer=optimizer, loss=-surrogate_loss)
policy_model.train_on_batch(np.array([current_state]), np.array([action_one_hot]))
value_model.train_on_batch(np.array([current_state]), reward+gamma*target_value)
current_state=next_state
6、DQN
DQN(深度 Q 网络)是一种无模型、非策略算法,它使用神经网络来逼近 Q 函数。 DQN 特别适用于 Atari 游戏和其他类似问题,其中状态空间是高维的,并使用神经网络近似 Q 函数。
importnumpyasnp
fromkeras.modelsimportSequential
fromkeras.layersimportDense, Input
fromkeras.optimizersimportAdam
fromcollectionsimportdeque
# Define the Q-network model
model=Sequential()
model.add(Dense(32, input_dim=state_space_size, activation='relu'))
model.add(Dense(32, activation='relu'))
model.add(Dense(action_space_size, activation='linear'))
model.compile(loss='mse', optimizer=Adam(lr=0.001))
# Define the replay buffer
replay_buffer=deque(maxlen=replay_buffer_size)
forepisodeinrange(num_episodes):
current_state=initial_state
whilenotdone:
# Select an action using an epsilon-greedy policy
ifnp.random.rand() <epsilon:
action=np.random.randint(0, action_space_size)
else:
action=np.argmax(model.predict(np.array([current_state]))[0])
# Take the action and observe the next state and reward
next_state, reward, done=take_action(current_state, action)
# Add the experience to the replay buffer
replay_buffer.append((current_state, action, reward, next_state, done))
# Sample a batch of experiences from the replay buffer
batch=random.sample(replay_buffer, batch_size)
# Prepare the inputs and targets for the Q-network
inputs=np.array([x[0] forxinbatch])
targets=model.predict(inputs)
fori, (state, action, reward, next_state, done) inenumerate(batch):
ifdone:
targets[i, action] =reward
else:
targets[i, action] =reward+gamma*np.max(model.predict(np.array([next_state]))[0])
# Update the Q-network
model.train_on_batch(inputs, targets)
current_state=next_state
上面的代码,Q-network有2个隐藏层,每个隐藏层有32个神经元,使用relu激活函数。该网络使用均方误差损失函数和Adam优化器进行训练。
7、TRPO
TRPO (Trust Region Policy Optimization)是一种无模型的策略算法,它使用信任域优化方法来更新策略。 它在具有高维观察和连续动作空间的环境 * 别有用。
TRPO 是一个复杂的算法,需要多个步骤和组件来实现。TRPO不是用几行代码就能实现的简单算法。
所以我们这里使用实现了TRPO的现有库,例如OpenAI Baselines,它提供了包括TRPO在内的各种预先实现的强化学习算法,。
要在OpenAI Baselines中使用TRPO,我们需要安装:
pip install baselines
然后可以使用baselines库中的trpo_mpi模块在你的环境中训练TRPO代理,这里有一个简单的例子:
importgym
frombaselines.common.vec_env.dummy_vec_envimportDummyVecEnv
frombaselines.trpo_mpiimporttrpo_mpi
#Initialize the environment
env=gym.make("CartPole-v1")
env=DummyVecEnv([lambda: env])
# Define the policy network
policy_fn=mlp_policy
#Train the TRPO model
model=trpo_mpi.learn(env, policy_fn, max_iters=1000)
我们使用Gym库初始化环境。然后定义策略网络,并调用TRPO模块中的learn()函数来训练模型。
还有许多其他库也提供了TRPO的实现,例如TensorFlow、PyTorch和RLLib。下面时一个使用TF 2.0实现的样例
importtensorflowastf
importgym
# Define the policy network
classPolicyNetwork(tf.keras.Model):
def__init__(self):
super(PolicyNetwork, self).__init__()
self.dense1=tf.keras.layers.Dense(16, activation='relu')
self.dense2=tf.keras.layers.Dense(16, activation='relu')
self.dense3=tf.keras.layers.Dense(1, activation='sigmoid')
defcall(self, inputs):
x=self.dense1(inputs)
x=self.dense2(x)
x=self.dense3(x)
returnx
# Initialize the environment
env=gym.make("CartPole-v1")
# Initialize the policy network
policy_network=PolicyNetwork()
# Define the optimizer
optimizer=tf.optimizers.Adam()
# Define the loss function
loss_fn=tf.losses.BinaryCrossentropy()
# Set the maximum number of iterations
max_iters=1000
# Start the training loop
foriinrange(max_iters):
# Sample an action from the policy network
action=tf.squeeze(tf.random.categorical(policy_network(observation), 1))
# Take a step in the environment
observation, reward, done, _=env.step(action)
withtf.GradientTape() astape:
# Compute the loss
loss=loss_fn(reward, policy_network(observation))
# Compute the gradients
grads=tape.gradient(loss, policy_network.trainable_variables)
# Perform the update step
optimizer.apply_gradients(zip(grads, policy_network.trainable_variables))
ifdone:
# Reset the environment
observation=env.reset()
在这个例子中,我们首先使用TensorFlow的Keras API定义一个策略网络。然后使用Gym库和策略网络初始化环境。然后定义用于训练策略网络的优化器和损失函数。
在训练循环中,从策略网络中采样一个动作,在环境中前进一步,然后使用TensorFlow的GradientTape计算损失和梯度。然后我们使用优化器执行更新步骤。
这是一个简单的例子,只展示了如何在TensorFlow 2.0中实现TRPO。TRPO是一个非常复杂的算法,这个例子没有涵盖所有的细节,但它是试验TRPO的一个很好的起点。
来源:https://segmentfault.com/a/1190000043352516
猜你喜欢
- folder.htm<html><head><title>闪亮日子之在线创建文件夹</title&
- 一般来说,我们判断 iframe 是否加载完成其实与 判断 JavaScript 文件是否加载完成 采用的方法很类似:var&nb
- 1、查看数据库中有哪些用户? select username from all_users;2、查看数据库中有哪些DBA用户? select
- 随着网站的内容的增多和用户访问量的增多,无可避免的是网站加载会越来越慢,受限于带宽和服务器同一时间的请求次数的限制,我们往往需要在此时对我们
- #第一种def delRepeat(liebiao): for x in liebiao: while li
- 在制作网页以及编程的时候,适当的进行注释,不仅使自己的思路清晰,极大地减轻了维护的难度,而且方便项目组其他人了解你的代码,方便对代码的理解以
- 最近心情非常差,而且还没有触底的样子,哎~~~总是会忍不住叹气~~~前些日子在Twitter上叨唠说“不在乎IE8什么时候推出,只在乎IE6
- 今天偶尔在知乎上看到某大佬用Python写的ATM系统案例,然后观摩了下他的实现思路和源码,感觉受益颇多,于是就根据自己的思路和目前掌握的P
- 介绍百度aip模块是用于实现百度云与用户接口,简单来说就是使用百度云所拥有的人工智能模块。模块使用pip install baidu-aip
- Fuko Masked 是 Kaloyan Tsvetkov 的一个小型PHP库,用于通过用编辑后的元素替换列入黑名单的元素来屏蔽敏感数据。
- 现在的垃圾留言越来越智能,并且从留言内容几乎看不出来是垃圾留言,而大量的垃圾留言会导致文章可读性下降,并可能会被搜索引擎惩罚,经过一段时间的
- Javascript中的eval函数让我们可以很方便地操作一系列变量(a1,a2,a3,……)。自从VBScript 5.0之后,我们也可以
- 本文实例讲述了Python实现批量读取图片并存入mongodb数据库的方法。分享给大家供大家参考,具体如下:我的图片放在E:\image\中
- 问题:每次打开pycharm打开py文件光标都是insert模式, 像下面图片那样解决方案:讲Tools里面的Vim Emulator勾选去
- 本文实例讲述了php中正则替换函数ereg_replace用法。分享给大家供大家参考。具体如下:下面的实例是利用php 正则替换函数 ere
- 我很久前在YAHOO上扣的代码,兼容性很好,在Windows下的主流浏览器中可以正常运行。大家先不要急着下载代码,你随时都可以下,我们来分
- Lightbox里面的一个函数,能把页面实际的高宽与浏览器可视面积的高宽保存在一个数组中..非常好用.什么是Lightbox?下载light
- 动态语言与静态语言有很多不同,最大的特性之一就是可以实现动态的对类和实例进行修改,在Python中,我们创建了一个类后可以对实例和类绑定心的
- 一般的网站会有很多页面,面包屑导航可以大大改善用户寻找他们的路径的方法。就可用性而言,面包屑可以减少一个网站的用户返回上一级页面的操作次数,
- 开门见山,直接使用 skimage 库为图像添加高斯噪声是很简单的:import skimageorigin = skimage.io.im