从Airflow迁移到LangGraph:392行代码重写AI测试流水线的技术决策复盘

三年前团队选型时,我拍胸脯选了Airflow。两年后,我亲手把这套东西推翻重来。技术选型的坑,踩过一次才懂。从Airflow迁移到LangGraph:392行代码重写AI测试流水线的技术决策复盘 IT技术

背景:为什么放弃Airflow

Airflow的DAG设计,数据流靠key-value传递。我们的测试场景要携带:需求分析结果、RAG检索上下文、生成的测试步骤、执行结果……这些东西塞进kv里,代码写得像解谜游戏。

LangGraph不一样。State是一个完整的TypedDict,节点之间传的是结构化对象。不是零散的参数,是有类型的、能IDE补全的、运行时报错的Pythondict。

核心文件:backend/app/graphs/test_flow.py(392行),6个节点文件分布在backend/app/nodes/目录下。

State设计:TypedDict的11个字段

TestState定义如下:

PYTHON

class TestState(TypedDict):

messages:Annotated[List[BaseMessage],add_messages]

test_case_id:str

status:Literal["pending","analyzing","generating","executing","reporting","completed","failed"]

requirements:List[str]

rag_context:List[dict]

test_steps:List[dict]

execution_result:dict

report:str

error:str

created_at:float

updated_at:float

11个字段覆盖从需求输入到报告输出的全流程数据。每个节点只需读取所需字段,修改负责的字段,返回修改后的dict。LangGraph自动合并回State。

注意messages字段的Annotated包装。LangGraph默认对List做append操作,但add_messages归约函数会智能合并(去重+追加),保证节点间对话上下文不重复。

实战经验:节点修改State后下一个节点读不到,99%原因是返回的不是dict,或字段名拼写错误。LangGraph静默忽略,不报错。

四节点流水线:各司其职

流程类似工厂流水线:需求入口进,报告出口出,中间4个工位。

analyze_node负责:提取用户需求文本→RAG知识库检索→ContextOptimizer裁剪至3000tokens→功能点列表写入requirements。

代码关键点:RAG检索包在try-except内。检索失败不影响流程继续,降级设计保证链路稳定性。

generate_node遍历requirements和rag_context,逐功能点生成测试步骤。实际项目调用LLMSkill做增强生成,当前版本为简化实现。

execute_node遍历test_steps逐个执行,记录每步结果到execution_result.details。

report_node汇总执行数据,生成Markdown格式测试报告,包含通过率统计。

条件边:流程的刹车系统

任何节点失败,后续步骤不应继续。LangGraph条件边实现流程控制:

PYTHON

def should_continue(state: TestState) -> str:

ifstate.get("status")=="failed":

returnEND

return"generate_node"

每个条件函数只返回两个值:节点名(继续)或END(终止)。比一堆if-else清晰得多。

图构建:4行代码串起流水线

PYTHON

def build_test_graph() -> StateGraph:

builder=StateGraph(TestState)

builder.add_node("analyze_node",analyze_node)

builder.add_node("generate_node",generate_node)

builder.add_node("execute_node",execute_node)

builder.add_node("report_node",report_node)

builder.set_entry_point("analyze_node")

builder.add_conditional_edges("analyze_node",should_continue)

builder.add_conditional_edges("generate_node",after_generate)

builder.add_conditional_edges("execute_node",after_execute)

builder.add_edge("report_node",END)

returnbuilder.compile()

调用run_test_flow("TC001","用户登录功能..."),系统自动走完4个节点,返回最终State。

踩坑实录:3个血泪教训

坑1:节点返回None。LangGraph要求节点必须返回dict。忘了return时框架不报错,但State就是不更新。排查半小时才发现。

坑2:async节点用同步方式调用。analyze_node是asyncdef,但图构建后用test_graph.invoke()同步调用——LangGraph自动处理async节点。手动用asyncio.run()包一层反而报EventLoop冲突。

迁移收益:数字说话

单条用例从需求到报告,全流程耗时从人工15-30分钟缩短到自动化30秒以内(含RAG检索和LLM调用)。不是每个环节都完美,但投入产出比显著提升。

LangGraphStateGraph的核心价值:用可组合的节点网络替代硬编码的工作流。每个节点只关心输入输出,节点间通过State传递数据,条件边控制流程走向。392行代码,替代了之前600多行的Airflow定制代码。