diff --git a/src/workos/authorization.py b/src/workos/authorization.py index 6e12f035..45ae6e2f 100644 --- a/src/workos/authorization.py +++ b/src/workos/authorization.py @@ -2,12 +2,14 @@ from pydantic import TypeAdapter +from workos.types.authorization.access_evaluation import AccessEvaluation from workos.types.authorization.environment_role import ( EnvironmentRole, EnvironmentRoleList, ) from workos.types.authorization.organization_role import OrganizationRole from workos.types.authorization.permission import Permission +from workos.types.authorization.resource_identifier import ResourceIdentifier from workos.types.authorization.role import Role, RoleList from workos.types.list_resource import ( ListArgs, @@ -161,6 +163,16 @@ def add_environment_role_permission( permission_slug: str, ) -> SyncOrAsync[EnvironmentRole]: ... + # Access Evaluation + + def check( + self, + organization_membership_id: str, + *, + permission_slug: str, + resource: ResourceIdentifier, + ) -> SyncOrAsync[AccessEvaluation]: ... + class Authorization(AuthorizationModule): _http_client: SyncHTTPClient @@ -437,6 +449,24 @@ def add_environment_role_permission( return EnvironmentRole.model_validate(response) + def check( + self, + organization_membership_id: str, + *, + permission_slug: str, + resource: ResourceIdentifier, + ) -> AccessEvaluation: + json: Dict[str, Any] = {"permission_slug": permission_slug} + json.update(resource) + + response = self._http_client.request( + f"authorization/organization_memberships/{organization_membership_id}/check", + method=REQUEST_METHOD_POST, + json=json, + ) + + return AccessEvaluation.model_validate(response) + class AsyncAuthorization(AuthorizationModule): _http_client: AsyncHTTPClient @@ -712,3 +742,23 @@ async def add_environment_role_permission( ) return EnvironmentRole.model_validate(response) + + # Access Evaluation + + async def check( + self, + organization_membership_id: str, + *, + permission_slug: str, + resource: ResourceIdentifier, + ) -> AccessEvaluation: + json: Dict[str, Any] = {"permission_slug": permission_slug} + json.update(resource) + + response = await self._http_client.request( + f"authorization/organization_memberships/{organization_membership_id}/check", + method=REQUEST_METHOD_POST, + json=json, + ) + + return AccessEvaluation.model_validate(response) diff --git a/src/workos/types/authorization/__init__.py b/src/workos/types/authorization/__init__.py index 9eb705a0..93946662 100644 --- a/src/workos/types/authorization/__init__.py +++ b/src/workos/types/authorization/__init__.py @@ -13,6 +13,11 @@ ) from workos.types.authorization.permission import Permission from workos.types.authorization.resource import Resource +from workos.types.authorization.resource_identifier import ( + ResourceIdentifier, + ResourceIdentifierByExternalId, + ResourceIdentifierById, +) from workos.types.authorization.role import ( Role, RoleList, diff --git a/src/workos/types/authorization/resource_identifier.py b/src/workos/types/authorization/resource_identifier.py new file mode 100644 index 00000000..081a175d --- /dev/null +++ b/src/workos/types/authorization/resource_identifier.py @@ -0,0 +1,15 @@ +from typing import Union + +from typing_extensions import TypedDict + + +class ResourceIdentifierById(TypedDict): + resource_id: str + + +class ResourceIdentifierByExternalId(TypedDict): + resource_external_id: str + resource_type_slug: str + + +ResourceIdentifier = Union[ResourceIdentifierById, ResourceIdentifierByExternalId] diff --git a/tests/test_authorization_check.py b/tests/test_authorization_check.py new file mode 100644 index 00000000..09fad9ba --- /dev/null +++ b/tests/test_authorization_check.py @@ -0,0 +1,127 @@ +from typing import Union + +import pytest +from tests.utils.syncify import syncify +from workos.authorization import AsyncAuthorization, Authorization +from workos.types.authorization.resource_identifier import ( + ResourceIdentifierByExternalId, + ResourceIdentifierById, +) + + +@pytest.mark.sync_and_async(Authorization, AsyncAuthorization) +class TestAuthorizationCheck: + @pytest.fixture(autouse=True) + def setup(self, module_instance: Union[Authorization, AsyncAuthorization]): + self.http_client = module_instance._http_client + self.authorization = module_instance + + @pytest.fixture + def mock_check_authorized(self): + return {"authorized": True} + + @pytest.fixture + def mock_check_unauthorized(self): + return {"authorized": False} + + def test_check_authorized( + self, mock_check_authorized, capture_and_mock_http_client_request + ): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, mock_check_authorized, 200 + ) + + result = syncify( + self.authorization.check( + "om_01ABC", + permission_slug="documents:read", + resource=ResourceIdentifierById(resource_id="res_01ABC"), + ) + ) + + assert result.authorized is True + assert request_kwargs["method"] == "post" + assert request_kwargs["url"].endswith( + "/authorization/organization_memberships/om_01ABC/check" + ) + + def test_check_unauthorized( + self, mock_check_unauthorized, capture_and_mock_http_client_request + ): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, mock_check_unauthorized, 200 + ) + + result = syncify( + self.authorization.check( + "om_01ABC", + permission_slug="documents:write", + resource=ResourceIdentifierById(resource_id="res_01ABC"), + ) + ) + + assert result.authorized is False + assert request_kwargs["method"] == "post" + + def test_check_with_resource_id( + self, mock_check_authorized, capture_and_mock_http_client_request + ): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, mock_check_authorized, 200 + ) + + syncify( + self.authorization.check( + "om_01ABC", + permission_slug="documents:read", + resource=ResourceIdentifierById(resource_id="res_01XYZ"), + ) + ) + + assert request_kwargs["json"] == { + "permission_slug": "documents:read", + "resource_id": "res_01XYZ", + } + + def test_check_with_resource_external_id( + self, mock_check_authorized, capture_and_mock_http_client_request + ): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, mock_check_authorized, 200 + ) + + syncify( + self.authorization.check( + "om_01ABC", + permission_slug="documents:read", + resource=ResourceIdentifierByExternalId( + resource_external_id="ext_doc_123", + resource_type_slug="document", + ), + ) + ) + + assert request_kwargs["json"] == { + "permission_slug": "documents:read", + "resource_external_id": "ext_doc_123", + "resource_type_slug": "document", + } + + def test_check_url_construction( + self, mock_check_authorized, capture_and_mock_http_client_request + ): + request_kwargs = capture_and_mock_http_client_request( + self.http_client, mock_check_authorized, 200 + ) + + syncify( + self.authorization.check( + "om_01MEMBERSHIP", + permission_slug="admin:access", + resource=ResourceIdentifierById(resource_id="res_01ABC"), + ) + ) + + assert request_kwargs["url"].endswith( + "/authorization/organization_memberships/om_01MEMBERSHIP/check" + )