planning.py 71,6 ko
Newer Older
"""Planning (Chapters 10-11)
"""

Aman Deep Singh's avatar
Aman Deep Singh a validé
import copy
Manpreet Kaur's avatar
Manpreet Kaur a validé
import itertools
from collections import deque, defaultdict
from functools import reduce as _reduce

import search
from logic import FolKB, conjuncts, unify, associate, SAT_plan, dpll_satisfiable
from search import Node
from utils import Expr, expr, first
class PlanningProblem:
    """
    Planning Domain Definition Language (PlanningProblem) used to define a search problem.
    It stores states in a knowledge base consisting of first order logic statements.
    The conjunction of these logical statements completely defines a state.

    initial and goals are normalised into lists of clauses by convert();
    actions is the list of Action schemas available to the planner.
    """

    def __init__(self, initial, goals, actions):
        self.initial = self.convert(initial)
        self.goals = self.convert(goals)
        self.actions = actions

    def convert(self, clauses):
        """Converts strings into exprs

        Returns a list of clauses in which every negation written with '~'
        has been replaced by a clause whose name is prefixed with 'Not'.
        """
        if not isinstance(clauses, Expr):
            clauses = expr(clauses) if len(clauses) > 0 else []
        try:
            clauses = conjuncts(clauses)
        except AttributeError:
            # presumably raised when clauses is not an Expr (e.g. the empty
            # list built above); it is then used as-is below
            pass

        new_clauses = []
        for clause in clauses:
            if clause.op == '~':
                new_clauses.append(expr('Not' + str(clause.args[0])))
            else:
                new_clauses.append(clause)
        return new_clauses

    def expand_actions(self, name=None):
        """Generate all possible actions with variable bindings for precondition selection heuristic

        The objects are the arguments of the clauses of the initial state.
        When name is given, only the first action schema with that name is expanded.
        """

        def bound(terms, bindings):
            # replace every term that has a binding, keep the others untouched
            return [bindings.get(term, term) for term in terms]

        objects = set(arg for clause in self.initial for arg in clause.args)
        if name is None:
            action_list = self.actions
        else:
            action_list = [action for action in self.actions if str(action.name) == name][:1]

        expansions = []
        for action in action_list:
            for permutation in itertools.permutations(objects, len(action.args)):
                bindings = unify(Expr(action.name, *action.args), Expr(action.name, *permutation))
                if bindings is None:
                    continue
                new_expr = Expr(str(action.name), *bound(action.args, bindings))
                new_preconds = [Expr(str(precond.op), *bound(precond.args, bindings))
                                for precond in action.precond]
                new_effects = [Expr(str(effect.op), *bound(effect.args, bindings))
                               for effect in action.effect]
                expansions.append(Action(new_expr, new_preconds, new_effects))

        return expansions

    def is_strips(self):
        """
        Returns True if the problem does not contain negative literals in preconditions and goals
        """
        return (all(clause.op[:3] != 'Not' for clause in self.goals) and
                all(clause.op[:3] != 'Not' for action in self.actions for clause in action.precond))

    def goal_test(self):
        """Checks if the goals have been reached"""
        return all(goal in self.initial for goal in self.goals)

    def act(self, action):
        """
        Performs the action given as argument.
        Note that action is an Expr like expr('Remove(Glass, Table)') or expr('Eat(Sandwich)')

        Raises an Exception when no action schema has that name or when its
        preconditions do not hold in the current state.
        """
        action_name = action.op
        args = action.args
        list_action = first(a for a in self.actions if a.name == action_name)
        if list_action is None:
            raise Exception("Action '{}' not found".format(action_name))
        if not list_action.check_precond(self.initial, args):
            raise Exception("Action '{}' pre-conditions not satisfied".format(action))
        # the state after the action becomes the new current state
        self.initial = list_action(self.initial, args).clauses
C.G.Vedant's avatar
C.G.Vedant a validé

C.G.Vedant's avatar
C.G.Vedant a validé
class Action:
    """
    Defines an action schema using preconditions and effects.
    Use this to describe actions in PlanningProblem.
    action is an Expr where variables are given as arguments(args).
    Precondition and effect are both lists with positive and negative literals.
    Negative preconditions and effects are defined by adding a 'Not' before the name of the clause
    Example:
    precond = [expr("Human(person)"), expr("Hungry(person)"), expr("NotEaten(food)")]
    effect = [expr("Eaten(food)"), expr("Hungry(person)")]
    eat = Action(expr("Eat(person, food)"), precond, effect)
    """

    def __init__(self, action, precond, effect):
        if isinstance(action, str):
            action = expr(action)
        self.name = action.op
        self.args = action.args
        self.precond = self.convert(precond)
        self.effect = self.convert(effect)

    def __call__(self, kb, args):
        return self.act(kb, args)

    def __repr__(self):
        return '{}'.format(Expr(self.name, *self.args))

    def convert(self, clauses):
        """Converts strings into Exprs

        Negations written with '~' become clauses whose name is prefixed with 'Not'.
        Anything that is neither an Expr nor a string is returned unchanged.
        """
        if isinstance(clauses, Expr):
            clauses = conjuncts(clauses)
            for i, clause in enumerate(clauses):
                if clause.op == '~':
                    clauses[i] = expr('Not' + str(clause.args[0]))
        elif isinstance(clauses, str):
            clauses = clauses.replace('~', 'Not')
            if len(clauses) > 0:
                clauses = expr(clauses)
            try:
                clauses = conjuncts(clauses)
            except AttributeError:
                # an empty string has no clauses to split; it is kept as-is
                pass
        return clauses

    def relaxed(self):
        """
        Removes delete list from the action by removing all negative literals from action's effect
        """
        return Action(Expr(self.name, *self.args), self.precond,
                      [effect for effect in self.effect if effect.op[:3] != 'Not'])

    def substitute(self, e, args):
        """Replaces variables in expression with their respective Propositional symbol"""
        new_args = list(e.args)
        for num, x in enumerate(e.args):
            for i, param in enumerate(self.args):
                if param == x:
                    # the i-th schema variable is bound to the i-th given argument
                    new_args[num] = args[i]
        return Expr(e.op, *new_args)

    def check_precond(self, kb, args):
        """Checks if the precondition is satisfied in the current state"""
        if isinstance(kb, list):
            kb = FolKB(kb)
        return all(self.substitute(clause, args) in kb.clauses for clause in self.precond)

    def act(self, kb, args):
        """Executes the action on the state's knowledge base

        Returns the updated knowledge base; a list of clauses is first wrapped in a FolKB.
        Raises an Exception when the preconditions do not hold.
        """
        if isinstance(kb, list):
            kb = FolKB(kb)

        if not self.check_precond(kb, args):
            raise Exception('Action pre-conditions not satisfied')
        for clause in self.effect:
            kb.tell(self.substitute(clause, args))
            # the complementary literal must not remain in the state
            if clause.op[:3] == 'Not':
                new_clause = Expr(clause.op[3:], *clause.args)
            else:
                new_clause = Expr('Not' + clause.op, *clause.args)
            complement = self.substitute(new_clause, args)
            if kb.ask(complement) is not False:
                kb.retract(complement)

        # callers read `.clauses` from the result, so the kb has to be handed back
        return kb
def goal_test(goals, state):
    """Generic goal testing helper function

    state is either a list of clauses (wrapped in a FolKB) or an object
    offering ask(); every goal must be answered with something other than False.
    """
    kb = FolKB(state) if isinstance(state, list) else state
    for query in goals:
        if kb.ask(query) is False:
            return False
    return True


def air_cargo():
    """
    [Figure 10.1] AIR-CARGO-PROBLEM

    An air-cargo shipment problem for delivering cargo to different locations,
    given the starting location and airplanes.

    Example:
    >>> from planning import *
    >>> ac = air_cargo()
    >>> ac.goal_test()
    False
    >>> ac.act(expr('Load(C2, P2, JFK)'))
    >>> ac.act(expr('Load(C1, P1, SFO)'))
    >>> ac.act(expr('Fly(P1, SFO, JFK)'))
    >>> ac.act(expr('Fly(P2, JFK, SFO)'))
    >>> ac.act(expr('Unload(C2, P2, SFO)'))
    >>> ac.goal_test()
    False
    >>> ac.act(expr('Unload(C1, P1, JFK)'))
    >>> ac.goal_test()
    True
    >>>
    """

    return PlanningProblem(
        initial='At(C1, SFO) & At(C2, JFK) & At(P1, SFO) & At(P2, JFK) & '
                'Cargo(C1) & Cargo(C2) & Plane(P1) & Plane(P2) & Airport(SFO) & Airport(JFK)',
        goals='At(C1, JFK) & At(C2, SFO)',
        actions=[Action('Load(c, p, a)',
                        precond='At(c, a) & At(p, a) & Cargo(c) & Plane(p) & Airport(a)',
                        effect='In(c, p) & ~At(c, a)'),
                 Action('Unload(c, p, a)',
                        precond='In(c, p) & At(p, a) & Cargo(c) & Plane(p) & Airport(a)',
                        effect='At(c, a) & ~In(c, p)'),
                 Action('Fly(p, f, to)',
                        precond='At(p, f) & Plane(p) & Airport(f) & Airport(to)',
                        effect='At(p, to) & ~At(p, f)')])
opensourceware's avatar
opensourceware a validé


def spare_tire():
    """[Figure 10.2] SPARE-TIRE-PROBLEM

    A problem involving changing the flat tire of a car
    with a spare tire from the trunk.

    Example:
    >>> from planning import *
    >>> st = spare_tire()
    >>> st.goal_test()
    False
    >>> st.act(expr('Remove(Spare, Trunk)'))
    >>> st.act(expr('Remove(Flat, Axle)'))
    >>> st.goal_test()
    False
    >>> st.act(expr('PutOn(Spare, Axle)'))
    >>> st.goal_test()
    True
    >>>
    """

    # Adjacent literals are concatenated instead of using a backslash inside the
    # string, so no source indentation ends up in the effect expression.
    return PlanningProblem(initial='Tire(Flat) & Tire(Spare) & At(Flat, Axle) & At(Spare, Trunk)',
                           goals='At(Spare, Axle) & At(Flat, Ground)',
                           actions=[Action('Remove(obj, loc)',
                                           precond='At(obj, loc)',
                                           effect='At(obj, Ground) & ~At(obj, loc)'),
                                    Action('PutOn(t, Axle)',
                                           precond='Tire(t) & At(t, Ground) & ~At(Flat, Axle)',
                                           effect='At(t, Axle) & ~At(t, Ground)'),
                                    Action('LeaveOvernight',
                                           precond='',
                                           effect='~At(Spare, Ground) & ~At(Spare, Axle) & ~At(Spare, Trunk) & '
                                                  '~At(Flat, Ground) & ~At(Flat, Axle) & ~At(Flat, Trunk)')])
def three_block_tower():
    """
    [Figure 10.3] THREE-BLOCK-TOWER

    The Sussman Anomaly: a blocks-world task in which three blocks
    have to be stacked in a given configuration.

    Example:
    >>> from planning import *
    >>> tbt = three_block_tower()
    >>> tbt.goal_test()
    False
    >>> tbt.act(expr('MoveToTable(C, A)'))
    >>> tbt.act(expr('Move(B, Table, C)'))
    >>> tbt.goal_test()
    False
    >>> tbt.act(expr('Move(A, Table, B)'))
    >>> tbt.goal_test()
    True
    >>>
    """
    # move block b from x onto another block y
    move = Action('Move(b, x, y)',
                  precond='On(b, x) & Clear(b) & Clear(y) & Block(b) & Block(y)',
                  effect='On(b, y) & Clear(x) & ~On(b, x) & ~Clear(y)')
    # move block b from x down to the table
    move_to_table = Action('MoveToTable(b, x)',
                           precond='On(b, x) & Clear(b) & Block(b)',
                           effect='On(b, Table) & Clear(x) & ~On(b, x)')

    start = 'On(A, Table) & On(B, Table) & On(C, A) & Block(A) & Block(B) & Block(C) & Clear(B) & Clear(C)'
    target = 'On(A, B) & On(B, C)'
    return PlanningProblem(initial=start, goals=target, actions=[move, move_to_table])
def simple_blocks_world():
    """
    SIMPLE-BLOCKS-WORLD

    A simplified definition of the Sussman Anomaly problem.

    Example:
    >>> from planning import *
    >>> sbw = simple_blocks_world()
    >>> sbw.goal_test()
    False
    >>> sbw.act(expr('ToTable(A, B)'))
    >>> sbw.act(expr('FromTable(B, A)'))
    >>> sbw.goal_test()
    False
    >>> sbw.act(expr('FromTable(C, B)'))
    >>> sbw.goal_test()
    True
    >>>
    """

    return PlanningProblem(initial='On(A, B) & Clear(A) & OnTable(B) & OnTable(C) & Clear(C)',
                           goals='On(B, A) & On(C, B)',
                           actions=[Action('ToTable(x, y)',
                                           precond='On(x, y) & Clear(x)',
                                           effect='~On(x, y) & Clear(y) & OnTable(x)'),
                                    Action('FromTable(y, x)',
                                           precond='OnTable(y) & Clear(y) & Clear(x)',
                                           effect='~OnTable(y) & ~Clear(x) & On(y, x)')])


def have_cake_and_eat_cake_too():
    """
    [Figure 10.7] CAKE-PROBLEM

    A problem where we begin with a cake and want to
    reach the state of having a cake and having eaten a cake.
    The possible actions include baking a cake and eating a cake.

    Example:
    >>> from planning import *
    >>> cp = have_cake_and_eat_cake_too()
    >>> cp.goal_test()
    False
    >>> cp.act(expr('Eat(Cake)'))
    >>> cp.goal_test()
    False
    >>> cp.act(expr('Bake(Cake)'))
    >>> cp.goal_test()
    True
    >>>
    """

    return PlanningProblem(initial='Have(Cake)',
                           goals='Have(Cake) & Eaten(Cake)',
                           actions=[Action('Eat(Cake)',
                                           precond='Have(Cake)',
                                           effect='Eaten(Cake) & ~Have(Cake)'),
                                    Action('Bake(Cake)',
                                           precond='~Have(Cake)',
                                           effect='Have(Cake)')])
def shopping_problem():
    """
    SHOPPING-PROBLEM

    Acquire a set of items, each of which is only sold at certain stores.

    Example:
    >>> from planning import *
    >>> sp = shopping_problem()
    >>> sp.goal_test()
    False
    >>> sp.act(expr('Go(Home, HW)'))
    >>> sp.act(expr('Buy(Drill, HW)'))
    >>> sp.act(expr('Go(HW, SM)'))
    >>> sp.act(expr('Buy(Banana, SM)'))
    >>> sp.goal_test()
    False
    >>> sp.act(expr('Buy(Milk, SM)'))
    >>> sp.goal_test()
    True
    >>>
    """
    # buy item x at the store we are currently at
    buy = Action('Buy(x, store)',
                 precond='At(store) & Sells(store, x)',
                 effect='Have(x)')
    # travel from place x to place y
    go = Action('Go(x, y)',
                precond='At(x)',
                effect='At(y) & ~At(x)')

    start = 'At(Home) & Sells(SM, Milk) & Sells(SM, Banana) & Sells(HW, Drill)'
    target = 'Have(Milk) & Have(Banana) & Have(Drill)'
    return PlanningProblem(initial=start, goals=target, actions=[buy, go])
def socks_and_shoes():
    """
    SOCKS-AND-SHOES-PROBLEM

    Put socks and shoes on both feet; a shoe needs the matching sock to be on.

    Example:
    >>> from planning import *
    >>> ss = socks_and_shoes()
    >>> ss.goal_test()
    False
    >>> ss.act(expr('RightSock'))
    >>> ss.act(expr('RightShoe'))
    >>> ss.act(expr('LeftSock'))
    >>> ss.goal_test()
    False
    >>> ss.act(expr('LeftShoe'))
    >>> ss.goal_test()
    True
    >>>
    """
    right_shoe = Action('RightShoe', precond='RightSockOn', effect='RightShoeOn')
    right_sock = Action('RightSock', precond='', effect='RightSockOn')
    left_shoe = Action('LeftShoe', precond='LeftSockOn', effect='LeftShoeOn')
    left_sock = Action('LeftSock', precond='', effect='LeftSockOn')

    # nothing is worn at the start
    return PlanningProblem(initial='',
                           goals='RightShoeOn & LeftShoeOn',
                           actions=[right_shoe, right_sock, left_shoe, left_sock])
Aman Deep Singh's avatar
Aman Deep Singh a validé
def double_tennis_problem():
    """
    [Figure 11.10] DOUBLE-TENNIS-PROBLEM

    A multiagent planning problem involving two partner tennis players
    trying to return an approaching ball and repositioning around in the court.

    The goals contain the variable `a`, so they are checked with the
    module-level goal_test helper rather than with PlanningProblem.goal_test.

    Example:
    >>> from planning import *
    >>> dtp = double_tennis_problem()
    >>> goal_test(dtp.goals, dtp.initial)
    False
    >>> dtp.act(expr('Go(A, RightBaseLine, LeftBaseLine)'))
    >>> dtp.act(expr('Hit(A, Ball, RightBaseLine)'))
    >>> goal_test(dtp.goals, dtp.initial)
    False
    >>> dtp.act(expr('Go(A, LeftNet, RightBaseLine)'))
    >>> goal_test(dtp.goals, dtp.initial)
    True
    >>>
    """

    return PlanningProblem(
        initial='At(A, LeftBaseLine) & At(B, RightNet) & Approaching(Ball, RightBaseLine) & Partner(A, B) & Partner(B, A)',
        goals='Returned(Ball) & At(a, LeftNet) & At(a, RightNet)',
        actions=[Action('Hit(actor, Ball, loc)',
                        precond='Approaching(Ball, loc) & At(actor, loc)',
                        effect='Returned(Ball)'),
                 Action('Go(actor, to, loc)',
                        precond='At(actor, loc)',
                        effect='At(actor, to) & ~At(actor, loc)')])


class ForwardPlan(search.Problem):
    """
    Forward state-space search [Section 10.2.1]

    A state is the conjunction of the clauses that currently hold; the search
    starts from the planning problem's initial state and looks for its goals.
    """

    def __init__(self, planning_problem):
        super().__init__(associate('&', planning_problem.initial), associate('&', planning_problem.goals))
        self.planning_problem = planning_problem
        # every action schema grounded with the objects of the problem
        self.expanded_actions = self.planning_problem.expand_actions()

    def actions(self, state):
        """Returns the grounded actions whose preconditions all hold in state"""
        # split the state once instead of once per precondition
        state_clauses = conjuncts(state)
        return [action for action in self.expanded_actions
                if all(pre in state_clauses for pre in action.precond)]

    def result(self, state, action):
        """Returns the state reached by applying action in state"""
        return associate('&', action(conjuncts(state), action.args).clauses)

    def goal_test(self, state):
        """Checks that every goal of the planning problem holds in state"""
        state_clauses = conjuncts(state)
        return all(goal in state_clauses for goal in self.planning_problem.goals)

    def h(self, state):
        """
        Computes ignore delete lists heuristic by creating a relaxed version of the original problem (we can do that
        by removing the delete lists from all actions, ie. removing all negative literals from effects) that will be
        easier to solve through GraphPlan and where the length of the solution will serve as a good heuristic.

        state is expected to expose the actual state as `state.state` (presumably a search Node).
        Returns infinity when the relaxed problem cannot be solved.
        """
        relaxed_planning_problem = PlanningProblem(initial=state.state,
                                                   goals=self.goal,
                                                   actions=[action.relaxed() for action in
                                                            self.planning_problem.actions])
        try:
            return len(linearize(GraphPlan(relaxed_planning_problem).execute()))
        except Exception:
            # any failure of the relaxed planner means "no estimate"; unlike a bare
            # except this lets KeyboardInterrupt and SystemExit propagate
            return float('inf')


class BackwardPlan(search.Problem):
    """
    Backward relevant-states search [Section 10.2.2]

    The search starts from the goals of the planning problem and regresses
    them through relevant actions until they are contained in the initial state.
    """

    def __init__(self, planning_problem):
        # roles are swapped: the search starts at the goals and aims for the initial state
        super().__init__(associate('&', planning_problem.goals), associate('&', planning_problem.initial))
        self.planning_problem = planning_problem
        self.expanded_actions = self.planning_problem.expand_actions()

    def actions(self, subgoal):
        """
        Returns the list of grounded actions that are relevant to the subgoal, ie.:
        - the action achieves an element of the subgoal
        - the action doesn't delete something that needs to be achieved
        - the preconditions are consistent with other subgoals that need to be achieved
        """

        def negate_clause(clause):
            # only the leading 'Not' marks a negation; the rest of the name is kept intact
            if clause.op[:3] == 'Not':
                return Expr(clause.op[3:], *clause.args)
            return Expr('Not' + clause.op, *clause.args)

        subgoal = conjuncts(subgoal)
        return [action for action in self.expanded_actions if
                (any(prop in action.effect for prop in subgoal) and
                 not any(negate_clause(prop) in subgoal for prop in action.effect) and
                 not any(negate_clause(prop) in subgoal and negate_clause(prop) not in action.effect
                         for prop in action.precond))]

    def result(self, subgoal, action):
        """Regresses subgoal through action"""
        # g' = (g - effects(a)) + preconds(a)
        return associate('&', set(conjuncts(subgoal)).difference(action.effect).union(action.precond))

    def goal_test(self, subgoal):
        """Checks that every clause of subgoal is part of the target stored in self.goal"""
        target_clauses = conjuncts(self.goal)
        return all(goal in target_clauses for goal in conjuncts(subgoal))

    def h(self, subgoal):
        """
        Computes ignore delete lists heuristic by creating a relaxed version of the original problem (we can do that
        by removing the delete lists from all actions, ie. removing all negative literals from effects) that will be
        easier to solve through GraphPlan and where the length of the solution will serve as a good heuristic.

        subgoal is expected to expose the actual subgoal as `subgoal.state` (presumably a search Node).
        Returns infinity when the relaxed problem cannot be solved.
        """
        relaxed_planning_problem = PlanningProblem(initial=self.goal,
                                                   goals=subgoal.state,
                                                   actions=[action.relaxed() for action in
                                                            self.planning_problem.actions])
        try:
            return len(linearize(GraphPlan(relaxed_planning_problem).execute()))
        except Exception:
            # any failure of the relaxed planner means "no estimate"; unlike a bare
            # except this lets KeyboardInterrupt and SystemExit propagate
            return float('inf')


def SATPlan(planning_problem, solution_length, SAT_solver=dpll_satisfiable):
    """
    Planning as Boolean satisfiability [Section 10.4.1]

    Builds the state transition table reachable from the initial state and
    hands it, together with the initial state and the goals, to SAT_plan.
    """
    transition = defaultdict(dict)

    def expand_transitions(state, actions):
        # a state is identified by the conjunction of its sorted clauses
        literals = sorted(conjuncts(state))
        for action in actions:
            if not action.check_precond(literals, action.args):
                continue
            outgoing = transition[associate('&', literals)]
            if planning_problem.is_strips():
                # negative literals are dropped from the successor state
                successor = associate('&', sorted(set(filter(lambda clause: clause.op[:3] != 'Not',
                                                             action(literals, action.args).clauses))))
            else:
                successor = associate('&', sorted(set(action(literals, action.args).clauses)))
            outgoing.update({Expr(action.name, *action.args): successor})
        # recurse into every successor that has not been expanded yet
        for successor_state in transition[associate('&', literals)].values():
            if successor_state not in transition:
                expand_transitions(expr(successor_state), actions)

    expand_transitions(associate('&', planning_problem.initial), planning_problem.expand_actions())

    start = associate('&', sorted(planning_problem.initial))
    target = associate('&', sorted(planning_problem.goals))
    return SAT_plan(start, transition, target, solution_length, SAT_solver=SAT_solver)
class Level:
    """
    Contains the state of the planning problem
    and exhaustive list of actions which use the
    states as pre-condition.
    """

    def __init__(self, kb):
        """Initializes variables to hold state and action details of a level"""

        self.kb = kb
        # current state
        self.current_state = kb.clauses
        # current action to state link
        self.current_action_links = {}
        # current state to action link
        self.current_state_links = {}
        # current action to next state link
        self.next_action_links = {}
        # next state to current action link
        self.next_state_links = {}
        # mutually exclusive actions
        self.mutex = []

    def __call__(self, actions, objects):
        self.build(actions, objects)
        self.find_mutex()

    def separate(self, e):
        """Separates an iterable of elements into positive and negative parts"""

        positive = []
        negative = []
        for clause in e:
            # a clause is negative when its name starts with 'Not'
            if clause.op[:3] == 'Not':
                negative.append(clause)
            else:
                positive.append(clause)
        return positive, negative

    def _add_mutex(self, first_actions, second_actions):
        """Records every pair (a, b) of the two action lists as mutex, without duplicates"""
        for a in first_actions:
            for b in second_actions:
                if {a, b} not in self.mutex:
                    self.mutex.append({a, b})

    def find_mutex(self):
        """Finds mutually exclusive actions"""

        # Inconsistent effects
        pos_nsl, neg_nsl = self.separate(self.next_state_links)

        for negeff in neg_nsl:
            new_negeff = Expr(negeff.op[3:], *negeff.args)
            for poseff in pos_nsl:
                if new_negeff == poseff:
                    self._add_mutex(self.next_state_links[poseff], self.next_state_links[negeff])

        # Interference will be calculated with the last step
        pos_csl, neg_csl = self.separate(self.current_state_links)

        # Competing needs
        for pos_precond in pos_csl:
            for neg_precond in neg_csl:
                new_neg_precond = Expr(neg_precond.op[3:], *neg_precond.args)
                if new_neg_precond == pos_precond:
                    self._add_mutex(self.current_state_links[pos_precond],
                                    self.current_state_links[neg_precond])

        # Inconsistent support
        state_mutex = []
        for pair in self.mutex:
            members = list(pair)
            # a pair {a, a} collapses to a single element; it is then compared with itself
            next_state_0 = self.next_action_links[members[0]]
            next_state_1 = self.next_action_links[members[-1]]
            if (len(next_state_0) == 1) and (len(next_state_1) == 1):
                state_mutex.append({next_state_0[0], next_state_1[0]})

        self.mutex = self.mutex + state_mutex

    def build(self, actions, objects):
        """Populates the lists and dictionaries containing the state action dependencies"""

        # persistence (no-op) actions: 'P' + clause name carries each clause to the next level
        for clause in self.current_state:
            p_expr = Expr('P' + clause.op, *clause.args)
            self.current_action_links[p_expr] = [clause]
            self.next_action_links[p_expr] = [clause]
            self.current_state_links[clause] = [p_expr]
            self.next_state_links[clause] = [p_expr]

        for a in actions:
            num_args = len(a.args)

            for arg in itertools.permutations(objects, num_args):
                if not a.check_precond(self.kb, arg):
                    continue

                # schema arguments whose name is not lowercase are kept as they are
                arg = tuple(symbol if not symbol.op.islower() else value
                            for symbol, value in zip(a.args, arg))

                new_action = a.substitute(Expr(a.name, *a.args), arg)

                self.current_action_links[new_action] = []
                for clause in a.precond:
                    new_clause = a.substitute(clause, arg)
                    self.current_action_links[new_action].append(new_clause)
                    self.current_state_links.setdefault(new_clause, []).append(new_action)

                self.next_action_links[new_action] = []
                for clause in a.effect:
                    new_clause = a.substitute(clause, arg)
                    self.next_action_links[new_action].append(new_clause)
                    self.next_state_links.setdefault(new_clause, []).append(new_action)

    def perform_actions(self):
        """Performs the necessary actions and returns a new Level"""

        new_kb = FolKB(list(set(self.next_state_links.keys())))
        return Level(new_kb)
Manpreet Kaur's avatar
Manpreet Kaur a validé


class Graph:
    """
    Contains levels of state and actions
    Used in graph planning algorithm to extract a solution
    """

    def __init__(self, planning_problem):
        self.planning_problem = planning_problem
        self.kb = FolKB(planning_problem.initial)
        self.levels = [Level(self.kb)]
        # the objects are the arguments of the clauses of the initial state
        self.objects = set(arg for clause in self.kb.clauses for arg in clause.args)

    def __call__(self):
        self.expand_graph()

    def expand_graph(self):
        """Expands the graph by a level"""

        last_level = self.levels[-1]
        # fill in the links and mutexes of the last level before deriving the next one
        last_level(self.planning_problem.actions, self.objects)
        self.levels.append(last_level.perform_actions())

    def non_mutex_goals(self, goals, index):
        """Checks whether the goals are mutually exclusive

        Returns True when no pair of goals is recorded as mutex in the
        level at the given index, False otherwise.
        """

        level_mutex = self.levels[index].mutex
        return not any(set(pair) in level_mutex for pair in itertools.combinations(goals, 2))


class GraphPlan:
    """
    Class for formulation GraphPlan algorithm
    Constructs a graph of state and action space
    Returns solution for the planning problem
    """

    def __init__(self, planning_problem):
        """Wrap the problem in a Graph; start with no no-goods and no recorded solution steps."""
        self.graph = Graph(planning_problem)
        self.no_goods = []
        self.solution = []

    def check_leveloff(self):
        """Checks if the graph has levelled off

        Returns True when the last two levels hold the same set of
        current_state entries and False otherwise. The graph must contain at
        least two levels.
        """
        levels = self.graph.levels
        return set(levels[-1].current_state) == set(levels[-2].current_state)

    def extract_solution(self, goals, index):
        """Extracts the solution

        Works backwards from `goals` at self.graph.levels[index]: collects, for
        every goal, the actions of the preceding level that produce it, keeps
        the combinations that contain no mutex pair, records each combination in
        self.solution and recurses on the links of the chosen actions.

        Returns a list of level-ordered solutions (each one a list of action
        lists, earliest level first), or None when the search is cut short.
        """
        # NOTE(review): several statements of this method were missing from the
        # reviewed text (the mutex check had no body, and `level`,
        # `non_mutex_actions`, `action_list` and `act` were never bound). They
        # are restored here from how the surrounding code uses them; the
        # no-goods bookkeeping in particular should be confirmed against
        # version control.
        level = self.graph.levels[index]
        if not self.graph.non_mutex_goals(goals, index):
            self.no_goods.append((level, goals))
            return None

        # The actions that produce the clauses of levels[index] are linked from
        # the level before it.
        level = self.graph.levels[index - 1]

        # Create all combinations of actions that satisfy the goal
        actions = [level.next_state_links[goal] for goal in goals]

        # Filter out non-mutex actions
        non_mutex_actions = []
        for action_tuple in itertools.product(*actions):
            distinct_actions = list(set(action_tuple))
            action_pairs = itertools.combinations(distinct_actions, 2)
            if not any(set(pair) in level.mutex for pair in action_pairs):
                non_mutex_actions.append(distinct_actions)

        # Recursion
        for action_list in non_mutex_actions:
            if [action_list, index] not in self.solution:
                self.solution.append([action_list, index])

                new_goals = []
                for act in set(action_list):
                    if act in level.current_action_links:
                        new_goals = new_goals + level.current_action_links[act]

                if abs(index) + 1 == len(self.graph.levels):
                    # `level` is the first level of the graph: nothing earlier to visit.
                    return None
                elif (level, new_goals) in self.no_goods:
                    return None
                else:
                    self.extract_solution(new_goals, index - 1)

        # Level-Order multiple solutions
        solution = []
        for item in self.solution:
            # An entry recorded at index -1 starts a new solution; every entry
            # is appended to the most recent one.
            if item[1] == -1:
                solution.append([])
            solution[-1].append(item[0])

        # Entries were recorded from the last level backwards.
        for item in solution:
            item.reverse()

        return solution

    def goal_test(self, kb):
        """Return True when kb.ask(goal) is anything but False for every goal of the problem."""
        return all(kb.ask(q) is not False for q in self.graph.planning_problem.goals)

    def execute(self):
        """Executes the GraphPlan algorithm for the given problem

        Expands the graph one level at a time. Whenever the newest level passes
        goal_test and its goals are pairwise non-mutex, a solution is extracted
        and returned if it is non-empty. Returns None once the graph has
        levelled off without yielding a solution.
        """
        while True:
            self.graph.expand_graph()
            goals = self.graph.planning_problem.goals
            if self.goal_test(self.graph.levels[-1].kb) and self.graph.non_mutex_goals(goals, -1):
                solution = self.extract_solution(goals, -1)
                if solution:
                    return solution

            if len(self.graph.levels) >= 2 and self.check_leveloff():
                return None
class Linearize:
    """Turns the level-ordered solution produced by GraphPlan into a total-order plan."""

    def __init__(self, planning_problem):
        self.planning_problem = planning_problem

    def filter(self, solution):
        """Filter out persistence actions from a solution

        Only the first solution (solution[0]) is considered. An operation is
        treated as a persistence action when its op starts with 'P' followed by
        an upper-case character. Returns one filtered list per section.
        """
        new_solution = []
        for section in solution[0]:
            # Slices instead of indexes so a one-character op cannot raise IndexError.
            new_solution.append([operation for operation in section
                                 if not (operation.op[:1] == 'P' and operation.op[1:2].isupper())])
        return new_solution

    def orderlevel(self, level, planning_problem):
        """Return valid linear order of actions for a given level

        Tries every permutation of `level` on a fresh deep copy of
        `planning_problem`. Returns (ordered_actions, resulting_problem) for the
        first permutation whose actions can all be applied via act(), or None
        when no permutation works.
        """
        # NOTE(review): the `def` line and the statement binding `temp` were
        # missing from the reviewed text; the signature follows the call in
        # execute() and `temp` is a deep copy so that a failed attempt cannot
        # leak state into the next one. Confirm against version control.
        for permutation in itertools.permutations(level):
            temp = copy.deepcopy(planning_problem)
            try:
                for action in permutation:
                    temp.act(action)
            except Exception:
                # This ordering is not applicable; try the next permutation.
                continue
            return list(permutation), temp
        return None

    def execute(self):
        """Finds total-order solution for a planning graph

        Runs GraphPlan on the problem, drops persistence actions and orders the
        actions of every level with orderlevel(), threading the resulting
        problem state from one level to the next. Returns the flat list of
        actions, or None when GraphPlan returns None.
        """
        graphplan_solution = GraphPlan(self.planning_problem).execute()
        if graphplan_solution is None:
            return None

        filtered_solution = self.filter(graphplan_solution)
        ordered_solution = []
        planning_problem = self.planning_problem
        for level in filtered_solution:
            level_solution, planning_problem = self.orderlevel(level, planning_problem)
            ordered_solution.extend(level_solution)

        return ordered_solution
def linearize(solution):
    """Converts a level-ordered solution into a linear solution

    Flattens the sections of the first solution (solution[0]) in order and
    drops persistence actions, i.e. operations whose op starts with 'P'
    followed by an upper-case character.
    """
    return [operation
            for section in solution[0]
            for operation in section
            # Slices instead of indexes so a one-character op cannot raise IndexError.
            if not (operation.op[:1] == 'P' and operation.op[1:2].isupper())]


class PartialOrderPlanner:
    """
    [Section 10.13] PARTIAL-ORDER-PLANNER

    Partially ordered plans are created by a search through the space of plans
    rather than a search through the state space. It views planning as a refinement of partially ordered plans.
    A partially ordered plan is defined by a set of actions and a set of constraints of the form A < B,
    which denotes that action A has to be performed before action B.
    To summarize the working of a partial order planner,
    1. An open precondition is selected (a sub-goal that we want to achieve).
    2. An action that fulfils the open precondition is chosen.
    3. Temporal constraints are updated.
    4. Existing causal links are protected. Protection is a method that checks if the causal links conflict
       and if they do, temporal constraints are added to fix the threats.
    5. The set of open preconditions is updated.
    6. Temporal constraints of the selected action and the next action are established.
    7. A new causal link is added between the selected action and the owner of the open precondition.
    8. The set of new causal links is checked for threats and if found, the threat is removed by either promotion or
       demotion. If promotion or demotion is unable to solve the problem, the planning problem cannot be solved with
       the current sequence of actions or it may not be solvable at all.
    9. These steps are repeated until the set of open preconditions is empty.
    """
    def __init__(self, planning_problem):
        """Set up an initial plan holding only the Start and Finish actions.

        Start is built from the problem's initial clauses and Finish from its
        goals; the single constraint orders Start before Finish, and every
        precondition of Finish is placed on the agenda.
        """
        self.tries = 1
        self.planning_problem = planning_problem
        self.causal_links = []
        self.start = Action('Start', [], self.planning_problem.initial)
        self.finish = Action('Finish', self.planning_problem.goals, [])
        self.actions = {self.start, self.finish}
        # Constraint pairs are stored as (before, after).
        self.constraints = {(self.start, self.finish)}
        # Agenda entries pair an open precondition with the action that needs it.
        self.agenda = {(precond, self.finish) for precond in self.finish.precond}
        self.expanded_actions = planning_problem.expand_actions()

    def find_open_precondition(self):
        """Find open precondition with the least number of possible actions"""

        number_of_ways = dict()
        actions_for_precondition = dict()
        for element in self.agenda:
            open_precondition = element[0]
            possible_actions = list(self.actions) + self.expanded_actions
            for action in possible_actions:
                for effect in action.effect:
                    if effect == open_precondition:
                        if open_precondition in number_of_ways:
                            number_of_ways[open_precondition] += 1
                            actions_for_precondition[open_precondition].append(action)
                        else:
                            number_of_ways[open_precondition] = 1
                            actions_for_precondition[open_precondition] = [action]

        number = sorted(number_of_ways, key=number_of_ways.__getitem__)
        for k, v in number_of_ways.items():
            if v == 0:
                return None, None, None