Newer
Older
"""CSP (Constraint Satisfaction Problems) problems and solvers. (Chapter 5)."""
from __future__ import generators
from utils import *
import search
import types
class CSP(search.Problem):
"""This class describes finite-domain Constraint Satisfaction Problems.
vars A list of variables; each is atomic (e.g. int or string).
domains A dict of {var:[possible_value, ...]} entries.
neighbors A dict of {var:[var,...]} that for each variable lists
the other variables that participate in constraints.
constraints A function f(A, a, B, b) that returns true if neighbors
A, B satisfy the constraint when they have values A=a, B=b
In the textbook and in most mathematical definitions, the
constraints are specified as explicit pairs of allowable values,
but the formulation here is easier to express and more compact for
most cases. (For example, the n-Queens problem can be represented
in O(n) space using this notation, instead of O(N^4) for the
explicit representation.) In terms of describing the CSP as a
problem, that's all there is.
However, the class also supports data structures and methods that help you
solve CSPs by calling a search function on the CSP. Methods and slots are
as follows, where the argument 'a' represents an assignment, which is a
dict of {var:val} entries:
assign(var, val, a) Assign a[var] = val; do other bookkeeping
unassign(var, a) Do del a[var], plus other bookkeeping
nconflicts(var, val, a) Return the number of other variables that
conflict with var=val
curr_domains[var] Slot: remaining consistent values for var
Used by constraint propagation routines.
The following methods are used only by graph_search and tree_search:
successor(state) Return a list of (action, state) pairs
goal_test(state) Return true if all constraints satisfied
The following are just for debugging purposes:
nassigns Slot: tracks the number of assignments made
display(a) Print a human-readable representation
>>> search.depth_first_graph_search(australia)
<Node (('WA', 'B'), ('Q', 'B'), ('T', 'B'), ('V', 'B'), ('SA', 'G'), ('NT', 'R'), ('NSW', 'R'))>
def __init__(self, vars, domains, neighbors, constraints):
"Construct a CSP problem. If vars is empty, it becomes domains.keys()."
vars = vars or domains.keys()
update(self, vars=vars, domains=domains,
neighbors=neighbors, constraints=constraints,
initial=(), curr_domains=None, nassigns=0)
def assign(self, var, val, assignment):
"Add {var: val} to assignment; Discard the old value if any."
assignment[var] = val
def unassign(self, var, assignment):
"""Remove {var: val} from assignment.
DO NOT call this if you are changing a variable to a new value;
just call assign for that."""
if var in assignment:
del assignment[var]
def nconflicts(self, var, val, assignment):
"Return the number of conflicts var=val has with other variables."
# Subclasses may implement this more efficiently
def conflict(var2):
val2 = assignment.get(var2, None)
return val2 != None and not self.constraints(var, val, var2, val2)
return count_if(conflict, self.neighbors[var])
def display(self, assignment):
"Show a human-readable representation of the CSP."
# Subclasses can print in a prettier way, or display with a GUI
print 'CSP:', self, 'with assignment:', assignment
## These methods are for the tree and graph search interface:
"Return a list of (action, state) pairs."
if len(state) == len(self.vars):
return []
else:
var = find_if(lambda v: v not in assignment, self.vars)
return [((var, val), state + ((var, val),))
for val in self.domains[var]
if self.nconflicts(var, val, assignment) == 0]
"The goal is to assign all vars, with all constraints satisfied."
return (len(assignment) == len(self.vars) and
every(lambda var: self.nconflicts(var, assignment[var],
assignment) == 0,
self.vars))
## These are for constraint propagation
def support_pruning(self):
"""Make sure we can prune values from domains. (We want to pay
for this only if we use it.)"""
if self.curr_domains is None:
self.curr_domains = dict((v, self.domains[v][:]) for v in self.vars)
def suppose(self, var, value):
"Start accumulating inferences from assuming var=value."
self.support_pruning()
removals = [(var, a) for a in self.curr_domains[var] if a != value]
self.curr_domains[var] = [value]
return removals
def prune(self, var, value, removals):
"Rule out var=value."
self.curr_domains[var].remove(value)
if removals is not None: removals.append((var, value))
def choices(self, var):
"Return all values for var that aren't currently ruled out."
return (self.curr_domains or self.domains)[var]
def restore(self, removals):
"Undo all inferences from a supposition."
for B, b in removals:
self.curr_domains[B].append(b)
def infer_assignment(self):
"Return the partial assignment implied by the current inferences."
self.support_pruning()
return dict((v, self.curr_domains[v][0])
for v in self.vars if 1 == len(self.curr_domains[v]))
## This is for min_conflicts search
def conflicted_vars(self, current):
"Return a list of variables in current assignment that are in conflict"
return [var for var in self.vars
if self.nconflicts(var, current[var], current) > 0]
#______________________________________________________________________________
# CSP Backtracking Search
## Variable ordering
def first_unassigned_variable(assignment, csp):
"The default variable order."
return find_if(lambda var: var not in assignment, csp.vars)
def mrv(assignment, csp):
"Minimum-remaining-values heuristic."
return argmin_random_tie(
[v for v in csp.vars if v not in assignment],
lambda var: num_legal_values(csp, var, assignment))
def num_legal_values(csp, var, assignment):
if csp.curr_domains:
return len(csp.curr_domains[var])
else:
return count_if(lambda val: csp.nconflicts(var, val, assignment) == 0,
csp.domains[var])
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
## Value ordering
def unordered_domain_values(var, assignment, csp):
"The default value order."
return csp.choices(var)
def lcv(var, assignment, csp):
"Least-constraining-values heuristic."
return sorted(csp.choices(var),
key=lambda val: csp.nconflicts(var, val, assignment))
## Inference
def no_inference(assignment, csp, var, value):
return []
def forward_checking(assignment, csp, var, value):
"Prune neighbor values inconsistent with var=value."
removals = csp.suppose(var, value)
for B in csp.neighbors[var]:
if B not in assignment:
for b in csp.curr_domains[B][:]:
if not csp.constraints(var, value, B, b):
csp.prune(B, b, removals)
return removals
def mac(assignment, csp, var, value):
"Maintain arc consistency."
removals = csp.suppose(var, value)
AC3(csp, [(X, var) for X in csp.neighbors[var]], removals)
return removals
## The search, proper
def backtracking_search(csp,
select_unassigned_variable = first_unassigned_variable,
order_domain_values = unordered_domain_values,
inference = no_inference):
"""Fig. 6.5 (3rd edition)
>>> backtracking_search(australia) is not None
True
>>> backtracking_search(australia, select_unassigned_variable=mrv) is not None
True
>>> backtracking_search(australia, order_domain_values=lcv) is not None
True
>>> backtracking_search(australia, select_unassigned_variable=mrv, order_domain_values=lcv) is not None
True
>>> backtracking_search(australia, inference=forward_checking) is not None
True
>>> backtracking_search(australia, inference=mac) is not None
True
>>> backtracking_search(usa, select_unassigned_variable=mrv, order_domain_values=lcv, inference=mac) is not None
True
"""
if inference is not no_inference:
csp.curr_domains = dict((v, csp.domains[v][:])
for v in csp.vars)
def backtrack(assignment):
if len(assignment) == len(csp.vars):
return assignment
var = select_unassigned_variable(assignment, csp)
for value in order_domain_values(var, assignment, csp):
if 0 == csp.nconflicts(var, value, assignment):
csp.assign(var, value, assignment)
removals = inference(assignment, csp, var, value)
result = backtrack(assignment)
if result is not None:
return result
csp.restore(removals)
csp.unassign(var, assignment)
return None
result = backtrack({})
assert result is None or csp.goal_test(result)
return result
#______________________________________________________________________________
# Constraint Propagation with AC-3
def AC3(csp, queue=None, removals=None):
"""[Fig. 5.7]"""
queue = [(Xi, Xk) for Xi in csp.vars for Xk in csp.neighbors[Xi]]
csp.support_pruning()
while queue:
(Xi, Xj) = queue.pop()
if remove_inconsistent_values(csp, Xi, Xj, removals):
for Xk in csp.neighbors[Xi]:
queue.append((Xk, Xi))
def remove_inconsistent_values(csp, Xi, Xj, removals):
"Return true if we remove a value."
removed = False
for x in csp.curr_domains[Xi][:]:
# If Xi=x conflicts with Xj=y for every possible y, eliminate Xi=x
if every(lambda y: not csp.constraints(Xi, x, Xj, y),
removed = True
return removed
#______________________________________________________________________________
# Min-conflicts hillclimbing search for CSPs
def min_conflicts(csp, max_steps=100000):
"""Solve a CSP by stochastic hillclimbing on the number of conflicts."""
# Generate a complete assignment for all vars (probably with conflicts)
for var in csp.vars:
val = min_conflicts_value(csp, var, current)
csp.assign(var, val, current)
# Now repeatedly choose a random conflicted variable and change it
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
for i in range(max_steps):
conflicted = csp.conflicted_vars(current)
if not conflicted:
return current
var = random.choice(conflicted)
val = min_conflicts_value(csp, var, current)
csp.assign(var, val, current)
return None
def min_conflicts_value(csp, var, current):
"""Return the value that will give var the least number of conflicts.
If there is a tie, choose at random."""
return argmin_random_tie(csp.domains[var],
lambda val: csp.nconflicts(var, val, current))
#______________________________________________________________________________
# Map-Coloring Problems
class UniversalDict:
"""A universal dict maps any key to the same value. We use it here
as the domains dict for CSPs in which all vars have the same domain.
>>> d = UniversalDict(42)
>>> d['life']
42
"""
def __init__(self, value): self.value = value
def __getitem__(self, key): return self.value
def __repr__(self): return '{Any: %r}' % self.value
def different_values_constraint(A, a, B, b):
"A constraint saying two neighboring variables must differ in value."
return a != b
def MapColoringCSP(colors, neighbors):
"""Make a CSP for the problem of coloring a map with different colors
for any two adjacent regions. Arguments are a list of colors, and a
dict of {region: [neighbor,...]} entries. This dict may also be
specified as a string of the form defined by parse_neighbors"""
if isinstance(neighbors, str):
neighbors = parse_neighbors(neighbors)
return CSP(neighbors.keys(), UniversalDict(colors), neighbors,
different_values_constraint)
def parse_neighbors(neighbors, vars=[]):
"""Convert a string of the form 'X: Y Z; Y: Z' into a dict mapping
regions to neighbors. The syntax is a region name followed by a ':'
followed by zero or more region names, followed by ';', repeated for
each region name. If you say 'X: Y' you don't need 'Y: X'.
>>> parse_neighbors('X: Y Z; Y: Z')
{'Y': ['X', 'Z'], 'X': ['Y', 'Z'], 'Z': ['X', 'Y']}
"""
dict = DefaultDict([])
for var in vars:
dict[var] = []
specs = [spec.split(':') for spec in neighbors.split(';')]
for (A, Aneighbors) in specs:
dict.setdefault(A, [])
for B in Aneighbors.split():
dict[A].append(B)
dict[B].append(A)
return dict
australia = MapColoringCSP(list('RGB'),
'SA: WA NT Q NSW V; NT: WA Q; NSW: Q V; T: ')
usa = MapColoringCSP(list('RGBY'),
"""WA: OR ID; OR: ID NV CA; CA: NV AZ; NV: ID UT AZ; ID: MT WY UT;
UT: WY CO AZ; MT: ND SD WY; WY: SD NE CO; CO: NE KA OK NM; NM: OK TX;
ND: MN SD; SD: MN IA NE; NE: IA MO KA; KA: MO OK; OK: MO AR TX;
TX: AR LA; MN: WI IA; IA: WI IL MO; MO: IL KY TN AR; AR: MS TN LA;
LA: MS; WI: MI IL; IL: IN; IN: KY; MS: TN AL; AL: TN GA FL; MI: OH;
OH: PA WV KY; KY: WV VA TN; TN: VA NC GA; GA: NC SC FL;
PA: NY NJ DE MD WV; WV: MD VA; VA: MD DC NC; NC: SC; NY: VT MA CT NJ;
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
NJ: DE; DE: MD; MD: DC; VT: NH MA; MA: NH RI CT; CT: RI; ME: NH;
HI: ; AK: """)
#______________________________________________________________________________
# n-Queens Problem
def queen_constraint(A, a, B, b):
"""Constraint is satisfied (true) if A, B are really the same variable,
or if they are not in the same row, down diagonal, or up diagonal."""
return A == B or (a != b and A + a != B + b and A - a != B - b)
class NQueensCSP(CSP):
"""Make a CSP for the nQueens problem for search with min_conflicts.
Suitable for large n, it uses only data structures of size O(n).
Think of placing queens one per column, from left to right.
That means position (x, y) represents (var, val) in the CSP.
The main structures are three arrays to count queens that could conflict:
rows[i] Number of queens in the ith row (i.e val == i)
downs[i] Number of queens in the \ diagonal
such that their (x, y) coordinates sum to i
ups[i] Number of queens in the / diagonal
such that their (x, y) coordinates have x-y+n-1 = i
We increment/decrement these counts each time a queen is placed/moved from
a row/diagonal. So moving is O(1), as is nconflicts. But choosing
a variable, and a best value for the variable, are each O(n).
If you want, you can keep track of conflicted vars, then variable
selection will also be O(1).
>>> len(backtracking_search(NQueensCSP(8)))
8
"""
def __init__(self, n):
"""Initialize data structures for n Queens."""
CSP.__init__(self, range(n), UniversalDict(range(n)),
UniversalDict(range(n)), queen_constraint)
update(self, rows=[0]*n, ups=[0]*(2*n - 1), downs=[0]*(2*n - 1))
def nconflicts(self, var, val, assignment):
"""The number of conflicts, as recorded with each assignment.
Count conflicts in row and in up, down diagonals. If there
is a queen there, it can't conflict with itself, so subtract 3."""
n = len(self.vars)
c = self.rows[val] + self.downs[var+val] + self.ups[var-val+n-1]
if assignment.get(var, None) == val:
c -= 3
return c
def assign(self, var, val, assignment):
"Assign var, and keep track of conflicts."
oldval = assignment.get(var, None)
if val != oldval:
if oldval is not None: # Remove old val if there was one
self.record_conflict(assignment, var, oldval, -1)
self.record_conflict(assignment, var, val, +1)
CSP.assign(self, var, val, assignment)
def unassign(self, var, assignment):
"Remove var from assignment (if it is there) and track conflicts."
if var in assignment:
self.record_conflict(assignment, var, assignment[var], -1)
CSP.unassign(self, var, assignment)
def record_conflict(self, assignment, var, val, delta):
"Record conflicts caused by addition or deletion of a Queen."
n = len(self.vars)
self.rows[val] += delta
self.downs[var + val] += delta
self.ups[var - val + n - 1] += delta
def display(self, assignment):
"Print the queens and the nconflicts values (for debugging)."
n = len(self.vars)
for val in range(n):
for var in range(n):
if assignment.get(var,'') == val: ch ='Q'
elif (var+val) % 2 == 0: ch = '.'
else: ch = '-'
print ch,
print ' ',
for var in range(n):
if assignment.get(var,'') == val: ch ='*'
else: ch = ' '
print str(self.nconflicts(var, val, assignment))+ch,
print
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
#______________________________________________________________________________
# Sudoku
import itertools, re
def flatten(seqs): return sum(seqs, [])
easy1 = '..3.2.6..9..3.5..1..18.64....81.29..7.......8..67.82....26.95..8..2.3..9..5.1.3..'
harder1 = '4173698.5.3..........7......2.....6.....8.4......1.......6.3.7.5..2.....1.4......'
class Sudoku(CSP):
"""A Sudoku problem.
The box grid is a 3x3 array of boxes, each a 3x3 array of cells.
Each cell holds a digit in 1..9. In each box, all digits are
different; the same for each row and column as a 9x9 grid.
>>> e = Sudoku(easy1)
>>> e.display(e.infer_assignment())
. . 3 | . 2 . | 6 . .
9 . . | 3 . 5 | . . 1
. . 1 | 8 . 6 | 4 . .
------+-------+------
. . 8 | 1 . 2 | 9 . .
7 . . | . . . | . . 8
. . 6 | 7 . 8 | 2 . .
------+-------+------
. . 2 | 6 . 9 | 5 . .
8 . . | 2 . 3 | . . 9
. . 5 | . 1 . | 3 . .
>>> AC3(e); e.display(e.infer_assignment())
4 8 3 | 9 2 1 | 6 5 7
9 6 7 | 3 4 5 | 8 2 1
2 5 1 | 8 7 6 | 4 9 3
------+-------+------
5 4 8 | 1 3 2 | 9 7 6
7 2 9 | 5 6 4 | 1 3 8
1 3 6 | 7 9 8 | 2 4 5
------+-------+------
3 7 2 | 6 8 9 | 5 1 4
8 1 4 | 2 5 3 | 7 6 9
6 9 5 | 4 1 7 | 3 8 2
>>> h = Sudoku(harder1)
>>> None != backtracking_search(h, select_unassigned_variable=mrv, inference=forward_checking)
True
"""
R3 = range(3)
Cell = itertools.count().next
bgrid = [[[[Cell() for x in R3] for y in R3] for bx in R3] for by in R3]
boxes = flatten([map(flatten, brow) for brow in bgrid])
rows = flatten([map(flatten, zip(*brow)) for brow in bgrid])
units = map(set, boxes + rows + zip(*rows))
neighbors = dict([(v, set.union(*[u for u in units if v in u]) - set([v]))
for v in flatten(rows)])
def __init__(self, grid):
squares = re.findall(r'\d|\.', grid)
domains = dict((var, [int(ch)] if ch.isdigit() else range(1, 10))
for var, ch in zip(flatten(self.rows), squares))
CSP.__init__(self, None, domains, self.neighbors,
different_values_constraint)
def display(self, assignment):
def show_box(box): return [' '.join(map(show_cell, row)) for row in box]
def show_cell(cell): return str(assignment.get(cell, '.'))
def abut(lines1, lines2): return map(' | '.join, zip(lines1, lines2))
print '\n------+-------+------\n'.join(
'\n'.join(reduce(abut, map(show_box, brow))) for brow in self.bgrid)
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
#______________________________________________________________________________
# The Zebra Puzzle
def Zebra():
"Return an instance of the Zebra Puzzle."
Colors = 'Red Yellow Blue Green Ivory'.split()
Pets = 'Dog Fox Snails Horse Zebra'.split()
Drinks = 'OJ Tea Coffee Milk Water'.split()
Countries = 'Englishman Spaniard Norwegian Ukranian Japanese'.split()
Smokes = 'Kools Chesterfields Winston LuckyStrike Parliaments'.split()
vars = Colors + Pets + Drinks + Countries + Smokes
domains = {}
for var in vars:
domains[var] = range(1, 6)
domains['Norwegian'] = [1]
domains['Milk'] = [3]
neighbors = parse_neighbors("""Englishman: Red;
Spaniard: Dog; Kools: Yellow; Chesterfields: Fox;
Norwegian: Blue; Winston: Snails; LuckyStrike: OJ;
Ukranian: Tea; Japanese: Parliaments; Kools: Horse;
Coffee: Green; Green: Ivory""", vars)
for type in [Colors, Pets, Drinks, Countries, Smokes]:
for A in type:
for B in type:
if A != B:
if B not in neighbors[A]: neighbors[A].append(B)
if A not in neighbors[B]: neighbors[B].append(A)
def zebra_constraint(A, a, B, b, recurse=0):
same = (a == b)
next_to = abs(a - b) == 1
if A == 'Englishman' and B == 'Red': return same
if A == 'Spaniard' and B == 'Dog': return same
if A == 'Chesterfields' and B == 'Fox': return next_to
if A == 'Norwegian' and B == 'Blue': return next_to
if A == 'Kools' and B == 'Yellow': return same
if A == 'Winston' and B == 'Snails': return same
if A == 'LuckyStrike' and B == 'OJ': return same
if A == 'Ukranian' and B == 'Tea': return same
if A == 'Japanese' and B == 'Parliaments': return same
if A == 'Kools' and B == 'Horse': return next_to
if A == 'Coffee' and B == 'Green': return same
if A == 'Green' and B == 'Ivory': return (a - 1) == b
if recurse == 0: return zebra_constraint(B, b, A, a, 1)
if ((A in Colors and B in Colors) or
(A in Pets and B in Pets) or
(A in Drinks and B in Drinks) or
(A in Countries and B in Countries) or
(A in Smokes and B in Smokes)): return not same
raise 'error'
return CSP(vars, domains, neighbors, zebra_constraint)
def solve_zebra(algorithm=min_conflicts, **args):
z = Zebra()
ans = algorithm(z, **args)
for h in range(1, 6):
print 'House', h,
for (var, val) in ans.items():
if val == h: print var,
print
return ans['Zebra'], ans['Water'], z.nassigns, ans
__doc__ += random_tests("""
>>> min_conflicts(australia)
{'WA': 'B', 'Q': 'B', 'T': 'G', 'V': 'B', 'SA': 'R', 'NT': 'G', 'NSW': 'G'}
>>> min_conflicts(NQueensCSP(8), max_steps=10000)
{0: 5, 1: 0, 2: 4, 3: 1, 4: 7, 5: 2, 6: 6, 7: 3}