A hands-on guide to mastering knowledge graphs and boosting the efficiency of big data mining. Core topics:
1. How knowledge graphs are applied in big data mining and the efficiency gains they bring
2. A complete project workflow, from setting up the development environment to knowledge graph embeddings
3. Code practice and a GitHub resource link to help you get started quickly
Big data holds enormous value, but extracting insights from it takes a great deal of human effort. Knowledge graphs simplify this process: by consolidating scattered information into a structured, searchable form, they can dramatically improve efficiency. One study suggests that knowledge graphs can cut data exploration time by up to 70% (https://arxiv.org/abs/1811.01660). That means less tedious digging and more valuable findings.
Building on the theory and methods of the book Knowledge Graphs and Big Data Processing, this article walks you step by step through using a knowledge graph to process big data in an end-to-end project.
First, we prepare the necessary tools. We will rely on a handful of key Python libraries; let's install them.
# Install libraries (run this cell once)
pip install openai rdflib spacy pyvis datasets scikit-learn matplotlib tqdm pandas
After installation, you may need to restart your Jupyter kernel or runtime for the changes to take effect.
Now that the required libraries are installed, let's import them all into the script.
# Import necessary libraries
import os
import re
import json
import time
import hashlib  # used later as a fallback when generating URI-safe entity names
from collections import Counter

import matplotlib.pyplot as plt
from tqdm.auto import tqdm
import pandas as pd

# NLP and KG libraries
import spacy
from rdflib import Graph, Literal, Namespace, URIRef
from rdflib.namespace import RDF, RDFS, XSD, SKOS  # SKOS added for altLabel

# OpenAI client for LLM calls
from openai import OpenAI

# Visualization
from pyvis.network import Network

# HuggingFace datasets library
from datasets import load_dataset

# For embedding similarity
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
The required libraries imported successfully; our toolbox is ready.
In this article we use the CNN/DailyMail dataset (https://huggingface.co/datasets/abisee/cnn_dailymail). It contains more than 300,000 news articles with corresponding human-written summaries, which makes it an excellent source for extracting entities, relations, and events.
Let's load the dataset and print a sample.
# Using a specific version can help with consistency
cnn_dm_dataset = load_dataset("cnn_dailymail", "3.0.0")
We are using version 3.0.0, the latest release of this dataset.
Next, let's print some basic information about the data.
# Calculate the total number of records
total_records = len(cnn_dm_dataset["train"]) + len(cnn_dm_dataset["validation"]) + len(cnn_dm_dataset["test"])

# Print the total and a sample record
print(f"Total number of records in the dataset: {total_records}\n")
print("Sample record from the training dataset:")
print(cnn_dm_dataset["train"][0])

#### OUTPUT ####
Total number of records in the dataset: 311971

Sample record from the training dataset:
{'article': 'LONDON, England (Reuters) -- Harry Potter star Daniel ...'}
As you can see, we have more than 300,000 news articles in total, and extracting valuable insights from them is clearly a challenging task. Let's see how a knowledge graph can help us tackle it.
Building a single knowledge graph directly from the full dataset (300,000+ articles) is not advisable, because the content is not all related. The articles can instead be grouped by topic: technology news could form one large knowledge graph, sports news another.
An early stage of building a knowledge graph over big data is therefore breaking the data into smaller chunks.
Since we are working with news articles, we can use a keyword-based approach to split the 300,000+ articles into subsets.
First, create the ACQUISITION_KEYWORDS list, which will help us filter the relevant articles out of the large dataset.
# Define keywords relevant to technology company acquisitions
ACQUISITION_KEYWORDS = ["acquire", "acquisition", "merger", "buyout", "purchased by", "acquired by", "takeover"]
TECH_KEYWORDS = ["technology", "software", "startup", "app", "platform", "digital", "AI", "cloud"]
These keywords would normally emerge from exploratory text analysis of the data, but for simplicity we define them up front here. They are common, frequently occurring words in the news articles.
Our goal is to use these predefined technology- and acquisition-related keywords to filter out the news articles about technology company acquisitions and mergers.
# Taking the train split only
cnn_dm_dataset_train = cnn_dm_dataset['train']

# Initialize an empty list to store filtered articles
filtered_articles = []

# Loop through the dataset and filter articles based on keywords
for record in cnn_dm_dataset_train:
    # Check if any of the keywords appear in the article text
    found_keyword = False
    for keyword in ACQUISITION_KEYWORDS:
        if keyword.lower() in record['article'].lower():
            found_keyword = True
            break  # Stop once a keyword is found

    # If a keyword was found, append the article to the filtered list
    if found_keyword:
        filtered_articles.append(record)
Now that we have filtered the articles, let's check the total number of filtered articles and look at a sample.
# Print the total number of filtered articles
print(f"Total number of filtered articles: {len(filtered_articles)}")

# Print a sample of one filtered article
print("\nSample of a filtered article:")
print(filtered_articles[0]['article'])

### OUTPUT ###
Total number of filtered articles: 65249

Sample of a filtered article:
SAN DIEGO, California (CNN) -- You must know whats really driving the
immigration debate ...
We end up with roughly 65,000 filtered articles. With this data subset generated, the next step is to clean the records.
We want to remove as much unnecessary information as possible, because this data will later be fed to an LLM, and at this scale noise affects both cost and performance.
In our news data we can strip links, stray characters, channel names, and so on. Let's do that cleaning now.
cleaned_articles = []

for record in filtered_articles:
    text = record['article']

    # Basic cleaning using regular expressions
    text = re.sub(r'^\(CNN\)\s*(--)?\s*', '', text)                                        # Remove (CNN) prefix
    text = re.sub(r'By .*? for Dailymail\.com.*?Updated:.*', '', text, flags=re.I | re.S)  # Remove byline
    text = re.sub(r'PUBLISHED:.*?UPDATED:.*', '', text, flags=re.I | re.S)                 # Remove published/updated block
    text = re.sub(r'Last updated at.*on.*', '', text, flags=re.I)                          # Remove "last updated" line
    text = re.sub(r'https?://\S+|www\.\S+', '[URL]', text)                                 # Replace URLs
    text = re.sub(r'<.*?>', '', text)                                                      # Remove HTML tags
    text = re.sub(r'\b[\w.-]+@[\w.-]+\.\w+\b', '[EMAIL]', text)                            # Replace emails
    text = re.sub(r'\s+', ' ', text).strip()                                               # Normalize whitespace

    # Store the cleaned result
    cleaned_articles.append({
        "id": record['id'],
        "cleaned_text": text,
        "summary": record.get('highlights', '')
    })
We loop over the filtered news articles and remove redundant content that is not needed for knowledge graph construction.
This cleaning is based on my own initial exploration of the data. In a team project you would normally invest more effort in data cleaning to ensure quality and further shrink each article, which pays off in the downstream pipeline.
We now have 65,000+ news articles, and extracting entities from them is a challenging step. We would typically use an LLM to extract entities from each chunk of text, but what entity types should the LLM look for?
If we leave the LLM completely unconstrained, it may extract all kinds of entities from different chunks. To avoid that, we use NLP techniques to define a fixed set of entity types and instruct the LLM to focus on them.
There are several ways to do this, for example with embeddings. Here we will use a pretrained spaCy model and its built-in named entity recognition (NER). It will analyze our data and surface the entity labels present in it, and we will then use that label set to guide the LLM toward specific entity types.
# Download and load the English language model for spaCy
# (only needs to be run once)
spacy.cli.download("en_core_web_sm")
nlp = spacy.load("en_core_web_sm")

# Initialize a counter to hold entity label counts (e.g., PERSON, ORG, DATE)
entity_counts = Counter()

# Loop through each article and apply spaCy's Named Entity Recognition
for article in cleaned_articles:
    text = article['cleaned_text']  # Get the cleaned text
    doc = nlp(text)                 # Process text with spaCy

    # Count each entity label found in the text
    for ent in doc.ents:
        entity_counts[ent.label_] += 1
Let's print the entity counts and look at a sample.
print(entity_counts)

### OUTPUT ###
spaCy Entity Counts:
ORG: 2314
GPE: 1253
PERSON: 524
NORP: 3341
CARDINAL: 7542
DATE: 6344
...
Next, plot these labels as a chart to get a clearer picture of how entity types are distributed in our data and how often each occurs.
# Extract labels and counts (sorted by frequency)
labels, counts = zip(*entity_counts.most_common())

# Plotting the bar chart
plt.figure(figsize=(12, 7)) # Set figure size
plt.bar(labels, counts, color='skyblue') # Create the bar plot
plt.title("Top Entity Type Distribution (via spaCy)") # Chart title
plt.ylabel("Frequency") # Y-axis label
plt.xlabel("Entity Label") # X-axis label
plt.xticks(rotation=45, ha="right") # Rotate x-axis labels for better visibility
plt.tight_layout() # Adjust layout to make sure everything fits
plt.show() # Display the plot
We are currently using one of spaCy's small models, but we could switch to a larger one. A larger model would likely extract more valid and finer-grained labels from the data.
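For example, switching to a larger pipeline is a one-line change. A minimal sketch, assuming you are willing to download the larger package (en_core_web_lg is one of spaCy's standard larger English models):

# Optional: swap in a larger spaCy pipeline for broader NER coverage
# (larger models are slower and require a bigger one-time download)
spacy.cli.download("en_core_web_lg")
nlp = spacy.load("en_core_web_lg")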
These labels will be used to guide the LLM when it extracts entities from the articles. We will use Microsoft's Phi-4 model, which will take these entity types as its reference and extract entities from each article.
The next step is to extract the entities themselves; they will become the nodes of the knowledge graph.
To do this we must define a system prompt that instructs the LLM to extract entities of the types we identified with spaCy, a user prompt (in our case the news article), and a few other pieces.
First, set up the connection to the LLM.
# Initialize the OpenAI client using provided configuration
client = OpenAI(
base_url="YOUR LLM API Provider link",
api_key="LLM API KEY"
)
We need a helper function that packages a request and sends it to the LLM. It takes the system prompt, the user prompt (the article text), and the model name.
def call_llm(system_prompt, user_prompt, model_name):
    """
    Sends a request to a language model (LLM) to extract entities based on provided prompts.

    Args:
        system_prompt (str): Instructions or context for the LLM (e.g., how to behave).
        user_prompt (str): The user input containing text to extract entities from.
        model_name (str): The identifier of the LLM model to use (e.g., "gpt-4").

    Returns:
        str: The JSON-formatted string response from the LLM, or None if the client is unavailable.
    """
    # Construct and send the chat completion request to the LLM
    response = client.chat.completions.create(
        model=model_name,
        messages=[
            {"role": "system", "content": system_prompt},  # System-level instructions
            {"role": "user", "content": user_prompt}       # User-provided input
        ],
    )

    # Extract and return the response content (JSON string)
    return response.choices[0].message.content.strip()
Now, create the all-important system prompt. We use a Python f-string to dynamically insert our list of entity types (derived from entity_counts).
# Keep only the most frequent entity types (TOP_N_ENTITY_TYPES is a tunable constant)
TOP_N_ENTITY_TYPES = 10

# Get the top N entity types by frequency
relevant_entity_labels_for_llm = [label for label, count in entity_counts.most_common(TOP_N_ENTITY_TYPES)]
entity_types_string_for_prompt = ", ".join(relevant_entity_labels_for_llm)

# System prompt for the LLM
# We are instructing it to return a JSON object with a key "entities"
# whose value is a list of entity objects.
llm_ner_system_prompt = (
f"You are an expert Named Entity Recognition system. "
f"From the provided news article text, identify and extract entities. "
f"The entity types to focus on are: {entity_types_string_for_prompt}. "
f"For each identified entity, provide its exact text span from the article and its type (use one of the provided types). "
f"Output ONLY a valid JSON object with a single key 'entities'. The value of 'entities' MUST be a list of JSON objects, "
f"where each object has 'text' and 'type' keys. "
f"Example: {{\"entities\": [{{\"text\": \"United Nations\", \"type\": \"ORG\"}}, {{\"text\": \"Barack Obama\", \"type\": \"PERSON\"}}]}} "
f"If no entities of the specified types are found, the 'entities' list should be empty: {{\"entities\": []}}."
)
This system prompt makes the LLM output the entity data as valid JSON.
Before writing the main loop, we need a JSON parsing function that turns the text output into a valid Python structure.
def parse_llm_entity_json_output(llm_output_str):
    """
    Parses the JSON string from the LLM and returns a list of entities.
    Assumes the format: {"entities": [{"text": "...", "type": "..."}]}

    Args:
        llm_output_str (str): JSON string from the LLM.

    Returns:
        list: Extracted entities or empty list if parsing fails.
    """
    if not llm_output_str:
        return []  # Return empty list if no output

    # Remove markdown code block markers if present
    if llm_output_str.startswith("```json"):
        llm_output_str = llm_output_str[7:].rstrip("```").strip()

    try:
        data = json.loads(llm_output_str)
        return data.get("entities", [])  # Return entities list, or empty if not found
    except json.JSONDecodeError:
        return []  # Return empty list on JSON error
Next, create a loop that applies this system prompt to every article in the dataset.
# Defining our entity-extraction LLM
TEXT_GEN_MODEL_NAME = "microsoft/phi-4"

# List to collect articles together with their LLM-extracted entities
articles_with_llm_entities = []

# Loop through the cleaned articles and extract entities using the LLM
for i, article_data in enumerate(cleaned_articles):
    article_id = article_data['id']
    article_text = article_data['cleaned_text']

    # Call the LLM to extract entities
    llm_response_content = call_llm(
        llm_ner_system_prompt,
        article_text,
        TEXT_GEN_MODEL_NAME
    )

    # Parse the LLM's response into a list of entities
    extracted_llm_entities = []
    if llm_response_content:
        extracted_llm_entities = parse_llm_entity_json_output(llm_response_content)

    # Store the results with the article
    articles_with_llm_entities.append({
        "id": article_id,
        "cleaned_text": article_text,
        "summary": article_data['summary'],
        "llm_extracted_entities": extracted_llm_entities
    })
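Since this loop makes one LLM call per article, a full pass over roughly 65,000 articles can take a long time. A minimal checkpointing sketch is shown below; the filename and interval are my own choices, not part of the original pipeline:

# Periodically persist intermediate results so an interrupted run can be resumed
CHECKPOINT_EVERY = 1000  # assumed interval, tune as needed

def save_entity_checkpoint(results, path="entities_checkpoint.json"):
    # The records are plain dicts at this stage, so they serialize directly to JSON
    with open(path, "w", encoding="utf-8") as f:
        json.dump(results, f, ensure_ascii=False)

# Inside the extraction loop, one could add:
#     if (i + 1) % CHECKPOINT_EVERY == 0:
#         save_entity_checkpoint(articles_with_llm_entities)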
Extracting entities from 65,000 news articles takes a while. Once the loop has run, let's print the extraction result for one of the articles.
# Print entities of a sample article
print(articles_with_llm_entities[4212]['llm_extracted_entities'])

### OUTPUT ###
Extracted 20 entities for article ID 4cf51ce937a.
Sample entities: [
{
"text": "United Nations",
"type": "ORG"
},
{
"text": "Algiers",
"type": "GPE"
},
{
"text": "CNN",
"type": "ORG"
} ...
We have successfully extracted entities from more than 65,000 news articles; these will become the nodes of our knowledge graph. However, nodes need relationships to define the edges of the graph, which is what we tackle next.
For our graph to be truly useful, we need the connections between entities. These relationships form the edges of the knowledge graph, linking the nodes so that they tell a complete story. For example, we want to know:
- Which company ACQUIRED which company?
- What was the price (HAS_PRICE)?
- On which date was it announced (ANNOUNCED_ON)?
We will reuse the same LLM call function we used for entity extraction, but with a new system prompt for relationship extraction.
# System prompt for relationship extraction
# We're asking for a JSON object with a "relationships" key.
llm_re_system_prompt = (
"You are an expert system for extracting relationships between entities from text, "
"specifically focusing on **technology company acquisitions**. "
"Given an article text and a list of pre-extracted named entities (each with 'text' and 'type'), "
"your task is to identify and extract relationships. "
"The 'subject_text' and 'object_text' in your output MUST be exact text spans of entities found in the provided 'Extracted Entities' list. "
"The 'subject_type' and 'object_type' MUST correspond to the types of those entities from the provided list. "
"Output ONLY a valid JSON object with a single key 'relationships'. The value of 'relationships' MUST be a list of JSON objects. "
"Each relationship object must have these keys: 'subject_text', 'subject_type', 'predicate' (one of the types listed above), 'object_text', 'object_type'. "
"Example: {\"relationships\": [{\"subject_text\": \"Innovatech Ltd.\", \"subject_type\": \"ORG\", \"predicate\": \"ACQUIRED\", \"object_text\": \"Global Solutions Inc.\", \"object_type\": \"ORG\"}]} "
"If no relevant relationships of the specified types are found between the provided entities, the 'relationships' list should be empty: {\"relationships\": []}."
)
In this system prompt we are essentially instructing the LLM to output relationships in the following format:
{
"relationships":[
{
"subject_text":"Innovatech Ltd.",
"subject_type":"ORG",
"predicate":"ACQUIRED",
"object_text":"Global Solutions Inc.",
"object_type":"ORG"
}
]
}
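One detail worth noting: the system prompt tells the model to use "one of the types listed above" as the predicate, but the excerpt does not show that list, so presumably it enumerated the relation types we care about. A minimal sketch of injecting such a whitelist, using only the predicates discussed earlier (the exact list is an assumption):

# Assumed predicate whitelist, based on the relations we want to capture
RELATIONSHIP_PREDICATES = ["ACQUIRED", "HAS_PRICE", "ANNOUNCED_ON"]

# One simple way to expose the list to the LLM is to prepend it to the system prompt
llm_re_system_prompt_with_predicates = (
    f"Allowed relationship predicates: {', '.join(RELATIONSHIP_PREDICATES)}.\n" + llm_re_system_prompt
)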
The predicate defines the relationship. As with entities, we also need to parse the relationship JSON out of the LLM output.
def parse_llm_relationship_json_output(llm_output_str_rels):
    """
    Parses the JSON string from the LLM to extract relationships.

    Expected format:
        {"relationships": [{"subject_text": ..., "predicate": ..., "object_text": ...}]}

    Args:
        llm_output_str_rels (str): JSON string from the LLM.

    Returns:
        list: Extracted relationships or empty list if parsing fails.
    """
    if not llm_output_str_rels:
        return []  # Return empty list if no output

    # Remove markdown code block markers if present
    if llm_output_str_rels.startswith("```json"):
        llm_output_str_rels = llm_output_str_rels[7:].rstrip("```").strip()

    try:
        data = json.loads(llm_output_str_rels)
        return data.get("relationships", [])  # Return relationships list, or empty if not found
    except json.JSONDecodeError:
        return []  # Return empty list on JSON error
Now we combine this system prompt and JSON parser with each article's entities and the other information collected so far.
Let's iterate over the articles_with_llm_entities list. For every article with entities, we prepare a new user prompt containing the article text and its extracted entities, then ask the LLM to find the relationships between them.
# List to collect articles together with their extracted relationships
articles_with_llm_relations = []

# Iterate through each article's entity data
for i, article_entity_data in enumerate(articles_with_llm_entities):
    # Extract article id, cleaned text, and extracted entities from the article data
    article_id_rels = article_entity_data['id']
    article_text_rels = article_entity_data['cleaned_text']
    current_entities = article_entity_data['llm_extracted_entities']

    # Serialize the list of entities into a JSON string for inclusion in the prompt
    entities_json_for_prompt = json.dumps(current_entities)

    # Construct the user prompt to request relationship extraction from the LLM
    user_prompt_for_re = (
        f"Article Text:\n`\n{article_text_rels}\n`\n\n"
        f"Extracted Entities (use these exact texts for subjects/objects of relationships):\n`json\n{entities_json_for_prompt}\n`\n\n"
        "Identify and extract relationships between these entities based on the system instructions."
    )

    # Call the LLM (the same helper as before) to extract relationships
    llm_response_rels_content = call_llm(llm_re_system_prompt, user_prompt_for_re, TEXT_GEN_MODEL_NAME)

    # Parse the extracted relationships from the JSON response
    extracted_llm_rels = []
    if llm_response_rels_content:
        extracted_llm_rels = parse_llm_relationship_json_output(llm_response_rels_content)

    # Append the original article data along with the extracted relationships to the results list
    articles_with_llm_relations.append({
        **article_entity_data,  # Keep the original article data (id, text, entities, etc.)
        "llm_extracted_relationships": extracted_llm_rels  # Add the extracted relationships
    })
This loop starts extracting relationships from each article. Let's print a sample of the extracted relationships.
# Print relationships of a sample article
print(articles_with_llm_relations[1234]['llm_extracted_relationships'])

### OUTPUT ###
Extracted 3 relationships using LLM.
Sample LLM relationships: [
{
"subject_text": "Microsoft Corp.",
"subject_type": "ORG",
"predicate": "ACQUIRED",
"object_text": "Nuance Communications Inc.",
"object_type": "ORG"
},
{
"subject_text": "Nuance Communications Inc.",
"subject_type": "ORG",
"predicate": "HAS_PRICE",
"object_text": "$19.7 billion",
"object_type": "MONEY"
}
]
... (similar output for other articles) ...
Great! We have successfully extracted entities (nodes) and relationships (edges) from the article data. Let's move on to the next stage of the build.
We have extracted entities such as Microsoft Corp., Microsoft, and MSFT, along with the relationships that connect them.
Looking closely, the LLM might extract Microsoft Corp. from one article and just Microsoft from another, even though both refer to the same real-world company.
Treating them as two separate nodes in the knowledge graph would lose a crucial connection: the graph would not know they are the same thing. This is where entity normalization (closely related to entity disambiguation or resolution) comes in.
Fully linking entities to a large external knowledge base such as Wikidata is a complex task. For our project we take a simplified approach:
1. Strip common organizational suffixes such as Inc., Ltd., and Corp., so that Microsoft Corp. becomes Microsoft.
2. Assign each (normalized text, type) pair a unique URI. For example, the entity with normalized text Microsoft and type ORG gets a specific URI; if we encounter Microsoft (ORG) again, it receives the same URI.
First, create a function to normalize the entity text.
def normalize_entity_text_for_uri(entity_text, entity_type):
    """
    Normalizes entity text, primarily by stripping common
    suffixes for organizations.
    """
    normalized_text = entity_text.strip()

    if entity_type == 'ORG':
        # List of common suffixes to remove from organization names
        # This list can be expanded based on your data
        suffixes_to_remove = [
            'Inc.', 'Incorporated', 'Ltd.', 'Limited', 'LLC', 'L.L.C.',
            'Corp.', 'Corporation', 'PLC', 'Co.', 'Company',
            'Group', 'Holdings', 'Solutions', 'Technologies', 'Systems'
        ]

        # Sort by length to remove longer matches first (e.g., "Corp." before "Co.")
        suffixes_to_remove.sort(key=len, reverse=True)

        for suffix in suffixes_to_remove:
            # Case-insensitive check if the text ends with the suffix
            if normalized_text.lower().endswith(" " + suffix.lower()) or normalized_text.lower() == suffix.lower():
                # Find the start of the suffix in the original cased string
                suffix_start_index = normalized_text.lower().rfind(suffix.lower())
                # Slice the string to remove the suffix
                normalized_text = normalized_text[:suffix_start_index].strip()
                # Once a suffix is removed, break to avoid over-stripping
                # e.g. "The The Co." -> "The The", not "The"
                break

    # Remove any trailing commas or periods that might be left
    normalized_text = re.sub(r'[,.]*$', '', normalized_text).strip()

    # Remove possessives like 's or s' which are sometimes caught by NER
    if normalized_text.endswith("'s") or normalized_text.endswith("s'"):
        normalized_text = normalized_text[:-2].strip()

    # If normalization results in an empty string, revert to the original (should be rare)
    return normalized_text if normalized_text else entity_text
With the normalize_entity_text_for_uri function in place, we can process all LLM-extracted entities. For each entity we normalize its text and then create (or reuse) a unique URI.
We use a Python dictionary called unique_entities_map to track our canonical URIs. The key is the tuple (normalized_text, entity_type) and the value is the URI.
This guarantees that whenever we encounter an entity with the same normalized text and type, we use exactly the same URI.
# Final output list to store articles with processed entity info
articles_with_normalized_entities_and_uris = []

# Dictionary to track unique entities and assign stable URIs
unique_entities_map = {}

# Define a base namespace for Knowledge Graph URIs
EX = Namespace("http://example.org/kg/")
print("KG Namespace EX defined.")

# Process each article
for article in tqdm(articles_with_llm_relations, desc="Normalizing & URI Gen"):
    processed = []

    # Extract and process each entity if available
    for ent in article.get('llm_extracted_entities', []):
        text = ent['text']
        type_raw = ent['type']

        # Normalize the type (e.g., "ORG (Organization)" -> "ORG")
        type_simple = type_raw.split()[0].upper()

        # Normalize the entity text for URI generation
        norm_text = normalize_entity_text_for_uri(text, type_simple)
        key = (norm_text, type_simple)

        # Assign a unique URI if it's a new entity
        if key not in unique_entities_map:
            # Clean and truncate the text to make it URI-safe
            safe_text = re.sub(r'[^a-zA-Z0-9_]', '_', norm_text.replace(' ', '_'))[:50]

            # If cleaning makes the name empty, fall back to a hash
            if not safe_text:
                safe_text = f"entity_{hashlib.md5(norm_text.encode()).hexdigest()[:8]}"

            # Generate the full URI
            unique_entities_map[key] = EX[f"{safe_text}_{type_simple}"]

        # Add normalized fields and URI to the entity
        processed.append({
            **ent,
            'normalized_text': norm_text,
            'simple_type': type_simple,
            'uri': unique_entities_map[key]
        })

    # Add the processed entity list to the article
    articles_with_normalized_entities_and_uris.append({
        **article,
        "processed_entities": processed
    })
This loop takes some time to run, since it normalizes every entity and generates a URI for it. Let's print a sample of the results.
# Display the first 3 processed entities from a sample article
for ent in articles_with_normalized_entities_and_uris[2222]['processed_entities'][:3]:
    print(f"  Original: '{ent['text']}' ({ent['type']})")                                   # Original entity text and raw type
    print(f"  Normalized: '{ent['normalized_text']}' (Simple Type: {ent['simple_type']})")  # Cleaned text and type
    print(f"  URI: <{ent['uri']}>")                                                         # The generated URI for the entity

### OUTPUT ###
Example of processed entities from the sample article:
  Original: 'Inabix Corp.' (ORG)
  Normalized: 'Inabix' (Simple Type: ORG)
  URI: <http://example.org/kg/Inabix_ORG>
  Original: 'Nuance Communications Inc.' (ORG)
  Normalized: 'Nuance Communications' (Simple Type: ORG)
  URI: <http://example.org/kg/Nuance_Communications_ORG>
  Original: '$73.1 billion' (MONEY)
  Normalized: '$73.1 billion' (Simple Type: MONEY)
  URI: <http://example.org/kg/73_1_billion_MONEY>
Every unique entity in the dataset now has a clean, normalized name and, most importantly, a unique URI. This processed_entities list, together with the relationships we extracted earlier, is ready to be converted into actual RDF triples.
We now need to give our knowledge graph a more formal structure. This is where schema, or ontology, alignment comes in.
Think of it as the blueprint or dictionary of the knowledge graph.
In this project we perform a simplified schema alignment. We want to tell the graph that the URI ex:Microsoft_ORG is not just a random string; it represents an Organization. Likewise, the URI ex:Satya_Nadella_PERSON represents a Person.
We will create a function that takes an entity's simple_type (such as the ORG, PERSON, or MONEY labels we extracted earlier) and maps it to a formal RDF class. An RDF class is essentially a category or type in the RDF world.
Now, write the function that returns the RDF class for an entity type.
# Define the Schema.org namespace used for mapping entity types
SCHEMA = Namespace("http://schema.org/")

def get_rdf_class_for_entity_type(simple_entity_type_str):
    """
    Maps a simple entity type string (e.g., 'ORG') to an RDF Class URI.
    Uses Schema.org where possible, otherwise defaults to our custom EX namespace.
    """
    type_to_rdf_class_map = {
        'ORG': SCHEMA.Organization,
        'PERSON': SCHEMA.Person,
        'MONEY': SCHEMA.PriceSpecification,  # Schema.org uses this for monetary amounts
        'DATE': SCHEMA.Date,                 # Represents a date
        'PRODUCT': SCHEMA.Product,
        'GPE': SCHEMA.Place,                 # Geopolitical entity (maps well to Place)
        'LOC': SCHEMA.Place,                 # General location
        'EVENT': SCHEMA.Event,
        'NORP': SCHEMA.Nationality,          # Nationalities, religious, or political groups
        'CARDINAL': XSD.integer,             # Cardinal numbers are often just literal integers,
                                             # or could map to schema:QuantitativeValue with more context.
                                             # Typing a node this way is less common; cardinal numbers
                                             # usually end up as literal values of properties.
        # Add more mappings if your LLM identified other relevant 'simple_type's
    }

    # Use .get() to provide a fallback if the type isn't in our map:
    # if not in the map, create a class in our EX namespace
    rdf_class = type_to_rdf_class_map.get(simple_entity_type_str.upper(), EX[simple_entity_type_str.upper()])
    return rdf_class
Let's test the function with a few examples and see which RDF classes our entity types map to.
print("Example RDF Class mappings for our entity types:")sample_type1 = 'ORG'
rdf_class1 = get_rdf_class_for_entity_type(sample_type1)
print(f" Entity Type '{sample_type1}' maps to RDF Class: <{rdf_class1}>")sample_type2 = 'MONEY'
rdf_class2 = get_rdf_class_for_entity_type(sample_type2)
print(f" Entity Type '{sample_type2}' maps to RDF Class: <{rdf_class2}>")sample_type3 = 'INVESTMENT_ROUND' # A hypothetical custom type
rdf_class3 = get_rdf_class_for_entity_type(sample_type3)
print(f" Entity Type '{sample_type3}' (custom) maps to RDF Class: <{rdf_class3}>")### OUTPUT ###
CustomKGNamespaceEX re-defined for clarity.
RDFNamespace: http://www.w3.org/1999/02/22-rdf-syntax-ns#
RDFSNamespace: http://www.w3.org/2000/01/rdf-schema#
SCHEMANamespace (Schema.org): http://schema.org/
EXNamespace (Custom): http://example.org/kg/Example RDF Class mappings for our entity types:
EntityType'ORG' maps to RDFClass: <http://schema.org/Organization>
EntityType'MONEY' maps to RDFClass: <http://schema.org/PriceSpecification>
EntityType'INVESTMENT_ROUND' (custom) maps to RDFClass: <http://example.org/kg/INVESTMENT_ROUND>
Now, when we build the graph, each entity URI (e.g., ex:Microsoft_ORG) will be explicitly declared as an instance of its corresponding RDF class (e.g., schema:Organization).
This is done with the rdf:type predicate, forming a triple such as:
<ex:Microsoft_ORG> <rdf:type> <schema:Organization>
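In rdflib, asserting that typing statement is a single add() call. A small sketch using the EX and SCHEMA namespaces defined in this project:

# Assert "Microsoft is an Organization" as an rdf:type triple on a scratch graph
example_graph = Graph()
example_graph.add((EX["Microsoft_ORG"], RDF.type, SCHEMA.Organization))
print(example_graph.serialize(format="turtle"))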
Next, we will generate the triples for our knowledge graph.
RDF triples are the core building blocks of a knowledge graph based on the Resource Description Framework (RDF). Each triple has three parts: a subject, a predicate, and an object.
It reads like a simple sentence:
# General form
<Subject> <Predicate> <Object>

# Example
<ex:Microsoft_ORG> <rdf:type> <schema:Organization>   (Microsoft is an Organization)
Here is how we will generate the triples:
- Each article becomes a node of type schema:Article, with its summary attached.
- Each processed entity gets an rdf:type, an rdfs:label, optionally a skos:altLabel, and is linked to its article via schema:mentions.
- Each extracted relationship becomes a triple connecting two entity URIs.
First, just as we did for entity classes, define a helper function for relationship predicates. This keeps our predicate URIs consistent.
def get_rdf_predicate_uri(predicate_string_from_llm):
    """
    Converts a predicate string (e.g., 'ACQUIRED', 'HAS_PRICE')
    into a proper RDF Property URI in our EX namespace.
    """
    # Sanitize: uppercase, replace spaces with underscores
    sanitized_predicate = predicate_string_from_llm.strip().replace(" ", "_").upper()
    return EX[sanitized_predicate]
Now, create and populate our graph!
# Initialize RDF graph and namespaces
kg = Graph()
SKOS = Namespace("http://www.w3.org/2004/02/skos/core#")
kg.bind("ex", EX)
kg.bind("schema", SCHEMA)
kg.bind("rdfs", RDFS)
kg.bind("skos", SKOS)

total_triples_added = 0

for article in tqdm(articles_with_normalized_entities_and_uris):
    # Create URI for the article
    article_uri = EX[f"article_{article['id'].replace('-', '_')}"]
    kg.add((article_uri, RDF.type, SCHEMA.Article))

    # Add summary or fallback label
    label = article.get('summary') or f"Article {article['id']}"
    pred = SCHEMA.headline if article.get('summary') else RDFS.label
    kg.add((article_uri, pred, Literal(label, lang='en')))
    total_triples_added += 2

    entity_map = {}

    # Process entities
    for e in article.get('processed_entities', []):
        uri = e['uri']
        kg.add((uri, RDF.type, get_rdf_class_for_entity_type(e['simple_type'])))
        kg.add((uri, RDFS.label, Literal(e['normalized_text'], lang='en')))
        if e['text'] != e['normalized_text']:
            kg.add((uri, SKOS.altLabel, Literal(e['text'], lang='en')))
        kg.add((article_uri, SCHEMA.mentions, uri))
        total_triples_added += 4
        entity_map[e['text']] = uri

    # Process relationships
    for r in article.get('llm_extracted_relationships', []):
        s_uri = entity_map.get(r.get('subject_text'))
        o_uri = entity_map.get(r.get('object_text'))
        p_uri = get_rdf_predicate_uri(r.get('predicate'))
        if s_uri and o_uri:
            kg.add((s_uri, p_uri, o_uri))
            total_triples_added += 1
This processes the entities and their relationships and starts creating triples. Let's print a sample of the result to see what it looks like.
Sample of first 5 triples from the Knowledge Graph (N3 format):
ex:article_02002614879655690596592a07ba827b1651f065 rdf:type schema:Article .
ex:article_02002614879655690596592a07ba827b1651f065 schema:headline "SAN DIEGO, California (CNN) -- You must know whats really driving the immigration debate because its not what you hear on TV Dont be fooled This has nothing to do with national security It has nothing to do with Mexican immigrants It has nothing to do with illegal immigrants The real driver of this debate is the insecurity of American working-class white men All the other stuff is just window dressing..." .
ex:Microsoft_ORG rdf:type schema:Organization .
ex:Microsoft_ORG rdfs:label "Microsoft"@en .
ex:Microsoft_ORG ex:ACQUIRED ex:Nuance_Communications_ORG .
Our knowledge graph is now populated with RDF triples. Every entity has a type and a label and is connected through meaningful relationships.
We have successfully turned unstructured news text into a structured, machine-readable knowledge graph.
This kg object, an rdflib.Graph, is now our main asset, and we will build on it in the following steps.
So far our knowledge graph has taken shape: its structure is explicit, with entities (nodes) and relationships (edges) clearly defined as RDF triples.
But it is still largely symbolic. The graph knows that ex:Microsoft_ORG is a schema:Organization, yet it has no notion of the meaning or semantic similarity between Microsoft and, say, Apple or Google, unless we state it explicitly.
Knowledge graph embeddings map the entities and relationships in the graph into a low-dimensional vector space. These vectors capture semantic information about entities and relations. With them we can do link prediction (predicting missing relationships), entity similarity (comparing entities), analogical reasoning (for example, "Satya Nadella is to Microsoft as Tim Cook is to ?"), and knowledge graph completion (filling gaps in the graph).
Training a knowledge graph embedding (KGE) model such as TransE, ComplEx, or DistMult on our graph from scratch can get fairly involved. For this introductory guide we take a simpler route and use a pretrained embedding model.
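For intuition, TransE treats a relation as a translation in vector space: for a true triple (head, relation, tail) it wants head + relation to land close to tail, and it scores candidate triples by that distance. A minimal NumPy sketch of the scoring idea (illustrative only; we do not train such a model here):

# TransE-style plausibility score: a smaller distance means a more plausible triple
def transe_score(head_vec, relation_vec, tail_vec):
    return -np.linalg.norm(np.array(head_vec) + np.array(relation_vec) - np.array(tail_vec))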
Define a function that uses the LLM client to fetch these embeddings.
EMBEDDING_MODEL_NAME = "BAAI/bge-multilingual-gemma2"

def get_text_embeddings(list_of_texts_to_embed, embedding_model):
    """
    Generates embeddings for a list of text strings using
    the specified model.
    """
    # Remove invalid or empty texts
    valid_texts = [str(text).strip() for text in list_of_texts_to_embed if str(text).strip()]

    # Return empty embeddings if no valid texts
    if not valid_texts:
        return {text: [] for text in list_of_texts_to_embed}

    # Generate embeddings using the specified model
    response = client.embeddings.create(model=embedding_model, input=valid_texts)

    # Map the embeddings to their corresponding texts
    embeddings_map = {valid_texts[i]: data_item.embedding for i, data_item in enumerate(response.data)}

    # Ensure all original texts are included in the result, even if they were empty
    for text in list_of_texts_to_embed:
        if text.strip() not in embeddings_map:
            embeddings_map[text.strip()] = []

    return embeddings_map
Now collect all the unique, normalized entity labels from unique_entities_map (the mapping that stores (normalized_text, simple_type) -> URI).
We want one embedding vector per unique normalized_text.
# Dictionary to store entity URI -> embedding vector
entity_uri_to_embedding_vector = {}

# Extract unique normalized texts from the entity map
unique_normalized_texts = list(set([key[0] for key in unique_entities_map.keys() if key[0].strip()]))

# Get embeddings for these texts
text_to_embedding_result_map = get_text_embeddings(unique_normalized_texts, EMBEDDING_MODEL_NAME)

# Map entity URIs to their corresponding embeddings
for (norm_text, simple_type), entity_uri in unique_entities_map.items():
    if norm_text in text_to_embedding_result_map and text_to_embedding_result_map[norm_text]:
        entity_uri_to_embedding_vector[entity_uri] = text_to_embedding_result_map[norm_text]
Each of our unique entities (more precisely, each entity name) now has a dense vector representation stored in entity_uri_to_embedding_vector. That vector captures part of its semantic meaning.
Our current graph contains only the facts that were explicitly stated in, or extracted from, the text. But what if certain relationships or similarities are never mentioned directly, yet could be discovered through inference? That is the domain of link prediction.
Link prediction in a knowledge graph is the task of identifying missing relationships (edges) between entities (nodes).
It is like playing connect-the-dots with data, trying to find the dots that should be joined even though no line has been drawn yet.
Imagine our graph knows:
- Company A is a SoftwareCompany.
- Company B is a SoftwareCompany.
- Company A operates in the CloudComputing domain.
If the embedding vectors of Company A and Company B are very similar, and their names suggest similar business activities, a link prediction system might infer that Company B likely also operates in CloudComputing, even though no news article ever states this about Company B.
We will use cosine similarity to measure how similar two entity embeddings are. It measures the cosine of the angle between two vectors: the higher the value, the closer the two entities are in the vector space, which may hint at an unstated relationship.
Let's write a simple function to compute cosine similarity.
def calculate_cosine_similarity(embedding_vector_1, embedding_vector_2):
    """
    Calculates the cosine similarity between two embedding vectors.
    """
    # Convert to numpy arrays and reshape to 2D arrays as expected by sklearn
    vec1 = np.array(embedding_vector_1).reshape(1, -1)
    vec2 = np.array(embedding_vector_2).reshape(1, -1)

    similarity_score = cosine_similarity(vec1, vec2)
    return similarity_score[0][0]  # The result is a 2D array; get the single value
Now pick a couple of entities (preferably organizations, if we have enough of them) from the entity_uri_to_embedding_vector map and see how similar their name embeddings are.
# Get URIs with valid embeddings
uris = [uri for uri, emb in entity_uri_to_embedding_vector.items() if isinstance(emb, (list, np.ndarray)) and len(emb) > 0]

# Find at least two ORG entities, falling back to any two entities
org_entities = [uri for uri in uris if (uri, RDF.type, SCHEMA.Organization) in kg]
entity1_uri, entity2_uri = (org_entities[:2] if len(org_entities) >= 2 else uris[:2]) if len(uris) >= 2 else (None, None)

if entity1_uri and entity2_uri:
    emb1, emb2 = entity_uri_to_embedding_vector[entity1_uri], entity_uri_to_embedding_vector[entity2_uri]
    label1 = kg.value(subject=entity1_uri, predicate=RDFS.label, default=str(entity1_uri))
    label2 = kg.value(subject=entity2_uri, predicate=RDFS.label, default=str(entity2_uri))

    # Calculate similarity and print an interpretation
    similarity = calculate_cosine_similarity(emb1, emb2)
    print(f"\nSimilarity between '{label1}' and '{label2}': {similarity:.4f}")
    if similarity > 0.75:
        print("Highly similar.")
    elif similarity > 0.5:
        print("Moderately similar.")
    else:
        print("Not very similar.")
Let's look at the output.
Semantic Similarity between 'Microsoft' and 'Google': 0.8234
Interpretation: These entities are quite similar based on their name embeddings.
This could suggest they operate in similar domains or have related functions.
Even this simple similarity over name embeddings can hint at interesting connections.
For example, if two lesser-known companies show high similarity to large tech companies, it may be worth investigating whether they are emerging competitors or operate in related niche markets.
If we had a full link prediction model that predicted a new, unstated triple such as (ex:CompanyX, ex:potentialCompetitorOf, ex:CompanyY) with high confidence, we could choose to add this inferred knowledge back into our main knowledge graph (kg). Over time, this makes the graph richer and more complete.
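A minimal sketch of writing such an inferred triple back into kg once a similarity threshold is crossed; the 0.9 cutoff and the ex:potentialCompetitorOf predicate are illustrative assumptions, not the output of a trained link predictor:

SIMILARITY_THRESHOLD = 0.9  # assumed cutoff for proposing a new link

if entity1_uri and entity2_uri:
    score = calculate_cosine_similarity(
        entity_uri_to_embedding_vector[entity1_uri],
        entity_uri_to_embedding_vector[entity2_uri]
    )
    if score > SIMILARITY_THRESHOLD:
        # Add the inferred (clearly heuristic) relationship to the main graph
        kg.add((entity1_uri, EX["potentialCompetitorOf"], entity2_uri))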
There are several ways to store an RDF knowledge graph; we will focus on the most straightforward one: serializing our rdflib.Graph object (kg) to a Turtle (.ttl) file. This produces a portable text file containing all of our triples.
def save_knowledge_graph_to_file(graph_object, output_filepath="my_knowledge_graph.ttl", rdf_format="turtle"):
    """
    Saves the rdflib.Graph object to a file in the specified RDF format.
    """
    if not graph_object or len(graph_object) == 0:
        print("The knowledge graph is empty. Nothing to save.")
        return False

    try:
        graph_object.serialize(destination=output_filepath, format=rdf_format)
        print(f"Knowledge Graph with {len(graph_object)} triples successfully saved to: {output_filepath} (Format: {rdf_format})")
        return True
    except Exception as e:
        print(f"Error saving knowledge graph to {output_filepath}: {e}")
        return False
Now call this function to save our kg. We name the file tech_acquisitions_knowledge_graph.ttl.
# Define the filename for our saved KG
KG_FILENAME = "tech_acquisitions_knowledge_graph.ttl"
was_saved = save_knowledge_graph_to_file(kg,
output_filepath=KG_FILENAME,
rdf_format="turtle")
If you open the .ttl file in a text editor, you will see something like this (a very small excerpt):
@prefix ex: <http://example.org/kg/> .
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
...
You can see the namespace prefixes we defined at the top, followed by the triples. A semicolon (;) lists multiple predicates and objects for the same subject, and a period (.) ends the set of statements about that subject.
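Further down the file, the entity triples shown earlier appear in that compact form, roughly like this (an illustrative excerpt, not a verbatim dump of the file):

ex:Microsoft_ORG a schema:Organization ;
    rdfs:label "Microsoft"@en ;
    ex:ACQUIRED ex:Nuance_Communications_ORG .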
The real power of a knowledge graph lies in being able to ask it questions and extract specific insights.
For RDF knowledge graphs, the standard query language is SPARQL.
Let's write a helper function that executes SPARQL queries against our kg (the populated rdflib.Graph object).
def execute_and_print_sparql_query(graph, query, title="SPARQL Query"):
    print(f"\n--- {title} ---\nQuery:\n{query}")

    # Run the query and format the results
    results = graph.query(query)
    results_list = [{str(var): str(val) for var, val in row.asdict().items()} for row in results]

    # Print up to 10 results
    print(f"\nResults ({len(results_list)} found):")
    for i, result in enumerate(results_list[:10]):
        print(f"  {i+1}: {result}")
    if len(results_list) > 10:
        print(f"  ... (and {len(results_list) - 10} more results)")

    return results_list
Now let's write a few SPARQL queries that show how to pull insights out of our tech-acquisition knowledge graph.
Which organizations are mentioned in our graph?
sparql_query_organizations = """ PREFIX rdfs:
<http://www.w3.org/2000/01/rdf-schema#> PREFIX schema:
<http://schema.org/>SELECT DISTINCT ?org_label ?org_uri WHERE { ?org_uri
rdf:type schema:Organization . ?org_uri rdfs:label ?org_label . } ORDER BY
?org_label LIMIT 10 """org_results = execute_and_print_sparql_query(kg,
sparql_query_organizations, "Query 1: List Organizations")
Running this query produces the following output:
Results (10 found):
1: {'org_label': 'Accenture', 'org_uri': 'http://example.org/kg/Accenture_ORG'}
2: {'org_label': 'Adobe', 'org_uri': 'http://example.org/kg/Adobe_ORG'}
3: {'org_label': 'Advanced Micro Devices', 'org_uri': 'http://example.org/kg/Advanced_Micro_Devices_ORG'}
...
Let's write another query:
Show me some acquisitions (the acquiring company and the acquired company)!
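The query itself is not reproduced in this excerpt; a sketch consistent with the predicates in our graph (the ex:ACQUIRED property plus rdfs:label) might look like this:

sparql_query_acquisitions = """
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX ex: <http://example.org/kg/>

SELECT ?acquiring_company_label ?acquired_company_label
WHERE {
    ?acquirer ex:ACQUIRED ?acquired .
    ?acquirer rdfs:label ?acquiring_company_label .
    ?acquired rdfs:label ?acquired_company_label .
}
LIMIT 10
"""

acquisition_results = execute_and_print_sparql_query(kg, sparql_query_acquisitions, "Query 2: Acquisitions")

Against our graph, it returns results along these lines: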
Results (7 found):
1: {'acquiring_company_label': 'Microsoft', 'acquired_company_label': 'Nuance Communications'}
2: {'acquiring_company_label': 'Salesforce', 'acquired_company_label': 'Slack Technologies'}
3: {'acquiring_company_label': 'Google', 'acquired_company_label': 'Fitbit'}
...
我们正在直接查询从那些杂乱的新闻文章中提取出的结构化知识。
我们可以询问谁收购了谁,花了多少钱,以及何时发生的。这只是 SPARQL 功能的冰山一角。
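The following sketch assumes prices are modelled as MONEY nodes attached to the acquired company via ex:HAS_PRICE, as in the sample relationships shown earlier:

sparql_query_acquisitions_with_price = """
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX ex: <http://example.org/kg/>

SELECT ?acquirer_label ?acquired_label ?price_label
WHERE {
    ?acquirer ex:ACQUIRED ?acquired .
    ?acquired ex:HAS_PRICE ?price .
    ?acquirer rdfs:label ?acquirer_label .
    ?acquired rdfs:label ?acquired_label .
    ?price rdfs:label ?price_label .
}
LIMIT 10
"""

price_results = execute_and_print_sparql_query(kg, sparql_query_acquisitions_with_price, "Query 3: Acquisitions with price")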
There are many tools for graph visualization. In this guide we use a simple but effective Python library called pyvis.
It produces interactive HTML network visualizations that can be viewed in a browser or explored directly inside a Jupyter Notebook.
Define a function that takes our kg graph and renders a visualization of a subset of its triples.
def visualize_kg(graph, filename="kg_viz.html", num_triples=50):
    # Create a pyvis network for interactive visualization
    net = Network(height="600px", width="100%", directed=True)

    # Collect up to `num_triples` triples where both subject and object are URIs
    triples = [(s, p, o) for s, p, o in graph if isinstance(s, URIRef) and isinstance(o, URIRef)][:num_triples]

    nodes = set()  # To avoid adding duplicate nodes

    # Add nodes and edges to the visualization
    for s, p, o in tqdm(triples, desc="Visualizing"):
        for node in (s, o):
            if node not in nodes:
                # Get a label and type for each node
                label = graph.label(node) or node.n3(graph.namespace_manager)
                ntype = graph.value(node, RDF.type)
                group = ntype.n3(graph.namespace_manager).split(":")[-1] if ntype else "Unknown"

                # Add the node to the network
                net.add_node(str(node), label=str(label), group=group)
                nodes.add(node)

        # Add the edge with the predicate as its label
        label = p.n3(graph.namespace_manager).split(":")[-1]
        net.add_edge(str(s), str(o), label=label, title=label)

    # Save the visualization to an HTML file and return the network
    net.save_graph(filename)
    return net
Now, generate the visualization for our kg. The function writes a file named tech_acquisitions_kg_sample.html into the current working directory.
# Call the visualization function
VIZ_FILENAME = "tech_acquisitions_kg_sample.html"
kg_visualization_net = visualize_kg(kg, filename=VIZ_FILENAME, num_triples=75)
After running this code, an HTML file containing the knowledge graph visualization is created in the current working directory. Open it in a browser to see the result.