支持私有化部署
AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


RAGFlow中的Embeddings模型选择及向量数据库选型与实现分析

发布日期:2025-06-05 13:57:12 浏览次数: 1604 作者:硅基漫游指南
推荐语

深入探索RAGFlow项目中Embeddings模型的选型与配置,以及向量数据库的技术细节。

核心内容:
1. RAGFlow中Embeddings模型的接口设计与配置
2. 支持的Embedding模型及其特点
3. 向量数据库的选型与实现分析

杨芳贤
53A创始人/腾讯云(TVP)最具价值专家

 

本文继续基于 RAGFlow 源代码和官方文档,详细分析其 Embeddings 模型选择与配置以及向量数据库选型与实现的技术细节。

1. Embeddings 模型选择与配置

RAGFlow 支持多种 Embeddings 模型,通过rag/llm/embedding_model.py实现了丰富的模型接口和配置选项。

1.1 模型架构设计

RAGFlow 采用了抽象基类设计模式,通过Base类定义了所有 Embedding 模型必须实现的接口:

class Base(ABC):
    def __init__(self, key, model_name):
        pass

    def encode(self, texts: list):
        raise NotImplementedError("Please implement encode method!")

    def encode_queries(self, text: str):
        raise NotImplementedError("Please implement encode method!")

    def total_token_count(self, resp):
        try:
            return resp.usage.total_tokens
        except Exception:
            pass
        try:
            return resp["usage"]["total_tokens"]
        except Exception:
            pass
        return 0

这种设计使得 RAGFlow 可以轻松支持和扩展不同的 Embedding 模型,只需实现特定的接口方法。

1.2 支持的 Embedding 模型

从源码中可以看到,RAGFlow 支持以下 Embedding 模型:

  1. 1. DefaultEmbedding:默认使用 FlagEmbedding 模型,基于 BAAI/bge-large-zh-v1.5
    class DefaultEmbedding(Base):
        os.environ['CUDA_VISIBLE_DEVICES'] = '0'
        _model = None
        _model_name = ""
        _model_lock = threading.Lock()

        def __init__(self, key, model_name, **kwargs):
            if not settings.LIGHTEN:
                with DefaultEmbedding._model_lock:
                    from FlagEmbedding import FlagModel
                    import torch
                    ifnot DefaultEmbedding._model or model_name != DefaultEmbedding._model_name:
                        try:
                            DefaultEmbedding._model = FlagModel(os.path.join(get_home_cache_dir(), re.sub(r"^[a-zA-Z0-9]+/""", model_name)),
                                                               query_instruction_for_retrieval="为这个句子生成表示以用于检索相关文章:",
                                                               use_fp16=torch.cuda.is_available())
                            DefaultEmbedding._model_name = model_name
                        except Exception:
                            model_dir = snapshot_download(repo_id="BAAI/bge-large-zh-v1.5",
                                                         local_dir=os.path.join(get_home_cache_dir(), re.sub(r"^[a-zA-Z0-9]+/""", model_name)),
                                                         local_dir_use_symlinks=False)
                            DefaultEmbedding._model = FlagModel(model_dir,
                                                              query_instruction_for_retrieval="为这个句子生成表示以用于检索相关文章:",
                                                              use_fp16=torch.cuda.is_available())
                    self._model = DefaultEmbedding._model
                    self._model_name = DefaultEmbedding._model_name
  2. 2. OpenAIEmbed:使用 OpenAI 的 embedding 模型
    class OpenAIEmbed(Base):
        def __init__(self, key, model_name="text-embedding-ada-002", base_url="https://api.openai.com/v1"):
            if not base_url:
                base_url = "https://api.openai.com/v1"
            self.client = OpenAI(api_key=key, base_url=base_url)
            self.model_name = model_name
  3. 3. LocalAIEmbed:支持本地部署的 embedding 模型
    class LocalAIEmbed(Base):
        def __init__(self, key, model_name, base_url):
            if not base_url:
                raise ValueError("Local embedding model url cannot be None")
            base_url = urljoin(base_url, "v1")
            self.client = OpenAI(api_key="empty", base_url=base_url)
            self.model_name = model_name.split("___")[0]
  4. 4. AzureEmbed:Azure OpenAI 的 embedding 模型
    class AzureEmbed(OpenAIEmbed):
        def __init__(self, key, model_name, **kwargs):
            from openai.lib.azure import AzureOpenAI
            api_key = json.loads(key).get('api_key''')
            api_version = json.loads(key).get('api_version''2024-02-01')
            self.client = AzureOpenAI(api_key=api_key, azure_endpoint=kwargs["base_url"], api_version=api_version)
            self.model_name = model_name
  5. 5. BaiChuanEmbed:百川 AI 的 embedding 模型
    class BaiChuanEmbed(OpenAIEmbed):
        def __init__(self, key, model_name='Baichuan-Text-Embedding', base_url='https://api.baichuan-ai.com/v1'):
            if not base_url:
                base_url = "https://api.baichuan-ai.com/v1"
            super().__init__(key, model_name, base_url)
  6. 6. QWenEmbed:通义千问的 embedding 模型
    class QWenEmbed(Base):
        def __init__(self, key, model_name="text_embedding_v2", **kwargs):
            self.key = key
            self.model_name = model_name
  7. 7. ZhipuEmbed:智谱 AI 的 embedding 模型
    class ZhipuEmbed(Base):
        def __init__(self, key, model_name="embedding-2", **kwargs):
            self.client = ZhipuAI(api_key=key)
            self.model_name = model_name
  8. 8. OllamaEmbed:Ollama 的 embedding 模型
    class OllamaEmbed(Base):
        def __init__(self, key, model_name, **kwargs):
            self.client = Client(host=kwargs["base_url"]) if not key or key == "x" else \
                Client(host=kwargs["base_url"], headers={"Authorization"f"Bear {key}"})
            self.model_name = model_name
  9. 9. GoogleEmbed:Google 的 embedding 模型
    class GoogleEmbed(Base):
        def __init__(self, key, model_name="embedding-001", **kwargs):
            genai.configure(api_key=key)
            self.model_name = model_name

1.3 批处理优化

RAGFlow 在 embedding 模型实现中采用了批处理优化,以提高处理效率:

def encode(self, texts: list):
    batch_size = 16
    texts = [truncate(t, 2048for t in texts]
    token_count = 0
    for t in texts:
        token_count += num_tokens_from_string(t)
    ress = []
    for i in range(0len(texts), batch_size):
        ress.extend(self._model.encode(texts[i:i + batch_size]).tolist())
    return np.array(ress), token_count

这种批处理方式可以减少 API 调用次数,提高效率。不同的模型实现了不同的批处理策略,例如:

  • • DefaultEmbedding:批量大小为 16
  • • OpenAIEmbed:批量大小为 16,并限制文本长度为 8191
  • • QWenEmbed:批量大小为 4,并限制文本长度为 2048

1.4 文本长度处理

RAGFlow 对不同模型的文本长度限制进行了处理:

# 在OpenAIEmbed中
texts = [truncate(t, 8191for t in texts]

# 在QWenEmbed中
texts = [truncate(t, 2048for t in texts]

# 在ZhipuEmbed中
if self.model_name.lower() == "embedding-2":
    MAX_LEN = 512
if self.model_name.lower() == "embedding-3":
    MAX_LEN = 3072
if MAX_LEN > 0:
    texts = [truncate(t, MAX_LEN) for t in texts]

这种处理确保了文本不会超出模型的最大长度限制,避免了 API 调用错误。

1.5 默认模型选择

根据官方文档和源码,RAGFlow 的 Docker 镜像(非 slim 版本)预装了两个优化的 embedding 模型:

  1. 1. BAAI/bge-large-zh-v1.5
  2. 2. maidalun1020/bce-embedding-base_v1

这两个模型专为中英文优化,提供了良好的多语言支持。在DefaultEmbedding类中,如果未指定模型,默认使用 BAAI/bge-large-zh-v1.5。

2. 向量数据库选型与实现

RAGFlow 采用了灵活的向量数据库架构,通过抽象接口支持多种向量数据库。

2.1 数据库抽象接口

RAGFlow 在rag/utils/doc_store_conn.py中定义了向量数据库的抽象接口:

class DocStoreConnection(ABC):
    """
    Database operations
    """

    @abstractmethod
    def dbType(self) -> str:
        """
        Return the type of the database.
        """

        raise NotImplementedError("Not implemented")

    @abstractmethod
    def health(self) -> dict:
        """
        Return the health status of the database.
        """

        raise NotImplementedError("Not implemented")

    """
    Table operations
    """

    @abstractmethod
    def createIdx(self, indexName: str, knowledgebaseId: str, vectorSize: int):
        """
        Create an index with given name
        """

        raise NotImplementedError("Not implemented")

    @abstractmethod
    def deleteIdx(self, indexName: str, knowledgebaseId: str):
        """
        Delete an index with given name
        """

        raise NotImplementedError("Not implemented")

    @abstractmethod
    def indexExist(self, indexName: str, knowledgebaseId: str) -> bool:
        """
        Check if an index with given name exists
        """

        raise NotImplementedError("Not implemented")

    """
    CRUD operations
    """

    @abstractmethod
    def search(
        self, selectFields: list[str], highlightFields: list[str],
        condition: dict, matchExprs: list[MatchExpr],
        orderBy: OrderByExpr, offset: int, limit: int,
        indexNames: str|list[str], knowledgebaseIds: list[str],
        aggFields: list[str] = [], rank_feature: dict | None = None
    
):
        """
        Search with given conjunctive equivalent filtering condition and return all fields of matched documents
        """

        raise NotImplementedError("Not implemented")

这种抽象接口设计使得 RAGFlow 可以轻松支持不同的向量数据库,只需实现特定的接口方法。

2.2 OpenSearch 实现

RAGFlow 默认使用 OpenSearch 作为向量数据库,通过rag/utils/opensearch_coon.py实现:

@singleton
class OSConnection(DocStoreConnection):
    def __init__(self):
        self.info = {}
        logger.info(f"Use OpenSearch {settings.OS['hosts']} as the doc engine.")
        for _ in range(ATTEMPT_TIME):
            try:
                self.os = OpenSearch(
                    settings.OS["hosts"].split(","),
                    http_auth=(settings.OS["username"], settings.OS["password"]) if "username" in settings.OS and "password" in settings.OS else None,
                    verify_certs=False,
                    timeout=600
                )
                if self.os:
                    self.info = self.os.info()
                    break
            except Exception as e:
                logger.warning(f"{str(e)}. Waiting OpenSearch {settings.OS['hosts']} to be healthy.")
                time.sleep(5)

OpenSearch 实现了所有必要的接口方法,包括索引创建、删除、搜索等。

2.3 索引设计

RAGFlow 为每个知识库创建独立的索引,使用命名规则ragflow_{uid}确保索引唯一性:

def createIdx(self, indexName: str, knowledgebaseId: str, vectorSize: int):
    if self.indexExist(indexName, knowledgebaseId):
        return True
    try:
        from opensearchpy.client import IndicesClient
        return IndicesClient(self.os).create(index=indexName, body=self.mapping)
    except Exception:
        logger.exception("OSConnection.createIndex error %s" % (indexName))

索引结构通过配置文件conf/os_mapping.json定义,确保了向量数据的正确存储和检索。

2.4 混合检索策略

RAGFlow 实现了混合检索策略,结合关键词搜索和向量相似度搜索:

def search(
    self, selectFields: list[str], highlightFields: list[str],
    condition: dict, matchExprs: list[MatchExpr],
    orderBy: OrderByExpr, offset: int, limit: int,
    indexNames: str|list[str], knowledgebaseIds: list[str],
    aggFields: list[str] = [], rank_feature: dict | None = None
):
    use_knn = False
    if isinstance(indexNames, str):
        indexNames = indexNames.split(",")
    assert isinstance(indexNames, listand len(indexNames) > 0
    assert "_id" not in condition

    bqry = Q("bool", must=[])
    condition["kb_id"] = knowledgebaseIds

    # ... 构建查询条件 ...

    s = Search()
    vector_similarity_weight = 0.5

    for m in matchExprs:
        if isinstance(m, FusionExpr) and m.method == "weighted_sum" and "weights" in m.fusion_params:
            assert len(matchExprs) == and isinstance(matchExprs[0], MatchTextExpr) and isinstance(matchExprs[1], MatchDenseExpr) and isinstance(matchExprs[2], FusionExpr)
            weights = m.fusion_params["weights"]
            vector_similarity_weight = float(weights.split(",")[1])

    knn_query = {}
    for m in matchExprs:
        if isinstance(m, MatchTextExpr):
            # 关键词搜索
            minimum_should_match = m.extra_options.get("minimum_should_match"0.0)
            if isinstance(minimum_should_match, float):
                minimum_should_match = str(int(minimum_should_match * 100)) + "%"
            bqry.must.append(Q("query_string", fields=m.fields, type="best_fields", query=m.matching_text, minimum_should_match=minimum_should_match, boost=1))
            bqry.boost = 1.0 - vector_similarity_weight

        elif isinstance(m, MatchDenseExpr):
            # 向量相似度搜索
            assert (bqry is not None)
            similarity = 0.0
            if "similarity" in m.extra_options:
                similarity = m.extra_options["similarity"]
            use_knn = True
            vector_column_name = m.vector_column_name
            knn_query[vector_column_name] = {}
            knn_query[vector_column_name]["vector"] = list(m.embedding_data)
            knn_query[vector_column_name]["k"] = m.topn
            knn_query[vector_column_name]["filter"] = bqry.to_dict()
            knn_query[vector_column_name]["boost"] = similarity

这种混合检索策略可以提高检索质量,结合了关键词搜索的精确性和向量相似度搜索的语义理解能力。

2.5 权重配置

RAGFlow 支持配置关键词搜索和向量相似度搜索的权重:

vector_similarity_weight = 0.5
for m in matchExprs:
    if isinstance(m, FusionExpr) and m.method == "weighted_sum" and "weights" in m.fusion_params:
        assert len(matchExprs) == 3 and isinstance(matchExprs[0], MatchTextExpr) and isinstance(matchExprs[1], MatchDenseExpr) and isinstance(matchExprs[2], FusionExpr)
        weights = m.fusion_params["weights"]
        vector_similarity_weight = float(weights.split(",")[1])

默认情况下,向量相似度权重为 0.5,关键词搜索权重为 0.5。用户可以根据需要调整这些权重,以优化检索结果。

2.6 批量操作优化

RAGFlow 实现了批量操作以提高写入效率:

def insert(self, rows: list[dict], indexName: str, knowledgebaseId: str = None) -> list[str]:
    """
    Update or insert a bulk of rows
    """

    if len(rows) == 0:
        return []

    actions = []
    ids = []

    for row in rows:
        if "_id" in row:
            _id = row["_id"]
            del row["_id"]
        else:
            _id = None

        action = {
            "_index": indexName,
            "_source": row
        }

        if _id:
            action["_id"] = _id
            ids.append(_id)

        actions.append(action)

    try:
        from opensearchpy.helpers import bulk
        success, failed = bulk(self.os, actions, stats_only=True)
        return ids
    except Exception:
        logger.exception("OSConnection.insert error")
        return []

这种批量操作方式可以减少 API 调用次数,提高写入效率。

3. 配置与集成

3.1 Embedding 模型配置

根据官方文档,RAGFlow 允许为不同知识库选择不同的 embedding 模型:

An embedding model converts chunks into embeddings. It cannot be changed once the knowledge base has chunks. To switch to a different embedding model, you must delete all existing chunks in the knowledge base. The obvious reason is that we must ensure that files in a specific knowledge base are converted to embeddings using the same embedding model (ensure that they are compared in the same embedding space).

这种设计确保了同一知识库内所有文档使用相同的 embedding 模型,保证了向量空间的一致性。

3.2 向量数据库配置

RAGFlow 通过配置文件设置向量数据库连接参数:

self.os = OpenSearch(
    settings.OS["hosts"].split(","),
    http_auth=(settings.OS["username"], settings.OS["password"]) if "username" in settings.OS and "password" in settings.OS else None,
    verify_certs=False,
    timeout=600
)

这些参数可以通过环境变量或配置文件进行设置,使得 RAGFlow 可以灵活地连接不同的 OpenSearch 实例。

3.3 检索参数配置

RAGFlow 支持配置多种检索参数,如相似度阈值、向量相似度权重等:

RAGFlow uses multiple recall of both full-text search and vector search in its chats. Prior to setting up an AI chat, consider adjusting the following parameters to ensure that the intended information always turns up in answers:

* Similarity threshold: Chunks with similarities below the threshold will be filtered. By default, it is set to 0.2.
* Vector similarity weight: The percentage by which vector similarity contributes to the overall score. By default, it is set to 0.3.

这些参数可以根据具体需求进行调整,以优化检索结果。

4. 总结

RAGFlow 在 Embeddings 模型选择与配置以及向量数据库选型与实现方面具有以下特点:

  1. 1. 多模型支持:支持多种 embedding 模型,包括 OpenAI、智谱 AI、通义千问等,以及本地部署的模型。
  2. 2. 批处理优化:实现了批量编码以提高效率,并自动处理长文本截断。
  3. 3. 一致性保证:确保同一知识库内所有文档使用相同的 embedding 模型,保证向量空间的一致性。
  4. 4. 灵活配置:允许为不同知识库选择不同的 embedding 模型,支持通过配置文件调整模型参数。
  5. 5. 默认使用 OpenSearch:作为默认的向量数据库,提供高性能的检索能力和良好的扩展性。
  6. 6. 混合检索策略:结合关键词搜索和向量相似度搜索,通过加权融合提高检索质量。
  7. 7. 批量操作优化:实现了批量操作以提高写入效率,减少 API 调用次数。

这些特点使得 RAGFlow 能够灵活地适应不同的应用场景,提供高质量的检索结果。


53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询