免费POC, 零成本试错
AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


我要投稿

如何为 RAGFlow 添加新的数据源

发布日期:2025-11-20 11:46:33 浏览次数: 1529
作者:InfiniFlow

微信搜一搜,关注“InfiniFlow”

推荐语

为RAGFlow扩展数据源?这份指南详细解析了如何通过四类核心接口实现异构系统的无缝接入。

核心内容:
1. RAGFlow connector框架的四大核心接口设计
2. 简单与复杂数据源的差异化实现方案
3. 从体系架构到具体实现的完整接入流程

杨芳贤
53AI创始人/腾讯云(TVP)最具价值专家


本指南面向希望为 RAGFlow 扩展数据源能力的社区开发者,旨在以专业、可复用的流程说明如何实现并接入新的 connector。RAGFlow 的 connector 框架深受 Onyx 开源项目启发,特此致谢。

在实际使用中,除了从本地文件系统导入文件,RAGFlow 需要从大量异构系统中获取数据。


为此,引入了统一的 datasource / connector 组件:每一种外部数据源通过一个 connector 完成“认证、拉取文档、增量更新以及按 checkpoint 续传 等一系列动作,从而让上层检索与问答逻辑完全解耦于底层数据源。


围绕这一目标,RAGFlow 暴露了四个主要接口,覆盖了从简单到复杂的大部分场景:

  • LoadConnector:定义全量同步(full sync,load_from_state)的行为,用于重建或恢复完整数据视图。
  • PollConnector:定义增量同步(incremental sync,poll_source)的行为,用于在已有数据基础上只拉取变化。
  • CheckpointedConnector:在需要分页或“断点续传”时,通过 load_from_checkpoint + ConnectorCheckpoint 显式管理游标与进度。
  • CheckpointedConnectorWithPermSync:在需要同时考虑内容与权限(permission)时,在 checkpoint 流中附带权限与失败信息,支持更精细的同步与重试策略。


对于“结构简单”的数据源(例如对象存储、单一 API 拉取),通常只需实现 LoadConnector 和 PollConnector 即可完成接入;


对于需要分页、断点续扫或权限同步的“复杂数据源”(例如 Confluence,Jira,Google Drive),可以进一步实现 CheckpointedConnector 或 CheckpointedConnectorWithPermSync


只要按照本文说明实现并接好这四类接口,就可以将你的数据源无缝接入 RAGFlow。


本文共有五个部分:体系概览、核心抽象接口、实现流程、接入 SyncBase 的示例解析,以及一个交付检查清单。



体系概览

  1. 1. 目录结构:各类数据源代码位于 common/data_source/,复杂源(如 Jira)会拆分为子包。公共工具位于 common/data_source/utils.py

  2. 2. 调度入口rag/svr/sync_data_source.py 通过 Trio 协程调度同步任务,并根据 FileSource 选择具体实现。

  3. 3. 常量定义common/constants.py 中的 FileSource 与 common/data_source/config.py 中的 DocumentSource 分别决定任务标识和文档来源标签。新增数据源时必须同步更新。


整体结构可以抽象为三层:

  • Sync 层:rag/svr/sync_data_source.py 中的 SyncBase 及其子类(例如 S3Jira),负责根据任务配置决定何时调用 load_from_state / poll_source / load_from_checkpoint,并将批次写入知识库。

  • Connector 实现层:common/data_source/ 下具体的 connector 类(例如 BlobStorageConnectorJiraConnector),实现 LoadConnectorPollConnector 和/或 CheckpointedConnector(WithPermSync) 接口,专注于“如何从外部系统取数并组装为 Document”。

  • 基础服务层:SyncLogsService 与 KnowledgebaseService,分别负责同步日志统计与文档入库。



从贡献者视角,可以简单地理解为:

  • 在 connector 层实现好“如何产生 Document”(含凭证、分页、checkpoint 等细节);
  • 在 Sync 层实现好“何时调用 connector、如何把批次交给基础服务”,并按照第 3 节的流程将新 connector 挂接进整个体系。


SyncBase 是调度流程的核心。所有数据源的执行逻辑都会在 __call__ 中被统一处理。一般不需要被修改


SyncBase 负责统一的批量写入、日志与 checkpoint 更新,而_generate() 由各数据源实现,负责返回 Iterable[list[Document]]

class SyncBase:
    SOURCE_NAME: str = None

    async def __call__(self, task: dict):
        ...
        
    async def _generate(self, task: dict):
        raise NotImplementedError



核心抽象接口

文档模型

所有 connector 必须产出 Documentdoc_updated_at 必须是 UTC 时间,以保证增量同步精度。


class Document(BaseModel):
    idstr
    source: str
    semantic_identifier: str
    extension: str
    blob: bytes
    doc_updated_at: datetime
    size_bytes: int
  • id:在同一数据源内必须唯一,并且在多次同步中尽量保持不变。常见形式是 "source_type:business_key"
  • semantic_identifier:尽量包含标题、作者或位置等关键信息,方便检索。


同步接口:全量同步(Load)与增量同步(Poll)


  • 全量同步(Load,load_from_state):
  • 用于重建当前 connector 下的“完整视图”,通常在任务 reindex == "1" 或首次同步时触发。实现上,一般会从“时间起点”(如 1970-01-01)扫到当前时间,将符合条件的所有文档分批 yield list[Document]。在这一模式下,connector 不需要关心上一次同步到哪里,只需要确保能完整遍历外部系统中应被索引的对象。
class LoadConnector(ABC):
    @abstractmethod
    def load_credentials(self, credentials: Dict[strAny]) -> Dict[strAny] | None: ...

    @abstractmethod
    def load_from_state(self) -> Generator[list[Document], NoneNone]: 
        """load all documents up to now"""
        ...

    @abstractmethod
    def validate_connector_settings(self) -> None: ...


  • 增量同步(Poll,poll_source):
  • 用于在已有数据基础上“只拉变化”,避免重复处理历史内容。系统会为每次任务提供一个时间窗口 (start, end](例如从上一次成功同步的时间到现在),connector 应当只返回在该窗口内新增或更新过的文档。实现时,应严格遵守 (start, end] 条件,并依赖 Document.doc_updated_at 的准确性,防止遗漏或重复。
class PollConnector(ABC):
    @abstractmethod
    def poll_source(self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch) -> Generator[list[Document], NoneNone]: 
        """load documents from start to end"""
        ...


下面的代码展示了 S3 对应的具体实现:

class S3(SyncBase):
    SOURCE_NAME: str = FileSource.S3

    asyncdef_generate(self, task: dict):
        self.connector = BlobStorageConnector(
            bucket_type=self.conf.get("bucket_type""s3"),
            bucket_name=self.conf["bucket_name"],
            prefix=self.conf.get("prefix""")
        )
        self.connector.load_credentials(self.conf["credentials"])
        document_batch_generator = (
            self.connector.load_from_state()
            if task["reindex"] == "1"ornot task["poll_range_start"]
            elseself.connector.poll_source(
                task["poll_range_start"].timestamp(),
                datetime.now(timezone.utc).timestamp()
            )
        )
        return document_batch_generator


对应的 connector 的实现:

class BlobStorageConnector(LoadConnector, PollConnector):
    defload_from_state(self) -> GenerateDocumentsOutput:
        returnself._yield_blob_objects(
            start=datetime(197011, tzinfo=timezone.utc),
            end=datetime.now(timezone.utc),
        )

    defpoll_source(self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch) -> GenerateDocumentsOutput:
        start_datetime = datetime.fromtimestamp(start, tz=timezone.utc)
        end_datetime = datetime.fromtimestamp(end, tz=timezone.utc)
        for batch inself._yield_blob_objects(start_datetime, end_datetime):
            yield batch

    defvalidate_connector_settings(self) -> None:
        ...


可以看到:

  • 调度层(S3._generate)只关心“全量 vs 增量”的切换。
  • connector 层通过统一的 _yield_blob_objects 分别实现“从起点到当前”的一次性加载与“窗口内”的滚动拉取。


Checkpoint 支持

对于只能分页读取或需要“断点续扫”的系统,推荐使用 checkpoint 抽象来管理游标状态。这里的 checkpoint 可以理解为“本轮同步结束时的游标快照”,通常包含:

  • 当前分页位置或 offset(例如某一页的起始索引)
  • 已处理 ID 集合或光标(cursor),用来避免重复拉取
  • 是否还有剩余数据的标记(如 has_more


典型的使用场景包括:

  • 外部 API 只支持分页访问,且每页大小有限(如 Jira、Confluence、Google Drive 等)
  • 同步过程可能被中断,需要从上一次完成的位置继续,而不是从头重跑
  • 希望按较小的批次反复调用 connector,每次只处理一部分数据


在这类场景下,每次调用 load_from_checkpoint 都会:

  1. 1. 于传入的 checkpoint(上一轮的游标快照)决定本轮要读取的数据范围
  2. 2. 逐个返回 Document 或 ConnectorFailure
  3. 3. 在生成器结束时返回一个“新的 checkpoint”,供下一轮调用使用


当前在 RAGFlow 中主要有两种典型用法,也对应了“简单 connector”与“复杂 connector”的典型划分:


相对简单的CheckpointedConnector

以 Confluence 为例

对于“相对简单”的 connector(只需要按时间和分页遍历内容,不关心权限,也不需要把复杂失败信息编码进 checkpoint),通常实现 CheckpointedConnector 或甚至只实现 LoadConnector / PollConnector 即可,例如 Confluence 内容拉取或 S3 这类存储源。这类 connector 的关注点是“把所有需要索引的对象可靠地遍历一遍”。


接口定义如下:

class CheckpointedConnector(BaseConnector[CT]):
    @abc.abstractmethod
    defload_from_checkpoint(
        self,
        start: SecondsSinceUnixEpoch,
        end: SecondsSinceUnixEpoch,
        checkpoint: CT,
    ) -> CheckpointOutput[CT]:
        ...

    @abc.abstractmethod
    defbuild_dummy_checkpoint(self) -> CT:
        ...

    @abc.abstractmethod
    defvalidate_checkpoint_json(self, checkpoint_json: str) -> CT:
        ...


Confluence 的实现只关注“遍历内容”,不在 checkpoint 中携带权限信息:

class ConfluenceCheckpoint(ConnectorCheckpoint):
    next_page_url: str | None

classConfluenceConnector(
    CheckpointedConnector[ConfluenceCheckpoint],
    SlimConnector,
    SlimConnectorWithPermSync,
    CredentialsConnector,
):
    defload_from_checkpoint(
        self,
        start: SecondsSinceUnixEpoch,
        end: SecondsSinceUnixEpoch,
        checkpoint: ConfluenceCheckpoint,
    ) -> CheckpointOutput[ConfluenceCheckpoint]:
        end += ONE_DAY  # handle time zone weirdness
        try:
            returnself._fetch_document_batches(checkpoint, start, end)
        except Exception as e:
            ...

    defbuild_dummy_checkpoint(self) -> ConfluenceCheckpoint:
        return ConfluenceCheckpoint(has_more=True, next_page_url=None)

    defvalidate_checkpoint_json(self, checkpoint_json: str) -> ConfluenceCheckpoint:
        return ConfluenceCheckpoint.model_validate_json(checkpoint_json)


配合工具函数可以一次性加载或增量加载文档:

for doc in load_all_docs_from_checkpoint_connector(
    connector=confluence_connector,
    start=start,
    end=end,
):
    print(doc)


相对复杂的CheckpointedConnectorWithPermSync

以 Jira 为例

对于“相对复杂”的 connector(需要结合权限、失败记录、外部系统特有游标等信息),更推荐实现 CheckpointedConnectorWithPermSync,并在 checkpoint 中显式记录分页游标、剩余状态等,例如 Jira,Google Drive,Slack,Teams。这类 connector 的关注点除了内容本身,还包括“谁能看到什么”和“哪些对象在某一轮中失败了,需要后续重试”。


接口如下:

class CheckpointedConnectorWithPermSync(ABC):
    @abstractmethod
    defload_from_checkpoint(
        self,
        start: SecondsSinceUnixEpoch,
        end: SecondsSinceUnixEpoch,
        checkpoint: ConnectorCheckpoint,
    ) -> Generator[Document | ConnectorFailure, None, ConnectorCheckpoint]:
        ...

    @abstractmethod
    defload_from_checkpoint_with_perm_sync(
        self,
        start: SecondsSinceUnixEpoch,
        end: SecondsSinceUnixEpoch,
        checkpoint: ConnectorCheckpoint,
    ) -> Generator[Document | ConnectorFailure, None, ConnectorCheckpoint]:
        ...

    @abstractmethod
    defbuild_dummy_checkpoint(self) -> ConnectorCheckpoint:
        ...

    @abstractmethod
    defvalidate_checkpoint_json(self, checkpoint_json: str) -> ConnectorCheckpoint:
        ...


Jira 同时需要内容与权限/元数据同步,因此实现了带权限的 checkpoint 接口,并使用专门的 checkpoint 类型来跟踪分页状态:

class JiraCheckpoint(ConnectorCheckpoint):
    """Checkpoint that tracks which slice of the current JQL result set was emitted."""

    start_at: int = 0
    cursor: str | None = None
    ids_done: bool = False
    all_issue_ids: list[list[str]]


class JiraConnector(CheckpointedConnectorWithPermSync):
    ...


在调度侧,rag/svr/sync_data_source.py 显式编写 checkpoint 循环,利用 CheckpointOutputWrapper 统一处理成功与失败:

def document_batches():
    checkpoint = self.connector.build_dummy_checkpoint()
    pending_docs = []
    ...
    while checkpoint.has_more:
        wrapper = CheckpointOutputWrapper()
        generator = wrapper(
            self.connector.load_from_checkpoint(
                start_time,
                end_time,
                checkpoint,
            )
        )
        for document, failure, next_checkpoint in generator:
            if failure isnotNone:
                continue
            if document isnotNone:
                pending_docs.append(document)
                iflen(pending_docs) >= batch_size:
                    yield pending_docs
                    pending_docs = []
            if next_checkpoint isnotNone:
                checkpoint = next_checkpoint
    ...
    if pending_docs:
        yield pending_docs


对贡献者的建议:

  • 仅内容同步时,可参考 Confluence,实现 CheckpointedConnector 并复用 load_all_docs_from_checkpoint_connector
    参考https://github.com/infiniflow/ragflow/issues/11376

  • 内容与权限同步或复杂错误策略时,可参考 Jira,实现 CheckpointedConnectorWithPermSync 并在 sync_data_source.py 中编写借助CheckpointOutputWrapper显式编写 checkpoint 循环。
    参考https://github.com/infiniflow/ragflow/blob/bd4bc57009fe2990b3be1000564a4d5559477cfc/rag/svr/sync_data_source.py#L351



实现流程

最小交付要求如下:

通过以上六步即可得到一个结构完整、可被调度器识别的最小实现。




接入 SyncBase 示例 

以下示例展示了如何在 SyncBase._generate 中接入不同类型的 connector,并从调度逻辑到批量产出 Document 的完整链路,可作为实现接入步骤的参考。


S3 Blob Storage 示例

SyncBase 中的调度逻辑

class S3(SyncBase):
    SOURCE_NAME: str = FileSource.S3

    asyncdef_generate(self, task: dict):
        self.connector = BlobStorageConnector(
            bucket_type=self.conf.get("bucket_type""s3"),
            bucket_name=self.conf["bucket_name"],
            prefix=self.conf.get("prefix""")
        )
        self.connector.load_credentials(self.conf["credentials"])
        document_batch_generator = (
            self.connector.load_from_state()
            if task["reindex"] == "1"ornot task["poll_range_start"]
            elseself.connector.poll_source(
                task["poll_range_start"].timestamp(),
                datetime.now(timezone.utc).timestamp()
            )
        )
        begin_info = "totally"if task["reindex"] == "1"ornot task["poll_range_start"elsef"from {task['poll_range_start']}"
        logging.info(
            f"Connect to S3: {self.conf['bucket_name']}/"
            f"{self.conf.get('prefix''')} {begin_info}"
        )
        return document_batch_generator


Connector 逻辑

class BlobStorageConnector(LoadConnector, PollConnector):
    def__init__(self, bucket_type: str, bucket_name: str, prefix: str = "", batch_size: int = INDEX_BATCH_SIZE, european_residency: bool = False) -> None:
        self.bucket_type: BlobType = BlobType(bucket_type)
        self.bucket_name = bucket_name.strip()
        self.prefix = prefix ifnot prefix or prefix.endswith("/"else prefix + "/"
        self.batch_size = batch_size
        self.s3_client: Optional[Any] = None

    defload_credentials(self, credentials: dict[strAny]) -> dict[strAny] | None:
        ifself.bucket_type == BlobType.S3:
            authentication_method = credentials.get("authentication_method""access_key")
            if authentication_method == "access_key":
                ifnotall(credentials.get(key) for key in ["aws_access_key_id""aws_secret_access_key"]):
                    raise ConnectorMissingCredentialError("Amazon S3")
            elif authentication_method == "iam_role":
                ifnot credentials.get("aws_role_arn"):
                    raise ConnectorMissingCredentialError("Amazon S3 IAM role ARN is required")
        ...
        self.s3_client = create_s3_client(self.bucket_type, credentials, self.european_residency)
        returnNone

    def_yield_blob_objects(self, start: datetime, end: datetime) -> GenerateDocumentsOutput:
        paginator = self.s3_client.get_paginator("list_objects_v2")
        pages = paginator.paginate(Bucket=self.bucket_name, Prefix=self.prefix)
        batch: list[Document] = []
        for page in pages:
            for obj in page.get("Contents", []):
                ...
                file_name = os.path.basename(obj["Key"])
                blob = download_object(self.s3_client, self.bucket_name, obj["Key"], self.size_threshold)
                if blob isNone:
                    continue
                batch.append(Document(
                    id=f"{self.bucket_type}:{self.bucket_name}:{obj['Key']}",
                    blob=blob,
                    source=DocumentSource(self.bucket_type.value),
                    semantic_identifier=file_name,
                    extension=get_file_ext(file_name),
                    doc_updated_at=last_modified,
                    size_bytes=extract_size_bytes(obj) or0
                ))
                iflen(batch) == self.batch_size:
                    yield batch
        if batch:
            yield batch


实现要点

  1. 1. SyncBase 层只负责根据任务配置在“全量/增量”之间切换,实际抓取与分页逻辑全部封装在 connector 内。
  2. 2. Document.id 使用 bucket_type:bucket:key 组合,天然避免重复。
  3. 3. _yield_blob_objects 同时服务于 load_from_state 与 poll_source,可确保两种模式行为一致。



交付前检查清单 

  1. 1. FileSource 与 DocumentSource 已添加对应条目。
  2. 2. load_credentials() 校验了所有必需字段,如涉及 OAuth 亦支持凭证刷新与持久化。
  3. 3. load_from_state() 与 poll_source() 在适用时共享批处理实现,或在行为不完全一致时有清晰的设计理由和文档说明。
  4. 4. 生成的 Document 填写了 idsemantic_identifierextensiondoc_updated_at(UTC)及 size_bytes
  5. 5. SyncBase 子类根据 task["reindex"] 正确切换同步模式,并输出明确日志。
  6. 6. 涉及 checkpoint/分页的实现具备终止条件或迭代上限,避免无限循环。
  7. 7. 已完成至少一次全量同步与一次增量同步的本地验证,并在 PR 中注明参考 Onyx 的实现要点。


完成上述检查后,即可提交 PR。期待你的贡献帮助 RAGFlow 覆盖更多企业数据源,与社区共同构建稳健的知识底座。

53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询