refactor(knowledge): replace flat stage table with langfuse-style span tree

Addresses review feedback that the PR② design had four shortcomings:

  1. The pipeline is a DAG, not a sequence — Embedding and Multimodal
     are independent of each other, both downstream of Chunking, both
     upstream of PostProcess. The flat (knowledge_id, stage) table
     couldn't represent that, so a Chunking failure left dependents
     stranded as "pending" forever instead of being marked as
     impossible-to-run.

  2. No history across attempts. A reparse erased the previous run's
     status before the new run started, leaving operators with no way
     to investigate "why did this fail twice?".

  3. Stages had only status + duration. Operators want to know how big
     the work was — pages parsed, chunks created, tokens embedded, VLM
     calls made — to distinguish "slow because the file is huge" from
     "slow because the docreader is wedged".

  4. Multimodal fans out N image tasks; Embedding fans out M batches;
     PostProcess fans out into Summary/Question/Wiki/Graph. Each unit
     is interesting on its own (Langfuse already captures this for
     LLM calls). The flat model couldn't express it.

The redesign mirrors Langfuse's trace/span/generation hierarchy:

  * Migration 000053 supersedes 000052: knowledge_processing_spans
    table with (knowledge_id, attempt, span_id) primary key, plus
    parent_span_id, kind ∈ {root, stage, subspan, generation},
    status ∈ {pending,running,done,failed,skipped,cancelled}, and
    JSONB input/output/metadata fields.

  * SpanTracker (replacing StageTracker) exposes OpenAttempt /
    BeginStage / BeginSubSpan / EndSpan / FailSpan / SkipSpan /
    LookupStage. Cross-process workers (image_multimodal) get the
    parent's attempt + span via payload + LookupStage so subspans
    attach correctly.

  * StageDependencies declares the DAG; FailSpan now cascades —
    descendants of the failed span and dependent stages are flipped
    to "cancelled" with a UPSTREAM_FAILED code. The UI sees a clear
    blast radius instead of orphan spinners.

  * Reparse now calls OpenAttempt up front so the timeline reflects
    "new attempt, all pending" instead of letting the previous run's
    status linger until the worker picks up the task.

  * Image_multimodal records each image as a generation subspan with
    its own success/failure on the parent attempt's multimodal stage.
    The finalize-on-last-attempt counter logic is preserved unchanged.

  * GET /api/v1/knowledge/:id/spans (also kept /stages alias) returns
    a tree shape with synthesized pending placeholders so the
    frontend always renders five timeline segments. ?attempt=N
    enables history navigation.
This commit is contained in:
wizardchen
2026-05-27 14:05:47 +08:00
committed by lyingbug
parent 04f56f9cda
commit c9941a6688
17 changed files with 1369 additions and 438 deletions

View File

@@ -0,0 +1,175 @@
package repository
import (
"context"
"errors"
"github.com/Tencent/WeKnora/internal/types"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
// KnowledgeSpanRepository persists the per-attempt span tree used by the
// processing pipeline. Operations are deliberately narrow:
//
// - Upsert covers Begin/End/Fail/Skip — every state transition routes
// through the same write so the row stays internally consistent.
// - NextAttempt allocates a new attempt for re-parses without touching
// historical rows. Old attempts stay queryable for post-mortem.
// - ListByAttempt is the only read path; the handler builds the tree
// in memory rather than recursing through the DB.
type KnowledgeSpanRepository interface {
Upsert(ctx context.Context, row *types.KnowledgeProcessingSpan) error
NextAttempt(ctx context.Context, knowledgeID string) (int, error)
LatestAttempt(ctx context.Context, knowledgeID string) (int, error)
ListByAttempt(ctx context.Context, knowledgeID string, attempt int) ([]types.KnowledgeProcessingSpan, error)
GetSpan(ctx context.Context, knowledgeID string, attempt int, spanID string) (*types.KnowledgeProcessingSpan, error)
// CancelDescendants marks every descendant of a parent span as
// "cancelled" with the given reason. Used by the tracker to
// cascade an upstream failure across a stage's downstream subtree
// without iterating in Go memory.
CancelDescendants(ctx context.Context, knowledgeID string, attempt int, parentSpanID, reason string) (int64, error)
}
type knowledgeSpanRepository struct {
db *gorm.DB
}
// NewKnowledgeSpanRepository wires the GORM-backed implementation.
func NewKnowledgeSpanRepository(db *gorm.DB) KnowledgeSpanRepository {
return &knowledgeSpanRepository{db: db}
}
func (r *knowledgeSpanRepository) Upsert(ctx context.Context, row *types.KnowledgeProcessingSpan) error {
if row == nil || row.KnowledgeID == "" || row.SpanID == "" {
return errors.New("knowledgeSpanRepository.Upsert: knowledge_id and span_id required")
}
if row.Attempt == 0 {
row.Attempt = 1
}
// We let GORM populate created_at/updated_at via the autoCreate /
// autoUpdate tags. ON CONFLICT updates only the fields that may
// transition between calls — name/kind/parent are immutable once
// set so we don't list them in DoUpdates (saves a few bytes per
// write, and any mismatch indicates a programming error).
return r.db.WithContext(ctx).Clauses(clause.OnConflict{
Columns: []clause.Column{
{Name: "knowledge_id"},
{Name: "attempt"},
{Name: "span_id"},
},
DoUpdates: clause.AssignmentColumns([]string{
"status",
"input",
"output",
"metadata",
"error_code",
"error_message",
"error_detail",
"started_at",
"finished_at",
"duration_ms",
"updated_at",
}),
}).Create(row).Error
}
func (r *knowledgeSpanRepository) NextAttempt(ctx context.Context, knowledgeID string) (int, error) {
var max int
err := r.db.WithContext(ctx).Model(&types.KnowledgeProcessingSpan{}).
Where("knowledge_id = ?", knowledgeID).
Select("COALESCE(MAX(attempt), 0)").
Row().Scan(&max)
if err != nil {
return 0, err
}
return max + 1, nil
}
func (r *knowledgeSpanRepository) LatestAttempt(ctx context.Context, knowledgeID string) (int, error) {
var max int
err := r.db.WithContext(ctx).Model(&types.KnowledgeProcessingSpan{}).
Where("knowledge_id = ?", knowledgeID).
Select("COALESCE(MAX(attempt), 0)").
Row().Scan(&max)
return max, err
}
func (r *knowledgeSpanRepository) ListByAttempt(ctx context.Context, knowledgeID string, attempt int) ([]types.KnowledgeProcessingSpan, error) {
if knowledgeID == "" {
return nil, nil
}
var rows []types.KnowledgeProcessingSpan
q := r.db.WithContext(ctx).Where("knowledge_id = ?", knowledgeID)
if attempt > 0 {
q = q.Where("attempt = ?", attempt)
}
// id ASC keeps the natural insertion order — useful for stable
// rendering of fan-out subspans (e.g. multimodal.image[0..N] in
// the order they were enqueued).
err := q.Order("id ASC").Find(&rows).Error
return rows, err
}
func (r *knowledgeSpanRepository) GetSpan(ctx context.Context, knowledgeID string, attempt int, spanID string) (*types.KnowledgeProcessingSpan, error) {
var row types.KnowledgeProcessingSpan
err := r.db.WithContext(ctx).
Where("knowledge_id = ? AND attempt = ? AND span_id = ?", knowledgeID, attempt, spanID).
Take(&row).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, err
}
return &row, nil
}
// CancelDescendants performs an iterative SQL walk: each level we update
// every row whose parent_span_id is in the previous level's span_id set,
// flipping pending/running rows to cancelled. We bail when a level adds
// zero rows (fixed point reached) or after a generous depth bound.
//
// Postgres-specific WITH RECURSIVE would be denser but harder to test on
// the SQLite Lite backend. The iterative path stays portable.
func (r *knowledgeSpanRepository) CancelDescendants(ctx context.Context, knowledgeID string, attempt int, parentSpanID, reason string) (int64, error) {
frontier := []string{parentSpanID}
var totalAffected int64
for depth := 0; depth < 16 && len(frontier) > 0; depth++ {
var nextFrontier []string
// Find children of every span currently on the frontier
// that are still in a non-terminal state — terminal rows
// (done/failed/skipped/cancelled) are left as-is so the UI
// can still see their original outcome.
var children []types.KnowledgeProcessingSpan
err := r.db.WithContext(ctx).
Where("knowledge_id = ? AND attempt = ? AND parent_span_id IN ? AND status IN ?",
knowledgeID, attempt, frontier,
[]string{types.SpanStatusPending, types.SpanStatusRunning}).
Find(&children).Error
if err != nil {
return totalAffected, err
}
if len(children) == 0 {
break
}
ids := make([]string, 0, len(children))
for _, c := range children {
ids = append(ids, c.SpanID)
nextFrontier = append(nextFrontier, c.SpanID)
}
res := r.db.WithContext(ctx).Model(&types.KnowledgeProcessingSpan{}).
Where("knowledge_id = ? AND attempt = ? AND span_id IN ?", knowledgeID, attempt, ids).
Updates(map[string]any{
"status": types.SpanStatusCancelled,
"error_code": "UPSTREAM_FAILED",
"error_message": reason,
})
if res.Error != nil {
return totalAffected, res.Error
}
totalAffected += res.RowsAffected
frontier = nextFrontier
}
return totalAffected, nil
}

View File

@@ -1,110 +0,0 @@
package repository
import (
"context"
"time"
"github.com/Tencent/WeKnora/internal/types"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
// KnowledgeStageRepository persists per-stage progress for a Knowledge.
//
// The repo is intentionally narrow: only the operations the tracker
// service needs (upsert, list, mark all skipped). We don't expose a
// generic Update — every state transition goes through one of the
// purpose-built methods so the audit trail is consistent.
type KnowledgeStageRepository interface {
// Upsert inserts or updates a stage row by (knowledge_id, stage).
// On conflict, ALL provided columns are overwritten and the
// attempt counter is incremented. The caller fills in the row
// fields it wants to set and leaves the rest at their zero values.
Upsert(ctx context.Context, row *types.KnowledgeProcessingStage) error
// ListByKnowledge returns every stage row for the given knowledge,
// ordered by the canonical stage list. Missing stages are NOT
// synthesized here — the API layer decides whether to fill them
// with "pending" placeholders.
ListByKnowledge(ctx context.Context, knowledgeID string) ([]types.KnowledgeProcessingStage, error)
// MarkPendingAll resets every stage row for a knowledge to
// "pending" with a fresh attempt number. Called at the start of
// re-parsing so a previous run's "failed" badge doesn't linger
// while the retry is in flight.
MarkPendingAll(ctx context.Context, knowledgeID string) error
}
type knowledgeStageRepository struct {
db *gorm.DB
}
// NewKnowledgeStageRepository wires the GORM-backed implementation.
func NewKnowledgeStageRepository(db *gorm.DB) KnowledgeStageRepository {
return &knowledgeStageRepository{db: db}
}
func (r *knowledgeStageRepository) Upsert(ctx context.Context, row *types.KnowledgeProcessingStage) error {
if row == nil || row.KnowledgeID == "" || row.Stage == "" {
return nil
}
now := time.Now()
row.UpdatedAt = now
if row.CreatedAt.IsZero() {
row.CreatedAt = now
}
if row.Attempt == 0 {
row.Attempt = 1
}
// ON CONFLICT (knowledge_id, stage) bumps attempt and overwrites
// state. We must also explicitly null-out finished_at / duration_ms
// when the new state is "running" — otherwise stale completion
// timestamps from a previous attempt leak into the timeline.
return r.db.WithContext(ctx).Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "knowledge_id"}, {Name: "stage"}},
DoUpdates: clause.Assignments(map[string]interface{}{
"status": row.Status,
"started_at": row.StartedAt,
"finished_at": row.FinishedAt,
"duration_ms": row.DurationMs,
"error_code": row.ErrorCode,
"error_message": row.ErrorMessage,
"error_detail": row.ErrorDetail,
"attempt": gorm.Expr("knowledge_processing_stages.attempt + ?", 1),
"updated_at": now,
}),
}).Create(row).Error
}
func (r *knowledgeStageRepository) ListByKnowledge(ctx context.Context, knowledgeID string) ([]types.KnowledgeProcessingStage, error) {
if knowledgeID == "" {
return nil, nil
}
var rows []types.KnowledgeProcessingStage
err := r.db.WithContext(ctx).
Where("knowledge_id = ?", knowledgeID).
Find(&rows).Error
return rows, err
}
func (r *knowledgeStageRepository) MarkPendingAll(ctx context.Context, knowledgeID string) error {
if knowledgeID == "" {
return nil
}
now := time.Now()
// Reset every existing stage. We don't pre-create rows for stages
// that haven't been touched yet — those will be added lazily by
// the tracker on first Begin call. This keeps the table small for
// knowledge that gets created but never parsed.
return r.db.WithContext(ctx).Model(&types.KnowledgeProcessingStage{}).
Where("knowledge_id = ?", knowledgeID).
Updates(map[string]interface{}{
"status": types.ProcessingStagePending,
"started_at": nil,
"finished_at": nil,
"duration_ms": 0,
"error_code": "",
"error_message": "",
"error_detail": "",
"attempt": gorm.Expr("attempt + ?", 1),
"updated_at": now,
}).Error
}

View File

@@ -70,6 +70,10 @@ type ImageMultimodalService struct {
// tenant's StorageEngineConfig.MinIO is empty). Mirrors the write-side
// fallback in knowledgeService.resolveFileService.
fileSvc interfaces.FileService
// spanTracker records this image's subspan under the parent attempt's
// multimodal stage. nil-safe — falls back to no-op via tracker().
spanTracker SpanTracker
}
func NewImageMultimodalService(
@@ -84,6 +88,7 @@ func NewImageMultimodalService(
taskEnqueuer interfaces.TaskEnqueuer,
redisClient *redis.Client,
fileSvc interfaces.FileService,
spanTracker SpanTracker,
) interfaces.TaskHandler {
return &ImageMultimodalService{
chunkService: chunkService,
@@ -97,9 +102,19 @@ func NewImageMultimodalService(
taskEnqueuer: taskEnqueuer,
redisClient: redisClient,
fileSvc: fileSvc,
spanTracker: spanTracker,
}
}
// tracker returns a usable SpanTracker — falls back to a no-op when the
// service was constructed without one.
func (s *ImageMultimodalService) tracker() SpanTracker {
if s.spanTracker == nil {
return noopSpanTracker{}
}
return s.spanTracker
}
// Handle implements asynq handler for TypeImageMultimodal.
func (s *ImageMultimodalService) Handle(ctx context.Context, task *asynq.Task) error {
var payload types.ImageMultimodalPayload
@@ -115,6 +130,26 @@ func (s *ImageMultimodalService) Handle(ctx context.Context, task *asynq.Task) e
ctx = context.WithValue(ctx, types.LanguageContextKey, payload.Language)
}
// Open a per-image subspan under the parent attempt's multimodal
// stage. If the parent stage row is missing (legacy in-flight
// task, or the upstream code shipped without span tracking), the
// tracker is a no-op so we silently fall back to the existing
// counter-based finalize semantics.
tracker := s.tracker()
var imgSpan *Span
if payload.Attempt > 0 {
parent := tracker.LookupStage(ctx, payload.KnowledgeID, payload.Attempt, types.StageMultimodal)
if parent != nil {
name := fmt.Sprintf("multimodal.image[%d]", payload.ImageIndex)
imgSpan = tracker.BeginSubSpan(ctx, parent, name, types.SpanKindGeneration, types.JSONMap{
"image_url": payload.ImageURL,
"image_source_type": payload.ImageSourceType,
"enable_ocr": payload.EnableOCR,
"enable_caption": payload.EnableCaption,
})
}
}
// finalize-once semantics: on success we always decrement the parent's
// pending counter. On failure we only decrement when this is the last
// asynq retry, so a permanently-failing single image cannot leave the
@@ -123,6 +158,22 @@ func (s *ImageMultimodalService) Handle(ctx context.Context, task *asynq.Task) e
// so we don't double-count and prematurely trigger post-process.
var handleErr error
defer func() {
// Finalize the image subspan with the actual outcome — not the
// finalize-counter outcome. The counter logic counts a "tried"
// image regardless of inner success; the span surface tells the
// UI whether THIS specific image worked.
if imgSpan != nil {
if handleErr == nil {
tracker.EndSpan(ctx, imgSpan, types.JSONMap{
"chunk_id": payload.ChunkID,
})
} else if isFinalAsynqAttempt(ctx) {
tracker.FailSpan(ctx, imgSpan,
"MULTIMODAL_VLM_FAILED",
handleErr.Error(),
handleErr)
}
}
if handleErr == nil || isFinalAsynqAttempt(ctx) {
s.checkAndFinalizeAllImages(ctx, payload)
} else {

View File

@@ -65,11 +65,11 @@ type knowledgeService struct {
wikiRepo interfaces.WikiPageRepository
wikiService interfaces.WikiPageService
// stageTracker records per-stage progress for the parsing pipeline.
// Best-effort: a nil tracker (e.g. test harness) is safely handled
// because every call site goes through the StageTracker interface
// which has a no-op fallback. See knowledge_stage_tracker.go.
stageTracker StageTracker
// spanTracker records the per-attempt span tree for the parsing
// pipeline. Best-effort: a nil tracker (test harness) is safely
// handled because the public surface is the SpanTracker interface,
// which has a no-op fallback. See knowledge_span_tracker.go.
spanTracker SpanTracker
}
const (
@@ -102,7 +102,7 @@ func NewKnowledgeService(
wikiRepo interfaces.WikiPageRepository,
wikiService interfaces.WikiPageService,
taskPendingRepo interfaces.TaskPendingOpsRepository,
stageTracker StageTracker,
spanTracker SpanTracker,
) (interfaces.KnowledgeService, error) {
return &knowledgeService{
config: config,
@@ -127,18 +127,96 @@ func NewKnowledgeService(
wikiRepo: wikiRepo,
wikiService: wikiService,
taskPendingRepo: taskPendingRepo,
stageTracker: stageTracker,
spanTracker: spanTracker,
}, nil
}
// stage returns a usable StageTracker — falls back to a no-op when the
// tracker returns a usable SpanTracker — falls back to a no-op when the
// service was constructed without one (test harness, lite mode w/o repo).
// All pipeline call sites go through this so they never need a nil check.
func (s *knowledgeService) stage() StageTracker {
if s.stageTracker == nil {
return noopStageTracker{}
func (s *knowledgeService) tracker() SpanTracker {
if s.spanTracker == nil {
return noopSpanTracker{}
}
return s.stageTracker
return s.spanTracker
}
// attemptCtxKey scopes the per-task attempt number to a single execution.
// Set once at the start of ProcessDocument / ProcessManualUpdate /
// KnowledgePostProcess so every nested tracker call within the same task
// can locate the right attempt without threading it through signatures.
type attemptCtxKey struct{}
// withAttempt returns a child ctx tagged with the given attempt number.
// Pass through every call site that may invoke the tracker.
func withAttempt(ctx context.Context, attempt int) context.Context {
if attempt <= 0 {
return ctx
}
return context.WithValue(ctx, attemptCtxKey{}, attempt)
}
// attemptFromCtx extracts the attempt number stored by withAttempt;
// returns 0 when missing (legacy paths or tests). Tracker call sites
// treat 0 as "skip recording" since we have no attempt to anchor under.
func attemptFromCtx(ctx context.Context) int {
if v, ok := ctx.Value(attemptCtxKey{}).(int); ok {
return v
}
return 0
}
// beginStage / endStage / failStage / skipStage are the by-name shims
// the pipeline uses so call sites don't have to thread *Span values
// through the existing function signatures. Each helper looks up the
// stage from (kid, attempt-from-ctx, stageName) at write time — costs
// one extra DB read per terminal transition (≤ a dozen per knowledge),
// which is dwarfed by the work the stages themselves do.
func (s *knowledgeService) beginStage(ctx context.Context, kid, name string, input types.JSONMap) {
a := attemptFromCtx(ctx)
if a <= 0 {
return
}
s.tracker().BeginStage(ctx, kid, a, name, input)
}
func (s *knowledgeService) endStage(ctx context.Context, kid, name string, output types.JSONMap) {
a := attemptFromCtx(ctx)
if a <= 0 {
return
}
span := s.tracker().LookupStage(ctx, kid, a, name)
if span == nil {
return
}
s.tracker().EndSpan(ctx, span, output)
}
func (s *knowledgeService) failStage(ctx context.Context, kid, name, code, msg string, err error) {
a := attemptFromCtx(ctx)
if a <= 0 {
return
}
span := s.tracker().LookupStage(ctx, kid, a, name)
if span == nil {
return
}
s.tracker().FailSpan(ctx, span, code, msg, err)
}
func (s *knowledgeService) skipStage(ctx context.Context, kid, name, reason string) {
a := attemptFromCtx(ctx)
if a <= 0 {
return
}
span := s.tracker().LookupStage(ctx, kid, a, name)
if span == nil {
// No begin recorded — synthesize a span row for skipped state.
// Use BeginStage with no input then SkipSpan to keep schema
// invariants (started_at / kind set).
span = s.tracker().BeginStage(ctx, kid, a, name, nil)
}
s.tracker().SkipSpan(ctx, span, reason)
}
// getParserEngineOverridesFromContext returns parser engine overrides from tenant in context (e.g. MinerU endpoint, API key).

View File

@@ -23,7 +23,7 @@ type KnowledgePostProcessService struct {
taskEnqueuer interfaces.TaskEnqueuer
pendingRepo interfaces.TaskPendingOpsRepository
redisClient *redis.Client
stageTracker StageTracker
spanTracker SpanTracker
}
func NewKnowledgePostProcessService(
@@ -33,7 +33,7 @@ func NewKnowledgePostProcessService(
taskEnqueuer interfaces.TaskEnqueuer,
pendingRepo interfaces.TaskPendingOpsRepository,
redisClient *redis.Client,
stageTracker StageTracker,
spanTracker SpanTracker,
) interfaces.TaskHandler {
return &KnowledgePostProcessService{
knowledgeRepo: knowledgeRepo,
@@ -42,15 +42,15 @@ func NewKnowledgePostProcessService(
taskEnqueuer: taskEnqueuer,
pendingRepo: pendingRepo,
redisClient: redisClient,
stageTracker: stageTracker,
spanTracker: spanTracker,
}
}
func (s *KnowledgePostProcessService) tracker() StageTracker {
if s.stageTracker == nil {
return noopStageTracker{}
func (s *KnowledgePostProcessService) tracker() SpanTracker {
if s.spanTracker == nil {
return noopSpanTracker{}
}
return s.stageTracker
return s.spanTracker
}
// Handle implements asynq handler for TypeKnowledgePostProcess.
@@ -67,14 +67,25 @@ func (s *KnowledgePostProcessService) Handle(ctx context.Context, task *asynq.Ta
ctx = context.WithValue(ctx, types.LanguageContextKey, payload.Language)
}
s.tracker().Begin(ctx, payload.KnowledgeID, types.ProcessingStagePostProcess)
// Resolve attempt: payload carries it from the upstream stage, but
// fall back to the latest known attempt for compatibility with
// in-flight tasks queued before this code shipped.
attempt := payload.Attempt
if attempt <= 0 {
attempt = s.tracker().LatestAttempt(ctx, payload.KnowledgeID)
}
// Track Multimodal completion: by the time we reach post-process,
// any per-image work has either finished or been counted by
// finalize-on-last-attempt (see image_multimodal.go). If the parent
// has reached this stage at all, multimodal effectively succeeded
// from the parent's perspective even if individual images failed.
s.tracker().Done(ctx, payload.KnowledgeID, types.ProcessingStageMultimodal)
// Close the multimodal stage span (parent enqueued it as "running"
// and we never see the per-image fan-in here other than by reaching
// post-process). If the parent skipped multimodal entirely, the
// stage row will already be in "skipped" state and EndSpan is a
// no-op for missing rows.
if mm := s.tracker().LookupStage(ctx, payload.KnowledgeID, attempt, types.StageMultimodal); mm != nil &&
mm.Kind == types.SpanKindStage {
s.tracker().EndSpan(ctx, mm, nil)
}
postSpan := s.tracker().BeginStage(ctx, payload.KnowledgeID, attempt, types.StagePostProcess, nil)
// 1. Fetch Knowledge and KB
knowledge, err := s.knowledgeRepo.GetKnowledgeByIDOnly(ctx, payload.KnowledgeID)
@@ -151,7 +162,9 @@ func (s *KnowledgePostProcessService) Handle(ctx context.Context, task *asynq.Ta
EnqueueWikiIngest(ctx, s.taskEnqueuer, s.pendingRepo, payload.TenantID, payload.KnowledgeBaseID, payload.KnowledgeID)
logger.Infof(ctx, "[KnowledgePostProcess] Enqueued wiki ingest task for %s", payload.KnowledgeID)
}
s.tracker().Done(ctx, payload.KnowledgeID, types.ProcessingStagePostProcess)
s.tracker().EndSpan(ctx, postSpan, types.JSONMap{
"chunks_total": len(textChunks),
})
return nil
}

View File

@@ -466,24 +466,24 @@ func (s *knowledgeService) processChunks(ctx context.Context,
// Chunks are needed for wiki generation, graph extraction, and summary generation
// even when vector/keyword indexing is disabled.
span.AddEvent("create chunks")
s.stage().Begin(ctx, knowledge.ID, types.ProcessingStageChunking)
s.beginStage(ctx, knowledge.ID, types.StageChunking, nil)
if err := s.chunkService.CreateChunks(ctx, insertChunks); err != nil {
knowledge.ParseStatus = types.ParseStatusFailed
knowledge.ErrorMessage = err.Error()
knowledge.UpdatedAt = time.Now()
s.repo.UpdateKnowledge(ctx, knowledge)
span.RecordError(err)
s.stage().Fail(ctx, knowledge.ID, types.ProcessingStageChunking,
s.failStage(ctx, knowledge.ID, types.StageChunking,
werrors.ErrCodeChunkingFailed, "create chunks failed", err)
return
}
s.stage().Done(ctx, knowledge.ID, types.ProcessingStageChunking)
s.endStage(ctx, knowledge.ID, types.StageChunking, nil)
// Create index information and perform vector indexing — only when vector/keyword is enabled.
// Chunks are ALWAYS saved to DB (above) because wiki and graph need them even without vector indexing.
var totalStorageSize int64
if kb.NeedsEmbeddingModel() && embeddingModel != nil {
s.stage().Begin(ctx, knowledge.ID, types.ProcessingStageEmbedding)
s.beginStage(ctx, knowledge.ID, types.StageEmbedding, nil)
// Create index information — only for child/flat chunks, NOT parent chunks.
// Parent chunks are stored for context retrieval but do not need vector embeddings.
// Prepend the document title to improve semantic alignment between
@@ -571,12 +571,12 @@ func (s *knowledgeService) processChunks(ctx context.Context,
if isLikelyRateLimitError(err) {
code = werrors.ErrCodeEmbeddingRateLimit
}
s.stage().Fail(ctx, knowledge.ID, types.ProcessingStageEmbedding,
s.failStage(ctx, knowledge.ID, types.StageEmbedding,
code, "batch index failed", err)
return
}
logger.GetLogger(ctx).Infof("processChunks batch index successfully, with %d index", len(indexInfoList))
s.stage().Done(ctx, knowledge.ID, types.ProcessingStageEmbedding)
s.endStage(ctx, knowledge.ID, types.StageEmbedding, nil)
// Final check before marking as completed - if deleted during processing, don't update status
if s.isKnowledgeDeleting(ctx, knowledge.TenantID, knowledge.ID) {
@@ -593,7 +593,7 @@ func (s *knowledgeService) processChunks(ctx context.Context,
}
} else {
logger.Infof(ctx, "Vector/keyword indexing disabled for KB %s, skipping BatchIndex", kb.ID)
s.stage().Skip(ctx, knowledge.ID, types.ProcessingStageEmbedding)
s.skipStage(ctx, knowledge.ID, types.StageEmbedding, "skipped")
}
// Check if this document has extracted images that will be processed asynchronously
@@ -617,10 +617,10 @@ func (s *knowledgeService) processChunks(ctx context.Context,
// Enqueue multimodal tasks for images (async, non-blocking)
if options.EnableMultimodel && len(options.StoredImages) > 0 {
s.stage().Begin(ctx, knowledge.ID, types.ProcessingStageMultimodal)
s.beginStage(ctx, knowledge.ID, types.StageMultimodal, nil)
s.enqueueImageMultimodalTasks(ctx, knowledge, kb, options.StoredImages, chunks, options.Metadata)
} else {
s.stage().Skip(ctx, knowledge.ID, types.ProcessingStageMultimodal)
s.skipStage(ctx, knowledge.ID, types.StageMultimodal, "skipped")
// If there are no multimodal tasks, enqueue the post process task immediately
lang, _ := types.LanguageFromContext(ctx)
postProcessPayload := types.KnowledgePostProcessPayload{
@@ -628,6 +628,7 @@ func (s *knowledgeService) processChunks(ctx context.Context,
KnowledgeID: knowledge.ID,
KnowledgeBaseID: knowledge.KnowledgeBaseID,
Language: lang,
Attempt: attemptFromCtx(ctx),
}
langfuse.InjectTracing(ctx, &postProcessPayload)
payloadBytes, err := json.Marshal(postProcessPayload)
@@ -1395,6 +1396,19 @@ func (s *knowledgeService) ReparseKnowledge(ctx context.Context, knowledgeID str
return nil, err
}
// Allocate a fresh span tree attempt up front. Doing this BEFORE
// the cleanup + enqueue means: (a) the UI immediately sees a new
// attempt with all five stages back to "pending" instead of the
// previous run's "failed" badge lingering; (b) the worker's
// fallback path won't double-allocate when payload.Attempt is
// already set on the queued task.
reparseAttempt := 0
if root, n, err := s.tracker().OpenAttempt(ctx, existing.ID, ""); err == nil && root != nil {
reparseAttempt = n
} else if err != nil {
logger.Warnf(ctx, "[Reparse] OpenAttempt failed for %s: %v (will fall back in worker)", existing.ID, err)
}
// Get knowledge base configuration
kb, err := s.kbService.GetKnowledgeBaseByID(ctx, existing.KnowledgeBaseID)
if err != nil {
@@ -1493,6 +1507,7 @@ func (s *knowledgeService) ReparseKnowledge(ctx context.Context, knowledgeID str
EnableQuestionGeneration: enableQuestionGeneration,
QuestionCount: questionCount,
Language: lang,
Attempt: reparseAttempt,
}
langfuse.InjectTracing(ctx, &taskPayload)
@@ -1546,6 +1561,7 @@ func (s *knowledgeService) ReparseKnowledge(ctx context.Context, knowledgeID str
EnableQuestionGeneration: enableQuestionGeneration,
QuestionCount: questionCount,
Language: lang,
Attempt: reparseAttempt,
}
langfuse.InjectTracing(ctx, &taskPayload)
@@ -1592,6 +1608,7 @@ func (s *knowledgeService) ReparseKnowledge(ctx context.Context, knowledgeID str
EnableQuestionGeneration: enableQuestionGeneration,
QuestionCount: questionCount,
Language: lang,
Attempt: reparseAttempt,
}
langfuse.InjectTracing(ctx, &taskPayload)
@@ -1997,6 +2014,20 @@ func (s *knowledgeService) ProcessDocument(ctx context.Context, t *asynq.Task) e
return nil
}
// Resolve the attempt for span tracking. The enqueue site sets
// payload.Attempt to a fresh number for the initial parse and to
// max+1 for each user-initiated reparse. Asynq retries within a
// single user action keep the same payload (so retries record
// onto the same attempt). For payloads predating this code we
// fall back to OpenAttempt.
attempt := payload.Attempt
if attempt <= 0 {
if root, n, err := s.tracker().OpenAttempt(ctx, knowledge.ID, payload.LangfuseTraceID); err == nil && root != nil {
attempt = n
}
}
ctx = withAttempt(ctx, attempt)
// 检查多模态配置(仅对文件导入)
if payload.FilePath != "" && !payload.EnableMultimodel && IsImageType(payload.FileType) {
logger.GetLogger(ctx).WithField("knowledge_id", knowledge.ID).
@@ -2305,7 +2336,7 @@ func (s *knowledgeService) convert(
// up — before that, the stage stays "pending" from the initial
// upload. Failure/skip transitions are emitted at the specific
// failure points below; success is emitted at the bottom.
s.stage().Begin(ctx, knowledge.ID, types.ProcessingStageDocReader)
s.beginStage(ctx, knowledge.ID, types.StageDocReader, nil)
isURL := payload.URL != ""
fileType := payload.FileType
overrides := s.getParserEngineOverridesFromContext(ctx)
@@ -2317,7 +2348,7 @@ func (s *knowledgeService) convert(
knowledge.ErrorMessage = "URL is not allowed for security reasons"
knowledge.UpdatedAt = time.Now()
s.repo.UpdateKnowledge(ctx, knowledge)
s.stage().Fail(ctx, knowledge.ID, types.ProcessingStageDocReader,
s.failStage(ctx, knowledge.ID, types.StageDocReader,
werrors.ErrCodeDocReaderParseFailed, "URL rejected for security reasons", err)
return nil, nil
}
@@ -2339,7 +2370,7 @@ func (s *knowledgeService) convert(
knowledge.ErrorMessage = "Document parsing service is not configured. Please use text/paragraph import or set DOCREADER_ADDR."
knowledge.UpdatedAt = time.Now()
s.repo.UpdateKnowledge(ctx, knowledge)
s.stage().Fail(ctx, knowledge.ID, types.ProcessingStageDocReader,
s.failStage(ctx, knowledge.ID, types.StageDocReader,
werrors.ErrCodeDocReaderUnavailable, knowledge.ErrorMessage, nil)
return nil, nil
}
@@ -2355,14 +2386,14 @@ func (s *knowledgeService) convert(
if !isURL {
fileReader, err := s.resolveFileServiceForPath(ctx, kb, payload.FilePath).GetFile(ctx, payload.FilePath)
if err != nil {
s.stage().Fail(ctx, knowledge.ID, types.ProcessingStageDocReader,
s.failStage(ctx, knowledge.ID, types.StageDocReader,
werrors.ErrCodeDocReaderParseFailed, "failed to get file", err)
return s.failKnowledge(ctx, knowledge, isLastRetry, "failed to get file: %v", err)
}
defer fileReader.Close()
contentBytes, err := io.ReadAll(fileReader)
if err != nil {
s.stage().Fail(ctx, knowledge.ID, types.ProcessingStageDocReader,
s.failStage(ctx, knowledge.ID, types.StageDocReader,
werrors.ErrCodeDocReaderParseFailed, "failed to read file", err)
return s.failKnowledge(ctx, knowledge, isLastRetry, "failed to read file: %v", err)
}
@@ -2380,7 +2411,7 @@ func (s *knowledgeService) convert(
if errors.Is(err, context.DeadlineExceeded) || strings.Contains(err.Error(), "docreader call timeout") {
code = werrors.ErrCodeDocReaderTimeout
}
s.stage().Fail(ctx, knowledge.ID, types.ProcessingStageDocReader,
s.failStage(ctx, knowledge.ID, types.StageDocReader,
code, "document read failed", err)
return s.failKnowledge(ctx, knowledge, isLastRetry, "document read failed: %v", err)
}
@@ -2391,11 +2422,11 @@ func (s *knowledgeService) convert(
knowledge.ErrorMessage = result.Error
knowledge.UpdatedAt = time.Now()
s.repo.UpdateKnowledge(ctx, knowledge)
s.stage().Fail(ctx, knowledge.ID, types.ProcessingStageDocReader,
s.failStage(ctx, knowledge.ID, types.StageDocReader,
werrors.ErrCodeDocReaderParseFailed, result.Error, nil)
return nil, nil
}
s.stage().Done(ctx, knowledge.ID, types.ProcessingStageDocReader)
s.endStage(ctx, knowledge.ID, types.StageDocReader, nil)
return result, nil
}
@@ -2517,6 +2548,7 @@ func (s *knowledgeService) enqueueImageMultimodalTasks(
return
}
attempt := attemptFromCtx(ctx)
redisKey := fmt.Sprintf("multimodal:pending:%s", knowledge.ID)
if s.redisClient != nil {
if err := s.redisClient.Set(ctx, redisKey, len(images), 24*time.Hour).Err(); err != nil {
@@ -2524,7 +2556,7 @@ func (s *knowledgeService) enqueueImageMultimodalTasks(
}
}
for _, img := range images {
for idx, img := range images {
// Match image to the ParsedChunk whose content contains the image URL.
// ChunkID was populated by processChunks with the real DB UUID.
chunkID := ""
@@ -2549,6 +2581,8 @@ func (s *knowledgeService) enqueueImageMultimodalTasks(
EnableCaption: true,
Language: lang,
ImageSourceType: metadata["image_source_type"],
Attempt: attempt,
ImageIndex: idx,
}
langfuse.InjectTracing(ctx, &payload)

View File

@@ -0,0 +1,508 @@
// Package service: span tracker.
//
// SpanTracker is the pipeline-facing facade for recording per-attempt
// progress trees. It mirrors Langfuse's vocabulary (root / span /
// generation) so the UI's mental model matches what operators already use
// for LLM call observability.
//
// Lifecycle:
//
// attempt := tracker.OpenAttempt(ctx, knowledgeID, langfuseTraceID)
// // creates the root span; every subsequent Begin* call uses this attempt
//
// stage := tracker.BeginStage(ctx, knowledgeID, attempt, types.StageDocReader, input)
// // ...do work...
// tracker.EndSpan(ctx, stage, output) // success
// tracker.FailSpan(ctx, stage, code, msg, err) // error
// tracker.SkipSpan(ctx, stage, reason) // intentionally not run
//
// sub := tracker.BeginSubSpan(ctx, parentSpan, "multimodal.image[0]", types.SpanKindGeneration, input)
// // ...
//
// All operations are best-effort: a DB error is logged and swallowed so a
// tracker hiccup never breaks the parsing pipeline. Knowledge.parse_status
// remains the authoritative source of truth for completion.
package service
import (
"context"
"strings"
"sync"
"time"
"github.com/Tencent/WeKnora/internal/application/repository"
"github.com/Tencent/WeKnora/internal/logger"
"github.com/Tencent/WeKnora/internal/types"
"github.com/google/uuid"
)
// Span is the in-memory handle the pipeline holds while a stage / subspan
// is executing. It carries enough context for End/Fail/Skip to write back
// without re-querying the DB. Returned (and required) from every Begin*.
type Span struct {
KnowledgeID string
Attempt int
SpanID string
ParentSpanID string
Name string
Kind string
StartedAt time.Time
}
// SpanTracker is the only public surface — kept as an interface so tests
// can swap in a no-op without spinning up a database.
type SpanTracker interface {
// OpenAttempt creates a new root span for (knowledgeID,
// nextAttempt) and returns its number plus the root *Span. Call
// at the start of a parse / reparse, before any other Begin*.
OpenAttempt(ctx context.Context, knowledgeID, langfuseTraceID string) (root *Span, attempt int, err error)
// LatestAttempt returns the highest attempt number recorded for
// the knowledge, or 0 if it's never been parsed. Used by the API
// layer to default to "show me the most recent run".
LatestAttempt(ctx context.Context, knowledgeID string) int
// BeginStage starts one of the canonical stages. Looks up the
// root span for (kid, attempt) — caller passes attempt to make
// the wiring explicit and let cross-process workers join an
// existing attempt without new repo lookups.
BeginStage(ctx context.Context, knowledgeID string, attempt int, stage string, input types.JSONMap) *Span
// BeginSubSpan creates a child span under parent. parent may be a
// stage span (for multimodal.image[i] / embedding.batch[i]) or
// another subspan. kind is "subspan" or "generation" — generations
// will be stitched to a Langfuse generation by trace_id.
BeginSubSpan(ctx context.Context, parent *Span, name, kind string, input types.JSONMap) *Span
// EndSpan marks span as done with optional output. Safe with nil.
EndSpan(ctx context.Context, span *Span, output types.JSONMap)
// FailSpan marks span as failed and cascade-cancels its
// descendants. errorDetail (a Go error) is recorded verbatim in
// error_detail (truncated to 8 KB) for admin views.
FailSpan(ctx context.Context, span *Span, errorCode, errorMessage string, errorDetail error)
// SkipSpan marks an intentionally not-run span (e.g. multimodal
// on a text-only document). Distinct from cancelled — skipped is
// "we chose not to" while cancelled is "an upstream broke".
SkipSpan(ctx context.Context, span *Span, reason string)
// LookupStage returns the stage's *Span for an in-flight attempt
// — the cross-process bridge that lets an asynq worker (e.g.
// image_multimodal) attach subspans to the parent stage span
// created by the upstream pipeline.
LookupStage(ctx context.Context, knowledgeID string, attempt int, stage string) *Span
}
type spanTracker struct {
repo repository.KnowledgeSpanRepository
// startsMu guards the in-process duration cache. Cross-process
// workers won't find their parent's start here — that's fine,
// duration_ms falls back to (FinishedAt - row.StartedAt) computed
// at write time when the cache misses.
startsMu sync.Mutex
starts map[string]time.Time // span_id → started_at
}
// NewSpanTracker constructs the GORM-backed tracker. A nil repo collapses
// to a no-op so test harnesses don't need to spin up a database.
func NewSpanTracker(repo repository.KnowledgeSpanRepository) SpanTracker {
if repo == nil {
return noopSpanTracker{}
}
return &spanTracker{
repo: repo,
starts: make(map[string]time.Time),
}
}
func newSpanID() string {
// Stripping the dashes saves 4 bytes per row — JSON parsers don't
// care, and operators paste these into queries / Langfuse where a
// hex-only ID is friendlier.
return strings.ReplaceAll(uuid.NewString(), "-", "")
}
func (t *spanTracker) recordStart(spanID string, at time.Time) {
t.startsMu.Lock()
t.starts[spanID] = at
t.startsMu.Unlock()
}
func (t *spanTracker) takeStart(spanID string) (time.Time, bool) {
t.startsMu.Lock()
defer t.startsMu.Unlock()
v, ok := t.starts[spanID]
if ok {
delete(t.starts, spanID)
}
return v, ok
}
func (t *spanTracker) OpenAttempt(ctx context.Context, knowledgeID, langfuseTraceID string) (*Span, int, error) {
attempt, err := t.repo.NextAttempt(ctx, knowledgeID)
if err != nil {
return nil, 0, err
}
now := time.Now()
rootID := newSpanID()
meta := types.JSONMap{}
if langfuseTraceID != "" {
// The frontend renders a "open in Langfuse" link from this.
meta["langfuse_trace_id"] = langfuseTraceID
}
row := &types.KnowledgeProcessingSpan{
KnowledgeID: knowledgeID,
Attempt: attempt,
SpanID: rootID,
Name: "knowledge_processing",
Kind: types.SpanKindRoot,
Status: types.SpanStatusRunning,
Metadata: meta,
StartedAt: &now,
}
if err := t.repo.Upsert(ctx, row); err != nil {
logger.Warnf(ctx, "[SpanTracker] OpenAttempt failed kid=%s: %v", knowledgeID, err)
return nil, attempt, err
}
t.recordStart(rootID, now)
return &Span{
KnowledgeID: knowledgeID,
Attempt: attempt,
SpanID: rootID,
Name: "knowledge_processing",
Kind: types.SpanKindRoot,
StartedAt: now,
}, attempt, nil
}
func (t *spanTracker) LatestAttempt(ctx context.Context, knowledgeID string) int {
n, err := t.repo.LatestAttempt(ctx, knowledgeID)
if err != nil {
logger.Warnf(ctx, "[SpanTracker] LatestAttempt failed kid=%s: %v", knowledgeID, err)
return 0
}
return n
}
func (t *spanTracker) BeginStage(ctx context.Context, knowledgeID string, attempt int, stage string, input types.JSONMap) *Span {
if knowledgeID == "" || stage == "" {
return nil
}
// Find root span — we need its span_id as parent for stage rows.
rows, err := t.repo.ListByAttempt(ctx, knowledgeID, attempt)
if err != nil {
logger.Warnf(ctx, "[SpanTracker] BeginStage list failed kid=%s attempt=%d: %v",
knowledgeID, attempt, err)
return nil
}
var rootID string
for i := range rows {
if rows[i].Kind == types.SpanKindRoot {
rootID = rows[i].SpanID
break
}
}
if rootID == "" {
// Pipeline started before tracker was wired (legacy data,
// or the OpenAttempt repo write failed). Synthesize a
// rootless stage so we still record SOMETHING.
logger.Warnf(ctx, "[SpanTracker] BeginStage: no root for kid=%s attempt=%d, recording rootless",
knowledgeID, attempt)
}
now := time.Now()
id := newSpanID()
row := &types.KnowledgeProcessingSpan{
KnowledgeID: knowledgeID,
Attempt: attempt,
SpanID: id,
ParentSpanID: rootID,
Name: stage,
Kind: types.SpanKindStage,
Status: types.SpanStatusRunning,
Input: input,
StartedAt: &now,
}
if err := t.repo.Upsert(ctx, row); err != nil {
logger.Warnf(ctx, "[SpanTracker] BeginStage failed kid=%s stage=%s: %v",
knowledgeID, stage, err)
return nil
}
t.recordStart(id, now)
return &Span{
KnowledgeID: knowledgeID,
Attempt: attempt,
SpanID: id,
ParentSpanID: rootID,
Name: stage,
Kind: types.SpanKindStage,
StartedAt: now,
}
}
func (t *spanTracker) BeginSubSpan(ctx context.Context, parent *Span, name, kind string, input types.JSONMap) *Span {
if parent == nil || name == "" {
return nil
}
if kind != types.SpanKindGeneration && kind != types.SpanKindSubSpan {
kind = types.SpanKindSubSpan
}
now := time.Now()
id := newSpanID()
row := &types.KnowledgeProcessingSpan{
KnowledgeID: parent.KnowledgeID,
Attempt: parent.Attempt,
SpanID: id,
ParentSpanID: parent.SpanID,
Name: name,
Kind: kind,
Status: types.SpanStatusRunning,
Input: input,
StartedAt: &now,
}
if err := t.repo.Upsert(ctx, row); err != nil {
logger.Warnf(ctx, "[SpanTracker] BeginSubSpan failed parent=%s name=%s: %v",
parent.SpanID, name, err)
return nil
}
t.recordStart(id, now)
return &Span{
KnowledgeID: parent.KnowledgeID,
Attempt: parent.Attempt,
SpanID: id,
ParentSpanID: parent.SpanID,
Name: name,
Kind: kind,
StartedAt: now,
}
}
func (t *spanTracker) EndSpan(ctx context.Context, span *Span, output types.JSONMap) {
if span == nil {
return
}
now := time.Now()
dur := durationSince(t, span, now)
row := &types.KnowledgeProcessingSpan{
KnowledgeID: span.KnowledgeID,
Attempt: span.Attempt,
SpanID: span.SpanID,
ParentSpanID: span.ParentSpanID,
Name: span.Name,
Kind: span.Kind,
Status: types.SpanStatusDone,
Output: output,
StartedAt: &span.StartedAt,
FinishedAt: &now,
DurationMs: dur,
}
if err := t.repo.Upsert(ctx, row); err != nil {
logger.Warnf(ctx, "[SpanTracker] EndSpan failed span=%s: %v", span.SpanID, err)
}
}
func (t *spanTracker) FailSpan(ctx context.Context, span *Span, errorCode, errorMessage string, errorDetail error) {
if span == nil {
return
}
now := time.Now()
dur := durationSince(t, span, now)
detail := ""
if errorDetail != nil {
detail = errorDetail.Error()
if len(detail) > 8192 {
detail = detail[:8192]
}
}
if len(errorMessage) > 1024 {
errorMessage = errorMessage[:1024]
}
row := &types.KnowledgeProcessingSpan{
KnowledgeID: span.KnowledgeID,
Attempt: span.Attempt,
SpanID: span.SpanID,
ParentSpanID: span.ParentSpanID,
Name: span.Name,
Kind: span.Kind,
Status: types.SpanStatusFailed,
ErrorCode: strings.TrimSpace(errorCode),
ErrorMessage: errorMessage,
ErrorDetail: detail,
StartedAt: &span.StartedAt,
FinishedAt: &now,
DurationMs: dur,
}
if err := t.repo.Upsert(ctx, row); err != nil {
logger.Warnf(ctx, "[SpanTracker] FailSpan failed span=%s: %v", span.SpanID, err)
}
// Cascade: anything downstream of this span gets cancelled. The
// reason string is what the UI surfaces under each cancelled
// child's tooltip — keep it short and human.
reason := "upstream " + span.Name + " failed"
if errorCode != "" {
reason = reason + " (" + errorCode + ")"
}
if _, err := t.repo.CancelDescendants(ctx, span.KnowledgeID, span.Attempt, span.SpanID, reason); err != nil {
logger.Warnf(ctx, "[SpanTracker] cancel descendants failed span=%s: %v", span.SpanID, err)
}
// For STAGE failures, also cascade to dependent stages declared
// in StageDependencies (those are siblings, not descendants).
if span.Kind == types.SpanKindStage {
t.cascadeDependentStages(ctx, span, reason)
}
}
func (t *spanTracker) SkipSpan(ctx context.Context, span *Span, reason string) {
if span == nil {
return
}
now := time.Now()
row := &types.KnowledgeProcessingSpan{
KnowledgeID: span.KnowledgeID,
Attempt: span.Attempt,
SpanID: span.SpanID,
ParentSpanID: span.ParentSpanID,
Name: span.Name,
Kind: span.Kind,
Status: types.SpanStatusSkipped,
ErrorMessage: reason,
StartedAt: &span.StartedAt,
FinishedAt: &now,
}
if err := t.repo.Upsert(ctx, row); err != nil {
logger.Warnf(ctx, "[SpanTracker] SkipSpan failed span=%s: %v", span.SpanID, err)
}
}
func (t *spanTracker) LookupStage(ctx context.Context, knowledgeID string, attempt int, stage string) *Span {
rows, err := t.repo.ListByAttempt(ctx, knowledgeID, attempt)
if err != nil {
logger.Warnf(ctx, "[SpanTracker] LookupStage list failed kid=%s attempt=%d: %v",
knowledgeID, attempt, err)
return nil
}
for i := range rows {
r := rows[i]
if r.Kind != types.SpanKindStage || r.Name != stage {
continue
}
started := time.Time{}
if r.StartedAt != nil {
started = *r.StartedAt
}
return &Span{
KnowledgeID: r.KnowledgeID,
Attempt: r.Attempt,
SpanID: r.SpanID,
ParentSpanID: r.ParentSpanID,
Name: r.Name,
Kind: r.Kind,
StartedAt: started,
}
}
return nil
}
// cascadeDependentStages flips downstream STAGE rows to "cancelled" using
// types.StageDependencies. Without this, a Chunking failure leaves
// Embedding / Multimodal as "pending" forever, even though they cannot
// possibly run.
func (t *spanTracker) cascadeDependentStages(ctx context.Context, failedStage *Span, reason string) {
rows, err := t.repo.ListByAttempt(ctx, failedStage.KnowledgeID, failedStage.Attempt)
if err != nil {
return
}
dependents := stagesDependingOn(failedStage.Name)
if len(dependents) == 0 {
return
}
now := time.Now()
for _, row := range rows {
if row.Kind != types.SpanKindStage {
continue
}
if row.Status != types.SpanStatusPending && row.Status != types.SpanStatusRunning {
continue
}
if !contains(dependents, row.Name) {
continue
}
updated := row // copy
updated.Status = types.SpanStatusCancelled
updated.ErrorCode = "UPSTREAM_FAILED"
updated.ErrorMessage = reason
updated.FinishedAt = &now
if err := t.repo.Upsert(ctx, &updated); err != nil {
logger.Warnf(ctx, "[SpanTracker] cascade dependent stage %s: %v", row.Name, err)
}
}
}
// stagesDependingOn returns the transitive closure of stages that have
// `stage` as an upstream dependency (direct or indirect). Computed by
// reverse-walking StageDependencies; the result is bounded to 5 since
// AllStages has five members, so a naive O(N²) walk is fine.
func stagesDependingOn(stage string) []string {
var out []string
seen := map[string]bool{}
frontier := []string{stage}
for len(frontier) > 0 {
var next []string
for _, candidate := range types.AllStages {
if seen[candidate] {
continue
}
deps := types.StageDependencies[candidate]
for _, d := range deps {
if contains(frontier, d) {
seen[candidate] = true
out = append(out, candidate)
next = append(next, candidate)
break
}
}
}
frontier = next
}
return out
}
func contains(haystack []string, needle string) bool {
for _, h := range haystack {
if h == needle {
return true
}
}
return false
}
// durationSince computes elapsed ms preferring the in-process cache;
// falls back to the *Span's StartedAt for cross-process callers.
func durationSince(t *spanTracker, span *Span, now time.Time) int64 {
if start, ok := t.takeStart(span.SpanID); ok {
return now.Sub(start).Milliseconds()
}
if !span.StartedAt.IsZero() {
return now.Sub(span.StartedAt).Milliseconds()
}
return 0
}
// noopSpanTracker collapses every method to a no-op for tests/lite.
type noopSpanTracker struct{}
func (noopSpanTracker) OpenAttempt(_ context.Context, _, _ string) (*Span, int, error) {
return nil, 0, nil
}
func (noopSpanTracker) LatestAttempt(_ context.Context, _ string) int { return 0 }
func (noopSpanTracker) BeginStage(_ context.Context, _ string, _ int, _ string, _ types.JSONMap) *Span {
return nil
}
func (noopSpanTracker) BeginSubSpan(_ context.Context, _ *Span, _, _ string, _ types.JSONMap) *Span {
return nil
}
func (noopSpanTracker) EndSpan(_ context.Context, _ *Span, _ types.JSONMap) {}
func (noopSpanTracker) FailSpan(_ context.Context, _ *Span, _, _ string, _ error) {}
func (noopSpanTracker) SkipSpan(_ context.Context, _ *Span, _ string) {}
func (noopSpanTracker) LookupStage(_ context.Context, _ string, _ int, _ string) *Span { return nil }

View File

@@ -1,156 +0,0 @@
// Package service: stage tracker.
//
// StageTracker is the thin facade every pipeline call site uses to record
// progress. Its three operations (Begin, Done, Fail) cover the lifecycle
// of every stage; everything else (ordering, finalize semantics) is the
// caller's responsibility — the tracker just persists what it's told.
//
// All operations are best-effort: a DB error is logged and swallowed so
// the parsing pipeline itself never breaks because tracker bookkeeping
// failed. The Knowledge.parse_status column remains the authoritative
// source of truth for whether a document is done.
package service
import (
"context"
"strings"
"time"
"github.com/Tencent/WeKnora/internal/application/repository"
"github.com/Tencent/WeKnora/internal/logger"
"github.com/Tencent/WeKnora/internal/types"
)
// StageTracker exposes the trio of write operations the pipeline needs.
// Kept as an interface so unit tests can swap in a no-op implementation
// without spinning up a database.
type StageTracker interface {
Begin(ctx context.Context, knowledgeID, stage string)
Done(ctx context.Context, knowledgeID, stage string)
Fail(ctx context.Context, knowledgeID, stage, errorCode, errorMessage string, errorDetail error)
Skip(ctx context.Context, knowledgeID, stage string)
}
type stageTracker struct {
repo repository.KnowledgeStageRepository
// in-memory map of (knowledge_id|stage) → start time so we can
// compute duration_ms when Done/Fail fires without an extra DB
// read. Best-effort: a missed Begin (e.g. process restart between
// Begin and Done) just yields duration_ms=0, which the UI renders
// as "—" instead of a wrong number.
starts map[string]time.Time
}
// NewStageTracker constructs a tracker backed by the given repo.
func NewStageTracker(repo repository.KnowledgeStageRepository) StageTracker {
if repo == nil {
return noopStageTracker{}
}
return &stageTracker{
repo: repo,
starts: make(map[string]time.Time),
}
}
func startKey(kid, stage string) string {
return kid + "|" + stage
}
func (t *stageTracker) Begin(ctx context.Context, knowledgeID, stage string) {
if knowledgeID == "" || stage == "" {
return
}
now := time.Now()
t.starts[startKey(knowledgeID, stage)] = now
row := &types.KnowledgeProcessingStage{
KnowledgeID: knowledgeID,
Stage: stage,
Status: types.ProcessingStageRunning,
StartedAt: &now,
}
if err := t.repo.Upsert(ctx, row); err != nil {
logger.Warnf(ctx, "[StageTracker] Begin failed kid=%s stage=%s: %v", knowledgeID, stage, err)
}
}
func (t *stageTracker) Done(ctx context.Context, knowledgeID, stage string) {
if knowledgeID == "" || stage == "" {
return
}
now := time.Now()
var dur int64
if start, ok := t.starts[startKey(knowledgeID, stage)]; ok {
dur = now.Sub(start).Milliseconds()
delete(t.starts, startKey(knowledgeID, stage))
}
row := &types.KnowledgeProcessingStage{
KnowledgeID: knowledgeID,
Stage: stage,
Status: types.ProcessingStageDone,
FinishedAt: &now,
DurationMs: dur,
}
if err := t.repo.Upsert(ctx, row); err != nil {
logger.Warnf(ctx, "[StageTracker] Done failed kid=%s stage=%s: %v", knowledgeID, stage, err)
}
}
func (t *stageTracker) Fail(ctx context.Context, knowledgeID, stage, errorCode, errorMessage string, errorDetail error) {
if knowledgeID == "" || stage == "" {
return
}
now := time.Now()
var dur int64
if start, ok := t.starts[startKey(knowledgeID, stage)]; ok {
dur = now.Sub(start).Milliseconds()
delete(t.starts, startKey(knowledgeID, stage))
}
detail := ""
if errorDetail != nil {
detail = errorDetail.Error()
if len(detail) > 8192 {
detail = detail[:8192]
}
}
if len(errorMessage) > 1024 {
errorMessage = errorMessage[:1024]
}
row := &types.KnowledgeProcessingStage{
KnowledgeID: knowledgeID,
Stage: stage,
Status: types.ProcessingStageFailed,
FinishedAt: &now,
DurationMs: dur,
ErrorCode: strings.TrimSpace(errorCode),
ErrorMessage: errorMessage,
ErrorDetail: detail,
}
if err := t.repo.Upsert(ctx, row); err != nil {
logger.Warnf(ctx, "[StageTracker] Fail failed kid=%s stage=%s: %v", knowledgeID, stage, err)
}
}
func (t *stageTracker) Skip(ctx context.Context, knowledgeID, stage string) {
if knowledgeID == "" || stage == "" {
return
}
now := time.Now()
row := &types.KnowledgeProcessingStage{
KnowledgeID: knowledgeID,
Stage: stage,
Status: types.ProcessingStageSkipped,
FinishedAt: &now,
}
if err := t.repo.Upsert(ctx, row); err != nil {
logger.Warnf(ctx, "[StageTracker] Skip failed kid=%s stage=%s: %v", knowledgeID, stage, err)
}
}
// noopStageTracker is returned when the repo is nil — keeps call sites
// trivial (no `if t != nil` guards) at the cost of one struct allocation.
type noopStageTracker struct{}
func (noopStageTracker) Begin(_ context.Context, _, _ string) {}
func (noopStageTracker) Done(_ context.Context, _, _ string) {}
func (noopStageTracker) Fail(_ context.Context, _, _, _, _ string, _ error) {}
func (noopStageTracker) Skip(_ context.Context, _, _ string) {}

View File

@@ -140,7 +140,7 @@ func BuildContainer(container *dig.Container) *dig.Container {
must(container.Provide(repository.NewAuditLogRepository))
must(container.Provide(repository.NewKnowledgeBaseRepository))
must(container.Provide(repository.NewKnowledgeRepository))
must(container.Provide(repository.NewKnowledgeStageRepository))
must(container.Provide(repository.NewKnowledgeSpanRepository))
must(container.Provide(repository.NewChunkRepository))
must(container.Provide(repository.NewKnowledgeTagRepository))
must(container.Provide(repository.NewSessionRepository))
@@ -183,7 +183,7 @@ func BuildContainer(container *dig.Container) *dig.Container {
must(container.Provide(service.NewKBShareService)) // KBShareService must be registered before KnowledgeService and KnowledgeTagService
must(container.Provide(service.NewAgentShareService))
must(container.Provide(service.NewKnowledgeService))
must(container.Provide(service.NewStageTracker))
must(container.Provide(service.NewSpanTracker))
must(container.Provide(service.NewChunkService))
must(container.Provide(service.NewKnowledgeTagService))
must(container.Provide(embedding.NewBatchEmbedder))

View File

@@ -34,7 +34,7 @@ type KnowledgeHandler struct {
kbShareService interfaces.KBShareService
agentShareService interfaces.AgentShareService
asynqClient interfaces.TaskEnqueuer
stageRepo repository.KnowledgeStageRepository
spanRepo repository.KnowledgeSpanRepository
}
// NewKnowledgeHandler creates a new knowledge handler instance
@@ -44,7 +44,7 @@ func NewKnowledgeHandler(
kbShareService interfaces.KBShareService,
agentShareService interfaces.AgentShareService,
asynqClient interfaces.TaskEnqueuer,
stageRepo repository.KnowledgeStageRepository,
spanRepo repository.KnowledgeSpanRepository,
) *KnowledgeHandler {
return &KnowledgeHandler{
kgService: kgService,
@@ -52,7 +52,7 @@ func NewKnowledgeHandler(
kbShareService: kbShareService,
agentShareService: agentShareService,
asynqClient: asynqClient,
stageRepo: stageRepo,
spanRepo: spanRepo,
}
}
@@ -549,20 +549,22 @@ func (h *KnowledgeHandler) GetKnowledge(c *gin.Context) {
})
}
// GetKnowledgeStages godoc
// @Summary 获取知识文档解析阶段进度
// @Description 返回该知识在解析流水线docreader/chunking/embedding/multimodal/postprocess的每段状态、耗时与错误码。前端用于渲染时间线
// GetKnowledgeSpans godoc
// @Summary 获取知识文档解析的 Span 树(含历史尝试)
// @Description 返回该知识在解析流水线的 trace treeroot → stage → subspan每段状态、耗时、input/output、错误码、langfuse_trace_id。支持 ?attempt=N 查看历史尝试;不传则返回最新尝试。前端用于渲染时间线 + 多模态/embedding 子节点 + 一键跳转 Langfuse
// @Tags 知识管理
// @Accept json
// @Produce json
// @Param id path string true "知识ID"
// @Success 200 {object} map[string]interface{}
// @Router /api/v1/knowledge/{id}/stages [get]
// @Param id path string true "知识ID"
// @Param attempt query int false "指定尝试号;省略=最新"
// @Success 200 {object} map[string]interface{}
// @Router /api/v1/knowledge/{id}/spans [get]
//
// Always returns AllProcessingStages segments — missing rows are
// Always returns the canonical 5-stage timeline; missing stage rows are
// synthesized as "pending" so the frontend timeline always renders five
// segments and doesn't have to know about backend persistence races.
func (h *KnowledgeHandler) GetKnowledgeStages(c *gin.Context) {
// segments. Subspans (multimodal.image[i], generation.*) ride along under
// each stage as children when present.
func (h *KnowledgeHandler) GetKnowledgeSpans(c *gin.Context) {
ctx := c.Request.Context()
id := secutils.SanitizeForLog(c.Param("id"))
@@ -577,63 +579,60 @@ func (h *KnowledgeHandler) GetKnowledgeStages(c *gin.Context) {
return
}
rows := []types.KnowledgeProcessingStage{}
if h.stageRepo != nil {
rows, err = h.stageRepo.ListByKnowledge(ctx, knowledge.ID)
if err != nil {
// A repo error doesn't break the response — we still
// return the placeholder timeline so the UI renders.
logger.Warnf(ctx, "stages list failed for %s: %v", knowledge.ID, err)
rows = nil
// Pick attempt: explicit ?attempt=N wins; otherwise pull the
// latest attempt from the spans table. Lite-mode / fresh installs
// with zero rows fall through to attempt=0, in which case we
// return a placeholder tree (5 pending stages, no root, no
// children) so the UI still renders.
requestedAttempt := 0
if v := strings.TrimSpace(c.Query("attempt")); v != "" {
if n, perr := strconv.Atoi(v); perr == nil && n > 0 {
requestedAttempt = n
}
}
// Index by stage so synthesizing missing rows is O(N).
byStage := make(map[string]types.KnowledgeProcessingStage, len(rows))
for _, r := range rows {
byStage[r.Stage] = r
rows := []types.KnowledgeProcessingSpan{}
currentAttempt := 0
if h.spanRepo != nil {
if requestedAttempt == 0 {
latest, lerr := h.spanRepo.LatestAttempt(ctx, knowledge.ID)
if lerr != nil {
logger.Warnf(ctx, "spans LatestAttempt failed for %s: %v", knowledge.ID, lerr)
} else {
currentAttempt = latest
}
} else {
currentAttempt = requestedAttempt
}
if currentAttempt > 0 {
rows, err = h.spanRepo.ListByAttempt(ctx, knowledge.ID, currentAttempt)
if err != nil {
logger.Warnf(ctx, "spans ListByAttempt failed kid=%s attempt=%d: %v",
knowledge.ID, currentAttempt, err)
rows = nil
}
}
}
now := time.Now()
out := make([]types.KnowledgeProcessingStage, 0, len(types.AllProcessingStages))
currentStage := ""
var lastFailure *types.KnowledgeProcessingStage
for _, name := range types.AllProcessingStages {
if r, ok := byStage[name]; ok {
out = append(out, r)
if r.Status == types.ProcessingStageRunning && currentStage == "" {
currentStage = r.Stage
}
if r.Status == types.ProcessingStageFailed {
cp := r
lastFailure = &cp
}
continue
}
// Synthesize a pending placeholder. CreatedAt set to now so
// the JSON response is well-formed; clients should treat
// timestamps on placeholders as "not meaningful".
out = append(out, types.KnowledgeProcessingStage{
KnowledgeID: knowledge.ID,
Stage: name,
Status: types.ProcessingStagePending,
CreatedAt: now,
UpdatedAt: now,
})
}
// Build tree: index by SpanID, then attach to parents. Stages
// missing from the DB are synthesized as "pending" placeholders
// under a synthetic (or real, if present) root so the timeline
// always renders five segments.
tree, currentStageName, lastErr := buildSpanTree(knowledge.ID, currentAttempt, rows)
resp := gin.H{
"knowledge_id": knowledge.ID,
"parse_status": knowledge.ParseStatus,
"current_stage": currentStage,
"stages": out,
"knowledge_id": knowledge.ID,
"parse_status": knowledge.ParseStatus,
"current_attempt": currentAttempt,
"current_stage": currentStageName,
"trace": tree,
}
if lastFailure != nil {
if lastErr != nil {
resp["last_error"] = gin.H{
"stage": lastFailure.Stage,
"code": lastFailure.ErrorCode,
"message": lastFailure.ErrorMessage,
"finished_at": lastFailure.FinishedAt,
"stage": lastErr.Name,
"code": lastErr.ErrorCode,
"message": lastErr.ErrorMessage,
"finished_at": lastErr.FinishedAt,
}
}
c.JSON(http.StatusOK, gin.H{
@@ -642,6 +641,102 @@ func (h *KnowledgeHandler) GetKnowledgeStages(c *gin.Context) {
})
}
// buildSpanTree assembles a flat list of span rows into a parent-child
// tree rooted at the (knowledge, attempt)'s root span. Missing canonical
// stages are filled in with pending placeholders so the UI always renders
// the five timeline segments. Returns the root, the current_stage name
// (the running stage if any), and the most recent failed span if one
// exists.
func buildSpanTree(knowledgeID string, attempt int, rows []types.KnowledgeProcessingSpan) (
root *types.SpanTreeNode, currentStage string, lastFailure *types.KnowledgeProcessingSpan,
) {
now := time.Now()
// Build node lookup, identify root.
nodes := make(map[string]*types.SpanTreeNode, len(rows))
var rootRow *types.KnowledgeProcessingSpan
stageRowByName := map[string]*types.KnowledgeProcessingSpan{}
for i := range rows {
r := rows[i]
nodes[r.SpanID] = &types.SpanTreeNode{KnowledgeProcessingSpan: r}
if r.Kind == types.SpanKindRoot && rootRow == nil {
cp := r
rootRow = &cp
}
if r.Kind == types.SpanKindStage {
cp := r
stageRowByName[r.Name] = &cp
}
if r.Status == types.SpanStatusRunning && r.Kind == types.SpanKindStage && currentStage == "" {
currentStage = r.Name
}
if r.Status == types.SpanStatusFailed {
cp := r
lastFailure = &cp
}
}
// Synthesize root if no rows came back so the API contract stays
// stable (frontend always expects a `trace` object).
if rootRow == nil {
root = &types.SpanTreeNode{KnowledgeProcessingSpan: types.KnowledgeProcessingSpan{
KnowledgeID: knowledgeID,
Attempt: attempt,
SpanID: "",
Name: "knowledge_processing",
Kind: types.SpanKindRoot,
Status: types.SpanStatusPending,
CreatedAt: now,
UpdatedAt: now,
}}
} else {
root = nodes[rootRow.SpanID]
}
// Synthesize missing stage rows as "pending" children of root so
// the timeline always shows 5 segments.
for _, name := range types.AllStages {
if _, ok := stageRowByName[name]; ok {
continue
}
placeholder := types.KnowledgeProcessingSpan{
KnowledgeID: knowledgeID,
Attempt: attempt,
Name: name,
Kind: types.SpanKindStage,
Status: types.SpanStatusPending,
CreatedAt: now,
UpdatedAt: now,
}
nodes[name+"_placeholder_"+knowledgeID] = &types.SpanTreeNode{KnowledgeProcessingSpan: placeholder}
// Attach placeholder under root by giving it the root's
// span_id as parent — done after the link pass below.
}
// Link real children to their parents. Placeholders we just
// added have no real parent; we attach them to root in a
// second pass after the linking is done.
for _, n := range nodes {
if n == root {
continue
}
if n.ParentSpanID == "" {
// Real top-level row with no parent and not the root
// itself — attach to root so it doesn't dangle.
root.Children = append(root.Children, n)
continue
}
parent, ok := nodes[n.ParentSpanID]
if !ok {
// Unknown parent (orphan); attach to root.
root.Children = append(root.Children, n)
continue
}
parent.Children = append(parent.Children, n)
}
return root, currentStage, lastFailure
}
// ListKnowledge godoc
// @Summary 获取知识列表
// @Description 获取知识库下的知识列表,支持分页和筛选

View File

@@ -283,7 +283,8 @@ func RegisterKnowledgeRoutes(r *gin.RouterGroup, handler *handler.KnowledgeHandl
// the role-only floor; /move and /batch-delete stay Contributor.
k.GET("/batch", g.Viewer(), handler.GetKnowledgeBatch)
k.GET("/:id", g.Viewer(), g.KBAccessReadFromKnowledgeIDParam("id"), handler.GetKnowledge)
k.GET("/:id/stages", g.Viewer(), g.KBAccessReadFromKnowledgeIDParam("id"), handler.GetKnowledgeStages)
k.GET("/:id/stages", g.Viewer(), g.KBAccessReadFromKnowledgeIDParam("id"), handler.GetKnowledgeSpans)
k.GET("/:id/spans", g.Viewer(), g.KBAccessReadFromKnowledgeIDParam("id"), handler.GetKnowledgeSpans)
k.DELETE("/:id", g.OwnedKnowledgeKBOrAdmin(), g.KBAccessWriteFromKnowledgeIDParam("id"), handler.DeleteKnowledge)
k.PUT("/:id", g.OwnedKnowledgeKBOrAdmin(), g.KBAccessWriteFromKnowledgeIDParam("id"), handler.UpdateKnowledge)
k.PUT("/manual/:id", g.OwnedKnowledgeKBOrAdmin(), g.KBAccessWriteFromKnowledgeIDParam("id"), handler.UpdateManualKnowledge)

View File

@@ -0,0 +1,50 @@
package types
import (
"database/sql/driver"
"encoding/json"
"errors"
)
// JSONMap is a map[string]any that round-trips through Postgres JSONB
// columns via GORM. The standard library's database/sql doesn't know how
// to scan JSONB into a typed map, so we hand-roll Value() / Scan().
//
// Lives here (not in /utils) because the only callers are types whose
// schemas already declare JSONB columns — keeping the helper next to its
// users avoids a circular import in any package that already pulls in
// /types.
type JSONMap map[string]any
// Value implements driver.Valuer. Returns nil when the map is nil so
// JSONB columns store SQL NULL instead of "null".
func (m JSONMap) Value() (driver.Value, error) {
if m == nil {
return nil, nil
}
return json.Marshal(m)
}
// Scan implements sql.Scanner. Accepts both []byte (real JSONB) and
// string (some drivers under sqlmock); anything else is a programming
// error.
func (m *JSONMap) Scan(src any) error {
if src == nil {
*m = nil
return nil
}
var raw []byte
switch v := src.(type) {
case []byte:
raw = v
case string:
raw = []byte(v)
default:
return errors.New("JSONMap.Scan: unsupported source type")
}
if len(raw) == 0 {
*m = nil
return nil
}
return json.Unmarshal(raw, m)
}

View File

@@ -0,0 +1,117 @@
package types
import "time"
// Span kinds — kept narrow because every kind has dedicated rendering on
// the frontend timeline:
//
// - SpanKindRoot — the per-(knowledge, attempt) trace root. Always
// the parent_span_id ancestor of every other span
// in that attempt. UI uses it for total elapsed.
// - SpanKindStage — one of the 5 canonical stages (DocReader, etc.).
// UI renders these as the timeline segments.
// - SpanKindSubSpan — anything inside a stage (e.g. multimodal.image[i]).
// UI shows them as collapsible children.
// - SpanKindGeneration — a SubSpan that wraps an LLM/VLM call. Same UI
// treatment as SubSpan but tagged so we can stitch
// to the matching Langfuse generation.
const (
SpanKindRoot = "root"
SpanKindStage = "stage"
SpanKindSubSpan = "subspan"
SpanKindGeneration = "generation"
)
// Span statuses. We deliberately distinguish "failed" (this span itself
// errored) from "cancelled" (an upstream span failed and we abandoned this
// one without running it) so the UI can render the cause differently —
// "you broke X, so we never ran Y" vs. "Y itself broke".
const (
SpanStatusPending = "pending"
SpanStatusRunning = "running"
SpanStatusDone = "done"
SpanStatusFailed = "failed"
SpanStatusSkipped = "skipped" // intentionally not run (e.g. multimodal on a text-only doc)
SpanStatusCancelled = "cancelled" // not run because an upstream span failed
)
// Stage names — the closed set the UI builds its 5-segment timeline from.
// Adding a stage requires a coordinated frontend release. SubSpan names
// are free-form (e.g. "multimodal.image[0]") and don't go through this
// list.
const (
StageDocReader = "docreader"
StageChunking = "chunking"
StageEmbedding = "embedding"
StageMultimodal = "multimodal"
StagePostProcess = "postprocess"
)
// AllStages is the canonical, ordered stage list. Used by the API layer
// to synthesize "pending" placeholders so the timeline always renders five
// segments even before parsing starts.
var AllStages = []string{
StageDocReader,
StageChunking,
StageEmbedding,
StageMultimodal,
StagePostProcess,
}
// StageDependencies declares the DAG between stages. Used by the tracker
// to cascade-cancel dependents when a stage fails — a Chunking failure
// silently turns Embedding/Multimodal/PostProcess into "cancelled" so the
// timeline shows a clear blast radius instead of three pending spinners.
//
// Important: Multimodal does NOT depend on Embedding. They share Chunking
// as their upstream and are otherwise independent (Multimodal kicks off
// regardless of vector indexing config). PostProcess joins both before
// running its handlers.
var StageDependencies = map[string][]string{
StageDocReader: nil,
StageChunking: {StageDocReader},
StageEmbedding: {StageChunking},
StageMultimodal: {StageChunking},
StagePostProcess: {StageEmbedding, StageMultimodal},
}
// KnowledgeProcessingSpan is one row in knowledge_processing_spans.
//
// Field tags pull double duty: GORM (storage) and JSON (API). ErrorDetail
// is excluded by default — handlers must opt in for admin views, matching
// how the dead-letter middleware already protects raw stack traces.
type KnowledgeProcessingSpan struct {
ID int64 `gorm:"primaryKey;column:id" json:"-"`
KnowledgeID string `gorm:"column:knowledge_id" json:"knowledge_id"`
Attempt int `gorm:"column:attempt" json:"attempt"`
SpanID string `gorm:"column:span_id;size:64" json:"span_id"`
ParentSpanID string `gorm:"column:parent_span_id;size:64" json:"parent_span_id,omitempty"`
Name string `gorm:"column:name;size:64" json:"name"`
Kind string `gorm:"column:kind;size:16" json:"kind"`
Status string `gorm:"column:status;size:16" json:"status"`
Input JSONMap `gorm:"column:input;type:jsonb" json:"input,omitempty"`
Output JSONMap `gorm:"column:output;type:jsonb" json:"output,omitempty"`
Metadata JSONMap `gorm:"column:metadata;type:jsonb" json:"metadata,omitempty"`
ErrorCode string `gorm:"column:error_code;size:64" json:"error_code,omitempty"`
ErrorMessage string `gorm:"column:error_message;type:text" json:"error_message,omitempty"`
ErrorDetail string `gorm:"column:error_detail;type:text" json:"-"`
StartedAt *time.Time `gorm:"column:started_at" json:"started_at,omitempty"`
FinishedAt *time.Time `gorm:"column:finished_at" json:"finished_at,omitempty"`
DurationMs int64 `gorm:"column:duration_ms" json:"duration_ms,omitempty"`
CreatedAt time.Time `gorm:"column:created_at;autoCreateTime" json:"created_at"`
UpdatedAt time.Time `gorm:"column:updated_at;autoUpdateTime" json:"updated_at"`
}
// TableName pins the table because GORM's default pluralization
// ("knowledge_processing_spans") happens to match — explicit beats
// implicit.
func (KnowledgeProcessingSpan) TableName() string {
return "knowledge_processing_spans"
}
// SpanTreeNode is the API-only tree projection. The repo returns flat
// rows; the handler/tracker assembles SpanTreeNode for the response.
type SpanTreeNode struct {
KnowledgeProcessingSpan
Children []*SpanTreeNode `json:"children,omitempty"`
}

View File

@@ -1,64 +0,0 @@
package types
import "time"
// ProcessingStage names — kept as a closed set on the Go side so the
// frontend timeline can rely on exactly five segments. Adding a new stage
// requires a coordinated frontend release; rename in this file is breaking.
const (
ProcessingStageDocReader = "docreader"
ProcessingStageChunking = "chunking"
ProcessingStageEmbedding = "embedding"
ProcessingStageMultimodal = "multimodal"
ProcessingStagePostProcess = "postprocess"
)
// AllProcessingStages is the canonical, ordered list. Used by the API
// to fill in pending entries when no row exists yet for a stage so the
// timeline always renders five segments even before parsing starts.
var AllProcessingStages = []string{
ProcessingStageDocReader,
ProcessingStageChunking,
ProcessingStageEmbedding,
ProcessingStageMultimodal,
ProcessingStagePostProcess,
}
// ProcessingStageStatus values. Distinct from Knowledge.parse_status —
// each stage row tracks its own state independently of the parent.
const (
ProcessingStagePending = "pending" // not started yet
ProcessingStageRunning = "running" // currently executing
ProcessingStageDone = "done"
ProcessingStageFailed = "failed"
ProcessingStageSkipped = "skipped" // e.g. multimodal skipped for image-free docs
)
// KnowledgeProcessingStage is one row in knowledge_processing_stages.
//
// We intentionally keep it as a flat struct with both Go and JSON tags so
// the same type round-trips through GORM (DB layer) and the API response.
// Sensitive ErrorDetail (full stack) is excluded from the default JSON
// output via "-" — handlers must opt in explicitly for admin views.
type KnowledgeProcessingStage struct {
ID int64 `gorm:"primaryKey;column:id" json:"-"`
KnowledgeID string `gorm:"column:knowledge_id;index" json:"knowledge_id"`
Stage string `gorm:"column:stage;size:32" json:"stage"`
Status string `gorm:"column:status;size:16" json:"status"`
StartedAt *time.Time `gorm:"column:started_at" json:"started_at,omitempty"`
FinishedAt *time.Time `gorm:"column:finished_at" json:"finished_at,omitempty"`
DurationMs int64 `gorm:"column:duration_ms" json:"duration_ms,omitempty"`
ErrorCode string `gorm:"column:error_code;size:64" json:"error_code,omitempty"`
ErrorMessage string `gorm:"column:error_message;type:text" json:"error_message,omitempty"`
ErrorDetail string `gorm:"column:error_detail;type:text" json:"-"`
Attempt int `gorm:"column:attempt" json:"attempt"`
CreatedAt time.Time `gorm:"column:created_at;autoCreateTime" json:"created_at"`
UpdatedAt time.Time `gorm:"column:updated_at;autoUpdateTime" json:"updated_at"`
}
// TableName pins the GORM table since the natural pluralization
// ("knowledge_processing_stages") happens to match what the migration
// creates — but explicit beats implicit here.
func (KnowledgeProcessingStage) TableName() string {
return "knowledge_processing_stages"
}

View File

@@ -44,6 +44,13 @@ type DocumentProcessPayload struct {
EnableQuestionGeneration bool `json:"enable_question_generation"` // 是否启用问题生成
QuestionCount int `json:"question_count,omitempty"` // 每个chunk生成的问题数量
Language string `json:"language,omitempty"` // Request locale for {{language}} in prompt templates
// Attempt is the per-knowledge attempt number this task belongs to.
// Set on enqueue (initial parse → attempt 1; reparse → max+1) so
// every span recorded by this task lands on the right attempt
// branch. Asynq retries within an attempt keep the same value so
// retried spans overwrite the previous attempt's row rather than
// fan out into a new attempt for every retry.
Attempt int `json:"attempt,omitempty"`
}
// FAQImportPayload represents the FAQ import task payload (including dry run mode)
@@ -176,6 +183,14 @@ type ImageMultimodalPayload struct {
EnableCaption bool `json:"enable_caption"`
Language string `json:"language,omitempty"` // Request locale for {{language}} in prompt templates
ImageSourceType string `json:"image_source_type,omitempty"` // Source type of the image (e.g., "scanned_pdf")
// Attempt links this image task back to the parent ProcessDocument
// attempt so the worker can record its image[i] subspan under the
// same attempt's multimodal stage span.
Attempt int `json:"attempt,omitempty"`
// ImageIndex is the 0-based ordinal of this image inside the
// parent's image set. Used as the subspan name suffix
// ("multimodal.image[3]") so the timeline preserves order.
ImageIndex int `json:"image_index,omitempty"`
}
// KnowledgePostProcessPayload represents the knowledge post process task payload.
@@ -185,6 +200,7 @@ type KnowledgePostProcessPayload struct {
KnowledgeID string `json:"knowledge_id"`
KnowledgeBaseID string `json:"knowledge_base_id"`
Language string `json:"language,omitempty"` // Request locale for {{language}} in prompt templates
Attempt int `json:"attempt,omitempty"`
}
// KBCloneTaskStatus represents the status of a knowledge base clone task

View File

@@ -0,0 +1,33 @@
-- Migration: 000053_knowledge_processing_spans (rollback)
-- Restores the 000052 flat-stage table so the system is operational on a
-- back-rev. We do NOT migrate data back — the spans table carries strict
-- supersets of the stages table's information, so a forward-and-back round
-- trip silently drops the new tree shape (acceptable; rollback is a rare
-- emergency lever).
DO $$ BEGIN RAISE NOTICE '[Migration 000053 rollback] Dropping spans, restoring stages table'; END $$;
DROP INDEX IF EXISTS idx_kpspan_parent;
DROP INDEX IF EXISTS idx_kpspan_status_started;
DROP INDEX IF EXISTS idx_kpspan_knowledge_attempt;
DROP TABLE IF EXISTS knowledge_processing_spans;
CREATE TABLE IF NOT EXISTS knowledge_processing_stages (
id BIGSERIAL PRIMARY KEY,
knowledge_id VARCHAR(64) NOT NULL,
stage VARCHAR(32) NOT NULL,
status VARCHAR(16) NOT NULL,
started_at TIMESTAMP WITH TIME ZONE,
finished_at TIMESTAMP WITH TIME ZONE,
duration_ms BIGINT,
error_code VARCHAR(64),
error_message TEXT,
error_detail TEXT,
attempt INT NOT NULL DEFAULT 1,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT uq_kps_knowledge_stage UNIQUE (knowledge_id, stage)
);
CREATE INDEX IF NOT EXISTS idx_kps_knowledge ON knowledge_processing_stages (knowledge_id);
CREATE INDEX IF NOT EXISTS idx_kps_status_started ON knowledge_processing_stages (status, started_at);
DO $$ BEGIN RAISE NOTICE '[Migration 000053 rollback] complete'; END $$;

View File

@@ -0,0 +1,90 @@
-- Migration: 000053_knowledge_processing_spans
-- Replace the flat 5-stage tracking from migration 000052 with a tree-shaped
-- span model inspired by Langfuse traces.
--
-- Why redesign?
--
-- The flat (knowledge_id, stage) model from 000052 had four shortcomings the
-- "stuck parsing" UX requires us to solve:
--
-- 1. Reparse semantics — a re-trigger of parsing on the same knowledge
-- reset the existing rows, erasing the previous attempt's history.
-- Operators investigating "why did it fail twice?" had no record of
-- attempt 1 once attempt 2 began.
--
-- 2. No DAG — Embedding and Multimodal both depend on Chunking but are
-- independent of each other. The flat model couldn't represent that,
-- so a Chunking failure couldn't auto-cascade "cancelled" to its
-- downstream stages and the UI had to guess.
--
-- 3. No subspans — a Multimodal stage produces N parallel image tasks;
-- Embedding produces M batches; PostProcess fans out into Summary /
-- Question / Wiki / Graph. Those finer-grained units have their own
-- success / failure / duration that operators want to see (and that
-- Langfuse generations already capture for the LLM-call subset).
--
-- 4. No input/output — duration alone hides the WHY of a slow run. With
-- input ({pages:48, images:5}) and output ({tokens:5840, dim:1024})
-- JSON, the timeline answers "how big was the work" without a log
-- dive.
--
-- The new schema mirrors Langfuse's trace/span/generation hierarchy:
-- * one ROOT span per (knowledge_id, attempt) acting as the trace
-- * STAGE spans (docreader/chunking/embedding/multimodal/postprocess)
-- are children of root
-- * SUBSPANs (multimodal.image[i], embedding.batch[i], postprocess.spawn.X)
-- hang off their stage. The kind="generation" subset corresponds 1:1 to
-- a Langfuse generation; metadata.langfuse_trace_id stitches them.
--
-- We keep the OLD 000052 table untouched on rollback so a `migrate down`
-- doesn't lose either schema. The forward path drops it because no UI
-- callers use it (the API was only added in the same branch and is being
-- replaced as part of this migration).
DO $$ BEGIN RAISE NOTICE '[Migration 000053] Replacing knowledge_processing_stages with knowledge_processing_spans'; END $$;
DROP INDEX IF EXISTS idx_kps_status_started;
DROP INDEX IF EXISTS idx_kps_knowledge;
DROP TABLE IF EXISTS knowledge_processing_stages;
CREATE TABLE IF NOT EXISTS knowledge_processing_spans (
id BIGSERIAL PRIMARY KEY,
knowledge_id VARCHAR(64) NOT NULL,
attempt INT NOT NULL DEFAULT 1,
span_id VARCHAR(64) NOT NULL,
parent_span_id VARCHAR(64),
name VARCHAR(64) NOT NULL,
kind VARCHAR(16) NOT NULL, -- root / stage / subspan / generation
status VARCHAR(16) NOT NULL, -- pending/running/done/failed/skipped/cancelled
input JSONB,
output JSONB,
metadata JSONB,
error_code VARCHAR(64),
error_message TEXT,
error_detail TEXT,
started_at TIMESTAMP WITH TIME ZONE,
finished_at TIMESTAMP WITH TIME ZONE,
duration_ms BIGINT,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT uq_kpspan_attempt_span UNIQUE (knowledge_id, attempt, span_id)
);
-- Primary read path: fetch every span for a (knowledge, attempt) tuple in
-- one indexed range scan, then build the tree in memory. The unique
-- constraint above already covers point lookups.
CREATE INDEX IF NOT EXISTS idx_kpspan_knowledge_attempt
ON knowledge_processing_spans (knowledge_id, attempt);
-- Operator query: "find spans stuck in running too long". Used by ad-hoc
-- diagnostics and a future housekeeping sweep.
CREATE INDEX IF NOT EXISTS idx_kpspan_status_started
ON knowledge_processing_spans (status, started_at);
-- Lineage walks: cascade-cancel a stage's downstream needs to find every
-- child by parent_span_id. The cardinality is small (≤ tens per attempt)
-- so we don't need a covering index, just B-tree on parent.
CREATE INDEX IF NOT EXISTS idx_kpspan_parent
ON knowledge_processing_spans (parent_span_id)
WHERE parent_span_id IS NOT NULL;
DO $$ BEGIN RAISE NOTICE '[Migration 000053] knowledge_processing_spans table ready'; END $$;