diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock index 65177900a..87c645213 100644 --- a/bottlecap/Cargo.lock +++ b/bottlecap/Cargo.lock @@ -524,6 +524,7 @@ dependencies = [ "time", "tokio", "tokio-util", + "tonic 0.14.5", "tonic-types", "tower", "tower-http", @@ -3623,8 +3624,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fec7c61a0695dc1887c1b53952990f3ad2e3a31453e1f49f10e75424943a93ec" dependencies = [ "async-trait", + "axum", "base64 0.22.1", "bytes", + "h2", "http 1.4.0", "http-body 1.0.1", "http-body-util", @@ -3633,6 +3636,7 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", + "socket2 0.6.2", "sync_wrapper", "tokio", "tokio-stream", diff --git a/bottlecap/Cargo.toml b/bottlecap/Cargo.toml index e6611236e..2b74ff578 100644 --- a/bottlecap/Cargo.toml +++ b/bottlecap/Cargo.toml @@ -45,6 +45,7 @@ rustls-pki-types = { version = "1.0", default-features = false } hyper-rustls = { version = "0.27.7", default-features = false } rand = { version = "0.8", default-features = false } prost = { version = "0.14", default-features = false } +tonic = { version = "0.14", features = ["transport", "codegen", "server", "channel", "router"], default-features = false } tonic-types = { version = "0.14", default-features = false } zstd = { version = "0.13.3", default-features = false } futures = { version = "0.3.31", default-features = false } diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index 3c2ef31cb..1c87d5d22 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -52,7 +52,10 @@ use bottlecap::{ }, flusher::LogsFlusher, }, - otlp::{agent::Agent as OtlpAgent, should_enable_otlp_agent}, + otlp::{ + agent::Agent as OtlpAgent, grpc_agent::GrpcAgent as OtlpGrpcAgent, should_enable_otlp_grpc, + should_enable_otlp_http, + }, proxy::{interceptor, should_start_proxy}, secrets::decrypt, tags::{ @@ -1354,24 +1357,49 @@ fn start_otlp_agent( trace_tx: Sender, stats_concentrator: StatsConcentratorHandle, ) -> Option { - if !should_enable_otlp_agent(config) { + let http_enabled = should_enable_otlp_http(config); + let grpc_enabled = should_enable_otlp_grpc(config); + + if !http_enabled && !grpc_enabled { return None; } + let stats_generator = Arc::new(StatsGenerator::new(stats_concentrator)); - let agent = OtlpAgent::new( - config.clone(), - tags_provider, - trace_processor, - trace_tx, - stats_generator, - ); - let cancel_token = agent.cancel_token(); + let cancel_token = CancellationToken::new(); - tokio::spawn(async move { - if let Err(e) = agent.start().await { - error!("Error starting OTLP agent: {e:?}"); - } - }); + if http_enabled { + let agent = OtlpAgent::new( + config.clone(), + tags_provider.clone(), + trace_processor.clone(), + trace_tx.clone(), + stats_generator.clone(), + cancel_token.clone(), + ); + + tokio::spawn(async move { + if let Err(e) = agent.start().await { + error!("Error starting OTLP HTTP agent: {e:?}"); + } + }); + } + + if grpc_enabled { + let grpc_agent = OtlpGrpcAgent::new( + config.clone(), + tags_provider, + trace_processor, + trace_tx, + stats_generator, + cancel_token.clone(), + ); + + tokio::spawn(async move { + if let Err(e) = grpc_agent.start().await { + error!("Error starting OTLP gRPC agent: {e:?}"); + } + }); + } Some(cancel_token) } diff --git a/bottlecap/src/otlp/agent.rs b/bottlecap/src/otlp/agent.rs index 92ed5321c..b2d16a321 100644 --- a/bottlecap/src/otlp/agent.rs +++ b/bottlecap/src/otlp/agent.rs @@ -56,12 +56,12 @@ impl Agent { trace_processor: Arc, trace_tx: Sender, stats_generator: Arc, + cancel_token: CancellationToken, ) -> Self { let port = Self::parse_port( config.otlp_config_receiver_protocols_http_endpoint.as_ref(), OTLP_AGENT_HTTP_PORT, ); - let cancel_token = CancellationToken::new(); Self { config: Arc::clone(&config), @@ -75,11 +75,6 @@ impl Agent { } } - #[must_use] - pub fn cancel_token(&self) -> CancellationToken { - self.cancel_token.clone() - } - fn parse_port(endpoint: Option<&String>, default_port: u16) -> u16 { if let Some(endpoint) = endpoint { let port = endpoint.split(':').nth(1); @@ -209,13 +204,9 @@ impl Agent { } } - // This needs to be after process_traces() because process_traces() - // performs obfuscation, and we need to compute stats on the obfuscated traces. if compute_trace_stats_on_extension && let Err(err) = stats_generator.send(&processed_traces) { - // Just log the error. We don't think trace stats are critical, so we don't want to - // return an error if only stats fail to send. error!("OTLP | Error sending traces to the stats concentrator: {err}"); } @@ -312,7 +303,6 @@ mod tests { #[test] fn test_parse_port_with_valid_endpoint() { - // Test with a valid endpoint containing a port let endpoint = Some("localhost:8080".to_string()); assert_eq!( Agent::parse_port(endpoint.as_ref(), OTLP_AGENT_HTTP_PORT), @@ -322,7 +312,6 @@ mod tests { #[test] fn test_parse_port_with_invalid_port_format() { - // Test with an endpoint containing an invalid port format let endpoint = Some("localhost:invalid".to_string()); assert_eq!( Agent::parse_port(endpoint.as_ref(), OTLP_AGENT_HTTP_PORT), @@ -332,7 +321,6 @@ mod tests { #[test] fn test_parse_port_with_missing_port() { - // Test with an endpoint missing a port let endpoint = Some("localhost".to_string()); assert_eq!( Agent::parse_port(endpoint.as_ref(), OTLP_AGENT_HTTP_PORT), @@ -342,7 +330,6 @@ mod tests { #[test] fn test_parse_port_with_none_endpoint() { - // Test with None endpoint let endpoint: Option = None; assert_eq!( Agent::parse_port(endpoint.as_ref(), OTLP_AGENT_HTTP_PORT), @@ -352,7 +339,6 @@ mod tests { #[test] fn test_parse_port_with_empty_endpoint() { - // Test with an empty endpoint let endpoint = Some(String::new()); assert_eq!( Agent::parse_port(endpoint.as_ref(), OTLP_AGENT_HTTP_PORT), diff --git a/bottlecap/src/otlp/grpc_agent.rs b/bottlecap/src/otlp/grpc_agent.rs new file mode 100644 index 000000000..b7f83ad15 --- /dev/null +++ b/bottlecap/src/otlp/grpc_agent.rs @@ -0,0 +1,280 @@ +use libdd_trace_utils::trace_utils::TracerHeaderTags as DatadogTracerHeaderTags; +use opentelemetry_proto::tonic::collector::trace::v1::{ + ExportTraceServiceRequest, ExportTraceServiceResponse, + trace_service_server::{TraceService, TraceServiceServer}, +}; +use prost::Message; +use std::net::SocketAddr; +use std::sync::Arc; +use tokio::sync::mpsc::Sender; +use tokio_util::sync::CancellationToken; +use tonic::{Request, Response, Status}; +use tracing::{debug, error}; + +use crate::{ + config::Config, + otlp::processor::Processor as OtlpProcessor, + tags::provider, + traces::{ + stats_generator::StatsGenerator, trace_aggregator::SendDataBuilderInfo, + trace_processor::TraceProcessor, + }, +}; + +const OTLP_AGENT_GRPC_PORT: u16 = 4317; +const DEFAULT_MAX_RECV_MSG_SIZE: usize = 4 * 1024 * 1024; // 4MB default +const MAX_RECV_MSG_SIZE_CAP: usize = 50 * 1024 * 1024; // 50MB cap (matches trace agent) + +struct OtlpGrpcService { + config: Arc, + tags_provider: Arc, + processor: OtlpProcessor, + trace_processor: Arc, + trace_tx: Sender, + stats_generator: Arc, +} + +#[tonic::async_trait] +impl TraceService for OtlpGrpcService { + async fn export( + &self, + request: Request, + ) -> Result, Status> { + let inner_request = request.into_inner(); + let body_size = inner_request.encoded_len(); + + let traces = match self.processor.process_request(inner_request) { + Ok(traces) => traces, + Err(e) => { + error!("OTLP gRPC | Failed to process request: {:?}", e); + return Err(Status::internal(format!("Failed to process request: {e}"))); + } + }; + + if traces.iter().all(Vec::is_empty) { + error!("OTLP gRPC | Not sending traces, processor returned empty data"); + return Err(Status::internal( + "Not sending traces, processor returned empty data", + )); + } + + let tracer_header_tags = DatadogTracerHeaderTags::default(); + + let compute_trace_stats_on_extension = self.config.compute_trace_stats_on_extension; + let (send_data_builder, processed_traces) = self.trace_processor.process_traces( + self.config.clone(), + self.tags_provider.clone(), + tracer_header_tags, + traces, + body_size, + None, + ); + + if let Some(send_data_builder) = send_data_builder { + if let Err(err) = self.trace_tx.send(send_data_builder).await { + error!("OTLP gRPC | Error sending traces to the trace aggregator: {err}"); + return Err(Status::internal(format!( + "Error sending traces to the trace aggregator: {err}" + ))); + } + debug!("OTLP gRPC | Successfully buffered traces to be aggregated."); + } + + if compute_trace_stats_on_extension + && let Err(err) = self.stats_generator.send(&processed_traces) + { + error!("OTLP gRPC | Error sending traces to the stats concentrator: {err}"); + } + + Ok(Response::new(ExportTraceServiceResponse { + partial_success: None, + })) + } +} + +pub struct GrpcAgent { + config: Arc, + tags_provider: Arc, + processor: OtlpProcessor, + trace_processor: Arc, + trace_tx: Sender, + stats_generator: Arc, + port: u16, + cancel_token: CancellationToken, +} + +impl GrpcAgent { + pub fn new( + config: Arc, + tags_provider: Arc, + trace_processor: Arc, + trace_tx: Sender, + stats_generator: Arc, + cancel_token: CancellationToken, + ) -> Self { + let port = Self::parse_port( + config.otlp_config_receiver_protocols_grpc_endpoint.as_ref(), + OTLP_AGENT_GRPC_PORT, + ); + + Self { + config: Arc::clone(&config), + tags_provider: Arc::clone(&tags_provider), + processor: OtlpProcessor::new(Arc::clone(&config)), + trace_processor, + trace_tx, + stats_generator, + port, + cancel_token, + } + } + + fn parse_port(endpoint: Option<&String>, default_port: u16) -> u16 { + if let Some(endpoint) = endpoint { + let without_scheme = endpoint + .strip_prefix("http://") + .or_else(|| endpoint.strip_prefix("https://")) + .unwrap_or(endpoint); + + // rsplit handles IPv6 like [::1]:4317 + if let Some(port_str) = without_scheme.rsplit(':').next() + && let Ok(port) = port_str.parse::() + { + return port; + } + + error!( + "Invalid OTLP gRPC endpoint format '{}', using default port {}", + endpoint, default_port + ); + } + + default_port + } + + pub async fn start(&self) -> Result<(), Box> { + let socket = SocketAddr::from(([127, 0, 0, 1], self.port)); + + let max_recv_msg_size = self + .config + .otlp_config_receiver_protocols_grpc_max_recv_msg_size_mib + .map_or(DEFAULT_MAX_RECV_MSG_SIZE, |mib| { + if mib <= 0 { + error!( + "Invalid gRPC max message size {}MiB, using default {}MiB", + mib, + DEFAULT_MAX_RECV_MSG_SIZE / (1024 * 1024) + ); + return DEFAULT_MAX_RECV_MSG_SIZE; + } + // Safe: we validated mib > 0 above + #[allow(clippy::cast_sign_loss)] + let size = (mib as usize) * 1024 * 1024; + if size > MAX_RECV_MSG_SIZE_CAP { + error!( + "gRPC max message size {}MiB exceeds cap, limiting to {}MiB", + mib, + MAX_RECV_MSG_SIZE_CAP / (1024 * 1024) + ); + return MAX_RECV_MSG_SIZE_CAP; + } + size + }); + + let service = OtlpGrpcService { + config: Arc::clone(&self.config), + tags_provider: Arc::clone(&self.tags_provider), + processor: self.processor.clone(), + trace_processor: Arc::clone(&self.trace_processor), + trace_tx: self.trace_tx.clone(), + stats_generator: Arc::clone(&self.stats_generator), + }; + + let cancel_token = self.cancel_token.clone(); + + debug!( + "OTLP gRPC | Starting collector on {} with max message size {} bytes", + socket, max_recv_msg_size + ); + + tonic::transport::Server::builder() + .add_service( + TraceServiceServer::new(service).max_decoding_message_size(max_recv_msg_size), + ) + .serve_with_shutdown(socket, async move { + cancel_token.cancelled().await; + debug!("OTLP gRPC | Shutdown signal received, shutting down"); + }) + .await?; + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_port_with_valid_endpoint() { + let endpoint = Some("localhost:4317".to_string()); + assert_eq!( + GrpcAgent::parse_port(endpoint.as_ref(), OTLP_AGENT_GRPC_PORT), + 4317 + ); + } + + #[test] + fn test_parse_port_with_custom_port() { + let endpoint = Some("0.0.0.0:9999".to_string()); + assert_eq!( + GrpcAgent::parse_port(endpoint.as_ref(), OTLP_AGENT_GRPC_PORT), + 9999 + ); + } + + #[test] + fn test_parse_port_with_http_scheme() { + let endpoint = Some("http://localhost:4317".to_string()); + assert_eq!( + GrpcAgent::parse_port(endpoint.as_ref(), OTLP_AGENT_GRPC_PORT), + 4317 + ); + } + + #[test] + fn test_parse_port_with_https_scheme() { + let endpoint = Some("https://localhost:4317".to_string()); + assert_eq!( + GrpcAgent::parse_port(endpoint.as_ref(), OTLP_AGENT_GRPC_PORT), + 4317 + ); + } + + #[test] + fn test_parse_port_with_invalid_port_format() { + let endpoint = Some("localhost:invalid".to_string()); + assert_eq!( + GrpcAgent::parse_port(endpoint.as_ref(), OTLP_AGENT_GRPC_PORT), + OTLP_AGENT_GRPC_PORT + ); + } + + #[test] + fn test_parse_port_with_missing_port() { + let endpoint = Some("localhost".to_string()); + assert_eq!( + GrpcAgent::parse_port(endpoint.as_ref(), OTLP_AGENT_GRPC_PORT), + OTLP_AGENT_GRPC_PORT + ); + } + + #[test] + fn test_parse_port_with_none_endpoint() { + let endpoint: Option = None; + assert_eq!( + GrpcAgent::parse_port(endpoint.as_ref(), OTLP_AGENT_GRPC_PORT), + OTLP_AGENT_GRPC_PORT + ); + } +} diff --git a/bottlecap/src/otlp/mod.rs b/bottlecap/src/otlp/mod.rs index 4e0f4aed7..4c776bd11 100644 --- a/bottlecap/src/otlp/mod.rs +++ b/bottlecap/src/otlp/mod.rs @@ -3,17 +3,26 @@ use std::sync::Arc; use crate::config::Config; pub mod agent; +pub mod grpc_agent; pub mod processor; pub mod transform; #[must_use] -pub fn should_enable_otlp_agent(config: &Arc) -> bool { +pub fn should_enable_otlp_http(config: &Arc) -> bool { config.otlp_config_traces_enabled && config .otlp_config_receiver_protocols_http_endpoint .is_some() } +#[must_use] +pub fn should_enable_otlp_grpc(config: &Arc) -> bool { + config.otlp_config_traces_enabled + && config + .otlp_config_receiver_protocols_grpc_endpoint + .is_some() +} + #[cfg(test)] mod tests { use super::*; @@ -23,7 +32,7 @@ mod tests { use crate::config::get_config; #[test] - fn test_should_enable_otlp_agent_from_yaml() { + fn test_should_enable_otlp_http_from_yaml() { figment::Jail::expect_with(|jail| { jail.clear_env(); jail.create_file( @@ -37,17 +46,17 @@ mod tests { ", )?; - let config = get_config(Path::new("")); + let config = Arc::new(get_config(Path::new(""))); - // Since the default for traces is `true`, we don't need to set it. - assert!(should_enable_otlp_agent(&Arc::new(config))); + assert!(should_enable_otlp_http(&config)); + assert!(!should_enable_otlp_grpc(&config)); Ok(()) }); } #[test] - fn test_should_enable_otlp_agent_from_env_vars() { + fn test_should_enable_otlp_http_from_env_vars() { figment::Jail::expect_with(|jail| { jail.clear_env(); jail.set_env( @@ -55,17 +64,17 @@ mod tests { "0.0.0.0:4318", ); - let config = get_config(Path::new("")); + let config = Arc::new(get_config(Path::new(""))); - // Since the default for traces is `true`, we don't need to set it. - assert!(should_enable_otlp_agent(&Arc::new(config))); + assert!(should_enable_otlp_http(&config)); + assert!(!should_enable_otlp_grpc(&config)); Ok(()) }); } #[test] - fn test_should_not_enable_otlp_agent_if_traces_are_disabled() { + fn test_should_not_enable_http_if_traces_disabled() { figment::Jail::expect_with(|jail| { jail.clear_env(); jail.set_env("DD_OTLP_CONFIG_TRACES_ENABLED", "false"); @@ -74,9 +83,67 @@ mod tests { "0.0.0.0:4318", ); - let config = get_config(Path::new("")); + let config = Arc::new(get_config(Path::new(""))); + + assert!(!should_enable_otlp_http(&config)); + + Ok(()) + }); + } + + #[test] + fn test_should_enable_otlp_grpc_from_env_vars() { + figment::Jail::expect_with(|jail| { + jail.clear_env(); + jail.set_env( + "DD_OTLP_CONFIG_RECEIVER_PROTOCOLS_GRPC_ENDPOINT", + "0.0.0.0:4317", + ); + + let config = Arc::new(get_config(Path::new(""))); + + assert!(should_enable_otlp_grpc(&config)); + assert!(!should_enable_otlp_http(&config)); + + Ok(()) + }); + } + + #[test] + fn test_should_enable_both_http_and_grpc() { + figment::Jail::expect_with(|jail| { + jail.clear_env(); + jail.set_env( + "DD_OTLP_CONFIG_RECEIVER_PROTOCOLS_HTTP_ENDPOINT", + "0.0.0.0:4318", + ); + jail.set_env( + "DD_OTLP_CONFIG_RECEIVER_PROTOCOLS_GRPC_ENDPOINT", + "0.0.0.0:4317", + ); + + let config = Arc::new(get_config(Path::new(""))); + + assert!(should_enable_otlp_http(&config)); + assert!(should_enable_otlp_grpc(&config)); + + Ok(()) + }); + } + + #[test] + fn test_should_not_enable_grpc_if_traces_disabled() { + figment::Jail::expect_with(|jail| { + jail.clear_env(); + jail.set_env("DD_OTLP_CONFIG_TRACES_ENABLED", "false"); + jail.set_env( + "DD_OTLP_CONFIG_RECEIVER_PROTOCOLS_GRPC_ENDPOINT", + "0.0.0.0:4317", + ); + + let config = Arc::new(get_config(Path::new(""))); - assert!(!should_enable_otlp_agent(&Arc::new(config))); + assert!(!should_enable_otlp_grpc(&config)); Ok(()) }); diff --git a/bottlecap/src/otlp/processor.rs b/bottlecap/src/otlp/processor.rs index cacf5202f..82e81cd7e 100644 --- a/bottlecap/src/otlp/processor.rs +++ b/bottlecap/src/otlp/processor.rs @@ -105,6 +105,25 @@ impl Processor { Self { config } } + /// Process an OTLP trace request that has already been deserialized. + /// This is the core processing logic used by both HTTP and gRPC paths. + pub fn process_request( + &self, + request: ExportTraceServiceRequest, + ) -> Result>, Box> { + let mut spans: Vec> = Vec::new(); + for resource_spans in &request.resource_spans { + spans.extend(otel_resource_spans_to_dd_spans( + resource_spans, + self.config.clone(), + )); + } + + Ok(spans) + } + + /// Process raw bytes from an HTTP request. + /// Decodes based on content-type and delegates to `process_request`. pub fn process( &self, body: &[u8], @@ -119,15 +138,7 @@ impl Processor { OtlpEncoding::Protobuf => ExportTraceServiceRequest::decode(body)?, }; - let mut spans: Vec> = Vec::new(); - for resource_spans in &request.resource_spans { - spans.extend(otel_resource_spans_to_dd_spans( - resource_spans, - self.config.clone(), - )); - } - - Ok(spans) + self.process_request(request) } } diff --git a/integration-tests/lambda/otlp-node/grpc-handler.js b/integration-tests/lambda/otlp-node/grpc-handler.js new file mode 100644 index 000000000..1eac3e380 --- /dev/null +++ b/integration-tests/lambda/otlp-node/grpc-handler.js @@ -0,0 +1,45 @@ +const { NodeTracerProvider } = require('@opentelemetry/sdk-trace-node'); +const { BatchSpanProcessor } = require('@opentelemetry/sdk-trace-base'); +const { OTLPTraceExporter } = require('@opentelemetry/exporter-trace-otlp-grpc'); +const { Resource } = require('@opentelemetry/resources'); +const { ATTR_SERVICE_NAME } = require('@opentelemetry/semantic-conventions'); +const api = require('@opentelemetry/api'); + +const resource = new Resource({ + [ATTR_SERVICE_NAME]: process.env.OTEL_SERVICE_NAME || 'otlp-grpc-node-lambda', +}); + +const provider = new NodeTracerProvider({ resource }); +const processor = new BatchSpanProcessor( + new OTLPTraceExporter({ + url: process.env.OTEL_EXPORTER_OTLP_ENDPOINT, + }) +); +provider.addSpanProcessor(processor); +provider.register(); + +api.trace.setGlobalTracerProvider(provider); + +exports.handler = async (event, context) => { + const tracer = api.trace.getTracer('otlp-grpc-node-lambda'); + + await tracer.startActiveSpan('grpc-handler', async (span) => { + try { + span.setAttribute('request_id', context.awsRequestId); + span.setAttribute('protocol', 'grpc'); + span.setAttribute('http.status_code', 200); + } catch (error) { + span.recordException(error); + span.setStatus({ code: api.SpanStatusCode.ERROR, message: error.message }); + throw error; + } finally { + span.end(); + } + }); + await provider.forceFlush(); + + return { + statusCode: 200, + body: JSON.stringify({ message: 'Success via gRPC' }) + }; +}; diff --git a/integration-tests/lambda/otlp-node/package.json b/integration-tests/lambda/otlp-node/package.json index fc31aa36a..0966f0a0c 100644 --- a/integration-tests/lambda/otlp-node/package.json +++ b/integration-tests/lambda/otlp-node/package.json @@ -5,6 +5,7 @@ "main": "index.js", "dependencies": { "@opentelemetry/api": "^1.9.0", + "@opentelemetry/exporter-trace-otlp-grpc": "^0.54.2", "@opentelemetry/exporter-trace-otlp-http": "^0.54.2", "@opentelemetry/exporter-trace-otlp-proto": "^0.54.2", "@opentelemetry/otlp-transformer": "^0.54.2", diff --git a/integration-tests/lib/stacks/otlp.ts b/integration-tests/lib/stacks/otlp.ts index 475de9987..62f2f2feb 100644 --- a/integration-tests/lib/stacks/otlp.ts +++ b/integration-tests/lib/stacks/otlp.ts @@ -40,6 +40,28 @@ export class Otlp extends cdk.Stack { nodeFunction.addToRolePolicy(defaultDatadogSecretPolicy); nodeFunction.addLayers(extensionLayer); + const nodeGrpcFunctionName = `${id}-node-grpc-lambda`; + const nodeGrpcFunction = new lambda.Function(this, nodeGrpcFunctionName, { + runtime: defaultNodeRuntime, + architecture: lambda.Architecture.ARM_64, + handler: 'grpc-handler.handler', + code: lambda.Code.fromAsset('./lambda/otlp-node'), + functionName: nodeGrpcFunctionName, + timeout: cdk.Duration.seconds(30), + memorySize: 256, + environment: { + ...defaultDatadogEnvVariables, + DD_SERVICE: nodeGrpcFunctionName, + DD_OTLP_CONFIG_RECEIVER_PROTOCOLS_GRPC_ENDPOINT: 'localhost:4317', + OTEL_EXPORTER_OTLP_ENDPOINT: 'http://localhost:4317', + OTEL_EXPORTER_OTLP_PROTOCOL: 'grpc', + OTEL_SERVICE_NAME: nodeGrpcFunctionName, + }, + logGroup: createLogGroup(this, nodeGrpcFunctionName) + }); + nodeGrpcFunction.addToRolePolicy(defaultDatadogSecretPolicy); + nodeGrpcFunction.addLayers(extensionLayer); + const validationFunctionName = `${id}-response-validation-lambda`; const validationFunction = new lambda.Function(this, validationFunctionName, { runtime: defaultNodeRuntime, diff --git a/integration-tests/tests/otlp.test.ts b/integration-tests/tests/otlp.test.ts index 19dc2eba6..f54b41823 100644 --- a/integration-tests/tests/otlp.test.ts +++ b/integration-tests/tests/otlp.test.ts @@ -12,7 +12,7 @@ describe('OTLP Integration Tests', () => { let results: Record; beforeAll(async () => { - // Build function configs for all runtimes plus response validation + // Build function configs for all runtimes plus response validation and gRPC const functions: FunctionConfig[] = [ ...runtimes.map(runtime => ({ functionName: `${stackName}-${runtime}-lambda`, @@ -22,6 +22,10 @@ describe('OTLP Integration Tests', () => { functionName: `${stackName}-response-validation-lambda`, runtime: 'responseValidation', }, + { + functionName: `${stackName}-node-grpc-lambda`, + runtime: 'nodeGrpc', + }, ]; console.log('Invoking all OTLP Lambda functions...'); @@ -84,4 +88,37 @@ describe('OTLP Integration Tests', () => { expect(hasProtobufSpan).toBe(true); }); }); + + describe('OTLP gRPC Protocol', () => { + const getResult = () => results['nodeGrpc']?.[0]?.[0]; + + it('should invoke gRPC Lambda successfully', () => { + const result = getResult(); + expect(result).toBeDefined(); + expect(result.statusCode).toBe(200); + }); + + it('should send traces via gRPC to Datadog', () => { + const result = getResult(); + expect(result).toBeDefined(); + expect(result.traces?.length).toEqual(1); + }); + + it('should have gRPC handler span with correct attributes', () => { + const result = getResult(); + expect(result).toBeDefined(); + const allSpans = result.traces?.flatMap(t => t.spans) || []; + const hasGrpcSpan = allSpans.some(s => + s.attributes?.resource_name === 'grpc-handler' && s.attributes?.custom?.protocol === 'grpc' + ); + expect(hasGrpcSpan).toBe(true); + }); + + it('should have spans in the trace', () => { + const result = getResult(); + expect(result).toBeDefined(); + const trace = result.traces![0]; + expect(trace.spans.length).toBeGreaterThan(0); + }); + }); });