微信扫码
添加专属顾问
我要投稿
BAML的模糊解析技术让LangChain知识图谱抽取成功率从25%飙升到99%,彻底解决非结构化数据处理的痛点。核心内容: 1. LangChain严格JSON解析的局限性分析 2. BAML模糊解析技术原理与集成方法 3. 实验数据对比与Neo4j图谱分析实践
在构建基于知识图谱的RAG系统或使用LangChain的代理时,最大的挑战之一是从非结构化数据中准确提取节点和关系。特别是当使用较小的、量化的本地LLM时,这一点尤其困难,结果往往是AI系统表现不佳。
LangChain提取功能的一个关键问题是它依赖严格的JSON解析,即使使用更大的模型或非常详细的提示模板,也可能失败。相比之下,BAML使用一种模糊解析(fuzzy parsing)方法,即使LLM的输出不是完美的JSON格式,也能成功提取数据。
在这篇博客中,我们将探讨在使用较小的量化模型时LangChain提取的局限性,并展示BAML如何将提取成功率从大约25%提升到超过99%。
所有代码都可以在这个GitHub仓库中找到:
https://github.com/FareedKhan-dev/langchain-graphrag-baml
为了理解问题及其解决方案,我们需要一个评估数据集来进行多次测试,以了解BAML如何改进LangChain知识图谱。
我们将使用Tomasonjo的博客数据集,托管在GitHub上,先加载这些数据。
# 导入pandas库用于数据操作和分析
import pandas as pd
# 从GitHub上的CSV文件加载新闻文章数据集到pandas DataFrame
news = pd.read_csv(
"https://raw.githubusercontent.com/tomasonjo/blog-datasets/main/news_articles.csv"
)
# 显示DataFrame的前5行
news.head()
我们的DataFrame很简单(包含标题和文本,文本是新闻的描述)。我们还需要一列来存储每篇新闻文章文本对应的总token数。
为此,我们可以使用OpenAI的tiktoken库来计算token,方法很简单,用循环来处理数据集。
# 导入tiktoken库来计算文本的token数
import tiktoken
# 定义一个函数,计算给定字符串在指定模型中的token数
defnum_tokens_from_string(string: str, model: str = "gpt-4o") -> int:
"""返回文本字符串中的token数。"""
# 获取指定模型的编码
encoding = tiktoken.encoding_for_model(model)
# 将字符串编码为token并计数
num_tokens = len(encoding.encode(string))
# 返回总token数
return num_tokens
# 在DataFrame中创建新列'tokens'
# 计算每篇文章标题和文本组合的token数
news["tokens"] = [
num_tokens_from_string(f"{row['title']} {row['text']}")
for i, row in news.iterrows()
]
计算DataFrame的token只需要几秒钟。
这是更新后的DataFrame。
# 显示DataFrame的前5行,展示新的'tokens'列
news.head()
这些token将在后续的评估和分析阶段使用,因此我们进行了这一步。
为了将数据转换为知识图谱,我们将使用一个低级别量化的模型来进行严格的测试。
在生产环境中,开源LLM通常以量化形式部署,以降低成本和延迟。本博客使用LLaMA 3.1。
这里选择Ollama作为平台,但LangChain支持多种API和本地LLM提供商,可以选择任何合适的选项。
# ChatOllama是Ollama语言模型的接口
from langchain_ollama import ChatOllama
# 定义要使用的模型名称
model = "llama3"
# 初始化ChatOllama语言模型
# 'temperature'参数控制输出的随机性
# 低值(如0.001)使模型的响应更确定
llm = ChatOllama(model=model, temperature=0.001)
你还需要在系统上安装Ollama,它支持macOS、Windows和Linux。
访问Ollama官方网站:https://ollama.com/
下载适合你操作系统的安装程序并按照说明安装。安装后,Ollama会作为后台服务运行。
在macOS和Windows上,应用程序应自动启动并在后台运行(你可能在菜单栏或系统托盘中看到一个图标)。在Linux上,你可能需要用systemctl start ollama
手动启动。
要检查服务是否运行,打开终端或命令提示符并输入:
# 检查可用模型
ollama list
[ ] <-- No models
如果服务在运行但没有模型,你会看到一个空的模型列表,这在这个阶段是正常的。如果出现“command not found”错误,确保Ollama已正确安装。如果出现连接错误,说明服务器未运行。
你可以通过pull命令简单下载llama3模型。这需要一些时间和几GB的磁盘空间,因为模型很大。
# 下载llama3模型
ollama pull llama3
这些命令完成后,再次运行ollama list
,你应该能看到模型已列出。
# 向本地Ollama API发送请求以生成文本
curl http://localhost:11434/api/generate \
# 设置Content-Type头以指示JSON负载
-H "Content-Type: application/json" \
# 提供请求数据
-d '{
"model": "llama3",
"prompt": "Why is the sky blue?"
}'
{
"model": "llama3",
"created_at": "2025-08-03T12:00:00Z",
"response": "The sky appears blue be ... blue.",
"done": true
}
如果成功,你会在终端看到一串JSON响应,确认服务器正在运行并能提供模型服务。
现在评估数据和LLM都准备好了,下一步是将数据转换以更好地理解LangChain中的问题。
使用LangChain或LangGraph将原始或结构化数据转换为知识图谱的正确方法是使用它们提供的方法。最常见的方法之一是langchain_experimental库中的LLMGraphTransformer。
这个工具设计为一体化的解决方案:提供文本和LLM,它会处理提示和解析,返回图谱结构。
让我们看看它与本地llama3模型的表现如何。
首先,我们需要导入所有必要的组件。
# 从LangChain的实验库中导入主要的图谱转换器
from langchain_experimental.graph_transformers import LLMGraphTransformer
# 导入图谱和文档的数据结构
from langchain_community.graphs.graph_document import GraphDocument, Node, Relationship
from langchain_core.documents import Document
现在,初始化转换器。我们将使用之前创建的llm对象(即llama3模型)。
我们还需要告诉转换器我们希望为节点和关系提取哪些额外信息或“属性”。在这个例子中,我们只要求描述。
# 使用llama3模型初始化LLMGraphTransformer
# 指定我们希望节点和关系都有'description'属性
llm_transformer = LLMGraphTransformer(
llm=llm,
node_properties=["description"],
relationship_properties=["description"]
)
为了让流程可重复且整洁,我们将创建一个简单的辅助函数。这个函数将接受一个文本字符串,将其包装成LangChain的Document格式,然后传递给llm_transformer以获取图谱结构。
# 导入List类型用于类型提示
from typing import List
# 定义一个函数,处理单个文本字符串并将其转换为图谱文档
def process_text(text: str) -> List[GraphDocument]:
# 从原始文本创建LangChain Document对象
doc = Document(page_content=text)
# 使用转换器将文档转换为图谱文档列表
return llm_transformer.convert_to_graph_documents([doc])
一切设置好后,是时候运行实验了。为了保持可管理性并突出核心问题,我们将处理数据集中的20篇文章样本。
我们将使用ThreadPoolExecutor并行运行处理,以加快工作流程。
# 导入并发处理和进度条的库
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
# 设置并行工作者的数量和要处理的文章数量
MAX_WORKERS = 10
NUM_ARTICLES = 20
# 这个列表将存储生成的图谱文档
graph_documents = []
# 使用ThreadPoolExecutor并行处理文章
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
# 为样本中的每篇文章提交处理任务
futures = [
executor.submit(process_text, f"{row['title']} {row['text']}")
for i, row in news.head(NUM_ARTICLES).iterrows()
]
# 每当任务完成时,获取结果并添加到列表中
for future in tqdm(
as_completed(futures), total=len(futures), desc="处理文档"
):
graph_document = future.result()
graph_documents.extend(graph_document)
运行代码后,进度条显示所有20篇文章都已处理。
处理文档: 100%|██████████| 20/20 [01:32<00:00, 4.64s/it]
那么,我们得到了什么?让我们检查graph_documents列表。
# 显示图谱文档列表
print(graph_documents)
这是我们得到的输出:
[GraphDocument(nodes=[], relationships=[], source=Document(metadata={}, page_content='XPeng Stock Rises...')),
GraphDocument(nodes=[], relationships=[], source=Document(metadata={}, page_content='Ryanair sacks chief pilot...')),
GraphDocument(nodes=[], relationships=[], source=Document(metadata={}, page_content='Dáil almost suspended...')),
GraphDocument(nodes=[Node(id='Jude Bellingham', type='Person', properties={}), Node(id='Real Madrid', type='Organization', properties={})], relationships=[], source=Document(metadata={}, page_content='Arsenal have Rice bid rejected...')),
...
]
立刻就能看出问题。许多GraphDocument对象的节点和关系列表是空的。
这意味着对于这些文章,LLM要么生成了LangChain无法解析成有效图谱结构的输出,要么完全无法提取任何实体。
这就是使用较小的量化LLM进行结构化数据提取的核心挑战。它们往往难以遵循像LLMGraphTransformer这样的工具所期望的严格JSON格式。如果有一个小小的错误——比如多余的逗号、缺少引号——解析就会失败,我们什么也得不到。
让我们量化这个失败率。我们将统计20篇文档中有多少篇生成了空的图谱。
# 初始化一个计数器,用于统计没有节点的文档
empty_count = 0
# 遍历生成的图谱文档
for doc in graph_documents:
# 如果'nodes'列表为空,计数器加1
if not doc.nodes:
empty_count += 1
现在,计算失败的百分比。
# 计算并打印失败生成节点的文档百分比
print(f"Percentage missing: {empty_count/len(graph_documents)*100}")
Percentage missing: 75.0
75%的失败率。这太糟糕了。这意味着在我们的20篇文章样本中,只有5篇成功转换成了知识图谱。
25%的成功率对于任何生产系统来说都是不可接受的。
这就是问题的所在,而且这是一个常见问题。标准方法对于较小LLM略显不可预测的特性来说过于严格。
75%的失败率是个大问题。作为开发者,当LLM表现不佳时,我们的第一反应往往是调整提示。更好的指令应该带来更好的结果,对吧?LLMGraphTransformer内部使用默认提示,但我们无法轻易修改它。
所以,我们用LangChain的ChatPromptTemplate构建自己的简单链。这让我们可以完全控制发送给llama3的指令。我们可以更明确地“引导”模型每次生成正确的JSON格式。
我们先用Pydantic模型定义我们想要的输出结构。这是LangChain中结构化输出的常见模式。
# 导入Pydantic模型以定义数据结构
from langchain_core.pydantic_v1 import BaseModel, Field
# 定义一个简单的节点结构
classNode(BaseModel):
id: str = Field(description="节点的唯一标识符。")
type: str = Field(description="节点类型(例如,Person, Organization)。")
# 定义一个简单的关系结构
classRelationship(BaseModel):
source: Node = Field(description="关系的源节点。")
target: Node = Field(description="关系的目标节点。")
type: str = Field(description="关系类型(例如,WORKS_FOR)。")
# 定义整体图谱结构
classKnowledgeGraph(BaseModel):
nodes: List[Node] = Field(description="图谱中的节点列表。")
relationships: List[Relationship] = Field(description="图谱中的关系列表。")
接下来,我们创建一个更详细的提示。这个提示将明确包含从Pydantic模型生成的JSON schema,并给LLM非常具体的指令。
我们的目标是尽量减少错误。
# 导入提示模板和输出解析器
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers.json import JsonOutputParser
# 创建我们期望输出结构的实例
parser = JsonOutputParser(pydantic_object=KnowledgeGraph)
# 创建一个详细的提示模板,包含明确指令
template = """
你是一个顶级算法,擅长以结构化格式提取信息。
从给定的输入文本中提取知识图谱,包括节点和关系。
你的目标是尽可能全面,提取所有相关实体及其连接。
将输出格式化为带有'nodes'和'relationships'键的JSON对象。
严格遵循以下JSON schema:
{schema}
以下是输入文本:
--------------------
{text}
--------------------
"""
prompt = ChatPromptTemplate.from_template(
template,
partial_variables={"schema": parser.get_format_instructions()},
)
# 创建完整的提取链
chain = prompt | llm | parser
这个新链比LLMGraphTransformer更明确。我们给模型提供了详细的schema和清晰的指令。让我们再次运行20篇文章样本,看看成功率是否有所提高。
# 这个列表将存储新结果
graph_documents_prompt_engineered = []
errors = []
for i, row in tqdm(news.head(NUM_ARTICLES).iterrows(), total=NUM_ARTICLES, desc="使用改进提示处理"):
text = f"{row['title']} {row['text']}"
try:
# 调用我们改进的新链
graph_data = chain.invoke({"text": text})
# 手动将解析的JSON转换回GraphDocument格式
nodes = [Node(id=node['id'], type=node['type']) for node in graph_data.get('nodes', [])]
relationships = [Relationship(source=Node(id=rel['source']['id'], type=rel['source']['type']),
target=Node(id=rel['target']['id'], type=rel['target']['type']),
type=rel['type']) for rel in graph_data.get('relationships', [])]
doc = Document(page_content=text)
graph_documents_prompt_engineered.append(GraphDocument(nodes=nodes, relationships=relationships, source=doc))
except Exception as e:
# 如果LLM输出不是有效的JSON,解析器会失败。我们捕获这个错误。
errors.append(str(e))
doc = Document(page_content=text)
graph_documents_prompt_engineered.append(GraphDocument(nodes=[], relationships=[], source=doc))
现在是关键时刻。让我们再次检查失败率。
# 初始化一个计数器,用于统计没有节点的文档
empty_count_prompt_engineered = 0
# 遍历新结果
for doc in graph_documents_prompt_engineered:
if not doc.nodes:
empty_count_prompt_engineered += 1
# 计算并打印新的失败百分比
print(f"Percentage missing with improved prompt: {empty_count_prompt_engineered / len(graph_documents_prompt_engineered) * 100}%")
print(f"Number of JSON parsing errors: {len(errors)}")
Percentage missing with improved prompt: 62.0%
Number of JSON parsing errors: 13
结果呢?失败率约为62%。虽然比最初的75%略有改进,但仍远不够可靠。我们仍然无法从20篇文章中的13篇提取图谱。JsonOutputParser每次都抛出错误,因为尽管我们尽力优化了提示,llama3仍然生成了格式错误的JSON。
这表明了一个根本性限制:
仅靠提示工程无法完全解决较小LLM生成不一致结构化输出的问题。
那么,如果更好的提示不是答案,那是什么?我们需要一个工具,不仅要求好的输出,还能聪明地处理LLM给出的不完美输出。这正是BAML设计来解决的问题。
在接下来的部分,我们将用BAML驱动的实现替换整个链,看看它带来的变化。
我们已经确定,即使小心进行提示工程,依赖严格的JSON解析与较小的LLM一起使用是失败的秘诀。模型很强大,但不是完美的格式化工具。
这正是BAML(Basically, A Made-up Language)非常重要的地方。BAML提供了两个关键优势,直接解决了我们的问题:
首先,你需要安装BAML客户端和它的VS Code扩展。
# 安装BAML客户端
pip install baml-py
在VS Code市场中搜索BAML并安装扩展。这个扩展很棒,因为它提供了一个交互式游乐场,让你无需每次运行Python代码即可测试提示和schema。
接下来,我们在一个.baml文件中定义图谱提取逻辑。将其视为LLM调用的配置文件。我们创建一个名为extract_graph.baml
的文件:
// 定义图谱中的节点,包含ID、类型和可选属性
class SimpleNode {
id string // 节点的唯一标识符
type string // 节点的类型/类别
properties Properties // 与节点相关的附加属性
}
// 定义节点或关系的可选属性结构
class Properties {
description string? // 可选的文本描述
}
// 定义两个节点之间的关系
class SimpleRelationship {
source_node_id string // 源节点的ID
source_node_type string // 源节点的类型
target_node_id string // 目标节点的ID
target_node_type string // 目标节点的类型
type string // 关系类型(例如,"connects_to", "belongs_to")
properties Properties // 关系的附加属性
}
// 定义包含节点和关系的整体图谱
class DynamicGraph {
nodes SimpleNode[] // 图谱中的所有节点列表
relationships SimpleRelationship[] // 节点之间的所有关系列表
}
// 从原始输入字符串提取DynamicGraph的函数
function ExtractGraph(graph: string) -> DynamicGraph {
client Ollama // 使用Ollama客户端解释输入
prompt #"
Extract from this content:
{{ ctx.output_format }}
{{ graph }} // 提示模板,指导Ollama提取图谱
}
类定义简单易读。ExtractGraph
函数告诉BAML使用Ollama客户端,并提供了一个Jinja提示模板。特殊的{{ ctx.output_format }}
变量是BAML自动注入我们简化schema定义的地方。
现在,我们将这个BAML函数集成到LangChain工作流程中。我们需要一些辅助函数,将BAML的输出转换为LangChain和Neo4j理解的GraphDocument格式。
# 导入必要的库
from typing importAny, List
import baml_client as client
from langchain_community.graphs.graph_document import GraphDocument, Node, Relationship
from langchain_core.runnables import chain
# 辅助函数,正确格式化节点(例如,适当大写)
def_format_nodes(nodes: List[Node]) -> List[Node]:
return [
Node(
id=el.id.title() ifisinstance(el.id, str) else el.id,
type=el.type.capitalize() if el.typeelseNone,
properties=el.properties
)
for el in nodes
]
# 辅助函数,将BAML的关系输出映射到LangChain的Relationship对象
defmap_to_base_relationship(rel: Any) -> Relationship:
source = Node(id=rel.source_node_id, type=rel.source_node_type)
target = Node(id=rel.target_node_id, type=rel.target_node_type)
return Relationship(
source=source, target=target, type=rel.type, properties=rel.properties
)
# 主要辅助函数,格式化所有关系
def_format_relationships(rels) -> List[Relationship]:
relationships = [
map_to_base_relationship(rel)
for rel in rels
if rel.typeand rel.source_node_id and rel.target_node_id
]
return [
Relationship(
source=_format_nodes([el.source])[0],
target=_format_nodes([el.target])[0],
type=el.type.replace(" ", "_").upper(),
properties=el.properties,
)
for el in relationships
]
# 定义一个LangChain可链式调用的函数,调用我们的BAML函数
@chain
asyncdefget_graph(message):
graph = await client.b.ExtractGraph(graph=message.content)
return graph
让我们了解每个辅助函数的目的:
_format_nodes(nodes)
:通过大写ID和类型来标准化节点格式,返回格式整洁的Node对象列表。map_to_base_relationship(rel)
:将原始BAML关系转换为基本的LangChain Relationship对象,将源和目标包装为Node对象。_format_relationships(rels)
:过滤无效关系,将其映射到LangChain Relationship对象,并格式化节点类型和关系类型以保持一致性。get_graph(message)
:一个异步链函数,将输入消息发送到BAML API,调用ExtractGraph,并返回原始图谱输出。有了这些辅助函数,我们可以定义新的处理链。我们将使用一个更简单的自定义提示,因为BAML为我们处理了复杂的schema注入。
# 导入提示模板
from langchain_core.prompts import ChatPromptTemplate
# 一个简单有效的系统提示
system_prompt = """
你是一个知识渊博的助手,擅长从文本中提取实体及其关系。
你的目标是创建知识图谱。
"""
# 最终提示模板
default_prompt = ChatPromptTemplate.from_messages(
[
("system", system_prompt),
(
"human",
(
"提示:确保以正确格式回答,不要包含任何解释。 "
"使用给定格式从以下输入中提取信息:{input}"
),
),
]
)
# 定义完整的BAML驱动链
chain = default_prompt | llm | get_graph
这个提示模板指导模型提取实体和关系以构建知识图谱:
system_prompt
:将模型角色设置为实体-关系提取器。default_prompt
:结合系统和人类消息,带有输入文本的占位符。chain
:通过语言模型运行提示,然后将输出传递给get_graph进行图谱提取。现在是再次运行实验的时候了。这次我们将处理更大的文章批次,以真正测试新方法的可靠性。
由于时间限制,我在处理344篇文章后停止了执行,但这比最初的20篇样本要稳健得多。
在执行并行处理之前,需要一些辅助函数,我们先来写这些函数。
import asyncio
# 异步函数,处理单个文档
asyncdefaprocess_response(document: Document) -> GraphDocument:
# 调用我们的BAML链
resp = await chain.ainvoke({"input": document.page_content})
# 将响应格式化为GraphDocument
return GraphDocument(
nodes=_format_nodes(resp.nodes),
relationships=_format_relationships(resp.relationships),
source=document,
)
# 异步函数,处理文档列表
asyncdefaconvert_to_graph_documents(
documents: List[Document],
) -> List[GraphDocument]:
tasks = [asyncio.create_task(aprocess_response(document)) for document in documents]
results = await asyncio.gather(*tasks)
return results
# 异步函数,处理原始文本
asyncdefaprocess_text(texts: List[str]) -> List[GraphDocument]:
docs = [Document(page_content=text) for text in texts]
graph_docs = await aconvert_to_graph_documents(docs)
return graph_docs
让我们分解每个异步函数的目的:
aprocess_response
:处理一个文档并返回GraphDocument。aconvert_to_graph_documents
:并行处理多个文档并返回图谱结果。aprocess_text
:将原始文本转换为文档并提取图谱数据。现在,我们可以简单地执行主循环来处理文章。
# 初始化一个空列表,存储生成的图谱文档
graph_documents_baml = []
# 设置要处理的文章总数
NUM_ARTICLES_BAML = 344
# 创建一个仅包含要处理的文章的较小DataFrame
news_baml = news.head(NUM_ARTICLES_BAML)
# 从新DataFrame中提取标题和文本
titles = news_baml["title"]
texts = news_baml["text"]
# 定义每批(chunk)处理的文章数量
chunk_size = 4
# 使用tqdm显示进度条,逐批迭代文章
for i in tqdm(range(0, len(titles), chunk_size), desc="使用BAML处理分块"):
# 获取当前分块的标题
title_chunk = titles[i : i + chunk_size]
# 获取当前分块的文本
text_chunk = texts[i : i + chunk_size]
# 将每篇文章的标题和文本合并为单个字符串
combined_docs = [f"{title} {text}"for title, text inzip(title_chunk, text_chunk)]
try:
# 异步处理合并的文档以提取图谱结构
docs = await aprocess_text(combined_docs)
# 将处理好的图谱文档添加到主列表
graph_documents_baml.extend(docs)
except Exception as e:
# 处理处理过程中发生的任何错误并打印错误消息
print(f"处理从索引{i}开始的分块时出错:{e}")
# 循环结束后,显示成功处理的图谱文档总数
len(graph_documents_baml)
这是我们得到的输出。
# 图谱文档总数
344
我们处理了344篇文章。现在,让我们运行之前做的失败分析。
# 初始化一个计数器,用于统计没有节点的文档
empty_count_baml = 0
# 遍历BAML方法的处理结果
for doc in graph_documents_baml:
if not doc.nodes:
empty_count_baml += 1
# 计算并打印新的失败百分比
print(f"Percentage missing with BAML: {empty_count_baml / len(graph_documents_baml) * 100}%")
Percentage missing with BAML: 0.5813953488372093%
这是一个惊人的结果。我们的失败率从75%下降到仅0.58%。这意味着我们的成功率现在是99.4%!
通过简单地将严格的LLMGraphTransformer替换为BAML驱动的链,我们从一个失败的原型转变为一个稳健的生产就绪流程。
这表明瓶颈不是小型LLM理解任务的能力,而是系统对完美JSON的脆弱期望。
仅仅提取实体是不够的。GraphRAG的真正力量在于结构化这些知识,找到隐藏的联系,并总结相关信息的社区。
我们现在将高质量的图谱数据加载到Neo4j中,并使用图数据科学技术来丰富它。
首先,我们设置与Neo4j数据库的连接。
import os
from langchain_community.graphs import Neo4jGraph
# 使用环境变量设置Neo4j连接详情
os.environ["NEO4J_URI"] = "bolt://localhost:7687"
os.environ["NEO4J_USERNAME"] = "neo4j"
os.environ["NEO4J_PASSWORD"] = "your_password" # 将此更改为你的密码
os.environ["DATABASE"] = "graphragdemo"
# 初始化Neo4jGraph对象
graph = Neo4jGraph()
现在,我们可以将graph_documents_baml添加到数据库中。baseEntityLabel=True
参数为所有节点添加__Entity__
标签,便于后续查询。
# 将图谱文档添加到Neo4j
graph.add_graph_documents(graph_documents_baml, baseEntityLabel=True, include_source=True)
数据加载后,我们可以运行一些Cypher查询来了解新知识图谱的结构。让我们从查看文章长度(以token计)和从中提取的实体数量之间的关系开始。
# 导入绘图和数据分析的库
import matplotlib.pyplot as plt
import seaborn as sns
# 查询Neo4j以获取每个文档的实体数量和token数量
entity_dist = graph.query(
"""
MATCH (d:Document)
RETURN d.text AS text,
count {(d)-[:MENTIONS]->()} AS entity_count
"""
)
entity_dist_df = pd.DataFrame.from_records(entity_dist)
entity_dist_df["token_count"] = [
num_tokens_from_string(str(el)) for el in entity_dist_df["text"]
]
# 创建带回归线的散点图
sns.lmplot(
x="token_count", y="entity_count", data=entity_dist_df, line_kws={"color": "red"}
)
plt.title("实体数量与Token数量分布")
plt.xlabel("Token数量")
plt.ylabel("实体数量")
plt.show()
实体数量与Token数量
该图显示了一个明显的正相关:随着文章中token数量的增加,提取的实体数量也倾向于增加。这正是我们期望的,证实了我们的提取过程表现得很合理。
接下来,我们看看节点度分布。这告诉我们实体的连接程度。在现实世界的网络中,少数高度连接的节点(中心节点)是常见的。
import numpy as np
# 查询每个实体节点的度
degree_dist = graph.query(
"""
MATCH (e:__Entity__)
RETURN count {(e)-[:!MENTIONS]-()} AS node_degree
"""
)
degree_dist_df = pd.DataFrame.from_records(degree_dist)
# 计算统计数据
mean_degree = np.mean(degree_dist_df["node_degree"])
percentiles = np.percentile(degree_dist_df["node_degree"], [25, 50, 75, 90])
# 绘制对数尺度的直方图
plt.figure(figsize=(12, 6))
sns.histplot(degree_dist_df["node_degree"], bins=50, kde=False, color="blue")
plt.yscale("log")
plt.title("节点度分布")
plt.legend()
plt.show()
节点度分布
直方图显示了一个“长尾”分布,这是知识图谱的典型特征。大多数实体只有少数连接(低度),而少数实体是高度连接的中心节点。
例如,第90百分位的度数是4,但最大度数是37。这表明像“USA”或“Microsoft”这样的实体可能是图谱中的中心点。
为了找到语义上相似的实体(即使名称不同),我们需要为它们创建向量嵌入(embedding)。嵌入是文本的数字表示。我们将为每个实体的ID和描述生成嵌入,并存储在图谱中。
我们将通过Ollama使用llama3模型进行嵌入,并使用LangChain的Neo4jVector来处理这个过程。
from langchain_community.vectorstores import Neo4jVector
from langchain_ollama import OllamaEmbeddings
# 使用本地llama3模型创建嵌入
embeddings = OllamaEmbeddings(model="llama3")
# 初始化Neo4jVector实例以管理图谱中的嵌入
vector = Neo4jVector.from_existing_graph(
embeddings,
node_label="__Entity__",
text_node_properties=["id", "description"],
embedding_node_property="embedding",
database=os.environ["DATABASE"],
)
此命令遍历Neo4j中的所有__Entity__
节点,为其属性生成嵌入,并将其存储回节点的embedding属性中。
有了嵌入,我们现在可以使用k-Nearest Neighbors(kNN)算法找到向量空间中彼此接近的节点。这是识别潜在重复或高度相关实体(例如,“Man United”和“Manchester United”)的强大方法。
我们将使用Neo4j的Graph Data Science(GDS)库来实现这一点。
# 导入GraphDataScience库
from graphdatascience import GraphDataScience
# --- GDS客户端初始化 ---
# 初始化GraphDataScience客户端以连接到Neo4j数据库
# 使用环境变量中的连接详情(URI、用户名、密码)
gds = GraphDataScience(
os.environ["NEO4J_URI"],
auth=(os.environ["NEO4J_USERNAME"], os.environ["NEO4J_PASSWORD"]),
)
# 为GDS操作设置特定数据库
gds.set_database(os.environ["DATABASE"])
# --- 内存图投影 ---
# 将图谱投影到内存中以便GDS算法高效处理
# 此投影命名为'entities'
G, result = gds.graph.project(
"entities", # 内存图的名称
"__Entity__", # 要投影的节点标签
"*", # 投影所有关系类型
nodeProperties=["embedding"] # 包含节点的'embedding'属性
)
# --- 使用kNN计算相似性 ---
# 定义创建关系的相似性阈值
similarity_threshold = 0.95
# 使用k-Nearest Neighbors(kNN)算法找到相似节点
# 这会通过添加新关系“变异”内存图
gds.knn.mutate(
G, # 要修改的内存图
nodeProperties=["embedding"], # 用于相似性计算的属性
mutateRelationshipType="SIMILAR", # 要创建的关系类型
mutateProperty="score", # 新关系上存储相似性分数的属性
similarityCutoff=similarity_threshold, # 过滤关系的阈值
)
我们为嵌入相似性分数高于0.95的节点创建SIMILAR
关系。
kNN算法帮助我们找到了潜在的重复实体,但仅靠文本相似性并不完美。我们可以通过寻找不仅语义相似而且名称非常相似的实体(低“编辑距离”)进一步优化。
我们将查询这些候选实体,然后使用LLM做出最终的合并决定。
# 根据社区和名称相似性查询潜在重复实体
word_edit_distance = 3
potential_duplicate_candidates = graph.query(
"""
MATCH (e:`__Entity__`)
WHERE size(e.id) > 4
WITH e.wcc AS community, collect(e) AS nodes, count(*) AS count
WHERE count > 1
# ... (笔记本中的完整Cypher查询) ...
RETURN distinct(combinedResult)
""",
params={"distance": word_edit_distance},
)
# 看看几个候选实体
potential_duplicate_candidates[:5]
上述代码的输出如下。
[{'combinedResult': ['David Van', 'Davidvan']},
{'combinedResult': ['Cyb003', 'Cyb004']},
{'combinedResult': ['Delta Air Lines', 'Delta_Air_Lines']},
{'combinedResult': ['Elon Musk', 'Elonmusk']},
{'combinedResult': ['Market', 'Markets']}]
这些看起来明显是重复的。我们现在可以使用另一个BAML函数让LLM决定保留哪个名称。运行这个分辨过程后,我们在Neo4j中合并这些节点。
# (假设'merged_entities'由LLM分辨过程创建)
graph.query(
"""
UNWIND $data AS candidates
CALL {
WITH candidates
MATCH (e:__Entity__) WHERE e.id IN candidates
RETURN collect(e) AS nodes
}
CALL apoc.refactor.mergeNodes(nodes, {properties: {'`.*`': 'discard'}})
YIELD node
RETURN count(*)
""",
params={"data": merged_entities},
)
现在是GraphRAG的核心:将相关实体分组为社区。
我们将投影整个图谱(包括所有原始关系)到内存中,并运行Leiden算法,这是一个最先进的社区检测算法。
# 投影整个图谱,按关系频率加权
G, result = gds.graph.project(
"communities",
"__Entity__",
{
"_ALL_": {
"type": "*",
"orientation": "UNDIRECTED",
"properties": {"weight": {"property": "*", "aggregation": "COUNT"}},
}
},
)
# 运行Leiden社区检测并将结果写回节点
gds.leiden.write(
G,
writeProperty="communities",
includeIntermediateCommunities=True, # 这会创建层次社区
relationshipWeightProperty="weight",
)
这会为每个实体节点添加一个communities属性,这是一个不同粒度级别的社区ID列表(从小型紧密群体到更大的广泛主题)。
最后,我们通过创建__Community__
节点并将它们链接起来,将这个层次结构具体化在图谱中。这创建了一个可浏览的主题结构。
# 为社区节点创建唯一性约束
graph.query("CREATE CONSTRAINT IF NOT EXISTS FOR (c:__Community__) REQUIRE c.id IS UNIQUE;")
# 创建社区节点并将实体和社区链接起来
graph.query(
"""
MATCH (e:`__Entity__`)
UNWIND range(0, size(e.communities) - 1 , 1) AS index
// ... (笔记本中的完整社区创建查询) ...
RETURN count(*)
"""
)
这个复杂查询创建了一个多级社区结构,例如:(Entity)-[:IN_COMMUNITY]->(Level_0_Community)-[:IN_COMMUNITY]->(Level_1_Community)
。
经过所有这些工作,我们的知识图谱是什么样的?让我们分析每一级的社区规模。
# 查询每一级社区的大小
community_size = graph.query(
"""
MATCH (c:__Community__)<-[:IN_COMMUNITY*]-(e:__Entity__)
WITH c, count(distinct e) AS entities
RETURN split(c.id, '-')[0] AS level, entities
"""
)
# 打印处理后的DataFrame
percentiles_df
百分位DataFrame
这个表格很重要。它显示了Leiden算法如何对我们的1,875个实体进行分组。
在Level 0,我们有858个小型、聚焦的社区,其中90%包含4个或更少的成员。
到Level 3,算法将这些合并为732个更大、更广泛的社区,这一级的最大社区包含77个实体。
这种层次结构正是我们进行有效GraphRAG所需的。我们现在可以在不同抽象级别进行检索。
结果很明显。虽然标准的LangChain工具提供了一个快速入门的途径,但它们在与较小的开源LLM一起使用时可能不稳定且不可靠。
通过引入BAML,我们解决了过于复杂的提示和严格JSON解析的核心问题。结果是将成功率从25%大幅提升到超过99%,将一个失败的实验转变为一个稳健且可扩展的知识图谱构建流程。
以下是我们采取的关键步骤的快速回顾:
这种使用LangChain进行强大编排和BAML进行可靠结构化输出的方法是构建强大且成本效益高的AI应用的制胜组合。
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业
2025-08-17
Manus、LangChain一手经验:先别给Multi Agent判死刑,是你不会管理上下文
2025-08-16
关于Langchain/Langgraph框架的流式与非流式返回——invoke/ainvoke/stream/astream
2025-08-12
LangChain+BAML:打造99.4%成功率的知识图谱构建方案
2025-08-09
基于LangChain+LangGraph+LangSmith+Streamlit开发带Web页面交互的Agent
2025-08-06
LangChain 的「记忆压缩术」:聊聊大模型的上下文工程设计之道
2025-08-05
LangChain与LlamaIndex对比
2025-08-05
使用 LangSmith 实现 12 种 AI 智能体评估技术
2025-08-04
基于上下文工程的LangChain人工智能智能体应用
2025-06-05
2025-05-28
2025-05-28
2025-07-14
2025-06-26
2025-07-14
2025-05-30
2025-07-16
2025-06-16
2025-05-30
2025-07-14
2025-07-13
2025-07-05
2025-06-26
2025-06-13
2025-05-21
2025-05-19
2025-05-08