diff --git a/cli/internal/scheduleserver/server.go b/cli/internal/scheduleserver/server.go new file mode 100644 index 0000000..6303372 --- /dev/null +++ b/cli/internal/scheduleserver/server.go @@ -0,0 +1,150 @@ +package scheduleserver + +import ( + "fmt" + "net" + "net/http" + "strconv" + + "github.com/nitrictech/suga/cli/internal/netx" +) + +// Server provides HTTP endpoints to manually trigger schedules +type Server struct { + services map[string]ServiceWithSchedules + mux *http.ServeMux + listener net.Listener + port netx.ReservedPort + server *http.Server +} + +// ServiceWithSchedules interface for services that support schedule triggering +type ServiceWithSchedules interface { + GetName() string + TriggerSchedule(index int, async bool) error +} + +// NewServer creates a new schedule trigger server +func NewServer(services map[string]ServiceWithSchedules) (*Server, error) { + // Get an available port + port, err := netx.GetNextPort(netx.MinPort(8000), netx.MaxPort(8999)) + if err != nil { + return nil, fmt.Errorf("failed to find open port: %w", err) + } + + s := &Server{ + services: services, + mux: http.NewServeMux(), + port: port, + } + + s.setupRoutes() + + return s, nil +} + +func (s *Server) setupRoutes() { + // Schedule trigger endpoint: GET /schedules/{serviceId}/{scheduleIndex}?async=true + s.mux.HandleFunc("/schedules/{serviceId}/{scheduleIndex}", s.handleTriggerSchedule) +} + +func (s *Server) handleTriggerSchedule(w http.ResponseWriter, r *http.Request) { + // Only accept GET requests + if r.Method != http.MethodGet { + w.WriteHeader(http.StatusMethodNotAllowed) + fmt.Fprintf(w, "✗ Method not allowed. Use GET to trigger schedules.\n") + return + } + + // Extract path parameters + serviceId := r.PathValue("serviceId") + scheduleIndexStr := r.PathValue("scheduleIndex") + + // Parse schedule index + scheduleIndex, err := strconv.Atoi(scheduleIndexStr) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + fmt.Fprintf(w, "✗ Invalid schedule index: %s\n", scheduleIndexStr) + return + } + + // Check if schedule index is valid (non-negative) + if scheduleIndex < 0 { + w.WriteHeader(http.StatusBadRequest) + fmt.Fprintf(w, "✗ Schedule index must be non-negative\n") + return + } + + // Find the service + svc, ok := s.services[serviceId] + if !ok { + w.WriteHeader(http.StatusNotFound) + fmt.Fprintf(w, "✗ Service '%s' not found\n", serviceId) + return + } + + // Parse async query parameter (defaults to false for synchronous execution) + asyncStr := r.URL.Query().Get("async") + async := false + if asyncStr == "true" || asyncStr == "1" { + async = true + } + + // Trigger the schedule + err = svc.TriggerSchedule(scheduleIndex, async) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + fmt.Fprintf(w, "✗ Failed to trigger schedule %d on service '%s': %v\n", scheduleIndex, serviceId, err) + return + } + + // Success response + w.WriteHeader(http.StatusOK) + if async { + fmt.Fprintf(w, "✓ Schedule %d on service '%s' triggered asynchronously\n", scheduleIndex, serviceId) + } else { + fmt.Fprintf(w, "✓ Schedule %d on service '%s' executed successfully\n", scheduleIndex, serviceId) + } +} + +// Start starts the HTTP server +func (s *Server) Start() error { + addr := fmt.Sprintf("localhost:%d", s.port) + + listener, err := net.Listen("tcp", addr) + if err != nil { + return fmt.Errorf("failed to listen on %s: %w", addr, err) + } + + s.listener = listener + s.server = &http.Server{ + Handler: s.mux, + } + + // Start server in goroutine + go func() { + if err := s.server.Serve(listener); err != nil && err != http.ErrServerClosed { + fmt.Printf("Schedule trigger server error: %v\n", err) + } + }() + + return nil +} + +// Stop stops the HTTP server +func (s *Server) Stop() error { + if s.server != nil { + return s.server.Close() + } + return nil +} + +// GetPort returns the port the server is listening on +func (s *Server) GetPort() int { + return int(s.port) +} + +// GetURL returns the base URL of the schedule trigger server +func (s *Server) GetURL() string { + return fmt.Sprintf("http://localhost:%d", s.port) +} diff --git a/cli/internal/simulation/service/service.go b/cli/internal/simulation/service/service.go index a707d2b..76386a5 100644 --- a/cli/internal/simulation/service/service.go +++ b/cli/internal/simulation/service/service.go @@ -126,7 +126,7 @@ func (s *ServiceSimulation) hasExceededFailureLimit() bool { return len(s.consecutiveFailures) >= s.maxFailures } -func (s *ServiceSimulation) startSchedules(stdoutWriter, stderrorWriter io.Writer) (*cron.Cron, error) { +func (s *ServiceSimulation) startSchedules(stderrorWriter io.Writer) (*cron.Cron, error) { cron := cron.New() for _, schedule := range s.intent.Schedules { @@ -244,7 +244,7 @@ func (s *ServiceSimulation) Start(autoRestart bool) error { s.cmd = srvCommand s.updateStatus(Status_Running) - cron, err := s.startSchedules(stdoutWriter, stderrWriter) + cron, err := s.startSchedules(stderrWriter) if err != nil { s.updateStatus(Status_Fatal) return err @@ -277,6 +277,61 @@ func (s *ServiceSimulation) Start(autoRestart bool) error { return nil } +// TriggerSchedule manually triggers a schedule by index +// If async is true, the schedule runs in a goroutine and returns immediately +// If async is false, waits for the HTTP response +func (s *ServiceSimulation) TriggerSchedule(index int, async bool) error { + // Validate schedule index + if index < 0 || index >= len(s.intent.Schedules) { + return fmt.Errorf("schedule index %d out of range (service has %d schedules)", index, len(s.intent.Schedules)) + } + + // Check if service is running + if s.currentStatus != Status_Running { + return fmt.Errorf("service is not running (current status: %v)", s.currentStatus) + } + + schedule := s.intent.Schedules[index] + + // Build the URL + url := url.URL{ + Scheme: "http", + Host: fmt.Sprintf("localhost:%d", s.port), + Path: schedule.Path, + } + + // Function to execute the schedule + executeSchedule := func() error { + req, err := http.NewRequest(http.MethodPost, url.String(), nil) + if err != nil { + return fmt.Errorf("error creating request: %w", err) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("error sending request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("request returned status %d", resp.StatusCode) + } + + return nil + } + + if async { + // Asynchronous execution + go func() { + _ = executeSchedule() + }() + return nil + } + + // Synchronous execution + return executeSchedule() +} + func NewServiceSimulation(name string, intent schema.ServiceIntent, port netx.ReservedPort, apiPort netx.ReservedPort) (*ServiceSimulation, <-chan ServiceEvent, error) { if intent.Dev == nil { return nil, nil, fmt.Errorf("service does not have a dev configuration and cannot be started") diff --git a/cli/internal/simulation/simulation.go b/cli/internal/simulation/simulation.go index 19e7972..36b1fcf 100644 --- a/cli/internal/simulation/simulation.go +++ b/cli/internal/simulation/simulation.go @@ -16,6 +16,7 @@ import ( "sync" "github.com/nitrictech/suga/cli/internal/netx" + "github.com/nitrictech/suga/cli/internal/scheduleserver" "github.com/nitrictech/suga/cli/internal/simulation/database" "github.com/nitrictech/suga/cli/internal/simulation/middleware" "github.com/nitrictech/suga/cli/internal/simulation/service" @@ -37,11 +38,12 @@ type SimulationServer struct { storagepb.UnimplementedStorageServer pubsubpb.UnimplementedPubsubServer - apiPort netx.ReservedPort - fileServerPort int - services map[string]*service.ServiceSimulation - databaseManager *database.DatabaseManager - servicesWg sync.WaitGroup + apiPort netx.ReservedPort + fileServerPort int + services map[string]*service.ServiceSimulation + databaseManager *database.DatabaseManager + servicesWg sync.WaitGroup + scheduleTriggerSv *scheduleserver.Server } const ( @@ -408,6 +410,64 @@ func styledName(name string, styleFunc func(...string) string) string { return styledNames[name] } +func (s *SimulationServer) startScheduleTriggerServer(output io.Writer) error { + // Check if any service has schedules + hasSchedules := false + for _, serviceIntent := range s.appSpec.ServiceIntents { + if len(serviceIntent.Schedules) > 0 { + hasSchedules = true + break + } + } + + if !hasSchedules { + return nil + } + + // Convert services map to interface map for schedule trigger server + servicesWithSchedules := make(map[string]scheduleserver.ServiceWithSchedules) + for name, svc := range s.services { + servicesWithSchedules[name] = svc + } + + // Create and start the schedule trigger server + triggerServer, err := scheduleserver.NewServer(servicesWithSchedules) + if err != nil { + return fmt.Errorf("failed to create schedule trigger server: %w", err) + } + + err = triggerServer.Start() + if err != nil { + return fmt.Errorf("failed to start schedule trigger server: %w", err) + } + + s.scheduleTriggerSv = triggerServer + + fmt.Fprintf(output, "%s\n\n", style.Purple("Schedule Triggers")) + + // Print clickable trigger URLs for each service's schedules + for serviceName, serviceIntent := range s.appSpec.ServiceIntents { + if len(serviceIntent.Schedules) == 0 { + continue + } + + for i, schedule := range serviceIntent.Schedules { + triggerURL := fmt.Sprintf("%s/schedules/%s/%d", triggerServer.GetURL(), serviceName, i) + fmt.Fprintf(output, "%s %s schedule %d (%s -> %s)\n Trigger: %s\n", + greenCheck, + styledName(serviceName, style.Teal), + i, + style.Gray(schedule.Cron), + style.Gray(fmt.Sprintf("POST %s", schedule.Path)), + style.Cyan(triggerURL)) + } + } + + fmt.Fprint(output, "\n") + + return nil +} + func (s *SimulationServer) Start(output io.Writer) error { err := s.startSugaApis() if err != nil { @@ -449,6 +509,14 @@ func (s *SimulationServer) Start(output io.Writer) error { fmt.Fprint(output, "\n") } + // Start schedule trigger server after services are running + if len(s.services) > 0 { + err = s.startScheduleTriggerServer(output) + if err != nil { + return err + } + } + fmt.Println(style.Gray("Use Ctrl-C to exit\n")) // block on handling service outputs for now @@ -459,6 +527,14 @@ func (s *SimulationServer) Start(output io.Writer) error { // Stop gracefully shuts down the simulation server and cleans up resources func (s *SimulationServer) Stop() error { + // Stop the schedule trigger server first + if s.scheduleTriggerSv != nil { + fmt.Println("Stopping schedule trigger server...") + if err := s.scheduleTriggerSv.Stop(); err != nil { + return fmt.Errorf("failed to stop schedule trigger server: %w", err) + } + } + // Stop services first before stopping database for serviceName, svc := range s.services { if svc != nil {