{ "cells": [ { "cell_type": "markdown", "metadata": { "collapsed": true }, "source": [ "# Planning\n", "#### Chapters 10-11\n", "----" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This notebook serves as supporting material for topics covered in **Chapter 10 - Classical Planning** and **Chapter 11 - Planning and Acting in the Real World** from the book *[Artificial Intelligence: A Modern Approach](http://aima.cs.berkeley.edu)*. \n", "This notebook uses implementations from the [planning.py](https://github.com/aimacode/aima-python/blob/master/planning.py) module. \n", "See the [intro notebook](https://github.com/aimacode/aima-python/blob/master/intro.ipynb) for instructions.\n", "\n", "We'll start by looking at `PDDL` and `Action` data types for defining problems and actions. \n", "Then, we will see how to use them by trying to plan a trip from *Sibiu* to *Bucharest* across the familiar map of Romania, from [search.ipynb](https://github.com/aimacode/aima-python/blob/master/search.ipynb) \n", "followed by some common planning problems and methods of solving them.\n", "\n", "Let's start by importing everything from the planning module." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "collapsed": true }, "outputs": [], "source": [ "from planning import *\n", "from notebook import psource" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## CONTENTS\n", "\n", "- PDDL\n", "- Action\n", "- Planning Problems\n", " * Air cargo problem\n", " * Spare tire problem\n", " * Three block tower problem\n", " * Shopping Problem\n", " * Cake problem\n", "- Solving Planning Problems\n", " * GraphPlan" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## PDDL\n", "\n", "PDDL stands for Planning Domain Definition Language.\n", "The `PDDL` class is used to represent planning problems in this module. 
The following attributes are essential to be able to define a problem:\n", "* an initial state\n", "* a set of goals\n", "* a set of viable actions that can be executed in the search space of the problem\n", "\n", "View the source to see how the Python code tries to realise these." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "\n", " \n", " \n", " \n", "\n", "\n", "

\n", "\n", "
class PDDL:\n",
       "    """\n",
       "    Planning Domain Definition Language (PDDL) used to define a search problem.\n",
       "    It stores states in a knowledge base consisting of first order logic statements.\n",
       "    The conjunction of these logical statements completely defines a state.\n",
       "    """\n",
       "\n",
       "    def __init__(self, init, goals, actions):\n",
       "        self.init = self.convert(init)\n",
       "        self.goals = expr(goals)\n",
       "        self.actions = actions\n",
       "\n",
       "    def convert(self, init):\n",
       "        """Converts strings into exprs"""\n",
       "        try:\n",
       "            init = conjuncts(expr(init))\n",
       "        except AttributeError:\n",
       "            init = expr(init)\n",
       "        return init\n",
       "\n",
       "    def goal_test(self):\n",
       "        """Checks if the goals have been reached"""\n",
       "        return all(goal in self.init for goal in conjuncts(self.goals))\n",
       "\n",
       "    def act(self, action):\n",
       "        """\n",
       "        Performs the action given as argument.\n",
       "        Note that action is an Expr like expr('Remove(Glass, Table)') or expr('Eat(Sandwich)')\n",
       "        """       \n",
       "        action_name = action.op\n",
       "        args = action.args\n",
       "        list_action = first(a for a in self.actions if a.name == action_name)\n",
       "        if list_action is None:\n",
       "            raise Exception("Action '{}' not found".format(action_name))\n",
       "        if not list_action.check_precond(self.init, args):\n",
       "            raise Exception("Action '{}' pre-conditions not satisfied".format(action))\n",
       "        self.init = list_action(self.init, args).clauses\n",
       "
\n", "\n", "\n" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "psource(PDDL)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `init` attribute is an expression that forms the initial knowledge base for the problem.\n", "
\n", "The `goals` attribute is an expression that indicates the goals to be reached by the problem.\n", "
\n", "Lastly, `actions` contains a list of `Action` objects that may be executed in the search space of the problem.\n", "
\n", "The `goal_test` method checks if the goal has been reached.\n", "
\n", "The `act` method acts out the given action and updates the current state.\n", "
\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## ACTION\n", "\n", "To be able to model a planning problem properly, it is essential to be able to represent an Action. Each action we model requires at least three things:\n", "* preconditions that the action must meet\n", "* the effects of executing the action\n", "* some expression that represents the action" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The module models actions using the `Action` class" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "\n", " \n", " \n", " \n", "\n", "\n", "

\n", "\n", "
class Action:\n",
       "    """\n",
       "    Defines an action schema using preconditions and effects.\n",
       "    Use this to describe actions in PDDL.\n",
       "    action is an Expr where variables are given as arguments(args).\n",
       "    Precondition and effect are both lists with positive and negative literals.\n",
       "    Negative preconditions and effects are defined by adding a 'Not' before the name of the clause\n",
       "    Example:\n",
       "    precond = [expr("Human(person)"), expr("Hungry(Person)"), expr("NotEaten(food)")]\n",
       "    effect = [expr("Eaten(food)"), expr("Hungry(person)")]\n",
       "    eat = Action(expr("Eat(person, food)"), precond, effect)\n",
       "    """\n",
       "\n",
       "    def __init__(self, action, precond, effect):\n",
       "        action = expr(action)\n",
       "        self.name = action.op\n",
       "        self.args = action.args\n",
       "        self.precond, self.effect = self.convert(precond, effect)\n",
       "\n",
       "    def __call__(self, kb, args):\n",
       "        return self.act(kb, args)\n",
       "\n",
       "    def convert(self, precond, effect):\n",
       "        """Converts strings into Exprs"""\n",
       "\n",
       "        precond = precond.replace('~', 'Not')\n",
       "        if len(precond) > 0:\n",
       "            precond = expr(precond)\n",
       "        effect = effect.replace('~', 'Not')\n",
       "        if len(effect) > 0:\n",
       "            effect = expr(effect)\n",
       "\n",
       "        try:\n",
       "            precond = conjuncts(precond)\n",
       "        except AttributeError:\n",
       "            pass\n",
       "        try:\n",
       "            effect = conjuncts(effect)\n",
       "        except AttributeError:\n",
       "            pass\n",
       "\n",
       "        return precond, effect\n",
       "\n",
       "    def substitute(self, e, args):\n",
       "        """Replaces variables in expression with their respective Propositional symbol"""\n",
       "\n",
       "        new_args = list(e.args)\n",
       "        for num, x in enumerate(e.args):\n",
       "            for i, _ in enumerate(self.args):\n",
       "                if self.args[i] == x:\n",
       "                    new_args[num] = args[i]\n",
       "        return Expr(e.op, *new_args)\n",
       "\n",
       "    def check_precond(self, kb, args):\n",
       "        """Checks if the precondition is satisfied in the current state"""\n",
       "\n",
       "        if isinstance(kb, list):\n",
       "            kb = FolKB(kb)\n",
       "\n",
       "        for clause in self.precond:\n",
       "            if self.substitute(clause, args) not in kb.clauses:\n",
       "                return False\n",
       "        return True\n",
       "\n",
       "    def act(self, kb, args):\n",
       "        """Executes the action on the state's knowledge base"""\n",
       "\n",
       "        if isinstance(kb, list):\n",
       "            kb = FolKB(kb)\n",
       "\n",
       "        if not self.check_precond(kb, args):\n",
       "            raise Exception('Action pre-conditions not satisfied')\n",
       "        for clause in self.effect:\n",
       "            kb.tell(self.substitute(clause, args))\n",
       "            if clause.op[:3] == 'Not':\n",
       "                new_clause = Expr(clause.op[3:], *clause.args)\n",
       "\n",
       "                if kb.ask(self.substitute(new_clause, args)) is not False:\n",
       "                    kb.retract(self.substitute(new_clause, args))\n",
       "            else:\n",
       "                new_clause = Expr('Not' + clause.op, *clause.args)\n",
       "\n",
       "                if kb.ask(self.substitute(new_clause, args)) is not False:    \n",
       "                    kb.retract(self.substitute(new_clause, args))\n",
       "\n",
       "        return kb\n",
       "
\n", "\n", "\n" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "psource(Action)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This class represents an action given the expression, the preconditions and its effects. \n", "A list `precond` stores the preconditions of the action and a list `effect` stores its effects.\n", "Negative preconditions and effects are input using a `~` symbol before the clause; internally, each `~` is replaced with a `Not` prefix to make the literals easier to work with.\n", "For example, the negation of `At(obj, loc)` will be input as `~At(obj, loc)` and internally represented as `NotAt(obj, loc)`. \n", "This effectively creates a new clause for each negative literal, removing the hassle of maintaining two separate knowledge bases.\n", "This greatly simplifies algorithms like `GraphPlan` as we will see later.\n", "The `convert` method takes the input strings, parses them, splits them on conjunctions if any and returns lists of `Expr` objects.\n", "The `check_precond` method checks if the preconditions for that action are satisfied, given a `kb`.\n", "The `act` method carries out the action on the given knowledge base." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's try to define a planning problem using these tools. 
Since we already know about the map of Romania, let's see if we can plan a trip across a simplified map of Romania.\n", "\n", "Here is our simplified map definition:" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "collapsed": true }, "outputs": [], "source": [ "from utils import *\n", "# this imports the required expr so we can create our knowledge base\n", "\n", "knowledge_base = [\n", "    expr(\"Connected(Bucharest,Pitesti)\"),\n", "    expr(\"Connected(Pitesti,Rimnicu)\"),\n", "    expr(\"Connected(Rimnicu,Sibiu)\"),\n", "    expr(\"Connected(Sibiu,Fagaras)\"),\n", "    expr(\"Connected(Fagaras,Bucharest)\"),\n", "    expr(\"Connected(Pitesti,Craiova)\"),\n", "    expr(\"Connected(Craiova,Rimnicu)\")\n", "    ]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let us add some logical rules to complete our knowledge about travelling around the map. These are the typical symmetry and transitivity properties of connections on a map. With these rules, the `knowledge_base` states that a connection works in both directions and that connections can be chained together, which is what we usually mean when we say two locations are connected.\n", "\n", "Let's also add our starting location - *Sibiu* to the map." 
] }, { "cell_type": "code", "execution_count": 5, "metadata": { "collapsed": true }, "outputs": [], "source": [ "knowledge_base.extend([\n", " expr(\"Connected(x,y) ==> Connected(y,x)\"),\n", " expr(\"Connected(x,y) & Connected(y,z) ==> Connected(x,z)\"),\n", " expr(\"At(Sibiu)\")\n", " ])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We now have a complete knowledge base, which can be seen like this:" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Connected(Bucharest, Pitesti),\n", " Connected(Pitesti, Rimnicu),\n", " Connected(Rimnicu, Sibiu),\n", " Connected(Sibiu, Fagaras),\n", " Connected(Fagaras, Bucharest),\n", " Connected(Pitesti, Craiova),\n", " Connected(Craiova, Rimnicu),\n", " (Connected(x, y) ==> Connected(y, x)),\n", " ((Connected(x, y) & Connected(y, z)) ==> Connected(x, z)),\n", " At(Sibiu)]" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "knowledge_base" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We now define possible actions to our problem. We know that we can drive between any connected places. 
But, as is evident from [this](https://en.wikipedia.org/wiki/List_of_airports_in_Romania) list of Romanian airports, we can also fly directly between Sibiu, Bucharest, and Craiova.\n", "\n", "We can define these flight actions like this:" ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "collapsed": true }, "outputs": [], "source": [ "#Sibiu to Bucharest\n", "precond = 'At(Sibiu)'\n", "effect = 'At(Bucharest) & ~At(Sibiu)'\n", "fly_s_b = Action('Fly(Sibiu, Bucharest)', precond, effect)\n", "\n", "#Bucharest to Sibiu\n", "precond = 'At(Bucharest)'\n", "effect = 'At(Sibiu) & ~At(Bucharest)'\n", "fly_b_s = Action('Fly(Bucharest, Sibiu)', precond, effect)\n", "\n", "#Sibiu to Craiova\n", "precond = 'At(Sibiu)'\n", "effect = 'At(Craiova) & ~At(Sibiu)'\n", "fly_s_c = Action('Fly(Sibiu, Craiova)', precond, effect)\n", "\n", "#Craiova to Sibiu\n", "precond = 'At(Craiova)'\n", "effect = 'At(Sibiu) & ~At(Craiova)'\n", "fly_c_s = Action('Fly(Craiova, Sibiu)', precond, effect)\n", "\n", "#Bucharest to Craiova\n", "precond = 'At(Bucharest)'\n", "effect = 'At(Craiova) & ~At(Bucharest)'\n", "fly_b_c = Action('Fly(Bucharest, Craiova)', precond, effect)\n", "\n", "#Craiova to Bucharest\n", "precond = 'At(Craiova)'\n", "effect = 'At(Bucharest) & ~At(Craiova)'\n", "fly_c_b = Action('Fly(Craiova, Bucharest)', precond, effect)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And the drive actions like this." 
] }, { "cell_type": "code", "execution_count": 8, "metadata": { "collapsed": true }, "outputs": [], "source": [ "#Drive\n", "precond = 'At(x)'\n", "effect = 'At(y) & ~At(x)'\n", "drive = Action('Drive(x, y)', precond, effect)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Our goal is defined as" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "collapsed": true }, "outputs": [], "source": [ "goals = 'At(Bucharest)'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally, we can define a function that will tell us when we have reached our destination, Bucharest." ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "collapsed": true }, "outputs": [], "source": [ "def goal_test(kb):\n", "    return kb.ask(expr('At(Bucharest)'))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Thus, with all the components in place, we can define the planning problem." ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "collapsed": true }, "outputs": [], "source": [ "prob = PDDL(knowledge_base, goals, [fly_s_b, fly_b_s, fly_s_c, fly_c_s, fly_b_c, fly_c_b, drive])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## PLANNING PROBLEMS\n", "---\n", "\n", "## Air Cargo Problem" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In the Air Cargo problem, we start with cargo at two airports, SFO and JFK. Our goal is to send each cargo to the other airport. We have two airplanes to help us accomplish the task. \n", "The problem can be defined with three actions: Load, Unload and Fly. \n", "Let us look at how the `air_cargo` problem has been defined in the module. " ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "\n", " \n", " \n", " \n", "\n", "\n", "

\n", "\n", "
def air_cargo():\n",
       "    """Air cargo problem"""\n",
       "\n",
       "    return PDDL(init='At(C1, SFO) & At(C2, JFK) & At(P1, SFO) & At(P2, JFK) & Cargo(C1) & Cargo(C2) & Plane(P1) & Plane(P2) & Airport(SFO) & Airport(JFK)',\n",
       "                goals='At(C1, JFK) & At(C2, SFO)', \n",
       "                actions=[Action('Load(c, p, a)', \n",
       "                                precond='At(c, a) & At(p, a) & Cargo(c) & Plane(p) & Airport(a)', \n",
       "                                effect='In(c, p) & ~At(c, a)'),\n",
       "                         Action('Unload(c, p, a)',\n",
       "                                precond='In(c, p) & At(p, a) & Cargo(c) & Plane(p) & Airport(a)',\n",
       "                                effect='At(c, a) & ~In(c, p)'),\n",
       "                         Action('Fly(p, f, to)',\n",
       "                                precond='At(p, f) & Plane(p) & Airport(f) & Airport(to)',\n",
       "                                effect='At(p, to) & ~At(p, f)')])\n",
       "
\n", "\n", "\n" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "psource(air_cargo)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**At(c, a):** The cargo **'c'** is at airport **'a'**.\n", "\n", "**~At(c, a):** The cargo **'c'** is _not_ at airport **'a'**.\n", "\n", "**In(c, p):** Cargo **'c'** is in plane **'p'**.\n", "\n", "**~In(c, p):** Cargo **'c'** is _not_ in plane **'p'**.\n", "\n", "**Cargo(c):** Declare **'c'** as cargo.\n", "\n", "**Plane(p):** Declare **'p'** as plane.\n", "\n", "**Airport(a):** Declare **'a'** as airport.\n", "\n", "\n", "\n", "In the initial state (`init`), we have cargo C1 and plane P1 at airport SFO, and cargo C2 and plane P2 at airport JFK. \n", "Our goal state is to have cargo C1 at airport JFK and cargo C2 at airport SFO. We will see how to achieve this. Let us now define an object of the `air_cargo` problem:" ] }, { "cell_type": "code", "execution_count": 13, "metadata": { "collapsed": true }, "outputs": [], "source": [ "airCargo = air_cargo()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Before taking any actions, we will check if `airCargo` has reached its goal:" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "False\n" ] } ], "source": [ "print(airCargo.goal_test())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It returns False because the goal state is not yet reached. 
Now, we define the sequence of actions that it should take in order to achieve the goal.\n", "The actions are then carried out on the `airCargo` PDDL.\n", "\n", "The actions available to us are the following: Load, Unload, Fly\n", "\n", "**Load(c, p, a):** Load cargo **'c'** into plane **'p'** from airport **'a'**.\n", "\n", "**Fly(p, f, to):** Fly the plane **'p'** from airport **'f'** to airport **'to'**.\n", "\n", "**Unload(c, p, a):** Unload cargo **'c'** from plane **'p'** to airport **'a'**.\n", "\n", "This problem can have multiple valid solutions.\n", "One such solution is shown below." ] }, { "cell_type": "code", "execution_count": 15, "metadata": { "collapsed": true }, "outputs": [], "source": [ "solution = [expr(\"Load(C1, P1, SFO)\"),\n", "            expr(\"Fly(P1, SFO, JFK)\"),\n", "            expr(\"Unload(C1, P1, JFK)\"),\n", "            expr(\"Load(C2, P2, JFK)\"),\n", "            expr(\"Fly(P2, JFK, SFO)\"),\n", "            expr(\"Unload(C2, P2, SFO)\")]\n", "\n", "for action in solution:\n", "    airCargo.act(action)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that all the steps have been carried out on `airCargo`, we can check whether it has achieved its goal:" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "True\n" ] } ], "source": [ "print(airCargo.goal_test())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It has now achieved its goal." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## The Spare Tire Problem" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's consider the problem of changing a flat tire on a car. \n", "The goal is to mount a spare tire onto the car's axle, given that we have a flat tire on the axle and a spare tire in the trunk. " ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "\n", " \n", " \n", " \n", "\n", "\n", "

\n", "\n", "
def spare_tire():\n",
       "    """Spare tire problem"""\n",
       "\n",
       "    return PDDL(init='Tire(Flat) & Tire(Spare) & At(Flat, Axle) & At(Spare, Trunk)',\n",
       "                goals='At(Spare, Axle) & At(Flat, Ground)',\n",
       "                actions=[Action('Remove(obj, loc)',\n",
       "                                precond='At(obj, loc)',\n",
       "                                effect='At(obj, Ground) & ~At(obj, loc)'),\n",
       "                         Action('PutOn(t, Axle)',\n",
       "                                precond='Tire(t) & At(t, Ground) & ~At(Flat, Axle)',\n",
       "                                effect='At(t, Axle) & ~At(t, Ground)'),\n",
       "                         Action('LeaveOvernight',\n",
       "                                precond='',\n",
       "                                effect='~At(Spare, Ground) & ~At(Spare, Axle) & ~At(Spare, Trunk) & \\\n",
       "                                        ~At(Flat, Ground) & ~At(Flat, Axle) & ~At(Flat, Trunk)')])\n",
       "
\n", "\n", "\n" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "psource(spare_tire)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**At(obj, loc):** object **'obj'** is at location **'loc'**.\n", "\n", "**~At(obj, loc):** object **'obj'** is _not_ at location **'loc'**.\n", "\n", "**Tire(t):** Declare a tire of type **'t'**.\n", "\n", "Let us now define an object of the `spare_tire` problem:" ] }, { "cell_type": "code", "execution_count": 18, "metadata": { "collapsed": true }, "outputs": [], "source": [ "spareTire = spare_tire()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Before taking any actions, we will check if `spareTire` has reached its goal:" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "False\n" ] } ], "source": [ "print(spareTire.goal_test())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As we can see, it hasn't completed the goal. \n", "We now define a possible solution that can help us reach the goal of having a spare tire mounted onto the car's axle. 
\n", "The actions are then carried out on the `spareTire` PDDL.\n", "\n", "The actions available to us are the following: Remove, PutOn, LeaveOvernight\n", "\n", "**Remove(obj, loc):** Remove the tire **'obj'** from the location **'loc'**.\n", "\n", "**PutOn(t, Axle):** Attach the tire **'t'** on the Axle.\n", "\n", "**LeaveOvernight():** We live in a particularly bad neighborhood and all tires, flat or not, are stolen if we leave them overnight.\n", "\n" ] }, { "cell_type": "code", "execution_count": 20, "metadata": { "collapsed": true }, "outputs": [], "source": [ "solution = [expr(\"Remove(Flat, Axle)\"),\n", "            expr(\"Remove(Spare, Trunk)\"),\n", "            expr(\"PutOn(Spare, Axle)\")]\n", "\n", "for action in solution:\n", "    spareTire.act(action)" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "True\n" ] } ], "source": [ "print(spareTire.goal_test())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This is a valid solution.\n", "
\n", "Another possible solution is" ] }, { "cell_type": "code", "execution_count": 22, "metadata": { "collapsed": true }, "outputs": [], "source": [ "spareTire = spare_tire()\n", "\n", "solution = [expr('Remove(Spare, Trunk)'),\n", " expr('Remove(Flat, Axle)'),\n", " expr('PutOn(Spare, Axle)')]\n", "\n", "for action in solution:\n", " spareTire.act(action)" ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "True\n" ] } ], "source": [ "print(spareTire.goal_test())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Notice that both solutions work, which means that the problem can be solved irrespective of the order in which the `Remove` actions take place, as long as both `Remove` actions take place before the `PutOn` action." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We have successfully mounted a spare tire onto the axle." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Three Block Tower Problem" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This problem's domain consists of a set of cube-shaped blocks sitting on a table. \n", "The blocks can be stacked, but only one block can fit directly on top of another.\n", "A robot arm can pick up a block and move it to another position, either on the table or on top of another block. \n", "The arm can pick up only one block at a time, so it cannot pick up a block that has another one on it. \n", "The goal will always be to build one or more stacks of blocks. \n", "In our case, we consider only three blocks.\n", "The particular configuration we will use is called the Sussman anomaly after Prof. Gerry Sussman." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's take a look at the definition of `three_block_tower()` in the module." 
] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "\n", " \n", " \n", " \n", "\n", "\n", "

\n", "\n", "
def three_block_tower():\n",
       "    """Sussman Anomaly problem"""\n",
       "\n",
       "    return PDDL(init='On(A, Table) & On(B, Table) & On(C, A) & Block(A) & Block(B) & Block(C) & Clear(B) & Clear(C)',\n",
       "                goals='On(A, B) & On(B, C)',\n",
       "                actions=[Action('Move(b, x, y)',\n",
       "                                precond='On(b, x) & Clear(b) & Clear(y) & Block(b) & Block(y)',\n",
       "                                effect='On(b, y) & Clear(x) & ~On(b, x) & ~Clear(y)'),\n",
       "                         Action('MoveToTable(b, x)',\n",
       "                                precond='On(b, x) & Clear(b) & Block(b)',\n",
       "                                effect='On(b, Table) & Clear(x) & ~On(b, x)')])\n",
       "
\n", "\n", "\n" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "psource(three_block_tower)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**On(b, x):** The block **'b'** is on **'x'**. **'x'** can be a table or a block.\n", "\n", "**~On(b, x):** The block **'b'** is _not_ on **'x'**. **'x'** can be a table or a block.\n", "\n", "**Block(b):** Declares **'b'** as a block.\n", "\n", "**Clear(x):** To indicate that there is nothing on **'x'** and it is free to be moved around.\n", "\n", "**~Clear(x):** To indicate that there is something on **'x'** and it cannot be moved.\n", " \n", " Let us now define an object of `three_block_tower` problem:" ] }, { "cell_type": "code", "execution_count": 25, "metadata": { "collapsed": true }, "outputs": [], "source": [ "threeBlockTower = three_block_tower()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Before taking any actions, we will check if `threeBlockTower` has reached its goal:" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "False\n" ] } ], "source": [ "print(threeBlockTower.goal_test())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As we can see, it hasn't completed the goal. \n", "We now define a sequence of actions that can stack three blocks in the required order. 
\n", "The actions are then carried out on the `threeBlockTower` PDDL.\n", "\n", "The actions available to us are the following: MoveToTable, Move\n", "\n", "**MoveToTable(b, x):** Move block **'b'** stacked on **'x'** to the table, given that block **'b'** is clear.\n", "\n", "**Move(b, x, y):** Move block **'b'** stacked on **'x'** to the top of **'y'**, given that both **'b'** and **'y'** are clear.\n" ] }, { "cell_type": "code", "execution_count": 27, "metadata": { "collapsed": true }, "outputs": [], "source": [ "solution = [expr(\"MoveToTable(C, A)\"),\n", "            expr(\"Move(B, Table, C)\"),\n", "            expr(\"Move(A, Table, B)\")]\n", "\n", "for action in solution:\n", "    threeBlockTower.act(action)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that all the steps have been carried out on `threeBlockTower`, we can check whether it has achieved its goal." ] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "True\n" ] } ], "source": [ "print(threeBlockTower.goal_test())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It has now successfully achieved its goal, i.e., to build a stack of three blocks in the specified order." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Shopping Problem" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This problem requires us to acquire a carton of milk, a banana and a drill.\n", "Initially, we start from home and we know that milk and bananas are available in the supermarket and that the hardware store sells drills.\n", "Let's take a look at the definition of the `shopping_problem` in the module." ] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "\n", " \n", " \n", " \n", "\n", "\n", "

\n", "\n", "
def shopping_problem():\n",
       "    """Shopping problem"""\n",
       "\n",
       "    return PDDL(init='At(Home) & Sells(SM, Milk) & Sells(SM, Banana) & Sells(HW, Drill)',\n",
       "                goals='Have(Milk) & Have(Banana) & Have(Drill)', \n",
       "                actions=[Action('Buy(x, store)',\n",
       "                                precond='At(store) & Sells(store, x)',\n",
       "                                effect='Have(x)'),\n",
       "                         Action('Go(x, y)',\n",
       "                                precond='At(x)',\n",
       "                                effect='At(y) & ~At(x)')])\n",
       "
\n", "\n", "\n" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "psource(shopping_problem)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**At(x):** Indicates that we are currently at **'x'** where **'x'** can be Home, SM (supermarket) or HW (Hardware store).\n", "\n", "**~At(x):** Indicates that we are currently _not_ at **'x'**.\n", "\n", "**Sells(s, x):** Indicates that item **'x'** can be bought from store **'s'**.\n", "\n", "**Have(x):** Indicates that we possess the item **'x'**." ] }, { "cell_type": "code", "execution_count": 30, "metadata": { "collapsed": true }, "outputs": [], "source": [ "shoppingProblem = shopping_problem()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's first check whether the goal state Have(Milk), Have(Banana), Have(Drill) is reached or not." ] }, { "cell_type": "code", "execution_count": 31, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "False\n" ] } ], "source": [ "print(shoppingProblem.goal_test())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's look at the possible actions\n", "\n", "**Buy(x, store):** Buy an item **'x'** from a **'store'** given that the **'store'** sells **'x'**.\n", "\n", "**Go(x, y):** Go to destination **'y'** starting from source **'x'**." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We now define a valid solution that will help us reach the goal.\n", "The sequence of actions will then be carried out onto the `shoppingProblem` PDDL." 
] }, { "cell_type": "code", "execution_count": 32, "metadata": { "collapsed": true }, "outputs": [], "source": [ "solution = [expr('Go(Home, SM)'),\n", " expr('Buy(Milk, SM)'),\n", " expr('Buy(Banana, SM)'),\n", " expr('Go(SM, HW)'),\n", " expr('Buy(Drill, HW)')]\n", "\n", "for action in solution:\n", " shoppingProblem.act(action)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We have taken the steps required to acquire everything we need. \n", "Let's see if we have reached our goal." ] }, { "cell_type": "code", "execution_count": 33, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "True" ] }, "execution_count": 33, "metadata": {}, "output_type": "execute_result" } ], "source": [ "shoppingProblem.goal_test()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It has now successfully achieved the goal." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Have Cake and Eat Cake Too" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This problem requires us to reach the state of having a cake and having eaten a cake simultaneously, given a single cake.\n", "Let's first take a look at the definition of the `have_cake_and_eat_cake_too` problem in the module." ] }, { "cell_type": "code", "execution_count": 34, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "\n", " \n", " \n", " \n", "\n", "\n", "

\n", "\n", "
def have_cake_and_eat_cake_too():\n",
       "    """Cake problem"""\n",
       "\n",
       "    return PDDL(init='Have(Cake)',\n",
       "                goals='Have(Cake) & Eaten(Cake)',\n",
       "                actions=[Action('Eat(Cake)',\n",
       "                                precond='Have(Cake)',\n",
       "                                effect='Eaten(Cake) & ~Have(Cake)'),\n",
       "                         Action('Bake(Cake)',\n",
       "                                precond='~Have(Cake)',\n",
       "                                effect='Have(Cake)')])\n",
       "
\n", "\n", "\n" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "psource(have_cake_and_eat_cake_too)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Since this problem doesn't involve variables, states can be treated like symbols in propositional logic.\n", "\n", "**Have(Cake):** Declares that we have a **'Cake'**.\n", "\n", "**~Have(Cake):** Declares that we _don't_ have a **'Cake'**." ] }, { "cell_type": "code", "execution_count": 35, "metadata": { "collapsed": true }, "outputs": [], "source": [ "cakeProblem = have_cake_and_eat_cake_too()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First let us check whether the goals 'Have(Cake)' and 'Eaten(Cake)' have been reached." ] }, { "cell_type": "code", "execution_count": 36, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "False\n" ] } ], "source": [ "print(cakeProblem.goal_test())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let us look at the possible actions.\n", "\n", "**Bake(x):** To bake **' x '**.\n", "\n", "**Eat(x):** To eat **' x '**." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We now define a valid solution that can help us reach the goal.\n", "The sequence of actions will then be applied to the `cakeProblem` PDDL." ] }, { "cell_type": "code", "execution_count": 37, "metadata": { "collapsed": true }, "outputs": [], "source": [ "solution = [expr(\"Eat(Cake)\"),\n", " expr(\"Bake(Cake)\")]\n", "\n", "for action in solution:\n", " cakeProblem.act(action)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We have now eaten the cake and baked a new one. Let us check if we have reached the goal." 
] }, { "cell_type": "code", "execution_count": 38, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "True\n" ] } ], "source": [ "print(cakeProblem.goal_test())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It has now successfully achieved its goal i.e, to have and eat the cake." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "One might wonder if the order of the actions matters for this problem.\n", "Let's see for ourselves." ] }, { "cell_type": "code", "execution_count": 39, "metadata": {}, "outputs": [ { "ename": "Exception", "evalue": "Action 'Bake(Cake)' pre-conditions not satisfied", "output_type": "error", "traceback": [ "\u001b[1;31m---------------------------------------------------------------------------\u001b[0m", "\u001b[1;31mException\u001b[0m Traceback (most recent call last)", "\u001b[1;32m\u001b[0m in \u001b[0;36m\u001b[1;34m()\u001b[0m\n\u001b[0;32m 5\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 6\u001b[0m \u001b[1;32mfor\u001b[0m \u001b[0maction\u001b[0m \u001b[1;32min\u001b[0m \u001b[0msolution\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m----> 7\u001b[1;33m \u001b[0mcakeProblem\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mact\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0maction\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m", "\u001b[1;32m~\\Documents\\Python\\Aima\\aima-python\\planning.py\u001b[0m in \u001b[0;36mact\u001b[1;34m(self, action)\u001b[0m\n\u001b[0;32m 44\u001b[0m \u001b[1;32mraise\u001b[0m \u001b[0mException\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;34m\"Action '{}' not found\"\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mformat\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0maction_name\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 45\u001b[0m \u001b[1;32mif\u001b[0m \u001b[1;32mnot\u001b[0m 
\u001b[0mlist_action\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mcheck_precond\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0minit\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0margs\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m:\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m---> 46\u001b[1;33m \u001b[1;32mraise\u001b[0m \u001b[0mException\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;34m\"Action '{}' pre-conditions not satisfied\"\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mformat\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0maction\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 47\u001b[0m \u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0minit\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mlist_action\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0minit\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0margs\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mclauses\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 48\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n", "\u001b[1;31mException\u001b[0m: Action 'Bake(Cake)' pre-conditions not satisfied" ] } ], "source": [ "cakeProblem = have_cake_and_eat_cake_too()\n", "\n", "solution = [expr('Bake(Cake)'),\n", " expr('Eat(Cake)')]\n", "\n", "for action in solution:\n", " cakeProblem.act(action)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It raises an exception.\n", "Indeed, according to the problem, we cannot bake a cake if we already have one.\n", "In planning terms, '~Have(Cake)' is a precondition to the action 'Bake(Cake)'.\n", "Hence, this solution is invalid." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## SOLVING PLANNING PROBLEMS\n", "----\n", "### GRAPHPLAN\n", "
\n", "The GraphPlan algorithm is a popular method of solving classical planning problems.\n", "Before we get into the details of the algorithm, let's look at a special data structure called a **planning graph**, which is used to give better heuristic estimates and plays a key role in the GraphPlan algorithm." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Planning Graph\n", "A planning graph is a directed graph organized into levels. \n", "Each level contains information about the current state of the knowledge base and the possible state-action links to and from that level.\n", "The first level contains the initial state with nodes representing each fluent that holds in that level.\n", "This level has state-action links linking each state to valid actions in that state.\n", "Each action is linked to all its preconditions and its effect states.\n", "Based on these effects, the next level is constructed.\n", "The next level contains similarly structured information about the next state.\n", "In this way, the graph is expanded using state-action links until we reach a level where all the required goals hold true simultaneously.\n", "We can say that we have reached our goal if all the goals are present in the current level and no two of them are mutually exclusive.\n", "This will be explained in detail later.\n", "
\n", "Planning graphs only work for propositional planning problems, hence we need to eliminate all variables by generating all possible substitutions.\n", "
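To make the substitution step concrete, here is a minimal sketch that grounds a single action schema over a set of objects. The helper `ground_action` and the `(name, args)` tuple representation are illustrative assumptions and not part of the planning module. Like the module's `build` method, it relies on `itertools.permutations`, so an object is never repeated within one ground instance.

```python
from itertools import permutations


def ground_action(name, params, objects):
    '''Return every ground instance of an action schema as (name, args) tuples.'''
    return [(name, args) for args in permutations(objects, len(params))]


# Hypothetical example: ground Go(x, y) over the three locations of the shopping problem.
grounded = ground_action('Go', ('x', 'y'), ['Home', 'SM', 'HW'])
for name, args in grounded:
    print(name + '(' + ', '.join(args) + ')')
```

Three objects and two parameters give six ground actions, which hints at how quickly the propositional version of a problem can grow.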
\n", "For example, the planning graph of the `have_cake_and_eat_cake_too` problem might look like this\n", "![title](images/cake_graph.jpg)\n", "
\n", "The black lines indicate links between states and actions.\n", "
\n", "In every planning problem, we are allowed to carry out the `no-op` action, i.e., we can choose no action for a particular state.\n", "These are called 'Persistence' actions and are represented in the graph by the small square boxes.\n", "In technical terms, a persistence action has the same effects as its preconditions.\n", "This enables us to carry a state to the next level.\n", "
\n", "
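A persistence action can be sketched in a few lines. The dictionary layout and the helper name `persistence_action` below are assumptions made for illustration. Only the naming convention (the fluent's name prefixed with a P) follows the module.

```python
def persistence_action(fluent):
    '''Build a no-op for a fluent: its only precondition and only effect is the fluent itself.'''
    return {'name': 'P' + fluent, 'precond': [fluent], 'effect': [fluent]}


noop = persistence_action('Have(Cake)')
print(noop['name'], noop['precond'], noop['effect'])
```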
\n", "The gray lines indicate mutual exclusivity.\n", "This means that the actions connected by a gray line cannot be taken together.\n", "Mutual exclusivity (mutex) occurs in the following cases:\n", "1. **Inconsistent effects**: One action negates the effect of the other. For example, _Eat(Cake)_ and the persistence of _Have(Cake)_ have inconsistent effects because they disagree on the effect _Have(Cake)_\n", "2. **Interference**: One of the effects of an action is the negation of a precondition of the other. For example, _Eat(Cake)_ interferes with the persistence of _Have(Cake)_ by negating its precondition.\n", "3. **Competing needs**: One of the preconditions of one action is mutually exclusive with a precondition of the other. For example, _Bake(Cake)_ and _Eat(Cake)_ are mutex because they compete on the value of the _Have(Cake)_ precondition." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In the module, planning graphs have been implemented using two classes, `Level` which stores data for a particular level and `Graph` which connects multiple levels together.\n", "Let's look at the `Level` class." ] }, { "cell_type": "code", "execution_count": 40, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "\n", " \n", " \n", " \n", "\n", "\n", "

\n", "\n", "
class Level:\n",
       "    """\n",
       "    Contains the state of the planning problem\n",
       "    and exhaustive list of actions which use the\n",
       "    states as pre-condition.\n",
       "    """\n",
       "\n",
       "    def __init__(self, kb):\n",
       "        """Initializes variables to hold state and action details of a level"""\n",
       "\n",
       "        self.kb = kb\n",
       "        # current state\n",
       "        self.current_state = kb.clauses\n",
       "        # current action to state link\n",
       "        self.current_action_links = {}\n",
       "        # current state to action link\n",
       "        self.current_state_links = {}\n",
       "        # current action to next state link\n",
       "        self.next_action_links = {}\n",
       "        # next state to current action link\n",
       "        self.next_state_links = {}\n",
       "        # mutually exclusive actions\n",
       "        self.mutex = []\n",
       "\n",
       "    def __call__(self, actions, objects):\n",
       "        self.build(actions, objects)\n",
       "        self.find_mutex()\n",
       "\n",
       "    def separate(self, e):\n",
       "        """Separates an iterable of elements into positive and negative parts"""\n",
       "\n",
       "        positive = []\n",
       "        negative = []\n",
       "        for clause in e:\n",
       "            if clause.op[:3] == 'Not':\n",
       "                negative.append(clause)\n",
       "            else:\n",
       "                positive.append(clause)\n",
       "        return positive, negative\n",
       "\n",
       "    def find_mutex(self):\n",
       "        """Finds mutually exclusive actions"""\n",
       "\n",
       "        # Inconsistent effects\n",
       "        pos_nsl, neg_nsl = self.separate(self.next_state_links)\n",
       "\n",
       "        for negeff in neg_nsl:\n",
       "            new_negeff = Expr(negeff.op[3:], *negeff.args)\n",
       "            for poseff in pos_nsl:\n",
       "                if new_negeff == poseff:\n",
       "                    for a in self.next_state_links[poseff]:\n",
       "                        for b in self.next_state_links[negeff]:\n",
       "                            if {a, b} not in self.mutex:\n",
       "                                self.mutex.append({a, b})\n",
       "\n",
       "        # Interference will be calculated with the last step\n",
       "        pos_csl, neg_csl = self.separate(self.current_state_links)\n",
       "\n",
       "        # Competing needs\n",
       "        for posprecond in pos_csl:\n",
       "            for negprecond in neg_csl:\n",
       "                new_negprecond = Expr(negprecond.op[3:], *negprecond.args)\n",
       "                if new_negprecond == posprecond:\n",
       "                    for a in self.current_state_links[posprecond]:\n",
       "                        for b in self.current_state_links[negprecond]:\n",
       "                            if {a, b} not in self.mutex:\n",
       "                                self.mutex.append({a, b})\n",
       "\n",
       "        # Inconsistent support\n",
       "        state_mutex = []\n",
       "        for pair in self.mutex:\n",
       "            next_state_0 = self.next_action_links[list(pair)[0]]\n",
       "            if len(pair) == 2:\n",
       "                next_state_1 = self.next_action_links[list(pair)[1]]\n",
       "            else:\n",
       "                next_state_1 = self.next_action_links[list(pair)[0]]\n",
       "            if (len(next_state_0) == 1) and (len(next_state_1) == 1):\n",
       "                state_mutex.append({next_state_0[0], next_state_1[0]})\n",
       "        \n",
       "        self.mutex = self.mutex + state_mutex\n",
       "\n",
       "    def build(self, actions, objects):\n",
       "        """Populates the lists and dictionaries containing the state action dependencies"""\n",
       "\n",
       "        for clause in self.current_state:\n",
       "            p_expr = Expr('P' + clause.op, *clause.args)\n",
       "            self.current_action_links[p_expr] = [clause]\n",
       "            self.next_action_links[p_expr] = [clause]\n",
       "            self.current_state_links[clause] = [p_expr]\n",
       "            self.next_state_links[clause] = [p_expr]\n",
       "\n",
       "        for a in actions:\n",
       "            num_args = len(a.args)\n",
       "            possible_args = tuple(itertools.permutations(objects, num_args))\n",
       "\n",
       "            for arg in possible_args:\n",
       "                if a.check_precond(self.kb, arg):\n",
       "                    for num, symbol in enumerate(a.args):\n",
       "                        if not symbol.op.islower():\n",
       "                            arg = list(arg)\n",
       "                            arg[num] = symbol\n",
       "                            arg = tuple(arg)\n",
       "\n",
       "                    new_action = a.substitute(Expr(a.name, *a.args), arg)\n",
       "                    self.current_action_links[new_action] = []\n",
       "\n",
       "                    for clause in a.precond:\n",
       "                        new_clause = a.substitute(clause, arg)\n",
       "                        self.current_action_links[new_action].append(new_clause)\n",
       "                        if new_clause in self.current_state_links:\n",
       "                            self.current_state_links[new_clause].append(new_action)\n",
       "                        else:\n",
       "                            self.current_state_links[new_clause] = [new_action]\n",
       "                   \n",
       "                    self.next_action_links[new_action] = []\n",
       "                    for clause in a.effect:\n",
       "                        new_clause = a.substitute(clause, arg)\n",
       "\n",
       "                        self.next_action_links[new_action].append(new_clause)\n",
       "                        if new_clause in self.next_state_links:\n",
       "                            self.next_state_links[new_clause].append(new_action)\n",
       "                        else:\n",
       "                            self.next_state_links[new_clause] = [new_action]\n",
       "\n",
       "    def perform_actions(self):\n",
       "        """Performs the necessary actions and returns a new Level"""\n",
       "\n",
       "        new_kb = FolKB(list(set(self.next_state_links.keys())))\n",
       "        return Level(new_kb)\n",
       "
\n", "\n", "\n" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "psource(Level)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Each level stores the following data:\n", "1. The current state of the level in `current_state`\n", "2. Links from an action to its preconditions in `current_action_links`\n", "3. Links from a state to the possible actions in that state in `current_state_links`\n", "4. Links from each action to its effects in `next_action_links`\n", "5. Links from each possible next state to the actions that produce it in `next_state_links`. The keys of this dictionary form the state of the next level.\n", "6. Mutex links in `mutex`.\n", "
\n", "
\n", "The `find_mutex` method finds the mutex links according to the points given above.\n", "
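As a rough illustration of the first case (inconsistent effects), the sketch below works on plain strings instead of the module's `Expr` objects. The function `inconsistent_effects`, the `~` prefix for negation and the sample `level_actions` dictionary are all assumptions chosen for the example.

```python
from itertools import combinations


def inconsistent_effects(actions):
    '''Return pairs of action names where one adds a fluent that the other deletes.

    actions maps an action name to a set of effect literals; a leading ~ marks negation.
    '''
    def negate(literal):
        return literal[1:] if literal.startswith('~') else '~' + literal

    mutex = []
    for a, b in combinations(sorted(actions), 2):
        # Negation is its own inverse, so checking one direction covers both.
        if any(negate(effect) in actions[b] for effect in actions[a]):
            mutex.append({a, b})
    return mutex


level_actions = {
    'Eat(Cake)': {'Eaten(Cake)', '~Have(Cake)'},
    'PHave(Cake)': {'Have(Cake)'},
    'PNotEaten(Cake)': {'~Eaten(Cake)'},
}
pairs = inconsistent_effects(level_actions)
print(pairs)
```

Here `Eat(Cake)` ends up mutex with both persistence actions, while the two persistence actions remain compatible with each other.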
\n", "The `build` method populates the data structures storing the state and action information.\n", "Persistence actions for each clause in the current state are also defined here. \n", "The newly created persistence action has the same name as its state, prefixed with a 'P'." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's now look at the `Graph` class." ] }, { "cell_type": "code", "execution_count": 41, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "\n", " \n", " \n", " \n", "\n", "\n", "

\n", "\n", "
class Graph:\n",
       "    """\n",
       "    Contains levels of state and actions\n",
       "    Used in graph planning algorithm to extract a solution\n",
       "    """\n",
       "\n",
       "    def __init__(self, pddl):\n",
       "        self.pddl = pddl\n",
       "        self.kb = FolKB(pddl.init)\n",
       "        self.levels = [Level(self.kb)]\n",
       "        self.objects = set(arg for clause in self.kb.clauses for arg in clause.args)\n",
       "\n",
       "    def __call__(self):\n",
       "        self.expand_graph()\n",
       "\n",
       "    def expand_graph(self):\n",
       "        """Expands the graph by a level"""\n",
       "\n",
       "        last_level = self.levels[-1]\n",
       "        last_level(self.pddl.actions, self.objects)\n",
       "        self.levels.append(last_level.perform_actions())\n",
       "\n",
       "    def non_mutex_goals(self, goals, index):\n",
       "        """Checks whether the goals are mutually exclusive"""\n",
       "\n",
       "        goal_perm = itertools.combinations(goals, 2)\n",
       "        for g in goal_perm:\n",
       "            if set(g) in self.levels[index].mutex:\n",
       "                return False\n",
       "        return True\n",
       "
\n", "\n", "\n" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "psource(Graph)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The class stores a problem definition in `pddl`, \n", "a knowledge base in `kb`, \n", "a list of `Level` objects in `levels` and \n", "all the possible arguments found in the initial state of the problem in `objects`.\n", "
\n", "The `expand_graph` method generates a new level of the graph.\n", "This method is invoked when the goal conditions haven't been met in the current level or the actions that lead to it are mutually exclusive.\n", "The `non_mutex_goals` method checks whether the goals in the current state are mutually exclusive.\n", "
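The pairwise goal check can be tried in isolation. This is a minimal sketch on plain strings. The function name `goals_are_non_mutex` and the sample mutex list are assumptions, and the logic mirrors the `itertools.combinations` loop used by `non_mutex_goals`.

```python
from itertools import combinations


def goals_are_non_mutex(goals, mutex):
    '''Return True when no pair of goals appears in the list of mutex sets.'''
    return all(set(pair) not in mutex for pair in combinations(goals, 2))


# Hypothetical mutex list for a level where the two cake goals still conflict.
mutex = [{'Have(Cake)', 'Eaten(Cake)'}]
print(goals_are_non_mutex(['Have(Cake)', 'Eaten(Cake)'], mutex))
print(goals_are_non_mutex(['Have(Cake)', 'Eaten(Cake)'], []))
```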
\n", "
\n", "Using these two classes, we can define a planning graph which can either be used to provide reliable heuristics for planning problems or used in the `GraphPlan` algorithm.\n", "
\n", "Let's have a look at the `GraphPlan` class." ] }, { "cell_type": "code", "execution_count": 42, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "\n", " \n", " \n", " \n", "\n", "\n", "

\n", "\n", "
class GraphPlan:\n",
       "    """\n",
       "    Class for formulation GraphPlan algorithm\n",
       "    Constructs a graph of state and action space\n",
       "    Returns solution for the planning problem\n",
       "    """\n",
       "\n",
       "    def __init__(self, pddl):\n",
       "        self.graph = Graph(pddl)\n",
       "        self.nogoods = []\n",
       "        self.solution = []\n",
       "\n",
       "    def check_leveloff(self):\n",
       "        """Checks if the graph has levelled off"""\n",
       "\n",
       "        check = (set(self.graph.levels[-1].current_state) == set(self.graph.levels[-2].current_state))\n",
       "\n",
       "        if check:\n",
       "            return True\n",
       "\n",
       "    def extract_solution(self, goals, index):\n",
       "        """Extracts the solution"""\n",
       "\n",
       "        level = self.graph.levels[index]    \n",
       "        if not self.graph.non_mutex_goals(goals, index):\n",
       "            self.nogoods.append((level, goals))\n",
       "            return\n",
       "\n",
       "        level = self.graph.levels[index - 1]    \n",
       "\n",
       "        # Create all combinations of actions that satisfy the goal    \n",
       "        actions = []\n",
       "        for goal in goals:\n",
       "            actions.append(level.next_state_links[goal])    \n",
       "\n",
       "        all_actions = list(itertools.product(*actions))    \n",
       "\n",
       "        # Filter out non-mutex actions\n",
       "        non_mutex_actions = []    \n",
       "        for action_tuple in all_actions:\n",
       "            action_pairs = itertools.combinations(list(set(action_tuple)), 2)        \n",
       "            non_mutex_actions.append(list(set(action_tuple)))        \n",
       "            for pair in action_pairs:            \n",
       "                if set(pair) in level.mutex:\n",
       "                    non_mutex_actions.pop(-1)\n",
       "                    break\n",
       "    \n",
       "\n",
       "        # Recursion\n",
       "        for action_list in non_mutex_actions:        \n",
       "            if [action_list, index] not in self.solution:\n",
       "                self.solution.append([action_list, index])\n",
       "\n",
       "                new_goals = []\n",
       "                for act in set(action_list):                \n",
       "                    if act in level.current_action_links:\n",
       "                        new_goals = new_goals + level.current_action_links[act]\n",
       "\n",
       "                if abs(index) + 1 == len(self.graph.levels):\n",
       "                    return\n",
       "                elif (level, new_goals) in self.nogoods:\n",
       "                    return\n",
       "                else:\n",
       "                    self.extract_solution(new_goals, index - 1)\n",
       "\n",
       "        # Level-Order multiple solutions\n",
       "        solution = []\n",
       "        for item in self.solution:\n",
       "            if item[1] == -1:\n",
       "                solution.append([])\n",
       "                solution[-1].append(item[0])\n",
       "            else:\n",
       "                solution[-1].append(item[0])\n",
       "\n",
       "        for num, item in enumerate(solution):\n",
       "            item.reverse()\n",
       "            solution[num] = item\n",
       "\n",
       "        return solution\n",
       "
\n", "\n", "\n" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "psource(GraphPlan)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Given a planning problem defined as a PDDL, `GraphPlan` creates a planning graph stored in `graph` and expands it till it reaches a state where all its required goals are present simultaneously without mutual exclusivity.\n", "
\n", "Once a goal is found, `extract_solution` is called.\n", "This method recursively finds the path to a solution given a planning graph.\n", "In the case where `extract_solution` fails to find a solution for a set of goals at a given level, we record the `(level, goals)` pair as a **no-good**.\n", "Whenever `extract_solution` is called again with the same level and goals, we can find the recorded no-good and immediately return failure rather than searching again. \n", "No-goods are also used in the termination test.\n", "
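The no-good idea is ordinary memoization of failures. The sketch below is an illustration under assumptions: the wrapper `extract`, the `search` callback and the use of a `frozenset` key are hypothetical and differ from the list-based bookkeeping in the module.

```python
nogoods = set()


def extract(level, goals, search):
    '''Run search(level, goals) unless this pair is already a recorded failure.'''
    key = (level, frozenset(goals))
    if key in nogoods:
        return None  # known dead end, skip the search entirely
    result = search(level, goals)
    if result is None:
        nogoods.add(key)
    return result


calls = []


def failing_search(level, goals):
    '''Stand-in search that always fails and records that it was called.'''
    calls.append((level, tuple(goals)))
    return None


extract(2, ['Have(Cake)', 'Eaten(Cake)'], failing_search)
extract(2, ['Eaten(Cake)', 'Have(Cake)'], failing_search)
print(len(calls))
```

The second call asks for the same goals in a different order, and the search function is not invoked again because the failure was already recorded.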
\n", "The `check_leveloff` method checks if the planning graph for the problem has **levelled-off**, i.e., it has the same states, actions and mutex pairs as the previous level (the implementation shown above compares only the states of the last two levels).\n", "If the graph has already levelled off and we haven't found a solution, there is no point expanding the graph, as it won't lead to anything new.\n", "In such a case, we can declare that the planning problem is unsolvable with the given constraints.\n", "
\n", "
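A state-only version of the level-off test fits in one expression. This sketch assumes each level is simply an iterable of fluent strings, and the name `has_levelled_off` is made up for the example.

```python
def has_levelled_off(levels):
    '''Return True when the last two levels contain exactly the same fluents.'''
    return len(levels) >= 2 and set(levels[-1]) == set(levels[-2])


growing = [['Have(Cake)'], ['Have(Cake)', 'Eaten(Cake)']]
stable = growing + [['Eaten(Cake)', 'Have(Cake)']]
print(has_levelled_off(growing), has_levelled_off(stable))
```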
\n", "To summarize, the `GraphPlan` algorithm calls `expand_graph` and tests whether the goals have been reached and are non-mutex.\n", "
\n", "If so, `extract_solution` is invoked which recursively reconstructs the solution from the planning graph.\n", "
\n", "If not, then we check if our graph has levelled off and continue if it hasn't." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's solve a few planning problems that we had defined earlier." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Air cargo problem:\n", "
\n", "In accordance with the summary above, we have defined a helper function to carry out `GraphPlan` on the `air_cargo` problem.\n", "The function is pretty straightforward.\n", "Let's have a look." ] }, { "cell_type": "code", "execution_count": 43, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "\n", " \n", " \n", " \n", "\n", "\n", "

\n", "\n", "
def air_cargo_graphplan():\n",
       "    """Solves the air cargo problem using GraphPlan"""\n",
       "\n",
       "    pddl = air_cargo()\n",
       "    graphplan = GraphPlan(pddl)\n",
       "\n",
       "    def goal_test(kb, goals):\n",
       "        return all(kb.ask(q) is not False for q in goals)\n",
       "\n",
       "    goals = expr('At(C1, JFK), At(C2, SFO)')\n",
       "\n",
       "    while True:\n",
       "        if (goal_test(graphplan.graph.levels[-1].kb, goals) and graphplan.graph.non_mutex_goals(goals, -1)):\n",
       "            solution = graphplan.extract_solution(goals, -1)\n",
       "            if solution:\n",
       "                return solution\n",
       "\n",
       "        graphplan.graph.expand_graph()\n",
       "        if len(graphplan.graph.levels) >= 2 and graphplan.check_leveloff():\n",
       "            return None\n",
       "
\n", "\n", "\n" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "psource(air_cargo_graphplan)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's instantiate the problem and find a solution using this helper function." ] }, { "cell_type": "code", "execution_count": 44, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[[[PCargo(C2),\n", " Load(C2, P2, JFK),\n", " PPlane(P2),\n", " Load(C1, P1, SFO),\n", " Fly(P1, SFO, JFK),\n", " PAirport(SFO),\n", " PAirport(JFK),\n", " PPlane(P1),\n", " PCargo(C1),\n", " Fly(P2, JFK, SFO)],\n", " [Unload(C2, P2, SFO), Unload(C1, P1, JFK)]]]" ] }, "execution_count": 44, "metadata": {}, "output_type": "execute_result" } ], "source": [ "air_cargo = air_cargo_graphplan()\n", "air_cargo" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Each element in the solution is a valid action.\n", "The solution is separated into lists for each level.\n", "The actions prefixed with a 'P' are persistence actions and can be ignored.\n", "They simply carry certain states forward.\n", "We have another helper function `linearize` that presents the solution in a more readable format, much like a total-order planner." ] }, { "cell_type": "code", "execution_count": 45, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Load(C2, P2, JFK),\n", " Load(C1, P1, SFO),\n", " Fly(P1, SFO, JFK),\n", " Fly(P2, JFK, SFO),\n", " Unload(C2, P2, SFO),\n", " Unload(C1, P1, JFK)]" ] }, "execution_count": 45, "metadata": {}, "output_type": "execute_result" } ], "source": [ "linearize(air_cargo)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Indeed, this is a correct solution.\n", "
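Dropping the persistence actions and flattening the levels can be sketched as follows. This is not the module's `linearize`. The function `flatten_plan` and the `(name, args)` tuples are assumptions, as is the rule used to recognise a persistence action: a name starting with P and exactly one argument. A bare prefix test would wrongly discard real actions such as `PutOn(Spare, Axle)`.

```python
def flatten_plan(levels):
    '''Flatten a level-ordered plan into one list, dropping persistence steps.

    Each step is a (name, args) tuple. A step counts as a persistence action
    when its name starts with P and it has exactly one argument (an assumption).
    '''
    return [step for level in levels for step in level
            if not (step[0].startswith('P') and len(step[1]) == 1)]


plan = [
    [('PCargo', ('C2',)), ('Load', ('C2', 'P2', 'JFK')), ('Fly', ('P2', 'JFK', 'SFO'))],
    [('Unload', ('C2', 'P2', 'SFO'))],
]
print(flatten_plan(plan))
```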
\n", "There are similar helper functions for some other planning problems.\n", "
\n", "Let's try solving the spare tire problem." ] }, { "cell_type": "code", "execution_count": 46, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Remove(Flat, Axle), Remove(Spare, Trunk), PutOn(Spare, Axle)]" ] }, "execution_count": 46, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spare_tire = spare_tire_graphplan()\n", "linearize(spare_tire)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Solution for the cake problem:" ] }, { "cell_type": "code", "execution_count": 47, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Eat(Cake), Bake(Cake)]" ] }, "execution_count": 47, "metadata": {}, "output_type": "execute_result" } ], "source": [ "cake_problem = have_cake_and_eat_cake_too_graphplan()\n", "linearize(cake_problem)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Solution for the Sussman anomaly configuration of three blocks:" ] }, { "cell_type": "code", "execution_count": 48, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[MoveToTable(C, A), Move(B, Table, C), Move(A, Table, B)]" ] }, "execution_count": 48, "metadata": {}, "output_type": "execute_result" } ], "source": [ "sussman_anomaly = three_block_tower_graphplan()\n", "linearize(sussman_anomaly)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.1" } }, "nbformat": 4, "nbformat_minor": 1 }