diff --git a/internal/application/repository/retriever/postgres/repository.go b/internal/application/repository/retriever/postgres/repository.go index 5d26e591..f89b9463 100644 --- a/internal/application/repository/retriever/postgres/repository.go +++ b/internal/application/repository/retriever/postgres/repository.go @@ -193,13 +193,13 @@ func (g *pgRepository) KeywordsRetrieve(ctx context.Context, Values: common.ToInterfaceSlice(params.TagIDs), }) } - + // Use ParadeDB's ||| operator for matching any token conds = append(conds, clause.Expr{ SQL: "content ||| ?", Vars: []interface{}{params.Query}, }) - + // Filter by is_enabled = true or NULL (NULL means enabled for historical data) conds = append(conds, clause.Expr{ SQL: "(is_enabled IS NULL OR is_enabled = ?)", @@ -340,22 +340,36 @@ func (g *pgRepository) VectorRetrieve(ctx context.Context, whereClause = "WHERE " + strings.Join(whereParts, " AND ") } - // Expand TopK to get more candidates before threshold filtering + // Expand TopK to get more candidates before threshold filtering. + // + // HNSW requires `ef_search >= LIMIT`, and a very large LIMIT (e.g. 1000) + // forces HNSW to walk a near-exhaustive portion of the graph, often making + // it slower than a sequential scan and pushing the planner to pick Seq Scan + // even when an index exists. 200 is a good sweet spot: it gives enough + // headroom for threshold/filter post-processing without ballooning ef_search. 
expandedTopK := params.TopK * 2 if expandedTopK < 100 { expandedTopK = 100 // Minimum 100 candidates } - if expandedTopK > 1000 { - expandedTopK = 1000 // Maximum 1000 candidates + if expandedTopK > 200 { + expandedTopK = 200 // Maximum 200 candidates (keeps HNSW efficient) } if expandedTopK < params.TopK { expandedTopK = params.TopK // Ensure subquery limit is at least final limit } - // Optimized query: Use subquery to calculate distance once - // Strategy: Use ORDER BY with vector distance to leverage HNSW index, - // then filter by threshold in outer query - // This allows PostgreSQL to use HNSW index efficiently + // Optimized query: Use subquery to calculate distance once. + // + // IMPORTANT: The HNSW index in this project is built on the EXPRESSION + // (embedding::halfvec()) halfvec_cosine_ops + // because the `embedding` column itself is `halfvec` without a fixed dimension + // (so the table can store multiple embedding sizes such as 798 / 3584 / ...). + // + // pgvector requires the ORDER BY expression to match the indexed expression + // EXACTLY, otherwise the planner falls back to a sequential scan. The + // `embedding::halfvec(%d)` cast on both sides of `<=>` is therefore NOT + // redundant — it is the only way to make the HNSW index get used at all. + // See: pgvector issues #702, #835 and ParadeDB "indexing-expressions" docs. 
subqueryLimitParam := len(allVars) + 1 thresholdParam := len(allVars) + 2 finalLimitParam := len(allVars) + 3 @@ -367,24 +381,66 @@ func (g *pgRepository) VectorRetrieve(ctx context.Context, FROM ( SELECT id, content, source_id, source_type, chunk_id, knowledge_id, knowledge_base_id, tag_id, - embedding <=> $1::halfvec as distance + embedding::halfvec(%[1]d) <=> $1::halfvec(%[1]d) as distance FROM embeddings - %s - ORDER BY embedding <=> $1::halfvec - LIMIT $%d + %[2]s + ORDER BY embedding::halfvec(%[1]d) <=> $1::halfvec(%[1]d) + LIMIT $%[3]d ) AS candidates - WHERE distance <= $%d + WHERE distance <= $%[4]d ORDER BY distance ASC - LIMIT $%d - `, whereClause, subqueryLimitParam, thresholdParam, finalLimitParam) + LIMIT $%[5]d + `, dimension, whereClause, subqueryLimitParam, thresholdParam, finalLimitParam) allVars = append(allVars, expandedTopK) // LIMIT in subquery allVars = append(allVars, 1-params.Threshold) // Distance threshold allVars = append(allVars, params.TopK) // Final LIMIT + // HNSW's `ef_search` defaults to 40, which is much smaller than our + // `expandedTopK` budget (100-200, or params.TopK if that is larger). Without raising it, HNSW would only + // return ~40 candidates per scan and our outer threshold/filter step would + // silently lose recall. `SET LOCAL` requires a transaction so we wrap the + // query in one. We never write inside this transaction, so the cost is + // negligible. + efSearch := expandedTopK + if efSearch < 40 { + efSearch = 40 + } + var embeddingDBList []pgVectorWithScore - err := g.db.WithContext(ctx).Raw(querySQL, allVars...).Scan(&embeddingDBList).Error + err := g.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + if err := tx.Exec(fmt.Sprintf("SET LOCAL hnsw.ef_search = %d", efSearch)).Error; err != nil { + // Treat as non-fatal: pgvector should always expose this GUC, but if + // for any reason it does not we still want the query to run (just + // with default recall). 
Returning the error rolls the transaction back first, because a failed + // statement aborts the transaction in PostgreSQL; the fallback below then retries. + logger.GetLogger(ctx).Warnf("[Postgres] Failed to set hnsw.ef_search=%d: %v", efSearch, err) + return err + } + // pgvector >= 0.8 supports iterative scan, which keeps pulling more + // candidates from HNSW until the post-filter (knowledge_base_id / + // knowledge_id / tag_id / is_enabled) yields enough rows. Without it, + // HNSW returns at most ef_search candidates and the outer filter may + // silently lose recall when the filter is selective. + // Best-effort: on older pgvector versions this SET fails and the fallback below retries without it. + if err := tx.Exec("SET LOCAL hnsw.iterative_scan = strict_order").Error; err != nil { + logger.GetLogger(ctx).Debugf("[Postgres] hnsw.iterative_scan not available: %v", err) + // Abort the transaction and let the fallback path below handle it. + return err + } + return tx.Raw(querySQL, allVars...).Scan(&embeddingDBList).Error + }) + + // Fallback: if the transaction failed because of an unsupported GUC (e.g. + // older pgvector that doesn't have hnsw.ef_search or hnsw.iterative_scan), + // retry the query without the SETs so we still return results. 
+ if err != nil && len(embeddingDBList) == 0 && + (strings.Contains(err.Error(), "hnsw.ef_search") || + strings.Contains(err.Error(), "hnsw.iterative_scan")) { + logger.GetLogger(ctx).Warnf("[Postgres] Retrying vector query without HNSW GUC overrides: %v", err) + err = g.db.WithContext(ctx).Raw(querySQL, allVars...).Scan(&embeddingDBList).Error + } if err == gorm.ErrRecordNotFound { logger.GetLogger(ctx).Warnf("[Postgres] No vector matches found that meet threshold %.4f", params.Threshold) diff --git a/internal/application/service/chat_pipeline/merge_overlap.go b/internal/application/service/chat_pipeline/merge_overlap.go index ba27e6ec..009d11d3 100644 --- a/internal/application/service/chat_pipeline/merge_overlap.go +++ b/internal/application/service/chat_pipeline/merge_overlap.go @@ -30,10 +30,36 @@ func (p *PluginMerge) mergeOverlappingChunks( continue } - // Partial overlap: append the non-overlapping suffix + // Partial overlap: append the non-overlapping suffix. + // + // Offset math assumes len([]rune(Content)) == EndAt-StartAt, but a + // few upstream paths break that invariant: + // - Parent-child chunker prepends table headers, so Content may be + // longer than EndAt-StartAt. + // - Legacy data / mixed chunk sources may carry EndAt-StartAt that + // exceed the actual rune length of Content. + // We clamp offset into [0, len(contentRunes)] so the merge degrades + // gracefully instead of panicking with a negative slice bound. 
if chunks[i].EndAt > lastChunk.EndAt { contentRunes := []rune(chunks[i].Content) - offset := len(contentRunes) - (chunks[i].EndAt - lastChunk.EndAt) + suffixLen := chunks[i].EndAt - lastChunk.EndAt + offset := len(contentRunes) - suffixLen + if offset < 0 || offset > len(contentRunes) { + pipelineWarn(ctx, "Merge", "overlap_offset_clamp", map[string]interface{}{ + "knowledge_id": knowledgeID, + "chunk_id": chunks[i].ID, + "content_runes": len(contentRunes), + "chunk_start": chunks[i].StartAt, + "chunk_end": chunks[i].EndAt, + "last_end": lastChunk.EndAt, + "computed_offset": offset, + }) + if offset < 0 { + offset = 0 + } else { + offset = len(contentRunes) + } + } lastChunk.Content = lastChunk.Content + string(contentRunes[offset:]) lastChunk.EndAt = chunks[i].EndAt lastChunk.SubChunkID = append(lastChunk.SubChunkID, chunks[i].ID) diff --git a/internal/application/service/knowledgebase_search.go b/internal/application/service/knowledgebase_search.go index d2717aa5..39d007f7 100644 --- a/internal/application/service/knowledgebase_search.go +++ b/internal/application/service/knowledgebase_search.go @@ -114,8 +114,8 @@ func (s *knowledgeBaseService) HybridSearch(ctx context.Context, // Use 5x over-retrieval to ensure sufficient candidates for RRF fusion and reranking. // Scale proportionally when searching multiple KBs to maintain per-KB recall quality. matchCount := max(params.MatchCount*5, 50) * len(searchKBIDs) - if matchCount > 1000 { - matchCount = 1000 + if matchCount > 500 { + matchCount = 500 } // Build retrieval parameters for vector and keyword engines