mirror of
https://github.com/Tencent/WeKnora.git
synced 2026-06-04 13:30:32 +08:00
feat(timeline): enhance knowledge processing spans and UI elements
This commit introduces several improvements to the knowledge processing timeline and related components. Key changes include: 1. Added a `gracePoll` prop to the `KnowledgeProcessingTimeline` component to manage polling behavior more effectively. 2. Enhanced the UI by displaying the document title in the drawer, improving user visibility of the current document context. 3. Implemented new CSS classes for better styling of the drawer title bar, ensuring a more polished appearance. 4. Updated the backend to support the new `WikiSpan` tracking, allowing for detailed monitoring of document processing stages. These changes aim to improve user experience and provide better insights into the document processing workflow.
This commit is contained in:
@@ -846,7 +846,8 @@ const handleDetailsScroll = () => {
|
||||
opens the secondary drawer. -->
|
||||
<div class="kp-trigger-shadow" aria-hidden="true">
|
||||
<KnowledgeProcessingTimeline v-if="details.id" :knowledge-id="details.id" :parse-status="details.parse_status"
|
||||
:compact="true" @update:has-spans="hasTimelineSpans = $event" @update:summary="timelineSummary = $event" />
|
||||
:compact="true" :grace-poll="false" @update:has-spans="hasTimelineSpans = $event"
|
||||
@update:summary="timelineSummary = $event" />
|
||||
</div>
|
||||
|
||||
<!-- 二级抽屉:完整 Langfuse-style waterfall -->
|
||||
@@ -858,6 +859,9 @@ const handleDetailsScroll = () => {
|
||||
<div class="kp-drawer-titlebar-left">
|
||||
<span class="kp-drawer-titlebar-kind">trace</span>
|
||||
<span class="kp-drawer-titlebar-title">{{ $t('knowledgeStages.title') }}</span>
|
||||
<span v-if="details.title" class="kp-drawer-titlebar-sep">·</span>
|
||||
<span v-if="details.title" class="kp-drawer-titlebar-doc" :title="details.title">{{ details.title
|
||||
}}</span>
|
||||
</div>
|
||||
<button type="button" class="kp-drawer-titlebar-close" @click="closeTimeline"
|
||||
:aria-label="$t('knowledgeStages.close')">
|
||||
@@ -1375,6 +1379,22 @@ const handleDetailsScroll = () => {
|
||||
letter-spacing: -0.005em;
|
||||
}
|
||||
|
||||
.kp-drawer-titlebar-sep {
|
||||
font-size: 13px;
|
||||
color: var(--td-text-color-placeholder);
|
||||
flex-shrink: 0;
|
||||
}
|
||||
|
||||
.kp-drawer-titlebar-doc {
|
||||
font-size: 13px;
|
||||
color: var(--td-text-color-secondary);
|
||||
min-width: 0;
|
||||
flex: 0 1 auto;
|
||||
overflow: hidden;
|
||||
text-overflow: ellipsis;
|
||||
white-space: nowrap;
|
||||
}
|
||||
|
||||
.kp-drawer-titlebar-close {
|
||||
display: inline-flex;
|
||||
align-items: center;
|
||||
|
||||
@@ -50,10 +50,20 @@ const props = withDefaults(
|
||||
parseStatus?: string
|
||||
autoPoll?: boolean
|
||||
compact?: boolean
|
||||
// gracePoll keeps the loop alive for QUIESCE_GRACE_MS after the
|
||||
// trace appears quiescent, so post-pipeline subspans (wiki ingest
|
||||
// fires 30s after enqueue, summary/question/graph hit asynq queues
|
||||
// with variable latency) surface without a manual refresh. Off by
|
||||
// default for "background" mounts (e.g. the hidden badge-driver in
|
||||
// doc-content) so closing the visible drawer doesn't leave a
|
||||
// headless polling loop chewing through 2 more minutes of fetches
|
||||
// on nobody's behalf.
|
||||
gracePoll?: boolean
|
||||
}>(),
|
||||
{
|
||||
autoPoll: true,
|
||||
compact: false,
|
||||
gracePoll: true,
|
||||
},
|
||||
)
|
||||
|
||||
@@ -88,7 +98,11 @@ const lastFetchedAt = ref<number>(0)
|
||||
// - New logic re-evaluates on every fetch: stages with children get
|
||||
// expanded unless the user has spoken. Discoverability + respect
|
||||
// for manual collapse, both.
|
||||
const userToggledStages = ref<Set<string>>(new Set())
|
||||
// Tracks every row the user has manually expanded or collapsed (by
|
||||
// row.key). Auto-expand checks this set so deeper subspans that the
|
||||
// user explicitly hid stay hidden across polls, instead of being
|
||||
// reopened by the next fetch's re-evaluation pass.
|
||||
const userToggledRows = ref<Set<string>>(new Set())
|
||||
// Tracks consecutive fetch failures so the "更新于" caption can surface
|
||||
// staleness. When the parse_status is mid-flight but every fetch is
|
||||
// hitting an error, the loop keeps going silently — without this
|
||||
@@ -200,6 +214,58 @@ const isLive = computed<boolean>(() => {
|
||||
return isPolling(props.parseStatus)
|
||||
})
|
||||
|
||||
// Walk every node in the tree and return the freshest updated_at /
|
||||
// finished_at timestamp we can see. Used by the quiescent grace
|
||||
// window below to decide "did this trace finish recently, or is it
|
||||
// just an old completed one we shouldn't waste polls on?"
|
||||
function spanTreeLastActivity(node?: SpanNode): number {
|
||||
if (!node) return 0
|
||||
let max = 0
|
||||
const stamps: (string | null | undefined)[] = [
|
||||
(node as any).updated_at,
|
||||
node.finished_at,
|
||||
node.started_at,
|
||||
(node as any).created_at,
|
||||
]
|
||||
for (const s of stamps) {
|
||||
const t = parseTime(s || undefined)
|
||||
if (t !== null && t > max) max = t
|
||||
}
|
||||
const kids = node.children || []
|
||||
for (let i = 0; i < kids.length; i++) {
|
||||
const t = spanTreeLastActivity(kids[i])
|
||||
if (t > max) max = t
|
||||
}
|
||||
return max
|
||||
}
|
||||
|
||||
// Async post-pipeline tasks (summary, question, graph.chunk[*],
|
||||
// wiki) open their postprocess.* subspans AFTER the parse pipeline
|
||||
// has finalised — wiki in particular fires 30s after enqueue thanks
|
||||
// to wikiIngestDelay. At that exact moment parse_status is already
|
||||
// 'completed' AND every existing span is 'done', so isLive flips to
|
||||
// false and polling stops. The user is then stranded watching a
|
||||
// stale tree until they hit refresh.
|
||||
//
|
||||
// QUIESCE_GRACE_MS keeps the poll loop alive for a bounded window
|
||||
// after the trace appears quiescent so late subspans surface
|
||||
// automatically. Sized generously past wikiIngestDelay (30s) plus a
|
||||
// typical ingest run, but short enough that opening an
|
||||
// already-completed knowledge from days ago doesn't waste many
|
||||
// polls before the loop falls silent.
|
||||
const QUIESCE_GRACE_MS = 2 * 60 * 1000
|
||||
|
||||
const lastTraceActivityAt = computed<number>(() =>
|
||||
spanTreeLastActivity(data.value?.trace),
|
||||
)
|
||||
|
||||
const isWithinQuiesceGrace = computed<boolean>(() => {
|
||||
if (isLive.value) return false
|
||||
const last = lastTraceActivityAt.value
|
||||
if (!last) return false
|
||||
return nowTick.value - last < QUIESCE_GRACE_MS
|
||||
})
|
||||
|
||||
async function fetchSpans(opts: { manual?: boolean } = {}) {
|
||||
if (!props.knowledgeId) return
|
||||
if (fetchInFlight) return
|
||||
@@ -215,22 +281,29 @@ async function fetchSpans(opts: { manual?: boolean } = {}) {
|
||||
if (selectedAttempt.value === undefined) {
|
||||
selectedAttempt.value = data.value.attempt
|
||||
}
|
||||
// Auto-expand rule, applied on EVERY fetch:
|
||||
// - Stage has children + user hasn't toggled it → expand
|
||||
// - Stage has no children → leave whatever state it was in
|
||||
// - User has toggled the stage (either direction) → don't touch
|
||||
// This way subspans that arrive mid-parse surface themselves
|
||||
// without forcing a click, but a user who explicitly collapsed
|
||||
// a noisy stage stays in control.
|
||||
// Auto-expand rule, applied on EVERY fetch and walked over
|
||||
// EVERY level of the tree (stage + subspan + sub-subspan, …):
|
||||
// - Row has children + user hasn't toggled it → expand
|
||||
// - Row has no children → leave whatever state it was in
|
||||
// - User has toggled the row (either direction) → don't touch
|
||||
// Walking the full depth (not just stages) is what makes deep
|
||||
// subspans like postprocess.wiki — whose own children like
|
||||
// postprocess.wiki.extract / .summary / .classify / .page[*]
|
||||
// appear later in the run — surface automatically. Without this
|
||||
// the user would click the chevron on postprocess.wiki and see
|
||||
// nothing change, because the row would default to collapsed
|
||||
// and the auto-expand pass would skip it.
|
||||
const expanded = new Set(expandedRows.value)
|
||||
expanded.add('__root__')
|
||||
for (const stage of data.value.trace?.children || []) {
|
||||
const key = stage.span_id || `stage:${stage.name}`
|
||||
if (userToggledStages.value.has(key)) continue
|
||||
if ((stage.children || []).length > 0) {
|
||||
const autoExpand = (n: SpanNode) => {
|
||||
const key = n.span_id || `stage:${n.name}`
|
||||
const kids = n.children || []
|
||||
if (kids.length > 0 && !userToggledRows.value.has(key)) {
|
||||
expanded.add(key)
|
||||
}
|
||||
for (const c of kids) autoExpand(c)
|
||||
}
|
||||
for (const stage of data.value.trace?.children || []) autoExpand(stage)
|
||||
expandedRows.value = expanded
|
||||
const traceStatus = data.value.trace?.status || data.value.parse_status || 'running'
|
||||
attemptStatuses.set(data.value.attempt, traceStatus)
|
||||
@@ -330,9 +403,9 @@ function onAttemptChange(n: number) {
|
||||
if (Number.isNaN(n)) return
|
||||
selectedAttempt.value = n
|
||||
selectedSpanId.value = null
|
||||
// New attempt: forget per-stage user choices so the auto-expand
|
||||
// New attempt: forget per-row user choices so the auto-expand
|
||||
// rule re-evaluates cleanly against the new attempt's tree.
|
||||
userToggledStages.value = new Set()
|
||||
userToggledRows.value = new Set()
|
||||
expandedRows.value = new Set(['__root__'])
|
||||
fetchSpans()
|
||||
}
|
||||
@@ -345,7 +418,7 @@ watch(
|
||||
expandedRows.value = new Set(['__root__'])
|
||||
selectedSpanId.value = null
|
||||
attemptStatuses.clear()
|
||||
userToggledStages.value = new Set()
|
||||
userToggledRows.value = new Set()
|
||||
fetchSpans()
|
||||
},
|
||||
)
|
||||
@@ -367,9 +440,16 @@ onMounted(() => {
|
||||
pollTimer = setInterval(() => {
|
||||
if (unmounted) return
|
||||
if (fetchInFlight) return
|
||||
// Reuse the same isLive logic that drives the badge — so what
|
||||
// the user sees and what we actually fetch can never disagree.
|
||||
if (!isLive.value) return
|
||||
// isLive covers the "pipeline is obviously still running" case
|
||||
// (parse_status pending/processing OR any span running/pending).
|
||||
// isWithinQuiesceGrace covers the harder case where parse_status
|
||||
// has flipped to 'completed' but a post-pipeline subspan
|
||||
// (wiki/summary/question/graph) is about to be created on the
|
||||
// backend queue and we'd otherwise stop polling right before it
|
||||
// appears. See QUIESCE_GRACE_MS for the rationale. Background
|
||||
// mounts (gracePoll=false) opt out so they don't keep firing
|
||||
// requests after their host UI (e.g. the trace drawer) closed.
|
||||
if (!isLive.value && !(props.gracePoll && isWithinQuiesceGrace.value)) return
|
||||
fetchSpans()
|
||||
}, POLL_INTERVAL_MS)
|
||||
}
|
||||
@@ -568,6 +648,11 @@ const flatRows = computed<FlatRow[]>(() => {
|
||||
isStage: false,
|
||||
parentKey,
|
||||
})
|
||||
// Honour the expand/collapse state for subspans too — without
|
||||
// this gate, clicking the chevron on a subspan (e.g.
|
||||
// postprocess.wiki) only rotated the icon but the children
|
||||
// were always rendered, so the toggle looked broken.
|
||||
if (!expandedRows.value.has(key)) return
|
||||
kids.forEach((c, i) => walk(c, depth + 1, `${idxPath}/${i}`, key))
|
||||
}
|
||||
stageChildren.forEach((c, i) => walk(c, 2, `${stageKey}/${i}`, stageKey))
|
||||
@@ -677,15 +762,14 @@ function toggleTree(row: FlatRow, ev?: MouseEvent) {
|
||||
if (next.has(row.key)) next.delete(row.key)
|
||||
else next.add(row.key)
|
||||
expandedRows.value = next
|
||||
// Once the user toggles a stage row, opt it out of further
|
||||
// auto-management so subsequent polls don't override their choice.
|
||||
// Deeper rows (subspans of subspans) aren't auto-managed anyway,
|
||||
// so we only need to track stage-level intent here.
|
||||
if (row.isStage) {
|
||||
const touched = new Set(userToggledStages.value)
|
||||
touched.add(row.key)
|
||||
userToggledStages.value = touched
|
||||
}
|
||||
// Record the manual toggle at every depth so the next poll's
|
||||
// auto-expand pass leaves this row alone. Without this, a user
|
||||
// who collapsed a noisy subspan (e.g. postprocess.wiki with a
|
||||
// dozen page[*] children) would see it pop back open on the
|
||||
// next 2s tick.
|
||||
const touched = new Set(userToggledRows.value)
|
||||
touched.add(row.key)
|
||||
userToggledRows.value = touched
|
||||
}
|
||||
|
||||
function selectRow(row: FlatRow) {
|
||||
@@ -2032,6 +2116,19 @@ const stageBreakdown = computed<StageRowSummary[]>(() => {
|
||||
opacity: 1;
|
||||
}
|
||||
|
||||
/* ROOT row sits flush against the sticky ruler at the top of
|
||||
.kp-body (which is `overflow-y: auto` — it clips anything that
|
||||
tries to escape upward). The default `bottom: calc(100% + 8px)`
|
||||
tooltip placement therefore renders as a half-cropped black bar
|
||||
for the root row's solid + dashed bars. Flip it below the bar so
|
||||
the tooltip lands inside the next row's bar lane (always empty in
|
||||
that horizontal slice — the dashed wrap bar sits at top:9), which
|
||||
neither clips nor obscures the next stage's label cell. */
|
||||
.kp-row-root .kp-bar-tip {
|
||||
bottom: auto;
|
||||
top: calc(100% + 8px);
|
||||
}
|
||||
|
||||
/* Status palette — NOT all green. The project brand color happens
|
||||
to be green, which made done/running visually identical (both
|
||||
solid green). Done stays green (universal "success" semantic);
|
||||
|
||||
@@ -201,6 +201,13 @@ type wikiIngestService struct {
|
||||
pendingRepo interfaces.TaskPendingOpsRepository
|
||||
deadLetterRepo interfaces.TaskDeadLetterRepository
|
||||
redisClient *redis.Client // nil in Lite mode (no Redis)
|
||||
// spanTracker lets per-document map work surface as a
|
||||
// postprocess.wiki subspan in the knowledge trace tree. Async
|
||||
// batch design means we look up the parent attempt by knowledge
|
||||
// id at run-time (LatestAttempt) rather than carrying it in the
|
||||
// asynq payload, which is per-KB and would otherwise be ambiguous
|
||||
// for the 5-docs-per-batch fan-out.
|
||||
spanTracker SpanTracker
|
||||
// liteLocks provides per-KB mutual exclusion in Lite mode (no Redis).
|
||||
// Keys are kbID strings; values are unused (presence = locked).
|
||||
liteLocks sync.Map
|
||||
@@ -218,6 +225,7 @@ func NewWikiIngestService(
|
||||
pendingRepo interfaces.TaskPendingOpsRepository,
|
||||
deadLetterRepo interfaces.TaskDeadLetterRepository,
|
||||
redisClient *redis.Client,
|
||||
spanTracker SpanTracker,
|
||||
) interfaces.TaskHandler {
|
||||
svc := &wikiIngestService{
|
||||
wikiService: wikiService,
|
||||
@@ -230,10 +238,45 @@ func NewWikiIngestService(
|
||||
pendingRepo: pendingRepo,
|
||||
deadLetterRepo: deadLetterRepo,
|
||||
redisClient: redisClient,
|
||||
spanTracker: spanTracker,
|
||||
}
|
||||
return svc
|
||||
}
|
||||
|
||||
// tracker returns a non-nil span tracker so callers don't have to
|
||||
// nil-check on every Begin/End. Matches the noopSpanTracker pattern
|
||||
// used elsewhere (see knowledgeService.tracker, KnowledgePostProcessService.tracker).
|
||||
func (s *wikiIngestService) tracker() SpanTracker {
|
||||
if s.spanTracker == nil {
|
||||
return noopSpanTracker{}
|
||||
}
|
||||
return s.spanTracker
|
||||
}
|
||||
|
||||
// beginWikiSubspan opens a postprocess.wiki subspan for this document
|
||||
// under the knowledge's most recent attempt. Returns nil when there is
|
||||
// no parse attempt to attach to (e.g. a wiki ingest fired from a manual
|
||||
// reparse path that never went through the tracker) — callers must
|
||||
// pair every begin with a tolerant end / fail / skip below.
|
||||
//
|
||||
// Lookups are by `LatestAttempt(knowledgeID)` because the asynq task
|
||||
// payload (WikiIngestPayload) is KB-scoped and carries no per-doc
|
||||
// attempt — see the type's comment for the batch architecture.
|
||||
func (s *wikiIngestService) beginWikiSubspan(ctx context.Context, knowledgeID string, input types.JSONMap) *Span {
|
||||
if knowledgeID == "" {
|
||||
return nil
|
||||
}
|
||||
attempt := s.tracker().LatestAttempt(ctx, knowledgeID)
|
||||
if attempt <= 0 {
|
||||
return nil
|
||||
}
|
||||
parent := s.tracker().LookupStage(ctx, knowledgeID, attempt, types.StagePostProcess)
|
||||
if parent == nil {
|
||||
return nil
|
||||
}
|
||||
return s.tracker().BeginSubSpan(ctx, parent, "postprocess.wiki", types.SpanKindSubSpan, input)
|
||||
}
|
||||
|
||||
// EnqueueWikiIngest queues a document for wiki ingestion.
|
||||
//
|
||||
// Architecture: each upload inserts one row into task_pending_ops
|
||||
@@ -533,6 +576,20 @@ type docIngestResult struct {
|
||||
// the slug (for navigation / retract lookups) and the human-readable
|
||||
// title captured at ingest time (for the log feed's display layer).
|
||||
Pages []types.WikiLogPageRef
|
||||
// MapStats are the per-doc map-phase metrics captured at the moment
|
||||
// mapOneDocument finishes. Surfaced into the postprocess.wiki span's
|
||||
// output so the trace viewer can show "what the map phase produced"
|
||||
// even though the span itself stays open until the batch's reduce +
|
||||
// cleanup phases complete (so the user-visible duration covers the
|
||||
// whole pipeline for this doc, not just LLM extraction).
|
||||
MapStats types.JSONMap
|
||||
// WikiSpan is the postprocess.wiki subspan opened at the start of
|
||||
// mapOneDocument. ProcessWikiIngest holds it open across the reduce
|
||||
// + cleanup phases and closes it once this doc's pages have all
|
||||
// been materialised — see the EndSpan call near the end of
|
||||
// ProcessWikiIngest. nil when no parent attempt was found, in which
|
||||
// case the tracker helpers are all no-ops anyway.
|
||||
WikiSpan *Span
|
||||
}
|
||||
|
||||
// WikiBatchContext holds shared data across Map and Reduce phases.
|
||||
@@ -636,6 +693,93 @@ func previewStringSlice(items []string, limit int) string {
|
||||
return fmt.Sprintf("[%s]", strings.Join(out, ", "))
|
||||
}
|
||||
|
||||
// previewExtractedItems returns a JSON-friendly preview of the first
|
||||
// `limit` extracted entities or concepts so the trace viewer's
|
||||
// postprocess.wiki.extract span shows actual names/slugs/descriptions
|
||||
// instead of bare counts. Each item is trimmed to a small fixed
|
||||
// budget — these end up serialised into the spans table's JSONB
|
||||
// output column, so the cumulative size matters more than per-item
|
||||
// fidelity.
|
||||
func previewExtractedItems(items []extractedItem, limit int) []map[string]string {
|
||||
if limit <= 0 {
|
||||
limit = 1
|
||||
}
|
||||
n := len(items)
|
||||
if n > limit {
|
||||
items = items[:limit]
|
||||
}
|
||||
out := make([]map[string]string, 0, len(items))
|
||||
for _, it := range items {
|
||||
out = append(out, map[string]string{
|
||||
"name": previewText(it.Name, 60),
|
||||
"slug": it.Slug,
|
||||
"description": previewText(it.Description, 120),
|
||||
})
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// topCitedSlugs returns the top `limit` slugs by chunk-citation count.
|
||||
// Used by postprocess.wiki.classify so the trace surfaces which
|
||||
// candidate slugs the citation pass attached the most chunks to —
|
||||
// useful when triaging "this LLM run extracted weird things" without
|
||||
// having to open and diff full chunk lists.
|
||||
func topCitedSlugs(citations map[string][]string, limit int) []map[string]any {
|
||||
if len(citations) == 0 {
|
||||
return nil
|
||||
}
|
||||
type entry struct {
|
||||
slug string
|
||||
count int
|
||||
}
|
||||
entries := make([]entry, 0, len(citations))
|
||||
for slug, ids := range citations {
|
||||
entries = append(entries, entry{slug: slug, count: len(ids)})
|
||||
}
|
||||
sort.Slice(entries, func(i, j int) bool {
|
||||
if entries[i].count != entries[j].count {
|
||||
return entries[i].count > entries[j].count
|
||||
}
|
||||
return entries[i].slug < entries[j].slug
|
||||
})
|
||||
if limit > 0 && len(entries) > limit {
|
||||
entries = entries[:limit]
|
||||
}
|
||||
out := make([]map[string]any, 0, len(entries))
|
||||
for _, e := range entries {
|
||||
out = append(out, map[string]any{
|
||||
"slug": e.slug,
|
||||
"chunks": e.count,
|
||||
})
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// previewNewSlugs returns a JSON-friendly preview of the first
|
||||
// `limit` slugs that the citation pass discovered (i.e. did not appear
|
||||
// in pass-0's candidate list). Surfacing these makes "the citation
|
||||
// LLM kept inventing entries" trivially diagnosable from the trace
|
||||
// viewer.
|
||||
func previewNewSlugs(items []newSlugFromCitation, limit int) []map[string]string {
|
||||
if limit <= 0 {
|
||||
limit = 1
|
||||
}
|
||||
n := len(items)
|
||||
if n > limit {
|
||||
items = items[:limit]
|
||||
}
|
||||
out := make([]map[string]string, 0, len(items))
|
||||
for _, it := range items {
|
||||
out = append(out, map[string]string{
|
||||
"name": previewText(it.Name, 60),
|
||||
"slug": it.Slug,
|
||||
"type": it.Type,
|
||||
"chunks": fmt.Sprintf("%d", len(it.SourceChunks)),
|
||||
})
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// wikiLinkRE matches `[[slug]]` and `[[slug|display text]]` references
|
||||
// inside wiki page content. The slug capture group rejects whitespace and
|
||||
// the closing-bracket / pipe characters so we don't accidentally swallow
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
"unicode/utf8"
|
||||
|
||||
"github.com/Tencent/WeKnora/internal/agent"
|
||||
"github.com/Tencent/WeKnora/internal/logger"
|
||||
@@ -485,11 +486,22 @@ func (s *wikiIngestService) ProcessWikiIngest(ctx context.Context, t *asynq.Task
|
||||
// pointing at missing pages.
|
||||
failedAdditionSlugs := make(map[string]struct{})
|
||||
|
||||
// Build the kid → wikiSpan lookup before kicking off reduce. Each
|
||||
// per-slug reduce attaches a postprocess.wiki.page[slug] subspan
|
||||
// under the FIRST contributing doc's wiki span — see comment in
|
||||
// reduceSlugUpdates for the multi-contributor attribution rule.
|
||||
kidToWikiSpan := make(map[string]*Span, len(docResults))
|
||||
for _, r := range docResults {
|
||||
if r != nil && r.WikiSpan != nil {
|
||||
kidToWikiSpan[r.KnowledgeID] = r.WikiSpan
|
||||
}
|
||||
}
|
||||
|
||||
for slug, updates := range slugUpdates {
|
||||
slug := slug
|
||||
updates := updates
|
||||
egReduce.Go(func() error {
|
||||
changed, affectedType, additionFailed, err := s.reduceSlugUpdates(reduceCtx, chatModel, payload.KnowledgeBaseID, slug, updates, payload.TenantID, batchCtx)
|
||||
changed, affectedType, additionFailed, err := s.reduceSlugUpdates(reduceCtx, chatModel, payload.KnowledgeBaseID, slug, updates, payload.TenantID, batchCtx, kidToWikiSpan)
|
||||
if err != nil {
|
||||
logger.Warnf(reduceCtx, "wiki ingest: reduce failed for slug %s: %v", slug, err)
|
||||
}
|
||||
@@ -645,6 +657,51 @@ func (s *wikiIngestService) ProcessWikiIngest(ctx context.Context, t *asynq.Task
|
||||
s.publishDraftPages(ctx, payload.KnowledgeBaseID, allPagesAffected)
|
||||
}
|
||||
|
||||
// Close postprocess.wiki spans for every successfully-mapped doc.
|
||||
// Span duration now spans map + reduce + index rebuild + cleanup +
|
||||
// cross-link injection + publish, matching the wall-clock window
|
||||
// the user thinks of as "wiki processing for this knowledge".
|
||||
// Per-doc page write outcomes are summarised in the output so the
|
||||
// trace viewer can show how many of the doc's extracted pages
|
||||
// actually landed (vs. dropped because reduce-phase generation
|
||||
// failed).
|
||||
failedAdditionSlugCount := len(failedAdditionSlugs)
|
||||
for _, r := range docResults {
|
||||
if r == nil || r.WikiSpan == nil {
|
||||
continue
|
||||
}
|
||||
writtenPages := make([]map[string]string, 0, len(r.Pages))
|
||||
droppedPages := make([]map[string]string, 0)
|
||||
for _, p := range r.Pages {
|
||||
entry := map[string]string{
|
||||
"slug": p.Slug,
|
||||
"title": previewText(p.Title, 80),
|
||||
}
|
||||
if _, bad := failedAdditionSlugs[p.Slug]; bad {
|
||||
droppedPages = append(droppedPages, entry)
|
||||
continue
|
||||
}
|
||||
writtenPages = append(writtenPages, entry)
|
||||
}
|
||||
output := types.JSONMap{
|
||||
"pages_written": len(writtenPages),
|
||||
"pages_dropped": len(droppedPages),
|
||||
"pages_total": len(r.Pages),
|
||||
"failed_slug_writes": failedAdditionSlugCount,
|
||||
"pages_written_preview": writtenPages,
|
||||
}
|
||||
if len(droppedPages) > 0 {
|
||||
output["pages_dropped_preview"] = droppedPages
|
||||
}
|
||||
for k, v := range r.MapStats {
|
||||
output[k] = v
|
||||
}
|
||||
s.tracker().EndSpan(ctx, r.WikiSpan, output)
|
||||
}
|
||||
// Failed-map docs already had FailSpan called inside
|
||||
// mapOneDocument (the failedOps path returns before reaching
|
||||
// docResults). Nothing extra to do here for them.
|
||||
|
||||
// Build the trim set: rows that should be removed from
|
||||
// task_pending_ops. We start from the full peekedIDs (every row we
|
||||
// pulled, even ones de-duplicated by knowledge_id) and subtract
|
||||
@@ -690,6 +747,16 @@ func (s *wikiIngestService) mapOneDocument(
|
||||
knowledgeID := op.KnowledgeID
|
||||
lang := op.Language
|
||||
|
||||
// Open a postprocess.wiki subspan under the parent attempt's
|
||||
// postprocess stage so the actual per-doc work (LLM extraction +
|
||||
// summary + classification) shows up in the trace tree. Returns
|
||||
// nil when the parent attempt is gone (no panic on missing
|
||||
// lookups — span tracker is best-effort).
|
||||
wikiSpan := s.beginWikiSubspan(ctx, knowledgeID, types.JSONMap{
|
||||
"language": lang,
|
||||
"knowledge_base_id": payload.KnowledgeBaseID,
|
||||
})
|
||||
|
||||
// Guard against the ingest/delete race: if the user deleted the doc while
|
||||
// this task was queued (wikiIngestDelay = 30s) or while an earlier stage
|
||||
// was in flight, we must NOT proceed to LLM extraction — doing so would
|
||||
@@ -697,15 +764,18 @@ func (s *wikiIngestService) mapOneDocument(
|
||||
// permanently unreachable via wiki_read_source_doc.
|
||||
if s.isKnowledgeGone(ctx, payload.KnowledgeBaseID, knowledgeID) {
|
||||
logger.Infof(ctx, "wiki ingest: knowledge %s has been deleted, skip map", knowledgeID)
|
||||
s.tracker().SkipSpan(ctx, wikiSpan, "knowledge_deleted")
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
chunks, err := s.chunkRepo.ListChunksByKnowledgeID(ctx, payload.TenantID, knowledgeID)
|
||||
if err != nil {
|
||||
s.tracker().FailSpan(ctx, wikiSpan, "LIST_CHUNKS_FAILED", err.Error(), err)
|
||||
return nil, nil, fmt.Errorf("get chunks: %w", err)
|
||||
}
|
||||
if len(chunks) == 0 {
|
||||
logger.Infof(ctx, "wiki ingest: document %s has no chunks, skip", knowledgeID)
|
||||
s.tracker().SkipSpan(ctx, wikiSpan, "no_chunks")
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
@@ -725,6 +795,7 @@ func (s *wikiIngestService) mapOneDocument(
|
||||
"wiki ingest: doc %s has insufficient text content after stripping image markup (raw_len=%d), skipping LLM extraction",
|
||||
knowledgeID, rawRuneCount,
|
||||
)
|
||||
s.tracker().SkipSpan(ctx, wikiSpan, "insufficient_text_content")
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
@@ -760,6 +831,10 @@ func (s *wikiIngestService) mapOneDocument(
|
||||
pass0Failed bool
|
||||
)
|
||||
logger.Infof(ctx, "wiki ingest: pass 0 — extracting candidate slugs for %s", knowledgeID)
|
||||
extractSpan := s.tracker().BeginSubSpan(ctx, wikiSpan, "postprocess.wiki.extract", types.SpanKindSubSpan, types.JSONMap{
|
||||
"content_chars": utf8.RuneCountInString(content),
|
||||
"old_pages": len(oldPageSlugs),
|
||||
})
|
||||
extractedEntities, extractedConcepts, slugItems, err = s.extractCandidateSlugs(ctx, chatModel, payload.KnowledgeBaseID, content, lang, oldPageSlugs, batchCtx)
|
||||
if err != nil {
|
||||
logger.Warnf(ctx, "wiki ingest: pass 0 failed for %s (%v) — falling back to legacy extractor", knowledgeID, err)
|
||||
@@ -767,9 +842,18 @@ func (s *wikiIngestService) mapOneDocument(
|
||||
extractedEntities, extractedConcepts, slugItems, err = s.extractEntitiesAndConceptsNoUpsert(ctx, chatModel, payload.KnowledgeBaseID, content, lang, oldPageSlugs, batchCtx)
|
||||
if err != nil {
|
||||
logger.Warnf(ctx, "wiki ingest: legacy fallback also failed for %s: %v", knowledgeID, err)
|
||||
s.tracker().FailSpan(ctx, extractSpan, "EXTRACT_FAILED", err.Error(), err)
|
||||
s.tracker().FailSpan(ctx, wikiSpan, "EXTRACT_FAILED", err.Error(), err)
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
s.tracker().EndSpan(ctx, extractSpan, types.JSONMap{
|
||||
"entities": len(extractedEntities),
|
||||
"concepts": len(extractedConcepts),
|
||||
"pass0_fallback": pass0Failed,
|
||||
"entities_preview": previewExtractedItems(extractedEntities, 8),
|
||||
"concepts_preview": previewExtractedItems(extractedConcepts, 8),
|
||||
})
|
||||
|
||||
// Build slug listing for Summary's wiki-link input.
|
||||
var summaryExtractedPages []string
|
||||
@@ -806,6 +890,21 @@ func (s *wikiIngestService) mapOneDocument(
|
||||
batchCount int
|
||||
)
|
||||
|
||||
// Both calls run in parallel goroutines under the same wikiSpan
|
||||
// parent — their subspans will visually overlap in the trace view,
|
||||
// which correctly reflects their wall-clock concurrency.
|
||||
summarySpan := s.tracker().BeginSubSpan(ctx, wikiSpan, "postprocess.wiki.summary", types.SpanKindSubSpan, types.JSONMap{
|
||||
"content_chars": utf8.RuneCountInString(content),
|
||||
"extracted_slugs": len(summaryExtractedPages),
|
||||
})
|
||||
var classifySpan *Span
|
||||
if !pass0Failed {
|
||||
classifySpan = s.tracker().BeginSubSpan(ctx, wikiSpan, "postprocess.wiki.classify", types.SpanKindSubSpan, types.JSONMap{
|
||||
"chunks": len(chunks),
|
||||
"candidates": len(extractedEntities) + len(extractedConcepts),
|
||||
})
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
@@ -815,6 +914,16 @@ func (s *wikiIngestService) mapOneDocument(
|
||||
"Language": lang,
|
||||
"ExtractedSlugs": slugListing,
|
||||
})
|
||||
if summaryErr != nil {
|
||||
s.tracker().FailSpan(ctx, summarySpan, "SUMMARY_FAILED", summaryErr.Error(), summaryErr)
|
||||
} else {
|
||||
sumLine, sumBody := splitSummaryLine(summaryContent)
|
||||
s.tracker().EndSpan(ctx, summarySpan, types.JSONMap{
|
||||
"chars": utf8.RuneCountInString(summaryContent),
|
||||
"summary_line": previewText(sumLine, 160),
|
||||
"body_preview": previewText(sumBody, 320),
|
||||
})
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
@@ -827,6 +936,13 @@ func (s *wikiIngestService) mapOneDocument(
|
||||
}
|
||||
candidatesXML := renderCandidateSlugsXML(extractedEntities, extractedConcepts)
|
||||
citations, newSlugs, batchCount = s.classifyChunkCitations(ctx, chatModel, candidatesXML, chunks, lang)
|
||||
s.tracker().EndSpan(ctx, classifySpan, types.JSONMap{
|
||||
"cited_slugs": len(citations),
|
||||
"new_slugs": len(newSlugs),
|
||||
"batches": batchCount,
|
||||
"top_cited": topCitedSlugs(citations, 8),
|
||||
"new_slugs_sample": previewNewSlugs(newSlugs, 8),
|
||||
})
|
||||
}()
|
||||
wg.Wait()
|
||||
|
||||
@@ -894,6 +1010,7 @@ func (s *wikiIngestService) mapOneDocument(
|
||||
// The internal retries in generateWithTemplate already exhaust
|
||||
// the LLM's own transient-error budget before we give up here.
|
||||
logger.Errorf(ctx, "wiki ingest: generate summary failed for %s, will requeue: %v", knowledgeID, summaryErr)
|
||||
s.tracker().FailSpan(ctx, wikiSpan, "SUMMARY_FAILED", summaryErr.Error(), summaryErr)
|
||||
return nil, nil, fmt.Errorf("generate summary: %w", summaryErr)
|
||||
}
|
||||
sumLine, sumBody := splitSummaryLine(summaryContent)
|
||||
@@ -1028,11 +1145,37 @@ func (s *wikiIngestService) mapOneDocument(
|
||||
time.Since(docStartedAt).Round(time.Millisecond),
|
||||
)
|
||||
|
||||
// Map-phase metrics get attached to the postprocess.wiki span's
|
||||
// output, but we do NOT EndSpan here — the batch driver keeps the
|
||||
// span open through reduce + index rebuild + cross-link injection
|
||||
// + page publish, then closes it once this doc's pages have all
|
||||
// been written. That way the span's duration reflects the full
|
||||
// "wiki processing for this knowledge" time the user sees in the
|
||||
// trace viewer, not just the LLM extraction slice.
|
||||
mapStats := types.JSONMap{
|
||||
"doc_title": previewText(docTitle, 120),
|
||||
"chunks": len(chunks),
|
||||
"candidate_slugs": len(slugItems),
|
||||
"cited_chunks": len(citedChunkSet),
|
||||
"uncited_slugs": uncited,
|
||||
"new_slugs": len(newSlugs),
|
||||
"updates": len(updates),
|
||||
"reparse_slugs": reparseOverlap,
|
||||
"stale_slugs": staleCount,
|
||||
"extracted_pages": len(extractedPages),
|
||||
"summary_chars": utf8.RuneCountInString(docSummary),
|
||||
"pass0_fallback": pass0Failed,
|
||||
"classify_batches": batchCount,
|
||||
"summary_preview": previewText(docSummaryLine, 160),
|
||||
}
|
||||
|
||||
return &docIngestResult{
|
||||
KnowledgeID: knowledgeID,
|
||||
DocTitle: docTitle,
|
||||
Summary: docSummaryLine,
|
||||
Pages: extractedPages,
|
||||
MapStats: mapStats,
|
||||
WikiSpan: wikiSpan,
|
||||
}, updates, nil
|
||||
}
|
||||
|
||||
@@ -1121,6 +1264,7 @@ func (s *wikiIngestService) reduceSlugUpdates(
|
||||
updates []SlugUpdate,
|
||||
tenantID uint64,
|
||||
batchCtx *WikiBatchContext,
|
||||
kidToWikiSpan map[string]*Span,
|
||||
) (changed bool, affectedType string, additionFailed bool, err error) {
|
||||
// Final safety net for the ingest/delete race: between Map (which already
|
||||
// checks isKnowledgeGone) and Reduce there is a long LLM call where the
|
||||
@@ -1133,7 +1277,75 @@ func (s *wikiIngestService) reduceSlugUpdates(
|
||||
return false, "", false, nil
|
||||
}
|
||||
|
||||
// Per-slug page span attribution: a single slug can receive
|
||||
// contributions from multiple docs in the same batch (entity /
|
||||
// concept pages aggregate across sources). We attach the
|
||||
// postprocess.wiki.page[slug] subspan under whichever
|
||||
// contributing doc's wikiSpan is encountered first in the updates
|
||||
// list — span tree topology only allows one parent. Every
|
||||
// contributing knowledge id is recorded in the span's `contributors`
|
||||
// output so users can still see the full attribution. Pages whose
|
||||
// only contributors had no wikiSpan (e.g. their parse attempt
|
||||
// already closed and was archived) simply get a nil pageSpan,
|
||||
// which the tracker helpers no-op on.
|
||||
var (
|
||||
pageSpan *Span
|
||||
contributors []string
|
||||
)
|
||||
{
|
||||
seen := make(map[string]bool, len(updates))
|
||||
for _, u := range updates {
|
||||
kid := u.KnowledgeID
|
||||
if kid == "" || seen[kid] {
|
||||
continue
|
||||
}
|
||||
seen[kid] = true
|
||||
contributors = append(contributors, kid)
|
||||
if pageSpan == nil {
|
||||
if sp, ok := kidToWikiSpan[kid]; ok && sp != nil {
|
||||
pageSpan = s.tracker().BeginSubSpan(ctx, sp, fmt.Sprintf("postprocess.wiki.page[%s]", slug), types.SpanKindSubSpan, types.JSONMap{
|
||||
"slug": slug,
|
||||
"updates": len(updates),
|
||||
"contributors": contributors,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
var page *types.WikiPage
|
||||
// Deferred output captures `&page` so it observes the post-merge
|
||||
// state (title, page type, content snippet) at function return —
|
||||
// that's what's actually useful in the trace viewer, not the
|
||||
// stale pre-reduce shell that exists when the defer is registered.
|
||||
defer func() {
|
||||
if pageSpan == nil {
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
s.tracker().FailSpan(ctx, pageSpan, "REDUCE_FAILED", err.Error(), err)
|
||||
return
|
||||
}
|
||||
if !changed {
|
||||
s.tracker().SkipSpan(ctx, pageSpan, "no_change")
|
||||
return
|
||||
}
|
||||
out := types.JSONMap{
|
||||
"affected_type": affectedType,
|
||||
"addition_failed": additionFailed,
|
||||
"contributors": contributors,
|
||||
}
|
||||
if page != nil {
|
||||
out["page_title"] = previewText(page.Title, 160)
|
||||
out["page_type"] = string(page.PageType)
|
||||
out["page_summary"] = previewText(page.Summary, 200)
|
||||
out["content_preview"] = previewText(page.Content, 320)
|
||||
out["source_refs"] = len(page.SourceRefs)
|
||||
out["chunk_refs"] = len(page.ChunkRefs)
|
||||
out["aliases"] = []string(page.Aliases)
|
||||
}
|
||||
s.tracker().EndSpan(ctx, pageSpan, out)
|
||||
}()
|
||||
|
||||
page, err = s.wikiService.GetPageBySlug(ctx, kbID, slug)
|
||||
exists := (err == nil && page != nil)
|
||||
|
||||
|
||||
@@ -718,11 +718,40 @@ func buildSpanTree(knowledgeID string, attempt int, rows []types.KnowledgeProces
|
||||
root = nodes[rootRow.SpanID]
|
||||
}
|
||||
|
||||
// Link real children to their parents. Walk `rows` (not the
|
||||
// `nodes` map!) so the append order matches the repo's stable
|
||||
// `ORDER BY id ASC`. Iterating the map directly would give
|
||||
// callers a different child ordering on every request — Go map
|
||||
// iteration is intentionally randomised — and the UI would
|
||||
// flicker subspans into a different order on each refresh.
|
||||
for i := range rows {
|
||||
r := &rows[i]
|
||||
n := nodes[r.SpanID]
|
||||
if n == nil || n == root {
|
||||
continue
|
||||
}
|
||||
if r.ParentSpanID == "" {
|
||||
// Real top-level row with no parent and not the root
|
||||
// itself — attach to root so it doesn't dangle.
|
||||
root.Children = append(root.Children, n)
|
||||
continue
|
||||
}
|
||||
parent, ok := nodes[r.ParentSpanID]
|
||||
if !ok {
|
||||
// Unknown parent (orphan); attach to root.
|
||||
root.Children = append(root.Children, n)
|
||||
continue
|
||||
}
|
||||
parent.Children = append(parent.Children, n)
|
||||
}
|
||||
|
||||
// Synthesize missing stage rows as children of root so the timeline
|
||||
// always shows 5 segments. Status mirrors the synthesized root —
|
||||
// pending while the pipeline is still running, done/failed for
|
||||
// historical knowledge whose terminal state we know but whose
|
||||
// per-stage timing was never recorded.
|
||||
// per-stage timing was never recorded. Appended in AllStages order
|
||||
// so the canonical stage layout is deterministic regardless of
|
||||
// which rows are missing.
|
||||
for _, name := range types.AllStages {
|
||||
if _, ok := stageRowByName[name]; ok {
|
||||
continue
|
||||
@@ -736,31 +765,7 @@ func buildSpanTree(knowledgeID string, attempt int, rows []types.KnowledgeProces
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
}
|
||||
nodes[name+"_placeholder_"+knowledgeID] = &types.SpanTreeNode{KnowledgeProcessingSpan: placeholder}
|
||||
// Attach placeholder under root by giving it the root's
|
||||
// span_id as parent — done after the link pass below.
|
||||
}
|
||||
|
||||
// Link real children to their parents. Placeholders we just
|
||||
// added have no real parent; we attach them to root in a
|
||||
// second pass after the linking is done.
|
||||
for _, n := range nodes {
|
||||
if n == root {
|
||||
continue
|
||||
}
|
||||
if n.ParentSpanID == "" {
|
||||
// Real top-level row with no parent and not the root
|
||||
// itself — attach to root so it doesn't dangle.
|
||||
root.Children = append(root.Children, n)
|
||||
continue
|
||||
}
|
||||
parent, ok := nodes[n.ParentSpanID]
|
||||
if !ok {
|
||||
// Unknown parent (orphan); attach to root.
|
||||
root.Children = append(root.Children, n)
|
||||
continue
|
||||
}
|
||||
parent.Children = append(parent.Children, n)
|
||||
root.Children = append(root.Children, &types.SpanTreeNode{KnowledgeProcessingSpan: placeholder})
|
||||
}
|
||||
|
||||
return root, currentStage, lastFailure
|
||||
|
||||
Reference in New Issue
Block a user