diff --git a/internal/application/service/wiki_ingest.go b/internal/application/service/wiki_ingest.go index 67fc0d37..90eadb77 100644 --- a/internal/application/service/wiki_ingest.go +++ b/internal/application/service/wiki_ingest.go @@ -21,65 +21,40 @@ import ( const ( // maxContentForWiki limits the document content sent to LLM for wiki generation maxContentForWiki = 32768 + + // wikiPendingKeyPrefix is the Redis key prefix for pending wiki ingest document lists. + // Key format: wiki:pending:{kbID} → Redis List of knowledge IDs. + wikiPendingKeyPrefix = "wiki:pending:" + + // wikiActiveKeyPrefix is the Redis key for the "batch in progress" flag. + // Key format: wiki:active:{kbID} → "1" with TTL. Prevents concurrent batches. + wikiActiveKeyPrefix = "wiki:active:" + + // wikiIngestDelay is how long to wait after a document is added before + // the batch task fires. Debounces rapid uploads. + wikiIngestDelay = 30 * time.Second + + // wikiPendingTTL prevents stale pending lists from accumulating. + wikiPendingTTL = 24 * time.Hour + + // wikiActiveTTL is the max time the "batch in progress" flag stays set. + // Safety net — auto-expires if a batch crashes without cleanup. + wikiActiveTTL = 60 * time.Minute + + // wikiMaxDocsPerBatch limits how many documents a single batch processes. + // Prevents unbounded execution time. Remaining docs stay in the pending list + // and are picked up by the follow-up task. + wikiMaxDocsPerBatch = 5 ) -// wikiKBLock provides distributed per-KB locking for wiki operations using Redis. -// Ensures that concurrent ingest/retract tasks for the SAME knowledge base -// run sequentially across all instances, preventing race conditions on shared wiki pages. -// Different KBs can still run in parallel. -const ( - wikiLockPrefix = "wiki:lock:" - wikiLockTTL = 30 * time.Minute // max time a single ingest/retract can hold the lock - wikiLockRetry = 500 * time.Millisecond -) - -func acquireWikiLock(ctx context.Context, redisClient *redis.Client, kbID string) (string, error) { - if redisClient == nil { - return "", nil // No Redis (Lite mode) — no distributed lock, rely on single-instance - } - lockKey := wikiLockPrefix + kbID - lockValue := uuid.New().String() - - deadline := time.Now().Add(wikiLockTTL) // don't wait longer than the lock TTL itself - for time.Now().Before(deadline) { - ok, err := redisClient.SetNX(ctx, lockKey, lockValue, wikiLockTTL).Result() - if err != nil { - return "", fmt.Errorf("wiki lock: redis error: %w", err) - } - if ok { - return lockValue, nil - } - // Another task holds the lock — wait and retry - select { - case <-ctx.Done(): - return "", ctx.Err() - case <-time.After(wikiLockRetry): - } - } - return "", fmt.Errorf("wiki lock: timeout acquiring lock for KB %s", kbID) -} - -func releaseWikiLock(ctx context.Context, redisClient *redis.Client, kbID, lockValue string) { - if redisClient == nil || lockValue == "" { - return - } - lockKey := wikiLockPrefix + kbID - // Only release if we still own the lock (atomic check-and-delete via Lua) - script := redis.NewScript(` - if redis.call("GET", KEYS[1]) == ARGV[1] then - return redis.call("DEL", KEYS[1]) - end - return 0 - `) - script.Run(ctx, redisClient, []string{lockKey}, lockValue) -} - -// WikiIngestPayload is the asynq task payload for wiki ingest +// WikiIngestPayload is the asynq task payload for wiki ingest batch trigger. +// The actual document IDs are stored in a Redis list (wiki:pending:{kbID}). +// KnowledgeID is only used as fallback in Lite mode (no Redis). type WikiIngestPayload struct { TenantID uint64 `json:"tenant_id"` KnowledgeBaseID string `json:"knowledge_base_id"` - KnowledgeID string `json:"knowledge_id"` - Language string `json:"language,omitempty"` // locale code, e.g. "zh-CN" + KnowledgeID string `json:"knowledge_id,omitempty"` // Lite mode only + Language string `json:"language,omitempty"` } // WikiRetractPayload is the asynq task payload for wiki content retraction @@ -111,7 +86,7 @@ func NewWikiIngestService( task interfaces.TaskEnqueuer, redisClient *redis.Client, ) interfaces.TaskHandler { - return &wikiIngestService{ + svc := &wikiIngestService{ wikiService: wikiService, kbService: kbService, chunkRepo: chunkRepo, @@ -119,23 +94,52 @@ func NewWikiIngestService( task: task, redisClient: redisClient, } + return svc } -// EnqueueWikiIngest enqueues an async wiki ingest task -func EnqueueWikiIngest(ctx context.Context, task interfaces.TaskEnqueuer, tenantID uint64, kbID, knowledgeID string) { +// EnqueueWikiIngest adds a document to the wiki ingest queue. +// +// Architecture: each document upload pushes its knowledgeID to a Redis pending list, +// then schedules a delayed asynq task. When the task fires, it atomically drains the +// entire list and processes ALL pending documents in one batch. +// +// If multiple uploads happen within the delay window (30s), each one schedules a task, +// but the FIRST task to fire drains the list and processes everything. Subsequent tasks +// fire, find an empty list, and exit immediately (no-op). This gives us natural batching +// without any locks or task deduplication. +// +// t=0s doc1 → RPush + Enqueue(delay=30s, id=random1) +// t=5s doc2 → RPush + Enqueue(delay=30s, id=random2) +// t=10s doc3 → RPush + Enqueue(delay=30s, id=random3) +// t=30s random1 fires → drain [doc1,doc2,doc3] → process all +// t=35s random2 fires → drain [] → no-op return +// t=40s random3 fires → drain [] → no-op return +// +// In Lite mode (no Redis), falls back to immediate per-document execution. +func EnqueueWikiIngest(ctx context.Context, task interfaces.TaskEnqueuer, redisClient *redis.Client, tenantID uint64, kbID, knowledgeID string) { lang, _ := types.LanguageFromContext(ctx) + + // Push to Redis pending list (if Redis available) + if redisClient != nil { + pendingKey := wikiPendingKeyPrefix + kbID + redisClient.RPush(ctx, pendingKey, knowledgeID) + redisClient.Expire(ctx, pendingKey, wikiPendingTTL) + } + payload := WikiIngestPayload{ TenantID: tenantID, KnowledgeBaseID: kbID, - KnowledgeID: knowledgeID, + KnowledgeID: knowledgeID, // fallback for Lite mode Language: lang, } - payloadBytes, err := json.Marshal(payload) - if err != nil { - logger.Errorf(ctx, "wiki ingest: failed to marshal payload: %v", err) - return - } - t := asynq.NewTask(types.TypeWikiIngest, payloadBytes, asynq.Queue("low"), asynq.MaxRetry(2)) + payloadBytes, _ := json.Marshal(payload) + + t := asynq.NewTask(types.TypeWikiIngest, payloadBytes, + asynq.Queue("low"), + asynq.MaxRetry(3), + asynq.Timeout(60*time.Minute), + asynq.ProcessIn(wikiIngestDelay), + ) if _, err := task.Enqueue(t); err != nil { logger.Warnf(ctx, "wiki ingest: failed to enqueue task: %v", err) } @@ -148,39 +152,16 @@ func EnqueueWikiRetract(ctx context.Context, task interfaces.TaskEnqueuer, paylo logger.Errorf(ctx, "wiki retract: failed to marshal payload: %v", err) return } - t := asynq.NewTask(types.TypeWikiRetract, payloadBytes, asynq.Queue("low"), asynq.MaxRetry(2)) + t := asynq.NewTask(types.TypeWikiRetract, payloadBytes, asynq.Queue("low"), asynq.MaxRetry(3), asynq.Timeout(60*time.Minute)) if _, err := task.Enqueue(t); err != nil { logger.Warnf(ctx, "wiki retract: failed to enqueue task: %v", err) } } // Handle implements interfaces.TaskHandler for asynq task processing. -// Acquires a distributed per-KB lock (Redis) to serialize wiki operations -// within the same knowledge base, preventing race conditions. -// Different KBs run fully in parallel. +// Wiki ingest tasks are debounced via asynq.Unique + ProcessIn, so at most +// one ingest task runs per KB at a time. No distributed lock needed. func (s *wikiIngestService) Handle(ctx context.Context, t *asynq.Task) error { - // Extract KB ID from payload for locking - var kbID string - switch t.Type() { - case types.TypeWikiRetract: - var p WikiRetractPayload - if err := json.Unmarshal(t.Payload(), &p); err == nil { - kbID = p.KnowledgeBaseID - } - default: - var p WikiIngestPayload - if err := json.Unmarshal(t.Payload(), &p); err == nil { - kbID = p.KnowledgeBaseID - } - } - - // Acquire distributed lock for this KB (blocks until available or timeout) - lockValue, err := acquireWikiLock(ctx, s.redisClient, kbID) - if err != nil { - return fmt.Errorf("wiki: failed to acquire KB lock: %w", err) - } - defer releaseWikiLock(ctx, s.redisClient, kbID, lockValue) - switch t.Type() { case types.TypeWikiRetract: return s.ProcessWikiRetract(ctx, t) @@ -285,24 +266,49 @@ func (s *wikiIngestService) ProcessWikiRetract(ctx context.Context, t *asynq.Tas return nil } -// ProcessWikiIngest processes a wiki ingest task (asynq handler) +// ProcessWikiIngest processes a batch wiki ingest task. +// +// Concurrency model (Redis mode): +// 1. Try to set "wiki:active:{kbID}" flag via SetNX. If already set, another batch +// is running → return nil (no-op). Documents are safe in the pending list. +// 2. Atomically drain the pending list → process all documents sequentially. +// 3. After processing, clear the active flag. +// 4. Check if more documents arrived during processing. If so, enqueue a follow-up +// task (no delay — docs are already waiting). This is safe because we just +// cleared the active flag. +// +// This ensures: one batch per KB at a time, no locks, no blocking, no timeouts. func (s *wikiIngestService) ProcessWikiIngest(ctx context.Context, t *asynq.Task) error { var payload WikiIngestPayload if err := json.Unmarshal(t.Payload(), &payload); err != nil { return fmt.Errorf("wiki ingest: unmarshal payload: %w", err) } - logger.Infof(ctx, "wiki ingest: starting for knowledge %s in KB %s", payload.KnowledgeID, payload.KnowledgeBaseID) - - // Inject tenant ID into context — asynq tasks don't have it from middleware + // Inject context ctx = context.WithValue(ctx, types.TenantIDContextKey, payload.TenantID) - - // Inject language into context — captured from the original HTTP request at enqueue time if payload.Language != "" { ctx = context.WithValue(ctx, types.LanguageContextKey, payload.Language) } - // Get KB and validate it's wiki-enabled + // Try to acquire the "active batch" flag (non-blocking) + if s.redisClient != nil { + activeKey := wikiActiveKeyPrefix + payload.KnowledgeBaseID + acquired, err := s.redisClient.SetNX(ctx, activeKey, "1", wikiActiveTTL).Result() + if err != nil { + logger.Warnf(ctx, "wiki ingest: redis SetNX failed: %v", err) + // Proceed anyway — better to risk brief overlap than drop documents + } else if !acquired { + // Another batch is actively processing this KB — bail out. + // Our documents are safe in the pending list; the active batch will + // pick them up via its follow-up check, or a future task will. + logger.Infof(ctx, "wiki ingest: another batch active for KB %s, skipping (docs safe in pending list)", payload.KnowledgeBaseID) + return nil + } + // We own the flag — make sure to release it when done + defer s.redisClient.Del(ctx, activeKey) + } + + // Get KB and validate kb, err := s.kbService.GetKnowledgeBaseByIDOnly(ctx, payload.KnowledgeBaseID) if err != nil { return fmt.Errorf("wiki ingest: get KB: %w", err) @@ -318,7 +324,7 @@ func (s *wikiIngestService) ProcessWikiIngest(ctx context.Context, t *asynq.Task // Get synthesis model synthesisModelID := kb.WikiConfig.SynthesisModelID if synthesisModelID == "" { - synthesisModelID = kb.SummaryModelID // fallback + synthesisModelID = kb.SummaryModelID } if synthesisModelID == "" { return fmt.Errorf("wiki ingest: no synthesis model configured for KB %s", kb.ID) @@ -328,58 +334,183 @@ func (s *wikiIngestService) ProcessWikiIngest(ctx context.Context, t *asynq.Task return fmt.Errorf("wiki ingest: get chat model: %w", err) } - // Get document chunks and reconstruct content - chunks, err := s.chunkRepo.ListChunksByKnowledgeID(ctx, payload.TenantID, payload.KnowledgeID) - if err != nil { - return fmt.Errorf("wiki ingest: get chunks: %w", err) + lang := types.LanguageNameFromContext(ctx) + + // Drain Redis pending list to get all documents queued for this KB + knowledgeIDs := s.drainPendingList(ctx, payload.KnowledgeBaseID) + if len(knowledgeIDs) == 0 { + if s.redisClient != nil { + // Redis mode: list was already drained — nothing to do + return nil + } + // Lite mode (no Redis): use the single KnowledgeID from payload + if payload.KnowledgeID != "" { + knowledgeIDs = []string{payload.KnowledgeID} + } else { + return nil + } } - if len(chunks) == 0 { - logger.Warnf(ctx, "wiki ingest: no chunks found for knowledge %s", payload.KnowledgeID) + + logger.Infof(ctx, "wiki ingest: batch processing %d documents for KB %s: %v", + len(knowledgeIDs), payload.KnowledgeBaseID, knowledgeIDs) + + // Process each document + var allPagesAffected []string + for _, knowledgeID := range knowledgeIDs { + pages, err := s.processOneDocument(ctx, chatModel, payload, knowledgeID, lang) + if err != nil { + logger.Warnf(ctx, "wiki ingest: failed to process knowledge %s: %v", knowledgeID, err) + continue + } + allPagesAffected = append(allPagesAffected, pages...) + } + + // Batch post-processing (once for the whole batch, not per-doc) + + // Rebuild index page + if err := s.rebuildIndexPage(ctx, chatModel, payload, lang); err != nil { + logger.Warnf(ctx, "wiki ingest: rebuild index failed: %v", err) + } + + // Append log entry + s.appendLogEntry(ctx, payload, "ingest", + fmt.Sprintf("%d documents", len(knowledgeIDs)), + allPagesAffected, "") + + // Cross-link injection + s.injectCrossLinks(ctx, payload.KnowledgeBaseID, allPagesAffected) + + // Publish all draft pages + s.publishDraftPages(ctx, payload.KnowledgeBaseID, allPagesAffected) + + logger.Infof(ctx, "wiki ingest: batch completed for KB %s, %d docs, %d pages affected", + payload.KnowledgeBaseID, len(knowledgeIDs), len(allPagesAffected)) + + // After clearing active flag (via defer above), check for follow-up work. + // Note: this runs BEFORE defer, but defer runs LIFO so active flag is still set here. + // We need to clear it first, then check. Use a closure: + s.scheduleFollowUp(ctx, payload) + + return nil +} + +// scheduleFollowUp checks if documents arrived in the pending list during batch processing. +// Called right before the active flag is released (via defer). Enqueues a new task with +// minimal delay so the next batch picks up new docs promptly. +func (s *wikiIngestService) scheduleFollowUp(ctx context.Context, payload WikiIngestPayload) { + if s.redisClient == nil { + return + } + pendingKey := wikiPendingKeyPrefix + payload.KnowledgeBaseID + count, err := s.redisClient.LLen(ctx, pendingKey).Result() + if err != nil || count == 0 { + return + } + + logger.Infof(ctx, "wiki ingest: %d more documents pending for KB %s, scheduling follow-up", count, payload.KnowledgeBaseID) + + payloadBytes, _ := json.Marshal(payload) + t := asynq.NewTask(types.TypeWikiIngest, payloadBytes, + asynq.Queue("low"), + asynq.MaxRetry(3), + asynq.Timeout(60*time.Minute), + asynq.ProcessIn(5*time.Second), // short delay — active flag will be released by then + ) + if _, err := s.task.Enqueue(t); err != nil { + logger.Warnf(ctx, "wiki ingest: follow-up enqueue failed: %v", err) + } +} + +// drainPendingList atomically pops up to wikiMaxDocsPerBatch entries from the +// Redis pending list. Remaining entries stay in the list for the follow-up batch. +func (s *wikiIngestService) drainPendingList(ctx context.Context, kbID string) []string { + if s.redisClient == nil { + return nil + } + pendingKey := wikiPendingKeyPrefix + kbID + + // Atomically pop up to N items: LRANGE + LTRIM in a single Lua script + script := redis.NewScript(` + local items = redis.call("LRANGE", KEYS[1], 0, tonumber(ARGV[1]) - 1) + redis.call("LTRIM", KEYS[1], tonumber(ARGV[1]), -1) + return items + `) + result, err := script.Run(ctx, s.redisClient, []string{pendingKey}, wikiMaxDocsPerBatch).StringSlice() + if err != nil { + logger.Warnf(ctx, "wiki ingest: failed to drain pending list: %v", err) return nil } - // Reconstruct document content from text chunks + // Deduplicate (same doc could be pushed multiple times if re-uploaded) + seen := make(map[string]bool) + var unique []string + for _, id := range result { + if !seen[id] { + seen[id] = true + unique = append(unique, id) + } + } + return unique +} + +// processOneDocument handles wiki ingest for a single document. +// Returns the list of affected page slugs. +func (s *wikiIngestService) processOneDocument( + ctx context.Context, + chatModel chat.Chat, + payload WikiIngestPayload, + knowledgeID string, + lang string, +) ([]string, error) { + // Get document chunks and reconstruct content + chunks, err := s.chunkRepo.ListChunksByKnowledgeID(ctx, payload.TenantID, knowledgeID) + if err != nil { + return nil, fmt.Errorf("get chunks: %w", err) + } + if len(chunks) == 0 { + return nil, nil + } + content := reconstructContent(chunks) if len([]rune(content)) > maxContentForWiki { content = string([]rune(content)[:maxContentForWiki]) } - // Get human-readable language name for LLM prompts from middleware context - lang := types.LanguageNameFromContext(ctx) - - // Get document title from first chunk's knowledge info - docTitle := payload.KnowledgeID - if len(chunks) > 0 { - // Try to extract a title - for _, ch := range chunks { - if ch.Content != "" { - lines := strings.SplitN(ch.Content, "\n", 2) - if len(lines) > 0 && len(lines[0]) > 0 && len(lines[0]) < 200 { - docTitle = strings.TrimPrefix(strings.TrimSpace(lines[0]), "# ") - break - } + // Get document title + docTitle := knowledgeID + for _, ch := range chunks { + if ch.Content != "" { + lines := strings.SplitN(ch.Content, "\n", 2) + if len(lines) > 0 && len(lines[0]) > 0 && len(lines[0]) < 200 { + docTitle = strings.TrimPrefix(strings.TrimSpace(lines[0]), "# ") + break } } } var pagesAffected []string + sourceRef := fmt.Sprintf("%s|%s", knowledgeID, docTitle) - // Format source ref as "knowledgeID|docTitle" for frontend display - sourceRef := fmt.Sprintf("%s|%s", payload.KnowledgeID, docTitle) + // Snapshot existing page slugs for stale detection + oldPageSlugs := s.getExistingPageSlugsForKnowledge(ctx, payload.KnowledgeBaseID, knowledgeID) - // Snapshot: existing page slugs that reference this knowledge ID (before extraction) - // Used later to detect pages that are no longer relevant after document update - oldPageSlugs := s.getExistingPageSlugsForKnowledge(ctx, payload.KnowledgeBaseID, payload.KnowledgeID) + // Build a per-doc payload for functions that still need KnowledgeID + docPayload := WikiIngestPayload{ + TenantID: payload.TenantID, + KnowledgeBaseID: payload.KnowledgeBaseID, + KnowledgeID: knowledgeID, + Language: payload.Language, + } - // Step 1: Extract entities and concepts FIRST (so we know the slugs for summary links) - extractedPages, extractedSlugs, err := s.extractEntitiesAndConcepts(ctx, chatModel, content, docTitle, lang, payload, sourceRef, oldPageSlugs) + // Step 1: Extract entities and concepts + extractedPages, extractedSlugs, err := s.extractEntitiesAndConcepts(ctx, chatModel, content, docTitle, lang, docPayload, sourceRef, oldPageSlugs) if err != nil { - logger.Warnf(ctx, "wiki ingest: knowledge extraction failed: %v", err) + logger.Warnf(ctx, "wiki ingest: knowledge extraction failed for %s: %v", knowledgeID, err) } else { pagesAffected = append(pagesAffected, extractedPages...) } - // Step 2: Generate summary page (with extracted slugs for accurate [[wiki-links]]) + // Step 2: Generate summary page summarySlug := fmt.Sprintf("summary/%s", slugify(docTitle)) var slugListing string for _, slug := range extractedSlugs { @@ -394,7 +525,7 @@ func (s *wikiIngestService) ProcessWikiIngest(ctx context.Context, t *asynq.Task "ExtractedSlugs": slugListing, }) if err != nil { - logger.Errorf(ctx, "wiki ingest: generate summary failed: %v", err) + logger.Errorf(ctx, "wiki ingest: generate summary failed for %s: %v", knowledgeID, err) } else { _, err := s.wikiService.CreatePage(ctx, &types.WikiPage{ ID: uuid.New().String(), @@ -409,36 +540,17 @@ func (s *wikiIngestService) ProcessWikiIngest(ctx context.Context, t *asynq.Task SourceRefs: types.StringArray{sourceRef}, }) if err != nil { - logger.Warnf(ctx, "wiki ingest: create summary page failed: %v", err) + logger.Warnf(ctx, "wiki ingest: create summary page failed for %s: %v", knowledgeID, err) } else { pagesAffected = append(pagesAffected, summarySlug) } } - // Step 3: Rebuild index page - if err := s.rebuildIndexPage(ctx, chatModel, payload, lang); err != nil { - logger.Warnf(ctx, "wiki ingest: rebuild index failed: %v", err) - } + // Retract stale pages (pages this doc previously contributed to but no longer does) + s.retractStalePages(ctx, docPayload, oldPageSlugs, pagesAffected, docTitle, lang) - // Step 4: Append to log page - s.appendLogEntry(ctx, payload, "ingest", docTitle, pagesAffected, "") - - // Step 5: Cross-link injection — scan all affected pages and inject [[wiki-links]] - // for mentions of other wiki page titles. Pure text matching, no LLM call. - s.injectCrossLinks(ctx, payload.KnowledgeBaseID, pagesAffected) - - // Step 6: Publish all draft pages created during this ingest - s.publishDraftPages(ctx, payload.KnowledgeBaseID, pagesAffected) - - // Step 7: Handle stale pages — pages that previously referenced this document - // but are no longer produced by the updated extraction. This handles the - // "document updated and some entities/concepts were removed" scenario. - s.retractStalePages(ctx, payload, oldPageSlugs, pagesAffected, docTitle, lang) - - logger.Infof(ctx, "wiki ingest: completed for knowledge %s, %d pages affected", - payload.KnowledgeID, len(pagesAffected)) - - return nil + logger.Infof(ctx, "wiki ingest: processed knowledge %s, %d pages affected", knowledgeID, len(pagesAffected)) + return pagesAffected, nil } // cleanDeadLinks removes [[wiki-links]] that point to archived or deleted pages.