-
Notifications
You must be signed in to change notification settings - Fork 728
fix: bedrock throttling exception on maintainers processing [CM-740] #3913
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -1,8 +1,11 @@ | ||||||
| import asyncio | ||||||
| import json | ||||||
| import random | ||||||
| from typing import Generic, TypeVar | ||||||
|
|
||||||
| import aioboto3 | ||||||
| from botocore.config import Config | ||||||
| from botocore.exceptions import ClientError | ||||||
| from pydantic import BaseModel, ValidationError | ||||||
|
|
||||||
| from crowdgit.logger import logger | ||||||
|
|
@@ -20,6 +23,10 @@ class BedrockResponse(BaseModel, Generic[T]): | |||||
| cost: float | ||||||
|
|
||||||
|
|
||||||
| MAX_THROTTLE_RETRIES = 5 | ||||||
| THROTTLE_BASE_DELAY = 10 # seconds | ||||||
|
|
||||||
|
|
||||||
| async def invoke_bedrock( | ||||||
| instruction, pydantic_model: type[T], replacements=None, max_tokens=65000, temperature=0 | ||||||
| ) -> BedrockResponse[T]: | ||||||
|
|
@@ -71,41 +78,54 @@ async def invoke_bedrock( | |||||
| } | ||||||
| ) | ||||||
|
|
||||||
| modelId = "us.anthropic.claude-sonnet-4-20250514-v1:0" | ||||||
| accept = "application/json" | ||||||
| contentType = "application/json" | ||||||
|
|
||||||
| for attempt in range(1, MAX_THROTTLE_RETRIES + 1): | ||||||
| try: | ||||||
| response = await bedrock_client.invoke_model( | ||||||
| body=body, modelId=modelId, accept=accept, contentType=contentType | ||||||
| ) | ||||||
| break | ||||||
| except ClientError as e: | ||||||
| if ( | ||||||
| e.response["Error"]["Code"] == "ThrottlingException" | ||||||
| and attempt < MAX_THROTTLE_RETRIES | ||||||
| ): | ||||||
| delay = THROTTLE_BASE_DELAY * (2 ** (attempt - 1)) + random.uniform(0, 2) | ||||||
| logger.warning( | ||||||
| f"Bedrock ThrottlingException (attempt {attempt}/{MAX_THROTTLE_RETRIES}), " | ||||||
| f"retrying in {delay:.1f}s: {e}" | ||||||
| ) | ||||||
| await asyncio.sleep(delay) | ||||||
| else: | ||||||
| raise | ||||||
|
|
||||||
| try: | ||||||
| modelId = "us.anthropic.claude-sonnet-4-20250514-v1:0" | ||||||
| accept = "application/json" | ||||||
| contentType = "application/json" | ||||||
| response = await bedrock_client.invoke_model( | ||||||
| body=body, modelId=modelId, accept=accept, contentType=contentType | ||||||
| ) | ||||||
| body_bytes = await response["body"].read() | ||||||
| response_body = json.loads(body_bytes.decode("utf-8")) | ||||||
| raw_text = response_body["content"][0]["text"].replace('"""', "").strip() | ||||||
|
|
||||||
| # Expect pure JSON - no markdown handling | ||||||
| output = json.loads(raw_text) | ||||||
|
|
||||||
| # Calculate cost | ||||||
| input_tokens = response_body["usage"]["input_tokens"] | ||||||
| output_tokens = response_body["usage"]["output_tokens"] | ||||||
| input_cost = (input_tokens / 1000) * 0.003 | ||||||
| output_cost = (output_tokens / 1000) * 0.015 | ||||||
| total_cost = input_cost + output_cost | ||||||
|
|
||||||
| # Validate output with the provided model if it exists | ||||||
| try: | ||||||
| body_bytes = await response["body"].read() | ||||||
| response_body = json.loads(body_bytes.decode("utf-8")) | ||||||
| raw_text = response_body["content"][0]["text"].replace('"""', "").strip() | ||||||
|
|
||||||
| # Expect pure JSON - no markdown handling | ||||||
| output = json.loads(raw_text) | ||||||
|
|
||||||
| # Calculate cost | ||||||
| input_tokens = response_body["usage"]["input_tokens"] | ||||||
| output_tokens = response_body["usage"]["output_tokens"] | ||||||
| input_cost = (input_tokens / 1000) * 0.003 | ||||||
| output_cost = (output_tokens / 1000) * 0.015 | ||||||
| total_cost = input_cost + output_cost | ||||||
|
|
||||||
| # Validate output with the provided model if it exists | ||||||
| try: | ||||||
| validated_output = pydantic_model.model_validate(output, strict=True) | ||||||
| except ValidationError as ve: | ||||||
| logger.error(f"Output validation failed: {ve}") | ||||||
| raise ve | ||||||
|
|
||||||
| return BedrockResponse[T](output=validated_output, cost=total_cost) | ||||||
| except Exception as e: | ||||||
| logger.error("Failed to parse the response as JSON. Raw response:") | ||||||
| logger.error(response_body["content"][0]["text"]) | ||||||
| raise e | ||||||
| validated_output = pydantic_model.model_validate(output, strict=True) | ||||||
| except ValidationError as ve: | ||||||
| logger.error(f"Output validation failed: {ve}") | ||||||
| raise ve | ||||||
|
Comment on lines
+123
to
+125
|
||||||
|
|
||||||
| return BedrockResponse[T](output=validated_output, cost=total_cost) | ||||||
| except Exception as e: | ||||||
| logger.error(f"Amazon Bedrock API error: {e}") | ||||||
| logger.error("Failed to parse the response as JSON. Raw response:") | ||||||
| logger.error(response_body["content"][0]["text"]) | ||||||
| raise e | ||||||
|
||||||
| raise e | |
| raise |
Copilot
AI
Mar 11, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The outer except Exception will also catch ValidationError raised from the validation block and log it as a JSON parsing failure (and potentially log raw model text). Consider narrowing this handler to JSON/KeyError-related failures (or adding a dedicated except ValidationError: raise) so validation errors aren’t misclassified.
Copilot
AI
Mar 11, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The except block logs response_body["content"][0]["text"], but response_body may be undefined if JSON decoding fails (or if response["body"].read()/json.loads(...) raises). This can mask the original error with an UnboundLocalError. Consider initializing response_body/raw text to a safe default and logging body_bytes (decoded) when available, or using logger.exception without indexing into response_body in the failure path.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This adds a custom retry loop on throttling, but the client
Configalready enables botocore retries (retries={"max_attempts": 3}), which can compound the total number of attempts and overall wait time. Consider consolidating retry behavior (either rely on botocore retries, or reduce/disable them when using this explicit backoff loop) to keep worst-case latency bounded and predictable.