Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from ._generated import models as _models
from ._generated._utils.serialization import Serializer
from ._generated.operations import CallRecordingOperations
from ._shared.recording_url_validator import validate_recording_url

_SERIALIZER = Serializer()
_SERIALIZER.client_side_validation = False
Expand Down Expand Up @@ -64,6 +65,9 @@ def download_streaming(
if not parsed_hostname:
raise ValueError("Recording client endpoint must not be None.")

# Validate recording URL before sending authenticated request
validate_recording_url(source_location, "source_location")

_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
request = _build_call_recording_download_recording_request(
Expand Down Expand Up @@ -111,6 +115,9 @@ def delete_recording(self, recording_location: str, **kwargs: Any) -> None:
if not parsed_hostname:
raise ValueError("Recording client endpoint must not be None.")

# Validate recording URL before sending authenticated request
validate_recording_url(recording_location, "recording_location")

_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
request = _build_call_recording_delete_recording_request(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

from typing import Tuple
from urllib.parse import urlparse


# Allowed recording endpoint host suffixes.
# These are the only domains permitted for recording URLs to prevent credential exfiltration.
ALLOWED_HOST_SUFFIXES: Tuple[str, ...] = (
".asm.skype.com",
".asyncgw.teams.microsoft.com",
)


def validate_recording_url(recording_url: str, parameter_name: str) -> None:
"""
Validate that a recording URL points to Azure Communication Services
endpoint before credentials are attached.
This prevents credential exfiltration via SSRF attacks.

:param recording_url: The recording URL to validate.
:type recording_url: str
:param parameter_name: The parameter name for exception messages.
:type parameter_name: str
:raises TypeError: If the recording URL is None or empty.
:raises ValueError: If the recording URL is not a valid absolute HTTPS URI or not from an allowed domain.
"""
if not recording_url:
raise TypeError(f"{parameter_name} cannot be null or undefined.")

parsed_url = urlparse(recording_url)

# Validate it's a valid absolute URI
if not parsed_url.scheme or not parsed_url.netloc:
raise ValueError(f"{parameter_name} must be a valid absolute URI.")

# Ensure the URL uses HTTPS
if parsed_url.scheme.lower() != "https":
raise ValueError(f"{parameter_name} must use HTTPS scheme for security.")

host = parsed_url.hostname
if not host:
raise ValueError(f"{parameter_name} must have a valid hostname.")

host_lower = host.lower()

# Check against allowed suffixes
is_valid_endpoint = any(host_lower.endswith(suffix) for suffix in ALLOWED_HOST_SUFFIXES)

if not is_valid_endpoint:
raise ValueError(
f"{parameter_name} host '{host}' is not a valid Azure Communication Services recording endpoint. "
"Only URLs pointing to *.asm.skype.com, *.asyncgw.teams.microsoft.com are allowed."
)
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from .._generated import models as _models
from .._generated._utils.serialization import Serializer
from .._generated.aio.operations import CallRecordingOperations
from .._shared.recording_url_validator import validate_recording_url

_SERIALIZER = Serializer()
_SERIALIZER.client_side_validation = False
Expand Down Expand Up @@ -65,6 +66,9 @@ async def download_streaming(
if not parsed_hostname:
raise ValueError("Recording client endpoint must not be None.")

# Validate recording URL before sending authenticated request
validate_recording_url(source_location, "source_location")

_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
request = _build_call_recording_download_recording_request(
Expand Down Expand Up @@ -113,6 +117,9 @@ async def delete_recording(self, recording_location: str, **kwargs: Any) -> None
if not parsed_hostname:
raise ValueError("Recording client endpoint must not be None.")

# Validate recording URL before sending authenticated request
validate_recording_url(recording_location, "recording_location")

_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
request = _build_call_recording_delete_recording_request(
Expand Down