-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase_observer.py
More file actions
51 lines (42 loc) · 1.45 KB
/
database_observer.py
File metadata and controls
51 lines (42 loc) · 1.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from threading import Thread
from time import sleep
import inject
from config import Config
from dao.changes import ChangesDAO
from event_system import MutableChannel, Channel
class DatabaseObserver:
    """Poll the changes DAO on a background thread and notify subscribers.

    A daemon thread repeatedly asks ``source`` for its last change and
    publishes ``None`` under ``_UPDATE_EVENT`` on an internal channel
    whenever that value differs from the one seen on the previous poll.
    A publish is also forced on the first successful poll and after any
    poll that raised, so subscribers are re-notified once polling works
    again.
    """

    # Dependencies resolved through ``inject`` on attribute access.
    source = inject.attr(ChangesDAO)
    config = inject.attr(Config)

    # Event key under which update notifications are published.
    _UPDATE_EVENT = 0

    def __init__(self):
        """Create the channel and the (not yet started) polling thread."""
        self._channel: MutableChannel[None] = MutableChannel()
        # Timestamp of the most recent change seen so far; None until the
        # DAO first reports a change.
        self._last_change = None
        # When True, the next successful poll publishes even if the
        # timestamp did not move.
        self._force_update = True
        self._thread = Thread(
            name="DatabasePolling",
            target=self._poll_db,
            daemon=True,
        )

    def subscribe(self, *callbacks: Channel.Callback) -> None:
        """Register ``callbacks`` for the update event on the channel."""
        self._channel.subscribe(self._UPDATE_EVENT, *callbacks)

    def run(self) -> None:
        """Start the background polling thread."""
        self._thread.start()

    def _poll_db(self) -> None:
        """Thread target: poll forever, sleeping between iterations."""
        while True:
            try:
                self._poll_once()
            except Exception as e:
                # Keep the thread alive on any failure (DAO or subscriber)
                # and make sure subscribers are notified on the next
                # successful poll.
                self._force_update = True
                print(e)
            sleep(self.config.database_observer.refresh_interval)

    def _poll_once(self) -> None:
        """Run a single poll and publish the update event if needed.

        The DAO is always consulted, even when a forced update is pending.
        Skipping the check (as a short-circuiting ``or`` would) leaves
        ``_last_change`` stale, so the following poll would see the same
        database state as "new" and publish a duplicate event. It also
        means no event is published while the DAO itself is failing.
        """
        changed = self._has_changes()
        if self._force_update or changed:
            self._channel.publish(self._UPDATE_EVENT, None)
            # Cleared only after a successful publish; if publish raises,
            # the caller re-arms the flag.
            self._force_update = False

    def _has_changes(self) -> bool:
        """Return True if the DAO reports a change not seen before.

        Updates ``_last_change`` as a side effect when a new value is
        observed. A falsy result from the DAO is treated as "no change".
        """
        last_change = self.source.get_last_change()
        if not last_change:
            return False
        # presumably a datetime; only its ``timestamp()`` value is compared
        # and stored -- verify against ChangesDAO.
        stamp = last_change.timestamp()
        if stamp == self._last_change:
            return False
        self._last_change = stamp
        return True