Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import asyncio
import json
import random
from typing import Generic, TypeVar

import aioboto3
from botocore.config import Config
from botocore.exceptions import ClientError
from pydantic import BaseModel, ValidationError

from crowdgit.logger import logger
Expand All @@ -20,6 +23,10 @@ class BedrockResponse(BaseModel, Generic[T]):
cost: float


MAX_THROTTLE_RETRIES = 5
THROTTLE_BASE_DELAY = 10 # seconds


async def invoke_bedrock(
instruction, pydantic_model: type[T], replacements=None, max_tokens=65000, temperature=0
) -> BedrockResponse[T]:
Expand Down Expand Up @@ -71,41 +78,54 @@ async def invoke_bedrock(
}
)

modelId = "us.anthropic.claude-sonnet-4-20250514-v1:0"
accept = "application/json"
contentType = "application/json"

for attempt in range(1, MAX_THROTTLE_RETRIES + 1):
try:
response = await bedrock_client.invoke_model(
body=body, modelId=modelId, accept=accept, contentType=contentType
)
break
Comment on lines +85 to +90
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This adds a custom retry loop on throttling, but the client Config already enables botocore retries (retries={"max_attempts": 3}), which can compound the total number of attempts and overall wait time. Consider consolidating retry behavior (either rely on botocore retries, or reduce/disable them when using this explicit backoff loop) to keep worst-case latency bounded and predictable.

Copilot uses AI. Check for mistakes.
except ClientError as e:
if (
e.response["Error"]["Code"] == "ThrottlingException"
and attempt < MAX_THROTTLE_RETRIES
):
delay = THROTTLE_BASE_DELAY * (2 ** (attempt - 1)) + random.uniform(0, 2)
logger.warning(
f"Bedrock ThrottlingException (attempt {attempt}/{MAX_THROTTLE_RETRIES}), "
f"retrying in {delay:.1f}s: {e}"
)
await asyncio.sleep(delay)
else:
raise

try:
modelId = "us.anthropic.claude-sonnet-4-20250514-v1:0"
accept = "application/json"
contentType = "application/json"
response = await bedrock_client.invoke_model(
body=body, modelId=modelId, accept=accept, contentType=contentType
)
body_bytes = await response["body"].read()
response_body = json.loads(body_bytes.decode("utf-8"))
raw_text = response_body["content"][0]["text"].replace('"""', "").strip()

# Expect pure JSON - no markdown handling
output = json.loads(raw_text)

# Calculate cost
input_tokens = response_body["usage"]["input_tokens"]
output_tokens = response_body["usage"]["output_tokens"]
input_cost = (input_tokens / 1000) * 0.003
output_cost = (output_tokens / 1000) * 0.015
total_cost = input_cost + output_cost

# Validate output with the provided model if it exists
try:
body_bytes = await response["body"].read()
response_body = json.loads(body_bytes.decode("utf-8"))
raw_text = response_body["content"][0]["text"].replace('"""', "").strip()

# Expect pure JSON - no markdown handling
output = json.loads(raw_text)

# Calculate cost
input_tokens = response_body["usage"]["input_tokens"]
output_tokens = response_body["usage"]["output_tokens"]
input_cost = (input_tokens / 1000) * 0.003
output_cost = (output_tokens / 1000) * 0.015
total_cost = input_cost + output_cost

# Validate output with the provided model if it exists
try:
validated_output = pydantic_model.model_validate(output, strict=True)
except ValidationError as ve:
logger.error(f"Output validation failed: {ve}")
raise ve

return BedrockResponse[T](output=validated_output, cost=total_cost)
except Exception as e:
logger.error("Failed to parse the response as JSON. Raw response:")
logger.error(response_body["content"][0]["text"])
raise e
validated_output = pydantic_model.model_validate(output, strict=True)
except ValidationError as ve:
logger.error(f"Output validation failed: {ve}")
raise ve
Comment on lines +123 to +125
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

raise ve resets the traceback/context. If the goal is just to rethrow after logging, use a bare raise to preserve the original stack trace.

Copilot uses AI. Check for mistakes.

return BedrockResponse[T](output=validated_output, cost=total_cost)
except Exception as e:
logger.error(f"Amazon Bedrock API error: {e}")
logger.error("Failed to parse the response as JSON. Raw response:")
logger.error(response_body["content"][0]["text"])
raise e
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

raise e resets the traceback and makes debugging harder. Use a bare raise here so the original exception context is preserved.

Suggested change
raise e
raise

Copilot uses AI. Check for mistakes.
Comment on lines +120 to 131
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The outer except Exception will also catch ValidationError raised from the validation block and log it as a JSON parsing failure (and potentially log raw model text). Consider narrowing this handler to JSON/KeyError-related failures (or adding a dedicated except ValidationError: raise) so validation errors aren’t misclassified.

Copilot uses AI. Check for mistakes.
Comment on lines 128 to 131
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The except block logs response_body["content"][0]["text"], but response_body may be undefined if JSON decoding fails (or if response["body"].read()/json.loads(...) raises). This can mask the original error with an UnboundLocalError. Consider initializing response_body/raw text to a safe default and logging body_bytes (decoded) when available, or using logger.exception without indexing into response_body in the failure path.

Copilot uses AI. Check for mistakes.
Loading