feat(im): add AI card streaming for DingTalk and improve IM platform robustness

- Add DingTalk AI Card streaming support with create/deliver/update via OpenAPI
- Extract shared think-block rendering into reusable im.TransformThinkBlocks
- Add stream orphan reaper and edit throttling for DingTalk and Telegram
- Apply think-block transform on all reply paths (DingTalk, Telegram, Feishu)
- Add Card Template ID config to frontend IM channel panel with i18n
This commit is contained in:
nullkey
2026-03-24 20:12:40 +08:00
committed by lyingbug
parent a1a384ec67
commit 1bf6603271
11 changed files with 414 additions and 136 deletions

View File

@@ -277,6 +277,11 @@
<label class="form-label">Client Secret (AppSecret)</label>
<t-input v-model="formData.credentials.client_secret" type="password" placeholder="Client Secret / AppSecret" />
</div>
<div class="form-item">
<label class="form-label">{{ $t('agentEditor.im.dingtalkCardTemplateId') }}</label>
<t-input v-model="formData.credentials.card_template_id" placeholder="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.schema" />
<p class="form-hint">{{ $t('agentEditor.im.dingtalkCardTemplateIdHint') }}</p>
</div>
</template>
</div>
</t-dialog>

View File

@@ -3207,6 +3207,8 @@ export default {
slackConsole: 'Slack API Console',
telegramConsole: 'Telegram BotFather',
dingtalkConsole: 'DingTalk Open Platform',
dingtalkCardTemplateId: 'Card Template ID (optional)',
dingtalkCardTemplateIdHint: 'Create an AI Card template at open-dev.dingtalk.com/fe/card to enable streaming output with typewriter effect',
modeHint: 'WebSocket is recommended for easier setup',
consoleTip: 'to get credentials',
fileKnowledgeBase: 'File Storage Knowledge Base',

View File

@@ -3237,6 +3237,8 @@ export default {
slackConsole: "Slack API 콘솔",
telegramConsole: "Telegram BotFather",
dingtalkConsole: "DingTalk 개발 플랫폼",
dingtalkCardTemplateId: "카드 템플릿 ID (선택 사항)",
dingtalkCardTemplateIdHint: "open-dev.dingtalk.com/fe/card에서 AI 카드 템플릿을 만들면 타자기 효과 스트리밍 출력이 활성화됩니다",
modeHint: "WebSocket 방식이 설정이 더 간편하여 권장됩니다",
consoleTip: "자격 증명 정보를 가져오세요",
fileKnowledgeBase: "파일 저장 지식 베이스",

View File

@@ -2870,6 +2870,8 @@ export default {
slackConsole: 'Консоль Slack API',
telegramConsole: 'Telegram BotFather',
dingtalkConsole: 'Платформа DingTalk',
dingtalkCardTemplateId: 'ID шаблона карточки (необязательно)',
dingtalkCardTemplateIdHint: 'Создайте шаблон AI-карточки на open-dev.dingtalk.com/fe/card для потоковой передачи с эффектом печатной машинки',
modeHint: 'Рекомендуется WebSocket — проще настроить',
consoleTip: 'для получения учётных данных',
fileKnowledgeBase: 'База знаний для файлов',

View File

@@ -3182,6 +3182,8 @@ export default {
slackConsole: "Slack API 控制台",
telegramConsole: "Telegram BotFather",
dingtalkConsole: "钉钉开放平台",
dingtalkCardTemplateId: "卡片模板 ID（可选）",
dingtalkCardTemplateIdHint: "在 open-dev.dingtalk.com/fe/card 创建 AI 卡片模板,启用后支持打字机效果的流式输出",
modeHint: "推荐使用 WebSocket 方式接入,配置更简单",
consoleTip: "前往获取凭证信息",
fileKnowledgeBase: "文件保存知识库",

View File

@@ -1143,6 +1143,7 @@ func registerIMAdapterFactories(imService *imPkg.Service) {
clientID := getString(creds, "client_id")
clientSecret := getString(creds, "client_secret")
cardTemplateID := getString(creds, "card_template_id")
mode := channel.Mode
if mode == "" {
@@ -1151,7 +1152,7 @@ func registerIMAdapterFactories(imService *imPkg.Service) {
switch mode {
case "webhook":
adapter := dingtalk.NewWebhookAdapter(clientID, clientSecret)
adapter := dingtalk.NewWebhookAdapter(clientID, clientSecret, cardTemplateID)
return adapter, nil, nil
case "websocket":
@@ -1164,7 +1165,7 @@ func registerIMAdapterFactories(imService *imPkg.Service) {
}
}()
adapter := dingtalk.NewAdapter(client, clientID, clientSecret)
adapter := dingtalk.NewAdapter(client, clientID, clientSecret, cardTemplateID)
return adapter, wsCancel, nil
default:

View File

@@ -16,6 +16,7 @@ import (
"time"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/Tencent/WeKnora/internal/im"
"github.com/Tencent/WeKnora/internal/logger"
@@ -24,6 +25,12 @@ import (
// httpClient is a shared HTTP client with a reasonable timeout for DingTalk API calls.
var httpClient = &http.Client{Timeout: 15 * time.Second}
// minCardUpdateInterval is the minimum time between consecutive card streaming updates.
const minCardUpdateInterval = 500 * time.Millisecond
// dingtalkConvTypeGroup is the DingTalk conversation type value for group chats.
const dingtalkConvTypeGroup = "2"
// Compile-time checks.
var (
_ im.Adapter = (*Adapter)(nil)
@@ -32,9 +39,10 @@ var (
// Adapter implements im.Adapter for DingTalk.
type Adapter struct {
clientID string
clientSecret string
client *LongConnClient // nil for webhook mode
clientID string
clientSecret string
cardTemplateID string // optional: enables AI card streaming when set
client *LongConnClient // nil for webhook mode
// accessToken cache
tokenMu sync.RWMutex
@@ -43,19 +51,23 @@ type Adapter struct {
}
// NewWebhookAdapter creates a DingTalk adapter for HTTP callback mode.
func NewWebhookAdapter(clientID, clientSecret string) *Adapter {
func NewWebhookAdapter(clientID, clientSecret, cardTemplateID string) *Adapter {
startStreamReaper()
return &Adapter{
clientID: clientID,
clientSecret: clientSecret,
clientID: clientID,
clientSecret: clientSecret,
cardTemplateID: cardTemplateID,
}
}
// NewAdapter creates a DingTalk adapter backed by a stream client.
func NewAdapter(client *LongConnClient, clientID, clientSecret string) *Adapter {
func NewAdapter(client *LongConnClient, clientID, clientSecret, cardTemplateID string) *Adapter {
startStreamReaper()
return &Adapter{
clientID: clientID,
clientSecret: clientSecret,
client: client,
clientID: clientID,
clientSecret: clientSecret,
cardTemplateID: cardTemplateID,
client: client,
}
}
@@ -64,7 +76,7 @@ func (a *Adapter) Platform() im.Platform {
}
func (a *Adapter) HandleURLVerification(c *gin.Context) bool {
return false // DingTalk does not use URL verification challenges.
return false
}
// VerifyCallback verifies the DingTalk webhook signature (HmacSHA256).
@@ -79,7 +91,6 @@ func (a *Adapter) VerifyCallback(c *gin.Context) error {
return fmt.Errorf("missing timestamp or sign header")
}
// Check timestamp is within 1 hour
ts, err := strconv.ParseInt(timestamp, 10, 64)
if err != nil {
return fmt.Errorf("invalid timestamp: %w", err)
@@ -89,7 +100,6 @@ func (a *Adapter) VerifyCallback(c *gin.Context) error {
return fmt.Errorf("timestamp expired")
}
// Verify HMAC-SHA256 signature
stringToSign := timestamp + "\n" + a.clientSecret
h := hmac.New(sha256.New, []byte(a.clientSecret))
h.Write([]byte(stringToSign))
@@ -104,19 +114,19 @@ func (a *Adapter) VerifyCallback(c *gin.Context) error {
// DingTalk callback message structure.
type callbackMessage struct {
ConversationID string `json:"conversationId"`
ConversationType string `json:"conversationType"` // "1" = single, "2" = group
MsgID string `json:"msgId"`
Msgtype string `json:"msgtype"` // "text", "picture", "richText", "file"
Text *textContent `json:"text"`
SenderNick string `json:"senderNick"`
SenderStaffId string `json:"senderStaffId"`
SenderID string `json:"senderId"`
SessionWebhook string `json:"sessionWebhook"`
RobotCode string `json:"robotCode"`
AtUsers []atUser `json:"atUsers"`
IsInAtList bool `json:"isInAtList"`
ChatbotCorpId string `json:"chatbotCorpId"`
ConversationID string `json:"conversationId"`
ConversationType string `json:"conversationType"`
MsgID string `json:"msgId"`
Msgtype string `json:"msgtype"`
Text *textContent `json:"text"`
SenderNick string `json:"senderNick"`
SenderStaffId string `json:"senderStaffId"`
SenderID string `json:"senderId"`
SessionWebhook string `json:"sessionWebhook"`
RobotCode string `json:"robotCode"`
AtUsers []atUser `json:"atUsers"`
IsInAtList bool `json:"isInAtList"`
ChatbotCorpId string `json:"chatbotCorpId"`
}
type textContent struct {
@@ -146,7 +156,7 @@ func (a *Adapter) ParseCallback(c *gin.Context) (*im.IncomingMessage, error) {
func parseCallbackMessage(msg *callbackMessage) *im.IncomingMessage {
chatType := im.ChatTypeDirect
chatID := ""
if msg.ConversationType == "2" {
if msg.ConversationType == dingtalkConvTypeGroup {
chatType = im.ChatTypeGroup
chatID = msg.ConversationID
}
@@ -161,7 +171,7 @@ func parseCallbackMessage(msg *callbackMessage) *im.IncomingMessage {
content = strings.TrimSpace(msg.Text.Content)
}
incoming := &im.IncomingMessage{
return &im.IncomingMessage{
Platform: im.PlatformDingtalk,
UserID: userID,
UserName: msg.SenderNick,
@@ -174,25 +184,22 @@ func parseCallbackMessage(msg *callbackMessage) *im.IncomingMessage {
"session_webhook": msg.SessionWebhook,
},
}
return incoming
}
// ── Send reply ──
func (a *Adapter) SendReply(ctx context.Context, incoming *im.IncomingMessage, reply *im.ReplyMessage) error {
// Prefer sessionWebhook for simplicity
content := transformThinkBlocks(reply.Content)
sessionWebhook := ""
if incoming.Extra != nil {
sessionWebhook = incoming.Extra["session_webhook"]
}
if sessionWebhook != "" {
return a.replyViaSessionWebhook(ctx, sessionWebhook, reply.Content)
return a.replyViaSessionWebhook(ctx, sessionWebhook, content)
}
// Fallback to OpenAPI
return a.replyViaOpenAPI(ctx, incoming, reply.Content)
return a.replyViaOpenAPI(ctx, incoming, content)
}
func (a *Adapter) replyViaSessionWebhook(ctx context.Context, webhookURL, content string) error {
@@ -292,7 +299,6 @@ func (a *Adapter) getAccessToken(ctx context.Context) (string, error) {
a.tokenMu.Lock()
defer a.tokenMu.Unlock()
// Double check after acquiring write lock
if a.token != "" && time.Now().Before(a.tokenExpAt) {
return a.token, nil
}
@@ -334,27 +340,155 @@ func (a *Adapter) getAccessToken(ctx context.Context) (string, error) {
}
a.token = result.AccessToken
// Refresh 5 minutes before expiry
a.tokenExpAt = time.Now().Add(time.Duration(result.ExpireIn)*time.Second - 5*time.Minute)
return a.token, nil
}
// ── DingTalk OpenAPI helpers for AI Card ──
// dingtalkAPI sends an authenticated JSON request to the DingTalk OpenAPI
// host and returns the raw response body.
//
// body is JSON-encoded and used as the request payload, and path is appended
// to https://api.dingtalk.com. The token returned by getAccessToken is passed
// in the x-acs-dingtalk-access-token header.
//
// An error is returned when the token cannot be obtained, the payload cannot
// be marshalled, the request fails, the response status is not 200 OK (the
// error then carries the response body), or the body of a 200 response cannot
// be read completely.
func (a *Adapter) dingtalkAPI(ctx context.Context, method, path string, body interface{}) (json.RawMessage, error) {
	token, err := a.getAccessToken(ctx)
	if err != nil {
		return nil, fmt.Errorf("get access token: %w", err)
	}

	jsonBody, err := json.Marshal(body)
	if err != nil {
		return nil, fmt.Errorf("marshal body: %w", err)
	}

	// Named endpoint rather than url so the net/url package is never shadowed.
	endpoint := "https://api.dingtalk.com" + path
	req, err := http.NewRequestWithContext(ctx, method, endpoint, bytes.NewReader(jsonBody))
	if err != nil {
		return nil, fmt.Errorf("create request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("x-acs-dingtalk-access-token", token)

	resp, err := httpClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("do request: %w", err)
	}
	defer resp.Body.Close()

	respBody, readErr := io.ReadAll(resp.Body)
	// Report a bad status first: it is the more useful diagnostic, and
	// whatever part of the body was read is still included in the message.
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("dingtalk API %s returned %d: %s", path, resp.StatusCode, string(respBody))
	}
	// A 200 response whose body could not be read in full must not be handed
	// to the caller as if it were a complete payload.
	if readErr != nil {
		return nil, fmt.Errorf("read response body of %s: %w", path, readErr)
	}
	return respBody, nil
}
// createAndDeliverCard creates an AI card from the adapter's card template
// and delivers it to the conversation the incoming message came from.
//
// A fresh UUID is generated as the card's outTrackId and returned on success
// so that later streaming updates can address the same card. Group chats are
// targeted through an IM_GROUP open space keyed by incoming.ChatID; every
// other chat type is targeted through an IM_ROBOT open space keyed by
// incoming.UserID. The card starts with an empty "content" parameter.
func (a *Adapter) createAndDeliverCard(ctx context.Context, incoming *im.IncomingMessage) (string, error) {
	trackID := uuid.New().String()

	payload := map[string]interface{}{
		"cardTemplateId": a.cardTemplateID,
		"outTrackId":     trackID,
		"callbackType":   "STREAM",
		"cardData": map[string]interface{}{
			"cardParamMap": map[string]string{
				"content": "",
			},
		},
		"userIdType": 1,
	}

	switch incoming.ChatType {
	case im.ChatTypeGroup:
		// Group conversation: deliver into the group's open space.
		payload["openSpaceId"] = "dtv1.card//IM_GROUP." + incoming.ChatID
		payload["imGroupOpenSpaceModel"] = map[string]interface{}{"supportForward": true}
		payload["imGroupOpenDeliverModel"] = map[string]interface{}{
			"robotCode": a.clientID,
			"extension": map[string]string{},
		}
	default:
		// One-to-one conversation with the robot.
		payload["openSpaceId"] = "dtv1.card//IM_ROBOT." + incoming.UserID
		payload["imRobotOpenSpaceModel"] = map[string]interface{}{"supportForward": true}
		payload["imRobotOpenDeliverModel"] = map[string]interface{}{
			"robotCode": a.clientID,
			"spaceType": "IM_ROBOT",
			"extension": map[string]string{},
		}
	}

	if _, err := a.dingtalkAPI(ctx, http.MethodPost, "/v1.0/card/instances/createAndDeliver", payload); err != nil {
		return "", fmt.Errorf("create card: %w", err)
	}
	return trackID, nil
}
// streamingUpdateCard sends content to the "content" key of the AI card
// identified by outTrackID.
//
// Every call carries a new UUID as its guid and is sent with isFull set to
// true. isFinalize is forwarded as given; callers in this file pass true only
// for the last update of a stream. The error from the underlying API call is
// returned unchanged.
func (a *Adapter) streamingUpdateCard(ctx context.Context, outTrackID, content string, isFinalize bool) error {
	update := map[string]interface{}{
		"outTrackId": outTrackID,
		"guid":       uuid.New().String(),
		"key":        "content",
		"content":    content,
		"isFull":     true,
		"isFinalize": isFinalize,
		"isError":    false,
	}

	if _, err := a.dingtalkAPI(ctx, http.MethodPut, "/v1.0/card/streaming", update); err != nil {
		return err
	}
	return nil
}
// ── StreamSender implementation ──
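// Without an AI card template, replies cannot be updated in place: content is
// accumulated and the full reply is sent once at EndStream (via sessionWebhook,
// or OpenAPI when no webhook is available). When a card template ID is
// configured, the accumulated content is instead pushed to an AI card through
// throttled streaming updates and finalized at EndStream.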
// streamState holds the per-stream bookkeeping for one in-flight reply.
type streamState struct {
	// mu guards content, outTrackID and lastUpdate while chunks arrive.
	mu sync.Mutex
	// content accumulates every chunk received so far.
	content strings.Builder
	// sessionWebhook is the reply webhook taken from the incoming message, if any.
	sessionWebhook string
	// outTrackID identifies the AI card; empty means card streaming is not in use.
	outTrackID string
	// lastUpdate is when the card was last updated; used to throttle updates.
	lastUpdate time.Time
	// createdAt is when the stream was started; used to detect orphaned streams.
	createdAt time.Time
}
var (
streamsMu sync.Mutex
dStreams = map[string]*streamState{}
const (
streamOrphanTTL = 5 * time.Minute
streamReaperInterval = 1 * time.Minute
)
var (
streamsMu sync.Mutex
dStreams = map[string]*streamState{}
startReaperOnce sync.Once
reaperStopCh = make(chan struct{})
)
// startStreamReaper launches the background reaper goroutine. It is guarded
// by startReaperOnce, so calling it repeatedly starts the goroutine only once.
//
// Every streamReaperInterval the reaper removes from dStreams each entry whose
// createdAt is older than streamOrphanTTL, so entries for which EndStream was
// never called do not accumulate. The goroutine exits when reaperStopCh
// becomes readable.
//
// NOTE(review): age is measured from createdAt only, so a stream that is still
// receiving chunks after streamOrphanTTL is removed as well — confirm that no
// reply is expected to stream for that long.
func startStreamReaper() {
	startReaperOnce.Do(func() {
		// reap drops every stream entry created before the TTL cutoff.
		reap := func() {
			expireBefore := time.Now().Add(-streamOrphanTTL)

			streamsMu.Lock()
			defer streamsMu.Unlock()
			for streamID, st := range dStreams {
				if !st.createdAt.Before(expireBefore) {
					continue
				}
				delete(dStreams, streamID)
			}
		}

		go func() {
			tick := time.NewTicker(streamReaperInterval)
			defer tick.Stop()

			for {
				select {
				case <-reaperStopCh:
					return
				case <-tick.C:
					reap()
				}
			}
		}()
	})
}
func (a *Adapter) StartStream(ctx context.Context, incoming *im.IncomingMessage) (string, error) {
sessionWebhook := ""
if incoming.Extra != nil {
@@ -363,13 +497,26 @@ func (a *Adapter) StartStream(ctx context.Context, incoming *im.IncomingMessage)
streamID := fmt.Sprintf("dt:%s:%s", incoming.UserID, incoming.MessageID)
streamsMu.Lock()
dStreams[streamID] = &streamState{
state := &streamState{
sessionWebhook: sessionWebhook,
createdAt: time.Now(),
}
// If card template is configured, create an AI card for streaming
if a.cardTemplateID != "" {
outTrackID, err := a.createAndDeliverCard(ctx, incoming)
if err != nil {
logger.Warnf(ctx, "[DingTalk] Failed to create AI card, falling back to sessionWebhook: %v", err)
} else {
state.outTrackID = outTrackID
}
}
streamsMu.Lock()
dStreams[streamID] = state
streamsMu.Unlock()
logger.Infof(ctx, "[DingTalk] Streaming started: stream_id=%s", streamID)
logger.Infof(ctx, "[DingTalk] Streaming started: stream_id=%s, card=%v", streamID, state.outTrackID != "")
return streamID, nil
}
@@ -387,8 +534,28 @@ func (a *Adapter) SendStreamChunk(ctx context.Context, incoming *im.IncomingMess
state.mu.Lock()
state.content.WriteString(content)
// No card template → just accumulate, send at EndStream
if state.outTrackID == "" {
state.mu.Unlock()
return nil
}
// Throttle card updates
if time.Since(state.lastUpdate) < minCardUpdateInterval {
state.mu.Unlock()
return nil
}
fullContent := transformThinkBlocks(state.content.String())
state.lastUpdate = time.Now()
outTrackID := state.outTrackID
state.mu.Unlock()
if err := a.streamingUpdateCard(ctx, outTrackID, fullContent, false); err != nil {
logger.Warnf(ctx, "[DingTalk] Failed to update card stream: %v", err)
}
return nil
}
@@ -403,15 +570,20 @@ func (a *Adapter) EndStream(ctx context.Context, incoming *im.IncomingMessage, s
}
state.mu.Lock()
fullContent := state.content.String()
fullContent := transformThinkBlocks(state.content.String())
outTrackID := state.outTrackID
state.mu.Unlock()
if state.sessionWebhook != "" {
if outTrackID != "" {
// Finalize AI card
if err := a.streamingUpdateCard(ctx, outTrackID, fullContent, true); err != nil {
logger.Warnf(ctx, "[DingTalk] Failed to finalize card stream: %v", err)
}
} else if state.sessionWebhook != "" {
if err := a.replyViaSessionWebhook(ctx, state.sessionWebhook, fullContent); err != nil {
logger.Warnf(ctx, "[DingTalk] Failed to end stream: %v", err)
}
} else {
// Fallback to OpenAPI
if err := a.replyViaOpenAPI(ctx, incoming, fullContent); err != nil {
logger.Warnf(ctx, "[DingTalk] Failed to end stream via OpenAPI: %v", err)
}
@@ -420,3 +592,7 @@ func (a *Adapter) EndStream(ctx context.Context, incoming *im.IncomingMessage, s
logger.Infof(ctx, "[DingTalk] Streaming ended: stream_id=%s", streamID)
return nil
}
// transformThinkBlocks renders the <think> blocks in content for DingTalk by
// delegating to im.TransformThinkBlocks with im.MarkdownThinkStyle.
func transformThinkBlocks(content string) string {
	style := im.MarkdownThinkStyle
	return im.TransformThinkBlocks(content, style)
}

View File

@@ -63,7 +63,7 @@ func (c *LongConnClient) Start(ctx context.Context) error {
func (c *LongConnClient) onChatBotMessage(ctx context.Context, data *chatbot.BotCallbackDataModel) ([]byte, error) {
chatType := im.ChatTypeDirect
chatID := ""
if data.ConversationType == "2" {
if data.ConversationType == dingtalkConvTypeGroup {
chatType = im.ChatTypeGroup
chatID = data.ConversationId
}

View File

@@ -666,75 +666,8 @@ func (a *Adapter) SendStreamChunk(ctx context.Context, incoming *im.IncomingMess
return a.cardkitUpdateElement(ctx, accessToken, streamID, streamingElementID, fullContent, seq)
}
// transformThinkBlocks converts <think>...</think> blocks into Feishu-compatible
// markdown blockquotes. Handles both complete blocks and in-progress blocks
// (where </think> has not yet arrived during streaming).
//
// Output format (matching the OpenClaw Feishu convention):
//
// > 💭 **思考过程**
// > thinking line 1
// > thinking line 2
//
// ---
//
// answer text
func transformThinkBlocks(content string) string {
const (
openTag = "<think>"
closeTag = "</think>"
)
openIdx := strings.Index(content, openTag)
if openIdx < 0 {
return content
}
before := content[:openIdx]
after := content[openIdx+len(openTag):]
closeIdx := strings.Index(after, closeTag)
thinkClosed := closeIdx >= 0
var thinkContent, rest string
if thinkClosed {
thinkContent = after[:closeIdx]
rest = after[closeIdx+len(closeTag):]
} else {
thinkContent = after
}
thinkContent = strings.TrimSpace(thinkContent)
var result strings.Builder
result.WriteString(before)
if thinkContent == "" {
if !thinkClosed {
result.WriteString("> 💭 **思考中...**\n")
return result.String()
}
result.WriteString(strings.TrimLeft(rest, "\n"))
return result.String()
}
// Render each line as a blockquote
result.WriteString("> 💭 **思考过程**\n")
for _, line := range strings.Split(thinkContent, "\n") {
result.WriteString("> ")
result.WriteString(line)
result.WriteString("\n")
}
if thinkClosed {
rest = strings.TrimLeft(rest, "\n")
if rest != "" {
result.WriteString("\n---\n\n")
result.WriteString(rest)
}
}
return result.String()
return im.TransformThinkBlocks(content, im.MarkdownThinkStyle)
}
// EndStream disables streaming_mode and cleans up state.

View File

@@ -34,6 +34,7 @@ type Adapter struct {
// NewWebhookAdapter creates a Telegram adapter for webhook mode.
func NewWebhookAdapter(botToken, secretToken string) *Adapter {
startStreamReaper()
return &Adapter{
botToken: botToken,
secretToken: secretToken,
@@ -42,6 +43,7 @@ func NewWebhookAdapter(botToken, secretToken string) *Adapter {
// NewAdapter creates a Telegram adapter backed by a long-polling client.
func NewAdapter(client *LongConnClient, botToken string) *Adapter {
startStreamReaper()
return &Adapter{
botToken: botToken,
client: client,
@@ -204,7 +206,7 @@ func resolveChatID(incoming *im.IncomingMessage) string {
// ── Send reply ──
func (a *Adapter) SendReply(ctx context.Context, incoming *im.IncomingMessage, reply *im.ReplyMessage) error {
return a.sendMessage(ctx, resolveChatID(incoming), reply.Content, "")
return a.sendMessage(ctx, resolveChatID(incoming), transformThinkBlocks(reply.Content), "")
}
func (a *Adapter) sendMessage(ctx context.Context, chatID, text, replyToMessageID string) error {
@@ -220,12 +222,14 @@ func (a *Adapter) sendMessage(ctx context.Context, chatID, text, replyToMessageI
return a.callAPI(ctx, "sendMessage", body)
}
func (a *Adapter) editMessage(ctx context.Context, chatID, messageID, text string) error {
func (a *Adapter) editMessage(ctx context.Context, chatID, messageID, text, parseMode string) error {
body := map[string]interface{}{
"chat_id": chatID,
"message_id": json.Number(messageID),
"text": text,
"parse_mode": "Markdown",
}
if parseMode != "" {
body["parse_mode"] = parseMode
}
return a.callAPI(ctx, "editMessageText", body)
}
@@ -279,18 +283,58 @@ func (a *Adapter) callAPIWithResult(ctx context.Context, method string, body int
// ── StreamSender implementation (edit message in-place) ──
// minEditInterval is the minimum time between consecutive editMessageText calls
// to avoid hitting Telegram's rate limit (~30 msg/sec global, ~20 edit/min per chat).
const minEditInterval = 500 * time.Millisecond
type streamState struct {
mu sync.Mutex
content strings.Builder
msgID string // Telegram message ID of the "thinking" message
chatID string
mu sync.Mutex
content strings.Builder
msgID string // Telegram message ID of the "thinking" message
chatID string
lastEdit time.Time // last successful editMessageText timestamp
createdAt time.Time // for orphan stream detection
}
var (
streamsMu sync.Mutex
streams = map[string]*streamState{}
const (
streamOrphanTTL = 5 * time.Minute
streamReaperInterval = 1 * time.Minute
)
var (
streamsMu sync.Mutex
streams = map[string]*streamState{}
startReaperOnce sync.Once
reaperStopCh = make(chan struct{})
)
// startStreamReaper launches the background reaper goroutine. It is guarded
// by startReaperOnce, so calling it repeatedly starts the goroutine only once.
//
// Every streamReaperInterval the reaper removes from streams each entry whose
// createdAt is older than streamOrphanTTL, so entries for which EndStream was
// never called do not accumulate. The goroutine exits when reaperStopCh
// becomes readable.
//
// NOTE(review): age is measured from createdAt only, so a stream that is still
// receiving chunks after streamOrphanTTL is removed as well — confirm that no
// reply is expected to stream for that long.
func startStreamReaper() {
	startReaperOnce.Do(func() {
		// reap drops every stream entry created before the TTL cutoff.
		reap := func() {
			expireBefore := time.Now().Add(-streamOrphanTTL)

			streamsMu.Lock()
			defer streamsMu.Unlock()
			for streamID, st := range streams {
				if !st.createdAt.Before(expireBefore) {
					continue
				}
				delete(streams, streamID)
			}
		}

		go func() {
			tick := time.NewTicker(streamReaperInterval)
			defer tick.Stop()

			for {
				select {
				case <-reaperStopCh:
					return
				case <-tick.C:
					reap()
				}
			}
		}()
	})
}
func (a *Adapter) StartStream(ctx context.Context, incoming *im.IncomingMessage) (string, error) {
chatID := resolveChatID(incoming)
@@ -311,8 +355,9 @@ func (a *Adapter) StartStream(ctx context.Context, incoming *im.IncomingMessage)
streamsMu.Lock()
streams[streamID] = &streamState{
msgID: msgID,
chatID: chatID,
msgID: msgID,
chatID: chatID,
createdAt: time.Now(),
}
streamsMu.Unlock()
@@ -334,11 +379,18 @@ func (a *Adapter) SendStreamChunk(ctx context.Context, incoming *im.IncomingMess
state.mu.Lock()
state.content.WriteString(content)
fullContent := state.content.String()
// Throttle: skip edit if last edit was too recent
if time.Since(state.lastEdit) < minEditInterval {
state.mu.Unlock()
return nil
}
fullContent := transformThinkBlocks(state.content.String())
state.lastEdit = time.Now()
state.mu.Unlock()
if err := a.editMessage(ctx, state.chatID, state.msgID, fullContent); err != nil {
// Telegram has rate limits, just log
if err := a.editMessage(ctx, state.chatID, state.msgID, fullContent, ""); err != nil {
logger.Warnf(ctx, "[Telegram] Failed to update stream chunk: %v", err)
}
@@ -356,10 +408,10 @@ func (a *Adapter) EndStream(ctx context.Context, incoming *im.IncomingMessage, s
}
state.mu.Lock()
fullContent := state.content.String()
fullContent := transformThinkBlocks(state.content.String())
state.mu.Unlock()
if err := a.editMessage(ctx, state.chatID, state.msgID, fullContent); err != nil {
if err := a.editMessage(ctx, state.chatID, state.msgID, fullContent, "Markdown"); err != nil {
logger.Warnf(ctx, "[Telegram] Failed to end stream: %v", err)
}
@@ -367,6 +419,10 @@ func (a *Adapter) EndStream(ctx context.Context, incoming *im.IncomingMessage, s
return nil
}
// transformThinkBlocks renders the <think> blocks in content for Telegram by
// delegating to im.TransformThinkBlocks with im.TelegramThinkStyle.
func transformThinkBlocks(content string) string {
	style := im.TelegramThinkStyle
	return im.TransformThinkBlocks(content, style)
}
// ── FileDownloader implementation ──
func (a *Adapter) DownloadFile(ctx context.Context, msg *im.IncomingMessage) (io.ReadCloser, string, error) {

99
internal/im/think.go Normal file
View File

@@ -0,0 +1,99 @@
package im
import "strings"
// ThinkBlockStyle describes the text fragments TransformThinkBlocks uses when
// it renders a <think>...</think> block.
type ThinkBlockStyle struct {
	// ThinkingHeader is emitted on its own when a think block has been opened
	// but has no content and no closing tag yet,
	// e.g. "> 💭 **思考中...**\n" or "💭 _思考中..._\n".
	ThinkingHeader string

	// ThoughtHeader is emitted once, ahead of the think block's content,
	// e.g. "> 💭 **思考过程**\n" or "💭 *思考过程*\n".
	ThoughtHeader string

	// LinePrefix is written before every line of think content,
	// e.g. "> " or "> _".
	LinePrefix string

	// LineSuffix is written after every line of think content, ahead of the
	// newline, e.g. "" or "_".
	LineSuffix string

	// Separator is written between a finished think block and the text that
	// follows it, e.g. "\n---\n\n" or "\n".
	Separator string
}
// MarkdownThinkStyle is the style that renders think blocks as markdown
// blockquotes with bold headers, followed by a horizontal-rule separator.
// It is the style used by the DingTalk and Feishu adapters.
var MarkdownThinkStyle = ThinkBlockStyle{
	LinePrefix:     "> ",
	LineSuffix:     "",
	Separator:      "\n---\n\n",
	ThinkingHeader: "> 💭 **思考中...**\n",
	ThoughtHeader:  "> 💭 **思考过程**\n",
}
// TelegramThinkStyle is the style used by the Telegram adapter. It keeps the
// same blockquote layout as MarkdownThinkStyle but wraps the header text in
// single asterisks instead of double ones. The blockquote layout was chosen
// for reliable rendering while streaming, where incomplete markdown can make
// the API call fail.
var TelegramThinkStyle = ThinkBlockStyle{
	LinePrefix:     "> ",
	LineSuffix:     "",
	Separator:      "\n---\n\n",
	ThinkingHeader: "> 💭 *思考中...*\n",
	ThoughtHeader:  "> 💭 *思考过程*\n",
}
// TransformThinkBlocks rewrites every <think>...</think> block in content
// using style and returns the rendered text.
//
// For each block, in order of appearance:
//   - Text before the opening tag is copied through unchanged.
//   - A block with content is rendered as style.ThoughtHeader followed by each
//     content line wrapped in style.LinePrefix / style.LineSuffix. If the
//     block is closed and non-empty text follows it, style.Separator is
//     written before that text.
//   - A closed block whose content is empty (or only whitespace) is dropped.
//   - An unclosed block with no content yet is rendered as
//     style.ThinkingHeader. An unclosed block always ends the output, because
//     everything after the opening tag belongs to it; this is the in-progress
//     case seen while streaming.
//
// Leading newlines of the text that follows a closing tag are trimmed. A
// header is always started on a fresh line: if the text already written does
// not end in a newline, one is inserted first, so the header's line-start
// markup (such as "> ") is not glued to the end of the preceding text.
//
// Content without an opening tag is returned unchanged.
func TransformThinkBlocks(content string, style ThinkBlockStyle) string {
	const (
		openTag  = "<think>"
		closeTag = "</think>"
	)

	// Fast path: nothing to transform, hand the input back untouched.
	if !strings.Contains(content, openTag) {
		return content
	}

	var out strings.Builder

	// startLine guarantees that the next write begins at the start of a line.
	startLine := func() {
		if written := out.String(); written != "" && !strings.HasSuffix(written, "\n") {
			out.WriteString("\n")
		}
	}

	remaining := content
	for {
		openIdx := strings.Index(remaining, openTag)
		if openIdx < 0 {
			// No further think blocks: the tail is plain text.
			out.WriteString(remaining)
			return out.String()
		}
		out.WriteString(remaining[:openIdx])
		after := remaining[openIdx+len(openTag):]

		closeIdx := strings.Index(after, closeTag)
		closed := closeIdx >= 0

		// Until the closing tag arrives, everything after the opening tag is
		// treated as think content and there is no trailing text.
		thought, rest := after, ""
		if closed {
			thought = after[:closeIdx]
			rest = strings.TrimLeft(after[closeIdx+len(closeTag):], "\n")
		}
		thought = strings.TrimSpace(thought)

		switch {
		case thought != "":
			startLine()
			out.WriteString(style.ThoughtHeader)
			for _, line := range strings.Split(thought, "\n") {
				out.WriteString(style.LinePrefix)
				out.WriteString(line)
				out.WriteString(style.LineSuffix)
				out.WriteString("\n")
			}
			// rest is only non-empty for a closed block.
			if rest != "" {
				out.WriteString(style.Separator)
			}
		case !closed:
			// Opening tag seen but no content yet: show the placeholder.
			startLine()
			out.WriteString(style.ThinkingHeader)
		}

		if !closed {
			return out.String()
		}
		// Continue with the text after the closing tag so that later think
		// blocks are rendered too instead of leaking raw tags.
		remaining = rest
	}
}