refactor(watcher): decouple settings revalidation from RabbitMQ consumer#395
Conversation
Move connection settings revalidation to a standalone SettingsWatcher component with its own goroutine. Services without RabbitMQ now get PostgreSQL/MongoDB pool settings revalidation. - New package: commons/tenant-manager/watcher - Add ConnectedTenantIDs() to PG and Mongo managers - Remove revalidateConnectionSettings from MultiTenantConsumer - Keep evictSuspendedTenant on consumer (used by consumeForTenant) - Consumer retains manager refs for connection cleanup on tenant removal X-Lerian-Ref: 0x1
|
Caution Review failedPull request was closed or merged during review WalkthroughThis PR removes Sequence DiagramsequenceDiagram
participant App as Application
participant SW as SettingsWatcher
participant PM as PostgreSQL Manager
participant TMC as Tenant Manager Client
App->>SW: Start(ctx)
SW->>SW: spawn goroutine with ticker
loop Every interval
SW->>PM: ConnectedTenantIDs()
PM-->>SW: []tenantID
loop For each tenant ID
SW->>TMC: GetTenantConfig(tenantID)
alt Success
TMC-->>SW: config
SW->>PM: ApplyConnectionSettings(tenantID, config)
else Suspended/Purged Error
TMC-->>SW: error (403)
SW->>PM: CloseConnection(tenantID)
end
end
SW->>SW: log summary
end
App->>SW: Stop()
SW->>SW: cancel goroutine
SW-->>App: stopped
🚥 Pre-merge checks | ✅ 1 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@commons/tenant-manager/watcher/settings_watcher_test.go`:
- Around line 172-210: The test never hits the API-error branch because
pgManager has no tenant connections; before calling w.revalidate in
TestSettingsWatcher_HandlesAPIErrors you should add a tenant connection to
pgManager so revalidate performs the HTTP call and triggers the 500 path—create
a minimal tenant connection (using the tmpostgres manager's public add/insert
method on pgManager) pointing at the test server URL, ensure it's visible to
revalidate, then call w.revalidate(ctx) as before to exercise the error-handling
code in settings_watcher.revalidate.
- Around line 67-124: Rename the test function
TestSettingsWatcher_AppliesSettings to a name that reflects its behavior (for
example TestSettingsWatcher_NoAPICallsWhenNoConnections) so the intent is clear;
locate the test function declaration in settings_watcher_test.go (the function
that constructs tmClient, pgManager, NewSettingsWatcher and calls
w.revalidate(ctx)) and update the function name and any references accordingly
(keep the test body unchanged).
In `@commons/tenant-manager/watcher/settings_watcher.go`:
- Around line 84-96: The Start method on SettingsWatcher can be called twice and
overwrite w.done/w.cancel, orphaning the first goroutine; add a start-guard
(e.g., a boolean or check of w.done/w.cancel protected by a mutex) at the top of
Start to return early (or error) if the watcher is already running, and update
Stop to clear that guard when the goroutine exits; specifically modify
SettingsWatcher.Start to check a running flag or non-nil w.done/w.cancel (and
lock around these checks using a new sync.Mutex or sync/atomic) before creating
the context and launching loop, and ensure SettingsWatcher.Stop resets the flag
and nils w.done/w.cancel after cancelling and waiting for the goroutine so
subsequent Start calls behave correctly.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: ea7ae9c4-833d-41d6-82fc-e71725f77491
📒 Files selected for processing (7)
commons/tenant-manager/consumer/multi_tenant_revalidate.gocommons/tenant-manager/consumer/multi_tenant_sync.gocommons/tenant-manager/consumer/multi_tenant_test.gocommons/tenant-manager/mongo/manager.gocommons/tenant-manager/postgres/manager.gocommons/tenant-manager/watcher/settings_watcher.gocommons/tenant-manager/watcher/settings_watcher_test.go
💤 Files with no reviewable changes (3)
- commons/tenant-manager/consumer/multi_tenant_sync.go
- commons/tenant-manager/consumer/multi_tenant_revalidate.go
- commons/tenant-manager/consumer/multi_tenant_test.go
| func TestSettingsWatcher_AppliesSettings(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| configCalled := atomic.Int32{} | ||
|
|
||
| server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
| configCalled.Add(1) | ||
| w.Header().Set("Content-Type", "application/json") | ||
|
|
||
| resp := map[string]any{ | ||
| "id": "tenant-abc", | ||
| "tenantSlug": "abc", | ||
| "databases": map[string]any{ | ||
| "onboarding": map[string]any{ | ||
| "connectionSettings": map[string]any{ | ||
| "maxOpenConns": 50, | ||
| "maxIdleConns": 15, | ||
| }, | ||
| }, | ||
| }, | ||
| } | ||
|
|
||
| _ = json.NewEncoder(w).Encode(resp) | ||
| })) | ||
| defer server.Close() | ||
|
|
||
| tmClient := newTestClient(t, server.URL) | ||
| logger := testutil.NewCapturingLogger() | ||
|
|
||
| // Create a PG manager with a fake connection so ConnectedTenantIDs returns something. | ||
| // We use the manager's internal state to simulate an active tenant. | ||
| pgManager := tmpostgres.NewManager(tmClient, "ledger", | ||
| tmpostgres.WithModule("onboarding"), | ||
| tmpostgres.WithLogger(logger), | ||
| ) | ||
|
|
||
| // Inject a tenant connection into the PG manager's map via GetConnection. | ||
| // Since we don't have a real DB, we'll test that the watcher at least | ||
| // calls the API by checking configCalled. The ApplyConnectionSettings | ||
| // will be a no-op since there's no real connection. | ||
|
|
||
| w := NewSettingsWatcher(tmClient, "ledger", | ||
| WithPostgresManager(pgManager), | ||
| WithLogger(logger), | ||
| WithInterval(20*time.Millisecond), | ||
| ) | ||
|
|
||
| ctx := context.Background() | ||
| ctx = libCommons.ContextWithLogger(ctx, logger) | ||
|
|
||
| // Directly test the revalidate method with a manager that has connections. | ||
| // We need to verify the watcher uses ConnectedTenantIDs correctly. | ||
| // Since pgManager has no connections, revalidate should be a no-op. | ||
| w.revalidate(ctx) | ||
|
|
||
| assert.Equal(t, int32(0), configCalled.Load(), | ||
| "should not call API when no tenants have connections") | ||
| } |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Consider renaming this test for clarity.
The test name TestSettingsWatcher_AppliesSettings is misleading. The test actually verifies that no API calls are made when the manager has no connections. Consider renaming to TestSettingsWatcher_NoAPICallsWhenNoConnections or similar to better reflect the actual test behavior.
🔧 Suggested rename
-func TestSettingsWatcher_AppliesSettings(t *testing.T) {
+func TestSettingsWatcher_NoAPICallsWhenNoConnections(t *testing.T) {📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| func TestSettingsWatcher_AppliesSettings(t *testing.T) { | |
| t.Parallel() | |
| configCalled := atomic.Int32{} | |
| server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | |
| configCalled.Add(1) | |
| w.Header().Set("Content-Type", "application/json") | |
| resp := map[string]any{ | |
| "id": "tenant-abc", | |
| "tenantSlug": "abc", | |
| "databases": map[string]any{ | |
| "onboarding": map[string]any{ | |
| "connectionSettings": map[string]any{ | |
| "maxOpenConns": 50, | |
| "maxIdleConns": 15, | |
| }, | |
| }, | |
| }, | |
| } | |
| _ = json.NewEncoder(w).Encode(resp) | |
| })) | |
| defer server.Close() | |
| tmClient := newTestClient(t, server.URL) | |
| logger := testutil.NewCapturingLogger() | |
| // Create a PG manager with a fake connection so ConnectedTenantIDs returns something. | |
| // We use the manager's internal state to simulate an active tenant. | |
| pgManager := tmpostgres.NewManager(tmClient, "ledger", | |
| tmpostgres.WithModule("onboarding"), | |
| tmpostgres.WithLogger(logger), | |
| ) | |
| // Inject a tenant connection into the PG manager's map via GetConnection. | |
| // Since we don't have a real DB, we'll test that the watcher at least | |
| // calls the API by checking configCalled. The ApplyConnectionSettings | |
| // will be a no-op since there's no real connection. | |
| w := NewSettingsWatcher(tmClient, "ledger", | |
| WithPostgresManager(pgManager), | |
| WithLogger(logger), | |
| WithInterval(20*time.Millisecond), | |
| ) | |
| ctx := context.Background() | |
| ctx = libCommons.ContextWithLogger(ctx, logger) | |
| // Directly test the revalidate method with a manager that has connections. | |
| // We need to verify the watcher uses ConnectedTenantIDs correctly. | |
| // Since pgManager has no connections, revalidate should be a no-op. | |
| w.revalidate(ctx) | |
| assert.Equal(t, int32(0), configCalled.Load(), | |
| "should not call API when no tenants have connections") | |
| } | |
| func TestSettingsWatcher_NoAPICallsWhenNoConnections(t *testing.T) { | |
| t.Parallel() | |
| configCalled := atomic.Int32{} | |
| server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | |
| configCalled.Add(1) | |
| w.Header().Set("Content-Type", "application/json") | |
| resp := map[string]any{ | |
| "id": "tenant-abc", | |
| "tenantSlug": "abc", | |
| "databases": map[string]any{ | |
| "onboarding": map[string]any{ | |
| "connectionSettings": map[string]any{ | |
| "maxOpenConns": 50, | |
| "maxIdleConns": 15, | |
| }, | |
| }, | |
| }, | |
| } | |
| _ = json.NewEncoder(w).Encode(resp) | |
| })) | |
| defer server.Close() | |
| tmClient := newTestClient(t, server.URL) | |
| logger := testutil.NewCapturingLogger() | |
| // Create a PG manager with a fake connection so ConnectedTenantIDs returns something. | |
| // We use the manager's internal state to simulate an active tenant. | |
| pgManager := tmpostgres.NewManager(tmClient, "ledger", | |
| tmpostgres.WithModule("onboarding"), | |
| tmpostgres.WithLogger(logger), | |
| ) | |
| // Inject a tenant connection into the PG manager's map via GetConnection. | |
| // Since we don't have a real DB, we'll test that the watcher at least | |
| // calls the API by checking configCalled. The ApplyConnectionSettings | |
| // will be a no-op since there's no real connection. | |
| w := NewSettingsWatcher(tmClient, "ledger", | |
| WithPostgresManager(pgManager), | |
| WithLogger(logger), | |
| WithInterval(20*time.Millisecond), | |
| ) | |
| ctx := context.Background() | |
| ctx = libCommons.ContextWithLogger(ctx, logger) | |
| // Directly test the revalidate method with a manager that has connections. | |
| // We need to verify the watcher uses ConnectedTenantIDs correctly. | |
| // Since pgManager has no connections, revalidate should be a no-op. | |
| w.revalidate(ctx) | |
| assert.Equal(t, int32(0), configCalled.Load(), | |
| "should not call API when no tenants have connections") | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@commons/tenant-manager/watcher/settings_watcher_test.go` around lines 67 -
124, Rename the test function TestSettingsWatcher_AppliesSettings to a name that
reflects its behavior (for example
TestSettingsWatcher_NoAPICallsWhenNoConnections) so the intent is clear; locate
the test function declaration in settings_watcher_test.go (the function that
constructs tmClient, pgManager, NewSettingsWatcher and calls w.revalidate(ctx))
and update the function name and any references accordingly (keep the test body
unchanged).
| func TestSettingsWatcher_HandlesAPIErrors(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
| // Return 500 for all tenant config calls | ||
| if strings.Contains(r.URL.Path, "/connections/") { | ||
| w.WriteHeader(http.StatusInternalServerError) | ||
| _, _ = w.Write([]byte(`{"error":"internal error"}`)) | ||
| return | ||
| } | ||
|
|
||
| w.WriteHeader(http.StatusOK) | ||
| })) | ||
| defer server.Close() | ||
|
|
||
| tmClient := newTestClient(t, server.URL) | ||
| logger := testutil.NewCapturingLogger() | ||
|
|
||
| pgManager := tmpostgres.NewManager(tmClient, "ledger", | ||
| tmpostgres.WithModule("onboarding"), | ||
| tmpostgres.WithLogger(logger), | ||
| ) | ||
|
|
||
| w := NewSettingsWatcher(tmClient, "ledger", | ||
| WithPostgresManager(pgManager), | ||
| WithLogger(logger), | ||
| WithInterval(20*time.Millisecond), | ||
| ) | ||
|
|
||
| ctx := context.Background() | ||
| ctx = libCommons.ContextWithLogger(ctx, logger) | ||
|
|
||
| // Directly call revalidate — since pgManager has no connections, nothing happens. | ||
| // This test verifies the watcher doesn't panic on errors. | ||
| w.revalidate(ctx) | ||
|
|
||
| // No connections means no API calls, so no error logs expected. | ||
| // The test passes if no panic occurred. | ||
| } |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Test does not exercise the error handling path.
This test sets up a server to return 500 errors, but since pgManager has no connections, revalidate() returns early at line 143-145 without making any API calls. The error handling code path (lines 151-159 in settings_watcher.go) is never exercised.
To properly test API error handling, you would need to inject a tenant connection into the manager first.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@commons/tenant-manager/watcher/settings_watcher_test.go` around lines 172 -
210, The test never hits the API-error branch because pgManager has no tenant
connections; before calling w.revalidate in TestSettingsWatcher_HandlesAPIErrors
you should add a tenant connection to pgManager so revalidate performs the HTTP
call and triggers the 500 path—create a minimal tenant connection (using the
tmpostgres manager's public add/insert method on pgManager) pointing at the test
server URL, ensure it's visible to revalidate, then call w.revalidate(ctx) as
before to exercise the error-handling code in settings_watcher.revalidate.
| // Start launches the background revalidation goroutine. It returns immediately. | ||
| // The goroutine runs until Stop is called or the parent context is cancelled. | ||
| func (w *SettingsWatcher) Start(ctx context.Context) { | ||
| if w.postgres == nil && w.mongo == nil { | ||
| return // no-op: nothing to revalidate | ||
| } | ||
|
|
||
| ctx, cancel := context.WithCancel(ctx) | ||
| w.cancel = cancel | ||
| w.done = make(chan struct{}) | ||
|
|
||
| go w.loop(ctx) | ||
| } |
There was a problem hiding this comment.
Calling Start() multiple times can orphan goroutines.
If Start() is called twice without calling Stop() in between, the first goroutine's done channel reference is overwritten and that goroutine becomes orphaned (it will continue running until context cancellation, but Stop() won't wait for it).
Consider adding a guard to prevent double-start, or document that callers must not call Start() multiple times.
🛡️ Suggested fix to prevent double-start
func (w *SettingsWatcher) Start(ctx context.Context) {
if w.postgres == nil && w.mongo == nil {
return // no-op: nothing to revalidate
}
+ if w.cancel != nil {
+ return // already started
+ }
+
ctx, cancel := context.WithCancel(ctx)
w.cancel = cancel
w.done = make(chan struct{})
go w.loop(ctx)
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@commons/tenant-manager/watcher/settings_watcher.go` around lines 84 - 96, The
Start method on SettingsWatcher can be called twice and overwrite
w.done/w.cancel, orphaning the first goroutine; add a start-guard (e.g., a
boolean or check of w.done/w.cancel protected by a mutex) at the top of Start to
return early (or error) if the watcher is already running, and update Stop to
clear that guard when the goroutine exits; specifically modify
SettingsWatcher.Start to check a running flag or non-nil w.done/w.cancel (and
lock around these checks using a new sync.Mutex or sync/atomic) before creating
the context and launching loop, and ensure SettingsWatcher.Stop resets the flag
and nils w.done/w.cancel after cancelling and waiting for the goroutine so
subsequent Start calls behave correctly.
Add TestMain with goleak.VerifyTestMain to catch goroutine leaks. Fix client cleanup in tests — InMemoryCache spawns a background goroutine that was leaked without calling Client.Close(). X-Lerian-Ref: 0x1
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@commons/tenant-manager/watcher/settings_watcher_test.go`:
- Around line 224-237: The test sets up an httptest.NewServer (server) that
returns a 403 TS-SUSPENDED response but never exercises it because the test
calls handleSuspendedTenant() directly; either remove the unused server setup or
change the test to exercise revalidate() so the code path that performs the HTTP
call to /connections/ runs and the server handler is invoked; specifically, if
you want to test the suspended-tenant HTTP path, wire the test to call
revalidate() (or the method that triggers the HTTP GET to /connections/) with
the server URL injected so the request hits the server and triggers the 403
branch, otherwise delete the httptest.NewServer setup to avoid dead code.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: ddc81065-2d1f-4906-893b-1c87cb195f14
📒 Files selected for processing (1)
commons/tenant-manager/watcher/settings_watcher_test.go
MongoDB driver does not support pool resize after creation, so connection pool settings revalidation only applies to PostgreSQL. Remove mongo manager, WithMongoManager option, and ConnectedTenantIDs from mongo manager. X-Lerian-Ref: 0x1
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (4)
commons/tenant-manager/watcher/settings_watcher_test.go (3)
181-219: 🧹 Nitpick | 🔵 TrivialTest does not exercise the HTTP error handling path.
The server is configured to return 500 errors for
/connections/paths, but sincepgManagerhas no tenant connections,revalidate()returns early at thelen(tenantIDs) == 0check without making any API calls. The error handling code inrevalidate()(lines 147-156 in settings_watcher.go) is never exercised.To properly test API error handling, you would need to inject a tenant connection into the manager first, or mock
ConnectedTenantIDs()to return tenant IDs.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@commons/tenant-manager/watcher/settings_watcher_test.go` around lines 181 - 219, The test TestSettingsWatcher_HandlesAPIErrors never triggers the HTTP 500 path because pgManager has no tenant connections; update the test to ensure revalidate() iterates tenants by either injecting a tenant connection into pgManager (use tmpostgres.NewManager(...).CreateConnection / AddConnection equivalent) or by mocking/overriding pgManager.ConnectedTenantIDs() to return a non-empty slice (e.g., a single tenant ID) before calling w.revalidate(ctx), so the HTTP handler for "/connections/" is invoked and the watcher’s error handling is exercised.
76-133: 🧹 Nitpick | 🔵 TrivialTest name does not match test behavior.
The test
TestSettingsWatcher_AppliesSettingsactually verifies that no API calls are made when the manager has no connections. The comment on line 128 confirms this: "Since pgManager has no connections, revalidate should be a no-op." Consider renaming to better reflect the actual test behavior, e.g.,TestSettingsWatcher_NoAPICallsWhenNoConnections.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@commons/tenant-manager/watcher/settings_watcher_test.go` around lines 76 - 133, Rename the test function TestSettingsWatcher_AppliesSettings to reflect its actual behavior (e.g., TestSettingsWatcher_NoAPICallsWhenNoConnections) and update any references; the test is asserting that no API calls occur when the postgres manager has no connections, so change the test name and any related comments to match that semantic (locate the test function TestSettingsWatcher_AppliesSettings in settings_watcher_test.go and update its identifier and accompanying comment accordingly).
224-237: 🧹 Nitpick | 🔵 TrivialServer setup is dead code—handler is never invoked.
The server is configured to return 403 with
TS-SUSPENDED, buthandleSuspendedTenant()is called directly at line 256 without making any API calls. The server handler is never exercised.Either remove the unused server setup or restructure the test to call
revalidate()with an injected tenant connection so the API call occurs and triggers the suspended tenant path.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@commons/tenant-manager/watcher/settings_watcher_test.go` around lines 224 - 237, The test currently sets up an httptest.Server but never calls it because handleSuspendedTenant() is invoked directly; either remove the unused server setup or change the test to exercise the HTTP path: call revalidate() (or the watcher method that triggers tenant API validation) with a tenant connection that points to the test server URL so the handler runs and returns the 403 TS-SUSPENDED response; update the test to assert the suspended behavior after invoking revalidate() (or otherwise inject the server URL into the code path that performs the API call) and remove direct calls to handleSuspendedTenant() to ensure the handler is exercised.commons/tenant-manager/watcher/settings_watcher.go (1)
82-92:⚠️ Potential issue | 🟡 MinorCalling
Start()multiple times can orphan goroutines.If
Start()is called twice without an interveningStop(), the first goroutine'sdonechannel reference is overwritten and that goroutine becomes orphaned—it continues running until context cancellation butStop()won't wait for it.Additionally, there's a data race:
w.cancelandw.doneare accessed without synchronization, so concurrent calls toStart()andStop()can race.🛡️ Suggested fix to prevent double-start
+import "sync" + type SettingsWatcher struct { client *client.Client service string interval time.Duration postgres *tmpostgres.Manager logger *logcompat.Logger + mu sync.Mutex cancel context.CancelFunc done chan struct{} } func (w *SettingsWatcher) Start(ctx context.Context) { if w.postgres == nil { return // no-op: nothing to revalidate } + w.mu.Lock() + defer w.mu.Unlock() + + if w.cancel != nil { + return // already started + } + ctx, cancel := context.WithCancel(ctx) w.cancel = cancel w.done = make(chan struct{}) go w.loop(ctx) }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@commons/tenant-manager/watcher/settings_watcher.go` around lines 82 - 92, Start currently overwrites w.cancel and w.done and races with Stop; modify SettingsWatcher to guard Start/Stop with a mutex (e.g., add a sync.Mutex on the struct) and make Start a no-op if the watcher is already running: inside Start, lock the mutex, check a running flag or that w.done != nil, return if already started, otherwise create ctx, set w.cancel and w.done, then spawn go w.loop(ctx) and unlock; update Stop to lock the same mutex, use w.cancel() if non-nil, wait for the goroutine via the w.done channel (and set w.done and w.cancel to nil after waiting), and unlock—this ensures no double-start, no orphaned goroutines, and no data races on w.cancel/w.done (refer to SettingsWatcher.Start, SettingsWatcher.Stop, w.cancel, w.done, and loop).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@commons/tenant-manager/watcher/settings_watcher.go`:
- Around line 94-103: Stop() must use the watcher mutex and clear internal state
so the watcher can be restarted: acquire w.mu, capture local references to
w.cancel and w.done (or check nil), call cancel() if non-nil, then release the
lock before waiting on the done channel to avoid deadlock, after the goroutine
finishes re-acquire w.mu and set w.cancel = nil and w.done = nil, and finally
unlock; update the SettingsWatcher.Stop method to follow this sequence (use the
existing mutex field name and w.cancel/w.done identifiers).
---
Duplicate comments:
In `@commons/tenant-manager/watcher/settings_watcher_test.go`:
- Around line 181-219: The test TestSettingsWatcher_HandlesAPIErrors never
triggers the HTTP 500 path because pgManager has no tenant connections; update
the test to ensure revalidate() iterates tenants by either injecting a tenant
connection into pgManager (use tmpostgres.NewManager(...).CreateConnection /
AddConnection equivalent) or by mocking/overriding
pgManager.ConnectedTenantIDs() to return a non-empty slice (e.g., a single
tenant ID) before calling w.revalidate(ctx), so the HTTP handler for
"/connections/" is invoked and the watcher’s error handling is exercised.
- Around line 76-133: Rename the test function
TestSettingsWatcher_AppliesSettings to reflect its actual behavior (e.g.,
TestSettingsWatcher_NoAPICallsWhenNoConnections) and update any references; the
test is asserting that no API calls occur when the postgres manager has no
connections, so change the test name and any related comments to match that
semantic (locate the test function TestSettingsWatcher_AppliesSettings in
settings_watcher_test.go and update its identifier and accompanying comment
accordingly).
- Around line 224-237: The test currently sets up an httptest.Server but never
calls it because handleSuspendedTenant() is invoked directly; either remove the
unused server setup or change the test to exercise the HTTP path: call
revalidate() (or the watcher method that triggers tenant API validation) with a
tenant connection that points to the test server URL so the handler runs and
returns the 403 TS-SUSPENDED response; update the test to assert the suspended
behavior after invoking revalidate() (or otherwise inject the server URL into
the code path that performs the API call) and remove direct calls to
handleSuspendedTenant() to ensure the handler is exercised.
In `@commons/tenant-manager/watcher/settings_watcher.go`:
- Around line 82-92: Start currently overwrites w.cancel and w.done and races
with Stop; modify SettingsWatcher to guard Start/Stop with a mutex (e.g., add a
sync.Mutex on the struct) and make Start a no-op if the watcher is already
running: inside Start, lock the mutex, check a running flag or that w.done !=
nil, return if already started, otherwise create ctx, set w.cancel and w.done,
then spawn go w.loop(ctx) and unlock; update Stop to lock the same mutex, use
w.cancel() if non-nil, wait for the goroutine via the w.done channel (and set
w.done and w.cancel to nil after waiting), and unlock—this ensures no
double-start, no orphaned goroutines, and no data races on w.cancel/w.done
(refer to SettingsWatcher.Start, SettingsWatcher.Stop, w.cancel, w.done, and
loop).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 87934d2a-eda0-4cf7-94a3-a3464f3c0530
📒 Files selected for processing (2)
commons/tenant-manager/watcher/settings_watcher.gocommons/tenant-manager/watcher/settings_watcher_test.go
| // Stop cancels the background goroutine and waits for it to finish. | ||
| func (w *SettingsWatcher) Stop() { | ||
| if w.cancel != nil { | ||
| w.cancel() | ||
| } | ||
|
|
||
| if w.done != nil { | ||
| <-w.done | ||
| } | ||
| } |
There was a problem hiding this comment.
Stop() should also use mutex protection and reset state.
To pair with the fix for Start(), Stop() should acquire the mutex and reset w.cancel and w.done to nil after the goroutine finishes. This ensures the watcher can be restarted after being stopped and prevents races.
🛡️ Suggested fix for Stop()
func (w *SettingsWatcher) Stop() {
+ w.mu.Lock()
+ cancel := w.cancel
+ done := w.done
+ w.mu.Unlock()
+
- if w.cancel != nil {
- w.cancel()
+ if cancel != nil {
+ cancel()
}
- if w.done != nil {
- <-w.done
+ if done != nil {
+ <-done
}
+
+ w.mu.Lock()
+ w.cancel = nil
+ w.done = nil
+ w.mu.Unlock()
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@commons/tenant-manager/watcher/settings_watcher.go` around lines 94 - 103,
Stop() must use the watcher mutex and clear internal state so the watcher can be
restarted: acquire w.mu, capture local references to w.cancel and w.done (or
check nil), call cancel() if non-nil, then release the lock before waiting on
the done channel to avoid deadlock, after the goroutine finishes re-acquire w.mu
and set w.cancel = nil and w.done = nil, and finally unlock; update the
SettingsWatcher.Stop method to follow this sequence (use the existing mutex
field name and w.cancel/w.done identifiers).
Instantiate SettingsWatcher in multi-tenant bootstrap with both PG managers (onboarding + transaction). Runs as a Runnable alongside the multi-tenant consumer, stopped on graceful shutdown. Depends on: LerianStudio/lib-commons#395 X-Lerian-Ref: 0x1
… on start - Change postgres field to managers slice — WithPostgresManager appends instead of overwriting. Fixes silent loss of onboarding PG manager when two managers are registered. - collectTenantIDs unions and deduplicates across all managers. - Run one revalidation immediately on Start before entering ticker loop, so settings changed while the service was down are applied without waiting 30s. - Add WithTestConnections option for external test packages. X-Lerian-Ref: 0x1
gandalf-at-lerian
left a comment
There was a problem hiding this comment.
Review Summary
Architecture is solid — decoupling settings revalidation from the RabbitMQ consumer is the right call. Services without RabbitMQ now get PostgreSQL pool settings revalidation, and the consumer stays focused on tenant discovery + messaging. Clean separation.
🔴 Blocker: Single PG Manager limitation
SettingsWatcher holds a single postgres *tmpostgres.Manager field. WithPostgresManager overwrites instead of appending:
func WithPostgresManager(p *tmpostgres.Manager) Option {
return func(w *SettingsWatcher) { w.postgres = p }
}Any consumer with multiple PG managers (e.g., Midaz has onboarding + transaction) silently loses the first one — only the last call wins. Pool settings for the overwritten manager are never revalidated.
Fix: Change to managers []*tmpostgres.Manager, WithPostgresManager appends, collectTenantIDs does a union across all managers, revalidate iterates all managers per tenant.
🟡 No immediate revalidation on Start
The loop uses a pure ticker — first revalidation only fires after the interval (30s default). If pool settings changed while the service was down, it runs stale for up to 30s. Consider running one revalidate() call before entering the ticker loop.
🟡 Watcher does not detect connection-level config changes
revalidate() calls ApplyConnectionSettings directly but does not call detectAndReconnectPostgres. If host/port/credentials changed (not just pool sizes), the watcher won't trigger reconnection. Only the inline revalidation in GetConnection handles that path. This may be intentional (watcher handles pool tuning, reconnection is left to hot-path), but it should be documented explicitly — services relying solely on the watcher (no RabbitMQ, low traffic) could miss credential rotations until the next request.
✅ What's good
- Functional options pattern composes well for other services
ConnectedTenantIDs()with RLock is clean- goleak in tests catches goroutine leaks
- Suspended/purged tenant handling with connection cleanup
- Package-level doc is clear
The single-manager issue is the only blocker. The rest are improvements worth considering before this becomes the standard pattern across services.
gandalf-at-lerian
left a comment
There was a problem hiding this comment.
Updating review — my initial diff was stale. The current code already addresses the multi-manager concern:
managers []*tmpostgres.Managerwith append semantics- Deduplicated tenant ID collection across all managers
- All managers iterated in revalidate + handleSuspendedTenant
- Immediate revalidation on Start before the ticker loop
Only remaining suggestion: consider documenting that the watcher handles pool tuning only — credential/host changes are handled by the inline revalidation path in GetConnection. This matters for services with low traffic that rely primarily on the watcher.
Clean implementation. LGTM. ✅
Summary
Moves connection settings revalidation to a standalone
SettingsWatchercomponent, completely independent of the RabbitMQ consumer. Services without RabbitMQ now get PostgreSQL pool settings revalidation.PostgreSQL only — MongoDB is excluded because the Go driver does not support pool resize after
mongo.Connect(). Mongo Manager continues to be initialized separately via middleware/bootstrap.Problem
revalidateConnectionSettingswas insideMultiTenantConsumer(RabbitMQ). If a service doesn't use RabbitMQ, it loses PostgreSQL pool settings revalidation entirely.Solution
New standalone component:
commons/tenant-manager/watcher.SettingsWatcherFiles changed
watcher/settings_watcher.gowatcher/settings_watcher_test.gopostgres/manager.goConnectedTenantIDs()consumer/multi_tenant_revalidate.gorevalidateConnectionSettingsconsumer/multi_tenant_sync.goconsumer/multi_tenant_test.goUsage
What about MongoDB?
Mongo Manager is initialized normally via middleware/bootstrap — it just doesn't participate in the SettingsWatcher because its pool settings are immutable after creation.
Test plan
go test ./commons/tenant-manager/... -count=1 -race— all 15 packages pass🤖 Generated with Claude Code