diff --git a/e2e/scenarios/openai-auto-instrumentation-node-hook/__snapshots__/openai-v4.span-events.json b/e2e/scenarios/openai-auto-instrumentation-node-hook/__snapshots__/openai-v4.span-events.json index 5dfa62183..32954b365 100644 --- a/e2e/scenarios/openai-auto-instrumentation-node-hook/__snapshots__/openai-v4.span-events.json +++ b/e2e/scenarios/openai-auto-instrumentation-node-hook/__snapshots__/openai-v4.span-events.json @@ -291,16 +291,7 @@ "scenario": null }, "metric_keys": [ - "completion_accepted_prediction_tokens", - "completion_audio_tokens", - "completion_reasoning_tokens", - "completion_rejected_prediction_tokens", - "completion_tokens", - "prompt_audio_tokens", - "prompt_cached_tokens", - "prompt_tokens", - "time_to_first_token", - "tokens" + "time_to_first_token" ], "name": "Chat Completion", "root_span_id": "", @@ -646,7 +637,7 @@ "time_to_first_token", "tokens" ], - "name": "openai.responses.create", + "name": "openai.responses.parse", "root_span_id": "", "span_id": "", "span_parents": [ diff --git a/e2e/scenarios/openai-auto-instrumentation-node-hook/__snapshots__/openai-v5.span-events.json b/e2e/scenarios/openai-auto-instrumentation-node-hook/__snapshots__/openai-v5.span-events.json index 5dfa62183..32954b365 100644 --- a/e2e/scenarios/openai-auto-instrumentation-node-hook/__snapshots__/openai-v5.span-events.json +++ b/e2e/scenarios/openai-auto-instrumentation-node-hook/__snapshots__/openai-v5.span-events.json @@ -291,16 +291,7 @@ "scenario": null }, "metric_keys": [ - "completion_accepted_prediction_tokens", - "completion_audio_tokens", - "completion_reasoning_tokens", - "completion_rejected_prediction_tokens", - "completion_tokens", - "prompt_audio_tokens", - "prompt_cached_tokens", - "prompt_tokens", - "time_to_first_token", - "tokens" + "time_to_first_token" ], "name": "Chat Completion", "root_span_id": "", @@ -646,7 +637,7 @@ "time_to_first_token", "tokens" ], - "name": "openai.responses.create", + "name": "openai.responses.parse", "root_span_id": "", "span_id": "", "span_parents": [ diff --git a/e2e/scenarios/openai-auto-instrumentation-node-hook/__snapshots__/openai.span-events.json b/e2e/scenarios/openai-auto-instrumentation-node-hook/__snapshots__/openai.span-events.json index 5dfa62183..32954b365 100644 --- a/e2e/scenarios/openai-auto-instrumentation-node-hook/__snapshots__/openai.span-events.json +++ b/e2e/scenarios/openai-auto-instrumentation-node-hook/__snapshots__/openai.span-events.json @@ -291,16 +291,7 @@ "scenario": null }, "metric_keys": [ - "completion_accepted_prediction_tokens", - "completion_audio_tokens", - "completion_reasoning_tokens", - "completion_rejected_prediction_tokens", - "completion_tokens", - "prompt_audio_tokens", - "prompt_cached_tokens", - "prompt_tokens", - "time_to_first_token", - "tokens" + "time_to_first_token" ], "name": "Chat Completion", "root_span_id": "", @@ -646,7 +637,7 @@ "time_to_first_token", "tokens" ], - "name": "openai.responses.create", + "name": "openai.responses.parse", "root_span_id": "", "span_id": "", "span_parents": [ diff --git a/e2e/scenarios/openai-auto-instrumentation-node-hook/scenario.impl.mjs b/e2e/scenarios/openai-auto-instrumentation-node-hook/scenario.impl.mjs index e0e6ad75c..77574f16d 100644 --- a/e2e/scenarios/openai-auto-instrumentation-node-hook/scenario.impl.mjs +++ b/e2e/scenarios/openai-auto-instrumentation-node-hook/scenario.impl.mjs @@ -18,9 +18,6 @@ export async function runOpenAIAutoInstrumentationNodeHook( projectNameBase: "e2e-openai-auto-instrumentation-hook", rootName: "openai-auto-hook-root", scenarioName: "openai-auto-instrumentation-node-hook", - useChatParseHelper: false, - useResponsesParseHelper: false, - useSyncStreamHelper: false, }); } diff --git a/js/src/auto-instrumentations/configs/openai.test.ts b/js/src/auto-instrumentations/configs/openai.test.ts index 824f9608d..49a6f02e6 100644 --- a/js/src/auto-instrumentations/configs/openai.test.ts +++ b/js/src/auto-instrumentations/configs/openai.test.ts @@ -27,7 +27,7 @@ describe("OpenAI Instrumentation Configs", () => { functionQuery: expect.objectContaining({ className: "Completions", methodName: "create", - kind: "Async", + kind: "Sync", }), }), expect.objectContaining({ @@ -39,7 +39,7 @@ describe("OpenAI Instrumentation Configs", () => { functionQuery: expect.objectContaining({ className: "Completions", methodName: "create", - kind: "Async", + kind: "Sync", }), }), expect.objectContaining({ @@ -51,7 +51,7 @@ describe("OpenAI Instrumentation Configs", () => { functionQuery: expect.objectContaining({ className: "Completions", methodName: "create", - kind: "Async", + kind: "Sync", }), }), ]), @@ -67,7 +67,7 @@ describe("OpenAI Instrumentation Configs", () => { expect(config?.module.name).toBe("openai"); expect((config?.functionQuery as any).className).toBe("Embeddings"); expect((config?.functionQuery as any).methodName).toBe("create"); - expect((config?.functionQuery as any).kind).toBe("Async"); + expect((config?.functionQuery as any).kind).toBe("Sync"); }); it("should have moderations.create config", () => { @@ -79,7 +79,7 @@ describe("OpenAI Instrumentation Configs", () => { expect(config?.module.name).toBe("openai"); expect((config?.functionQuery as any).className).toBe("Moderations"); expect((config?.functionQuery as any).methodName).toBe("create"); - expect((config?.functionQuery as any).kind).toBe("Async"); + expect((config?.functionQuery as any).kind).toBe("Sync"); }); it("should have beta.chat.completions.parse config", () => { @@ -97,7 +97,7 @@ describe("OpenAI Instrumentation Configs", () => { functionQuery: expect.objectContaining({ className: "Completions", methodName: "parse", - kind: "Async", + kind: "Sync", }), }), expect.objectContaining({ @@ -109,7 +109,7 @@ describe("OpenAI Instrumentation Configs", () => { functionQuery: expect.objectContaining({ className: "Completions", methodName: "parse", - kind: "Async", + kind: "Sync", }), }), ]), @@ -189,7 +189,7 @@ describe("OpenAI Instrumentation Configs", () => { expect(config?.module.filePath).toBe("resources/responses/responses.mjs"); expect((config?.functionQuery as any).className).toBe("Responses"); expect((config?.functionQuery as any).methodName).toBe("create"); - expect((config?.functionQuery as any).kind).toBe("Async"); + expect((config?.functionQuery as any).kind).toBe("Sync"); }); it("should have responses.stream config with Sync kind and version >=4.87.0", () => { @@ -217,6 +217,6 @@ describe("OpenAI Instrumentation Configs", () => { expect(config?.module.filePath).toBe("resources/responses/responses.mjs"); expect((config?.functionQuery as any).className).toBe("Responses"); expect((config?.functionQuery as any).methodName).toBe("parse"); - expect((config?.functionQuery as any).kind).toBe("Async"); + expect((config?.functionQuery as any).kind).toBe("Sync"); }); }); diff --git a/js/src/auto-instrumentations/configs/openai.ts b/js/src/auto-instrumentations/configs/openai.ts index ca145c887..699098b3b 100644 --- a/js/src/auto-instrumentations/configs/openai.ts +++ b/js/src/auto-instrumentations/configs/openai.ts @@ -24,7 +24,7 @@ export const openaiConfigs: InstrumentationConfig[] = [ functionQuery: { className: "Completions", methodName: "create", - kind: "Async", + kind: "Sync", }, }, @@ -38,7 +38,7 @@ export const openaiConfigs: InstrumentationConfig[] = [ functionQuery: { className: "Completions", methodName: "create", - kind: "Async", + kind: "Sync", }, }, @@ -52,7 +52,7 @@ export const openaiConfigs: InstrumentationConfig[] = [ functionQuery: { className: "Completions", methodName: "create", - kind: "Async", + kind: "Sync", }, }, @@ -67,7 +67,7 @@ export const openaiConfigs: InstrumentationConfig[] = [ functionQuery: { className: "Embeddings", methodName: "create", - kind: "Async", + kind: "Sync", }, }, @@ -82,7 +82,7 @@ export const openaiConfigs: InstrumentationConfig[] = [ functionQuery: { className: "Completions", methodName: "parse", - kind: "Async", + kind: "Sync", }, }, @@ -96,7 +96,7 @@ export const openaiConfigs: InstrumentationConfig[] = [ functionQuery: { className: "Completions", methodName: "parse", - kind: "Async", + kind: "Sync", }, }, @@ -111,7 +111,7 @@ export const openaiConfigs: InstrumentationConfig[] = [ functionQuery: { className: "Moderations", methodName: "create", - kind: "Async", + kind: "Sync", }, }, @@ -155,7 +155,7 @@ export const openaiConfigs: InstrumentationConfig[] = [ functionQuery: { className: "Responses", methodName: "create", - kind: "Async", + kind: "Sync", }, }, @@ -183,7 +183,7 @@ export const openaiConfigs: InstrumentationConfig[] = [ functionQuery: { className: "Responses", methodName: "parse", - kind: "Async", + kind: "Sync", }, }, ]; diff --git a/js/src/instrumentation/core/channel-tracing.ts b/js/src/instrumentation/core/channel-tracing.ts index d941b6982..faa2cd639 100644 --- a/js/src/instrumentation/core/channel-tracing.ts +++ b/js/src/instrumentation/core/channel-tracing.ts @@ -122,6 +122,65 @@ export type SyncStreamChannelSpanConfig = }) => boolean; }; +export type SyncResultChannelSpanConfig< + TChannel extends AnyAsyncChannel | AnySyncStreamChannel, +> = ChannelConfig & { + extractInput: ( + args: [...ArgsOf], + event: StartOf, + span: Span, + ) => { + input: unknown; + metadata: unknown; + }; + extractOutput?: ( + result: ResultOf, + endEvent?: TChannel extends AnyAsyncChannel + ? EndOf | AsyncEndOf + : EndOf, + ) => unknown; + extractMetadata?: ( + result: ResultOf, + endEvent?: TChannel extends AnyAsyncChannel + ? EndOf | AsyncEndOf + : EndOf, + ) => unknown; + extractMetrics?: ( + result: ResultOf, + startTime?: number, + endEvent?: TChannel extends AnyAsyncChannel + ? EndOf | AsyncEndOf + : EndOf, + ) => Record; + aggregateChunks?: TChannel extends AnyAsyncChannel + ? ( + chunks: ChunkOf[], + result?: ResultOf, + endEvent?: EndOf | AsyncEndOf, + startTime?: number, + ) => { + output: unknown; + metrics: Record; + metadata?: Record; + } + : never; + patchResult?: (args: { + channelName: string; + endEvent: TChannel extends AnyAsyncChannel + ? EndOf | AsyncEndOf + : EndOf; + result: ResultOf; + span: Span; + startTime: number; + }) => boolean; +}; + +type SyncResultEndEvent< + TChannel extends AnyAsyncChannel | AnySyncStreamChannel, +> = TChannel extends AnyAsyncChannel + ? EndOf | AsyncEndOf + : EndOf; + type SyncStreamLike = { on(event: "chunk", handler: (payload?: unknown) => void): unknown; on( @@ -211,6 +270,122 @@ function logErrorAndEnd< states.delete(event as object); } +function logResultAndEnd< + TChannel extends AnyAsyncChannel | AnySyncStreamChannel, +>( + states: WeakMap, + config: SyncResultChannelSpanConfig, + channelName: string, + event: SyncResultEndEvent, +): void { + const spanData = states.get(event as object); + if (!spanData) { + return; + } + + const { span, startTime } = spanData; + const result = event.result as ResultOf; + + if ( + config.patchResult?.({ + channelName, + endEvent: event as SyncResultEndEvent, + result, + span, + startTime, + }) + ) { + states.delete(event as object); + return; + } + + if (config.aggregateChunks && isAsyncIterable(result)) { + let firstChunkTime: number | undefined; + + patchStreamIfNeeded(result, { + onChunk: () => { + if (firstChunkTime === undefined) { + firstChunkTime = getCurrentUnixTimestamp(); + } + }, + onComplete: (chunks) => { + try { + const aggregated = config.aggregateChunks?.( + chunks as ChunkOf>[], + result, + event, + startTime, + ); + + if (!aggregated) { + span.end(); + return; + } + + if ( + aggregated.metrics.time_to_first_token === undefined && + firstChunkTime !== undefined + ) { + aggregated.metrics.time_to_first_token = firstChunkTime - startTime; + } else if ( + aggregated.metrics.time_to_first_token === undefined && + chunks.length > 0 + ) { + aggregated.metrics.time_to_first_token = + getCurrentUnixTimestamp() - startTime; + } + + span.log({ + output: aggregated.output, + ...(aggregated.metadata !== undefined + ? { metadata: aggregated.metadata } + : {}), + metrics: aggregated.metrics, + }); + } catch (error) { + console.error(`Error extracting output for ${channelName}:`, error); + } finally { + span.end(); + states.delete(event as object); + } + }, + onError: (error: Error) => { + span.log({ + error: error.message, + }); + span.end(); + states.delete(event as object); + }, + }); + return; + } + + try { + const output = config.extractOutput?.(result, event); + const metrics = config.extractMetrics?.(result, startTime, event); + const metadata = config.extractMetadata?.(result, event); + + if ( + output !== undefined || + metrics !== undefined || + normalizeMetadata(metadata) !== undefined + ) { + span.log({ + ...(output !== undefined ? { output } : {}), + ...(normalizeMetadata(metadata) !== undefined + ? { metadata: normalizeMetadata(metadata) } + : {}), + ...(metrics !== undefined ? { metrics } : {}), + }); + } + } catch (error) { + console.error(`Error extracting output for ${channelName}:`, error); + } finally { + span.end(); + states.delete(event as object); + } +} + export function traceAsyncChannel( channel: TChannel, config: AsyncChannelSpanConfig, @@ -442,6 +617,82 @@ export function traceStreamingChannel( }; } +export function traceSyncResultChannel< + TChannel extends AnyAsyncChannel | AnySyncStreamChannel, +>( + channel: TChannel, + config: SyncResultChannelSpanConfig, +): () => void { + const tracingChannel = channel.tracingChannel() as IsoTracingChannel< + ChannelMessage + >; + const states = new WeakMap(); + const channelName = channel.channelName; + + const handlers: IsoChannelHandlers> = { + start: (event) => { + states.set( + event as object, + startSpanForEvent( + config, + event as StartOf, + channelName, + ), + ); + }, + end: (event) => { + const spanData = states.get(event as object); + if (!spanData) { + return; + } + + const { span, startTime } = spanData; + const endEvent = event as EndOf; + + if ( + config.patchResult?.({ + channelName, + endEvent: endEvent as SyncResultEndEvent, + result: endEvent.result, + span, + startTime, + }) + ) { + states.delete(event as object); + return; + } + + if (channel.kind === "async") { + return; + } + + logResultAndEnd( + states, + config, + channelName, + endEvent as SyncResultEndEvent, + ); + }, + asyncEnd: (event) => { + logResultAndEnd( + states, + config, + channelName, + event as SyncResultEndEvent, + ); + }, + error: (event) => { + logErrorAndEnd(states, event as ErrorOf); + }, + }; + + tracingChannel.subscribe(handlers); + + return () => { + tracingChannel.unsubscribe(handlers); + }; +} + export function traceSyncStreamChannel( channel: TChannel, config: SyncStreamChannelSpanConfig, diff --git a/js/src/instrumentation/plugins/openai-plugin.test.ts b/js/src/instrumentation/plugins/openai-plugin.test.ts index a4c0d6484..c642ee8a7 100644 --- a/js/src/instrumentation/plugins/openai-plugin.test.ts +++ b/js/src/instrumentation/plugins/openai-plugin.test.ts @@ -1,8 +1,9 @@ -import { describe, it, expect } from "vitest"; +import { describe, it, expect, vi } from "vitest"; import { parseMetricsFromUsage, processImagesInOutput, aggregateChatCompletionChunks, + patchOpenAIAPIPromiseResult, } from "./openai-plugin"; import { Attachment } from "../../logger"; @@ -1258,3 +1259,109 @@ describe("processImagesInOutput", () => { }); }); }); + +describe("patchOpenAIAPIPromiseResult", () => { + it("preserves helper methods while tracing resolved withResponse results", async () => { + const thenUnwrap = vi.fn(); + const withResponse = vi.fn().mockResolvedValue({ + data: { + choices: [{ message: { content: "ok", role: "assistant" } }], + usage: { + prompt_tokens: 1, + completion_tokens: 1, + total_tokens: 2, + }, + }, + request_id: "req_123", + response: new Response(null, { + headers: { "x-request-id": "req_123" }, + }), + }); + + const apiPromise = { + _thenUnwrap: thenUnwrap, + catch(onRejected?: (reason: unknown) => unknown) { + return Promise.resolve(this).catch(onRejected); + }, + finally(onFinally?: () => void) { + return Promise.resolve(this).finally(onFinally); + }, + then( + onFulfilled?: (value: unknown) => unknown, + onRejected?: (reason: unknown) => unknown, + ) { + return Promise.resolve(this).then(onFulfilled, onRejected); + }, + withResponse, + }; + + const span = { + end: vi.fn(), + log: vi.fn(), + }; + + expect( + patchOpenAIAPIPromiseResult({ + config: { + extractMetadata: () => ({ provider: "openai" }), + extractMetrics: () => ({ tokens: 2 }), + extractOutput: (result: any) => result.choices, + }, + result: apiPromise, + span: span as any, + startTime: 0, + }), + ).toBe(true); + + const enhanced = await apiPromise.withResponse(); + const data = await apiPromise; + + expect(apiPromise._thenUnwrap).toBe(thenUnwrap); + expect(withResponse).toHaveBeenCalledTimes(1); + expect(enhanced.request_id).toBe("req_123"); + expect(data).toEqual(enhanced.data); + expect(span.log).toHaveBeenCalledWith({ + metadata: { provider: "openai" }, + metrics: { tokens: 2 }, + output: enhanced.data.choices, + }); + expect(span.end).toHaveBeenCalledTimes(1); + }); + + it("traces plain promise results when withResponse is unavailable", async () => { + const result = Promise.resolve({ + choices: [{ message: { content: "ok", role: "assistant" } }], + usage: { + prompt_tokens: 1, + completion_tokens: 1, + total_tokens: 2, + }, + }); + + const span = { + end: vi.fn(), + log: vi.fn(), + }; + + expect( + patchOpenAIAPIPromiseResult({ + config: { + extractMetrics: () => ({ tokens: 2 }), + extractOutput: (resolvedResult: any) => resolvedResult.choices, + }, + result, + span: span as any, + startTime: 0, + }), + ).toBe(true); + + const data = await result; + + expect(data.choices).toHaveLength(1); + expect(span.log).toHaveBeenCalledWith({ + metrics: { tokens: 2 }, + output: data.choices, + }); + expect(span.end).toHaveBeenCalledTimes(1); + }); +}); diff --git a/js/src/instrumentation/plugins/openai-plugin.ts b/js/src/instrumentation/plugins/openai-plugin.ts index 62a48e62f..0d44f6183 100644 --- a/js/src/instrumentation/plugins/openai-plugin.ts +++ b/js/src/instrumentation/plugins/openai-plugin.ts @@ -1,31 +1,66 @@ -import { BasePlugin } from "../core"; -import { - traceAsyncChannel, - traceStreamingChannel, - traceSyncStreamChannel, - unsubscribeAll, -} from "../core/channel-tracing"; +import type { Span } from "../../logger"; import { Attachment } from "../../logger"; -import { SpanTypeAttribute, isObject } from "../../../util/index"; -import { getCurrentUnixTimestamp } from "../../util"; -import { processInputAttachments } from "../../wrappers/attachment-utils"; -import { openAIChannels } from "./openai-channels"; import { BRAINTRUST_CACHED_STREAM_METRIC, getCachedMetricFromHeaders, parseMetricsFromUsage, } from "../../openai-utils"; +import { getCurrentUnixTimestamp } from "../../util"; +import { processInputAttachments } from "../../wrappers/attachment-utils"; +import { SpanTypeAttribute, isObject } from "../../../util/index"; +import { BasePlugin } from "../core"; +import { + traceSyncResultChannel, + traceSyncStreamChannel, + unsubscribeAll, +} from "../core/channel-tracing"; +import { isAsyncIterable, patchStreamIfNeeded } from "../core/stream-patcher"; +import { openAIChannels } from "./openai-channels"; import type { OpenAIChatChoice, OpenAIChatCompletionChunk, OpenAIResponseStreamEvent, } from "../../vendor-sdk-types/openai"; +type OpenAIWithResponseLike = { + data: TResult; + response: Response; + [key: string]: unknown; +}; + +type OpenAIAPIPromiseLike = Promise & { + withResponse(): Promise>; +}; + +type OpenAIThenableLike = Promise & { + withResponse?: () => Promise>; +}; + +type OpenAIPromisePatchConfig = { + extractOutput: (result: TResult, endEvent?: unknown) => unknown; + extractMetadata?: (result: TResult, endEvent?: unknown) => unknown; + extractMetrics: ( + result: TResult, + startTime?: number, + endEvent?: unknown, + ) => Record; + aggregateChunks?: ( + chunks: TChunk[], + result?: TResult, + endEvent?: unknown, + startTime?: number, + ) => { + output: unknown; + metrics: Record; + metadata?: Record; + }; +}; + /** * Plugin for OpenAI SDK instrumentation. * * Handles instrumentation for: - * - Chat completions (streaming and non-streaming) + * - Chat completions * - Embeddings * - Moderations * - Beta API (parse, stream) @@ -37,9 +72,47 @@ export class OpenAIPlugin extends BasePlugin { } protected onEnable(): void { - // Chat Completions - supports streaming + const chatConfig = { + aggregateChunks: aggregateChatCompletionChunks, + extractMetrics: (result: any, startTime?: number, endEvent?: unknown) => { + const metrics = withCachedMetric( + parseMetricsFromUsage(result?.usage), + result, + endEvent, + ); + if (startTime) { + metrics.time_to_first_token = getCurrentUnixTimestamp() - startTime; + } + return metrics; + }, + extractOutput: (result: any) => result?.choices, + } satisfies OpenAIPromisePatchConfig; + + const responsesConfig = { + aggregateChunks: aggregateResponseStreamEvents, + extractMetadata: (result: any) => { + if (!result) { + return undefined; + } + const { output: _output, usage: _usage, ...metadata } = result; + return Object.keys(metadata).length > 0 ? metadata : undefined; + }, + extractMetrics: (result: any, startTime?: number, endEvent?: unknown) => { + const metrics = withCachedMetric( + parseMetricsFromUsage(result?.usage), + result, + endEvent, + ); + if (startTime) { + metrics.time_to_first_token = getCurrentUnixTimestamp() - startTime; + } + return metrics; + }, + extractOutput: (result: any) => processImagesInOutput(result?.output), + } satisfies OpenAIPromisePatchConfig; + this.unsubscribers.push( - traceStreamingChannel(openAIChannels.chatCompletionsCreate, { + traceSyncResultChannel(openAIChannels.chatCompletionsCreate, { name: "Chat Completion", type: SpanTypeAttribute.LLM, extractInput: ([params]) => { @@ -49,27 +122,21 @@ export class OpenAIPlugin extends BasePlugin { metadata: { ...metadata, provider: "openai" }, }; }, - extractOutput: (result) => { - return result?.choices; - }, - extractMetrics: (result, startTime, endEvent) => { - const metrics = withCachedMetric( - parseMetricsFromUsage(result?.usage), + aggregateChunks: chatConfig.aggregateChunks, + extractMetrics: chatConfig.extractMetrics, + extractOutput: chatConfig.extractOutput, + patchResult: ({ result, span, startTime }) => + patchOpenAIAPIPromiseResult({ + config: chatConfig, result, - endEvent, - ); - if (startTime) { - metrics.time_to_first_token = getCurrentUnixTimestamp() - startTime; - } - return metrics; - }, - aggregateChunks: aggregateChatCompletionChunks, + span, + startTime, + }), }), ); - // Embeddings this.unsubscribers.push( - traceAsyncChannel(openAIChannels.embeddingsCreate, { + traceSyncResultChannel(openAIChannels.embeddingsCreate, { name: "Embedding", type: SpanTypeAttribute.LLM, extractInput: ([params]) => { @@ -79,25 +146,51 @@ export class OpenAIPlugin extends BasePlugin { metadata: { ...metadata, provider: "openai" }, }; }, - extractOutput: (result) => { - const embedding = result?.data?.[0]?.embedding; + extractMetrics: ( + resolvedResult: any, + _startTime, + endEvent, + ): Record => + withCachedMetric( + parseMetricsFromUsage(resolvedResult?.usage), + resolvedResult, + endEvent, + ), + extractOutput: (resolvedResult: any) => { + const embedding = resolvedResult?.data?.[0]?.embedding; return Array.isArray(embedding) ? { embedding_length: embedding.length } : undefined; }, - extractMetrics: (result, _startTime, endEvent) => { - return withCachedMetric( - parseMetricsFromUsage(result?.usage), + patchResult: ({ result, span, startTime }) => + patchOpenAIAPIPromiseResult({ + config: { + extractMetrics: ( + resolvedResult: any, + _startTime, + endEvent, + ): Record => + withCachedMetric( + parseMetricsFromUsage(resolvedResult?.usage), + resolvedResult, + endEvent, + ), + extractOutput: (resolvedResult: any) => { + const embedding = resolvedResult?.data?.[0]?.embedding; + return Array.isArray(embedding) + ? { embedding_length: embedding.length } + : undefined; + }, + }, result, - endEvent, - ); - }, + span, + startTime, + }), }), ); - // Beta Chat Completions Parse this.unsubscribers.push( - traceStreamingChannel(openAIChannels.betaChatCompletionsParse, { + traceSyncResultChannel(openAIChannels.betaChatCompletionsParse, { name: "Chat Completion", type: SpanTypeAttribute.LLM, extractInput: ([params]) => { @@ -107,25 +200,19 @@ export class OpenAIPlugin extends BasePlugin { metadata: { ...metadata, provider: "openai" }, }; }, - extractOutput: (result) => { - return result?.choices; - }, - extractMetrics: (result, startTime, endEvent) => { - const metrics = withCachedMetric( - parseMetricsFromUsage(result?.usage), + aggregateChunks: chatConfig.aggregateChunks, + extractMetrics: chatConfig.extractMetrics, + extractOutput: chatConfig.extractOutput, + patchResult: ({ result, span, startTime }) => + patchOpenAIAPIPromiseResult({ + config: chatConfig, result, - endEvent, - ); - if (startTime) { - metrics.time_to_first_token = getCurrentUnixTimestamp() - startTime; - } - return metrics; - }, - aggregateChunks: aggregateChatCompletionChunks, + span, + startTime, + }), }), ); - // Beta Chat Completions Stream (sync method returning event-based stream) this.unsubscribers.push( traceSyncStreamChannel(openAIChannels.betaChatCompletionsStream, { name: "Chat Completion", @@ -140,9 +227,8 @@ export class OpenAIPlugin extends BasePlugin { }), ); - // Moderations this.unsubscribers.push( - traceAsyncChannel(openAIChannels.moderationsCreate, { + traceSyncResultChannel(openAIChannels.moderationsCreate, { name: "Moderation", type: SpanTypeAttribute.LLM, extractInput: ([params]) => { @@ -152,22 +238,41 @@ export class OpenAIPlugin extends BasePlugin { metadata: { ...metadata, provider: "openai" }, }; }, - extractOutput: (result) => { - return result?.results; - }, - extractMetrics: (result, _startTime, endEvent) => { - return withCachedMetric( - parseMetricsFromUsage(result?.usage), - result, + extractMetrics: ( + resolvedResult: any, + _startTime, + endEvent, + ): Record => + withCachedMetric( + parseMetricsFromUsage(resolvedResult?.usage), + resolvedResult, endEvent, - ); - }, + ), + extractOutput: (resolvedResult: any) => resolvedResult?.results, + patchResult: ({ result, span, startTime }) => + patchOpenAIAPIPromiseResult({ + config: { + extractMetrics: ( + resolvedResult: any, + _startTime, + endEvent, + ): Record => + withCachedMetric( + parseMetricsFromUsage(resolvedResult?.usage), + resolvedResult, + endEvent, + ), + extractOutput: (resolvedResult: any) => resolvedResult?.results, + }, + result, + span, + startTime, + }), }), ); - // Responses API - create (supports streaming via stream=true param) this.unsubscribers.push( - traceStreamingChannel(openAIChannels.responsesCreate, { + traceSyncResultChannel(openAIChannels.responsesCreate, { name: "openai.responses.create", type: SpanTypeAttribute.LLM, extractInput: ([params]) => { @@ -177,32 +282,20 @@ export class OpenAIPlugin extends BasePlugin { metadata: { ...metadata, provider: "openai" }, }; }, - extractOutput: (result) => { - return processImagesInOutput(result?.output); - }, - extractMetadata: (result) => { - if (!result) { - return undefined; - } - const { output: _output, usage: _usage, ...metadata } = result; - return Object.keys(metadata).length > 0 ? metadata : undefined; - }, - extractMetrics: (result, startTime, endEvent) => { - const metrics = withCachedMetric( - parseMetricsFromUsage(result?.usage), + aggregateChunks: responsesConfig.aggregateChunks, + extractMetadata: responsesConfig.extractMetadata, + extractMetrics: responsesConfig.extractMetrics, + extractOutput: responsesConfig.extractOutput, + patchResult: ({ result, span, startTime }) => + patchOpenAIAPIPromiseResult({ + config: responsesConfig, result, - endEvent, - ); - if (startTime) { - metrics.time_to_first_token = getCurrentUnixTimestamp() - startTime; - } - return metrics; - }, - aggregateChunks: aggregateResponseStreamEvents, + span, + startTime, + }), }), ); - // Responses API - stream (sync method returning event-based stream) this.unsubscribers.push( traceSyncStreamChannel(openAIChannels.responsesStream, { name: "openai.responses.create", @@ -237,9 +330,8 @@ export class OpenAIPlugin extends BasePlugin { }), ); - // Responses API - parse this.unsubscribers.push( - traceStreamingChannel(openAIChannels.responsesParse, { + traceSyncResultChannel(openAIChannels.responsesParse, { name: "openai.responses.parse", type: SpanTypeAttribute.LLM, extractInput: ([params]) => { @@ -249,28 +341,17 @@ export class OpenAIPlugin extends BasePlugin { metadata: { ...metadata, provider: "openai" }, }; }, - extractOutput: (result) => { - return processImagesInOutput(result?.output); - }, - extractMetadata: (result) => { - if (!result) { - return undefined; - } - const { output: _output, usage: _usage, ...metadata } = result; - return Object.keys(metadata).length > 0 ? metadata : undefined; - }, - extractMetrics: (result, startTime, endEvent) => { - const metrics = withCachedMetric( - parseMetricsFromUsage(result?.usage), + aggregateChunks: responsesConfig.aggregateChunks, + extractMetadata: responsesConfig.extractMetadata, + extractMetrics: responsesConfig.extractMetrics, + extractOutput: responsesConfig.extractOutput, + patchResult: ({ result, span, startTime }) => + patchOpenAIAPIPromiseResult({ + config: responsesConfig, result, - endEvent, - ); - if (startTime) { - metrics.time_to_first_token = getCurrentUnixTimestamp() - startTime; - } - return metrics; - }, - aggregateChunks: aggregateResponseStreamEvents, + span, + startTime, + }), }), ); } @@ -280,6 +361,212 @@ export class OpenAIPlugin extends BasePlugin { } } +function isOpenAIAPIPromiseLike( + value: unknown, +): value is OpenAIAPIPromiseLike { + return ( + !!value && + (typeof value === "object" || typeof value === "function") && + typeof (value as { then?: unknown }).then === "function" && + typeof (value as { withResponse?: unknown }).withResponse === "function" + ); +} + +function isPromiseLike( + value: unknown, +): value is PromiseLike & object { + return ( + !!value && + (typeof value === "object" || typeof value === "function") && + typeof (value as { then?: unknown }).then === "function" + ); +} + +export function patchOpenAIAPIPromiseResult(args: { + config: OpenAIPromisePatchConfig; + result: unknown; + span: Span; + startTime: number; +}): boolean { + const { config, result, span, startTime } = args; + + if (!isPromiseLike(result) || !Object.isExtensible(result)) { + return false; + } + + const promiseLike = result as OpenAIThenableLike; + const originalThen = promiseLike.then.bind(promiseLike); + const originalWithResponse = isOpenAIAPIPromiseLike(promiseLike) + ? promiseLike.withResponse.bind(promiseLike) + : null; + let executionPromise: Promise> | null = null; + let dataPromise: Promise | null = null; + + const logResolvedData = (data: TResult, response?: Response): TResult => { + const endEvent = response ? { response } : undefined; + + if (isAsyncIterable(data)) { + let firstChunkTime: number | undefined; + + patchStreamIfNeeded(data, { + onChunk: () => { + if (firstChunkTime === undefined) { + firstChunkTime = getCurrentUnixTimestamp(); + } + }, + onComplete: (chunks) => { + try { + if (!config.aggregateChunks) { + span.end(); + return; + } + + const aggregated = config.aggregateChunks( + chunks, + data, + endEvent, + startTime, + ); + + if ( + aggregated.metrics.time_to_first_token === undefined && + firstChunkTime !== undefined + ) { + aggregated.metrics.time_to_first_token = + firstChunkTime - startTime; + } else if ( + aggregated.metrics.time_to_first_token === undefined && + chunks.length > 0 + ) { + aggregated.metrics.time_to_first_token = + getCurrentUnixTimestamp() - startTime; + } + + span.log({ + output: aggregated.output, + ...(aggregated.metadata !== undefined + ? { metadata: aggregated.metadata } + : {}), + metrics: aggregated.metrics, + }); + } catch (error) { + console.error("Error extracting OpenAI stream output:", error); + } finally { + span.end(); + } + }, + onError: (error) => { + span.log({ + error: error.message, + }); + span.end(); + }, + }); + + return data; + } + + const output = config.extractOutput(data, endEvent); + const metrics = config.extractMetrics(data, startTime, endEvent); + const metadata = config.extractMetadata?.(data, endEvent); + const normalizedMetadata = isObject(metadata) + ? (metadata as Record) + : undefined; + + span.log({ + output, + ...(normalizedMetadata !== undefined + ? { metadata: normalizedMetadata } + : {}), + metrics, + }); + span.end(); + + return data; + }; + + const ensureDataPromise = (): Promise => { + if (!dataPromise) { + if (originalWithResponse) { + if (!executionPromise) { + executionPromise = originalWithResponse() + .then((enhancedResponse) => { + logResolvedData(enhancedResponse.data, enhancedResponse.response); + return enhancedResponse; + }) + .catch((error: unknown) => { + const resolvedError = + error instanceof Error ? error : new Error(String(error)); + span.log({ + error: resolvedError.message, + }); + span.end(); + throw resolvedError; + }); + } + + dataPromise = executionPromise.then(({ data }) => data); + } else { + dataPromise = Promise.resolve( + originalThen((data) => logResolvedData(data)), + ).catch((error: unknown) => { + const resolvedError = + error instanceof Error ? error : new Error(String(error)); + span.log({ + error: resolvedError.message, + }); + span.end(); + throw resolvedError; + }); + } + } + + return dataPromise; + }; + + if (!originalWithResponse) { + void ensureDataPromise(); + return true; + } + + Object.defineProperties(promiseLike, { + catch: { + configurable: true, + value(onRejected?: (reason: unknown) => unknown) { + return ensureDataPromise().catch(onRejected); + }, + }, + finally: { + configurable: true, + value(onFinally?: () => void) { + return ensureDataPromise().finally(onFinally); + }, + }, + then: { + configurable: true, + value( + onFulfilled?: (value: TResult) => unknown, + onRejected?: (reason: unknown) => unknown, + ) { + return ensureDataPromise().then(onFulfilled, onRejected); + }, + }, + }); + + Object.defineProperty(promiseLike, "withResponse", { + configurable: true, + value() { + if (!executionPromise) { + void ensureDataPromise(); + } + + return executionPromise!; + }, + }); + + return true; +} + function getCachedMetricFromEndEvent(endEvent: unknown): number | undefined { if (!isObject(endEvent)) { return undefined; @@ -357,7 +644,6 @@ export function processImagesInOutput(output: any): any { : "generated_image"; const filename = `${baseFilename}.${fileExtension}`; - // Convert base64 string to Blob const binaryString = atob(output.result); const bytes = new Uint8Array(binaryString.length); for (let i = 0; i < binaryString.length; i++) { diff --git a/js/tests/auto-instrumentations/error-handling.test.ts b/js/tests/auto-instrumentations/error-handling.test.ts index 52eb876f3..4f5024be3 100644 --- a/js/tests/auto-instrumentations/error-handling.test.ts +++ b/js/tests/auto-instrumentations/error-handling.test.ts @@ -98,10 +98,10 @@ describe("Error Handling", () => { // Give events time to emit await new Promise((resolve) => setImmediate(resolve)); - // Verify error event was emitted - expect(collector.error.length).toBeGreaterThan(0); - expect(collector.error[0].error).toBeDefined(); - expect(collector.error[0].error.message).toBe("Test error"); + // traceSync publishes the returned promise on end; the rejection happens later. + expect(collector.end.length).toBeGreaterThan(0); + await expect(collector.end[0].result).rejects.toThrow("Test error"); + expect(collector.error).toHaveLength(0); }); it("should emit error event with correct error details", async () => { @@ -157,12 +157,13 @@ describe("Error Handling", () => { await new Promise((resolve) => setImmediate(resolve)); - // Verify error details are captured - expect(collector.error.length).toBeGreaterThan(0); - const errorEvent = collector.error[0]; - expect(errorEvent.error.message).toBe("API failure"); - expect(errorEvent.error.name).toBe("CustomError"); - expect(errorEvent.error.code).toBe("ERR_API_FAILURE"); + expect(collector.end.length).toBeGreaterThan(0); + await expect(collector.end[0].result).rejects.toMatchObject({ + code: "ERR_API_FAILURE", + message: "API failure", + name: "CustomError", + }); + expect(collector.error).toHaveLength(0); }); }); @@ -208,9 +209,11 @@ describe("Error Handling", () => { // Verify error is still thrown await expect(bundled.run()).rejects.toThrow("Propagated error"); - // Also verify error event was emitted + // The rejection is still propagated to the caller and also remains on the end event promise. await new Promise((resolve) => setImmediate(resolve)); - expect(collector.error.length).toBeGreaterThan(0); + expect(collector.end.length).toBeGreaterThan(0); + await expect(collector.end[0].result).rejects.toThrow("Propagated error"); + expect(collector.error).toHaveLength(0); }); it("should handle errors in promise rejections", async () => { @@ -252,10 +255,13 @@ describe("Error Handling", () => { // Verify promise rejection is propagated await expect(bundled.run()).rejects.toThrow("Promise rejection"); - // Verify error event was emitted + // The rejection is carried by the returned promise rather than a channel error event. await new Promise((resolve) => setImmediate(resolve)); - expect(collector.error.length).toBeGreaterThan(0); - expect(collector.error[0].error.message).toBe("Promise rejection"); + expect(collector.end.length).toBeGreaterThan(0); + await expect(collector.end[0].result).rejects.toThrow( + "Promise rejection", + ); + expect(collector.error).toHaveLength(0); }); }); @@ -305,13 +311,14 @@ describe("Error Handling", () => { await new Promise((resolve) => setImmediate(resolve)); - // Verify both start and error events were emitted + // traceSync emits start/end immediately for the returned promise. expect(collector.start.length).toBeGreaterThan(0); - expect(collector.error.length).toBeGreaterThan(0); + expect(collector.end.length).toBeGreaterThan(0); + expect(collector.error).toHaveLength(0); - // Verify start event came before error event + // Verify start event came before end event expect(collector.start[0].timestamp).toBeLessThanOrEqual( - collector.error[0].timestamp, + collector.end[0].timestamp, ); }); @@ -360,17 +367,9 @@ describe("Error Handling", () => { await new Promise((resolve) => setImmediate(resolve)); - // Verify error event was emitted - expect(collector.error.length).toBeGreaterThan(0); - - // Verify end event was NOT emitted (or if emitted, came before or at the same time as error due to asyncEnd) - // Note: For async functions, asyncEnd might still fire, but end should not - if (collector.end.length > 0) { - // If end event exists, it should be before or at the same time as the error - expect(collector.end[0].timestamp).toBeLessThanOrEqual( - collector.error[0].timestamp, - ); - } + expect(collector.end.length).toBeGreaterThan(0); + await expect(collector.end[0].result).rejects.toThrow("Test error"); + expect(collector.error).toHaveLength(0); }); }); diff --git a/js/tests/auto-instrumentations/event-content.test.ts b/js/tests/auto-instrumentations/event-content.test.ts index 40e426a97..2aa697bb4 100644 --- a/js/tests/auto-instrumentations/event-content.test.ts +++ b/js/tests/auto-instrumentations/event-content.test.ts @@ -239,21 +239,14 @@ describe("Event Content Validation", () => { await new Promise((resolve) => setImmediate(resolve)); - // Verify end event was emitted with result - // Note: For async functions, result appears in asyncEnd event - const hasAsyncEnd = collector.asyncEnd.length > 0; - const hasEnd = collector.end.length > 0; - - expect(hasAsyncEnd || hasEnd).toBe(true); - - // Check the appropriate event type - const resultEvent = hasAsyncEnd - ? collector.asyncEnd[0] - : collector.end[0]; - expect(resultEvent.result).toBeDefined(); - expect(resultEvent.result.id).toBe("chatcmpl-123"); - expect(resultEvent.result.model).toBe("gpt-4"); - expect(resultEvent.result.choices[0].message.content).toBe("Hello!"); + // OpenAI methods are transformed with traceSync, so end carries the returned promise. + expect(collector.end.length).toBeGreaterThan(0); + + const result = await collector.end[0].result; + expect(result).toBeDefined(); + expect(result.id).toBe("chatcmpl-123"); + expect(result.model).toBe("gpt-4"); + expect(result.choices[0].message.content).toBe("Hello!"); }); it("should capture complex nested results", async () => { @@ -319,21 +312,15 @@ describe("Event Content Validation", () => { await new Promise((resolve) => setImmediate(resolve)); - // Verify complex result structure is captured - const hasAsyncEnd = collector.asyncEnd.length > 0; - const hasEnd = collector.end.length > 0; - - expect(hasAsyncEnd || hasEnd).toBe(true); - - const resultEvent = hasAsyncEnd - ? collector.asyncEnd[0] - : collector.end[0]; - expect(resultEvent.result).toBeDefined(); - expect(resultEvent.result.usage.total_tokens).toBe(75); - expect(resultEvent.result.choices[0].message.tool_calls).toHaveLength(1); - expect( - resultEvent.result.choices[0].message.tool_calls[0].function.name, - ).toBe("get_weather"); + expect(collector.end.length).toBeGreaterThan(0); + + const result = await collector.end[0].result; + expect(result).toBeDefined(); + expect(result.usage.total_tokens).toBe(75); + expect(result.choices[0].message.tool_calls).toHaveLength(1); + expect(result.choices[0].message.tool_calls[0].function.name).toBe( + "get_weather", + ); }); }); diff --git a/js/tests/auto-instrumentations/fixtures/listener-esm.mjs b/js/tests/auto-instrumentations/fixtures/listener-esm.mjs index 4ab2f726a..4a272cee7 100644 --- a/js/tests/auto-instrumentations/fixtures/listener-esm.mjs +++ b/js/tests/auto-instrumentations/fixtures/listener-esm.mjs @@ -9,6 +9,14 @@ const expectedChannel = "orchestrion:openai:chat.completions.create"; // Get the kStoreKey symbol to access the store const kStoreKey = dc.kStoreKey || Symbol.for("diagnostics_channel.store"); +function serializeResult(ctx) { + try { + return ctx.result ? JSON.parse(JSON.stringify(ctx.result)) : null; + } catch { + return { hasResult: ctx.result !== undefined }; + } +} + // Subscribe to the channel and accumulate events const channel = tracingChannel(expectedChannel); channel.subscribe({ @@ -20,10 +28,14 @@ channel.subscribe({ self: !!store?.self, }); }, + end: (ctx) => { + events.end.push({ + result: serializeResult(ctx), + }); + }, asyncEnd: (ctx) => { - // Only send serializable result data events.end.push({ - result: ctx.result ? JSON.parse(JSON.stringify(ctx.result)) : null, + result: serializeResult(ctx), }); }, error: (ctx) => { diff --git a/js/tests/auto-instrumentations/streaming-and-responses.test.ts b/js/tests/auto-instrumentations/streaming-and-responses.test.ts index 8d4a40ff5..4f4216861 100644 --- a/js/tests/auto-instrumentations/streaming-and-responses.test.ts +++ b/js/tests/auto-instrumentations/streaming-and-responses.test.ts @@ -239,7 +239,7 @@ describe("Streaming Methods and Responses API", () => { // Verify events were captured expect(collector.start.length).toBeGreaterThan(0); - expect(collector.asyncEnd.length).toBeGreaterThan(0); + expect(collector.end.length).toBeGreaterThan(0); // Verify input was captured const startEvent = collector.start[0]; @@ -336,7 +336,7 @@ describe("Streaming Methods and Responses API", () => { // Verify events were captured expect(collector.start.length).toBeGreaterThan(0); - expect(collector.asyncEnd.length).toBeGreaterThan(0); + expect(collector.end.length).toBeGreaterThan(0); }); }); @@ -484,7 +484,7 @@ describe("Streaming Methods and Responses API", () => { // Verify events were captured expect(collector.start.length).toBeGreaterThan(0); - expect(collector.asyncEnd.length).toBeGreaterThan(0); + expect(collector.end.length).toBeGreaterThan(0); // Verify input was captured const startEvent = collector.start[0]; @@ -581,7 +581,7 @@ describe("Streaming Methods and Responses API", () => { // Verify events were captured expect(collector.start.length).toBeGreaterThan(0); - expect(collector.asyncEnd.length).toBeGreaterThan(0); + expect(collector.end.length).toBeGreaterThan(0); }); }); }); diff --git a/js/tests/auto-instrumentations/transformation.test.ts b/js/tests/auto-instrumentations/transformation.test.ts index 96f2eb64a..1db63a165 100644 --- a/js/tests/auto-instrumentations/transformation.test.ts +++ b/js/tests/auto-instrumentations/transformation.test.ts @@ -68,7 +68,7 @@ describe("Orchestrion Transformation Tests", () => { // Verify orchestrion transformed the code expect(output).toContain("tracingChannel"); expect(output).toContain("orchestrion:openai:chat.completions.create"); - expect(output).toContain("tracePromise"); + expect(output).toContain("traceSync"); }); it("should bundle dc-browser module when browser: true", async () => { @@ -149,7 +149,7 @@ describe("Orchestrion Transformation Tests", () => { // Verify orchestrion transformed the code expect(output).toContain("tracingChannel"); expect(output).toContain("orchestrion:openai:chat.completions.create"); - expect(output).toContain("tracePromise"); + expect(output).toContain("traceSync"); }); it("should bundle dc-browser module when browser: true", async () => { @@ -243,7 +243,7 @@ describe("Orchestrion Transformation Tests", () => { // Verify orchestrion transformed the code expect(output).toContain("tracingChannel"); expect(output).toContain("orchestrion:openai:chat.completions.create"); - expect(output).toContain("tracePromise"); + expect(output).toContain("traceSync"); }); it("should bundle dc-browser module when browser: true", async () => {