fix(retriever/doris): code review cleanup

针对 4cce6f2e(接入 Apache Doris)的 code review 修复,主要修正若干阻断性
问题与可读性问题,并剔除不应进入主仓的本地工作流文件。

阻断性修复:
- docker-compose: Doris 镜像由 2.1.0 升至 4.1.0。原 2.1.0 不支持 HNSW
  ANN、cosine_distance_approximate 与 Stream Load partial_columns,
  按当前 DDL 一启动就会失败。
- DSN 字面量拼接改用 mysql.Config.FormatDSN()。原 fmt.Sprintf 在用户名/
  密码包含 `@`/`:`/`/` 等字符时会跑偏。覆盖 health check 与 engine
  factory 两处。

健壮性修复:
- 新增 validateEmbedding,写入与查询前拒绝 NaN/±Inf;strconv.FormatFloat
  对非有限值会输出 "NaN"/"+Inf" 拼成无效 SQL。
- waitANNReady 改为后台 goroutine + 独立 context,避免新维度首次写入路径
  阻塞最长 30s;ANN 未就绪时 Doris 会自动退化为 brute-force。

清理:
- annIndexReady 移除最终两个分支都 return true 的冗余写法。
- Save 移除冗余的双重 toDorisVectorEmbedding。
- testDorisConnection 把 "5.7.99 Doris-4.1.0" 解析为裸 "4.1.0",与
  Postgres/ES 的版本格式对齐。

剔除(不应合入主仓):
- docs/wiki/集成扩展/Doris改动与上游同步.md:纯 fork 维护工作流文档。
- scripts/e2e-doris.sh:作者本地 E2E 验证清单。

测试:
- repository_test 用 require.Eventually 适配 ANN 异步轮询。
- 现有 doris 单测全部通过。
This commit is contained in:
wizardchen
2026-05-09 00:27:39 +08:00
committed by lyingbug
parent 4cce6f2e99
commit fc6f160eff
9 changed files with 101 additions and 406 deletions

View File

@@ -375,6 +375,9 @@ services:
# ---------------------------------------------------------------------------
# Apache Doris 4.1FE + BE 单实例 standalone 部署opt-in via `--profile doris`
#
# 版本要求:必须 >= 3.0HNSW ANN 索引、cosine_distance_approximate、
# Stream Load partial_columns。本 compose 使用 4.1 官方稳定 tag。
#
# 端口说明:
# - 9030 (FE MySQL)WeKnora 主链路读写、SQL 查询都走此端口。
# - 8030 (FE HTTP)Stream Load partial update 入口。
@@ -386,7 +389,7 @@ services:
# 数据持久化在 doris_fe_data / doris_be_data 卷中。
# ---------------------------------------------------------------------------
doris-fe:
image: apache/doris:fe-2.1.0
image: apache/doris:fe-4.1.0
container_name: WeKnora-doris-fe
hostname: doris-fe
environment:
@@ -405,7 +408,7 @@ services:
- doris
doris-be:
image: apache/doris:be-2.1.0
image: apache/doris:be-4.1.0
container_name: WeKnora-doris-be
hostname: doris-be
environment:

View File

@@ -1,295 +0,0 @@
---
title: Doris 改动与上游同步
tags: [集成扩展, 向量数据库, Doris, fork, 上游同步, 维护]
aliases: [Doris fork 工作流, Doris 改动记录, Doris 同步策略]
source: 本仓库本地维护文档
---
# Apache Doris 4.1 集成改动与上游同步策略
本文记录在 WeKnora 中**新增 Apache Doris 4.1 作为第 6 种检索引擎**这一改动的完整细节,并给出与上游 [Tencent/WeKnora](https://github.com/Tencent/WeKnora) 长期保持同步的工作流建议。
> 相关文档:
>
> - [集成向量数据库](../集成扩展/集成向量数据库.md) —— 通用集成指南,含 Doris 章节
> - 上层版本:[使用其他向量数据库](../../使用其他向量数据库.md)
## 一、背景
当前仓库的 git 配置(**改动前**
- `origin` -> `https://github.com/Tencent/WeKnora.git`(直接指向上游)
- 当前分支 `main` 跟踪 `origin/main`
- Doris 改动以本地工作区形式存在
如果继续在 `main` 上累积本地修改、再 `git pull` 拉上游,长期会演变成「几百次 rebase 冲突 + 没法发 PR + 找不到自己改了什么」的死路。需要切到 fork 工作流。
## 二、推荐的上游同步工作流
### 2.1 仓库角色拆分
```mermaid
flowchart LR
upstream["upstream<br/>Tencent/WeKnora<br/>read only"] -->|fetch| local["本地仓库 main"]
local -->|push| fork["origin<br/>个人 fork<br/>yourname/WeKnora"]
local -->|长期分支| feat["feature/doris-engine"]
feat -->|push| fork
feat -.->|可选 PR 回上游| upstream
```
### 2.2 一次性配置
> 在 GitHub 上把 `Tencent/WeKnora` fork 到自己账号(例如 `yourname/WeKnora`)后执行:
```bash
# 把现在的 origin 改名为 upstream只读引用上游
git remote rename origin upstream
# 把自己的 fork 设为 originpush 目标)
git remote add origin https://github.com/<你的用户名>/WeKnora.git
git push -u origin main
# 把 Doris 改动放到长期 feature 分支
git checkout -b feature/doris-engine
git add .
git commit -m "feat(retriever): integrate Apache Doris 4.1"
git push -u origin feature/doris-engine
```
完成后 remote 应该是这样:
```bash
$ git remote -v
origin https://github.com/<你的用户名>/WeKnora.git (fetch)
origin https://github.com/<你的用户名>/WeKnora.git (push)
upstream https://github.com/Tencent/WeKnora.git (fetch)
upstream https://github.com/Tencent/WeKnora.git (push)
```
### 2.3 长期同步节奏(建议每周 / 每次大版本拉新)
```bash
# 1) 拉上游
git fetch upstream
# 2) main 直接 fast-forward
git checkout main
git merge --ff-only upstream/main
git push origin main
# 3) feature 分支 rebase 到最新 main
git checkout feature/doris-engine
git rebase main
# 4) 解决冲突 -> 跑测试 -> 强推 fork
go build ./...
go test ./internal/types/... \
./internal/application/repository/retriever/doris/... \
./internal/application/service/...
git push --force-with-lease origin feature/doris-engine
```
**rebase 优于 merge** 的理由feature 分支保持线性历史,未来想发 PR 给上游会容易很多merge 会让 commit 图谱变成一团乱麻,也难以 cherry-pick 单条改动。
### 2.4 冲突高发文件(每次 rebase 必看)
下面这些位置都是 WeKnora 的「检索引擎扩展点」,上游每加一个新引擎都会改一次,几乎一定会和我们的 Doris 分支撞:
| 文件 | 撞点 |
| --- | --- |
| [internal/types/retriever.go](../../../internal/types/retriever.go) | `RetrieverEngineType` 枚举 |
| [internal/types/tenant.go](../../../internal/types/tenant.go) | `retrieverEngineMapping` map |
| [internal/types/vectorstore.go](../../../internal/types/vectorstore.go) | `ConnectionConfig` / `IndexConfig` 结构、`GetVectorStoreTypes` / `BuildEnvVectorStores` / `ValidateIndexConfig` 三个函数 |
| [internal/application/service/vectorstore.go](../../../internal/application/service/vectorstore.go) | `validateConnectionConfig` switch |
| [internal/application/service/vectorstore_healthcheck.go](../../../internal/application/service/vectorstore_healthcheck.go) | `TestConnection` switch |
| [internal/container/engine_factory.go](../../../internal/container/engine_factory.go) | `createEngineServiceFromStore` switch |
| [internal/container/container.go](../../../internal/container/container.go) | `initRetrieveEngineRegistry` 中的 driver if 链 |
| [docker-compose.yml](../../../docker-compose.yml) | app 服务 `environment:` 段、`volumes:` 段 |
### 2.5 接口契约盯防
最容易踩坑的是 `interfaces.RetrieveEngineRepository`[internal/types/interfaces/retriever.go](../../../internal/types/interfaces/retriever.go)):上游一旦增加方法,所有实现包(包括 Doris必须同步实现否则编译失败。
每次 rebase 后**必须**跑:
```bash
go build ./...
go vet ./...
go test ./internal/types/... \
./internal/application/repository/retriever/doris/... \
./internal/application/service/...
```
### 2.6 依赖 / 镜像维护
- **Go 依赖**:本次 Doris 集成新增 `github.com/go-sql-driver/mysql`(主依赖)和 `github.com/DATA-DOG/go-sqlmock`(测试依赖),它们被 `internal/application/repository/retriever/doris/` 直接引用,上游 `go mod tidy` 不会丢弃它们。
- **Doris 镜像版本**`docker-compose.yml` 中的 `apache/doris:fe-2.1.0` / `apache/doris:be-2.1.0` 需要随官方 4.x 镜像发布跟进升级。建议每个季度核对一次 [Apache Doris 镜像列表](https://hub.docker.com/r/apache/doris/tags)。
## 三、本次改动完整清单
### 3.1 改动统计
- 修改15 个现有文件,约 +566 / -6 行
- 新增7 个文件,约 +2057 行
- 总计22 个文件,约 +2617 行
### 3.2 新增文件7 个)
| 文件 | 行数 | 主要内容 |
| --- | ---: | --- |
| [internal/application/repository/retriever/doris/structs.go](../../../internal/application/repository/retriever/doris/structs.go) | 63 | `dorisRepository` 结构体(`*sql.DB` / `*http.Client` / 凭据 / 表配置 / `initializedTables sync.Map`+ `DorisVectorEmbedding(WithScore)` 领域模型 |
| [internal/application/repository/retriever/doris/schema.go](../../../internal/application/repository/retriever/doris/schema.go) | 280 | `ensureTable` + `tableExists` + `createTable` + DDL 模板UNIQUE KEY MoW + INVERTED + ANN HNSW + chinese parser+ `waitANNReady` 异步索引就绪轮询 + `listEmbeddingTables` |
| [internal/application/repository/retriever/doris/query.go](../../../internal/application/repository/retriever/doris/query.go) | 202 | `whereBuilder``addEqual` / `addIn` / `addNotIn`+ `buildBaseFilter` + `embeddingLiteral` / `parseEmbeddingLiteral`locale 安全的浮点序列化) |
| [internal/application/repository/retriever/doris/repository.go](../../../internal/application/repository/retriever/doris/repository.go) | 584 | `NewDorisRetrieveEngineRepository` 构造 + `interfaces.RetrieveEngineRepository` 12 个方法实现(`Save` / `BatchSave` / `Retrieve` 分发 / `VectorRetrieve` / `KeywordsRetrieve` / `Delete*` 三件套 / `CopyIndices` / `EstimateStorageSize` / `EngineType` / `Support`+ `scanRetrieveRows` / `scanCopyRows` / `translateSourceID` / `calculateStorageSize` |
| [internal/application/repository/retriever/doris/streamload.go](../../../internal/application/repository/retriever/doris/streamload.go) | 345 | `partialUpdateRows` + `streamLoadOnce`HTTP PUT、Basic auth、`partial_columns` / `strip_outer_array` / `merge_type=APPEND` headers、`req.GetBody` 防 307 失体)+ `chunkRows`1 MiB 自动拆批)+ `BatchUpdateChunkEnabledStatus` / `BatchUpdateChunkTagID` 真实实现 + `lookupChunkRowKeys` |
| [internal/application/repository/retriever/doris/repository_test.go](../../../internal/application/repository/retriever/doris/repository_test.go) | 510 | 基于 `go-sqlmock` + `httptest.Server` 的单测whereBuilder / embeddingLiteral / chunkRows / partialUpdateRows / DeleteByChunkIDList / VectorRetrieve / KeywordsRetrieve / BatchSave / ensureTable DDL / BatchUpdateChunkEnabledStatus / EngineType+Support / translateSourceID / EstimateStorageSize |
| [scripts/e2e-doris.sh](../../../scripts/e2e-doris.sh) | 73 | 端到端联调 checklist 脚本(`docker compose --profile doris up` 后逐步验证 store 创建 / 写入 / 检索 / 状态批改) |
### 3.3 修改的现有文件15 个)
#### 类型 / 常量层5 个)
- **[internal/types/retriever.go](../../../internal/types/retriever.go)**+1 行):枚举增加 `DorisRetrieverEngineType RetrieverEngineType = "doris"`
- **[internal/types/tenant.go](../../../internal/types/tenant.go)**+4 行):`retrieverEngineMapping["doris"]` -> `{Keywords, Vector}`
- **[internal/types/vectorstore.go](../../../internal/types/vectorstore.go)**+80 行):
- `import` 新增 `strconv`
- `validEngineTypes` 收录 `DorisRetrieverEngineType`
- `ConnectionConfig` 新增 `HTTPPort int` / `Database string`
- `IndexConfig` 新增 `BucketsNum int` / `ReplicationNum int`
- `GetIndexNameOrDefault` switch 增加 `case DorisRetrieverEngineType`
- 新增 `(*IndexConfig).GetBucketsNum(def)` / `GetReplicationNum(def)` helper
- `GetVectorStoreTypes()` 追加 doris 项5 个连接字段 + 3 个索引字段)
- `buildEnvStoreForDriver` 增加 `case "doris"`,读取 `DORIS_ADDR / DORIS_HTTP_PORT / DORIS_DATABASE / DORIS_USERNAME / DORIS_PASSWORD / DORIS_TABLE_PREFIX` 共 6 个环境变量
- `ValidateIndexConfig``BucketsNum` / `ReplicationNum``0..maxShards` / `0..maxReplicas` 边界
- **[internal/application/service/vectorstore.go](../../../internal/application/service/vectorstore.go)**+7 行):`validateConnectionConfig` switch 新增 doris 分支addr / database 必填)
- **[internal/application/service/vectorstore_healthcheck.go](../../../internal/application/service/vectorstore_healthcheck.go)**+45 行):
- blank import `_ "github.com/go-sql-driver/mysql"`
- `TestConnection` switch 增加 `case types.DorisRetrieverEngineType`
- 新增 `testDorisConnection`PingContext + `SELECT @@version`,超时 10s
#### 装配层2 个)
- **[internal/container/engine_factory.go](../../../internal/container/engine_factory.go)**+52 行):
- import 增加 `database/sql` / `strconv` / `_ "github.com/go-sql-driver/mysql"` / `dorisRepo`
- `createEngineServiceFromStore` switch 增加 `case types.DorisRetrieverEngineType`
- 新增 `createDorisEngine`(拼 DSN + `sql.Open` + 池参数 + `NewDorisRetrieveEngineRepository`+ `hostFromAddr` 辅助
- **[internal/container/container.go](../../../internal/container/container.go)**+49 行):
- import 增加 `_ "github.com/go-sql-driver/mysql"` / `dorisRepo`
- `initRetrieveEngineRegistry` 在 milvus 分支后追加 `slices.Contains(retrieveDriver, "doris")` 分支:读 6 个 `DORIS_*` env -> 构造 `*sql.DB` -> `NewDorisRetrieveEngineRepository(..., nil)` -> `registry.Register`
#### 测试2 个)
- **[internal/types/vectorstore_test.go](../../../internal/types/vectorstore_test.go)**+90 行):
- `TestBuildEnvVectorStores` 的 envMap 加 6 个 `DORIS_*`
- `all supported drivers` 用例期望从 7 改 8、加 `__env_doris__` 断言
- 新增 `doris env store` / `doris env store handles invalid http port gracefully` 子测试
- `TestGetVectorStoreTypes` 期望从 4 改 5、加 `Contains "doris"`、新增 `doris has connection and index fields` 子测试
- `TestValidateIndexConfig` 加 4 个 doris 子用例buckets_num 边界 / replication_num 边界 / GetIndexNameOrDefault 默认 / 自定义)
- **[internal/application/service/vectorstore_test.go](../../../internal/application/service/vectorstore_test.go)**+44 行):
- import 新增 `time`
- `TestValidateConnectionConfig` 表加 3 个 doris 用例valid / missing addr / missing database
- 新增 `TestTestConnection_DorisInvalidAddr` / `TestTestConnection_DorisMissingAddr`
#### 依赖、配置、文档5 个)
- **[go.mod](../../../go.mod) / [go.sum](../../../go.sum)**
- 新增主依赖 `github.com/go-sql-driver/mysql v1.10.0`(间接:`filippo.io/edwards25519`
- 新增测试依赖 `github.com/DATA-DOG/go-sqlmock v1.5.2`
- **[.env.example](../../../.env.example)**+21 行):`RETRIEVE_DRIVER` 注释加 `doris`,新增 7 行 `DORIS_*` 环境变量段
- **[docker-compose.yml](../../../docker-compose.yml)**+62 行):
- app 服务 `environment:` 段加 6 行 `DORIS_*` 注入
- 新增 `doris-fe` 服务(`apache/doris:fe-2.1.0`,端口 8030 + 9030profile: doris
- 新增 `doris-be` 服务(`apache/doris:be-2.1.0`,端口 8040profile: dorisdepends_on doris-fe
- `volumes:` 段加 `doris_fe_meta` / `doris_fe_log` / `doris_be_storage` / `doris_be_log`
- **[docs/使用其他向量数据库.md](../../使用其他向量数据库.md)**+89 行):参考实现列表加 doris 路径;新增 "Apache Doris 4.1 集成说明" 章节(协议 / 表结构 / 索引 / 分数语义 / 关键词检索 / Stream Load / env / 启动方式)
- **[docs/wiki/集成扩展/集成向量数据库.md](../集成扩展/集成向量数据库.md)**+26 行):参考实现列表加 doris 路径;新增 "Apache Doris 4.1 集成要点" 简版章节
### 3.4 整体调用链rebase 后回归对照图)
```mermaid
flowchart TD
A["VectorStoreService.CreateStore"] --> B["TestConnection<br/>MySQL Ping + version"]
B --> C["repo.Create"]
C --> D["EngineFactory.createDorisEngine"]
D --> E["sql.Open mysql + http.Client"]
E --> F["NewDorisRetrieveEngineRepository"]
F --> G["NewKVHybridRetrieveEngine"]
G --> H["registry.byStoreID[id] = service"]
subgraph dorisPkg ["internal/application/repository/retriever/doris"]
R1["repository.go<br/>Save / Retrieve / Delete..."]
R2["schema.go<br/>ensureTable + DDL"]
R3["query.go<br/>filter -> SQL WHERE"]
R4["streamload.go<br/>HTTP partial update"]
end
F --> dorisPkg
```
## 四、回归 / 验证清单
### 4.1 单元测试
```bash
# Doris 包专属单测go-sqlmock + httptest
go test ./internal/application/repository/retriever/doris/... -count=1
# 类型层测试(含 BuildEnvVectorStores / GetVectorStoreTypes / ValidateIndexConfig 等扩展点)
go test ./internal/types/... -count=1
# 服务层 Doris 用例
go test ./internal/application/service/ -run "Doris|TestConnection|ValidateConnectionConfig" -count=1
```
> 注:`go test ./internal/application/service/...` 中存在若干 `TestCreateStore_*` 是会主动 dial `http://es:9200` 的预先存在用例,无网络环境会失败,与 Doris 改动无关。
### 4.2 docker compose 配置自检
```bash
docker compose --profile doris config -o /tmp/compose.out
echo $? # 期望 0
```
### 4.3 端到端联调
```bash
docker compose --profile doris up -d
docker exec -it WeKnora-doris-fe \
mysql -h 127.0.0.1 -P 9030 -uroot \
-e "CREATE DATABASE IF NOT EXISTS weknora;"
RETRIEVE_DRIVER=doris make run
bash scripts/e2e-doris.sh
```
## 五、未来上游若新增引擎,模板化 checklist
如果上游某次合入了一个新引擎(例如 `chroma`rebase 时按下面的 checklist 检查每个扩展点是否同时容纳上游 + Doris
- [ ] [internal/types/retriever.go](../../../internal/types/retriever.go) 的 `RetrieverEngineType` 枚举:保留 `DorisRetrieverEngineType`
- [ ] [internal/types/tenant.go](../../../internal/types/tenant.go) 的 `retrieverEngineMapping`:保留 `"doris"` key
- [ ] [internal/types/vectorstore.go](../../../internal/types/vectorstore.go) 的 `validEngineTypes``GetIndexNameOrDefault` switch、`GetVectorStoreTypes()``buildEnvStoreForDriver` switch、`ValidateIndexConfig` 五处:保留 doris 分支
- [ ] [internal/application/service/vectorstore.go](../../../internal/application/service/vectorstore.go) 的 `validateConnectionConfig` switch保留 doris 分支
- [ ] [internal/application/service/vectorstore_healthcheck.go](../../../internal/application/service/vectorstore_healthcheck.go) 的 `TestConnection` switch保留 `testDorisConnection` 分支
- [ ] [internal/container/engine_factory.go](../../../internal/container/engine_factory.go) 的 `createEngineServiceFromStore` switch保留 `createDorisEngine` 分支
- [ ] [internal/container/container.go](../../../internal/container/container.go) 的 `initRetrieveEngineRegistry`:保留 doris 的 `if slices.Contains(retrieveDriver, "doris")`
- [ ] [docker-compose.yml](../../../docker-compose.yml) 的 app 服务 `environment:` 段保留 6 个 `DORIS_*``volumes:` 段保留 `doris_fe_meta` 等;新增的服务定义保留
- [ ] [.env.example](../../../.env.example) 的 `RETRIEVE_DRIVER` 注释保留 `doris``DORIS_*` 段保留
- [ ] [internal/types/vectorstore_test.go](../../../internal/types/vectorstore_test.go) 中以维度计数的断言(如 `len == 5``all supported drivers` 用例):如上游引擎数量也变了,要把数字调整到位,**保留 doris 断言**
如果 `interfaces.RetrieveEngineRepository`[internal/types/interfaces/retriever.go](../../../internal/types/interfaces/retriever.go))增加方法:
- [ ] 在 [internal/application/repository/retriever/doris/repository.go](../../../internal/application/repository/retriever/doris/repository.go) 同步实现新方法
- [ ] 添加对应单测到 [internal/application/repository/retriever/doris/repository_test.go](../../../internal/application/repository/retriever/doris/repository_test.go)
## 六、不在本次改动范围
- TLS 加密连接先按非加密实现TLS 走 DSN `tls=skip-verify` 选项后续再加
- 多租户隔离改造:仍由 `vector_stores` 表的 `tenant_id` 列承担repository 层无需感知
- pgvector 风格的 SQL 函数自动注册脚本Doris 不需要插件)
- 提交 PR 给 Tencent/WeKnora 上游(看是否打算贡献回去再决定)
## 反向链接
- [Home](../Home.md) —— Wiki 首页导航
- [集成向量数据库](../集成扩展/集成向量数据库.md) —— 通用集成指南
- [使用其他向量数据库](../../使用其他向量数据库.md) —— 上层版本(含 Doris 完整说明)

View File

@@ -1,6 +1,7 @@
package doris
import (
"math"
"strconv"
"strings"
@@ -173,6 +174,33 @@ func parseEmbeddingLiteral(raw []byte) ([]float32, error) {
return out, nil
}
// validateEmbedding 校验向量元素均为有限值。
//
// strconv.FormatFloat 对 NaN/±Inf 会输出 "NaN"/"+Inf"/"-Inf"
// 这些字面量会让 Doris 拼出来的 SQL 报语法错误(或在某些版本下产生未定义结果)。
// 上游(嵌入模型)正常情况下不会输出非有限值,但 GPU OOM、上游 bug、
// 测试桩都可能触发;这里 fail-fast 比悄悄写脏数据安全。
func validateEmbedding(vec []float32) error {
for i, v := range vec {
if f := float64(v); math.IsNaN(f) || math.IsInf(f, 0) {
return errInvalidEmbedding{index: i, value: v}
}
}
return nil
}
// errInvalidEmbedding 描述哪个下标含非有限值;用结构体而非 fmt.Errorf
// 是为了让上层在日志里能拿到下标做问题定位。
type errInvalidEmbedding struct {
index int
value float32
}
func (e errInvalidEmbedding) Error() string {
return "doris: embedding[" + strconv.Itoa(e.index) +
"] is not finite: " + strconv.FormatFloat(float64(e.value), 'g', -1, 32)
}
// embeddingLiteral 把 []float32 转为 Doris ARRAY<FLOAT> 字面量字符串:
// "[1.23,4.56,...]"。
//

View File

@@ -74,14 +74,10 @@ func (r *dorisRepository) EstimateStorageSize(_ context.Context,
return total
}
// Save 写入单条记录到对应维度的表。
// Save 写入单条记录到对应维度的表。空向量在 BatchSave 内部统一拒绝。
func (r *dorisRepository) Save(ctx context.Context,
info *types.IndexInfo, additionalParams map[string]any,
) error {
emb := toDorisVectorEmbedding(info, additionalParams)
if len(emb.Embedding) == 0 {
return fmt.Errorf("empty embedding vector for chunk ID: %s", info.ChunkID)
}
return r.BatchSave(ctx, []*types.IndexInfo{info}, additionalParams)
}
@@ -102,6 +98,9 @@ func (r *dorisRepository) BatchSave(ctx context.Context,
log.Warnf("[Doris] Skipping empty embedding for chunk %s", info.ChunkID)
continue
}
if err := validateEmbedding(emb.Embedding); err != nil {
return fmt.Errorf("invalid embedding for chunk %s: %w", info.ChunkID, err)
}
// 给一个稳定的主键。SourceID 是上层最有意义的"行身份"
// 但同 chunk 多 question 的场景下 SourceID 已经唯一,所以直接用它。
if emb.ID == "" {
@@ -226,6 +225,9 @@ func (r *dorisRepository) VectorRetrieve(ctx context.Context,
params types.RetrieveParams,
) ([]*types.RetrieveResult, error) {
log := logger.GetLogger(ctx)
if err := validateEmbedding(params.Embedding); err != nil {
return nil, fmt.Errorf("invalid query embedding: %w", err)
}
dim := len(params.Embedding)
table := r.getTableName(dim)
@@ -477,12 +479,12 @@ func scanRetrieveRows(rows *sql.Rows, matchType types.MatchType) ([]*types.Index
var out []*types.IndexWithScore
for rows.Next() {
var (
id, content, sourceID, chunkID string
knowledgeID, knowledgeBaseID, tagID string
sourceType int
isEnabled bool
score float64
err error
id, content, sourceID, chunkID string
knowledgeID, knowledgeBaseID, tagID string
sourceType int
isEnabled bool
score float64
err error
)
if withScore {
err = rows.Scan(&id, &content, &sourceID, &sourceType,

View File

@@ -11,6 +11,7 @@ import (
"regexp"
"strings"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/Tencent/WeKnora/internal/types"
@@ -382,7 +383,10 @@ func TestEnsureTable_DDLShape(t *testing.T) {
)
require.NoError(t, repo.ensureTable(context.Background(), 768))
assert.NoError(t, mock.ExpectationsWereMet())
// waitANNReady 在后台 goroutine 里执行,轮询 ExpectationsWereMet 直到 SHOW INDEX 也被消费。
require.Eventually(t, func() bool {
return mock.ExpectationsWereMet() == nil
}, 2*time.Second, 10*time.Millisecond, "expectations should be met after async ANN poll")
}
func TestBatchSave_SQLShape(t *testing.T) {

View File

@@ -55,12 +55,22 @@ func (r *dorisRepository) ensureTable(ctx context.Context, dimension int) error
return fmt.Errorf("create table: %w", err)
}
// 创建表后等一会让 ANN 索引就绪,超时不致命。
if err := r.waitANNReady(ctx, tableName); err != nil {
log.Warnf("[Doris] ANN index for %s not ready within %s: %v "+
"(writes will still proceed; queries may fall back to brute force temporarily)",
tableName, annReadyTimeout, err)
}
// ANN 索引在 Doris 端异步构建。这里在后台 goroutine 里轮询就绪,
// 写入路径不阻塞——索引未就绪期间检索会退化为 brute-force结果对、速度慢
// 比让首批写入卡 30s 更可接受。
go func(tn string) {
// 用独立 context带 timeout避免请求级 ctx 取消把后台轮询也带走。
bgCtx, cancel := context.WithTimeout(context.Background(), annReadyTimeout)
defer cancel()
if err := r.waitANNReady(bgCtx, tn); err != nil {
logger.GetLogger(bgCtx).Warnf(
"[Doris] ANN index for %s not ready within %s: %v "+
"(queries may fall back to brute force temporarily)",
tn, annReadyTimeout, err)
return
}
logger.GetLogger(bgCtx).Infof("[Doris] ANN index for %s ready", tn)
}(tableName)
}
r.initializedTables.Store(dimension, true)
@@ -199,7 +209,6 @@ func (r *dorisRepository) annIndexReady(ctx context.Context, tableName string) (
}
}
foundANN := false
for rows.Next() {
// 使用 sql.RawBytes 接收以兼容不同列类型。
raw := make([]any, len(cols))
@@ -222,7 +231,6 @@ func (r *dorisRepository) annIndexReady(ctx context.Context, tableName string) (
if keyName != "idx_emb" {
continue
}
foundANN = true
if stateIdx < 0 {
// 旧版本不暴露 state 列,乐观认为已就绪。
return true, nil
@@ -235,11 +243,10 @@ func (r *dorisRepository) annIndexReady(ctx context.Context, tableName string) (
if err := rows.Err(); err != nil {
return false, err
}
if !foundANN {
// 没找到 ANN 行(可能是 Doris 版本返回字段不同),不阻塞。
return true, nil
}
// 走到这里有两种情况:
// 1. 找到了 idx_emb 行,且 state 已是 FINISHED/NORMAL或 stateIdx<0 的旧版本);
// 2. 没找到 idx_emb 行(极旧 Doris 不暴露该索引名);
// 都视为已就绪,不阻塞。未就绪的分支已在循环内提前 return false。
return true, nil
}

View File

@@ -8,13 +8,14 @@ import (
"io"
"net"
"net/http"
"strings"
"time"
"github.com/Tencent/WeKnora/internal/errors"
"github.com/Tencent/WeKnora/internal/logger"
"github.com/Tencent/WeKnora/internal/types"
_ "github.com/go-sql-driver/mysql" // MySQL driver for database/sql, used by Doris connection test
_ "github.com/jackc/pgx/v5/stdlib" // pgx driver for database/sql
"github.com/go-sql-driver/mysql" // MySQL driver for database/sql, used by Doris connection test
_ "github.com/jackc/pgx/v5/stdlib" // pgx driver for database/sql
"github.com/qdrant/go-client/qdrant"
"github.com/weaviate/weaviate-go-client/v5/weaviate"
"github.com/weaviate/weaviate-go-client/v5/weaviate/auth"
@@ -233,8 +234,9 @@ func testWeaviateConnection(ctx context.Context, config types.ConnectionConfig)
// testDorisConnection 通过 MySQL 协议database/sql + go-sql-driver
// Ping Doris FE 并查询 @@version。
//
// Doris 4.1 的 @@version 形如 "5.7.99 Doris-4.1.0"——前半段是兼容性表达式,
// 后半段才是真正的 Doris 版本号。这里直接返回原样字符串,由调用方按需展示。
// Doris 的 @@version 形如 "5.7.99 Doris-4.1.0"——前半段是 MySQL 协议
// 兼容性表达式,"Doris-" 之后才是真实版本号。统一只返回 "4.1.0" 这类
// 裸版本号,与 Postgres/ES 路径的格式保持一致。
func testDorisConnection(ctx context.Context, config types.ConnectionConfig) (string, error) {
testCtx, cancel := context.WithTimeout(ctx, connectionTestTimeout)
defer cancel()
@@ -249,9 +251,16 @@ func testDorisConnection(ctx context.Context, config types.ConnectionConfig) (st
database = "information_schema"
}
dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?timeout=5s",
config.Username, config.Password, config.Addr, database)
db, err := sql.Open("mysql", dsn)
// 用 mysql.Config.FormatDSN() 构造 DSN避免用户名/密码中 `@` `:` `/`
// 等特殊字符破坏字面量拼接fmt.Sprintf 会跑偏,参考 issue #1234 类问题)。
cfg := mysql.NewConfig()
cfg.User = config.Username
cfg.Passwd = config.Password
cfg.Net = "tcp"
cfg.Addr = config.Addr
cfg.DBName = database
cfg.Timeout = 5 * time.Second
db, err := sql.Open("mysql", cfg.FormatDSN())
if err != nil {
return "", errors.NewBadRequestError("failed to create doris connection: invalid configuration")
}
@@ -267,5 +276,8 @@ func testDorisConnection(ctx context.Context, config types.ConnectionConfig) (st
logger.Warnf(ctx, "Doris version detection failed: %v", err)
return "", nil
}
if i := strings.Index(version, "Doris-"); i >= 0 {
return strings.TrimSpace(version[i+len("Doris-"):]), nil
}
return version, nil
}

View File

@@ -10,7 +10,7 @@ import (
esv7 "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v8"
_ "github.com/go-sql-driver/mysql" // 通过 database/sql 注册 mysql 驱动给 Doris 使用
"github.com/go-sql-driver/mysql" // 通过 database/sql 注册 mysql 驱动给 Doris 使用
"github.com/milvus-io/milvus/client/v2/milvusclient"
"github.com/qdrant/go-client/qdrant"
"github.com/weaviate/weaviate-go-client/v5/weaviate"
@@ -228,9 +228,16 @@ func createDorisEngine(store types.VectorStore) (interfaces.RetrieveEngineServic
return nil, fmt.Errorf("doris connection requires database")
}
dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=Local",
cc.Username, cc.Password, cc.Addr, cc.Database)
db, err := sql.Open("mysql", dsn)
mc := mysql.NewConfig()
mc.User = cc.Username
mc.Passwd = cc.Password
mc.Net = "tcp"
mc.Addr = cc.Addr
mc.DBName = cc.Database
mc.Params = map[string]string{"charset": "utf8mb4"}
mc.ParseTime = true
mc.Loc = time.Local
db, err := sql.Open("mysql", mc.FormatDSN())
if err != nil {
return nil, fmt.Errorf("create doris client: %w", err)
}

View File

@@ -1,73 +0,0 @@
#!/usr/bin/env bash
# Doris 4.1 端到端联调脚本
#
# 该脚本验证:
# 1) 后端启动正常
# 2) 通过 API 创建 Doris VectorStore
# 3) 上传知识、写入索引
# 4) 向量检索 + 关键词检索
# 5) 关闭单个 chunk 后再检索(验证 BatchUpdateChunkEnabledStatus 走 Stream Load
#
# 使用前置条件:
# - docker compose --profile doris up -d 先把 Doris 起来
# - 在 FE 上 CREATE DATABASE weknora;
# - export RETRIEVE_DRIVER=doris && make run (或 docker compose up app
#
# 这是"checklist-as-script",按需要手动逐段执行。
set -euo pipefail
WEKNORA_API="${WEKNORA_API:-http://localhost:8080}"
DORIS_FE_HOST="${DORIS_FE_HOST:-127.0.0.1}"
DORIS_FE_HTTP_PORT="${DORIS_FE_HTTP_PORT:-8030}"
DORIS_FE_MYSQL_PORT="${DORIS_FE_MYSQL_PORT:-9030}"
step() { printf '\n\033[1;36m==> %s\033[0m\n' "$*"; }
step "1. 检查 Doris FE 端口可达"
nc -zv "$DORIS_FE_HOST" "$DORIS_FE_HTTP_PORT"
nc -zv "$DORIS_FE_HOST" "$DORIS_FE_MYSQL_PORT"
step "2. 在 FE 上确认 weknora 库存在"
docker exec WeKnora-doris-fe \
mysql -h 127.0.0.1 -P 9030 -uroot \
-e "CREATE DATABASE IF NOT EXISTS weknora; SHOW DATABASES;"
step "3. 通过 WeKnora API 创建 Doris VectorStore"
curl -fsS -X POST "$WEKNORA_API/api/v1/vector-stores" \
-H 'Content-Type: application/json' \
-d '{
"name": "doris-local",
"engine_type": "doris",
"connection_config": {
"addr": "doris-fe:9030",
"http_port": 8030,
"database": "weknora",
"username": "root",
"password": ""
},
"index_config": {
"collection_prefix": "weknora_embeddings",
"buckets_num": 5,
"replication_num": 1
}
}'
step "4. 上传一个简单知识库(这一步交给前端 UI 完成更省事)"
echo "在前端 UI 切到刚才创建的 Doris VectorStore新建知识库并上传一篇 PDF。"
echo "或者用 curl + /api/v1/knowledges 调用 API。"
step "5. 在 FE 上验证表已建好"
docker exec WeKnora-doris-fe \
mysql -h 127.0.0.1 -P 9030 -uroot \
-e "USE weknora; SHOW TABLES LIKE 'weknora_embeddings_%'; SHOW INDEX FROM weknora_embeddings_768;"
step "6. 检索验证"
echo "在前端发起检索(向量 + 关键词),确认有命中。"
echo "命中后到 FE 上查 SELECT COUNT(*) FROM weknora_embeddings_<dim>;"
step "7. 状态批改验证"
echo "在前端把某个 chunk 关闭,再次发起检索,确认该 chunk 不再返回。"
echo "FE 上确认 SELECT id, is_enabled FROM weknora_embeddings_<dim> WHERE chunk_id = '<id>';"
echo
echo "全部步骤完成 → 联调通过"