Skip to content

refactor(watcher): decouple settings revalidation from RabbitMQ consumer#395

Merged
jeffersonrodrigues92 merged 4 commits intodevelopfrom
refactor/decouple-settings-watcher
Mar 25, 2026
Merged

refactor(watcher): decouple settings revalidation from RabbitMQ consumer#395
jeffersonrodrigues92 merged 4 commits intodevelopfrom
refactor/decouple-settings-watcher

Conversation

@jeffersonrodrigues92
Copy link
Contributor

@jeffersonrodrigues92 jeffersonrodrigues92 commented Mar 25, 2026

Summary

Moves connection settings revalidation to a standalone SettingsWatcher component, completely independent of the RabbitMQ consumer. Services without RabbitMQ now get PostgreSQL pool settings revalidation.

PostgreSQL only — MongoDB is excluded because the Go driver does not support pool resize after mongo.Connect(). Mongo Manager continues to be initialized separately via middleware/bootstrap.

Problem

revalidateConnectionSettings was inside MultiTenantConsumer (RabbitMQ). If a service doesn't use RabbitMQ, it loses PostgreSQL pool settings revalidation entirely.

Solution

New standalone component: commons/tenant-manager/watcher.SettingsWatcher

BEFORE:
  MultiTenantConsumer (RabbitMQ)
    └─ syncIteration (30s)
         ├─ syncTenants()
         └─ revalidateConnectionSettings()  ← coupled to RabbitMQ

AFTER:
  MultiTenantConsumer (RabbitMQ)
    └─ syncIteration (30s)
         └─ syncTenants()                   ← only tenant discovery + messaging

  SettingsWatcher (standalone goroutine, PostgreSQL only)
    └─ revalidate (30s)
         └─ PG Manager → ConnectedTenantIDs() → ApplyConnectionSettings

Files changed

File Change
watcher/settings_watcher.go New — standalone PostgreSQL settings revalidation
watcher/settings_watcher_test.go New — 8 tests + goleak verification
postgres/manager.go Added ConnectedTenantIDs()
consumer/multi_tenant_revalidate.go Removed revalidateConnectionSettings
consumer/multi_tenant_sync.go Removed revalidation call from sync loop
consumer/multi_tenant_test.go Removed revalidation tests (moved to watcher)

Usage

watcherOpts := []tmwatcher.Option{
    tmwatcher.WithPostgresManager(pgManager),
    tmwatcher.WithInterval(30 * time.Second),
    tmwatcher.WithLogger(logger),
}

settingsWatcher := tmwatcher.NewSettingsWatcher(tenantClient, serviceName, watcherOpts...)
settingsWatcher.Start(ctx)
defer settingsWatcher.Stop()

What about MongoDB?

Mongo Manager is initialized normally via middleware/bootstrap — it just doesn't participate in the SettingsWatcher because its pool settings are immutable after creation.

Test plan

  • go test ./commons/tenant-manager/... -count=1 -race — all 15 packages pass
  • goleak: zero goroutine leaks
  • Service without RabbitMQ: SettingsWatcher revalidates PG pool settings independently
  • Service with RabbitMQ: consumer syncs tenants, watcher handles settings

🤖 Generated with Claude Code

Move connection settings revalidation to a standalone SettingsWatcher
component with its own goroutine. Services without RabbitMQ now get
PostgreSQL/MongoDB pool settings revalidation.

- New package: commons/tenant-manager/watcher
- Add ConnectedTenantIDs() to PG and Mongo managers
- Remove revalidateConnectionSettings from MultiTenantConsumer
- Keep evictSuspendedTenant on consumer (used by consumeForTenant)
- Consumer retains manager refs for connection cleanup on tenant removal

X-Lerian-Ref: 0x1
@coderabbitai
Copy link

coderabbitai bot commented Mar 25, 2026

Caution

Review failed

Pull request was closed or merged during review

Walkthrough

This PR removes MultiTenantConsumer.revalidateConnectionSettings and its tests, and removes the post-sync call to that method. It introduces a new SettingsWatcher background component that periodically revalidates PostgreSQL connection settings across registered postgres managers by fetching tenant configs and applying or closing connections as needed. A Manager.ConnectedTenantIDs() accessor and test helper WithTestConnections were added to expose connected tenant IDs. Comprehensive tests for the new watcher were added.

Sequence Diagram

sequenceDiagram
    participant App as Application
    participant SW as SettingsWatcher
    participant PM as PostgreSQL Manager
    participant TMC as Tenant Manager Client

    App->>SW: Start(ctx)
    SW->>SW: spawn goroutine with ticker

    loop Every interval
        SW->>PM: ConnectedTenantIDs()
        PM-->>SW: []tenantID

        loop For each tenant ID
            SW->>TMC: GetTenantConfig(tenantID)
            alt Success
                TMC-->>SW: config
                SW->>PM: ApplyConnectionSettings(tenantID, config)
            else Suspended/Purged Error
                TMC-->>SW: error (403)
                SW->>PM: CloseConnection(tenantID)
            end
        end
        SW->>SW: log summary
    end

    App->>SW: Stop()
    SW->>SW: cancel goroutine
    SW-->>App: stopped
Loading
🚥 Pre-merge checks | ✅ 1 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Description check ⚠️ Warning The PR description provides comprehensive context including problem statement, solution architecture, files changed, usage examples, and test plan. However, the PR does not follow the required repository template structure with Pull Request Type checkboxes and Checklist items. Restructure the description to match the required template format, including PR Type selection and completion of the provided checklist items.
✅ Passed checks (1 passed)
Check name Status Explanation
Title check ✅ Passed The PR title accurately describes the main architectural change: decoupling connection settings revalidation from the RabbitMQ consumer into a standalone watcher component.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@commons/tenant-manager/watcher/settings_watcher_test.go`:
- Around line 172-210: The test never hits the API-error branch because
pgManager has no tenant connections; before calling w.revalidate in
TestSettingsWatcher_HandlesAPIErrors you should add a tenant connection to
pgManager so revalidate performs the HTTP call and triggers the 500 path—create
a minimal tenant connection (using the tmpostgres manager's public add/insert
method on pgManager) pointing at the test server URL, ensure it's visible to
revalidate, then call w.revalidate(ctx) as before to exercise the error-handling
code in settings_watcher.revalidate.
- Around line 67-124: Rename the test function
TestSettingsWatcher_AppliesSettings to a name that reflects its behavior (for
example TestSettingsWatcher_NoAPICallsWhenNoConnections) so the intent is clear;
locate the test function declaration in settings_watcher_test.go (the function
that constructs tmClient, pgManager, NewSettingsWatcher and calls
w.revalidate(ctx)) and update the function name and any references accordingly
(keep the test body unchanged).

In `@commons/tenant-manager/watcher/settings_watcher.go`:
- Around line 84-96: The Start method on SettingsWatcher can be called twice and
overwrite w.done/w.cancel, orphaning the first goroutine; add a start-guard
(e.g., a boolean or check of w.done/w.cancel protected by a mutex) at the top of
Start to return early (or error) if the watcher is already running, and update
Stop to clear that guard when the goroutine exits; specifically modify
SettingsWatcher.Start to check a running flag or non-nil w.done/w.cancel (and
lock around these checks using a new sync.Mutex or sync/atomic) before creating
the context and launching loop, and ensure SettingsWatcher.Stop resets the flag
and nils w.done/w.cancel after cancelling and waiting for the goroutine so
subsequent Start calls behave correctly.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: ea7ae9c4-833d-41d6-82fc-e71725f77491

📥 Commits

Reviewing files that changed from the base of the PR and between c0d80e8 and 973bcf8.

📒 Files selected for processing (7)
  • commons/tenant-manager/consumer/multi_tenant_revalidate.go
  • commons/tenant-manager/consumer/multi_tenant_sync.go
  • commons/tenant-manager/consumer/multi_tenant_test.go
  • commons/tenant-manager/mongo/manager.go
  • commons/tenant-manager/postgres/manager.go
  • commons/tenant-manager/watcher/settings_watcher.go
  • commons/tenant-manager/watcher/settings_watcher_test.go
💤 Files with no reviewable changes (3)
  • commons/tenant-manager/consumer/multi_tenant_sync.go
  • commons/tenant-manager/consumer/multi_tenant_revalidate.go
  • commons/tenant-manager/consumer/multi_tenant_test.go

Comment on lines +67 to +124
func TestSettingsWatcher_AppliesSettings(t *testing.T) {
t.Parallel()

configCalled := atomic.Int32{}

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
configCalled.Add(1)
w.Header().Set("Content-Type", "application/json")

resp := map[string]any{
"id": "tenant-abc",
"tenantSlug": "abc",
"databases": map[string]any{
"onboarding": map[string]any{
"connectionSettings": map[string]any{
"maxOpenConns": 50,
"maxIdleConns": 15,
},
},
},
}

_ = json.NewEncoder(w).Encode(resp)
}))
defer server.Close()

tmClient := newTestClient(t, server.URL)
logger := testutil.NewCapturingLogger()

// Create a PG manager with a fake connection so ConnectedTenantIDs returns something.
// We use the manager's internal state to simulate an active tenant.
pgManager := tmpostgres.NewManager(tmClient, "ledger",
tmpostgres.WithModule("onboarding"),
tmpostgres.WithLogger(logger),
)

// Inject a tenant connection into the PG manager's map via GetConnection.
// Since we don't have a real DB, we'll test that the watcher at least
// calls the API by checking configCalled. The ApplyConnectionSettings
// will be a no-op since there's no real connection.

w := NewSettingsWatcher(tmClient, "ledger",
WithPostgresManager(pgManager),
WithLogger(logger),
WithInterval(20*time.Millisecond),
)

ctx := context.Background()
ctx = libCommons.ContextWithLogger(ctx, logger)

// Directly test the revalidate method with a manager that has connections.
// We need to verify the watcher uses ConnectedTenantIDs correctly.
// Since pgManager has no connections, revalidate should be a no-op.
w.revalidate(ctx)

assert.Equal(t, int32(0), configCalled.Load(),
"should not call API when no tenants have connections")
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Consider renaming this test for clarity.

The test name TestSettingsWatcher_AppliesSettings is misleading. The test actually verifies that no API calls are made when the manager has no connections. Consider renaming to TestSettingsWatcher_NoAPICallsWhenNoConnections or similar to better reflect the actual test behavior.

🔧 Suggested rename
-func TestSettingsWatcher_AppliesSettings(t *testing.T) {
+func TestSettingsWatcher_NoAPICallsWhenNoConnections(t *testing.T) {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func TestSettingsWatcher_AppliesSettings(t *testing.T) {
t.Parallel()
configCalled := atomic.Int32{}
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
configCalled.Add(1)
w.Header().Set("Content-Type", "application/json")
resp := map[string]any{
"id": "tenant-abc",
"tenantSlug": "abc",
"databases": map[string]any{
"onboarding": map[string]any{
"connectionSettings": map[string]any{
"maxOpenConns": 50,
"maxIdleConns": 15,
},
},
},
}
_ = json.NewEncoder(w).Encode(resp)
}))
defer server.Close()
tmClient := newTestClient(t, server.URL)
logger := testutil.NewCapturingLogger()
// Create a PG manager with a fake connection so ConnectedTenantIDs returns something.
// We use the manager's internal state to simulate an active tenant.
pgManager := tmpostgres.NewManager(tmClient, "ledger",
tmpostgres.WithModule("onboarding"),
tmpostgres.WithLogger(logger),
)
// Inject a tenant connection into the PG manager's map via GetConnection.
// Since we don't have a real DB, we'll test that the watcher at least
// calls the API by checking configCalled. The ApplyConnectionSettings
// will be a no-op since there's no real connection.
w := NewSettingsWatcher(tmClient, "ledger",
WithPostgresManager(pgManager),
WithLogger(logger),
WithInterval(20*time.Millisecond),
)
ctx := context.Background()
ctx = libCommons.ContextWithLogger(ctx, logger)
// Directly test the revalidate method with a manager that has connections.
// We need to verify the watcher uses ConnectedTenantIDs correctly.
// Since pgManager has no connections, revalidate should be a no-op.
w.revalidate(ctx)
assert.Equal(t, int32(0), configCalled.Load(),
"should not call API when no tenants have connections")
}
func TestSettingsWatcher_NoAPICallsWhenNoConnections(t *testing.T) {
t.Parallel()
configCalled := atomic.Int32{}
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
configCalled.Add(1)
w.Header().Set("Content-Type", "application/json")
resp := map[string]any{
"id": "tenant-abc",
"tenantSlug": "abc",
"databases": map[string]any{
"onboarding": map[string]any{
"connectionSettings": map[string]any{
"maxOpenConns": 50,
"maxIdleConns": 15,
},
},
},
}
_ = json.NewEncoder(w).Encode(resp)
}))
defer server.Close()
tmClient := newTestClient(t, server.URL)
logger := testutil.NewCapturingLogger()
// Create a PG manager with a fake connection so ConnectedTenantIDs returns something.
// We use the manager's internal state to simulate an active tenant.
pgManager := tmpostgres.NewManager(tmClient, "ledger",
tmpostgres.WithModule("onboarding"),
tmpostgres.WithLogger(logger),
)
// Inject a tenant connection into the PG manager's map via GetConnection.
// Since we don't have a real DB, we'll test that the watcher at least
// calls the API by checking configCalled. The ApplyConnectionSettings
// will be a no-op since there's no real connection.
w := NewSettingsWatcher(tmClient, "ledger",
WithPostgresManager(pgManager),
WithLogger(logger),
WithInterval(20*time.Millisecond),
)
ctx := context.Background()
ctx = libCommons.ContextWithLogger(ctx, logger)
// Directly test the revalidate method with a manager that has connections.
// We need to verify the watcher uses ConnectedTenantIDs correctly.
// Since pgManager has no connections, revalidate should be a no-op.
w.revalidate(ctx)
assert.Equal(t, int32(0), configCalled.Load(),
"should not call API when no tenants have connections")
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commons/tenant-manager/watcher/settings_watcher_test.go` around lines 67 -
124, Rename the test function TestSettingsWatcher_AppliesSettings to a name that
reflects its behavior (for example
TestSettingsWatcher_NoAPICallsWhenNoConnections) so the intent is clear; locate
the test function declaration in settings_watcher_test.go (the function that
constructs tmClient, pgManager, NewSettingsWatcher and calls w.revalidate(ctx))
and update the function name and any references accordingly (keep the test body
unchanged).

Comment on lines +172 to +210
func TestSettingsWatcher_HandlesAPIErrors(t *testing.T) {
t.Parallel()

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Return 500 for all tenant config calls
if strings.Contains(r.URL.Path, "/connections/") {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(`{"error":"internal error"}`))
return
}

w.WriteHeader(http.StatusOK)
}))
defer server.Close()

tmClient := newTestClient(t, server.URL)
logger := testutil.NewCapturingLogger()

pgManager := tmpostgres.NewManager(tmClient, "ledger",
tmpostgres.WithModule("onboarding"),
tmpostgres.WithLogger(logger),
)

w := NewSettingsWatcher(tmClient, "ledger",
WithPostgresManager(pgManager),
WithLogger(logger),
WithInterval(20*time.Millisecond),
)

ctx := context.Background()
ctx = libCommons.ContextWithLogger(ctx, logger)

// Directly call revalidate — since pgManager has no connections, nothing happens.
// This test verifies the watcher doesn't panic on errors.
w.revalidate(ctx)

// No connections means no API calls, so no error logs expected.
// The test passes if no panic occurred.
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Test does not exercise the error handling path.

This test sets up a server to return 500 errors, but since pgManager has no connections, revalidate() returns early at line 143-145 without making any API calls. The error handling code path (lines 151-159 in settings_watcher.go) is never exercised.

To properly test API error handling, you would need to inject a tenant connection into the manager first.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commons/tenant-manager/watcher/settings_watcher_test.go` around lines 172 -
210, The test never hits the API-error branch because pgManager has no tenant
connections; before calling w.revalidate in TestSettingsWatcher_HandlesAPIErrors
you should add a tenant connection to pgManager so revalidate performs the HTTP
call and triggers the 500 path—create a minimal tenant connection (using the
tmpostgres manager's public add/insert method on pgManager) pointing at the test
server URL, ensure it's visible to revalidate, then call w.revalidate(ctx) as
before to exercise the error-handling code in settings_watcher.revalidate.

Comment on lines +84 to +96
// Start launches the background revalidation goroutine. It returns immediately.
// The goroutine runs until Stop is called or the parent context is cancelled.
func (w *SettingsWatcher) Start(ctx context.Context) {
if w.postgres == nil && w.mongo == nil {
return // no-op: nothing to revalidate
}

ctx, cancel := context.WithCancel(ctx)
w.cancel = cancel
w.done = make(chan struct{})

go w.loop(ctx)
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Calling Start() multiple times can orphan goroutines.

If Start() is called twice without calling Stop() in between, the first goroutine's done channel reference is overwritten and that goroutine becomes orphaned (it will continue running until context cancellation, but Stop() won't wait for it).

Consider adding a guard to prevent double-start, or document that callers must not call Start() multiple times.

🛡️ Suggested fix to prevent double-start
 func (w *SettingsWatcher) Start(ctx context.Context) {
 	if w.postgres == nil && w.mongo == nil {
 		return // no-op: nothing to revalidate
 	}

+	if w.cancel != nil {
+		return // already started
+	}
+
 	ctx, cancel := context.WithCancel(ctx)
 	w.cancel = cancel
 	w.done = make(chan struct{})

 	go w.loop(ctx)
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commons/tenant-manager/watcher/settings_watcher.go` around lines 84 - 96, The
Start method on SettingsWatcher can be called twice and overwrite
w.done/w.cancel, orphaning the first goroutine; add a start-guard (e.g., a
boolean or check of w.done/w.cancel protected by a mutex) at the top of Start to
return early (or error) if the watcher is already running, and update Stop to
clear that guard when the goroutine exits; specifically modify
SettingsWatcher.Start to check a running flag or non-nil w.done/w.cancel (and
lock around these checks using a new sync.Mutex or sync/atomic) before creating
the context and launching loop, and ensure SettingsWatcher.Stop resets the flag
and nils w.done/w.cancel after cancelling and waiting for the goroutine so
subsequent Start calls behave correctly.

Add TestMain with goleak.VerifyTestMain to catch goroutine leaks.
Fix client cleanup in tests — InMemoryCache spawns a background
goroutine that was leaked without calling Client.Close().

X-Lerian-Ref: 0x1
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@commons/tenant-manager/watcher/settings_watcher_test.go`:
- Around line 224-237: The test sets up an httptest.NewServer (server) that
returns a 403 TS-SUSPENDED response but never exercises it because the test
calls handleSuspendedTenant() directly; either remove the unused server setup or
change the test to exercise revalidate() so the code path that performs the HTTP
call to /connections/ runs and the server handler is invoked; specifically, if
you want to test the suspended-tenant HTTP path, wire the test to call
revalidate() (or the method that triggers the HTTP GET to /connections/) with
the server URL injected so the request hits the server and triggers the 403
branch, otherwise delete the httptest.NewServer setup to avoid dead code.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: ddc81065-2d1f-4906-893b-1c87cb195f14

📥 Commits

Reviewing files that changed from the base of the PR and between 973bcf8 and a840988.

📒 Files selected for processing (1)
  • commons/tenant-manager/watcher/settings_watcher_test.go

MongoDB driver does not support pool resize after creation, so
connection pool settings revalidation only applies to PostgreSQL.
Remove mongo manager, WithMongoManager option, and ConnectedTenantIDs
from mongo manager.

X-Lerian-Ref: 0x1
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (4)
commons/tenant-manager/watcher/settings_watcher_test.go (3)

181-219: 🧹 Nitpick | 🔵 Trivial

Test does not exercise the HTTP error handling path.

The server is configured to return 500 errors for /connections/ paths, but since pgManager has no tenant connections, revalidate() returns early at the len(tenantIDs) == 0 check without making any API calls. The error handling code in revalidate() (lines 147-156 in settings_watcher.go) is never exercised.

To properly test API error handling, you would need to inject a tenant connection into the manager first, or mock ConnectedTenantIDs() to return tenant IDs.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commons/tenant-manager/watcher/settings_watcher_test.go` around lines 181 -
219, The test TestSettingsWatcher_HandlesAPIErrors never triggers the HTTP 500
path because pgManager has no tenant connections; update the test to ensure
revalidate() iterates tenants by either injecting a tenant connection into
pgManager (use tmpostgres.NewManager(...).CreateConnection / AddConnection
equivalent) or by mocking/overriding pgManager.ConnectedTenantIDs() to return a
non-empty slice (e.g., a single tenant ID) before calling w.revalidate(ctx), so
the HTTP handler for "/connections/" is invoked and the watcher’s error handling
is exercised.

76-133: 🧹 Nitpick | 🔵 Trivial

Test name does not match test behavior.

The test TestSettingsWatcher_AppliesSettings actually verifies that no API calls are made when the manager has no connections. The comment on line 128 confirms this: "Since pgManager has no connections, revalidate should be a no-op." Consider renaming to better reflect the actual test behavior, e.g., TestSettingsWatcher_NoAPICallsWhenNoConnections.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commons/tenant-manager/watcher/settings_watcher_test.go` around lines 76 -
133, Rename the test function TestSettingsWatcher_AppliesSettings to reflect its
actual behavior (e.g., TestSettingsWatcher_NoAPICallsWhenNoConnections) and
update any references; the test is asserting that no API calls occur when the
postgres manager has no connections, so change the test name and any related
comments to match that semantic (locate the test function
TestSettingsWatcher_AppliesSettings in settings_watcher_test.go and update its
identifier and accompanying comment accordingly).

224-237: 🧹 Nitpick | 🔵 Trivial

Server setup is dead code—handler is never invoked.

The server is configured to return 403 with TS-SUSPENDED, but handleSuspendedTenant() is called directly at line 256 without making any API calls. The server handler is never exercised.

Either remove the unused server setup or restructure the test to call revalidate() with an injected tenant connection so the API call occurs and triggers the suspended tenant path.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commons/tenant-manager/watcher/settings_watcher_test.go` around lines 224 -
237, The test currently sets up an httptest.Server but never calls it because
handleSuspendedTenant() is invoked directly; either remove the unused server
setup or change the test to exercise the HTTP path: call revalidate() (or the
watcher method that triggers tenant API validation) with a tenant connection
that points to the test server URL so the handler runs and returns the 403
TS-SUSPENDED response; update the test to assert the suspended behavior after
invoking revalidate() (or otherwise inject the server URL into the code path
that performs the API call) and remove direct calls to handleSuspendedTenant()
to ensure the handler is exercised.
commons/tenant-manager/watcher/settings_watcher.go (1)

82-92: ⚠️ Potential issue | 🟡 Minor

Calling Start() multiple times can orphan goroutines.

If Start() is called twice without an intervening Stop(), the first goroutine's done channel reference is overwritten and that goroutine becomes orphaned—it continues running until context cancellation but Stop() won't wait for it.

Additionally, there's a data race: w.cancel and w.done are accessed without synchronization, so concurrent calls to Start() and Stop() can race.

🛡️ Suggested fix to prevent double-start
+import "sync"
+
 type SettingsWatcher struct {
 	client   *client.Client
 	service  string
 	interval time.Duration
 	postgres *tmpostgres.Manager
 	logger   *logcompat.Logger
+	mu       sync.Mutex
 	cancel   context.CancelFunc
 	done     chan struct{}
 }

 func (w *SettingsWatcher) Start(ctx context.Context) {
 	if w.postgres == nil {
 		return // no-op: nothing to revalidate
 	}

+	w.mu.Lock()
+	defer w.mu.Unlock()
+
+	if w.cancel != nil {
+		return // already started
+	}
+
 	ctx, cancel := context.WithCancel(ctx)
 	w.cancel = cancel
 	w.done = make(chan struct{})

 	go w.loop(ctx)
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commons/tenant-manager/watcher/settings_watcher.go` around lines 82 - 92,
Start currently overwrites w.cancel and w.done and races with Stop; modify
SettingsWatcher to guard Start/Stop with a mutex (e.g., add a sync.Mutex on the
struct) and make Start a no-op if the watcher is already running: inside Start,
lock the mutex, check a running flag or that w.done != nil, return if already
started, otherwise create ctx, set w.cancel and w.done, then spawn go
w.loop(ctx) and unlock; update Stop to lock the same mutex, use w.cancel() if
non-nil, wait for the goroutine via the w.done channel (and set w.done and
w.cancel to nil after waiting), and unlock—this ensures no double-start, no
orphaned goroutines, and no data races on w.cancel/w.done (refer to
SettingsWatcher.Start, SettingsWatcher.Stop, w.cancel, w.done, and loop).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@commons/tenant-manager/watcher/settings_watcher.go`:
- Around line 94-103: Stop() must use the watcher mutex and clear internal state
so the watcher can be restarted: acquire w.mu, capture local references to
w.cancel and w.done (or check nil), call cancel() if non-nil, then release the
lock before waiting on the done channel to avoid deadlock, after the goroutine
finishes re-acquire w.mu and set w.cancel = nil and w.done = nil, and finally
unlock; update the SettingsWatcher.Stop method to follow this sequence (use the
existing mutex field name and w.cancel/w.done identifiers).

---

Duplicate comments:
In `@commons/tenant-manager/watcher/settings_watcher_test.go`:
- Around line 181-219: The test TestSettingsWatcher_HandlesAPIErrors never
triggers the HTTP 500 path because pgManager has no tenant connections; update
the test to ensure revalidate() iterates tenants by either injecting a tenant
connection into pgManager (use tmpostgres.NewManager(...).CreateConnection /
AddConnection equivalent) or by mocking/overriding
pgManager.ConnectedTenantIDs() to return a non-empty slice (e.g., a single
tenant ID) before calling w.revalidate(ctx), so the HTTP handler for
"/connections/" is invoked and the watcher’s error handling is exercised.
- Around line 76-133: Rename the test function
TestSettingsWatcher_AppliesSettings to reflect its actual behavior (e.g.,
TestSettingsWatcher_NoAPICallsWhenNoConnections) and update any references; the
test is asserting that no API calls occur when the postgres manager has no
connections, so change the test name and any related comments to match that
semantic (locate the test function TestSettingsWatcher_AppliesSettings in
settings_watcher_test.go and update its identifier and accompanying comment
accordingly).
- Around line 224-237: The test currently sets up an httptest.Server but never
calls it because handleSuspendedTenant() is invoked directly; either remove the
unused server setup or change the test to exercise the HTTP path: call
revalidate() (or the watcher method that triggers tenant API validation) with a
tenant connection that points to the test server URL so the handler runs and
returns the 403 TS-SUSPENDED response; update the test to assert the suspended
behavior after invoking revalidate() (or otherwise inject the server URL into
the code path that performs the API call) and remove direct calls to
handleSuspendedTenant() to ensure the handler is exercised.

In `@commons/tenant-manager/watcher/settings_watcher.go`:
- Around line 82-92: Start currently overwrites w.cancel and w.done and races
with Stop; modify SettingsWatcher to guard Start/Stop with a mutex (e.g., add a
sync.Mutex on the struct) and make Start a no-op if the watcher is already
running: inside Start, lock the mutex, check a running flag or that w.done !=
nil, return if already started, otherwise create ctx, set w.cancel and w.done,
then spawn go w.loop(ctx) and unlock; update Stop to lock the same mutex, use
w.cancel() if non-nil, wait for the goroutine via the w.done channel (and set
w.done and w.cancel to nil after waiting), and unlock—this ensures no
double-start, no orphaned goroutines, and no data races on w.cancel/w.done
(refer to SettingsWatcher.Start, SettingsWatcher.Stop, w.cancel, w.done, and
loop).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 87934d2a-eda0-4cf7-94a3-a3464f3c0530

📥 Commits

Reviewing files that changed from the base of the PR and between a840988 and abad4bb.

📒 Files selected for processing (2)
  • commons/tenant-manager/watcher/settings_watcher.go
  • commons/tenant-manager/watcher/settings_watcher_test.go

Comment on lines +94 to +103
// Stop cancels the background goroutine and waits for it to finish.
func (w *SettingsWatcher) Stop() {
if w.cancel != nil {
w.cancel()
}

if w.done != nil {
<-w.done
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Stop() should also use mutex protection and reset state.

To pair with the fix for Start(), Stop() should acquire the mutex and reset w.cancel and w.done to nil after the goroutine finishes. This ensures the watcher can be restarted after being stopped and prevents races.

🛡️ Suggested fix for Stop()
 func (w *SettingsWatcher) Stop() {
+	w.mu.Lock()
+	cancel := w.cancel
+	done := w.done
+	w.mu.Unlock()
+
-	if w.cancel != nil {
-		w.cancel()
+	if cancel != nil {
+		cancel()
 	}

-	if w.done != nil {
-		<-w.done
+	if done != nil {
+		<-done
 	}
+
+	w.mu.Lock()
+	w.cancel = nil
+	w.done = nil
+	w.mu.Unlock()
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commons/tenant-manager/watcher/settings_watcher.go` around lines 94 - 103,
Stop() must use the watcher mutex and clear internal state so the watcher can be
restarted: acquire w.mu, capture local references to w.cancel and w.done (or
check nil), call cancel() if non-nil, then release the lock before waiting on
the done channel to avoid deadlock, after the goroutine finishes re-acquire w.mu
and set w.cancel = nil and w.done = nil, and finally unlock; update the
SettingsWatcher.Stop method to follow this sequence (use the existing mutex
field name and w.cancel/w.done identifiers).

jeffersonrodrigues92 added a commit to LerianStudio/midaz that referenced this pull request Mar 25, 2026
Instantiate SettingsWatcher in multi-tenant bootstrap with both PG
managers (onboarding + transaction). Runs as a Runnable alongside the
multi-tenant consumer, stopped on graceful shutdown.

Depends on: LerianStudio/lib-commons#395

X-Lerian-Ref: 0x1
… on start

- Change postgres field to managers slice — WithPostgresManager appends
  instead of overwriting. Fixes silent loss of onboarding PG manager
  when two managers are registered.
- collectTenantIDs unions and deduplicates across all managers.
- Run one revalidation immediately on Start before entering ticker loop,
  so settings changed while the service was down are applied without
  waiting 30s.
- Add WithTestConnections option for external test packages.

X-Lerian-Ref: 0x1
Copy link
Contributor

@gandalf-at-lerian gandalf-at-lerian left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review Summary

Architecture is solid — decoupling settings revalidation from the RabbitMQ consumer is the right call. Services without RabbitMQ now get PostgreSQL pool settings revalidation, and the consumer stays focused on tenant discovery + messaging. Clean separation.

🔴 Blocker: Single PG Manager limitation

SettingsWatcher holds a single postgres *tmpostgres.Manager field. WithPostgresManager overwrites instead of appending:

func WithPostgresManager(p *tmpostgres.Manager) Option {
    return func(w *SettingsWatcher) { w.postgres = p }
}

Any consumer with multiple PG managers (e.g., Midaz has onboarding + transaction) silently loses the first one — only the last call wins. Pool settings for the overwritten manager are never revalidated.

Fix: Change to managers []*tmpostgres.Manager, WithPostgresManager appends, collectTenantIDs does a union across all managers, revalidate iterates all managers per tenant.

🟡 No immediate revalidation on Start

The loop uses a pure ticker — first revalidation only fires after the interval (30s default). If pool settings changed while the service was down, it runs stale for up to 30s. Consider running one revalidate() call before entering the ticker loop.

🟡 Watcher does not detect connection-level config changes

revalidate() calls ApplyConnectionSettings directly but does not call detectAndReconnectPostgres. If host/port/credentials changed (not just pool sizes), the watcher won't trigger reconnection. Only the inline revalidation in GetConnection handles that path. This may be intentional (watcher handles pool tuning, reconnection is left to hot-path), but it should be documented explicitly — services relying solely on the watcher (no RabbitMQ, low traffic) could miss credential rotations until the next request.

✅ What's good

  • Functional options pattern composes well for other services
  • ConnectedTenantIDs() with RLock is clean
  • goleak in tests catches goroutine leaks
  • Suspended/purged tenant handling with connection cleanup
  • Package-level doc is clear

The single-manager issue is the only blocker. The rest are improvements worth considering before this becomes the standard pattern across services.

Copy link
Contributor

@gandalf-at-lerian gandalf-at-lerian left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updating review — my initial diff was stale. The current code already addresses the multi-manager concern:

  • managers []*tmpostgres.Manager with append semantics
  • Deduplicated tenant ID collection across all managers
  • All managers iterated in revalidate + handleSuspendedTenant
  • Immediate revalidation on Start before the ticker loop

Only remaining suggestion: consider documenting that the watcher handles pool tuning only — credential/host changes are handled by the inline revalidation path in GetConnection. This matters for services with low traffic that rely primarily on the watcher.

Clean implementation. LGTM. ✅

@jeffersonrodrigues92 jeffersonrodrigues92 merged commit 8f323c2 into develop Mar 25, 2026
2 of 3 checks passed
@jeffersonrodrigues92 jeffersonrodrigues92 deleted the refactor/decouple-settings-watcher branch March 25, 2026 02:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants