diff --git a/requirements-dev.txt b/requirements-dev.txt index d27e0ef2..d09d5e21 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -24,4 +24,4 @@ ty==0.0.1a20 # Testing web endpoints sortedcontainers-stubs>=2.4.0 kafka-python>=2.0.0 -sentry-protos==0.2.0 +sentry-protos>=0.4.11 diff --git a/src/launchpad/artifact_processor.py b/src/launchpad/artifact_processor.py index 3a1e2384..8185a8c2 100644 --- a/src/launchpad/artifact_processor.py +++ b/src/launchpad/artifact_processor.py @@ -7,7 +7,7 @@ from datetime import datetime, timedelta from pathlib import Path -from typing import Any, Dict, Iterator, cast +from typing import Any, Iterator, cast import sentry_sdk @@ -18,9 +18,6 @@ Usecase, ) from objectstore_client.metadata import TimeToLive -from sentry_kafka_schemas.schema_types.preprod_artifact_events_v1 import ( - PreprodArtifactEvents, -) from launchpad.api.update_api_models import AndroidAppInfo as AndroidAppInfoModel from launchpad.api.update_api_models import AppleAppInfo as AppleAppInfoModel @@ -67,7 +64,9 @@ def __init__( @staticmethod def process_message( - payload: PreprodArtifactEvents, + artifact_id: str, + project_id: str, + organization_id: str, service_config=None, artifact_processor=None, statsd=None, @@ -85,10 +84,6 @@ def process_message( initialize_sentry_sdk() - organization_id = payload["organization_id"] - project_id = payload["project_id"] - artifact_id = payload["artifact_id"] - if statsd is None: statsd = get_statsd() if artifact_processor is None: @@ -144,7 +139,7 @@ def process_artifact( project_id: str, artifact_id: str, ) -> str: - """Process an artifact with the requested features. Returns the artifact type string.""" + """Process an artifact and return the artifact type string.""" dequeued_at = datetime.now() with contextlib.ExitStack() as stack: @@ -476,7 +471,7 @@ def _prepare_update_data( artifact: Artifact, dequeued_at: datetime, app_icon_id: str | None, - ) -> Dict[str, Any]: + ) -> dict[str, Any]: build_number = int(app_info.build) if app_info.build.isdigit() else None apple_app_info = None @@ -545,7 +540,7 @@ def _upload_results( ProcessingErrorMessage.UPLOAD_FAILED, e.user_facing_message(), ) - raise e + raise else: logger.info(f"Successfully uploaded analysis results for artifact {artifact_id}") @@ -565,13 +560,7 @@ def _guess_message(code: ProcessingErrorCode, e: Exception) -> ProcessingErrorMe if code == ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR: if isinstance(e, NotImplementedError): return ProcessingErrorMessage.UNSUPPORTED_ARTIFACT_TYPE - - # If we can't guess from the exception but the code is set to - # something useful return the matching message. - if code == ProcessingErrorCode.ARTIFACT_PROCESSING_ERROR: return ProcessingErrorMessage.ARTIFACT_PARSING_FAILED elif code == ProcessingErrorCode.ARTIFACT_PROCESSING_TIMEOUT: return ProcessingErrorMessage.PROCESSING_TIMEOUT - else: - # If all else fails return unknown - return ProcessingErrorMessage.UNKNOWN_ERROR + return ProcessingErrorMessage.UNKNOWN_ERROR diff --git a/src/launchpad/kafka.py b/src/launchpad/kafka.py index 6dfec9bf..0678c753 100644 --- a/src/launchpad/kafka.py +++ b/src/launchpad/kafka.py @@ -49,7 +49,11 @@ def _process_in_subprocess(decoded_message: Any, log_queue: multiprocessing.Queu root_logger.setLevel(logging.DEBUG) try: - ArtifactProcessor.process_message(decoded_message) + ArtifactProcessor.process_message( + artifact_id=decoded_message["artifact_id"], + project_id=decoded_message["project_id"], + organization_id=decoded_message["organization_id"], + ) except Exception: logger.exception("Error processing message in subprocess") sys.exit(1) diff --git a/tests/integration/test_kafka_service.py b/tests/integration/test_kafka_service.py index 706a9e56..2084c24e 100644 --- a/tests/integration/test_kafka_service.py +++ b/tests/integration/test_kafka_service.py @@ -179,15 +179,14 @@ def test_process_message_with_skipped_project(self): objectstore_url="http://test.objectstore.io", ) - test_message = { - "artifact_id": "test-123", - "project_id": "skip-project", - "organization_id": "test-org", - "requested_features": ["size_analysis"], - } - with patch.object(ArtifactProcessor, "process_artifact") as mock_process: - ArtifactProcessor.process_message(test_message, service_config=service_config, statsd=fake_statsd) + ArtifactProcessor.process_message( + artifact_id="test-123", + project_id="skip-project", + organization_id="test-org", + service_config=service_config, + statsd=fake_statsd, + ) mock_process.assert_not_called() def test_process_message_with_allowed_project(self): @@ -200,15 +199,14 @@ def test_process_message_with_allowed_project(self): objectstore_url="http://test.objectstore.io", ) - test_message = { - "artifact_id": "test-123", - "project_id": "normal-project", - "organization_id": "test-org", - "requested_features": ["size_analysis"], - } - with patch.object(ArtifactProcessor, "process_artifact") as mock_process: - ArtifactProcessor.process_message(test_message, service_config=service_config, statsd=fake_statsd) + ArtifactProcessor.process_message( + artifact_id="test-123", + project_id="normal-project", + organization_id="test-org", + service_config=service_config, + statsd=fake_statsd, + ) mock_process.assert_called_once_with( "test-org", @@ -230,15 +228,14 @@ def test_process_message_error_handling(self): objectstore_url="http://test.objectstore.io", ) - test_message = { - "artifact_id": "test-123", - "project_id": "test-project", - "organization_id": "test-org", - "requested_features": ["size_analysis"], - } - with patch.object(ArtifactProcessor, "process_artifact", side_effect=RuntimeError("Test error")): - ArtifactProcessor.process_message(test_message, service_config=service_config, statsd=fake_statsd) + ArtifactProcessor.process_message( + artifact_id="test-123", + project_id="test-project", + organization_id="test-org", + service_config=service_config, + statsd=fake_statsd, + ) calls = fake_statsd.calls increment_calls = [call for call in calls if call[0] == "increment"] diff --git a/tests/unit/artifacts/test_artifact_processor.py b/tests/unit/artifacts/test_artifact_processor.py index bdd18b2e..e5c75779 100644 --- a/tests/unit/artifacts/test_artifact_processor.py +++ b/tests/unit/artifacts/test_artifact_processor.py @@ -1,9 +1,6 @@ from unittest.mock import Mock, patch from objectstore_client import Client as ObjectstoreClient -from sentry_kafka_schemas.schema_types.preprod_artifact_events_v1 import ( - PreprodArtifactEvents, -) from launchpad.artifact_processor import ArtifactProcessor from launchpad.constants import ( @@ -152,18 +149,14 @@ def test_process_message_ios(self, mock_process, mock_sentry_client): objectstore_url="http://test.objectstore.io", ) - # Create a payload for iOS artifact - payload: PreprodArtifactEvents = { - "artifact_id": "ios-test-123", - "project_id": "test-project-ios", - "organization_id": "test-org-123", - "requested_features": ["size_analysis"], - } - - # Call the static method - ArtifactProcessor.process_message(payload, service_config=service_config, statsd=fake_statsd) + ArtifactProcessor.process_message( + artifact_id="ios-test-123", + project_id="test-project-ios", + organization_id="test-org-123", + service_config=service_config, + statsd=fake_statsd, + ) - # Verify process_artifact was called with correct args mock_process.assert_called_once_with( "test-org-123", "test-project-ios", @@ -192,18 +185,14 @@ def test_process_message_android(self, mock_process, mock_sentry_client): objectstore_url="http://test.objectstore.io", ) - # Create a payload for Android artifact - payload: PreprodArtifactEvents = { - "artifact_id": "android-test-456", - "project_id": "test-project-android", - "organization_id": "test-org-456", - "requested_features": ["size_analysis", "build_distribution"], - } - - # Call the static method - ArtifactProcessor.process_message(payload, service_config=service_config, statsd=fake_statsd) + ArtifactProcessor.process_message( + artifact_id="android-test-456", + project_id="test-project-android", + organization_id="test-org-456", + service_config=service_config, + statsd=fake_statsd, + ) - # Verify process_artifact was called with correct args mock_process.assert_called_once_with( "test-org-456", "test-project-android", @@ -232,21 +221,16 @@ def test_process_message_error(self, mock_process, mock_sentry_client): objectstore_url="http://test.objectstore.io", ) - # Make process_artifact raise an exception mock_process.side_effect = RuntimeError("Download failed: HTTP 404") - # Create a valid payload - payload: PreprodArtifactEvents = { - "artifact_id": "test-123", - "project_id": "test-project", - "organization_id": "test-org", - "requested_features": ["size_analysis", "build_distribution"], - } - - # This should not raise (error handling catches all exceptions) - ArtifactProcessor.process_message(payload, service_config=service_config, statsd=fake_statsd) + ArtifactProcessor.process_message( + artifact_id="test-123", + project_id="test-project", + organization_id="test-org", + service_config=service_config, + statsd=fake_statsd, + ) - # Verify process_artifact was called mock_process.assert_called_once_with( "test-org", "test-project", @@ -271,21 +255,16 @@ def test_process_message_project_skipped(self, mock_process, mock_sentry_client) objectstore_url="http://test.objectstore.io", ) - # Create a payload for a project that should be skipped - payload: PreprodArtifactEvents = { - "artifact_id": "skip-test-123", - "project_id": "skip-project-1", - "organization_id": "test-org-123", - "requested_features": ["size_analysis", "build_distribution"], - } - - # process_message should return early and not process - ArtifactProcessor.process_message(payload, service_config=service_config, statsd=fake_statsd) + ArtifactProcessor.process_message( + artifact_id="skip-test-123", + project_id="skip-project-1", + organization_id="test-org-123", + service_config=service_config, + statsd=fake_statsd, + ) - # Verify process_artifact was NOT called mock_process.assert_not_called() - # Verify no metrics were recorded (since processing was skipped entirely) calls = fake_statsd.calls assert len(calls) == 0 @@ -300,18 +279,14 @@ def test_process_message_project_not_skipped(self, mock_process, mock_sentry_cli objectstore_url="http://test.objectstore.io", ) - # Create a payload for a project that should NOT be skipped - payload: PreprodArtifactEvents = { - "artifact_id": "normal-test-123", - "project_id": "normal-project", - "organization_id": "test-org-123", - "requested_features": ["size_analysis", "build_distribution"], - } - - # process_message should process normally - ArtifactProcessor.process_message(payload, service_config=service_config, statsd=fake_statsd) + ArtifactProcessor.process_message( + artifact_id="normal-test-123", + project_id="normal-project", + organization_id="test-org-123", + service_config=service_config, + statsd=fake_statsd, + ) - # Verify process_artifact was called mock_process.assert_called_once_with( "test-org-123", "normal-project",