
feat: event-driven tenant discovery in MultiTenantConsumer#397

Merged
jeffersonrodrigues92 merged 2 commits into develop from feat/event-driven-tenant-discovery
Mar 25, 2026
Conversation

@jeffersonrodrigues92
Contributor

Summary

  • Replace polling-based tenant discovery with Redis Pub/Sub + lazy-load
  • Consumer pods boot with empty tenant map, subscribe to tenant-events:*, and lazy-load on first request
  • All state changes arrive via Redis Pub/Sub events — polling eliminated entirely
  • 12 event types handled (tenant lifecycle + service association + credentials + connections)

What changed

New package: commons/tenant-manager/event/

  • TenantLifecycleEvent envelope with 12 event type constants
  • 7 typed payload structs for service-level events
  • TenantEventListener — Redis Pub/Sub subscriber with graceful shutdown
  • ParseEvent(), ChannelForTenant() helpers
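The envelope-plus-helpers shape described above can be sketched as follows. This is a simplified model, not the package's real definitions: the field names, JSON tags, and validation rules are assumptions for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// TenantLifecycleEvent sketches the event envelope; payloads stay raw here
// and would be decoded by the typed handlers per event type.
type TenantLifecycleEvent struct {
	EventID    string          `json:"event_id"`
	EventType  string          `json:"event_type"`
	TenantID   string          `json:"tenant_id"`
	TenantSlug string          `json:"tenant_slug"`
	OccurredAt time.Time       `json:"occurred_at"`
	Payload    json.RawMessage `json:"payload"`
}

// ParseEvent decodes a raw Pub/Sub message into the envelope and rejects
// messages that carry no event type.
func ParseEvent(data []byte) (*TenantLifecycleEvent, error) {
	var evt TenantLifecycleEvent
	if err := json.Unmarshal(data, &evt); err != nil {
		return nil, fmt.Errorf("event: parse: %w", err)
	}
	if evt.EventType == "" {
		return nil, fmt.Errorf("event: parse: missing event_type")
	}
	return &evt, nil
}

// ChannelForTenant returns the per-tenant channel matching the
// tenant-events:* subscription pattern.
func ChannelForTenant(tenantID string) string {
	return "tenant-events:" + tenantID
}

func main() {
	evt, err := ParseEvent([]byte(`{"event_type":"tenant.activated","tenant_id":"t-1"}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(evt.EventType, ChannelForTenant(evt.TenantID))
}
```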

New components in consumer/

  • TTL cache (multi_tenant_cache.go) — 12h default, lazy eviction, thread-safe
  • Lazy-load (multi_tenant_lazy.go) — on-demand tenant fetch with per-tenant mutex
  • Event dispatch (multi_tenant_events.go + _handlers.go) — 12 handlers with service filtering and jitter

Breaking changes

  • NewMultiTenantConsumerWithError() now requires redisClient parameter
  • WithRedisClient() option removed (now constructor param)
  • Deleted syncActiveTenants(), discoverTenants(), and all polling functions
  • Deleted multi_tenant_sync.go, multi_tenant_revalidate.go

Test plan

  • All 15 tenant-manager packages pass with -race
  • More than 60 new test cases covering event types, listener, cache, lazy-load, dispatch, and Run/Close
  • Lint: 0 issues
  • No panic() in production code
  • Integration test with real Redis Pub/Sub (future PR)

Closes #396

🤖 Generated with Claude Code

Replace polling-based tenant discovery with Redis Pub/Sub + lazy-load.
Consumer pods boot with an empty tenant map, subscribe to tenant-events:*,
and lazy-load tenant context on first request. All subsequent state changes
arrive exclusively via Redis Pub/Sub events. Polling is eliminated entirely.

New components:
- event/types.go: TenantLifecycleEvent envelope, 12 event type constants,
  7 typed payload structs, ParseEvent() helper
- event/listener.go: TenantEventListener with Redis PSubscribe, goroutine
  lifecycle management, graceful shutdown
- consumer/multi_tenant_cache.go: TTL-aware tenant cache with lazy eviction,
  configurable TTL (default 12h), thread-safe operations
- consumer/multi_tenant_lazy.go: loadTenant() hook for on-demand tenant
  fetching from tenant-manager API with per-tenant mutex
- consumer/multi_tenant_events.go: Event-to-action dispatch with service
  filtering, jitter on pool rebuild, 12 handler methods

Breaking changes:
- NewMultiTenantConsumerWithError() now requires redisClient parameter
- Removed WithRedisClient() option (now constructor param)
- Deleted syncActiveTenants(), discoverTenants(), and all polling functions
- Deleted multi_tenant_sync.go, multi_tenant_revalidate.go

Closes #396

X-Lerian-Ref: 0x1
@coderabbitai

coderabbitai bot commented Mar 25, 2026

Walkthrough

The PR converts the multi-tenant consumer from eager, polling-driven startup to an event-driven, lazy-loading model. A new event package and a Redis-backed TenantEventListener deliver tenant lifecycle events; the consumer now requires a Redis client at construction. Tenants are loaded on-demand via loadTenant and cached locally in a TTL-based tenantCache (default 12h / 43200s); stale entries are evicted on access. The previous discovery/sync loop and eager consumer spawning were removed (related config fields deprecated). Two environment variables were documented: MULTI_TENANT_REDIS_URL and MULTI_TENANT_CACHE_TTL_SEC.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Consumer as MultiTenantConsumer
    participant Cache as TenantCache
    participant API as Tenant-Manager API
    participant RabbitMQ

    Client->>Consumer: Request for tenant
    activate Consumer
    Consumer->>Cache: Get(tenantID)
    Cache-->>Consumer: Miss / Expired
    Note over Consumer: Acquire per-tenant lock
    Consumer->>API: GetTenantConfig(tenantID, service)
    API-->>Consumer: TenantConfig / Error
    alt Success
        Consumer->>Cache: Set(tenantID, config, TTL)
        Consumer->>Consumer: Mark known
        Consumer->>RabbitMQ: ensureConsumerStarted(tenantID)
        RabbitMQ-->>Consumer: Consumer started
        Consumer-->>Client: Success
    else Not found / Suspended / Error
        Consumer-->>Client: Error (no cache)
    end
    deactivate Consumer
sequenceDiagram
    participant Redis as Redis Pub/Sub
    participant Listener as TenantEventListener
    participant Consumer as MultiTenantConsumer
    participant Cache as TenantCache
    participant Handlers as Event Handlers
    participant Connections as Consumer Connections

    Redis->>Listener: Publish TenantLifecycleEvent
    activate Listener
    Listener->>Listener: ParseEvent(JSON)
    Listener->>Consumer: handleLifecycleEvent(event)
    deactivate Listener

    activate Consumer
    Consumer->>Consumer: Filter by service / tenant
    Consumer->>Handlers: dispatchEvent(eventType, payload)

    alt tenant.activated / tenant.updated
        Handlers->>Cache: Touch(tenantID)
        Cache->>Cache: Refresh TTL
    else tenant.suspended / tenant.deleted
        Handlers->>Cache: Delete(tenantID)
        Handlers->>Connections: closeTenantConnections(tenantID)
    else tenant.service.associated
        Handlers->>Cache: Set(tenantID, config, TTL)
        Handlers->>Consumer: ensureConsumerStarted(tenantID)
    else tenant.credentials.rotated
        Handlers->>Connections: closeTenantConnections(tenantID)
        Handlers->>Consumer: (lazy reload on next request)
    end
    Consumer-->>Consumer: Return
    deactivate Consumer
🚥 Pre-merge checks | ✅ 4
✅ Passed checks (4 passed)
  • Title check (✅ Passed): The title accurately summarizes the main change: replacing polling-based tenant discovery with an event-driven approach using Redis Pub/Sub and lazy-loading, which is the primary objective of this changeset.
  • Description check (✅ Passed): The PR description comprehensively covers the summary, changes, breaking changes, and test plan, though it does not follow the repository's template structure with checkboxes.
  • Linked Issues check (✅ Passed): The PR implements all major objectives from issue #396: event-driven tenant discovery with Redis Pub/Sub, a lazy-load mechanism, TTL-based caching, 12 event type handlers, service filtering, jitter on pool rebuilds, and removal of polling code.
  • Out of Scope Changes check (✅ Passed): All changes are aligned with the linked issue #396 objectives. New files, refactored components, and removals all support the transition from polling to event-driven architecture. No unrelated modifications detected.



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 10

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
commons/tenant-manager/consumer/multi_tenant.go (1)

343-353: ⚠️ Potential issue | 🔴 Critical

Stop the event listener outside c.mu.

Close() holds c.mu while calling eventListener.Stop(), and Stop() waits for the listener goroutine to exit. That goroutine dispatches handlers synchronously, and several handlers take c.mu, so Close() can deadlock against an in-flight lifecycle event. Snapshot the listener/cancel funcs under lock, unlock, then stop the listener and cancel tenants outside the mutex. As per coding guidelines, "Keep behavior nil-safe and concurrency-safe by default in exported APIs".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commons/tenant-manager/consumer/multi_tenant.go` around lines 343 - 353,
Close() currently holds c.mu while calling c.eventListener.Stop(), which can
deadlock because Stop() waits for a goroutine that may call handlers which
reacquire c.mu; fix by snapshotting c.eventListener and any tenant cancel
functions (or the slice/map of cancel funcs) into local variables while holding
c.mu, set c.closed = true, then release the lock and call listener.Stop() and
invoke cancels outside the mutex; ensure you nil-check c.eventListener before
calling Stop() and clear/replace stored cancel references under the lock to
avoid races.
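The snapshot-under-lock shape this finding asks for can be sketched as follows. Field names are illustrative; the point is that blocking work (stopping the listener, cancelling tenants) happens only after c.mu is released, so a handler that reacquires the mutex cannot deadlock Close().

```go
package main

import (
	"fmt"
	"sync"
)

type consumer struct {
	mu            sync.Mutex
	closed        bool
	stopListener  func() // stands in for eventListener.Stop()
	tenantCancels map[string]func()
}

// Close snapshots the listener and cancel funcs under the lock, clears the
// stored references, then does the blocking work outside the mutex.
func (c *consumer) Close() {
	c.mu.Lock()
	if c.closed {
		c.mu.Unlock()
		return
	}
	c.closed = true
	stop := c.stopListener
	cancels := make([]func(), 0, len(c.tenantCancels))
	for _, cancel := range c.tenantCancels {
		cancels = append(cancels, cancel)
	}
	c.tenantCancels = nil // clear stored refs under the lock to avoid races
	c.mu.Unlock()

	if stop != nil {
		stop() // may wait on a handler that takes c.mu; safe now
	}
	for _, cancel := range cancels {
		cancel()
	}
}

func main() {
	c := &consumer{tenantCancels: map[string]func(){}}
	c.stopListener = func() {
		// Simulates Stop() waiting on a handler that reacquires c.mu;
		// this would deadlock if Close() still held the lock here.
		c.mu.Lock()
		c.mu.Unlock()
	}
	c.Close()
	fmt.Println("closed without deadlock")
}
```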
commons/tenant-manager/consumer/multi_tenant_test.go (1)

361-408: 🧹 Nitpick | 🔵 Trivial

This test doesn't verify the propagation it claims.

The assertions only check consumer.config.CacheTTL, so a regression that drops client.WithCacheTTL would still pass. Please assert observable client behavior or expose the effective client cache TTL through a test seam.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commons/tenant-manager/consumer/multi_tenant_test.go` around lines 361 - 408,
The test TestMultiTenantConsumer_CacheTTLPropagation currently only checks
MultiTenantConfig.CacheTTL and doesn't verify client.WithCacheTTL was applied;
update the test to assert the effective TTL on the underlying client instead of
only the config: after creating consumer via NewMultiTenantConsumerWithError,
inspect consumer.pmClient (or add a small test seam accessor on the consumer to
return the effective client cache TTL) and assert it equals tt.cacheTTL, or
alternatively perform a short-lived request sequence against a mocked pmClient
that demonstrates caching behavior consistent with tt.cacheTTL; reference
consumer.pmClient, NewMultiTenantConsumerWithError, client.WithCacheTTL and
MultiTenantConfig.CacheTTL when making the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In @.env.reference:
- Line 416: Update the documentation comment that currently says "Passed to
consumer via: WithRedisClient(redisClient)" to reflect that WithRedisClient was
removed and a required redisClient parameter is now passed directly to
NewMultiTenantConsumerWithError(redisClient, ...); locate the line in
.env.reference referencing WithRedisClient and replace the wording to indicate
the redisClient must be provided to NewMultiTenantConsumerWithError instead,
mentioning the exact constructor name NewMultiTenantConsumerWithError and the
redisClient parameter.

In `@commons/tenant-manager/consumer/multi_tenant_events_handlers.go`:
- Around line 223-235: The handler currently applies every
ConnectionsUpdatedPayload to the local Postgres manager when c.postgres != nil;
gate this by checking payload.ServiceName (or payload.Service) equals
c.config.Service and payload.Module equals the Postgres module string (e.g.
"postgres") before calling buildConfigFromConnectionsPayload and
c.postgres.ApplyConnectionSettings; if either check fails, return early. Use the
existing symbols ConnectionsUpdatedPayload, c.config.Service, payload.Module,
buildConfigFromConnectionsPayload and c.postgres.ApplyConnectionSettings to
locate and implement the guard.
- Around line 171-189: The handler currently calls applyJitter inline which
blocks the shared Pub/Sub dispatch goroutine; remove the direct applyJitter(ctx)
call from the event handler and instead perform jittered delay inside
tenant-specific async restart/reload logic (e.g., move the jitter into
ensureConsumerStarted or into a goroutine spawned by ensureConsumerStarted so
the delay happens off the dispatch thread). Update ensureConsumerStarted (or the
new per-tenant restart routine) to run applyJitter/ sleep before performing
restart/reload, keep the cache population (cache.Set) and knownTenants update in
the handler, and apply the same change to the other occurrences of applyJitter
in this file.

In `@commons/tenant-manager/consumer/multi_tenant_option_test.go`:
- Around line 1-11: Add the same copyright and license header comment present in
other test files (e.g., multi_tenant_cache_test.go) to the top of
multi_tenant_option_test.go so it matches project conventions; place the block
before the package declaration (package consumer) and ensure it includes the
exact copyright line and SPDX or license comment used across the repo.

In `@commons/tenant-manager/consumer/multi_tenant.go`:
- Around line 83-86: Validate TenantCacheTTL the same way CacheTTL is validated:
in the constructor that builds the tenant-manager consumer, check TenantCacheTTL
for negative values and return an error (rather than silently treating negatives
as default in resolveCacheTTL()); change the constructor to reject
TenantCacheTTL < 0 with a clear error message and keep resolveCacheTTL() only
for normalizing valid values, referencing TenantCacheTTL, CacheTTL and
resolveCacheTTL() to locate the code to modify.

In `@commons/tenant-manager/event/listener.go`:
- Around line 168-172: The call to the user-supplied EventHandler
(l.handler(ctx, *evt)) must be protected from panics so the listener goroutine
doesn't die; wrap the invocation in a small helper closure or inline anonymous
function that defers a recover() which, on panic, captures the panic value,
converts it to an error, logs it with logger (e.g., logger.ErrorfCtx or
logger.WarnfCtx) including evt.EventID/evt.EventType and the panic value, and
call libOpentelemetry.HandleSpanError(span, "tenant event handler panic", err)
so the span is marked; then proceed to call l.handler and handle its returned
error as before (logger.WarnfCtx + libOpentelemetry.HandleSpanError) so both
panics and normal errors are handled without crashing the listener.
- Around line 53-59: The constructor for the tenant event listener currently
only checks redisClient == nil which misses typed-nil interface values; add a
guard using core.IsNilInterface(redisClient) (after the existing nil check) and
return an error like the existing ones so typed-nil redis.UniversalClient values
are rejected; this prevents Start() from calling PSubscribe on a nil receiver
and aligns NewTenantEventListener with the consumer constructor's nil-safety
pattern.
- Around line 92-98: The Start() method currently calls
l.redisClient.PSubscribe(ctx, SubscriptionPattern) and immediately spawns go
l.listen(ctx, pubsub) and returns nil; change this to perform the subscription
handshake by calling pubsub.Receive(ctx) (or ReceiveWithContext) after
PSubscribe and before starting the goroutine, check and return any error from
Receive (and close pubsub on failure), and only call go l.listen(ctx, pubsub)
and log success after Receive confirms the subscription; ensure you reference
PSubscribe, pubsub, Receive, listen, SubscriptionPattern and Start() in the
updated flow so failures are returned instead of swallowed.

In `@commons/tenant-manager/event/types.go`:
- Around line 5-9: Update the package documentation comment in package event to
accurately reflect the transport used: replace the phrase "Valkey Streams-based
event-driven tenant discovery" with wording that references Pub/Sub (e.g.,
"Valkey/Redis Pub/Sub (PSUBSCRIBE)-based event-driven tenant discovery" or
"Valkey Pub/Sub-based") so the package doc matches the implementation.

In `@commons/tenant-manager/watcher/settings_watcher_test.go`:
- Around line 417-465: TestSettingsWatcher_StartWorks duplicates the behavior
already covered by TestSettingsWatcher_ImmediateRevalidationOnStart; remove or
merge the redundant test to avoid overlap. Locate the
TestSettingsWatcher_StartWorks function (which constructs NewSettingsWatcher,
calls Start/Stop and asserts apiCalled was >=1) and either delete it or fold any
unique assertions into TestSettingsWatcher_ImmediateRevalidationOnStart so only
one test verifies the immediate revalidation on Start; ensure the remaining test
still sets up tmClient, NewSettingsWatcher (and
WithInterval/WithPostgresManager/WithLogger), calls Start, sleeps briefly, calls
Stop, and asserts apiCalled.Load() >= 1.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 45c41ec9-a40a-4062-9b6b-4c8aa94c7a19

📥 Commits

Reviewing files that changed from the base of the PR and between 8f323c2 and 751cbfe.

📒 Files selected for processing (25)
  • .env.reference
  • commons/tenant-manager/consumer/goroutine_leak_test.go
  • commons/tenant-manager/consumer/multi_tenant.go
  • commons/tenant-manager/consumer/multi_tenant_cache.go
  • commons/tenant-manager/consumer/multi_tenant_cache_test.go
  • commons/tenant-manager/consumer/multi_tenant_consume.go
  • commons/tenant-manager/consumer/multi_tenant_ensure_test.go
  • commons/tenant-manager/consumer/multi_tenant_events.go
  • commons/tenant-manager/consumer/multi_tenant_events_handlers.go
  • commons/tenant-manager/consumer/multi_tenant_events_test.go
  • commons/tenant-manager/consumer/multi_tenant_lazy.go
  • commons/tenant-manager/consumer/multi_tenant_lazy_test.go
  • commons/tenant-manager/consumer/multi_tenant_option_test.go
  • commons/tenant-manager/consumer/multi_tenant_retry_test.go
  • commons/tenant-manager/consumer/multi_tenant_revalidate.go
  • commons/tenant-manager/consumer/multi_tenant_run_test.go
  • commons/tenant-manager/consumer/multi_tenant_sync.go
  • commons/tenant-manager/consumer/multi_tenant_sync_test.go
  • commons/tenant-manager/consumer/multi_tenant_test.go
  • commons/tenant-manager/event/listener.go
  • commons/tenant-manager/event/listener_test.go
  • commons/tenant-manager/event/payloads.go
  • commons/tenant-manager/event/types.go
  • commons/tenant-manager/event/types_test.go
  • commons/tenant-manager/watcher/settings_watcher_test.go
💤 Files with no reviewable changes (3)
  • commons/tenant-manager/consumer/multi_tenant_revalidate.go
  • commons/tenant-manager/consumer/multi_tenant_sync_test.go
  • commons/tenant-manager/consumer/multi_tenant_sync.go

# Redis/Valkey URL for event-driven tenant discovery via Pub/Sub.
# When set, the consumer subscribes to tenant-events:* and uses lazy-loading
# instead of polling. When unset, the consumer falls back to polling mode.
# Passed to consumer via: WithRedisClient(redisClient)

⚠️ Potential issue | 🟡 Minor

Documentation references removed functional option.

Line 416 states "Passed to consumer via: WithRedisClient(redisClient)" but according to the PR summary, WithRedisClient() was removed and replaced with a required redisClient parameter in NewMultiTenantConsumerWithError(). The documentation should reflect the new constructor signature.

📝 Suggested fix
-# Passed to consumer via: WithRedisClient(redisClient)
+# Passed to consumer via: NewMultiTenantConsumerWithError(..., redisClient, ...)

Comment on lines +171 to +189
logger.InfofCtx(ctx, "tenant.service.reactivated: re-adding tenant=%s with jitter", evt.TenantID)

c.applyJitter(ctx)

// Build minimal TenantConfig from payload
config := &core.TenantConfig{
ID: evt.TenantID,
TenantSlug: evt.TenantSlug,
Service: c.config.Service,
}

ttl := c.resolveCacheTTL()
c.cache.Set(evt.TenantID, config, ttl)

c.mu.Lock()
c.knownTenants[evt.TenantID] = true
c.mu.Unlock()

c.ensureConsumerStarted(ctx, evt.TenantID)

⚠️ Potential issue | 🟠 Major

Don't sleep inside the shared Pub/Sub dispatch path.

applyJitter runs inline here. The listener processes messages serially on one goroutine, so every jitter delay stalls unrelated tenant events behind it. Move the delay into tenant-specific async restart/reload work instead of the listener thread.

Also applies to: 201-207


Comment on lines +223 to +235
var payload event.ConnectionsUpdatedPayload
if err := json.Unmarshal(evt.Payload, &payload); err != nil {
return fmt.Errorf("handleConnectionsUpdated: unmarshal payload: %w", err)
}

logger.InfofCtx(ctx, "tenant.connections.updated: tenant=%s module=%s max_open=%d max_idle=%d",
evt.TenantID, payload.Module, payload.MaxOpenConns, payload.MaxIdleConns)

if c.postgres != nil {
// Build a TenantConfig with the updated settings for ApplyConnectionSettings
config := buildConfigFromConnectionsPayload(evt.TenantID, payload)
c.postgres.ApplyConnectionSettings(evt.TenantID, config)
}

⚠️ Potential issue | 🟠 Major

Gate connection updates by service_name and module.

ConnectionsUpdatedPayload includes both fields, but this handler applies every event to the local Postgres manager whenever c.postgres != nil. A Mongo update or another service's Postgres update will retune the wrong pools. Return early unless the payload matches c.config.Service and the Postgres module.

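A sketch of the early-return guard, with the struct fields assumed from the review text rather than taken from the real event package:

```go
package main

import "fmt"

// ConnectionsUpdatedPayload mirrors the fields referenced in the review.
type ConnectionsUpdatedPayload struct {
	ServiceName  string
	Module       string
	MaxOpenConns int
	MaxIdleConns int
}

// shouldApplyPostgres implements the suggested gate: only events for this
// service's "postgres" module may retune the local pools.
func shouldApplyPostgres(p ConnectionsUpdatedPayload, localService string) bool {
	return p.ServiceName == localService && p.Module == "postgres"
}

func main() {
	local := "ledger"
	events := []ConnectionsUpdatedPayload{
		{ServiceName: "ledger", Module: "postgres"},  // applied
		{ServiceName: "ledger", Module: "mongodb"},   // skipped: wrong module
		{ServiceName: "billing", Module: "postgres"}, // skipped: other service
	}
	for _, e := range events {
		fmt.Println(e.ServiceName, e.Module, shouldApplyPostgres(e, local))
	}
}
```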

Comment on lines +1 to +11
package consumer

import (
"testing"

"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/LerianStudio/lib-commons/v4/commons/tenant-manager/internal/testutil"
)

🧹 Nitpick | 🔵 Trivial

Missing copyright header and license comment.

This test file is missing the copyright header that other files in this PR have (e.g., multi_tenant_cache_test.go lines 1-3). Consider adding for consistency.

📝 Suggested fix
+// Copyright (c) 2026 Lerian Studio. All rights reserved.
+// Use of this source code is governed by the Elastic License 2.0
+// that can be found in the LICENSE file.
+
 package consumer

Comment on lines +83 to +86
// TenantCacheTTL is the time-to-live for tenant entries in the local cache.
// When an entry expires, the next request triggers a fresh lazy-load from tenant-manager.
// Default: 12 hours.
TenantCacheTTL time.Duration

⚠️ Potential issue | 🟠 Major

Validate TenantCacheTTL the same way as CacheTTL.

Negative values are accepted here and later treated as "use the default" by resolveCacheTTL(), which silently turns a bad config into a 12-hour cache. Reject < 0 in the constructor so misconfiguration fails fast.


Comment on lines +53 to +59
if redisClient == nil {
return nil, errors.New("event.NewTenantEventListener: redisClient must not be nil")
}

if handler == nil {
return nil, errors.New("event.NewTenantEventListener: handler must not be nil")
}

⚠️ Potential issue | 🟠 Major

Add core.IsNilInterface check to reject typed-nil Redis clients in this constructor.

redisClient == nil only catches direct nil values. It misses var rc *redis.Client = nil passed as a redis.UniversalClient interface (a typed-nil). The Start() method then calls PSubscribe on a nil receiver at line 92, causing a nil-pointer panic. The consumer constructor in commons/tenant-manager/consumer/multi_tenant.go guards against this exact case using core.IsNilInterface(redisClient) after the nil check. This exported constructor should follow the same pattern to comply with the guideline: "Keep behavior nil-safe and concurrency-safe by default in exported APIs".
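
A minimal, self-contained illustration of the typed-nil gotcha described above. The `isNilInterface` helper below is a sketch of what a helper like `core.IsNilInterface` typically does, not the library's actual implementation:

```go
package main

import (
	"fmt"
	"reflect"
)

// isNilInterface reports whether v is nil or wraps a typed-nil value.
// A plain v == nil check misses the second case, because a non-nil
// interface header can still point at a nil concrete pointer.
func isNilInterface(v any) bool {
	if v == nil {
		return true
	}

	rv := reflect.ValueOf(v)
	switch rv.Kind() {
	case reflect.Ptr, reflect.Map, reflect.Chan, reflect.Func, reflect.Slice:
		return rv.IsNil()
	}

	return false
}

type client struct{}

func main() {
	var typedNil *client // nil pointer...
	var iface any = typedNil // ...wrapped in a non-nil interface value

	fmt.Println(iface == nil)          // false: the == nil check misses it
	fmt.Println(isNilInterface(iface)) // true: the reflect-based check catches it
}
```

Calling any method through `iface` would still dereference the nil pointer, which is why the constructor has to reject it up front.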

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commons/tenant-manager/event/listener.go` around lines 53 - 59, The
constructor for the tenant event listener currently only checks redisClient ==
nil which misses typed-nil interface values; add a guard using
core.IsNilInterface(redisClient) (after the existing nil check) and return an
error like the existing ones so typed-nil redis.UniversalClient values are
rejected; this prevents Start() from calling PSubscribe on a nil receiver and
aligns NewTenantEventListener with the consumer constructor's nil-safety
pattern.

Comment on lines +92 to +98
pubsub := l.redisClient.PSubscribe(ctx, SubscriptionPattern)

logger.InfofCtx(ctx, "tenant event listener subscribed to pattern: %s", SubscriptionPattern)

go l.listen(ctx, pubsub)

return nil

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

find . -name "listener.go" -path "*/event/*" | head -20

Repository: LerianStudio/lib-commons

Length of output: 110


🏁 Script executed:

wc -l ./commons/tenant-manager/event/listener.go

Repository: LerianStudio/lib-commons

Length of output: 114


🏁 Script executed:

cat -n ./commons/tenant-manager/event/listener.go

Repository: LerianStudio/lib-commons

Length of output: 6479


🏁 Script executed:

# Check Redis PubSub documentation and understand how go-redis PSubscribe works
# Let's also see if there are any tests for this listener
find . -name "*listener*test*" -o -name "*test*listener*" | grep -E "\.go$"

Repository: LerianStudio/lib-commons

Length of output: 115


🏁 Script executed:

cat -n ./commons/tenant-manager/event/listener_test.go

Repository: LerianStudio/lib-commons

Length of output: 13235


🏁 Script executed:

# Check what SubscriptionPattern is and look for other event-related code
grep -n "SubscriptionPattern" ./commons/tenant-manager/event/listener.go
head -30 ./commons/tenant-manager/event/constants.go 2>/dev/null || echo "No constants.go found"

Repository: LerianStudio/lib-commons

Length of output: 251


🏁 Script executed:

rg "SubscriptionPattern\s*=" ./commons/tenant-manager/event/

Repository: LerianStudio/lib-commons

Length of output: 148


🏁 Script executed:

# Search for any use of Receive() on PubSub in the codebase to see if there's a pattern
rg "\.Receive\(" ./commons/tenant-manager/ -A 2 -B 2

Repository: LerianStudio/lib-commons

Length of output: 50


🏁 Script executed:

# Let's check go-redis documentation behavior by looking at the listen method more carefully
# and see if there's any error signaling through the Channel
rg "pubsub\." ./commons/tenant-manager/event/listener.go -A 2

Repository: LerianStudio/lib-commons

Length of output: 231


🏁 Script executed:

# Let's check the go-redis PubSub API and examples in the codebase
# to understand how subscription confirmation works
find . -path "*/vendor" -prune -o -type f -name "*.go" -exec grep -l "PSubscribe\|Subscribe" {} \; | head -10

Repository: LerianStudio/lib-commons

Length of output: 1006


🏁 Script executed:

# Let's check if there are any other redis subscription examples
rg "redis\.(Subscribe|PSubscribe)" ./commons/ -A 3 -B 1

Repository: LerianStudio/lib-commons

Length of output: 50


🏁 Script executed:

# Let's look at the full context of what happens when pubsub is created
# and understand the error handling in listen() more carefully
sed -n '117,141p' ./commons/tenant-manager/event/listener.go

Repository: LerianStudio/lib-commons

Length of output: 638


🏁 Script executed:

# Let's verify the go-redis PubSub behavior by checking if there's documentation
# or examples in go-redis. First, let's check what version is being used
rg "redis/go-redis" . -g "*.mod" -A 2

Repository: LerianStudio/lib-commons

Length of output: 509


🏁 Script executed:

cat go.mod | grep redis

Repository: LerianStudio/lib-commons

Length of output: 214


Block on the subscription handshake before returning from Start().

PSubscribe() returns a *PubSub immediately without verifying subscription succeeded. If Redis is unavailable, Start() logs success and returns nil even though the subscription will fail silently in the background goroutine. Use Receive() on the PubSub to wait for the initial subscription confirmation message before returning, and return an error if subscription setup fails. This follows the coding guideline: "Do not swallow errors; return or handle errors with context."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commons/tenant-manager/event/listener.go` around lines 92 - 98, The Start()
method currently calls l.redisClient.PSubscribe(ctx, SubscriptionPattern) and
immediately spawns go l.listen(ctx, pubsub) and returns nil; change this to
perform the subscription handshake by calling pubsub.Receive(ctx) (or
ReceiveWithContext) after PSubscribe and before starting the goroutine, check
and return any error from Receive (and close pubsub on failure), and only call
go l.listen(ctx, pubsub) and log success after Receive confirms the
subscription; ensure you reference PSubscribe, pubsub, Receive, listen,
SubscriptionPattern and Start() in the updated flow so failures are returned
instead of swallowed.

Comment on lines +168 to +172
if handlerErr := l.handler(ctx, *evt); handlerErr != nil {
logger.WarnfCtx(ctx, "tenant event listener: handler error for event %s (type=%s): %v",
evt.EventID, evt.EventType, handlerErr)
libOpentelemetry.HandleSpanError(span, "tenant event handler error", handlerErr)
}

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

cd commons/tenant-manager && find . -name "listener.go" -type f

Repository: LerianStudio/lib-commons

Length of output: 87


🏁 Script executed:

cd commons/tenant-manager && wc -l event/listener.go

Repository: LerianStudio/lib-commons

Length of output: 89


🏁 Script executed:

cd commons/tenant-manager && sed -n '150,185p' event/listener.go

Repository: LerianStudio/lib-commons

Length of output: 876


🏁 Script executed:

cd commons/tenant-manager && sed -n '1,50p' event/listener.go

Repository: LerianStudio/lib-commons

Length of output: 1850


🏁 Script executed:

cd commons/tenant-manager && sed -n '70,150p' event/listener.go

Repository: LerianStudio/lib-commons

Length of output: 2256


🏁 Script executed:

cd commons/tenant-manager && sed -n '85,120p' event/listener.go

Repository: LerianStudio/lib-commons

Length of output: 992


🏁 Script executed:

cd commons/tenant-manager && rg -n "go " event/listener.go

Repository: LerianStudio/lib-commons

Length of output: 96


🏁 Script executed:

cd commons/tenant-manager && rg -n "func.*Start\|func.*Listen\|func.*Run" event/listener.go -A 10

Repository: LerianStudio/lib-commons

Length of output: 50


🏁 Script executed:

cd commons/tenant-manager && sed -n '138,173p' event/listener.go

Repository: LerianStudio/lib-commons

Length of output: 1284


🏁 Script executed:

cd commons/tenant-manager && rg -n "recover\|defer func" event/listener.go

Repository: LerianStudio/lib-commons

Length of output: 50


🏁 Script executed:

cd commons/tenant-manager && rg -n "handleMessage" event/ -B 2 -A 15

Repository: LerianStudio/lib-commons

Length of output: 1404


🏁 Script executed:

cd commons/tenant-manager && sed -n '122,141p' event/listener.go

Repository: LerianStudio/lib-commons

Length of output: 394


Add panic recovery around the EventHandler invocation.

l.handler is user-supplied and runs on the listener goroutine. An unrecovered panic will crash the listener, causing the process to stop handling tenant events. Wrap the handler invocation with defer/recover(), log the panic, and keep the listener alive.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commons/tenant-manager/event/listener.go` around lines 168 - 172, The call to
the user-supplied EventHandler (l.handler(ctx, *evt)) must be protected from
panics so the listener goroutine doesn't die; wrap the invocation in a small
helper closure or inline anonymous function that defers a recover() which, on
panic, captures the panic value, converts it to an error, logs it with logger
(e.g., logger.ErrorfCtx or logger.WarnfCtx) including evt.EventID/evt.EventType
and the panic value, and call libOpentelemetry.HandleSpanError(span, "tenant
event handler panic", err) so the span is marked; then proceed to call l.handler
and handle its returned error as before (logger.WarnfCtx +
libOpentelemetry.HandleSpanError) so both panics and normal errors are handled
without crashing the listener.

Comment on lines +5 to +9
// Package event defines the shared contract for tenant lifecycle events
// published by tenant-manager and consumed by downstream services.
// It provides the event envelope, typed payload structs, and channel helpers
// for Valkey Streams-based event-driven tenant discovery.
package event

⚠️ Potential issue | 🟡 Minor

Package doc mentions "Valkey Streams" but implementation uses Pub/Sub.

The package documentation (line 8) states "Valkey Streams-based event-driven tenant discovery" but the PR objectives and implementation use Redis/Valkey Pub/Sub (PSUBSCRIBE), not Streams. Consider updating the documentation for accuracy.

📝 Suggested fix
 // Package event defines the shared contract for tenant lifecycle events
 // published by tenant-manager and consumed by downstream services.
 // It provides the event envelope, typed payload structs, and channel helpers
-// for Valkey Streams-based event-driven tenant discovery.
+// for Redis/Valkey Pub/Sub-based event-driven tenant discovery.
 package event
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commons/tenant-manager/event/types.go` around lines 5 - 9, Update the package
documentation comment in package event to accurately reflect the transport used:
replace the phrase "Valkey Streams-based event-driven tenant discovery" with
wording that references Pub/Sub (e.g., "Valkey/Redis Pub/Sub (PSUBSCRIBE)-based
event-driven tenant discovery" or "Valkey Pub/Sub-based") so the package doc
matches the implementation.

Comment on lines +417 to +465
func TestSettingsWatcher_StartWorks(t *testing.T) {
t.Parallel()

apiCalled := atomic.Int32{}

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
apiCalled.Add(1)
w.Header().Set("Content-Type", "application/json")

resp := map[string]any{
"id": "tenant-poll",
"tenantSlug": "poll",
"databases": map[string]any{},
}

_ = json.NewEncoder(w).Encode(resp)
}))
defer server.Close()

tmClient := newTestClient(t, server.URL)
logger := testutil.NewCapturingLogger()

pgManager := tmpostgres.NewManager(tmClient, "ledger",
tmpostgres.WithModule("onboarding"),
tmpostgres.WithLogger(logger),
tmpostgres.WithTestConnections("tenant-poll"),
)

w := NewSettingsWatcher(tmClient, "ledger",
WithPostgresManager(pgManager),
WithLogger(logger),
WithInterval(time.Second),
)

ctx := context.Background()
ctx = libCommons.ContextWithLogger(ctx, logger)

w.Start(ctx)

// Give the immediate revalidation time to run.
time.Sleep(100 * time.Millisecond)

w.Stop()

// The watcher should have called the API at least once
// (immediate revalidation on start).
assert.GreaterOrEqual(t, apiCalled.Load(), int32(1),
"should call API for revalidation on start")
}

🧹 Nitpick | 🔵 Trivial

Consolidate duplicate Start/revalidation tests

Line 417 introduces coverage that substantially overlaps TestSettingsWatcher_ImmediateRevalidationOnStart (Line 479). Keeping both adds maintenance cost without meaningful extra signal.

♻️ Proposed simplification
-func TestSettingsWatcher_StartWorks(t *testing.T) {
-	t.Parallel()
-
-	apiCalled := atomic.Int32{}
-
-	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		apiCalled.Add(1)
-		w.Header().Set("Content-Type", "application/json")
-
-		resp := map[string]any{
-			"id":         "tenant-poll",
-			"tenantSlug": "poll",
-			"databases":  map[string]any{},
-		}
-
-		_ = json.NewEncoder(w).Encode(resp)
-	}))
-	defer server.Close()
-
-	tmClient := newTestClient(t, server.URL)
-	logger := testutil.NewCapturingLogger()
-
-	pgManager := tmpostgres.NewManager(tmClient, "ledger",
-		tmpostgres.WithModule("onboarding"),
-		tmpostgres.WithLogger(logger),
-		tmpostgres.WithTestConnections("tenant-poll"),
-	)
-
-	w := NewSettingsWatcher(tmClient, "ledger",
-		WithPostgresManager(pgManager),
-		WithLogger(logger),
-		WithInterval(time.Second),
-	)
-
-	ctx := context.Background()
-	ctx = libCommons.ContextWithLogger(ctx, logger)
-
-	w.Start(ctx)
-
-	// Give the immediate revalidation time to run.
-	time.Sleep(100 * time.Millisecond)
-
-	w.Stop()
-
-	// The watcher should have called the API at least once
-	// (immediate revalidation on start).
-	assert.GreaterOrEqual(t, apiCalled.Load(), int32(1),
-		"should call API for revalidation on start")
-}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commons/tenant-manager/watcher/settings_watcher_test.go` around lines 417 -
465, TestSettingsWatcher_StartWorks duplicates the behavior already covered by
TestSettingsWatcher_ImmediateRevalidationOnStart; remove or merge the redundant
test to avoid overlap. Locate the TestSettingsWatcher_StartWorks function (which
constructs NewSettingsWatcher, calls Start/Stop and asserts apiCalled was >=1)
and either delete it or fold any unique assertions into
TestSettingsWatcher_ImmediateRevalidationOnStart so only one test verifies the
immediate revalidation on Start; ensure the remaining test still sets up
tmClient, NewSettingsWatcher (and WithInterval/WithPostgresManager/WithLogger),
calls Start, sleeps briefly, calls Stop, and asserts apiCalled.Load() >= 1.

Contributor

@gandalf-at-lerian gandalf-at-lerian left a comment


PR Review: lib-commons #397

Summary

Major refactor of MultiTenantConsumer: replaces polling-based tenant discovery with Redis Pub/Sub events + lazy loading on first request. Adds TTL-based cache (12h default), per-tenant lazy-load locks, event listener, and 12 event handlers. Removes sync loop and dead code.

🔴 Critical: JSON field name mismatch with tenant-manager

Same issue as tenant-manager #224, other side of the wire.

This PR expects snake_case JSON tags (event_id, event_type, tenant_id, tenant_slug). tenant-manager #224 publishes camelCase (eventId, eventType, tenantId, tenantSlug).

ParseEvent will succeed (no unmarshal error) but all snake_case fields will be zero-value empty strings. Every event handler will receive events with an empty TenantID: closeTenantConnections(""), cache.Get(""), cache.Touch(""), etc.

Fix: Align JSON tags with tenant-manager. Use camelCase to match.

🟡 Major: handleServiceAssociated creates incomplete TenantConfig

config := &core.TenantConfig{
    ID:            evt.TenantID,
    TenantSlug:    evt.TenantSlug,
    Service:       payload.ServiceName,
    IsolationMode: payload.IsolationMode,
}
c.cache.Set(evt.TenantID, config, ttl)

This config has no connection settings (SecretPaths, ConnectionSettings). When the consumer tries to create a Postgres/RabbitMQ connection from this cached config, it will have nil connection details. The lazy-load path (loadTenant) correctly fetches full config via GetTenantConfig, but the event-driven path creates a skeleton.

Consider: should handleServiceAssociated do a lazy-load fetch instead of caching a partial config? Or ensure ensureConsumerStarted handles nil connection settings gracefully.

Same issue in handleServiceReactivated.

🟡 Major: SyncInterval and DiscoveryTimeout deprecated but not removed

These fields are marked Deprecated with "retained for backward config compatibility". This is fine for one release cycle, but they should be tracked for removal in v5. If v4 is the current major, add a TODO or issue reference.

🟢 Minor: tenantCache is solid

  • Thread-safe with RWMutex
  • Lazy eviction on Get (no background sweeper needed)
  • Touch correctly extends TTL on events
  • TenantIDs prunes expired entries on iteration
  • IsExpired method unused currently but useful for diagnostics

🟢 Minor: Lazy-load serialization

lazyLoadLocks (sync.Map of per-tenant mutexes) prevents thundering herd on the same tenant. Double-check after lock acquisition (cache.Get recheck) is correct. Good pattern.
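
The per-tenant double-checked pattern can be sketched as follows (names are illustrative, not the actual multi_tenant_lazy.go code):

```go
package main

import (
	"fmt"
	"sync"
)

// loader serializes fetches per tenant: concurrent callers for the same
// tenant share one fetch, while different tenants proceed in parallel.
type loader struct {
	mu    sync.RWMutex
	cache map[string]string
	locks sync.Map // tenantID -> *sync.Mutex
}

func (l *loader) get(tenant string) (string, bool) {
	l.mu.RLock()
	defer l.mu.RUnlock()
	v, ok := l.cache[tenant]
	return v, ok
}

func (l *loader) load(tenant string, fetch func(string) string) string {
	if v, ok := l.get(tenant); ok {
		return v
	}

	m, _ := l.locks.LoadOrStore(tenant, &sync.Mutex{})
	mu := m.(*sync.Mutex)
	mu.Lock()
	defer mu.Unlock()

	// Double-check: another goroutine may have loaded while we waited.
	if v, ok := l.get(tenant); ok {
		return v
	}

	v := fetch(tenant)
	l.mu.Lock()
	l.cache[tenant] = v
	l.mu.Unlock()

	return v
}

func main() {
	l := &loader{cache: map[string]string{}}

	var fetchMu sync.Mutex
	fetches := 0
	fetch := func(t string) string {
		fetchMu.Lock()
		fetches++
		fetchMu.Unlock()
		return "config:" + t
	}

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); l.load("tenant-a", fetch) }()
	}
	wg.Wait()

	fmt.Println(fetches) // 1: the thundering herd collapsed into a single fetch
}
```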

🟢 Minor: Event handler design

  • tenant.created → no-op (lazy-load on first request) — correct per design.
  • tenant.suspended/deleted/disassociated/purged → close connections — correct.
  • credentials.rotated → close + jitter → lazy-reload on next request — correct.
  • connections.updated → apply settings without full reconnect — efficient.
  • Service-scoped events filter by service_name in payload — correct.
  • Tenant-level events skip if tenant not in local cache — correct (except created).
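
The dispatch rules above can be sketched as a single function. The event type strings and the flattened payload shape are illustrative, not the package's actual constants:

```go
package main

import "fmt"

// lifecycleEvent is a hypothetical flattened event for the sketch.
type lifecycleEvent struct {
	EventType   string
	TenantID    string
	ServiceName string
}

// dispatch applies the rules: filter service-scoped events by service name,
// treat tenant.created as a no-op, and skip tenants not in the local cache.
func dispatch(evt lifecycleEvent, localService string, cached func(string) bool) string {
	// Service-scoped events: drop when addressed to another service.
	switch evt.EventType {
	case "tenant.service.suspended", "tenant.service.purged",
		"tenant.service.disassociated", "tenant.credentials.rotated":
		if evt.ServiceName != localService {
			return "skipped: other service"
		}
	}

	switch evt.EventType {
	case "tenant.created":
		return "no-op: lazy-load on first request"
	case "tenant.suspended", "tenant.deleted", "tenant.service.suspended",
		"tenant.service.purged", "tenant.service.disassociated":
		if !cached(evt.TenantID) {
			return "skipped: tenant not cached"
		}
		return "close connections"
	case "tenant.credentials.rotated":
		return "close connections + jitter; lazy-reload on next request"
	default:
		return "ignored"
	}
}

func main() {
	cached := func(id string) bool { return id == "tenant-a" }

	fmt.Println(dispatch(lifecycleEvent{EventType: "tenant.created"}, "ledger", cached))
	fmt.Println(dispatch(lifecycleEvent{EventType: "tenant.service.suspended", TenantID: "tenant-a", ServiceName: "other"}, "ledger", cached))
	fmt.Println(dispatch(lifecycleEvent{EventType: "tenant.suspended", TenantID: "tenant-a"}, "ledger", cached))
}
```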

🟢 Positive

  • Dead code fully removed: multi_tenant_sync.go deleted (294 lines), sync loop gone.
  • DefaultTenantCacheTTL = 12 * time.Hour — matches design exactly.
  • Jitter on credentials rotation uses crypto/rand — not math/rand.
  • Event listener has clean Start/Stop lifecycle with done channel.
  • Comprehensive test files added for cache, events, lazy-load.
  • Net: +3804 / -1864. Significant new test coverage.

Verdict: Request changes

JSON tag mismatch is a hard blocker (same as tenant-manager #224 — fix on one side or both). Incomplete TenantConfig in event handlers needs a decision: fetch full config or handle nil connection settings downstream.

…enantConfig

- Fix ServiceAssociatedPayload.Modules: map[string]ModuleConfig → []string
- Fix SecretPaths: map[string]string → map[string]map[string]string
- Fix MessagingEventConfig: nested RabbitMQ struct → flat RabbitMQSecretPath
- Delete unused ModuleConfig and RabbitMQEventConfig structs
- Populate ConnectionSettings, Databases, and Messaging in handleServiceAssociated
- Populate ConnectionSettings and Databases in handleServiceReactivated
- Update tests for new payload shapes and TenantConfig assertions

X-Lerian-Ref: 0x1

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@commons/tenant-manager/consumer/multi_tenant_events_test.go`:
- Around line 474-533: The table-driven test
TestHandleLifecycleEvent_ServiceEvent_FiltersMismatchedService is missing the
credentials.rotated case; add an entry to the tests slice with name "credentials
rotated different service", eventType event.EventTenantCredentialsRotated and
payload event.CredentialsRotatedPayload{ServiceName: "other-service",
CredentialType: "database"} (using mustMarshalPayload like the other cases) so
handleLifecycleEvent is exercised for mismatched-Service credentials.rotated
events and the tenant remains in the cache.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 8b322d73-5714-4258-990d-dc146ef5abe4

📥 Commits

Reviewing files that changed from the base of the PR and between 751cbfe and 09ecf87.

📒 Files selected for processing (4)
  • commons/tenant-manager/consumer/multi_tenant_events_handlers.go
  • commons/tenant-manager/consumer/multi_tenant_events_test.go
  • commons/tenant-manager/event/payloads.go
  • commons/tenant-manager/event/types_test.go

Comment on lines +474 to +533
func TestHandleLifecycleEvent_ServiceEvent_FiltersMismatchedService(t *testing.T) {
t.Parallel()

tests := []struct {
name string
eventType string
payload any
}{
{
name: "disassociated different service",
eventType: event.EventTenantServiceDisassociated,
payload: event.ServiceDisassociatedPayload{ServiceName: "other-service"},
},
{
name: "suspended different service",
eventType: event.EventTenantServiceSuspended,
payload: event.ServiceSuspendedPayload{ServiceName: "other-service"},
},
{
name: "purged different service",
eventType: event.EventTenantServicePurged,
payload: event.ServicePurgedPayload{ServiceName: "other-service"},
},
{
name: "reactivated different service",
eventType: event.EventTenantServiceReactivated,
payload: event.ServiceReactivatedPayload{ServiceName: "other-service"},
},
{
name: "connections updated different service",
eventType: event.EventTenantConnectionsUpdated,
payload: event.ConnectionsUpdatedPayload{ServiceName: "other-service"},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

consumer := newEventsTestConsumer(t)
seedCacheTenant(consumer, "tenant-filter-001")
ctx := context.Background()

evt := event.TenantLifecycleEvent{
EventID: "evt-filter",
EventType: tt.eventType,
TenantID: "tenant-filter-001",
Payload: mustMarshalPayload(t, tt.payload),
}

err := consumer.handleLifecycleEvent(ctx, evt)
require.NoError(t, err, "mismatched service event should be skipped without error")

// Verify tenant was NOT removed (event was for different service)
entry, ok := consumer.cache.Get("tenant-filter-001")
assert.True(t, ok, "tenant should still be in cache after mismatched service event")
assert.NotNil(t, entry, "cache entry should not be nil")
})
}
}

🧹 Nitpick | 🔵 Trivial

Consider adding credentials.rotated to service filtering tests.

The table-driven test covers service filtering for disassociated, suspended, purged, reactivated, and connections.updated. However, CredentialsRotatedPayload also contains a ServiceName field (per payloads.go Line 61). If credentials.rotated events should be filtered by service, add a test case to verify the behavior when ServiceName doesn't match.

Suggested addition to test table
{
    name:      "credentials rotated different service",
    eventType: event.EventTenantCredentialsRotated,
    payload:   event.CredentialsRotatedPayload{ServiceName: "other-service", CredentialType: "database"},
},
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@commons/tenant-manager/consumer/multi_tenant_events_test.go` around lines 474
- 533, The table-driven test
TestHandleLifecycleEvent_ServiceEvent_FiltersMismatchedService is missing the
credentials.rotated case; add an entry to the tests slice with name "credentials
rotated different service", eventType event.EventTenantCredentialsRotated and
payload event.CredentialsRotatedPayload{ServiceName: "other-service",
CredentialType: "database"} (using mustMarshalPayload like the other cases) so
handleLifecycleEvent is exercised for mismatched-Service credentials.rotated
events and the tenant remains in the cache.

@jeffersonrodrigues92 jeffersonrodrigues92 merged commit 7f9fac0 into develop Mar 25, 2026
2 of 3 checks passed
@jeffersonrodrigues92 jeffersonrodrigues92 deleted the feat/event-driven-tenant-discovery branch March 25, 2026 15:14
jeffersonrodrigues92 added a commit that referenced this pull request Mar 25, 2026
…enant-discovery"

This reverts commit 7f9fac0, reversing
changes made to 8f323c2.
jeffersonrodrigues92 added a commit that referenced this pull request Mar 25, 2026