微信扫码
添加专属顾问
我要投稿
告别手动录入发票的烦恼,用Dify打造批量识别神器,轻松处理上百张发票! 核心内容: 1. 发票批量识别的痛点与市场需求 2. 基于Dify搭建自动化流程的技术方案 3. 从PDF转换到AI识别的完整实现代码
相信我们每个人,多少都和发票打过交道,有不少人都体验过报销时整理票据的烦恼。
如果你是开公司的小伙伴,还没有专职财务,那每月处理发票的滋味,就更是一言难尽了。
当然,如果只是一两张发票,现在随便一个AI模型都能帮你搞定。但真正的考验是,当几十上百张发票堆在面前,并且每一个数据都要求精准无误时,该怎么办?
尤其对于小微企业和代理记账公司来说,这几乎是每月固定的体力活。
当用AI处理掉这些琐碎事情的时候,真的挺有成就感的,不管是自己的需求还是帮朋友解决的问题,都能让我们暂时喘口气。
今天,我就想完整分享一下,我是如何用dify来搭建一套批量处理发票的自动化流程,彻底告别这种烦恼。
下图是在官方税务查询的界面,但是数据还是不够明细。
在正式搭建工作流之前,需要完成两项准备。
先聊一下为什么要转成图片吧,开始用文档提取器识别发票的PDF文件,但是识别的效果不好,尤其是商品详情那块,格式有点乱,识别的结果总是串行。
所以最后决定用阿里千问的多模型来做发票识别,把本地发票的PDF文件统一转换成图片进行输出。
代码中需要用到一个开源工具poppler:
https://github.com/oschwartz10612/poppler-windows/releases
转换代码:pdf_to_images.py
from pdf2image import convert_from_path
import os
# 配置参数
input_folder = 'pdf'# 当前目录下的 pdf 文件夹
output_folder = 'output_images'# 图片输出文件夹
poppler_path = os.path.abspath('poppler-24.08.0/Library/bin')
ifnot os.path.exists(output_folder):
os.makedirs(output_folder)
# 批量处理 pdf 文件夹下所有 PDF 文件
for filename in os.listdir(input_folder):
if filename.lower().endswith('.pdf'):
pdf_path = os.path.join(input_folder, filename)
pdf_base_name = os.path.splitext(os.path.basename(pdf_path))[0]
try:
for i, page inenumerate(convert_from_path(pdf_path, poppler_path=poppler_path)):
img_path = os.path.join(output_folder, f'{pdf_base_name}_page_{i+1}.png')
page.save(img_path, 'PNG')
print(f'{filename} 已成功转换为图片并保存到文件夹!')
except Exception as e:
print(f'转换 {filename} 时出错:{e}')
本地执行:
转换后的目录:
工作流最后是写入到飞书多维表格中,需要创建一个多维表格,记录url地址。
多维表格需要关联飞书应用,这块不会的参考我以前的文章:
《Dify接入飞书多维表格,5步搞定,全网最详细教程(建议收藏)》
我这里是提取了发票上能提取的所有信息
开票日期是日期类型,合计金额、合计税额、价税合计三个字段为数字类型,其他为文本类型。
工作流就是上传图片->大模型分析数据->代码节点格式转换->写入飞书多维表格->输出结果。
开始节点增加一个upload参数,单文件,图片类型。
这是实现发票识别的核心节点。
提示词如下:
# 角色
你是一个专业的发票信息提取助手。
# 任务
你的任务是从图片中,精准地提取所有关键信息,并严格按照指定的 JSON 格式输出。请忽略文本中与发票无关的干扰项(如“下载次数”)。
# 输入变量
发票的原始文本内容将通过变量 `{{upload}}` 传入。
# 输出要求
1.**严格的JSON格式**:你的回答必须是一个完整的、格式正确的 JSON 对象,不能包含任何 JSON 之外的解释、注释或文字。
2.**数据清洗**:
* 对于金额和数量,只保留数字,去除货币符号(如 `¥`)。
* 对于日期,统一输出为 `YYYY-MM-DD` 格式。
* 对于税率,输出百分比字符串,例如 `"13%"`。
* 如果某一项信息在发票中不存在,请使用 `null` 或者空字符串 `""` 作为值。
3.**处理项目列表**:发票中的“项目名称”部分可能包含多个商品或服务,甚至可能包含折扣(金额为负数)。你需要将每一个项目都作为一个独立的对象,放入 `items` 数组中。
# JSON 输出格式
```json
{
"invoice_title": "string",
"invoice_number": "string",
"issue_date": "string",
"buyer_info": {
"name": "string",
"tax_id": "string"
},
"seller_info": {
"name": "string",
"tax_id": "string"
},
"items": [
{
"name": "string",
"model": "string",
"unit": "string",
"quantity": "number | null",
"unit_price": "number | null",
"amount": "number",
"tax_rate": "string",
"tax_amount": "number"
}
],
"total_amount_exclusive_tax": "number",
"total_tax_amount": "number",
"total_amount_inclusive_tax": {
"in_words": "string",
"in_figures": "number"
},
"remarks": "string",
"issuer": "string"
}
LLM节点返回的JSON数据无法直接被飞书插件使用,代码节点就是把大模型分析的json数组转成飞书多维表格插件可以识别的格式。
这里需要注意两个问题:
1.飞书API要求的日期格式为毫秒级时间戳;
2.LLM的输出可能被Markdown代码块包裹。
完整代码如下:
import json
import re
from datetime import datetime, timezone
defmain(output: str) -> dict:
# 去除 markdown 代码块标记
output = re.sub(r"^```[\w]*\s*", "", output.strip(), flags=re.IGNORECASE)
output = re.sub(r"```$", "", output.strip())
output = output.strip()
data = json.loads(output)
# 日期字段转为时间戳(毫秒)
date_str = data.get("issue_date", "").replace("/", "-").replace(".", "-").strip()
# 转为时间戳(毫秒)
if date_str:
dt = datetime.strptime(date_str, "%Y-%m-%d")
# 如果你需要时区,改为 dt.replace(tzinfo=timezone.utc)
timestamp = int(dt.replace(tzinfo=timezone.utc).timestamp() * 1000)
else:
timestamp = None
items_str = json.dumps(data["items"], ensure_ascii=False)
record = {
"发票抬头(invoice_title)": data.get("invoice_title", ""),
"发票号码(invoice_number)": data.get("invoice_number", ""),
"开票日期(issue_date)": timestamp,
"购买方名称(buyer_info.name)": data["buyer_info"].get("name", ""),
"购买方税号(buyer_info.tax_id)": data["buyer_info"].get("tax_id", ""),
"销售方名称(seller_info.name)": data["seller_info"].get("name", ""),
"销售方税号(seller_info.tax_id)": data["seller_info"].get("tax_id", ""),
"商品详情(items)": items_str,
"合计金额(total_amount_exclusive_tax)": data.get("total_amount_exclusive_tax", ""),
"合计税额(total_tax_amount)": data.get("total_tax_amount", ""),
"价税合计大写(total_amount_inclusive_tax.in_words)": data["total_amount_inclusive_tax"].get("in_words", ""),
"价税合计小写(total_amount_inclusive_tax.in_figures)": data["total_amount_inclusive_tax"].get("in_figures", ""),
"备注(remarks)": data.get("remarks", ""),
"开票人(issuer)": data.get("issuer", "")
}
return {
"result": json.dumps([record], ensure_ascii=False)
}
填写两个参数。
这里注意一下,飞书多维表格的日期类型,传入的是时间戳,所以代码节点这块做了一层转换的。
为了测试方便,返回LLM节点识别出来的发票数据。
上传一个发票图片进行测试。
整体JSON结构如下:
```json
{
"invoice_title":"电子发票(普通发票)",
"invoice_number":"2511XXX120",
"issue_date":"2025-06-06",
"buyer_info":{
"name":"中国XXX大学",
"tax_id":"A1XXX9621"
},
"seller_info":{
"name":"北京XXX限公司",
"tax_id":"91XXXXX057T"
},
"items":[
{
"name":"*电子元件*RFID读卡器",
"model":"FX600-B",
"unit":"个",
"quantity":1,
"unit_price":116.831683168317,
"amount":116.83,
"tax_rate":"1%",
"tax_amount":1.17
},
{
"name":"*电子元件*NFC标签卡",
"model":"M1",
"unit":"个",
"quantity":25,
"unit_price":1.039603960396,
"amount":25.99,
"tax_rate":"1%",
"tax_amount":0.26
}
],
"total_amount_exclusive_tax":142.82,
"total_tax_amount":1.43,
"total_amount_inclusive_tax":{
"in_words":"壹佰肆拾肆圆贰角伍分",
"in_figures":144.25
},
"remarks":"",
"issuer":"XX"
}
```
飞书多维表格会多一行记录。代码工作流执行正常。
记住这个API秘钥:app-zT4UQvqqeLKfq0hX8sKDAOxj,后面脚本中会用到。
批量处理本地发票文件,是转换好的图片文件。
这个脚本会遍历本地
output_images文件夹中的所有图片,对每张图片执行两个步骤:
参数类型记得是图片,与开始节点我们选择的类型一致。
batch_invoice_recognize.py
import os
import requests
import json
import csv
# 配置区
API_KEY = "app-zT4UQvqqeLKfq0hX8sKDAOxj"
USER = "rongjie"
DIFY_SERVER = "http://127.0.0.1"# 你的Dify服务器地址
DIR_PATH = "output_images" # 需要批量处理的图片目录
RESULT_FILE = "result.csv" # 结果保存文件
defupload_image(file_path, api_key):
url = f'{DIFY_SERVER}/v1/files/upload'
headers = {'Authorization': f'Bearer {api_key}'}
ext = os.path.splitext(file_path)[-1].lower()
mime = 'image/png'if ext == '.png'else'image/jpeg'
withopen(file_path, 'rb') as file:
files = {'file': (os.path.basename(file_path), file, mime)}
data = {'user': USER}
response = requests.post(url, headers=headers, files=files, data=data)
if response.status_code == 201:
print(f"[上传成功] {file_path}")
resp = response.json()
print("上传返回信息:", resp)
return resp['id']
else:
print(f"[上传失败] {file_path},状态码: {response.status_code},返回: {response.text}")
returnNone
defrun_workflow(file_id, api_key):
url = f"{DIFY_SERVER}/v1/workflows/run"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
data = {
"inputs": {
"upload": {
"type": "image",
"transfer_method": "local_file",
"upload_file_id": file_id
}
},
"user": USER,
"response_mode": "blocking"
}
import pprint
print("工作流调用参数:")
pprint.pprint(data)
response = requests.post(url, headers=headers, json=data)
print("工作流返回内容:", response.status_code, response.text)
if response.status_code == 200:
resp_json = response.json()
wf_status = resp_json.get("data", {}).get("status")
if wf_status == "succeeded":
outputs = resp_json.get("data", {}).get("outputs")
print(f"[工作流执行成功] 文件ID: {file_id}")
return resp_json, True, outputs
else:
error_info = resp_json.get("data", {}).get("error") or resp_json.get("message")
print(f"[工作流内部失败] 文件ID: {file_id},状态: {wf_status}")
return resp_json, False, error_info
else:
print(f"[工作流执行失败] 文件ID: {file_id},状态码: {response.status_code},返回: {response.text}")
return {"status": "error", "message": f"Failed to execute workflow, status code: {response.status_code}"}, False, response.text
defbatch_process_images(dir_path, result_file):
results = []
for filename in os.listdir(dir_path):
file_path = os.path.join(dir_path, filename)
ifnot os.path.isfile(file_path) ornot filename.lower().endswith((".jpg", ".jpeg", ".png")):
continue
print(f"\n处理文件: {file_path}")
file_id = upload_image(file_path, API_KEY)
upload_status = "成功"if file_id else"失败"
workflow_status = "失败"# 默认失败
result_info = ""
if file_id:
result, success, info = run_workflow(file_id, API_KEY)
workflow_status = "成功"if success else"失败"
result_info = json.dumps(info, ensure_ascii=False) if info else""
else:
result_info = "上传失败"
results.append({
"filename": filename,
"upload_status": upload_status,
"workflow_status": workflow_status,
"result_info": result_info
})
# 写入CSV文件
withopen(result_file, "w", newline='', encoding="utf-8-sig") as f:
writer = csv.DictWriter(f, fieldnames=["filename", "upload_status", "workflow_status", "result_info"])
writer.writeheader()
for row in results:
writer.writerow(row)
print(f"\n所有结果已写入 {result_file}")
if __name__ == "__main__":
batch_process_images(DIR_PATH, RESULT_FILE)
结果会记录到本地result.csv文件中。记录文件上传的状态、工作流执行的结果、工作流输出的结果。
截取的部分飞书多维表格的数据
经过这一系列操作,成功打造了一个高效的自动化工具。
现在,只需要将发票的PDF文件放入指定文件夹,运行两个脚本,所有发票的数据就能自动、准确地归档到飞书多维表格中,甚至可以直接利用表格生成各种维度的统计图表。
看到数据唰唰地自动写入表格,我心里总会涌起一种特别的开心。
看看下个阶段是不是优化一下这个流程,比如Python程序做成GUI界面,或者把记录到飞书表格换成写到本地Excel表格。
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业
2025-06-25
2025-06-04
2025-04-18
2025-04-28
2025-05-08
2025-06-03
2025-05-08
2025-04-14
2025-04-14
2025-04-15
2025-06-26
2025-06-17
2025-05-29
2025-05-28
2025-05-22
2025-04-27
2025-04-15
2025-03-20