工作流骨架:提示链、路由、并行与规划
从”自由发挥”到”有结构地行动”
当你第一次让 LLM 做复杂任务时,通常会经历这样的过程:写一个精心设计的提示词,满怀期待地运行,得到一个还凑合的结果,然后发现要么有错误,要么不够深入,要么完全跑题。于是你加大提示词,加入更多约束和示例,模型变好了,但仍然不够稳定。
问题的根源在于:你还在用单轮对话的思维方式去处理多轮、多步骤的复杂任务。
Agent 系统的核心突破点,就是把”让模型一次性完成所有推理”变成”让模型有结构地分步完成推理”。这个结构由四种基础工作流模式支撑:提示链、路由、并行化和规划。
一、提示链:让失败位置可定位
核心思想
提示链(Prompt Chaining)的出发点很简单:不要让模型一次性完成从理解到表达的全过程,而是把任务拆成多个中间步骤,每一步产生一个可检查的产物。
输入 -> [步骤1] -> 中间产物1 -> [步骤2] -> 中间产物2 -> ... -> 输出
比如让模型写一篇技术文章,单次调用的流程是:
用户需求 -> 模型直接生成文章
提示链的流程是:
用户需求 -> 资料提取 -> 主题归纳 -> 大纲生成 -> 分节写作 -> 事实校验 -> 风格润色
两种方式的表面差异是步骤数量,深层差异是可控性和可调试性。单次调用失败时,你不知道是资料理解错了、结构错了,还是表达错了。提示链可以把错误锁定到具体节点。
价值拆解
可观测性
每个中间步骤都是一个检查点。你可以在资料提取后检查模型是否抓住了关键信息,在大纲生成后验证逻辑结构是否合理,在分节写作后确认每部分的内容密度。
可复用性
中间产物可以被后续步骤复用,也可以被其他任务复用。一次资料提取的结果,可以用于生成文章、制作幻灯片、撰写摘要等多个下游任务。
可迭代性
某个步骤失败时,不需要重跑整个流程。只需要修复该步骤的提示词或输入,然后从断点继续。
专业化
不同步骤可以使用不同的模型、不同的温度参数、甚至完全不同的处理逻辑。事实校验可以用低温度的模型保证准确性,风格润色可以用高温度的模型增加多样性。
一个完整的提示链案例:技术文章写作
把技术文章写作拆成一个完整的提示链:
步骤1:资料提取
def extract_sources(topic: str) -> dict:
prompt = f"""
从以下来源中提取与"{topic}"相关的关键信息:
- 官方文档
- 技术博客
- GitHub 仓库
- Stack Overflow 讨论
输出格式:
{{
"concepts": ["概念1", "概念2", ...],
"key_points": ["要点1", "要点2", ...],
"examples": ["示例1", "示例2", ...],
"common_issues": ["问题1", "问题2", ...]
}}
"""
return call_model(prompt, context=gather_sources(topic))步骤2:主题归纳
def synthesize_theme(extracted: dict) -> str:
prompt = f"""
基于以下提取的信息,归纳文章的核心主题和读者价值:
{json.dumps(extracted, ensure_ascii=False)}
输出:
- 一句话主题
- 目标读者
- 读者将获得什么
- 文章独特角度
"""
return call_model(prompt)步骤3:大纲生成
def generate_outline(theme: str, extracted: dict) -> dict:
prompt = f"""
主题:{theme}
可用素材:
{json.dumps(extracted, ensure_ascii=False)}
生成文章大纲,要求:
1. 每个章节有明确的目标
2. 标注每章使用的素材来源
3. 标注哪些章节需要代码示例
4. 标注哪些章节需要图表
输出格式:
{{
"title": "文章标题",
"sections": [
{{
"title": "章节标题",
"goal": "本章目标",
"sources": ["素材来源"],
"needs_code": true/false,
"needs_diagram": true/false
}}
]
}}
"""
return call_model(prompt)步骤4:分节写作
def write_section(outline: dict, section_index: int, context: dict) -> str:
section = outline["sections"][section_index]
prompt = f"""
章节:{section['title']}
目标:{section['goal']}
可用素材:
{context.get('extracted', {})}
前文概要:
{context.get('previous_summary', '这是第一章')}
写作要求:
- 技术准确,用词精确
- 代码示例完整可运行
- 适当地使用引用块强调重点
- 段落之间逻辑连贯
输出本节完整内容。
"""
return call_model(prompt)步骤5:事实校验
def fact_check(draft: str, sources: list) -> list:
prompt = f"""
检查以下草稿中的事实陈述:
{draft}
可验证来源:
{sources}
输出所有需要验证的陈述:
{{
"claims": [
{{
"statement": "原文陈述",
"verification_needed": "需要验证的点",
"source_hint": "可能在哪个来源中"
}}
]
}}
"""
return call_model(prompt)步骤6:风格润色
def polish(draft: str, style_guide: dict) -> str:
prompt = f"""
对以下草稿进行风格润色:
{draft}
风格要求:
- 标题层级:{style_guide['heading_levels']}
- 代码块语言标注:{style_guide['code_annotations']}
- 引用块使用场景:{style_guide['quote_usage']}
- 段落长度:{style_guide['paragraph_length']}
保持技术准确性,只调整表达方式。
"""
return call_model(prompt)主流程编排
def write_technical_article(topic: str) -> str:
# 步骤1:资料提取
extracted = extract_sources(topic)
print(f"提取了 {len(extracted['concepts'])} 个概念")
# 步骤2:主题归纳
theme = synthesize_theme(extracted)
print(f"主题:{theme}")
# 步骤3:大纲生成
outline = generate_outline(theme, extracted)
print(f"大纲包含 {len(outline['sections'])} 章")
# 步骤4:分节写作
sections = []
for i in range(len(outline['sections'])):
context = {
'extracted': extracted,
'previous_summary': summarize_previous(sections)
}
section_content = write_section(outline, i, context)
sections.append(section_content)
print(f"完成第 {i+1} 章")
# 步骤5:事实校验
draft = '\n\n'.join(sections)
claims = fact_check(draft, extracted['sources'])
verified_draft = verify_claims(draft, claims)
print(f"验证了 {len(claims['claims'])} 个陈述")
# 步骤6:风格润色
final = polish(verified_draft, get_style_guide())
print("文章完成")
return final适用场景判断
适合使用提示链的情况:
- 输出质量要求高:需要事实准确、逻辑严密、表达精炼
- 中间推理必须可审查:医疗诊断、法律分析、金融风控等场景
- 任务可以自然拆成阶段:写作、代码生成、数据分析等
- 失败成本较高:不能接受”重试几次也许能对”的不确定性
不适合使用提示链的情况:
- 简单问答:事实查询、定义解释、简单计算
- 低风险、低成本的一次性生成:闲聊、创意发散、头脑风暴
- 强实时交互:实时对话、游戏 NPC、即时翻译
常见反模式
链条过长
把每个微小的思考步骤都变成一个链节点,导致:
- 延迟累积,用户体验差
- 错误传播,前面的小错误被后面放大
- 维护成本高,一个改动要调整多个节点
判断标准:如果一个步骤的输入直接来自上一步的输出,且没有独立的验证价值,考虑合并。
检查点过多
每个步骤都设置人工检查点,导致:
- 流程频繁中断,失去自动化价值
- 人工疲劳,检查质量下降
- 吞吐量极低,无法规模化
判断标准:只对”失败风险高”或”失败代价大”的步骤设置人工检查点。
步骤间信息丢失
中间产物格式设计不当,导致:
- 后续步骤无法获取前序步骤的关键信息
- 上下文断裂,逻辑不连贯
- 重复计算,浪费资源
判断标准:每个中间产物应该包含”下游步骤可能需要的所有信息”,用结构化格式(JSON、YAML)而非自由文本。
提示链设计原则
步骤粒度
每个步骤应该:
- 有明确的输入和输出
- 有独立的验证标准
- 有可复用的价值
中间产物格式
- 使用结构化格式(JSON、YAML)而非自由文本
- 包含下游步骤可能需要的所有信息
- 预留扩展字段,便于后续增强
错误传播策略
- 每个步骤应该能处理上游的异常输出
- 关键步骤应该有降级策略
- 考虑设置质量阈值,低于阈值时触发人工介入
二、路由:先判断,再分流
为什么需要路由
一个 Agent 系统通常会面对多种输入:简单问答、复杂分析、代码修改、资料检索、工具操作、人工审批。不同输入需要:
- 不同的模型:简单任务用小模型,复杂任务用大模型
- 不同的工具:查询用搜索,修改用编辑器,执行用运行时
- 不同的预算:高风险任务需要人工审批,低风险任务可以自动执行
- 不同的安全策略:只读操作放开,写入操作需要鉴权
如果所有请求都走同一个处理链,要么简单任务被过度处理(浪费资源),要么复杂任务被轻率对待(风险失控)。
路由的分流维度
按任务意图分流
用户输入
-> 意图识别
-> 查询类 -> 搜索/检索流程
-> 写作类 -> 生成/编辑流程
-> 修改类 -> 代码审查+修改流程
-> 执行类 -> 命令构建+执行流程
-> 审查类 -> 分析+报告流程
按风险等级分流
用户请求
-> 风险评估
-> 只读操作 -> 直接执行
-> 可逆写入 -> 低权限执行+日志
-> 不可逆操作 -> 人工审批+高权限执行
-> 破坏性操作 -> 拒绝或特殊审批流程
按复杂度分流
任务分析
-> 复杂度评估
-> 单步任务 -> 直接执行
-> 多步任务 -> 提示链执行
-> 需要规划 -> 规划模式
-> 需要多智能体 -> 协作模式
按数据来源分流
信息请求
-> 来源判断
-> 通用知识 -> 模型内置知识
-> 最新信息 -> 网页搜索
-> 内部文档 -> 知识库检索
-> 代码库 -> 代码搜索+分析
-> 数据库 -> 查询生成+执行
按输出形态分流
生成请求
-> 形态判断
-> 答案 -> 文本生成
-> 文件 -> 文件写入
-> 代码 -> 代码生成+格式化
-> 报告 -> 结构化输出+可视化
-> 操作结果 -> 执行+结果呈现
路由误分流的代价
路由的核心难点是误分流。误分流的代价不是慢一点,而是用错了处理方式。
高风险任务被误判为低风险
- 代码删除操作被当作普通查询
- 生产环境配置修改被当作本地测试
- 敏感数据访问被当作普通查询
这是最危险的情况,防护策略是:不确定时走保守路径。
低风险任务被误判为高风险
- 简单问答被要求人工审批
- 只读操作被当作写入操作
- 本地测试被当作生产操作
这种情况下代价是效率降低,但相对安全。
路由置信度与保守路径设计
路由结果不应该是一个确定的分类,而应该包含置信度:
def route_with_confidence(request: str) -> dict:
prompt = f"""
分析以下请求的类型和风险等级:
请求:{request}
输出:
{{
"intent": "查询/写作/修改/执行/审查",
"intent_confidence": 0.0-1.0,
"risk_level": "只读/可逆写入/不可逆操作/破坏性操作",
"risk_confidence": 0.0-1.0,
"reasoning": "判断依据"
}}
"""
return call_model(prompt)
def conservative_route(route_result: dict) -> str:
# 低置信度时走保守路径
if route_result['risk_confidence'] < 0.7:
return 'conservative_path'
if route_result['risk_level'] in ['不可逆操作', '破坏性操作']:
return 'high_risk_path'
return route_result['intent']保守路径的设计原则:
- 增加验证步骤
- 降低自动化程度
- 增加人工检查点
- 使用更严格的权限
多级路由架构
单级路由容易在复杂场景下失效,多级路由可以逐步细化:
def multi_level_route(request: str):
# 第一级:粗分类
level1 = coarse_classification(request)
# level1: ['信息类', '操作类', '生成类']
# 第二级:细分类(基于第一级结果)
if level1 == '信息类':
level2 = information_subclass(request)
# level2: ['查询', '检索', '分析']
elif level1 == '操作类':
level2 = operation_subclass(request)
# level2: ['只读', '写入', '执行']
else:
level2 = generation_subclass(request)
# level2: ['文本', '代码', '多媒体']
# 第三级:具体路由(基于第二级结果)
return specific_route(level1, level2, request)多级路由的优势:
- 每一级只需要处理有限的类别
- 可以为不同级别使用不同的模型
- 便于调试和优化
常见反模式
过度路由
把每个细微的差异都变成一个路由分支,导致:
- 路由规则难以维护
- 新场景需要添加新分支
- 分支过多,每个分支的样本不足
判断标准:如果两个分支的处理逻辑差异小于 30%,考虑合并。
路由规则难维护
路由规则散落在代码各处,或者用复杂的嵌套条件,导致:
- 新人难以理解
- 修改容易引入 bug
- 无法快速响应新场景
建议:使用配置文件或规则引擎,集中管理路由规则。
路由与业务逻辑耦合
路由判断中嵌入业务逻辑,导致:
- 路由规则难以复用
- 业务逻辑变更影响路由
- 难以测试和调试
建议:路由只负责分类,具体处理逻辑由下游处理器实现。
三、并行化:吞吐量的代价是协调复杂度
并行化的前提条件
并行化适合处理互不依赖的任务。但”互不依赖”这个条件比看起来更严格:
任务独立性
- 子任务之间没有数据依赖
- 子任务的执行顺序不影响结果
- 子任务之间没有控制流依赖
写入不冲突
- 多个写入操作不会修改同一个资源
- 如果必须修改同一资源,有明确的冲突解决策略
- 写入操作的幂等性得到保证
汇总规则明确
- 有明确的合并策略
- 能处理部分失败的情况
- 有超时和取消机制
并行结果合并策略
简单聚合
def parallel_research(topic: str) -> dict:
sources = ['arxiv', 'github', 'stackoverflow', 'docs']
results = parallel_execute(
[lambda: search_source(source, topic) for source in sources]
)
return {
'papers': results[0]['papers'],
'repos': results[1]['repos'],
'discussions': results[2]['posts'],
'docs': results[3]['docs']
}适用于:结果来源明确,不需要去重和排序。
去重合并
def merge_with_dedup(results: list) -> list:
seen = set()
merged = []
for result in results:
for item in result:
# 用唯一标识去重
identifier = get_identifier(item)
if identifier not in seen:
seen.add(identifier)
merged.append(item)
return merged适用于:有重复可能,需要去重。
排序合并
def merge_with_ranking(results: list) -> list:
all_items = []
for result in results:
all_items.extend(result)
# 按相关性或其他指标排序
return sorted(all_items, key=lambda x: x['score'], reverse=True)适用于:需要对结果进行全局排序。
冲突解决
def merge_with_conflict_resolution(results: list) -> dict:
merged = {}
for result in results:
for key, value in result.items():
if key not in merged:
merged[key] = value
else:
# 冲突解决策略
merged[key] = resolve_conflict(merged[key], value)
return merged
def resolve_conflict(existing, new):
# 策略1:保留时间戳较新的
if existing['timestamp'] < new['timestamp']:
return new
# 策略2:保留置信度较高的
if existing.get('confidence', 0) < new.get('confidence', 0):
return new
# 策略3:触发人工审查
if existing != new:
return trigger_review(existing, new)
return existing适用于:可能有冲突,需要明确的解决策略。
写入冲突的预防
所有权边界
# 每个智能体负责不同的文件
agents = {
'agent_1': 'src/auth/*',
'agent_2': 'src/database/*',
'agent_3': 'src/api/*'
}
# 并行执行,保证不会写入同一文件
parallel_execute([
lambda: process_files(agents['agent_1']),
lambda: process_files(agents['agent_2']),
lambda: process_files(agents['agent_3'])
])锁机制
def write_with_lock(file_path: str, content: str):
acquire_lock(file_path)
try:
current = read_file(file_path)
merged = merge_content(current, content)
write_file(file_path, merged)
finally:
release_lock(file_path)原子操作
def atomic_write(file_path: str, content: str):
# 先写临时文件
temp_path = f"{file_path}.tmp"
write_file(temp_path, content)
# 原子性重命名
os.rename(temp_path, file_path)常见反模式
伪并行
名义上是并行,实际上有隐藏的依赖:
# 看起来是并行
results = parallel_execute([
lambda: step1(),
lambda: step2(),
lambda: step3()
])
# 但 step2 依赖 step1 的输出
def step2():
result = get_result_from_step1() # 隐藏依赖
...合并灾难
并行结果难以合并,导致:
# 三个智能体同时编辑同一文件
agents = [
lambda: edit_file('config.yaml', {'key': 'value1'}),
lambda: edit_file('config.yaml', {'key': 'value2'}),
lambda: edit_file('config.yaml', {'key': 'value3'})
]
# 最后一个写入覆盖前面的,前面的工作白费资源竞争
并行任务争抢有限资源:
# 所有任务都需要访问数据库
for i in range(100):
parallel_execute([
lambda: query_database(query),
lambda: query_database(query),
lambda: query_database(query)
])
# 数据库连接池耗尽解决方案:限制并发度,使用资源池。
四、规划:让 Agent 先建立任务地图
计划的要素
规划模式用于处理无法一步完成的任务。计划不是为了形式感,而是为了提前暴露依赖、风险和验收标准。
一个好的计划应该包含:
目标
最终要交付什么,用可验证的方式描述。
坏目标:优化代码
好目标:将 API 响应时间从 500ms 降到 200ms 以下
范围
哪些内容做,哪些不做。
做:用户认证相关的 API
不做:管理员功能、第三方登录
步骤
先后顺序和依赖关系。
steps = [
{'name': '分析现状', 'deps': []},
{'name': '设计优化方案', 'deps': ['分析现状']},
{'name': '实现优化', 'deps': ['设计优化方案']},
{'name': '测试验证', 'deps': ['实现优化']},
{'name': '部署上线', 'deps': ['测试验证']}
]验证
每一步如何证明完成。
steps = [
{
'name': '分析现状',
'validation': '输出性能分析报告,包含瓶颈定位'
},
{
'name': '设计优化方案',
'validation': '方案通过代码审查'
}
]风险
可能失败的点和回退路径。
risks = [
{
'risk': '优化后可能引入 bug',
'probability': 'medium',
'mitigation': '完整的回归测试',
'fallback': '保留回滚方案'
}
]什么时候该规划,什么时候不该
需要规划的情况:
- 任务跨多个文件:需要协调多个模块的修改
- 任务跨多个系统:需要协调多个服务或 API
- 任务跨多个阶段:有明确的先后依赖
- 失败成本明显:出错后难以恢复或代价高昂
不需要规划的情况:
- 单文件内的简单修改
- 低风险的探索性任务
- 可以快速重试的操作
- 实时性要求高的场景
动态规划与重规划
静态计划在执行过程中可能失效,需要动态调整:
def dynamic_plan(task: str):
# 初始计划
plan = create_initial_plan(task)
executed_steps = []
for step in plan['steps']:
# 执行步骤
result = execute_step(step)
# 检查是否需要重规划
if result.get('need_replan'):
# 基于已执行步骤的结果重新规划
plan = replan(plan, executed_steps, result)
continue
executed_steps.append(result)
return executed_steps重规划的触发条件:
- 执行结果与预期不符
- 发现新的依赖关系
- 外部条件发生变化
- 某个步骤失败且无法恢复
常见反模式
过度规划
为简单的任务写详细的计划,导致:
- 花在规划上的时间比执行还长
- 计划本身成为负担
- 无法适应变化
判断标准:如果规划时间超过预期执行时间的 20%,考虑简化。
计划与执行脱节
计划写得很详细,但执行时完全不参考:
- 计划成为文档而非指导
- 执行人员不知道计划的存在
- 计划没有更新机制
建议:让计划成为执行流程的一部分,而不是独立的文档。
计划过于刚性
无法适应执行过程中的变化:
- 意外情况导致整个计划失效
- 没有调整和更新的机制
- 过度依赖初始假设
建议:计划应该包含调整策略和重规划触发条件。
五、模式组合:从单体到协作
四种模式很少单独使用,通常是组合在一起形成一个完整的工作流。
组合方式
用户请求
|
v
[路由层] 判断任务类型和风险
|
+-- 简单任务 --> [直接执行]
|
+-- 复杂任务 --> [规划层] 创建执行计划
|
v
[提示链] 拆解为可执行步骤
|
v
[并行化] 独立步骤并行执行
|
v
[汇总] 合并结果并验证实战案例:代码审查 Agent
看一个完整的例子:代码审查 Agent 如何串联四种模式。
class CodeReviewAgent:
def review(self, pr_url: str):
# 1. 路由:判断 PR 类型
pr_type = self.classify_pr(pr_url)
if pr_type == 'simple':
return self.simple_review(pr_url)
elif pr_type == 'complex':
return self.complex_review(pr_url)
else:
return self.manual_review(pr_url)
def complex_review(self, pr_url: str):
# 2. 规划:创建审查计划
plan = self.create_review_plan(pr_url)
results = {}
# 3. 提示链:按步骤执行
for step in plan['steps']:
if step['parallelizable']:
# 4. 并行化:可并行的步骤并行执行
step_results = self.execute_parallel(step)
else:
step_results = self.execute_serial(step)
results[step['name']] = step_results
# 5. 汇总:合并审查结果
return self.synthesize_results(results)
def execute_parallel(self, step: dict):
# 并行执行多个审查维度
tasks = [
lambda: self.security_review(step['files']),
lambda: self.performance_review(step['files']),
lambda: self.style_review(step['files']),
lambda: self.test_coverage_review(step['files'])
]
return parallel_execute(tasks)
def security_review(self, files: list):
# 提示链:安全审查的子步骤
issues = []
# 子步骤1:识别潜在的安全问题
candidates = self.identify_security_candidates(files)
# 子步骤2:验证每个候选问题
for candidate in candidates:
issue = self.verify_security_issue(candidate)
if issue:
issues.append(issue)
# 子步骤3:生成修复建议
for issue in issues:
issue['suggestion'] = self.generate_fix_suggestion(issue)
return issues模式选择决策树
任务是否复杂?
|
+-- 否 --> 是否需要多个步骤?
| |
| +-- 否 --> 直接执行
| |
| +-- 是 --> 提示链
|
+-- 是 --> 是否有多个独立的子任务?
|
+-- 是 --> 并行化 + 提示链
|
+-- 否 --> 是否需要动态调整?
|
+-- 是 --> 规划 + 提示链
|
+-- 否 --> 是否有多种类型?
|
+-- 是 --> 路由 + 提示链
|
+-- 否 --> 提示链模式选择检查表
| 特征 | 提示链 | 路由 | 并行化 | 规划 |
|---|---|---|---|---|
| 多步骤 | 是 | 可能 | 可能 | 是 |
| 多类型 | 否 | 是 | 可能 | 否 |
| 独立子任务 | 否 | 否 | 是 | 否 |
| 不确定性 | 低 | 中 | 低 | 高 |
| 失败定位 | 强 | 弱 | 弱 | 中 |
| 吞吐量 | 低 | 中 | 高 | 低 |
| 复杂度 | 低 | 中 | 高 | 高 |
总结
基础工作流模式的目标是把 Agent 从”自由发挥”变成”有结构地行动”。结构不是为了限制智能,而是为了降低失控概率。
提示链把复杂任务拆成可检查的步骤,让失败位置可定位。
路由根据任务类型和风险选择处理路径,避免一刀切。
并行化通过同时执行独立任务提升吞吐量,但需要解决合并和冲突问题。
规划让 Agent 先建立任务地图,暴露依赖、风险和验收标准。
四种模式可以组合使用,形成一个完整的工作流:路由判断任务类型,复杂任务进入规划,规划拆成提示链,独立步骤并行执行,最后汇总结果并验证。
结构化的 Agent 不是更聪明的 Agent,而是更可控的 Agent。
原书相关章节
原书作者:Jimmy Song