微信扫码
添加专属顾问
我要投稿
探索LangChain如何通过高级架构提升AI应用处理复杂任务的能力。核心内容:1. LangChain系列文章的高级架构设计要点2. 环境配置和API密钥的设置方法3. 工具链(ToolChain)模式在AI应用中的实践示例
大家好,我是胡十一!这是我们LangChain系列的最后一篇,今天要分享的是更高级的架构设计:如何用工具链组织复杂任务,以及如何处理超大型文档。这些技术能让你的AI应用更加强大!
要运行本文示例,需要以下环境配置:
pip install langchain langchain-openai langchain-community python-dotenv
pip install faiss-cpu unstructured
OPENAI_API_KEY=你的阿里云API密钥
mkdir -p data
echo "这是测试长文档..." > data/long_document.txt
当我们的AI应用变得越来越复杂,简单的单模型调用已经不够用了。比如:
这时候,我们需要更高级的架构设计,让AI能够处理复杂任务。
工具链是一种强大的设计模式,可以将多个工具按特定顺序组合使用:
import os
from typing importList, Dict, Any, Callable, Optional
from langchain_core.tools import Tool
from dotenv import load_dotenv
import logging
from datetime import datetime
from collections import Counter
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(f'toolchain_debug_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# 加载环境变量
load_dotenv()
classToolChain:
"""工具链基类,用于管理工具的执行流程"""
def__init__(self, name: str):
self.name = name
self.tools: List[Tool] = []
self.conditions: List[Callable] = []
self.metadata: Dict[str, Any] = {}
defadd_tool(self, tool: Tool, condition: Optional[Callable] = None):
"""添加工具到链中"""
self.tools.append(tool)
self.conditions.append(condition or (lambda x: True))
defexecute(self, input_data: Any) -> Any:
"""执行工具链"""
result = input_data
for tool, condition inzip(self.tools, self.conditions):
if condition(result):
try:
logger.info(f"执行工具: {tool.name}")
result = tool.func(result)
logger.info(f"工具 {tool.name} 执行完成")
except Exception as e:
logger.error(f"工具 {tool.name} 执行失败: {str(e)}")
raise
return result
defget_tools(self) -> List[Tool]:
"""获取工具链中的所有工具"""
returnself.tools
# 示例:文本处理工具链
classTextProcessingChain(ToolChain):
"""文本处理工具链"""
def__init__(self):
super().__init__("text_processing")
self.add_tool(
Tool(
name="text_statistics",
description="统计文本的基本信息",
func=self.text_statistics
),
condition=lambda x: isinstance(x, str)
)
self.add_tool(
Tool(
name="format_text",
description="格式化文本",
func=self.format_text
),
condition=lambda x: isinstance(x, str)
)
deftext_statistics(self, text: str) -> str:
"""文本统计工具"""
try:
char_count = len(text)
word_count = len(text.split())
line_count = len(text.splitlines())
words = text.lower().split()
word_freq = Counter(words).most_common(3)
result = f"""文本统计结果:
- 字符数: {char_count}
- 词数: {word_count}
- 行数: {line_count}
- 最常用词: {', '.join(f'{word}({count}次)' for word, count in word_freq)}"""
return result
except Exception as e:
returnf"统计错误: {str(e)}"
defformat_text(self, text: str) -> str:
"""文本格式化工具"""
try:
# 简单示例:转换为大写
return text.upper()
except Exception as e:
returnf"格式化错误: {str(e)}"
# 使用示例
text_chain = TextProcessingChain()
result = text_chain.execute("这是一个示例文本。这是第二句话。这是测试文本。")
print(result)
这个例子展示了工具链的核心思想:
使用工具链的好处是什么?
这种架构特别适合复杂的多步骤任务,比如数据分析流程、自动化报告生成等。
处理大型文档时,我们可以使用Map-Reduce模式:先将文档分割成小块,然后并行处理每个块,最后合并结果。
from langchain_openai import ChatOpenAI
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import TextLoader
from dotenv import load_dotenv
import json
import os
import time
from typing importList, Dict, Any
# 加载环境变量
load_dotenv()
classMapReduceDocumentAnalyzer:
"""使用 Map-Reduce 链的文档分析器类,用于分析长文档"""
def__init__(self,
model_name: str = "qwen-turbo",
api_base: str = "https://dashscope.aliyuncs.com/compatible-mode/v1",
temperature: float = 0.1,
chunk_size: int = 4000,
chunk_overlap: int = 500):
# 初始化LLM
self.analyze_llm = ChatOpenAI(
model=model_name,
openai_api_base=api_base,
temperature=temperature,
model_kwargs={"response_format": {"type": "json_object"}}
)
# 设置文本分割器
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n## ", "\n### ", "\n\n", "\n", "。", ",", " "]
)
defprocess_document(self, document_path: str) -> Dict[str, Any]:
"""处理文档的主方法"""
# 加载文档
loader = TextLoader(document_path, encoding="utf-8")
documents = loader.load()
# 分割文档
splits = self.text_splitter.split_documents(documents)
print(f"文档被分割为 {len(splits)} 个块")
# MAP阶段: 处理每个文档块
map_results = []
for i, doc inenumerate(splits):
print(f"处理文档块 {i+1}/{len(splits)}...")
# 构建提示词
prompt = f"""
请分析以下文档片段,找出所有关键信息点:
{doc.page_content}
以JSON格式返回结果:
{{
"key_points": [
{{
"topic": "主题",
"content": "内容摘要",
"importance": "high/medium/low"
}}
]
}}
"""
# 调用LLM分析
result = self.analyze_llm.invoke(prompt).content
map_results.append(result)
# REDUCE阶段: 合并所有结果
reduce_prompt = f"""
请整合以下分析结果,去除重复点,按重要性排序:
{json.dumps(map_results, ensure_ascii=False, indent=2)}
返回最终整合后的JSON结果:
{{
"key_points": [
{{
"topic": "主题",
"content": "内容摘要",
"importance": "high/medium/low"
}}
],
"summary": "整体摘要"
}}
"""
final_result = self.analyze_llm.invoke(reduce_prompt).content
return json.loads(final_result)
# 使用示例
analyzer = MapReduceDocumentAnalyzer()
results = analyzer.process_document("data/long_document.txt")
print(json.dumps(results, ensure_ascii=False, indent=2))
Map-Reduce模式的强大之处:
这种模式特别适合分析长篇论文、报告、法律文件等大型文档。
Refine Chain是另一种处理大型文档的方法,它通过迭代优化来生成最终结果:
from langchain_openai import ChatOpenAI
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import TextLoader
from dotenv import load_dotenv
import json
# 加载环境变量
load_dotenv()
classRefineDocumentAnalyzer:
"""使用 Refine 链的文档分析器类,用于分析长文档"""
def__init__(self,
model_name: str = "qwen-turbo",
api_base: str = "https://dashscope.aliyuncs.com/compatible-mode/v1",
temperature: float = 0.1,
chunk_size: int = 4000,
chunk_overlap: int = 500):
# 初始化LLM
self.analyze_llm = ChatOpenAI(
model=model_name,
openai_api_base=api_base,
temperature=temperature,
model_kwargs={"response_format": {"type": "json_object"}}
)
# 设置文本分割器
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n## ", "\n### ", "\n\n", "\n", "。", ",", " "]
)
defprocess_document(self, document_path: str) -> dict:
"""处理文档的主方法"""
# 加载文档
loader = TextLoader(document_path, encoding="utf-8")
documents = loader.load()
# 分割文档
splits = self.text_splitter.split_documents(documents)
print(f"文档被分割为 {len(splits)} 个块")
# 处理第一个文档块
print(f"处理初始文档块 1/{len(splits)}...")
initial_prompt = f"""
请分析以下文档片段:
{splits[0].page_content}
以JSON格式返回关键信息:
{{
"key_points": [
{{
"topic": "主题",
"content": "内容摘要"
}}
],
"summary": "初步摘要"
}}
"""
current_result = self.analyze_llm.invoke(initial_prompt).content
# 依次处理剩余文档块,每次都基于前一次的结果进行精炼
for i, doc inenumerate(splits[1:], 2):
print(f"精炼处理文档块 {i}/{len(splits)}...")
refine_prompt = f"""
已有分析结果:
{current_result}
请基于新的文档片段更新分析结果:
{doc.page_content}
返回更新后的JSON结果,保持相同格式但内容更完整准确。
"""
current_result = self.analyze_llm.invoke(refine_prompt).content
return json.loads(current_result)
# 使用示例
analyzer = RefineDocumentAnalyzer()
results = analyzer.process_document("data/long_document.txt")
print(json.dumps(results, ensure_ascii=False, indent=2))
Refine Chain的优势:
这种模式适合需要连贯性的摘要、报告生成等任务,特别是在分析需要逐步理解深化的专业文档时。
最后一个示例是对话分析器,它能从对话中提取关键信息:
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
import json
load_dotenv()
classConversationAnalyzer:
"""对话分析器,用于分析对话内容并提取关键信息"""
def__init__(self,
model_name: str = "qwen-plus",
api_base: str = "https://dashscope.aliyuncs.com/compatible-mode/v1",
temperature: float = 0.2):
self.llm = ChatOpenAI(
model=model_name,
openai_api_base=api_base,
temperature=temperature,
model_kwargs={"response_format": {"type": "json_object"}}
)
defanalyze_conversation(self, conversation: list) -> dict:
"""分析对话并提取关键信息"""
# 格式化对话
formatted_conversation = ""
for i, msg inenumerate(conversation):
role = "用户"if msg["role"] == "user"else"助手"
formatted_conversation += f"{role}: {msg['content']}\n\n"
# 构建分析提示词
prompt = f"""
请分析以下对话内容:
{formatted_conversation}
请以JSON格式提取以下信息:
1. 用户意图和需求
2. 情感倾向
3. 关键问题和回答
4. 未解决的问题
5. 后续行动建议
返回格式:
{{
"user_intent": "用户主要意图",
"sentiment": "正面/负面/中性",
"key_qa": [
{{
"question": "关键问题",
"answer": "对应回答",
"is_solved": true/false
}}
],
"unsolved_issues": ["未解决问题1", "未解决问题2"],
"next_steps": ["建议1", "建议2"]
}}
"""
# 调用LLM分析
result = self.llm.invoke(prompt).content
return json.loads(result)
# 使用示例
analyzer = ConversationAnalyzer()
conversation = [
{"role": "user", "content": "你好,我想了解如何使用LangChain构建AI应用"},
{"role": "assistant", "content": "您好!LangChain是一个强大的框架,可以帮助构建基于大语言模型的应用。您有特定的应用场景吗?"},
{"role": "user", "content": "我想做一个能回答公司内部文档问题的AI助手"},
{"role": "assistant", "content": "这是一个很好的应用场景!您需要使用LangChain的RAG(检索增强生成)功能。首先需要处理文档,然后构建向量数据库..."},
{"role": "user", "content": "这听起来很复杂,有没有简单点的方法?而且我担心成本问题"}
]
result = analyzer.analyze_conversation(conversation)
print(json.dumps(result, ensure_ascii=False, indent=2))
对话分析器的应用场景:
这个工具对于需要处理大量对话数据的企业特别有价值,可以大大提高客户服务效率。
在这个系列的最后一篇中,我们探索了LangChain的高级架构:
这些高级架构让我们能够构建更复杂、更强大的AI应用,处理更大规模的数据和更复杂的任务。
希望这个系列对你有所帮助!接下来,就看你如何将这些技术应用到实际项目中了。如果有问题,欢迎在评论区留言!
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业
2024-10-10
2024-07-13
2024-06-03
2024-04-08
2024-09-04
2024-04-08
2024-08-18
2024-03-28
2024-06-24
2024-07-10
2025-05-08
2025-05-06
2025-04-22
2025-04-18
2025-03-22
2025-03-22
2025-03-15
2025-02-05