认知与协作:记忆、反思与多智能体
Agent 的长期能力不是靠无限上下文窗口堆出来的,而是靠"可治理的记忆"和有效的协作机制形成的。一个 Agent 系统要能持续改进,需要反思能力来把失败变成下一轮输入,需要记忆管理来避免每次从零开始,需要学习机制来从单次反馈积累成策略更新。当复杂度超过单个 Agent 的承载能力时,还需要多智能体协作,但协作的复杂度会随智能体数量快速上升,因此必须谨慎设计。
上篇:认知能力
反思:把失败变成下一轮输入
反思模式让 Agent 在输出后检查自己的结果,发现遗漏、矛盾、风险或不满足要求的地方,再进入修正。这个机制对写作、代码生成、计划制定、需求分析和多步骤推理尤其有用。
反思循环的完整流程
一个典型反思循环包含六个阶段:
生成初稿
-> 按标准审查
-> 找出缺陷
-> 修改
-> 再审查
-> 达到停止条件
- 生成初稿:Agent 根据任务要求生成初步输出
- 按标准审查:对照质量标准、需求规格或验证规则检查
- 找出缺陷:识别遗漏、矛盾、风险点或不满足要求的地方
- 修改:基于发现的问题进行针对性修正
- 再审查:重新检查修改后的结果
- 达到停止条件:满足预设的停止条件时结束循环
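上述流程可以用一个最小骨架表示。以下为示意实现,其中 generate、review、revise 均为假设的占位函数,实际使用时可接入模型调用:

```python
MAX_ROUNDS = 3  # 最大轮次,防止无限打磨


def reflection_loop(task, generate, review, revise, max_rounds=MAX_ROUNDS):
    """最小反思循环骨架:生成 -> 审查 -> 找缺陷 -> 修改 -> 再审查,直到满足停止条件。

    generate/review/revise 为假设的占位函数;review 返回缺陷列表,空列表表示通过审查。
    """
    draft = generate(task)              # 生成初稿
    for _ in range(max_rounds):
        defects = review(draft)         # 按标准审查,找出缺陷
        if not defects:                 # 达到停止条件:审查未发现缺陷
            break
        draft = revise(draft, defects)  # 修改后进入下一轮再审查
    return draft
```

这个骨架把"审查标准"和"修改动作"都参数化,便于在不同任务中复用同一循环结构。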
停止条件设计
没有明确停止条件的反思会陷入无限打磨。
质量阈值:
- 设定可量化的质量指标
- 例如:代码覆盖率 > 80%、文档完整性 > 95%
- 通过自动化测试或规则检查实现
最大轮次:
- 限制反思迭代次数
- 例如:最多 3 轮反思,避免过度优化
- 防止资源消耗失控
资源预算:
- 设定 token 或时间预算
- 例如:反思阶段不超过总预算的 30%
- 在成本与质量之间取得平衡
外部验证通过:
- 引入外部验证机制
- 例如:测试套件全部通过、人工审核通过
- 比模型自评更可靠的停止信号
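这几类停止条件可以组合成一个统一的判断函数。下面是一个示意实现,字段名和阈值均为假设,实际数值应按任务预算设定:

```python
from dataclasses import dataclass


@dataclass
class ReflectionState:
    """反思循环的当前状态(字段名为示意)"""
    quality_score: float         # 当前质量评分,0~1
    round_num: int               # 已完成的反思轮次
    tokens_used: int             # 反思阶段已消耗的 token
    external_check_passed: bool  # 外部验证(测试/人工审核)是否通过


def should_stop(state, quality_threshold=0.95, max_rounds=3, token_budget=30000):
    """任一停止条件满足即结束反思;外部验证通过是最可靠的信号,优先检查。"""
    if state.external_check_passed:
        return True, "外部验证通过"
    if state.quality_score >= quality_threshold:
        return True, "达到质量阈值"
    if state.round_num >= max_rounds:
        return True, "达到最大轮次"
    if state.tokens_used >= token_budget:
        return True, "超出资源预算"
    return False, "继续反思"
```

把停止判断收敛到一个函数里,便于记录每次停止的原因,为后续调整阈值提供依据。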
反思与测试结合
用测试结果驱动反思方向:
def reflect_with_tests(initial_solution, test_suite):
solution = initial_solution
for iteration in range(MAX_REFLECTION_ROUNDS):
test_results = test_suite.run(solution)
if test_results.passed:
return solution
# 基于具体失败点生成反思
failure_analysis = analyze_failures(test_results.failures)
solution = refine(solution, failure_analysis)
return solution # 返回最佳可用方案
测试提供外部证据,指导反思聚焦于真实问题,而非模型想象的缺陷。
反思与检索结合
用外部证据校验模型自评,减少"自圆其说"的风险:
def reflect_with_rag(draft, knowledge_base):
# 先让模型自我评估
self_critique = model.generate_critique(draft)
# 用外部知识验证关键断言
verified_claims = []
for claim in extract_claims(draft):
external_evidence = knowledge_base.search(claim)
if contradicts(claim, external_evidence):  # contradicts 为示意的矛盾检测函数
verified_claims.append((claim, "contradicted by evidence"))
# 结合自评和外部证据生成最终反思
return synthesize_critique(self_critique, verified_claims)
检索提供独立于模型的验证来源,反思基于证据而非自我确认。
反思的风险
反思模式有几个需要警惕的风险:
自圆其说:
- 模型可能在反思中为错误结论寻找合理化解释
- 需要外部验证机制(测试、检索、人工审核)来制衡
无限打磨:
- 没有明确停止条件时,反思可能永不结束
- 必须预设质量阈值、最大轮次或资源预算
反思成本:
- 每轮反思都需要额外 token 和时间
- 要在质量提升和成本增加之间找到平衡点
反模式识别
- 完全依赖模型自评:没有外部验证的反思容易陷入自我确认
- 没有停止条件:缺乏明确的终止标准会导致资源浪费
- 反思过度:为了微小改进投入过多资源,边际收益递减
- 忽略执行环境:反思没有考虑实际部署环境的约束
实践案例:代码生成的反思循环
async def generate_with_reflection(task_description, max_rounds=3):
"""带反思的代码生成流程"""
async def reflection_round(code, round_num):
# 执行静态分析
static_issues = analyze_code(code)
# 运行测试(如果有)
test_results = run_tests(code) if has_tests(code) else None
# 生成反思
critique = await model.generate(
f"分析以下代码的问题:\n{code}\n"
f"静态分析结果:{static_issues}\n"
f"测试结果:{test_results}\n"
f"这是第 {round_num} 轮反思,重点关注关键问题。"
)
return critique, static_issues, test_results
# 第一轮:生成初稿
code = await model.generate(task_description)
# 反思循环
for round_num in range(1, max_rounds + 1):
critique, issues, tests = await reflection_round(code, round_num)
# 停止条件检查
if not issues and (not tests or tests.passed):
break
if round_num >= max_rounds:
break
# 基于反思修改代码
code = await model.generate(
f"根据以下反馈修改代码:\n{critique}\n"
f"当前代码:\n{code}"
)
return code
这个案例展示了如何将反思与静态分析、测试结合,形成有明确停止条件的改进循环。
记忆管理:让 Agent 不必每次从零开始
记忆管理解决两个问题:当前任务中如何保留必要状态,跨会话如何复用历史知识。
四类记忆详解
工作记忆(Working Memory):
- 当前任务的计划、已做步骤、临时结论
- 生命周期:单次任务期间
- 存储位置:上下文窗口或临时状态
- 典型内容:待办列表、已完成的步骤、中间变量
working_memory = {
"task_plan": ["分析需求", "设计架构", "实现代码", "测试"],
"completed_steps": ["分析需求"],
"current_focus": "设计架构",
"temporary_notes": ["用户提到需要支持离线模式"]
}
情景记忆(Episodic Memory):
- 某次任务发生了什么、为什么这样决策
- 生命周期:长期存储,带时间戳
- 存储位置:持久化存储(向量数据库、文件系统)
- 典型内容:任务记录、决策依据、结果复盘
episodic_memory = {
"task_id": "task_20250428_001",
"timestamp": "2025-04-28T10:30:00Z",
"task_description": "实现用户认证模块",
"decisions": [
{"what": "选择 JWT 方案", "why": "需要支持无状态认证"},
{"what": "使用 Redis 缓存", "why": "性能要求"}
],
"outcome": "成功部署,延迟降低 40%"
}
语义记忆(Semantic Memory):
- 抽象后的规则、偏好、项目知识
- 生命周期:长期,可更新
- 存储位置:知识库、规则引擎
- 典型内容:项目约束、编码规范、领域知识
semantic_memory = {
"project_constraints": [
"禁止新增外部依赖",
"API 响应时间必须 < 100ms",
"必须兼容 Python 3.8+"
],
"code_style": "遵循 PEP 8,使用 Black 格式化",
"domain_knowledge": {
"认证": "本项目使用 JWT + Redis 方案",
"数据库": "主库 PostgreSQL,从库用于报表查询"
}
}
程序记忆(Procedural Memory):
- 可复用流程、检查清单、操作习惯
- 生命周期:长期,可累积
- 存储位置:工作流引擎、脚本库
- 典型内容:部署流程、测试步骤、调试方法
procedural_memory = {
"deployment_checklist": [
"运行单元测试",
"检查环境变量",
"备份数据库",
"执行迁移",
"验证关键功能"
],
"debugging_workflow": {
"性能问题": ["1. 生成 profile", "2. 分析热点", "3. 优化瓶颈"],
"内存泄漏": ["1. 运行内存分析器", "2. 检查引用计数", "3. 修复泄漏点"]
}
}
记忆架构设计
每类记忆有不同的技术实现方式:
class MemoryArchitecture:
def __init__(self):
# 工作记忆:上下文内状态
self.working = WorkingMemory()
# 情景记忆:向量数据库 + 时序索引
self.episodic = VectorDatabase(
index="episodes",
time_field="timestamp",
vector_field="embedding"
)
# 语义记忆:知识图谱 + 规则引擎
self.semantic = KnowledgeGraph(
store="concepts",
rule_engine="constraints"
)
# 程序记忆:工作流引擎
self.procedural = WorkflowEngine(
repository="playbooks"
)
def recall(self, query, memory_type):
"""跨记忆类型检索"""
if memory_type == "working":
return self.working.get(query)
elif memory_type == "episodic":
return self.episodic.search(query, k=5)
elif memory_type == "semantic":
return self.semantic.query(query)
elif memory_type == "procedural":
return self.procedural.find_playbook(query)
遗忘策略
不是所有历史都值得记住。记忆系统必须有筛选和遗忘机制。
TTL(Time To Live):
- 为每条记忆设置过期时间
- 工作记忆:任务结束后立即清理
- 情景记忆:根据重要性设置不同 TTL
- 语义记忆:定期审查和更新
def apply_ttl(memory_records):
now = datetime.now()
ttl_rules = {
"temporary_note": timedelta(hours=1),
"task_outcome": timedelta(days=30),
"project_knowledge": timedelta(days=365),
"deprecated_info": timedelta(days=7)
}
for record in memory_records:
ttl = ttl_rules.get(record.type, timedelta(days=90))
if now - record.timestamp > ttl:
archive_or_delete(record)
访问频率:
- 高频访问的记忆保留
- 长期未访问的记忆降级或清理
- LRU(Least Recently Used)策略
def access_based_cleanup(memory_records, threshold=0.1):
"""清理访问频率低于阈值的记忆"""
total_access = sum(r.access_count for r in memory_records)
for record in memory_records:
if record.access_count / total_access < threshold:
if is_old_record(record):
demote_or_remove(record)
相关性衰减:
- 记忆与当前任务的相关性随时间衰减
- 定期重新评估记忆的关联强度
- 清除不再相关的记忆
def relevance_decay(memory_records, current_context):
"""基于当前上下文评估记忆相关性"""
for record in memory_records:
relevance = compute_relevance(record, current_context)
if relevance < RELEVANCE_THRESHOLD:
mark_for_review(record)
记忆污染防护
长期记忆会变成污染源:旧事实、错误结论、过期偏好会不断影响新任务。
过时信息处理:
- 标记记忆的时间戳和版本
- 定期审查和更新
- 对可能过时的信息添加警告标记
def handle_outdated_info(memory_record):
if is_potentially_outdated(memory_record):
memory_record.status = "needs_review"
memory_record.last_reviewed = datetime.now()
memory_record.confidence *= 0.5 # 降低置信度
错误结论处理:
- 记录决策的结果反馈
- 将被验证错误的结论标记
- 避免重复错误模式
def handle_failed_prediction(memory_record, actual_outcome):
if memory_record.prediction != actual_outcome:
memory_record.accuracy_history.append({
"predicted": memory_record.prediction,
"actual": actual_outcome,
"timestamp": datetime.now()
})
# 准确率过低则标记为不可靠
if compute_accuracy(memory_record) < 0.5:
memory_record.reliable = False
过期偏好处理:
- 用户偏好会随时间变化
- 定期验证偏好是否仍然有效
- 给近期偏好更高权重
def update_user_preference(preference_memory, new_preference):
existing = preference_memory.get(new_preference.key)
if existing:
# 时间衰减因子
decay = compute_time_decay(existing.timestamp)
existing.value = (existing.value * decay +
new_preference.value * (1 - decay))
existing.timestamp = datetime.now()
else:
preference_memory.add(new_preference)
反模式识别
- 什么都记:没有筛选机制导致记忆膨胀,检索效率下降
- 不标注来源和时间:无法评估记忆的可靠性和时效性
- 把结论当事实:未经验证的决策被当作永久规则
- 单一记忆类型:没有区分工作记忆、情景记忆等,导致混乱
- 缺乏遗忘机制:记忆无限增长,包含大量噪音信息
学习与适应:从单次反馈到策略更新
学习与适应比记忆更进一步。记忆保存事实,学习改变策略。
记忆 vs 学习
# 记忆:保存事实
memory.save("用户 A 偏好简洁的回答")
# 学习:改变策略
if user.prefers_concise:
strategy.response_style = "concise"
strategy.max_detail_level = 0.7
记忆回答"发生了什么",学习回答"下次怎么做"。
学习分层机制
单次任务内:临时注意事项
- 生命周期:当前会话
- 存储位置:工作记忆
- 更新条件:即时生效
- 例子:用户提到"这次用 Python",临时切换语言
working_memory.set_temporary_note(
key="preferred_language",
value="python",
scope="current_session"
)
多次重复后:项目经验沉淀
- 生命周期:跨会话,项目级别
- 存储位置:情景记忆
- 更新条件:模式重复 N 次
- 例子:某类任务总在测试阶段失败,提前补测试
if pattern_occurs("test_failure_after_implementation", count=3):
semantic_memory.update_rule(
"implementation_workflow",
add_step="编写测试用例",
position="after_implementation"
)
人工确认后:长期规则或技能
- 生命周期:永久,直到被覆盖
- 存储位置:程序记忆
- 更新条件:人工验证
- 例子:项目约定禁止新增依赖,变成默认约束
def learn_with_validation(pattern, confidence):
if confidence > VALIDATION_THRESHOLD:
# 需要人工确认
if request_human_confirmation(pattern):
procedural_memory.add_skill(pattern)
elif confidence > LEARNING_THRESHOLD:
# 自动学习到项目级别
semantic_memory.add_rule(pattern)
从反馈到策略更新的完整链路
class LearningLoop:
def __init__(self):
self.feedback_buffer = []
self.pattern_detector = PatternDetector()
self.strategy_updater = StrategyUpdater()
def record_feedback(self, task, outcome, feedback):
"""记录单次反馈"""
self.feedback_buffer.append({
"task": task,
"outcome": outcome,
"feedback": feedback,
"timestamp": datetime.now()
})
# 定期触发模式检测
if len(self.feedback_buffer) >= FEEDBACK_BATCH_SIZE:
self.detect_and_learn()
def detect_and_learn(self):
"""检测模式并更新策略"""
patterns = self.pattern_detector.detect(self.feedback_buffer)
for pattern in patterns:
if pattern.occurrence >= MIN_OCCURRENCE:
# 评估置信度
confidence = self.evaluate_confidence(pattern)
# 分层学习
if confidence > VALIDATION_THRESHOLD:
# 需要人工确认的高置信度模式
self.request_validation(pattern)
elif confidence > LEARNING_THRESHOLD:
# 自动学习到项目经验
self.semantic_memory.add_rule(pattern)
else:
# 保留在工作记忆作为临时注意
self.working_memory.add_note(pattern)
# 清理已处理的反馈
self.feedback_buffer = self.feedback_buffer[-KEEP_RECENT:]
def evaluate_confidence(self, pattern):
"""评估模式置信度"""
factors = {
"occurrence": min(pattern.occurrence / 10, 1.0),
"consistency": pattern.consistency_score,
"outcome_quality": pattern.avg_outcome_score,
"source_reliability": pattern.avg_feedback_reliability
}
return weighted_average(factors)
这个链路展示了从单次反馈到策略更新的完整流程:收集反馈 → 检测模式 → 评估置信度 → 分层学习。
反模式识别
- 一次失败就泛化为规则:偶然失败被过度泛化,导致错误策略
- 学习过于激进:置信度阈值过低,引入不可靠的规则
- 忽略上下文:没有考虑模式适用的边界条件
- 不记录学习来源:无法追溯和审计策略变更
- 缺乏人工确认:高风险的自动化学习没有人工把关
认知能力的安全循环
反思、记忆、学习三者经常被混在一起,但职责不同:
- 反思关注本次输出是否足够好
- 记忆关注哪些信息下次还要用
- 学习关注未来行为策略是否要调整
一个安全的循环应该是:
执行任务
-> 验证结果
-> 反思失败原因
-> 记录必要证据
-> 提炼可复用规则
-> 在合适边界内更新行为
async def safe_cognitive_loop(task, validator, memory, learning_system):
"""安全的认知循环"""
# 1. 执行任务
result = await execute(task)
# 2. 验证结果
validation = await validator.validate(result)
if not validation.passed:
# 3. 反思失败原因
reflection = await reflect_on_failure(result, validation)
# 4. 记录必要证据
memory.record({
"task": task,
"result": result,
"validation": validation,
"reflection": reflection
})
# 5. 提炼可复用规则
pattern = extract_pattern(reflection, memory.get_similar_cases())
# 6. 在合适边界内更新行为
if pattern.confidence > learning_system.threshold:
if pattern.risk_level == "high":
# 高风险模式需要人工确认
await learning_system.request_validation(pattern)
else:
# 低风险模式可以自动学习
learning_system.update_strategy(pattern)
# 基于反思重试
result = await execute_with_insights(task, reflection)
return result
可控记忆的设计原则:
- 保守原则:宁可少记,也不要乱记
- 证据原则:宁可保存证据,也不要只保存结论
- 标注原则:宁可标注时间和来源,也不要把过去判断伪装成永恒事实
- 边界原则:在明确边界内更新行为,不要无限泛化
- 可审计原则:每个策略变更都应该可追溯
对工程 Agent 来说,最有价值的记忆不是"用户说过什么",而是"项目真实约束是什么、哪些路径试过失败、哪些验证命令可信"。
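这些原则可以落实到记忆条目的数据结构上。下面是一个示意性的记录格式(字段名为假设),强制每条记忆带来源、时间和证据,从结构上贯彻标注原则与证据原则:

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import List


@dataclass
class MemoryRecord:
    """可审计的记忆条目:标注来源与时间,保存证据而非只保存结论(字段为示意)"""
    content: str                      # 结论或规则本身
    source: str                       # 来源:哪次任务、哪条反馈
    evidence: List[str]               # 支撑该结论的证据
    recorded_at: datetime = field(default_factory=datetime.now)
    confidence: float = 0.5           # 置信度,随验证结果更新
    scope: str = "project"            # 适用边界,避免无限泛化


def is_admissible(record: MemoryRecord) -> bool:
    """保守原则:没有来源或证据的记录不进入长期记忆"""
    return bool(record.source) and len(record.evidence) > 0
```

写入长期记忆前统一过 is_admissible 这样的入口检查,可以从源头减少记忆污染。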
下篇:多智能体协作
多智能体不是越多越好
多智能体协作的直觉很诱人:把复杂任务拆给多个专家,让它们并行工作。但智能体数量增加后,系统复杂度不是线性增加,而是快速上升。
# 通信复杂度:N 个智能体需要 N*(N-1)/2 条通信通道
def communication_complexity(n_agents):
return n_agents * (n_agents - 1) / 2
# 协调成本:随智能体数量指数增长
def coordination_cost(n_agents):
return BASE_COST * (1.5 ** (n_agents - 1))
适合的场景
多智能体真正适合的场景是:
子任务相互独立,可以并行推进
- 例如:分析多个不相关的模块
- 价值:缩短总执行时间
- 案例:代码库分析,每个 Agent 负责一个模块
async def parallel_code_analysis(modules):
tasks = [analyze_module(module) for module in modules]
results = await asyncio.gather(*tasks)
return aggregate_results(results)
需要不同专业视角
- 例如:安全、性能、产品、实现
- 价值:覆盖单一视角的盲区
- 案例:架构评审,不同角色关注不同维度
async def multi_perspective_review(design):
security_review = await security_agent.review(design)
performance_review = await performance_agent.review(design)
product_review = await product_agent.review(design)
return synthesize_reviews([security_review, performance_review, product_review])
需要生成与审查分离
- 例如:降低自证偏差
- 价值:提高输出质量
- 案例:代码生成后独立审查
async def generate_and_review(task):
proposal = await generator_agent.generate(task)
critique = await reviewer_agent.critique(proposal)
return refine_based_on_critique(proposal, critique)
单个上下文窗口无法容纳全部信息
- 例如:大型代码库分析
- 价值:突破单模型上下文限制
- 案例:分布式代码审查
不同角色需要不同工具权限
- 例如:生产环境操作受限
- 价值:最小权限原则
- 案例:部署流程,开发 Agent 生成配置,运维 Agent 执行部署
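不同角色绑定不同工具权限,可以用一个简单的白名单检查来示意。以下角色名与工具名均为假设,实际系统应接入真实的权限模型:

```python
# 每个角色只能使用白名单内的工具,体现最小权限原则(角色与工具名为示意)
ROLE_TOOL_WHITELIST = {
    "dev_agent": {"read_code", "write_code", "run_tests", "generate_config"},
    "ops_agent": {"read_config", "deploy", "rollback"},
}


def check_tool_access(role, tool):
    """调用工具前检查角色权限,越权直接拒绝而非静默放行"""
    allowed = ROLE_TOOL_WHITELIST.get(role, set())
    if tool not in allowed:
        raise PermissionError(f"{role} 无权使用工具 {tool}")
    return True
```

在部署流程的例子中,dev_agent 可以生成配置但调用 deploy 会被直接拒绝,权限边界由代码而非约定保证。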
不适合的场景
任务很小,协调成本高于执行成本
- 简单任务不需要拆分
- 协调开销可能超过任务本身
# 反模式:为简单查询创建多个 Agent
async def bad_simple_query(question):
analyzer = Agent("analyzer")
validator = Agent("validator")
formatter = Agent("formatter")
# 三个 Agent 的协调成本远超简单查询的价值
result = await analyzer.analyze(question)
validated = await validator.validate(result)
return await formatter.format(validated)
# 正确:直接处理
async def good_simple_query(question):
return await direct_answer(question)
子任务强耦合,需要频繁同步
- 高度依赖的任务不适合拆分
- 同步开销抵消并行收益
没有清晰的最终仲裁者
- 多个 Agent 产生冲突时需要仲裁
- 缺少仲裁机制会导致僵局
多个智能体会同时写同一片文件或数据
- 写入冲突需要复杂的协调机制
- 最好通过所有权边界避免
角色拆分:基于责任,不是头衔
常见错误是先起角色名:架构师、开发、测试、审查员。更好的方式是先问责任边界。
基于头衔拆分的错误案例
# 反模式:基于头衔拆分
class ArchitectAgent:
def work(self):
# 太宽泛,不知道具体做什么
"负责架构设计"
class DeveloperAgent:
def work(self):
# 与其他角色边界模糊
"负责开发"
class TesterAgent:
def work(self):
# 与开发角色重叠
"负责测试"这种拆分的问题是角色职责模糊,容易产生工作重叠或遗漏。
基于责任拆分的正确方法
先问六个关键问题:
1. 谁负责定义目标?
- 角色:产品负责人
- 职责:明确要做什么,为什么做
2. 谁负责收集证据?
- 角色:研究员
- 职责:收集技术选项、市场数据、竞品信息
3. 谁负责实现?
- 角色:实现者
- 职责:编写代码、配置系统、实现功能
4. 谁负责验证?
- 角色:验证者
- 职责:测试、审查、检查是否符合要求
5. 谁负责质疑?
- 角色:批评者
- 职责:挑战假设、指出风险、提出替代方案
6. 谁拥有最终决策权?
- 角色:决策者
- 职责:在冲突中做出选择,对结果负责
# 好的模式:基于责任拆分
class GoalDefiner:
"""定义目标"""
def define(self, requirements):
return {
"objectives": extract_objectives(requirements),
"success_criteria": define_criteria(requirements),
"constraints": identify_constraints(requirements)
}
class EvidenceCollector:
"""收集证据"""
def collect(self, topic):
return {
"technical_options": research_options(topic),
"market_data": gather_market_data(topic),
"benchmarks": collect_benchmarks(topic)
}
class Implementer:
"""实现方案"""
def implement(self, design):
return {
"code": write_code(design),
"configuration": configure_system(design),
"deployment": deploy(design)
}
class Verifier:
"""验证结果"""
def verify(self, implementation, criteria):
return {
"tests": run_tests(implementation),
"review": conduct_review(implementation),
"compliance": check_compliance(implementation, criteria)
}
class Critic:
"""质疑方案"""
def critique(self, proposal):
return {
"assumptions": challenge_assumptions(proposal),
"risks": identify_risks(proposal),
"alternatives": propose_alternatives(proposal)
}
class Decider:
"""最终决策"""
def decide(self, options, critique):
return make_decision(options, critique)
角色有意义的四个条件
角色只有在满足以下条件之一时才有意义:
- 责任不同:承担不同的决策责任
- 工具不同:需要不同的工具集
- 上下文不同:操作在不同的信息空间
- 判断标准不同:使用不同的评估标准
如果多个 Agent 在这些维度上都相同,它们只是在重复工作。
def validate_role_separation(agent1, agent2):
"""验证角色分离是否合理"""
checks = {
"different_responsibilities": agent1.responsibilities != agent2.responsibilities,
"different_tools": agent1.tools != agent2.tools,
"different_context": agent1.context_space != agent2.context_space,
"different_criteria": agent1.evaluation_criteria != agent2.evaluation_criteria
}
# 至少满足一个条件,角色分离才有意义
return any(checks.values())
通信协议比智能体本身更重要
多智能体系统的失败常常不是某个 Agent 不聪明,而是通信没有约束。
消息格式设计
结构化消息是可靠协作的基础:
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Any, Dict, List, Literal
@dataclass
class AgentMessage:
"""智能体间通信消息格式"""
message_id: str
sender: str
receiver: str
timestamp: datetime
# 任务描述
task: str
task_type: Literal["generate", "analyze", "verify", "decide"]
# 输入输出
inputs: Dict[str, Any]
expected_outputs: List[str]
# 证据和推理
reasoning: str
evidence: List[Dict[str, Any]]
# 风险和置信度
risks: List[str]
confidence: float
# 当前状态
status: Literal["pending", "in_progress", "blocked", "completed", "failed"]
dependencies: List[str] # 依赖的其他消息 ID
def to_dict(self):
return asdict(self)
@classmethod
def from_dict(cls, data):
return cls(**data)
生命周期管理
明确的状态转换规则:
class MessageLifecycle:
"""消息生命周期管理"""
def __init__(self):
self.states = {
"pending": [], # 待处理
"in_progress": [], # 进行中
"blocked": [], # 阻塞
"completed": [], # 已完成
"failed": [] # 失败
}
def transition(self, message, new_state):
"""状态转换"""
valid_transitions = {
"pending": ["in_progress", "failed"],
"in_progress": ["blocked", "completed", "failed"],
"blocked": ["in_progress", "failed"],
"completed": [], # 终态
"failed": [] # 终态
}
if new_state in valid_transitions[message.status]:
self.states[message.status].remove(message)
message.status = new_state
self.states[new_state].append(message)
return True
return False
def get_ready_messages(self):
"""获取可处理的消息(依赖已满足)"""
ready = []
# dependencies 存的是消息 ID,先取出已完成消息的 ID 集合再比对
completed_ids = {m.message_id for m in self.states["completed"]}
for msg in self.states["pending"]:
if all(dep_id in completed_ids for dep_id in msg.dependencies):
ready.append(msg)
return ready
冲突处理与仲裁机制
当多个 Agent 产生不一致结果时,需要明确的仲裁规则:
class ConflictResolver:
"""冲突解决机制"""
def __init__(self, arbitration_strategy):
self.strategy = arbitration_strategy
def resolve(self, conflicting_results):
"""解决冲突"""
if self.strategy == "highest_confidence":
return self._by_confidence(conflicting_results)
elif self.strategy == "most_specific":
return self._by_specificity(conflicting_results)
elif self.strategy == "voting":
return self._by_voting(conflicting_results)
elif self.strategy == "human_arbitrator":
return self._human_arbitrate(conflicting_results)
else:
raise ValueError(f"Unknown strategy: {self.strategy}")
def _by_confidence(self, results):
"""选择置信度最高的结果"""
return max(results, key=lambda r: r.confidence)
def _by_specificity(self, results):
"""选择最具体的结果"""
return max(results, key=lambda r: len(r.evidence))
def _by_voting(self, results):
"""投票决定"""
from collections import Counter
votes = Counter(r.conclusion for r in results)
return next(r for r in results if r.conclusion == votes.most_common(1)[0][0])
def _human_arbitrate(self, results):
"""人工仲裁"""
# 展示冲突结果,请求人工决策
return request_human_decision(results)
共享状态 vs 局部状态
class StateManager:
"""状态管理:区分共享和局部状态"""
def __init__(self):
# 共享状态:所有 Agent 可见
self.shared_state = {
"project_goals": None,
"constraints": None,
"decisions_made": [],
"artifacts": {}
}
# 局部状态:Agent 专属
self.local_states = {}
def get_shared(self, key):
"""读取共享状态"""
return self.shared_state.get(key)
def update_shared(self, key, value, agent_id):
"""更新共享状态(需要记录谁修改了什么)"""
old_value = self.shared_state.get(key)
self.shared_state[key] = value
self._log_change(key, old_value, value, agent_id)
def get_local(self, agent_id, key):
"""读取局部状态"""
return self.local_states.setdefault(agent_id, {}).get(key)
def update_local(self, agent_id, key, value):
"""更新局部状态"""
self.local_states.setdefault(agent_id, {})[key] = value
def _log_change(self, key, old, new, agent_id):
"""记录变更历史"""
self.shared_state.setdefault("change_log", []).append({
"key": key,
"old_value": old,
"new_value": new,
"changed_by": agent_id,
"timestamp": datetime.now()
})
A2A 协议:结构化交互的核心价值
A2A(Agent-to-Agent)协议的价值在于把智能体间交互协议化。协议化之后,协作不再依赖"自然语言互相喊话",而是有结构化状态、能力声明和任务交接。
class A2AProtocol:
"""智能体间通信协议"""
def __init__(self):
self.capability_registry = {}
self.message_queue = []
def register_capability(self, agent_id, capability):
"""注册能力声明"""
self.capability_registry[agent_id] = {
"can_do": capability.tasks,
"requires": capability.requirements,
"produces": capability.outputs,
"constraints": capability.constraints
}
def find_capable_agents(self, task):
"""查找能处理任务的 Agent"""
capable = []
for agent_id, caps in self.capability_registry.items():
if task in caps["can_do"]:
capable.append(agent_id)
return capable
def send_message(self, sender, receiver, message):
"""发送结构化消息"""
envelope = {
"protocol_version": "1.0",
"message_id": generate_uuid(),
"timestamp": datetime.now().isoformat(),
"sender": sender,
"receiver": receiver,
"payload": message.to_dict(),
"signature": sign_message(message, sender)
}
self.message_queue.append(envelope)
return envelope["message_id"]
def receive_messages(self, agent_id):
"""接收发给指定 Agent 的消息"""
return [msg for msg in self.message_queue if msg["receiver"] == agent_id]
三种协作结构
主从结构
一个主 Agent 负责规划、分派和整合,多个子 Agent 执行独立任务。这是最容易落地的结构。
适用场景:
- 代码库分析:主 Agent 协调,子 Agent 分析各模块
- 资料调研:主 Agent 定义问题,子 Agent 收集不同来源
- 多模块修复:主 Agent 识别问题,子 Agent 并行修复
- 并行审查:主 Agent 分配审查任务,子 Agent 独立审查
详细案例:代码库分析
class MasterAgent:
"""主 Agent:负责协调和整合"""
def __init__(self, sub_agents):
self.sub_agents = sub_agents
self.analysis_plan = None
async def analyze_codebase(self, codebase_path):
# 1. 规划:分解代码库为模块
modules = self._discover_modules(codebase_path)
self.analysis_plan = self._create_analysis_plan(modules)
# 2. 分派:分配任务给子 Agent
tasks = []
for module in modules:
capable_agent = self._find_capable_agent(module)
task = {
"module": module,
"focus_areas": self._get_focus_areas(module),
"output_format": "structured_report"
}
tasks.append((capable_agent, task))
# 3. 执行:并行分析
results = await asyncio.gather(*[
agent.analyze(**task) for agent, task in tasks
])
# 4. 整合:合并分析结果
integrated = self._integrate_results(results)
# 5. 总结:生成整体洞察
insights = self._generate_insights(integrated)
return {
"detailed_analysis": integrated,
"key_insights": insights,
"recommendations": self._generate_recommendations(insights)
}
class SubAgent:
"""子 Agent:负责具体分析"""
def __init__(self, specialization):
self.specialization = specialization # 如 "security", "performance", "architecture"
async def analyze(self, module, focus_areas, output_format):
"""分析指定模块"""
analysis = {
"module": module,
"specialization": self.specialization,
"findings": [],
"metrics": {},
"risks": [],
"opportunities": []
}
for area in focus_areas:
finding = await self._analyze_area(module, area)
analysis["findings"].append(finding)
return analysis
设计要点:
- 主 Agent 专注于协调,不做具体分析
- 子 Agent 专注于执行,不关心全局
- 明确的任务分派接口
- 标准化的结果格式便于整合
- 子 Agent 之间不需要直接通信
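其中"标准化的结果格式便于整合"可以用一个简单的 schema 校验来保证。以下字段名沿用上面案例中的分析结果结构,校验逻辑为示意:

```python
# 子 Agent 统一的结果格式约定(字段名沿用上文案例,校验逻辑为示意)
REQUIRED_FIELDS = {"module", "specialization", "findings", "risks"}


def validate_sub_result(result: dict) -> dict:
    """主 Agent 整合前校验子 Agent 结果格式,缺字段立即报错而非静默整合"""
    missing = REQUIRED_FIELDS - result.keys()
    if missing:
        raise ValueError(f"结果缺少字段: {sorted(missing)}")
    return result
```

在 _integrate_results 这类整合入口处先做格式校验,可以把"格式不一致"的问题在整合前暴露出来,而不是留到汇总报告里。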
评审结构
一个 Agent 生成方案,另一个或多个 Agent 专门审查。评审者不参与实现,减少立场污染。
适用场景:
- 架构方案评审:生成者提出方案,评审者挑战假设
- 安全审查:实现者完成功能,安全专家独立审查
- 代码 review:开发者提交代码,审查者独立检查
- 重要文档定稿:作者撰写,编辑独立审查
详细案例:安全审查
class GeneratorAgent:
"""生成者 Agent:负责实现功能"""
async def implement_feature(self, requirements):
implementation = {
"code": await self._write_code(requirements),
"configuration": await self._write_config(requirements),
"documentation": await self._write_docs(requirements)
}
# 生成者自评(但不作为最终结论)
self_assessment = {
"functionality": "✓ 需求已实现",
"known_limitations": ["未处理边缘情况 X"],
"testing_coverage": "单元测试覆盖率 80%"
}
return {
"implementation": implementation,
"self_assessment": self_assessment,
"metadata": {
"author": "GeneratorAgent",
"timestamp": datetime.now()
}
}
class ReviewerAgent:
"""评审者 Agent:负责独立审查"""
async def security_review(self, artifact):
"""独立安全审查,不受生成者影响"""
review = {
"reviewer": "SecurityReviewer",
"timestamp": datetime.now(),
"independence": "full", # 完全独立,不看生成者自评
"security_checks": {
"injection_vulnerabilities": await self._check_injections(artifact),
"authentication": await self._check_auth(artifact),
"authorization": await self._check_access_control(artifact),
"data_validation": await self._check_validation(artifact),
"sensitive_data": await self._check_data_exposure(artifact)
},
"findings": [],
"severity_levels": [],
"remediation": []
}
# 生成报告时不参考生成者的自评
for check, result in review["security_checks"].items():
if result["status"] == "failed":
review["findings"].append({
"check": check,
"issue": result["issue"],
"severity": result["severity"],
"evidence": result["evidence"]
})
return review
class ReviewCoordinator:
"""评审协调器:管理生成和评审流程"""
async def feature_with_review(self, requirements):
# 1. 生成阶段
generator = GeneratorAgent()
artifact = await generator.implement_feature(requirements)
# 2. 独立评审阶段
reviewers = [
ReviewerAgent("security"),
ReviewerAgent("performance"),
ReviewerAgent("architecture")
]
reviews = await asyncio.gather(*[
reviewer.review(artifact) for reviewer in reviewers
])
# 3. 评审结果处理
critical_issues = [
issue for review in reviews
for issue in review["findings"]
if issue["severity"] == "critical"
]
if critical_issues:
# 有严重问题,需要修复
return {
"status": "rejected",
"artifact": artifact,
"reviews": reviews,
"required_fixes": critical_issues
}
else:
# 通过评审
return {
"status": "approved",
"artifact": artifact,
"reviews": reviews,
"minor_suggestions": [
issue for review in reviews
for issue in review["findings"]
if issue["severity"] == "minor"
]
}
设计要点:
- 评审者完全不参与实现,保持独立性
- 评审者不看实现者的自评,避免偏见
- 多个评审者并行工作,覆盖不同维度
- 明确的严重性分级和处理流程
- 评审结果驱动是否需要重新实现
市场结构
多个 Agent 提出候选方案,系统按评分、成本、风险或用户偏好选择。这种结构探索能力强,但协调成本高。
适用场景:
- 创意生成:多个 Agent 提出不同创意方向
- 策略设计:不同 Agent 提出竞争策略
- 开放式研究:探索多个可能的答案路径
- 方案比选:从多个方案中选择最优
详细案例:策略设计
class ProposerAgent:
"""提案者 Agent:生成候选策略"""
def __init__(self, perspective):
self.perspective = perspective # 如 "aggressive", "conservative", "balanced"
async def propose_strategy(self, context):
"""基于自身视角提出策略"""
strategy = {
"proposer": f"{self.perspective}_proposer",
"perspective": self.perspective,
"approach": await self._generate_approach(context, self.perspective),
"expected_outcomes": await self._predict_outcomes(context, self.perspective),
"resource_requirements": await self._estimate_resources(context, self.perspective),
"risks": await self._identify_risks(context, self.perspective),
"confidence": 0.0 # 待评估
}
return strategy
class EvaluatorAgent:
"""评估者 Agent:评估候选策略"""
async def evaluate_strategy(self, strategy, evaluation_criteria):
"""基于多维度评估策略"""
evaluation = {
"strategy_id": strategy["proposer"],
"scores": {},
"analysis": {}
}
for criterion in evaluation_criteria:
score = await self._score_on_criterion(strategy, criterion)
evaluation["scores"][criterion] = score
evaluation["analysis"][criterion] = await self._explain_score(strategy, criterion)
# 综合评分
evaluation["overall_score"] = self._compute_weighted_score(
evaluation["scores"],
evaluation_criteria
)
return evaluation
class MarketCoordinator:
"""市场协调器:管理提案和选择"""
def __init__(self):
self.proposers = [
ProposerAgent("aggressive"),
ProposerAgent("conservative"),
ProposerAgent("innovative"),
ProposerAgent("pragmatic")
]
self.evaluator = EvaluatorAgent()
async def design_strategy(self, context, evaluation_criteria):
# 1. 市场阶段:收集提案
proposals = await asyncio.gather(*[
proposer.propose_strategy(context) for proposer in self.proposers
])
# 2. 评估阶段:评估所有提案
evaluations = await asyncio.gather(*[
self.evaluator.evaluate_strategy(proposal, evaluation_criteria)
for proposal in proposals
])
# 3. 选择阶段:根据评估结果选择
best_strategy_idx = max(
range(len(evaluations)),
key=lambda i: evaluations[i]["overall_score"]
)
# 4. 组合阶段:可以组合不同策略的优点
if self._should_combine(evaluations):
final_strategy = self._combine_strategies(proposals, evaluations)
else:
final_strategy = proposals[best_strategy_idx]
return {
"selected_strategy": final_strategy,
"all_proposals": proposals,
"evaluations": evaluations,
"selection_rationale": evaluations[best_strategy_idx]["analysis"]
}
def _should_combine(self, evaluations):
"""判断是否应该组合多个策略"""
# 如果多个策略得分接近且互补,可以考虑组合
scores = [e["overall_score"] for e in evaluations]
max_score = max(scores)
close_scores = [s for s in scores if max_score - s < 0.1]
return len(close_scores) >= 2
设计要点:
- 多个提案者基于不同视角生成方案
- 独立的评估者进行客观评估
- 明确的评估标准和权重
- 可以选择最优方案或组合方案
- 保留所有提案和评估记录用于追溯
多智能体的写入边界
多智能体设计的第一原则是:能不拆就不拆,必须拆时先定义所有权。
所有权边界
class OwnershipBoundary:
"""所有权边界管理"""
def __init__(self):
# 明确每个 Agent 拥有的资源
self.ownerships = {
"auth_agent": {
"files": ["auth/*", "middleware/auth*"],
"configs": ["config/auth*"],
"tables": ["users", "sessions", "permissions"]
},
"payment_agent": {
"files": ["payment/*", "billing/*"],
"configs": ["config/payment*"],
"tables": ["transactions", "invoices", "payments"]
},
"notification_agent": {
"files": ["notifications/*"],
"configs": ["config/notification*"],
"tables": ["notifications", "notification_templates"]
}
}
def can_write(self, agent_id, resource):
"""检查 Agent 是否有写入权限"""
agent_resources = self.ownerships.get(agent_id, {})
if resource.startswith("file:"):
return self._check_file_ownership(agent_resources, resource[5:])
elif resource.startswith("config:"):
return self._check_config_ownership(agent_resources, resource[7:])
elif resource.startswith("table:"):
return self._check_table_ownership(agent_resources, resource[6:])
return False
def request_write(self, agent_id, resource, operation):
"""请求写入权限(用于跨边界写入)"""
owner = self._find_owner(resource)
if owner == agent_id:
return {"status": "approved", "reason": "direct ownership"}
elif owner is None:
return {"status": "rejected", "reason": "no owner defined"}
else:
# 需要所有者批准
return {
"status": "requires_approval",
"owner": owner,
"operation": operation
}
冲突预防与处理
class ConflictManager:
"""冲突管理"""
def __init__(self):
self.pending_operations = {}
self.conflict_history = []
def register_operation(self, agent_id, resources, operation):
"""注册操作意图"""
# 检测潜在冲突
conflicts = self._detect_conflicts(agent_id, resources)
if conflicts:
# 有冲突,需要协调
return self._handle_conflicts(agent_id, resources, operation, conflicts)
# 无冲突,批准操作
operation_id = generate_uuid()
self.pending_operations[operation_id] = {
"agent": agent_id,
"resources": resources,
"operation": operation,
"status": "approved",
"timestamp": datetime.now()
}
return {"status": "approved", "operation_id": operation_id}
def _detect_conflicts(self, agent_id, resources):
"""检测资源冲突"""
conflicts = []
for op_id, op in self.pending_operations.items():
if op["status"] in ["approved", "in_progress"]:
# 检查资源重叠
common_resources = set(resources) & set(op["resources"])
if common_resources and op["agent"] != agent_id:
conflicts.append({
"operation_id": op_id,
"conflicting_agent": op["agent"],
"conflicting_resources": list(common_resources)
})
return conflicts
def _handle_conflicts(self, agent_id, resources, operation, conflicts):
"""处理冲突"""
# 策略 1:串行化(先来先得)
# 策略 2:优先级(高优先级抢占)
# 策略 3:协调(重新组织操作避免冲突)
resolution = self._choose_resolution_strategy(agent_id, conflicts)
if resolution == "serialize":
return {"status": "queued", "reason": "awaiting conflicting operations"}
elif resolution == "priority":
if self._has_higher_priority(agent_id, conflicts):
self._interrupt_conflicts(conflicts)
return {"status": "approved", "reason": "priority preemption"}
else:
return {"status": "rejected", "reason": "lower priority"}
elif resolution == "reorganize":
new_plan = self._reorganize_operations(agent_id, resources, operation, conflicts)
return {"status": "approved_with_modification", "new_plan": new_plan}”能不拆就不拆”的第一原则
def should_split_into_agents(task):
"""评估是否应该拆分为多 Agent"""
# 不拆分的条件
if task.size < THRESHOLD_SIZE:
return False, "任务太小,协调成本过高"
if task.subtasks_are_highly_coupled():
return False, "子任务强耦合,并行收益有限"
if task.has_clear_owner():
return False, "已有明确责任主体,无需拆分"
# 需要拆分的条件
if task.requires_distinct_perspectives():
return True, "需要不同专业视角"
if task.can_be_parallelized():
return True, "子任务可并行执行"
if task.exceeds_single_context():
return True, "超出单 Agent 上下文容量"
if task.requires_separation_of_concerns():
return True, "需要关注点分离(如生成与审查)"
# 默认不拆分
return False, "未满足拆分条件"连接篇:记忆如何支撑多智能体协作
记忆系统是多智能体协作的基础设施。没有共享记忆,多个 Agent 只能重复工作或互相冲突。
共享记忆空间的设计
class SharedMemorySpace:
"""多智能体共享记忆空间"""
def __init__(self):
# 全局共享记忆
self.global_memory = {
"project_context": {},
"decisions_made": [],
"constraints": [],
"terminology": {}
}
# Agent 专属记忆(可选择性共享)
self.agent_memories = {}
# 记忆访问控制
self.access_control = {}
def write_global(self, key, value, agent_id):
"""写入全局记忆"""
self.global_memory[key] = {
"value": value,
"written_by": agent_id,
"timestamp": datetime.now(),
"version": self._next_version(key)
}
def read_global(self, key):
"""读取全局记忆"""
return self.global_memory.get(key, {}).get("value")
def write_agent_memory(self, agent_id, key, value, share_with=None):
"""写入 Agent 专属记忆"""
if agent_id not in self.agent_memories:
self.agent_memories[agent_id] = {}
self.agent_memories[agent_id][key] = {
"value": value,
"timestamp": datetime.now(),
"shared_with": share_with or []
}
def read_agent_memory(self, agent_id, key, requesting_agent):
"""读取 Agent 专属记忆"""
memory = self.agent_memories.get(agent_id, {}).get(key)
if memory:
# 检查访问权限
if (requesting_agent in memory["shared_with"] or
requesting_agent == agent_id or
memory["shared_with"] == ["all"]):
return memory["value"]
return None
协作历史的记录与复用
class CollaborationHistory:
"""协作历史管理"""
def __init__(self, memory_space):
self.memory = memory_space
self.history = []
def record_interaction(self, interaction):
"""记录 Agent 间交互"""
record = {
"interaction_id": generate_uuid(),
"timestamp": datetime.now(),
"participants": interaction["participants"],
"task": interaction["task"],
"messages": interaction["messages"],
"outcome": interaction["outcome"],
"patterns": self._extract_patterns(interaction)
}
self.history.append(record)
# 提取可复用的模式到语义记忆
for pattern in record["patterns"]:
if pattern["confidence"] > PATTERN_THRESHOLD:
self.memory.write_global(
f"pattern_{pattern['type']}",
pattern,
record["interaction_id"]
)
def find_similar_interactions(self, current_task):
"""查找相似的历史交互"""
similar = []
for record in self.history:
similarity = self._compute_similarity(current_task, record["task"])
if similarity > SIMILARITY_THRESHOLD:
similar.append({
"record": record,
"similarity": similarity
})
return sorted(similar, key=lambda x: x["similarity"], reverse=True)
def get_lessons_learned(self, pattern_type):
"""获取特定模式的经验教训"""
lessons = []
for record in self.history:
for pattern in record["patterns"]:
if pattern["type"] == pattern_type and pattern.get("lesson"):
lessons.append({
"lesson": pattern["lesson"],
"context": record["task"],
"success_rate": pattern.get("success_rate", 0)
})
return sorted(lessons, key=lambda x: x["success_rate"], reverse=True)
跨智能体的经验传递
class ExperienceTransfer:
"""跨智能体经验传递"""
def __init__(self, shared_memory):
self.shared_memory = shared_memory
self.transfers = []
def capture_experience(self, agent_id, experience):
"""捕获 Agent 的经验"""
experience_record = {
"agent": agent_id,
"task": experience["task"],
"outcome": experience["outcome"],
"lessons_learned": experience["lessons_learned"],
"timestamp": datetime.now(),
"transferable": self._assess_transferability(experience)
}
# 如果经验可传递,写入共享记忆
if experience_record["transferable"]:
self.shared_memory.write_global(
f"experience_{experience['task_type']}",
experience_record,
agent_id
)
return experience_record
def transfer_relevant_experiences(self, agent_id, current_task):
"""为 Agent 传递相关经验"""
relevant = []
# 从共享记忆中检索相关经验
all_experiences = self.shared_memory.find_by_prefix("experience_")
for exp_key, exp_record in all_experiences.items():
relevance = self._compute_relevance(current_task, exp_record["task"])
if relevance > RELEVANCE_THRESHOLD:
relevant.append({
"experience": exp_record,
"relevance": relevance,
"source_agent": exp_record["agent"]
})
# 按相关性排序
relevant.sort(key=lambda x: x["relevance"], reverse=True)
# 记录传递
self.transfers.append({
"to_agent": agent_id,
"task": current_task,
"experiences_transferred": len(relevant),
"timestamp": datetime.now()
})
return relevant[:MAX_TRANSFERRED_EXPERIENCES]
def _assess_transferability(self, experience):
"""评估经验是否可传递"""
# 可传递性标准
criteria = {
"is_generalizable": not experience.get("specific_to_context", False),
"is_successful": experience["outcome"] in ["success", "partial_success"],
"has_clear_lessons": len(experience.get("lessons_learned", [])) > 0,
"not_agent_specific": "personal_preference" not in experience.get("tags", [])
}
return all(criteria.values())
通过共享记忆、协作历史记录和经验传递,多智能体系统可以从过去的协作中学习,避免重复错误,积累成功模式,形成组织级的协作智能。
原书作者:Jimmy Song