feat: correct data analysis errors using fileService.

This commit is contained in:
hjz
2026-04-27 10:06:45 +08:00
committed by lyingbug
parent e7cdd787f0
commit efae3d466f
4 changed files with 139 additions and 44 deletions

View File

@@ -5,6 +5,7 @@ import (
"database/sql"
"encoding/json"
"fmt"
filesvc "github.com/Tencent/WeKnora/internal/application/service/file"
"io"
"os"
"strings"
@@ -40,25 +41,31 @@ type DataAnalysisInput struct {
type DataAnalysisTool struct {
BaseTool
knowledgeService interfaces.KnowledgeService
fileService interfaces.FileService
db *sql.DB
sessionID string
createdTables []string // Track tables created in this session
knowledgeBaseService interfaces.KnowledgeBaseService
knowledgeService interfaces.KnowledgeService
fileService interfaces.FileService
tenantService interfaces.TenantService
db *sql.DB
sessionID string
createdTables []string // Track tables created in this session
}
func NewDataAnalysisTool(
knowledgeBaseService interfaces.KnowledgeBaseService,
knowledgeService interfaces.KnowledgeService,
tenantService interfaces.TenantService,
fileService interfaces.FileService,
db *sql.DB,
sessionID string,
) *DataAnalysisTool {
return &DataAnalysisTool{
BaseTool: dataAnalysisTool,
knowledgeService: knowledgeService,
fileService: fileService,
db: db,
sessionID: sessionID,
BaseTool: dataAnalysisTool,
knowledgeBaseService: knowledgeBaseService,
knowledgeService: knowledgeService,
fileService: fileService,
tenantService: tenantService,
db: db,
sessionID: sessionID,
}
}
@@ -497,7 +504,7 @@ func (t *DataAnalysisTool) LoadFromKnowledge(ctx context.Context, knowledge *typ
func (t *DataAnalysisTool) materializeKnowledgeFile(ctx context.Context, knowledge *types.Knowledge) (string, func(), error) {
noop := func() {}
reader, err := t.fileService.GetFile(ctx, knowledge.FilePath)
reader, err := t.resolveFileServiceForKnowledge(ctx, knowledge).GetFile(ctx, knowledge.FilePath)
if err != nil {
return "", noop, fmt.Errorf("failed to open file for knowledge '%s': %w", knowledge.ID, err)
}
@@ -653,3 +660,79 @@ func (t *TableSchema) Description() string {
return builder.String()
}
// resolveFileServiceForKnowledge resolves a provider-specific FileService based on the knowledge file path.
// It falls back to the injected default service when provider/config cannot be resolved.
func (t *DataAnalysisTool) resolveFileServiceForKnowledge(ctx context.Context, knowledge *types.Knowledge) interfaces.FileService {
if knowledge == nil {
logger.Warnf(ctx, "[Tool][DataAnalysis][storage] fallback default: session_id=%s reason=knowledge_nil", t.sessionID)
return t.fileService
}
kbID := strings.TrimSpace(knowledge.KnowledgeBaseID)
var kb *types.KnowledgeBase
if t.knowledgeBaseService != nil && kbID != "" {
var err error
kb, err = t.knowledgeBaseService.GetKnowledgeBaseByID(ctx, kbID)
if err != nil {
logger.Warnf(ctx, "[Tool][DataAnalysis][storage] get kb failed, fallback default: session_id=%s knowledge_id=%s kb_id=%s err=%v",
t.sessionID, knowledge.ID, kbID, err)
return t.fileService
}
}
if kb == nil && kbID != "" {
logger.Infof(ctx, "[Tool][DataAnalysis][storage] kb not found, fallback default: session_id=%s knowledge_id=%s kb_id=%s",
t.sessionID, knowledge.ID, kbID)
return t.fileService
}
provider := ""
if kb != nil {
provider = kb.GetStorageProvider()
}
tenant, _ := ctx.Value(types.TenantInfoContextKey).(*types.Tenant)
if tenant == nil {
tenantID := uint64(0)
if tid, ok := ctx.Value(types.TenantIDContextKey).(uint64); ok {
tenantID = tid
}
if tenantID == 0 && kb != nil {
tenantID = knowledge.TenantID
}
if tenantID > 0 && t.tenantService != nil {
resolvedTenant, err := t.tenantService.GetTenantByID(ctx, tenantID)
if err != nil {
logger.Warnf(ctx, "[Tool][DataAnalysis][storage] get tenant failed: session_id=%s knowledge_id=%s kb_id=%s tenant_id=%d err=%v",
t.sessionID, knowledge.ID, kbID, tenantID, err)
} else if resolvedTenant != nil {
tenant = resolvedTenant
logger.Infof(ctx, "[Tool][DataAnalysis][storage] resolved tenant from service: session_id=%s knowledge_id=%s kb_id=%s tenant_id=%d",
t.sessionID, knowledge.ID, kbID, tenantID)
}
}
}
if provider == "" && tenant != nil && tenant.StorageEngineConfig != nil {
provider = strings.ToLower(strings.TrimSpace(tenant.StorageEngineConfig.DefaultProvider))
}
if provider == "" || tenant == nil || tenant.StorageEngineConfig == nil {
hasTenantStorageConfig := tenant != nil && tenant.StorageEngineConfig != nil
logger.Infof(ctx, "[Tool][DataAnalysis][storage] fallback default: session_id=%s knowledge_id=%s kb_id=%s provider=%q tenant_cfg=%t",
t.sessionID, knowledge.ID, kbID, provider, hasTenantStorageConfig)
return t.fileService
}
storageConfig := tenant.StorageEngineConfig
baseDir := strings.TrimSpace(os.Getenv("LOCAL_STORAGE_BASE_DIR"))
resolvedSvc, resolvedProvider, err := filesvc.NewFileServiceFromStorageConfig(provider, storageConfig, baseDir)
if err != nil {
logger.Warnf(ctx, "[Tool][DataAnalysis][storage] create file service failed, fallback default: session_id=%s knowledge_id=%s kb_id=%s provider=%s err=%v",
t.sessionID, knowledge.ID, kbID, provider, err)
return t.fileService
}
logger.Infof(ctx, "[Tool][DataAnalysis][storage] resolved file service: session_id=%s knowledge_id=%s kb_id=%s provider=%s",
t.sessionID, knowledge.ID, kbID, resolvedProvider)
return resolvedSvc
}

View File

@@ -58,6 +58,7 @@ type agentService struct {
duckdb *sql.DB
webSearchStateService interfaces.WebSearchStateService
wikiPageService interfaces.WikiPageService
tenantService interfaces.TenantService
}
// NewAgentService creates a new agent service
@@ -76,6 +77,7 @@ func NewAgentService(
duckdb *sql.DB,
webSearchStateService interfaces.WebSearchStateService,
wikiPageService interfaces.WikiPageService,
tenantService interfaces.TenantService,
) interfaces.AgentService {
return &agentService{
cfg: cfg,
@@ -92,6 +94,7 @@ func NewAgentService(
duckdb: duckdb,
webSearchStateService: webSearchStateService,
wikiPageService: wikiPageService,
tenantService: tenantService,
}
}
@@ -544,7 +547,7 @@ func (s *agentService) registerTools(
logger.Infof(ctx, "Registered web_fetch tool for session: %s", sessionID)
case tools.ToolDataAnalysis:
toolToRegister = tools.NewDataAnalysisTool(s.knowledgeService, s.fileService, s.duckdb, sessionID)
toolToRegister = tools.NewDataAnalysisTool(s.knowledgeBaseService, s.knowledgeService, s.tenantService, s.fileService, s.duckdb, sessionID)
logger.Infof(ctx, "Registered data_analysis tool for session: %s", sessionID)
case tools.ToolDataSchema:

View File

@@ -17,27 +17,33 @@ import (
)
type PluginDataAnalysis struct {
modelService interfaces.ModelService
knowledgeService interfaces.KnowledgeService
fileService interfaces.FileService
chunkRepo interfaces.ChunkRepository
db *sql.DB
modelService interfaces.ModelService
knowledgeBaseService interfaces.KnowledgeBaseService
knowledgeService interfaces.KnowledgeService
fileService interfaces.FileService
chunkRepo interfaces.ChunkRepository
tenantService interfaces.TenantService
db *sql.DB
}
func NewPluginDataAnalysis(
eventManager *EventManager,
modelService interfaces.ModelService,
knowledgeBaseService interfaces.KnowledgeBaseService,
knowledgeService interfaces.KnowledgeService,
fileService interfaces.FileService,
chunkRepo interfaces.ChunkRepository,
tenantService interfaces.TenantService,
db *sql.DB,
) *PluginDataAnalysis {
p := &PluginDataAnalysis{
modelService: modelService,
knowledgeService: knowledgeService,
fileService: fileService,
chunkRepo: chunkRepo,
db: db,
modelService: modelService,
knowledgeBaseService: knowledgeBaseService,
knowledgeService: knowledgeService,
fileService: fileService,
chunkRepo: chunkRepo,
tenantService: tenantService,
db: db,
}
eventManager.Register(p)
return p
@@ -83,7 +89,7 @@ func (p *PluginDataAnalysis) OnEvent(
}
// Initialize DataAnalysisTool
tool := tools.NewDataAnalysisTool(p.knowledgeService, p.fileService, p.db, chatManage.SessionID)
tool := tools.NewDataAnalysisTool(p.knowledgeBaseService, p.knowledgeService, p.tenantService, p.fileService, p.db, chatManage.SessionID)
defer tool.Cleanup(ctx)
// Load data into DuckDB
@@ -136,11 +142,11 @@ Return your response in the specified JSON format.`, chatManage.Query, knowledge
// 5. Store result
// Create a new SearchResult for the analysis output
analysisResult := &types.SearchResult{
ID: "analysis_" + knowledge.ID,
Content: toolResult.Output,
Score: 1.0,
MatchType: types.MatchTypeDataAnalysis,
KnowledgeID: knowledge.ID,
ID: "analysis_" + knowledge.ID,
Content: toolResult.Output,
Score: 1.0,
MatchType: types.MatchTypeDataAnalysis,
KnowledgeID: knowledge.ID,
KnowledgeTitle: knowledge.Title,
KnowledgeFilename: knowledge.FileName,
KnowledgeDescription: knowledge.Description,

View File

@@ -9,8 +9,8 @@ import (
"strings"
"github.com/Tencent/WeKnora/internal/agent/tools"
filesvc "github.com/Tencent/WeKnora/internal/application/service/file"
chatpipeline "github.com/Tencent/WeKnora/internal/application/service/chat_pipeline"
filesvc "github.com/Tencent/WeKnora/internal/application/service/file"
"github.com/Tencent/WeKnora/internal/application/service/retriever"
"github.com/Tencent/WeKnora/internal/config"
"github.com/Tencent/WeKnora/internal/logger"
@@ -252,18 +252,20 @@ type DataTableSummaryPayload struct {
// DataTableSummaryService is a service for extracting tables
type DataTableSummaryService struct {
modelService interfaces.ModelService
knowledgeService interfaces.KnowledgeService
fileService interfaces.FileService
chunkService interfaces.ChunkService
tenantService interfaces.TenantService
retrieveEngine interfaces.RetrieveEngineRegistry
sqlDB *sql.DB
modelService interfaces.ModelService
knowledgeBaseService interfaces.KnowledgeBaseService
knowledgeService interfaces.KnowledgeService
fileService interfaces.FileService
chunkService interfaces.ChunkService
tenantService interfaces.TenantService
retrieveEngine interfaces.RetrieveEngineRegistry
sqlDB *sql.DB
}
// NewDataTableSummaryService creates a new DataTableSummaryService
func NewDataTableSummaryService(
modelService interfaces.ModelService,
knowledgeBaseService interfaces.KnowledgeBaseService,
knowledgeService interfaces.KnowledgeService,
fileService interfaces.FileService,
chunkService interfaces.ChunkService,
@@ -272,13 +274,14 @@ func NewDataTableSummaryService(
sqlDB *sql.DB,
) interfaces.TaskHandler {
return &DataTableSummaryService{
modelService: modelService,
knowledgeService: knowledgeService,
fileService: fileService,
chunkService: chunkService,
tenantService: tenantService,
retrieveEngine: retrieveEngine,
sqlDB: sqlDB,
modelService: modelService,
knowledgeBaseService: knowledgeBaseService,
knowledgeService: knowledgeService,
fileService: fileService,
chunkService: chunkService,
tenantService: tenantService,
retrieveEngine: retrieveEngine,
sqlDB: sqlDB,
}
}
@@ -421,7 +424,7 @@ func (s *DataTableSummaryService) processTableData(ctx context.Context, resource
// 创建DuckDB会话并加载数据
sessionID := fmt.Sprintf("table_summary_%s", resources.knowledge.ID)
fileSvc := s.resolveFileServiceForKnowledge(ctx, resources)
duckdbTool := tools.NewDataAnalysisTool(s.knowledgeService, fileSvc, s.sqlDB, sessionID)
duckdbTool := tools.NewDataAnalysisTool(s.knowledgeBaseService, s.knowledgeService, s.tenantService, fileSvc, s.sqlDB, sessionID)
defer duckdbTool.Cleanup(ctx)
// 使用knowledge.ID作为表名根据文件类型自动加载数据