From d4591fdb6a281470e0ab28110c731a5aa5bfbbfe Mon Sep 17 00:00:00 2001 From: Nico Hinderling Date: Thu, 12 Mar 2026 21:19:53 -0700 Subject: [PATCH 01/10] feat(worker): Add TaskWorker mode for TaskBroker RPC --- .envrc | 6 ++ CLAUDE.md | 8 ++- Makefile | 4 ++ README.md | 43 ++++++++++++-- requirements.txt | 1 + src/launchpad/cli.py | 29 ++++++++++ src/launchpad/kafka.py | 11 ++++ src/launchpad/utils/logging.py | 1 + src/launchpad/worker/__init__.py | 0 src/launchpad/worker/app.py | 29 ++++++++++ src/launchpad/worker/config.py | 60 +++++++++++++++++++ src/launchpad/worker/tasks.py | 23 ++++++++ tests/unit/test_kafka.py | 98 ++++++++++++++++++++++++++++++++ tests/unit/test_worker_config.py | 52 +++++++++++++++++ 14 files changed, 356 insertions(+), 9 deletions(-) create mode 100644 src/launchpad/worker/__init__.py create mode 100644 src/launchpad/worker/app.py create mode 100644 src/launchpad/worker/config.py create mode 100644 src/launchpad/worker/tasks.py create mode 100644 tests/unit/test_kafka.py create mode 100644 tests/unit/test_worker_config.py diff --git a/.envrc b/.envrc index 943710d0..10a7d825 100644 --- a/.envrc +++ b/.envrc @@ -10,6 +10,10 @@ export LAUNCHPAD_HOST="0.0.0.0" export LAUNCHPAD_PORT="2218" export LAUNCHPAD_RPC_SHARED_SECRET="launchpad-also-very-long-value-haha" export SENTRY_BASE_URL="http://localhost:8000" + +export LAUNCHPAD_WORKER_RPC_HOST="localhost:50051" +export LAUNCHPAD_WORKER_CONCURRENCY="32" + # STATSD_HOST=... # defaults to 127.0.0.1 # STATSD_PORT=... # defaults to 8125 @@ -38,6 +42,8 @@ if ! command -v "$DEVENV" >/dev/null; then fi PATH_add "${PWD}/.devenv/all/bin" +PATH_add "${PWD}/.devenv/bin" + case $(uname -s) in Darwin) PATH_add "${PWD}/.devenv/aarch64-darwin/bin";; *) PATH_add "${PWD}/.devenv/x86_64-linux/bin";; diff --git a/CLAUDE.md b/CLAUDE.md index cd3e6f2b..f7046fb9 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -35,14 +35,16 @@ launchpad size app.xcarchive.zip --skip-swift-metadata --skip-symbols # faster # Service development devservices up # Start Kafka infrastructure -make serve # Start Launchpad server +make serve # Start Launchpad server (Kafka mode: HTTP + Kafka consumer) +make worker # Start Launchpad worker (TaskWorker mode: no HTTP server) ``` ## Architecture ### Core Components -- **CLI** (`src/launchpad/cli.py`): Main entry point, uses Click -- **Service** (`src/launchpad/service.py`): Kafka consumer + HTTP server for production +- **CLI** (`src/launchpad/cli.py`): Main entry point, uses Click. Two service commands: `serve` (Kafka mode) and `worker` (TaskWorker mode) +- **Service** (`src/launchpad/service.py`): Kafka consumer + HTTP server for production (`launchpad serve`) +- **Worker** (`src/launchpad/worker/`): TaskWorker mode — receives tasks via TaskBroker RPC, no HTTP server (`launchpad worker`) - **Analyzers** (`src/launchpad/size/analyzers/`): Platform-specific analysis engines (AppleAppAnalyzer, AndroidAnalyzer) - **Parsers** (`src/launchpad/parsers/`): Binary parsing - Mach-O via LIEF, custom DEX parsers - **Insights** (`src/launchpad/size/insights/`): Optimization recommendations (image compression, symbol stripping, etc.) diff --git a/Makefile b/Makefile index 1d8307d9..21ec1ebc 100644 --- a/Makefile +++ b/Makefile @@ -113,6 +113,10 @@ serve: ## Start the Launchpad server with proper Kafka configuration @echo "Starting Launchpad server..." $(PYTHON_VENV) -m launchpad.cli serve --verbose +worker: ## Start the Launchpad TaskWorker (no HTTP server) + @echo "Starting Launchpad TaskWorker..." + $(PYTHON_VENV) -m launchpad.cli worker --verbose + test-kafka-message: ## Send a test message to Kafka (requires Kafka running) $(PYTHON_VENV) scripts/test_kafka.py --count 1 diff --git a/README.md b/README.md index dddf9217..95815c47 100644 --- a/README.md +++ b/README.md @@ -26,8 +26,11 @@ If you don't have devenv installed, [follow these instructions](https://github.c # Start dependency containers (e.g. Kafka) devservices up -# Begin listening for messages -launchpad serve +# Begin listening for messages (Kafka mode) +make serve + +# Or run the TaskWorker instead +make worker # Stop containers devservices down @@ -35,7 +38,31 @@ devservices down ## Usage -Launchpad is primarily designed to run as a Kafka consumer alongside the [Sentry monolith](https://github.com/getsentry/sentry) codebase via `launchpad serve`. +Launchpad can run in two operational modes: + +- **Kafka mode** (`launchpad serve`): HTTP server + Kafka consumer. This is the existing production mode that runs alongside the [Sentry monolith](https://github.com/getsentry/sentry). +- **TaskWorker mode** (`launchpad worker`): TaskWorker only, no HTTP server. This is a lighter-weight mode that receives work via the TaskBroker RPC interface instead of Kafka. + +### Running in Kafka mode + +```bash +devservices up +make serve +# or: launchpad serve --dev +``` + +### Running in TaskWorker mode + +Requires `LAUNCHPAD_WORKER_RPC_HOST` and `LAUNCHPAD_WORKER_CONCURRENCY` environment variables (already configured in `.envrc`). + +The TaskBroker handles task distribution and dispatches work to the worker via RPC. A single worker instance processes tasks in parallel — `LAUNCHPAD_WORKER_CONCURRENCY` controls how many child processes run simultaneously (e.g., 32 means up to 32 artifacts processed in parallel). + +```bash +make worker +# or: launchpad worker -v +``` + +### One-off analysis Alternatively for a one-off analysis, such as a local size analysis, you can invoke our various CLI subcommands. @@ -94,14 +121,18 @@ devservices up --mode ingest devservices serve --workers ``` -Next run `launchpad` in another terminal: +Next run `launchpad` in another terminal using either mode: ```bash +# Kafka mode (HTTP server + Kafka consumer) devservices up -launchpad serve +make serve + +# TaskWorker mode (TaskWorker only, no HTTP server) +make worker ``` -And finally use the `sentry-cli` to upload to your local machine: +And finally use the `sentry-cli` (version 3.0.1 or higher) to upload to your local machine: ```bash sentry-cli --log-level DEBUG \ diff --git a/requirements.txt b/requirements.txt index d00639e4..a990f2b1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,5 +19,6 @@ sentry-arroyo==2.34.0 sentry-kafka-schemas==2.1.2 sentry-sdk>=2.36.0 sortedcontainers>=2.4.0 +taskbroker-client @ git+https://github.com/getsentry/taskbroker.git@main#subdirectory=clients/python typing-extensions>=4.15.0 zipfile-zstd==0.0.4 diff --git a/src/launchpad/cli.py b/src/launchpad/cli.py index 8b3adf8e..dd0d7f1b 100644 --- a/src/launchpad/cli.py +++ b/src/launchpad/cli.py @@ -82,6 +82,35 @@ def serve(host: str, port: int, mode: str | None, verbose: bool) -> None: raise click.Abort() +@cli.command() +@click.option("--verbose", "-v", is_flag=True, help="Enable verbose logging output.") +def worker(verbose: bool) -> None: + """Start the Launchpad TaskWorker. + + Runs the TaskWorker only, without an HTTP server. + Requires LAUNCHPAD_WORKER_RPC_HOST and LAUNCHPAD_WORKER_CONCURRENCY env vars. + """ + from .worker.config import run_worker + + setup_logging(verbose=verbose, quiet=False) + + console.print(f"[bold blue]Launchpad TaskWorker v{__version__}[/bold blue]") + console.print("Press Ctrl+C to stop the worker") + console.print() + + try: + run_worker() + except KeyboardInterrupt: + console.print("\n[yellow]Worker stopped by user[/yellow]") + except SystemExit: + raise + except Exception as e: + console.print(f"[bold red]Worker error:[/bold red] {e}") + if verbose: + console.print_exception() + raise click.Abort() + + cli.add_command(size_command) cli.add_command(app_icon_command) cli.add_command(distribution_command) diff --git a/src/launchpad/kafka.py b/src/launchpad/kafka.py index 0678c753..a23fdefa 100644 --- a/src/launchpad/kafka.py +++ b/src/launchpad/kafka.py @@ -37,6 +37,12 @@ PREPROD_ARTIFACT_SCHEMA = get_codec(PREPROD_ARTIFACT_EVENTS_TOPIC) +def _should_skip_kafka_processing(project_id: str) -> bool: + raw = os.getenv("PROJECT_IDS_TO_ONLY_TRY_TASKWORKER_PROCESSING", "") + taskworker_only = {p.strip() for p in raw.split(",") if p.strip()} + return project_id in taskworker_only + + def _process_in_subprocess(decoded_message: Any, log_queue: multiprocessing.Queue[Any]) -> None: """Worker function that runs in subprocess.""" root_logger = logging.getLogger() @@ -93,6 +99,11 @@ def process_kafka_message_with_service( raise artifact_id = decoded.get("artifact_id", "unknown") + project_id = str(decoded.get("project_id", "")) + + if _should_skip_kafka_processing(project_id): + logger.info("Skipping Kafka processing for project %s (taskworker-only)", project_id) + return None # type: ignore[return-value] # Spawn actual processing in a subprocess process = multiprocessing.Process(target=_process_in_subprocess, args=(decoded, log_queue)) diff --git a/src/launchpad/utils/logging.py b/src/launchpad/utils/logging.py index c3c151a1..49a65b57 100644 --- a/src/launchpad/utils/logging.py +++ b/src/launchpad/utils/logging.py @@ -145,6 +145,7 @@ def setup_logging(verbose: bool = False, quiet: bool = False) -> None: logging.getLogger("arroyo.processing.strategies.run_task_with_multiprocessing").setLevel(logging.INFO) logging.getLogger("urllib3").setLevel(logging.WARNING) logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING) + logging.getLogger("sentry.taskworker").setLevel(logging.INFO) def get_logger(name: str) -> logging.Logger: diff --git a/src/launchpad/worker/__init__.py b/src/launchpad/worker/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/launchpad/worker/app.py b/src/launchpad/worker/app.py new file mode 100644 index 00000000..bebf5879 --- /dev/null +++ b/src/launchpad/worker/app.py @@ -0,0 +1,29 @@ +import os + +from arroyo.backends.kafka import KafkaProducer +from taskbroker_client.app import TaskbrokerApp +from taskbroker_client.router import TaskRouter + + +class CustomRouter(TaskRouter): + def route_namespace(self, name: str) -> str: + return "taskworker" + + +def producer_factory(topic: str) -> KafkaProducer: + bootstrap_servers = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "127.0.0.1:9092") + config = { + "bootstrap.servers": bootstrap_servers, + "compression.type": "lz4", + "message.max.bytes": 50000000, + } + return KafkaProducer(config) + + +app = TaskbrokerApp( + name="launchpad", + producer_factory=producer_factory, + router_class=CustomRouter(), +) + +app.set_modules(["launchpad.worker.tasks"]) diff --git a/src/launchpad/worker/config.py b/src/launchpad/worker/config.py new file mode 100644 index 00000000..b35f00db --- /dev/null +++ b/src/launchpad/worker/config.py @@ -0,0 +1,60 @@ +from __future__ import annotations + +import os + +from dataclasses import dataclass + +from taskbroker_client.worker import TaskWorker + +from launchpad.sentry_sdk_init import initialize_sentry_sdk +from launchpad.utils.logging import get_logger + +logger = get_logger(__name__) + + +@dataclass +class WorkerConfig: + rpc_hosts: list[str] + concurrency: int + + +def get_worker_config() -> WorkerConfig: + rpc_host = os.getenv("LAUNCHPAD_WORKER_RPC_HOST") + if not rpc_host: + raise ValueError("LAUNCHPAD_WORKER_RPC_HOST environment variable is required") + + rpc_hosts = [h.strip() for h in rpc_host.split(",")] + + concurrency_str = os.getenv("LAUNCHPAD_WORKER_CONCURRENCY") + if not concurrency_str: + raise ValueError("LAUNCHPAD_WORKER_CONCURRENCY environment variable is required") + + try: + concurrency = int(concurrency_str) + except ValueError: + raise ValueError(f"LAUNCHPAD_WORKER_CONCURRENCY must be a valid integer, got: {concurrency_str}") + + return WorkerConfig(rpc_hosts=rpc_hosts, concurrency=concurrency) + + +def run_worker() -> None: + initialize_sentry_sdk() + config = get_worker_config() + + logger.info(f"Starting TaskWorker (rpc_hosts={config.rpc_hosts}, concurrency={config.concurrency})") + + # TODO: Should we explore setting health_check_file_path for K8s file-based liveness probes (TaskWorker has no HTTP server) + worker = TaskWorker( + app_module="launchpad.worker.app:app", + broker_hosts=config.rpc_hosts, + max_child_task_count=100, + concurrency=config.concurrency, + child_tasks_queue_maxsize=config.concurrency * 2, + result_queue_maxsize=config.concurrency * 2, + rebalance_after=32, + processing_pool_name="launchpad", + process_type="forkserver", + ) + + exitcode = worker.start() + raise SystemExit(exitcode) diff --git a/src/launchpad/worker/tasks.py b/src/launchpad/worker/tasks.py new file mode 100644 index 00000000..f39d1887 --- /dev/null +++ b/src/launchpad/worker/tasks.py @@ -0,0 +1,23 @@ +from launchpad.artifact_processor import ArtifactProcessor +from launchpad.constants import PreprodFeature +from launchpad.utils.logging import get_logger, setup_logging + +from .app import app + +logger = get_logger(__name__) + +default = app.taskregistry.create_namespace("default") + + +@default.register(name="process_artifact") +def process_artifact( + artifact_id: str, project_id: str, organization_id: str, requested_features: list[PreprodFeature] +) -> None: + setup_logging() + logger.info(f"Processing artifact {artifact_id}") + logger.info( + f"Params: artifact_id={artifact_id}, project_id={project_id}, " + f"organization_id={organization_id}, requested_features={requested_features}" + ) + ArtifactProcessor.process_message(artifact_id, project_id, organization_id, requested_features) + logger.info(f"Processed artifact {artifact_id}") diff --git a/tests/unit/test_kafka.py b/tests/unit/test_kafka.py new file mode 100644 index 00000000..187b63c2 --- /dev/null +++ b/tests/unit/test_kafka.py @@ -0,0 +1,98 @@ +from __future__ import annotations + +import multiprocessing +import threading + +from unittest.mock import MagicMock, patch + +from launchpad.kafka import process_kafka_message_with_service + + +def _make_decoded(project_id: str = "123", artifact_id: str = "art-1") -> dict: + return { + "project_id": project_id, + "artifact_id": artifact_id, + "organization_id": "org-1", + } + + +def _call_process(decoded: dict) -> MagicMock: + msg = MagicMock() + msg.payload.value = b"raw" + + mock_process = MagicMock(spec=multiprocessing.Process) + mock_process.pid = 999 + mock_process.exitcode = 0 + mock_process.is_alive.return_value = False + + factory = MagicMock() + factory._killed_during_rebalance = set() + + with ( + patch("launchpad.kafka.PREPROD_ARTIFACT_SCHEMA") as mock_schema, + patch("launchpad.kafka.multiprocessing.Process", return_value=mock_process) as mock_proc_cls, + ): + mock_schema.decode.return_value = decoded + process_kafka_message_with_service( + msg=msg, + log_queue=MagicMock(), + process_registry={}, + registry_lock=threading.Lock(), + factory=factory, + ) + + return mock_proc_cls + + +class TestTaskworkerOnlyProjectSkip: + def test_skips_project_in_taskworker_only_list(self): + decoded = _make_decoded(project_id="42") + + with patch.dict("os.environ", {"PROJECT_IDS_TO_ONLY_TRY_TASKWORKER_PROCESSING": "42"}): + mock_proc_cls = _call_process(decoded) + + mock_proc_cls.assert_not_called() + + def test_skips_with_multiple_projects_in_list(self): + decoded = _make_decoded(project_id="99") + + with patch.dict("os.environ", {"PROJECT_IDS_TO_ONLY_TRY_TASKWORKER_PROCESSING": "42, 99, 7"}): + mock_proc_cls = _call_process(decoded) + + mock_proc_cls.assert_not_called() + + def test_processes_project_not_in_list(self): + decoded = _make_decoded(project_id="123") + + with patch.dict("os.environ", {"PROJECT_IDS_TO_ONLY_TRY_TASKWORKER_PROCESSING": "42,99"}): + mock_proc_cls = _call_process(decoded) + + mock_proc_cls.assert_called_once() + + def test_processes_when_env_var_empty(self): + decoded = _make_decoded(project_id="123") + + with patch.dict("os.environ", {"PROJECT_IDS_TO_ONLY_TRY_TASKWORKER_PROCESSING": ""}): + mock_proc_cls = _call_process(decoded) + + mock_proc_cls.assert_called_once() + + def test_processes_when_env_var_unset(self): + decoded = _make_decoded(project_id="123") + + with patch.dict("os.environ", {}, clear=False): + import os + + os.environ.pop("PROJECT_IDS_TO_ONLY_TRY_TASKWORKER_PROCESSING", None) + mock_proc_cls = _call_process(decoded) + + mock_proc_cls.assert_called_once() + + def test_handles_integer_project_id(self): + decoded = _make_decoded() + decoded["project_id"] = 42 + + with patch.dict("os.environ", {"PROJECT_IDS_TO_ONLY_TRY_TASKWORKER_PROCESSING": "42"}): + mock_proc_cls = _call_process(decoded) + + mock_proc_cls.assert_not_called() diff --git a/tests/unit/test_worker_config.py b/tests/unit/test_worker_config.py new file mode 100644 index 00000000..9d4590e4 --- /dev/null +++ b/tests/unit/test_worker_config.py @@ -0,0 +1,52 @@ +import os + +from unittest.mock import patch + +import pytest + +from launchpad.worker.config import WorkerConfig, get_worker_config + + +class TestGetWorkerConfig: + def test_valid_config(self): + with patch.dict( + os.environ, + { + "LAUNCHPAD_WORKER_RPC_HOST": "localhost:50051", + "LAUNCHPAD_WORKER_CONCURRENCY": "8", + }, + ): + config = get_worker_config() + assert config == WorkerConfig(rpc_hosts=["localhost:50051"], concurrency=8) + + def test_comma_separated_hosts(self): + with patch.dict( + os.environ, + { + "LAUNCHPAD_WORKER_RPC_HOST": "host1:50051, host2:50051, host3:50051", + "LAUNCHPAD_WORKER_CONCURRENCY": "4", + }, + ): + config = get_worker_config() + assert config.rpc_hosts == ["host1:50051", "host2:50051", "host3:50051"] + + def test_missing_rpc_host(self): + with patch.dict(os.environ, {"LAUNCHPAD_WORKER_CONCURRENCY": "8"}, clear=True): + with pytest.raises(ValueError, match="LAUNCHPAD_WORKER_RPC_HOST"): + get_worker_config() + + def test_missing_concurrency(self): + with patch.dict(os.environ, {"LAUNCHPAD_WORKER_RPC_HOST": "localhost:50051"}, clear=True): + with pytest.raises(ValueError, match="LAUNCHPAD_WORKER_CONCURRENCY"): + get_worker_config() + + def test_invalid_concurrency(self): + with patch.dict( + os.environ, + { + "LAUNCHPAD_WORKER_RPC_HOST": "localhost:50051", + "LAUNCHPAD_WORKER_CONCURRENCY": "not-a-number", + }, + ): + with pytest.raises(ValueError, match="must be a valid integer"): + get_worker_config() From b9a31d192f49ee860654160a3b99d34722deb24d Mon Sep 17 00:00:00 2001 From: Nico Hinderling Date: Thu, 12 Mar 2026 22:39:32 -0700 Subject: [PATCH 02/10] make it so that actual processing only happens if the project id is allowlisted --- src/launchpad/config.py | 6 ++++++ src/launchpad/kafka.py | 9 ++------- src/launchpad/worker/tasks.py | 9 +++++---- 3 files changed, 13 insertions(+), 11 deletions(-) create mode 100644 src/launchpad/config.py diff --git a/src/launchpad/config.py b/src/launchpad/config.py new file mode 100644 index 00000000..e0ef3d47 --- /dev/null +++ b/src/launchpad/config.py @@ -0,0 +1,6 @@ +import os + + +def is_taskworker_only_project(project_id: str) -> bool: + raw = os.getenv("PROJECT_IDS_TO_ONLY_TRY_TASKWORKER_PROCESSING", "") + return project_id in {p.strip() for p in raw.split(",") if p.strip()} diff --git a/src/launchpad/kafka.py b/src/launchpad/kafka.py index a23fdefa..7f970ad7 100644 --- a/src/launchpad/kafka.py +++ b/src/launchpad/kafka.py @@ -24,6 +24,7 @@ from sentry_kafka_schemas import get_codec from launchpad.artifact_processor import ArtifactProcessor +from launchpad.config import is_taskworker_only_project from launchpad.constants import PREPROD_ARTIFACT_EVENTS_TOPIC from launchpad.tracing import RequestLogFilter from launchpad.utils.arroyo_metrics import DatadogMetricsBackend @@ -37,12 +38,6 @@ PREPROD_ARTIFACT_SCHEMA = get_codec(PREPROD_ARTIFACT_EVENTS_TOPIC) -def _should_skip_kafka_processing(project_id: str) -> bool: - raw = os.getenv("PROJECT_IDS_TO_ONLY_TRY_TASKWORKER_PROCESSING", "") - taskworker_only = {p.strip() for p in raw.split(",") if p.strip()} - return project_id in taskworker_only - - def _process_in_subprocess(decoded_message: Any, log_queue: multiprocessing.Queue[Any]) -> None: """Worker function that runs in subprocess.""" root_logger = logging.getLogger() @@ -101,7 +96,7 @@ def process_kafka_message_with_service( artifact_id = decoded.get("artifact_id", "unknown") project_id = str(decoded.get("project_id", "")) - if _should_skip_kafka_processing(project_id): + if is_taskworker_only_project(project_id): logger.info("Skipping Kafka processing for project %s (taskworker-only)", project_id) return None # type: ignore[return-value] diff --git a/src/launchpad/worker/tasks.py b/src/launchpad/worker/tasks.py index f39d1887..970041fc 100644 --- a/src/launchpad/worker/tasks.py +++ b/src/launchpad/worker/tasks.py @@ -1,5 +1,5 @@ from launchpad.artifact_processor import ArtifactProcessor -from launchpad.constants import PreprodFeature +from launchpad.config import is_taskworker_only_project from launchpad.utils.logging import get_logger, setup_logging from .app import app @@ -10,14 +10,15 @@ @default.register(name="process_artifact") -def process_artifact( - artifact_id: str, project_id: str, organization_id: str, requested_features: list[PreprodFeature] -) -> None: +def process_artifact(artifact_id: str, project_id: str, organization_id: str, requested_features: list[str]) -> None: setup_logging() logger.info(f"Processing artifact {artifact_id}") logger.info( f"Params: artifact_id={artifact_id}, project_id={project_id}, " f"organization_id={organization_id}, requested_features={requested_features}" ) + if not is_taskworker_only_project(project_id): + logger.info("Skipping TaskWorker processing for project %s (not in taskworker-only list)", project_id) + return ArtifactProcessor.process_message(artifact_id, project_id, organization_id, requested_features) logger.info(f"Processed artifact {artifact_id}") From 43d6cac40003cb277ff804aaaf986757066f3716 Mon Sep 17 00:00:00 2001 From: Nico Hinderling Date: Thu, 12 Mar 2026 22:46:55 -0700 Subject: [PATCH 03/10] fix(worker): Add str() conversion for project_id in TaskWorker task Ensures project_id is converted to string before checking the taskworker-only allowlist, matching the Kafka consumer behavior. Prevents type mismatch if TaskBroker RPC deserializes project_id as an integer. Co-Authored-By: Claude Opus 4.6 --- src/launchpad/worker/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/launchpad/worker/tasks.py b/src/launchpad/worker/tasks.py index 970041fc..b632c17e 100644 --- a/src/launchpad/worker/tasks.py +++ b/src/launchpad/worker/tasks.py @@ -17,7 +17,7 @@ def process_artifact(artifact_id: str, project_id: str, organization_id: str, re f"Params: artifact_id={artifact_id}, project_id={project_id}, " f"organization_id={organization_id}, requested_features={requested_features}" ) - if not is_taskworker_only_project(project_id): + if not is_taskworker_only_project(str(project_id)): logger.info("Skipping TaskWorker processing for project %s (not in taskworker-only list)", project_id) return ArtifactProcessor.process_message(artifact_id, project_id, organization_id, requested_features) From defc79042275a664d523585fe1742304e60fb00e Mon Sep 17 00:00:00 2001 From: Nico Hinderling Date: Fri, 13 Mar 2026 08:57:08 -0700 Subject: [PATCH 04/10] do 16 not 32 --- .envrc | 2 +- README.md | 2 +- src/launchpad/worker/config.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.envrc b/.envrc index 10a7d825..7b658b74 100644 --- a/.envrc +++ b/.envrc @@ -12,7 +12,7 @@ export LAUNCHPAD_RPC_SHARED_SECRET="launchpad-also-very-long-value-haha" export SENTRY_BASE_URL="http://localhost:8000" export LAUNCHPAD_WORKER_RPC_HOST="localhost:50051" -export LAUNCHPAD_WORKER_CONCURRENCY="32" +export LAUNCHPAD_WORKER_CONCURRENCY="16" # STATSD_HOST=... # defaults to 127.0.0.1 # STATSD_PORT=... # defaults to 8125 diff --git a/README.md b/README.md index 95815c47..c545cc5d 100644 --- a/README.md +++ b/README.md @@ -55,7 +55,7 @@ make serve Requires `LAUNCHPAD_WORKER_RPC_HOST` and `LAUNCHPAD_WORKER_CONCURRENCY` environment variables (already configured in `.envrc`). -The TaskBroker handles task distribution and dispatches work to the worker via RPC. A single worker instance processes tasks in parallel — `LAUNCHPAD_WORKER_CONCURRENCY` controls how many child processes run simultaneously (e.g., 32 means up to 32 artifacts processed in parallel). +The TaskBroker handles task distribution and dispatches work to the worker via RPC. A single worker instance processes tasks in parallel — `LAUNCHPAD_WORKER_CONCURRENCY` controls how many child processes run simultaneously (e.g., 16 means up to 16 artifacts processed in parallel). ```bash make worker diff --git a/src/launchpad/worker/config.py b/src/launchpad/worker/config.py index b35f00db..fdc7ddcd 100644 --- a/src/launchpad/worker/config.py +++ b/src/launchpad/worker/config.py @@ -51,7 +51,7 @@ def run_worker() -> None: concurrency=config.concurrency, child_tasks_queue_maxsize=config.concurrency * 2, result_queue_maxsize=config.concurrency * 2, - rebalance_after=32, + rebalance_after=16, processing_pool_name="launchpad", process_type="forkserver", ) From 514bfa543b86b67e288ef8acba169789fbface9e Mon Sep 17 00:00:00 2001 From: Nico Hinderling Date: Fri, 13 Mar 2026 10:34:49 -0700 Subject: [PATCH 05/10] improvements --- requirements.txt | 2 +- src/launchpad/utils/logging.py | 1 - src/launchpad/worker/app.py | 87 ++++++++++++++++++- src/launchpad/worker/config.py | 2 +- .../size/test_file_analysis_integration.py | 2 +- 5 files changed, 89 insertions(+), 5 deletions(-) diff --git a/requirements.txt b/requirements.txt index a990f2b1..06e857b0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,6 +19,6 @@ sentry-arroyo==2.34.0 sentry-kafka-schemas==2.1.2 sentry-sdk>=2.36.0 sortedcontainers>=2.4.0 -taskbroker-client @ git+https://github.com/getsentry/taskbroker.git@main#subdirectory=clients/python +taskbroker-client>=0.1.5 typing-extensions>=4.15.0 zipfile-zstd==0.0.4 diff --git a/src/launchpad/utils/logging.py b/src/launchpad/utils/logging.py index 49a65b57..c3c151a1 100644 --- a/src/launchpad/utils/logging.py +++ b/src/launchpad/utils/logging.py @@ -145,7 +145,6 @@ def setup_logging(verbose: bool = False, quiet: bool = False) -> None: logging.getLogger("arroyo.processing.strategies.run_task_with_multiprocessing").setLevel(logging.INFO) logging.getLogger("urllib3").setLevel(logging.WARNING) logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING) - logging.getLogger("sentry.taskworker").setLevel(logging.INFO) def get_logger(name: str) -> logging.Logger: diff --git a/src/launchpad/worker/app.py b/src/launchpad/worker/app.py index bebf5879..c0a9b3a9 100644 --- a/src/launchpad/worker/app.py +++ b/src/launchpad/worker/app.py @@ -1,13 +1,97 @@ import os +import resource +import time + +from collections.abc import Generator +from contextlib import contextmanager from arroyo.backends.kafka import KafkaProducer +from datadog.dogstatsd.base import DogStatsd from taskbroker_client.app import TaskbrokerApp +from taskbroker_client.metrics import MetricsBackend, Tags from taskbroker_client.router import TaskRouter +def _convert_tags(tags: Tags | None) -> list[str] | None: + if tags is None: + return None + return [f"{k}:{v}" for k, v in tags.items()] + + +class DatadogMetricsBackend(MetricsBackend): + def __init__(self) -> None: + host = os.getenv("STATSD_HOST", "127.0.0.1") + port_str = os.getenv("STATSD_PORT", "8125") + try: + port = int(port_str) + except ValueError: + raise ValueError(f"STATSD_PORT must be a valid integer, got: {port_str}") + + self._dogstatsd = DogStatsd( + host=host, + port=port, + namespace="launchpad.taskworker", + disable_telemetry=True, + origin_detection_enabled=False, + ) + + def incr( + self, + name: str, + value: int | float = 1, + tags: Tags | None = None, + sample_rate: float | None = None, + ) -> None: + kwargs: dict = {"tags": _convert_tags(tags)} + if sample_rate is not None: + kwargs["sample_rate"] = sample_rate + self._dogstatsd.increment(name, int(value), **kwargs) + + def distribution( + self, + name: str, + value: int | float, + tags: Tags | None = None, + unit: str | None = None, + sample_rate: float | None = None, + ) -> None: + kwargs: dict = {"tags": _convert_tags(tags)} + if sample_rate is not None: + kwargs["sample_rate"] = sample_rate + self._dogstatsd.distribution(name, value, **kwargs) + + @contextmanager + def timer( + self, + key: str, + tags: Tags | None = None, + sample_rate: float | None = None, + stacklevel: int = 0, + ) -> Generator[None]: + start = time.monotonic() + try: + yield + finally: + duration_ms = (time.monotonic() - start) * 1000 + self.distribution(key, duration_ms, tags=tags, unit="millisecond", sample_rate=sample_rate) + + @contextmanager + def track_memory_usage( + self, + key: str, + tags: Tags | None = None, + ) -> Generator[None]: + before = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + try: + yield + finally: + after = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + self.distribution(key, after - before, tags=tags, unit="byte") + + class CustomRouter(TaskRouter): def route_namespace(self, name: str) -> str: - return "taskworker" + return "taskworker-launchpad" def producer_factory(topic: str) -> KafkaProducer: @@ -24,6 +108,7 @@ def producer_factory(topic: str) -> KafkaProducer: name="launchpad", producer_factory=producer_factory, router_class=CustomRouter(), + metrics_class=DatadogMetricsBackend(), ) app.set_modules(["launchpad.worker.tasks"]) diff --git a/src/launchpad/worker/config.py b/src/launchpad/worker/config.py index fdc7ddcd..5b01b740 100644 --- a/src/launchpad/worker/config.py +++ b/src/launchpad/worker/config.py @@ -47,7 +47,7 @@ def run_worker() -> None: worker = TaskWorker( app_module="launchpad.worker.app:app", broker_hosts=config.rpc_hosts, - max_child_task_count=100, + max_child_task_count=1000, concurrency=config.concurrency, child_tasks_queue_maxsize=config.concurrency * 2, result_queue_maxsize=config.concurrency * 2, diff --git a/tests/integration/size/test_file_analysis_integration.py b/tests/integration/size/test_file_analysis_integration.py index e3163d1e..2ae98fc9 100644 --- a/tests/integration/size/test_file_analysis_integration.py +++ b/tests/integration/size/test_file_analysis_integration.py @@ -30,7 +30,7 @@ def test_analyze_apple_files_hackernews(self, hackernews_xcarchive_obj): result = analyze_apple_files(hackernews_xcarchive_obj) duration = time.time() - start - assert duration < 1 + assert duration < 3 assert isinstance(result, FileAnalysis) assert len(result.files) == 32 From 282142f868fd5aba9b102380755a70cdd7e51199 Mon Sep 17 00:00:00 2001 From: Nico Hinderling Date: Fri, 13 Mar 2026 10:58:07 -0700 Subject: [PATCH 06/10] bot comments --- src/launchpad/config.py | 7 +++++-- src/launchpad/worker/app.py | 5 ++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/launchpad/config.py b/src/launchpad/config.py index e0ef3d47..b2bd0ec5 100644 --- a/src/launchpad/config.py +++ b/src/launchpad/config.py @@ -1,6 +1,9 @@ import os +_TASKWORKER_ONLY_PROJECT_IDS: set[str] = { + p.strip() for p in os.getenv("PROJECT_IDS_TO_ONLY_TRY_TASKWORKER_PROCESSING", "").split(",") if p.strip() +} + def is_taskworker_only_project(project_id: str) -> bool: - raw = os.getenv("PROJECT_IDS_TO_ONLY_TRY_TASKWORKER_PROCESSING", "") - return project_id in {p.strip() for p in raw.split(",") if p.strip()} + return project_id in _TASKWORKER_ONLY_PROJECT_IDS diff --git a/src/launchpad/worker/app.py b/src/launchpad/worker/app.py index c0a9b3a9..db524cb9 100644 --- a/src/launchpad/worker/app.py +++ b/src/launchpad/worker/app.py @@ -1,4 +1,5 @@ import os +import platform import resource import time @@ -11,6 +12,8 @@ from taskbroker_client.metrics import MetricsBackend, Tags from taskbroker_client.router import TaskRouter +_RUSAGE_TO_BYTES = 1 if platform.system() == "Darwin" else 1024 + def _convert_tags(tags: Tags | None) -> list[str] | None: if tags is None: @@ -86,7 +89,7 @@ def track_memory_usage( yield finally: after = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss - self.distribution(key, after - before, tags=tags, unit="byte") + self.distribution(key, (after - before) * _RUSAGE_TO_BYTES, tags=tags, unit="byte") class CustomRouter(TaskRouter): From ccf83f876870ef111d117bf40e98553f7cca74aa Mon Sep 17 00:00:00 2001 From: Nico Hinderling Date: Fri, 13 Mar 2026 11:08:41 -0700 Subject: [PATCH 07/10] fix(worker): Patch config set directly instead of os.environ in tests _TASKWORKER_ONLY_PROJECT_IDS is computed at module import time, so patching os.environ after import has no effect. Patch the set directly. Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/unit/test_kafka.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/tests/unit/test_kafka.py b/tests/unit/test_kafka.py index 187b63c2..33d1b68a 100644 --- a/tests/unit/test_kafka.py +++ b/tests/unit/test_kafka.py @@ -48,7 +48,7 @@ class TestTaskworkerOnlyProjectSkip: def test_skips_project_in_taskworker_only_list(self): decoded = _make_decoded(project_id="42") - with patch.dict("os.environ", {"PROJECT_IDS_TO_ONLY_TRY_TASKWORKER_PROCESSING": "42"}): + with patch("launchpad.config._TASKWORKER_ONLY_PROJECT_IDS", {"42"}): mock_proc_cls = _call_process(decoded) mock_proc_cls.assert_not_called() @@ -56,7 +56,7 @@ def test_skips_project_in_taskworker_only_list(self): def test_skips_with_multiple_projects_in_list(self): decoded = _make_decoded(project_id="99") - with patch.dict("os.environ", {"PROJECT_IDS_TO_ONLY_TRY_TASKWORKER_PROCESSING": "42, 99, 7"}): + with patch("launchpad.config._TASKWORKER_ONLY_PROJECT_IDS", {"42", "99", "7"}): mock_proc_cls = _call_process(decoded) mock_proc_cls.assert_not_called() @@ -64,7 +64,7 @@ def test_skips_with_multiple_projects_in_list(self): def test_processes_project_not_in_list(self): decoded = _make_decoded(project_id="123") - with patch.dict("os.environ", {"PROJECT_IDS_TO_ONLY_TRY_TASKWORKER_PROCESSING": "42,99"}): + with patch("launchpad.config._TASKWORKER_ONLY_PROJECT_IDS", {"42", "99"}): mock_proc_cls = _call_process(decoded) mock_proc_cls.assert_called_once() @@ -72,7 +72,7 @@ def test_processes_project_not_in_list(self): def test_processes_when_env_var_empty(self): decoded = _make_decoded(project_id="123") - with patch.dict("os.environ", {"PROJECT_IDS_TO_ONLY_TRY_TASKWORKER_PROCESSING": ""}): + with patch("launchpad.config._TASKWORKER_ONLY_PROJECT_IDS", set()): mock_proc_cls = _call_process(decoded) mock_proc_cls.assert_called_once() @@ -80,10 +80,7 @@ def test_processes_when_env_var_empty(self): def test_processes_when_env_var_unset(self): decoded = _make_decoded(project_id="123") - with patch.dict("os.environ", {}, clear=False): - import os - - os.environ.pop("PROJECT_IDS_TO_ONLY_TRY_TASKWORKER_PROCESSING", None) + with patch("launchpad.config._TASKWORKER_ONLY_PROJECT_IDS", set()): mock_proc_cls = _call_process(decoded) mock_proc_cls.assert_called_once() @@ -92,7 +89,7 @@ def test_handles_integer_project_id(self): decoded = _make_decoded() decoded["project_id"] = 42 - with patch.dict("os.environ", {"PROJECT_IDS_TO_ONLY_TRY_TASKWORKER_PROCESSING": "42"}): + with patch("launchpad.config._TASKWORKER_ONLY_PROJECT_IDS", {"42"}): mock_proc_cls = _call_process(decoded) mock_proc_cls.assert_not_called() From d91cd45e4bb9c16993eecf5058a2f16240e5ca4a Mon Sep 17 00:00:00 2001 From: Nico Hinderling Date: Fri, 13 Mar 2026 11:34:45 -0700 Subject: [PATCH 08/10] ref(worker): Rename DatadogMetricsBackend to TaskworkerMetricsBackend Disambiguate from the identically-named class in utils/arroyo_metrics.py which implements the Arroyo Metrics protocol. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/launchpad/worker/app.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/launchpad/worker/app.py b/src/launchpad/worker/app.py index db524cb9..12d74764 100644 --- a/src/launchpad/worker/app.py +++ b/src/launchpad/worker/app.py @@ -21,7 +21,7 @@ def _convert_tags(tags: Tags | None) -> list[str] | None: return [f"{k}:{v}" for k, v in tags.items()] -class DatadogMetricsBackend(MetricsBackend): +class TaskworkerMetricsBackend(MetricsBackend): def __init__(self) -> None: host = os.getenv("STATSD_HOST", "127.0.0.1") port_str = os.getenv("STATSD_PORT", "8125") @@ -111,7 +111,7 @@ def producer_factory(topic: str) -> KafkaProducer: name="launchpad", producer_factory=producer_factory, router_class=CustomRouter(), - metrics_class=DatadogMetricsBackend(), + metrics_class=TaskworkerMetricsBackend(), ) app.set_modules(["launchpad.worker.tasks"]) From e7fae1961fa9e1095d720984604b833a04a28ff8 Mon Sep 17 00:00:00 2001 From: Nico Hinderling Date: Fri, 13 Mar 2026 12:15:21 -0700 Subject: [PATCH 09/10] no more requested_features --- src/launchpad/worker/tasks.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/launchpad/worker/tasks.py b/src/launchpad/worker/tasks.py index b632c17e..b7694d87 100644 --- a/src/launchpad/worker/tasks.py +++ b/src/launchpad/worker/tasks.py @@ -10,15 +10,12 @@ @default.register(name="process_artifact") -def process_artifact(artifact_id: str, project_id: str, organization_id: str, requested_features: list[str]) -> None: +def process_artifact(artifact_id: str, project_id: str, organization_id: str) -> None: setup_logging() logger.info(f"Processing artifact {artifact_id}") - logger.info( - f"Params: artifact_id={artifact_id}, project_id={project_id}, " - f"organization_id={organization_id}, requested_features={requested_features}" - ) + logger.info(f"Params: artifact_id={artifact_id}, project_id={project_id}, organization_id={organization_id}") if not is_taskworker_only_project(str(project_id)): logger.info("Skipping TaskWorker processing for project %s (not in taskworker-only list)", project_id) return - ArtifactProcessor.process_message(artifact_id, project_id, organization_id, requested_features) + ArtifactProcessor.process_message(artifact_id, project_id, organization_id) logger.info(f"Processed artifact {artifact_id}") From 31e60c6b27b0039b9f6edce84f87249bea5b9389 Mon Sep 17 00:00:00 2001 From: Nico Hinderling Date: Mon, 16 Mar 2026 09:36:32 -0700 Subject: [PATCH 10/10] consolidate metrics implementation --- src/launchpad/utils/statsd.py | 37 ++++++++++++++++------------------- src/launchpad/worker/app.py | 18 +++-------------- 2 files changed, 20 insertions(+), 35 deletions(-) diff --git a/src/launchpad/utils/statsd.py b/src/launchpad/utils/statsd.py index 9038ee5e..877b6b3a 100644 --- a/src/launchpad/utils/statsd.py +++ b/src/launchpad/utils/statsd.py @@ -137,6 +137,22 @@ def timed(self, metric: str, tags: list[str] | None = None) -> Any: yield +def create_dogstatsd_client(namespace: str) -> DogStatsd: + host = os.getenv("STATSD_HOST", "127.0.0.1") + port_str = os.getenv("STATSD_PORT", "8125") + try: + port = int(port_str) + except ValueError: + raise ValueError(f"STATSD_PORT must be a valid integer, got: {port_str}") + return DogStatsd( + host=host, + port=port, + namespace=namespace, + disable_telemetry=True, + origin_detection_enabled=False, + ) + + _namespace_to_statsd: dict[str, StatsdInterface] = {} @@ -146,25 +162,6 @@ def get_statsd(namespace_suffix: Literal[None, "consumer"] = None) -> StatsdInte if namespace in _namespace_to_statsd: return _namespace_to_statsd[namespace] - disable_telemetry = True - origin_detection_enabled = False - - host = os.getenv("STATSD_HOST", "127.0.0.1") - port_str = os.getenv("STATSD_PORT", "8125") - - try: - port = int(port_str) - except ValueError: - raise ValueError(f"STATSD_PORT must be a valid integer, got: {port_str}") - - wrapper = DogStatsdWrapper( - DogStatsd( - host=host, - port=port, - namespace=namespace, - disable_telemetry=disable_telemetry, - origin_detection_enabled=origin_detection_enabled, - ) - ) + wrapper = DogStatsdWrapper(create_dogstatsd_client(namespace)) _namespace_to_statsd[namespace] = wrapper return wrapper diff --git a/src/launchpad/worker/app.py b/src/launchpad/worker/app.py index 12d74764..8c8f1a1d 100644 --- a/src/launchpad/worker/app.py +++ b/src/launchpad/worker/app.py @@ -7,11 +7,12 @@ from contextlib import contextmanager from arroyo.backends.kafka import KafkaProducer -from datadog.dogstatsd.base import DogStatsd from taskbroker_client.app import TaskbrokerApp from taskbroker_client.metrics import MetricsBackend, Tags from taskbroker_client.router import TaskRouter +from launchpad.utils.statsd import create_dogstatsd_client + _RUSAGE_TO_BYTES = 1 if platform.system() == "Darwin" else 1024 @@ -23,20 +24,7 @@ def _convert_tags(tags: Tags | None) -> list[str] | None: class TaskworkerMetricsBackend(MetricsBackend): def __init__(self) -> None: - host = os.getenv("STATSD_HOST", "127.0.0.1") - port_str = os.getenv("STATSD_PORT", "8125") - try: - port = int(port_str) - except ValueError: - raise ValueError(f"STATSD_PORT must be a valid integer, got: {port_str}") - - self._dogstatsd = DogStatsd( - host=host, - port=port, - namespace="launchpad.taskworker", - disable_telemetry=True, - origin_detection_enabled=False, - ) + self._dogstatsd = create_dogstatsd_client("launchpad.taskworker") def incr( self,