fix(knowledge): make finalizing subtask counter reliable + async question fan-out

The finalizing subtask counter (pending_subtasks_count) could get stuck at a
non-zero value, leaving knowledge permanently in "finalizing" and never
promoted to "completed". Root cause and related hardening:

- UpdateKnowledge does a full-row Save; pending_subtasks_count was NOT in the
  omit list, so any concurrent enrichment subtask that loaded the row, did slow
  work (LLM call), then saved an unrelated field wrote back the STALE counter,
  clobbering decrements other subtasks performed in between. Add
  PendingSubtasksCount to omitFieldsOnUpdate so only the atomic helpers
  (SetFinalizing / FinalizeSubtask / explicit column writes) ever touch it.
- FinalizeSubtask no longer gates the finalizing->completed promote on a
  separate re-read of the counter. Every caller unconditionally runs the
  guarded promote UPDATE (WHERE pending_subtasks_count = 0), the single
  authoritative atomic check, so a racy/stale read can't strand the row.
- Decrements run on a context detached from the caller's cancellation
  (context.WithoutCancel + timeout) so graceful shutdown / preemption can't
  silently skip a decrement.
- Reconcile seeded vs actually-enqueued subtask slots (summary/question/graph)
  and release any shortfall so un-enqueued planned slots (e.g. graph with
  NEO4J off) don't strand the row.
- Reparse paths reset pending_subtasks_count via an explicit column write since
  full-row Save now omits it.

Question generation is also moved off the synchronous single-task path onto a
batched async fan-out (windows of text chunks), each batch independently
queued / retried / traced, grouped under a postprocess.question span.
This commit is contained in:
wizardchen
2026-05-30 00:09:46 +08:00
committed by lyingbug
parent 9ffb886e40
commit 84bd95275d
8 changed files with 744 additions and 109 deletions

View File

@@ -22,8 +22,21 @@ func escapeLikeKeyword(keyword string) string {
return keyword
}
// omitFieldsOnUpdate defines fields to omit when updating knowledge
var omitFieldsOnUpdate = []string{"DeletedAt"}
// omitFieldsOnUpdate defines fields to omit when updating knowledge.
//
// PendingSubtasksCount is deliberately omitted from every full-row Save:
// it is an orchestration counter owned exclusively by the atomic helpers
// SetFinalizing (seed), FinalizeSubtask (decrement+promote) and the
// explicit UpdateKnowledgeColumns resets (cancel/reparse). A generic
// UpdateKnowledge call persists the WHOLE in-memory struct, so any
// concurrent enrichment subtask that loaded the row, did slow work
// (e.g. an LLM call), then saved an unrelated field would otherwise
// write back the STALE counter it read at load time — clobbering the
// decrements other subtasks performed in the meantime. That made the
// counter jump back up and never reach zero (the "stuck
// pending_subtasks_count / never promoted to completed" bug). Omitting
// the column here means Save can never touch it.
var omitFieldsOnUpdate = []string{"DeletedAt", "PendingSubtasksCount"}
// knowledgeRepository implements knowledge base and knowledge repository interface
type knowledgeRepository struct {
@@ -340,28 +353,18 @@ func (r *knowledgeRepository) FinalizeSubtask(
return 0, false, res.Error
}
// 2) Re-read to discover the new count and current parse_status.
// Reading after the UPDATE gives us a value that is at worst one
// decrement stale relative to other callers — the promote step
// below is guarded by the same condition at SQL level, so the
// stale read only causes us to attempt a no-op promote.
var snap struct {
PendingSubtasksCount int `gorm:"column:pending_subtasks_count"`
ParseStatus string `gorm:"column:parse_status"`
}
if err := r.db.WithContext(ctx).Model(&types.Knowledge{}).
Select("pending_subtasks_count", "parse_status").
Where("id = ?", id).Take(&snap).Error; err != nil {
return 0, false, err
}
if snap.PendingSubtasksCount != 0 || snap.ParseStatus != types.ParseStatusFinalizing {
return snap.PendingSubtasksCount, false, nil
}
// 3) Guarded promote. The WHERE clause ensures only ONE caller wins
// when multiple subtasks decrement to zero in the same instant,
// and that cancel/delete cannot be clobbered by a late promote.
// 2) Guarded promote. EVERY caller unconditionally attempts this after
// decrementing — we must NOT gate it on a separate SELECT of the
// counter. That read can be served by a lagging read-replica (or a
// stale connection snapshot) and return a non-zero value even after
// the counter has truly reached zero on the primary; if every caller
// trusts that stale read, NONE of them runs the promote and the row
// is stranded in `finalizing` forever (the observed "stuck
// pending_subtasks_count" bug). The promote is a WRITE, so it executes
// on the primary and its `pending_subtasks_count = 0` WHERE clause is
// the single authoritative, atomic check on the live row: only the
// caller whose decrement actually brought the counter to zero matches,
// and cancel/delete cannot be clobbered by a late promote.
promoteRes := r.db.WithContext(ctx).Model(&types.Knowledge{}).
Where("id = ? AND parse_status = ? AND pending_subtasks_count = 0",
id, types.ParseStatusFinalizing).
@@ -371,9 +374,23 @@ func (r *knowledgeRepository) FinalizeSubtask(
"updated_at": now,
})
if promoteRes.Error != nil {
return snap.PendingSubtasksCount, false, promoteRes.Error
return 0, false, promoteRes.Error
}
return snap.PendingSubtasksCount, promoteRes.RowsAffected > 0, nil
promoted := promoteRes.RowsAffected > 0
// 3) Best-effort re-read of the new count for diagnostics/return value
// only. This read may be replica-stale and is intentionally NOT used
// to decide whether to promote (see above). A read failure here does
// not affect correctness, so we don't propagate it as an error.
var snap struct {
PendingSubtasksCount int `gorm:"column:pending_subtasks_count"`
}
if err := r.db.WithContext(ctx).Model(&types.Knowledge{}).
Select("pending_subtasks_count").
Where("id = ?", id).Take(&snap).Error; err != nil {
return 0, promoted, nil
}
return snap.PendingSubtasksCount, promoted, nil
}
// SetFinalizing atomically transitions a row from 'processing' to

View File

@@ -80,7 +80,12 @@ Please output in the following format (one paragraph per column):
- Write descriptions in the same language as the data content`
)
// NewChunkExtractTask creates a new chunk extract task
// NewChunkExtractTask creates a new chunk extract task. It returns
// (enqueued, err): enqueued is true only when a task was actually placed on
// the queue. When NEO4J is disabled the call is a no-op and returns
// (false, nil) — callers that seeded a pending-subtask counter for this chunk
// MUST release that slot, otherwise the parent knowledge stays stuck in
// "finalizing" forever (the graph subtask it's waiting on was never enqueued).
func NewChunkExtractTask(
ctx context.Context,
client interfaces.TaskEnqueuer,
@@ -90,10 +95,10 @@ func NewChunkExtractTask(
knowledgeID string,
attempt int,
chunkIndex int,
) error {
) (bool, error) {
if strings.ToLower(os.Getenv("NEO4J_ENABLE")) != "true" {
logger.Warn(ctx, "NEO4J is not enabled, skip chunk extract task")
return nil
return false, nil
}
taskPayload := types.ExtractChunkPayload{
TenantID: tenantID,
@@ -106,16 +111,16 @@ func NewChunkExtractTask(
langfuse.InjectTracing(ctx, &taskPayload)
payload, err := json.Marshal(taskPayload)
if err != nil {
return err
return false, err
}
task := asynq.NewTask(types.TypeChunkExtract, payload, asynq.MaxRetry(3))
info, err := client.Enqueue(task)
if err != nil {
logger.Errorf(ctx, "failed to enqueue task: %v", err)
return fmt.Errorf("failed to enqueue task: %v", err)
return false, fmt.Errorf("failed to enqueue task: %v", err)
}
logger.Infof(ctx, "enqueued task: id=%s queue=%s chunk=%s", info.ID, info.Queue, chunkID)
return nil
return true, nil
}
// NewTableExtractTask creates a new table extract task
@@ -237,11 +242,9 @@ func (s *ChunkExtractService) Handle(ctx context.Context, t *asynq.Task) error {
// completed (or terminally-failed) per-chunk extract releases its
// slot in pending_subtasks_count. KnowledgeID is the new (post-#? )
// payload field; legacy in-flight tasks without it are skipped.
if (handleErr == nil || isFinalAsynqAttempt(ctx)) && p.KnowledgeID != "" && s.knowledgeRepo != nil {
if _, _, ferr := s.knowledgeRepo.FinalizeSubtask(ctx, p.KnowledgeID); ferr != nil {
logger.Warnf(ctx, "graph extract: FinalizeSubtask failed for %s: %v", p.KnowledgeID, ferr)
}
}
finalizeSubtaskDetached(ctx, s.knowledgeRepo, p.KnowledgeID,
fmt.Sprintf("graph_chunk[%d]", p.ChunkIndex),
handleErr, false, isFinalAsynqAttempt(ctx))
if gSpan == nil {
return
}

View File

@@ -7,6 +7,7 @@ import (
"io"
"strings"
"sync"
"time"
"github.com/Tencent/WeKnora/internal/application/repository"
"github.com/Tencent/WeKnora/internal/application/service/retriever"
@@ -183,6 +184,51 @@ func attemptSuperseded(ctx context.Context, tracker SpanTracker, knowledgeID str
return tracker.LatestAttempt(ctx, knowledgeID) > attempt
}
// finalizeSubtaskDetachedTimeout bounds the detached decrement so a wedged DB
// connection can't hang a worker goroutine forever in its terminal defer.
const finalizeSubtaskDetachedTimeout = 10 * time.Second
// finalizeSubtaskDetached evaluates the drain decision for a subtask's
// terminal exit and — when the subtask should drain — decrements
// pending_subtasks_count using a context DETACHED from the caller's
// cancellation.
//
// Decision: a subtask drains exactly once, on its terminal exit, UNLESS a newer
// attempt superseded it. "Terminal" means either the handler succeeded
// (retErr == nil) or it's the final asynq attempt (final). A non-final failure
// returns without draining because asynq will retry.
//
// Why detach: the decrement runs after the handler body, often as the very
// last thing a worker does. If it rode the task ctx, a cancelled ctx (graceful
// shutdown, a worker being preempted, or the task being interrupted under
// load) would make the DB UPDATE fail. That failure is only logged and
// swallowed, and because enrichment handlers frequently still return success
// (per-chunk LLM errors are tolerated, not propagated), asynq never retries —
// so the slot is never drained and the parent knowledge is stranded in
// "finalizing" forever with a non-zero counter. Detaching keeps the counter
// correct across cancellation; a bounded timeout guards against a wedged DB.
//
// source is a free-form tag (e.g. "question_batch[3]", "summary", "wiki")
// used to attribute a decrement failure to a specific subtask in logs.
func finalizeSubtaskDetached(
ctx context.Context,
repo interfaces.KnowledgeRepository,
knowledgeID, source string,
retErr error,
superseded, final bool,
) {
willDrain := repo != nil && knowledgeID != "" && !superseded && (retErr == nil || final)
if !willDrain {
return
}
dctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), finalizeSubtaskDetachedTimeout)
defer cancel()
if _, _, err := repo.FinalizeSubtask(dctx, knowledgeID); err != nil {
logger.Warnf(ctx, "finalize subtask decrement failed source=%s knowledge=%s err=%v",
source, knowledgeID, err)
}
}
// beginStage / endStage / failStage / skipStage are the by-name shims
// the pipeline uses so call sites don't have to thread *Span values
// through the existing function signatures. Each helper looks up the
@@ -262,6 +308,27 @@ func (s *knowledgeService) beginPostprocessSubspan(
return s.tracker().BeginSubSpan(ctx, parent, name, types.SpanKindSubSpan, input)
}
// beginQuestionBatchSubspan opens a per-batch question subspan under the
// "postprocess.question" grouping span created by the orchestrator, falling
// back to the postprocess stage when the group span isn't found (legacy
// in-flight tasks or a tracker that skipped it). Mirrors beginPostprocessSubspan
// but resolves the grouping parent first.
func (s *knowledgeService) beginQuestionBatchSubspan(
ctx context.Context, knowledgeID string, attempt int, name string, input types.JSONMap,
) *Span {
if attempt <= 0 || knowledgeID == "" || name == "" {
return nil
}
parent := s.tracker().LookupSpanByName(ctx, knowledgeID, attempt, postprocessQuestionGroupSpanName)
if parent == nil {
parent = s.tracker().LookupStage(ctx, knowledgeID, attempt, types.StagePostProcess)
}
if parent == nil {
return nil
}
return s.tracker().BeginSubSpan(ctx, parent, name, types.SpanKindSubSpan, input)
}
func (s *knowledgeService) endPostprocessSubspan(ctx context.Context, span *Span, output types.JSONMap) {
if span == nil {
return

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"sort"
"time"
"github.com/Tencent/WeKnora/internal/logger"
@@ -156,6 +157,32 @@ func (s *KnowledgePostProcessService) Handle(ctx context.Context, task *asynq.Ta
willSpawnQuestion := willSpawnSummary && kb.NeedsEmbeddingModel() &&
kb.QuestionGenerationConfig != nil && kb.QuestionGenerationConfig.Enabled
willSpawnWiki := kb.IndexingStrategy.WikiEnabled && len(textChunks) > 0
// Question generation now fans out one subtask per plain text chunk
// (mirroring the graph-extract per-chunk pattern) so each chunk's LLM
// call retries / cancels / traces independently. We only target
// ChunkTypeText here — OCR / Caption chunks were never fed to question
// generation in the legacy whole-knowledge loop, so excluding them
// keeps behavior identical. Sorted by StartAt so the per-chunk
// context (prev / next) matches the legacy ordering.
var questionChunks []*types.Chunk
if willSpawnQuestion {
for _, c := range textChunks {
if c.ChunkType == types.ChunkTypeText {
questionChunks = append(questionChunks, c)
}
}
sort.Slice(questionChunks, func(i, j int) bool {
return questionChunks[i].StartAt < questionChunks[j].StartAt
})
}
// Question generation is batched: one subtask per window of
// questionGenChunkBatchSize text chunks (not one per chunk), so a
// huge document doesn't spawn thousands of tiny tasks. The counter
// must match exactly how many batch tasks we enqueue below.
questionBatchCount := (len(questionChunks) + questionGenChunkBatchSize - 1) / questionGenChunkBatchSize
graphChunkCount := 0
if kb.IsGraphEnabled() {
graphChunkCount = len(textChunks)
@@ -164,14 +191,18 @@ func (s *KnowledgePostProcessService) Handle(ctx context.Context, task *asynq.Ta
if willSpawnSummary {
expectedSubtasks++
}
if willSpawnQuestion {
expectedSubtasks++
}
expectedSubtasks += questionBatchCount
if willSpawnWiki {
expectedSubtasks++
}
expectedSubtasks += graphChunkCount
// enteredFinalizing is set only when SetFinalizing actually seeded the
// counter (the promoted branch below). It gates the reconciliation that
// releases planned-but-not-enqueued slots so the row can leave
// "finalizing" — see the note where enqueue actuals are tallied.
enteredFinalizing := false
switch {
case knowledge.ParseStatus != types.ParseStatusProcessing:
// The row was already in some other state (deleting / cancelled /
@@ -216,6 +247,7 @@ func (s *KnowledgePostProcessService) Handle(ctx context.Context, task *asynq.Ta
payload.KnowledgeID, err)
}
if promoted {
enteredFinalizing = true
// Reflect summary status separately so the UI shows the
// summary as queued for users who already had it visible.
summaryStatus := types.SummaryStatusNone
@@ -250,42 +282,97 @@ func (s *KnowledgePostProcessService) Handle(ctx context.Context, task *asynq.Ta
// 4. Spawn Summary and Question Tasks
enqueuedSummary := false
enqueuedQuestion := false
enqueuedQuestionCount := 0
if willSpawnSummary {
s.enqueueSummaryGenerationTask(ctx, payload, attempt)
enqueuedSummary = true
enqueuedSummary = s.enqueueSummaryGenerationTask(ctx, payload, attempt)
if willSpawnQuestion {
enqueuedQuestion = s.enqueueQuestionGenerationIfEnabled(ctx, payload, kb, attempt)
// Create the postprocess.question grouping span up front so the
// per-batch subspans (enqueued just below, run later in their own
// workers) have a parent to nest under. It's begun and ended right
// here as a structural container — the batches extend past it,
// which the timeline renders with the wrapping outline bar.
if grp := s.tracker().BeginSubSpan(ctx, postSpan, postprocessQuestionGroupSpanName,
types.SpanKindSubSpan, types.JSONMap{
"batch_count": questionBatchCount,
"chunk_count": len(questionChunks),
"batch_size": questionGenChunkBatchSize,
}); grp != nil {
s.tracker().EndSpan(ctx, grp, types.JSONMap{
"batch_count": questionBatchCount,
"chunk_count": len(questionChunks),
})
}
enqueuedQuestionCount = s.enqueueQuestionGenerationTasks(ctx, payload, kb, attempt, questionChunks)
}
}
// 5. Spawn Graph RAG Tasks — only when graph indexing is enabled in IndexingStrategy
enqueuedGraph := false
enqueuedGraphCount := 0
if graphChunkCount > 0 {
logger.Infof(ctx, "[KnowledgePostProcess] Spawning Graph RAG extract tasks for %d text-like chunks", len(textChunks))
for i, chunk := range textChunks {
err := NewChunkExtractTask(ctx, s.taskEnqueuer, payload.TenantID, chunk.ID, kb.SummaryModelID,
ok, err := NewChunkExtractTask(ctx, s.taskEnqueuer, payload.TenantID, chunk.ID, kb.SummaryModelID,
payload.KnowledgeID, attempt, i)
if err != nil {
logger.Errorf(ctx, "[KnowledgePostProcess] Failed to create chunk extract task for %s: %v", chunk.ID, err)
}
if ok {
enqueuedGraphCount++
}
}
enqueuedGraph = true
}
// 6. Spawn Wiki Ingest Task if wiki indexing is enabled in IndexingStrategy
// 6. Spawn Wiki Ingest Task if wiki indexing is enabled in IndexingStrategy.
// Wiki is NOT reconciled here: it's a debounced KB-scoped batch whose
// worker calls FinalizeSubtask once when the per-knowledge op reaches a
// terminal state, so its single counted slot drains on its own path.
enqueuedWiki := false
if willSpawnWiki {
EnqueueWikiIngest(ctx, s.taskEnqueuer, s.pendingRepo, payload.TenantID, payload.KnowledgeBaseID, payload.KnowledgeID)
logger.Infof(ctx, "[KnowledgePostProcess] Enqueued wiki ingest task for %s", payload.KnowledgeID)
enqueuedWiki = true
}
// Reconcile the seeded counter against what was actually enqueued.
// summary/question/graph each own a counted slot that ONLY their own
// task drains; a slot whose task was never enqueued (graph with NEO4J
// off, a transient enqueue/marshal failure, a nil enqueuer) has no owner
// and would otherwise strand the row in "finalizing". Release exactly the
// shortfall — each release is a clamped decrement that promotes the row to
// "completed" if it brings the counter to zero. Wiki is excluded (see
// above). Safe against fast workers: shortfall slots have no draining
// task, so total drains == seeded count regardless of ordering.
if enteredFinalizing {
plannedOwned := questionBatchCount + graphChunkCount
if willSpawnSummary {
plannedOwned++
}
actualOwned := enqueuedQuestionCount + enqueuedGraphCount
if enqueuedSummary {
actualOwned++
}
if shortfall := plannedOwned - actualOwned; shortfall > 0 {
logger.Warnf(ctx,
"[KnowledgePostProcess] Releasing %d un-enqueued subtask slot(s) for %s (planned=%d actual=%d)",
shortfall, payload.KnowledgeID, plannedOwned, actualOwned)
for i := 0; i < shortfall; i++ {
if _, _, err := s.knowledgeRepo.FinalizeSubtask(ctx, payload.KnowledgeID); err != nil {
logger.Warnf(ctx, "[KnowledgePostProcess] Failed to release subtask slot for %s: %v",
payload.KnowledgeID, err)
break
}
}
}
}
postOutput := types.JSONMap{
"chunks_total": len(textChunks),
"enqueued_summary": enqueuedSummary,
"enqueued_question": enqueuedQuestion,
"enqueued_wiki": enqueuedWiki,
"enqueued_graph": enqueuedGraph,
"chunks_total": len(textChunks),
"enqueued_summary": enqueuedSummary,
"enqueued_question": enqueuedQuestionCount > 0,
"enqueued_question_count": enqueuedQuestionCount,
"enqueued_wiki": enqueuedWiki,
"enqueued_graph": enqueuedGraphCount > 0,
"enqueued_graph_count": enqueuedGraphCount,
}
s.tracker().EndSpan(ctx, postSpan, postOutput)
// Close the root span — the parse pipeline is done. Async
@@ -298,9 +385,12 @@ func (s *KnowledgePostProcessService) Handle(ctx context.Context, task *asynq.Ta
return nil
}
func (s *KnowledgePostProcessService) enqueueSummaryGenerationTask(ctx context.Context, payload types.KnowledgePostProcessPayload, attempt int) {
// enqueueSummaryGenerationTask enqueues the summary task. Returns true only
// when a task was actually placed on the queue, so the caller can release the
// seeded pending-subtask slot when enqueue is skipped or fails.
func (s *KnowledgePostProcessService) enqueueSummaryGenerationTask(ctx context.Context, payload types.KnowledgePostProcessPayload, attempt int) bool {
if s.taskEnqueuer == nil {
return
return false
}
taskPayload := types.SummaryGenerationPayload{
@@ -314,24 +404,54 @@ func (s *KnowledgePostProcessService) enqueueSummaryGenerationTask(ctx context.C
payloadBytes, err := json.Marshal(taskPayload)
if err != nil {
logger.Warnf(ctx, "[KnowledgePostProcess] Failed to marshal summary generation payload: %v", err)
return
return false
}
task := asynq.NewTask(types.TypeSummaryGeneration, payloadBytes, asynq.Queue("low"), asynq.MaxRetry(3))
if _, err := s.taskEnqueuer.Enqueue(task); err != nil {
logger.Warnf(ctx, "[KnowledgePostProcess] Failed to enqueue summary generation for %s: %v", payload.KnowledgeID, err)
} else {
logger.Infof(ctx, "[KnowledgePostProcess] Enqueued summary generation task for %s", payload.KnowledgeID)
return false
}
logger.Infof(ctx, "[KnowledgePostProcess] Enqueued summary generation task for %s", payload.KnowledgeID)
return true
}
func (s *KnowledgePostProcessService) enqueueQuestionGenerationIfEnabled(ctx context.Context, payload types.KnowledgePostProcessPayload, kb *types.KnowledgeBase, attempt int) bool {
if s.taskEnqueuer == nil {
return false
}
// questionGenChunkBatchSize is the number of text chunks handled by a single
// question-generation task. Batching keeps the task count bounded for very
// large documents (a 5k-chunk doc becomes ~250 tasks instead of 5k) while
// preserving per-batch retry / cancellation granularity and letting each task
// do one embedding BatchIndex over the whole batch.
const questionGenChunkBatchSize = 20
// postprocessQuestionGroupSpanName is the grouping span the per-batch
// question subspans (postprocess.question.batch[i]) nest under, so the trace
// viewer shows one "postprocess.question" node instead of dozens of siblings
// directly beneath the postprocess stage.
const postprocessQuestionGroupSpanName = "postprocess.question"
// enqueueQuestionGenerationTasks fans out one TypeQuestionGeneration task per
// batch of questionGenChunkBatchSize text chunks. Each task carries only chunk
// ids (+ the adjacent boundary ids for context) — never the chunk content — so
// the payload stays small and the worker reads fresh content at run time,
// matching the ExtractChunkPayload precedent.
//
// Returns the number of batch tasks successfully enqueued. NOTE: the batch
// count was already added to pending_subtasks_count by the caller, so an
// enqueue failure here leaves that slot undrained; like the graph-extract
// loop, this is intentionally left to the housekeeping finalizing sweep
// rather than special-cased, keeping the fan-out paths consistent.
func (s *KnowledgePostProcessService) enqueueQuestionGenerationTasks(
ctx context.Context,
payload types.KnowledgePostProcessPayload,
kb *types.KnowledgeBase,
attempt int,
questionChunks []*types.Chunk,
) int {
if s.taskEnqueuer == nil || len(questionChunks) == 0 {
return 0
}
if kb.QuestionGenerationConfig == nil || !kb.QuestionGenerationConfig.Enabled {
return false
return 0
}
questionCount := kb.QuestionGenerationConfig.QuestionCount
@@ -342,26 +462,54 @@ func (s *KnowledgePostProcessService) enqueueQuestionGenerationIfEnabled(ctx con
questionCount = 10
}
taskPayload := types.QuestionGenerationPayload{
TenantID: payload.TenantID,
KnowledgeBaseID: payload.KnowledgeBaseID,
KnowledgeID: payload.KnowledgeID,
QuestionCount: questionCount,
Language: payload.Language,
Attempt: attempt,
}
langfuse.InjectTracing(ctx, &taskPayload)
payloadBytes, err := json.Marshal(taskPayload)
if err != nil {
logger.Warnf(ctx, "[KnowledgePostProcess] Failed to marshal question generation payload: %v", err)
return false
}
total := len(questionChunks)
enqueued := 0
batchIndex := 0
for start := 0; start < total; start += questionGenChunkBatchSize {
end := start + questionGenChunkBatchSize
if end > total {
end = total
}
batch := questionChunks[start:end]
chunkIDs := make([]string, len(batch))
for i, c := range batch {
chunkIDs[i] = c.ID
}
task := asynq.NewTask(types.TypeQuestionGeneration, payloadBytes, asynq.Queue("low"), asynq.MaxRetry(3))
if _, err := s.taskEnqueuer.Enqueue(task); err != nil {
logger.Warnf(ctx, "[KnowledgePostProcess] Failed to enqueue question generation for %s: %v", payload.KnowledgeID, err)
return false
taskPayload := types.QuestionGenerationPayload{
TenantID: payload.TenantID,
KnowledgeBaseID: payload.KnowledgeBaseID,
KnowledgeID: payload.KnowledgeID,
QuestionCount: questionCount,
Language: payload.Language,
Attempt: attempt,
ChunkIDs: chunkIDs,
BatchIndex: batchIndex,
}
// Boundary context: the text chunk just before / after this window.
if start > 0 {
taskPayload.PrevChunkID = questionChunks[start-1].ID
}
if end < total {
taskPayload.NextChunkID = questionChunks[end].ID
}
batchIndex++
langfuse.InjectTracing(ctx, &taskPayload)
payloadBytes, err := json.Marshal(taskPayload)
if err != nil {
logger.Warnf(ctx, "[KnowledgePostProcess] Failed to marshal question generation payload for batch %d: %v", batchIndex-1, err)
continue
}
task := asynq.NewTask(types.TypeQuestionGeneration, payloadBytes, asynq.Queue("low"), asynq.MaxRetry(3))
if _, err := s.taskEnqueuer.Enqueue(task); err != nil {
logger.Warnf(ctx, "[KnowledgePostProcess] Failed to enqueue question generation batch %d for %s: %v", batchIndex-1, payload.KnowledgeID, err)
continue
}
enqueued++
}
logger.Infof(ctx, "[KnowledgePostProcess] Enqueued question generation task for %s (count=%d)", payload.KnowledgeID, questionCount)
return true
logger.Infof(ctx, "[KnowledgePostProcess] Enqueued %d question generation batch tasks (%d chunks, batch_size=%d) for %s (count=%d)",
enqueued, total, questionGenChunkBatchSize, payload.KnowledgeID, questionCount)
return enqueued
}

View File

@@ -931,11 +931,8 @@ func (s *knowledgeService) ProcessSummaryGeneration(ctx context.Context, t *asyn
// summaryErr would skip them and leave the row stuck in
// "finalizing". When we DO return an error asynq will retry, so
// we only drain on the final attempt.
if (retErr == nil || isFinalAsynqAttempt(ctx)) && payload.KnowledgeID != "" {
if _, _, ferr := s.repo.FinalizeSubtask(ctx, payload.KnowledgeID); ferr != nil {
logger.Warnf(ctx, "summary: FinalizeSubtask failed for %s: %v", payload.KnowledgeID, ferr)
}
}
finalizeSubtaskDetached(ctx, s.repo, payload.KnowledgeID, "summary",
retErr, false, isFinalAsynqAttempt(ctx))
if span == nil {
return
}
@@ -1186,13 +1183,32 @@ func (s *knowledgeService) ProcessSummaryGeneration(ctx context.Context, t *asyn
return nil
}
// ProcessQuestionGeneration handles async question generation task
// ProcessQuestionGeneration handles async question generation task. It
// dispatches between the batched fan-out path (current: one task per window of
// text chunks, payload.ChunkIDs set) and the legacy whole-knowledge path (kept
// for tasks enqueued before fan-out shipped, no chunk ids). A lone ChunkID
// (from an interim per-chunk build) is treated as a one-element batch.
func (s *knowledgeService) ProcessQuestionGeneration(ctx context.Context, t *asynq.Task) (retErr error) {
var payload types.QuestionGenerationPayload
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
logger.Errorf(ctx, "Failed to unmarshal question generation payload: %v", err)
return nil // Don't retry on unmarshal error
}
if len(payload.ChunkIDs) > 0 || payload.ChunkID != "" {
return s.processQuestionGenerationForChunks(ctx, t, payload)
}
return s.processQuestionGenerationForKnowledge(ctx, t, payload)
}
// processQuestionGenerationForKnowledge is the legacy whole-knowledge handler:
// it iterates every text chunk of the knowledge in one task. Retained for
// in-flight tasks queued before per-chunk fan-out; new enqueues always set
// payload.ChunkID and take the per-chunk path instead.
func (s *knowledgeService) processQuestionGenerationForKnowledge(ctx context.Context, t *asynq.Task, payload types.QuestionGenerationPayload) (retErr error) {
taskStartedAt := time.Now()
retryCount, _ := asynq.GetRetryCount(ctx)
maxRetry, _ := asynq.GetMaxRetry(ctx)
var payload types.QuestionGenerationPayload
exitStatus := "success"
totalChunks := 0
totalTextChunks := 0
@@ -1232,11 +1248,8 @@ func (s *knowledgeService) ProcessQuestionGeneration(ctx context.Context, t *asy
// final attempt. Runs AFTER the stats-log defer below — defers
// unwind LIFO, so this one declared first executes last.
defer func() {
if !superseded && (retErr == nil || isFinalAsynqAttempt(ctx)) && payload.KnowledgeID != "" {
if _, _, ferr := s.repo.FinalizeSubtask(ctx, payload.KnowledgeID); ferr != nil {
logger.Warnf(ctx, "question: FinalizeSubtask failed for %s: %v", payload.KnowledgeID, ferr)
}
}
finalizeSubtaskDetached(ctx, s.repo, payload.KnowledgeID, "question_legacy",
retErr, superseded, isFinalAsynqAttempt(ctx))
}()
defer func() {
logger.Infof(
@@ -1307,12 +1320,6 @@ func (s *knowledgeService) ProcessQuestionGeneration(ctx context.Context, t *asy
}
}()
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
exitStatus = "invalid_payload"
logger.Errorf(ctx, "Failed to unmarshal question generation payload: %v", err)
return nil // Don't retry on unmarshal error
}
logger.Infof(ctx, "Processing question generation for knowledge: %s", payload.KnowledgeID)
// A newer attempt has superseded this one: skip before opening the span
@@ -1552,6 +1559,317 @@ func (s *knowledgeService) ProcessQuestionGeneration(ctx context.Context, t *asy
return nil
}
// processQuestionGenerationForChunks generates questions for a batch (window)
// of text chunks. This is the batched fan-out path (one asynq task per
// questionGenChunkBatchSize chunks), aligned with the graph-extract
// TypeChunkExtract pattern: independent retry, per-batch cancellation, and a
// postprocess.question.batch[i] subspan. The payload carries only chunk ids
// (never content); content is read fresh here, and all questions for the batch
// are indexed in a single embedding BatchIndex call.
func (s *knowledgeService) processQuestionGenerationForChunks(ctx context.Context, t *asynq.Task, payload types.QuestionGenerationPayload) (retErr error) {
taskStartedAt := time.Now()
retryCount, _ := asynq.GetRetryCount(ctx)
maxRetry, _ := asynq.GetMaxRetry(ctx)
// Normalize the batch: prefer ChunkIDs, fall back to a lone ChunkID
// (interim per-chunk build) so those in-flight tasks still run.
batchIDs := payload.ChunkIDs
if len(batchIDs) == 0 && payload.ChunkID != "" {
batchIDs = []string{payload.ChunkID}
}
exitStatus := "success"
chunksInBatch := len(batchIDs)
chunksProcessed := 0
emptyChunks := 0
llmCallFailed := 0
generatedQuestionsTotal := 0
indexEntriesPrepared := 0
indexBatchSucceeded := false
var sampleQuestion string
var resolvedModelID string
var qSpan *Span
var qErr error
// Suppresses the FinalizeSubtask drain when a newer attempt superseded
// this run, so a stale task can't decrement the new attempt's counter.
superseded := false
ctx = context.WithValue(ctx, types.TenantIDContextKey, payload.TenantID)
if payload.Language != "" {
ctx = context.WithValue(ctx, types.LanguageContextKey, payload.Language)
}
// Drain the parent's enrichment counter on terminal exit. Keyed on the
// value RETURNED to asynq (retErr), not qErr: some branches record a
// span failure yet `return nil` (terminal, must drain). Declared first
// so it runs LAST (after the stats/span defer below).
defer func() {
finalizeSubtaskDetached(ctx, s.repo, payload.KnowledgeID,
fmt.Sprintf("question_batch[%d]", payload.BatchIndex),
retErr, superseded, isFinalAsynqAttempt(ctx))
}()
defer func() {
logger.Infof(ctx,
"Question generation (batch) stats: knowledge=%s batch=%d chunks(in_batch=%d,processed=%d,empty=%d) llm_failed=%d retry=%d/%d status=%s elapsed=%s generated_questions=%d index(entries=%d,succeeded=%v)",
payload.KnowledgeID, payload.BatchIndex, chunksInBatch, chunksProcessed, emptyChunks, llmCallFailed,
retryCount, maxRetry, exitStatus, time.Since(taskStartedAt).Round(time.Millisecond),
generatedQuestionsTotal, indexEntriesPrepared, indexBatchSucceeded,
)
if qSpan != nil {
out := types.JSONMap{
"status": exitStatus,
"batch_index": payload.BatchIndex,
"chunks_in_batch": chunksInBatch,
"chunks_processed": chunksProcessed,
"empty_chunks": emptyChunks,
"llm_failed": llmCallFailed,
"questions_generated": generatedQuestionsTotal,
"index_entries_prepared": indexEntriesPrepared,
"index_batch_succeeded": indexBatchSucceeded,
"retry": retryCount,
"max_retry": maxRetry,
}
if resolvedModelID != "" {
out["model_id"] = resolvedModelID
}
if sampleQuestion != "" {
out["sample_question"] = sampleQuestion
}
if exitStatus != "success" || qErr != nil {
msg := exitStatus
if qErr != nil {
msg = qErr.Error()
}
s.failPostprocessSubspan(ctx, qSpan, "QUESTION_FAILED", msg, qErr)
} else {
s.endPostprocessSubspan(ctx, qSpan, out)
}
}
}()
logger.Infof(ctx, "Processing question generation for knowledge=%s batch=%d chunks=%d",
payload.KnowledgeID, payload.BatchIndex, chunksInBatch)
if chunksInBatch == 0 {
exitStatus = "empty_batch"
return nil
}
// A newer attempt has superseded this one: skip before opening the span
// so we don't read stale chunks and don't drain the new attempt.
if attemptSuperseded(ctx, s.tracker(), payload.KnowledgeID, payload.Attempt) {
superseded = true
exitStatus = "superseded"
logger.Infof(ctx, "question: attempt %d superseded for %s, skipping stale enrichment",
payload.Attempt, payload.KnowledgeID)
return nil
}
qSpan = s.beginQuestionBatchSubspan(ctx, payload.KnowledgeID, payload.Attempt,
fmt.Sprintf("postprocess.question.batch[%d]", payload.BatchIndex),
types.JSONMap{
"batch_index": payload.BatchIndex,
"chunks": chunksInBatch,
"question_count": payload.QuestionCount,
"language": payload.Language,
})
if strings.TrimSpace(s.config.Conversation.GenerateQuestionsPrompt) == "" {
exitStatus = "prompt_not_configured"
logger.Errorf(ctx, "GenerateQuestionsPrompt is empty: configure conversation.generate_questions_prompt_id")
qErr = fmt.Errorf("generate questions prompt not configured")
return qErr
}
kb, err := s.kbService.GetKnowledgeBaseByID(ctx, payload.KnowledgeBaseID)
if err != nil {
exitStatus = "kb_not_found"
logger.Errorf(ctx, "Failed to get knowledge base: %v", err)
qErr = err
return nil
}
knowledge, err := s.repo.GetKnowledgeByID(ctx, payload.TenantID, payload.KnowledgeID)
if err != nil {
exitStatus = "knowledge_not_found"
logger.Errorf(ctx, "Failed to get knowledge: %v", err)
qErr = err
return nil
}
// Short-circuit when the user cancelled parsing or the row is being
// deleted — batched fan-out means we get this check for free on every
// batch, so a cancel stops burning LLM quota on the remaining batches.
if knowledge != nil {
switch knowledge.ParseStatus {
case types.ParseStatusCancelled, types.ParseStatusDeleting:
exitStatus = "knowledge_" + knowledge.ParseStatus
logger.Infof(ctx, "Question generation: knowledge aborted (%s), skipping batch %d",
knowledge.ParseStatus, payload.BatchIndex)
return nil
}
}
chatModel, err := s.modelService.GetChatModel(ctx, kb.SummaryModelID)
if err != nil {
exitStatus = "get_chat_model_failed"
logger.Errorf(ctx, "Failed to get chat model: %v", err)
return fmt.Errorf("failed to get chat model: %w", err)
}
resolvedModelID = kb.SummaryModelID
embeddingModel, err := s.modelService.GetEmbeddingModel(ctx, kb.EmbeddingModelID)
if err != nil {
exitStatus = "get_embedding_model_failed"
logger.Errorf(ctx, "Failed to get embedding model: %v", err)
return fmt.Errorf("failed to get embedding model: %w", err)
}
tenantInfo, err := s.tenantRepo.GetTenantByID(ctx, payload.TenantID)
if err != nil {
exitStatus = "get_tenant_failed"
logger.Errorf(ctx, "Failed to get tenant info: %v", err)
return fmt.Errorf("failed to get tenant info: %w", err)
}
ctx = context.WithValue(ctx, types.TenantInfoContextKey, tenantInfo)
retrieveEngine, err := retriever.CreateRetrieveEngineForKB(
ctx, s.retrieveEngine, s.ownership, tenantInfo.ID, kb.VectorStoreID)
if err != nil {
exitStatus = "init_retrieve_engine_failed"
logger.Errorf(ctx, "Failed to init retrieve engine: %v", err)
return fmt.Errorf("failed to init retrieve engine: %w", err)
}
questionCount := payload.QuestionCount
if questionCount <= 0 {
questionCount = 3
}
if questionCount > 10 {
questionCount = 10
}
// Fetch the batch chunks (in payload order) plus the two boundary
// neighbors so we can rebuild the same surrounding context the legacy
// loop used, all enriched with image OCR / caption info. A vanished
// chunk degrades gracefully (skipped / empty context).
getChunk := func(id string) *types.Chunk {
if id == "" {
return nil
}
c, gerr := s.chunkRepo.GetChunkByID(ctx, payload.TenantID, id)
if gerr != nil {
return nil
}
return c
}
batchChunks := make([]*types.Chunk, len(batchIDs))
for i, id := range batchIDs {
batchChunks[i] = getChunk(id)
}
prevChunk := getChunk(payload.PrevChunkID)
nextChunk := getChunk(payload.NextChunkID)
infoIDs := make([]string, 0, len(batchIDs)+2)
infoIDs = append(infoIDs, batchIDs...)
if payload.PrevChunkID != "" {
infoIDs = append(infoIDs, payload.PrevChunkID)
}
if payload.NextChunkID != "" {
infoIDs = append(infoIDs, payload.NextChunkID)
}
imageInfoMap := searchutil.CollectImageInfoByChunkIDs(ctx, s.chunkRepo, payload.TenantID, infoIDs)
enrich := func(c *types.Chunk) string {
if c == nil {
return ""
}
if info, ok := imageInfoMap[c.ID]; ok && info != "" {
return searchutil.EnrichContentWithImageInfo(c.Content, info)
}
return c.Content
}
// neighborContent returns the context content for position i within the
// batch: the in-batch neighbor when present, else the boundary chunk.
prevContentAt := func(i int) string {
if i > 0 {
return enrich(batchChunks[i-1])
}
return enrich(prevChunk)
}
nextContentAt := func(i int) string {
if i < len(batchChunks)-1 {
return enrich(batchChunks[i+1])
}
return enrich(nextChunk)
}
var indexInfoList []*types.IndexInfo
for i, chunk := range batchChunks {
if chunk == nil || strings.TrimSpace(chunk.Content) == "" {
emptyChunks++
continue
}
questions, gerr := s.generateQuestionsWithContext(
ctx, chatModel, enrich(chunk), prevContentAt(i), nextContentAt(i), knowledge.Title, questionCount)
if gerr != nil {
llmCallFailed++
logger.Warnf(ctx, "Failed to generate questions for chunk %s: %v", chunk.ID, gerr)
continue
}
if len(questions) == 0 {
continue
}
chunksProcessed++
generatedQuestionsTotal += len(questions)
if sampleQuestion == "" {
sampleQuestion = previewText(questions[0], 200)
}
generatedQuestions := make([]types.GeneratedQuestion, len(questions))
for j, question := range questions {
generatedQuestions[j] = types.GeneratedQuestion{
ID: fmt.Sprintf("q%d", time.Now().UnixNano()+int64(j)),
Question: question,
}
}
meta := &types.DocumentChunkMetadata{GeneratedQuestions: generatedQuestions}
if err := chunk.SetDocumentMetadata(meta); err != nil {
logger.Warnf(ctx, "Failed to set document metadata for chunk %s: %v", chunk.ID, err)
continue
}
if err := s.chunkService.UpdateChunk(ctx, chunk); err != nil {
logger.Warnf(ctx, "Failed to update chunk %s: %v", chunk.ID, err)
continue
}
for _, gq := range generatedQuestions {
indexInfoList = append(indexInfoList, &types.IndexInfo{
Content: gq.Question,
SourceID: fmt.Sprintf("%s-%s", chunk.ID, gq.ID),
SourceType: types.ChunkSourceType,
ChunkID: chunk.ID,
KnowledgeID: knowledge.ID,
KnowledgeBaseID: knowledge.KnowledgeBaseID,
IsEnabled: true,
})
}
}
indexEntriesPrepared = len(indexInfoList)
if len(indexInfoList) > 0 {
if err := retrieveEngine.BatchIndex(ctx, embeddingModel, indexInfoList); err != nil {
exitStatus = "index_questions_failed"
qErr = err
logger.Errorf(ctx, "Failed to index generated questions for batch %d: %v", payload.BatchIndex, err)
return fmt.Errorf("failed to index questions: %w", err)
}
indexBatchSucceeded = true
logger.Infof(ctx, "Indexed %d generated questions for knowledge=%s batch=%d",
len(indexInfoList), payload.KnowledgeID, payload.BatchIndex)
}
return nil
}
// generateQuestionsWithContext generates questions for a chunk with surrounding context
func (s *knowledgeService) generateQuestionsWithContext(ctx context.Context,
chatModel chat.Chat, content, prevContent, nextContent, docName string, questionCount int,
@@ -1680,13 +1998,20 @@ func (s *knowledgeService) ReparseKnowledge(ctx context.Context, knowledgeID str
existing.EmbeddingModelID = kb.EmbeddingModelID
// Reset the enrichment counter so a leftover value from a
// previous attempt (e.g. cancelled before all subtasks decremented)
// cannot block the new finalizing transition later.
// cannot block the new finalizing transition later. This must be
// an explicit column write: UpdateKnowledge (full-row Save) omits
// pending_subtasks_count, so the struct assignment alone would not
// persist.
existing.PendingSubtasksCount = 0
if err := s.repo.UpdateKnowledge(ctx, existing); err != nil {
logger.Errorf(ctx, "Failed to update knowledge status before reparse: %v", err)
return nil, err
}
if err := s.repo.UpdateKnowledgeColumn(ctx, existing.ID, "pending_subtasks_count", 0); err != nil {
logger.Errorf(ctx, "Failed to reset pending_subtasks_count before reparse: %v", err)
return nil, err
}
if err := s.enqueueManualProcessing(ctx, existing, meta.Content, true); err != nil {
logger.Errorf(ctx, "Failed to enqueue manual reparse task: %v", err)
@@ -1713,13 +2038,20 @@ func (s *knowledgeService) ReparseKnowledge(ctx context.Context, knowledgeID str
existing.ProcessedAt = nil
existing.EmbeddingModelID = kb.EmbeddingModelID
// Reset the enrichment counter so a leftover value from a previous
// attempt cannot block the new finalizing transition later.
// attempt cannot block the new finalizing transition later. This must
// be an explicit column write: UpdateKnowledge (full-row Save) omits
// pending_subtasks_count, so the struct assignment alone would not
// persist.
existing.PendingSubtasksCount = 0
if err := s.repo.UpdateKnowledge(ctx, existing); err != nil {
logger.Errorf(ctx, "Failed to update knowledge status before reparse: %v", err)
return nil, err
}
if err := s.repo.UpdateKnowledgeColumn(ctx, existing.ID, "pending_subtasks_count", 0); err != nil {
logger.Errorf(ctx, "Failed to reset pending_subtasks_count before reparse: %v", err)
return nil, err
}
// Step 3: Trigger async re-parsing based on knowledge type
logger.Infof(ctx, "Knowledge status updated, scheduling async reparse, ID: %s, Type: %s", existing.ID, existing.Type)

View File

@@ -94,6 +94,13 @@ type SpanTracker interface {
// created by the upstream pipeline.
LookupStage(ctx context.Context, knowledgeID string, attempt int, stage string) *Span
// LookupSpanByName returns the first span of any kind matching name
// for (knowledgeID, attempt) — the cross-process bridge that lets a
// fan-out worker (e.g. a question-generation batch) attach its subspan
// under a grouping span created earlier by the orchestrator. Returns
// nil when no such span exists (caller should fall back to the stage).
LookupSpanByName(ctx context.Context, knowledgeID string, attempt int, name string) *Span
// FinalizeAttempt closes the root span for (knowledgeID, attempt)
// with the given terminal status (done | failed). Idempotent:
// re-closing an already-terminal root is a no-op so callers from
@@ -547,6 +554,38 @@ func (t *spanTracker) LookupStage(ctx context.Context, knowledgeID string, attem
return nil
}
func (t *spanTracker) LookupSpanByName(ctx context.Context, knowledgeID string, attempt int, name string) *Span {
if name == "" || knowledgeID == "" || attempt <= 0 {
return nil
}
rows, err := t.repo.ListByAttempt(ctx, knowledgeID, attempt)
if err != nil {
logger.Warnf(ctx, "[SpanTracker] LookupSpanByName list failed kid=%s attempt=%d: %v",
knowledgeID, attempt, err)
return nil
}
for i := range rows {
r := rows[i]
if r.Name != name {
continue
}
started := time.Time{}
if r.StartedAt != nil {
started = *r.StartedAt
}
return &Span{
KnowledgeID: r.KnowledgeID,
Attempt: r.Attempt,
SpanID: r.SpanID,
ParentSpanID: r.ParentSpanID,
Name: r.Name,
Kind: r.Kind,
StartedAt: started,
}
}
return nil
}
// cascadeDependentStages flips downstream STAGE rows to "cancelled" using
// types.StageDependencies. Without this, a Chunking failure leaves
// Embedding / Multimodal as "pending" forever, even though they cannot
@@ -794,6 +833,9 @@ func (noopSpanTracker) EndSpan(_ context.Context, _ *Span, _ types.JSONMap)
func (noopSpanTracker) FailSpan(_ context.Context, _ *Span, _, _ string, _ error) {}
func (noopSpanTracker) SkipSpan(_ context.Context, _ *Span, _ string) {}
func (noopSpanTracker) LookupStage(_ context.Context, _ string, _ int, _ string) *Span { return nil }
func (noopSpanTracker) LookupSpanByName(_ context.Context, _ string, _ int, _ string) *Span {
return nil
}
func (noopSpanTracker) FinalizeAttempt(_ context.Context, _ string, _ int, _ string, _ types.JSONMap, _, _ string) {
}
func (noopSpanTracker) AbortAttempt(_ context.Context, _ string, _ int, _, _, _ string) {}

View File

@@ -521,12 +521,11 @@ func (s *wikiIngestService) trimPendingList(ctx context.Context, ids []int64) {
// the promote (parse_status = finalizing AND count = 0), so an op enqueued
// before this accounting shipped is a harmless no-op.
func (s *wikiIngestService) finalizeWikiSubtask(ctx context.Context, knowledgeID string) {
if s.knowledgeRepo == nil || knowledgeID == "" {
return
}
if _, _, err := s.knowledgeRepo.FinalizeSubtask(ctx, knowledgeID); err != nil {
logger.Warnf(ctx, "wiki ingest: FinalizeSubtask failed for %s: %v", knowledgeID, err)
}
// Wiki is only finalized when its op reaches a terminal state, so this is
// always an intended drain (retErr=nil, final=true). Detached context: the
// wiki batch worker may be mid-shutdown or have a cancelled ctx when this
// runs; a swallowed failure would strand the parent in "finalizing".
finalizeSubtaskDetached(ctx, s.knowledgeRepo, knowledgeID, "wiki", nil, false, true)
}
// requeueFailedOps records in-batch failures.

View File

@@ -93,6 +93,33 @@ type QuestionGenerationPayload struct {
// tasks queued before this field shipped, or callers without a
// tracker).
Attempt int `json:"attempt,omitempty"`
// ChunkIDs switches the handler into batched fan-out mode: the task
// generates questions for this ordered window of text chunks only.
// Batching (rather than one task per chunk) keeps the task count
// bounded for very large documents, while still giving each batch
// independent retry / cancellation / tracing and letting the worker
// do a single embedding BatchIndex per batch. Empty means the legacy
// whole-knowledge mode (kept for in-flight tasks queued before fan-out
// shipped), where the handler iterates all text chunks itself.
// Following the ExtractChunkPayload precedent, we carry only chunk ids
// (not their content) so the payload stays small and the worker reads
// fresh content at run time.
ChunkIDs []string `json:"chunk_ids,omitempty"`
// ChunkID is the single-chunk variant of ChunkIDs, retained only so
// tasks enqueued by an interim per-chunk build still run (treated as a
// one-element batch). New enqueues use ChunkIDs.
ChunkID string `json:"chunk_id,omitempty"`
// BatchIndex is the 0-based ordinal of this batch inside the parent
// knowledge's text-chunk set, used as the subspan name suffix
// ("postprocess.question.batch[3]") so the timeline preserves order.
BatchIndex int `json:"batch_index,omitempty"`
// PrevChunkID / NextChunkID are the text chunks (by StartAt) just
// outside this batch window, computed at enqueue time so the worker can
// rebuild the same surrounding context the legacy whole-knowledge loop
// used at the batch boundaries, without re-listing every chunk of the
// knowledge. Empty when the batch is at a document boundary.
PrevChunkID string `json:"prev_chunk_id,omitempty"`
NextChunkID string `json:"next_chunk_id,omitempty"`
}
// SummaryGenerationPayload represents the summary generation task payload