From 513f0ca91f47e66d007e338f1c2e0464dc1305a7 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Thu, 5 Mar 2026 15:56:52 -0500 Subject: [PATCH 01/19] handle MSK events --- .../invocation/triggers/msk_event.rs | 47 ++++++++++++++++++- .../payloads/msk_event_with_headers.json | 23 +++++++++ 2 files changed, 69 insertions(+), 1 deletion(-) create mode 100644 bottlecap/tests/payloads/msk_event_with_headers.json diff --git a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs index 91bd85d17..d8e9dba36 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs @@ -21,6 +21,8 @@ pub struct MSKRecord { pub topic: String, pub partition: i32, pub timestamp: f64, + #[serde(default)] + pub headers: Vec>>, } impl Trigger for MSKEvent { @@ -105,7 +107,17 @@ impl Trigger for MSKEvent { } fn get_carrier(&self) -> HashMap { - HashMap::new() + let mut carrier = HashMap::new(); + if let Some(record) = self.records.values().find_map(|arr| arr.first()) { + for header_map in &record.headers { + for (key, value_bytes) in header_map { + if let Ok(value_str) = String::from_utf8(value_bytes.clone()) { + carrier.insert(key.to_lowercase(), value_str); + } + } + } + } + carrier } fn is_async(&self) -> bool { @@ -142,6 +154,7 @@ mod tests { topic: String::from("topic1"), partition: 0, timestamp: 1_745_846_213_022f64, + headers: vec![], }; let mut expected_records = HashMap::new(); expected_records.insert(String::from("topic1"), vec![record]); @@ -335,4 +348,36 @@ mod tests { "msk" // fallback value ); } + + #[test] + fn test_new_with_headers() { + let json = read_json_file("msk_event_with_headers.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let result = MSKEvent::new(payload).expect("Failed to deserialize into MSKEvent"); + + let record = result.records.values().find_map(|arr| arr.first()).unwrap(); + assert_eq!(record.topic, "topic1"); + assert_eq!(record.headers.len(), 3); + } + + #[test] + fn test_get_carrier_with_headers() { + let json = read_json_file("msk_event_with_headers.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + let event = MSKEvent::new(payload).expect("Failed to deserialize MSKEvent"); + let carrier = event.get_carrier(); + + assert_eq!( + carrier.get("x-datadog-trace-id").map(String::as_str), + Some("36979754430890456950") + ); + assert_eq!( + carrier.get("x-datadog-parent-id").map(String::as_str), + Some("7431398482019833808") + ); + assert_eq!( + carrier.get("x-datadog-sampling-priority").map(String::as_str), + Some("1") + ); + } } diff --git a/bottlecap/tests/payloads/msk_event_with_headers.json b/bottlecap/tests/payloads/msk_event_with_headers.json new file mode 100644 index 000000000..b5a6e6238 --- /dev/null +++ b/bottlecap/tests/payloads/msk_event_with_headers.json @@ -0,0 +1,23 @@ +{ + "eventSource": "aws:kafka", + "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/demo-cluster/751d2973-a626-431c-9d4e-d7975eb44dd7-2", + "bootstrapServers": "b-1.demo-cluster.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-2.demo-cluster.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", + "records": { + "topic1": [ + { + "topic": "topic1", + "partition": 0, + "offset": 101, + "timestamp": 1745846213022, + "timestampType":"CREATE_TIME", + "key": "b3JkZXJJZA==", + "value": "eyJvcmRlcklkIjoiMTIzNCIsImFtb3VudCI6MTAwLjAxfQ==", + "headers": [ + {"x-datadog-trace-id": [51, 54, 57, 55, 57, 55, 53, 52, 52, 51, 48, 56, 57, 48, 52, 53, 54, 57, 53, 48]}, + {"x-datadog-parent-id": [55, 52, 51, 49, 51, 57, 56, 52, 56, 50, 48, 49, 57, 56, 51, 51, 56, 48, 56]}, + {"x-datadog-sampling-priority": [49]} + ] + } + ] + } +} From fe2c84b99a7ed832441d060bd83fa976ef9f0eef Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Mon, 9 Mar 2026 15:20:19 -0400 Subject: [PATCH 02/19] cargo fmt and merge main --- bottlecap/src/lifecycle/invocation/triggers/msk_event.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs index d8e9dba36..cb5cf8796 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs @@ -376,7 +376,9 @@ mod tests { Some("7431398482019833808") ); assert_eq!( - carrier.get("x-datadog-sampling-priority").map(String::as_str), + carrier + .get("x-datadog-sampling-priority") + .map(String::as_str), Some("1") ); } From 1dbb7d6c8dc9939466f8797de8f1b9b627290b8e Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Mon, 9 Mar 2026 15:32:35 -0400 Subject: [PATCH 03/19] fix --- bottlecap/src/lifecycle/invocation/triggers/msk_event.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs index cb5cf8796..58ae9da97 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs @@ -355,7 +355,7 @@ mod tests { let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); let result = MSKEvent::new(payload).expect("Failed to deserialize into MSKEvent"); - let record = result.records.values().find_map(|arr| arr.first()).unwrap(); + let record = result.records.values().find_map(|arr| arr.first()).expect("Expected at least one record"); assert_eq!(record.topic, "topic1"); assert_eq!(record.headers.len(), 3); } From 4683724ea818605ad5e472fadcbbc18035733676 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Mon, 9 Mar 2026 15:33:38 -0400 Subject: [PATCH 04/19] cargo fmt --- bottlecap/src/lifecycle/invocation/triggers/msk_event.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs index 58ae9da97..723a60bbb 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs @@ -355,7 +355,11 @@ mod tests { let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); let result = MSKEvent::new(payload).expect("Failed to deserialize into MSKEvent"); - let record = result.records.values().find_map(|arr| arr.first()).expect("Expected at least one record"); + let record = result + .records + .values() + .find_map(|arr| arr.first()) + .expect("Expected at least one record"); assert_eq!(record.topic, "topic1"); assert_eq!(record.headers.len(), 3); } From 22b24a23b5e5c2979147192b277122757ccbcbff Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Wed, 11 Mar 2026 13:44:58 -0400 Subject: [PATCH 05/19] fix critical vulnerability --- bottlecap/Cargo.lock | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock index 37517a6ae..cb3b02ab8 100644 --- a/bottlecap/Cargo.lock +++ b/bottlecap/Cargo.lock @@ -383,7 +383,7 @@ dependencies = [ "bitflags 2.10.0", "cexpr", "clang-sys", - "itertools 0.13.0", + "itertools 0.11.0", "log", "prettyplease", "proc-macro2", @@ -1640,15 +1640,6 @@ dependencies = [ "either", ] -[[package]] -name = "itertools" -version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" -dependencies = [ - "either", -] - [[package]] name = "itertools" version = "0.14.0" @@ -2585,9 +2576,9 @@ dependencies = [ [[package]] name = "quinn-proto" -version = "0.11.13" +version = "0.11.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31" +checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098" dependencies = [ "bytes", "getrandom 0.3.4", From 40cd56f4aba77f436e89e019abdbfd2fa30a7af7 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Thu, 12 Mar 2026 23:38:13 -0400 Subject: [PATCH 06/19] fix(msk): handle real-world header format from Java Lambda runtime MSK event headers delivered by the Java Lambda runtime use a JSON object with numeric string keys and decimal string values rather than an array of integers. Records are similarly delivered as an object with numeric string keys instead of an array. Update deserialization and carrier extraction to support both formats, and update the fixture and tests to reflect the real-world payload shape. Co-Authored-By: Claude Sonnet 4.6 --- .../invocation/triggers/msk_event.rs | 139 ++++++++++++++---- .../payloads/msk_event_with_headers.json | 29 ++-- 2 files changed, 123 insertions(+), 45 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs index 723a60bbb..04250bbd6 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs @@ -22,23 +22,95 @@ pub struct MSKRecord { pub partition: i32, pub timestamp: f64, #[serde(default)] - pub headers: Vec>>, + pub headers: Value, +} + +/// Decodes a header value into raw bytes. Two formats have been observed: +/// +/// - **Array**: elements are integers `[104, 101]` or decimal strings `["49"]` +/// - **Object** with numeric string keys and decimal string values: `{"0":"52","1":"54",...}` +fn bytes_from_header_value(val: &Value) -> Option> { + match val { + // Array format: elements may be integers `[104, 101]` or decimal strings `["49"]` + Value::Array(arr) => arr + .iter() + .map(|v| match v { + Value::Number(n) => n.as_u64().map(|n| n as u8), + Value::String(s) => s.parse::().ok(), + _ => None, + }) + .collect(), + // Object format with numeric string keys and decimal string values: `{"0":"52","1":"54",...}` + Value::Object(obj) => { + let mut pairs: Vec<(u64, u8)> = obj + .iter() + .filter_map(|(k, v)| { + Some((k.parse::().ok()?, v.as_str()?.parse::().ok()?)) + }) + .collect(); + pairs.sort_by_key(|(idx, _)| *idx); + Some(pairs.into_iter().map(|(_, b)| b).collect()) + } + _ => None, + } +} + +/// Extracts trace propagation headers from an MSK record's `headers` field into a carrier map. +/// The `headers` field is a JSON object with numeric string keys, one entry per Kafka header. +fn carrier_from_headers(headers: &Value) -> HashMap { + let mut carrier = HashMap::new(); + + let entries: Vec<&Value> = match headers { + Value::Array(arr) => arr.iter().collect(), + Value::Object(obj) => { + let mut pairs: Vec<(u64, &Value)> = obj + .iter() + .filter_map(|(k, v)| k.parse::().ok().map(|n| (n, v))) + .collect(); + pairs.sort_by_key(|(n, _)| *n); + pairs.into_iter().map(|(_, v)| v).collect() + } + _ => return carrier, + }; + + for entry in entries { + if let Value::Object(header_map) = entry { + for (key, val) in header_map { + if let Some(bytes) = bytes_from_header_value(val) { + if let Ok(s) = String::from_utf8(bytes) { + carrier.insert(key.to_lowercase(), s); + } + } + } + } + } + + carrier } impl Trigger for MSKEvent { fn new(mut payload: Value) -> Option { - // We only care about the first item in the first record, so drop the others before deserializing. + // We only care about the first item in the first record, so drop the others before + // deserializing. Records are delivered as a JSON object with numeric string keys; + // normalize to a single-element array before deserializing. if let Some(records_map) = payload.get_mut("records").and_then(Value::as_object_mut) { - match records_map.iter_mut().next() { - Some((first_key, Value::Array(arr))) => { - arr.truncate(1); - let key = first_key.clone(); - records_map.retain(|k, _| k == &key); - } - _ => { - records_map.clear(); + let first_key = records_map.keys().next()?.clone(); + let normalized = match records_map.get(&first_key)? { + Value::Array(arr) => Value::Array(vec![arr.first()?.clone()]), + Value::Object(obj) => { + // Records delivered as object with numeric string keys: {"0": {...}, "1": {...}, ...} + // Take the record with the lowest numeric key. + let mut pairs: Vec<(u64, Value)> = obj + .iter() + .filter_map(|(k, v)| k.parse::().ok().map(|n| (n, v.clone()))) + .collect(); + pairs.sort_by_key(|(n, _)| *n); + let (_, first_record) = pairs.into_iter().next()?; + Value::Array(vec![first_record]) } - } + _ => return None, + }; + *records_map = serde_json::Map::from_iter([(first_key, normalized)]); } match serde_json::from_value::(payload) { @@ -51,13 +123,16 @@ impl Trigger for MSKEvent { } fn is_match(payload: &Value) -> bool { - payload + let first_record_group = payload .get("records") .and_then(Value::as_object) - .and_then(|map| map.values().next()) - .and_then(Value::as_array) - .and_then(|arr| arr.first()) - .is_some_and(|rec| rec.get("topic").is_some()) + .and_then(|map| map.values().next()); + let first_record = match first_record_group { + Some(Value::Array(arr)) => arr.first(), + Some(Value::Object(obj)) => obj.values().next(), + _ => return false, + }; + first_record.is_some_and(|rec| rec.get("topic").is_some()) } #[allow(clippy::cast_possible_truncation)] @@ -107,17 +182,10 @@ impl Trigger for MSKEvent { } fn get_carrier(&self) -> HashMap { - let mut carrier = HashMap::new(); - if let Some(record) = self.records.values().find_map(|arr| arr.first()) { - for header_map in &record.headers { - for (key, value_bytes) in header_map { - if let Ok(value_str) = String::from_utf8(value_bytes.clone()) { - carrier.insert(key.to_lowercase(), value_str); - } - } - } - } - carrier + self.records + .values() + .find_map(|arr| arr.first()) + .map_or_else(HashMap::new, |record| carrier_from_headers(&record.headers)) } fn is_async(&self) -> bool { @@ -154,7 +222,7 @@ mod tests { topic: String::from("topic1"), partition: 0, timestamp: 1_745_846_213_022f64, - headers: vec![], + headers: Value::Array(vec![]), }; let mut expected_records = HashMap::new(); expected_records.insert(String::from("topic1"), vec![record]); @@ -360,8 +428,9 @@ mod tests { .values() .find_map(|arr| arr.first()) .expect("Expected at least one record"); - assert_eq!(record.topic, "topic1"); - assert_eq!(record.headers.len(), 3); + assert_eq!(record.topic, "demo-topic"); + // headers is an object with 6 entries (2 non-datadog + 4 datadog) + assert_eq!(record.headers.as_object().map(|o| o.len()), Some(6)); } #[test] @@ -371,13 +440,15 @@ mod tests { let event = MSKEvent::new(payload).expect("Failed to deserialize MSKEvent"); let carrier = event.get_carrier(); + // Datadog headers appear at indices 2-5; non-datadog headers at 0-1 are also decoded + // but won't be used by the propagator. assert_eq!( carrier.get("x-datadog-trace-id").map(String::as_str), - Some("36979754430890456950") + Some("1497116011738644768") ); assert_eq!( carrier.get("x-datadog-parent-id").map(String::as_str), - Some("7431398482019833808") + Some("2239801583077304042") ); assert_eq!( carrier @@ -385,5 +456,9 @@ mod tests { .map(String::as_str), Some("1") ); + assert_eq!( + carrier.get("x-datadog-tags").map(String::as_str), + Some("_dd.p.dm=-1,_dd.p.tid=699c836500000000") + ); } } diff --git a/bottlecap/tests/payloads/msk_event_with_headers.json b/bottlecap/tests/payloads/msk_event_with_headers.json index b5a6e6238..939658526 100644 --- a/bottlecap/tests/payloads/msk_event_with_headers.json +++ b/bottlecap/tests/payloads/msk_event_with_headers.json @@ -3,21 +3,24 @@ "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/demo-cluster/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "b-1.demo-cluster.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-2.demo-cluster.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records": { - "topic1": [ - { - "topic": "topic1", - "partition": 0, - "offset": 101, - "timestamp": 1745846213022, - "timestampType":"CREATE_TIME", + "demo-topic-0": { + "0": { + "topic": "demo-topic", + "partition": 0, + "offset": 101, + "timestamp": 1745846213022, + "timestampType": "CREATE_TIME", "key": "b3JkZXJJZA==", "value": "eyJvcmRlcklkIjoiMTIzNCIsImFtb3VudCI6MTAwLjAxfQ==", - "headers": [ - {"x-datadog-trace-id": [51, 54, 57, 55, 57, 55, 53, 52, 52, 51, 48, 56, 57, 48, 52, 53, 54, 57, 53, 48]}, - {"x-datadog-parent-id": [55, 52, 51, 49, 51, 57, 56, 52, 56, 50, 48, 49, 57, 56, 51, 51, 56, 48, 56]}, - {"x-datadog-sampling-priority": [49]} - ] + "headers": { + "0": {"someId": ["70","114","111","109","66","114","117","110","111"]}, + "1": {"anotherId": {"0":"55","1":"52","2":"50","3":"101","4":"101","5":"101","6":"52","7":"57","8":"45","9":"101","10":"51","11":"100","12":"55","13":"45","14":"52","15":"51","16":"52","17":"54","18":"45","19":"97","20":"54","21":"57","22":"57","23":"45","24":"52","25":"100","26":"49","27":"56","28":"101","29":"99","30":"98","31":"53","32":"53","33":"101","34":"50","35":"99"}}, + "2": {"x-datadog-trace-id": {"0":"49","1":"52","2":"57","3":"55","4":"49","5":"49","6":"54","7":"48","8":"49","9":"49","10":"55","11":"51","12":"56","13":"54","14":"52","15":"52","16":"55","17":"54","18":"56"}}, + "3": {"x-datadog-parent-id": {"0":"50","1":"50","2":"51","3":"57","4":"56","5":"48","6":"49","7":"53","8":"56","9":"51","10":"48","11":"55","12":"55","13":"51","14":"48","15":"52","16":"48","17":"52","18":"50"}}, + "4": {"x-datadog-sampling-priority": ["49"]}, + "5": {"x-datadog-tags": {"0":"95","1":"100","2":"100","3":"46","4":"112","5":"46","6":"100","7":"109","8":"61","9":"45","10":"49","11":"44","12":"95","13":"100","14":"100","15":"46","16":"112","17":"46","18":"116","19":"105","20":"100","21":"61","22":"54","23":"57","24":"57","25":"99","26":"56","27":"51","28":"54","29":"53","30":"48","31":"48","32":"48","33":"48","34":"48","35":"48","36":"48","37":"48"}} + } } - ] + } } } From f64e58340da53b5fed765a4c8c29ab23755483db Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Thu, 12 Mar 2026 23:55:15 -0400 Subject: [PATCH 07/19] fix(msk): fix clippy lints in msk_event header parsing - Replace `n as u8` cast with `u8::try_from(n).ok()` to avoid truncation - Collapse nested `if let` blocks into a single `if let ... && let ...` - Replace redundant closure `|o| o.len()` with `serde_json::Map::len` Co-Authored-By: Claude Sonnet 4.6 --- .../src/lifecycle/invocation/triggers/msk_event.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs index 04250bbd6..1d265b203 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs @@ -35,7 +35,7 @@ fn bytes_from_header_value(val: &Value) -> Option> { Value::Array(arr) => arr .iter() .map(|v| match v { - Value::Number(n) => n.as_u64().map(|n| n as u8), + Value::Number(n) => n.as_u64().and_then(|n| u8::try_from(n).ok()), Value::String(s) => s.parse::().ok(), _ => None, }) @@ -76,10 +76,10 @@ fn carrier_from_headers(headers: &Value) -> HashMap { for entry in entries { if let Value::Object(header_map) = entry { for (key, val) in header_map { - if let Some(bytes) = bytes_from_header_value(val) { - if let Ok(s) = String::from_utf8(bytes) { - carrier.insert(key.to_lowercase(), s); - } + if let Some(bytes) = bytes_from_header_value(val) + && let Ok(s) = String::from_utf8(bytes) + { + carrier.insert(key.to_lowercase(), s); } } } @@ -430,7 +430,7 @@ mod tests { .expect("Expected at least one record"); assert_eq!(record.topic, "demo-topic"); // headers is an object with 6 entries (2 non-datadog + 4 datadog) - assert_eq!(record.headers.as_object().map(|o| o.len()), Some(6)); + assert_eq!(record.headers.as_object().map(serde_json::Map::len), Some(6)); } #[test] From 46014bb888a0801c2a1d6d3b8ed2019b9a9bdb12 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Fri, 13 Mar 2026 00:31:59 -0400 Subject: [PATCH 08/19] cargo fmt --- bottlecap/src/lifecycle/invocation/triggers/msk_event.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs index 1d265b203..1bf2e3db8 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs @@ -430,7 +430,10 @@ mod tests { .expect("Expected at least one record"); assert_eq!(record.topic, "demo-topic"); // headers is an object with 6 entries (2 non-datadog + 4 datadog) - assert_eq!(record.headers.as_object().map(serde_json::Map::len), Some(6)); + assert_eq!( + record.headers.as_object().map(serde_json::Map::len), + Some(6) + ); } #[test] From 79fe393c96659ca1f57fcf3191ba664d229f4f68 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Mon, 16 Mar 2026 12:26:37 -0400 Subject: [PATCH 09/19] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- bottlecap/src/lifecycle/invocation/triggers/msk_event.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs index 1bf2e3db8..c3f663302 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs @@ -436,6 +436,14 @@ mod tests { ); } + #[test] + fn test_is_match_with_headers() { + let json = read_json_file("msk_event_with_headers.json"); + let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value"); + + assert!(MSKEvent::is_match(&payload)); + } + #[test] fn test_get_carrier_with_headers() { let json = read_json_file("msk_event_with_headers.json"); From 148d9f31770a82a07c8e3d6a41ee36e4a830b25f Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Mon, 16 Mar 2026 13:17:02 -0400 Subject: [PATCH 10/19] rename method and update comment --- bottlecap/src/lifecycle/invocation/triggers/msk_event.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs index c3f663302..350e6db6a 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs @@ -55,9 +55,10 @@ fn bytes_from_header_value(val: &Value) -> Option> { } } -/// Extracts trace propagation headers from an MSK record's `headers` field into a carrier map. -/// The `headers` field is a JSON object with numeric string keys, one entry per Kafka header. -fn carrier_from_headers(headers: &Value) -> HashMap { +/// Decodes an MSK record's `headers` field into a `HashMap` by converting +/// each header's byte values to a UTF-8 string. The `headers` field may be either a JSON +/// array or a JSON object with numeric string keys, one entry per Kafka header, ordered by index. +fn headers_to_string_map(headers: &Value) -> HashMap { let mut carrier = HashMap::new(); let entries: Vec<&Value> = match headers { @@ -185,7 +186,7 @@ impl Trigger for MSKEvent { self.records .values() .find_map(|arr| arr.first()) - .map_or_else(HashMap::new, |record| carrier_from_headers(&record.headers)) + .map_or_else(HashMap::new, |record| headers_to_string_map(&record.headers)) } fn is_async(&self) -> bool { From 9ff4e256b630c0d6c5ee7b97b7493b888bdcefac Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Mon, 16 Mar 2026 13:22:44 -0400 Subject: [PATCH 11/19] cargo fmt --- bottlecap/src/lifecycle/invocation/triggers/msk_event.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs index 350e6db6a..4389cf448 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs @@ -56,7 +56,7 @@ fn bytes_from_header_value(val: &Value) -> Option> { } /// Decodes an MSK record's `headers` field into a `HashMap` by converting -/// each header's byte values to a UTF-8 string. The `headers` field may be either a JSON +/// each header's byte values to a UTF-8 string. The `headers` field may be either a JSON /// array or a JSON object with numeric string keys, one entry per Kafka header, ordered by index. fn headers_to_string_map(headers: &Value) -> HashMap { let mut carrier = HashMap::new(); @@ -186,7 +186,9 @@ impl Trigger for MSKEvent { self.records .values() .find_map(|arr| arr.first()) - .map_or_else(HashMap::new, |record| headers_to_string_map(&record.headers)) + .map_or_else(HashMap::new, |record| { + headers_to_string_map(&record.headers) + }) } fn is_async(&self) -> bool { From dd1601612f77cab9a5f2edfad9301317f9c3c262 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Wed, 18 Mar 2026 10:39:52 -0400 Subject: [PATCH 12/19] updates based on codex suggestions --- .../invocation/triggers/msk_event.rs | 30 +++++++++---------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs index 4389cf448..0c4cd1f49 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs @@ -92,26 +92,24 @@ fn headers_to_string_map(headers: &Value) -> HashMap { impl Trigger for MSKEvent { fn new(mut payload: Value) -> Option { // We only care about the first item in the first record, so drop the others before - // deserializing. Records are delivered as a JSON object with numeric string keys; + // deserializing. Records may be delivered as a JSON object with numeric string keys; // normalize to a single-element array before deserializing. if let Some(records_map) = payload.get_mut("records").and_then(Value::as_object_mut) { let first_key = records_map.keys().next()?.clone(); - let normalized = match records_map.get(&first_key)? { - Value::Array(arr) => Value::Array(vec![arr.first()?.clone()]), - Value::Object(obj) => { - // Records delivered as object with numeric string keys: {"0": {...}, "1": {...}, ...} - // Take the record with the lowest numeric key. - let mut pairs: Vec<(u64, Value)> = obj - .iter() - .filter_map(|(k, v)| k.parse::().ok().map(|n| (n, v.clone()))) - .collect(); - pairs.sort_by_key(|(n, _)| *n); - let (_, first_record) = pairs.into_iter().next()?; - Value::Array(vec![first_record]) + { + let entry = records_map.get_mut(&first_key)?; + match entry { + // Truncate in place — no clone needed since payload is already mutable. + Value::Array(arr) => arr.truncate(1), + // Object format: extract the first record and replace with a single-element array. + Value::Object(obj) => { + let first_record = obj.values().next()?.clone(); + *entry = Value::Array(vec![first_record]); + } + _ => return None, } - _ => return None, - }; - *records_map = serde_json::Map::from_iter([(first_key, normalized)]); + } + records_map.retain(|k, _| k == &first_key); } match serde_json::from_value::(payload) { From aa372574313f4d4af6493bd6dca66eea4f8e432e Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Wed, 18 Mar 2026 11:22:07 -0400 Subject: [PATCH 13/19] find the first record with actual tracecontext and remove the unneccessary sort in `headers_to_string_map` --- .../invocation/triggers/msk_event.rs | 110 +++++++++++++----- 1 file changed, 84 insertions(+), 26 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs index 0c4cd1f49..83d3794f7 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs @@ -55,6 +55,28 @@ fn bytes_from_header_value(val: &Value) -> Option> { } } +/// Scans all records in the records map and returns the `(topic_key, record_value)` of the first +/// record whose headers contain a tracecontext key. Returns `None` if none found. +fn find_record_with_trace_context( + records_map: &serde_json::Map, +) -> Option<(String, Value)> { + for (key, group) in records_map { + let records: Vec<&Value> = match group { + Value::Array(arr) => arr.iter().collect(), + Value::Object(obj) => obj.values().collect(), + _ => continue, + }; + for record in records { + let headers = record.get("headers").unwrap_or(&Value::Null); + let carrier = headers_to_string_map(headers); + if carrier.contains_key("x-datadog-trace-id") || carrier.contains_key("traceparent") { + return Some((key.clone(), record.clone())); + } + } + } + None +} + /// Decodes an MSK record's `headers` field into a `HashMap` by converting /// each header's byte values to a UTF-8 string. The `headers` field may be either a JSON /// array or a JSON object with numeric string keys, one entry per Kafka header, ordered by index. @@ -63,14 +85,9 @@ fn headers_to_string_map(headers: &Value) -> HashMap { let entries: Vec<&Value> = match headers { Value::Array(arr) => arr.iter().collect(), - Value::Object(obj) => { - let mut pairs: Vec<(u64, &Value)> = obj - .iter() - .filter_map(|(k, v)| k.parse::().ok().map(|n| (n, v))) - .collect(); - pairs.sort_by_key(|(n, _)| *n); - pairs.into_iter().map(|(_, v)| v).collect() - } + // Object format: numeric string keys are just ordering artifacts from the Java runtime; + // insertion order into the HashMap doesn't matter so no sort needed. + Value::Object(obj) => obj.values().collect(), _ => return carrier, }; @@ -91,25 +108,31 @@ fn headers_to_string_map(headers: &Value) -> HashMap { impl Trigger for MSKEvent { fn new(mut payload: Value) -> Option { - // We only care about the first item in the first record, so drop the others before - // deserializing. Records may be delivered as a JSON object with numeric string keys; - // normalize to a single-element array before deserializing. - if let Some(records_map) = payload.get_mut("records").and_then(Value::as_object_mut) { - let first_key = records_map.keys().next()?.clone(); - { - let entry = records_map.get_mut(&first_key)?; - match entry { - // Truncate in place — no clone needed since payload is already mutable. - Value::Array(arr) => arr.truncate(1), - // Object format: extract the first record and replace with a single-element array. - Value::Object(obj) => { - let first_record = obj.values().next()?.clone(); - *entry = Value::Array(vec![first_record]); - } - _ => return None, - } + // We only need one record: prefer the first one carrying Datadog trace context so we can + // propagate the trace, falling back to the very first record otherwise. Records may be + // delivered as a JSON object with numeric string keys; normalize to a single-element array + // before deserializing. + let chosen = payload + .get("records") + .and_then(Value::as_object) + .and_then(|records_map| { + find_record_with_trace_context(records_map).or_else(|| { + let (first_key, group) = records_map.iter().next()?; + let first_record = match group { + Value::Array(arr) => arr.first()?.clone(), + Value::Object(obj) => obj.values().next()?.clone(), + _ => return None, + }; + Some((first_key.clone(), first_record)) + }) + }); + + if let Some((chosen_key, chosen_record)) = chosen { + let records_map = payload.get_mut("records").and_then(Value::as_object_mut)?; + records_map.retain(|k, _| k == &chosen_key); + if let Some(entry) = records_map.get_mut(&chosen_key) { + *entry = Value::Array(vec![chosen_record]); } - records_map.retain(|k, _| k == &first_key); } match serde_json::from_value::(payload) { @@ -445,6 +468,41 @@ mod tests { assert!(MSKEvent::is_match(&payload)); } + #[test] + fn test_new_prefers_record_with_trace_context() { + // Two records in topic1: first has no headers, second has x-datadog-trace-id. + // [49, 50, 51] = ASCII "123" + let payload = serde_json::json!({ + "eventSource": "aws:kafka", + "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/demo-cluster/751d2973-a626-431c-9d4e-d7975eb44dd7-2", + "bootstrapServers": "b-1.demo-cluster.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", + "records": { + "topic1": [ + { + "topic": "topic1", "partition": 0, "offset": 100, + "timestamp": 1000.0, "timestampType": "CREATE_TIME", + "key": null, "value": null, + "headers": [] + }, + { + "topic": "topic1", "partition": 0, "offset": 101, + "timestamp": 2000.0, "timestampType": "CREATE_TIME", + "key": null, "value": null, + "headers": [{"x-datadog-trace-id": [49, 50, 51]}] + } + ] + } + }); + + let event = MSKEvent::new(payload).expect("Failed to deserialize MSKEvent"); + let carrier = event.get_carrier(); + assert_eq!( + carrier.get("x-datadog-trace-id").map(String::as_str), + Some("123"), + "Should pick the record with trace context, not the first one" + ); + } + #[test] fn test_get_carrier_with_headers() { let json = read_json_file("msk_event_with_headers.json"); From bb201878ed523fcbed2b1e14d13afc00190c8e0e Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Wed, 18 Mar 2026 11:34:54 -0400 Subject: [PATCH 14/19] Potential fix for pull request finding performance enhancement from codex Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- .../invocation/triggers/msk_event.rs | 95 ++++++++++++++----- 1 file changed, 72 insertions(+), 23 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs index 83d3794f7..5bd52f36e 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs @@ -55,23 +55,60 @@ fn bytes_from_header_value(val: &Value) -> Option> { } } +/// Returns true if the `headers` JSON contains a trace context header key. +/// This performs a lightweight scan of the raw JSON structure without decoding +/// header values or allocating intermediate collections. +fn headers_has_trace_context(headers: &Value) -> bool { + // The `headers` field may be either: + // - an array of header entries + // - an object with numeric string keys mapping to header entries + let iter: Box + '_> = match headers { + Value::Array(arr) => Box::new(arr.iter()), + Value::Object(obj) => Box::new(obj.values()), + _ => return false, + }; + + for entry in iter { + if let Value::Object(header_map) = entry { + for key in header_map.keys() { + if key.eq_ignore_ascii_case("x-datadog-trace-id") + || key.eq_ignore_ascii_case("traceparent") + { + return true; + } + } + } + } + + false +} + /// Scans all records in the records map and returns the `(topic_key, record_value)` of the first /// record whose headers contain a tracecontext key. Returns `None` if none found. fn find_record_with_trace_context( records_map: &serde_json::Map, ) -> Option<(String, Value)> { for (key, group) in records_map { - let records: Vec<&Value> = match group { - Value::Array(arr) => arr.iter().collect(), - Value::Object(obj) => obj.values().collect(), - _ => continue, - }; - for record in records { - let headers = record.get("headers").unwrap_or(&Value::Null); - let carrier = headers_to_string_map(headers); - if carrier.contains_key("x-datadog-trace-id") || carrier.contains_key("traceparent") { - return Some((key.clone(), record.clone())); + match group { + Value::Array(arr) => { + for record in arr { + if let Some(headers) = record.get("headers") { + if headers_has_trace_context(headers) { + return Some((key.clone(), record.clone())); + } + } + } } + Value::Object(obj) => { + for record in obj.values() { + if let Some(headers) = record.get("headers") { + if headers_has_trace_context(headers) { + return Some((key.clone(), record.clone())); + } + } + } + } + _ => continue, } } None @@ -83,24 +120,36 @@ fn find_record_with_trace_context( fn headers_to_string_map(headers: &Value) -> HashMap { let mut carrier = HashMap::new(); - let entries: Vec<&Value> = match headers { - Value::Array(arr) => arr.iter().collect(), + match headers { + Value::Array(arr) => { + for entry in arr { + if let Value::Object(header_map) = entry { + for (key, val) in header_map { + if let Some(bytes) = bytes_from_header_value(val) + && let Ok(s) = String::from_utf8(bytes) + { + carrier.insert(key.to_lowercase(), s); + } + } + } + } + } // Object format: numeric string keys are just ordering artifacts from the Java runtime; // insertion order into the HashMap doesn't matter so no sort needed. - Value::Object(obj) => obj.values().collect(), - _ => return carrier, - }; - - for entry in entries { - if let Value::Object(header_map) = entry { - for (key, val) in header_map { - if let Some(bytes) = bytes_from_header_value(val) - && let Ok(s) = String::from_utf8(bytes) - { - carrier.insert(key.to_lowercase(), s); + Value::Object(obj) => { + for entry in obj.values() { + if let Value::Object(header_map) = entry { + for (key, val) in header_map { + if let Some(bytes) = bytes_from_header_value(val) + && let Ok(s) = String::from_utf8(bytes) + { + carrier.insert(key.to_lowercase(), s); + } + } } } } + _ => {} } carrier From 89fee193fa84e82336ae72597237647fea72208c Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Wed, 18 Mar 2026 11:36:16 -0400 Subject: [PATCH 15/19] Potential fix for pull request finding performance gain during normalization Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- .../invocation/triggers/msk_event.rs | 44 ++++++++++++++----- 1 file changed, 33 insertions(+), 11 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs index 5bd52f36e..883d7b100 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs @@ -164,17 +164,7 @@ impl Trigger for MSKEvent { let chosen = payload .get("records") .and_then(Value::as_object) - .and_then(|records_map| { - find_record_with_trace_context(records_map).or_else(|| { - let (first_key, group) = records_map.iter().next()?; - let first_record = match group { - Value::Array(arr) => arr.first()?.clone(), - Value::Object(obj) => obj.values().next()?.clone(), - _ => return None, - }; - Some((first_key.clone(), first_record)) - }) - }); + .and_then(|records_map| find_record_with_trace_context(records_map)); if let Some((chosen_key, chosen_record)) = chosen { let records_map = payload.get_mut("records").and_then(Value::as_object_mut)?; @@ -182,6 +172,38 @@ impl Trigger for MSKEvent { if let Some(entry) = records_map.get_mut(&chosen_key) { *entry = Value::Array(vec![chosen_record]); } + } else { + // Fallback: no record with Datadog trace context; normalize to the very first record + // without cloning the full record payload. + let records_map = payload.get_mut("records").and_then(Value::as_object_mut)?; + if let Some((first_key, group)) = records_map.iter_mut().next() { + let chosen_key = first_key.clone(); + // Retain only the chosen topic/partition group. + records_map.retain(|k, _| k == &chosen_key); + match group { + Value::Array(arr) => { + if !arr.is_empty() { + // Move the first record out, drop the rest. + let first = arr.swap_remove(0); + arr.clear(); + arr.push(first); + } + } + Value::Object(obj) => { + if let Some((subkey, val)) = obj.iter_mut().next() { + // Move the first record out under its original key, drop the rest. + let first = std::mem::take(val); + let subkey_cloned = subkey.clone(); + obj.clear(); + obj.insert(subkey_cloned, first); + } + } + _ => { + // Non-array and non-object groups are left as-is, but only the first + // outer key is retained above. + } + } + } } match serde_json::from_value::(payload) { From e321cf858e16c6236bf73ef2f7f0b251c8fea71c Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Wed, 18 Mar 2026 11:41:08 -0400 Subject: [PATCH 16/19] fixing issues from codex commits --- bottlecap/src/lifecycle/invocation/triggers/msk_event.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs index 883d7b100..c857a1835 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs @@ -176,10 +176,9 @@ impl Trigger for MSKEvent { // Fallback: no record with Datadog trace context; normalize to the very first record // without cloning the full record payload. let records_map = payload.get_mut("records").and_then(Value::as_object_mut)?; - if let Some((first_key, group)) = records_map.iter_mut().next() { - let chosen_key = first_key.clone(); - // Retain only the chosen topic/partition group. - records_map.retain(|k, _| k == &chosen_key); + let first_key = records_map.keys().next()?.to_owned(); + records_map.retain(|k, _| k == &first_key); + if let Some(group) = records_map.get_mut(&first_key) { match group { Value::Array(arr) => { if !arr.is_empty() { From 1b1427d6fb980c2e40e216b8b106cc1b43dff9c2 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Wed, 18 Mar 2026 11:52:21 -0400 Subject: [PATCH 17/19] clippy fix --- .../invocation/triggers/msk_event.rs | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs index c857a1835..3670f6510 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs @@ -92,23 +92,23 @@ fn find_record_with_trace_context( match group { Value::Array(arr) => { for record in arr { - if let Some(headers) = record.get("headers") { - if headers_has_trace_context(headers) { - return Some((key.clone(), record.clone())); - } + if let Some(headers) = record.get("headers") + && headers_has_trace_context(headers) + { + return Some((key.clone(), record.clone())); } } } Value::Object(obj) => { for record in obj.values() { - if let Some(headers) = record.get("headers") { - if headers_has_trace_context(headers) { - return Some((key.clone(), record.clone())); - } + if let Some(headers) = record.get("headers") + && headers_has_trace_context(headers) + { + return Some((key.clone(), record.clone())); } } } - _ => continue, + _ => {} } } None @@ -164,7 +164,7 @@ impl Trigger for MSKEvent { let chosen = payload .get("records") .and_then(Value::as_object) - .and_then(|records_map| find_record_with_trace_context(records_map)); + .and_then(find_record_with_trace_context); if let Some((chosen_key, chosen_record)) = chosen { let records_map = payload.get_mut("records").and_then(Value::as_object_mut)?; From 3c9ab3bcfef4b28a1baa4177dfc1a272fe404bf6 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Wed, 18 Mar 2026 12:30:31 -0400 Subject: [PATCH 18/19] codex suggested fixes --- .../invocation/triggers/msk_event.rs | 92 +++++++++++-------- 1 file changed, 52 insertions(+), 40 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs index 3670f6510..8e46b83d8 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs @@ -59,28 +59,21 @@ fn bytes_from_header_value(val: &Value) -> Option> { /// This performs a lightweight scan of the raw JSON structure without decoding /// header values or allocating intermediate collections. fn headers_has_trace_context(headers: &Value) -> bool { - // The `headers` field may be either: - // - an array of header entries - // - an object with numeric string keys mapping to header entries - let iter: Box + '_> = match headers { - Value::Array(arr) => Box::new(arr.iter()), - Value::Object(obj) => Box::new(obj.values()), - _ => return false, - }; - - for entry in iter { + let is_trace_context_entry = |entry: &Value| { if let Value::Object(header_map) = entry { - for key in header_map.keys() { - if key.eq_ignore_ascii_case("x-datadog-trace-id") - || key.eq_ignore_ascii_case("traceparent") - { - return true; - } - } + header_map.keys().any(|k| { + k.eq_ignore_ascii_case("x-datadog-trace-id") + || k.eq_ignore_ascii_case("traceparent") + }) + } else { + false } + }; + match headers { + Value::Array(arr) => arr.iter().any(is_trace_context_entry), + Value::Object(obj) => obj.values().any(is_trace_context_entry), + _ => false, } - - false } /// Scans all records in the records map and returns the `(topic_key, record_value)` of the first @@ -178,29 +171,14 @@ impl Trigger for MSKEvent { let records_map = payload.get_mut("records").and_then(Value::as_object_mut)?; let first_key = records_map.keys().next()?.to_owned(); records_map.retain(|k, _| k == &first_key); - if let Some(group) = records_map.get_mut(&first_key) { - match group { - Value::Array(arr) => { - if !arr.is_empty() { - // Move the first record out, drop the rest. - let first = arr.swap_remove(0); - arr.clear(); - arr.push(first); - } - } + if let Some(entry) = records_map.get_mut(&first_key) { + match entry { + Value::Array(arr) => arr.truncate(1), Value::Object(obj) => { - if let Some((subkey, val)) = obj.iter_mut().next() { - // Move the first record out under its original key, drop the rest. - let first = std::mem::take(val); - let subkey_cloned = subkey.clone(); - obj.clear(); - obj.insert(subkey_cloned, first); - } - } - _ => { - // Non-array and non-object groups are left as-is, but only the first - // outer key is retained above. + let first_record = obj.values().next()?.clone(); + *entry = Value::Array(vec![first_record]); } + _ => return None, } } } @@ -601,4 +579,38 @@ mod tests { Some("_dd.p.dm=-1,_dd.p.tid=699c836500000000") ); } + + /// Verifies that a Java-runtime-format payload (records as object with numeric string keys) + /// without any trace context falls back to the first record and deserializes successfully. + #[test] + fn test_new_java_format_no_trace_context() { + let payload = serde_json::json!({ + "eventSource": "aws:kafka", + "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/demo-cluster/751d2973-a626-431c-9d4e-d7975eb44dd7-2", + "bootstrapServers": "b-1.demo-cluster.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", + "records": { + "demo-topic-0": { + "0": { + "topic": "demo-topic", "partition": 0, "offset": 5, + "timestamp": 1000.0, "timestampType": "CREATE_TIME", + "key": null, "value": null, + "headers": {} + }, + "1": { + "topic": "demo-topic", "partition": 0, "offset": 6, + "timestamp": 2000.0, "timestampType": "CREATE_TIME", + "key": null, "value": null, + "headers": {} + } + } + } + }); + + let event = MSKEvent::new(payload).expect("Should deserialize despite no trace context"); + let record = event.records.values().find_map(|arr| arr.first()) + .expect("Expected at least one record"); + assert_eq!(record.topic, "demo-topic"); + assert_eq!(record.partition, 0); + assert!(event.get_carrier().is_empty()); + } } From 509a079772f0d880cdca091ed619a3ccddd5e440 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Wed, 18 Mar 2026 12:31:58 -0400 Subject: [PATCH 19/19] cargo fmt --- bottlecap/src/lifecycle/invocation/triggers/msk_event.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs index 8e46b83d8..c36607d18 100644 --- a/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs +++ b/bottlecap/src/lifecycle/invocation/triggers/msk_event.rs @@ -607,7 +607,10 @@ mod tests { }); let event = MSKEvent::new(payload).expect("Should deserialize despite no trace context"); - let record = event.records.values().find_map(|arr| arr.first()) + let record = event + .records + .values() + .find_map(|arr| arr.first()) .expect("Expected at least one record"); assert_eq!(record.topic, "demo-topic"); assert_eq!(record.partition, 0);