支持私有化部署
AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


LangChain:工具链和大型文档处理

发布日期:2025-05-14 14:05:58 浏览次数: 1533 作者:胡十一的生命力
推荐语

探索LangChain如何通过高级架构提升AI应用处理复杂任务的能力。

核心内容:
1. LangChain系列文章的高级架构设计要点
2. 环境配置和API密钥的设置方法
3. 工具链(ToolChain)模式在AI应用中的实践示例

杨芳贤
53A创始人/腾讯云(TVP)最具价值专家

 

高级架构让AI更强大

大家好,我是胡十一!这是我们LangChain系列的最后一篇,今天要分享的是更高级的架构设计:如何用工具链组织复杂任务,以及如何处理超大型文档。这些技术能让你的AI应用更加强大!

环境准备

要运行本文示例,需要以下环境配置:

  1. 1. Python 3.10 (基于Anaconda环境)
  2. 2. 安装必要依赖:
pip install langchain langchain-openai langchain-community python-dotenv 
  1. 3. 安装额外依赖:
pip install faiss-cpu unstructured
  1. 4. 配置API密钥(.env文件):
OPENAI_API_KEY=你的阿里云API密钥
  1. 5. 创建示例数据:
mkdir -p data
echo "这是测试长文档..." > data/long_document.txt

为什么需要高级架构?

当我们的AI应用变得越来越复杂,简单的单模型调用已经不够用了。比如:

  • • 处理一份几百页的文档
  • • 需要多个步骤才能完成的任务
  • • 需要不同专业领域智能体协作的任务

这时候,我们需要更高级的架构设计,让AI能够处理复杂任务。

示例17:工具链 (ToolChain)

工具链是一种强大的设计模式,可以将多个工具按特定顺序组合使用:

import os
from typing importListDictAnyCallableOptional
from langchain_core.tools import Tool
from dotenv import load_dotenv
import logging
from datetime import datetime
from collections import Counter

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(f'toolchain_debug_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# 加载环境变量
load_dotenv()

classToolChain:
    """工具链基类,用于管理工具的执行流程"""
    
    def__init__(self, name: str):
        self.name = name
        self.tools: List[Tool] = []
        self.conditions: List[Callable] = []
        self.metadata: Dict[strAny] = {}
        
    defadd_tool(self, tool: Tool, condition: Optional[Callable] = None):
        """添加工具到链中"""
        self.tools.append(tool)
        self.conditions.append(condition or (lambda x: True))
        
    defexecute(self, input_data: Any) -> Any:
        """执行工具链"""
        result = input_data
        for tool, condition inzip(self.tools, self.conditions):
            if condition(result):
                try:
                    logger.info(f"执行工具: {tool.name}")
                    result = tool.func(result)
                    logger.info(f"工具 {tool.name} 执行完成")
                except Exception as e:
                    logger.error(f"工具 {tool.name} 执行失败: {str(e)}")
                    raise
        return result
    
    defget_tools(self) -> List[Tool]:
        """获取工具链中的所有工具"""
        returnself.tools

# 示例:文本处理工具链
classTextProcessingChain(ToolChain):
    """文本处理工具链"""
    
    def__init__(self):
        super().__init__("text_processing")
        self.add_tool(
            Tool(
                name="text_statistics",
                description="统计文本的基本信息",
                func=self.text_statistics
            ),
            condition=lambda x: isinstance(x, str)
        )
        self.add_tool(
            Tool(
                name="format_text",
                description="格式化文本",
                func=self.format_text
            ),
            condition=lambda x: isinstance(x, str)
        )
    
    deftext_statistics(self, text: str) -> str:
        """文本统计工具"""
        try:
            char_count = len(text)
            word_count = len(text.split())
            line_count = len(text.splitlines())
            words = text.lower().split()
            word_freq = Counter(words).most_common(3)
            
            result = f"""文本统计结果:
- 字符数: {char_count}
- 词数: {word_count}
- 行数: {line_count}
- 最常用词: {', '.join(f'{word}({count}次)' for word, count in word_freq)}"""

            return result
        except Exception as e:
            returnf"统计错误: {str(e)}"
    
    defformat_text(self, text: str) -> str:
        """文本格式化工具"""
        try:
            # 简单示例:转换为大写
            return text.upper()
        except Exception as e:
            returnf"格式化错误: {str(e)}"

# 使用示例
text_chain = TextProcessingChain()
result = text_chain.execute("这是一个示例文本。这是第二句话。这是测试文本。")
print(result)

这个例子展示了工具链的核心思想:

  1. 1. 创建一个工具链基类,管理工具的添加和执行
  2. 2. 定义特定领域的工具链类,如文本处理链
  3. 3. 每个工具都有特定的执行条件,只有满足条件才会执行
  4. 4. 工具链按添加顺序依次执行,前一工具的输出是后一工具的输入

使用工具链的好处是什么?

  • • 模块化:每个工具专注做一件事,便于维护
  • • 可重用:工具可以在不同工具链中重复使用
  • • 可扩展:随时添加新工具到链中
  • • 条件执行:根据条件决定是否执行某工具

这种架构特别适合复杂的多步骤任务,比如数据分析流程、自动化报告生成等。

示例18:Map-Reduce 分析器

处理大型文档时,我们可以使用Map-Reduce模式:先将文档分割成小块,然后并行处理每个块,最后合并结果。

from langchain_openai import ChatOpenAI
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import TextLoader
from dotenv import load_dotenv
import json
import os
import time
from typing importListDictAny

# 加载环境变量
load_dotenv()

classMapReduceDocumentAnalyzer:
    """使用 Map-Reduce 链的文档分析器类,用于分析长文档"""
    
    def__init__(self, 
                 model_name: str = "qwen-turbo",
                 api_base: str = "https://dashscope.aliyuncs.com/compatible-mode/v1",
                 temperature: float = 0.1
                 chunk_size: int = 4000,
                 chunk_overlap: int = 500
):
        
        # 初始化LLM
        self.analyze_llm = ChatOpenAI(
            model=model_name,
            openai_api_base=api_base,
            temperature=temperature,
            model_kwargs={"response_format": {"type""json_object"}}
        )
        
        # 设置文本分割器
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=["\n## ""\n### ""\n\n""\n""。"","" "]
        )
    
    defprocess_document(self, document_path: str) -> Dict[strAny]:
        """处理文档的主方法"""
        # 加载文档
        loader = TextLoader(document_path, encoding="utf-8")
        documents = loader.load()
        
        # 分割文档
        splits = self.text_splitter.split_documents(documents)
        print(f"文档被分割为 {len(splits)} 个块")
        
        # MAP阶段: 处理每个文档块
        map_results = []
        for i, doc inenumerate(splits):
            print(f"处理文档块 {i+1}/{len(splits)}...")
            
            # 构建提示词
            prompt = f"""
            请分析以下文档片段,找出所有关键信息点:
            
            {doc.page_content}
            
            以JSON格式返回结果:
            {{
              "key_points": [
                {{
                  "topic": "主题",
                  "content": "内容摘要",
                  "importance": "high/medium/low"
                }}
              ]
            }}
            """

            
            # 调用LLM分析
            result = self.analyze_llm.invoke(prompt).content
            map_results.append(result)
        
        # REDUCE阶段: 合并所有结果
        reduce_prompt = f"""
        请整合以下分析结果,去除重复点,按重要性排序:
        
        {json.dumps(map_results, ensure_ascii=False, indent=2)}
        
        返回最终整合后的JSON结果:
        {{
          "key_points": [
            {{
              "topic": "主题",
              "content": "内容摘要",
              "importance": "high/medium/low"
            }}
          ],
          "summary": "整体摘要"
        }}
        """

        
        final_result = self.analyze_llm.invoke(reduce_prompt).content
        return json.loads(final_result)

# 使用示例
analyzer = MapReduceDocumentAnalyzer()
results = analyzer.process_document("data/long_document.txt")
print(json.dumps(results, ensure_ascii=False, indent=2))

Map-Reduce模式的强大之处:

  1. 1. 并行处理:可以同时处理多个文档块,提高效率
  2. 2. 突破长度限制:能处理远超模型上下文窗口的长文档
  3. 3. 全局视角:通过Reduce阶段整合全文信息,保持整体一致性

这种模式特别适合分析长篇论文、报告、法律文件等大型文档。

示例19:Refine Chain (精炼链)

Refine Chain是另一种处理大型文档的方法,它通过迭代优化来生成最终结果:

from langchain_openai import ChatOpenAI
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import TextLoader
from dotenv import load_dotenv
import json

# 加载环境变量
load_dotenv()

classRefineDocumentAnalyzer:
    """使用 Refine 链的文档分析器类,用于分析长文档"""
    
    def__init__(self, 
                 model_name: str = "qwen-turbo",
                 api_base: str = "https://dashscope.aliyuncs.com/compatible-mode/v1",
                 temperature: float = 0.1
                 chunk_size: int = 4000,
                 chunk_overlap: int = 500
):
        
        # 初始化LLM
        self.analyze_llm = ChatOpenAI(
            model=model_name,
            openai_api_base=api_base,
            temperature=temperature,
            model_kwargs={"response_format": {"type""json_object"}}
        )
        
        # 设置文本分割器
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=["\n## ""\n### ""\n\n""\n""。"","" "]
        )
    
    defprocess_document(self, document_path: str) -> dict:
        """处理文档的主方法"""
        # 加载文档
        loader = TextLoader(document_path, encoding="utf-8")
        documents = loader.load()
        
        # 分割文档
        splits = self.text_splitter.split_documents(documents)
        print(f"文档被分割为 {len(splits)} 个块")
        
        # 处理第一个文档块
        print(f"处理初始文档块 1/{len(splits)}...")
        initial_prompt = f"""
        请分析以下文档片段:
        
        {splits[0].page_content}
        
        以JSON格式返回关键信息:
        {{
          "key_points": [
            {{
              "topic": "主题",
              "content": "内容摘要"
            }}
          ],
          "summary": "初步摘要"
        }}
        """

        
        current_result = self.analyze_llm.invoke(initial_prompt).content
        
        # 依次处理剩余文档块,每次都基于前一次的结果进行精炼
        for i, doc inenumerate(splits[1:], 2):
            print(f"精炼处理文档块 {i}/{len(splits)}...")
            
            refine_prompt = f"""
            已有分析结果:
            {current_result}
            
            请基于新的文档片段更新分析结果:
            {doc.page_content}
            
            返回更新后的JSON结果,保持相同格式但内容更完整准确。
            """

            
            current_result = self.analyze_llm.invoke(refine_prompt).content
        
        return json.loads(current_result)

# 使用示例
analyzer = RefineDocumentAnalyzer()
results = analyzer.process_document("data/long_document.txt")
print(json.dumps(results, ensure_ascii=False, indent=2))

Refine Chain的优势:

  1. 1. 渐进改进:每次考虑新内容时都保留之前的分析结果
  2. 2. 全局一致性:整个过程保持单一的结果对象,逐步完善
  3. 3. 内存效率:只需保留当前状态,无需存储所有中间结果

这种模式适合需要连贯性的摘要、报告生成等任务,特别是在分析需要逐步理解深化的专业文档时。

示例20:对话分析器

最后一个示例是对话分析器,它能从对话中提取关键信息:

from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
import json

load_dotenv()

classConversationAnalyzer:
    """对话分析器,用于分析对话内容并提取关键信息"""
    
    def__init__(self, 
                 model_name: str = "qwen-plus",
                 api_base: str = "https://dashscope.aliyuncs.com/compatible-mode/v1",
                 temperature: float = 0.2
):
        
        self.llm = ChatOpenAI(
            model=model_name,
            openai_api_base=api_base,
            temperature=temperature,
            model_kwargs={"response_format": {"type""json_object"}}
        )
    
    defanalyze_conversation(self, conversation: list) -> dict:
        """分析对话并提取关键信息"""
        
        # 格式化对话
        formatted_conversation = ""
        for i, msg inenumerate(conversation):
            role = "用户"if msg["role"] == "user"else"助手"
            formatted_conversation += f"{role}{msg['content']}\n\n"
        
        # 构建分析提示词
        prompt = f"""
        请分析以下对话内容:
        
        {formatted_conversation}
        
        请以JSON格式提取以下信息:
        1. 用户意图和需求
        2. 情感倾向
        3. 关键问题和回答
        4. 未解决的问题
        5. 后续行动建议
        
        返回格式:
        {{
          "user_intent": "用户主要意图",
          "sentiment": "正面/负面/中性",
          "key_qa": [
            {{
              "question": "关键问题",
              "answer": "对应回答",
              "is_solved": true/false
            }}
          ],
          "unsolved_issues": ["未解决问题1", "未解决问题2"],
          "next_steps": ["建议1", "建议2"]
        }}
        """

        
        # 调用LLM分析
        result = self.llm.invoke(prompt).content
        return json.loads(result)

# 使用示例
analyzer = ConversationAnalyzer()
conversation = [
    {"role""user""content""你好,我想了解如何使用LangChain构建AI应用"},
    {"role""assistant""content""您好!LangChain是一个强大的框架,可以帮助构建基于大语言模型的应用。您有特定的应用场景吗?"},
    {"role""user""content""我想做一个能回答公司内部文档问题的AI助手"},
    {"role""assistant""content""这是一个很好的应用场景!您需要使用LangChain的RAG(检索增强生成)功能。首先需要处理文档,然后构建向量数据库..."},
    {"role""user""content""这听起来很复杂,有没有简单点的方法?而且我担心成本问题"}
]

result = analyzer.analyze_conversation(conversation)
print(json.dumps(result, ensure_ascii=False, indent=2))

对话分析器的应用场景:

  1. 1. 客服质量监控:自动评估客服对话质量
  2. 2. 用户意图分析:识别用户真正需求
  3. 3. 情感分析:了解用户情绪变化
  4. 4. 自动生成跟进计划:基于对话制定后续行动

这个工具对于需要处理大量对话数据的企业特别有价值,可以大大提高客户服务效率。

小结

在这个系列的最后一篇中,我们探索了LangChain的高级架构:

  1. 1. 工具链:将多个工具按顺序组合使用
  2. 2. Map-Reduce模式:并行处理大型文档
  3. 3. Refine Chain:通过迭代优化生成结果
  4. 4. 对话分析器:从对话中提取关键信息

这些高级架构让我们能够构建更复杂、更强大的AI应用,处理更大规模的数据和更复杂的任务。

希望这个系列对你有所帮助!接下来,就看你如何将这些技术应用到实际项目中了。如果有问题,欢迎在评论区留言!

#LangChain #工具链 #大型文档处理 #对话分析

 

53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询