-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlinearizer.py
More file actions
94 lines (65 loc) · 2.58 KB
/
linearizer.py
File metadata and controls
94 lines (65 loc) · 2.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import time
import random
CACHE = {}
def linearize(pop, number = 0):
if number:
return compute_bounded_plans(pop, number)
else:
return compute_plans(pop, set(), set([pop.init]))
def count_linearizations(pop):
global CACHE
CACHE = {}
return count_plans(pop, set(), set([pop.init]))
def check_successor(pop, seen, successor):
return set([item[0] for item in pop.network.in_edges(successor)]) <= seen
def compute_plans(pop, seen, possible):
if 0 == len(possible):
return [[]]
#print "Call:\n Seen: %s\n Possible: %s\n\n" % (str(seen), str(possible))
#time.sleep(1)
plans = []
for action in possible:
new_possible = set()
for successor in [item[1] for item in pop.network.out_edges(action)]:
if check_successor(pop, seen | set([action]), successor):
new_possible.add(successor)
new_plans = compute_plans(pop, seen | set([action]), (possible - set([action])) | new_possible)
plans.extend([[action] + item for item in new_plans])
#print "Returning Plans: %s" % str(plans)
#time.sleep(1)
return plans
def count_plans(pop, seen, possible):
global CACHE
hash_val = '/'.join(sorted([str(hash(item)) for item in list(seen)]))
if hash_val in CACHE:
return CACHE[hash_val]
if 0 == len(possible):
return 1
total = 0
for action in possible:
new_possible = set()
for successor in [item[1] for item in pop.network.out_edges(action)]:
if check_successor(pop, seen | set([action]), successor):
new_possible.add(successor)
total += count_plans(pop, seen | set([action]), (possible - set([action])) | new_possible)
CACHE[hash_val] = total
return total
def compute_bounded_plans(pop, bound):
plans = []
seen = set()
while len(plans) < bound:
new_plan = generate_random_plan(pop, set(), set([pop.init]))
hash_val = ','.join([str(hash(item)) for item in new_plan])
if hash_val not in seen:
seen.add(hash_val)
plans.append(new_plan)
return plans
def generate_random_plan(pop, seen, possible):
if 0 == len(possible):
return []
next_action = random.choice(list(possible))
new_possible = set()
for successor in [item[1] for item in pop.network.out_edges(next_action)]:
if check_successor(pop, seen | set([next_action]), successor):
new_possible.add(successor)
return [next_action] + generate_random_plan(pop, seen | set([next_action]), (possible - set([next_action])) | new_possible)