Conversation
d33b0e2 to
fe2c84b
Compare
MSK event headers delivered by the Java Lambda runtime use a JSON object with numeric string keys and decimal string values rather than an array of integers. Records are similarly delivered as an object with numeric string keys instead of an array. Update deserialization and carrier extraction to support both formats, and update the fixture and tests to reflect the real-world payload shape. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Replace `n as u8` cast with `u8::try_from(n).ok()` to avoid truncation - Collapse nested `if let` blocks into a single `if let ... && let ...` - Replace redundant closure `|o| o.len()` with `serde_json::Map::len` Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
why the changes here?
it's just trying to fix the test failures. I also put this change in a separate pr here #1099 if you prefer that way.
There was a problem hiding this comment.
Pull request overview
This PR adds support for MSK (Managed Streaming for Apache Kafka) events where records and headers are delivered in a JSON object format with numeric string keys (observed with the Java Lambda runtime), in addition to the existing array format. It extracts Datadog trace context from Kafka headers encoded as byte arrays (decimal byte values), enabling distributed tracing for MSK-triggered Lambda functions.
Changes:
- Added
bytes_from_header_valueandcarrier_from_headersfunctions to decode Kafka header byte values (both array and object formats) and extract trace propagation headers into a carrier map. - Updated
MSKEvent::newandis_matchto handle both array and object record formats from different Lambda runtimes. - Added a new test payload (
msk_event_with_headers.json) and tests validating deserialization and carrier extraction for the object-format MSK events.
Reviewed changes
Copilot reviewed 2 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
bottlecap/src/lifecycle/invocation/triggers/msk_event.rs |
Core logic: header decoding, carrier extraction, updated new/is_match for object-format records, new tests |
bottlecap/tests/payloads/msk_event_with_headers.json |
New test payload representing Java runtime MSK event with Datadog trace headers |
bottlecap/Cargo.lock |
Routine dependency updates (itertools, quinn-proto) |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
https://datadoghq.atlassian.net/browse/SLES-2739
In Kafka's wire protocol (KIP-82), header values are always byte[]. Every Kafka client library enforces this:
All three tracers accept string trace context values from the propagation layer, convert to UTF-8 bytes at the carrier adapter boundary, and hand byte[] to the Kafka client.
This isn't a quirk of Java's getBytes() — it's the only way Kafka headers work.
What MSK Lambda does
When MSK triggers a Lambda, AWS serializes the Kafka record to JSON. Since header values are byte[] on the wire, AWS encodes them as decimal byte values. However, the exact JSON
shape depends on the Lambda runtime:
"headers": [{"x-datadog-trace-id": [51, 54, 57, ...]}]
decimal strings
"records": {
"topic-0": {
"0": {
"headers": {
"0": {"someOtherHeader": ["70", "114", ...]},
"2": {"x-datadog-trace-id": {"0":"52","1":"54",...}},
"4": {"x-datadog-sampling-priority": ["49"]}
}
}
}
}
What's the difference between the msk_event.json and the newly added
msk_event_with_headers.jsonhere?as Kafka headers. It includes non-Datadog headers at lower indices to verify that the carrier extraction correctly finds Datadog headers regardless of their position. (source: I did a real world example and below is the evidence of testing)