From 4a0a151d419d4d89c3a69d603e1fd1ad101abc8c Mon Sep 17 00:00:00 2001 From: wizardchen Date: Fri, 29 May 2026 19:27:58 +0800 Subject: [PATCH] fix(knowledge): drain finalizing counter on all terminal subtask exits MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The finalizing subtask counter (introduced when wiki ingest was counted) could leak slots, leaving a fully-parsed doc stuck in "finalizing" until the housekeeping sweep wrongly marked it "failed". - wiki ingest: a doc skipped in map (knowledge deleted / no chunks / insufficient text) produced no docResult and was not a failedOp, so neither the success nor the dead-letter drain fired. Drain the slot on that terminal skip path. - summary & question: the drain was keyed on the span-error variable, which assumes "err != nil => asynq will retry". Several branches set that variable yet `return nil` (insufficient text content, KB/knowledge fetch failures) - terminal, no retry - so the drain was skipped. Key the drain on the value actually returned to asynq (named retErr) instead, so terminal nil-returns drain and only retried errors wait for the final attempt. Also fix the trace panel header flashing "已完成" mid-wiki: the latest attempt's root span closes while async post-pipeline subspans keep running, so trace.status read terminal while the row was still "finalizing". Prefer parse_status on the latest attempt while it is non-terminal so the panel header, LIVE badge and doc card agree, and add the "finalizing" status label to all locales. --- .../knowledge-processing-timeline.vue | 49 +++++++++++++++++-- frontend/src/i18n/locales/en-US.ts | 1 + frontend/src/i18n/locales/ko-KR.ts | 1 + frontend/src/i18n/locales/ru-RU.ts | 1 + frontend/src/i18n/locales/zh-CN.ts | 1 + .../application/service/knowledge_process.go | 29 +++++++---- .../application/service/wiki_ingest_batch.go | 15 ++++++ 7 files changed, 85 insertions(+), 12 deletions(-) diff --git a/frontend/src/components/knowledge-processing-timeline.vue b/frontend/src/components/knowledge-processing-timeline.vue index 665b5043..cd411811 100644 --- a/frontend/src/components/knowledge-processing-timeline.vue +++ b/frontend/src/components/knowledge-processing-timeline.vue @@ -1105,14 +1105,56 @@ function attemptGlyph(status: string): { ch: string; cls: string } { } } +// True when the panel is showing the most recent attempt (or there's +// only one). Historical attempts must keep their own per-attempt +// trace.status; only the latest attempt's header should defer to the +// knowledge-level parse_status. +const viewingLatestAttempt = computed(() => { + const latest = data.value?.latest_attempt || 0 + if (latest <= 1) return true + const active = selectedAttempt.value ?? data.value?.attempt ?? latest + return active === latest +}) + +// Project the knowledge-level parse_status onto the trace-span status +// vocabulary localizedStatus() speaks, so the header badge reads the +// same whether it comes from the root span or from parse_status. +// 'finalizing' keeps its own label ("优化中") to match the doc card. +function parseStatusToTraceStatus(s?: string): string { + switch (s) { + case 'completed': + return 'done' + case 'processing': + return 'running' + case 'finalizing': + return 'finalizing' + default: + return s || '' + } +} + +// The authoritative status for the header badge. During the async +// post-pipeline window (summary / question / graph / wiki), the latest +// attempt's ROOT span closes — so trace.status reads 'done' — while +// those subspans keep running and the row is still 'finalizing'. +// Trusting trace.status there flashes "已完成" mid-wiki even though the +// doc card (and LIVE badge) still say "优化中". Prefer parse_status while +// it is non-terminal on the latest attempt so all three agree. +const headerStatus = computed(() => { + const parseStatus = data.value?.parse_status + if (viewingLatestAttempt.value && isPolling(parseStatus)) { + return parseStatusToTraceStatus(parseStatus) + } + return data.value?.trace?.status || parseStatusToTraceStatus(parseStatus) +}) + const headerStatusText = computed(() => { - const s = data.value?.trace?.status || data.value?.parse_status || '' + const s = headerStatus.value return s ? localizedStatus(s) : '' }) const headerStatusTheme = computed(() => { - const s = data.value?.trace?.status || data.value?.parse_status || '' - switch (s) { + switch (headerStatus.value) { case 'done': case 'completed': return 'success' @@ -1121,6 +1163,7 @@ const headerStatusTheme = computed(() => { case 'running': case 'processing': case 'pending': + case 'finalizing': return 'warning' default: return 'default' diff --git a/frontend/src/i18n/locales/en-US.ts b/frontend/src/i18n/locales/en-US.ts index 67c0d4d5..1fab158d 100755 --- a/frontend/src/i18n/locales/en-US.ts +++ b/frontend/src/i18n/locales/en-US.ts @@ -475,6 +475,7 @@ export default { status: { pending: 'Pending', running: 'Running', + finalizing: 'Finalizing', done: 'Done', failed: 'Failed', skipped: 'Skipped', diff --git a/frontend/src/i18n/locales/ko-KR.ts b/frontend/src/i18n/locales/ko-KR.ts index 587faefe..2474aca0 100755 --- a/frontend/src/i18n/locales/ko-KR.ts +++ b/frontend/src/i18n/locales/ko-KR.ts @@ -481,6 +481,7 @@ export default { status: { pending: "대기 중", running: "진행 중", + finalizing: "최적화 중", done: "완료", failed: "실패", skipped: "건너뜀", diff --git a/frontend/src/i18n/locales/ru-RU.ts b/frontend/src/i18n/locales/ru-RU.ts index 4579c790..638c294f 100755 --- a/frontend/src/i18n/locales/ru-RU.ts +++ b/frontend/src/i18n/locales/ru-RU.ts @@ -396,6 +396,7 @@ export default { status: { pending: 'Ожидание', running: 'Выполняется', + finalizing: 'Оптимизация', done: 'Готово', failed: 'Ошибка', skipped: 'Пропущено', diff --git a/frontend/src/i18n/locales/zh-CN.ts b/frontend/src/i18n/locales/zh-CN.ts index 20009942..e32f0720 100755 --- a/frontend/src/i18n/locales/zh-CN.ts +++ b/frontend/src/i18n/locales/zh-CN.ts @@ -478,6 +478,7 @@ export default { status: { pending: "等待中", running: "进行中", + finalizing: "优化中", done: "已完成", failed: "失败", skipped: "已跳过", diff --git a/internal/application/service/knowledge_process.go b/internal/application/service/knowledge_process.go index cadc45f2..04b90003 100644 --- a/internal/application/service/knowledge_process.go +++ b/internal/application/service/knowledge_process.go @@ -886,7 +886,7 @@ func sampleLongContent(content string, maxChars int) string { } // ProcessSummaryGeneration handles async summary generation task -func (s *knowledgeService) ProcessSummaryGeneration(ctx context.Context, t *asynq.Task) error { +func (s *knowledgeService) ProcessSummaryGeneration(ctx context.Context, t *asynq.Task) (retErr error) { var payload types.SummaryGenerationPayload if err := json.Unmarshal(t.Payload(), &payload); err != nil { logger.Errorf(ctx, "Failed to unmarshal summary generation payload: %v", err) @@ -922,10 +922,16 @@ func (s *knowledgeService) ProcessSummaryGeneration(ctx context.Context, t *asyn var summaryErr error summaryOut := types.JSONMap{} defer func() { - // Decrement the parent's enrichment counter on every terminal - // exit (success OR final retry failure). Intermediate retries - // skip the decrement so the counter cannot drain prematurely. - if (summaryErr == nil || isFinalAsynqAttempt(ctx)) && payload.KnowledgeID != "" { + // Decrement the parent's enrichment counter on terminal exit. + // "Terminal" is keyed on the value RETURNED to asynq, not on + // summaryErr: several branches record a failure on the span + // (summaryErr != nil) yet deliberately `return nil` so asynq does + // NOT retry (e.g. insufficient text content, KB/knowledge fetch + // failures). Those are terminal and must drain — keying on + // summaryErr would skip them and leave the row stuck in + // "finalizing". When we DO return an error asynq will retry, so + // we only drain on the final attempt. + if (retErr == nil || isFinalAsynqAttempt(ctx)) && payload.KnowledgeID != "" { if _, _, ferr := s.repo.FinalizeSubtask(ctx, payload.KnowledgeID); ferr != nil { logger.Warnf(ctx, "summary: FinalizeSubtask failed for %s: %v", payload.KnowledgeID, ferr) } @@ -1181,7 +1187,7 @@ func (s *knowledgeService) ProcessSummaryGeneration(ctx context.Context, t *asyn } // ProcessQuestionGeneration handles async question generation task -func (s *knowledgeService) ProcessQuestionGeneration(ctx context.Context, t *asynq.Task) error { +func (s *knowledgeService) ProcessQuestionGeneration(ctx context.Context, t *asynq.Task) (retErr error) { taskStartedAt := time.Now() retryCount, _ := asynq.GetRetryCount(ctx) maxRetry, _ := asynq.GetMaxRetry(ctx) @@ -1217,11 +1223,16 @@ func (s *knowledgeService) ProcessQuestionGeneration(ctx context.Context, t *asy // FinalizeSubtask decrement so a stale task can't drain the new // attempt's counter. superseded := false - // Decrement enrichment counter on terminal exit (success OR final - // retry failure). Runs AFTER the stats-log defer below — defers + // Decrement enrichment counter on terminal exit. Keyed on the value + // RETURNED to asynq (retErr), not qErr: some branches record a span + // failure (qErr != nil) yet `return nil` so asynq won't retry (KB / + // knowledge fetch failures); those are terminal and must drain. + // Keying on qErr would skip them and strand the row in "finalizing". + // When we return an error asynq retries, so we only drain on the + // final attempt. Runs AFTER the stats-log defer below — defers // unwind LIFO, so this one declared first executes last. defer func() { - if !superseded && (qErr == nil || isFinalAsynqAttempt(ctx)) && payload.KnowledgeID != "" { + if !superseded && (retErr == nil || isFinalAsynqAttempt(ctx)) && payload.KnowledgeID != "" { if _, _, ferr := s.repo.FinalizeSubtask(ctx, payload.KnowledgeID); ferr != nil { logger.Warnf(ctx, "question: FinalizeSubtask failed for %s: %v", payload.KnowledgeID, ferr) } diff --git a/internal/application/service/wiki_ingest_batch.go b/internal/application/service/wiki_ingest_batch.go index 8be3dc85..a12723e1 100644 --- a/internal/application/service/wiki_ingest_batch.go +++ b/internal/application/service/wiki_ingest_batch.go @@ -464,6 +464,21 @@ func (s *wikiIngestService) ProcessWikiIngest(ctx context.Context, t *asynq.Task // scrub. Compare with the legacy Redis path, which kept // a separate wiki:failcount:<...> key alive for 24h // regardless of whether the original op had drained. + // + // The finalizing slot is drained later (after reduce + + // publish) in the docResults loop, so "completed" only + // arrives once wiki is fully written. + } else { + // err == nil && result == nil: mapOneDocument skipped this + // doc at a terminal, non-retryable state (knowledge + // deleted / no chunks / insufficient text). It produces no + // docResult and is not a failedOp, so neither the success + // nor the dead-letter drain path will fire. Release the + // finalizing slot here so the row doesn't hang in + // "finalizing" until the housekeeping sweep marks it + // failed. The matching +1 was seeded by + // KnowledgePostProcess.SetFinalizing. + s.finalizeWikiSubtask(mapCtx, op.KnowledgeID) } return nil })