"""Reinforcement Learning (Chapter 21) """ from collections import defaultdict from utils import argmax import agents import random class PassiveADPAgent(agents.Agent): """Passive (non-learning) agent that uses adaptive dynamic programming on a given MDP and policy. [Fig. 21.2]""" NotImplemented class PassiveTDAgent: """The abstract class for a Passive (non-learning) agent that uses temporal differences to learn utility estimates. Override update_state method to convert percept to state and reward. The mdp being probided should be an instance of a subclass of the MDP Class.[Fig. 21.4] """ def __init__(self, pi, mdp, alpha=None): self.pi = pi self.U = {s: 0. for s in mdp.states} self.Ns = {s: 0 for s in mdp.states} self.s = None self.a = None self.r = None self.gamma = mdp.gamma self.terminals = mdp.terminals if alpha: self.alpha = alpha else: self.alpha = lambda n: 1./(1+n) # udacity video def __call__(self, percept): s_prime, r_prime = self.update_state(percept) pi, U, Ns, s, a, r = self.pi, self.U, self.Ns, self.s, self.a, self.r alpha, gamma, terminals = self.alpha, self.gamma, self.terminals if not Ns[s_prime]: U[s_prime] = r_prime if s is not None: Ns[s] += 1 U[s] += alpha(Ns[s]) * (r + gamma * U[s_prime] - U[s]) if s_prime in terminals: self.s = self.a = self.r = None else: self.s, self.a, self.r = s_prime, pi[s_prime], r_prime return self.a def update_state(self, percept): ''' To be overriden in most cases. The default case assumes th percept to be of type (state, reward)''' return percept class QLearningAgent: """ An exploratory Q-learning agent. It avoids having to learn the transition model because the Q-value of a state can be related directly to those of its neighbors. [Fig. 21.8] """ def __init__(self, mdp, Ne, Rplus, alpha=None): self.gamma = mdp.gamma self.terminals = mdp.terminals self.all_act = mdp.actlist self.Ne = Ne # iteration limit in exploration function self.Rplus = Rplus # large value to assign before iteration limit self.Q = defaultdict(float) self.Nsa = defaultdict(float) self.s = None self.a = None self.r = None if alpha: self.alpha = alpha else: self.alpha = lambda n: 1./(1+n) # udacity video def f(self, u, n): """ Exploration function. Returns fixed Rplus untill agent has visited state, action a Ne number of times. Same as ADP agent in book.""" if n < self.Ne: return self.Rplus else: return u def actions_in_state(self, state): """ Returns actions possible in given state. Useful for max and argmax. """ if state in self.terminals: return [None] else: return self.all_act def __call__(self, percept): s1, r1 = self.update_state(percept) Q, Nsa, s, a, r = self.Q, self.Nsa, self.s, self.a, self.r alpha, gamma, terminals, actions_in_state = self.alpha, self.gamma, self.terminals, self.actions_in_state if s1 in terminals: Q[(s1, None)] = r1 if s is not None: Nsa[(s, a)] += 1 Q[(s, a)] += alpha(Nsa[(s, a)])*(r+gamma*max([Q[(s1, a1)] for a1 in actions_in_state(s1)])-Q[(s, a)]) if s1 in terminals: self.s = self.a = self.r = None else: self.s, self.r = s1, r1 self.a = argmax(actions_in_state(s1), key=lambda a1: self.f(Q[(s1, a1)], Nsa[(s1, a1)])) return self.a def update_state(self, percept): ''' To be overriden in most cases. The default case assumes the percept to be of type (state, reward)''' return percept def run_single_trial(agent_program, mdp): ''' Execute trial for given agent_program and mdp. mdp should be an instance of subclass of mdp.MDP ''' def take_single_action(mdp, s, a): ''' Selects outcome of taking action a in state s. Weighted Sampling. ''' x = random.uniform(0, 1) cumulative_probability = 0.0 for probability_state in mdp.T(s, a): probability, state = probability_state cumulative_probability += probability if x < cumulative_probability: break return state current_state = mdp.init while True: current_reward = mdp.R(current_state) percept = (current_state, current_reward) next_action = agent_program(percept) if next_action is None: break current_state = take_single_action(mdp, current_state, next_action)