fix(im): isolate IM sessions per agent and recover from deleted sessions

Three bugs fixed:

1. Session cross-contamination (#1066): resolveUserSession and
   resolveThreadSession looked up ChannelSession by (platform, user_id,
   chat_id, tenant_id) without agent_id. When the same user talked to
   two bots bound to different agents under the same tenant, they shared
   a session — mixing knowledge base context. Add agent_id to all four
   WHERE clauses and update the DB unique indexes to match.

2. Swapped parameters in resolveThreadSession: the initial lookup's SQL
   expects (chat_id, thread_id) but the Go args passed (threadID,
   msg.ChatID). The fallback lookup already used the correct order.

3. Orphaned ChannelSession crash (#1046): deleting a session from the
   WeKnora UI soft-deletes it (GORM), but the ChannelSession row
   survives because soft-delete doesn't trigger SQL ON DELETE CASCADE.
   Subsequent IM messages hit "record not found" on GetSession and the
   bot becomes permanently unresponsive. Now detect this case, recycle
   the stale mapping, and transparently create a fresh session.
This commit is contained in:
wizardchen
2026-04-28 20:46:47 +08:00
committed by lyingbug
parent a4c0832007
commit c578fdbad6
3 changed files with 64 additions and 11 deletions

View File

@@ -3,6 +3,7 @@ package im
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"mime/multipart"
@@ -925,7 +926,28 @@ func (s *Service) HandleMessage(ctx context.Context, msg *IncomingMessage, chann
// 4. Get the WeKnora session
session, err := s.sessionService.GetSession(sessionCtx, channelSession.SessionID)
if err != nil {
return fmt.Errorf("get session: %w", err)
// The underlying session may have been deleted from the UI while the
// ChannelSession mapping still exists (GORM soft-delete does not trigger
// SQL ON DELETE CASCADE). Recover by soft-deleting the stale mapping and
// re-creating a fresh session so the IM bot doesn't become permanently
// unresponsive. (fixes #1046)
if errors.Is(err, gorm.ErrRecordNotFound) {
logger.Warnf(ctx, "[IM] Session %s not found (deleted?), recycling stale channel session %s",
channelSession.SessionID, channelSession.ID)
if delErr := s.db.Delete(&ChannelSession{}, "id = ?", channelSession.ID).Error; delErr != nil {
logger.Warnf(ctx, "[IM] Failed to delete stale channel session %s: %v", channelSession.ID, delErr)
}
channelSession, err = s.resolveSession(sessionCtx, msg, tenantID, agentID, channelID, channel.SessionMode)
if err != nil {
return fmt.Errorf("resolve session (retry): %w", err)
}
session, err = s.sessionService.GetSession(sessionCtx, channelSession.SessionID)
if err != nil {
return fmt.Errorf("get session (retry): %w", err)
}
} else {
return fmt.Errorf("get session: %w", err)
}
}
// 5. Enqueue the QA request into the bounded worker pool.
@@ -1195,12 +1217,12 @@ func (s *Service) resolveSession(ctx context.Context, msg *IncomingMessage, tena
}
}
// resolveUserSession finds or creates a ChannelSession keyed by (platform, user_id, chat_id, tenant_id).
// resolveUserSession finds or creates a ChannelSession keyed by (platform, user_id, chat_id, tenant_id, agent_id).
// This is the original session resolution strategy.
func (s *Service) resolveUserSession(ctx context.Context, msg *IncomingMessage, tenantID uint64, agentID string, imChannelID string) (*ChannelSession, error) {
var cs ChannelSession
result := s.db.Where("platform = ? AND user_id = ? AND chat_id = ? AND tenant_id = ? AND deleted_at IS NULL",
string(msg.Platform), msg.UserID, msg.ChatID, tenantID).
result := s.db.Where("platform = ? AND user_id = ? AND chat_id = ? AND tenant_id = ? AND agent_id = ? AND deleted_at IS NULL",
string(msg.Platform), msg.UserID, msg.ChatID, tenantID, agentID).
First(&cs)
if result.Error == nil {
@@ -1244,8 +1266,8 @@ func (s *Service) resolveUserSession(ctx context.Context, msg *IncomingMessage,
logger.Warnf(ctx, "[IM] Failed to clean up orphaned session %s: %v", createdSession.ID, delErr)
}
var existing ChannelSession
if findErr := s.db.Where("platform = ? AND user_id = ? AND chat_id = ? AND tenant_id = ? AND deleted_at IS NULL",
string(msg.Platform), msg.UserID, msg.ChatID, tenantID).
if findErr := s.db.Where("platform = ? AND user_id = ? AND chat_id = ? AND tenant_id = ? AND agent_id = ? AND deleted_at IS NULL",
string(msg.Platform), msg.UserID, msg.ChatID, tenantID, agentID).
First(&existing).Error; findErr != nil {
return nil, fmt.Errorf("create channel session: %w (lookup fallback: %v)", err, findErr)
}
@@ -1258,7 +1280,7 @@ func (s *Service) resolveUserSession(ctx context.Context, msg *IncomingMessage,
return &cs, nil
}
// resolveThreadSession finds or creates a ChannelSession keyed by (platform, chat_id, thread_id, tenant_id).
// resolveThreadSession finds or creates a ChannelSession keyed by (platform, chat_id, thread_id, tenant_id, agent_id).
// In thread mode, each message thread gets its own session. Multiple users in the
// same thread share the same session. Top-level messages use their own ID as
// ThreadID, creating a new session per top-level message.
@@ -1274,8 +1296,8 @@ func (s *Service) resolveThreadSession(ctx context.Context, msg *IncomingMessage
var cs ChannelSession
result := s.db.Where(
"platform = ? AND chat_id = ? AND thread_id = ? AND tenant_id = ? AND deleted_at IS NULL",
string(msg.Platform), threadID, msg.ChatID, tenantID,
"platform = ? AND chat_id = ? AND thread_id = ? AND tenant_id = ? AND agent_id = ? AND deleted_at IS NULL",
string(msg.Platform), msg.ChatID, threadID, tenantID, agentID,
).First(&cs)
if result.Error == nil {
@@ -1322,8 +1344,8 @@ func (s *Service) resolveThreadSession(ctx context.Context, msg *IncomingMessage
}
var existing ChannelSession
if findErr := s.db.Where(
"platform = ? AND chat_id = ? AND thread_id = ? AND tenant_id = ? AND deleted_at IS NULL",
string(msg.Platform), msg.ChatID, threadID, tenantID,
"platform = ? AND chat_id = ? AND thread_id = ? AND tenant_id = ? AND agent_id = ? AND deleted_at IS NULL",
string(msg.Platform), msg.ChatID, threadID, tenantID, agentID,
).First(&existing).Error; findErr != nil {
return nil, fmt.Errorf("create thread session: %w (lookup fallback: %v)", err, findErr)
}

View File

@@ -0,0 +1,12 @@
-- Rollback: 000038_im_session_agent_id
-- Restores the original indexes without agent_id.
--
-- NOTE(review): the narrower keys below no longer distinguish agents. If the
-- table holds several active rows (deleted_at IS NULL) that differ only by
-- agent_id, they become duplicate keys and CREATE UNIQUE INDEX will fail.
-- Soft-delete or merge such rows before running this rollback.

-- User-mode lookup: at most one active row per (platform, user_id, chat_id, tenant_id).
-- The index is dropped first so it can be recreated under the same name.
DROP INDEX IF EXISTS idx_channel_lookup;
CREATE UNIQUE INDEX idx_channel_lookup
ON im_channel_sessions (platform, user_id, chat_id, tenant_id)
WHERE deleted_at IS NULL;

-- Thread-mode lookup: at most one active row per (platform, chat_id, thread_id, tenant_id).
-- Rows with an empty thread_id are excluded from the index by the predicate.
DROP INDEX IF EXISTS idx_channel_thread_lookup;
CREATE UNIQUE INDEX idx_channel_thread_lookup
ON im_channel_sessions (platform, chat_id, thread_id, tenant_id)
WHERE deleted_at IS NULL AND thread_id != '';

View File

@@ -0,0 +1,19 @@
-- Migration: 000038_im_session_agent_id
-- Description: Add agent_id to IM channel session unique indexes to isolate sessions per agent.
-- Fixes: GitHub #1066 (cross-agent session contamination)
--
-- Both indexes are partial: only active rows (deleted_at IS NULL) are
-- constrained, so soft-deleted rows never block a new mapping.
-- NOTE(review): a unique index treats NULLs as distinct by default. If agent_id
-- can be NULL, such rows are not deduplicated by these indexes; confirm the
-- column is NOT NULL or always populated.
DO $$ BEGIN RAISE NOTICE '[Migration 000038] Adding agent_id to IM channel session indexes'; END $$;

-- 1. User-mode lookup: include agent_id so the same user talking to
-- different agents gets separate sessions.
-- The old index is dropped first because the new one reuses its name.
DROP INDEX IF EXISTS idx_channel_lookup;
CREATE UNIQUE INDEX idx_channel_lookup
ON im_channel_sessions (platform, user_id, chat_id, tenant_id, agent_id)
WHERE deleted_at IS NULL;

-- 2. Thread-mode lookup: same fix for thread-based sessions.
-- Rows with an empty thread_id are excluded from this index by the predicate.
DROP INDEX IF EXISTS idx_channel_thread_lookup;
CREATE UNIQUE INDEX idx_channel_thread_lookup
ON im_channel_sessions (platform, chat_id, thread_id, tenant_id, agent_id)
WHERE deleted_at IS NULL AND thread_id != '';

DO $$ BEGIN RAISE NOTICE '[Migration 000038] IM session agent_id indexes updated'; END $$;