diff --git a/changes/unreleased/Fixed-20260218-230320.yaml b/changes/unreleased/Fixed-20260218-230320.yaml
new file mode 100644
index 00000000..48fc2394
--- /dev/null
+++ b/changes/unreleased/Fixed-20260218-230320.yaml
@@ -0,0 +1,3 @@
+kind: Fixed
+body: Replace fixed 100-second sync wait with health-based polling of the subscription status
+time: 2026-02-18T23:03:20.105537+05:30
diff --git a/server/internal/database/wait_for_sync_event_resource.go b/server/internal/database/wait_for_sync_event_resource.go
index b1187e3c..d53a5c9d 100644
--- a/server/internal/database/wait_for_sync_event_resource.go
+++ b/server/internal/database/wait_for_sync_event_resource.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"time"
 
 	"github.com/jackc/pgx/v5"
 
@@ -72,23 +73,50 @@ func (r *WaitForSyncEventResource) Refresh(ctx context.Context, rc *resource.Con
 		return fmt.Errorf("sync event LSN is empty on resource %q", syncEvent.Identifier())
 	}
 
-	// TODO: Set wait limit
-	synced, err := postgres.WaitForSyncEvent(r.ProviderNode, syncEvent.SyncEventLsn, 100).Scalar(ctx, subscriberConn)
-	if errors.Is(err, pgx.ErrNoRows) {
-		return resource.ErrNotFound
-	} else if err != nil {
-		return fmt.Errorf("failed to wait for sync event on subscriber: %w", err)
+	// pollInterval is the timeout handed to each WaitForSyncEvent call; the
+	// subscription status is re-checked between calls.
+	const pollInterval = 10 * time.Second
+
+	for {
+		if err := ctx.Err(); err != nil {
+			return err
+		}
+
+		// Check subscription health first — fail early if broken.
+		// Only statuses where the spock worker is running can make
+		// progress. The others ("disabled", "down") mean sync will
+		// never complete.
+		status, err := postgres.GetSubscriptionStatus(r.ProviderNode, r.SubscriberNode).
+			Scalar(ctx, subscriberConn)
+		if err != nil {
+			return fmt.Errorf("failed to check subscription status: %w", err)
+		}
+		switch status {
+		case postgres.SubStatusInitializing, postgres.SubStatusReplicating, postgres.SubStatusUnknown:
+			// Worker is running — continue waiting
+		default:
+			return fmt.Errorf("subscription has unhealthy status %q: provider=%s subscriber=%s",
+				status, r.ProviderNode, r.SubscriberNode)
+		}
+
+		// Try short wait for sync event with poll interval as timeout
+		synced, err := postgres.WaitForSyncEvent(
+			r.ProviderNode, syncEvent.SyncEventLsn, int(pollInterval.Seconds()),
+		).Scalar(ctx, subscriberConn)
+		if errors.Is(err, pgx.ErrNoRows) {
+			return resource.ErrNotFound
+		}
+		if err != nil {
+			return fmt.Errorf("failed to wait for sync event on subscriber: %w", err)
+		}
+		if synced {
+			return nil
+		}
+
+		// Not yet synced, but subscription is healthy — poll again. The
+		// loop has no deadline of its own; it stops only via ctx, an
+		// error, or an unhealthy status.
 	}
-	if !synced {
-		return fmt.Errorf("replication sync not confirmed: provider=%s subscriber=%s lsn=%s timeout_seconds=%d",
-			r.ProviderNode,
-			r.SubscriberNode,
-			syncEvent.SyncEventLsn,
-			100,
-		)
-	}
-
-	return nil
 }
 
 func (r *WaitForSyncEventResource) Create(ctx context.Context, rc *resource.Context) error {
diff --git a/server/internal/postgres/create_db.go b/server/internal/postgres/create_db.go
index 5ceb9964..fb7cddf0 100644
--- a/server/internal/postgres/create_db.go
+++ b/server/internal/postgres/create_db.go
@@ -395,6 +395,18 @@ func AdvanceReplicationSlotToLSN(databaseName, providerNode, subscriberNode stri
 	}
 }
 
+// GetSubscriptionStatus returns a query that selects the status column of
+// spock.sub_show_status() for the subscription between providerNode and
+// subscriberNode. Compare the result against the SubStatus* constants.
+func GetSubscriptionStatus(providerNode, subscriberNode string) Query[string] {
+	return Query[string]{
+		SQL: `SELECT (spock.sub_show_status(@sub_name)).status;`,
+		Args: pgx.NamedArgs{
+			"sub_name": subName(providerNode, subscriberNode),
+		},
+	}
+}
+
 func EnableSubscription(providerNode, subscriberNode string, disabled bool) ConditionalStatement {
 	return ConditionalStatement{
 		If: SubscriptionNeedsEnable(providerNode, subscriberNode, disabled),
diff --git a/server/internal/postgres/info.go b/server/internal/postgres/info.go
index 04a86c76..c27c1672 100644
--- a/server/internal/postgres/info.go
+++ b/server/internal/postgres/info.go
@@ -18,6 +18,16 @@ func GetSpockReadOnly() Query[string] {
 	}
 }
 
+// Spock subscription statuses returned by spock.sub_show_status().
+// See: https://github.com/pgEdge/spock/blob/main/src/spock_functions.c
+const (
+	SubStatusInitializing = "initializing" // Worker running, sync in progress
+	SubStatusReplicating  = "replicating"  // Worker running, sync ready
+	SubStatusUnknown      = "unknown"      // Worker running, no sync status record
+	SubStatusDisabled     = "disabled"     // Worker not running, subscription disabled
+	SubStatusDown         = "down"         // Worker not running, subscription enabled
+)
+
 type SubscriptionStatus struct {
 	SubscriptionName string `json:"subscription_name"`
 	Status           string `json:"status"`