# logic.py

    def get_neg_watched(self, l):
        """Return slot 1 of the watch-list entry stored for symbol `l`.

        `add` and `remove` file a clause in slot 1 when the polarity flag
        returned by `inspect_literal` is falsy (presumably a negated literal).
        """
        entry = self.__watch_list[l]
        return entry[1]

    def add(self, clause, model):
        """Start watching `clause`.

        Picks the clause's two watched literals, then indexes the clause in the
        watch list under each watched symbol (slot 0 when the polarity flag is
        truthy, slot 1 otherwise).
        """
        self.__twl[clause] = self.__assign_watching_literals(clause, model)
        first_sym, first_flag = inspect_literal(self.get_first_watched(clause))
        second_sym, second_flag = inspect_literal(self.get_second_watched(clause))
        self.__watch_list[first_sym][0 if first_flag else 1].add(clause)
        # a clause watching the same symbol twice is only indexed once
        if first_sym != second_sym:
            self.__watch_list[second_sym][0 if second_flag else 1].add(clause)

    def remove(self, clause):
        """Stop watching `clause`: drop its watched pair and its watch-list entries."""
        # the watched literals must be read before the pair is deleted
        first_sym, first_flag = inspect_literal(self.get_first_watched(clause))
        second_sym, second_flag = inspect_literal(self.get_second_watched(clause))
        del self.__twl[clause]
        self.__watch_list[first_sym][0 if first_flag else 1].discard(clause)
        if first_sym != second_sym:
            self.__watch_list[second_sym][0 if second_flag else 1].discard(clause)

    def update_first_watched(self, clause, model):
        """Try to move a watch to a literal that is not false under `model`.

        The search excludes the literal currently returned by
        `get_first_watched`; on success the *second* watched literal is the one
        replaced and the watch list is re-indexed.  Returns True on success and
        None otherwise.
        """
        kept = self.get_first_watched(clause)
        found, replacement = self.__find_new_watching_literal(clause, kept, model)
        if not found:
            return None
        old_sym, old_flag = inspect_literal(self.get_second_watched(clause))
        self.__watch_list[old_sym][0 if old_flag else 1].remove(clause)
        self.set_second_watched(clause, replacement)
        new_sym, new_flag = inspect_literal(replacement)
        self.__watch_list[new_sym][0 if new_flag else 1].add(clause)
        return True

    def update_second_watched(self, clause, model):
        """Try to move a watch to a literal that is not false under `model`.

        The search excludes the literal currently returned by
        `get_second_watched`; on success the *first* watched literal is the one
        replaced and the watch list is re-indexed.  Returns True on success and
        None otherwise.
        """
        kept = self.get_second_watched(clause)
        found, replacement = self.__find_new_watching_literal(clause, kept, model)
        if not found:
            return None
        old_sym, old_flag = inspect_literal(self.get_first_watched(clause))
        self.__watch_list[old_sym][0 if old_flag else 1].remove(clause)
        self.set_first_watched(clause, replacement)
        new_sym, new_flag = inspect_literal(replacement)
        self.__watch_list[new_sym][0 if new_flag else 1].add(clause)
        return True

    def __find_new_watching_literal(self, clause, other_watched, model):
        """Search `clause` for a literal that can take over a watch.

        Returns `(True, literal)` for the first disjunct that differs from
        `other_watched` and is not false under `model`, else `(False, None)`.
        Clauses with at most two arguments are never searched.
        """
        if len(clause.args) <= 2:
            return False, None
        for candidate in disjuncts(clause):
            if candidate != other_watched and pl_true(candidate, model) is not False:
                return True, candidate
        return False, None

    def __assign_watching_literals(self, clause, model=None):
        """Choose the initial pair of watched literals for `clause`.

        Clauses with at most two arguments get no pair (None is returned).
        Without a model (None or empty) the first and last arguments are
        watched; otherwise the first unassigned disjunct and the first false
        disjunct are used (StopIteration propagates if either is absent).
        """
        if len(clause.args) <= 2:
            return None
        if not model:
            return [clause.args[0], clause.args[-1]]
        unassigned = next(l for l in disjuncts(clause) if pl_true(l, model) is None)
        falsified = next(l for l in disjuncts(clause) if pl_true(l, model) is False)
        return [unassigned, falsified]


# ______________________________________________________________________________
def WalkSAT(clauses, p=0.5, max_flips=10000):
    """Checks for satisfiability of all clauses by randomly flipping values of variables

    Returns a satisfying model (dict symbol -> bool) or None when no model is
    found within `max_flips` flips.  `p` is the probability of a random-walk
    step instead of a greedy step.

    >>> WalkSAT([A & ~A], 0.5, 100) is None
    True
    """
    # Set of all symbols in all clauses
    symbols = {sym for clause in clauses for sym in prop_symbols(clause)}
    # model is a random assignment of true/false to the symbols in clauses
    model = {s: random.choice([True, False]) for s in symbols}

    def sat_count(candidate):
        # Return the number of clauses satisfied after flipping the symbol;
        # the flip is undone before returning so `model` is left untouched.
        model[candidate] = not model[candidate]
        count = sum(1 for c in clauses if pl_true(c, model))
        model[candidate] = not model[candidate]
        return count

    for _ in range(max_flips):
        unsatisfied = [clause for clause in clauses if not pl_true(clause, model)]
        if not unsatisfied:  # if model satisfies all the clauses
            return model
        clause = random.choice(unsatisfied)
        if probability(p):
            # random-walk step
            sym = random.choice(list(prop_symbols(clause)))
        else:
            # Flip the symbol in clause that maximizes number of sat. clauses
            sym = argmax(prop_symbols(clause), key=sat_count)
        model[sym] = not model[sym]
    # If no solution is found within the flip limit, we return failure
    return None

# ______________________________________________________________________________
# Map Coloring Problems


def MapColoringSAT(colors, neighbors):
    """Make a SAT for the problem of coloring a map with different colors
    for any two adjacent regions. Arguments are a list of colors, and a
    dict of {region: [neighbor,...]} entries. This dict may also be
    specified as a string of the form defined by parse_neighbors.

    Each border is encoded once, even when it is listed from both sides, and
    clauses are generated in the iteration order of `neighbors`.
    """
    if isinstance(neighbors, str):
        neighbors = parse_neighbors(neighbors)
    colors = UniversalDict(colors)
    clauses = []
    encoded_borders = set()  # unordered region pairs already constrained
    for state in neighbors:
        state_colors = [expr(state + '_' + c) for c in colors[state]]
        # every region takes at least one color ...
        clauses.append(state_colors)
        # ... and no two colors at once
        for first_color, second_color in itertools.combinations(state_colors, 2):
            clauses.append([~first_color, ~second_color])
        for n_state in neighbors[state]:
            border = frozenset((state, n_state))
            if border in encoded_borders:
                continue
            encoded_borders.add(border)
            # adjacent regions may not share a color
            for col in colors[n_state]:
                clauses.append([expr('~' + state + '_' + col), expr('~' + n_state + '_' + col)])
    return associate('&', [associate('|', c) for c in clauses])


# Australia map-coloring instance with the three colors R, G, B.
australia_sat = MapColoringSAT(list('RGB'), """SA: WA NT Q NSW V; NT: WA Q; NSW: Q V; T: """)

# France map-coloring instance with the four colors R, G, B, Y.
france_sat = MapColoringSAT(list('RGBY'),
                            """AL: LO FC; AQ: MP LI PC; AU: LI CE BO RA LR MP; BO: CE IF CA FC RA
                            AU; BR: NB PL; CA: IF PI LO FC BO; CE: PL NB NH IF BO AU LI PC; FC: BO
                            CA LO AL RA; IF: NH PI CA BO CE; LI: PC CE AU MP AQ; LO: CA AL FC; LR:
                            MP AU RA PA; MP: AQ LI AU LR; NB: NH CE PL BR; NH: PI IF CE NB; NO:
                            PI; PA: LR RA; PC: PL CE LI AQ; PI: NH NO CA IF; PL: BR NB CE PC; RA:
                            AU BO FC PA LR""")

# USA map-coloring instance with the four colors R, G, B, Y.
usa_sat = MapColoringSAT(list('RGBY'),
                         """WA: OR ID; OR: ID NV CA; CA: NV AZ; NV: ID UT AZ; ID: MT WY UT;
                         UT: WY CO AZ; MT: ND SD WY; WY: SD NE CO; CO: NE KA OK NM; NM: OK TX AZ;
                         ND: MN SD; SD: MN IA NE; NE: IA MO KA; KA: MO OK; OK: MO AR TX;
                         TX: AR LA; MN: WI IA; IA: WI IL MO; MO: IL KY TN AR; AR: MS TN LA;
                         LA: MS; WI: MI IL; IL: IN KY; IN: OH KY; MS: TN AL; AL: TN GA FL;
                         MI: OH IN; OH: PA WV KY; KY: WV VA TN; TN: VA NC GA; GA: NC SC FL;
                         PA: NY NJ DE MD WV; WV: MD VA; VA: MD DC NC; NC: SC; NY: VT MA CT NJ;
                         NJ: DE; DE: MD; MD: DC; VT: NH MA; MA: NH RI CT; CT: RI; ME: NH;
                         HI: ; AK: """)


# ______________________________________________________________________________
# Expr functions for WumpusKB and HybridWumpusAgent

def facing_east(time):
    """Build the proposition `FacingEast(time)`."""
    return Expr('FacingEast', time)

def facing_west(time):
    """Build the proposition `FacingWest(time)`."""
    return Expr('FacingWest', time)

def facing_north(time):
    """Build the proposition `FacingNorth(time)`."""
    return Expr('FacingNorth', time)

def facing_south(time):
    """Build the proposition `FacingSouth(time)`."""
    return Expr('FacingSouth', time)

def wumpus(x, y):
    """Build the proposition `W(x, y)`: the wumpus is in room (x, y)."""
    return Expr('W', x, y)


# NOTE(review): pit/breeze/stench are called by WumpusKB but were missing from
# this file; restored here next to `wumpus` -- confirm the symbol names.
def pit(x, y):
    """Build the proposition `P(x, y)`: there is a pit in room (x, y)."""
    return Expr('P', x, y)


def breeze(x, y):
    """Build the proposition `B(x, y)`: room (x, y) is breezy."""
    return Expr('B', x, y)


def stench(x, y):
    """Build the proposition `S(x, y)`: room (x, y) is smelly."""
    return Expr('S', x, y)

def wumpus_alive(time):
    """Build the proposition `WumpusAlive(time)`."""
    proposition = Expr('WumpusAlive', time)
    return proposition

def have_arrow(time):
    """Build the proposition `HaveArrow(time)`."""
    proposition = Expr('HaveArrow', time)
    return proposition

def percept_stench(time):
    """Build the percept proposition `Stench(time)`."""
    proposition = Expr('Stench', time)
    return proposition

def percept_breeze(time):
    """Build the percept proposition `Breeze(time)`."""
    proposition = Expr('Breeze', time)
    return proposition

def percept_glitter(time):
    """Build the percept proposition `Glitter(time)`."""
    proposition = Expr('Glitter', time)
    return proposition

def percept_bump(time):
    """Build the percept proposition `Bump(time)`."""
    proposition = Expr('Bump', time)
    return proposition

def percept_scream(time):
    """Build the percept proposition `Scream(time)`."""
    proposition = Expr('Scream', time)
    return proposition

def move_forward(time):
    """Build the action proposition `Forward(time)`."""
    proposition = Expr('Forward', time)
    return proposition

def turn_left(time):
    """Build the action proposition `TurnLeft(time)`."""
    proposition = Expr('TurnLeft', time)
    return proposition

def turn_right(time):
    """Build the action proposition `TurnRight(time)`."""
    proposition = Expr('TurnRight', time)
    return proposition

def ok_to_move(x, y, time):
    """Build the proposition `OK(x, y, time)`."""
    proposition = Expr('OK', x, y, time)
    return proposition


def location(x, y, time=None):
    """Build the proposition `L(x, y)`, or `L(x, y, time)` when a time is given."""
    arguments = (x, y) if time is None else (x, y, time)
    return Expr('L', *arguments)

# Symbols

def implies(lhs, rhs):
    """Build the implication `lhs ==> rhs`."""
    implication = Expr('==>', lhs, rhs)
    return implication

def equiv(lhs, rhs):
    """Build the biconditional `lhs <=> rhs`."""
    return Expr('<=>', lhs, rhs)


# Helper Function

def new_disjunction(sentences):
    """Fold `sentences` together with `|`, left to right.

    Raises IndexError when `sentences` is empty.
    """
    disjunction = sentences[0]
    for sentence in sentences[1:]:
        disjunction |= sentence
    return disjunction


# ______________________________________________________________________________


class WumpusKB(PropKB):
    """
    Create a Knowledge Base that contains the atemporal "Wumpus physics" and temporal rules with time zero.
    """

    def __init__(self, dimrow):
        """Populate the KB for a `dimrow` x `dimrow` cave with the agent starting in room (1, 1)."""
        super().__init__()
        self.dimrow = dimrow
        self.tell(~wumpus(1, 1))
        self.tell(~pit(1, 1))

        for y in range(1, dimrow + 1):
            for x in range(1, dimrow + 1):
                # pits / wumpus propositions of the rooms adjacent to (x, y)
                pits_in = list()
                wumpus_in = list()

                if x > 1:  # West room exists
                    pits_in.append(pit(x - 1, y))
                    wumpus_in.append(wumpus(x - 1, y))
                if y < dimrow:  # North room exists
                    pits_in.append(pit(x, y + 1))
                    wumpus_in.append(wumpus(x, y + 1))
                if x < dimrow:  # East room exists
                    pits_in.append(pit(x + 1, y))
                    wumpus_in.append(wumpus(x + 1, y))
                if y > 1:  # South room exists
                    pits_in.append(pit(x, y - 1))
                    wumpus_in.append(wumpus(x, y - 1))

                self.tell(equiv(breeze(x, y), new_disjunction(pits_in)))
                self.tell(equiv(stench(x, y), new_disjunction(wumpus_in)))

        # Rule that describes existence of at least one Wumpus
        wumpus_at_least = list()
        for x in range(1, dimrow + 1):
            for y in range(1, dimrow + 1):
                wumpus_at_least.append(wumpus(x, y))

        self.tell(new_disjunction(wumpus_at_least))

        # Rule that describes existence of at most one Wumpus
        for i in range(1, dimrow + 1):
            for j in range(1, dimrow + 1):
                for u in range(1, dimrow + 1):
                    for v in range(1, dimrow + 1):
                        if i != u or j != v:
                            self.tell(~wumpus(i, j) | ~wumpus(u, v))

        # Temporal rules at time zero
        for i in range(1, dimrow + 1):
            for j in range(1, dimrow + 1):
                self.tell(implies(location(i, j, 0), equiv(percept_breeze(0), breeze(i, j))))
                self.tell(implies(location(i, j, 0), equiv(percept_stench(0), stench(i, j))))
                if i != 1 or j != 1:
                    self.tell(~location(i, j, 0))

        self.tell(wumpus_alive(0))
        self.tell(have_arrow(0))
        self.tell(facing_east(0))
        self.tell(~facing_north(0))
        self.tell(~facing_south(0))
        self.tell(~facing_west(0))

    def make_action_sentence(self, action, time):
        """Assert which of Forward/Shoot/TurnLeft/TurnRight was executed at `time`.

        `action` may be the action proposition itself or just its name; every
        other action is asserted not to have happened.
        """
        actions = [move_forward(time), shoot(time), turn_left(time), turn_right(time)]
        for a in actions:
            # Compare by value: an identity test against these freshly built
            # expressions can never succeed.
            if action == a or action == a.op:
                self.tell(a)
            else:
                self.tell(~a)

    def make_percept_sentence(self, percept, time):
        """Assert the percept received at `time` and deny the other four."""
        # Glitter, Bump, Stench, Breeze, Scream
        percept_builders = [(Glitter, percept_glitter), (Bump, percept_bump), (Stench, percept_stench),
                            (Breeze, percept_breeze), (Scream, percept_scream)]

        # Things perceived (only the first matching percept type counts)
        perceived = next((index for index, (kind, _) in enumerate(percept_builders)
                          if isinstance(percept, kind)), None)
        if perceived is not None:
            self.tell(percept_builders[perceived][1](time))

        # Things not perceived
        for index, (_, build) in enumerate(percept_builders):
            if index != perceived:
                self.tell(~build(time))

    def add_temporal_sentences(self, time):
        """Add the successor-state rules linking time `time - 1` to `time` (no-op at time 0)."""
        if time == 0:
            return
        t = time - 1

        # current location rules
        for i in range(1, self.dimrow + 1):
            for j in range(1, self.dimrow + 1):
                self.tell(implies(location(i, j, time), equiv(percept_breeze(time), breeze(i, j))))
                self.tell(implies(location(i, j, time), equiv(percept_stench(time), stench(i, j))))

                s = list()
                s.append(
                    equiv(
                        location(i, j, time), location(i, j, time) & ~move_forward(time) | percept_bump(time)))
                if i != 1:
                    s.append(location(i - 1, j, t) & facing_east(t) & move_forward(t))

                if i != self.dimrow:
                    s.append(location(i + 1, j, t) & facing_west(t) & move_forward(t))

                if j != 1:
                    s.append(location(i, j - 1, t) & facing_north(t) & move_forward(t))
                if j != self.dimrow:
                    s.append(location(i, j + 1, t) & facing_south(t) & move_forward(t))
                # add sentence about location i,j
                self.tell(new_disjunction(s))
                # add sentence about safety of location i,j
                self.tell(
                    equiv(ok_to_move(i, j, time), ~pit(i, j) & ~wumpus(i, j) & wumpus_alive(time))
                )

        # Rules about current orientation
        a = facing_north(t) & turn_right(t)
        b = facing_south(t) & turn_left(t)
        c = facing_east(t) & ~turn_left(t) & ~turn_right(t)
        s = equiv(facing_east(time), a | b | c)
        self.tell(s)

        a = facing_north(t) & turn_left(t)
        b = facing_south(t) & turn_right(t)
        c = facing_west(t) & ~turn_left(t) & ~turn_right(t)
        s = equiv(facing_west(time), a | b | c)
        self.tell(s)

        a = facing_east(t) & turn_left(t)
        b = facing_west(t) & turn_right(t)
        c = facing_north(t) & ~turn_left(t) & ~turn_right(t)
        s = equiv(facing_north(time), a | b | c)
        self.tell(s)

        a = facing_west(t) & turn_left(t)
        b = facing_east(t) & turn_right(t)
        c = facing_south(t) & ~turn_left(t) & ~turn_right(t)
        s = equiv(facing_south(time), a | b | c)
        self.tell(s)

        # Rules about last action
        self.tell(equiv(move_forward(t), ~turn_right(t) & ~turn_left(t)))
        # Rule about the arrow
        self.tell(equiv(have_arrow(time), have_arrow(t) & ~shoot(t)))
        # Rule about Wumpus (dead or alive)
        self.tell(equiv(wumpus_alive(time), wumpus_alive(t) & ~percept_scream(time)))

    def ask_if_true(self, query):
        """Return whether `query` follows from the KB, using `pl_resolution`."""
        return pl_resolution(self, query)


# ______________________________________________________________________________


class WumpusPosition:
    """Agent pose in the wumpus world: a room (X, Y) plus an orientation label."""

    def __init__(self, x, y, orientation):
        self.X = x
        self.Y = y
        self.orientation = orientation

    def get_location(self):
        """Return the room as an `(x, y)` tuple."""
        return self.X, self.Y

    def set_location(self, x, y):
        self.X = x
        self.Y = y

    def get_orientation(self):
        return self.orientation

    def set_orientation(self, orientation):
        self.orientation = orientation

    def __eq__(self, other):
        # Let Python fall back to its default for foreign types instead of
        # raising AttributeError.
        if not isinstance(other, WumpusPosition):
            return NotImplemented
        return (other.get_location() == self.get_location()
                and other.get_orientation() == self.get_orientation())

    def __hash__(self):
        # Defining __eq__ alone would make instances unhashable, yet they are
        # stored in sets.  The hash follows the current field values, so do not
        # mutate a position while it is held in a set or used as a dict key.
        return hash((self.X, self.Y, self.orientation))

    def __repr__(self):
        return 'WumpusPosition({!r}, {!r}, {!r})'.format(self.X, self.Y, self.orientation)
# ______________________________________________________________________________


class HybridWumpusAgent(Agent):
    """An agent for the wumpus world that does logical inference. [Figure 7.20]"""

    def __init__(self, dimentions):
        """Create an agent for a `dimentions` x `dimentions` cave, starting in room (1, 1)."""
        self.dimrow = dimentions
        self.kb = WumpusKB(self.dimrow)
        self.t = 0
        self.plan = list()
        self.current_position = WumpusPosition(1, 1, 'UP')
        super().__init__(self.execute)

    def execute(self, percept):
        """Record `percept`, refresh the belief state, (re)plan if needed and return the next action."""
        self.kb.make_percept_sentence(percept, self.t)
        self.kb.add_temporal_sentences(self.t)

        rooms = [(i, j) for i in range(1, self.dimrow + 1) for j in range(1, self.dimrow + 1)]

        # Current room: the first one the KB can prove; when none can be
        # proven the previously known room is kept.
        x, y = self.current_position.get_location()
        for i, j in rooms:
            if self.kb.ask_if_true(location(i, j, self.t)):
                x, y = i, j
                break

        headings = ((facing_north, 'UP'), (facing_south, 'DOWN'),
                    (facing_west, 'LEFT'), (facing_east, 'RIGHT'))
        for facing, orientation in headings:
            if self.kb.ask_if_true(facing(self.t)):
                self.current_position = WumpusPosition(x, y, orientation)
                break

        safe_points = [[i, j] for i, j in rooms if self.kb.ask_if_true(ok_to_move(i, j, self.t))]

        # Gold found: grab it, go home and climb out.
        if self.kb.ask_if_true(percept_glitter(self.t)):
            goals = [[1, 1]]
            self.plan.append('Grab')
            self.plan.extend(self.plan_route(self.current_position, goals, safe_points))
            self.plan.append('Climb')

        # Otherwise explore a room that is provably safe and not yet visited.
        unvisited = None
        if not self.plan:
            unvisited = self._unvisited_rooms(rooms)
            unvisited_and_safe = [room for room in unvisited if room in safe_points]
            self.plan.extend(self.plan_route(self.current_position, unvisited_and_safe, safe_points))

        # Nothing safe to explore: try to shoot the wumpus.
        if not self.plan and self.kb.ask_if_true(have_arrow(self.t)):
            # rooms where the absence of the wumpus cannot be proven
            possible_wumpus = [[i, j] for i, j in rooms if not self.kb.ask_if_true(~wumpus(i, j))]
            self.plan.extend(self.plan_shot(self.current_position, possible_wumpus, safe_points))

        # No choice but to take a risk: head for an unvisited room that is not provably unsafe.
        if not self.plan:
            if unvisited is None:
                unvisited = self._unvisited_rooms(rooms)
            not_unsafe = [[i, j] for i, j in rooms
                          if not self.kb.ask_if_true(~ok_to_move(i, j, self.t))]
            risky = [room for room in unvisited if room in not_unsafe]
            self.plan.extend(self.plan_route(self.current_position, risky, safe_points))

        # Nothing left to do: go home and climb out.
        if not self.plan:
            self.plan.extend(self.plan_route(self.current_position, [[1, 1]], safe_points))
            self.plan.append('Climb')

        action = self.plan.pop(0)
        self.kb.make_action_sentence(action, self.t)
        self.t += 1

        return action

    def _unvisited_rooms(self, rooms):
        """Return the rooms the agent is not known to have occupied at any time up to now."""
        return [[i, j] for i, j in rooms
                if not any(self.kb.ask_if_true(location(i, j, k)) for k in range(self.t + 1))]

    def plan_route(self, current, goals, allowed):
        """Return the actions leading from `current` to one of `goals`, or [] when there is no route."""
        if not goals:
            return []
        problem = PlanRoute(current, goals, allowed, self.dimrow)
        node = astar_search(problem)
        return node.solution() if node is not None else []

    def plan_shot(self, current, goals, allowed):
        """Plan a route to a pose lined up with a possible wumpus room, followed by 'Shoot'."""
        wumpus_rooms = {(loc[0], loc[1]) for loc in goals}

        # Poses in the same row/column as a candidate room, facing it.  The
        # orientation labels match the ones `execute` assigns to the agent.
        poses = set()
        for x, y in wumpus_rooms:
            for i in range(1, self.dimrow + 1):
                if i < x:
                    poses.add((i, y, 'RIGHT'))  # west of the room, facing east
                if i > x:
                    poses.add((i, y, 'LEFT'))  # east of the room, facing west
                if i < y:
                    poses.add((x, i, 'UP'))  # south of the room, facing north
                if i > y:
                    poses.add((x, i, 'DOWN'))  # north of the room, facing south

        # Can't have a shooting position from any of the rooms the Wumpus could reside
        shooting_positions = [WumpusPosition(px, py, orientation)
                              for px, py, orientation in sorted(poses)
                              if (px, py) not in wumpus_rooms]

        actions = list()
        actions.extend(self.plan_route(current, shooting_positions, allowed))
        actions.append('Shoot')
        return actions


# ______________________________________________________________________________
def SAT_plan(init, transition, goal, t_max, SAT_solver=cdcl_satisfiable):
    """Converts a planning problem to Satisfaction problem by translating it to a cnf sentence.
    [Figure 7.22]

    `transition` maps each state to a dict {action: resulting state}.  Plans
    of length 1 up to `t_max` are tried in turn; the list of actions of the
    first satisfiable encoding is returned, or None when none is satisfiable.

    >>> transition = {'A': {'Left': 'A', 'Right': 'B'}, 'B': {'Left': 'A', 'Right': 'C'}, 'C': {'Left': 'B', 'Right': 'C'}}
    >>> SAT_plan('A', transition, 'C', 1) is None
    True
    """

    # Functions used by SAT_plan
    def translate_to_SAT(init, transition, goal, time):
        clauses = []
        states = [state for state in transition]

        # Symbol claiming state s at time t
        state_counter = itertools.count()
        for s in states:
            for t in range(time + 1):
                state_sym[s, t] = Expr("S{}".format(next(state_counter)))

        # Add initial state axiom
        clauses.append(state_sym[init, 0])

        # Add goal state axiom
        if isinstance(goal, Expr):
            # the goal is a sentence: use the first state whose conjuncts cover it
            goal_state = first(key[0] for key in state_sym
                               if set(conjuncts(key[0])).issuperset(conjuncts(goal)))
            clauses.append(state_sym[goal_state, time])
        else:
            clauses.append(state_sym[goal, time])

        # All possible transitions
        transition_counter = itertools.count()
        for s in states:
            for action in transition[s]:
                s_ = transition[s][action]
                for t in range(time):
                    # Action 'action' taken from state 's' at time 't' to reach 's_'
                    action_sym[s, action, t] = Expr("T{}".format(next(transition_counter)))

                    # Change the state from s to s_
                    clauses.append(action_sym[s, action, t] | '==>' | state_sym[s, t])
                    clauses.append(action_sym[s, action, t] | '==>' | state_sym[s_, t + 1])

        # Allow only one state at any time
        for t in range(time + 1):
            # must be a state at any time
            clauses.append(associate('|', [state_sym[s, t] for s in states]))

            # for each pair of states s, s_ only one is possible at time t
            for s, s_ in itertools.combinations(states, 2):
                clauses.append((~state_sym[s, t]) | (~state_sym[s_, t]))

        # Restrict to one transition per timestep
        for t in range(time):
            # list of possible transitions at time t
            transitions_t = [tr for tr in action_sym if tr[2] == t]

            # make sure at least one of the transitions happens
            clauses.append(associate('|', [action_sym[tr] for tr in transitions_t]))

            # there cannot be two transitions tr and tr_ at time t
            for tr, tr_ in itertools.combinations(transitions_t, 2):
                clauses.append(~action_sym[tr] | ~action_sym[tr_])

        # Combine the clauses to form the cnf
        return associate('&', clauses)

    def extract_solution(model):
        true_transitions = [t for t in action_sym if model[action_sym[t]]]
        # Sort transitions based on time, which is the 3rd element of the tuple
        true_transitions.sort(key=lambda x: x[2])
        return [action for s, action, time in true_transitions]

    # Body of SAT_plan algorithm
    # NOTE(review): the loop header was missing from this file; plan lengths
    # 1..t_max are assumed -- confirm against the intended horizon.
    for t in range(1, t_max + 1):
        # dictionaries to help extract the solution from model
        state_sym = {}
        action_sym = {}

        cnf = translate_to_SAT(init, transition, goal, t)
        model = SAT_solver(cnf)
        if model is not False:
            return extract_solution(model)
    return None

# ______________________________________________________________________________
# Sentinel for "no substitution supplied": None cannot be used because it
# already means "unification failed".
_UNIFY_NO_SUBSTITUTION = object()


def unify(x, y, s=_UNIFY_NO_SUBSTITUTION):
    """Unify expressions x,y with substitution s; return a substitution that
    would make x,y equal, or None if x,y can not unify. x and y can be
    variables (e.g. Expr('x')), constants, lists, or Exprs. [Figure 9.1]

    When `s` is omitted a fresh empty substitution is used for each call.
    Passing `s=None` propagates an earlier failure and returns None.

    >>> unify(x, 3, {})
    {x: 3}
    """
    if s is _UNIFY_NO_SUBSTITUTION:
        s = {}
    if s is None:
        return None
    elif x == y:
        return s
    elif is_variable(x):
        return unify_var(x, y, s)
    elif is_variable(y):
        return unify_var(y, x, s)
    elif isinstance(x, Expr) and isinstance(y, Expr):
        return unify(x.args, y.args, unify(x.op, y.op, s))
    elif isinstance(x, str) or isinstance(y, str):
        return None
    elif issequence(x) and issequence(y) and len(x) == len(y):
        if not x:
            return s
        # unify element by element, threading the substitution through
        return unify(x[1:], y[1:], unify(x[0], y[0], s))
    else:
        return None


def is_variable(x):
    """A variable is an Expr with no args and a lowercase symbol as the op."""
    return isinstance(x, Expr) and not x.args and x.op[0].islower()


def unify_var(var, x, s):
    """Unify variable `var` with `x` under substitution `s`.

    Follows existing bindings of `var` or `x`; otherwise returns `s` extended
    with `var: x` (in cascaded form), or None when `var` occurs inside `x`.
    """
    if var in s:
        return unify(s[var], x, s)
    elif x in s:
        return unify(var, s[x], s)
    elif occur_check(var, x, s):
        # binding var to a term containing var would be circular
        return None
    else:
        new_s = extend(s, var, x)
        cascade_substitution(new_s)
        return new_s


def occur_check(var, x, s):
    """Return true if variable var occurs anywhere in x
    (or in subst(s, x), if s has a binding for x)."""
    if var == x:
        return True
    elif is_variable(x) and x in s:
        return occur_check(var, s[x], s)
    elif isinstance(x, Expr):
        return (occur_check(var, x.op, s) or
                occur_check(var, x.args, s))
    elif isinstance(x, (list, tuple)):
        # any() yields a real bool rather than the first matching element
        return any(occur_check(var, e, s) for e in x)
    else:
        return False


def subst(s, x):
    """Substitute the substitution s into the expression x.

    Lists and tuples are substituted element-wise, non-Expr values are
    returned unchanged, and a variable without a binding in `s` is kept as is.

    >>> subst({x: 42, y:0}, F(x) + y)
    (F(42) + 0)
    """
    if isinstance(x, list):
        return [subst(s, xi) for xi in x]
    elif isinstance(x, tuple):
        return tuple([subst(s, xi) for xi in x])
    elif not isinstance(x, Expr):
        return x
    elif is_var_symbol(x.op):
        return s.get(x, x)
    else:
        return Expr(x.op, *[subst(s, arg) for arg in x.args])


def cascade_substitution(s):
    """Normalise the substitution `s` in place and return None.

    Every bound term is rewritten through `s` itself; a term that is a
    non-variable Expr after that rewrite is rewritten a second time so that
    its own sub-terms reflect the updated bindings.

    >>> s = {x: y, y: G(z)}
    >>> cascade_substitution(s)
    >>> s == {x: G(z), y: G(z)}
    True
    """
    for var in s:
        s[var] = subst(s, s[var])
        term = s[var]
        if isinstance(term, Expr) and not is_variable(term):
            # Ensure Function Terms are correct updates by passing over them again.
            s[var] = subst(s, term)


def standardize_variables(sentence, dic=None):
    """Replace all the variables in sentence with new variables.

    `dic` maps variables already renamed to their replacements, so repeated
    occurrences of a variable within one sentence receive the same new name.
    """
    if dic is None:
        dic = {}
    if not isinstance(sentence, Expr):
        return sentence
    elif is_var_symbol(sentence.op):
        if sentence in dic:
            return dic[sentence]
        else:
            v = Expr('v_{}'.format(next(standardize_variables.counter)))
            dic[sentence] = v
            return v
    else:
        return Expr(sentence.op, *[standardize_variables(a, dic) for a in sentence.args])


# Shared counter so that every fresh variable name is unique.
standardize_variables.counter = itertools.count()
# ______________________________________________________________________________
class FolKB(KB):
    """A knowledge base consisting of first-order definite clauses.
    >>> kb0 = FolKB([expr('Farmer(Mac)'), expr('Rabbit(Pete)'),
    ...              expr('(Rabbit(r) & Farmer(f)) ==> Hates(f, r)')])
    >>> kb0.tell(expr('Rabbit(Flopsie)'))
    >>> kb0.retract(expr('Rabbit(Pete)'))
    >>> kb0.ask(expr('Hates(Mac, x)'))[x]
    Flopsie
    >>> kb0.ask(expr('Wife(Pete, x)'))
    False
    """

    def __init__(self, initial_clauses=None):
        """Create the KB, telling it each of `initial_clauses` (any iterable) in order."""
        self.clauses = []  # inefficient: no indexing
        if initial_clauses:
            for clause in initial_clauses:
                self.tell(clause)

    def tell(self, sentence):
        """Add `sentence` to the KB; raises Exception when it is not a definite clause."""
        if is_definite_clause(sentence):
            self.clauses.append(sentence)
        else:
            raise Exception("Not a definite clause: {}".format(sentence))

    def ask_generator(self, query):
        """Yield the substitutions that make `query` true, using backward chaining."""
        return fol_bc_ask(self, query)

    def retract(self, sentence):
        """Remove `sentence` from the KB; raises ValueError when it is not present."""
        self.clauses.remove(sentence)

    def fetch_rules_for_goal(self, goal):
        """Return the clauses that may help prove `goal` (currently every clause)."""
        return self.clauses

def fol_fc_ask(KB, alpha):
    """A simple forward-chaining algorithm. [Figure 9.3]

    Generator yielding every substitution under which a known or newly
    inferred fact unifies with the query `alpha`.  Newly inferred facts are
    told to `KB` after each round; iteration stops when a round adds nothing.
    """
    # TODO: Improve efficiency
    kb_consts = list({c for clause in KB.clauses for c in constant_symbols(clause)})

    def enum_subst(p):
        # enumerate every assignment of KB constants to the variables of p
        query_vars = list({v for clause in p for v in variables(clause)})
        for assignment_list in itertools.product(kb_consts, repeat=len(query_vars)):
            yield dict(zip(query_vars, assignment_list))

    # check if we can answer without new inferences
    for q in KB.clauses:
        phi = unify(q, alpha, {})
        if phi is not None:
            yield phi

    while True:
        new = []
        # KB.clauses only changes between rounds, so one snapshot per round suffices
        known = set(KB.clauses)
        for rule in KB.clauses:
            p, q = parse_definite_clause(rule)
            for theta in enum_subst(p):
                if set(subst(theta, p)).issubset(known):
                    q_ = subst(theta, q)
                    # keep q_ only if it does not unify with anything already known
                    if all(unify(x, q_, {}) is None for x in itertools.chain(KB.clauses, new)):
                        new.append(q_)
                        phi = unify(q_, alpha, {})
                        if phi is not None:
                            yield phi
        if not new:
            break
        for clause in new:
            KB.tell(clause)
    return None


def fol_bc_ask(KB, query):
    """A simple backward-chaining algorithm for first-order logic. [Figure 9.6]
    KB should be an instance of FolKB, and query an atomic sentence.
    Returns the generator from fol_bc_or, started with an empty substitution."""
    return fol_bc_or(KB, query, {})

withal's avatar
withal a validé
def fol_bc_or(KB, goal, theta):
    """Yield every substitution that proves `goal` via some rule of KB.

    Each rule returned by KB.fetch_rules_for_goal is standardized apart, its
    conclusion is unified with `goal` under `theta`, and its premises are
    then proved with fol_bc_and.
    """
    for candidate in KB.fetch_rules_for_goal(goal):
        premises, conclusion = parse_definite_clause(standardize_variables(candidate))
        # unify() may give None; fol_bc_and yields nothing in that case
        unifier = unify(conclusion, goal, theta)
        yield from fol_bc_and(KB, premises, unifier)

withal's avatar
withal a validé
def fol_bc_and(KB, goals, theta):
    """Yield every substitution extending `theta` that proves all of `goals`.

    KB    -- knowledge base handed through to fol_bc_or
    goals -- sequence of goals, proved left to right
    theta -- current substitution, or None when the caller's unification
             failed (then nothing is yielded)
    """
    if theta is None:
        return
    if not goals:
        # nothing left to prove: theta itself is a solution
        yield theta
        return
    first, rest = goals[0], goals[1:]
    # apply the bindings found so far before proving the first goal
    for theta1 in fol_bc_or(KB, subst(theta, first), theta):
        yield from fol_bc_and(KB, rest, theta1)
C.G.Vedant's avatar
C.G.Vedant a validé
# A simple KB that defines the relevant conditions of the Wumpus World as in Fig 7.4.
# See Sec. 7.4.3
wumpus_kb = PropKB()

# Proposition symbols used by the sentences below.
P11, P12, P21, P22, P31, B11, B21 = expr('P11, P12, P21, P22, P31, B11, B21')

# Sentences told to the KB: one negated P symbol, two biconditionals tying
# each B symbol to a disjunction of P symbols, then the two B literals.
wumpus_kb.tell(~P11)
wumpus_kb.tell(B11 | '<=>' | (P12 | P21))
wumpus_kb.tell(B21 | '<=>' | (P11 | P22 | P31))
wumpus_kb.tell(~B11)
wumpus_kb.tell(B21)

# Small first-order KB: four ground facts followed by five rules that derive
# Hates, Loves, Rabbit and Human. Each string is parsed with expr and the
# results are handed to FolKB in the order listed.
test_kb = FolKB(
    map(expr, ['Farmer(Mac)',
               'Rabbit(Pete)',
               'Mother(MrsMac, Mac)',
               'Mother(MrsRabbit, Pete)',
               '(Rabbit(r) & Farmer(f)) ==> Hates(f, r)',
               '(Mother(m, c)) ==> Loves(m, c)',
               '(Mother(m, r) & Rabbit(r)) ==> Rabbit(m)',
               '(Farmer(f)) ==> Human(f)',
               # Note that this order of conjuncts
               # would result in infinite recursion:
               # '(Human(h) & Mother(m, h)) ==> Human(m)'
               '(Mother(m, h) & Human(h)) ==> Human(m)'
               ]))

# First-order KB with four rules (concluding Criminal, Sells, Weapon and
# Hostile) and four ground facts about West, Nono, America and the missile M1.
# Each string is parsed with expr and the results are handed to FolKB.
crime_kb = FolKB(
    map(expr, ['(American(x) & Weapon(y) & Sells(x, y, z) & Hostile(z)) ==> Criminal(x)',
               'Owns(Nono, M1)',
               'Missile(M1)',
               '(Missile(x) & Owns(Nono, x)) ==> Sells(West, x, Nono)',
               'Missile(x) ==> Weapon(x)',
               'Enemy(x, America) ==> Hostile(x)',
               'American(West)',
               'Enemy(Nono, America)'
               ]))

# ______________________________________________________________________________

# Example application (not in the book).
# You can use the Expr class to do symbolic differentiation.  This used to be
# a part of AI; now it is considered a separate field, Symbolic Algebra.

def diff(y, x):
    """Return the symbolic derivative, dy/dx, as an Expr.
    However, you probably want to simplify the results with simp.
    >>> diff(x * x, x)
    ((x * 1) + (x * 1))

    Handles the operators +, - (unary and binary), *, /, ** and log.
    Raises ValueError for any other operator.
    """
    if y == x:
        return 1
    elif isnumber(y) or not y.args:
        # A bare number, or any other argument-less expression, is constant
        # with respect to x. The isnumber guard mirrors the one in simp and
        # keeps numeric operands from failing on the `.args` lookup.
        return 0
    else:
        # For a unary operator u and v are the same (only) argument.
        u, op, v = y.args[0], y.op, y.args[-1]
        if op == '+':
            return diff(u, x) + diff(v, x)
        elif op == '-' and len(y.args) == 1:
            return -diff(u, x)
        elif op == '-':
            return diff(u, x) - diff(v, x)
        elif op == '*':
            return u * diff(v, x) + v * diff(u, x)
        elif op == '/':
            return (v * diff(u, x) - u * diff(v, x)) / (v * v)
        elif op == '**' and isnumber(v):
            # Power rule for a constant numeric exponent. The test is on the
            # exponent v; testing the differentiation variable x could never
            # select this branch.
            return v * u ** (v - 1) * diff(u, x)
        elif op == '**':
            # General case: both base and exponent may depend on x.
            return (v * u ** (v - 1) * diff(u, x) +
                    u ** v * Expr('log')(u) * diff(v, x))
        elif op == 'log':
            return diff(u, x) / u
        else:
            raise ValueError("Unknown op: {} in diff({}, {})".format(op, y, x))
    """Simplify the expression x."""
    if isnumber(x) or not x.args:
MircoT's avatar
MircoT a validé
        return x
MircoT's avatar
MircoT a validé
    args = list(map(simp, x.args))
    u, op, v = args[0], x.op, args[-1]
withal's avatar
withal a validé
    if op == '+':
Peter Norvig's avatar
Peter Norvig a validé
        if v == 0:
MircoT's avatar
MircoT a validé
            return u