diff --git a/go.mod b/go.mod index 55506497..6c78ef97 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.24.11 require ( github.com/JohannesKaufmann/html-to-markdown/v2 v2.5.0 github.com/PuerkitoBio/goquery v1.10.3 + github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.4.1 github.com/asg017/sqlite-vec-go-bindings v0.1.6 github.com/aws/aws-sdk-go-v2 v1.41.3 github.com/aws/aws-sdk-go-v2/config v1.29.14 @@ -24,6 +25,7 @@ require ( github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.3 github.com/hibiken/asynq v0.25.1 + github.com/jackc/pgx/v5 v5.7.2 github.com/joho/godotenv v1.5.1 github.com/larksuite/oapi-sdk-go/v3 v3.5.3 github.com/longbridgeapp/opencc v0.3.13 @@ -190,7 +192,6 @@ require ( github.com/invopop/jsonschema v0.13.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect - github.com/jackc/pgx/v5 v5.7.2 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/jchv/go-winloader v0.0.0-20210711035445-715c2860da7e // indirect github.com/jinzhu/inflection v1.0.0 // indirect diff --git a/go.sum b/go.sum index fc44008a..84a30633 100644 --- a/go.sum +++ b/go.sum @@ -1377,6 +1377,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= +github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.4.1 h1:wF5rZUhhahzJiRSeLSCQhAkaDBXLa/R893C/ZmEpGcE= +github.com/aliyun/alibabacloud-oss-go-sdk-v2 v1.4.1/go.mod h1:FTzydeQVmR24FI0D6XWUOMKckjXehM/jgMn1xC+DA9M= github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ= diff --git a/internal/application/service/file/oss.go b/internal/application/service/file/oss.go index fdfb9017..1757e2a8 100644 --- a/internal/application/service/file/oss.go +++ b/internal/application/service/file/oss.go @@ -7,25 +7,23 @@ import ( "fmt" "io" "mime/multipart" + "net/http" "path/filepath" "strings" "time" "github.com/Tencent/WeKnora/internal/types/interfaces" "github.com/Tencent/WeKnora/internal/utils" - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/credentials" - "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss" + "github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials" "github.com/google/uuid" ) // ossFileService implements the FileService interface for Aliyun OSS -// using the S3-compatible protocol with virtual-hosted style addressing. +// using the official Aliyun OSS SDK v2 (github.com/aliyun/alibabacloud-oss-go-sdk-v2). type ossFileService struct { - client *s3.Client - tempClient *s3.Client + client *oss.Client + tempClient *oss.Client pathPrefix string bucketName string tempBucketName string @@ -33,61 +31,37 @@ type ossFileService struct { const ossScheme = "oss://" -// newOSSClient creates a bare s3.Client configured for OSS S3-compatible mode. -// OSS uses virtual-hosted style addressing and does not support aws-chunked encoding. -func newOSSClient(endpoint, region, accessKey, secretKey string) (*s3.Client, error) { - cfg, err := config.LoadDefaultConfig(context.Background(), - config.WithRegion(region), - config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accessKey, secretKey, "")), - // Disable automatic aws-chunked encoding — OSS does not support it. - config.WithRequestChecksumCalculation(aws.RequestChecksumCalculationWhenRequired), - config.WithResponseChecksumValidation(aws.ResponseChecksumValidationWhenRequired), - ) - if err != nil { - return nil, fmt.Errorf("failed to load AWS config: %w", err) - } +// newOSSClient creates an OSS client using the official Aliyun SDK v2. +func newOSSClient(endpoint, region, accessKey, secretKey string) (*oss.Client, error) { + creds := credentials.NewStaticCredentialsProvider(accessKey, secretKey, "") - client := s3.NewFromConfig(cfg, func(o *s3.Options) { - o.BaseEndpoint = aws.String(endpoint) - o.UsePathStyle = false - }) + cfg := oss.LoadDefaultConfig(). + WithCredentialsProvider(creds). + WithRegion(region). + WithEndpoint(endpoint) - return client, nil -} - -// ossBucketExists checks if the bucket exists using the provided client. -func ossBucketExists(ctx context.Context, client *s3.Client, bucketName string) (bool, error) { - _, err := client.HeadBucket(ctx, &s3.HeadBucketInput{ - Bucket: aws.String(bucketName), - }) - if err != nil { - var notFound *types.NotFound - if errors.As(err, ¬Found) { - return false, nil - } - return false, err - } - return true, nil -} - -// ossCreateBucket creates a new bucket using the provided client. -func ossCreateBucket(ctx context.Context, client *s3.Client, bucketName string) error { - _, err := client.CreateBucket(ctx, &s3.CreateBucketInput{ - Bucket: aws.String(bucketName), - }) - return err + return oss.NewClient(cfg), nil } // ossEnsureBucket checks if the bucket exists and creates it if missing. -func ossEnsureBucket(ctx context.Context, client *s3.Client, bucketName string) error { - exists, err := ossBucketExists(ctx, client, bucketName) +func ossEnsureBucket(client *oss.Client, bucketName string) error { + exists, err := client.IsBucketExist(context.Background(), bucketName) if err != nil { - return fmt.Errorf("failed to check bucket: %w", err) + return fmt.Errorf("failed to check OSS bucket: %w", err) } - if !exists { - if err := ossCreateBucket(ctx, client, bucketName); err != nil { - return fmt.Errorf("failed to create bucket: %w", err) + if exists { + return nil + } + + _, err = client.PutBucket(context.Background(), &oss.PutBucketRequest{ + Bucket: oss.Ptr(bucketName), + }) + if err != nil { + var svcErr *oss.ServiceError + if errors.As(err, &svcErr) && svcErr.StatusCode == http.StatusConflict { + return nil } + return fmt.Errorf("failed to create OSS bucket: %w", err) } return nil } @@ -105,11 +79,11 @@ func NewOssFileServiceWithTempBucket(endpoint, region, accessKey, secretKey, buc return nil, err } - if err := ossEnsureBucket(context.Background(), client, bucketName); err != nil { + if err := ossEnsureBucket(client, bucketName); err != nil { return nil, err } - var tempClient *s3.Client + var tempClient *oss.Client if tempBucketName != "" { if tempRegion == "" { tempRegion = region @@ -118,7 +92,7 @@ func NewOssFileServiceWithTempBucket(endpoint, region, accessKey, secretKey, buc if err != nil { return nil, fmt.Errorf("failed to initialize OSS temp client: %w", err) } - if err := ossEnsureBucket(context.Background(), tempClient, tempBucketName); err != nil { + if err := ossEnsureBucket(tempClient, tempBucketName); err != nil { return nil, err } } @@ -144,15 +118,12 @@ func CheckOssConnectivity(ctx context.Context, endpoint, region, accessKey, secr return err } - checkCtx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - - exists, err := ossBucketExists(checkCtx, client, bucketName) + exists, err := client.IsBucketExist(ctx, bucketName) if err != nil { - return fmt.Errorf("failed to check bucket: %w", err) + return fmt.Errorf("failed to check OSS bucket: %w", err) } if !exists { - return fmt.Errorf("bucket %q does not exist", bucketName) + return fmt.Errorf("bucket %q does not exist or is not accessible", bucketName) } return nil } @@ -176,9 +147,9 @@ func (s *ossFileService) CheckConnectivity(ctx context.Context) error { checkCtx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - exists, err := ossBucketExists(checkCtx, s.client, s.bucketName) + exists, err := s.client.IsBucketExist(checkCtx, s.bucketName) if err != nil { - return err + return fmt.Errorf("failed to check OSS bucket: %w", err) } if !exists { return fmt.Errorf("bucket %q does not exist", s.bucketName) @@ -186,7 +157,7 @@ func (s *ossFileService) CheckConnectivity(ctx context.Context) error { return nil } -// SaveFile saves a file to OSS. +// SaveFile saves a file to OSS using the Uploader manager for large files. func (s *ossFileService) SaveFile(ctx context.Context, file *multipart.FileHeader, tenantID uint64, knowledgeID string, ) (string, error) { @@ -204,14 +175,35 @@ func (s *ossFileService) SaveFile(ctx context.Context, contentType = getContentTypeByExt(ext) } - _, err = s.client.PutObject(ctx, &s3.PutObjectInput{ - Bucket: aws.String(s.bucketName), - Key: aws.String(objectName), - Body: src, - ContentType: aws.String(contentType), - }) - if err != nil { - return "", fmt.Errorf("failed to upload file to OSS: %w", err) + // Use Uploader for files > 10MB (auto multipart with concurrent uploads) + const multipartThreshold = 10 * 1024 * 1024 + if file.Size > multipartThreshold { + uploader := s.client.NewUploader(func(uo *oss.UploaderOptions) { + uo.PartSize = 10 * 1024 * 1024 // 10MB per part + uo.ParallelNum = 3 // 3 concurrent uploads + }) + + _, err = uploader.UploadFrom(ctx, + &oss.PutObjectRequest{ + Bucket: oss.Ptr(s.bucketName), + Key: oss.Ptr(objectName), + ContentType: oss.Ptr(contentType), + }, + src, + ) + if err != nil { + return "", fmt.Errorf("failed to upload file to OSS (multipart): %w", err) + } + } else { + _, err = s.client.PutObject(ctx, &oss.PutObjectRequest{ + Bucket: oss.Ptr(s.bucketName), + Key: oss.Ptr(objectName), + Body: src, + ContentType: oss.Ptr(contentType), + }) + if err != nil { + return "", fmt.Errorf("failed to upload file to OSS: %w", err) + } } return fmt.Sprintf("oss://%s/%s", s.bucketName, objectName), nil @@ -227,34 +219,27 @@ func (s *ossFileService) SaveBytes(ctx context.Context, data []byte, tenantID ui } ext := filepath.Ext(safeName) - // If requesting temp bucket and it is configured, use it + targetBucket := s.bucketName + client := s.client + objectName := fmt.Sprintf("%s%d/exports/%s%s", s.pathPrefix, tenantID, uuid.New().String(), ext) + if temp && s.tempClient != nil { - objectName := fmt.Sprintf("exports/%d/%s%s", tenantID, uuid.New().String(), ext) - _, err := s.tempClient.PutObject(ctx, &s3.PutObjectInput{ - Bucket: aws.String(s.tempBucketName), - Key: aws.String(objectName), - Body: bytes.NewReader(data), - ContentType: aws.String("text/csv; charset=utf-8"), - }) - if err != nil { - return "", fmt.Errorf("failed to upload bytes to OSS temp bucket: %w", err) - } - return fmt.Sprintf("oss://%s/%s", s.tempBucketName, objectName), nil + targetBucket = s.tempBucketName + client = s.tempClient + objectName = fmt.Sprintf("exports/%d/%s%s", tenantID, uuid.New().String(), ext) } - // Save to main bucket - objectName := fmt.Sprintf("%s%d/exports/%s%s", s.pathPrefix, tenantID, uuid.New().String(), ext) - _, err = s.client.PutObject(ctx, &s3.PutObjectInput{ - Bucket: aws.String(s.bucketName), - Key: aws.String(objectName), + _, err = client.PutObject(ctx, &oss.PutObjectRequest{ + Bucket: oss.Ptr(targetBucket), + Key: oss.Ptr(objectName), Body: bytes.NewReader(data), - ContentType: aws.String("text/csv; charset=utf-8"), + ContentType: oss.Ptr("text/csv; charset=utf-8"), }) if err != nil { return "", fmt.Errorf("failed to upload bytes to OSS: %w", err) } - return fmt.Sprintf("oss://%s/%s", s.bucketName, objectName), nil + return fmt.Sprintf("oss://%s/%s", targetBucket, objectName), nil } // GetFile retrieves a file from OSS by its path. @@ -267,9 +252,16 @@ func (s *ossFileService) GetFile(ctx context.Context, filePath string) (io.ReadC return nil, fmt.Errorf("invalid file path: %w", err) } - resp, err := s.client.GetObject(ctx, &s3.GetObjectInput{ - Bucket: aws.String(bucketName), - Key: aws.String(objectName), + var client *oss.Client + if bucketName == s.tempBucketName && s.tempClient != nil { + client = s.tempClient + } else { + client = s.client + } + + resp, err := client.GetObject(ctx, &oss.GetObjectRequest{ + Bucket: oss.Ptr(bucketName), + Key: oss.Ptr(objectName), }) if err != nil { return nil, fmt.Errorf("failed to get file from OSS: %w", err) @@ -288,9 +280,16 @@ func (s *ossFileService) DeleteFile(ctx context.Context, filePath string) error return fmt.Errorf("invalid file path: %w", err) } - _, err = s.client.DeleteObject(ctx, &s3.DeleteObjectInput{ - Bucket: aws.String(bucketName), - Key: aws.String(objectName), + var client *oss.Client + if bucketName == s.tempBucketName && s.tempClient != nil { + client = s.tempClient + } else { + client = s.client + } + + _, err = client.DeleteObject(ctx, &oss.DeleteObjectRequest{ + Bucket: oss.Ptr(bucketName), + Key: oss.Ptr(objectName), }) if err != nil { return fmt.Errorf("failed to delete file from OSS: %w", err) @@ -310,22 +309,21 @@ func (s *ossFileService) GetFileURL(ctx context.Context, filePath string) (strin } // Determine which client to use - var client *s3.Client + var client *oss.Client if bucketName == s.tempBucketName && s.tempClient != nil { client = s.tempClient } else { client = s.client } - presignClient := s3.NewPresignClient(client) - - presignedReq, err := presignClient.PresignGetObject(ctx, &s3.GetObjectInput{ - Bucket: aws.String(bucketName), - Key: aws.String(objectName), - }, s3.WithPresignExpires(24*time.Hour)) + // Generate presigned URL (valid for 24 hours) + result, err := client.Presign(ctx, &oss.GetObjectRequest{ + Bucket: oss.Ptr(bucketName), + Key: oss.Ptr(objectName), + }, oss.PresignExpires(24*time.Hour)) if err != nil { return "", fmt.Errorf("failed to generate OSS presigned URL: %w", err) } - return presignedReq.URL, nil + return result.URL, nil } diff --git a/internal/application/service/file/oss_test.go b/internal/application/service/file/oss_test.go index 3a1b06b6..a6f71167 100644 --- a/internal/application/service/file/oss_test.go +++ b/internal/application/service/file/oss_test.go @@ -163,7 +163,7 @@ func TestCheckOssConnectivity_InvalidEndpoint(t *testing.T) { } } -func TestOssBucketExists_NonExistent(t *testing.T) { +func TestOssEnsureBucket_NonExistent(t *testing.T) { client, err := newOSSClient( "https://oss-cn-hangzhou.aliyuncs.com", "cn-hangzhou", @@ -174,19 +174,14 @@ func TestOssBucketExists_NonExistent(t *testing.T) { t.Fatalf("newOSSClient() error: %v", err) } - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - // Bucket that definitely doesn't exist - should return false, not error - // (auth may fail first, but we verify the function doesn't crash) - exists, err := ossBucketExists(ctx, client, "this-bucket-definitely-does-not-exist-12345") - // We don't assert on the result because it depends on network/auth, - // but the function should not panic or hang. - _ = exists - _ = err + // Bucket that definitely doesn't exist - should return error + err = ossEnsureBucket(client, "this-bucket-definitely-does-not-exist-12345") + if err == nil { + t.Error("ossEnsureBucket with non-existent bucket should return an error") + } } -func TestOssCreateBucket(t *testing.T) { +func TestOssEnsureBucket_CreateFails(t *testing.T) { client, err := newOSSClient( "https://oss-cn-hangzhou.aliyuncs.com", "cn-hangzhou", @@ -197,12 +192,9 @@ func TestOssCreateBucket(t *testing.T) { t.Fatalf("newOSSClient() error: %v", err) } - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - // Should fail with invalid credentials - err = ossCreateBucket(ctx, client, "test-bucket") + err = ossEnsureBucket(client, "test-bucket") if err == nil { - t.Error("ossCreateBucket with invalid credentials should return an error") + t.Error("ossEnsureBucket with invalid credentials should return an error") } }