1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315
|
from typing import Dict, List, Optional from dataclasses import dataclass from enum import Enum import asyncio
class AgentRole(Enum): """Agent 角色枚举""" PM = "pm" ARCHITECT = "architect" DEVELOPER = "developer" TESTER = "tester" WRITER = "writer" DEVOPS = "devops"
@dataclass class AgentConfig: """Agent 配置""" role: AgentRole label: str model: str = "qwen3.5-plus" timeout: int = 1800 workspace: str = "" read_only: List[str] = None thread: bool = True mode: str = "session"
class SubAgentManager: """子 Agent 管理器""" def __init__(self): self.agents: Dict[str, AgentConfig] = {} self.sessions: Dict[str, str] = {} def spawn(self, config: AgentConfig, task: str) -> str: """ 生成子 Agent Args: config: Agent 配置 task: 任务描述 Returns: session_key: 会话密钥 """ workspace = config.workspace or f"private/{config.role.value}/" read_only = config.read_only or [ "memory/MEMORY.md", "skills/", "docs/", "projects/" ] full_task = self._build_task_description(config.role, task) session_key = sessions_spawn( label=config.label, runtime="subagent", mode=config.mode, task=full_task, model=config.model, thread=config.thread, timeout_seconds=config.timeout, cleanup="keep" ) self.agents[session_key] = config self.sessions[config.role.value] = session_key return session_key def _build_task_description(self, role: AgentRole, task: str) -> str: """构建任务描述""" role_instructions = { AgentRole.PM: """ 【PM 角色】产品需求分析
**工作规则**: 1. 工作文件:private/pm/work/ 2. 私有笔记:private/pm/notes/ 3. 提交输出:private/pm/output/ 4. 读取共享:memory/, skills/, docs/, projects/
**输出要求**: - 需求文档(PRD) - 用户故事(User Stories) - 任务清单与优先级 """, AgentRole.ARCHITECT: """ 【Architect 角色】系统架构设计
**工作规则**: 1. 工作文件:private/architect/work/ 2. 私有笔记:private/architect/notes/ 3. 提交输出:private/architect/output/ 4. 读取共享:memory/, skills/, docs/, projects/
**输出要求**: - 架构图(Mermaid/draw.io) - 技术选型文档 - API 设计规范 """, AgentRole.DEVELOPER: """ 【Developer 角色】编码实现
**工作规则**: 1. 工作文件:private/developer/work/ 2. 私有笔记:private/developer/notes/ 3. 提交输出:private/developer/output/ 4. 读取共享:memory/, skills/, docs/, projects/
**输出要求**: - 源代码(含注释) - 单元测试 - 代码审查通过 """, AgentRole.TESTER: """ 【Tester 角色】测试验证
**工作规则**: 1. 工作文件:private/tester/work/ 2. 私有笔记:private/tester/notes/ 3. 提交输出:private/tester/output/ 4. 读取共享:memory/, skills/, docs/, projects/
**输出要求**: - 测试用例 - 测试报告 - 缺陷报告 """, AgentRole.WRITER: """ 【Writer 角色】文档编写
**工作规则**: 1. 工作文件:private/writer/work/ 2. 私有笔记:private/writer/notes/ 3. 提交输出:private/writer/output/ 4. 读取共享:memory/, skills/, docs/, projects/
**输出要求**: - 技术文档 - API 文档 - 用户手册 """, AgentRole.DEVOPS: """ 【DevOps 角色】部署配置
**工作规则**: 1. 工作文件:private/devops/work/ 2. 私有笔记:private/devops/notes/ 3. 提交输出:private/devops/output/ 4. 读取共享:memory/, skills/, docs/, projects/
**输出要求**: - Dockerfile - Jenkinsfile - K8s 配置 - 监控仪表盘 """ } base_instruction = role_instructions.get(role, "") return f"""{base_instruction}
**当前任务**: {task}
**完成标准**: - 质量符合项目标准 - 输出到私有 output/ 目录等待审核 - 在共享 memory/ 中记录关键决策 """ async def wait_all(self, timeout: int = 3600) -> Dict[str, str]: """等待所有子 Agent 完成""" results = {} for role, session_key in self.sessions.items(): try: result = await self._poll_session(session_key, timeout) results[role] = result except asyncio.TimeoutError: results[role] = f"❌ 超时({timeout}s)" return results async def _poll_session(self, session_key: str, timeout: int) -> str: """轮询会话完成""" start_time = time.time() while time.time() - start_time < timeout: status = sessions_list(active_minutes=1) for session in status: if session['key'] == session_key: if session['status'] == 'completed': return session['result'] elif session['status'] == 'failed': return f"❌ 失败:{session['error']}" await asyncio.sleep(10) raise asyncio.TimeoutError(f"等待超时:{timeout}s") def get_outputs(self, role: str) -> List[str]: """获取子 Agent 输出""" output_dir = Path(f"private/{role}/output/") if not output_dir.exists(): return [] return [ str(f) for f in output_dir.glob("*") if f.is_file() ] def review_output(self, role: str, output_path: str) -> ReviewResult: """审核子 Agent 输出""" content = Path(output_path).read_text() checklist = self._get_review_checklist(role) results = {} for item in checklist: results[item] = self._check_item(content, item) passed = all(results.values()) return ReviewResult( passed=passed, checklist=results, suggestions=self._generate_suggestions(results) ) def _get_review_checklist(self, role: str) -> List[str]: """获取审核清单""" checklists = { "pm": [ "需求描述清晰", "用户故事完整", "优先级合理", "验收标准明确" ], "architect": [ "架构图清晰", "技术选型合理", "接口设计完整", "性能/安全考虑" ], "developer": [ "代码规范", "单元测试覆盖", "注释完整", "无安全漏洞" ], "tester": [ "测试用例完整", "边界条件覆盖", "自动化程度", "缺陷描述清晰" ], "writer": [ "文档结构清晰", "内容准确", "示例完整", "格式规范" ], "devops": [ "配置完整", "安全性考虑", "监控告警", "回滚方案" ] } return checklists.get(role, []) def merge_to_shared(self, role: str, output_path: str, target_dir: str): """合并输出到共享区""" content = Path(output_path).read_text() target_path = Path(target_dir) / Path(output_path).name if target_path.exists(): backup_path = target_path.with_suffix(target_path.suffix + '.backup') shutil.copy(target_path, backup_path) shutil.copy(output_path, target_path) self._log_merge(role, output_path, target_path)
|