Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changes/unreleased/Fixed-20260218-230320.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
kind: Fixed
body: Replace fixed 100-second sync wait with configurable health-based polling mechanism
time: 2026-02-18T23:03:20.105537+05:30
56 changes: 40 additions & 16 deletions server/internal/database/wait_for_sync_event_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/jackc/pgx/v5"

Expand Down Expand Up @@ -72,23 +73,46 @@ func (r *WaitForSyncEventResource) Refresh(ctx context.Context, rc *resource.Con
return fmt.Errorf("sync event LSN is empty on resource %q", syncEvent.Identifier())
}

// TODO: Set wait limit
synced, err := postgres.WaitForSyncEvent(r.ProviderNode, syncEvent.SyncEventLsn, 100).Scalar(ctx, subscriberConn)
if errors.Is(err, pgx.ErrNoRows) {
return resource.ErrNotFound
} else if err != nil {
return fmt.Errorf("failed to wait for sync event on subscriber: %w", err)
const pollInterval = 10 * time.Second

for {
if ctx.Err() != nil {
return ctx.Err()
}

// Check subscription health first — fail early if broken.
// Only statuses where the spock worker is running can make
// progress. The others ("disabled", "down") mean sync will
// never complete.
status, err := postgres.GetSubscriptionStatus(r.ProviderNode, r.SubscriberNode).
Scalar(ctx, subscriberConn)
if err != nil {
return fmt.Errorf("failed to check subscription status: %w", err)
}
switch status {
case postgres.SubStatusInitializing, postgres.SubStatusReplicating, postgres.SubStatusUnknown:
// Worker is running — continue waiting
default:
return fmt.Errorf("subscription has unhealthy status %q: provider=%s subscriber=%s",
status, r.ProviderNode, r.SubscriberNode)
}

// Try short wait for sync event with poll interval as timeout
synced, err := postgres.WaitForSyncEvent(
r.ProviderNode, syncEvent.SyncEventLsn, int(pollInterval.Seconds()),
).Scalar(ctx, subscriberConn)
if errors.Is(err, pgx.ErrNoRows) {
return resource.ErrNotFound
}
if err != nil {
return fmt.Errorf("failed to wait for sync event on subscriber: %w", err)
}
if synced {
return nil
}

// Not yet synced, but subscription is healthy — continue waiting
}
if !synced {
return fmt.Errorf("replication sync not confirmed: provider=%s subscriber=%s lsn=%s timeout_seconds=%d",
r.ProviderNode,
r.SubscriberNode,
syncEvent.SyncEventLsn,
100,
)
}

return nil
}

func (r *WaitForSyncEventResource) Create(ctx context.Context, rc *resource.Context) error {
Expand Down
10 changes: 10 additions & 0 deletions server/internal/postgres/create_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,16 @@ func AdvanceReplicationSlotToLSN(databaseName, providerNode, subscriberNode stri
}
}

// GetSubscriptionStatus builds a query that reads the status field reported
// by spock.sub_show_status for a single subscription.
//
// The subscription is identified by the name that subName derives from
// providerNode and subscriberNode; that name is bound to the @sub_name
// named argument rather than interpolated into the SQL text.
func GetSubscriptionStatus(providerNode, subscriberNode string) Query[string] {
	args := pgx.NamedArgs{
		"sub_name": subName(providerNode, subscriberNode),
	}

	return Query[string]{
		SQL:  `SELECT (spock.sub_show_status(@sub_name)).status;`,
		Args: args,
	}
}

func EnableSubscription(providerNode, subscriberNode string, disabled bool) ConditionalStatement {
return ConditionalStatement{
If: SubscriptionNeedsEnable(providerNode, subscriberNode, disabled),
Expand Down
10 changes: 10 additions & 0 deletions server/internal/postgres/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ func GetSpockReadOnly() Query[string] {
}
}

// Spock subscription statuses, as reported in the status field of
// spock.sub_show_status().
// See: https://github.com/pgEdge/spock/blob/main/src/spock_functions.c
//
// The first three values describe a subscription whose worker is running;
// the last two describe a subscription whose worker is not running.
const (
SubStatusInitializing = "initializing" // Worker running, sync in progress
SubStatusReplicating = "replicating" // Worker running, sync ready
SubStatusUnknown = "unknown" // Worker running, no sync status record
SubStatusDisabled = "disabled" // Worker not running, subscription disabled
SubStatusDown = "down" // Worker not running, subscription enabled
)

type SubscriptionStatus struct {
SubscriptionName string `json:"subscription_name"`
Status string `json:"status"`
Expand Down