2 changes: 2 additions & 0 deletions packages/bundled_models/persistence/.gitattributes
@@ -0,0 +1,2 @@
# SCM syntax highlighting & preventing 3-way merges
pixi.lock merge=binary linguist-language=YAML linguist-generated=true -diff
5 changes: 5 additions & 0 deletions packages/bundled_models/persistence/.gitignore
@@ -0,0 +1,5 @@
# pixi environments
.zig-cache
.pixi/*
!.pixi/config.toml
report.xml
45 changes: 45 additions & 0 deletions packages/bundled_models/persistence/README.md
@@ -0,0 +1,45 @@
# Persistence Model for use with the PyEarthTools Package

**TODO: description**

## Installation

Clone the repository, then run
```shell
pip install -e .
```

## Training

No training is required for this model. It computes persistence on-the-fly using historical data loaded via the PET pipeline.
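To illustrate the idea (this is a hedged sketch, not the package's actual implementation — the function name and signature here are hypothetical), a persistence forecast simply carries recent observations forward, either by repeating the last observed field or by taking a statistic of the last few time steps:

```python
import numpy as np

def persistence_forecast(history: np.ndarray, steps: int, method: str = "last") -> np.ndarray:
    """Carry recent observations forward along the leading time axis.

    history: array shaped (time, ...) of past observations.
    steps:   number of future time steps to emit.
    method:  "last" repeats the final observation; "median_of_three"
             uses the median of the last three observations.
    (Hypothetical sketch; the real model runs through the PET pipeline.)
    """
    if method == "last":
        state = history[-1]
    elif method == "median_of_three":
        state = np.median(history[-3:], axis=0)
    else:
        raise ValueError(f"unknown method: {method}")
    # repeat the persisted state for each requested future step
    return np.broadcast_to(state, (steps,) + state.shape).copy()

obs = np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])  # (time=3, x=2)
print(persistence_forecast(obs, steps=2))                            # repeats [5., 6.] twice
print(persistence_forecast(obs, steps=1, method="median_of_three"))  # [[3., 4.]]
```

Because the persisted state is derived directly from the input history, there are no weights to fit — hence no training step.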

## Predictions / Inference

You can generate persistence values out of the box using the `pet predict` command line API, or from a Jupyter Notebook as demonstrated in the tutorial gallery.

First, verify the model is registered by running

```shell
pet predict
```

and checking that `Development/Persistence` is visible in the output.

If so, you can now run some inference:

```shell
pet predict --model Development/Persistence
```

The command will prompt for any other required arguments.

**TODO: description of required arguments**


#### Example

```shell
pet predict --model Development/Persistence # TODO
```

## Acknowledgments

Not applicable. Heuristically developed.
17 changes: 17 additions & 0 deletions packages/bundled_models/persistence/build.zig
@@ -0,0 +1,17 @@
const std = @import("std");

pub fn build(b: *std.Build) void {
const target = b.standardTargetOptions(.{});
const optimize = b.standardOptimizeOption(.{});
const lib_persistence_zig = b.addLibrary(.{
.name = "persistence_zig",
.linkage = .dynamic,
.root_module = b.createModule(.{
.root_source_file = b.path("src/lib/zig/lib.zig"),
.target = target,
.optimize = optimize,
}),
});
lib_persistence_zig.linkLibC();
b.installArtifact(lib_persistence_zig);
}
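The build above produces a dynamic library (with Zig's default install prefix, something like `zig-out/lib/libpersistence_zig.so` on Linux). A minimal sketch of how Python could pick it up via `ctypes` — the path and the fallback behavior are assumptions for illustration, not the package's actual loading logic:

```python
import ctypes
import pathlib

# Assumed location: `zig build` installs artifacts under zig-out/lib by default (Linux naming).
DEFAULT_LIB = pathlib.Path("zig-out/lib/libpersistence_zig.so")

def load_zig_backend(path: pathlib.Path = DEFAULT_LIB):
    """Return a ctypes handle to the Zig backend, or None if it hasn't been built."""
    if not path.exists():
        return None
    return ctypes.CDLL(str(path))

backend = load_zig_backend()
print("zig backend available" if backend else "zig backend not built; use the numpy backend")
```

Guarding on the library's existence lets callers fall back to a pure-Python backend on platforms where the Zig artifact is absent.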
109 changes: 109 additions & 0 deletions packages/bundled_models/persistence/examples/README.md
@@ -0,0 +1,109 @@
## Overview & Philosophy

Examples in this folder serve as patterns and architectural blueprints for library usage. They are intended to provide a starting point rather than production-ready, optimized code.

* **Not Optimal:** These examples represent "worst-case scenarios" or basic implementations. Assume they are inefficient.
* **Iterative Improvement:** If you find a better way to perform a task, commit it to the codebase and use it to forge a new, improved example for future users.
* **Goal:** The objective is functional and *functioning* code. Current benchmarks take 5 minutes for 8 time instances; verification requires better performance.

## Technical Context: Persistence Models

Persistence models (statistical methods like mean, median, etc.) differ significantly from inference models (pre-trained weights) in computational requirements.

### Comparison: Persistence vs. Inference

| Attribute | Persistence Models | Inference Models |
| :--- | :--- | :--- |
| **Hardware** | CPU only (No GPU usage) | GPU Accelerated (Tensor calculations) |
| **Data Requirement** | Requires extensive historical data | Weights encode historical data |
| **Performance** | Slower than GPU inference | Faster due to weight encoding |
| **Parallelism** | Avoids existing paradigms (e.g., Dask) if data is associated with them | Utilizes standard parallel paradigms |
| **Chunking** | Spatial (2D) preferred | Temporal (Time) preferred |

**Why this is a pain point:** Software cannot solve all storage and loading inefficiencies. Hardware and platform-specific storage paradigms are often the root cause. While libraries can improve data processing predictability, they cannot universally solve nuanced data loading issues.
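To make the spatial-chunking preference concrete: a persistence statistic reduces over time, so each spatial chunk needs its full time series but nothing from its neighbours, and the work parallelises cleanly over space. A minimal serial sketch of that decomposition (function names are illustrative, not the package API; the `mp` examples would map a worker pool over these chunks instead of a list comprehension):

```python
import numpy as np

def spatial_chunks(arr: np.ndarray, n: int):
    """Split (time, y, x) data into n chunks along the x axis."""
    return np.array_split(arr, n, axis=-1)

def chunk_median(chunk: np.ndarray) -> np.ndarray:
    """Per-chunk temporal reduction: median over the time axis."""
    return np.median(chunk, axis=0)

rng = np.random.default_rng(0)
data = rng.random((24, 50, 50))                             # (time, y, x)
parts = [chunk_median(c) for c in spatial_chunks(data, 4)]  # serial stand-in for a worker pool
result = np.concatenate(parts, axis=-1)                     # reassemble the spatial field
assert np.allclose(result, np.median(data, axis=0))         # matches the unchunked reduction
```

This only pays off if the data is *stored* chunked spatially; if each chunk read drags the whole file into memory, the decomposition buys nothing — which is the storage pain point above.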

## Execution Modes

The examples are organized around specific execution paradigms. Understanding these modes is critical to selecting the correct example for your environment.

### Core Concepts

* **`pet-pipeline` (Default):** The library pipeline retrieves file information (indexing).
* *Note:* Retrieving file metadata is less costly than loading raw data for arbitrarily chunked files.
* **`standalone` (Custom Loader):** The user is responsible for data loading.
* The `pet-pipeline` provides the indexing/accessor, but the actual data is fetched via custom logic.
* **`mp` (Multiprocessing):**
* `py`: Uses Python processes (disables Dask).
* `1p`: Single worker (serial processing).
* **`<backend_name>` (e.g., `zig`):** Backend-specific computation.
* Assumption: Backend ingests chunks from the `pet-pipeline` and chunking is done on-the-fly.
* *Note:* This differs from expensive Xarray rechunking operations.

### Execution Matrix

```mermaid
flowchart TD
A[Start] --> B{Data Loading Strategy};

B -- Standard --> C[pet-pipeline<br>Retrieves Indexing];
B -- Custom --> D[standalone<br>User loads Data];

C --> E{Computation Strategy};
D --> E;

E -- Max Python Compatibility --> F[py<br>Processes];
E -- Max Performance/Quantized --> G[1p+zig<br>Custom Backend];
E -- Hybrid/Parallel --> H[mp+rust<br>Rust Backend];
E -- Stability Testing --> I[1p<br>Single Worker];

F --> J[Use Case: Standard ML workflows];
G --> K[Use Case: Quantized/In-memory];
H --> L[Use Case: Hybrid Compute];
I --> M[Use Case: Testing/Debugging];
```

### Selection Guide

> **NOTE:** Not all combinations are implemented. Use the following logic to select the correct example:

| Scenario | Recommended Configuration | Reasoning |
| :--- | :--- | :--- |
| **Testing / Simple Methods** | `1p + py` | Minimal overhead, high compatibility. |
| **High Perf / Quantized / In-Memory** | `1p + zig + standalone` | Enables SIMD/efficient code and quantization (e.g., 4-bit representation). |
| **Hybrid Compute** | `mp + rust` | PET pipeline for data retrieval, Rust for computation. |
| **Platform Constraints** | `standalone` | Required if you need fine-grained control or if the platform lacks multiprocessing support (e.g., restricted environments). |
| **Backend Control** | `<backend>` | Required if you need custom computation logic (e.g., Numpy vs. Zig). |
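On the quantization entry above: a 4-bit representation can be sketched as affine quantization to 16 levels, with two codes packed per byte. This is an illustrative sketch of the general technique, not the Zig backend's actual scheme:

```python
import numpy as np

def quantize_4bit(x: np.ndarray):
    """Affine-quantize floats to 4-bit codes (0..15); returns codes plus affine params."""
    lo, hi = float(x.min()), float(x.max())
    scale = (hi - lo) / 15 or 1.0          # guard against constant arrays
    codes = np.round((x - lo) / scale).astype(np.uint8)
    return codes, lo, scale

def dequantize_4bit(codes: np.ndarray, lo: float, scale: float) -> np.ndarray:
    return codes.astype(np.float64) * scale + lo

x = np.linspace(0.0, 1.0, 16)
codes, lo, scale = quantize_4bit(x)
packed = (codes[0::2] << 4) | codes[1::2]   # two 4-bit codes per byte: 16x smaller than float64
x_hat = dequantize_4bit(codes, lo, scale)
# round-off error is bounded by half a quantization step
assert np.max(np.abs(x - x_hat)) <= scale / 2 + 1e-12
```

The memory saving is what makes the in-memory `1p + zig + standalone` path attractive for large historical archives.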

## Available Examples

### Linux / HPC Environment

These examples are optimized for Linux systems (e.g., RHEL8, Arch Linux) typically running on HPC nodes.

| Filename | Description | Execution Context |
| :--- | :--- | :--- |
| `nci_py_mp.py` | Multiprocessing with Python on NCI. Uses **PET pipeline**. | HPC / Linux |
| `nci_py_mp_standalone.py` | Multiprocessing with Python on NCI. Uses **ad hoc loading**. | HPC / Linux |
| `anylinux_py_mp.py` | Multiprocessing with Python. Uses **PET pipeline**. | Any Linux (tested on Arch) |
| `anylinux_py_standalone.py` | Multiprocessing with Python. Uses **ad hoc loading**. | Any Linux |

### General / Local Environment

These examples focus on portability across different architectures and operating systems.

| Filename | Description | Execution Context |
| :--- | :--- | :--- |
| `any_py_1p.py` | Sequential processing with Python. Best for **portability** (Windows/Mac/Linux). | Any OS / Architecture |

### Experimental / Backend Specific

These examples utilize specific backends (e.g., Zig) and may require additional C libraries or specific OS support.

| Filename | Description | Notes |
| :--- | :--- | :--- |
| `zigc.py` | Contains various approaches using the **Zig backend** for computation. | **Linux only**. Tested with parallel HDF5 loader, single-threaded, and NCI contexts. Use at your own risk. |

> **Resources:** Refer to the following technical documentation for deeper understanding of data storage and loading nuances:
> * [ATPESC 2023: Principles of HPC I/O](https://extremecomputingtraining.anl.gov/wp-content/uploads/sites/96/2023/08/ATPESC-2023-Track-7-Talk-2-carns-io-principles.pdf)
> * [NCSA HDF5 Introduction](https://learn.ncsa.illinois.edu/pluginfile.php/20067/mod_label/intro/HDF_NCSA_3_2024.pdf)

156 changes: 156 additions & 0 deletions packages/bundled_models/persistence/examples/nci_py_mp.py
@@ -0,0 +1,156 @@
"""
This example is a WORK IN PROGRESS.

Use multiprocessing to delegate chunks to various processes, in order to compute the persistence
method in an embarrassingly parallel fashion.

PROS:
- Hooks up to PET pipelines easily.
- Good for small-to-medium datasets (< a few gigabytes unpacked).
CONS:
- Not suitable for large-scale operational use.
- Tailored for NCI use only.
- Windows/macOS support not guaranteed.
- Data MUST be stored in a chunked fashion for multiprocessing to have any benefit; otherwise
everything will be loaded into memory anyway, and it's better to let numpy handle everything (set
workers=1 and num_chunks=1). TODO: there likely should be a smart way to detect this.
- Slow.
"""


def _mock_dataset():
"""
A mock dataset used for testing. This is a relatively small array so it may not benefit much
from parallelism. The actual example should be running on real satellite data.

x, y, time, ensembles, levels
(intentional) suboptimal placement with ensembles/levels at the end.
- 500, 500 is a reasonable size for a region
- 24 => typical of hourly data
- 3 ensembles
- 3 levels
"""
# these could also be in main guard, but just being explicit
import numpy as np
import xarray as xr

# --- Uh Oh! ---
# shape_input1 = (500, 500, 24, 99, 168)
# ---
# NOTE: setting the above would give you this impressive warning which is worth understanding:
# ```
# numpy._core._exceptions._ArrayMemoryError: Unable to allocate 744. GiB for an array with
# shape (500, 500, 24, 99, 128) and data type float64
# ```
# The reason this matters: you may _think_ this is a reasonably small dataset, and on
# disk it may indeed be stored as 1 GB, or even as little as 20 MB. The reasons are:
# 1. bit packing
# 2. compression
# 3. np.nan is not the same as "nothing", otherwise the structural integrity of the array will
# collapse. Sparse arrays will need a sparse array paradigm, but that will also make things
# complicated in the backend.
# 4. wait but this is mocking it in memory, my dataset will be chunked!
# ---
shape_input1 = (500, 500, 24, 10, 24)
dimnames1 = ("x1", "y1", "time", "n_ens", "levels")
dimnames2 = ("x2", "y2", "time")

# without loss of generality specify two variables, they can have varying dimensions.
# for arguments sake, change the shape of the second.
shape_input2 = (400, 400, 24)
name_varA = "varA"
name_varB = "varB"

# set a dedicated rng context with a constant seed for reproducibility
rng_context = np.random.default_rng(seed=42)
arr1 = rng_context.random(list(shape_input1))
arr2 = rng_context.random(list(shape_input2))

# make a dataset from the numpy data and dims above; assume the dim names are shared and aligned
# left to right, i.e. either A's dims are contained in B's and/or vice versa, without loss of generality.
ds_mock = xr.Dataset(
{
name_varA: xr.DataArray(arr1, dims=dimnames1),
name_varB: xr.DataArray(arr2, dims=dimnames2),
}
)

return ds_mock


def run_example(ds_input, use_real=True, num_workers=1, num_chunks=1):
# TODO: use library directly under main guard
print("Example: python multiprocessing on nci.")
print("---")
print("NOTE: this example requires appropriate project data accessible")
print(" it currently uses the satellite data (TODO: which nci group?)")

# NOTE: scoped import so that context isn't leaked - being safe here though it is likely okay
# for this to be on the global scope or at the very least main guarded is sufficient.
from persistence import persistence_impl

if use_real:
raise NotImplementedError("mechanism to run real satellite data not yet implemented")
else:
# ---
# some printing logic for display/debugging
print("using mock data... use_real=False")
print("\n--- mocking data ---")
for v, da in ds_input.data_vars.items():
print("...")
for n, s in zip(da.dims, da.shape):
print(f"{v}: dim {n}={s}")
print("---")
# ---

# ---
# TODO:
# There is a flaw here if time index is not always the first index, since there is no
# guarantee that the datasets share the array dimensions - this needs to be rectified.
#
# This can be done by requesting named index for time at the higher level api instead of the
# integer directly. This is only really necessary for datasets and is in fact insufficient
# for numpy.
#
# We still need `idx_time_dim` for `numpy` support, so it'll have to be a mutually
# exclusive argument.
#
# For testing purposes, this is a lower priority since the user can always just stick to
# data arrays and computing each variable separately in a for loop wrapper with minimal loss
# to performance, since the variable count is not likely to be very large.
import time

ts = time.time()
print(f"ts={ts}")
ds_output = persistence_impl.predict(
ds_input,
idx_time_dim=list(ds_input.dims).index("time"),
num_workers=num_workers,
# 20 chunks/2 workers => 10% of the data is loaded at any given time (assuming optimal chunking)
num_chunks=num_chunks,
method="median_of_three",
simple_impute=False,
backend_type="numpy",
)

# ---
te = time.time()
print(f"te={te}")
print(f"total={te - ts}s")
print(f"size={ds_output.sizes}")
print("---")
print(ds_output)


if __name__ == "__main__":
import multiprocessing

# CAUTION: 'forkserver' is unavailable on Windows and may not work on macOS; use num_workers=1 instead
try:
multiprocessing.set_start_method("forkserver")
print("Start method set to 'forkserver'")
except RuntimeError as e:
print(f"Could not set start method: {e}")

ds_input = _mock_dataset()
run_example(ds_input, use_real=False, num_workers=1, num_chunks=1)