Projects

Created
Oct 25, 2025 01:38 PM
Tags

项目背景

为了应对模型训练数据和现实生活中实际数据之间的存在的时效性问题以及涉及私域知识的问答场景,RAG技术应运而生。RAG,顾名思义就是通过向外部数据源获取对应的数据(Retrieval),用于增强(Argument)大模型生成(Generation)的回答质量。
最早出现的RAG架构是简单的Retrieval - Generation架构,我们拿到用户给出的问题,进行一定的预处理(关键词提取等等),得到预处理之后的问题,接着通过Embedding Model从海量资料中抓取相关的资料作为Prompt交给大模型用于增强模型回答的质量。
但是基于语义相似性的匹配进行相关预料的抓取未必能够处理所有情况,因为能够用于增强回答质量的语料不一定和问题本身存在相似性。一个常见的例子就是:告诉我“提出水是万物的本源”的哲学家的徒弟提出的本体论观点。而我们的语料中并不直接存在这个问题的答案,语料库中可能提到:
  1. 泰勒斯提出水是万物的本源
  1. 泰勒斯的弟子有阿纳克西曼德
  1. 阿纳克西曼德将没有任何形式规定性的阿派朗认定为万物的本源
如果单纯从语义相似度匹配出发,我们大概率只能retrieval到第一个句子用于增强大模型的回答。但是缺失语料2和语料3的情况下,如果我们所使用的大模型训练语料中没有哲学相关知识,那么缺失这些关键信息,大模型将无法正确回答这些问题,甚至会出现“幻觉”。
因此GraphRAG技术诞生了,常见的GraphRAG包含两个步骤:
  1. Offline: 我们需要离线对语料库进行图索引的构建(将非结构化语料转化为结构化数据存储到图数据库中)
    1. Online: 当GraphRAG系统接收到用户问题时,根据图数据库捕捉到的语料库中不同实体之间的关联关系,我们可以从图数据库中抓取到上面的三句话(具体图数据库索引可能如下图所示)
notion imagenotion image
但是GraphRAG本身也存在几个问题:
  1. Graph Index的构建过程存在优化的可能,构建的GraphIndex会影响答案的质量。
  1. GraphRAG Serving的过程中Token消耗巨大
  1. GraphRAG中存在各种各样的图算法,如何Retrieval效果最好呢?(配置空间过大)
本次项目主要针对第三个问题展开。我们希望借助大模型的泛化能力使其自动识别用户问题中的意图,然后选择合适配置(比如选择图算法)从图数据库中读取对应的数据用于增强大模型回答质量——也就是本次项目Agentic GraphRAG的目的所在。

现有 Workflow:优雅的解耦,未竟的并行

现在的HugeGraph-AI项目中存在两个核心抽象:
  1. Operator:表示「原子式的操作单元」,负责完成一个明确的子任务,如向量索引构建、向量相似度查询、图数据相关操作等等
  1. Workflow:由Operator作为节点构成的链状执行流。项目中预定义好的Workflow和项目Demo用例一一对应(如GraphRAG, Vector-Similarity-Based RAG)
由于Operator的实现需要遵循下面的接口
class Operator: @abstractmethod def run(context: dict[str, Any]) -> dict[str,Any]: return {}
Operator在实际运行的时候接受字典类型的context对象作为输入,返回的对象也是一个字典,用来作为下一个Operator的输入,这样的设计有一个很高明的地方——他将不同的Operator之间的依赖关系和Operator本身的具体实现解耦了,每个Operator是一个相对独立的存在,如果Operator A需要依赖Operator B的输出,那么只需要检查context对象中是否存有Operator B的输出即可。是一种解耦的设计。这种设计的好处在于我们能够很方便地将不同的Operator自由地组合。根据不同的用户输入组装合适Workflow Serving用户请求,那不正是我们在项目背景中提到的Agentic GraphRAG的目的所在吗?
👉🏼
理论上现有设计已经可以正常过渡到Agentic GraphRAG,但是现有设计存在诸多悬而未决的问题:
  1. 现有调度器仅仅支持链状Workflow,缺失了可能存在的并行空间
  1. 现有调度器无法复用被反复使用到的Workflow(本质原因是因为现有Workflow设计是有状态的)

打破链式束缚,拥抱全新架构

之前的调度器给我们的启发是Operator粒度的解耦是一个不错的设计理念,但是调度器本身能力有限,限制了Workflow的能力。因此我们计划替换项目中的调度器!经过对几种不同的Workflow编排框架进行简单的调研之后,我们认为下面几个特性是我们筛选调度器的标准。(下面我们统一将框架编排对象称为Workflow,Workflow由一系列Task组成)
  1. 并行性:Workflow中没有数据依赖关系的不同Task能否支持自动并行
  1. 低耦合:Task的具体实现应该和Workflow本身解耦(用人话:一种Task可以作为多种不同的Workflow的节点,同时Task的实现是否需要包含其依赖关系约束?)
  1. 数据共享:由于我们希望不同的Task之间的依赖关系解耦,那么我们需要Workflow粒度的数据共享机制用来在不同的Task共享数据(用于参数传递)
  1. 提供Python接口

海底捞针,山穷水尽

我们首先将目光放到了现在炙手可热的AI Workflow调度框架。围绕前面提到的几个维度,我们分别调研了下面几种不同的Workflow编排框架——LlamaIndex,Agno,Pydantic-Ai,LangGraph。

LlamaIndex

对于LlamaIndex,我们用一个常见的例子说明LlamaIndex这个框架的设计理念。
from workflows import Workflow, Context, step from workflows.events import StartEvent, StopEvent, Event class StepEvent(Event): message: str class MyWorkflow(Workflow): @step async def step_one(self, ctx: Context, ev: StartEvent) -> StepEvent: current_count = await ctx.store.get("count", default=0) current_count += 1 await ctx.store.set("count", current_count) print("step one called once") return StepEvent("launch step two") @step async def step_two(self, ctx: Context, ev: StepEvent) -> StopEvent: print("step two called once") return StopEvent()
从上面这个简单的例子我们可以看到很多问题。首先明确一个观念:Workflow由两个元素构成:Task,Task之间的依赖关系。只要这两个元素确定之后一个Workflow就确定下来了。我们可以看到LlamaIndex中每个Task(对应代码中的@step注解的函数)的实现和Workflow存在依赖关系。因为每个Task的实现都需要传入Event对象作为参数,但是Event参数其实就是对Task之间依赖关系的一种限定。所以LlamaIndex不具备低耦合的特点。同时我们发现Task作为Workflow对象的成员函数本身就违背了我们前面提到的Task需要能够在多种不同Workflow中共享的诉求。但是经过调研,LlamaIndex的数据共享和并行这边支持还算不错。只不过从基于事件驱动模型构建的编程接口在保证了接口易用性的同时也牺牲了编程的灵活性。

Agno

同样还是从例子入手
from agno.workflow import Router, Step, Workflow def route_by_topic(step_input) -> List[Step]: topic = step_input.input.lower() if "tech" in topic: return [Step(name="Tech Research", agent=tech_expert)] elif "business" in topic: return [Step(name="Business Research", agent=biz_expert)] else: return [Step(name="General Research", agent=generalist)] workflow = Workflow( name="Expert Routing", steps=[ Router( name="Topic Router", selector=route_by_topic, choices=[tech_step, business_step, general_step] ), Step(name="Synthesis", agent=synthesizer), ] ) workflow.print_response("Latest developments in artificial intelligence and machine learning", markdown=True)
从这个例子我们可以看到Workflow本身和Task之间的绑定关系是通过指定steps参数确定的。理论上来说定义好一种Task之后我们可以将其用于不同的Workflow中,Agno的设计符合我们的低耦合标准。
但是数据共享和任务并行方面的支持就存在一定的限制。
首先是任务并行,例子如下:
workflow = Workflow( name="Parallel Research Pipeline", steps=[ Parallel( Step(name="HackerNews Research", agent=hn_researcher), Step(name="Web Research", agent=web_researcher), Step(name="Academic Research", agent=academic_researcher), name="Research Step" ), Step(name="Synthesis", agent=synthesizer), # Combines the results and produces a report ] )
Agno专门设计了并行接口,我们需要在静态编译时(Python哪有编译时?😀)明确哪些任务可以并行。但是Agentic GraphRAG最终构造的Workflow有可能是在运行时由模型规划出来的,是动态运行时明确的,出于这样的考量,我们认为Agno的并行特性并不符合我们的要求
接下来是数据共享,Agno框架中支持三种不同的Task:
  1. Agent
  1. 由多个Agent构成的Team
  1. Pure Function
通过看调研时最新版本的Agno源代码,发现Agno支持的状态共享仅限于Agent和Team。那么对于那些适合用Pure Function实现的Task,我们就需要额外支持数据共享的机制。因此Agno的数据共享机制也不是我们所需要的。

Pydantic-Ai

我们从官方文档中就看到
notion imagenotion image
Pydantic-Ai框架竟然不支持Task粒度的自动并行。
和LlamaIndex框架类似采用事件驱动的编程模型,因此Workflow和Task之间不算是完全解耦,但是值得注意的时Pydantic-Ai的Task是可以用到多个不同的Workflow的。

LangGraph

最后的最后,终于还是遇到了LangGraph,之前一直没有调研LangGraph的原因是因为有小伙伴认为LangGraph本身太重了。在上一个版本中,即使只是使用LangGraph的部分功能(调度),也需要拉取LangGraph的完整依赖,引入LangGraph可能会让项目变“重”。同时还时不时在其他开源项目中看到“xxx比LangGraph快xxx倍”诸如此类字眼。所以直到此时此刻才把它提上调研日程。
我们还是来看看LangGraph的例子
class State(TypedDict): topic: str joke: str improved_joke: str # Nodes def generate_joke(state: State): """First LLM call to generate initial joke""" msg = llm.invoke(f"Write a short joke about {state['topic']}") return {"joke": msg.content} def check_punchline(state: State): """Gate function to check if the joke has a punchline""" # Simple check - does the joke contain "?" or "!" if "?" in state["joke"] or "!" in state["joke"]: return "Pass" return "Fail" def improve_joke(state: State): """Second LLM call to improve the joke""" msg = llm.invoke(f"Make this joke funnier by adding wordplay: {state['joke']}") return {"improved_joke": msg.content} # Build workflow workflow = StateGraph(State) # Add nodes workflow.add_node("generate_joke", generate_joke) workflow.add_node("improve_joke", improve_joke) # Add edges to connect nodes workflow.add_edge(START, "generate_joke") workflow.add_conditional_edges( "generate_joke", check_punchline, {"Fail": "improve_joke", "Pass": END} ) workflow.add_edge("improve_joke", END) # Compile chain = workflow.compile() # Invoke state = chain.invoke({"topic": "cats"}
这是一个我简化后的官方文档中的例子,我们可以看到基于GraphAPI的LangGraph通过调用workflow.add_edge指定Workflow的依赖关系,将Workflow和Task解耦。同时支持全局State作为Workflow的状态进行Task之间的数据共享。根据官方文档的说法,LangGraph是支持Task自动并行执行的。我们总算是找到了符合所有要求的Workflow编排框架了!

总结

并行性
低耦合
数据共享
Python Interface
LlamaIndex
支持
不支持
支持
支持
Agno
支持但不符合要求
支持
支持但不符合要求
支持
Pydantic-Ai
不支持
不支持
支持
支持
LangGraph
支持
支持
支持
支持

CGraph —— Graph with Python Interaface Implement in C++

在我们最后决定使用LangGraph之前,金哥提到了一个新的方案——CGraph,这是由开源创作者Chunel使用C++开发的图调度框架,对标taskflow。但是提供了Python接口。由于采用和LangGraph类似的Graph-based接口,因此前面提到的并行性、低耦合、数据共享等特点都符合我们的需求。从程序员鄙视链的角度粗浅评价,我们还是决定使用CGraph,毕竟CPP的性能摆在那里,哈哈。同时CGraph相比于LangGraph的优势在于CGraph专注于任务调度本身,而LangGraph中任务调度只是这个大生态位中的一小部分。最重要的是,当我们和Chunel取得联系之后,我们知道CGraph仍然处于动态发展的过程中,活力本身就是开源项目中最重要的组成部分(欢迎贡献:https://github.com/ChunelFeng/CGraph)。

架构设计

一开始,我们只是想要设计一个新的调度器。后来,我们发现设计一个好的调度器需要和对其调度的对象有足够的了解,就像CPU调度器和GPU调度器由于其调度对象以及生态定位的不同也会采取不同的调度策略。

抽象设计是否合理?

所以我们开始考察那个被我们称之为Workflow的抽象,在上一个设计中,它是由一系列Operator组成的链表。这样的设计否定了并行的可能,那么如果我们说Workflow是一系列Operator组成的DAG图是否合理呢?
直观来说这样的定义很合理,但是实际实践下来,我们发现Workflow中的每个节点(后面我们称之为Node)和Operator一一对应却不是一个好的设计,因为我们需要在不同的请求之间复用Workflow(这样可以节省Workflow构造过程中不可避免的资源创建以及一些DAG图校验带来的性能开销)。
举个例子,向量相似度查询是一个很常见的RAG场景中的流程,但是根据底层向量数据库的不同,由于接口不同,我们可能需要提供FaissVectorSearch、VectraVectorSearch等多种目的相同但是具体实现不同的Operator。如果我们将Operator和Workflow中的Node等同,那么我们对于Workflow的复用机会将大大减少,因为使用Faiss进行搜索和使用Vectra进行搜索的Workflow将会是不同的Workflow,但是如果我们将功能类似的向量索引Operator都封装到VectorSearchNode中,那么我们是不是能够有更多的Workflow复用机会呢?在VectorSearchNode的具体实现中我们只需要根据需要调用对应的Operator即可。通过在Workflow和Operator中间加一层的方式,有下面两个好处:
  1. 新增新的Operator,我们只需要修改对应Node的具体实现即可,不需要修改上层Workflow的逻辑
    1. Operator对Node负责,Node对Workflow负责
  1. 拥有更多的Workflow复用的机会
  1. 通过引入新的Node抽象,我们在重构的过程中不需要修改底层Operator的实现,从某种程度上减轻了重构过程中的心智负担
既然我们希望跨请求复用同类Workflow,那么我们就需要保证Workflow本身是无状态的,因为如果复用的Workflow还带着上一个请求的状态,用户就可能得到发生意料之外的记过。而Workflow的状态可以分为两种:
  1. 用户输入的状态(我们称之为WorkflowInput):这部分由用户的请求构成
  1. Workflow的中间状态(我们称之为WorkflowState):这部分由Workflow中Node执行过程中产生
我们需要保证Workflow执行的过程中这两部分状态是干净的。但是这两种不同的状态使用时机又不同:
  • WorkflowInput需要在Workflow执行前构建好,在Workflow执行之后就不再需要WorkflowInput了
  • WorkflowState在Workflow执行过程中被逐渐写入,我们需要保证Workflow执行前是干净的
通过两种不同状态使用特点的不同,一个在Workflow执行前被使用,一个在Workflow执行过程中被使用。因此我们可以使用CGraph的GParam抽象(CGraph中用于Workflow内部共享状态的抽象)分别定义这两种不同的状态,通过修改GParam相关成员的定义,可以保证WorkflowInput在Workflow执行结束时被重置,保证WorkflowState在执行前重置。这样我们可以保证每次Workflow执行时这两种状态中都只包含本次请求的状态。由于WorkflowInput状态在Workflow执行结束就被重置了,那么我们只能从WorkflowState中有选择性地选择部分数据返回给用户。因此我们得到了一个Flow抽象应该实现的接口。
class BaseFlow(ABC): """ Base class for flows, defines three interface methods: prepare, build_flow, and post_deal. """ @abstractmethod def prepare(self, prepared_input: WkFlowInput, **kwargs): """ 根据用户请求初始化Workflow输入状态 """ @abstractmethod def build_flow(self, **kwargs) -> GPipeline: """ 用来构建可以运行在CGraph之上的Workflow对象 """ @abstractmethod def post_deal(self, **kwargs): """ 从中间状态中组装返回给用户的Response """
那么回到Node抽象本身,Node本身是对某个功能的抽象,其底层可能对应着多种不同的抽象。我们大致需要考虑下面几个问题:
  1. 如何将Node层和Operator层尽可能解耦
  1. Node本身是可以并行的,而Workflow内部存在共享数据,那么如何解决可能存在的并发问题
我们知道Operator的run方法输入输出都是字典(见上面现有Workflow的介绍),为了使得Node层和Operator层尽可能地解耦,我们希望Node层也按照相同的方式调用Operator,因此我们需要为WorkflowState实现一个json序列化方法,在调用Operator前将当前Workflow中间状态转化为字典格式然后交给Operator,为了解决并发访问的问题,我们可以采用MVCC的并发控制方法,保证Operator操作的是多个不同的副本,得到Operator的返回结果之后,在有并发安全的锁保护的情况下将返回的结果同步到WorkflowState中。因此我们可以得到Node的抽象大致如下:
class BaseNode(GNode): // Workflow中间状态 context: Optional[WkFlowState] = None // Workflow输入状态 wk_input: Optional[WkFlowInput] = None def init(self): // 从Pipeline中获取对应状态 return init_context(self) def node_init(self): """ 重写这个方法定制化Node初始化逻辑 """ return CStatus() def run(self): """ Main logic for node execution, can be overridden by subclasses. Returns a CStatus object indicating whether execution succeeded. """ sts = self.node_init() if sts.isErr(): return sts if self.context is None: return CStatus(-1, "Context not initialized") self.context.lock() try: data_json = self.context.to_json() finally: self.context.unlock() res = self.operator_schedule(data_json) self.context.lock() try: if res is not None and isinstance(res, dict): self.context.assign_from_json(res) elif res is not None: log.warning("operator_schedule returned non-dict type: %s", type(res)) finally: self.context.unlock() return CStatus() def operator_schedule(self, data_json) -> Optional[Dict]: """ 根据用户请求或者Workflow状态决定调用哪些Operator """ raise NotImplementedError("Subclasses must implement operator_schedule")
至此,我们完成了调度对象Workflow的抽象设计。

调度器的设计

新的调度器我们需要支持Workflow的复用,避免不必要的资源开销,借助CGraph底层提供的Pool抽象,我们可以将我们构建好的相同的Workflow放到Workflow Pool中,当请求到来时优先检查Pool中是否有空闲Workflow,如果没有空闲Workflow则手动构造一个新的Workflow服务请求并在处理结束之后放回Workflow Pool。
🤔
这里我们只是做了一个Scheduler所需要的最基本的功能:
  1. 面对新请求,调度对应的Workflow进行处理
  1. 在Workflow层面尽可能复用相同类型的Workflow,避免不必要的资源开销
但是在未来,Scheduler值得优化的地方还有很多,比如:
  1. CGraph为每个Workflow Pool提供了资源分配接口,我们可以为每种Workflow指定底层执行的线程池资源。在Scheduler层面我们可以根据不同Workflow工作负载的不同分配不同的线程池资源
  1. 目前的Workflow Pool的实现没有限定Pool所能容纳的Workflow上界,而在生产环境中这样的配置显然是不合理的

Agentic GraphRAG的实现愿景

这部分之所以用愿景一词来描述,是因为经过对当前的common LLM的能力进行简单的估计之后,我们认为给定一系列Node(每个Node底层可能对应多种Operator)和一个用户的请求,然后让LLM基于这些Node的description和用户请求自动规划出一个Workflow似乎还太难。在开始这个项目之前我调研了chat2graph这个项目,该项目中存在一系列抽象:Expert、Operator、Action、Task等等。对于给定的用户请求,有大模型自动规划选择什么样的Expert执行什么样的Operator。大模型的职责覆盖到请求处理的各种细节。但是当我实际运行这个项目并抛出一个问题之后,可能是因为我所使用的模型能力太弱(gemini-2.5-flash),用户的请求无法被正常处理,无法返回令人满意的回复。所以完全由大模型规划的Agentic GraphRAG在目前看来似乎还面临一定的挑战(也可能是我太菜了😀)。
所以我们把目标缩减到目前能够达到的水准——GraphRAG的workflow存在许多可配置项——小到Retrieval的graph item,大到某个Node底层具体执行的Operator。这些都是可以通过配置项修改的。一口吃不成胖子,那就一点点吃成胖子吧。而达到此目的的bottleneck并不在架构设计本身,而是nlp相关的领域问题了。我们可以抽象成下面这个问题
👉🏼
给定一个用户请求,我们怎么通过用户的请求推断出最优配置呢?
举个具体的例子:用户的请求是:告诉我“提出水是万物的本源”的哲学家的徒弟提出的本体论观点。那么采用什么图算法进行检索会使得结果更精确呢?
回答这个问题需要语言和图数据库两个领域的知识。所以目前这个部分采取的是一种比较启发式的实现方式,还有进步的空间。

反思

最后简单总结一下项目过程中的一些技术思考吧
我们选择了CGraph这个graph-based的调度框架,但是基于graph api声明Workflow虽然保证Workflow和Node之间解耦合,但是却也引入了新的复杂度,我们构造Workflow的代码可读性随着各种新特性(如Condition,Region等CGraph-Specific的特性)的使用之后使得代码变得格外复杂。那么是否存在更加友好的接口方式能够满足我们前面提到的要求呢?这是一个值得思考的问题。使用Graph-based api进行Workflow的定义存在的常见问题就是当我们需要定义两种Workflow,其中一种Workflow是另一种Workflow的子集,使用CGraph实现的话或许就是将小的Workflow定义为GRegion然后使用Condition,将两种Workflow“合并”成一种。但是这样的实现无疑是让Workflow的定义变得复杂,或许能不能CGraph底层提供子图包含的能力,我们能够将某种Workflow声明为另一个Workflow的子图等等。
引入了CGraph着实减轻了我的工作量,CGraph自带的Perf功能让性能调试更加简便(虽然bottleneck大多数时候都是在LLM调用的过程),CGraph底层提供的Pooling抽象也让我将更多调度相关的逻辑无感offload到CPP侧。
不过我想最宝贵的还是这一路走来遇到的人和事,他们带来了真正的成长和领悟。

结语

在HugeGraph-AI改头换面时,我们遇到过不少问题,Talk is not cheap, Thinking is necessary。值得一提的是当问题出现时,我们想出的解决方案往往不止一种。不同的解决方案之间存在各种各样的trade off,世界不是二元对立的,这一点也在程序世界里面有所体现。项目告一段落,感谢并肩作战的小伙伴们!感谢Cheems和Chunel!
 
If you have any questions, please contact me.