From b3dbf86bf5643433d7016ef33819d0ecd5665b37 Mon Sep 17 00:00:00 2001 From: Li Xianggang Date: Mon, 18 May 2026 18:54:57 +0800 Subject: [PATCH] =?UTF-8?q?feat(obs):=20=E6=94=AF=E6=8C=81=E5=8D=8E?= =?UTF-8?q?=E4=B8=BA=E4=BA=91obs=E5=AD=98=E5=82=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .env.example | 23 ++ client/system.go | 1 + docker-compose.yml | 14 + frontend/src/api/initialization/index.ts | 2 +- frontend/src/api/system/index.ts | 13 +- frontend/src/i18n/locales/en-US.ts | 10 + frontend/src/i18n/locales/ko-KR.ts | 10 + frontend/src/i18n/locales/ru-RU.ts | 10 + frontend/src/i18n/locales/zh-CN.ts | 10 + frontend/src/utils/security.ts | 10 +- .../knowledge/settings/KBStorageSettings.vue | 10 +- .../views/settings/StorageEngineSettings.vue | 120 ++++++++ internal/application/service/file/factory.go | 19 ++ internal/application/service/file/obs.go | 266 ++++++++++++++++++ .../application/service/image_multimodal.go | 6 +- internal/container/container.go | 20 ++ internal/handler/storage_allowlist.go | 2 +- internal/handler/system.go | 51 +++- .../docparser/image_resolver.go | 2 +- internal/types/knowledgebase.go | 27 +- internal/types/tenant.go | 30 +- 21 files changed, 626 insertions(+), 30 deletions(-) create mode 100644 internal/application/service/file/obs.go diff --git a/.env.example b/.env.example index 0f76bfda..75c12fcb 100644 --- a/.env.example +++ b/.env.example @@ -368,6 +368,29 @@ COS_ENABLE_OLD_DOMAIN=true # AWS S3可选路径前缀(可选) # S3_PATH_PREFIX=your_s3_path_prefix +# 如果使用华为云OBS作为文件存储,需要配置以下参数 +# 华为云OBS的访问端点,例如 obs.cn-north-4.myhuaweicloud.com +# OBS_ENDPOINT=obs.cn-north-4.myhuaweicloud.com + +# 华为云OBS的区域,默认 cn-north-4 +# OBS_REGION=cn-north-4 + +# 华为云OBS访问密钥 Access Key +# OBS_ACCESS_KEY=your_obs_access_key + +# 华为云OBS访问密钥 Secret Key +# OBS_SECRET_KEY=your_obs_secret_key + +# 华为云OBS桶名称 +# OBS_BUCKET_NAME=your_obs_bucket_name + +# 华为云OBS可选路径前缀(可选) +# OBS_PATH_PREFIX=weknora/ + + +# 华云OBS代理域名(可选,优先级最高) +# OBS_PROXY_DOMAIN=https://your-domain.com/obs + # 如果解析网络连接使用Web代理,需要配置以下参数 # WEB_PROXY=your_web_proxy diff --git a/client/system.go b/client/system.go index 2f8ce328..44cd25e1 100644 --- a/client/system.go +++ b/client/system.go @@ -49,6 +49,7 @@ type StorageCheckRequest struct { COS json.RawMessage `json:"cos,omitempty"` TOS json.RawMessage `json:"tos,omitempty"` S3 json.RawMessage `json:"s3,omitempty"` + OBS json.RawMessage `json:"obs,omitempty"` } // StorageCheckResponse is the response for storage engine check diff --git a/docker-compose.yml b/docker-compose.yml index 745e481e..75a8e5b6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -127,6 +127,13 @@ services: - MINIO_ACCESS_KEY_ID=${MINIO_ACCESS_KEY_ID:-minioadmin} - MINIO_SECRET_ACCESS_KEY=${MINIO_SECRET_ACCESS_KEY:-minioadmin} - MINIO_BUCKET_NAME=${MINIO_BUCKET_NAME:-} + - OBS_ENDPOINT=${OBS_ENDPOINT:-} + - OBS_REGION=${OBS_REGION:-} + - OBS_ACCESS_KEY=${OBS_ACCESS_KEY:-} + - OBS_SECRET_KEY=${OBS_SECRET_KEY:-} + - OBS_BUCKET_NAME=${OBS_BUCKET_NAME:-} + - OBS_PATH_PREFIX=${OBS_PATH_PREFIX:-} + - OBS_PROXY_DOMAIN=${OBS_PROXY_DOMAIN:-} - OLLAMA_BASE_URL=${OLLAMA_BASE_URL:-http://host.docker.internal:11434} - STREAM_MANAGER_TYPE=${STREAM_MANAGER_TYPE:-} - REDIS_ADDR=redis:6379 @@ -217,6 +224,13 @@ services: - GRPC_TLS_KEY=${GRPC_TLS_KEY:-} - GRPC_TLS_CA=${GRPC_TLS_CA:-} - GRPC_AUTH_TOKEN=${GRPC_AUTH_TOKEN:-} + - OBS_ENDPOINT=${OBS_ENDPOINT:-} + - OBS_REGION=${OBS_REGION:-} + - OBS_ACCESS_KEY=${OBS_ACCESS_KEY:-} + - OBS_SECRET_KEY=${OBS_SECRET_KEY:-} + - OBS_BUCKET_NAME=${OBS_BUCKET_NAME:-} + - OBS_PATH_PREFIX=${OBS_PATH_PREFIX:-} + - OBS_PROXY_DOMAIN=${OBS_PROXY_DOMAIN:-} healthcheck: test: ["CMD", "grpc_health_probe", "-addr=localhost:50051"] interval: 30s diff --git a/frontend/src/api/initialization/index.ts b/frontend/src/api/initialization/index.ts index 851cd586..71e8ec56 100644 --- a/frontend/src/api/initialization/index.ts +++ b/frontend/src/api/initialization/index.ts @@ -117,7 +117,7 @@ export interface KBModelConfigRequest { multimodal: { enabled: boolean } - /** 存储引擎选择:"local" | "minio" | "cos",影响文档上传与文档内图片存储 */ + /** 存储引擎选择:"local" | "minio" | "cos" | "obs" 等,影响文档上传与文档内图片存储 */ storageProvider?: string nodeExtract: { enabled: boolean diff --git a/frontend/src/api/system/index.ts b/frontend/src/api/system/index.ts index d9a3b863..e1c80de4 100644 --- a/frontend/src/api/system/index.ts +++ b/frontend/src/api/system/index.ts @@ -118,7 +118,7 @@ export function reconnectDocReader(addr: string): Promise]*?)\ssrc=(["'])(local|minio|cos|tos|s3|oss|ks3):(?:\/\/|//|//)([^"']+)\2([^>]*)>/gi, + /]*?)\ssrc=(["'])(local|minio|cos|tos|s3|oss|ks3|obs):(?:\/\/|//|//)([^"']+)\2([^>]*)>/gi, (_m, before, quote, provider, restPathRaw, after) => { const restPath = decodeProviderURL(restPathRaw); const protectedSrc = `${provider}://${restPath}`; @@ -178,7 +178,7 @@ export function isValidURL(url: string): boolean { } // 允许 provider:// 形式,由前端后续鉴权拉取并替换为 blob URL - if (/^(local|minio|cos|tos|s3|oss|ks3):\/\/\S+$/i.test(trimmed)) { + if (/^(local|minio|cos|tos|s3|oss|ks3|obs):\/\/\S+$/i.test(trimmed)) { return true; } @@ -301,7 +301,7 @@ export async function hydrateProtectedFileImages(root: ParentNode | null | undef } const images = root.querySelectorAll( - 'img[data-protected-src], img[src^="local://"], img[src^="minio://"], img[src^="cos://"], img[src^="tos://"], img[src^="s3://"], img[src^="oss://"], img[src^="ks3://"]', + 'img[data-protected-src], img[src^="local://"], img[src^="minio://"], img[src^="cos://"], img[src^="tos://"], img[src^="s3://"], img[src^="oss://"], img[src^="ks3://"], img[src^="obs://"]', ); if (!images.length) { return; @@ -321,7 +321,7 @@ export async function hydrateProtectedFileImages(root: ParentNode | null | undef } img.dataset.authHydrated = '1'; - const isProviderScheme = /^(local|minio|cos|tos|s3|oss|ks3):\/\//.test(sourceURL); + const isProviderScheme = /^(local|minio|cos|tos|s3|oss|ks3|obs):\/\//.test(sourceURL); const requestURL = isProviderScheme ? `/files?${new URLSearchParams({ file_path: sourceURL }).toString()}` : sourceURL; diff --git a/frontend/src/views/knowledge/settings/KBStorageSettings.vue b/frontend/src/views/knowledge/settings/KBStorageSettings.vue index be5b13fb..406b926a 100644 --- a/frontend/src/views/knowledge/settings/KBStorageSettings.vue +++ b/frontend/src/views/knowledge/settings/KBStorageSettings.vue @@ -139,6 +139,14 @@ const engineOptions = computed(() => { available: statusMap.ks3, disabled: allowedMap.ks3 === false || statusMap.ks3 === false, }, + { + value: 'obs', + label: t('kbSettings.storage.engineObs'), + desc: t('kbSettings.storage.engineObsDesc'), + allowed: allowedMap.obs !== false, + available: statusMap.obs, + disabled: allowedMap.obs === false || statusMap.obs === false, + }, ] }) @@ -179,7 +187,7 @@ async function load() { allowedProviders.value = statusRes?.data?.allowed_providers ?? [] defaultProvider.value = configRes?.data?.default_provider || 'local' const d = configRes?.data - hasAnyConfig.value = !!(d?.local?.path_prefix || d?.minio?.bucket_name || d?.cos?.bucket_name || d?.tos?.bucket_name || d?.s3?.bucket_name || d?.oss?.bucket_name || d?.ks3?.bucket_name) + hasAnyConfig.value = !!(d?.local?.path_prefix || d?.minio?.bucket_name || d?.cos?.bucket_name || d?.tos?.bucket_name || d?.s3?.bucket_name || d?.oss?.bucket_name || d?.ks3?.bucket_name || d?.obs?.bucket_name) if (!localProvider.value || localProvider.value === '') { localProvider.value = defaultProvider.value emit('update:storageProvider', localProvider.value) diff --git a/frontend/src/views/settings/StorageEngineSettings.vue b/frontend/src/views/settings/StorageEngineSettings.vue index de3aa17c..06bde059 100644 --- a/frontend/src/views/settings/StorageEngineSettings.vue +++ b/frontend/src/views/settings/StorageEngineSettings.vue @@ -135,6 +135,18 @@

{{ $t('settings.storage.ks3Desc') }}

+ +
+
+

{{ $t('settings.storage.obsTitle') }}

+ {{ $t('settings.storage.configurable') }} +
+

{{ $t('settings.storage.obsDesc') }}

+
@@ -449,6 +461,67 @@ + +
@@ -513,6 +586,14 @@ const defaultConfig = (): StorageEngineConfig => ({ bucket_name: '', path_prefix: '', }, + obs: { + endpoint: '', + region: '', + access_key: '', + secret_key: '', + bucket_name: '', + path_prefix: '', + }, }) const loading = ref(true) @@ -537,6 +618,8 @@ const checkingOss = ref(false) const ossCheckResult = ref<{ ok: boolean; message: string } | null>(null) const checkingKs3 = ref(false) const ks3CheckResult = ref<{ ok: boolean; message: string } | null>(null) +const checkingObs = ref(false) +const obsCheckResult = ref<{ ok: boolean; message: string } | null>(null) const drawerVisible = ref(false) const currentEngine = ref(null) @@ -549,6 +632,7 @@ const providerOptions = computed(() => [ { value: 's3', label: 'AWS S3', allowed: isProviderAllowed('s3') }, { value: 'oss', label: t('settings.storage.engineOss'), allowed: isProviderAllowed('oss') }, { value: 'ks3', label: t('settings.storage.engineKs3'), allowed: isProviderAllowed('ks3') }, + { value: 'obs', label: t('settings.storage.engineObs'), allowed: isProviderAllowed('obs') }, ]) const hasAllowedProviders = computed(() => (allowedProviders.value?.length ?? 0) > 0) @@ -567,6 +651,8 @@ const currentCheckState = computed(() => { return { loading: checkingOss.value, result: ossCheckResult.value, onCheck: onCheckOss } case 'ks3': return { loading: checkingKs3.value, result: ks3CheckResult.value, onCheck: onCheckKs3 } + case 'obs': + return { loading: checkingObs.value, result: obsCheckResult.value, onCheck: onCheckObs } default: return { loading: false, result: null, onCheck: () => undefined } } @@ -582,6 +668,7 @@ const drawerTitle = computed(() => { s3: t('settings.storage.s3Title'), oss: t('settings.storage.ossTitle'), ks3: t('settings.storage.ks3Title'), + obs: t('settings.storage.obsTitle'), } return titles[currentEngine.value] || currentEngine.value }) @@ -614,6 +701,7 @@ function openDrawer(engine: string) { s3CheckResult.value = null ossCheckResult.value = null ks3CheckResult.value = null + obsCheckResult.value = null } async function loadConfig() { @@ -688,6 +776,16 @@ async function loadConfig() { path_prefix: d.ks3.path_prefix || '', } : defaultConfig().ks3, + obs: d.obs + ? { + endpoint: d.obs.endpoint || '', + region: d.obs.region || '', + access_key: d.obs.access_key || '', + secret_key: d.obs.secret_key || '', + bucket_name: d.obs.bucket_name || '', + path_prefix: d.obs.path_prefix || '', + } + : defaultConfig().obs, } } } catch { @@ -787,6 +885,14 @@ function buildPayload(): StorageEngineConfig { bucket_name: (config.value.ks3?.bucket_name || '').trim(), path_prefix: (config.value.ks3?.path_prefix || '').trim(), }, + obs: { + endpoint: (config.value.obs?.endpoint || '').trim(), + region: (config.value.obs?.region || '').trim(), + access_key: (config.value.obs?.access_key || '').trim(), + secret_key: (config.value.obs?.secret_key || '').trim(), + bucket_name: (config.value.obs?.bucket_name || '').trim(), + path_prefix: (config.value.obs?.path_prefix || '').trim(), + }, } } @@ -897,6 +1003,20 @@ async function onCheckKs3() { } } +async function onCheckObs() { + checkingObs.value = true + obsCheckResult.value = null + try { + const payload = buildPayload() + const res = await checkStorageEngine({ provider: 'obs', obs: payload.obs }) + obsCheckResult.value = res?.data ?? { ok: false, message: t('settings.storage.unknownError') } + } catch (e: unknown) { + obsCheckResult.value = { ok: false, message: e instanceof Error ? e.message : t('settings.storage.requestFailed') } + } finally { + checkingObs.value = false + } +} + onMounted(loadAll) diff --git a/internal/application/service/file/factory.go b/internal/application/service/file/factory.go index 1ca115ba..adcd73ee 100644 --- a/internal/application/service/file/factory.go +++ b/internal/application/service/file/factory.go @@ -102,6 +102,25 @@ func NewFileServiceFromStorageConfig( svc, err := NewS3FileService(sec.S3.Endpoint, sec.S3.AccessKey, sec.S3.SecretKey, sec.S3.BucketName, sec.S3.Region, pathPrefix) return svc, p, err + case "obs": + obsEndpoint := strings.TrimSpace(os.Getenv("OBS_ENDPOINT")) + obsRegion := strings.TrimSpace(os.Getenv("OBS_REGION")) + obsAccessKey := strings.TrimSpace(os.Getenv("OBS_ACCESS_KEY")) + obsSecretKey := strings.TrimSpace(os.Getenv("OBS_SECRET_KEY")) + obsBucketName := strings.TrimSpace(os.Getenv("OBS_BUCKET_NAME")) + obsPathPrefix := strings.TrimSpace(os.Getenv("OBS_PATH_PREFIX")) + if obsPathPrefix == "" { + obsPathPrefix = "weknora/" + } + if obsEndpoint == "" || obsAccessKey == "" || obsSecretKey == "" || obsBucketName == "" { + return nil, p, fmt.Errorf("incomplete obs config") + } + if obsRegion == "" { + obsRegion = "cn-north-4" + } + svc, err := NewObsFileService(obsEndpoint, obsRegion, obsAccessKey, obsSecretKey, obsBucketName, obsPathPrefix) + return svc, p, err + case "oss": if sec == nil || sec.OSS == nil || sec.OSS.Endpoint == "" || sec.OSS.Region == "" || sec.OSS.AccessKey == "" || sec.OSS.SecretKey == "" || sec.OSS.BucketName == "" { return nil, p, fmt.Errorf("incomplete oss config") diff --git a/internal/application/service/file/obs.go b/internal/application/service/file/obs.go new file mode 100644 index 00000000..735606dd --- /dev/null +++ b/internal/application/service/file/obs.go @@ -0,0 +1,266 @@ +package file + +import ( + "context" + "fmt" + "io" + "mime/multipart" + "os" + "path/filepath" + "strings" + "time" + + "github.com/Tencent/WeKnora/internal/types/interfaces" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/google/uuid" +) + +type obsFileService struct { + client *s3.Client + bucketName string + endpoint string + region string + pathPrefix string + proxyDomain string +} + +type obsEndpointResolver struct { + url string +} + +func (r *obsEndpointResolver) ResolveEndpoint(region string, options s3.EndpointResolverOptions) (aws.Endpoint, error) { + return aws.Endpoint{ + URL: r.url, + SigningRegion: region, + HostnameImmutable: true, + }, nil +} + +func NewObsFileService( + endpoint, region, accessKeyID, secretAccessKey, bucketName string, + pathPrefix string, +) (interfaces.FileService, error) { + + client := s3.New(s3.Options{ + Region: region, + EndpointResolver: &obsEndpointResolver{url: endpoint}, + Credentials: credentials.NewStaticCredentialsProvider(accessKeyID, secretAccessKey, ""), + UsePathStyle: true, + }) + + _, err := client.HeadBucket(context.Background(), &s3.HeadBucketInput{ + Bucket: aws.String(bucketName), + }) + if err != nil { + _, createErr := client.CreateBucket(context.Background(), &s3.CreateBucketInput{ + Bucket: aws.String(bucketName), + }) + if createErr != nil { + fmt.Printf("Warning: bucket %s may not exist or cannot be created: %v\n", bucketName, createErr) + } + } + + proxyDomain := strings.TrimSuffix(os.Getenv("OBS_PROXY_DOMAIN"), "/") + + return &obsFileService{ + client: client, + bucketName: bucketName, + endpoint: endpoint, + region: region, + pathPrefix: strings.Trim(pathPrefix, "/"), + proxyDomain: proxyDomain, + }, nil +} + +func CheckObsConnectivity(ctx context.Context, endpoint, region, accessKey, secretKey, bucketName string) error { + checkCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + client := s3.New(s3.Options{ + Region: region, + EndpointResolver: &obsEndpointResolver{url: endpoint}, + Credentials: credentials.NewStaticCredentialsProvider(accessKey, secretKey, ""), + UsePathStyle: true, + }) + + _, err := client.HeadBucket(checkCtx, &s3.HeadBucketInput{ + Bucket: aws.String(bucketName), + }) + if err != nil { + return fmt.Errorf("OBS connectivity check failed: %w", err) + } + return nil +} + +func (s *obsFileService) CheckConnectivity(ctx context.Context) error { + checkCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + _, err := s.client.HeadBucket(checkCtx, &s3.HeadBucketInput{ + Bucket: aws.String(s.bucketName), + }) + if err != nil { + return fmt.Errorf("OBS connectivity check failed: %w", err) + } + return nil +} + +func (s *obsFileService) parseObsFilePath(filePath string) (string, error) { + prefix := s.getPrifix() + + if strings.HasPrefix(filePath, prefix) { + rest := strings.TrimPrefix(filePath, prefix) + // With proxy domain: path is {prefix}/{objectKey} (no bucket name) + if s.proxyDomain != "" { + rest = strings.TrimPrefix(rest, "/") + if rest != "" { + return rest, nil + } + return "", fmt.Errorf("invalid OBS file path: %s", filePath) + } + // Without proxy domain: path is {prefix}/{bucketName}/{objectKey} + parts := strings.SplitN(rest, "/", 2) + if len(parts) == 2 && parts[0] == s.bucketName && parts[1] != "" { + return parts[1], nil + } + return "", fmt.Errorf("invalid OBS file path: %s", filePath) + } + return filePath, nil +} + +func (s *obsFileService) getPrifix() string { + if s.proxyDomain != "" { + return s.proxyDomain + "/" + } + return "obs://" +} + +func (s *obsFileService) SaveFile(ctx context.Context, + file *multipart.FileHeader, tenantID uint64, knowledgeID string, +) (string, error) { + ext := filepath.Ext(file.Filename) + + var objectKey string + if s.pathPrefix != "" { + objectKey = fmt.Sprintf("%s/%d/%s/%s%s", s.pathPrefix, tenantID, knowledgeID, uuid.New().String(), ext) + } else { + objectKey = fmt.Sprintf("%d/%s/%s%s", tenantID, knowledgeID, uuid.New().String(), ext) + } + + src, err := file.Open() + if err != nil { + return "", fmt.Errorf("failed to open file: %w", err) + } + defer src.Close() + + contentType := file.Header.Get("Content-Type") + if contentType == "" { + contentType = "application/octet-stream" + } + + _, err = s.client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(s.bucketName), + Key: aws.String(objectKey), + Body: src, + ContentLength: aws.Int64(file.Size), + ContentType: aws.String(contentType), + // ACL: "private", + }) + if err != nil { + return "", fmt.Errorf("failed to upload file to OBS: %w", err) + } + prefix := s.getPrifix() + if s.proxyDomain != "" { + return fmt.Sprintf("%s%s", prefix, objectKey), nil + } + return fmt.Sprintf("%s%s/%s", prefix, s.bucketName, objectKey), nil +} + +func (s *obsFileService) GetFile(ctx context.Context, filePath string) (io.ReadCloser, error) { + objectKey, err := s.parseObsFilePath(filePath) + if err != nil { + return nil, err + } + + output, err := s.client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(s.bucketName), + Key: aws.String(objectKey), + }) + if err != nil { + return nil, fmt.Errorf("failed to get file from OBS: %w", err) + } + + return output.Body, nil +} + +func (s *obsFileService) DeleteFile(ctx context.Context, filePath string) error { + objectKey, err := s.parseObsFilePath(filePath) + if err != nil { + return err + } + + _, err = s.client.DeleteObject(ctx, &s3.DeleteObjectInput{ + Bucket: aws.String(s.bucketName), + Key: aws.String(objectKey), + }) + if err != nil { + return fmt.Errorf("failed to delete file from OBS: %w", err) + } + + return nil +} + +func (s *obsFileService) GetFileURL(ctx context.Context, filePath string) (string, error) { + if strings.HasPrefix(filePath, "http://") || strings.HasPrefix(filePath, "https://") { + return filePath, nil + } + + objectKey, err := s.parseObsFilePath(filePath) + if err != nil { + return "", err + } + + if s.proxyDomain != "" { + return s.proxyDomain + "/" + strings.TrimPrefix(objectKey, "/"), nil + } + + return fmt.Sprintf("%s/%s/%s", s.endpoint, s.bucketName, strings.TrimPrefix(objectKey, "/")), nil +} + +func (s *obsFileService) SaveBytes(ctx context.Context, data []byte, tenantID uint64, fileName string, temp bool) (string, error) { + ext := filepath.Ext(fileName) + + var objectKey string + if temp { + if s.pathPrefix != "" { + objectKey = fmt.Sprintf("%s/temp/%d/%s%s", s.pathPrefix, tenantID, uuid.New().String(), ext) + } else { + objectKey = fmt.Sprintf("temp/%d/%s%s", tenantID, uuid.New().String(), ext) + } + } else { + if s.pathPrefix != "" { + objectKey = fmt.Sprintf("%s/%d/%s%s", s.pathPrefix, tenantID, uuid.New().String(), ext) + } else { + objectKey = fmt.Sprintf("%d/%s%s", tenantID, uuid.New().String(), ext) + } + } + + _, err := s.client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(s.bucketName), + Key: aws.String(objectKey), + Body: strings.NewReader(string(data)), + ContentType: aws.String("application/octet-stream"), + ACL: "public-read", + }) + if err != nil { + return "", fmt.Errorf("failed to upload bytes to OBS: %w", err) + } + + prefix := s.getPrifix() + if s.proxyDomain != "" { + return fmt.Sprintf("%s%s", prefix, objectKey), nil + } + return fmt.Sprintf("%s%s/%s", prefix, s.bucketName, objectKey), nil +} diff --git a/internal/application/service/image_multimodal.go b/internal/application/service/image_multimodal.go index 1b8e1537..6b97a0d1 100644 --- a/internal/application/service/image_multimodal.go +++ b/internal/application/service/image_multimodal.go @@ -143,7 +143,7 @@ func (s *ImageMultimodalService) Handle(ctx context.Context, task *asynq.Task) e prompt = vlmOCRScannedPDFPrompt logger.Infof(ctx, "[ImageMultimodal] Using scanned PDF prompt for OCR: %s", payload.ImageURL) } - + ocrText, ocrErr := vlmModel.Predict(ctx, [][]byte{imgBytes}, prompt) if ocrErr != nil { logger.Warnf(ctx, "[ImageMultimodal] OCR failed for %s: %v", payload.ImageURL, ocrErr) @@ -430,7 +430,7 @@ func (s *ImageMultimodalService) checkAndFinalizeAllImages(ctx context.Context, } redisKey := fmt.Sprintf("multimodal:pending:%s", payload.KnowledgeID) - + pendingCount, err := s.redisClient.Decr(ctx, redisKey).Result() if err != nil && err != redis.Nil { logger.Warnf(ctx, "[ImageMultimodal] Failed to decrement pending count for %s: %v", payload.KnowledgeID, err) @@ -449,7 +449,7 @@ func (s *ImageMultimodalService) enqueueKnowledgePostProcessTask(ctx context.Con if s.taskEnqueuer == nil { return } - + taskPayload := types.KnowledgePostProcessPayload{ TenantID: payload.TenantID, KnowledgeID: payload.KnowledgeID, diff --git a/internal/container/container.go b/internal/container/container.go index 78be5f90..112b8bdb 100644 --- a/internal/container/container.go +++ b/internal/container/container.go @@ -746,6 +746,26 @@ func initFileService(cfg *config.Config) (interfaces.FileService, error) { os.Getenv("S3_REGION"), pathPrefix, ) + case "obs": + if os.Getenv("OBS_ENDPOINT") == "" || + os.Getenv("OBS_ACCESS_KEY") == "" || + os.Getenv("OBS_SECRET_KEY") == "" || + os.Getenv("OBS_BUCKET_NAME") == "" { + return nil, fmt.Errorf("missing OBS configuration") + } + obsRegion := os.Getenv("OBS_REGION") + obsPathPrefix := os.Getenv("OBS_PATH_PREFIX") + if obsPathPrefix == "" { + obsPathPrefix = "weknora/" + } + return file.NewObsFileService( + os.Getenv("OBS_ENDPOINT"), + obsRegion, + os.Getenv("OBS_ACCESS_KEY"), + os.Getenv("OBS_SECRET_KEY"), + os.Getenv("OBS_BUCKET_NAME"), + obsPathPrefix, + ) case "oss": if os.Getenv("OSS_ENDPOINT") == "" || os.Getenv("OSS_REGION") == "" || diff --git a/internal/handler/storage_allowlist.go b/internal/handler/storage_allowlist.go index b4d583e5..d041896c 100644 --- a/internal/handler/storage_allowlist.go +++ b/internal/handler/storage_allowlist.go @@ -7,7 +7,7 @@ import ( const storageAllowListEnv = "STORAGE_ALLOW_LIST" -var supportedStorageProviders = []string{"local", "minio", "cos", "tos", "s3", "oss", "ks3"} +var supportedStorageProviders = []string{"local", "minio", "cos", "tos", "s3", "oss", "ks3", "obs"} func getSupportedStorageProviders() []string { providers := make([]string, len(supportedStorageProviders)) diff --git a/internal/handler/system.go b/internal/handler/system.go index f12f41af..8fd07265 100644 --- a/internal/handler/system.go +++ b/internal/handler/system.go @@ -614,13 +614,14 @@ func isBlockedStorageEndpoint(endpoint string) (bool, string) { // StorageCheckRequest is the body for POST /system/storage-engine-check. type StorageCheckRequest struct { - Provider string `json:"provider"` // "minio", "cos", "tos", "s3", "oss", "ks3" + Provider string `json:"provider"` // "minio", "cos", "tos", "s3", "oss", "ks3", "obs" MinIO *types.MinIOEngineConfig `json:"minio,omitempty"` COS *types.COSEngineConfig `json:"cos,omitempty"` TOS *types.TOSEngineConfig `json:"tos,omitempty"` S3 *types.S3EngineConfig `json:"s3,omitempty"` OSS *types.OSSEngineConfig `json:"oss,omitempty"` KS3 *types.KS3EngineConfig `json:"ks3,omitempty"` + OBS *types.OBSEngineConfig `json:"obs,omitempty"` } // StorageCheckResponse is the response for a single-engine connectivity check. @@ -665,6 +666,8 @@ func (h *SystemHandler) CheckStorageEngine(c *gin.Context) { h.checkOSS(c, ctx, req.OSS) case "ks3": h.checkKS3(c, ctx, req.KS3) + case "obs": + h.checkOBS(c, ctx, req.OBS) default: c.JSON(200, gin.H{"code": 0, "data": StorageCheckResponse{OK: true, Message: "本地存储无需检测"}}) } @@ -934,6 +937,52 @@ func (h *SystemHandler) checkKS3(c *gin.Context, ctx context.Context, cfg *types c.JSON(200, gin.H{"code": 0, "data": StorageCheckResponse{OK: true, Message: fmt.Sprintf("连接成功,Bucket「%s」已确认存在", cfg.BucketName)}}) } +func (h *SystemHandler) checkOBS(c *gin.Context, ctx context.Context, cfg *types.OBSEngineConfig) { + if cfg == nil { + c.JSON(200, gin.H{"code": 0, "data": StorageCheckResponse{OK: false, Message: "未提供 OBS 配置"}}) + return + } + + endpoint, region, accessKey, secretKey := cfg.Endpoint, cfg.Region, cfg.AccessKey, cfg.SecretKey + if endpoint == "" || region == "" || accessKey == "" || secretKey == "" || cfg.BucketName == "" { + c.JSON(200, gin.H{"code": 0, "data": StorageCheckResponse{OK: false, Message: "Endpoint、Region、Access Key、Secret Key、Bucket 名称不能为空"}}) + return + } + + ssrfEndpoint := strings.TrimPrefix(strings.TrimPrefix(endpoint, "https://"), "http://") + if blocked, reason := isBlockedStorageEndpoint(ssrfEndpoint); blocked { + logger.Warnf(ctx, "Storage check: OBS endpoint blocked by SSRF protection, endpoint: %s", endpoint) + c.JSON(200, gin.H{"code": 0, "data": StorageCheckResponse{OK: false, Message: reason}}) + return + } + + if !ossFieldPattern.MatchString(cfg.Region) { + c.JSON(200, gin.H{"code": 0, "data": StorageCheckResponse{OK: false, Message: "Region 格式不正确,仅允许字母、数字、点、连字符"}}) + return + } + if !ossFieldPattern.MatchString(cfg.BucketName) { + c.JSON(200, gin.H{"code": 0, "data": StorageCheckResponse{OK: false, Message: "Bucket 名称格式不正确,仅允许字母、数字、点、连字符"}}) + return + } + + err := file.CheckObsConnectivity(ctx, endpoint, region, accessKey, secretKey, cfg.BucketName) + if err != nil { + logger.Errorf(ctx, "Storage check: OBS connectivity failed, bucket: %s, error: %v", cfg.BucketName, err) + errMsg := err.Error() + if strings.Contains(errMsg, "403") || strings.Contains(errMsg, "AccessDenied") { + c.JSON(200, gin.H{"code": 0, "data": StorageCheckResponse{OK: false, Message: "认证失败,请检查 Access Key / Secret Key 是否正确"}}) + return + } + if strings.Contains(errMsg, "404") || strings.Contains(errMsg, "NoSuchBucket") { + c.JSON(200, gin.H{"code": 0, "data": StorageCheckResponse{OK: false, Message: fmt.Sprintf("Bucket「%s」不存在,请检查名称和 Region", cfg.BucketName)}}) + return + } + c.JSON(200, gin.H{"code": 0, "data": StorageCheckResponse{OK: false, Message: sanitizeStorageCheckError(err)}}) + return + } + c.JSON(200, gin.H{"code": 0, "data": StorageCheckResponse{OK: true, Message: fmt.Sprintf("连接成功,Bucket「%s」已确认存在", cfg.BucketName)}}) +} + func (h *SystemHandler) ResolveDocumentReader(ctx context.Context, addr string) interfaces.DocumentReader { if addr == "" { return h.documentReader diff --git a/internal/infrastructure/docparser/image_resolver.go b/internal/infrastructure/docparser/image_resolver.go index 9c2b35bf..5adce2c6 100644 --- a/internal/infrastructure/docparser/image_resolver.go +++ b/internal/infrastructure/docparser/image_resolver.go @@ -187,7 +187,7 @@ func extFromMime(mime string) string { // isProviderScheme checks if the path uses a provider:// scheme (local://, minio://, cos://, tos://). func isProviderScheme(p string) bool { - for _, prefix := range []string{"local://", "minio://", "cos://", "tos://", "s3://"} { + for _, prefix := range []string{"local://", "minio://", "cos://", "tos://", "s3://", "obs://"} { if strings.HasPrefix(p, prefix) { return true } diff --git a/internal/types/knowledgebase.go b/internal/types/knowledgebase.go index 848aa2fe..4671fb68 100644 --- a/internal/types/knowledgebase.go +++ b/internal/types/knowledgebase.go @@ -199,7 +199,7 @@ func (c ChunkingConfig) ResolveParserEngine(fileType string) string { // StorageProviderConfig stores the KB-level storage provider selection. // Credentials are managed at the tenant level (StorageEngineConfig). type StorageProviderConfig struct { - Provider string `yaml:"provider" json:"provider"` // "local", "minio", "cos", "tos", "s3", "oss", "ks3" + Provider string `yaml:"provider" json:"provider"` // "local", "minio", "cos", "tos", "s3", "oss", "ks3", "obs" } func (c StorageProviderConfig) Value() (driver.Value, error) { @@ -220,13 +220,26 @@ func (c *StorageProviderConfig) Scan(value interface{}) error { // Deprecated: StorageConfig is the legacy COS configuration stored in the cos_config column. // New code should use StorageProviderConfig. Kept for backward compatibility with old data. type StorageConfig struct { - SecretID string `yaml:"secret_id" json:"secret_id"` - SecretKey string `yaml:"secret_key" json:"secret_key"` - Region string `yaml:"region" json:"region"` + // Secret ID (COS) / Access Key ID (S3, MinIO) + SecretID string `yaml:"secret_id" json:"secret_id"` + // Secret Key (COS) / Secret Access Key (S3, MinIO) + SecretKey string `yaml:"secret_key" json:"secret_key"` + // Region + Region string `yaml:"region" json:"region"` + // Bucket Name BucketName string `yaml:"bucket_name" json:"bucket_name"` - AppID string `yaml:"app_id" json:"app_id"` + // App ID (COS specific) + AppID string `yaml:"app_id" json:"app_id"` + // Path Prefix PathPrefix string `yaml:"path_prefix" json:"path_prefix"` - Provider string `yaml:"provider" json:"provider"` + // Provider: "cos", "minio", "s3" + Provider string `yaml:"provider" json:"provider"` + // Endpoint (S3 specific) - e.g., s3.amazonaws.com, oss-cn-hangzhou.aliyuncs.com + Endpoint string `yaml:"endpoint" json:"endpoint,omitempty"` + // UseSSL (S3 specific) - whether to use HTTPS + UseSSL bool `yaml:"use_ssl" json:"use_ssl,omitempty"` + // ForcePathStyle (S3 specific) - whether to use path-style URLs + ForcePathStyle bool `yaml:"force_path_style" json:"force_path_style,omitempty"` } func (c StorageConfig) Value() (driver.Value, error) { @@ -313,7 +326,7 @@ func InferStorageFromFilePath(filePath string) string { // e.g. "minio://bucket/key" → "minio", "local://tenant/file.pdf" → "local" // Returns "" if the path does not use a known provider scheme. func ParseProviderScheme(filePath string) string { - for _, provider := range []string{"local", "minio", "cos", "tos", "s3", "oss", "ks3"} { + for _, provider := range []string{"local", "minio", "cos", "tos", "s3", "oss", "ks3", "obs"} { if strings.HasPrefix(filePath, provider+"://") { return provider } diff --git a/internal/types/tenant.go b/internal/types/tenant.go index fa3fec98..ef68e674 100644 --- a/internal/types/tenant.go +++ b/internal/types/tenant.go @@ -351,10 +351,10 @@ func (c *ParserEngineConfig) Scan(value interface{}) error { return json.Unmarshal(b, c) } -// StorageEngineConfig holds tenant-level storage engine parameters for Local, MinIO, COS, TOS, S3, OSS, and KS3. +// StorageEngineConfig holds tenant-level storage engine parameters for Local, MinIO, COS, TOS, S3, OSS, KS3, and OBS. // Knowledge bases select which provider to use; parameters are read from here. type StorageEngineConfig struct { - DefaultProvider string `json:"default_provider"` // "local", "minio", "cos", "tos", "s3", "oss", "ks3" + DefaultProvider string `json:"default_provider"` // "local", "minio", "cos", "tos", "s3", "oss", "ks3", "obs" Local *LocalEngineConfig `json:"local,omitempty"` MinIO *MinIOEngineConfig `json:"minio,omitempty"` COS *COSEngineConfig `json:"cos,omitempty"` @@ -362,6 +362,7 @@ type StorageEngineConfig struct { S3 *S3EngineConfig `json:"s3,omitempty"` OSS *OSSEngineConfig `json:"oss,omitempty"` KS3 *KS3EngineConfig `json:"ks3,omitempty"` + OBS *OBSEngineConfig `json:"obs,omitempty"` } // LocalEngineConfig is for local file system storage (single-machine deployment only). @@ -403,12 +404,14 @@ type TOSEngineConfig struct { // S3EngineConfig is for AWS S3 and S3-compatible object storage. type S3EngineConfig struct { - Endpoint string `json:"endpoint"` - Region string `json:"region"` - AccessKey string `json:"access_key"` - SecretKey string `json:"secret_key"` - BucketName string `json:"bucket_name"` - PathPrefix string `json:"path_prefix"` + Endpoint string `json:"endpoint"` + Region string `json:"region"` + AccessKey string `json:"access_key"` + SecretKey string `json:"secret_key"` + BucketName string `json:"bucket_name"` + PathPrefix string `json:"path_prefix"` + UseSSL bool `json:"use_ssl"` + ForcePathStyle bool `json:"force_path_style"` } // OSSEngineConfig is for Alibaba Cloud OSS (对象存储服务). @@ -434,6 +437,17 @@ type KS3EngineConfig struct { PathPrefix string `json:"path_prefix"` } +// OBSEngineConfig is for Huawei Cloud OBS (对象存储服务). +type OBSEngineConfig struct { + Endpoint string `json:"endpoint"` + Region string `json:"region"` + AccessKey string `json:"access_key"` + SecretKey string `json:"secret_key"` + BucketName string `json:"bucket_name"` + PathPrefix string `json:"path_prefix"` + UseSSL bool `json:"use_ssl"` +} + // Value implements the driver.Valuer interface for StorageEngineConfig func (c *StorageEngineConfig) Value() (driver.Value, error) { if c == nil {