2026年5月7日 周四晚上19:30,来了解“企业AI训练师:从个人提效到构建企业AI生产力”(限30人)
免费POC, 零成本试错
AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


我要投稿

多Agent场景,子agent 之间数据读写不同步,如何解决?

发布日期:2026-05-06 18:54:55 浏览次数: 1519
作者:Zilliz

微信搜一搜,关注“Zilliz”

推荐语

多Agent系统中数据读写不同步?揭秘Milvus四档一致性配置如何解决这一痛点。

核心内容:
1. 单Agent与多Agent系统读写设计的本质差异
2. Milvus写入流程中数据可见性的时间窗口问题
3. 通过guarantee_timestamp参数实现读写同步的解决方案

杨芳贤
53AI创始人/腾讯云(TVP)最具价值专家
图片
Agent 系统里,经常会出现一个单 Agent 里从来不会出现的问题:一个子 Agent 刚写完数据,另一个子 Agent 立刻去读,结果是空的。
根本问题出在 Agent 的写-读模式撞上了很多数据库为单 Agent 场景设计的默认一致性配置。
接下来,这篇文章将说清楚这个矛盾从哪来,以及怎么用一行参数解决它。

01 

单 Agent 与多 Agent 的读写设计有何异同?

单 Agent RAG 的工作方式是这样的:用户提出一个问题,Agent 把问题向量化,去 Milvus 检索 Top-K 文档片段,拼成 prompt 喂给模型,模型输出答案。整条链路里,向量数据库是默认只读的——数据在应用启动时、文档更新时已经写好了,推理过程中没有人再继续往里写东西。
但多 Agent 系统里有两类角色:Writer Agent负责执行任务、调用外部工具、发现新信息,把结果 embedding 后写入 Milvus 作为共享记忆;Reader Agent收到协调信号后,从 Milvus 检索最新记忆,基于这些上下文生成下一步行动。
两者是独立的进程或线程,通过消息、回调或事件协调。Writer 写完,立刻通知 Reader,这个间隔是毫秒级的。
在这种情况下,Writer 写完、信号发出、Reader 立刻查,这种模式会导致Reader的查询动作,恰好落在“数据已写入但未对Query Node可见”的时间窗口内,最终返回空结果。
那么,这个时间窗口是怎么产生的,又要如何解决?

02 

Milvus如何用四档一致性控制数据对外可见的时机

出现“写后读空”的关键,在于我们对Milvus的insert()操作存在一个认知误区——insert()返回成功,不代表数据已经可以被查询。
具体来说,Milvus 的写入流程分两段,insert()操作在第一阶段完成后就会返回“成功”,但数据此时只是被写入了消息队列(类似Kafka producer ack的语义)安全落盘,但消费者(Query Node)尚未处理,此时读取自然无法看到新数据。
如图所示,这个“写入成功到数据进入Growing Segment、查询可见”的几十毫秒到几秒的时间差,就是多Agent场景下读空问题的核心诱因。
要想解决这个问题,在Milvus中,我们可以通过guarantee_timestamp来控制数据的可见性:每次search()调用都携带上这个时间戳,Query Node执行查询前会先检查自己使用的数据版本是否追上了这个时间戳?没追上就等待,追上了再执行查询
而我们在代码中设置的consistency_level(一致性级别),本质上就是在控制guarantee_timestamp的设定逻辑。
Milvus提供四档一致性选项,可在创建Collection时设置默认值,也可在每次search()调用时单独覆盖,不同级别对应不同的可见性、性能代价,具体如下:
这里需要重点说明:Milvus创建Collection的默认一致性级别是Bounded,这对单Agent RAG场景是完全合理的——因为单Agent场景没有推理过程中的写入操作,Bounded的5秒窗口不会被触发,既能保证检索性能,又能满足需求,是性能与体验的双赢选择。
但对于Writer写完数据后Reader立即查询的多Agent事件驱动场景,此时查询的guarantee_timestamp如果仍落在Bounded的5秒窗口内,新写入的数据就会不可见,返回空结果。
而解决这个问题的关键,就是将consistency_level从默认的Bounded,切换到适配多Agent场景的strong级别。

03

实验:Bounded 查不到,Strong 查得到

为了直观验证上述结论,我们设计了一组实验:通过模拟生产环境的高写压,让Query Node始终处于数据追赶状态,再执行“写入一条数据后立即查询”的操作,对比Bounded和Strong两种一致性级别的查询结果。

实验设计思路

通过两个机制模拟生产环境的写压,确保Query Node始终处于忙碌的追赶状态:
  • preload预写:提前写入大批量数据,制造WAL(Write-Ahead Log)历史积压;
  • storm writers后台写入:用多个后台线程持续高速写入数据,维持Query Node的追赶压力。
每轮实验中,先写入一条带唯一标记(marker)的记录,然后立即分别用Bounded和Strong级别查询该记录——一旦出现Bounded=0、Strong=1,即判定问题复现成功。
运行前提:pymilvus >= 2.6.0 已安装,Milvus 服务可访问。
#!/usr/bin/env python3import argparseimport itertoolsimport randomimport threadingimport timeimport uuidfrom contextlib import suppressfrom pymilvus import DataType, MilvusClientdef make_vector(seed, dim):    rng = random.Random(seed)    vec = [rng.uniform(-1.01.0for _ in range(dim)]    norm = sum(x * x for x in vec) ** 0.5 or 1.0    return [x / norm for x in vec]def make_records(start_id, count, dim, marker, round_no):    return [        {            "id": start_id + i,            "vector": make_vector(start_id + i, dim),            "marker": marker,            "round": round_no,        }        for i in range(count)    ]def create_collection(client, name, dim):    if client.has_collection(name):        client.drop_collection(name)    schema = client.create_schema(auto_id=False, enable_dynamic_field=False)    schema.add_field("id", DataType.INT64, is_primary=True)    schema.add_field("vector", DataType.FLOAT_VECTOR, dim=dim)    schema.add_field("marker", DataType.VARCHAR, max_length=128)    schema.add_field("round", DataType.INT64)    index_params = client.prepare_index_params()    index_params.add_index(        field_name="vector",        index_type="AUTOINDEX",        metric_type="COSINE",    )    client.create_collection(        collection_name=name,        schema=schema,        index_params=index_params,        consistency_level="Bounded",    )    client.load_collection(name)def search_marker(client, name, vector, marker, consistency, timeout):    result = client.search(        collection_name=name,        data=[vector],        anns_field="vector",        search_params={"metric_type""COSINE"},        filter=f'marker == "{marker}"',        limit=1,        output_fields=["id""marker""round"],        consistency_level=consistency,        timeout=timeout,    )    hits = result[0if result else []    return len(hits), hitsdef writer_storm(uri, name, dim, stop_event, id_counter, batch_size, sleep_seconds):    client = MilvusClient(uri=uri)    while not stop_event.is_set():        start_id = next(id_counter)        records = make_records(start_id, batch_size, dim, "storm", -1)        with suppress(Exception):            client.insert(collection_name=name, data=records)        if sleep_seconds > 0:            time.sleep(sleep_seconds)def main():    parser = argparse.ArgumentParser()    parser.add_argument("--uri", default="http://localhost:19530")    parser.add_argument("--collection", default="")    parser.add_argument("--dim"type=int, default=16)    parser.add_argument("--attempts"type=int, default=200)    parser.add_argument("--bounded-timeout"type=float, default=2.0)    parser.add_argument("--strong-timeout"type=float, default=30.0)    parser.add_argument("--storm-writers"type=int, default=2)    parser.add_argument("--storm-batch-size"type=int, default=2000)    parser.add_argument("--storm-sleep"type=float, default=0.0)    parser.add_argument("--preload"type=int, default=5000)    parser.add_argument("--keep", action="store_true")    args = parser.parse_args()    collection = args.collection or f"consistency_probe_{int(time.time())}_{uuid.uuid4().hex[:8]}"    writer = MilvusClient(uri=args.uri)    bounded_reader = MilvusClient(uri=args.uri)    strong_reader = MilvusClient(uri=args.uri)    stop_event = threading.Event()    storm_threads = []    storm_id_counter = itertools.count(10_000_000, args.storm_batch_size)    print(f"uri={args.uri}")    print(f"collection={collection}")    try:        create_collection(writer, collection, args.dim)        if args.preload > 0:            print(f"preload {args.preload} rows")            writer.insert(                collection_name=collection,                data=make_records(1_000_000, args.preload, args.dim, "preload", -2),            )            _, _ = search_marker(                strong_reader,                collection,                make_vector(1_000_000, args.dim),                "preload",                "Strong",                args.strong_timeout,            )        for _ in range(args.storm_writers):            thread = threading.Thread(                target=writer_storm,                args=(                    args.uri,                    collection,                    args.dim,                    stop_event,                    storm_id_counter,                    args.storm_batch_size,                    args.storm_sleep,                ),                daemon=True,            )            thread.start()            storm_threads.append(thread)        for attempt in range(args.attempts):            marker = f"probe_{attempt}_{uuid.uuid4().hex[:12]}"            record_id = attempt + 1            vector = make_vector(record_id, args.dim)            record = {                "id": record_id,                "vector": vector,                "marker": marker,                "round": attempt,            }            insert_start = time.perf_counter()            writer.insert(collection_name=collection, data=[record])            insert_ms = (time.perf_counter() - insert_start) * 1000            bounded_start = time.perf_counter()            bounded_count, bounded_hits = search_marker(                bounded_reader, collection, vector, marker,                "Bounded", args.bounded_timeout,            )            bounded_ms = (time.perf_counter() - bounded_start) * 1000            strong_start = time.perf_counter()            strong_count, strong_hits = search_marker(                strong_reader, collection, vector, marker,                "Strong", args.strong_timeout,            )            strong_ms = (time.perf_counter() - strong_start) * 1000            print(                f"attempt={attempt:03d} insert={insert_ms:.1f}ms "                f"bounded={bounded_count}({bounded_ms:.1f}ms) "                f"strong={strong_count}({strong_ms:.1f}ms)"            )            if bounded_count == 0 and strong_count > 0:                print("\nREPRODUCED: Bounded missed the just-inserted row, Strong found it.")                print(f"marker={marker}")                print(f"strong_hit={strong_hits[0if strong_hits else None}")                return            if bounded_hits and not strong_hits:                print("Unexpected: Bounded found the row but Strong did not; check service config.")        print("\nNot reproduced. QueryNode likely consumed the insert before each Bounded search.")        print("Try increasing --storm-writers/--storm-batch-size/--attempts, or run against a cluster under write load.")    finally:        stop_event.set()        for thread in storm_threads:            thread.join(timeout=1)        if args.keep:            print(f"kept collection={collection}")        else:            with suppress(Exception):                writer.drop_collection(collection)            print(f"dropped collection={collection}")if __name__ == "__main__":    main()

运行命令(替换uri为自身Milvus服务地址):

python probe.py --uri http://localhost:19530 \--storm-writers 2 \                --storm-batch-size 2000 \                --preload 5000

运行结果:(与文档中报错URL对应的服务地址一致):

uri=http://192.168.4.115:19530collection=consistency_probe_1777278755_71fb2959preload 5000 rowsattempt=000 insert=47.7ms bounded=0(100.7ms) strong=1(171.7ms)REPRODUCED: Bounded missed the just-inserted row, Strong found it.marker=probe_0_96fadc07d29estrong_hit={'id': 1, 'distance': 1.0, 'entity': {'marker': 'probe_0_96fadc07d29e', 'round': 0, 'id': 1}}dropped collection=consistency_probe_1777278755_71fb2959
实验结论
第一次尝试(attempt=000)即复现:bounded=0 说明 Query Node 正忙于消费 storm writers 制造的写入积压,Bounded 的 guarantee_timestamp 落在本次写入之前,新记录对此次查询不可见;strong=1 说明 Strong 强制 Query Node 追赶到全局最新时间戳后再返回,新记录被稳定查到。
其中distance=1.0确认了查询向量与写入向量完全一致,排除了向量不匹配的干扰。这进一步证明:问题的核心不是数据未写入,而是一致性级别导致的数据可见性时序冲突,与原文开头提出的多Agent写后读空问题完全吻合。

04 

不是所有场景都需要 Strong

虽然consistency_level="Strong" 能解决多 Agent 写后立刻读的问题,但它需要等待所有并发写入同步完成,会牺牲一定的性能。
因此,我们无需盲目将所有场景都设置为Strong级别,核心判断标准是:写入和查询之间是否有明确的因果关系,以及查询对数据新鲜度的要求。
结合多Agent常见场景,我们整理了针对性的一致性级别推荐方案,兼顾性能与一致性需求:
有明确因果——Writer 写完触发 Reader 查,流水线上一阶段写完触发下一阶段读,用 Strong。
无固定因果、但必须看到最新——多个 Agent 并发读写共享状态,没有固定上下游,任何人的写入都可能影响其他人的决策。用 Strong,等全局最新。

尾声

在实际开发中,很多人会用time.sleep(N)或无视一致性配置的方式“规避”读空问题,但这两种方式都不可靠:time.sleep靠猜测时间窗口,无法适配不同负载场景;无视一致性则完全靠运气,会导致系统偶发异常,难以排查。
其实,consistency_level参数的作用非常简单:告诉Milvus这次查询需要看到多新的数据。将默认的Bounded改为Strong,就为多Agent的“写后立即读”提供了确定性的可见性保证——这一行参数的差距,就是多Agent系统稳定运行与偶发空结果之间的全部距离。
总结来说:单Agent RAG场景,Milvus的默认配置完全够用;但在多Agent事件驱动场景中,你需要明确告诉Milvus“这次查询要看到最新数据”,通过一行参数调整一致性级别,就能彻底解决写后读空的核心难题。

作者介绍

图片

Zilliz黄金写手:尹珉

阅读推荐
官宣:Zilliz Cloud&Milvus发布CLI工具与官方Skill,让AI Agent成为专业VDB运维与开发助手
OpenClaw的记忆系统,用在企业场景还是太粗糙了
Vector Graph RAG 开源!一套向量数据库同时搞定语义检索+RAG多跳
黄仁勋GTC演讲上,Milvus为什么能够站稳非结构化数据处理C位
用RAG的思路做agent知识管理,为什么跑不通
图片
图片

53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询