Newer
Older
"""CSP (Constraint Satisfaction Problems) problems and solvers. (Chapter 6)."""
Donato Meoli
a validé
import string
from operator import eq, neg
Donato Meoli
a validé
from sortedcontainers import SortedSet
from utils import argmin_random_tie, count, first, extend
Donato Meoli
a validé
from collections import defaultdict, Counter
class CSP(search.Problem):
"""This class describes finite-domain Constraint Satisfaction Problems.
variables A list of variables; each is atomic (e.g. int or string).
domains A dict of {var:[possible_value, ...]} entries.
neighbors A dict of {var:[var,...]} that for each variable lists
the other variables that participate in constraints.
constraints A function f(A, a, B, b) that returns true if neighbors
A, B satisfy the constraint when they have values A=a, B=b
In the textbook and in most mathematical definitions, the
constraints are specified as explicit pairs of allowable values,
but the formulation here is easier to express and more compact for
most cases. (For example, the n-Queens problem can be represented
in O(n) space using this notation, instead of O(N^4) for the
explicit representation.) In terms of describing the CSP as a
problem, that's all there is.
However, the class also supports data structures and methods that help you
solve CSPs by calling a search function on the CSP. Methods and slots are
as follows, where the argument 'a' represents an assignment, which is a
dict of {var:val} entries:
assign(var, val, a) Assign a[var] = val; do other bookkeeping
unassign(var, a) Do del a[var], plus other bookkeeping
nconflicts(var, val, a) Return the number of other variables that
conflict with var=val
curr_domains[var] Slot: remaining consistent values for var
Used by constraint propagation routines.
The following methods are used only by graph_search and tree_search:
actions(state) Return a list of actions
result(state, action) Return a successor of state
goal_test(state) Return true if all constraints satisfied
The following are just for debugging purposes:
nassigns Slot: tracks the number of assignments made
display(a) Print a human-readable representation
def __init__(self, variables, domains, neighbors, constraints):
"""Construct a CSP problem. If variables is empty, it becomes domains.keys()."""
variables = variables or list(domains.keys())
self.variables = variables
self.domains = domains
self.neighbors = neighbors
self.constraints = constraints
self.initial = ()
self.curr_domains = None
self.nassigns = 0
def assign(self, var, val, assignment):
"""Add {var: val} to assignment; Discard the old value if any."""
assignment[var] = val
def unassign(self, var, assignment):
"""Remove {var: val} from assignment.
DO NOT call this if you are changing a variable to a new value;
just call assign for that."""
if var in assignment:
del assignment[var]
def nconflicts(self, var, val, assignment):
"""Return the number of conflicts var=val has with other variables."""
# Subclasses may implement this more efficiently
def conflict(var2):
return (var2 in assignment and
not self.constraints(var, val, var2, assignment[var2]))
return count(conflict(v) for v in self.neighbors[var])
def display(self, assignment):
"""Show a human-readable representation of the CSP."""
# Subclasses can print in a prettier way, or display with a GUI
print('CSP:', self, 'with assignment:', assignment)
# These methods are for the tree and graph-search interface:
def actions(self, state):
"""Return a list of applicable actions: nonconflicting
assignments to an unassigned variable."""
if len(state) == len(self.variables):
return []
else:
var = first([v for v in self.variables if v not in assignment])
return [(var, val) for val in self.domains[var]
if self.nconflicts(var, val, assignment) == 0]
def result(self, state, action):
"""Perform an action and return the new state."""
"""The goal is to assign all variables, with all constraints satisfied."""
return (len(assignment) == len(self.variables)
and all(self.nconflicts(variables, assignment[variables], assignment) == 0
for variables in self.variables))
def support_pruning(self):
"""Make sure we can prune values from domains. (We want to pay
for this only if we use it.)"""
if self.curr_domains is None:
self.curr_domains = {v: list(self.domains[v]) for v in self.variables}
def suppose(self, var, value):
"""Start accumulating inferences from assuming var=value."""
self.support_pruning()
removals = [(var, a) for a in self.curr_domains[var] if a != value]
self.curr_domains[var] = [value]
return removals
def prune(self, var, value, removals):
self.curr_domains[var].remove(value)
if removals is not None:
removals.append((var, value))
def choices(self, var):
"""Return all values for var that aren't currently ruled out."""
return (self.curr_domains or self.domains)[var]
def infer_assignment(self):
"""Return the partial assignment implied by the current inferences."""
self.support_pruning()
return {v: self.curr_domains[v][0]
for v in self.variables if 1 == len(self.curr_domains[v])}
def restore(self, removals):
"""Undo a supposition and all inferences from it."""
for B, b in removals:
self.curr_domains[B].append(b)
def conflicted_vars(self, current):
"""Return a list of variables in current assignment that are in conflict"""
return [var for var in self.variables
if self.nconflicts(var, current[var], current) > 0]
# ______________________________________________________________________________
# Constraint Propagation with AC-3
Donato Meoli
a validé
def no_arc_heuristic(csp, queue):
return queue
def dom_j_up(csp, queue):
return SortedSet(queue, key=lambda t: neg(len(csp.curr_domains[t[1]])))
def AC3(csp, queue=None, removals=None, arc_heuristic=dom_j_up):
"""[Figure 6.3]"""
queue = {(Xi, Xk) for Xi in csp.variables for Xk in csp.neighbors[Xi]}
Donato Meoli
a validé
queue = arc_heuristic(csp, queue)
while queue:
(Xi, Xj) = queue.pop()
if revise(csp, Xi, Xj, removals):
if not csp.curr_domains[Xi]:
return False
for Xk in csp.neighbors[Xi]:
def revise(csp, Xi, Xj, removals):
revised = False
for x in csp.curr_domains[Xi][:]:
# If Xi=x conflicts with Xj=y for every possible y, eliminate Xi=x
if all(not csp.constraints(Xi, x, Xj, y) for y in csp.curr_domains[Xj]):
csp.prune(Xi, x, removals)
revised = True
return revised
Donato Meoli
a validé
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
# Constraint Propagation with AC-3b: an improved version of AC-3 with
# double-support domain-heuristic
def AC3b(csp, queue=None, removals=None, arc_heuristic=dom_j_up):
if queue is None:
queue = {(Xi, Xk) for Xi in csp.variables for Xk in csp.neighbors[Xi]}
csp.support_pruning()
queue = arc_heuristic(csp, queue)
while queue:
(Xi, Xj) = queue.pop()
# Si_p values are all known to be supported by Xj
# Sj_p values are all known to be supported by Xi
# Dj - Sj_p = Sj_u values are unknown, as yet, to be supported by Xi
Si_p, Sj_p, Sj_u = partition(csp, Xi, Xj)
if not Si_p:
return False
revised = False
for x in set(csp.curr_domains[Xi]) - Si_p:
csp.prune(Xi, x, removals)
revised = True
if revised:
for Xk in csp.neighbors[Xi]:
if Xk != Xj:
queue.add((Xk, Xi))
if (Xj, Xi) in queue:
if isinstance(queue, set):
# or queue -= {(Xj, Xi)} or queue.remove((Xj, Xi))
queue.difference_update({(Xj, Xi)})
else:
queue.difference_update((Xj, Xi))
# the elements in D_j which are supported by Xi are given by the union of Sj_p with the set of those
# elements of Sj_u which further processing will show to be supported by some vi_p in Si_p
for vj_p in Sj_u:
for vi_p in Si_p:
conflict = True
if csp.constraints(Xj, vj_p, Xi, vi_p):
conflict = False
Sj_p.add(vj_p)
if not conflict:
break
revised = False
for x in set(csp.curr_domains[Xj]) - Sj_p:
csp.prune(Xj, x, removals)
revised = True
if revised:
for Xk in csp.neighbors[Xj]:
if Xk != Xi:
queue.add((Xk, Xj))
return True
def partition(csp, Xi, Xj):
Si_p = set()
Sj_p = set()
Sj_u = set(csp.curr_domains[Xj])
for vi_u in csp.curr_domains[Xi]:
conflict = True
# now, in order to establish support for a value vi_u in Di it seems better to try to find a support among
# the values in Sj_u first, because for each vj_u in Sj_u the check (vi_u, vj_u) is a double-support check
# and it is just as likely that any vj_u in Sj_u supports vi_u than it is that any vj_p in Sj_p does...
for vj_u in Sj_u - Sj_p:
# double-support check
if csp.constraints(Xi, vi_u, Xj, vj_u):
conflict = False
Si_p.add(vi_u)
Sj_p.add(vj_u)
if not conflict:
break
# ... and only if no support can be found among the elements in Sj_u, should the elements vj_p in Sj_p be used
# for single-support checks (vi_u, vj_p)
if conflict:
for vj_p in Sj_p:
# single-support check
if csp.constraints(Xi, vi_u, Xj, vj_p):
conflict = False
Si_p.add(vi_u)
if not conflict:
break
return Si_p, Sj_p, Sj_u - Sj_p
# Constraint Propagation with AC-4
def AC4(csp, queue=None, removals=None, arc_heuristic=dom_j_up):
if queue is None:
queue = {(Xi, Xk) for Xi in csp.variables for Xk in csp.neighbors[Xi]}
csp.support_pruning()
queue = arc_heuristic(csp, queue)
support_counter = Counter()
variable_value_pairs_supported = defaultdict(set)
unsupported_variable_value_pairs = []
# construction and initialization of support sets
while queue:
(Xi, Xj) = queue.pop()
revised = False
for x in csp.curr_domains[Xi][:]:
for y in csp.curr_domains[Xj]:
if csp.constraints(Xi, x, Xj, y):
support_counter[(Xi, x, Xj)] += 1
variable_value_pairs_supported[(Xj, y)].add((Xi, x))
if support_counter[(Xi, x, Xj)] == 0:
csp.prune(Xi, x, removals)
revised = True
unsupported_variable_value_pairs.append((Xi, x))
if revised:
if not csp.curr_domains[Xi]:
return False
# propagation of removed values
while unsupported_variable_value_pairs:
Xj, y = unsupported_variable_value_pairs.pop()
for Xi, x in variable_value_pairs_supported[(Xj, y)]:
revised = False
if x in csp.curr_domains[Xi][:]:
support_counter[(Xi, x, Xj)] -= 1
if support_counter[(Xi, x, Xj)] == 0:
csp.prune(Xi, x, removals)
revised = True
unsupported_variable_value_pairs.append((Xi, x))
if revised:
if not csp.curr_domains[Xi]:
return False
return True
# ______________________________________________________________________________
# CSP Backtracking Search
def first_unassigned_variable(assignment, csp):
return first([var for var in csp.variables if var not in assignment])
[v for v in csp.variables if v not in assignment],
def num_legal_values(csp, var, assignment):
if csp.curr_domains:
return len(csp.curr_domains[var])
else:
return count(csp.nconflicts(var, val, assignment) == 0
for val in csp.domains[var])
def unordered_domain_values(var, assignment, csp):
return csp.choices(var)
def lcv(var, assignment, csp):
return sorted(csp.choices(var),
key=lambda val: csp.nconflicts(var, val, assignment))
def no_inference(csp, var, value, assignment, removals):
return True
def forward_checking(csp, var, value, assignment, removals):
"""Prune neighbor values inconsistent with var=value."""
csp.support_pruning()
for B in csp.neighbors[var]:
if B not in assignment:
for b in csp.curr_domains[B][:]:
if not csp.constraints(var, value, B, b):
csp.prune(B, b, removals)
if not csp.curr_domains[B]:
return False
return True
Donato Meoli
a validé
def mac(csp, var, value, assignment, removals, constraint_propagation=AC3b):
Donato Meoli
a validé
return constraint_propagation(csp, {(X, var) for X in csp.neighbors[var]}, removals)
def backtracking_search(csp,
select_unassigned_variable=first_unassigned_variable,
order_domain_values=unordered_domain_values,
inference=no_inference):
def backtrack(assignment):
if len(assignment) == len(csp.variables):
return assignment
var = select_unassigned_variable(assignment, csp)
for value in order_domain_values(var, assignment, csp):
if 0 == csp.nconflicts(var, value, assignment):
csp.assign(var, value, assignment)
removals = csp.suppose(var, value)
if inference(csp, var, value, assignment, removals):
result = backtrack(assignment)
if result is not None:
return result
csp.unassign(var, assignment)
return None
result = backtrack({})
assert result is None or csp.goal_test(result)
return result
# ______________________________________________________________________________
Donato Meoli
a validé
# Min-conflicts Hill Climbing search for CSPs
def min_conflicts(csp, max_steps=100000):
Donato Meoli
a validé
"""Solve a CSP by stochastic Hill Climbing on the number of conflicts."""
# Generate a complete assignment for all variables (probably with conflicts)
for var in csp.variables:
val = min_conflicts_value(csp, var, current)
csp.assign(var, val, current)
# Now repeatedly choose a random conflicted variable and change it
for i in range(max_steps):
conflicted = csp.conflicted_vars(current)
if not conflicted:
return current
var = random.choice(conflicted)
val = min_conflicts_value(csp, var, current)
csp.assign(var, val, current)
return None
def min_conflicts_value(csp, var, current):
"""Return the value that will give var the least number of conflicts.
If there is a tie, choose at random."""
return argmin_random_tie(csp.domains[var],
# ______________________________________________________________________________
root = csp.variables[0]
X, parent = topological_sort(csp, root)
if not make_arc_consistent(parent[Xj], Xj, csp):
return None
assignment[root] = csp.curr_domains[root][0]
for Xi in X[1:]:
assignment[Xi] = assign_value(parent[Xi], Xi, csp, assignment)
if not assignment[Xi]:
return None
return assignment
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
def topological_sort(X, root):
"""Returns the topological sort of X starting from the root.
Input:
X is a list with the nodes of the graph
N is the dictionary with the neighbors of each node
root denotes the root of the graph.
Output:
stack is a list with the nodes topologically sorted
parents is a dictionary pointing to each node's parent
Other:
visited shows the state (visited - not visited) of nodes
"""
neighbors = X.neighbors
visited = defaultdict(lambda: False)
stack = []
parents = {}
build_topological(root, None, neighbors, visited, stack, parents)
return stack, parents
def build_topological(node, parent, neighbors, visited, stack, parents):
"""Build the topological sort and the parents of each node in the graph."""
visited[node] = True
for n in neighbors[node]:
build_topological(n, node, neighbors, visited, stack, parents)
parents[node] = parent
"""Make arc between parent (Xj) and child (Xk) consistent under the csp's constraints,
by removing the possible values of Xj that cause inconsistencies."""
for val1 in csp.domains[Xj]:
for val2 in csp.domains[Xk]:
if csp.constraints(Xj, val1, Xk, val2):
# Found a consistent assignment for val1, keep it
keep = True
break
if not keep:
# Remove val1
csp.prune(Xj, val1, None)
return csp.curr_domains[Xj]
def assign_value(Xj, Xk, csp, assignment):
"""Assign a value to Xk given Xj's (Xk's parent) assignment.
Return the first value that satisfies the constraints."""
parent_assignment = assignment[Xj]
for val in csp.curr_domains[Xk]:
if csp.constraints(Xj, parent_assignment, Xk, val):
return val
# No consistent assignment available
return None
# ______________________________________________________________________________
class UniversalDict:
"""A universal dict maps any key to the same value. We use it here
as the domains dict for CSPs in which all variables have the same domain.
>>> d = UniversalDict(42)
>>> d['life']
42
"""
def __init__(self, value): self.value = value
def __getitem__(self, key): return self.value
def __repr__(self): return '{{Any: {0!r}}}'.format(self.value)
def different_values_constraint(A, a, B, b):
"""A constraint saying two neighboring variables must differ in value."""
return a != b
def MapColoringCSP(colors, neighbors):
"""Make a CSP for the problem of coloring a map with different colors
for any two adjacent regions. Arguments are a list of colors, and a
dict of {region: [neighbor,...]} entries. This dict may also be
specified as a string of the form defined by parse_neighbors."""
if isinstance(neighbors, str):
return CSP(list(neighbors.keys()), UniversalDict(colors), neighbors,
different_values_constraint)
def parse_neighbors(neighbors, variables=None):
"""Convert a string of the form 'X: Y Z; Y: Z' into a dict mapping
regions to neighbors. The syntax is a region name followed by a ':'
followed by zero or more region names, followed by ';', repeated for
each region name. If you say 'X: Y' you don't need 'Y: X'.
>>> parse_neighbors('X: Y Z; Y: Z') == {'Y': ['X', 'Z'], 'X': ['Y', 'Z'], 'Z': ['X', 'Y']}
True
specs = [spec.split(':') for spec in neighbors.split(';')]
for (A, Aneighbors) in specs:
for B in Aneighbors.split():
australia_csp = MapColoringCSP(list('RGB'), """SA: WA NT Q NSW V; NT: WA Q; NSW: Q V; T: """)
usa_csp = MapColoringCSP(list('RGBY'),
"""WA: OR ID; OR: ID NV CA; CA: NV AZ; NV: ID UT AZ; ID: MT WY UT;
UT: WY CO AZ; MT: ND SD WY; WY: SD NE CO; CO: NE KA OK NM; NM: OK TX AZ;
ND: MN SD; SD: MN IA NE; NE: IA MO KA; KA: MO OK; OK: MO AR TX;
TX: AR LA; MN: WI IA; IA: WI IL MO; MO: IL KY TN AR; AR: MS TN LA;
LA: MS; WI: MI IL; IL: IN KY; IN: OH KY; MS: TN AL; AL: TN GA FL;
MI: OH IN; OH: PA WV KY; KY: WV VA TN; TN: VA NC GA; GA: NC SC FL;
PA: NY NJ DE MD WV; WV: MD VA; VA: MD DC NC; NC: SC; NY: VT MA CT NJ;
NJ: DE; DE: MD; MD: DC; VT: NH MA; MA: NH RI CT; CT: RI; ME: NH;
HI: ; AK: """)
france_csp = MapColoringCSP(list('RGBY'),
"""AL: LO FC; AQ: MP LI PC; AU: LI CE BO RA LR MP; BO: CE IF CA FC RA
AU; BR: NB PL; CA: IF PI LO FC BO; CE: PL NB NH IF BO AU LI PC; FC: BO
CA LO AL RA; IF: NH PI CA BO CE; LI: PC CE AU MP AQ; LO: CA AL FC; LR:
MP AU RA PA; MP: AQ LI AU LR; NB: NH CE PL BR; NH: PI IF CE NB; NO:
PI; PA: LR RA; PC: PL CE LI AQ; PI: NH NO CA IF; PL: BR NB CE PC; RA:
AU BO FC PA LR""")
# ______________________________________________________________________________
# n-Queens Problem
def queen_constraint(A, a, B, b):
"""Constraint is satisfied (true) if A, B are really the same variable,
or if they are not in the same row, down diagonal, or up diagonal."""
return A == B or (a != b and A + a != B + b and A - a != B - b)
class NQueensCSP(CSP):
"""Make a CSP for the nQueens problem for search with min_conflicts.
Suitable for large n, it uses only data structures of size O(n).
Think of placing queens one per column, from left to right.
That means position (x, y) represents (var, val) in the CSP.
The main structures are three arrays to count queens that could conflict:
rows[i] Number of queens in the ith row (i.e val == i)
downs[i] Number of queens in the \ diagonal
such that their (x, y) coordinates sum to i
ups[i] Number of queens in the / diagonal
such that their (x, y) coordinates have x-y+n-1 = i
We increment/decrement these counts each time a queen is placed/moved from
a row/diagonal. So moving is O(1), as is nconflicts. But choosing
a variable, and a best value for the variable, are each O(n).
If you want, you can keep track of conflicted variables, then variable
selection will also be O(1).
>>> len(backtracking_search(NQueensCSP(8)))
8
"""
def __init__(self, n):
"""Initialize data structures for n Queens."""
CSP.__init__(self, list(range(n)), UniversalDict(list(range(n))),
UniversalDict(list(range(n))), queen_constraint)
self.rows = [0] * n
self.ups = [0] * (2 * n - 1)
self.downs = [0] * (2 * n - 1)
"""The number of conflicts, as recorded with each assignment.
Count conflicts in row and in up, down diagonals. If there
is a queen there, it can't conflict with itself, so subtract 3."""
n = len(self.variables)
c = self.rows[val] + self.downs[var + val] + self.ups[var - val + n - 1]
if assignment.get(var, None) == val:
c -= 3
return c
def assign(self, var, val, assignment):
"""Assign var, and keep track of conflicts."""
oldval = assignment.get(var, None)
if val != oldval:
if oldval is not None: # Remove old val if there was one
self.record_conflict(assignment, var, oldval, -1)
self.record_conflict(assignment, var, val, +1)
CSP.assign(self, var, val, assignment)
def unassign(self, var, assignment):
"""Remove var from assignment (if it is there) and track conflicts."""
if var in assignment:
self.record_conflict(assignment, var, assignment[var], -1)
CSP.unassign(self, var, assignment)
def record_conflict(self, assignment, var, val, delta):
"""Record conflicts caused by addition or deletion of a Queen."""
n = len(self.variables)
self.rows[val] += delta
self.downs[var + val] += delta
self.ups[var - val + n - 1] += delta
def display(self, assignment):
"""Print the queens and the nconflicts values (for debugging)."""
n = len(self.variables)
for val in range(n):
for var in range(n):
elif (var + val) % 2 == 0:
ch = '.'
else:
ch = '-'
print(ch, end=' ')
print(' ', end=' ')
for var in range(n):
if assignment.get(var, '') == val:
ch = '*'
else:
ch = ' '
print(str(self.nconflicts(var, val, assignment)) + ch, end=' ')
# ______________________________________________________________________________
# Sudoku
easy1 = '..3.2.6..9..3.5..1..18.64....81.29..7.......8..67.82....26.95..8..2.3..9..5.1.3..'
harder1 = '4173698.5.3..........7......2.....6.....8.4......1.......6.3.7.5..2.....1.4......'
_R3 = list(range(3))
_CELL = itertools.count().__next__
_BGRID = [[[[_CELL() for x in _R3] for y in _R3] for bx in _R3] for by in _R3]
_BOXES = flatten([list(map(flatten, brow)) for brow in _BGRID])
_ROWS = flatten([list(map(flatten, zip(*brow))) for brow in _BGRID])
_NEIGHBORS = {v: set() for v in flatten(_ROWS)}
for unit in map(set, _BOXES + _ROWS + _COLS):
for v in unit:
class Sudoku(CSP):
"""A Sudoku problem.
The box grid is a 3x3 array of boxes, each a 3x3 array of cells.
Each cell holds a digit in 1..9. In each box, all digits are
different; the same for each row and column as a 9x9 grid.
>>> e = Sudoku(easy1)
>>> e.display(e.infer_assignment())
. . 3 | . 2 . | 6 . .
9 . . | 3 . 5 | . . 1
. . 1 | 8 . 6 | 4 . .
------+-------+------
. . 8 | 1 . 2 | 9 . .
7 . . | . . . | . . 8
. . 6 | 7 . 8 | 2 . .
------+-------+------
. . 2 | 6 . 9 | 5 . .
8 . . | 2 . 3 | . . 9
. . 5 | . 1 . | 3 . .
>>> AC3(e); e.display(e.infer_assignment())
4 8 3 | 9 2 1 | 6 5 7
9 6 7 | 3 4 5 | 8 2 1
2 5 1 | 8 7 6 | 4 9 3
------+-------+------
5 4 8 | 1 3 2 | 9 7 6
7 2 9 | 5 6 4 | 1 3 8
1 3 6 | 7 9 8 | 2 4 5
------+-------+------
3 7 2 | 6 8 9 | 5 1 4
8 1 4 | 2 5 3 | 7 6 9
6 9 5 | 4 1 7 | 3 8 2
>>> h = Sudoku(harder1)
>>> backtracking_search(h, select_unassigned_variable=mrv, inference=forward_checking) is not None
R3 = _R3
Cell = _CELL
bgrid = _BGRID
boxes = _BOXES
rows = _ROWS
cols = _COLS
neighbors = _NEIGHBORS
def __init__(self, grid):
"""Build a Sudoku problem from a string representing the grid:
the digits 1-9 denote a filled cell, '.' or '0' an empty one;
other characters are ignored."""
squares = iter(re.findall(r'\d|\.', grid))
domains = {var: [ch] if ch in '123456789' else '123456789'
for var, ch in zip(flatten(self.rows), squares)}
raise ValueError("Not a Sudoku grid", grid) # Too many squares
CSP.__init__(self, None, domains, self.neighbors, different_values_constraint)
def display(self, assignment):
def show_box(box): return [' '.join(map(show_cell, row)) for row in box]
def show_cell(cell): return str(assignment.get(cell, '.'))
def abut(lines1, lines2): return list(
map(' | '.join, list(zip(lines1, lines2))))
abut, map(show_box, brow))) for brow in self.bgrid))
# ______________________________________________________________________________
# The Zebra Puzzle
def Zebra():
"""Return an instance of the Zebra Puzzle."""
Colors = 'Red Yellow Blue Green Ivory'.split()
Pets = 'Dog Fox Snails Horse Zebra'.split()
Drinks = 'OJ Tea Coffee Milk Water'.split()
Countries = 'Englishman Spaniard Norwegian Ukranian Japanese'.split()
Smokes = 'Kools Chesterfields Winston LuckyStrike Parliaments'.split()
variables = Colors + Pets + Drinks + Countries + Smokes
domains = {}
domains['Norwegian'] = [1]
domains['Milk'] = [3]
neighbors = parse_neighbors("""Englishman: Red;
Spaniard: Dog; Kools: Yellow; Chesterfields: Fox;
Norwegian: Blue; Winston: Snails; LuckyStrike: OJ;
Ukranian: Tea; Japanese: Parliaments; Kools: Horse;
Coffee: Green; Green: Ivory""", variables)
for type in [Colors, Pets, Drinks, Countries, Smokes]:
for A in type:
for B in type:
if A != B:
if B not in neighbors[A]:
neighbors[A].append(B)
if A not in neighbors[B]:
neighbors[B].append(A)
def zebra_constraint(A, a, B, b, recurse=0):
same = (a == b)
next_to = abs(a - b) == 1
if A == 'Englishman' and B == 'Red':
return same
if A == 'Spaniard' and B == 'Dog':
return same
if A == 'Chesterfields' and B == 'Fox':
return next_to
if A == 'Norwegian' and B == 'Blue':
return next_to
if A == 'Kools' and B == 'Yellow':
return same
if A == 'Winston' and B == 'Snails':
return same
if A == 'LuckyStrike' and B == 'OJ':
return same
if A == 'Ukranian' and B == 'Tea':
return same
if A == 'Japanese' and B == 'Parliaments':
return same
if A == 'Kools' and B == 'Horse':
return next_to
if A == 'Coffee' and B == 'Green':
return same
if A == 'Green' and B == 'Ivory':
return a - 1 == b
if recurse == 0:
return zebra_constraint(B, b, A, a, 1)
if ((A in Colors and B in Colors) or
(A in Pets and B in Pets) or
(A in Drinks and B in Drinks) or
(A in Countries and B in Countries) or
(A in Smokes and B in Smokes)):
return not same
raise Exception('error')
return CSP(variables, domains, neighbors, zebra_constraint)
def solve_zebra(algorithm=min_conflicts, **args):
z = Zebra()
ans = algorithm(z, **args)
for h in range(1, 6):
Donato Meoli
a validé
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
# ______________________________________________________________________________
# n-ary Constraint Satisfaction Problem
class NaryCSP:
"""A nary-CSP consists of
* domains, a dictionary that maps each variable to its domain
* constraints, a list of constraints
* variables, a set of variables
* var_to_const, a variable to set of constraints dictionary
"""
def __init__(self, domains, constraints):
"""domains is a variable:domain dictionary
constraints is a list of constraints
"""
self.variables = set(domains)
self.domains = domains
self.constraints = constraints
self.var_to_const = {var: set() for var in self.variables}
for con in constraints:
for var in con.scope:
self.var_to_const[var].add(con)
def __str__(self):
"""string representation of CSP"""
return str(self.domains)
def display(self, assignment=None):
"""more detailed string representation of CSP"""
if assignment is None:
assignment = {}
print('CSP(' + str(self.domains) + ', ' + str([str(c) for c in self.constraints]) + ') with assignment: ' +
str(assignment))
def consistent(self, assignment):
"""assignment is a variable:value dictionary
returns True if all of the constraints that can be evaluated
evaluate to True given assignment.
"""
return all(con.holds(assignment)
for con in self.constraints
if all(v in assignment for v in con.scope))
class Constraint:
"""A Constraint consists of
* scope: a tuple of variables
* condition: a function that can applied to a tuple of values
for the variables
"""
def __init__(self, scope, condition):
self.scope = scope
self.condition = condition
def __repr__(self):
return self.condition.__name__ + str(self.scope)
def holds(self, assignment):
"""Returns the value of Constraint con evaluated in assignment.
precondition: all variables are assigned in assignment
"""
return self.condition(*tuple(assignment[v] for v in self.scope))
def all_diff(*values):
"""Returns True if all values are different, False otherwise"""
return len(values) is len(set(values))
def is_word(words):
"""Returns True if the letters concatenated form a word in words, False otherwise"""
def isw(*letters):
return "".join(letters) in words
return isw
def meet_at(p1, p2):
"""Returns a function that is True when the words meet at the positions (p1, p2), False otherwise"""
def meets(w1, w2):
return w1[p1] == w2[p2]
meets.__name__ = "meet_at(" + str(p1) + ',' + str(p2) + ')'
return meets
def adjacent(x, y):
"""Returns True if x and y are adjacent numbers, False otherwise"""
return abs(x - y) == 1
def sum_(n):
"""Returns a function that is True when the the sum of all values is n, False otherwise"""
def sumv(*values):
return sum(values) is n
sumv.__name__ = str(n) + "==sum"
return sumv
def is_(val):
"""Returns a function that is True when x is equal to val, False otherwise"""
def isv(x):
return val == x
isv.__name__ = str(val) + "=="
return isv
def ne_(val):
"""Returns a function that is True when x is not equal to val, False otherwise"""
def nev(x):
return val != x
nev.__name__ = str(val) + "!="
return nev
def no_heuristic(to_do):
return to_do
def sat_up(to_do):
return SortedSet(to_do, key=lambda t: 1 / len([var for var in t[1].scope]))
class ACSolver:
"""Solves a CSP with arc consistency and domain splitting"""
def __init__(self, csp):
"""a CSP solver that uses arc consistency
* csp is the CSP to be solved
"""
self.csp = csp
def GAC(self, orig_domains=None, to_do=None, arc_heuristic=sat_up):
"""Makes this CSP arc-consistent using Generalized Arc Consistency
orig_domains is the original domains
to_do is a set of (variable,constraint) pairs
returns the reduced domains (an arc-consistent variable:domain dictionary)
"""
if orig_domains is None:
orig_domains = self.csp.domains
if to_do is None:
to_do = {(var, const) for const in self.csp.constraints
for var in const.scope}
else:
to_do = to_do.copy()
domains = orig_domains.copy()
to_do = arc_heuristic(to_do)
while to_do:
var, const = to_do.pop()
other_vars = [ov for ov in const.scope if ov != var]
if len(other_vars) == 0:
new_domain = {val for val in domains[var]
if const.holds({var: val})}
elif len(other_vars) == 1:
other = other_vars[0]
new_domain = {val for val in domains[var]
if any(const.holds({var: val, other: other_val})
for other_val in domains[other])}
else:
new_domain = {val for val in domains[var]
if self.any_holds(domains, const, {var: val}, other_vars)}
if new_domain != domains[var]:
domains[var] = new_domain
if not new_domain:
return False, domains
add_to_do = self.new_to_do(var, const).difference(to_do)
to_do |= add_to_do
return True, domains
def new_to_do(self, var, const):
"""returns new elements to be added to to_do after assigning
variable var in constraint const.
"""
return {(nvar, nconst) for nconst in self.csp.var_to_const[var]
if nconst != const
for nvar in nconst.scope
if nvar != var}
def any_holds(self, domains, const, env, other_vars, ind=0):
"""returns True if Constraint const holds for an assignment
that extends env with the variables in other_vars[ind:]
env is a dictionary
Warning: this has side effects and changes the elements of env
"""
if ind == len(other_vars):
return const.holds(env)
else:
var = other_vars[ind]
for val in domains[var]:
# env = dict_union(env,{var:val}) # no side effects!
env[var] = val
holds = self.any_holds(domains, const, env, other_vars, ind + 1)
if holds:
return True
return False
def domain_splitting(self, domains=None, to_do=None, arc_heuristic=sat_up):
"""return a solution to the current CSP or False if there are no solutions
to_do is the list of arcs to check
"""
if domains is None:
domains = self.csp.domains
consistency, new_domains = self.GAC(domains, to_do, arc_heuristic)
if not consistency:
return False
elif all(len(new_domains[var]) == 1 for var in domains):
return {var: first(new_domains[var]) for var in domains}
else:
var = first(x for x in self.csp.variables if len(new_domains[x]) > 1)
if var:
dom1, dom2 = partition_domain(new_domains[var])
new_doms1 = extend(new_domains, var, dom1)
new_doms2 = extend(new_domains, var, dom2)
to_do = self.new_to_do(var, None)
return self.domain_splitting(new_doms1, to_do, arc_heuristic) or \
self.domain_splitting(new_doms2, to_do, arc_heuristic)
def partition_domain(dom):
"""partitions domain dom into two"""
split = len(dom) // 2
dom1 = set(list(dom)[:split])
dom2 = dom - dom1
return dom1, dom2
class ACSearchSolver(search.Problem):
"""A search problem with arc consistency and domain splitting
A node is a CSP """
def __init__(self, csp, arc_heuristic=sat_up):
self.cons = ACSolver(csp)
consistency, self.domains = self.cons.GAC(arc_heuristic=arc_heuristic)
if not consistency:
raise Exception('CSP is inconsistent')
self.heuristic = arc_heuristic
super().__init__(self.domains)
def goal_test(self, node):
"""node is a goal if all domains have 1 element"""
return all(len(node[var]) == 1 for var in node)
def actions(self, state):
var = first(x for x in state if len(state[x]) > 1)
neighs = []
if var:
dom1, dom2 = partition_domain(state[var])
to_do = self.cons.new_to_do(var, None)
for dom in [dom1, dom2]:
new_domains = extend(state, var, dom)
consistency, cons_doms = self.cons.GAC(new_domains, to_do, self.heuristic)
if consistency:
neighs.append(cons_doms)
return neighs
def result(self, state, action):
return action
def ac_solver(csp, arc_heuristic=sat_up):
"""arc consistency (domain splitting)"""
return ACSolver(csp).domain_splitting(arc_heuristic=arc_heuristic)
def ac_search_solver(csp, arc_heuristic=sat_up):
"""arc consistency (search interface)"""
from search import depth_first_tree_search
solution = None
try:
solution = depth_first_tree_search(ACSearchSolver(csp, arc_heuristic=arc_heuristic)).state
except:
return solution
if solution:
return {var: first(solution[var]) for var in solution}
# ______________________________________________________________________________
# Crossword Problem
csp_crossword = NaryCSP({'one_across': {'ant', 'big', 'bus', 'car', 'has'},
'one_down': {'book', 'buys', 'hold', 'lane', 'year'},
'two_down': {'ginger', 'search', 'symbol', 'syntax'},
'three_across': {'book', 'buys', 'hold', 'land', 'year'},
'four_across': {'ant', 'big', 'bus', 'car', 'has'}},
[Constraint(('one_across', 'one_down'), meet_at(0, 0)),
Constraint(('one_across', 'two_down'), meet_at(2, 0)),
Constraint(('three_across', 'two_down'), meet_at(2, 2)),
Constraint(('three_across', 'one_down'), meet_at(0, 2)),
Constraint(('four_across', 'two_down'), meet_at(0, 4))])
crossword1 = [['_', '_', '_', '*', '*'],
['_', '*', '_', '*', '*'],
['_', '_', '_', '_', '*'],
['_', '*', '_', '*', '*'],
['*', '*', '_', '_', '_'],
['*', '*', '_', '*', '*']]
words1 = {'ant', 'big', 'bus', 'car', 'has', 'book', 'buys', 'hold',
'lane', 'year', 'ginger', 'search', 'symbol', 'syntax'}
class Crossword(NaryCSP):
def __init__(self, puzzle, words):
domains = {}
constraints = []
for i, line in enumerate(puzzle):
scope = []
for j, element in enumerate(line):
if element == '_':
var = "p" + str(j) + str(i)
domains[var] = list(string.ascii_lowercase)
scope.append(var)
else:
if len(scope) > 1:
constraints.append(Constraint(tuple(scope), is_word(words)))
scope.clear()
if len(scope) > 1:
constraints.append(Constraint(tuple(scope), is_word(words)))
puzzle_t = list(map(list, zip(*puzzle)))
for i, line in enumerate(puzzle_t):
scope = []
for j, element in enumerate(line):
if element == '_':
scope.append("p" + str(i) + str(j))
else:
if len(scope) > 1:
constraints.append(Constraint(tuple(scope), is_word(words)))
scope.clear()
if len(scope) > 1:
constraints.append(Constraint(tuple(scope), is_word(words)))
super().__init__(domains, constraints)
self.puzzle = puzzle
def display(self, assignment=None):
for i, line in enumerate(self.puzzle):
puzzle = ""
for j, element in enumerate(line):
if element == '*':
puzzle += "[*] "
else:
var = "p" + str(j) + str(i)
if assignment is not None:
if isinstance(assignment[var], set) and len(assignment[var]) is 1:
puzzle += "[" + str(first(assignment[var])).upper() + "] "
elif isinstance(assignment[var], str):
puzzle += "[" + str(assignment[var]).upper() + "] "
else:
puzzle += "[_] "
else:
puzzle += "[_] "
print(puzzle)
# ______________________________________________________________________________
Donato Meoli
a validé
# Kakuro Problem
Donato Meoli
a validé
# difficulty 0
Donato Meoli
a validé
kakuro1 = [['*', '*', '*', [6, ''], [3, '']],
Donato Meoli
a validé
['*', [4, ''], [3, 3], '_', '_'],
[['', 10], '_', '_', '_', '_'],
[['', 3], '_', '_', '*', '*']]
# difficulty 0
Donato Meoli
a validé
kakuro2 = [
Donato Meoli
a validé
['*', [10, ''], [13, ''], '*'],
[['', 3], '_', '_', [13, '']],
[['', 12], '_', '_', '_'],
[['', 21], '_', '_', '_']]
# difficulty 1
Donato Meoli
a validé
kakuro3 = [
Donato Meoli
a validé
['*', [17, ''], [28, ''], '*', [42, ''], [22, '']],
[['', 9], '_', '_', [31, 14], '_', '_'],
[['', 20], '_', '_', '_', '_', '_'],
['*', ['', 30], '_', '_', '_', '_'],
['*', [22, 24], '_', '_', '_', '*'],
[['', 25], '_', '_', '_', '_', [11, '']],
[['', 20], '_', '_', '_', '_', '_'],
[['', 14], '_', '_', ['', 17], '_', '_']]
# difficulty 2
Donato Meoli
a validé
kakuro4 = [
Donato Meoli
a validé
['*', '*', '*', '*', '*', [4, ''], [24, ''], [11, ''], '*', '*', '*', [11, ''], [17, ''], '*', '*'],
['*', '*', '*', [17, ''], [11, 12], '_', '_', '_', '*', '*', [24, 10], '_', '_', [11, ''], '*'],
['*', [4, ''], [16, 26], '_', '_', '_', '_', '_', '*', ['', 20], '_', '_', '_', '_', [16, '']],
[['', 20], '_', '_', '_', '_', [24, 13], '_', '_', [16, ''], ['', 12], '_', '_', [23, 10], '_', '_'],
[['', 10], '_', '_', [24, 12], '_', '_', [16, 5], '_', '_', [16, 30], '_', '_', '_', '_', '_'],
['*', '*', [3, 26], '_', '_', '_', '_', ['', 12], '_', '_', [4, ''], [16, 14], '_', '_', '*'],
['*', ['', 8], '_', '_', ['', 15], '_', '_', [34, 26], '_', '_', '_', '_', '_', '*', '*'],
['*', ['', 11], '_', '_', [3, ''], [17, ''], ['', 14], '_', '_', ['', 8], '_', '_', [7, ''], [17, ''], '*'],
['*', '*', '*', [23, 10], '_', '_', [3, 9], '_', '_', [4, ''], [23, ''], ['', 13], '_', '_', '*'],
['*', '*', [10, 26], '_', '_', '_', '_', '_', ['', 7], '_', '_', [30, 9], '_', '_', '*'],
['*', [17, 11], '_', '_', [11, ''], [24, 8], '_', '_', [11, 21], '_', '_', '_', '_', [16, ''], [17, '']],
[['', 29], '_', '_', '_', '_', '_', ['', 7], '_', '_', [23, 14], '_', '_', [3, 17], '_', '_'],
[['', 10], '_', '_', [3, 10], '_', '_', '*', ['', 8], '_', '_', [4, 25], '_', '_', '_', '_'],
['*', ['', 16], '_', '_', '_', '_', '*', ['', 23], '_', '_', '_', '_', '_', '*', '*'],
['*', '*', ['', 6], '_', '_', '*', '*', ['', 15], '_', '_', '_', '*', '*', '*', '*']]
Donato Meoli
a validé
class Kakuro(NaryCSP):
Donato Meoli
a validé
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
def __init__(self, puzzle):
variables = []
for i, line in enumerate(puzzle):
# print line
for j, element in enumerate(line):
if element == '_':
var1 = str(i)
if len(var1) == 1:
var1 = "0" + var1
var2 = str(j)
if len(var2) == 1:
var2 = "0" + var2
variables.append("X" + var1 + var2)
domains = {}
for var in variables:
domains[var] = set(range(1, 10))
constraints = []
for i, line in enumerate(puzzle):
for j, element in enumerate(line):
if element != '_' and element != '*':
# down - column
if element[0] != '':
x = []
for k in range(i + 1, len(puzzle)):
if puzzle[k][j] != '_':
break
var1 = str(k)
if len(var1) == 1:
var1 = "0" + var1
var2 = str(j)
if len(var2) == 1:
var2 = "0" + var2
x.append("X" + var1 + var2)
constraints.append(Constraint(x, sum_(element[0])))
constraints.append(Constraint(x, all_diff))
# right - line
if element[1] != '':
x = []
for k in range(j + 1, len(puzzle[i])):
if puzzle[i][k] != '_':
break
var1 = str(i)
if len(var1) == 1:
var1 = "0" + var1
var2 = str(k)
if len(var2) == 1:
var2 = "0" + var2
x.append("X" + var1 + var2)
constraints.append(Constraint(x, sum_(element[1])))
constraints.append(Constraint(x, all_diff))
super().__init__(domains, constraints)
self.puzzle = puzzle
def display(self, assignment=None):
for i, line in enumerate(self.puzzle):
puzzle = ""
for j, element in enumerate(line):
if element == '*':
puzzle += "[*]\t"
elif element == '_':
var1 = str(i)
if len(var1) == 1:
var1 = "0" + var1
var2 = str(j)
if len(var2) == 1:
var2 = "0" + var2
var = "X" + var1 + var2
if assignment is not None:
if isinstance(assignment[var], set) and len(assignment[var]) is 1:
puzzle += "[" + str(first(assignment[var])) + "]\t"
elif isinstance(assignment[var], int):
puzzle += "[" + str(assignment[var]) + "]\t"
else:
puzzle += "[_]\t"
else:
puzzle += "[_]\t"
else:
puzzle += str(element[0]) + "\\" + str(element[1]) + "\t"
print(puzzle)
# ______________________________________________________________________________
# Cryptarithmetic Problem
# [Figure 6.2]
# T W O + T W O = F O U R
two_two_four = NaryCSP({'T': set(range(1, 10)), 'F': set(range(1, 10)),
'W': set(range(0, 10)), 'O': set(range(0, 10)), 'U': set(range(0, 10)), 'R': set(range(0, 10)),
'C1': set(range(0, 2)), 'C2': set(range(0, 2)), 'C3': set(range(0, 2))},
[Constraint(('T', 'F', 'W', 'O', 'U', 'R'), all_diff),
Constraint(('O', 'R', 'C1'), lambda o, r, c1: o + o == r + 10 * c1),
Constraint(('W', 'U', 'C1', 'C2'), lambda w, u, c1, c2: c1 + w + w == u + 10 * c2),
Constraint(('T', 'O', 'C2', 'C3'), lambda t, o, c2, c3: c2 + t + t == o + 10 * c3),
Constraint(('F', 'C3'), eq)])
# S E N D + M O R E = M O N E Y
send_more_money = NaryCSP({'S': set(range(1, 10)), 'M': set(range(1, 10)),
'E': set(range(0, 10)), 'N': set(range(0, 10)), 'D': set(range(0, 10)),
'O': set(range(0, 10)), 'R': set(range(0, 10)), 'Y': set(range(0, 10)),
'C1': set(range(0, 2)), 'C2': set(range(0, 2)), 'C3': set(range(0, 2)),
'C4': set(range(0, 2))},
[Constraint(('S', 'E', 'N', 'D', 'M', 'O', 'R', 'Y'), all_diff),
Constraint(('D', 'E', 'Y', 'C1'), lambda d, e, y, c1: d + e == y + 10 * c1),
Constraint(('N', 'R', 'E', 'C1', 'C2'), lambda n, r, e, c1, c2: c1 + n + r == e + 10 * c2),
Constraint(('E', 'O', 'N', 'C2', 'C3'), lambda e, o, n, c2, c3: c2 + e + o == n + 10 * c3),
Constraint(('S', 'M', 'O', 'C3', 'C4'), lambda s, m, o, c3, c4: c3 + s + m == o + 10 * c4),
Constraint(('M', 'C4'), eq)])