In [53]:
import itertools

# Bayesian Networks

A Bayesian network, or Bayes net for short, is a data structure to represent a joint probability distribution, and do inference on it. For example, here is a network with five nodes, each with its conditional probability table, and with arrows from parent to child variables. The story, from Judea Pearl, is that Judea has a burglar alarm, and it can be triggered by either a burglary or an earthquake. If the alarm sounds, one or both of Judea's neighbors, John and Mary, might call him to let him know.

<p><img src="http://norvig.com/ipython/burglary2.jpg">

This topic of Bayes nets can be confusing, because there are many different concepts to keep track of:

* `BayesNet`: A graph, where each node represents a variable, and is pointed to by zero or more *parents*. (See diagram above.)

* `Variable`: A random variable; the ovals in the diagram above. We will only allow variables with a finite discrete domain of possible values; in the diagram all the variables are Boolean, meaning their domain is the set $\{t, f\}$. The value of a variable depends on the value of the parents, in a probabilistic way specified by the variable's conditional probability table. Given the parents, the variable is independent of all the other variables. For example, if I know whether *Alarm* is true or false, then I know the probability of *JohnCalls*, and evidence about the other variables won't give me any more information about *JohnCalls*.

* `ProbDist`: A probability distribution enumerates each possible value in the domain of a variable,
and the probability of that value. For example, `{True: 0.95, False: 0.05}` is a probability distribution for a Boolean variable.

* `CPTable`: A conditional probability table is a a mapping, `{tuple: ProbDist, ...}`, where each tuple lists the values of each of the parent variables, in order, and the probability distribution says what the possible outcomes are for the variable, given those values of the parents. For example, for the variable *Alarm*, the top row of the `CPTable` says "*t, t*, .95", which means that when *Burglary* is true and *Earthquake* is true, the probability of *Alarm* being true is .95. Think of this row entry as an abbreviation that makes sense for Boolean variables, but to accomodate non-Boolean variables, we will represent this in the more general format: `{(True, True): {True: 0.95, False: 0.05}}`.

* `Evidence`: A mapping, `{Variable: value, ...}`, which denotes which variables we have observed known values for.

We will introduce implementations of these concepts:

# `BayesNet`

A `BayesNet` is a graph of variables, where each variable is specified by a triple of `(name, parentnames, cpt)`, where the name is a string, the `parentnames` is a sequence of strings, and the CPT is in a format we will explain soon.

In [24]:
class BayesNet(object):
    "Bayesian network: a graph with an ordered list of variables."
     
    def __init__(self): 
        self.variables = [] # List of variables, in parent-first topological order
        self.lookup = {}    # Mapping of {variable_name: variable} pairs
            
    def add(self, name, parentnames, cpt):
        "Add a new Variable to the BayesNet. Parentnames must already have been added."
        parents = [self.lookup[name] for name in parentnames]
        var = Variable(name, parents, cpt)
        self.variables.append(var)
        self.lookup[name] = var
        return self

# `Variable` 

The `Variable` data structure holds a name, a list of parents (which are actual variables, not names), and a conditional probability table. The order of the parent variables is important, because you will have to use the same order in the CPT. For convenience, we also store the* domain* of the variable: the set of possible values (all our variables are discrete). 

In [32]:
class Variable(object):
    "A discrete random variable in a BayesNet."
    
    def __init__(self, name, parents, cpt):
        "A variable has a name, list of parent variables, and a CPT."
        self.name    = name
        self.parents = parents
        self.cpt     = CPT(cpt, parents)
        self.domain  = set(v for row in self.cpt for v in self.cpt[row])
        
    def P(self, evidence):
        "The full probability distribution for P(variable | evidence)."
        return self.cpt[tuple(evidence[var] for var in self.parents)]

    def __repr__(self): return self.name

#  `ProbDist` and `Evidence`

A `ProbDist` is a mapping of `{outcome: probability}` for every outcome of a random variable. You can give it the same arguments that you would give to the `dict` constructor. As a shortcut for Boolean random variables, you can say `ProbDist(0.2)` instead of `ProbDist({False: 0.8, True: 0.2})`.

`Evidence` is just a dict of `{variable: value}` pairs, describing the exact values for a set of variables.

In [7]:
class ProbDist(dict):
    "A Probability Distribution; an {outcome: probability} mapping."
    def __init__(self, mapping=(), **kwargs):
        if isinstance(mapping, float):
            mapping = {True: mapping, False: 1 - mapping}
        self.update(mapping, **kwargs)
        total = sum(self.values())
        normalize(self)
        
def normalize(dic):
    "Make sum to values of dic sum to 1.0; assert no negative values."
    total = sum(dic.values())
    for key in dic:
        dic[key] = dic[key] / total
        assert dic[key] >= 0
            
class Evidence(dict): pass

In [14]:
# An example ProbDist
ProbDist(heads=6, tails=4)

{'heads': 0.6, 'tails': 0.4}

In [15]:
# A Boolean ProbDist
ProbDist(0.25) 

{False: 0.75, True: 0.25}

#  `CPT`: Conditional Probability Table

A `CPT` is a mapping from tuples of parent values to probability distributions. Every possible tuple must be represented in the table. We allow shortcuts for the case of `CPT`s with zeron or one parent.

In [17]:
class CPT(dict):
    """A mapping of {row: ProbDist, ...} where each row is a tuple
    of possible values of the parent variables."""
    
    def __init__(self, data, parents=None):
        """Provides two shortcuts for writing a Conditional Probability Table. 
        With no parents, CPT(dist) =>  CPT({(): dist}).
        With one parent, CPT({val: dist,...}) => CPT({(val,): dist,...})."""
        def Tuple(row): return row if isinstance(row, tuple) else (row,)
        if not parents and (not isinstance(data, dict) or set(data.keys()) != {()}):
            data = {(): data}
        for row in data:
            self[Tuple(row)] = ProbDist(data[row])
        if parents:
            assert set(self) == set(expected_tuples(parents)), (
           "CPT must handle all possibile tuples of parent values")

def expected_tuples(parents):
    "The set of tuples of one value from each parent (in order)."
    return set(itertools.product(*[p.domain for p in parents]))

In [18]:
# An example of a CPT with no parents, and thus one row with an empty tuple
CPT({(): 0.25})

{(): {False: 0.75, True: 0.25}}

# An Example Bayes Net

Now we are ready to define the network from the burglary alarm scenario:

In [33]:
T = True
F = False

alarm_net = (BayesNet()
    .add('Burglary', [], 0.001)
    .add('Earthquake', [], 0.002)
    .add('Alarm', ['Burglary', 'Earthquake'],
         {(T, T): 0.95, (T, F): 0.94, (F, T): 0.29, (F, F): 0.001})
    .add('JohnCalls', ['Alarm'], {T: 0.90, F: 0.05})
    .add('MaryCalls', ['Alarm'], {T: 0.70, F:0.01}))

In [34]:
globals().update(alarm_net.lookup)

In [35]:
Alarm.cpt

{(False, False): {False: 0.999, True: 0.001},
 (False, True): {False: 0.71, True: 0.29},
 (True, False): {False: 0.06000000000000005, True: 0.94},
 (True, True): {False: 0.050000000000000044, True: 0.95}}

In [36]:
Alarm.P({Burglary:False, Earthquake:False})

{False: 0.999, True: 0.001}

In [38]:
Alarm.P({Burglary:False, Earthquake:False})[True]

0.001

# Inference in Bayes Nets

In [182]:
def enumeration_ask(X, e, bn):
    "Given evidence e, ask what the probability distribution is for X in bn."
    assert X not in e, "Query variable must be distinct from evidence"
    Q = {}
    for xi in X.domain:
        Q[xi] = enumerate_all_vars(bn.variables, extend(e, X, xi), bn)
    return ProbDist(Q)

def enumerate_all_vars(vars, e, bn):
    """Return the sum of those entries in P(vars | e_{others})
    consistent with e, where P is the joint distribution represented
    by bn, and e_{others} means e restricted to bn's other variables
    (the ones other than vars). Parents must precede children in vars."""
    if not vars:
        return 1.0
    Y, rest = vars[0], vars[1:]
    if Y in e:
        y = e[Y]
        return P(Y, e, y) * enumerate_all_vars(rest, e, bn)
    else:
        return sum(P(Y, e, y) * enumerate_all_vars(rest, extend(e, Y, y), bn)
                   for y in Y.domain)
    
def extend(dic, var, val):
    """Return a copy of the dict, with {var: val} added."""
    dic2 = dic.copy()
    dic2[var] = val
    return dic2

In [185]:
enumeration_ask(Burglary, {JohnCalls:T, MaryCalls:T}, alarm_net)

{False: 0.7158281646356071, True: 0.2841718353643929}

In [189]:
enumeration_ask(Burglary, {MaryCalls:T}, alarm_net)

{False: 0.9438825459610851, True: 0.056117454038914924}

In [190]:
enumeration_ask(Alarm, {MaryCalls:T}, alarm_net)

{False: 0.8499098822502404, True: 0.15009011774975956}

In [191]:
enumeration_ask(Earthquake, {MaryCalls:T}, alarm_net)

{False: 0.9641190847135443, True: 0.03588091528645573}

In [193]:
enumeration_ask(JohnCalls, {Earthquake:T}, alarm_net)

{False: 0.7029390000000001, True: 0.29706099999999996}

In [None]:
def enumeration_ask(X, e, bn):
    "Given evidence e, ask what the probability distribution is for X in bn."
    assert X not in e, "Query variable must be distinct from evidence"
    Q = {}
    for xi in X.domain:
        Q[xi] = enumerate_all_vars(bn.variables, extend(e, X, xi), bn)
    return ProbDist(Q)

def enumerate_all_vars(vars, e, bn):
    """Return the sum of those entries in P(vars | e_{others})
    consistent with e, where P is the joint distribution represented
    by bn, and e_{others} means e restricted to bn's other variables
    (the ones other than vars). Parents must precede children in vars."""
    if not vars:
        return 1.0
    Y, rest = vars[0], vars[1:]
    if Y in e:
        y = e[Y]
        return P(Y, e, y) * enumerate_all_vars(rest, e, bn)
    else:
        return sum(P(Y, e, y) * enumerate_all_vars(rest, extend(e, Y, y), bn)
                   for y in Y.domain)
    
def extend(dic, var, val):
    """Return a copy of the dict, with {var: val} added."""
    dic2 = dic.copy()
    dic2[var] = val
    return dic2

# Full Joint ???

In [51]:
def full_joint(net):
    rows = itertools.product(*[var.domain for var in net.variables])
    return {row: joint_probability(row, net)
            for row in rows}

def joint_probability(row, net):
    evidence = dict(zip(net.variables, row))
    def Pvar(var): 
        return var.cpt[tuple(evidence[v] for v in var.parents)][evidence[var]]
    return prod(Pvar(v) for v in net.variables)
    
def prod(numbers):
    product = 1
    for x in numbers:
        product *= x
    return product

j = full_joint(alarm_net)
j, len(j), sum(j.values())

({(False, False, False, False, False): 0.9367427006189999,
  (False, False, False, False, True): 0.009462047481,
  (False, False, False, True, False): 0.049302247401000004,
  (False, False, False, True, True): 0.0004980024990000001,
  (False, False, True, False, False): 2.9910059999999997e-05,
  (False, False, True, False, True): 6.979013999999998e-05,
  (False, False, True, True, False): 0.00026919054,
  (False, False, True, True, True): 0.0006281112599999999,
  (False, True, False, False, False): 0.00133417449,
  (False, True, False, False, True): 1.3476510000000001e-05,
  (False, True, False, True, False): 7.021971e-05,
  (False, True, False, True, True): 7.0929e-07,
  (False, True, True, False, False): 1.73826e-05,
  (False, True, True, False, True): 4.055939999999999e-05,
  (False, True, True, True, False): 0.00015644340000000003,
  (False, True, True, True, True): 0.0003650346,
  (True, False, False, False, False): 5.631714000000005e-05,
  (True, False, False, False, True): 5.688

In [50]:
def joint_distribution(net, evidence={}):
    "Given a Bayes net and some evidence variables, return the joint distribution over all variables."
    values = [({evidence[var]} if var in evidence else var.domain)
              for var in net.variables]
    return ProbDist({row: joint_probability(row, net)
                     for row in itertools.product(*values)})

def joint_probability(row, net):
    evidence = dict(zip(net.variables, row))
    def Pvar(var): 
        return var.cpt[tuple(evidence[v] for v in var.parents)][evidence[var]]
    return prod(Pvar(v) for v in net.variables)
    
def prod(numbers):
    product = 1
    for x in numbers:
        product *= x
    return product

j = joint_distribution(alarm_net, {JohnCalls:True, MaryCalls:True})
j, len(j), sum(j.values())

({(False, False, False, True, True): 0.23895323731595236,
  (False, False, True, True, True): 0.3013824614795795,
  (False, True, False, True, True): 0.0003403339180750413,
  (False, True, True, True, True): 0.17515213192200013,
  (True, False, False, True, True): 1.4365911696438334e-05,
  (True, False, True, True, True): 0.2835830968876924,
  (True, True, False, True, True): 2.399116849772601e-08,
  (True, True, True, True, True): 0.00057434857383556},
 8,
 1.0)

In [52]:
alarm_net.variables

[Burglary, Earthquake, Alarm, JohnCalls, MaryCalls]

In [146]:
def tests():
    ProbDist({'heads': 1, 'tails': 1}) == ProbDist(heads=2, tails=2) == {'heads': 0.5, 'tails': 0.5}
    ProbDist(0.2) == ProbDist({False: 0.8, True: 0.2})
    
    CPT(0.2, []) == CPT({(): {False: 0.8, True: 0.2}}, [])
    
    return 'tests pass'
    
tests()


'tests pass'

The entries in a `CPTable` are all of the form `{(parent_value, ...): ProbDist}`. You could create such a table yourself, but we provide the function `CPT` to make it slightly easier. We provide functions to verify CPTs and ProbDists.

In [None]:
The one method, `P`, gives the probability distribution for the variable, given evidence that specifies the values of all the parents.
(If you don't know the values for all the parents, later we will see that `enumeration_ask` can still give you an answer.)

In [102]:
ProbDist(.3)

{False: 0.7, True: 0.3}

In [80]:
T = True 
F = False

def CPT(data, 



Now name the variables and ask for  **P**(*Alarm* | *Burglary*=*f*, *Earthquake*=*t*):

In [86]:
Alarm.P({Burglary:F, Earthquake:T})

{False: 0.71, True: 0.29}

# Asia
https://www.norsys.com/tutorials/netica/secA/tut_A1.htm
    
Asia = (BayesNet()
        .add('VisitAsia', [], 0.01)
        .add('Smoker', [], 0.30)
        .add('TB', ['VisitAsia'], {T: 

# Flu Net

In [76]:
sick = (BayesNet()
        .add('Vaccinated', [], {(): 0.35})
        .add('Flu', ['Vaccinated'], {T: 0.075, F: 0.45})
        .add('Fever', ['Flu'], {T: 0.75,  F: 0.25})
        .add('Headache', ['Flu'], {T: 0.7,   F: 0.4}))

In [77]:
globals().update(sick)

enumeration_ask(Headache, {Flu: False}, sick)

{False: 0.6, True: 0.39999999999999997}

In [75]:
enumeration_ask(Headache, {Vaccinated: False, Fever: True}, sick)

{False: 0.386842105263158, True: 0.613157894736842}

In [38]:
enumeration_ask(Burglary, {JohnCalls: True, MaryCalls: True}, alarm_net)

{False: 0.7158281646356071, True: 0.2841718353643929}

In [39]:
enumeration_ask(Burglary, {JohnCalls: False, MaryCalls: False}, alarm_net)

{False: 0.9999098156062451, True: 9.018439375484353e-05}

In [40]:
enumeration_ask(Burglary, {JohnCalls: False, MaryCalls: True}, alarm_net)

{False: 0.993123753926579, True: 0.0068762460734210235}

In [41]:
enumeration_ask(Burglary, {JohnCalls: True, MaryCalls: False}, alarm_net)

{False: 0.9948701418665987, True: 0.005129858133401302}

In [47]:
# Not executable yet
weather = (BayesNet()
           .add('Yesterday', [], {(): {'rain': 0.2, 'sun': 0.8}})
           .add('Pressure', [], {(): {'lo': 0.3, 'hi': 0.7}})
           .add('Today', ['Yesterday', 'Pressure'], 
                {('rain', 'lo'): {'rain': 0.7, 'sun': 0.3},
                 ('rain', 'hi'): {'rain': 0.5, 'sun': 0.5},
                 ('sun', 'lo'):  {'rain': 0.2, 'sun': 0.8},
                 ('sun', 'hi'):  {'rain': 0.1, 'sun': 0.9}}))
                 
globals().update(weather)

In [49]:
enumeration_ask(Yesterday, {Today: 'rain'}, weather)

KeyError: True