From 28d40a5b741c0bb497c8c711e596ab6a5792006f Mon Sep 17 00:00:00 2001 From: Amer Alsayed <99971255+Amer-alsayed@users.noreply.github.com> Date: Fri, 13 Mar 2026 16:29:12 +0200 Subject: [PATCH 1/2] Fix desktop WS URL bootstrap and skip legacy provider events --- apps/desktop/src/main.ts | 6 + apps/desktop/src/preload.ts | 11 +- .../Layers/OrchestrationEventStore.ts | 541 +++++++++--------- .../Layers/ProviderSessionDirectory.ts | 12 + 4 files changed, 302 insertions(+), 268 deletions(-) diff --git a/apps/desktop/src/main.ts b/apps/desktop/src/main.ts index 460684929..4d41cf892 100644 --- a/apps/desktop/src/main.ts +++ b/apps/desktop/src/main.ts @@ -47,6 +47,7 @@ import { isArm64HostRunningIntelBuild, resolveDesktopRuntimeInfo } from "./runti fixPath(); const PICK_FOLDER_CHANNEL = "desktop:pick-folder"; +const GET_WS_URL_CHANNEL = "desktop:get-ws-url"; const CONFIRM_CHANNEL = "desktop:confirm"; const SET_THEME_CHANNEL = "desktop:set-theme"; const CONTEXT_MENU_CHANNEL = "desktop:context-menu"; @@ -1086,6 +1087,11 @@ function registerIpcHandlers(): void { return result.filePaths[0] ?? null; }); + ipcMain.removeAllListeners(GET_WS_URL_CHANNEL); + ipcMain.on(GET_WS_URL_CHANNEL, (event) => { + event.returnValue = backendWsUrl || null; + }); + ipcMain.removeHandler(CONFIRM_CHANNEL); ipcMain.handle(CONFIRM_CHANNEL, async (_event, message: unknown) => { if (typeof message !== "string") { diff --git a/apps/desktop/src/preload.ts b/apps/desktop/src/preload.ts index 1e1bb3bd8..4759ad51f 100644 --- a/apps/desktop/src/preload.ts +++ b/apps/desktop/src/preload.ts @@ -2,6 +2,7 @@ import { contextBridge, ipcRenderer } from "electron"; import type { DesktopBridge } from "@t3tools/contracts"; const PICK_FOLDER_CHANNEL = "desktop:pick-folder"; +const GET_WS_URL_CHANNEL = "desktop:get-ws-url"; const CONFIRM_CHANNEL = "desktop:confirm"; const SET_THEME_CHANNEL = "desktop:set-theme"; const CONTEXT_MENU_CHANNEL = "desktop:context-menu"; @@ -11,7 +12,15 @@ const UPDATE_STATE_CHANNEL = "desktop:update-state"; const UPDATE_GET_STATE_CHANNEL = "desktop:update-get-state"; const UPDATE_DOWNLOAD_CHANNEL = "desktop:update-download"; const UPDATE_INSTALL_CHANNEL = "desktop:update-install"; -const wsUrl = process.env.T3CODE_DESKTOP_WS_URL ?? null; +let wsUrl: string | null = null; +try { + const value = ipcRenderer.sendSync(GET_WS_URL_CHANNEL); + if (typeof value === "string" && value.length > 0) { + wsUrl = value; + } +} catch { + wsUrl = null; +} contextBridge.exposeInMainWorld("desktopBridge", { getWsUrl: () => wsUrl, diff --git a/apps/server/src/persistence/Layers/OrchestrationEventStore.ts b/apps/server/src/persistence/Layers/OrchestrationEventStore.ts index 4d81cf5e8..6e65d2316 100644 --- a/apps/server/src/persistence/Layers/OrchestrationEventStore.ts +++ b/apps/server/src/persistence/Layers/OrchestrationEventStore.ts @@ -1,267 +1,274 @@ -import { - CommandId, - EventId, - IsoDateTime, - NonNegativeInt, - OrchestrationActorKind, - OrchestrationAggregateKind, - OrchestrationEvent, - OrchestrationEventMetadata, - OrchestrationEventType, - ProjectId, - ThreadId, -} from "@t3tools/contracts"; -import * as SqlClient from "effect/unstable/sql/SqlClient"; -import * as SqlSchema from "effect/unstable/sql/SqlSchema"; -import { Effect, Layer, Schema, Stream } from "effect"; - -import { - toPersistenceDecodeError, - toPersistenceSqlError, - type OrchestrationEventStoreError, -} from "../Errors.ts"; -import { - OrchestrationEventStore, - type OrchestrationEventStoreShape, -} from "../Services/OrchestrationEventStore.ts"; - -const decodeEvent = Schema.decodeUnknownEffect(OrchestrationEvent); -const UnknownFromJsonString = Schema.fromJsonString(Schema.Unknown); -const EventMetadataFromJsonString = Schema.fromJsonString(OrchestrationEventMetadata); - -const AppendEventRequestSchema = Schema.Struct({ - eventId: EventId, - aggregateKind: OrchestrationAggregateKind, - streamId: Schema.Union([ProjectId, ThreadId]), - type: OrchestrationEventType, - causationEventId: Schema.NullOr(EventId), - correlationId: Schema.NullOr(CommandId), - actorKind: OrchestrationActorKind, - occurredAt: IsoDateTime, - commandId: Schema.NullOr(CommandId), - payloadJson: UnknownFromJsonString, - metadataJson: EventMetadataFromJsonString, -}); - -const OrchestrationEventPersistedRowSchema = Schema.Struct({ - sequence: NonNegativeInt, - eventId: EventId, - type: OrchestrationEventType, - aggregateKind: OrchestrationAggregateKind, - aggregateId: Schema.Union([ProjectId, ThreadId]), - occurredAt: IsoDateTime, - commandId: Schema.NullOr(CommandId), - causationEventId: Schema.NullOr(EventId), - correlationId: Schema.NullOr(CommandId), - payload: UnknownFromJsonString, - metadata: EventMetadataFromJsonString, -}); - -const ReadFromSequenceRequestSchema = Schema.Struct({ - sequenceExclusive: NonNegativeInt, - limit: Schema.Number, -}); -const DEFAULT_READ_FROM_SEQUENCE_LIMIT = 1_000; -const READ_PAGE_SIZE = 500; - -function inferActorKind( - event: Omit, -): Schema.Schema.Type { - if (event.commandId !== null && event.commandId.startsWith("provider:")) { - return "provider"; - } - if (event.commandId !== null && event.commandId.startsWith("server:")) { - return "server"; - } - if ( - event.metadata.providerTurnId !== undefined || - event.metadata.providerItemId !== undefined || - event.metadata.adapterKey !== undefined - ) { - return "provider"; - } - if (event.commandId === null) { - return "server"; - } - return "client"; -} - -function toPersistenceSqlOrDecodeError(sqlOperation: string, decodeOperation: string) { - return (cause: unknown): OrchestrationEventStoreError => - Schema.isSchemaError(cause) - ? toPersistenceDecodeError(decodeOperation)(cause) - : toPersistenceSqlError(sqlOperation)(cause); -} - -const makeEventStore = Effect.gen(function* () { - const sql = yield* SqlClient.SqlClient; - - const appendEventRow = SqlSchema.findOne({ - Request: AppendEventRequestSchema, - Result: OrchestrationEventPersistedRowSchema, - execute: (request) => - sql` - INSERT INTO orchestration_events ( - event_id, - aggregate_kind, - stream_id, - stream_version, - event_type, - occurred_at, - command_id, - causation_event_id, - correlation_id, - actor_kind, - payload_json, - metadata_json - ) - VALUES ( - ${request.eventId}, - ${request.aggregateKind}, - ${request.streamId}, - COALESCE( - ( - SELECT stream_version + 1 - FROM orchestration_events - WHERE aggregate_kind = ${request.aggregateKind} - AND stream_id = ${request.streamId} - ORDER BY stream_version DESC - LIMIT 1 - ), - 0 - ), - ${request.type}, - ${request.occurredAt}, - ${request.commandId}, - ${request.causationEventId}, - ${request.correlationId}, - ${request.actorKind}, - ${request.payloadJson}, - ${request.metadataJson} - ) - RETURNING - sequence, - event_id AS "eventId", - event_type AS "type", - aggregate_kind AS "aggregateKind", - stream_id AS "aggregateId", - occurred_at AS "occurredAt", - command_id AS "commandId", - causation_event_id AS "causationEventId", - correlation_id AS "correlationId", - payload_json AS "payload", - metadata_json AS "metadata" - `, - }); - - const readEventRowsFromSequence = SqlSchema.findAll({ - Request: ReadFromSequenceRequestSchema, - Result: OrchestrationEventPersistedRowSchema, - execute: (request) => - sql` - SELECT - sequence, - event_id AS "eventId", - event_type AS "type", - aggregate_kind AS "aggregateKind", - stream_id AS "aggregateId", - occurred_at AS "occurredAt", - command_id AS "commandId", - causation_event_id AS "causationEventId", - correlation_id AS "correlationId", - payload_json AS "payload", - metadata_json AS "metadata" - FROM orchestration_events - WHERE sequence > ${request.sequenceExclusive} - ORDER BY sequence ASC - LIMIT ${request.limit} - `, - }); - - const append: OrchestrationEventStoreShape["append"] = (event) => - appendEventRow({ - eventId: event.eventId, - aggregateKind: event.aggregateKind, - streamId: event.aggregateId, - type: event.type, - causationEventId: event.causationEventId, - correlationId: event.correlationId, - actorKind: inferActorKind(event), - occurredAt: event.occurredAt, - commandId: event.commandId, - payloadJson: event.payload, - metadataJson: event.metadata, - }).pipe( - Effect.mapError( - toPersistenceSqlOrDecodeError( - "OrchestrationEventStore.append:insert", - "OrchestrationEventStore.append:decodeRow", - ), - ), - Effect.flatMap((row) => - decodeEvent(row).pipe( - Effect.mapError(toPersistenceDecodeError("OrchestrationEventStore.append:rowToEvent")), - ), - ), - ); - - const readFromSequence: OrchestrationEventStoreShape["readFromSequence"] = ( - sequenceExclusive, - limit = DEFAULT_READ_FROM_SEQUENCE_LIMIT, - ) => { - const normalizedLimit = Math.max(0, Math.floor(limit)); - if (normalizedLimit === 0) { - return Stream.empty; - } - const readPage = ( - cursor: number, - remaining: number, - ): Stream.Stream => - Stream.fromEffect( - readEventRowsFromSequence({ - sequenceExclusive: cursor, - limit: Math.min(remaining, READ_PAGE_SIZE), - }).pipe( - Effect.mapError( - toPersistenceSqlOrDecodeError( - "OrchestrationEventStore.readFromSequence:query", - "OrchestrationEventStore.readFromSequence:decodeRows", - ), - ), - Effect.flatMap((rows) => - Effect.forEach(rows, (row) => - decodeEvent(row).pipe( - Effect.mapError( - toPersistenceDecodeError("OrchestrationEventStore.readFromSequence:rowToEvent"), - ), - ), - ), - ), - ), - ).pipe( - Stream.flatMap((events) => { - if (events.length === 0) { - return Stream.empty; - } - const nextRemaining = remaining - events.length; - if (nextRemaining <= 0) { - return Stream.fromIterable(events); - } - return Stream.concat( - Stream.fromIterable(events), - readPage(events[events.length - 1]!.sequence, nextRemaining), - ); - }), - ); - - return readPage(sequenceExclusive, normalizedLimit); - }; - - return { - append, - readFromSequence, - readAll: () => readFromSequence(0, Number.MAX_SAFE_INTEGER), - } satisfies OrchestrationEventStoreShape; -}); - -export const OrchestrationEventStoreLive = Layer.effect(OrchestrationEventStore, makeEventStore); +import { + CommandId, + EventId, + IsoDateTime, + NonNegativeInt, + OrchestrationActorKind, + OrchestrationAggregateKind, + OrchestrationEvent, + OrchestrationEventMetadata, + OrchestrationEventType, + ProjectId, + ThreadId, +} from "@t3tools/contracts"; +import * as SqlClient from "effect/unstable/sql/SqlClient"; +import * as SqlSchema from "effect/unstable/sql/SqlSchema"; +import { Effect, Layer, Option, Schema, Stream } from "effect"; + +import { + toPersistenceDecodeError, + toPersistenceSqlError, + type OrchestrationEventStoreError, +} from "../Errors.ts"; +import { + OrchestrationEventStore, + type OrchestrationEventStoreShape, +} from "../Services/OrchestrationEventStore.ts"; + +const decodeEvent = Schema.decodeUnknownEffect(OrchestrationEvent); +const UnknownFromJsonString = Schema.fromJsonString(Schema.Unknown); +const EventMetadataFromJsonString = Schema.fromJsonString(OrchestrationEventMetadata); + +const AppendEventRequestSchema = Schema.Struct({ + eventId: EventId, + aggregateKind: OrchestrationAggregateKind, + streamId: Schema.Union([ProjectId, ThreadId]), + type: OrchestrationEventType, + causationEventId: Schema.NullOr(EventId), + correlationId: Schema.NullOr(CommandId), + actorKind: OrchestrationActorKind, + occurredAt: IsoDateTime, + commandId: Schema.NullOr(CommandId), + payloadJson: UnknownFromJsonString, + metadataJson: EventMetadataFromJsonString, +}); + +const OrchestrationEventPersistedRowSchema = Schema.Struct({ + sequence: NonNegativeInt, + eventId: EventId, + type: OrchestrationEventType, + aggregateKind: OrchestrationAggregateKind, + aggregateId: Schema.Union([ProjectId, ThreadId]), + occurredAt: IsoDateTime, + commandId: Schema.NullOr(CommandId), + causationEventId: Schema.NullOr(EventId), + correlationId: Schema.NullOr(CommandId), + payload: UnknownFromJsonString, + metadata: EventMetadataFromJsonString, +}); + +const ReadFromSequenceRequestSchema = Schema.Struct({ + sequenceExclusive: NonNegativeInt, + limit: Schema.Number, +}); +const DEFAULT_READ_FROM_SEQUENCE_LIMIT = 1_000; +const READ_PAGE_SIZE = 500; + +function inferActorKind( + event: Omit, +): Schema.Schema.Type { + if (event.commandId !== null && event.commandId.startsWith("provider:")) { + return "provider"; + } + if (event.commandId !== null && event.commandId.startsWith("server:")) { + return "server"; + } + if ( + event.metadata.providerTurnId !== undefined || + event.metadata.providerItemId !== undefined || + event.metadata.adapterKey !== undefined + ) { + return "provider"; + } + if (event.commandId === null) { + return "server"; + } + return "client"; +} + +function toPersistenceSqlOrDecodeError(sqlOperation: string, decodeOperation: string) { + return (cause: unknown): OrchestrationEventStoreError => + Schema.isSchemaError(cause) + ? toPersistenceDecodeError(decodeOperation)(cause) + : toPersistenceSqlError(sqlOperation)(cause); +} + +const makeEventStore = Effect.gen(function* () { + const sql = yield* SqlClient.SqlClient; + + const appendEventRow = SqlSchema.findOne({ + Request: AppendEventRequestSchema, + Result: OrchestrationEventPersistedRowSchema, + execute: (request) => + sql` + INSERT INTO orchestration_events ( + event_id, + aggregate_kind, + stream_id, + stream_version, + event_type, + occurred_at, + command_id, + causation_event_id, + correlation_id, + actor_kind, + payload_json, + metadata_json + ) + VALUES ( + ${request.eventId}, + ${request.aggregateKind}, + ${request.streamId}, + COALESCE( + ( + SELECT stream_version + 1 + FROM orchestration_events + WHERE aggregate_kind = ${request.aggregateKind} + AND stream_id = ${request.streamId} + ORDER BY stream_version DESC + LIMIT 1 + ), + 0 + ), + ${request.type}, + ${request.occurredAt}, + ${request.commandId}, + ${request.causationEventId}, + ${request.correlationId}, + ${request.actorKind}, + ${request.payloadJson}, + ${request.metadataJson} + ) + RETURNING + sequence, + event_id AS "eventId", + event_type AS "type", + aggregate_kind AS "aggregateKind", + stream_id AS "aggregateId", + occurred_at AS "occurredAt", + command_id AS "commandId", + causation_event_id AS "causationEventId", + correlation_id AS "correlationId", + payload_json AS "payload", + metadata_json AS "metadata" + `, + }); + + const readEventRowsFromSequence = SqlSchema.findAll({ + Request: ReadFromSequenceRequestSchema, + Result: OrchestrationEventPersistedRowSchema, + execute: (request) => + sql` + SELECT + sequence, + event_id AS "eventId", + event_type AS "type", + aggregate_kind AS "aggregateKind", + stream_id AS "aggregateId", + occurred_at AS "occurredAt", + command_id AS "commandId", + causation_event_id AS "causationEventId", + correlation_id AS "correlationId", + payload_json AS "payload", + metadata_json AS "metadata" + FROM orchestration_events + WHERE sequence > ${request.sequenceExclusive} + ORDER BY sequence ASC + LIMIT ${request.limit} + `, + }); + + const append: OrchestrationEventStoreShape["append"] = (event) => + appendEventRow({ + eventId: event.eventId, + aggregateKind: event.aggregateKind, + streamId: event.aggregateId, + type: event.type, + causationEventId: event.causationEventId, + correlationId: event.correlationId, + actorKind: inferActorKind(event), + occurredAt: event.occurredAt, + commandId: event.commandId, + payloadJson: event.payload, + metadataJson: event.metadata, + }).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "OrchestrationEventStore.append:insert", + "OrchestrationEventStore.append:decodeRow", + ), + ), + Effect.flatMap((row) => + decodeEvent(row).pipe( + Effect.mapError(toPersistenceDecodeError("OrchestrationEventStore.append:rowToEvent")), + ), + ), + ); + + const readFromSequence: OrchestrationEventStoreShape["readFromSequence"] = ( + sequenceExclusive, + limit = DEFAULT_READ_FROM_SEQUENCE_LIMIT, + ) => { + const normalizedLimit = Math.max(0, Math.floor(limit)); + if (normalizedLimit === 0) { + return Stream.empty; + } + const readPage = ( + cursor: number, + remaining: number, + ): Stream.Stream => + Stream.fromEffect( + readEventRowsFromSequence({ + sequenceExclusive: cursor, + limit: Math.min(remaining, READ_PAGE_SIZE), + }).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "OrchestrationEventStore.readFromSequence:query", + "OrchestrationEventStore.readFromSequence:decodeRows", + ), + ), + Effect.flatMap((rows) => + Effect.forEach(rows, (row) => + decodeEvent(row).pipe( + Effect.map(Option.some), + Effect.catch((cause) => + Effect.logWarning( + `OrchestrationEventStore.readFromSequence: dropping event sequence=${row.sequence} due to decode error: ${String(cause)}` + ).pipe(Effect.as(Option.none())), + ), + ), + ).pipe( + Effect.map((events) => + events.flatMap((event) => (Option.isSome(event) ? [event.value] : [])), + ), + ), + ), + ), + ).pipe( + Stream.flatMap((events) => { + if (events.length === 0) { + return Stream.empty; + } + const nextRemaining = remaining - events.length; + if (nextRemaining <= 0) { + return Stream.fromIterable(events); + } + return Stream.concat( + Stream.fromIterable(events), + readPage(events[events.length - 1]!.sequence, nextRemaining), + ); + }), + ); + + return readPage(sequenceExclusive, normalizedLimit); + }; + + return { + append, + readFromSequence, + readAll: () => readFromSequence(0, Number.MAX_SAFE_INTEGER), + } satisfies OrchestrationEventStoreShape; +}); + +export const OrchestrationEventStoreLive = Layer.effect(OrchestrationEventStore, makeEventStore); diff --git a/apps/server/src/provider/Layers/ProviderSessionDirectory.ts b/apps/server/src/provider/Layers/ProviderSessionDirectory.ts index 38e097e1c..6d0eda835 100644 --- a/apps/server/src/provider/Layers/ProviderSessionDirectory.ts +++ b/apps/server/src/provider/Layers/ProviderSessionDirectory.ts @@ -72,6 +72,18 @@ const makeProviderSessionDirectory = Effect.gen(function* () { runtimePayload: value.runtimePayload, }), ), + Effect.catch(() => + // Unknown providers should not poison startup; drop the binding and move on. + repository + .deleteByThreadId({ threadId: value.threadId }) + .pipe( + Effect.mapError( + toPersistenceError("ProviderSessionDirectory.getBinding:cleanup"), + ), + Effect.ignore, + Effect.as(Option.none()), + ), + ), ), }), ), From 23a1270177099dc88c12a903c20c0015903e8cef Mon Sep 17 00:00:00 2001 From: Amer Alsayed <99971255+Amer-alsayed@users.noreply.github.com> Date: Fri, 13 Mar 2026 16:39:16 +0200 Subject: [PATCH 2/2] Fix pagination cursor when skipping invalid events --- .../Layers/OrchestrationEventStore.ts | 546 +++++++++--------- 1 file changed, 272 insertions(+), 274 deletions(-) diff --git a/apps/server/src/persistence/Layers/OrchestrationEventStore.ts b/apps/server/src/persistence/Layers/OrchestrationEventStore.ts index 6e65d2316..8122d8b32 100644 --- a/apps/server/src/persistence/Layers/OrchestrationEventStore.ts +++ b/apps/server/src/persistence/Layers/OrchestrationEventStore.ts @@ -1,274 +1,272 @@ -import { - CommandId, - EventId, - IsoDateTime, - NonNegativeInt, - OrchestrationActorKind, - OrchestrationAggregateKind, - OrchestrationEvent, - OrchestrationEventMetadata, - OrchestrationEventType, - ProjectId, - ThreadId, -} from "@t3tools/contracts"; -import * as SqlClient from "effect/unstable/sql/SqlClient"; -import * as SqlSchema from "effect/unstable/sql/SqlSchema"; -import { Effect, Layer, Option, Schema, Stream } from "effect"; - -import { - toPersistenceDecodeError, - toPersistenceSqlError, - type OrchestrationEventStoreError, -} from "../Errors.ts"; -import { - OrchestrationEventStore, - type OrchestrationEventStoreShape, -} from "../Services/OrchestrationEventStore.ts"; - -const decodeEvent = Schema.decodeUnknownEffect(OrchestrationEvent); -const UnknownFromJsonString = Schema.fromJsonString(Schema.Unknown); -const EventMetadataFromJsonString = Schema.fromJsonString(OrchestrationEventMetadata); - -const AppendEventRequestSchema = Schema.Struct({ - eventId: EventId, - aggregateKind: OrchestrationAggregateKind, - streamId: Schema.Union([ProjectId, ThreadId]), - type: OrchestrationEventType, - causationEventId: Schema.NullOr(EventId), - correlationId: Schema.NullOr(CommandId), - actorKind: OrchestrationActorKind, - occurredAt: IsoDateTime, - commandId: Schema.NullOr(CommandId), - payloadJson: UnknownFromJsonString, - metadataJson: EventMetadataFromJsonString, -}); - -const OrchestrationEventPersistedRowSchema = Schema.Struct({ - sequence: NonNegativeInt, - eventId: EventId, - type: OrchestrationEventType, - aggregateKind: OrchestrationAggregateKind, - aggregateId: Schema.Union([ProjectId, ThreadId]), - occurredAt: IsoDateTime, - commandId: Schema.NullOr(CommandId), - causationEventId: Schema.NullOr(EventId), - correlationId: Schema.NullOr(CommandId), - payload: UnknownFromJsonString, - metadata: EventMetadataFromJsonString, -}); - -const ReadFromSequenceRequestSchema = Schema.Struct({ - sequenceExclusive: NonNegativeInt, - limit: Schema.Number, -}); -const DEFAULT_READ_FROM_SEQUENCE_LIMIT = 1_000; -const READ_PAGE_SIZE = 500; - -function inferActorKind( - event: Omit, -): Schema.Schema.Type { - if (event.commandId !== null && event.commandId.startsWith("provider:")) { - return "provider"; - } - if (event.commandId !== null && event.commandId.startsWith("server:")) { - return "server"; - } - if ( - event.metadata.providerTurnId !== undefined || - event.metadata.providerItemId !== undefined || - event.metadata.adapterKey !== undefined - ) { - return "provider"; - } - if (event.commandId === null) { - return "server"; - } - return "client"; -} - -function toPersistenceSqlOrDecodeError(sqlOperation: string, decodeOperation: string) { - return (cause: unknown): OrchestrationEventStoreError => - Schema.isSchemaError(cause) - ? toPersistenceDecodeError(decodeOperation)(cause) - : toPersistenceSqlError(sqlOperation)(cause); -} - -const makeEventStore = Effect.gen(function* () { - const sql = yield* SqlClient.SqlClient; - - const appendEventRow = SqlSchema.findOne({ - Request: AppendEventRequestSchema, - Result: OrchestrationEventPersistedRowSchema, - execute: (request) => - sql` - INSERT INTO orchestration_events ( - event_id, - aggregate_kind, - stream_id, - stream_version, - event_type, - occurred_at, - command_id, - causation_event_id, - correlation_id, - actor_kind, - payload_json, - metadata_json - ) - VALUES ( - ${request.eventId}, - ${request.aggregateKind}, - ${request.streamId}, - COALESCE( - ( - SELECT stream_version + 1 - FROM orchestration_events - WHERE aggregate_kind = ${request.aggregateKind} - AND stream_id = ${request.streamId} - ORDER BY stream_version DESC - LIMIT 1 - ), - 0 - ), - ${request.type}, - ${request.occurredAt}, - ${request.commandId}, - ${request.causationEventId}, - ${request.correlationId}, - ${request.actorKind}, - ${request.payloadJson}, - ${request.metadataJson} - ) - RETURNING - sequence, - event_id AS "eventId", - event_type AS "type", - aggregate_kind AS "aggregateKind", - stream_id AS "aggregateId", - occurred_at AS "occurredAt", - command_id AS "commandId", - causation_event_id AS "causationEventId", - correlation_id AS "correlationId", - payload_json AS "payload", - metadata_json AS "metadata" - `, - }); - - const readEventRowsFromSequence = SqlSchema.findAll({ - Request: ReadFromSequenceRequestSchema, - Result: OrchestrationEventPersistedRowSchema, - execute: (request) => - sql` - SELECT - sequence, - event_id AS "eventId", - event_type AS "type", - aggregate_kind AS "aggregateKind", - stream_id AS "aggregateId", - occurred_at AS "occurredAt", - command_id AS "commandId", - causation_event_id AS "causationEventId", - correlation_id AS "correlationId", - payload_json AS "payload", - metadata_json AS "metadata" - FROM orchestration_events - WHERE sequence > ${request.sequenceExclusive} - ORDER BY sequence ASC - LIMIT ${request.limit} - `, - }); - - const append: OrchestrationEventStoreShape["append"] = (event) => - appendEventRow({ - eventId: event.eventId, - aggregateKind: event.aggregateKind, - streamId: event.aggregateId, - type: event.type, - causationEventId: event.causationEventId, - correlationId: event.correlationId, - actorKind: inferActorKind(event), - occurredAt: event.occurredAt, - commandId: event.commandId, - payloadJson: event.payload, - metadataJson: event.metadata, - }).pipe( - Effect.mapError( - toPersistenceSqlOrDecodeError( - "OrchestrationEventStore.append:insert", - "OrchestrationEventStore.append:decodeRow", - ), - ), - Effect.flatMap((row) => - decodeEvent(row).pipe( - Effect.mapError(toPersistenceDecodeError("OrchestrationEventStore.append:rowToEvent")), - ), - ), - ); - - const readFromSequence: OrchestrationEventStoreShape["readFromSequence"] = ( - sequenceExclusive, - limit = DEFAULT_READ_FROM_SEQUENCE_LIMIT, - ) => { - const normalizedLimit = Math.max(0, Math.floor(limit)); - if (normalizedLimit === 0) { - return Stream.empty; - } - const readPage = ( - cursor: number, - remaining: number, - ): Stream.Stream => - Stream.fromEffect( - readEventRowsFromSequence({ - sequenceExclusive: cursor, - limit: Math.min(remaining, READ_PAGE_SIZE), - }).pipe( - Effect.mapError( - toPersistenceSqlOrDecodeError( - "OrchestrationEventStore.readFromSequence:query", - "OrchestrationEventStore.readFromSequence:decodeRows", - ), - ), - Effect.flatMap((rows) => - Effect.forEach(rows, (row) => - decodeEvent(row).pipe( - Effect.map(Option.some), - Effect.catch((cause) => - Effect.logWarning( - `OrchestrationEventStore.readFromSequence: dropping event sequence=${row.sequence} due to decode error: ${String(cause)}` - ).pipe(Effect.as(Option.none())), - ), - ), - ).pipe( - Effect.map((events) => - events.flatMap((event) => (Option.isSome(event) ? [event.value] : [])), - ), - ), - ), - ), - ).pipe( - Stream.flatMap((events) => { - if (events.length === 0) { - return Stream.empty; - } - const nextRemaining = remaining - events.length; - if (nextRemaining <= 0) { - return Stream.fromIterable(events); - } - return Stream.concat( - Stream.fromIterable(events), - readPage(events[events.length - 1]!.sequence, nextRemaining), - ); - }), - ); - - return readPage(sequenceExclusive, normalizedLimit); - }; - - return { - append, - readFromSequence, - readAll: () => readFromSequence(0, Number.MAX_SAFE_INTEGER), - } satisfies OrchestrationEventStoreShape; -}); - -export const OrchestrationEventStoreLive = Layer.effect(OrchestrationEventStore, makeEventStore); +import { + CommandId, + EventId, + IsoDateTime, + NonNegativeInt, + OrchestrationActorKind, + OrchestrationAggregateKind, + OrchestrationEvent, + OrchestrationEventMetadata, + OrchestrationEventType, + ProjectId, + ThreadId, +} from "@t3tools/contracts"; +import * as SqlClient from "effect/unstable/sql/SqlClient"; +import * as SqlSchema from "effect/unstable/sql/SqlSchema"; +import { Effect, Layer, Option, Schema, Stream } from "effect"; + +import { + toPersistenceDecodeError, + toPersistenceSqlError, + type OrchestrationEventStoreError, +} from "../Errors.ts"; +import { + OrchestrationEventStore, + type OrchestrationEventStoreShape, +} from "../Services/OrchestrationEventStore.ts"; + +const decodeEvent = Schema.decodeUnknownEffect(OrchestrationEvent); +const UnknownFromJsonString = Schema.fromJsonString(Schema.Unknown); +const EventMetadataFromJsonString = Schema.fromJsonString(OrchestrationEventMetadata); + +const AppendEventRequestSchema = Schema.Struct({ + eventId: EventId, + aggregateKind: OrchestrationAggregateKind, + streamId: Schema.Union([ProjectId, ThreadId]), + type: OrchestrationEventType, + causationEventId: Schema.NullOr(EventId), + correlationId: Schema.NullOr(CommandId), + actorKind: OrchestrationActorKind, + occurredAt: IsoDateTime, + commandId: Schema.NullOr(CommandId), + payloadJson: UnknownFromJsonString, + metadataJson: EventMetadataFromJsonString, +}); + +const OrchestrationEventPersistedRowSchema = Schema.Struct({ + sequence: NonNegativeInt, + eventId: EventId, + type: OrchestrationEventType, + aggregateKind: OrchestrationAggregateKind, + aggregateId: Schema.Union([ProjectId, ThreadId]), + occurredAt: IsoDateTime, + commandId: Schema.NullOr(CommandId), + causationEventId: Schema.NullOr(EventId), + correlationId: Schema.NullOr(CommandId), + payload: UnknownFromJsonString, + metadata: EventMetadataFromJsonString, +}); + +const ReadFromSequenceRequestSchema = Schema.Struct({ + sequenceExclusive: NonNegativeInt, + limit: Schema.Number, +}); +const DEFAULT_READ_FROM_SEQUENCE_LIMIT = 1_000; +const READ_PAGE_SIZE = 500; + +function inferActorKind( + event: Omit, +): Schema.Schema.Type { + if (event.commandId !== null && event.commandId.startsWith("provider:")) { + return "provider"; + } + if (event.commandId !== null && event.commandId.startsWith("server:")) { + return "server"; + } + if ( + event.metadata.providerTurnId !== undefined || + event.metadata.providerItemId !== undefined || + event.metadata.adapterKey !== undefined + ) { + return "provider"; + } + if (event.commandId === null) { + return "server"; + } + return "client"; +} + +function toPersistenceSqlOrDecodeError(sqlOperation: string, decodeOperation: string) { + return (cause: unknown): OrchestrationEventStoreError => + Schema.isSchemaError(cause) + ? toPersistenceDecodeError(decodeOperation)(cause) + : toPersistenceSqlError(sqlOperation)(cause); +} + +const makeEventStore = Effect.gen(function* () { + const sql = yield* SqlClient.SqlClient; + + const appendEventRow = SqlSchema.findOne({ + Request: AppendEventRequestSchema, + Result: OrchestrationEventPersistedRowSchema, + execute: (request) => + sql` + INSERT INTO orchestration_events ( + event_id, + aggregate_kind, + stream_id, + stream_version, + event_type, + occurred_at, + command_id, + causation_event_id, + correlation_id, + actor_kind, + payload_json, + metadata_json + ) + VALUES ( + ${request.eventId}, + ${request.aggregateKind}, + ${request.streamId}, + COALESCE( + ( + SELECT stream_version + 1 + FROM orchestration_events + WHERE aggregate_kind = ${request.aggregateKind} + AND stream_id = ${request.streamId} + ORDER BY stream_version DESC + LIMIT 1 + ), + 0 + ), + ${request.type}, + ${request.occurredAt}, + ${request.commandId}, + ${request.causationEventId}, + ${request.correlationId}, + ${request.actorKind}, + ${request.payloadJson}, + ${request.metadataJson} + ) + RETURNING + sequence, + event_id AS "eventId", + event_type AS "type", + aggregate_kind AS "aggregateKind", + stream_id AS "aggregateId", + occurred_at AS "occurredAt", + command_id AS "commandId", + causation_event_id AS "causationEventId", + correlation_id AS "correlationId", + payload_json AS "payload", + metadata_json AS "metadata" + `, + }); + + const readEventRowsFromSequence = SqlSchema.findAll({ + Request: ReadFromSequenceRequestSchema, + Result: OrchestrationEventPersistedRowSchema, + execute: (request) => + sql` + SELECT + sequence, + event_id AS "eventId", + event_type AS "type", + aggregate_kind AS "aggregateKind", + stream_id AS "aggregateId", + occurred_at AS "occurredAt", + command_id AS "commandId", + causation_event_id AS "causationEventId", + correlation_id AS "correlationId", + payload_json AS "payload", + metadata_json AS "metadata" + FROM orchestration_events + WHERE sequence > ${request.sequenceExclusive} + ORDER BY sequence ASC + LIMIT ${request.limit} + `, + }); + + const append: OrchestrationEventStoreShape["append"] = (event) => + appendEventRow({ + eventId: event.eventId, + aggregateKind: event.aggregateKind, + streamId: event.aggregateId, + type: event.type, + causationEventId: event.causationEventId, + correlationId: event.correlationId, + actorKind: inferActorKind(event), + occurredAt: event.occurredAt, + commandId: event.commandId, + payloadJson: event.payload, + metadataJson: event.metadata, + }).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "OrchestrationEventStore.append:insert", + "OrchestrationEventStore.append:decodeRow", + ), + ), + Effect.flatMap((row) => + decodeEvent(row).pipe( + Effect.mapError(toPersistenceDecodeError("OrchestrationEventStore.append:rowToEvent")), + ), + ), + ); + + const readFromSequence: OrchestrationEventStoreShape["readFromSequence"] = ( + sequenceExclusive, + limit = DEFAULT_READ_FROM_SEQUENCE_LIMIT, + ) => { + const normalizedLimit = Math.max(0, Math.floor(limit)); + if (normalizedLimit === 0) { + return Stream.empty; + } + const readPage = ( + cursor: number, + remaining: number, + ): Stream.Stream => + Stream.fromEffect( + readEventRowsFromSequence({ + sequenceExclusive: cursor, + limit: Math.min(remaining, READ_PAGE_SIZE), + }).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "OrchestrationEventStore.readFromSequence:query", + "OrchestrationEventStore.readFromSequence:decodeRows", + ), + ), + Effect.flatMap((rows) => + Effect.forEach(rows, (row) => + decodeEvent(row).pipe( + Effect.map(Option.some), + Effect.catch((cause) => + Effect.logWarning( + `OrchestrationEventStore.readFromSequence: dropping event sequence=${row.sequence} due to decode error: ${String(cause)}`, + ).pipe(Effect.as(Option.none())), + ), + ), + ).pipe( + Effect.map((events) => ({ + events: events.flatMap((event) => (Option.isSome(event) ? [event.value] : [])), + lastSequence: rows.length > 0 ? rows[rows.length - 1]!.sequence : null, + })), + ), + ), + ), + ).pipe( + Stream.flatMap(({ events, lastSequence }) => { + if (lastSequence === null) { + return Stream.empty; + } + const nextRemaining = remaining - events.length; + if (nextRemaining <= 0) { + return Stream.fromIterable(events); + } + return Stream.concat(Stream.fromIterable(events), readPage(lastSequence, nextRemaining)); + }), + ); + + return readPage(sequenceExclusive, normalizedLimit); + }; + + return { + append, + readFromSequence, + readAll: () => readFromSequence(0, Number.MAX_SAFE_INTEGER), + } satisfies OrchestrationEventStoreShape; +}); + +export const OrchestrationEventStoreLive = Layer.effect(OrchestrationEventStore, makeEventStore);