Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
513f0ca
handle MSK events
joeyzhao2018 Mar 5, 2026
fe2c84b
cargo fmt and merge main
joeyzhao2018 Mar 9, 2026
1dbb7d6
fix
joeyzhao2018 Mar 9, 2026
4683724
cargo fmt
joeyzhao2018 Mar 9, 2026
e731d52
Merge branch 'main' into joey/handle_msk
joeyzhao2018 Mar 10, 2026
b13ade2
Merge branch 'main' into joey/handle_msk
joeyzhao2018 Mar 11, 2026
22b24a2
fix critical vulnerability
joeyzhao2018 Mar 11, 2026
9f9af1f
Merge branch 'main' into joey/handle_msk
joeyzhao2018 Mar 11, 2026
40cd56f
fix(msk): handle real-world header format from Java Lambda runtime
joeyzhao2018 Mar 13, 2026
9dbcb53
Merge branch 'main' into joey/handle_msk
joeyzhao2018 Mar 13, 2026
f64e583
fix(msk): fix clippy lints in msk_event header parsing
joeyzhao2018 Mar 13, 2026
46014bb
cargo fmt
joeyzhao2018 Mar 13, 2026
79fe393
Potential fix for pull request finding
joeyzhao2018 Mar 16, 2026
8df55f0
Merge branch 'main' into joey/handle_msk
joeyzhao2018 Mar 16, 2026
148d9f3
rename method and update comment
joeyzhao2018 Mar 16, 2026
9ff4e25
cargo fmt
joeyzhao2018 Mar 16, 2026
dd16016
updates based on codex suggestions
joeyzhao2018 Mar 18, 2026
26c328f
Merge branch 'main' into joey/handle_msk
joeyzhao2018 Mar 18, 2026
aa37257
find the first record with actual tracecontext and remove the unnecce…
joeyzhao2018 Mar 18, 2026
bb20187
Potential fix for pull request finding
joeyzhao2018 Mar 18, 2026
89fee19
Potential fix for pull request finding
joeyzhao2018 Mar 18, 2026
e321cf8
fixing issues from codex commits
joeyzhao2018 Mar 18, 2026
1b1427d
clippy fix
joeyzhao2018 Mar 18, 2026
3c9ab3b
codex suggested fixes
joeyzhao2018 Mar 18, 2026
509a079
cargo fmt
joeyzhao2018 Mar 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
313 changes: 297 additions & 16 deletions bottlecap/src/lifecycle/invocation/triggers/msk_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,164 @@ pub struct MSKRecord {
pub topic: String,
pub partition: i32,
pub timestamp: f64,
#[serde(default)]
pub headers: Value,
}

/// Decodes a header value into raw bytes. Two formats have been observed:
///
/// - **Array**: elements are integers `[104, 101]` or decimal strings `["49"]`
/// - **Object** with numeric string keys and decimal string values: `{"0":"52","1":"54",...}`
fn bytes_from_header_value(val: &Value) -> Option<Vec<u8>> {
match val {
// Array format: elements may be integers `[104, 101]` or decimal strings `["49"]`
Value::Array(arr) => arr
.iter()
.map(|v| match v {
Value::Number(n) => n.as_u64().and_then(|n| u8::try_from(n).ok()),
Value::String(s) => s.parse::<u8>().ok(),
_ => None,
})
.collect(),
// Object format with numeric string keys and decimal string values: `{"0":"52","1":"54",...}`
Value::Object(obj) => {
let mut pairs: Vec<(u64, u8)> = obj
.iter()
.filter_map(|(k, v)| {
Some((k.parse::<u64>().ok()?, v.as_str()?.parse::<u8>().ok()?))
})
.collect();
pairs.sort_by_key(|(idx, _)| *idx);
Some(pairs.into_iter().map(|(_, b)| b).collect())
}
_ => None,
}
}

/// Returns true if the `headers` JSON contains a trace context header key.
/// This performs a lightweight scan of the raw JSON structure without decoding
/// header values or allocating intermediate collections.
fn headers_has_trace_context(headers: &Value) -> bool {
let is_trace_context_entry = |entry: &Value| {
if let Value::Object(header_map) = entry {
header_map.keys().any(|k| {
k.eq_ignore_ascii_case("x-datadog-trace-id")
|| k.eq_ignore_ascii_case("traceparent")
})
} else {
false
}
};
match headers {
Value::Array(arr) => arr.iter().any(is_trace_context_entry),
Value::Object(obj) => obj.values().any(is_trace_context_entry),
_ => false,
}
}

/// Scans all records in the records map and returns the `(topic_key, record_value)` of the first
/// record whose headers contain a tracecontext key. Returns `None` if none found.
fn find_record_with_trace_context(
records_map: &serde_json::Map<String, Value>,
) -> Option<(String, Value)> {
for (key, group) in records_map {
match group {
Value::Array(arr) => {
for record in arr {
if let Some(headers) = record.get("headers")
&& headers_has_trace_context(headers)
{
return Some((key.clone(), record.clone()));
}
}
}
Value::Object(obj) => {
for record in obj.values() {
if let Some(headers) = record.get("headers")
&& headers_has_trace_context(headers)
{
return Some((key.clone(), record.clone()));
}
}
}
Comment on lines +79 to +103
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"The concern is real but the optimization is premature here. The Lambda payload cap is 6MB total, so even in the worst case the clone is bounded. The locator adds a non-trivial amount of complexity (new enum, more complex extraction logic) for a one-time cost per invocation on what's already a cold path. The current code is significantly easier to read and maintain. " by Claude Code

_ => {}
}
}
None
}

/// Decodes an MSK record's `headers` field into a `HashMap<String, String>` by converting
/// each header's byte values to a UTF-8 string. The `headers` field may be either a JSON
/// array or a JSON object with numeric string keys, one entry per Kafka header, ordered by index.
fn headers_to_string_map(headers: &Value) -> HashMap<String, String> {
let mut carrier = HashMap::new();

match headers {
Value::Array(arr) => {
for entry in arr {
if let Value::Object(header_map) = entry {
for (key, val) in header_map {
if let Some(bytes) = bytes_from_header_value(val)
&& let Ok(s) = String::from_utf8(bytes)
{
carrier.insert(key.to_lowercase(), s);
}
}
}
}
}
// Object format: numeric string keys are just ordering artifacts from the Java runtime;
// insertion order into the HashMap doesn't matter so no sort needed.
Value::Object(obj) => {
for entry in obj.values() {
if let Value::Object(header_map) = entry {
for (key, val) in header_map {
if let Some(bytes) = bytes_from_header_value(val)
&& let Ok(s) = String::from_utf8(bytes)
{
carrier.insert(key.to_lowercase(), s);
}
}
}
}
}
_ => {}
}

carrier
}

impl Trigger for MSKEvent {
fn new(mut payload: Value) -> Option<Self> {
// We only care about the first item in the first record, so drop the others before deserializing.
if let Some(records_map) = payload.get_mut("records").and_then(Value::as_object_mut) {
match records_map.iter_mut().next() {
Some((first_key, Value::Array(arr))) => {
arr.truncate(1);
let key = first_key.clone();
records_map.retain(|k, _| k == &key);
}
_ => {
records_map.clear();
// We only need one record: prefer the first one carrying Datadog trace context so we can
// propagate the trace, falling back to the very first record otherwise. Records may be
// delivered as a JSON object with numeric string keys; normalize to a single-element array
// before deserializing.
let chosen = payload
.get("records")
.and_then(Value::as_object)
.and_then(find_record_with_trace_context);

if let Some((chosen_key, chosen_record)) = chosen {
let records_map = payload.get_mut("records").and_then(Value::as_object_mut)?;
records_map.retain(|k, _| k == &chosen_key);
if let Some(entry) = records_map.get_mut(&chosen_key) {
*entry = Value::Array(vec![chosen_record]);
}
} else {
// Fallback: no record with Datadog trace context; normalize to the very first record
// without cloning the full record payload.
let records_map = payload.get_mut("records").and_then(Value::as_object_mut)?;
let first_key = records_map.keys().next()?.to_owned();
records_map.retain(|k, _| k == &first_key);
if let Some(entry) = records_map.get_mut(&first_key) {
match entry {
Value::Array(arr) => arr.truncate(1),
Value::Object(obj) => {
let first_record = obj.values().next()?.clone();
*entry = Value::Array(vec![first_record]);
}
_ => return None,
}
}
}
Expand All @@ -49,13 +193,16 @@ impl Trigger for MSKEvent {
}

fn is_match(payload: &Value) -> bool {
payload
let first_record_group = payload
.get("records")
.and_then(Value::as_object)
.and_then(|map| map.values().next())
.and_then(Value::as_array)
.and_then(|arr| arr.first())
.is_some_and(|rec| rec.get("topic").is_some())
.and_then(|map| map.values().next());
let first_record = match first_record_group {
Some(Value::Array(arr)) => arr.first(),
Some(Value::Object(obj)) => obj.values().next(),
_ => return false,
};
first_record.is_some_and(|rec| rec.get("topic").is_some())
}

#[allow(clippy::cast_possible_truncation)]
Expand Down Expand Up @@ -105,7 +252,12 @@ impl Trigger for MSKEvent {
}

fn get_carrier(&self) -> HashMap<String, String> {
HashMap::new()
self.records
.values()
.find_map(|arr| arr.first())
.map_or_else(HashMap::new, |record| {
headers_to_string_map(&record.headers)
})
}

fn is_async(&self) -> bool {
Expand Down Expand Up @@ -142,6 +294,7 @@ mod tests {
topic: String::from("topic1"),
partition: 0,
timestamp: 1_745_846_213_022f64,
headers: Value::Array(vec![]),
};
let mut expected_records = HashMap::new();
expected_records.insert(String::from("topic1"), vec![record]);
Expand Down Expand Up @@ -335,4 +488,132 @@ mod tests {
"msk" // fallback value
);
}

#[test]
fn test_new_with_headers() {
let json = read_json_file("msk_event_with_headers.json");
let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value");
let result = MSKEvent::new(payload).expect("Failed to deserialize into MSKEvent");

let record = result
.records
.values()
.find_map(|arr| arr.first())
.expect("Expected at least one record");
assert_eq!(record.topic, "demo-topic");
// headers is an object with 6 entries (2 non-datadog + 4 datadog)
assert_eq!(
record.headers.as_object().map(serde_json::Map::len),
Some(6)
);
}

#[test]
fn test_is_match_with_headers() {
let json = read_json_file("msk_event_with_headers.json");
let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value");

assert!(MSKEvent::is_match(&payload));
}

#[test]
fn test_new_prefers_record_with_trace_context() {
// Two records in topic1: first has no headers, second has x-datadog-trace-id.
// [49, 50, 51] = ASCII "123"
let payload = serde_json::json!({
"eventSource": "aws:kafka",
"eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/demo-cluster/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
"bootstrapServers": "b-1.demo-cluster.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
"records": {
"topic1": [
{
"topic": "topic1", "partition": 0, "offset": 100,
"timestamp": 1000.0, "timestampType": "CREATE_TIME",
"key": null, "value": null,
"headers": []
},
{
"topic": "topic1", "partition": 0, "offset": 101,
"timestamp": 2000.0, "timestampType": "CREATE_TIME",
"key": null, "value": null,
"headers": [{"x-datadog-trace-id": [49, 50, 51]}]
}
]
}
});

let event = MSKEvent::new(payload).expect("Failed to deserialize MSKEvent");
let carrier = event.get_carrier();
assert_eq!(
carrier.get("x-datadog-trace-id").map(String::as_str),
Some("123"),
"Should pick the record with trace context, not the first one"
);
}

#[test]
fn test_get_carrier_with_headers() {
let json = read_json_file("msk_event_with_headers.json");
let payload = serde_json::from_str(&json).expect("Failed to deserialize into Value");
let event = MSKEvent::new(payload).expect("Failed to deserialize MSKEvent");
let carrier = event.get_carrier();

// Datadog headers appear at indices 2-5; non-datadog headers at 0-1 are also decoded
// but won't be used by the propagator.
assert_eq!(
carrier.get("x-datadog-trace-id").map(String::as_str),
Some("1497116011738644768")
);
assert_eq!(
carrier.get("x-datadog-parent-id").map(String::as_str),
Some("2239801583077304042")
);
assert_eq!(
carrier
.get("x-datadog-sampling-priority")
.map(String::as_str),
Some("1")
);
assert_eq!(
carrier.get("x-datadog-tags").map(String::as_str),
Some("_dd.p.dm=-1,_dd.p.tid=699c836500000000")
);
}

/// Verifies that a Java-runtime-format payload (records as object with numeric string keys)
/// without any trace context falls back to the first record and deserializes successfully.
#[test]
fn test_new_java_format_no_trace_context() {
let payload = serde_json::json!({
"eventSource": "aws:kafka",
"eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/demo-cluster/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
"bootstrapServers": "b-1.demo-cluster.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
"records": {
"demo-topic-0": {
"0": {
"topic": "demo-topic", "partition": 0, "offset": 5,
"timestamp": 1000.0, "timestampType": "CREATE_TIME",
"key": null, "value": null,
"headers": {}
},
"1": {
"topic": "demo-topic", "partition": 0, "offset": 6,
"timestamp": 2000.0, "timestampType": "CREATE_TIME",
"key": null, "value": null,
"headers": {}
}
}
}
});

let event = MSKEvent::new(payload).expect("Should deserialize despite no trace context");
let record = event
.records
.values()
.find_map(|arr| arr.first())
.expect("Expected at least one record");
assert_eq!(record.topic, "demo-topic");
assert_eq!(record.partition, 0);
assert!(event.get_carrier().is_empty());
}
}
26 changes: 26 additions & 0 deletions bottlecap/tests/payloads/msk_event_with_headers.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"eventSource": "aws:kafka",
"eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/demo-cluster/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
"bootstrapServers": "b-1.demo-cluster.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-2.demo-cluster.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
"records": {
"demo-topic-0": {
"0": {
"topic": "demo-topic",
"partition": 0,
"offset": 101,
"timestamp": 1745846213022,
"timestampType": "CREATE_TIME",
"key": "b3JkZXJJZA==",
"value": "eyJvcmRlcklkIjoiMTIzNCIsImFtb3VudCI6MTAwLjAxfQ==",
"headers": {
"0": {"someId": ["70","114","111","109","66","114","117","110","111"]},
"1": {"anotherId": {"0":"55","1":"52","2":"50","3":"101","4":"101","5":"101","6":"52","7":"57","8":"45","9":"101","10":"51","11":"100","12":"55","13":"45","14":"52","15":"51","16":"52","17":"54","18":"45","19":"97","20":"54","21":"57","22":"57","23":"45","24":"52","25":"100","26":"49","27":"56","28":"101","29":"99","30":"98","31":"53","32":"53","33":"101","34":"50","35":"99"}},
"2": {"x-datadog-trace-id": {"0":"49","1":"52","2":"57","3":"55","4":"49","5":"49","6":"54","7":"48","8":"49","9":"49","10":"55","11":"51","12":"56","13":"54","14":"52","15":"52","16":"55","17":"54","18":"56"}},
"3": {"x-datadog-parent-id": {"0":"50","1":"50","2":"51","3":"57","4":"56","5":"48","6":"49","7":"53","8":"56","9":"51","10":"48","11":"55","12":"55","13":"51","14":"48","15":"52","16":"48","17":"52","18":"50"}},
"4": {"x-datadog-sampling-priority": ["49"]},
"5": {"x-datadog-tags": {"0":"95","1":"100","2":"100","3":"46","4":"112","5":"46","6":"100","7":"109","8":"61","9":"45","10":"49","11":"44","12":"95","13":"100","14":"100","15":"46","16":"112","17":"46","18":"116","19":"105","20":"100","21":"61","22":"54","23":"57","24":"57","25":"99","26":"56","27":"51","28":"54","29":"53","30":"48","31":"48","32":"48","33":"48","34":"48","35":"48","36":"48","37":"48"}}
}
}
}
}
}
Loading