A deep dive into embedding model selection and configuration in the RAGFlow project, and into the technical details of its vector database. Key topics:
1. The interface design and configuration of embedding models in RAGFlow
2. The supported embedding models and their characteristics
3. Vector database selection and implementation analysis
Continuing the series based on the RAGFlow source code and official documentation, this article analyzes in detail how RAGFlow selects and configures its embedding models, and how it chooses and implements its vector database.
RAGFlow supports a wide range of embedding models; rag/llm/embedding_model.py implements the model interfaces and their configuration options.
RAGFlow uses an abstract-base-class design: the Base class defines the interface that every embedding model must implement:
class Base(ABC):
    def __init__(self, key, model_name):
        pass

    def encode(self, texts: list):
        raise NotImplementedError("Please implement encode method!")

    def encode_queries(self, text: str):
        raise NotImplementedError("Please implement encode method!")

    def total_token_count(self, resp):
        try:
            return resp.usage.total_tokens
        except Exception:
            pass
        try:
            return resp["usage"]["total_tokens"]
        except Exception:
            pass
        return 0
This design makes it easy for RAGFlow to support and extend different embedding models: a new provider only needs to implement these interface methods.
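As a quick illustration of this extension point, here is a minimal sketch of a custom provider built on the Base class above. The DummyEmbed name, the fixed 8-dimensional random vectors, and the whitespace-based token count are all hypothetical and only show the required method shapes.

import numpy as np

class DummyEmbed(Base):
    """Hypothetical provider returning fixed-size random vectors, for illustration only."""
    def __init__(self, key, model_name="dummy-8d", **kwargs):
        self.model_name = model_name
        self.dim = 8

    def encode(self, texts: list):
        # One vector per input text, plus a rough (whitespace-based) token count.
        vectors = np.random.rand(len(texts), self.dim)
        token_count = sum(len(t.split()) for t in texts)
        return vectors, token_count

    def encode_queries(self, text: str):
        vectors, token_count = self.encode([text])
        return vectors[0], token_count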
The source code shows that RAGFlow supports, among others, the following embedding model providers:
class DefaultEmbedding(Base):
    os.environ['CUDA_VISIBLE_DEVICES'] = '0'
    _model = None
    _model_name = ""
    _model_lock = threading.Lock()

    def __init__(self, key, model_name, **kwargs):
        if not settings.LIGHTEN:
            with DefaultEmbedding._model_lock:
                from FlagEmbedding import FlagModel
                import torch
                if not DefaultEmbedding._model or model_name != DefaultEmbedding._model_name:
                    try:
                        DefaultEmbedding._model = FlagModel(os.path.join(get_home_cache_dir(), re.sub(r"^[a-zA-Z0-9]+/", "", model_name)),
                                                            query_instruction_for_retrieval="为这个句子生成表示以用于检索相关文章:",
                                                            use_fp16=torch.cuda.is_available())
                        DefaultEmbedding._model_name = model_name
                    except Exception:
                        model_dir = snapshot_download(repo_id="BAAI/bge-large-zh-v1.5",
                                                      local_dir=os.path.join(get_home_cache_dir(), re.sub(r"^[a-zA-Z0-9]+/", "", model_name)),
                                                      local_dir_use_symlinks=False)
                        DefaultEmbedding._model = FlagModel(model_dir,
                                                            query_instruction_for_retrieval="为这个句子生成表示以用于检索相关文章:",
                                                            use_fp16=torch.cuda.is_available())
        self._model = DefaultEmbedding._model
        self._model_name = DefaultEmbedding._model_name
class OpenAIEmbed(Base):
    def __init__(self, key, model_name="text-embedding-ada-002", base_url="https://api.openai.com/v1"):
        if not base_url:
            base_url = "https://api.openai.com/v1"
        self.client = OpenAI(api_key=key, base_url=base_url)
        self.model_name = model_name

class LocalAIEmbed(Base):
    def __init__(self, key, model_name, base_url):
        if not base_url:
            raise ValueError("Local embedding model url cannot be None")
        base_url = urljoin(base_url, "v1")
        self.client = OpenAI(api_key="empty", base_url=base_url)
        self.model_name = model_name.split("___")[0]
class AzureEmbed(OpenAIEmbed):
    def __init__(self, key, model_name, **kwargs):
        from openai.lib.azure import AzureOpenAI
        api_key = json.loads(key).get('api_key', '')
        api_version = json.loads(key).get('api_version', '2024-02-01')
        self.client = AzureOpenAI(api_key=api_key, azure_endpoint=kwargs["base_url"], api_version=api_version)
        self.model_name = model_name

class BaiChuanEmbed(OpenAIEmbed):
    def __init__(self, key, model_name='Baichuan-Text-Embedding', base_url='https://api.baichuan-ai.com/v1'):
        if not base_url:
            base_url = "https://api.baichuan-ai.com/v1"
        super().__init__(key, model_name, base_url)

class QWenEmbed(Base):
    def __init__(self, key, model_name="text_embedding_v2", **kwargs):
        self.key = key
        self.model_name = model_name

class ZhipuEmbed(Base):
    def __init__(self, key, model_name="embedding-2", **kwargs):
        self.client = ZhipuAI(api_key=key)
        self.model_name = model_name
class OllamaEmbed(Base):
    def __init__(self, key, model_name, **kwargs):
        self.client = Client(host=kwargs["base_url"]) if not key or key == "x" else \
            Client(host=kwargs["base_url"], headers={"Authorization": f"Bearer {key}"})
        self.model_name = model_name
class GoogleEmbed(Base):
    def __init__(self, key, model_name="embedding-001", **kwargs):
        genai.configure(api_key=key)
        self.model_name = model_name
RAGFlow's embedding implementations batch their requests to improve throughput:
def encode(self, texts: list):
    batch_size = 16
    texts = [truncate(t, 2048) for t in texts]
    token_count = 0
    for t in texts:
        token_count += num_tokens_from_string(t)
    ress = []
    for i in range(0, len(texts), batch_size):
        ress.extend(self._model.encode(texts[i:i + batch_size]).tolist())
    return np.array(ress), token_count
Batching reduces the number of API or model calls and improves efficiency; each provider implementation applies its own batch size and strategy.
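For a sense of scale, a rough back-of-the-envelope sketch of the call savings with the batch_size of 16 used above (the corpus size is hypothetical):

import math

texts = [f"chunk {i}" for i in range(100)]      # hypothetical corpus of 100 chunks
batch_size = 16
num_calls = math.ceil(len(texts) / batch_size)  # 7 model calls instead of 100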
RAGFlow also enforces each model's input length limit:
# In OpenAIEmbed
texts = [truncate(t, 8191) for t in texts]

# In QWenEmbed
texts = [truncate(t, 2048) for t in texts]

# In ZhipuEmbed
if self.model_name.lower() == "embedding-2":
    MAX_LEN = 512
if self.model_name.lower() == "embedding-3":
    MAX_LEN = 3072
if MAX_LEN > 0:
    texts = [truncate(t, MAX_LEN) for t in texts]
This truncation keeps the input within each model's maximum length and avoids API errors.
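The truncate helper itself comes from RAGFlow's utility module and is not reproduced in this article. As a purely illustrative stand-in, a token-based truncation could look like the sketch below; the function name and the use of tiktoken here are assumptions, not RAGFlow's actual implementation.

import tiktoken

def truncate_sketch(text: str, max_tokens: int) -> str:
    # Illustrative stand-in: keep at most max_tokens tokens of the input text.
    enc = tiktoken.get_encoding("cl100k_base")
    return enc.decode(enc.encode(text)[:max_tokens])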
According to the official documentation and the source code, the RAGFlow Docker image (non-slim) ships with two optimized embedding models, tuned for both Chinese and English and offering good multilingual support. In the DefaultEmbedding class, if no model is specified, BAAI/bge-large-zh-v1.5 is used by default.
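A minimal usage sketch, assuming the non-slim image (FlagEmbedding and torch available) and that the model files can be found in, or downloaded to, the local cache directory:

model = DefaultEmbedding(key="", model_name="BAAI/bge-large-zh-v1.5")
query_vec, q_tokens = model.encode_queries("什么是混合检索?")        # hypothetical query
doc_vecs, d_tokens = model.encode(["chunk one", "chunk two"])       # hypothetical chunks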
RAGFlow uses a flexible vector database architecture that supports multiple backends through an abstract interface.
The abstract interface for the document store is defined in rag/utils/doc_store_conn.py:
class DocStoreConnection(ABC):
    """
    Database operations
    """
    @abstractmethod
    def dbType(self) -> str:
        """
        Return the type of the database.
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def health(self) -> dict:
        """
        Return the health status of the database.
        """
        raise NotImplementedError("Not implemented")

    """
    Table operations
    """
    @abstractmethod
    def createIdx(self, indexName: str, knowledgebaseId: str, vectorSize: int):
        """
        Create an index with given name
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def deleteIdx(self, indexName: str, knowledgebaseId: str):
        """
        Delete an index with given name
        """
        raise NotImplementedError("Not implemented")

    @abstractmethod
    def indexExist(self, indexName: str, knowledgebaseId: str) -> bool:
        """
        Check if an index with given name exists
        """
        raise NotImplementedError("Not implemented")

    """
    CRUD operations
    """
    @abstractmethod
    def search(
            self, selectFields: list[str], highlightFields: list[str],
            condition: dict, matchExprs: list[MatchExpr],
            orderBy: OrderByExpr, offset: int, limit: int,
            indexNames: str|list[str], knowledgebaseIds: list[str],
            aggFields: list[str] = [], rank_feature: dict | None = None
    ):
        """
        Search with given conjunctive equivalent filtering condition and return all fields of matched documents
        """
        raise NotImplementedError("Not implemented")
This abstract interface lets RAGFlow support different vector databases simply by implementing the interface methods for each backend.
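To support another backend, one subclasses DocStoreConnection and fills in each abstract method. The skeleton below is a hypothetical toy backend, not an actual RAGFlow connector, and only sketches the table-level methods:

class InMemoryDocStore(DocStoreConnection):
    """Hypothetical toy backend kept entirely in a Python dict, for illustration."""
    def __init__(self):
        self.indices = {}

    def dbType(self) -> str:
        return "in_memory"

    def health(self) -> dict:
        return {"status": "green", "indices": len(self.indices)}

    def createIdx(self, indexName: str, knowledgebaseId: str, vectorSize: int):
        self.indices.setdefault(indexName, {"vector_size": vectorSize, "docs": {}})
        return True

    def deleteIdx(self, indexName: str, knowledgebaseId: str):
        self.indices.pop(indexName, None)

    def indexExist(self, indexName: str, knowledgebaseId: str) -> bool:
        return indexName in self.indices

    # search() and the remaining CRUD methods would be implemented similarly.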
RAGFlow uses OpenSearch as its vector database by default, via the implementation in rag/utils/opensearch_coon.py:
@singleton
class OSConnection(DocStoreConnection):
    def __init__(self):
        self.info = {}
        logger.info(f"Use OpenSearch {settings.OS['hosts']} as the doc engine.")
        for _ in range(ATTEMPT_TIME):
            try:
                self.os = OpenSearch(
                    settings.OS["hosts"].split(","),
                    http_auth=(settings.OS["username"], settings.OS["password"]) if "username" in settings.OS and "password" in settings.OS else None,
                    verify_certs=False,
                    timeout=600
                )
                if self.os:
                    self.info = self.os.info()
                    break
            except Exception as e:
                logger.warning(f"{str(e)}. Waiting OpenSearch {settings.OS['hosts']} to be healthy.")
                time.sleep(5)
The OpenSearch connector implements all of the required interface methods, including index creation, deletion, and search.
RAGFlow creates a separate index for each knowledge base, using the naming convention ragflow_{uid} to guarantee uniqueness:
def createIdx(self, indexName: str, knowledgebaseId: str, vectorSize: int):
    if self.indexExist(indexName, knowledgebaseId):
        return True
    try:
        from opensearchpy.client import IndicesClient
        return IndicesClient(self.os).create(index=indexName, body=self.mapping)
    except Exception:
        logger.exception("OSConnection.createIndex error %s" % (indexName))
The index structure is defined in the configuration file conf/os_mapping.json, which ensures that vector data is stored and retrieved correctly.
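Combined with the ragflow_{uid} naming convention, index creation for a knowledge base can be pictured as follows; the helper and the vector size are hypothetical, and the real index name is derived from the tenant/user id inside RAGFlow.

def index_name_for(uid: str) -> str:
    # Hypothetical helper mirroring the ragflow_{uid} naming rule.
    return f"ragflow_{uid}"

# e.g. conn.createIdx(index_name_for(tenant_id), knowledgebaseId=kb_id, vectorSize=1024)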
RAGFlow implements a hybrid retrieval strategy that combines keyword search with vector similarity search:
def search(
        self, selectFields: list[str], highlightFields: list[str],
        condition: dict, matchExprs: list[MatchExpr],
        orderBy: OrderByExpr, offset: int, limit: int,
        indexNames: str|list[str], knowledgebaseIds: list[str],
        aggFields: list[str] = [], rank_feature: dict | None = None
):
    use_knn = False
    if isinstance(indexNames, str):
        indexNames = indexNames.split(",")
    assert isinstance(indexNames, list) and len(indexNames) > 0
    assert "_id" not in condition
    bqry = Q("bool", must=[])
    condition["kb_id"] = knowledgebaseIds
    # ... build the filtering conditions ...
    s = Search()
    vector_similarity_weight = 0.5
    for m in matchExprs:
        if isinstance(m, FusionExpr) and m.method == "weighted_sum" and "weights" in m.fusion_params:
            assert len(matchExprs) == 3 and isinstance(matchExprs[0], MatchTextExpr) and isinstance(matchExprs[1], MatchDenseExpr) and isinstance(matchExprs[2], FusionExpr)
            weights = m.fusion_params["weights"]
            vector_similarity_weight = float(weights.split(",")[1])
    knn_query = {}
    for m in matchExprs:
        if isinstance(m, MatchTextExpr):
            # Keyword (full-text) search
            minimum_should_match = m.extra_options.get("minimum_should_match", 0.0)
            if isinstance(minimum_should_match, float):
                minimum_should_match = str(int(minimum_should_match * 100)) + "%"
            bqry.must.append(Q("query_string", fields=m.fields, type="best_fields", query=m.matching_text, minimum_should_match=minimum_should_match, boost=1))
            bqry.boost = 1.0 - vector_similarity_weight
        elif isinstance(m, MatchDenseExpr):
            # Vector (kNN) similarity search
            assert (bqry is not None)
            similarity = 0.0
            if "similarity" in m.extra_options:
                similarity = m.extra_options["similarity"]
            use_knn = True
            vector_column_name = m.vector_column_name
            knn_query[vector_column_name] = {}
            knn_query[vector_column_name]["vector"] = list(m.embedding_data)
            knn_query[vector_column_name]["k"] = m.topn
            knn_query[vector_column_name]["filter"] = bqry.to_dict()
            knn_query[vector_column_name]["boost"] = similarity
This hybrid strategy improves retrieval quality by combining the precision of keyword search with the semantic understanding of vector similarity search.
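How the three match expressions might be assembled before calling search() can be sketched as follows. The constructor arguments below are assumptions inferred from the attributes read in the code above (fields, matching_text, extra_options, vector_column_name, embedding_data, topn, fusion method and weights); the real signatures live in rag/utils/doc_store_conn.py.

# Hedged sketch of a hybrid query: one full-text expression, one dense-vector
# expression, and one fusion expression using the "weighted_sum" method.
text_expr = MatchTextExpr(
    fields=["title_tks", "content_ltks"],                # hypothetical field names
    matching_text="vector database",
    topn=1024,
    extra_options={"minimum_should_match": 0.3},
)
dense_expr = MatchDenseExpr(
    vector_column_name="q_1024_vec",                     # hypothetical vector column
    embedding_data=query_vec,                            # produced by encode_queries()
    topn=64,
    extra_options={"similarity": 0.2},
)
fusion_expr = FusionExpr(method="weighted_sum", topn=64,
                         fusion_params={"weights": "0.5,0.5"})

res = conn.search(selectFields=["content_with_weight"], highlightFields=[],
                  condition={"available_int": 1},
                  matchExprs=[text_expr, dense_expr, fusion_expr],
                  orderBy=OrderByExpr(), offset=0, limit=10,
                  indexNames="ragflow_tenant1", knowledgebaseIds=["kb1"])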
RAGFlow lets you configure the relative weights of keyword search and vector similarity search:
vector_similarity_weight = 0.5
for m in matchExprs:
    if isinstance(m, FusionExpr) and m.method == "weighted_sum" and "weights" in m.fusion_params:
        assert len(matchExprs) == 3 and isinstance(matchExprs[0], MatchTextExpr) and isinstance(matchExprs[1], MatchDenseExpr) and isinstance(matchExprs[2], FusionExpr)
        weights = m.fusion_params["weights"]
        vector_similarity_weight = float(weights.split(",")[1])
By default, the vector similarity weight is 0.5 and the keyword search weight is 0.5. Users can adjust these weights as needed to tune retrieval results.
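Numerically, the fusion behaves like a weighted sum of the two scores. A simplified illustration with hypothetical per-document scores, ignoring score normalization and the boost mechanics inside OpenSearch:

weights = "0.5,0.5"                                  # "text_weight,vector_weight" as parsed above
vector_similarity_weight = float(weights.split(",")[1])

text_score, vector_score = 0.8, 0.6                  # hypothetical per-document scores
final_score = (1 - vector_similarity_weight) * text_score \
              + vector_similarity_weight * vector_score
print(final_score)                                   # 0.7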
RAGFlow uses bulk operations to improve write throughput:
def insert(self, rows: list[dict], indexName: str, knowledgebaseId: str = None) -> list[str]:
    """
    Update or insert a bulk of rows
    """
    if len(rows) == 0:
        return []
    actions = []
    ids = []
    for row in rows:
        if "_id" in row:
            _id = row["_id"]
            del row["_id"]
        else:
            _id = None
        action = {
            "_index": indexName,
            "_source": row
        }
        if _id:
            action["_id"] = _id
            ids.append(_id)
        actions.append(action)
    try:
        from opensearchpy.helpers import bulk
        success, failed = bulk(self.os, actions, stats_only=True)
        return ids
    except Exception:
        logger.exception("OSConnection.insert error")
        return []
Bulk writes reduce the number of round-trips to OpenSearch and improve indexing throughput.
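A usage sketch of the bulk insert path; the field names inside the row dicts are hypothetical placeholders.

rows = [
    {"_id": "chunk-001", "content_with_weight": "first chunk",  "kb_id": "kb1"},
    {"_id": "chunk-002", "content_with_weight": "second chunk", "kb_id": "kb1"},
]
ids = conn.insert(rows, indexName="ragflow_tenant1", knowledgebaseId="kb1")
# One bulk() request writes both documents instead of issuing two separate index calls.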
According to the official documentation, RAGFlow allows a different embedding model to be chosen for each knowledge base:
An embedding model converts chunks into embeddings. It cannot be changed once the knowledge base has chunks. To switch to a different embedding model, you must delete all existing chunks in the knowledge base. The obvious reason is that we must ensure that files in a specific knowledge base are converted to embeddings using the same embedding model (ensure that they are compared in the same embedding space).
This design guarantees that all documents within a knowledge base are embedded with the same model, keeping their vectors in a consistent embedding space.
RAGFlow sets the vector database connection parameters through its configuration:
self.os = OpenSearch(
    settings.OS["hosts"].split(","),
    http_auth=(settings.OS["username"], settings.OS["password"]) if "username" in settings.OS and "password" in settings.OS else None,
    verify_certs=False,
    timeout=600
)
These parameters can be supplied via environment variables or the configuration file, so RAGFlow can flexibly connect to different OpenSearch instances.
RAGFlow also exposes several retrieval parameters, such as the similarity threshold and the vector similarity weight:
RAGFlow uses multiple recall of both full-text search and vector search in its chats. Prior to setting up an AI chat, consider adjusting the following parameters to ensure that the intended information always turns up in answers:
* Similarity threshold: Chunks with similarities below the threshold will be filtered. By default, it is set to 0.2.
* Vector similarity weight: The percentage by which vector similarity contributes to the overall score. By default, it is set to 0.3.
These parameters can be adjusted to fit specific needs and improve retrieval quality.
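Conceptually, the two knobs act on the final ranking as sketched below; this is a simplified illustration using the documented defaults and hypothetical scores, and the real scoring happens inside RAGFlow's retrieval service.

SIMILARITY_THRESHOLD = 0.2      # chunks scoring below this are filtered out
VECTOR_WEIGHT = 0.3             # share of the overall score contributed by vector similarity

def overall_score(text_sim: float, vector_sim: float) -> float:
    return (1 - VECTOR_WEIGHT) * text_sim + VECTOR_WEIGHT * vector_sim

candidates = [("chunk A", 0.9, 0.7), ("chunk B", 0.1, 0.15)]   # hypothetical scores
kept = [(cid, overall_score(t, v)) for cid, t, v in candidates
        if overall_score(t, v) >= SIMILARITY_THRESHOLD]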
In summary, RAGFlow's embedding model layer and its vector database layer are both built around abstract interfaces, combined with batched encoding, per-model input truncation, and hybrid keyword-plus-vector retrieval with configurable weights. These characteristics let RAGFlow adapt flexibly to different application scenarios and deliver high-quality retrieval results.