probability.py 14,9 ko
Newer Older
"""Probability models. (Chapter 13-15)
"""

from utils import *
from logic import extend
from random import random, seed

#______________________________________________________________________________

def DTAgentProgram(belief_state):
    "A decision-theoretic agent. [Fig. 13.1]"
    def program(percept):
        belief_state.observe(program.action, percept)
        program.action = argmax(belief_state.actions(), 
                                belief_state.expected_outcome_utility)
        return program.action
    program.action = None
    return program

#______________________________________________________________________________

class ProbDist:
    """A discrete probability distribution.  You name the random variable
    in the constructor, then assign and query probability of values.
    >>> P = ProbDist('Flip'); P['H'], P['T'] = 0.25, 0.75; P['H']
    0.25
    >>> P = ProbDist('X', {'lo': 125, 'med': 375, 'hi': 500})
withal's avatar
withal a validé
    >>> P['lo'], P['med'], P['hi']
    (0.125, 0.375, 0.5)
    def __init__(self, varname='?', freqs=None):
        """If freqs is given, it is a dictionary of value: frequency pairs,
        and the ProbDist then is normalized."""
        update(self, prob={}, varname=varname, values=[])
        if freqs:
            for (v, p) in freqs.items():
                self[v] = p
            self.normalize()

    def __getitem__(self, val):
        "Given a value, return P(value)."
        try: return self.prob[val]
        except KeyError: return 0
withal's avatar
withal a validé
        "Set P(val) = p."
        if val not in self.values:
            self.values.append(val)
        self.prob[val] = p

    def normalize(self):
        """Make sure the probabilities of all values sum to 1.
        Returns the normalized distribution.
        Raises a ZeroDivisionError if the sum of the values is 0.
        >>> P = ProbDist('Flip'); P['H'], P['T'] = 35, 65
        >>> P = P.normalize()
        >>> print '%5.3f %5.3f' % (P.prob['H'], P.prob['T'])
        0.350 0.650
        """
        total = float(sum(self.prob.values()))
        if not (1.0-epsilon < total < 1.0+epsilon):
            for val in self.prob:
                self.prob[val] /= total
        return self

    def show_approx(self, numfmt='%.3g'):
        """Show the probabilities rounded and sorted by key, for the
        sake of portable doctests."""
        return ', '.join([('%s: ' + numfmt) % (v, p)
                          for (v, p) in sorted(self.prob.items())])

epsilon = 0.001

class JointProbDist(ProbDist):
    """A discrete probability distribute over a set of variables.
    >>> P = JointProbDist(['X', 'Y']); P[1, 1] = 0.25
    >>> P[1, 1]
    0.25
    >>> P[dict(X=0, Y=1)] = 0.5
    >>> P[dict(X=0, Y=1)]
    def __init__(self, variables):
        update(self, prob={}, variables=variables, vals=DefaultDict([]))

    def __getitem__(self, values):
        "Given a tuple or dict of values, return P(values)."
        values = event_values(values, self.variables)
        return ProbDist.__getitem__(self, values)

    def __setitem__(self, values, p):
        """Set P(values) = p.  Values can be a tuple or a dict; it must
        have a value for each of the variables in the joint. Also keep track
        of the values we have seen so far for each variable."""
        values = event_values(values, self.variables)
withal's avatar
withal a validé
        for var, val in zip(self.variables, values):
            if val not in self.vals[var]:
                self.vals[var].append(val)

    def values(self, var):
        "Return the set of possible values for a variable."
        return self.vals[var]

    def __repr__(self):
        return "P(%s)" % self.variables

def event_values(event, vars):
    """Return a tuple of the values of variables vars in event.
    >>> event_values ({'A': 10, 'B': 9, 'C': 8}, ['C', 'A'])
    (8, 10)
    >>> event_values ((1, 2), ['C', 'A'])
    (1, 2)
    """
    if isinstance(event, tuple) and len(event) == len(vars):
        return event
    else:
        return tuple([event[var] for var in vars])

#______________________________________________________________________________

def enumerate_joint_ask(X, e, P):
    """Return a probability distribution over the values of the variable X,
    given the {var:val} observations e, in the JointProbDist P. [Section 13.3]
    >>> P = JointProbDist(['X', 'Y'])
    >>> P[0,0] = 0.25; P[0,1] = 0.5; P[1,1] = P[2,1] = 0.125
    >>> enumerate_joint_ask('X', dict(Y=1), P).show_approx()
    '0: 0.667, 1: 0.167, 2: 0.167'
    """
    assert X not in e, "Query variable must be distinct from evidence"
    Q = ProbDist(X) # probability distribution for X, initially empty
    Y = [v for v in P.variables if v != X and v not in e] # hidden vars.
    for xi in P.values(X):
        Q[xi] = enumerate_joint(Y, extend(e, X, xi), P)
    return Q.normalize()

def enumerate_joint(vars, e, P):
withal's avatar
withal a validé
    """Return the sum of those entries in P consistent with e,
    provided vars is P's remaining variables (the ones not in e)."""
    if not vars: 
    Y, rest = vars[0], vars[1:]
    return sum([enumerate_joint(rest, extend(e, Y, y), P) 
                for y in P.values(Y)])

#______________________________________________________________________________

withal's avatar
withal a validé
    "Bayesian network containing only boolean-variable nodes."
    def __init__(self, nodes=[]):
withal's avatar
withal a validé
        "nodes must be ordered with parents before children."
        update(self, nodes=[], vars=[])
        for node in nodes:
            self.add(node)

    def add(self, node):
withal's avatar
withal a validé
        """Add a node to the net. Its parents must already be in the
        net, and node itself must not."""
        assert node not in self.nodes
        assert every(lambda parent: parent in self.variables(), node.parents)
        self.nodes.append(node)
        self.vars.append(node.variable)

withal's avatar
withal a validé
    def variable_node(self, var):
        """Return the node for the variable named var.
        >>> burglary.variable_node('Burglary').variable
        'Burglary'"""
        for n in self.nodes:
            if n.variable == var:
                return n
        raise Exception("No such variable: %s" % var)

withal's avatar
withal a validé
    def variables(self):
        """List all of the net's variables, parents before children.
        >>> burglary.variables()
        ['Burglary', 'Earthquake', 'Alarm', 'JohnCalls', 'MaryCalls']"""
        return [n.variable for n in self.nodes]
    
withal's avatar
withal a validé
    def variable_values(self, var):
withal's avatar
withal a validé
        "Return the domain of var."
    """A conditional probability distribution for a boolean variable,
    P(X | parents). Part of a BayesNet."""

    def __init__(self, X, parents, cpt):
        """X is a variable name, and parents a sequence of variable
        names or a space-separated string.  cpt, the conditional
        probability table, takes one of these forms:

        * A number, the unconditional probability P(X=true). You can
          use this form when there are no parents.

        * A dict {v: p, ...}, the conditional probability distribution
          P(X=true | parent=v) = p. When there's just one parent.

        * A dict {(v1, v2, ...): p, ...}, the distribution P(X=true |
          parent1=v1, parent2=v2, ...) = p. Each key must have as many
          values as there are parents. You can use this form always;
          the first two are just conveniences.

        In all cases the probability of X being false is left implicit,
        since it follows from P(X=true).

        >>> X = BayesNode('X', '', 0.2)
        >>> Y = BayesNode('Y', 'P', {T: 0.2, F: 0.7})
        >>> Z = BayesNode('Z', 'P Q', 
        ...    {(T, T): 0.2, (T, F): 0.3, (F, T): 0.5, (F, F): 0.7})
        """
        if isinstance(parents, str): parents = parents.split()

        # We store the table always in the third form above.
        if isinstance(cpt, (float, int)): # no parents, 0-tuple
            cpt = {(): cpt}
        elif isinstance(cpt, dict):
            if cpt and isinstance(cpt.keys()[0], bool): # one parent, 1-tuple
withal's avatar
withal a validé
                cpt = dict(((v,), p) for v, p in cpt.items())

        assert isinstance(cpt, dict)
        for vs, p in cpt.items():
            assert isinstance(vs, tuple) and len(vs) == len(parents)
            assert every(lambda v: isinstance(v, bool), vs)
            assert 0 <= p <= 1

        update(self, variable=X, parents=parents, cpt=cpt)

    def p(self, value, event):
        """Return the conditional probability 
        P(X=value | parents=parent_values), where parent_values
        are the values of parents in event. (event must assign each
        parent a value.)
        >>> bn = BayesNode('X', 'Burglary', {T: 0.2, F: 0.625})
        >>> bn.p(False, {'Burglary': False, 'Earthquake': True})
        0.375"""
        assert isinstance(value, bool)
        ptrue = self.cpt[event_values(event, self.parents)]
        return if_(value, ptrue, 1 - ptrue)

    def sample(self, event):
        """Sample from the distribution for this variable conditioned
        on event's values for parent_vars. That is, return True/False
        at random according with the conditional probability given the
        parents."""
        return random() <= self.p(True, event)
# Burglary example [Fig. 14.2]

T, F = True, False

burglary = BayesNet([
withal's avatar
withal a validé
    node('Burglary', '', 0.001),
    node('Earthquake', '', 0.002),
    node('Alarm', 'Burglary Earthquake',
withal's avatar
withal a validé
         {(T, T): 0.95, (T, F): 0.94, (F, T): 0.29, (F, F): 0.001}),
    node('JohnCalls', 'Alarm', {T: 0.90, F: 0.05}),
    node('MaryCalls', 'Alarm', {T: 0.70, F: 0.01})
    ])

#______________________________________________________________________________

def enumeration_ask(X, e, bn):
withal's avatar
withal a validé
    """Return the conditional probability distribution of variable X
    given evidence e, from BayesNet bn. [Fig. 14.9]
    >>> enumeration_ask('Burglary',
    ...   {'JohnCalls': True, 'MaryCalls': True}, burglary).show_approx()
    'False: 0.716, True: 0.284'"""
withal's avatar
withal a validé
    Q = ProbDist(X)
    for xi in bn.variable_values(X):
        Q[xi] = enumerate_all(bn.variables(), extend(e, X, xi), bn)
    return Q.normalize()

def enumerate_all(vars, e, bn):
    """Return the sum of those entries in P(vars | e{others})
    consistent with e, where P is the joint distribution represented
    by bn, and e{others} means e restricted to bn's other variables
    (the ones other than vars). Parents must precede children in vars."""
withal's avatar
withal a validé
    if not vars:
withal's avatar
withal a validé
    Y, rest = vars[0], vars[1:]
    Ynode = bn.variable_node(Y)
    if Y in e:
        return Ynode.p(e[Y], e) * enumerate_all(rest, e, bn)
withal's avatar
withal a validé
    else:
        return sum(Ynode.p(y, e) * enumerate_all(rest, extend(e, Y, y), bn)
withal's avatar
withal a validé
                   for y in bn.variable_values(Y))
withal's avatar
withal a validé
#______________________________________________________________________________
def elimination_ask(X, e, bn, order=reversed):
    for var in order(bn.vars):
        factors.append(Factor(var, e)) 
        if is_hidden(var, X, e):
            factors = sum_out(var, factors)
    return pointwise_product(factors).normalize()

def is_hidden(var, X, e):
    return var != X and var not in e

def Factor(var, e):
    NotImplemented

def pointwise_product(factors):
    NotImplemented
    NotImplemented

#______________________________________________________________________________

# Fig. 14.12a: sprinkler network

sprinkler = BayesNet([
withal's avatar
withal a validé
    node('Cloudy', '', 0.5),
    node('Sprinkler', 'Cloudy', {T: 0.10, F: 0.50}),
    node('Rain', 'Cloudy', {T: 0.80, F: 0.20}),
    node('WetGrass', 'Sprinkler Rain',
withal's avatar
withal a validé
         {(T, T): 0.99, (T, F): 0.90, (F, T): 0.90, (F, F): 0.00})])

#______________________________________________________________________________

withal's avatar
withal a validé
    """Randomly sample from bn's full joint distribution. The result
    is a {variable: value} dict. [Fig. 14.13]"""
    event = {}
    for node in bn.nodes:
        event[node.variable] = node.sample(event)
withal's avatar
withal a validé
    return event

#_______________________________________________________________________________

def rejection_sampling(X, e, bn, N):
withal's avatar
withal a validé
    """Estimate the probability distribution of variable X given
    evidence e in BayesNet bn, using N samples.  [Fig. 14.14]
    Raises a ZeroDivisionError if all the N samples are rejected,
    i.e., inconsistent with e.
    >>> seed(47)
    >>> p = rejection_sampling('Burglary',
    ...   {'JohnCalls': True, 'MaryCalls': True}, burglary, 10000)
    >>> p.show_approx()
    'False: 0.7, True: 0.3'
withal's avatar
withal a validé
    counts = {True: 0, False: 0} # boldface N in Fig. 14.14
    for j in xrange(N):
withal's avatar
withal a validé
        sample = prior_sample(bn) # boldface x in Fig. 14.14
        if consistent_with(sample, e):
            counts[sample[X]] += 1
    return ProbDist(X, counts)
withal's avatar
withal a validé
def consistent_with(event, evidence):
    "Is event consistent with the given evidence?"
    return every(lambda (k, v): evidence.get(k, v) == v,
                 event.items())

#_______________________________________________________________________________

def likelihood_weighting(X, e, bn, N):
withal's avatar
withal a validé
    """Estimate the probability distribution of variable X given
    evidence e in BayesNet bn.  [Fig. 14.15]
    >>> seed(1017)
    >>> p = likelihood_weighting('Burglary',
    ...  {'JohnCalls': True, 'MaryCalls': True}, burglary, 10000)
    >>> p.show_approx()
    'False: 0.702, True: 0.298'
withal's avatar
withal a validé
    W = {True: 0.0, False: 0.0}
    for j in xrange(N):
withal's avatar
withal a validé
        sample, weight = weighted_sample(bn, e) # boldface x, w in Fig. 14.15
        W[sample[X]] += weight
    return ProbDist(X, W)
def weighted_sample(bn, e):
withal's avatar
withal a validé
    """Sample an event from bn that's consistent with the evidence e;
    return the event and its weight, the likelihood that the event
    accords to the evidence."""
    w = 1
    event = dict(e) # boldface x in Fig. 14.15
    for node in bn.nodes:
        Xi = node.variable
withal's avatar
withal a validé
        if Xi in e:
            w *= node.p(e[Xi], event)
            event[Xi] = node.sample(event)
withal's avatar
withal a validé
    return event, w
    

#_______________________________________________________________________________

# MISSING

peter.norvig's avatar
peter.norvig a validé
__doc__ += """
## We can build up a probability distribution like this (p. 469):
>>> P = ProbDist()
>>> P['sunny'] = 0.7
>>> P['rain'] = 0.2
>>> P['cloudy'] = 0.08
>>> P['snow'] = 0.02

## and query it like this: (Never mind this ELLIPSIS option
##                          added to make the doctest portable.)
>>> P['rain']               #doctest:+ELLIPSIS
0.2...
peter.norvig's avatar
peter.norvig a validé

## A Joint Probability Distribution is dealt with like this (Fig. 13.3):
peter.norvig's avatar
peter.norvig a validé
>>> P = JointProbDist(['Toothache', 'Cavity', 'Catch'])
>>> T, F = True, False
>>> P[T, T, T] = 0.108; P[T, T, F] = 0.012; P[F, T, T] = 0.072; P[F, T, F] = 0.008
>>> P[T, F, T] = 0.016; P[T, F, F] = 0.064; P[F, F, T] = 0.144; P[F, F, F] = 0.576

>>> P[T, T, T] 
0.108

## Ask for P(Cavity|Toothache=T)
>>> PC = enumerate_joint_ask('Cavity', {'Toothache': T}, P) 
>>> PC.show_approx()
'False: 0.4, True: 0.6'
peter.norvig's avatar
peter.norvig a validé

>>> 0.6-epsilon < PC[T] < 0.6+epsilon 
True

>>> 0.4-epsilon < PC[F] < 0.4+epsilon 
True
"""