184 changes: 184 additions & 0 deletions dql_plots/DeepQ.py
@@ -0,0 +1,184 @@
"""
This is the DQN implementation with Convolutional Q-Network

In this we have:
QNetwork: a Convolusional Network to estimate Q values from image input
DQN: the Deep Q-Learning agent class that handles action slection, training, target network updates and epsilon decay

The important bits are:
Convolutional layers that we use for visual input processing
Epsilon greedy exploration just like in Q-Learning
Experience replay //TODO
RMSProp optimizer //TODO
"""

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

import random
from collections import namedtuple, deque

#Neural network that estimates Q-values from visual input (stacked frames)
class QNetwork(nn.Module):
def __init__(self, stacked_input, num_actions, activation=F.relu):
super(QNetwork, self).__init__()
#convolution 1st layer
        self.layer1 = nn.Conv2d(stacked_input, 16, kernel_size=8, stride=4)  # [batch_size, 4, 84, 84] -> [batch_size, 16, 20, 20], 16 filters

        #convolution 2nd layer
        self.layer2 = nn.Conv2d(16, 32, kernel_size=4, stride=2)  # [batch_size, 16, 20, 20] -> [batch_size, 32, 9, 9], 32 filters

        #Output size after the 2nd layer
        self.flatten_img = 32 * 9 * 9  # layer 2 yields 32 feature maps of size 9x9 -> flattened to [batch_size, 2592]

        #Fully connected layers
        self.fully_connected_layer1 = nn.Linear(self.flatten_img, 256)  #flattened image passed to 256 neurons
        self.fully_connected_layer2 = nn.Linear(256, num_actions)  #Q-values, [batch_size, num_actions]

self.activation = activation

    def forward(self, input_img):
        #apply conv layers with the configured activation (ReLU by default)
        input_img = self.activation(self.layer1(input_img))
        input_img = self.activation(self.layer2(input_img))
        input_img = input_img.view((-1, self.flatten_img))
        input_img = self.activation(self.fully_connected_layer1(input_img))
        input_img = self.fully_connected_layer2(input_img)
        return input_img

#Named tuple for structured replay samples (called 'Transition' in the PyTorch DQN tutorial)
TrainingSample = namedtuple('TrainingSample', ('state', 'action', 'reward', 'next_state', 'terminated'))  #lets us access the tuple fields by name

#Replay buffer for experience replay
class ExperienceReplay:
    def __init__(self, stacked_input, action_shape, capacity=int(1e5)):
        self.capacity = capacity
        self.sample_idx = 0
        self.samples_stored_till_now = 0

        #preallocate storage for each field of a transition
        self.state = np.zeros((capacity, *stacked_input), dtype=np.uint8)
        self.action = np.zeros((capacity, *action_shape), dtype=np.int64)
        self.reward = np.zeros((capacity, 1), dtype=np.float32)
        self.next_state = np.zeros((capacity, *stacked_input), dtype=np.uint8)
        self.terminated = np.zeros((capacity, 1), dtype=np.float32)

    #store one transition (the result of one environment step)
def push(self, state, action, reward, next_state, terminated):
self.state[self.sample_idx] = state
self.action[self.sample_idx] = action
self.reward[self.sample_idx] = reward
self.next_state[self.sample_idx] = next_state
self.terminated[self.sample_idx] = terminated

        self.sample_idx = (self.sample_idx + 1) % self.capacity  #wrap around so the oldest entries get overwritten
        self.samples_stored_till_now = min(self.samples_stored_till_now + 1, self.capacity)

    #sample a random batch of stored transitions
def sample(self, batch_size):
idx = np.random.randint(0, self.samples_stored_till_now, batch_size)
batch = TrainingSample(
state=torch.FloatTensor(self.state[idx]),
action=torch.LongTensor(self.action[idx]),
reward=torch.FloatTensor(self.reward[idx]),
next_state=torch.FloatTensor(self.next_state[idx]),
terminated=torch.FloatTensor(self.terminated[idx]),
)
return batch

    #how many samples are currently stored
def __len__(self):
return self.samples_stored_till_now

#The actual Deep Q-Learning Network agent
class DQN:
def __init__(
self,
stacked_input,
num_actions,
alpha=0.00025,
epsilon=1.0,
minimum_epsilon=0.1,
discount_factor=0.99,
batch_size=32,
warmup_steps=5000,
ExperienceReplay_memory=int(5e4),
target_update_interval=10000,
):
        self.num_actions = num_actions
        self.epsilon = epsilon
        self.minimum_epsilon = minimum_epsilon  #floor for the epsilon decay in process()
        self.discount_factor = discount_factor
        self.batch_size = batch_size
        self.warmup_steps = warmup_steps
        self.target_update_interval = target_update_interval

        #Q-Network; input => stacked frames; output => one Q-value per action
self.network = QNetwork(stacked_input[0], num_actions)

#Target Network
self.target_network = QNetwork(stacked_input[0], num_actions)

        #initialize the target network with the online network's weights
self.target_network.load_state_dict(self.network.state_dict())

        #RMSprop optimizer; reference = DeepMind DQN
        self.optimizer = torch.optim.RMSprop(self.network.parameters(), lr=alpha)

        self.buffer = ExperienceReplay(stacked_input, (1, ), ExperienceReplay_memory)  #replay buffer; each action is stored as a single integer index

self.total_steps = 0
        self.epsilon_decay = (epsilon - minimum_epsilon) / 1e6  #linear epsilon decay, spread over 1e6 steps

#Epsilon Greedy
@torch.no_grad()
def act(self, input_img, training=True):
        self.network.train(training)  #train mode during training, eval mode otherwise
if training and ((np.random.rand() < self.epsilon) or (self.total_steps < self.warmup_steps)):
action = np.random.randint(0, self.num_actions)
else:
input_img = torch.from_numpy(input_img).float().unsqueeze(0)
q = self.network(input_img)
action = torch.argmax(q).item()
return action

    #Perform a training step
    def learn(self):
        #sample a random batch of past transitions from the replay buffer
        current_state, action, reward, next_state, terminated = self.buffer.sample(self.batch_size)

        #Q_target(s', a') for every action a'; no gradients flow through the target network
        next_q = self.target_network(next_state).detach()
        #Bellman target: y = r + (1 - terminated) * gamma * max_a' Q_target(s', a')
        target_q = reward + (1. - terminated) * self.discount_factor * next_q.max(dim=1, keepdim=True).values
        #MSE loss between the predicted Q(s, a) and the target
        loss = F.mse_loss(self.network(current_state).gather(1, action), target_q)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()

result = {
'total_steps': self.total_steps,
'value_loss': loss.item()
}
return result

    #Process a single transition: store it, train, sync the target network, decay epsilon
def process(self, transition):
result = {}
self.total_steps += 1

        #store the transition
self.buffer.push(*transition)

if self.total_steps > self.warmup_steps:
result = self.learn()

        #periodically copy the online network's weights into the target network
        if self.total_steps % self.target_update_interval == 0:
            self.target_network.load_state_dict(self.network.state_dict())

        #decay epsilon linearly, but never below minimum_epsilon
        self.epsilon = max(self.epsilon - self.epsilon_decay, self.minimum_epsilon)

return result

#this needs to be fleshed out //TODO
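
# ============================================================
# Usage sketch (a minimal outline, not a tested training loop; it assumes the
# Observation_processing wrapper from ImageProcessing.py and a discrete-action
# CarRacing-v3 env created with gymnasium):
#
# import gymnasium as gym
# from ImageProcessing import Observation_processing
#
# env = Observation_processing(gym.make("CarRacing-v3", continuous=False, render_mode="rgb_array"))
# agent = DQN(stacked_input=(4, 84, 84), num_actions=env.action_space.n)
#
# state, info = env.reset()
# while True:
#     action = agent.act(state)
#     next_state, reward, terminated, truncated, info = env.step(action)
#     agent.process((state, action, reward, next_state, terminated))
#     state = next_state
#     if terminated or truncated:
#         state, info = env.reset()
# ============================================================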
95 changes: 95 additions & 0 deletions dql_plots/ImageProcessing.py
@@ -0,0 +1,95 @@
"""
Observation Wrapper for our Reinforcement Learning Env

This file wraps around the Gymnasium env and:
Converts RGB frames to 84x84 greyscale for processing
It also stacks multiple consecutive frames (usually 4) to capture temporal dynamics.
Repeats the same action for a few frames to reduce computational expenses and to smoothen

Furthermore, some of the key features are:
Frame Stacking
Greyscale conversion
Frame skipping for faster performance

"""



import cv2
import numpy as np
import gymnasium as gym
import matplotlib.pyplot as plt
from collections import deque


class Observation_processing(gym.Wrapper):
def __init__(self, env, repeat_action=3, stack_frames=4, do_nothing_frames=50):
super(Observation_processing, self).__init__(env)
        self.do_nothing_frames = do_nothing_frames  #frames to skip at the start of an episode (the zoom-in phase)
        self.repeat_action = repeat_action  #repeat the same action for this many frames => cheaper computation
        self.stack_frames = stack_frames  #number of stacked frames, so the network can infer the car's motion

        self.frames = deque(maxlen=self.stack_frames)  #rolling buffer of the most recent frames

    def rgb_to_grayscale(self, img):
        img = cv2.resize(img, dsize=(84, 84))  #downscale to 84x84
        img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)  #drop the color channels
        return img

    # reset episode => do nothing -> grayscale -> stack
    def reset(self, **kwargs):
        state, info = self.env.reset(**kwargs)

# for this zoom in phase do nothing
for _ in range(self.do_nothing_frames):
state, _, terminated, truncated, info = self.env.step(0)
#additional termination condition to avoid bad terminal state
if terminated or truncated:
state, info = self.env.reset()

state = self.rgb_to_grayscale(state)

#deque 4 frames
for _ in range(self.stack_frames):
self.frames.append(state)

# stack frames
stacked_state = np.stack(self.frames, axis=0)

return stacked_state, info # [stack_frames=4, 84, 84]

    #take an action, repeating it for repeat_action frames
    def step(self, action):
        total_reward = 0  #accumulated over the repeated frames
terminated = False
truncated = False

#repeat action for repeat_action
for _ in range(self.repeat_action):
state, reward, terminated, truncated, info = self.env.step(action)
total_reward += reward

if terminated or truncated:
break

state = self.rgb_to_grayscale(state)
        self.frames.append(state)  #append the newest frame; the deque keeps only the most recent stack_frames frames
stacked_state = np.stack(self.frames, axis=0)

return stacked_state, total_reward, terminated, truncated, info


# ============================================================
# Usage Example:
# env = gym.make("CarRacing-v3", render_mode="rgb_array")
# wrapped_env = Observation_processing(env)
# state, info = wrapped_env.reset()
# next_state, reward, terminated, truncated, info = wrapped_env.step(action)
#
# This wrapper ensures the input to your neural network is:
# - [stack_frames, 84, 84] shaped (default: [4, 84, 84])
# - temporally aware (via stacked grayscale frames)
# - computationally efficient (via frame skipping)
#
# Useful for: Deep Q-Networks, Policy Gradient methods, or any CNN-based RL pipeline.
# ============================================================