diff --git a/CHANGELOG.md b/CHANGELOG.md index a30c71f..48c7d25 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/). ## [Unreleased] ### Added +- Outbox info collector ### Changed ### Deprecated ### Removed diff --git a/cds-feature-console/src/main/java/com/sap/cds/feature/console/connectivity/RemoteMonitoringHandler.java b/cds-feature-console/src/main/java/com/sap/cds/feature/console/connectivity/RemoteMonitoringHandler.java index 40aaefc..8bbff2d 100644 --- a/cds-feature-console/src/main/java/com/sap/cds/feature/console/connectivity/RemoteMonitoringHandler.java +++ b/cds-feature-console/src/main/java/com/sap/cds/feature/console/connectivity/RemoteMonitoringHandler.java @@ -1,5 +1,6 @@ package com.sap.cds.feature.console.connectivity; +import com.sap.cds.feature.console.info.Path; import com.sap.cds.feature.console.service.CommandEventContext; import com.sap.cds.feature.console.service.InfoEventContext; import com.sap.cds.feature.console.service.RemoteMonitoringService; @@ -24,8 +25,16 @@ public RemoteMonitoringHandler(RemoteMonitoringServer server) { @On private void handleInfoEvent(InfoEventContext context) { logger.debug("Handling info '{}'", context.getEvent()); - this.remoteMonitoringServer.broadcastToPath( - context.getInfoEvent().toJson(), RemoteMonitoringServer.PATH_LOGS); + + String path = context.getInfoEvent().getPath(); + + if (path.startsWith(Path.OUTBOX)) { + this.remoteMonitoringServer.broadcastToPath( + context.getInfoEvent().toJson(), RemoteMonitoringServer.PATH_TASKS); + } else { + this.remoteMonitoringServer.broadcastToPath( + context.getInfoEvent().toJson(), RemoteMonitoringServer.PATH_LOGS); + } context.setCompleted(); } @@ -36,5 +45,4 @@ private void handleDashboardCommandEvent(CommandEventContext context) { logger.debug("Handling command '{}'", context.getEvent()); context.setCompleted(); } - } diff --git a/cds-feature-console/src/main/java/com/sap/cds/feature/console/connectivity/RemoteMonitoringServer.java b/cds-feature-console/src/main/java/com/sap/cds/feature/console/connectivity/RemoteMonitoringServer.java index e8d1be5..3a2edee 100644 --- a/cds-feature-console/src/main/java/com/sap/cds/feature/console/connectivity/RemoteMonitoringServer.java +++ b/cds-feature-console/src/main/java/com/sap/cds/feature/console/connectivity/RemoteMonitoringServer.java @@ -32,6 +32,7 @@ public class RemoteMonitoringServer extends WebSocketServer { private static final Logger logger = LoggerFactory.getLogger(RemoteMonitoringServer.class); public static final String PATH_CAP_CONSOLE = "/cap-console"; public static final String PATH_LOGS = PATH_CAP_CONSOLE + "/logs"; + public static final String PATH_TASKS = PATH_CAP_CONSOLE + "/tasks"; private final Map> clientsByPaths = new ConcurrentHashMap<>(); private static final ObjectMapper objectMapper = new ObjectMapper(); @@ -132,13 +133,14 @@ public void onStart() { } private void welcomeClient(WebSocket conn, String path) { - RemoteLogData welcomeMsg = new RemoteLogData.Builder() - .level("INFO") - .logger("system") - .thread(Thread.currentThread().getName()) - .type("welcome") - .message("Welcome to CAP console Remote Monitoring.") - .build(); + RemoteLogData welcomeMsg = + new RemoteLogData.Builder() + .level("INFO") + .logger("system") + .thread(Thread.currentThread().getName()) + .type("welcome") + .message("Welcome to CAP console Remote Monitoring.") + .build(); InfoEvent infoEvent = InfoEvent.createRemoteLog(path, welcomeMsg); conn.send(infoEvent.toJson()); diff --git a/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/InfoCollector.java b/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/InfoCollector.java index 3497378..6367c90 100644 --- a/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/InfoCollector.java +++ b/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/InfoCollector.java @@ -52,7 +52,8 @@ protected void emitInfoEvent(Supplier infoProducer) { } getRemoteMonitoringService().emit(event); } catch (Exception e) { - sendErrorNotification("Data Access Error", e.getMessage()); + String errorMessage = e.getMessage() != null ? e.getMessage() : e.getClass().getName(); + sendErrorNotification("Data Access Error", errorMessage); logger.error("Could not emit remote-monitoring info event!", e); } } @@ -72,33 +73,44 @@ public void sendSuccessNotification(String header, String notification, Object.. } public void sendErrorNotification(String header, String notification, Object... args) { - RemoteLogData logData = new RemoteLogData.Builder() - .type(header) - .logger("system") - .thread(Thread.currentThread().getName()) - .level("error") - .message(String.format(notification, args)) - .ts(System.currentTimeMillis()) - .build(); + RemoteLogData logData = + new RemoteLogData.Builder() + .type(header) + .logger("system") + .thread(Thread.currentThread().getName()) + .level("error") + .message(String.format(notification, args)) + .ts(System.currentTimeMillis()) + .build(); InfoEvent event = InfoEvent.createRemoteLog(Path.CONSOLE_NOTIFICATION, logData); getRemoteMonitoringService().emit(event); } public void sendNotification(NotificationType type, String notification, Object... args) { - RemoteLogData logData = new RemoteLogData.Builder() - .type(type.name()) - .logger("system") - .thread(Thread.currentThread().getName()) - .level("info") - .message(String.format(notification, args)) - .ts(System.currentTimeMillis()) - .build(); + RemoteLogData logData = + new RemoteLogData.Builder() + .type(type.name()) + .logger("system") + .thread(Thread.currentThread().getName()) + .level("info") + .message(String.format(notification, args)) + .ts(System.currentTimeMillis()) + .build(); InfoEvent event = InfoEvent.createRemoteLog(Path.CONSOLE_NOTIFICATION, logData); getRemoteMonitoringService().emit(event); } + public static void inRemoteMonitoringContext(Runnable action) { + REMOTE_MONITORING_EVENT.set(true); + try { + action.run(); + } finally { + REMOTE_MONITORING_EVENT.set(false); + } + } + public static boolean isInRemoteMonitoringContext() { return REMOTE_MONITORING_EVENT.get(); } diff --git a/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/Path.java b/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/Path.java index 5741469..5da120a 100644 --- a/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/Path.java +++ b/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/Path.java @@ -2,9 +2,7 @@ public final class Path { - private Path() { - - } + private Path() {} public static final String CONSOLE = "console"; public static final String REMOTE_MONITORING = "remote-monitoring"; @@ -20,4 +18,6 @@ private Path() { public static final String TRACES_OUTPUT = TRACES + ".output"; public static final String TRACES_EVENTS = TRACES + ".events"; + public static final String OUTBOX = "outbox"; + public static final String OUTBOX_TENANTS = OUTBOX + ".tenants"; } diff --git a/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/collectors/OutboxInfoCollector.java b/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/collectors/OutboxInfoCollector.java new file mode 100644 index 0000000..d976504 --- /dev/null +++ b/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/collectors/OutboxInfoCollector.java @@ -0,0 +1,434 @@ +package com.sap.cds.feature.console.info.collectors; + +import static com.sap.cds.feature.console.service.RemoteMonitoringConfiguration.COMMAND_ATTACHED; + +import com.sap.cds.Result; +import com.sap.cds.Row; +import com.sap.cds.feature.console.info.InfoCollector; +import com.sap.cds.feature.console.info.Path; +import com.sap.cds.feature.console.service.CommandEventContext; +import com.sap.cds.feature.console.service.InfoEvent; +import com.sap.cds.feature.console.service.OutboxConfig; +import com.sap.cds.feature.console.service.RemoteMonitoringService; +import com.sap.cds.ql.Delete; +import com.sap.cds.ql.Insert; +import com.sap.cds.ql.Select; +import com.sap.cds.ql.Update; +import com.sap.cds.ql.cqn.CqnAnalyzer; +import com.sap.cds.ql.cqn.CqnSelect; +import com.sap.cds.services.EventContext; +import com.sap.cds.services.cds.CdsDeleteEventContext; +import com.sap.cds.services.cds.CqnService; +import com.sap.cds.services.changeset.ChangeSetListener; +import com.sap.cds.services.environment.CdsProperties.Outbox.OutboxServiceConfig; +import com.sap.cds.services.handler.EventHandler; +import com.sap.cds.services.handler.annotations.After; +import com.sap.cds.services.handler.annotations.Before; +import com.sap.cds.services.handler.annotations.ServiceName; +import com.sap.cds.services.impl.outbox.Messages; +import com.sap.cds.services.impl.outbox.Messages_; +import com.sap.cds.services.impl.outbox.persistence.PersistentOutbox; +import com.sap.cds.services.impl.outbox.persistence.collectors.PartitionCollector; +import com.sap.cds.services.messaging.utils.CloudEventUtils; +import com.sap.cds.services.mt.DeploymentService; +import com.sap.cds.services.mt.SubscribeEventContext; +import com.sap.cds.services.mt.TenantProviderService; +import com.sap.cds.services.persistence.PersistenceService; +import com.sap.cds.services.runtime.CdsRuntime; +import com.sap.cds.services.utils.outbox.OutboxUtils; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.slf4j.LoggerFactory; + +@ServiceName(RemoteMonitoringService.DEFAULT_NAME) +public class OutboxInfoCollector extends InfoCollector implements EventHandler { + + private static final org.slf4j.Logger logger = LoggerFactory.getLogger(InfoCollector.class); + + public static final String TYPE = "outbox"; + + public static final String COMMAND_RESET = TYPE + "/reset"; + public static final String COMMAND_REMOVE = TYPE + "/remove"; + public static final String COMMAND_REPLAY = TYPE + "/replay"; + public static final String COMMAND_REMOVE_HISTORY = TYPE + "/remove-history"; + public static final String COMMAND_START_COLLECTOR = TYPE + "/start-collector"; + public static final String COMMAND_STOP_COLLECTOR = TYPE + "/stop-collector"; + + private static final int MAX_HISTORY = 25; + + private PersistenceService persistenceService; + private boolean isPersistentOutboxEnabled; + + private Map> lastSeenEntries = new HashMap<>(); + + public OutboxInfoCollector(CdsRuntime runtime, RemoteMonitoringService remoteMonitoringService) { + super(runtime, remoteMonitoringService); + persistenceService = + runtime + .getServiceCatalog() + .getService(PersistenceService.class, PersistenceService.DEFAULT_NAME); + isPersistentOutboxEnabled = + runtime.getServiceCatalog().getServices(PersistentOutbox.class).count() > 0; + } + + private TenantProviderService getTenantService() { + return getRuntime() + .getServiceCatalog() + .getService(TenantProviderService.class, TenantProviderService.DEFAULT_NAME); + } + + @After(event = COMMAND_ATTACHED) + private void capConsoleAttached(CommandEventContext context) { + if (!isPersistentOutboxEnabled) { + return; + } + + List outBoxConfigs = + context + .getServiceCatalog() + .getServices(PersistentOutbox.class) + .map( + box -> { + OutboxServiceConfig config = + context + .getCdsRuntime() + .getEnvironment() + .getCdsProperties() + .getOutbox() + .getService(box.getName()); + return OutboxConfig.fromServiceConfig(config, box.getName()); + }) + .collect(Collectors.toList()); + + InfoEvent outboxConfigsEvent = InfoEvent.create(Path.OUTBOX, Map.of("outboxes", outBoxConfigs)); + emitInfoEvent(() -> outboxConfigsEvent); + + if (isMultitenancyEnabled()) { + List tenants = getTenantService().readTenants(); + tenants.forEach(tenant -> emitInfoEvent(() -> getTenantOutboxes(tenant))); + } else { + emitInfoEvent(() -> getTenantOutboxes(null)); + } + } + + private boolean isMultitenancyEnabled() { + return getRuntime() + .getServiceCatalog() + .getService(TenantProviderService.class, TenantProviderService.DEFAULT_NAME) + != null; + } + + private String getTenant(CommandEventContext context) { + if (isMultitenancyEnabled()) { + return (String) context.getData().get("tenant"); + } + return null; + } + + @After(event = COMMAND_RESET) + private void resetEntry(CommandEventContext context) { + sendInfoNotification("Outbox Entry Reset", "Resetting the outbox entry..."); + String id = (String) context.getData().get("id"); + String tenant = getTenant(context); + String target = (String) context.getData().get("target"); + getRuntime() + .requestContext() + .systemUser(tenant) + .run( + r -> { + getRuntime() + .changeSetContext() + .run( + ch -> { + if (target != null) { + ch.register( + new ChangeSetListener() { + @Override + public void afterClose(boolean completed) { + scheduleOutbox(context.getCdsRuntime(), target); + } + }); + } + try { + InfoCollector.inRemoteMonitoringContext( + () -> + persistenceService.run( + Update.entity(Messages_.class) + .data(Messages.ATTEMPTS, 0) + .where(m -> m.ID().eq(id)))); + sendSuccessNotification( + "Outbox Entry Reset", "Outbox entry successfully reset!"); + } catch (Throwable th) { + sendErrorNotification( + "Error Outbox Entry Reset", + "Entry could not be reset '%s'", + th.getMessage()); + } + }); + }); + } + + @After(event = COMMAND_REMOVE) + private void removeEntry(CommandEventContext context) { + sendInfoNotification("Outbox Entry Remove", "Removing the outbox entry..."); + String id = (String) context.getData().get("id"); + String tenant = getTenant(context); + getRuntime() + .requestContext() + .systemUser(tenant) + .run( + r -> { + getRuntime() + .changeSetContext() + .run( + ch -> { + try { + InfoCollector.inRemoteMonitoringContext( + () -> + persistenceService.run( + Delete.from(Messages_.class).where(e -> e.ID().eq(id)))); + sendSuccessNotification( + "Outbox Entry Remove", "Outbox entry successfully removed!"); + } catch (Throwable th) { + sendErrorNotification( + "Error Outbox Entry Remove", + "Entry could not be removed '%s'", + th.getMessage()); + } + }); + }); + } + + @After(event = COMMAND_REPLAY) + private void replayEntry(CommandEventContext context) { + sendInfoNotification("Outbox Entry Replay", "Replaying the outbox entry..."); + String id = (String) context.getData().get("id"); + String tenant = getTenant(context); + synchronized (lastSeenEntries) { + if (lastSeenEntries.containsKey(tenant)) { + List removedFromHistory = new ArrayList<>(); + lastSeenEntries.get(tenant).stream() + .map(e -> ((Row) e).as(Messages.class)) + .filter(msg -> msg.getId().equals(id)) + .forEach( + msg -> { + getRuntime() + .requestContext() + .systemUser(tenant) + .run( + req -> { + getRuntime() + .changeSetContext() + .run( + ch -> { + try { + + Messages newMsg = Messages.create(); + newMsg.setMsg(msg.getMsg()); + newMsg.setTarget(msg.getTarget()); + newMsg.setTimestamp(Instant.now()); + + InfoCollector.inRemoteMonitoringContext( + () -> + persistenceService.run( + Insert.into(Messages_.class).entry(newMsg))); + removedFromHistory.add(msg.getId()); + scheduleOutbox(context.getCdsRuntime(), msg.getTarget()); + sendSuccessNotification( + "Outbox Entry Replayed", + "Outbox entry successfully replayed!"); + + } catch (Throwable th) { + sendErrorNotification( + "Error Outbox Entry Replay", + "Entry could not be replayed '%s'", + th.getMessage()); + } + }); + }); + }); + removedFromHistory.forEach( + msg -> lastSeenEntries.get(tenant).removeIf(entry -> entry.equals(msg))); + emitInfoEvent(() -> getTenantOutboxes(tenant)); + } + } + } + + @After(event = COMMAND_REMOVE_HISTORY) + private void removeHistoryEntry(CommandEventContext context) { + sendInfoNotification( + "Outbox Entry Remove History", "Removing the outbox entry from history..."); + String id = (String) context.getData().get("id"); + String tenant = getTenant(context); + synchronized (lastSeenEntries) { + if (lastSeenEntries.containsKey(tenant)) { + lastSeenEntries + .get(tenant) + .removeIf(entry -> ((Row) entry).as(Messages.class).getId().equals(id)); + emitInfoEvent(() -> getTenantOutboxes(tenant)); + } + } + } + + @Before( + service = PersistenceService.DEFAULT_NAME, + entity = OutboxUtils.OUTBOX_MODEL, + event = CqnService.EVENT_DELETE) + private void outboxEventDelete(CdsDeleteEventContext context) { + String tenant = context.getUserInfo().getTenant(); + getRuntime() + .requestContext() + .systemUser(tenant) + .run( + req -> { + getRuntime() + .changeSetContext() + .run( + ch -> { + CqnAnalyzer analyzer = CqnAnalyzer.create(context.getModel()); + String id = + (String) + analyzer.analyze(context.getCqn()).targetKeys().get(Messages.ID); + CqnSelect select = Select.from(Messages_.class).where(e -> e.ID().eq(id)); + InfoCollector.inRemoteMonitoringContext( + () -> { + persistenceService + .run(select) + .forEach( + row -> { + synchronized (lastSeenEntries) { + if (lastSeenEntries.get(tenant) == null) { + lastSeenEntries.put(tenant, new ArrayList<>()); + } + List history = lastSeenEntries.get(tenant); + if (!history.stream() + .anyMatch( + o -> + ((Row) o) + .as(Messages.class) + .getMsg() + .equals( + row.as(Messages.class).getMsg()) + && ((Row) o) + .as(Messages.class) + .getTarget() + .equals( + row.as(Messages.class) + .getTarget()))) { + updateOutboxEntry(row); + history.add(row); + if (history.size() > MAX_HISTORY) { + history.remove(0); + } + } + } + }); + }); + }); + }); + } + + @After(service = PersistenceService.DEFAULT_NAME, entity = OutboxUtils.OUTBOX_MODEL, event = "*") + private void outboxEvents(EventContext context) { + String event = context.getEvent(); + if (!event.equals(CqnService.EVENT_READ)) { + String tenant = context.getUserInfo().getTenant(); + context + .getChangeSetContext() + .register( + new ChangeSetListener() { + + @Override + public void afterClose(boolean completed) { + emitInfoEvent(() -> getTenantOutboxes(tenant)); + } + }); + } + } + + @After(service = DeploymentService.DEFAULT_NAME) + private void tenantSubscribed(SubscribeEventContext context) { + getTenantService() + .readTenants() + .forEach(tenant -> emitInfoEvent(() -> getTenantOutboxes(tenant))); + } + + private InfoEvent getTenantOutboxes(String tenant) { + InfoEvent event = InfoEvent.create(Path.OUTBOX_TENANTS + '.' + tenant); + if (isPersistentOutboxEnabled) { + getRuntime() + .requestContext() + .systemUser(tenant) + .run( + r -> { + getRuntime() + .changeSetContext() + .run( + ch -> { + InfoCollector.inRemoteMonitoringContext( + () -> { + CqnSelect select = + Select.from(Messages_.class).orderBy(e -> e.timestamp().asc()); + Result res = persistenceService.run(select); + res.forEach(entry -> updateOutboxEntry(entry)); + event.getData().put("entries", res.list()); + }); + }); + }); + + // Add history + List reversedList; + synchronized (lastSeenEntries) { + reversedList = + lastSeenEntries.get(tenant) != null + ? new ArrayList<>(lastSeenEntries.get(tenant)) + : new ArrayList<>(); + } + Collections.reverse(reversedList); + event.getData().put("history", reversedList); + } + + return event; + } + + private void updateOutboxEntry(Row row) { + Map data = row; + data.put("jsonMsg", CloudEventUtils.toMap((String) data.get("msg"))); + } + + private void scheduleOutbox(CdsRuntime runtime, String target) { + if (isPersistentOutboxEnabled) { + runtime + .getServiceCatalog() + .getServices(PersistentOutbox.class) + .filter(s -> s.getName().endsWith(target)) + .forEach(this::scheduleCollector); + } + } + + private void scheduleCollector(PersistentOutbox outbox) { + try { + Field collector = outbox.getClass().getDeclaredField("collector"); + collector.setAccessible(true); + Object collectorInstance = collector.get(outbox); + + // Check if it's a PartitionCollector (which has unpause method) + if (collectorInstance instanceof PartitionCollector) { + Method unpause = PartitionCollector.class.getDeclaredMethod("unpause"); + unpause.setAccessible(true); + unpause.invoke(collectorInstance); + } + // For TaskBasedCollector, no manual scheduling is needed as it's task-based + // and will automatically pick up new messages through its scheduled tasks + } catch (Exception e) { + logger.error("Cannot schedule the collector for the outbox {}", outbox.getName(), e); + } + } +} diff --git a/cds-feature-console/src/main/java/com/sap/cds/feature/console/service/OutboxConfig.java b/cds-feature-console/src/main/java/com/sap/cds/feature/console/service/OutboxConfig.java new file mode 100644 index 0000000..991c4c0 --- /dev/null +++ b/cds-feature-console/src/main/java/com/sap/cds/feature/console/service/OutboxConfig.java @@ -0,0 +1,37 @@ +package com.sap.cds.feature.console.service; + +import com.sap.cds.services.environment.CdsProperties.Outbox.OutboxServiceConfig; +import java.util.Map; + +/** + * Class representing outbox configuration data for serialization to the client. Extracts only + * serializable properties from OutboxServiceConfig. + */ +public final class OutboxConfig { + private final String name; + private final int maxAttempts; + private final boolean ordered; + private final boolean enabled; + + private OutboxConfig(String name, int maxAttempts, boolean ordered, boolean enabled) { + this.name = name; + this.maxAttempts = maxAttempts; + this.ordered = ordered; + this.enabled = enabled; + } + + /** + * Creates an OutboxConfig from a CAP OutboxServiceConfig and outbox name. + * + * @param config the OutboxServiceConfig from CAP framework + * @param name the name of the outbox service + * @return OutboxConfig instance + */ + public static OutboxConfig fromServiceConfig(OutboxServiceConfig config, String name) { + return new OutboxConfig(name, config.getMaxAttempts(), config.isOrdered(), config.isEnabled()); + } + + public Map toMap() { + return Map.of("name", name, "maxAttempts", maxAttempts, "ordered", ordered, "enabled", enabled); + } +} diff --git a/cds-feature-console/src/main/java/com/sap/cds/feature/console/service/RemoteMonitoringConfiguration.java b/cds-feature-console/src/main/java/com/sap/cds/feature/console/service/RemoteMonitoringConfiguration.java index dfcfa6d..342ccd6 100644 --- a/cds-feature-console/src/main/java/com/sap/cds/feature/console/service/RemoteMonitoringConfiguration.java +++ b/cds-feature-console/src/main/java/com/sap/cds/feature/console/service/RemoteMonitoringConfiguration.java @@ -2,6 +2,7 @@ import com.sap.cds.feature.console.connectivity.RemoteMonitoringHandler; import com.sap.cds.feature.console.info.collectors.LogCollector; +import com.sap.cds.feature.console.info.collectors.OutboxInfoCollector; import com.sap.cds.services.runtime.CdsRuntimeConfiguration; import com.sap.cds.services.runtime.CdsRuntimeConfigurer; @@ -26,9 +27,12 @@ public void services(CdsRuntimeConfigurer configurer) { @Override public void eventHandlers(CdsRuntimeConfigurer configurer) { if (remoteMonitoringService != null) { - configurer.eventHandler(new RemoteMonitoringHandler(remoteMonitoringService.getRemoteMonitoringServer())); - configurer.eventHandler(new LogCollector(configurer.getCdsRuntime(), remoteMonitoringService)); + configurer.eventHandler( + new RemoteMonitoringHandler(remoteMonitoringService.getRemoteMonitoringServer())); + configurer.eventHandler( + new LogCollector(configurer.getCdsRuntime(), remoteMonitoringService)); + configurer.eventHandler( + new OutboxInfoCollector(configurer.getCdsRuntime(), remoteMonitoringService)); } } - } diff --git a/cds-feature-console/src/test/java/com/sap/cds/feature/console/connectivity/RemoteMonitoringHandlerTest.java b/cds-feature-console/src/test/java/com/sap/cds/feature/console/connectivity/RemoteMonitoringHandlerTest.java new file mode 100644 index 0000000..50edb3a --- /dev/null +++ b/cds-feature-console/src/test/java/com/sap/cds/feature/console/connectivity/RemoteMonitoringHandlerTest.java @@ -0,0 +1,152 @@ +package com.sap.cds.feature.console.connectivity; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.sap.cds.feature.console.info.Path; +import com.sap.cds.feature.console.service.InfoEvent; +import com.sap.cds.feature.console.service.RemoteLogData; +import com.sap.cds.feature.console.service.RemoteMonitoringService; +import com.sap.cds.services.runtime.CdsRuntime; +import com.sap.cds.services.runtime.CdsRuntimeConfigurer; +import java.net.URI; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.java_websocket.client.WebSocketClient; +import org.java_websocket.handshake.ServerHandshake; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class RemoteMonitoringHandlerTest { + + private RemoteMonitoringService remoteMonitoringService; + private TestWebSocketClient logsClient; + private TestWebSocketClient tasksClient; + + @BeforeAll + void setupAll() throws Exception { + CdsRuntime runtime = CdsRuntimeConfigurer.create() + .serviceConfigurations() + .eventHandlerConfigurations() + .complete(); + this.remoteMonitoringService = runtime + .getServiceCatalog() + .getService(RemoteMonitoringService.class, RemoteMonitoringService.DEFAULT_NAME); + + int port = remoteMonitoringService.getRemoteMonitoringServer().getPort(); + + logsClient = new TestWebSocketClient("ws://localhost:" + port + RemoteMonitoringServer.PATH_LOGS); + logsClient.connectBlocking(); + assertTrue(logsClient.isOpen(), "Logs WebSocket client should be open"); + logsClient.awaitMessage(2, TimeUnit.SECONDS); + + tasksClient = new TestWebSocketClient("ws://localhost:" + port + RemoteMonitoringServer.PATH_TASKS); + tasksClient.connectBlocking(); + assertTrue(tasksClient.isOpen(), "Tasks WebSocket client should be open"); + tasksClient.awaitMessage(2, TimeUnit.SECONDS); + } + + @AfterAll + void tearDownAll() throws Exception { + if (logsClient != null) { + logsClient.close(); + } + if (tasksClient != null) { + tasksClient.close(); + } + if (remoteMonitoringService != null && remoteMonitoringService.getRemoteMonitoringServer() != null) { + remoteMonitoringService.getRemoteMonitoringServer().stop(); + } + } + + @BeforeEach + void beforeEach() { + logsClient.resetLatch(); + tasksClient.resetLatch(); + } + + @Test + void outboxEventIsRoutedToTasksPath() throws Exception { + InfoEvent outboxEvent = InfoEvent.create(Path.OUTBOX_TENANTS + ".t1", Map.of("entries", "[]")); + + remoteMonitoringService.emit(outboxEvent); + + String tasksMsg = tasksClient.awaitMessage(2, TimeUnit.SECONDS); + assertNotNull(tasksMsg, "Tasks client should receive outbox event"); + + ObjectMapper mapper = new ObjectMapper(); + JsonNode root = mapper.readTree(tasksMsg); + assertTrue(root.path("path").asText().startsWith(Path.OUTBOX)); + + String logsMsg = logsClient.awaitMessage(500, TimeUnit.MILLISECONDS); + assertNull(logsMsg, "Logs client should NOT receive outbox event"); + } + + @Test + void nonOutboxEventIsRoutedToLogsPath() throws Exception { + RemoteLogData logData = new RemoteLogData.Builder() + .level("INFO") + .logger("test.logger") + .thread("test-thread") + .type("log") + .message("Test log message") + .ts(System.currentTimeMillis()) + .build(); + InfoEvent logEvent = InfoEvent.createRemoteLog("test.path", logData); + + remoteMonitoringService.emit(logEvent); + + String logsMsg = logsClient.awaitMessage(2, TimeUnit.SECONDS); + assertNotNull(logsMsg, "Logs client should receive non-outbox event"); + + ObjectMapper mapper = new ObjectMapper(); + JsonNode root = mapper.readTree(logsMsg); + assertEquals("test.path", root.path("path").asText()); + + String tasksMsg = tasksClient.awaitMessage(500, TimeUnit.MILLISECONDS); + assertNull(tasksMsg, "Tasks client should NOT receive non-outbox event"); + } + + static class TestWebSocketClient extends WebSocketClient { + private CountDownLatch latch = new CountDownLatch(1); + private String message; + + TestWebSocketClient(String uri) throws Exception { + super(new URI(uri)); + } + + @Override + public void onOpen(ServerHandshake handshakedata) {} + + @Override + public void onMessage(String message) { + this.message = message; + latch.countDown(); + } + + @Override + public void onClose(int code, String reason, boolean remote) {} + + @Override + public void onError(Exception ex) {} + + String awaitMessage(long timeout, TimeUnit unit) throws InterruptedException { + boolean received = latch.await(timeout, unit); + return received ? message : null; + } + + public void resetLatch() { + latch = new CountDownLatch(1); + message = null; + } + } +} diff --git a/cds-feature-console/src/test/java/com/sap/cds/feature/console/connectivity/RemoteMonitoringTest.java b/cds-feature-console/src/test/java/com/sap/cds/feature/console/connectivity/RemoteMonitoringTest.java index 6002762..685f453 100644 --- a/cds-feature-console/src/test/java/com/sap/cds/feature/console/connectivity/RemoteMonitoringTest.java +++ b/cds-feature-console/src/test/java/com/sap/cds/feature/console/connectivity/RemoteMonitoringTest.java @@ -28,7 +28,6 @@ @TestInstance(TestInstance.Lifecycle.PER_CLASS) class RemoteMonitoringTest { - private static final int PORT = 54953; // must match RemoteMonitoringConfiguration private RemoteMonitoringService remoteMonitoringService; private TestWebSocketClient client; @@ -42,7 +41,8 @@ void setupAll() throws Exception { .getServiceCatalog() .getService(RemoteMonitoringService.class, RemoteMonitoringService.DEFAULT_NAME); - client = new TestWebSocketClient("ws://localhost:" + PORT + "/cap-console/logs"); + int port = remoteMonitoringService.getRemoteMonitoringServer().getPort(); + client = new TestWebSocketClient("ws://localhost:" + port + "/cap-console/logs"); client.connectBlocking(); assertTrue(client.isOpen(), "WebSocket client should be open"); diff --git a/cds-feature-console/src/test/java/com/sap/cds/feature/console/info/InfoCollectorTest.java b/cds-feature-console/src/test/java/com/sap/cds/feature/console/info/InfoCollectorTest.java new file mode 100644 index 0000000..5a8fdf4 --- /dev/null +++ b/cds-feature-console/src/test/java/com/sap/cds/feature/console/info/InfoCollectorTest.java @@ -0,0 +1,138 @@ +package com.sap.cds.feature.console.info; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import com.sap.cds.feature.console.service.InfoEvent; +import com.sap.cds.feature.console.service.RemoteMonitoringService; +import com.sap.cds.services.runtime.CdsRuntime; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; + +class InfoCollectorTest { + + private RemoteMonitoringService mockService; + private TestInfoCollector collector; + + @BeforeEach + void setUp() { + mockService = mock(RemoteMonitoringService.class); + CdsRuntime mockRuntime = mock(CdsRuntime.class); + collector = new TestInfoCollector(mockRuntime, mockService); + } + + @AfterEach + void cleanup() { + InfoCollector.REMOTE_MONITORING_EVENT.remove(); + } + + @Test + void inRemoteMonitoringContextSetsFlag() { + assertFalse(InfoCollector.isInRemoteMonitoringContext()); + + AtomicBoolean flagDuringExecution = new AtomicBoolean(false); + InfoCollector.inRemoteMonitoringContext( + () -> flagDuringExecution.set(InfoCollector.isInRemoteMonitoringContext())); + + assertTrue(flagDuringExecution.get(), "Flag should be true during execution"); + assertFalse(InfoCollector.isInRemoteMonitoringContext(), "Flag should be reset after execution"); + } + + @Test + void inRemoteMonitoringContextResetsAfterException() { + assertThrows(RuntimeException.class, () -> + InfoCollector.inRemoteMonitoringContext(() -> { + throw new RuntimeException("test error"); + })); + + assertFalse(InfoCollector.isInRemoteMonitoringContext(), + "Flag should be reset even after exception"); + } + + @Test + void emitInfoEventEmitsEvent() { + InfoEvent event = InfoEvent.create("test.path", Map.of("message", "hello")); + + collector.emitInfoEvent(() -> event); + + verify(mockService).emit(event); + } + + static Stream missingMessageData() { + Map emptyMsg = new HashMap<>(Map.of("message", "")); + Map nullMsg = new HashMap<>(); + nullMsg.put("message", null); + Map absentMsg = new HashMap<>(); + return Stream.of( + Arguments.of("empty string", emptyMsg), + Arguments.of("null value", nullMsg), + Arguments.of("absent key", absentMsg)); + } + + @ParameterizedTest(name = "message={0}") + @MethodSource("missingMessageData") + void emitInfoEventReplacesMissingMessageWithDash(String desc, Map data) { + InfoEvent event = InfoEvent.create("test.path", data); + + collector.emitInfoEvent(() -> event); + + ArgumentCaptor captor = ArgumentCaptor.forClass(InfoEvent.class); + verify(mockService).emit(captor.capture()); + assertEquals("-", captor.getValue().getData().get("message")); + } + + @Test + void emitInfoEventHandlesSupplierException() { + collector.emitInfoEvent(() -> { + throw new RuntimeException((String) null); + }); + + verify(mockService).emit(any(InfoEvent.class)); + } + + @Test + void sendInfoNotificationEmitsEvent() { + collector.sendInfoNotification("Test notification %s", "arg1"); + + ArgumentCaptor captor = ArgumentCaptor.forClass(InfoEvent.class); + verify(mockService).emit(captor.capture()); + InfoEvent emitted = captor.getValue(); + assertEquals(Path.CONSOLE_NOTIFICATION, emitted.getPath()); + assertEquals("info", emitted.getData().get("type")); + assertEquals("Test notification arg1", emitted.getData().get("message")); + } + + @Test + void sendErrorNotificationEmitsEvent() { + collector.sendErrorNotification("Error Header", "Something failed: %s", "reason"); + + ArgumentCaptor captor = ArgumentCaptor.forClass(InfoEvent.class); + verify(mockService).emit(captor.capture()); + InfoEvent emitted = captor.getValue(); + assertEquals(Path.CONSOLE_NOTIFICATION, emitted.getPath()); + assertEquals("Error Header", emitted.getData().get("type")); + assertEquals("ERROR", emitted.getData().get("level")); + assertEquals("Something failed: reason", emitted.getData().get("message")); + } + + /** Concrete subclass for testing the abstract InfoCollector. */ + static class TestInfoCollector extends InfoCollector { + TestInfoCollector(CdsRuntime runtime, RemoteMonitoringService service) { + super(runtime, service); + } + } +} diff --git a/cds-feature-console/src/test/java/com/sap/cds/feature/console/info/collectors/LogCollectorTest.java b/cds-feature-console/src/test/java/com/sap/cds/feature/console/info/collectors/LogCollectorTest.java index b8217f9..59133c6 100644 --- a/cds-feature-console/src/test/java/com/sap/cds/feature/console/info/collectors/LogCollectorTest.java +++ b/cds-feature-console/src/test/java/com/sap/cds/feature/console/info/collectors/LogCollectorTest.java @@ -8,15 +8,19 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Stream; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.event.Level; class LogCollectorTest extends InfoCollectorTestBase { - private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(LogCollectorTest.class); + private static final Logger LOG = LoggerFactory.getLogger(LogCollectorTest.class); @AfterEach void resetLogEventsStore() { @@ -27,14 +31,21 @@ void resetLogEventsStore() { assertEquals(0, logs.size()); } - @Test - void testUpdateLogLevelsToInfo() { - updateLogLevel(Level.INFO, LOG.getName()); + static Stream logLevelFilterData() { + return Stream.of( + Arguments.of(Level.INFO, 1, 0), + Arguments.of(Level.ERROR, 0, 0)); + } + + @ParameterizedTest(name = "level={0}") + @MethodSource("logLevelFilterData") + void testUpdateLogLevelsFiltersLowerPriority(Level level, int expectedInfo, int expectedDebug) { + updateLogLevel(level, LOG.getName()); LOG.info("info log"); LOG.debug("debug log"); - assertEquals(1, remoteMonitoringTestHandler.getLogEventsByLevel(Level.INFO).size()); - assertEquals(0, remoteMonitoringTestHandler.getLogEventsByLevel(Level.DEBUG).size()); + assertEquals(expectedInfo, remoteMonitoringTestHandler.getLogEventsByLevel(Level.INFO).size()); + assertEquals(expectedDebug, remoteMonitoringTestHandler.getLogEventsByLevel(Level.DEBUG).size()); } @Test @@ -67,78 +78,47 @@ void testUpdateRootLogLevelsToWarnAndRestore() { assertEquals(0, remoteMonitoringTestHandler.getLogEventsByLevel(Level.DEBUG).size()); } - @Test - void testUpdateLogLevelsToError() { - updateLogLevel(Level.ERROR, LOG.getName()); - - LOG.info("info log"); - LOG.debug("debug log"); - System.out.println("out"); - - assertEquals(0, remoteMonitoringTestHandler.getLogEventsByLevel(Level.INFO).size()); - assertEquals(0, remoteMonitoringTestHandler.getLogEventsByLevel(Level.DEBUG).size()); - } - - @Test - void testUpdateLogLevelsForCdsLoggerGroup() { - Map logger = new HashMap<>(); - logger.put("logger", "handlers"); // "handlers" is a CdsLoggerGroup value - logger.put("level", Level.WARN.toString()); - logger.put("group", Boolean.TRUE); - - List> loggers = new ArrayList<>(); - loggers.add(logger); - Map data = new HashMap<>(); - data.put("loggers", loggers); - - CommandEvent update = new CommandEvent(LogCollector.COMMAND_UPDATE, data); - remoteMonitoringService.emit(update); - - Logger handlersLogger = LoggerFactory.getLogger("com.sap.cds.services.impl.ServiceImpl"); - handlersLogger.warn("warn log for CdsLoggerGroup"); - handlersLogger.info("info log for CdsLoggerGroup"); - - assertEquals(1, remoteMonitoringTestHandler.getLogEventsByLevel(Level.WARN).size()); - assertEquals(0, remoteMonitoringTestHandler.getLogEventsByLevel(Level.INFO).size()); - - // reset - remoteMonitoringTestHandler.resetInfoEventsStore(); - logger.put("level", null); - remoteMonitoringService.emit(update); - handlersLogger.info("debug log for CdsLoggerGroup"); - - assertEquals(1, remoteMonitoringTestHandler.getLogEventsByLevel(Level.INFO).size()); + static Stream loggerGroupData() { + return Stream.of( + Arguments.of("handlers", true, Level.WARN, + "com.sap.cds.services.impl.ServiceImpl", Level.INFO, 1), + Arguments.of("com.sap.cds.feature.console", false, Level.DEBUG, + null, Level.DEBUG, 0)); } - @Test - void testUpdateLogLevelsForGroup() { - Map logger = new HashMap<>(); - logger.put("logger", "com.sap.cds.feature.console"); - logger.put("level", Level.DEBUG.toString()); - logger.put("group", Boolean.FALSE); + @ParameterizedTest(name = "logger={0}, group={1}") + @MethodSource("loggerGroupData") + void testUpdateLogLevelsForLoggerGroup( + String loggerName, boolean isGroup, Level level, + String testLoggerName, Level resetLevel, int resetExpected) { + Map loggerConfig = new HashMap<>(); + loggerConfig.put("logger", loggerName); + loggerConfig.put("level", level.toString()); + loggerConfig.put("group", isGroup); List> loggers = new ArrayList<>(); - loggers.add(logger); + loggers.add(loggerConfig); Map data = new HashMap<>(); data.put("loggers", loggers); CommandEvent update = new CommandEvent(LogCollector.COMMAND_UPDATE, data); remoteMonitoringService.emit(update); - LOG.debug("debug log for group"); - - assertEquals(1, remoteMonitoringTestHandler.getLogEventsByMessage("debug log for group").size()); + Logger testLogger = testLoggerName != null + ? LoggerFactory.getLogger(testLoggerName) : LOG; + logAtLevel(testLogger, level, "test log for group"); + assertEquals(1, + remoteMonitoringTestHandler.getLogEventsByMessage("test log for group").size()); - // reset remoteMonitoringTestHandler.resetInfoEventsStore(); - logger.put("level", null); + loggerConfig.put("level", null); remoteMonitoringService.emit(update); - LOG.debug("debug log for group"); + logAtLevel(testLogger, resetLevel, "test log after reset"); - assertEquals(0, remoteMonitoringTestHandler.getLogEventsByLevel(Level.DEBUG).size()); + assertEquals(resetExpected, + remoteMonitoringTestHandler.getLogEventsByMessage("test log after reset").size()); } - @SuppressWarnings("unchecked") private void updateLogLevel(Level level, String loggerName) { Map logger = new HashMap<>(); logger.put("logger", loggerName); @@ -155,4 +135,13 @@ private void updateLogLevel(Level level, String loggerName) { remoteMonitoringService.emit(update); } + private static void logAtLevel(Logger logger, Level level, String message) { + switch (level) { + case DEBUG -> logger.debug(message); + case INFO -> logger.info(message); + case WARN -> logger.warn(message); + case ERROR -> logger.error(message); + default -> logger.trace(message); + } + } } diff --git a/cds-feature-console/src/test/java/com/sap/cds/feature/console/info/collectors/OutboxInfoCollectorTest.java b/cds-feature-console/src/test/java/com/sap/cds/feature/console/info/collectors/OutboxInfoCollectorTest.java new file mode 100644 index 0000000..7fd0d22 --- /dev/null +++ b/cds-feature-console/src/test/java/com/sap/cds/feature/console/info/collectors/OutboxInfoCollectorTest.java @@ -0,0 +1,271 @@ +package com.sap.cds.feature.console.info.collectors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.sap.cds.Row; +import com.sap.cds.feature.console.info.Path; +import com.sap.cds.feature.console.service.CommandEventContext; +import com.sap.cds.feature.console.service.InfoEvent; +import com.sap.cds.feature.console.service.RemoteMonitoringService; +import com.sap.cds.services.EventContext; +import com.sap.cds.services.ServiceCatalog; +import com.sap.cds.services.cds.CqnService; +import com.sap.cds.services.changeset.ChangeSetContext; +import com.sap.cds.services.changeset.ChangeSetListener; +import com.sap.cds.services.environment.CdsEnvironment; +import com.sap.cds.services.environment.CdsProperties; +import com.sap.cds.services.environment.CdsProperties.Outbox; +import com.sap.cds.services.environment.CdsProperties.Outbox.OutboxServiceConfig; +import com.sap.cds.services.impl.outbox.Messages; +import com.sap.cds.services.impl.outbox.persistence.PersistentOutbox; +import com.sap.cds.services.mt.TenantProviderService; +import com.sap.cds.services.persistence.PersistenceService; +import com.sap.cds.services.request.RequestContext; +import com.sap.cds.services.request.UserInfo; +import com.sap.cds.services.runtime.CdsRuntime; +import com.sap.cds.services.runtime.ChangeSetContextRunner; +import com.sap.cds.services.runtime.RequestContextRunner; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import java.util.stream.Stream; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; + +class OutboxInfoCollectorTest { + + private RemoteMonitoringService service; + private CdsRuntime runtime; + private ServiceCatalog catalog; + private PersistenceService persistenceService; + private RequestContextRunner requestContextRunner; + private ChangeSetContextRunner changeSetContextRunner; + + @BeforeEach + @SuppressWarnings("unchecked") + void setUp() { + service = mock(RemoteMonitoringService.class); + runtime = mock(CdsRuntime.class); + catalog = mock(ServiceCatalog.class); + persistenceService = mock(PersistenceService.class); + requestContextRunner = mock(RequestContextRunner.class); + changeSetContextRunner = mock(ChangeSetContextRunner.class); + + when(runtime.getServiceCatalog()).thenReturn(catalog); + when(catalog.getService(PersistenceService.class, PersistenceService.DEFAULT_NAME)).thenReturn(persistenceService); + + when(runtime.requestContext()).thenReturn(requestContextRunner); + when(requestContextRunner.systemUser()).thenReturn(requestContextRunner); + when(requestContextRunner.systemUser(any())).thenReturn(requestContextRunner); + doAnswer(inv -> { + inv.getArgument(0, Consumer.class).accept(mock(RequestContext.class)); + return null; + }).when(requestContextRunner).run(any(Consumer.class)); + + when(runtime.changeSetContext()).thenReturn(changeSetContextRunner); + doAnswer(inv -> { + inv.getArgument(0, Consumer.class).accept(mock(ChangeSetContext.class)); + return null; + }).when(changeSetContextRunner).run(any(Consumer.class)); + } + + private OutboxInfoCollector createCollector(boolean persistentOutboxEnabled) { + if (persistentOutboxEnabled) { + when(catalog.getServices(PersistentOutbox.class)).thenAnswer(inv -> Stream.of(mock(PersistentOutbox.class))); + } else { + when(catalog.getServices(PersistentOutbox.class)).thenAnswer(inv -> Stream.empty()); + } + return new OutboxInfoCollector(runtime, service); + } + + private Method getPrivateMethod(String name, Class... paramTypes) throws Exception { + Method m = OutboxInfoCollector.class.getDeclaredMethod(name, paramTypes); + m.setAccessible(true); + return m; + } + + private void setLastSeenEntries(OutboxInfoCollector collector, Map> entries) throws Exception { + Field field = OutboxInfoCollector.class.getDeclaredField("lastSeenEntries"); + field.setAccessible(true); + field.set(collector, entries); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void constructorInitializesService(boolean persistentOutboxEnabled) { + OutboxInfoCollector collector = createCollector(persistentOutboxEnabled); + assertEquals(service, collector.getRemoteMonitoringService()); + } + + @Test + void capConsoleAttachedReturnsEarlyWhenOutboxDisabled() throws Exception { + OutboxInfoCollector collector = createCollector(false); + + CommandEventContext ctx = mock(CommandEventContext.class); + getPrivateMethod("capConsoleAttached", CommandEventContext.class).invoke(collector, ctx); + + verify(service, never()).emit(any(InfoEvent.class)); + } + + @Test + void capConsoleAttachedEmitsOutboxConfigsWhenEnabled() throws Exception { + OutboxInfoCollector collector = createCollector(true); + + when(catalog.getService(TenantProviderService.class, TenantProviderService.DEFAULT_NAME)).thenReturn(null); + + CommandEventContext ctx = mock(CommandEventContext.class); + ServiceCatalog ctxCatalog = mock(ServiceCatalog.class); + when(ctx.getServiceCatalog()).thenReturn(ctxCatalog); + + PersistentOutbox mockOutbox = mock(PersistentOutbox.class); + when(mockOutbox.getName()).thenReturn("test-outbox"); + when(ctxCatalog.getServices(PersistentOutbox.class)).thenAnswer(inv -> Stream.of(mockOutbox)); + + CdsRuntime ctxRuntime = mock(CdsRuntime.class); + when(ctx.getCdsRuntime()).thenReturn(ctxRuntime); + CdsEnvironment env = mock(CdsEnvironment.class); + when(ctxRuntime.getEnvironment()).thenReturn(env); + CdsProperties props = mock(CdsProperties.class); + when(env.getCdsProperties()).thenReturn(props); + Outbox outbox = mock(Outbox.class); + when(props.getOutbox()).thenReturn(outbox); + OutboxServiceConfig config = mock(OutboxServiceConfig.class); + when(outbox.getService("test-outbox")).thenReturn(config); + when(config.getMaxAttempts()).thenReturn(5); + when(config.isOrdered()).thenReturn(true); + when(config.isEnabled()).thenReturn(true); + + getPrivateMethod("capConsoleAttached", CommandEventContext.class).invoke(collector, ctx); + + verify(service, atLeastOnce()).emit(any(InfoEvent.class)); + } + + @ParameterizedTest + @ValueSource(strings = {"resetEntry", "removeEntry"}) + void entryCommandEmitsNotification(String methodName) throws Exception { + when(catalog.getService(TenantProviderService.class, TenantProviderService.DEFAULT_NAME)).thenReturn(null); + CommandEventContext ctx = mock(CommandEventContext.class); + Map data = new HashMap<>(); + data.put("id", "entry-1"); + data.put("target", "my-target"); + when(ctx.getData()).thenReturn(data); + + OutboxInfoCollector collector = createCollector(true); + getPrivateMethod(methodName, CommandEventContext.class).invoke(collector, ctx); + + verify(service, atLeastOnce()).emit(any(InfoEvent.class)); + } + + @ParameterizedTest + @ValueSource(strings = {"removeHistoryEntry", "replayEntry"}) + void commandWhenTenantNotInHistoryEmitsNotification(String methodName) throws Exception { + when(catalog.getService(TenantProviderService.class, TenantProviderService.DEFAULT_NAME)).thenReturn(null); + CommandEventContext ctx = mock(CommandEventContext.class); + Map data = new HashMap<>(); + data.put("id", "entry-1"); + when(ctx.getData()).thenReturn(data); + + OutboxInfoCollector collector = createCollector(true); + getPrivateMethod(methodName, CommandEventContext.class).invoke(collector, ctx); + + ArgumentCaptor captor = ArgumentCaptor.forClass(InfoEvent.class); + verify(service).emit(captor.capture()); + assertEquals(Path.CONSOLE_NOTIFICATION, captor.getValue().getPath()); + } + + @Test + void removeHistoryEntryRemovesMatchingEntry() throws Exception { + when(catalog.getService(TenantProviderService.class, TenantProviderService.DEFAULT_NAME)).thenReturn(null); + Row mockRow = mock(Row.class); + Messages mockMsg = mock(Messages.class); + when(mockRow.as(Messages.class)).thenReturn(mockMsg); + when(mockMsg.getId()).thenReturn("entry-1"); + CommandEventContext ctx = mock(CommandEventContext.class); + Map data = new HashMap<>(); + data.put("id", "entry-1"); + when(ctx.getData()).thenReturn(data); + + OutboxInfoCollector collector = createCollector(true); + Map> history = new HashMap<>(); + List entries = new ArrayList<>(); + entries.add(mockRow); + history.put(null, entries); + setLastSeenEntries(collector, history); + getPrivateMethod("removeHistoryEntry", CommandEventContext.class).invoke(collector, ctx); + + assertTrue(entries.isEmpty(), "History entry should have been removed"); + verify(service, atLeastOnce()).emit(any(InfoEvent.class)); + } + + @Test + void outboxEventsSkipsReadEvent() throws Exception { + OutboxInfoCollector collector = createCollector(true); + + EventContext ctx = mock(EventContext.class); + when(ctx.getEvent()).thenReturn(CqnService.EVENT_READ); + + getPrivateMethod("outboxEvents", EventContext.class).invoke(collector, ctx); + + verify(service, never()).emit(any(InfoEvent.class)); + } + + @Test + void outboxEventsRegistersListenerForNonReadEvent() throws Exception { + when(catalog.getService(TenantProviderService.class, TenantProviderService.DEFAULT_NAME)).thenReturn(null); + EventContext ctx = mock(EventContext.class); + when(ctx.getEvent()).thenReturn(CqnService.EVENT_CREATE); + UserInfo userInfo = mock(UserInfo.class); + when(ctx.getUserInfo()).thenReturn(userInfo); + when(userInfo.getTenant()).thenReturn("t1"); + ChangeSetContext csCtx = mock(ChangeSetContext.class); + when(ctx.getChangeSetContext()).thenReturn(csCtx); + + OutboxInfoCollector collector = createCollector(true); + getPrivateMethod("outboxEvents", EventContext.class).invoke(collector, ctx); + + ArgumentCaptor captor = ArgumentCaptor.forClass(ChangeSetListener.class); + verify(csCtx).register(captor.capture()); + + captor.getValue().afterClose(true); + verify(service, atLeastOnce()).emit(any(InfoEvent.class)); + } + + @Test + void getTenantOutboxesWithDisabledOutbox() throws Exception { + OutboxInfoCollector collector = createCollector(false); + + InfoEvent event = (InfoEvent) getPrivateMethod("getTenantOutboxes", String.class).invoke(collector, "t1"); + + assertEquals(Path.OUTBOX_TENANTS + ".t1", event.getPath()); + assertFalse(event.getData().containsKey("entries")); + } + + @Test + void getTenantOutboxesWithEnabledOutboxIncludesHistory() throws Exception { + com.sap.cds.Result mockResult = mock(com.sap.cds.Result.class); + when(mockResult.list()).thenReturn(List.of()); + when(persistenceService.run(any(com.sap.cds.ql.cqn.CqnSelect.class))).thenReturn(mockResult); + + OutboxInfoCollector collector = createCollector(true); + InfoEvent event = (InfoEvent) getPrivateMethod("getTenantOutboxes", String.class).invoke(collector, "t1"); + + assertEquals(Path.OUTBOX_TENANTS + ".t1", event.getPath()); + assertTrue(event.getData().containsKey("history")); + } +} diff --git a/cds-feature-console/src/test/java/com/sap/cds/feature/console/service/OutboxConfigTest.java b/cds-feature-console/src/test/java/com/sap/cds/feature/console/service/OutboxConfigTest.java new file mode 100644 index 0000000..4a310f1 --- /dev/null +++ b/cds-feature-console/src/test/java/com/sap/cds/feature/console/service/OutboxConfigTest.java @@ -0,0 +1,41 @@ +package com.sap.cds.feature.console.service; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.sap.cds.services.environment.CdsProperties.Outbox.OutboxServiceConfig; +import java.util.Map; +import java.util.stream.Stream; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +class OutboxConfigTest { + + static Stream outboxConfigs() { + return Stream.of( + Arguments.of("my-outbox", 5, true, true), + Arguments.of("disabled-outbox", 3, false, false), + Arguments.of("test", 1, false, true)); + } + + @ParameterizedTest + @MethodSource("outboxConfigs") + void fromServiceConfigMapsAllProperties( + String name, int maxAttempts, boolean ordered, boolean enabled) { + OutboxServiceConfig config = mock(OutboxServiceConfig.class); + when(config.getMaxAttempts()).thenReturn(maxAttempts); + when(config.isOrdered()).thenReturn(ordered); + when(config.isEnabled()).thenReturn(enabled); + + OutboxConfig outboxConfig = OutboxConfig.fromServiceConfig(config, name); + + Map map = outboxConfig.toMap(); + assertEquals(4, map.size()); + assertEquals(name, map.get("name")); + assertEquals(maxAttempts, map.get("maxAttempts")); + assertEquals(ordered, map.get("ordered")); + assertEquals(enabled, map.get("enabled")); + } +}