feat: implement IM channel management in agent editor

- Introduced a new IMChannelPanel component for managing WeCom and Feishu channels.
- Added CRUD operations for IM channels, including create, update, delete, and list functionalities.
- Enhanced the backend with new API endpoints for IM channel management.
- Updated documentation to reflect changes in IM integration and channel management.
- Improved localization support for new IM-related UI elements across multiple languages.
This commit is contained in:
wizardchen
2026-03-16 02:23:22 +08:00
parent e5a397faeb
commit 6159e8e3f7
18 changed files with 1571 additions and 518 deletions

View File

@@ -535,41 +535,3 @@ extract:
tenant:
# Enable cross-tenant access (can be enabled for intranet environments)
enable_cross_tenant_access: false
# IM integration configuration (optional)
# Uncomment and configure to enable WeCom/Feishu bot integration
#
# mode:
# "webhook" — (default) platform pushes events to your callback URL, requires public domain
# "websocket" — long connection, no public domain needed, SDK maintains persistent connection
im:
wecom:
enabled: true
tenant_id: 10000 # WeKnora tenant ID to use
agent_id: "" # Default agent ID (optional)
knowledge_base_ids:
- "2f046c68-869c-460f-866a-b149ac9a5195" # Default knowledge bases (optional)
mode: "websocket" # "webhook" or "websocket"
output_mode: "stream" # "stream" (default, real-time) or "full" (wait for complete answer)
# --- websocket mode (智能机器人长连接) ---
bot_id: "${WECOM_BOT_ID}" # 智能机器人 Bot ID
bot_secret: "${WECOM_BOT_SECRET}" # 智能机器人 Secret
# --- webhook mode (自建应用回调) ---
# corp_id: "${WECOM_CORP_ID}" # 企业微信 Corp ID
# agent_secret: "${WECOM_AGENT_SECRET}" # 应用 Secret
# token: "${WECOM_TOKEN}" # 回调 Token
# encoding_aes_key: "${WECOM_AES_KEY}" # 回调 EncodingAESKey
# corp_agent_id: 1000001 # 应用 AgentID
feishu:
enabled: true
tenant_id: 10000 # WeKnora tenant ID to use
agent_id: "" # Default agent ID (optional)
knowledge_base_ids: # Default knowledge bases
- "2f046c68-869c-460f-866a-b149ac9a5195"
mode: "websocket" # "webhook" or "websocket"
output_mode: "stream" # "stream" (default, real-time) or "full" (wait for complete answer)
app_id: "${FEISHU_APP_ID}" # 飞书 App ID
app_secret: "${FEISHU_APP_SECRET}" # 飞书 App Secret
# --- 以下仅 webhook 模式需要websocket 模式可留空 ---
# verification_token: "${FEISHU_TOKEN}" # 事件订阅 Verification Token (webhook only)
# encrypt_key: "${FEISHU_ENCRYPT_KEY}" # 事件订阅 Encrypt Key (webhook only)

View File

@@ -38,7 +38,6 @@ services:
volumes:
- data-files:/data/files
- docreader-tmp:/tmp/docreader:ro
# Mount custom config file (required for IM integration)
- ./config/config.yaml:/app/config/config.yaml
# Optional: mount custom skills directory (allows adding skills without rebuilding image)
- ./skills/preloaded:/app/skills/preloaded
@@ -114,11 +113,6 @@ services:
- SYSTEM_AES_KEY=${SYSTEM_AES_KEY:-}
- CONCURRENCY_POOL_SIZE=${CONCURRENCY_POOL_SIZE:-5}
- JWT_SECRET=${JWT_SECRET:-}
# IM integration
- FEISHU_APP_ID=${FEISHU_APP_ID:-}
- FEISHU_APP_SECRET=${FEISHU_APP_SECRET:-}
- WECOM_BOT_ID=${WECOM_BOT_ID:-}
- WECOM_BOT_SECRET=${WECOM_BOT_SECRET:-}
# File size limit (in MB)
- MAX_FILE_SIZE_MB=${MAX_FILE_SIZE_MB:-50}
# Agent Skills Sandbox

View File

@@ -2,13 +2,17 @@
WeKnora 的 IM 集成模块将企业即时通讯平台(企业微信、飞书)接入 WeKnora 知识问答管道,支持在 IM 中直接向 AI 提问并获得实时流式回答。
IM 渠道绑定到 Agent一个 Agent 可接入多个 IM 渠道,所有配置通过前端 Agent 编辑器管理,存储在数据库中。
## 目录
- [快速接入指南](#快速接入指南)
- [企业微信接入](#企业微信接入)
- [飞书接入](#飞书接入)
- [配置参考](#配置参考)
- [前端管理](#前端管理)
- [架构总览](#架构总览)
- [数据模型](#数据模型)
- [API 端点](#api-端点)
- [核心概念](#核心概念)
- [数据流](#数据流)
- [接口定义](#接口定义)
@@ -23,6 +27,12 @@ WeKnora 的 IM 集成模块将企业即时通讯平台(企业微信、飞书
## 快速接入指南
### 前置条件
- WeKnora 已部署并运行
- 已创建至少一个 Agent自定义智能体
- Agent 已配置好模型和知识库
### 企业微信接入
企业微信提供两种接入模式,根据你的应用类型选择:
@@ -33,30 +43,27 @@ WeKnora 的 IM 集成模块将企业即时通讯平台(企业微信、飞书
**第一步:创建智能机器人**
1. 登录 [企业微信工作台](确认已升级到最新版企业微信) → **智能机器人****创建机器人****手动创建****切换API模式创建****选择使用长连接**
1. 登录 [企业微信工作台](确认已升级到最新版企业微信) → **智能机器人****创建机器人****手动创建****切换API模式创建****选择"使用长连接"**
2. 创建完成后,在机器人详情页获取:
- **BotID** — 机器人唯一标识
- **BotSecret** — 机器人密钥(点击重置可重新生成)
**第二步:配置 WeKnora**
**第二步: WeKnora 中添加 IM 渠道**
编辑 `config/config.yaml`
1. 进入 Agent 编辑器 → 左侧导航选择 **IM 集成** 标签页
2. 点击 **添加渠道**
3. 填写配置:
- **平台**:选择「企业微信」
- **渠道名称**:自定义名称,方便辨识(如「客服机器人」)
- **接入模式**选择「WebSocket」
- **输出模式**:选择「流式输出」(推荐)
- **Bot ID**:填入从企业微信获取的 BotID
- **Bot Secret**:填入从企业微信获取的 BotSecret
4. 点击保存
```yaml
im:
wecom:
enabled: true
mode: "websocket" # 使用 WebSocket 长连接
output_mode: "stream" # 流式输出(推荐)
tenant_id: ["你的租户ID"] # WeKnora 租户 ID
knowledge_base_ids: ["你的知识库ID"] # 关联的知识库
bot_id: "${WECOM_BOT_ID}" # 机器人 ID
bot_secret: "${WECOM_BOT_SECRET}" # 机器人 Secret
```
**第三步:验证**
**第三步:启动服务**
启动 WeKnora 后,程序会自动建立到企业微信的 WebSocket 长连接(`wss://openws.work.weixin.qq.com`),无需额外配置回调地址。日志中出现以下内容表示连接成功:
保存后 WeKnora 会自动建立到企业微信的 WebSocket 长连接。日志中出现以下内容表示连接成功:
```
[IM] WeCom WebSocket connecting (bot_id=xxx)...
@@ -78,31 +85,29 @@ im:
- **AgentID** — 应用详情页中的 AgentId整数
- **Secret** — 应用详情页中的 Secret
**第二步:配置接收消息**
**第二步:在 WeKnora 中添加 IM 渠道**
1. 进入 Agent 编辑器 → **IM 集成** 标签页 → **添加渠道**
2. 填写配置:
- **平台**:选择「企业微信」
- **接入模式**选择「Webhook」
- **输出模式**:选择「流式输出」
- **Corp ID**:企业 ID
- **Agent Secret**:应用 Secret
- **Token**:自定义或随机生成(记录下来)
- **EncodingAESKey**:自定义或随机生成(记录下来)
- **Corp Agent ID**:应用 AgentID整数
3. 保存后,渠道卡片上会显示**回调地址**,格式为 `https://你的域名/api/v1/im/callback/{channel_id}`
4. 复制该回调地址
**第三步:配置企业微信接收消息**
1. 在应用详情页 → **接收消息****设置 API 接收**
2. 填写:
- **URL**`https://你的域名/api/v1/im/callback/wecom`
- **Token**自定义或随机生成(记录下来)
- **EncodingAESKey**点击随机生成(记录下来)
3. 点击保存,企业微信会发送 GET 验证请求WeKnora 需先启动以响应验证
**第三步:配置 WeKnora**
```yaml
im:
wecom:
enabled: true
mode: "webhook" # 使用 Webhook 回调
output_mode: "stream"
tenant_id: ["你的租户ID"]
knowledge_base_ids: ["你的知识库ID"]
corp_id: "${WECOM_CORP_ID}" # 企业 ID
agent_secret: "${WECOM_AGENT_SECRET}" # 应用 Secret
token: "${WECOM_TOKEN}" # 接收消息 Token
encoding_aes_key: "${WECOM_AES_KEY}" # 接收消息 EncodingAESKey
corp_agent_id: 1000001 # 应用 AgentID整数
```
- **URL**粘贴上一步复制的回调地址
- **Token**填入在 WeKnora 中设置的 Token
- **EncodingAESKey**填入在 WeKnora 中设置的 EncodingAESKey
3. 点击保存,企业微信会发送 GET 验证请求WeKnora 会自动响应
**第四步:配置可信域名(可选)**
@@ -141,20 +146,16 @@ im:
**版本管理与发布** 中创建版本并提交审核。审核通过后用户才能与机器人交互。
**第四步:配置 WeKnora**
**第四步: WeKnora 中添加 IM 渠道**
```yaml
im:
feishu:
enabled: true
mode: "websocket" # 使用长连接
output_mode: "stream" # 流式输出(推荐,需开启 cardkit:card 权限)
tenant_id: ["你的租户ID"]
knowledge_base_ids: ["你的知识库ID"]
app_id: "${FEISHU_APP_ID}" # App ID
app_secret: "${FEISHU_APP_SECRET}" # App Secret
# WebSocket 模式无需 verification_token 和 encrypt_key
```
1. 进入 Agent 编辑器 → **IM 集成****添加渠道**
2. 填写配置:
- **平台**:选择「飞书」
- **接入模式**选择「WebSocket」
- **输出模式**:选择「流式输出」(需开启 cardkit:card 权限)
- **App ID**:填入从飞书获取的 App ID
- **App Secret**:填入从飞书获取的 App Secret
3. 保存
启动后日志出现以下内容表示连接成功:
@@ -170,40 +171,26 @@ im:
**前置步骤**同上(创建应用、开通权限),额外需要:
1. **配置事件订阅**
-**事件与回调****事件配置** 中,选择请求方式为 **将事件发送到开发者服务器**
- **请求地址**`https://你的域名/api/v1/im/callback/feishu`
- 记录页面上的 **Encrypt Key****Verification Token**
- 添加事件 `im.message.receive_v1`
2. 点击保存时飞书会发送 URL 验证请求challengeWeKnora 需先启动以响应
**第一步:在 WeKnora 中添加 IM 渠道**
```yaml
im:
feishu:
enabled: true
mode: "webhook"
output_mode: "stream"
tenant_id: ["你的租户ID"]
knowledge_base_ids: ["你的知识库ID"]
app_id: "${FEISHU_APP_ID}"
app_secret: "${FEISHU_APP_SECRET}"
verification_token: "${FEISHU_TOKEN}" # 事件订阅页面的 Verification Token
encrypt_key: "${FEISHU_ENCRYPT_KEY}" # 事件订阅页面的 Encrypt Key
```
1. 进入 Agent 编辑器 → **IM 集成****添加渠道**
2. 填写配置:
- **平台**:选择「飞书」
- **接入模式**选择「Webhook」
- **App ID** / **App Secret**
- **Verification Token**:从飞书事件订阅页面获取
- **Encrypt Key**:从飞书事件订阅页面获取
3. 保存后,复制渠道卡片上显示的**回调地址**
**第二步:配置飞书事件订阅**
1.**事件与回调****事件配置** 中,选择请求方式为 **将事件发送到开发者服务器**
2. **请求地址**:粘贴从 WeKnora 复制的回调地址
3. 添加事件 `im.message.receive_v1`
4. 点击保存时飞书会发送 URL 验证请求challengeWeKnora 会自动响应
---
### 通用配置项说明
| 配置项 | 类型 | 说明 |
|--------|------|------|
| `enabled` | bool | 是否启用该平台 |
| `tenant_id` | uint64 | 关联的 WeKnora 租户 ID消息处理在该租户上下文中执行 |
| `agent_id` | string | 指定自定义 Agent可选为空则使用默认知识库问答 |
| `knowledge_base_ids` | []string | 默认关联的知识库 ID 列表 |
| `mode` | string | 连接模式:`"websocket"` (长连接) 或 `"webhook"` (HTTP 回调) |
| `output_mode` | string | 输出模式:`"stream"` (实时流式) 或 `"full"` (等待完整回答后一次发送) |
### 模式选择指南
| | Webhook | WebSocket |
@@ -218,45 +205,27 @@ im:
---
## 配置参考
## 前端管理
完整的 `config/config.yaml``im` 段配置:
IM 渠道在 Agent 编辑器的 **IM 集成** 标签页中管理(仅编辑模式可见,创建 Agent 时不显示)。
```yaml
im:
wecom:
enabled: true # 是否启用企业微信
tenant_id: 10000 # 对应的 WeKnora 租户 ID
agent_id: "" # 指定自定义 Agent (可选)
knowledge_base_ids: [] # 默认知识库 ID 列表
mode: "websocket" # 连接模式: "webhook" | "websocket"
output_mode: "stream" # 输出模式: "stream" (流式) | "full" (完整)
### 渠道列表
# ── Webhook 模式配置 (自建应用) ──
corp_id: "${WECOM_CORP_ID}" # 企业 ID
agent_secret: "${WECOM_AGENT_SECRET}" # 应用 Secret
token: "${WECOM_TOKEN}" # 接收消息的 Token
encoding_aes_key: "${WECOM_AES_KEY}" # 接收消息的 EncodingAESKey
corp_agent_id: 1000001 # 应用 AgentID整数
每个渠道以卡片形式展示,包含:
- **平台标识**:企业微信(绿色)/ 飞书(蓝色)
- **渠道名称**:用户自定义
- **接入模式**WebSocket / Webhook
- **输出模式**:流式输出 / 完整输出
- **启用开关**:可即时启用/停用渠道
- **回调地址**Webhook 模式下显示,可一键复制
- **编辑/删除**:管理渠道配置
# ── WebSocket 模式配置 (智能机器人) ──
bot_id: "${WECOM_BOT_ID}" # 机器人 ID
bot_secret: "${WECOM_BOT_SECRET}" # 机器人 Secret
### 渠道操作
feishu:
enabled: true # 是否启用飞书
tenant_id: 10000
agent_id: ""
knowledge_base_ids: []
mode: "websocket" # "websocket" | "webhook"
output_mode: "stream" # "stream" | "full"
app_id: "${FEISHU_APP_ID}" # 应用 App ID
app_secret: "${FEISHU_APP_SECRET}" # 应用 App Secret
verification_token: "${FEISHU_TOKEN}" # 事件验证 Token (仅 Webhook 模式)
encrypt_key: "${FEISHU_ENCRYPT_KEY}" # 加密密钥 (仅 Webhook 模式)
```
> **提示:** 配置值支持环境变量引用(`${VAR_NAME}` 语法),建议将密钥类配置通过环境变量注入,避免明文提交到代码仓库。
- **添加渠道**:选择平台 → 填写凭证 → 选择模式 → 保存
- **编辑渠道**:可修改名称、模式、输出模式和凭证(平台不可更改)
- **启用/停用**:通过开关即时切换,停用的渠道不会处理消息
- **删除渠道**:删除后不可恢复
---
@@ -280,15 +249,17 @@ im:
│ ─────┼───────────────┼────────────────────────────────────────────── │
│ └───────┬───────┘ │
│ ▼ │
┌──────────────┐
im.Service 服务编排层 │
· 消息去重 (MessageID + TTL)
│ │ · 会话映射 (ChannelSession) │
│ · 流式/全量路由
└──────┬───────┘ · QA 管道调度
│ ─────────────────────────────────────────────────────────────────
│ ┌─────────────────────┐ │
im.Service 服务编排层
· IM 渠道管理 (CRUD)
│ ┌───────────────┐ │ · Adapter Factory (动态创建) │
im_channels │ │ · 消息去重 (MessageID + TTL)
(DB表) · 会话映射 (ChannelSession)
└───────────────┘ │ · 流式/全量路由
────────────────────┘ · QA 管道调度
│ ───────────┼────────────────────────────────────────────────────── │
│ ▼ │
│ ┌─────────────────────────────────────┐ │
│ │ WeKnora Core (QA Pipeline) │ 核心层 │
│ │ SessionService · MessageService │ │
@@ -302,13 +273,84 @@ im:
| 模式 | 用途 |
|------|------|
| Adapter Pattern | 统一不同 IM 平台的差异,每个平台实现 `im.Adapter` 接口 |
| Factory Pattern | 通过 `AdapterFactory` 从数据库渠道配置动态创建 Adapter 实例 |
| Strategy Pattern | `StreamSender` 可选接口,让平台按需支持流式输出 |
| Event-Driven | 通过 `EventBus` 解耦 QA 管道与 IM 输出,支持实时块推送 |
---
## 数据模型
### im_channels 表
IM 渠道配置存储在 `im_channels` 表中,绑定到 Agent
```sql
CREATE TABLE im_channels (
id VARCHAR(36) PRIMARY KEY,
tenant_id BIGINT NOT NULL,
agent_id VARCHAR(36) NOT NULL, -- 绑定的 Agent ID
platform VARCHAR(20) NOT NULL, -- 'wecom' | 'feishu'
name VARCHAR(255) NOT NULL DEFAULT '',
enabled BOOLEAN NOT NULL DEFAULT true,
mode VARCHAR(20) NOT NULL DEFAULT 'websocket', -- 'webhook' | 'websocket'
output_mode VARCHAR(20) NOT NULL DEFAULT 'stream', -- 'stream' | 'full'
credentials JSONB NOT NULL DEFAULT '{}', -- 平台凭证
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
deleted_at TIMESTAMPTZ
);
```
**credentials 字段结构:**
| 平台 | 模式 | 字段 |
|------|------|------|
| 企业微信 | WebSocket | `bot_id`, `bot_secret` |
| 企业微信 | Webhook | `corp_id`, `agent_secret`, `token`, `encoding_aes_key`, `corp_agent_id` |
| 飞书 | WebSocket | `app_id`, `app_secret` |
| 飞书 | Webhook | `app_id`, `app_secret`, `verification_token`, `encrypt_key` |
### im_channel_sessions 表
将 IM 渠道中的用户会话映射到 WeKnora 会话:
```
(im_channel_id, Platform, UserID, ChatID, TenantID) → SessionID
```
---
## API 端点
### IM 渠道管理 API需认证
| 方法 | 路径 | 说明 |
|------|------|------|
| POST | `/api/v1/agents/:id/im-channels` | 创建 IM 渠道 |
| GET | `/api/v1/agents/:id/im-channels` | 列出 Agent 的所有 IM 渠道 |
| PUT | `/api/v1/im-channels/:id` | 更新 IM 渠道 |
| DELETE | `/api/v1/im-channels/:id` | 删除 IM 渠道 |
| POST | `/api/v1/im-channels/:id/toggle` | 启用/停用 IM 渠道 |
### IM 回调端点(无需认证,平台签名验证)
| 方法 | 路径 | 说明 |
|------|------|------|
| GET/POST | `/api/v1/im/callback/:channel_id` | 通用回调(根据 channel_id 自动路由到对应 Adapter |
> Webhook 模式下,每个渠道有唯一的回调地址 `/api/v1/im/callback/{channel_id}`,在前端渠道卡片上可一键复制。
---
## 核心概念
### IMChannel — IM 渠道
每个 IM 渠道代表一个 IM 平台机器人与 WeKnora Agent 的绑定关系。一个 Agent 可以绑定多个渠道(如同时接入企业微信和飞书),同一平台也可以创建多个渠道(如不同的企业微信机器人)。
渠道启动时Service 通过 `AdapterFactory` 根据平台类型和凭证动态创建对应的 Adapter 实例。
### IncomingMessage — 统一入站消息
所有平台的消息在解密、解析后被归一化为 `IncomingMessage`,抹平平台差异:
@@ -339,13 +381,7 @@ type ReplyMessage struct {
### ChannelSession — 会话映射
将 IM 渠道 (平台 + 用户 + 群聊) 映射到 WeKnora 会话,实现对话上下文持续性
```
(Platform, UserID, ChatID, TenantID) → SessionID
```
首次交互自动创建,后续消息复用同一会话。存储于 `im_channel_sessions` 表。
将 IM 渠道 (渠道 ID + 用户 + 群聊) 映射到 WeKnora 会话,实现对话上下文持续性。首次交互自动创建,后续消息复用同一会话。存储于 `im_channel_sessions` 表。
---
@@ -358,20 +394,23 @@ type ReplyMessage struct {
┌─ HTTP Handler ──────────────────────────────────┐
│ 1. 签名验证 (VerifyCallback)
│ 2. URL 验证处理 (HandleURLVerification)
│ 3. 解密 + 解析 → IncomingMessage (ParseCallback)
│ 4. 立即返回 HTTP 200 (异步处理)
│ 1. 根据 channel_id 查找渠道配置
│ 2. 获取对应 Adapter
│ 3. 签名验证 (VerifyCallback)
│ 4. URL 验证处理 (HandleURLVerification)
│ 5. 解密 + 解析 → IncomingMessage (ParseCallback) │
│ 6. 立即返回 HTTP 200 (异步处理) │
└──────────────────────────┬──────────────────────-┘
│ goroutine
┌─ im.Service ────────────────────────────────────┐
│ 1. 去重检查 (MessageID, 5 分钟 TTL) │
│ 2. 内容长度校验 (≤ 4096 字符) │
│ 3. 解析/创建 ChannelSession
│ 4. 获取 WeKnora Session
│ 5. 解析自定义 Agent (可选)
│ 6. 判断流式/全量模式
│ 3. 从渠道配置获取 agent_id、tenant_id
│ 4. 解析/创建 ChannelSession │
│ 5. 获取 WeKnora Session
│ 6. 加载 Agent 配置(获取知识库等信息)
│ 7. 判断流式/全量模式 │
└───────────┬─────────────────────┬───────────────┘
│ │
流式模式 ▼ 全量模式 ▼
@@ -389,6 +428,29 @@ type ReplyMessage struct {
消息持久化 (user + assistant)
```
### 渠道生命周期
```
渠道创建/更新 (前端 UI)
┌─ im.Service ──────────────────────────┐
│ 1. 保存渠道配置到数据库 │
│ 2. 如果渠道已启用: │
│ a. AdapterFactory 创建 Adapter │
│ b. WebSocket 模式:建立长连接 │
│ c. Webhook 模式:注册回调处理 │
│ 3. 维护 channels map (channel_id → │
│ channelState{Channel, Adapter}) │
└────────────────────────────────────────┘
服务启动时:
LoadAndStartChannels() → 从 DB 加载所有 enabled 的渠道 → 逐个 StartChannel()
渠道停用/删除时:
StopChannel() → 取消 Adapter 上下文 → 从 map 移除
```
### 流式输出机制
流式模式通过 `EventBus` 实时收集 QA 管道产生的内容块,以 **300ms 间隔批量推送**,在延迟与 API 限频之间取得平衡:
@@ -443,7 +505,15 @@ type StreamSender interface {
}
```
实现此接口后Service 会自动路由到流式模式。可通过配置 `output_mode: "full"` 强制关闭。
实现此接口后Service 会自动路由到流式模式。渠道配置 `output_mode: "full"` 强制关闭。
### im.AdapterFactory — 适配器工厂
```go
type AdapterFactory func(ctx context.Context, channel *IMChannel, msgHandler func(*IncomingMessage)) (Adapter, CancelFunc, error)
```
每个平台注册一个工厂函数Service 在启动渠道时调用工厂创建 Adapter 实例。工厂函数根据渠道的 `mode``credentials` 决定创建哪种 Adapter。
---
@@ -458,7 +528,7 @@ type StreamSender interface {
适用于**自建应用**,需要公网可访问的回调地址。
```
企业微信服务器 ──HTTP POST──▶ /api/v1/im/callback/wecom
企业微信服务器 ──HTTP POST──▶ /api/v1/im/callback/{channel_id}
解密 (AES-128-CBC)
解析 XML → IncomingMessage
@@ -493,8 +563,8 @@ LongConnClient ══WebSocket══▶ wss://openws.work.weixin.qq.com
| 文件 | 职责 |
|------|------|
| `internal/im/wecom/adapter.go` | Webhook 模式回调解密、签名验证、REST API 回复、Token 缓存 |
| `internal/im/wecom/bot_adapter.go` | WebSocket 模式适配器壳,代理到 `LongConnClient` |
| `internal/im/wecom/webhook_adapter.go` | Webhook 模式回调解密、签名验证、REST API 回复、Token 缓存 |
| `internal/im/wecom/ws_adapter.go` | WebSocket 模式适配器壳,代理到 `LongConnClient` |
| `internal/im/wecom/longconn.go` | WebSocket 客户端:连接管理、心跳、帧协议、自动重连 |
---
@@ -506,7 +576,7 @@ LongConnClient ══WebSocket══▶ wss://openws.work.weixin.qq.com
#### Webhook 模式
```
飞书服务器 ──HTTP POST──▶ /api/v1/im/callback/feishu
飞书服务器 ──HTTP POST──▶ /api/v1/im/callback/{channel_id}
解密 (AES-256-CBC可选)
解析 JSON → IncomingMessage
@@ -574,15 +644,7 @@ EndStream:
| 空回答 | 回复 "抱歉,我暂时无法回答这个问题。" |
| WebSocket 断连 | 指数退避自动重连 |
| 平台重试 | MessageID 去重5 分钟内自动跳过 |
---
## API 端点
| 方法 | 路径 | 说明 |
|------|------|------|
| GET/POST | `/api/v1/im/callback/wecom` | 企业微信回调 (GET 用于 URL 验证) |
| POST | `/api/v1/im/callback/feishu` | 飞书回调 (URL 验证 + 消息事件) |
| 渠道启动失败 | 日志记录错误,不影响其他渠道 |
---
@@ -608,23 +670,35 @@ func (a *Adapter) HandleURLVerification(c *gin.Context) bool { /* URL 验证 */
如需流式输出,额外实现 `im.StreamSender`
### 2. 注册适配器
### 2. 注册适配器工厂
`internal/container/container.go``registerIMAdapters` 中初始化并注册
`internal/container/container.go``registerIMAdapterFactories` 中注册工厂函数
```go
if cfg.IM.DingTalk.Enabled {
adapter := dingtalk.NewAdapter(cfg.IM.DingTalk.AppKey, cfg.IM.DingTalk.AppSecret)
imService.RegisterAdapter(adapter)
}
imService.RegisterAdapterFactory("dingtalk", func(ctx context.Context, channel *im.IMChannel, msgHandler func(*im.IncomingMessage)) (im.Adapter, im.CancelFunc, error) {
creds := parseCredentials(channel.Credentials)
appKey := getString(creds, "app_key")
appSecret := getString(creds, "app_secret")
adapter := dingtalk.NewAdapter(appKey, appSecret)
// WebSocket 模式需要启动长连接
if channel.Mode == "websocket" {
cancelCtx, cancel := context.WithCancel(ctx)
go adapter.StartLongConn(cancelCtx, msgHandler)
return adapter, func() { cancel() }, nil
}
return adapter, func() {}, nil
})
```
### 3. 添加回调路由
### 3. 前端添加平台选项
`internal/router/router.go` 中注册 HTTP 端点
`IMChannelPanel.vue`
- 添加平台 radio 选项
- 添加该平台的凭证表单字段
```go
im.POST("/callback/dingtalk", imHandler.DingTalkCallback)
```
在 i18n 文件中添加平台名称翻译。
Service 层 (`im.Service`) 不需要任何修改 — 消息编排、会话管理、QA 调度、流式控制全部由 Service 统一处理。
Service 层 (`im.Service`) 不需要任何修改 — 渠道管理、消息编排、会话管理、QA 调度、流式控制全部由 Service 统一处理。

View File

@@ -175,3 +175,39 @@ export interface PlaceholdersResponse {
export function getPlaceholders() {
return get<{ data: PlaceholdersResponse }>('/api/v1/agents/placeholders');
}
// ===== IM渠道 =====
export interface IMChannel {
id: string;
tenant_id?: number;
agent_id: string;
platform: 'wecom' | 'feishu';
name: string;
enabled: boolean;
mode: 'webhook' | 'websocket';
output_mode: 'stream' | 'full';
credentials: Record<string, any>;
created_at?: string;
updated_at?: string;
}
export function listIMChannels(agentId: string) {
return get<{ data: IMChannel[] }>(`/api/v1/agents/${agentId}/im-channels`);
}
export function createIMChannel(agentId: string, data: Partial<IMChannel>) {
return post<{ data: IMChannel }>(`/api/v1/agents/${agentId}/im-channels`, data);
}
export function updateIMChannel(id: string, data: Partial<IMChannel>) {
return put<{ data: IMChannel }>(`/api/v1/im-channels/${id}`, data);
}
export function deleteIMChannel(id: string) {
return del<{ success: boolean }>(`/api/v1/im-channels/${id}`);
}
export function toggleIMChannel(id: string) {
return post<{ data: IMChannel }>(`/api/v1/im-channels/${id}/toggle`);
}

View File

@@ -0,0 +1,524 @@
<template>
<div class="section-content">
<!-- Channel list header -->
<div class="channels-section">
<div class="channels-header">
<span class="channels-title">{{ $t('agentEditor.im.addChannel') }}</span>
<span class="channels-count">{{ channels.length }}</span>
</div>
<div v-if="loading" class="channels-loading">
<t-loading size="small" />
<span>{{ $t('common.loading') }}</span>
</div>
<div v-else-if="channels.length === 0" class="channels-empty">
<t-icon name="chat-message" class="empty-icon" />
<span>{{ $t('agentEditor.im.empty') }}</span>
</div>
<div v-else class="channels-list">
<div v-for="channel in channels" :key="channel.id" class="channel-item">
<div class="channel-info">
<div class="channel-info-top">
<div class="channel-main">
<span class="platform-badge" :class="channel.platform">
{{ channel.platform === 'wecom' ? $t('agentEditor.im.wecom') : $t('agentEditor.im.feishu') }}
</span>
<span class="channel-name">{{ channel.name || $t('agentEditor.im.unnamed') }}</span>
</div>
</div>
<div class="channel-meta">
<span class="meta-tag">
<t-icon name="link" class="meta-icon" />
{{ channel.mode }}
</span>
<span class="meta-tag">
<t-icon name="play-circle" class="meta-icon" />
{{ channel.output_mode === 'stream' ? $t('agentEditor.im.outputStream') : $t('agentEditor.im.outputFull') }}
</span>
</div>
<div v-if="channel.mode === 'webhook'" class="callback-url-row">
<span class="url-label">{{ $t('agentEditor.im.callbackUrl') }}:</span>
<code class="url-value">{{ getCallbackUrl(channel) }}</code>
<t-button theme="default" size="small" variant="text" @click="copyUrl(channel)">
<t-icon name="file-copy" />
</t-button>
</div>
</div>
<div class="channel-actions">
<t-switch
:value="channel.enabled"
size="small"
@change="handleToggle(channel)"
/>
<t-button variant="text" theme="default" size="small" @click="editChannel(channel)">
<t-icon name="edit" />
</t-button>
<t-popconfirm :content="$t('agentEditor.im.deleteConfirm')" @confirm="handleDelete(channel.id)">
<t-button variant="text" theme="danger" size="small">
<t-icon name="delete" />
</t-button>
</t-popconfirm>
</div>
</div>
</div>
</div>
<!-- Add button -->
<t-button theme="default" variant="dashed" block @click="showCreateDialog = true" class="add-btn">
<t-icon name="add" />
{{ $t('agentEditor.im.addChannel') }}
</t-button>
<!-- Create/Edit dialog -->
<t-dialog
v-model:visible="showCreateDialog"
:header="editingChannel ? $t('agentEditor.im.editChannel') : $t('agentEditor.im.addChannel')"
:confirm-btn="$t('common.save')"
:cancel-btn="$t('common.cancel')"
@confirm="handleSave"
@close="resetForm"
width="560px"
>
<div class="dialog-form">
<!-- Platform -->
<div class="form-item">
<label class="form-label">{{ $t('agentEditor.im.platform') }}</label>
<t-radio-group v-model="formData.platform" :disabled="!!editingChannel">
<t-radio-button value="wecom">{{ $t('agentEditor.im.wecom') }}</t-radio-button>
<t-radio-button value="feishu">{{ $t('agentEditor.im.feishu') }}</t-radio-button>
</t-radio-group>
</div>
<!-- Name -->
<div class="form-item">
<label class="form-label">{{ $t('agentEditor.im.channelName') }}</label>
<t-input v-model="formData.name" :placeholder="$t('agentEditor.im.channelNamePlaceholder')" />
</div>
<!-- Mode -->
<div class="form-item">
<label class="form-label">{{ $t('agentEditor.im.mode') }}</label>
<t-radio-group v-model="formData.mode">
<t-radio-button value="websocket">WebSocket</t-radio-button>
<t-radio-button value="webhook">Webhook</t-radio-button>
</t-radio-group>
</div>
<!-- Output mode -->
<div class="form-item">
<label class="form-label">{{ $t('agentEditor.im.outputMode') }}</label>
<t-radio-group v-model="formData.output_mode">
<t-radio-button value="stream">{{ $t('agentEditor.im.outputStream') }}</t-radio-button>
<t-radio-button value="full">{{ $t('agentEditor.im.outputFull') }}</t-radio-button>
</t-radio-group>
</div>
<!-- Credentials divider -->
<div class="form-divider"></div>
<!-- WeCom credentials -->
<template v-if="formData.platform === 'wecom'">
<template v-if="formData.mode === 'websocket'">
<div class="form-item">
<label class="form-label">Bot ID</label>
<t-input v-model="formData.credentials.bot_id" placeholder="Bot ID" />
</div>
<div class="form-item">
<label class="form-label">Bot Secret</label>
<t-input v-model="formData.credentials.bot_secret" type="password" placeholder="Bot Secret" />
</div>
</template>
<template v-else>
<div class="form-item">
<label class="form-label">Corp ID</label>
<t-input v-model="formData.credentials.corp_id" placeholder="Corp ID" />
</div>
<div class="form-item">
<label class="form-label">Agent Secret</label>
<t-input v-model="formData.credentials.agent_secret" type="password" placeholder="Agent Secret" />
</div>
<div class="form-item">
<label class="form-label">Token</label>
<t-input v-model="formData.credentials.token" placeholder="Token" />
</div>
<div class="form-item">
<label class="form-label">EncodingAESKey</label>
<t-input v-model="formData.credentials.encoding_aes_key" placeholder="EncodingAESKey" />
</div>
<div class="form-item">
<label class="form-label">Corp Agent ID</label>
<t-input-number v-model="formData.credentials.corp_agent_id" placeholder="Corp Agent ID" style="width: 100%;" />
</div>
</template>
</template>
<!-- Feishu credentials -->
<template v-if="formData.platform === 'feishu'">
<div class="form-item">
<label class="form-label">App ID</label>
<t-input v-model="formData.credentials.app_id" placeholder="App ID" />
</div>
<div class="form-item">
<label class="form-label">App Secret</label>
<t-input v-model="formData.credentials.app_secret" type="password" placeholder="App Secret" />
</div>
<template v-if="formData.mode === 'webhook'">
<div class="form-item">
<label class="form-label">Verification Token</label>
<t-input v-model="formData.credentials.verification_token" placeholder="Verification Token" />
</div>
<div class="form-item">
<label class="form-label">Encrypt Key</label>
<t-input v-model="formData.credentials.encrypt_key" type="password" placeholder="Encrypt Key" />
</div>
</template>
</template>
</div>
</t-dialog>
</div>
</template>
<script setup lang="ts">
import { ref, onMounted } from 'vue';
import { useI18n } from 'vue-i18n';
import { MessagePlugin } from 'tdesign-vue-next';
import { listIMChannels, createIMChannel, updateIMChannel, deleteIMChannel, toggleIMChannel } from '@/api/agent';
import type { IMChannel } from '@/api/agent';
const { t } = useI18n();
const props = defineProps<{
agentId: string;
}>();
const channels = ref<IMChannel[]>([]);
const loading = ref(false);
const showCreateDialog = ref(false);
const editingChannel = ref<IMChannel | null>(null);
const defaultCredentials = (): Record<string, any> => ({});
const formData = ref({
platform: 'wecom' as 'wecom' | 'feishu',
name: '',
mode: 'websocket' as 'webhook' | 'websocket',
output_mode: 'stream' as 'stream' | 'full',
credentials: defaultCredentials(),
});
async function loadChannels() {
loading.value = true;
try {
const res = await listIMChannels(props.agentId);
channels.value = res.data || [];
} catch {
channels.value = [];
} finally {
loading.value = false;
}
}
function getCallbackUrl(channel: IMChannel): string {
const base = window.location.origin;
return `${base}/api/v1/im/callback/${channel.id}`;
}
async function copyUrl(channel: IMChannel) {
try {
await navigator.clipboard.writeText(getCallbackUrl(channel));
MessagePlugin.success(t('common.copySuccess'));
} catch {
MessagePlugin.error(t('common.copyFailed'));
}
}
function editChannel(channel: IMChannel) {
editingChannel.value = channel;
formData.value = {
platform: channel.platform,
name: channel.name,
mode: channel.mode,
output_mode: channel.output_mode,
credentials: { ...channel.credentials },
};
showCreateDialog.value = true;
}
function resetForm() {
editingChannel.value = null;
formData.value = {
platform: 'wecom',
name: '',
mode: 'websocket',
output_mode: 'stream',
credentials: defaultCredentials(),
};
}
async function handleSave() {
try {
if (editingChannel.value) {
await updateIMChannel(editingChannel.value.id, {
name: formData.value.name,
mode: formData.value.mode,
output_mode: formData.value.output_mode,
credentials: formData.value.credentials,
});
MessagePlugin.success(t('common.updateSuccess'));
} else {
await createIMChannel(props.agentId, {
platform: formData.value.platform,
name: formData.value.name,
mode: formData.value.mode,
output_mode: formData.value.output_mode,
credentials: formData.value.credentials,
});
MessagePlugin.success(t('common.createSuccess'));
}
showCreateDialog.value = false;
resetForm();
await loadChannels();
} catch (e: any) {
MessagePlugin.error(e?.message || t('common.operationFailed'));
}
}
async function handleToggle(channel: IMChannel) {
try {
await toggleIMChannel(channel.id);
await loadChannels();
} catch (e: any) {
MessagePlugin.error(e?.message || t('common.operationFailed'));
}
}
async function handleDelete(id: string) {
try {
await deleteIMChannel(id);
MessagePlugin.success(t('common.deleteSuccess'));
await loadChannels();
} catch (e: any) {
MessagePlugin.error(e?.message || t('common.operationFailed'));
}
}
onMounted(() => {
loadChannels();
});
</script>
<style scoped lang="less">
.section-content {
display: flex;
flex-direction: column;
gap: 16px;
}
// --- Channel list section (matches AgentShareSettings pattern) ---
.channels-section {
margin-bottom: 8px;
}
.channels-header {
display: flex;
align-items: center;
gap: 8px;
margin-bottom: 16px;
.channels-title {
font-size: 14px;
font-weight: 500;
color: var(--td-text-color-primary);
}
.channels-count {
padding: 2px 8px;
background: var(--td-bg-color-secondarycontainer);
border-radius: 10px;
font-size: 12px;
color: var(--td-text-color-disabled);
}
}
.channels-loading {
display: flex;
align-items: center;
justify-content: center;
gap: 8px;
padding: 32px;
color: var(--td-text-color-disabled);
font-size: 14px;
}
.channels-empty {
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
gap: 12px;
padding: 40px 20px;
background: var(--td-bg-color-secondarycontainer);
border-radius: 8px;
color: var(--td-text-color-disabled);
.empty-icon {
font-size: 32px;
opacity: 0.5;
}
}
.channels-list {
display: flex;
flex-direction: column;
gap: 10px;
max-height: 400px;
overflow-y: auto;
}
// --- Channel card (matches share-item pattern) ---
.channel-item {
display: flex;
justify-content: space-between;
align-items: center;
gap: 12px;
padding: 14px 16px;
background: var(--td-bg-color-secondarycontainer);
border: 1px solid var(--td-component-stroke);
border-radius: 8px;
transition: background 0.2s ease, border-color 0.2s ease;
&:hover {
border-color: var(--td-brand-color-focus);
}
}
.channel-info {
flex: 1;
min-width: 0;
display: flex;
flex-direction: column;
gap: 8px;
}
.channel-info-top {
display: flex;
align-items: center;
gap: 12px;
}
.channel-main {
display: flex;
align-items: center;
gap: 8px;
}
.platform-badge {
display: inline-block;
padding: 2px 8px;
border-radius: 4px;
font-size: 12px;
font-weight: 500;
line-height: 18px;
&.wecom {
background: rgba(7, 193, 96, 0.08);
color: #07c160;
}
&.feishu {
background: rgba(51, 112, 255, 0.08);
color: #3370ff;
}
}
.channel-name {
font-size: 14px;
font-weight: 500;
color: var(--td-text-color-primary);
}
.channel-meta {
display: flex;
align-items: center;
gap: 6px;
font-size: 12px;
color: var(--td-text-color-placeholder);
.meta-tag {
display: inline-flex;
align-items: center;
gap: 3px;
padding: 2px 6px;
background: var(--td-bg-color-secondarycontainer);
border-radius: 4px;
}
.meta-icon {
font-size: 12px;
flex-shrink: 0;
}
}
.callback-url-row {
display: flex;
align-items: center;
gap: 8px;
font-size: 12px;
padding-top: 4px;
border-top: 1px dashed var(--td-component-stroke);
.url-label {
color: var(--td-text-color-secondary);
white-space: nowrap;
}
.url-value {
background: var(--td-bg-color-container);
padding: 2px 8px;
border-radius: 4px;
font-size: 11px;
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
flex: 1;
min-width: 0;
}
}
.channel-actions {
display: flex;
align-items: center;
gap: 6px;
flex-shrink: 0;
}
// --- Add button ---
.add-btn {
margin-top: 4px;
}
// --- Dialog form (matches share form pattern) ---
.dialog-form {
display: flex;
flex-direction: column;
gap: 16px;
}
.form-item {
.form-label {
display: block;
margin-bottom: 8px;
font-size: 14px;
font-weight: 500;
color: var(--td-text-color-primary);
}
}
.form-divider {
height: 1px;
background: var(--td-component-stroke);
margin: 4px 0;
}
</style>

View File

@@ -3252,6 +3252,25 @@ export default {
dataSchemaDesc: 'Get metadata of tabular files',
requiresKb: '(requires knowledge base configuration)',
},
im: {
title: 'IM Integration',
description: 'Connect agent to instant messaging platforms like WeCom and Feishu',
wecom: 'WeCom',
feishu: 'Feishu',
addChannel: 'Add Channel',
editChannel: 'Edit Channel',
deleteConfirm: 'Are you sure you want to delete this channel? This action cannot be undone.',
channelName: 'Channel Name',
channelNamePlaceholder: 'Enter a name for easy identification',
platform: 'Platform',
mode: 'Connection Mode',
outputMode: 'Output Mode',
outputStream: 'Streaming',
outputFull: 'Full Response',
callbackUrl: 'Callback URL',
empty: 'No IM channels yet. Click the button below to add one.',
unnamed: 'Unnamed Channel',
},
mcp: {
label: 'MCP Services',
desc: 'Select MCP services available to the Agent',

View File

@@ -3052,7 +3052,7 @@ export default {
permissionEditable: "편집 가능",
sharedKBs: "지식베이스",
sharedAgents: "에이전트",
},
}
},
preview: {
tab: '미리보기',
@@ -3282,6 +3282,25 @@ export default {
fallbackResponse: '응답할 수 없을 때 반환할 고정 텍스트',
fallbackPrompt: '지식베이스에서 답변을 찾을 수 없을 때 모델 응답을 유도하는 프롬프트',
},
im: {
title: "IM 통합",
description: "에이전트를 WeCom, Feishu 등 인스턴트 메시징 플랫폼에 연결",
wecom: "WeCom",
feishu: "Feishu",
addChannel: "채널 추가",
editChannel: "채널 편집",
deleteConfirm: "이 채널을 삭제하시겠습니까? 이 작업은 되돌릴 수 없습니다.",
channelName: "채널 이름",
channelNamePlaceholder: "식별하기 쉬운 이름을 입력하세요",
platform: "플랫폼",
mode: "연결 모드",
outputMode: "출력 모드",
outputStream: "스트리밍",
outputFull: "전체 응답",
callbackUrl: "콜백 URL",
empty: "IM 채널이 없습니다. 아래 버튼을 클릭하여 추가하세요.",
unnamed: "이름 없는 채널",
},
tools: {
thinking: '사고',
thinkingDesc: '동적이고 반성적인 문제 해결 사고 도구',

View File

@@ -2835,6 +2835,25 @@ export default {
dataSchemaDesc: 'Получение метаинформации табличных файлов',
requiresKb: '(требуется настройка базы знаний)'
},
im: {
title: 'Интеграция IM',
description: 'Подключите агента к платформам мгновенных сообщений, таким как WeCom и Feishu',
wecom: 'WeCom',
feishu: 'Feishu',
addChannel: 'Добавить канал',
editChannel: 'Редактировать канал',
deleteConfirm: 'Вы уверены, что хотите удалить этот канал? Это действие не может быть отменено.',
channelName: 'Имя канала',
channelNamePlaceholder: 'Введите имя для легкой идентификации',
platform: 'Платформа',
mode: 'Режим подключения',
outputMode: 'Режим вывода',
outputStream: 'Стриминг',
outputFull: 'Полное выходное значение',
callbackUrl: 'URL обратного вызова',
empty: 'Нет IM каналов. Нажмите кнопку ниже, чтобы добавить один.',
unnamed: 'Неименованный канал'
},
mcp: {
label: 'MCP-сервисы',
desc: 'Выберите MCP-сервисы, доступные агенту',

View File

@@ -3227,6 +3227,25 @@ export default {
fallbackResponse: "当无法回答时返回的固定文本",
fallbackPrompt: "当无法从知识库找到答案时,引导模型生成回复的提示词",
},
im: {
title: "IM 集成",
description: "将智能体接入即时通讯平台,支持企业微信和飞书",
wecom: "企业微信",
feishu: "飞书",
addChannel: "添加渠道",
editChannel: "编辑渠道",
deleteConfirm: "确定删除该渠道?删除后无法恢复。",
channelName: "渠道名称",
channelNamePlaceholder: "输入渠道名称,方便辨识",
platform: "平台",
mode: "接入模式",
outputMode: "输出模式",
outputStream: "流式输出",
outputFull: "完整输出",
callbackUrl: "回调地址",
empty: "暂无 IM 渠道,点击下方按钮添加",
unnamed: "未命名渠道",
},
tools: {
thinking: "思考",
thinkingDesc: "动态和反思性的问题解决思考工具",

View File

@@ -1115,6 +1115,17 @@
<div v-if="props.mode === 'edit' && props.agent?.id && !props.agent?.is_builtin" v-show="currentSection === 'share'" class="section">
<AgentShareSettings :agent-id="props.agent.id" :agent="props.agent" />
</div>
<!-- IM集成仅编辑模式 -->
<div v-if="props.mode === 'edit' && props.agent?.id" v-show="currentSection === 'im'" class="section">
<div class="section-header">
<h2>{{ $t('agentEditor.im.title') }}</h2>
<p class="section-description">{{ $t('agentEditor.im.description') }}</p>
</div>
<div class="settings-group">
<IMChannelPanel :agent-id="props.agent.id" />
</div>
</div>
</div>
<!-- 底部操作栏 -->
@@ -1146,6 +1157,7 @@ import AgentAvatar from '@/components/AgentAvatar.vue';
import PromptTemplateSelector from '@/components/PromptTemplateSelector.vue';
import ModelSelector from '@/components/ModelSelector.vue';
import AgentShareSettings from '@/components/AgentShareSettings.vue';
import IMChannelPanel from '@/components/IMChannelPanel.vue';
const uiStore = useUIStore();
const orgStore = useOrganizationStore();
@@ -1378,6 +1390,10 @@ const navItems = computed(() => {
if (props.mode === 'edit' && props.agent?.id && !props.agent?.is_builtin) {
items.push({ key: 'share', icon: 'share', label: t('knowledgeEditor.sidebar.share') });
}
// IM集成仅编辑模式创建时Agent还没有ID
if (props.mode === 'edit' && props.agent?.id) {
items.push({ key: 'im', icon: 'chat-message', label: t('agentEditor.im.title') });
}
return items;
});

View File

@@ -27,50 +27,6 @@ type Config struct {
ExtractManager *ExtractManagerConfig `yaml:"extract" json:"extract"`
WebSearch *WebSearchConfig `yaml:"web_search" json:"web_search"`
PromptTemplates *PromptTemplatesConfig `yaml:"prompt_templates" json:"prompt_templates"`
IM *IMConfig `yaml:"im" json:"im"`
}
// IMConfig IM 集成配置
type IMConfig struct {
WeCom *WeComIMConfig `yaml:"wecom" json:"wecom"`
Feishu *FeishuIMConfig `yaml:"feishu" json:"feishu"`
}
// WeComIMConfig 企业微信配置
type WeComIMConfig struct {
Enabled bool `yaml:"enabled" json:"enabled"`
TenantID uint64 `yaml:"tenant_id" json:"tenant_id"`
AgentID string `yaml:"agent_id" json:"agent_id"`
KnowledgeBases []string `yaml:"knowledge_base_ids" json:"knowledge_base_ids"`
// Mode: "webhook" (default, requires public domain) or "websocket" (long connection via intelligent bot, no public domain needed)
Mode string `yaml:"mode" json:"mode"`
// OutputMode: "stream" (default, real-time streaming) or "full" (wait for complete answer then send)
OutputMode string `yaml:"output_mode" json:"output_mode"`
// --- Webhook mode fields (self-built app callback) ---
CorpID string `yaml:"corp_id" json:"corp_id"`
AgentSecret string `yaml:"agent_secret" json:"agent_secret"`
Token string `yaml:"token" json:"token"`
EncodingAESKey string `yaml:"encoding_aes_key" json:"encoding_aes_key"`
CorpAgentID int `yaml:"corp_agent_id" json:"corp_agent_id"`
// --- WebSocket mode fields (intelligent bot long connection) ---
BotID string `yaml:"bot_id" json:"bot_id"`
BotSecret string `yaml:"bot_secret" json:"bot_secret"`
}
// FeishuIMConfig 飞书配置
type FeishuIMConfig struct {
Enabled bool `yaml:"enabled" json:"enabled"`
TenantID uint64 `yaml:"tenant_id" json:"tenant_id"`
AgentID string `yaml:"agent_id" json:"agent_id"`
KnowledgeBases []string `yaml:"knowledge_base_ids" json:"knowledge_base_ids"`
AppID string `yaml:"app_id" json:"app_id"`
AppSecret string `yaml:"app_secret" json:"app_secret"`
VerificationToken string `yaml:"verification_token" json:"verification_token"`
EncryptKey string `yaml:"encrypt_key" json:"encrypt_key"`
// Mode: "websocket" (default, long connection, no public domain needed) or "webhook" (requires public domain)
Mode string `yaml:"mode" json:"mode"`
// OutputMode: "stream" (default, real-time streaming) or "full" (wait for complete answer then send)
OutputMode string `yaml:"output_mode" json:"output_mode"`
}
// DocReaderConfig configures the document parser client (gRPC or HTTP).

View File

@@ -6,6 +6,7 @@ package container
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"net/url"
"os"
@@ -242,7 +243,7 @@ func BuildContainer(container *dig.Container) *dig.Container {
// IM integration
logger.Debugf(ctx, "[Container] Registering IM integration...")
must(container.Provide(imPkg.NewService))
must(container.Invoke(registerIMAdapters))
must(container.Invoke(registerIMAdapterFactories))
must(container.Provide(handler.NewIMHandler))
logger.Debugf(ctx, "[Container] HTTP handlers registered")
@@ -947,122 +948,133 @@ func registerWebSearchProviders(registry *web_search.Registry) {
})
}
// registerIMAdapters registers IM platform adapters based on configuration.
// For "websocket" mode, it also starts a long connection client in a goroutine.
func registerIMAdapters(cfg *config.Config, imService *imPkg.Service) {
if cfg.IM == nil {
logger.Infof(context.Background(), "[IM] No IM configuration found, skipping adapter registration")
return
}
// registerIMAdapterFactories registers adapter factories for each IM platform
// and loads enabled channels from the database.
func registerIMAdapterFactories(imService *imPkg.Service) {
ctx := context.Background()
// Register WeCom
if cfg.IM.WeCom != nil && cfg.IM.WeCom.Enabled {
registerWeComAdapter(ctx, cfg.IM.WeCom, imService)
if cfg.IM.WeCom.OutputMode == "full" {
imService.SetStreamDisabled(imPkg.PlatformWeCom, true)
logger.Infof(ctx, "[IM] WeCom streaming disabled (output_mode=full)")
}
}
// Register Feishu
if cfg.IM.Feishu != nil && cfg.IM.Feishu.Enabled {
registerFeishuAdapter(ctx, cfg.IM.Feishu, imService)
if cfg.IM.Feishu.OutputMode == "full" {
imService.SetStreamDisabled(imPkg.PlatformFeishu, true)
logger.Infof(ctx, "[IM] Feishu streaming disabled (output_mode=full)")
}
}
}
func registerWeComAdapter(ctx context.Context, cfg *config.WeComIMConfig, imService *imPkg.Service) {
mode := cfg.Mode
if mode == "" {
mode = "websocket"
}
switch mode {
case "webhook":
adapter, err := wecom.NewWebhookAdapter(
cfg.CorpID,
cfg.AgentSecret,
cfg.Token,
cfg.EncodingAESKey,
cfg.CorpAgentID,
)
// Register WeCom adapter factory
imService.RegisterAdapterFactory("wecom", func(factoryCtx context.Context, channel *imPkg.IMChannel, msgHandler func(context.Context, *imPkg.IncomingMessage) error) (imPkg.Adapter, context.CancelFunc, error) {
creds, err := parseCredentials(channel.Credentials)
if err != nil {
logger.Warnf(ctx, "[IM] Failed to create WeCom webhook adapter: %v", err)
return
}
imService.RegisterAdapter(adapter)
logger.Infof(ctx, "[IM] WeCom adapter registered (mode=webhook, corp_id=%s)", cfg.CorpID)
case "websocket":
// Build the message handler that delegates to imService.HandleMessage
handler := func(msgCtx context.Context, msg *imPkg.IncomingMessage) error {
return imService.HandleMessage(msgCtx, msg, cfg.TenantID, cfg.AgentID, cfg.KnowledgeBases)
return nil, nil, fmt.Errorf("parse wecom credentials: %w", err)
}
client := wecom.NewLongConnClient(cfg.BotID, cfg.BotSecret, handler)
mode := channel.Mode
if mode == "" {
mode = "websocket"
}
// Register a BotAdapter so the service can send replies via WebSocket
imService.RegisterAdapter(wecom.NewWSAdapter(client))
logger.Infof(ctx, "[IM] WeCom adapter registered (mode=websocket, bot_id=%s)", cfg.BotID)
// Start the long connection in a goroutine
go func() {
if err := client.Start(context.Background()); err != nil {
logger.Errorf(context.Background(), "[IM] WeCom long connection stopped: %v", err)
switch mode {
case "webhook":
corpAgentID := 0
if v, ok := creds["corp_agent_id"]; ok {
switch val := v.(type) {
case float64:
corpAgentID = int(val)
case int:
corpAgentID = val
}
}
}()
adapter, err := wecom.NewWebhookAdapter(
getString(creds, "corp_id"),
getString(creds, "agent_secret"),
getString(creds, "token"),
getString(creds, "encoding_aes_key"),
corpAgentID,
)
if err != nil {
return nil, nil, err
}
return adapter, nil, nil
default:
logger.Warnf(ctx, "[IM] Unknown WeCom mode: %s (expected 'webhook' or 'websocket')", mode)
case "websocket":
client := wecom.NewLongConnClient(
getString(creds, "bot_id"),
getString(creds, "bot_secret"),
msgHandler,
)
wsCtx, wsCancel := context.WithCancel(context.Background())
go func() {
if err := client.Start(wsCtx); err != nil && wsCtx.Err() == nil {
logger.Errorf(context.Background(), "[IM] WeCom long connection stopped for channel %s: %v", channel.ID, err)
}
}()
adapter := wecom.NewWSAdapter(client)
return adapter, wsCancel, nil
default:
return nil, nil, fmt.Errorf("unknown WeCom mode: %s", mode)
}
})
// Register Feishu adapter factory
imService.RegisterAdapterFactory("feishu", func(factoryCtx context.Context, channel *imPkg.IMChannel, msgHandler func(context.Context, *imPkg.IncomingMessage) error) (imPkg.Adapter, context.CancelFunc, error) {
creds, err := parseCredentials(channel.Credentials)
if err != nil {
return nil, nil, fmt.Errorf("parse feishu credentials: %w", err)
}
appID := getString(creds, "app_id")
appSecret := getString(creds, "app_secret")
verificationToken := getString(creds, "verification_token")
encryptKey := getString(creds, "encrypt_key")
// Always create the HTTP adapter (needed for SendReply in both modes)
adapter := feishu.NewAdapter(appID, appSecret, verificationToken, encryptKey)
mode := channel.Mode
if mode == "" {
mode = "websocket"
}
switch mode {
case "webhook":
return adapter, nil, nil
case "websocket":
client := feishu.NewLongConnClient(appID, appSecret, msgHandler)
wsCtx, wsCancel := context.WithCancel(context.Background())
go func() {
if err := client.Start(wsCtx); err != nil && wsCtx.Err() == nil {
logger.Errorf(context.Background(), "[IM] Feishu long connection stopped for channel %s: %v", channel.ID, err)
}
}()
return adapter, wsCancel, nil
default:
return nil, nil, fmt.Errorf("unknown Feishu mode: %s", mode)
}
})
// Load and start all enabled channels from database
if err := imService.LoadAndStartChannels(); err != nil {
logger.Warnf(ctx, "[IM] Failed to load channels from database: %v", err)
}
}
func registerFeishuAdapter(ctx context.Context, cfg *config.FeishuIMConfig, imService *imPkg.Service) {
mode := cfg.Mode
if mode == "" {
mode = "websocket"
// parseCredentials parses the JSONB credentials field into a map.
func parseCredentials(data []byte) (map[string]interface{}, error) {
if len(data) == 0 {
return map[string]interface{}{}, nil
}
// Always register the HTTP adapter (needed for SendReply in both modes)
adapter := feishu.NewAdapter(
cfg.AppID,
cfg.AppSecret,
cfg.VerificationToken,
cfg.EncryptKey,
)
imService.RegisterAdapter(adapter)
switch mode {
case "webhook":
logger.Infof(ctx, "[IM] Feishu adapter registered (mode=webhook, app_id=%s)", cfg.AppID)
case "websocket":
logger.Infof(ctx, "[IM] Feishu adapter registered (mode=websocket, app_id=%s)", cfg.AppID)
// Build the message handler
handler := func(msgCtx context.Context, msg *imPkg.IncomingMessage) error {
return imService.HandleMessage(msgCtx, msg, cfg.TenantID, cfg.AgentID, cfg.KnowledgeBases)
}
client := feishu.NewLongConnClient(
cfg.AppID,
cfg.AppSecret,
handler,
)
// Start the long connection in a goroutine
go func() {
if err := client.Start(context.Background()); err != nil {
logger.Errorf(context.Background(), "[IM] Feishu long connection stopped: %v", err)
}
}()
default:
logger.Warnf(ctx, "[IM] Unknown Feishu mode: %s (expected 'webhook' or 'websocket')", mode)
var creds map[string]interface{}
if err := json.Unmarshal(data, &creds); err != nil {
return nil, err
}
return creds, nil
}
// getString safely extracts a string value from a credentials map.
func getString(creds map[string]interface{}, key string) string {
if v, ok := creds[key]; ok {
if s, ok := v.(string); ok {
return s
}
}
return ""
}

View File

@@ -4,45 +4,233 @@ import (
"context"
"net/http"
"github.com/Tencent/WeKnora/internal/config"
"github.com/Tencent/WeKnora/internal/im"
"github.com/Tencent/WeKnora/internal/logger"
"github.com/Tencent/WeKnora/internal/types"
"github.com/gin-gonic/gin"
)
// IMHandler handles IM platform callback requests.
// IMHandler handles IM platform callback requests and channel CRUD.
type IMHandler struct {
imService *im.Service
config *config.Config
}
// NewIMHandler creates a new IM handler.
func NewIMHandler(imService *im.Service, cfg *config.Config) *IMHandler {
func NewIMHandler(imService *im.Service) *IMHandler {
return &IMHandler{
imService: imService,
config: cfg,
}
}
// WeComCallback handles WeCom callback requests (both URL verification and message events).
func (h *IMHandler) WeComCallback(c *gin.Context) {
ctx := c.Request.Context()
// ── Channel CRUD handlers ──
adapter, ok := h.imService.GetAdapter(im.PlatformWeCom)
if !ok {
logger.Error(ctx, "[IM] WeCom adapter not registered")
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "WeCom integration not enabled"})
// CreateIMChannel creates a new IM channel for an agent.
func (h *IMHandler) CreateIMChannel(c *gin.Context) {
agentID := c.Param("id")
if agentID == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "agent_id is required"})
return
}
// Handle URL verification (GET request)
tenantID, ok := c.Request.Context().Value(types.TenantIDContextKey).(uint64)
if !ok {
c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized"})
return
}
var req struct {
Platform string `json:"platform" binding:"required"`
Name string `json:"name"`
Mode string `json:"mode"`
OutputMode string `json:"output_mode"`
Credentials types.JSON `json:"credentials"`
Enabled *bool `json:"enabled"`
}
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
if req.Platform != "wecom" && req.Platform != "feishu" {
c.JSON(http.StatusBadRequest, gin.H{"error": "platform must be 'wecom' or 'feishu'"})
return
}
channel := &im.IMChannel{
TenantID: tenantID,
AgentID: agentID,
Platform: req.Platform,
Name: req.Name,
Mode: req.Mode,
OutputMode: req.OutputMode,
Credentials: req.Credentials,
Enabled: true,
}
if req.Enabled != nil {
channel.Enabled = *req.Enabled
}
if channel.Mode == "" {
channel.Mode = "websocket"
}
if channel.OutputMode == "" {
channel.OutputMode = "stream"
}
if channel.Credentials == nil {
channel.Credentials = types.JSON("{}")
}
if err := h.imService.CreateChannel(channel); err != nil {
logger.Errorf(c.Request.Context(), "[IM] Create channel failed: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create channel"})
return
}
c.JSON(http.StatusOK, gin.H{"data": channel})
}
// ListIMChannels lists all IM channels for an agent.
func (h *IMHandler) ListIMChannels(c *gin.Context) {
agentID := c.Param("id")
if agentID == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "agent_id is required"})
return
}
channels, err := h.imService.ListChannelsByAgent(agentID)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list channels"})
return
}
c.JSON(http.StatusOK, gin.H{"data": channels})
}
// UpdateIMChannel updates an IM channel.
func (h *IMHandler) UpdateIMChannel(c *gin.Context) {
channelID := c.Param("id")
if channelID == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "channel id is required"})
return
}
channel, err := h.imService.GetChannelByID(channelID)
if err != nil {
c.JSON(http.StatusNotFound, gin.H{"error": "channel not found"})
return
}
var req struct {
Name *string `json:"name"`
Mode *string `json:"mode"`
OutputMode *string `json:"output_mode"`
Credentials types.JSON `json:"credentials"`
Enabled *bool `json:"enabled"`
}
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
if req.Name != nil {
channel.Name = *req.Name
}
if req.Mode != nil {
channel.Mode = *req.Mode
}
if req.OutputMode != nil {
channel.OutputMode = *req.OutputMode
}
if req.Credentials != nil {
channel.Credentials = req.Credentials
}
if req.Enabled != nil {
channel.Enabled = *req.Enabled
}
if err := h.imService.UpdateChannel(channel); err != nil {
logger.Errorf(c.Request.Context(), "[IM] Update channel failed: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update channel"})
return
}
c.JSON(http.StatusOK, gin.H{"data": channel})
}
// DeleteIMChannel deletes an IM channel.
func (h *IMHandler) DeleteIMChannel(c *gin.Context) {
channelID := c.Param("id")
if channelID == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "channel id is required"})
return
}
if err := h.imService.DeleteChannel(channelID); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete channel"})
return
}
c.JSON(http.StatusOK, gin.H{"success": true})
}
// ToggleIMChannel toggles the enabled state of an IM channel.
func (h *IMHandler) ToggleIMChannel(c *gin.Context) {
channelID := c.Param("id")
if channelID == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "channel id is required"})
return
}
channel, err := h.imService.ToggleChannel(channelID)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to toggle channel"})
return
}
c.JSON(http.StatusOK, gin.H{"data": channel})
}
// ── Callback handlers ──
// IMCallback handles IM platform callback requests for a specific channel.
// Route: POST /api/v1/im/callback/:channel_id
func (h *IMHandler) IMCallback(c *gin.Context) {
ctx := c.Request.Context()
channelID := c.Param("channel_id")
adapter, channel, ok := h.imService.GetChannelAdapter(channelID)
if !ok {
// Try loading from DB
ch, err := h.imService.GetChannelByID(channelID)
if err != nil {
logger.Errorf(ctx, "[IM] Channel not found for callback: %s", channelID)
c.JSON(http.StatusNotFound, gin.H{"error": "channel not found"})
return
}
if err := h.imService.StartChannel(ch); err != nil {
logger.Errorf(ctx, "[IM] Failed to start channel for callback: %v", err)
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "channel not available"})
return
}
adapter, channel, ok = h.imService.GetChannelAdapter(channelID)
if !ok {
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "channel not available"})
return
}
}
if !channel.Enabled {
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "channel is disabled"})
return
}
// Handle URL verification
if adapter.HandleURLVerification(c) {
return
}
// Verify callback signature
if err := adapter.VerifyCallback(c); err != nil {
logger.Errorf(ctx, "[IM] WeCom callback verification failed: %v", err)
logger.Errorf(ctx, "[IM] Callback verification failed for channel %s: %v", channelID, err)
c.JSON(http.StatusForbidden, gin.H{"error": "verification failed"})
return
}
@@ -50,83 +238,27 @@ func (h *IMHandler) WeComCallback(c *gin.Context) {
// Parse the callback message
msg, err := adapter.ParseCallback(c)
if err != nil {
logger.Errorf(ctx, "[IM] WeCom parse callback failed: %v", err)
logger.Errorf(ctx, "[IM] Parse callback failed for channel %s: %v", channelID, err)
c.JSON(http.StatusBadRequest, gin.H{"error": "parse failed"})
return
}
// If nil, it's a non-message event (e.g., system event) - just acknowledge
// If nil, it's a non-message event - just acknowledge
if msg == nil {
c.JSON(http.StatusOK, gin.H{"success": true})
return
}
// Respond immediately to avoid WeCom timeout, process asynchronously
// Respond immediately to avoid platform timeout
c.JSON(http.StatusOK, gin.H{"success": true})
// Get config for this platform
wecomCfg := h.config.IM.WeCom
// Detach from gin request context to prevent cancellation after HTTP response.
// Detach from gin request context
asyncCtx := context.WithoutCancel(ctx)
// Process message asynchronously
go func() {
if err := h.imService.HandleMessage(asyncCtx, msg, wecomCfg.TenantID, wecomCfg.AgentID, wecomCfg.KnowledgeBases); err != nil {
logger.Errorf(asyncCtx, "[IM] WeCom handle message error: %v", err)
}
}()
}
// FeishuCallback handles Feishu callback requests (both URL verification and message events).
func (h *IMHandler) FeishuCallback(c *gin.Context) {
ctx := c.Request.Context()
adapter, ok := h.imService.GetAdapter(im.PlatformFeishu)
if !ok {
logger.Error(ctx, "[IM] Feishu adapter not registered")
c.JSON(http.StatusServiceUnavailable, gin.H{"error": "Feishu integration not enabled"})
return
}
// Handle URL verification (challenge)
if adapter.HandleURLVerification(c) {
return
}
// Verify callback
if err := adapter.VerifyCallback(c); err != nil {
logger.Errorf(ctx, "[IM] Feishu callback verification failed: %v", err)
c.JSON(http.StatusForbidden, gin.H{"error": "verification failed"})
return
}
// Parse the callback message
msg, err := adapter.ParseCallback(c)
if err != nil {
logger.Errorf(ctx, "[IM] Feishu parse callback failed: %v", err)
c.JSON(http.StatusBadRequest, gin.H{"error": "parse failed"})
return
}
if msg == nil {
c.JSON(http.StatusOK, gin.H{"success": true})
return
}
// Respond immediately
c.JSON(http.StatusOK, gin.H{"success": true})
// Get config
feishuCfg := h.config.IM.Feishu
// Detach from gin request context to prevent cancellation after HTTP response.
asyncCtx := context.WithoutCancel(ctx)
// Process asynchronously
go func() {
if err := h.imService.HandleMessage(asyncCtx, msg, feishuCfg.TenantID, feishuCfg.AgentID, feishuCfg.KnowledgeBases); err != nil {
logger.Errorf(asyncCtx, "[IM] Feishu handle message error: %v", err)
if err := h.imService.HandleMessage(asyncCtx, msg, channelID); err != nil {
logger.Errorf(asyncCtx, "[IM] Handle message error for channel %s: %v", channelID, err)
}
}()
}

View File

@@ -2,7 +2,7 @@ package im
import (
"context"
"fmt"
"fmt"
"strings"
"sync"
"time"
@@ -29,6 +29,17 @@ const (
streamFlushInterval = 300 * time.Millisecond
)
// channelState holds runtime state for a running IM channel.
type channelState struct {
Channel *IMChannel
Adapter Adapter
Cancel context.CancelFunc // for stopping websocket goroutines
}
// AdapterFactory creates an Adapter from an IMChannel configuration.
// The second return value is an optional cleanup function (e.g., for stopping websocket connections).
type AdapterFactory func(ctx context.Context, channel *IMChannel, msgHandler func(ctx context.Context, msg *IncomingMessage) error) (Adapter, context.CancelFunc, error)
// Service orchestrates IM message handling:
// 1. Receives a unified IncomingMessage from an Adapter
// 2. Resolves or creates a WeKnora session for the IM channel
@@ -41,15 +52,16 @@ type Service struct {
tenantService interfaces.TenantService
agentService interfaces.CustomAgentService
adapters map[Platform]Adapter
// channels maps channel ID -> running channel state
channels map[string]*channelState
mu sync.RWMutex
// adapterFactories maps platform name -> factory function
adapterFactories map[string]AdapterFactory
// processedMsgs tracks recently processed message IDs to prevent duplicate handling.
processedMsgs sync.Map
// streamDisabled tracks platforms where streaming is explicitly disabled via output_mode config.
streamDisabled map[Platform]bool
stopCh chan struct{}
}
@@ -62,14 +74,14 @@ func NewService(
agentService interfaces.CustomAgentService,
) *Service {
s := &Service{
db: db,
sessionService: sessionService,
messageService: messageService,
tenantService: tenantService,
agentService: agentService,
adapters: make(map[Platform]Adapter),
streamDisabled: make(map[Platform]bool),
stopCh: make(chan struct{}),
db: db,
sessionService: sessionService,
messageService: messageService,
tenantService: tenantService,
agentService: agentService,
channels: make(map[string]*channelState),
adapterFactories: make(map[string]AdapterFactory),
stopCh: make(chan struct{}),
}
// Start periodic dedup cleanup instead of per-message goroutines
@@ -78,9 +90,24 @@ func NewService(
return s
}
// Stop gracefully shuts down the service, stopping background goroutines.
// RegisterAdapterFactory registers a factory for creating adapters for a given platform.
func (s *Service) RegisterAdapterFactory(platform string, factory AdapterFactory) {
s.mu.Lock()
defer s.mu.Unlock()
s.adapterFactories[platform] = factory
}
// Stop gracefully shuts down the service, stopping all channels and background goroutines.
func (s *Service) Stop() {
close(s.stopCh)
s.mu.Lock()
defer s.mu.Unlock()
for id, cs := range s.channels {
if cs.Cancel != nil {
cs.Cancel()
}
delete(s.channels, id)
}
}
// dedupCleanupLoop periodically cleans up expired entries from the dedup map.
@@ -103,33 +130,102 @@ func (s *Service) dedupCleanupLoop() {
}
}
// RegisterAdapter registers an IM platform adapter.
func (s *Service) RegisterAdapter(adapter Adapter) {
s.mu.Lock()
defer s.mu.Unlock()
s.adapters[adapter.Platform()] = adapter
// LoadAndStartChannels loads all enabled channels from the database and starts them.
func (s *Service) LoadAndStartChannels() error {
ctx := context.Background()
var channels []IMChannel
if err := s.db.Where("enabled = ? AND deleted_at IS NULL", true).Find(&channels).Error; err != nil {
return fmt.Errorf("load im channels: %w", err)
}
for i := range channels {
ch := channels[i]
if err := s.StartChannel(&ch); err != nil {
logger.Warnf(ctx, "[IM] Failed to start channel %s (%s/%s): %v", ch.ID, ch.Platform, ch.Name, err)
} else {
logger.Infof(ctx, "[IM] Started channel: id=%s platform=%s name=%s mode=%s agent=%s",
ch.ID, ch.Platform, ch.Name, ch.Mode, ch.AgentID)
}
}
logger.Infof(ctx, "[IM] Loaded %d enabled channels", len(channels))
return nil
}
// SetStreamDisabled disables streaming output for a specific platform.
// When disabled, the service will always collect the full answer before sending,
// even if the adapter implements StreamSender.
func (s *Service) SetStreamDisabled(platform Platform, disabled bool) {
// StartChannel creates and registers an adapter for the given channel.
func (s *Service) StartChannel(channel *IMChannel) error {
s.mu.Lock()
defer s.mu.Unlock()
s.streamDisabled[platform] = disabled
factory, ok := s.adapterFactories[channel.Platform]
if !ok {
s.mu.Unlock()
return fmt.Errorf("no adapter factory for platform: %s", channel.Platform)
}
// Stop existing channel if running
if existing, ok := s.channels[channel.ID]; ok {
if existing.Cancel != nil {
existing.Cancel()
}
delete(s.channels, channel.ID)
}
s.mu.Unlock()
// Build the message handler that delegates to HandleMessage with this channel's config
msgHandler := func(msgCtx context.Context, msg *IncomingMessage) error {
return s.HandleMessage(msgCtx, msg, channel.ID)
}
ctx := context.Background()
adapter, cancelFn, err := factory(ctx, channel, msgHandler)
if err != nil {
return fmt.Errorf("create adapter: %w", err)
}
s.mu.Lock()
s.channels[channel.ID] = &channelState{
Channel: channel,
Adapter: adapter,
Cancel: cancelFn,
}
s.mu.Unlock()
return nil
}
// GetAdapter returns the adapter for a given platform.
func (s *Service) GetAdapter(platform Platform) (Adapter, bool) {
// StopChannel stops and removes a running channel.
func (s *Service) StopChannel(channelID string) {
s.mu.Lock()
defer s.mu.Unlock()
if cs, ok := s.channels[channelID]; ok {
if cs.Cancel != nil {
cs.Cancel()
}
delete(s.channels, channelID)
logger.Infof(context.Background(), "[IM] Stopped channel: id=%s", channelID)
}
}
// GetChannelAdapter returns the adapter and channel config for a given channel ID.
func (s *Service) GetChannelAdapter(channelID string) (Adapter, *IMChannel, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
a, ok := s.adapters[platform]
return a, ok
cs, ok := s.channels[channelID]
if !ok {
return nil, nil, false
}
return cs.Adapter, cs.Channel, true
}
// HandleMessage processes an incoming IM message end-to-end:
// resolves session, runs QA, sends reply.
func (s *Service) HandleMessage(ctx context.Context, msg *IncomingMessage, tenantID uint64, agentID string, kbIDs []string) error {
// GetChannelByID loads a channel from the database.
func (s *Service) GetChannelByID(channelID string) (*IMChannel, error) {
var ch IMChannel
if err := s.db.Where("id = ? AND deleted_at IS NULL", channelID).First(&ch).Error; err != nil {
return nil, err
}
return &ch, nil
}
// HandleMessage processes an incoming IM message end-to-end using channel config.
func (s *Service) HandleMessage(ctx context.Context, msg *IncomingMessage, channelID string) error {
// Dedup: skip if this message was already processed (IM platforms may retry)
if msg.MessageID != "" {
if _, loaded := s.processedMsgs.LoadOrStore(msg.MessageID, time.Now()); loaded {
@@ -145,10 +241,31 @@ func (s *Service) HandleMessage(ctx context.Context, msg *IncomingMessage, tenan
msg.Content = string(contentRunes[:maxContentLength])
}
logger.Infof(ctx, "[IM] HandleMessage: platform=%s user=%s chat=%s content_len=%d",
msg.Platform, msg.UserID, msg.ChatID, len(msg.Content))
// Get channel config
adapter, channel, ok := s.GetChannelAdapter(channelID)
if !ok {
// Try loading from DB (channel might have been created after service start)
ch, err := s.GetChannelByID(channelID)
if err != nil {
return fmt.Errorf("channel not found: %s", channelID)
}
// Start it dynamically
if err := s.StartChannel(ch); err != nil {
return fmt.Errorf("start channel %s: %w", channelID, err)
}
adapter, channel, ok = s.GetChannelAdapter(channelID)
if !ok {
return fmt.Errorf("channel adapter not available after start: %s", channelID)
}
}
// 1. Get tenant (once, shared across resolve + QA)
tenantID := channel.TenantID
agentID := channel.AgentID
logger.Infof(ctx, "[IM] HandleMessage: channel=%s platform=%s user=%s chat=%s content_len=%d",
channelID, msg.Platform, msg.UserID, msg.ChatID, len(msg.Content))
// 1. Get tenant
tenant, err := s.tenantService.GetTenantByID(ctx, tenantID)
if err != nil {
return fmt.Errorf("get tenant: %w", err)
@@ -157,7 +274,7 @@ func (s *Service) HandleMessage(ctx context.Context, msg *IncomingMessage, tenan
sessionCtx = context.WithValue(sessionCtx, types.TenantInfoContextKey, tenant)
// 2. Resolve or create a WeKnora session
channelSession, err := s.resolveSession(sessionCtx, msg, tenantID, agentID)
channelSession, err := s.resolveSession(sessionCtx, msg, tenantID, agentID, channelID)
if err != nil {
return fmt.Errorf("resolve session: %w", err)
}
@@ -179,20 +296,17 @@ func (s *Service) HandleMessage(ctx context.Context, msg *IncomingMessage, tenan
}
}
// 5. Get the platform adapter
adapter, ok := s.GetAdapter(msg.Platform)
if !ok {
return fmt.Errorf("no adapter for platform: %s", msg.Platform)
// 5. Resolve knowledge base IDs from agent config
var kbIDs []string
if customAgent != nil {
kbIDs = customAgent.Config.KnowledgeBases
}
// 6. If the adapter supports streaming and streaming is not disabled, use streaming mode;
// otherwise collect full answer.
s.mu.RLock()
disabled := s.streamDisabled[msg.Platform]
s.mu.RUnlock()
if !disabled {
// 6. If the adapter supports streaming and output_mode is not "full", use streaming
streamDisabled := channel.OutputMode == "full"
if !streamDisabled {
if streamer, ok := adapter.(StreamSender); ok {
return s.handleMessageStream(sessionCtx, msg, session, customAgent, kbIDs, streamer)
return s.handleMessageStream(sessionCtx, msg, session, customAgent, kbIDs, streamer, adapter)
}
}
@@ -211,13 +325,14 @@ func (s *Service) HandleMessage(ctx context.Context, msg *IncomingMessage, tenan
return fmt.Errorf("send reply: %w", err)
}
logger.Infof(ctx, "[IM] Reply sent: platform=%s user=%s answer_len=%d", msg.Platform, msg.UserID, len(answer))
logger.Infof(ctx, "[IM] Reply sent: channel=%s platform=%s user=%s answer_len=%d",
channelID, msg.Platform, msg.UserID, len(answer))
return nil
}
// resolveSession finds or creates a ChannelSession for the given IM message.
// ctx must already carry TenantIDContextKey and TenantInfoContextKey.
func (s *Service) resolveSession(ctx context.Context, msg *IncomingMessage, tenantID uint64, agentID string) (*ChannelSession, error) {
func (s *Service) resolveSession(ctx context.Context, msg *IncomingMessage, tenantID uint64, agentID string, imChannelID string) (*ChannelSession, error) {
var cs ChannelSession
result := s.db.Where("platform = ? AND user_id = ? AND chat_id = ? AND tenant_id = ? AND deleted_at IS NULL",
string(msg.Platform), msg.UserID, msg.ChatID, tenantID).
@@ -251,12 +366,13 @@ func (s *Service) resolveSession(ctx context.Context, msg *IncomingMessage, tena
// Create the channel-session mapping; use a unique constraint fallback
// to handle concurrent creation attempts for the same channel.
cs = ChannelSession{
Platform: string(msg.Platform),
UserID: msg.UserID,
ChatID: msg.ChatID,
SessionID: createdSession.ID,
TenantID: tenantID,
AgentID: agentID,
Platform: string(msg.Platform),
UserID: msg.UserID,
ChatID: msg.ChatID,
SessionID: createdSession.ID,
TenantID: tenantID,
AgentID: agentID,
IMChannelID: imChannelID,
}
if err := s.db.Create(&cs).Error; err != nil {
// If the insert failed due to unique constraint (concurrent request),
@@ -279,12 +395,12 @@ func (s *Service) resolveSession(ctx context.Context, msg *IncomingMessage, tena
// handleMessageStream runs the QA pipeline and streams answer chunks to the IM platform
// in real-time via the StreamSender interface. Chunks are batched at streamFlushInterval
// to avoid API rate-limiting.
func (s *Service) handleMessageStream(ctx context.Context, msg *IncomingMessage, session *types.Session, customAgent *types.CustomAgent, kbIDs []string, streamer StreamSender) error {
func (s *Service) handleMessageStream(ctx context.Context, msg *IncomingMessage, session *types.Session, customAgent *types.CustomAgent, kbIDs []string, streamer StreamSender, adapter Adapter) error {
// Start the stream on the IM platform (e.g., create Feishu streaming card)
streamID, err := streamer.StartStream(ctx, msg)
if err != nil {
logger.Warnf(ctx, "[IM] StartStream failed, falling back to non-streaming: %v", err)
return s.fallbackNonStream(ctx, msg, session, customAgent, kbIDs)
return s.fallbackNonStream(ctx, msg, session, customAgent, kbIDs, adapter)
}
// Prepare the QA pipeline
@@ -457,18 +573,13 @@ loop:
}
// fallbackNonStream is used when streaming initialization fails.
func (s *Service) fallbackNonStream(ctx context.Context, msg *IncomingMessage, session *types.Session, customAgent *types.CustomAgent, kbIDs []string) error {
func (s *Service) fallbackNonStream(ctx context.Context, msg *IncomingMessage, session *types.Session, customAgent *types.CustomAgent, kbIDs []string, adapter Adapter) error {
answer, err := s.runQA(ctx, session, msg.Content, customAgent, kbIDs)
if err != nil {
logger.Errorf(ctx, "[IM] QA fallback failed: %v", err)
answer = "抱歉,处理您的问题时出现了异常,请稍后再试。"
}
adapter, ok := s.GetAdapter(msg.Platform)
if !ok {
return fmt.Errorf("no adapter for platform: %s", msg.Platform)
}
return adapter.SendReply(ctx, msg, &ReplyMessage{Content: answer, IsFinal: true})
}
@@ -599,3 +710,69 @@ func (s *Service) runQA(ctx context.Context, session *types.Session, query strin
return answer, nil
}
// ── CRUD operations for IM channels ──
// ListChannelsByAgent returns all channels for a given agent.
func (s *Service) ListChannelsByAgent(agentID string) ([]IMChannel, error) {
var channels []IMChannel
if err := s.db.Where("agent_id = ? AND deleted_at IS NULL", agentID).
Order("created_at DESC").Find(&channels).Error; err != nil {
return nil, err
}
return channels, nil
}
// CreateChannel creates a new IM channel and optionally starts it.
func (s *Service) CreateChannel(channel *IMChannel) error {
if err := s.db.Create(channel).Error; err != nil {
return err
}
if channel.Enabled {
if err := s.StartChannel(channel); err != nil {
logger.Warnf(context.Background(), "[IM] Created channel %s but failed to start: %v", channel.ID, err)
}
}
return nil
}
// UpdateChannel updates a channel and restarts it if needed.
func (s *Service) UpdateChannel(channel *IMChannel) error {
if err := s.db.Save(channel).Error; err != nil {
return err
}
// Restart channel: stop old, start new if enabled
s.StopChannel(channel.ID)
if channel.Enabled {
if err := s.StartChannel(channel); err != nil {
logger.Warnf(context.Background(), "[IM] Updated channel %s but failed to restart: %v", channel.ID, err)
}
}
return nil
}
// DeleteChannel soft-deletes a channel and stops it.
func (s *Service) DeleteChannel(channelID string) error {
s.StopChannel(channelID)
return s.db.Where("id = ?", channelID).Delete(&IMChannel{}).Error
}
// ToggleChannel enables or disables a channel.
func (s *Service) ToggleChannel(channelID string) (*IMChannel, error) {
var ch IMChannel
if err := s.db.Where("id = ? AND deleted_at IS NULL", channelID).First(&ch).Error; err != nil {
return nil, err
}
ch.Enabled = !ch.Enabled
if err := s.db.Save(&ch).Error; err != nil {
return nil, err
}
if ch.Enabled {
if err := s.StartChannel(&ch); err != nil {
logger.Warnf(context.Background(), "[IM] Failed to start channel %s after enable: %v", ch.ID, err)
}
} else {
s.StopChannel(channelID)
}
return &ch, nil
}

View File

@@ -8,21 +8,56 @@ import (
"gorm.io/gorm"
)
// IMChannel represents an IM channel configuration stored in the database.
// Each channel binds to an agent and contains platform-specific credentials.
type IMChannel struct {
ID string `json:"id" gorm:"type:varchar(36);primaryKey;default:uuid_generate_v4()"`
TenantID uint64 `json:"tenant_id" gorm:"not null;index:idx_im_channels_tenant"`
AgentID string `json:"agent_id" gorm:"type:varchar(36);not null;index:idx_im_channels_agent"`
Platform string `json:"platform" gorm:"type:varchar(20);not null"`
Name string `json:"name" gorm:"type:varchar(255);not null;default:''"`
Enabled bool `json:"enabled" gorm:"not null;default:true"`
Mode string `json:"mode" gorm:"type:varchar(20);not null;default:'websocket'"`
OutputMode string `json:"output_mode" gorm:"type:varchar(20);not null;default:'stream'"`
Credentials types.JSON `json:"credentials" gorm:"type:jsonb;not null;default:'{}'"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
DeletedAt gorm.DeletedAt `json:"deleted_at" gorm:"index"`
}
func (IMChannel) TableName() string {
return "im_channels"
}
func (ch *IMChannel) BeforeCreate(tx *gorm.DB) error {
if ch.ID == "" {
ch.ID = uuid.New().String()
}
if ch.Mode == "" {
ch.Mode = "websocket"
}
if ch.OutputMode == "" {
ch.OutputMode = "stream"
}
return nil
}
// ChannelSession maps an IM channel (user+chat combination) to a WeKnora session.
// This allows the IM integration to maintain conversation continuity.
type ChannelSession struct {
ID string `json:"id" gorm:"type:varchar(36);primaryKey;default:uuid_generate_v4()"`
Platform string `json:"platform" gorm:"type:varchar(20);not null"`
UserID string `json:"user_id" gorm:"type:varchar(128);not null"`
ChatID string `json:"chat_id" gorm:"type:varchar(128);not null;default:''"`
SessionID string `json:"session_id" gorm:"type:varchar(36);not null;index"`
TenantID uint64 `json:"tenant_id" gorm:"not null;index"`
AgentID string `json:"agent_id" gorm:"type:varchar(36);default:''"`
Status string `json:"status" gorm:"type:varchar(20);not null;default:'active'"`
Metadata types.JSON `json:"metadata" gorm:"type:jsonb;default:'{}'"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
DeletedAt gorm.DeletedAt `json:"deleted_at" gorm:"index"`
ID string `json:"id" gorm:"type:varchar(36);primaryKey;default:uuid_generate_v4()"`
Platform string `json:"platform" gorm:"type:varchar(20);not null"`
UserID string `json:"user_id" gorm:"type:varchar(128);not null"`
ChatID string `json:"chat_id" gorm:"type:varchar(128);not null;default:''"`
SessionID string `json:"session_id" gorm:"type:varchar(36);not null;index"`
TenantID uint64 `json:"tenant_id" gorm:"not null;index"`
AgentID string `json:"agent_id" gorm:"type:varchar(36);default:''"`
IMChannelID string `json:"im_channel_id" gorm:"type:varchar(36);default:''"`
Status string `json:"status" gorm:"type:varchar(20);not null;default:'active'"`
Metadata types.JSON `json:"metadata" gorm:"type:jsonb;default:'{}'"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
DeletedAt gorm.DeletedAt `json:"deleted_at" gorm:"index"`
}
func (ChannelSession) TableName() string {

View File

@@ -138,6 +138,7 @@ func NewRouter(params RouterParams) *gin.Engine {
RegisterCustomAgentRoutes(v1, params.CustomAgentHandler)
RegisterSkillRoutes(v1, params.SkillHandler)
RegisterOrganizationRoutes(v1, params.OrganizationHandler)
RegisterIMChannelRoutes(v1, params.IMHandler)
}
return r
@@ -586,11 +587,26 @@ func RegisterOrganizationRoutes(r *gin.RouterGroup, orgHandler *handler.Organiza
func RegisterIMRoutes(r *gin.Engine, imHandler *handler.IMHandler) {
im := r.Group("/api/v1/im")
{
// WeCom callback (supports both GET for URL verification and POST for message events)
im.GET("/callback/wecom", imHandler.WeComCallback)
im.POST("/callback/wecom", imHandler.WeComCallback)
// Feishu callback (POST for both URL verification challenge and message events)
im.POST("/callback/feishu", imHandler.FeishuCallback)
im.GET("/callback/:channel_id", imHandler.IMCallback)
im.POST("/callback/:channel_id", imHandler.IMCallback)
}
}
// RegisterIMChannelRoutes registers IM channel CRUD routes (requires authentication).
func RegisterIMChannelRoutes(r *gin.RouterGroup, imHandler *handler.IMHandler) {
// Channel CRUD under agents
agentChannels := r.Group("/agents/:id/im-channels")
{
agentChannels.POST("", imHandler.CreateIMChannel)
agentChannels.GET("", imHandler.ListIMChannels)
}
// Channel operations by channel ID
channels := r.Group("/im-channels")
{
channels.PUT("/:id", imHandler.UpdateIMChannel)
channels.DELETE("/:id", imHandler.DeleteIMChannel)
channels.POST("/:id/toggle", imHandler.ToggleIMChannel)
}
}

View File

@@ -0,0 +1,7 @@
-- Rollback: 000022_im_channels
DO $$ BEGIN RAISE NOTICE '[Migration 000022] Rolling back: im_channels'; END $$;
ALTER TABLE im_channel_sessions DROP COLUMN IF EXISTS im_channel_id;
DROP TABLE IF EXISTS im_channels;
DO $$ BEGIN RAISE NOTICE '[Migration 000022] Rollback completed'; END $$;

View File

@@ -0,0 +1,36 @@
-- Migration: 000022_im_channels
-- Description: Create IM channels table for database-driven IM integration (replaces config.yaml IM settings)
DO $$ BEGIN RAISE NOTICE '[Migration 000022] Creating table: im_channels'; END $$;
CREATE TABLE IF NOT EXISTS im_channels (
id VARCHAR(36) PRIMARY KEY DEFAULT uuid_generate_v4(),
tenant_id BIGINT NOT NULL,
agent_id VARCHAR(36) NOT NULL,
platform VARCHAR(20) NOT NULL,
name VARCHAR(255) NOT NULL DEFAULT '',
enabled BOOLEAN NOT NULL DEFAULT true,
mode VARCHAR(20) NOT NULL DEFAULT 'websocket',
output_mode VARCHAR(20) NOT NULL DEFAULT 'stream',
credentials JSONB NOT NULL DEFAULT '{}',
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
deleted_at TIMESTAMP WITH TIME ZONE
);
CREATE INDEX IF NOT EXISTS idx_im_channels_tenant ON im_channels (tenant_id);
CREATE INDEX IF NOT EXISTS idx_im_channels_agent ON im_channels (agent_id);
CREATE INDEX IF NOT EXISTS idx_im_channels_deleted ON im_channels (deleted_at) WHERE deleted_at IS NOT NULL;
COMMENT ON TABLE im_channels IS 'IM platform channel configurations bound to agents';
COMMENT ON COLUMN im_channels.agent_id IS 'Agent ID this channel is bound to';
COMMENT ON COLUMN im_channels.platform IS 'IM platform: wecom, feishu';
COMMENT ON COLUMN im_channels.name IS 'User-defined channel name for identification';
COMMENT ON COLUMN im_channels.mode IS 'Connection mode: webhook or websocket';
COMMENT ON COLUMN im_channels.output_mode IS 'Output mode: stream (real-time) or full (wait for complete answer)';
COMMENT ON COLUMN im_channels.credentials IS 'Platform credentials (JSONB): WeCom webhook={corp_id,agent_secret,token,encoding_aes_key,corp_agent_id}, WeCom ws={bot_id,bot_secret}, Feishu={app_id,app_secret,verification_token,encrypt_key}';
-- Add im_channel_id column to im_channel_sessions for linking
ALTER TABLE im_channel_sessions ADD COLUMN IF NOT EXISTS im_channel_id VARCHAR(36) DEFAULT '';
CREATE INDEX IF NOT EXISTS idx_im_channel_sessions_channel ON im_channel_sessions (im_channel_id) WHERE im_channel_id != '';
DO $$ BEGIN RAISE NOTICE '[Migration 000022] im_channels setup completed successfully!'; END $$;