fix(knowledge): unstick summary and isolate manual re-index trace attempts

Manually uploaded text documents could get stuck showing "generating
summary" forever, and editing/re-saving a document mixed its trace into
the previous attempt.

Three related fixes:

1. finalizeIndexedKnowledgeState no longer marks a text document
   completed the moment chunks are indexed. Doing so made
   KnowledgePostProcess hit its non-processing guard and skip the
   summary/question/graph fan-out, stranding summary_status on
   "pending". The row now stays "processing" (still retrievable via
   enable_status=enabled) and post-process remains the sole authority
   that drives processing -> finalizing -> completed.

2. ProcessManualUpdate now allocates a fresh span-tracking attempt via
   OpenAttempt. Previously it ran with attempt 0, so processChunks
   dropped every stage span and KnowledgePostProcess fell back to
   LatestAttempt, piling the new run's summary/wiki subspans onto the
   prior attempt's trace.

3. Enrichment workers (summary, question, graph extract) skip when a
   newer attempt has superseded theirs. A stale subtask from a previous
   upload/edit/reparse must not read deleted chunks or decrement the new
   attempt's pending_subtasks_count, which would race-promote the row to
   completed before the new attempt finishes.

Updated TestFinalizeIndexedKnowledgeState to expect the deferred
completion for text documents.
This commit is contained in:
wizardchen
2026-05-29 17:44:20 +08:00
committed by lyingbug
parent 9f139ff4d8
commit 9cb9505114
4 changed files with 77 additions and 13 deletions

View File

@@ -202,6 +202,16 @@ func (s *ChunkExtractService) Handle(ctx context.Context, t *asynq.Task) error {
ctx = logger.WithField(ctx, "extract", p.ChunkID)
ctx = context.WithValue(ctx, types.TenantIDContextKey, p.TenantID)
// A newer attempt (re-upload / edit / reparse) has superseded this one:
// skip before opening the span or registering the FinalizeSubtask defer.
// The chunk this task references was deleted by the new attempt's cleanup,
// and decrementing here would drain the new attempt's counter.
if attemptSuperseded(ctx, s.tracker(), p.KnowledgeID, p.Attempt) {
logger.Infof(ctx, "graph extract: attempt %d superseded for %s, skipping stale enrichment",
p.Attempt, p.KnowledgeID)
return nil
}
// Open a postprocess subspan keyed by chunk ordinal so the trace
// shows real per-chunk graph extraction time. Skipped silently when
// upstream didn't pass the parent attempt (legacy in-flight tasks)

View File

@@ -169,6 +169,20 @@ func attemptFromCtx(ctx context.Context) int {
return 0
}
// attemptSuperseded reports whether a newer parse attempt has started for the
// knowledge since this enrichment subtask was enqueued. Stale subtasks from a
// previous upload/edit/reparse that is still draining must NOT touch the new
// attempt's chunks or decrement its pending_subtasks_count — doing so would
// race-promote the row to completed before the new attempt finishes. An attempt
// of 0 predates attempt tracking (or tracking is disabled) and is never treated
// as superseded.
func attemptSuperseded(ctx context.Context, tracker SpanTracker, knowledgeID string, attempt int) bool {
if attempt <= 0 || knowledgeID == "" {
return false
}
return tracker.LatestAttempt(ctx, knowledgeID) > attempt
}
// beginStage / endStage / failStage / skipStage are the by-name shims
// the pipeline uses so call sites don't have to thread *Span values
// through the existing function signatures. Each helper looks up the

View File

@@ -142,9 +142,15 @@ type ProcessChunksOptions struct {
Metadata map[string]string
}
// finalizeIndexedKnowledgeState marks a document searchable as soon as chunks
// and indexes are persisted; post-processing tasks should not keep parsing UI
// indicators alive once retrieval can use the document.
// finalizeIndexedKnowledgeState makes a document retrievable as soon as chunks
// and indexes are persisted (enable_status=enabled), but it deliberately does
// NOT mark the row completed when enrichment is still expected. Whenever the
// document still has work to fan out — pending multimodal image tasks, or text
// chunks that feed summary/question/graph generation — parse_status stays
// "processing" so KnowledgePostProcess remains the single authority that drives
// processing → finalizing → completed. Marking the row completed here would make
// post-process hit its "non-processing status" guard and skip the summary
// fan-out, stranding summary_status on "pending" forever.
func finalizeIndexedKnowledgeState(
knowledge *types.Knowledge,
totalStorageSize int64,
@@ -152,16 +158,14 @@ func finalizeIndexedKnowledgeState(
hasPendingMultimodal bool,
now time.Time,
) {
if hasPendingMultimodal {
if hasPendingMultimodal || textChunkCount > 0 {
knowledge.ParseStatus = types.ParseStatusProcessing
knowledge.SummaryStatus = types.SummaryStatusNone
} else {
// No text chunks and no pending multimodal work: there is nothing for
// post-process to enrich, so complete immediately.
knowledge.ParseStatus = types.ParseStatusCompleted
if textChunkCount > 0 {
knowledge.SummaryStatus = types.SummaryStatusPending
} else {
knowledge.SummaryStatus = types.SummaryStatusNone
}
knowledge.SummaryStatus = types.SummaryStatusNone
}
knowledge.EnableStatus = "enabled"
@@ -897,6 +901,15 @@ func (s *knowledgeService) ProcessSummaryGeneration(ctx context.Context, t *asyn
ctx = context.WithValue(ctx, types.LanguageContextKey, payload.Language)
}
// A newer attempt (re-upload / edit / reparse) has superseded this one:
// skip before opening the span or registering the FinalizeSubtask defer
// so we neither read stale chunks nor decrement the new attempt's counter.
if attemptSuperseded(ctx, s.tracker(), payload.KnowledgeID, payload.Attempt) {
logger.Infof(ctx, "summary: attempt %d superseded for %s, skipping stale enrichment",
payload.Attempt, payload.KnowledgeID)
return nil
}
// Open a subspan under the parent attempt's postprocess stage so the
// trace surface shows the real summary-generation duration (LLM call
// + chunk write + index) instead of just the upstream's enqueue time.
@@ -1200,11 +1213,15 @@ func (s *knowledgeService) ProcessQuestionGeneration(ctx context.Context, t *asy
// what we already log to stdout.
var qSpan *Span
var qErr error
// Set when a newer attempt supersedes this run; suppresses the
// FinalizeSubtask decrement so a stale task can't drain the new
// attempt's counter.
superseded := false
// Decrement enrichment counter on terminal exit (success OR final
// retry failure). Runs AFTER the stats-log defer below — defers
// unwind LIFO, so this one declared first executes last.
defer func() {
if (qErr == nil || isFinalAsynqAttempt(ctx)) && payload.KnowledgeID != "" {
if !superseded && (qErr == nil || isFinalAsynqAttempt(ctx)) && payload.KnowledgeID != "" {
if _, _, ferr := s.repo.FinalizeSubtask(ctx, payload.KnowledgeID); ferr != nil {
logger.Warnf(ctx, "question: FinalizeSubtask failed for %s: %v", payload.KnowledgeID, ferr)
}
@@ -1287,6 +1304,17 @@ func (s *knowledgeService) ProcessQuestionGeneration(ctx context.Context, t *asy
logger.Infof(ctx, "Processing question generation for knowledge: %s", payload.KnowledgeID)
// A newer attempt has superseded this one: skip before opening the span
// so we don't read stale chunks. superseded suppresses the counter
// decrement in the defer above; qSpan stays nil so the stats defer no-ops.
if attemptSuperseded(ctx, s.tracker(), payload.KnowledgeID, payload.Attempt) {
superseded = true
exitStatus = "superseded"
logger.Infof(ctx, "question: attempt %d superseded for %s, skipping stale enrichment",
payload.Attempt, payload.KnowledgeID)
return nil
}
// Open the postprocess.question subspan now that we have payload.Attempt.
// Closes via the defer above.
qSpan = s.beginPostprocessSubspan(ctx, payload.KnowledgeID, payload.Attempt, "postprocess.question",
@@ -2245,6 +2273,18 @@ func (s *knowledgeService) ProcessManualUpdate(ctx context.Context, t *asynq.Tas
return nil
}
// Allocate a fresh span-tracking attempt for this manual (re)index.
// Without it attemptFromCtx stays 0, so processChunks drops all stage
// spans and KnowledgePostProcess falls back to LatestAttempt — piling
// this run's summary/wiki subspans onto the previous attempt's trace.
attempt := 0
if root, n, err := s.tracker().OpenAttempt(ctx, knowledge.ID, payload.LangfuseTraceID); err == nil && root != nil {
attempt = n
} else if err != nil {
logger.Warnf(ctx, "ProcessManualUpdate: OpenAttempt failed for %s: %v", knowledge.ID, err)
}
ctx = withAttempt(ctx, attempt)
// Cleanup old resources (indexes, chunks, graph) for update operations
if payload.NeedCleanup {
if err := s.cleanupKnowledgeResources(ctx, knowledge); err != nil {

View File

@@ -18,10 +18,10 @@ func TestFinalizeIndexedKnowledgeState(t *testing.T) {
wantSummaryStatus string
}{
{
name: "text document is completed once chunks are indexed",
name: "text document stays processing so post-process can fan out enrichment",
textChunkCount: 2,
wantParseStatus: types.ParseStatusCompleted,
wantSummaryStatus: types.SummaryStatusPending,
wantParseStatus: types.ParseStatusProcessing,
wantSummaryStatus: types.SummaryStatusNone,
},
{
name: "empty indexed document is completed without summary work",