Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .envrc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ export LAUNCHPAD_HOST="0.0.0.0"
export LAUNCHPAD_PORT="2218"
export LAUNCHPAD_RPC_SHARED_SECRET="launchpad-also-very-long-value-haha"
export SENTRY_BASE_URL="http://localhost:8000"

export LAUNCHPAD_WORKER_RPC_HOST="localhost:50051"
export LAUNCHPAD_WORKER_CONCURRENCY="16"

# STATSD_HOST=... # defaults to 127.0.0.1
# STATSD_PORT=... # defaults to 8125

Expand Down Expand Up @@ -38,6 +42,8 @@ if ! command -v "$DEVENV" >/dev/null; then
fi

PATH_add "${PWD}/.devenv/all/bin"
PATH_add "${PWD}/.devenv/bin"

case $(uname -s) in
Darwin) PATH_add "${PWD}/.devenv/aarch64-darwin/bin";;
*) PATH_add "${PWD}/.devenv/x86_64-linux/bin";;
Expand Down
8 changes: 5 additions & 3 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,16 @@ launchpad size app.xcarchive.zip --skip-swift-metadata --skip-symbols # faster

# Service development
devservices up # Start Kafka infrastructure
make serve # Start Launchpad server
make serve # Start Launchpad server (Kafka mode: HTTP + Kafka consumer)
make worker # Start Launchpad worker (TaskWorker mode: no HTTP server)
```

## Architecture

### Core Components
- **CLI** (`src/launchpad/cli.py`): Main entry point, uses Click
- **Service** (`src/launchpad/service.py`): Kafka consumer + HTTP server for production
- **CLI** (`src/launchpad/cli.py`): Main entry point, uses Click. Two service commands: `serve` (Kafka mode) and `worker` (TaskWorker mode)
- **Service** (`src/launchpad/service.py`): Kafka consumer + HTTP server for production (`launchpad serve`)
- **Worker** (`src/launchpad/worker/`): TaskWorker mode — receives tasks via TaskBroker RPC, no HTTP server (`launchpad worker`)
- **Analyzers** (`src/launchpad/size/analyzers/`): Platform-specific analysis engines (AppleAppAnalyzer, AndroidAnalyzer)
- **Parsers** (`src/launchpad/parsers/`): Binary parsing - Mach-O via LIEF, custom DEX parsers
- **Insights** (`src/launchpad/size/insights/`): Optimization recommendations (image compression, symbol stripping, etc.)
Expand Down
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ serve: ## Start the Launchpad server with proper Kafka configuration
@echo "Starting Launchpad server..."
$(PYTHON_VENV) -m launchpad.cli serve --verbose

worker: ## Start the Launchpad TaskWorker (no HTTP server)
@echo "Starting Launchpad TaskWorker..."
$(PYTHON_VENV) -m launchpad.cli worker --verbose

test-kafka-message: ## Send a test message to Kafka (requires Kafka running)
$(PYTHON_VENV) scripts/test_kafka.py --count 1

Expand Down
43 changes: 37 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,43 @@ If you don't have devenv installed, [follow these instructions](https://github.c
# Start dependency containers (e.g. Kafka)
devservices up

# Begin listening for messages
launchpad serve
# Begin listening for messages (Kafka mode)
make serve

# Or run the TaskWorker instead
make worker

# Stop containers
devservices down
```

## Usage

Launchpad is primarily designed to run as a Kafka consumer alongside the [Sentry monolith](https://github.com/getsentry/sentry) codebase via `launchpad serve`.
Launchpad can run in two operational modes:

- **Kafka mode** (`launchpad serve`): HTTP server + Kafka consumer. This is the existing production mode that runs alongside the [Sentry monolith](https://github.com/getsentry/sentry).
- **TaskWorker mode** (`launchpad worker`): TaskWorker only, no HTTP server. This is a lighter-weight mode that receives work via the TaskBroker RPC interface instead of Kafka.

### Running in Kafka mode

```bash
devservices up
make serve
# or: launchpad serve --dev
```

### Running in TaskWorker mode

Requires `LAUNCHPAD_WORKER_RPC_HOST` and `LAUNCHPAD_WORKER_CONCURRENCY` environment variables (already configured in `.envrc`).

The TaskBroker handles task distribution and dispatches work to the worker via RPC. A single worker instance processes tasks in parallel — `LAUNCHPAD_WORKER_CONCURRENCY` controls how many child processes run simultaneously (e.g., 16 means up to 16 artifacts processed in parallel).

```bash
make worker
# or: launchpad worker -v
```

### One-off analysis

Alternatively for a one-off analysis, such as a local size analysis, you can invoke our various CLI subcommands.

Expand Down Expand Up @@ -94,14 +121,18 @@ devservices up --mode ingest
devservices serve --workers
```

Next run `launchpad` in another terminal:
Next run `launchpad` in another terminal using either mode:

```bash
# Kafka mode (HTTP server + Kafka consumer)
devservices up
launchpad serve
make serve

# TaskWorker mode (TaskWorker only, no HTTP server)
make worker
```

And finally use the `sentry-cli` to upload to your local machine:
And finally use the `sentry-cli` (version 3.0.1 or higher) to upload to your local machine:

```bash
sentry-cli --log-level DEBUG \
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ sentry-arroyo==2.34.0
sentry-kafka-schemas==2.1.2
sentry-sdk>=2.36.0
sortedcontainers>=2.4.0
taskbroker-client>=0.1.5
typing-extensions>=4.15.0
zipfile-zstd==0.0.4
29 changes: 29 additions & 0 deletions src/launchpad/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,35 @@ def serve(host: str, port: int, mode: str | None, verbose: bool) -> None:
raise click.Abort()


@cli.command()
@click.option("--verbose", "-v", is_flag=True, help="Enable verbose logging output.")
def worker(verbose: bool) -> None:
"""Start the Launchpad TaskWorker.

Runs the TaskWorker only, without an HTTP server.
Requires LAUNCHPAD_WORKER_RPC_HOST and LAUNCHPAD_WORKER_CONCURRENCY env vars.
"""
from .worker.config import run_worker

setup_logging(verbose=verbose, quiet=False)

console.print(f"[bold blue]Launchpad TaskWorker v{__version__}[/bold blue]")
console.print("Press Ctrl+C to stop the worker")
console.print()

try:
run_worker()
except KeyboardInterrupt:
console.print("\n[yellow]Worker stopped by user[/yellow]")
except SystemExit:
raise
except Exception as e:
console.print(f"[bold red]Worker error:[/bold red] {e}")
if verbose:
console.print_exception()
raise click.Abort()


cli.add_command(size_command)
cli.add_command(app_icon_command)
cli.add_command(distribution_command)
Expand Down
9 changes: 9 additions & 0 deletions src/launchpad/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import os

_TASKWORKER_ONLY_PROJECT_IDS: set[str] = {
p.strip() for p in os.getenv("PROJECT_IDS_TO_ONLY_TRY_TASKWORKER_PROCESSING", "").split(",") if p.strip()
}


def is_taskworker_only_project(project_id: str) -> bool:
return project_id in _TASKWORKER_ONLY_PROJECT_IDS
6 changes: 6 additions & 0 deletions src/launchpad/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from sentry_kafka_schemas import get_codec

from launchpad.artifact_processor import ArtifactProcessor
from launchpad.config import is_taskworker_only_project
from launchpad.constants import PREPROD_ARTIFACT_EVENTS_TOPIC
from launchpad.tracing import RequestLogFilter
from launchpad.utils.arroyo_metrics import DatadogMetricsBackend
Expand Down Expand Up @@ -93,6 +94,11 @@ def process_kafka_message_with_service(
raise

artifact_id = decoded.get("artifact_id", "unknown")
project_id = str(decoded.get("project_id", ""))

if is_taskworker_only_project(project_id):
logger.info("Skipping Kafka processing for project %s (taskworker-only)", project_id)
return None # type: ignore[return-value]

# Spawn actual processing in a subprocess
process = multiprocessing.Process(target=_process_in_subprocess, args=(decoded, log_queue))
Expand Down
37 changes: 17 additions & 20 deletions src/launchpad/utils/statsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,22 @@ def timed(self, metric: str, tags: list[str] | None = None) -> Any:
yield


def create_dogstatsd_client(namespace: str) -> DogStatsd:
host = os.getenv("STATSD_HOST", "127.0.0.1")
port_str = os.getenv("STATSD_PORT", "8125")
try:
port = int(port_str)
except ValueError:
raise ValueError(f"STATSD_PORT must be a valid integer, got: {port_str}")
return DogStatsd(
host=host,
port=port,
namespace=namespace,
disable_telemetry=True,
origin_detection_enabled=False,
)


_namespace_to_statsd: dict[str, StatsdInterface] = {}


Expand All @@ -146,25 +162,6 @@ def get_statsd(namespace_suffix: Literal[None, "consumer"] = None) -> StatsdInte
if namespace in _namespace_to_statsd:
return _namespace_to_statsd[namespace]

disable_telemetry = True
origin_detection_enabled = False

host = os.getenv("STATSD_HOST", "127.0.0.1")
port_str = os.getenv("STATSD_PORT", "8125")

try:
port = int(port_str)
except ValueError:
raise ValueError(f"STATSD_PORT must be a valid integer, got: {port_str}")

wrapper = DogStatsdWrapper(
DogStatsd(
host=host,
port=port,
namespace=namespace,
disable_telemetry=disable_telemetry,
origin_detection_enabled=origin_detection_enabled,
)
)
wrapper = DogStatsdWrapper(create_dogstatsd_client(namespace))
_namespace_to_statsd[namespace] = wrapper
return wrapper
Empty file.
105 changes: 105 additions & 0 deletions src/launchpad/worker/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import os
import platform
import resource
import time

from collections.abc import Generator
from contextlib import contextmanager

from arroyo.backends.kafka import KafkaProducer
from taskbroker_client.app import TaskbrokerApp
from taskbroker_client.metrics import MetricsBackend, Tags
from taskbroker_client.router import TaskRouter

from launchpad.utils.statsd import create_dogstatsd_client

_RUSAGE_TO_BYTES = 1 if platform.system() == "Darwin" else 1024


def _convert_tags(tags: Tags | None) -> list[str] | None:
if tags is None:
return None
return [f"{k}:{v}" for k, v in tags.items()]


class TaskworkerMetricsBackend(MetricsBackend):
def __init__(self) -> None:
self._dogstatsd = create_dogstatsd_client("launchpad.taskworker")

def incr(
self,
name: str,
value: int | float = 1,
tags: Tags | None = None,
sample_rate: float | None = None,
) -> None:
kwargs: dict = {"tags": _convert_tags(tags)}
if sample_rate is not None:
kwargs["sample_rate"] = sample_rate
self._dogstatsd.increment(name, int(value), **kwargs)

def distribution(
self,
name: str,
value: int | float,
tags: Tags | None = None,
unit: str | None = None,
sample_rate: float | None = None,
) -> None:
kwargs: dict = {"tags": _convert_tags(tags)}
if sample_rate is not None:
kwargs["sample_rate"] = sample_rate
self._dogstatsd.distribution(name, value, **kwargs)

@contextmanager
def timer(
self,
key: str,
tags: Tags | None = None,
sample_rate: float | None = None,
stacklevel: int = 0,
) -> Generator[None]:
start = time.monotonic()
try:
yield
finally:
duration_ms = (time.monotonic() - start) * 1000
self.distribution(key, duration_ms, tags=tags, unit="millisecond", sample_rate=sample_rate)

@contextmanager
def track_memory_usage(
self,
key: str,
tags: Tags | None = None,
) -> Generator[None]:
before = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
try:
yield
finally:
after = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
self.distribution(key, (after - before) * _RUSAGE_TO_BYTES, tags=tags, unit="byte")


class CustomRouter(TaskRouter):
def route_namespace(self, name: str) -> str:
return "taskworker-launchpad"


def producer_factory(topic: str) -> KafkaProducer:
bootstrap_servers = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "127.0.0.1:9092")
config = {
"bootstrap.servers": bootstrap_servers,
"compression.type": "lz4",
"message.max.bytes": 50000000,
}
return KafkaProducer(config)


app = TaskbrokerApp(
name="launchpad",
producer_factory=producer_factory,
router_class=CustomRouter(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll want worker runtime metrics too. You'll need a taskbroker_client.metrics.MetricsBackend implementation for that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alright did a take to implement 🙏

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That looks ok to me. Does launchpad not have an existing metrics API that you could wrap?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes this is our current metrics implementation https://github.com/getsentry/launchpad/blob/main/src/launchpad/utils/statsd.py

went ahead and updated this branch to make sure we didn't duplicate logic

metrics_class=TaskworkerMetricsBackend(),
)

app.set_modules(["launchpad.worker.tasks"])
Loading
Loading