支持私有化部署
AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


我用Dify打造了批量发票识别助手,一键识别上百张发票,这才是AI识别发票的正确方式

发布日期:2025-07-10 09:28:18 浏览次数: 1564
作者:荣姐聊AI

微信搜一搜,关注“荣姐聊AI”

推荐语

告别手动录入发票的烦恼,用Dify打造批量识别神器,轻松处理上百张发票!

核心内容:
1. 发票批量识别的痛点与市场需求
2. 基于Dify搭建自动化流程的技术方案
3. 从PDF转换到AI识别的完整实现代码

杨芳贤
53AI创始人/腾讯云(TVP)最具价值专家

相信我们每个人,多少都和发票打过交道,有不少人都体验过报销时整理票据的烦恼。

如果你是开公司的小伙伴,还没有专职财务,那每月处理发票的滋味,就更是一言难尽了。

当然,如果只是一两张发票,现在随便一个AI模型都能帮你搞定。但真正的考验是,当几十上百张发票堆在面前,并且每一个数据都要求精准无误时,该怎么办?

尤其对于小微企业和代理记账公司来说,这几乎是每月固定的体力活。

当用AI处理掉这些琐碎事情的时候,真的挺有成就感的,不管是自己的需求还是帮朋友解决的问题,都能让我们暂时喘口气。

今天,我就想完整分享一下,我是如何用dify来搭建一套批量处理发票的自动化流程,彻底告别这种烦恼。

下图是在官方税务查询的界面,但是数据还是不够明细。

Image

1 准备工作

在正式搭建工作流之前,需要完成两项准备。

1.1 发票PDF文件转图片

先聊一下为什么要转成图片吧,开始用文档提取器识别发票的PDF文件,但是识别的效果不好,尤其是商品详情那块,格式有点乱,识别的结果总是串行。

所以最后决定用阿里千问的多模型来做发票识别,把本地发票的PDF文件统一转换成图片进行输出。

Image

代码中需要用到一个开源工具poppler:

https://github.com/oschwartz10612/poppler-windows/releases

转换代码:pdf_to_images.py

from pdf2image import convert_from_path
import os

# 配置参数
input_folder = 'pdf'# 当前目录下的 pdf 文件夹
output_folder = 'output_images'# 图片输出文件夹
poppler_path = os.path.abspath('poppler-24.08.0/Library/bin')

ifnot os.path.exists(output_folder):
    os.makedirs(output_folder)

# 批量处理 pdf 文件夹下所有 PDF 文件
for filename in os.listdir(input_folder):
    if filename.lower().endswith('.pdf'):
        pdf_path = os.path.join(input_folder, filename)
        pdf_base_name = os.path.splitext(os.path.basename(pdf_path))[0]
        try:
            for i, page inenumerate(convert_from_path(pdf_path, poppler_path=poppler_path)):
                img_path = os.path.join(output_folder, f'{pdf_base_name}_page_{i+1}.png')
                page.save(img_path, 'PNG')
            print(f'{filename} 已成功转换为图片并保存到文件夹!')
        except Exception as e:
            print(f'转换 {filename} 时出错:{e}'

本地执行:

Image

转换后的目录:

Image

1.2 创建飞书多维表格

工作流最后是写入到飞书多维表格中,需要创建一个多维表格,记录url地址。

多维表格需要关联飞书应用,这块不会的参考我以前的文章:

《Dify接入飞书多维表格,5步搞定,全网最详细教程(建议收藏)》

我这里是提取了发票上能提取的所有信息

开票日期是日期类型,合计金额、合计税额、价税合计三个字段为数字类型,其他为文本类型。

2 工作流搭建

工作流就是上传图片->大模型分析数据->代码节点格式转换->写入飞书多维表格->输出结果。

Image

2.1 开始节点

开始节点增加一个upload参数,单文件,图片类型。

Image
Image

2.2 大模型发票识别节点

这是实现发票识别的核心节点。

  • 模型选择:我选用了支持多模态视觉理解的Qwen2.5-VL-72B-Instruct模型。
  • 视觉能力:必须打开该模型节点的“视觉”开关,并将开始节点的upload变量传入。
  • Prompt设计:为了让模型能稳定、准确地返回结构化数据,需要在提示词中明确定义AI的角色、任务,并给出了严格的JSON输出格式要求,包括处理项目列表、数据清洗规则等。
Image

提示词如下:

# 角色
你是一个专业的发票信息提取助手。

# 任务
你的任务是从图片中,精准地提取所有关键信息,并严格按照指定的 JSON 格式输出。请忽略文本中与发票无关的干扰项(如“下载次数”)。

# 输入变量
发票的原始文本内容将通过变量 `{{upload}}` 传入。

# 输出要求
1.**严格的JSON格式**:你的回答必须是一个完整的、格式正确的 JSON 对象,不能包含任何 JSON 之外的解释、注释或文字。
2.**数据清洗**
    * 对于金额和数量,只保留数字,去除货币符号(如 `¥`)。
    * 对于日期,统一输出为 `YYYY-MM-DD` 格式。
    * 对于税率,输出百分比字符串,例如 `"13%"`
    * 如果某一项信息在发票中不存在,请使用 `null` 或者空字符串 `""` 作为值。
3.**处理项目列表**:发票中的“项目名称”部分可能包含多个商品或服务,甚至可能包含折扣(金额为负数)。你需要将每一个项目都作为一个独立的对象,放入 `items` 数组中。

# JSON 输出格式

```json
{
  "invoice_title": "string",
  "invoice_number": "string",
  "issue_date": "string",
  "buyer_info": {
    "name": "string",
    "tax_id": "string"
  },
  "seller_info": {
    "name": "string",
    "tax_id": "string"
  },
  "items": [
    {
      "name": "string",
      "model": "string",
      "unit": "string",
      "quantity": "number | null",
      "unit_price": "number | null",
      "amount": "number",
      "tax_rate": "string",
      "tax_amount": "number"
    }
  ],
  "total_amount_exclusive_tax": "number",
  "total_tax_amount": "number",
  "total_amount_inclusive_tax": {
    "in_words": "string",
    "in_figures": "number"
  },
  "remarks": "string",
  "issuer": "string"
}

2.3 代码节点

LLM节点返回的JSON数据无法直接被飞书插件使用,代码节点就是把大模型分析的json数组转成飞书多维表格插件可以识别的格式。

这里需要注意两个问题:

1.飞书API要求的日期格式为毫秒级时间戳;

2.LLM的输出可能被Markdown代码块包裹。

Image

完整代码如下:

import json
import re
from datetime import datetime, timezone

defmain(output: str) -> dict:
    # 去除 markdown 代码块标记
    output = re.sub(r"^```[\w]*\s*""", output.strip(), flags=re.IGNORECASE)
    output = re.sub(r"```$""", output.strip())
    output = output.strip()

    data = json.loads(output)
    # 日期字段转为时间戳(毫秒)
    date_str = data.get("issue_date""").replace("/""-").replace(".""-").strip()
    # 转为时间戳(毫秒)
    if date_str:
        dt = datetime.strptime(date_str, "%Y-%m-%d")
        # 如果你需要时区,改为 dt.replace(tzinfo=timezone.utc)
        timestamp = int(dt.replace(tzinfo=timezone.utc).timestamp() * 1000)
    else:
        timestamp = None

    items_str = json.dumps(data["items"], ensure_ascii=False)
    record = {
        "发票抬头(invoice_title)": data.get("invoice_title"""),
        "发票号码(invoice_number)": data.get("invoice_number"""),
        "开票日期(issue_date)": timestamp,
        "购买方名称(buyer_info.name)": data["buyer_info"].get("name"""),
        "购买方税号(buyer_info.tax_id)": data["buyer_info"].get("tax_id"""),
        "销售方名称(seller_info.name)": data["seller_info"].get("name"""),
        "销售方税号(seller_info.tax_id)": data["seller_info"].get("tax_id"""),
        "商品详情(items)": items_str,
        "合计金额(total_amount_exclusive_tax)": data.get("total_amount_exclusive_tax"""),
        "合计税额(total_tax_amount)": data.get("total_tax_amount"""),
        "价税合计大写(total_amount_inclusive_tax.in_words)": data["total_amount_inclusive_tax"].get("in_words"""),
        "价税合计小写(total_amount_inclusive_tax.in_figures)": data["total_amount_inclusive_tax"].get("in_figures"""),
        "备注(remarks)": data.get("remarks"""),
        "开票人(issuer)": data.get("issuer""")
    }
    return {
        "result": json.dumps([record], ensure_ascii=False)
    }

2.4 飞书多维表格插件

填写两个参数。

  • app_token为之前创建的飞书多维表格的url地址。
  • 记录列表是代码节点转换的结果。

这里注意一下,飞书多维表格的日期类型,传入的是时间戳,所以代码节点这块做了一层转换的。

Image

2.5 结束节点

为了测试方便,返回LLM节点识别出来的发票数据。

Image

3 测试

3.1 工作流测试

上传一个发票图片进行测试。

Image

整体JSON结构如下:

```json
{
"invoice_title":"电子发票(普通发票)",
"invoice_number":"2511XXX120",
"issue_date":"2025-06-06",
"buyer_info":{
    "name":"中国XXX大学",
    "tax_id":"A1XXX9621"
},
"seller_info":{
    "name":"北京XXX限公司",
    "tax_id":"91XXXXX057T"
},
"items":[
    {
      "name":"*电子元件*RFID读卡器",
      "model":"FX600-B",
      "unit":"个",
      "quantity":1,
      "unit_price":116.831683168317,
      "amount":116.83,
      "tax_rate":"1%",
      "tax_amount":1.17
    },
    {
      "name":"*电子元件*NFC标签卡",
      "model":"M1",
      "unit":"个",
      "quantity":25,
      "unit_price":1.039603960396,
      "amount":25.99,
      "tax_rate":"1%",
      "tax_amount":0.26
    }
],
"total_amount_exclusive_tax":142.82,
"total_tax_amount":1.43,
"total_amount_inclusive_tax":{
    "in_words":"壹佰肆拾肆圆贰角伍分",
    "in_figures":144.25
},
"remarks":"",
"issuer":"XX"
}
```

飞书多维表格会多一行记录。代码工作流执行正常。

3.2 API接口批量执行测试

3.2.1 创建API密钥

Image

记住这个API秘钥:app-zT4UQvqqeLKfq0hX8sKDAOxj,后面脚本中会用到。

Image

3.2.2 编写脚本调用工作流API

批量处理本地发票文件,是转换好的图片文件。

这个脚本会遍历本地

output_images文件夹中的所有图片,对每张图片执行两个步骤:

  • 调用Dify的/v1/files/upload接口上传图片文件。
  • 使用返回的file_id,调用/v1/workflows/run接口来执行工作流。

参数类型记得是图片,与开始节点我们选择的类型一致。

batch_invoice_recognize.py

import os
import requests
import json
import csv

# 配置区
API_KEY = "app-zT4UQvqqeLKfq0hX8sKDAOxj"
USER = "rongjie"
DIFY_SERVER = "http://127.0.0.1"# 你的Dify服务器地址
DIR_PATH = "output_images"         # 需要批量处理的图片目录
RESULT_FILE = "result.csv"         # 结果保存文件

defupload_image(file_path, api_key):
    url = f'{DIFY_SERVER}/v1/files/upload'
    headers = {'Authorization'f'Bearer {api_key}'}
    ext = os.path.splitext(file_path)[-1].lower()
    mime = 'image/png'if ext == '.png'else'image/jpeg'
    withopen(file_path, 'rb'as file:
        files = {'file': (os.path.basename(file_path), file, mime)}
        data = {'user': USER}
        response = requests.post(url, headers=headers, files=files, data=data)
    if response.status_code == 201:
        print(f"[上传成功] {file_path}")
        resp = response.json()
        print("上传返回信息:", resp)
        return resp['id']
    else:
        print(f"[上传失败] {file_path},状态码: {response.status_code},返回: {response.text}")
        returnNone

defrun_workflow(file_id, api_key):
    url = f"{DIFY_SERVER}/v1/workflows/run"
    headers = {
        "Authorization"f"Bearer {api_key}",
        "Content-Type""application/json"
    }
    data = {
        "inputs": {
            "upload": {
                "type""image",
                "transfer_method""local_file",
                "upload_file_id": file_id
            }
        },
        "user": USER,
        "response_mode""blocking"
    }
    import pprint
    print("工作流调用参数:")
    pprint.pprint(data)
    response = requests.post(url, headers=headers, json=data)
    print("工作流返回内容:", response.status_code, response.text)
    if response.status_code == 200:
        resp_json = response.json()
        wf_status = resp_json.get("data", {}).get("status")
        if wf_status == "succeeded":
            outputs = resp_json.get("data", {}).get("outputs")
            print(f"[工作流执行成功] 文件ID: {file_id}")
            return resp_json, True, outputs
        else:
            error_info = resp_json.get("data", {}).get("error"or resp_json.get("message")
            print(f"[工作流内部失败] 文件ID: {file_id},状态: {wf_status}")
            return resp_json, False, error_info
    else:
        print(f"[工作流执行失败] 文件ID: {file_id},状态码: {response.status_code},返回: {response.text}")
        return {"status""error""message"f"Failed to execute workflow, status code: {response.status_code}"}, False, response.text

defbatch_process_images(dir_path, result_file):
    results = []
    for filename in os.listdir(dir_path):
        file_path = os.path.join(dir_path, filename)
        ifnot os.path.isfile(file_path) ornot filename.lower().endswith((".jpg"".jpeg"".png")):
            continue
        print(f"\n处理文件: {file_path}")
        file_id = upload_image(file_path, API_KEY)
        upload_status = "成功"if file_id else"失败"
        workflow_status = "失败"# 默认失败
        result_info = ""
        if file_id:
            result, success, info = run_workflow(file_id, API_KEY)
            workflow_status = "成功"if success else"失败"
            result_info = json.dumps(info, ensure_ascii=Falseif info else""
        else:
            result_info = "上传失败"
        results.append({
            "filename": filename,
            "upload_status": upload_status,
            "workflow_status": workflow_status,
            "result_info": result_info
        })
    # 写入CSV文件
    withopen(result_file, "w", newline='', encoding="utf-8-sig"as f:
        writer = csv.DictWriter(f, fieldnames=["filename""upload_status""workflow_status""result_info"])
        writer.writeheader()
        for row in results:
            writer.writerow(row)
    print(f"\n所有结果已写入 {result_file}")

if __name__ == "__main__":
    batch_process_images(DIR_PATH, RESULT_FILE) 

结果会记录到本地result.csv文件中。记录文件上传的状态、工作流执行的结果、工作流输出的结果。

Image

截取的部分飞书多维表格的数据

Image

经过这一系列操作,成功打造了一个高效的自动化工具。

现在,只需要将发票的PDF文件放入指定文件夹,运行两个脚本,所有发票的数据就能自动、准确地归档到飞书多维表格中,甚至可以直接利用表格生成各种维度的统计图表。

Image

看到数据唰唰地自动写入表格,我心里总会涌起一种特别的开心。

看看下个阶段是不是优化一下这个流程,比如Python程序做成GUI界面,或者把记录到飞书表格换成写到本地Excel表格。

53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费场景POC验证,效果验证后签署服务协议。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询