微信扫码
添加专属顾问
我要投稿
突破知识图谱构建瓶颈!LangChain+BAML组合将抽取成功率提升至99.4%,彻底解决非结构化数据处理难题。 核心内容: 1. LangChain在知识图谱构建中的局限性分析 2. BAML模糊解析技术的突破性优势 3. 完整实现方案与Neo4j图数据库集成案例
在使用 LangChain 构建基于知识图谱的 RAG 系统或智能体时,一个主要挑战是实现从非结构化数据中准确抽取节点和关系。尤其是使用本地化小型大语言模型 (LLM) 时,这一问题尤为显著。这往往会导致 AI 系统的性能不佳。
LangChain 的信息抽取能力受限于其对严格 JSON 解析的依赖。即使在使用大型模型或高度详细的提示词模板时,这种解析方式也可能失败。
与此形成对比的是,BAML(https://github.com/BoundaryML/baml) 采用了模糊解析方法。即使大语言模型的输出不是完美的 JSON 格式,它也能成功抽取数据。
本文将探讨 LangChain 使用小型量化模型进行抽取时的局限性,并展示 BAML 如何将其抽取成功率从约 25% 提升至接近 99%。
所有代码都已上传至我的 GitHub 仓库:
为了深入理解问题并找到解决方案,我们需要准备评估数据集。通过对这些数据进行多项测试,我们可以全面掌握 BAML 改进 LangChain 知识图谱抽取效果的方式。
我们将使用 Tomasonjo(https://github.com/tomasonjo) 在 GitHub 上提供的 博客数据集(https://github.com/tomasonjo/blog-datasets)。首先,让我们加载这些数据。
# Import the pandas library for data manipulation and analysis
import pandas as pd
# Load the news articles dataset from a CSV file hosted on GitHub into a pandas DataFrame
news = pd.read_csv(
"https://raw.githubusercontent.com/tomasonjo/blog-datasets/main/news_articles.csv"
)
# Display the first 5 rows of the DataFrame
news.head()
我们的 DataFrame 结构非常简单:包含新闻的标题和正文(即描述)。我们需要再添加一列,用于存储每篇新闻文章文本对应的总 token 数量。
为此,我们可以使用 OpenAI 的 tiktoken
库。通过循环遍历数据集来计算 token 数量的方法很直接,我们现在开始实现。
# Import the tiktoken library to count tokens from text
import tiktoken
# Define a function to calculate the number of tokens in a given string for a specific model
defnum_tokens_from_string(string: str, model: str = "gpt-4o") -> int:
"""Returns the number of tokens in a text string."""
# Get the encoding for the specified model
encoding = tiktoken.encoding_for_model(model)
# Encode the string into tokens and count them
num_tokens = len(encoding.encode(string))
# Return the total number of tokens
return num_tokens
# Create a new column 'tokens' in the DataFrame
# It calculates the number of tokens for the combined 'title' and 'text' of each article
news["tokens"] = [
num_tokens_from_string(f"{row['title']} {row['text']}")
for i, row in news.iterrows()
]
计算 DataFrame 中所有文章的 token 数量仅需几秒钟。
以下是更新后的 DataFrame 展示。
# Display the first 5 rows of the DataFrame to show the new 'tokens' column
news.head()
这些 token 稍后将在我们的评估和分析阶段中使用,这也是我们执行此步骤的原因。
为了使用 AI 模型将数据转换为知识图谱,我们将采用轻量级量化模型进行严格测试评估。
在生产环境中,开源大语言模型通常通过量化部署来降低成本和延迟。本文将使用 LLaMA 3.1 模型。
本文选择 Ollama 作为平台,但 LangChain 支持多种 API 和本地大语言模型提供商,因此可根据需求选择任一合适选项。
# ChatOllama 是 Ollama 语言模型的接口
from langchain_ollama import ChatOllama
# 定义要使用的模型名称
model = "llama3"
# 初始化 ChatOllama 语言模型
# 'temperature' 参数控制输出的随机性。
# 较低的值(如 0.001)使模型的响应更具确定性。
llm = ChatOllama(model=model, temperature=0.001)
您还需要在系统上安装 Ollama。它支持 macOS、Windows 和 Linux 操作系统。
安装完成后,Ollama 将作为后台服务运行。
systemctl start ollama
手动启动它。要检查服务是否正在运行,请打开您的终端或命令提示符并输入:
# 检查可用模型
ollama list
#### 输出 ####
[ ] <-- 暂无模型
如果服务正在运行但尚未有模型,您会看到一个空模型列表,这在当前阶段是完全正常的。如果您收到“命令未找到”错误,请确保 Ollama 已正确安装。如果您遇到连接错误,则表示服务器未运行。
您只需使用 pull
命令即可下载 llama3
模型。由于模型较大,这需要一些时间和数千兆字节的磁盘空间。
# 下载 llama3 模型
ollama pull llama3
这些命令执行完成后,您可以再次运行 ollama list
,应该会看到模型已列出。
# 向本地 Ollama API 发送请求以生成文本
curl http://localhost:11434/api/generate \
# 设置 Content-Type 头以指示 JSON 有效负载
-H "Content-Type: application/json" \
# 提供请求数据
-d '{
"model": "llama3",
"prompt": "Why is the sky blue?"
}'
#### 输出 ####
{
"model": "llama3",
"created_at": "2025-08-03T12:00:00Z",
"response": "The sky appears blue be ... blue.",
"done": true
}
如果一切正常,您将在终端中看到 JSON 响应流。这证实了服务器正在运行并能够提供模型服务。
评估数据和大语言模型均已准备就绪,下一步是将数据进行转换,以深入理解 LangChain 的内部问题。
使用 LangChain 或 LangGraph 将原始或结构化数据转换为知识图谱的推荐方法是借助其内置工具。其中最常见的方法之一是 langchain_experimental
库中的 LLMGraphTransformer
。
该工具被设计为一体化解决方案:仅需提供文本和大语言模型,它便能自动处理提示词与解析,并返回图结构。
让我们看看它在我们的本地 llama3
模型上的表现如何。
首先,我们需要导入所有必需的组件。
# 从 Langchain 的实验库中导入主要图转换器
from langchain_experimental.graph_transformers import LLMGraphTransformer
# 导入图和文档的数据结构
from langchain_community.graphs.graph_document import GraphDocument, Node, Relationship
from langchain_core.documents import Document
现在,我们来初始化转换器。我们将使用之前创建的 llm
对象(即我们的 llama3
模型)。
我们还需要告诉转换器,希望它为节点和关系抽取哪些额外信息,即“属性”。在这个例子中,我们只要求抽取 description
属性。
# 使用 llama3 模型初始化 LLMGraphTransformer
# 我们指定节点和关系都需要 'description' 属性
llm_transformer = LLMGraphTransformer(
llm=llm,
node_properties=["description"],
relationship_properties=["description"]
)
为了使流程可重复且规范,我们将创建一个简单的辅助函数。这个函数将接收一段文本字符串,将其封装为 Langchain 的 Document
格式,然后传递给我们的 llm_transformer
以获取图结构。
# 导入 List 类型以用于类型提示
from typing import List
# 定义一个函数,用于处理单个文本字符串并将其转换为图文档
def process_text(text: str) -> List[GraphDocument]:
# 从原始文本创建 Langchain Document 对象
doc = Document(page_content=text)
# 使用转换器将文档转换为 GraphDocument 对象列表
return llm_transformer.convert_to_graph_documents([doc])
所有准备工作就绪,实验即将开始。为了便于管理并突出核心问题,我们将处理数据集中的 20 篇文章作为小样本。
我们将使用 ThreadPoolExecutor
来并行处理,以加快工作流程。
# 导入用于并发处理和进度条的库
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
# 设置并行工作者数量和要处理的文章数量
MAX_WORKERS = 10
NUM_ARTICLES = 20
# 此列表将存储生成的图文档
graph_documents = []
# 使用 ThreadPoolExecutor 并行处理文章
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
# 为样本中的每篇文章提交处理任务
futures = [
executor.submit(process_text, f"{row['title']} {row['text']}")
for i, row in news.head(NUM_ARTICLES).iterrows()
]
# 当每个任务完成时,获取结果并将其添加到列表中
for future in tqdm(
as_completed(futures), total=len(futures), desc="Processing documents"
):
graph_document = future.result()
graph_documents.extend(graph_document)
运行代码后,进度条显示所有 20 篇文章都已处理完毕。
#### 输出 ####
Processing documents: 100%|██████████| 20/20 [01:32<00:00, 4.64s/it]
那么,我们得到了什么?让我们检查一下 graph_documents
列表。
# 显示图文档列表
print(graph_documents)
这是我们得到的输出:
#### 输出 ####
[GraphDocument(nodes=[], relationships=[], source=Document(metadata={}, page_content='XPeng Stock Rises...')),
GraphDocument(nodes=[], relationships=[], source=Document(metadata={}, page_content='Ryanair sacks chief pilot...')),
GraphDocument(nodes=[], relationships=[], source=Document(metadata={}, page_content='Dáil almost suspended...')),
GraphDocument(nodes=[Node(id='Jude Bellingham', type='Person', properties={}), Node(id='Real Madrid', type='Organization', properties={})], relationships=[], source=Document(metadata={}, page_content='Arsenal have Rice bid rejected...')),
...
]
我们立刻发现了一个问题。许多
GraphDocument
对象的nodes
和relationships
列表都是空的。
这意味着对于这些文章,大语言模型生成的输出存在两种问题:一是 LangChain 无法将其解析为有效的图结构;二是完全没有抽取到任何实体。
这就是使用小型量化大语言模型进行结构化数据抽取的根本挑战。它们常常难以遵循像 LLMGraphTransformer
这类工具所期望的严格 JSON 格式。即使是微小的错误——比如多余的逗号、遗漏的引号——都会导致解析失败,从而一无所获。
让我们量化一下这个失败率。我们将统计 20 份文档中有多少份未能成功生成图。
# 初始化无节点文档计数器
empty_count = 0
# 遍历生成的图文档
for doc in graph_documents:
# 如果 'nodes' 列表为空,则增加计数器
if not doc.nodes:
empty_count += 1
现在,我们来计算失败的百分比。
# 计算并打印未能生成任何节点的文档百分比
print(f"Percentage missing: {empty_count/len(graph_documents)*100}")
#### 输出 ####
Percentage missing: 75.0
75% 的失败率。这完全不能接受。这意味着在我们的 20 篇样本文章中,只有 5 篇成功转换成了知识图谱。
对于任何生产系统来说,仅有 25% 的成功率是不可接受的。
症结所在即在此,这是一个普遍现象。标准方法对于小型大语言模型稍显不稳定的特性来说过于僵化。
75% 的失败率是一个巨大的问题。作为开发者,当大语言模型表现不佳时,我们的第一反应通常是调整提示词。更明确的指令通常应带来更好的结果。LLMGraphTransformer
内部使用默认提示词,但我们无法轻易修改它。
因此,让我们使用 Langchain 的 ChatPromptTemplate
构建一个自己的简单链。这使我们能够完全控制发送给 llama3
的指令。我们可以更明确地“引导”模型,使其每次都生成正确的 JSON 格式。
首先,我们使用 Pydantic 模型来定义我们期望的输出结构。这在 Langchain 中是用于结构化输出的常见模式。
# 导入 Pydantic 模型以定义数据结构
from langchain_core.pydantic_v1 import BaseModel, Field
# 定义一个简单的节点结构
classNode(BaseModel):
id: str = Field(description="节点的唯一标识符。")
type: str = Field(description="节点的类型(例如:Person, Organization)。")
# 定义一个简单的关系结构
classRelationship(BaseModel):
source: Node = Field(description="关系源节点。")
target: Node = Field(description="关系目标节点。")
type: str = Field(description="关系的类型(例如:WORKS_FOR)。")
# 定义整体图结构
classKnowledgeGraph(BaseModel):
nodes: List[Node] = Field(description="图中的节点列表。")
relationships: List[Relationship] = Field(description="图中的关系列表。")
接下来,我们将创建一个新的、更详细的提示词。这个提示词将明确包含由我们的 Pydantic 模型生成的 JSON Schema,并向大语言模型提供非常具体的指令。
我们的想法是:不给模型留下任何出错的可能性。
# 导入提示词模板和输出解析器
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers.json import JsonOutputParser
# 创建所需输出结构的一个实例
parser = JsonOutputParser(pydantic_object=KnowledgeGraph)
# 创建一个包含明确指令的详细提示词模板
template = """
你是一个顶级的算法,用于以结构化格式抽取信息。
从给定的输入文本中抽取一个知识图谱,包含节点和关系。
你的目标是尽可能全面,抽取所有相关的实体及其连接。
将输出格式化为带有 'nodes' 和 'relationships' 键的 JSON 对象。
严格遵守以下 JSON Schema:
{schema}
这是输入文本:
--------------------
{text}
--------------------
"""
prompt = ChatPromptTemplate.from_template(
template,
partial_variables={"schema": parser.get_format_instructions()},
)
# 创建完整的抽取链
chain = prompt | llm | parser
这条新链比 LLMGraphTransformer
更明确。我们为模型提供了详细的 Schema 和清晰的指令。让我们再次用这 20 篇文章的样本来运行它,看看成功率是否有所提高。
# 此列表将存储新的结果
graph_documents_prompt_engineered = []
errors = []
for i, row in tqdm(news.head(NUM_ARTICLES).iterrows(), total=NUM_ARTICLES, desc="Processing with better prompt"):
text = f"{row['title']} {row['text']}"
try:
# 调用我们新的、改进后的链
graph_data = chain.invoke({"text": text})
# 手动将解析后的 JSON 转换回 GraphDocument 格式
nodes = [Node(id=node['id'], type=node['type']) for node in graph_data.get('nodes', [])]
relationships = [Relationship(source=Node(id=rel['source']['id'], type=rel['source']['type']),
target=Node(id=rel['target']['id'], type=rel['target']['type']),
type=rel['type']) for rel in graph_data.get('relationships', [])]
doc = Document(page_content=text)
graph_documents_prompt_engineered.append(GraphDocument(nodes=nodes, relationships=relationships, source=doc))
except Exception as e:
# 如果大语言模型输出的 JSON 无效,解析器将失败。我们将捕获该错误。
errors.append(str(e))
doc = Document(page_content=text)
graph_documents_prompt_engineered.append(GraphDocument(nodes=[], relationships=[], source=doc))
现在,我们来揭晓结果。让我们再次检查失败率。
# 初始化无节点文档计数器
empty_count_prompt_engineered = 0
# 遍历新结果
for doc in graph_documents_prompt_engineered:
ifnot doc.nodes:
empty_count_prompt_engineered += 1
# 计算并打印新的失败百分比
print(f"Percentage missing with improved prompt: {empty_count_prompt_engineered / len(graph_documents_prompt_engineered) * 100}%")
print(f"Number of JSON parsing errors: {len(errors)}")
#### 输出 ####
Percentage missing with improved prompt: 62.0%
Number of JSON parsing errors: 13
结果如何?失败率约为 62%。虽然这比最初的 75% 略有改善,但仍然远未达到可靠的水平。每次失败的原因都是 llama3
生成了格式错误的 JSON,即使我们已尽力优化提示词,这仍导致 JsonOutputParser
抛出错误。
这表明了一个根本性的局限性:
单纯依靠提示词工程,无法完全解决小型大语言模型结构化输出不一致的问题。
那么,如果改进提示词不是解决办法,那是什么呢?我们需要一个工具,它不仅要求好的输出,而且足够智能,能够处理大语言模型给出的不完美输出。这正是 BAML 旨在解决的问题。
在接下来的章节中,我们将用 BAML 驱动的实现来替换整个链,看看它能带来怎样的改变。
我们已经确定,即使经过精心设计的提示词工程,对于小型大语言模型来说,过度依赖严格的 JSON 解析也注定会失败。这些模型功能强大,但并非完美的格式化工具。
这就是 BAML (Basically, A Made-up Language) 发挥重要作用的地方。BAML 提供了两大关键优势,直接解决了我们的问题:
首先,您需要安装 BAML 客户端及其 VS Code 扩展。
# 安装 baml 客户端
pip install baml-py
在 VS Code 市场中搜索 baml
并安装该扩展。这个扩展功能强大,因为它提供了一个交互式平台,让您无需每次都运行 Python 代码即可测试提示词和 Schema。
接下来,我们将在 .baml
文件中定义图抽取逻辑。可以将其视为我们大语言模型调用的配置文件。我们将创建一个名为 extract_graph.baml
的文件:
// 定义图中的节点,包含 ID、类型和可选属性
classSimpleNode {
id string // 节点的唯一标识符
type string // 节点的类型/类别
properties Properties // 与节点关联的额外属性
}
// 定义节点或关系的可选属性结构
classProperties {
description string? // 可选的文本描述
}
// 定义两个节点之间的关系
classSimpleRelationship {
source_node_id string // 源节点的 ID
source_node_type string // 源节点的类型
target_node_id string // 目标节点的 ID
target_node_type string // 目标节点的类型
type string // 关系类型(例如:“连接到”、“属于”)
properties Properties // 关系的额外属性
}
// 定义由节点和关系组成的整体图结构
classDynamicGraph {
nodes SimpleNode[] // 图中所有节点的列表
relationships SimpleRelationship[] // 节点间所有关系的列表
}
// 从原始输入字符串中抽取 DynamicGraph 的函数
function ExtractGraph(graph: string) -> DynamicGraph {
client Ollama // 使用 Ollama 客户端解释输入
prompt #"
Extract fromthis content:
{{ ctx.output_format }}
{{ graph }} // 指导 Ollama 抽取图的提示词模板
}
class
定义简洁易读。ExtractGraph
函数指示 BAML 使用 Ollama
客户端并提供一个 Jinja 提示词模板。特殊变量 {{ ctx.output_format }}
由 BAML 自动注入,用于定义简化的 Schema。
下面,我们将 BAML 函数集成到 LangChain 的工作流中。为此,我们需要编写辅助函数,将 BAML 输出转换为 LangChain 和 Neo4j 可识别的 GraphDocument
格式。
# Import necessary libraries
from typing importAny, List
import baml_client as client
from langchain_community.graphs.graph_document import GraphDocument, Node, Relationship
from langchain_core.runnables import chain
# Helper function to format nodes correctly (e.g., proper capitalization)
def_format_nodes(nodes: List[Node]) -> List[Node]:
return [
Node(
id=el.id.title() ifisinstance(el.id, str) else el.id,
type=el.type.capitalize() if el.typeelseNone,
properties=el.properties
)
for el in nodes
]
# Helper to map BAML's relationship output to Langchain's Relationship object
defmap_to_base_relationship(rel: Any) -> Relationship:
source = Node(id=rel.source_node_id, type=rel.source_node_type)
target = Node(id=rel.target_node_id, type=rel.target_node_type)
return Relationship(
source=source, target=target, type=rel.type, properties=rel.properties
)
# Main helper to format all relationships
def_format_relationships(rels) -> List[Relationship]:
relationships = [
map_to_base_relationship(rel)
for rel in rels
if rel.typeand rel.source_node_id and rel.target_node_id
]
return [
Relationship(
source=_format_nodes([el.source])[0],
target=_format_nodes([el.target])[0],
type=el.type.replace(" ", "_").upper(),
properties=el.properties,
)
for el in relationships
]
# Define a LangChain chainable function to call our BAML function
@chain
asyncdefget_graph(message):
graph = await client.b.ExtractGraph(graph=message.content)
return graph
以下是每个辅助函数的目的:
_format_nodes(nodes)
:通过将节点 ID 和类型首字母大写,标准化节点格式,并返回格式化的 Node
对象列表。map_to_base_relationship(rel)
:将原始 BAML 关系转换为基本的 LangChain Relationship
对象,通过将源节点和目标节点封装为 Node
对象。_format_relationships(rels)
:过滤无效关系,将其映射为 LangChain Relationship
对象,并统一节点和关系的类型格式。get_graph(message)
:这是一个异步链式函数,负责将输入消息发送到 BAML API,调用 ExtractGraph
函数,并返回原始的图数据输出。借助这些辅助函数,我们可以定义新的处理链。我们将使用一个更为简单的自定义提示词,因为 BAML 会替我们处理复杂的模式注入工作。
# Import the prompt template
from langchain_core.prompts import ChatPromptTemplate
# A simple, effective system prompt
system_prompt = """
You are a knowledgeable assistant skilled in extracting entities and their relationships from text.
Your goal is to create a knowledge graph.
"""
# The final prompt template
default_prompt = ChatPromptTemplate.from_messages(
[
("system", system_prompt),
(
"human",
(
"Tip: Make sure to answer in the correct format and do not include any explanations. "
"Use the given format to extract information from the following input: {input}"
),
),
]
)
# Define the full BAML-powered chain
chain = default_prompt | llm | get_graph
该系统提示模板指导模型从文本中提取实体和关系,以构建知识图谱:
system_prompt
:将模型的角色设定为实体-关系提取器。default_prompt
:结合系统指令和用户输入,并包含一个用于输入文本的占位符。chain
:将提示词通过语言模型运行,然后将其输出传递给 get_graph
函数进行图提取。现在,我们将再次运行实验。这一次,我们将处理更大批量的文章,以真正测试这种新方法的可靠性。
由于时间限制,我在处理完 344 篇文章后停止了执行,但与最初的20篇相比,这是一个更具代表性的样本量。
为了实现并行处理,我们需要一些辅助函数。接下来,我们首先编写这些函数。
import asyncio
# Asynchronous function to process a single document
asyncdefaprocess_response(document: Document) -> GraphDocument:
# Invoke our BAML chain
resp = await chain.ainvoke({"input": document.page_content})
# Format the response into a GraphDocument
return GraphDocument(
nodes=_format_nodes(resp.nodes),
relationships=_format_relationships(resp.relationships),
source=document,
)
# Asynchronous function to process a list of documents
asyncdefaconvert_to_graph_documents(
documents: List[Document],
) -> List[GraphDocument]:
tasks = [asyncio.create_task(aprocess_response(document)) for document in documents]
results = await asyncio.gather(*tasks)
return results
# Asynchronous function to process raw texts
asyncdefaprocess_text(texts: List[str]) -> List[GraphDocument]:
docs = [Document(page_content=text) for text in texts]
graph_docs = await aconvert_to_graph_documents(docs)
return graph_docs
以下是每个异步函数的作用:
aprocess_response
:处理单个文档并返回一个 GraphDocument
。aconvert_to_graph_documents
:并行处理多个文档并返回图文档列表。aprocess_text
:将原始文本转换为文档并提取图数据。现在,我们可以执行主循环来处理我们的文章。
# Initialize an empty list to store the resulting graph documents.
graph_documents_baml = []
# Set the total number of articles to be processed.
NUM_ARTICLES_BAML = 344
# Create a smaller DataFrame containing only the articles to be processed.
news_baml = news.head(NUM_ARTICLES_BAML)
# Extract titles and texts from the new DataFrame.
titles = news_baml["title"]
texts = news_baml["text"]
# Define the number of articles to process in each batch (chunk).
chunk_size = 4
# Iterate over the articles in chunks, using tqdm to display a progress bar.
for i in tqdm(range(0, len(titles), chunk_size), desc="Processing Chunks with BAML"):
# Get the titles for the current chunk.
title_chunk = titles[i : i + chunk_size]
# Get the texts for the current chunk.
text_chunk = texts[i : i + chunk_size]
# Combine the title and text for each article in the chunk into a single string.
combined_docs = [f"{title} {text}"for title, text inzip(title_chunk, text_chunk)]
try:
# Asynchronously process the combined documents to extract graph structures.
docs = await aprocess_text(combined_docs)
# Add the processed graph documents to the main list.
graph_documents_baml.extend(docs)
except Exception as e:
# Handle any errors that occur during processing and print an error message.
print(f"Error processing chunk starting at index {i}: {e}")
# After the loop, display the total number of graph documents successfully processed.
len(graph_documents_baml)
这是我们得到的输出。
# Total number of graph documents
344
我们处理了344篇文章。现在,让我们进行与之前相同的故障分析。
# Initialize a counter for documents with no nodes
empty_count_baml = 0
# Iterate through the results from the BAML approach
for doc in graph_documents_baml:
if not doc.nodes:
empty_count_baml += 1
# Calculate and print the new failure percentage
print(f"Percentage missing with BAML: {empty_count_baml / len(graph_documents_baml) * 100}%")
#### OUTPUT ####
Percentage missing with BAML: 0.5813953488372093%
这是一个令人难以置信的结果。我们的失败率从 75% 大幅下降到仅仅 0.58%。这意味着我们的成功率现在达到了 99.4%!
通过简单地将死板的 LLMGraphTransformer
替换为由 BAML 驱动的链,我们成功地将一个失败的原型转变为一个健壮的、可投入生产的管道。
这表明瓶颈并非小型 LLM 理解任务的能力,而是系统期望完美 JSON 的脆弱性。
仅靠提取实体远不足以满足需求。GraphRAG 的真正力量在于构建这些知识、发现隐藏连接以及识别信息社区。
现在,我们将高质量的图数据加载到 Neo4j 中,并使用图数据科学技术来丰富它。
首先,我们设置与 Neo4j 数据库的连接。
import os
from langchain_community.graphs import Neo4jGraph
# Set up Neo4j connection details using environment variables
os.environ["NEO4J_URI"] = "bolt://localhost:7687"
os.environ["NEO4J_USERNAME"] = "neo4j"
os.environ["NEO4J_PASSWORD"] = "your_password" # Change this to your password
os.environ["DATABASE"] = "graphragdemo"
# Initialize the Neo4jGraph object
graph = Neo4jGraph()
现在,我们可以将 graph_documents_baml
添加到数据库中。baseEntityLabel=True
参数会为所有节点添加 __Entity__
标签,这对于后续查询非常有用。
# Add the graph documents to Neo4j
graph.add_graph_documents(graph_documents_baml, baseEntityLabel=True, include_source=True)
数据加载完成后,我们可以运行一些 Cypher 查询来了解新知识图谱的结构。首先,我们来看看文章长度(以 token 计)与从中提取的实体数量之间的关系。
# Import libraries for plotting and data analysis
import matplotlib.pyplot as plt
import seaborn as sns
# Query Neo4j to get the entity count and token count for each document
entity_dist = graph.query(
"""
MATCH (d:Document)
RETURN d.text AS text,
count {(d)-[:MENTIONS]->()} AS entity_count
"""
)
entity_dist_df = pd.DataFrame.from_records(entity_dist)
entity_dist_df["token_count"] = [
num_tokens_from_string(str(el)) for el in entity_dist_df["text"]
]
# Create a scatter plot with a regression line
sns.lmplot(
x="token_count", y="entity_count", data=entity_dist_df, line_kws={"color": "red"}
)
plt.title("Entity Count vs Token Count Distribution")
plt.xlabel("Token Count")
plt.ylabel("Entity Count")
plt.show()
该图显示出明显的正相关关系:随着文章中 token 数量的增加,我们提取的实体数量也呈增长趋势。这正是我们预期的结果,也证实了我们的提取过程是符合逻辑的。
接下来,我们看看节点度分布。这能告诉我们实体之间的连接程度。在真实世界的网络中,少数高度连接的节点(中心节点)是常见的。
import numpy as np
# Query for the degree of each entity node
degree_dist = graph.query(
"""
MATCH (e:__Entity__)
RETURN count {(e)-[:!MENTIONS]-()} AS node_degree
"""
)
degree_dist_df = pd.DataFrame.from_records(degree_dist)
# Calculate statistics
mean_degree = np.mean(degree_dist_df["node_degree"])
percentiles = np.percentile(degree_dist_df["node_degree"], [25, 50, 75, 90])
# Plot the histogram with a log scale
plt.figure(figsize=(12, 6))
sns.histplot(degree_dist_df["node_degree"], bins=50, kde=False, color="blue")
plt.yscale("log")
plt.title("Node Degree Distribution")
plt.legend()
plt.show()
直方图显示出一种“长尾”分布,这在知识图谱中很常见。大多数实体只有少数连接(度数较低),而少数实体是高度连接的中心节点。
例如,90分位数的度数为4,但最大度数达到了37。这表明像“美国”或“微软”这样的实体很可能在我们的图谱中充当关键枢纽节点。
为了找到语义上相似(即使名称不同)的实体,我们需要为它们创建向量嵌入。嵌入是文本的数值表示。我们将为每个实体的 id
和 description
生成嵌入,并将它们存储在图谱中。
我们将使用 Ollama 提供的 llama3
模型进行嵌入,并利用 LangChain 的 Neo4jVector
来实现嵌入管理。
from langchain_community.vectorstores import Neo4jVector
from langchain_ollama import OllamaEmbeddings
# Create embeddings using our local llama3 model
embeddings = OllamaEmbeddings(model="llama3")
# Initialize the Neo4jVector instance to manage embeddings in the graph
vector = Neo4jVector.from_existing_graph(
embeddings,
node_label="__Entity__",
text_node_properties=["id", "description"],
embedding_node_property="embedding",
database=os.environ["DATABASE"],
)
此命令会遍历 Neo4j 中所有的 __Entity__
节点,为它们的属性生成嵌入,并将其作为 embedding
属性存储回节点中。
有了嵌入,我们现在可以使用 **k近邻(kNN)**算法来查找向量空间中彼此接近的节点。这是一种识别潜在重复或高度相关实体(例如,“曼联”和“曼彻斯特联”)的强大方法。
我们将为此使用 Neo4j 的图数据科学(GDS)库。
# Import the GraphDataScience library
from graphdatascience import GraphDataScience
# --- GDS Client Initialization ---
# Initialize the GraphDataScience client to connect to the Neo4j database.
# It uses connection details (URI, username, password) from environment variables.
gds = GraphDataScience(
os.environ["NEO4J_URI"],
auth=(os.environ["NEO4J_USERNAME"], os.environ["NEO4J_PASSWORD"]),
)
# Set the specific database for the GDS operations.
gds.set_database(os.environ["DATABASE"])
# --- In-Memory Graph Projection ---
# Project a graph into memory for efficient processing by GDS algorithms.
# This projection is named 'entities'.
G, result = gds.graph.project(
"entities", # Name of the in-memory graph
"__Entity__", # Node label to project
"*", # Project all relationship types
nodeProperties=["embedding"] # Include the 'embedding' property for nodes
)
# --- Similarity Calculation using kNN ---
# Define the similarity threshold for creating relationships.
similarity_threshold = 0.95
# Use the k-Nearest Neighbors (kNN) algorithm to find similar nodes.
# This 'mutates' the in-memory graph by adding new relationships.
gds.knn.mutate(
G, # The in-memory graph to modify
nodeProperties=["embedding"], # Property to use for similarity calculation
mutateRelationshipType="SIMILAR", # The type of relationship to create
mutateProperty="score", # The property on the new relationship to store the similarity score
similarityCutoff=similarity_threshold, # Threshold to filter relationships
)
我们正在为嵌入相似度分数高于 0.95 的节点之间创建 SIMILAR
关系。
kNN 算法帮助我们找到了重复项的候选对象,但仅凭文本相似性并不完美。我们可以通过查找不仅语义相似而且名称也高度相似(“编辑距离”较小)的实体来进一步完善这一点。
我们将查询这些候选实体,然后使用大型语言模型(LLM)来最终决定是否合并它们。
# Query for potential duplicates based on community and name similarity
word_edit_distance = 3
potential_duplicate_candidates = graph.query(
# ... (full Cypher query from the notebook) ...
"""
MATCH (e:`__Entity__`)
WHERE size(e.id) > 4
WITH e.wcc AS community, collect(e) AS nodes, count(*) AS count
WHERE count > 1
# ... (rest of the complex query) ...
RETURN distinct(combinedResult)
""",
params={"distance": word_edit_distance},
)
# Let's look at a few candidates
potential_duplicate_candidates[:5]
上述代码的输出如下。
[{'combinedResult': ['David Van', 'Davidvan']},
{'combinedResult': ['Cyb003', 'Cyb004']},
{'combinedResult': ['Delta Air Lines', 'Delta_Air_Lines']},
{'combinedResult': ['Elon Musk', 'Elonmusk']},
{'combinedResult': ['Market', 'Markets']}]
这些看起来是明显的重复项。现在,我们可以使用另一个 BAML 函数,让 LLM 决定保留哪个名称。完成此解析过程后,我们将在 Neo4j 中合并这些节点。
# (Assuming 'merged_entities' is created by the LLM resolution process)
graph.query(
"""
UNWIND $data AS candidates
CALL {
WITH candidates
MATCH (e:__Entity__) WHERE e.id IN candidates
RETURN collect(e) AS nodes
}
CALL apoc.refactor.mergeNodes(nodes, {properties: {'`.*`': 'discard'}})
YIELD node
RETURN count(*)
""",
params={"data": merged_entities},
)
现在来到 GraphRAG 的核心:将相关实体分组为社区。
我们将把完整的图(包括所有原始关系)投影到内存中,并运行 Leiden 算法,这是一种先进的社区检测算法。
# Project the full graph, weighting relationships by their frequency
G, result = gds.graph.project(
"communities",
"__Entity__",
{
"_ALL_": {
"type": "*",
"orientation": "UNDIRECTED",
"properties": {"weight": {"property": "*", "aggregation": "COUNT"}},
}
},
)
# Run Leiden community detection and write the results back to the nodes
gds.leiden.write(
G,
writeProperty="communities",
includeIntermediateCommunities=True, # This creates hierarchical communities
relationshipWeightProperty="weight",
)
这会为每个实体节点添加一个 communities
属性,它是一个社区ID列表,包含不同粒度层级的社区(从小型、紧密结合的组到更大、更广泛的主题)。
最后,我们通过创建 __Community__
节点并将它们连接起来,在图谱中实现这种层次结构。这创建了一个可浏览的主题结构。
# Create a uniqueness constraint for community nodes
graph.query("CREATE CONSTRAINT IF NOT EXISTS FOR (c:__Community__) REQUIRE c.id IS UNIQUE;")
# Create community nodes and link entities and communities together
graph.query(
"""
MATCH (e:`__Entity__`)
UNWIND range(0, size(e.communities) - 1 , 1) AS index
// ... (full community creation query from notebook) ...
RETURN count(*)
"""
)
这个复杂的查询创建了一个多层次的社区结构,例如:(Entity)-[:IN_COMMUNITY]->(Level_0_Community)-[:IN_COMMUNITY]->(Level_1_Community)
。
经过所有这些工作,我们的知识图谱结构如何?让我们来分析每个层级的社区规模。
# Query for the size of each community at each level
community_size = graph.query(
"""
MATCH (c:__Community__)<-[:IN_COMMUNITY*]-(e:__Entity__)
WITH c, count(distinct e) AS entities
RETURN split(c.id, '-')[0] AS level, entities
"""
)
# Printing the processed dataframe
percentiles_df
这个表格在这里很重要。它向我们展示了 Leiden 算法是如何将我们的 1,875 个实体进行分组的。
这种分层结构恰好满足了高效 GraphRAG 的需求。我们现在可以在不同抽象级别执行检索。
实验结果明确表明,虽然标准的 LangChain 工具提供了快速入门的方法,但当与小型、开源 LLM 结合使用时,它们可能显得脆弱且不可靠。
通过引入 BAML,我们解决了提示词过于复杂和严格的 JSON 解析等核心问题。结果是成功率从 25% 大幅提升到 99% 以上,将一个失败的实验转变为一个健壮且可扩展的知识图谱构建管道。
以下是我们所采取的关键步骤的快速回顾:
llama3
模型。LLMGraphTransformer
进行测试,其 75% 的失败率源于严格的 JSON 解析。这种结合 LangChain 强大编排能力和 BAML 可靠结构化输出的方法,是构建强大且经济高效 AI 应用程序的有效途径。
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业
2025-08-09
基于LangChain+LangGraph+LangSmith+Streamlit开发带Web页面交互的Agent
2025-08-06
LangChain 的「记忆压缩术」:聊聊大模型的上下文工程设计之道
2025-08-05
LangChain与LlamaIndex对比
2025-08-05
使用 LangSmith 实现 12 种 AI 智能体评估技术
2025-08-04
基于上下文工程的LangChain人工智能智能体应用
2025-08-03
万字讲透Dify、Coze、LangChain竞品分析
2025-08-02
万字长文!从 0 到 1 搭建基于 LangGraph 的 AI Agent
2025-07-30
用 LangChain 打造你的第一个 AI Agent:从零到一的实践指南(含关键代码)
2025-06-05
2025-05-28
2025-07-14
2025-05-28
2025-05-19
2025-06-26
2025-05-19
2025-07-14
2025-05-30
2025-05-15
2025-07-14
2025-07-13
2025-07-05
2025-06-26
2025-06-13
2025-05-21
2025-05-19
2025-05-08