{ "cells": [ { "cell_type": "markdown", "metadata": { "collapsed": true }, "source": [ "# Planning\n", "#### Chapters 10-11\n", "----" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This notebook serves as supporting material for topics covered in **Chapter 10 - Classical Planning** and **Chapter 11 - Planning and Acting in the Real World** from the book *[Artificial Intelligence: A Modern Approach](http://aima.cs.berkeley.edu)*. \n", "This notebook uses implementations from the [planning.py](https://github.com/aimacode/aima-python/blob/master/planning.py) module. \n", "See the [intro notebook](https://github.com/aimacode/aima-python/blob/master/intro.ipynb) for instructions.\n", "\n", "We'll start by looking at `PDDL` and `Action` data types for defining problems and actions. \n", "Then, we will see how to use them by trying to plan a trip from *Sibiu* to *Bucharest* across the familiar map of Romania, from [search.ipynb](https://github.com/aimacode/aima-python/blob/master/search.ipynb) \n", "followed by some common planning problems and methods of solving them.\n", "\n", "Let's start by importing everything from the planning module." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "collapsed": true }, "outputs": [], "source": [ "from planning import *\n", "from notebook import psource" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## CONTENTS\n", "\n", "- PDDL\n", "- Action\n", "- Planning Problems\n", " * Air cargo problem\n", " * Spare tire problem\n", " * Three block tower problem\n", " * Shopping Problem\n", " * Cake problem\n", "- Solving Planning Problems\n", " * GraphPlan" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## PDDL\n", "\n", "PDDL stands for Planning Domain Definition Language.\n", "The `PDDL` class is used to represent planning problems in this module. 
The following attributes are essential to be able to define a problem:\n", "* an initial state\n", "* a set of goals\n", "* a set of viable actions that can be executed in the search space of the problem\n", "\n", "View the source to see how the Python code tries to realise these." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "
\n", "class PDDL:\n",
"    """\n",
"    Planning Domain Definition Language (PDDL) used to define a search problem.\n",
"    It stores states in a knowledge base consisting of first order logic statements.\n",
"    The conjunction of these logical statements completely defines a state.\n",
"    """\n",
"\n",
"    def __init__(self, init, goals, actions):\n",
"        self.init = self.convert(init)\n",
"        self.goals = expr(goals)\n",
"        self.actions = actions\n",
"\n",
"    def convert(self, init):\n",
"        """Converts strings into exprs"""\n",
"        try:\n",
"            init = conjuncts(expr(init))\n",
"        except AttributeError:\n",
"            init = expr(init)\n",
"        return init\n",
"\n",
"    def goal_test(self):\n",
"        """Checks if the goals have been reached"""\n",
"        return all(goal in self.init for goal in conjuncts(self.goals))\n",
"\n",
"    def act(self, action):\n",
"        """\n",
"        Performs the action given as argument.\n",
"        Note that action is an Expr like expr('Remove(Glass, Table)') or expr('Eat(Sandwich)')\n",
"        """\n",
"        action_name = action.op\n",
"        args = action.args\n",
"        list_action = first(a for a in self.actions if a.name == action_name)\n",
"        if list_action is None:\n",
"            raise Exception("Action '{}' not found".format(action_name))\n",
"        if not list_action.check_precond(self.init, args):\n",
"            raise Exception("Action '{}' pre-conditions not satisfied".format(action))\n",
"        self.init = list_action(self.init, args).clauses\n",
"
class Action:\n",
"    """\n",
"    Defines an action schema using preconditions and effects.\n",
"    Use this to describe actions in PDDL.\n",
"    action is a string or Expr whose arguments (args) are the variables.\n",
"    Precondition and effect are both strings: conjunctions of positive and negative literals.\n",
"    Negative preconditions and effects are written with a '~' before the clause,\n",
"    which convert() stores as a 'Not' prefix on the clause name.\n",
"    Example:\n",
"    precond = 'Human(person) & Hungry(person) & ~Eaten(food)'\n",
"    effect = 'Eaten(food) & ~Hungry(person)'\n",
"    eat = Action('Eat(person, food)', precond, effect)\n",
"    """\n",
"\n",
"    def __init__(self, action, precond, effect):\n",
"        action = expr(action)\n",
"        self.name = action.op\n",
"        self.args = action.args\n",
"        self.precond, self.effect = self.convert(precond, effect)\n",
"\n",
"    def __call__(self, kb, args):\n",
"        return self.act(kb, args)\n",
"\n",
"    def convert(self, precond, effect):\n",
"        """Converts strings into Exprs"""\n",
"\n",
"        precond = precond.replace('~', 'Not')\n",
"        if len(precond) > 0:\n",
"            precond = expr(precond)\n",
"        effect = effect.replace('~', 'Not')\n",
"        if len(effect) > 0:\n",
"            effect = expr(effect)\n",
"\n",
"        try:\n",
"            precond = conjuncts(precond)\n",
"        except AttributeError:\n",
"            pass\n",
"        try:\n",
"            effect = conjuncts(effect)\n",
"        except AttributeError:\n",
"            pass\n",
"\n",
"        return precond, effect\n",
"\n",
"    def substitute(self, e, args):\n",
"        """Replaces variables in expression with their respective Propositional symbol"""\n",
"\n",
"        new_args = list(e.args)\n",
"        for num, x in enumerate(e.args):\n",
"            for i, _ in enumerate(self.args):\n",
"                if self.args[i] == x:\n",
"                    new_args[num] = args[i]\n",
"        return Expr(e.op, *new_args)\n",
"\n",
"    def check_precond(self, kb, args):\n",
"        """Checks if the precondition is satisfied in the current state"""\n",
"\n",
"        if isinstance(kb, list):\n",
"            kb = FolKB(kb)\n",
"\n",
"        for clause in self.precond:\n",
"            if self.substitute(clause, args) not in kb.clauses:\n",
"                return False\n",
"        return True\n",
"\n",
"    def act(self, kb, args):\n",
"        """Executes the action on the state's knowledge base"""\n",
"\n",
"        if isinstance(kb, list):\n",
"            kb = FolKB(kb)\n",
"\n",
"        if not self.check_precond(kb, args):\n",
"            raise Exception('Action pre-conditions not satisfied')\n",
"        for clause in self.effect:\n",
"            kb.tell(self.substitute(clause, args))\n",
"            if clause.op[:3] == 'Not':\n",
"                new_clause = Expr(clause.op[3:], *clause.args)\n",
"\n",
"                if kb.ask(self.substitute(new_clause, args)) is not False:\n",
"                    kb.retract(self.substitute(new_clause, args))\n",
"            else:\n",
"                new_clause = Expr('Not' + clause.op, *clause.args)\n",
"\n",
"                if kb.ask(self.substitute(new_clause, args)) is not False:\n",
"                    kb.retract(self.substitute(new_clause, args))\n",
"\n",
"        return kb\n",
"
def air_cargo():\n",
"    """Air cargo problem"""\n",
"\n",
"    return PDDL(init='At(C1, SFO) & At(C2, JFK) & At(P1, SFO) & At(P2, JFK) & Cargo(C1) & Cargo(C2) & Plane(P1) & Plane(P2) & Airport(SFO) & Airport(JFK)',\n",
"                goals='At(C1, JFK) & At(C2, SFO)',\n",
"                actions=[Action('Load(c, p, a)',\n",
"                                precond='At(c, a) & At(p, a) & Cargo(c) & Plane(p) & Airport(a)',\n",
"                                effect='In(c, p) & ~At(c, a)'),\n",
"                         Action('Unload(c, p, a)',\n",
"                                precond='In(c, p) & At(p, a) & Cargo(c) & Plane(p) & Airport(a)',\n",
"                                effect='At(c, a) & ~In(c, p)'),\n",
"                         Action('Fly(p, f, to)',\n",
"                                precond='At(p, f) & Plane(p) & Airport(f) & Airport(to)',\n",
"                                effect='At(p, to) & ~At(p, f)')])\n",
"
def spare_tire():\n",
"    """Spare tire problem"""\n",
"\n",
"    return PDDL(init='Tire(Flat) & Tire(Spare) & At(Flat, Axle) & At(Spare, Trunk)',\n",
"                goals='At(Spare, Axle) & At(Flat, Ground)',\n",
"                actions=[Action('Remove(obj, loc)',\n",
"                                precond='At(obj, loc)',\n",
"                                effect='At(obj, Ground) & ~At(obj, loc)'),\n",
"                         Action('PutOn(t, Axle)',\n",
"                                precond='Tire(t) & At(t, Ground) & ~At(Flat, Axle)',\n",
"                                effect='At(t, Axle) & ~At(t, Ground)'),\n",
"                         Action('LeaveOvernight',\n",
"                                precond='',\n",
"                                effect='~At(Spare, Ground) & ~At(Spare, Axle) & ~At(Spare, Trunk) & \\\n",
"                                        ~At(Flat, Ground) & ~At(Flat, Axle) & ~At(Flat, Trunk)')])\n",
"
def three_block_tower():\n",
"    """Sussman Anomaly problem"""\n",
"\n",
"    return PDDL(init='On(A, Table) & On(B, Table) & On(C, A) & Block(A) & Block(B) & Block(C) & Clear(B) & Clear(C)',\n",
"                goals='On(A, B) & On(B, C)',\n",
"                actions=[Action('Move(b, x, y)',\n",
"                                precond='On(b, x) & Clear(b) & Clear(y) & Block(b) & Block(y)',\n",
"                                effect='On(b, y) & Clear(x) & ~On(b, x) & ~Clear(y)'),\n",
"                         Action('MoveToTable(b, x)',\n",
"                                precond='On(b, x) & Clear(b) & Block(b)',\n",
"                                effect='On(b, Table) & Clear(x) & ~On(b, x)')])\n",
"
def shopping_problem():\n",
"    """Shopping problem"""\n",
"\n",
"    return PDDL(init='At(Home) & Sells(SM, Milk) & Sells(SM, Banana) & Sells(HW, Drill)',\n",
"                goals='Have(Milk) & Have(Banana) & Have(Drill)',\n",
"                actions=[Action('Buy(x, store)',\n",
"                                precond='At(store) & Sells(store, x)',\n",
"                                effect='Have(x)'),\n",
"                         Action('Go(x, y)',\n",
"                                precond='At(x)',\n",
"                                effect='At(y) & ~At(x)')])\n",
"
def have_cake_and_eat_cake_too():\n",
"    """Cake problem"""\n",
"\n",
"    return PDDL(init='Have(Cake)',\n",
"                goals='Have(Cake) & Eaten(Cake)',\n",
"                actions=[Action('Eat(Cake)',\n",
"                                precond='Have(Cake)',\n",
"                                effect='Eaten(Cake) & ~Have(Cake)'),\n",
"                         Action('Bake(Cake)',\n",
"                                precond='~Have(Cake)',\n",
"                                effect='Have(Cake)')])\n",
"
class Level:\n",
"    """\n",
"    Contains the state of the planning problem\n",
"    and exhaustive list of actions which use the\n",
"    states as pre-condition.\n",
"    """\n",
"\n",
"    def __init__(self, kb):\n",
"        """Initializes variables to hold state and action details of a level"""\n",
"\n",
"        self.kb = kb\n",
"        # current state\n",
"        self.current_state = kb.clauses\n",
"        # current action to state link\n",
"        self.current_action_links = {}\n",
"        # current state to action link\n",
"        self.current_state_links = {}\n",
"        # current action to next state link\n",
"        self.next_action_links = {}\n",
"        # next state to current action link\n",
"        self.next_state_links = {}\n",
"        # mutually exclusive actions\n",
"        self.mutex = []\n",
"\n",
"    def __call__(self, actions, objects):\n",
"        self.build(actions, objects)\n",
"        self.find_mutex()\n",
"\n",
"    def separate(self, e):\n",
"        """Separates an iterable of elements into positive and negative parts"""\n",
"\n",
"        positive = []\n",
"        negative = []\n",
"        for clause in e:\n",
"            if clause.op[:3] == 'Not':\n",
"                negative.append(clause)\n",
"            else:\n",
"                positive.append(clause)\n",
"        return positive, negative\n",
"\n",
"    def find_mutex(self):\n",
"        """Finds mutually exclusive actions"""\n",
"\n",
"        # Inconsistent effects\n",
"        pos_nsl, neg_nsl = self.separate(self.next_state_links)\n",
"\n",
"        for negeff in neg_nsl:\n",
"            new_negeff = Expr(negeff.op[3:], *negeff.args)\n",
"            for poseff in pos_nsl:\n",
"                if new_negeff == poseff:\n",
"                    for a in self.next_state_links[poseff]:\n",
"                        for b in self.next_state_links[negeff]:\n",
"                            if {a, b} not in self.mutex:\n",
"                                self.mutex.append({a, b})\n",
"\n",
"        # Interference will be calculated with the last step\n",
"        pos_csl, neg_csl = self.separate(self.current_state_links)\n",
"\n",
"        # Competing needs\n",
"        for posprecond in pos_csl:\n",
"            for negprecond in neg_csl:\n",
"                new_negprecond = Expr(negprecond.op[3:], *negprecond.args)\n",
"                if new_negprecond == posprecond:\n",
"                    for a in self.current_state_links[posprecond]:\n",
"                        for b in self.current_state_links[negprecond]:\n",
"                            if {a, b} not in self.mutex:\n",
"                                self.mutex.append({a, b})\n",
"\n",
"        # Inconsistent support\n",
"        state_mutex = []\n",
"        for pair in self.mutex:\n",
"            next_state_0 = self.next_action_links[list(pair)[0]]\n",
"            if len(pair) == 2:\n",
"                next_state_1 = self.next_action_links[list(pair)[1]]\n",
"            else:\n",
"                next_state_1 = self.next_action_links[list(pair)[0]]\n",
"            if (len(next_state_0) == 1) and (len(next_state_1) == 1):\n",
"                state_mutex.append({next_state_0[0], next_state_1[0]})\n",
"\n",
"        self.mutex = self.mutex + state_mutex\n",
"\n",
"    def build(self, actions, objects):\n",
"        """Populates the lists and dictionaries containing the state action dependencies"""\n",
"\n",
"        for clause in self.current_state:\n",
"            p_expr = Expr('P' + clause.op, *clause.args)\n",
"            self.current_action_links[p_expr] = [clause]\n",
"            self.next_action_links[p_expr] = [clause]\n",
"            self.current_state_links[clause] = [p_expr]\n",
"            self.next_state_links[clause] = [p_expr]\n",
"\n",
"        for a in actions:\n",
"            num_args = len(a.args)\n",
"            possible_args = tuple(itertools.permutations(objects, num_args))\n",
"\n",
"            for arg in possible_args:\n",
"                if a.check_precond(self.kb, arg):\n",
"                    for num, symbol in enumerate(a.args):\n",
"                        if not symbol.op.islower():\n",
"                            arg = list(arg)\n",
"                            arg[num] = symbol\n",
"                            arg = tuple(arg)\n",
"\n",
"                    new_action = a.substitute(Expr(a.name, *a.args), arg)\n",
"                    self.current_action_links[new_action] = []\n",
"\n",
"                    for clause in a.precond:\n",
"                        new_clause = a.substitute(clause, arg)\n",
"                        self.current_action_links[new_action].append(new_clause)\n",
"                        if new_clause in self.current_state_links:\n",
"                            self.current_state_links[new_clause].append(new_action)\n",
"                        else:\n",
"                            self.current_state_links[new_clause] = [new_action]\n",
"\n",
"                    self.next_action_links[new_action] = []\n",
"                    for clause in a.effect:\n",
"                        new_clause = a.substitute(clause, arg)\n",
"\n",
"                        self.next_action_links[new_action].append(new_clause)\n",
"                        if new_clause in self.next_state_links:\n",
"                            self.next_state_links[new_clause].append(new_action)\n",
"                        else:\n",
"                            self.next_state_links[new_clause] = [new_action]\n",
"\n",
"    def perform_actions(self):\n",
"        """Performs the necessary actions and returns a new Level"""\n",
"\n",
"        new_kb = FolKB(list(set(self.next_state_links.keys())))\n",
"        return Level(new_kb)\n",
"
class Graph:\n",
"    """\n",
"    Contains levels of state and actions\n",
"    Used in graph planning algorithm to extract a solution\n",
"    """\n",
"\n",
"    def __init__(self, pddl):\n",
"        self.pddl = pddl\n",
"        self.kb = FolKB(pddl.init)\n",
"        self.levels = [Level(self.kb)]\n",
"        self.objects = set(arg for clause in self.kb.clauses for arg in clause.args)\n",
"\n",
"    def __call__(self):\n",
"        self.expand_graph()\n",
"\n",
"    def expand_graph(self):\n",
"        """Expands the graph by a level"""\n",
"\n",
"        last_level = self.levels[-1]\n",
"        last_level(self.pddl.actions, self.objects)\n",
"        self.levels.append(last_level.perform_actions())\n",
"\n",
"    def non_mutex_goals(self, goals, index):\n",
"        """Returns True if no pair of goals is mutually exclusive at the given level"""\n",
"\n",
"        goal_perm = itertools.combinations(goals, 2)\n",
"        for g in goal_perm:\n",
"            if set(g) in self.levels[index].mutex:\n",
"                return False\n",
"        return True\n",
"
class GraphPlan:\n",
"    """\n",
"    Class for formulating the GraphPlan algorithm\n",
"    Constructs a graph of state and action space\n",
"    Returns solution for the planning problem\n",
"    """\n",
"\n",
"    def __init__(self, pddl):\n",
"        self.graph = Graph(pddl)\n",
"        self.nogoods = []\n",
"        self.solution = []\n",
"\n",
"    def check_leveloff(self):\n",
"        """Checks if the graph has levelled off"""\n",
"\n",
"        check = (set(self.graph.levels[-1].current_state) == set(self.graph.levels[-2].current_state))\n",
"\n",
"        if check:\n",
"            return True\n",
"\n",
"    def extract_solution(self, goals, index):\n",
"        """Extracts the solution"""\n",
"\n",
"        level = self.graph.levels[index]\n",
"        if not self.graph.non_mutex_goals(goals, index):\n",
"            self.nogoods.append((level, goals))\n",
"            return\n",
"\n",
"        level = self.graph.levels[index - 1]\n",
"\n",
"        # Create all combinations of actions that satisfy the goal\n",
"        actions = []\n",
"        for goal in goals:\n",
"            actions.append(level.next_state_links[goal])\n",
"\n",
"        all_actions = list(itertools.product(*actions))\n",
"\n",
"        # Keep only the action sets that contain no mutex pair\n",
"        non_mutex_actions = []\n",
"        for action_tuple in all_actions:\n",
"            action_pairs = itertools.combinations(list(set(action_tuple)), 2)\n",
"            non_mutex_actions.append(list(set(action_tuple)))\n",
"            for pair in action_pairs:\n",
"                if set(pair) in level.mutex:\n",
"                    non_mutex_actions.pop(-1)\n",
"                    break\n",
"\n",
"\n",
"        # Recursion\n",
"        for action_list in non_mutex_actions:\n",
"            if [action_list, index] not in self.solution:\n",
"                self.solution.append([action_list, index])\n",
"\n",
"                new_goals = []\n",
"                for act in set(action_list):\n",
"                    if act in level.current_action_links:\n",
"                        new_goals = new_goals + level.current_action_links[act]\n",
"\n",
"                if abs(index) + 1 == len(self.graph.levels):\n",
"                    return\n",
"                elif (level, new_goals) in self.nogoods:\n",
"                    return\n",
"                else:\n",
"                    self.extract_solution(new_goals, index - 1)\n",
"\n",
"        # Level-Order multiple solutions\n",
"        solution = []\n",
"        for item in self.solution:\n",
"            if item[1] == -1:\n",
"                solution.append([])\n",
"                solution[-1].append(item[0])\n",
"            else:\n",
"                solution[-1].append(item[0])\n",
"\n",
"        for num, item in enumerate(solution):\n",
"            item.reverse()\n",
"            solution[num] = item\n",
"\n",
"        return solution\n",
"
def air_cargo_graphplan():\n",
"    """Solves the air cargo problem using GraphPlan"""\n",
"\n",
"    pddl = air_cargo()\n",
"    graphplan = GraphPlan(pddl)\n",
"\n",
"    def goal_test(kb, goals):\n",
"        return all(kb.ask(q) is not False for q in goals)\n",
"\n",
"    goals = expr('At(C1, JFK), At(C2, SFO)')\n",
"\n",
"    while True:\n",
"        if (goal_test(graphplan.graph.levels[-1].kb, goals) and graphplan.graph.non_mutex_goals(goals, -1)):\n",
"            solution = graphplan.extract_solution(goals, -1)\n",
"            if solution:\n",
"                return solution\n",
"\n",
"        graphplan.graph.expand_graph()\n",
"        if len(graphplan.graph.levels) >= 2 and graphplan.check_leveloff():\n",
"            return None\n",
"