fix(container): correct resetPendingTasks SQL on startup

Split knowledge list/update queries to avoid GORM UPDATE...FROM
duplicate-table errors after Find, and use sync_logs started_at/
finished_at column names instead of start_time/end_time.
This commit is contained in:
wizardchen
2026-05-29 11:52:20 +08:00
committed by lyingbug
parent 7d1dfc78b7
commit 19a1b15106
3 changed files with 282 additions and 104 deletions

View File

@@ -635,110 +635,6 @@ func syncSequences(db *gorm.DB) {
}
}
// resetPendingTasks resets the state of any knowledge items or sync logs stuck in processing
// due to an unexpected application restart.
//
// In Lite mode (no REDIS_ADDR) every queued task lived in process memory, so
// any "processing" row at startup is definitively orphaned and must be marked
// failed wholesale. In distributed mode (Asynq on Redis) the active queue
// survives restart, but the *currently executing* task on the dead instance
// is lost — Asynq won't reschedule it until at-least-once retry kicks in,
// which can take minutes or never (e.g. if the deadline has passed). To bound
// the worst case we mark only "long-stale" rows failed: anything that hasn't
// been touched for 30 minutes is well past any reasonable in-flight window.
// Newer rows are left alone so we don't race a peer instance that's mid-process.
func resetPendingTasks(db *gorm.DB) {
distributed := os.Getenv("REDIS_ADDR") != ""
ctx := context.Background()
spanRepo := repository.NewKnowledgeSpanRepository(db)
knowledgeQuery := db.Model(&types.Knowledge{}).
Where("parse_status IN ?", []string{
types.ParseStatusPending,
types.ParseStatusProcessing,
types.ParseStatusFinalizing,
types.ParseStatusDeleting,
})
summaryQuery := db.Model(&types.Knowledge{}).
Where("summary_status IN ?", []string{types.SummaryStatusPending, types.SummaryStatusProcessing})
syncQuery := db.Model(&types.SyncLog{}).
Where("status IN ?", []string{types.SyncLogStatusRunning, "pending"})
if distributed {
// 30-minute stale threshold — comfortably longer than any individual
// stage's expected duration (DocReader 30m, embedding seconds, etc.)
// while short enough that operators don't wait hours after a crash.
staleCutoff := time.Now().Add(-30 * time.Minute)
knowledgeQuery = knowledgeQuery.Where("updated_at < ?", staleCutoff)
summaryQuery = summaryQuery.Where("updated_at < ?", staleCutoff)
syncQuery = syncQuery.Where("start_time < ?", staleCutoff)
}
// Cancel orphaned trace spans for knowledge rows we are about to mark
// failed. resetPendingTasks does not touch asynq queues; this only
// prevents the UI from showing duplicate running postprocess.*
// subspans when a later retry also opens fresh spans.
var stuckKnowledge []types.Knowledge
if err := knowledgeQuery.Select("id").Find(&stuckKnowledge).Error; err != nil {
logger.Warnf(ctx, "resetPendingTasks: list stuck knowledge failed: %v", err)
} else {
for _, k := range stuckKnowledge {
attempt, err := spanRepo.LatestAttempt(ctx, k.ID)
if err != nil || attempt <= 0 {
continue
}
if n, err := spanRepo.CancelAllOpenSpans(ctx, k.ID, attempt,
"SERVER_RESTART", "task interrupted due to application restart"); err != nil {
logger.Warnf(ctx, "resetPendingTasks: cancel spans for %s failed: %v", k.ID, err)
} else if n > 0 {
logger.Infof(ctx, "resetPendingTasks: cancelled %d open span(s) for knowledge %s attempt %d",
n, k.ID, attempt)
}
}
}
// 1. Reset knowledge parsing tasks (including finalizing rows whose
// enrichment subtasks were lost with the process).
result := knowledgeQuery.Updates(map[string]interface{}{
"parse_status": types.ParseStatusFailed,
"error_message": "Task interrupted due to application restart",
"pending_subtasks_count": 0,
})
if result.Error != nil {
logger.Warnf(context.Background(), "Failed to reset pending knowledge tasks: %v", result.Error)
} else if result.RowsAffected > 0 {
logger.Infof(context.Background(),
"Reset %d stuck knowledge parsing tasks to failed state (distributed=%v)",
result.RowsAffected, distributed)
}
// 2. Reset knowledge summary tasks
resultSummary := summaryQuery.Updates(map[string]interface{}{
"summary_status": types.SummaryStatusFailed,
})
if resultSummary.Error != nil {
logger.Warnf(context.Background(), "Failed to reset pending summary tasks: %v", resultSummary.Error)
} else if resultSummary.RowsAffected > 0 {
logger.Infof(context.Background(),
"Reset %d stuck summary generation tasks to failed state (distributed=%v)",
resultSummary.RowsAffected, distributed)
}
// 3. Reset data source sync tasks
resultSync := syncQuery.Updates(map[string]interface{}{
"status": types.SyncLogStatusFailed,
"error_message": "Sync interrupted due to application restart",
"end_time": time.Now(),
})
if resultSync.Error != nil {
logger.Warnf(context.Background(), "Failed to reset pending data source sync tasks: %v", resultSync.Error)
} else if resultSync.RowsAffected > 0 {
logger.Infof(context.Background(),
"Reset %d stuck data source sync tasks to failed state (distributed=%v)",
resultSync.RowsAffected, distributed)
}
}
// initFileService initializes file storage service
// Creates the appropriate file storage service based on configuration
// Supports multiple storage backends (MinIO, COS, local filesystem)

View File

@@ -0,0 +1,137 @@
package container
import (
"context"
"os"
"time"
"github.com/Tencent/WeKnora/internal/application/repository"
"github.com/Tencent/WeKnora/internal/logger"
"github.com/Tencent/WeKnora/internal/types"
"gorm.io/gorm"
)
const resetPendingStaleWindow = 30 * time.Minute
// resetPendingTasks resets the state of any knowledge items or sync logs stuck in processing
// due to an unexpected application restart.
//
// In Lite mode (no REDIS_ADDR) every queued task lived in process memory, so
// any "processing" row at startup is definitively orphaned and must be marked
// failed wholesale. In distributed mode (Asynq on Redis) the active queue
// survives restart, but the *currently executing* task on the dead instance
// is lost — Asynq won't reschedule it until at-least-once retry kicks in,
// which can take minutes or never (e.g. if the deadline has passed). To bound
// the worst case we mark only "long-stale" rows failed: anything that hasn't
// been touched for 30 minutes is well past any reasonable in-flight window.
// Newer rows are left alone so we don't race a peer instance that's mid-process.
func resetPendingTasks(db *gorm.DB) {
distributed := os.Getenv("REDIS_ADDR") != ""
ctx := context.Background()
spanRepo := repository.NewKnowledgeSpanRepository(db)
var staleCutoff time.Time
if distributed {
staleCutoff = time.Now().Add(-resetPendingStaleWindow)
}
// Cancel orphaned trace spans for knowledge rows we are about to mark
// failed. resetPendingTasks does not touch asynq queues; this only
// prevents the UI from showing duplicate running postprocess.*
// subspans when a later retry also opens fresh spans.
var stuckKnowledge []types.Knowledge
if err := stuckKnowledgeParseQuery(db, distributed, staleCutoff).
Select("id").Find(&stuckKnowledge).Error; err != nil {
logger.Warnf(ctx, "resetPendingTasks: list stuck knowledge failed: %v", err)
} else {
for _, k := range stuckKnowledge {
attempt, err := spanRepo.LatestAttempt(ctx, k.ID)
if err != nil || attempt <= 0 {
continue
}
if n, err := spanRepo.CancelAllOpenSpans(ctx, k.ID, attempt,
"SERVER_RESTART", "task interrupted due to application restart"); err != nil {
logger.Warnf(ctx, "resetPendingTasks: cancel spans for %s failed: %v", k.ID, err)
} else if n > 0 {
logger.Infof(ctx, "resetPendingTasks: cancelled %d open span(s) for knowledge %s attempt %d",
n, k.ID, attempt)
}
}
}
// 1. Reset knowledge parsing tasks (including finalizing rows whose
// enrichment subtasks were lost with the process).
// Fresh query — reusing the *gorm.DB chain after Find() makes GORM emit
// UPDATE ... FROM knowledges which PostgreSQL rejects (SQLSTATE 42712).
result := stuckKnowledgeParseQuery(db, distributed, staleCutoff).Updates(map[string]interface{}{
"parse_status": types.ParseStatusFailed,
"error_message": "Task interrupted due to application restart",
"pending_subtasks_count": 0,
})
if result.Error != nil {
logger.Warnf(context.Background(), "Failed to reset pending knowledge tasks: %v", result.Error)
} else if result.RowsAffected > 0 {
logger.Infof(context.Background(),
"Reset %d stuck knowledge parsing tasks to failed state (distributed=%v)",
result.RowsAffected, distributed)
}
// 2. Reset knowledge summary tasks
resultSummary := stuckKnowledgeSummaryQuery(db, distributed, staleCutoff).Updates(map[string]interface{}{
"summary_status": types.SummaryStatusFailed,
})
if resultSummary.Error != nil {
logger.Warnf(context.Background(), "Failed to reset pending summary tasks: %v", resultSummary.Error)
} else if resultSummary.RowsAffected > 0 {
logger.Infof(context.Background(),
"Reset %d stuck summary generation tasks to failed state (distributed=%v)",
resultSummary.RowsAffected, distributed)
}
// 3. Reset data source sync tasks
now := time.Now()
resultSync := stuckSyncLogQuery(db, distributed, staleCutoff).Updates(map[string]interface{}{
"status": types.SyncLogStatusFailed,
"error_message": "Sync interrupted due to application restart",
"finished_at": &now,
})
if resultSync.Error != nil {
logger.Warnf(context.Background(), "Failed to reset pending data source sync tasks: %v", resultSync.Error)
} else if resultSync.RowsAffected > 0 {
logger.Infof(context.Background(),
"Reset %d stuck data source sync tasks to failed state (distributed=%v)",
resultSync.RowsAffected, distributed)
}
}
func stuckKnowledgeParseQuery(db *gorm.DB, distributed bool, staleCutoff time.Time) *gorm.DB {
q := db.Model(&types.Knowledge{}).
Where("parse_status IN ?", []string{
types.ParseStatusPending,
types.ParseStatusProcessing,
types.ParseStatusFinalizing,
types.ParseStatusDeleting,
})
if distributed {
q = q.Where("updated_at < ?", staleCutoff)
}
return q
}
func stuckKnowledgeSummaryQuery(db *gorm.DB, distributed bool, staleCutoff time.Time) *gorm.DB {
q := db.Model(&types.Knowledge{}).
Where("summary_status IN ?", []string{types.SummaryStatusPending, types.SummaryStatusProcessing})
if distributed {
q = q.Where("updated_at < ?", staleCutoff)
}
return q
}
func stuckSyncLogQuery(db *gorm.DB, distributed bool, staleCutoff time.Time) *gorm.DB {
q := db.Model(&types.SyncLog{}).
Where("status = ?", types.SyncLogStatusRunning)
if distributed {
q = q.Where("started_at < ?", staleCutoff)
}
return q
}

View File

@@ -0,0 +1,145 @@
package container
import (
"os"
"testing"
"time"
"github.com/Tencent/WeKnora/internal/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)
const resetPendingKnowledgeDDL = `
CREATE TABLE IF NOT EXISTS knowledges (
id VARCHAR(64) PRIMARY KEY,
parse_status VARCHAR(32) NOT NULL DEFAULT 'pending',
summary_status VARCHAR(32) NOT NULL DEFAULT 'none',
pending_subtasks_count INTEGER NOT NULL DEFAULT 0,
error_message TEXT,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
deleted_at DATETIME
);
`
const resetPendingSyncLogDDL = `
CREATE TABLE IF NOT EXISTS sync_logs (
id VARCHAR(64) PRIMARY KEY,
data_source_id VARCHAR(64) NOT NULL DEFAULT '',
tenant_id INTEGER NOT NULL DEFAULT 0,
status VARCHAR(32) NOT NULL,
started_at DATETIME,
finished_at DATETIME,
error_message TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
`
func setupResetPendingDB(t *testing.T) *gorm.DB {
t.Helper()
db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
require.NoError(t, err)
require.NoError(t, db.Exec(resetPendingKnowledgeDDL).Error)
require.NoError(t, db.Exec(resetPendingSyncLogDDL).Error)
return db
}
func TestResetPendingTasks_KnowledgeFindThenUpdate(t *testing.T) {
db := setupResetPendingDB(t)
stale := time.Now().Add(-2 * time.Hour)
require.NoError(t, db.Exec(
`INSERT INTO knowledges (id, parse_status, updated_at) VALUES (?, ?, ?)`,
"k-stuck", types.ParseStatusProcessing, stale,
).Error)
t.Setenv("REDIS_ADDR", "redis:6379")
resetPendingTasks(db)
var status, errMsg string
require.NoError(t, db.Raw(
`SELECT parse_status, error_message FROM knowledges WHERE id = ?`, "k-stuck",
).Row().Scan(&status, &errMsg))
assert.Equal(t, types.ParseStatusFailed, status)
assert.Contains(t, errMsg, "application restart")
}
func TestResetPendingTasks_KnowledgeFreshInDistributedMode(t *testing.T) {
db := setupResetPendingDB(t)
fresh := time.Now().Add(-5 * time.Minute)
require.NoError(t, db.Exec(
`INSERT INTO knowledges (id, parse_status, updated_at) VALUES (?, ?, ?)`,
"k-fresh", types.ParseStatusProcessing, fresh,
).Error)
t.Setenv("REDIS_ADDR", "redis:6379")
resetPendingTasks(db)
var status string
require.NoError(t, db.Raw(
`SELECT parse_status FROM knowledges WHERE id = ?`, "k-fresh",
).Row().Scan(&status))
assert.Equal(t, types.ParseStatusProcessing, status)
}
func TestResetPendingTasks_SyncLogStaleRunning(t *testing.T) {
db := setupResetPendingDB(t)
stale := time.Now().Add(-2 * time.Hour)
require.NoError(t, db.Exec(
`INSERT INTO sync_logs (id, status, started_at) VALUES (?, ?, ?)`,
"sync-1", types.SyncLogStatusRunning, stale,
).Error)
t.Setenv("REDIS_ADDR", "redis:6379")
resetPendingTasks(db)
var status string
var finishedAt *time.Time
require.NoError(t, db.Raw(
`SELECT status, finished_at FROM sync_logs WHERE id = ?`, "sync-1",
).Row().Scan(&status, &finishedAt))
assert.Equal(t, types.SyncLogStatusFailed, status)
require.NotNil(t, finishedAt)
}
func TestResetPendingTasks_SyncLogLiteMode(t *testing.T) {
db := setupResetPendingDB(t)
os.Unsetenv("REDIS_ADDR")
require.NoError(t, db.Exec(
`INSERT INTO sync_logs (id, status, started_at) VALUES (?, ?, ?)`,
"sync-lite", types.SyncLogStatusRunning, time.Now(),
).Error)
resetPendingTasks(db)
var status string
require.NoError(t, db.Raw(
`SELECT status FROM sync_logs WHERE id = ?`, "sync-lite",
).Row().Scan(&status))
assert.Equal(t, types.SyncLogStatusFailed, status)
}
func TestStuckKnowledgeParseQuery_ReuseAfterFindDoesNotBreakUpdate(t *testing.T) {
db := setupResetPendingDB(t)
stale := time.Now().Add(-2 * time.Hour)
require.NoError(t, db.Exec(
`INSERT INTO knowledges (id, parse_status, updated_at) VALUES (?, ?, ?)`,
"k-reuse", types.ParseStatusProcessing, stale,
).Error)
distributed := true
staleCutoff := time.Now().Add(-resetPendingStaleWindow)
var rows []types.Knowledge
q := stuckKnowledgeParseQuery(db, distributed, staleCutoff)
require.NoError(t, q.Select("id").Find(&rows).Error)
require.Len(t, rows, 1)
result := stuckKnowledgeParseQuery(db, distributed, staleCutoff).Updates(map[string]interface{}{
"parse_status": types.ParseStatusFailed,
})
require.NoError(t, result.Error)
assert.Equal(t, int64(1), result.RowsAffected)
}