feat(worker): Add TaskWorker mode for TaskBroker RPC #582

Open
NicoHinderling wants to merge 10 commits into main from 03-12-feat_worker_add_taskworker_mode_for_taskbroker_rpc

NicoHinderling (Contributor) commented Mar 13, 2026

Summary

Adds a new TaskWorker mode (launchpad worker) where Launchpad receives tasks via TaskBroker RPC instead of Kafka. This is a lighter-weight execution mode with no HTTP server — the TaskBroker handles task distribution and dispatches work to the worker via gRPC.

Depends on #581 (decouples ArtifactProcessor from Kafka message schema).

What's included

  • src/launchpad/worker/: New module — TaskbrokerApp config, WorkerConfig with multi-host support, process_artifact task registration, in-memory dedup store
  • src/launchpad/cli.py: Adds worker CLI command
  • requirements.txt: Adds taskbroker-client dependency
  • .envrc: Adds LAUNCHPAD_WORKER_RPC_HOST and LAUNCHPAD_WORKER_CONCURRENCY env vars
  • Makefile: Adds make worker target
  • Docs: Updates CLAUDE.md and README.md to document both operational modes
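
As a rough illustration of the in-memory dedup store mentioned in the list above, here is a minimal sketch. The class name, the `add` method, the TTL behaviour, and the injectable clock are all assumptions made for illustration; the store in this PR may be shaped quite differently.

```python
import threading
import time
from typing import Callable, Dict


class InMemoryDedupStore:
    """Hypothetical dedup store: remembers each key for ttl_seconds."""

    def __init__(
        self,
        ttl_seconds: float = 3600.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._expiry: Dict[str, float] = {}
        self._lock = threading.Lock()

    def add(self, key: str) -> bool:
        """Return True if key was newly recorded, False if a live entry exists."""
        now = self._clock()
        with self._lock:
            # Drop expired entries so the dict does not grow without bound.
            expired = [k for k, deadline in self._expiry.items() if deadline <= now]
            for k in expired:
                del self._expiry[k]
            if key in self._expiry:
                return False
            self._expiry[key] = now + self._ttl
            return True
```

A store like this only deduplicates within a single process, so it would not catch duplicates delivered to different worker processes or pods.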

Production readiness fixes

  • Kafka producer reads KAFKA_BOOTSTRAP_SERVERS env var (was hardcoded to 127.0.0.1:9092)
  • broker_hosts supports comma-separated hosts for HA
  • TODO added for health_check_file_path (K8s file-based liveness probes)
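
The environment-driven settings above could be parsed roughly as follows. This is a sketch, not the PR's code: `WorkerConfigSketch`, `parse_hosts`, and `kafka_bootstrap_servers` are hypothetical names, and the default concurrency of 1 and the fallback to `127.0.0.1:9092` are assumptions.

```python
import os
from dataclasses import dataclass
from typing import List, Mapping


def parse_hosts(raw: str) -> List[str]:
    """Split a comma-separated host list, ignoring blanks and whitespace."""
    return [host.strip() for host in raw.split(",") if host.strip()]


@dataclass(frozen=True)
class WorkerConfigSketch:
    """Hypothetical stand-in for the PR's WorkerConfig."""

    rpc_hosts: List[str]
    concurrency: int

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "WorkerConfigSketch":
        hosts = parse_hosts(env.get("LAUNCHPAD_WORKER_RPC_HOST", ""))
        if not hosts:
            raise ValueError("LAUNCHPAD_WORKER_RPC_HOST must list at least one host")
        # Assumption: default to a single worker when the variable is unset.
        concurrency = int(env.get("LAUNCHPAD_WORKER_CONCURRENCY", "1"))
        if concurrency < 1:
            raise ValueError("LAUNCHPAD_WORKER_CONCURRENCY must be >= 1")
        return cls(rpc_hosts=hosts, concurrency=concurrency)


def kafka_bootstrap_servers(env: Mapping[str, str] = os.environ) -> str:
    # Assumption: the previously hardcoded address is kept as the fallback.
    return env.get("KAFKA_BOOTSTRAP_SERVERS", "127.0.0.1:9092")
```

Passing the environment in as a mapping keeps the parsing testable without touching `os.environ`.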

Kafka-side skip for taskworker-only projects

  • src/launchpad/kafka.py: Adds _should_skip_kafka_processing() — checks PROJECT_IDS_TO_ONLY_TRY_TASKWORKER_PROCESSING env var and skips Kafka processing for listed project IDs so they are only handled via the TaskWorker path
  • tests/unit/test_kafka.py: New unit tests covering skip/no-skip/empty/unset/integer-project-id cases
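
For readers who want a concrete picture of the skip check, a minimal sketch follows. It assumes the env var holds a comma-separated list of project IDs, which the description does not state, and the function names are illustrative rather than the PR's actual `_should_skip_kafka_processing()`.

```python
import os
from typing import FrozenSet, Mapping, Optional, Union

ENV_VAR = "PROJECT_IDS_TO_ONLY_TRY_TASKWORKER_PROCESSING"


def parse_project_ids(raw: Optional[str]) -> FrozenSet[str]:
    """Assumed format: comma-separated IDs, e.g. "12, 34"."""
    return frozenset(part.strip() for part in (raw or "").split(",") if part.strip())


def should_skip_kafka_processing(
    project_id: Union[str, int],
    env: Mapping[str, str] = os.environ,
) -> bool:
    # Compare as strings so an integer project_id still matches the list.
    return str(project_id) in parse_project_ids(env.get(ENV_VAR))
```

The cases named in the test bullet (skip, no-skip, empty, unset, integer project ID) map directly onto calls with different `env` mappings.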

Test plan

  • make check passes
  • make test-unit passes (including new test_worker_config.py and test_kafka.py)
  • make worker starts without errors locally

NicoHinderling commented Mar 13, 2026

sentry bot commented Mar 13, 2026

Sentry Build Distribution

| App Name | App ID | Version | Configuration | Install Page |
|---|---|---|---|---|
| Hacker News | com.emergetools.hackernews | 1.0.2 (13) | Release | Install Build |

@NicoHinderling NicoHinderling force-pushed the 03-12-ref_processor_decouple_artifactprocessor_from_kafka_message_schema branch from fcce11c to fd1fdde Compare March 13, 2026 04:29
@NicoHinderling NicoHinderling force-pushed the 03-12-feat_worker_add_taskworker_mode_for_taskbroker_rpc branch 2 times, most recently from 0217fd3 to dddb238 Compare March 13, 2026 05:01
@NicoHinderling NicoHinderling changed the base branch from 03-12-ref_processor_decouple_artifactprocessor_from_kafka_message_schema to graphite-base/582 March 13, 2026 05:07
@NicoHinderling NicoHinderling marked this pull request as ready for review March 13, 2026 05:29
@NicoHinderling NicoHinderling force-pushed the 03-12-feat_worker_add_taskworker_mode_for_taskbroker_rpc branch from dddb238 to 62b8424 Compare March 13, 2026 05:39
@NicoHinderling NicoHinderling changed the base branch from graphite-base/582 to 03-12-ref_processor_decouple_artifactprocessor_from_kafka_message_schema March 13, 2026 05:39
markstory added a commit to getsentry/taskbroker that referenced this pull request Mar 13, 2026
I missed fixing some logger names when porting code from sentry.

Refs getsentry/launchpad#582

app = TaskbrokerApp(
    name="launchpad",
    producer_factory=producer_factory,
    router_class=CustomRouter(),
Member

We'll want worker runtime metrics too. You'll need a taskbroker_client.metrics.MetricsBackend implementation for that.

Contributor Author

Alright, took a pass at implementing this 🙏

Member

That looks ok to me. Does launchpad not have an existing metrics API that you could wrap?

Contributor Author

Yes, this is our current metrics implementation: https://github.com/getsentry/launchpad/blob/main/src/launchpad/utils/statsd.py

I went ahead and updated this branch so we don't duplicate logic.
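
To illustrate the "wrap the existing metrics API" idea from this thread, here is a small adapter sketch. Everything in it is hypothetical: the `incr`/`timing` method names are not taken from `taskbroker_client.metrics.MetricsBackend`, and `StatsdLike` is an assumed shape, not launchpad's actual statsd helper.

```python
from typing import Dict, List, Optional, Protocol, Tuple


class StatsdLike(Protocol):
    """Assumed shape of an existing statsd-style client (illustrative only)."""

    def increment(self, name: str, value: int, tags: Optional[List[str]]) -> None: ...

    def timing(self, name: str, value: float, tags: Optional[List[str]]) -> None: ...


class WorkerMetricsAdapter:
    """Forwards worker metrics to an existing client instead of re-implementing it."""

    def __init__(self, client: StatsdLike, prefix: str = "taskworker") -> None:
        self._client = client
        self._prefix = prefix

    @staticmethod
    def _format_tags(tags: Optional[Dict[str, str]]) -> Optional[List[str]]:
        # Convert {"k": "v"} into the "k:v" list form many statsd clients expect.
        if not tags:
            return None
        return [f"{key}:{value}" for key, value in sorted(tags.items())]

    def incr(self, name: str, value: int = 1, tags: Optional[Dict[str, str]] = None) -> None:
        self._client.increment(f"{self._prefix}.{name}", value, self._format_tags(tags))

    def timing(self, name: str, value: float, tags: Optional[Dict[str, str]] = None) -> None:
        self._client.timing(f"{self._prefix}.{name}", value, self._format_tags(tags))


class RecordingStatsd:
    """Test double that records calls instead of emitting metrics."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, str, float, Optional[List[str]]]] = []

    def increment(self, name: str, value: int, tags: Optional[List[str]]) -> None:
        self.calls.append(("increment", name, value, tags))

    def timing(self, name: str, value: float, tags: Optional[List[str]]) -> None:
        self.calls.append(("timing", name, value, tags))
```

Keeping the adapter thin means tag formatting and emission stay in one place, which is the duplication concern raised above.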


logger.info(f"Starting TaskWorker (rpc_hosts={config.rpc_hosts}, concurrency={config.concurrency})")

# TODO: Should we explore setting health_check_file_path for K8s file-based liveness probes (TaskWorker has no HTTP server)
Member

Yes, the healthcheck file is important as sometimes multiprocessing queues deadlock, and without the healthcheck file the worker will freeze.
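
The general shape of a file-based liveness check can be sketched as below. This is an assumption about the pattern, not about how the taskbroker client uses `health_check_file_path`: the worker touches a file while it makes progress, and a probe treats a stale modification time as a hung worker.

```python
import time
from pathlib import Path
from typing import Optional


def touch_health_file(path: Path) -> None:
    """Called by the worker loop while it is making progress."""
    path.touch()  # creates the file or refreshes its modification time


def is_alive(path: Path, max_age_seconds: float, now: Optional[float] = None) -> bool:
    """Probe side: healthy only if the file exists and was touched recently."""
    try:
        mtime = path.stat().st_mtime
    except FileNotFoundError:
        return False
    current = time.time() if now is None else now
    return (current - mtime) <= max_age_seconds
```

If the worker deadlocks, the touches stop, the file goes stale, and an exec-style probe running a check like `is_alive` can fail so the orchestrator restarts the process.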

Contributor Author

result_queue_maxsize=config.concurrency * 2,
rebalance_after=32,
processing_pool_name="launchpad",
process_type="forkserver",
Member

I've had problems with forkserver + sentry, as the forking would time out. Hopefully it works well for launchpad though, as it provides measurable memory savings.

Contributor Author

NicoHinderling Mar 13, 2026

(We synced offline and can also consider spawn, which is what sentry uses, as an alternative. Regarding the issues encountered with forkserver: "the child processes wouldn't start, they would timeout on a socket inside multiprocessing.")
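
If the start method ends up being configurable, the choice between forkserver and spawn could be validated along these lines. The helper name `resolve_process_type` and the fall-back-to-spawn policy are assumptions for illustration; only the standard-library `multiprocessing` calls are real.

```python
import multiprocessing


def resolve_process_type(requested: str, fallback: str = "spawn") -> str:
    """Return `requested` if this platform supports it, otherwise `fallback`.

    "spawn" is available on every platform; "forkserver" is not (for
    example, it is missing on Windows).
    """
    available = multiprocessing.get_all_start_methods()
    if requested in available:
        return requested
    if fallback in available:
        return fallback
    raise RuntimeError(f"no usable start method among {available}")


# A context scopes the start method without changing the global default.
context = multiprocessing.get_context(resolve_process_type("forkserver"))
```

This only guards against an unsupported method name. It does not detect the socket timeout described above, which would still need to be observed at runtime.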

@NicoHinderling NicoHinderling force-pushed the 03-12-feat_worker_add_taskworker_mode_for_taskbroker_rpc branch from 0beb9b8 to c172651 Compare March 13, 2026 19:07
@NicoHinderling NicoHinderling force-pushed the 03-12-ref_processor_decouple_artifactprocessor_from_kafka_message_schema branch from 8f447fb to 77d269c Compare March 13, 2026 19:07
cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.


Base automatically changed from 03-12-ref_processor_decouple_artifactprocessor_from_kafka_message_schema to main March 13, 2026 19:15
NicoHinderling and others added 5 commits March 13, 2026 12:16
Ensures project_id is converted to string before checking the
taskworker-only allowlist, matching the Kafka consumer behavior.
Prevents type mismatch if TaskBroker RPC deserializes project_id
as an integer.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
NicoHinderling and others added 4 commits March 13, 2026 12:16
_TASKWORKER_ONLY_PROJECT_IDS is computed at module import time, so
patching os.environ after import has no effect. Patch the set directly.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Disambiguate from the identically-named class in utils/arroyo_metrics.py
which implements the Arroyo Metrics protocol.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
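
The commit note above about `_TASKWORKER_ONLY_PROJECT_IDS` being computed at import time can be demonstrated with a small self-contained sketch. The `demo_kafka` module and `should_skip` function are stand-ins built in memory for illustration, and the comma-separated format is an assumption; only the constant and env var names come from this PR.

```python
import os
import types
from unittest import mock

ENV_VAR = "PROJECT_IDS_TO_ONLY_TRY_TASKWORKER_PROCESSING"

# Stand-in for a module that reads the env var once, at import time.
MODULE_SOURCE = '''
import os

_TASKWORKER_ONLY_PROJECT_IDS = frozenset(
    p.strip()
    for p in os.environ.get("PROJECT_IDS_TO_ONLY_TRY_TASKWORKER_PROCESSING", "").split(",")
    if p.strip()
)


def should_skip(project_id):
    return str(project_id) in _TASKWORKER_ONLY_PROJECT_IDS
'''


def load_demo_module() -> types.ModuleType:
    """Simulate importing the module while the env var is unset."""
    module = types.ModuleType("demo_kafka")
    with mock.patch.dict(os.environ):
        os.environ.pop(ENV_VAR, None)
        exec(MODULE_SOURCE, module.__dict__)
    return module


demo = load_demo_module()

# Patching the environment after import does nothing: the set is already built.
with mock.patch.dict(os.environ, {ENV_VAR: "42"}):
    env_patch_result = demo.should_skip(42)

# Patching the module-level set directly is what the function actually reads.
with mock.patch.object(demo, "_TASKWORKER_ONLY_PROJECT_IDS", frozenset({"42"})):
    set_patch_result = demo.should_skip(42)
```

Here `env_patch_result` is `False` and `set_patch_result` is `True`, which matches the reasoning in the commit message.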
@NicoHinderling NicoHinderling force-pushed the 03-12-feat_worker_add_taskworker_mode_for_taskbroker_rpc branch from 5b4d191 to e7fae19 Compare March 13, 2026 19:16
markstory added a commit to getsentry/taskbroker that referenced this pull request Mar 16, 2026
* fix(client) Align logger names with package name

I missed fixing some logger names when porting code from sentry.

Refs getsentry/launchpad#582

* Align logging scopes more

* Align on __name__ as that is the convention we use most often