免费POC,零成本试错

AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


LangChain+BAML:打造99.4%成功率的知识图谱构建方案

发布日期:2025-08-12 08:48:30 浏览次数: 1534
作者:活水智能

微信搜一搜,关注“活水智能”

推荐语

突破知识图谱构建瓶颈!LangChain+BAML组合将抽取成功率提升至99.4%,彻底解决非结构化数据处理难题。

核心内容:
1. LangChain在知识图谱构建中的局限性分析
2. BAML模糊解析技术的突破性优势
3. 完整实现方案与Neo4j图数据库集成案例

杨芳贤
53AI创始人/腾讯云(TVP)最具价值专家


 

在使用 LangChain 构建基于知识图谱的 RAG 系统或智能体时,一个主要挑战是实现从非结构化数据中准确抽取节点和关系。尤其是使用本地化小型大语言模型 (LLM) 时,这一问题尤为显著。这往往会导致 AI 系统的性能不佳。

Langchain 与 BAML 对比 (图片由 Fareed Khan 创建)

LangChain 的信息抽取能力受限于其对严格 JSON 解析的依赖。即使在使用大型模型或高度详细的提示词模板时,这种解析方式也可能失败。

与此形成对比的是,BAML(https://github.com/BoundaryML/baml) 采用了模糊解析方法。即使大语言模型的输出不是完美的 JSON 格式,它也能成功抽取数据。

本文将探讨 LangChain 使用小型量化模型进行抽取时的局限性,并展示 BAML 如何将其抽取成功率从约 25% 提升至接近 99%

所有代码都已上传至我的 GitHub 仓库:

目录

  • • 初始化评估数据集
  • • 量化小型 LLaMA 模型
  • • 基于 LLMGraphTransformer 的方法
  • • 理解 LangChain 的问题所在
  • • 改进提示词是否能够解决问题?
  • • BAML 初始化与快速概览
  • • 整合 BAML 与 LangChain
  • • 运行 BAML 实验
  • • 使用 Neo4j 分析 GraphRAG
  • • 查找并链接相似实体
  • • 使用 Leiden 算法进行社区检测
  • • 分析最终图结构
  • • 结论

初始化评估数据集

为了深入理解问题并找到解决方案,我们需要准备评估数据集。通过对这些数据进行多项测试,我们可以全面掌握 BAML 改进 LangChain 知识图谱抽取效果的方式。

我们将使用 Tomasonjo(https://github.com/tomasonjo) 在 GitHub 上提供的 博客数据集(https://github.com/tomasonjo/blog-datasets)。首先,让我们加载这些数据。

# Import the pandas library for data manipulation and analysis
import pandas as pd

# Load the news articles dataset from a CSV file hosted on GitHub into a pandas DataFrame
news = pd.read_csv(
    "https://raw.githubusercontent.com/tomasonjo/blog-datasets/main/news_articles.csv"
)

# Display the first 5 rows of the DataFrame
news.head()

我们的 DataFrame 结构非常简单:包含新闻的标题和正文(即描述)。我们需要再添加一列,用于存储每篇新闻文章文本对应的总 token 数量。

为此,我们可以使用 OpenAI 的 tiktoken 库。通过循环遍历数据集来计算 token 数量的方法很直接,我们现在开始实现。

# Import the tiktoken library to count tokens from text
import tiktoken

# Define a function to calculate the number of tokens in a given string for a specific model
defnum_tokens_from_string(string: str, model: str = "gpt-4o") -> int:
    """Returns the number of tokens in a text string."""
    # Get the encoding for the specified model
    encoding = tiktoken.encoding_for_model(model)
    # Encode the string into tokens and count them
    num_tokens = len(encoding.encode(string))
    # Return the total number of tokens
    return num_tokens

# Create a new column 'tokens' in the DataFrame
# It calculates the number of tokens for the combined 'title' and 'text' of each article
news["tokens"] = [
    num_tokens_from_string(f"{row['title']} {row['text']}")
    for i, row in news.iterrows()
]

计算 DataFrame 中所有文章的 token 数量仅需几秒钟。

以下是更新后的 DataFrame 展示。

# Display the first 5 rows of the DataFrame to show the new 'tokens' column
news.head()

这些 token 稍后将在我们的评估和分析阶段中使用,这也是我们执行此步骤的原因。

量化小型 LLaMA 模型

为了使用 AI 模型将数据转换为知识图谱,我们将采用轻量级量化模型进行严格测试评估。

在生产环境中,开源大语言模型通常通过量化部署来降低成本和延迟。本文将使用 LLaMA 3.1 模型。

本文选择 Ollama 作为平台,但 LangChain 支持多种 API 和本地大语言模型提供商,因此可根据需求选择任一合适选项。

# ChatOllama 是 Ollama 语言模型的接口
from langchain_ollama import ChatOllama

# 定义要使用的模型名称
model = "llama3"

# 初始化 ChatOllama 语言模型
# 'temperature' 参数控制输出的随机性。
# 较低的值(如 0.001)使模型的响应更具确定性。
llm = ChatOllama(model=model, temperature=0.001)

您还需要在系统上安装 Ollama。它支持 macOS、Windows 和 Linux 操作系统。

  1. 1. 访问 Ollama 官方网站:https://ollama.com/
  2. 2. 下载适用于您操作系统的安装程序,并按照安装说明进行操作。

安装完成后,Ollama 将作为后台服务运行。

  • • 在 macOS 和 Windows 上,应用程序应自动启动并在后台运行(您可能会在菜单栏或系统托盘中看到图标)。
  • • 在 Linux 上,您可能需要使用 systemctl start ollama 手动启动它。

要检查服务是否正在运行,请打开您的终端或命令提示符并输入:

# 检查可用模型
ollama list

#### 输出 ####
[ ] <-- 暂无模型

如果服务正在运行但尚未有模型,您会看到一个空模型列表,这在当前阶段是完全正常的。如果您收到“命令未找到”错误,请确保 Ollama 已正确安装。如果您遇到连接错误,则表示服务器未运行。

您只需使用 pull 命令即可下载 llama3 模型。由于模型较大,这需要一些时间和数千兆字节的磁盘空间。

# 下载 llama3 模型
ollama pull llama3

这些命令执行完成后,您可以再次运行 ollama list,应该会看到模型已列出。

# 向本地 Ollama API 发送请求以生成文本
curl http://localhost:11434/api/generate \
    # 设置 Content-Type 头以指示 JSON 有效负载
    -H "Content-Type: application/json" \
    # 提供请求数据
    -d '{
        "model": "llama3",
        "prompt": "Why is the sky blue?"
    }'


#### 输出 ####
{
"model""llama3",
"created_at""2025-08-03T12:00:00Z",
"response""The sky appears blue be ... blue.",
"done"true
}

如果一切正常,您将在终端中看到 JSON 响应流。这证实了服务器正在运行并能够提供模型服务。

评估数据和大语言模型均已准备就绪,下一步是将数据进行转换,以深入理解 LangChain 的内部问题。

基于 LLMGraphTransformer 的方法

使用 LangChain 或 LangGraph 将原始或结构化数据转换为知识图谱的推荐方法是借助其内置工具。其中最常见的方法之一是 langchain_experimental 库中的 LLMGraphTransformer

该工具被设计为一体化解决方案:仅需提供文本和大语言模型,它便能自动处理提示词与解析,并返回图结构。

让我们看看它在我们的本地 llama3 模型上的表现如何。

首先,我们需要导入所有必需的组件。

# 从 Langchain 的实验库中导入主要图转换器
from langchain_experimental.graph_transformers import LLMGraphTransformer

# 导入图和文档的数据结构
from langchain_community.graphs.graph_document import GraphDocument, Node, Relationship
from langchain_core.documents import Document

现在,我们来初始化转换器。我们将使用之前创建的 llm 对象(即我们的 llama3 模型)。

我们还需要告诉转换器,希望它为节点和关系抽取哪些额外信息,即“属性”。在这个例子中,我们只要求抽取 description 属性。

# 使用 llama3 模型初始化 LLMGraphTransformer
# 我们指定节点和关系都需要 'description' 属性
llm_transformer = LLMGraphTransformer(
    llm=llm,
    node_properties=["description"],
    relationship_properties=["description"]
)

为了使流程可重复且规范,我们将创建一个简单的辅助函数。这个函数将接收一段文本字符串,将其封装为 Langchain 的 Document 格式,然后传递给我们的 llm_transformer 以获取图结构。

# 导入 List 类型以用于类型提示
from typing import List

# 定义一个函数,用于处理单个文本字符串并将其转换为图文档
def process_text(text: str) -> List[GraphDocument]:
    # 从原始文本创建 Langchain Document 对象
    doc = Document(page_content=text)
    # 使用转换器将文档转换为 GraphDocument 对象列表
    return llm_transformer.convert_to_graph_documents([doc])

所有准备工作就绪,实验即将开始。为了便于管理并突出核心问题,我们将处理数据集中的 20 篇文章作为小样本。

我们将使用 ThreadPoolExecutor 来并行处理,以加快工作流程。

# 导入用于并发处理和进度条的库
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

# 设置并行工作者数量和要处理的文章数量
MAX_WORKERS = 10
NUM_ARTICLES = 20

# 此列表将存储生成的图文档
graph_documents = []

# 使用 ThreadPoolExecutor 并行处理文章
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    # 为样本中的每篇文章提交处理任务
    futures = [
        executor.submit(process_text, f"{row['title']} {row['text']}")
        for i, row in news.head(NUM_ARTICLES).iterrows()
    ]

    # 当每个任务完成时,获取结果并将其添加到列表中
    for future in tqdm(
        as_completed(futures), total=len(futures), desc="Processing documents"
    ):
        graph_document = future.result()
        graph_documents.extend(graph_document)

运行代码后,进度条显示所有 20 篇文章都已处理完毕。

#### 输出 ####
Processing documents: 100%|██████████| 20/20 [01:32<00:00,  4.64s/it]

理解 LangChain 的问题所在

那么,我们得到了什么?让我们检查一下 graph_documents 列表。

# 显示图文档列表
print(graph_documents)

这是我们得到的输出:

#### 输出 ####
[GraphDocument(nodes=[], relationships=[], source=Document(metadata={}, page_content='XPeng Stock Rises...')),
 GraphDocument(nodes=[], relationships=[], source=Document(metadata={}, page_content='Ryanair sacks chief pilot...')),
 GraphDocument(nodes=[], relationships=[], source=Document(metadata={}, page_content='Dáil almost suspended...')),
 GraphDocument(nodes=[Node(id='Jude Bellingham'type='Person', properties={}), Node(id='Real Madrid'type='Organization', properties={})], relationships=[], source=Document(metadata={}, page_content='Arsenal have Rice bid rejected...')),
 ...
]

我们立刻发现了一个问题。许多 GraphDocument 对象的 nodes 和 relationships 列表都是空的。

这意味着对于这些文章,大语言模型生成的输出存在两种问题:一是 LangChain 无法将其解析为有效的图结构;二是完全没有抽取到任何实体。

这就是使用小型量化大语言模型进行结构化数据抽取的根本挑战。它们常常难以遵循像 LLMGraphTransformer 这类工具所期望的严格 JSON 格式。即使是微小的错误——比如多余的逗号、遗漏的引号——都会导致解析失败,从而一无所获。

让我们量化一下这个失败率。我们将统计 20 份文档中有多少份未能成功生成图。

# 初始化无节点文档计数器
empty_count = 0

# 遍历生成的图文档
for doc in graph_documents:
    # 如果 'nodes' 列表为空,则增加计数器
    if not doc.nodes:
        empty_count += 1

现在,我们来计算失败的百分比。

# 计算并打印未能生成任何节点的文档百分比
print(f"Percentage missing: {empty_count/len(graph_documents)*100}")


#### 输出 ####
Percentage missing: 75.0

75% 的失败率。这完全不能接受。这意味着在我们的 20 篇样本文章中,只有 5 篇成功转换成了知识图谱。

对于任何生产系统来说,仅有 25% 的成功率是不可接受的。

症结所在即在此,这是一个普遍现象。标准方法对于小型大语言模型稍显不稳定的特性来说过于僵化。

改进提示词是否能够解决问题?

75% 的失败率是一个巨大的问题。作为开发者,当大语言模型表现不佳时,我们的第一反应通常是调整提示词。更明确的指令通常应带来更好的结果。LLMGraphTransformer 内部使用默认提示词,但我们无法轻易修改它。

因此,让我们使用 Langchain 的 ChatPromptTemplate 构建一个自己的简单链。这使我们能够完全控制发送给 llama3 的指令。我们可以更明确地“引导”模型,使其每次都生成正确的 JSON 格式。

首先,我们使用 Pydantic 模型来定义我们期望的输出结构。这在 Langchain 中是用于结构化输出的常见模式。

# 导入 Pydantic 模型以定义数据结构
from langchain_core.pydantic_v1 import BaseModel, Field

# 定义一个简单的节点结构
classNode(BaseModel):
    idstr = Field(description="节点的唯一标识符。")
    typestr = Field(description="节点的类型(例如:Person, Organization)。")

# 定义一个简单的关系结构
classRelationship(BaseModel):
    source: Node = Field(description="关系源节点。")
    target: Node = Field(description="关系目标节点。")
    typestr = Field(description="关系的类型(例如:WORKS_FOR)。")

# 定义整体图结构
classKnowledgeGraph(BaseModel):
    nodes: List[Node] = Field(description="图中的节点列表。")
    relationships: List[Relationship] = Field(description="图中的关系列表。")

接下来,我们将创建一个新的、更详细的提示词。这个提示词将明确包含由我们的 Pydantic 模型生成的 JSON Schema,并向大语言模型提供非常具体的指令。

我们的想法是:不给模型留下任何出错的可能性。

# 导入提示词模板和输出解析器
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers.json import JsonOutputParser

# 创建所需输出结构的一个实例
parser = JsonOutputParser(pydantic_object=KnowledgeGraph)

# 创建一个包含明确指令的详细提示词模板
template = """
你是一个顶级的算法,用于以结构化格式抽取信息。
从给定的输入文本中抽取一个知识图谱,包含节点和关系。
你的目标是尽可能全面,抽取所有相关的实体及其连接。

将输出格式化为带有 'nodes' 和 'relationships' 键的 JSON 对象。
严格遵守以下 JSON Schema:
{schema}

这是输入文本:
--------------------
{text}
--------------------
"""


prompt = ChatPromptTemplate.from_template(
    template,
    partial_variables={"schema": parser.get_format_instructions()},
)

# 创建完整的抽取链
chain = prompt | llm | parser

这条新链比 LLMGraphTransformer 更明确。我们为模型提供了详细的 Schema 和清晰的指令。让我们再次用这 20 篇文章的样本来运行它,看看成功率是否有所提高。

# 此列表将存储新的结果
graph_documents_prompt_engineered = []
errors = []

for i, row in tqdm(news.head(NUM_ARTICLES).iterrows(), total=NUM_ARTICLES, desc="Processing with better prompt"):
    text = f"{row['title']} {row['text']}"
    try:
        # 调用我们新的、改进后的链
        graph_data = chain.invoke({"text": text})

        # 手动将解析后的 JSON 转换回 GraphDocument 格式
        nodes = [Node(id=node['id'], type=node['type']) for node in graph_data.get('nodes', [])]
        relationships = [Relationship(source=Node(id=rel['source']['id'], type=rel['source']['type']),
                                      target=Node(id=rel['target']['id'], type=rel['target']['type']),
                                      type=rel['type']) for rel in graph_data.get('relationships', [])]

        doc = Document(page_content=text)
        graph_documents_prompt_engineered.append(GraphDocument(nodes=nodes, relationships=relationships, source=doc))

    except Exception as e:
        # 如果大语言模型输出的 JSON 无效,解析器将失败。我们将捕获该错误。
        errors.append(str(e))
        doc = Document(page_content=text)
        graph_documents_prompt_engineered.append(GraphDocument(nodes=[], relationships=[], source=doc))

现在,我们来揭晓结果。让我们再次检查失败率。

# 初始化无节点文档计数器
empty_count_prompt_engineered = 0

# 遍历新结果
for doc in graph_documents_prompt_engineered:
    ifnot doc.nodes:
        empty_count_prompt_engineered += 1

# 计算并打印新的失败百分比
print(f"Percentage missing with improved prompt: {empty_count_prompt_engineered / len(graph_documents_prompt_engineered) * 100}%")
print(f"Number of JSON parsing errors: {len(errors)}")


#### 输出 ####
Percentage missing with improved prompt: 62.0%
Number of JSON parsing errors: 13

结果如何?失败率约为 62%。虽然这比最初的 75% 略有改善,但仍然远未达到可靠的水平。每次失败的原因都是 llama3 生成了格式错误的 JSON,即使我们已尽力优化提示词,这仍导致 JsonOutputParser 抛出错误。

这表明了一个根本性的局限性:

单纯依靠提示词工程,无法完全解决小型大语言模型结构化输出不一致的问题。

那么,如果改进提示词不是解决办法,那是什么呢?我们需要一个工具,它不仅要求好的输出,而且足够智能,能够处理大语言模型给出的不完美输出。这正是 BAML 旨在解决的问题。

在接下来的章节中,我们将用 BAML 驱动的实现来替换整个链,看看它能带来怎样的改变。

BAML 初始化与快速概览

我们已经确定,即使经过精心设计的提示词工程,对于小型大语言模型来说,过度依赖严格的 JSON 解析也注定会失败。这些模型功能强大,但并非完美的格式化工具。

这就是 BAML (Basically, A Made-up Language) 发挥重要作用的地方。BAML 提供了两大关键优势,直接解决了我们的问题:

  1. 1. 简化 Schema: BAML 不使用冗长的 JSON Schema,而是采用一种简洁、类似 TypeScript 的语法来定义数据结构。这种语法既便于人类理解,也利于大语言模型解析,同时减少了 token 消耗并降低了出错风险。
  2. 2. 健壮的解析: BAML 的客户端内置了“模糊”或“与 Schema 对齐”的解析器。它不期望完美的 JSON。即使大语言模型出现常见的错误,如多余的逗号、遗漏的引号或额外文本,它仍能成功抽取数据。

首先,您需要安装 BAML 客户端及其 VS Code 扩展。

# 安装 baml 客户端
pip install baml-py

在 VS Code 市场中搜索 baml 并安装该扩展。这个扩展功能强大,因为它提供了一个交互式平台,让您无需每次都运行 Python 代码即可测试提示词和 Schema。

接下来,我们将在 .baml 文件中定义图抽取逻辑。可以将其视为我们大语言模型调用的配置文件。我们将创建一个名为 extract_graph.baml 的文件:

// 定义图中的节点,包含 ID、类型和可选属性
classSimpleNode {
  id string                   // 节点的唯一标识符
  type string                // 节点的类型/类别
  properties Properties      // 与节点关联的额外属性
}

// 定义节点或关系的可选属性结构
classProperties {
  description string?        // 可选的文本描述
}

// 定义两个节点之间的关系
classSimpleRelationship {
  source_node_id string      // 源节点的 ID
  source_node_type string    // 源节点的类型
  target_node_id string      // 目标节点的 ID
  target_node_type string    // 目标节点的类型
  type string                // 关系类型(例如:“连接到”、“属于”)
  properties Properties      // 关系的额外属性
}

// 定义由节点和关系组成的整体图结构
classDynamicGraph {
  nodes SimpleNode[]               // 图中所有节点的列表
  relationships SimpleRelationship[] // 节点间所有关系的列表
}

// 从原始输入字符串中抽取 DynamicGraph 的函数
function ExtractGraph(graph: string) -> DynamicGraph {
  client Ollama                   // 使用 Ollama 客户端解释输入
  prompt #"
    Extract fromthis content:
    {{ ctx.output_format }}

    {{ graph }}                           // 指导 Ollama 抽取图的提示词模板
}

class 定义简洁易读。ExtractGraph 函数指示 BAML 使用 Ollama 客户端并提供一个 Jinja 提示词模板。特殊变量 {{ ctx.output_format }} 由 BAML 自动注入,用于定义简化的 Schema。

将 BAML 集成到 LangChain 工作流中

下面,我们将 BAML 函数集成到 LangChain 的工作流中。为此,我们需要编写辅助函数,将 BAML 输出转换为 LangChain 和 Neo4j 可识别的 GraphDocument 格式。

# Import necessary libraries
from typing importAnyList
import baml_client as client
from langchain_community.graphs.graph_document import GraphDocument, Node, Relationship
from langchain_core.runnables import chain

# Helper function to format nodes correctly (e.g., proper capitalization)
def_format_nodes(nodes: List[Node]) -> List[Node]:
    return [
        Node(
            id=el.id.title() ifisinstance(el.idstrelse el.id,
            type=el.type.capitalize() if el.typeelseNone,
            properties=el.properties
        )
        for el in nodes
    ]

# Helper to map BAML's relationship output to Langchain's Relationship object
defmap_to_base_relationship(rel: Any) -> Relationship:
    source = Node(id=rel.source_node_id, type=rel.source_node_type)
    target = Node(id=rel.target_node_id, type=rel.target_node_type)
    return Relationship(
        source=source, target=target, type=rel.type, properties=rel.properties
    )

# Main helper to format all relationships
def_format_relationships(rels) -> List[Relationship]:
    relationships = [
        map_to_base_relationship(rel)
        for rel in rels
        if rel.typeand rel.source_node_id and rel.target_node_id
    ]
    return [
        Relationship(
            source=_format_nodes([el.source])[0],
            target=_format_nodes([el.target])[0],
            type=el.type.replace(" ""_").upper(),
            properties=el.properties,
        )
        for el in relationships
    ]

# Define a LangChain chainable function to call our BAML function
@chain
asyncdefget_graph(message):
    graph = await client.b.ExtractGraph(graph=message.content)
    return graph

以下是每个辅助函数的目的:

  • • _format_nodes(nodes):通过将节点 ID 和类型首字母大写,标准化节点格式,并返回格式化的 Node 对象列表。
  • • map_to_base_relationship(rel):将原始 BAML 关系转换为基本的 LangChain Relationship 对象,通过将源节点和目标节点封装为 Node 对象。
  • • _format_relationships(rels):过滤无效关系,将其映射为 LangChain Relationship 对象,并统一节点和关系的类型格式。
  • • get_graph(message):这是一个异步链式函数,负责将输入消息发送到 BAML API,调用 ExtractGraph 函数,并返回原始的图数据输出。

借助这些辅助函数,我们可以定义新的处理链。我们将使用一个更为简单的自定义提示词,因为 BAML 会替我们处理复杂的模式注入工作。

# Import the prompt template
from langchain_core.prompts import ChatPromptTemplate

# A simple, effective system prompt
system_prompt = """
You are a knowledgeable assistant skilled in extracting entities and their relationships from text.
Your goal is to create a knowledge graph.
"""


# The final prompt template
default_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        (
            "human",
            (
                "Tip: Make sure to answer in the correct format and do not include any explanations. "
                "Use the given format to extract information from the following input: {input}"
            ),
        ),
    ]
)

# Define the full BAML-powered chain
chain = default_prompt | llm | get_graph

该系统提示模板指导模型从文本中提取实体和关系,以构建知识图谱:

  • • system_prompt:将模型的角色设定为实体-关系提取器。
  • • default_prompt:结合系统指令和用户输入,并包含一个用于输入文本的占位符。
  • • chain:将提示词通过语言模型运行,然后将其输出传递给 get_graph 函数进行图提取。

运行 BAML 实验

现在,我们将再次运行实验。这一次,我们将处理更大批量的文章,以真正测试这种新方法的可靠性。

由于时间限制,我在处理完 344 篇文章后停止了执行,但与最初的20篇相比,这是一个更具代表性的样本量。

为了实现并行处理,我们需要一些辅助函数。接下来,我们首先编写这些函数。

import asyncio

# Asynchronous function to process a single document
asyncdefaprocess_response(document: Document) -> GraphDocument:
    # Invoke our BAML chain
    resp = await chain.ainvoke({"input": document.page_content})
    # Format the response into a GraphDocument
    return GraphDocument(
        nodes=_format_nodes(resp.nodes),
        relationships=_format_relationships(resp.relationships),
        source=document,
    )

# Asynchronous function to process a list of documents
asyncdefaconvert_to_graph_documents(
    documents: List[Document],
) -> List[GraphDocument]:
    tasks = [asyncio.create_task(aprocess_response(document)) for document in documents]
    results = await asyncio.gather(*tasks)
    return results

# Asynchronous function to process raw texts
asyncdefaprocess_text(texts: List[str]) -> List[GraphDocument]:
    docs = [Document(page_content=text) for text in texts]
    graph_docs = await aconvert_to_graph_documents(docs)
    return graph_docs

以下是每个异步函数的作用:

  • • aprocess_response:处理单个文档并返回一个 GraphDocument
  • • aconvert_to_graph_documents:并行处理多个文档并返回图文档列表。
  • • aprocess_text:将原始文本转换为文档并提取图数据。

现在,我们可以执行主循环来处理我们的文章。

# Initialize an empty list to store the resulting graph documents.
graph_documents_baml = []

# Set the total number of articles to be processed.
NUM_ARTICLES_BAML = 344

# Create a smaller DataFrame containing only the articles to be processed.
news_baml = news.head(NUM_ARTICLES_BAML)

# Extract titles and texts from the new DataFrame.
titles = news_baml["title"]
texts = news_baml["text"]

# Define the number of articles to process in each batch (chunk).
chunk_size = 4

# Iterate over the articles in chunks, using tqdm to display a progress bar.
for i in tqdm(range(0len(titles), chunk_size), desc="Processing Chunks with BAML"):
    # Get the titles for the current chunk.
    title_chunk = titles[i : i + chunk_size]
    # Get the texts for the current chunk.
    text_chunk = texts[i : i + chunk_size]

    # Combine the title and text for each article in the chunk into a single string.
    combined_docs = [f"{title} {text}"for title, text inzip(title_chunk, text_chunk)]

    try:
        # Asynchronously process the combined documents to extract graph structures.
        docs = await aprocess_text(combined_docs)
        # Add the processed graph documents to the main list.
        graph_documents_baml.extend(docs)
    except Exception as e:
        # Handle any errors that occur during processing and print an error message.
        print(f"Error processing chunk starting at index {i}{e}")

# After the loop, display the total number of graph documents successfully processed.
len(graph_documents_baml)

这是我们得到的输出。

# Total number of graph documents
344
分块大小分布图

我们处理了344篇文章。现在,让我们进行与之前相同的故障分析。

# Initialize a counter for documents with no nodes
empty_count_baml = 0

# Iterate through the results from the BAML approach
for doc in graph_documents_baml:
    if not doc.nodes:
        empty_count_baml += 1

# Calculate and print the new failure percentage
print(f"Percentage missing with BAML: {empty_count_baml / len(graph_documents_baml) * 100}%")


#### OUTPUT ####
Percentage missing with BAML: 0.5813953488372093%

这是一个令人难以置信的结果。我们的失败率从 75% 大幅下降到仅仅 0.58%。这意味着我们的成功率现在达到了 99.4%

通过简单地将死板的 LLMGraphTransformer 替换为由 BAML 驱动的链,我们成功地将一个失败的原型转变为一个健壮的、可投入生产的管道。

这表明瓶颈并非小型 LLM 理解任务的能力,而是系统期望完美 JSON 的脆弱性。

使用 Neo4j 分析 GraphRAG

仅靠提取实体远不足以满足需求。GraphRAG 的真正力量在于构建这些知识、发现隐藏连接以及识别信息社区。

现在,我们将高质量的图数据加载到 Neo4j 中,并使用图数据科学技术来丰富它。

首先,我们设置与 Neo4j 数据库的连接。

import os
from langchain_community.graphs import Neo4jGraph

# Set up Neo4j connection details using environment variables
os.environ["NEO4J_URI"] = "bolt://localhost:7687"
os.environ["NEO4J_USERNAME"] = "neo4j"
os.environ["NEO4J_PASSWORD"] = "your_password" # Change this to your password
os.environ["DATABASE"] = "graphragdemo"

# Initialize the Neo4jGraph object
graph = Neo4jGraph()

现在,我们可以将 graph_documents_baml 添加到数据库中。baseEntityLabel=True 参数会为所有节点添加 __Entity__ 标签,这对于后续查询非常有用。

# Add the graph documents to Neo4j
graph.add_graph_documents(graph_documents_baml, baseEntityLabel=True, include_source=True)

数据加载完成后,我们可以运行一些 Cypher 查询来了解新知识图谱的结构。首先,我们来看看文章长度(以 token 计)与从中提取的实体数量之间的关系。

# Import libraries for plotting and data analysis
import matplotlib.pyplot as plt
import seaborn as sns

# Query Neo4j to get the entity count and token count for each document
entity_dist = graph.query(
    """
    MATCH (d:Document)
    RETURN d.text AS text,
           count {(d)-[:MENTIONS]->()} AS entity_count
    """

)
entity_dist_df = pd.DataFrame.from_records(entity_dist)
entity_dist_df["token_count"] = [
    num_tokens_from_string(str(el)) for el in entity_dist_df["text"]
]

# Create a scatter plot with a regression line
sns.lmplot(
    x="token_count", y="entity_count", data=entity_dist_df, line_kws={"color""red"}
)
plt.title("Entity Count vs Token Count Distribution")
plt.xlabel("Token Count")
plt.ylabel("Entity Count")
plt.show()
实体数量与 Token 数量图

该图显示出明显的正相关关系:随着文章中 token 数量的增加,我们提取的实体数量也呈增长趋势。这正是我们预期的结果,也证实了我们的提取过程是符合逻辑的。

接下来,我们看看节点度分布。这能告诉我们实体之间的连接程度。在真实世界的网络中,少数高度连接的节点(中心节点)是常见的。

import numpy as np

# Query for the degree of each entity node
degree_dist = graph.query(
    """
    MATCH (e:__Entity__)
    RETURN count {(e)-[:!MENTIONS]-()} AS node_degree
    """

)
degree_dist_df = pd.DataFrame.from_records(degree_dist)

# Calculate statistics
mean_degree = np.mean(degree_dist_df["node_degree"])
percentiles = np.percentile(degree_dist_df["node_degree"], [25507590])

# Plot the histogram with a log scale
plt.figure(figsize=(126))
sns.histplot(degree_dist_df["node_degree"], bins=50, kde=False, color="blue")
plt.yscale("log")
plt.title("Node Degree Distribution")
plt.legend()
plt.show()
节点度分布图

直方图显示出一种“长尾”分布,这在知识图谱中很常见。大多数实体只有少数连接(度数较低),而少数实体是高度连接的中心节点。

例如,90分位数的度数为4,但最大度数达到了37。这表明像“美国”或“微软”这样的实体很可能在我们的图谱中充当关键枢纽节点。

为了找到语义上相似(即使名称不同)的实体,我们需要为它们创建向量嵌入。嵌入是文本的数值表示。我们将为每个实体的 id 和 description 生成嵌入,并将它们存储在图谱中。

我们将使用 Ollama 提供的 llama3 模型进行嵌入,并利用 LangChain 的 Neo4jVector 来实现嵌入管理。

from langchain_community.vectorstores import Neo4jVector
from langchain_ollama import OllamaEmbeddings

# Create embeddings using our local llama3 model
embeddings = OllamaEmbeddings(model="llama3")

# Initialize the Neo4jVector instance to manage embeddings in the graph
vector = Neo4jVector.from_existing_graph(
    embeddings,
    node_label="__Entity__",
    text_node_properties=["id""description"],
    embedding_node_property="embedding",
    database=os.environ["DATABASE"],
)

此命令会遍历 Neo4j 中所有的 __Entity__ 节点,为它们的属性生成嵌入,并将其作为 embedding 属性存储回节点中。

查找并链接相似实体

有了嵌入,我们现在可以使用 **k近邻(kNN)**算法来查找向量空间中彼此接近的节点。这是一种识别潜在重复或高度相关实体(例如,“曼联”和“曼彻斯特联”)的强大方法。

我们将为此使用 Neo4j 的图数据科学(GDS)库。

# Import the GraphDataScience library
from graphdatascience import GraphDataScience

# --- GDS Client Initialization ---
# Initialize the GraphDataScience client to connect to the Neo4j database.
# It uses connection details (URI, username, password) from environment variables.
gds = GraphDataScience(
    os.environ["NEO4J_URI"],
    auth=(os.environ["NEO4J_USERNAME"], os.environ["NEO4J_PASSWORD"]),
)
# Set the specific database for the GDS operations.
gds.set_database(os.environ["DATABASE"])

# --- In-Memory Graph Projection ---
# Project a graph into memory for efficient processing by GDS algorithms.
# This projection is named 'entities'.
G, result = gds.graph.project(
    "entities",                   # Name of the in-memory graph
    "__Entity__",                 # Node label to project
    "*",                          # Project all relationship types
    nodeProperties=["embedding"]  # Include the 'embedding' property for nodes
)

# --- Similarity Calculation using kNN ---
# Define the similarity threshold for creating relationships.
similarity_threshold = 0.95

# Use the k-Nearest Neighbors (kNN) algorithm to find similar nodes.
# This 'mutates' the in-memory graph by adding new relationships.
gds.knn.mutate(
    G,                                  # The in-memory graph to modify
    nodeProperties=["embedding"],       # Property to use for similarity calculation
    mutateRelationshipType="SIMILAR",   # The type of relationship to create
    mutateProperty="score",             # The property on the new relationship to store the similarity score
    similarityCutoff=similarity_threshold, # Threshold to filter relationships
)

我们正在为嵌入相似度分数高于 0.95 的节点之间创建 SIMILAR 关系。

kNN 算法帮助我们找到了重复项的候选对象,但仅凭文本相似性并不完美。我们可以通过查找不仅语义相似而且名称也高度相似(“编辑距离”较小)的实体来进一步完善这一点。

我们将查询这些候选实体,然后使用大型语言模型(LLM)来最终决定是否合并它们。

# Query for potential duplicates based on community and name similarity
word_edit_distance = 3
potential_duplicate_candidates = graph.query(
    # ... (full Cypher query from the notebook) ...
    """
    MATCH (e:`__Entity__`)
    WHERE size(e.id) > 4
    WITH e.wcc AS community, collect(e) AS nodes, count(*) AS count
    WHERE count > 1
    # ... (rest of the complex query) ...
    RETURN distinct(combinedResult)
    """
,
    params={"distance": word_edit_distance},
)

# Let's look at a few candidates
potential_duplicate_candidates[:5]

上述代码的输出如下。

OUTPUT

[{'combinedResult': ['David Van', 'Davidvan']},
{'combinedResult': ['Cyb003', 'Cyb004']},
{'combinedResult': ['Delta Air Lines', 'Delta_Air_Lines']},
{'combinedResult': ['Elon Musk', 'Elonmusk']},
{'combinedResult': ['Market', 'Markets']}]

这些看起来是明显的重复项。现在,我们可以使用另一个 BAML 函数,让 LLM 决定保留哪个名称。完成此解析过程后,我们将在 Neo4j 中合并这些节点。

# (Assuming 'merged_entities' is created by the LLM resolution process)
graph.query(
    """
    UNWIND $data AS candidates
    CALL {
      WITH candidates
      MATCH (e:__Entity__) WHERE e.id IN candidates
      RETURN collect(e) AS nodes
    }
    CALL apoc.refactor.mergeNodes(nodes, {properties: {'`.*`': 'discard'}})
    YIELD node
    RETURN count(*)
    """
,
    params={"data": merged_entities},
)

使用 Leiden 算法进行社区检测

现在来到 GraphRAG 的核心:将相关实体分组为社区。

我们将把完整的图(包括所有原始关系)投影到内存中,并运行 Leiden 算法,这是一种先进的社区检测算法。

# Project the full graph, weighting relationships by their frequency
G, result = gds.graph.project(
    "communities",
    "__Entity__",
    {
        "_ALL_": {
            "type""*",
            "orientation""UNDIRECTED",
            "properties": {"weight": {"property""*""aggregation""COUNT"}},
        }
    },
)

# Run Leiden community detection and write the results back to the nodes
gds.leiden.write(
    G,
    writeProperty="communities",
    includeIntermediateCommunities=True# This creates hierarchical communities
    relationshipWeightProperty="weight",
)

这会为每个实体节点添加一个 communities 属性,它是一个社区ID列表,包含不同粒度层级的社区(从小型、紧密结合的组到更大、更广泛的主题)。

最后,我们通过创建 __Community__ 节点并将它们连接起来,在图谱中实现这种层次结构。这创建了一个可浏览的主题结构。

# Create a uniqueness constraint for community nodes
graph.query("CREATE CONSTRAINT IF NOT EXISTS FOR (c:__Community__) REQUIRE c.id IS UNIQUE;")

# Create community nodes and link entities and communities together
graph.query(
    """
    MATCH (e:`__Entity__`)
    UNWIND range(0, size(e.communities) - 1 , 1) AS index
    // ... (full community creation query from notebook) ...
    RETURN count(*)
    """

)

这个复杂的查询创建了一个多层次的社区结构,例如:(Entity)-[:IN_COMMUNITY]->(Level_0_Community)-[:IN_COMMUNITY]->(Level_1_Community)

分析最终图谱结构

经过所有这些工作,我们的知识图谱结构如何?让我们来分析每个层级的社区规模。

# Query for the size of each community at each level
community_size = graph.query(
    """
    MATCH (c:__Community__)<-[:IN_COMMUNITY*]-(e:__Entity__)
    WITH c, count(distinct e) AS entities
    RETURN split(c.id, '-')[0] AS level, entities
    """

)

# Printing the processed dataframe
percentiles_df

这个表格在这里很重要。它向我们展示了 Leiden 算法是如何将我们的 1,875 个实体进行分组的。

  • • 在 0级,我们有 858 个小型、高度集中的社区。其中90%的社区成员数量在4个或更少。
  • • 当我们提升到 3级时,算法已将这些社区合并为 732 个更大、更普遍的社区。此级别中最大的社区现在包含 77 个实体。

这种分层结构恰好满足了高效 GraphRAG 的需求。我们现在可以在不同抽象级别执行检索。

结论

实验结果明确表明,虽然标准的 LangChain 工具提供了快速入门的方法,但当与小型、开源 LLM 结合使用时,它们可能显得脆弱且不可靠。

通过引入 BAML,我们解决了提示词过于复杂和严格的 JSON 解析等核心问题。结果是成功率从 25% 大幅提升到 99% 以上,将一个失败的实验转变为一个健壮且可扩展的知识图谱构建管道。

以下是我们所采取的关键步骤的快速回顾:

  • • 首先,我们准备了新闻文章数据集,并使用 Ollama 设置了本地 llama3 模型。
  • • 我们首次使用 LangChain 的 LLMGraphTransformer 进行测试,其 75% 的失败率源于严格的 JSON 解析。
  • • 尝试通过高级提示词工程修复此问题,但失败率仅略微改善至约 62%。
  • • 随后,我们集成了 BAML,利用其简化的模式和健壮的解析器,在图提取方面取得了 99.4% 的成功率
  • • 高质量的图数据被加载到 Neo4j 中进行结构化和分析。
  • • 我们通过为所有实体生成向量嵌入来丰富图谱,以捕获其语义信息。
  • • 利用 k 近邻(kNN)算法,我们识别并连接了语义相似的节点。
  • • 我们进一步完善了图谱,使用 LLM 智能地查找并合并重复实体。
  • • 最后,我们应用 Leiden 算法将实体组织成多层次的社区结构,为高级 GraphRAG 奠定了基础。

这种结合 LangChain 强大编排能力和 BAML 可靠结构化输出的方法,是构建强大且经济高效 AI 应用程序的有效途径。


 


53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询