diff --git a/internal/container/container.go b/internal/container/container.go index 0ea27f11..3c582287 100644 --- a/internal/container/container.go +++ b/internal/container/container.go @@ -635,110 +635,6 @@ func syncSequences(db *gorm.DB) { } } -// resetPendingTasks resets the state of any knowledge items or sync logs stuck in processing -// due to an unexpected application restart. -// -// In Lite mode (no REDIS_ADDR) every queued task lived in process memory, so -// any "processing" row at startup is definitively orphaned and must be marked -// failed wholesale. In distributed mode (Asynq on Redis) the active queue -// survives restart, but the *currently executing* task on the dead instance -// is lost — Asynq won't reschedule it until at-least-once retry kicks in, -// which can take minutes or never (e.g. if the deadline has passed). To bound -// the worst case we mark only "long-stale" rows failed: anything that hasn't -// been touched for 30 minutes is well past any reasonable in-flight window. -// Newer rows are left alone so we don't race a peer instance that's mid-process. -func resetPendingTasks(db *gorm.DB) { - distributed := os.Getenv("REDIS_ADDR") != "" - ctx := context.Background() - spanRepo := repository.NewKnowledgeSpanRepository(db) - - knowledgeQuery := db.Model(&types.Knowledge{}). - Where("parse_status IN ?", []string{ - types.ParseStatusPending, - types.ParseStatusProcessing, - types.ParseStatusFinalizing, - types.ParseStatusDeleting, - }) - summaryQuery := db.Model(&types.Knowledge{}). - Where("summary_status IN ?", []string{types.SummaryStatusPending, types.SummaryStatusProcessing}) - syncQuery := db.Model(&types.SyncLog{}). - Where("status IN ?", []string{types.SyncLogStatusRunning, "pending"}) - - if distributed { - // 30-minute stale threshold — comfortably longer than any individual - // stage's expected duration (DocReader 30m, embedding seconds, etc.) - // while short enough that operators don't wait hours after a crash. - staleCutoff := time.Now().Add(-30 * time.Minute) - knowledgeQuery = knowledgeQuery.Where("updated_at < ?", staleCutoff) - summaryQuery = summaryQuery.Where("updated_at < ?", staleCutoff) - syncQuery = syncQuery.Where("start_time < ?", staleCutoff) - } - - // Cancel orphaned trace spans for knowledge rows we are about to mark - // failed. resetPendingTasks does not touch asynq queues; this only - // prevents the UI from showing duplicate running postprocess.* - // subspans when a later retry also opens fresh spans. - var stuckKnowledge []types.Knowledge - if err := knowledgeQuery.Select("id").Find(&stuckKnowledge).Error; err != nil { - logger.Warnf(ctx, "resetPendingTasks: list stuck knowledge failed: %v", err) - } else { - for _, k := range stuckKnowledge { - attempt, err := spanRepo.LatestAttempt(ctx, k.ID) - if err != nil || attempt <= 0 { - continue - } - if n, err := spanRepo.CancelAllOpenSpans(ctx, k.ID, attempt, - "SERVER_RESTART", "task interrupted due to application restart"); err != nil { - logger.Warnf(ctx, "resetPendingTasks: cancel spans for %s failed: %v", k.ID, err) - } else if n > 0 { - logger.Infof(ctx, "resetPendingTasks: cancelled %d open span(s) for knowledge %s attempt %d", - n, k.ID, attempt) - } - } - } - - // 1. Reset knowledge parsing tasks (including finalizing rows whose - // enrichment subtasks were lost with the process). - result := knowledgeQuery.Updates(map[string]interface{}{ - "parse_status": types.ParseStatusFailed, - "error_message": "Task interrupted due to application restart", - "pending_subtasks_count": 0, - }) - if result.Error != nil { - logger.Warnf(context.Background(), "Failed to reset pending knowledge tasks: %v", result.Error) - } else if result.RowsAffected > 0 { - logger.Infof(context.Background(), - "Reset %d stuck knowledge parsing tasks to failed state (distributed=%v)", - result.RowsAffected, distributed) - } - - // 2. Reset knowledge summary tasks - resultSummary := summaryQuery.Updates(map[string]interface{}{ - "summary_status": types.SummaryStatusFailed, - }) - if resultSummary.Error != nil { - logger.Warnf(context.Background(), "Failed to reset pending summary tasks: %v", resultSummary.Error) - } else if resultSummary.RowsAffected > 0 { - logger.Infof(context.Background(), - "Reset %d stuck summary generation tasks to failed state (distributed=%v)", - resultSummary.RowsAffected, distributed) - } - - // 3. Reset data source sync tasks - resultSync := syncQuery.Updates(map[string]interface{}{ - "status": types.SyncLogStatusFailed, - "error_message": "Sync interrupted due to application restart", - "end_time": time.Now(), - }) - if resultSync.Error != nil { - logger.Warnf(context.Background(), "Failed to reset pending data source sync tasks: %v", resultSync.Error) - } else if resultSync.RowsAffected > 0 { - logger.Infof(context.Background(), - "Reset %d stuck data source sync tasks to failed state (distributed=%v)", - resultSync.RowsAffected, distributed) - } -} - // initFileService initializes file storage service // Creates the appropriate file storage service based on configuration // Supports multiple storage backends (MinIO, COS, local filesystem) diff --git a/internal/container/reset_pending_tasks.go b/internal/container/reset_pending_tasks.go new file mode 100644 index 00000000..2d4c2c2b --- /dev/null +++ b/internal/container/reset_pending_tasks.go @@ -0,0 +1,137 @@ +package container + +import ( + "context" + "os" + "time" + + "github.com/Tencent/WeKnora/internal/application/repository" + "github.com/Tencent/WeKnora/internal/logger" + "github.com/Tencent/WeKnora/internal/types" + "gorm.io/gorm" +) + +const resetPendingStaleWindow = 30 * time.Minute + +// resetPendingTasks resets the state of any knowledge items or sync logs stuck in processing +// due to an unexpected application restart. +// +// In Lite mode (no REDIS_ADDR) every queued task lived in process memory, so +// any "processing" row at startup is definitively orphaned and must be marked +// failed wholesale. In distributed mode (Asynq on Redis) the active queue +// survives restart, but the *currently executing* task on the dead instance +// is lost — Asynq won't reschedule it until at-least-once retry kicks in, +// which can take minutes or never (e.g. if the deadline has passed). To bound +// the worst case we mark only "long-stale" rows failed: anything that hasn't +// been touched for 30 minutes is well past any reasonable in-flight window. +// Newer rows are left alone so we don't race a peer instance that's mid-process. +func resetPendingTasks(db *gorm.DB) { + distributed := os.Getenv("REDIS_ADDR") != "" + ctx := context.Background() + spanRepo := repository.NewKnowledgeSpanRepository(db) + + var staleCutoff time.Time + if distributed { + staleCutoff = time.Now().Add(-resetPendingStaleWindow) + } + + // Cancel orphaned trace spans for knowledge rows we are about to mark + // failed. resetPendingTasks does not touch asynq queues; this only + // prevents the UI from showing duplicate running postprocess.* + // subspans when a later retry also opens fresh spans. + var stuckKnowledge []types.Knowledge + if err := stuckKnowledgeParseQuery(db, distributed, staleCutoff). + Select("id").Find(&stuckKnowledge).Error; err != nil { + logger.Warnf(ctx, "resetPendingTasks: list stuck knowledge failed: %v", err) + } else { + for _, k := range stuckKnowledge { + attempt, err := spanRepo.LatestAttempt(ctx, k.ID) + if err != nil || attempt <= 0 { + continue + } + if n, err := spanRepo.CancelAllOpenSpans(ctx, k.ID, attempt, + "SERVER_RESTART", "task interrupted due to application restart"); err != nil { + logger.Warnf(ctx, "resetPendingTasks: cancel spans for %s failed: %v", k.ID, err) + } else if n > 0 { + logger.Infof(ctx, "resetPendingTasks: cancelled %d open span(s) for knowledge %s attempt %d", + n, k.ID, attempt) + } + } + } + + // 1. Reset knowledge parsing tasks (including finalizing rows whose + // enrichment subtasks were lost with the process). + // Fresh query — reusing the *gorm.DB chain after Find() makes GORM emit + // UPDATE ... FROM knowledges which PostgreSQL rejects (SQLSTATE 42712). + result := stuckKnowledgeParseQuery(db, distributed, staleCutoff).Updates(map[string]interface{}{ + "parse_status": types.ParseStatusFailed, + "error_message": "Task interrupted due to application restart", + "pending_subtasks_count": 0, + }) + if result.Error != nil { + logger.Warnf(context.Background(), "Failed to reset pending knowledge tasks: %v", result.Error) + } else if result.RowsAffected > 0 { + logger.Infof(context.Background(), + "Reset %d stuck knowledge parsing tasks to failed state (distributed=%v)", + result.RowsAffected, distributed) + } + + // 2. Reset knowledge summary tasks + resultSummary := stuckKnowledgeSummaryQuery(db, distributed, staleCutoff).Updates(map[string]interface{}{ + "summary_status": types.SummaryStatusFailed, + }) + if resultSummary.Error != nil { + logger.Warnf(context.Background(), "Failed to reset pending summary tasks: %v", resultSummary.Error) + } else if resultSummary.RowsAffected > 0 { + logger.Infof(context.Background(), + "Reset %d stuck summary generation tasks to failed state (distributed=%v)", + resultSummary.RowsAffected, distributed) + } + + // 3. Reset data source sync tasks + now := time.Now() + resultSync := stuckSyncLogQuery(db, distributed, staleCutoff).Updates(map[string]interface{}{ + "status": types.SyncLogStatusFailed, + "error_message": "Sync interrupted due to application restart", + "finished_at": &now, + }) + if resultSync.Error != nil { + logger.Warnf(context.Background(), "Failed to reset pending data source sync tasks: %v", resultSync.Error) + } else if resultSync.RowsAffected > 0 { + logger.Infof(context.Background(), + "Reset %d stuck data source sync tasks to failed state (distributed=%v)", + resultSync.RowsAffected, distributed) + } +} + +func stuckKnowledgeParseQuery(db *gorm.DB, distributed bool, staleCutoff time.Time) *gorm.DB { + q := db.Model(&types.Knowledge{}). + Where("parse_status IN ?", []string{ + types.ParseStatusPending, + types.ParseStatusProcessing, + types.ParseStatusFinalizing, + types.ParseStatusDeleting, + }) + if distributed { + q = q.Where("updated_at < ?", staleCutoff) + } + return q +} + +func stuckKnowledgeSummaryQuery(db *gorm.DB, distributed bool, staleCutoff time.Time) *gorm.DB { + q := db.Model(&types.Knowledge{}). + Where("summary_status IN ?", []string{types.SummaryStatusPending, types.SummaryStatusProcessing}) + if distributed { + q = q.Where("updated_at < ?", staleCutoff) + } + return q +} + +func stuckSyncLogQuery(db *gorm.DB, distributed bool, staleCutoff time.Time) *gorm.DB { + q := db.Model(&types.SyncLog{}). + Where("status = ?", types.SyncLogStatusRunning) + if distributed { + q = q.Where("started_at < ?", staleCutoff) + } + return q +} diff --git a/internal/container/reset_pending_tasks_test.go b/internal/container/reset_pending_tasks_test.go new file mode 100644 index 00000000..97128dd5 --- /dev/null +++ b/internal/container/reset_pending_tasks_test.go @@ -0,0 +1,145 @@ +package container + +import ( + "os" + "testing" + "time" + + "github.com/Tencent/WeKnora/internal/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gorm.io/driver/sqlite" + "gorm.io/gorm" +) + +const resetPendingKnowledgeDDL = ` +CREATE TABLE IF NOT EXISTS knowledges ( + id VARCHAR(64) PRIMARY KEY, + parse_status VARCHAR(32) NOT NULL DEFAULT 'pending', + summary_status VARCHAR(32) NOT NULL DEFAULT 'none', + pending_subtasks_count INTEGER NOT NULL DEFAULT 0, + error_message TEXT, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, + deleted_at DATETIME +); +` + +const resetPendingSyncLogDDL = ` +CREATE TABLE IF NOT EXISTS sync_logs ( + id VARCHAR(64) PRIMARY KEY, + data_source_id VARCHAR(64) NOT NULL DEFAULT '', + tenant_id INTEGER NOT NULL DEFAULT 0, + status VARCHAR(32) NOT NULL, + started_at DATETIME, + finished_at DATETIME, + error_message TEXT, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP +); +` + +func setupResetPendingDB(t *testing.T) *gorm.DB { + t.Helper() + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + require.NoError(t, err) + require.NoError(t, db.Exec(resetPendingKnowledgeDDL).Error) + require.NoError(t, db.Exec(resetPendingSyncLogDDL).Error) + return db +} + +func TestResetPendingTasks_KnowledgeFindThenUpdate(t *testing.T) { + db := setupResetPendingDB(t) + stale := time.Now().Add(-2 * time.Hour) + require.NoError(t, db.Exec( + `INSERT INTO knowledges (id, parse_status, updated_at) VALUES (?, ?, ?)`, + "k-stuck", types.ParseStatusProcessing, stale, + ).Error) + + t.Setenv("REDIS_ADDR", "redis:6379") + resetPendingTasks(db) + + var status, errMsg string + require.NoError(t, db.Raw( + `SELECT parse_status, error_message FROM knowledges WHERE id = ?`, "k-stuck", + ).Row().Scan(&status, &errMsg)) + assert.Equal(t, types.ParseStatusFailed, status) + assert.Contains(t, errMsg, "application restart") +} + +func TestResetPendingTasks_KnowledgeFreshInDistributedMode(t *testing.T) { + db := setupResetPendingDB(t) + fresh := time.Now().Add(-5 * time.Minute) + require.NoError(t, db.Exec( + `INSERT INTO knowledges (id, parse_status, updated_at) VALUES (?, ?, ?)`, + "k-fresh", types.ParseStatusProcessing, fresh, + ).Error) + + t.Setenv("REDIS_ADDR", "redis:6379") + resetPendingTasks(db) + + var status string + require.NoError(t, db.Raw( + `SELECT parse_status FROM knowledges WHERE id = ?`, "k-fresh", + ).Row().Scan(&status)) + assert.Equal(t, types.ParseStatusProcessing, status) +} + +func TestResetPendingTasks_SyncLogStaleRunning(t *testing.T) { + db := setupResetPendingDB(t) + stale := time.Now().Add(-2 * time.Hour) + require.NoError(t, db.Exec( + `INSERT INTO sync_logs (id, status, started_at) VALUES (?, ?, ?)`, + "sync-1", types.SyncLogStatusRunning, stale, + ).Error) + + t.Setenv("REDIS_ADDR", "redis:6379") + resetPendingTasks(db) + + var status string + var finishedAt *time.Time + require.NoError(t, db.Raw( + `SELECT status, finished_at FROM sync_logs WHERE id = ?`, "sync-1", + ).Row().Scan(&status, &finishedAt)) + assert.Equal(t, types.SyncLogStatusFailed, status) + require.NotNil(t, finishedAt) +} + +func TestResetPendingTasks_SyncLogLiteMode(t *testing.T) { + db := setupResetPendingDB(t) + os.Unsetenv("REDIS_ADDR") + require.NoError(t, db.Exec( + `INSERT INTO sync_logs (id, status, started_at) VALUES (?, ?, ?)`, + "sync-lite", types.SyncLogStatusRunning, time.Now(), + ).Error) + + resetPendingTasks(db) + + var status string + require.NoError(t, db.Raw( + `SELECT status FROM sync_logs WHERE id = ?`, "sync-lite", + ).Row().Scan(&status)) + assert.Equal(t, types.SyncLogStatusFailed, status) +} + +func TestStuckKnowledgeParseQuery_ReuseAfterFindDoesNotBreakUpdate(t *testing.T) { + db := setupResetPendingDB(t) + stale := time.Now().Add(-2 * time.Hour) + require.NoError(t, db.Exec( + `INSERT INTO knowledges (id, parse_status, updated_at) VALUES (?, ?, ?)`, + "k-reuse", types.ParseStatusProcessing, stale, + ).Error) + + distributed := true + staleCutoff := time.Now().Add(-resetPendingStaleWindow) + + var rows []types.Knowledge + q := stuckKnowledgeParseQuery(db, distributed, staleCutoff) + require.NoError(t, q.Select("id").Find(&rows).Error) + require.Len(t, rows, 1) + + result := stuckKnowledgeParseQuery(db, distributed, staleCutoff).Updates(map[string]interface{}{ + "parse_status": types.ParseStatusFailed, + }) + require.NoError(t, result.Error) + assert.Equal(t, int64(1), result.RowsAffected) +}