微信扫码
添加专属顾问
我要投稿
火山引擎LAS团队揭秘:如何用Daft框架实现百万级文本Embedding处理,并保持GPU接近100%利用率。 核心内容: 1. 基于Daft框架的百万级文本向量化处理方案 2. Qwen3-Embedding模型实现GPU极致利用率的关键技术 3. 从数据读取到向量入库的完整高效处理流程
在计算密集型任务中,GPU作为核心算力资源,其利用率的高低对任务效率与资源成本有着直接影响。因此,工程师们在优化性能时,其中一个重要目标便是让GPU资源尽可能 “跑满”,实现近乎100%的利用率。尤其在大规模数据处理场景下,如何避免GPU空转、充分发挥其并行计算能力,往往成为优化的关键所在。
近期,我们在运用Qwen3-Embedding-0.6B模型对百万级文本数据进行向量化操作时,达成了这一目标:整个任务运行过程中,GPU 始终保持接近 100% 的使用率。这不仅显著缩短了数据处理耗时,也让我们对GPU资源的极致利用有了更具体的实践经验。接下来,我们将通过一个样例,分享一些心得体会。
另外,在撰写本文之际,我们还发现了另一种方法,能够将数据处理速度提升3倍,且该优化方法并非通过压榨剩余GPU资源实现。不过,这并非本文重点,我们将在后续分享这一内容。今天的目标主要还是借助一个Daft任务样例,分享实现GPU接近100%利用率的方法。
从S3读取百万级的文档数据。
运用spaCy将每个文档切分为多个语句。
采用开源最优的embedding模型Qwen3-Embedding-0.6B
为每一条语句生成对应的 Embedding向量。
将结果写入TurboPuffer向量数据库,以备后续业务检索。
在开展 Daft任务开发前,需创建一个虚拟环境,例如 virtualenv
、uv
、conda
等。以下以 uv
为例:
uv venv --seed --python 3.11source .venv/bin/activate
随后,需安装以下依赖包
pip install "daft[ray]" turbopuffer torch sentence-transformers spacy accelerate transformerspython -m spacy download en_core_web_sm
en_core_web_sm
这个spaCy 模型实现文档切分,使用intfloat/multilingual-e5-large-instruct
模型生成Embedding向量,当然也可将这些模型替换为其他开源模型或企业级模型。import torchimport daftfrom daft import colNUM_GPU_NODES = 8 # GPU nodes in your clusterNLP_MODEL_NAME = "en_core_web_sm" # spaCy model for sentence detectionCHUNKING_PARALLELISM = 8 # Parallel chunking processes per nodeEMBEDDING_MODEL_NAME = "Qwen/Qwen3-Embedding-0.6B" # Text embedding modelENCODING_DIM = 1024 # Embedding dimensionsBATCH_SIZE = 512 # Records per embedding batchSENTENCE_TRANSFORMER_BATCH_SIZE = 16 # GPU batch size for embeddings
语句级分块:适用于多数场景,尤其是文档结构不清晰时。
段落级分块:适用于RAG(检索增强生成)应用。
章节级分块:适用于结构清晰的长文档。
固定长度分块:此方式实现最为简便,灵活性高,但可能破坏语义边界,适用于自定义逻辑切分场景。
我们可依据具体的业务场景,按照不同粒度或策略进行切分,例如:
当文档结构未知,或文档内容类型多样时,优先采用语句级分块。
当计划构建RAG系统,需要跨语句上下文信息时,优先采用段落级分块。
当文本数据主要来自推文、短信、代码等非标准结构内容时,可考虑自定义分割。
在此样例中,我们采用语句级分块,借助自然语言处理库spaCy,相较于其他简单的基于标点分割的方法,它提供了更为健壮的语句边界检测能力,能够更好地处理边缘问题。
# Define the return type for chunked text
# Here we'll keep both the chunked text and the chunk ID which
# we'll later use for creating IDs for the sentences
chunked_type = daft.DataType.list(
daft.DataType.struct({
"text": daft.DataType.string(),
"chunk_id": daft.DataType.int32()
})
)
return_dtype=chunked_type,
concurrency=NUM_GPU_NODES * CHUNKING_PARALLELISM,
batch_size=BATCH_SIZE // CHUNKING_PARALLELISM // 2
)
class ChunkingUDF:
def __init__(self):
import spacy
self.nlp = spacy.load(NLP_MODEL_NAME)
def __call__(self, text_col):
results = []
for text in text_col:
doc = self.nlp(text)
sentence_texts = [
{"text": sentence.text, "chunk_id": i}
for i, sentence in enumerate(doc.sents)
]
results.append(sentence_texts)
return results
这里我们使用Daft提供的Class UDFs实现一个自定义ChunkingUDF
,来实现文本切分的逻辑,具体内容如下:
ChunkingUDF
实例初始化时,加载spaCy模型,每UDF实例仅加载一次模型。
通过批量处理的方式,处理文档数据(text_col
),减少调用开销。
返回结果包含切分后的文本列表,列表中每个元素包含文本内容和对应的chunk ID。
整个ChunkingUDF
支持分布式运行,该示例中会启动64个(NUM_GPU_NODES * CHUNKING_PARALLELISM
)实例并行的处理数据。
MTEB Leaderboard:可参考 MTEB排行榜,查看不同模型在不同任务上的排名。
特定任务类型性能:不同模型擅长的任务类型各异,如语义搜索、聚类等。可根据具体业务场景确定任务类型,进而选择该类型中的最优模型。
多语言支持:需考量业务场景是否需处理多语言文本。若仅需单语言,应更关注该语言对应的专项排行榜,而非多语言排行榜。
以下为一些主流模型及其特点:
Qwen3-Embedding-0.6B:整体性价比高,在该级别大小的开源模型中性能最优,也是本示例所使用的模型。
all-MiniLM-L6-v2:Sentence Transformer的文档默认使用的模型,通常用于教程。
gemini-embedding-001:目前MTEB上的顶级多语言模型,需要Gemini API访问。
Seed1.6-Embedding:目前中国MTEB排行榜的顶级模型,需要Volcengine API访问。
确认使用的模型后,可通过修改EMBEDDING_MODEL_NAME
变量的值轻松替换模型。
接下来,实现Embedding的逻辑:
# Define the return type for embeddingsembedding_type = daft.DataType.embedding(daft.DataType.float32(), ENCODING_DIM)@daft.udf( return_dtype=embedding_type, concurrency=NUM_GPU_NODES, num_gpus=1, batch_size=BATCH_SIZE)class EncodingUDF: def __init__(self): from sentence_transformers import SentenceTransformer device = 'cuda' if torch.cuda.is_available() else 'cpu' self.model = SentenceTransformer(EMBEDDING_MODEL_NAME, device=device) self.model.compile() def __call__(self, text_col): embeddings = self.model.encode( text_col.to_pylist(), batch_size=SENTENCE_TRANSFORMER_BATCH_SIZE, convert_to_tensor=True, torch_dtype=torch.bfloat16, ) return embeddings.cpu().numpy()
与ChunkingUDF
类似,我们实现一个自定义UDF(EncodingUDF
)来执行Embedding操作:
EncodingUDF
实例初始化时,判断是否存在GPU资源,若GPU资源可用,则将模型加载到 GPU 的内存中。
在生成文本Embedding向量时,使用bfloat16精度降低内存占用,同时通过批量处理文本的方式(SENTENCE_TRANSFORMER_BATCH_SIZE=128)优化GPU利用率。
最终将生成的Embedding向量以Numpy数组的形式返回。
在本示例中,我们在8个g5.2xlarge(每节点含 A10G GPU)的Ray集群上运行,配置如下:
# Configure Daft to use Ray to schedule work on different worker nodesdaft.context.set_runner_ray()# Configure S3 access for reading datadaft.set_planning_config( default_io_config=daft.io.IOConfig( s3=daft.io.S3Config.from_env() ))
步骤 4:执行完整流程
以下展示整个端到端的流程:
( daft.read_parquet("s3://desmond-demo/text-embedding-dataset.parquet") .with_column("sentences", ChunkingUDF(col("text"))) .explode("sentences") .with_column("text", col("sentences")["text"]) .with_column("chunk_id", col("sentences")["chunk_id"]) .exclude("sentences") .with_column("embedding", EncodingUDF(col("text"))) .with_column( "id", col("url").str.right(50) + "-" + col("chunk_id").cast(daft.DataType.string()) ) .select("id", "url", "language", "source", "text", "embedding") .write_turbopuffer( namespace="desmond-scale-experiment6", region="aws-us-west-2", id_column="id", vector_column="embedding", distance_metric="cosine_distance" ))
流程解析:
读取数据:首先从S3高效地加载Parquet文件,读取文档内容。
文本分块:使用ChunkingUDF
对text
列的文本数据进行语句切分,新增sentences
列存储切分后的数据。
展开列表:sentences
列每个元素的类型是list,通过explode
将list展开后,sentences
列中每个元素类型变为Struct,其中包含具体的文本内容和chunk ID。
提取字段:从sentences
列中提取文本和chunk ID,分别用text
和chunk_id
存储对应的内容,并从dataframe中移除sentences
列。
生成嵌入:使用EncodingUDF
为每个切分后的语句生成Embedding向量,将数据存储在embedding
列中。
创建唯一 ID:通过组合 url和chunk ID为每一个Embedding
向量生成唯一标识符。
筛选列:筛选出必要字段。
写入 TurboPuffer:将生成的Embedding
数据及关联数据写入向量数据库中。
当在集群上运行该脚本后,应能看到整个集群的网络 I/O、CPU/GPU资源被任务充分利用,特别是GPU负载持续保持在高水位,同时我们可以调整批处理大小,并发参数不断优化GPU资源使用率。
调整批大小:可以增大SENTENCE_TRANSFORMER_BATCH_SIZE
提升吞吐量,同时提升GPU内存使用率。
扩展节点:根据集群规模调整NUM_GPU_NODES
和CHUNKING_PARALLELISM
。
更换模型:可以修改EMBEDDING_MODEL_NAME
的值切换Sentence Transformers模型。
自定义分块:可以修改ChunkingUDF
实现其他分块策略。
替换向量库:除了将生成的Embedding
向量写到TurboPuffer之外,Daft还原生支持将Embedding
向量写入其他向量数据库,比如LanceDB。
GPU 内存:在任务运行过程中,最好结合监控系统,对GPU内存使用情况进行监控。若分配失败或超过模型最大序列长度,应减小SENTENCE_TRANSFORMER_BATCH_SIZE
。
模型加载:UDF在初始化时会加载一次模型,首次加载时,往往需从远程下载模型。在带宽受限的场景下,模型加载操作可能耗时较长。此时,可考虑提前将模型缓存到本地。在容器化场景下,可提前下载模型并挂载到容器中。
模型量化:针对精度要求不高的场景,可使用bfloat16/float16精度类型进行量化,降低内存占用,提升吞吐量。
下一代计划
通过当前示例,我们已将GPU利用率提升至接近100%的水平,这是众多工程师所追求的 “圣杯”。但Daft的探索脚步不会停歇。我们正在尝试新方法,通过自定义GPU流水线,以及用vLLM替代Sentence Transformers,使整体处理速度提升3倍。我们正在逐步完善这一方案,并将在后续博客继续分享如何突破 “峰值利用率” 的限制,大幅提升吞吐量。
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业
2025-08-20
微信开源的RAG黑科技:ComoRAG
2025-08-20
Sam Altman:“GPT-6 即将问世”
2025-08-20
Deepseek 重磅更新V3.1版本,上下文长度拓展至128K, 代码编程思维能力急剧上升
2025-08-20
亲测百度GenFlow 2.0:我的AI搭子能同时开100个线程帮我打工!
2025-08-20
DeepSeek 3.1 正式发布:为AI Agent时代重塑效率基准
2025-08-20
等了这么久,企业微信的AI终于来了!
2025-08-20
你的供应链还在“裸奔”吗?这份AI转型蓝图,AI产品经理看完都收藏
2025-08-20
解构1688 AI黑盒:从用户交互到技术实现,五大功能全链路拆解
2025-05-29
2025-05-23
2025-06-01
2025-06-21
2025-06-07
2025-06-12
2025-06-13
2025-06-19
2025-05-28
2025-07-29
2025-08-20
2025-08-19
2025-08-19
2025-08-18
2025-08-18
2025-08-18
2025-08-15
2025-08-14