diff --git a/pkg/mcp/pftools/manager.go b/pkg/mcp/pftools/manager.go index 083a433..08f799f 100644 --- a/pkg/mcp/pftools/manager.go +++ b/pkg/mcp/pftools/manager.go @@ -20,7 +20,6 @@ import ( "encoding/json" "errors" "fmt" - "log" "strings" "sync" "time" @@ -31,6 +30,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" + "github.com/sirupsen/logrus" "github.com/streamnative/streamnative-mcp-server/pkg/kafka" "github.com/streamnative/streamnative-mcp-server/pkg/pulsar" "github.com/streamnative/streamnative-mcp-server/pkg/schema" @@ -59,7 +59,7 @@ type Server struct { MCPServer *server.MCPServer KafkaSession *kafka.Session PulsarSession *pulsar.Session - Logger interface{} + Logger *logrus.Logger } // NewPulsarFunctionManager creates a new PulsarFunctionManager @@ -89,6 +89,11 @@ func NewPulsarFunctionManager(snServer *Server, readOnly bool, options *ManagerO options = DefaultManagerOptions() } + logger := snServer.Logger + if logger == nil { + logger = logrus.New() + } + // Create the manager manager := &PulsarFunctionManager{ adminClient: adminClient, @@ -103,6 +108,7 @@ func NewPulsarFunctionManager(snServer *Server, readOnly bool, options *ManagerO stopCh: make(chan struct{}), callInProgressMap: make(map[string]context.CancelFunc), mcpServer: snServer.MCPServer, + logger: logger, readOnly: readOnly, defaultTimeout: options.DefaultTimeout, circuitBreakers: make(map[string]*CircuitBreaker), @@ -127,11 +133,11 @@ func (m *PulsarFunctionManager) Stop() { m.producerMutex.Lock() defer m.producerMutex.Unlock() for topic, producer := range m.producerCache { - log.Printf("Closing producer for topic: %s", topic) + m.logger.WithField("topic", topic).Info("Closing producer for topic") producer.Close() } m.producerCache = make(map[string]pulsarclient.Producer) - log.Println("All cached producers closed and cache cleared.") + m.logger.Info("All cached producers closed and cache cleared") } // pollFunctions polls for functions periodically @@ -157,7 +163,7 @@ func (m *PulsarFunctionManager) updateFunctions() { // Get all functions functions, err := m.getFunctionsList() if err != nil { - log.Printf("Failed to get functions list: %v", err) + m.logger.WithError(err).Warn("Failed to get functions list") // Check if this is a cluster health error and invoke callback if configured if (IsClusterUnhealthy(err) || IsAuthError(err) || IsNotFoundError(err)) && m.clusterErrorHandler != nil { @@ -176,7 +182,7 @@ func (m *PulsarFunctionManager) updateFunctions() { configHash, hashErr := computeFunctionConfigHash(fn) if hashErr != nil { - log.Printf("Failed to compute config hash for function %s: %v", fullName, hashErr) + m.logger.WithError(hashErr).WithField("function", fullName).Warn("Failed to compute config hash") } // Check if we already have this function @@ -247,9 +253,15 @@ func (m *PulsarFunctionManager) updateFunctions() { } if logNow { if err != nil { - log.Printf("Failed to convert function %s to tool: %v (category=%s)", fullName, failureErr, category) + m.logger.WithError(failureErr).WithFields(logrus.Fields{ + "function": fullName, + "category": category, + }).Warn("Failed to convert function to tool") } else { - log.Printf("Failed to fetch schema for function %s, retry later: %v (category=%s)", fullName, failureErr, category) + m.logger.WithError(failureErr).WithFields(logrus.Fields{ + "function": fullName, + "category": category, + }).Warn("Failed to fetch schema for function, retry later") } } continue @@ -265,7 +277,10 @@ func (m *PulsarFunctionManager) updateFunctions() { if m.sessionID != "" { err := m.mcpServer.DeleteSessionTools(m.sessionID, fnTool.Tool.Name) if err != nil { - log.Printf("Failed to delete tool %s from session %s: %v", fnTool.Tool.Name, m.sessionID, err) + m.logger.WithError(err).WithFields(logrus.Fields{ + "tool": fnTool.Tool.Name, + "session_id": m.sessionID, + }).Warn("Failed to delete tool from session") } } else { m.mcpServer.DeleteTools(fnTool.Tool.Name) @@ -274,7 +289,10 @@ func (m *PulsarFunctionManager) updateFunctions() { if m.sessionID != "" { err := m.mcpServer.AddSessionTool(m.sessionID, fnTool.Tool, m.handleToolCall(fnTool)) if err != nil { - log.Printf("Failed to add tool %s to session %s: %v", fnTool.Tool.Name, m.sessionID, err) + m.logger.WithError(err).WithFields(logrus.Fields{ + "tool": fnTool.Tool.Name, + "session_id": m.sessionID, + }).Warn("Failed to add tool to session") } } else { m.mcpServer.AddTool(fnTool.Tool, m.handleToolCall(fnTool)) @@ -286,9 +304,15 @@ func (m *PulsarFunctionManager) updateFunctions() { m.mutex.Unlock() if changed { - log.Printf("Updated function %s as MCP tool [%s]", fullName, fnTool.Tool.Name) + m.logger.WithFields(logrus.Fields{ + "function": fullName, + "tool": fnTool.Tool.Name, + }).Info("Updated function as MCP tool") } else { - log.Printf("Added function %s as MCP tool [%s]", fullName, fnTool.Tool.Name) + m.logger.WithFields(logrus.Fields{ + "function": fullName, + "tool": fnTool.Tool.Name, + }).Info("Added function as MCP tool") } } @@ -299,14 +323,20 @@ func (m *PulsarFunctionManager) updateFunctions() { if m.sessionID != "" { err := m.mcpServer.DeleteSessionTools(m.sessionID, fnTool.Tool.Name) if err != nil { - log.Printf("Failed to delete tool %s from session %s: %v", fnTool.Tool.Name, m.sessionID, err) + m.logger.WithError(err).WithFields(logrus.Fields{ + "tool": fnTool.Tool.Name, + "session_id": m.sessionID, + }).Warn("Failed to delete tool from session") } } else { m.mcpServer.DeleteTools(fnTool.Tool.Name) } delete(m.fnToToolMap, fullName) delete(m.failedFunctions, fullName) - log.Printf("Removed function %s from MCP tools [%s]", fullName, fnTool.Tool.Name) + m.logger.WithFields(logrus.Fields{ + "function": fullName, + "tool": fnTool.Tool.Name, + }).Info("Removed function from MCP tools") } } m.mutex.Unlock() @@ -378,7 +408,7 @@ func (m *PulsarFunctionManager) getFunctionsList() ([]*utils.FunctionConfig, err for _, tn := range m.tenantNamespaces { parts := strings.Split(tn, "/") if len(parts) != 2 { - log.Printf("Invalid tenant/namespace format: %s", tn) + m.logger.WithField("tenant_namespace", tn).Warn("Invalid tenant/namespace format") continue } @@ -387,7 +417,10 @@ func (m *PulsarFunctionManager) getFunctionsList() ([]*utils.FunctionConfig, err functions, err := m.getFunctionsInNamespace(tenant, namespace) if err != nil { - log.Printf("Failed to get functions in namespace %s/%s: %v", tenant, namespace, err) + m.logger.WithError(err).WithFields(logrus.Fields{ + "tenant": tenant, + "namespace": namespace, + }).Warn("Failed to get functions in namespace") continue } @@ -441,13 +474,17 @@ func (m *PulsarFunctionManager) getFunctionsInNamespace(tenant, namespace string for _, name := range functionNames { parts := strings.Split(name, "/") if len(parts) != 3 { - log.Printf("Invalid function name format: %s", name) + m.logger.WithField("function_name", name).Warn("Invalid function name format") continue } function, err := m.adminClient.Functions().GetFunction(parts[0], parts[1], parts[2]) if err != nil { - log.Printf("Failed to get function details for %s/%s/%s: %v", parts[0], parts[1], parts[2], err) + m.logger.WithError(err).WithFields(logrus.Fields{ + "tenant": parts[0], + "namespace": parts[1], + "function": parts[2], + }).Warn("Failed to get function details") continue } @@ -483,12 +520,18 @@ func (m *PulsarFunctionManager) convertFunctionToTool(fn *utils.FunctionConfig) inputSchema = DefaultStringSchemaInfo if restError, ok := err.(rest.Error); ok { if restError.Code != 404 { - log.Printf("Failed to get schema for input topic %s: %v", inputTopic, err) + m.logger.WithError(err).WithFields(logrus.Fields{ + "topic": inputTopic, + "direction": "input", + }).Warn("Failed to get schema for topic") schemaFetchSuccess = false schemaFetchErr = errors.Join(schemaFetchErr, err) } } else { - log.Printf("Failed to get schema for input topic %s: %v", inputTopic, err) + m.logger.WithError(err).WithFields(logrus.Fields{ + "topic": inputTopic, + "direction": "input", + }).Warn("Failed to get schema for topic") schemaFetchSuccess = false schemaFetchErr = errors.Join(schemaFetchErr, err) } @@ -504,12 +547,18 @@ func (m *PulsarFunctionManager) convertFunctionToTool(fn *utils.FunctionConfig) outputSchema = DefaultStringSchemaInfo if restError, ok := err.(rest.Error); ok { if restError.Code != 404 { - log.Printf("Failed to get schema for output topic %s: %v", outputTopic, err) + m.logger.WithError(err).WithFields(logrus.Fields{ + "topic": outputTopic, + "direction": "output", + }).Warn("Failed to get schema for topic") schemaFetchSuccess = false schemaFetchErr = errors.Join(schemaFetchErr, err) } } else { - log.Printf("Failed to get schema for output topic %s: %v", outputTopic, err) + m.logger.WithError(err).WithFields(logrus.Fields{ + "topic": outputTopic, + "direction": "output", + }).Warn("Failed to get schema for topic") schemaFetchSuccess = false schemaFetchErr = errors.Join(schemaFetchErr, err) } @@ -688,6 +737,6 @@ func (m *PulsarFunctionManager) GetProducer(topic string) (pulsarclient.Producer } m.producerCache[topic] = newProducer - log.Printf("Created and cached producer for topic: %s", topic) + m.logger.WithField("topic", topic).Info("Created and cached producer for topic") return newProducer, nil } diff --git a/pkg/mcp/pftools/types.go b/pkg/mcp/pftools/types.go index 3e01b5a..55b97ae 100644 --- a/pkg/mcp/pftools/types.go +++ b/pkg/mcp/pftools/types.go @@ -23,6 +23,7 @@ import ( "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" + "github.com/sirupsen/logrus" "github.com/streamnative/pulsarctl/pkg/cmdutils" ) @@ -40,6 +41,7 @@ type PulsarFunctionManager struct { stopCh chan struct{} callInProgressMap map[string]context.CancelFunc mcpServer *server.MCPServer + logger *logrus.Logger readOnly bool defaultTimeout time.Duration circuitBreakers map[string]*CircuitBreaker