feat(worker): Add TaskWorker mode for TaskBroker RPC#582
NicoHinderling wants to merge 10 commits into main
    app = TaskbrokerApp(
        name="launchpad",
        producer_factory=producer_factory,
        router_class=CustomRouter(),
We'll want worker runtime metrics too. You'll need a taskbroker_client.metrics.MetricsBackend implementation for that.
Alright, I took a pass at implementing this 🙏
That looks ok to me. Does launchpad not have an existing metrics API that you could wrap?
Yes, this is our current metrics implementation: https://github.com/getsentry/launchpad/blob/main/src/launchpad/utils/statsd.py
I went ahead and updated this branch so that we don't duplicate logic.
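For readers following this thread, the "wrap the existing metrics API" idea can be sketched as a thin adapter. This is only an illustration: the real `taskbroker_client.metrics.MetricsBackend` interface is not shown in this conversation, so the `WorkerMetrics` protocol below is a hypothetical stand-in, and `RecordingStatsd`, `StatsdWorkerMetrics`, and the `taskworker` prefix are invented names.

```python
from typing import Dict, List, Optional, Protocol, Tuple

Tags = Optional[Dict[str, str]]


class WorkerMetrics(Protocol):
    """Hypothetical stand-in for the interface a worker runtime might call."""

    def incr(self, name: str, value: int = 1, tags: Tags = None) -> None: ...

    def timing(self, name: str, value_ms: float, tags: Tags = None) -> None: ...


class RecordingStatsd:
    """Stand-in for an existing statsd helper; records calls instead of sending them."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, str, float, List[str]]] = []

    def increment(self, metric: str, value: int = 1, tags: Optional[List[str]] = None) -> None:
        self.calls.append(("increment", metric, value, tags or []))

    def timing(self, metric: str, value: float, tags: Optional[List[str]] = None) -> None:
        self.calls.append(("timing", metric, value, tags or []))


class StatsdWorkerMetrics:
    """Adapter: delegates every call to the existing helper, so no logic is duplicated."""

    def __init__(self, statsd: RecordingStatsd, prefix: str = "taskworker") -> None:
        self._statsd = statsd
        self._prefix = prefix

    def _name(self, name: str) -> str:
        return f"{self._prefix}.{name}"

    @staticmethod
    def _tags(tags: Tags) -> List[str]:
        # Convert {"key": "value"} into statsd-style "key:value" strings.
        return sorted(f"{key}:{value}" for key, value in (tags or {}).items())

    def incr(self, name: str, value: int = 1, tags: Tags = None) -> None:
        self._statsd.increment(self._name(name), value, tags=self._tags(tags))

    def timing(self, name: str, value_ms: float, tags: Tags = None) -> None:
        self._statsd.timing(self._name(name), value_ms, tags=self._tags(tags))
```

The adapter holds no state of its own beyond the prefix, which keeps a single source of truth for how metrics are emitted.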
    logger.info(f"Starting TaskWorker (rpc_hosts={config.rpc_hosts}, concurrency={config.concurrency})")

    # TODO: Should we explore setting health_check_file_path for K8s file-based liveness probes (TaskWorker has no HTTP server)
Yes, the healthcheck file is important as sometimes multiprocessing queues deadlock, and without the healthcheck file the worker will freeze.
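A file-based liveness check generally works by having the worker refresh a file's modification time while it is making progress, and having the probe treat a stale or missing file as a hung worker. The sketch below illustrates that general pattern only; it is not how `taskbroker-client` implements `health_check_file_path`, and the function names and the staleness threshold are assumptions.

```python
import time
from pathlib import Path
from typing import Optional


def touch_health_file(path: Path) -> None:
    """Called by the worker loop whenever it makes progress."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.touch()  # creates the file or updates its modification time


def is_worker_alive(path: Path, max_age_seconds: float, now: Optional[float] = None) -> bool:
    """Probe side: the worker is considered alive if the file was touched recently."""
    try:
        mtime = path.stat().st_mtime
    except FileNotFoundError:
        return False  # never started, or the file was removed
    current = time.time() if now is None else now
    return (current - mtime) <= max_age_seconds
```

If a multiprocessing queue deadlocks, the loop stops touching the file, the check starts failing once the threshold passes, and the orchestrator can restart the container.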
    result_queue_maxsize=config.concurrency * 2,
    rebalance_after=32,
    processing_pool_name="launchpad",
    process_type="forkserver",
I've had problems with forkserver + sentry, as the forking would time out. Hopefully it works well for launchpad though, as it provides measurable memory savings.
(We synced offline and can also consider spawn, which is what sentry uses, as an alternative. Regarding the issues that were encountered with forkserver: "the child processes wouldn't start, they would timeout on a socket inside multiprocessing.")
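For context on the two options: in Python's `multiprocessing`, `spawn` starts a fresh interpreter for each child, while `forkserver` starts one server process and forks children from it on request. A minimal sketch of choosing a start method with a fallback, using only standard-library calls, might look like the following. `pick_context` is a hypothetical helper, not part of this PR or of `taskbroker-client`.

```python
import multiprocessing
from multiprocessing.context import BaseContext


def pick_context(preferred: str = "forkserver", fallback: str = "spawn") -> BaseContext:
    """Return a multiprocessing context, falling back if the preferred method is unavailable."""
    available = multiprocessing.get_all_start_methods()
    method = preferred if preferred in available else fallback
    return multiprocessing.get_context(method)
```

Using a context object rather than `multiprocessing.set_start_method` keeps the choice local to the worker pool, so switching between `forkserver` and `spawn` stays a one-line configuration change.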
Ensures project_id is converted to string before checking the taskworker-only allowlist, matching the Kafka consumer behavior. Prevents type mismatch if TaskBroker RPC deserializes project_id as an integer. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
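The type mismatch this commit guards against can be sketched as follows. The helper names are hypothetical, and the comma-separated format of the allowlist is an assumption; only the string conversion before the membership check comes from the commit message.

```python
from typing import Optional, Set, Union


def parse_project_ids(raw: Optional[str]) -> Set[str]:
    """Parse an allowlist such as "1, 42" into a set of strings (format is assumed)."""
    if not raw:
        return set()
    return {part.strip() for part in raw.split(",") if part.strip()}


def is_taskworker_only(project_id: Union[int, str], allowlist: Set[str]) -> bool:
    # The allowlist holds strings, so an integer id must be converted first:
    # 42 in {"42"} is False, while str(42) in {"42"} is True.
    return str(project_id) in allowlist
```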
_TASKWORKER_ONLY_PROJECT_IDS is computed at module import time, so patching os.environ after import has no effect. Patch the set directly. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
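The import-time behavior described here can be reproduced in a few lines. This is a self-contained demonstration with hypothetical names (`DEMO_TASKWORKER_ONLY_PROJECT_IDS`, `_ONLY_IDS`, `should_skip`), not the code in `src/launchpad/kafka.py`.

```python
import os
from unittest import mock

ENV_VAR = "DEMO_TASKWORKER_ONLY_PROJECT_IDS"  # hypothetical name, for this demo only

# Evaluated exactly once, when the module is first imported.
_ONLY_IDS = frozenset(p for p in os.environ.get(ENV_VAR, "").split(",") if p)


def should_skip(project_id) -> bool:
    return str(project_id) in _ONLY_IDS


def demo():
    # Patching the environment now is too late: _ONLY_IDS was already computed.
    with mock.patch.dict(os.environ, {ENV_VAR: "42"}):
        after_env_patch = should_skip(42)
    # Patching the module-level set itself does change the behavior.
    with mock.patch.dict(globals(), {"_ONLY_IDS": frozenset({"42"})}):
        after_set_patch = should_skip(42)
    return after_env_patch, after_set_patch
```

In a real test against an importable module, the second step would more typically be written with `mock.patch` or `mock.patch.object` on the module attribute; `mock.patch.dict(globals(), ...)` is used here only so the demo fits in a single file.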
Disambiguate from the identically-named class in utils/arroyo_metrics.py which implements the Arroyo Metrics protocol. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
* fix(client) Align logger names with package name
  I missed fixing some logger names when porting code from sentry. Refs getsentry/launchpad#582
* Align logging scopes more
* Align on __name__ as that is the convention we use most often


Summary

Adds a new TaskWorker mode (launchpad worker) where Launchpad receives tasks via TaskBroker RPC instead of Kafka. This is a lighter-weight execution mode with no HTTP server: the TaskBroker handles task distribution and dispatches work to the worker via gRPC.

Depends on #581 (decouples ArtifactProcessor from Kafka message schema).

What's included

* src/launchpad/worker/: New module with TaskbrokerApp config, WorkerConfig with multi-host support, process_artifact task registration, in-memory dedup store
* src/launchpad/cli.py: Adds worker CLI command
* requirements.txt: Adds taskbroker-client dependency
* .envrc: Adds LAUNCHPAD_WORKER_RPC_HOST and LAUNCHPAD_WORKER_CONCURRENCY env vars
* Makefile: Adds make worker target
* CLAUDE.md and README.md: Document both operational modes

Production readiness fixes

* KAFKA_BOOTSTRAP_SERVERS env var (was hardcoded to 127.0.0.1:9092)
* broker_hosts supports comma-separated hosts for HA
* health_check_file_path (K8s file-based liveness probes)

Kafka-side skip for taskworker-only projects

* src/launchpad/kafka.py: Adds _should_skip_kafka_processing(), which checks the PROJECT_IDS_TO_ONLY_TRY_TASKWORKER_PROCESSING env var and skips Kafka processing for listed project IDs so they are only handled via the TaskWorker path
* tests/unit/test_kafka.py: New unit tests covering skip/no-skip/empty/unset/integer-project-id cases

Test plan

* make check passes
* make test-unit passes (including new test_worker_config.py and test_kafka.py)
* make worker starts without errors locally
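As a rough illustration of the multi-host configuration described in the summary, a config loader might look like the sketch below. It is not the `WorkerConfig` from this PR: the field names follow the `rpc_hosts` and `concurrency` attributes seen in the review snippet, while `load_worker_config`, the default concurrency of 1, and the validation rules are assumptions.

```python
from dataclasses import dataclass
from typing import List, Mapping


@dataclass(frozen=True)
class WorkerConfig:
    rpc_hosts: List[str]
    concurrency: int


def load_worker_config(env: Mapping[str, str]) -> WorkerConfig:
    """Build a config from environment-style key/value pairs."""
    raw_hosts = env.get("LAUNCHPAD_WORKER_RPC_HOST", "")
    # Accept a comma-separated list so several broker hosts can be given for HA.
    hosts = [host.strip() for host in raw_hosts.split(",") if host.strip()]
    if not hosts:
        raise ValueError("LAUNCHPAD_WORKER_RPC_HOST must list at least one host")

    # The default of 1 is an assumption made for this sketch.
    concurrency = int(env.get("LAUNCHPAD_WORKER_CONCURRENCY", "1"))
    if concurrency < 1:
        raise ValueError("LAUNCHPAD_WORKER_CONCURRENCY must be at least 1")

    return WorkerConfig(rpc_hosts=hosts, concurrency=concurrency)
```

Passing a mapping instead of reading `os.environ` directly keeps the loader easy to unit test; a caller would use `load_worker_config(os.environ)` at startup.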