{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"collapsed": true
},
"source": [
"# Planning\n",
"#### Chapters 10-11\n",
"----"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This notebook serves as supporting material for topics covered in **Chapter 10 - Classical Planning** and **Chapter 11 - Planning and Acting in the Real World** from the book *[Artificial Intelligence: A Modern Approach](http://aima.cs.berkeley.edu)*. \n",
"This notebook uses implementations from the [planning.py](https://github.com/aimacode/aima-python/blob/master/planning.py) module. \n",
"See the [intro notebook](https://github.com/aimacode/aima-python/blob/master/intro.ipynb) for instructions.\n",
"\n",
"We'll start by looking at `PlanningProblem` and `Action` data types for defining problems and actions. \n",
"Then, we will see how to use them by trying to plan a trip from *Sibiu* to *Bucharest* across the familiar map of Romania, from [search.ipynb](https://github.com/aimacode/aima-python/blob/master/search.ipynb) \n",
"followed by some common planning problems and methods of solving them.\n",
"\n",
"Let's start by importing everything from the planning module."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from planning import *\n",
"from notebook import psource"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## CONTENTS\n",
"\n",
"**Classical Planning**\n",
"- PlanningProblem\n",
"- Action\n",
"- Planning Problems\n",
" * Air cargo problem\n",
" * Spare tire problem\n",
" * Three block tower problem\n",
" * Shopping Problem\n",
" * Socks and shoes problem\n",
" * Cake problem\n",
"- Solving Planning Problems\n",
" * GraphPlan\n",
" * Linearize\n",
" * PartialOrderPlanner\n",
"
\n",
"\n",
"**Planning in the real world**\n",
"- Problem\n",
"- HLA\n",
"- Planning Problems\n",
" * Job shop problem\n",
" * Double tennis problem\n",
"- Solving Planning Problems\n",
" * Hierarchical Search\n",
" * Angelic Search"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## PlanningProblem\n",
"\n",
"PDDL stands for Planning Domain Definition Language.\n",
"The `PlanningProblem` class is used to represent planning problems in this module. The following attributes are essential to be able to define a problem:\n",
"* an initial state\n",
"* a set of goals\n",
"* a set of viable actions that can be executed in the search space of the problem\n",
"\n",
"View the source to see how the Python code tries to realise these."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"\n",
"
class PlanningProblem:\n",
" """\n",
" Planning Domain Definition Language (PlanningProblem) used to define a search problem.\n",
" It stores states in a knowledge base consisting of first order logic statements.\n",
" The conjunction of these logical statements completely defines a state.\n",
" """\n",
"\n",
" def __init__(self, init, goals, actions):\n",
" self.init = self.convert(init)\n",
" self.goals = self.convert(goals)\n",
" self.actions = actions\n",
"\n",
" def convert(self, clauses):\n",
" """Converts strings into exprs"""\n",
" if not isinstance(clauses, Expr):\n",
" if len(clauses) > 0:\n",
" clauses = expr(clauses)\n",
" else:\n",
" clauses = []\n",
" try:\n",
" clauses = conjuncts(clauses)\n",
" except AttributeError:\n",
" clauses = clauses\n",
"\n",
" new_clauses = []\n",
" for clause in clauses:\n",
" if clause.op == '~':\n",
" new_clauses.append(expr('Not' + str(clause.args[0])))\n",
" else:\n",
" new_clauses.append(clause)\n",
" return new_clauses\n",
"\n",
" def goal_test(self):\n",
" """Checks if the goals have been reached"""\n",
" return all(goal in self.init for goal in self.goals)\n",
"\n",
" def act(self, action):\n",
" """\n",
" Performs the action given as argument.\n",
" Note that action is an Expr like expr('Remove(Glass, Table)') or expr('Eat(Sandwich)')\n",
" """ \n",
" action_name = action.op\n",
" args = action.args\n",
" list_action = first(a for a in self.actions if a.name == action_name)\n",
" if list_action is None:\n",
" raise Exception("Action '{}' not found".format(action_name))\n",
" if not list_action.check_precond(self.init, args):\n",
" raise Exception("Action '{}' pre-conditions not satisfied".format(action))\n",
" self.init = list_action(self.init, args).clauses\n",
"
class Action:\n",
" """\n",
" Defines an action schema using preconditions and effects.\n",
" Use this to describe actions in PlanningProblem.\n",
" action is an Expr where variables are given as arguments(args).\n",
" Precondition and effect are both lists with positive and negative literals.\n",
" Negative preconditions and effects are defined by adding a 'Not' before the name of the clause\n",
" Example:\n",
" precond = [expr("Human(person)"), expr("Hungry(Person)"), expr("NotEaten(food)")]\n",
" effect = [expr("Eaten(food)"), expr("Hungry(person)")]\n",
" eat = Action(expr("Eat(person, food)"), precond, effect)\n",
" """\n",
"\n",
" def __init__(self, action, precond, effect):\n",
" if isinstance(action, str):\n",
" action = expr(action)\n",
" self.name = action.op\n",
" self.args = action.args\n",
" self.precond = self.convert(precond)\n",
" self.effect = self.convert(effect)\n",
"\n",
" def __call__(self, kb, args):\n",
" return self.act(kb, args)\n",
"\n",
" def __repr__(self):\n",
" return '{}({})'.format(self.__class__.__name__, Expr(self.name, *self.args))\n",
"\n",
" def convert(self, clauses):\n",
" """Converts strings into Exprs"""\n",
" if isinstance(clauses, Expr):\n",
" clauses = conjuncts(clauses)\n",
" for i in range(len(clauses)):\n",
" if clauses[i].op == '~':\n",
" clauses[i] = expr('Not' + str(clauses[i].args[0]))\n",
"\n",
" elif isinstance(clauses, str):\n",
" clauses = clauses.replace('~', 'Not')\n",
" if len(clauses) > 0:\n",
" clauses = expr(clauses)\n",
"\n",
" try:\n",
" clauses = conjuncts(clauses)\n",
" except AttributeError:\n",
" pass\n",
"\n",
" return clauses\n",
"\n",
" def substitute(self, e, args):\n",
" """Replaces variables in expression with their respective Propositional symbol"""\n",
"\n",
" new_args = list(e.args)\n",
" for num, x in enumerate(e.args):\n",
" for i, _ in enumerate(self.args):\n",
" if self.args[i] == x:\n",
" new_args[num] = args[i]\n",
" return Expr(e.op, *new_args)\n",
"\n",
" def check_precond(self, kb, args):\n",
" """Checks if the precondition is satisfied in the current state"""\n",
"\n",
" if isinstance(kb, list):\n",
" kb = FolKB(kb)\n",
" for clause in self.precond:\n",
" if self.substitute(clause, args) not in kb.clauses:\n",
" return False\n",
" return True\n",
"\n",
" def act(self, kb, args):\n",
" """Executes the action on the state's knowledge base"""\n",
"\n",
" if isinstance(kb, list):\n",
" kb = FolKB(kb)\n",
"\n",
" if not self.check_precond(kb, args):\n",
" raise Exception('Action pre-conditions not satisfied')\n",
" for clause in self.effect:\n",
" kb.tell(self.substitute(clause, args))\n",
" if clause.op[:3] == 'Not':\n",
" new_clause = Expr(clause.op[3:], *clause.args)\n",
"\n",
" if kb.ask(self.substitute(new_clause, args)) is not False:\n",
" kb.retract(self.substitute(new_clause, args))\n",
" else:\n",
" new_clause = Expr('Not' + clause.op, *clause.args)\n",
"\n",
" if kb.ask(self.substitute(new_clause, args)) is not False: \n",
" kb.retract(self.substitute(new_clause, args))\n",
"\n",
" return kb\n",
"
def air_cargo():\n",
" """\n",
" [Figure 10.1] AIR-CARGO-PROBLEM\n",
"\n",
" An air-cargo shipment problem for delivering cargo to different locations,\n",
" given the starting location and airplanes.\n",
"\n",
" Example:\n",
" >>> from planning import *\n",
" >>> ac = air_cargo()\n",
" >>> ac.goal_test()\n",
" False\n",
" >>> ac.act(expr('Load(C2, P2, JFK)'))\n",
" >>> ac.act(expr('Load(C1, P1, SFO)'))\n",
" >>> ac.act(expr('Fly(P1, SFO, JFK)'))\n",
" >>> ac.act(expr('Fly(P2, JFK, SFO)'))\n",
" >>> ac.act(expr('Unload(C2, P2, SFO)'))\n",
" >>> ac.goal_test()\n",
" False\n",
" >>> ac.act(expr('Unload(C1, P1, JFK)'))\n",
" >>> ac.goal_test()\n",
" True\n",
" >>>\n",
" """\n",
"\n",
" return PlanningProblem(init='At(C1, SFO) & At(C2, JFK) & At(P1, SFO) & At(P2, JFK) & Cargo(C1) & Cargo(C2) & Plane(P1) & Plane(P2) & Airport(SFO) & Airport(JFK)', \n",
" goals='At(C1, JFK) & At(C2, SFO)',\n",
" actions=[Action('Load(c, p, a)', \n",
" precond='At(c, a) & At(p, a) & Cargo(c) & Plane(p) & Airport(a)',\n",
" effect='In(c, p) & ~At(c, a)'),\n",
" Action('Unload(c, p, a)',\n",
" precond='In(c, p) & At(p, a) & Cargo(c) & Plane(p) & Airport(a)',\n",
" effect='At(c, a) & ~In(c, p)'),\n",
" Action('Fly(p, f, to)',\n",
" precond='At(p, f) & Plane(p) & Airport(f) & Airport(to)',\n",
" effect='At(p, to) & ~At(p, f)')])\n",
"
def spare_tire():\n",
" """[Figure 10.2] SPARE-TIRE-PROBLEM\n",
"\n",
" A problem involving changing the flat tire of a car\n",
" with a spare tire from the trunk.\n",
"\n",
" Example:\n",
" >>> from planning import *\n",
" >>> st = spare_tire()\n",
" >>> st.goal_test()\n",
" False\n",
" >>> st.act(expr('Remove(Spare, Trunk)'))\n",
" >>> st.act(expr('Remove(Flat, Axle)'))\n",
" >>> st.goal_test()\n",
" False\n",
" >>> st.act(expr('PutOn(Spare, Axle)'))\n",
" >>> st.goal_test()\n",
" True\n",
" >>>\n",
" """\n",
"\n",
" return PlanningProblem(init='Tire(Flat) & Tire(Spare) & At(Flat, Axle) & At(Spare, Trunk)',\n",
" goals='At(Spare, Axle) & At(Flat, Ground)',\n",
" actions=[Action('Remove(obj, loc)',\n",
" precond='At(obj, loc)',\n",
" effect='At(obj, Ground) & ~At(obj, loc)'),\n",
" Action('PutOn(t, Axle)',\n",
" precond='Tire(t) & At(t, Ground) & ~At(Flat, Axle)',\n",
" effect='At(t, Axle) & ~At(t, Ground)'),\n",
" Action('LeaveOvernight',\n",
" precond='',\n",
" effect='~At(Spare, Ground) & ~At(Spare, Axle) & ~At(Spare, Trunk) & \\\n",
" ~At(Flat, Ground) & ~At(Flat, Axle) & ~At(Flat, Trunk)')])\n",
"
def three_block_tower():\n",
" """\n",
" [Figure 10.3] THREE-BLOCK-TOWER\n",
"\n",
" A blocks-world problem of stacking three blocks in a certain configuration,\n",
" also known as the Sussman Anomaly.\n",
"\n",
" Example:\n",
" >>> from planning import *\n",
" >>> tbt = three_block_tower()\n",
" >>> tbt.goal_test()\n",
" False\n",
" >>> tbt.act(expr('MoveToTable(C, A)'))\n",
" >>> tbt.act(expr('Move(B, Table, C)'))\n",
" >>> tbt.goal_test()\n",
" False\n",
" >>> tbt.act(expr('Move(A, Table, B)'))\n",
" >>> tbt.goal_test()\n",
" True\n",
" >>>\n",
" """\n",
"\n",
" return PlanningProblem(init='On(A, Table) & On(B, Table) & On(C, A) & Block(A) & Block(B) & Block(C) & Clear(B) & Clear(C)',\n",
" goals='On(A, B) & On(B, C)',\n",
" actions=[Action('Move(b, x, y)',\n",
" precond='On(b, x) & Clear(b) & Clear(y) & Block(b) & Block(y)',\n",
" effect='On(b, y) & Clear(x) & ~On(b, x) & ~Clear(y)'),\n",
" Action('MoveToTable(b, x)',\n",
" precond='On(b, x) & Clear(b) & Block(b)',\n",
" effect='On(b, Table) & Clear(x) & ~On(b, x)')])\n",
"
def simple_blocks_world():\n",
" """\n",
" SIMPLE-BLOCKS-WORLD\n",
"\n",
" A simplified definition of the Sussman Anomaly problem.\n",
"\n",
" Example:\n",
" >>> from planning import *\n",
" >>> sbw = simple_blocks_world()\n",
" >>> sbw.goal_test()\n",
" False\n",
" >>> sbw.act(expr('ToTable(A, B)'))\n",
" >>> sbw.act(expr('FromTable(B, A)'))\n",
" >>> sbw.goal_test()\n",
" False\n",
" >>> sbw.act(expr('FromTable(C, B)'))\n",
" >>> sbw.goal_test()\n",
" True\n",
" >>>\n",
" """\n",
"\n",
" return PlanningProblem(init='On(A, B) & Clear(A) & OnTable(B) & OnTable(C) & Clear(C)',\n",
" goals='On(B, A) & On(C, B)',\n",
" actions=[Action('ToTable(x, y)',\n",
" precond='On(x, y) & Clear(x)',\n",
" effect='~On(x, y) & Clear(y) & OnTable(x)'),\n",
" Action('FromTable(y, x)',\n",
" precond='OnTable(y) & Clear(y) & Clear(x)',\n",
" effect='~OnTable(y) & ~Clear(x) & On(y, x)')])\n",
"
def shopping_problem():\n",
" """\n",
" SHOPPING-PROBLEM\n",
"\n",
" A problem of acquiring some items given their availability at certain stores.\n",
"\n",
" Example:\n",
" >>> from planning import *\n",
" >>> sp = shopping_problem()\n",
" >>> sp.goal_test()\n",
" False\n",
" >>> sp.act(expr('Go(Home, HW)'))\n",
" >>> sp.act(expr('Buy(Drill, HW)'))\n",
" >>> sp.act(expr('Go(HW, SM)'))\n",
" >>> sp.act(expr('Buy(Banana, SM)'))\n",
" >>> sp.goal_test()\n",
" False\n",
" >>> sp.act(expr('Buy(Milk, SM)'))\n",
" >>> sp.goal_test()\n",
" True\n",
" >>>\n",
" """\n",
"\n",
" return PlanningProblem(init='At(Home) & Sells(SM, Milk) & Sells(SM, Banana) & Sells(HW, Drill)',\n",
" goals='Have(Milk) & Have(Banana) & Have(Drill)', \n",
" actions=[Action('Buy(x, store)',\n",
" precond='At(store) & Sells(store, x)',\n",
" effect='Have(x)'),\n",
" Action('Go(x, y)',\n",
" precond='At(x)',\n",
" effect='At(y) & ~At(x)')])\n",
"
def socks_and_shoes():\n",
" """\n",
" SOCKS-AND-SHOES-PROBLEM\n",
"\n",
" A task of wearing socks and shoes on both feet\n",
"\n",
" Example:\n",
" >>> from planning import *\n",
" >>> ss = socks_and_shoes()\n",
" >>> ss.goal_test()\n",
" False\n",
" >>> ss.act(expr('RightSock'))\n",
" >>> ss.act(expr('RightShoe'))\n",
" >>> ss.act(expr('LeftSock'))\n",
" >>> ss.goal_test()\n",
" False\n",
" >>> ss.act(expr('LeftShoe'))\n",
" >>> ss.goal_test()\n",
" True\n",
" >>>\n",
" """\n",
"\n",
" return PlanningProblem(init='',\n",
" goals='RightShoeOn & LeftShoeOn',\n",
" actions=[Action('RightShoe',\n",
" precond='RightSockOn',\n",
" effect='RightShoeOn'),\n",
" Action('RightSock',\n",
" precond='',\n",
" effect='RightSockOn'),\n",
" Action('LeftShoe',\n",
" precond='LeftSockOn',\n",
" effect='LeftShoeOn'),\n",
" Action('LeftSock',\n",
" precond='',\n",
" effect='LeftSockOn')])\n",
"
def have_cake_and_eat_cake_too():\n",
" """\n",
" [Figure 10.7] CAKE-PROBLEM\n",
"\n",
" A problem where we begin with a cake and want to \n",
" reach the state of having a cake and having eaten a cake.\n",
" The possible actions include baking a cake and eating a cake.\n",
"\n",
" Example:\n",
" >>> from planning import *\n",
" >>> cp = have_cake_and_eat_cake_too()\n",
" >>> cp.goal_test()\n",
" False\n",
" >>> cp.act(expr('Eat(Cake)'))\n",
" >>> cp.goal_test()\n",
" False\n",
" >>> cp.act(expr('Bake(Cake)'))\n",
" >>> cp.goal_test()\n",
" True\n",
" >>>\n",
" """\n",
"\n",
" return PlanningProblem(init='Have(Cake)',\n",
" goals='Have(Cake) & Eaten(Cake)',\n",
" actions=[Action('Eat(Cake)',\n",
" precond='Have(Cake)',\n",
" effect='Eaten(Cake) & ~Have(Cake)'),\n",
" Action('Bake(Cake)',\n",
" precond='~Have(Cake)',\n",
" effect='Have(Cake)')])\n",
"
class Level:\n",
" """\n",
" Contains the state of the planning problem\n",
" and exhaustive list of actions which use the\n",
" states as pre-condition.\n",
" """\n",
"\n",
" def __init__(self, kb):\n",
" """Initializes variables to hold state and action details of a level"""\n",
"\n",
" self.kb = kb\n",
" # current state\n",
" self.current_state = kb.clauses\n",
" # current action to state link\n",
" self.current_action_links = {}\n",
" # current state to action link\n",
" self.current_state_links = {}\n",
" # current action to next state link\n",
" self.next_action_links = {}\n",
" # next state to current action link\n",
" self.next_state_links = {}\n",
" # mutually exclusive actions\n",
" self.mutex = []\n",
"\n",
" def __call__(self, actions, objects):\n",
" self.build(actions, objects)\n",
" self.find_mutex()\n",
"\n",
" def separate(self, e):\n",
" """Separates an iterable of elements into positive and negative parts"""\n",
"\n",
" positive = []\n",
" negative = []\n",
" for clause in e:\n",
" if clause.op[:3] == 'Not':\n",
" negative.append(clause)\n",
" else:\n",
" positive.append(clause)\n",
" return positive, negative\n",
"\n",
" def find_mutex(self):\n",
" """Finds mutually exclusive actions"""\n",
"\n",
" # Inconsistent effects\n",
" pos_nsl, neg_nsl = self.separate(self.next_state_links)\n",
"\n",
" for negeff in neg_nsl:\n",
" new_negeff = Expr(negeff.op[3:], *negeff.args)\n",
" for poseff in pos_nsl:\n",
" if new_negeff == poseff:\n",
" for a in self.next_state_links[poseff]:\n",
" for b in self.next_state_links[negeff]:\n",
" if {a, b} not in self.mutex:\n",
" self.mutex.append({a, b})\n",
"\n",
" # Interference will be calculated with the last step\n",
" pos_csl, neg_csl = self.separate(self.current_state_links)\n",
"\n",
" # Competing needs\n",
" for posprecond in pos_csl:\n",
" for negprecond in neg_csl:\n",
" new_negprecond = Expr(negprecond.op[3:], *negprecond.args)\n",
" if new_negprecond == posprecond:\n",
" for a in self.current_state_links[posprecond]:\n",
" for b in self.current_state_links[negprecond]:\n",
" if {a, b} not in self.mutex:\n",
" self.mutex.append({a, b})\n",
"\n",
" # Inconsistent support\n",
" state_mutex = []\n",
" for pair in self.mutex:\n",
" next_state_0 = self.next_action_links[list(pair)[0]]\n",
" if len(pair) == 2:\n",
" next_state_1 = self.next_action_links[list(pair)[1]]\n",
" else:\n",
" next_state_1 = self.next_action_links[list(pair)[0]]\n",
" if (len(next_state_0) == 1) and (len(next_state_1) == 1):\n",
" state_mutex.append({next_state_0[0], next_state_1[0]})\n",
" \n",
" self.mutex = self.mutex + state_mutex\n",
"\n",
" def build(self, actions, objects):\n",
" """Populates the lists and dictionaries containing the state action dependencies"""\n",
"\n",
" for clause in self.current_state:\n",
" p_expr = Expr('P' + clause.op, *clause.args)\n",
" self.current_action_links[p_expr] = [clause]\n",
" self.next_action_links[p_expr] = [clause]\n",
" self.current_state_links[clause] = [p_expr]\n",
" self.next_state_links[clause] = [p_expr]\n",
"\n",
" for a in actions:\n",
" num_args = len(a.args)\n",
" possible_args = tuple(itertools.permutations(objects, num_args))\n",
"\n",
" for arg in possible_args:\n",
" if a.check_precond(self.kb, arg):\n",
" for num, symbol in enumerate(a.args):\n",
" if not symbol.op.islower():\n",
" arg = list(arg)\n",
" arg[num] = symbol\n",
" arg = tuple(arg)\n",
"\n",
" new_action = a.substitute(Expr(a.name, *a.args), arg)\n",
" self.current_action_links[new_action] = []\n",
"\n",
" for clause in a.precond:\n",
" new_clause = a.substitute(clause, arg)\n",
" self.current_action_links[new_action].append(new_clause)\n",
" if new_clause in self.current_state_links:\n",
" self.current_state_links[new_clause].append(new_action)\n",
" else:\n",
" self.current_state_links[new_clause] = [new_action]\n",
" \n",
" self.next_action_links[new_action] = []\n",
" for clause in a.effect:\n",
" new_clause = a.substitute(clause, arg)\n",
"\n",
" self.next_action_links[new_action].append(new_clause)\n",
" if new_clause in self.next_state_links:\n",
" self.next_state_links[new_clause].append(new_action)\n",
" else:\n",
" self.next_state_links[new_clause] = [new_action]\n",
"\n",
" def perform_actions(self):\n",
" """Performs the necessary actions and returns a new Level"""\n",
"\n",
" new_kb = FolKB(list(set(self.next_state_links.keys())))\n",
" return Level(new_kb)\n",
"
class Graph:\n",
" """\n",
" Contains levels of state and actions\n",
" Used in graph planning algorithm to extract a solution\n",
" """\n",
"\n",
" def __init__(self, pddl):\n",
" self.pddl = pddl\n",
" self.kb = FolKB(pddl.init)\n",
" self.levels = [Level(self.kb)]\n",
" self.objects = set(arg for clause in self.kb.clauses for arg in clause.args)\n",
"\n",
" def __call__(self):\n",
" self.expand_graph()\n",
"\n",
" def expand_graph(self):\n",
" """Expands the graph by a level"""\n",
"\n",
" last_level = self.levels[-1]\n",
" last_level(self.pddl.actions, self.objects)\n",
" self.levels.append(last_level.perform_actions())\n",
"\n",
" def non_mutex_goals(self, goals, index):\n",
" """Checks whether the goals are mutually exclusive"""\n",
"\n",
" goal_perm = itertools.combinations(goals, 2)\n",
" for g in goal_perm:\n",
" if set(g) in self.levels[index].mutex:\n",
" return False\n",
" return True\n",
"
class GraphPlan:\n",
" """\n",
" Class for formulation GraphPlan algorithm\n",
" Constructs a graph of state and action space\n",
" Returns solution for the planning problem\n",
" """\n",
"\n",
" def __init__(self, pddl):\n",
" self.graph = Graph(pddl)\n",
" self.nogoods = []\n",
" self.solution = []\n",
"\n",
" def check_leveloff(self):\n",
" """Checks if the graph has levelled off"""\n",
"\n",
" check = (set(self.graph.levels[-1].current_state) == set(self.graph.levels[-2].current_state))\n",
"\n",
" if check:\n",
" return True\n",
"\n",
" def extract_solution(self, goals, index):\n",
" """Extracts the solution"""\n",
"\n",
" level = self.graph.levels[index] \n",
" if not self.graph.non_mutex_goals(goals, index):\n",
" self.nogoods.append((level, goals))\n",
" return\n",
"\n",
" level = self.graph.levels[index - 1] \n",
"\n",
" # Create all combinations of actions that satisfy the goal \n",
" actions = []\n",
" for goal in goals:\n",
" actions.append(level.next_state_links[goal]) \n",
"\n",
" all_actions = list(itertools.product(*actions)) \n",
"\n",
" # Filter out non-mutex actions\n",
" non_mutex_actions = [] \n",
" for action_tuple in all_actions:\n",
" action_pairs = itertools.combinations(list(set(action_tuple)), 2) \n",
" non_mutex_actions.append(list(set(action_tuple))) \n",
" for pair in action_pairs: \n",
" if set(pair) in level.mutex:\n",
" non_mutex_actions.pop(-1)\n",
" break\n",
" \n",
"\n",
" # Recursion\n",
" for action_list in non_mutex_actions: \n",
" if [action_list, index] not in self.solution:\n",
" self.solution.append([action_list, index])\n",
"\n",
" new_goals = []\n",
" for act in set(action_list): \n",
" if act in level.current_action_links:\n",
" new_goals = new_goals + level.current_action_links[act]\n",
"\n",
" if abs(index) + 1 == len(self.graph.levels):\n",
" return\n",
" elif (level, new_goals) in self.nogoods:\n",
" return\n",
" else:\n",
" self.extract_solution(new_goals, index - 1)\n",
"\n",
" # Level-Order multiple solutions\n",
" solution = []\n",
" for item in self.solution:\n",
" if item[1] == -1:\n",
" solution.append([])\n",
" solution[-1].append(item[0])\n",
" else:\n",
" solution[-1].append(item[0])\n",
"\n",
" for num, item in enumerate(solution):\n",
" item.reverse()\n",
" solution[num] = item\n",
"\n",
" return solution\n",
"\n",
" def goal_test(self, kb):\n",
" return all(kb.ask(q) is not False for q in self.graph.pddl.goals)\n",
"\n",
" def execute(self):\n",
" """Executes the GraphPlan algorithm for the given problem"""\n",
"\n",
" while True:\n",
" self.graph.expand_graph()\n",
" if (self.goal_test(self.graph.levels[-1].kb) and self.graph.non_mutex_goals(self.graph.pddl.goals, -1)):\n",
" solution = self.extract_solution(self.graph.pddl.goals, -1)\n",
" if solution:\n",
" return solution\n",
" \n",
" if len(self.graph.levels) >= 2 and self.check_leveloff():\n",
" return None\n",
"
def air_cargo_graphplan():\n",
" """Solves the air cargo problem using GraphPlan"""\n",
" return GraphPlan(air_cargo()).execute()\n",
"
class Linearize:\n",
"\n",
" def __init__(self, pddl):\n",
" self.pddl = pddl\n",
"\n",
" def filter(self, solution):\n",
" """Filter out persistence actions from a solution"""\n",
"\n",
" new_solution = []\n",
" for section in solution[0]:\n",
" new_section = []\n",
" for operation in section:\n",
" if not (operation.op[0] == 'P' and operation.op[1].isupper()):\n",
" new_section.append(operation)\n",
" new_solution.append(new_section)\n",
" return new_solution\n",
"\n",
" def orderlevel(self, level, pddl):\n",
" """Return valid linear order of actions for a given level"""\n",
"\n",
" for permutation in itertools.permutations(level):\n",
" temp = copy.deepcopy(pddl)\n",
" count = 0\n",
" for action in permutation:\n",
" try:\n",
" temp.act(action)\n",
" count += 1\n",
" except:\n",
" count = 0\n",
" temp = copy.deepcopy(pddl)\n",
" break\n",
" if count == len(permutation):\n",
" return list(permutation), temp\n",
" return None\n",
"\n",
" def execute(self):\n",
" """Finds total-order solution for a planning graph"""\n",
"\n",
" graphplan_solution = GraphPlan(self.pddl).execute()\n",
" filtered_solution = self.filter(graphplan_solution)\n",
" ordered_solution = []\n",
" pddl = self.pddl\n",
" for level in filtered_solution:\n",
" level_solution, pddl = self.orderlevel(level, pddl)\n",
" for element in level_solution:\n",
" ordered_solution.append(element)\n",
"\n",
" return ordered_solution\n",
"
class PartialOrderPlanner:\n",
"\n",
" def __init__(self, pddl):\n",
" self.pddl = pddl\n",
" self.initialize()\n",
"\n",
" def initialize(self):\n",
" """Initialize all variables"""\n",
" self.causal_links = []\n",
" self.start = Action('Start', [], self.pddl.init)\n",
" self.finish = Action('Finish', self.pddl.goals, [])\n",
" self.actions = set()\n",
" self.actions.add(self.start)\n",
" self.actions.add(self.finish)\n",
" self.constraints = set()\n",
" self.constraints.add((self.start, self.finish))\n",
" self.agenda = set()\n",
" for precond in self.finish.precond:\n",
" self.agenda.add((precond, self.finish))\n",
" self.expanded_actions = self.expand_actions()\n",
"\n",
" def expand_actions(self, name=None):\n",
" """Generate all possible actions with variable bindings for precondition selection heuristic"""\n",
"\n",
" objects = set(arg for clause in self.pddl.init for arg in clause.args)\n",
" expansions = []\n",
" action_list = []\n",
" if name is not None:\n",
" for action in self.pddl.actions:\n",
" if str(action.name) == name:\n",
" action_list.append(action)\n",
" else:\n",
" action_list = self.pddl.actions\n",
"\n",
" for action in action_list:\n",
" for permutation in itertools.permutations(objects, len(action.args)):\n",
" bindings = unify(Expr(action.name, *action.args), Expr(action.name, *permutation))\n",
" if bindings is not None:\n",
" new_args = []\n",
" for arg in action.args:\n",
" if arg in bindings:\n",
" new_args.append(bindings[arg])\n",
" else:\n",
" new_args.append(arg)\n",
" new_expr = Expr(str(action.name), *new_args)\n",
" new_preconds = []\n",
" for precond in action.precond:\n",
" new_precond_args = []\n",
" for arg in precond.args:\n",
" if arg in bindings:\n",
" new_precond_args.append(bindings[arg])\n",
" else:\n",
" new_precond_args.append(arg)\n",
" new_precond = Expr(str(precond.op), *new_precond_args)\n",
" new_preconds.append(new_precond)\n",
" new_effects = []\n",
" for effect in action.effect:\n",
" new_effect_args = []\n",
" for arg in effect.args:\n",
" if arg in bindings:\n",
" new_effect_args.append(bindings[arg])\n",
" else:\n",
" new_effect_args.append(arg)\n",
" new_effect = Expr(str(effect.op), *new_effect_args)\n",
" new_effects.append(new_effect)\n",
" expansions.append(Action(new_expr, new_preconds, new_effects))\n",
"\n",
" return expansions\n",
"\n",
" def find_open_precondition(self):\n",
" """Find open precondition with the least number of possible actions"""\n",
"\n",
" number_of_ways = dict()\n",
" actions_for_precondition = dict()\n",
" for element in self.agenda:\n",
" open_precondition = element[0]\n",
" possible_actions = list(self.actions) + self.expanded_actions\n",
" for action in possible_actions:\n",
" for effect in action.effect:\n",
" if effect == open_precondition:\n",
" if open_precondition in number_of_ways:\n",
" number_of_ways[open_precondition] += 1\n",
" actions_for_precondition[open_precondition].append(action)\n",
" else:\n",
" number_of_ways[open_precondition] = 1\n",
" actions_for_precondition[open_precondition] = [action]\n",
"\n",
" number = sorted(number_of_ways, key=number_of_ways.__getitem__)\n",
" \n",
" for k, v in number_of_ways.items():\n",
" if v == 0:\n",
" return None, None, None\n",
"\n",
" act1 = None\n",
" for element in self.agenda:\n",
" if element[0] == number[0]:\n",
" act1 = element[1]\n",
" break\n",
"\n",
" if number[0] in self.expanded_actions:\n",
" self.expanded_actions.remove(number[0])\n",
"\n",
" return number[0], act1, actions_for_precondition[number[0]]\n",
"\n",
" def find_action_for_precondition(self, oprec):\n",
" """Find action for a given precondition"""\n",
"\n",
" # either\n",
" # choose act0 E Actions such that act0 achieves G\n",
" for action in self.actions:\n",
" for effect in action.effect:\n",
" if effect == oprec:\n",
" return action, 0\n",
"\n",
" # or\n",
" # choose act0 E Actions such that act0 achieves G\n",
" for action in self.pddl.actions:\n",
" for effect in action.effect:\n",
" if effect.op == oprec.op:\n",
" bindings = unify(effect, oprec)\n",
" if bindings is None:\n",
" break\n",
" return action, bindings\n",
"\n",
" def generate_expr(self, clause, bindings):\n",
" """Generate atomic expression from generic expression given variable bindings"""\n",
"\n",
" new_args = []\n",
" for arg in clause.args:\n",
" if arg in bindings:\n",
" new_args.append(bindings[arg])\n",
" else:\n",
" new_args.append(arg)\n",
"\n",
" try:\n",
" return Expr(str(clause.name), *new_args)\n",
" except:\n",
" return Expr(str(clause.op), *new_args)\n",
" \n",
" def generate_action_object(self, action, bindings):\n",
" """Generate action object given a generic action andvariable bindings"""\n",
"\n",
" # if bindings is 0, it means the action already exists in self.actions\n",
" if bindings == 0:\n",
" return action\n",
"\n",
" # bindings cannot be None\n",
" else:\n",
" new_expr = self.generate_expr(action, bindings)\n",
" new_preconds = []\n",
" for precond in action.precond:\n",
" new_precond = self.generate_expr(precond, bindings)\n",
" new_preconds.append(new_precond)\n",
" new_effects = []\n",
" for effect in action.effect:\n",
" new_effect = self.generate_expr(effect, bindings)\n",
" new_effects.append(new_effect)\n",
" return Action(new_expr, new_preconds, new_effects)\n",
"\n",
" def cyclic(self, graph):\n",
" """Check cyclicity of a directed graph"""\n",
"\n",
" new_graph = dict()\n",
" for element in graph:\n",
" if element[0] in new_graph:\n",
" new_graph[element[0]].append(element[1])\n",
" else:\n",
" new_graph[element[0]] = [element[1]]\n",
"\n",
" path = set()\n",
"\n",
" def visit(vertex):\n",
" path.add(vertex)\n",
" for neighbor in new_graph.get(vertex, ()):\n",
" if neighbor in path or visit(neighbor):\n",
" return True\n",
" path.remove(vertex)\n",
" return False\n",
"\n",
" value = any(visit(v) for v in new_graph)\n",
" return value\n",
"\n",
" def add_const(self, constraint, constraints):\n",
" """Add the constraint to constraints if the resulting graph is acyclic"""\n",
"\n",
" if constraint[0] == self.finish or constraint[1] == self.start:\n",
" return constraints\n",
"\n",
" new_constraints = set(constraints)\n",
" new_constraints.add(constraint)\n",
"\n",
" if self.cyclic(new_constraints):\n",
" return constraints\n",
" return new_constraints\n",
"\n",
" def is_a_threat(self, precondition, effect):\n",
" """Check if effect is a threat to precondition"""\n",
"\n",
" if (str(effect.op) == 'Not' + str(precondition.op)) or ('Not' + str(effect.op) == str(precondition.op)):\n",
" if effect.args == precondition.args:\n",
" return True\n",
" return False\n",
"\n",
" def protect(self, causal_link, action, constraints):\n",
" """Check and resolve threats by promotion or demotion"""\n",
"\n",
" threat = False\n",
" for effect in action.effect:\n",
" if self.is_a_threat(causal_link[1], effect):\n",
" threat = True\n",
" break\n",
"\n",
" if action != causal_link[0] and action != causal_link[2] and threat:\n",
" # try promotion\n",
" new_constraints = set(constraints)\n",
" new_constraints.add((action, causal_link[0]))\n",
" if not self.cyclic(new_constraints):\n",
" constraints = self.add_const((action, causal_link[0]), constraints)\n",
" else:\n",
" # try demotion\n",
" new_constraints = set(constraints)\n",
" new_constraints.add((causal_link[2], action))\n",
" if not self.cyclic(new_constraints):\n",
" constraints = self.add_const((causal_link[2], action), constraints)\n",
" else:\n",
" # both promotion and demotion fail\n",
" print('Unable to resolve a threat caused by', action, 'onto', causal_link)\n",
" return\n",
" return constraints\n",
"\n",
" def convert(self, constraints):\n",
" """Convert constraints into a dict of Action to set orderings"""\n",
"\n",
" graph = dict()\n",
" for constraint in constraints:\n",
" if constraint[0] in graph:\n",
" graph[constraint[0]].add(constraint[1])\n",
" else:\n",
" graph[constraint[0]] = set()\n",
" graph[constraint[0]].add(constraint[1])\n",
" return graph\n",
"\n",
" def toposort(self, graph):\n",
" """Generate topological ordering of constraints"""\n",
"\n",
" if len(graph) == 0:\n",
" return\n",
"\n",
" graph = graph.copy()\n",
"\n",
" for k, v in graph.items():\n",
" v.discard(k)\n",
"\n",
" extra_elements_in_dependencies = _reduce(set.union, graph.values()) - set(graph.keys())\n",
"\n",
" graph.update({element:set() for element in extra_elements_in_dependencies})\n",
" while True:\n",
" ordered = set(element for element, dependency in graph.items() if len(dependency) == 0)\n",
" if not ordered:\n",
" break\n",
" yield ordered\n",
" graph = {element: (dependency - ordered) for element, dependency in graph.items() if element not in ordered}\n",
" if len(graph) != 0:\n",
" raise ValueError('The graph is not acyclic and cannot be linearly ordered')\n",
"\n",
" def display_plan(self):\n",
" """Display causal links, constraints and the plan"""\n",
"\n",
" print('Causal Links')\n",
" for causal_link in self.causal_links:\n",
" print(causal_link)\n",
"\n",
" print('\\nConstraints')\n",
" for constraint in self.constraints:\n",
" print(constraint[0], '<', constraint[1])\n",
"\n",
" print('\\nPartial Order Plan')\n",
" print(list(reversed(list(self.toposort(self.convert(self.constraints))))))\n",
"\n",
" def execute(self, display=True):\n",
" """Execute the algorithm"""\n",
"\n",
" step = 1\n",
" self.tries = 1\n",
" while len(self.agenda) > 0:\n",
" step += 1\n",
" # select <G, act1> from Agenda\n",
" try:\n",
" G, act1, possible_actions = self.find_open_precondition()\n",
" except IndexError:\n",
" print('Probably Wrong')\n",
" break\n",
"\n",
" act0 = possible_actions[0]\n",
" # remove <G, act1> from Agenda\n",
" self.agenda.remove((G, act1))\n",
"\n",
" # For actions with variable number of arguments, use least commitment principle\n",
" # act0_temp, bindings = self.find_action_for_precondition(G)\n",
" # act0 = self.generate_action_object(act0_temp, bindings)\n",
"\n",
" # Actions = Actions U {act0}\n",
" self.actions.add(act0)\n",
"\n",
" # Constraints = add_const(start < act0, Constraints)\n",
" self.constraints = self.add_const((self.start, act0), self.constraints)\n",
"\n",
" # for each CL E CausalLinks do\n",
" # Constraints = protect(CL, act0, Constraints)\n",
" for causal_link in self.causal_links:\n",
" self.constraints = self.protect(causal_link, act0, self.constraints)\n",
"\n",
" # Agenda = Agenda U {<P, act0>: P is a precondition of act0}\n",
" for precondition in act0.precond:\n",
" self.agenda.add((precondition, act0))\n",
"\n",
" # Constraints = add_const(act0 < act1, Constraints)\n",
" self.constraints = self.add_const((act0, act1), self.constraints)\n",
"\n",
" # CausalLinks U {<act0, G, act1>}\n",
" if (act0, G, act1) not in self.causal_links:\n",
" self.causal_links.append((act0, G, act1))\n",
"\n",
" # for each A E Actions do\n",
" # Constraints = protect(<act0, G, act1>, A, Constraints)\n",
" for action in self.actions:\n",
" self.constraints = self.protect((act0, G, act1), action, self.constraints)\n",
"\n",
" if step > 200:\n",
" print('Couldn\\'t find a solution')\n",
" return None, None\n",
"\n",
" if display:\n",
" self.display_plan()\n",
" else:\n",
" return self.constraints, self.causal_links \n",
"
class Problem(PlanningProblem):\n",
" """\n",
" Define real-world problems by aggregating resources as numerical quantities instead of\n",
" named entities.\n",
"\n",
" This class is identical to PDLL, except that it overloads the act function to handle\n",
" resource and ordering conditions imposed by HLA as opposed to Action.\n",
" """\n",
" def __init__(self, init, goals, actions, jobs=None, resources=None):\n",
" super().__init__(init, goals, actions)\n",
" self.jobs = jobs\n",
" self.resources = resources or {}\n",
"\n",
" def act(self, action):\n",
" """\n",
" Performs the HLA given as argument.\n",
"\n",
" Note that this is different from the superclass action - where the parameter was an\n",
" Expression. For real world problems, an Expr object isn't enough to capture all the\n",
" detail required for executing the action - resources, preconditions, etc need to be\n",
" checked for too.\n",
" """\n",
" args = action.args\n",
" list_action = first(a for a in self.actions if a.name == action.name)\n",
" if list_action is None:\n",
" raise Exception("Action '{}' not found".format(action.name))\n",
" self.init = list_action.do_action(self.jobs, self.resources, self.init, args).clauses\n",
"\n",
" def refinements(hla, state, library): # TODO - refinements may be (multiple) HLA themselves ...\n",
" """\n",
" state is a Problem, containing the current state kb\n",
" library is a dictionary containing details for every possible refinement. eg:\n",
" {\n",
" 'HLA': ['Go(Home,SFO)', 'Go(Home,SFO)', 'Drive(Home, SFOLongTermParking)', 'Shuttle(SFOLongTermParking, SFO)', 'Taxi(Home, SFO)'],\n",
" 'steps': [['Drive(Home, SFOLongTermParking)', 'Shuttle(SFOLongTermParking, SFO)'], ['Taxi(Home, SFO)'], [], [], []],\n",
" # empty refinements ie primitive action\n",
" 'precond': [['At(Home), Have(Car)'], ['At(Home)'], ['At(Home)', 'Have(Car)'], ['At(SFOLongTermParking)'], ['At(Home)']],\n",
" 'effect': [['At(SFO)'], ['At(SFO)'], ['At(SFOLongTermParking)'], ['At(SFO)'], ['At(SFO)'], ['~At(Home)'], ['~At(Home)'], ['~At(Home)'], ['~At(SFOLongTermParking)'], ['~At(Home)']]\n",
" }\n",
" """\n",
" e = Expr(hla.name, hla.args)\n",
" indices = [i for i, x in enumerate(library['HLA']) if expr(x).op == hla.name]\n",
" for i in indices:\n",
" # TODO multiple refinements\n",
" precond = []\n",
" for p in library['precond'][i]:\n",
" if p[0] == '~':\n",
" precond.append(expr('Not' + p[1:]))\n",
" else:\n",
" precond.append(expr(p))\n",
" effect = []\n",
" for e in library['effect'][i]:\n",
" if e[0] == '~':\n",
" effect.append(expr('Not' + e[1:]))\n",
" else:\n",
" effect.append(expr(e))\n",
" action = HLA(library['steps'][i][0], precond, effect)\n",
" if action.check_precond(state.init, action.args):\n",
" yield action\n",
"\n",
" def hierarchical_search(problem, hierarchy):\n",
" """\n",
" [Figure 11.5] 'Hierarchical Search, a Breadth First Search implementation of Hierarchical\n",
" Forward Planning Search'\n",
" The problem is a real-world problem defined by the problem class, and the hierarchy is\n",
" a dictionary of HLA - refinements (see refinements generator for details)\n",
" """\n",
" act = Node(problem.actions[0])\n",
" frontier = deque()\n",
" frontier.append(act)\n",
" while True:\n",
" if not frontier:\n",
" return None\n",
" plan = frontier.popleft()\n",
" print(plan.state.name)\n",
" hla = plan.state # first_or_null(plan)\n",
" prefix = None\n",
" if plan.parent:\n",
" prefix = plan.parent.state.action # prefix, suffix = subseq(plan.state, hla)\n",
" outcome = Problem.result(problem, prefix)\n",
" if hla is None:\n",
" if outcome.goal_test():\n",
" return plan.path()\n",
" else:\n",
" print("else")\n",
" for sequence in Problem.refinements(hla, outcome, hierarchy):\n",
" print("...")\n",
" frontier.append(Node(plan.state, plan.parent, sequence))\n",
"\n",
" def result(problem, action):\n",
" """The outcome of applying an action to the current problem"""\n",
" if action is not None:\n",
" problem.act(action)\n",
" return problem\n",
" else:\n",
" return problem\n",
"
class HLA(Action):\n",
" """\n",
" Define Actions for the real-world (that may be refined further), and satisfy resource\n",
" constraints.\n",
" """\n",
" unique_group = 1\n",
"\n",
" def __init__(self, action, precond=None, effect=None, duration=0,\n",
" consume=None, use=None):\n",
" """\n",
" As opposed to actions, to define HLA, we have added constraints.\n",
" duration holds the amount of time required to execute the task\n",
" consumes holds a dictionary representing the resources the task consumes\n",
" uses holds a dictionary representing the resources the task uses\n",
" """\n",
" precond = precond or [None]\n",
" effect = effect or [None]\n",
" super().__init__(action, precond, effect)\n",
" self.duration = duration\n",
" self.consumes = consume or {}\n",
" self.uses = use or {}\n",
" self.completed = False\n",
" # self.priority = -1 # must be assigned in relation to other HLAs\n",
" # self.job_group = -1 # must be assigned in relation to other HLAs\n",
"\n",
" def do_action(self, job_order, available_resources, kb, args):\n",
" """\n",
" An HLA based version of act - along with knowledge base updation, it handles\n",
" resource checks, and ensures the actions are executed in the correct order.\n",
" """\n",
" # print(self.name)\n",
" if not self.has_usable_resource(available_resources):\n",
" raise Exception('Not enough usable resources to execute {}'.format(self.name))\n",
" if not self.has_consumable_resource(available_resources):\n",
" raise Exception('Not enough consumable resources to execute {}'.format(self.name))\n",
" if not self.inorder(job_order):\n",
" raise Exception("Can't execute {} - execute prerequisite actions first".\n",
" format(self.name))\n",
" kb = super().act(kb, args) # update knowledge base\n",
" for resource in self.consumes: # remove consumed resources\n",
" available_resources[resource] -= self.consumes[resource]\n",
" self.completed = True # set the task status to complete\n",
" return kb\n",
"\n",
" def has_consumable_resource(self, available_resources):\n",
" """\n",
" Ensure there are enough consumable resources for this action to execute.\n",
" """\n",
" for resource in self.consumes:\n",
" if available_resources.get(resource) is None:\n",
" return False\n",
" if available_resources[resource] < self.consumes[resource]:\n",
" return False\n",
" return True\n",
"\n",
" def has_usable_resource(self, available_resources):\n",
" """\n",
" Ensure there are enough usable resources for this action to execute.\n",
" """\n",
" for resource in self.uses:\n",
" if available_resources.get(resource) is None:\n",
" return False\n",
" if available_resources[resource] < self.uses[resource]:\n",
" return False\n",
" return True\n",
"\n",
" def inorder(self, job_order):\n",
" """\n",
" Ensure that all the jobs that had to be executed before the current one have been\n",
" successfully executed.\n",
" """\n",
" for jobs in job_order:\n",
" if self in jobs:\n",
" for job in jobs:\n",
" if job is self:\n",
" return True\n",
" if not job.completed:\n",
" return False\n",
" return True\n",
"
def job_shop_problem():\n",
" """\n",
" [Figure 11.1] JOB-SHOP-PROBLEM\n",
"\n",
" A job-shop scheduling problem for assembling two cars,\n",
" with resource and ordering constraints.\n",
"\n",
" Example:\n",
" >>> from planning import *\n",
" >>> p = job_shop_problem()\n",
" >>> p.goal_test()\n",
" False\n",
" >>> p.act(p.jobs[1][0])\n",
" >>> p.act(p.jobs[1][1])\n",
" >>> p.act(p.jobs[1][2])\n",
" >>> p.act(p.jobs[0][0])\n",
" >>> p.act(p.jobs[0][1])\n",
" >>> p.goal_test()\n",
" False\n",
" >>> p.act(p.jobs[0][2])\n",
" >>> p.goal_test()\n",
" True\n",
" >>>\n",
" """\n",
" resources = {'EngineHoists': 1, 'WheelStations': 2, 'Inspectors': 2, 'LugNuts': 500}\n",
"\n",
" add_engine1 = HLA('AddEngine1', precond='~Has(C1, E1)', effect='Has(C1, E1)', duration=30, use={'EngineHoists': 1})\n",
" add_engine2 = HLA('AddEngine2', precond='~Has(C2, E2)', effect='Has(C2, E2)', duration=60, use={'EngineHoists': 1})\n",
" add_wheels1 = HLA('AddWheels1', precond='~Has(C1, W1)', effect='Has(C1, W1)', duration=30, use={'WheelStations': 1}, consume={'LugNuts': 20})\n",
" add_wheels2 = HLA('AddWheels2', precond='~Has(C2, W2)', effect='Has(C2, W2)', duration=15, use={'WheelStations': 1}, consume={'LugNuts': 20})\n",
" inspect1 = HLA('Inspect1', precond='~Inspected(C1)', effect='Inspected(C1)', duration=10, use={'Inspectors': 1})\n",
" inspect2 = HLA('Inspect2', precond='~Inspected(C2)', effect='Inspected(C2)', duration=10, use={'Inspectors': 1})\n",
"\n",
" actions = [add_engine1, add_engine2, add_wheels1, add_wheels2, inspect1, inspect2]\n",
"\n",
" job_group1 = [add_engine1, add_wheels1, inspect1]\n",
" job_group2 = [add_engine2, add_wheels2, inspect2]\n",
"\n",
" return Problem(init='Car(C1) & Car(C2) & Wheels(W1) & Wheels(W2) & Engine(E2) & Engine(E2) & ~Has(C1, E1) & ~Has(C2, E2) & ~Has(C1, W1) & ~Has(C2, W2) & ~Inspected(C1) & ~Inspected(C2)',\n",
" goals='Has(C1, W1) & Has(C1, E1) & Inspected(C1) & Has(C2, W2) & Has(C2, E2) & Inspected(C2)',\n",
" actions=actions,\n",
" jobs=[job_group1, job_group2],\n",
" resources=resources)\n",
"
def double_tennis_problem():\n",
" """\n",
" [Figure 11.10] DOUBLE-TENNIS-PROBLEM\n",
"\n",
" A multiagent planning problem involving two partner tennis players\n",
" trying to return an approaching ball and repositioning around in the court.\n",
"\n",
" Example:\n",
" >>> from planning import *\n",
" >>> dtp = double_tennis_problem()\n",
" >>> goal_test(dtp.goals, dtp.init)\n",
" False\n",
" >>> dtp.act(expr('Go(A, RightBaseLine, LeftBaseLine)'))\n",
" >>> dtp.act(expr('Hit(A, Ball, RightBaseLine)'))\n",
" >>> goal_test(dtp.goals, dtp.init)\n",
" False\n",
" >>> dtp.act(expr('Go(A, LeftNet, RightBaseLine)'))\n",
" >>> goal_test(dtp.goals, dtp.init)\n",
" True\n",
" >>>\n",
" """\n",
"\n",
" return PlanningProblem(init='At(A, LeftBaseLine) & At(B, RightNet) & Approaching(Ball, RightBaseLine) & Partner(A, B) & Partner(B, A)',\n",
" goals='Returned(Ball) & At(x, LeftNet) & At(y, RightNet)',\n",
" actions=[Action('Hit(actor, Ball, loc)',\n",
" precond='Approaching(Ball, loc) & At(actor, loc)',\n",
" effect='Returned(Ball)'),\n",
" Action('Go(actor, to, loc)', \n",
" precond='At(actor, loc)',\n",
" effect='At(actor, to) & ~At(actor, loc)')])\n",
"