mirror of
https://github.com/Tencent/WeKnora.git
synced 2026-06-04 13:30:32 +08:00
feat(server): enhance server startup and shutdown process
- Added support for SO_REUSEPORT in the listenWithRetry function to improve port binding during hot-reloads. - Implemented graceful shutdown by closing the listener immediately upon receiving a shutdown signal, allowing for quicker port release. - Updated logging to provide clearer feedback during server shutdown and error handling.
This commit is contained in:
@@ -26,7 +26,7 @@ type Agent struct {
|
||||
|
||||
// AgentConfig represents the configuration for an agent
|
||||
type AgentConfig struct {
|
||||
AgentMode string `json:"agent_mode"` // "quick-answer" or "smart-reasoning"
|
||||
AgentMode string `json:"agent_mode"` // "quick-answer" or "smart-reasoning"
|
||||
SystemPrompt string `json:"system_prompt,omitempty"`
|
||||
ContextTemplate string `json:"context_template,omitempty"`
|
||||
ModelID string `json:"model_id,omitempty"`
|
||||
@@ -35,10 +35,9 @@ type AgentConfig struct {
|
||||
MaxCompletionTokens int `json:"max_completion_tokens,omitempty"`
|
||||
MaxIterations int `json:"max_iterations,omitempty"`
|
||||
AllowedTools []string `json:"allowed_tools,omitempty"`
|
||||
ReflectionEnabled bool `json:"reflection_enabled,omitempty"`
|
||||
MCPSelectionMode string `json:"mcp_selection_mode,omitempty"` // "all", "selected", "none"
|
||||
MCPSelectionMode string `json:"mcp_selection_mode,omitempty"` // "all", "selected", "none"
|
||||
MCPServices []string `json:"mcp_services,omitempty"`
|
||||
KBSelectionMode string `json:"kb_selection_mode,omitempty"` // "all", "selected", "none"
|
||||
KBSelectionMode string `json:"kb_selection_mode,omitempty"` // "all", "selected", "none"
|
||||
KnowledgeBases []string `json:"knowledge_bases,omitempty"`
|
||||
SupportedFileTypes []string `json:"supported_file_types,omitempty"`
|
||||
FAQPriorityEnabled bool `json:"faq_priority_enabled,omitempty"`
|
||||
@@ -57,7 +56,7 @@ type AgentConfig struct {
|
||||
EnableRewrite bool `json:"enable_rewrite,omitempty"`
|
||||
RewritePromptSystem string `json:"rewrite_prompt_system,omitempty"`
|
||||
RewritePromptUser string `json:"rewrite_prompt_user,omitempty"`
|
||||
FallbackStrategy string `json:"fallback_strategy,omitempty"` // "fixed" or "model"
|
||||
FallbackStrategy string `json:"fallback_strategy,omitempty"` // "fixed" or "model"
|
||||
FallbackResponse string `json:"fallback_response,omitempty"`
|
||||
FallbackPrompt string `json:"fallback_prompt,omitempty"`
|
||||
}
|
||||
|
||||
@@ -40,6 +40,8 @@ import (
|
||||
"github.com/Tencent/WeKnora/internal/runtime"
|
||||
"github.com/Tencent/WeKnora/internal/tracing"
|
||||
"github.com/Tencent/WeKnora/internal/types/interfaces"
|
||||
|
||||
"golang.org/x/sys/unix"
|
||||
)
|
||||
|
||||
func main() {
|
||||
@@ -65,14 +67,24 @@ func main() {
|
||||
Handler: router,
|
||||
}
|
||||
|
||||
addr := fmt.Sprintf("%s:%d", cfg.Server.Host, cfg.Server.Port)
|
||||
listener, err := listenWithRetry(addr, 10, 300*time.Millisecond)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start server: %v", err)
|
||||
}
|
||||
|
||||
ctx, done := context.WithCancel(context.Background())
|
||||
|
||||
signals := make(chan os.Signal, 1)
|
||||
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
|
||||
go func() {
|
||||
sig := <-signals
|
||||
logger.Infof(context.Background(), "Received signal: %v, starting server shutdown...", sig)
|
||||
|
||||
// Create a context with timeout for server shutdown
|
||||
// Close listener first to release port immediately,
|
||||
// so the next process can bind during our graceful drain.
|
||||
listener.Close()
|
||||
|
||||
shutdownTimeout := cfg.Server.ShutdownTimeout
|
||||
if shutdownTimeout == 0 {
|
||||
shutdownTimeout = 30 * time.Second
|
||||
@@ -80,11 +92,18 @@ func main() {
|
||||
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), shutdownTimeout)
|
||||
defer shutdownCancel()
|
||||
|
||||
// Second signal → force close all connections immediately
|
||||
go func() {
|
||||
sig := <-signals
|
||||
logger.Warnf(context.Background(), "Received second signal: %v, forcing shutdown...", sig)
|
||||
server.Close()
|
||||
}()
|
||||
|
||||
if err := server.Shutdown(shutdownCtx); err != nil {
|
||||
logger.Fatalf(context.Background(), "Server forced to shutdown: %v", err)
|
||||
logger.Errorf(context.Background(), "Server forced to shutdown: %v", err)
|
||||
server.Close()
|
||||
}
|
||||
|
||||
// Clean up all registered resources
|
||||
logger.Info(context.Background(), "Cleaning up resources...")
|
||||
errs := resourceCleaner.Cleanup(shutdownCtx)
|
||||
if len(errs) > 0 {
|
||||
@@ -94,18 +113,11 @@ func main() {
|
||||
done()
|
||||
}()
|
||||
|
||||
// Start server with retry to handle port not yet released during hot-reload
|
||||
addr := fmt.Sprintf("%s:%d", cfg.Server.Host, cfg.Server.Port)
|
||||
listener, err := listenWithRetry(addr, 10, 300*time.Millisecond)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start server: %v", err)
|
||||
}
|
||||
logger.Infof(context.Background(), "Server is running at %s", addr)
|
||||
if err := server.Serve(listener); err != nil && err != http.ErrServerClosed {
|
||||
return fmt.Errorf("server error: %v", err)
|
||||
}
|
||||
|
||||
// Wait for shutdown signal
|
||||
<-ctx.Done()
|
||||
return nil
|
||||
})
|
||||
@@ -114,12 +126,20 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
// listenWithRetry retries net.Listen with exponential backoff,
|
||||
// listenWithRetry retries listening with exponential backoff and SO_REUSEPORT,
|
||||
// useful during hot-reload when the previous process may not have released the port yet.
|
||||
func listenWithRetry(addr string, maxRetries int, baseDelay time.Duration) (net.Listener, error) {
|
||||
lc := net.ListenConfig{
|
||||
Control: func(network, address string, c syscall.RawConn) error {
|
||||
return c.Control(func(fd uintptr) {
|
||||
_ = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, unix.SO_REUSEPORT, 1)
|
||||
})
|
||||
},
|
||||
}
|
||||
|
||||
var lastErr error
|
||||
for i := 0; i < maxRetries; i++ {
|
||||
listener, err := net.Listen("tcp", addr)
|
||||
listener, err := lc.Listen(context.Background(), "tcp", addr)
|
||||
if err == nil {
|
||||
return listener, nil
|
||||
}
|
||||
|
||||
@@ -231,22 +231,5 @@ func (e *AgentEngine) executeToolCalls(
|
||||
},
|
||||
})
|
||||
|
||||
// Optional: Reflection after each tool call (streaming)
|
||||
if e.config.ReflectionEnabled && result != nil {
|
||||
reflection, err := e.streamReflectionToEventBus(
|
||||
ctx, tc.ID, tc.Function.Name, result.Output,
|
||||
iteration, sessionID,
|
||||
)
|
||||
if err != nil {
|
||||
logger.Warnf(ctx, "Reflection failed: %v", err)
|
||||
} else if reflection != "" {
|
||||
// Store reflection in the corresponding tool call
|
||||
// Find the tool call we just added and update it
|
||||
if len(step.ToolCalls) > 0 {
|
||||
lastIdx := len(step.ToolCalls) - 1
|
||||
step.ToolCalls[lastIdx].Reflection = reflection
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,8 +13,6 @@ const (
|
||||
DefaultAgentTemperature = 0.7
|
||||
// DefaultAgentMaxIterations is the default maximum number of iterations for the agent
|
||||
DefaultAgentMaxIterations = 20
|
||||
// DefaultAgentReflectionEnabled is the default whether to enable reflection for the agent
|
||||
DefaultAgentReflectionEnabled = false
|
||||
// DefaultUseCustomSystemPrompt is the default whether to use custom system prompt for the agent
|
||||
DefaultUseCustomSystemPrompt = false
|
||||
|
||||
|
||||
@@ -12,7 +12,6 @@ import (
|
||||
"github.com/Tencent/WeKnora/internal/agent/token"
|
||||
"github.com/Tencent/WeKnora/internal/logger"
|
||||
"github.com/Tencent/WeKnora/internal/models/chat"
|
||||
"github.com/Tencent/WeKnora/internal/types"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -317,7 +316,3 @@ const consolidationSystemPrompt = "" +
|
||||
"- Be structured with clear sections if the conversation covered multiple topics\n" +
|
||||
"- Be concise — aim for 30% or less of the original length\n\n" +
|
||||
"Output only the summary, no preamble or explanation."
|
||||
|
||||
// ChatResponse is used to extract content from LLM response.
|
||||
// This avoids a direct dependency on the types package for the response format.
|
||||
var _ = (*types.ChatResponse)(nil) // compile-time check that types.ChatResponse exists
|
||||
|
||||
@@ -68,60 +68,6 @@ func (e *AgentEngine) streamLLMToEventBus(
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// streamReflectionToEventBus streams reflection process through EventBus
|
||||
// Note: Reflection is now handled through the think tool in main loop
|
||||
func (e *AgentEngine) streamReflectionToEventBus(
|
||||
ctx context.Context,
|
||||
toolCallID string,
|
||||
toolName string,
|
||||
result string,
|
||||
iteration int,
|
||||
sessionID string,
|
||||
) (string, error) {
|
||||
// Simplified reflection without BuildReflectionPrompt
|
||||
reflectionPrompt := fmt.Sprintf(`Evaluate the result of calling tool %s and decide the next action.
|
||||
|
||||
Tool returned: %s
|
||||
|
||||
Think:
|
||||
1. Does the result satisfy the requirement?
|
||||
2. What should be done next?`, toolName, result)
|
||||
|
||||
messages := []chat.Message{
|
||||
{Role: "user", Content: reflectionPrompt},
|
||||
}
|
||||
|
||||
// Generate a single ID for this entire reflection stream
|
||||
reflectionID := generateEventID("reflection")
|
||||
|
||||
llmResult, err := e.streamLLMToEventBus(
|
||||
ctx,
|
||||
messages,
|
||||
&chat.ChatOptions{Temperature: 0.5},
|
||||
func(chunk *types.StreamResponse, fullContent string) {
|
||||
if chunk.Content != "" {
|
||||
e.eventBus.Emit(ctx, event.Event{
|
||||
ID: reflectionID,
|
||||
Type: event.EventAgentReflection,
|
||||
SessionID: sessionID,
|
||||
Data: event.AgentReflectionData{
|
||||
ToolCallID: toolCallID,
|
||||
Content: chunk.Content,
|
||||
Iteration: iteration,
|
||||
Done: chunk.Done,
|
||||
},
|
||||
})
|
||||
}
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
logger.Warnf(ctx, "Reflection failed: %v", err)
|
||||
return "", err
|
||||
}
|
||||
|
||||
return llmResult.Content, nil
|
||||
}
|
||||
|
||||
// streamThinkingToEventBus streams the thinking process through EventBus
|
||||
func (e *AgentEngine) streamThinkingToEventBus(
|
||||
ctx context.Context,
|
||||
|
||||
@@ -206,7 +206,6 @@ func (s *sessionService) buildAgentConfig(
|
||||
customAgent := req.CustomAgent
|
||||
agentConfig := &types.AgentConfig{
|
||||
MaxIterations: customAgent.Config.MaxIterations,
|
||||
ReflectionEnabled: customAgent.Config.ReflectionEnabled,
|
||||
Temperature: customAgent.Config.Temperature,
|
||||
WebSearchEnabled: customAgent.Config.WebSearchEnabled && req.WebSearchEnabled,
|
||||
WebSearchMaxResults: customAgent.Config.WebSearchMaxResults,
|
||||
|
||||
@@ -448,11 +448,10 @@ func (h *TenantHandler) SearchTenants(c *gin.Context) {
|
||||
|
||||
// AgentConfigRequest represents the request body for updating agent configuration
|
||||
type AgentConfigRequest struct {
|
||||
MaxIterations int `json:"max_iterations"`
|
||||
ReflectionEnabled bool `json:"reflection_enabled"`
|
||||
AllowedTools []string `json:"allowed_tools"`
|
||||
Temperature float64 `json:"temperature"`
|
||||
SystemPrompt string `json:"system_prompt,omitempty"` // Unified system prompt (uses {{web_search_status}} placeholder)
|
||||
MaxIterations int `json:"max_iterations"`
|
||||
AllowedTools []string `json:"allowed_tools"`
|
||||
Temperature float64 `json:"temperature"`
|
||||
SystemPrompt string `json:"system_prompt,omitempty"` // Unified system prompt (uses {{web_search_status}} placeholder)
|
||||
}
|
||||
|
||||
// GetTenantAgentConfig godoc
|
||||
@@ -501,7 +500,6 @@ func (h *TenantHandler) GetTenantAgentConfig(c *gin.Context) {
|
||||
"success": true,
|
||||
"data": gin.H{
|
||||
"max_iterations": agent.DefaultAgentMaxIterations,
|
||||
"reflection_enabled": agent.DefaultAgentReflectionEnabled,
|
||||
"allowed_tools": agenttools.DefaultAllowedTools(),
|
||||
"temperature": agent.DefaultAgentTemperature,
|
||||
"system_prompt": agent.GetProgressiveRAGSystemPrompt(h.config),
|
||||
@@ -524,7 +522,6 @@ func (h *TenantHandler) GetTenantAgentConfig(c *gin.Context) {
|
||||
"success": true,
|
||||
"data": gin.H{
|
||||
"max_iterations": tenant.AgentConfig.MaxIterations,
|
||||
"reflection_enabled": tenant.AgentConfig.ReflectionEnabled,
|
||||
"allowed_tools": agenttools.DefaultAllowedTools(),
|
||||
"temperature": tenant.AgentConfig.Temperature,
|
||||
"system_prompt": systemPrompt,
|
||||
@@ -572,7 +569,6 @@ func (h *TenantHandler) updateTenantAgentConfigInternal(c *gin.Context) {
|
||||
|
||||
agentConfig := &types.AgentConfig{
|
||||
MaxIterations: req.MaxIterations,
|
||||
ReflectionEnabled: req.ReflectionEnabled,
|
||||
AllowedTools: agenttools.DefaultAllowedTools(),
|
||||
Temperature: req.Temperature,
|
||||
SystemPrompt: systemPrompt,
|
||||
|
||||
@@ -10,13 +10,12 @@ import (
|
||||
// AgentConfig represents the full agent configuration (used at tenant level and runtime)
|
||||
// This includes all configuration parameters for agent execution
|
||||
type AgentConfig struct {
|
||||
MaxIterations int `json:"max_iterations"` // Maximum number of ReAct iterations
|
||||
ReflectionEnabled bool `json:"reflection_enabled"` // Whether to enable reflection
|
||||
AllowedTools []string `json:"allowed_tools"` // List of allowed tool names
|
||||
Temperature float64 `json:"temperature"` // LLM temperature for agent
|
||||
KnowledgeBases []string `json:"knowledge_bases"` // Accessible knowledge base IDs
|
||||
KnowledgeIDs []string `json:"knowledge_ids"` // Accessible knowledge IDs (individual documents)
|
||||
SystemPrompt string `json:"system_prompt,omitempty"` // Unified system prompt (uses web_search_status placeholder for dynamic behavior)
|
||||
MaxIterations int `json:"max_iterations"` // Maximum number of ReAct iterations
|
||||
AllowedTools []string `json:"allowed_tools"` // List of allowed tool names
|
||||
Temperature float64 `json:"temperature"` // LLM temperature for agent
|
||||
KnowledgeBases []string `json:"knowledge_bases"` // Accessible knowledge base IDs
|
||||
KnowledgeIDs []string `json:"knowledge_ids"` // Accessible knowledge IDs (individual documents)
|
||||
SystemPrompt string `json:"system_prompt,omitempty"` // Unified system prompt (uses web_search_status placeholder for dynamic behavior)
|
||||
// Deprecated: Use SystemPrompt instead. Kept for backward compatibility during migration.
|
||||
SystemPromptWebEnabled string `json:"system_prompt_web_enabled,omitempty"` // Deprecated: Custom prompt when web search is enabled
|
||||
SystemPromptWebDisabled string `json:"system_prompt_web_disabled,omitempty"` // Deprecated: Custom prompt when web search is disabled
|
||||
|
||||
@@ -93,8 +93,6 @@ type CustomAgentConfig struct {
|
||||
MaxIterations int `yaml:"max_iterations" json:"max_iterations"`
|
||||
// Allowed tools (only for agent type)
|
||||
AllowedTools []string `yaml:"allowed_tools" json:"allowed_tools"`
|
||||
// Whether reflection is enabled (only for agent type)
|
||||
ReflectionEnabled bool `yaml:"reflection_enabled" json:"reflection_enabled"`
|
||||
// MCP service selection mode: "all" = all enabled MCP services, "selected" = specific services, "none" = no MCP
|
||||
MCPSelectionMode string `yaml:"mcp_selection_mode" json:"mcp_selection_mode"`
|
||||
// Selected MCP service IDs (only used when MCPSelectionMode is "selected")
|
||||
|
||||
Reference in New Issue
Block a user