mirror of
https://github.com/Tencent/WeKnora.git
synced 2026-06-04 13:30:32 +08:00
refactor(postgres): optimize vector retrieval and HNSW settings
- Adjusted the expanded TopK limit to a maximum of 200 candidates to enhance HNSW efficiency. - Updated the vector retrieval query to ensure the ORDER BY expression matches the indexed expression, improving index usage. - Implemented local settings for HNSW's ef_search and iterative_scan within a transaction to optimize candidate retrieval. - Added fallback logic to handle unsupported GUCs gracefully, ensuring query execution continues even with older pgvector versions. These changes improve the performance and reliability of vector retrieval in the PostgreSQL implementation.
This commit is contained in:
@@ -193,13 +193,13 @@ func (g *pgRepository) KeywordsRetrieve(ctx context.Context,
|
||||
Values: common.ToInterfaceSlice(params.TagIDs),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
// Use ParadeDB's ||| operator for matching any token
|
||||
conds = append(conds, clause.Expr{
|
||||
SQL: "content ||| ?",
|
||||
Vars: []interface{}{params.Query},
|
||||
})
|
||||
|
||||
|
||||
// Filter by is_enabled = true or NULL (NULL means enabled for historical data)
|
||||
conds = append(conds, clause.Expr{
|
||||
SQL: "(is_enabled IS NULL OR is_enabled = ?)",
|
||||
@@ -340,22 +340,36 @@ func (g *pgRepository) VectorRetrieve(ctx context.Context,
|
||||
whereClause = "WHERE " + strings.Join(whereParts, " AND ")
|
||||
}
|
||||
|
||||
// Expand TopK to get more candidates before threshold filtering
|
||||
// Expand TopK to get more candidates before threshold filtering.
|
||||
//
|
||||
// HNSW requires `ef_search >= LIMIT`, and a very large LIMIT (e.g. 1000)
|
||||
// forces HNSW to walk a near-exhaustive portion of the graph, often making
|
||||
// it slower than a sequential scan and pushing the planner to pick Seq Scan
|
||||
// even when an index exists. 200 is a good sweet spot: it gives enough
|
||||
// headroom for threshold/filter post-processing without ballooning ef_search.
|
||||
expandedTopK := params.TopK * 2
|
||||
if expandedTopK < 100 {
|
||||
expandedTopK = 100 // Minimum 100 candidates
|
||||
}
|
||||
if expandedTopK > 1000 {
|
||||
expandedTopK = 1000 // Maximum 1000 candidates
|
||||
if expandedTopK > 200 {
|
||||
expandedTopK = 200 // Maximum 200 candidates (keeps HNSW efficient)
|
||||
}
|
||||
if expandedTopK < params.TopK {
|
||||
expandedTopK = params.TopK // Ensure subquery limit is at least final limit
|
||||
}
|
||||
|
||||
// Optimized query: Use subquery to calculate distance once
|
||||
// Strategy: Use ORDER BY with vector distance to leverage HNSW index,
|
||||
// then filter by threshold in outer query
|
||||
// This allows PostgreSQL to use HNSW index efficiently
|
||||
// Optimized query: Use subquery to calculate distance once.
|
||||
//
|
||||
// IMPORTANT: The HNSW index in this project is built on the EXPRESSION
|
||||
// (embedding::halfvec(<dim>)) halfvec_cosine_ops
|
||||
// because the `embedding` column itself is `halfvec` without a fixed dimension
|
||||
// (so the table can store multiple embedding sizes such as 798 / 3584 / ...).
|
||||
//
|
||||
// pgvector requires the ORDER BY expression to match the indexed expression
|
||||
// EXACTLY, otherwise the planner falls back to a sequential scan. The
|
||||
// `embedding::halfvec(%d)` cast on both sides of `<=>` is therefore NOT
|
||||
// redundant — it is the only way to make the HNSW index get used at all.
|
||||
// See: pgvector issues #702, #835 and ParadeDB "indexing-expressions" docs.
|
||||
subqueryLimitParam := len(allVars) + 1
|
||||
thresholdParam := len(allVars) + 2
|
||||
finalLimitParam := len(allVars) + 3
|
||||
@@ -367,24 +381,66 @@ func (g *pgRepository) VectorRetrieve(ctx context.Context,
|
||||
FROM (
|
||||
SELECT
|
||||
id, content, source_id, source_type, chunk_id, knowledge_id, knowledge_base_id, tag_id,
|
||||
embedding <=> $1::halfvec as distance
|
||||
embedding::halfvec(%[1]d) <=> $1::halfvec(%[1]d) as distance
|
||||
FROM embeddings
|
||||
%s
|
||||
ORDER BY embedding <=> $1::halfvec
|
||||
LIMIT $%d
|
||||
%[2]s
|
||||
ORDER BY embedding::halfvec(%[1]d) <=> $1::halfvec(%[1]d)
|
||||
LIMIT $%[3]d
|
||||
) AS candidates
|
||||
WHERE distance <= $%d
|
||||
WHERE distance <= $%[4]d
|
||||
ORDER BY distance ASC
|
||||
LIMIT $%d
|
||||
`, whereClause, subqueryLimitParam, thresholdParam, finalLimitParam)
|
||||
LIMIT $%[5]d
|
||||
`, dimension, whereClause, subqueryLimitParam, thresholdParam, finalLimitParam)
|
||||
|
||||
allVars = append(allVars, expandedTopK) // LIMIT in subquery
|
||||
allVars = append(allVars, 1-params.Threshold) // Distance threshold
|
||||
allVars = append(allVars, params.TopK) // Final LIMIT
|
||||
|
||||
// HNSW's `ef_search` defaults to 40, which is much smaller than our
|
||||
// `expandedTopK` budget (up to 1000). Without raising it, HNSW would only
|
||||
// return ~40 candidates per scan and our outer threshold/filter step would
|
||||
// silently lose recall. `SET LOCAL` requires a transaction so we wrap the
|
||||
// query in one. We never write inside this transaction, so the cost is
|
||||
// negligible.
|
||||
efSearch := expandedTopK
|
||||
if efSearch < 40 {
|
||||
efSearch = 40
|
||||
}
|
||||
|
||||
var embeddingDBList []pgVectorWithScore
|
||||
|
||||
err := g.db.WithContext(ctx).Raw(querySQL, allVars...).Scan(&embeddingDBList).Error
|
||||
err := g.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
|
||||
if err := tx.Exec(fmt.Sprintf("SET LOCAL hnsw.ef_search = %d", efSearch)).Error; err != nil {
|
||||
// Treat as non-fatal: pgvector should always expose this GUC, but if
|
||||
// for any reason it does not we still want the query to run (just
|
||||
// with default recall). We must rollback first because a failed
|
||||
// statement aborts the transaction in PostgreSQL.
|
||||
logger.GetLogger(ctx).Warnf("[Postgres] Failed to set hnsw.ef_search=%d: %v", efSearch, err)
|
||||
return err
|
||||
}
|
||||
// pgvector >= 0.8 supports iterative scan, which keeps pulling more
|
||||
// candidates from HNSW until the post-filter (knowledge_base_id /
|
||||
// knowledge_id / tag_id / is_enabled) yields enough rows. Without it,
|
||||
// HNSW returns at most ef_search candidates and the outer filter may
|
||||
// silently lose recall when the filter is selective.
|
||||
// Best-effort: ignore failure on older pgvector versions.
|
||||
if err := tx.Exec("SET LOCAL hnsw.iterative_scan = strict_order").Error; err != nil {
|
||||
logger.GetLogger(ctx).Debugf("[Postgres] hnsw.iterative_scan not available: %v", err)
|
||||
// abort transaction and let the fallback path below handle it.
|
||||
return err
|
||||
}
|
||||
return tx.Raw(querySQL, allVars...).Scan(&embeddingDBList).Error
|
||||
})
|
||||
|
||||
// Fallback: if the transaction failed because of an unsupported GUC (e.g.
|
||||
// older pgvector that doesn't have hnsw.ef_search or hnsw.iterative_scan),
|
||||
// retry the query without the SETs so we still return results.
|
||||
if err != nil && len(embeddingDBList) == 0 &&
|
||||
(strings.Contains(err.Error(), "hnsw.ef_search") ||
|
||||
strings.Contains(err.Error(), "hnsw.iterative_scan")) {
|
||||
logger.GetLogger(ctx).Warnf("[Postgres] Retrying vector query without HNSW GUC overrides: %v", err)
|
||||
err = g.db.WithContext(ctx).Raw(querySQL, allVars...).Scan(&embeddingDBList).Error
|
||||
}
|
||||
|
||||
if err == gorm.ErrRecordNotFound {
|
||||
logger.GetLogger(ctx).Warnf("[Postgres] No vector matches found that meet threshold %.4f", params.Threshold)
|
||||
|
||||
@@ -30,10 +30,36 @@ func (p *PluginMerge) mergeOverlappingChunks(
|
||||
continue
|
||||
}
|
||||
|
||||
// Partial overlap: append the non-overlapping suffix
|
||||
// Partial overlap: append the non-overlapping suffix.
|
||||
//
|
||||
// Offset math assumes len([]rune(Content)) == EndAt-StartAt, but a
|
||||
// few upstream paths break that invariant:
|
||||
// - Parent-child chunker prepends table headers, so Content may be
|
||||
// longer than EndAt-StartAt.
|
||||
// - Legacy data / mixed chunk sources may carry EndAt-StartAt that
|
||||
// exceed the actual rune length of Content.
|
||||
// We clamp offset into [0, len(contentRunes)] so the merge degrades
|
||||
// gracefully instead of panicking with a negative slice bound.
|
||||
if chunks[i].EndAt > lastChunk.EndAt {
|
||||
contentRunes := []rune(chunks[i].Content)
|
||||
offset := len(contentRunes) - (chunks[i].EndAt - lastChunk.EndAt)
|
||||
suffixLen := chunks[i].EndAt - lastChunk.EndAt
|
||||
offset := len(contentRunes) - suffixLen
|
||||
if offset < 0 || offset > len(contentRunes) {
|
||||
pipelineWarn(ctx, "Merge", "overlap_offset_clamp", map[string]interface{}{
|
||||
"knowledge_id": knowledgeID,
|
||||
"chunk_id": chunks[i].ID,
|
||||
"content_runes": len(contentRunes),
|
||||
"chunk_start": chunks[i].StartAt,
|
||||
"chunk_end": chunks[i].EndAt,
|
||||
"last_end": lastChunk.EndAt,
|
||||
"computed_offset": offset,
|
||||
})
|
||||
if offset < 0 {
|
||||
offset = 0
|
||||
} else {
|
||||
offset = len(contentRunes)
|
||||
}
|
||||
}
|
||||
lastChunk.Content = lastChunk.Content + string(contentRunes[offset:])
|
||||
lastChunk.EndAt = chunks[i].EndAt
|
||||
lastChunk.SubChunkID = append(lastChunk.SubChunkID, chunks[i].ID)
|
||||
|
||||
@@ -114,8 +114,8 @@ func (s *knowledgeBaseService) HybridSearch(ctx context.Context,
|
||||
// Use 5x over-retrieval to ensure sufficient candidates for RRF fusion and reranking.
|
||||
// Scale proportionally when searching multiple KBs to maintain per-KB recall quality.
|
||||
matchCount := max(params.MatchCount*5, 50) * len(searchKBIDs)
|
||||
if matchCount > 1000 {
|
||||
matchCount = 1000
|
||||
if matchCount > 500 {
|
||||
matchCount = 500
|
||||
}
|
||||
|
||||
// Build retrieval parameters for vector and keyword engines
|
||||
|
||||
Reference in New Issue
Block a user