2026年7月9日 周四晚上19:30,报名腾讯会议了解“如何构建自进化的动态知识库(Brain)”(限30人)
免费POC, 零成本试错
FDE知识库

FDE知识库

学习大模型的前沿技术与行业落地应用


收藏

基于Agent的金融问答系统:RAG检索模块初建成

发布日期:2024-10-26 09:21:57 浏览次数: 2974
作者:一起AI技术

微信搜一搜,关注“一起AI技术”

前言

我们在上一章《【项目实战】基于Agent的金融问答系统之项目简介》中简单介绍了项目背景以及数据集情况,本章将介绍RAG模块的实现。

功能列表

参考之前所学内容《大模型之初识RAG》,我们需要实现如下功能:

  • • 向量库的基础功能

    • • 连接向量库

    • • 数据入库

  • • 文件导入

    • • PDF文件的读取

    • • PDF文件的切分

    • • 调用向量库接口入库

  • • 文件检索

    • • 连接向量库

    • • 检索器检索文件

开发过程

1、规划工程文件

项目开始之后,我们如果能够抑制住直接撸代码的冲动,改为提前做好规划,这会是一个好的习惯。 为此,我提前做了如下规划:

代码管理

  • • 代码使用Git进行管理,这样后期多人协作时方便代码更新、代码Merge、冲突解决等。

  • • 由于国内访问Github优势会ban,所以我们将代码仓库放在Gitee上。

  • • 为代码仓库起了一个响亮的名称后,仓库地址定为https://gitee.com/deadwalk/smart-finance-bot

项目目录

考虑这个项目会涉及到前端、后端、数据集、模型等,所以项目目录规划如下:

smart-finance-bot \
|- dataset \  # 该目录用于保存PDF以及SQLite数据
|- doc \  # 该目录用于保存文档类文件,例如:需求文档、说明文档、数据文档
|- app \   # 该目录用于服务端代码
|- agent \ # 该目录用于保存agent相关代码
|- rag \   # 该目录用于保存rag相关代码
|-test \   # 该目录用于保存测试类驱动相关代码
|- conf \  # 该目录用于保存配置文件
|-.qwen # 该文件保存QWen的配置文件(请自行添加对应的API KEY)
|-.ernie # 该文件保存百度千帆的配置文件(请自行添加对应的API KEY)
|- chatweb \   # 该目录用于保存前端页面代码
|- scripts \   # 该目录用于保存脚本文件,例如:启动脚本、导入向量数据库脚本等
|- test_result \   # 该目录用于保存测试结果
|- docker \
|- backend \ # 该目录对应后端python服务的Dockerfile
|- frontend \ # 该目录对应前端python服务的Dockerfile

上述目录中,dataset 是直接使用git的submodul命令,直接将天池大赛提供的数据集引入到本项目中,方便后续使用。

引入方法:

git submodule add https://www.modelscope.cn/datasets/BJQW14B/bs_challenge_financial_14b_dataset.git dataset

命名规范

项目如果能够约束统一的命名规范,这对于后续代码的可读性、可维护性会提供需要便利,在此我沿用了约定俗成的代码命名规范:

  • • 类名:使用大驼峰命名法,例如:MyClass

  • • 函数名:使用小驼峰命名法,例如:my_function

  • • 变量名:使用小驼峰命名法,例如:my_variable

  • • 文件夹:使用小驼峰命名法。

整体命名时,要尽量见文知意。

2、实现基本的连接大模型的util库

代码文件及目录:app/utils/util.py


from dotenv import load_dotenv
import os

# 获取当前文件的目录
current_dir = os.path.dirname(__file__)

# 构建到 conf/.qwen 的相对路径
conf_file_path_qwen = os.path.join(current_dir,'..','conf','.qwen')

# 加载千问环境变量
load_dotenv(dotenv_path=conf_file_path_qwen)

defget_qwen_models():
"""
    加载千问系列大模型
    """

# llm 大模型
from langchain_community.llms.tongyi importTongyi

    llm =Tongyi(model="qwen-max", temperature=0.1, top_p=0.7, max_tokens=1024)

# chat 大模型
from langchain_community.chat_models importChatTongyi

    chat =ChatTongyi(model="qwen-max", temperature=0.01, top_p=0.2, max_tokens=1024)
# embedding 大模型
from langchain_community.embeddings importDashScopeEmbeddings

    embed =DashScopeEmbeddings(model="text-embedding-v3")

return llm, chat, embed

在app/conf/.qwen中,添加对应的API KEY,例如:

DASHSCOPE_API_KEY = sk-xxxxxx

3、实现向量库基础功能

向量库文件考虑使用Chroma实现,所以我们先实现一个向量库的类,用于完成基本的向量库连接、数据入库操作。

代码文件及目录:app/rag/chroma_conn.py

import chromadb
from chromadb importSettings
from langchain_chroma importChroma


classChromaDB:
def__init__(self,
                 chroma_server_type="local",  # 服务器类型:http是http方式连接方式,local是本地读取文件方式
                 host="localhost", port=8000,  # 服务器地址,http方式必须指定
                 persist_path="chroma_db",  # 数据库的路径:如果是本地连接,需要指定数据库的路径
                 collection_name="langchain",  # 数据库的集合名称
                 embed=None  # 数据库的向量化函数
                 ):

        self.host = host
        self.port = port
        self.path = persist_path
        self.embed = embed
        self.store =None

# 如果是http协议方式连接数据库
if chroma_server_type =="http":
            client = chromadb.HttpClient(host=host, port=port)

            self.store =Chroma(collection_name=collection_name,
                                embedding_function=embed,
                                client=client)

if chroma_server_type =="local":
            self.store =Chroma(collection_name=collection_name,
                                embedding_function=embed,
                                persist_directory=persist_path)

if self.store isNone:
raiseValueError("Chroma store init failed!")

defadd_with_langchain(self, docs):
"""
        将文档添加到数据库
        """

        self.store.add_documents(documents=docs)

defget_store(self):
"""
        获得向量数据库的对象实例
        """

return self.store

在实际项目实践过程中,我们发现导入Chroma数据时使用本地化连接方式更快一些,所以对连接方式做了两个参数的扩展,local 代表本地连接,http 代表远程连接。

4、实现入库功能

本着先跑通流程,再优化交互过程的思路,对于PDF文件入向量库的过程,我们先通过一段脚本实现(暂不做前端UI的交互)。

代码文件及目录:app/rag/pdf_processor.py

import os
import logging
import time
from tqdm import tqdm
from langchain_community.document_loaders importPyMuPDFLoader
from langchain_text_splitters importRecursiveCharacterTextSplitter
from rag.chroma_conn importChromaDB


classPDFProcessor:
def__init__(self,
                 directory,  # PDF文件所在目录
                 chroma_server_type,  # ChromaDB服务器类型
                 persist_path,  # ChromaDB持久化路径
                 embed):# 向量化函数

        self.directory = directory
        self.file_group_num =80# 每组处理的文件数
        self.batch_num =6# 每次插入的批次数量

        self.chunksize =500# 切分文本的大小
        self.overlap =100# 切分文本的重叠大小

        self.chroma_db =ChromaDB(chroma_server_type=chroma_server_type,
                                  persist_path=persist_path,
                                  embed=embed)
# 配置日志
        logging.basicConfig(
            level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
)

defload_pdf_files(self):
"""
        加载目录下的所有PDF文件
        """

        pdf_files =[]
for file in os.listdir(self.directory):
if file.lower().endswith('.pdf'):
                pdf_files.append(os.path.join(self.directory, file))

        logging.info(f"Found {len(pdf_files)} PDF files.")
return pdf_files

defload_pdf_content(self, pdf_path):
"""
        读取PDF文件内容
        """

        pdf_loader =PyMuPDFLoader(file_path=pdf_path)
        docs = pdf_loader.load()
        logging.info(f"Loading content from {pdf_path}.")
return docs

defsplit_text(self, documents):
"""
        将文本切分成小段
        """

# 切分文档
        text_splitter =RecursiveCharacterTextSplitter(
            chunk_size=self.chunksize,
            chunk_overlap=self.overlap,
            length_function=len,
            add_start_index=True,
)

        docs = text_splitter.split_documents(documents)

        logging.info("Split text into smaller chunks with RecursiveCharacterTextSplitter.")
return docs

definsert_docs_chromadb(self, docs, batch_size=6):
"""
        将文档插入到ChromaDB
        """

# 分批入库
        logging.info(f"Inserting {len(docs)} documents into ChromaDB.")

# 记录开始时间
        start_time = time.time()
        total_docs_inserted =0

# 计算总批次
        total_batches =(len(docs)+ batch_size -1)// batch_size

with tqdm(total=total_batches, desc="Inserting batches", unit="batch")as pbar:
for i inrange(0,len(docs), batch_size):
# 获取当前批次的样本
                batch = docs[i:i + batch_size]

# 将样本入库
                self.chroma_db.add_with_langchain(batch)
# self.chroma_db.async_add_with_langchain(batch)

# 更新已插入的文档数量
                total_docs_inserted +=len(batch)

# 计算并显示当前的TPM
                elapsed_time = time.time()- start_time  # 计算已用时间(秒)
if elapsed_time >0:# 防止除以零
                    tpm =(total_docs_inserted / elapsed_time)*60# 转换为每分钟插入的文档数
                    pbar.set_postfix({"TPM":f"{tpm:.2f}"})# 更新进度条的后缀信息

# 更新进度条
                pbar.update(1)

defprocess_pdfs_group(self, pdf_files_group):
# 读取PDF文件内容
        pdf_contents =[]

for pdf_path in pdf_files_group:
# 读取PDF文件内容
            documents = self.load_pdf_content(pdf_path)

# 将documents 逐一添加到pdf_contents
            pdf_contents.extend(documents)

# 将文本切分成小段
        docs = self.split_text(pdf_contents)

# 将文档插入到ChromaDB
        self.insert_docs_chromadb(docs, self.batch_num)

defprocess_pdfs(self):
# 获取目录下所有的PDF文件
        pdf_files = self.load_pdf_files()

        group_num = self.file_group_num

# group_num 个PDF文件为一组,分批处理
for i inrange(0,len(pdf_files), group_num):
            pdf_files_group = pdf_files[i:i + group_num]
            self.process_pdfs_group(pdf_files_group)

print("PDFs processed successfully!")

5、测试导入功能

因为Python的导入库的原因(一般都是从工作目录查找),所以我们在项目根目录下创建test_framework.py,方便后续统一测试工作。

smart-finance-bot \
    |- app \   # 该目录用于服务端代码
        |- rag \   # 该目录用于保存rag相关代码
            |- pdf_processor.py
            |- chroma_conn.py
        |- test_framework.py

代码文件: app/test_framework.py

# 测试导入PDF到向量库主流程
deftest_import():
from rag.pdf_processor importPDFProcessor
from utils.util import get_qwen_models

    llm , chat , embed = get_qwen_models()
# embed = get_huggingface_embeddings()

    directory ="./app/dataset/pdf"
    persist_path ="chroma_db"
    server_type ="local"

# 创建 PDFProcessor 实例
    pdf_processor =PDFProcessor(directory=directory,
                                 chroma_server_type=server_type,
                                 persist_path=persist_path,
                                 embed=embed)

# 处理 PDF 文件
    pdf_processor.process_pdfs()

if __name__ =="__main__":
    test_import()

1、通过命令行启动ChromaDB服务端:

chroma run --path chroma_db --port 8000 --host localhost

2、运行test_framework.py

运行效果:

备注:一般测试框架会使用Pytest并且编写相应的单元测试函数,本次项目中由于项目较小且函数返回结果不固定,所以就没有写UnitTest。如果想了解Pytest的使用示例,可以参考我的其他代码仓库,例如:UnitTest的使用

6、实现检索功能

代码文件: app/rag/rag.py

import logging
from langchain_core.prompts importChatPromptTemplate
from langchain_core.runnables importRunnablePassthrough
from langchain_core.runnables.base importRunnableLambda
from langchain_core.output_parsers importStrOutputParser
from.chroma_conn importChromaDB

# 配置日志记录
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s')


classRagManager:
def__init__(self,
                 chroma_server_type="http",
                 host="localhost", port=8000,
                 persist_path="chroma_db",
                 llm=None, embed=None):
        self.llm = llm
        self.embed = embed

        chrom_db =ChromaDB(chroma_server_type=chroma_server_type,
                            host=host, port=port,
                            persist_path=persist_path,
                            embed=embed)
        self.store = chrom_db.get_store()

defget_chain(self, retriever):
"""获取RAG查询链"""

# RAG系统经典的 Prompt (A 增强的过程)
        prompt =ChatPromptTemplate.from_messages([
("human","""You are an assistant for question-answering tasks. Use the following pieces 
          of retrieved context to answer the question. 
          If you don't know the answer, just say that you don't know. 
          Use three sentences maximum and keep the answer concise.
          Question: {question} 
          Context: {context} 
          Answer:"""
)
])
# 将 format_docs 方法包装为 Runnable
        format_docs_runnable =RunnableLambda(self.format_docs)
# RAG 链
        rag_chain =(
{"context": retriever | format_docs_runnable,
"question":RunnablePassthrough()}
| prompt
| self.llm
|StrOutputParser()
)

return rag_chain

defformat_docs(self, docs):
# 返回检索到的资料文件名称
        logging.info(f"检索到资料文件个数:{len(docs)}")
        retrieved_files ="\n".join([doc.metadata["source"]for doc in docs])
        logging.info(f"资料文件分别是:\n{retrieved_files}")

        retrieved_content ="\n\n".join(doc.page_content for doc in docs)
        logging.info(f"检索到的资料为:\n{retrieved_content}")

return retrieved_content

defget_retriever(self, k=4, mutuality=0.3):
        retriever = self.store.as_retriever(search_type="similarity_score_threshold",
                                            search_kwargs={"k": k,"score_threshold": mutuality})

return retriever

defget_result(self, question, k=4, mutuality=0.3):
"""获取RAG查询结果"""

        retriever = self.get_retriever(k, mutuality)

        rag_chain = self.get_chain(retriever)

return rag_chain.invoke(input=question)

以上是实现了一个使用基本检索器的RAG,其中:

  • • 代码中通过chroma_conn.py模块连接到ChromaDB数据库,并使用ChromaDB的as_retriever方法创建一个检索器。

7、测试检索效果

在test_framework.py中添加RAG的测试调用函数。

代码文件:app/test_framework.py

# 测试RAG主流程
deftest_rag():
from rag.rag importRagManager
from utils.util import get_qwen_models

    llm, chat, embed = get_qwen_models()
    rag =RagManager(host="localhost", port=8000, llm=llm, embed=embed)

    result = rag.get_result("湖南长远锂科股份有限公司变更设立时作为发起人的法人有哪些?")

print(result)


if __name__ =="__main__":
    test_rag()# RAG测试函数
# test_import()         # 批量导入PDF测试函数

注释掉批量导入函数,开启test_rag()函数,运行效果:

至此,我们完成了RAG模块的基本功能,它包括PDF文件的批量导入以及检索功能。

内容小结

  • • 首先,我们创建了一个ChromaDB的类,封装了基础的Chroma连接、插入、检索。

  • • 其次,我们实现了PDFProcessor类,该类中会调用ChromaDB类的插入函数,将批量读取的PDF文件进行切分后保存至向量库。

  • • 然后,我们实现了RagManager类,该类中封装了RAG的检索链,并定义了检索的参数。

  • • 最后,我们实现了一个测试函数,用于测试RAG的检索功能。

  • • 除此之外,有两个注意事项:

    • • 在项目初期,进行合理的项目文件目录规划,可以有效减少项目维护的难度。

    • • 在项目行进中,通过搭建测试框架,可以方便函数验证以及后续重构的回归测试。

53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询

扫码登录
登录即表示您同意《53AI网站服务协议》
服务协议

欢迎您使用【53AI 官方网站】(以下简称“本网站”或“我们”)。本《会员服务协议》(以下简称“本协议”)是您(以下简称“会员”或“用户”)与【深圳市博思协创网络科技有限公司】之间关于注册、登录及使用本网站会员服务所订立的法律协议。

在您注册或登录前,请务必审慎阅读、充分理解各条款内容,特别是免除或限制责任的条款、知识产权条款、争议解决条款等。此类条款将以加粗形式提示您注意。 当您通过微信公众号授权、手机验证码验证或其他方式成功登录本网站时,即视为您已完全理解并同意接受本协议的全部内容。

一、 定义

本网站:指由【深圳市博思协创网络科技有限公司】运营的,域名为【53ai.com】的网站及相关移动端页面。

会员服务:指本网站向注册会员提供的知识库文章查阅、内容检索及其他相关增值服务。

知识库内容:指本网站发布的包括但不限于文字、图表、数据、研究报告、行业分析等数字化内容资源。

二、 账号注册与登录

登录方式:本网站支持以下登录方式,您可根据实际情况选择:

微信公众号授权登录:您同意将您的微信OpenID信息授权给本网站,用于创建或关联会员账号。

手机验证码登录:您需提供真实有效的手机号码,并通过短信验证码完成身份验证与登录/注册。

账号安全:您的账号仅限您本人使用,禁止赠与、借用、租用、转让或售卖。因您保管不善导致的账号被盗、密码泄露等损失,由您自行承担。

实名认证:根据相关法律法规要求,我们可能要求您在特定功能下完成实名认证。如您拒绝提供,可能无法使用部分或全部服务。

未成年人保护:若您未满18周岁,请在法定监护人的陪同下阅读本协议,并在征得监护人同意后使用本服务。

三、 服务内容与规范

知识库查阅权限:会员登录后,有权按照其会员等级对应的权限范围,在线浏览、检索本网站知识库中的相关文章及内容。

服务变更:我们有权根据业务发展需要,调整、变更或终止部分服务内容,并将以网站公告、公众号消息等方式提前通知。

禁止行为:您在使用服务时不得实施以下行为:

利用技术手段批量爬取、下载、转存知识库内容;

将知识库内容用于商业目的或未经授权地向第三方传播;

干扰本网站正常运行或侵犯其他用户合法权益;

发布违法违规信息或从事违反公序良俗的活动。

四、 知识产权声明

权利归属:本网站知识库中的排版设计、软件代码等内容的知识产权均归【公司全称】或原权利人所有,受《中华人民共和国著作权法》等法律保护。

有限许可:本网站授予会员一项非独占、不可转让、不可转授权的普通许可,仅限于个人学习、研究之目的在线查阅知识库内容。

侵权追责:未经书面许可,任何单位或个人不得以任何形式复制、转载、摘编、镜像、汇编或以其他方式使用上述内容。一经发现,我们保留追究其法律责任的权利。

五、 个人信息保护

我们重视对您个人信息的保护。关于我们如何收集、使用、存储和保护您的个人信息,请单独阅读 《隐私政策》。

您通过微信公众号授权或手机号验证所提供的信息,我们将严格按照《个人信息保护法》的规定处理,仅用于身份识别、服务提供及安全验证等必要用途。

您可以随时通过网站设置或联系客服行使查阅、更正、删除个人信息及撤回授权同意的权利。

六、 免责声明

内容准确性:知识库内容仅供参考,不构成专业建议。我们不对其完整性、准确性、时效性作任何明示或暗示的保证,您应自行判断并承担使用风险。

不可抗力:因自然灾害、政策法规变化、网络故障、第三方平台接口异常(如微信接口维护、运营商短信通道故障)等不可抗力导致的服务中断或延迟,我们不承担违约责任。

第三方链接:本网站可能包含指向第三方网站的链接,该等网站的内容和服务不受我们控制,请您自行甄别风险。

七、 违约责任

如您违反本协议约定,我们有权视情节采取警告、限制功能、暂停服务、注销账号等措施,并保留要求赔偿损失的权利。

如因您的违约行为导致我们遭受行政处罚、第三方索赔或商誉损失,您应承担全部赔偿责任(包括但不限于罚款、赔偿金、律师费、公证费等)。

八、 法律适用与争议解决

本协议的订立、执行和解释均适用中华人民共和国大陆地区法律。

因本协议产生的或与本协议有关的任何争议,双方应友好协商解决;协商不成的,任何一方均可向【公司所在地】有管辖权的人民法院提起诉讼。

九、 其他

本协议构成双方就本服务达成的完整协议,取代此前任何口头或书面约定。

本协议任一条款被认定为无效或不可执行的,不影响其他条款的效力。

我们对本协议享有最终解释权,并在法律允许的范围内保留随时修改的权利。修改后的协议一经公布即生效,继续使用服务即视为同意修订内容。


已查阅