欢迎阅读 HugeGraph 博客!
这里汇集了 HugeGraph 生态的技术文章、最佳实践和社区动态:
- HugeGraph (OLTP): 图数据库核心功能、性能优化、存储引擎等
- HugeGraph-AI: AI 与图计算结合的应用案例
- HugeGraph Computing (OLAP): 大规模图计算和分析实践
探索更多内容,提升您的图数据库使用体验!
This is the multi-page printable view of this section. Click here to print.
欢迎阅读 HugeGraph 博客!
这里汇集了 HugeGraph 生态的技术文章、最佳实践和社区动态:
探索更多内容,提升您的图数据库使用体验!
ToplingDB是对 RocksDB 的可配置、可观测扩展,支持通过 YAML 文件进行动态调优,并通过内置 Web Server 实现实时监控
修改 hugegraph.properties
backend=rocksdb
serializer=binary
rocksdb.data_path=.
rocksdb.wal_path=.
# 配置文件路径
# 出于安全性考虑, HG中仅允许 yaml 文件位于conf/graphs目录下
rocksdb.option_path=./conf/graphs/rocksdb_plus.yaml
# 是否开启Web Server
rocksdb.open_http=true
初始化数据库(第一次启动时或在 conf/graphs/ 下手动添加了新配置时需要进行初始化)
cd *hugegraph-${version}
bin/init-store.sh
启动 server
bin/start-hugegraph.sh
Starting HugeGraphServer...
Connecting to HugeGraphServer (http://127.0.0.1:8080/graphs)....OK
提示的 url 与 rest-server.properties 中配置的 restserver.url 一致
Web Server 的监听端口在 YAML 文件中通过 http.listening_ports 字段进行配置
检查 hugegraph.properties 中包含:
# 配置文件路径
rocksdb.option_path=./conf/graphs/rocksdb_plus.yaml
# 是否开启Web Server
rocksdb.open_http=true
可视化访问Web监控页面,页面示例如下图所示:

通过终端验证:
# 访问 http://localhost:2011 (端口号取决于 YAML 配置中的 listening_ports)
curl http://localhost:2011 | grep topling
得到以下输出说明页面正常:
<p><a href="https://topling.cn">Topling Inc.</a>This is <strong>Engine Inspector</strong>, for metrics, see <a href='javascript:grafana()'>Grafana</a>!</p>
tail -f logs/hugegraph-server.log | grep -i topling
类似输出说明存储引擎启动为ToplingDB:
2025-10-14 08:56:25 [db-open-1] [INFO] o.a.h.b.s.r.RocksDBStdSessions - SidePluginRepo found. Will attempt to open multi CFs RocksDB using Topling plugin.
2025-10-14 08:56:25 [db-open-1] [INFO] o.a.h.b.s.r.RocksDBStdSessions - Topling HTTP Server has been started according to the listening_ports specified in ./conf/graphs/rocksdb_plus.yaml
启动时有类似日志:
2025-10-15 01:55:50 [db-open-1] [INFO] o.a.h.b.s.r.RocksDBStdSessions - SidePluginRepo found. Will attempt to open multi CFs RocksDB using Topling plugin.
21:1: (891B):ERROR:
sideplugin/rockside/3rdparty/rapidyaml/src/c4/yml/parse.cpp:3310: ERROR parsing yml: parse error: incorrect indentation?
解决方案:
python -c "import yaml; yaml.safe_load(open('conf/graphs/rocksdb_plus.yaml'))"启动时有类似日志:
2025-10-15 01:57:34 [db-open-1] [INFO] o.a.h.b.s.r.RocksDBStdSessions - SidePluginRepo found. Will attempt to open multi CFs RocksDB using Topling plugin.
2025-10-15 01:57:34 [db-open-1] [ERROR] o.a.h.b.s.r.RocksDBStore - Failed to open RocksDB 'rocksdb-data/data/g'
org.rocksdb.RocksDBException: rocksdb::Status rocksdb::SidePluginRepo::StartHttpServer(): null context when constructing CivetServer. Possible problem binding to port.
at org.rocksdb.SidePluginRepo.startHttpServer(Native Method) ~[rocksdbjni-8.10.2-20250804.074027-4.jar:?]
解决方案:
lsof -i :2011listening_ports 配置类似输出说明无法获取数据库锁,可能缺乏写权限,也可能是被数据库被另外一个进程锁定:
Caused by: org.rocksdb.RocksDBException: While lock file: rocksdb-data/data/m/LOCK: Resource temporarily unavailable
at org.rocksdb.SidePluginRepo.nativeOpenDBMultiCF(Native Method)
at org.rocksdb.SidePluginRepo.openDB(SidePluginRepo.java:22)
解决方案:
rocksdb.option_path=./conf/graphs/rocksdb_plus.yamlbin/init-store.sh 2>&1 | tee init.logRocksDB 提供了丰富的参数配置,但大多数情况下,这些配置需要通过硬编码完成。
ToplingDB 在此基础上引入了 SidePlugin + YAML 的方式,使得配置更加模块化和可组合。
本文重点介绍 ToplingDB 扩展参数 的部分,帮助读者理解这些配置的意义。
下文只包括HugeGraph中所使用的配置参数,ToplingDB支持的完整配置请参考:SidePlugin Wiki
http: # Web Server 相关配置
# normally parent path of db path
document_root: /dev/shm/rocksdb_resource # 静态资源目录, HugeGraph中通过`preload_topling.sh`进行静态资源提取
listening_ports: '127.0.0.1:2011' # Web Server监听端口,用于管理/监控 如端口被占用,请改为其他端口,例如 2012 或 2013
setenv: # 环境变量设置
StrSimpleEnvNameNotOverwrite: StringValue
IntSimpleEnvNameNotOverwrite: 16384
OverwriteThisEnv:
#comment: overwrite is default to false
overwrite: true
value: force overwrite this env by overwrite true
Cache: # Cache 相关配置
lru_cache: # 定义一个 LRU 缓存实例
class: LRUCache
params:
capacity: 8G # 缓存容量 8GB
num_shard_bits: -1 # 分片数量,-1 表示自动
strict_capacity_limit: false
high_pri_pool_ratio: 0.5
use_adaptive_mutex: false
metadata_charge_policy: kFullChargeCacheMetadata # 元数据也计入缓存容量
Statistics: # 数据采样配置
stat:
class: default
params:
discard_tickers: # 丢弃的统计计数器,减少开销
- rocksdb.block.cache
- rocksdb.block.cachecompressed
- rocksdb.block
- rocksdb.memtable.payload.bytes.at.flush
- rocksdb.memtable.garbage.bytes.at.flush
- rocksdb.txn
- rocksdb.blobdb
- rocksdb.row.cache
- rocksdb.number.block
- rocksdb.bloom.filter
- rocksdb.persistent
- rocksdb.sim.block.cache
discard_histograms: # 丢弃的直方图统计项
# comment: ....
- rocksdb.blobdb
- rocksdb.bytes.compressed
- rocksdb.bytes.decompressed
- rocksdb.num.index.and.filter.blocks.read.per.level
- rocksdb.num.data.blocks.read.per.level
- rocksdb.compression.times.nanos
- rocksdb.decompression.times.nanos
- rocksdb.read.block.get.micros
- rocksdb.write.raw.block.micros
# comment end of array
#stats_level: kAll
stats_level: kDisableAll # 禁用所有统计
MemTableRepFactory: # 内存中 memtable 的实现
cspp: # ToplingDB 独有的高并发内存结构
class: cspp
params:
mem_cap: 16G # 预分配足够的单块内存地址空间,这些内存可以只是保留地址空间,但并未实际分配 对物理内存并无要求,只是为CSPP保留虚拟内存空间
use_vm: false
token_use_idle: true
chunk_size: 16K # 内部分配粒度
convert_to_sst: kFileMmap # 直接将 MemTable 转化为 SST,省去 Flush,可选值:{kDontConvert, kDumpMem, kFileMmap}
sync_sst_file: false # convert_to_sst 为 kFileMmap 时,SST 转化完成后是否执行 fsync
skiplist: # RocksDB 默认的跳表结构
class: SkipList
params:
lookahead: 0
TableFactory:
cspp_memtab_sst:
class: CSPPMemTabTable # 与 cspp 配套的 TableFactory
params: # empty params
bb:
class: BlockBasedTable # RocksDB 默认的块表
params:
checksum: kCRC32c
block_size: 4K
block_restart_interval: 16
index_block_restart_interval: 1
metadata_block_size: 4K
enable_index_compression: true
block_cache: "${lru_cache}" # 使用上面定义的 LRU 缓存
block_cache_compressed:
persistent_cache:
filter_policy:
dispatch:
class: DispatcherTable
params:
default: bb # 默认使用 BlockBasedTable
readers:
BlockBasedTable: bb
CSPPMemTabTable: cspp_memtab_sst
level_writers: [ bb, bb, bb, bb, bb, bb ] # 支持自定义各层写入策略
CFOptions:
default:
max_write_buffer_number: 6
memtable_factory: "${cspp}" # 引用上方定义的cspp,使用 cspp 作为 MemTable
write_buffer_size: 128M
# set target_file_size_base as small as 512K is to make many SST files,
# thus key prefix cache can present efficiency
target_file_size_base: 64M
target_file_size_multiplier: 1
table_factory: dispatch # 引用上方定义的 dispatch, 使用 DispatcherTable Class
max_bytes_for_level_base: 512M
max_bytes_for_level_multiplier: 10
level_compaction_dynamic_level_bytes: false
level0_slowdown_writes_trigger: 20
level0_stop_writes_trigger: 36
level0_file_num_compaction_trigger: 2
merge_operator: uint64add # support merge
level_compaction_dynamic_file_size: true
optimize_filters_for_hits: true
allow_merge_memtables: true
min_write_buffer_number_to_merge: 2
compression_per_level:
- kNoCompression
- kNoCompression
- kSnappyCompression
- kSnappyCompression
- kSnappyCompression
- kSnappyCompression
- kSnappyCompression
DBOptions:
dbo:
create_if_missing: true
create_missing_column_families: false # this is important, must be false to hugegraph
max_background_compactions: -1
max_subcompactions: 4
max_level1_subcompactions: 0
inplace_update_support: false
WAL_size_limit_MB: 0
statistics: "${stat}" # 使用上面定义的统计配置
max_manifest_file_size: 100M
max_background_jobs: 8 # 设置flush和compaction线程总数 建议设置为 (cpu核数 / 2)
compaction_readahead_size: 0
memtable_as_log_index: true # 此配置结合 convert_to_sst: kFileMmap 可实现[omit L0 Flush](https://github.com/topling/toplingdb/wiki/Omit-L0-Flush)
关键要点:
listening_ports: '127.0.0.1:2011' 指定Web Server监听端口为2011,并且仅允许本地访问memtable_as_log_index: true 与 convert_to_sst: kFileMmap 结合实现omit L0 Flushmemtable_factory: "${cspp}" 指定了内存结构采用CSPP Memtabletable_factory: dispatch 指定了TableFactory使用YAML中自定义的DispatcherTable结构${lru_cache}、${cspp} 等方式在不同段落复用对象。ToplingDB YAML 引用与复用图示:

这种机制使得配置更灵活,便于在复杂场景下组合不同组件。
ToplingDB 提供了一个 RocksDB 原生没有的 MemTable 类型,通过以下参数配置:
mem_cap 是指在内存地址空间中,为 CSPP 预留的空间大小,这些内存可以只是保留地址空间,并未实际分配的。
mem_cap 真正占用的内存大小约为 write_buffer_size 。
CSPP 的底层算法为了支持高并发写入,采用了预分配内存的策略。
当预分配的内存被写满时,新的写入操作将无法继续。
然而,RocksDB 本身缺乏一种机制,使得 memtable 能够主动反馈 ‘预分配内存已满,需要切换到新的 memtable’ 。
由于其函数调用链路复杂,难以通过重构来实现这一机制,因此 CSPP 只能通过参数设计来适配 RocksDB 的行为。
ToplingDB 将 mem_cap 设置为远大于 write_buffer_size,从而避免 RocksDB 在向 Memtable 写入时过早触发“内存已满”的错误。
在 CSPP 初始化(New)时,会再次检查,如果发现 mem_cap 设置过小,则会自动调整为 2 * write_buffer_size,以确保写入过程的稳定性。
mem_cap 默认值为 2G,有效最大值为 16G。
在使用 malloc/posix_memalign 分配内存时,地址空间可能是已经实际分配的(位于堆空间中,已有对应的物理页面),而 CSPP 在分配时只需要获得保留的地址空间。
use_vm 选项为 true 时会强制使用 mmap 分配内存,从而保证分配的一定时是保留地址空间,但并不实际占用物理页面。
use_vm 默认值为 true。如果用户物理内存空间充足,建议关闭此选项,mmap 分配的虚拟内存空间在建立对物理地址的映射时会触发大量minor page fault,可能会影响性能。
convert_to_sst 支持以下三种枚举值:
kDontConvert: 禁用该功能,为默认值。使用传统的 Flush 流程,兼容性最好,适合对稳定性要求高的场景kDumpMem: 转化时将 MemTable 的整块内存写入 SST 文件,避免 CPU 消耗,但未降低内存消耗kFileMmap: 将 MemTable 内容 mmap 到文件,这是关键功能,同时降低 CPU 和内存消耗,可同时将 DBOptions.memtable_as_log_index 设为 true 从本质上消除 MemTable Flush这些参数为数据写入路径提供了更多可调节空间,用户可按需选择。
更多设计细节请参考 ToplingDB 作者的撰写的相关博客:cspp-memtable, ToplingDB CSPP MemTable Design Essentials, CSPP Trie Design Analysis
cspp MemTable 配套的 TableFactory。这类灵活性在 RocksDB 原生配置中并不存在。
相比 RocksDB 的粗粒度控制,ToplingDB 提供了更细的调节方式。
Web Server 安全配置:
listening_ports: '127.0.0.1:2011' 仅限本地访问hugegraph.properties 中设置 rocksdb.open_http=false共享内存安全:
document_root: /dev/shm/rocksdb_resource 使用共享内存目录ToplingDB 在 RocksDB 的基础上增加了以下能力:
这些扩展为用户提供了更多的可调节空间,尤其适合需要高写入性能和灵活运维的场景。
为了应对模型训练数据和现实生活中实际数据之间存在的时效性差异问题,RAG技术应运而生。RAG,顾名思义就是通过向外部数据源获取对应的数据(Retrieval),用于增强(Argument)大模型生成(Generation)回答质量的技术。
最早的RAG采用简单的Retrieval - Generation架构,我们拿到用户给出的问题,进行一定的预处理(关键词提取等等),得到预处理之后的问题,接着通过Embedding Model从海量资料中抓取相关的资料作为Prompt交给大模型用于增强模型回答的质量。
但是基于语义相似性匹配进行相关语料的抓取未必能够处理所有情况,因为能够用于增强回答质量的语料不一定和问题本身存在语义相似性。一个常见的例子就是:**告诉我“提出水是万物的本源”的哲学家的徒弟提出的本体论观点。**而我们的语料中并不直接存在这个问题的答案,语料库中可能提到:
如果单纯从语义相似度匹配出发,我们大概率只能retrieval到第一个句子用于增强大模型的回答。但是缺失语料2和语料3的情况下,如果我们所使用的大模型训练语料中没有哲学相关知识,在缺失这些关键信息的情况下,大模型将无法正确回答这些问题,甚至会出现“幻觉”。
因此GraphRAG技术诞生了,常见的GraphRAG包含两个步骤:

但是GraphRAG本身也存在几个问题:
本次项目主要针对第三个问题展开。我们希望借助大模型的泛化能力使其自动识别用户问题中的意图,然后选择合适配置(比如选择最合适的图算法)从图数据库中读取对应的数据用于增强大模型回答质量——也就是本次项目Agentic GraphRAG的目的所在。
现在的HugeGraph-AI项目中存在两个核心抽象:
由于Operator的实现需要遵循下面的接口:
class Operator:
@abstractmethod
def run(context: dict[str, Any]) -> dict[str,Any]:
return {}
Operator在实际运行时接受字典类型的context对象作为输入,返回的对象也是一个字典,可以用来作为下一个Operator的输入,这样的设计有一个很高明的地方——他将不同的Operator之间的依赖关系和Operator本身的具体实现解耦了,每个Operator是一个相对独立的存在,如果Operator A需要依赖Operator B的输出,那么只需要检查context对象中是否存有Operator B的输出即可。这是一种低耦合的设计。好处是我们能很方便地将不同的Operator自由组合。根据不同的用户输入组装(配置)合适Workflow Serving用户请求,那不正是我们在项目背景中提到的Agentic GraphRAG的目的所在吗?
👉🏼 理论上现有设计已经可以正常过渡到Agentic GraphRAG,但是现有设计存在诸多悬而未决的问题:
1. 现有调度器仅仅支持链状Workflow,缺失了可能存在的并行空间
2. 现有调度器无法复用被反复使用到的Workflow
之前的调度器给我们的启发是Operator粒度的解耦是一个不错的设计理念,但是调度器本身能力有限,限制了Workflow的能力。因此我们计划替换项目中的调度器!经过对几种不同的Workflow编排框架进行简单的调研之后,我们认为下面几个特性是我们筛选调度器的标准。(下面我们统一将框架编排对象称为Workflow,Workflow由一系列Task组成)
我们首先将目光放到了现在炙手可热的AI Workflow调度框架。围绕前面提到的几个维度,我们分别调研了下面几种不同的Workflow编排框架——LlamaIndex,Agno,Pydantic-Ai,LangGraph。
对于LlamaIndex,我们用一个常见的例子说明LlamaIndex这个框架的设计理念。
from workflows import Workflow, Context, step
from workflows.events import StartEvent, StopEvent, Event
class StepEvent(Event):
message: str
class MyWorkflow(Workflow):
@step
async def step_one(self, ctx: Context, ev: StartEvent) -> StepEvent:
current_count = await ctx.store.get("count", default=0)
current_count += 1
await ctx.store.set("count", current_count)
print("step one called once")
return StepEvent("launch step two")
@step
async def step_two(self, ctx: Context, ev: StepEvent) -> StopEvent:
print("step two called once")
return StopEvent()
从上面这个简单的例子我们可以看到很多问题。首先明确一个观念:Workflow由两个元素构成:Task,Task之间的依赖关系。只要这两个元素确定之后一个Workflow就确定下来了。我们可以看到LlamaIndex中每个Task(对应代码中用@step注解的函数)的实现和Workflow存在依赖关系。因为每个Task的实现都需要传入Event对象作为参数,但是Event参数其实就是对Task之间依赖关系的一种限定。所以LlamaIndex不具备低耦合的特点。同时我们也发现Task作为Workflow类成员函数本身就违背了我们前面提到的Task需要能够在多种不同Workflow中使用的诉求。但是经过调研,LlamaIndex的数据共享和并行特性支持还算不错。只不过从基于事件驱动模型构建的编程接口在保证了接口易用性的同时也牺牲了编程的灵活性。
同样还是从例子入手
from agno.workflow import Router, Step, Workflow
def route_by_topic(step_input) -> List[Step]:
topic = step_input.input.lower()
if "tech" in topic:
return [Step(name="Tech Research", agent=tech_expert)]
elif "business" in topic:
return [Step(name="Business Research", agent=biz_expert)]
else:
return [Step(name="General Research", agent=generalist)]
workflow = Workflow(
name="Expert Routing",
steps=[
Router(
name="Topic Router",
selector=route_by_topic,
choices=[tech_step, business_step, general_step]
),
Step(name="Synthesis", agent=synthesizer),
]
)
workflow.print_response("Latest developments in artificial intelligence and machine learning", markdown=True)
从这个例子我们可以看到Workflow本身和Task之间的绑定关系是通过指定steps参数确定的。理论上来说定义好一种Task之后我们可以将其用于不同的Workflow中,Agno的设计符合我们的低耦合标准。
但是数据共享和任务并行方面的支持就存在一定的限制。
首先是任务并行,例子如下:
workflow = Workflow(
name="Parallel Research Pipeline",
steps=[
Parallel(
Step(name="HackerNews Research", agent=hn_researcher),
Step(name="Web Research", agent=web_researcher),
Step(name="Academic Research", agent=academic_researcher),
name="Research Step"
),
Step(name="Synthesis", agent=synthesizer), # Combines the results and produces a report
]
)
Agno专门设计了并行接口,我们需要在静态编译时(Python哪有编译时?应该叫写代码的时候哈哈😀)明确哪些任务可以并行。但是Agentic GraphRAG最终构造的Workflow有可能是在运行时由模型规划出来的,是动态运行时明确的,出于这样的考量,我们认为Agno的并行特性并不符合我们的要求
接下来是数据共享,Agno框架中支持三种不同的Task:
我们检查了调研时最新版本的Agno源代码,发现Agno支持的状态共享仅限于Agent和Team。那么对于那些适合用Pure Function实现的Task,我们就需要额外支持数据共享的机制。因此Agno的数据共享机制也不符合我们的要求。
我们从官方文档中就看到

Pydantic-Ai框架竟然不支持Task粒度的自动并行。
和LlamaIndex框架类似采用事件驱动的编程模型,因此Workflow和Task之间不算是完全解耦,但是值得注意的时Pydantic-Ai的Task是可以用到多个不同的Workflow的。
最后的最后,终于还是遇到了LangGraph,之前一直没有调研LangGraph的原因是因为由团队伙伴认为LangGraph本身太重了。在上一个版本中,即使只是使用LangGraph的部分功能(调度),也需要引入LangGraph的完整依赖,引入LangGraph可能会让项目变“重”。时不时在其他开源项目中看到“xxx比LangGraph快xxx倍”诸如此类的字眼也确实影响到我们的决策判断。所以直到此时此刻才把它提上调研日程。
我们还是来看看LangGraph的例子
class State(TypedDict):
topic: str
joke: str
improved_joke: str
# Nodes
def generate_joke(state: State):
"""First LLM call to generate initial joke"""
msg = llm.invoke(f"Write a short joke about {state['topic']}")
return {"joke": msg.content}
def check_punchline(state: State):
"""Gate function to check if the joke has a punchline"""
# Simple check - does the joke contain "?" or "!"
if "?" in state["joke"] or "!" in state["joke"]:
return "Pass"
return "Fail"
def improve_joke(state: State):
"""Second LLM call to improve the joke"""
msg = llm.invoke(f"Make this joke funnier by adding wordplay: {state['joke']}")
return {"improved_joke": msg.content}
# Build workflow
workflow = StateGraph(State)
# Add nodes
workflow.add_node("generate_joke", generate_joke)
workflow.add_node("improve_joke", improve_joke)
# Add edges to connect nodes
workflow.add_edge(START, "generate_joke")
workflow.add_conditional_edges(
"generate_joke", check_punchline, {"Fail": "improve_joke", "Pass": END}
)
workflow.add_edge("improve_joke", END)
# Compile
chain = workflow.compile()
# Invoke
state = chain.invoke({"topic": "cats"}
这是一个我简化后的官方文档中的例子,我们可以看到基于GraphAPI的LangGraph通过调用workflow.add_edge指定Workflow的依赖关系,将Workflow和Task解耦。同时支持全局State作为Workflow的状态进行Task之间的数据共享。根据官方文档的说法,LangGraph是支持Task自动并行执行的。我们总算是找到了符合所有要求的Workflow编排框架了!
| 并行性 | 低耦合 | 数据共享 | Python Interface | |
|---|---|---|---|---|
| LlamaIndex | 支持 | 不支持 | 支持 | 支持 |
| Agno | 支持但不符合要求 | 支持 | 支持但不符合要求 | 支持 |
| Pydantic-Ai | 不支持 | 不支持 | 支持 | 支持 |
| LangGraph | 支持 | 支持 | 支持 | 支持 |
正当我们将目光聚焦于 LangGraph 时,团队伙伴提到了一个新的方案——CGraph,这是由开源创作者Chunel使用C++开发的图调度框架,对标SOTA任务调度框架——taskflow。CGraph,学名Color Graph,虽然是C++项目,但是它很贴心地提供了Python接口。深入了解后,我们发现 CGraph 的设计理念与我们不谋而合:和LangGraph一样,CGraph基于图的声明式 API,完美支持我们所需的并行化、低耦合和数据共享需求。
如果说“C++ 站在编程语言鄙视链顶端”是个有趣的玩笑,那它背后反映的其实是程序员对底层性能的极致追求。除去这个“先天”优势, CGraph 相比 LangGraph 最大的不同,在于它的纯粹——它不构建庞大的生态,只专注于将“任务调度”这一件事做到极致。
然而,真正让我们下定决心的,是这个项目的“心跳”。我们联系上了作者 Chunel,感受到了 CGraph 作为一个项目旺盛的生命力。在开源世界里,活力即未来。一个持续进化、积极响应的社区,远比一个功能冻结的庞然大物更值得信赖。
我们相信,优秀的技术选型,不仅是功能的匹配,更是对项目未来潜力的认同。(欢迎一同见证它的成长:https://github.com/ChunelFeng/CGraph)

起初,我们的目标很纯粹:基于CGraph,打造一个属于我们自己的调度器。然而深入思考之后,我们发现:一个好的调度器,源于对调度对象的深刻理解(是时候拷问自己了🤣)。就像CPU调度器和GPU调度器由于其调度对象以及生态定位的不同也会采取不同调度策略。
所以我们开始考察那个被我们称之为Workflow的抽象,在上一个设计中,它是由一系列Operator组成的链表。这样的设计否定了并行的可能,那么如果我们说Workflow是一系列Operator组成的DAG图是否合理呢?
直观来说这样的定义很合理,但是实际实践下来,我们发现Workflow中的每个节点(后面我们称之为Node)和Operator一一对应却不是一个好的设计,因为我们需要在不同的请求之间复用Workflow(这样可以节省Workflow构造过程中不可避免的资源创建以及一些DAG图校验带来的性能开销)。
举个例子,向量相似度查询是一个很常见的RAG流程,但是根据不同底层向量数据库暴露的接口不同,我们可能需要提供FaissVectorSearch、VectraVectorSearch等多种目的相同但是具体实现不同的Operator。如果我们将Operator和Workflow中的Node等同,那么我们对于Workflow的复用机会将大大减少,因为使用Faiss进行搜索和使用Vectra进行搜索的Workflow将会是不同的Workflow,但是如果我们将功能类似的向量索引Operator都封装到VectorSearchNode中,那么我们是不是能够有更多的Workflow复用机会呢?在VectorSearchNode的具体实现中我们只需要根据需要调用对应的Operator即可。通过在Workflow和Operator中间加一层的方式,有下面三个好处:

既然我们希望跨请求复用同类Workflow,那么我们就需要保证Workflow本身是无状态的,因为如果复用的Workflow还带着上一个请求的状态,用户就可能得到发生意料之外的结果。而Workflow的状态可以分为两种:
我们需要保证Workflow执行的过程中这两部分状态是干净的。但是这两种不同的状态使用时机又不同,这也就决定了他们截然不同的生命周期:
我们利用 CGraph 框架提供的 GParam(全局参数)抽象来实现精细化的状态隔离:
GParam 上下文。这样我们可以保证每次Workflow执行时这两种状态中都只包含本次请求的状态。由于WorkflowInput状态在Workflow执行结束就被重置了,我们只能从WorkflowState中有选择性地选择部分数据返回给用户。因此我们得到了一个Flow抽象应该实现的接口。
class BaseFlow(ABC):
"""
Base class for flows, defines three interface methods: prepare, build_flow, and post_deal.
"""
@abstractmethod
def prepare(self, prepared_input: WkFlowInput, **kwargs):
"""
根据用户请求初始化Workflow输入状态(WkFlowInput)
"""
@abstractmethod
def build_flow(self, **kwargs) -> GPipeline:
"""
用来构建可以运行在CGraph之上的Workflow对象
"""
@abstractmethod
def post_deal(self, **kwargs):
"""
从中间状态(WkFlowState)中组装返回给用户的Response
"""
那么回到Node抽象本身,Node本身是对某个功能的抽象,其底层可能对应着多种不同的抽象。我们大致需要考虑下面几个问题:
我们知道Operator的run方法输入输出都是字典(见上面现有Workflow架构的介绍),为了使得Node层和Operator层尽可能地解耦,我们希望Node层也按照相同的方式调用Operator,因此我们需要为WorkflowState实现一个json序列化方法,在调用Operator前将当前Workflow中间状态转化为字典格式然后交给Operator,然后将Operator执行结果重新反序列化为WorkflowState。为了解决并发访问带来的数据竞争问题,我们可以采用MVCC的并发控制方法,保证Operator操作的是多个不同的副本,得到Operator的返回结果之后,在有并发安全的锁保护的情况下将返回的结果同步到WorkflowState中。因此我们可以得到Node的抽象大致如下:
class BaseNode(GNode):
// Workflow中间状态
context: Optional[WkFlowState] = None
// Workflow输入状态
wk_input: Optional[WkFlowInput] = None
def init(self):
// 从Pipeline中获取对应状态
return init_context(self)
def node_init(self):
"""
重写这个方法定制化Node初始化逻辑
"""
return CStatus()
def run(self):
"""
Main logic for node execution, can be overridden by subclasses.
Returns a CStatus object indicating whether execution succeeded.
"""
sts = self.node_init()
if sts.isErr():
return sts
if self.context is None:
return CStatus(-1, "Context not initialized")
self.context.lock()
try:
data_json = self.context.to_json()
finally:
self.context.unlock()
res = self.operator_schedule(data_json)
self.context.lock()
try:
if res is not None and isinstance(res, dict):
self.context.assign_from_json(res)
elif res is not None:
log.warning("operator_schedule returned non-dict type: %s", type(res))
finally:
self.context.unlock()
return CStatus()
def operator_schedule(self, data_json) -> Optional[Dict]:
"""
根据用户请求或者Workflow状态决定调用哪些Operator
"""
raise NotImplementedError("Subclasses must implement operator_schedule")
至此,我们完成了调度对象Workflow的抽象设计。
在新的调度器体系中,Workflow 对象的实例化与销毁是一项不可忽视的资源开销。为了最大限度地降低延迟、减少内存抖动,我们引入了 Workflow Pool 机制,旨在实现 Workflow 实例的高效复用。 我们充分利用了 CGraph 框架底层提供的 Pool 抽象,并将其应用于 Workflow 的生命周期管理。其核心机制如下:
🤔 当前版本的 Scheduler 实现了其最核心的职责,为整个系统提供了一个稳定且高效的调度基座。其主要特性包括:
1. 能够根据请求类型,准确地调度对应的 Workflow 实例进行处理。
2. 内置了基于 Workflow 类型的池化(Pooling)机制,通过复用已有实例,显著降低了高并发场景下的对象创建与销毁开销。
但是在未来,Scheduler值得优化的地方还有很多,比如:
1. 工作负载感知的资源隔离:并非所有 Workflow 的资源消耗都是均等的。我们将利用 CGraph 提供的线程池绑定功能,为不同类型的 Workflow Pool 分配专属的计算资源。
2. 生产级容量管理与稳定性:为每个 Workflow Pool 引入可配置的容量上限。在生产环境中,无限制的资源池是潜在的风险点。通过设置池大小的上限,我们可以防止系统在极端负载下耗尽内存,确保服务的稳定性。
现在我们可以得到整个项目的整体架构图

项目之初,我们畅想一个完全“Agentic”的 GraphRAG。你只需告诉它你的问题,一个强大的 LLM 就能像一位资深架构师,从一堆工具(Nodes/Operators)中为你量身定制一套最高效的执行流程(Workflow)。 但理想与现实之间总有距离。我们深入调研了 chat2graph 等类似项目,并实际动手检验效果。结果发现,让 LLM 凭空“创造”一个完美的、无懈可击的执行计划,比我们想象的要难得多。即使是使用 Gemini 2.5 Flash 这样的模型,它在理解复杂指令并将其精确映射到一系列操作时,也时常力不从心。一方面,因为chat2graph的调用链可能会很长,即便大模型推理单步准确率高达95%,但随着调用链路的延长,误差会逐级累积,导致整体任务的成功率急剧下降。另一方面,即使调用链的问题不存在,通过prompt的方式让模型自动规划的准确度目前仍然不能使人满意。完全依赖大模型进行零样本工作流规划的时代,或许还未完全到来。 “一口吃不成胖子”,于是我们做出了一个关键决策:让大模型从零开始构建工作流存在阻碍,但是如果让大模型基于Workflow模板“实例化”具体的Workflow或许是一种可行的方案。我们构建了一个功能强大、高度模块化的 GraphRAG 工作流,而 LLM 的任务,就是根据观众用户请求的特点,为Workflow选择最优配置。这是我们在Agentic GraphRAG和GraphRAG之间找到的一个平衡点。 这让我们的核心问题发生了转变,不再是“如何从零构建”,而是:
我们如何从用户的寥寥数语中,让大模型知道什么是GraphRAG的最优配置?
举个具体的例子:
👉🏼 给定一个用户请求,我们怎么通过用户的请求推断出最优配置呢?
具体例子:用户的请求是:告诉我“提出水是万物的本源”的哲学家的徒弟提出的本体论观点。那么采用什么图算法进行知识图谱检索会使得结果更精确呢?
综上所述,LLM需要站在自然语言处理和图数据库这两座大山上才能真正窥见问题全貌,而他现在还处于山脚🤣。而这,就是我们下一座亟待翻越的高山。
在本项目中,我们采用了 CGraph 作为底层的图调度框架。其基于 Graph API 的设计,成功地将 Workflow 的定义与各个 Node 的具体实现解耦,赋予了系统良好的扩展性。
然而,我们也注意到,这种声明式 API 在带来灵活性的同时,也引入了新的编程复杂度。随着我们越来越多地使用 Condition、Region 等 CGraph 的高级特性,用于构建 Workflow 的代码可读性和可维护性开始面临挑战。这引发了我们的一个核心思考:是否存在一种更为直观和用户友好的接口,既能保持框架的灵活性,又能降低定义复杂流程时的认知负担?
举一个具体的例子:当我们需要定义一个主 Workflow 和一个作为其子集的子 Workflow 时,目前的实现方式倾向于将子 Workflow 封装为 GRegion,并利用 Condition 节点来控制其执行。这种“合并”式的定义方式虽然功能上可行,但无疑增加了 Workflow 的内在复杂度和理解难度。我们设想,如果 CGraph 框架能原生提供子图组合 (Subgraph Composition) 的能力,允许开发者将一个 Workflow 显式声明为另一个 Workflow 的一部分,或许能从根本上简化这类场景的实现。
尽管存在上述挑战,CGraph 框架在项目中依然发挥了重要作用,显著提升了我们的工程效率:
最后,我想说,技术上的探索固然重要,但这段旅程中最宝贵的财富,是与优秀的同行者共事的经历以及从中获得的成长与感悟。这才是推动我们不断前行的真正动力。
在 HugeGraph-AI 的重构与革新过程中,AI工具的引入大大提升了开发效率,但最终的决策和取舍,仍然需要严谨的思考,Talk is not cheap, thinking is necessary。每当遇到新的技术难题,我们总能提出多种解决方案,而真正的考验在于如何在它们之间进行权衡 (trade-off)。技术世界并非非黑即白,不同的架构选择、方案取舍,往往对应着不同的目标用户与应用场景。我们对 CGraph 的选择,正是这一理念的最佳实践。它并非业界首个图调度框架,但在性能、灵活性与开发成本等诸多维度的考量中,它为我们提供了最契合当前需求的平衡点——在这条充满权衡的道路上,它离我们最近。
这也让我们更加坚信:一个产品或技术最终被选择,往往不只因其理念的先进,更在于它能否在复杂的现实约束下,为特定问题提供最务实的解法。
项目暂告一段落,由衷感谢每一位并肩作战的伙伴!这段经历因你们而精彩。