支持私有化部署
AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


ragflow架构解析及性能优化方式

发布日期:2025-05-20 14:16:27 浏览次数: 1547 作者:开源技术人
推荐语

深入了解RagFlow架构,提升数据处理与检索性能。

核心内容:
1. RagFlow数据处理模块职能与核心流程解析
2. DocumentService关键类与方法介绍
3. 性能优化策略与二次开发建议

杨芳贤
53A创始人/腾讯云(TVP)最具价值专家

目前好用的rag系统有非常多,而其中ragflow就是最有代表性的佼佼者,如果把ragflow用到自己的生产系统中,有哪些需要基本了解的点呢,趁着周末打开了很久之前写的一个内容,把我的理解和各位朋友分享一下。由于ragflow比较大,我把之前写的解析数据处理、Retriever和Embedding模块的设计与实现逻辑的这一部分给修改出来。感兴趣的朋友们可以基于此作性能优化或二开。

数据处理模块

模块职能与流程: ragflow 的数据处理模块负责将原始资料(文档、图片、音频等)加载并预处理成适合检索的文本“切片”。其核心流程包括:数据加载(读取文件内容)、解析分块(根据文件类型提取文本和相关信息,按策略切分成多个片段)、向量化嵌入(对每个片段生成语义向量)、以及持久化存储(将片段及向量写入向量索引或数据库)。这一流程主要由DocumentService等服务类配合各类型文件的解析器完成。

关键类与方法:DocumentServiceapi/db/services/document_service.py)是数据处理的核心调度者。它利用解析器工厂根据文档类型选择相应的解析器,对每个上传的文档进行并行处理。DocumentService维护一个解析器映射表(如FACTORY),将文件的parser_id映射到具体解析器模块,如Word/PDF使用通用文本解析,PPT使用presentation解析,图片使用picture解析,音频使用audio解析等。

FACTORY = {        ParserType.PRESENTATION.value: presentation,        ParserType.PICTURE.value: picture,        ParserType.AUDIO.value: audio,        ParserType.EMAIL.value: email    }    parser_config = {"chunk_token_num": 4096, "delimiter": "\n!?;。;!?", "layout_recognize": "Plain Text"}    exe = ThreadPoolExecutor(max_workers=12)    threads = []...for d, blob in files:        kwargs = {            "callback": dummy,            "parser_config": parser_config,            "from_page": 0,            "to_page": 100000,            "tenant_id": kb.tenant_id,            "lang": kb.language        }        threads.append(exe.submit(FACTORY.get(d["parser_id"], naive).chunk, d["name"], blob, **kwargs))

每种解析器实现一个统一的chunk接口,负责将输入文件解析成若干带有元数据的文本块。例如,PPT 解析器会提取每页幻灯片的文本和缩略图,PDF 解析器则按照页码拆分文本并结合OCR提取扫描图片。

if re.search(r"\.pptx?$", filename, re.IGNORECASE):        ppt_parser = Ppt()        for pn, (txt, img) in enumerate(ppt_parser(                filename if not binary else binary, from_page, 1000000, callback)):            d = copy.deepcopy(doc)            pn += from_page            d["image"] = img            d["page_num_int"] = [pn + 1]            d["top_int"] = [0]            d["position_int"] = [(pn + 10, img.size[0], 0, img.size[1])]            tokenize(d, txt, eng)            res.append(d)        return reselif re.search(r"\.pdf$", filename, re.IGNORECASE):        pdf_parser = Pdf()        if kwargs.get("layout_recognize""DeepDOC") == "Plain Text":            pdf_parser = PlainParser()        for pn, (txt, img) in enumerate(pdf_parser(filename, binary,                                                   from_page=from_page, to_page=to_page, callback=callback)):            d = copy.deepcopy(doc)            pn += from_page            if img:                d["image"] = img            d["page_num_int"] = [pn + 1]            d["top_int"] = [0]            d["position_int"] = [(pn + 10, img.size[0if img else 00, img.size[1if img else 0)]            tokenize(d, txt, eng)            res.append(d)        return res

在处理流程中,DocumentService会启动一个线程池(默认最多12线程)并发执行多个文档的解析。每个线程调用相应解析器的chunk方法将文档切分成片段列表,并返回包含文本内容(以及可能的权重、图像等)的字典结构。例如,对于文本型文档,解析器会对段落或页进行切分,并调用rag_tokenizer.tokenize进行中文分词细粒度标记。解析结果中还包含文档ID、知识库ID等元数据,用于后续索引。对于解析出的图片,如PPT页面截图或PDF扫描图片,系统会将其二进制数据存入STORAGE_IMPL指定的存储(如本地或云存储,默认是MINIO),并在片段数据中仅保留引用(如img_id)以减少向量库负担。

for (docinfo, _), th in zip(files, threads):        docs = []        doc = {            "doc_id": docinfo["id"],            "kb_id": [kb.id]        }        for ck in th.result():            d = deepcopy(doc)            d.update(ck)            d["id"] = xxhash.xxh64((ck["content_with_weight"] + str(d["doc_id"])).encode("utf-8")).hexdigest()            d["create_time"] = str(datetime.now()).replace("T"" ")[:19]            d["create_timestamp_flt"] = datetime.now().timestamp()            if not d.get("image"):                docs.append(d)                continue
            output_buffer = BytesIO()            if isinstance(d["image"], bytes):                output_buffer = BytesIO(d["image"])            else:                d["image"].save(output_buffer, format='JPEG')
            STORAGE_IMPL.put(kb.id, d["id"], output_buffer.getvalue())            d["img_id"] = "{}-{}".format(kb.id, d["id"])            d.pop("image", None)            docs.append(d)
class StorageFactory:    storage_mapping = {        Storage.MINIORAGFlowMinio,        Storage.AZURE_SPNRAGFlowAzureSpnBlob,        Storage.AZURE_SASRAGFlowAzureSasBlob,        Storage.AWS_S3RAGFlowS3,        Storage.OSSRAGFlowOSS,    }
    @classmethod    def create(cls, storage: Storage):        return cls.storage_mapping[storage]()

STORAGE_IMPL_TYPE = os.getenv('STORAGE_IMPL''MINIO')STORAGE_IMPL = StorageFactory.create(Storage[STORAGE_IMPL_TYPE])

潜在瓶颈: 当前数据处理流程虽然通过线程池并行解析多个文档,但仍有性能瓶颈和改进空间:

  • 大文件处理开销高: 对于超大文件(如含成千上万页的PDF),单个解析线程可能长时间占用,且会一次性产生大量片段数据存入内存。代码中会等待解析完成后再批量处理嵌入和存储,这可能导致内存占用峰值较高。优化建议: 可引入流式处理或分段加载机制,分批解析和处理文档。例如,将超长文档按章节分页解析,每生成一定数量片段就立刻进行嵌入和写入,减少单批内存占用。也可考虑在解析阶段对文本进行摘要或过滤无关信息,减小后续处理量。

  • 缺乏解析缓存和增量更新: 每次新增或修改文档都会重新解析全文。DocumentService目前通过在入库前删除原有片段实现更新,没有细粒度的增量更新。优化建议: 对于未变化的文档内容可跳过重复解析和嵌入。可以维护文件内容的哈希或指纹,如果文档再次上传且内容未变则复用已有向量。对于部分更新的文档,考虑对比新旧内容的差异,仅解析变化部分并更新对应片段。


@classmethod@DB.connection_context()def remove_document(cls, doc, tenant_id):        cls.clear_chunk_num(doc.id)        try:            settings.docStoreConn.delete({"doc_id": doc.id}, search.index_name(tenant_id), doc.kb_id)            graph_source = settings.docStoreConn.getFields(                settings.docStoreConn.search(["source_id"], [], {"kb_id": doc.kb_id, "knowledge_graph_kwd": ["graph"]}, [], OrderByExpr(), 0, 1, search.index_name(tenant_id), [doc.kb_id]), ["source_id"]            )            if len(graph_source) > 0 and doc.id in list(graph_source.values())[0]["source_id"]:                settings.docStoreConn.update({"kb_id": doc.kb_id, "knowledge_graph_kwd": ["entity", "relation", "graph", "subgraph", "community_report"], "source_id": doc.id},                                            {"remove": {"source_id": doc.id}},                                            search.index_name(tenant_id), doc.kb_id)                settings.docStoreConn.update({"kb_id": doc.kb_id, "knowledge_graph_kwd": ["graph"]},                                            {"removed_kwd": "Y"},                                            search.index_name(tenant_id), doc.kb_id)                settings.docStoreConn.delete({"kb_id": doc.kb_id, "knowledge_graph_kwd": ["entity", "relation", "graph", "subgraph", "community_report"], "must_not": {"exists": "source_id"}},                                            search.index_name(tenant_id), doc.kb_id)        except Exception:            pass        return cls.delete_by_id(doc.id)
  • 并行度与异步处理: 代码使用线程池并行解析,但嵌入计算仍在主线程同步执行:线程池收集所有文档片段后,依次对每个文档调用嵌入模型编码。若解析速度远快于嵌入,则CPU可能空闲等待GPU或外部API响应。优化建议: 引入异步任务队列多线程/多进程并发嵌入。例如,可在解析线程完成一个文档的切片后立即异步提交嵌入任务,而不是等所有解析结束;或者利用多个GPU/进程并行处理不同文档的向量生成,从而提高整体吞吐。还可以考虑批量嵌入的异步I/O,例如使用asyncio批发调用OpenAI等服务接口,加速远程嵌入API调用。

def embedding(doc_id, cnts, batch_size=16):        nonlocal embd_mdl, chunk_counts, token_counts        vects = []        for i in range(0, len(cnts), batch_size):            vts, c = embd_mdl.encode(cnts[i: i + batch_size])            vects.extend(vts.tolist())            chunk_counts[doc_id] += len(cnts[i:i + batch_size])            token_counts[doc_id] += c        return vects
  • 解析器性能与定制: ragflow 支持丰富格式,但解析依赖第三方库(如Aspose用于PPT, PyPDF用于PDF, OCR库用于图片)可能成为瓶颈。例如,大PDF的OCR耗时很长。优化建议: 针对常用格式引入更高效的解析工具或并行OCR方案;提供解析参数调优(如是否提取图片、OCR精度等)以在性能和精度间平衡。对于企业场景,可允许用户预先转换好部分数据格式,减少在线解析工作量,高精度场景可以提供多种方法及模型等配置选项,如minerU等。


完成解析和嵌入后,DocumentService通过docStoreConn接口将片段数据持久化到向量存储。它先检查或创建索引(index)结构,然后按批次(默认每批64条)插入片段。每个片段包含文本、元数据以及生成的向量(字段名形如q_<dim>_vec)。同时,DocumentService会更新关系型数据库中的文档记录(如总切片数、总Token数等)以供监控。整个数据入库流程确保了解析 -> 嵌入 -> 存储的顺序执行,并在出现错误时回滚或记录异常。

for i, d in enumerate(cks):    v = vects[i]    d["q_%d_vec" % len(v)] = vfor b in range(0len(cks), es_bulk_size=64):    if try_create_idx:        if not settings.docStoreConn.indexExist(idxnm, kb_id):            settings.docStoreConn.createIdx(idxnm, kb_id, len(vects[0]))        try_create_idx = False    settings.docStoreConn.insert(cks[b:b + es_bulk_size], idxnm, kb_id)

Retriever 模块

结构设计:ragflow的检索模块采用面向接口的设计,屏蔽了不同向量库实现的差异。核心抽象为DocStoreConnectionrag/utils/doc_store_conn.py),以及基于它构建的search.Dealer检索器(位于rag/nlp/search.py)。系统在初始化时根据配置的后端向量引擎类型(DOC_ENGINE)创建相应的DocStoreConnection实例,并赋给全局的settings.docStoreConn

global DOC_ENGINE, docStoreConn, retrievaler, kg_retrievalerDOC_ENGINE = os.environ.get("DOC_ENGINE""elasticsearch")
lower_case_doc_engine = DOC_ENGINE.lower()if lower_case_doc_engine == "elasticsearch":        docStoreConn = rag.utils.es_conn.ESConnection()elif lower_case_doc_engine == "infinity":        docStoreConn = rag.utils.infinity_conn.InfinityConnection()elif lower_case_doc_engine == "opensearch":        docStoreConn = rag.utils.opensearch_coon.OSConnection()else:        raise Exception(f"Not supported doc engine: {DOC_ENGINE}")
retrievaler = search.Dealer(docStoreConn)kg_retrievaler = kg_search.KGSearch(docStoreConn)

支持的引擎包括:Elasticsearch(利用其内置稠密向量索引)、OpenSearch(偏传统搜索引擎的向量支持),以及InfiniFlow团队自研的“Infinity”引擎。每种引擎在rag/utils目录下有对应实现类(如ESConnectionOSConnectionInfinityConnection),它们都提供统一的方法,例如:createIdx创建索引、insert插入文档、search执行搜索、getTotal获取结果总数等。在代码中可以看到,根据配置选择不同引擎,否则抛出异常“Not supported doc engine”。这种设计使得更换底层向量库(如从Elasticsearch换成Milvus或FAISS)对上层检索流程影响较小,只需实现新的DocStoreConnection子类并配置启用即可。

增加新的向量库类型: 尽管还有FAISS、Weaviate、Milvus等向量库,当前源码里主要直接支持的还是上述三种后端。ragflow架构上具备扩展能力:开发者可以参考现有DocStoreConnection接口封装新的后端(例如利用FAISS在本地内存建立索引,或通过Weaviate/Milvus的Python客户端发送查询)。引入这些向量库有望在纯向量检索性能和分布式扩展上提供优势。

检索策略: ragflow 的 Retriever 模块采用多路召回融合的策略,即同时执行稠密向量检索和稀疏关键字检索,并融合结果进行重排序。在search.Dealer.search方法中,这一逻辑清晰可见:首先,对用户查询进行处理,生成语义向量查询文本关键词查询。语义向量查询通过Embedding模块将用户问题编码成向量(使用emb_mdl.encode_queries得到查询向量);

def get_vector(self, txt, emb_mdl, topk=10, similarity=0.1):        qv, _ = emb_mdl.encode_queries(txt)        shape = np.array(qv).shape        if len(shape) > 1:            raise Exception(                f"Dealer.get_vector returned array's shape {shape} doesn't match expectation(exact one dimension).")        embedding_data = [get_float(v) for v in qv]        vector_column_name = f"q_{len(embedding_data)}_vec"        return MatchDenseExpr(vector_column_name, embedding_data, 'float''cosine', topk, {"similarity": similarity})

关键词查询则使用FulltextQueryer将问题转换为搜索式(提取关键词并设定匹配度)。接着,Dealer构造一个融合查询,将稠密向量匹配(MatchDenseExpr)和稀疏文本匹配(MatchTextExpr)组合(加权融合则通过FusionExpr实现)。最终通过docStoreConn.search接口提交给底层引擎执行。例如,当底层是ElasticSearch,会同时在索引中进行match文本查询和向量最近邻查询,再根据预设的相似度权重融合得分。当初次检索结果为空或不足时,系统降低文本匹配阈值(min_match)并再次发起结合关键词匹配(matchText) + 向量匹配(matchDense) + 融合表达式(fusionExpr)的检索,以最大限度召回相关片段。

matchText, keywords = self.qryr.question(qst, min_match=0.3)if emb_mdl is None:    matchExprs = [matchText]    res = self.dataStore.search(src, highlightFields, filters, matchExprs, orderBy, offset, limit,                                idx_names, kb_ids, rank_feature=rank_feature)    total = self.dataStore.getTotal(res)    logging.debug("Dealer.search TOTAL: {}".format(total))else:    matchDense = self.get_vector(qst, emb_mdl, topk, req.get("similarity", 0.1))    q_vec = matchDense.embedding_data    src.append(f"q_{len(q_vec)}_vec")    fusionExpr = FusionExpr("weighted_sum", topk, {"weights": "0.05, 0.95"})    matchExprs = [matchText, matchDense, fusionExpr]    res = self.dataStore.search(src, highlightFields, filters, matchExprs, orderBy, offset, limit,                                idx_names, kb_ids, rank_feature=rank_feature)    total = self.dataStore.getTotal(res)    logging.debug("Dealer.search TOTAL: {}".format(total))    # If result is empty, try again with lower min_match    if total == 0:        if filters.get("doc_id"):            res = self.dataStore.search(src, [], filters, [], orderBy, offset, limit, idx_names, kb_ids)            total = self.dataStore.getTotal(res)        else:            matchText, _ = self.qryr.question(qst, min_match=0.1)            filters.pop("doc_id", None)            matchDense.extra_options["similarity"] = 0.17            res = self.dataStore.search(src, highlightFields, filters, [matchText, matchDense, fusionExpr],                                        orderBy, offset, limit, idx_names, kb_ids, rank_feature=rank_feature)            total = self.dataStore.getTotal(res)            logging.debug("Dealer.search 2 TOTAL: {}".format(total))

除了 dense/sparse 双检索外,ragflow 还支持知识图谱检索(KGSearch)等多模态场景,代码中KGSearch(docStoreConn)表明针对结构化知识使用知识图谱检索器。但就文本QA场景而言,当前主要是稠密与稀疏信息的互补。这样设计的好处是:召回率更高且结果有解释性。稠密向量确保语义相关但可能引入模糊匹配,稀疏关键词确保精确匹配但可能漏掉语义改述,通过融合可以综合两者优点。同时,Dealer在返回结果时也提供了高亮和聚合等信息用于结果呈现。

class SearchResult:        total: int        ids: list[str]        query_vector: list[float] | None = None        field: dict | None = None        highlight: dict | None = None        aggregation: list | dict | None = None        keywords: list[str] | None = None        group_docs: list[list] | None = None

检索结果返回后,ragflow还可以执行重排序(rerank)操作。代码中预留了RERANK_MDL配置以及rag/llm/rerank_model.py模块用于加载例如BCE、BGE这样的cross-encoder重排序模型。在问答流程中,当Retriever找回N条候选片段后,调用similarity(query, texts)对片段与原问句打分,再重新排序以提升相关性。这种两阶段检索在文档中被称为“融合重排序”,可有效提升最终答案依据的准确性。

潜在问题:

  • 召回率不稳定: 稠密检索依赖向量质量,若嵌入模型未涵盖领域知识,可能漏掉相关内容;稀疏检索则受限于文本关键字匹配。融合后效果一般较单一路径好,但仍需调优参数(如向量与文本匹配的相对权重、min_match阈值等)。目前代码里similarity下限等参数硬编码(如再次检索时将matchDense阈值设0.17),这些可能需要根据实际数据调整。改进建议: 引入基于训练的数据驱动方法优化融合策略,例如学习一个小模型来判断向量匹配结果是否可靠,从而动态调整dense vs. sparse权重。此外可以增加对查询意图的判断:当用户问句包含具体实体/关键词时,加强BM25权重;当问句偏概念模糊时,更多依赖向量召回。

  • 检索速度较慢: 使用ElasticSearch/OpenSearch进行大规模向量检索,在向量维度高和数据量大时性能可能不如专门的向量数据库(如Milvus、FAISS)。Infinity引擎采用HNSW提升了稠密检索速度,但如果知识库规模千万级,单机HNSW内存和查询延迟都会增长。优化方案:引入更高性能的向量索引:例如将Faiss纳入后端,通过GPU加速近似最近邻搜索;或集成Milvus等支持分布式分片,以水平扩展大数据量下的查询。另外,可对现有HNSW参数调优(如M、ef值)或启用压缩降低内存占用。对于稀疏检索部分,可提前对文本做关键词过滤索引精简(如移除停用词)提升匹配效率。

  • 多GPU/分布式支持不足: 当前检索流程主要在单实例完成,并未充分利用多GPU并行。Infinity等向量库如果部署在多节点,需要协调多服务器的检索结果合并,这部分框架暂未体现。另外,在本地调用Faiss时,可以利用多GPU进行IndexShard或数据并行搜索,但目前架构未涵盖。改进建议: 提供分布式检索模式的支持,例如通过gRPC调用远程向量引擎集群;在应用层对大索引做分片并行检索,然后融合结果。此外,如上所述允许embedding环节利用多GPU生成查询向量,也能稍微减轻单卡压力。值得注意的是,多GPU更多地会用在嵌入阶段,检索阶段更多是CPU/内存瓶颈,因此提升检索并发能力和索引优化更为重要。

Retriever模块通过接口封装实现对不同向量存储的支持,采用稠密+稀疏融合提升召回和准确率,搭配可选的重排序模型以进一步优化结果排序。为让检索模块更为健壮,后续可以:扩展支持更丰富的后端(Faiss/Milvus 等)以提高性能和部署灵活性;引入Agent式检索调优(例如动态调整查询或多跳检索);加强并行与分布式能力,确保在大规模知识库下也能快速稳定地返回结果。

嵌入(Embedding)模块

支持的嵌入模型: ragflow的嵌入模块设计非常灵活,集成了多种来源的Embedding模型,包括本地和云端模型。例如:

  • 本地向量模型: 默认使用开源的通用文本嵌入模型,代码中DefaultEmbedding会通过FlagEmbedding库加载指定名称的模型权重(默认BAAI/bge-large-zh-v1.5),并将其缓存在本地。此外还有FastEmbed实现,使用轻量模型(如bge-small-en-v1.5)及优化推理库以提升嵌入计算速度。另一个本地模型是YoudaoEmbed,集成了网易有道的中英双语Embedding模型。这些本地模型在初始化时如果未缓存,会自动从HuggingFace Hub下载权重。

  • OpenAI 等云嵌入服务: 模块提供OpenAIEmbed类封装OpenAI的文本嵌入API(默认模型text-embedding-ada-002)。AzureEmbed支持Azure OpenAI服务。国内云服务也有:如BaiChuanEmbed对接百川智能的Embedding API,QWenEmbed通过阿里达摩院提供的DashScope接口获取其Embedding模型(如通义千问embedding),ZhipuEmbed对接智谱AI的Embedding接口。还有JinaEmbed用于调用Jina AI提供的向量服务接口。这些云服务类通常封装HTTP请求调用,并解析返回的embedding向量。例如JinaEmbed中直接以requests.post发送JSON请求并获取结果。

  • 本地部署的Embedding服务: 模块也考虑了用户自托管的大模型服务,如LocalAIEmbed用于对接本地的开源Embedding API(OpenAI兼容接口),OllamaEmbed用于调用本地的Ollama引擎生成embedding(支持一些本地LLM),XinferenceEmbed对接XInference服务等。这些类通常复用OpenAI接口逻辑但将base_url指向本地服务器地址,从而利用本地算力生成向量。

Embedding模块通过不同子类囊括了主流的嵌入方案:既支持开源模型离线推理,也支持各大厂商在线服务。配置上,系统允许用户设置EMBEDDING_MDL,选择默认的embedding模型。例如默认embedding模型列表里包括BGE-largeYoudao BCE。根据部署模式(LIGHTEN配置)可以决定是否加载大模型权重还是调用轻量服务。

BUILTIN_EMBEDDING_MODELS = ["BAAI/bge-large-zh-v1.5@BAAI", "maidalun1020/bce-embedding-base_v1@Youdao"]

Token限制与批处理策略: 由于不同模型对输入文本长度有限制,代码对长文本都做了截断处理,避免超过模型支持的token数。比如OpenAI的ada-002模型最多支持8191个token,OpenAIEmbed.encode对每段文本截断至8191长度;智谱embedding-3模型上限3072token,代码中据此截断输入。本地FlagEmbedding模型也将文本裁剪在2048字符左右以控制计算量。这种截断保证了对任意长文段都能安全编码,但也可能丢失部分信息。优化上,未来可考虑对超长文本分段嵌入(如对长段落进一步切分取均值)或者引入长上下文embedding模型。

class ZhipuEmbed(Base):    def __init__(self, key, model_name="embedding-2", **kwargs):        self.client = ZhipuAI(api_key=key)        self.model_name = model_name
    def encode(self, texts: list):        arr = []        tks_num = 0        MAX_LEN = -1        if self.model_name.lower() == "embedding-2":            MAX_LEN = 512        if self.model_name.lower() == "embedding-3":            MAX_LEN = 3072        if MAX_LEN > 0:            texts = [truncate(t, MAX_LEN) for t in texts]
        for txt in texts:            res = self.client.embeddings.create(input=txt,                                                model=self.model_name)            arr.append(res.data[0].embedding)            tks_num += self.total_token_count(res)        return np.array(arr), tks_num
    def encode_queries(self, text):        res = self.client.embeddings.create(input=text,                                            model=self.model_name)        return np.array(res.data[0].embedding), self.total_token_count(res)

在批处理方面,多数实现将请求按批大小16拆分,以兼顾效率和API限制。例如OpenAI每次请求最多16条输入,代码中显式设置batch_size=16;本地模型也通常16条一批以充分利用矩阵并行。也有特殊情况,如Qwen的DashScope接口批量能力较弱,仅用batch_size=4。批处理逻辑多采用简单的for循环累积结果。例如对一组文本列表,按16条一组调用底层模型获取embedding,并将结果扩展成一个numpy数组返回。每批次调用后,有的实现如OpenAI会累加已处理的token数方便后续记录。另外,有些会提供encode_queries专门处理单条查询的情况,有的则会直接调用encode([text])简化实现。总的来看,当前批处理策略较为保守,没有动态调整批大小或并行发出多个批次请求。

class LocalAIEmbed(Base):    def __init__(self, key, model_name, base_url):        if not base_url:            raise ValueError("Local embedding model url cannot be None")        if base_url.split("/")[-1] != "v1":            base_url = os.path.join(base_url, "v1")        self.client = OpenAI(api_key="empty", base_url=base_url)        self.model_name = model_name.split("___")[0]
    def encode(selftexts: list):        batch_size = 16        ress = []        for i in range(0, len(texts), batch_size):            res = self.client.embeddings.create(input=texts[i:i + batch_size], model=self.model_name)            ress.extend([d.embedding for d in res.data])        # local embedding for LmStudio donot count tokens        return np.array(ress), 1024
    def encode_queries(self, text):        embds, cnt = self.encode([text])        return np.array(embds[0]), cnt

多线程/并发实现: 在Embedding模块内部,各模型的encode实现大多是同步的,并未显式使用多线程。但ragflow利用上层流程并发处理不同文档的嵌入来提高吞吐:DocumentService中对不同文档的chunk是并行的,多个文档的向量生成可以借助线程池同时进行(受限于GPU算力)。不过对于单个大文档,其所有chunk向量还是在单线程顺序计算。目前也没有看到使用GPU多卡并行计算同一批次embedding的机制。而且DefaultEmbedding类初始化时将环境变量CUDA_VISIBLE_DEVICES强制设为0锁定使用第一块GPU,这在多GPU机器上可能造成闲置。改进建议: 考虑引入多线程/异步的编码方式。例如,对于需要调用外部API(OpenAI等),可以使用asyncio.gather同时发出多个请求批次,充分利用网络带宽;对于本地模型,如PyTorch的embedding模型,可采用多线程批次推理或OpenMP并行(前提是模型计算瓶颈在CPU)。此外,可以让用户配置使用哪些GPU设备,或在初始化时检测多GPU并对模型和数据进行切分,做到多卡并行嵌入(例如FlagEmbedding模型实例化多份各占一卡,分摊批次)。这些措施对大规模文档批量入库时的嵌入吞吐将有明显提升。

效率和可维护性评估: ragflow 的Embedding模块代码虽长但结构清晰:使用抽象基类Base定义接口,然后为不同服务商/模型各建一个子类实现。这种方式在增加新模型支持时需要修改代码增添类,但也提供了高度的控制力。目前来看,可维护性方面,每个子类基本独立,新增其它平台支持(例如TogetherAI、PerfX等,在代码中已经有一些雏形)很方便。如果后续模型种类继续增加,考虑采用插件机制或配置驱动会更灵活,例如通过配置文件列出可用的embedding后端及参数,运行时反射加载对应实现,避免主代码不断膨胀。

大规模文档嵌入情境下,当前结构可能暴露一些效率瓶颈:如所有文档embedding共用单一线程顺序执行,难以充分利用多核;本地大型模型(BGE large)占用显存大且推理速度相对慢,批量处理时GPU成为瓶颈。如果要提升吞吐,可考虑以下建议:

  • 批量并行:对于上万段文本的嵌入,可将其划分给多个进程/线程分别处理,比如启动多个EmbeddingWorker进程,各自加载模型处理一部分数据,然后合并结果。这在多核CPU或多GPU场景下尤为有效。也可使用向量化工具(如NumPy并行、PyTorch DataLoader)对CPU计算部分提速。

  • 模型压缩和加速:针对大模型嵌入的性能问题,可引入蒸馏、小模型(如使用small版BGE或其他轻量模型)在对性能要求高时使用;或者使用OpenVINO、TensorRT等对embedding模型做推理加速。FastEmbed类就是一个采用优化库的例子,其调用的fastembed.TextEmbedding内部可能利用C++实现提升计算速度。进一步的优化可以考虑量化embedding模型的权重,用FP16甚至INT8以换取更高速度和更低内存占用。

  • 流水线并行:将文档解析->嵌入->存储三步解耦成流水线,不必串行等待。比如在一个文档解析出部分chunk后,就立刻交给嵌入线程处理,同时解析线程继续处理下一个部分,从而解析和嵌入两个阶段重叠执行,提高硬件利用率。需要在架构上实现任务调度和缓冲队列,防止某一步过慢拖累整体。

ragflow的Embedding模块具有灵活易扩展的优点。在后续扩展中,可以引入配置化和插件化降低新增模型的代码改动,并通过并行优化模型加速方案,使大规模文档嵌入过程更高效。随着开源社区新的embedding模型出现(如多模态embedding,将图片、音频也编码到同一向量空间),ragflow也可以通过类似架构无缝接入,从而增强系统在多模态检索上的能力。

模块依赖关系概览:数据处理模块负责将原始数据转化为向量库可用的结构,内部依赖解析器(如DeepDoc等库)提取文本,并调用 Embedding 模块完成向量生成;Embedding模块对接外部模型或服务,将文本转为语义向量;生成的文本片段和向量通过DocStore接口持久化到检索引擎。检索查询发生时,Retriever模块调用 Embedding 模块将用户查询转为向量,并结合关键词匹配通过 DocStore 接口检索相关片段;Retriever返回的结果可再送入重排序模型(二次调用Embedding或独立的Rerank模型)以优化顺序,最终将结果提供给上层问答流程使用。因此,数据处理 -> 向量存储 <- 检索查询这一闭环确保了知识库内容与用户问题在同一向量空间匹配。

各模块解耦明确:解析与嵌入模块以配置和接口衔接,检索模块通过统一接口适配不同存储,实现了较好的模块化和可扩展性。今后在此架构上进行二次开发和性能优化,只需针对瓶颈模块局部改进,而不影响整体流程。各改进方案如上所述,将有助于ragflow更高效地处理海量异构数据,服务大规模问答应用。

53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询