Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ type Config struct {

// OAuth providers keyed by provider name (e.g. "google", "okta").
OAuthProviders map[string]*OAuthProviderConfig

// Engine is an optional engine lifecycle manager used by the workflow
// deploy/stop endpoints to actually start and stop workflow engines.
Engine EngineRunner
}
Comment on lines 22 to 29
Copy link

Copilot AI Feb 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description says only cmd/server/main.go is modified, but this PR also changes the api/ package (router + workflow handler). Please update the PR description/test plan to reflect the additional files/behavior being changed, since it impacts review scope and release notes.

Copilot uses AI. Check for mistakes.

// Stores groups all store interfaces needed by the API.
Expand Down Expand Up @@ -109,6 +113,9 @@ func NewRouterWithIAM(stores Stores, cfg Config, iamResolver *iam.IAMResolver) h

// --- Workflows ---
wfH := NewWorkflowHandler(stores.Workflows, stores.Projects, stores.Memberships, permissions)
if cfg.Engine != nil {
wfH.WithEngine(cfg.Engine)
}
mux.Handle("POST /api/v1/projects/{pid}/workflows", mw.RequireAuth(http.HandlerFunc(wfH.Create)))
mux.Handle("GET /api/v1/workflows", mw.RequireAuth(http.HandlerFunc(wfH.ListAll)))
mux.Handle("GET /api/v1/projects/{pid}/workflows", mw.RequireAuth(http.HandlerFunc(wfH.ListInProject)))
Expand Down
27 changes: 27 additions & 0 deletions api/workflow_handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package api

import (
"context"
"encoding/json"
"errors"
"net/http"
Expand All @@ -11,12 +12,20 @@ import (
"github.com/google/uuid"
)

// EngineRunner is an optional engine lifecycle manager that the workflow handler
// uses to actually start and stop workflow engines when deploying/stopping workflows.
type EngineRunner interface {
DeployWorkflow(ctx context.Context, workflowID uuid.UUID) error
StopWorkflow(ctx context.Context, workflowID uuid.UUID) error
}

// WorkflowHandler handles workflow CRUD and lifecycle endpoints.
type WorkflowHandler struct {
workflows store.WorkflowStore
projects store.ProjectStore
memberships store.MembershipStore
permissions *PermissionService
engine EngineRunner
}

// NewWorkflowHandler creates a new WorkflowHandler.
Expand All @@ -29,6 +38,12 @@ func NewWorkflowHandler(workflows store.WorkflowStore, projects store.ProjectSto
}
}

// WithEngine sets the optional engine runner used by Deploy and Stop.
func (h *WorkflowHandler) WithEngine(engine EngineRunner) *WorkflowHandler {
h.engine = engine
return h
}

// Create handles POST /api/v1/projects/{pid}/workflows.
func (h *WorkflowHandler) Create(w http.ResponseWriter, r *http.Request) {
user := UserFromContext(r.Context())
Expand Down Expand Up @@ -259,6 +274,12 @@ func (h *WorkflowHandler) Deploy(w http.ResponseWriter, r *http.Request) {
WriteError(w, http.StatusInternalServerError, "internal error")
return
}
if h.engine != nil {
if err := h.engine.DeployWorkflow(r.Context(), id); err != nil {
WriteError(w, http.StatusInternalServerError, "failed to deploy workflow engine")
return
}
}
Comment on lines +277 to +282
Copy link

Copilot AI Feb 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New behavior when h.engine != nil (calling DeployWorkflow/StopWorkflow and handling errors) isn’t covered by existing tests in api/workflow_handler_test.go (which only exercises the nil-engine path). Add tests for the engine-runner path, including success and failure cases, to prevent regressions and to validate HTTP status mapping.

Copilot generated this review using guidance from organization custom instructions.
wf.Status = store.WorkflowStatusActive
wf.UpdatedAt = time.Now()
if err := h.workflows.Update(r.Context(), wf); err != nil {
Expand All @@ -284,6 +305,12 @@ func (h *WorkflowHandler) Stop(w http.ResponseWriter, r *http.Request) {
WriteError(w, http.StatusInternalServerError, "internal error")
return
}
if h.engine != nil {
if err := h.engine.StopWorkflow(r.Context(), id); err != nil {
WriteError(w, http.StatusInternalServerError, "failed to stop workflow engine")
return
}
}
wf.Status = store.WorkflowStatusStopped
wf.UpdatedAt = time.Now()
if err := h.workflows.Update(r.Context(), wf); err != nil {
Expand Down
144 changes: 126 additions & 18 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
"context"
"database/sql"
"encoding/json"
"errors"

Check failure on line 7 in cmd/server/main.go

View workflow job for this annotation

GitHub Actions / Test (Go 1.26)

other declaration of errors

Check failure on line 7 in cmd/server/main.go

View workflow job for this annotation

GitHub Actions / Build

other declaration of errors
"flag"
"errors"

Check failure on line 9 in cmd/server/main.go

View workflow job for this annotation

GitHub Actions / Test (Go 1.26)

"errors" imported and not used

Check failure on line 9 in cmd/server/main.go

View workflow job for this annotation

GitHub Actions / Test (Go 1.26)

errors redeclared in this block

Check failure on line 9 in cmd/server/main.go

View workflow job for this annotation

GitHub Actions / Build

"errors" imported and not used

Check failure on line 9 in cmd/server/main.go

View workflow job for this annotation

GitHub Actions / Build

errors redeclared in this block
Copy link

Copilot AI Feb 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The import block includes errors twice, which will make this file fail to compile (imported and not used/duplicate import). Remove the duplicate and keep a single errors import.

Suggested change
"errors"

Copilot uses AI. Check for mistakes.
"fmt"
"log"
"log/slog"
Expand All @@ -24,7 +25,7 @@
"github.com/GoCodeAlone/workflow/ai"
copilotai "github.com/GoCodeAlone/workflow/ai/copilot"
"github.com/GoCodeAlone/workflow/ai/llm"
workflowapi "github.com/GoCodeAlone/workflow/api"
apihandler "github.com/GoCodeAlone/workflow/api"
"github.com/GoCodeAlone/workflow/audit"
"github.com/GoCodeAlone/workflow/billing"
"github.com/GoCodeAlone/workflow/bundle"
Expand Down Expand Up @@ -82,10 +83,11 @@
anthropicModel = flag.String("anthropic-model", "", "Anthropic model name")

// Multi-workflow mode flags
databaseDSN = flag.String("database-dsn", "", "PostgreSQL connection string for multi-workflow mode")
jwtSecret = flag.String("jwt-secret", "", "JWT signing secret for API authentication")
adminEmail = flag.String("admin-email", "", "Initial admin user email (first-run bootstrap)")
adminPassword = flag.String("admin-password", "", "Initial admin user password (first-run bootstrap)")
databaseDSN = flag.String("database-dsn", "", "PostgreSQL connection string for multi-workflow mode")
jwtSecret = flag.String("jwt-secret", "", "JWT signing secret for API authentication")
adminEmail = flag.String("admin-email", "", "Initial admin user email (first-run bootstrap)")
adminPassword = flag.String("admin-password", "", "Initial admin user password (first-run bootstrap)")
multiWorkflowAddr = flag.String("multi-workflow-addr", ":8090", "HTTP listen address for multi-workflow REST API")

// License flags
licenseKey = flag.String("license-key", "", "License key for the workflow engine (or set WORKFLOW_LICENSE_KEY env var)")
Expand Down Expand Up @@ -1335,25 +1337,131 @@
}))

if *databaseDSN != "" {
// Multi-workflow mode: PG-backed engine manager with REST API.
logger.Info("Starting in multi-workflow mode")

if err := runMultiWorkflow(logger); err != nil {
log.Fatalf("Multi-workflow error: %v", err)
// Multi-workflow mode: connect to PostgreSQL, run migrations, start the
// REST API router on a dedicated port. Single-config mode is skipped.
if *jwtSecret == "" {
log.Fatal("multi-workflow mode: --jwt-secret is required")
}
logger.Info("Starting in multi-workflow mode",
"database_dsn_set", true,
"admin_email_set", *adminEmail != "",
"api_addr", *multiWorkflowAddr,
)
dbCtx, dbCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer dbCancel()
pgStore, pgErr := evstore.NewPGStore(dbCtx, evstore.PGConfig{URL: *databaseDSN})
if pgErr != nil {
log.Fatalf("multi-workflow mode: failed to connect to PostgreSQL: %v", pgErr) //nolint:gocritic // exitAfterDefer: intentional, cleanup is best-effort
}
migrator := evstore.NewMigrator(pgStore.Pool())
if mErr := migrator.Migrate(dbCtx); mErr != nil {
log.Fatalf("multi-workflow mode: database migration failed: %v", mErr)
}
logger.Info("multi-workflow mode: database migrations applied")

// Bootstrap admin user on first run.
if *adminEmail != "" && *adminPassword != "" {
_, lookupErr := pgStore.Users().GetByEmail(context.Background(), *adminEmail)
switch {
case errors.Is(lookupErr, evstore.ErrNotFound):
Comment on lines +1363 to +1366
Copy link

Copilot AI Feb 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Admin bootstrap looks up the user with context.Background() and uses the raw --admin-email value. Consider reusing the existing timed context (dbCtx) and normalizing the email (trim + lower-case) to match the auth handlers’ behavior, otherwise the bootstrap user may not be able to log in if the stored email casing differs.

Copilot uses AI. Check for mistakes.
hash, hashErr := bcrypt.GenerateFromPassword([]byte(*adminPassword), bcrypt.DefaultCost)
if hashErr != nil {
log.Fatalf("multi-workflow mode: failed to hash admin password: %v", hashErr)
}
now := time.Now()
adminUser := &evstore.User{
ID: uuid.New(),
Email: *adminEmail,
PasswordHash: string(hash),
DisplayName: "Admin",
Active: true,
CreatedAt: now,
UpdatedAt: now,
}
if createErr := pgStore.Users().Create(context.Background(), adminUser); createErr != nil {
logger.Warn("multi-workflow mode: failed to create admin user (may already exist)", "error", createErr)
Comment on lines +1381 to +1382
Copy link

Copilot AI Feb 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Admin bootstrap creates the user with context.Background(), so these DB operations ignore the connection/migration timeout and any shutdown cancellation. Use the same bounded context used for startup (or a derived context) so a hung DB call can’t block shutdown indefinitely.

Copilot uses AI. Check for mistakes.
} else {
logger.Info("multi-workflow mode: created bootstrap admin user", "email", *adminEmail)
}
case lookupErr != nil:
log.Fatalf("multi-workflow mode: failed to check for admin user: %v", lookupErr)
default:
logger.Info("multi-workflow mode: admin user already exists, skipping bootstrap", "email", *adminEmail)
}
}
fmt.Println("Shutdown complete")

engineBuilder := func(cfg *config.WorkflowConfig, lg *slog.Logger) (*workflow.StdEngine, modular.Application, error) {
eng, _, _, buildErr := buildEngine(cfg, lg)
if buildErr != nil {
return nil, nil, buildErr
}
app := eng.GetApp()
return eng, app, nil
}
engineMgr := workflow.NewWorkflowEngineManager(pgStore.Workflows(), pgStore.CrossWorkflowLinks(), logger, engineBuilder)

apiRouter := apihandler.NewRouter(apihandler.Stores{
Users: pgStore.Users(),
Sessions: pgStore.Sessions(),
Companies: pgStore.Companies(),
Projects: pgStore.Projects(),
Workflows: pgStore.Workflows(),
Memberships: pgStore.Memberships(),
Links: pgStore.CrossWorkflowLinks(),
Executions: pgStore.Executions(),
Logs: pgStore.Logs(),
Audit: pgStore.Audit(),
IAM: pgStore.IAM(),
}, apihandler.Config{JWTSecret: *jwtSecret, Engine: engineMgr})

// Bind the listener eagerly so port conflicts are detected before the
// deferred cleanup is registered (fail-fast instead of silent goroutine death).
listener, listenErr := net.Listen("tcp", *multiWorkflowAddr)
if listenErr != nil {
log.Fatalf("multi-workflow mode: failed to listen on %s: %v", *multiWorkflowAddr, listenErr)
}

apiServer := &http.Server{
Handler: apiRouter,
ReadHeaderTimeout: 10 * time.Second,
}
go func() {
logger.Info("multi-workflow API listening", "addr", *multiWorkflowAddr)
if sErr := apiServer.Serve(listener); sErr != nil && sErr != http.ErrServerClosed {
logger.Error("multi-workflow API server error", "error", sErr)
}
}()
defer func() {
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer shutdownCancel()
if sErr := apiServer.Shutdown(shutdownCtx); sErr != nil {
logger.Warn("multi-workflow API server shutdown error", "error", sErr)
}
if sErr := engineMgr.StopAll(shutdownCtx); sErr != nil {
logger.Warn("multi-workflow engine manager shutdown error", "error", sErr)
}
if shutdownCtx.Err() == context.DeadlineExceeded {
logger.Warn("multi-workflow shutdown timed out; some in-flight operations may be incomplete")
}
pgStore.Close()
}()

// Block until a termination signal is received, then let deferred cleanup run.
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
fmt.Printf("Multi-workflow API on %s\n", *multiWorkflowAddr)
<-sigCh
fmt.Println("Shutting down multi-workflow mode...")
return
}

// Existing single-config behavior
cfg, err := loadConfig(logger)
if err != nil {
log.Fatalf("Configuration error: %v", err)
log.Fatalf("Configuration error: %v", err) //nolint:gocritic // exitAfterDefer: intentional, cleanup is best-effort
}

app, err := setup(logger, cfg)
if err != nil {
log.Fatalf("Setup error: %v", err)
log.Fatalf("Setup error: %v", err) //nolint:gocritic // exitAfterDefer: intentional, cleanup is best-effort
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -1448,7 +1556,7 @@
secret = "dev-secret-change-me"
logger.Error("No JWT secret configured — using insecure default; set JWT_SECRET env var or -jwt-secret flag")
Comment on lines 1556 to 1557
Copy link

Copilot AI Feb 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

runMultiWorkflow still exists and is no longer referenced from main(), but it’s being kept partially updated (e.g., switching to apihandler.*). This risks the inlined multi-workflow startup logic and runMultiWorkflow drifting further apart (and runMultiWorkflow currently allows an insecure default JWT secret). Consider deleting runMultiWorkflow or refactoring main() to call it so there’s a single implementation to maintain.

Suggested change
secret = "dev-secret-change-me"
logger.Error("No JWT secret configured — using insecure default; set JWT_SECRET env var or -jwt-secret flag")
log.Fatal("No JWT secret configured — refusing to start; set JWT_SECRET env var or -jwt-secret flag")

Copilot uses AI. Check for mistakes.
}
stores := workflowapi.Stores{
stores := apihandler.Stores{
Users: pg.Users(),
Sessions: pg.Sessions(),
Companies: pg.Companies(),
Expand All @@ -1461,13 +1569,13 @@
Audit: pg.Audit(),
IAM: pg.IAM(),
}
apiCfg := workflowapi.Config{
apiCfg := apihandler.Config{
JWTSecret: secret,
JWTIssuer: "workflow-server",
AccessTTL: 15 * time.Minute,
RefreshTTL: 7 * 24 * time.Hour,
}
apiRouter := workflowapi.NewRouter(stores, apiCfg)
apiRouter := apihandler.NewRouter(stores, apiCfg)

// 7. Set up admin UI and management infrastructure for workflow management
singleCfg, err := loadConfig(logger)
Expand Down
Loading