支持私有化部署
AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


当智能体遇上GraphRAG:构建下一代动态路由知识图谱问答系统

发布日期:2025-06-17 09:06:19 浏览次数: 1538
作者:活水智能

微信搜一搜,关注“活水智能”

推荐语

智能体与GraphRAG的完美结合,打造动态知识图谱问答新范式。

核心内容:
1. 人工参与的多智能体系统构建知识图谱
2. 智能体驱动的GraphRAG实现动态策略选择
3. 基于CORD-19数据集的实践应用与验证

杨芳贤
53A创始人/腾讯云(TVP)最具价值专家


 

在我之前的文章中,我探讨了如何利用基于智能体(Agent)的方法来模块化文档到知识图谱(KG)的构建流程。我们可以将这种方法进一步拓展,不仅利用大型语言模型(LLMs)进行信息抽取,还能让它们参与到本体(Ontology)的设计中。

这开启了构建一个真正端到端框架的可能性:LLMs 辅助定义本体,然后直接从文本中提取信息并构建知识图谱。但同时我也意识到,构建一个有效的本体并非易事,它不仅取决于领域的复杂性,还与知识图谱的使用目的(例如推荐、问答、探索等)紧密相关。

因此,我没有赋予系统完全自主权,而是引入了一种融合“人在环路”(Human-in-the-loop)技术的工作流程。这使得人类能够进行最终决策,从而确保本体设计与用户的目的相符。

我也注意到,使用知识图谱进行问答(QA)比传统 RAG 方法更复杂,也更具挑战性。在本文中,我将介绍如何在统一的智能体驱动框架内实现 GraphRAG,并通过集成路由机制动态选择最佳策略。通过将基于智能体(Agent-based)的设计与路由相结合,我们可以为每个问题动态选择最适合的策略。

总而言之,本文主要包含两大部分:

  • • 第一部分:用于知识图谱构建的人工参与的多智能体系统
  • • 第二部分:智能体驱动的 GraphRAG

数据集

本次,我将使用 CORD-19(https://github.com/allenai/cord19?tab=readme-ov-file) 数据集,这是一个关于 COVID-19 及相关冠状病毒研究的学术论文语料库,用于构建知识图谱。由于资源限制,我随机抽取了 1,000 篇论文,并提取了它们的摘要,将每篇摘要保存为一个独立的 .txt 文件。希望尽管数据量有限,我们仍能从中发现有价值的信息。

第一部分:用于从文本构建知识图谱的人工参与的多智能体系统

1. 探讨与概念澄清

在深入探讨系统设计之前,我想先澄清几个关键概念。我研究了 Neo4j,他们将知识图谱分为两种类型:文档结构图(Document Structure Graphs) 和 领域图(Domain Graphs)

来源:Neo4j

我在之前的文章中所做的是这两种类型的结合。实际上,这样做是可行的,并且适用于大多数情况。而选择哪种类型,很大程度上取决于知识图谱的具体用途。领域图虽然缺乏上下文信息,但它非常擅长解决聚合(Aggregation)查询。聚合查询就像是把分散在不同地方(文档段落)的关于同一个主题(例如药物 B 的症状)的信息汇集到一起。例如:针对药物 B的症状,有多篇研究资源进行了探讨。通过使用领域图,我们可以聚合这些信息,并通过一个名为药物 B的共同实体将那些分散的段落关联起来。所以当用户询问:药物 B 有什么症状?我们可以通过类似这样的查询来获取答案:

MATCH (a)-[:HAS_SYMPTOM]->(b) WHERE a.name = "drug B" RETURN b

这也解决了 RAG 的一个局限性:RAG 通常需要截取 top-k 个相关段落,但有时我们需要的信息不止于此。使用图查询,我们则不需要使用“top-k”参数。然而,为了提供完整且有依据的答案,我们仍然需要将子图查询结果追溯到其原始文本段落,并将这些段落输入 LLMs。这时,文档结构图便能发挥作用。

接下来,我们进一步探讨实体间的**关系(Relationship)**类型,我将其分为两类:

  • • 开放关系(Open Relationship):这类关系没有预设的约束,关系类型也不是预先定义好的,通常以自然语言的形式表达。
  • • 封闭关系(Closed Relationship):这类关系则从一个固定的、预定义好的类型集合中选取。

选择哪种关系类型取决于下游任务。封闭关系适用于许多机器学习问题,如链接预测、推荐、探索等。而当实体间的关系过于复杂、灵活甚至未知,无法用一个预定义的关系列表来表达时,则会使用开放关系。

2. 实践方法

在本次工作中,我将继续结合领域图和文档结构图,并采用封闭关系类型。

具体来说,我们将涵盖医学(Medical)领域,重点关注药物(DRUG)、疾病(DISEASE)、症状(SYMPTOM)这三类实体以及治疗(TREATS)、引起(CAUSES)、有副作用(HAS_SIDE_EFFECT)、有症状(HAS_SYMPTOM)这些关系。我们还将介绍如何确保提取出的知识图谱满足本体的约束条件,例如,HAS_SIDE_EFFECT关系仅存在于药物(DRUG)症状(SYMPTOM)实体之间,从而使知识图谱更加可靠。

这次我选择了文档解析方法(适合处理短文档),而不是分块方法,来生成文档结构图。我相信将分块选项添加到框架中是可行的,但本文中不做展示。

回顾上一篇文章,我已经在 models/schema.py 中定义了本体模式(Ontology Schema)。现在我们无需再手动定义它们了。

from typing importListLiteralOptionalTypeAny, get_args, get_origin, Union
from pydantic import BaseModel, Field
from dataclasses import dataclass

classMention(BaseModel):
    typeLiteral["LOCATION""TIME""PERSON""EVENT""ORGANIZATION"]
    string: str = Field(...,
                        description="命名实体(如地点、时间、人物或事件)在原文中出现的精确文本字符串。")


classRelation(BaseModel):
    head: str = Field(..., description="头部提及的实体。"# 根据LLM3建议修改描述
    tail: str = Field(..., description="尾部提及的实体。"# 根据LLM3建议修改描述
    relation_description: str = Field(...,
                                      description="对头部(head)和尾部(tail)提及实体之间关系的简要描述。")


classSection(BaseModel):
    title: str = Field(..., description="章节标题")
    summary: str = Field(...,
                         description="一份简要总结(150-300 字),突出本章节涵盖的关键点。")
    mentions: List[Mention] = Field(...,
                                    description="在本章节中找到的提及(实体)的完整列表。")
    relations: List[Relation]  = Field(...,
                                    description="在本章节中找到的提及之间的关系的完整列表。")


@dataclass
classDistilledUnit:
    title: str = Field(..., description="单元标题")
    summary: str = Field(..., description="对单元内容的简明总结,介于 150 至 300 字之间。")
    sections: List[Section] = Field(..., description="单元内容中包含的 `Section` 列表")

下面是我们的新工作流程图:

工作流程概览

我们设计了一个包含三个主要智能体以及若干辅助工具的系统,用于协助构建本体和模式:

本体初始化智能体(Ontology Initialization Agent)

该智能体负责分析数据,以确定领域知识,包括相关的实体类型关系类型。它利用了两个关键的辅助工具:

  • • 数据检索工具(Retrieve data):由于无法将所有数据都输入 LLMs,智能体将使用此工具随机选取样本进行分析。
  • • 搜索工具(Search Tool):一旦初步确定了领域知识,智能体可以使用此工具在线搜索该领域的常见实体和关系,以辅助最终决策。

重要的是,该智能体还支持人工反馈(Human Feedback),允许用户改进或调整本体,使其更好地符合他们的具体目标。这是“人在环路”的关键环节。

模式设计智能体(Schema Design Agent)

该智能体读取原始数据和前一个智能体生成的本体,以设计一个符合文档结构的模式(Schema)。例如,如果输入文档是学术论文,模式可能包含诸如 title(标题)、authors(作者)、publication date(发表日期)、sections(章节)、figures(图表)和 tables(表格)等属性。与本体智能体类似,该智能体也支持交互式反馈(Interactive Feedback),允许用户根据需要修改模式。这一步的输出是一个伪代码定义(Pseudo Code Definition),它概述了本体和文档结构。

模式生成智能体(Schema Generation Agent)

该智能体接收上一步生成的伪代码,并将其转换为JSON 模式(JSON Schema)。然后可以使用此模式通过 Pydantic 等库生成模型。这一步是自动化生成代码模式的关键。

实际上,步骤 2 和步骤 3 可以合并为一步,但我发现当代码生成智能体收到关于模式模型定义的更清晰输入时,其性能会更好。

人工参与(Human-in-the-loop)示例:

首先,我们定义 ontology_init_agent(本体初始化智能体)。以下代码片段展示了智能体的基本结构,包括名称、使用的模型、输出类型、指令以及可用的工具。

# agents/ontology_init_agent.py
from typing importListOptional
from metagpt.actions import Action
from metagpt.agents import Agent
from metagpt.config import Config
from metagpt.environment import Environment
# from metagpt.ext.stanford_corenlp.stanford_corenlp import StanfordCoreNLP # Original import commented out
from metagpt.logs import logger
from metagpt.tools import Tool
from metagpt.roles import Role
from metagpt.prompts.base import BasePrompt
from typing importAny# 用于占位符类型

# Assume these are defined elsewhere (假定这些在其他地方已定义)
instruct_model = Any# 占位符:指令模型
AgentName = Any# 占位符:智能体名称枚举
Ontology = Any# 占位符:本体模型
ONTOLOGY_INIT_PROMPT = ""# 占位符:本体初始化提示词
retrieve_data = Any# 占位符:数据检索工具函数
search = Any# 占位符:搜索工具函数

defcreate_ontology_init_agent() -> Agent:
    """创建本体初始化智能体"""# 添加中文注释
    agent = Agent(
        name=AgentName.ontology_init_agent.value,
        model=instruct_model,
        result_type=Ontology,
        instructions=ONTOLOGY_INIT_PROMPT,
        tools=[
            Tool(retrieve_data, takes_ctx=True), # 数据检索工具,需要上下文
            Tool(search, takes_ctx=False),      # 搜索工具
        ],
        retries=5# 失败重试次数
    )

    return agent

所以流程是一个循环:智能体生成本体 -> 将本体展示给用户 -> 征求用户确认。只有当用户同意设计时,循环才会终止。你可以简单地要求用户输入“agree”来中断循环,但这里我将使用一个“人工偏好智能体(Human Preference Agent)”来将用户意图解析成结构化对象,从而使对话更自然。以下是用于表示人工评审结果的 Pydantic 模型:

from pydantic import BaseModel, Field
from typing import Optional

class HumanReview(BaseModel):
    is_agreed: bool = Field(..., description="指示人工评审员是否同意生成的本体。")
    feedback: Optional[str] = Field(description="人工评审员关于本体的评论或建议。")

只有当 is_agreed 为 True 时,循环才会终止。

(可选)你可以使用一个“接口智能体(Interface Agent)”来与非技术用户沟通,用自然语言解释本体。以下代码展示了本体初始化节点的主要逻辑,包括与用户交互和调用智能体:

# nodes/ontology_init_node.py
import asyncio
import os
import glob
from typing importAny
from dataclasses import dataclass
from metagpt.actions import Action
from metagpt.agents import Agent
from metagpt.nodes import BaseNode
from metagpt.graph_trans import GraphRunContext, Dependency, End
from metagpt.utils.task_group import task_group_gather
from tqdm import tqdm

# Assume these are defined elsewhere (假定这些在其他地方已定义)
create_ontology_init_agent = Any# 占位符
create_interface_agent = Any# 占位符
create_human_preference_agent = Any# 占位符
select_sample_data = Any# 占位符
Prompt = Any# 占位符
Ontology = Any# 占位符
HumanReview = Any# 占位符
SchemaDesignNode = Any# 占位符

# 创建智能体实例
ontology_init_agent = create_ontology_init_agent()
interface_agent = create_interface_agent()
human_preference_agent = create_human_preference_agent()


@dataclass
classOntologyInitNode(BaseNode):
    data_dir: str# 数据目录路径

    asyncdefrun(self, ctx: GraphRunContext) -> SchemaDesignNode:
        """运行本体初始化节点"""# 添加中文注释

        messages = [] # 用于保存对话历史
        deps = Dependency(data_dir=self.data_dir) # 创建依赖对象,存储数据目录信息
        sample_data = select_sample_data(data_dir=self.data_dir) # 选取样本数据

        # 第一步:收集数据背景信息
        background = Prompt.ask(
            "能否请您详细介绍一下这些数据?\n"
            "它们是做什么用的,以及您打算如何使用它们?\n"
            "我询问这些是为了更好地理解数据背后的意图和目的,以便我们可以为知识图谱设计一个更合适的本体。"
        )

        # 进入人工评审循环
        whileTrue:
            # 第二步:运行本体初始化智能体,生成本体草案
            # 第一次运行时提供背景和样本数据,后续运行则依赖消息历史
            ontology_result = await ontology_init_agent.run(
                user_prompt=f"""
                **背景**: {background}
                **样本数据**: {sample_data}
                """
iflen(messages)==0elseNone,
                message_history=messages,
                deps=deps,
            )

            messages += ontology_result.new_messages() # 添加智能体响应到对话历史

            # 第三步:使用接口智能体解释本体草案给用户
            interface_response = await interface_agent.run(f"""{str(ontology_result.output)}""")
            messages += interface_response.new_messages() # 添加解释响应到对话历史

            print(interface_response.output) # 打印解释给用户

            # 第四步:获取用户反馈
            user_response =  input("请提供您的反馈或确认(输入 'agree' 同意):"# 提示用户输入

            # 第五步:使用人工偏好智能体解析用户意图
            human_review = await human_preference_agent.run(
                user_prompt=user_response,
                message_history=messages
            )

            review: HumanReview = human_review.output # 获取解析结果
            if review.is_agreed: # 如果用户同意,则跳出循环
                break
            else:
                messages += human_review.new_messages() # 如果不同意,添加反馈到历史,继续循环


        # 循环结束后,返回包含本体和数据目录的SchemaDesignNode
        return SchemaDesignNode(data_dir=self.data_dir, ontology=ontology_result.output)

我们对模式设计节点(Schema Design Node)采用同样的方式,该节点负责设计并生成 JSON 模式。这是一个输出示例:

{
  "$defs":{
    "EntityType":{
      "type":"string",
      "enum":[
        "PERSON","ORGANIZATION","DATE","MEDICATION","DISEASE",
        "LOCATION","STATISTICAL_MEASURE"
      ],
      "title":"实体类型"
    },
    "RelationType":{
      "type":"string",
      "enum":[
        "INFECTS","LOCALIZED_IN","TREATED_WITH","OCCURS_IN",
        "CAUSES","IS_PART_OF","IS_HOST_TO","IS_VARIANT_OF","IS_ASSOCIATED_WITH"
      ],
      "title":"关系类型"
    },
    "Mention":{
      "type":"object",
      "properties":{
        "type":{"$ref":"#/$defs/EntityType"},
        "text":{"type":"string","description":"提及的实体文本"}
      },
      "required":["type","text"]
    },
    "DocUnit":{
      "type":"object",
      "properties":{
        "text":{"type":"string","description":"单元文本内容"},
        "section_title":{"anyOf":[{"type":"string"},{"type":"null"}],"default":null,"description":"单元所属章节标题"},
        "mentions":{
          "anyOf":[
            {"type":"array","items":{"$ref":"#/$defs/Mention"}},
            {"type":"null"}
          ],
          "default":null,
          "description":"本单元中提取出的提及列表"
        },
        "relationships":{
          "anyOf":[
            {"type":"array","items":{"type":"object"}},
            {"type":"null"}
          ],
          "default":null,
          "description":"本单元中提取出的关系列表"
        }
      },
      "required":["text","section_title"]
    }
},
"type":"object",
"title":"文档",
"properties":{
    "title":{"type":"string","description":"文档标题"},
    "authors":{"type":"array","items":{"type":"string"},"description":"文档作者列表"},
    "abstract":{"anyOf":[{"type":"string"},{"type":"null"}],"default":null,"description":"文档摘要"},
    "units":{"type":"array","items":{"$ref":"#/$defs/DocUnit"},"description":"文档单元列表"},
    "source":{"anyOf":[{"type":"string"},{"type":"null"}],"default":null,"description":"文档来源"},
    "doc_type":{"anyOf":[{"type":"string"},{"type":"null"}],"default":null,"description":"文档类型"}
},
"required":["title","authors","units","abstract","source","doc_type"],

"RelationConstraints":{
      // 必须包含:关于每种关系类型可拥有的主语/宾语实体类型的约束
      "CAUSES":[
        {"subject_type":"DISEASE","object_type":"SYMPTOM"},
        {"subject_type":"DISEASE","object_type":"DISEASE"}
      ],
      "TREATS":[
        {"subject_type":"MEDICATION","object_type":"DISEASE"}
      ]
    }
}

为了确保当 LLMs 未能正确生成 JSON 时流程不会中断,我们可以添加一些后处理/验证和重新生成的循环。例如,为模式生成智能体添加一个输出验证器:

import re
import json
from typing importTypeAny
from metagpt.agents import Agent
from metagpt.utils.model_utils import create_model_from_schema
from metagpt.const import ModelRetry
from metagpt.graph_trans import RunContext # 假设 RunContext 存在
from pydantic import BaseModel
from typing import get_args, get_origin, Union# 导入 Union

# Assume these are defined elsewhere (假定这些在其他地方已定义)
instruct_model = Any# 占位符
AgentName = Any# 占位符
SCHEMA_GENERATION_PROMPT = ""# 占位符
extract_json_from_markdown = Any# 占位符 (用于从 Markdown 提取 JSON)

defcreate_schema_generation_agent() -> Agent:
    """创建模式生成智能体"""# 添加中文注释
    agent = Agent(
        name=AgentName.schema_generation_agent.value,
        model=instruct_model,
        instructions=SCHEMA_GENERATION_PROMPT,
        result_type=str# 智能体直接输出字符串形式的 JSON
        retries=5# 失败重试次数
    )

    # 添加输出验证器
    @agent.output_validator
    asyncdefvalidate_schema(_: RunContext, output: str) -> str:
        """验证生成的 JSON 模式是否有效且符合要求"""# 添加中文注释
        try:
            # 从可能的 Markdown 代码块中提取 JSON 字符串
            match = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', output)
            ifnotmatch:
                 raise ModelRetry("输出不是有效的 JSON Markdown 代码块。"# 改进错误消息

            json_string = match.group(1).strip()
            schema =json.loads(json_string) # 解析 JSON

            # 基本模式结构验证
            if"$defs"notin schema:
                 raise ModelRetry("模式必须包含 '$defs' 部分。"# 改进错误消息
            if"type"notin schema or schema["type"] != "object":
                 raise ModelRetry("模式根节点必须是类型为 'object' 的对象。"# 改进错误消息


            # 验证关键模型的存在和结构
            try:
                doc_model = create_model_from_schema(schema,"Doc")
            except Exception as e:
                 raise ModelRetry(f"无法从模式创建 'Doc' 模型: {e}")

            try:
                doc_unit_model = create_model_from_schema(schema, "DocUnit")
            except Exception as e:
                 raise ModelRetry(f"无法从模式创建 'DocUnit' 模型: {e}")

            try:
                # 检查 RelationType 枚举定义
                create_model_from_schema(schema, "RelationType")
            except:
                raise ModelRetry("模式必须定义 `RelationType` (Enum) 模型。")

            # 检查 Doc 模型必须有 'units' 属性
            if'units'notin doc_model.model_fields:
                raise ModelRetry("模型 'Doc' 必须包含属性 'units'。")

            # 检查 DocUnit 模型必须有可选的 'mentions' 属性
            mentions_field_info = doc_unit_model.model_fields.get("mentions")

            if mentions_field_info:
                # 检查是否是 Union 且包含 NoneType
                ifnot (get_origin(mentions_field_info.annotation) isUnionand
                        type(Nonein get_args(mentions_field_info.annotation)):
                    raise ModelRetry(f"模型 'DocUnit' 的属性 'mentions' 必须是可选的 (允许 null)。")
            else:
                 # 如果 'mentions' 属性缺失,也视为错误
                 raise ModelRetry(f"模型 'DocUnit' 必须包含属性 'mentions'。")


            # 检查并验证 RelationConstraints 部分
            if"RelationConstraints"notin schema:
                raise ModelRetry(f"模式必须包含 `RelationConstraints`。")
            else:
                relation_constraints = schema["RelationConstraints"]
                ifnotisinstance(relation_constraints, dict):
                    raise ModelRetry("`RelationConstraints` 必须是一个字典。"# 改进错误消息
                for key, value in relation_constraints.items():
                    ifnotisinstance(key, strornotisinstance(value, list):
                        raise ModelRetry("RelationConstraints 的键必须是字符串 (关系类型),值必须是列表。"# 改进错误消息
                    for constraint in value:
                         ifnotisinstance(constraint, dictor"subject_type"notin constraint or"object_type"notin constraint:
                              raise ModelRetry(f"RelationConstraints 中的每个约束必须是包含 'subject_type' 和 'object_type' 的字典。无效约束: {constraint}"# 改进错误消息


            # 验证通过,返回原始输出字符串
            return output
        except Exception as e:
            # 捕获所有验证中的异常,返回 ModelRetry
            raise ModelRetry(f"无效的 JSON 模式: {e}")

根据我的经验,使用单次提示(One Shot Prompting),qwen2.5 coder 或 gpt4o-mini 通常能在第一次尝试时就生成正确的结果。以下代码展示了模式设计节点如何使用这些智能体并集成人工反馈:

# nodes/schema_design_node.py
import asyncio
import os
import glob
from typing importAny
from dataclasses import dataclass
from metagpt.actions import Action
from metagpt.agents import Agent
from metagpt.nodes import BaseNode
from metagpt.graph_trans import GraphRunContext, Dependency, End
from metagpt.utils.task_group import task_group_gather
from tqdm import tqdm

# Assume these are defined elsewhere (假定这些在其他地方已定义)
create_schema_design_agent = Any# 占位符
create_interface_agent = Any# 占位符
create_human_preference_agent = Any# 占位符
select_sample_data = Any# 占位符
Ontology = Any# 占位符
HumanReview = Any# 占位符
InformationExtractionNode = Any# 占位符
create_schema_generation_agent = Any# 占位符
extract_json_from_markdown = Any# 占位符

# 创建智能体实例
schema_design_agent = create_schema_design_agent()
interface_agent = create_interface_agent()
human_preference_agent = create_human_preference_agent()
schema_generation_agent = create_schema_generation_agent() # 使用上面定义的带验证器的智能体


@dataclass
classSchemaDesignNode(BaseNode):
    data_dir: str# 数据目录路径
    ontology: Ontology # 本体信息

    asyncdefrun(self, ctx: GraphRunContext[None, Dependency]) -> InformationExtractionNode:
        """运行模式设计节点"""# 添加中文注释

        messages = [] # 用于保存对话历史
        # 第 1 步:选取数据样本
        sample_data: str = select_sample_data(self.data_dir)

        # 进入人工评审循环
        whileTrue:
            # 第 2 步:运行模式设计智能体,生成模式草案
            # 第一次运行时提供文档样本和本体信息,后续依赖消息历史
            schema_design_result = await schema_design_agent.run(
                user_prompt=f"文档样本:{sample_data}\n"
                            "\n=============\n"
                            f"本体:{str(self.ontology)}"iflen(messages) == 0elseNone,
                message_history=messages,
            )

            messages += schema_design_result.new_messages() # 添加智能体响应到对话历史

            # 第 3 步:使用接口智能体解释模式草案给用户
            # 移除思维标签,清理输出以便展示
            interface_response_result = await interface_agent.run(
                user_prompt=""
                            f" {str(schema_design_result.output).replace('<think>''').replace('</think>''')}"
            )

            print(interface_response_result.output) # 打印解释给用户

            # 第 4 步:获取用户反馈
            user_response = input("请提供您的反馈或确认(输入 'agree' 同意):"# 提示用户输入

            # 第 5 步:使用人工偏好智能体解析用户意图
            human_review = await human_preference_agent.run(
                user_prompt=user_response,
                message_history=messages
            )

            review: HumanReview = human_review.output # 获取解析结果

            if review.is_agreed:  # 如果用户同意,则跳出循环
                break
            else:
                messages += human_review.new_messages() # 如果不同意,添加反馈到历史,继续循环

        # 循环结束后,运行模式生成智能体,将伪代码转换为 JSON Schema
        schema_result = await schema_generation_agent.run(
            user_prompt=schema_design_result.output # 输入是模式设计的伪代码输出
        )
        schema = extract_json_from_markdown(schema_result.output) # 从带 Markdown 格式的输出中提取 JSON

        # 返回包含数据目录和最终模式的 InformationExtractionNode
        return InformationExtractionNode(data_dir=self.data_dir, schema=schema)

这就是模式设计(Schema Design)部分。

现在我们进入信息抽取(Information Extraction)阶段,这部分与上一篇文章中的做法非常相似。但由于我们没有预定义的 Mention 模型,数据模型只能在运行时(runtime)定义。因此,与其在智能体中显式定义输出模型,我们可以通过 Type[BaseModel] 对其进行抽象,并将 output_type 作为函数的参数传递。以下是用于提及检测智能体的定义:

from typing importListTypeAny
from pydantic import BaseModel
from metagpt.agents import Agent
from metagpt.graph_trans import RunContext # 假设 RunContext 存在

# Assume these are defined elsewhere (假定这些在其他地方已定义)
instruct_model = Any# 占位符
AgentName = Any# 占位符
MENTION_DETECTION_PROMPT = ""# 占位符

defcreate_mention_detection_agent(
  mention_model: Type[BaseModel], # 提及模型的类型
  entity_types: List[str]        # 需要检测的实体类型列表
) -> Agent:
    """创建提及检测智能体"""# 添加中文注释
    agent = Agent(
        name=AgentName.mention_detection_agent.value,
        model=instruct_model,
        system_prompt=MENTION_DETECTION_PROMPT.format(entity_types=entity_types), # 格式化系统提示词
        result_type=List[mention_model], # 预期结果是提及模型的列表
        retries=5# 失败重试次数
    )
    # 提及检测智能体此处没有额外的输出验证器

    return agent

类似地,我们创建 relation_extraction_agent(关系抽取智能体)。我也使用了 output_validation 来确保抽取出的关系符合本体的约束。这是为确保知识图谱质量设置的“质量控制关卡”。

# agent/relation_extraction_agent.py
import re
from typing importListTypeAnyDict# 导入 Dict
from pydantic import BaseModel
from metagpt.agents import Agent
from metagpt.const import ModelRetry
from metagpt.graph_trans import RunContext, Dependency # 假设 RunContext 和 Dependency 存在

# Assume these are defined elsewhere (假定这些在其他地方已定义)
instruct_model = Any# 占位符
AgentName = Any# 占位符
RELATION_EXTRACTION_PROMPT = ""# 占位符
build_dynamic_relation_model = Any# 占位符 (用于构建动态关系模型)

defcreate_relation_extraction_agent(relation_model: Type[BaseModel], mention_strings: List[str], relation_types: List[str], constraints: Dict[strList[Dict[strstr]]]) -> Agent:
    """创建关系抽取智能体"""# 添加中文注释
    agent = Agent(
        name=AgentName.relation_extraction_agent.value,
        model=instruct_model,
        system_prompt=RELATION_EXTRACTION_PROMPT.format(
            mention_strings=mention_strings,
            relation_types=relation_types,
            constraints=constraints), # 将约束传递给提示词
        result_type=List[relation_model], # 预期结果是关系模型的列表
        retries=5# 失败重试次数
    )

    # 添加输出验证器以检查关系是否符合本体约束
    @agent.output_validator
    asyncdefvalidate_relation_constraint(ctx: RunContext[Dependency], list_relations: List[Any]): # 接收 LLM 的原始输出列表
        """验证抽取出的关系是否符合本体约束"""# 添加中文注释
        import re

        # 这个正则表达式预期提及内容的格式为 <TYPE>text</TYPE>
        pattern = re.compile(r"<(?P<type>\w+)>(?P<text>.*?)</\1>"# 预编译正则表达式

        validated_relations = [] # 用于存放验证通过的关系对象
        for relation_dict in list_relations: # LLM 输出通常是字典列表
             # 检查是否为字典,并包含必要的键
            ifnotisinstance(relation_dict, dictor'subject'notin relation_dict or'predicate'notin relation_dict or'object'notin relation_dict:
                 raise ModelRetry(f"关系输出格式不正确,应为包含 subject, predicate, object 键的字典。无效输出: {relation_dict}"# 改进错误消息

            # 解析主语和宾语字符串,获取类型和文本
            subject_str = relation_dict.get('subject''')
            object_str = relation_dict.get('object''')
            predicate = relation_dict.get('predicate')

            ifnot subject_str ornot object_str ornot predicate:
                 raise ModelRetry(f"关系信息不完整,缺少 subject, object 或 predicate。无效输出: {relation_dict}"# 改进错误消息

            subject_match = pattern.match(subject_str)
            object_match = pattern.match(object_str)

            ifnot subject_match:
                 raise ModelRetry(f"主语 '{subject_str}' 不符合 <TYPE>text</TYPE> 格式。"# 改进错误消息
            ifnot object_match:
                raise ModelRetry(f"宾语 '{object_str}' 不符合 <TYPE>text</TYPE> 格式。"# 改进错误消息

            subject_type = subject_match.group("type")
            object_type = object_match.group("type")

            triplet_type = {
                "subject_type": subject_type,
                "object_type": object_type
            }

            # 从依赖中获取关系约束并进行检查
            relationship_constraints = ctx.deps.relationship_constraints # 确保约束已加载到deps中
            allowed_constraints = relationship_constraints.get(predicate)

            if allowed_constraints isNone:
                 # 如果关系类型未在约束中定义,则视为无效
                 raise ModelRetry(f"关系类型 '{predicate}' 未在 RelationConstraints 中定义。"# 改进错误消息

            if triplet_type notin allowed_constraints:
                raise ModelRetry(f"三元组: ({subject_type})-[:{predicate}]->({object_type}) 不符合本体约束!"# 改进错误消息

            # 验证通过后,尝试将字典解析为 Pydantic 模型实例
            try:
                 validated_relation = relation_model(**relation_dict) # 使用字典解包创建模型实例
                 validated_relations.append(validated_relation)
            except Exception as e:
                 raise ModelRetry(f"三元组 '{relation_dict}' 无法解析为预期的关系模型 '{relation_model.__name__}': {e}"# 改进错误消息


        return validated_relations # 返回验证通过的 Pydantic 模型列表

    return agent

信息抽取节点(Information Extraction Node)中的逻辑流程如下。它协调文档蒸馏、提及检测和关系抽取这三个步骤:

import asyncio
import os
import glob
import json
from typing importListTypeAnyDict
from dataclasses import dataclass
from metagpt.nodes import BaseNode
from metagpt.graph_trans import GraphRunContext, Dependency, End
from metagpt.utils.task_group import task_group_gather
from tqdm import tqdm
from pydantic import BaseModel
from typing import get_args, get_origin, Literal# 导入get_origin, Literal

# Assume these are defined elsewhere (假定这些在其他地方已定义)
create_doc_distiller_agent = Any# 占位符
create_mention_detection_agent = Any# 占位符
create_relation_extraction_agent = Any# 占位符
create_model_from_schema = Any# 占位符
build_dynamic_relation_model = Any# 占位符
AgentName = Any# 占位符
instruct_model = Any# 占位符
logger = Any# 占位符

@dataclass
classInformationExtractionNode(BaseNode[None, Dependency, None]):
    """信息抽取节点:协调文档解析、提及检测和关系抽取"""# 添加中文注释

    def__init__(self, data_dir: str, schema: Dict, **kwargs):
        super().__init__(**kwargs)

        self.data_dir = data_dir
        self.schema = schema

        # 从 Schema 中创建 Pydantic 模型和类型定义
        # 假定 create_model_from_schema 可以处理枚举定义并返回类型
        self.relation_types: Type = create_model_from_schema(schema, "RelationType")
        # 假定 create_model_from_schema 返回的是 Literal 或 Enum 类型
        self.entity_types: Type = create_model_from_schema(schema, "EntityType")

        # 创建文档主模型和单元模型
        self.doc_model: Type[BaseModel] = create_model_from_schema(schema, "Doc")
        # 创建提及模型
        self.mention_model: Type[BaseModel] = create_model_from_schema(schema, "Mention"# 假定返回 Pydantic Mention 模型

        # 初始化智能体实例
        # 假定 create_doc_distiller_agent 接收输出模型类型
        self.doc_distiller_agent = create_doc_distiller_agent(output_model=self.doc_model, schema=""# Schema 参数看起来未使用

        # 假定 entity_types 可以转换为字符串列表
        self.mention_detection_agent = create_mention_detection_agent(
            self.mention_model,
            entity_types=list(get_args(self.entity_types)) if get_origin(self.entity_types) isLiteralelse [] # 从 Literal 类型中提取枚举值
        )

    asyncdefrun_task(self, ctx: GraphRunContext[None, Dependency], data: str, output_path: str):
        """处理单个文档的信息抽取任务"""# 添加中文注释

        # 第 1 步:蒸馏文档结构
        doc_results = await task_group_gather(
            [
                lambdaself.doc_distiller_agent.run(
                    user_prompt=f"原始文档:{data}\n"# 翻译提示词
                )
            ],
            timeout_seconds=100# 设定超时时间
        )
        doc_result = doc_results[0]
        doc = doc_result.output # doc 是 self.doc_model 的实例
        doc_units = doc.units # doc_units 是 DocUnit 实例的列表

        # 第 2 步:为每个文档单元检测提及
        all_entity_types_list = list(get_args(self.entity_types)) if get_origin(self.entity_types) isLiteralelse []
        mentions_results = await task_group_gather(
            [
                # 为每个文档单元创建一个提及检测任务
                lambda i=i: self.mention_detection_agent.run(
                    user_prompt=f"""段落:{doc_units[i].text}\n
                                    实体类型:{all_entity_types_list}"""
# 翻译提示词,传递实体类型列表
                )
                for i inrange(len(doc_units))
            ],
            timeout_seconds=100# 设定超时时间
        )

        # 将提取出的提及分配回对应的文档单元
        for i inrange(len(doc_units)):
            doc_units[i].mentions = mentions_results[i].output # mentions_results[i].output 是 List[Mention]

        # 第 3 步:为包含提及的文档单元抽取关系
        relation_extraction_agents = []
        relation_indices = [] # 记录原始文档单元的索引
        all_relation_types_list = list(get_args(self.relation_types)) if get_origin(self.relation_types) isLiteralelse []

        for idx, mention_list_result inenumerate(mentions_results):
            # 仅处理找到提及的单元
            if mention_list_result.output andlen(mention_list_result.output) > 0:
                # 格式化提及内容,用于关系抽取提示词/模型构建
                # 根据前面 Pydantic 模型定义,使用 .type 和 .string
                mention_strings = [
                    f"<{item.type}>{item.string}</{item.type}>"
                    for item in mention_list_result.output # mention_list_result.output 是 List[Mention]
                ]

                # 根据找到的提及和模式中定义的关系类型动态构建关系模型
                # 假定 build_dynamic_relation_model 接收提及列表和关系类型列表
                relation_model = build_dynamic_relation_model(mention_strings=mention_strings,
                                                              relation_types=all_relation_types_list)

                # 为这个单元创建关系抽取智能体
                relation_extraction_agents.append(
                    create_relation_extraction_agent(
                        relation_model,
                        constraints=ctx.deps.relationship_constraints, # 从上下文依赖中传递约束
                        mention_strings=mention_strings,
                        relation_types=all_relation_types_list # 传递所有定义的关系类型
                    )
                )

                relation_indices.append(idx) # 存储原始单元索引

        # 并行运行关系抽取智能体
        if relation_extraction_agents: # 仅当存在需要抽取关系的单元时运行
            relations_results = await task_group_gather(
                [
                    # 为每个关系抽取智能体创建一个任务
                    lambda i=i: relation_extraction_agents[i].run(
                        user_prompt=doc_units[relation_indices[i]].text, # 使用原始文档单元的文本作为输入
                        deps=ctx.deps # 传递依赖,用于验证器
                    )
                    for i inrange(len(relation_extraction_agents))
                ],
                timeout_seconds=100# 设定超时时间
            )

            # 将提取出的关系分配回对应的文档单元
            for rel_idx, doc_idx inenumerate(relation_indices):
                try:
                    # relations_results[rel_idx].output 是 List[Relation] (动态模型实例)
                    # 假定 DocUnit 有一个 'relationships' 属性可以接受这个列表
                    doc_units[doc_idx].relationships = relations_results[rel_idx].output
                except Exception as e:
                    # 处理分配过程中可能出现的错误
                    logger.error(f"为文档单元 {doc_idx} 分配关系失败: {e}")
                    doc_units[doc_idx].relationships = None# 失败时设置为 None


        # 使用更新后的单元列表更新原始文档对象
        doc.units = doc_units

        # 可选:保存处理后的文档 (原代码中注释掉了)
        # with open(output_path, "w", encoding="utf-8") as f: # 添加编码
        #     json.dump(doc.dict(), f, indent=2, ensure_ascii=False) # 添加 ensure_ascii=False

    asyncdefrun(self, ctx: GraphRunContext[None, Dependency]) -> End:
        """批量处理文件进行信息抽取"""# 添加中文注释

        # 确保 Dependency 对象包含 relationship_constraints
        ifnothasattr(ctx.deps, 'relationship_constraints'):
             ctx.deps.relationship_constraints = {} # 初始化如果不存在

        # 从 Schema 中加载关系约束到依赖对象中
        if"RelationConstraints"inself.schema:
             ctx.deps.relationship_constraints = self.schema["RelationConstraints"]
        else:
             logger.warning("Schema 中不包含 'RelationConstraints'。关系验证可能无法正常工作。"# 增加警告


        # 查找所有输入文本文件
        # 根据实际设置调整路径。原代码使用 /data/cord-19/articles/
        files = glob.glob(os.path.join(self.data_dir, "*.txt")) # 使用 self.data_dir 提高灵活性

        # 准备 run_task 的参数列表
        args = []
        for file in tqdm(files, desc="准备任务参数"): # 添加进度条描述
            withopen(file,"r", encoding="utf-8"as f: # 指定编码
                basename = os.path.basename(file)
                sample_data = f.read()
                output_path = f"data/processed/{basename}.json"# 定义输出路径

                args.append([sample_data, output_path])

        # 分批处理文件
        bs=10# 批次大小
        for i in tqdm(range(0,len(args),bs), desc="处理文件批次"): # 添加批次进度条描述
            batch_args = args[i:i+bs]
            # task_group_gather 接收一个可等待对象(callable)列表
            await task_group_gather(
                [
                    # 使用 lambda 捕获当前批次的文件数据和输出路径,创建异步任务
                    (lambda data=data, output_path=output_path: self.run_task(
                        ctx=ctx, # 传递当前上下文
                        data=data,
                        output_path=output_path))
                    for data, output_path in batch_args
                ],
                timeout_seconds=120# 设置批次任务的超时时间
        )

        # 完成所有批次处理后,返回 End 节点
        return End(None)

这就是信息抽取(Information Extraction)的全部过程。

3. 集成 Neo4j

将提取出的三元组集成到 Neo4j 时,我们需要注意的另一点是创建节点和节点间关系有两种方式:CREATE(创建)或 MERGE(合并)节点。CREATE 模式会创建两个独立的节点,即使它们指向的是同一个概念;而 MERGE 模式则会在表面形式相同时仅保留一个节点,但这要求你确保它们确实指向的是同一个实体。

在实践中,在医学等某些领域存在许多歧义,一个术语可能指代不同的概念。我们需要一个链接(Linking)步骤,在一个词汇表/参考集中识别某个提及的唯一标识符。我们将在后续的文章中更深入地探讨这个话题。目前,我将使用 MERGE 方法来在提及之间构建桥接关系。

以下是一个将三元组导入 Neo4j 的示例代码片段。请注意,您需要根据实际的文件结构和三元组格式加载数据。

from neo4j import GraphDatabase, basic_auth
from tqdm import tqdm
import os
import glob
import json
import re # 用于解析三元组字符串

# 假设 URI, AUTH, pattern 已定义。您需要根据实际情况替换或定义它们。
# 三元组的结构假定为一个列表,列表元素是字典,例如 {'subject': '<TYPE>text</TYPE>', 'predicate': 'RELATION_TYPE', 'object': '<TYPE>text</TYPE>'}
# pattern = re.compile(r"<(?P<type>\w+)>(?P<text>.*?)</\1>") # 用于解析主语和宾语字符串

URI = "bolt://localhost:7687"# Neo4j 数据库连接 URI 占位符
AUTH = basic_auth("neo4j""password"# Neo4j 认证信息占位符 - 请替换为您的实际凭据

# 示例:从之前生成的 JSON 文件中加载三元组
triplets = [] # 存放加载的三元组
data_dir = "data/processed"# 之前信息抽取生成的 JSON 文件目录
json_files = glob.glob(os.path.join(data_dir, "*.json"))

# 用于解析三元组字符串的正则表达式
pattern = re.compile(r"<(?P<type>\w+)>(?P<text>.*?)</\1>")

for json_file in tqdm(json_files, desc="加载三元组"): # 为文件加载添加进度条
    withopen(json_file, 'r', encoding='utf-8'as f: # 指定编码
        try:
            doc_data = json.load(f)
            # 假设 doc_data 结构包含 units -> relationships
            if'units'in doc_data:
                for unit in doc_data['units']:
                    if'relationships'in unit and unit['relationships']:
                        # 遍历抽取出的关系列表
                        for rel in unit['relationships']:
                            # rel 应该是 Pydantic 模型实例的字典表示,其中 subject 和 object 已经是 <TYPE>text</TYPE> 格式的字符串
                            ifisinstance(rel, dictand'subject'in rel and'predicate'in rel and'object'in rel:
                                triplets.append(rel) # 直接添加符合格式的字典
        except Exception as e:
            # print(f"加载文件 {json_file} 中的三元组失败: {e}") # 打印加载失败信息 (可选)
            pass# 忽略加载单个文件失败的情况 (原代码行为)


# 连接 Neo4j 数据库并导入数据
with GraphDatabase.driver(URI, auth=AUTH) as driver:
    driver.verify_connectivity()
    print("连接 Neo4j 数据库成功"# 翻译连接成功消息

    for triplet in tqdm(triplets, desc="导入三元组到 Neo4j"): # 为导入过程添加进度条
        try:
            # 解析主语和宾语字符串以获取类型和文本
            subject_match = pattern.match(triplet.get('subject'''))
            object_match = pattern.match(triplet.get('object'''))

            ifnot subject_match ornot object_match:
                # print(f"跳过无效三元组格式: {triplet}") # 打印跳过信息 (可选)
                continue# 跳过格式不符的三元组

            subject_type = subject_match.group("type")
            subject_text = subject_match.group("text")
            object_type = object_match.group("type")
            object_text = object_match.group("text")
            predicate = triplet.get('predicate')

            ifnot subject_type ornot subject_text ornot object_type ornot object_text ornot predicate:
                 # print(f"跳过不完整三元组: {triplet}") # 打印跳过信息 (可选)
                 continue# 跳过信息不完整的三元组


            # 执行 MERGE 查询,创建或合并节点和关系
            result = driver.execute_query(f"""
                MERGE (a:{subject_type} {{name: $head}})
                MERGE (b:{object_type} {{name: $tail}})
                MERGE (a)-[:{predicate}]->(b)
                """
,
                                          head=subject_text,
                                          tail=object_text,
                                          database="neo4j"# 如果需要,指定数据库名称
                                          ).summary
            # print(f"导入三元组 ({subject_text})-[:{predicate}]->({object_text}) 成功") # 打印导入成功信息 (可选)

        except Exception as e:
            # print(f"导入三元组 {triplet} 失败: {e}") # 打印导入失败信息 (可选)
            pass# 忽略导入单个三元组失败的情况 (原代码行为)

结果展示

我原本期望能提取到更多药物副作用的信息,但目前看来,语料库中关于副作用的提及较少。未来可以通过扩展数据集或优化模型来改善这一情况。

总结

在本文中,我展示了如何将“人工参与(Human-in-the-loop)”设计集成到用于知识图谱构建的多智能体系统中,并以 CORD-19 数据集为例进行了案例研究。通过结合领域图文档结构图,并借助封闭关系(Closed Relationship)强制执行本体约束,我们可以构建出更准确、结构化的知识图谱表示。

 


53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询