
feat: improve quorum loss disaster recovery and host removal resilience #267

Merged
tsivaprasad merged 4 commits into main from PLAT-423-add-remove-host-option-to-update-database-endpoint on Feb 18, 2026

Conversation

Contributor

@tsivaprasad tsivaprasad commented Feb 16, 2026

Summary

This PR improves disaster recovery for quorum loss scenarios by adding ForceNewCluster safety (sentinel file, etcd auth handling), continue-on-error handling in host removal, orphaned replication slot cleanup, and quorum health reporting.
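
To make the sentinel-file behavior concrete, here is a minimal sketch of the guard; the function and file names below are illustrative only and do not match the actual code in server/internal/etcd/embedded.go.

```go
// Illustrative sketch of the ForceNewCluster sentinel guard. Names such as
// forceNewClusterApplied and "force-new-cluster.applied" are placeholders,
// not the real identifiers used in the embedded etcd server.
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// forceNewClusterApplied reports whether the sentinel file already exists,
// i.e. a previous start already rebuilt the cluster from this member.
func forceNewClusterApplied(dataDir string) bool {
	_, err := os.Stat(filepath.Join(dataDir, "force-new-cluster.applied"))
	return err == nil
}

// maybeForceNewCluster decides whether to honor the ForceNewCluster setting.
// The flag is applied at most once: the sentinel is written when it is
// applied, so a restart loop cannot repeatedly rebuild the member.
func maybeForceNewCluster(dataDir string, requested bool) (bool, error) {
	if !requested || forceNewClusterApplied(dataDir) {
		return false, nil
	}
	sentinel := filepath.Join(dataDir, "force-new-cluster.applied")
	if err := os.WriteFile(sentinel, []byte("applied\n"), 0o600); err != nil {
		return false, fmt.Errorf("write force-new-cluster sentinel: %w", err)
	}
	return true, nil
}

func main() {
	dir, _ := os.MkdirTemp("", "etcd-data")
	first, _ := maybeForceNewCluster(dir, true)
	second, _ := maybeForceNewCluster(dir, true)
	fmt.Println(first, second) // true false
}
```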

Changes

  • Add replication slot cleanup in update-database workflow when removing hosts
  • Add orphaned replication slot cleanup on provider nodes when subscriber nodes are removed
  • Add CleanupOrphanedSlots activity for dropping slots on surviving providers
  • Report total_members, started_members, has_quorum in etcd health check endpoint
  • Add post-recovery validation that marks databases as degraded when subscriptions are unhealthy
  • Add ForceNewCluster sentinel file to prevent repeated recovery attempts on restart
  • Handle etcd auth disable/re-enable during ForceNewCluster so in-process v3client works with auth-enabled clusters
  • Switch to continue-on-error in removeHostFromDatabases using errors.Join() so a single database failure doesn't block cleanup of others (see the sketch below)
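
A rough sketch of the continue-on-error aggregation is below; the updateDatabase callback and surrounding names are placeholders, not the actual workflow code.

```go
// Illustrative sketch of the continue-on-error pattern; updateDatabase is a
// stand-in for the real update-database workflow call.
package main

import (
	"errors"
	"fmt"
	"log"
)

func removeHostFromDatabases(hostID string, databases []string,
	updateDatabase func(db, host string) error) error {

	var errs []error
	for _, db := range databases {
		if err := updateDatabase(db, hostID); err != nil {
			// Record the failure and keep going so one broken database
			// does not block cleanup of the remaining ones.
			log.Printf("update-database failed: db=%s host=%s err=%v", db, hostID, err)
			errs = append(errs, fmt.Errorf("database %s: %w", db, err))
			continue
		}
	}
	// errors.Join returns nil when errs is empty, so success stays a nil error.
	return errors.Join(errs...)
}

func main() {
	err := removeHostFromDatabases("host-1", []string{"storefront", "inventory"},
		func(db, host string) error {
			if db == "storefront" {
				return fmt.Errorf("instance unreachable")
			}
			return nil
		})
	fmt.Println(err) // database storefront: instance unreachable
}
```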

Testing

Verification:

  1. Created a 3-node cluster

  2. Created DB and inserted some data

  3. Verified the DB entries are in sync across the other nodes

  4. Deleted host-1 & host-3; host-2 is now the only surviving host and the cluster has lost quorum

  5. Recovering from host-2

    On host-2, remove the dead swarm nodes:

        sudo docker swarm init --force-new-cluster --advertise-addr 192.168.104.3
        sudo docker node ls
        docker node demote lima-host-1 lima-host-3
        docker node rm --force lima-host-1 lima-host-3

    Clean up old services that can't run (constrained to dead nodes):

        sudo docker service rm control-plane_host-1 control-plane_host-3
        sudo docker service rm postgres-storefront-n1-689qacsi postgres-storefront-n3-ant97dj4

    Recreate the registry:

        sudo docker service rm registry ghcr-mirror 2>/dev/null
        sudo docker service create --name registry \
            --constraint 'node.hostname==lima-host-2' \
            --publish published=5000,target=5000 \
            --mount type=volume,source=registry-data,target=/var/lib/registry \
            registry:2

    Copy the build files:

        mkdir -p /tmp/control-plane-build

    From the build machine: `scp -F <ssh_config_file> <source_file> <remote_host>:<remote_path>`, for example:

        scp -F ~/.lima/host-2/ssh.config control-plane/docker/control-plane/Dockerfile lima-host-2:/tmp/control-plane-build/
        scp -F ~/.lima/host-2/ssh.config control-plane/dist/control-plane_0.6.2_linux_arm64.tar.gz lima-host-2:/tmp/control-plane-build/

    Build and push the control-plane image:

        cd /tmp/control-plane-build && sudo docker build . --build-arg=ARCHIVE_VERSION=0.6.2 --tag=127.0.0.1:5000/control-plane:latest --push

    Recreate the control-plane service with ForceNewCluster=true:

        sudo docker service rm control-plane_host-2
        sudo docker service create \
            --name control-plane_host-2 \
            --constraint 'node.hostname==lima-host-2' \
            --env PGEDGE_IPV4_ADDRESS=192.168.104.3 \
            --env PGEDGE_HOST_ID=host-2 \
            --env PGEDGE_DATA_DIR=/data/control-plane \
            --env PGEDGE_DOCKER_SWARM__IMAGE_REPOSITORY_HOST=127.0.0.1:5001/pgedge \
            --env PGEDGE_ETCD_SERVER__FORCE_NEW_CLUSTER=true \
            --mount type=bind,source=/data/control-plane,destination=/data/control-plane \
            --mount type=bind,source=/var/run/docker.sock,destination=/var/run/docker.sock \
            --network host \
            127.0.0.1:5000/control-plane:latest run

6. Verify API works:
  `curl http://192.168.105.3:3000/v1/databases`

HTTP/1.1 200 OK
Content-Length: 406
Content-Type: application/json
Date: Mon, 16 Feb 2026 09:15:15 GMT

{
databases: [
{
created_at: "2026-02-16T06:52:20Z"
id: "storefront"
instances: [
{
host_id: "host-1"
id: "storefront-n1-689qacsi"
node_name: "n1"
state: "unknown"
}
{
host_id: "host-2"
id: "storefront-n2-9ptayhma"
node_name: "n2"
state: "available"
}
{
host_id: "host-3"
id: "storefront-n3-ant97dj4"
node_name: "n3"
state: "unknown"
}
]
state: "available"
updated_at: "2026-02-16T09:14:00Z"
}
]
}


7. Remove unknown-state hosts and DB instances:
  Use the update-database API with the remove_host query parameter to remove the unknown-state hosts and DB instances from the cluster:

curl -X POST "http://192.168.105.4:3000/v1/databases/storefront?remove_host=host-1&remove_host=host-3" \
  -H "Content-Type: application/json" \
  -d @../demo/recovery/db_update.json


curl -X DELETE http://192.168.105.3:3000/v1/hosts/host-1?force=true
curl -X DELETE http://192.168.105.3:3000/v1/hosts/host-3?force=true

  Verify the list-databases and list-hosts APIs:
  `curl http://192.168.105.3:3000/v1/databases`

HTTP/1.1 200 OK
Content-Length: 230
Content-Type: application/json
Date: Mon, 16 Feb 2026 09:02:39 GMT

{
databases: [
{
created_at: "2026-02-16T06:52:20Z"
id: "storefront"
instances: [
{
host_id: "host-2"
id: "storefront-n2-9ptayhma"
node_name: "n2"
state: "available"
}
]
state: "available"
updated_at: "2026-02-16T07:48:35Z"
}
]
}


`curl http://192.168.105.3:3000/v1/hosts`

HTTP/1.1 200 OK
Content-Length: 1094
Content-Type: application/json
Date: Mon, 16 Feb 2026 08:19:49 GMT

{
hosts: [
{
cohort: {
control_available: true
member_id: "1rgb65viq0eifwjo9m29vyxyk"
type: "swarm"
}
cpus: 1
data_dir: "/data/control-plane"
default_pgedge_version: {
postgres_version: "18.1"
spock_version: "5"
}
etcd_mode: "server"
hostname: "lima-host-2"
id: "host-2"
ipv4_address: "192.168.104.3"
memory: "3.8GB"
orchestrator: "swarm"
status: {
components: {
docker: {
details: {
containers: 9
containers_paused: 0
containers_running: 4
containers_stopped: 5
swarm.control_available: true
swarm.error: ""
swarm.local_node_state: "active"
}
error: ""
healthy: true
}
etcd: {
details: {
alarms: []
errors: null
has_quorum: true
started_members: 1
total_members: 1
}
error: ""
healthy: true
}
}
state: "healthy"
updated_at: "2026-02-16T08:19:43Z"
}
supported_pgedge_versions: [
{
postgres_version: "16.10"
spock_version: "5"
}
{
postgres_version: "16.11"
spock_version: "5"
}
{
postgres_version: "17.6"
spock_version: "5"
}
{
postgres_version: "17.7"
spock_version: "5"
}
{
postgres_version: "18.0"
spock_version: "5"
}
{
postgres_version: "18.1"
spock_version: "5"
}
]
}
]
}

8. Create fresh host-1 and host-3 machines
 On host-2, get the swarm manager join token:

docker swarm join-token manager
docker swarm join --token SWMTKN-1-4jh28zd1xrxuntza3krxmhqthljwlvm1mius1qgu62j0gi1z95-9cf5hismsv5yosjtznu5t5e0p 192.168.104.3:2377


On host-1 and host-3, run the join command to rejoin the swarm cluster. After joining, verify the nodes are active in the swarm cluster using `docker node ls` on host-2.
 
9. Join the control plane cluster

  `JOIN_TOKEN="$(curl http://host-2:3000/v1/cluster/join-token)"`

  Join host-1 and host-3:

    ```
    curl -X POST http://host-1:3000/v1/cluster/join \
        -H 'Content-Type:application/json' \
        --data "${JOIN_TOKEN}"

    curl -X POST http://host-3:3000/v1/cluster/join \
        -H 'Content-Type:application/json' \
        --data "${JOIN_TOKEN}"
    ```
10. Use the update-database API to add host-1 and host-3 back to the database cluster:
curl -X POST http://host-1:3000/v1/databases/storefront \
-H 'Content-Type:application/json' \
--data '{
  "spec": {
    "nodes": [
      {"name":"n1","host_ids":["host-1"]},
      {"name":"n2","host_ids":["host-2"]},
      {"name":"n3","host_ids":["host-3"]}
    ]
  }
}'

Verify list-databases API:

curl http://192.168.105.3:3000/v1/databases

HTTP/1.1 200 OK
Content-Length: 406
Content-Type: application/json
Date: Mon, 16 Feb 2026 09:15:15 GMT

{
  databases: [
    {
      created_at: "2026-02-16T06:52:20Z"
      id: "storefront"
      instances: [
        {
          host_id: "host-1"
          id: "storefront-n1-689qacsi"
          node_name: "n1"
          state: "available"
        }
        {
          host_id: "host-2"
          id: "storefront-n2-9ptayhma"
          node_name: "n2"
          state: "available"
        }
        {
          host_id: "host-3"
          id: "storefront-n3-ant97dj4"
          node_name: "n3"
          state: "available"
        }
      ]
      state: "available"
      updated_at: "2026-02-16T09:14:00Z"
    }
  ]
}

Also verified database is in sync across all 3 nodes.

Checklist

  • Tests added or updated (unit and/or e2e, as needed)
  • Changelog

PLAT-423

@coderabbitai
Copy link

coderabbitai bot commented Feb 16, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Adds a remove_host query parameter to UpdateDatabase, implements ReplicationSlotResource and Postgres slot helpers (terminate/drop/check), introduces CleanupInstance activity and orphan-instance cleanup, changes remove-host orchestration to aggregate per-database errors and continue, extends tests/golden fixtures for replication slots, adds EtcdServer.ForceNewCluster, and an e2e playbook.

Changes

  • API & Handler (api/apiv1/design/api.go, server/internal/api/apiv1/post_init_handlers.go):
    Adds remove_host query parameter to the update-database endpoint and forwards req.RemoveHost... into the UpdateDatabase workflow call.
  • Config (server/internal/config/config.go):
    Adds ForceNewCluster bool field to EtcdServer with koanf/json tags.
  • Replication Slot Resource & Registry (server/internal/database/replication_slot_resource.go, server/internal/database/resources.go):
    Introduces ReplicationSlotResource type, identifier, lifecycle methods (noop refresh/create/update; Delete drops slot on provider) and registers the resource type.
  • Postgres slot helpers (server/internal/postgres/create_db.go, server/internal/database/replication_slot_advance_from_cts_resource.go):
    Adds TerminateReplicationSlot, DropReplicationSlot, IsReplicationSlotActive; slot-advance now checks active status and skips when active.
  • Workflows: CleanupInstance (server/internal/workflows/activities/cleanup_instance.go, server/internal/workflows/activities/activities.go, server/internal/workflows/common.go):
    Adds CleanupInstance activity and executor, registers it, and invokes it from the event apply path to delete orphaned instance records when instance resources are removed.
  • Remove-host orchestration (server/internal/workflows/remove_host.go, server/internal/api/apiv1/post_init_handlers.go):
    Changes the per-database loop to collect non-fatal errors into a slice, log per-item failures, aggregate with errors.Join, and return the aggregated error after processing all items.
  • Subscription & Dependencies (server/internal/database/subscription_resource.go, server/internal/database/operations/populate_nodes.go):
    Adds a replication-slot dependency to Subscription dependencies and inserts ReplicationSlotResource entries when adding/populating peers.
  • Operations graph & end-state (server/internal/database/operations/end.go, server/internal/database/operations/populate_nodes_test.go):
    End/populate logic now includes ReplicationSlotResource entries alongside subscriptions; tests updated to assert replication-slot resources.
  • Docker readiness filtering (server/internal/docker/docker.go):
    Adds readyNodeIDs helper and filters service tasks to nodes in Ready state when computing task counts.
  • Tests & Golden fixtures (server/internal/database/operations/golden_test/..., server/internal/database/operations/update_database_test.go, server/internal/database/operations/restore_database_test.go):
    Extensive updates/additions to golden fixtures and unit tests to reflect replication-slot create/update/delete across topology-change scenarios, plus new three-node removal tests.
  • E2E & Changelog (e2e/fixtures/setup_new_host.yaml, changes/unreleased/Added-20260216-213934.yaml):
    Adds an Ansible playbook for provisioning a new host for the lima provider and a changelog entry documenting quorum-loss recovery and replication-slot cleanup.
  • Golden removal fixtures (server/internal/database/operations/golden_test/TestUpdateDatabase/remove_one_node_from_three_node_database.json, server/internal/database/operations/golden_test/TestUpdateDatabase/remove_two_nodes_from_three_node_database.json):
    Adds golden fixtures enumerating expected deletion groups (replication slots, subscriptions, instances, orchestrator resources) for removing one or two nodes from a three-node database.

Poem

🐇 I nibble at slots with a careful paw,
I hop through hosts and tidy what I saw,
Terminate a walsender, drop a stale name,
Queue a cleanup task, then hop back to my game,
A rabbit's small fix keeps the cluster tame.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage (⚠️ Warning): Docstring coverage is 15.79%, which is insufficient; the required threshold is 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (2 passed)

  • Title check (✅ Passed): The title accurately summarizes the main changes: improving quorum loss disaster recovery and host removal resilience, which aligns with the substantive improvements throughout the changeset.
  • Description check (✅ Passed): The PR description is comprehensive and well-structured, covering summary, detailed changes, extensive testing verification, and checklist completion. All required template sections are addressed.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


@tsivaprasad tsivaprasad marked this pull request as ready for review February 16, 2026 17:47

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 7

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
server/internal/etcd/embedded.go (1)

162-248: ⚠️ Potential issue | 🟠 Major

Sentinel file write failure should propagate an error rather than silently continuing.

The concern about markForceNewClusterApplied() at lines 225-227 is valid. When the sentinel file write fails, the error is only logged but not returned. On the next restart, if ForceNewCluster remains true in the config, the sentinel check will fail and ForceNewCluster will be applied again. Per the comment at line 168, this is to "prevent data corruption," so silently continuing past a write failure undermines that safety measure. Return the error instead:

Proposed fix
 	if forceNew {
 		if err := e.markForceNewClusterApplied(); err != nil {
-			e.logger.Error().Err(err).Msg("failed to write force-new-cluster sentinel file")
+			return fmt.Errorf("failed to write force-new-cluster sentinel file (required to prevent repeated ForceNewCluster on restart): %w", err)
 		}

Regarding e.initialized: It is not closed in start(), unlike in initialize() and Join(). However, this is not a practical issue. The channel is only awaited in app.go line 137 during runPreInitialization(), which is only called when the cluster is not yet initialized. On restarts of an already-initialized cluster, Start() calls start() directly without any code awaiting Initialized(), so the missing close does not block callers. The inconsistency is a minor code smell but not a bug in the current flow.

🤖 Fix all issues with AI agents
In `@api/apiv1/design/api.go`:
- Around line 278-280: The remove_host attribute currently uses
g.ArrayOf(g.String) which bypasses the Identifier validation; change its
declaration to use g.ArrayOf(Identifier) so each element enforces MinLength(1),
MaxLength(63) and the identifier pattern—update the attribute definition for
remove_host in the API design (the g.Attribute call named "remove_host") to
reference Identifier instead of g.String to match other host ID fields like
host_id.

In `@changes/unreleased/Added-20260216-213934.yaml`:
- Line 2: Update the changelog entry by fixing the typo in the YAML `body`
field: replace the leading "mprove" with "Improve" so the `body` value reads
"Improve quorum loss recovery: ForceNewCluster sentinel and auth handling,
continue-on-error host removal, replication slot cleanup, quorum health checks";
locate and edit the `body` key in the
changes/unreleased/Added-20260216-213934.yaml file.

In `@e2e/fixtures/setup_new_host.yaml`:
- Around line 45-48: The task that sets peer_ip uses
ansible_facts.eth0.ipv4.address but its when guard only checks for 'eth0' and
can fail if ipv4 or its address is absent; update the when condition for the
"Set peer_ip (Lima)" task to also verify that ansible_facts.eth0.ipv4 is defined
and that ansible_facts.eth0.ipv4.address is defined (e.g., "'eth0' in
ansible_facts and ansible_facts.eth0.ipv4 is defined and
ansible_facts.eth0.ipv4.address is defined") so the set_fact peer_ip assignment
is only executed when the nested keys exist.

In `@server/internal/database/subscription_resource.go`:
- Around line 116-119: In the Create path (in subscription_resource.go's Create
method) do not swallow a subscriber.Connection(ctx, rc,
subscriber.Spec.DatabaseName) error by returning resource.ErrNotFound; instead
return the actual connection error so the controller can retry transient
failures. Update the Create branch that currently does "if err != nil { return
resource.ErrNotFound }" to return the original err (or wrap it with context) and
leave the Delete behavior unchanged where unreachable subscribers can return
nil.

In `@server/internal/etcd/embedded.go`:
- Around line 417-420: The forceNewClusterAlreadyApplied function currently
returns false for any os.Stat error; change it so it only treats a non-existent
file as "not applied" and treats all other errors (permission, IO, etc.) as
"already applied" for safety: in EmbeddedEtcd.forceNewClusterAlreadyApplied call
os.Stat on forceNewClusterSentinelPath(), if err == nil return true, if
os.IsNotExist(err) return false, otherwise return true (i.e., treat other errors
as sentinel present).
- Around line 793-797: embedConfig currently copies
cfg.EtcdServer.ForceNewCluster into c.ForceNewCluster but start() later
overrides it after the sentinel check, while Join() calls embedConfig without
that sentinel override; remove the unconditional assignment (c.ForceNewCluster =
cfg.EtcdServer.ForceNewCluster) from embedConfig so ForceNewCluster is only
applied by start(), or alternatively guard embedConfig to ignore
cfg.EtcdServer.ForceNewCluster when invoked from Join(); update codepaths
referencing embedConfig/Join/start() to ensure Join never propagates a true
ForceNewCluster to the embedded config.

In `@server/internal/workflows/common.go`:
- Around line 253-287: The cleanupOrphanedSlots function assumes spec.Nodes has
already been filtered which can hide nodes listed in RemoveHost and prevent
scheduling cleanup; update the call-site or this function so node iteration uses
the survivor set computed by excluding names in removedNodeNames (or recompute
removedNodeNames directly from the incoming RemoveHost parameter) before looping
over spec.Nodes and scheduling ExecuteCleanupOrphanedSlots; reference the
cleanupOrphanedSlots method, spec.Nodes, removedNodeNames and the RemoveHost
source when implementing the filter/recalculation.
🧹 Nitpick comments (7)
e2e/fixtures/setup_new_host.yaml (2)

32-39: Quote the command arguments to prevent shell interpolation issues.

The Jinja2 expressions in the command value are not quoted. If the expanded path (e.g., HOME) ever contains spaces or special characters, YAML parsing or shell execution could break.

Proposed fix
-      command: ssh -F {{ ansible_env.HOME }}/.lima/{{ target_host }}/ssh.config lima-{{ target_host }} echo ok
+      command: "ssh -F {{ ansible_env.HOME }}/.lima/{{ target_host }}/ssh.config lima-{{ target_host }} echo ok"

13-39: All tasks in the first play are no-ops when provider != 'lima', leaving the new_host group empty.

If provider is not lima, no VM is deployed, no host is added to inventory, and the subsequent plays targeting new_host will silently skip. This is fine if Lima is currently the only supported provider, but there's no explicit guard or error message to alert the operator. Worth a comment or a fail task if other providers are expected in the future.

server/internal/database/subscription_resource.go (2)

63-75: Swallowing all errors as resource.ErrNotFound in Refresh loses diagnostic context.

If GetPrimaryInstance or Connection fail due to a transient issue (not a missing node), the caller won't see the real error. This is acceptable for disaster-recovery flows but could hinder debugging in normal operations. Consider logging the original error at debug level before returning ErrNotFound.
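
A minimal illustration of the suggestion, with stand-in names (the real code would use the zerolog logger available on the resource context):

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// ErrNotFound stands in for resource.ErrNotFound.
var ErrNotFound = errors.New("not found")

// refreshSubscription keeps returning ErrNotFound so the controller treats
// the subscription as absent, but records the underlying cause first so
// transient failures remain visible in the logs.
func refreshSubscription(lookup func() error) error {
	if err := lookup(); err != nil {
		log.Printf("debug: subscription refresh failed, reporting not-found: %v", err)
		return ErrNotFound
	}
	return nil
}

func main() {
	err := refreshSubscription(func() error { return fmt.Errorf("connection refused") })
	fmt.Println(errors.Is(err, ErrNotFound)) // true
}
```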


238-259: Consider adding a debug log when cleanupProviderSlot fails.

Currently all errors are silently discarded. A debug-level log would aid troubleshooting without impacting the best-effort semantics.

Proposed enhancement
 func (s *SubscriptionResource) cleanupProviderSlot(ctx context.Context, rc *resource.Context) {
 	provider, err := GetPrimaryInstance(ctx, rc, s.ProviderNode)
 	if err != nil {
 		return // Provider gone, slot gone with it
 	}
 
 	conn, err := provider.Connection(ctx, rc, provider.Spec.DatabaseName)
 	if err != nil {
 		return // Can't reach provider, best effort
 	}
 	defer conn.Close(ctx)
 
-	_ = postgres.DropReplicationSlot(
+	if err := postgres.DropReplicationSlot(
 		provider.Spec.DatabaseName,
 		s.ProviderNode,
 		s.SubscriberNode,
-	).Exec(ctx, conn)
+	).Exec(ctx, conn); err != nil {
+		rc.Logger.Debug().Err(err).
+			Str("provider", s.ProviderNode).
+			Str("subscriber", s.SubscriberNode).
+			Msg("best-effort provider slot cleanup failed")
+	}
 }
server/internal/workflows/activities/cleanup_replication_slots.go (1)

62-67: Consider distinguishing "not found" from transient errors on GetPrimaryInstance.

Currently any error from GetPrimaryInstance is treated as "provider instance doesn't exist." A transient error (e.g., etcd timeout) would silently skip slot cleanup, leaving an orphaned slot behind. Since MaxAttempts is 3, you could let transient errors propagate to trigger a retry, and only swallow "not found" errors.

That said, given the upstream caller already treats slot-cleanup failure as non-fatal (logged warning in update_database.go line 146), this is low risk.

♻️ Optional: distinguish not-found from transient errors
 	provider, err := database.GetPrimaryInstance(ctx, rc, input.ProviderNode)
 	if err != nil {
-		// Provider instance doesn't exist — slot is already gone
-		logger.Info("provider instance not found, skipping slot cleanup")
-		return &CleanupOrphanedSlotsOutput{}, nil
+		if database.IsNotFound(err) {
+			logger.Info("provider instance not found, skipping slot cleanup")
+			return &CleanupOrphanedSlotsOutput{}, nil
+		}
+		return nil, fmt.Errorf("failed to get provider instance: %w", err)
 	}
server/internal/workflows/remove_host.go (1)

138-164: Good improvement: continue-on-error prevents one DB failure from blocking others.

The errors.Join aggregation pattern is well-suited here. One nit:

Line 162 uses logger.Info for "some database update workflows failed" — this is a partial-failure scenario that likely warrants Warn level to match the severity and improve alerting/observability.

♻️ Use Warn level for partial failure summary
 	if err := errors.Join(errs...); err != nil {
-		logger.Info("some database update workflows failed", "failed_count", len(errs), "total_count", len(inputs))
+		logger.Warn("some database update workflows failed", "failed_count", len(errs), "total_count", len(inputs))
 		return err
 	}
server/internal/workflows/update_database.go (1)

199-201: Return value uses the pre-refresh current state — confirm this is intentional.

UpdateDatabaseOutput.Updated returns current (from the initial refresh at line 96), which was mutated by applyPlans. The post-removal refresh output (line 158) is only used for validation and is discarded. If callers depend on the returned state reflecting post-cleanup reality (e.g., subscription health), they'll get stale data.

This is likely acceptable since the returned state reflects the structural outcome (resources added/removed), and the database-level state (finalState) is separately persisted. Just flagging for awareness.

Comment on lines +278 to +280
g.Attribute("remove_host", g.ArrayOf(g.String), func() {
g.Description("Host IDs to treat as removed during this update. Events targeting these hosts will be skipped.")
})

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# First, let's find how Identifier is defined in the design files
rg -n 'Identifier\s*=' --type=go api/apiv1/design/

Repository: pgEdge/control-plane

Length of output: 149


🏁 Script executed:

# Let's also check other host_id field usages for comparison
rg -n 'host_id.*Identifier|Attribute.*Identifier' --type=go api/apiv1/design/ -A 2

Repository: pgEdge/control-plane

Length of output: 8788


🏁 Script executed:

# Look at the actual lines in the api.go file around 278-280
sed -n '270,290p' api/apiv1/design/api.go

Repository: pgEdge/control-plane

Length of output: 798


🏁 Script executed:

# Let's find if ArrayOf(Identifier) is used anywhere else in the design files
rg -n 'ArrayOf\(Identifier\)' --type=go api/apiv1/design/

Repository: pgEdge/control-plane

Length of output: 46


🏁 Script executed:

# Let's see the full Identifier type definition to understand what validation it carries
sed -n '7,20p' api/apiv1/design/common.go

Repository: pgEdge/control-plane

Length of output: 785


🏁 Script executed:

# Let's search for other ArrayOf usages to see the pattern used in this codebase
rg -n 'ArrayOf' --type=go api/apiv1/design/ -A 1

Repository: pgEdge/control-plane

Length of output: 4464


Use g.ArrayOf(Identifier) for remove_host to match validation on other host ID fields.

The Identifier type enforces MinLength(1), MaxLength(63), and documents that identifiers must be 1-63 characters with lowercase letters and hyphens, starting and ending with a letter or number. Using g.ArrayOf(g.String) skips this validation, allowing malformed host IDs through the API layer. All other host ID fields in the design (e.g., host_id at line 183) use the Identifier type for consistency.

Proposed fix
-			g.Attribute("remove_host", g.ArrayOf(g.String), func() {
+			g.Attribute("remove_host", g.ArrayOf(Identifier), func() {
🤖 Prompt for AI Agents
In `@api/apiv1/design/api.go` around lines 278 - 280, The remove_host attribute
currently uses g.ArrayOf(g.String) which bypasses the Identifier validation;
change its declaration to use g.ArrayOf(Identifier) so each element enforces
MinLength(1), MaxLength(63) and the identifier pattern—update the attribute
definition for remove_host in the API design (the g.Attribute call named
"remove_host") to reference Identifier instead of g.String to match other host
ID fields like host_id.

@@ -0,0 +1,3 @@
kind: Added
body: 'mprove quorum loss recovery: ForceNewCluster sentinel and auth handling, continue-on-error host removal, replication slot cleanup, quorum health checks'

⚠️ Potential issue | 🟡 Minor

Typo in changelog body: mprove → Improve.

The leading I is missing.

-body: 'mprove quorum loss recovery: ForceNewCluster sentinel and auth handling, continue-on-error host removal, replication slot cleanup, quorum health checks'
+body: 'Improve quorum loss recovery: ForceNewCluster sentinel and auth handling, continue-on-error host removal, replication slot cleanup, quorum health checks'
🤖 Prompt for AI Agents
In `@changes/unreleased/Added-20260216-213934.yaml` at line 2, Update the
changelog entry by fixing the typo in the YAML `body` field: replace the leading
"mprove" with "Improve" so the `body` value reads "Improve quorum loss recovery:
ForceNewCluster sentinel and auth handling, continue-on-error host removal,
replication slot cleanup, quorum health checks"; locate and edit the `body` key
in the changes/unreleased/Added-20260216-213934.yaml file.

Comment on lines +45 to +48
- name: Set peer_ip (Lima)
set_fact:
peer_ip: "{{ ansible_facts.eth0.ipv4.address }}"
when: "'eth0' in ansible_facts"

⚠️ Potential issue | 🟡 Minor

The when guard doesn't protect against a missing ipv4 sub-key.

If eth0 exists in ansible_facts but lacks an ipv4 address (e.g., IPv6-only or link not up), this task will fail with an attribute error. Consider tightening the condition.

Proposed fix
     - name: Set peer_ip (Lima)
       set_fact:
         peer_ip: "{{ ansible_facts.eth0.ipv4.address }}"
-      when: "'eth0' in ansible_facts"
+      when: "'eth0' in ansible_facts and ansible_facts.eth0.ipv4 is defined"
🤖 Prompt for AI Agents
In `@e2e/fixtures/setup_new_host.yaml` around lines 45 - 48, The task that sets
peer_ip uses ansible_facts.eth0.ipv4.address but its when guard only checks for
'eth0' and can fail if ipv4 or its address is absent; update the when condition
for the "Set peer_ip (Lima)" task to also verify that ansible_facts.eth0.ipv4 is
defined and that ansible_facts.eth0.ipv4.address is defined (e.g., "'eth0' in
ansible_facts and ansible_facts.eth0.ipv4 is defined and
ansible_facts.eth0.ipv4.address is defined") so the set_fact peer_ip assignment
is only executed when the nested keys exist.

Comment on lines +417 to +420
func (e *EmbeddedEtcd) forceNewClusterAlreadyApplied() bool {
_, err := os.Stat(e.forceNewClusterSentinelPath())
return err == nil
}

⚠️ Potential issue | 🟡 Minor

forceNewClusterAlreadyApplied should treat stat errors as "already applied" for safety.

Currently, any os.Stat error (including permission denied) causes this to return false, meaning ForceNewCluster would proceed. For a safety sentinel designed to prevent repeated execution, the conservative approach is to treat any stat error (other than ErrNotExist) as "already applied" to avoid accidental re-runs.

Proposed fix
 func (e *EmbeddedEtcd) forceNewClusterAlreadyApplied() bool {
 	_, err := os.Stat(e.forceNewClusterSentinelPath())
-	return err == nil
+	if err == nil {
+		return true
+	}
+	if errors.Is(err, os.ErrNotExist) {
+		return false
+	}
+	e.logger.Error().Err(err).Msg("failed to check force-new-cluster sentinel file; assuming already applied for safety")
+	return true
 }
🤖 Prompt for AI Agents
In `@server/internal/etcd/embedded.go` around lines 417 - 420, The
forceNewClusterAlreadyApplied function currently returns false for any os.Stat
error; change it so it only treats a non-existent file as "not applied" and
treats all other errors (permission, IO, etc.) as "already applied" for safety:
in EmbeddedEtcd.forceNewClusterAlreadyApplied call os.Stat on
forceNewClusterSentinelPath(), if err == nil return true, if os.IsNotExist(err)
return false, otherwise return true (i.e., treat other errors as sentinel
present).


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
server/internal/database/operations/populate_nodes_test.go (2)

140-140: ⚠️ Potential issue | 🟡 Minor

Pre-existing bug: t.Run(t.Name(), ...) should be t.Run(tc.name, ...).

Using t.Name() in a table-driven test loop gives every sub-test the parent's name instead of the test case name. This makes failures harder to identify and could cause golden file collisions.

Proposed fix
-		t.Run(t.Name(), func(t *testing.T) {
+		t.Run(tc.name, func(t *testing.T) {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@server/internal/database/operations/populate_nodes_test.go` at line 140, The
subtest is using t.Run(t.Name(), ...) which uses the parent test's name instead
of each table case; update the table-driven loop to call t.Run(tc.name, func(t
*testing.T) { ... }) (or the actual case identifier used in the table, e.g.,
tc.name or tc.desc) so each subtest is named uniquely; modify the t.Run
invocation in the test function (the one containing the table loop, e.g.,
TestPopulateNodes) to pass the case's name variable instead of t.Name().

537-537: ⚠️ Potential issue | 🟡 Minor

Same pre-existing t.Run(t.Name(), ...) bug as in TestPopulateNode.

Proposed fix
-		t.Run(t.Name(), func(t *testing.T) {
+		t.Run(tc.name, func(t *testing.T) {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@server/internal/database/operations/populate_nodes_test.go` at line 537, The
subtest is using the loop's t variable incorrectly (t.Run(t.Name(), ...)) which
causes the classic closure/variable-capture bug; fix by running the subtest with
the correct test-case name and capturing the loop variable before the closure
(e.g., rename the loop element to tc or tt, do `tc := tc` before calling t.Run,
and call `t.Run(tc.Name(), func(t *testing.T) { ... })`), ensuring the inner
func still receives its own *testing.T parameter.
🧹 Nitpick comments (3)
server/internal/workflows/common.go (2)

11-13: Nit: blank line between imports from the same parent module.

Lines 11-13 have a blank line separating operations and database, both under server/internal/database. Typically Go groups imports from the same module together without blank-line separators.

Suggested fix
 	"github.com/pgEdge/control-plane/server/internal/database/operations"
-
-	"github.com/pgEdge/control-plane/server/internal/database"
+	"github.com/pgEdge/control-plane/server/internal/database"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@server/internal/workflows/common.go` around lines 11 - 13, Remove the extra
blank line between the two imports that share the same parent module so they are
grouped together; specifically, collapse the separation between
"github.com/pgEdge/control-plane/server/internal/database/operations" and
"github.com/pgEdge/control-plane/server/internal/database" in the import block
so imports from the same module are adjacent with no intervening blank line.

50-65: Cleanup is synchronously awaited inside the futures-collection loop — verify this is intentional.

The ExecuteCleanupInstance(...).Get(ctx) call blocks inline at Line 62, whereas all other events in the phase collect futures first and resolve them in the subsequent loop (Lines 78-101). This means if cleanup fails for one instance, later events in the same phase won't even have their futures dispatched.

This is likely acceptable given the narrow scope (only instance resources on deleted/removed hosts), but worth confirming it's the intended behavior vs. collecting the cleanup future alongside others.
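
For illustration, collecting the cleanup future alongside the others could look roughly like this; the future type and dispatch helpers are placeholders, not the actual workflow API:

```go
package main

import "fmt"

// future is a stand-in for the activity futures used by the workflow.
type future struct{ result func() error }

func (f future) Get() error { return f.result() }

// dispatchAll dispatches every activity first, collects the futures, and only
// then resolves them, so one failing cleanup does not prevent later events
// from being dispatched.
func dispatchAll(dispatch []func() future) []error {
	futures := make([]future, 0, len(dispatch))
	for _, d := range dispatch {
		futures = append(futures, d()) // dispatch without blocking
	}
	var errs []error
	for _, f := range futures {
		if err := f.Get(); err != nil { // resolve afterwards
			errs = append(errs, err)
		}
	}
	return errs
}

func main() {
	errs := dispatchAll([]func() future{
		func() future { return future{result: func() error { return fmt.Errorf("cleanup failed") }} },
		func() future { return future{result: func() error { return nil }} },
	})
	fmt.Println(len(errs)) // 1
}
```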

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@server/internal/workflows/common.go` around lines 50 - 65, The synchronous
call to w.Activities.ExecuteCleanupInstance(...).Get(ctx) blocks inside the
event futures-collection loop; change it to mirror the other events by creating
the CleanupInstanceInput and calling w.Activities.ExecuteCleanupInstance(ctx,
cleanupIn) without .Get, append the returned future to the same futures slice
used by the surrounding loop, and let the subsequent futures resolution loop
handle .Get() and error handling (preserving the existing error message that
includes event.Resource.Identifier.ID) so cleanup runs concurrently with other
dispatched activities.
server/internal/database/replication_slot_resource.go (1)

66-86: Swallowed errors should be logged for observability.

Lines 69–71 and 74–77 return nil for any error (not just "not found"), which is reasonable for disaster-recovery resilience but loses diagnostic information. Per the coding guidelines, structured zerolog logging should be used — a Warn or Debug entry here would help operators distinguish "node genuinely gone" from "transient failure left an orphaned slot."

Additionally, the comments ("slot is already gone") are misleading when the error is a transient connectivity issue rather than a missing provider.

Proposed fix: log swallowed errors
+	"github.com/rs/zerolog"
 func (r *ReplicationSlotResource) Delete(ctx context.Context, rc *resource.Context) error {
 	provider, err := GetPrimaryInstance(ctx, rc, r.ProviderNode)
 	if err != nil {
-		// Provider instance doesn't exist — slot is already gone
+		zerolog.Ctx(ctx).Warn().Err(err).
+			Str("provider_node", r.ProviderNode).
+			Str("subscriber_node", r.SubscriberNode).
+			Msg("cannot resolve provider instance; treating replication slot as already gone")
 		return nil
 	}
 
 	conn, err := provider.Connection(ctx, rc, provider.Spec.DatabaseName)
 	if err != nil {
-		// Can't connect to provider — slot is already gone
+		zerolog.Ctx(ctx).Warn().Err(err).
+			Str("provider_node", r.ProviderNode).
+			Str("subscriber_node", r.SubscriberNode).
+			Msg("cannot connect to provider; treating replication slot as already gone")
 		return nil
 	}
 	defer conn.Close(ctx)

As per coding guidelines, "Use structured JSON logging with zerolog throughout the codebase, with pretty-printing enabled in dev mode."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@server/internal/database/replication_slot_resource.go` around lines 66 - 86,
In ReplicationSlotResource.Delete, don’t silently swallow errors from
GetPrimaryInstance and provider.Connection; instead emit structured zerolog
entries (Warn or Debug) that include the provider node (r.ProviderNode),
subscriber node (r.SubscriberNode) and the underlying error before returning
nil, and replace the misleading comment text with the logged message; keep the
existing behavior of returning nil for resilience but add the logs so operators
can distinguish "node gone" vs transient failures (use the available logger from
the resource/context or package logger) — leave the existing stmt.Exec error
handling intact.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@server/internal/database/replication_slot_resource.go`:
- Around line 15-20: The composite ID in ReplicationSlotResourceIdentifier
currently concatenates providerNode and subscriberNode without a delimiter,
causing possible collisions; update the function
ReplicationSlotResourceIdentifier to build the ID using an explicit separator
(e.g., providerNode + "-" + subscriberNode or fmt.Sprintf("%s-%s", providerNode,
subscriberNode)) so the resulting resource.Identifier (Type:
ResourceTypeReplicationSlot) is unambiguous, and apply the same delimiter
pattern to the other composite ID generators mentioned (lag_tracker_commit_ts,
subscription, wait_for_sync_event, sync_event,
replication_slot_advance_from_cts) to ensure consistency.

---

Outside diff comments:
In `@server/internal/database/operations/populate_nodes_test.go`:
- Line 140: The subtest is using t.Run(t.Name(), ...) which uses the parent
test's name instead of each table case; update the table-driven loop to call
t.Run(tc.name, func(t *testing.T) { ... }) (or the actual case identifier used
in the table, e.g., tc.name or tc.desc) so each subtest is named uniquely;
modify the t.Run invocation in the test function (the one containing the table
loop, e.g., TestPopulateNodes) to pass the case's name variable instead of
t.Name().
- Line 537: The subtest is using the loop's t variable incorrectly
(t.Run(t.Name(), ...)) which causes the classic closure/variable-capture bug;
fix by running the subtest with the correct test-case name and capturing the
loop variable before the closure (e.g., rename the loop element to tc or tt, do
`tc := tc` before calling t.Run, and call `t.Run(tc.Name(), func(t *testing.T) {
... })`), ensuring the inner func still receives its own *testing.T parameter.

---

Nitpick comments:
In `@server/internal/database/replication_slot_resource.go`:
- Around line 66-86: In ReplicationSlotResource.Delete, don’t silently swallow
errors from GetPrimaryInstance and provider.Connection; instead emit structured
zerolog entries (Warn or Debug) that include the provider node (r.ProviderNode),
subscriber node (r.SubscriberNode) and the underlying error before returning
nil, and replace the misleading comment text with the logged message; keep the
existing behavior of returning nil for resilience but add the logs so operators
can distinguish "node gone" vs transient failures (use the available logger from
the resource/context or package logger) — leave the existing stmt.Exec error
handling intact.

In `@server/internal/workflows/common.go`:
- Around line 11-13: Remove the extra blank line between the two imports that
share the same parent module so they are grouped together; specifically,
collapse the separation between
"github.com/pgEdge/control-plane/server/internal/database/operations" and
"github.com/pgEdge/control-plane/server/internal/database" in the import block
so imports from the same module are adjacent with no intervening blank line.
- Around line 50-65: The synchronous call to
w.Activities.ExecuteCleanupInstance(...).Get(ctx) blocks inside the event
futures-collection loop; change it to mirror the other events by creating the
CleanupInstanceInput and calling w.Activities.ExecuteCleanupInstance(ctx,
cleanupIn) without .Get, append the returned future to the same futures slice
used by the surrounding loop, and let the subsequent futures resolution loop
handle .Get() and error handling (preserving the existing error message that
includes event.Resource.Identifier.ID) so cleanup runs concurrently with other
dispatched activities.

Comment on lines +15 to +20
func ReplicationSlotResourceIdentifier(providerNode, subscriberNode string) resource.Identifier {
return resource.Identifier{
Type: ResourceTypeReplicationSlot,
ID: providerNode + subscriberNode,
}
}

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

rg -n 'resource\.Identifier\{' --type=go -A3 -g '!vendor/**'

Repository: pgEdge/control-plane

Length of output: 31756


Add a separator to the composite identifier to prevent collisions.

providerNode + subscriberNode without a delimiter can produce ambiguous IDs. For example, provider "n1" + subscriber "n12" yields "n1n12", identical to provider "n1n" + subscriber "12".

Proposed fix
 func ReplicationSlotResourceIdentifier(providerNode, subscriberNode string) resource.Identifier {
 	return resource.Identifier{
 		Type: ResourceTypeReplicationSlot,
-		ID:   providerNode + subscriberNode,
+		ID:   providerNode + "/" + subscriberNode,
 	}
 }

Note: This same pattern without separators appears in several other database resources (lag_tracker_commit_ts, subscription, wait_for_sync_event, sync_event, replication_slot_advance_from_cts). Other composites in the codebase (e.g., pgbackrest_config, scale_service) use explicit separators like "-".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@server/internal/database/replication_slot_resource.go` around lines 15 - 20,
The composite ID in ReplicationSlotResourceIdentifier currently concatenates
providerNode and subscriberNode without a delimiter, causing possible
collisions; update the function ReplicationSlotResourceIdentifier to build the
ID using an explicit separator (e.g., providerNode + "-" + subscriberNode or
fmt.Sprintf("%s-%s", providerNode, subscriberNode)) so the resulting
resource.Identifier (Type: ResourceTypeReplicationSlot) is unambiguous, and
apply the same delimiter pattern to the other composite ID generators mentioned
(lag_tracker_commit_ts, subscription, wait_for_sync_event, sync_event,
replication_slot_advance_from_cts) to ensure consistency.

@tsivaprasad tsivaprasad force-pushed the PLAT-423-add-remove-host-option-to-update-database-endpoint branch from ea9aad8 to ad45bdd on February 17, 2026 17:49

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
server/internal/docker/docker.go (1)

441-454: Consider the impact of filtering on tasks with empty NodeID.

Tasks in TaskStatePending or TaskStateAssigned may not yet have a NodeID assigned, so readyNodes.Has("") returns false and they'll be skipped. In practice this just means the loop keeps polling until tasks land on ready nodes, so it shouldn't cause incorrect behavior — but it does mean the pending counter in the debug log will under-report. If accurate debug logging matters, consider allowing tasks with an empty NodeID through the filter:

Proposed fix
 			for _, t := range tasks {
 				taskStates = append(taskStates, string(t.Status.State))
-				if !readyNodes.Has(t.NodeID) {
+				if t.NodeID != "" && !readyNodes.Has(t.NodeID) {
 					continue
 				}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@server/internal/docker/docker.go` around lines 441 - 454, The loop that
counts task states currently skips any task whose NodeID is empty because it
checks readyNodes.Has(t.NodeID), which causes pending/assigned tasks
(TaskStatePending/TaskStateAssigned) to be under-counted; update the loop in the
section that calls readyNodeIDs() and iterates over tasks so that tasks with
empty NodeID are still counted for the debug counters: treat empty NodeID as a
valid case (e.g., always include when t.Status.State is TaskStatePending or
TaskStateAssigned) before applying readyNodes.Has, and ensure variables running,
stopping, failed, preparing, pending and lastFailureMsg are updated accordingly
in that branch so the debug log reflects true pending/assigned totals (look for
function/method readyNodeIDs, the tasks loop, readyNodes.Has, and the pending
variable).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@server/internal/postgres/create_db.go`:
- Around line 330-341: The query in IsReplicationSlotActive (which builds
slotName via ReplicationSlotName) can return zero rows, causing Scalar() callers
to get pgx.ErrNoRows; change the SQL to use an EXISTS expression so it always
returns a single boolean row (e.g. SELECT EXISTS(SELECT 1 FROM
pg_replication_slots WHERE slot_name = `@slot_name` AND active_pid IS NOT NULL)),
keep the same named arg "slot_name", and leave the function signature and return
type Query[bool] unchanged so callers receive a definitive true/false instead of
a missing-row error.

---

Duplicate comments:
In `@changes/unreleased/Added-20260216-213934.yaml`:
- Line 2: The changelog entry has a typo: replace "mprove quorum loss recovery:
ForceNewCluster sentinel and auth handling, continue-on-error host removal,
replication slot cleanup, quorum health checks" with "Improve quorum loss
recovery: ForceNewCluster sentinel and auth handling, continue-on-error host
removal, replication slot cleanup, quorum health checks" by correcting the
leading 'mprove' to 'Improve' in the YAML value for the body field.

In `@server/internal/database/replication_slot_resource.go`:
- Around line 15-20: The composite ID built in ReplicationSlotResourceIdentifier
currently concatenates providerNode and subscriberNode without a delimiter,
causing ambiguous collisions; change the ID construction to insert a fixed
separator (e.g., ":" or "::") between providerNode and subscriberNode so the
returned resource.Identifier has ID like provider + ":" + subscriber; update any
related callers or tests that depend on the ID format if needed and use the
existing ReplicationSlotResourceIdentifier function to ensure consistent
encoding.

---

Nitpick comments:
In `@server/internal/docker/docker.go`:
- Around line 441-454: The loop that counts task states currently skips any task
whose NodeID is empty because it checks readyNodes.Has(t.NodeID), which causes
pending/assigned tasks (TaskStatePending/TaskStateAssigned) to be under-counted;
update the loop in the section that calls readyNodeIDs() and iterates over tasks
so that tasks with empty NodeID are still counted for the debug counters: treat
empty NodeID as a valid case (e.g., always include when t.Status.State is
TaskStatePending or TaskStateAssigned) before applying readyNodes.Has, and
ensure variables running, stopping, failed, preparing, pending and
lastFailureMsg are updated accordingly in that branch so the debug log reflects
true pending/assigned totals (look for function/method readyNodeIDs, the tasks
loop, readyNodes.Has, and the pending variable).


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@server/internal/database/replication_slot_resource.go`:
- Around line 67-76: The code currently returns nil when
provider.GetPrimaryInstance (the prior call) or provider.Connection(ctx, rc,
provider.Spec.DatabaseName) fails, which can hide transient infra issues; modify
the error handling in the GetPrimaryInstance failure branch and the
provider.Connection error branch to log the error details (using the component
logger used elsewhere) including context (provider ID/name and database name)
before returning, so operators can correlate connectivity problems with orphaned
slots while preserving the existing behavior of returning nil.
- Around line 65-92: The Delete method on ReplicationSlotResource currently
silences errors from postgres.TerminateReplicationSlot(...).Exec and
postgres.DropReplicationSlot(...).Exec; update ReplicationSlotResource.Delete to
log any returned errors at Warn level using the project's zerolog-based logger
(include context fields like "provider_node", "subscriber_node", "db_name", and
the error) instead of discarding them, while still returning nil to preserve the
current resilience behavior; place the logging immediately after each Exec call
and include enough structured fields to diagnose permission/SQL failures.

In `@server/internal/postgres/create_db.go`:
- Around line 308-321: DropReplicationSlot may fail silently if the slot is
still active because TerminateReplicationSlot is asynchronous; add a retry/wait
before dropping the slot to avoid "replication slot … is active" errors. Modify
the logic around DropReplicationSlot (and where TerminateReplicationSlot is
called) to either (a) perform a short loop that checks pg_replication_slots for
slot_name and waits/polls (e.g., small sleep + retry up to N times) before
executing pg_drop_replication_slot, or (b) convert the drop into a single SQL
DO/PLPGSQL block that attempts pg_drop_replication_slot and retries on active
status, and ensure errors are returned/logged instead of being discarded;
reference the ReplicationSlotName, TerminateReplicationSlot, DropReplicationSlot
and CleanupOrphanedSlots symbols to locate the related code paths.

---

Duplicate comments:
In `@server/internal/database/replication_slot_resource.go`:
- Around line 14-19: The composite ID created in
ReplicationSlotResourceIdentifier currently concatenates providerNode +
subscriberNode which can collide; change the ID generation to include a clear
delimiter (e.g., providerNode + ":" + subscriberNode or another unique
separator) when setting resource.Identifier{Type: ResourceTypeReplicationSlot,
ID: ...} so the ID is unambiguous; ensure any code that expects the old format
(parsers/consumers) is updated accordingly to handle the new delimiter.

In `@server/internal/postgres/create_db.go`:
- Around line 349-360: The IsReplicationSlotActive function currently runs a
SELECT that can return zero rows (causing Scalar() to surface pgx.ErrNoRows);
change the SQL in Query[bool] inside IsReplicationSlotActive to use SELECT
EXISTS(...) so it always returns a single boolean — e.g. SELECT EXISTS(SELECT 1
FROM pg_replication_slots WHERE slot_name = `@slot_name` AND active_pid IS NOT
NULL); keep the same named arg "slot_name" (slotName from ReplicationSlotName)
so callers and the Query[bool] shape do not change.

Comment on lines +65 to +92
func (r *ReplicationSlotResource) Delete(ctx context.Context, rc *resource.Context) error {
provider, err := GetPrimaryInstance(ctx, rc, r.ProviderNode)
if err != nil {
// Provider instance doesn't exist — slot is already gone
return nil
}

conn, err := provider.Connection(ctx, rc, provider.Spec.DatabaseName)
if err != nil {
// Can't connect to provider — slot is already gone
return nil
}
defer conn.Close(ctx)

dbName := provider.Spec.DatabaseName

// Terminate any active walsender using this slot. This is necessary
// when the subscriber has gone down and the walsender hasn't detected
// the broken connection yet — pg_drop_replication_slot fails on active
// slots.
_ = postgres.TerminateReplicationSlot(dbName, r.ProviderNode, r.SubscriberNode).
Exec(ctx, conn)

_ = postgres.DropReplicationSlot(dbName, r.ProviderNode, r.SubscriberNode).
Exec(ctx, conn)

return nil
}

🛠️ Refactor suggestion | 🟠 Major

Silently discarding errors hinders debugging; add structured logging.

Lines 85-89 discard errors from both TerminateReplicationSlot and DropReplicationSlot. While returning nil to treat failures as "slot already gone" is a reasonable resilience choice for this cleanup path, the complete absence of logging makes it very difficult to diagnose situations where slots are not being cleaned up (e.g., permission errors, unexpected SQL failures).

At minimum, log the errors at Warn level so operators have visibility:

Proposed fix
+	"github.com/rs/zerolog/log"
-	_ = postgres.TerminateReplicationSlot(dbName, r.ProviderNode, r.SubscriberNode).
-		Exec(ctx, conn)
+	if err := postgres.TerminateReplicationSlot(dbName, r.ProviderNode, r.SubscriberNode).
+		Exec(ctx, conn); err != nil {
+		log.Ctx(ctx).Warn().Err(err).
+			Str("provider", r.ProviderNode).
+			Str("subscriber", r.SubscriberNode).
+			Msg("failed to terminate replication slot walsender")
+	}

-	_ = postgres.DropReplicationSlot(dbName, r.ProviderNode, r.SubscriberNode).
-		Exec(ctx, conn)
+	if err := postgres.DropReplicationSlot(dbName, r.ProviderNode, r.SubscriberNode).
+		Exec(ctx, conn); err != nil {
+		log.Ctx(ctx).Warn().Err(err).
+			Str("provider", r.ProviderNode).
+			Str("subscriber", r.SubscriberNode).
+			Msg("failed to drop replication slot")
+	}

As per coding guidelines, "Use structured JSON logging with zerolog throughout the codebase."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@server/internal/database/replication_slot_resource.go` around lines 65 - 92,
The Delete method on ReplicationSlotResource currently silences errors from
postgres.TerminateReplicationSlot(...).Exec and
postgres.DropReplicationSlot(...).Exec; update ReplicationSlotResource.Delete to
log any returned errors at Warn level using the project's zerolog-based logger
(include context fields like "provider_node", "subscriber_node", "db_name", and
the error) instead of discarding them, while still returning nil to preserve the
current resilience behavior; place the logging immediately after each Exec call
and include enough structured fields to diagnose permission/SQL failures.

Comment on lines +67 to +76
	if err != nil {
		// Provider instance doesn't exist — slot is already gone
		return nil
	}

	conn, err := provider.Connection(ctx, rc, provider.Spec.DatabaseName)
	if err != nil {
		// Can't connect to provider — slot is already gone
		return nil
	}

⚠️ Potential issue | 🟡 Minor

Returning nil on connection error may mask persistent infrastructure issues.

Both the GetPrimaryInstance failure (Line 69) and connection failure (Line 75) are treated as "slot is already gone." This is a valid assumption during node removal, but if the provider is temporarily unreachable (e.g., network blip during a non-removal scenario), the slot will remain and continue accumulating WAL.

Given the PR also introduces CleanupOrphanedSlots as a separate safety net, this is acceptable, but at minimum these should be logged (as noted in the sibling comment) so operators can correlate cleanup gaps with transient connectivity issues.
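
For the connection-failure branch specifically, a hedged sketch of what that logging could look like, reusing the zerolog pattern from the sibling suggestion (field names are illustrative):

	conn, err := provider.Connection(ctx, rc, provider.Spec.DatabaseName)
	if err != nil {
		// Usually the provider is mid-removal, but log it so a transient
		// network failure here can be correlated with leftover slots later.
		log.Ctx(ctx).Warn().Err(err).
			Str("provider", r.ProviderNode).
			Str("subscriber", r.SubscriberNode).
			Str("database", provider.Spec.DatabaseName).
			Msg("could not connect to provider; skipping replication slot cleanup")
		return nil
	}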

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@server/internal/database/replication_slot_resource.go` around lines 67 - 76,
The code currently returns nil when provider.GetPrimaryInstance (the prior call)
or provider.Connection(ctx, rc, provider.Spec.DatabaseName) fails, which can
hide transient infra issues; modify the error handling in the GetPrimaryInstance
failure branch and the provider.Connection error branch to log the error details
(using the component logger used elsewhere) including context (provider ID/name
and database name) before returning, so operators can correlate connectivity
problems with orphaned slots while preserving the existing behavior of returning
nil.

Comment on lines +308 to +321
func DropReplicationSlot(databaseName, providerNode, subscriberNode string) ConditionalStatement {
	slotName := ReplicationSlotName(databaseName, providerNode, subscriberNode)

	return ConditionalStatement{
		If: Query[bool]{
			SQL:  "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = @slot_name);",
			Args: pgx.NamedArgs{"slot_name": slotName},
		},
		Then: Statement{
			SQL:  "SELECT pg_drop_replication_slot(@slot_name);",
			Args: pgx.NamedArgs{"slot_name": slotName},
		},
	}
}

⚠️ Potential issue | 🟡 Minor

Potential race: slot may still be active when drop is attempted.

pg_terminate_backend (called by TerminateReplicationSlot) is asynchronous — the walsender may not have released the slot by the time DropReplicationSlot executes. If the slot is still active, pg_drop_replication_slot will error with "replication slot … is active". Since the caller discards the error, the slot silently survives.

Consider adding a brief pg_sleep or polling loop between terminate and drop, or combining both into a single SQL block that retries. Alternatively, document that the separate CleanupOrphanedSlots activity is the safety net for this race.
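
As one illustration of the polling option, a sketch of what the caller (e.g., the Delete method above) could do between terminate and drop. It leans on the IsReplicationSlotActive helper referenced elsewhere in this review; the Scalar(ctx, conn) call shape and the retry counts are assumptions, not the committed code:

	// Give the terminated walsender a moment to release the slot; if it is
	// still active after a few attempts, fall through and let the error (or
	// the CleanupOrphanedSlots activity) handle the stragglers.
	for attempt := 0; attempt < 5; attempt++ {
		active, err := postgres.IsReplicationSlotActive(dbName, r.ProviderNode, r.SubscriberNode).
			Scalar(ctx, conn)
		if err != nil {
			return fmt.Errorf("failed to check replication slot state: %w", err)
		}
		if !active {
			break
		}
		time.Sleep(500 * time.Millisecond) // requires the "time" import
	}

	if err := postgres.DropReplicationSlot(dbName, r.ProviderNode, r.SubscriberNode).
		Exec(ctx, conn); err != nil {
		return fmt.Errorf("failed to drop replication slot: %w", err)
	}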

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@server/internal/postgres/create_db.go` around lines 308 - 321,
DropReplicationSlot may fail silently if the slot is still active because
TerminateReplicationSlot is asynchronous; add a retry/wait before dropping the
slot to avoid "replication slot … is active" errors. Modify the logic around
DropReplicationSlot (and where TerminateReplicationSlot is called) to either (a)
perform a short loop that checks pg_replication_slots for slot_name and
waits/polls (e.g., small sleep + retry up to N times) before executing
pg_drop_replication_slot, or (b) convert the drop into a single SQL DO/PLPGSQL
block that attempts pg_drop_replication_slot and retries on active status, and
ensure errors are returned/logged instead of being discarded; reference the
ReplicationSlotName, TerminateReplicationSlot, DropReplicationSlot and
CleanupOrphanedSlots symbols to locate the related code paths.

@coderabbitai coderabbitai bot left a comment

🧹 Nitpick comments (1)
server/internal/database/replication_slot_resource.go (1)

67-99: Error propagation improved; add zerolog logging for observability.

The previous silent _ = ...Exec(...) discards have been replaced with proper error returns — good call, since this enables workflow retries on failures. However, the Delete method has zero logging: there's no trace when a slot is successfully dropped, when the provider is not found (silent nil return), or when a connection error is returned. Per the coding guidelines, zerolog should be used throughout.

✨ Suggested observability additions
 import (
 	"context"
 	"errors"
 	"fmt"

 	"github.com/pgEdge/control-plane/server/internal/postgres"
 	"github.com/pgEdge/control-plane/server/internal/resource"
+	"github.com/rs/zerolog/log"
 )
 	if errors.Is(err, resource.ErrNotFound) {
-		// Provider instance doesn't exist — slot is already gone
+		log.Ctx(ctx).Debug().
+			Str("provider", r.ProviderNode).
+			Str("subscriber", r.SubscriberNode).
+			Msg("provider instance not found; replication slot assumed gone")
 		return nil
 	}
 	if err := postgres.DropReplicationSlot(dbName, r.ProviderNode, r.SubscriberNode).
 		Exec(ctx, conn); err != nil {
 		return fmt.Errorf("failed to drop replication slot %s->%s: %w", r.ProviderNode, r.SubscriberNode, err)
 	}
+
+	log.Ctx(ctx).Info().
+		Str("provider", r.ProviderNode).
+		Str("subscriber", r.SubscriberNode).
+		Msg("replication slot dropped")

 	return nil

As per coding guidelines, "Use structured JSON logging with zerolog throughout the codebase."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@server/internal/database/replication_slot_resource.go` around lines 67 - 99,
The Delete method on ReplicationSlotResource lacks structured zerolog
observability; update the function to log key events using the package logger
(e.g., r.Logger or a context-derived zerolog logger) when GetPrimaryInstance
returns resource.ErrNotFound (info that slot is already gone), when
provider.Connection fails (error with context provider.Spec.DatabaseName and
ProviderNode), before and after calling
postgres.TerminateReplicationSlot(dbName, ProviderNode, SubscriberNode) and
postgres.DropReplicationSlot(... ) (debug/info for attempts and success, include
dbName, ProviderNode, SubscriberNode), and on any returned errors (use
structured fields and wrap the error). Ensure logs are emitted alongside the
existing error returns in Delete, referencing the functions GetPrimaryInstance,
provider.Connection, postgres.TerminateReplicationSlot, and
postgres.DropReplicationSlot to locate the instrumentation points.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@server/internal/database/replication_slot_resource.go`:
- Around line 16-21: The composite ID built in ReplicationSlotResourceIdentifier
currently concatenates providerNode and subscriberNode (Type
ResourceTypeReplicationSlot, ID field) without a separator which can cause
collisions; change the function to join the two node strings with a clear
separator (e.g., ":" or "|") or another unambiguous delimiter when constructing
the ID so providerNode and subscriberNode cannot run together and cause
ambiguous identifiers.

In `@server/internal/postgres/create_db.go`:
- Around line 289-321: The TerminateReplicationSlot function uses
pg_terminate_backend(active_pid) which returns immediately and can race with
DropReplicationSlot; change the Then branch SQL in TerminateReplicationSlot to
use the two-argument blocking form pg_terminate_backend(active_pid,
<timeout_ms>) (e.g., 5000) so the call waits for the walsender to exit before
proceeding, keeping the same slot_name argument usage (function:
TerminateReplicationSlot, symbol: pg_terminate_backend(active_pid, timeout_ms));
keep DropReplicationSlot unchanged but rely on the blocking termination to avoid
the "slot is active" error and spurious retries.

---

Nitpick comments:
In `@server/internal/database/replication_slot_resource.go`:
- Around line 67-99: The Delete method on ReplicationSlotResource lacks
structured zerolog observability; update the function to log key events using
the package logger (e.g., r.Logger or a context-derived zerolog logger) when
GetPrimaryInstance returns resource.ErrNotFound (info that slot is already
gone), when provider.Connection fails (error with context
provider.Spec.DatabaseName and ProviderNode), before and after calling
postgres.TerminateReplicationSlot(dbName, ProviderNode, SubscriberNode) and
postgres.DropReplicationSlot(... ) (debug/info for attempts and success, include
dbName, ProviderNode, SubscriberNode), and on any returned errors (use
structured fields and wrap the error). Ensure logs are emitted alongside the
existing error returns in Delete, referencing the functions GetPrimaryInstance,
provider.Connection, postgres.TerminateReplicationSlot, and
postgres.DropReplicationSlot to locate the instrumentation points.
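
Following up on the pg_terminate_backend comment above, a sketch of the blocking-terminate variant. It mirrors the ConditionalStatement shape used by DropReplicationSlot; the If condition is an assumption since TerminateReplicationSlot's current body is not part of this diff, and the two-argument pg_terminate_backend form requires PostgreSQL 14 or newer:

func TerminateReplicationSlot(databaseName, providerNode, subscriberNode string) ConditionalStatement {
	slotName := ReplicationSlotName(databaseName, providerNode, subscriberNode)

	return ConditionalStatement{
		If: Query[bool]{
			SQL: "SELECT EXISTS (SELECT 1 FROM pg_replication_slots" +
				" WHERE slot_name = @slot_name AND active_pid IS NOT NULL);",
			Args: pgx.NamedArgs{"slot_name": slotName},
		},
		Then: Statement{
			// The second argument makes pg_terminate_backend wait up to 5s for
			// the walsender to exit, closing the race with the subsequent
			// pg_drop_replication_slot call.
			SQL: "SELECT pg_terminate_backend(active_pid, 5000) FROM pg_replication_slots" +
				" WHERE slot_name = @slot_name AND active_pid IS NOT NULL;",
			Args: pgx.NamedArgs{"slot_name": slotName},
		},
	}
}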

Member

@jason-lynch jason-lynch left a comment


Awesome! Thanks for doing this.

@tsivaprasad tsivaprasad merged commit 8db3759 into main Feb 18, 2026
3 checks passed
@tsivaprasad tsivaprasad deleted the PLAT-423-add-remove-host-option-to-update-database-endpoint branch February 18, 2026 13:50