IIT-M RL-ASSIGNMENT-2-GRIDWORLD
Solution for submission 131999
A detailed solution for submission 131999 submitted for challenge IIT-M RL-ASSIGNMENT-2-GRIDWORLD
What is the notebook about?¶
Problem - Gridworld Environment Algorithms¶
This problem deals with a grid world and stochastic actions. The tasks you have to do are:
- Implement Policy Iteration
- Implement Value Iteration
- Implement TD lamdda
- Visualize the results
- Explain the results
How to use this notebook? 📝¶
This is a shared template and any edits you make here will not be saved.You should make a copy in your own drive. Click the "File" menu (top-left), then "Save a Copy in Drive". You will be working in your copy however you like.
Update the config parameters. You can define the common variables here
Variable | Description |
---|---|
AICROWD_DATASET_PATH |
Path to the file containing test data. This should be an absolute path. |
AICROWD_RESULTS_DIR |
Path to write the output to. |
AICROWD_ASSETS_DIR |
In case your notebook needs additional files (like model weights, etc.,), you can add them to a directory and specify the path to the directory here (please specify relative path). The contents of this directory will be sent to AIcrowd for evaluation. |
AICROWD_API_KEY |
In order to submit your code to AIcrowd, you need to provide your account's API key. This key is available at https://www.aicrowd.com/participants/me |
- Installing packages. Please use the Install packages 🗃 section to install the packages
Setup AIcrowd Utilities 🛠¶
We use this to bundle the files for submission and create a submission on AIcrowd. Do not edit this block.
!pip install aicrowd-cli > /dev/null
AIcrowd Runtime Configuration 🧷¶
Get login API key from https://www.aicrowd.com/participants/me
import os
AICROWD_DATASET_PATH = os.getenv("DATASET_PATH", os.getcwd()+"/a5562c7d-55f0-4d06-841c-110655bb04ec_a2_gridworld_inputs.zip")
AICROWD_RESULTS_DIR = os.getenv("OUTPUTS_DIR", "results")
!unzip -q $AICROWD_DATASET_PATH
DATASET_DIR = 'inputs/'
GridWorld Environment¶
Read the code for the environment thoroughly
Do not edit the code for the environment
import numpy as np
class GridEnv_HW2:
def __init__(self,
goal_location,
action_stochasticity,
non_terminal_reward,
terminal_reward,
grey_in,
brown_in,
grey_out,
brown_out
):
# Do not edit this section
self.action_stochasticity = action_stochasticity
self.non_terminal_reward = non_terminal_reward
self.terminal_reward = terminal_reward
self.grid_size = [10, 10]
# Index of the actions
self.actions = {'N': (1, 0),
'E': (0,1),
'S': (-1,0),
'W': (0,-1)}
self.perpendicular_order = ['N', 'E', 'S', 'W']
l = ['normal' for _ in range(self.grid_size[0]) ]
self.grid = np.array([l for _ in range(self.grid_size[1]) ], dtype=object)
self.grid[goal_location[0], goal_location[1]] = 'goal'
self.goal_location = goal_location
for gi in grey_in:
self.grid[gi[0],gi[1]] = 'grey_in'
for bi in brown_in:
self.grid[bi[0], bi[1]] = 'brown_in'
for go in grey_out:
self.grid[go[0], go[1]] = 'grey_out'
for bo in brown_out:
self.grid[bo[0], bo[1]] = 'brown_out'
self.grey_outs = grey_out
self.brown_outs = brown_out
def _out_of_grid(self, state):
if state[0] < 0 or state[1] < 0:
return True
elif state[0] > self.grid_size[0] - 1:
return True
elif state[1] > self.grid_size[1] - 1:
return True
else:
return False
def _grid_state(self, state):
return self.grid[state[0], state[1]]
def get_transition_probabilites_and_reward(self, state, action):
"""
Returns the probabiltity of all possible transitions for the given action in the form:
A list of tuples of (next_state, probability, reward)
Note that based on number of state and action there can be many different next states
Unless the state is All the probabilities of next states should add up to 1
"""
grid_state = self._grid_state(state)
if grid_state == 'goal':
return [(self.goal_location, 1.0, 0.0)]
elif grid_state == 'grey_in':
npr = []
for go in self.grey_outs:
npr.append((go, 1/len(self.grey_outs),
self.non_terminal_reward))
return npr
elif grid_state == 'brown_in':
npr = []
for bo in self.brown_outs:
npr.append((bo, 1/len(self.brown_outs),
self.non_terminal_reward))
return npr
direction = self.actions.get(action, None)
if direction is None:
raise ValueError("Invalid action %s , please select among" % action, list(self.actions.keys()))
dir_index = self.perpendicular_order.index(action)
wrap_acts = self.perpendicular_order[dir_index:] + self.perpendicular_order[:dir_index]
next_state_probs = {}
for prob, a in zip(self.action_stochasticity, wrap_acts):
d = self.actions[a]
next_state = (state[0] + d[0]), (state[1] + d[1])
if self._out_of_grid(next_state):
next_state = state
next_state_probs.setdefault(next_state, 0.0)
next_state_probs[next_state] += prob
npr = []
for ns, prob in next_state_probs.items():
next_grid_state = self._grid_state(ns)
reward = self.terminal_reward if next_grid_state == 'goal' else self.non_terminal_reward
npr.append((ns, prob, reward))
return npr
def step(self, state, action):
npr = self.get_transition_probabilites_and_reward(state, action)
probs = [t[1] for t in npr]
sampled_idx = np.random.choice(range(len(npr)), p=probs)
sampled_npr = npr[sampled_idx]
next_state = sampled_npr[0]
reward = sampled_npr[2]
is_terminal = next_state == tuple(self.goal_location)
return next_state, reward, is_terminal
Example environment¶
This has the same setup as the pdf, do not edit the settings
def get_base_kwargs():
goal_location = (9,9)
action_stochasticity = [0.8, 0.2/3, 0.2/3, 0.2/3]
grey_out = [(3,2), (4,2), (5,2), (6,2)]
brown_in = [(9,7)]
grey_in = [(0,0)]
brown_out = [(1,7)]
non_terminal_reward = 0
terminal_reward = 10
base_kwargs = {"goal_location": goal_location,
"action_stochasticity": action_stochasticity,
"brown_in": brown_in,
"grey_in": grey_in,
"brown_out": brown_out,
"non_terminal_reward": non_terminal_reward,
"terminal_reward": terminal_reward,
"grey_out": grey_out,}
return base_kwargs
base_kwargs = get_base_kwargs()
Task 2.1 - Value Iteration¶
Run value iteration on the environment and generate the policy and expected reward
def value_iteration(env, gamma):
# Initial Values
values = np.zeros((10, 10))
# Initial policy
policy = np.empty((10, 10), object)
policy[:] = 'N' # Make all the policy values as 'N'
# Begin code here
all_values = []
H = np.zeros((10, 10))
not_done = True
while(not_done):
delta = 0
for statei in range((env.grid).shape[0]):
for statej in range((env.grid).shape[1]):
action_values = []
for action in env.perpendicular_order:
trans_probab_reward = env.get_transition_probabilites_and_reward((statei, statej), action)
reward = 0
for new_state_probab_reward in trans_probab_reward:
if env._out_of_grid((new_state_probab_reward[0][0], new_state_probab_reward[0][1])):
continue
else:
reward += new_state_probab_reward[1] * (new_state_probab_reward[2] + \
gamma*values[new_state_probab_reward[0][0], new_state_probab_reward[0][1]])
action_values.append(reward)
H[statei][statej] = np.max(np.array(action_values))
policy[statei][statej] = env.perpendicular_order[np.argmax(np.array(action_values))]
delta = max(delta, abs(values[statei][statej] - H[statei][statej]))
for statei in range((env.grid).shape[0]):
for statej in range((env.grid).shape[1]):
values[statei][statej] = H[statei][statej]
all_values.append(values.copy())
if(delta < 1e-8):
not_done = False
# Put your extra information needed for plots etc in this dictionary
extra_info = {'all_values': all_values}
# End code
# Do not change the number of output values
return {"Values": values, "Policy": policy}, extra_info
env = GridEnv_HW2(**base_kwargs)
res, extra_info = value_iteration(env, 0.7)
# The rounding off is just for making print statement cleaner
print(np.flipud(np.round(res['Values'], decimals=2)))
print(np.flipud(res['Policy']))
Task 2.2 - Policy Iteration¶
Run policy iteration on the environment and generate the policy and expected reward
def policy_iteration(env, gamma):
# Initial Values
values = np.zeros((10, 10))
# Initial policy
policy = np.empty((10, 10), object)
policy[:] = 'N' # Make all the policy values as 'N'
# Begin code here
all_values = []
not_done = True
while(not_done):
vi_not_done = True
while(vi_not_done):
delta = 0
for statei in range((env.grid).shape[0]):
for statej in range((env.grid).shape[1]):
j = values[statei][statej]
trans_probab_reward = env.get_transition_probabilites_and_reward((statei, statej), policy[statei][statej])
value = 0
for new_state_probab_reward in trans_probab_reward:
if env._out_of_grid((new_state_probab_reward[0][0], new_state_probab_reward[0][1])):
continue
else:
value += new_state_probab_reward[1] * (new_state_probab_reward[2] + \
gamma*values[new_state_probab_reward[0][0], new_state_probab_reward[0][1]])
values[statei][statej] = value
delta = max(delta, abs(j - values[statei][statej]))
if(delta < 1e-8):
vi_not_done = False
not_done = False
for statei in range((env.grid).shape[0]):
for statej in range((env.grid).shape[1]):
b = policy[statei][statej]
action_values = []
for action in env.perpendicular_order:
reward = 0
trans_probab_reward = env.get_transition_probabilites_and_reward((statei, statej), action)
for new_state_probab_reward in trans_probab_reward:
if env._out_of_grid((new_state_probab_reward[0][0], new_state_probab_reward[0][1])):
continue
else:
reward += new_state_probab_reward[1] * (new_state_probab_reward[2] + \
gamma*values[new_state_probab_reward[0][0], new_state_probab_reward[0][1]])
action_values.append(reward)
policy[statei][statej] = env.perpendicular_order[np.argmax(np.array(action_values))]
if(b != policy[statei][statej]):
not_done = True
all_values.append(values.copy())
# Put your extra information needed for plots etc in this dictionary
extra_info = {'all_values': all_values}
# End code
# Do not change the number of output values
return {"Values": values, "Policy": policy}, extra_info
env = GridEnv_HW2(**base_kwargs)
res, extra_info = policy_iteration(env, 0.7)
# The rounding off is just for making print statement cleaner
print(np.flipud(np.round(res['Values'], decimals=2)))
print(np.flipud(res['Policy']))
Task 2.3 - TD Lambda¶
Use the heuristic policy and implement TD lambda to find values on the gridworld
# The policy mentioned in the pdf to be used for TD lambda, do not modify this
def heuristic_policy(env, state):
goal = env.goal_location
dx = goal[0] - state[0]
dy = goal[1] - state[1]
if abs(dx) >= abs(dy):
direction = (np.sign(dx), 0)
else:
direction = (0, np.sign(dy))
for action, dir_val in env.actions.items():
if dir_val == direction:
target_action = action
break
return target_action
def td_lambda(env, lamda, seeds):
alpha = 0.5
gamma = 0.7
N = len(seeds)
# Usage of input_policy
# heuristic_policy(env, state) -> action
example_action = heuristic_policy(env, (1,2)) # Returns 'N' if goal is (9,9)
# Example of env.step
# env.step(state, action) -> Returns next_state, reward, is_terminal
# Initial values
values = np.zeros((10, 10))
es = np.zeros((10,10))
for episode_idx in range(N):
# Do not change this else the results will not match due to environment stochas
np.random.seed(seeds[episode_idx])
grey_in_loc = np.where(env.grid == 'grey_in')
state = grey_in_loc[0][0], grey_in_loc[1][0]
done = False
while not done:
action = heuristic_policy(env, state)
ns, rew, is_terminal = env.step(state, action)
# env.step is already taken inside the loop for you,
# Don't use env.step anywhere else in your code
# Begin code here
try:
if all_values:
pass
except:
all_values = []
delta = rew + (gamma * values[ns[0]][ns[1]]) - values[state[0]][state[1]]
es[state[0]][state[1]] += 1
for statei in range((env.grid).shape[0]):
for statej in range((env.grid).shape[1]):
values[statei][statej] += alpha*delta*(es[statei][statej])
es[statei][statej] = gamma*lamda*(es[statei][statej])
state = (ns[0], ns[1])
if(state == env.goal_location):
done = True
all_values.append(values.copy())
# Put your extra information needed for plots etc in this dictionary
extra_info = {'all_values': all_values}
# End code
# Do not change the number of output values
return {"Values": values}, extra_info
env = GridEnv_HW2(**base_kwargs)
res, extra_info = td_lambda(env, lamda=0.5, seeds=np.arange(1000))
# The rounding off is just for making print statement cleaner
print(np.flipud(np.round(res['Values'], decimals=2)))
Task 2.4 - TD Lamdba for multiple values of $\lambda$¶
Ideally this code should run as is
# This cell is only for your subjective evaluation results, display the results as asked in the pdf
# You can change it as you require, this code should run TD lamdba by default for different values of lambda
lamda_values = np.arange(0, 100+5, 5)/100
td_lamda_results = {}
extra_info = {}
for lamda in lamda_values:
env = GridEnv_HW2(**base_kwargs)
td_lamda_results[lamda], extra_info[lamda] = td_lambda(env, lamda,
seeds=np.arange(1000))
Generate Results ✅¶
def get_results(kwargs):
gridenv = GridEnv_HW2(**kwargs)
policy_iteration_results = policy_iteration(gridenv, 0.7)[0]
value_iteration_results = value_iteration(gridenv, 0.7)[0]
td_lambda_results = td_lambda(env, 0.5, np.arange(1000))[0]
final_results = {}
final_results["policy_iteration"] = policy_iteration_results
final_results["value_iteration"] = value_iteration_results
final_results["td_lambda"] = td_lambda_results
return final_results
# Do not edit this cell, generate results with it as is
if not os.path.exists(AICROWD_RESULTS_DIR):
os.mkdir(AICROWD_RESULTS_DIR)
for params_file in os.listdir(DATASET_DIR):
kwargs = np.load(os.path.join(DATASET_DIR, params_file), allow_pickle=True).item()
results = get_results(kwargs)
idx = params_file.split('_')[-1][:-4]
np.save(os.path.join(AICROWD_RESULTS_DIR, 'results_' + idx), results)
Check your score on the public data¶
This scores is not your final score, and it doesn't use the marks weightages. This is only for your reference of how arrays are matched and with what tolerance.
# Check your score on the given test cases (There are more private test cases not provided)
target_folder = 'targets'
result_folder = AICROWD_RESULTS_DIR
def check_algo_match(results, targets):
if 'Policy' in results:
policy_match = results['Policy'] == targets['Policy']
else:
policy_match = True
# Reference https://numpy.org/doc/stable/reference/generated/numpy.allclose.html
rewards_match = np.allclose(results['Values'], targets['Values'], rtol=3)
equal = rewards_match and policy_match
return equal
def check_score(target_folder, result_folder):
match = []
for out_file in os.listdir(result_folder):
res_file = os.path.join(result_folder, out_file)
results = np.load(res_file, allow_pickle=True).item()
idx = out_file.split('_')[-1][:-4] # Extract the file number
target_file = os.path.join(target_folder, f"targets_{idx}.npy")
targets = np.load(target_file, allow_pickle=True).item()
algo_match = []
for k in targets:
algo_results = results[k]
algo_targets = targets[k]
algo_match.append(check_algo_match(algo_results, algo_targets))
match.append(np.mean(algo_match))
return np.mean(match)
if os.path.exists(target_folder):
print("Shared data Score (normalized to 1):", check_score(target_folder, result_folder))
Display Results of TD lambda¶
Display Results of TD lambda with lambda values from 0 to 1 with steps of 0.05
Add code/text as required
import matplotlib.pyplot as plt
plt.style.use('ggplot')
import seaborn as sns
for lamda in list(td_lamda_results.keys()):
print(" Values of the grid for lambda = ", lamda)
print(np.flipud(np.round(td_lamda_results[lamda]['Values'], decimals=1)))
ax = sns.heatmap(np.flipud(td_lamda_results[lamda]['Values']), vmin = 0, vmax=10)
plt.show()
Subjective questions¶
2.a Value Iteration vs Policy Iteration¶
- Compare value iteration and policy iteration for states Brown in, Brown Out, Grey out and Grey In
- Which one converges faster and why
fig, (ax1,ax2) = plt.subplots(1,2,figsize=(20,9))
grid = env.grid
grey_in = []
grey_out = []
brown_in = []
brown_out = []
for i in range(grid.shape[0]):
for j in range(grid.shape[1]):
if grid[i][j] == 'grey_in':
grey_in.append((i,j))
elif grid[i][j] == 'grey_out':
grey_out.append((i,j))
elif grid[i][j] == 'brown_in':
brown_in.append((i,j))
elif grid[i][j] == 'brown_out':
brown_out.append((i,j))
spec_states = {'grey_in':grey_in, 'grey_out':grey_out, 'brown_in':brown_in, 'brown_out_':brown_out}
res1, extra_info1 = value_iteration(env, 0.7)
rewards1 = dict()
for spec_state in list(spec_states.keys()):
for state in spec_states[spec_state]:
rewards1[spec_state + str(state)] = []
for grid in extra_info1['all_values']:
rewards1[spec_state + str(state)].append(grid[state])
for state in list(rewards1.keys()):
ax1.plot(rewards1[state], label = state)
ax1.set_xlabel('iterations')
ax1.set_ylabel('value')
ax1.title.set_text('Value Iteration')
ax1.legend()
res2, extra_info2 = policy_iteration(env, 0.7)
rewards2 = dict()
for spec_state in list(spec_states.keys()):
for state in spec_states[spec_state]:
rewards2[spec_state + str(state)] = []
for grid in extra_info2['all_values']:
rewards2[spec_state + str(state)].append(grid[state])
for state in list(rewards2.keys()):
ax2.plot(rewards2[state], label = state)
ax2.set_xlabel('iterations')
ax2.set_ylabel('value')
ax2.title.set_text('Policy Iteration')
ax2.legend()
plt.show()
Policy iteration includes: policy evaluation + policy improvement, and the two are repeated iteratively until policy converges. Value iteration includes: finding optimal value function + one policy extraction. There is no repeat of the two because once the value function is optimal, then the policy out of it should also be optimal (i.e. converged). Finding optimal value function can also be seen as a combination of policy improvement (due to max) and truncated policy evaluation (the reassignment of v_(s) after just one sweep of all states regardless of convergence). Policy iteration converges faster than value iteration, as a policy converges more quickly than a value function. As we can observe from the above figure that policy iteration converged in 5 to 6 iterations but value iteration took around 40 iterations
2.b How changing $\lambda$ affecting TD Lambda¶
From the visualisation of td(lambda) heatmaps plotted above we can observe that values of grid with higher lambda is higher than values of grid with lower values of lambda. Everything goes well as long as all of the approximations get better with time. This method is called TD(0), and is biased, while having reduced variance. A Monte-Carlo estimation method is not biased, but has a lot of variance since it uses the outcome of a complete episode to perform an update. The variance comes from the fact that, at each interaction, there’s randomness involved in picking an action – in the case of a stochastic policy – and in the fact that the environment’s dynamics are also random. From grid values We can see how different values of lambda affect the initial value of a return and the way this value decays over time. Bigger values of lambda lead to slower decay (information from the past is given a non-negligible importance)
2.c Policy iteration error curve¶
Plot error curve of $J_i$ vs iteration $i$ for policy iteration
res, extra_info = policy_iteration(env, 0.7)
Jv = res['Values']
error = []
for j in extra_info['all_values']:
error.append(np.sqrt(np.sum((j-Jv)**2)/(j.shape[0]*j.shape[1])))
plt.figure(figsize=(12,6))
plt.plot(error)
plt.xlabel('iterations')
plt.ylabel('error(Ji)')
plt.title('Policy Iteration')
plt.show()
2.d TD Lamdba error curve¶
Plot error curve of $J_i$ vs iteration $i$ for TD Lambda for $\lambda = [0, 0.25, 0.5, 0.75, 1]$
lamda_values = np.arange(0, 100+5, 5)/100
td_lamda_results = {}
extra_info = {}
for lamda in lamda_values:
env = GridEnv_HW2(**base_kwargs)
td_lamda_results[lamda], extra_info[lamda] = td_lambda(env, lamda,
seeds=np.arange(1000))
lamdas = [0,0.25,0.5,0.75,1]
for lamda in lamdas:
Jv = res['Values']
error = []
for j in extra_info[lamda]['all_values']:
error.append(np.sqrt(np.sum((j-Jv)**2)/(j.shape[0]*j.shape[1])))
plt.figure(figsize=(12,6))
plt.plot(error, label=lamda)
plt.xlabel('iterations')
plt.ylabel('error(Ji)')
plt.title('TD(lamda)')
plt.legend()
plt.show()
Submit to AIcrowd 🚀¶
!DATASET_PATH=$AICROWD_DATASET_PATH aicrowd notebook submit --no-verify -c iit-m-rl-assignment-2-gridworld -a assets
Content
Comments
You must login before you can post a comment.