微信扫码
添加专属顾问
我要投稿
智能体与GraphRAG的完美结合,打造动态知识图谱问答新范式。 核心内容: 1. 人工参与的多智能体系统构建知识图谱 2. 智能体驱动的GraphRAG实现动态策略选择 3. 基于CORD-19数据集的实践应用与验证
在我之前的文章中,我探讨了如何利用基于智能体(Agent)的方法来模块化文档到知识图谱(KG)的构建流程。我们可以将这种方法进一步拓展,不仅利用大型语言模型(LLMs)进行信息抽取,还能让它们参与到本体(Ontology)的设计中。
这开启了构建一个真正端到端框架的可能性:LLMs 辅助定义本体,然后直接从文本中提取信息并构建知识图谱。但同时我也意识到,构建一个有效的本体并非易事,它不仅取决于领域的复杂性,还与知识图谱的使用目的(例如推荐、问答、探索等)紧密相关。
因此,我没有赋予系统完全自主权,而是引入了一种融合“人在环路”(Human-in-the-loop)技术的工作流程。这使得人类能够进行最终决策,从而确保本体设计与用户的目的相符。
我也注意到,使用知识图谱进行问答(QA)比传统 RAG 方法更复杂,也更具挑战性。在本文中,我将介绍如何在统一的智能体驱动框架内实现 GraphRAG,并通过集成路由机制动态选择最佳策略。通过将基于智能体(Agent-based)的设计与路由相结合,我们可以为每个问题动态选择最适合的策略。
总而言之,本文主要包含两大部分:
本次,我将使用 CORD-19(https://github.com/allenai/cord19?tab=readme-ov-file) 数据集,这是一个关于 COVID-19 及相关冠状病毒研究的学术论文语料库,用于构建知识图谱。由于资源限制,我随机抽取了 1,000 篇论文,并提取了它们的摘要,将每篇摘要保存为一个独立的 .txt
文件。希望尽管数据量有限,我们仍能从中发现有价值的信息。
在深入探讨系统设计之前,我想先澄清几个关键概念。我研究了 Neo4j,他们将知识图谱分为两种类型:文档结构图(Document Structure Graphs) 和 领域图(Domain Graphs)。
我在之前的文章中所做的是这两种类型的结合。实际上,这样做是可行的,并且适用于大多数情况。而选择哪种类型,很大程度上取决于知识图谱的具体用途。领域图虽然缺乏上下文信息,但它非常擅长解决聚合(Aggregation)查询。聚合查询就像是把分散在不同地方(文档段落)的关于同一个主题(例如药物 B 的症状)的信息汇集到一起。例如:针对药物 B的症状,有多篇研究资源进行了探讨。通过使用领域图,我们可以聚合这些信息,并通过一个名为药物 B的共同实体将那些分散的段落关联起来。所以当用户询问:药物 B 有什么症状?我们可以通过类似这样的查询来获取答案:
MATCH (a)-[:HAS_SYMPTOM]->(b) WHERE a.name = "drug B" RETURN b
这也解决了 RAG 的一个局限性:RAG 通常需要截取 top-k 个相关段落,但有时我们需要的信息不止于此。使用图查询,我们则不需要使用“top-k”参数。然而,为了提供完整且有依据的答案,我们仍然需要将子图查询结果追溯到其原始文本段落,并将这些段落输入 LLMs。这时,文档结构图便能发挥作用。
接下来,我们进一步探讨实体间的**关系(Relationship)**类型,我将其分为两类:
选择哪种关系类型取决于下游任务。封闭关系适用于许多机器学习问题,如链接预测、推荐、探索等。而当实体间的关系过于复杂、灵活甚至未知,无法用一个预定义的关系列表来表达时,则会使用开放关系。
在本次工作中,我将继续结合领域图和文档结构图,并采用封闭关系类型。
具体来说,我们将涵盖医学(Medical)
领域,重点关注药物(DRUG)、疾病(DISEASE)、症状(SYMPTOM)
这三类实体以及治疗(TREATS)、引起(CAUSES)、有副作用(HAS_SIDE_EFFECT)、有症状(HAS_SYMPTOM)
这些关系。我们还将介绍如何确保提取出的知识图谱满足本体的约束条件,例如,HAS_SIDE_EFFECT
关系仅存在于药物(DRUG)
和症状(SYMPTOM)
实体之间,从而使知识图谱更加可靠。
这次我选择了文档解析方法(适合处理短文档),而不是分块方法,来生成文档结构图。我相信将分块选项添加到框架中是可行的,但本文中不做展示。
回顾上一篇文章,我已经在 models/schema.py
中定义了本体模式(Ontology Schema)。现在我们无需再手动定义它们了。
from typing importList, Literal, Optional, Type, Any, get_args, get_origin, Union
from pydantic import BaseModel, Field
from dataclasses import dataclass
classMention(BaseModel):
type: Literal["LOCATION", "TIME", "PERSON", "EVENT", "ORGANIZATION"]
string: str = Field(...,
description="命名实体(如地点、时间、人物或事件)在原文中出现的精确文本字符串。")
classRelation(BaseModel):
head: str = Field(..., description="头部提及的实体。") # 根据LLM3建议修改描述
tail: str = Field(..., description="尾部提及的实体。") # 根据LLM3建议修改描述
relation_description: str = Field(...,
description="对头部(head)和尾部(tail)提及实体之间关系的简要描述。")
classSection(BaseModel):
title: str = Field(..., description="章节标题")
summary: str = Field(...,
description="一份简要总结(150-300 字),突出本章节涵盖的关键点。")
mentions: List[Mention] = Field(...,
description="在本章节中找到的提及(实体)的完整列表。")
relations: List[Relation] = Field(...,
description="在本章节中找到的提及之间的关系的完整列表。")
@dataclass
classDistilledUnit:
title: str = Field(..., description="单元标题")
summary: str = Field(..., description="对单元内容的简明总结,介于 150 至 300 字之间。")
sections: List[Section] = Field(..., description="单元内容中包含的 `Section` 列表")
下面是我们的新工作流程图:
我们设计了一个包含三个主要智能体以及若干辅助工具的系统,用于协助构建本体和模式:
本体初始化智能体(Ontology Initialization Agent)
该智能体负责分析数据,以确定领域知识,包括相关的实体类型和关系类型。它利用了两个关键的辅助工具:
重要的是,该智能体还支持人工反馈(Human Feedback),允许用户改进或调整本体,使其更好地符合他们的具体目标。这是“人在环路”的关键环节。
模式设计智能体(Schema Design Agent)
该智能体读取原始数据和前一个智能体生成的本体,以设计一个符合文档结构的模式(Schema)。例如,如果输入文档是学术论文,模式可能包含诸如 title
(标题)、authors
(作者)、publication date
(发表日期)、sections
(章节)、figures
(图表)和 tables
(表格)等属性。与本体智能体类似,该智能体也支持交互式反馈(Interactive Feedback),允许用户根据需要修改模式。这一步的输出是一个伪代码定义(Pseudo Code Definition),它概述了本体和文档结构。
模式生成智能体(Schema Generation Agent)
该智能体接收上一步生成的伪代码,并将其转换为JSON 模式(JSON Schema)。然后可以使用此模式通过 Pydantic 等库生成模型。这一步是自动化生成代码模式的关键。
实际上,步骤 2 和步骤 3 可以合并为一步,但我发现当代码生成智能体收到关于模式模型定义的更清晰输入时,其性能会更好。
人工参与(Human-in-the-loop)示例:
首先,我们定义 ontology_init_agent
(本体初始化智能体)。以下代码片段展示了智能体的基本结构,包括名称、使用的模型、输出类型、指令以及可用的工具。
# agents/ontology_init_agent.py
from typing importList, Optional
from metagpt.actions import Action
from metagpt.agents import Agent
from metagpt.config import Config
from metagpt.environment import Environment
# from metagpt.ext.stanford_corenlp.stanford_corenlp import StanfordCoreNLP # Original import commented out
from metagpt.logs import logger
from metagpt.tools import Tool
from metagpt.roles import Role
from metagpt.prompts.base import BasePrompt
from typing importAny# 用于占位符类型
# Assume these are defined elsewhere (假定这些在其他地方已定义)
instruct_model = Any# 占位符:指令模型
AgentName = Any# 占位符:智能体名称枚举
Ontology = Any# 占位符:本体模型
ONTOLOGY_INIT_PROMPT = ""# 占位符:本体初始化提示词
retrieve_data = Any# 占位符:数据检索工具函数
search = Any# 占位符:搜索工具函数
defcreate_ontology_init_agent() -> Agent:
"""创建本体初始化智能体"""# 添加中文注释
agent = Agent(
name=AgentName.ontology_init_agent.value,
model=instruct_model,
result_type=Ontology,
instructions=ONTOLOGY_INIT_PROMPT,
tools=[
Tool(retrieve_data, takes_ctx=True), # 数据检索工具,需要上下文
Tool(search, takes_ctx=False), # 搜索工具
],
retries=5, # 失败重试次数
)
return agent
所以流程是一个循环:智能体生成本体 -> 将本体展示给用户 -> 征求用户确认。只有当用户同意设计时,循环才会终止。你可以简单地要求用户输入“agree”来中断循环,但这里我将使用一个“人工偏好智能体(Human Preference Agent)”来将用户意图解析成结构化对象,从而使对话更自然。以下是用于表示人工评审结果的 Pydantic 模型:
from pydantic import BaseModel, Field
from typing import Optional
class HumanReview(BaseModel):
is_agreed: bool = Field(..., description="指示人工评审员是否同意生成的本体。")
feedback: Optional[str] = Field(description="人工评审员关于本体的评论或建议。")
只有当 is_agreed
为 True
时,循环才会终止。
(可选)你可以使用一个“接口智能体(Interface Agent)”来与非技术用户沟通,用自然语言解释本体。以下代码展示了本体初始化节点的主要逻辑,包括与用户交互和调用智能体:
# nodes/ontology_init_node.py
import asyncio
import os
import glob
from typing importAny
from dataclasses import dataclass
from metagpt.actions import Action
from metagpt.agents import Agent
from metagpt.nodes import BaseNode
from metagpt.graph_trans import GraphRunContext, Dependency, End
from metagpt.utils.task_group import task_group_gather
from tqdm import tqdm
# Assume these are defined elsewhere (假定这些在其他地方已定义)
create_ontology_init_agent = Any# 占位符
create_interface_agent = Any# 占位符
create_human_preference_agent = Any# 占位符
select_sample_data = Any# 占位符
Prompt = Any# 占位符
Ontology = Any# 占位符
HumanReview = Any# 占位符
SchemaDesignNode = Any# 占位符
# 创建智能体实例
ontology_init_agent = create_ontology_init_agent()
interface_agent = create_interface_agent()
human_preference_agent = create_human_preference_agent()
@dataclass
classOntologyInitNode(BaseNode):
data_dir: str# 数据目录路径
asyncdefrun(self, ctx: GraphRunContext) -> SchemaDesignNode:
"""运行本体初始化节点"""# 添加中文注释
messages = [] # 用于保存对话历史
deps = Dependency(data_dir=self.data_dir) # 创建依赖对象,存储数据目录信息
sample_data = select_sample_data(data_dir=self.data_dir) # 选取样本数据
# 第一步:收集数据背景信息
background = Prompt.ask(
"能否请您详细介绍一下这些数据?\n"
"它们是做什么用的,以及您打算如何使用它们?\n"
"我询问这些是为了更好地理解数据背后的意图和目的,以便我们可以为知识图谱设计一个更合适的本体。"
)
# 进入人工评审循环
whileTrue:
# 第二步:运行本体初始化智能体,生成本体草案
# 第一次运行时提供背景和样本数据,后续运行则依赖消息历史
ontology_result = await ontology_init_agent.run(
user_prompt=f"""
**背景**: {background}
**样本数据**: {sample_data}
"""iflen(messages)==0elseNone,
message_history=messages,
deps=deps,
)
messages += ontology_result.new_messages() # 添加智能体响应到对话历史
# 第三步:使用接口智能体解释本体草案给用户
interface_response = await interface_agent.run(f"""{str(ontology_result.output)}""")
messages += interface_response.new_messages() # 添加解释响应到对话历史
print(interface_response.output) # 打印解释给用户
# 第四步:获取用户反馈
user_response = input("请提供您的反馈或确认(输入 'agree' 同意):") # 提示用户输入
# 第五步:使用人工偏好智能体解析用户意图
human_review = await human_preference_agent.run(
user_prompt=user_response,
message_history=messages
)
review: HumanReview = human_review.output # 获取解析结果
if review.is_agreed: # 如果用户同意,则跳出循环
break
else:
messages += human_review.new_messages() # 如果不同意,添加反馈到历史,继续循环
# 循环结束后,返回包含本体和数据目录的SchemaDesignNode
return SchemaDesignNode(data_dir=self.data_dir, ontology=ontology_result.output)
我们对模式设计节点(Schema Design Node)采用同样的方式,该节点负责设计并生成 JSON 模式。这是一个输出示例:
{
"$defs":{
"EntityType":{
"type":"string",
"enum":[
"PERSON","ORGANIZATION","DATE","MEDICATION","DISEASE",
"LOCATION","STATISTICAL_MEASURE"
],
"title":"实体类型"
},
"RelationType":{
"type":"string",
"enum":[
"INFECTS","LOCALIZED_IN","TREATED_WITH","OCCURS_IN",
"CAUSES","IS_PART_OF","IS_HOST_TO","IS_VARIANT_OF","IS_ASSOCIATED_WITH"
],
"title":"关系类型"
},
"Mention":{
"type":"object",
"properties":{
"type":{"$ref":"#/$defs/EntityType"},
"text":{"type":"string","description":"提及的实体文本"}
},
"required":["type","text"]
},
"DocUnit":{
"type":"object",
"properties":{
"text":{"type":"string","description":"单元文本内容"},
"section_title":{"anyOf":[{"type":"string"},{"type":"null"}],"default":null,"description":"单元所属章节标题"},
"mentions":{
"anyOf":[
{"type":"array","items":{"$ref":"#/$defs/Mention"}},
{"type":"null"}
],
"default":null,
"description":"本单元中提取出的提及列表"
},
"relationships":{
"anyOf":[
{"type":"array","items":{"type":"object"}},
{"type":"null"}
],
"default":null,
"description":"本单元中提取出的关系列表"
}
},
"required":["text","section_title"]
}
},
"type":"object",
"title":"文档",
"properties":{
"title":{"type":"string","description":"文档标题"},
"authors":{"type":"array","items":{"type":"string"},"description":"文档作者列表"},
"abstract":{"anyOf":[{"type":"string"},{"type":"null"}],"default":null,"description":"文档摘要"},
"units":{"type":"array","items":{"$ref":"#/$defs/DocUnit"},"description":"文档单元列表"},
"source":{"anyOf":[{"type":"string"},{"type":"null"}],"default":null,"description":"文档来源"},
"doc_type":{"anyOf":[{"type":"string"},{"type":"null"}],"default":null,"description":"文档类型"}
},
"required":["title","authors","units","abstract","source","doc_type"],
"RelationConstraints":{
// 必须包含:关于每种关系类型可拥有的主语/宾语实体类型的约束
"CAUSES":[
{"subject_type":"DISEASE","object_type":"SYMPTOM"},
{"subject_type":"DISEASE","object_type":"DISEASE"}
],
"TREATS":[
{"subject_type":"MEDICATION","object_type":"DISEASE"}
]
}
}
为了确保当 LLMs 未能正确生成 JSON 时流程不会中断,我们可以添加一些后处理/验证和重新生成的循环。例如,为模式生成智能体添加一个输出验证器:
import re
import json
from typing importType, Any
from metagpt.agents import Agent
from metagpt.utils.model_utils import create_model_from_schema
from metagpt.const import ModelRetry
from metagpt.graph_trans import RunContext # 假设 RunContext 存在
from pydantic import BaseModel
from typing import get_args, get_origin, Union# 导入 Union
# Assume these are defined elsewhere (假定这些在其他地方已定义)
instruct_model = Any# 占位符
AgentName = Any# 占位符
SCHEMA_GENERATION_PROMPT = ""# 占位符
extract_json_from_markdown = Any# 占位符 (用于从 Markdown 提取 JSON)
defcreate_schema_generation_agent() -> Agent:
"""创建模式生成智能体"""# 添加中文注释
agent = Agent(
name=AgentName.schema_generation_agent.value,
model=instruct_model,
instructions=SCHEMA_GENERATION_PROMPT,
result_type=str, # 智能体直接输出字符串形式的 JSON
retries=5, # 失败重试次数
)
# 添加输出验证器
@agent.output_validator
asyncdefvalidate_schema(_: RunContext, output: str) -> str:
"""验证生成的 JSON 模式是否有效且符合要求"""# 添加中文注释
try:
# 从可能的 Markdown 代码块中提取 JSON 字符串
match = re.search(r'```(?:json)?\s*([\s\S]*?)\s*```', output)
ifnotmatch:
raise ModelRetry("输出不是有效的 JSON Markdown 代码块。") # 改进错误消息
json_string = match.group(1).strip()
schema =json.loads(json_string) # 解析 JSON
# 基本模式结构验证
if"$defs"notin schema:
raise ModelRetry("模式必须包含 '$defs' 部分。") # 改进错误消息
if"type"notin schema or schema["type"] != "object":
raise ModelRetry("模式根节点必须是类型为 'object' 的对象。") # 改进错误消息
# 验证关键模型的存在和结构
try:
doc_model = create_model_from_schema(schema,"Doc")
except Exception as e:
raise ModelRetry(f"无法从模式创建 'Doc' 模型: {e}")
try:
doc_unit_model = create_model_from_schema(schema, "DocUnit")
except Exception as e:
raise ModelRetry(f"无法从模式创建 'DocUnit' 模型: {e}")
try:
# 检查 RelationType 枚举定义
create_model_from_schema(schema, "RelationType")
except:
raise ModelRetry("模式必须定义 `RelationType` (Enum) 模型。")
# 检查 Doc 模型必须有 'units' 属性
if'units'notin doc_model.model_fields:
raise ModelRetry("模型 'Doc' 必须包含属性 'units'。")
# 检查 DocUnit 模型必须有可选的 'mentions' 属性
mentions_field_info = doc_unit_model.model_fields.get("mentions")
if mentions_field_info:
# 检查是否是 Union 且包含 NoneType
ifnot (get_origin(mentions_field_info.annotation) isUnionand
type(None) in get_args(mentions_field_info.annotation)):
raise ModelRetry(f"模型 'DocUnit' 的属性 'mentions' 必须是可选的 (允许 null)。")
else:
# 如果 'mentions' 属性缺失,也视为错误
raise ModelRetry(f"模型 'DocUnit' 必须包含属性 'mentions'。")
# 检查并验证 RelationConstraints 部分
if"RelationConstraints"notin schema:
raise ModelRetry(f"模式必须包含 `RelationConstraints`。")
else:
relation_constraints = schema["RelationConstraints"]
ifnotisinstance(relation_constraints, dict):
raise ModelRetry("`RelationConstraints` 必须是一个字典。") # 改进错误消息
for key, value in relation_constraints.items():
ifnotisinstance(key, str) ornotisinstance(value, list):
raise ModelRetry("RelationConstraints 的键必须是字符串 (关系类型),值必须是列表。") # 改进错误消息
for constraint in value:
ifnotisinstance(constraint, dict) or"subject_type"notin constraint or"object_type"notin constraint:
raise ModelRetry(f"RelationConstraints 中的每个约束必须是包含 'subject_type' 和 'object_type' 的字典。无效约束: {constraint}") # 改进错误消息
# 验证通过,返回原始输出字符串
return output
except Exception as e:
# 捕获所有验证中的异常,返回 ModelRetry
raise ModelRetry(f"无效的 JSON 模式: {e}")
根据我的经验,使用单次提示(One Shot Prompting),qwen2.5 coder 或 gpt4o-mini 通常能在第一次尝试时就生成正确的结果。以下代码展示了模式设计节点如何使用这些智能体并集成人工反馈:
# nodes/schema_design_node.py
import asyncio
import os
import glob
from typing importAny
from dataclasses import dataclass
from metagpt.actions import Action
from metagpt.agents import Agent
from metagpt.nodes import BaseNode
from metagpt.graph_trans import GraphRunContext, Dependency, End
from metagpt.utils.task_group import task_group_gather
from tqdm import tqdm
# Assume these are defined elsewhere (假定这些在其他地方已定义)
create_schema_design_agent = Any# 占位符
create_interface_agent = Any# 占位符
create_human_preference_agent = Any# 占位符
select_sample_data = Any# 占位符
Ontology = Any# 占位符
HumanReview = Any# 占位符
InformationExtractionNode = Any# 占位符
create_schema_generation_agent = Any# 占位符
extract_json_from_markdown = Any# 占位符
# 创建智能体实例
schema_design_agent = create_schema_design_agent()
interface_agent = create_interface_agent()
human_preference_agent = create_human_preference_agent()
schema_generation_agent = create_schema_generation_agent() # 使用上面定义的带验证器的智能体
@dataclass
classSchemaDesignNode(BaseNode):
data_dir: str# 数据目录路径
ontology: Ontology # 本体信息
asyncdefrun(self, ctx: GraphRunContext[None, Dependency]) -> InformationExtractionNode:
"""运行模式设计节点"""# 添加中文注释
messages = [] # 用于保存对话历史
# 第 1 步:选取数据样本
sample_data: str = select_sample_data(self.data_dir)
# 进入人工评审循环
whileTrue:
# 第 2 步:运行模式设计智能体,生成模式草案
# 第一次运行时提供文档样本和本体信息,后续依赖消息历史
schema_design_result = await schema_design_agent.run(
user_prompt=f"文档样本:{sample_data}\n"
"\n=============\n"
f"本体:{str(self.ontology)}"iflen(messages) == 0elseNone,
message_history=messages,
)
messages += schema_design_result.new_messages() # 添加智能体响应到对话历史
# 第 3 步:使用接口智能体解释模式草案给用户
# 移除思维标签,清理输出以便展示
interface_response_result = await interface_agent.run(
user_prompt=""
f" {str(schema_design_result.output).replace('<think>', '').replace('</think>', '')}"
)
print(interface_response_result.output) # 打印解释给用户
# 第 4 步:获取用户反馈
user_response = input("请提供您的反馈或确认(输入 'agree' 同意):") # 提示用户输入
# 第 5 步:使用人工偏好智能体解析用户意图
human_review = await human_preference_agent.run(
user_prompt=user_response,
message_history=messages
)
review: HumanReview = human_review.output # 获取解析结果
if review.is_agreed: # 如果用户同意,则跳出循环
break
else:
messages += human_review.new_messages() # 如果不同意,添加反馈到历史,继续循环
# 循环结束后,运行模式生成智能体,将伪代码转换为 JSON Schema
schema_result = await schema_generation_agent.run(
user_prompt=schema_design_result.output # 输入是模式设计的伪代码输出
)
schema = extract_json_from_markdown(schema_result.output) # 从带 Markdown 格式的输出中提取 JSON
# 返回包含数据目录和最终模式的 InformationExtractionNode
return InformationExtractionNode(data_dir=self.data_dir, schema=schema)
这就是模式设计(Schema Design)部分。
现在我们进入信息抽取(Information Extraction)阶段,这部分与上一篇文章中的做法非常相似。但由于我们没有预定义的 Mention 模型,数据模型只能在运行时(runtime)定义。因此,与其在智能体中显式定义输出模型,我们可以通过 Type[BaseModel]
对其进行抽象,并将 output_type
作为函数的参数传递。以下是用于提及检测智能体的定义:
from typing importList, Type, Any
from pydantic import BaseModel
from metagpt.agents import Agent
from metagpt.graph_trans import RunContext # 假设 RunContext 存在
# Assume these are defined elsewhere (假定这些在其他地方已定义)
instruct_model = Any# 占位符
AgentName = Any# 占位符
MENTION_DETECTION_PROMPT = ""# 占位符
defcreate_mention_detection_agent(
mention_model: Type[BaseModel], # 提及模型的类型
entity_types: List[str] # 需要检测的实体类型列表
) -> Agent:
"""创建提及检测智能体"""# 添加中文注释
agent = Agent(
name=AgentName.mention_detection_agent.value,
model=instruct_model,
system_prompt=MENTION_DETECTION_PROMPT.format(entity_types=entity_types), # 格式化系统提示词
result_type=List[mention_model], # 预期结果是提及模型的列表
retries=5, # 失败重试次数
)
# 提及检测智能体此处没有额外的输出验证器
return agent
类似地,我们创建 relation_extraction_agent
(关系抽取智能体)。我也使用了 output_validation
来确保抽取出的关系符合本体的约束。这是为确保知识图谱质量设置的“质量控制关卡”。
# agent/relation_extraction_agent.py
import re
from typing importList, Type, Any, Dict# 导入 Dict
from pydantic import BaseModel
from metagpt.agents import Agent
from metagpt.const import ModelRetry
from metagpt.graph_trans import RunContext, Dependency # 假设 RunContext 和 Dependency 存在
# Assume these are defined elsewhere (假定这些在其他地方已定义)
instruct_model = Any# 占位符
AgentName = Any# 占位符
RELATION_EXTRACTION_PROMPT = ""# 占位符
build_dynamic_relation_model = Any# 占位符 (用于构建动态关系模型)
defcreate_relation_extraction_agent(relation_model: Type[BaseModel], mention_strings: List[str], relation_types: List[str], constraints: Dict[str, List[Dict[str, str]]]) -> Agent:
"""创建关系抽取智能体"""# 添加中文注释
agent = Agent(
name=AgentName.relation_extraction_agent.value,
model=instruct_model,
system_prompt=RELATION_EXTRACTION_PROMPT.format(
mention_strings=mention_strings,
relation_types=relation_types,
constraints=constraints), # 将约束传递给提示词
result_type=List[relation_model], # 预期结果是关系模型的列表
retries=5, # 失败重试次数
)
# 添加输出验证器以检查关系是否符合本体约束
@agent.output_validator
asyncdefvalidate_relation_constraint(ctx: RunContext[Dependency], list_relations: List[Any]): # 接收 LLM 的原始输出列表
"""验证抽取出的关系是否符合本体约束"""# 添加中文注释
import re
# 这个正则表达式预期提及内容的格式为 <TYPE>text</TYPE>
pattern = re.compile(r"<(?P<type>\w+)>(?P<text>.*?)</\1>") # 预编译正则表达式
validated_relations = [] # 用于存放验证通过的关系对象
for relation_dict in list_relations: # LLM 输出通常是字典列表
# 检查是否为字典,并包含必要的键
ifnotisinstance(relation_dict, dict) or'subject'notin relation_dict or'predicate'notin relation_dict or'object'notin relation_dict:
raise ModelRetry(f"关系输出格式不正确,应为包含 subject, predicate, object 键的字典。无效输出: {relation_dict}") # 改进错误消息
# 解析主语和宾语字符串,获取类型和文本
subject_str = relation_dict.get('subject', '')
object_str = relation_dict.get('object', '')
predicate = relation_dict.get('predicate')
ifnot subject_str ornot object_str ornot predicate:
raise ModelRetry(f"关系信息不完整,缺少 subject, object 或 predicate。无效输出: {relation_dict}") # 改进错误消息
subject_match = pattern.match(subject_str)
object_match = pattern.match(object_str)
ifnot subject_match:
raise ModelRetry(f"主语 '{subject_str}' 不符合 <TYPE>text</TYPE> 格式。") # 改进错误消息
ifnot object_match:
raise ModelRetry(f"宾语 '{object_str}' 不符合 <TYPE>text</TYPE> 格式。") # 改进错误消息
subject_type = subject_match.group("type")
object_type = object_match.group("type")
triplet_type = {
"subject_type": subject_type,
"object_type": object_type
}
# 从依赖中获取关系约束并进行检查
relationship_constraints = ctx.deps.relationship_constraints # 确保约束已加载到deps中
allowed_constraints = relationship_constraints.get(predicate)
if allowed_constraints isNone:
# 如果关系类型未在约束中定义,则视为无效
raise ModelRetry(f"关系类型 '{predicate}' 未在 RelationConstraints 中定义。") # 改进错误消息
if triplet_type notin allowed_constraints:
raise ModelRetry(f"三元组: ({subject_type})-[:{predicate}]->({object_type}) 不符合本体约束!") # 改进错误消息
# 验证通过后,尝试将字典解析为 Pydantic 模型实例
try:
validated_relation = relation_model(**relation_dict) # 使用字典解包创建模型实例
validated_relations.append(validated_relation)
except Exception as e:
raise ModelRetry(f"三元组 '{relation_dict}' 无法解析为预期的关系模型 '{relation_model.__name__}': {e}") # 改进错误消息
return validated_relations # 返回验证通过的 Pydantic 模型列表
return agent
信息抽取节点(Information Extraction Node)中的逻辑流程如下。它协调文档蒸馏、提及检测和关系抽取这三个步骤:
import asyncio
import os
import glob
import json
from typing importList, Type, Any, Dict
from dataclasses import dataclass
from metagpt.nodes import BaseNode
from metagpt.graph_trans import GraphRunContext, Dependency, End
from metagpt.utils.task_group import task_group_gather
from tqdm import tqdm
from pydantic import BaseModel
from typing import get_args, get_origin, Literal# 导入get_origin, Literal
# Assume these are defined elsewhere (假定这些在其他地方已定义)
create_doc_distiller_agent = Any# 占位符
create_mention_detection_agent = Any# 占位符
create_relation_extraction_agent = Any# 占位符
create_model_from_schema = Any# 占位符
build_dynamic_relation_model = Any# 占位符
AgentName = Any# 占位符
instruct_model = Any# 占位符
logger = Any# 占位符
@dataclass
classInformationExtractionNode(BaseNode[None, Dependency, None]):
"""信息抽取节点:协调文档解析、提及检测和关系抽取"""# 添加中文注释
def__init__(self, data_dir: str, schema: Dict, **kwargs):
super().__init__(**kwargs)
self.data_dir = data_dir
self.schema = schema
# 从 Schema 中创建 Pydantic 模型和类型定义
# 假定 create_model_from_schema 可以处理枚举定义并返回类型
self.relation_types: Type = create_model_from_schema(schema, "RelationType")
# 假定 create_model_from_schema 返回的是 Literal 或 Enum 类型
self.entity_types: Type = create_model_from_schema(schema, "EntityType")
# 创建文档主模型和单元模型
self.doc_model: Type[BaseModel] = create_model_from_schema(schema, "Doc")
# 创建提及模型
self.mention_model: Type[BaseModel] = create_model_from_schema(schema, "Mention") # 假定返回 Pydantic Mention 模型
# 初始化智能体实例
# 假定 create_doc_distiller_agent 接收输出模型类型
self.doc_distiller_agent = create_doc_distiller_agent(output_model=self.doc_model, schema="") # Schema 参数看起来未使用
# 假定 entity_types 可以转换为字符串列表
self.mention_detection_agent = create_mention_detection_agent(
self.mention_model,
entity_types=list(get_args(self.entity_types)) if get_origin(self.entity_types) isLiteralelse [] # 从 Literal 类型中提取枚举值
)
asyncdefrun_task(self, ctx: GraphRunContext[None, Dependency], data: str, output_path: str):
"""处理单个文档的信息抽取任务"""# 添加中文注释
# 第 1 步:蒸馏文档结构
doc_results = await task_group_gather(
[
lambda: self.doc_distiller_agent.run(
user_prompt=f"原始文档:{data}\n"# 翻译提示词
)
],
timeout_seconds=100, # 设定超时时间
)
doc_result = doc_results[0]
doc = doc_result.output # doc 是 self.doc_model 的实例
doc_units = doc.units # doc_units 是 DocUnit 实例的列表
# 第 2 步:为每个文档单元检测提及
all_entity_types_list = list(get_args(self.entity_types)) if get_origin(self.entity_types) isLiteralelse []
mentions_results = await task_group_gather(
[
# 为每个文档单元创建一个提及检测任务
lambda i=i: self.mention_detection_agent.run(
user_prompt=f"""段落:{doc_units[i].text}\n
实体类型:{all_entity_types_list}"""# 翻译提示词,传递实体类型列表
)
for i inrange(len(doc_units))
],
timeout_seconds=100# 设定超时时间
)
# 将提取出的提及分配回对应的文档单元
for i inrange(len(doc_units)):
doc_units[i].mentions = mentions_results[i].output # mentions_results[i].output 是 List[Mention]
# 第 3 步:为包含提及的文档单元抽取关系
relation_extraction_agents = []
relation_indices = [] # 记录原始文档单元的索引
all_relation_types_list = list(get_args(self.relation_types)) if get_origin(self.relation_types) isLiteralelse []
for idx, mention_list_result inenumerate(mentions_results):
# 仅处理找到提及的单元
if mention_list_result.output andlen(mention_list_result.output) > 0:
# 格式化提及内容,用于关系抽取提示词/模型构建
# 根据前面 Pydantic 模型定义,使用 .type 和 .string
mention_strings = [
f"<{item.type}>{item.string}</{item.type}>"
for item in mention_list_result.output # mention_list_result.output 是 List[Mention]
]
# 根据找到的提及和模式中定义的关系类型动态构建关系模型
# 假定 build_dynamic_relation_model 接收提及列表和关系类型列表
relation_model = build_dynamic_relation_model(mention_strings=mention_strings,
relation_types=all_relation_types_list)
# 为这个单元创建关系抽取智能体
relation_extraction_agents.append(
create_relation_extraction_agent(
relation_model,
constraints=ctx.deps.relationship_constraints, # 从上下文依赖中传递约束
mention_strings=mention_strings,
relation_types=all_relation_types_list # 传递所有定义的关系类型
)
)
relation_indices.append(idx) # 存储原始单元索引
# 并行运行关系抽取智能体
if relation_extraction_agents: # 仅当存在需要抽取关系的单元时运行
relations_results = await task_group_gather(
[
# 为每个关系抽取智能体创建一个任务
lambda i=i: relation_extraction_agents[i].run(
user_prompt=doc_units[relation_indices[i]].text, # 使用原始文档单元的文本作为输入
deps=ctx.deps # 传递依赖,用于验证器
)
for i inrange(len(relation_extraction_agents))
],
timeout_seconds=100# 设定超时时间
)
# 将提取出的关系分配回对应的文档单元
for rel_idx, doc_idx inenumerate(relation_indices):
try:
# relations_results[rel_idx].output 是 List[Relation] (动态模型实例)
# 假定 DocUnit 有一个 'relationships' 属性可以接受这个列表
doc_units[doc_idx].relationships = relations_results[rel_idx].output
except Exception as e:
# 处理分配过程中可能出现的错误
logger.error(f"为文档单元 {doc_idx} 分配关系失败: {e}")
doc_units[doc_idx].relationships = None# 失败时设置为 None
# 使用更新后的单元列表更新原始文档对象
doc.units = doc_units
# 可选:保存处理后的文档 (原代码中注释掉了)
# with open(output_path, "w", encoding="utf-8") as f: # 添加编码
# json.dump(doc.dict(), f, indent=2, ensure_ascii=False) # 添加 ensure_ascii=False
asyncdefrun(self, ctx: GraphRunContext[None, Dependency]) -> End:
"""批量处理文件进行信息抽取"""# 添加中文注释
# 确保 Dependency 对象包含 relationship_constraints
ifnothasattr(ctx.deps, 'relationship_constraints'):
ctx.deps.relationship_constraints = {} # 初始化如果不存在
# 从 Schema 中加载关系约束到依赖对象中
if"RelationConstraints"inself.schema:
ctx.deps.relationship_constraints = self.schema["RelationConstraints"]
else:
logger.warning("Schema 中不包含 'RelationConstraints'。关系验证可能无法正常工作。") # 增加警告
# 查找所有输入文本文件
# 根据实际设置调整路径。原代码使用 /data/cord-19/articles/
files = glob.glob(os.path.join(self.data_dir, "*.txt")) # 使用 self.data_dir 提高灵活性
# 准备 run_task 的参数列表
args = []
for file in tqdm(files, desc="准备任务参数"): # 添加进度条描述
withopen(file,"r", encoding="utf-8") as f: # 指定编码
basename = os.path.basename(file)
sample_data = f.read()
output_path = f"data/processed/{basename}.json"# 定义输出路径
args.append([sample_data, output_path])
# 分批处理文件
bs=10# 批次大小
for i in tqdm(range(0,len(args),bs), desc="处理文件批次"): # 添加批次进度条描述
batch_args = args[i:i+bs]
# task_group_gather 接收一个可等待对象(callable)列表
await task_group_gather(
[
# 使用 lambda 捕获当前批次的文件数据和输出路径,创建异步任务
(lambda data=data, output_path=output_path: self.run_task(
ctx=ctx, # 传递当前上下文
data=data,
output_path=output_path))
for data, output_path in batch_args
],
timeout_seconds=120, # 设置批次任务的超时时间
)
# 完成所有批次处理后,返回 End 节点
return End(None)
这就是信息抽取(Information Extraction)的全部过程。
将提取出的三元组集成到 Neo4j 时,我们需要注意的另一点是创建节点和节点间关系有两种方式:CREATE
(创建)或 MERGE
(合并)节点。CREATE
模式会创建两个独立的节点,即使它们指向的是同一个概念;而 MERGE
模式则会在表面形式相同时仅保留一个节点,但这要求你确保它们确实指向的是同一个实体。
在实践中,在医学等某些领域存在许多歧义,一个术语可能指代不同的概念。我们需要一个链接(Linking)步骤,在一个词汇表/参考集中识别某个提及的唯一标识符。我们将在后续的文章中更深入地探讨这个话题。目前,我将使用 MERGE
方法来在提及之间构建桥接关系。
以下是一个将三元组导入 Neo4j 的示例代码片段。请注意,您需要根据实际的文件结构和三元组格式加载数据。
from neo4j import GraphDatabase, basic_auth
from tqdm import tqdm
import os
import glob
import json
import re # 用于解析三元组字符串
# 假设 URI, AUTH, pattern 已定义。您需要根据实际情况替换或定义它们。
# 三元组的结构假定为一个列表,列表元素是字典,例如 {'subject': '<TYPE>text</TYPE>', 'predicate': 'RELATION_TYPE', 'object': '<TYPE>text</TYPE>'}
# pattern = re.compile(r"<(?P<type>\w+)>(?P<text>.*?)</\1>") # 用于解析主语和宾语字符串
URI = "bolt://localhost:7687"# Neo4j 数据库连接 URI 占位符
AUTH = basic_auth("neo4j", "password") # Neo4j 认证信息占位符 - 请替换为您的实际凭据
# 示例:从之前生成的 JSON 文件中加载三元组
triplets = [] # 存放加载的三元组
data_dir = "data/processed"# 之前信息抽取生成的 JSON 文件目录
json_files = glob.glob(os.path.join(data_dir, "*.json"))
# 用于解析三元组字符串的正则表达式
pattern = re.compile(r"<(?P<type>\w+)>(?P<text>.*?)</\1>")
for json_file in tqdm(json_files, desc="加载三元组"): # 为文件加载添加进度条
withopen(json_file, 'r', encoding='utf-8') as f: # 指定编码
try:
doc_data = json.load(f)
# 假设 doc_data 结构包含 units -> relationships
if'units'in doc_data:
for unit in doc_data['units']:
if'relationships'in unit and unit['relationships']:
# 遍历抽取出的关系列表
for rel in unit['relationships']:
# rel 应该是 Pydantic 模型实例的字典表示,其中 subject 和 object 已经是 <TYPE>text</TYPE> 格式的字符串
ifisinstance(rel, dict) and'subject'in rel and'predicate'in rel and'object'in rel:
triplets.append(rel) # 直接添加符合格式的字典
except Exception as e:
# print(f"加载文件 {json_file} 中的三元组失败: {e}") # 打印加载失败信息 (可选)
pass# 忽略加载单个文件失败的情况 (原代码行为)
# 连接 Neo4j 数据库并导入数据
with GraphDatabase.driver(URI, auth=AUTH) as driver:
driver.verify_connectivity()
print("连接 Neo4j 数据库成功") # 翻译连接成功消息
for triplet in tqdm(triplets, desc="导入三元组到 Neo4j"): # 为导入过程添加进度条
try:
# 解析主语和宾语字符串以获取类型和文本
subject_match = pattern.match(triplet.get('subject', ''))
object_match = pattern.match(triplet.get('object', ''))
ifnot subject_match ornot object_match:
# print(f"跳过无效三元组格式: {triplet}") # 打印跳过信息 (可选)
continue# 跳过格式不符的三元组
subject_type = subject_match.group("type")
subject_text = subject_match.group("text")
object_type = object_match.group("type")
object_text = object_match.group("text")
predicate = triplet.get('predicate')
ifnot subject_type ornot subject_text ornot object_type ornot object_text ornot predicate:
# print(f"跳过不完整三元组: {triplet}") # 打印跳过信息 (可选)
continue# 跳过信息不完整的三元组
# 执行 MERGE 查询,创建或合并节点和关系
result = driver.execute_query(f"""
MERGE (a:{subject_type} {{name: $head}})
MERGE (b:{object_type} {{name: $tail}})
MERGE (a)-[:{predicate}]->(b)
""",
head=subject_text,
tail=object_text,
database="neo4j", # 如果需要,指定数据库名称
).summary
# print(f"导入三元组 ({subject_text})-[:{predicate}]->({object_text}) 成功") # 打印导入成功信息 (可选)
except Exception as e:
# print(f"导入三元组 {triplet} 失败: {e}") # 打印导入失败信息 (可选)
pass# 忽略导入单个三元组失败的情况 (原代码行为)
结果展示
我原本期望能提取到更多药物副作用的信息,但目前看来,语料库中关于副作用的提及较少。未来可以通过扩展数据集或优化模型来改善这一情况。
在本文中,我展示了如何将“人工参与(Human-in-the-loop)”设计集成到用于知识图谱构建的多智能体系统中,并以 CORD-19 数据集为例进行了案例研究。通过结合领域图和文档结构图,并借助封闭关系(Closed Relationship)强制执行本体约束,我们可以构建出更准确、结构化的知识图谱表示。
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业
2025-06-17
大模型生成知识图谱——GraphRAG原理
2025-06-16
知识图谱焕发生机,激发大模型LLM深层次推理 —— 昨天,今天和明天
2025-06-15
基于知识图谱的Zero-Shot问答:大语言模型的事实锚定新范式
2025-06-14
如何为客户数据构建语义视图?
2025-06-13
构建下一代AI:深入探讨知识图谱 KG 与大模型 LLM 的集成方法
2025-06-02
知识图谱与LLM接口优化:突破复杂推理的性能瓶颈
2025-06-02
大模型时代知识图谱驱动的企业知识大脑
2025-05-28
知识图谱激活 DeepSeek 智能体,图模互补重构企业专业知识管理
2025-03-26
2025-04-07
2025-04-21
2025-05-06
2025-03-29
2025-05-23
2025-04-09
2025-04-07
2025-04-07
2025-04-03
2025-06-14
2025-05-23
2025-05-23
2025-05-22
2025-05-20
2025-04-20
2025-04-15
2025-04-09