"""Reinforcement Learning (Chapter 21) """ from collections import defaultdict from utils import argmax from mdp import MDP, policy_evaluation import random class PassiveADPAgent: """Passive (non-learning) agent that uses adaptive dynamic programming on a given MDP and policy. [Figure 21.2]""" class ModelMDP(MDP): """ Class for implementing modifed Version of input MDP with an editable transition model P and a custom function T. """ def __init__(self, init, actlist, terminals, gamma, states): super().__init__(init, actlist, terminals, gamma) nested_dict = lambda: defaultdict(nested_dict) # StackOverflow:whats-the-best-way-to-initialize-a-dict-of-dicts-in-python self.P = nested_dict() def T(self, s, a): """Returns a list of tuples with probabilities for states based on the learnt model P. """ return [(prob, res) for (res, prob) in self.P[(s, a)].items()] def __init__(self, pi, mdp): self.pi = pi self.mdp = PassiveADPAgent.ModelMDP(mdp.init, mdp.actlist, mdp.terminals, mdp.gamma, mdp.states) self.U = {} self.Nsa = defaultdict(int) self.Ns1_sa = defaultdict(int) self.s = None self.a = None def __call__(self, percept): s1, r1 = percept self.mdp.states.add(s1) # Model keeps track of visited states. R, P, mdp, pi = self.mdp.reward, self.mdp.P, self.mdp, self.pi s, a, Nsa, Ns1_sa, U = self.s, self.a, self.Nsa, self.Ns1_sa, self.U if s1 not in R: # Reward is only available for visted state. U[s1] = R[s1] = r1 if s is not None: Nsa[(s, a)] += 1 Ns1_sa[(s1, s, a)] += 1 # for each t such that Ns′|sa [t, s, a] is nonzero for t in [res for (res, state, act), freq in Ns1_sa.items() if (state, act) == (s, a) and freq != 0]: P[(s, a)][t] = Ns1_sa[(t, s, a)] / Nsa[(s, a)] U = policy_evaluation(pi, U, mdp) if s1 in mdp.terminals: self.s = self.a = None else: self.s, self.a = s1, self.pi[s1] return self.a def update_state(self, percept): ''' To be overridden in most cases. The default case assumes th percept to be of type (state, reward)''' return percept class PassiveTDAgent: """The abstract class for a Passive (non-learning) agent that uses temporal differences to learn utility estimates. Override update_state method to convert percept to state and reward. The mdp being provided should be an instance of a subclass of the MDP Class.[Figure 21.4] """ def __init__(self, pi, mdp, alpha=None): self.pi = pi self.U = {s: 0. for s in mdp.states} self.Ns = {s: 0 for s in mdp.states} self.s = None self.a = None self.r = None self.gamma = mdp.gamma self.terminals = mdp.terminals if alpha: self.alpha = alpha else: self.alpha = lambda n: 1./(1+n) # udacity video def __call__(self, percept): s1, r1 = self.update_state(percept) pi, U, Ns, s, a, r = self.pi, self.U, self.Ns, self.s, self.a, self.r alpha, gamma, terminals = self.alpha, self.gamma, self.terminals if not Ns[s1]: U[s1] = r1 if s is not None: Ns[s] += 1 U[s] += alpha(Ns[s]) * (r + gamma * U[s1] - U[s]) if s1 in terminals: self.s = self.a = self.r = None else: self.s, self.a, self.r = s1, pi[s1], r1 return self.a def update_state(self, percept): ''' To be overridden in most cases. The default case assumes th percept to be of type (state, reward)''' return percept class QLearningAgent: """ An exploratory Q-learning agent. It avoids having to learn the transition model because the Q-value of a state can be related directly to those of its neighbors. 
[Figure 21.8] """ def __init__(self, mdp, Ne, Rplus, alpha=None): self.gamma = mdp.gamma self.terminals = mdp.terminals self.all_act = mdp.actlist self.Ne = Ne # iteration limit in exploration function self.Rplus = Rplus # large value to assign before iteration limit self.Q = defaultdict(float) self.Nsa = defaultdict(float) self.s = None self.a = None self.r = None if alpha: self.alpha = alpha else: self.alpha = lambda n: 1./(1+n) # udacity video def f(self, u, n): """ Exploration function. Returns fixed Rplus untill agent has visited state, action a Ne number of times. Same as ADP agent in book.""" if n < self.Ne: return self.Rplus else: return u def actions_in_state(self, state): """ Returns actions possible in given state. Useful for max and argmax. """ if state in self.terminals: return [None] else: return self.all_act def __call__(self, percept): s1, r1 = self.update_state(percept) Q, Nsa, s, a, r = self.Q, self.Nsa, self.s, self.a, self.r alpha, gamma, terminals, actions_in_state = self.alpha, self.gamma, self.terminals, self.actions_in_state if s1 in terminals: Q[s1, None] = r1 if s is not None: Nsa[s, a] += 1 Q[s, a] += alpha(Nsa[s, a]) * (r + gamma * max(Q[s1, a1] for a1 in actions_in_state(s1)) - Q[s, a]) if s1 in terminals: self.s = self.a = self.r = None else: self.s, self.r = s1, r1 self.a = argmax(actions_in_state(s1), key=lambda a1: self.f(Q[s1, a1], Nsa[s1, a1])) return self.a def update_state(self, percept): ''' To be overridden in most cases. The default case assumes the percept to be of type (state, reward)''' return percept def run_single_trial(agent_program, mdp): ''' Execute trial for given agent_program and mdp. mdp should be an instance of subclass of mdp.MDP ''' def take_single_action(mdp, s, a): ''' Selects outcome of taking action a in state s. Weighted Sampling. ''' x = random.uniform(0, 1) cumulative_probability = 0.0 for probability_state in mdp.T(s, a): probability, state = probability_state cumulative_probability += probability if x < cumulative_probability: break return state current_state = mdp.init while True: current_reward = mdp.R(current_state) percept = (current_state, current_reward) next_action = agent_program(percept) if next_action is None: break current_state = take_single_action(mdp, current_state, next_action)