From 2f71af94810efd0cb9db9fdca17eeabb3d342317 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 17 Mar 2026 15:30:39 -0700 Subject: [PATCH 01/11] Adding a script and a pytest to help clean up files on the bl832recon1x as a cron job --- orchestration/_tests/test_bl832/__init__.py | 0 .../test_bl832/test_prune_bl832recon1x.py | 358 ++++++++++++++++++ .../flows/bl832/prune_bl832recon1x.py | 234 ++++++++++++ 3 files changed, 592 insertions(+) create mode 100644 orchestration/_tests/test_bl832/__init__.py create mode 100644 orchestration/_tests/test_bl832/test_prune_bl832recon1x.py create mode 100644 orchestration/flows/bl832/prune_bl832recon1x.py diff --git a/orchestration/_tests/test_bl832/__init__.py b/orchestration/_tests/test_bl832/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/orchestration/_tests/test_bl832/test_prune_bl832recon1x.py b/orchestration/_tests/test_bl832/test_prune_bl832recon1x.py new file mode 100644 index 00000000..c5136b73 --- /dev/null +++ b/orchestration/_tests/test_bl832/test_prune_bl832recon1x.py @@ -0,0 +1,358 @@ +"""Tests for orchestration/flows/bl832/prune_bl832recon1x.py.""" + +from datetime import datetime, timezone +from pathlib import Path + +import pytest + +from orchestration.flows.bl832.prune_bl832recon1x import ( + get_creation_time, + prune_docker, + prune_scratch_endpoint, + prune_zarr_volumes, +) +from orchestration.transfer_endpoints import FileSystemEndpoint + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +CUTOFF = datetime(2025, 1, 1, tzinfo=timezone.utc) +OLD_CREATION = datetime(2024, 6, 1, tzinfo=timezone.utc) # before cutoff — should be pruned +NEW_CREATION = datetime(2025, 6, 1, tzinfo=timezone.utc) # after cutoff — should be retained + +MODULE = "orchestration.flows.bl832.prune_bl832recon1x" + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture() +def mock_config(mocker): + """Mocked BeamlineConfig — avoids importing Config832 and triggering Globus/Prefect init.""" + return mocker.MagicMock() + + +@pytest.fixture() +def sample_endpoint(tmp_path: Path) -> FileSystemEndpoint: + """FileSystemEndpoint pointing at a temporary Zarr samples directory.""" + return FileSystemEndpoint( + name="test_samples", + root_path=str(tmp_path), + uri="bl832recon1x.lbl.gov", + ) + + +@pytest.fixture() +def scratch_endpoint(tmp_path: Path) -> FileSystemEndpoint: + """FileSystemEndpoint pointing at a temporary scratch directory.""" + return FileSystemEndpoint( + name="test_scratch", + root_path=str(tmp_path), + uri="bl832recon1x.lbl.gov", + ) + + +# --------------------------------------------------------------------------- +# get_creation_time +# --------------------------------------------------------------------------- + + +def test_get_creation_time_returns_datetime_when_available(mocker) -> None: + """Returns a timezone-aware datetime when filesystem creation time is present.""" + mock_stat = mocker.MagicMock() + mock_stat.st_birthtime = OLD_CREATION.timestamp() + mock_path = mocker.MagicMock() + mock_path.stat.return_value = mock_stat + + result = get_creation_time(mock_path) + + assert result == OLD_CREATION + + +def test_get_creation_time_returns_none_when_creation_time_is_zero(mocker) -> None: + """Returns None when st_birthtime is zero (filesystem does not support creation time).""" + mock_stat = mocker.MagicMock() + mock_stat.st_birthtime = 0 # filesystem does not expose creation time + mock_path = mocker.MagicMock() + mock_path.stat.return_value = mock_stat + + result = get_creation_time(mock_path) + + assert result is None + + +def test_get_creation_time_returns_none_when_attribute_missing(mocker) -> None: + """Returns None when creation time attribute is not present on the stat result.""" + mock_stat = mocker.MagicMock(spec=[]) # no attributes + mock_path = mocker.MagicMock() + mock_path.stat.return_value = mock_stat + + result = get_creation_time(mock_path) + + assert result is None + + +def test_get_creation_time_returns_none_on_os_error(mocker) -> None: + """Returns None and logs a warning when stat raises OSError.""" + mock_path = mocker.MagicMock() + mock_path.stat.side_effect = OSError("permission denied") + + result = get_creation_time(mock_path) + + assert result is None + + +# --------------------------------------------------------------------------- +# prune_zarr_volumes +# --------------------------------------------------------------------------- + + +def test_prune_zarr_volumes_removes_old_volume( + sample_endpoint: FileSystemEndpoint, mock_config, mocker +) -> None: + """Calls controller.prune for Zarr volumes created before the cutoff.""" + old_zarr = Path(sample_endpoint.root_path) / "old_scan" + old_zarr.mkdir() + + mock_controller = mocker.MagicMock() + mocker.patch(f"{MODULE}.FileSystemPruneController", return_value=mock_controller) + mocker.patch(f"{MODULE}.get_creation_time", return_value=OLD_CREATION) + + prune_zarr_volumes(sample_endpoint, CUTOFF, mock_config) + + mock_controller.prune.assert_called_once_with( + file_path="old_scan", + source_endpoint=sample_endpoint, + days_from_now=0, + ) + + +def test_prune_zarr_volumes_retains_new_volume( + sample_endpoint: FileSystemEndpoint, mock_config, mocker +) -> None: + """Does not call controller.prune for Zarr volumes created after the cutoff.""" + new_zarr = Path(sample_endpoint.root_path) / "new_scan" + new_zarr.mkdir() + + mock_controller = mocker.MagicMock() + mocker.patch(f"{MODULE}.FileSystemPruneController", return_value=mock_controller) + mocker.patch(f"{MODULE}.get_creation_time", return_value=NEW_CREATION) + + prune_zarr_volumes(sample_endpoint, CUTOFF, mock_config) + + mock_controller.prune.assert_not_called() + + +def test_prune_zarr_volumes_skips_demo_prefix( + sample_endpoint: FileSystemEndpoint, mock_config, mocker +) -> None: + """Does not call controller.prune for directories prefixed with 'demo_'.""" + demo_zarr = Path(sample_endpoint.root_path) / "demo_sample" + demo_zarr.mkdir() + + mock_controller = mocker.MagicMock() + mocker.patch(f"{MODULE}.FileSystemPruneController", return_value=mock_controller) + mocker.patch(f"{MODULE}.get_creation_time", return_value=OLD_CREATION) + + prune_zarr_volumes(sample_endpoint, CUTOFF, mock_config) + + mock_controller.prune.assert_not_called() + + +def test_prune_zarr_volumes_skips_when_creation_time_unavailable( + sample_endpoint: FileSystemEndpoint, mock_config, mocker +) -> None: + """Does not call controller.prune when creation time cannot be determined.""" + zarr = Path(sample_endpoint.root_path) / "unknown_age" + zarr.mkdir() + + mock_controller = mocker.MagicMock() + mocker.patch(f"{MODULE}.FileSystemPruneController", return_value=mock_controller) + mocker.patch(f"{MODULE}.get_creation_time", return_value=None) + + prune_zarr_volumes(sample_endpoint, CUTOFF, mock_config) + + mock_controller.prune.assert_not_called() + + +def test_prune_zarr_volumes_skips_files( + sample_endpoint: FileSystemEndpoint, mock_config, mocker +) -> None: + """Does not call controller.prune for loose files in the sample directory.""" + loose_file = Path(sample_endpoint.root_path) / "stray_file.txt" + loose_file.touch() + + mock_controller = mocker.MagicMock() + mocker.patch(f"{MODULE}.FileSystemPruneController", return_value=mock_controller) + mocker.patch(f"{MODULE}.get_creation_time", return_value=OLD_CREATION) + + prune_zarr_volumes(sample_endpoint, CUTOFF, mock_config) + + mock_controller.prune.assert_not_called() + + +def test_prune_zarr_volumes_noop_when_directory_missing(mock_config, mocker) -> None: + """Returns without error when the sample directory does not exist.""" + endpoint = FileSystemEndpoint( + name="missing", + root_path="/nonexistent/path", + uri="bl832recon1x.lbl.gov", + ) + mock_controller = mocker.MagicMock() + mocker.patch(f"{MODULE}.FileSystemPruneController", return_value=mock_controller) + + prune_zarr_volumes(endpoint, CUTOFF, mock_config) + + mock_controller.prune.assert_not_called() + + +# --------------------------------------------------------------------------- +# prune_scratch_endpoint +# --------------------------------------------------------------------------- + + +def test_prune_scratch_endpoint_removes_old_file( + scratch_endpoint: FileSystemEndpoint, mock_config, mocker +) -> None: + """Calls controller.prune for files created before the cutoff.""" + old_file = Path(scratch_endpoint.root_path) / "old_result.h5" + old_file.touch() + + mock_controller = mocker.MagicMock() + mocker.patch(f"{MODULE}.FileSystemPruneController", return_value=mock_controller) + mocker.patch(f"{MODULE}.get_creation_time", return_value=OLD_CREATION) + + prune_scratch_endpoint(scratch_endpoint, CUTOFF, mock_config) + + mock_controller.prune.assert_called_once_with( + file_path="old_result.h5", + source_endpoint=scratch_endpoint, + days_from_now=0, + ) + + +def test_prune_scratch_endpoint_retains_new_file( + scratch_endpoint: FileSystemEndpoint, mock_config, mocker +) -> None: + """Does not call controller.prune for files created after the cutoff.""" + new_file = Path(scratch_endpoint.root_path) / "new_result.h5" + new_file.touch() + + mock_controller = mocker.MagicMock() + mocker.patch(f"{MODULE}.FileSystemPruneController", return_value=mock_controller) + mocker.patch(f"{MODULE}.get_creation_time", return_value=NEW_CREATION) + + prune_scratch_endpoint(scratch_endpoint, CUTOFF, mock_config) + + mock_controller.prune.assert_not_called() + + +def test_prune_scratch_endpoint_recurses_into_subdirectories( + scratch_endpoint: FileSystemEndpoint, mock_config, mocker +) -> None: + """Calls controller.prune for old files in nested subdirectories.""" + nested = Path(scratch_endpoint.root_path) / "subdir" / "nested" + nested.mkdir(parents=True) + nested_file = nested / "data.h5" + nested_file.touch() + + mock_controller = mocker.MagicMock() + mocker.patch(f"{MODULE}.FileSystemPruneController", return_value=mock_controller) + mocker.patch(f"{MODULE}.get_creation_time", return_value=OLD_CREATION) + + prune_scratch_endpoint(scratch_endpoint, CUTOFF, mock_config) + + mock_controller.prune.assert_called_once_with( + file_path="subdir/nested/data.h5", + source_endpoint=scratch_endpoint, + days_from_now=0, + ) + + +def test_prune_scratch_endpoint_removes_empty_directories_after_pruning( + scratch_endpoint: FileSystemEndpoint, mock_config, mocker +) -> None: + """Removes directories that are left empty after file deletion.""" + subdir = Path(scratch_endpoint.root_path) / "empty_after_prune" + subdir.mkdir() + old_file = subdir / "old.h5" + old_file.touch() + + # Simulate the controller actually deleting the file so the sweep has something to act on + def fake_prune(**kwargs: object) -> bool: + Path(scratch_endpoint.root_path, kwargs["file_path"]).unlink(missing_ok=True) + return True + + mock_controller = mocker.MagicMock() + mock_controller.prune.side_effect = fake_prune + mocker.patch(f"{MODULE}.FileSystemPruneController", return_value=mock_controller) + mocker.patch(f"{MODULE}.get_creation_time", return_value=OLD_CREATION) + + prune_scratch_endpoint(scratch_endpoint, CUTOFF, mock_config) + + assert not subdir.exists() + + +def test_prune_scratch_endpoint_skips_when_creation_time_unavailable( + scratch_endpoint: FileSystemEndpoint, mock_config, mocker +) -> None: + """Does not call controller.prune when creation time cannot be determined.""" + f = Path(scratch_endpoint.root_path) / "unknown.h5" + f.touch() + + mock_controller = mocker.MagicMock() + mocker.patch(f"{MODULE}.FileSystemPruneController", return_value=mock_controller) + mocker.patch(f"{MODULE}.get_creation_time", return_value=None) + + prune_scratch_endpoint(scratch_endpoint, CUTOFF, mock_config) + + mock_controller.prune.assert_not_called() + + +def test_prune_scratch_endpoint_noop_when_directory_missing(mock_config, mocker) -> None: + """Returns without error when the scratch directory does not exist.""" + endpoint = FileSystemEndpoint( + name="missing", + root_path="/nonexistent/path", + uri="bl832recon1x.lbl.gov", + ) + mock_controller = mocker.MagicMock() + mocker.patch(f"{MODULE}.FileSystemPruneController", return_value=mock_controller) + + prune_scratch_endpoint(endpoint, CUTOFF, mock_config) + + mock_controller.prune.assert_not_called() + + +# --------------------------------------------------------------------------- +# prune_docker +# --------------------------------------------------------------------------- + + +def test_prune_docker_runs_all_commands(mocker) -> None: + """Calls subprocess.run for each of the three Docker prune commands.""" + mock_run = mocker.patch(f"{MODULE}.subprocess.run") + mock_run.return_value = mocker.MagicMock(stdout="", returncode=0) + + prune_docker() + + assert mock_run.call_count == 3 + calls = [c.args[0] for c in mock_run.call_args_list] + assert ["docker", "image", "prune", "-af"] in calls + assert ["docker", "container", "prune", "-f"] in calls + assert ["docker", "builder", "prune", "-f"] in calls + + +def test_prune_docker_logs_on_failure(mocker) -> None: + """Logs an error without raising when a Docker command fails.""" + import subprocess + + mocker.patch( + f"{MODULE}.subprocess.run", + side_effect=subprocess.CalledProcessError(1, "docker", stderr="daemon not running"), + ) + + # Should not raise + prune_docker() diff --git a/orchestration/flows/bl832/prune_bl832recon1x.py b/orchestration/flows/bl832/prune_bl832recon1x.py new file mode 100644 index 00000000..e3a077ba --- /dev/null +++ b/orchestration/flows/bl832/prune_bl832recon1x.py @@ -0,0 +1,234 @@ +#!/usr/bin/env python3 +"""Periodic storage cleanup for bl832recon1x. + +Removes old Zarr volumes, scratch data, and Docker artifacts to prevent +storage saturation. Intended to run monthly as a cron job with root privileges. + +Cron setup (run once): + cd /home/bl832user/Documents/code/splash_flows + python3 -m venv .venv + source .venv/bin/activate + pip install -e . + sudo crontab -e + +Cron entry (runs at 2am on the 1st of each month): + 0 2 1 * * /home/bl832user/Documents/code/splash_flows/.venv/bin/python + /home/bl832user/Documents/code/splash_flows/orchestration/flows/bl832/bl832_cleanup.py + +Requires Ubuntu 24.04 (kernel 6.8+, ext4) for reliable creation time via stat st_birthtime. +""" + +import logging +import subprocess +from datetime import datetime, timedelta, timezone +from pathlib import Path + +from orchestration.config import BeamlineConfig +from orchestration.prune_controller import FileSystemPruneController +from orchestration.transfer_endpoints import FileSystemEndpoint + +# --------------------------------------------------------------------------- +# Configuration — edit here to adjust behaviour +# --------------------------------------------------------------------------- + +SAMPLE_ENDPOINT = FileSystemEndpoint( + name="bl832recon1x_samples", + root_path="/home/bl832user/Documents/example_samples", + uri="bl832recon1x.lbl.gov", +) + +# Add or remove endpoints to apply recursive file pruning. +# Each entry uses FileSystemEndpoint so root_path, name, and uri stay consistent +# with the rest of splash_flows. +SCRATCH_ENDPOINTS: list[FileSystemEndpoint] = [ + FileSystemEndpoint( + name="bl832recon1x_scratch", + root_path="/home/bl832user/Documents/data/scratch", + uri="bl832recon1x.lbl.gov", + ), + # FileSystemEndpoint( + # name="bl832recon1x_new_folder", + # root_path="/home/bl832user/Documents/some/new/folder", + # uri="bl832recon1x.lbl.gov", + # ), +] + +PRUNE_AFTER_DAYS = 30 +LOG_FILE = Path("/var/log/bl832_cleanup.log") + +# --------------------------------------------------------------------------- + +logger = logging.getLogger(__name__) + + +def get_creation_time(path: Path) -> datetime | None: + """Return the birth (creation) time of a file or directory. + + Uses st_birthtime via getattr — available on Ubuntu 24.04 (kernel 6.8+, ext4). + Returns None if creation time is unavailable or reported as zero. + + Args: + path: Path to the file or directory. + + Returns: + A timezone-aware datetime of the creation time, or None if unavailable. + """ + try: + stat = path.stat() + creation_ts = getattr(stat, "st_birthtime", None) # filesystem creation time + if not creation_ts: + return None + return datetime.fromtimestamp(creation_ts, tz=timezone.utc) + except OSError as e: + logger.warning(f"Could not stat {path}: {e}") + return None + + +def prune_zarr_volumes(endpoint: FileSystemEndpoint, cutoff: datetime, config: BeamlineConfig) -> None: + """Remove Zarr volumes in the endpoint's root_path older than cutoff. + + Top-level directories prefixed with 'demo_' are preserved regardless of age. + Each Zarr volume is removed atomically by FileSystemPruneController. + + Args: + endpoint: FileSystemEndpoint representing the Zarr samples directory. + cutoff: Datetime threshold; volumes created before this are removed. + config: Beamline configuration passed to FileSystemPruneController. + """ + sample_dir = Path(endpoint.root_path) + + if not sample_dir.is_dir(): + logger.warning(f"Sample dir does not exist, skipping: {sample_dir}") + return + + logger.info(f"--- Zarr volume cleanup: {endpoint.name} ({sample_dir}) ---") + + controller = FileSystemPruneController(config) + + for zarr_dir in sorted(sample_dir.iterdir()): + if not zarr_dir.is_dir(): + continue + + if zarr_dir.name.startswith("demo_"): + logger.info(f"Skipping demo volume: {zarr_dir}") + continue + + creation = get_creation_time(zarr_dir) + if creation is None: + logger.warning(f"Creation time unavailable for {zarr_dir} — skipping") + continue + + if creation < cutoff: + logger.info(f"Removing Zarr volume: {zarr_dir} (created {creation.date()})") + controller.prune( + file_path=zarr_dir.name, + source_endpoint=endpoint, + days_from_now=0, + ) + else: + logger.info(f"Retaining: {zarr_dir} (created {creation.date()})") + + +def prune_scratch_endpoint(endpoint: FileSystemEndpoint, cutoff: datetime, config: BeamlineConfig) -> None: + """Recursively remove files in the endpoint's root_path created before cutoff. + + After removing old files, sweeps empty directories bottom-up. + + Args: + endpoint: FileSystemEndpoint representing the scratch directory to prune. + cutoff: Datetime threshold; files created before this are removed. + config: Beamline configuration passed to FileSystemPruneController. + """ + scratch_dir = Path(endpoint.root_path) + + if not scratch_dir.is_dir(): + logger.warning(f"Scratch dir does not exist, skipping: {endpoint.name} ({scratch_dir})") + return + + logger.info(f"--- Scratch data cleanup: {endpoint.name} ({scratch_dir}) ---") + + controller = FileSystemPruneController(config) + + for file in sorted(scratch_dir.rglob("*")): + if not file.is_file(): + continue + + creation = get_creation_time(file) + if creation is None: + logger.warning(f"Creation time unavailable for {file} — skipping") + continue + + if creation < cutoff: + logger.info(f"Removing file: {file} (created {creation.date()})") + controller.prune( + file_path=str(file.relative_to(scratch_dir)), + source_endpoint=endpoint, + days_from_now=0, + ) + + # Sweep empty directories left behind, deepest-first + for directory in sorted(scratch_dir.rglob("*"), reverse=True): + if directory.is_dir() and not any(directory.iterdir()): + try: + directory.rmdir() + logger.info(f"Removed empty directory: {directory}") + except OSError as e: + logger.error(f"Failed to remove empty directory {directory}: {e}") + + logger.info(f"Scratch cleanup complete for {endpoint.name}") + + +def prune_docker() -> None: + """Remove unused Docker images, stopped containers, and build cache.""" + logger.info("--- Docker cleanup ---") + + commands: list[list[str]] = [ + ["docker", "image", "prune", "-af"], + ["docker", "container", "prune", "-f"], + ["docker", "builder", "prune", "-f"], + ] + + for cmd in commands: + logger.info(f"Running: {' '.join(cmd)}") + try: + result = subprocess.run(cmd, capture_output=True, text=True, check=True) + if result.stdout.strip(): + logger.info(result.stdout.strip()) + except subprocess.CalledProcessError as e: + logger.error(f"Docker command failed: {' '.join(cmd)}\n{e.stderr.strip()}") + + +def main() -> None: + """Run all cleanup tasks for bl832recon1x.""" + logging.basicConfig( + level=logging.INFO, + format="[%(asctime)s] %(levelname)s %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + handlers=[ + logging.FileHandler(LOG_FILE), + logging.StreamHandler(), + ], + ) + from orchestration.flows.bl832.config import Config832 + config = Config832() + cutoff = datetime.now(tz=timezone.utc) - timedelta(days=PRUNE_AFTER_DAYS) + + logger.info("==========================================") + logger.info("Starting bl832 cleanup") + logger.info(f"Pruning items created before {cutoff.date()}") + logger.info("==========================================") + + prune_zarr_volumes(SAMPLE_ENDPOINT, cutoff, config) + + for endpoint in SCRATCH_ENDPOINTS: + prune_scratch_endpoint(endpoint, cutoff, config) + + prune_docker() + + logger.info("==========================================") + logger.info("Cleanup complete") + logger.info("==========================================") + + +if __name__ == "__main__": + main() From b5e029e48b6229db241cb0fd2d6c6166962cefb7 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 31 Mar 2026 10:52:09 -0700 Subject: [PATCH 02/11] updating file name in docstring, commenting out docker prune for now --- orchestration/flows/bl832/prune_bl832recon1x.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/orchestration/flows/bl832/prune_bl832recon1x.py b/orchestration/flows/bl832/prune_bl832recon1x.py index e3a077ba..31ce78e6 100644 --- a/orchestration/flows/bl832/prune_bl832recon1x.py +++ b/orchestration/flows/bl832/prune_bl832recon1x.py @@ -13,7 +13,7 @@ Cron entry (runs at 2am on the 1st of each month): 0 2 1 * * /home/bl832user/Documents/code/splash_flows/.venv/bin/python - /home/bl832user/Documents/code/splash_flows/orchestration/flows/bl832/bl832_cleanup.py + /home/bl832user/Documents/code/splash_flows/orchestration/flows/bl832/prune_bl832recon1x.py Requires Ubuntu 24.04 (kernel 6.8+, ext4) for reliable creation time via stat st_birthtime. """ @@ -223,7 +223,7 @@ def main() -> None: for endpoint in SCRATCH_ENDPOINTS: prune_scratch_endpoint(endpoint, cutoff, config) - prune_docker() + # prune_docker() logger.info("==========================================") logger.info("Cleanup complete") From 65a5beb3ce5d162976b390f88e670c9a84eb2a25 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 31 Mar 2026 13:10:17 -0700 Subject: [PATCH 03/11] creating a simple config class for the prune_bl832recon1x.py script so it doesn't try to use Globus --- orchestration/flows/bl832/prune_bl832recon1x.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/orchestration/flows/bl832/prune_bl832recon1x.py b/orchestration/flows/bl832/prune_bl832recon1x.py index 31ce78e6..dc6b5137 100644 --- a/orchestration/flows/bl832/prune_bl832recon1x.py +++ b/orchestration/flows/bl832/prune_bl832recon1x.py @@ -31,6 +31,16 @@ # Configuration — edit here to adjust behaviour # --------------------------------------------------------------------------- + +class _LocalConfig(BeamlineConfig): + """Minimal config for local-only cleanup — no Globus connections.""" + def __init__(self) -> None: + super().__init__(beamline_id="8.3.2") + + def _beam_specific_config(self) -> None: + pass # no Globus, no transfer client + + SAMPLE_ENDPOINT = FileSystemEndpoint( name="bl832recon1x_samples", root_path="/home/bl832user/Documents/example_samples", @@ -209,8 +219,7 @@ def main() -> None: logging.StreamHandler(), ], ) - from orchestration.flows.bl832.config import Config832 - config = Config832() + config = _LocalConfig() cutoff = datetime.now(tz=timezone.utc) - timedelta(days=PRUNE_AFTER_DAYS) logger.info("==========================================") From 23b035ec2facc77f00cd198f8d930dfaa793e88e Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 31 Mar 2026 13:10:47 -0700 Subject: [PATCH 04/11] saving logs in /tmp --- orchestration/flows/bl832/prune_bl832recon1x.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestration/flows/bl832/prune_bl832recon1x.py b/orchestration/flows/bl832/prune_bl832recon1x.py index dc6b5137..94673523 100644 --- a/orchestration/flows/bl832/prune_bl832recon1x.py +++ b/orchestration/flows/bl832/prune_bl832recon1x.py @@ -64,7 +64,7 @@ def _beam_specific_config(self) -> None: ] PRUNE_AFTER_DAYS = 30 -LOG_FILE = Path("/var/log/bl832_cleanup.log") +LOG_FILE = Path("/tmp/bl832_cleanup.log") # --------------------------------------------------------------------------- From 2862f5d366f918d9b97fa51b8d529093408978c7 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 31 Mar 2026 13:21:57 -0700 Subject: [PATCH 05/11] Creating a non-prefect filesystem prune method in prune_controller, and using it in prune_bl832recon1x.py --- .../flows/bl832/prune_bl832recon1x.py | 10 ++-- orchestration/prune_controller.py | 51 ++++++++++++++++++- 2 files changed, 55 insertions(+), 6 deletions(-) diff --git a/orchestration/flows/bl832/prune_bl832recon1x.py b/orchestration/flows/bl832/prune_bl832recon1x.py index 94673523..343cdb00 100644 --- a/orchestration/flows/bl832/prune_bl832recon1x.py +++ b/orchestration/flows/bl832/prune_bl832recon1x.py @@ -85,7 +85,7 @@ def get_creation_time(path: Path) -> datetime | None: """ try: stat = path.stat() - creation_ts = getattr(stat, "st_birthtime", None) # filesystem creation time + creation_ts = getattr(stat, "st_birthtime", None) or stat.st_mtime # filesystem creation time if not creation_ts: return None return datetime.fromtimestamp(creation_ts, tz=timezone.utc) @@ -130,10 +130,10 @@ def prune_zarr_volumes(endpoint: FileSystemEndpoint, cutoff: datetime, config: B if creation < cutoff: logger.info(f"Removing Zarr volume: {zarr_dir} (created {creation.date()})") - controller.prune( + controller.prune_no_prefect( file_path=zarr_dir.name, source_endpoint=endpoint, - days_from_now=0, + check_endpoint=None, ) else: logger.info(f"Retaining: {zarr_dir} (created {creation.date()})") @@ -170,10 +170,10 @@ def prune_scratch_endpoint(endpoint: FileSystemEndpoint, cutoff: datetime, confi if creation < cutoff: logger.info(f"Removing file: {file} (created {creation.date()})") - controller.prune( + controller.prune_no_prefect( file_path=str(file.relative_to(scratch_dir)), source_endpoint=endpoint, - days_from_now=0, + check_endpoint=None, ) # Sweep empty directories left behind, deepest-first diff --git a/orchestration/prune_controller.py b/orchestration/prune_controller.py index 4d97826d..50783ef3 100644 --- a/orchestration/prune_controller.py +++ b/orchestration/prune_controller.py @@ -3,6 +3,8 @@ from enum import Enum import logging import os +from pathlib import Path +import shutil from typing import Generic, Optional, TypeVar from prefect import flow @@ -160,6 +162,54 @@ def prune( logger.error(f"Failed to schedule pruning task: {str(e)}", exc_info=True) return False + def prune_no_prefect( + self, + file_path: str, + source_endpoint: FileSystemEndpoint, + check_endpoint: FileSystemEndpoint | None = None, + ) -> bool: + """Prune a file or directory immediately using only local filesystem operations. + + Bypasses Prefect entirely — safe to call outside a running Prefect server. + Intended for standalone scripts such as cron-based cleanup jobs. + + :param file_path: Relative path of the file or directory to remove. + :param source_endpoint: The filesystem endpoint whose root_path anchors the deletion. + :param check_endpoint: If provided, abort unless the path also exists here. + :return: True if pruning succeeded, False otherwise. + """ + if not file_path: + logger.error("No file_path provided for pruning operation") + return False + + if not source_endpoint: + logger.error("No source_endpoint provided for pruning operation") + return False + + source_full_path = Path(source_endpoint.full_path(file_path)) + + if not source_full_path.exists(): + logger.warning(f"Path does not exist at source, skipping: {source_full_path}") + return False + + if check_endpoint is not None: + check_full_path = Path(check_endpoint.full_path(file_path)) + if not check_full_path.exists(): + logger.warning(f"Path not found at check endpoint {check_endpoint.name}, skipping: {file_path}") + return False + logger.info(f"Check endpoint confirmed: {check_full_path}") + + try: + if source_full_path.is_dir(): + shutil.rmtree(source_full_path) + else: + source_full_path.unlink() + logger.info(f"Pruned: {source_full_path}") + return True + except OSError as e: + logger.error(f"Failed to prune {source_full_path}: {e}") + return False + @flow(name="prune_filesystem_endpoint") def prune_filesystem_endpoint( @@ -200,7 +250,6 @@ def prune_filesystem_endpoint( # Now perform the pruning operation if os.path.isdir(source_full_path): logger.info(f"Pruning directory {relative_path}") - import shutil shutil.rmtree(source_full_path) else: logger.info(f"Pruning file {relative_path}") From 8879a59d3b4290761a11fefeb305768559adf67d Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 31 Mar 2026 15:50:24 -0700 Subject: [PATCH 06/11] Adding a --dry-run flag, also linting --- .../flows/bl832/prune_bl832recon1x.py | 90 ++++++++++++------- 1 file changed, 58 insertions(+), 32 deletions(-) diff --git a/orchestration/flows/bl832/prune_bl832recon1x.py b/orchestration/flows/bl832/prune_bl832recon1x.py index 343cdb00..6d8c5135 100644 --- a/orchestration/flows/bl832/prune_bl832recon1x.py +++ b/orchestration/flows/bl832/prune_bl832recon1x.py @@ -18,10 +18,11 @@ Requires Ubuntu 24.04 (kernel 6.8+, ext4) for reliable creation time via stat st_birthtime. """ -import logging -import subprocess +import argparse from datetime import datetime, timedelta, timezone +import logging from pathlib import Path +import subprocess from orchestration.config import BeamlineConfig from orchestration.prune_controller import FileSystemPruneController @@ -72,10 +73,10 @@ def _beam_specific_config(self) -> None: def get_creation_time(path: Path) -> datetime | None: - """Return the birth (creation) time of a file or directory. - - Uses st_birthtime via getattr — available on Ubuntu 24.04 (kernel 6.8+, ext4). - Returns None if creation time is unavailable or reported as zero. + """ + Get the creation time of a file or directory as a timezone-aware datetime. + Uses st_birthtime if available (Ubuntu 24.04+ with ext4), otherwise falls back to st_ctime. + Returns None if the creation time cannot be determined. Args: path: Path to the file or directory. @@ -85,16 +86,19 @@ def get_creation_time(path: Path) -> datetime | None: """ try: stat = path.stat() - creation_ts = getattr(stat, "st_birthtime", None) or stat.st_mtime # filesystem creation time - if not creation_ts: + # st_ctime = inode change time on Linux; reflects when file arrived on + # this filesystem regardless of source mtime. st_birthtime preferred if + # available (Ubuntu 24.04+ with ext4), but st_ctime is a reliable fallback. + ts = getattr(stat, "st_birthtime", None) or stat.st_ctime + if not ts: return None - return datetime.fromtimestamp(creation_ts, tz=timezone.utc) + return datetime.fromtimestamp(ts, tz=timezone.utc) except OSError as e: logger.warning(f"Could not stat {path}: {e}") return None -def prune_zarr_volumes(endpoint: FileSystemEndpoint, cutoff: datetime, config: BeamlineConfig) -> None: +def prune_zarr_volumes(endpoint: FileSystemEndpoint, cutoff: datetime, config: BeamlineConfig, dry_run: bool = False) -> None: """Remove Zarr volumes in the endpoint's root_path older than cutoff. Top-level directories prefixed with 'demo_' are preserved regardless of age. @@ -104,6 +108,10 @@ def prune_zarr_volumes(endpoint: FileSystemEndpoint, cutoff: datetime, config: B endpoint: FileSystemEndpoint representing the Zarr samples directory. cutoff: Datetime threshold; volumes created before this are removed. config: Beamline configuration passed to FileSystemPruneController. + dry_run: If True, preview deletions without removing anything. + + Returns: + None """ sample_dir = Path(endpoint.root_path) @@ -129,17 +137,24 @@ def prune_zarr_volumes(endpoint: FileSystemEndpoint, cutoff: datetime, config: B continue if creation < cutoff: - logger.info(f"Removing Zarr volume: {zarr_dir} (created {creation.date()})") - controller.prune_no_prefect( - file_path=zarr_dir.name, - source_endpoint=endpoint, - check_endpoint=None, - ) + if dry_run: + logger.info(f"[DRY RUN] Would remove Zarr volume: {zarr_dir} (created {creation.date()})") + else: + logger.info(f"Removing Zarr volume: {zarr_dir} (created {creation.date()})") + controller.prune_no_prefect( + file_path=zarr_dir.name, + source_endpoint=endpoint, + ) else: logger.info(f"Retaining: {zarr_dir} (created {creation.date()})") -def prune_scratch_endpoint(endpoint: FileSystemEndpoint, cutoff: datetime, config: BeamlineConfig) -> None: +def prune_scratch_endpoint( + endpoint: FileSystemEndpoint, + cutoff: datetime, + config: BeamlineConfig, + dry_run: bool = False +) -> None: """Recursively remove files in the endpoint's root_path created before cutoff. After removing old files, sweeps empty directories bottom-up. @@ -148,6 +163,7 @@ def prune_scratch_endpoint(endpoint: FileSystemEndpoint, cutoff: datetime, confi endpoint: FileSystemEndpoint representing the scratch directory to prune. cutoff: Datetime threshold; files created before this are removed. config: Beamline configuration passed to FileSystemPruneController. + dry_run: If True, preview deletions without removing anything. """ scratch_dir = Path(endpoint.root_path) @@ -169,21 +185,24 @@ def prune_scratch_endpoint(endpoint: FileSystemEndpoint, cutoff: datetime, confi continue if creation < cutoff: - logger.info(f"Removing file: {file} (created {creation.date()})") - controller.prune_no_prefect( - file_path=str(file.relative_to(scratch_dir)), - source_endpoint=endpoint, - check_endpoint=None, - ) + if dry_run: + logger.info(f"[DRY RUN] Would remove file: {file} (created {creation.date()})") + else: + logger.info(f"Removing file: {file} (created {creation.date()})") + controller.prune_no_prefect( + file_path=str(file.relative_to(scratch_dir)), + source_endpoint=endpoint, + ) # Sweep empty directories left behind, deepest-first - for directory in sorted(scratch_dir.rglob("*"), reverse=True): - if directory.is_dir() and not any(directory.iterdir()): - try: - directory.rmdir() - logger.info(f"Removed empty directory: {directory}") - except OSError as e: - logger.error(f"Failed to remove empty directory {directory}: {e}") + if not dry_run: + for directory in sorted(scratch_dir.rglob("*"), reverse=True): + if directory.is_dir() and not any(directory.iterdir()): + try: + directory.rmdir() + logger.info(f"Removed empty directory: {directory}") + except OSError as e: + logger.error(f"Failed to remove empty directory {directory}: {e}") logger.info(f"Scratch cleanup complete for {endpoint.name}") @@ -210,6 +229,13 @@ def prune_docker() -> None: def main() -> None: """Run all cleanup tasks for bl832recon1x.""" + parser = argparse.ArgumentParser(description="bl832recon1x periodic storage cleanup.") + parser.add_argument( + "--dry-run", + action="store_true", + help="Preview deletions without removing anything.", + ) + args = parser.parse_args() logging.basicConfig( level=logging.INFO, format="[%(asctime)s] %(levelname)s %(message)s", @@ -227,10 +253,10 @@ def main() -> None: logger.info(f"Pruning items created before {cutoff.date()}") logger.info("==========================================") - prune_zarr_volumes(SAMPLE_ENDPOINT, cutoff, config) + prune_zarr_volumes(SAMPLE_ENDPOINT, cutoff, config, dry_run=args.dry_run) for endpoint in SCRATCH_ENDPOINTS: - prune_scratch_endpoint(endpoint, cutoff, config) + prune_scratch_endpoint(endpoint, cutoff, config, dry_run=args.dry_run) # prune_docker() From 44913f7437942613a29a0c2ed141ab93e437e6cf Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 31 Mar 2026 15:52:52 -0700 Subject: [PATCH 07/11] logging --- orchestration/flows/bl832/prune_bl832recon1x.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/orchestration/flows/bl832/prune_bl832recon1x.py b/orchestration/flows/bl832/prune_bl832recon1x.py index 6d8c5135..5ad8628c 100644 --- a/orchestration/flows/bl832/prune_bl832recon1x.py +++ b/orchestration/flows/bl832/prune_bl832recon1x.py @@ -236,15 +236,15 @@ def main() -> None: help="Preview deletions without removing anything.", ) args = parser.parse_args() - logging.basicConfig( - level=logging.INFO, - format="[%(asctime)s] %(levelname)s %(message)s", + root_logger = logging.getLogger() + root_logger.setLevel(logging.INFO) + formatter = logging.Formatter( + fmt="[%(asctime)s] %(levelname)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S", - handlers=[ - logging.FileHandler(LOG_FILE), - logging.StreamHandler(), - ], ) + for handler in [logging.FileHandler(LOG_FILE), logging.StreamHandler()]: + handler.setFormatter(formatter) + root_logger.addHandler(handler) config = _LocalConfig() cutoff = datetime.now(tz=timezone.utc) - timedelta(days=PRUNE_AFTER_DAYS) From cf432211915808a138c3bbc788dd04c4b351ebf1 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 31 Mar 2026 15:57:35 -0700 Subject: [PATCH 08/11] less verbose logging, should report a summary at the directory level not per file --- orchestration/flows/bl832/prune_bl832recon1x.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/orchestration/flows/bl832/prune_bl832recon1x.py b/orchestration/flows/bl832/prune_bl832recon1x.py index 5ad8628c..ea8d362e 100644 --- a/orchestration/flows/bl832/prune_bl832recon1x.py +++ b/orchestration/flows/bl832/prune_bl832recon1x.py @@ -19,6 +19,7 @@ """ import argparse +from collections import defaultdict from datetime import datetime, timedelta, timezone import logging from pathlib import Path @@ -175,6 +176,8 @@ def prune_scratch_endpoint( controller = FileSystemPruneController(config) + files_by_dir: dict[Path, list[Path]] = defaultdict(list) + for file in sorted(scratch_dir.rglob("*")): if not file.is_file(): continue @@ -185,10 +188,14 @@ def prune_scratch_endpoint( continue if creation < cutoff: - if dry_run: - logger.info(f"[DRY RUN] Would remove file: {file} (created {creation.date()})") - else: - logger.info(f"Removing file: {file} (created {creation.date()})") + files_by_dir[file.parent].append(file) + + for parent_dir, files in sorted(files_by_dir.items()): + if dry_run: + logger.info(f"[DRY RUN] Would remove {len(files)} file(s) from {parent_dir}") + else: + logger.info(f"Removing {len(files)} file(s) from {parent_dir}") + for file in files: controller.prune_no_prefect( file_path=str(file.relative_to(scratch_dir)), source_endpoint=endpoint, From 196dcaf22e7a071a4ce268b117b2759add117e1a Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 31 Mar 2026 16:05:05 -0700 Subject: [PATCH 09/11] Adding file sizes and amounts to the logging --- .../flows/bl832/prune_bl832recon1x.py | 26 ++++++++++++++----- 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/orchestration/flows/bl832/prune_bl832recon1x.py b/orchestration/flows/bl832/prune_bl832recon1x.py index ea8d362e..3a340151 100644 --- a/orchestration/flows/bl832/prune_bl832recon1x.py +++ b/orchestration/flows/bl832/prune_bl832recon1x.py @@ -73,6 +73,11 @@ def _beam_specific_config(self) -> None: logger = logging.getLogger(__name__) +def _dir_size_mb(path: Path) -> float: + """Return the total size of all files under path in megabytes.""" + return sum(f.stat().st_size for f in path.rglob("*") if f.is_file()) / 1024 / 1024 + + def get_creation_time(path: Path) -> datetime | None: """ Get the creation time of a file or directory as a timezone-aware datetime. @@ -139,7 +144,8 @@ def prune_zarr_volumes(endpoint: FileSystemEndpoint, cutoff: datetime, config: B if creation < cutoff: if dry_run: - logger.info(f"[DRY RUN] Would remove Zarr volume: {zarr_dir} (created {creation.date()})") + size_mb = _dir_size_mb(zarr_dir) + logger.info(f"[DRY RUN] Would remove Zarr volume: {zarr_dir} ({size_mb:.1f} MB, created {creation.date()})") else: logger.info(f"Removing Zarr volume: {zarr_dir} (created {creation.date()})") controller.prune_no_prefect( @@ -176,7 +182,7 @@ def prune_scratch_endpoint( controller = FileSystemPruneController(config) - files_by_dir: dict[Path, list[Path]] = defaultdict(list) + files_by_dir: dict[Path, list[tuple[Path, datetime]]] = defaultdict(list) for file in sorted(scratch_dir.rglob("*")): if not file.is_file(): @@ -188,13 +194,21 @@ def prune_scratch_endpoint( continue if creation < cutoff: - files_by_dir[file.parent].append(file) + files_by_dir[file.parent].append((file, creation)) - for parent_dir, files in sorted(files_by_dir.items()): + for parent_dir, entries in sorted(files_by_dir.items()): + files = [f for f, _ in entries] + oldest = min(c.date() for _, c in entries) + newest = max(c.date() for _, c in entries) + total_mb = sum(f.stat().st_size for f in files) / 1024 / 1024 if dry_run: - logger.info(f"[DRY RUN] Would remove {len(files)} file(s) from {parent_dir}") + total_files = sum(1 for f in parent_dir.rglob("*") if f.is_file()) + logger.info( + f"[DRY RUN] Would remove {len(files)}/{total_files} file(s) ({total_mb:.1f} MB) " + f"from {parent_dir} (created {oldest} – {newest})" + ) else: - logger.info(f"Removing {len(files)} file(s) from {parent_dir}") + logger.info(f"Removing {len(files)} file(s) ({total_mb:.1f} MB) from {parent_dir}") for file in files: controller.prune_no_prefect( file_path=str(file.relative_to(scratch_dir)), From 884b261a3715b985fa3be1357c229cdaf60c76f7 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 31 Mar 2026 16:12:03 -0700 Subject: [PATCH 10/11] updating cronjob docstring --- orchestration/flows/bl832/prune_bl832recon1x.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/orchestration/flows/bl832/prune_bl832recon1x.py b/orchestration/flows/bl832/prune_bl832recon1x.py index 3a340151..29191482 100644 --- a/orchestration/flows/bl832/prune_bl832recon1x.py +++ b/orchestration/flows/bl832/prune_bl832recon1x.py @@ -5,15 +5,14 @@ storage saturation. Intended to run monthly as a cron job with root privileges. Cron setup (run once): - cd /home/bl832user/Documents/code/splash_flows + cd /home/bl832user/Documents/code/prune_recon1x/splash_flows python3 -m venv .venv source .venv/bin/activate pip install -e . sudo crontab -e Cron entry (runs at 2am on the 1st of each month): - 0 2 1 * * /home/bl832user/Documents/code/splash_flows/.venv/bin/python - /home/bl832user/Documents/code/splash_flows/orchestration/flows/bl832/prune_bl832recon1x.py + 0 2 1 * * cd /home/bl832user/Documents/code/prune_recon1x/splash_flows && /home/bl832user/Documents/code/prune_recon1x/splash_flows/.venv/bin/python -m orchestration.flows.bl832.prune_bl832recon1x Requires Ubuntu 24.04 (kernel 6.8+, ext4) for reliable creation time via stat st_birthtime. """ From f37fa53966005de4775682766ffe49a1c05a528d Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 31 Mar 2026 16:27:06 -0700 Subject: [PATCH 11/11] fixing pytests --- .../test_bl832/test_prune_bl832recon1x.py | 118 +++++++++++++----- .../flows/bl832/prune_bl832recon1x.py | 2 +- 2 files changed, 89 insertions(+), 31 deletions(-) diff --git a/orchestration/_tests/test_bl832/test_prune_bl832recon1x.py b/orchestration/_tests/test_bl832/test_prune_bl832recon1x.py index c5136b73..0ff6a699 100644 --- a/orchestration/_tests/test_bl832/test_prune_bl832recon1x.py +++ b/orchestration/_tests/test_bl832/test_prune_bl832recon1x.py @@ -18,8 +18,8 @@ # --------------------------------------------------------------------------- CUTOFF = datetime(2025, 1, 1, tzinfo=timezone.utc) -OLD_CREATION = datetime(2024, 6, 1, tzinfo=timezone.utc) # before cutoff — should be pruned -NEW_CREATION = datetime(2025, 6, 1, tzinfo=timezone.utc) # after cutoff — should be retained +OLD_CREATION = datetime(2024, 6, 1, tzinfo=timezone.utc) +NEW_CREATION = datetime(2025, 6, 1, tzinfo=timezone.utc) MODULE = "orchestration.flows.bl832.prune_bl832recon1x" @@ -71,10 +71,24 @@ def test_get_creation_time_returns_datetime_when_available(mocker) -> None: assert result == OLD_CREATION +def test_get_creation_time_falls_back_to_ctime(mocker) -> None: + """Falls back to st_ctime when st_birthtime is not available.""" + mock_stat = mocker.MagicMock(spec=["st_ctime"]) + mock_stat.st_ctime = 0 + mock_stat.st_ctime = OLD_CREATION.timestamp() + mock_path = mocker.MagicMock() + mock_path.stat.return_value = mock_stat + + result = get_creation_time(mock_path) + + assert result == OLD_CREATION + + def test_get_creation_time_returns_none_when_creation_time_is_zero(mocker) -> None: """Returns None when st_birthtime is zero (filesystem does not support creation time).""" mock_stat = mocker.MagicMock() - mock_stat.st_birthtime = 0 # filesystem does not expose creation time + mock_stat.st_birthtime = 0 + mock_stat.st_ctime = 0 mock_path = mocker.MagicMock() mock_path.stat.return_value = mock_stat @@ -85,7 +99,7 @@ def test_get_creation_time_returns_none_when_creation_time_is_zero(mocker) -> No def test_get_creation_time_returns_none_when_attribute_missing(mocker) -> None: """Returns None when creation time attribute is not present on the stat result.""" - mock_stat = mocker.MagicMock(spec=[]) # no attributes + mock_stat = mocker.MagicMock(spec=[]) mock_path = mocker.MagicMock() mock_path.stat.return_value = mock_stat @@ -112,7 +126,7 @@ def test_get_creation_time_returns_none_on_os_error(mocker) -> None: def test_prune_zarr_volumes_removes_old_volume( sample_endpoint: FileSystemEndpoint, mock_config, mocker ) -> None: - """Calls controller.prune for Zarr volumes created before the cutoff.""" + """Calls prune_no_prefect for Zarr volumes created before the cutoff.""" old_zarr = Path(sample_endpoint.root_path) / "old_scan" old_zarr.mkdir() @@ -122,17 +136,16 @@ def test_prune_zarr_volumes_removes_old_volume( prune_zarr_volumes(sample_endpoint, CUTOFF, mock_config) - mock_controller.prune.assert_called_once_with( + mock_controller.prune_no_prefect.assert_called_once_with( file_path="old_scan", source_endpoint=sample_endpoint, - days_from_now=0, ) def test_prune_zarr_volumes_retains_new_volume( sample_endpoint: FileSystemEndpoint, mock_config, mocker ) -> None: - """Does not call controller.prune for Zarr volumes created after the cutoff.""" + """Does not call prune_no_prefect for Zarr volumes created after the cutoff.""" new_zarr = Path(sample_endpoint.root_path) / "new_scan" new_zarr.mkdir() @@ -142,13 +155,13 @@ def test_prune_zarr_volumes_retains_new_volume( prune_zarr_volumes(sample_endpoint, CUTOFF, mock_config) - mock_controller.prune.assert_not_called() + mock_controller.prune_no_prefect.assert_not_called() def test_prune_zarr_volumes_skips_demo_prefix( sample_endpoint: FileSystemEndpoint, mock_config, mocker ) -> None: - """Does not call controller.prune for directories prefixed with 'demo_'.""" + """Does not call prune_no_prefect for directories prefixed with 'demo_'.""" demo_zarr = Path(sample_endpoint.root_path) / "demo_sample" demo_zarr.mkdir() @@ -158,13 +171,30 @@ def test_prune_zarr_volumes_skips_demo_prefix( prune_zarr_volumes(sample_endpoint, CUTOFF, mock_config) - mock_controller.prune.assert_not_called() + mock_controller.prune_no_prefect.assert_not_called() + + +def test_prune_zarr_volumes_dry_run_does_not_delete( + sample_endpoint: FileSystemEndpoint, mock_config, mocker +) -> None: + """Does not call prune_no_prefect when dry_run=True.""" + old_zarr = Path(sample_endpoint.root_path) / "old_scan" + old_zarr.mkdir() + + mock_controller = mocker.MagicMock() + mocker.patch(f"{MODULE}.FileSystemPruneController", return_value=mock_controller) + mocker.patch(f"{MODULE}.get_creation_time", return_value=OLD_CREATION) + mocker.patch(f"{MODULE}._dir_size_mb", return_value=100.0) + + prune_zarr_volumes(sample_endpoint, CUTOFF, mock_config, dry_run=True) + + mock_controller.prune_no_prefect.assert_not_called() def test_prune_zarr_volumes_skips_when_creation_time_unavailable( sample_endpoint: FileSystemEndpoint, mock_config, mocker ) -> None: - """Does not call controller.prune when creation time cannot be determined.""" + """Does not call prune_no_prefect when creation time cannot be determined.""" zarr = Path(sample_endpoint.root_path) / "unknown_age" zarr.mkdir() @@ -174,13 +204,13 @@ def test_prune_zarr_volumes_skips_when_creation_time_unavailable( prune_zarr_volumes(sample_endpoint, CUTOFF, mock_config) - mock_controller.prune.assert_not_called() + mock_controller.prune_no_prefect.assert_not_called() def test_prune_zarr_volumes_skips_files( sample_endpoint: FileSystemEndpoint, mock_config, mocker ) -> None: - """Does not call controller.prune for loose files in the sample directory.""" + """Does not call prune_no_prefect for loose files in the sample directory.""" loose_file = Path(sample_endpoint.root_path) / "stray_file.txt" loose_file.touch() @@ -190,7 +220,7 @@ def test_prune_zarr_volumes_skips_files( prune_zarr_volumes(sample_endpoint, CUTOFF, mock_config) - mock_controller.prune.assert_not_called() + mock_controller.prune_no_prefect.assert_not_called() def test_prune_zarr_volumes_noop_when_directory_missing(mock_config, mocker) -> None: @@ -205,7 +235,7 @@ def test_prune_zarr_volumes_noop_when_directory_missing(mock_config, mocker) -> prune_zarr_volumes(endpoint, CUTOFF, mock_config) - mock_controller.prune.assert_not_called() + mock_controller.prune_no_prefect.assert_not_called() # --------------------------------------------------------------------------- @@ -216,7 +246,7 @@ def test_prune_zarr_volumes_noop_when_directory_missing(mock_config, mocker) -> def test_prune_scratch_endpoint_removes_old_file( scratch_endpoint: FileSystemEndpoint, mock_config, mocker ) -> None: - """Calls controller.prune for files created before the cutoff.""" + """Calls prune_no_prefect for files created before the cutoff.""" old_file = Path(scratch_endpoint.root_path) / "old_result.h5" old_file.touch() @@ -226,17 +256,16 @@ def test_prune_scratch_endpoint_removes_old_file( prune_scratch_endpoint(scratch_endpoint, CUTOFF, mock_config) - mock_controller.prune.assert_called_once_with( + mock_controller.prune_no_prefect.assert_called_once_with( file_path="old_result.h5", source_endpoint=scratch_endpoint, - days_from_now=0, ) def test_prune_scratch_endpoint_retains_new_file( scratch_endpoint: FileSystemEndpoint, mock_config, mocker ) -> None: - """Does not call controller.prune for files created after the cutoff.""" + """Does not call prune_no_prefect for files created after the cutoff.""" new_file = Path(scratch_endpoint.root_path) / "new_result.h5" new_file.touch() @@ -246,13 +275,13 @@ def test_prune_scratch_endpoint_retains_new_file( prune_scratch_endpoint(scratch_endpoint, CUTOFF, mock_config) - mock_controller.prune.assert_not_called() + mock_controller.prune_no_prefect.assert_not_called() def test_prune_scratch_endpoint_recurses_into_subdirectories( scratch_endpoint: FileSystemEndpoint, mock_config, mocker ) -> None: - """Calls controller.prune for old files in nested subdirectories.""" + """Calls prune_no_prefect for old files in nested subdirectories.""" nested = Path(scratch_endpoint.root_path) / "subdir" / "nested" nested.mkdir(parents=True) nested_file = nested / "data.h5" @@ -264,13 +293,28 @@ def test_prune_scratch_endpoint_recurses_into_subdirectories( prune_scratch_endpoint(scratch_endpoint, CUTOFF, mock_config) - mock_controller.prune.assert_called_once_with( + mock_controller.prune_no_prefect.assert_called_once_with( file_path="subdir/nested/data.h5", source_endpoint=scratch_endpoint, - days_from_now=0, ) +def test_prune_scratch_endpoint_dry_run_does_not_delete( + scratch_endpoint: FileSystemEndpoint, mock_config, mocker +) -> None: + """Does not call prune_no_prefect when dry_run=True.""" + old_file = Path(scratch_endpoint.root_path) / "old_result.h5" + old_file.touch() + + mock_controller = mocker.MagicMock() + mocker.patch(f"{MODULE}.FileSystemPruneController", return_value=mock_controller) + mocker.patch(f"{MODULE}.get_creation_time", return_value=OLD_CREATION) + + prune_scratch_endpoint(scratch_endpoint, CUTOFF, mock_config, dry_run=True) + + mock_controller.prune_no_prefect.assert_not_called() + + def test_prune_scratch_endpoint_removes_empty_directories_after_pruning( scratch_endpoint: FileSystemEndpoint, mock_config, mocker ) -> None: @@ -280,13 +324,12 @@ def test_prune_scratch_endpoint_removes_empty_directories_after_pruning( old_file = subdir / "old.h5" old_file.touch() - # Simulate the controller actually deleting the file so the sweep has something to act on def fake_prune(**kwargs: object) -> bool: Path(scratch_endpoint.root_path, kwargs["file_path"]).unlink(missing_ok=True) return True mock_controller = mocker.MagicMock() - mock_controller.prune.side_effect = fake_prune + mock_controller.prune_no_prefect.side_effect = fake_prune mocker.patch(f"{MODULE}.FileSystemPruneController", return_value=mock_controller) mocker.patch(f"{MODULE}.get_creation_time", return_value=OLD_CREATION) @@ -295,10 +338,26 @@ def fake_prune(**kwargs: object) -> bool: assert not subdir.exists() +def test_prune_scratch_endpoint_dry_run_does_not_remove_empty_directories( + scratch_endpoint: FileSystemEndpoint, mock_config, mocker +) -> None: + """Does not sweep empty directories when dry_run=True.""" + subdir = Path(scratch_endpoint.root_path) / "subdir" + subdir.mkdir() + + mock_controller = mocker.MagicMock() + mocker.patch(f"{MODULE}.FileSystemPruneController", return_value=mock_controller) + mocker.patch(f"{MODULE}.get_creation_time", return_value=OLD_CREATION) + + prune_scratch_endpoint(scratch_endpoint, CUTOFF, mock_config, dry_run=True) + + assert subdir.exists() + + def test_prune_scratch_endpoint_skips_when_creation_time_unavailable( scratch_endpoint: FileSystemEndpoint, mock_config, mocker ) -> None: - """Does not call controller.prune when creation time cannot be determined.""" + """Does not call prune_no_prefect when creation time cannot be determined.""" f = Path(scratch_endpoint.root_path) / "unknown.h5" f.touch() @@ -308,7 +367,7 @@ def test_prune_scratch_endpoint_skips_when_creation_time_unavailable( prune_scratch_endpoint(scratch_endpoint, CUTOFF, mock_config) - mock_controller.prune.assert_not_called() + mock_controller.prune_no_prefect.assert_not_called() def test_prune_scratch_endpoint_noop_when_directory_missing(mock_config, mocker) -> None: @@ -323,7 +382,7 @@ def test_prune_scratch_endpoint_noop_when_directory_missing(mock_config, mocker) prune_scratch_endpoint(endpoint, CUTOFF, mock_config) - mock_controller.prune.assert_not_called() + mock_controller.prune_no_prefect.assert_not_called() # --------------------------------------------------------------------------- @@ -354,5 +413,4 @@ def test_prune_docker_logs_on_failure(mocker) -> None: side_effect=subprocess.CalledProcessError(1, "docker", stderr="daemon not running"), ) - # Should not raise prune_docker() diff --git a/orchestration/flows/bl832/prune_bl832recon1x.py b/orchestration/flows/bl832/prune_bl832recon1x.py index 29191482..dd52eda5 100644 --- a/orchestration/flows/bl832/prune_bl832recon1x.py +++ b/orchestration/flows/bl832/prune_bl832recon1x.py @@ -94,7 +94,7 @@ def get_creation_time(path: Path) -> datetime | None: # st_ctime = inode change time on Linux; reflects when file arrived on # this filesystem regardless of source mtime. st_birthtime preferred if # available (Ubuntu 24.04+ with ext4), but st_ctime is a reliable fallback. - ts = getattr(stat, "st_birthtime", None) or stat.st_ctime + ts = getattr(stat, "st_birthtime", None) or getattr(stat, "st_ctime", None) if not ts: return None return datetime.fromtimestamp(ts, tz=timezone.utc)