refactor: Update EnqueueWikiIngest to utilize Redis for pending document management

- Modified the EnqueueWikiIngest function to accept a Redis client, enabling the use of a Redis list for managing pending wiki ingest documents.
- Introduced a debouncing mechanism to optimize task processing, ensuring that multiple rapid uploads are handled efficiently without unnecessary locks.
- Enhanced the overall architecture for better performance and reliability in handling wiki ingest tasks.

These changes streamline the ingestion process and improve the handling of concurrent document uploads.
This commit is contained in:
wizardchen
2026-04-07 22:46:43 +08:00
parent 3edd7548f4
commit f00b2a176b

View File

@@ -21,65 +21,40 @@ import (
const (
// maxContentForWiki caps the document content sent to the LLM for wiki generation.
// The limit is applied in runes (not bytes) when the content is truncated.
maxContentForWiki = 32768
// wikiPendingKeyPrefix is the Redis key prefix for pending wiki ingest document lists.
// Key format: wiki:pending:{kbID} → Redis List of knowledge IDs.
// Entries are appended with RPush at enqueue time and popped in batches by the ingest task.
wikiPendingKeyPrefix = "wiki:pending:"
// wikiActiveKeyPrefix is the Redis key prefix for the "batch in progress" flag.
// Key format: wiki:active:{kbID} → "1" with TTL, set via SetNX. Prevents concurrent batches
// for the same knowledge base.
wikiActiveKeyPrefix = "wiki:active:"
// wikiIngestDelay is how long to wait after a document is added before
// the batch task fires (passed to asynq.ProcessIn). Debounces rapid uploads.
wikiIngestDelay = 30 * time.Second
// wikiPendingTTL prevents stale pending lists from accumulating.
// The expiry is refreshed every time a document is pushed to the list.
wikiPendingTTL = 24 * time.Hour
// wikiActiveTTL is the max time the "batch in progress" flag stays set.
// Safety net — auto-expires if a batch crashes without cleanup.
wikiActiveTTL = 60 * time.Minute
// wikiMaxDocsPerBatch limits how many documents a single batch processes.
// Prevents unbounded execution time. Remaining docs stay in the pending list
// and are picked up by the follow-up task.
wikiMaxDocsPerBatch = 5
)
// Constants for the distributed per-KB wiki lock, which is implemented with Redis.
// The lock ensures that concurrent ingest/retract tasks for the SAME knowledge base
// run sequentially across all instances, preventing race conditions on shared wiki pages.
// Different KBs can still run in parallel because each KB gets its own lock key.
const (
// wikiLockPrefix is the Redis key prefix; the full key is wiki:lock:{kbID}.
wikiLockPrefix = "wiki:lock:"
// wikiLockTTL is both the expiry set on the lock key and the longest time a
// waiter keeps retrying before giving up.
wikiLockTTL = 30 * time.Minute // max time a single ingest/retract can hold the lock
// wikiLockRetry is the pause between two acquisition attempts.
wikiLockRetry = 500 * time.Millisecond
)
// acquireWikiLock blocks until the per-KB wiki lock is obtained, the context is
// cancelled, or wikiLockTTL has elapsed since the first attempt.
//
// On success it returns the random token stored under the lock key; the same
// token must be handed to releaseWikiLock. When redisClient is nil (Lite mode)
// no lock is taken and an empty token with a nil error is returned.
//
// Errors: a wrapped Redis error if SetNX fails, ctx.Err() on cancellation, or a
// timeout error when the lock could not be obtained in time.
func acquireWikiLock(ctx context.Context, redisClient *redis.Client, kbID string) (string, error) {
	// Lite mode: single instance, nothing to coordinate.
	if redisClient == nil {
		return "", nil
	}

	key := wikiLockPrefix + kbID
	token := uuid.New().String()
	// Waiting longer than the lock's own TTL would be pointless.
	giveUpAt := time.Now().Add(wikiLockTTL)

	for {
		if !time.Now().Before(giveUpAt) {
			return "", fmt.Errorf("wiki lock: timeout acquiring lock for KB %s", kbID)
		}

		acquired, err := redisClient.SetNX(ctx, key, token, wikiLockTTL).Result()
		switch {
		case err != nil:
			return "", fmt.Errorf("wiki lock: redis error: %w", err)
		case acquired:
			return token, nil
		}

		// Somebody else owns the lock: pause, but stay responsive to cancellation.
		select {
		case <-time.After(wikiLockRetry):
		case <-ctx.Done():
			return "", ctx.Err()
		}
	}
}
// releaseWikiLock drops the per-KB wiki lock, but only if it is still owned by
// the caller, i.e. the value stored under the lock key equals lockValue.
//
// It is a no-op when redisClient is nil or lockValue is empty (the token that
// acquireWikiLock returns in Lite mode). The release is best-effort: the result
// of the script execution is not inspected.
func releaseWikiLock(ctx context.Context, redisClient *redis.Client, kbID, lockValue string) {
	if redisClient == nil {
		return
	}
	if lockValue == "" {
		return
	}

	// Compare-and-delete in one Lua script so that a lock which expired and was
	// re-acquired by another task is never deleted by us.
	unlock := redis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
end
return 0
`)
	keys := []string{wikiLockPrefix + kbID}
	unlock.Run(ctx, redisClient, keys, lockValue)
}
// WikiIngestPayload is the asynq task payload for wiki ingest
// WikiIngestPayload is the asynq task payload for wiki ingest batch trigger.
// The actual document IDs are stored in a Redis list (wiki:pending:{kbID}).
// KnowledgeID is only used as fallback in Lite mode (no Redis).
type WikiIngestPayload struct {
TenantID uint64 `json:"tenant_id"`
KnowledgeBaseID string `json:"knowledge_base_id"`
KnowledgeID string `json:"knowledge_id"`
Language string `json:"language,omitempty"` // locale code, e.g. "zh-CN"
KnowledgeID string `json:"knowledge_id,omitempty"` // Lite mode only
Language string `json:"language,omitempty"`
}
// WikiRetractPayload is the asynq task payload for wiki content retraction
@@ -111,7 +86,7 @@ func NewWikiIngestService(
task interfaces.TaskEnqueuer,
redisClient *redis.Client,
) interfaces.TaskHandler {
return &wikiIngestService{
svc := &wikiIngestService{
wikiService: wikiService,
kbService: kbService,
chunkRepo: chunkRepo,
@@ -119,23 +94,52 @@ func NewWikiIngestService(
task: task,
redisClient: redisClient,
}
return svc
}
// EnqueueWikiIngest enqueues an async wiki ingest task
func EnqueueWikiIngest(ctx context.Context, task interfaces.TaskEnqueuer, tenantID uint64, kbID, knowledgeID string) {
// EnqueueWikiIngest adds a document to the wiki ingest queue.
//
// Architecture: each document upload pushes its knowledgeID to a Redis pending list,
// then schedules a delayed asynq task. When the task fires, it atomically pops up to
// wikiMaxDocsPerBatch entries from the list and processes them in one batch; anything
// left over stays in the list for a follow-up task.
//
// If multiple uploads happen within the delay window (30s), each one schedules a task,
// but the FIRST task to fire pops the pending documents and processes them. Subsequent
// tasks fire, find an empty list (or an active batch for the same KB), and exit
// immediately (no-op). This gives us natural batching without blocking locks or task
// deduplication; the only coordination is the non-blocking wiki:active:{kbID} flag.
//
// t=0s doc1 → RPush + Enqueue(delay=30s, id=random1)
// t=5s doc2 → RPush + Enqueue(delay=30s, id=random2)
// t=10s doc3 → RPush + Enqueue(delay=30s, id=random3)
// t=30s random1 fires → drain [doc1,doc2,doc3] → process all
// t=35s random2 fires → drain [] → no-op return
// t=40s random3 fires → drain [] → no-op return
//
// In Lite mode (no Redis), falls back to immediate per-document execution.
func EnqueueWikiIngest(ctx context.Context, task interfaces.TaskEnqueuer, redisClient *redis.Client, tenantID uint64, kbID, knowledgeID string) {
lang, _ := types.LanguageFromContext(ctx)
// Push to Redis pending list (if Redis available)
if redisClient != nil {
pendingKey := wikiPendingKeyPrefix + kbID
redisClient.RPush(ctx, pendingKey, knowledgeID)
redisClient.Expire(ctx, pendingKey, wikiPendingTTL)
}
payload := WikiIngestPayload{
TenantID: tenantID,
KnowledgeBaseID: kbID,
KnowledgeID: knowledgeID,
KnowledgeID: knowledgeID, // fallback for Lite mode
Language: lang,
}
payloadBytes, err := json.Marshal(payload)
if err != nil {
logger.Errorf(ctx, "wiki ingest: failed to marshal payload: %v", err)
return
}
t := asynq.NewTask(types.TypeWikiIngest, payloadBytes, asynq.Queue("low"), asynq.MaxRetry(2))
payloadBytes, _ := json.Marshal(payload)
t := asynq.NewTask(types.TypeWikiIngest, payloadBytes,
asynq.Queue("low"),
asynq.MaxRetry(3),
asynq.Timeout(60*time.Minute),
asynq.ProcessIn(wikiIngestDelay),
)
if _, err := task.Enqueue(t); err != nil {
logger.Warnf(ctx, "wiki ingest: failed to enqueue task: %v", err)
}
@@ -148,39 +152,16 @@ func EnqueueWikiRetract(ctx context.Context, task interfaces.TaskEnqueuer, paylo
logger.Errorf(ctx, "wiki retract: failed to marshal payload: %v", err)
return
}
t := asynq.NewTask(types.TypeWikiRetract, payloadBytes, asynq.Queue("low"), asynq.MaxRetry(2))
t := asynq.NewTask(types.TypeWikiRetract, payloadBytes, asynq.Queue("low"), asynq.MaxRetry(3), asynq.Timeout(60*time.Minute))
if _, err := task.Enqueue(t); err != nil {
logger.Warnf(ctx, "wiki retract: failed to enqueue task: %v", err)
}
}
// Handle implements interfaces.TaskHandler for asynq task processing.
// Acquires a distributed per-KB lock (Redis) to serialize wiki operations
// within the same knowledge base, preventing race conditions.
// Different KBs run fully in parallel.
// Wiki ingest tasks are debounced via the Redis pending list + ProcessIn, and the
// non-blocking wiki:active:{kbID} flag keeps ingest to one batch per KB at a time.
// No blocking distributed lock is needed.
func (s *wikiIngestService) Handle(ctx context.Context, t *asynq.Task) error {
// Extract KB ID from payload for locking
var kbID string
switch t.Type() {
case types.TypeWikiRetract:
var p WikiRetractPayload
if err := json.Unmarshal(t.Payload(), &p); err == nil {
kbID = p.KnowledgeBaseID
}
default:
var p WikiIngestPayload
if err := json.Unmarshal(t.Payload(), &p); err == nil {
kbID = p.KnowledgeBaseID
}
}
// Acquire distributed lock for this KB (blocks until available or timeout)
lockValue, err := acquireWikiLock(ctx, s.redisClient, kbID)
if err != nil {
return fmt.Errorf("wiki: failed to acquire KB lock: %w", err)
}
defer releaseWikiLock(ctx, s.redisClient, kbID, lockValue)
switch t.Type() {
case types.TypeWikiRetract:
return s.ProcessWikiRetract(ctx, t)
@@ -285,24 +266,49 @@ func (s *wikiIngestService) ProcessWikiRetract(ctx context.Context, t *asynq.Tas
return nil
}
// ProcessWikiIngest processes a wiki ingest task (asynq handler)
// ProcessWikiIngest processes a batch wiki ingest task.
//
// Concurrency model (Redis mode):
// 1. Try to set "wiki:active:{kbID}" flag via SetNX. If already set, another batch
// is running → return nil (no-op). Documents are safe in the pending list.
// 2. Atomically pop up to wikiMaxDocsPerBatch entries from the pending list →
// process those documents sequentially.
// 3. Check if more documents are waiting in the pending list. If so, enqueue a
// follow-up task with a short delay (5s). The check runs while the active flag
// is still set; the delay gives the flag time to be cleared before the
// follow-up task fires.
// 4. On return, clear the active flag (deferred Del).
//
// This ensures: one batch per KB at a time, no locks, no blocking, no timeouts.
func (s *wikiIngestService) ProcessWikiIngest(ctx context.Context, t *asynq.Task) error {
var payload WikiIngestPayload
if err := json.Unmarshal(t.Payload(), &payload); err != nil {
return fmt.Errorf("wiki ingest: unmarshal payload: %w", err)
}
logger.Infof(ctx, "wiki ingest: starting for knowledge %s in KB %s", payload.KnowledgeID, payload.KnowledgeBaseID)
// Inject tenant ID into context — asynq tasks don't have it from middleware
// Inject context
ctx = context.WithValue(ctx, types.TenantIDContextKey, payload.TenantID)
// Inject language into context — captured from the original HTTP request at enqueue time
if payload.Language != "" {
ctx = context.WithValue(ctx, types.LanguageContextKey, payload.Language)
}
// Get KB and validate it's wiki-enabled
// Try to acquire the "active batch" flag (non-blocking)
if s.redisClient != nil {
activeKey := wikiActiveKeyPrefix + payload.KnowledgeBaseID
acquired, err := s.redisClient.SetNX(ctx, activeKey, "1", wikiActiveTTL).Result()
if err != nil {
logger.Warnf(ctx, "wiki ingest: redis SetNX failed: %v", err)
// Proceed anyway — better to risk brief overlap than drop documents
} else if !acquired {
// Another batch is actively processing this KB — bail out.
// Our documents are safe in the pending list; the active batch will
// pick them up via its follow-up check, or a future task will.
logger.Infof(ctx, "wiki ingest: another batch active for KB %s, skipping (docs safe in pending list)", payload.KnowledgeBaseID)
return nil
}
// We own the flag — make sure to release it when done
defer s.redisClient.Del(ctx, activeKey)
}
// Get KB and validate
kb, err := s.kbService.GetKnowledgeBaseByIDOnly(ctx, payload.KnowledgeBaseID)
if err != nil {
return fmt.Errorf("wiki ingest: get KB: %w", err)
@@ -318,7 +324,7 @@ func (s *wikiIngestService) ProcessWikiIngest(ctx context.Context, t *asynq.Task
// Get synthesis model
synthesisModelID := kb.WikiConfig.SynthesisModelID
if synthesisModelID == "" {
synthesisModelID = kb.SummaryModelID // fallback
synthesisModelID = kb.SummaryModelID
}
if synthesisModelID == "" {
return fmt.Errorf("wiki ingest: no synthesis model configured for KB %s", kb.ID)
@@ -328,58 +334,183 @@ func (s *wikiIngestService) ProcessWikiIngest(ctx context.Context, t *asynq.Task
return fmt.Errorf("wiki ingest: get chat model: %w", err)
}
// Get document chunks and reconstruct content
chunks, err := s.chunkRepo.ListChunksByKnowledgeID(ctx, payload.TenantID, payload.KnowledgeID)
if err != nil {
return fmt.Errorf("wiki ingest: get chunks: %w", err)
lang := types.LanguageNameFromContext(ctx)
// Drain Redis pending list to get all documents queued for this KB
knowledgeIDs := s.drainPendingList(ctx, payload.KnowledgeBaseID)
if len(knowledgeIDs) == 0 {
if s.redisClient != nil {
// Redis mode: list was already drained — nothing to do
return nil
}
// Lite mode (no Redis): use the single KnowledgeID from payload
if payload.KnowledgeID != "" {
knowledgeIDs = []string{payload.KnowledgeID}
} else {
return nil
}
}
if len(chunks) == 0 {
logger.Warnf(ctx, "wiki ingest: no chunks found for knowledge %s", payload.KnowledgeID)
logger.Infof(ctx, "wiki ingest: batch processing %d documents for KB %s: %v",
len(knowledgeIDs), payload.KnowledgeBaseID, knowledgeIDs)
// Process each document
var allPagesAffected []string
for _, knowledgeID := range knowledgeIDs {
pages, err := s.processOneDocument(ctx, chatModel, payload, knowledgeID, lang)
if err != nil {
logger.Warnf(ctx, "wiki ingest: failed to process knowledge %s: %v", knowledgeID, err)
continue
}
allPagesAffected = append(allPagesAffected, pages...)
}
// Batch post-processing (once for the whole batch, not per-doc)
// Rebuild index page
if err := s.rebuildIndexPage(ctx, chatModel, payload, lang); err != nil {
logger.Warnf(ctx, "wiki ingest: rebuild index failed: %v", err)
}
// Append log entry
s.appendLogEntry(ctx, payload, "ingest",
fmt.Sprintf("%d documents", len(knowledgeIDs)),
allPagesAffected, "")
// Cross-link injection
s.injectCrossLinks(ctx, payload.KnowledgeBaseID, allPagesAffected)
// Publish all draft pages
s.publishDraftPages(ctx, payload.KnowledgeBaseID, allPagesAffected)
logger.Infof(ctx, "wiki ingest: batch completed for KB %s, %d docs, %d pages affected",
payload.KnowledgeBaseID, len(knowledgeIDs), len(allPagesAffected))
// Check for follow-up work. Note: this runs BEFORE the deferred Del, so the active
// flag is still set here. That is fine because the follow-up task is enqueued with a
// short delay, by which time the deferred Del has released the flag.
s.scheduleFollowUp(ctx, payload)
return nil
}
// scheduleFollowUp checks whether documents are still waiting in the pending list
// of payload.KnowledgeBaseID and, if so, enqueues another wiki ingest task for
// that knowledge base.
//
// It is called at the end of a batch, right before the active flag is released
// (via defer in the caller). The follow-up task is therefore scheduled with a
// short delay rather than immediately, so that the flag is gone by the time the
// task fires and the next batch can pick up the waiting documents promptly.
//
// The function is best-effort and never returns an error: failures to inspect
// the list, encode the payload, or enqueue the task are logged and the function
// returns. In Lite mode (no Redis client) there is no pending list and the call
// is a no-op.
func (s *wikiIngestService) scheduleFollowUp(ctx context.Context, payload WikiIngestPayload) {
	if s.redisClient == nil {
		return
	}

	pendingKey := wikiPendingKeyPrefix + payload.KnowledgeBaseID
	count, err := s.redisClient.LLen(ctx, pendingKey).Result()
	if err != nil {
		// Do not confuse "could not check" with "nothing pending": surface the failure.
		logger.Warnf(ctx, "wiki ingest: follow-up pending check failed for KB %s: %v", payload.KnowledgeBaseID, err)
		return
	}
	if count == 0 {
		return
	}

	logger.Infof(ctx, "wiki ingest: %d more documents pending for KB %s, scheduling follow-up", count, payload.KnowledgeBaseID)

	payloadBytes, err := json.Marshal(payload)
	if err != nil {
		// Enqueueing a task with an invalid payload would only fail later in the handler.
		logger.Errorf(ctx, "wiki ingest: failed to marshal follow-up payload: %v", err)
		return
	}

	t := asynq.NewTask(types.TypeWikiIngest, payloadBytes,
		asynq.Queue("low"),
		asynq.MaxRetry(3),
		asynq.Timeout(60*time.Minute),
		asynq.ProcessIn(5*time.Second), // short delay — active flag will be released by then
	)
	if _, err := s.task.Enqueue(t); err != nil {
		logger.Warnf(ctx, "wiki ingest: follow-up enqueue failed: %v", err)
	}
}
// drainPendingList removes up to wikiMaxDocsPerBatch knowledge IDs from the head
// of the KB's Redis pending list and returns them, de-duplicated, in their
// original order. Entries beyond the batch limit are left in the list for a
// later batch.
//
// It returns nil when no Redis client is configured, when the script call
// fails (the failure is logged), or when nothing was popped.
func (s *wikiIngestService) drainPendingList(ctx context.Context, kbID string) []string {
	if s.redisClient == nil {
		return nil
	}

	// LRANGE + LTRIM inside one Lua script, so reading the head of the list and
	// removing it happen as a single atomic step.
	popScript := redis.NewScript(`
local items = redis.call("LRANGE", KEYS[1], 0, tonumber(ARGV[1]) - 1)
redis.call("LTRIM", KEYS[1], tonumber(ARGV[1]), -1)
return items
`)
	keys := []string{wikiPendingKeyPrefix + kbID}
	popped, err := popScript.Run(ctx, s.redisClient, keys, wikiMaxDocsPerBatch).StringSlice()
	if err != nil {
		logger.Warnf(ctx, "wiki ingest: failed to drain pending list: %v", err)
		return nil
	}

	// A document that was re-uploaded may have been pushed more than once;
	// keep only its first occurrence.
	var ordered []string
	visited := make(map[string]bool)
	for i := range popped {
		docID := popped[i]
		if visited[docID] {
			continue
		}
		visited[docID] = true
		ordered = append(ordered, docID)
	}
	return ordered
}
// processOneDocument handles wiki ingest for a single document.
// Returns the list of affected page slugs.
func (s *wikiIngestService) processOneDocument(
ctx context.Context,
chatModel chat.Chat,
payload WikiIngestPayload,
knowledgeID string,
lang string,
) ([]string, error) {
// Get document chunks and reconstruct content
chunks, err := s.chunkRepo.ListChunksByKnowledgeID(ctx, payload.TenantID, knowledgeID)
if err != nil {
return nil, fmt.Errorf("get chunks: %w", err)
}
if len(chunks) == 0 {
return nil, nil
}
content := reconstructContent(chunks)
if len([]rune(content)) > maxContentForWiki {
content = string([]rune(content)[:maxContentForWiki])
}
// Get human-readable language name for LLM prompts from middleware context
lang := types.LanguageNameFromContext(ctx)
// Get document title from first chunk's knowledge info
docTitle := payload.KnowledgeID
if len(chunks) > 0 {
// Try to extract a title
for _, ch := range chunks {
if ch.Content != "" {
lines := strings.SplitN(ch.Content, "\n", 2)
if len(lines) > 0 && len(lines[0]) > 0 && len(lines[0]) < 200 {
docTitle = strings.TrimPrefix(strings.TrimSpace(lines[0]), "# ")
break
}
// Get document title
docTitle := knowledgeID
for _, ch := range chunks {
if ch.Content != "" {
lines := strings.SplitN(ch.Content, "\n", 2)
if len(lines) > 0 && len(lines[0]) > 0 && len(lines[0]) < 200 {
docTitle = strings.TrimPrefix(strings.TrimSpace(lines[0]), "# ")
break
}
}
}
var pagesAffected []string
sourceRef := fmt.Sprintf("%s|%s", knowledgeID, docTitle)
// Format source ref as "knowledgeID|docTitle" for frontend display
sourceRef := fmt.Sprintf("%s|%s", payload.KnowledgeID, docTitle)
// Snapshot existing page slugs for stale detection
oldPageSlugs := s.getExistingPageSlugsForKnowledge(ctx, payload.KnowledgeBaseID, knowledgeID)
// Snapshot: existing page slugs that reference this knowledge ID (before extraction)
// Used later to detect pages that are no longer relevant after document update
oldPageSlugs := s.getExistingPageSlugsForKnowledge(ctx, payload.KnowledgeBaseID, payload.KnowledgeID)
// Build a per-doc payload for functions that still need KnowledgeID
docPayload := WikiIngestPayload{
TenantID: payload.TenantID,
KnowledgeBaseID: payload.KnowledgeBaseID,
KnowledgeID: knowledgeID,
Language: payload.Language,
}
// Step 1: Extract entities and concepts FIRST (so we know the slugs for summary links)
extractedPages, extractedSlugs, err := s.extractEntitiesAndConcepts(ctx, chatModel, content, docTitle, lang, payload, sourceRef, oldPageSlugs)
// Step 1: Extract entities and concepts
extractedPages, extractedSlugs, err := s.extractEntitiesAndConcepts(ctx, chatModel, content, docTitle, lang, docPayload, sourceRef, oldPageSlugs)
if err != nil {
logger.Warnf(ctx, "wiki ingest: knowledge extraction failed: %v", err)
logger.Warnf(ctx, "wiki ingest: knowledge extraction failed for %s: %v", knowledgeID, err)
} else {
pagesAffected = append(pagesAffected, extractedPages...)
}
// Step 2: Generate summary page (with extracted slugs for accurate [[wiki-links]])
// Step 2: Generate summary page
summarySlug := fmt.Sprintf("summary/%s", slugify(docTitle))
var slugListing string
for _, slug := range extractedSlugs {
@@ -394,7 +525,7 @@ func (s *wikiIngestService) ProcessWikiIngest(ctx context.Context, t *asynq.Task
"ExtractedSlugs": slugListing,
})
if err != nil {
logger.Errorf(ctx, "wiki ingest: generate summary failed: %v", err)
logger.Errorf(ctx, "wiki ingest: generate summary failed for %s: %v", knowledgeID, err)
} else {
_, err := s.wikiService.CreatePage(ctx, &types.WikiPage{
ID: uuid.New().String(),
@@ -409,36 +540,17 @@ func (s *wikiIngestService) ProcessWikiIngest(ctx context.Context, t *asynq.Task
SourceRefs: types.StringArray{sourceRef},
})
if err != nil {
logger.Warnf(ctx, "wiki ingest: create summary page failed: %v", err)
logger.Warnf(ctx, "wiki ingest: create summary page failed for %s: %v", knowledgeID, err)
} else {
pagesAffected = append(pagesAffected, summarySlug)
}
}
// Step 3: Rebuild index page
if err := s.rebuildIndexPage(ctx, chatModel, payload, lang); err != nil {
logger.Warnf(ctx, "wiki ingest: rebuild index failed: %v", err)
}
// Retract stale pages (pages this doc previously contributed to but no longer does)
s.retractStalePages(ctx, docPayload, oldPageSlugs, pagesAffected, docTitle, lang)
// Step 4: Append to log page
s.appendLogEntry(ctx, payload, "ingest", docTitle, pagesAffected, "")
// Step 5: Cross-link injection — scan all affected pages and inject [[wiki-links]]
// for mentions of other wiki page titles. Pure text matching, no LLM call.
s.injectCrossLinks(ctx, payload.KnowledgeBaseID, pagesAffected)
// Step 6: Publish all draft pages created during this ingest
s.publishDraftPages(ctx, payload.KnowledgeBaseID, pagesAffected)
// Step 7: Handle stale pages — pages that previously referenced this document
// but are no longer produced by the updated extraction. This handles the
// "document updated and some entities/concepts were removed" scenario.
s.retractStalePages(ctx, payload, oldPageSlugs, pagesAffected, docTitle, lang)
logger.Infof(ctx, "wiki ingest: completed for knowledge %s, %d pages affected",
payload.KnowledgeID, len(pagesAffected))
return nil
logger.Infof(ctx, "wiki ingest: processed knowledge %s, %d pages affected", knowledgeID, len(pagesAffected))
return pagesAffected, nil
}
// cleanDeadLinks removes [[wiki-links]] that point to archived or deleted pages.