-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrunner.py
More file actions
95 lines (63 loc) · 2.14 KB
/
runner.py
File metadata and controls
95 lines (63 loc) · 2.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import importlib
from multiprocessing import Process
from pathlib import Path
import time
class ReloadableModule:
def __init__(self, name):
self.module = importlib.import_module(name, package='laundry-pi')
def reload(self):
"""Hope the target module doesn't have a reload attribute"""
self.module = importlib.reload(self.module)
def __getattr__(self, attr):
try:
return getattr(self.module, attr)
except AttributeError:
raise ValueError('attribute', attr, 'not found in module')
def run_with_updates(module, function, update_interval=60):
"""runs a function from a module and reloads it whenever it completes"""
module = ReloadableModule(module)
p = Process(target=getattr(module, function))
p.start()
while True:
time.sleep(update_interval)
if not p.is_alive():
module.reload()
p = Process(target=getattr(module, function))
p.start()
def main():
import sys
try:
module_name = sys.argv[1]
method_name = sys.argv[2]
except:
print('Invalid arguments! use python3 runner.py [module path] [function name]')
return
module_name = module_name.replace('.py', '')
run_with_updates(module_name, method_name, update_interval=5)
def test():
import time
from flagger import Flag
timeout = 30
flag = Flag('.flag')
##### Add stuff here to test #####
##################################
for t in range(timeout):
if flag: break
print('waiting for' + str(t) + 's...')
##### Add stuff here to test #####
if t > 3:
flag.set()
print('flag.test() set the flag after ' + str(t) + 's!')
##################################
time.sleep(1)
else:
print('reloading: timeout after ' + str(timeout) + 's')
##### Add stuff here to test #####
##################################
return
print('Reloading: Flag set!')
##### Add stuff here to test #####
##################################
flag.unflag()
if __name__ == '__main__':
main()