Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ ty==0.0.1a20
# Testing web endpoints
sortedcontainers-stubs>=2.4.0
kafka-python>=2.0.0
sentry-protos==0.2.0
sentry-protos>=0.4.11
27 changes: 8 additions & 19 deletions src/launchpad/artifact_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, Iterator, cast
from typing import Any, Iterator, cast

import sentry_sdk

Expand All @@ -18,9 +18,6 @@
Usecase,
)
from objectstore_client.metadata import TimeToLive
from sentry_kafka_schemas.schema_types.preprod_artifact_events_v1 import (
PreprodArtifactEvents,
)

from launchpad.api.update_api_models import AndroidAppInfo as AndroidAppInfoModel
from launchpad.api.update_api_models import AppleAppInfo as AppleAppInfoModel
Expand Down Expand Up @@ -67,7 +64,9 @@ def __init__(

@staticmethod
def process_message(
payload: PreprodArtifactEvents,
artifact_id: str,
project_id: str,
organization_id: str,
service_config=None,
artifact_processor=None,
statsd=None,
Expand All @@ -85,10 +84,6 @@ def process_message(

initialize_sentry_sdk()

organization_id = payload["organization_id"]
project_id = payload["project_id"]
artifact_id = payload["artifact_id"]

if statsd is None:
statsd = get_statsd()
if artifact_processor is None:
Expand Down Expand Up @@ -144,7 +139,7 @@ def process_artifact(
project_id: str,
artifact_id: str,
) -> str:
"""Process an artifact with the requested features. Returns the artifact type string."""
"""Process an artifact and return the artifact type string."""
dequeued_at = datetime.now()

with contextlib.ExitStack() as stack:
Expand Down Expand Up @@ -476,7 +471,7 @@ def _prepare_update_data(
artifact: Artifact,
dequeued_at: datetime,
app_icon_id: str | None,
) -> Dict[str, Any]:
) -> dict[str, Any]:
build_number = int(app_info.build) if app_info.build.isdigit() else None

apple_app_info = None
Expand Down Expand Up @@ -545,7 +540,7 @@ def _upload_results(
ProcessingErrorMessage.UPLOAD_FAILED,
e.user_facing_message(),
)
raise e
raise
else:
logger.info(f"Successfully uploaded analysis results for artifact {artifact_id}")

Expand All @@ -565,13 +560,7 @@ def _guess_message(code: ProcessingErrorCode, e: Exception) -> ProcessingErrorMe
if code == ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR:
if isinstance(e, NotImplementedError):
return ProcessingErrorMessage.UNSUPPORTED_ARTIFACT_TYPE

# If we can't guess from the exception but the code is set to
# something useful return the matching message.
if code == ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR:
return ProcessingErrorMessage.ARTIFACT_PARSING_FAILED
elif code == ProcessingErrorCode.ARTIFACT_PROCESSING_TIMEOUT:
return ProcessingErrorMessage.PROCESSING_TIMEOUT
else:
# If all else fails return unknown
return ProcessingErrorMessage.UNKNOWN_ERROR
return ProcessingErrorMessage.UNKNOWN_ERROR
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor cleanup

6 changes: 5 additions & 1 deletion src/launchpad/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ def _process_in_subprocess(decoded_message: Any, log_queue: multiprocessing.Queu
root_logger.setLevel(logging.DEBUG)

try:
ArtifactProcessor.process_message(decoded_message)
ArtifactProcessor.process_message(
artifact_id=decoded_message["artifact_id"],
project_id=decoded_message["project_id"],
organization_id=decoded_message["organization_id"],
)
except Exception:
logger.exception("Error processing message in subprocess")
sys.exit(1)
Expand Down
45 changes: 21 additions & 24 deletions tests/integration/test_kafka_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,14 @@ def test_process_message_with_skipped_project(self):
objectstore_url="http://test.objectstore.io",
)

test_message = {
"artifact_id": "test-123",
"project_id": "skip-project",
"organization_id": "test-org",
"requested_features": ["size_analysis"],
}

with patch.object(ArtifactProcessor, "process_artifact") as mock_process:
ArtifactProcessor.process_message(test_message, service_config=service_config, statsd=fake_statsd)
ArtifactProcessor.process_message(
artifact_id="test-123",
project_id="skip-project",
organization_id="test-org",
service_config=service_config,
statsd=fake_statsd,
)
mock_process.assert_not_called()

def test_process_message_with_allowed_project(self):
Expand All @@ -200,15 +199,14 @@ def test_process_message_with_allowed_project(self):
objectstore_url="http://test.objectstore.io",
)

test_message = {
"artifact_id": "test-123",
"project_id": "normal-project",
"organization_id": "test-org",
"requested_features": ["size_analysis"],
}

with patch.object(ArtifactProcessor, "process_artifact") as mock_process:
ArtifactProcessor.process_message(test_message, service_config=service_config, statsd=fake_statsd)
ArtifactProcessor.process_message(
artifact_id="test-123",
project_id="normal-project",
organization_id="test-org",
service_config=service_config,
statsd=fake_statsd,
)

mock_process.assert_called_once_with(
"test-org",
Expand All @@ -230,15 +228,14 @@ def test_process_message_error_handling(self):
objectstore_url="http://test.objectstore.io",
)

test_message = {
"artifact_id": "test-123",
"project_id": "test-project",
"organization_id": "test-org",
"requested_features": ["size_analysis"],
}

with patch.object(ArtifactProcessor, "process_artifact", side_effect=RuntimeError("Test error")):
ArtifactProcessor.process_message(test_message, service_config=service_config, statsd=fake_statsd)
ArtifactProcessor.process_message(
artifact_id="test-123",
project_id="test-project",
organization_id="test-org",
service_config=service_config,
statsd=fake_statsd,
)

calls = fake_statsd.calls
increment_calls = [call for call in calls if call[0] == "increment"]
Expand Down
95 changes: 35 additions & 60 deletions tests/unit/artifacts/test_artifact_processor.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
from unittest.mock import Mock, patch

from objectstore_client import Client as ObjectstoreClient
from sentry_kafka_schemas.schema_types.preprod_artifact_events_v1 import (
PreprodArtifactEvents,
)

from launchpad.artifact_processor import ArtifactProcessor
from launchpad.constants import (
Expand Down Expand Up @@ -152,18 +149,14 @@ def test_process_message_ios(self, mock_process, mock_sentry_client):
objectstore_url="http://test.objectstore.io",
)

# Create a payload for iOS artifact
payload: PreprodArtifactEvents = {
"artifact_id": "ios-test-123",
"project_id": "test-project-ios",
"organization_id": "test-org-123",
"requested_features": ["size_analysis"],
}

# Call the static method
ArtifactProcessor.process_message(payload, service_config=service_config, statsd=fake_statsd)
ArtifactProcessor.process_message(
artifact_id="ios-test-123",
project_id="test-project-ios",
organization_id="test-org-123",
service_config=service_config,
statsd=fake_statsd,
)

# Verify process_artifact was called with correct args
mock_process.assert_called_once_with(
"test-org-123",
"test-project-ios",
Expand Down Expand Up @@ -192,18 +185,14 @@ def test_process_message_android(self, mock_process, mock_sentry_client):
objectstore_url="http://test.objectstore.io",
)

# Create a payload for Android artifact
payload: PreprodArtifactEvents = {
"artifact_id": "android-test-456",
"project_id": "test-project-android",
"organization_id": "test-org-456",
"requested_features": ["size_analysis", "build_distribution"],
}

# Call the static method
ArtifactProcessor.process_message(payload, service_config=service_config, statsd=fake_statsd)
ArtifactProcessor.process_message(
artifact_id="android-test-456",
project_id="test-project-android",
organization_id="test-org-456",
service_config=service_config,
statsd=fake_statsd,
)

# Verify process_artifact was called with correct args
mock_process.assert_called_once_with(
"test-org-456",
"test-project-android",
Expand Down Expand Up @@ -232,21 +221,16 @@ def test_process_message_error(self, mock_process, mock_sentry_client):
objectstore_url="http://test.objectstore.io",
)

# Make process_artifact raise an exception
mock_process.side_effect = RuntimeError("Download failed: HTTP 404")

# Create a valid payload
payload: PreprodArtifactEvents = {
"artifact_id": "test-123",
"project_id": "test-project",
"organization_id": "test-org",
"requested_features": ["size_analysis", "build_distribution"],
}

# This should not raise (error handling catches all exceptions)
ArtifactProcessor.process_message(payload, service_config=service_config, statsd=fake_statsd)
ArtifactProcessor.process_message(
artifact_id="test-123",
project_id="test-project",
organization_id="test-org",
service_config=service_config,
statsd=fake_statsd,
)

# Verify process_artifact was called
mock_process.assert_called_once_with(
"test-org",
"test-project",
Expand All @@ -271,21 +255,16 @@ def test_process_message_project_skipped(self, mock_process, mock_sentry_client)
objectstore_url="http://test.objectstore.io",
)

# Create a payload for a project that should be skipped
payload: PreprodArtifactEvents = {
"artifact_id": "skip-test-123",
"project_id": "skip-project-1",
"organization_id": "test-org-123",
"requested_features": ["size_analysis", "build_distribution"],
}

# process_message should return early and not process
ArtifactProcessor.process_message(payload, service_config=service_config, statsd=fake_statsd)
ArtifactProcessor.process_message(
artifact_id="skip-test-123",
project_id="skip-project-1",
organization_id="test-org-123",
service_config=service_config,
statsd=fake_statsd,
)

# Verify process_artifact was NOT called
mock_process.assert_not_called()

# Verify no metrics were recorded (since processing was skipped entirely)
calls = fake_statsd.calls
assert len(calls) == 0

Expand All @@ -300,18 +279,14 @@ def test_process_message_project_not_skipped(self, mock_process, mock_sentry_cli
objectstore_url="http://test.objectstore.io",
)

# Create a payload for a project that should NOT be skipped
payload: PreprodArtifactEvents = {
"artifact_id": "normal-test-123",
"project_id": "normal-project",
"organization_id": "test-org-123",
"requested_features": ["size_analysis", "build_distribution"],
}

# process_message should process normally
ArtifactProcessor.process_message(payload, service_config=service_config, statsd=fake_statsd)
ArtifactProcessor.process_message(
artifact_id="normal-test-123",
project_id="normal-project",
organization_id="test-org-123",
service_config=service_config,
statsd=fake_statsd,
)

# Verify process_artifact was called
mock_process.assert_called_once_with(
"test-org-123",
"normal-project",
Expand Down
Loading