工作流骨架:提示链、路由、并行与规划

从”自由发挥”到”有结构地行动”

当你第一次让 LLM 做复杂任务时,通常会经历这样的过程:写一个精心设计的提示词,满怀期待地运行,得到一个还凑合的结果,然后发现要么有错误,要么不够深入,要么完全跑题。于是你加大提示词,加入更多约束和示例,模型变好了,但仍然不够稳定。

问题的根源在于:你还在用单轮对话的思维方式去处理多轮、多步骤的复杂任务。

Agent 系统的核心突破点,就是把”让模型一次性完成所有推理”变成”让模型有结构地分步完成推理”。这个结构由四种基础工作流模式支撑:提示链、路由、并行化和规划。

一、提示链:让失败位置可定位

核心思想

提示链(Prompt Chaining)的出发点很简单:不要让模型一次性完成从理解到表达的全过程,而是把任务拆成多个中间步骤,每一步产生一个可检查的产物。

输入 -> [步骤1] -> 中间产物1 -> [步骤2] -> 中间产物2 -> ... -> 输出

比如让模型写一篇技术文章,单次调用的流程是:

用户需求 -> 模型直接生成文章

提示链的流程是:

用户需求 -> 资料提取 -> 主题归纳 -> 大纲生成 -> 分节写作 -> 事实校验 -> 风格润色

两种方式的表面差异是步骤数量,深层差异是可控性和可调试性。单次调用失败时,你不知道是资料理解错了、结构错了,还是表达错了。提示链可以把错误锁定到具体节点。

价值拆解

可观测性

每个中间步骤都是一个检查点。你可以在资料提取后检查模型是否抓住了关键信息,在大纲生成后验证逻辑结构是否合理,在分节写作后确认每部分的内容密度。

可复用性

中间产物可以被后续步骤复用,也可以被其他任务复用。一次资料提取的结果,可以用于生成文章、制作幻灯片、撰写摘要等多个下游任务。

可迭代性

某个步骤失败时,不需要重跑整个流程。只需要修复该步骤的提示词或输入,然后从断点继续。

专业化

不同步骤可以使用不同的模型、不同的温度参数、甚至完全不同的处理逻辑。事实校验可以用低温度的模型保证准确性,风格润色可以用高温度的模型增加多样性。

一个完整的提示链案例:技术文章写作

把技术文章写作拆成一个完整的提示链:

步骤1:资料提取

def extract_sources(topic: str) -> dict:
    prompt = f"""
    从以下来源中提取与"{topic}"相关的关键信息:
    - 官方文档
    - 技术博客
    - GitHub 仓库
    - Stack Overflow 讨论
 
    输出格式:
    {{
        "concepts": ["概念1", "概念2", ...],
        "key_points": ["要点1", "要点2", ...],
        "examples": ["示例1", "示例2", ...],
        "common_issues": ["问题1", "问题2", ...]
    }}
    """
    return call_model(prompt, context=gather_sources(topic))

步骤2:主题归纳

def synthesize_theme(extracted: dict) -> str:
    prompt = f"""
    基于以下提取的信息,归纳文章的核心主题和读者价值:
 
    {json.dumps(extracted, ensure_ascii=False)}
 
    输出:
    - 一句话主题
    - 目标读者
    - 读者将获得什么
    - 文章独特角度
    """
    return call_model(prompt)

步骤3:大纲生成

def generate_outline(theme: str, extracted: dict) -> dict:
    prompt = f"""
    主题:{theme}
 
    可用素材:
    {json.dumps(extracted, ensure_ascii=False)}
 
    生成文章大纲,要求:
    1. 每个章节有明确的目标
    2. 标注每章使用的素材来源
    3. 标注哪些章节需要代码示例
    4. 标注哪些章节需要图表
 
    输出格式:
    {{
        "title": "文章标题",
        "sections": [
            {{
                "title": "章节标题",
                "goal": "本章目标",
                "sources": ["素材来源"],
                "needs_code": true/false,
                "needs_diagram": true/false
            }}
        ]
    }}
    """
    return call_model(prompt)

步骤4:分节写作

def write_section(outline: dict, section_index: int, context: dict) -> str:
    section = outline["sections"][section_index]
    prompt = f"""
    章节:{section['title']}
    目标:{section['goal']}
 
    可用素材:
    {context.get('extracted', {})}
 
    前文概要:
    {context.get('previous_summary', '这是第一章')}
 
    写作要求:
    - 技术准确,用词精确
    - 代码示例完整可运行
    - 适当地使用引用块强调重点
    - 段落之间逻辑连贯
 
    输出本节完整内容。
    """
    return call_model(prompt)

步骤5:事实校验

def fact_check(draft: str, sources: list) -> list:
    prompt = f"""
    检查以下草稿中的事实陈述:
 
    {draft}
 
    可验证来源:
    {sources}
 
    输出所有需要验证的陈述:
    {{
        "claims": [
            {{
                "statement": "原文陈述",
                "verification_needed": "需要验证的点",
                "source_hint": "可能在哪个来源中"
            }}
        ]
    }}
    """
    return call_model(prompt)

步骤6:风格润色

def polish(draft: str, style_guide: dict) -> str:
    prompt = f"""
    对以下草稿进行风格润色:
 
    {draft}
 
    风格要求:
    - 标题层级:{style_guide['heading_levels']}
    - 代码块语言标注:{style_guide['code_annotations']}
    - 引用块使用场景:{style_guide['quote_usage']}
    - 段落长度:{style_guide['paragraph_length']}
 
    保持技术准确性,只调整表达方式。
    """
    return call_model(prompt)

主流程编排

def write_technical_article(topic: str) -> str:
    # 步骤1:资料提取
    extracted = extract_sources(topic)
    print(f"提取了 {len(extracted['concepts'])} 个概念")
 
    # 步骤2:主题归纳
    theme = synthesize_theme(extracted)
    print(f"主题:{theme}")
 
    # 步骤3:大纲生成
    outline = generate_outline(theme, extracted)
    print(f"大纲包含 {len(outline['sections'])} 章")
 
    # 步骤4:分节写作
    sections = []
    for i in range(len(outline['sections'])):
        context = {
            'extracted': extracted,
            'previous_summary': summarize_previous(sections)
        }
        section_content = write_section(outline, i, context)
        sections.append(section_content)
        print(f"完成第 {i+1} 章")
 
    # 步骤5:事实校验
    draft = '\n\n'.join(sections)
    claims = fact_check(draft, extracted['sources'])
    verified_draft = verify_claims(draft, claims)
    print(f"验证了 {len(claims['claims'])} 个陈述")
 
    # 步骤6:风格润色
    final = polish(verified_draft, get_style_guide())
    print("文章完成")
 
    return final

适用场景判断

适合使用提示链的情况:

  1. 输出质量要求高:需要事实准确、逻辑严密、表达精炼
  2. 中间推理必须可审查:医疗诊断、法律分析、金融风控等场景
  3. 任务可以自然拆成阶段:写作、代码生成、数据分析等
  4. 失败成本较高:不能接受”重试几次也许能对”的不确定性

不适合使用提示链的情况:

  1. 简单问答:事实查询、定义解释、简单计算
  2. 低风险、低成本的一次性生成:闲聊、创意发散、头脑风暴
  3. 强实时交互:实时对话、游戏 NPC、即时翻译

常见反模式

链条过长

把每个微小的思考步骤都变成一个链节点,导致:

  • 延迟累积,用户体验差
  • 错误传播,前面的小错误被后面放大
  • 维护成本高,一个改动要调整多个节点

判断标准:如果一个步骤的输入直接来自上一步的输出,且没有独立的验证价值,考虑合并。

检查点过多

每个步骤都设置人工检查点,导致:

  • 流程频繁中断,失去自动化价值
  • 人工疲劳,检查质量下降
  • 吞吐量极低,无法规模化

判断标准:只对”失败风险高”或”失败代价大”的步骤设置人工检查点。

步骤间信息丢失

中间产物格式设计不当,导致:

  • 后续步骤无法获取前序步骤的关键信息
  • 上下文断裂,逻辑不连贯
  • 重复计算,浪费资源

判断标准:每个中间产物应该包含”下游步骤可能需要的所有信息”,用结构化格式(JSON、YAML)而非自由文本。

提示链设计原则

步骤粒度

每个步骤应该:

  • 有明确的输入和输出
  • 有独立的验证标准
  • 有可复用的价值

中间产物格式

  • 使用结构化格式(JSON、YAML)而非自由文本
  • 包含下游步骤可能需要的所有信息
  • 预留扩展字段,便于后续增强

错误传播策略

  • 每个步骤应该能处理上游的异常输出
  • 关键步骤应该有降级策略
  • 考虑设置质量阈值,低于阈值时触发人工介入

二、路由:先判断,再分流

为什么需要路由

一个 Agent 系统通常会面对多种输入:简单问答、复杂分析、代码修改、资料检索、工具操作、人工审批。不同输入需要:

  • 不同的模型:简单任务用小模型,复杂任务用大模型
  • 不同的工具:查询用搜索,修改用编辑器,执行用运行时
  • 不同的预算:高风险任务需要人工审批,低风险任务可以自动执行
  • 不同的安全策略:只读操作放开,写入操作需要鉴权

如果所有请求都走同一个处理链,要么简单任务被过度处理(浪费资源),要么复杂任务被轻率对待(风险失控)。

路由的分流维度

按任务意图分流

用户输入
  -> 意图识别
     -> 查询类 -> 搜索/检索流程
     -> 写作类 -> 生成/编辑流程
     -> 修改类 -> 代码审查+修改流程
     -> 执行类 -> 命令构建+执行流程
     -> 审查类 -> 分析+报告流程

按风险等级分流

用户请求
  -> 风险评估
     -> 只读操作 -> 直接执行
     -> 可逆写入 -> 低权限执行+日志
     -> 不可逆操作 -> 人工审批+高权限执行
     -> 破坏性操作 -> 拒绝或特殊审批流程

按复杂度分流

任务分析
  -> 复杂度评估
     -> 单步任务 -> 直接执行
     -> 多步任务 -> 提示链执行
     -> 需要规划 -> 规划模式
     -> 需要多智能体 -> 协作模式

按数据来源分流

信息请求
  -> 来源判断
     -> 通用知识 -> 模型内置知识
     -> 最新信息 -> 网页搜索
     -> 内部文档 -> 知识库检索
     -> 代码库 -> 代码搜索+分析
     -> 数据库 -> 查询生成+执行

按输出形态分流

生成请求
  -> 形态判断
     -> 答案 -> 文本生成
     -> 文件 -> 文件写入
     -> 代码 -> 代码生成+格式化
     -> 报告 -> 结构化输出+可视化
     -> 操作结果 -> 执行+结果呈现

路由误分流的代价

路由的核心难点是误分流。误分流的代价不是慢一点,而是用错了处理方式。

高风险任务被误判为低风险

  • 代码删除操作被当作普通查询
  • 生产环境配置修改被当作本地测试
  • 敏感数据访问被当作普通查询

这是最危险的情况,防护策略是:不确定时走保守路径。

低风险任务被误判为高风险

  • 简单问答被要求人工审批
  • 只读操作被当作写入操作
  • 本地测试被当作生产操作

这种情况下代价是效率降低,但相对安全。

路由置信度与保守路径设计

路由结果不应该是一个确定的分类,而应该包含置信度:

def route_with_confidence(request: str) -> dict:
    prompt = f"""
    分析以下请求的类型和风险等级:
 
    请求:{request}
 
    输出:
    {{
        "intent": "查询/写作/修改/执行/审查",
        "intent_confidence": 0.0-1.0,
        "risk_level": "只读/可逆写入/不可逆操作/破坏性操作",
        "risk_confidence": 0.0-1.0,
        "reasoning": "判断依据"
    }}
    """
    return call_model(prompt)
 
def conservative_route(route_result: dict) -> str:
    # 低置信度时走保守路径
    if route_result['risk_confidence'] < 0.7:
        return 'conservative_path'
    if route_result['risk_level'] in ['不可逆操作', '破坏性操作']:
        return 'high_risk_path'
    return route_result['intent']

保守路径的设计原则:

  • 增加验证步骤
  • 降低自动化程度
  • 增加人工检查点
  • 使用更严格的权限

多级路由架构

单级路由容易在复杂场景下失效,多级路由可以逐步细化:

def multi_level_route(request: str):
    # 第一级:粗分类
    level1 = coarse_classification(request)
    # level1: ['信息类', '操作类', '生成类']
 
    # 第二级:细分类(基于第一级结果)
    if level1 == '信息类':
        level2 = information_subclass(request)
        # level2: ['查询', '检索', '分析']
    elif level1 == '操作类':
        level2 = operation_subclass(request)
        # level2: ['只读', '写入', '执行']
    else:
        level2 = generation_subclass(request)
        # level2: ['文本', '代码', '多媒体']
 
    # 第三级:具体路由(基于第二级结果)
    return specific_route(level1, level2, request)

多级路由的优势:

  • 每一级只需要处理有限的类别
  • 可以为不同级别使用不同的模型
  • 便于调试和优化

常见反模式

过度路由

把每个细微的差异都变成一个路由分支,导致:

  • 路由规则难以维护
  • 新场景需要添加新分支
  • 分支过多,每个分支的样本不足

判断标准:如果两个分支的处理逻辑差异小于 30%,考虑合并。

路由规则难维护

路由规则散落在代码各处,或者用复杂的嵌套条件,导致:

  • 新人难以理解
  • 修改容易引入 bug
  • 无法快速响应新场景

建议:使用配置文件或规则引擎,集中管理路由规则。

路由与业务逻辑耦合

路由判断中嵌入业务逻辑,导致:

  • 路由规则难以复用
  • 业务逻辑变更影响路由
  • 难以测试和调试

建议:路由只负责分类,具体处理逻辑由下游处理器实现。

三、并行化:吞吐量的代价是协调复杂度

并行化的前提条件

并行化适合处理互不依赖的任务。但”互不依赖”这个条件比看起来更严格:

任务独立性

  • 子任务之间没有数据依赖
  • 子任务的执行顺序不影响结果
  • 子任务之间没有控制流依赖

写入不冲突

  • 多个写入操作不会修改同一个资源
  • 如果必须修改同一资源,有明确的冲突解决策略
  • 写入操作的幂等性得到保证

汇总规则明确

  • 有明确的合并策略
  • 能处理部分失败的情况
  • 有超时和取消机制

并行结果合并策略

简单聚合

def parallel_research(topic: str) -> dict:
    sources = ['arxiv', 'github', 'stackoverflow', 'docs']
 
    results = parallel_execute(
        [lambda: search_source(source, topic) for source in sources]
    )
 
    return {
        'papers': results[0]['papers'],
        'repos': results[1]['repos'],
        'discussions': results[2]['posts'],
        'docs': results[3]['docs']
    }

适用于:结果来源明确,不需要去重和排序。

去重合并

def merge_with_dedup(results: list) -> list:
    seen = set()
    merged = []
 
    for result in results:
        for item in result:
            # 用唯一标识去重
            identifier = get_identifier(item)
            if identifier not in seen:
                seen.add(identifier)
                merged.append(item)
 
    return merged

适用于:有重复可能,需要去重。

排序合并

def merge_with_ranking(results: list) -> list:
    all_items = []
 
    for result in results:
        all_items.extend(result)
 
    # 按相关性或其他指标排序
    return sorted(all_items, key=lambda x: x['score'], reverse=True)

适用于:需要对结果进行全局排序。

冲突解决

def merge_with_conflict_resolution(results: list) -> dict:
    merged = {}
 
    for result in results:
        for key, value in result.items():
            if key not in merged:
                merged[key] = value
            else:
                # 冲突解决策略
                merged[key] = resolve_conflict(merged[key], value)
 
    return merged
 
def resolve_conflict(existing, new):
    # 策略1:保留时间戳较新的
    if existing['timestamp'] < new['timestamp']:
        return new
 
    # 策略2:保留置信度较高的
    if existing.get('confidence', 0) < new.get('confidence', 0):
        return new
 
    # 策略3:触发人工审查
    if existing != new:
        return trigger_review(existing, new)
 
    return existing

适用于:可能有冲突,需要明确的解决策略。

写入冲突的预防

所有权边界

# 每个智能体负责不同的文件
agents = {
    'agent_1': 'src/auth/*',
    'agent_2': 'src/database/*',
    'agent_3': 'src/api/*'
}
 
# 并行执行,保证不会写入同一文件
parallel_execute([
    lambda: process_files(agents['agent_1']),
    lambda: process_files(agents['agent_2']),
    lambda: process_files(agents['agent_3'])
])

锁机制

def write_with_lock(file_path: str, content: str):
    acquire_lock(file_path)
    try:
        current = read_file(file_path)
        merged = merge_content(current, content)
        write_file(file_path, merged)
    finally:
        release_lock(file_path)

原子操作

def atomic_write(file_path: str, content: str):
    # 先写临时文件
    temp_path = f"{file_path}.tmp"
    write_file(temp_path, content)
 
    # 原子性重命名
    os.rename(temp_path, file_path)

常见反模式

伪并行

名义上是并行,实际上有隐藏的依赖:

# 看起来是并行
results = parallel_execute([
    lambda: step1(),
    lambda: step2(),
    lambda: step3()
])
 
# 但 step2 依赖 step1 的输出
def step2():
    result = get_result_from_step1()  # 隐藏依赖
    ...

合并灾难

并行结果难以合并,导致:

# 三个智能体同时编辑同一文件
agents = [
    lambda: edit_file('config.yaml', {'key': 'value1'}),
    lambda: edit_file('config.yaml', {'key': 'value2'}),
    lambda: edit_file('config.yaml', {'key': 'value3'})
]
 
# 最后一个写入覆盖前面的,前面的工作白费

资源竞争

并行任务争抢有限资源:

# 所有任务都需要访问数据库
for i in range(100):
    parallel_execute([
        lambda: query_database(query),
        lambda: query_database(query),
        lambda: query_database(query)
    ])
 
# 数据库连接池耗尽

解决方案:限制并发度,使用资源池。

四、规划:让 Agent 先建立任务地图

计划的要素

规划模式用于处理无法一步完成的任务。计划不是为了形式感,而是为了提前暴露依赖、风险和验收标准。

一个好的计划应该包含:

目标

最终要交付什么,用可验证的方式描述。

坏目标:优化代码
好目标:将 API 响应时间从 500ms 降到 200ms 以下

范围

哪些内容做,哪些不做。

做:用户认证相关的 API
不做:管理员功能、第三方登录

步骤

先后顺序和依赖关系。

steps = [
    {'name': '分析现状', 'deps': []},
    {'name': '设计优化方案', 'deps': ['分析现状']},
    {'name': '实现优化', 'deps': ['设计优化方案']},
    {'name': '测试验证', 'deps': ['实现优化']},
    {'name': '部署上线', 'deps': ['测试验证']}
]

验证

每一步如何证明完成。

steps = [
    {
        'name': '分析现状',
        'validation': '输出性能分析报告,包含瓶颈定位'
    },
    {
        'name': '设计优化方案',
        'validation': '方案通过代码审查'
    }
]

风险

可能失败的点和回退路径。

risks = [
    {
        'risk': '优化后可能引入 bug',
        'probability': 'medium',
        'mitigation': '完整的回归测试',
        'fallback': '保留回滚方案'
    }
]

什么时候该规划,什么时候不该

需要规划的情况:

  • 任务跨多个文件:需要协调多个模块的修改
  • 任务跨多个系统:需要协调多个服务或 API
  • 任务跨多个阶段:有明确的先后依赖
  • 失败成本明显:出错后难以恢复或代价高昂

不需要规划的情况:

  • 单文件内的简单修改
  • 低风险的探索性任务
  • 可以快速重试的操作
  • 实时性要求高的场景

动态规划与重规划

静态计划在执行过程中可能失效,需要动态调整:

def dynamic_plan(task: str):
    # 初始计划
    plan = create_initial_plan(task)
 
    executed_steps = []
 
    for step in plan['steps']:
        # 执行步骤
        result = execute_step(step)
 
        # 检查是否需要重规划
        if result.get('need_replan'):
            # 基于已执行步骤的结果重新规划
            plan = replan(plan, executed_steps, result)
            continue
 
        executed_steps.append(result)
 
    return executed_steps

重规划的触发条件:

  • 执行结果与预期不符
  • 发现新的依赖关系
  • 外部条件发生变化
  • 某个步骤失败且无法恢复

常见反模式

过度规划

为简单的任务写详细的计划,导致:

  • 花在规划上的时间比执行还长
  • 计划本身成为负担
  • 无法适应变化

判断标准:如果规划时间超过预期执行时间的 20%,考虑简化。

计划与执行脱节

计划写得很详细,但执行时完全不参考:

  • 计划成为文档而非指导
  • 执行人员不知道计划的存在
  • 计划没有更新机制

建议:让计划成为执行流程的一部分,而不是独立的文档。

计划过于刚性

无法适应执行过程中的变化:

  • 意外情况导致整个计划失效
  • 没有调整和更新的机制
  • 过度依赖初始假设

建议:计划应该包含调整策略和重规划触发条件。

五、模式组合:从单体到协作

四种模式很少单独使用,通常是组合在一起形成一个完整的工作流。

组合方式

用户请求
    |
    v
[路由层] 判断任务类型和风险
    |
    +-- 简单任务 --> [直接执行]
    |
    +-- 复杂任务 --> [规划层] 创建执行计划
                          |
                          v
                     [提示链] 拆解为可执行步骤
                          |
                          v
                     [并行化] 独立步骤并行执行
                          |
                          v
                     [汇总] 合并结果并验证

实战案例:代码审查 Agent

看一个完整的例子:代码审查 Agent 如何串联四种模式。

class CodeReviewAgent:
    def review(self, pr_url: str):
        # 1. 路由:判断 PR 类型
        pr_type = self.classify_pr(pr_url)
 
        if pr_type == 'simple':
            return self.simple_review(pr_url)
        elif pr_type == 'complex':
            return self.complex_review(pr_url)
        else:
            return self.manual_review(pr_url)
 
    def complex_review(self, pr_url: str):
        # 2. 规划:创建审查计划
        plan = self.create_review_plan(pr_url)
 
        results = {}
 
        # 3. 提示链:按步骤执行
        for step in plan['steps']:
            if step['parallelizable']:
                # 4. 并行化:可并行的步骤并行执行
                step_results = self.execute_parallel(step)
            else:
                step_results = self.execute_serial(step)
 
            results[step['name']] = step_results
 
        # 5. 汇总:合并审查结果
        return self.synthesize_results(results)
 
    def execute_parallel(self, step: dict):
        # 并行执行多个审查维度
        tasks = [
            lambda: self.security_review(step['files']),
            lambda: self.performance_review(step['files']),
            lambda: self.style_review(step['files']),
            lambda: self.test_coverage_review(step['files'])
        ]
 
        return parallel_execute(tasks)
 
    def security_review(self, files: list):
        # 提示链:安全审查的子步骤
        issues = []
 
        # 子步骤1:识别潜在的安全问题
        candidates = self.identify_security_candidates(files)
 
        # 子步骤2:验证每个候选问题
        for candidate in candidates:
            issue = self.verify_security_issue(candidate)
            if issue:
                issues.append(issue)
 
        # 子步骤3:生成修复建议
        for issue in issues:
            issue['suggestion'] = self.generate_fix_suggestion(issue)
 
        return issues

模式选择决策树

任务是否复杂?
|
+-- 否 --> 是否需要多个步骤?
|           |
|           +-- 否 --> 直接执行
|           |
|           +-- 是 --> 提示链
|
+-- 是 --> 是否有多个独立的子任务?
            |
            +-- 是 --> 并行化 + 提示链
            |
            +-- 否 --> 是否需要动态调整?
                        |
                        +-- 是 --> 规划 + 提示链
                        |
                        +-- 否 --> 是否有多种类型?
                                    |
                                    +-- 是 --> 路由 + 提示链
                                    |
                                    +-- 否 --> 提示链

模式选择检查表

特征提示链路由并行化规划
多步骤可能可能
多类型可能
独立子任务
不确定性
失败定位
吞吐量
复杂度

总结

基础工作流模式的目标是把 Agent 从”自由发挥”变成”有结构地行动”。结构不是为了限制智能,而是为了降低失控概率。

提示链把复杂任务拆成可检查的步骤,让失败位置可定位。

路由根据任务类型和风险选择处理路径,避免一刀切。

并行化通过同时执行独立任务提升吞吐量,但需要解决合并和冲突问题。

规划让 Agent 先建立任务地图,暴露依赖、风险和验收标准。

四种模式可以组合使用,形成一个完整的工作流:路由判断任务类型,复杂任务进入规划,规划拆成提示链,独立步骤并行执行,最后汇总结果并验证。

结构化的 Agent 不是更聪明的 Agent,而是更可控的 Agent。


原书相关章节

原书作者:Jimmy Song