免费POC, 零成本试错
AI知识库

53AI知识库

学习大模型的前沿技术与行业应用场景


我要投稿

Milvus 查询引擎剖析:从 SQL 到向量检索的执行全流程

发布日期:2025-11-30 09:32:02 浏览次数: 1511
作者:云与数字化

微信搜一搜,关注“云与数字化”

推荐语

深入解析Milvus查询引擎的工作原理,从SQL到向量检索的完整执行流程一网打尽。

核心内容:
1. Milvus支持的三种主要查询类型及其实现方式
2. 查询请求在分布式架构中的完整执行流程
3. 向量检索与标量查询的协同工作机制

杨芳贤
53AI创始人/腾讯云(TVP)最具价值专家

一条查询请求如何在 Milvus 中执行?本文将深入查询引擎的内部,揭示向量检索的完整流程。

全文约 6000 字,建议阅读时间 15 分钟


一、查询引擎概览

1.1 查询类型

Milvus 支持三种主要查询类型:

from pymilvus import Collection

collection = Collection("documents")

# 1. 向量搜索(ANN Search)
results = collection.search(
    data=[[0.1] * 768],  # 查询向量
    anns_field="embedding",
    param={"metric_type""COSINE""params": {"ef"64}},
    limit=10
)

# 2. 标量查询(Query)
results = collection.query(
    expr="score > 0.8 and category == 'tech'",
    output_fields=["id""text""score"]
)

# 3. 混合查询(Hybrid Search)
results = collection.hybrid_search(
    reqs=[
        # 向量搜索
        AnnSearchRequest(
            data=[[0.1] * 768],
            anns_field="dense_vector",
            param={"metric_type""COSINE"},
            limit=10
        ),
        # 全文检索
        AnnSearchRequest(
            data=[[0.2] * 1000],
            anns_field="sparse_vector",
            param={"metric_type""IP"},
            limit=10
        )
    ],
    rerank=WeightedRanker(0.70.3),  # 重排序
    limit=10
)

1.2 查询架构

查询执行流程:

┌─────────┐
│ Client  │ 发起查询
└────┬────┘
     │
┌────▼────┐
│  Proxy  │ 1. 解析请求
└────┬────┘ 2. 查询路由
     │      3. 负载均衡
     │
     ├──────────────────┬──────────────────┐
     │                  │                  │
┌────▼────┐        ┌───▼────┐        ┌───▼────┐
│QueryNode│        │QueryNode│        │QueryNode│
│  Node1  │        │  Node2  │        │  Node3  │
└────┬────┘        └───┬────┘        └───┬────┘
     │                 │                  │
     │ 1. 标量过滤      │                  │
     │ 2. 向量检索      │                  │
     │ 3. 返回 TopK     │                  │
     │                 │                  │
     └─────────────────┴──────────────────┘
                       │
┌──────────────────────▼────┐
│  Proxy                     │
│  1. 合并结果                │
│  2. 全局排序                │
│  3. 返回最终 TopK           │
└────────────────────────────┘

二、查询解析与规划

2.1 表达式解析

Milvus 支持类 SQL 的表达式语法:

# 基础表达式
expr = "id in [1, 2, 3]"
expr = "score > 0.8"
expr = "category == 'tech'"

# 复合表达式
expr = "score > 0.8 and category == 'tech'"
expr = "(score > 0.8 or views > 1000) and category != 'spam'"

# 数组操作
expr = "tags array_contains 'AI'"
expr = "tags array_contains_any ['AI', 'ML']"
expr = "tags array_contains_all ['AI', 'ML', 'DL']"

# JSON 字段
expr = "metadata['author'] == 'Alice'"
expr = "metadata['tags'][0] == 'featured'"

# 字符串操作
expr = "text like 'Milvus%'"# 前缀匹配
expr = "text like '%database%'"# 包含匹配

表达式 AST(抽象语法树)

// 表达式解析器
type ExprParser struct {
    lexer  *Lexer
    tokens []Token
}

// 解析表达式
func (p *ExprParser) Parse(expr string) (*ExprNode, error) {
    // 1. 词法分析
    tokens := p.lexer.Tokenize(expr)
    
    // 2. 语法分析,构建 AST
    ast := p.buildAST(tokens)
    
    // 3. 语义分析,类型检查
    if err := p.validate(ast); err != nil {
        returnnil, err
    }
    
    return ast, nil
}

// AST 节点
type ExprNode struct {
    Type     NodeType  // AND, OR, EQ, GT, LT, IN, etc.
    Left     *ExprNode
    Right    *ExprNode
    Value    interface{}
    FieldID  int64
}

// 示例:score > 0.8 and category == 'tech'
// AST:
//       AND
//      /   \
//     GT    EQ
//    / \   / \
// score 0.8 category 'tech'

2.2 查询计划生成

// 查询计划器
type QueryPlanner struct {
    schema *Schema
    stats  *Statistics
}

// 生成查询计划
func (qp *QueryPlanner) Plan(req *SearchRequest) (*QueryPlan, error) {
    plan := &QueryPlan{}
    
    // 1. 解析表达式
    if req.Expr != "" {
        exprNode, err := qp.parseExpr(req.Expr)
        if err != nil {
            returnnil, err
        }
        plan.FilterNode = exprNode
    }
    
    // 2. 选择索引
    plan.VectorIndex = qp.selectVectorIndex(req.AnnsField)
    plan.ScalarIndexes = qp.selectScalarIndexes(plan.FilterNode)
    
    // 3. 估算成本
    plan.Cost = qp.estimateCost(plan)
    
    // 4. 优化计划
    plan = qp.optimize(plan)
    
    return plan, nil
}

// 查询计划
type QueryPlan struct {
    // 过滤条件
    FilterNode *ExprNode
    
    // 向量检索
    VectorIndex  *IndexInfo
    VectorField  string
    MetricType   string
    SearchParams map[string]interface{}
    
    // 标量索引
    ScalarIndexes []*IndexInfo
    
    // 执行顺序
    ExecutionOrder []PlanNode
    
    // 成本估算
    Cost float64
}

2.3 查询优化

优化 1:谓词下推(Predicate Pushdown)

// 将过滤条件尽早应用,减少数据量
func (qp *QueryPlanner) pushDownPredicate(plan *QueryPlan) {
    // 原始计划:
    // 1. 向量检索 → 10000 个结果
    // 2. 标量过滤 → 100 个结果
    
    // 优化后:
    // 1. 标量过滤 → 1000 个候选
    // 2. 向量检索 → 100 个结果
    
    if plan.FilterNode != nil && qp.hasScalarIndex(plan.FilterNode) {
        // 先执行标量过滤
        plan.ExecutionOrder = []PlanNode{
            &ScalarFilterNode{Expr: plan.FilterNode},
            &VectorSearchNode{...},
        }
    }
}

优化 2:索引选择

// 选择最优索引
func (qp *QueryPlanner) selectBestIndex(expr *ExprNode) *IndexInfo {
    candidates := qp.findApplicableIndexes(expr)
    
    // 评估每个索引的成本
    bestIndex := candidates[0]
    minCost := qp.estimateIndexCost(bestIndex, expr)
    
    for _, idx := range candidates[1:] {
        cost := qp.estimateIndexCost(idx, expr)
        if cost < minCost {
            minCost = cost
            bestIndex = idx
        }
    }
    
    return bestIndex
}

优化 3:并行执行

// 并行执行多个 Segment 的查询
func (qn *QueryNode) parallelSearch(plan *QueryPlan) []*SearchResult {
    segments := qn.getSegments(plan.CollectionID)
    
    // 创建工作池
    numWorkers := runtime.NuMCPU()
    jobs := make(chan *Segment, len(segments))
    results := make(chan *SearchResult, len(segments))
    
    // 启动 worker
    for i := 0; i < numWorkers; i++ {
        gofunc() {
            for seg := range jobs {
                result := qn.searchSegment(seg, plan)
                results <- result
            }
        }()
    }
    
    // 分发任务
    for _, seg := range segments {
        jobs <- seg
    }
    close(jobs)
    
    // 收集结果
    allResults := make([]*SearchResult, 0len(segments))
    for i := 0; i < len(segments); i++ {
        allResults = append(allResults, <-results)
    }
    
    return allResults
}

三、向量检索执行

3.1 Segment 内检索

// Segment 的向量检索
func (seg *Segment) Search(query []float32, topK int, filter *Bitset) *SearchResult {
    // 1. 应用标量过滤(生成 Bitset)
    bitset := seg.applyFilter(filter)
    
    // 2. 向量检索
    var result *SearchResult
    if seg.hasIndex() {
        // 使用索引检索
        result = seg.index.Search(query, topK, bitset)
    } else {
        // 暴力搜索
        result = seg.bruteForceSearch(query, topK, bitset)
    }
    
    // 3. 应用删除过滤
    result = seg.filterDeleted(result)
    
    return result
}

// HNSW 索引检索
func (idx *HNSWIndex) Search(query []float32, k int, bitset *Bitset) *SearchResult {
    // 1. 从入口点开始
    ep := idx.entryPoint
    
    // 2. 逐层贪心搜索
    for level := idx.maxLevel; level > 0; level-- {
        ep = idx.searchLayer(query, ep, 1, level, bitset)
    }
    
    // 3. 在第 0 层精确搜索
    candidates := idx.searchLayer(query, ep, idx.ef, 0, bitset)
    
    // 4. 返回 TopK
    return candidates.TopK(k)
}

// 搜索层
func (idx *HNSWIndex) searchLayer(
    query []float32,
    entryPoint *Node,
    numClosest int,
    layer int,
    bitset *Bitset,
)
 []*Candidate
 {
    visited := make(map[int64]bool)
    candidates := NewMinHeap()
    results := NewMaxHeap()
    
    // 初始化
    dist := idx.distance(query, entryPoint.Vector)
    candidates.Push(&Candidate{Node: entryPoint, Distance: dist})
    results.Push(&Candidate{Node: entryPoint, Distance: dist})
    visited[entryPoint.ID] = true
    
    // 贪心搜索
    for candidates.Len() > 0 {
        current := candidates.Pop()
        
        // 如果当前点比结果中最远的点还远,停止
        if current.Distance > results.Top().Distance {
            break
        }
        
        // 扩展邻居
        for _, neighbor := range current.Node.Neighbors[layer] {
            if visited[neighbor.ID] {
                continue
            }
            visited[neighbor.ID] = true
            
            // 应用 Bitset 过滤
            if bitset != nil && !bitset.Test(neighbor.ID) {
                continue
            }
            
            dist := idx.distance(query, neighbor.Vector)
            
            if dist < results.Top().Distance || results.Len() < numClosest {
                candidates.Push(&Candidate{Node: neighbor, Distance: dist})
                results.Push(&Candidate{Node: neighbor, Distance: dist})
                
                if results.Len() > numClosest {
                    results.Pop()
                }
            }
        }
    }
    
    return results.ToSlice()
}

3.2 标量过滤

// 标量过滤器
type ScalarFilter struct {
    expr   *ExprNode
    schema *Schema
}

// 生成 Bitset
func (sf *ScalarFilter) Filter(segment *Segment) *Bitset {
    bitset := NewBitset(segment.NumRows)
    
    // 遍历所有行
    for rowID := 0; rowID < segment.NumRows; rowID++ {
        if sf.evaluate(segment, rowID) {
            bitset.Set(rowID)
        }
    }
    
    return bitset
}

// 评估表达式
func (sf *ScalarFilter) evaluate(segment *Segment, rowID int) bool {
    return sf.evaluateNode(sf.expr, segment, rowID)
}

func (sf *ScalarFilter) evaluateNode(node *ExprNode, segment *Segment, rowID int) bool {
    switch node.Type {
    case NodeType_AND:
        return sf.evaluateNode(node.Left, segment, rowID) &&
               sf.evaluateNode(node.Right, segment, rowID)
    
    case NodeType_OR:
        return sf.evaluateNode(node.Left, segment, rowID) ||
               sf.evaluateNode(node.Right, segment, rowID)
    
    case NodeType_EQ:
        fieldValue := segment.GetFieldValue(node.FieldID, rowID)
        return fieldValue == node.Value
    
    case NodeType_GT:
        fieldValue := segment.GetFieldValue(node.FieldID, rowID)
        return fieldValue.(float64) > node.Value.(float64)
    
    case NodeType_IN:
        fieldValue := segment.GetFieldValue(node.FieldID, rowID)
        values := node.Value.([]interface{})
        for _, v := range values {
            if fieldValue == v {
                returntrue
            }
        }
        returnfalse
    
    default:
        returnfalse
    }
}

3.3 距离计算优化

// SIMD 优化的距离计算
import"golang.org/x/sys/cpu"

// L2 距离(欧氏距离)
func L2Distance(a, b []float32) float32 {
    if cpu.X86.HasAVX2 {
        return l2DistanceAVX2(a, b)
    }
    return l2DistanceScalar(a, b)
}

// AVX2 优化版本
func l2DistanceAVX2(a, b []float32) float32 {
    // 使用 AVX2 指令集,一次处理 8 个 float32
    // 性能提升 4-8 倍
    var sum float32
    // ... AVX2 实现
    return sum
}

// 标量版本
func l2DistanceScalar(a, b []float32) float32 {
    var sum float32
    for i := range a {
        diff := a[i] - b[i]
        sum += diff * diff
    }
    return sum
}

// 余弦相似度
func CosineSimilarity(a, b []float32) float32 {
    var dotProduct, normA, normB float32
    
    for i := range a {
        dotProduct += a[i] * b[i]
        normA += a[i] * a[i]
        normB += b[i] * b[i]
    }
    
    return dotProduct / (sqrt(normA) * sqrt(normB))
}

// 内积
func InnerProduct(a, b []float32) float32 {
    var sum float32
    for i := range a {
        sum += a[i] * b[i]
    }
    return sum
}

四、结果聚合与排序

4.1 多 Segment 结果合并

// Proxy 的结果聚合
func (p *Proxy) mergeResults(results []*SearchResult, topK int) *SearchResult {
    // 1. 使用最小堆合并多个有序列表
    heap := NewMinHeap()
    
    // 2. 初始化:每个结果的第一个元素入堆
    for i, result := range results {
        iflen(result.IDs) > 0 {
            heap.Push(&HeapNode{
                Distance:  result.Distances[0],
                ID:        result.IDs[0],
                ResultIdx: i,
                ItemIdx:   0,
            })
        }
    }
    
    // 3. 归并排序
    merged := &SearchResult{
        IDs:       make([]int640, topK),
        Distances: make([]float320, topK),
    }
    
    for heap.Len() > 0 && len(merged.IDs) < topK {
        node := heap.Pop()
        
        // 去重(同一个 ID 可能在多个 Segment 中)
        if !merged.Contains(node.ID) {
            merged.IDs = append(merged.IDs, node.ID)
            merged.Distances = append(merged.Distances, node.Distance)
        }
        
        // 将该结果的下一个元素入堆
        result := results[node.ResultIdx]
        nextIdx := node.ItemIdx + 1
        if nextIdx < len(result.IDs) {
            heap.Push(&HeapNode{
                Distance:  result.Distances[nextIdx],
                ID:        result.IDs[nextIdx],
                ResultIdx: node.ResultIdx,
                ItemIdx:   nextIdx,
            })
        }
    }
    
    return merged
}

4.2 分布式结果聚合

// 多 QueryNode 结果聚合
func (p *Proxy) aggregateFromNodes(nodeResults map[int64]*SearchResult, topK int) *SearchResult {
    // 1. 收集所有结果
    allResults := make([]*SearchResult, 0len(nodeResults))
    for _, result := range nodeResults {
        allResults = append(allResults, result)
    }
    
    // 2. 合并结果
    merged := p.mergeResults(allResults, topK)
    
    // 3. 填充其他字段(如果需要)
    iflen(p.outputFields) > 0 {
        merged = p.fillOutputFields(merged)
    }
    
    return merged
}

// 填充输出字段
func (p *Proxy) fillOutputFields(result *SearchResult) *SearchResult {
    // 批量查询其他字段
    ids := result.IDs
    fields := p.queryFieldsByIDs(ids, p.outputFields)
    
    result.Fields = fields
    return result
}

4.3 重排序(Reranking)

from pymilvus import Collection, AnnSearchRequest, RRFRanker

collection = Collection("documents")

# 多路召回 + 重排序
req1 = AnnSearchRequest(
    data=[[0.1] * 768],
    anns_field="dense_vector",
    param={"metric_type""COSINE"},
    limit=100# 召回 100 个
)

req2 = AnnSearchRequest(
    data=[[0.2] * 1000],
    anns_field="sparse_vector",
    param={"metric_type""IP"},
    limit=100# 召回 100 个
)

# RRF(Reciprocal Rank Fusion)重排序
results = collection.hybrid_search(
    reqs=[req1, req2],
    rerank=RRFRanker(k=60),  # RRF 参数
    limit=10# 最终返回 10 个
)

RRF 算法

// RRF 重排序
func RRFRerank(results [][]*SearchResult, k int) []*SearchResult {
    // RRF 公式:score(d) = Σ 1 / (k + rank_i(d))
    scores := make(map[int64]float64)
    
    for _, resultList := range results {
        for rank, result := range resultList {
            scores[result.ID] += 1.0 / float64(k + rank + 1)
        }
    }
    
    // 按分数排序
    ranked := make([]*SearchResult, 0len(scores))
    for id, score := range scores {
        ranked = append(ranked, &SearchResult{
            ID:    id,
            Score: score,
        })
    }
    
    sort.Slice(ranked, func(i, j int) bool {
        return ranked[i].Score > ranked[j].Score
    })
    
    return ranked
}

五、查询性能优化

5.1 缓存策略

// QueryNode 的缓存
type QueryNodeCache struct {
    // Segment 缓存
    segmentCache *LRUCache
    
    // 查询结果缓存
    resultCache *LRUCache
    
    // 向量缓存
    vectorCache *LRUCache
}

// 查询时先检查缓存
func (qn *QueryNode) searchWithCache(req *SearchRequest) *SearchResult {
    // 1. 生成缓存 key
    cacheKey := qn.generateCacheKey(req)
    
    // 2. 检查结果缓存
    if cached, ok := qn.cache.resultCache.Get(cacheKey); ok {
        return cached.(*SearchResult)
    }
    
    // 3. 执行查询
    result := qn.search(req)
    
    // 4. 缓存结果
    qn.cache.resultCache.Put(cacheKey, result)
    
    return result
}

5.2 预取(Prefetch)

// 预取下一批 Segment
func (qn *QueryNode) prefetchSegments(currentSegmentID int64) {
    // 预测下一个可能访问的 Segment
    nextSegments := qn.predictNextSegments(currentSegmentID)
    
    // 异步加载
    for _, segID := range nextSegments {
        go func(id int64) {
            qn.loadSegment(id)
        }(segID)
    }
}

5.3 批量查询

# 单条查询(慢)
for query in queries:
    result = collection.search(data=[query], ...)

# 批量查询(快 10 倍)
batch_size = 100
for i in range(0, len(queries), batch_size):
    batch = queries[i:i+batch_size]
    results = collection.search(data=batch, ...)

5.4 分区裁剪

# 不指定分区(扫描所有分区)
results = collection.search(
    data=[query_vector],
    anns_field="embedding",
    limit=10
)

# 指定分区(只扫描相关分区)
results = collection.search(
    data=[query_vector],
    anns_field="embedding",
    partition_names=["2024_11"],  # 只搜索 11 月的数据
    limit=10
)

六、实战案例

6.1 复杂查询示例

from pymilvus import Collection

collection = Collection("products")

# 复杂过滤 + 向量搜索
results = collection.search(
    data=[[0.1] * 768],
    anns_field="image_embedding",
    param={"metric_type""L2""params": {"nprobe"32}},
    limit=20,
    expr="""
        (price >= 100 and price <= 500) and
        category in ['electronics', 'computers'] and
        rating > 4.0 and
        stock > 0 and
        tags array_contains_any ['featured', 'bestseller']
    """
,
    output_fields=["name""price""rating""image_url"]
)

# 处理结果
for hit in results[0]:
    print(f"Product: {hit.entity.get('name')}")
    print(f"Price: ${hit.entity.get('price')}")
    print(f"Rating: {hit.entity.get('rating')}")
    print(f"Distance: {hit.distance}")
    print("---")

6.2 分页查询

# 第一页
page_size = 20
results_page1 = collection.search(
    data=[query_vector],
    anns_field="embedding",
    limit=page_size,
    offset=0
)

# 第二页
results_page2 = collection.search(
    data=[query_vector],
    anns_field="embedding",
    limit=page_size,
    offset=page_size
)

6.3 范围查询

# 查找距离在 [0.5, 1.0] 范围内的向量
results = collection.search(
    data=[query_vector],
    anns_field="embedding",
    param={
        "metric_type""L2",
        "params": {"nprobe"32},
        "radius"1.0,      # 最大距离
        "range_filter"0.5  # 最小距离
    },
    limit=100
)

七、总结

核心要点

  1. 查询类型:向量搜索、标量查询、混合查询
  2. 查询优化:谓词下推、索引选择、并行执行
  3. 向量检索:HNSW 图搜索、SIMD 加速
  4. 结果聚合:多 Segment 合并、分布式聚合、重排序
  5. 性能优化:缓存、预取、批量查询、分区裁剪

最佳实践

✅ 使用表达式过滤减少搜索空间 ✅ 批量查询提升吞吐量 ✅ 合理设置 TopK 和搜索参数 ✅ 利用分区裁剪加速查询 ✅ 使用混合搜索提高召回率


关注云与数字化,一起探索向量数据库的技术奥秘!

如果觉得文章有帮助,欢迎点赞、转发、收藏!


参考资料

  • Milvus 查询文档:https://milvus.io/docs/search.md
  • HNSW 论文:Efficient and robust approximate nearest neighbor search
  • 查询优化论文:Query Optimization in Database Systems

标签#Milvus #查询引擎 #向量检索 #查询优化 #HNSW


基于身份的网络安全:Cilium策略执行机制

深入理解Cilium eBPF数据平面架构

Cilium 首次集成国内云服务,阿里云 ENI 被纳入新版本特性

Cilium连接跟踪机制深度解析:网络状态管理的基础

Cilium隧道网络实现深度解析:多集群部署的核心



53AI,企业落地大模型首选服务商

产品:场景落地咨询+大模型应用平台+行业解决方案

承诺:免费POC验证,效果达标后再合作。零风险落地应用大模型,已交付160+中大型企业

联系我们

售前咨询
186 6662 7370
预约演示
185 8882 0121

微信扫码

添加专属顾问

回到顶部

加载中...

扫码咨询