Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 72 additions & 23 deletions pkg/mcp/pftools/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"encoding/json"
"errors"
"fmt"
"log"
"strings"
"sync"
"time"
Expand All @@ -31,6 +30,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
"github.com/sirupsen/logrus"
"github.com/streamnative/streamnative-mcp-server/pkg/kafka"
"github.com/streamnative/streamnative-mcp-server/pkg/pulsar"
"github.com/streamnative/streamnative-mcp-server/pkg/schema"
Expand Down Expand Up @@ -59,7 +59,7 @@ type Server struct {
MCPServer *server.MCPServer
KafkaSession *kafka.Session
PulsarSession *pulsar.Session
Logger interface{}
Logger *logrus.Logger
}

// NewPulsarFunctionManager creates a new PulsarFunctionManager
Expand Down Expand Up @@ -89,6 +89,11 @@ func NewPulsarFunctionManager(snServer *Server, readOnly bool, options *ManagerO
options = DefaultManagerOptions()
}

logger := snServer.Logger
if logger == nil {
logger = logrus.New()
}

// Create the manager
manager := &PulsarFunctionManager{
adminClient: adminClient,
Expand All @@ -103,6 +108,7 @@ func NewPulsarFunctionManager(snServer *Server, readOnly bool, options *ManagerO
stopCh: make(chan struct{}),
callInProgressMap: make(map[string]context.CancelFunc),
mcpServer: snServer.MCPServer,
logger: logger,
readOnly: readOnly,
defaultTimeout: options.DefaultTimeout,
circuitBreakers: make(map[string]*CircuitBreaker),
Expand All @@ -127,11 +133,11 @@ func (m *PulsarFunctionManager) Stop() {
m.producerMutex.Lock()
defer m.producerMutex.Unlock()
for topic, producer := range m.producerCache {
log.Printf("Closing producer for topic: %s", topic)
m.logger.WithField("topic", topic).Info("Closing producer for topic")
producer.Close()
}
m.producerCache = make(map[string]pulsarclient.Producer)
log.Println("All cached producers closed and cache cleared.")
m.logger.Info("All cached producers closed and cache cleared")
}

// pollFunctions polls for functions periodically
Expand All @@ -157,7 +163,7 @@ func (m *PulsarFunctionManager) updateFunctions() {
// Get all functions
functions, err := m.getFunctionsList()
if err != nil {
log.Printf("Failed to get functions list: %v", err)
m.logger.WithError(err).Warn("Failed to get functions list")

// Check if this is a cluster health error and invoke callback if configured
if (IsClusterUnhealthy(err) || IsAuthError(err) || IsNotFoundError(err)) && m.clusterErrorHandler != nil {
Expand All @@ -176,7 +182,7 @@ func (m *PulsarFunctionManager) updateFunctions() {

configHash, hashErr := computeFunctionConfigHash(fn)
if hashErr != nil {
log.Printf("Failed to compute config hash for function %s: %v", fullName, hashErr)
m.logger.WithError(hashErr).WithField("function", fullName).Warn("Failed to compute config hash")
}

// Check if we already have this function
Expand Down Expand Up @@ -247,9 +253,15 @@ func (m *PulsarFunctionManager) updateFunctions() {
}
if logNow {
if err != nil {
log.Printf("Failed to convert function %s to tool: %v (category=%s)", fullName, failureErr, category)
m.logger.WithError(failureErr).WithFields(logrus.Fields{
"function": fullName,
"category": category,
}).Warn("Failed to convert function to tool")
} else {
log.Printf("Failed to fetch schema for function %s, retry later: %v (category=%s)", fullName, failureErr, category)
m.logger.WithError(failureErr).WithFields(logrus.Fields{
"function": fullName,
"category": category,
}).Warn("Failed to fetch schema for function, retry later")
}
}
continue
Expand All @@ -265,7 +277,10 @@ func (m *PulsarFunctionManager) updateFunctions() {
if m.sessionID != "" {
err := m.mcpServer.DeleteSessionTools(m.sessionID, fnTool.Tool.Name)
if err != nil {
log.Printf("Failed to delete tool %s from session %s: %v", fnTool.Tool.Name, m.sessionID, err)
m.logger.WithError(err).WithFields(logrus.Fields{
"tool": fnTool.Tool.Name,
"session_id": m.sessionID,
}).Warn("Failed to delete tool from session")
}
} else {
m.mcpServer.DeleteTools(fnTool.Tool.Name)
Expand All @@ -274,7 +289,10 @@ func (m *PulsarFunctionManager) updateFunctions() {
if m.sessionID != "" {
err := m.mcpServer.AddSessionTool(m.sessionID, fnTool.Tool, m.handleToolCall(fnTool))
if err != nil {
log.Printf("Failed to add tool %s to session %s: %v", fnTool.Tool.Name, m.sessionID, err)
m.logger.WithError(err).WithFields(logrus.Fields{
"tool": fnTool.Tool.Name,
"session_id": m.sessionID,
}).Warn("Failed to add tool to session")
}
} else {
m.mcpServer.AddTool(fnTool.Tool, m.handleToolCall(fnTool))
Expand All @@ -286,9 +304,15 @@ func (m *PulsarFunctionManager) updateFunctions() {
m.mutex.Unlock()

if changed {
log.Printf("Updated function %s as MCP tool [%s]", fullName, fnTool.Tool.Name)
m.logger.WithFields(logrus.Fields{
"function": fullName,
"tool": fnTool.Tool.Name,
}).Info("Updated function as MCP tool")
} else {
log.Printf("Added function %s as MCP tool [%s]", fullName, fnTool.Tool.Name)
m.logger.WithFields(logrus.Fields{
"function": fullName,
"tool": fnTool.Tool.Name,
}).Info("Added function as MCP tool")
}
}

Expand All @@ -299,14 +323,20 @@ func (m *PulsarFunctionManager) updateFunctions() {
if m.sessionID != "" {
err := m.mcpServer.DeleteSessionTools(m.sessionID, fnTool.Tool.Name)
if err != nil {
log.Printf("Failed to delete tool %s from session %s: %v", fnTool.Tool.Name, m.sessionID, err)
m.logger.WithError(err).WithFields(logrus.Fields{
"tool": fnTool.Tool.Name,
"session_id": m.sessionID,
}).Warn("Failed to delete tool from session")
}
} else {
m.mcpServer.DeleteTools(fnTool.Tool.Name)
}
delete(m.fnToToolMap, fullName)
delete(m.failedFunctions, fullName)
log.Printf("Removed function %s from MCP tools [%s]", fullName, fnTool.Tool.Name)
m.logger.WithFields(logrus.Fields{
"function": fullName,
"tool": fnTool.Tool.Name,
}).Info("Removed function from MCP tools")
}
}
m.mutex.Unlock()
Expand Down Expand Up @@ -378,7 +408,7 @@ func (m *PulsarFunctionManager) getFunctionsList() ([]*utils.FunctionConfig, err
for _, tn := range m.tenantNamespaces {
parts := strings.Split(tn, "/")
if len(parts) != 2 {
log.Printf("Invalid tenant/namespace format: %s", tn)
m.logger.WithField("tenant_namespace", tn).Warn("Invalid tenant/namespace format")
continue
}

Expand All @@ -387,7 +417,10 @@ func (m *PulsarFunctionManager) getFunctionsList() ([]*utils.FunctionConfig, err

functions, err := m.getFunctionsInNamespace(tenant, namespace)
if err != nil {
log.Printf("Failed to get functions in namespace %s/%s: %v", tenant, namespace, err)
m.logger.WithError(err).WithFields(logrus.Fields{
"tenant": tenant,
"namespace": namespace,
}).Warn("Failed to get functions in namespace")
continue
}

Expand Down Expand Up @@ -441,13 +474,17 @@ func (m *PulsarFunctionManager) getFunctionsInNamespace(tenant, namespace string
for _, name := range functionNames {
parts := strings.Split(name, "/")
if len(parts) != 3 {
log.Printf("Invalid function name format: %s", name)
m.logger.WithField("function_name", name).Warn("Invalid function name format")
continue
}

function, err := m.adminClient.Functions().GetFunction(parts[0], parts[1], parts[2])
if err != nil {
log.Printf("Failed to get function details for %s/%s/%s: %v", parts[0], parts[1], parts[2], err)
m.logger.WithError(err).WithFields(logrus.Fields{
"tenant": parts[0],
"namespace": parts[1],
"function": parts[2],
}).Warn("Failed to get function details")
continue
}

Expand Down Expand Up @@ -483,12 +520,18 @@ func (m *PulsarFunctionManager) convertFunctionToTool(fn *utils.FunctionConfig)
inputSchema = DefaultStringSchemaInfo
if restError, ok := err.(rest.Error); ok {
if restError.Code != 404 {
log.Printf("Failed to get schema for input topic %s: %v", inputTopic, err)
m.logger.WithError(err).WithFields(logrus.Fields{
"topic": inputTopic,
"direction": "input",
}).Warn("Failed to get schema for topic")
schemaFetchSuccess = false
schemaFetchErr = errors.Join(schemaFetchErr, err)
}
} else {
log.Printf("Failed to get schema for input topic %s: %v", inputTopic, err)
m.logger.WithError(err).WithFields(logrus.Fields{
"topic": inputTopic,
"direction": "input",
}).Warn("Failed to get schema for topic")
schemaFetchSuccess = false
schemaFetchErr = errors.Join(schemaFetchErr, err)
}
Expand All @@ -504,12 +547,18 @@ func (m *PulsarFunctionManager) convertFunctionToTool(fn *utils.FunctionConfig)
outputSchema = DefaultStringSchemaInfo
if restError, ok := err.(rest.Error); ok {
if restError.Code != 404 {
log.Printf("Failed to get schema for output topic %s: %v", outputTopic, err)
m.logger.WithError(err).WithFields(logrus.Fields{
"topic": outputTopic,
"direction": "output",
}).Warn("Failed to get schema for topic")
schemaFetchSuccess = false
schemaFetchErr = errors.Join(schemaFetchErr, err)
}
} else {
log.Printf("Failed to get schema for output topic %s: %v", outputTopic, err)
m.logger.WithError(err).WithFields(logrus.Fields{
"topic": outputTopic,
"direction": "output",
}).Warn("Failed to get schema for topic")
schemaFetchSuccess = false
schemaFetchErr = errors.Join(schemaFetchErr, err)
}
Expand Down Expand Up @@ -688,6 +737,6 @@ func (m *PulsarFunctionManager) GetProducer(topic string) (pulsarclient.Producer
}

m.producerCache[topic] = newProducer
log.Printf("Created and cached producer for topic: %s", topic)
m.logger.WithField("topic", topic).Info("Created and cached producer for topic")
return newProducer, nil
}
2 changes: 2 additions & 0 deletions pkg/mcp/pftools/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
"github.com/sirupsen/logrus"
"github.com/streamnative/pulsarctl/pkg/cmdutils"
)

Expand All @@ -40,6 +41,7 @@ type PulsarFunctionManager struct {
stopCh chan struct{}
callInProgressMap map[string]context.CancelFunc
mcpServer *server.MCPServer
logger *logrus.Logger
readOnly bool
defaultTimeout time.Duration
circuitBreakers map[string]*CircuitBreaker
Expand Down