Шаг 1. Импортирем библиотеки¶
import tensorflow as tf
import numpy as np
from vizdoom import *
import random
import time
from skimage import transform
from collections import deque
import matplotlib.pyplot as plt
import warnings #
warnings.filterwarnings('ignore')
Шаг 2: Создайте нашу среду 🎮¶
- Теперь, когда мы импортировали библиотеки / зависимости, мы создадим нашу среду.
- Doom среда требует:
-
конфигурационный файл
, который обрабатывает все параметры (размер кадра, возможные действия ...) -
Файл сценария
: который генерирует правильный сценарий (в нашем случае базовый , но вам предлагается попробовать другие сценарии ).
-
-
Примечание: у нас есть 3 возможных действия
[[0,0,1], [1,0,0], [0,1,0]]
, поэтому нам не нужно выполнять горячее кодирование ### Наше окружение -
Монстр порождается случайно где-то вдоль противоположной стены .
- Игрок может только идти влево / вправо и стрелять .
- 1 удара достаточно , чтобы убить монстра .
-
Эпизод заканчивается, когда монстр убит или по тайм-ауту (300) .
НАГРАДЫ: -
+101 за убийство монстра
- -5 пропавших без вести
- Эпизод заканчивается после убийства монстра или по таймауту.
- награда за жизнь = -1
"""
Создаем среду
"""
def create_environment():
game = DoomGame()
# Грузим конфиг
game.load_config("basic.cfg")
# Грузим сценарий
game.set_doom_scenario_path("basic.wad")
game.init()
# Here our possible actions
left = [1, 0, 0]
right = [0, 1, 0]
shoot = [0, 0, 1]
possible_actions = [left, right, shoot]
return game, possible_actions
"""
Исследуем среду путем случайных действий
"""
def test_environment():
game = DoomGame()
game.load_config("basic.cfg")
game.set_doom_scenario_path("basic.wad")
game.init()
shoot = [0, 0, 1]
left = [1, 0, 0]
right = [0, 1, 0]
actions = [shoot, left, right]
episodes = 10
for i in range(episodes):
game.new_episode()
while not game.is_episode_finished():
state = game.get_state()
img = state.screen_buffer
misc = state.game_variables
action = random.choice(actions)
print(action)
reward = game.make_action(action)
print ("\treward:", reward)
time.sleep(0.02)
print ("Result:", game.get_total_reward())
time.sleep(2)
game.close()
game, possible_actions = create_environment()
Шаг 3: Определить функции предварительной обработки ⚙️¶
Предварительная обработка - важный шаг, потому что мы хотим уменьшить сложность наших состояний, чтобы уменьшить время вычислений, необходимое для обучения. </ B>
Наши шаги:
- Оттенки серого каждого из наших кадров (потому что цвет не добавляет важную информацию </ b>). Но это уже сделано в файле конфигурации.
- Обрезать экран (в нашем случае мы убираем крышу, потому что она не содержит информации)
- нормализуем значения пикселей
- Наконец мы изменяем размер предварительно обработанного кадра
"""
Изменяем размер картинки
__________________
| |
| |
| |
| |
|_________________|
на
_____________
| |
| |
| |
|____________|
return preprocessed_frame
"""
def preprocess_frame(frame):
cropped_frame = frame[30:-10,30:-30]
normalized_frame = cropped_frame/255.0
preprocessed_frame = transform.resize(normalized_frame, [84,84])
return preprocessed_frame
Массив картинок¶
Как объясняется в этом действительно Хорошая статья мы складываем фреймы.
Укладка кадров действительно важна, потому что она помогает нам дать ощущение движения нашей нейронной сети.
- Сначала мы препроцессируем кадр
- Затем мы добавляем кадр в деку, которая автоматически удаляет самый старый кадр
- наконец мы строим сложенное состояние
Вот как работает стек:
- Для первого кадра мы подаем 4 кадра
- На каждом временном шаге мы добавляем новый кадр в deque и затем складываем их, чтобы сформировать новый сложенный кадр
- И так далее
- Если мы закончили, мы создаем новый стек с 4 новыми кадрами (потому что мы в новом эпизоде) .
stack_size = 4 # Сколько кадров в массиве
stacked_frames = deque([np.zeros((84,84), dtype=np.int) for i in range(stack_size)], maxlen=4)
def stack_frames(stacked_frames, state, is_new_episode):
# Делаем препроцессинг
frame = preprocess_frame(state)
if is_new_episode:
# Очищаем массив
stacked_frames = deque([np.zeros((84,84), dtype=np.int) for i in range(stack_size)], maxlen=4)
# Т.к. эпизод новый добавляем кадры
stacked_frames.append(frame)
stacked_frames.append(frame)
stacked_frames.append(frame)
stacked_frames.append(frame)
stacked_state = np.stack(stacked_frames, axis=2)
else:
# Добавляем в массив, при этом он работает по принципу FIFO
stacked_frames.append(frame)
stacked_state = np.stack(stacked_frames, axis=2)
return stacked_state, stacked_frames
Шаг 4: Настройка наших гиперпараметры ⚗️¶
В этой части мы настроим наши гиперпараметры. Но когда вы внедряете нейронную сеть самостоятельно, вы не будете внедрять гиперпараметры сразу, а постепенно .
- Во-первых, вы начинаете с определения гиперпараметров нейронных сетей при реализации модели.
- Затем вы добавите тренировочные гиперпараметры при реализации алгоритма обучения.
### Модель гиперпараметров
state_size = [84,84,4] # Входной стек размером 84x84x4
action_size = game.get_available_buttons_size() # 3 возможных действия: лево, право и огонь
learning_rate = 0.0002 # Alpha (aka learning rate)
### Параметры обучние
total_episodes = 500 # Количество эпизодов для обучения
max_steps = 100 # Количество шагов в эпизоде
batch_size = 64
# Параметры фазы исследования
explore_start = 1.0 # вероятность того что сейчас фаза исследования на старте
explore_stop = 0.01 # вероятность того что сейчас фаза исследования закончена
decay_rate = 0.0001 # экспонициальное затухание
# парамтер Q-обучения
gamma = 0.95
### Параметры памяти
pretrain_length = batch_size # Количество опытов, сохраненных в памяти при первоначальной инициализации
memory_size = 1000000 # Количество опытов, которые может сохранить Память
### Смотрим ли обучние
training = True
## Рендерить ли обучние
episode_render = False
Шаг 5: Создать нашу модель нейронной сети Deep Q-learning learning¶
Это наша модель глубокого обучения:
- мы берем стек из 4 кадров в качестве входных данных
- проходит через 3 коннета
- Тогда это вспыхнуло
- Наконец, он проходит через 2 слоя ФК
- выводит значение Q для каждого действия
class DQNetwork:
def __init__(self, state_size, action_size, learning_rate, name='DQNetwork'):
self.state_size = state_size
self.action_size = action_size
self.learning_rate = learning_rate
with tf.variable_scope(name):
# Мы создаем заполнители
# * state_size означает, что мы берем каждый элемент state_size в кортеж, следовательно, это как если бы мы написали
# [None, 84, 84, 4]
self.inputs_ = tf.placeholder(tf.float32, [None, *state_size], name="inputs")
self.actions_ = tf.placeholder(tf.float32, [None, 3], name="actions_")
# Помним что target_Q = R(s,a) + ymax Qhat(s', a')
self.target_Q = tf.placeholder(tf.float32, [None], name="target")
"""
Первая convnet:
CNN
BatchNormalization
ELU
"""
# Входная сетка is 84x84x4
self.conv1 = tf.layers.conv2d(inputs = self.inputs_,
filters = 32,
kernel_size = [8,8],
strides = [4,4],
padding = "VALID",
kernel_initializer=tf.contrib.layers.xavier_initializer_conv2d(),
name = "conv1")
self.conv1_batchnorm = tf.layers.batch_normalization(self.conv1,
training = True,
epsilon = 1e-5,
name = 'batch_norm1')
self.conv1_out = tf.nn.elu(self.conv1_batchnorm, name="conv1_out")
## --> [20, 20, 32]
"""
Вторая convnet:
CNN
BatchNormalization
ELU
"""
self.conv2 = tf.layers.conv2d(inputs = self.conv1_out,
filters = 64,
kernel_size = [4,4],
strides = [2,2],
padding = "VALID",
kernel_initializer=tf.contrib.layers.xavier_initializer_conv2d(),
name = "conv2")
self.conv2_batchnorm = tf.layers.batch_normalization(self.conv2,
training = True,
epsilon = 1e-5,
name = 'batch_norm2')
self.conv2_out = tf.nn.elu(self.conv2_batchnorm, name="conv2_out")
## --> [9, 9, 64]
"""
Третья convnet:
CNN
BatchNormalization
ELU
"""
self.conv3 = tf.layers.conv2d(inputs = self.conv2_out,
filters = 128,
kernel_size = [4,4],
strides = [2,2],
padding = "VALID",
kernel_initializer=tf.contrib.layers.xavier_initializer_conv2d(),
name = "conv3")
self.conv3_batchnorm = tf.layers.batch_normalization(self.conv3,
training = True,
epsilon = 1e-5,
name = 'batch_norm3')
self.conv3_out = tf.nn.elu(self.conv3_batchnorm, name="conv3_out")
## --> [3, 3, 128]
self.flatten = tf.layers.flatten(self.conv3_out)
## --> [1152]
self.fc = tf.layers.dense(inputs = self.flatten,
units = 512,
activation = tf.nn.elu,
kernel_initializer=tf.contrib.layers.xavier_initializer(),
name="fc1")
self.output = tf.layers.dense(inputs = self.fc,
kernel_initializer=tf.contrib.layers.xavier_initializer(),
units = 3,
activation=None)
# Расчитывем Q.
self.Q = tf.reduce_sum(tf.multiply(self.output, self.actions_), axis=1)
# Считаем потери
# Sum(Qtarget - Q)^2
self.loss = tf.reduce_mean(tf.square(self.target_Q - self.Q))
self.optimizer = tf.train.RMSPropOptimizer(self.learning_rate).minimize(self.loss)
# Обновляем граф
tf.reset_default_graph()
# Инициализируем сетка
DQNetwork = DQNetwork(state_size, action_size, learning_rate)
Шаг 6: Накопление опыта¶
Теперь, когда мы создаем нашу нейронную сеть, нам нужно реализовать метод Replay Experience.
Здесь мы создадим объект Memory, который создает deque.A deque (двусторонняя очередь) - это тип данных, который удаляет самый старый элемент каждый раз, когда вы добавляете новый элемент.
Эта часть была взята из Udacity: <a href="https://github.com/udacity/deep-learning/blob/master/reinforcement/Q-learning-cart.ipynb" Cartpole DQN</a>
class Memory():
def __init__(self, max_size):
self.buffer = deque(maxlen = max_size)
def add(self, experience):
self.buffer.append(experience)
def sample(self, batch_size):
buffer_size = len(self.buffer)
index = np.random.choice(np.arange(buffer_size),
size = batch_size,
replace = False)
return [self.buffer[i] for i in index]
memory = Memory(max_size = memory_size)
# Получаем эпизод
game.new_episode()
for i in range(pretrain_length):
# Если мы только начали
if i == 0:
# Берем текущее состояние
state = game.get_state().screen_buffer
state, stacked_frames = stack_frames(stacked_frames, state, True)
# Выбираем действие
action = random.choice(possible_actions)
# Смотрим награду
reward = game.make_action(action)
# Проверяем не закончился ли эпизод
done = game.is_episode_finished()
# Если всё печально и агент погиб
if done:
# Мы кончили
next_state = np.zeros(state.shape)
# Запоминанем историю эпизода
memory.add((state, action, reward, next_state, done))
# Стартуем новый эпизов
game.new_episode()
state = game.get_state().screen_buffer
state, stacked_frames = stack_frames(stacked_frames, state, True)
else:
# Берем следующий кадр
next_state = game.get_state().screen_buffer
next_state, stacked_frames = stack_frames(stacked_frames, next_state, False)
# Запоминанем историю эпизода
memory.add((state, action, reward, next_state, done))
# Делаем шажочек
state = next_state
Шаг 7: Настроить Tensorboard 📊¶
Дополнительную информацию о тенорборде см. В этом отличном учебнике за 30 минут
Чтобы запустить тензорную доску: tenorboard --logdir = / tenorboard / dqn / 1
writer = tf.summary.FileWriter("/tensorboard/dqn/1")
tf.summary.scalar("Loss", DQNetwork.loss)
write_op = tf.summary.merge_all()
Шаг 8: Обучаем нашего агента¶
Наш алгоритм:
- Инициализировать веса
- Инициировать среду
- Инициализация скорости затухания (которая будет использоваться для уменьшения эпсилон)
-
Для эпизода до max_episode сделать
Сделать новый эпизод
Установите шаг в 0
Соблюдайте первое состояние $ s_0 $
В то время как step <max_steps do : Увеличение decay_rate С помощью $ \ epsilon $ выберите случайное действие $ a_t $, в противном случае выберите $ a_t = \ mathrm {argmax} _a Q (s_t, a) $ Выполните действие $ a_t $ в симуляторе и наблюдайте за вознаграждением $ r_ {t + 1} $ и новым состоянием $ s_ {t + 1} $ Сохранение перехода $ <s_t, a_t, r_ {t + 1}, s_ {t + 1}> $ в памяти $ D $ Образец случайной мини-партии из $ D $: $ <s, a, r, s '> $ Установите $ \ hat {Q} = r $, если эпизод заканчивается в $ + 1 $, в противном случае установите $ \ hat {Q} = r + \ gamma \ max_ {a '} {Q (s', a ')} $ Сделайте шаг градиентного спуска с потерей $ (\ hat {Q} - Q (s, a)) ^ 2 $ endfor
- endfor
"""
Эта функция выполнит свою роль
С помощью ϵ выберите случайное действие
"""
def predict_action(explore_start, explore_stop, decay_rate, decay_step, state, actions):
## EPSILON Жадная стратегия
# Выберите действие a из состояния s, используя эпсилон жадный.
## Сначала мы рандомизируем число
exp_exp_tradeoff = np.random.rand()
# Здесь мы будем использовать улучшенную версию нашей жадной стратегии epsilon, используемой в блокноте Q-learning
explore_probability = explore_stop + (explore_start - explore_stop) * np.exp(-decay_rate * decay_step)
if (explore_probability > exp_exp_tradeoff):
# Выбираем слчуаеное действие на этапе исследования
action = random.choice(possible_actions)
else:
# Получим действие от Q-сети (эксплуатация)
# Оценим состояние значений Qs
Qs = sess.run(DQNetwork.output, feed_dict = {DQNetwork.inputs_: state.reshape((1, *state.shape))})
# Возьмём наибольшее значение Q (= лучшее действие)
choice = np.argmax(Qs)
action = possible_actions[int(choice)]
return action, explore_probability
Шаг 9: Смотрим как играет наш агент 👀¶
Теперь, когда мы обучили нашего агента, мы можем проверить его
with tf.Session() as sess:
game, possible_actions = create_environment()
totalScore = 0
# Загружаем модедь
saver.restore(sess, "./models/model.ckpt")
game.init()
for i in range(1):
game.new_episode()
while not game.is_episode_finished():
frame = game.get_state().screen_buffer
state = stack_frames(stacked_frames, frame)
# Take the biggest Q value (= the best action)
Qs = sess.run(DQNetwork.output, feed_dict = {DQNetwork.inputs_: state.reshape((1, *state.shape))})
action = np.argmax(Qs)
action = possible_actions[int(action)]
game.make_action(action)
score = game.get_total_reward()
print("Score: ", score)
totalScore += score
print("TOTAL_SCORE", totalScore/100.0)
game.close()