免费POC,零成本试错

AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


基于Daft实现百万级文本Embedding

发布日期:2025-08-20 13:44:42 浏览次数: 1559
作者:Daft

微信搜一搜,关注“Daft”

推荐语

火山引擎LAS团队揭秘:如何用Daft框架实现百万级文本Embedding处理,并保持GPU接近100%利用率。

核心内容:
1. 基于Daft框架的百万级文本向量化处理方案
2. Qwen3-Embedding模型实现GPU极致利用率的关键技术
3. 从数据读取到向量入库的完整高效处理流程

杨芳贤
53AI创始人/腾讯云(TVP)最具价值专家

在计算密集型任务中,GPU作为核心算力资源,其利用率的高低对任务效率与资源成本有着直接影响。因此,工程师们在优化性能时,其中一个重要目标便是让GPU资源尽可能 “跑满”,实现近乎100%的利用率。尤其在大规模数据处理场景下,如何避免GPU空转、充分发挥其并行计算能力,往往成为优化的关键所在。

近期,我们在运用Qwen3-Embedding-0.6B模型对百万级文本数据进行向量化操作时,达成了这一目标:整个任务运行过程中,GPU 始终保持接近 100% 的使用率。这不仅显著缩短了数据处理耗时,也让我们对GPU资源的极致利用有了更具体的实践经验。接下来,我们将通过一个样例,分享一些心得体会。

另外,在撰写本文之际,我们还发现了另一种方法,能够将数据处理速度提升3倍,且该优化方法并非通过压榨剩余GPU资源实现。不过,这并非本文重点,我们将在后续分享这一内容。今天的目标主要还是借助一个Daft任务样例,分享实现GPU接近100%利用率的方法。


Daft任务描述


在Daft的日常开发过程中,我们留意到很多用户存在这样的需求:“期望借助一款高效、便捷且使用门槛低的工具,将海量文档数据存入向量数据库,以支撑下游业务的检索需求”。针对此业务场景,我们基于Daft数据处理框架设计了一个 作业样例,用以展示Daft如何快速高效地解决这类业务问题。整个样例涵盖以下步骤:
  • 从S3读取百万级的文档数据。

  • 运用spaCy将每个文档切分为多个语句。

  • 采用开源最优的embedding模型Qwen3-Embedding-0.6B为每一条语句生成对应的 Embedding向量。

  • 将结果写入TurboPuffer向量数据库,以备后续业务检索。



准备工作


在开展 Daft任务开发前,需创建一个虚拟环境,例如 virtualenvuvconda 等。以下以 uv 为例:

uv venv --seed --python 3.11source .venv/bin/activate

随后,需安装以下依赖包

pip install "daft[ray]" turbopuffer torch sentence-transformers spacy accelerate transformerspython -m spacy download en_core_web_sm


导入依赖并配置常量


接着开始编写作业代码,首先引入必要的依赖,同时设置一些工作负载参数。用户可依据集群规模、数据大小、GPU 可用资源对上述参数进行调整,以提升处理效率。例如,使用en_core_web_sm这个spaCy 模型实现文档切分,使用intfloat/multilingual-e5-large-instruct模型生成Embedding向量,当然也可将这些模型替换为其他开源模型或企业级模型。
import torchimport daftfrom daft import colNUM_GPU_NODES = 8                     # GPU nodes in your clusterNLP_MODEL_NAME = "en_core_web_sm"     # spaCy model for sentence detectionCHUNKING_PARALLELISM = 8              # Parallel chunking processes per nodeEMBEDDING_MODEL_NAME = "Qwen/Qwen3-Embedding-0.6B"  # Text embedding modelENCODING_DIM = 1024                   # Embedding dimensionsBATCH_SIZE = 512                      # Records per embedding batchSENTENCE_TRANSFORMER_BATCH_SIZE = 16  # GPU batch size for embeddings


步骤 1:文本切块


在为文本数据生成embedding向量之前,通常需要先对大文本进行切分,再作处理。在我们的业务场景中,文本数据主要源自文档,文档一般具有层级结构,如文档→章节→段落→句子→单词→字符,分别有不同的切分策略,常见的切分策略包括:
  • 语句级分块:适用于多数场景,尤其是文档结构不清晰时。

  • 段落级分块:适用于RAG(检索增强生成)应用。

  • 章节级分块:适用于结构清晰的长文档。

  • 固定长度分块:此方式实现最为简便,灵活性高,但可能破坏语义边界,适用于自定义逻辑切分场景。

我们可依据具体的业务场景,按照不同粒度或策略进行切分,例如:

  • 当文档结构未知,或文档内容类型多样时,优先采用语句级分块

  • 当计划构建RAG系统,需要跨语句上下文信息时,优先采用段落级分块

  • 当文本数据主要来自推文、短信、代码等非标准结构内容时,可考虑自定义分割

在此样例中,我们采用语句级分块,借助自然语言处理库spaCy,相较于其他简单的基于标点分割的方法,它提供了更为健壮的语句边界检测能力,能够更好地处理边缘问题。

# Define the return type for chunked text# Here we'll keep both the chunked text and the chunk ID which# we'll later use for creating IDs for the sentenceschunked_type = daft.DataType.list(    daft.DataType.struct({        "text": daft.DataType.string(),        "chunk_id": daft.DataType.int32()    }))@daft.udf(    return_dtype=chunked_type,    concurrency=NUM_GPU_NODES * CHUNKING_PARALLELISM,    batch_size=BATCH_SIZE // CHUNKING_PARALLELISM // 2)class ChunkingUDF:    def __init__(self):        import spacy        self.nlp = spacy.load(NLP_MODEL_NAME)
    def __call__(self, text_col):        results = []        for text in text_col:            doc = self.nlp(text)            sentence_texts = [                {"text": sentence.text, "chunk_id": i}                for i, sentence in enumerate(doc.sents)            ]            results.append(sentence_texts)        return results

这里我们使用Daft提供的Class UDFs实现一个自定义ChunkingUDF,来实现文本切分的逻辑,具体内容如下:

  • ChunkingUDF实例初始化时,加载spaCy模型,每UDF实例仅加载一次模型。

  • 通过批量处理的方式,处理文档数据(text_col),减少调用开销。

  • 返回结果包含切分后的文本列表,列表中每个元素包含文本内容和对应的chunk ID。

  • 整个ChunkingUDF支持分布式运行,该示例中会启动64个(NUM_GPU_NODES * CHUNKING_PARALLELISM)实例并行的处理数据。


步骤 2:GPU加速Embedding向量生成


Embedding向量的质量与模型选择紧密相关。在选择具体模型时,可考虑以下因素:
  • MTEB Leaderboard:可参考 MTEB排行榜,查看不同模型在不同任务上的排名。

  • 特定任务类型性能:不同模型擅长的任务类型各异,如语义搜索、聚类等。可根据具体业务场景确定任务类型,进而选择该类型中的最优模型。

  • 多语言支持:需考量业务场景是否需处理多语言文本。若仅需单语言,应更关注该语言对应的专项排行榜,而非多语言排行榜。

以下为一些主流模型及其特点:

  • Qwen3-Embedding-0.6B:整体性价比高,在该级别大小的开源模型中性能最优,也是本示例所使用的模型。

  • all-MiniLM-L6-v2:Sentence Transformer的文档默认使用的模型,通常用于教程。

  • gemini-embedding-001:目前MTEB上的顶级多语言模型,需要Gemini API访问。

  • Seed1.6-Embedding:目前中国MTEB排行榜的顶级模型,需要Volcengine API访问。

确认使用的模型后,可通过修改EMBEDDING_MODEL_NAME变量的值轻松替换模型。

接下来,实现Embedding的逻辑:

# Define the return type for embeddingsembedding_type = daft.DataType.embedding(daft.DataType.float32(), ENCODING_DIM)@daft.udf(    return_dtype=embedding_type,    concurrency=NUM_GPU_NODES,    num_gpus=1,    batch_size=BATCH_SIZE)class EncodingUDF:    def __init__(self):        from sentence_transformers import SentenceTransformer        device = 'cuda' if torch.cuda.is_available() else 'cpu'        self.model = SentenceTransformer(EMBEDDING_MODEL_NAME, device=device)        self.model.compile()    def __call__(self, text_col):        embeddings = self.model.encode(            text_col.to_pylist(),            batch_size=SENTENCE_TRANSFORMER_BATCH_SIZE,            convert_to_tensor=True,            torch_dtype=torch.bfloat16,        )        return embeddings.cpu().numpy()

ChunkingUDF类似,我们实现一个自定义UDF(EncodingUDF)来执行Embedding操作:

  • EncodingUDF实例初始化时,判断是否存在GPU资源,若GPU资源可用,则将模型加载到 GPU 的内存中。

  • 在生成文本Embedding向量时,使用bfloat16精度降低内存占用,同时通过批量处理文本的方式(SENTENCE_TRANSFORMER_BATCH_SIZE=128)优化GPU利用率。

  • 最终将生成的Embedding向量以Numpy数组的形式返回。


步骤 3:配置分布式处理


完成步骤 2 后,整个Embedding的业务处理逻辑就完成了。此时,可先在本地环境基于小批量数据进行测试。当处理大规模数据时,通常需将任务提交到分布式集群上执行,详细内容可参考 扩展指南。

在本示例中,我们在8个g5.2xlarge(每节点含 A10G GPU)的Ray集群上运行,配置如下:

# Configure Daft to use Ray to schedule work on different worker nodesdaft.context.set_runner_ray()# Configure S3 access for reading datadaft.set_planning_config(    default_io_config=daft.io.IOConfig(        s3=daft.io.S3Config.from_env()    ))


步骤 4:执行完整流程

以下展示整个端到端的流程:

(    daft.read_parquet("s3://desmond-demo/text-embedding-dataset.parquet")    .with_column("sentences", ChunkingUDF(col("text")))    .explode("sentences")    .with_column("text", col("sentences")["text"])    .with_column("chunk_id", col("sentences")["chunk_id"])    .exclude("sentences")    .with_column("embedding", EncodingUDF(col("text")))    .with_column(        "id",        col("url").str.right(50) + "-" + col("chunk_id").cast(daft.DataType.string())    )    .select("id", "url", "language", "source", "text", "embedding")    .write_turbopuffer(        namespace="desmond-scale-experiment6",        region="aws-us-west-2",        id_column="id",        vector_column="embedding",        distance_metric="cosine_distance"    ))

流程解析:

  • 读取数据:首先从S3高效地加载Parquet文件,读取文档内容。

  • 文本分块:使用ChunkingUDFtext列的文本数据进行语句切分,新增sentences列存储切分后的数据。

  • 展开列表sentences列每个元素的类型是list,通过explode将list展开后,sentences列中每个元素类型变为Struct,其中包含具体的文本内容和chunk ID。

  • 提取字段:从sentences列中提取文本和chunk ID,分别用textchunk_id存储对应的内容,并从dataframe中移除sentences列。

  • 生成嵌入:使用EncodingUDF为每个切分后的语句生成Embedding向量,将数据存储在embedding列中。

  • 创建唯一 ID:通过组合 url和chunk ID为每一个Embedding向量生成唯一标识符。

  • 筛选列:筛选出必要字段。

  • 写入 TurboPuffer:将生成的Embedding数据及关联数据写入向量数据库中。

当在集群上运行该脚本后,应能看到整个集群的网络 I/O、CPU/GPU资源被任务充分利用,特别是GPU负载持续保持在高水位,同时我们可以调整批处理大小,并发参数不断优化GPU资源使用率。


自定义处理逻辑


我们可依据具体业务场景对本示例内容进行调整,包括:
  • 调整批大小:可以增大SENTENCE_TRANSFORMER_BATCH_SIZE提升吞吐量,同时提升GPU内存使用率。

  • 扩展节点:根据集群规模调整NUM_GPU_NODESCHUNKING_PARALLELISM

  • 更换模型:可以修改EMBEDDING_MODEL_NAME的值切换Sentence Transformers模型。

  • 自定义分块:可以修改ChunkingUDF实现其他分块策略。

  • 替换向量库:除了将生成的Embedding向量写到TurboPuffer之外,Daft还原生支持将Embedding向量写入其他向量数据库,比如LanceDB。


性能优化实践


除自定义处理逻辑外,还可对相关参数进行精细化调优,以实现高效处理百万级文本的目标。以下为几点实践建议:
  • GPU 内存:在任务运行过程中,最好结合监控系统,对GPU内存使用情况进行监控。若分配失败或超过模型最大序列长度,应减小SENTENCE_TRANSFORMER_BATCH_SIZE

  • 模型加载:UDF在初始化时会加载一次模型,首次加载时,往往需从远程下载模型。在带宽受限的场景下,模型加载操作可能耗时较长。此时,可考虑提前将模型缓存到本地。在容器化场景下,可提前下载模型并挂载到容器中。

  • 模型量化:针对精度要求不高的场景,可使用bfloat16/float16精度类型进行量化,降低内存占用,提升吞吐量。


下一代计划

通过当前示例,我们已将GPU利用率提升至接近100%的水平,这是众多工程师所追求的 “圣杯”。但Daft的探索脚步不会停歇。我们正在尝试新方法,通过自定义GPU流水线,以及用vLLM替代Sentence Transformers,使整体处理速度提升3倍。我们正在逐步完善这一方案,并将在后续博客继续分享如何突破 “峰值利用率” 的限制,大幅提升吞吐量。

53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询