diff --git a/orchestration/_tests/test_bl832/__init__.py b/orchestration/_tests/test_bl832/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/orchestration/_tests/test_bl832/test_prune_bl832recon1x.py b/orchestration/_tests/test_bl832/test_prune_bl832recon1x.py
new file mode 100644
index 00000000..0ff6a699
--- /dev/null
+++ b/orchestration/_tests/test_bl832/test_prune_bl832recon1x.py
@@ -0,0 +1,427 @@
+"""Tests for orchestration/flows/bl832/prune_bl832recon1x.py."""
+
+from datetime import datetime, timezone
+from pathlib import Path
+
+import pytest
+
+from orchestration.flows.bl832.prune_bl832recon1x import (
+    get_creation_time,
+    prune_docker,
+    prune_scratch_endpoint,
+    prune_zarr_volumes,
+)
+from orchestration.transfer_endpoints import FileSystemEndpoint
+
+# ---------------------------------------------------------------------------
+# Constants
+# ---------------------------------------------------------------------------
+
+CUTOFF = datetime(2025, 1, 1, tzinfo=timezone.utc)
+OLD_CREATION = datetime(2024, 6, 1, tzinfo=timezone.utc)
+NEW_CREATION = datetime(2025, 6, 1, tzinfo=timezone.utc)
+
+MODULE = "orchestration.flows.bl832.prune_bl832recon1x"
+
+# ---------------------------------------------------------------------------
+# Fixtures
+# ---------------------------------------------------------------------------
+
+
+@pytest.fixture()
+def mock_config(mocker):
+    """Mocked BeamlineConfig — avoids importing Config832 and triggering Globus/Prefect init."""
+    return mocker.MagicMock()
+
+
+@pytest.fixture()
+def sample_endpoint(tmp_path: Path) -> FileSystemEndpoint:
+    """FileSystemEndpoint pointing at a temporary Zarr samples directory."""
+    return FileSystemEndpoint(
+        name="test_samples",
+        root_path=str(tmp_path),
+        uri="bl832recon1x.lbl.gov",
+    )
+
+
+@pytest.fixture()
+def scratch_endpoint(tmp_path: Path) -> FileSystemEndpoint:
+    """FileSystemEndpoint pointing at a temporary scratch directory."""
+    return FileSystemEndpoint(
+        name="test_scratch",
+        root_path=str(tmp_path),
+        uri="bl832recon1x.lbl.gov",
+    )
+
+
+# ---------------------------------------------------------------------------
+# get_creation_time
+# ---------------------------------------------------------------------------
+
+
+def test_get_creation_time_returns_datetime_when_available(mocker) -> None:
+    """Returns a timezone-aware datetime when filesystem creation time is present."""
+    mock_stat = mocker.MagicMock()
+    mock_stat.st_birthtime = OLD_CREATION.timestamp()
+    mock_path = mocker.MagicMock()
+    mock_path.stat.return_value = mock_stat
+
+    result = get_creation_time(mock_path)
+
+    assert result == OLD_CREATION
+
+
+def test_get_creation_time_falls_back_to_ctime(mocker) -> None:
+    """Falls back to st_ctime when st_birthtime is not available."""
+    mock_stat = mocker.MagicMock(spec=["st_ctime"])
+    mock_stat.st_ctime = OLD_CREATION.timestamp()
+    mock_path = mocker.MagicMock()
+    mock_path.stat.return_value = mock_stat
+
+    result = get_creation_time(mock_path)
+
+    assert result == OLD_CREATION
+
+
+def test_get_creation_time_returns_none_when_creation_time_is_zero(mocker) -> None:
+    """Returns None when st_birthtime is zero (filesystem does not support creation time)."""
+    mock_stat = mocker.MagicMock()
+    mock_stat.st_birthtime = 0
+    mock_stat.st_ctime = 0
+    mock_path = mocker.MagicMock()
+    mock_path.stat.return_value = mock_stat
+
+    result = get_creation_time(mock_path)
+
+    assert result is None
+
+
+def test_get_creation_time_returns_none_when_attribute_missing(mocker) -> None:
+    """Returns None when creation time attribute is not present on the stat result."""
+    mock_stat = mocker.MagicMock(spec=[])
+    mock_path = mocker.MagicMock()
+    mock_path.stat.return_value = mock_stat
+
+    result = get_creation_time(mock_path)
+
+    assert result is None
+
+
+def test_get_creation_time_returns_none_on_os_error(mocker) -> None:
+    """Returns None and logs a warning when stat raises OSError."""
+    mock_path = mocker.MagicMock()
+    mock_path.stat.side_effect = OSError("permission denied")
+
+    result = get_creation_time(mock_path)
+
+    assert result is None
+
+
+# ---------------------------------------------------------------------------
+# prune_zarr_volumes
+# ---------------------------------------------------------------------------
+
+
+def test_prune_zarr_volumes_removes_old_volume(
+    sample_endpoint: FileSystemEndpoint, mock_config, mocker
+) -> None:
+    """Calls prune_no_prefect for Zarr volumes created before the cutoff."""
+    old_zarr = Path(sample_endpoint.root_path) / "old_scan"
+    old_zarr.mkdir()
+
+    mock_controller = mocker.MagicMock()
+    mocker.patch(f"{MODULE}.FileSystemPruneController", return_value=mock_controller)
+    mocker.patch(f"{MODULE}.get_creation_time", return_value=OLD_CREATION)
+
+    prune_zarr_volumes(sample_endpoint, CUTOFF, mock_config)
+
+    mock_controller.prune_no_prefect.assert_called_once_with(
+        file_path="old_scan",
+        source_endpoint=sample_endpoint,
+    )
+
+
+def test_prune_zarr_volumes_retains_new_volume(
+    sample_endpoint: FileSystemEndpoint, mock_config, mocker
+) -> None:
+    """Does not call prune_no_prefect for Zarr volumes created after the cutoff."""
+    new_zarr = Path(sample_endpoint.root_path) / "new_scan"
+    new_zarr.mkdir()
+
+    mock_controller = mocker.MagicMock()
+    mocker.patch(f"{MODULE}.FileSystemPruneController", return_value=mock_controller)
+    mocker.patch(f"{MODULE}.get_creation_time", return_value=NEW_CREATION)
+
+    prune_zarr_volumes(sample_endpoint, CUTOFF, mock_config)
+
+    mock_controller.prune_no_prefect.assert_not_called()
+
+
+def test_prune_zarr_volumes_skips_demo_prefix(
+    sample_endpoint: FileSystemEndpoint, mock_config, mocker
+) -> None:
+    """Does not call prune_no_prefect for directories prefixed with 'demo_'."""
+    demo_zarr = Path(sample_endpoint.root_path) / "demo_sample"
+    demo_zarr.mkdir()
+
+    mock_controller = mocker.MagicMock()
+    mocker.patch(f"{MODULE}.FileSystemPruneController", return_value=mock_controller)
+    mocker.patch(f"{MODULE}.get_creation_time", return_value=OLD_CREATION)
+
+    prune_zarr_volumes(sample_endpoint, CUTOFF, mock_config)
+
+    mock_controller.prune_no_prefect.assert_not_called()
+
+
+def test_prune_zarr_volumes_dry_run_does_not_delete(
+    sample_endpoint: FileSystemEndpoint, mock_config, mocker
+) -> None:
+    """Does not call prune_no_prefect when dry_run=True."""
+    old_zarr = Path(sample_endpoint.root_path) / "old_scan"
+    old_zarr.mkdir()
+
+    mock_controller = mocker.MagicMock()
+    mocker.patch(f"{MODULE}.FileSystemPruneController", return_value=mock_controller)
+    mocker.patch(f"{MODULE}.get_creation_time", return_value=OLD_CREATION)
+    mocker.patch(f"{MODULE}._dir_size_mb", return_value=100.0)
+
+    prune_zarr_volumes(sample_endpoint, CUTOFF, mock_config, dry_run=True)
+
+    mock_controller.prune_no_prefect.assert_not_called()
+
+
+def test_prune_zarr_volumes_skips_when_creation_time_unavailable(
+    sample_endpoint: FileSystemEndpoint, mock_config, mocker
+) -> None:
+    """Does not call prune_no_prefect when creation time cannot be determined."""
+    zarr = Path(sample_endpoint.root_path) / "unknown_age"
+    zarr.mkdir()
+
+    mock_controller = mocker.MagicMock()
+    mocker.patch(f"{MODULE}.FileSystemPruneController", return_value=mock_controller)
+    mocker.patch(f"{MODULE}.get_creation_time", return_value=None)
+
+    prune_zarr_volumes(sample_endpoint, CUTOFF, mock_config)
+
+    mock_controller.prune_no_prefect.assert_not_called()
+
+
+def test_prune_zarr_volumes_skips_files(
+    sample_endpoint: FileSystemEndpoint, mock_config, mocker
+) -> None:
+    """Does not call prune_no_prefect for loose files in the sample directory."""
+    loose_file = Path(sample_endpoint.root_path) / "stray_file.txt"
+    loose_file.touch()
+
+    mock_controller = mocker.MagicMock()
+    mocker.patch(f"{MODULE}.FileSystemPruneController", return_value=mock_controller)
+    mocker.patch(f"{MODULE}.get_creation_time", return_value=OLD_CREATION)
+
+    prune_zarr_volumes(sample_endpoint, CUTOFF, mock_config)
+
+    mock_controller.prune_no_prefect.assert_not_called()
+
+
+def test_prune_zarr_volumes_noop_when_directory_missing(mock_config, mocker) -> None:
+    """Returns without error when the sample directory does not exist."""
+    endpoint = FileSystemEndpoint(
+        name="missing",
+        root_path="/nonexistent/path",
+        uri="bl832recon1x.lbl.gov",
+    )
+    mock_controller = mocker.MagicMock()
+    mocker.patch(f"{MODULE}.FileSystemPruneController", return_value=mock_controller)
+
+    prune_zarr_volumes(endpoint, CUTOFF, mock_config)
+
+    mock_controller.prune_no_prefect.assert_not_called()
+
+
+# ---------------------------------------------------------------------------
+# prune_scratch_endpoint
+# ---------------------------------------------------------------------------
+
+
+def test_prune_scratch_endpoint_removes_old_file(
+    scratch_endpoint: FileSystemEndpoint, mock_config, mocker
+) -> None:
+    """Calls prune_no_prefect for files created before the cutoff."""
+    old_file = Path(scratch_endpoint.root_path) / "old_result.h5"
+    old_file.touch()
+
+    mock_controller = mocker.MagicMock()
+    mocker.patch(f"{MODULE}.FileSystemPruneController", return_value=mock_controller)
+    mocker.patch(f"{MODULE}.get_creation_time", return_value=OLD_CREATION)
+
+    prune_scratch_endpoint(scratch_endpoint, CUTOFF, mock_config)
+
+    mock_controller.prune_no_prefect.assert_called_once_with(
+        file_path="old_result.h5",
+        source_endpoint=scratch_endpoint,
+    )
+
+
+def test_prune_scratch_endpoint_retains_new_file(
+    scratch_endpoint: FileSystemEndpoint, mock_config, mocker
+) -> None:
+    """Does not call prune_no_prefect for files created after the cutoff."""
+    new_file = Path(scratch_endpoint.root_path) / "new_result.h5"
+    new_file.touch()
+
+    mock_controller = mocker.MagicMock()
+    mocker.patch(f"{MODULE}.FileSystemPruneController", return_value=mock_controller)
+    mocker.patch(f"{MODULE}.get_creation_time", return_value=NEW_CREATION)
+
+    prune_scratch_endpoint(scratch_endpoint, CUTOFF, mock_config)
+
+    mock_controller.prune_no_prefect.assert_not_called()
+
+
+def test_prune_scratch_endpoint_recurses_into_subdirectories(
+    scratch_endpoint: FileSystemEndpoint, mock_config, mocker
+) -> None:
+    """Calls prune_no_prefect for old files in nested subdirectories."""
+    nested = Path(scratch_endpoint.root_path) / "subdir" / "nested"
+    nested.mkdir(parents=True)
+    nested_file = nested / "data.h5"
+    nested_file.touch()
+
+    mock_controller = mocker.MagicMock()
+    mocker.patch(f"{MODULE}.FileSystemPruneController", return_value=mock_controller)
+    mocker.patch(f"{MODULE}.get_creation_time", return_value=OLD_CREATION)
+
+    prune_scratch_endpoint(scratch_endpoint, CUTOFF, mock_config)
+
+    mock_controller.prune_no_prefect.assert_called_once_with(
+        file_path="subdir/nested/data.h5",
+        source_endpoint=scratch_endpoint,
+    )
+
+
+def test_prune_scratch_endpoint_dry_run_does_not_delete(
+    scratch_endpoint: FileSystemEndpoint, mock_config, mocker
+) -> None:
+    """Does not call prune_no_prefect when dry_run=True."""
+    old_file = Path(scratch_endpoint.root_path) / "old_result.h5"
+    old_file.touch()
+
+    mock_controller = mocker.MagicMock()
+    mocker.patch(f"{MODULE}.FileSystemPruneController", return_value=mock_controller)
+    mocker.patch(f"{MODULE}.get_creation_time", return_value=OLD_CREATION)
+
+    prune_scratch_endpoint(scratch_endpoint, CUTOFF, mock_config, dry_run=True)
+
+    mock_controller.prune_no_prefect.assert_not_called()
+
+
+def test_prune_scratch_endpoint_removes_empty_directories_after_pruning(
+    scratch_endpoint: FileSystemEndpoint, mock_config, mocker
+) -> None:
+    """Removes directories that are left empty after file deletion."""
+    subdir = Path(scratch_endpoint.root_path) / "empty_after_prune"
+    subdir.mkdir()
+    old_file = subdir / "old.h5"
+    old_file.touch()
+
+    def fake_prune(**kwargs: object) -> bool:
+        Path(scratch_endpoint.root_path, kwargs["file_path"]).unlink(missing_ok=True)
+        return True
+
+    mock_controller = mocker.MagicMock()
+    mock_controller.prune_no_prefect.side_effect = fake_prune
+    mocker.patch(f"{MODULE}.FileSystemPruneController", return_value=mock_controller)
+    mocker.patch(f"{MODULE}.get_creation_time", return_value=OLD_CREATION)
+
+    prune_scratch_endpoint(scratch_endpoint, CUTOFF, mock_config)
+
+    assert not subdir.exists()
+
+
+def test_prune_scratch_endpoint_dry_run_does_not_remove_empty_directories(
+    scratch_endpoint: FileSystemEndpoint, mock_config, mocker
+) -> None:
+    """Does not sweep empty directories when dry_run=True."""
+    subdir = Path(scratch_endpoint.root_path) / "subdir"
+    subdir.mkdir()
+
+    mock_controller = mocker.MagicMock()
+    mocker.patch(f"{MODULE}.FileSystemPruneController", return_value=mock_controller)
+    mocker.patch(f"{MODULE}.get_creation_time", return_value=OLD_CREATION)
+
+    prune_scratch_endpoint(scratch_endpoint, CUTOFF, mock_config, dry_run=True)
+
+    assert subdir.exists()
+
+
+def test_prune_scratch_endpoint_skips_when_creation_time_unavailable(
+    scratch_endpoint: FileSystemEndpoint, mock_config, mocker
+) -> None:
+    """Does not call prune_no_prefect when creation time cannot be determined."""
+    f = Path(scratch_endpoint.root_path) / "unknown.h5"
+    f.touch()
+
+    mock_controller = mocker.MagicMock()
+    mocker.patch(f"{MODULE}.FileSystemPruneController", return_value=mock_controller)
+    mocker.patch(f"{MODULE}.get_creation_time", return_value=None)
+
+    prune_scratch_endpoint(scratch_endpoint, CUTOFF, mock_config)
+
+    mock_controller.prune_no_prefect.assert_not_called()
+
+
+def test_prune_scratch_endpoint_noop_when_directory_missing(mock_config, mocker) -> None:
+    """Returns without error when the scratch directory does not exist."""
+    endpoint = FileSystemEndpoint(
+        name="missing",
+        root_path="/nonexistent/path",
+        uri="bl832recon1x.lbl.gov",
+    )
+    mock_controller = mocker.MagicMock()
+    mocker.patch(f"{MODULE}.FileSystemPruneController", return_value=mock_controller)
+
+    prune_scratch_endpoint(endpoint, CUTOFF, mock_config)
+
+    mock_controller.prune_no_prefect.assert_not_called()
+
+
+# ---------------------------------------------------------------------------
+# prune_docker
+# ---------------------------------------------------------------------------
+
+
+def test_prune_docker_runs_all_commands(mocker) -> None:
+    """Calls subprocess.run for each of the three Docker prune commands."""
+    mock_run = mocker.patch(f"{MODULE}.subprocess.run")
+    mock_run.return_value = mocker.MagicMock(stdout="", returncode=0)
+
+    prune_docker()
+
+    assert mock_run.call_count == 3
+    calls = [c.args[0] for c in mock_run.call_args_list]
+    assert ["docker", "image", "prune", "-af"] in calls
+    assert ["docker", "container", "prune", "-f"] in calls
+    assert ["docker", "builder", "prune", "-f"] in calls
+
+
+def test_prune_docker_logs_on_failure(mocker) -> None:
+    """Logs an error without raising when a Docker command fails."""
+    import subprocess
+
+    mocker.patch(
+        f"{MODULE}.subprocess.run",
+        side_effect=subprocess.CalledProcessError(1, "docker", stderr="daemon not running"),
+    )
+
+    prune_docker()
+
+
+def test_prune_docker_logs_when_docker_missing(mocker) -> None:
+    """Logs an error without raising when the docker executable cannot be launched."""
+    mock_run = mocker.patch(
+        f"{MODULE}.subprocess.run",
+        side_effect=FileNotFoundError("docker"),
+    )
+
+    prune_docker()
+
+    assert mock_run.call_count == 3
diff --git a/orchestration/flows/bl832/prune_bl832recon1x.py b/orchestration/flows/bl832/prune_bl832recon1x.py
new file mode 100644
index 00000000..dd52eda5
--- /dev/null
+++ b/orchestration/flows/bl832/prune_bl832recon1x.py
@@ -0,0 +1,345 @@
+#!/usr/bin/env python3
+"""Periodic storage cleanup for bl832recon1x.
+
+Removes old Zarr volumes and scratch data to prevent storage saturation and,
+when run with --docker, unused Docker artifacts. Intended to run monthly as a
+cron job with root privileges.
+
+Cron setup (run once):
+    cd /home/bl832user/Documents/code/prune_recon1x/splash_flows
+    python3 -m venv .venv
+    source .venv/bin/activate
+    pip install -e .
+    sudo crontab -e
+
+Cron entry (runs at 2am on the 1st of each month):
+    0 2 1 * * cd /home/bl832user/Documents/code/prune_recon1x/splash_flows && /home/bl832user/Documents/code/prune_recon1x/splash_flows/.venv/bin/python -m orchestration.flows.bl832.prune_bl832recon1x
+
+Item age comes from stat st_birthtime when the interpreter exposes it, otherwise
+from st_ctime (inode change time).
+NOTE(review): CPython does not appear to expose st_birthtime on Linux, so on the
+Ubuntu 24.04 (ext4) host st_ctime is most likely what drives pruning; confirm.
+"""
+
+import argparse
+from collections import defaultdict
+from datetime import datetime, timedelta, timezone
+import logging
+from pathlib import Path
+import subprocess
+
+from orchestration.config import BeamlineConfig
+from orchestration.prune_controller import FileSystemPruneController
+from orchestration.transfer_endpoints import FileSystemEndpoint
+
+# ---------------------------------------------------------------------------
+# Configuration — edit here to adjust behaviour
+# ---------------------------------------------------------------------------
+
+
+class _LocalConfig(BeamlineConfig):
+    """Minimal config for local-only cleanup — no Globus connections."""
+
+    def __init__(self) -> None:
+        super().__init__(beamline_id="8.3.2")
+
+    def _beam_specific_config(self) -> None:
+        pass  # no Globus, no transfer client
+
+
+SAMPLE_ENDPOINT = FileSystemEndpoint(
+    name="bl832recon1x_samples",
+    root_path="/home/bl832user/Documents/example_samples",
+    uri="bl832recon1x.lbl.gov",
+)
+
+# Add or remove endpoints to apply recursive file pruning.
+# Each entry uses FileSystemEndpoint so root_path, name, and uri stay consistent
+# with the rest of splash_flows.
+SCRATCH_ENDPOINTS: list[FileSystemEndpoint] = [
+    FileSystemEndpoint(
+        name="bl832recon1x_scratch",
+        root_path="/home/bl832user/Documents/data/scratch",
+        uri="bl832recon1x.lbl.gov",
+    ),
+    # FileSystemEndpoint(
+    #     name="bl832recon1x_new_folder",
+    #     root_path="/home/bl832user/Documents/some/new/folder",
+    #     uri="bl832recon1x.lbl.gov",
+    # ),
+]
+
+PRUNE_AFTER_DAYS = 30
+LOG_FILE = Path("/tmp/bl832_cleanup.log")
+
+# ---------------------------------------------------------------------------
+
+logger = logging.getLogger(__name__)
+
+
+def _file_size(path: Path) -> int:
+    """Return the size of path in bytes, or 0 if it can no longer be stat'ed."""
+    try:
+        return path.stat().st_size
+    except OSError:
+        return 0
+
+
+def _dir_size_mb(path: Path) -> float:
+    """Return the total size of all files under path in megabytes."""
+    return sum(_file_size(f) for f in path.rglob("*") if f.is_file()) / 1024 / 1024
+
+
+def get_creation_time(path: Path) -> datetime | None:
+    """Return the creation time of a file or directory as a timezone-aware datetime.
+
+    Uses st_birthtime when the stat result carries a non-zero one, otherwise falls
+    back to st_ctime. On Linux st_ctime is the inode change time, so it also moves
+    forward on metadata changes such as chmod, chown, or rename.
+
+    Args:
+        path: Path to the file or directory.
+
+    Returns:
+        A timezone-aware (UTC) datetime, or None if stat fails or no timestamp is set.
+    """
+    try:
+        stat = path.stat()
+        # A missing or zero st_birthtime falls through to st_ctime; if that is
+        # missing or zero too, the age is unknown and None is returned.
+        ts = getattr(stat, "st_birthtime", None) or getattr(stat, "st_ctime", None)
+        if not ts:
+            return None
+        return datetime.fromtimestamp(ts, tz=timezone.utc)
+    except OSError as e:
+        logger.warning(f"Could not stat {path}: {e}")
+        return None
+
+
+def prune_zarr_volumes(
+    endpoint: FileSystemEndpoint,
+    cutoff: datetime,
+    config: BeamlineConfig,
+    dry_run: bool = False,
+) -> None:
+    """Remove Zarr volumes in the endpoint's root_path older than cutoff.
+
+    Top-level directories prefixed with 'demo_' are preserved regardless of age.
+    Each Zarr volume is removed with a single FileSystemPruneController call.
+
+    Args:
+        endpoint: FileSystemEndpoint representing the Zarr samples directory.
+        cutoff: Datetime threshold; volumes created before this are removed.
+        config: Beamline configuration passed to FileSystemPruneController.
+        dry_run: If True, preview deletions without removing anything.
+    """
+    sample_dir = Path(endpoint.root_path)
+
+    if not sample_dir.is_dir():
+        logger.warning(f"Sample dir does not exist, skipping: {sample_dir}")
+        return
+
+    logger.info(f"--- Zarr volume cleanup: {endpoint.name} ({sample_dir}) ---")
+
+    controller = FileSystemPruneController(config)
+
+    for zarr_dir in sorted(sample_dir.iterdir()):
+        if not zarr_dir.is_dir():
+            continue
+
+        if zarr_dir.name.startswith("demo_"):
+            logger.info(f"Skipping demo volume: {zarr_dir}")
+            continue
+
+        creation = get_creation_time(zarr_dir)
+        if creation is None:
+            logger.warning(f"Creation time unavailable for {zarr_dir} — skipping")
+            continue
+
+        if creation >= cutoff:
+            logger.info(f"Retaining: {zarr_dir} (created {creation.date()})")
+            continue
+
+        if dry_run:
+            size_mb = _dir_size_mb(zarr_dir)
+            logger.info(f"[DRY RUN] Would remove Zarr volume: {zarr_dir} ({size_mb:.1f} MB, created {creation.date()})")
+            continue
+
+        logger.info(f"Removing Zarr volume: {zarr_dir} (created {creation.date()})")
+        removed = controller.prune_no_prefect(
+            file_path=zarr_dir.name,
+            source_endpoint=endpoint,
+        )
+        if not removed:
+            logger.error(f"Failed to remove Zarr volume: {zarr_dir}")
+
+
+def prune_scratch_endpoint(
+    endpoint: FileSystemEndpoint,
+    cutoff: datetime,
+    config: BeamlineConfig,
+    dry_run: bool = False,
+) -> None:
+    """Recursively remove files in the endpoint's root_path created before cutoff.
+
+    After removing old files, sweeps empty directories bottom-up. The sweep
+    removes every empty directory under root_path, including ones that were
+    already empty before this run. With dry_run=True nothing is removed.
+
+    Args:
+        endpoint: FileSystemEndpoint representing the scratch directory to prune.
+        cutoff: Datetime threshold; files created before this are removed.
+        config: Beamline configuration passed to FileSystemPruneController.
+        dry_run: If True, preview deletions without removing anything.
+    """
+    scratch_dir = Path(endpoint.root_path)
+
+    if not scratch_dir.is_dir():
+        logger.warning(f"Scratch dir does not exist, skipping: {endpoint.name} ({scratch_dir})")
+        return
+
+    logger.info(f"--- Scratch data cleanup: {endpoint.name} ({scratch_dir}) ---")
+
+    controller = FileSystemPruneController(config)
+
+    files_by_dir: dict[Path, list[tuple[Path, datetime]]] = defaultdict(list)
+
+    for file in sorted(scratch_dir.rglob("*")):
+        if not file.is_file():
+            continue
+
+        creation = get_creation_time(file)
+        if creation is None:
+            logger.warning(f"Creation time unavailable for {file} — skipping")
+            continue
+
+        if creation < cutoff:
+            files_by_dir[file.parent].append((file, creation))
+
+    for parent_dir, entries in sorted(files_by_dir.items()):
+        files = [f for f, _ in entries]
+        oldest = min(c.date() for _, c in entries)
+        newest = max(c.date() for _, c in entries)
+        # _file_size tolerates files that vanished since the scan above.
+        total_mb = sum(_file_size(f) for f in files) / 1024 / 1024
+        if dry_run:
+            # `files` holds only direct children of parent_dir, so count direct
+            # children here too; a recursive count would skew the ratio.
+            total_files = sum(1 for f in parent_dir.glob("*") if f.is_file())
+            logger.info(
+                f"[DRY RUN] Would remove {len(files)}/{total_files} file(s) ({total_mb:.1f} MB) "
+                f"from {parent_dir} (created {oldest} – {newest})"
+            )
+            continue
+
+        logger.info(f"Removing {len(files)} file(s) ({total_mb:.1f} MB) from {parent_dir}")
+        failed = 0
+        for file in files:
+            removed = controller.prune_no_prefect(
+                file_path=str(file.relative_to(scratch_dir)),
+                source_endpoint=endpoint,
+            )
+            if not removed:
+                failed += 1
+        if failed:
+            logger.error(f"Failed to remove {failed}/{len(files)} file(s) from {parent_dir}")
+
+    # Sweep empty directories left behind. Reverse-sorted order visits a
+    # directory's descendants before the directory itself.
+    if not dry_run:
+        for directory in sorted(scratch_dir.rglob("*"), reverse=True):
+            if not directory.is_dir():
+                continue
+            try:
+                if any(directory.iterdir()):
+                    continue
+                directory.rmdir()
+                logger.info(f"Removed empty directory: {directory}")
+            except OSError as e:
+                logger.error(f"Failed to remove empty directory {directory}: {e}")
+
+    logger.info(f"Scratch cleanup complete for {endpoint.name}")
+
+
+def prune_docker() -> None:
+    """Remove unused Docker images, stopped containers, and build cache.
+
+    Each command is attempted independently: a failing or unlaunchable command
+    is logged and the remaining commands still run.
+    """
+    logger.info("--- Docker cleanup ---")
+
+    commands: list[list[str]] = [
+        ["docker", "image", "prune", "-af"],
+        ["docker", "container", "prune", "-f"],
+        ["docker", "builder", "prune", "-f"],
+    ]
+
+    for cmd in commands:
+        logger.info(f"Running: {' '.join(cmd)}")
+        try:
+            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
+        except subprocess.CalledProcessError as e:
+            # stderr is None when the error carries no captured output.
+            stderr = (e.stderr or "").strip()
+            logger.error(f"Docker command failed: {' '.join(cmd)}\n{stderr}")
+        except OSError as e:
+            # Raised before the command starts, e.g. docker is not installed.
+            logger.error(f"Could not run {' '.join(cmd)}: {e}")
+        else:
+            if result.stdout.strip():
+                logger.info(result.stdout.strip())
+
+
+def main() -> None:
+    """Run all cleanup tasks for bl832recon1x."""
+    parser = argparse.ArgumentParser(description="bl832recon1x periodic storage cleanup.")
+    parser.add_argument(
+        "--dry-run",
+        action="store_true",
+        help="Preview deletions without removing anything.",
+    )
+    parser.add_argument(
+        "--docker",
+        action="store_true",
+        help="Also prune unused Docker images, stopped containers, and build cache.",
+    )
+    args = parser.parse_args()
+
+    root_logger = logging.getLogger()
+    root_logger.setLevel(logging.INFO)
+    formatter = logging.Formatter(
+        fmt="[%(asctime)s] %(levelname)s %(message)s",
+        datefmt="%Y-%m-%d %H:%M:%S",
+    )
+    for handler in [logging.FileHandler(LOG_FILE), logging.StreamHandler()]:
+        handler.setFormatter(formatter)
+        root_logger.addHandler(handler)
+
+    config = _LocalConfig()
+    cutoff = datetime.now(tz=timezone.utc) - timedelta(days=PRUNE_AFTER_DAYS)
+
+    logger.info("==========================================")
+    logger.info("Starting bl832 cleanup")
+    logger.info(f"Pruning items created before {cutoff.date()}")
+    logger.info("==========================================")
+
+    prune_zarr_volumes(SAMPLE_ENDPOINT, cutoff, config, dry_run=args.dry_run)
+
+    for endpoint in SCRATCH_ENDPOINTS:
+        prune_scratch_endpoint(endpoint, cutoff, config, dry_run=args.dry_run)
+
+    # Docker pruning is opt-in: `docker image prune -af` removes every unused image.
+    if args.docker:
+        if args.dry_run:
+            logger.info("[DRY RUN] Would prune unused Docker images, containers, and build cache")
+        else:
+            prune_docker()
+
+    logger.info("==========================================")
+    logger.info("Cleanup complete")
+    logger.info("==========================================")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/orchestration/prune_controller.py b/orchestration/prune_controller.py
index 4d97826d..50783ef3 100644
--- a/orchestration/prune_controller.py
+++ b/orchestration/prune_controller.py
@@ -3,6 +3,8 @@
 from enum import Enum
 import logging
 import os
+from pathlib import Path
+import shutil
 from typing import Generic, Optional, TypeVar
 
 from prefect import flow
@@ -160,6 +162,72 @@ def prune(
             logger.error(f"Failed to schedule pruning task: {str(e)}", exc_info=True)
             return False
 
+    def prune_no_prefect(
+        self,
+        file_path: str,
+        source_endpoint: FileSystemEndpoint,
+        check_endpoint: FileSystemEndpoint | None = None,
+    ) -> bool:
+        """Prune a file or directory immediately using only local filesystem operations.
+
+        Bypasses Prefect entirely — safe to call outside a running Prefect server.
+        Intended for standalone scripts such as cron-based cleanup jobs.
+
+        Only paths strictly inside ``source_endpoint.root_path`` are removed; a
+        ``file_path`` that resolves outside the root (for example via ``..``) or to
+        the root itself is refused. A symlink is removed as a link, never followed.
+
+        :param file_path: Relative path of the file or directory to remove.
+        :param source_endpoint: The filesystem endpoint whose root_path anchors the deletion.
+        :param check_endpoint: If provided, abort unless the path also exists here.
+        :return: True if pruning succeeded, False otherwise.
+        """
+        if not file_path:
+            logger.error("No file_path provided for pruning operation")
+            return False
+
+        if not source_endpoint:
+            logger.error("No source_endpoint provided for pruning operation")
+            return False
+
+        source_full_path = Path(source_endpoint.full_path(file_path))
+
+        # Containment guard: callers may run as root, so file_path is not trusted.
+        # Only the parent is resolved so that a symlink at the final component is
+        # judged by where the link lives, not by what it points at.
+        root = Path(source_endpoint.root_path).resolve()
+        target = source_full_path.parent.resolve() / source_full_path.name
+        if (
+            source_full_path.name in ("", "..")
+            or target == root
+            or not target.is_relative_to(root)
+        ):
+            logger.error(f"Refusing to prune outside endpoint root {root}: {source_full_path}")
+            return False
+
+        if not source_full_path.exists():
+            logger.warning(f"Path does not exist at source, skipping: {source_full_path}")
+            return False
+
+        if check_endpoint is not None:
+            check_full_path = Path(check_endpoint.full_path(file_path))
+            if not check_full_path.exists():
+                logger.warning(f"Path not found at check endpoint {check_endpoint.name}, skipping: {file_path}")
+                return False
+            logger.info(f"Check endpoint confirmed: {check_full_path}")
+
+        try:
+            # shutil.rmtree rejects symlinks; unlink them like regular files.
+            if source_full_path.is_dir() and not source_full_path.is_symlink():
+                shutil.rmtree(source_full_path)
+            else:
+                source_full_path.unlink()
+            logger.info(f"Pruned: {source_full_path}")
+            return True
+        except OSError as e:
+            logger.error(f"Failed to prune {source_full_path}: {e}")
+            return False
+
 
 @flow(name="prune_filesystem_endpoint")
 def prune_filesystem_endpoint(
@@ -200,7 +268,6 @@ def prune_filesystem_endpoint(
     # Now perform the pruning operation
     if os.path.isdir(source_full_path):
         logger.info(f"Pruning directory {relative_path}")
-        import shutil
         shutil.rmtree(source_full_path)
     else:
         logger.info(f"Pruning file {relative_path}")