
feat(triggers): handle MSK events #1066

Open

joeyzhao2018 wants to merge 16 commits into main from joey/handle_msk

Conversation

Contributor

@joeyzhao2018 joeyzhao2018 commented Mar 5, 2026

https://datadoghq.atlassian.net/browse/SLES-2739

In Kafka's wire protocol (KIP-82), header values are always byte[]. Every Kafka client library enforces this:

| Tracer | Injection code | Mechanism |
| --- | --- | --- |
| dd-trace-java | `headers.add(key, value.getBytes(UTF_8))` | `String.getBytes()` → `byte[]` |
| dd-trace-go | `Value: []byte(val)` | Go type conversion → `[]byte` |
| dd-trace-dotnet | `_headers.Add(name, Encoding.UTF8.GetBytes(value))` | `UTF8.GetBytes()` → `byte[]` |

All three tracers accept string trace context values from the propagation layer, convert to UTF-8 bytes at the carrier adapter boundary, and hand byte[] to the Kafka client.
This isn't a quirk of Java's getBytes() — it's the only way Kafka headers work.
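The conversion at the carrier adapter boundary can be sketched in Rust (the function name here is illustrative, not taken from any tracer):

```rust
// Sketch: how a tracer turns a trace-context string into the byte value
// Kafka headers require (KIP-82). Mirrors dd-trace-java's
// `value.getBytes(UTF_8)`; Rust strings are already UTF-8, so this is a
// plain byte copy.
fn header_value_from_str(value: &str) -> Vec<u8> {
    value.as_bytes().to_vec()
}

fn main() {
    let trace_id = "3676979524296277455";
    let bytes = header_value_from_str(trace_id);
    // AWS later serializes these bytes as decimal values in the JSON event,
    // e.g. '3' -> 51, '6' -> 54, '7' -> 55, ...
    println!("{:?}", bytes);
    assert_eq!(String::from_utf8(bytes).unwrap(), trace_id);
}
```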

What MSK Lambda does

When MSK triggers a Lambda, AWS serializes the Kafka record to JSON. Since header values are byte[] on the wire, AWS encodes them as decimal byte values. However, the exact JSON
shape depends on the Lambda runtime:

  • Array format (observed in the existing msk_event.json test payloads; I left support for this unchanged, to be safe): byte values appear as a JSON array of integers
    "headers": [{"x-datadog-trace-id": [51, 54, 57, ...]}]
  • Object format (observed with the Java Lambda runtime): both the records list and the per-header byte values are JSON objects with numeric string keys, and byte values are
    decimal strings
    "records": {
      "topic-0": {
        "0": {
          "headers": {
            "0": {"someOtherHeader": ["70", "114", ...]},
            "2": {"x-datadog-trace-id": {"0":"52","1":"54",...}},
            "4": {"x-datadog-sampling-priority": ["49"]}
          }
        }
      }
    }
  • Note that Datadog headers can appear at any index — non-instrumentation headers may precede them.
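A minimal sketch of decoding both shapes. The types here are hypothetical stand-ins, not the PR's actual serde model (the real code works over deserialized JSON values; a plain enum keeps the example self-contained):

```rust
use std::collections::BTreeMap;

// Hypothetical stand-in for a decoded MSK header value: either a JSON
// array of integers or an object with numeric string keys and decimal
// string values, as described above.
enum HeaderValue {
    Array(Vec<i64>),                  // [51, 54, ...]
    Object(BTreeMap<String, String>), // {"0": "52", "1": "54", ...}
}

fn bytes_from_header_value(value: &HeaderValue) -> Option<Vec<u8>> {
    match value {
        // Array format: each number must fit in a byte.
        HeaderValue::Array(nums) => {
            nums.iter().map(|&n| u8::try_from(n).ok()).collect()
        }
        // Object format: keys are positions, values are decimal strings.
        HeaderValue::Object(map) => {
            // Lexicographic key order would put "10" before "2", so sort
            // by the numeric value of the key instead.
            let mut entries: Vec<_> = map.iter().collect();
            entries.sort_by_key(|(k, _)| k.parse::<u64>().unwrap_or(u64::MAX));
            entries.iter().map(|(_, v)| v.parse::<u8>().ok()).collect()
        }
    }
}

fn main() {
    let arr = HeaderValue::Array(vec![52, 54]);
    let mut m = BTreeMap::new();
    m.insert("0".to_string(), "52".to_string());
    m.insert("1".to_string(), "54".to_string());
    let obj = HeaderValue::Object(m);
    // Both formats decode to the same bytes, i.e. the UTF-8 string "46".
    assert_eq!(bytes_from_header_value(&arr), Some(vec![52, 54]));
    assert_eq!(bytes_from_header_value(&obj), Some(vec![52, 54]));
}
```

Once the bytes are recovered, they convert back to the UTF-8 string the producer's tracer originally injected.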

What's the difference between the msk_event.json and the newly added msk_event_with_headers.json here?

  • msk_event.json represents a standard MSK trigger where the producer didn't attach any Kafka headers, i.e. no Datadog tracer was running on the producer side (or the producer was non-instrumented, such as a raw Kafka client, a Kinesis Firehose delivery stream, or a schema-registry message). In those cases Lambda still delivers the event, but with "headers": []. It's also the format you get when testing MSK triggers manually in the AWS console, which doesn't inject headers. (source: Claude Code)
  • msk_event_with_headers.json reflects the real-world object format produced by the Java Lambda runtime when a producer instrumented with a Datadog tracer injects trace context as Kafka headers. It includes non-Datadog headers at lower indices to verify that carrier extraction finds the Datadog headers regardless of their position. (source: I ran a real-world test; the screenshot below is the evidence)
Screenshot 2026-03-12 at 11 14 33 PM

joeyzhao2018 and others added 10 commits March 9, 2026 15:32
MSK event headers delivered by the Java Lambda runtime use a JSON object
with numeric string keys and decimal string values rather than an array
of integers. Records are similarly delivered as an object with numeric
string keys instead of an array.

Update deserialization and carrier extraction to support both formats,
and update the fixture and tests to reflect the real-world payload shape.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Replace `n as u8` cast with `u8::try_from(n).ok()` to avoid truncation
- Collapse nested `if let` blocks into a single `if let ... && let ...`
- Replace redundant closure `|o| o.len()` with `serde_json::Map::len`

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
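The motivation for the first refactor above is visible in a tiny example: `as u8` silently wraps out-of-range values, while `u8::try_from(..).ok()` rejects them, so a corrupt header byte becomes a `None` instead of a wrong byte:

```rust
fn main() {
    let n: i64 = 300;
    // The cast truncates: 300 wraps modulo 256 to a bogus byte.
    assert_eq!(n as u8, 44);
    // try_from rejects out-of-range values instead of corrupting them.
    assert_eq!(u8::try_from(n).ok(), None);
    // In-range values pass through unchanged.
    assert_eq!(u8::try_from(65i64).ok(), Some(65u8));
}
```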
Contributor Author


why the changes here?

It's just fixing the test failures. I also put this change in a separate PR, #1099, if you prefer it that way.

@joeyzhao2018 joeyzhao2018 marked this pull request as ready for review March 13, 2026 15:45
@joeyzhao2018 joeyzhao2018 requested a review from a team as a code owner March 13, 2026 15:45
@joeyzhao2018 joeyzhao2018 requested a review from duncanista March 13, 2026 15:45
@duncanista duncanista changed the title feature: handle MSK events feat(triggers): handle MSK events Mar 13, 2026
@duncanista duncanista requested a review from Copilot March 16, 2026 16:07
Contributor

Copilot AI left a comment


Pull request overview

This PR adds support for MSK (Managed Streaming for Apache Kafka) events where records and headers are delivered in a JSON object format with numeric string keys (observed with the Java Lambda runtime), in addition to the existing array format. It extracts Datadog trace context from Kafka headers encoded as byte arrays (decimal byte values), enabling distributed tracing for MSK-triggered Lambda functions.

Changes:

  • Added bytes_from_header_value and carrier_from_headers functions to decode Kafka header byte values (both array and object formats) and extract trace propagation headers into a carrier map.
  • Updated MSKEvent::new and is_match to handle both array and object record formats from different Lambda runtimes.
  • Added a new test payload (msk_event_with_headers.json) and tests validating deserialization and carrier extraction for the object-format MSK events.

Reviewed changes

Copilot reviewed 2 out of 3 changed files in this pull request and generated 2 comments.

| File | Description |
| --- | --- |
| bottlecap/src/lifecycle/invocation/triggers/msk_event.rs | Core logic: header decoding, carrier extraction, updated `new`/`is_match` for object-format records, new tests |
| bottlecap/tests/payloads/msk_event_with_headers.json | New test payload representing a Java runtime MSK event with Datadog trace headers |
| bottlecap/Cargo.lock | Routine dependency updates (itertools, quinn-proto) |



joeyzhao2018 and others added 4 commits March 16, 2026 12:26

3 participants