diff --git a/.envrc b/.envrc index 943710d0..7b658b74 100644 --- a/.envrc +++ b/.envrc @@ -10,6 +10,10 @@ export LAUNCHPAD_HOST="0.0.0.0" export LAUNCHPAD_PORT="2218" export LAUNCHPAD_RPC_SHARED_SECRET="launchpad-also-very-long-value-haha" export SENTRY_BASE_URL="http://localhost:8000" + +export LAUNCHPAD_WORKER_RPC_HOST="localhost:50051" +export LAUNCHPAD_WORKER_CONCURRENCY="16" + # STATSD_HOST=... # defaults to 127.0.0.1 # STATSD_PORT=... # defaults to 8125 @@ -38,6 +42,8 @@ if ! command -v "$DEVENV" >/dev/null; then fi PATH_add "${PWD}/.devenv/all/bin" +PATH_add "${PWD}/.devenv/bin" + case $(uname -s) in Darwin) PATH_add "${PWD}/.devenv/aarch64-darwin/bin";; *) PATH_add "${PWD}/.devenv/x86_64-linux/bin";; diff --git a/CLAUDE.md b/CLAUDE.md index cd3e6f2b..f7046fb9 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -35,14 +35,16 @@ launchpad size app.xcarchive.zip --skip-swift-metadata --skip-symbols # faster # Service development devservices up # Start Kafka infrastructure -make serve # Start Launchpad server +make serve # Start Launchpad server (Kafka mode: HTTP + Kafka consumer) +make worker # Start Launchpad worker (TaskWorker mode: no HTTP server) ``` ## Architecture ### Core Components -- **CLI** (`src/launchpad/cli.py`): Main entry point, uses Click -- **Service** (`src/launchpad/service.py`): Kafka consumer + HTTP server for production +- **CLI** (`src/launchpad/cli.py`): Main entry point, uses Click. 
Two service commands: `serve` (Kafka mode) and `worker` (TaskWorker mode) +- **Service** (`src/launchpad/service.py`): Kafka consumer + HTTP server for production (`launchpad serve`) +- **Worker** (`src/launchpad/worker/`): TaskWorker mode — receives tasks via TaskBroker RPC, no HTTP server (`launchpad worker`) - **Analyzers** (`src/launchpad/size/analyzers/`): Platform-specific analysis engines (AppleAppAnalyzer, AndroidAnalyzer) - **Parsers** (`src/launchpad/parsers/`): Binary parsing - Mach-O via LIEF, custom DEX parsers - **Insights** (`src/launchpad/size/insights/`): Optimization recommendations (image compression, symbol stripping, etc.) diff --git a/Makefile b/Makefile index 1d8307d9..21ec1ebc 100644 --- a/Makefile +++ b/Makefile @@ -113,6 +113,10 @@ serve: ## Start the Launchpad server with proper Kafka configuration @echo "Starting Launchpad server..." $(PYTHON_VENV) -m launchpad.cli serve --verbose +worker: ## Start the Launchpad TaskWorker (no HTTP server) + @echo "Starting Launchpad TaskWorker..." + $(PYTHON_VENV) -m launchpad.cli worker --verbose + test-kafka-message: ## Send a test message to Kafka (requires Kafka running) $(PYTHON_VENV) scripts/test_kafka.py --count 1 diff --git a/README.md b/README.md index dddf9217..c545cc5d 100644 --- a/README.md +++ b/README.md @@ -26,8 +26,11 @@ If you don't have devenv installed, [follow these instructions](https://github.c # Start dependency containers (e.g. Kafka) devservices up -# Begin listening for messages -launchpad serve +# Begin listening for messages (Kafka mode) +make serve + +# Or run the TaskWorker instead +make worker # Stop containers devservices down @@ -35,7 +38,31 @@ devservices down ## Usage -Launchpad is primarily designed to run as a Kafka consumer alongside the [Sentry monolith](https://github.com/getsentry/sentry) codebase via `launchpad serve`. +Launchpad can run in two operational modes: + +- **Kafka mode** (`launchpad serve`): HTTP server + Kafka consumer. 
This is the existing production mode that runs alongside the [Sentry monolith](https://github.com/getsentry/sentry). +- **TaskWorker mode** (`launchpad worker`): TaskWorker only, no HTTP server. This is a lighter-weight mode that receives work via the TaskBroker RPC interface instead of Kafka. + +### Running in Kafka mode + +```bash +devservices up +make serve +# or: launchpad serve --dev +``` + +### Running in TaskWorker mode + +Requires `LAUNCHPAD_WORKER_RPC_HOST` and `LAUNCHPAD_WORKER_CONCURRENCY` environment variables (already configured in `.envrc`). + +The TaskBroker handles task distribution and dispatches work to the worker via RPC. A single worker instance processes tasks in parallel — `LAUNCHPAD_WORKER_CONCURRENCY` controls how many child processes run simultaneously (e.g., 16 means up to 16 artifacts processed in parallel). + +```bash +make worker +# or: launchpad worker -v +``` + +### One-off analysis Alternatively for a one-off analysis, such as a local size analysis, you can invoke our various CLI subcommands. 
@@ -94,14 +121,18 @@ devservices up --mode ingest devservices serve --workers ``` -Next run `launchpad` in another terminal: +Next run `launchpad` in another terminal using either mode: ```bash +# Kafka mode (HTTP server + Kafka consumer) devservices up -launchpad serve +make serve + +# TaskWorker mode (TaskWorker only, no HTTP server) +make worker ``` -And finally use the `sentry-cli` to upload to your local machine: +And finally use the `sentry-cli` (version 3.0.1 or higher) to upload to your local machine: ```bash sentry-cli --log-level DEBUG \ diff --git a/requirements.txt b/requirements.txt index d00639e4..06e857b0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,5 +19,6 @@ sentry-arroyo==2.34.0 sentry-kafka-schemas==2.1.2 sentry-sdk>=2.36.0 sortedcontainers>=2.4.0 +taskbroker-client>=0.1.5 typing-extensions>=4.15.0 zipfile-zstd==0.0.4 diff --git a/src/launchpad/cli.py b/src/launchpad/cli.py index 8b3adf8e..dd0d7f1b 100644 --- a/src/launchpad/cli.py +++ b/src/launchpad/cli.py @@ -82,6 +82,35 @@ def serve(host: str, port: int, mode: str | None, verbose: bool) -> None: raise click.Abort() +@cli.command() +@click.option("--verbose", "-v", is_flag=True, help="Enable verbose logging output.") +def worker(verbose: bool) -> None: + """Start the Launchpad TaskWorker. + + Runs the TaskWorker only, without an HTTP server. + Requires LAUNCHPAD_WORKER_RPC_HOST and LAUNCHPAD_WORKER_CONCURRENCY env vars. 
# Allow-list of project IDs, read once at import time from the comma-separated
# PROJECT_IDS_TO_ONLY_TRY_TASKWORKER_PROCESSING variable. Entries are trimmed
# and blank entries are discarded; an unset variable yields an empty set.
_TASKWORKER_ONLY_PROJECT_IDS: set[str] = set(
    filter(
        None,
        map(str.strip, os.getenv("PROJECT_IDS_TO_ONLY_TRY_TASKWORKER_PROCESSING", "").split(",")),
    )
)


def is_taskworker_only_project(project_id: str) -> bool:
    """Return True when ``project_id`` appears in the taskworker-only allow-list.

    The comparison is an exact string match against the IDs parsed from the
    environment, so callers holding a non-string ID must convert it first.
    """
    is_listed = project_id in _TASKWORKER_ONLY_PROJECT_IDS
    return is_listed
def create_dogstatsd_client(namespace: str) -> DogStatsd:
    """Build a DogStatsd client for ``namespace`` from environment settings.

    The target address comes from ``STATSD_HOST`` (default ``127.0.0.1``) and
    ``STATSD_PORT`` (default ``8125``). The client is created with telemetry
    disabled and origin detection turned off.

    Args:
        namespace: Metric namespace passed straight through to the client.

    Returns:
        A newly constructed ``DogStatsd`` instance.

    Raises:
        ValueError: If ``STATSD_PORT`` is not a valid integer. The original
            parse error is attached as ``__cause__``.
    """
    host = os.getenv("STATSD_HOST", "127.0.0.1")
    port_str = os.getenv("STATSD_PORT", "8125")
    try:
        port = int(port_str)
    except ValueError as err:
        # Chain explicitly so the traceback reads as "caused by" the parse
        # failure rather than "during handling of ... another exception".
        raise ValueError(f"STATSD_PORT must be a valid integer, got: {port_str}") from err
    return DogStatsd(
        host=host,
        port=port,
        namespace=namespace,
        disable_telemetry=True,
        origin_detection_enabled=False,
    )
def _convert_tags(tags: Tags | None) -> list[str] | None:
    """Render a tag mapping as a list of ``key:value`` strings.

    ``None`` is passed through unchanged so callers can forward "no tags"
    directly; an empty mapping produces an empty list.
    """
    if tags is None:
        return None

    rendered: list[str] = []
    for name, val in tags.items():
        rendered.append(f"{name}:{val}")
    return rendered
def get_worker_config() -> WorkerConfig:
    """Build the TaskWorker configuration from environment variables.

    Reads ``LAUNCHPAD_WORKER_RPC_HOST`` (one or more comma-separated broker
    addresses) and ``LAUNCHPAD_WORKER_CONCURRENCY`` (a positive integer).

    Returns:
        A ``WorkerConfig`` holding the trimmed, non-empty host list and the
        parsed concurrency.

    Raises:
        ValueError: If either variable is missing or empty, if no host is left
            after trimming blank entries, or if the concurrency is not a
            positive integer.
    """
    rpc_host = os.getenv("LAUNCHPAD_WORKER_RPC_HOST")
    if not rpc_host:
        raise ValueError("LAUNCHPAD_WORKER_RPC_HOST environment variable is required")

    # Discard blank entries so a trailing/doubled comma or a whitespace-only
    # value never turns into an empty broker address.
    rpc_hosts = [host for host in (part.strip() for part in rpc_host.split(",")) if host]
    if not rpc_hosts:
        raise ValueError(f"LAUNCHPAD_WORKER_RPC_HOST must contain at least one host, got: {rpc_host!r}")

    concurrency_str = os.getenv("LAUNCHPAD_WORKER_CONCURRENCY")
    if not concurrency_str:
        raise ValueError("LAUNCHPAD_WORKER_CONCURRENCY environment variable is required")

    try:
        concurrency = int(concurrency_str)
    except ValueError as err:
        raise ValueError(
            f"LAUNCHPAD_WORKER_CONCURRENCY must be a valid integer, got: {concurrency_str}"
        ) from err

    # Reject zero/negative values up front: the value is used as the worker's
    # concurrency and is not meaningful below one.
    if concurrency < 1:
        raise ValueError(f"LAUNCHPAD_WORKER_CONCURRENCY must be a positive integer, got: {concurrency}")

    return WorkerConfig(rpc_hosts=rpc_hosts, concurrency=concurrency)
@default.register(name="process_artifact")
def process_artifact(artifact_id: str, project_id: str, organization_id: str) -> None:
    """Process one artifact when its project is on the taskworker-only list.

    Registered in the ``default`` namespace under the name ``process_artifact``.
    Logging is configured on every invocation before anything is logged. When
    the project is not on the list, the task logs that it is skipping and
    returns without calling ``ArtifactProcessor``.
    """
    setup_logging()
    logger.info(f"Processing artifact {artifact_id}")
    logger.info(f"Params: artifact_id={artifact_id}, project_id={project_id}, organization_id={organization_id}")

    handled_here = is_taskworker_only_project(str(project_id))
    if handled_here:
        ArtifactProcessor.process_message(artifact_id, project_id, organization_id)
        logger.info(f"Processed artifact {artifact_id}")
    else:
        logger.info("Skipping TaskWorker processing for project %s (not in taskworker-only list)", project_id)
def _make_decoded(project_id: str = "123", artifact_id: str = "art-1") -> dict:
    """Build a decoded-message dict for the tests.

    The organization is always ``org-1``; the project and artifact IDs can be
    overridden per test.
    """
    decoded: dict = {}
    decoded["project_id"] = project_id
    decoded["artifact_id"] = artifact_id
    decoded["organization_id"] = "org-1"
    return decoded
class TestGetWorkerConfig:
    """Checks how ``get_worker_config`` parses its two environment variables."""

    def test_valid_config(self):
        # One host plus a numeric concurrency yields a fully populated config.
        env = {
            "LAUNCHPAD_WORKER_RPC_HOST": "localhost:50051",
            "LAUNCHPAD_WORKER_CONCURRENCY": "8",
        }
        with patch.dict(os.environ, env):
            config = get_worker_config()
        expected = WorkerConfig(rpc_hosts=["localhost:50051"], concurrency=8)
        assert config == expected

    def test_comma_separated_hosts(self):
        # Whitespace around each comma-separated host is trimmed.
        env = {
            "LAUNCHPAD_WORKER_RPC_HOST": "host1:50051, host2:50051, host3:50051",
            "LAUNCHPAD_WORKER_CONCURRENCY": "4",
        }
        with patch.dict(os.environ, env):
            config = get_worker_config()
        assert config.rpc_hosts == ["host1:50051", "host2:50051", "host3:50051"]

    def test_missing_rpc_host(self):
        # clear=True guarantees the host variable is absent for this call.
        env = {"LAUNCHPAD_WORKER_CONCURRENCY": "8"}
        with patch.dict(os.environ, env, clear=True), pytest.raises(ValueError, match="LAUNCHPAD_WORKER_RPC_HOST"):
            get_worker_config()

    def test_missing_concurrency(self):
        # clear=True guarantees the concurrency variable is absent for this call.
        env = {"LAUNCHPAD_WORKER_RPC_HOST": "localhost:50051"}
        with patch.dict(os.environ, env, clear=True), pytest.raises(ValueError, match="LAUNCHPAD_WORKER_CONCURRENCY"):
            get_worker_config()

    def test_invalid_concurrency(self):
        # A non-numeric concurrency is rejected with the integer-parse message.
        env = {
            "LAUNCHPAD_WORKER_RPC_HOST": "localhost:50051",
            "LAUNCHPAD_WORKER_CONCURRENCY": "not-a-number",
        }
        with patch.dict(os.environ, env), pytest.raises(ValueError, match="must be a valid integer"):
            get_worker_config()