微信扫码
添加专属顾问
我要投稿
深入解析Milvus查询引擎的工作原理,从SQL到向量检索的完整执行流程一网打尽。核心内容: 1. Milvus支持的三种主要查询类型及其实现方式 2. 查询请求在分布式架构中的完整执行流程 3. 向量检索与标量查询的协同工作机制
“一条查询请求如何在 Milvus 中执行?本文将深入查询引擎的内部,揭示向量检索的完整流程。
全文约 6000 字,建议阅读时间 15 分钟
Milvus 支持三种主要查询类型:
from pymilvus import Collection
collection = Collection("documents")
# 1. 向量搜索(ANN Search)
results = collection.search(
data=[[0.1] * 768], # 查询向量
anns_field="embedding",
param={"metric_type": "COSINE", "params": {"ef": 64}},
limit=10
)
# 2. 标量查询(Query)
results = collection.query(
expr="score > 0.8 and category == 'tech'",
output_fields=["id", "text", "score"]
)
# 3. 混合查询(Hybrid Search)
results = collection.hybrid_search(
reqs=[
# 向量搜索
AnnSearchRequest(
data=[[0.1] * 768],
anns_field="dense_vector",
param={"metric_type": "COSINE"},
limit=10
),
# 全文检索
AnnSearchRequest(
data=[[0.2] * 1000],
anns_field="sparse_vector",
param={"metric_type": "IP"},
limit=10
)
],
rerank=WeightedRanker(0.7, 0.3), # 重排序
limit=10
)
查询执行流程:
┌─────────┐
│ Client │ 发起查询
└────┬────┘
│
┌────▼────┐
│ Proxy │ 1. 解析请求
└────┬────┘ 2. 查询路由
│ 3. 负载均衡
│
├──────────────────┬──────────────────┐
│ │ │
┌────▼────┐ ┌───▼────┐ ┌───▼────┐
│QueryNode│ │QueryNode│ │QueryNode│
│ Node1 │ │ Node2 │ │ Node3 │
└────┬────┘ └───┬────┘ └───┬────┘
│ │ │
│ 1. 标量过滤 │ │
│ 2. 向量检索 │ │
│ 3. 返回 TopK │ │
│ │ │
└─────────────────┴──────────────────┘
│
┌──────────────────────▼────┐
│ Proxy │
│ 1. 合并结果 │
│ 2. 全局排序 │
│ 3. 返回最终 TopK │
└────────────────────────────┘
Milvus 支持类 SQL 的表达式语法:
# 基础表达式
expr = "id in [1, 2, 3]"
expr = "score > 0.8"
expr = "category == 'tech'"
# 复合表达式
expr = "score > 0.8 and category == 'tech'"
expr = "(score > 0.8 or views > 1000) and category != 'spam'"
# 数组操作
expr = "tags array_contains 'AI'"
expr = "tags array_contains_any ['AI', 'ML']"
expr = "tags array_contains_all ['AI', 'ML', 'DL']"
# JSON 字段
expr = "metadata['author'] == 'Alice'"
expr = "metadata['tags'][0] == 'featured'"
# 字符串操作
expr = "text like 'Milvus%'"# 前缀匹配
expr = "text like '%database%'"# 包含匹配
表达式 AST(抽象语法树):
// 表达式解析器
type ExprParser struct {
lexer *Lexer
tokens []Token
}
// 解析表达式
func (p *ExprParser) Parse(expr string) (*ExprNode, error) {
// 1. 词法分析
tokens := p.lexer.Tokenize(expr)
// 2. 语法分析,构建 AST
ast := p.buildAST(tokens)
// 3. 语义分析,类型检查
if err := p.validate(ast); err != nil {
returnnil, err
}
return ast, nil
}
// AST 节点
type ExprNode struct {
Type NodeType // AND, OR, EQ, GT, LT, IN, etc.
Left *ExprNode
Right *ExprNode
Value interface{}
FieldID int64
}
// 示例:score > 0.8 and category == 'tech'
// AST:
// AND
// / \
// GT EQ
// / \ / \
// score 0.8 category 'tech'
// 查询计划器
type QueryPlanner struct {
schema *Schema
stats *Statistics
}
// 生成查询计划
func (qp *QueryPlanner) Plan(req *SearchRequest) (*QueryPlan, error) {
plan := &QueryPlan{}
// 1. 解析表达式
if req.Expr != "" {
exprNode, err := qp.parseExpr(req.Expr)
if err != nil {
returnnil, err
}
plan.FilterNode = exprNode
}
// 2. 选择索引
plan.VectorIndex = qp.selectVectorIndex(req.AnnsField)
plan.ScalarIndexes = qp.selectScalarIndexes(plan.FilterNode)
// 3. 估算成本
plan.Cost = qp.estimateCost(plan)
// 4. 优化计划
plan = qp.optimize(plan)
return plan, nil
}
// 查询计划
type QueryPlan struct {
// 过滤条件
FilterNode *ExprNode
// 向量检索
VectorIndex *IndexInfo
VectorField string
MetricType string
SearchParams map[string]interface{}
// 标量索引
ScalarIndexes []*IndexInfo
// 执行顺序
ExecutionOrder []PlanNode
// 成本估算
Cost float64
}
// 将过滤条件尽早应用,减少数据量
func (qp *QueryPlanner) pushDownPredicate(plan *QueryPlan) {
// 原始计划:
// 1. 向量检索 → 10000 个结果
// 2. 标量过滤 → 100 个结果
// 优化后:
// 1. 标量过滤 → 1000 个候选
// 2. 向量检索 → 100 个结果
if plan.FilterNode != nil && qp.hasScalarIndex(plan.FilterNode) {
// 先执行标量过滤
plan.ExecutionOrder = []PlanNode{
&ScalarFilterNode{Expr: plan.FilterNode},
&VectorSearchNode{...},
}
}
}
// 选择最优索引
func (qp *QueryPlanner) selectBestIndex(expr *ExprNode) *IndexInfo {
candidates := qp.findApplicableIndexes(expr)
// 评估每个索引的成本
bestIndex := candidates[0]
minCost := qp.estimateIndexCost(bestIndex, expr)
for _, idx := range candidates[1:] {
cost := qp.estimateIndexCost(idx, expr)
if cost < minCost {
minCost = cost
bestIndex = idx
}
}
return bestIndex
}
// 并行执行多个 Segment 的查询
func (qn *QueryNode) parallelSearch(plan *QueryPlan) []*SearchResult {
segments := qn.getSegments(plan.CollectionID)
// 创建工作池
numWorkers := runtime.NuMCPU()
jobs := make(chan *Segment, len(segments))
results := make(chan *SearchResult, len(segments))
// 启动 worker
for i := 0; i < numWorkers; i++ {
gofunc() {
for seg := range jobs {
result := qn.searchSegment(seg, plan)
results <- result
}
}()
}
// 分发任务
for _, seg := range segments {
jobs <- seg
}
close(jobs)
// 收集结果
allResults := make([]*SearchResult, 0, len(segments))
for i := 0; i < len(segments); i++ {
allResults = append(allResults, <-results)
}
return allResults
}
// Segment 的向量检索
func (seg *Segment) Search(query []float32, topK int, filter *Bitset) *SearchResult {
// 1. 应用标量过滤(生成 Bitset)
bitset := seg.applyFilter(filter)
// 2. 向量检索
var result *SearchResult
if seg.hasIndex() {
// 使用索引检索
result = seg.index.Search(query, topK, bitset)
} else {
// 暴力搜索
result = seg.bruteForceSearch(query, topK, bitset)
}
// 3. 应用删除过滤
result = seg.filterDeleted(result)
return result
}
// HNSW 索引检索
func (idx *HNSWIndex) Search(query []float32, k int, bitset *Bitset) *SearchResult {
// 1. 从入口点开始
ep := idx.entryPoint
// 2. 逐层贪心搜索
for level := idx.maxLevel; level > 0; level-- {
ep = idx.searchLayer(query, ep, 1, level, bitset)
}
// 3. 在第 0 层精确搜索
candidates := idx.searchLayer(query, ep, idx.ef, 0, bitset)
// 4. 返回 TopK
return candidates.TopK(k)
}
// 搜索层
func (idx *HNSWIndex) searchLayer(
query []float32,
entryPoint *Node,
numClosest int,
layer int,
bitset *Bitset,
) []*Candidate {
visited := make(map[int64]bool)
candidates := NewMinHeap()
results := NewMaxHeap()
// 初始化
dist := idx.distance(query, entryPoint.Vector)
candidates.Push(&Candidate{Node: entryPoint, Distance: dist})
results.Push(&Candidate{Node: entryPoint, Distance: dist})
visited[entryPoint.ID] = true
// 贪心搜索
for candidates.Len() > 0 {
current := candidates.Pop()
// 如果当前点比结果中最远的点还远,停止
if current.Distance > results.Top().Distance {
break
}
// 扩展邻居
for _, neighbor := range current.Node.Neighbors[layer] {
if visited[neighbor.ID] {
continue
}
visited[neighbor.ID] = true
// 应用 Bitset 过滤
if bitset != nil && !bitset.Test(neighbor.ID) {
continue
}
dist := idx.distance(query, neighbor.Vector)
if dist < results.Top().Distance || results.Len() < numClosest {
candidates.Push(&Candidate{Node: neighbor, Distance: dist})
results.Push(&Candidate{Node: neighbor, Distance: dist})
if results.Len() > numClosest {
results.Pop()
}
}
}
}
return results.ToSlice()
}
// 标量过滤器
type ScalarFilter struct {
expr *ExprNode
schema *Schema
}
// 生成 Bitset
func (sf *ScalarFilter) Filter(segment *Segment) *Bitset {
bitset := NewBitset(segment.NumRows)
// 遍历所有行
for rowID := 0; rowID < segment.NumRows; rowID++ {
if sf.evaluate(segment, rowID) {
bitset.Set(rowID)
}
}
return bitset
}
// 评估表达式
func (sf *ScalarFilter) evaluate(segment *Segment, rowID int) bool {
return sf.evaluateNode(sf.expr, segment, rowID)
}
func (sf *ScalarFilter) evaluateNode(node *ExprNode, segment *Segment, rowID int) bool {
switch node.Type {
case NodeType_AND:
return sf.evaluateNode(node.Left, segment, rowID) &&
sf.evaluateNode(node.Right, segment, rowID)
case NodeType_OR:
return sf.evaluateNode(node.Left, segment, rowID) ||
sf.evaluateNode(node.Right, segment, rowID)
case NodeType_EQ:
fieldValue := segment.GetFieldValue(node.FieldID, rowID)
return fieldValue == node.Value
case NodeType_GT:
fieldValue := segment.GetFieldValue(node.FieldID, rowID)
return fieldValue.(float64) > node.Value.(float64)
case NodeType_IN:
fieldValue := segment.GetFieldValue(node.FieldID, rowID)
values := node.Value.([]interface{})
for _, v := range values {
if fieldValue == v {
returntrue
}
}
returnfalse
default:
returnfalse
}
}
// SIMD 优化的距离计算
import"golang.org/x/sys/cpu"
// L2 距离(欧氏距离)
func L2Distance(a, b []float32) float32 {
if cpu.X86.HasAVX2 {
return l2DistanceAVX2(a, b)
}
return l2DistanceScalar(a, b)
}
// AVX2 优化版本
func l2DistanceAVX2(a, b []float32) float32 {
// 使用 AVX2 指令集,一次处理 8 个 float32
// 性能提升 4-8 倍
var sum float32
// ... AVX2 实现
return sum
}
// 标量版本
func l2DistanceScalar(a, b []float32) float32 {
var sum float32
for i := range a {
diff := a[i] - b[i]
sum += diff * diff
}
return sum
}
// 余弦相似度
func CosineSimilarity(a, b []float32) float32 {
var dotProduct, normA, normB float32
for i := range a {
dotProduct += a[i] * b[i]
normA += a[i] * a[i]
normB += b[i] * b[i]
}
return dotProduct / (sqrt(normA) * sqrt(normB))
}
// 内积
func InnerProduct(a, b []float32) float32 {
var sum float32
for i := range a {
sum += a[i] * b[i]
}
return sum
}
// Proxy 的结果聚合
func (p *Proxy) mergeResults(results []*SearchResult, topK int) *SearchResult {
// 1. 使用最小堆合并多个有序列表
heap := NewMinHeap()
// 2. 初始化:每个结果的第一个元素入堆
for i, result := range results {
iflen(result.IDs) > 0 {
heap.Push(&HeapNode{
Distance: result.Distances[0],
ID: result.IDs[0],
ResultIdx: i,
ItemIdx: 0,
})
}
}
// 3. 归并排序
merged := &SearchResult{
IDs: make([]int64, 0, topK),
Distances: make([]float32, 0, topK),
}
for heap.Len() > 0 && len(merged.IDs) < topK {
node := heap.Pop()
// 去重(同一个 ID 可能在多个 Segment 中)
if !merged.Contains(node.ID) {
merged.IDs = append(merged.IDs, node.ID)
merged.Distances = append(merged.Distances, node.Distance)
}
// 将该结果的下一个元素入堆
result := results[node.ResultIdx]
nextIdx := node.ItemIdx + 1
if nextIdx < len(result.IDs) {
heap.Push(&HeapNode{
Distance: result.Distances[nextIdx],
ID: result.IDs[nextIdx],
ResultIdx: node.ResultIdx,
ItemIdx: nextIdx,
})
}
}
return merged
}
// 多 QueryNode 结果聚合
func (p *Proxy) aggregateFromNodes(nodeResults map[int64]*SearchResult, topK int) *SearchResult {
// 1. 收集所有结果
allResults := make([]*SearchResult, 0, len(nodeResults))
for _, result := range nodeResults {
allResults = append(allResults, result)
}
// 2. 合并结果
merged := p.mergeResults(allResults, topK)
// 3. 填充其他字段(如果需要)
iflen(p.outputFields) > 0 {
merged = p.fillOutputFields(merged)
}
return merged
}
// 填充输出字段
func (p *Proxy) fillOutputFields(result *SearchResult) *SearchResult {
// 批量查询其他字段
ids := result.IDs
fields := p.queryFieldsByIDs(ids, p.outputFields)
result.Fields = fields
return result
}
from pymilvus import Collection, AnnSearchRequest, RRFRanker
collection = Collection("documents")
# 多路召回 + 重排序
req1 = AnnSearchRequest(
data=[[0.1] * 768],
anns_field="dense_vector",
param={"metric_type": "COSINE"},
limit=100# 召回 100 个
)
req2 = AnnSearchRequest(
data=[[0.2] * 1000],
anns_field="sparse_vector",
param={"metric_type": "IP"},
limit=100# 召回 100 个
)
# RRF(Reciprocal Rank Fusion)重排序
results = collection.hybrid_search(
reqs=[req1, req2],
rerank=RRFRanker(k=60), # RRF 参数
limit=10# 最终返回 10 个
)
RRF 算法:
// RRF 重排序
func RRFRerank(results [][]*SearchResult, k int) []*SearchResult {
// RRF 公式:score(d) = Σ 1 / (k + rank_i(d))
scores := make(map[int64]float64)
for _, resultList := range results {
for rank, result := range resultList {
scores[result.ID] += 1.0 / float64(k + rank + 1)
}
}
// 按分数排序
ranked := make([]*SearchResult, 0, len(scores))
for id, score := range scores {
ranked = append(ranked, &SearchResult{
ID: id,
Score: score,
})
}
sort.Slice(ranked, func(i, j int) bool {
return ranked[i].Score > ranked[j].Score
})
return ranked
}
// QueryNode 的缓存
type QueryNodeCache struct {
// Segment 缓存
segmentCache *LRUCache
// 查询结果缓存
resultCache *LRUCache
// 向量缓存
vectorCache *LRUCache
}
// 查询时先检查缓存
func (qn *QueryNode) searchWithCache(req *SearchRequest) *SearchResult {
// 1. 生成缓存 key
cacheKey := qn.generateCacheKey(req)
// 2. 检查结果缓存
if cached, ok := qn.cache.resultCache.Get(cacheKey); ok {
return cached.(*SearchResult)
}
// 3. 执行查询
result := qn.search(req)
// 4. 缓存结果
qn.cache.resultCache.Put(cacheKey, result)
return result
}
// 预取下一批 Segment
func (qn *QueryNode) prefetchSegments(currentSegmentID int64) {
// 预测下一个可能访问的 Segment
nextSegments := qn.predictNextSegments(currentSegmentID)
// 异步加载
for _, segID := range nextSegments {
go func(id int64) {
qn.loadSegment(id)
}(segID)
}
}
# 单条查询(慢)
for query in queries:
result = collection.search(data=[query], ...)
# 批量查询(快 10 倍)
batch_size = 100
for i in range(0, len(queries), batch_size):
batch = queries[i:i+batch_size]
results = collection.search(data=batch, ...)
# 不指定分区(扫描所有分区)
results = collection.search(
data=[query_vector],
anns_field="embedding",
limit=10
)
# 指定分区(只扫描相关分区)
results = collection.search(
data=[query_vector],
anns_field="embedding",
partition_names=["2024_11"], # 只搜索 11 月的数据
limit=10
)
from pymilvus import Collection
collection = Collection("products")
# 复杂过滤 + 向量搜索
results = collection.search(
data=[[0.1] * 768],
anns_field="image_embedding",
param={"metric_type": "L2", "params": {"nprobe": 32}},
limit=20,
expr="""
(price >= 100 and price <= 500) and
category in ['electronics', 'computers'] and
rating > 4.0 and
stock > 0 and
tags array_contains_any ['featured', 'bestseller']
""",
output_fields=["name", "price", "rating", "image_url"]
)
# 处理结果
for hit in results[0]:
print(f"Product: {hit.entity.get('name')}")
print(f"Price: ${hit.entity.get('price')}")
print(f"Rating: {hit.entity.get('rating')}")
print(f"Distance: {hit.distance}")
print("---")
# 第一页
page_size = 20
results_page1 = collection.search(
data=[query_vector],
anns_field="embedding",
limit=page_size,
offset=0
)
# 第二页
results_page2 = collection.search(
data=[query_vector],
anns_field="embedding",
limit=page_size,
offset=page_size
)
# 查找距离在 [0.5, 1.0] 范围内的向量
results = collection.search(
data=[query_vector],
anns_field="embedding",
param={
"metric_type": "L2",
"params": {"nprobe": 32},
"radius": 1.0, # 最大距离
"range_filter": 0.5 # 最小距离
},
limit=100
)
✅ 使用表达式过滤减少搜索空间 ✅ 批量查询提升吞吐量 ✅ 合理设置 TopK 和搜索参数 ✅ 利用分区裁剪加速查询 ✅ 使用混合搜索提高召回率
关注云与数字化,一起探索向量数据库的技术奥秘!
如果觉得文章有帮助,欢迎点赞、转发、收藏!
标签:#Milvus #查询引擎 #向量检索 #查询优化 #HNSW
Agents 2025 全景解读:从单兵工具到企业级协作体" data-itemshowtype="0" linktype="text" data-linktype="2">AI Agents 2025 全景解读:从单兵工具到企业级协作体
DeepSeek OCR:学习理解" data-itemshowtype="0" linktype="text" data-linktype="2">DeepSeek OCR:学习理解
dify Agent 和 Semantic Kernel 的智能运维系统:从告警到自动修复" data-itemshowtype="0" linktype="text" data-linktype="2">构建基于 Dify Agent 和 Semantic Kernel 的智能运维系统:从告警到自动修复
AI Agent 工具盘点:AutoGPT、CrewAI、LangGraph、AgentVerse 谁更适合 DevOps?
Manus - 面向 Java 开发者的开源通用智能体" data-itemshowtype="11" linktype="text" data-linktype="2">JManus - 面向 Java 开发者的开源通用智能体
Cilium 首次集成国内云服务,阿里云 ENI 被纳入新版本特性
53AI,企业落地大模型首选服务商
产品:场景落地咨询+大模型应用平台+行业解决方案
承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业
2025-11-28
Graphiti 为 AI 智能体构建实时知识图谱
2025-11-25
再聊一聊怎么将非结构化文本转换为可交互知识图谱
2025-11-24
让企业级大模型落地:每个企业 AI项目都需要的知识图谱KG基础
2025-11-23
详解Palantir本体中的地理位置数据
2025-11-20
Context Engineering:Weaviate构建智能体系统的完整指南
2025-11-17
基于递归抽象树检索技术构建反洗钱知识库的探索与实践
2025-11-14
本地知识库搭建(Mia VS cheer studio VS AnythingLLM)
2025-11-13
用 Cognee 构建端到端知识图谱,实现当前效果最好的AI Agent记忆层
2025-09-02
2025-09-17
2025-09-03
2025-09-02
2025-10-19
2025-10-30
2025-09-20
2025-11-05
2025-10-21
2025-10-13