mirror of
https://github.com/Tencent/WeKnora.git
synced 2026-06-04 13:30:32 +08:00
chore(deps): 更新并新增go.mod依赖项
- 新增aliyun alibabacloud-oss-go-sdk-v2依赖 - 删除部分冗余依赖,调整依赖列表结构 - 添加jackc/pgx/v5作为直接依赖 - 统一和规范间接依赖声明顺序 - 移除go.opentelemetry.io/contrib的版本替换声明
This commit is contained in:
committed by
lyingbug
parent
5082460cc0
commit
a2900d5540
3
go.mod
3
go.mod
@@ -5,6 +5,7 @@ go 1.24.11
|
||||
require (
|
||||
github.com/JohannesKaufmann/html-to-markdown/v2 v2.5.0
|
||||
github.com/PuerkitoBio/goquery v1.10.3
|
||||
github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.4.1
|
||||
github.com/asg017/sqlite-vec-go-bindings v0.1.6
|
||||
github.com/aws/aws-sdk-go-v2 v1.41.3
|
||||
github.com/aws/aws-sdk-go-v2/config v1.29.14
|
||||
@@ -24,6 +25,7 @@ require (
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/gorilla/websocket v1.5.3
|
||||
github.com/hibiken/asynq v0.25.1
|
||||
github.com/jackc/pgx/v5 v5.7.2
|
||||
github.com/joho/godotenv v1.5.1
|
||||
github.com/larksuite/oapi-sdk-go/v3 v3.5.3
|
||||
github.com/longbridgeapp/opencc v0.3.13
|
||||
@@ -190,7 +192,6 @@ require (
|
||||
github.com/invopop/jsonschema v0.13.0 // indirect
|
||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
|
||||
github.com/jackc/pgx/v5 v5.7.2 // indirect
|
||||
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
||||
github.com/jchv/go-winloader v0.0.0-20210711035445-715c2860da7e // indirect
|
||||
github.com/jinzhu/inflection v1.0.0 // indirect
|
||||
|
||||
2
go.sum
2
go.sum
@@ -1377,6 +1377,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
|
||||
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
||||
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
||||
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
|
||||
github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.4.1 h1:wF5rZUhhahzJiRSeLSCQhAkaDBXLa/R893C/ZmEpGcE=
|
||||
github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.4.1/go.mod h1:FTzydeQVmR24FI0D6XWUOMKckjXehM/jgMn1xC+DA9M=
|
||||
github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
|
||||
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
|
||||
github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ=
|
||||
|
||||
@@ -7,25 +7,23 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"mime/multipart"
|
||||
"net/http"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Tencent/WeKnora/internal/types/interfaces"
|
||||
"github.com/Tencent/WeKnora/internal/utils"
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/config"
|
||||
"github.com/aws/aws-sdk-go-v2/credentials"
|
||||
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||||
"github.com/aws/aws-sdk-go-v2/service/s3/types"
|
||||
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
|
||||
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// ossFileService implements the FileService interface for Aliyun OSS
|
||||
// using the S3-compatible protocol with virtual-hosted style addressing.
|
||||
// using the official Aliyun OSS SDK v2 (github.com/aliyun/alibabacloud-oss-go-sdk-v2).
|
||||
type ossFileService struct {
|
||||
client *s3.Client
|
||||
tempClient *s3.Client
|
||||
client *oss.Client
|
||||
tempClient *oss.Client
|
||||
pathPrefix string
|
||||
bucketName string
|
||||
tempBucketName string
|
||||
@@ -33,61 +31,37 @@ type ossFileService struct {
|
||||
|
||||
const ossScheme = "oss://"
|
||||
|
||||
// newOSSClient creates a bare s3.Client configured for OSS S3-compatible mode.
|
||||
// OSS uses virtual-hosted style addressing and does not support aws-chunked encoding.
|
||||
func newOSSClient(endpoint, region, accessKey, secretKey string) (*s3.Client, error) {
|
||||
cfg, err := config.LoadDefaultConfig(context.Background(),
|
||||
config.WithRegion(region),
|
||||
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accessKey, secretKey, "")),
|
||||
// Disable automatic aws-chunked encoding — OSS does not support it.
|
||||
config.WithRequestChecksumCalculation(aws.RequestChecksumCalculationWhenRequired),
|
||||
config.WithResponseChecksumValidation(aws.ResponseChecksumValidationWhenRequired),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to load AWS config: %w", err)
|
||||
}
|
||||
// newOSSClient creates an OSS client using the official Aliyun SDK v2.
|
||||
func newOSSClient(endpoint, region, accessKey, secretKey string) (*oss.Client, error) {
|
||||
creds := credentials.NewStaticCredentialsProvider(accessKey, secretKey, "")
|
||||
|
||||
client := s3.NewFromConfig(cfg, func(o *s3.Options) {
|
||||
o.BaseEndpoint = aws.String(endpoint)
|
||||
o.UsePathStyle = false
|
||||
})
|
||||
cfg := oss.LoadDefaultConfig().
|
||||
WithCredentialsProvider(creds).
|
||||
WithRegion(region).
|
||||
WithEndpoint(endpoint)
|
||||
|
||||
return client, nil
|
||||
}
|
||||
|
||||
// ossBucketExists checks if the bucket exists using the provided client.
|
||||
func ossBucketExists(ctx context.Context, client *s3.Client, bucketName string) (bool, error) {
|
||||
_, err := client.HeadBucket(ctx, &s3.HeadBucketInput{
|
||||
Bucket: aws.String(bucketName),
|
||||
})
|
||||
if err != nil {
|
||||
var notFound *types.NotFound
|
||||
if errors.As(err, ¬Found) {
|
||||
return false, nil
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// ossCreateBucket creates a new bucket using the provided client.
|
||||
func ossCreateBucket(ctx context.Context, client *s3.Client, bucketName string) error {
|
||||
_, err := client.CreateBucket(ctx, &s3.CreateBucketInput{
|
||||
Bucket: aws.String(bucketName),
|
||||
})
|
||||
return err
|
||||
return oss.NewClient(cfg), nil
|
||||
}
|
||||
|
||||
// ossEnsureBucket checks if the bucket exists and creates it if missing.
|
||||
func ossEnsureBucket(ctx context.Context, client *s3.Client, bucketName string) error {
|
||||
exists, err := ossBucketExists(ctx, client, bucketName)
|
||||
func ossEnsureBucket(client *oss.Client, bucketName string) error {
|
||||
exists, err := client.IsBucketExist(context.Background(), bucketName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check bucket: %w", err)
|
||||
return fmt.Errorf("failed to check OSS bucket: %w", err)
|
||||
}
|
||||
if !exists {
|
||||
if err := ossCreateBucket(ctx, client, bucketName); err != nil {
|
||||
return fmt.Errorf("failed to create bucket: %w", err)
|
||||
if exists {
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err = client.PutBucket(context.Background(), &oss.PutBucketRequest{
|
||||
Bucket: oss.Ptr(bucketName),
|
||||
})
|
||||
if err != nil {
|
||||
var svcErr *oss.ServiceError
|
||||
if errors.As(err, &svcErr) && svcErr.StatusCode == http.StatusConflict {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("failed to create OSS bucket: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -105,11 +79,11 @@ func NewOssFileServiceWithTempBucket(endpoint, region, accessKey, secretKey, buc
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := ossEnsureBucket(context.Background(), client, bucketName); err != nil {
|
||||
if err := ossEnsureBucket(client, bucketName); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var tempClient *s3.Client
|
||||
var tempClient *oss.Client
|
||||
if tempBucketName != "" {
|
||||
if tempRegion == "" {
|
||||
tempRegion = region
|
||||
@@ -118,7 +92,7 @@ func NewOssFileServiceWithTempBucket(endpoint, region, accessKey, secretKey, buc
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to initialize OSS temp client: %w", err)
|
||||
}
|
||||
if err := ossEnsureBucket(context.Background(), tempClient, tempBucketName); err != nil {
|
||||
if err := ossEnsureBucket(tempClient, tempBucketName); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
@@ -144,15 +118,12 @@ func CheckOssConnectivity(ctx context.Context, endpoint, region, accessKey, secr
|
||||
return err
|
||||
}
|
||||
|
||||
checkCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
exists, err := ossBucketExists(checkCtx, client, bucketName)
|
||||
exists, err := client.IsBucketExist(ctx, bucketName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check bucket: %w", err)
|
||||
return fmt.Errorf("failed to check OSS bucket: %w", err)
|
||||
}
|
||||
if !exists {
|
||||
return fmt.Errorf("bucket %q does not exist", bucketName)
|
||||
return fmt.Errorf("bucket %q does not exist or is not accessible", bucketName)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -176,9 +147,9 @@ func (s *ossFileService) CheckConnectivity(ctx context.Context) error {
|
||||
checkCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
exists, err := ossBucketExists(checkCtx, s.client, s.bucketName)
|
||||
exists, err := s.client.IsBucketExist(checkCtx, s.bucketName)
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("failed to check OSS bucket: %w", err)
|
||||
}
|
||||
if !exists {
|
||||
return fmt.Errorf("bucket %q does not exist", s.bucketName)
|
||||
@@ -186,7 +157,7 @@ func (s *ossFileService) CheckConnectivity(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// SaveFile saves a file to OSS.
|
||||
// SaveFile saves a file to OSS using the Uploader manager for large files.
|
||||
func (s *ossFileService) SaveFile(ctx context.Context,
|
||||
file *multipart.FileHeader, tenantID uint64, knowledgeID string,
|
||||
) (string, error) {
|
||||
@@ -204,14 +175,35 @@ func (s *ossFileService) SaveFile(ctx context.Context,
|
||||
contentType = getContentTypeByExt(ext)
|
||||
}
|
||||
|
||||
_, err = s.client.PutObject(ctx, &s3.PutObjectInput{
|
||||
Bucket: aws.String(s.bucketName),
|
||||
Key: aws.String(objectName),
|
||||
Body: src,
|
||||
ContentType: aws.String(contentType),
|
||||
})
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to upload file to OSS: %w", err)
|
||||
// Use Uploader for files > 10MB (auto multipart with concurrent uploads)
|
||||
const multipartThreshold = 10 * 1024 * 1024
|
||||
if file.Size > multipartThreshold {
|
||||
uploader := s.client.NewUploader(func(uo *oss.UploaderOptions) {
|
||||
uo.PartSize = 10 * 1024 * 1024 // 10MB per part
|
||||
uo.ParallelNum = 3 // 3 concurrent uploads
|
||||
})
|
||||
|
||||
_, err = uploader.UploadFrom(ctx,
|
||||
&oss.PutObjectRequest{
|
||||
Bucket: oss.Ptr(s.bucketName),
|
||||
Key: oss.Ptr(objectName),
|
||||
ContentType: oss.Ptr(contentType),
|
||||
},
|
||||
src,
|
||||
)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to upload file to OSS (multipart): %w", err)
|
||||
}
|
||||
} else {
|
||||
_, err = s.client.PutObject(ctx, &oss.PutObjectRequest{
|
||||
Bucket: oss.Ptr(s.bucketName),
|
||||
Key: oss.Ptr(objectName),
|
||||
Body: src,
|
||||
ContentType: oss.Ptr(contentType),
|
||||
})
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to upload file to OSS: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return fmt.Sprintf("oss://%s/%s", s.bucketName, objectName), nil
|
||||
@@ -227,34 +219,27 @@ func (s *ossFileService) SaveBytes(ctx context.Context, data []byte, tenantID ui
|
||||
}
|
||||
ext := filepath.Ext(safeName)
|
||||
|
||||
// If requesting temp bucket and it is configured, use it
|
||||
targetBucket := s.bucketName
|
||||
client := s.client
|
||||
objectName := fmt.Sprintf("%s%d/exports/%s%s", s.pathPrefix, tenantID, uuid.New().String(), ext)
|
||||
|
||||
if temp && s.tempClient != nil {
|
||||
objectName := fmt.Sprintf("exports/%d/%s%s", tenantID, uuid.New().String(), ext)
|
||||
_, err := s.tempClient.PutObject(ctx, &s3.PutObjectInput{
|
||||
Bucket: aws.String(s.tempBucketName),
|
||||
Key: aws.String(objectName),
|
||||
Body: bytes.NewReader(data),
|
||||
ContentType: aws.String("text/csv; charset=utf-8"),
|
||||
})
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to upload bytes to OSS temp bucket: %w", err)
|
||||
}
|
||||
return fmt.Sprintf("oss://%s/%s", s.tempBucketName, objectName), nil
|
||||
targetBucket = s.tempBucketName
|
||||
client = s.tempClient
|
||||
objectName = fmt.Sprintf("exports/%d/%s%s", tenantID, uuid.New().String(), ext)
|
||||
}
|
||||
|
||||
// Save to main bucket
|
||||
objectName := fmt.Sprintf("%s%d/exports/%s%s", s.pathPrefix, tenantID, uuid.New().String(), ext)
|
||||
_, err = s.client.PutObject(ctx, &s3.PutObjectInput{
|
||||
Bucket: aws.String(s.bucketName),
|
||||
Key: aws.String(objectName),
|
||||
_, err = client.PutObject(ctx, &oss.PutObjectRequest{
|
||||
Bucket: oss.Ptr(targetBucket),
|
||||
Key: oss.Ptr(objectName),
|
||||
Body: bytes.NewReader(data),
|
||||
ContentType: aws.String("text/csv; charset=utf-8"),
|
||||
ContentType: oss.Ptr("text/csv; charset=utf-8"),
|
||||
})
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to upload bytes to OSS: %w", err)
|
||||
}
|
||||
|
||||
return fmt.Sprintf("oss://%s/%s", s.bucketName, objectName), nil
|
||||
return fmt.Sprintf("oss://%s/%s", targetBucket, objectName), nil
|
||||
}
|
||||
|
||||
// GetFile retrieves a file from OSS by its path.
|
||||
@@ -267,9 +252,16 @@ func (s *ossFileService) GetFile(ctx context.Context, filePath string) (io.ReadC
|
||||
return nil, fmt.Errorf("invalid file path: %w", err)
|
||||
}
|
||||
|
||||
resp, err := s.client.GetObject(ctx, &s3.GetObjectInput{
|
||||
Bucket: aws.String(bucketName),
|
||||
Key: aws.String(objectName),
|
||||
var client *oss.Client
|
||||
if bucketName == s.tempBucketName && s.tempClient != nil {
|
||||
client = s.tempClient
|
||||
} else {
|
||||
client = s.client
|
||||
}
|
||||
|
||||
resp, err := client.GetObject(ctx, &oss.GetObjectRequest{
|
||||
Bucket: oss.Ptr(bucketName),
|
||||
Key: oss.Ptr(objectName),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get file from OSS: %w", err)
|
||||
@@ -288,9 +280,16 @@ func (s *ossFileService) DeleteFile(ctx context.Context, filePath string) error
|
||||
return fmt.Errorf("invalid file path: %w", err)
|
||||
}
|
||||
|
||||
_, err = s.client.DeleteObject(ctx, &s3.DeleteObjectInput{
|
||||
Bucket: aws.String(bucketName),
|
||||
Key: aws.String(objectName),
|
||||
var client *oss.Client
|
||||
if bucketName == s.tempBucketName && s.tempClient != nil {
|
||||
client = s.tempClient
|
||||
} else {
|
||||
client = s.client
|
||||
}
|
||||
|
||||
_, err = client.DeleteObject(ctx, &oss.DeleteObjectRequest{
|
||||
Bucket: oss.Ptr(bucketName),
|
||||
Key: oss.Ptr(objectName),
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete file from OSS: %w", err)
|
||||
@@ -310,22 +309,21 @@ func (s *ossFileService) GetFileURL(ctx context.Context, filePath string) (strin
|
||||
}
|
||||
|
||||
// Determine which client to use
|
||||
var client *s3.Client
|
||||
var client *oss.Client
|
||||
if bucketName == s.tempBucketName && s.tempClient != nil {
|
||||
client = s.tempClient
|
||||
} else {
|
||||
client = s.client
|
||||
}
|
||||
|
||||
presignClient := s3.NewPresignClient(client)
|
||||
|
||||
presignedReq, err := presignClient.PresignGetObject(ctx, &s3.GetObjectInput{
|
||||
Bucket: aws.String(bucketName),
|
||||
Key: aws.String(objectName),
|
||||
}, s3.WithPresignExpires(24*time.Hour))
|
||||
// Generate presigned URL (valid for 24 hours)
|
||||
result, err := client.Presign(ctx, &oss.GetObjectRequest{
|
||||
Bucket: oss.Ptr(bucketName),
|
||||
Key: oss.Ptr(objectName),
|
||||
}, oss.PresignExpires(24*time.Hour))
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to generate OSS presigned URL: %w", err)
|
||||
}
|
||||
|
||||
return presignedReq.URL, nil
|
||||
return result.URL, nil
|
||||
}
|
||||
|
||||
@@ -163,7 +163,7 @@ func TestCheckOssConnectivity_InvalidEndpoint(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestOssBucketExists_NonExistent(t *testing.T) {
|
||||
func TestOssEnsureBucket_NonExistent(t *testing.T) {
|
||||
client, err := newOSSClient(
|
||||
"https://oss-cn-hangzhou.aliyuncs.com",
|
||||
"cn-hangzhou",
|
||||
@@ -174,19 +174,14 @@ func TestOssBucketExists_NonExistent(t *testing.T) {
|
||||
t.Fatalf("newOSSClient() error: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// Bucket that definitely doesn't exist - should return false, not error
|
||||
// (auth may fail first, but we verify the function doesn't crash)
|
||||
exists, err := ossBucketExists(ctx, client, "this-bucket-definitely-does-not-exist-12345")
|
||||
// We don't assert on the result because it depends on network/auth,
|
||||
// but the function should not panic or hang.
|
||||
_ = exists
|
||||
_ = err
|
||||
// Bucket that definitely doesn't exist - should return error
|
||||
err = ossEnsureBucket(client, "this-bucket-definitely-does-not-exist-12345")
|
||||
if err == nil {
|
||||
t.Error("ossEnsureBucket with non-existent bucket should return an error")
|
||||
}
|
||||
}
|
||||
|
||||
func TestOssCreateBucket(t *testing.T) {
|
||||
func TestOssEnsureBucket_CreateFails(t *testing.T) {
|
||||
client, err := newOSSClient(
|
||||
"https://oss-cn-hangzhou.aliyuncs.com",
|
||||
"cn-hangzhou",
|
||||
@@ -197,12 +192,9 @@ func TestOssCreateBucket(t *testing.T) {
|
||||
t.Fatalf("newOSSClient() error: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// Should fail with invalid credentials
|
||||
err = ossCreateBucket(ctx, client, "test-bucket")
|
||||
err = ossEnsureBucket(client, "test-bucket")
|
||||
if err == nil {
|
||||
t.Error("ossCreateBucket with invalid credentials should return an error")
|
||||
t.Error("ossEnsureBucket with invalid credentials should return an error")
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user