feat(observability): integrate Langfuse for LLM token tracking and tracing

Closes #620 #497. Add opt-in Langfuse observability covering all five
model types (chat, embedding, rerank, VLM, ASR) with HTTP-request-scoped
traces and Docker Compose support (both cloud and self-hosted).

Core package internal/tracing/langfuse:
- HTTP client with batched async ingestion (non-blocking in request path)
- Sampling, environment / release tagging, and graceful fallback when
  LANGFUSE_* env vars are absent (wrappers become no-ops)
- Gin middleware opens one trace per traced request and finishes it after
  the handler chain returns, attaching method / path / user / session
- Trace context is stored under a typed key exported from internal/types
  so logger.CloneContext can preserve it across handler / goroutine
  boundaries (otherwise each LLM call auto-created an orphan trace,
  fragmenting one request into many)

Per-model generation wrappers (opt-in via NewChat/NewEmbedder/...):
- chat: captures prompt, streaming output, token usage + TTFT
- embedding: approximates tokens when the provider omits usage
- rerank: previews query/docs, summarizes results to keep payload small
- vlm: records image count and total bytes, never uploads raw pixels
- asr: records file size and audio duration, never uploads audio bytes

Async title generation (GenerateTitleAsync) now forwards the trace key
into the goroutine so title calls appear under the parent chat trace.

Docker Compose:
- LANGFUSE_* env passthrough on the `app` service for cloud deployments
- Optional `langfuse` profile spins up a self-hosted Langfuse stack that
  reuses WeKnora's existing PostgreSQL (separate database via an idempotent
  init container that fixes ICU collation drift) and Redis (separate DB
  number), adding only ClickHouse, MinIO, web and worker containers
- web/worker entrypoints URL-encode DB_PASSWORD / REDIS_PASSWORD at start
  to avoid Prisma P1013 when passwords contain @ / # / etc.

Docs: docs/Langfuse集成.md covers cloud vs self-hosted, per-model usage
strategy, code map, and resource footprint.
This commit is contained in:
wizardchen
2026-04-24 01:57:12 +08:00
committed by lyingbug
parent b8c45b173e
commit 492e92580b
29 changed files with 2548 additions and 23 deletions

View File

@@ -18,6 +18,88 @@ GIN_MODE=release
# 可选值true自动放在 LOG_PATH 同目录下 llm_debug.log、false/空(关闭)、或指定文件路径
# LLM_DEBUG_LOG=true
# ========== Langfuse 可观测性(可选) ==========
# 用于追踪 chat / embedding / rerank / VLM / ASR 模型调用,统计 token 消耗。
# 详细说明docs/Langfuse集成.md
#
# 方案 A接入 Langfuse Cloud最简单
# 1) 登录 https://cloud.langfuse.com 生成 API Key
# 2) 填入下方 PUBLIC_KEY / SECRET_KEY
# 3) docker compose up -d app
#
# 方案 B自建 Langfuse局域网/内网环境)
# 1) 启动自建栈docker compose --profile langfuse up -d
# 首次启动后 ClickHouse 迁移约 1-2 分钟,耐心等待 langfuse-web 健康
# 2) 浏览器打开 http://localhost:3000 注册管理员账号并生成 API Key
# 3) 取消注释下方 LANGFUSE_HOST=http://langfuse-web:3000
# 填入刚生成的 PUBLIC_KEY / SECRET_KEY
# 4) docker compose up -d app
#
# 只要同时设置了 PUBLIC_KEY + SECRET_KEY 就会自动启用,无需显式开关。
LANGFUSE_PUBLIC_KEY=pk-lf-xxxxxxxx
LANGFUSE_SECRET_KEY=sk-lf-xxxxxxxx
LANGFUSE_HOST=http://langfuse-web:3000
# 自建模式下改成LANGFUSE_HOST=http://langfuse-web:3000
#
# 可选显式开关true/false默认根据 key 自动判断)
# LANGFUSE_ENABLED=true
# 可选:版本 / 环境标签,便于在 Langfuse UI 过滤
# LANGFUSE_RELEASE=v0.4.2
# LANGFUSE_ENVIRONMENT=production
# 可选:批量上报与采样策略(生产高流量建议调大 FLUSH_AT、降低 SAMPLE_RATE
# LANGFUSE_FLUSH_AT=15
# LANGFUSE_FLUSH_INTERVAL=3s
# LANGFUSE_QUEUE_SIZE=2048
# LANGFUSE_REQUEST_TIMEOUT=10s
# LANGFUSE_SAMPLE_RATE=1.0
# LANGFUSE_DEBUG=false
# ========== Langfuse 自建栈配置(仅在使用 --profile langfuse 时需要) ==========
# 设计说明为了最小化资源占用Langfuse 自建栈会**复用** WeKnora 已有的
# - postgres创建独立的 "langfuse" 数据库(由 langfuse-db-init 容器一次性创建)
# - redis :使用独立的 Redis DB 号(默认 1WeKnora 用 0
# 真正新增的只有 3 个常驻容器langfuse-web、langfuse-worker、langfuse-clickhouse
# + 1 个专用 S3langfuse-minio
# + 1 个一次性 initlangfuse-db-init
#
# Langfuse Web UI 对外端口
# LANGFUSE_WEB_PORT=3000
#
# Langfuse 专用 MinIO 端口(避免和 WeKnora 主 MinIO 9000/9001 冲突)
# LANGFUSE_MINIO_S3_PORT=9100
# LANGFUSE_MINIO_CONSOLE_PORT=9101
#
# 在 WeKnora-postgres 中创建的 Langfuse 库名
# LANGFUSE_DB_NAME=langfuse
#
# 在 WeKnora-redis 中使用的 DB 号1~15不要和 WeKnora 的 0 冲突)
# LANGFUSE_REDIS_DB=1
#
# ClickHouse / Langfuse 专用 MinIO 凭证(生产请务必修改)
# LANGFUSE_CLICKHOUSE_USER=clickhouse
# LANGFUSE_CLICKHOUSE_PASSWORD=clickhouse
# LANGFUSE_MINIO_USER=langfuseminio
# LANGFUSE_MINIO_PASSWORD=langfuseminiosecret
#
# Langfuse 核心安全字段,生产必须重新生成:
# LANGFUSE_SALT=$(openssl rand -base64 32)
# LANGFUSE_ENCRYPTION_KEY=$(openssl rand -hex 32)
# LANGFUSE_NEXTAUTH_SECRET=$(openssl rand -base64 32)
# LANGFUSE_SALT=
# LANGFUSE_ENCRYPTION_KEY=
# LANGFUSE_NEXTAUTH_SECRET=
# LANGFUSE_NEXTAUTH_URL=http://localhost:3000
# LANGFUSE_TELEMETRY_ENABLED=false
#
# 可选:自动化首次启动(填写后直接注入管理员+项目,跳过 UI 注册)
# LANGFUSE_INIT_ORG_NAME=WeKnora
# LANGFUSE_INIT_PROJECT_NAME=WeKnora
# LANGFUSE_INIT_PROJECT_PUBLIC_KEY=pk-lf-weknora-init
# LANGFUSE_INIT_PROJECT_SECRET_KEY=sk-lf-weknora-init
# LANGFUSE_INIT_USER_EMAIL=admin@example.com
# LANGFUSE_INIT_USER_NAME=Admin
# LANGFUSE_INIT_USER_PASSWORD=change-me-please
# 时区设置,默认为 Asia/Shanghai
# 影响系统时间显示和日志时间戳
# 常用值Asia/Shanghai, Asia/Tokyo, America/New_York, Europe/London, UTC

View File

@@ -44,6 +44,13 @@ WEKNORA_SANDBOX_MODE=disabled
ENABLE_GRAPH_RAG=false
DISABLE_REGISTRATION=false
# === Langfuse 可观测性(可选) ===
# 追踪 chat / embedding / rerank / VLM / ASR 的 prompt、响应与 token 消耗。
# 同时填了 PUBLIC_KEY 和 SECRET_KEY 就会自动启用,详见 docs/Langfuse集成.md。
# LANGFUSE_PUBLIC_KEY=pk-lf-xxxxxxxx
# LANGFUSE_SECRET_KEY=sk-lf-xxxxxxxx
# LANGFUSE_HOST=https://cloud.langfuse.com
# === 性能 ===
CONCURRENCY_POOL_SIZE=3

View File

@@ -198,6 +198,208 @@ services:
- dex
- full
# ---------------------------------------------------------------------------
# Langfuse 自建栈dev 对称版)
#
# 用法:
# docker compose -f docker-compose.dev.yml --profile langfuse up -d
#
# 本地 app (go run) 需要的环境变量:
# export LANGFUSE_HOST=http://localhost:3000
# export LANGFUSE_PUBLIC_KEY=pk-lf-xxx
# export LANGFUSE_SECRET_KEY=sk-lf-xxx
#
# 复用 dev 已有的 postgres独立 langfuse 数据库)和 redisDB 1
# 新增clickhouse、minio、web、worker + 一次性 db-init和生产版结构一致。
# ---------------------------------------------------------------------------
# 复用 dev 已有的 ParadeDB 镜像,不额外拉 postgres 镜像
langfuse-db-init:
image: paradedb/paradedb:v0.22.2-pg17
container_name: WeKnora-langfuse-db-init-dev
depends_on:
postgres:
condition: service_healthy
environment:
PGPASSWORD: ${DB_PASSWORD}
# ${LANGFUSE_DB_NAME:-langfuse} / ${DB_USER} 由 compose 解析成字面量后再传给 shell。
entrypoint: ["sh", "-c"]
command:
- |
set -e
echo "[langfuse-db-init] ensuring database '${LANGFUSE_DB_NAME:-langfuse}' exists..."
# 先刷新现有库的 collation镜像 ICU 2.36 与宿主 2.41 不匹配时必须做),否则 CREATE DATABASE 会失败
psql -h postgres -U ${DB_USER} -d postgres -v ON_ERROR_STOP=0 -c "ALTER DATABASE template1 REFRESH COLLATION VERSION;" >/dev/null 2>&1 || true
psql -h postgres -U ${DB_USER} -d postgres -v ON_ERROR_STOP=0 -c "ALTER DATABASE postgres REFRESH COLLATION VERSION;" >/dev/null 2>&1 || true
# 幂等创建:已存在则跳过;不存在则从 template0 克隆template0 永远不会有 collation 漂移)
if psql -h postgres -U ${DB_USER} -d postgres -tAc "SELECT 1 FROM pg_database WHERE datname='${LANGFUSE_DB_NAME:-langfuse}'" | grep -q 1; then
echo "[langfuse-db-init] database '${LANGFUSE_DB_NAME:-langfuse}' already exists, skipping."
else
psql -h postgres -U ${DB_USER} -d postgres -v ON_ERROR_STOP=1 -c "CREATE DATABASE \"${LANGFUSE_DB_NAME:-langfuse}\" TEMPLATE template0;"
echo "[langfuse-db-init] database '${LANGFUSE_DB_NAME:-langfuse}' created."
fi
echo "[langfuse-db-init] done."
networks:
- WeKnora-network-dev
restart: "no"
profiles:
- langfuse
- full
langfuse-clickhouse:
image: clickhouse/clickhouse-server:24.8
container_name: WeKnora-langfuse-clickhouse-dev
restart: unless-stopped
user: "101:101"
environment:
CLICKHOUSE_DB: default
CLICKHOUSE_USER: ${LANGFUSE_CLICKHOUSE_USER:-clickhouse}
CLICKHOUSE_PASSWORD: ${LANGFUSE_CLICKHOUSE_PASSWORD:-clickhouse}
volumes:
- langfuse_clickhouse_data_dev:/var/lib/clickhouse
- langfuse_clickhouse_logs_dev:/var/log/clickhouse-server
healthcheck:
test: wget --no-verbose --tries=1 --spider http://localhost:8123/ping || exit 1
interval: 5s
timeout: 5s
retries: 10
start_period: 10s
networks:
- WeKnora-network-dev
profiles:
- langfuse
- full
langfuse-minio:
image: minio/minio:RELEASE.2025-09-07T16-13-09Z
container_name: WeKnora-langfuse-minio-dev
restart: unless-stopped
entrypoint: sh
command: -c 'mkdir -p /data/langfuse && minio server --address ":9000" --console-address ":9001" /data'
environment:
MINIO_ROOT_USER: ${LANGFUSE_MINIO_USER:-langfuseminio}
MINIO_ROOT_PASSWORD: ${LANGFUSE_MINIO_PASSWORD:-langfuseminiosecret}
ports:
- "${LANGFUSE_MINIO_S3_PORT:-9100}:9000"
- "${LANGFUSE_MINIO_CONSOLE_PORT:-9101}:9001"
volumes:
- langfuse_minio_data_dev:/data
healthcheck:
test: ["CMD", "mc", "ready", "local"]
interval: 5s
timeout: 10s
retries: 5
networks:
- WeKnora-network-dev
profiles:
- langfuse
- full
langfuse-worker:
image: langfuse/langfuse-worker:3
container_name: WeKnora-langfuse-worker-dev
restart: unless-stopped
depends_on: &langfuse-dev-depends-on
langfuse-db-init:
condition: service_completed_successfully
redis:
condition: service_started
langfuse-clickhouse:
condition: service_healthy
langfuse-minio:
condition: service_healthy
# wrapper entrypoint 把 DB_PASSWORD / REDIS_PASSWORD URL 编码后再拼 URL
# 避免密码含 '@' / '#' 等字符导致 Prisma P1013 解析失败。
# 注意compose 覆盖 entrypoint 会清空镜像默认 CMD所以末尾写死原始命令。
entrypoint:
- /bin/sh
- -ec
- |
_enc() { node -e 'process.stdout.write(encodeURIComponent(process.argv[1]))' "$$1"; }
DU=$$(_enc "$$_LF_DB_USER")
DP=$$(_enc "$$_LF_DB_PASSWORD")
RP=$$(_enc "$$_LF_REDIS_PASSWORD")
export DATABASE_URL="postgresql://$$DU:$$DP@postgres:5432/$$_LF_DB_NAME"
export REDIS_CONNECTION_STRING="redis://:$$RP@redis:6379/$$_LF_REDIS_DB"
unset _LF_DB_USER _LF_DB_PASSWORD _LF_REDIS_PASSWORD
exec dumb-init -- ./worker/entrypoint.sh node worker/dist/index.js
environment: &langfuse-dev-env
# 原始凭证(未 URL 编码),由 entrypoint wrapper 读取并组装
_LF_DB_USER: ${DB_USER}
_LF_DB_PASSWORD: ${DB_PASSWORD}
_LF_DB_NAME: ${LANGFUSE_DB_NAME:-langfuse}
_LF_REDIS_PASSWORD: ${REDIS_PASSWORD}
_LF_REDIS_DB: ${LANGFUSE_REDIS_DB:-1}
SALT: ${LANGFUSE_SALT:-weknora-langfuse-dev-salt-change-me}
ENCRYPTION_KEY: ${LANGFUSE_ENCRYPTION_KEY:-0000000000000000000000000000000000000000000000000000000000000000}
NEXTAUTH_URL: ${LANGFUSE_NEXTAUTH_URL:-http://localhost:3000}
NEXTAUTH_SECRET: ${LANGFUSE_NEXTAUTH_SECRET:-weknora-langfuse-dev-nextauth-secret-change-me}
TELEMETRY_ENABLED: ${LANGFUSE_TELEMETRY_ENABLED:-false}
LANGFUSE_ENABLE_EXPERIMENTAL_FEATURES: "false"
CLICKHOUSE_URL: http://langfuse-clickhouse:8123
CLICKHOUSE_MIGRATION_URL: clickhouse://langfuse-clickhouse:9000
CLICKHOUSE_USER: ${LANGFUSE_CLICKHOUSE_USER:-clickhouse}
CLICKHOUSE_PASSWORD: ${LANGFUSE_CLICKHOUSE_PASSWORD:-clickhouse}
CLICKHOUSE_CLUSTER_ENABLED: "false"
LANGFUSE_S3_EVENT_UPLOAD_BUCKET: langfuse
LANGFUSE_S3_EVENT_UPLOAD_REGION: auto
LANGFUSE_S3_EVENT_UPLOAD_ACCESS_KEY_ID: ${LANGFUSE_MINIO_USER:-langfuseminio}
LANGFUSE_S3_EVENT_UPLOAD_SECRET_ACCESS_KEY: ${LANGFUSE_MINIO_PASSWORD:-langfuseminiosecret}
LANGFUSE_S3_EVENT_UPLOAD_ENDPOINT: http://langfuse-minio:9000
LANGFUSE_S3_EVENT_UPLOAD_FORCE_PATH_STYLE: "true"
LANGFUSE_S3_EVENT_UPLOAD_PREFIX: events/
LANGFUSE_S3_MEDIA_UPLOAD_BUCKET: langfuse
LANGFUSE_S3_MEDIA_UPLOAD_REGION: auto
LANGFUSE_S3_MEDIA_UPLOAD_ACCESS_KEY_ID: ${LANGFUSE_MINIO_USER:-langfuseminio}
LANGFUSE_S3_MEDIA_UPLOAD_SECRET_ACCESS_KEY: ${LANGFUSE_MINIO_PASSWORD:-langfuseminiosecret}
LANGFUSE_S3_MEDIA_UPLOAD_ENDPOINT: ${LANGFUSE_S3_MEDIA_UPLOAD_ENDPOINT:-http://localhost:9100}
LANGFUSE_S3_MEDIA_UPLOAD_FORCE_PATH_STYLE: "true"
LANGFUSE_S3_MEDIA_UPLOAD_PREFIX: media/
networks:
- WeKnora-network-dev
profiles:
- langfuse
- full
langfuse-web:
image: langfuse/langfuse:3
container_name: WeKnora-langfuse-web-dev
restart: unless-stopped
depends_on: *langfuse-dev-depends-on
ports:
- "${LANGFUSE_WEB_PORT:-3000}:3000"
entrypoint:
- /bin/sh
- -ec
- |
_enc() { node -e 'process.stdout.write(encodeURIComponent(process.argv[1]))' "$$1"; }
DU=$$(_enc "$$_LF_DB_USER")
DP=$$(_enc "$$_LF_DB_PASSWORD")
RP=$$(_enc "$$_LF_REDIS_PASSWORD")
export DATABASE_URL="postgresql://$$DU:$$DP@postgres:5432/$$_LF_DB_NAME"
export REDIS_CONNECTION_STRING="redis://:$$RP@redis:6379/$$_LF_REDIS_DB"
unset _LF_DB_USER _LF_DB_PASSWORD _LF_REDIS_PASSWORD
if [ -n "$$NEXT_PUBLIC_LANGFUSE_CLOUD_REGION" ]; then
exec dumb-init -- ./web/entrypoint.sh node --import dd-trace/initialize.mjs ./web/server.js --keepAliveTimeout 110000
else
exec dumb-init -- ./web/entrypoint.sh node ./web/server.js --keepAliveTimeout 110000
fi
environment:
<<: *langfuse-dev-env
LANGFUSE_INIT_ORG_ID: ${LANGFUSE_INIT_ORG_ID:-}
LANGFUSE_INIT_ORG_NAME: ${LANGFUSE_INIT_ORG_NAME:-}
LANGFUSE_INIT_PROJECT_ID: ${LANGFUSE_INIT_PROJECT_ID:-}
LANGFUSE_INIT_PROJECT_NAME: ${LANGFUSE_INIT_PROJECT_NAME:-}
LANGFUSE_INIT_PROJECT_PUBLIC_KEY: ${LANGFUSE_INIT_PROJECT_PUBLIC_KEY:-}
LANGFUSE_INIT_PROJECT_SECRET_KEY: ${LANGFUSE_INIT_PROJECT_SECRET_KEY:-}
LANGFUSE_INIT_USER_EMAIL: ${LANGFUSE_INIT_USER_EMAIL:-}
LANGFUSE_INIT_USER_NAME: ${LANGFUSE_INIT_USER_NAME:-}
LANGFUSE_INIT_USER_PASSWORD: ${LANGFUSE_INIT_USER_PASSWORD:-}
networks:
- WeKnora-network-dev
profiles:
- langfuse
- full
networks:
WeKnora-network-dev:
@@ -212,3 +414,6 @@ volumes:
qdrant_data_dev:
milvus_data_dev:
docreader-tmp-dev:
langfuse_clickhouse_data_dev:
langfuse_clickhouse_logs_dev:
langfuse_minio_data_dev:

View File

@@ -72,6 +72,21 @@ services:
- OTEL_METRICS_EXPORTER=none
- OTEL_LOGS_EXPORTER=none
- OTEL_PROPAGATORS=tracecontext,baggage
# ========== Langfuse (optional observability) ==========
# Enabled automatically when LANGFUSE_PUBLIC_KEY + LANGFUSE_SECRET_KEY are set.
# See docs/Langfuse集成.md for the full list of tuning knobs.
- LANGFUSE_ENABLED=${LANGFUSE_ENABLED:-}
- LANGFUSE_HOST=${LANGFUSE_HOST:-https://cloud.langfuse.com}
- LANGFUSE_PUBLIC_KEY=${LANGFUSE_PUBLIC_KEY:-}
- LANGFUSE_SECRET_KEY=${LANGFUSE_SECRET_KEY:-}
- LANGFUSE_RELEASE=${LANGFUSE_RELEASE:-}
- LANGFUSE_ENVIRONMENT=${LANGFUSE_ENVIRONMENT:-}
- LANGFUSE_FLUSH_AT=${LANGFUSE_FLUSH_AT:-}
- LANGFUSE_FLUSH_INTERVAL=${LANGFUSE_FLUSH_INTERVAL:-}
- LANGFUSE_QUEUE_SIZE=${LANGFUSE_QUEUE_SIZE:-}
- LANGFUSE_REQUEST_TIMEOUT=${LANGFUSE_REQUEST_TIMEOUT:-}
- LANGFUSE_SAMPLE_RATE=${LANGFUSE_SAMPLE_RATE:-}
- LANGFUSE_DEBUG=${LANGFUSE_DEBUG:-}
- RETRIEVE_DRIVER=${RETRIEVE_DRIVER:-}
- ELASTICSEARCH_ADDR=${ELASTICSEARCH_ADDR:-}
- ELASTICSEARCH_USERNAME=${ELASTICSEARCH_USERNAME:-}
@@ -359,6 +374,225 @@ services:
- dex
- full
# ---------------------------------------------------------------------------
# Langfuse 自建可观测栈 (opt-in via `--profile langfuse`)
#
# 为了最小化资源占用Langfuse 会复用 WeKnora 已有的 postgres 和 redis
# - postgres在 WeKnora-postgres 中创建独立的 "langfuse" 数据库
# - redis :复用 WeKnora-redis 的 DB 1WeKnora 自己用 DB 0
# 需要新增的只有 Langfuse 应用自身 + ClickHouseOLAP+ 专用 MinIO事件/媒体 S3
#
# 启用方式:
# docker compose --profile langfuse up -d
# 启用后:
# 1. Langfuse UI: http://localhost:3000 (首次访问注册管理员并在 Settings → API Keys 生成 key
# 2. 在 .env 里把 LANGFUSE_HOST 改成 http://langfuse-web:3000容器间通信
# 并填入刚生成的 LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY
# 3. 重启 app 容器docker compose up -d app
# 相关默认密码已写成占位符,生产部署务必用 openssl rand 重新生成!
# ---------------------------------------------------------------------------
# 一次性初始化脚本:在已有的 WeKnora-postgres 里创建 langfuse 数据库(幂等,数据库已存在则跳过)
# 复用 WeKnora-postgres 的 ParadeDB 镜像,避免多拉一个 postgres 镜像
langfuse-db-init:
image: paradedb/paradedb:v0.22.2-pg17
container_name: WeKnora-langfuse-db-init
depends_on:
postgres:
condition: service_healthy
environment:
PGPASSWORD: ${DB_PASSWORD}
# ${LANGFUSE_DB_NAME:-langfuse} / ${DB_USER} 由 compose 解析成字面量后再传给 shell
# 脚本中需要 shell 自己展开的变量(无)一律用 $$ 转义。
entrypoint: ["sh", "-c"]
command:
- |
set -e
echo "[langfuse-db-init] ensuring database '${LANGFUSE_DB_NAME:-langfuse}' exists in WeKnora-postgres..."
# 先刷新现有库的 collation镜像 ICU 2.36 与宿主 2.41 不匹配时必须做),否则 CREATE DATABASE 会失败
psql -h postgres -U ${DB_USER} -d postgres -v ON_ERROR_STOP=0 -c "ALTER DATABASE template1 REFRESH COLLATION VERSION;" >/dev/null 2>&1 || true
psql -h postgres -U ${DB_USER} -d postgres -v ON_ERROR_STOP=0 -c "ALTER DATABASE postgres REFRESH COLLATION VERSION;" >/dev/null 2>&1 || true
# 幂等创建:已存在则跳过;不存在则从 template0 克隆template0 永远不会有 collation 漂移)
if psql -h postgres -U ${DB_USER} -d postgres -tAc "SELECT 1 FROM pg_database WHERE datname='${LANGFUSE_DB_NAME:-langfuse}'" | grep -q 1; then
echo "[langfuse-db-init] database '${LANGFUSE_DB_NAME:-langfuse}' already exists, skipping."
else
psql -h postgres -U ${DB_USER} -d postgres -v ON_ERROR_STOP=1 -c "CREATE DATABASE \"${LANGFUSE_DB_NAME:-langfuse}\" TEMPLATE template0;"
echo "[langfuse-db-init] database '${LANGFUSE_DB_NAME:-langfuse}' created."
fi
echo "[langfuse-db-init] done."
networks:
- WeKnora-network
restart: "no"
profiles:
- langfuse
- full
langfuse-clickhouse:
image: clickhouse/clickhouse-server:24.8
container_name: WeKnora-langfuse-clickhouse
restart: unless-stopped
user: "101:101"
environment:
CLICKHOUSE_DB: default
CLICKHOUSE_USER: ${LANGFUSE_CLICKHOUSE_USER:-clickhouse}
CLICKHOUSE_PASSWORD: ${LANGFUSE_CLICKHOUSE_PASSWORD:-clickhouse}
volumes:
- langfuse_clickhouse_data:/var/lib/clickhouse
- langfuse_clickhouse_logs:/var/log/clickhouse-server
healthcheck:
test: wget --no-verbose --tries=1 --spider http://localhost:8123/ping || exit 1
interval: 5s
timeout: 5s
retries: 10
start_period: 10s
networks:
- WeKnora-network
profiles:
- langfuse
- full
langfuse-minio:
image: minio/minio:RELEASE.2025-09-07T16-13-09Z
container_name: WeKnora-langfuse-minio
restart: unless-stopped
entrypoint: sh
# 启动前创建 `langfuse` 桶Langfuse 开箱即用
command: -c 'mkdir -p /data/langfuse && minio server --address ":9000" --console-address ":9001" /data'
environment:
MINIO_ROOT_USER: ${LANGFUSE_MINIO_USER:-langfuseminio}
MINIO_ROOT_PASSWORD: ${LANGFUSE_MINIO_PASSWORD:-langfuseminiosecret}
ports:
# 9100S3 API浏览器上传媒体时会直连此端口必须对外暴露
- "${LANGFUSE_MINIO_S3_PORT:-9100}:9000"
# 9101MinIO 控制台(可选,方便排障)
- "${LANGFUSE_MINIO_CONSOLE_PORT:-9101}:9001"
volumes:
- langfuse_minio_data:/data
healthcheck:
test: ["CMD", "mc", "ready", "local"]
interval: 5s
timeout: 10s
retries: 5
networks:
- WeKnora-network
profiles:
- langfuse
- full
langfuse-worker:
image: langfuse/langfuse-worker:3
container_name: WeKnora-langfuse-worker
restart: unless-stopped
depends_on: &langfuse-depends-on
# 复用 WeKnora 已有的 postgres/redis需要先完成 langfuse 数据库的创建
langfuse-db-init:
condition: service_completed_successfully
redis:
condition: service_started
langfuse-clickhouse:
condition: service_healthy
langfuse-minio:
condition: service_healthy
# 用 wrapper entrypoint在容器启动时把 DB_PASSWORD / REDIS_PASSWORD 做 URL 编码,
# 避免 DB_PASSWORD 含 '@' / '#' 等字符导致 Prisma 无法解析 DATABASE_URL (P1013)。
# Langfuse 镜像基于 Node.js直接用 node 的 encodeURIComponent不引外部依赖。
# 注意compose 覆盖 entrypoint 会清空镜像默认 CMD因此在 wrapper 末尾写死 exec 的原始命令。
entrypoint:
- /bin/sh
- -ec
- |
_enc() { node -e 'process.stdout.write(encodeURIComponent(process.argv[1]))' "$$1"; }
DU=$$(_enc "$$_LF_DB_USER")
DP=$$(_enc "$$_LF_DB_PASSWORD")
RP=$$(_enc "$$_LF_REDIS_PASSWORD")
export DATABASE_URL="postgresql://$$DU:$$DP@postgres:5432/$$_LF_DB_NAME"
export REDIS_CONNECTION_STRING="redis://:$$RP@redis:6379/$$_LF_REDIS_DB"
unset _LF_DB_USER _LF_DB_PASSWORD _LF_REDIS_PASSWORD
exec dumb-init -- ./worker/entrypoint.sh node worker/dist/index.js
environment: &langfuse-env
# 原始凭证(未 URL 编码),由 entrypoint wrapper 读取并组装成 DATABASE_URL / REDIS_CONNECTION_STRING
_LF_DB_USER: ${DB_USER}
_LF_DB_PASSWORD: ${DB_PASSWORD}
_LF_DB_NAME: ${LANGFUSE_DB_NAME:-langfuse}
_LF_REDIS_PASSWORD: ${REDIS_PASSWORD}
_LF_REDIS_DB: ${LANGFUSE_REDIS_DB:-1}
# SALT / ENCRYPTION_KEY 生产环境务必重新生成:
# SALT: openssl rand -base64 32
# ENCRYPTION_KEY: openssl rand -hex 32
SALT: ${LANGFUSE_SALT:-weknora-langfuse-dev-salt-change-me}
ENCRYPTION_KEY: ${LANGFUSE_ENCRYPTION_KEY:-0000000000000000000000000000000000000000000000000000000000000000}
NEXTAUTH_URL: ${LANGFUSE_NEXTAUTH_URL:-http://localhost:3000}
NEXTAUTH_SECRET: ${LANGFUSE_NEXTAUTH_SECRET:-weknora-langfuse-dev-nextauth-secret-change-me}
TELEMETRY_ENABLED: ${LANGFUSE_TELEMETRY_ENABLED:-false}
LANGFUSE_ENABLE_EXPERIMENTAL_FEATURES: "false"
CLICKHOUSE_URL: http://langfuse-clickhouse:8123
CLICKHOUSE_MIGRATION_URL: clickhouse://langfuse-clickhouse:9000
CLICKHOUSE_USER: ${LANGFUSE_CLICKHOUSE_USER:-clickhouse}
CLICKHOUSE_PASSWORD: ${LANGFUSE_CLICKHOUSE_PASSWORD:-clickhouse}
CLICKHOUSE_CLUSTER_ENABLED: "false"
LANGFUSE_S3_EVENT_UPLOAD_BUCKET: langfuse
LANGFUSE_S3_EVENT_UPLOAD_REGION: auto
LANGFUSE_S3_EVENT_UPLOAD_ACCESS_KEY_ID: ${LANGFUSE_MINIO_USER:-langfuseminio}
LANGFUSE_S3_EVENT_UPLOAD_SECRET_ACCESS_KEY: ${LANGFUSE_MINIO_PASSWORD:-langfuseminiosecret}
LANGFUSE_S3_EVENT_UPLOAD_ENDPOINT: http://langfuse-minio:9000
LANGFUSE_S3_EVENT_UPLOAD_FORCE_PATH_STYLE: "true"
LANGFUSE_S3_EVENT_UPLOAD_PREFIX: events/
LANGFUSE_S3_MEDIA_UPLOAD_BUCKET: langfuse
LANGFUSE_S3_MEDIA_UPLOAD_REGION: auto
LANGFUSE_S3_MEDIA_UPLOAD_ACCESS_KEY_ID: ${LANGFUSE_MINIO_USER:-langfuseminio}
LANGFUSE_S3_MEDIA_UPLOAD_SECRET_ACCESS_KEY: ${LANGFUSE_MINIO_PASSWORD:-langfuseminiosecret}
# 媒体上传用的外部 endpoint浏览器直连默认指向宿主机映射的 9100 端口
LANGFUSE_S3_MEDIA_UPLOAD_ENDPOINT: ${LANGFUSE_S3_MEDIA_UPLOAD_ENDPOINT:-http://localhost:9100}
LANGFUSE_S3_MEDIA_UPLOAD_FORCE_PATH_STYLE: "true"
LANGFUSE_S3_MEDIA_UPLOAD_PREFIX: media/
networks:
- WeKnora-network
profiles:
- langfuse
- full
langfuse-web:
image: langfuse/langfuse:3
container_name: WeKnora-langfuse-web
restart: unless-stopped
depends_on: *langfuse-depends-on
ports:
- "${LANGFUSE_WEB_PORT:-3000}:3000"
# 同样走 wrapper entrypoint末尾写死 web 的原始启动命令(见镜像 Dockerfile 的 CMD
entrypoint:
- /bin/sh
- -ec
- |
_enc() { node -e 'process.stdout.write(encodeURIComponent(process.argv[1]))' "$$1"; }
DU=$$(_enc "$$_LF_DB_USER")
DP=$$(_enc "$$_LF_DB_PASSWORD")
RP=$$(_enc "$$_LF_REDIS_PASSWORD")
export DATABASE_URL="postgresql://$$DU:$$DP@postgres:5432/$$_LF_DB_NAME"
export REDIS_CONNECTION_STRING="redis://:$$RP@redis:6379/$$_LF_REDIS_DB"
unset _LF_DB_USER _LF_DB_PASSWORD _LF_REDIS_PASSWORD
if [ -n "$$NEXT_PUBLIC_LANGFUSE_CLOUD_REGION" ]; then
exec dumb-init -- ./web/entrypoint.sh node --import dd-trace/initialize.mjs ./web/server.js --keepAliveTimeout 110000
else
exec dumb-init -- ./web/entrypoint.sh node ./web/server.js --keepAliveTimeout 110000
fi
environment:
<<: *langfuse-env
# 可选:首次启动时自动创建组织、项目和管理员(填了就不用在 UI 上手动注册)
LANGFUSE_INIT_ORG_ID: ${LANGFUSE_INIT_ORG_ID:-}
LANGFUSE_INIT_ORG_NAME: ${LANGFUSE_INIT_ORG_NAME:-}
LANGFUSE_INIT_PROJECT_ID: ${LANGFUSE_INIT_PROJECT_ID:-}
LANGFUSE_INIT_PROJECT_NAME: ${LANGFUSE_INIT_PROJECT_NAME:-}
LANGFUSE_INIT_PROJECT_PUBLIC_KEY: ${LANGFUSE_INIT_PROJECT_PUBLIC_KEY:-}
LANGFUSE_INIT_PROJECT_SECRET_KEY: ${LANGFUSE_INIT_PROJECT_SECRET_KEY:-}
LANGFUSE_INIT_USER_EMAIL: ${LANGFUSE_INIT_USER_EMAIL:-}
LANGFUSE_INIT_USER_NAME: ${LANGFUSE_INIT_USER_NAME:-}
LANGFUSE_INIT_USER_PASSWORD: ${LANGFUSE_INIT_USER_PASSWORD:-}
networks:
- WeKnora-network
profiles:
- langfuse
- full
networks:
WeKnora-network:
driver: bridge
@@ -373,3 +607,6 @@ volumes:
qdrant_data:
milvus_data:
weaviate_data:
langfuse_clickhouse_data:
langfuse_clickhouse_logs:
langfuse_minio_data:

281
docs/Langfuse集成.md Normal file
View File

@@ -0,0 +1,281 @@
# Langfuse 集成
WeKnora 内置了对 [Langfuse](https://langfuse.com) 的轻量级集成,用于统计 token 消耗、追踪 LLM 调用链路、并为每个对话生成可在 Langfuse 控制台查看的 trace。该集成解决 issue [#497](https://github.com/Tencent/WeKnora/issues/497)token 使用量统计)和 discussion [#620](https://github.com/Tencent/WeKnora/discussions/620)(接入 Langfuse
## 1. 特性
- 自动上报 **chat / embedding / rerank / VLM视觉语言模型/ ASR语音识别** 全部 5 类模型调用的 prompt、响应和 token 使用量。
- 为每个对话或检索请求创建一条 **trace**,对应 HTTP 请求级别,方便按 session / user 聚合。
- 支持 **流式响应**:记录首 token 延迟Time-To-First-Token完整响应在流结束后一次性写入。
- **完全可选**:不配置 `LANGFUSE_*` 环境变量时Langfuse 相关代码路径是 no-op不产生任何性能开销。
- **异步批量上报**:不阻塞业务请求;队列满时静默丢弃,观测数据不会影响用户对话。
- **开箱即用的部署方式**Docker Compose`docker-compose.yml` 已内置环境变量、Helm Chart通过 `extraEnv`、Lite 版本(本地单机)均支持。
## 2. 快速开始
### 2.1 获取 Langfuse 凭证
1. 登录 [cloud.langfuse.com](https://cloud.langfuse.com) 或自建 Langfuse 实例。
2. 进入 `Project Settings → API Keys`,生成一对 `Public Key` / `Secret Key`
### 2.2 按部署方式配置
#### ADocker Compose 部署(推荐)
`docker-compose.yml` 已经把所有 `LANGFUSE_*` 环境变量串到 `app` 服务。下面提供两种选择。
##### A-1) 接入 Langfuse Cloud最简单
只需要在 **`.env`** 里加 3 行:
```bash
LANGFUSE_PUBLIC_KEY=pk-lf-xxxxxxxx
LANGFUSE_SECRET_KEY=sk-lf-xxxxxxxx
LANGFUSE_HOST=https://cloud.langfuse.com # 美区用 https://us.cloud.langfuse.com
```
然后重启服务:
```bash
docker compose up -d app
docker compose logs -f app | grep Langfuse
```
看到下面这行就说明已启用:
```
[Langfuse] enabled host=https://cloud.langfuse.com flush_at=15 flush_interval=3s sample_rate=1.00
```
##### A-2) 自建 Langfuse 栈(离线 / 内网 / 数据合规)
`docker-compose.yml` 内置了一个可选的 `langfuse` profile用一条命令就能拉起 Langfuse v3。
**设计上已尽可能复用 WeKnora 已有容器,避免资源浪费**
| 组件 | 来源 | 备注 |
| --- | --- | --- |
| PostgreSQL | 复用 `WeKnora-postgres` | 通过一次性的 `langfuse-db-init` 容器,在同一 pg 实例里创建独立的 `langfuse` 数据库。库级隔离,互不影响。 |
| Redis | 复用 `WeKnora-redis` | 使用独立的 Redis DB 号(默认 DB 1WeKnora 用 DB 0`REDIS_CONNECTION_STRING` 指定 DB 后缀。 |
| ClickHouse | 新增 `langfuse-clickhouse` | Langfuse 专有OLAP 事件存储WeKnora 不用,必须独立。 |
| MinIO | 新增 `langfuse-minio` | 故意和 WeKnora 的 `minio` 分开(后者是可选 profile未必激活Langfuse S3 要专属 bucket。 |
| Web / Worker | 新增 `langfuse-web` + `langfuse-worker` | Langfuse 应用本体。 |
最终 `--profile langfuse` 只新增 **4 个常驻容器 + 1 个一次性 init**,内存开销由原先的 ~1.52.5 GB 降到约 **1.01.5 GB**
```bash
# 1. 启动自建栈ClickHouse 首次迁移大约需要 1-2 分钟)
docker compose --profile langfuse up -d
# 2. 浏览器打开 http://localhost:3000 注册管理员账号
# 然后在 Project Settings → API Keys 生成 Public/Secret Key
# 3. 把 key 填回 .env 并把 HOST 改成容器内部地址
cat >> .env <<'EOF'
LANGFUSE_HOST=http://langfuse-web:3000
LANGFUSE_PUBLIC_KEY=pk-lf-xxxxxxxx
LANGFUSE_SECRET_KEY=sk-lf-xxxxxxxx
EOF
# 4. 让 app 重新加载配置
docker compose up -d app
```
> ⚠️ **生产部署安全提示**`.env.example` 里的默认密码 / `SALT` / `ENCRYPTION_KEY` 都是开发占位符,生产环境必须用以下命令重新生成:
>
> ```bash
> echo "LANGFUSE_SALT=$(openssl rand -base64 32)"
> echo "LANGFUSE_ENCRYPTION_KEY=$(openssl rand -hex 32)"
> echo "LANGFUSE_NEXTAUTH_SECRET=$(openssl rand -base64 32)"
> ```
>
> 同时把 `LANGFUSE_DB_PASSWORD` / `LANGFUSE_CLICKHOUSE_PASSWORD` / `LANGFUSE_REDIS_PASSWORD` / `LANGFUSE_MINIO_PASSWORD` 全部换成强密码。完整变量清单见 `.env.example` 的 "Langfuse 自建栈配置" 段。
##### 通用调优
可选调优变量(`LANGFUSE_FLUSH_AT``LANGFUSE_SAMPLE_RATE` 等)都已经在 `docker-compose.yml` 中预设直通,只要在 `.env` 追加对应行即可生效。完整列表见 `.env.example` 的 Langfuse 段,或本文第 3 节。
##### 资源开销估算A-2 自建方案)
| 组件 | 类型 | 典型 RSS | 备注 |
| --- | --- | --- | --- |
| langfuse-db-init | 一次性 | | 创建 `langfuse` 数据库后立即退出 |
| langfuse-web | 常驻 | 300500 MB | Next.js |
| langfuse-worker | 常驻 | 200400 MB | Node.jsQueue consumer |
| langfuse-clickhouse | 常驻 | 500 MB1 GB | 首次迁移稍高,稳态约 500 MB |
| langfuse-minio | 常驻 | 100200 MB | |
| 复用WeKnora-postgres | | +~50 MB | 多一个 `langfuse` 数据库 |
| 复用WeKnora-redis | | +3080 MB | 共用实例的 DB 1 |
| **新增合计** | | **≈ 1.01.5 GB** | 推荐 3 GB+ 可用内存 |
> 和"完全隔离各建一套 pg/redis"方案相比,这里节省了约 **400500 MB** 内存。代价是 WeKnora 的 pg/redis 容量规划需要为 Langfuse 预留一点余量Langfuse 写入量并不大(只是元数据 + 任务队列,事件主体走 ClickHouse实际影响很小。
对单机部署而言,若只想使用 Langfuse Cloud 方案A-1**完全不需要**这些容器;原有服务 CPU/内存占用不变。
##### 生产环境下的注意事项
- **WeKnora-redis 的驱逐策略**Langfuse 建议 `maxmemory-policy noeviction`(避免 Redis 在内存紧张时丢弃队列任务)。如果 WeKnora 的 redis 未配置该策略,建议在 `docker-compose.yml` 的 redis command 中加上 `--maxmemory-policy noeviction`
- **备份**`pg_dump -d langfuse` 可独立备份 Langfuse 的元数据;事件数据在 ClickHouse 卷(`langfuse_clickhouse_data`)中。
- **想彻底隔离**(跨机部署、强运维隔离):可以直接把 `langfuse-web` / `langfuse-worker``DATABASE_URL``REDIS_CONNECTION_STRING` 指向任意外部 pg/redis例如 RDS + ElastiCache`langfuse-db-init` 容器可以选择不启动,手动在目标 pg 上 `CREATE DATABASE langfuse` 即可。
#### BWeKnora Lite单机
`.env.lite`(或启动脚本导出的环境变量)里加:
```bash
LANGFUSE_PUBLIC_KEY=pk-lf-xxxxxxxx
LANGFUSE_SECRET_KEY=sk-lf-xxxxxxxx
LANGFUSE_HOST=https://cloud.langfuse.com
```
启动 `weknora-lite`(或 macOS `.app`)后效果同上。
#### CHelm Chart 部署
`values.yaml``app.extraEnv` 添加:
```yaml
app:
extraEnv:
- name: LANGFUSE_PUBLIC_KEY
valueFrom:
secretKeyRef:
name: langfuse-credentials
key: public_key
- name: LANGFUSE_SECRET_KEY
valueFrom:
secretKeyRef:
name: langfuse-credentials
key: secret_key
- name: LANGFUSE_HOST
value: https://cloud.langfuse.com
```
建议把 Secret Key 放到 Kubernetes Secret 中,切勿写进 values.yaml。
#### D二进制 / 源码运行
```bash
export LANGFUSE_PUBLIC_KEY="pk-lf-xxxx"
export LANGFUSE_SECRET_KEY="sk-lf-xxxx"
export LANGFUSE_HOST="https://cloud.langfuse.com"
./weknora-server
```
#### E本地开发`docker-compose.dev.yml` + `go run`
`docker-compose.dev.yml` 只启动基础设施容器postgres/redis/docreader 等),`app` 走本地 `go run ./cmd/server`。Langfuse 的两种接入方式:
**E-1) 直连 Langfuse Clouddev 最常见)**
无需改任何 compose 文件,本地 shell 导出即可:
```bash
export LANGFUSE_PUBLIC_KEY="pk-lf-xxxx"
export LANGFUSE_SECRET_KEY="sk-lf-xxxx"
export LANGFUSE_HOST="https://cloud.langfuse.com"
go run ./cmd/server
```
**E-2) 本地自建栈调试**
dev compose 也支持对称的 `langfuse` profile复用同一个 dev postgres + redis
```bash
# 拉起基础设施 + Langfuse 栈
docker compose -f docker-compose.dev.yml up -d postgres redis docreader
docker compose -f docker-compose.dev.yml --profile langfuse up -d
# 浏览器打开 http://localhost:3000 注册并生成 key
# 本地 app 接入(注意是 localhost不是 langfuse-web因为 go run 跑在宿主机)
export LANGFUSE_HOST=http://localhost:3000
export LANGFUSE_PUBLIC_KEY=pk-lf-xxxxxxxx
export LANGFUSE_SECRET_KEY=sk-lf-xxxxxxxx
go run ./cmd/server
```
Dev 相关容器都带 `-dev` 后缀、用独立网络 `WeKnora-network-dev`,和生产 compose **不冲突**
### 2.3 验证
发起一次知识问答(`POST /api/v1/knowledge-chat/:session_id`)或知识检索(`POST /api/v1/knowledge-search`)。等待 3 秒(或批量大小达到 `flush_at`Langfuse 控制台的 **Traces** 页面会出现对应的 trace
- 顶层节点HTTP 请求(带 `userId` / `sessionId`)。
- 子节点依次为 rerank、chat、VLM 等具体模型调用,点击可查看 prompt、响应以及 usageprompt/completion/total tokens
- 流式对话会额外标注 Time-To-First-Token。
## 3. 环境变量参考
| 变量名 | 默认值 | 说明 |
| --- | --- | --- |
| `LANGFUSE_ENABLED` | 自动 | 显式开关。未设置时,只要 `PUBLIC_KEY` + `SECRET_KEY` 都存在就自动启用。支持 `true/false/1/0/yes/no`。 |
| `LANGFUSE_HOST` | `https://cloud.langfuse.com` | Langfuse 实例地址。美区用 `https://us.cloud.langfuse.com`,自建实例填 `https://langfuse.your-domain.com`。 |
| `LANGFUSE_PUBLIC_KEY` | — | 项目 Public Key`pk-lf-...`)。 |
| `LANGFUSE_SECRET_KEY` | — | 项目 Secret Key`sk-lf-...`),请走密钥管理工具注入,不要提交到仓库。 |
| `LANGFUSE_RELEASE` | — | 可选,上报到 Langfuse 的版本号,例如 CI 构建号。 |
| `LANGFUSE_ENVIRONMENT` | — | 可选,环境标签(`production` / `staging` / `dev`),方便在 UI 过滤。 |
| `LANGFUSE_FLUSH_AT` | `15` | 批处理大小:缓冲区积累到该数量立即上报。 |
| `LANGFUSE_FLUSH_INTERVAL` | `3s` | 定时刷新间隔。支持 `500ms``5s``1m` 等 Go duration 写法;纯数字按秒处理。 |
| `LANGFUSE_QUEUE_SIZE` | `2048` | 内存队列容量。队列满时新事件会被静默丢弃(避免拖慢业务)。 |
| `LANGFUSE_REQUEST_TIMEOUT` | `10s` | 单次 HTTP ingest 请求超时。 |
| `LANGFUSE_SAMPLE_RATE` | `1.0` | 采样率 (0..1)。`0` 视为 `1.0`。高流量环境可下调。 |
| `LANGFUSE_DEBUG` | `false` | 打开后会在 WeKnora 日志里打印上报失败的详细原因,排障期间临时开启。 |
## 4. 观测数据说明
| Langfuse 概念 | WeKnora 对应 | 备注 |
| --- | --- | --- |
| Trace | 一次 HTTP 请求 | 自动在 `/api/v1/knowledge-chat``/api/v1/agent-chat``/api/v1/knowledge-search``generate_title`、模型连通性测试等端点创建。 |
| Generationtype=GENERATION | 每次 chat / embedding / rerank / VLM / ASR 调用 | 包含 model、prompt、响应、token usage、modelParameterstemperature / top_p / max_tokens 等)。 |
| Input Tokens | `TokenUsage.PromptTokens` | 来自模型返回的 usage 字段。 |
| Output Tokens | `TokenUsage.CompletionTokens` | 来自模型返回的 usage 字段。 |
| Total Tokens | `TokenUsage.TotalTokens` | 大多数厂商返回;未返回时自动求和。 |
| `userId` | `X-User-ID` / 租户 ID | 未登录时退化为 `tenant:<id>`,方便按租户汇总消耗。 |
| `sessionId` | URL 中的 `:session_id` | 可以在 Langfuse 的 Sessions 视图聚合一整场对话。 |
| Time-To-First-Token | 流式调用首条有效 chunk 的时间 | 通过 `generation-update.completionStartTime` 上报。 |
### 各模型的 usage 处理策略
| 模型类型 | 上报名称 | Token 计量方式 | 备注 |
| --- | --- | --- | --- |
| Chat | `chat.completion` / `chat.completion.stream` | 直接使用模型返回的 `prompt_tokens` / `completion_tokens` / `total_tokens` | 流式请求会记录 TTFT。 |
| Embedding | `embedding.embed` / `embedding.batch_embed` | 模型未返回 usage 时按 `rune_count/4 + 1` 估算 input tokens | 批量接口会上报批量大小和前 5 条文本预览,避免把整批内容塞进 trace。 |
| Rerank | `rerank` | 按 `query + 所有文档` 的 rune 数估算 input tokens | 输出只上报前 10 条 `(index, score)`。 |
| VLM | `vlm.predict` | prompt/result 分别按 `rune/4` 估算 input/output | 不上传原始图片字节;仅记录图片数量与总字节大小。 |
| ASR | `asr.transcribe` | 以 **秒**`SECONDS`)为计量单位,取转录结果最后一个 segment 的 `end` 作为音频时长 | 便于 Langfuse 按"分钟"结算 Whisper 类 API。 |
> TipLangfuse 的 `Settings → Models` 页面可以为自定义模型(本地 Ollama、阿里云百炼等配置单价每 1K tokens、每分钟等Langfuse 会据此自动核算费用。
## 5. 高流量部署建议
- **调高 `LANGFUSE_FLUSH_AT`** 到 50100降低 ingest HTTP 调用频率。
- **采样**:把 `LANGFUSE_SAMPLE_RATE=0.1` 只采样 10% 的对话,生产成本与信噪比通常能得到较好的平衡。
- **扩大 `LANGFUSE_QUEUE_SIZE`** 至 8192防止短时峰值触发事件丢弃。
- 将 Langfuse 实例部署在离 WeKnora 同机房(例如自建 Langfuse + 内网地址),可以显著降低上报延迟。
- 打开 `LANGFUSE_DEBUG=true` 几分钟即可确认链路,生产环境常态下关闭,避免日志噪音。
## 6. 禁用
删除或留空 `LANGFUSE_PUBLIC_KEY` / `LANGFUSE_SECRET_KEY`,或显式设置 `LANGFUSE_ENABLED=false`,再重启服务即可。所有 Langfuse 相关代码路径会回退到 no-op不会影响其他观测组件OpenTelemetry、LLM Debug Log
## 7. 故障排查
| 现象 | 建议排查步骤 |
| --- | --- |
| 启动日志没有 `[Langfuse] enabled` | 检查 `LANGFUSE_PUBLIC_KEY` / `LANGFUSE_SECRET_KEY` 是否被服务进程读到;容器里可 `env \| grep LANGFUSE` 验证。 |
| 控制台看不到 trace | 打开 `LANGFUSE_DEBUG=true`,观察日志中是否有 `[Langfuse] flush ... failed`。常见原因:`LANGFUSE_HOST` 错误、企业防火墙拦截 HTTPS、Secret Key 轮换后未更新。 |
| 部分 chunk 缺失 | 调大 `LANGFUSE_QUEUE_SIZE`;确认 Langfuse ingest API 没有返回 429/503。 |
| token 数为 0 | 该模型在返回中未提供 usage常见于部分本地 Ollama / 自建模型)。可在模型侧开启 usage 统计,或在 Langfuse 配置里为该模型提供 tokenizer。 |
## 8. 代码位置
- `internal/tracing/langfuse/` — Langfuse 客户端、异步批量上报、Gin 中间件。
- `internal/models/chat/langfuse_wrapper.go` — Chat 调用装饰器(含流式)。
- `internal/models/embedding/langfuse_wrapper.go` — Embedding 调用装饰器。
- `internal/models/rerank/langfuse_wrapper.go` — Rerank 调用装饰器。
- `internal/models/vlm/langfuse_wrapper.go` — VLM视觉语言模型调用装饰器。
- `internal/models/asr/langfuse_wrapper.go` — ASR语音识别调用装饰器。
- `internal/container/container.go` — 初始化 + 资源清理。
- `internal/router/router.go` — 自动为 LLM 相关路由创建 trace。
- `docker-compose.yml` / `.env.example` / `.env.lite.example` — 预置 `LANGFUSE_*` 环境变量直通。

View File

@@ -25,21 +25,21 @@ func generateEventID(suffix string) string {
// sessionService implements the SessionService interface for managing conversation sessions
type sessionService struct {
cfg *config.Config // Application configuration
sessionRepo interfaces.SessionRepository // Repository for session data
messageRepo interfaces.MessageRepository // Repository for message data
knowledgeBaseService interfaces.KnowledgeBaseService // Service for knowledge base operations
modelService interfaces.ModelService // Service for model operations
tenantService interfaces.TenantService // Service for tenant operations
eventManager *chatpipeline.EventManager // Event manager for chat pipeline
agentService interfaces.AgentService // Service for agent operations
sessionStorage llmcontext.ContextStorage // Session storage
knowledgeService interfaces.KnowledgeService // Service for knowledge operations
chunkService interfaces.ChunkService // Service for chunk operations
webSearchStateRepo interfaces.WebSearchStateService // Service for web search state
webSearchProviderRepo interfaces.WebSearchProviderRepository // Repository for web search provider entities
kbShareService interfaces.KBShareService // Service for KB sharing operations
memoryService interfaces.MemoryService // Service for memory operations
cfg *config.Config // Application configuration
sessionRepo interfaces.SessionRepository // Repository for session data
messageRepo interfaces.MessageRepository // Repository for message data
knowledgeBaseService interfaces.KnowledgeBaseService // Service for knowledge base operations
modelService interfaces.ModelService // Service for model operations
tenantService interfaces.TenantService // Service for tenant operations
eventManager *chatpipeline.EventManager // Event manager for chat pipeline
agentService interfaces.AgentService // Service for agent operations
sessionStorage llmcontext.ContextStorage // Session storage
knowledgeService interfaces.KnowledgeService // Service for knowledge operations
chunkService interfaces.ChunkService // Service for chunk operations
webSearchStateRepo interfaces.WebSearchStateService // Service for web search state
webSearchProviderRepo interfaces.WebSearchProviderRepository // Repository for web search provider entities
kbShareService interfaces.KBShareService // Service for KB sharing operations
memoryService interfaces.MemoryService // Service for memory operations
}
// NewSessionService creates a new session service instance with all required dependencies
@@ -458,6 +458,9 @@ func (s *sessionService) GenerateTitleAsync(
tenantID := ctx.Value(types.TenantIDContextKey)
requestID := ctx.Value(types.RequestIDContextKey)
language := ctx.Value(types.LanguageContextKey)
// Keep the Langfuse trace handle so the async title generation shows up
// as a child of the same trace as the originating chat request.
langfuseTrace := ctx.Value(types.LangfuseTraceContextKey)
go func() {
bgCtx := context.Background()
if tenantID != nil {
@@ -469,6 +472,9 @@ func (s *sessionService) GenerateTitleAsync(
if language != nil {
bgCtx = context.WithValue(bgCtx, types.LanguageContextKey, language)
}
if langfuseTrace != nil {
bgCtx = context.WithValue(bgCtx, types.LangfuseTraceContextKey, langfuseTrace)
}
// Skip if title already exists
if session.Title != "" {

View File

@@ -73,6 +73,7 @@ import (
"github.com/Tencent/WeKnora/internal/router"
"github.com/Tencent/WeKnora/internal/stream"
"github.com/Tencent/WeKnora/internal/tracing"
"github.com/Tencent/WeKnora/internal/tracing/langfuse"
"github.com/Tencent/WeKnora/internal/types"
"github.com/Tencent/WeKnora/internal/types/interfaces"
slackpkg "github.com/slack-go/slack"
@@ -100,6 +101,7 @@ func BuildContainer(container *dig.Container) *dig.Container {
logger.Debugf(ctx, "[Container] Registering core infrastructure...")
must(container.Provide(config.LoadConfig))
must(container.Provide(initTracer))
must(container.Provide(initLangfuse))
must(container.Provide(initDatabase))
must(container.Provide(initFileService))
must(container.Provide(initRedisClient))
@@ -108,6 +110,7 @@ func BuildContainer(container *dig.Container) *dig.Container {
// Register tracer cleanup handler (tracer needs to be available for cleanup registration)
must(container.Invoke(registerTracerCleanup))
must(container.Invoke(registerLangfuseCleanup))
// Register goroutine pool cleanup handler
must(container.Invoke(registerPoolCleanup))
@@ -329,6 +332,15 @@ func initTracer() (*tracing.Tracer, error) {
return tracing.InitTracer()
}
// initLangfuse initializes the Langfuse ingestion client.
// Configuration is read from LANGFUSE_* environment variables (see
// docs/langfuse.md). Returns a disabled manager if credentials are absent —
// never an error — so deployments that don't use Langfuse are unaffected.
func initLangfuse() (*langfuse.Manager, error) {
cfg := langfuse.LoadConfigFromEnv()
return langfuse.Init(cfg)
}
func initRedisClient() (*redis.Client, error) {
redisAddr := os.Getenv("REDIS_ADDR")
if redisAddr == "" {
@@ -1024,6 +1036,20 @@ func registerTracerCleanup(tracer *tracing.Tracer, cleaner interfaces.ResourceCl
})
}
// registerLangfuseCleanup ensures buffered Langfuse events are flushed on
// shutdown. A 5-second timeout matches other external-service cleanups and
// balances data durability against a slow remote endpoint holding up exit.
func registerLangfuseCleanup(mgr *langfuse.Manager, cleaner interfaces.ResourceCleaner) {
if mgr == nil {
return
}
cleaner.RegisterWithName("Langfuse", func() error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return mgr.Shutdown(ctx)
})
}
// initDocReaderClient initializes the DocumentReader client (lightweight API).
func initDocReaderClient(cfg *config.Config) (interfaces.DocumentReader, error) {
addr := strings.TrimSpace(os.Getenv("DOCREADER_ADDR"))

View File

@@ -398,6 +398,11 @@ func CloneContext(ctx context.Context) context.Context {
types.LanguageContextKey,
types.SessionTenantIDContextKey,
types.EmbedQueryContextKey,
// Keep the Langfuse trace alive across CloneContext boundaries so
// LLM/Embedder/Reranker/VLM/ASR wrappers attach their generations
// to the same trace opened by GinMiddleware, instead of each call
// auto-creating its own orphan trace.
types.LangfuseTraceContextKey,
} {
if v := ctx.Value(k); v != nil {
newCtx = context.WithValue(newCtx, k, v)

View File

@@ -60,5 +60,6 @@ func ConfigFromModel(m *types.Model) *Config {
// NewASR creates an ASR instance based on the provided configuration.
// All ASR vendors use the OpenAI-compatible /v1/audio/transcriptions API.
func NewASR(config *Config) (ASR, error) {
return NewOpenAIASR(config)
a, err := NewOpenAIASR(config)
return wrapASRLangfuse(a, err)
}

View File

@@ -0,0 +1,79 @@
package asr
import (
"context"
"github.com/Tencent/WeKnora/internal/tracing/langfuse"
)
// langfuseASR wraps an ASR implementation and reports each Transcribe call
// as a Langfuse generation observation. Audio bytes are not uploaded — we
// record file name, audio size and duration (derived from segment end times)
// so usage can be billed by second/minute, which is how most ASR providers
// price their services.
type langfuseASR struct {
inner ASR
}
func (l *langfuseASR) GetModelName() string { return l.inner.GetModelName() }
func (l *langfuseASR) GetModelID() string { return l.inner.GetModelID() }
func (l *langfuseASR) Transcribe(ctx context.Context, audioBytes []byte, fileName string) (*TranscriptionResult, error) {
mgr := langfuse.GetManager()
if !mgr.Enabled() {
return l.inner.Transcribe(ctx, audioBytes, fileName)
}
genCtx, gen := mgr.StartGeneration(ctx, langfuse.GenerationOptions{
Name: "asr.transcribe",
Model: l.inner.GetModelName(),
Input: map[string]interface{}{
"file_name": fileName,
"audio_size": len(audioBytes),
},
Metadata: map[string]interface{}{
"model_id": l.inner.GetModelID(),
"audio_size": len(audioBytes),
},
})
result, err := l.inner.Transcribe(genCtx, audioBytes, fileName)
output := map[string]interface{}{}
var duration float64
if result != nil {
output["text"] = result.Text
output["segment_count"] = len(result.Segments)
if n := len(result.Segments); n > 0 {
duration = result.Segments[n-1].End
output["duration_seconds"] = duration
}
}
// ASR is billed per second; we emit the audio duration (when available)
// in the "Output" side of usage so users can configure per-model pricing
// in Langfuse (e.g. $0.006 per minute for Whisper-1).
var usage *langfuse.TokenUsage
if duration > 0 {
seconds := int(duration + 0.5)
usage = &langfuse.TokenUsage{
Output: seconds,
Total: seconds,
Unit: "SECONDS",
}
}
gen.Finish(output, usage, err)
return result, err
}
// wrapASRLangfuse applies the Langfuse decorator when the manager is enabled.
func wrapASRLangfuse(a ASR, err error) (ASR, error) {
if err != nil || a == nil {
return a, err
}
if !langfuse.GetManager().Enabled() {
return a, nil
}
return &langfuseASR{inner: a}, nil
}

View File

@@ -141,7 +141,8 @@ func NewChat(config *ChatConfig, ollamaService *ollama.OllamaService) (Chat, err
default:
return nil, fmt.Errorf("unsupported chat model source: %s", config.Source)
}
return wrapChatDebug(c, err)
c, err = wrapChatDebug(c, err)
return wrapChatLangfuse(c, err)
}
// NewRemoteChat 根据 provider 创建远程聊天实例

View File

@@ -0,0 +1,210 @@
package chat
import (
"context"
"time"
"github.com/Tencent/WeKnora/internal/tracing/langfuse"
"github.com/Tencent/WeKnora/internal/types"
)
// langfuseChat wraps a Chat implementation and emits a Langfuse generation
// observation for every Chat/ChatStream call, capturing prompt, response and
// token usage. The wrapper is only installed when the Langfuse manager is
// enabled, so there is no cost for deployments that don't use Langfuse.
type langfuseChat struct {
inner Chat
}
func (l *langfuseChat) GetModelName() string { return l.inner.GetModelName() }
func (l *langfuseChat) GetModelID() string { return l.inner.GetModelID() }
func (l *langfuseChat) Chat(ctx context.Context, messages []Message, opts *ChatOptions) (*types.ChatResponse, error) {
mgr := langfuse.GetManager()
if !mgr.Enabled() {
return l.inner.Chat(ctx, messages, opts)
}
genCtx, gen := mgr.StartGeneration(ctx, langfuse.GenerationOptions{
Name: "chat.completion",
Model: l.inner.GetModelName(),
Input: buildLangfuseMessages(messages),
ModelParameters: buildLangfuseModelParams(opts),
Metadata: map[string]interface{}{
"model_id": l.inner.GetModelID(),
"streaming": false,
"has_tools": opts != nil && len(opts.Tools) > 0,
},
})
resp, err := l.inner.Chat(genCtx, messages, opts)
var usage *langfuse.TokenUsage
var output interface{}
if resp != nil {
usage = convertUsage(&resp.Usage)
output = map[string]interface{}{
"content": resp.Content,
"tool_calls": resp.ToolCalls,
"finish_reason": resp.FinishReason,
}
}
gen.Finish(output, usage, err)
return resp, err
}
func (l *langfuseChat) ChatStream(ctx context.Context, messages []Message, opts *ChatOptions) (<-chan types.StreamResponse, error) {
mgr := langfuse.GetManager()
if !mgr.Enabled() {
return l.inner.ChatStream(ctx, messages, opts)
}
genCtx, gen := mgr.StartGeneration(ctx, langfuse.GenerationOptions{
Name: "chat.completion.stream",
Model: l.inner.GetModelName(),
Input: buildLangfuseMessages(messages),
ModelParameters: buildLangfuseModelParams(opts),
Metadata: map[string]interface{}{
"model_id": l.inner.GetModelID(),
"streaming": true,
"has_tools": opts != nil && len(opts.Tools) > 0,
},
})
ch, err := l.inner.ChatStream(genCtx, messages, opts)
if err != nil {
gen.Finish(nil, nil, err)
return ch, err
}
if ch == nil {
gen.Finish(nil, nil, nil)
return nil, nil
}
wrapped := make(chan types.StreamResponse)
go func() {
defer close(wrapped)
var contentBuf []byte
var usage *types.TokenUsage
var toolCalls []types.LLMToolCall
var finishReason string
var firstToken bool
for resp := range ch {
if resp.ResponseType == types.ResponseTypeAnswer && resp.Content != "" {
if !firstToken {
gen.MarkCompletionStart(time.Now())
firstToken = true
}
contentBuf = append(contentBuf, resp.Content...)
}
if resp.Usage != nil {
usage = resp.Usage
}
if len(resp.ToolCalls) > 0 {
toolCalls = resp.ToolCalls
}
if resp.FinishReason != "" {
finishReason = resp.FinishReason
}
wrapped <- resp
}
output := map[string]interface{}{
"content": string(contentBuf),
"tool_calls": toolCalls,
"finish_reason": finishReason,
}
gen.Finish(output, convertUsage(usage), nil)
}()
return wrapped, nil
}
func buildLangfuseMessages(messages []Message) []map[string]interface{} {
out := make([]map[string]interface{}, 0, len(messages))
for _, m := range messages {
entry := map[string]interface{}{
"role": m.Role,
}
if m.Content != "" {
entry["content"] = m.Content
}
if len(m.MultiContent) > 0 {
entry["content"] = m.MultiContent
}
if m.Name != "" {
entry["name"] = m.Name
}
if m.ToolCallID != "" {
entry["tool_call_id"] = m.ToolCallID
}
if len(m.ToolCalls) > 0 {
entry["tool_calls"] = m.ToolCalls
}
out = append(out, entry)
}
return out
}
func buildLangfuseModelParams(opts *ChatOptions) map[string]interface{} {
if opts == nil {
return nil
}
params := map[string]interface{}{}
if opts.Temperature != 0 {
params["temperature"] = opts.Temperature
}
if opts.TopP != 0 {
params["top_p"] = opts.TopP
}
if opts.MaxTokens > 0 {
params["max_tokens"] = opts.MaxTokens
}
if opts.MaxCompletionTokens > 0 {
params["max_completion_tokens"] = opts.MaxCompletionTokens
}
if opts.FrequencyPenalty != 0 {
params["frequency_penalty"] = opts.FrequencyPenalty
}
if opts.PresencePenalty != 0 {
params["presence_penalty"] = opts.PresencePenalty
}
if opts.Seed != 0 {
params["seed"] = opts.Seed
}
if opts.ToolChoice != "" {
params["tool_choice"] = opts.ToolChoice
}
if len(params) == 0 {
return nil
}
return params
}
func convertUsage(u *types.TokenUsage) *langfuse.TokenUsage {
if u == nil {
return nil
}
if u.PromptTokens == 0 && u.CompletionTokens == 0 && u.TotalTokens == 0 {
return nil
}
return &langfuse.TokenUsage{
Input: u.PromptTokens,
Output: u.CompletionTokens,
Total: u.TotalTokens,
Unit: "TOKENS",
}
}
// wrapChatLangfuse wraps a Chat in a Langfuse-aware decorator when the
// manager is enabled. Called from NewChat after the debug wrapper so both
// sinks observe the same call.
func wrapChatLangfuse(c Chat, err error) (Chat, error) {
if err != nil || c == nil {
return c, err
}
if !langfuse.GetManager().Enabled() {
return c, nil
}
return &langfuseChat{inner: c}, nil
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/Tencent/WeKnora/internal/logger"
"github.com/Tencent/WeKnora/internal/models/provider"
"github.com/Tencent/WeKnora/internal/models/utils/ollama"
"github.com/Tencent/WeKnora/internal/tracing/langfuse"
"github.com/Tencent/WeKnora/internal/types"
)
@@ -81,10 +82,16 @@ func ConfigFromModel(m *types.Model, appID, appSecret string) Config {
// NewEmbedder creates an embedder based on the configuration
func NewEmbedder(config Config, pooler EmbedderPooler, ollamaService *ollama.OllamaService) (Embedder, error) {
e, err := newEmbedder(config, pooler, ollamaService)
if err != nil || !logger.LLMDebugEnabled() {
if err != nil {
return e, err
}
return &debugEmbedder{inner: e}, nil
if logger.LLMDebugEnabled() {
e = &debugEmbedder{inner: e}
}
if langfuse.GetManager().Enabled() {
e = &langfuseEmbedder{inner: e}
}
return e, nil
}
func newEmbedder(config Config, pooler EmbedderPooler, ollamaService *ollama.OllamaService) (Embedder, error) {

View File

@@ -0,0 +1,136 @@
package embedding
import (
"context"
"github.com/Tencent/WeKnora/internal/tracing/langfuse"
)
// langfuseEmbedder wraps an Embedder and reports each call as a Langfuse
// generation observation. Input token counts are approximated from the text
// lengths when the underlying provider doesn't return usage data, because
// Langfuse's cost reports require non-zero input tokens.
type langfuseEmbedder struct {
inner Embedder
}
func (l *langfuseEmbedder) Embed(ctx context.Context, text string) ([]float32, error) {
mgr := langfuse.GetManager()
if !mgr.Enabled() {
return l.inner.Embed(ctx, text)
}
genCtx, gen := mgr.StartGeneration(ctx, langfuse.GenerationOptions{
Name: "embedding.embed",
Model: l.inner.GetModelName(),
Input: text,
Metadata: map[string]interface{}{
"model_id": l.inner.GetModelID(),
"dimensions": l.inner.GetDimensions(),
},
})
result, err := l.inner.Embed(genCtx, text)
usage := approxEmbeddingUsage([]string{text})
var out interface{}
if len(result) > 0 {
out = map[string]interface{}{
"dimensions": len(result),
"vector_preview": result[:min(3, len(result))],
}
}
gen.Finish(out, usage, err)
return result, err
}
func (l *langfuseEmbedder) BatchEmbed(ctx context.Context, texts []string) ([][]float32, error) {
mgr := langfuse.GetManager()
if !mgr.Enabled() {
return l.inner.BatchEmbed(ctx, texts)
}
genCtx, gen := mgr.StartGeneration(ctx, langfuse.GenerationOptions{
Name: "embedding.batch_embed",
Model: l.inner.GetModelName(),
Input: map[string]interface{}{
"count": len(texts),
// Avoid sending megabytes of full text — Langfuse truncates but
// the network cost is still real. Keep a short preview instead.
"preview": previewTexts(texts, 5),
},
Metadata: map[string]interface{}{
"model_id": l.inner.GetModelID(),
"dimensions": l.inner.GetDimensions(),
"batch_size": len(texts),
},
})
result, err := l.inner.BatchEmbed(genCtx, texts)
usage := approxEmbeddingUsage(texts)
var out interface{}
if len(result) > 0 {
out = map[string]interface{}{
"count": len(result),
"dimensions": len(result[0]),
}
}
gen.Finish(out, usage, err)
return result, err
}
func (l *langfuseEmbedder) BatchEmbedWithPool(ctx context.Context, model Embedder, texts []string) ([][]float32, error) {
return l.inner.BatchEmbedWithPool(ctx, l, texts)
}
func (l *langfuseEmbedder) GetModelName() string { return l.inner.GetModelName() }
func (l *langfuseEmbedder) GetDimensions() int { return l.inner.GetDimensions() }
func (l *langfuseEmbedder) GetModelID() string { return l.inner.GetModelID() }
// approxEmbeddingUsage estimates input tokens as ~rune_count / 4, matching the
// rule of thumb OpenAI uses in their tokenizer docs. This is purely for cost
// reporting — Langfuse lets users define per-model cost multipliers, so the
// approximation need only be proportional to length.
func approxEmbeddingUsage(texts []string) *langfuse.TokenUsage {
total := 0
for _, t := range texts {
runes := len([]rune(t))
if runes == 0 {
continue
}
total += runes/4 + 1
}
if total == 0 {
return nil
}
return &langfuse.TokenUsage{
Input: total,
Total: total,
Unit: "TOKENS",
}
}
func previewTexts(texts []string, n int) []string {
if len(texts) <= n {
out := make([]string, len(texts))
for i, t := range texts {
out[i] = truncateRunes(t, 120)
}
return out
}
out := make([]string, n)
for i := 0; i < n; i++ {
out[i] = truncateRunes(texts[i], 120)
}
return out
}
func truncateRunes(s string, maxRunes int) string {
r := []rune(s)
if len(r) <= maxRunes {
return s
}
return string(r[:maxRunes]) + "..."
}
func min(a, b int) int {
if a < b {
return a
}
return b
}

View File

@@ -0,0 +1,116 @@
package rerank
import (
"context"
"github.com/Tencent/WeKnora/internal/tracing/langfuse"
)
// langfuseReranker wraps a Reranker and reports each rerank call as a
// Langfuse generation observation. Rerankers don't return token usage, but
// the call still incurs cost (billed per 1K documents by most vendors); we
// estimate input tokens from the query + documents so the Langfuse cost
// dashboard gets a proportional signal.
type langfuseReranker struct {
inner Reranker
}
func (l *langfuseReranker) GetModelName() string { return l.inner.GetModelName() }
func (l *langfuseReranker) GetModelID() string { return l.inner.GetModelID() }
func (l *langfuseReranker) Rerank(ctx context.Context, query string, documents []string) ([]RankResult, error) {
mgr := langfuse.GetManager()
if !mgr.Enabled() {
return l.inner.Rerank(ctx, query, documents)
}
genCtx, gen := mgr.StartGeneration(ctx, langfuse.GenerationOptions{
Name: "rerank",
Model: l.inner.GetModelName(),
Input: map[string]interface{}{
"query": query,
"document_count": len(documents),
// Only send short previews — reranker inputs can be hundreds of
// passages totalling hundreds of KB, which would bloat traces.
"documents_preview": previewDocs(documents, 5),
},
Metadata: map[string]interface{}{
"model_id": l.inner.GetModelID(),
"num_queries": 1,
},
})
results, err := l.inner.Rerank(genCtx, query, documents)
output := map[string]interface{}{
"results": summarizeResults(results, 10),
"total_count": len(results),
}
gen.Finish(output, approxRerankUsage(query, documents), err)
return results, err
}
func approxRerankUsage(query string, documents []string) *langfuse.TokenUsage {
total := len([]rune(query))/4 + 1
for _, d := range documents {
total += len([]rune(d))/4 + 1
}
if total == 0 {
return nil
}
return &langfuse.TokenUsage{
Input: total,
Total: total,
Unit: "TOKENS",
}
}
func previewDocs(docs []string, n int) []map[string]interface{} {
if len(docs) < n {
n = len(docs)
}
out := make([]map[string]interface{}, 0, n)
for i := 0; i < n; i++ {
out = append(out, map[string]interface{}{
"index": i,
"preview": truncateRunes(docs[i], 160),
"length": len([]rune(docs[i])),
})
}
return out
}
func summarizeResults(results []RankResult, n int) []map[string]interface{} {
if len(results) < n {
n = len(results)
}
out := make([]map[string]interface{}, 0, n)
for i := 0; i < n; i++ {
out = append(out, map[string]interface{}{
"index": results[i].Index,
"score": results[i].RelevanceScore,
})
}
return out
}
func truncateRunes(s string, maxRunes int) string {
r := []rune(s)
if len(r) <= maxRunes {
return s
}
return string(r[:maxRunes]) + "..."
}
// wrapRerankerLangfuse applies the Langfuse decorator when the manager is
// enabled. Called from NewReranker after the debug wrapper so both sinks see
// the same calls.
func wrapRerankerLangfuse(r Reranker, err error) (Reranker, error) {
if err != nil || r == nil {
return r, err
}
if !langfuse.GetManager().Enabled() {
return r, nil
}
return &langfuseReranker{inner: r}, nil
}

View File

@@ -116,10 +116,13 @@ func ConfigFromModel(m *types.Model, appID, appSecret string) *RerankerConfig {
// NewReranker creates a reranker based on the configuration
func NewReranker(config *RerankerConfig) (Reranker, error) {
r, err := newReranker(config)
if err != nil || !logger.LLMDebugEnabled() {
if err != nil {
return r, err
}
return &debugReranker{inner: r}, nil
if logger.LLMDebugEnabled() {
r = &debugReranker{inner: r}
}
return wrapRerankerLangfuse(r, nil)
}
// customHeaderSetter 表示支持注入自定义 HTTP header 的 reranker 实现。

View File

@@ -0,0 +1,72 @@
package vlm
import (
"context"
"github.com/Tencent/WeKnora/internal/tracing/langfuse"
)
// langfuseVLM wraps a VLM and reports each Predict call as a Langfuse
// generation. The raw image bytes are NOT uploaded — Langfuse traces are
// designed for text. We include image count and total byte size in the
// metadata, plus the text prompt, which matches how Langfuse's own VLM
// integrations report multimodal calls.
type langfuseVLM struct {
inner VLM
}
func (l *langfuseVLM) GetModelName() string { return l.inner.GetModelName() }
func (l *langfuseVLM) GetModelID() string { return l.inner.GetModelID() }
func (l *langfuseVLM) Predict(ctx context.Context, imgBytes [][]byte, prompt string) (string, error) {
mgr := langfuse.GetManager()
if !mgr.Enabled() {
return l.inner.Predict(ctx, imgBytes, prompt)
}
totalImgSize := 0
for _, b := range imgBytes {
totalImgSize += len(b)
}
genCtx, gen := mgr.StartGeneration(ctx, langfuse.GenerationOptions{
Name: "vlm.predict",
Model: l.inner.GetModelName(),
Input: map[string]interface{}{
"prompt": prompt,
"image_count": len(imgBytes),
},
Metadata: map[string]interface{}{
"model_id": l.inner.GetModelID(),
"image_count": len(imgBytes),
"image_bytes_total": totalImgSize,
},
})
result, err := l.inner.Predict(genCtx, imgBytes, prompt)
// VLMs don't return token usage; approximate prompt tokens for cost
// tracking in Langfuse (users can configure per-model pricing in the UI).
promptTokens := len([]rune(prompt))/4 + 1
outputTokens := len([]rune(result)) / 4
usage := &langfuse.TokenUsage{
Input: promptTokens,
Output: outputTokens,
Total: promptTokens + outputTokens,
Unit: "TOKENS",
}
gen.Finish(result, usage, err)
return result, err
}
// wrapVLMLangfuse applies the Langfuse decorator when the manager is enabled.
func wrapVLMLangfuse(v VLM, err error) (VLM, error) {
if err != nil || v == nil {
return v, err
}
if !langfuse.GetManager().Enabled() {
return v, nil
}
return &langfuseVLM{inner: v}, nil
}

View File

@@ -81,10 +81,13 @@ func stringMapToAnyMap(in map[string]string) map[string]any {
// NewVLM creates a VLM instance based on the provided configuration.
func NewVLM(config *Config, ollamaService *ollama.OllamaService) (VLM, error) {
v, err := newVLM(config, ollamaService)
if err != nil || !logger.LLMDebugEnabled() {
if err != nil {
return v, err
}
return &debugVLM{inner: v}, nil
if logger.LLMDebugEnabled() {
v = &debugVLM{inner: v}
}
return wrapVLMLangfuse(v, nil)
}
func newVLM(config *Config, ollamaService *ollama.OllamaService) (VLM, error) {

View File

@@ -21,6 +21,7 @@ import (
"github.com/Tencent/WeKnora/internal/handler/session"
"github.com/Tencent/WeKnora/internal/logger"
"github.com/Tencent/WeKnora/internal/middleware"
"github.com/Tencent/WeKnora/internal/tracing/langfuse"
"github.com/Tencent/WeKnora/internal/types"
"github.com/Tencent/WeKnora/internal/types/interfaces"
@@ -122,6 +123,10 @@ func NewRouter(params RouterParams) *gin.Engine {
// 添加OpenTelemetry追踪中间件
// r.Use(middleware.TracingMiddleware())
// Langfuse observability — only active when LANGFUSE_* env vars are set.
// The middleware is registered unconditionally; when disabled it's a no-op.
r.Use(langfuse.GinMiddleware())
// 需要认证的API路由
v1 := r.Group("/api/v1")
{

View File

@@ -0,0 +1,85 @@
package langfuse
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
)
// client is a thin HTTP client for the Langfuse ingestion API.
type client struct {
host string
auth string // pre-computed "Basic <base64>"
httpClient *http.Client
debug bool
}
func newClient(cfg Config) *client {
credentials := cfg.PublicKey + ":" + cfg.SecretKey
return &client{
host: strings.TrimRight(cfg.Host, "/"),
auth: "Basic " + base64.StdEncoding.EncodeToString([]byte(credentials)),
httpClient: &http.Client{
Timeout: cfg.RequestTimeout,
},
debug: cfg.Debug,
}
}
type ingestionRequest struct {
Batch []ingestionEvent `json:"batch"`
}
// ingest posts a batch of events to Langfuse. The API accepts partial failure:
// individual event errors are returned in a 207 response, which we surface as
// a logged debug message rather than a hard error (the batch as a whole was
// accepted).
func (c *client) ingest(ctx context.Context, events []ingestionEvent) error {
if len(events) == 0 {
return nil
}
body, err := json.Marshal(ingestionRequest{Batch: events})
if err != nil {
return fmt.Errorf("langfuse: marshal batch: %w", err)
}
endpoint := c.host + "/api/public/ingestion"
req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
if err != nil {
return fmt.Errorf("langfuse: build request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", c.auth)
req.Header.Set("User-Agent", "weknora-langfuse/1.0")
resp, err := c.httpClient.Do(req)
if err != nil {
return fmt.Errorf("langfuse: ingest request failed: %w", err)
}
defer resp.Body.Close()
respBody, _ := io.ReadAll(resp.Body)
// 207 = "multi-status": batch accepted, some events may have failed.
// 2xx = success. Anything else is a transport/auth error worth surfacing.
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
if c.debug && resp.StatusCode == http.StatusMultiStatus {
return fmt.Errorf("langfuse: partial ingest success (207): %s", truncate(string(respBody), 512))
}
return nil
}
return fmt.Errorf("langfuse: ingest failed with status %d: %s", resp.StatusCode, truncate(string(respBody), 512))
}
func truncate(s string, max int) string {
if len(s) <= max {
return s
}
return s[:max] + "..."
}

View File

@@ -0,0 +1,140 @@
// Package langfuse implements a lightweight client for the Langfuse ingestion
// API (https://langfuse.com/docs/api). It lets WeKnora record LLM traces,
// generations and token usage in Langfuse without pulling in a heavy SDK.
//
// The integration is fully opt-in: when disabled (the default), all public
// entry points are cheap no-ops, so callers can wire them unconditionally.
package langfuse
import (
"fmt"
"os"
"strconv"
"strings"
"time"
)
// Config holds the runtime configuration for the Langfuse client.
//
// In practice users enable Langfuse purely through environment variables —
// Host / PublicKey / SecretKey — which matches every other Langfuse SDK and
// keeps WeKnora's YAML config free of secrets.
type Config struct {
// Enabled is the master switch. If false the entire package is a no-op.
Enabled bool
// Host is the Langfuse base URL, e.g. https://cloud.langfuse.com or
// https://us.cloud.langfuse.com or a self-hosted address.
Host string
// PublicKey / SecretKey are the project credentials used for Basic Auth.
PublicKey string
SecretKey string
// FlushAt flushes the queued events once the buffer reaches this size.
FlushAt int
// FlushInterval is the maximum time between automatic flushes.
FlushInterval time.Duration
// QueueSize bounds the in-memory buffer to avoid unbounded growth if the
// Langfuse endpoint is unreachable.
QueueSize int
// RequestTimeout is the HTTP timeout for a single ingestion batch.
RequestTimeout time.Duration
// Release / Environment are attached to every trace for filtering in the
// Langfuse UI (e.g. release="v0.4.2", environment="production").
Release string
Environment string
// SampleRate (0..1) controls trace sampling. 0 means "use 1.0".
SampleRate float64
// Debug enables verbose logging of batch send errors.
Debug bool
}
// LoadConfigFromEnv builds a Config by reading the LANGFUSE_* environment
// variables, mirroring the official Python / JS SDK conventions.
func LoadConfigFromEnv() Config {
cfg := Config{
Host: firstNonEmpty(os.Getenv("LANGFUSE_HOST"), "https://cloud.langfuse.com"),
PublicKey: strings.TrimSpace(os.Getenv("LANGFUSE_PUBLIC_KEY")),
SecretKey: strings.TrimSpace(os.Getenv("LANGFUSE_SECRET_KEY")),
Release: strings.TrimSpace(os.Getenv("LANGFUSE_RELEASE")),
Environment: strings.TrimSpace(os.Getenv("LANGFUSE_ENVIRONMENT")),
FlushAt: 15,
FlushInterval: 3 * time.Second,
QueueSize: 2048,
RequestTimeout: 10 * time.Second,
SampleRate: 1.0,
}
if v := strings.TrimSpace(os.Getenv("LANGFUSE_ENABLED")); v != "" {
cfg.Enabled = parseBool(v)
} else if cfg.PublicKey != "" && cfg.SecretKey != "" {
// Auto-enable when credentials are present — matches the Python SDK.
cfg.Enabled = true
}
if v := strings.TrimSpace(os.Getenv("LANGFUSE_FLUSH_AT")); v != "" {
if n, err := strconv.Atoi(v); err == nil && n > 0 {
cfg.FlushAt = n
}
}
if v := strings.TrimSpace(os.Getenv("LANGFUSE_FLUSH_INTERVAL")); v != "" {
if d, err := time.ParseDuration(v); err == nil && d > 0 {
cfg.FlushInterval = d
} else if n, err := strconv.Atoi(v); err == nil && n > 0 {
cfg.FlushInterval = time.Duration(n) * time.Second
}
}
if v := strings.TrimSpace(os.Getenv("LANGFUSE_QUEUE_SIZE")); v != "" {
if n, err := strconv.Atoi(v); err == nil && n > 0 {
cfg.QueueSize = n
}
}
if v := strings.TrimSpace(os.Getenv("LANGFUSE_REQUEST_TIMEOUT")); v != "" {
if d, err := time.ParseDuration(v); err == nil && d > 0 {
cfg.RequestTimeout = d
}
}
if v := strings.TrimSpace(os.Getenv("LANGFUSE_SAMPLE_RATE")); v != "" {
if f, err := strconv.ParseFloat(v, 64); err == nil && f >= 0 && f <= 1 {
cfg.SampleRate = f
}
}
if v := strings.TrimSpace(os.Getenv("LANGFUSE_DEBUG")); v != "" {
cfg.Debug = parseBool(v)
}
if cfg.SampleRate == 0 {
cfg.SampleRate = 1.0
}
return cfg
}
// Validate verifies required fields are present when Langfuse is enabled.
func (c Config) Validate() error {
if !c.Enabled {
return nil
}
if strings.TrimSpace(c.Host) == "" {
return fmt.Errorf("langfuse: host is required when enabled")
}
if c.PublicKey == "" || c.SecretKey == "" {
return fmt.Errorf("langfuse: public_key and secret_key are required when enabled")
}
return nil
}
func firstNonEmpty(values ...string) string {
for _, v := range values {
if s := strings.TrimSpace(v); s != "" {
return s
}
}
return ""
}
func parseBool(v string) bool {
switch strings.ToLower(strings.TrimSpace(v)) {
case "1", "true", "t", "yes", "y", "on":
return true
}
return false
}

View File

@@ -0,0 +1,84 @@
package langfuse
import (
"testing"
"time"
)
func TestLoadConfigFromEnv_AutoEnablesWithCredentials(t *testing.T) {
t.Setenv("LANGFUSE_PUBLIC_KEY", "pk-test")
t.Setenv("LANGFUSE_SECRET_KEY", "sk-test")
t.Setenv("LANGFUSE_HOST", "https://example.langfuse.com")
cfg := LoadConfigFromEnv()
if !cfg.Enabled {
t.Fatalf("expected Enabled=true when both keys are set, got false")
}
if cfg.Host != "https://example.langfuse.com" {
t.Errorf("unexpected host: %q", cfg.Host)
}
if cfg.SampleRate != 1.0 {
t.Errorf("expected default SampleRate=1.0, got %v", cfg.SampleRate)
}
}
func TestLoadConfigFromEnv_DisabledWithoutKeys(t *testing.T) {
t.Setenv("LANGFUSE_PUBLIC_KEY", "")
t.Setenv("LANGFUSE_SECRET_KEY", "")
t.Setenv("LANGFUSE_ENABLED", "")
cfg := LoadConfigFromEnv()
if cfg.Enabled {
t.Fatalf("expected Enabled=false when no keys set")
}
}
func TestLoadConfigFromEnv_ExplicitDisableOverridesKeys(t *testing.T) {
t.Setenv("LANGFUSE_PUBLIC_KEY", "pk")
t.Setenv("LANGFUSE_SECRET_KEY", "sk")
t.Setenv("LANGFUSE_ENABLED", "false")
cfg := LoadConfigFromEnv()
if cfg.Enabled {
t.Fatalf("expected Enabled=false when LANGFUSE_ENABLED=false")
}
}
func TestLoadConfigFromEnv_FlushIntervalAcceptsSecondsAndDuration(t *testing.T) {
t.Setenv("LANGFUSE_PUBLIC_KEY", "pk")
t.Setenv("LANGFUSE_SECRET_KEY", "sk")
t.Setenv("LANGFUSE_FLUSH_INTERVAL", "500ms")
cfg := LoadConfigFromEnv()
if cfg.FlushInterval != 500*time.Millisecond {
t.Errorf("expected 500ms, got %v", cfg.FlushInterval)
}
t.Setenv("LANGFUSE_FLUSH_INTERVAL", "7")
cfg = LoadConfigFromEnv()
if cfg.FlushInterval != 7*time.Second {
t.Errorf("expected 7s (bare integer), got %v", cfg.FlushInterval)
}
}
func TestConfigValidate(t *testing.T) {
cases := []struct {
name string
cfg Config
wantErr bool
}{
{"disabled is always valid", Config{Enabled: false}, false},
{"enabled without host fails", Config{Enabled: true, PublicKey: "pk", SecretKey: "sk"}, true},
{"enabled without keys fails", Config{Enabled: true, Host: "https://x"}, true},
{"enabled with all fields passes", Config{Enabled: true, Host: "https://x", PublicKey: "pk", SecretKey: "sk"}, false},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
err := tc.cfg.Validate()
if (err != nil) != tc.wantErr {
t.Fatalf("err=%v wantErr=%v", err, tc.wantErr)
}
})
}
}

View File

@@ -0,0 +1,40 @@
package langfuse
import (
"context"
"github.com/Tencent/WeKnora/internal/types"
)
// traceCtxKey is the exported context key defined in types/const.go. It lives
// there (not inside this package) so that logger.CloneContext — which rebuilds
// a stripped-down context on every request — can preserve the Langfuse trace
// without importing this package. If we kept the key private here, every
// CloneContext call would drop the trace and downstream LLM wrappers would
// each auto-create their own shallow trace, fragmenting a single HTTP request
// into many unrelated traces in the Langfuse UI.
var traceCtxKey = types.LangfuseTraceContextKey
// withTrace stores a *Trace on the context so downstream LLM wrappers can
// attach their generations to it.
func withTrace(ctx context.Context, t *Trace) context.Context {
if t == nil || ctx == nil {
return ctx
}
return context.WithValue(ctx, traceCtxKey, t)
}
// traceFromCtx retrieves the active trace, if any.
func traceFromCtx(ctx context.Context) (*Trace, bool) {
if ctx == nil {
return nil, false
}
t, ok := ctx.Value(traceCtxKey).(*Trace)
return t, ok && t != nil
}
// TraceFromContext is the public accessor used by HTTP middlewares and
// handlers that want to set the trace input/output on the active trace.
func TraceFromContext(ctx context.Context) (*Trace, bool) {
return traceFromCtx(ctx)
}

View File

@@ -0,0 +1,67 @@
package langfuse
import (
"time"
)
// Langfuse ingestion API event envelope.
// https://api.reference.langfuse.com/#tag/ingestion
type ingestionEvent struct {
ID string `json:"id"`
Timestamp string `json:"timestamp"`
Type string `json:"type"`
Body interface{} `json:"body"`
}
// TokenUsage captures the input/output/total token counts reported by the
// underlying model, in Langfuse's canonical schema.
type TokenUsage struct {
Input int `json:"input,omitempty"`
Output int `json:"output,omitempty"`
Total int `json:"total,omitempty"`
Unit string `json:"unit,omitempty"`
}
// traceBody mirrors the /api/public/ingestion trace-create body.
type traceBody struct {
ID string `json:"id"`
Timestamp string `json:"timestamp,omitempty"`
Name string `json:"name,omitempty"`
UserID string `json:"userId,omitempty"`
SessionID string `json:"sessionId,omitempty"`
Release string `json:"release,omitempty"`
Environment string `json:"environment,omitempty"`
Input interface{} `json:"input,omitempty"`
Output interface{} `json:"output,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
Tags []string `json:"tags,omitempty"`
Public bool `json:"public,omitempty"`
}
// observationBody is shared between span-create / generation-create /
// span-update / generation-update events (different fields are populated
// depending on the event type).
type observationBody struct {
ID string `json:"id,omitempty"`
TraceID string `json:"traceId,omitempty"`
ParentObservationID string `json:"parentObservationId,omitempty"`
Type string `json:"type,omitempty"` // SPAN, GENERATION, EVENT
Name string `json:"name,omitempty"`
StartTime string `json:"startTime,omitempty"`
EndTime string `json:"endTime,omitempty"`
Input interface{} `json:"input,omitempty"`
Output interface{} `json:"output,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
Level string `json:"level,omitempty"` // DEFAULT | ERROR | WARNING
StatusMessage string `json:"statusMessage,omitempty"` // free-form
// Generation-specific fields
Model string `json:"model,omitempty"`
ModelParameters map[string]interface{} `json:"modelParameters,omitempty"`
Usage *TokenUsage `json:"usage,omitempty"`
CompletionStart string `json:"completionStartTime,omitempty"`
}
func isoTime(t time.Time) string {
return t.UTC().Format("2006-01-02T15:04:05.000Z")
}

View File

@@ -0,0 +1,185 @@
package langfuse
import (
"context"
"math/rand"
"sync"
"sync/atomic"
"time"
"github.com/Tencent/WeKnora/internal/logger"
"github.com/google/uuid"
)
// Manager is the public façade of the langfuse package. A singleton is
// installed via Init(); callers should treat a nil *Manager as "disabled"
// and still invoke methods — every public method tolerates a nil receiver.
type Manager struct {
cfg Config
client *client
queue chan ingestionEvent
done chan struct{}
workerWG sync.WaitGroup
closed atomic.Bool
// rng is used for sampling decisions. Guarded by rngMu because
// math/rand.Source isn't goroutine-safe.
rngMu sync.Mutex
rng *rand.Rand
}
var (
globalMu sync.RWMutex
global *Manager
)
// Init builds a Manager from cfg and installs it as the package-wide
// singleton. When cfg.Enabled is false this returns a disabled manager that
// behaves as a no-op for every public method.
func Init(cfg Config) (*Manager, error) {
if err := cfg.Validate(); err != nil {
return nil, err
}
m := &Manager{cfg: cfg}
if cfg.Enabled {
m.client = newClient(cfg)
m.queue = make(chan ingestionEvent, cfg.QueueSize)
m.done = make(chan struct{})
m.rng = rand.New(rand.NewSource(time.Now().UnixNano()))
m.workerWG.Add(1)
go m.runWorker()
}
globalMu.Lock()
global = m
globalMu.Unlock()
if cfg.Enabled {
logger.Infof(context.Background(),
"[Langfuse] enabled host=%s flush_at=%d flush_interval=%s sample_rate=%.2f",
cfg.Host, cfg.FlushAt, cfg.FlushInterval, cfg.SampleRate,
)
}
return m, nil
}
// GetManager returns the installed singleton, or nil if Init has not been
// called. Callers must tolerate a nil return.
func GetManager() *Manager {
globalMu.RLock()
defer globalMu.RUnlock()
return global
}
// Enabled reports whether the manager would actually emit events.
func (m *Manager) Enabled() bool {
return m != nil && m.cfg.Enabled && !m.closed.Load()
}
// Shutdown drains pending events, signals the worker to stop and waits for it.
// Safe to call multiple times.
func (m *Manager) Shutdown(ctx context.Context) error {
if m == nil || !m.cfg.Enabled {
return nil
}
if !m.closed.CompareAndSwap(false, true) {
return nil
}
close(m.done)
doneCh := make(chan struct{})
go func() {
m.workerWG.Wait()
close(doneCh)
}()
select {
case <-doneCh:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
// enqueue drops silently when either disabled, full or closed. Langfuse is
// observability, not business logic — back-pressure or failures must never
// block the request path.
func (m *Manager) enqueue(ev ingestionEvent) {
if !m.Enabled() {
return
}
select {
case m.queue <- ev:
default:
if m.cfg.Debug {
logger.Warnf(context.Background(), "[Langfuse] queue full, dropping event type=%s", ev.Type)
}
}
}
// sample decides whether to emit based on SampleRate. Sampling is applied once
// per trace; observations attached to an already-sampled trace are always kept
// (Langfuse itself would drop orphaned observations anyway).
func (m *Manager) sample() bool {
if !m.Enabled() {
return false
}
if m.cfg.SampleRate >= 1.0 {
return true
}
m.rngMu.Lock()
defer m.rngMu.Unlock()
return m.rng.Float64() < m.cfg.SampleRate
}
// runWorker batches queued events and flushes them either when the batch
// reaches FlushAt, when FlushInterval elapses, or when the manager shuts down.
func (m *Manager) runWorker() {
defer m.workerWG.Done()
ticker := time.NewTicker(m.cfg.FlushInterval)
defer ticker.Stop()
buf := make([]ingestionEvent, 0, m.cfg.FlushAt)
flush := func(reason string) {
if len(buf) == 0 {
return
}
ctx, cancel := context.WithTimeout(context.Background(), m.cfg.RequestTimeout)
defer cancel()
if err := m.client.ingest(ctx, buf); err != nil && m.cfg.Debug {
logger.Warnf(ctx, "[Langfuse] flush (%s) failed: %v", reason, err)
}
buf = buf[:0]
}
for {
select {
case ev := <-m.queue:
buf = append(buf, ev)
if len(buf) >= m.cfg.FlushAt {
flush("batch-full")
}
case <-ticker.C:
flush("interval")
case <-m.done:
// Drain whatever remains in the queue before exiting.
for {
select {
case ev := <-m.queue:
buf = append(buf, ev)
if len(buf) >= m.cfg.FlushAt {
flush("batch-full-on-shutdown")
}
default:
flush("shutdown")
return
}
}
}
}
}
// newID returns a Langfuse-compatible UUIDv4.
func newID() string {
return uuid.New().String()
}

View File

@@ -0,0 +1,130 @@
package langfuse
import (
"context"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
)
// TestManager_DisabledIsNoop verifies that when the manager is disabled the
// public API is safe to call and produces no side effects.
func TestManager_DisabledIsNoop(t *testing.T) {
m, err := Init(Config{Enabled: false})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if m.Enabled() {
t.Fatal("expected disabled")
}
ctx, trace := m.StartTrace(context.Background(), TraceOptions{Name: "x"})
trace.Finish(nil, nil)
_, gen := m.StartGeneration(ctx, GenerationOptions{Name: "g", Model: "m"})
gen.Finish(nil, nil, nil)
if err := m.Shutdown(context.Background()); err != nil {
t.Fatalf("shutdown: %v", err)
}
}
// TestManager_FullRoundTrip boots a fake Langfuse server, runs a trace +
// generation through the manager, and asserts the ingested payload contains
// the expected ids, model name and usage.
func TestManager_FullRoundTrip(t *testing.T) {
var mu sync.Mutex
var batches []ingestionRequest
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/api/public/ingestion" {
t.Errorf("unexpected path: %s", r.URL.Path)
w.WriteHeader(404)
return
}
if auth := r.Header.Get("Authorization"); auth == "" {
t.Errorf("missing Authorization header")
}
body, _ := io.ReadAll(r.Body)
var req ingestionRequest
if err := json.Unmarshal(body, &req); err != nil {
t.Errorf("decode body: %v", err)
w.WriteHeader(400)
return
}
mu.Lock()
batches = append(batches, req)
mu.Unlock()
w.WriteHeader(200)
w.Write([]byte(`{}`))
}))
defer srv.Close()
m, err := Init(Config{
Enabled: true,
Host: srv.URL,
PublicKey: "pk",
SecretKey: "sk",
FlushAt: 1,
FlushInterval: 10 * time.Millisecond,
QueueSize: 16,
RequestTimeout: 2 * time.Second,
SampleRate: 1.0,
})
if err != nil {
t.Fatalf("init: %v", err)
}
ctx, trace := m.StartTrace(context.Background(), TraceOptions{
Name: "test.trace",
UserID: "user-42",
})
_, gen := m.StartGeneration(ctx, GenerationOptions{
Name: "chat.completion",
Model: "gpt-test",
Input: []map[string]string{{"role": "user", "content": "hi"}},
})
gen.Finish("hello", &TokenUsage{Input: 10, Output: 20, Total: 30, Unit: "TOKENS"}, nil)
trace.Finish("hello", nil)
if err := m.Shutdown(context.Background()); err != nil {
t.Fatalf("shutdown: %v", err)
}
mu.Lock()
defer mu.Unlock()
// We should have received at least:
// trace-create, generation-create, generation-update, trace-create(update)
// possibly split across multiple HTTP calls depending on batching.
var events []ingestionEvent
for _, b := range batches {
events = append(events, b.Batch...)
}
if len(events) < 4 {
t.Fatalf("expected >=4 events, got %d: %+v", len(events), events)
}
var sawGenerationUpdate bool
for _, ev := range events {
if ev.Type != "generation-update" {
continue
}
b, _ := json.Marshal(ev.Body)
var body observationBody
_ = json.Unmarshal(b, &body)
if body.Usage == nil || body.Usage.Total != 30 {
t.Errorf("expected usage total=30, got %+v", body.Usage)
}
if body.TraceID != trace.ID {
t.Errorf("generation trace id mismatch: got %s want %s", body.TraceID, trace.ID)
}
sawGenerationUpdate = true
}
if !sawGenerationUpdate {
t.Fatalf("no generation-update event found")
}
}

View File

@@ -0,0 +1,99 @@
package langfuse
import (
"context"
"strconv"
"strings"
"github.com/Tencent/WeKnora/internal/types"
"github.com/gin-gonic/gin"
)
// GinMiddleware returns a Gin handler that opens a Langfuse trace for each
// incoming request that hits a traced path. The trace is auto-finished when
// the handler chain returns; individual LLM calls inside the handler attach
// their generations to this trace via the request context.
//
// Only paths matching shouldTrace are traced — static assets, health checks
// and polling endpoints are noisy and uninteresting.
func GinMiddleware() gin.HandlerFunc {
return func(c *gin.Context) {
mgr := GetManager()
if !mgr.Enabled() || !shouldTrace(c) {
c.Next()
return
}
ctx := c.Request.Context()
userID := extractUserID(ctx)
sessionID := extractSessionID(c)
opts := TraceOptions{
Name: c.Request.Method + " " + c.FullPath(),
UserID: userID,
SessionID: sessionID,
Metadata: map[string]interface{}{
"http.method": c.Request.Method,
"http.path": c.FullPath(),
"http.query": c.Request.URL.RawQuery,
},
Tags: []string{"http", strings.ToLower(c.Request.Method)},
}
if rid, ok := types.RequestIDFromContext(ctx); ok {
opts.Metadata["request_id"] = rid
}
newCtx, trace := mgr.StartTrace(ctx, opts)
c.Request = c.Request.WithContext(newCtx)
c.Next()
trace.Finish(map[string]interface{}{
"status": c.Writer.Status(),
}, map[string]interface{}{
"http.status_code": c.Writer.Status(),
"response.size": c.Writer.Size(),
})
}
}
// shouldTrace restricts tracing to chat / search / agent / evaluation
// endpoints where LLMs run. Everything else (auth, list, config, static…)
// is skipped to keep the Langfuse dashboard signal-to-noise high.
func shouldTrace(c *gin.Context) bool {
path := c.FullPath()
if path == "" {
return false
}
switch {
case strings.HasPrefix(path, "/api/v1/knowledge-chat"),
strings.HasPrefix(path, "/api/v1/agent-chat"),
strings.HasPrefix(path, "/api/v1/knowledge-search"),
strings.HasPrefix(path, "/api/v1/sessions") && strings.Contains(path, "generate_title"),
strings.HasPrefix(path, "/api/v1/initialization/remote/check"),
strings.HasPrefix(path, "/api/v1/initialization/embedding/test"),
strings.HasPrefix(path, "/api/v1/evaluation"):
return true
}
return false
}
func extractUserID(ctx context.Context) string {
if v, ok := ctx.Value(types.UserIDContextKey).(string); ok && v != "" {
return v
}
if v, ok := ctx.Value(types.TenantIDContextKey).(uint64); ok && v != 0 {
return "tenant:" + strconv.FormatUint(v, 10)
}
return ""
}
func extractSessionID(c *gin.Context) string {
if v := c.Param("session_id"); v != "" {
return v
}
if v := c.Param("id"); v != "" && strings.Contains(c.FullPath(), "/sessions/") {
return v
}
return ""
}

View File

@@ -0,0 +1,209 @@
package langfuse
import (
"context"
"time"
)
// Trace represents an active root observation. A Trace is conceptually one
// "request" (e.g. a chat turn). Generations and spans attached to it roll up
// as children in the Langfuse UI.
type Trace struct {
ID string
manager *Manager
sampled bool
}
// Generation represents a single model invocation (LLM / embedding / VLM).
type Generation struct {
ID string
TraceID string
manager *Manager
sampled bool
startTime time.Time
model string
name string
}
// TraceOptions configures a new trace.
type TraceOptions struct {
Name string
UserID string
SessionID string
Input interface{}
Metadata map[string]interface{}
Tags []string
Environment string
Release string
}
// GenerationOptions configures a new generation observation.
type GenerationOptions struct {
Name string
Model string
Input interface{}
Metadata map[string]interface{}
ModelParameters map[string]interface{}
}
// StartTrace opens a new trace, stores its ID in the returned ctx, and returns
// a handle callers can finish with FinishTrace. When the manager is disabled
// or sampling excludes the trace, the returned *Trace is non-nil but all
// methods are no-ops so callers don't need nil checks.
func (m *Manager) StartTrace(ctx context.Context, opts TraceOptions) (context.Context, *Trace) {
if m == nil || !m.cfg.Enabled {
return ctx, &Trace{}
}
sampled := m.sample()
id := newID()
t := &Trace{ID: id, manager: m, sampled: sampled}
if sampled {
env := opts.Environment
if env == "" {
env = m.cfg.Environment
}
release := opts.Release
if release == "" {
release = m.cfg.Release
}
body := traceBody{
ID: id,
Timestamp: isoTime(time.Now()),
Name: opts.Name,
UserID: opts.UserID,
SessionID: opts.SessionID,
Input: opts.Input,
Metadata: opts.Metadata,
Tags: opts.Tags,
Environment: env,
Release: release,
}
m.enqueue(ingestionEvent{
ID: newID(),
Timestamp: isoTime(time.Now()),
Type: "trace-create",
Body: body,
})
}
return withTrace(ctx, t), t
}
// Finish updates the trace with its final output. Safe to call on a disabled
// trace (no-op).
func (t *Trace) Finish(output interface{}, metadata map[string]interface{}) {
if t == nil || t.manager == nil || !t.sampled {
return
}
// "trace-create" events are also used to update traces in Langfuse —
// the server merges repeated events by ID. See the ingestion API docs.
body := traceBody{
ID: t.ID,
Output: output,
Metadata: metadata,
}
t.manager.enqueue(ingestionEvent{
ID: newID(),
Timestamp: isoTime(time.Now()),
Type: "trace-create",
Body: body,
})
}
// StartGeneration opens a generation observation under the trace carried by
// ctx (or a newly auto-created trace if none is present).
func (m *Manager) StartGeneration(ctx context.Context, opts GenerationOptions) (context.Context, *Generation) {
if m == nil || !m.cfg.Enabled {
return ctx, &Generation{}
}
// If the caller hasn't opened a trace yet, create a shallow auto-trace so
// the generation has a parent. This keeps single-shot internal callers
// (e.g. test connections) observable.
trace, ok := traceFromCtx(ctx)
if !ok || trace == nil {
newCtx, t := m.StartTrace(ctx, TraceOptions{Name: opts.Name})
ctx = newCtx
trace = t
}
if !trace.sampled {
return ctx, &Generation{}
}
now := time.Now()
g := &Generation{
ID: newID(),
TraceID: trace.ID,
manager: m,
sampled: true,
startTime: now,
model: opts.Model,
name: opts.Name,
}
body := observationBody{
ID: g.ID,
TraceID: g.TraceID,
Type: "GENERATION",
Name: opts.Name,
StartTime: isoTime(now),
Input: opts.Input,
Metadata: opts.Metadata,
Model: opts.Model,
ModelParameters: opts.ModelParameters,
}
m.enqueue(ingestionEvent{
ID: newID(),
Timestamp: isoTime(now),
Type: "generation-create",
Body: body,
})
return ctx, g
}
// Finish updates a generation with its final output, token usage and any
// error. A non-nil err marks the observation as ERROR level in Langfuse.
func (g *Generation) Finish(output interface{}, usage *TokenUsage, err error) {
if g == nil || g.manager == nil || !g.sampled {
return
}
level := "DEFAULT"
var statusMsg string
if err != nil {
level = "ERROR"
statusMsg = err.Error()
}
body := observationBody{
ID: g.ID,
TraceID: g.TraceID,
Type: "GENERATION",
EndTime: isoTime(time.Now()),
Output: output,
Usage: usage,
Level: level,
StatusMessage: statusMsg,
}
g.manager.enqueue(ingestionEvent{
ID: newID(),
Timestamp: isoTime(time.Now()),
Type: "generation-update",
Body: body,
})
}
// MarkCompletionStart records the time at which the first token was received
// in a streaming generation. Langfuse surfaces this as time-to-first-token.
func (g *Generation) MarkCompletionStart(t time.Time) {
if g == nil || g.manager == nil || !g.sampled {
return
}
body := observationBody{
ID: g.ID,
TraceID: g.TraceID,
Type: "GENERATION",
CompletionStart: isoTime(t),
}
g.manager.enqueue(ingestionEvent{
ID: newID(),
Timestamp: isoTime(time.Now()),
Type: "generation-update",
Body: body,
})
}

View File

@@ -23,6 +23,10 @@ const (
EmbedQueryContextKey ContextKey = "EmbedQuery"
// LanguageContextKey is the context key for user language preference (e.g. "zh-CN", "en-US")
LanguageContextKey ContextKey = "Language"
// LangfuseTraceContextKey carries the active Langfuse *Trace across the
// request lifecycle. Defined here (not inside the langfuse package) so
// that logger.CloneContext can preserve it without importing langfuse.
LangfuseTraceContextKey ContextKey = "LangfuseTrace"
)
// String returns the string representation of the context key