forked from lizard1998myx/MultiBot
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdistributor.py
More file actions
129 lines (115 loc) · 5.61 KB
/
distributor.py
File metadata and controls
129 lines (115 loc) · 5.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import pickle, os, traceback
from MultiBot.session_initialize import NEW_SESSIONS, NEW_SESSIONS_CRON
from MultiBot.responses import Response, ResponseMsg
from MultiBot.requests import Request
ACTIVE_SESSIONS = os.path.join('..', 'data', 'active_sessions.data')
# 分拣中心,对于每个来自各聊天软件接口的Request,寻找活动的Session或创建合适的Session
class Distributor:
def __init__(self):
self.active_sessions = [] # 初始化活动Session列表
self.current_session = None # 暂存Session,用于和各聊天软件接口沟通
self._load_sessions() # 载入存在硬盘里的活动Session
self._new_session = NEW_SESSIONS # 创建新Session时的列表
self._max_iterate = 10
# 载入硬盘里的活动Session,不做任何判断
def _load_sessions(self):
try:
with open(ACTIVE_SESSIONS, 'rb') as f:
self.active_sessions = pickle.load(f)
except FileNotFoundError:
self._save_sessions()
except EOFError:
self._save_sessions()
# 把活动Session列表保存回硬盘中
def _save_sessions(self):
with open(ACTIVE_SESSIONS, 'wb') as f:
pickle.dump(self.active_sessions, f)
# 刷新活动Session表(检查)并保存
def refresh_and_save(self, save=True):
new_list = []
for session in self.active_sessions:
if session.is_active():
new_list.append(session)
self.active_sessions = new_list
if save:
self._save_sessions()
# 检查request是否在活动Session表中,若在,则返回True并将session放到current_session中
def use_active(self, request, save=True):
self.refresh_and_save(save=save)
# 在活动Session表中检查,若有符合的(用户id相同并且Request合法),采用该Session的处理方法
if not request.echo:
for session in self.active_sessions:
if request.user_id == session.user_id:
if session.is_legal_request(request=request):
session.refresh()
self.current_session = session
return True
else: # 相同id但Request不合法,对应Session会被关闭
session.deactivate()
# request需要echo或不存在活动session时,返回False
# echo一般是预处理出现问题时返回报错使用的
return False
# 处理任意Request,返回Response序列
def handle(self, request, debug=True):
session = None
if self.use_active(request=request):
# 若在活动Session表中,采用该Session的处理方法
# .use_active方法将current_session置为[active_session]
session = self.current_session
pass
# return make_list(self.current_session.handle(request=request))
else:
# 如果没有符合条件的活动Session,新建一个Session(Possibility最高者)
self.current_session = None
max_possibility = 0
for session_class_candidate in self._new_session:
session_candidate = session_class_candidate(user_id=request.user_id)
if session_candidate.is_legal_request(request=request):
possibility = session_candidate.probability_to_call(request=request)
if possibility > max_possibility:
session = session_candidate
max_possibility = possibility
if max_possibility > 0:
# 把新Session存入内存的表中,把Request交给新Session处理
self.active_sessions.append(session)
self.current_session = session
# 同一处理current_session
if session is not None:
responses = []
try:
raw_results = make_list(session.handle(request=request))
for r in raw_results:
if isinstance(r, Response):
responses.append(r)
elif isinstance(r, Request): # iterate handling requests
self._max_iterate -= 1
if self._max_iterate <= 0:
pass
else:
responses += self.handle(request=r, debug=debug)
except:
session.deactivate()
if debug:
# 如果debug,将错误信息返回
responses.append(ResponseMsg(traceback.format_exc()))
# 不debug则返回空气
finally:
return responses
else: # 若没有active session,且Possibility均为0,不返回Response
return []
# 在原Session中继续处理output,返回后续的Response序列
def process_output(self, output):
assert self.current_session is not None
self.current_session.refresh()
return make_list(self.current_session.process_output(output=output))
# 定时器的分拣中心,接收来自定时器的Request
class DistributorCron(Distributor):
def __init__(self):
Distributor.__init__(self)
self._new_session = NEW_SESSIONS_CRON # 定时器专属的新Session列表
# 把原始response转化为序列
def make_list(raw_response):
if isinstance(raw_response, list):
return raw_response
else:
return [raw_response]