From dddc98e305e193916846c41302168e386766295d Mon Sep 17 00:00:00 2001 From: Matteo Bachetti Date: Mon, 25 Aug 2025 11:18:02 +0200 Subject: [PATCH 01/22] Draft parallel implementation --- hendrics/parallel.py | 461 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 461 insertions(+) create mode 100644 hendrics/parallel.py diff --git a/hendrics/parallel.py b/hendrics/parallel.py new file mode 100644 index 00000000..da457338 --- /dev/null +++ b/hendrics/parallel.py @@ -0,0 +1,461 @@ +import os +import subprocess as sp +import time +from functools import partial +from multiprocessing import Pool + +import matplotlib.pyplot as plt +import numpy as np +from mpi4py import MPI +from stingray import AveragedPowerspectrum, EventList +from stingray.fourier import positive_fft_bins +from stingray.gti import time_intervals_from_gtis +from stingray.io import FITSTimeseriesReader +from stingray.loggingconfig import logger +from stingray.utils import histogram + + +def get_data_intervals(interval_idxs, info=None, fname=None, sample_time=None): + """ + Generate light curves for specified time intervals from event data. + Parameters + ---------- + interval_idxs : array-like + Indices specifying which intervals to extract from `info["interval_times"]`. + info : dict, optional + Dictionary containing metadata, must include "interval_times" key with time intervals. + fname : str, optional + Path to the FITS file containing event data. + sample_time : float, optional + The time resolution (bin width) for the light curve histogram. + Yields + ------ + lc : array-like + Histogrammed light curve for each specified interval. + Notes + ----- + - Uses FITSTimeseriesReader to read event data from the FITS file. + - Each yielded light curve corresponds to one interval in `interval_idxs`. + - The number of bins is determined by the interval duration and `sample_time`. + """ + + tsreader = FITSTimeseriesReader(fname, output_class=EventList) + time_intervals = info["interval_times"][interval_idxs] + if np.shape(time_intervals) == (2,): + time_intervals = [time_intervals] + + # This creates a generator of event lists + event_lists = tsreader.filter_at_time_intervals(time_intervals) + + for ev, t_int in zip(event_lists, time_intervals): + nbin = int(np.rint((t_int[1] - t_int[0]) / sample_time)) + lc = histogram(ev.time, bins=nbin, range=(t_int[0], t_int[1])) + yield lc + + +def single_rank_intervals(this_ranks_intervals, sample_time=None, info=None, fname=None): + """ + Generate an averaged powerspectrum from light curve intervals for a single rank. + Parameters + ---------- + this_ranks_intervals : list or array-like + List of intervals (e.g., time ranges) assigned to the current rank. + sample_time : float, optional + The time resolution (bin size) for the light curve data. + info : dict, optional + Dictionary containing metadata about the intervals, including "interval_times". + fname : str or None, optional + Filename or path to the data source, if required by `get_data_intervals`. + Returns + ------- + pds : AveragedPowerspectrum + The averaged powerspectrum computed from the light curve intervals. + nbin : int + Number of bins in each interval, calculated from the interval duration and sample time. + Notes + ----- + This function extracts light curve data for the specified intervals, computes the number of bins, + and returns the averaged powerspectrum using the "leahy" normalization. + """ + + # print(kwargs) + t_int = info["interval_times"][0] + nbin = int(np.rint((t_int[1] - t_int[0]) / sample_time)) + lc_iterable = get_data_intervals( + this_ranks_intervals, info=info, fname=fname, sample_time=sample_time + ) + intv = info["interval_times"][0] + segment_size = intv[1] - intv[0] + pds = AveragedPowerspectrum.from_lc_iterable( + lc_iterable, + segment_size=segment_size, + dt=sample_time, + norm="leahy", + silent=True, + use_common_mean=False, + ) + return pds, nbin + + +def main_none(fname, sample_time, segment_size): + """ + Process a FITS timeseries file and compute an averaged powerspectrum. + This function reads event data from a FITS file, processes it using the Stingray + library, and computes an averaged powerspectrum with Leahy normalization. + + Parameters + ---------- + fname : str + Path to the FITS file containing the timeseries data. + sample_time : float + The time resolution (bin size) to use when processing the data. + segment_size : float + The size of each segment (in seconds) for averaging the powerspectrum. + Returns + ------- + freq : numpy.ndarray + Array of frequency values for the computed powerspectrum. + power : numpy.ndarray + Array of power values corresponding to each frequency. + + Notes + ----- + This function uses the standard Stingray processing pipeline and does not + parallelize the computation. + """ + + logger.info("Using standard Stingray processing") + tsreader = FITSTimeseriesReader(fname, output_class=EventList) + + data = tsreader[:] + pds = AveragedPowerspectrum.from_events( + data, + dt=sample_time, + segment_size=segment_size, + norm="leahy", + use_common_mean=False, + ) + + return pds + + +def main_mpi(fname, sample_time, segment_size): + """ + Perform parallel processing of time series data using MPI. + This function distributes the processing of time series intervals across multiple MPI ranks. + Each rank processes a subset of intervals, computes partial results, and then combines them + using a binary tree reduction algorithm to obtain the final result. + Algorithm: + 1. The root rank (rank 0) loads time series data and computes interval boundaries. + 2. The interval information is broadcasted to all ranks. + 3. Each rank determines its assigned intervals and processes them using `single_rank_intervals`. + 4. All ranks synchronize using MPI barriers. + 5. Partial results are combined using a binary tree reduction, where ranks pair up and + send/receive data until only one rank holds the final result. + 6. The final result is normalized and frequency bins are computed. + Parameters + ---------- + fname : str + Path to the FITS file containing the time series data. + sample_time : float + The sampling time for the time series analysis. + segment_size : float + The size of each segment (interval) to be processed. + Returns + ------- + output : AveragedPowerspectrum + The averaged power spectrum computed across all intervals and processes. + + Notes + ----- + - This function requires an MPI environment and assumes that the necessary MPI communicator + and dependencies are available. + - Only the rank responsible for the final reduction returns the results; other ranks return None. + """ + + tsreader = FITSTimeseriesReader(fname, output_class=EventList) + + def data_lookup(): + # This will also contain the boundaries of the data + # to be loaded + start, stop = time_intervals_from_gtis(tsreader.gti, segment_size) + interval_times = np.array(list(zip(start, stop))) + return { + "gtis": tsreader.gti, + "interval_times": interval_times, + "n_intervals": len(interval_times), + } + + world_comm = MPI.COMM_WORLD + world_size = world_comm.Get_size() + my_rank = world_comm.Get_rank() + + info = None + if my_rank == 0: + logger.debug(f"{my_rank}: Loading data") + info = data_lookup() + + info = world_comm.bcast(info, root=0) + + if my_rank == 0: + logger.debug(f"{my_rank}: Info:", info) + + total_n_intervals = info["n_intervals"] + + intervals_per_rank = total_n_intervals / world_size + all_intervals = np.arange(total_n_intervals, dtype=int) + + this_ranks_intervals = all_intervals[ + (all_intervals >= my_rank * intervals_per_rank) + & (all_intervals < (my_rank + 1) * intervals_per_rank) + ] + logger.debug( + f"{my_rank}: Intervals {this_ranks_intervals[0] + 1} " f"to {this_ranks_intervals[-1] + 1}" + ) + + # data = get_data_intervals(this_ranks_intervals) + result, data_size = single_rank_intervals( + this_ranks_intervals, info=info, fname=fname, sample_time=sample_time + ) + + world_comm.Barrier() + + # NOW, the binary tree reduction + # Perform binary tree reduction + totals = result.power * result.m + previous_processors = np.arange(world_size, dtype=int) + + while 1: + new_processors = previous_processors[::2] + parner_processors = previous_processors[1::2] + + if my_rank == 0: + logger.debug(f"{my_rank}: New processors: {new_processors}") + if my_rank == 0: + logger.debug(f"{my_rank}: Partners: {parner_processors}") + if len(parner_processors) == 0: + if my_rank == 0: + logger.debug(f"{my_rank}: Done.") + break + world_comm.Barrier() + + for i, (sender, receiver) in enumerate(zip(parner_processors, new_processors)): + if my_rank == 0: + logger.debug(f"Loop {i + 1}: {sender}, {receiver}") + tag = 10000 + 100 * sender + receiver + if my_rank == receiver: + data_from_partners = np.zeros(totals.size) + logger.debug(f"{my_rank}: Receiving from {sender} with tag {tag}") + # Might be good to use a non-blocking receive here + world_comm.Recv( + data_from_partners, + source=sender, + tag=tag, + ) + totals += data_from_partners + logger.debug(f"{my_rank}: New data are now {totals}") + elif my_rank == sender: + # Only one partner for now. Might be tweaked differently. The rest should work with + # any number of partners for a given processing rank + + logger.debug(f"{my_rank}: Sending to {receiver} with tag {tag}") + world_comm.Send(totals, dest=receiver, tag=tag) + else: + logger.debug(f"{my_rank}: Doing nothing") + + world_comm.Barrier() + previous_processors = new_processors + + world_comm.Barrier() + + assert len(new_processors) == 1 + if my_rank == new_processors[0]: + logger.debug("Results") + totals /= total_n_intervals + + freq = np.fft.fftfreq(data_size, d=sample_time)[positive_fft_bins(data_size)] + output = AveragedPowerspectrum() + output.freq = freq + output.power = totals + output.m = total_n_intervals + output.gti = info["gtis"] + output.dt = sample_time + + return output + + return None + + +def main_multiprocessing(fname, sample_time, segment_size, world_size=8): + """Run parallel analysis on time series data using Python's multiprocessing. + + This function divides the input time series data into segments and distributes the analysis + across multiple processes. Each process computes results for a subset of intervals, and the + results are aggregated to produce the final output. + Algorithm: + 1. Load time series data and determine Good Time Intervals (GTIs). + 2. Split the data into segments of specified size. + 3. Assign segments to worker processes based on the number of available processes (`world_size`). + 4. Each process analyzes its assigned intervals using `single_rank_intervals`. + 5. Aggregate results from all processes and compute the average power spectrum. + 6. Return the frequency array and the averaged power spectrum. + + Parameters + ---------- + fname : str + Path to the FITS file containing the time series data. + sample_time : float + The time resolution of the data samples. + segment_size : float + The size (in seconds) of each segment to analyze. + world_size : int, optional + Number of parallel worker processes to use (default is 8). + + Returns + ------- + output : AveragedPowerspectrum + The averaged power spectrum computed across all intervals and processes. + + """ + + def data_lookup(): + # This will also contain the boundaries of the data + # to be loaded + tsreader = FITSTimeseriesReader(fname, output_class=EventList) + + start, stop = time_intervals_from_gtis(tsreader.gti, segment_size) + interval_times = np.array(list(zip(start, stop))) + return { + "gtis": tsreader.gti, + "interval_times": interval_times, + "n_intervals": len(interval_times), + } + + info = data_lookup() + data_size = np.rint(segment_size / sample_time).astype(int) + + logger.debug("Info:", info) + + total_n_intervals = info["n_intervals"] + + # intervals_per_rank = total_n_intervals / world_size + all_intervals = np.arange(total_n_intervals, dtype=int) + + intervals_per_rank = total_n_intervals / world_size + this_ranks_intervals = [] + for my_rank in range(world_size): + this_ranks_intervals.append( + all_intervals[ + (all_intervals >= my_rank * intervals_per_rank) + & (all_intervals < (my_rank + 1) * intervals_per_rank) + ] + ) + + p = Pool(world_size) + + totals = 0 + for results, data_size in p.imap_unordered( + partial( + single_rank_intervals, + info=info, + fname=fname, + sample_time=sample_time, + ), + this_ranks_intervals, + ): + totals += results.power * results.m + logger.debug("Results") + totals /= total_n_intervals + + freq = np.fft.fftfreq(data_size, d=sample_time)[positive_fft_bins(data_size)] + + output = AveragedPowerspectrum() + output.freq = freq + output.power = totals + output.m = total_n_intervals + output.gti = info["gtis"] + output.dt = sample_time + + return output + + +def main(args=None): + import argparse + + from hendrics.base import _add_default_args, check_negative_numbers_in_args + + description = ( + "Compute the Leahy-normalized power spectrum of an event list in parallel.\n" + "The MPI version needs to be run with mpiexec, as follows:\n" + "mpiexec -n 10 python HENparfspec filename.fits --method mpi\n" + "To run the algorithm in parallel using multiprocessing, use:\n" + "python HENparfspec filename.fits --method multiprocessing --nproc 10\n" + "To run the algorithm sequentially, for testing purposes, just execute" + "HENparfspec filename.fits\n" + ) + parser = argparse.ArgumentParser(description=description) + parser.add_argument("fname", help="Input FITS file name") + parser.add_argument("-o", "--outfname", help="Output FITS file name", default="out_pds.fits") + parser.add_argument( + "-b", + "--sample_time", + type=float, + default=1 / 8129 / 2, + help="Light curve bin time; if negative, interpreted" + + " as negative power of 2." + + " Default: 2^-13, or keep input lc bin time" + + " (whatever is larger)", + ) + + parser.add_argument( + "-f", + "--segment_size", + type=float, + default=128, + help="Length of FFTs. Default: 16 s", + ) + parser.add_argument( + "--norm", + type=str, + default="leahy", + help="Normalization to use" + " (Accepted: leahy and rms;" + ' Default: "leahy")', + ) + parser.add_argument( + "--method", + type=str, + choices=["mpi", "multiprocessing", "none"], + default="none", + help="Computation distribution method", + ) + parser.add_argument( + "--nproc", + type=int, + default=8, + help="Number of processors to use", + ) + _add_default_args(parser, ["loglevel", "debug"]) + + args = check_negative_numbers_in_args(args) + args = parser.parse_args(args) + + if args.debug: + args.loglevel = "DEBUG" + + logger.setLevel(args.loglevel) + + fname = args.fname + + sample_time = args.sample_time + segment_size = args.segment_size + + if args.method == "mpi": + # This method needs to be run with mpiexec -n python script.py + pds = main_mpi(fname, sample_time, segment_size) + elif args.method == "multiprocessing": + pds = main_multiprocessing(fname, sample_time, segment_size, world_size=args.nproc) + else: + pds = main_none(fname, sample_time, segment_size) + if pds is None: + return + + pds.write(args.outfname) From a2fb03661ec21a2a82771352fe4b7af2474b0b9d Mon Sep 17 00:00:00 2001 From: Matteo Bachetti Date: Mon, 25 Aug 2025 12:16:28 +0200 Subject: [PATCH 02/22] Add script --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 3551a7ff..c87a2b7e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -115,6 +115,7 @@ HENlcurve = "hendrics.lcurve:main" HENmodel = "hendrics.modeling:main_model" HENphaseogram = "hendrics.phaseogram:main_phaseogram" HENphasetag = "hendrics.phasetag:main_phasetag" +HENparfspec = "hendrics.parallel:main" HENplot = "hendrics.plot:main" HENpowercolors = "hendrics.power_colors:main" HENreadevents = "hendrics.read_events:main" From 88a0a382951699873d377b6f94b1e3faf2782215 Mon Sep 17 00:00:00 2001 From: Matteo Bachetti Date: Mon, 25 Aug 2025 12:16:40 +0200 Subject: [PATCH 03/22] Add tests --- hendrics/tests/test_parallel.py | 83 +++++++++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 hendrics/tests/test_parallel.py diff --git a/hendrics/tests/test_parallel.py b/hendrics/tests/test_parallel.py new file mode 100644 index 00000000..a9a7d23c --- /dev/null +++ b/hendrics/tests/test_parallel.py @@ -0,0 +1,83 @@ +import subprocess as sp +import tempfile +import numpy as np +from stingray import EventList, AveragedPowerspectrum +from hendrics.fake import main as main_fake +from hendrics.parallel import main as main_parallel + + +class TestParallel: + def setup_class(cls): + cls.tempfile = tempfile.NamedTemporaryFile(suffix=".fits", delete=False) + cls.fname = cls.tempfile.name + print(cls.fname) + + main_fake(["-o", cls.fname, "-c", "10", "--tstart", "0", "--tstop", "10000"]) + cls.events = EventList.read(cls.fname, fmt="ogip") + cls.pds = AveragedPowerspectrum.from_events( + cls.events, dt=0.1, segment_size=10.0, use_common_mean=False, norm="leahy" + ) + + def test_none_version(self): + out_file = tempfile.NamedTemporaryFile(suffix=".hdf5", delete=False) + main_parallel( + [ + self.fname, + "-o", + out_file.name, + "-b", + "0.1", + "-f", + "10.0", + "--method", + "none", + ] + ) + pds = AveragedPowerspectrum.read(out_file.name) + assert pds.m == self.pds.m + assert np.allclose(pds.freq, self.pds.freq) + assert np.allclose(pds.power, self.pds.power, rtol=1e-6) + + def test_multiprocessing_version(self): + out_file = tempfile.NamedTemporaryFile(suffix=".hdf5", delete=False) + main_parallel( + [ + self.fname, + "-o", + out_file.name, + "-b", + "0.1", + "-f", + "10.0", + "--method", + "multiprocessing", + ] + ) + pds = AveragedPowerspectrum.read(out_file.name) + assert pds.m == self.pds.m + assert np.allclose(pds.freq, self.pds.freq) + assert np.allclose(pds.power, self.pds.power, rtol=1e-4) + + def test_mpi_version(self): + out_file = tempfile.NamedTemporaryFile(suffix=".hdf5", delete=False) + sp.check_call( + [ + "mpirun", + "-n", + "4", + "HENparfspec", + self.fname, + "-o", + out_file.name, + "-b", + "0.1", + "-f", + "10.0", + "--method", + "mpi", + ] + ) + pds = AveragedPowerspectrum.read(out_file.name) + assert pds.m == self.pds.m + assert np.allclose(pds.freq, self.pds.freq) + assert np.allclose(pds.power, self.pds.power, rtol=1e-4) From 41a748796ce8b13ea8bfabd078f2b11d4d85b9ba Mon Sep 17 00:00:00 2001 From: Matteo Bachetti Date: Wed, 18 Mar 2026 14:48:53 +0100 Subject: [PATCH 04/22] Complete creation of PDSs. Test all attributes --- hendrics/parallel.py | 58 +++++++++++++------- hendrics/tests/test_parallel.py | 97 +++++++++++++-------------------- 2 files changed, 74 insertions(+), 81 deletions(-) diff --git a/hendrics/parallel.py b/hendrics/parallel.py index da457338..2bd9447d 100644 --- a/hendrics/parallel.py +++ b/hendrics/parallel.py @@ -1,10 +1,6 @@ -import os -import subprocess as sp -import time from functools import partial from multiprocessing import Pool -import matplotlib.pyplot as plt import numpy as np from mpi4py import MPI from stingray import AveragedPowerspectrum, EventList @@ -16,8 +12,8 @@ def get_data_intervals(interval_idxs, info=None, fname=None, sample_time=None): - """ - Generate light curves for specified time intervals from event data. + """Generate light curves for specified time intervals from event data. + Parameters ---------- interval_idxs : array-like @@ -28,17 +24,18 @@ def get_data_intervals(interval_idxs, info=None, fname=None, sample_time=None): Path to the FITS file containing event data. sample_time : float, optional The time resolution (bin width) for the light curve histogram. + Yields ------ lc : array-like Histogrammed light curve for each specified interval. + Notes ----- - Uses FITSTimeseriesReader to read event data from the FITS file. - Each yielded light curve corresponds to one interval in `interval_idxs`. - The number of bins is determined by the interval duration and `sample_time`. """ - tsreader = FITSTimeseriesReader(fname, output_class=EventList) time_intervals = info["interval_times"][interval_idxs] if np.shape(time_intervals) == (2,): @@ -54,8 +51,8 @@ def get_data_intervals(interval_idxs, info=None, fname=None, sample_time=None): def single_rank_intervals(this_ranks_intervals, sample_time=None, info=None, fname=None): - """ - Generate an averaged powerspectrum from light curve intervals for a single rank. + """Generate an averaged powerspectrum from light curve intervals for a single rank. + Parameters ---------- this_ranks_intervals : list or array-like @@ -66,18 +63,19 @@ def single_rank_intervals(this_ranks_intervals, sample_time=None, info=None, fna Dictionary containing metadata about the intervals, including "interval_times". fname : str or None, optional Filename or path to the data source, if required by `get_data_intervals`. + Returns ------- pds : AveragedPowerspectrum The averaged powerspectrum computed from the light curve intervals. nbin : int Number of bins in each interval, calculated from the interval duration and sample time. + Notes ----- This function extracts light curve data for the specified intervals, computes the number of bins, and returns the averaged powerspectrum using the "leahy" normalization. """ - # print(kwargs) t_int = info["interval_times"][0] nbin = int(np.rint((t_int[1] - t_int[0]) / sample_time)) @@ -98,10 +96,7 @@ def single_rank_intervals(this_ranks_intervals, sample_time=None, info=None, fna def main_none(fname, sample_time, segment_size): - """ - Process a FITS timeseries file and compute an averaged powerspectrum. - This function reads event data from a FITS file, processes it using the Stingray - library, and computes an averaged powerspectrum with Leahy normalization. + """Process a FITS timeseries file and compute an averaged powerspectrum. This function reads event data from a FITS file, processes it using the Stingray library, and computes an averaged powerspectrum with Leahy normalization. Parameters ---------- @@ -111,6 +106,7 @@ def main_none(fname, sample_time, segment_size): The time resolution (bin size) to use when processing the data. segment_size : float The size of each segment (in seconds) for averaging the powerspectrum. + Returns ------- freq : numpy.ndarray @@ -123,7 +119,6 @@ def main_none(fname, sample_time, segment_size): This function uses the standard Stingray processing pipeline and does not parallelize the computation. """ - logger.info("Using standard Stingray processing") tsreader = FITSTimeseriesReader(fname, output_class=EventList) @@ -153,6 +148,7 @@ def main_mpi(fname, sample_time, segment_size): 5. Partial results are combined using a binary tree reduction, where ranks pair up and send/receive data until only one rank holds the final result. 6. The final result is normalized and frequency bins are computed. + Parameters ---------- fname : str @@ -161,6 +157,7 @@ def main_mpi(fname, sample_time, segment_size): The sampling time for the time series analysis. segment_size : float The size of each segment (interval) to be processed. + Returns ------- output : AveragedPowerspectrum @@ -172,7 +169,6 @@ def main_mpi(fname, sample_time, segment_size): and dependencies are available. - Only the rank responsible for the final reduction returns the results; other ranks return None. """ - tsreader = FITSTimeseriesReader(fname, output_class=EventList) def data_lookup(): @@ -184,6 +180,7 @@ def data_lookup(): "gtis": tsreader.gti, "interval_times": interval_times, "n_intervals": len(interval_times), + "nphots": tsreader.nphot / tsreader.exposure * segment_size, } world_comm = MPI.COMM_WORLD @@ -280,6 +277,12 @@ def data_lookup(): output.m = total_n_intervals output.gti = info["gtis"] output.dt = sample_time + output.power_err = totals / np.sqrt(total_n_intervals) + output.unnorm_power = totals / 2 * info["nphots"] + output.unnorm_power_err = output.unnorm_power / np.sqrt(total_n_intervals) + output.norm = "leahy" + output.nphots = info["nphots"] + output.n = np.rint(segment_size / sample_time).astype(int) return output @@ -315,7 +318,6 @@ def main_multiprocessing(fname, sample_time, segment_size, world_size=8): ------- output : AveragedPowerspectrum The averaged power spectrum computed across all intervals and processes. - """ def data_lookup(): @@ -354,6 +356,7 @@ def data_lookup(): p = Pool(world_size) totals = 0 + nphots = 0 for results, data_size in p.imap_unordered( partial( single_rank_intervals, @@ -364,8 +367,10 @@ def data_lookup(): this_ranks_intervals, ): totals += results.power * results.m + nphots += results.nphots * results.m logger.debug("Results") totals /= total_n_intervals + nphots /= total_n_intervals freq = np.fft.fftfreq(data_size, d=sample_time)[positive_fft_bins(data_size)] @@ -375,6 +380,12 @@ def data_lookup(): output.m = total_n_intervals output.gti = info["gtis"] output.dt = sample_time + output.power_err = totals / np.sqrt(total_n_intervals) + output.unnorm_power = totals / 2 * nphots + output.unnorm_power_err = output.unnorm_power / np.sqrt(total_n_intervals) + output.norm = "leahy" + output.nphots = nphots + output.n = np.rint(segment_size / sample_time).astype(int) return output @@ -401,10 +412,12 @@ def main(args=None): "--sample_time", type=float, default=1 / 8129 / 2, - help="Light curve bin time; if negative, interpreted" - + " as negative power of 2." - + " Default: 2^-13, or keep input lc bin time" - + " (whatever is larger)", + help=( + "Light curve bin time; if negative, interpreted" + " as negative power of 2." + " Default: 2^-13, or keep input lc bin time" + " (whatever is larger)" + ), ) parser.add_argument( @@ -455,7 +468,10 @@ def main(args=None): pds = main_multiprocessing(fname, sample_time, segment_size, world_size=args.nproc) else: pds = main_none(fname, sample_time, segment_size) + if pds is None: return + if args.norm != "leahy": + pds = pds.to_norm(args.norm) pds.write(args.outfname) diff --git a/hendrics/tests/test_parallel.py b/hendrics/tests/test_parallel.py index a9a7d23c..7ad0506c 100644 --- a/hendrics/tests/test_parallel.py +++ b/hendrics/tests/test_parallel.py @@ -1,7 +1,10 @@ import subprocess as sp import tempfile + import numpy as np -from stingray import EventList, AveragedPowerspectrum +import pytest +from stingray import AveragedPowerspectrum, EventList + from hendrics.fake import main as main_fake from hendrics.parallel import main as main_parallel @@ -18,66 +21,40 @@ def setup_class(cls): cls.events, dt=0.1, segment_size=10.0, use_common_mean=False, norm="leahy" ) - def test_none_version(self): + @pytest.mark.parametrize("method", ["none", "multiprocessing", "mpi"]) + @pytest.mark.parametrize("norm", ["leahy", "frac", "abs"]) + def test_parallel_versions(self, method, norm): out_file = tempfile.NamedTemporaryFile(suffix=".hdf5", delete=False) - main_parallel( - [ - self.fname, - "-o", - out_file.name, - "-b", - "0.1", - "-f", - "10.0", - "--method", - "none", - ] - ) - pds = AveragedPowerspectrum.read(out_file.name) - assert pds.m == self.pds.m - assert np.allclose(pds.freq, self.pds.freq) - assert np.allclose(pds.power, self.pds.power, rtol=1e-6) + command = [ + self.fname, + "-o", + out_file.name, + "-b", + "0.1", + "-f", + "10.0", + "--method", + method, + "--norm", + norm, + ] + if method == "mpi": + command = ["mpirun", "-n", "4", "HENparfspec"] + command + sp.check_call(command) + else: + main_parallel(command) - def test_multiprocessing_version(self): - out_file = tempfile.NamedTemporaryFile(suffix=".hdf5", delete=False) - main_parallel( - [ - self.fname, - "-o", - out_file.name, - "-b", - "0.1", - "-f", - "10.0", - "--method", - "multiprocessing", - ] - ) pds = AveragedPowerspectrum.read(out_file.name) - assert pds.m == self.pds.m - assert np.allclose(pds.freq, self.pds.freq) - assert np.allclose(pds.power, self.pds.power, rtol=1e-4) + compare_pds = self.pds.to_norm(norm) if norm != "leahy" else self.pds + assert pds.m == compare_pds.m + assert np.allclose(pds.freq, compare_pds.freq) + if norm == "leahy": + power_rtol = 1e-6 + else: + power_rtol = 1e-2 - def test_mpi_version(self): - out_file = tempfile.NamedTemporaryFile(suffix=".hdf5", delete=False) - sp.check_call( - [ - "mpirun", - "-n", - "4", - "HENparfspec", - self.fname, - "-o", - out_file.name, - "-b", - "0.1", - "-f", - "10.0", - "--method", - "mpi", - ] - ) - pds = AveragedPowerspectrum.read(out_file.name) - assert pds.m == self.pds.m - assert np.allclose(pds.freq, self.pds.freq) - assert np.allclose(pds.power, self.pds.power, rtol=1e-4) + assert np.allclose(pds.power, compare_pds.power, rtol=power_rtol) + assert np.allclose(pds.power_err, compare_pds.power_err, rtol=1e-2) + assert pds.norm == compare_pds.norm + assert np.allclose(pds.unnorm_power, compare_pds.unnorm_power, rtol=1e-2) + assert np.isclose(pds.nphots, compare_pds.nphots, rtol=1e-2) From 424813ba6f2cd2826e7b0a1de6b33f7038f96041 Mon Sep 17 00:00:00 2001 From: Matteo Bachetti Date: Wed, 18 Mar 2026 14:59:03 +0100 Subject: [PATCH 05/22] Only test MPI if installed --- hendrics/parallel.py | 3 ++- hendrics/tests/test_parallel.py | 9 ++++++++- pyproject.toml | 2 ++ 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/hendrics/parallel.py b/hendrics/parallel.py index 2bd9447d..3cb1ae13 100644 --- a/hendrics/parallel.py +++ b/hendrics/parallel.py @@ -2,7 +2,6 @@ from multiprocessing import Pool import numpy as np -from mpi4py import MPI from stingray import AveragedPowerspectrum, EventList from stingray.fourier import positive_fft_bins from stingray.gti import time_intervals_from_gtis @@ -169,6 +168,8 @@ def main_mpi(fname, sample_time, segment_size): and dependencies are available. - Only the rank responsible for the final reduction returns the results; other ranks return None. """ + from mpi4py import MPI + tsreader = FITSTimeseriesReader(fname, output_class=EventList) def data_lookup(): diff --git a/hendrics/tests/test_parallel.py b/hendrics/tests/test_parallel.py index 7ad0506c..3cd7157d 100644 --- a/hendrics/tests/test_parallel.py +++ b/hendrics/tests/test_parallel.py @@ -1,3 +1,4 @@ +import importlib import subprocess as sp import tempfile @@ -8,6 +9,12 @@ from hendrics.fake import main as main_fake from hendrics.parallel import main as main_parallel +HAS_MPI = importlib.util.find_spec("mpi4py") is not None + +test_cases = ["none", "multiprocessing"] +if HAS_MPI: + test_cases.append("mpi") + class TestParallel: def setup_class(cls): @@ -21,7 +28,7 @@ def setup_class(cls): cls.events, dt=0.1, segment_size=10.0, use_common_mean=False, norm="leahy" ) - @pytest.mark.parametrize("method", ["none", "multiprocessing", "mpi"]) + @pytest.mark.parametrize("method", test_cases) @pytest.mark.parametrize("norm", ["leahy", "frac", "abs"]) def test_parallel_versions(self, method, norm): out_file = tempfile.NamedTemporaryFile(suffix=".hdf5", delete=False) diff --git a/pyproject.toml b/pyproject.toml index c87a2b7e..ae5a9b0a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -58,6 +58,7 @@ test_all = [ "numba", "netcdf4==1.7.0", "scikit-image", + "mpi4py", ] recommended = [ "numba", @@ -72,6 +73,7 @@ all = [ "imageio", "netcdf4==1.7.0", "scikit-image", + "mpi4py", ] docs = [ "tomli>=1.1.0; python_version < '3.11'", From c919206adf979bb7e5d922994d2f6af310479938 Mon Sep 17 00:00:00 2001 From: Matteo Bachetti Date: Wed, 18 Mar 2026 15:06:36 +0100 Subject: [PATCH 06/22] Only test h5py if installed --- hendrics/tests/test_parallel.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/hendrics/tests/test_parallel.py b/hendrics/tests/test_parallel.py index 3cd7157d..ef201a1c 100644 --- a/hendrics/tests/test_parallel.py +++ b/hendrics/tests/test_parallel.py @@ -10,12 +10,14 @@ from hendrics.parallel import main as main_parallel HAS_MPI = importlib.util.find_spec("mpi4py") is not None +HAS_HDF5 = importlib.util.find_spec("h5py") is not None test_cases = ["none", "multiprocessing"] if HAS_MPI: test_cases.append("mpi") +@pytest.mark.skipif(not HAS_HDF5, reason="h5py is required for this test") class TestParallel: def setup_class(cls): cls.tempfile = tempfile.NamedTemporaryFile(suffix=".fits", delete=False) From ca1c9bc5708594801af45aebd3b4ed0040ea6c1c Mon Sep 17 00:00:00 2001 From: Matteo Bachetti Date: Thu, 19 Mar 2026 08:46:58 +0100 Subject: [PATCH 07/22] Fix seed when simulating data --- hendrics/tests/test_parallel.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hendrics/tests/test_parallel.py b/hendrics/tests/test_parallel.py index ef201a1c..ff7560a7 100644 --- a/hendrics/tests/test_parallel.py +++ b/hendrics/tests/test_parallel.py @@ -24,7 +24,9 @@ def setup_class(cls): cls.fname = cls.tempfile.name print(cls.fname) - main_fake(["-o", cls.fname, "-c", "10", "--tstart", "0", "--tstop", "10000"]) + main_fake( + ["-o", cls.fname, "-c", "10", "--tstart", "0", "--tstop", "10000", "--seed", "42"] + ) cls.events = EventList.read(cls.fname, fmt="ogip") cls.pds = AveragedPowerspectrum.from_events( cls.events, dt=0.1, segment_size=10.0, use_common_mean=False, norm="leahy" From 35fd23c9a888f5494422e5650a2898bcb0cab041 Mon Sep 17 00:00:00 2001 From: Matteo Bachetti Date: Thu, 19 Mar 2026 13:46:09 +0100 Subject: [PATCH 08/22] Add changelog [docs only] --- docs/changes/180.feature.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 docs/changes/180.feature.rst diff --git a/docs/changes/180.feature.rst b/docs/changes/180.feature.rst new file mode 100644 index 00000000..fcddf6ae --- /dev/null +++ b/docs/changes/180.feature.rst @@ -0,0 +1 @@ +HENfspec can now use a new parallel implementation using MPI or multiprocessing for the power spectrum From 55137bdab9482ae30c1313ffa5212e86dcd32ff5 Mon Sep 17 00:00:00 2001 From: Matteo Bachetti Date: Thu, 19 Mar 2026 13:56:06 +0100 Subject: [PATCH 09/22] Add action to install MPI --- .github/workflows/ci_test.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.github/workflows/ci_test.yml b/.github/workflows/ci_test.yml index 2bc94500..9a29e1eb 100644 --- a/.github/workflows/ci_test.yml +++ b/.github/workflows/ci_test.yml @@ -152,6 +152,11 @@ jobs: python -c "import pip; print(f'pip {pip.__version__}')" python -c "import setuptools; print(f'setuptools {setuptools.__version__}')" python -c "import tox; print(f'tox {tox.__version__}')" + - name: Setup MPI + if: "contains(matrix.tox_env, 'alldeps')" + uses: mpi4py/setup-mpi@v1 + with: + mpi: "openmpi" - name: Run tests if: "! matrix.use_remote_data" run: tox -e ${{ matrix.tox_env }} From b8b163fafa9df37ad496ccbb22a152345faa0e89 Mon Sep 17 00:00:00 2001 From: Matteo Bachetti Date: Thu, 19 Mar 2026 14:39:51 +0100 Subject: [PATCH 10/22] Make tests with MPI only --- .github/workflows/ci_test.yml | 33 ++++++++++++++++++--------------- hendrics/tests/test_parallel.py | 6 +----- pyproject.toml | 2 -- tox.ini | 13 +++++++++++-- 4 files changed, 30 insertions(+), 24 deletions(-) diff --git a/.github/workflows/ci_test.yml b/.github/workflows/ci_test.yml index 9a29e1eb..abea4e40 100644 --- a/.github/workflows/ci_test.yml +++ b/.github/workflows/ci_test.yml @@ -95,30 +95,33 @@ jobs: matrix: include: - os: ubuntu-latest - python: '3.10' + python: '3.13' tox_env: 'codestyle' - os: ubuntu-latest - python: '3.12' - tox_env: 'py312-test-cov' + python: '3.13' + tox_env: 'py313-test-cov' + - os: ubuntu-latest + python: '3.13' + tox_env: 'py313-test-mpi-cov' - os: ubuntu-latest python: '3.9' tox_env: 'py39-test-cov' - os: ubuntu-latest - python: '3.11' - tox_env: 'py311-test-alldeps-cov' + python: '3.13' + tox_env: 'py313-test-alldeps-cov' use_remote_data: true - os: macos-14 - python: '3.11' - tox_env: 'py311-test-alldeps-cov' + python: '3.13' + tox_env: 'py313-test-alldeps-cov' use_remote_data: true - os: ubuntu-latest - python: '3.12' - tox_env: 'py312-test-devdeps' + python: '3.14' + tox_env: 'py314-test-devdeps' use_remote_data: true continue-on-error: true - os: ubuntu-latest - python: '3.12' - tox_env: 'py312-test-devpint' + python: '3.13' + tox_env: 'py313-test-devpint' use_remote_data: true continue-on-error: true - os: ubuntu-latest @@ -126,11 +129,11 @@ jobs: tox_env: 'py39-test-oldestdeps-cov' use_remote_data: true - os: macos-latest - python: '3.11' - tox_env: 'py311-test' + python: '3.13' + tox_env: 'py313-test' - os: windows-latest - python: '3.11' - tox_env: 'py311-test' + python: '3.13' + tox_env: 'py313-test' steps: - name: Check out repository diff --git a/hendrics/tests/test_parallel.py b/hendrics/tests/test_parallel.py index ff7560a7..c892c54a 100644 --- a/hendrics/tests/test_parallel.py +++ b/hendrics/tests/test_parallel.py @@ -49,11 +49,7 @@ def test_parallel_versions(self, method, norm): "--norm", norm, ] - if method == "mpi": - command = ["mpirun", "-n", "4", "HENparfspec"] + command - sp.check_call(command) - else: - main_parallel(command) + main_parallel(command) pds = AveragedPowerspectrum.read(out_file.name) compare_pds = self.pds.to_norm(norm) if norm != "leahy" else self.pds diff --git a/pyproject.toml b/pyproject.toml index ae5a9b0a..c87a2b7e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -58,7 +58,6 @@ test_all = [ "numba", "netcdf4==1.7.0", "scikit-image", - "mpi4py", ] recommended = [ "numba", @@ -73,7 +72,6 @@ all = [ "imageio", "netcdf4==1.7.0", "scikit-image", - "mpi4py", ] docs = [ "tomli>=1.1.0; python_version < '3.11'", diff --git a/tox.ini b/tox.ini index df65fa1c..a66b101d 100644 --- a/tox.ini +++ b/tox.ini @@ -36,6 +36,7 @@ description = cov: and test coverage astropy4: with astropy 4 astropy5: with astropy 5 + mpi: with MPI support setenv = devdeps: PIP_EXTRA_INDEX_URL = https://pypi.anaconda.org/scientific-python-nightly-wheels/simple https://pypi.anaconda.org/astropy/simple https://pypi.anaconda.org/liberfa/simple @@ -58,6 +59,11 @@ deps = devpint: git+https://github.com/nanograv/pint.git#egg=pint-pulsar devdeps: git+https://github.com/stingraysoftware/stingray.git#egg=stingray alldeps: netcdf4==1.7.0 + mpi: mpi4py + + +allowlist_externals = + mpi: mpirun # The following indicates which extras_require will be installed extras = @@ -67,10 +73,13 @@ extras = commands = # Force numpy reinstall to work around upper version limits dependencies put on numpy devdeps: pip install -U --pre --extra-index-url https://pypi.anaconda.org/scientific-python-nightly-wheels/simple numpy + mpi: pip install mpi4py h5py pip freeze - !cov: pytest --pyargs hendrics {toxinidir}/docs {posargs} - cov: pytest --pyargs hendrics {toxinidir}/docs --cov hendrics --cov-config={toxinidir}/pyproject.toml {posargs} + mpi: mpirun -n 1 pytest --pyargs hendrics -k mpi {posargs} --cov hendrics --cov-config={toxinidir}/pyproject.toml {posargs} + + !mpi-!cov: pytest --pyargs hendrics {toxinidir}/docs {posargs} + !mpi-cov: pytest --pyargs hendrics {toxinidir}/docs --cov hendrics --cov-config={toxinidir}/pyproject.toml {posargs} cov: coverage xml -o {toxinidir}/coverage.xml [testenv:build_docs] From 3463d22f20a0e2adde1cb7e5c50e2d28f24805ca Mon Sep 17 00:00:00 2001 From: Matteo Bachetti Date: Thu, 19 Mar 2026 14:43:15 +0100 Subject: [PATCH 11/22] Fix config --- .github/workflows/ci_test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci_test.yml b/.github/workflows/ci_test.yml index abea4e40..a9db0cf0 100644 --- a/.github/workflows/ci_test.yml +++ b/.github/workflows/ci_test.yml @@ -156,7 +156,7 @@ jobs: python -c "import setuptools; print(f'setuptools {setuptools.__version__}')" python -c "import tox; print(f'tox {tox.__version__}')" - name: Setup MPI - if: "contains(matrix.tox_env, 'alldeps')" + if: "contains(matrix.tox_env, 'mpi')" uses: mpi4py/setup-mpi@v1 with: mpi: "openmpi" From fdb55da74d8f99433be9713ce8ff80c404e5ad89 Mon Sep 17 00:00:00 2001 From: Matteo Bachetti Date: Thu, 19 Mar 2026 14:46:12 +0100 Subject: [PATCH 12/22] Fix netcdf4 version --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index a66b101d..0b091add 100644 --- a/tox.ini +++ b/tox.ini @@ -58,7 +58,7 @@ deps = devdeps: pyerfa>=0.0.dev devpint: git+https://github.com/nanograv/pint.git#egg=pint-pulsar devdeps: git+https://github.com/stingraysoftware/stingray.git#egg=stingray - alldeps: netcdf4==1.7.0 + ; alldeps: netcdf4==1.7.0 mpi: mpi4py From dcc7bc8cda6a2cea2b97455b540cc826f3002a2b Mon Sep 17 00:00:00 2001 From: Matteo Bachetti Date: Thu, 19 Mar 2026 14:48:24 +0100 Subject: [PATCH 13/22] Fix netcdf4 version --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index c87a2b7e..9942a7b3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -56,7 +56,7 @@ test_all = [ "pint-pulsar", "imageio", "numba", - "netcdf4==1.7.0", + "netcdf4", "scikit-image", ] recommended = [ @@ -70,7 +70,7 @@ all = [ "pandas", "pint-pulsar", "imageio", - "netcdf4==1.7.0", + "netcdf4", "scikit-image", ] docs = [ From 998d8a8d8fd3a7463e53949e1a88816fbcd973b9 Mon Sep 17 00:00:00 2001 From: Matteo Bachetti Date: Thu, 19 Mar 2026 21:43:26 +0100 Subject: [PATCH 14/22] Fail well if file is unsorted --- hendrics/parallel.py | 28 ++++++++++---- hendrics/tests/data/monol_testA_sort.evt | 22 +++++++++++ hendrics/tests/test_parallel.py | 48 ++++++++++++++++++++++++ 3 files changed, 91 insertions(+), 7 deletions(-) create mode 100644 hendrics/tests/data/monol_testA_sort.evt diff --git a/hendrics/parallel.py b/hendrics/parallel.py index 3cb1ae13..49e2888f 100644 --- a/hendrics/parallel.py +++ b/hendrics/parallel.py @@ -462,13 +462,27 @@ def main(args=None): sample_time = args.sample_time segment_size = args.segment_size - if args.method == "mpi": - # This method needs to be run with mpiexec -n python script.py - pds = main_mpi(fname, sample_time, segment_size) - elif args.method == "multiprocessing": - pds = main_multiprocessing(fname, sample_time, segment_size, world_size=args.nproc) - else: - pds = main_none(fname, sample_time, segment_size) + unsorted_error = False + try: + if args.method == "mpi": + # This method needs to be run with mpiexec -n python script.py + pds = main_mpi(fname, sample_time, segment_size) + elif args.method == "multiprocessing": + pds = main_multiprocessing(fname, sample_time, segment_size, world_size=args.nproc) + else: + pds = main_none(fname, sample_time, segment_size) + except AssertionError as e: + if "Start:" in str(e): + unsorted_error = True + except TypeError as e: + if "'>=' not supported between instances" in str(e): + unsorted_error = True + + if unsorted_error: + logger.error( + "The input file is probably not sorted. Please sort it (e.g. with ftsort) and try again." + ) + return if pds is None: return diff --git a/hendrics/tests/data/monol_testA_sort.evt b/hendrics/tests/data/monol_testA_sort.evt new file mode 100644 index 00000000..69e62cec --- /dev/null +++ b/hendrics/tests/data/monol_testA_sort.evt @@ -0,0 +1,22 @@ +SIMPLE = T / conforms to FITS standard BITPIX = 8 / array data type NAXIS = 0 / number of array dimensions EXTEND = T OBSERVER= 'Edwige Bubble' TELESCOP= 'NuSTAR ' / Telescope (mission) name INSTRUME= 'FPMA ' / Instrument name CHECKSUM= 'Kc4GMZ1FKa1FKY1F' / HDU checksum updated 2021-05-23T20:25:05 DATASUM = '0 ' / data unit checksum updated 2021-05-23T20:25:05 END XTENSION= 'BINTABLE' / binary table extension BITPIX = 8 / array data type NAXIS = 2 / number of array dimensions NAXIS1 = 12 / length of dimension 1 NAXIS2 = 1000 / length of dimension 2 PCOUNT = 0 / number of group parameters GCOUNT = 1 / number of groups TFIELDS = 2 / number of table fields TTYPE1 = 'TIME ' TFORM1 = '1D ' TTYPE2 = 'PI ' TFORM2 = '1J ' EXTNAME = 'EVENTS ' / extension name OBSERVER= 'Edwige Bubble' TELESCOP= 'NuSTAR ' / Telescope (mission) name INSTRUME= 'FPMA ' / Instrument name OBS_ID = '00000000001' / Observation ID TARG_ID = 0 / Target ID OBJECT = 'Fake X-1' / Name of observed object RA_OBJ = 0.0 / [deg] R.A. Object DEC_OBJ = 0.0 / [deg] Dec Object RA_NOM = 0.0 / Right Ascension used for barycenter correctionsDEC_NOM = 0.0 / Declination used for barycenter corrections RA_PNT = 0.0 / [deg] RA pointing DEC_PNT = 0.0 / [deg] Dec pointing PA_PNT = 0.0 / [deg] Position angle (roll) EQUINOX = 2000.0 / Equinox of celestial coord system RADECSYS= 'FK5 ' / Coordinate Reference System TASSIGN = 'SATELLITE' / Time assigned by onboard clock TIMESYS = 'TDB ' / All times in this file are TDB MJDREFI = 55197 / TDB time reference; Modified Julian Day (int) MJDREFF = 0.00076601852 / TDB time reference; Modified Julian Day (frac) TIMEREF = 'SOLARSYSTEM' / Times are pathlength-corrected to barycenter CLOCKAPP= F / TRUE if timestamps corrected by gnd sware TIMEUNIT= 's ' / unit for time keywords TSTART = 80000000.0 / Elapsed seconds since MJDREF at start of file TSTOP = 80001025.0 / Elapsed seconds since MJDREF at end of file LIVETIME= 1025.0 / On-source time PLEPHEM = 'JPL-DE200' TIMEZERO= 0.0 / Time Zero CHECKSUM= 'SLa3ULY2SLa2SLY2' / HDU checksum updated 2021-05-23T20:25:05 DATASUM = '3443983656' / data unit checksum updated 2021-05-23T20:25:05 COMMENT FITS (Flexible Image Transport System) format is defined in 'Astronomy aCOMMENT nd Astrophysics', volume 376, page 359; bibcode: 2001A&A...376..359H COMMENT MJDREFI+MJDREFF = epoch of Jan 1, 2010, in TT time system. HISTORY File modified by user 'meo' with fv on 2015-08-17T14:10:02 HISTORY File modified by user 'meo' with fv on 2015-08-17T14:48:52 HISTORY File modified by user 'meo' with fv on 2021-05-05T10:33:13 END A:A02A#k6A (aA AQV4A_VtAA >kA A&!A)sBA+" A/~R|A7A8d)A9 +1A=QsAHU;AK[A5AO AS +aOATAX1A^JA^xw2AcnAgAh BAl[GQArc;A|CO A|ljAЂUAЃ;7AЄ}AЇg7oAЋr/AИFAЙH,AЩ@QAЮĐF +Aж@BA!AӭA )Av>AA;3A#(ZAoGA9A(8PgA ]KAg(dAA#}A%XA'B7A+A0,DA2zLBA9|82A=t8A@|AAJ8ANzAQ#fAQ]AR AVteAX@ +CA\IA\oRLA]Aa"Ab$AddAhv_Ak]Ak6S=AnApsAsA{8FA|2AтDxAуNVAъUAы"@Aь Aѐ@Aѓi% Aћ ] Aќ:PAѠ,0AѭUp@AѭAѮDAѵ»AѶ AѽAøA.ACAt=lAҁGtXA'8AAHAڒAޑ&AHA{yfA\A4(%A]{!A.#XAUA3z0A5yAAXwAЁyAv!A8A 8e?AԸ0A]'A1MAA!DaA!A!jA"c&G8A#AtA$r,A'`OA0aA6WA7nA94AAA͗AIp&{DAIAAKYqAOsm +APFAS+7AXAYAZ + AyAzCg"AҀ~AҊ5AAҎAҐ'eAґ AҔ'}Aҙy.Aҡ\\AҦWmTAҦx-%Aҫ8TAұAұAҸn"AҸ|Aһ5AһZlAq$AwAکAQ0AӪAoX/|Aظ(6HAC8A~A޻e/A?YAFe[AM;baAOGB AQAT!L{AW5υAY%AZ A[(zjA\bOA]IbSA]Ad8X9Af;AfVAnl':ArAsJAvbQVAŰAԁ3lAԃaVRAԅ""vAԆAԋ$AԋAԌmAԌ팒AԕlAԡ0-cABOj}ALAN,AP(AQr_AR QAcҭxAh(AkzApdAp%As=A{y.A};AւAք|>A։vA֑+ A֒"A֓8&nA֓tA֓mA֘r+WA֛1J A֡A֣C8RA֥ A֧x A֪^#A֬ZA֭qmCAֲ-/Aֳ]AֵASAʘA%AAKAOw'A5k+A+A5AA'BAdAAEaEAAXpA>A +/A +<A6`AB'ATAjArMAIA!+A(}4A,!6&A-<CA/A@@AAUfAI].,ALwAO|kAS'AWe?AW}" A`SAAa=MAd|Af+.ZAp!AsJ A׃Aׄ=]KAׅ}FAׇ59AׇT)fA׋-A׏nAבI`LAד9+AכfAל6XAן^.:A׭A׮A׵ogyA׹FNA׼A׽אEAAnYAӖAٽZA]A;Ay`AhA=A诈APLeJAټSNA0AҠAxApACHAAa3YAAMkAR"tUA"8A"/PxA$ A%A&GmA,;8A43A?FAL<APl8?ATAY0AY]z#A[VVA[1 A[A_AerAiHwApP.Apt%A؊ތ2A؋\sDA؍:&Aؖ!%Aؚ@~AؚhAآGAاOAحbAػKҋAػAؽ!gA[yAķAK3AʷRABA(gBArOAA$ A2GAFA~AUA +AW +A.@AATWH*A +kcQA gA -AtSvA)vArA("iA71/A=ܗ|A? AD\8AH+!dAK?]AMpqyAO*ASRw=A[xA^ZAc,cAhn/Aj5|Am}|Az +ADAAE +5sAEUAJpSAP2 ARXAWQ]k*AWXzAY AZOA\A\ +iDAah;Ab5,AdqAhWT6AzƽA|8Aڅb6Aڋ/AڐRAJAڒƌAړ7AڙAڡAڪ +AڱdahAڵAںAڻF`ABAPAÇ&AĢwA FJA2AOtÁ*[VAҨpA}AIm#Aߑ(Aj=hAtAEAA|NAAvAP[A QAA̩A=y+ATA2A%??A'A'+ A-Vֆ A.7A3AuA3ݴA4GA5 Y@A6牱LA8OA<AE^AEt"SAG)E9AHoDAIAN67 AN7AXJqMAa@AaŰAawAc >OAkjMAA &^A (@A@\+AmAAANAEs'A]yAA!l)A#A&M{A&"A( A,OA3{)A5y>VA7??JA9Vz6A:pcA=,$AFÈj7AHY AJ)xARAR[YA^+XAbAi\AtWAy"v AzOdAހ:˖Aޖ7Aޗ Aޘh`Aޟ?.@AޠaAǬRA*ޓAmߣA? 3AQAARJ!'AVqɣAVA]AdneAdTAkwApZAAsAxc7A{*CA|@A7:A߄2A߉oAߊKxcAߍ:Aߍ/Aߏ͸AߑӰAߒ;IAߔ鱰Aߕ)AߗAߛ@ZAߝAߞ_;AߠaxAߥAߪ-PA߬xA߮BRA߷+A߾x+Ab4AF +A²AΟ0AU-Ab_A AJAY|AśXTENSION= 'BINTABLE' / binary table extension BITPIX = 8 / array data type NAXIS = 2 / number of array dimensions NAXIS1 = 16 / length of dimension 1 NAXIS2 = 1 / length of dimension 2 PCOUNT = 0 / number of group parameters GCOUNT = 1 / number of groups TFIELDS = 2 / number of table fields TTYPE1 = 'START ' TFORM1 = '1D ' TTYPE2 = 'STOP ' TFORM2 = '1D ' EXTNAME = 'GTI ' / extension name CHECKSUM= 'Mah9PUh6Mah6MUh6' / HDU checksum updated 2021-05-23T20:25:05 DATASUM = '2267424176' / data unit checksum updated 2021-05-23T20:25:05 END AA \ No newline at end of file diff --git a/hendrics/tests/test_parallel.py b/hendrics/tests/test_parallel.py index c892c54a..00b5e45a 100644 --- a/hendrics/tests/test_parallel.py +++ b/hendrics/tests/test_parallel.py @@ -20,6 +20,13 @@ @pytest.mark.skipif(not HAS_HDF5, reason="h5py is required for this test") class TestParallel: def setup_class(cls): + import os + + curdir = os.path.abspath(os.path.dirname(__file__)) + cls.datadir = os.path.join(curdir, "data") + cls.fname_unsorted = os.path.join(cls.datadir, "monol_testA.evt") + cls.fname_sorted = os.path.join(cls.datadir, "monol_testA_sort.evt") + cls.tempfile = tempfile.NamedTemporaryFile(suffix=".fits", delete=False) cls.fname = cls.tempfile.name print(cls.fname) @@ -32,6 +39,24 @@ def setup_class(cls): cls.events, dt=0.1, segment_size=10.0, use_common_mean=False, norm="leahy" ) + @pytest.mark.parametrize("method", test_cases) + def test_parallel_versions_on_small_file(self, method): + out_file = tempfile.NamedTemporaryFile(suffix=".hdf5", delete=False) + command = [ + self.fname_sorted, + "-o", + out_file.name, + "-b", + "0.1", + "-f", + "10.0", + "--method", + method, + "--norm", + "leahy", + ] + main_parallel(command) + @pytest.mark.parametrize("method", test_cases) @pytest.mark.parametrize("norm", ["leahy", "frac", "abs"]) def test_parallel_versions(self, method, norm): @@ -65,3 +90,26 @@ def test_parallel_versions(self, method, norm): assert pds.norm == compare_pds.norm assert np.allclose(pds.unnorm_power, compare_pds.unnorm_power, rtol=1e-2) assert np.isclose(pds.nphots, compare_pds.nphots, rtol=1e-2) + + @pytest.mark.parametrize("method", ["mpi", "multiprocessing"]) + def test_parallel_versions_fail_unsorted(self, method, caplog): + command = [ + self.fname_unsorted, + "-b", + "0.1", + "-f", + "10.0", + "--method", + method, + "--norm", + "leahy", + ] + main_parallel(command) + for record in caplog.records: + if ( + record.levelname == "ERROR" + and "The input file is probably not sorted." in record.message + ): + break + else: + raise AssertionError("Expected error message not found in logs") From e48875b04ea60c9d5af817ab198f42cbff614e33 Mon Sep 17 00:00:00 2001 From: Matteo Bachetti Date: Fri, 20 Mar 2026 08:42:15 +0100 Subject: [PATCH 15/22] Only test the appropriate combinations --- hendrics/parallel.py | 4 ++-- hendrics/tests/test_parallel.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/hendrics/parallel.py b/hendrics/parallel.py index 49e2888f..929a0499 100644 --- a/hendrics/parallel.py +++ b/hendrics/parallel.py @@ -471,10 +471,10 @@ def main(args=None): pds = main_multiprocessing(fname, sample_time, segment_size, world_size=args.nproc) else: pds = main_none(fname, sample_time, segment_size) - except AssertionError as e: + except AssertionError as e: # pragma: no cover if "Start:" in str(e): unsorted_error = True - except TypeError as e: + except TypeError as e: # pragma: no cover if "'>=' not supported between instances" in str(e): unsorted_error = True diff --git a/hendrics/tests/test_parallel.py b/hendrics/tests/test_parallel.py index 00b5e45a..41ccdb18 100644 --- a/hendrics/tests/test_parallel.py +++ b/hendrics/tests/test_parallel.py @@ -91,7 +91,7 @@ def test_parallel_versions(self, method, norm): assert np.allclose(pds.unnorm_power, compare_pds.unnorm_power, rtol=1e-2) assert np.isclose(pds.nphots, compare_pds.nphots, rtol=1e-2) - @pytest.mark.parametrize("method", ["mpi", "multiprocessing"]) + @pytest.mark.parametrize("method", test_cases[1:]) # Skip "none" method for this test def test_parallel_versions_fail_unsorted(self, method, caplog): command = [ self.fname_unsorted, From fccf1ceda5930e72850b93e75005de54250bddc2 Mon Sep 17 00:00:00 2001 From: Matteo Bachetti Date: Fri, 20 Mar 2026 09:00:35 +0100 Subject: [PATCH 16/22] Additional unsorted error --- hendrics/parallel.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/hendrics/parallel.py b/hendrics/parallel.py index 929a0499..cf3764a8 100644 --- a/hendrics/parallel.py +++ b/hendrics/parallel.py @@ -477,6 +477,9 @@ def main(args=None): except TypeError as e: # pragma: no cover if "'>=' not supported between instances" in str(e): unsorted_error = True + except AttributeError as e: # pragma: no cover + if "AttributeError: 'NoneType' object has no attribute 'dtype'" in str(e): + unsorted_error = True if unsorted_error: logger.error( From ced98bd9de2136b9dcdb83245e80f19ea46e8228 Mon Sep 17 00:00:00 2001 From: Matteo Bachetti Date: Fri, 20 Mar 2026 09:33:56 +0100 Subject: [PATCH 17/22] Use two cpus in mpi test --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index 0b091add..91a06b9b 100644 --- a/tox.ini +++ b/tox.ini @@ -76,7 +76,7 @@ commands = mpi: pip install mpi4py h5py pip freeze - mpi: mpirun -n 1 pytest --pyargs hendrics -k mpi {posargs} --cov hendrics --cov-config={toxinidir}/pyproject.toml {posargs} + mpi: mpirun -n 2 pytest --pyargs hendrics -k mpi {posargs} --cov hendrics --cov-config={toxinidir}/pyproject.toml {posargs} !mpi-!cov: pytest --pyargs hendrics {toxinidir}/docs {posargs} !mpi-cov: pytest --pyargs hendrics {toxinidir}/docs --cov hendrics --cov-config={toxinidir}/pyproject.toml {posargs} From 59102098439993fd7ba8549a3e56047da92f02d6 Mon Sep 17 00:00:00 2001 From: Matteo Bachetti Date: Fri, 20 Mar 2026 11:53:59 +0100 Subject: [PATCH 18/22] Pay attention to rank when testing MPI version --- hendrics/tests/test_parallel.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/hendrics/tests/test_parallel.py b/hendrics/tests/test_parallel.py index 41ccdb18..6c7e18ba 100644 --- a/hendrics/tests/test_parallel.py +++ b/hendrics/tests/test_parallel.py @@ -1,4 +1,5 @@ import importlib +import os import subprocess as sp import tempfile @@ -20,8 +21,6 @@ @pytest.mark.skipif(not HAS_HDF5, reason="h5py is required for this test") class TestParallel: def setup_class(cls): - import os - curdir = os.path.abspath(os.path.dirname(__file__)) cls.datadir = os.path.join(curdir, "data") cls.fname_unsorted = os.path.join(cls.datadir, "monol_testA.evt") @@ -75,6 +74,13 @@ def test_parallel_versions(self, method, norm): norm, ] main_parallel(command) + if method == "mpi": + from mpi4py import MPI + + world_comm = MPI.COMM_WORLD + my_rank = world_comm.Get_rank() + if my_rank != 0: + return pds = AveragedPowerspectrum.read(out_file.name) compare_pds = self.pds.to_norm(norm) if norm != "leahy" else self.pds @@ -84,7 +90,6 @@ def test_parallel_versions(self, method, norm): power_rtol = 1e-6 else: power_rtol = 1e-2 - assert np.allclose(pds.power, compare_pds.power, rtol=power_rtol) assert np.allclose(pds.power_err, compare_pds.power_err, rtol=1e-2) assert pds.norm == compare_pds.norm @@ -105,6 +110,14 @@ def test_parallel_versions_fail_unsorted(self, method, caplog): "leahy", ] main_parallel(command) + if method == "mpi": + from mpi4py import MPI + + world_comm = MPI.COMM_WORLD + my_rank = world_comm.Get_rank() + if my_rank != 0: + return + for record in caplog.records: if ( record.levelname == "ERROR" From 0c7ad0fbfe1c7e2e74ea91a89a8c2a51ee76640b Mon Sep 17 00:00:00 2001 From: Matteo Bachetti Date: Fri, 20 Mar 2026 12:03:47 +0100 Subject: [PATCH 19/22] Make the tests rank-safe --- hendrics/tests/test_parallel.py | 1 + tox.ini | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/hendrics/tests/test_parallel.py b/hendrics/tests/test_parallel.py index 6c7e18ba..22d064af 100644 --- a/hendrics/tests/test_parallel.py +++ b/hendrics/tests/test_parallel.py @@ -115,6 +115,7 @@ def test_parallel_versions_fail_unsorted(self, method, caplog): world_comm = MPI.COMM_WORLD my_rank = world_comm.Get_rank() + if my_rank != 0: return diff --git a/tox.ini b/tox.ini index 91a06b9b..390a5242 100644 --- a/tox.ini +++ b/tox.ini @@ -73,10 +73,10 @@ extras = commands = # Force numpy reinstall to work around upper version limits dependencies put on numpy devdeps: pip install -U --pre --extra-index-url https://pypi.anaconda.org/scientific-python-nightly-wheels/simple numpy - mpi: pip install mpi4py h5py + mpi: pip install mpi4py h5py pytest-mpi pip freeze - mpi: mpirun -n 2 pytest --pyargs hendrics -k mpi {posargs} --cov hendrics --cov-config={toxinidir}/pyproject.toml {posargs} + mpi: mpirun -n 2 pytest --pyargs hendrics -k mpi {posargs} --cov hendrics --with-mpi --cov-config={toxinidir}/pyproject.toml {posargs} !mpi-!cov: pytest --pyargs hendrics {toxinidir}/docs {posargs} !mpi-cov: pytest --pyargs hendrics {toxinidir}/docs --cov hendrics --cov-config={toxinidir}/pyproject.toml {posargs} From acb96e283e4bdd1b82df0823d3d243cee5d86c51 Mon Sep 17 00:00:00 2001 From: Matteo Bachetti Date: Fri, 20 Mar 2026 12:13:58 +0100 Subject: [PATCH 20/22] Raise unless there is a known message --- hendrics/parallel.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/hendrics/parallel.py b/hendrics/parallel.py index cf3764a8..eb63008c 100644 --- a/hendrics/parallel.py +++ b/hendrics/parallel.py @@ -474,12 +474,18 @@ def main(args=None): except AssertionError as e: # pragma: no cover if "Start:" in str(e): unsorted_error = True + else: + raise except TypeError as e: # pragma: no cover if "'>=' not supported between instances" in str(e): unsorted_error = True + else: + raise except AttributeError as e: # pragma: no cover if "AttributeError: 'NoneType' object has no attribute 'dtype'" in str(e): unsorted_error = True + else: + raise if unsorted_error: logger.error( From 62e10ca81cf21637b5ed4c4df9293828c777cf90 Mon Sep 17 00:00:00 2001 From: Matteo Bachetti Date: Fri, 20 Mar 2026 12:40:31 +0100 Subject: [PATCH 21/22] Fix errors --- hendrics/parallel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hendrics/parallel.py b/hendrics/parallel.py index eb63008c..fc3dab71 100644 --- a/hendrics/parallel.py +++ b/hendrics/parallel.py @@ -482,7 +482,7 @@ def main(args=None): else: raise except AttributeError as e: # pragma: no cover - if "AttributeError: 'NoneType' object has no attribute 'dtype'" in str(e): + if "'NoneType' object has no attribute 'dtype'" in str(e): unsorted_error = True else: raise From 5373f5c55b5fae9003631c4b667bd3bfbd1d5094 Mon Sep 17 00:00:00 2001 From: Matteo Bachetti Date: Fri, 20 Mar 2026 16:49:03 +0100 Subject: [PATCH 22/22] Make coverage work in parallel --- hendrics/tests/test_parallel.py | 6 ++---- pyproject.toml | 2 ++ tox.ini | 10 ++++++++-- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/hendrics/tests/test_parallel.py b/hendrics/tests/test_parallel.py index 22d064af..667c1e6f 100644 --- a/hendrics/tests/test_parallel.py +++ b/hendrics/tests/test_parallel.py @@ -15,6 +15,8 @@ test_cases = ["none", "multiprocessing"] if HAS_MPI: + from mpi4py import MPI + test_cases.append("mpi") @@ -75,8 +77,6 @@ def test_parallel_versions(self, method, norm): ] main_parallel(command) if method == "mpi": - from mpi4py import MPI - world_comm = MPI.COMM_WORLD my_rank = world_comm.Get_rank() if my_rank != 0: @@ -111,8 +111,6 @@ def test_parallel_versions_fail_unsorted(self, method, caplog): ] main_parallel(command) if method == "mpi": - from mpi4py import MPI - world_comm = MPI.COMM_WORLD my_rank = world_comm.Get_rank() diff --git a/pyproject.toml b/pyproject.toml index 9942a7b3..b0338e09 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -256,6 +256,7 @@ filterwarnings = [ [tool.coverage] [tool.coverage.run] + source = ["hendrics"] omit = [ "hendrics/_astropy_init*", "hendrics/conftest.py", @@ -275,6 +276,7 @@ filterwarnings = [ "*/hendrics/version*", ] + parallel="true" [tool.coverage.report] exclude_lines = [ diff --git a/tox.ini b/tox.ini index 390a5242..a5c90662 100644 --- a/tox.ini +++ b/tox.ini @@ -60,10 +60,13 @@ deps = devdeps: git+https://github.com/stingraysoftware/stingray.git#egg=stingray ; alldeps: netcdf4==1.7.0 mpi: mpi4py + mpi: h5py + mpi: pytest-mpi allowlist_externals = mpi: mpirun + mpi: mpiexec # The following indicates which extras_require will be installed extras = @@ -76,11 +79,14 @@ commands = mpi: pip install mpi4py h5py pytest-mpi pip freeze - mpi: mpirun -n 2 pytest --pyargs hendrics -k mpi {posargs} --cov hendrics --with-mpi --cov-config={toxinidir}/pyproject.toml {posargs} + mpi-cov: mpiexec -n 2 coverage run --rcfile={toxinidir}/pyproject.toml -m pytest --pyargs hendrics -k mpi --with-mpi {posargs} + mpi-!cov: mpirun -n 2 pytest --pyargs hendrics -k mpi --with-mpi {posargs} !mpi-!cov: pytest --pyargs hendrics {toxinidir}/docs {posargs} !mpi-cov: pytest --pyargs hendrics {toxinidir}/docs --cov hendrics --cov-config={toxinidir}/pyproject.toml {posargs} - cov: coverage xml -o {toxinidir}/coverage.xml + mpi-cov: coverage combine --rcfile={toxinidir}/pyproject.toml + mpi-cov: coverage report --rcfile={toxinidir}/pyproject.toml + cov: coverage xml -o {toxinidir}/coverage.xml --rcfile={toxinidir}/pyproject.toml [testenv:build_docs] changedir = docs