diff --git a/services/apps/git_integration/src/crowdgit/services/maintainer/bedrock.py b/services/apps/git_integration/src/crowdgit/services/maintainer/bedrock.py index 4e47f6b8ee..7388e294ad 100644 --- a/services/apps/git_integration/src/crowdgit/services/maintainer/bedrock.py +++ b/services/apps/git_integration/src/crowdgit/services/maintainer/bedrock.py @@ -1,8 +1,11 @@ +import asyncio import json +import random from typing import Generic, TypeVar import aioboto3 from botocore.config import Config +from botocore.exceptions import ClientError from pydantic import BaseModel, ValidationError from crowdgit.logger import logger @@ -20,6 +23,10 @@ class BedrockResponse(BaseModel, Generic[T]): cost: float +MAX_THROTTLE_RETRIES = 5 +THROTTLE_BASE_DELAY = 10 # seconds + + async def invoke_bedrock( instruction, pydantic_model: type[T], replacements=None, max_tokens=65000, temperature=0 ) -> BedrockResponse[T]: @@ -71,41 +78,54 @@ async def invoke_bedrock( } ) + modelId = "us.anthropic.claude-sonnet-4-20250514-v1:0" + accept = "application/json" + contentType = "application/json" + + for attempt in range(1, MAX_THROTTLE_RETRIES + 1): + try: + response = await bedrock_client.invoke_model( + body=body, modelId=modelId, accept=accept, contentType=contentType + ) + break + except ClientError as e: + if ( + e.response["Error"]["Code"] == "ThrottlingException" + and attempt < MAX_THROTTLE_RETRIES + ): + delay = THROTTLE_BASE_DELAY * (2 ** (attempt - 1)) + random.uniform(0, 2) + logger.warning( + f"Bedrock ThrottlingException (attempt {attempt}/{MAX_THROTTLE_RETRIES}), " + f"retrying in {delay:.1f}s: {e}" + ) + await asyncio.sleep(delay) + else: + raise + try: - modelId = "us.anthropic.claude-sonnet-4-20250514-v1:0" - accept = "application/json" - contentType = "application/json" - response = await bedrock_client.invoke_model( - body=body, modelId=modelId, accept=accept, contentType=contentType - ) + body_bytes = await response["body"].read() + response_body = json.loads(body_bytes.decode("utf-8")) + raw_text = response_body["content"][0]["text"].replace('"""', "").strip() + + # Expect pure JSON - no markdown handling + output = json.loads(raw_text) + # Calculate cost + input_tokens = response_body["usage"]["input_tokens"] + output_tokens = response_body["usage"]["output_tokens"] + input_cost = (input_tokens / 1000) * 0.003 + output_cost = (output_tokens / 1000) * 0.015 + total_cost = input_cost + output_cost + + # Validate output with the provided model if it exists try: - body_bytes = await response["body"].read() - response_body = json.loads(body_bytes.decode("utf-8")) - raw_text = response_body["content"][0]["text"].replace('"""', "").strip() - - # Expect pure JSON - no markdown handling - output = json.loads(raw_text) - - # Calculate cost - input_tokens = response_body["usage"]["input_tokens"] - output_tokens = response_body["usage"]["output_tokens"] - input_cost = (input_tokens / 1000) * 0.003 - output_cost = (output_tokens / 1000) * 0.015 - total_cost = input_cost + output_cost - - # Validate output with the provided model if it exists - try: - validated_output = pydantic_model.model_validate(output, strict=True) - except ValidationError as ve: - logger.error(f"Output validation failed: {ve}") - raise ve - - return BedrockResponse[T](output=validated_output, cost=total_cost) - except Exception as e: - logger.error("Failed to parse the response as JSON. Raw response:") - logger.error(response_body["content"][0]["text"]) - raise e + validated_output = pydantic_model.model_validate(output, strict=True) + except ValidationError as ve: + logger.error(f"Output validation failed: {ve}") + raise ve + + return BedrockResponse[T](output=validated_output, cost=total_cost) except Exception as e: - logger.error(f"Amazon Bedrock API error: {e}") + logger.error("Failed to parse the response as JSON. Raw response:") + logger.error(response_body["content"][0]["text"]) raise e