Want to extend RAGFlow with new data sources? This guide explains in detail how four core interface types enable seamless integration of heterogeneous systems. Key topics: 1. the design of the four core interfaces in the RAGFlow connector framework; 2. how implementations differ between simple and complex data sources; 3. the complete integration path, from overall architecture to concrete implementation.
This guide is aimed at community developers who want to extend RAGFlow's data source capabilities; it lays out a professional, reusable process for implementing and integrating a new connector. RAGFlow's connector framework is heavily inspired by the Onyx open-source project, which we gratefully acknowledge.
In practice, besides importing files from the local file system, RAGFlow also needs to pull data from a large number of heterogeneous systems.
To that end, it introduces a unified datasource / connector component: each external data source is handled by one connector that takes care of authentication, document fetching, incremental updates, and checkpoint-based resumption, so that the upper retrieval and question-answering logic stays fully decoupled from the underlying data sources.
Around this goal, RAGFlow exposes four main interfaces that cover most scenarios, from simple to complex:
For structurally simple data sources (for example object storage, or a single pull-style API), implementing LoadConnector and PollConnector is usually enough to complete the integration;
for complex data sources that need pagination, resumable scans, or permission sync (for example Confluence, Jira, Google Drive), you can additionally implement CheckpointedConnector or CheckpointedConnectorWithPermSync.
Implement and wire up these four interface types as described in this article, and your data source can be integrated into RAGFlow seamlessly.
This article has five parts: an architecture overview, the core abstract interfaces, the implementation workflow, a walkthrough of hooking into SyncBase, and a pre-submission checklist.
Architecture overview
The overall structure can be abstracted into three layers: the external data sources and their connectors at the bottom; the SyncBase layer in the middle, which schedules connectors and writes their output; and the retrieval and question-answering logic on top, which stays decoupled from the underlying sources.
From a contributor's point of view, it can be summarized as follows:
SyncBase is the heart of the scheduling flow. The execution logic of every data source is handled uniformly in __call__, and the class itself normally does not need to be modified.
SyncBase takes care of batched writes, logging, and checkpoint updates, while _generate() is implemented by each data source and returns Iterable[list[Document]].
class SyncBase:
    SOURCE_NAME: str = None

    async def __call__(self, task: dict):
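        # Uniform per-task handling lives here: batched writes, logging, and checkpoint updates.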
        ...

    async def _generate(self, task: dict):
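        # Implemented by each data source; must return Iterable[list[Document]].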
        raise NotImplementedError

Core abstract interfaces
Document model
Every connector must produce Document objects. doc_updated_at must be a UTC timestamp so that incremental sync stays accurate.
class Document(BaseModel):
    id: str
    source: str
    semantic_identifier: str
    extension: str
    blob: bytes
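    # doc_updated_at below must be timezone-aware UTC; it drives incremental sync.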
    doc_updated_at: datetime
    size_bytes: int

class LoadConnector(ABC):
    @abstractmethod
    def load_credentials(self, credentials: Dict[str, Any]) -> Dict[str, Any] | None: ...

    @abstractmethod
    def load_from_state(self) -> Generator[list[Document], None, None]:
        """load all documents up to now"""
        ...

    @abstractmethod
    def validate_connector_settings(self) -> None: ...

class PollConnector(ABC):
    @abstractmethod
    def poll_source(self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch) -> Generator[list[Document], None, None]:
        """load documents from start to end"""
        ...

The following code shows the concrete implementation for S3:
class S3(SyncBase):
    SOURCE_NAME: str = FileSource.S3

    async def _generate(self, task: dict):
        self.connector = BlobStorageConnector(
            bucket_type=self.conf.get("bucket_type", "s3"),
            bucket_name=self.conf["bucket_name"],
            prefix=self.conf.get("prefix", "")
        )
        self.connector.load_credentials(self.conf["credentials"])
        document_batch_generator = (
            self.connector.load_from_state()
            if task["reindex"] == "1" or not task["poll_range_start"]
            else self.connector.poll_source(
                task["poll_range_start"].timestamp(),
                datetime.now(timezone.utc).timestamp()
            )
        )
        return document_batch_generator

The corresponding connector implementation:
class BlobStorageConnector(LoadConnector, PollConnector):
    def load_from_state(self) -> GenerateDocumentsOutput:
        return self._yield_blob_objects(
            start=datetime(1970, 1, 1, tzinfo=timezone.utc),
            end=datetime.now(timezone.utc),
        )

    def poll_source(self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch) -> GenerateDocumentsOutput:
        start_datetime = datetime.fromtimestamp(start, tz=timezone.utc)
        end_datetime = datetime.fromtimestamp(end, tz=timezone.utc)
        for batch in self._yield_blob_objects(start_datetime, end_datetime):
            yield batch

    def validate_connector_settings(self) -> None:
        ...

As you can see, load_from_state is simply poll_source over the full time range (from the Unix epoch to now), while poll_source converts the epoch-second boundaries into UTC datetimes and only yields objects modified inside that window.
For systems that can only be read page by page, or that need to resume an interrupted scan, the checkpoint abstraction is the recommended way to manage cursor state. A checkpoint can be thought of as a snapshot of the cursor at the end of a sync round; it typically contains a pagination cursor (a next-page URL, offset, or similar) plus a has_more flag indicating whether anything remains to be fetched.
Typical use cases include paginated APIs, long scans that must survive interruption and resume where they left off, and sources whose permissions need to be synchronized alongside their content.
In these scenarios, each call to load_from_checkpoint processes one slice of the requested time window: it yields documents (or per-item failures) and hands back an updated checkpoint from which the next call resumes.
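As a rough sketch of what this looks like in code (MyCursorCheckpoint, MyApiConnector, and _fetch_page below are illustrative names only, not part of RAGFlow), a connector for a cursor-paginated API might carry nothing more than the cursor in its checkpoint:

class MyCursorCheckpoint(ConnectorCheckpoint):
    # has_more is inherited from ConnectorCheckpoint; cursor is the source API's opaque page token
    cursor: str | None = None

class MyApiConnector(CheckpointedConnector[MyCursorCheckpoint]):
    def build_dummy_checkpoint(self) -> MyCursorCheckpoint:
        # start with has_more=True so the scheduler enters the sync loop at least once
        return MyCursorCheckpoint(has_more=True, cursor=None)

    def validate_checkpoint_json(self, checkpoint_json: str) -> MyCursorCheckpoint:
        return MyCursorCheckpoint.model_validate_json(checkpoint_json)

    def load_from_checkpoint(self, start, end, checkpoint):
        # _fetch_page is a hypothetical helper: fetch one API page starting at checkpoint.cursor,
        # restricted to items updated inside [start, end)
        page = self._fetch_page(checkpoint.cursor, start, end)
        for doc in page.documents:
            yield doc
        return MyCursorCheckpoint(has_more=page.next_cursor is not None, cursor=page.next_cursor)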
RAGFlow currently has two typical usage patterns, which also correspond to the usual split between "simple connectors" and "complex connectors":
For relatively simple connectors that only need to traverse content by time and page, do not care about permissions, and do not need to encode complex failure information into the checkpoint, it is usually enough to implement CheckpointedConnector, or even just LoadConnector / PollConnector; examples are Confluence content fetching or storage sources like S3. The concern of these connectors is simply to traverse every object that needs indexing, reliably.
The interface is defined as follows:
class CheckpointedConnector(BaseConnector[CT]):
    @abc.abstractmethod
    def load_from_checkpoint(
        self,
        start: SecondsSinceUnixEpoch,
        end: SecondsSinceUnixEpoch,
        checkpoint: CT,
    ) -> CheckpointOutput[CT]:
        ...

    @abc.abstractmethod
    def build_dummy_checkpoint(self) -> CT:
        ...

    @abc.abstractmethod
    def validate_checkpoint_json(self, checkpoint_json: str) -> CT:
        ...

The Confluence implementation only cares about traversing content and does not carry permission information in its checkpoint:
class ConfluenceCheckpoint(ConnectorCheckpoint):
    next_page_url: str | None

class ConfluenceConnector(
    CheckpointedConnector[ConfluenceCheckpoint],
    SlimConnector,
    SlimConnectorWithPermSync,
    CredentialsConnector,
):
    def load_from_checkpoint(
        self,
        start: SecondsSinceUnixEpoch,
        end: SecondsSinceUnixEpoch,
        checkpoint: ConfluenceCheckpoint,
    ) -> CheckpointOutput[ConfluenceCheckpoint]:
        end += ONE_DAY  # handle time zone weirdness
        try:
            return self._fetch_document_batches(checkpoint, start, end)
        except Exception as e:
            ...

    def build_dummy_checkpoint(self) -> ConfluenceCheckpoint:
        return ConfluenceCheckpoint(has_more=True, next_page_url=None)

    def validate_checkpoint_json(self, checkpoint_json: str) -> ConfluenceCheckpoint:
        return ConfluenceCheckpoint.model_validate_json(checkpoint_json)

Combined with the helper function, documents can be loaded either in one pass or incrementally:
for doc in load_all_docs_from_checkpoint_connector(
    connector=confluence_connector,
    start=start,
    end=end,
):
    print(doc)

For relatively complex connectors that need to combine permissions, failure records, or source-specific cursors, the recommended approach is to implement CheckpointedConnectorWithPermSync and to record pagination cursors and remaining state explicitly in the checkpoint; examples are Jira, Google Drive, Slack, and Teams. Beyond the content itself, these connectors also care about who can see what, and about which objects failed in a given round and need to be retried later.
The interface looks like this:
class CheckpointedConnectorWithPermSync(ABC):
    @abstractmethod
    def load_from_checkpoint(
        self,
        start: SecondsSinceUnixEpoch,
        end: SecondsSinceUnixEpoch,
        checkpoint: ConnectorCheckpoint,
    ) -> Generator[Document | ConnectorFailure, None, ConnectorCheckpoint]:
        ...

    @abstractmethod
    def load_from_checkpoint_with_perm_sync(
        self,
        start: SecondsSinceUnixEpoch,
        end: SecondsSinceUnixEpoch,
        checkpoint: ConnectorCheckpoint,
    ) -> Generator[Document | ConnectorFailure, None, ConnectorCheckpoint]:
        ...

    @abstractmethod
    def build_dummy_checkpoint(self) -> ConnectorCheckpoint:
        ...

    @abstractmethod
    def validate_checkpoint_json(self, checkpoint_json: str) -> ConnectorCheckpoint:
        ...

Jira needs both content and permission/metadata sync, so it implements the permission-aware checkpoint interface and uses a dedicated checkpoint type to track pagination state:
class JiraCheckpoint(ConnectorCheckpoint):
    """Checkpoint that tracks which slice of the current JQL result set was emitted."""
    start_at: int = 0
    cursor: str | None = None
    ids_done: bool = False
    all_issue_ids: list[list[str]]

class JiraConnector(CheckpointedConnectorWithPermSync):
    ...

On the scheduling side, rag/svr/sync_data_source.py writes the checkpoint loop explicitly and uses CheckpointOutputWrapper to handle successes and failures uniformly:
def document_batches():
    checkpoint = self.connector.build_dummy_checkpoint()
    pending_docs = []
    ...
    while checkpoint.has_more:
        wrapper = CheckpointOutputWrapper()
        generator = wrapper(
            self.connector.load_from_checkpoint(
                start_time,
                end_time,
                checkpoint,
            )
        )
        for document, failure, next_checkpoint in generator:
            if failure is not None:
                continue
            if document is not None:
                pending_docs.append(document)
                if len(pending_docs) >= batch_size:
                    yield pending_docs
                    pending_docs = []
            if next_checkpoint is not None:
                checkpoint = next_checkpoint
        ...
    if pending_docs:
        yield pending_docs

Advice for contributors: pick the simplest interface that fits your source. If a plain time-window traversal is enough, stay with LoadConnector / PollConnector; reach for the checkpointed interfaces only when you genuinely need pagination, resumable scans, or permission sync, and keep the checkpoint a small, JSON-serializable snapshot of the cursor, as sketched below.
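One detail the loop above leaves implicit is how a checkpoint survives between sync rounds. Here is a minimal sketch, assuming the serialized checkpoint is stashed somewhere by the scheduler (saved_checkpoint_json below is a placeholder, not a RAGFlow variable): because checkpoints are Pydantic models, they round-trip through JSON via model_dump_json and validate_checkpoint_json.

# Placeholder persistence: wherever the previous round's checkpoint JSON is stored.
if saved_checkpoint_json:
    checkpoint = connector.validate_checkpoint_json(saved_checkpoint_json)
else:
    checkpoint = connector.build_dummy_checkpoint()

# ... run the document_batches() loop shown above with this checkpoint ...

# After the round, persist the final cursor snapshot for the next incremental run.
saved_checkpoint_json = checkpoint.model_dump_json()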
Implementation workflow
The minimal deliverables are as follows:
1. Implement a connector class that produces Document batches, starting from LoadConnector / PollConnector (or the checkpointed interfaces if your source needs them).
2. Implement load_credentials to authenticate against the external system.
3. Implement validate_connector_settings so that misconfiguration fails fast with a clear error.
4. Make sure every Document carries a stable id, a UTC doc_updated_at, and the other required fields.
5. Add a SyncBase subclass that sets SOURCE_NAME and implements _generate to return the document batch generator.
6. Wire the new source into the scheduler (rag/svr/sync_data_source.py) so that sync tasks can reach it.
Following these six steps yields a structurally complete minimal implementation that the scheduler can recognize. A condensed sketch of such a skeleton follows.
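The sketch below is illustrative only: MyAPIConnector, MySource, the "my_source" identifier, and the _iter_items helper are all made-up names, not RAGFlow APIs, and a real implementation would register its SOURCE_NAME through FileSource.

class MyAPIConnector(LoadConnector, PollConnector):
    def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
        if not credentials.get("api_token"):
            raise ConnectorMissingCredentialError("MySource")
        self.api_token = credentials["api_token"]
        return None

    def validate_connector_settings(self) -> None:
        ...  # fail fast here if the endpoint is unreachable or misconfigured

    def load_from_state(self) -> Generator[list[Document], None, None]:
        # a full reindex is just a poll over the entire time range
        yield from self.poll_source(0, datetime.now(timezone.utc).timestamp())

    def poll_source(self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch) -> Generator[list[Document], None, None]:
        batch: list[Document] = []
        for item in self._iter_items(start, end):  # hypothetical paging helper
            batch.append(Document(
                id=f"my_source:{item['id']}",
                source="my_source",
                semantic_identifier=item["name"],
                extension=get_file_ext(item["name"]),
                blob=item["content"].encode("utf-8"),
                doc_updated_at=datetime.fromtimestamp(item["updated_at"], tz=timezone.utc),
                size_bytes=len(item["content"]),
            ))
            if len(batch) >= 16:
                yield batch
                batch = []
        if batch:
            yield batch

class MySource(SyncBase):
    SOURCE_NAME: str = "my_source"  # in RAGFlow this would be a FileSource member

    async def _generate(self, task: dict):
        connector = MyAPIConnector()
        connector.load_credentials(self.conf["credentials"])
        if task["reindex"] == "1" or not task["poll_range_start"]:
            return connector.load_from_state()
        return connector.poll_source(
            task["poll_range_start"].timestamp(),
            datetime.now(timezone.utc).timestamp(),
        )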
Example: hooking into SyncBase
The example below shows how different types of connectors are hooked into SyncBase._generate, covering the full path from scheduling logic to producing Document batches; it can serve as a reference for the integration steps above.
Scheduling logic in SyncBase
class S3(SyncBase):
    SOURCE_NAME: str = FileSource.S3

    async def _generate(self, task: dict):
        self.connector = BlobStorageConnector(
            bucket_type=self.conf.get("bucket_type", "s3"),
            bucket_name=self.conf["bucket_name"],
            prefix=self.conf.get("prefix", "")
        )
        self.connector.load_credentials(self.conf["credentials"])
        document_batch_generator = (
            self.connector.load_from_state()
            if task["reindex"] == "1" or not task["poll_range_start"]
            else self.connector.poll_source(
                task["poll_range_start"].timestamp(),
                datetime.now(timezone.utc).timestamp()
            )
        )
        begin_info = "totally" if task["reindex"] == "1" or not task["poll_range_start"] else f"from {task['poll_range_start']}"
        logging.info(
            f"Connect to S3: {self.conf['bucket_name']}/"
            f"{self.conf.get('prefix', '')} {begin_info}"
        )
        return document_batch_generator

Connector logic
class BlobStorageConnector(LoadConnector, PollConnector):
    def __init__(self, bucket_type: str, bucket_name: str, prefix: str = "", batch_size: int = INDEX_BATCH_SIZE, european_residency: bool = False) -> None:
        self.bucket_type: BlobType = BlobType(bucket_type)
        self.bucket_name = bucket_name.strip()
        self.prefix = prefix if not prefix or prefix.endswith("/") else prefix + "/"
        self.batch_size = batch_size
        self.european_residency = european_residency
        self.s3_client: Optional[Any] = None

    def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
        if self.bucket_type == BlobType.S3:
            authentication_method = credentials.get("authentication_method", "access_key")
            if authentication_method == "access_key":
                if not all(credentials.get(key) for key in ["aws_access_key_id", "aws_secret_access_key"]):
                    raise ConnectorMissingCredentialError("Amazon S3")
            elif authentication_method == "iam_role":
                if not credentials.get("aws_role_arn"):
                    raise ConnectorMissingCredentialError("Amazon S3 IAM role ARN is required")
            ...
        self.s3_client = create_s3_client(self.bucket_type, credentials, self.european_residency)
        return None

    def _yield_blob_objects(self, start: datetime, end: datetime) -> GenerateDocumentsOutput:
        paginator = self.s3_client.get_paginator("list_objects_v2")
        pages = paginator.paginate(Bucket=self.bucket_name, Prefix=self.prefix)
        batch: list[Document] = []
        for page in pages:
            for obj in page.get("Contents", []):
                ...
                file_name = os.path.basename(obj["Key"])
                blob = download_object(self.s3_client, self.bucket_name, obj["Key"], self.size_threshold)
                if blob is None:
                    continue
                batch.append(Document(
                    id=f"{self.bucket_type}:{self.bucket_name}:{obj['Key']}",
                    blob=blob,
                    source=DocumentSource(self.bucket_type.value),
                    semantic_identifier=file_name,
                    extension=get_file_ext(file_name),
                    doc_updated_at=last_modified,
                    size_bytes=extract_size_bytes(obj) or 0
                ))
                if len(batch) == self.batch_size:
                    yield batch
                    batch = []
        if batch:
            yield batch

Key implementation points
A few things are worth noting in this implementation: objects are listed with the list_objects_v2 paginator and accumulated into batches of batch_size before being yielded; the elided part of the inner loop is where each object's LastModified timestamp is checked against the requested time window (see the sketch below); credentials are validated up front in load_credentials so misconfiguration fails before any listing happens; and each Document gets a stable id built from the bucket type, bucket name, and object key, plus a UTC doc_updated_at.
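As a small illustration of that elided filtering step, the sketch below shows one way it could be written; _within_window is a hypothetical helper name, and it assumes the usual boto3 list_objects_v2 response shape where obj["LastModified"] is a timezone-aware datetime. The actual elided code may differ.

from datetime import datetime, timezone

def _within_window(obj: dict, start: datetime, end: datetime) -> bool:
    # Decide whether an S3 object falls inside the requested sync window.
    last_modified = obj["LastModified"]
    if last_modified.tzinfo is None:
        # defensive: normalize to UTC so the comparison never mixes naive and aware datetimes
        last_modified = last_modified.replace(tzinfo=timezone.utc)
    return start <= last_modified <= end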
Pre-submission checklist
Before opening a PR, it is worth double-checking at least the following: every Document field is populated and doc_updated_at is UTC; validate_connector_settings raises a clear error on bad configuration; batches respect the configured batch size; for checkpointed connectors, build_dummy_checkpoint and validate_checkpoint_json round-trip correctly; and the SyncBase subclass declares its SOURCE_NAME so the scheduler can dispatch to it.
Once these checks pass, you are ready to submit a PR. We look forward to your contribution helping RAGFlow cover more enterprise data sources and to building a robust knowledge foundation together with the community.