"""Markov Decision Processes (Chapter 17)
First we define an MDP, and the special case of a GridMDP, in which
states are laid out in a 2-dimensional grid. We also represent a policy
as a dictionary of {state:action} pairs, and a Utility function as a
dictionary of {state:number} pairs. We then define the value_iteration
and policy_iteration algorithms."""
from utils import argmax, vector_add, print_table
from grid import orientations, turn_right, turn_left
import random
class MDP:
"""A Markov Decision Process, defined by an initial state, transition model,
and reward function. We also keep track of a gamma value, for use by
algorithms. The transition model is represented somewhat differently from
the text. Instead of P(s' | s, a) being a probability number for each
state/state/action triplet, we instead have T(s, a) return a
list of (p, s') pairs. We also keep track of the possible states,
terminal states, and actions for each state. [page 646]"""
def __init__(self, init, actlist, terminals, gamma=.9):
self.init = init
self.actlist = actlist
self.terminals = terminals
if not (0 <= gamma < 1):
raise ValueError("An MDP must have 0 <= gamma < 1")
"Return a numeric reward for this state."
return self.reward[state]
"""Transition model. From a state and an action, return a list
of (probability, result-state) pairs."""
def actions(self, state):
"""Set of actions that can be performed in this state. By default, a
fixed list of actions, except for terminal states. Override this
method if you need to specialize by state."""
if state in self.terminals:
return [None]
else:
return self.actlist
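
# A minimal sketch (an added illustration, not part of the original module) of the
# transition-model convention described in the MDP docstring: T(s, a) returns a
# list of (probability, successor-state) pairs. The class and state names below
# are illustrative assumptions.
class TwoStateMDP(MDP):

    """Two states, 'a' and 'b'. 'move' usually switches state, 'stay' usually
    keeps it; 'b' is terminal and carries all the reward."""

    def __init__(self, gamma=.9):
        MDP.__init__(self, init='a', actlist=['stay', 'move'],
                     terminals=['b'], gamma=gamma)
        self.states = {'a', 'b'}
        self.reward = {'a': 0, 'b': 1}

    def T(self, state, action):
        if action is None:
            return [(0.0, state)]
        other = 'b' if state == 'a' else 'a'
        if action == 'move':
            return [(0.8, other), (0.2, state)]
        return [(0.9, state), (0.1, other)]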
"""A two-dimensional grid MDP, as in [Figure 17.1]. All you have to do is
specify the grid as a list of lists of rewards; use None for an obstacle
(unreachable state). Also, you should specify the terminal states.
An action is an (x, y) unit vector; e.g. (1, 0) means move east."""
def __init__(self, grid, terminals, init=(0, 0), gamma=.9):
        grid.reverse()  # because we want row 0 on the bottom, not on top
        MDP.__init__(self, init, actlist=orientations,
                     terminals=terminals, gamma=gamma)
        self.grid = grid
        self.rows = len(grid)
        self.cols = len(grid[0])
for x in range(self.cols):
for y in range(self.rows):
self.reward[x, y] = grid[y][x]
if grid[y][x] is not None:
self.states.add((x, y))
def T(self, state, action):
        if action is None:
            return [(0.0, state)]
else:
return [(0.8, self.go(state, action)),
(0.1, self.go(state, turn_right(action))),
(0.1, self.go(state, turn_left(action)))]
def go(self, state, direction):
"""Return the state that results from going in this direction."""
state1 = vector_add(state, direction)
return state1 if state1 in self.states else state
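
    # Added illustration of go(): a move into a wall or the obstacle leaves the
    # agent where it is. In the 4x3 grid defined below, for example,
    #     self.go((0, 0), (0, 1))  returns (0, 1), while
    #     self.go((0, 0), (-1, 0)) bumps the western wall and returns (0, 0).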
def to_grid(self, mapping):
"""Convert a mapping from (x, y) to v into a [[..., v, ...]] grid."""
        return list(reversed([[mapping.get((x, y), None)
                               for x in range(self.cols)]
                              for y in range(self.rows)]))
def to_arrows(self, policy):
chars = {
(1, 0): '>', (0, 1): '^', (-1, 0): '<', (0, -1): 'v', None: '.'}
return self.to_grid({s: chars[a] for (s, a) in policy.items()})
# ______________________________________________________________________________
""" [Figure 17.1]
A 4x3 grid environment that presents the agent with a sequential decision problem.
"""
sequential_decision_environment = GridMDP([[-0.04, -0.04, -0.04, +1],
                                           [-0.04, None, -0.04, -1],
                                           [-0.04, -0.04, -0.04, -0.04]],
                                          terminals=[(3, 2), (3, 1)])
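
# Added note on coordinates: GridMDP.__init__ reverses the grid so that (0, 0) is
# the bottom-left square. Here the +1 terminal is therefore (3, 2) (top right),
# the -1 terminal is (3, 1), and the None obstacle at (1, 1) is simply absent
# from the state set. For example:
#     sequential_decision_environment.R((3, 2))            # => 1
#     (1, 1) in sequential_decision_environment.states     # => False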
# ______________________________________________________________________________
def value_iteration(mdp, epsilon=0.001):
"""Solving an MDP by value iteration. [Figure 17.4]"""
    U1 = {s: 0 for s in mdp.states}
    R, T, gamma = mdp.R, mdp.T, mdp.gamma
    while True:
        U = U1.copy()
delta = 0
for s in mdp.states:
U1[s] = R(s) + gamma * max([sum([p * U[s1] for (p, s1) in T(s, a)])
for a in mdp.actions(s)])
delta = max(delta, abs(U1[s] - U[s]))
        if delta < epsilon * (1 - gamma) / gamma:
            return U
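
# Added note: the update in value_iteration is the Bellman backup
#     U1[s] <- R(s) + gamma * max_a sum_{s'} P(s' | s, a) * U[s'],
# and the stopping test uses the standard contraction bound: once the largest
# one-sweep change delta falls below epsilon * (1 - gamma) / gamma, the returned
# utilities differ from the true utilities by less than epsilon.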
def best_policy(mdp, U):
"""Given an MDP and a utility function U, determine the best policy,
as a mapping from state to action. (Equation 17.4)"""
pi = {}
for s in mdp.states:
pi[s] = argmax(mdp.actions(s), key=lambda a: expected_utility(a, s, U, mdp))
return pi
def expected_utility(a, s, U, mdp):
"""The expected utility of doing a in state s, according to the MDP and U."""
return sum([p * U[s1] for (p, s1) in mdp.T(s, a)])
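
# Added note: together, best_policy and expected_utility implement Equation (17.4),
#     pi*(s) = argmax_a sum_{s'} P(s' | s, a) * U(s'),
# where expected_utility computes the inner sum from the (p, s') pairs returned
# by mdp.T(s, a).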
# ______________________________________________________________________________
def policy_iteration(mdp):
"""Solve an MDP by policy iteration [Figure 17.7]"""
U = {s: 0 for s in mdp.states}
pi = {s: random.choice(mdp.actions(s)) for s in mdp.states}
while True:
U = policy_evaluation(pi, U, mdp)
unchanged = True
for s in mdp.states:
a = argmax(mdp.actions(s), key=lambda a: expected_utility(a, s, U, mdp))
if a != pi[s]:
pi[s] = a
unchanged = False
if unchanged:
return pi
def policy_evaluation(pi, U, mdp, k=20):
"""Return an updated utility mapping U from each state in the MDP to its
utility, using an approximation (modified policy iteration)."""
R, T, gamma = mdp.R, mdp.T, mdp.gamma
for i in range(k):
for s in mdp.states:
U[s] = R(s) + gamma * sum([p * U[s1] for (p, s1) in T(s, pi[s])])
return U
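
# Added note: exact policy evaluation would solve the linear system
#     U(s) = R(s) + gamma * sum_{s'} P(s' | s, pi(s)) * U(s')
# for all states simultaneously; the k sweeps above only approximate that fixed
# point, which is the "modified policy iteration" variant mentioned in the
# docstring and is sufficient for the small examples in this file.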
__doc__ += """
>>> pi = best_policy(sequential_decision_environment, value_iteration(sequential_decision_environment, .01))
>>> sequential_decision_environment.to_arrows(pi)
[['>', '>', '>', '.'], ['^', None, '^', '.'], ['^', '>', '^', '<']]
>>> print_table(sequential_decision_environment.to_arrows(pi))
>   >      >   .
^   None   ^   .
^   >      ^   <
>>> print_table(sequential_decision_environment.to_arrows(policy_iteration(sequential_decision_environment)))
>   >      >   .
^   None   ^   .
^   >      ^   <
"""
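
# A small runnable demo (an addition, not part of the original module): solve the
# 4x3 grid with both algorithms and print the resulting policies as arrow grids.
# The expected output is the same grid shown in the doctests above.
if __name__ == '__main__':
    best_pi = best_policy(sequential_decision_environment,
                          value_iteration(sequential_decision_environment, .01))
    print_table(sequential_decision_environment.to_arrows(best_pi))
    print_table(sequential_decision_environment.to_arrows(
        policy_iteration(sequential_decision_environment)))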