refactor: Update wiki prompts and ingestion logic for improved content handling

- Renamed `WikiPageUpdatePrompt` to `WikiPageModifyPrompt` to better reflect its functionality of adding new information and removing outdated content in a single operation.
- Enhanced the instructions for both `WikiPageModifyPrompt` and `WikiPageRetractPrompt` to clarify the handling of additions and retractions, ensuring accurate content updates.
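- Consolidated the logic for enqueuing retraction tasks in the `cleanupWikiOnKnowledgeDelete` method: pages that were deleted outright and pages that need LLM retraction are now passed together to a single retract task, which streamlines the handling of affected pages.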
- Removed the `rebuildWikiIndexSimple` function to simplify the codebase, as its functionality is no longer needed with the new ingestion approach.

These changes improve the clarity and efficiency of the wiki content management process, enhancing overall system performance.
This commit is contained in:
wizardchen
2026-04-09 16:24:48 +08:00
parent 8daf6b368c
commit fba6d7d145
5 changed files with 859 additions and 952 deletions

View File

@@ -112,70 +112,49 @@ Output ONLY valid JSON. Example:
]
}`
// WikiPageUpdatePrompt incrementally updates an existing wiki page with new information.
const WikiPageUpdatePrompt = `You are a wiki editor tasked with updating an existing wiki page with new information from a recently ingested document.
// WikiPageModifyPrompt updates an existing wiki page with new additions and removes stale/deleted information in a single pass.
const WikiPageModifyPrompt = `You are a wiki editor tasked with updating an existing wiki page. You must process a set of NEW information to add, AND/OR a set of deleted documents whose exclusive contributions must be REMOVED.
<existing_page_content>
{{.ExistingContent}}
</existing_page_content>
{{if .HasAdditions}}
<new_information>
<source_document>{{.NewDocTitle}}</source_document>
<content>
{{.NewContent}}
</content>
</new_information>
{{end}}
<instructions>
1. The FIRST line of your output MUST be: SUMMARY: {one sentence, 15-40 words, describing what this page is about after the update — for wiki index listing}
2. Merge the new information into the existing page content.
3. Preserve all existing information that is still valid.
4. If the new information contradicts existing content, prefer the newer information and silently replace the old claim.
5. Add new facts, details, and context from the new document.
6. Preserve any existing [[slug|name]] wiki-link references in the content. Do NOT invent new wiki-link slugs.
7. Maintain the existing page structure and formatting style.
8. Add a source reference to the new document at the bottom.
9. **Image rule**: If the new document contains <images> tags with <image> elements, you SHOULD include the relevant images in your updated page using the Markdown syntax: ![caption](url). Place the images where they are contextually relevant to the text.
10. Write in {{.Language}}.
</instructions>
Output the SUMMARY line first, then the updated Markdown content. Do not include any other preamble.`
// WikiPageRetractPrompt removes information contributed by a deleted document from an existing wiki page.
// It includes the actual content of remaining source documents (from their summary pages) so the LLM
// can accurately determine which facts are still supported and which should be removed.
const WikiPageRetractPrompt = `You are a wiki editor. A source document has been DELETED from the knowledge base. You must update the existing wiki page to remove any information that came exclusively from this deleted document.
<existing_page_content>
{{.ExistingContent}}
</existing_page_content>
<deleted_document>
<title>{{.DeletedDocTitle}}</title>
<content>
{{.DeletedDocContent}}
</content>
</deleted_document>
{{if .HasRetractions}}
<deleted_documents>
{{.DeletedContent}}
</deleted_documents>
<remaining_source_documents>
{{.RemainingSourcesContent}}
</remaining_source_documents>
{{end}}
<valid_wiki_links>
{{.AvailableSlugs}}
</valid_wiki_links>
<instructions>
1. The FIRST line of your output MUST be: SUMMARY: {one sentence, 15-40 words, describing what this page is about after retraction — for wiki index listing}
2. Carefully review the existing page content. Compare it against BOTH the deleted document and the remaining source documents above.
3. Remove any facts, claims, or details that were ONLY sourced from the deleted document and are NOT present in any remaining source document.
4. If a fact appears in both the deleted document and a remaining source, KEEP it.
5. If you are unsure whether a fact came from the deleted document, keep it as-is — do NOT add any review notes or annotations.
6. Update or remove the "Source: {{.DeletedDocTitle}}" reference line if present.
7. Keep [[slug|name]] wiki-link references ONLY if the slug appears in the <valid_wiki_links> list above. Remove any [[slug|name]] whose slug is NOT in that list.
8. Maintain the existing page structure, formatting style, and language.
9. If after removing the deleted document's contributions the page becomes nearly empty, output just: "SUMMARY: (empty page)\n# [Title]\n\n*This page's primary source document was removed.*"
10. Write in {{.Language}}.
1. The FIRST line of your output MUST be: SUMMARY: {one sentence, 15-40 words, describing what this page is about after the update — for wiki index listing}
{{if .HasRetractions}}
2. REMOVE facts/claims that were ONLY sourced from the <deleted_documents> and are NOT present in any <remaining_source_documents> or <new_information>.
{{end}}
{{if .HasAdditions}}
3. ADD and MERGE the facts, details, and context from the <new_information> into the page. If it contradicts old content, prefer the newer information.
{{end}}
4. Preserve existing information that is still valid.
5. Keep [[slug|name]] wiki-link references ONLY if the slug appears in the <valid_wiki_links> list above. Remove any [[slug|name]] whose slug is NOT in that list. Do NOT invent new wiki-link slugs.
6. Maintain the existing page structure and formatting style.
7. **Image rule**: Include relevant images using Markdown syntax: ![caption](url) from new information if applicable.
{{if .HasRetractions}}
8. If after removing deleted content the page becomes nearly empty and there is no new information to add, output just: "SUMMARY: (empty page)\n# [Title]\n\n*This page's primary source document was removed.*"
{{end}}
9. Write in {{.Language}}.
</instructions>
Output the SUMMARY line first, then the updated Markdown content. Do not include any other preamble.`

View File

@@ -1327,8 +1327,10 @@ func (s *knowledgeService) cleanupWikiOnKnowledgeDelete(ctx context.Context, kbI
len(deletedSlugs), knowledgeID, deletedSlugs)
}
// Enqueue async LLM retraction for multi-source pages
if len(retractSlugs) > 0 {
allAffectedSlugs := append(retractSlugs, deletedSlugs...)
// Enqueue async LLM retraction and index rebuild
if len(allAffectedSlugs) > 0 {
lang, _ := types.LanguageFromContext(ctx)
tenantID, _ := types.TenantIDFromContext(ctx)
EnqueueWikiRetract(ctx, s.task, s.redisClient, WikiRetractPayload{
@@ -1338,16 +1340,9 @@ func (s *knowledgeService) cleanupWikiOnKnowledgeDelete(ctx context.Context, kbI
DocTitle: docTitle,
DocSummary: docSummary,
Language: lang,
PageSlugs: retractSlugs,
PageSlugs: allAffectedSlugs,
})
logger.Infof(ctx, "wiki cleanup: enqueued retract task for %d pages: %v", len(retractSlugs), retractSlugs)
}
// If pages were deleted but no retract task was enqueued,
// we still need to update the index and log synchronously.
// (When retractSlugs > 0, ProcessWikiRetract handles index/log.)
if len(deletedSlugs) > 0 && len(retractSlugs) == 0 {
s.rebuildWikiIndexSimple(ctx, kbID, knowledgeID, docTitle, deletedSlugs)
logger.Infof(ctx, "wiki cleanup: enqueued retract task for %d pages: %v", len(allAffectedSlugs), allAffectedSlugs)
}
}
@@ -1365,81 +1360,6 @@ func removeSourceRef(refs types.StringArray, knowledgeID string) types.StringArr
return result
}
// rebuildWikiIndexSimple regenerates the index page from the current page listing
// and appends a deletion log entry. It does NOT use the LLM: the index body is a
// plain structured listing, prefixed with the intro stored in the index page's
// Summary field.
//
// Both steps are best-effort and independent of each other: failures are logged as
// warnings, and a failed or skipped index rebuild does not prevent the deletion
// from being recorded in the log page.
func (s *knowledgeService) rebuildWikiIndexSimple(ctx context.Context, kbID, knowledgeID, docTitle string, deletedSlugs []string) {
	s.rewriteWikiIndexListing(ctx, kbID)
	s.appendWikiDeleteLogEntry(ctx, kbID, knowledgeID, docTitle, deletedSlugs)
}

// rewriteWikiIndexListing replaces the index page content with a listing of all
// non-archived pages grouped by page type. It is a no-op when the page listing
// cannot be loaded or the KB has no index page.
func (s *knowledgeService) rewriteWikiIndexListing(ctx context.Context, kbID string) {
	allPages, err := s.wikiService.ListAllPages(ctx, kbID)
	if err != nil {
		logger.Warnf(ctx, "wiki cleanup: failed to list pages: %v", err)
		return
	}

	indexPage, err := s.wikiService.GetIndex(ctx, kbID)
	if err != nil {
		logger.Warnf(ctx, "wiki cleanup: failed to load index page: %v", err)
	}
	if indexPage == nil {
		return
	}

	// Preserve the intro kept in the Summary field; "Index" is treated as the
	// placeholder value and replaced by a default heading.
	var indexContent strings.Builder
	if indexPage.Summary != "" && indexPage.Summary != "Index" {
		indexContent.WriteString(indexPage.Summary)
		indexContent.WriteString("\n")
	} else {
		indexContent.WriteString("# Wiki Index\n\n")
	}

	// Group listable pages by type; index/log pages and archived pages are excluded.
	grouped := make(map[string][]*types.WikiPage)
	for _, p := range allPages {
		if p.PageType == types.WikiPageTypeIndex || p.PageType == types.WikiPageTypeLog {
			continue
		}
		if p.Status == types.WikiPageStatusArchived {
			continue
		}
		grouped[p.PageType] = append(grouped[p.PageType], p)
	}

	// Emit sections in a fixed order so the index is stable between rebuilds.
	typeOrder := []string{
		types.WikiPageTypeSummary,
		types.WikiPageTypeEntity,
		types.WikiPageTypeConcept,
		types.WikiPageTypeSynthesis,
		types.WikiPageTypeComparison,
	}
	for _, pt := range typeOrder {
		pages := grouped[pt]
		if len(pages) == 0 {
			continue
		}
		label := pt
		if len(label) > 0 {
			label = strings.ToUpper(label[:1]) + label[1:]
		}
		fmt.Fprintf(&indexContent, "\n## %s\n\n", label)
		for _, p := range pages {
			fmt.Fprintf(&indexContent, "- [[%s]] — %s\n", p.Slug, p.Summary)
		}
	}

	indexPage.Content = indexContent.String()
	if _, err := s.wikiService.UpdatePage(ctx, indexPage); err != nil {
		logger.Warnf(ctx, "wiki cleanup: failed to rebuild index: %v", err)
	}
}

// appendWikiDeleteLogEntry appends a "delete" entry for the removed document to the
// KB's log page. It is a no-op when the KB has no log page.
func (s *knowledgeService) appendWikiDeleteLogEntry(ctx context.Context, kbID, knowledgeID, docTitle string, deletedSlugs []string) {
	logPage, err := s.wikiService.GetLog(ctx, kbID)
	if err != nil {
		logger.Warnf(ctx, "wiki cleanup: failed to load log page: %v", err)
	}
	if logPage == nil {
		return
	}

	entry := fmt.Sprintf("\n## [%s] delete | %s\n- **Source**: knowledge/%s\n- **Pages deleted**: %d (%s)\n",
		time.Now().UTC().Format("2006-01-02 15:04:05"),
		docTitle,
		knowledgeID,
		len(deletedSlugs),
		strings.Join(deletedSlugs, ", "),
	)
	logPage.Content += entry
	if _, err := s.wikiService.UpdatePage(ctx, logPage); err != nil {
		logger.Warnf(ctx, "wiki cleanup: failed to update log: %v", err)
	}
}
// DeleteKnowledgeList deletes a knowledge entry and all related resources
func (s *knowledgeService) DeleteKnowledgeList(ctx context.Context, ids []string) error {
if len(ids) == 0 {

View File

@@ -14,7 +14,6 @@ import (
"github.com/Tencent/WeKnora/internal/models/chat"
"github.com/Tencent/WeKnora/internal/types"
"github.com/Tencent/WeKnora/internal/types/interfaces"
"github.com/google/uuid"
"github.com/hibiken/asynq"
"github.com/redis/go-redis/v9"
)
@@ -223,421 +222,6 @@ func (s *wikiIngestService) Handle(ctx context.Context, t *asynq.Task) error {
return s.ProcessWikiIngest(ctx, t)
}
// retractPagesContent uses the LLM to remove a deleted/stale document's contributions
// from the given wiki pages. docContent is the deleted document's content (or summary)
// so the LLM can identify which parts to remove. It does NOT rebuild the index or
// append log entries — callers are responsible for those post-processing steps.
//
// The function is best-effort and returns nothing: per-page failures are logged and
// the page is left unchanged. If the KB page listing cannot be loaded, no page is
// rewritten at all, because the prompt would otherwise be built with an empty
// valid-link list and no source summaries.
func (s *wikiIngestService) retractPagesContent(
	ctx context.Context,
	chatModel chat.Chat,
	kbID, docTitle, docContent string,
	pageSlugs []string,
	lang string,
) {
	if len(pageSlugs) == 0 {
		return
	}

	allPages, err := s.wikiService.ListAllPages(ctx, kbID)
	if err != nil {
		logger.Warnf(ctx, "wiki retract: failed to list pages for KB %s, skipping retraction: %v", kbID, err)
		return
	}

	// splitRef parses a source ref of the form "knowledgeID|title". A ref without
	// a pipe (or starting with one) is used as both ID and title.
	splitRef := func(ref string) (id, title string) {
		if i := strings.Index(ref, "|"); i > 0 {
			return ref[:i], ref[i+1:]
		}
		return ref, ref
	}

	// slugTitleMap: linkable pages (not index/log, not archived).
	// summaryContentByKnowledgeID: summary-page content keyed by source knowledge ID.
	slugTitleMap := make(map[string]string)
	summaryContentByKnowledgeID := make(map[string]string)
	for _, p := range allPages {
		if p.PageType != types.WikiPageTypeIndex && p.PageType != types.WikiPageTypeLog && p.Status != types.WikiPageStatusArchived {
			slugTitleMap[p.Slug] = p.Title
		}
		if p.PageType == types.WikiPageTypeSummary && p.Content != "" {
			for _, ref := range p.SourceRefs {
				kid, _ := splitRef(ref)
				summaryContentByKnowledgeID[kid] = p.Content
			}
		}
	}

	for _, slug := range pageSlugs {
		page, err := s.wikiService.GetPageBySlug(ctx, kbID, slug)
		if err != nil || page == nil {
			continue
		}

		// Give the LLM the content of every source still referenced by the page.
		var remainingSourcesContent strings.Builder
		for _, ref := range page.SourceRefs {
			refKnowledgeID, refTitle := splitRef(ref)
			if content, ok := summaryContentByKnowledgeID[refKnowledgeID]; ok {
				fmt.Fprintf(&remainingSourcesContent, "<source title=%q>\n%s\n</source>\n\n", refTitle, content)
			} else {
				fmt.Fprintf(&remainingSourcesContent, "<source title=%q>\n(summary not available)\n</source>\n\n", refTitle)
			}
		}
		if remainingSourcesContent.Len() == 0 {
			remainingSourcesContent.WriteString("(no remaining sources)")
		}

		// Only out-links that still resolve to a linkable page are offered as valid.
		var relatedSlugs strings.Builder
		for _, outSlug := range page.OutLinks {
			if title, ok := slugTitleMap[outSlug]; ok {
				fmt.Fprintf(&relatedSlugs, "- %s (%s)\n", outSlug, title)
			}
		}

		updatedContent, err := s.generateWithTemplate(ctx, chatModel, agent.WikiPageRetractPrompt, map[string]string{
			"ExistingContent":         page.Content,
			"DeletedDocTitle":         docTitle,
			"DeletedDocContent":       docContent,
			"RemainingSourcesContent": remainingSourcesContent.String(),
			"AvailableSlugs":          relatedSlugs.String(),
			"Language":                lang,
		})
		if err != nil {
			logger.Warnf(ctx, "wiki retract: LLM call failed for page %s: %v", slug, err)
			continue
		}
		// Never replace an existing page with an empty LLM response.
		if strings.TrimSpace(updatedContent) == "" {
			logger.Warnf(ctx, "wiki retract: LLM returned empty content for page %s, leaving page unchanged", slug)
			continue
		}

		retractedSummary, retractedBody := splitSummaryLine(updatedContent)
		if retractedBody != "" {
			page.Content = retractedBody
		} else {
			page.Content = updatedContent
		}
		if retractedSummary != "" {
			page.Summary = retractedSummary
		}

		if _, err := s.wikiService.UpdatePage(ctx, page); err != nil {
			logger.Warnf(ctx, "wiki retract: failed to update page %s: %v", slug, err)
		} else {
			logger.Infof(ctx, "wiki retract: updated page %s after removing content from '%s'", slug, docTitle)
		}
	}
}
// ProcessWikiIngest processes a batch wiki ingest task.
//
// Concurrency model (Redis mode):
//  1. Try to set the per-KB "active batch" flag via SetNX (5-minute TTL, refreshed
//     every 2 minutes while the task runs). If the flag is already set, another
//     batch is running: return an error so asynq retries this task later. The
//     documents stay in the pending list.
//  2. Peek the pending list (without removing entries) and process the operations
//     sequentially.
//  3. Trim the processed entries from the pending list.
//  4. If more entries arrived during processing, enqueue a delayed follow-up task.
//
// The flag is released, and its keep-alive stopped, only by the task that actually
// acquired it. If SetNX itself fails, the batch still runs (risking a brief overlap
// rather than dropping documents) but never touches a flag it does not own.
//
// Without Redis ("lite" mode) the operations are taken from payload.LiteOps.
//
// A summary line with counters is always logged on exit. The returned error is
// non-nil only for conditions asynq should retry or surface (bad payload, lock
// conflict, KB/model lookup failures); per-document failures are logged and counted.
func (s *wikiIngestService) ProcessWikiIngest(ctx context.Context, t *asynq.Task) error {
	taskStartedAt := time.Now()
	// The second result only reports whether the value was present in ctx.
	retryCount, _ := asynq.GetRetryCount(ctx)
	maxRetry, _ := asynq.GetMaxRetry(ctx)

	var payload WikiIngestPayload
	exitStatus := "success"
	mode := "redis"
	lockAcquired := false
	pendingOpsCount := 0
	ingestOps := 0
	retractOps := 0
	ingestSucceeded := 0
	ingestFailed := 0
	retractHandled := 0
	indexRebuildAttempted := false
	indexRebuildSucceeded := false
	followUpScheduled := false
	ingestPagesCount := 0
	retractPagesCount := 0
	totalPagesAffected := 0
	docPreview := make([]string, 0, 6)

	defer func() {
		logger.Infof(
			ctx,
			"wiki ingest stats: kb=%s tenant=%d retry=%d/%d status=%s elapsed=%s mode=%s lock_acquired=%v pending_ops=%d ops(ingest=%d,retract=%d) ingest(success=%d,failed=%d) retract_handled=%d pages(ingest=%d,retract=%d,total=%d) index(rebuild_attempted=%v,rebuild_succeeded=%v) followup=%v preview=%s",
			payload.KnowledgeBaseID,
			payload.TenantID,
			retryCount,
			maxRetry,
			exitStatus,
			time.Since(taskStartedAt).Round(time.Millisecond),
			mode,
			lockAcquired,
			pendingOpsCount,
			ingestOps,
			retractOps,
			ingestSucceeded,
			ingestFailed,
			retractHandled,
			ingestPagesCount,
			retractPagesCount,
			totalPagesAffected,
			indexRebuildAttempted,
			indexRebuildSucceeded,
			followUpScheduled,
			previewStringSlice(docPreview, 6),
		)
	}()

	if err := json.Unmarshal(t.Payload(), &payload); err != nil {
		exitStatus = "invalid_payload"
		return fmt.Errorf("wiki ingest: unmarshal payload: %w", err)
	}

	// Inject context
	ctx = context.WithValue(ctx, types.TenantIDContextKey, payload.TenantID)
	if payload.Language != "" {
		ctx = context.WithValue(ctx, types.LanguageContextKey, payload.Language)
	}

	// Try to acquire the "active batch" flag (non-blocking)
	if s.redisClient != nil {
		activeKey := wikiActiveKeyPrefix + payload.KnowledgeBaseID
		// Use a 5-minute initial TTL
		acquired, err := s.redisClient.SetNX(ctx, activeKey, "1", 5*time.Minute).Result()
		switch {
		case err != nil:
			logger.Warnf(ctx, "wiki ingest: redis SetNX failed: %v", err)
			// Proceed anyway — better to risk brief overlap than drop documents
		case !acquired:
			exitStatus = "active_lock_conflict"
			// Another batch is actively processing this KB.
			// By returning an error, we leverage Asynq's built-in exponential backoff to retry this task later.
			// This ensures that if the active task crashes (leaking the lock until its TTL expires),
			// this task will eventually retry and process the remaining items in the queue, preventing a stalled queue.
			logger.Infof(ctx, "wiki ingest: another batch active for KB %s, deferring to asynq retry", payload.KnowledgeBaseID)
			return fmt.Errorf("concurrent wiki task active, please retry")
		}
		lockAcquired = acquired

		// Release and keep-alive are installed only when this task owns the flag;
		// otherwise it could extend or delete a flag held by another batch.
		if acquired {
			// Create a context to cancel the keep-alive goroutine when we're done
			lockCtx, cancelLock := context.WithCancel(context.Background())

			// We own the flag — make sure to release it when done
			defer func() {
				cancelLock()
				// Use context.Background() to ensure release even if ctx is cancelled
				s.redisClient.Del(context.Background(), activeKey)
			}()

			// Keep-alive goroutine to extend lock TTL while the task is running
			go func() {
				ticker := time.NewTicker(2 * time.Minute)
				defer ticker.Stop()
				for {
					select {
					case <-lockCtx.Done():
						return
					case <-ticker.C:
						s.redisClient.Expire(context.Background(), activeKey, 5*time.Minute)
					}
				}
			}()
		}
	} else {
		mode = "lite"
	}

	// Get KB and validate
	kb, err := s.kbService.GetKnowledgeBaseByIDOnly(ctx, payload.KnowledgeBaseID)
	if err != nil {
		exitStatus = "get_kb_failed"
		return fmt.Errorf("wiki ingest: get KB: %w", err)
	}
	if !kb.IsWikiEnabled() {
		exitStatus = "kb_not_wiki_enabled"
		return fmt.Errorf("wiki ingest: KB %s is not wiki type", kb.ID)
	}
	if kb.WikiConfig == nil || !kb.WikiConfig.AutoIngest {
		exitStatus = "auto_ingest_disabled"
		logger.Infof(ctx, "wiki ingest: auto_ingest disabled for KB %s, skipping", kb.ID)
		return nil
	}

	// Get synthesis model, falling back to the KB's summary model
	synthesisModelID := kb.WikiConfig.SynthesisModelID
	if synthesisModelID == "" {
		synthesisModelID = kb.SummaryModelID
	}
	if synthesisModelID == "" {
		exitStatus = "missing_synthesis_model"
		return fmt.Errorf("wiki ingest: no synthesis model configured for KB %s", kb.ID)
	}
	chatModel, err := s.modelService.GetChatModel(ctx, synthesisModelID)
	if err != nil {
		exitStatus = "get_chat_model_failed"
		return fmt.Errorf("wiki ingest: get chat model: %w", err)
	}

	lang := types.LanguageNameFromContext(ctx)

	// Peek Redis pending list to get all operations queued for this KB without removing them
	pendingOps, peekedCount := s.peekPendingList(ctx, payload.KnowledgeBaseID)
	pendingOpsCount = len(pendingOps)
	if len(pendingOps) == 0 {
		if s.redisClient != nil {
			// Redis mode: nothing is queued for this KB — nothing to do
			exitStatus = "no_pending_ops"
			logger.Infof(ctx, "wiki ingest: no pending operations for KB %s", payload.KnowledgeBaseID)
			return nil
		}
		// Lite mode (no Redis): use LiteOps from payload
		if len(payload.LiteOps) == 0 {
			exitStatus = "no_lite_ops"
			return nil
		}
		pendingOps = payload.LiteOps
		peekedCount = len(pendingOps)
		pendingOpsCount = len(pendingOps)
	}

	logger.Infof(ctx, "wiki ingest: batch processing %d ops for KB %s",
		len(pendingOps), payload.KnowledgeBaseID)

	// Process each operation
	var ingestPagesAffected []string
	var retractPagesAffected []string
	var docResults []*docIngestResult
	var retractChangeDesc strings.Builder

	for _, op := range pendingOps {
		if op.Op == WikiOpRetract {
			retractOps++
			logger.Infof(ctx, "wiki ingest: retracting document '%s' (%s)", op.DocTitle, op.KnowledgeID)
			s.retractPagesContent(ctx, chatModel, payload.KnowledgeBaseID, op.DocTitle, op.DocSummary, op.PageSlugs, op.Language)
			retractPagesAffected = append(retractPagesAffected, op.PageSlugs...)
			retractPagesCount += len(op.PageSlugs)
			retractHandled++
			docPreview = append(docPreview,
				fmt.Sprintf("retract[%s]: %s", previewText(op.KnowledgeID, 24), previewText(op.DocTitle, 48)))

			fmt.Fprintf(&retractChangeDesc, "Removed document '%s': %s\n", op.DocTitle, op.DocSummary)
			s.appendLogEntry(ctx, WikiIngestPayload{
				TenantID:        payload.TenantID,
				KnowledgeBaseID: payload.KnowledgeBaseID,
			}, "retract", op.KnowledgeID, op.DocTitle, op.PageSlugs, "")
			continue
		}

		// Default to ingest
		ingestOps++
		logger.Infof(ctx, "wiki ingest: processing document '%s' (%s)", op.DocTitle, op.KnowledgeID)
		result, err := s.processOneDocument(ctx, chatModel, payload, op.KnowledgeID, op.Language)
		if err != nil {
			ingestFailed++
			logger.Warnf(ctx, "wiki ingest: failed to process knowledge %s: %v", op.KnowledgeID, err)
			continue
		}
		// A nil result with nil error means the document was skipped.
		if result == nil {
			continue
		}
		ingestSucceeded++
		ingestPagesAffected = append(ingestPagesAffected, result.Pages...)
		ingestPagesCount += len(result.Pages)
		docResults = append(docResults, result)
		docPreview = append(docPreview,
			fmt.Sprintf(
				"ingest[%s]: title=%s summary=%s pages=%s",
				previewText(result.KnowledgeID, 24),
				previewText(result.DocTitle, 40),
				previewText(result.Summary, 64),
				previewStringSlice(result.Pages, 4),
			))
	}

	// Build the combined list in a fresh slice so it never shares a backing array
	// with ingestPagesAffected, which is still used below.
	allPagesAffected := make([]string, 0, len(ingestPagesAffected)+len(retractPagesAffected))
	allPagesAffected = append(allPagesAffected, ingestPagesAffected...)
	allPagesAffected = append(allPagesAffected, retractPagesAffected...)
	totalPagesAffected = len(allPagesAffected)

	// Batch post-processing (once for the whole batch, not per-doc)

	// Build change description from processed documents
	var changeDesc strings.Builder
	if len(docResults) > 0 {
		fmt.Fprintf(&changeDesc, "Added %d documents:\n", len(docResults))
		for _, r := range docResults {
			fmt.Fprintf(&changeDesc, "- %s: %s\n", r.DocTitle, r.Summary)
		}
	}
	if retractChangeDesc.Len() > 0 {
		changeDesc.WriteString(retractChangeDesc.String())
	}

	// Rebuild index page
	if changeDesc.Len() > 0 {
		indexRebuildAttempted = true
		logger.Infof(ctx, "wiki ingest: rebuilding index page")
		if err := s.rebuildIndexPage(ctx, chatModel, payload, changeDesc.String(), lang); err != nil {
			logger.Warnf(ctx, "wiki ingest: rebuild index failed: %v", err)
		} else {
			indexRebuildSucceeded = true
		}
		// The change description is recorded in the preview whether or not the rebuild succeeded.
		docPreview = append(docPreview,
			fmt.Sprintf("index_change=%s", previewText(changeDesc.String(), 160)))
	}

	// Append log entry for ingests
	if len(docResults) > 0 {
		s.appendLogEntry(ctx, payload, "ingest", "",
			fmt.Sprintf("%d documents", len(docResults)),
			ingestPagesAffected, "")
	}

	// Clean dead links (needed after retracts)
	if len(retractPagesAffected) > 0 {
		logger.Infof(ctx, "wiki ingest: cleaning dead links")
		s.cleanDeadLinks(ctx, payload.KnowledgeBaseID)
	}

	if len(allPagesAffected) > 0 {
		// Cross-link injection
		logger.Infof(ctx, "wiki ingest: injecting cross links")
		s.injectCrossLinks(ctx, payload.KnowledgeBaseID, allPagesAffected)

		// Publish all draft pages
		logger.Infof(ctx, "wiki ingest: publishing draft pages")
		s.publishDraftPages(ctx, payload.KnowledgeBaseID, allPagesAffected)
	}

	// Trim the pending list now that processing is complete
	s.trimPendingList(ctx, payload.KnowledgeBaseID, peekedCount)

	logger.Infof(ctx, "wiki ingest: batch completed for KB %s, %d ops, %d pages affected",
		payload.KnowledgeBaseID, len(pendingOps), len(allPagesAffected))

	// The active flag is still set at this point: the deferred release only runs
	// once this function returns. scheduleFollowUp therefore enqueues the next
	// task with a short delay so it starts after the flag has been cleared.
	followUpScheduled = s.scheduleFollowUp(ctx, payload)

	return nil
}
// scheduleFollowUp checks whether entries are waiting in the KB's pending list
// and, if so, enqueues a new wiki ingest task for them. It is called right before
// the active flag is released (via defer in the caller), so the task is enqueued
// with a short delay to let the flag clear first.
//
// It returns true only when a follow-up task was enqueued. It returns false when
// Redis is not configured, the list is empty, or any step fails; failures are
// logged as warnings rather than returned.
func (s *wikiIngestService) scheduleFollowUp(ctx context.Context, payload WikiIngestPayload) bool {
	if s.redisClient == nil {
		return false
	}

	pendingKey := wikiPendingKeyPrefix + payload.KnowledgeBaseID
	count, err := s.redisClient.LLen(ctx, pendingKey).Result()
	if err != nil {
		logger.Warnf(ctx, "wiki ingest: follow-up check failed for KB %s: %v", payload.KnowledgeBaseID, err)
		return false
	}
	if count == 0 {
		return false
	}

	logger.Infof(ctx, "wiki ingest: %d more documents pending for KB %s, scheduling follow-up", count, payload.KnowledgeBaseID)

	payloadBytes, err := json.Marshal(payload)
	if err != nil {
		// Do not enqueue a task whose payload could not be encoded.
		logger.Warnf(ctx, "wiki ingest: follow-up payload marshal failed: %v", err)
		return false
	}

	t := asynq.NewTask(types.TypeWikiIngest, payloadBytes,
		asynq.Queue("low"),
		asynq.MaxRetry(10), // Increased from 3 to 10 to outlast the active lock TTL
		asynq.Timeout(60*time.Minute),
		asynq.ProcessIn(5*time.Second), // short delay — active flag will be released by then
	)
	if _, err := s.task.Enqueue(t); err != nil {
		logger.Warnf(ctx, "wiki ingest: follow-up enqueue failed: %v", err)
		return false
	}
	return true
}
// peekPendingList gets up to wikiMaxDocsPerBatch entries from the Redis pending list
// WITHOUT removing them. It returns the unique ops and the actual number of items peeked.
func (s *wikiIngestService) peekPendingList(ctx context.Context, kbID string) ([]WikiPendingOp, int) {
@@ -708,161 +292,24 @@ type docIngestResult struct {
Pages []string // affected page slugs
}
// processOneDocument handles wiki ingest for a single document.
func (s *wikiIngestService) processOneDocument(
ctx context.Context,
chatModel chat.Chat,
payload WikiIngestPayload,
knowledgeID string,
lang string,
) (*docIngestResult, error) {
docStartedAt := time.Now()
// Get document chunks and reconstruct content
chunks, err := s.chunkRepo.ListChunksByKnowledgeID(ctx, payload.TenantID, knowledgeID)
if err != nil {
return nil, fmt.Errorf("get chunks: %w", err)
}
if len(chunks) == 0 {
logger.Infof(ctx, "wiki ingest: document %s has no chunks, skip", knowledgeID)
return nil, nil
}
// WikiBatchContext holds data needed during the Reduce phase.
// NOTE(review): the code that fills and consumes this struct is not visible
// in this view — the field notes below are inferred from the field names and
// should be confirmed against it.
type WikiBatchContext struct {
// SlugTitleMap: presumably page slug → page title — TODO confirm.
SlugTitleMap map[string]string
// SummaryContentByKnowledgeID: presumably source knowledge ID → content of
// that document's summary page — TODO confirm.
SummaryContentByKnowledgeID map[string]string
}
content := reconstructContent(chunks)
rawRuneCount := len([]rune(content))
if len([]rune(content)) > maxContentForWiki {
content = string([]rune(content)[:maxContentForWiki])
}
logger.Infof(ctx,
"wiki ingest: doc %s chunks=%d content_len(raw=%d,truncated=%d) content_preview=%q",
knowledgeID, len(chunks), rawRuneCount, len([]rune(content)), previewText(content, 120))
// Get document title
docTitle := knowledgeID
if kn, err := s.knowledgeSvc.GetKnowledgeByIDOnly(ctx, knowledgeID); err == nil && kn != nil && kn.Title != "" {
docTitle = kn.Title
} else {
for _, ch := range chunks {
if ch.Content != "" {
lines := strings.SplitN(ch.Content, "\n", 2)
if len(lines) > 0 && len(lines[0]) > 0 && len(lines[0]) < 200 {
docTitle = strings.TrimPrefix(strings.TrimSpace(lines[0]), "# ")
break
}
}
}
}
var pagesAffected []string
var docSummaryLine string
sourceRef := fmt.Sprintf("%s|%s", knowledgeID, docTitle)
// Snapshot existing page slugs for stale detection
oldPageSlugs := s.getExistingPageSlugsForKnowledge(ctx, payload.KnowledgeBaseID, knowledgeID)
// Build a per-doc payload for functions that still need KnowledgeID
// (docPayload removed as WikiIngestPayload no longer holds KnowledgeID)
// Step 1: Extract entities and concepts
logger.Infof(ctx, "wiki ingest: extracting entities and concepts for %s", knowledgeID)
extractedPages, slugItems, err := s.extractEntitiesAndConcepts(ctx, chatModel, content, docTitle, lang, payload, sourceRef, oldPageSlugs)
if err != nil {
logger.Warnf(ctx, "wiki ingest: knowledge extraction failed for %s: %v", knowledgeID, err)
} else {
pagesAffected = append(pagesAffected, extractedPages...)
}
// Step 2: Generate summary page
logger.Infof(ctx, "wiki ingest: generating summary page for %s", knowledgeID)
summarySlug := fmt.Sprintf("summary/%s", slugify(docTitle))
var slugListing string
for _, slug := range extractedPages {
if item, ok := slugItems[slug]; ok {
aliases := ""
if len(item.Aliases) > 0 {
aliases = fmt.Sprintf(" (Aliases: %s)", strings.Join(item.Aliases, ", "))
}
slugListing += fmt.Sprintf("- [[%s]] = %s%s\n", slug, item.Name, aliases)
} else {
slugListing += fmt.Sprintf("- [[%s]]\n", slug)
}
}
summaryContent, err := s.generateWithTemplate(ctx, chatModel, agent.WikiSummaryPrompt, map[string]string{
"Title": docTitle,
"FileName": docTitle,
"FileType": "document",
"Content": content,
"Language": lang,
"ExtractedSlugs": slugListing,
})
if err != nil {
logger.Errorf(ctx, "wiki ingest: generate summary failed for %s: %v", knowledgeID, err)
} else {
sumLine, sumBody := splitSummaryLine(summaryContent)
if sumBody == "" {
sumBody = summaryContent
}
if sumLine == "" {
sumLine = docTitle
}
docSummaryLine = sumLine
logger.Infof(ctx, "wiki ingest: summary preview for %s => line=%q body=%q",
knowledgeID, previewText(sumLine, 100), previewText(sumBody, 140))
existingSummary, err := s.wikiService.GetPageBySlug(ctx, payload.KnowledgeBaseID, summarySlug)
if err == nil && existingSummary != nil {
// Update existing summary page (idempotent for retries)
existingSummary.Title = docTitle + " - Summary"
existingSummary.Content = sumBody
existingSummary.Summary = sumLine
existingSummary.Status = types.WikiPageStatusDraft
existingSummary.SourceRefs = appendUnique(existingSummary.SourceRefs, sourceRef)
if _, err := s.wikiService.UpdatePage(ctx, existingSummary); err != nil {
logger.Warnf(ctx, "wiki ingest: update summary page failed for %s: %v", knowledgeID, err)
} else {
pagesAffected = append(pagesAffected, summarySlug)
}
} else {
// Create new summary page
_, err = s.wikiService.CreatePage(ctx, &types.WikiPage{
ID: uuid.New().String(),
TenantID: payload.TenantID,
KnowledgeBaseID: payload.KnowledgeBaseID,
Slug: summarySlug,
Title: docTitle + " - Summary",
PageType: types.WikiPageTypeSummary,
Status: types.WikiPageStatusDraft,
Content: sumBody,
Summary: sumLine,
SourceRefs: types.StringArray{sourceRef},
})
if err != nil {
logger.Warnf(ctx, "wiki ingest: create summary page failed for %s: %v", knowledgeID, err)
} else {
pagesAffected = append(pagesAffected, summarySlug)
}
}
}
// Retract stale pages (pages this doc previously contributed to but no longer does)
logger.Infof(ctx, "wiki ingest: retracting stale pages for %s", knowledgeID)
s.retractStalePages(ctx, chatModel, payload, knowledgeID, oldPageSlugs, pagesAffected, docTitle, content, lang)
logger.Infof(ctx,
"wiki ingest: processed knowledge %s title=%q affected_pages=%d page_preview=%s extracted_pages=%s elapsed=%s",
knowledgeID,
previewText(docTitle, 80),
len(pagesAffected),
previewStringSlice(pagesAffected, 6),
previewStringSlice(extractedPages, 6),
time.Since(docStartedAt).Round(time.Millisecond),
)
return &docIngestResult{
KnowledgeID: knowledgeID,
DocTitle: docTitle,
Summary: docSummaryLine,
Pages: pagesAffected,
}, nil
// SlugUpdate represents a single update operation for a specific slug.
// Type selects the kind of operation; the other fields carry the data for
// that kind, as noted on each field.
// NOTE(review): which fields are required for each Type is not visible in
// this view — confirm against the code that consumes SlugUpdate.
type SlugUpdate struct {
// Slug is the slug this update applies to.
Slug string
Type string // "entity", "concept", "summary", "retract", "retractStale"
Item extractedItem // For entity/concept
// DocTitle and KnowledgeID: presumably the title and ID of the source
// document that triggered this update — TODO confirm.
DocTitle string
KnowledgeID string
// SourceRef: presumably the "<knowledgeID>|<docTitle>" reference string —
// TODO confirm the format.
SourceRef string
// Language: presumably the output language passed to the LLM prompt — TODO confirm.
Language string
SummaryBody string // For summary
SummaryLine string // For summary
RetractDocContent string // For retract / retractStale
}
func previewText(s string, maxRunes int) string {
@@ -1112,77 +559,16 @@ func (s *wikiIngestService) getExistingPageSlugsForKnowledge(ctx context.Context
// but are no longer produced by the updated extraction.
// - Single-source stale pages → deleted
// - Multi-source stale pages → LLM retract to clean content synchronously
func (s *wikiIngestService) retractStalePages(
	ctx context.Context,
	chatModel chat.Chat,
	payload WikiIngestPayload,
	knowledgeID string,
	oldSlugs map[string]bool,
	newSlugs []string,
	docTitle, docContent, lang string,
) {
	if len(oldSlugs) == 0 {
		return
	}

	// Set of slugs touched by the current run (including the summary page).
	produced := make(map[string]bool, len(newSlugs))
	for _, slug := range newSlugs {
		produced[slug] = true
	}

	// Stale = was in the old set but is not in the new set.
	var staleSlugs []string
	for slug := range oldSlugs {
		if produced[slug] {
			continue
		}
		staleSlugs = append(staleSlugs, slug)
	}
	if len(staleSlugs) == 0 {
		return
	}

	logger.Infof(ctx, "wiki ingest: %d stale pages detected after document update: %v", len(staleSlugs), staleSlugs)

	// A source ref belongs to this document when it is the bare knowledge ID,
	// the exact "id|title" ref, or any ref starting with "id|".
	sourceRef := fmt.Sprintf("%s|%s", knowledgeID, docTitle)
	prefix := knowledgeID + "|"
	ownedByDoc := func(ref string) bool {
		return ref == knowledgeID || ref == sourceRef || strings.HasPrefix(ref, prefix)
	}

	var retractSlugs []string
	for _, slug := range staleSlugs {
		page, err := s.wikiService.GetPageBySlug(ctx, payload.KnowledgeBaseID, slug)
		if err != nil || page == nil {
			continue
		}

		// Drop this document's source refs; keep everything else.
		var remaining types.StringArray
		for _, ref := range page.SourceRefs {
			if !ownedByDoc(ref) {
				remaining = append(remaining, ref)
			}
		}

		if len(remaining) == 0 {
			// No other sources → delete the page.
			if err := s.wikiService.DeletePage(ctx, payload.KnowledgeBaseID, slug); err != nil {
				logger.Warnf(ctx, "wiki ingest: failed to delete stale page %s: %v", slug, err)
			}
			continue
		}

		// Multi-source → persist the reduced ref list, then queue for LLM retract.
		page.SourceRefs = remaining
		if err := s.wikiService.UpdatePageMeta(ctx, page); err != nil {
			logger.Warnf(ctx, "wiki ingest: failed to update stale page %s: %v", slug, err)
			continue
		}
		retractSlugs = append(retractSlugs, slug)
	}

	if len(retractSlugs) > 0 {
		s.retractPagesContent(ctx, chatModel, payload.KnowledgeBaseID, docTitle, docContent, retractSlugs, lang)
	}
}
// Multi-source → remove ref, queue retract
// extractedItem represents a single extracted entity or concept
type extractedItem struct {
@@ -1199,189 +585,6 @@ type combinedExtraction struct {
Concepts []extractedItem `json:"concepts"`
}
// extractEntitiesAndConcepts asks the LLM once for both entities and concepts
// of a document, deduplicates them against existing wiki pages, and upserts a
// page for every remaining item.
//
// oldPageSlugs holds the slugs produced by the previous version of the
// document; they are listed in the prompt so the model can keep slugs stable.
//
// It returns the slugs that were successfully upserted and a slug→item map
// used to build wiki-link references. An error is returned only when the
// extraction call or the JSON parsing fails; upsert failures are logged.
func (s *wikiIngestService) extractEntitiesAndConcepts(
	ctx context.Context,
	chatModel chat.Chat,
	content, docTitle, lang string,
	payload WikiIngestPayload,
	sourceRef string,
	oldPageSlugs map[string]bool,
) ([]string, map[string]extractedItem, error) {
	// Render the previous slugs as a bullet list for the prompt.
	prevSlugsText := "(none — this is a new document)"
	if len(oldPageSlugs) > 0 {
		var listing strings.Builder
		for prev := range oldPageSlugs {
			fmt.Fprintf(&listing, "- %s\n", prev)
		}
		prevSlugsText = listing.String()
	}

	// One LLM call covers both entities and concepts.
	promptVars := map[string]string{
		"Title":         docTitle,
		"Content":       content,
		"Language":      lang,
		"PreviousSlugs": prevSlugsText,
	}
	raw, err := s.generateWithTemplate(ctx, chatModel, agent.WikiKnowledgeExtractPrompt, promptVars)
	if err != nil {
		return nil, nil, fmt.Errorf("combined extraction failed: %w", err)
	}

	// Strip an optional markdown code fence around the JSON payload.
	cleaned := strings.TrimSpace(raw)
	cleaned = strings.TrimPrefix(cleaned, "```json")
	cleaned = strings.TrimPrefix(cleaned, "```")
	cleaned = strings.TrimSuffix(cleaned, "```")
	cleaned = sanitizeJSONString(strings.TrimSpace(cleaned))

	var extraction combinedExtraction
	if err := json.Unmarshal([]byte(cleaned), &extraction); err != nil {
		logger.Warnf(ctx, "wiki ingest: failed to parse combined extraction JSON: %v\nRaw: %s", err, cleaned)
		return nil, nil, fmt.Errorf("parse combined extraction JSON: %w", err)
	}

	// LLM-based deduplication against existing wiki pages, entities first.
	logger.Infof(ctx, "wiki ingest: deduplicating %d entities", len(extraction.Entities))
	extraction.Entities = s.deduplicateItems(ctx, chatModel, extraction.Entities, types.WikiPageTypeEntity, payload.KnowledgeBaseID)

	logger.Infof(ctx, "wiki ingest: deduplicating %d concepts", len(extraction.Concepts))
	extraction.Concepts = s.deduplicateItems(ctx, chatModel, extraction.Concepts, types.WikiPageTypeConcept, payload.KnowledgeBaseID)

	// slug→item lookup for wiki-link generation in summary pages; concepts are
	// processed after entities, so a concept wins on a slug collision.
	slugItems := make(map[string]extractedItem)
	for _, group := range [][]extractedItem{extraction.Entities, extraction.Concepts} {
		for _, it := range group {
			if it.Slug == "" || it.Name == "" {
				continue
			}
			slugItems[it.Slug] = it
		}
	}

	var affected []string

	// Entity pages.
	logger.Infof(ctx, "wiki ingest: upserting %d entity pages", len(extraction.Entities))
	if slugs, upErr := s.upsertExtractedPages(ctx, chatModel, extraction.Entities, types.WikiPageTypeEntity, docTitle, lang, payload, sourceRef); upErr != nil {
		logger.Warnf(ctx, "wiki ingest: entity upsert failed: %v", upErr)
	} else {
		affected = append(affected, slugs...)
	}

	// Concept pages.
	logger.Infof(ctx, "wiki ingest: upserting %d concept pages", len(extraction.Concepts))
	if slugs, upErr := s.upsertExtractedPages(ctx, chatModel, extraction.Concepts, types.WikiPageTypeConcept, docTitle, lang, payload, sourceRef); upErr != nil {
		logger.Warnf(ctx, "wiki ingest: concept upsert failed: %v", upErr)
	} else {
		affected = append(affected, slugs...)
	}

	return affected, slugItems, nil
}
// upsertExtractedPages writes one wiki page per extracted item: a new draft
// page when the slug is unknown, or an LLM-merged update when a page already
// exists. Items without a slug or name are ignored. Per-item failures are
// logged and skipped; the returned slice lists the slugs that were saved.
// The returned error is always nil.
func (s *wikiIngestService) upsertExtractedPages(
	ctx context.Context,
	chatModel chat.Chat,
	items []extractedItem,
	pageType string,
	docTitle, lang string,
	payload WikiIngestPayload,
	sourceRef string,
) ([]string, error) {
	var saved []string

	for _, it := range items {
		if it.Slug == "" || it.Name == "" {
			continue
		}

		current, lookupErr := s.wikiService.GetPageBySlug(ctx, payload.KnowledgeBaseID, it.Slug)

		// No usable existing page → create a fresh draft.
		if lookupErr != nil || current == nil {
			body := fmt.Sprintf("# %s\n\n%s\n\n%s\n\n---\n*Source: %s*\n",
				it.Name, it.Description, it.Details, docTitle)
			draft := &types.WikiPage{
				ID:              uuid.New().String(),
				TenantID:        payload.TenantID,
				KnowledgeBaseID: payload.KnowledgeBaseID,
				Slug:            it.Slug,
				Title:           it.Name,
				Aliases:         it.Aliases,
				PageType:        pageType,
				Status:          types.WikiPageStatusDraft,
				Content:         body,
				Summary:         it.Description,
				SourceRefs:      types.StringArray{sourceRef},
			}
			if _, err := s.wikiService.CreatePage(ctx, draft); err != nil {
				logger.Warnf(ctx, "wiki ingest: create page %s failed: %v", it.Slug, err)
				continue
			}
			saved = append(saved, it.Slug)
			continue
		}

		// Existing page → let the LLM fold the new information into it.
		merged, err := s.generateWithTemplate(ctx, chatModel, agent.WikiPageUpdatePrompt, map[string]string{
			"ExistingContent": current.Content,
			"NewDocTitle":     docTitle,
			"NewContent":      fmt.Sprintf("**%s**: %s\n\n%s", it.Name, it.Description, it.Details),
			"Language":        lang,
		})
		if err != nil {
			logger.Warnf(ctx, "wiki ingest: update page %s failed: %v", it.Slug, err)
			continue
		}

		// The model is expected to put "SUMMARY: ..." on the first line; when
		// present it becomes the page summary and the rest becomes the body.
		newSummary, newBody := splitSummaryLine(merged)
		current.Content = merged
		if newBody != "" {
			current.Content = newBody
		}
		if newSummary != "" {
			current.Summary = newSummary
		}

		// Merge aliases, skipping ones the page already has.
		if len(it.Aliases) > 0 {
			known := make(map[string]bool)
			for _, a := range current.Aliases {
				known[a] = true
			}
			for _, a := range it.Aliases {
				if known[a] {
					continue
				}
				current.Aliases = append(current.Aliases, a)
				known[a] = true
			}
		}

		current.SourceRefs = appendUnique(current.SourceRefs, sourceRef)
		if _, err := s.wikiService.UpdatePage(ctx, current); err != nil {
			logger.Warnf(ctx, "wiki ingest: save updated page %s failed: %v", it.Slug, err)
			continue
		}
		saved = append(saved, it.Slug)
	}

	return saved, nil
}
// rebuildIndexPage regenerates the index page.
//
// Strategy: Index = LLM-generated intro (stored in Summary field) + code-generated directory.
@@ -1426,7 +629,7 @@ func (s *wikiIngestService) rebuildIndexPage(ctx context.Context, chatModel chat
// Build document summaries listing (only summary-type pages — they represent documents)
var docSummaries strings.Builder
for _, p := range grouped[types.WikiPageTypeSummary] {
fmt.Fprintf(&docSummaries, "- %s: %s\n", p.Title, p.Summary)
fmt.Fprintf(&docSummaries, "<document>\n<title>%s</title>\n<summary>%s</summary>\n</document>\n\n", p.Title, p.Summary)
}
if docSummaries.Len() == 0 {
docSummaries.WriteString("(no documents yet)")

View File

@@ -0,0 +1,803 @@
package service
import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"time"
"github.com/Tencent/WeKnora/internal/agent"
"github.com/Tencent/WeKnora/internal/logger"
"github.com/Tencent/WeKnora/internal/models/chat"
"github.com/Tencent/WeKnora/internal/types"
"github.com/google/uuid"
"github.com/hibiken/asynq"
"golang.org/x/sync/errgroup"
)
// scheduleFollowUp enqueues another wiki-ingest task for the same knowledge
// base when its Redis pending list still holds operations.
//
// It returns true only when a follow-up task was enqueued. It returns false
// when Redis is not configured, the pending list is empty or unreadable, the
// payload cannot be serialized, or the enqueue fails.
func (s *wikiIngestService) scheduleFollowUp(ctx context.Context, payload WikiIngestPayload) bool {
	if s.redisClient == nil {
		return false
	}

	pendingKey := wikiPendingKeyPrefix + payload.KnowledgeBaseID
	count, err := s.redisClient.LLen(ctx, pendingKey).Result()
	if err != nil || count == 0 {
		return false
	}

	logger.Infof(ctx, "wiki ingest: %d more documents pending for KB %s, scheduling follow-up", count, payload.KnowledgeBaseID)

	// Never enqueue a task with an empty or partial payload.
	payloadBytes, err := json.Marshal(payload)
	if err != nil {
		logger.Warnf(ctx, "wiki ingest: follow-up payload marshal failed: %v", err)
		return false
	}

	t := asynq.NewTask(types.TypeWikiIngest, payloadBytes,
		asynq.Queue("low"),
		asynq.MaxRetry(10), // Increased from 3 to 10 to outlast the active lock TTL
		asynq.Timeout(60*time.Minute),
		asynq.ProcessIn(5*time.Second), // short delay — active flag will be released by then
	)
	if _, err := s.task.Enqueue(t); err != nil {
		logger.Warnf(ctx, "wiki ingest: follow-up enqueue failed: %v", err)
		return false
	}
	return true
}
// ProcessWikiIngest is the asynq handler for a wiki-ingest task. It processes
// one batch of pending operations for a knowledge base in two phases:
//
//  1. Map: every pending op is turned into per-slug SlugUpdate values
//     (ingest ops via mapOneDocument, retract ops directly from the op).
//  2. Reduce: the updates of each slug are applied together by
//     reduceSlugUpdates.
//
// Afterwards it appends log entries, rebuilds the index page, cleans dead
// links, injects cross links, publishes draft pages, trims the pending list,
// and schedules a follow-up task when more work is queued.
//
// With Redis configured, a per-KB "active" key ensures only one batch runs at
// a time; a task that finds the key taken returns an error so asynq retries
// it. Without Redis ("lite" mode) the ops are taken from payload.LiteOps.
//
// A single stats line is logged on every exit path via the deferred closure.
func (s *wikiIngestService) ProcessWikiIngest(ctx context.Context, t *asynq.Task) error {
	taskStartedAt := time.Now()
	retryCount, _ := asynq.GetRetryCount(ctx)
	maxRetry, _ := asynq.GetMaxRetry(ctx)

	var payload WikiIngestPayload
	exitStatus := "success"
	mode := "redis"
	lockAcquired := false
	pendingOpsCount := 0
	ingestOps := 0
	retractOps := 0
	ingestSucceeded := 0
	ingestFailed := 0
	retractHandled := 0
	indexRebuildAttempted := false
	indexRebuildSucceeded := false
	followUpScheduled := false
	totalPagesAffected := 0
	docPreview := make([]string, 0, 6)

	// Emit the stats line with whatever values the counters hold at exit.
	defer func() {
		logger.Infof(
			ctx,
			"wiki ingest stats: kb=%s tenant=%d retry=%d/%d status=%s elapsed=%s mode=%s lock_acquired=%v pending_ops=%d ops(ingest=%d,retract=%d) ingest(success=%d,failed=%d) retract_handled=%d pages(total=%d) index(rebuild_attempted=%v,rebuild_succeeded=%v) followup=%v preview=%s",
			payload.KnowledgeBaseID,
			payload.TenantID,
			retryCount,
			maxRetry,
			exitStatus,
			time.Since(taskStartedAt).Round(time.Millisecond),
			mode,
			lockAcquired,
			pendingOpsCount,
			ingestOps,
			retractOps,
			ingestSucceeded,
			ingestFailed,
			retractHandled,
			totalPagesAffected,
			indexRebuildAttempted,
			indexRebuildSucceeded,
			followUpScheduled,
			previewStringSlice(docPreview, 6),
		)
	}()

	if err := json.Unmarshal(t.Payload(), &payload); err != nil {
		exitStatus = "invalid_payload"
		return fmt.Errorf("wiki ingest: unmarshal payload: %w", err)
	}

	// Inject context
	ctx = context.WithValue(ctx, types.TenantIDContextKey, payload.TenantID)
	if payload.Language != "" {
		ctx = context.WithValue(ctx, types.LanguageContextKey, payload.Language)
	}

	// Try to acquire the "active batch" flag (non-blocking)
	if s.redisClient != nil {
		activeKey := wikiActiveKeyPrefix + payload.KnowledgeBaseID
		acquired, err := s.redisClient.SetNX(ctx, activeKey, "1", 5*time.Minute).Result()
		switch {
		case err != nil:
			// Redis is unreachable or failed: continue without the lock. The
			// key is NOT ours, so no release or TTL refresh is installed —
			// doing so could delete or extend a lock held by another worker.
			logger.Warnf(ctx, "wiki ingest: redis SetNX failed: %v", err)
		case !acquired:
			exitStatus = "active_lock_conflict"
			logger.Infof(ctx, "wiki ingest: another batch active for KB %s, deferring to asynq retry", payload.KnowledgeBaseID)
			return fmt.Errorf("concurrent wiki task active, please retry")
		default:
			lockAcquired = true

			// Release the lock on exit and stop the refresher. A background
			// context is used so the release still runs if ctx is cancelled.
			lockCtx, cancelLock := context.WithCancel(context.Background())
			defer func() {
				cancelLock()
				s.redisClient.Del(context.Background(), activeKey)
			}()

			// Keep the lock alive while the batch runs (TTL 5m, refreshed every 2m).
			go func() {
				ticker := time.NewTicker(2 * time.Minute)
				defer ticker.Stop()
				for {
					select {
					case <-lockCtx.Done():
						return
					case <-ticker.C:
						s.redisClient.Expire(context.Background(), activeKey, 5*time.Minute)
					}
				}
			}()
		}
	} else {
		mode = "lite"
	}

	kb, err := s.kbService.GetKnowledgeBaseByIDOnly(ctx, payload.KnowledgeBaseID)
	if err != nil {
		exitStatus = "get_kb_failed"
		return fmt.Errorf("wiki ingest: get KB: %w", err)
	}
	if !kb.IsWikiEnabled() {
		exitStatus = "kb_not_wiki_enabled"
		return fmt.Errorf("wiki ingest: KB %s is not wiki type", kb.ID)
	}
	if kb.WikiConfig == nil || !kb.WikiConfig.AutoIngest {
		exitStatus = "auto_ingest_disabled"
		logger.Infof(ctx, "wiki ingest: auto_ingest disabled for KB %s, skipping", kb.ID)
		return nil
	}

	// Prefer the wiki-specific synthesis model, fall back to the KB summary model.
	synthesisModelID := kb.WikiConfig.SynthesisModelID
	if synthesisModelID == "" {
		synthesisModelID = kb.SummaryModelID
	}
	if synthesisModelID == "" {
		exitStatus = "missing_synthesis_model"
		return fmt.Errorf("wiki ingest: no synthesis model configured for KB %s", kb.ID)
	}

	chatModel, err := s.modelService.GetChatModel(ctx, synthesisModelID)
	if err != nil {
		exitStatus = "get_chat_model_failed"
		return fmt.Errorf("wiki ingest: get chat model: %w", err)
	}

	lang := types.LanguageNameFromContext(ctx)

	pendingOps, peekedCount := s.peekPendingList(ctx, payload.KnowledgeBaseID)
	pendingOpsCount = len(pendingOps)
	if len(pendingOps) == 0 {
		if s.redisClient != nil {
			exitStatus = "no_pending_ops"
			logger.Infof(ctx, "wiki ingest: no pending operations for KB %s", payload.KnowledgeBaseID)
			return nil
		}
		// Lite mode: the ops travel inside the task payload.
		if len(payload.LiteOps) > 0 {
			pendingOps = payload.LiteOps
			peekedCount = len(pendingOps)
			pendingOpsCount = len(pendingOps)
		} else {
			exitStatus = "no_lite_ops"
			return nil
		}
	}

	logger.Infof(ctx, "wiki ingest: batch processing %d ops for KB %s", len(pendingOps), payload.KnowledgeBaseID)

	// Fetch all existing pages to pass to the Map-Reduce phases. A failure is
	// not fatal: the batch continues with an empty lookup context.
	allPages, err := s.wikiService.ListAllPages(ctx, payload.KnowledgeBaseID)
	if err != nil {
		logger.Warnf(ctx, "wiki ingest: list existing pages failed for KB %s: %v", payload.KnowledgeBaseID, err)
	}
	batchCtx := &WikiBatchContext{
		SlugTitleMap:                make(map[string]string),
		SummaryContentByKnowledgeID: make(map[string]string),
	}
	for _, p := range allPages {
		if p.PageType != types.WikiPageTypeIndex && p.PageType != types.WikiPageTypeLog && p.Status != types.WikiPageStatusArchived {
			batchCtx.SlugTitleMap[p.Slug] = p.Title
		}
		if p.PageType == types.WikiPageTypeSummary && p.Content != "" {
			for _, ref := range p.SourceRefs {
				// Source refs are "<knowledgeID>|<title>" or a bare knowledge ID.
				kid := ref
				if pipeIdx := strings.Index(ref, "|"); pipeIdx > 0 {
					kid = ref[:pipeIdx]
				}
				batchCtx.SummaryContentByKnowledgeID[kid] = p.Content
			}
		}
	}

	// 1. MAP PHASE (Parallel extraction and generation of updates)
	// mapMu guards every variable shared between the map goroutines.
	var mapMu sync.Mutex
	slugUpdates := make(map[string][]SlugUpdate)
	var docResults []*docIngestResult
	var retractChangeDesc strings.Builder

	eg, mapCtx := errgroup.WithContext(ctx)
	eg.SetLimit(10) // Map phase limit

	for _, op := range pendingOps {
		op := op
		eg.Go(func() error {
			if op.Op == WikiOpRetract {
				mapMu.Lock()
				retractOps++
				retractHandled++
				docPreview = append(docPreview, fmt.Sprintf("retract[%s]: %s", previewText(op.KnowledgeID, 24), previewText(op.DocTitle, 48)))
				fmt.Fprintf(&retractChangeDesc, "<document_removed>\n<title>%s</title>\n<summary>%s</summary>\n</document_removed>\n\n", op.DocTitle, op.DocSummary)
				for _, slug := range op.PageSlugs {
					slugUpdates[slug] = append(slugUpdates[slug], SlugUpdate{
						Slug:              slug,
						Type:              "retract",
						RetractDocContent: op.DocSummary,
						DocTitle:          op.DocTitle,
						KnowledgeID:       op.KnowledgeID,
						Language:          op.Language,
					})
				}
				mapMu.Unlock()
				return nil
			}

			// Ingest
			mapMu.Lock()
			ingestOps++
			mapMu.Unlock()

			logger.Infof(mapCtx, "wiki ingest: processing document '%s' (%s)", op.DocTitle, op.KnowledgeID)
			result, updates, err := s.mapOneDocument(mapCtx, chatModel, payload, op, batchCtx)
			if err != nil {
				mapMu.Lock()
				ingestFailed++
				mapMu.Unlock()
				logger.Warnf(mapCtx, "wiki ingest: failed to map knowledge %s: %v", op.KnowledgeID, err)
				return nil // Don't fail the whole batch
			}

			if result != nil {
				mapMu.Lock()
				ingestSucceeded++
				docResults = append(docResults, result)
				docPreview = append(docPreview, fmt.Sprintf("ingest[%s]: title=%s summary=%s", previewText(result.KnowledgeID, 24), previewText(result.DocTitle, 40), previewText(result.Summary, 64)))
				for _, u := range updates {
					slugUpdates[u.Slug] = append(slugUpdates[u.Slug], u)
				}
				mapMu.Unlock()
			}
			return nil
		})
	}
	// Goroutines never return an error, so Wait only acts as a barrier.
	_ = eg.Wait()

	// 2. REDUCE PHASE (Parallel upserting grouped by Slug)
	egReduce, reduceCtx := errgroup.WithContext(ctx)
	egReduce.SetLimit(10) // Reduce phase limit (LLM + DB concurrent connections)

	var reduceMu sync.Mutex
	var allPagesAffected []string
	var ingestPagesAffected []string
	var retractPagesAffected []string

	for slug, updates := range slugUpdates {
		slug := slug
		updates := updates
		egReduce.Go(func() error {
			changed, affectedType, err := s.reduceSlugUpdates(reduceCtx, chatModel, payload.KnowledgeBaseID, slug, updates, payload.TenantID, batchCtx)
			if err != nil {
				logger.Warnf(reduceCtx, "wiki ingest: reduce failed for slug %s: %v", slug, err)
			}
			if changed {
				reduceMu.Lock()
				allPagesAffected = append(allPagesAffected, slug)
				if affectedType == "ingest" {
					ingestPagesAffected = append(ingestPagesAffected, slug)
				} else if affectedType == "retract" {
					retractPagesAffected = append(retractPagesAffected, slug)
				}
				reduceMu.Unlock()
			}
			return nil
		})
	}
	_ = egReduce.Wait()

	totalPagesAffected = len(allPagesAffected)

	// Append log entry for retracts (since retracts aren't in docResults)
	for _, op := range pendingOps {
		if op.Op == WikiOpRetract {
			s.appendLogEntry(ctx, WikiIngestPayload{
				TenantID:        payload.TenantID,
				KnowledgeBaseID: payload.KnowledgeBaseID,
			}, "retract", op.KnowledgeID, op.DocTitle, op.PageSlugs, "")
		}
	}

	// Build change description for the Index Intro LLM prompt
	var changeDesc strings.Builder
	for _, r := range docResults {
		fmt.Fprintf(&changeDesc, "<document_added>\n<title>%s</title>\n<summary>%s</summary>\n</document_added>\n\n", r.DocTitle, r.Summary)
	}
	if retractChangeDesc.Len() > 0 {
		changeDesc.WriteString(retractChangeDesc.String())
	}

	// Rebuild index page
	if changeDesc.Len() > 0 {
		indexRebuildAttempted = true
		logger.Infof(ctx, "wiki ingest: rebuilding index page")
		if err := s.rebuildIndexPage(ctx, chatModel, payload, changeDesc.String(), lang); err != nil {
			logger.Warnf(ctx, "wiki ingest: rebuild index failed: %v", err)
		} else {
			indexRebuildSucceeded = true
		}
		// The change preview is recorded whether or not the rebuild succeeded.
		docPreview = append(docPreview, fmt.Sprintf("index_change=%s", previewText(changeDesc.String(), 160)))
	}

	// Append log entry for ingests
	if len(docResults) > 0 {
		s.appendLogEntry(ctx, payload, "ingest", "", fmt.Sprintf("%d documents", len(docResults)), ingestPagesAffected, "")
	}

	if len(retractPagesAffected) > 0 {
		logger.Infof(ctx, "wiki ingest: cleaning dead links")
		s.cleanDeadLinks(ctx, payload.KnowledgeBaseID)
	}

	if len(allPagesAffected) > 0 {
		logger.Infof(ctx, "wiki ingest: injecting cross links")
		s.injectCrossLinks(ctx, payload.KnowledgeBaseID, allPagesAffected)

		logger.Infof(ctx, "wiki ingest: publishing draft pages")
		s.publishDraftPages(ctx, payload.KnowledgeBaseID, allPagesAffected)
	}

	// Drop the ops that were peeked for this batch from the pending list.
	s.trimPendingList(ctx, payload.KnowledgeBaseID, peekedCount)

	logger.Infof(ctx, "wiki ingest: batch completed for KB %s, %d ops, %d pages affected", payload.KnowledgeBaseID, len(pendingOps), len(allPagesAffected))

	followUpScheduled = s.scheduleFollowUp(ctx, payload)

	return nil
}
// mapOneDocument is the map step for a single ingest op. It loads the
// document's chunks, extracts entities and concepts, generates a summary, and
// returns the resulting per-slug updates without writing any page.
//
// Returned updates contain:
//   - one summary update (omitted when summary generation fails),
//   - one update per extracted entity and concept with a non-empty slug,
//   - one "retractStale" update for every slug the document previously
//     contributed to but no longer produces.
//
// It returns (nil, nil, nil) when the document has no chunks, and an error
// when chunks cannot be loaded or the extraction fails.
func (s *wikiIngestService) mapOneDocument(
	ctx context.Context,
	chatModel chat.Chat,
	payload WikiIngestPayload,
	op WikiPendingOp,
	batchCtx *WikiBatchContext,
) (*docIngestResult, []SlugUpdate, error) {
	docStartedAt := time.Now()
	knowledgeID := op.KnowledgeID
	lang := op.Language

	chunks, err := s.chunkRepo.ListChunksByKnowledgeID(ctx, payload.TenantID, knowledgeID)
	if err != nil {
		return nil, nil, fmt.Errorf("get chunks: %w", err)
	}
	if len(chunks) == 0 {
		logger.Infof(ctx, "wiki ingest: document %s has no chunks, skip", knowledgeID)
		return nil, nil, nil
	}

	// Cap the content by rune count; convert to runes only once.
	content := reconstructContent(chunks)
	contentRunes := []rune(content)
	rawRuneCount := len(contentRunes)
	if rawRuneCount > maxContentForWiki {
		contentRunes = contentRunes[:maxContentForWiki]
		content = string(contentRunes)
	}
	logger.Infof(ctx, "wiki ingest: doc %s chunks=%d content_len(raw=%d,truncated=%d)", knowledgeID, len(chunks), rawRuneCount, len(contentRunes))

	// Title: stored knowledge title, else the first usable first line of a
	// chunk, else the knowledge ID.
	docTitle := knowledgeID
	if kn, err := s.knowledgeSvc.GetKnowledgeByIDOnly(ctx, knowledgeID); err == nil && kn != nil && kn.Title != "" {
		docTitle = kn.Title
	} else {
		for _, ch := range chunks {
			if ch.Content == "" {
				continue
			}
			lines := strings.SplitN(ch.Content, "\n", 2)
			if len(lines) == 0 || len(lines[0]) == 0 || len(lines[0]) >= 200 {
				continue
			}
			// Skip lines that are blank once trimmed, so the title (and the
			// source ref / summary slug built from it) is never empty.
			candidate := strings.TrimPrefix(strings.TrimSpace(lines[0]), "# ")
			if candidate == "" {
				continue
			}
			docTitle = candidate
			break
		}
	}

	sourceRef := fmt.Sprintf("%s|%s", knowledgeID, docTitle)
	oldPageSlugs := s.getExistingPageSlugsForKnowledge(ctx, payload.KnowledgeBaseID, knowledgeID)

	logger.Infof(ctx, "wiki ingest: extracting entities and concepts for %s", knowledgeID)
	extractedEntities, extractedConcepts, slugItems, err := s.extractEntitiesAndConceptsNoUpsert(ctx, chatModel, content, docTitle, lang, payload, oldPageSlugs)
	if err != nil {
		logger.Warnf(ctx, "wiki ingest: knowledge extraction failed for %s: %v", knowledgeID, err)
		return nil, nil, err
	}

	extractedPages := make([]string, 0, len(slugItems)+1)
	for slug := range slugItems {
		extractedPages = append(extractedPages, slug)
	}

	// Summary
	summarySlug := fmt.Sprintf("summary/%s", slugify(docTitle))

	// List the extracted slugs (with display names and aliases) for the summary prompt.
	var slugListing strings.Builder
	for _, slug := range extractedPages {
		item, ok := slugItems[slug]
		if !ok {
			fmt.Fprintf(&slugListing, "- [[%s]]\n", slug)
			continue
		}
		aliases := ""
		if len(item.Aliases) > 0 {
			aliases = fmt.Sprintf(" (Aliases: %s)", strings.Join(item.Aliases, ", "))
		}
		fmt.Fprintf(&slugListing, "- [[%s]] = %s%s\n", slug, item.Name, aliases)
	}

	var docSummaryLine string
	summaryContent, err := s.generateWithTemplate(ctx, chatModel, agent.WikiSummaryPrompt, map[string]string{
		"Title":          docTitle,
		"FileName":       docTitle,
		"FileType":       "document",
		"Content":        content,
		"Language":       lang,
		"ExtractedSlugs": slugListing.String(),
	})

	var updates []SlugUpdate
	if err != nil {
		// A failed summary does not abort the document; entity/concept
		// updates are still produced.
		// NOTE(review): in this case summarySlug is not added to
		// extractedPages, so a previously existing summary page is treated as
		// stale below — confirm this is intended.
		logger.Errorf(ctx, "wiki ingest: generate summary failed for %s: %v", knowledgeID, err)
	} else {
		sumLine, sumBody := splitSummaryLine(summaryContent)
		if sumBody == "" {
			sumBody = summaryContent
		}
		if sumLine == "" {
			sumLine = docTitle
		}
		docSummaryLine = sumLine

		updates = append(updates, SlugUpdate{
			Slug:        summarySlug,
			Type:        types.WikiPageTypeSummary,
			DocTitle:    docTitle,
			KnowledgeID: knowledgeID,
			SourceRef:   sourceRef,
			Language:    lang,
			SummaryLine: sumLine,
			SummaryBody: sumBody,
		})
		extractedPages = append(extractedPages, summarySlug)
	}

	// Entities
	for _, item := range extractedEntities {
		if item.Slug == "" {
			continue
		}
		updates = append(updates, SlugUpdate{
			Slug:        item.Slug,
			Type:        types.WikiPageTypeEntity,
			Item:        item,
			DocTitle:    docTitle,
			KnowledgeID: knowledgeID,
			SourceRef:   sourceRef,
			Language:    lang,
		})
	}

	// Concepts
	for _, item := range extractedConcepts {
		if item.Slug == "" {
			continue
		}
		updates = append(updates, SlugUpdate{
			Slug:        item.Slug,
			Type:        types.WikiPageTypeConcept,
			Item:        item,
			DocTitle:    docTitle,
			KnowledgeID: knowledgeID,
			SourceRef:   sourceRef,
			Language:    lang,
		})
	}

	// Stale Pages: previously produced slugs that the current extraction no
	// longer yields. A set makes the membership test O(1) per old slug.
	currentSlugs := make(map[string]struct{}, len(extractedPages))
	for _, slug := range extractedPages {
		currentSlugs[slug] = struct{}{}
	}
	for oldSlug := range oldPageSlugs {
		if _, stillProduced := currentSlugs[oldSlug]; stillProduced {
			continue
		}
		updates = append(updates, SlugUpdate{
			Slug:              oldSlug,
			Type:              "retractStale",
			RetractDocContent: content,
			DocTitle:          docTitle,
			KnowledgeID:       knowledgeID,
			Language:          lang,
		})
	}

	logger.Infof(ctx, "wiki ingest: mapped knowledge %s title=%q generated_updates=%d elapsed=%s",
		knowledgeID, previewText(docTitle, 80), len(updates), time.Since(docStartedAt).Round(time.Millisecond))

	return &docIngestResult{
		KnowledgeID: knowledgeID,
		DocTitle:    docTitle,
		Summary:     docSummaryLine,
	}, updates, nil
}
// extractEntitiesAndConceptsNoUpsert asks the LLM once for the entities and
// concepts of a document and deduplicates them against existing wiki pages.
// Unlike an upserting variant it writes nothing: it only returns the
// deduplicated entities, the deduplicated concepts, and a slug→item map of
// every item that has both a slug and a name.
//
// oldPageSlugs lists the slugs of the document's previous version; they are
// passed to the prompt as "PreviousSlugs".
func (s *wikiIngestService) extractEntitiesAndConceptsNoUpsert(
	ctx context.Context,
	chatModel chat.Chat,
	content, docTitle, lang string,
	payload WikiIngestPayload,
	oldPageSlugs map[string]bool,
) ([]extractedItem, []extractedItem, map[string]extractedItem, error) {
	// Bullet list of previous slugs, or a placeholder for new documents.
	prevSlugsText := "(none — this is a new document)"
	if len(oldPageSlugs) > 0 {
		var listing strings.Builder
		for prev := range oldPageSlugs {
			fmt.Fprintf(&listing, "- %s\n", prev)
		}
		prevSlugsText = listing.String()
	}

	promptVars := map[string]string{
		"Title":         docTitle,
		"Content":       content,
		"Language":      lang,
		"PreviousSlugs": prevSlugsText,
	}
	raw, err := s.generateWithTemplate(ctx, chatModel, agent.WikiKnowledgeExtractPrompt, promptVars)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("combined extraction failed: %w", err)
	}

	// Strip an optional markdown code fence before parsing.
	cleaned := strings.TrimSpace(raw)
	cleaned = strings.TrimPrefix(cleaned, "```json")
	cleaned = strings.TrimPrefix(cleaned, "```")
	cleaned = strings.TrimSuffix(cleaned, "```")
	cleaned = sanitizeJSONString(strings.TrimSpace(cleaned))

	var extraction combinedExtraction
	if err := json.Unmarshal([]byte(cleaned), &extraction); err != nil {
		logger.Warnf(ctx, "wiki ingest: failed to parse combined extraction JSON: %v\nRaw: %s", err, cleaned)
		return nil, nil, nil, fmt.Errorf("parse combined extraction JSON: %w", err)
	}

	// Deduplicate against existing pages, entities first, then concepts.
	extraction.Entities = s.deduplicateItems(ctx, chatModel, extraction.Entities, types.WikiPageTypeEntity, payload.KnowledgeBaseID)
	extraction.Concepts = s.deduplicateItems(ctx, chatModel, extraction.Concepts, types.WikiPageTypeConcept, payload.KnowledgeBaseID)

	// Concepts are indexed after entities, so a concept wins on a slug collision.
	slugItems := make(map[string]extractedItem)
	for _, group := range [][]extractedItem{extraction.Entities, extraction.Concepts} {
		for _, it := range group {
			if it.Slug == "" || it.Name == "" {
				continue
			}
			slugItems[it.Slug] = it
		}
	}

	return extraction.Entities, extraction.Concepts, slugItems, nil
}
// reduceSlugUpdates applies every pending update of one slug to its wiki page
// and persists the result.
//
// Updates are classified as summary, additions (entity/concept), or
// retractions ("retract"/"retractStale"):
//   - A summary update replaces the page content outright and returns
//     immediately; other updates for the same slug are not applied.
//   - Otherwise additions and retractions are sent together to the LLM via
//     WikiPageModifyPrompt, and its output becomes the new page content.
//
// A page that does not exist yet is created only when there is a summary or
// at least one addition; retractions against a missing page are a no-op.
//
// It returns whether the page was written, the affected type ("ingest" or
// "retract", with an addition later in the list overriding an earlier
// retraction), and any persistence error. An LLM failure is logged and
// reported as "not changed" with a nil error.
func (s *wikiIngestService) reduceSlugUpdates(
	ctx context.Context,
	chatModel chat.Chat,
	kbID string,
	slug string,
	updates []SlugUpdate,
	tenantID uint64,
	batchCtx *WikiBatchContext,
) (bool, string, error) {
	page, err := s.wikiService.GetPageBySlug(ctx, kbID, slug)
	exists := err == nil && page != nil

	// Classify the updates. The page-type constants are used for all three
	// content kinds so this matches what the map phase writes into Type.
	affectedType := "ingest"
	var summaryUpdate *SlugUpdate
	var retracts []SlugUpdate
	var additions []SlugUpdate
	for i := range updates {
		switch updates[i].Type {
		case types.WikiPageTypeSummary:
			summaryUpdate = &updates[i]
		case "retract", "retractStale":
			retracts = append(retracts, updates[i])
			affectedType = "retract"
		case types.WikiPageTypeEntity, types.WikiPageTypeConcept:
			additions = append(additions, updates[i])
			affectedType = "ingest" // Additions override retracts type
		}
	}

	if !exists {
		// Nothing to retract from a page that is not there.
		if summaryUpdate == nil && len(additions) == 0 {
			return false, "", nil
		}
		page = &types.WikiPage{
			ID:              uuid.New().String(),
			TenantID:        tenantID,
			KnowledgeBaseID: kbID,
			Slug:            slug,
			Status:          types.WikiPageStatusDraft,
			SourceRefs:      types.StringArray{},
			Aliases:         types.StringArray{},
		}
	}

	// Summary pages are written directly from the generated summary.
	if summaryUpdate != nil {
		page.Title = summaryUpdate.DocTitle + " - Summary"
		page.Content = summaryUpdate.SummaryBody
		page.Summary = summaryUpdate.SummaryLine
		page.PageType = types.WikiPageTypeSummary
		page.SourceRefs = appendUnique(page.SourceRefs, summaryUpdate.SourceRef)

		if exists {
			_, err = s.wikiService.UpdatePage(ctx, page)
		} else {
			_, err = s.wikiService.CreatePage(ctx, page)
		}
		return true, affectedType, err
	}

	var remainingSourcesContent strings.Builder
	var deletedContent strings.Builder
	var relatedSlugs strings.Builder
	var newContentBuilder strings.Builder
	var language string

	if len(retracts) > 0 {
		language = retracts[0].Language

		retractKIDs := make(map[string]bool, len(retracts))
		for _, r := range retracts {
			fmt.Fprintf(&deletedContent, "<document>\n<title>%s</title>\n<content>\n%s\n</content>\n</document>\n\n", r.DocTitle, r.RetractDocContent)
			retractKIDs[r.KnowledgeID] = true
		}

		// Single pass over the source refs: drop the retracted documents and,
		// for every surviving ref, describe its source to the LLM.
		newRefs := types.StringArray{}
		for _, ref := range page.SourceRefs {
			// Refs are "<knowledgeID>|<title>" or a bare knowledge ID.
			refKnowledgeID, refTitle := ref, ref
			if pipeIdx := strings.Index(ref, "|"); pipeIdx > 0 {
				refKnowledgeID = ref[:pipeIdx]
				refTitle = ref[pipeIdx+1:]
			}
			if retractKIDs[refKnowledgeID] {
				continue
			}
			newRefs = append(newRefs, ref)

			if content, ok := batchCtx.SummaryContentByKnowledgeID[refKnowledgeID]; ok {
				fmt.Fprintf(&remainingSourcesContent, "<document>\n<title>%s</title>\n<content>\n%s\n</content>\n</document>\n\n", refTitle, content)
			} else {
				fmt.Fprintf(&remainingSourcesContent, "<document>\n<title>%s</title>\n<content>\n(summary not available)\n</content>\n</document>\n\n", refTitle)
			}
		}
		if remainingSourcesContent.Len() == 0 {
			remainingSourcesContent.WriteString("(no remaining sources)")
		}
		page.SourceRefs = newRefs
	}

	if len(additions) > 0 {
		// When both kinds are present the additions' language wins.
		language = additions[0].Language
		for _, add := range additions {
			fmt.Fprintf(&newContentBuilder, "<document>\n<title>%s</title>\n<content>\n**%s**: %s\n\n%s\n</content>\n</document>\n\n",
				add.DocTitle, add.Item.Name, add.Item.Description, add.Item.Details)

			for _, alias := range add.Item.Aliases {
				page.Aliases = appendUnique(page.Aliases, alias)
			}
			page.SourceRefs = appendUnique(page.SourceRefs, add.SourceRef)

			// Title and type come from the first addition on a new page.
			if page.Title == "" {
				page.Title = add.Item.Name
			}
			if page.PageType == "" {
				page.PageType = add.Type
			}
		}
	}

	changed := false
	if len(additions) > 0 || len(retracts) > 0 {
		// Offer the page's known outgoing links to the LLM as linkable slugs.
		for _, outSlug := range page.OutLinks {
			if title, ok := batchCtx.SlugTitleMap[outSlug]; ok {
				fmt.Fprintf(&relatedSlugs, "- %s (%s)\n", outSlug, title)
			}
		}

		existingContent := page.Content
		if !exists || existingContent == "" {
			existingContent = "(New page)"
		}

		// The template treats a non-empty string as "true".
		hasAdditionsStr := ""
		if len(additions) > 0 {
			hasAdditionsStr = "1"
		}
		hasRetractionsStr := ""
		if len(retracts) > 0 {
			hasRetractionsStr = "1"
		}

		updatedContent, err := s.generateWithTemplate(ctx, chatModel, agent.WikiPageModifyPrompt, map[string]string{
			"HasAdditions":            hasAdditionsStr,
			"HasRetractions":          hasRetractionsStr,
			"ExistingContent":         existingContent,
			"NewContent":              newContentBuilder.String(),
			"DeletedContent":          deletedContent.String(),
			"RemainingSourcesContent": remainingSourcesContent.String(),
			"AvailableSlugs":          relatedSlugs.String(),
			"Language":                language,
		})

		switch {
		case err != nil:
			logger.Warnf(ctx, "wiki ingest: update/retract failed for slug %s: %v", slug, err)
		case updatedContent != "":
			// A leading "SUMMARY: ..." line, when present, becomes the page
			// summary and the remainder becomes the body.
			updatedSummary, updatedBody := splitSummaryLine(updatedContent)
			if updatedBody != "" {
				page.Content = updatedBody
			} else {
				page.Content = updatedContent
			}
			if updatedSummary != "" {
				page.Summary = updatedSummary
			}
			changed = true
		}
	}

	if !changed {
		return false, "", nil
	}

	if exists {
		_, err = s.wikiService.UpdatePage(ctx, page)
	} else {
		_, err = s.wikiService.CreatePage(ctx, page)
	}
	return true, affectedType, err
}

View File

@@ -143,7 +143,9 @@ func Logger() gin.HandlerFunc {
start := time.Now()
path := c.Request.URL.Path
raw := c.Request.URL.RawQuery
if strings.HasPrefix(path, "/assets/") {
isWikiStats := strings.HasPrefix(path, "/api/v1/knowledgebase/") && strings.HasSuffix(path, "/wiki/stats")
if strings.HasPrefix(path, "/assets/") || isWikiStats {
c.Next()
return
}