feat: event-driven tenant discovery in MultiTenantConsumer #397
jeffersonrodrigues92 merged 2 commits into develop
Conversation
Replace polling-based tenant discovery with Redis Pub/Sub + lazy-load. Consumer pods boot with an empty tenant map, subscribe to `tenant-events:*`, and lazy-load tenant context on first request. All subsequent state changes arrive exclusively via Redis Pub/Sub events. Polling is eliminated entirely.

New components:
- event/types.go: TenantLifecycleEvent envelope, 12 event type constants, 7 typed payload structs, ParseEvent() helper
- event/listener.go: TenantEventListener with Redis PSubscribe, goroutine lifecycle management, graceful shutdown
- consumer/multi_tenant_cache.go: TTL-aware tenant cache with lazy eviction, configurable TTL (default 12h), thread-safe operations
- consumer/multi_tenant_lazy.go: loadTenant() hook for on-demand tenant fetching from tenant-manager API with per-tenant mutex
- consumer/multi_tenant_events.go: Event-to-action dispatch with service filtering, jitter on pool rebuild, 12 handler methods

Breaking changes:
- NewMultiTenantConsumerWithError() now requires redisClient parameter
- Removed WithRedisClient() option (now constructor param)
- Deleted syncActiveTenants(), discoverTenants(), and all polling functions
- Deleted multi_tenant_sync.go, multi_tenant_revalidate.go

Closes #396
X-Lerian-Ref: 0x1
Walkthrough

The PR converts the multi-tenant consumer from eager, polling-driven startup to an event-driven, lazy-loading model.

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant Consumer as MultiTenantConsumer
    participant Cache as TenantCache
    participant API as Tenant-Manager API
    participant RabbitMQ
    Client->>Consumer: Request for tenant
    activate Consumer
    Consumer->>Cache: Get(tenantID)
    Cache-->>Consumer: Miss / Expired
    Note over Consumer: Acquire per-tenant lock
    Consumer->>API: GetTenantConfig(tenantID, service)
    API-->>Consumer: TenantConfig / Error
    alt Success
        Consumer->>Cache: Set(tenantID, config, TTL)
        Consumer->>Consumer: Mark known
        Consumer->>RabbitMQ: ensureConsumerStarted(tenantID)
        RabbitMQ-->>Consumer: Consumer started
        Consumer-->>Client: Success
    else Not found / Suspended / Error
        Consumer-->>Client: Error (no cache)
    end
    deactivate Consumer
```
```mermaid
sequenceDiagram
    participant Redis as Redis Pub/Sub
    participant Listener as TenantEventListener
    participant Consumer as MultiTenantConsumer
    participant Cache as TenantCache
    participant Handlers as Event Handlers
    participant Connections as Consumer Connections
    Redis->>Listener: Publish TenantLifecycleEvent
    activate Listener
    Listener->>Listener: ParseEvent(JSON)
    Listener->>Consumer: handleLifecycleEvent(event)
    deactivate Listener
    activate Consumer
    Consumer->>Consumer: Filter by service / tenant
    Consumer->>Handlers: dispatchEvent(eventType, payload)
    alt tenant.activated / tenant.updated
        Handlers->>Cache: Touch(tenantID)
        Cache->>Cache: Refresh TTL
    else tenant.suspended / tenant.deleted
        Handlers->>Cache: Delete(tenantID)
        Handlers->>Connections: closeTenantConnections(tenantID)
    else tenant.service.associated
        Handlers->>Cache: Set(tenantID, config, TTL)
        Handlers->>Consumer: ensureConsumerStarted(tenantID)
    else tenant.credentials.rotated
        Handlers->>Connections: closeTenantConnections(tenantID)
        Handlers->>Consumer: (lazy reload on next request)
    end
    Consumer-->>Consumer: Return
    deactivate Consumer
```
🚥 Pre-merge checks: ✅ Passed checks (4 passed)
Actionable comments posted: 10
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
commons/tenant-manager/consumer/multi_tenant.go (1)
343-353: ⚠️ Potential issue | 🔴 Critical

Stop the event listener outside `c.mu`.

`Close()` holds `c.mu` while calling `eventListener.Stop()`, and `Stop()` waits for the listener goroutine to exit. That goroutine dispatches handlers synchronously, and several handlers take `c.mu`, so `Close()` can deadlock against an in-flight lifecycle event. Snapshot the listener/cancel funcs under lock, unlock, then stop the listener and cancel tenants outside the mutex. As per coding guidelines, "Keep behavior nil-safe and concurrency-safe by default in exported APIs".

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `commons/tenant-manager/consumer/multi_tenant.go` around lines 343-353, Close() currently holds c.mu while calling c.eventListener.Stop(), which can deadlock because Stop() waits for a goroutine that may call handlers which reacquire c.mu; fix by snapshotting c.eventListener and any tenant cancel functions (or the slice/map of cancel funcs) into local variables while holding c.mu, set c.closed = true, then release the lock and call listener.Stop() and invoke cancels outside the mutex; ensure you nil-check c.eventListener before calling Stop() and clear/replace stored cancel references under the lock to avoid races.

commons/tenant-manager/consumer/multi_tenant_test.go (1)
361-408: 🧹 Nitpick | 🔵 Trivial

This test doesn't verify the propagation it claims.

The assertions only check `consumer.config.CacheTTL`, so a regression that drops `client.WithCacheTTL` would still pass. Please assert observable client behavior or expose the effective client cache TTL through a test seam.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@commons/tenant-manager/consumer/multi_tenant_test.go` around lines 361 - 408, The test TestMultiTenantConsumer_CacheTTLPropagation currently only checks MultiTenantConfig.CacheTTL and doesn't verify client.WithCacheTTL was applied; update the test to assert the effective TTL on the underlying client instead of only the config: after creating consumer via NewMultiTenantConsumerWithError, inspect consumer.pmClient (or add a small test seam accessor on the consumer to return the effective client cache TTL) and assert it equals tt.cacheTTL, or alternatively perform a short-lived request sequence against a mocked pmClient that demonstrates caching behavior consistent with tt.cacheTTL; reference consumer.pmClient, NewMultiTenantConsumerWithError, client.WithCacheTTL and MultiTenantConfig.CacheTTL when making the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In @.env.reference:
- Line 416: Update the documentation comment that currently says "Passed to
consumer via: WithRedisClient(redisClient)" to reflect that WithRedisClient was
removed and a required redisClient parameter is now passed directly to
NewMultiTenantConsumerWithError(redisClient, ...); locate the line in
.env.reference referencing WithRedisClient and replace the wording to indicate
the redisClient must be provided to NewMultiTenantConsumerWithError instead,
mentioning the exact constructor name NewMultiTenantConsumerWithError and the
redisClient parameter.
In `@commons/tenant-manager/consumer/multi_tenant_events_handlers.go`:
- Around line 223-235: The handler currently applies every
ConnectionsUpdatedPayload to the local Postgres manager when c.postgres != nil;
gate this by checking payload.ServiceName (or payload.Service) equals
c.config.Service and payload.Module equals the Postgres module string (e.g.
"postgres") before calling buildConfigFromConnectionsPayload and
c.postgres.ApplyConnectionSettings; if either check fails, return early. Use the
existing symbols ConnectionsUpdatedPayload, c.config.Service, payload.Module,
buildConfigFromConnectionsPayload and c.postgres.ApplyConnectionSettings to
locate and implement the guard.
- Around line 171-189: The handler currently calls applyJitter inline which
blocks the shared Pub/Sub dispatch goroutine; remove the direct applyJitter(ctx)
call from the event handler and instead perform jittered delay inside
tenant-specific async restart/reload logic (e.g., move the jitter into
ensureConsumerStarted or into a goroutine spawned by ensureConsumerStarted so
the delay happens off the dispatch thread). Update ensureConsumerStarted (or the
new per-tenant restart routine) to run applyJitter/ sleep before performing
restart/reload, keep the cache population (cache.Set) and knownTenants update in
the handler, and apply the same change to the other occurrences of applyJitter
in this file.
In `@commons/tenant-manager/consumer/multi_tenant_option_test.go`:
- Around line 1-11: Add the same copyright and license header comment present in
other test files (e.g., multi_tenant_cache_test.go) to the top of
multi_tenant_option_test.go so it matches project conventions; place the block
before the package declaration (package consumer) and ensure it includes the
exact copyright line and SPDX or license comment used across the repo.
In `@commons/tenant-manager/consumer/multi_tenant.go`:
- Around line 83-86: Validate TenantCacheTTL the same way CacheTTL is validated:
in the constructor that builds the tenant-manager consumer, check TenantCacheTTL
for negative values and return an error (rather than silently treating negatives
as default in resolveCacheTTL()); change the constructor to reject
TenantCacheTTL < 0 with a clear error message and keep resolveCacheTTL() only
for normalizing valid values, referencing TenantCacheTTL, CacheTTL and
resolveCacheTTL() to locate the code to modify.
In `@commons/tenant-manager/event/listener.go`:
- Around line 168-172: The call to the user-supplied EventHandler
(l.handler(ctx, *evt)) must be protected from panics so the listener goroutine
doesn't die; wrap the invocation in a small helper closure or inline anonymous
function that defers a recover() which, on panic, captures the panic value,
converts it to an error, logs it with logger (e.g., logger.ErrorfCtx or
logger.WarnfCtx) including evt.EventID/evt.EventType and the panic value, and
call libOpentelemetry.HandleSpanError(span, "tenant event handler panic", err)
so the span is marked; then proceed to call l.handler and handle its returned
error as before (logger.WarnfCtx + libOpentelemetry.HandleSpanError) so both
panics and normal errors are handled without crashing the listener.
- Around line 53-59: The constructor for the tenant event listener currently
only checks redisClient == nil which misses typed-nil interface values; add a
guard using core.IsNilInterface(redisClient) (after the existing nil check) and
return an error like the existing ones so typed-nil redis.UniversalClient values
are rejected; this prevents Start() from calling PSubscribe on a nil receiver
and aligns NewTenantEventListener with the consumer constructor's nil-safety
pattern.
- Around line 92-98: The Start() method currently calls
l.redisClient.PSubscribe(ctx, SubscriptionPattern) and immediately spawns go
l.listen(ctx, pubsub) and returns nil; change this to perform the subscription
handshake by calling pubsub.Receive(ctx) (or ReceiveWithContext) after
PSubscribe and before starting the goroutine, check and return any error from
Receive (and close pubsub on failure), and only call go l.listen(ctx, pubsub)
and log success after Receive confirms the subscription; ensure you reference
PSubscribe, pubsub, Receive, listen, SubscriptionPattern and Start() in the
updated flow so failures are returned instead of swallowed.
In `@commons/tenant-manager/event/types.go`:
- Around line 5-9: Update the package documentation comment in package event to
accurately reflect the transport used: replace the phrase "Valkey Streams-based
event-driven tenant discovery" with wording that references Pub/Sub (e.g.,
"Valkey/Redis Pub/Sub (PSUBSCRIBE)-based event-driven tenant discovery" or
"Valkey Pub/Sub-based") so the package doc matches the implementation.
In `@commons/tenant-manager/watcher/settings_watcher_test.go`:
- Around line 417-465: TestSettingsWatcher_StartWorks duplicates the behavior
already covered by TestSettingsWatcher_ImmediateRevalidationOnStart; remove or
merge the redundant test to avoid overlap. Locate the
TestSettingsWatcher_StartWorks function (which constructs NewSettingsWatcher,
calls Start/Stop and asserts apiCalled was >=1) and either delete it or fold any
unique assertions into TestSettingsWatcher_ImmediateRevalidationOnStart so only
one test verifies the immediate revalidation on Start; ensure the remaining test
still sets up tmClient, NewSettingsWatcher (and
WithInterval/WithPostgresManager/WithLogger), calls Start, sleeps briefly, calls
Stop, and asserts apiCalled.Load() >= 1.
---
Outside diff comments:
In `@commons/tenant-manager/consumer/multi_tenant_test.go`:
- Around line 361-408: The test TestMultiTenantConsumer_CacheTTLPropagation
currently only checks MultiTenantConfig.CacheTTL and doesn't verify
client.WithCacheTTL was applied; update the test to assert the effective TTL on
the underlying client instead of only the config: after creating consumer via
NewMultiTenantConsumerWithError, inspect consumer.pmClient (or add a small test
seam accessor on the consumer to return the effective client cache TTL) and
assert it equals tt.cacheTTL, or alternatively perform a short-lived request
sequence against a mocked pmClient that demonstrates caching behavior consistent
with tt.cacheTTL; reference consumer.pmClient, NewMultiTenantConsumerWithError,
client.WithCacheTTL and MultiTenantConfig.CacheTTL when making the change.
In `@commons/tenant-manager/consumer/multi_tenant.go`:
- Around line 343-353: Close() currently holds c.mu while calling
c.eventListener.Stop(), which can deadlock because Stop() waits for a goroutine
that may call handlers which reacquire c.mu; fix by snapshotting c.eventListener
and any tenant cancel functions (or the slice/map of cancel funcs) into local
variables while holding c.mu, set c.closed = true, then release the lock and
call listener.Stop() and invoke cancels outside the mutex; ensure you nil-check
c.eventListener before calling Stop() and clear/replace stored cancel references
under the lock to avoid races.
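The lock-snapshot pattern the Close() comment asks for can be sketched like this. `fakeListener` stands in for the real event listener (its `Stop()` waits for an in-flight handler, the property that caused the deadlock); field names follow the review's description but are illustrative.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeListener imitates a listener whose Stop() blocks until an
// in-flight handler goroutine finishes.
type fakeListener struct {
	inflight sync.WaitGroup
}

func (l *fakeListener) Stop() { l.inflight.Wait() }

type consumer struct {
	mu            sync.Mutex
	closed        bool
	eventListener *fakeListener
	cancels       map[string]func()
}

// Close snapshots the listener and cancel funcs under c.mu, then
// releases the lock BEFORE calling Stop(), so a handler that needs
// c.mu can finish and Stop() can return.
func (c *consumer) Close() {
	c.mu.Lock()
	if c.closed {
		c.mu.Unlock()
		return
	}
	c.closed = true
	listener := c.eventListener
	c.eventListener = nil
	cancels := c.cancels
	c.cancels = nil
	c.mu.Unlock()

	if listener != nil {
		listener.Stop()
	}
	for _, cancel := range cancels {
		cancel()
	}
}

func main() {
	c := &consumer{eventListener: &fakeListener{}, cancels: map[string]func(){}}
	cancelled := false
	c.cancels["tenant-a"] = func() { cancelled = true }

	// Simulate an in-flight handler that needs c.mu while Stop() waits on it.
	l := c.eventListener
	l.inflight.Add(1)
	go func() {
		defer l.inflight.Done()
		c.mu.Lock() // would deadlock if Close() still held the mutex across Stop()
		c.mu.Unlock()
	}()

	c.Close() // returns: the handler acquires c.mu and unblocks Stop()
	fmt.Println(c.closed, cancelled)
}
```

Nil-ing `c.eventListener` and `c.cancels` under the lock also makes a second `Close()` call a cheap no-op, as the review's AI prompt suggests.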
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 45c41ec9-a40a-4062-9b6b-4c8aa94c7a19
📒 Files selected for processing (25)
- .env.reference
- commons/tenant-manager/consumer/goroutine_leak_test.go
- commons/tenant-manager/consumer/multi_tenant.go
- commons/tenant-manager/consumer/multi_tenant_cache.go
- commons/tenant-manager/consumer/multi_tenant_cache_test.go
- commons/tenant-manager/consumer/multi_tenant_consume.go
- commons/tenant-manager/consumer/multi_tenant_ensure_test.go
- commons/tenant-manager/consumer/multi_tenant_events.go
- commons/tenant-manager/consumer/multi_tenant_events_handlers.go
- commons/tenant-manager/consumer/multi_tenant_events_test.go
- commons/tenant-manager/consumer/multi_tenant_lazy.go
- commons/tenant-manager/consumer/multi_tenant_lazy_test.go
- commons/tenant-manager/consumer/multi_tenant_option_test.go
- commons/tenant-manager/consumer/multi_tenant_retry_test.go
- commons/tenant-manager/consumer/multi_tenant_revalidate.go
- commons/tenant-manager/consumer/multi_tenant_run_test.go
- commons/tenant-manager/consumer/multi_tenant_sync.go
- commons/tenant-manager/consumer/multi_tenant_sync_test.go
- commons/tenant-manager/consumer/multi_tenant_test.go
- commons/tenant-manager/event/listener.go
- commons/tenant-manager/event/listener_test.go
- commons/tenant-manager/event/payloads.go
- commons/tenant-manager/event/types.go
- commons/tenant-manager/event/types_test.go
- commons/tenant-manager/watcher/settings_watcher_test.go
💤 Files with no reviewable changes (3)
- commons/tenant-manager/consumer/multi_tenant_revalidate.go
- commons/tenant-manager/consumer/multi_tenant_sync_test.go
- commons/tenant-manager/consumer/multi_tenant_sync.go
```
# Redis/Valkey URL for event-driven tenant discovery via Pub/Sub.
# When set, the consumer subscribes to tenant-events:* and uses lazy-loading
# instead of polling. When unset, the consumer falls back to polling mode.
# Passed to consumer via: WithRedisClient(redisClient)
```
Documentation references removed functional option.
Line 416 states "Passed to consumer via: WithRedisClient(redisClient)" but according to the PR summary, WithRedisClient() was removed and replaced with a required redisClient parameter in NewMultiTenantConsumerWithError(). The documentation should reflect the new constructor signature.
📝 Suggested fix

```diff
-# Passed to consumer via: WithRedisClient(redisClient)
+# Passed to consumer via: NewMultiTenantConsumerWithError(..., redisClient, ...)
```

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```
# Passed to consumer via: NewMultiTenantConsumerWithError(..., redisClient, ...)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In @.env.reference at line 416, Update the documentation comment that currently
says "Passed to consumer via: WithRedisClient(redisClient)" to reflect that
WithRedisClient was removed and a required redisClient parameter is now passed
directly to NewMultiTenantConsumerWithError(redisClient, ...); locate the line
in .env.reference referencing WithRedisClient and replace the wording to
indicate the redisClient must be provided to NewMultiTenantConsumerWithError
instead, mentioning the exact constructor name NewMultiTenantConsumerWithError
and the redisClient parameter.
```go
logger.InfofCtx(ctx, "tenant.service.reactivated: re-adding tenant=%s with jitter", evt.TenantID)

c.applyJitter(ctx)

// Build minimal TenantConfig from payload
config := &core.TenantConfig{
	ID:         evt.TenantID,
	TenantSlug: evt.TenantSlug,
	Service:    c.config.Service,
}

ttl := c.resolveCacheTTL()
c.cache.Set(evt.TenantID, config, ttl)

c.mu.Lock()
c.knownTenants[evt.TenantID] = true
c.mu.Unlock()

c.ensureConsumerStarted(ctx, evt.TenantID)
```
Don't sleep inside the shared Pub/Sub dispatch path.
applyJitter runs inline here. The listener processes messages serially on one goroutine, so every jitter delay stalls unrelated tenant events behind it. Move the delay into tenant-specific async restart/reload work instead of the listener thread.
Also applies to: 201-207
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@commons/tenant-manager/consumer/multi_tenant_events_handlers.go` around lines
171 - 189, The handler currently calls applyJitter inline which blocks the
shared Pub/Sub dispatch goroutine; remove the direct applyJitter(ctx) call from
the event handler and instead perform jittered delay inside tenant-specific
async restart/reload logic (e.g., move the jitter into ensureConsumerStarted or
into a goroutine spawned by ensureConsumerStarted so the delay happens off the
dispatch thread). Update ensureConsumerStarted (or the new per-tenant restart
routine) to run applyJitter/ sleep before performing restart/reload, keep the
cache population (cache.Set) and knownTenants update in the handler, and apply
the same change to the other occurrences of applyJitter in this file.
```go
var payload event.ConnectionsUpdatedPayload
if err := json.Unmarshal(evt.Payload, &payload); err != nil {
	return fmt.Errorf("handleConnectionsUpdated: unmarshal payload: %w", err)
}

logger.InfofCtx(ctx, "tenant.connections.updated: tenant=%s module=%s max_open=%d max_idle=%d",
	evt.TenantID, payload.Module, payload.MaxOpenConns, payload.MaxIdleConns)

if c.postgres != nil {
	// Build a TenantConfig with the updated settings for ApplyConnectionSettings
	config := buildConfigFromConnectionsPayload(evt.TenantID, payload)
	c.postgres.ApplyConnectionSettings(evt.TenantID, config)
}
```
Gate connection updates by service_name and module.
ConnectionsUpdatedPayload includes both fields, but this handler applies every event to the local Postgres manager whenever c.postgres != nil. A Mongo update or another service's Postgres update will retune the wrong pools. Return early unless the payload matches c.config.Service and the Postgres module.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@commons/tenant-manager/consumer/multi_tenant_events_handlers.go` around lines
223 - 235, The handler currently applies every ConnectionsUpdatedPayload to the
local Postgres manager when c.postgres != nil; gate this by checking
payload.ServiceName (or payload.Service) equals c.config.Service and
payload.Module equals the Postgres module string (e.g. "postgres") before
calling buildConfigFromConnectionsPayload and
c.postgres.ApplyConnectionSettings; if either check fails, return early. Use the
existing symbols ConnectionsUpdatedPayload, c.config.Service, payload.Module,
buildConfigFromConnectionsPayload and c.postgres.ApplyConnectionSettings to
locate and implement the guard.
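A minimal version of the requested guard, assuming the payload field names used in the comment (`ServiceName`, `Module`) and `"postgres"` as the module string:

```go
package main

import "fmt"

// ConnectionsUpdatedPayload fields follow the names used in the review
// comment; the real struct lives in event/payloads.go.
type ConnectionsUpdatedPayload struct {
	ServiceName  string
	Module       string
	MaxOpenConns int
	MaxIdleConns int
}

// shouldApplyPostgresSettings gates connection updates so only events
// for this service's Postgres module retune the local pools.
func shouldApplyPostgresSettings(p ConnectionsUpdatedPayload, service string) bool {
	return p.ServiceName == service && p.Module == "postgres"
}

func main() {
	svc := "ledger"
	fmt.Println(shouldApplyPostgresSettings(ConnectionsUpdatedPayload{ServiceName: "ledger", Module: "postgres"}, svc))  // true
	fmt.Println(shouldApplyPostgresSettings(ConnectionsUpdatedPayload{ServiceName: "ledger", Module: "mongodb"}, svc))   // false: wrong module
	fmt.Println(shouldApplyPostgresSettings(ConnectionsUpdatedPayload{ServiceName: "billing", Module: "postgres"}, svc)) // false: another service
}
```

The handler would call this before `buildConfigFromConnectionsPayload` and return early on a mismatch.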
```go
package consumer

import (
	"testing"

	"github.com/redis/go-redis/v9"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/LerianStudio/lib-commons/v4/commons/tenant-manager/internal/testutil"
)
```
🧹 Nitpick | 🔵 Trivial
Missing copyright header and license comment.
This test file is missing the copyright header that other files in this PR have (e.g., multi_tenant_cache_test.go lines 1-3). Consider adding for consistency.
📝 Suggested fix

```diff
+// Copyright (c) 2026 Lerian Studio. All rights reserved.
+// Use of this source code is governed by the Elastic License 2.0
+// that can be found in the LICENSE file.
+
 package consumer
```

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```go
// Copyright (c) 2026 Lerian Studio. All rights reserved.
// Use of this source code is governed by the Elastic License 2.0
// that can be found in the LICENSE file.

package consumer

import (
	"testing"

	"github.com/redis/go-redis/v9"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/LerianStudio/lib-commons/v4/commons/tenant-manager/internal/testutil"
)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@commons/tenant-manager/consumer/multi_tenant_option_test.go` around lines 1 -
11, Add the same copyright and license header comment present in other test
files (e.g., multi_tenant_cache_test.go) to the top of
multi_tenant_option_test.go so it matches project conventions; place the block
before the package declaration (package consumer) and ensure it includes the
exact copyright line and SPDX or license comment used across the repo.
```go
// TenantCacheTTL is the time-to-live for tenant entries in the local cache.
// When an entry expires, the next request triggers a fresh lazy-load from tenant-manager.
// Default: 12 hours.
TenantCacheTTL time.Duration
```
Validate TenantCacheTTL the same way as CacheTTL.
Negative values are accepted here and later treated as "use the default" by resolveCacheTTL(), which silently turns a bad config into a 12-hour cache. Reject < 0 in the constructor so misconfiguration fails fast.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@commons/tenant-manager/consumer/multi_tenant.go` around lines 83 - 86,
Validate TenantCacheTTL the same way CacheTTL is validated: in the constructor
that builds the tenant-manager consumer, check TenantCacheTTL for negative
values and return an error (rather than silently treating negatives as default
in resolveCacheTTL()); change the constructor to reject TenantCacheTTL < 0 with
a clear error message and keep resolveCacheTTL() only for normalizing valid
values, referencing TenantCacheTTL, CacheTTL and resolveCacheTTL() to locate the
code to modify.
```go
if redisClient == nil {
	return nil, errors.New("event.NewTenantEventListener: redisClient must not be nil")
}

if handler == nil {
	return nil, errors.New("event.NewTenantEventListener: handler must not be nil")
}
```
🧩 Analysis chain
🏁 Script executed:

```shell
# Find the listener.go file
find . -path "*commons/tenant-manager/event/listener.go" -type f
```

🏁 Script executed:

```shell
# List files in the commons/tenant-manager/event directory
ls -la commons/tenant-manager/event/ 2>/dev/null || find . -path "*commons/tenant-manager/event*" -type d
```

🏁 Script executed:

```shell
# Search for listener.go anywhere
fd -t f "listener.go" 2>/dev/null | grep -E "(tenant|event)" | head -20
```

🏁 Script executed:

```shell
# Read the listener.go file with line numbers
cat -n commons/tenant-manager/event/listener.go
```

🏁 Script executed:

```shell
# Search for core.IsNilInterface usage in the codebase
rg "IsNilInterface" --type go
```

🏁 Script executed:

```shell
# Search for the consumer constructor mentioned in the review
rg "consumer.*constructor\|NewTenantEventListener" --type go -B 5 -A 15
```

🏁 Script executed:

```shell
# Find the consumer constructor that uses IsNilInterface for redisClient
rg "func New.*Consumer" --type go -A 20 | grep -A 20 "redisClient"
```

🏁 Script executed:

```shell
# Look at the core.IsNilInterface function definition
cat -n commons/tenant-manager/core/errors.go | grep -A 15 "func IsNilInterface"
```

🏁 Script executed:

```shell
# Check consumer/multi_tenant.go to see how it uses IsNilInterface
cat -n commons/tenant-manager/consumer/multi_tenant.go | head -100
```

🏁 Script executed:

```shell
# Verify the current state - check if listener.go imports or uses core.IsNilInterface
grep -n "IsNilInterface\|import.*core" commons/tenant-manager/event/listener.go
```

🏁 Script executed:

```shell
# Double-check by comparing the full constructor signatures in both files
grep -A 30 "func NewTenantEventListener" commons/tenant-manager/event/listener.go | head -35
```

🏁 Script executed:

```shell
# Look for any test cases that might pass a typed-nil to the listener constructor
grep -n "typed-nil\|var.*redis.Client.*nil" commons/tenant-manager/event/listener_test.go
```
Add a `core.IsNilInterface` check to reject typed-nil Redis clients in this constructor.

`redisClient == nil` only catches direct nil values. It misses `var rc *redis.Client = nil` passed as a `redis.UniversalClient` interface (a typed-nil). The `Start()` method then calls `PSubscribe` on a nil receiver at line 92, panicking at runtime. The consumer constructor in `commons/tenant-manager/consumer/multi_tenant.go` guards against this exact case using `core.IsNilInterface(redisClient)` after the nil check. This exported constructor should follow the same pattern to comply with the guideline: "Keep behavior nil-safe and concurrency-safe by default in exported APIs".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@commons/tenant-manager/event/listener.go` around lines 53 - 59, The
constructor for the tenant event listener currently only checks redisClient ==
nil which misses typed-nil interface values; add a guard using
core.IsNilInterface(redisClient) (after the existing nil check) and return an
error like the existing ones so typed-nil redis.UniversalClient values are
rejected; this prevents Start() from calling PSubscribe on a nil receiver and
aligns NewTenantEventListener with the consumer constructor's nil-safety
pattern.
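A reflect-based check in the style of `core.IsNilInterface` can be sketched as below; this is an assumption about the helper's shape (the real one lives in `commons/tenant-manager/core/errors.go`), but it demonstrates the typed-nil trap the review describes:

```go
package main

import (
	"fmt"
	"reflect"
)

// isNilInterface reports whether v is nil OR wraps a typed-nil value
// (e.g. a (*redis.Client)(nil) stored in a redis.UniversalClient
// interface), which a plain `v == nil` check misses.
func isNilInterface(v any) bool {
	if v == nil {
		return true
	}
	rv := reflect.ValueOf(v)
	switch rv.Kind() {
	case reflect.Ptr, reflect.Map, reflect.Chan, reflect.Func, reflect.Slice:
		return rv.IsNil()
	}
	return false
}

type client struct{}

func main() {
	var typedNil *client
	var iface any = typedNil // non-nil interface holding a nil pointer

	fmt.Println(iface == nil)              // false: the classic typed-nil trap
	fmt.Println(isNilInterface(iface))     // true: caught by the reflect check
	fmt.Println(isNilInterface(&client{})) // false: a real value passes
}
```

An interface value is nil only when both its type word and value word are nil; storing a nil pointer sets the type word, which is why the equality check alone is insufficient.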
```go
pubsub := l.redisClient.PSubscribe(ctx, SubscriptionPattern)

logger.InfofCtx(ctx, "tenant event listener subscribed to pattern: %s", SubscriptionPattern)

go l.listen(ctx, pubsub)

return nil
```
🧩 Analysis chain
🏁 Script executed:

```shell
find . -name "listener.go" -path "*/event/*" | head -20
```

🏁 Script executed:

```shell
wc -l ./commons/tenant-manager/event/listener.go
```

🏁 Script executed:

```shell
cat -n ./commons/tenant-manager/event/listener.go
```

🏁 Script executed:

```shell
# Check Redis PubSub documentation and understand how go-redis PSubscribe works
# Let's also see if there are any tests for this listener
find . -name "*listener*test*" -o -name "*test*listener*" | grep -E "\.go$"
```

🏁 Script executed:

```shell
cat -n ./commons/tenant-manager/event/listener_test.go
```

🏁 Script executed:

```shell
# Check what SubscriptionPattern is and look for other event-related code
grep -n "SubscriptionPattern" ./commons/tenant-manager/event/listener.go
head -30 ./commons/tenant-manager/event/constants.go 2>/dev/null || echo "No constants.go found"
```

🏁 Script executed:

```shell
rg "SubscriptionPattern\s*=" ./commons/tenant-manager/event/
```

🏁 Script executed:

```shell
# Search for any use of Receive() on PubSub in the codebase to see if there's a pattern
rg "\.Receive\(" ./commons/tenant-manager/ -A 2 -B 2
```

🏁 Script executed:

```shell
# Let's check go-redis documentation behavior by looking at the listen method more carefully
# and see if there's any error signaling through the Channel
rg "pubsub\." ./commons/tenant-manager/event/listener.go -A 2
```

🏁 Script executed:

```shell
# Let's check the go-redis PubSub API and examples in the codebase
# to understand how subscription confirmation works
find . -path "*/vendor" -prune -o -type f -name "*.go" -exec grep -l "PSubscribe\|Subscribe" {} \; | head -10
```

🏁 Script executed:

```shell
# Let's check if there are any other redis subscription examples
rg "redis\.(Subscribe|PSubscribe)" ./commons/ -A 3 -B 1
```

🏁 Script executed:

```shell
# Let's look at the full context of what happens when pubsub is created
# and understand the error handling in listen() more carefully
sed -n '117,141p' ./commons/tenant-manager/event/listener.go
```

🏁 Script executed:

```shell
# Let's verify the go-redis PubSub behavior by checking if there's documentation
# or examples in go-redis. First, let's check what version is being used
rg "redis/go-redis" . --include="*.mod" -A 2
```

🏁 Script executed:

```shell
cat go.mod | grep redis
```
Block on the subscription handshake before returning from Start().
PSubscribe() returns a *PubSub immediately without verifying subscription succeeded. If Redis is unavailable, Start() logs success and returns nil even though the subscription will fail silently in the background goroutine. Use Receive() on the PubSub to wait for the initial subscription confirmation message before returning, and return an error if subscription setup fails. This follows the coding guideline: "Do not swallow errors; return or handle errors with context."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@commons/tenant-manager/event/listener.go` around lines 92 - 98, The Start()
method currently calls l.redisClient.PSubscribe(ctx, SubscriptionPattern) and
immediately spawns go l.listen(ctx, pubsub) and returns nil; change this to
perform the subscription handshake by calling pubsub.Receive(ctx) (or
ReceiveWithContext) after PSubscribe and before starting the goroutine, check
and return any error from Receive (and close pubsub on failure), and only call
go l.listen(ctx, pubsub) and log success after Receive confirms the
subscription; ensure you reference PSubscribe, pubsub, Receive, listen,
SubscriptionPattern and Start() in the updated flow so failures are returned
instead of swallowed.
if handlerErr := l.handler(ctx, *evt); handlerErr != nil {
	logger.WarnfCtx(ctx, "tenant event listener: handler error for event %s (type=%s): %v",
		evt.EventID, evt.EventType, handlerErr)
	libOpentelemetry.HandleSpanError(span, "tenant event handler error", handlerErr)
}
🧩 Analysis chain
🏁 Script executed:
cd commons/tenant-manager && find . -name "listener.go" -type f
🏁 Script executed:
cd commons/tenant-manager && wc -l event/listener.go
🏁 Script executed:
cd commons/tenant-manager && sed -n '150,185p' event/listener.go
🏁 Script executed:
cd commons/tenant-manager && sed -n '1,50p' event/listener.go
🏁 Script executed:
cd commons/tenant-manager && sed -n '70,150p' event/listener.go
🏁 Script executed:
cd commons/tenant-manager && sed -n '85,120p' event/listener.go
🏁 Script executed:
cd commons/tenant-manager && rg -n "go " event/listener.go
🏁 Script executed:
cd commons/tenant-manager && rg -n "func.*Start\|func.*Listen\|func.*Run" event/listener.go -A 10
🏁 Script executed:
cd commons/tenant-manager && sed -n '138,173p' event/listener.go
🏁 Script executed:
cd commons/tenant-manager && rg -n "recover\|defer func" event/listener.go
🏁 Script executed:
cd commons/tenant-manager && rg -n "handleMessage" event/ -B 2 -A 15
🏁 Script executed:
cd commons/tenant-manager && sed -n '122,141p' event/listener.go
Add panic recovery around the EventHandler invocation.
l.handler is user-supplied and runs on the listener goroutine. An unrecovered panic will crash the listener, causing the process to stop handling tenant events. Wrap the handler invocation with defer/recover(), log the panic, and keep the listener alive.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@commons/tenant-manager/event/listener.go` around lines 168 - 172, The call to
the user-supplied EventHandler (l.handler(ctx, *evt)) must be protected from
panics so the listener goroutine doesn't die; wrap the invocation in a small
helper closure or inline anonymous function that defers a recover() which, on
panic, captures the panic value, converts it to an error, logs it with logger
(e.g., logger.ErrorfCtx or logger.WarnfCtx) including evt.EventID/evt.EventType
and the panic value, and call libOpentelemetry.HandleSpanError(span, "tenant
event handler panic", err) so the span is marked; then proceed to call l.handler
and handle its returned error as before (logger.WarnfCtx +
libOpentelemetry.HandleSpanError) so both panics and normal errors are handled
without crashing the listener.
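The recovery wrapper the prompt describes boils down to a few lines. `safeInvoke` is an illustrative name, not the listener's actual helper; in the real code the recovered error would also be logged and passed to `libOpentelemetry.HandleSpanError`:

```go
package main

import (
	"fmt"
)

// safeInvoke wraps a user-supplied handler so a panic is converted to an
// error instead of killing the listener goroutine.
func safeInvoke(handler func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			// Capture the panic value and surface it as a normal error,
			// keeping the listener alive for subsequent events.
			err = fmt.Errorf("tenant event handler panic: %v", r)
		}
	}()
	return handler()
}

func main() {
	// A panicking handler no longer crashes the caller.
	fmt.Println(safeInvoke(func() error { panic("boom") }))
	// A normal error still flows through unchanged.
	fmt.Println(safeInvoke(func() error { return fmt.Errorf("handler error") }))
}
```

Both the recovered panic and an ordinary handler error end up on the same error path, so the existing `logger.WarnfCtx` + span-error handling covers both cases.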
// Package event defines the shared contract for tenant lifecycle events
// published by tenant-manager and consumed by downstream services.
// It provides the event envelope, typed payload structs, and channel helpers
// for Valkey Streams-based event-driven tenant discovery.
package event
Package doc mentions "Valkey Streams" but implementation uses Pub/Sub.
The package documentation (line 8) states "Valkey Streams-based event-driven tenant discovery" but the PR objectives and implementation use Redis/Valkey Pub/Sub (PSUBSCRIBE), not Streams. Consider updating the documentation for accuracy.
📝 Suggested fix
// Package event defines the shared contract for tenant lifecycle events
// published by tenant-manager and consumed by downstream services.
// It provides the event envelope, typed payload structs, and channel helpers
-// for Valkey Streams-based event-driven tenant discovery.
+// for Redis/Valkey Pub/Sub-based event-driven tenant discovery.
package event📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@commons/tenant-manager/event/types.go` around lines 5 - 9, Update the package
documentation comment in package event to accurately reflect the transport used:
replace the phrase "Valkey Streams-based event-driven tenant discovery" with
wording that references Pub/Sub (e.g., "Valkey/Redis Pub/Sub (PSUBSCRIBE)-based
event-driven tenant discovery" or "Valkey Pub/Sub-based") so the package doc
matches the implementation.
func TestSettingsWatcher_StartWorks(t *testing.T) {
	t.Parallel()

	apiCalled := atomic.Int32{}

	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		apiCalled.Add(1)
		w.Header().Set("Content-Type", "application/json")

		resp := map[string]any{
			"id":         "tenant-poll",
			"tenantSlug": "poll",
			"databases":  map[string]any{},
		}

		_ = json.NewEncoder(w).Encode(resp)
	}))
	defer server.Close()

	tmClient := newTestClient(t, server.URL)
	logger := testutil.NewCapturingLogger()

	pgManager := tmpostgres.NewManager(tmClient, "ledger",
		tmpostgres.WithModule("onboarding"),
		tmpostgres.WithLogger(logger),
		tmpostgres.WithTestConnections("tenant-poll"),
	)

	w := NewSettingsWatcher(tmClient, "ledger",
		WithPostgresManager(pgManager),
		WithLogger(logger),
		WithInterval(time.Second),
	)

	ctx := context.Background()
	ctx = libCommons.ContextWithLogger(ctx, logger)

	w.Start(ctx)

	// Give the immediate revalidation time to run.
	time.Sleep(100 * time.Millisecond)

	w.Stop()

	// The watcher should have called the API at least once
	// (immediate revalidation on start).
	assert.GreaterOrEqual(t, apiCalled.Load(), int32(1),
		"should call API for revalidation on start")
}
🧹 Nitpick | 🔵 Trivial
Consolidate duplicate Start/revalidation tests
Line 417 introduces coverage that substantially overlaps TestSettingsWatcher_ImmediateRevalidationOnStart (Line 479). Keeping both adds maintenance cost without meaningful extra signal.
♻️ Proposed simplification
-func TestSettingsWatcher_StartWorks(t *testing.T) {
- t.Parallel()
-
- apiCalled := atomic.Int32{}
-
- server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- apiCalled.Add(1)
- w.Header().Set("Content-Type", "application/json")
-
- resp := map[string]any{
- "id": "tenant-poll",
- "tenantSlug": "poll",
- "databases": map[string]any{},
- }
-
- _ = json.NewEncoder(w).Encode(resp)
- }))
- defer server.Close()
-
- tmClient := newTestClient(t, server.URL)
- logger := testutil.NewCapturingLogger()
-
- pgManager := tmpostgres.NewManager(tmClient, "ledger",
- tmpostgres.WithModule("onboarding"),
- tmpostgres.WithLogger(logger),
- tmpostgres.WithTestConnections("tenant-poll"),
- )
-
- w := NewSettingsWatcher(tmClient, "ledger",
- WithPostgresManager(pgManager),
- WithLogger(logger),
- WithInterval(time.Second),
- )
-
- ctx := context.Background()
- ctx = libCommons.ContextWithLogger(ctx, logger)
-
- w.Start(ctx)
-
- // Give the immediate revalidation time to run.
- time.Sleep(100 * time.Millisecond)
-
- w.Stop()
-
- // The watcher should have called the API at least once
- // (immediate revalidation on start).
- assert.GreaterOrEqual(t, apiCalled.Load(), int32(1),
- "should call API for revalidation on start")
-}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@commons/tenant-manager/watcher/settings_watcher_test.go` around lines 417 -
465, TestSettingsWatcher_StartWorks duplicates the behavior already covered by
TestSettingsWatcher_ImmediateRevalidationOnStart; remove or merge the redundant
test to avoid overlap. Locate the TestSettingsWatcher_StartWorks function (which
constructs NewSettingsWatcher, calls Start/Stop and asserts apiCalled was >=1)
and either delete it or fold any unique assertions into
TestSettingsWatcher_ImmediateRevalidationOnStart so only one test verifies the
immediate revalidation on Start; ensure the remaining test still sets up
tmClient, NewSettingsWatcher (and WithInterval/WithPostgresManager/WithLogger),
calls Start, sleeps briefly, calls Stop, and asserts apiCalled.Load() >= 1.
gandalf-at-lerian left a comment
PR Review: lib-commons #397
Summary
Major refactor of MultiTenantConsumer: replaces polling-based tenant discovery with Redis Pub/Sub events + lazy loading on first request. Adds TTL-based cache (12h default), per-tenant lazy-load locks, event listener, and 12 event handlers. Removes sync loop and dead code.
🔴 Critical: JSON field name mismatch with tenant-manager
Same issue as tenant-manager #224, other side of the wire.
This PR expects snake_case JSON tags (event_id, event_type, tenant_id, tenant_slug). tenant-manager #224 publishes camelCase (eventId, eventType, tenantId, tenantSlug).
ParseEvent will succeed (no unmarshal error) but all snake_case fields will be zero-value empty strings. Every event handler will receive events with empty TenantID — closeTenantConnections(""), cache.Get(""), cache.Touch(""), etc.
Fix: Align JSON tags with tenant-manager. Use camelCase to match.
🟡 Major: handleServiceAssociated creates incomplete TenantConfig
config := &core.TenantConfig{
	ID:            evt.TenantID,
	TenantSlug:    evt.TenantSlug,
	Service:       payload.ServiceName,
	IsolationMode: payload.IsolationMode,
}
c.cache.Set(evt.TenantID, config, ttl)
This config has no connection settings (SecretPaths, ConnectionSettings). When the consumer tries to create a Postgres/RabbitMQ connection from this cached config, it will have nil connection details. The lazy-load path (loadTenant) correctly fetches the full config via GetTenantConfig, but the event-driven path creates a skeleton.
Consider: should handleServiceAssociated do a lazy-load fetch instead of caching a partial config? Or ensure ensureConsumerStarted handles nil connection settings gracefully.
Same issue in handleServiceReactivated.
🟡 Major: SyncInterval and DiscoveryTimeout deprecated but not removed
These fields are marked Deprecated with "retained for backward config compatibility". This is fine for one release cycle, but they should be tracked for removal in v5. If v4 is the current major, add a TODO or issue reference.
🟢 Minor: tenantCache is solid
- Thread-safe with RWMutex
- Lazy eviction on Get (no background sweeper needed)
- Touch correctly extends TTL on events
- TenantIDs prunes expired entries on iteration
- IsExpired method unused currently but useful for diagnostics
🟢 Minor: Lazy-load serialization
lazyLoadLocks (sync.Map of per-tenant mutexes) prevents thundering herd on the same tenant. Double-check after lock acquisition (cache.Get recheck) is correct. Good pattern.
🟢 Minor: Event handler design
- tenant.created → no-op (lazy-load on first request) — correct per design.
- tenant.suspended/deleted/disassociated/purged → close connections — correct.
- credentials.rotated → close + jitter → lazy-reload on next request — correct.
- connections.updated → apply settings without full reconnect — efficient.
- Service-scoped events filter by service_name in payload — correct.
- Tenant-level events skip if tenant not in local cache — correct (except created).
🟢 Positive
- Dead code fully removed: multi_tenant_sync.go deleted (294 lines), sync loop gone.
- DefaultTenantCacheTTL = 12 * time.Hour — matches design exactly.
- Jitter on credentials rotation uses crypto/rand — not math/rand.
- Event listener has clean Start/Stop lifecycle with done channel.
- Comprehensive test files added for cache, events, lazy-load.
- Net: +3804 / -1864. Significant new test coverage.
Verdict: Request changes
JSON tag mismatch is a hard blocker (same as tenant-manager #224 — fix on one side or both). Incomplete TenantConfig in event handlers needs a decision: fetch full config or handle nil connection settings downstream.
…enantConfig
- Fix ServiceAssociatedPayload.Modules: map[string]ModuleConfig → []string
- Fix SecretPaths: map[string]string → map[string]map[string]string
- Fix MessagingEventConfig: nested RabbitMQ struct → flat RabbitMQSecretPath
- Delete unused ModuleConfig and RabbitMQEventConfig structs
- Populate ConnectionSettings, Databases, and Messaging in handleServiceAssociated
- Populate ConnectionSettings and Databases in handleServiceReactivated
- Update tests for new payload shapes and TenantConfig assertions
X-Lerian-Ref: 0x1
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@commons/tenant-manager/consumer/multi_tenant_events_test.go`:
- Around line 474-533: The table-driven test
TestHandleLifecycleEvent_ServiceEvent_FiltersMismatchedService is missing the
credentials.rotated case; add an entry to the tests slice with name "credentials
rotated different service", eventType event.EventTenantCredentialsRotated and
payload event.CredentialsRotatedPayload{ServiceName: "other-service",
CredentialType: "database"} (using mustMarshalPayload like the other cases) so
handleLifecycleEvent is exercised for mismatched-Service credentials.rotated
events and the tenant remains in the cache.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 8b322d73-5714-4258-990d-dc146ef5abe4
📒 Files selected for processing (4)
- commons/tenant-manager/consumer/multi_tenant_events_handlers.go
- commons/tenant-manager/consumer/multi_tenant_events_test.go
- commons/tenant-manager/event/payloads.go
- commons/tenant-manager/event/types_test.go
func TestHandleLifecycleEvent_ServiceEvent_FiltersMismatchedService(t *testing.T) {
	t.Parallel()

	tests := []struct {
		name      string
		eventType string
		payload   any
	}{
		{
			name:      "disassociated different service",
			eventType: event.EventTenantServiceDisassociated,
			payload:   event.ServiceDisassociatedPayload{ServiceName: "other-service"},
		},
		{
			name:      "suspended different service",
			eventType: event.EventTenantServiceSuspended,
			payload:   event.ServiceSuspendedPayload{ServiceName: "other-service"},
		},
		{
			name:      "purged different service",
			eventType: event.EventTenantServicePurged,
			payload:   event.ServicePurgedPayload{ServiceName: "other-service"},
		},
		{
			name:      "reactivated different service",
			eventType: event.EventTenantServiceReactivated,
			payload:   event.ServiceReactivatedPayload{ServiceName: "other-service"},
		},
		{
			name:      "connections updated different service",
			eventType: event.EventTenantConnectionsUpdated,
			payload:   event.ConnectionsUpdatedPayload{ServiceName: "other-service"},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			t.Parallel()

			consumer := newEventsTestConsumer(t)
			seedCacheTenant(consumer, "tenant-filter-001")
			ctx := context.Background()

			evt := event.TenantLifecycleEvent{
				EventID:   "evt-filter",
				EventType: tt.eventType,
				TenantID:  "tenant-filter-001",
				Payload:   mustMarshalPayload(t, tt.payload),
			}

			err := consumer.handleLifecycleEvent(ctx, evt)
			require.NoError(t, err, "mismatched service event should be skipped without error")

			// Verify tenant was NOT removed (event was for different service)
			entry, ok := consumer.cache.Get("tenant-filter-001")
			assert.True(t, ok, "tenant should still be in cache after mismatched service event")
			assert.NotNil(t, entry, "cache entry should not be nil")
		})
	}
}
🧹 Nitpick | 🔵 Trivial
Consider adding credentials.rotated to service filtering tests.
The table-driven test covers service filtering for disassociated, suspended, purged, reactivated, and connections.updated. However, CredentialsRotatedPayload also contains a ServiceName field (per payloads.go Line 61). If credentials.rotated events should be filtered by service, add a test case to verify the behavior when ServiceName doesn't match.
Suggested addition to test table
{
name: "credentials rotated different service",
eventType: event.EventTenantCredentialsRotated,
payload: event.CredentialsRotatedPayload{ServiceName: "other-service", CredentialType: "database"},
},
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@commons/tenant-manager/consumer/multi_tenant_events_test.go` around lines 474
- 533, The table-driven test
TestHandleLifecycleEvent_ServiceEvent_FiltersMismatchedService is missing the
credentials.rotated case; add an entry to the tests slice with name "credentials
rotated different service", eventType event.EventTenantCredentialsRotated and
payload event.CredentialsRotatedPayload{ServiceName: "other-service",
CredentialType: "database"} (using mustMarshalPayload like the other cases) so
handleLifecycleEvent is exercised for mismatched-Service credentials.rotated
events and the tenant remains in the cache.
…tenant-discovery" This reverts commit 9b53b5c.
Summary
tenant-events:*, and lazy-load on first request
What changed
New package: commons/tenant-manager/event/
- TenantLifecycleEvent envelope with 12 event type constants
- TenantEventListener — Redis Pub/Sub subscriber with graceful shutdown
- ParseEvent(), ChannelForTenant() helpers
New components in consumer/
- TTL-aware tenant cache (multi_tenant_cache.go) — 12h default, lazy eviction, thread-safe
- Lazy-load hook (multi_tenant_lazy.go) — on-demand tenant fetch with per-tenant mutex
- Event dispatch (multi_tenant_events.go + _handlers.go) — 12 handlers with service filtering and jitter
Breaking changes
- NewMultiTenantConsumerWithError() now requires redisClient parameter
- WithRedisClient() option removed (now constructor param)
- Removed syncActiveTenants(), discoverTenants(), and all polling functions
- Deleted multi_tenant_sync.go, multi_tenant_revalidate.go
Test plan
- Tests pass with -race
- No panic() in production code
Closes #396
🤖 Generated with Claude Code