微信扫码
添加专属顾问
原先经常调用各家的闭源大模型的API,如果使用同步的方式调用,速度会很慢。为了加快 API 的调用速度,决定使用异步调用 API 的方式。
本文编写的代码,支持原生的 llamafactory 的数据集导入方式。
推理速度远远快于同步的 API 调用方式。基于 langchain_openai.ChatOpenAI 的 invoke 方法实现异步调用。
下述代码的主要工作介绍如下:
AsyncAPICall,限制API的调用速度,逐块推理,避免程序崩溃导致所有数据丢失;async_call_api.py
# pip install langchain langchain_openai
import os
import sys
import json
import asyncio
import fire
from tqdm import tqdm
from dataclasses import dataclass
from aiolimiter import AsyncLimiter
from typing import List
import pandas as pd
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
from llamafactory.hparams import get_train_args
from llamafactory.extras.constants import IGNORE_INDEX
from llamafactory.data.loader import _get_merged_dataset
load_dotenv()
class AsyncLLM:
def __init__(
self,
model: str = "gpt-3.5-turbo",
base_url: str = "http://localhost:{}/v1/".format(
os.environ.get("API_PORT", 8000)
),
api_key: str = "{}".format(os.environ.get("API_KEY", "0")),
num_per_second: int = 6,
**kwargs,
):
self.model = model
self.base_url = base_url
self.api_key = api_key
self.num_per_second = num_per_second
self.limiter = AsyncLimiter(self.num_per_second, 1)
self.llm = ChatOpenAI(
model=self.model, base_url=self.base_url, api_key=self.api_key, **kwargs
)
async def __call__(self, text):
# 限速
async with self.limiter:
return await self.llm.ainvoke([text])
llm = AsyncLLM(
base_url="http://localhost:{}/v1/".format(os.environ.get("API_PORT", 8000)),
api_key="{}".format(os.environ.get("API_KEY", "0")),
num_per_second=10,
)
llms = [llm]
@dataclass
class AsyncAPICall:
uid: str = "0"
@staticmethod
async def _run_task_with_progress(task, pbar):
result = await task
pbar.update(1)
return result
@staticmethod
def async_run(
llms: List[AsyncLLM],
data: List[str],
keyword: str = "",
output_dir: str = "output",
chunk_size=500,
) -> List[str]:
async def infer_chunk(llms: List[AsyncLLM], data: List):
results = [llms[i % len(llms)](text) for i, text in enumerate(data)]
with tqdm(total=len(results)) as pbar:
results = await asyncio.gather(
*[
AsyncAPICall._run_task_with_progress(task, pbar)
for task in results
]
)
return results
idx = 0
all_df = []
file_exist_skip = False
user_confirm = False
while idx < len(data):
file_path = os.path.join(output_dir, "tmp", f"{idx}.csv.temp")
if os.path.exists(file_path):
if not user_confirm:
while True:
user_response = input(
f"Find {file_path} file already exists. Do you want to skip them forever?\ny or Y to skip, n or N to rerun to overwrite: "
)
if user_response.lower() == "y":
user_confirm = True
file_exist_skip = True
break
elif user_response.lower() == "n":
user_confirm = True
file_exist_skip = False
break
if file_exist_skip:
tmp_df = pd.read_csv(file_path)
all_df.append(tmp_df)
idx += chunk_size
continue
tmp_data = data[idx : idx + chunk_size]
loop = asyncio.get_event_loop()
tmp_result = loop.run_until_complete(infer_chunk(llms=llms, data=tmp_data))
tmp_result = [item.content for item in tmp_result]
tmp_df = pd.DataFrame({"infer": tmp_result})
if not os.path.exists(p := os.path.dirname(file_path)):
os.makedirs(p, exist_ok=True)
tmp_df.to_csv(file_path, index=False)
all_df.append(tmp_df)
idx += chunk_size
all_df = pd.concat(all_df)
return all_df["infer"]
def async_api_infer(
model_name_or_path: str = "",
eval_dataset: str = "",
template: str = "",
dataset_dir: str = "data",
do_predict: bool = True,
predict_with_generate: bool = True,
max_samples: int = None,
output_dir: str = "output",
chunk_size=50,
):
if len(sys.argv) == 1:
model_args, data_args, training_args, finetuning_args, generating_args = (
get_train_args(
dict(
model_name_or_path=model_name_or_path,
dataset_dir=dataset_dir,
eval_dataset=eval_dataset,
template=template,
output_dir=output_dir,
do_predict=True,
predict_with_generate=True,
max_samples=max_samples,
)
)
)
else:
model_args, data_args, training_args, finetuning_args, generating_args = (
get_train_args()
)
dataset = _get_merged_dataset(
data_args.eval_dataset, model_args, data_args, training_args, "sft"
)
labels = [item[0]["content"] for item in dataset["_response"]]
prompts = [item[0]["content"] for item in dataset["_prompt"]]
infers = AsyncAPICall.async_run(
llms,
prompts,
chunk_size=chunk_size,
output_dir=training_args.output_dir,
)
if not os.path.exists(training_args.output_dir):
os.makedirs(training_args.output_dir, exist_ok=True)
output_prediction_file = os.path.join(
training_args.output_dir, "generated_predictions.jsonl"
)
with open(output_prediction_file, "w", encoding="utf-8") as writer:
res: List[str] = []
for text, pred, label in zip(prompts, infers, labels):
res.append(
json.dumps(
{"prompt": text, "predict": pred, "label": label},
ensure_ascii=False,
)
)
writer.write("\n".join(res))
if __name__ == "__main__":
fire.Fire(async_api_infer)
上述代码支持 yaml 文件:
async.yaml
## model
model_name_or_path: qwen/Qwen2.5-7B-Instruct
### method
do_predict: true
### dataset
dataset_dir: ../data/
eval_dataset: alpaca_zh_demo
template: qwen
### output
output_dir: output
# overwrite_output_dir: true
### eval
predict_with_generate: true
max_samples: 100
使用下述代码运行:
python async_call_api.py async.yaml
异步调用的API推理结果:
{"prompt": "识别并解释给定列表中的两个科学理论:细胞理论和日心说。", "predict": "细胞理论和日心说是两个重要的科学理论,分别属于生物学和天文学领域。\n\n1. 细胞理论:细胞理论是生物学中的一个基本理论,它描述了所有生命体的基本结构和功能单位——细胞。细胞理论包含以下三个主要观点:\n\n a. 所有生物体都是由一个或多个细胞组成的。\n b. 细胞是生命活动的基本单位,负责维持生物体的生命活动。\n c. 新细胞只能由已存在的细胞分裂产生。\n\n 细胞理论的提出和发展对于生物学研究具有重要意义,它揭示了生物体的基本结构和功能单位,为生物学研究提供了理论基础。\n\n2. 日心说:日心说是一种关于太阳系结构的理论,它描述了太阳在太阳系中的中心位置,而地球和其他行星则围绕太阳旋转。日心说最初由古希腊天文学家阿里斯塔克提出,但直到16世纪,哥白尼重新提出这一理论,并得到了进一步的发展和完善。日心说的提出颠覆了传统的地心说观念,为现代天文学的发展奠定了基础。\n\n这两个科学理论分别在生物学和天文学领域中具有重要的意义,它们揭示了生命体和宇宙的基本结构和运行规律。", "label": "细胞理论是生物科学的一个理论,它认为所有生命体都是由微小的基本单元——细胞所构成。这是生物学的一个基础理论,认为细胞是所有生物的基本结构和功能单位,所有的生物都是由一个或多个细胞组成,细胞只能通过细胞分裂产生新的细胞。这一理论由薛定谔、施瓦内和雪莱于1839年首次提出。\n\n日心说是指太阳是太阳系的中心,也就是说,行星围绕太阳旋转的理论。这个理论打破了传统的地心说观点,认为地球并不是宇宙的中心。日心说的提出者是尼古拉·哥白尼,他在16世纪初发表了他的著作《天体运行论》,阐述了太阳系行星围绕太阳运行的模型,为天文学的发展做出了巨大贡献。"},...,
}
在输出结果中, predict 是大模型的推理结果。方便大家对比 predict 和 label,并评估大模型推理的精度。
为了避免大模型中途程序崩溃,把原始数据分块进行推理。这样即使程序中途崩溃,也能基于之前保存的分快数据继续推理,而不用重新开始推理。
下面是两个异步调用的进度条:
100%|██████████████████████████████████████████████████████████████████████████████████████████████| 50/50 [00:22<00:00, 2.27it/s]
100%|██████████████████████████████████████████████████████████████████████████████████████████████| 50/50 [00:22<00:00, 2.22it/s]
上述异步实验总共数据为100条数据,分块大小为50,故有2个进度条。100条速度44秒全部处理完成,平均处理速度 每秒处理2.2条数据。
同步调用 LLM api 的代码很简单,如下所示:
infers = []
for prompt in tqdm(prompts):
infers.append(llm.llm.invoke(prompt))
下面是同步调用的进度条:
100%|████████████████████████████████████████████████████████████████████████████████████████████| 100/100 [06:54<00:00, 4.15s/it]
如果使用同步调用,100条数据,总共耗时 6分54秒,平均每条耗时4.15秒。
对比之下,异步调用比同步调用快了大约 9.41 倍。
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业
2026-07-04
Cursor 如何把 AI 部署进企业内部
2026-07-04
字节跳动CEO梁汝波最新万字分享深度拆解:这可能是2026年最重要的一堂管理课
2026-07-03
开发者转向 AI 应用工程,真正要迁移的是工程判断力
2026-07-02
不改一行代码,看透 AI Agent 的每一次调用
2026-07-02
AI 不缺智商缺纪律:一场 Harness 工程化实践
2026-07-02
天工 3.2 重磅升级:Skywork Tags 上线,给 Agent 一张工牌,邀其加入你的工作群聊
2026-07-02
Context Infra 会是 AI 领域的下一个热点
2026-07-01
一文了解|SkillScan 智能体技能安全扫描最佳实践
2026-04-15
2026-04-07
2026-04-07
2026-04-24
2026-04-17
2026-04-14
2026-04-24
2026-04-22
2026-05-19
2026-04-24
欢迎您使用【53AI 官方网站】(以下简称“本网站”或“我们”)。本《会员服务协议》(以下简称“本协议”)是您(以下简称“会员”或“用户”)与【深圳市博思协创网络科技有限公司】之间关于注册、登录及使用本网站会员服务所订立的法律协议。
在您注册或登录前,请务必审慎阅读、充分理解各条款内容,特别是免除或限制责任的条款、知识产权条款、争议解决条款等。此类条款将以加粗形式提示您注意。 当您通过微信公众号授权、手机验证码验证或其他方式成功登录本网站时,即视为您已完全理解并同意接受本协议的全部内容。
一、 定义
本网站:指由【深圳市博思协创网络科技有限公司】运营的,域名为【53ai.com】的网站及相关移动端页面。
会员服务:指本网站向注册会员提供的知识库文章查阅、内容检索及其他相关增值服务。
知识库内容:指本网站发布的包括但不限于文字、图表、数据、研究报告、行业分析等数字化内容资源。
二、 账号注册与登录
登录方式:本网站支持以下登录方式,您可根据实际情况选择:
微信公众号授权登录:您同意将您的微信OpenID信息授权给本网站,用于创建或关联会员账号。
手机验证码登录:您需提供真实有效的手机号码,并通过短信验证码完成身份验证与登录/注册。
账号安全:您的账号仅限您本人使用,禁止赠与、借用、租用、转让或售卖。因您保管不善导致的账号被盗、密码泄露等损失,由您自行承担。
实名认证:根据相关法律法规要求,我们可能要求您在特定功能下完成实名认证。如您拒绝提供,可能无法使用部分或全部服务。
未成年人保护:若您未满18周岁,请在法定监护人的陪同下阅读本协议,并在征得监护人同意后使用本服务。
三、 服务内容与规范
知识库查阅权限:会员登录后,有权按照其会员等级对应的权限范围,在线浏览、检索本网站知识库中的相关文章及内容。
服务变更:我们有权根据业务发展需要,调整、变更或终止部分服务内容,并将以网站公告、公众号消息等方式提前通知。
禁止行为:您在使用服务时不得实施以下行为:
利用技术手段批量爬取、下载、转存知识库内容;
将知识库内容用于商业目的或未经授权地向第三方传播;
干扰本网站正常运行或侵犯其他用户合法权益;
发布违法违规信息或从事违反公序良俗的活动。
四、 知识产权声明
权利归属:本网站知识库中的排版设计、软件代码等内容的知识产权均归【公司全称】或原权利人所有,受《中华人民共和国著作权法》等法律保护。
有限许可:本网站授予会员一项非独占、不可转让、不可转授权的普通许可,仅限于个人学习、研究之目的在线查阅知识库内容。
侵权追责:未经书面许可,任何单位或个人不得以任何形式复制、转载、摘编、镜像、汇编或以其他方式使用上述内容。一经发现,我们保留追究其法律责任的权利。
五、 个人信息保护
我们重视对您个人信息的保护。关于我们如何收集、使用、存储和保护您的个人信息,请单独阅读 《隐私政策》。
您通过微信公众号授权或手机号验证所提供的信息,我们将严格按照《个人信息保护法》的规定处理,仅用于身份识别、服务提供及安全验证等必要用途。
您可以随时通过网站设置或联系客服行使查阅、更正、删除个人信息及撤回授权同意的权利。
六、 免责声明
内容准确性:知识库内容仅供参考,不构成专业建议。我们不对其完整性、准确性、时效性作任何明示或暗示的保证,您应自行判断并承担使用风险。
不可抗力:因自然灾害、政策法规变化、网络故障、第三方平台接口异常(如微信接口维护、运营商短信通道故障)等不可抗力导致的服务中断或延迟,我们不承担违约责任。
第三方链接:本网站可能包含指向第三方网站的链接,该等网站的内容和服务不受我们控制,请您自行甄别风险。
七、 违约责任
如您违反本协议约定,我们有权视情节采取警告、限制功能、暂停服务、注销账号等措施,并保留要求赔偿损失的权利。
如因您的违约行为导致我们遭受行政处罚、第三方索赔或商誉损失,您应承担全部赔偿责任(包括但不限于罚款、赔偿金、律师费、公证费等)。
八、 法律适用与争议解决
本协议的订立、执行和解释均适用中华人民共和国大陆地区法律。
因本协议产生的或与本协议有关的任何争议,双方应友好协商解决;协商不成的,任何一方均可向【公司所在地】有管辖权的人民法院提起诉讼。
九、 其他
本协议构成双方就本服务达成的完整协议,取代此前任何口头或书面约定。
本协议任一条款被认定为无效或不可执行的,不影响其他条款的效力。
我们对本协议享有最终解释权,并在法律允许的范围内保留随时修改的权利。修改后的协议一经公布即生效,继续使用服务即视为同意修订内容。