-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathA02_task_workflow.py
More file actions
265 lines (221 loc) · 11.6 KB
/
A02_task_workflow.py
File metadata and controls
265 lines (221 loc) · 11.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
# -*- encoding: utf-8 -*-
"""
@File: A02_task_workflow.py
@Modify Time: 2026/1/13 15:34
@Author: Kevin-Chen
@Descriptions: 任务拆分 工作流
"""
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from B00_agent_config import (
format_agent_skills,
REQUIREMENT_CLARIFICATION_MD,
TASK_DOC_STRUCTURE_PROMPT,
agent_names_list,
design_md,
requirement_str,
run_agent,
task_agent_init_prompt,
task_md,
today_str,
working_path,
)
from B03_init_function_agents import init_agent, custom_init_agent, parse_director_response
print_lock = threading.Lock()
# [任务拆分模式] 各个智能体的 skills 技能标签
agent_skills_dict = {
'需求分析师': ['$Scrum Master'],
'审核员': ['$System Architect'],
'测试工程师': ['$Business Analyst'],
'开发工程师': ['$Product Manager'],
}
DIRECTOR_SUCCESS_TEXT = "任务拆分完成"
# [任务拆分模式] 调度智能体的 prompt 主体
base_director_prompt = f"""你是一个专业的调度智能体.
现在有: {agent_names_list} 这{len(agent_names_list)}个智能体.
当前阶段为 任务拆解 阶段. 当前的需求描述如下: {requirement_str}
需求分析师 已经根据需求描述写了一份详细设计文档 {design_md} 以及 任务单文档 {task_md}
需要其他智能体对这个 {task_md} 文档进行检查, 然后结合各个智能体的输出, 让需求分析师对 {task_md} 任务单文档进行优化修改.
直到所有智能体都返回检查通过.
{TASK_DOC_STRUCTURE_PROMPT}
---
主要流程如下:
1) 通知 审核员智能体, 测试工程师智能体, 开发工程师智能体 对 {task_md} 进行文档审核.
1.1) 调用 审核员/测试工程师/开发工程师 智能体 时, prompt 模板如下:
```
你是一个专业的{{智能体名称}}. 现在有以下需求: {requirement_str}
需求分析师已经根据原始需求, 进行了详细设计, 并且写入了 {design_md} 文档中.
并且根据详细设计文档, 需求分析师已经生成了 {task_md} 任务单.
以专业{{智能体名称}}的角度, 根据 {design_md} 详细设计文档, 审查 {task_md} 中的任务安排.
重点检查:
1) 里程碑划分是否体现了合理的重大节点/阶段性成果;
2) 每个里程碑下是否都有为达成该成果所需的具体任务单;
3) 任务单是否粒度足够具体, 能被直接开发、审核、测试和勾选完成;
4) 任务单是否存在遗漏、重复、顺序错误、跨里程碑混放等问题;
5) 每个任务单是否写清了目标、涉及模块/文件、完成标准、验证方式.
如果有 疑问/歧义 也可以提出.
返回你发现的 错误/遗漏点 以及 疑问/歧义点. 如果没有任何错误或疑问, 则返回 '检查通过'.
不要返回多余的信息. 不要修改代码与文档.
```
---
2) 收集 审核员智能体, 测试工程师智能体, 开发工程师智能体 的审核结果.
2.1) 如果所有智能体都没有提出问题或疑问点, 则认为该任务开发完成, 直接到步骤5, 返回 {{"success": "{DIRECTOR_SUCCESS_TEXT}"}}
2.2) 如果有任一智能体提出错误点或者有疑问点, 则认为该需求详细设计未完成. 需要通知 需求分析师智能体 进行修复. prompt 模板如下:
```
你是一个专业 需求分析师, 针对 {task_md} 文档, 专家团队发现一些错误/遗漏点, 或者存在疑问/歧义点:
审核员发现以下错误/遗漏点:
{{具体内容}}
审核员有以下疑问/歧义点:
{{具体内容}}
测试工程师发现以下错误/遗漏点:
{{具体内容}}
测试工程师有以下疑问/歧义点:
{{具体内容}}
开发工程师发现以下错误/遗漏点:
{{具体内容}}
开发工程师有以下疑问/歧义点:
{{具体内容}}
- 总结并去重上面提到错误/遗漏点, 疑问/歧义点.
- 详细分析这些 错误/遗漏点 是否属实.
- 对这些疑问/分歧点进行深度设计.
- 对属实的问题进行修复. 并且说明对应的假设.
- 修改并补充说明 {task_md} 文档.
- 必须保证最终文档仍然严格采用“里程碑 + 任务单”两级结构.
- 必须说明你如何调整了里程碑划分、任务单归属、任务粒度、顺序和验证方式.
修改完成后, 返回以下内容:
1) 说明属实的问题是哪些
2) 对疑问/分歧点的回答与设计是什么
3) 你对 {task_md} 文档做了哪些修改
注意: 只返回上面3点内容, 不要返回其他无关的内容.
```
---
3) 需求分析师智能体 修改 {task_md} 文档, 然后你需要告诉各个其他智能体修改内容是什么, 以及对于 疑问/歧义点 的回答与设计是什么, 并且让各个智能体再次检查.
3.1) 调用 审核员/测试工程师/开发工程师 智能体 时, prompt 模板如下:
```
你是一个专业的{{智能体名称}}.
刚刚 需求分析师修改了 {task_md} 文档. 其修改内容如下:
{{修改内容}}
需求分析师对于疑问/歧义点的回答与设计如下:
{{回答与设计内容}}
以专业{{智能体名称}}的角度, 再次根据 {design_md} 详细设计文档, 审查 {task_md} 中的任务安排.
继续重点检查里程碑划分、任务单归属、任务粒度、执行顺序、完成标准、验证方式是否已经合理.
如果有 疑问/歧义 也可以提出. 返回你发现的 错误/遗漏点 以及 疑问/歧义点.
如果没有任何错误或疑问, 则返回 '检查通过'.
不要返回多余的信息. 不要修改代码与文档.
```
---
4) 收集 审核员智能体, 测试工程师智能体, 开发工程师智能体 的审核结果.
4.1) 如果所有智能体都没有提出问题或疑问点, 则认为 任务拆解 已经完成, 直接到步骤5, 返回 {{"success": "{DIRECTOR_SUCCESS_TEXT}"}}
4.2) 如果有任一智能体提出错误点或者有疑问点, 则认为该需求详细设计未完成. 需要通知 需求分析师智能体 进行修复. 回到 2.2)
---
5) 当所有智能体都认为当前需求分析与详细设计没有问题了, 则结束整个流程. 返回如下JSON:
```
{{"success": "{DIRECTOR_SUCCESS_TEXT}"}}
```
---
返回JSON格式的数据, 格式需要是 {{智能体名称: 提示词}} 格式如下:
{{"需求分析师": "相关提示词..."}}
{{"审核员": "相关提示词...", "测试工程师": "相关提示词...", "开发工程师": "相关提示词...", }}
注意JSON内部双引号需要有转义符, 保证JSON合法
最终回复必须是一个固定字段的 JSON 对象, 且只能输出 JSON 本身, 禁止解释说明:
```json
{{"success":"","需求分析师":"","审核员":"","测试工程师":"","开发工程师":""}}
```
规则:
- 5个字段必须全部出现
- 当前不使用的字段必须填空字符串 ""
- 如果流程结束, 只能填写 success, 其他4个字段必须为空字符串
- 如果 success 非空, 它的值必须精确等于 "{DIRECTOR_SUCCESS_TEXT}"
- 如果当前只是准备读取、分析、规划下一步, 绝不能填写 success
- 如果要调度智能体, success 必须为空字符串, 其余需要调度的智能体字段填写 prompt, 不调度的智能体字段填空字符串
---
智能体名称必须是: 开发工程师, 需求分析师, 审核员, 测试工程师
返回的格式必须严格满足JSON格式要求.
"""
def prepare_agent_prompt(agent_name, agent_prompt):
skills_prefix = format_agent_skills(agent_name, agent_skills_dict)
clarification_rule = f"""
前置规则:
1) 你必须优先阅读并参考需求澄清记录文件: {REQUIREMENT_CLARIFICATION_MD}
2) 文件绝对路径: {working_path}/{REQUIREMENT_CLARIFICATION_MD}
3) 若文件存在, 你的评审结论必须结合澄清记录中的结论;
4) 若文件不存在, 说明“暂无人类澄清记录”, 再基于现有信息继续任务.
"""
return f"{skills_prefix} {clarification_rule}\n{agent_prompt}".strip()
# [任务拆分模式] 任务拆分工作流主函数
def main():
"""
任务拆分 工作流 主函数
:return:
"""
''' 1) 初始化 各个功能型智能体 ------------------------------------------------------------------------------------ '''
agent_session_id_dict = dict()
# 使用线程池并发初始化多个agent,将agent名称和对应的session ID存储到字典中
with ThreadPoolExecutor(max_workers=len(agent_names_list)) as executor:
futures = [executor.submit(init_agent, agent_name) for agent_name in agent_names_list]
for future in as_completed(futures):
a_name, s_id = future.result()
agent_session_id_dict[a_name] = s_id
# 个性化初始化 需求分析师 智能体
custom_init_agent('需求分析师', agent_session_id_dict['需求分析师'], task_agent_init_prompt)
''' 2) 初始化 调度器智能体 -------------------------------------------------------------------------------------- '''
director_agent_name = "调度器"
director_log_file_path = f"{working_path}/agent_{director_agent_name}_{today_str}.log"
init_director_prompt = f"""
---
现在开始你的调度任务.
"""
init_director_prompt = base_director_prompt + init_director_prompt
msg, director_session_id = run_agent(director_agent_name, director_log_file_path,
init_director_prompt, init_yn=True, session_id=None)
msg_dict = parse_director_response(msg, director_log_file_path, allowed_success_values={DIRECTOR_SUCCESS_TEXT})
first_agent_name = list(msg_dict.keys())[0]
''' 3) 调用 各个功能型智能体 ------------------------------------------------------------------------------------- '''
while first_agent_name != 'success':
if first_agent_name not in ['需求分析师', '审核员', '测试工程师', '开发工程师']:
raise ValueError(f"调度器智能体返回了未知的智能体名称: {first_agent_name}")
# 并发执行智能体
agent_items = list(msg_dict.items())
what_agent_replay_dict = dict()
with ThreadPoolExecutor(max_workers=len(agent_items)) as executor:
futures = {
executor.submit(
run_agent,
agent_name,
f"{working_path}/agent_{agent_name}_{today_str}.log",
prepare_agent_prompt(agent_name, agent_prompt),
False,
agent_session_id_dict[agent_name],
): agent_name
for agent_name, agent_prompt in agent_items
}
for future in as_completed(futures):
agent_name = futures[future]
msg, _ = future.result()
what_agent_replay_dict[agent_name] = msg
what_agent_just_use = [agent_name for agent_name, _ in agent_items]
''' 4) 调用 调度器智能体 ------------------------------------------------------------------------------------ '''
# 合并处理结果, 生成 新的调度器提示词
what_agent_replay = ''
for agent_name, msg in what_agent_replay_dict.items():
what_agent_replay += f"""
{agent_name}:
```
{msg}
```
"""
doing_director_prompt = f"""
---
你刚刚调用的智能体为 {what_agent_just_use} 返回内容如下:
{what_agent_replay}
---
继续按照上述流程进行调度.
"""
doing_director_prompt = base_director_prompt + doing_director_prompt
# 调用 调度器智能体
msg, session_id = run_agent(director_agent_name, director_log_file_path, doing_director_prompt,
init_yn=False, session_id=director_session_id)
msg_dict = parse_director_response(msg, director_log_file_path, allowed_success_values={DIRECTOR_SUCCESS_TEXT})
first_agent_name = list(msg_dict.keys())[0]
if __name__ == '__main__':
main()