Newer
Older
from utils import Expr, expr, first
from logic import FolKB
PDLL used to define a search problem.
It stores states in a knowledge base consisting of first order logic statements.
The conjunction of these logical statements completely defines a state.
def __init__(self, initial_state, actions, goal_test):
self.kb = FolKB(initial_state)
self.actions = actions
self.goal_test_func = goal_test
def goal_test(self):
return self.goal_test_func(self.kb)
def act(self, action):
"""
Note that action is an Expr like expr('Remove(Glass, Table)') or expr('Eat(Sandwich)')
"""
action_name = action.op
args = action.args
list_action = first(a for a in self.actions if a.name == action_name)
if list_action is None:
raise Exception("Action '{}' not found".format(action_name))
if not list_action.check_precond(self.kb, args):
raise Exception("Action '{}' pre-conditions not satisfied".format(action))
list_action(self.kb, args)
Defines an action schema using preconditions and effects.
Use this to describe actions in PDDL.
action is an Expr where variables are given as arguments(args).
Precondition and effect are both lists with positive and negated literals.
Example:
precond_pos = [expr("Human(person)"), expr("Hungry(Person)")]
precond_neg = [expr("Eaten(food)")]
effect_add = [expr("Eaten(food)")]
effect_rem = [expr("Hungry(person)")]
eat = Action(expr("Eat(person, food)"), [precond_pos, precond_neg], [effect_add, effect_rem])
"""
def __init__(self, action, precond, effect):
self.name = action.op
self.args = action.args
self.precond_pos = precond[0]
self.precond_neg = precond[1]
self.effect_add = effect[0]
self.effect_rem = effect[1]
def __call__(self, kb, args):
return self.act(kb, args)
def substitute(self, e, args):
"""Replaces variables in expression with their respective Propositional symbol"""
new_args = list(e.args)
for num, x in enumerate(e.args):
for i in range(len(self.args)):
if self.args[i] == x:
new_args[num] = args[i]
return Expr(e.op, *new_args)
def check_precond(self, kb, args):
"""Checks if the precondition is satisfied in the current state"""
# check for positive clauses
for clause in self.precond_pos:
if self.substitute(clause, args) not in kb.clauses:
return False
# check for negative clauses
for clause in self.precond_neg:
if self.substitute(clause, args) in kb.clauses:
return False
return True
def act(self, kb, args):
"""Executes the action on the state's kb"""
# check if the preconditions are satisfied
if not self.check_precond(kb, args):
raise Exception("Action pre-conditions not satisfied")
# remove negative literals
for clause in self.effect_rem:
kb.retract(self.substitute(clause, args))
for clause in self.effect_add:
kb.tell(self.substitute(clause, args))
def air_cargo():
init = [expr('At(C1, SFO)'),
expr('At(C2, JFK)'),
expr('At(P1, SFO)'),
expr('At(P2, JFK)'),
expr('Cargo(C1)'),
expr('Cargo(C2)'),
expr('Plane(P1)'),
expr('Plane(P2)'),
expr('Airport(JFK)'),
expr('Airport(SFO)')]
def goal_test(kb):
required = [expr('At(C1 , JFK)'), expr('At(C2 ,SFO)')]
for q in required:
if kb.ask(q) is False:
return False
return True
precond_pos = [expr("At(c, a)"), expr("At(p, a)"), expr("Cargo(c)"), expr("Plane(p)"),
expr("Airport(a)")]
precond_neg = []
effect_add = [expr("In(c, p)")]
effect_rem = [expr("At(c, a)")]
load = Action(expr("Load(c, p, a)"), [precond_pos, precond_neg], [effect_add, effect_rem])
# Unload
precond_pos = [expr("In(c, p)"), expr("At(p, a)"), expr("Cargo(c)"), expr("Plane(p)"),
expr("Airport(a)")]
precond_neg = []
effect_add = [expr("At(c, a)")]
effect_rem = [expr("In(c, p)")]
unload = Action(expr("Unload(c, p, a)"), [precond_pos, precond_neg], [effect_add, effect_rem])
# Fly
# Used 'f' instead of 'from' because 'from' is a python keyword and expr uses eval() function
precond_pos = [expr("At(p, f)"), expr("Plane(p)"), expr("Airport(f)"), expr("Airport(to)")]
precond_neg = []
effect_add = [expr("At(p, to)")]
effect_rem = [expr("At(p, f)")]
fly = Action(expr("Fly(p, f, to)"), [precond_pos, precond_neg], [effect_add, effect_rem])
return PDLL(init, [load, unload, fly], goal_test)
def spare_tire():
init = [expr('Tire(Flat)'),
expr('Tire(Spare)'),
expr('At(Flat, Axle)'),
expr('At(Spare, Trunk)')]
def goal_test(kb):
required = [expr('At(Spare, Axle)'), expr('At(Flat, Ground)')]
for q in required:
if kb.ask(q) is False:
return False
return True
precond_pos = [expr("At(obj, loc)")]
precond_neg = []
effect_add = [expr("At(obj, Ground)")]
effect_rem = [expr("At(obj, loc)")]
remove = Action(expr("Remove(obj, loc)"), [precond_pos, precond_neg], [effect_add, effect_rem])
precond_pos = [expr("Tire(t)"), expr("At(t, Ground)")]
precond_neg = [expr("At(Flat, Axle)")]
effect_add = [expr("At(t, Axle)")]
effect_rem = [expr("At(t, Ground)")]
put_on = Action(expr("PutOn(t, Axle)"), [precond_pos, precond_neg], [effect_add, effect_rem])
precond_pos = []
precond_neg = []
effect_add = []
effect_rem = [expr("At(Spare, Ground)"), expr("At(Spare, Axle)"), expr("At(Spare, Trunk)"),
expr("At(Flat, Ground)"), expr("At(Flat, Axle)"), expr("At(Flat, Trunk)")]
leave_overnight = Action(expr("LeaveOvernight"), [precond_pos, precond_neg],
[effect_add, effect_rem])
return PDLL(init, [remove, put_on, leave_overnight], goal_test)
opensourceware
a validé
def three_block_tower():
init = [expr('On(A, Table)'),
expr('On(B, Table)'),
expr('On(C, A)'),
expr('Block(A)'),
expr('Block(B)'),
expr('Block(C)'),
expr('Clear(B)'),
expr('Clear(C)')]
def goal_test(kb):
required = [expr('On(A, B)'), expr('On(B, C)')]
for q in required:
if kb.ask(q) is False:
return False
return True
precond_pos = [expr('On(b, x)'), expr('Clear(b)'), expr('Clear(y)'), expr('Block(b)'),
expr('Block(y)')]
precond_neg = []
effect_add = [expr('On(b, y)'), expr('Clear(x)')]
effect_rem = [expr('On(b, x)'), expr('Clear(y)')]
move = Action(expr('Move(b, x, y)'), [precond_pos, precond_neg], [effect_add, effect_rem])
opensourceware
a validé
precond_pos = [expr('On(b, x)'), expr('Clear(b)'), expr('Block(b)')]
precond_neg = []
effect_add = [expr('On(b, Table)'), expr('Clear(x)')]
effect_rem = [expr('On(b, x)')]
moveToTable = Action(expr('MoveToTable(b, x)'), [precond_pos, precond_neg],
[effect_add, effect_rem])
return PDLL(init, [move, moveToTable], goal_test)
opensourceware
a validé
opensourceware
a validé
def have_cake_and_eat_cake_too():
init = [expr('Have(Cake)')]
def goal_test(kb):
required = [expr('Have(Cake)'), expr('Eaten(Cake)')]
for q in required:
if kb.ask(q) is False:
return False
return True
opensourceware
a validé
# Eat cake
precond_pos = [expr('Have(Cake)')]
precond_neg = []
effect_add = [expr('Eaten(Cake)')]
effect_rem = [expr('Have(Cake)')]
eat_cake = Action(expr('Eat(Cake)'), [precond_pos, precond_neg], [effect_add, effect_rem])
opensourceware
a validé
precond_pos = []
precond_neg = [expr('Have(Cake)')]
effect_add = [expr('Have(Cake)')]
effect_rem = []
bake_cake = Action(expr('Bake(Cake)'), [precond_pos, precond_neg], [effect_add, effect_rem])
return PDLL(init, [eat_cake, bake_cake], goal_test)
class Level():
"""
Contains the state of the planning problem
and exhaustive list of actions which use the
states as pre-condition.
"""
def __init__(self, poskb, negkb):
self.poskb = poskb
self.current_state_pos = poskb.clauses
self.current_state_neg = negkb.clauses
self.current_action_links_pos = {}
self.current_action_links_neg = {}
self.current_state_links_pos = {}
self.current_state_links_neg = {}
self.next_state_links_pos = {}
self.next_state_links_neg = {}
self.mutex = []
def __call__(self, actions, objects):
self.build(actions, objects)
self.find_mutex()
def find_mutex(self):
for poseff in self.next_state_links_pos:
negeff = poseff
if negeff in self.next_state_links_neg:
for a in self.next_state_links_pos[poseff]:
for b in self.next_state_links_neg[negeff]:
if set([a, b]) not in self.mutex:
self.mutex.append(set([a, b]))
for posprecond in self.current_state_links_pos:
negeff = posprecond
if negeff in self.next_state_links_neg:
for a in self.current_state_links_pos[posprecond]:
for b in self.next_state_links_neg[negeff]:
if set([a, b]) not in self.mutex:
self.mutex.append(set([a, b]))
for negprecond in self.current_state_links_neg:
poseff = negprecond
if poseff in self.next_state_links_pos:
for a in self.next_state_links_pos[poseff]:
for b in self.current_state_links_neg[negprecond]:
if set([a, b]) not in self.mutex:
self.mutex.append(set([a, b]))
for posprecond in self.current_state_links_pos:
negprecond = posprecond
if negprecond in self.current_state_links_neg:
for a in self.current_state_links_pos[posprecond]:
for b in self.current_state_links_neg[negprecond]:
if set([a, b]) not in self.mutex:
self.mutex.append(set([a, b]))
state_mutex = []
for pair in self.mutex:
next_state_0 = self.next_action_links[list(pair)[0]]
if len(pair) == 2:
next_state_1 = self.next_action_links[list(pair)[1]]
else:
next_state_1 = self.next_action_links[list(pair)[0]]
if (len(next_state_0) == 1) and (len(next_state_1) == 1):
state_mutex.append(set([next_state_0[0], next_state_1[0]]))
self.mutex = self.mutex+state_mutex
def build(self, actions, objects):
# Add persistence actions for positive states
for clause in self.current_state_pos:
self.current_action_links_pos[Expr('Persistence', clause)] = [clause]
self.next_action_links[Expr('Persistence', clause)] = [clause]
self.current_state_links_pos[clause] = [Expr('Persistence', clause)]
self.next_state_links_pos[clause] = [Expr('Persistence', clause)]
# Add persistence actions for negative states
not_expr = Expr('not'+clause.op, clause.args)
self.current_action_links_neg[Expr('Persistence', not_expr)] = [clause]
self.next_action_links[Expr('Persistence', not_expr)] = [clause]
self.current_state_links_neg[clause] = [Expr('Persistence', not_expr)]
self.next_state_links_neg[clause] = [Expr('Persistence', not_expr)]
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
for a in actions:
num_args = len(a.args)
possible_args = tuple(itertools.permutations(objects, num_args))
for arg in possible_args:
if a.check_precond(self.poskb, arg):
for num, symbol in enumerate(a.args):
if not symbol.op.islower():
arg = list(arg)
arg[num] = symbol
arg = tuple(arg)
new_action = a.substitute(Expr(a.name, *a.args), arg)
self.current_action_links_pos[new_action] = []
self.current_action_links_neg[new_action] = []
for clause in a.precond_pos:
new_clause = a.substitute(clause, arg)
self.current_action_links_pos[new_action].append(new_clause)
if new_clause in self.current_state_links_pos:
self.current_state_links_pos[new_clause].append(new_action)
else:
self.current_state_links_pos[new_clause] = [new_action]
for clause in a.precond_neg:
new_clause = a.substitute(clause, arg)
self.current_action_links_neg[new_action].append(new_clause)
if new_clause in self.current_state_links_neg:
self.current_state_links_neg[new_clause].append(new_action)
else:
self.current_state_links_neg[new_clause] = [new_action]
self.next_action_links[new_action] = []
for clause in a.effect_add:
new_clause = a.substitute(clause, arg)
self.next_action_links[new_action].append(new_clause)
if new_clause in self.next_state_links_pos:
self.next_state_links_pos[new_clause].append(new_action)
else:
self.next_state_links_pos[new_clause] = [new_action]
for clause in a.effect_rem:
new_clause = a.substitute(clause, arg)
self.next_action_links[new_action].append(new_clause)
if new_clause in self.next_state_links_neg:
self.next_state_links_neg[new_clause].append(new_action)
else:
self.next_state_links_neg[new_clause] = [new_action]
def perform_actions(self):
new_kb_pos = FolKB(list(set(self.next_state_links_pos.keys())))
new_kb_neg = FolKB(list(set(self.next_state_links_neg.keys())))
return Level(new_kb_pos, new_kb_neg)
class Graph:
"""
Contains levels of state and actions
Used in graph planning algorithm to extract a solution
"""
def __init__(self, pdll, negkb):
self.pdll = pdll
self.levels = [Level(pdll.kb, negkb)]
self.objects = set(arg for clause in pdll.kb.clauses + negkb.clauses for arg in clause.args)
def __call__(self):
self.expand_graph()
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
def expand_graph(self):
last_level = self.levels[-1]
last_level(self.pdll.actions, self.objects)
self.levels.append(last_level.perform_actions())
def non_mutex_goals(self, goals, index):
goal_perm = itertools.combinations(goals, 2)
for g in goal_perm:
if set(g) in self.levels[index].mutex:
return False
return True
class GraphPlan:
"""
Class for formulation GraphPlan algorithm
Constructs a graph of state and action space
Returns solution for the planning problem
"""
def __init__(self, pdll, negkb):
self.graph = Graph(pdll, negkb)
self.nogoods = []
self.solution = []
def check_leveloff(self):
first_check = (set(self.graph.levels[-1].current_state_pos) ==
set(self.graph.levels[-2].current_state_pos))
second_check = (set(self.graph.levels[-1].current_state_neg) ==
set(self.graph.levels[-2].current_state_neg))
if first_check and second_check:
return True
def extract_solution(self, goals_pos, goals_neg, index):
level = self.graph.levels[index]
if not self.graph.non_mutex_goals(goals_pos+goals_neg, index):
self.nogoods.append((level, goals_pos, goals_neg))
return
level = self.graph.levels[index-1]
# Create all combinations of actions that satisfy the goal
actions = []
for goal in goals_pos:
actions.append(level.next_state_links_pos[goal])
for goal in goals_neg:
actions.append(level.next_state_links_neg[goal])
all_actions = list(itertools.product(*actions))
# Filter out the action combinations which contain mutexes
non_mutex_actions = []
for action_tuple in all_actions:
action_pairs = itertools.combinations(list(set(action_tuple)), 2)
non_mutex_actions.append(list(set(action_tuple)))
for pair in action_pairs:
if set(pair) in level.mutex:
non_mutex_actions.pop(-1)
break
for action_list in non_mutex_actions:
if [action_list, index] not in self.solution:
self.solution.append([action_list, index])
new_goals_pos = []
new_goals_neg = []
for act in set(action_list):
if act in level.current_action_links_pos:
new_goals_pos = new_goals_pos + level.current_action_links_pos[act]
for act in set(action_list):
if act in level.current_action_links_neg:
new_goals_neg = new_goals_neg + level.current_action_links_neg[act]
if abs(index)+1 == len(self.graph.levels):
return
elif (level, new_goals_pos, new_goals_neg) in self.nogoods:
return
else:
self.extract_solution(new_goals_pos, new_goals_neg, index-1)
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
solution = []
for item in self.solution:
if item[1] == -1:
solution.append([])
solution[-1].append(item[0])
else:
solution[-1].append(item[0])
for num, item in enumerate(solution):
item.reverse()
solution[num] = item
return solution
def goal_test(kb, goals):
for q in goals:
if kb.ask(q) is False:
return False
return True
def spare_tire_graphplan():
pdll = spare_tire()
negkb = FolKB([expr('At(Flat, Trunk)')])
graphplan = GraphPlan(pdll, negkb)
goals_pos = [expr('At(Spare, Axle)'), expr('At(Flat, Ground)')]
goals_neg = []
while True:
if (goal_test(graphplan.graph.levels[-1].poskb, goals_pos) and
graphplan.graph.non_mutex_goals(goals_pos+goals_neg, -1)):
solution = graphplan.extract_solution(goals_pos, goals_neg, -1)
if solution:
return solution
graphplan.graph.expand_graph()
if len(graphplan.graph.levels)>=2 and graphplan.check_leveloff():
return None
def double_tennis_problem():
init = [expr('At(A, LeftBaseLine)'),
expr('At(B, RightNet)'),
expr('Approaching(Ball, RightBaseLine)'),
expr('Partner(A, B)'),
expr('Partner(B, A)')]
def goal_test(kb):
required = [expr('Goal(Returned(Ball))'), expr('At(a, RightNet)'), expr('At(a, LeftNet)')]
for q in required:
if kb.ask(q) is False:
return False
return True
# Actions
# Hit
precond_pos = [expr("Approaching(Ball,loc)"), expr("At(actor,loc)")]
precond_neg = []
effect_add = [expr("Returned(Ball)")]
effect_rem = []
hit = Action(expr("Hit(actor, Ball)"), [precond_pos, precond_neg], [effect_add, effect_rem])
precond_pos = [expr("At(actor, loc)")]
precond_neg = []
effect_add = [expr("At(actor, to)")]
effect_rem = [expr("At(actor, loc)")]
go = Action(expr("Go(actor, to)"), [precond_pos, precond_neg], [effect_add, effect_rem])
return PDLL(init, [hit, go], goal_test)