从Airflow迁移到LangGraph:392行代码重写AI测试流水线的技术决策复盘
三年前团队选型时,我拍胸脯选了Airflow。两年后,我亲手把这套东西推翻重来。技术选型的坑,踩过一次才懂。
背景:为什么放弃Airflow
Airflow的DAG设计,数据流靠key-value传递。我们的测试场景要携带:需求分析结果、RAG检索上下文、生成的测试步骤、执行结果……这些东西塞进kv里,代码写得像解谜游戏。
LangGraph不一样。State是一个完整的TypedDict,节点之间传的是结构化对象。不是零散的参数,是有类型的、能IDE补全的、运行时报错的Pythondict。
核心文件:backend/app/graphs/test_flow.py(392行),6个节点文件分布在backend/app/nodes/目录下。
State设计:TypedDict的11个字段
TestState定义如下:
PYTHON
class TestState(TypedDict):messages:Annotated[List[BaseMessage],add_messages]
test_case_id:str
status:Literal["pending","analyzing","generating","executing","reporting","completed","failed"]
requirements:List[str]
rag_context:List[dict]
test_steps:List[dict]
execution_result:dict
report:str
error:str
created_at:float
updated_at:float
11个字段覆盖从需求输入到报告输出的全流程数据。每个节点只需读取所需字段,修改负责的字段,返回修改后的dict。LangGraph自动合并回State。
注意messages字段的Annotated包装。LangGraph默认对List做append操作,但add_messages归约函数会智能合并(去重+追加),保证节点间对话上下文不重复。
实战经验:节点修改State后下一个节点读不到,99%原因是返回的不是dict,或字段名拼写错误。LangGraph静默忽略,不报错。
四节点流水线:各司其职
流程类似工厂流水线:需求入口进,报告出口出,中间4个工位。
analyze_node负责:提取用户需求文本→RAG知识库检索→ContextOptimizer裁剪至3000tokens→功能点列表写入requirements。
代码关键点:RAG检索包在try-except内。检索失败不影响流程继续,降级设计保证链路稳定性。
generate_node遍历requirements和rag_context,逐功能点生成测试步骤。实际项目调用LLMSkill做增强生成,当前版本为简化实现。
execute_node遍历test_steps逐个执行,记录每步结果到execution_result.details。
report_node汇总执行数据,生成Markdown格式测试报告,包含通过率统计。
条件边:流程的刹车系统
任何节点失败,后续步骤不应继续。LangGraph条件边实现流程控制:
PYTHON
def should_continue(state: TestState) -> str:ifstate.get("status")=="failed":
returnEND
return"generate_node"
每个条件函数只返回两个值:节点名(继续)或END(终止)。比一堆if-else清晰得多。
图构建:4行代码串起流水线
PYTHON
def build_test_graph() -> StateGraph:builder=StateGraph(TestState)
builder.add_node("analyze_node",analyze_node)
builder.add_node("generate_node",generate_node)
builder.add_node("execute_node",execute_node)
builder.add_node("report_node",report_node)
builder.set_entry_point("analyze_node")
builder.add_conditional_edges("analyze_node",should_continue)
builder.add_conditional_edges("generate_node",after_generate)
builder.add_conditional_edges("execute_node",after_execute)
builder.add_edge("report_node",END)
returnbuilder.compile()
调用run_test_flow("TC001","用户登录功能..."),系统自动走完4个节点,返回最终State。
踩坑实录:3个血泪教训
坑1:节点返回None。LangGraph要求节点必须返回dict。忘了return时框架不报错,但State就是不更新。排查半小时才发现。
坑2:async节点用同步方式调用。analyze_node是asyncdef,但图构建后用test_graph.invoke()同步调用——LangGraph自动处理async节点。手动用asyncio.run()包一层反而报EventLoop冲突。
迁移收益:数字说话
单条用例从需求到报告,全流程耗时从人工15-30分钟缩短到自动化30秒以内(含RAG检索和LLM调用)。不是每个环节都完美,但投入产出比显著提升。
LangGraphStateGraph的核心价值:用可组合的节点网络替代硬编码的工作流。每个节点只关心输入输出,节点间通过State传递数据,条件边控制流程走向。392行代码,替代了之前600多行的Airflow定制代码。
