Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 19 additions & 11 deletions awslambdaric/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,31 @@
Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
"""

import os
import sys

from .lambda_config import LambdaConfigProvider
from .lambda_runtime_client import LambdaRuntimeClient
from .lambda_multi_concurrent_utils import MultiConcurrentRunner
from . import bootstrap


def main(args):
app_root = os.getcwd()

try:
handler = args[1]
except IndexError:
raise ValueError("Handler not set")

lambda_runtime_api_addr = os.environ["AWS_LAMBDA_RUNTIME_API"]

bootstrap.run(app_root, handler, lambda_runtime_api_addr)
config = LambdaConfigProvider(args)
handler = config.handler
api_addr = config.api_address
use_thread = config.use_thread_polling

if config.is_multi_concurrent:
# Multi-concurrent mode: redirect fork, stdout/stderr and run
max_conc = int(config.max_concurrency)
socket_path = config.lmi_socket_path
MultiConcurrentRunner.run_concurrent(
handler, api_addr, use_thread, socket_path, max_conc
)
else:
# Standard Lambda mode: single call
client = LambdaRuntimeClient(api_addr, use_thread)
bootstrap.run(handler, client)


if __name__ == "__main__":
Expand Down
12 changes: 2 additions & 10 deletions awslambdaric/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

def _get_handler(handler):
try:
(modname, fname) = handler.rsplit(".", 1)
modname, fname = handler.rsplit(".", 1)
except ValueError as e:
raise FaultException(
FaultException.MALFORMED_HANDLER_NAME,
Expand Down Expand Up @@ -477,19 +477,11 @@ def _setup_logging(log_format, log_level, log_sink):
logger.addHandler(logger_handler)


def run(app_root, handler, lambda_runtime_api_addr):
def run(handler, lambda_runtime_client):
sys.stdout = Unbuffered(sys.stdout)
sys.stderr = Unbuffered(sys.stderr)

use_thread_for_polling_next = os.environ.get("AWS_EXECUTION_ENV") in {
"AWS_Lambda_python3.12",
"AWS_Lambda_python3.13",
}

with create_log_sink() as log_sink:
lambda_runtime_client = LambdaRuntimeClient(
lambda_runtime_api_addr, use_thread_for_polling_next
)
error_result = None

try:
Expand Down
70 changes: 70 additions & 0 deletions awslambdaric/lambda_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""
Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
"""

import os


class LambdaConfigProvider:
SUPPORTED_THREADPOLLING_ENVS = {
"AWS_Lambda_python3.12",
"AWS_Lambda_python3.13",
"AWS_Lambda_python3.14",
}
SOCKET_PATH_ENV = "_LAMBDA_TELEMETRY_LOG_FD_PROVIDER_SOCKET"
AWS_LAMBDA_RUNTIME_API = "AWS_LAMBDA_RUNTIME_API"
AWS_LAMBDA_MAX_CONCURRENCY = "AWS_LAMBDA_MAX_CONCURRENCY"
AWS_EXECUTION_ENV = "AWS_EXECUTION_ENV"

def __init__(self, args, environ=None):
self._environ = environ if environ is not None else os.environ
self._handler = self._parse_handler(args)
self._api_address = self._parse_api_address()
self._max_concurrency = self._parse_concurrency()
self._use_thread_polling = self._parse_thread_polling()
self._lmi_socket_path = self._parse_lmi_socket_path()

def _parse_handler(self, args):
try:
return args[1]
except IndexError:
raise ValueError("Handler not set")

def _parse_api_address(self):
return self._environ[self.AWS_LAMBDA_RUNTIME_API]

def _parse_concurrency(self):
return self._environ.get(self.AWS_LAMBDA_MAX_CONCURRENCY)

def _parse_thread_polling(self):
return (
self._environ.get(self.AWS_EXECUTION_ENV)
in self.SUPPORTED_THREADPOLLING_ENVS
)

def _parse_lmi_socket_path(self):
return self._environ.get(self.SOCKET_PATH_ENV)

@property
def handler(self):
return self._handler

@property
def api_address(self):
return self._api_address

@property
def max_concurrency(self):
return self._max_concurrency

@property
def use_thread_polling(self):
return self._use_thread_polling

@property
def is_multi_concurrent(self):
return self._max_concurrency is not None

@property
def lmi_socket_path(self):
return self._lmi_socket_path
53 changes: 53 additions & 0 deletions awslambdaric/lambda_multi_concurrent_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""
Copyright 2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
"""

import os
import sys
import socket
import multiprocessing

from . import bootstrap
from .lambda_runtime_client import LambdaMultiConcurrentRuntimeClient


class MultiConcurrentRunner:
@staticmethod
def _redirect_stream_to_fd(stream_fd: int, socket_path: str):
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
s.connect(socket_path)
os.dup2(s.fileno(), stream_fd)

@classmethod
def _redirect_output(cls, socket_path: str):
for std_fd in (sys.stdout.fileno(), sys.stderr.fileno()):
cls._redirect_stream_to_fd(std_fd, socket_path)

@classmethod
def run_single(
cls, handler: str, api_addr: str, use_thread: bool, socket_path: str
):
if socket_path:
cls._redirect_output(socket_path)
client = LambdaMultiConcurrentRuntimeClient(api_addr, use_thread)
bootstrap.run(handler, client)

@classmethod
def run_concurrent(
cls,
handler: str,
api_addr: str,
use_thread: bool,
socket_path: str,
max_concurrency: int,
):
processes = []
for _ in range(max_concurrency):
p = multiprocessing.Process(
target=cls.run_single,
args=(handler, api_addr, use_thread, socket_path),
)
p.start()
processes.append(p)
for p in processes:
p.join()
112 changes: 94 additions & 18 deletions awslambdaric/lambda_runtime_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,14 @@
from awslambdaric import __version__
from .lambda_runtime_exception import FaultException
from .lambda_runtime_marshaller import to_json
import logging
import time

ERROR_TYPE_HEADER = "Lambda-Runtime-Function-Error-Type"
# Retry config constants
DEFAULT_RETRY_MAX_ATTEMPTS = 5
DEFAULT_RETRY_INITIAL_DELAY = 0.1 # seconds
DEFAULT_RETRY_BACKOFF_FACTOR = 2.0


def _user_agent():
Expand Down Expand Up @@ -46,13 +52,17 @@ def __init__(self, endpoint, response_code, response_body):
)


class LambdaRuntimeClient(object):
class BaseLambdaRuntimeClient(object):
marshaller = LambdaMarshaller()
"""marshaller is a class attribute that determines the unmarshalling and marshalling logic of a function's event
and response. It allows for function authors to override the the default implementation, LambdaMarshaller which
unmarshals and marshals JSON, to an instance of a class that implements the same interface."""

def __init__(self, lambda_runtime_address, use_thread_for_polling_next=False):
def __init__(
self,
lambda_runtime_address,
use_thread_for_polling_next=False,
):
self.lambda_runtime_address = lambda_runtime_address
self.use_thread_for_polling_next = use_thread_for_polling_next
if self.use_thread_for_polling_next:
Expand Down Expand Up @@ -94,9 +104,16 @@ def post_init_error(self, error_response_data, error_type_override=None):
else error_response_data["errorType"]
)
}
self.call_rapid(
"POST", endpoint, http.HTTPStatus.ACCEPTED, error_response_data, headers
)
try:
self.call_rapid(
"POST", endpoint, http.HTTPStatus.ACCEPTED, error_response_data, headers
)
except Exception as e:
self.handle_init_error(e)

def handle_init_error(self, exc):
"""Override in subclasses to customize init error handling."""
raise NotImplementedError

def restore_next(self):
import http
Expand All @@ -113,14 +130,24 @@ def report_restore_error(self, restore_error_data):
"POST", endpoint, http.HTTPStatus.ACCEPTED, restore_error_data, headers
)

def handle_exception(self, exc, func_to_retry=None, use_backoff=False):
"""Override in subclasses to customize error handling."""
raise NotImplementedError

def _get_next(self):
try:
return runtime_client.next()
except Exception as e:
return self.handle_exception(e, runtime_client.next, True)

def wait_next_invocation(self):
# Calling runtime_client.next() from a separate thread unblocks the main thread,
# which can then process signals.
if self.use_thread_for_polling_next:
try:
# TPE class is supposed to be registered at construction time and be ready to use.
with self.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(runtime_client.next)
future = executor.submit(self._get_next)
response_body, headers = future.result()
except Exception as e:
raise FaultException(
Expand All @@ -145,17 +172,66 @@ def wait_next_invocation(self):
def post_invocation_result(
self, invoke_id, result_data, content_type="application/json"
):
runtime_client.post_invocation_result(
invoke_id,
(
result_data
if isinstance(result_data, bytes)
else result_data.encode("utf-8")
),
content_type,
)
try:
runtime_client.post_invocation_result(
invoke_id,
(
result_data
if isinstance(result_data, bytes)
else result_data.encode("utf-8")
),
content_type,
)
except Exception as e:
self.handle_exception(e)

def post_invocation_error(self, invoke_id, error_response_data, xray_fault):
max_header_size = 1024 * 1024 # 1MiB
xray_fault = xray_fault if len(xray_fault.encode()) < max_header_size else ""
runtime_client.post_error(invoke_id, error_response_data, xray_fault)
try:
max_header_size = 1024 * 1024
xray_fault = (
xray_fault if len(xray_fault.encode()) < max_header_size else ""
)
runtime_client.post_error(invoke_id, error_response_data, xray_fault)
except Exception as e:
self.handle_exception(e)


class LambdaRuntimeClient(BaseLambdaRuntimeClient):
def handle_exception(self, exc, func_to_retry=None, use_backoff=False):
raise exc

def handle_init_error(self, exc):
raise exc


class LambdaMultiConcurrentRuntimeClient(BaseLambdaRuntimeClient):
def _get_next_with_backoff(self, e, func_to_retry):
logging.warning(f"Initial runtime_client.next() failed: {e}")
delay = DEFAULT_RETRY_INITIAL_DELAY
latest_exception = None
for attempt in range(1, DEFAULT_RETRY_MAX_ATTEMPTS):
try:
logging.info(
f"Retrying runtime_client.next() [attempt {attempt + 1}]..."
)
time.sleep(delay)
return func_to_retry()
except Exception as e:
logging.warning(f"Attempt {attempt + 1} failed: {e}")
delay *= DEFAULT_RETRY_BACKOFF_FACTOR
latest_exception = e

raise latest_exception

# In multi-concurrent mode we don't want to raise unhandled exception and crash the worker on non-2xx responses from RAPID
def handle_exception(self, exc, func_to_retry=None, use_backoff=False):
if use_backoff:
return self._get_next_with_backoff(exc, func_to_retry)
# We retry if getting next invoke failed, but if posting response to RAPID failed we just log it and continue
logging.warning(f"{exc}: This won't kill the Runtime loop")

def handle_init_error(self, exc):
if isinstance(exc, LambdaRuntimeClientError) and exc.response_code == 403:
# Suppress 403 errors from RAPID during init - indicates another runtime worker has already posted init error
return
raise exc
1 change: 1 addition & 0 deletions awslambdaric/lambda_runtime_marshaller.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def __init__(self):
if os.environ.get("AWS_EXECUTION_ENV") in {
"AWS_Lambda_python3.12",
"AWS_Lambda_python3.13",
"AWS_Lambda_python3.14",
}:
super().__init__(use_decimal=False, ensure_ascii=False, allow_nan=True)
else:
Expand Down
Binary file modified deps/aws-lambda-cpp-0.2.6.tar.gz
Binary file not shown.
16 changes: 16 additions & 0 deletions deps/patches/aws-lambda-cpp-logging-error.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
diff --git a/src/runtime.cpp b/src/runtime.cpp
index 9763282..9fe78d8 100644
--- a/src/runtime.cpp
+++ b/src/runtime.cpp
@@ -379,7 +379,10 @@ runtime::post_outcome runtime::do_post(

if (!is_success(aws::http::response_code(http_response_code))) {
logging::log_error(
- LOG_TAG, "Failed to post handler success response. Http response code: %ld.", http_response_code);
+ LOG_TAG,
+ "Failed to post handler success response. Http response code: %ld. %s",
+ http_response_code,
+ resp.get_body().c_str());
return aws::http::response_code(http_response_code);
}

3 changes: 2 additions & 1 deletion scripts/update_deps.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ wget -c https://github.com/awslabs/aws-lambda-cpp/archive/v$AWS_LAMBDA_CPP_RELEA
patch -p1 < ../patches/aws-lambda-cpp-make-the-runtime-client-user-agent-overrideable.patch && \
patch -p1 < ../patches/aws-lambda-cpp-make-lto-optional.patch && \
patch -p1 < ../patches/aws-lambda-cpp-add-content-type.patch && \
patch -p1 < ../patches/aws-lambda-cpp-add-tenant-id.patch
patch -p1 < ../patches/aws-lambda-cpp-add-tenant-id.patch && \
patch -p1 < ../patches/aws-lambda-cpp-logging-error.patch
)

## Pack again and remove the folder
Expand Down
Loading
Loading