Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
310 changes: 310 additions & 0 deletions orchestration/_tests/test_bl832/test_nersc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,310 @@
# orchestration/_tests/test_bl832/test_nersc.py
import pytest
from uuid import uuid4

from prefect.blocks.system import Secret
from prefect.testing.utilities import prefect_test_harness


@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
with prefect_test_harness():
Secret(value=str(uuid4())).save(name="globus-client-id", overwrite=True)
Secret(value=str(uuid4())).save(name="globus-client-secret", overwrite=True)
yield


# ---------------------------------------------------------------------------
# Shared fixtures
# ---------------------------------------------------------------------------

@pytest.fixture
def mock_config(mocker):
config = mocker.MagicMock()
config.ghcr_images832 = {
"recon_image": "mock_recon_image",
"multires_image": "mock_multires_image",
}
return config


@pytest.fixture
def mock_sfapi_client(mocker):
"""sfapi_client.Client mock with user, compute, submit_job, and job chained."""
client = mocker.MagicMock()

mock_user = mocker.MagicMock()
mock_user.name = "testuser"
client.user.return_value = mock_user

mock_job = mocker.MagicMock()
mock_job.jobid = "12345"
mock_job.state = "COMPLETED"
client.compute.return_value.submit_job.return_value = mock_job
client.compute.return_value.job.return_value = mock_job

return client


@pytest.fixture
def mock_iriapi_client(mocker):
"""httpx.Client mock for IRI API responses."""
client = mocker.MagicMock()

submit_response = mocker.MagicMock()
submit_response.json.return_value = {"id": "99999"}
client.post.return_value = submit_response

status_response = mocker.MagicMock()
status_response.json.return_value = {"status": {"state": "completed"}}
client.get.return_value = status_response

return client

# ---------------------------------------------------------------------------
# _create_sfapi_client
# ---------------------------------------------------------------------------


def test_create_sfapi_client_success(mocker):
"""Valid credentials produce a Client instance."""
from orchestration.flows.bl832.nersc import NERSCTomographyHPCController

mocker.patch("orchestration.flows.bl832.nersc.os.getenv", side_effect=lambda x: {
"PATH_NERSC_CLIENT_ID": "/path/to/client_id",
"PATH_NERSC_PRI_KEY": "/path/to/client_secret",
}.get(x))
mocker.patch("orchestration.flows.bl832.nersc.os.path.isfile", return_value=True)
mocker.patch(
"builtins.open",
side_effect=[
mocker.mock_open(read_data="my-client-id")(),
mocker.mock_open(read_data='{"kty": "RSA", "n": "x", "e": "y"}')(),
]
)
mocker.patch("orchestration.flows.bl832.nersc.JsonWebKey.import_key", return_value="mock_secret")
mock_client_cls = mocker.patch("orchestration.flows.bl832.nersc.Client")

client = NERSCTomographyHPCController._create_sfapi_client()

mock_client_cls.assert_called_once_with("my-client-id", "mock_secret")
assert client is mock_client_cls.return_value


def test_create_sfapi_client_missing_paths(mocker):
"""Unset env vars raise ValueError."""
from orchestration.flows.bl832.nersc import NERSCTomographyHPCController

mocker.patch("orchestration.flows.bl832.nersc.os.getenv", return_value=None)

with pytest.raises(ValueError, match="Missing NERSC credentials paths."):
NERSCTomographyHPCController._create_sfapi_client()


def test_create_sfapi_client_missing_files(mocker):
"""Env vars set but files absent raise FileNotFoundError."""
from orchestration.flows.bl832.nersc import NERSCTomographyHPCController

mocker.patch("orchestration.flows.bl832.nersc.os.getenv", side_effect=lambda x: {
"PATH_NERSC_CLIENT_ID": "/path/to/client_id",
"PATH_NERSC_PRI_KEY": "/path/to/client_secret",
}.get(x))
mocker.patch("orchestration.flows.bl832.nersc.os.path.isfile", return_value=False)

with pytest.raises(FileNotFoundError, match="NERSC credential files are missing."):
NERSCTomographyHPCController._create_sfapi_client()


# ---------------------------------------------------------------------------
# reconstruct — SFAPI
# ---------------------------------------------------------------------------

def test_reconstruct_sfapi_success(mocker, mock_sfapi_client, mock_config):
"""SFAPI reconstruct submits a job and waits for completion."""
from orchestration.flows.bl832.nersc import NERSCTomographyHPCController, NERSCLoginMethod
from sfapi_client.compute import Machine

mocker.patch("orchestration.flows.bl832.nersc.time.sleep")

controller = NERSCTomographyHPCController(
client=mock_sfapi_client,
config=mock_config,
login_method=NERSCLoginMethod.SFAPI,
)

result = controller.reconstruct(file_path="folder/scan.h5")

assert result is True
assert mock_sfapi_client.compute.call_count == 2
mock_sfapi_client.compute.assert_called_with(Machine.perlmutter)
mock_sfapi_client.compute.return_value.submit_job.assert_called_once()
mock_sfapi_client.compute.return_value.job.assert_called_once_with(jobid="12345")
mock_sfapi_client.compute.return_value.job.return_value.complete.assert_called_once()


def test_reconstruct_sfapi_submission_failure(mocker, mock_sfapi_client, mock_config):
"""SFAPI reconstruct returns False when submission raises."""
from orchestration.flows.bl832.nersc import NERSCTomographyHPCController, NERSCLoginMethod

mocker.patch("orchestration.flows.bl832.nersc.time.sleep")
mock_sfapi_client.compute.return_value.submit_job.side_effect = Exception("SFAPI error")

controller = NERSCTomographyHPCController(
client=mock_sfapi_client,
config=mock_config,
login_method=NERSCLoginMethod.SFAPI,
)

result = controller.reconstruct(file_path="folder/scan.h5")

assert result is False


# ---------------------------------------------------------------------------
# reconstruct — IRIAPI
# ---------------------------------------------------------------------------

def test_reconstruct_iriapi_success(mocker, mock_iriapi_client, mock_config, monkeypatch):
"""IRIAPI reconstruct POSTs a job and polls for COMPLETED state."""
from orchestration.flows.bl832.nersc import NERSCTomographyHPCController, NERSCLoginMethod

monkeypatch.setenv("NERSC_USERNAME", "alsdev")
mocker.patch("orchestration.flows.bl832.nersc.time.sleep")

controller = NERSCTomographyHPCController(
client=mock_iriapi_client,
config=mock_config,
login_method=NERSCLoginMethod.IRIAPI,
)

result = controller.reconstruct(file_path="folder/scan.h5")

assert result is True
mock_iriapi_client.post.assert_called_once()
assert mock_iriapi_client.post.call_args.args[0] == "/api/v1/compute/job/compute"
assert "executable" in mock_iriapi_client.post.call_args.kwargs["json"]
mock_iriapi_client.get.assert_called_once_with(
"/api/v1/compute/status/compute/99999"
)


def test_reconstruct_iriapi_job_failed(mocker, mock_iriapi_client, mock_config, monkeypatch):
"""IRIAPI reconstruct returns False when job state is failed."""
from orchestration.flows.bl832.nersc import NERSCTomographyHPCController, NERSCLoginMethod

monkeypatch.setenv("NERSC_USERNAME", "alsdev")
mocker.patch("orchestration.flows.bl832.nersc.time.sleep")
mock_iriapi_client.get.return_value.json.return_value = {"status": {"state": "failed"}} # was {"state": "FAILED"}

controller = NERSCTomographyHPCController(
client=mock_iriapi_client,
config=mock_config,
login_method=NERSCLoginMethod.IRIAPI,
)

result = controller.reconstruct(file_path="folder/scan.h5")

assert result is False


def test_reconstruct_iriapi_missing_username(mocker, mock_iriapi_client, mock_config, monkeypatch):
"""IRIAPI reconstruct raises ValueError when NERSC_USERNAME is unset."""
from orchestration.flows.bl832.nersc import NERSCTomographyHPCController, NERSCLoginMethod

monkeypatch.delenv("NERSC_USERNAME", raising=False)

controller = NERSCTomographyHPCController(
client=mock_iriapi_client,
config=mock_config,
login_method=NERSCLoginMethod.IRIAPI,
)

with pytest.raises(ValueError, match="NERSC_USERNAME"):
controller.reconstruct(file_path="folder/scan.h5")


# ---------------------------------------------------------------------------
# build_multi_resolution — SFAPI
# ---------------------------------------------------------------------------

def test_build_multi_resolution_sfapi_success(mocker, mock_sfapi_client, mock_config):
"""SFAPI build_multi_resolution submits and waits successfully."""
from orchestration.flows.bl832.nersc import NERSCTomographyHPCController, NERSCLoginMethod
from sfapi_client.compute import Machine

mocker.patch("orchestration.flows.bl832.nersc.time.sleep")

controller = NERSCTomographyHPCController(
client=mock_sfapi_client,
config=mock_config,
login_method=NERSCLoginMethod.SFAPI,
)

result = controller.build_multi_resolution(file_path="folder/scan.h5")

assert result is True
assert mock_sfapi_client.compute.call_count == 2
mock_sfapi_client.compute.assert_called_with(Machine.perlmutter)


def test_build_multi_resolution_sfapi_failure(mocker, mock_sfapi_client, mock_config):
"""SFAPI build_multi_resolution returns False when submission raises."""
from orchestration.flows.bl832.nersc import NERSCTomographyHPCController, NERSCLoginMethod

mocker.patch("orchestration.flows.bl832.nersc.time.sleep")
mock_sfapi_client.compute.return_value.submit_job.side_effect = Exception("error")

controller = NERSCTomographyHPCController(
client=mock_sfapi_client,
config=mock_config,
login_method=NERSCLoginMethod.SFAPI,
)

result = controller.build_multi_resolution(file_path="folder/scan.h5")

assert result is False


# ---------------------------------------------------------------------------
# build_multi_resolution — IRIAPI
# ---------------------------------------------------------------------------

def test_build_multi_resolution_iriapi_success(mocker, mock_iriapi_client, mock_config, monkeypatch):
"""IRIAPI build_multi_resolution POSTs and polls successfully."""
from orchestration.flows.bl832.nersc import NERSCTomographyHPCController, NERSCLoginMethod

monkeypatch.setenv("NERSC_USERNAME", "alsdev")
mocker.patch("orchestration.flows.bl832.nersc.time.sleep")

controller = NERSCTomographyHPCController(
client=mock_iriapi_client,
config=mock_config,
login_method=NERSCLoginMethod.IRIAPI,
)

result = controller.build_multi_resolution(file_path="folder/scan.h5")

assert result is True
mock_iriapi_client.post.assert_called_once()
mock_iriapi_client.get.assert_called_once_with(
"/api/v1/compute/status/compute/99999"
)


def test_build_multi_resolution_iriapi_failure(mocker, mock_iriapi_client, mock_config, monkeypatch):
"""IRIAPI build_multi_resolution returns False when job state is failed."""
from orchestration.flows.bl832.nersc import NERSCTomographyHPCController, NERSCLoginMethod

monkeypatch.setenv("NERSC_USERNAME", "alsdev")
mocker.patch("orchestration.flows.bl832.nersc.time.sleep")
mock_iriapi_client.get.return_value.json.return_value = {"status": {"state": "failed"}}

controller = NERSCTomographyHPCController(
client=mock_iriapi_client,
config=mock_config,
login_method=NERSCLoginMethod.IRIAPI,
)

result = controller.build_multi_resolution(file_path="folder/scan.h5")

assert result is False
Loading
Loading