diff --git a/.github/workflows/ci_test.yml b/.github/workflows/ci_test.yml index 2bc94500..a9db0cf0 100644 --- a/.github/workflows/ci_test.yml +++ b/.github/workflows/ci_test.yml @@ -95,30 +95,33 @@ jobs: matrix: include: - os: ubuntu-latest - python: '3.10' + python: '3.13' tox_env: 'codestyle' - os: ubuntu-latest - python: '3.12' - tox_env: 'py312-test-cov' + python: '3.13' + tox_env: 'py313-test-cov' + - os: ubuntu-latest + python: '3.13' + tox_env: 'py313-test-mpi-cov' - os: ubuntu-latest python: '3.9' tox_env: 'py39-test-cov' - os: ubuntu-latest - python: '3.11' - tox_env: 'py311-test-alldeps-cov' + python: '3.13' + tox_env: 'py313-test-alldeps-cov' use_remote_data: true - os: macos-14 - python: '3.11' - tox_env: 'py311-test-alldeps-cov' + python: '3.13' + tox_env: 'py313-test-alldeps-cov' use_remote_data: true - os: ubuntu-latest - python: '3.12' - tox_env: 'py312-test-devdeps' + python: '3.14' + tox_env: 'py314-test-devdeps' use_remote_data: true continue-on-error: true - os: ubuntu-latest - python: '3.12' - tox_env: 'py312-test-devpint' + python: '3.13' + tox_env: 'py313-test-devpint' use_remote_data: true continue-on-error: true - os: ubuntu-latest @@ -126,11 +129,11 @@ jobs: tox_env: 'py39-test-oldestdeps-cov' use_remote_data: true - os: macos-latest - python: '3.11' - tox_env: 'py311-test' + python: '3.13' + tox_env: 'py313-test' - os: windows-latest - python: '3.11' - tox_env: 'py311-test' + python: '3.13' + tox_env: 'py313-test' steps: - name: Check out repository @@ -152,6 +155,11 @@ jobs: python -c "import pip; print(f'pip {pip.__version__}')" python -c "import setuptools; print(f'setuptools {setuptools.__version__}')" python -c "import tox; print(f'tox {tox.__version__}')" + - name: Setup MPI + if: "contains(matrix.tox_env, 'mpi')" + uses: mpi4py/setup-mpi@v1 + with: + mpi: "openmpi" - name: Run tests if: "! matrix.use_remote_data" run: tox -e ${{ matrix.tox_env }} diff --git a/docs/changes/180.feature.rst b/docs/changes/180.feature.rst new file mode 100644 index 00000000..fcddf6ae --- /dev/null +++ b/docs/changes/180.feature.rst @@ -0,0 +1 @@ +HENfspec can now use a new parallel implementation using MPI or multiprocessing for the power spectrum diff --git a/hendrics/parallel.py b/hendrics/parallel.py new file mode 100644 index 00000000..fc3dab71 --- /dev/null +++ b/hendrics/parallel.py @@ -0,0 +1,501 @@ +from functools import partial +from multiprocessing import Pool + +import numpy as np +from stingray import AveragedPowerspectrum, EventList +from stingray.fourier import positive_fft_bins +from stingray.gti import time_intervals_from_gtis +from stingray.io import FITSTimeseriesReader +from stingray.loggingconfig import logger +from stingray.utils import histogram + + +def get_data_intervals(interval_idxs, info=None, fname=None, sample_time=None): + """Generate light curves for specified time intervals from event data. + + Parameters + ---------- + interval_idxs : array-like + Indices specifying which intervals to extract from `info["interval_times"]`. + info : dict, optional + Dictionary containing metadata, must include "interval_times" key with time intervals. + fname : str, optional + Path to the FITS file containing event data. + sample_time : float, optional + The time resolution (bin width) for the light curve histogram. + + Yields + ------ + lc : array-like + Histogrammed light curve for each specified interval. + + Notes + ----- + - Uses FITSTimeseriesReader to read event data from the FITS file. + - Each yielded light curve corresponds to one interval in `interval_idxs`. + - The number of bins is determined by the interval duration and `sample_time`. + """ + tsreader = FITSTimeseriesReader(fname, output_class=EventList) + time_intervals = info["interval_times"][interval_idxs] + if np.shape(time_intervals) == (2,): + time_intervals = [time_intervals] + + # This creates a generator of event lists + event_lists = tsreader.filter_at_time_intervals(time_intervals) + + for ev, t_int in zip(event_lists, time_intervals): + nbin = int(np.rint((t_int[1] - t_int[0]) / sample_time)) + lc = histogram(ev.time, bins=nbin, range=(t_int[0], t_int[1])) + yield lc + + +def single_rank_intervals(this_ranks_intervals, sample_time=None, info=None, fname=None): + """Generate an averaged powerspectrum from light curve intervals for a single rank. + + Parameters + ---------- + this_ranks_intervals : list or array-like + List of intervals (e.g., time ranges) assigned to the current rank. + sample_time : float, optional + The time resolution (bin size) for the light curve data. + info : dict, optional + Dictionary containing metadata about the intervals, including "interval_times". + fname : str or None, optional + Filename or path to the data source, if required by `get_data_intervals`. + + Returns + ------- + pds : AveragedPowerspectrum + The averaged powerspectrum computed from the light curve intervals. + nbin : int + Number of bins in each interval, calculated from the interval duration and sample time. + + Notes + ----- + This function extracts light curve data for the specified intervals, computes the number of bins, + and returns the averaged powerspectrum using the "leahy" normalization. + """ + # print(kwargs) + t_int = info["interval_times"][0] + nbin = int(np.rint((t_int[1] - t_int[0]) / sample_time)) + lc_iterable = get_data_intervals( + this_ranks_intervals, info=info, fname=fname, sample_time=sample_time + ) + intv = info["interval_times"][0] + segment_size = intv[1] - intv[0] + pds = AveragedPowerspectrum.from_lc_iterable( + lc_iterable, + segment_size=segment_size, + dt=sample_time, + norm="leahy", + silent=True, + use_common_mean=False, + ) + return pds, nbin + + +def main_none(fname, sample_time, segment_size): + """Process a FITS timeseries file and compute an averaged powerspectrum. This function reads event data from a FITS file, processes it using the Stingray library, and computes an averaged powerspectrum with Leahy normalization. + + Parameters + ---------- + fname : str + Path to the FITS file containing the timeseries data. + sample_time : float + The time resolution (bin size) to use when processing the data. + segment_size : float + The size of each segment (in seconds) for averaging the powerspectrum. + + Returns + ------- + freq : numpy.ndarray + Array of frequency values for the computed powerspectrum. + power : numpy.ndarray + Array of power values corresponding to each frequency. + + Notes + ----- + This function uses the standard Stingray processing pipeline and does not + parallelize the computation. + """ + logger.info("Using standard Stingray processing") + tsreader = FITSTimeseriesReader(fname, output_class=EventList) + + data = tsreader[:] + pds = AveragedPowerspectrum.from_events( + data, + dt=sample_time, + segment_size=segment_size, + norm="leahy", + use_common_mean=False, + ) + + return pds + + +def main_mpi(fname, sample_time, segment_size): + """ + Perform parallel processing of time series data using MPI. + This function distributes the processing of time series intervals across multiple MPI ranks. + Each rank processes a subset of intervals, computes partial results, and then combines them + using a binary tree reduction algorithm to obtain the final result. + Algorithm: + 1. The root rank (rank 0) loads time series data and computes interval boundaries. + 2. The interval information is broadcasted to all ranks. + 3. Each rank determines its assigned intervals and processes them using `single_rank_intervals`. + 4. All ranks synchronize using MPI barriers. + 5. Partial results are combined using a binary tree reduction, where ranks pair up and + send/receive data until only one rank holds the final result. + 6. The final result is normalized and frequency bins are computed. + + Parameters + ---------- + fname : str + Path to the FITS file containing the time series data. + sample_time : float + The sampling time for the time series analysis. + segment_size : float + The size of each segment (interval) to be processed. + + Returns + ------- + output : AveragedPowerspectrum + The averaged power spectrum computed across all intervals and processes. + + Notes + ----- + - This function requires an MPI environment and assumes that the necessary MPI communicator + and dependencies are available. + - Only the rank responsible for the final reduction returns the results; other ranks return None. + """ + from mpi4py import MPI + + tsreader = FITSTimeseriesReader(fname, output_class=EventList) + + def data_lookup(): + # This will also contain the boundaries of the data + # to be loaded + start, stop = time_intervals_from_gtis(tsreader.gti, segment_size) + interval_times = np.array(list(zip(start, stop))) + return { + "gtis": tsreader.gti, + "interval_times": interval_times, + "n_intervals": len(interval_times), + "nphots": tsreader.nphot / tsreader.exposure * segment_size, + } + + world_comm = MPI.COMM_WORLD + world_size = world_comm.Get_size() + my_rank = world_comm.Get_rank() + + info = None + if my_rank == 0: + logger.debug(f"{my_rank}: Loading data") + info = data_lookup() + + info = world_comm.bcast(info, root=0) + + if my_rank == 0: + logger.debug(f"{my_rank}: Info:", info) + + total_n_intervals = info["n_intervals"] + + intervals_per_rank = total_n_intervals / world_size + all_intervals = np.arange(total_n_intervals, dtype=int) + + this_ranks_intervals = all_intervals[ + (all_intervals >= my_rank * intervals_per_rank) + & (all_intervals < (my_rank + 1) * intervals_per_rank) + ] + logger.debug( + f"{my_rank}: Intervals {this_ranks_intervals[0] + 1} " f"to {this_ranks_intervals[-1] + 1}" + ) + + # data = get_data_intervals(this_ranks_intervals) + result, data_size = single_rank_intervals( + this_ranks_intervals, info=info, fname=fname, sample_time=sample_time + ) + + world_comm.Barrier() + + # NOW, the binary tree reduction + # Perform binary tree reduction + totals = result.power * result.m + previous_processors = np.arange(world_size, dtype=int) + + while 1: + new_processors = previous_processors[::2] + parner_processors = previous_processors[1::2] + + if my_rank == 0: + logger.debug(f"{my_rank}: New processors: {new_processors}") + if my_rank == 0: + logger.debug(f"{my_rank}: Partners: {parner_processors}") + if len(parner_processors) == 0: + if my_rank == 0: + logger.debug(f"{my_rank}: Done.") + break + world_comm.Barrier() + + for i, (sender, receiver) in enumerate(zip(parner_processors, new_processors)): + if my_rank == 0: + logger.debug(f"Loop {i + 1}: {sender}, {receiver}") + tag = 10000 + 100 * sender + receiver + if my_rank == receiver: + data_from_partners = np.zeros(totals.size) + logger.debug(f"{my_rank}: Receiving from {sender} with tag {tag}") + # Might be good to use a non-blocking receive here + world_comm.Recv( + data_from_partners, + source=sender, + tag=tag, + ) + totals += data_from_partners + logger.debug(f"{my_rank}: New data are now {totals}") + elif my_rank == sender: + # Only one partner for now. Might be tweaked differently. The rest should work with + # any number of partners for a given processing rank + + logger.debug(f"{my_rank}: Sending to {receiver} with tag {tag}") + world_comm.Send(totals, dest=receiver, tag=tag) + else: + logger.debug(f"{my_rank}: Doing nothing") + + world_comm.Barrier() + previous_processors = new_processors + + world_comm.Barrier() + + assert len(new_processors) == 1 + if my_rank == new_processors[0]: + logger.debug("Results") + totals /= total_n_intervals + + freq = np.fft.fftfreq(data_size, d=sample_time)[positive_fft_bins(data_size)] + output = AveragedPowerspectrum() + output.freq = freq + output.power = totals + output.m = total_n_intervals + output.gti = info["gtis"] + output.dt = sample_time + output.power_err = totals / np.sqrt(total_n_intervals) + output.unnorm_power = totals / 2 * info["nphots"] + output.unnorm_power_err = output.unnorm_power / np.sqrt(total_n_intervals) + output.norm = "leahy" + output.nphots = info["nphots"] + output.n = np.rint(segment_size / sample_time).astype(int) + + return output + + return None + + +def main_multiprocessing(fname, sample_time, segment_size, world_size=8): + """Run parallel analysis on time series data using Python's multiprocessing. + + This function divides the input time series data into segments and distributes the analysis + across multiple processes. Each process computes results for a subset of intervals, and the + results are aggregated to produce the final output. + Algorithm: + 1. Load time series data and determine Good Time Intervals (GTIs). + 2. Split the data into segments of specified size. + 3. Assign segments to worker processes based on the number of available processes (`world_size`). + 4. Each process analyzes its assigned intervals using `single_rank_intervals`. + 5. Aggregate results from all processes and compute the average power spectrum. + 6. Return the frequency array and the averaged power spectrum. + + Parameters + ---------- + fname : str + Path to the FITS file containing the time series data. + sample_time : float + The time resolution of the data samples. + segment_size : float + The size (in seconds) of each segment to analyze. + world_size : int, optional + Number of parallel worker processes to use (default is 8). + + Returns + ------- + output : AveragedPowerspectrum + The averaged power spectrum computed across all intervals and processes. + """ + + def data_lookup(): + # This will also contain the boundaries of the data + # to be loaded + tsreader = FITSTimeseriesReader(fname, output_class=EventList) + + start, stop = time_intervals_from_gtis(tsreader.gti, segment_size) + interval_times = np.array(list(zip(start, stop))) + return { + "gtis": tsreader.gti, + "interval_times": interval_times, + "n_intervals": len(interval_times), + } + + info = data_lookup() + data_size = np.rint(segment_size / sample_time).astype(int) + + logger.debug("Info:", info) + + total_n_intervals = info["n_intervals"] + + # intervals_per_rank = total_n_intervals / world_size + all_intervals = np.arange(total_n_intervals, dtype=int) + + intervals_per_rank = total_n_intervals / world_size + this_ranks_intervals = [] + for my_rank in range(world_size): + this_ranks_intervals.append( + all_intervals[ + (all_intervals >= my_rank * intervals_per_rank) + & (all_intervals < (my_rank + 1) * intervals_per_rank) + ] + ) + + p = Pool(world_size) + + totals = 0 + nphots = 0 + for results, data_size in p.imap_unordered( + partial( + single_rank_intervals, + info=info, + fname=fname, + sample_time=sample_time, + ), + this_ranks_intervals, + ): + totals += results.power * results.m + nphots += results.nphots * results.m + logger.debug("Results") + totals /= total_n_intervals + nphots /= total_n_intervals + + freq = np.fft.fftfreq(data_size, d=sample_time)[positive_fft_bins(data_size)] + + output = AveragedPowerspectrum() + output.freq = freq + output.power = totals + output.m = total_n_intervals + output.gti = info["gtis"] + output.dt = sample_time + output.power_err = totals / np.sqrt(total_n_intervals) + output.unnorm_power = totals / 2 * nphots + output.unnorm_power_err = output.unnorm_power / np.sqrt(total_n_intervals) + output.norm = "leahy" + output.nphots = nphots + output.n = np.rint(segment_size / sample_time).astype(int) + + return output + + +def main(args=None): + import argparse + + from hendrics.base import _add_default_args, check_negative_numbers_in_args + + description = ( + "Compute the Leahy-normalized power spectrum of an event list in parallel.\n" + "The MPI version needs to be run with mpiexec, as follows:\n" + "mpiexec -n 10 python HENparfspec filename.fits --method mpi\n" + "To run the algorithm in parallel using multiprocessing, use:\n" + "python HENparfspec filename.fits --method multiprocessing --nproc 10\n" + "To run the algorithm sequentially, for testing purposes, just execute" + "HENparfspec filename.fits\n" + ) + parser = argparse.ArgumentParser(description=description) + parser.add_argument("fname", help="Input FITS file name") + parser.add_argument("-o", "--outfname", help="Output FITS file name", default="out_pds.fits") + parser.add_argument( + "-b", + "--sample_time", + type=float, + default=1 / 8129 / 2, + help=( + "Light curve bin time; if negative, interpreted" + " as negative power of 2." + " Default: 2^-13, or keep input lc bin time" + " (whatever is larger)" + ), + ) + + parser.add_argument( + "-f", + "--segment_size", + type=float, + default=128, + help="Length of FFTs. Default: 16 s", + ) + parser.add_argument( + "--norm", + type=str, + default="leahy", + help="Normalization to use" + " (Accepted: leahy and rms;" + ' Default: "leahy")', + ) + parser.add_argument( + "--method", + type=str, + choices=["mpi", "multiprocessing", "none"], + default="none", + help="Computation distribution method", + ) + parser.add_argument( + "--nproc", + type=int, + default=8, + help="Number of processors to use", + ) + _add_default_args(parser, ["loglevel", "debug"]) + + args = check_negative_numbers_in_args(args) + args = parser.parse_args(args) + + if args.debug: + args.loglevel = "DEBUG" + + logger.setLevel(args.loglevel) + + fname = args.fname + + sample_time = args.sample_time + segment_size = args.segment_size + + unsorted_error = False + try: + if args.method == "mpi": + # This method needs to be run with mpiexec -n python script.py + pds = main_mpi(fname, sample_time, segment_size) + elif args.method == "multiprocessing": + pds = main_multiprocessing(fname, sample_time, segment_size, world_size=args.nproc) + else: + pds = main_none(fname, sample_time, segment_size) + except AssertionError as e: # pragma: no cover + if "Start:" in str(e): + unsorted_error = True + else: + raise + except TypeError as e: # pragma: no cover + if "'>=' not supported between instances" in str(e): + unsorted_error = True + else: + raise + except AttributeError as e: # pragma: no cover + if "'NoneType' object has no attribute 'dtype'" in str(e): + unsorted_error = True + else: + raise + + if unsorted_error: + logger.error( + "The input file is probably not sorted. Please sort it (e.g. with ftsort) and try again." + ) + return + + if pds is None: + return + if args.norm != "leahy": + pds = pds.to_norm(args.norm) + + pds.write(args.outfname) diff --git a/hendrics/tests/data/monol_testA_sort.evt b/hendrics/tests/data/monol_testA_sort.evt new file mode 100644 index 00000000..69e62cec --- /dev/null +++ b/hendrics/tests/data/monol_testA_sort.evt @@ -0,0 +1,22 @@ +SIMPLE = T / conforms to FITS standard BITPIX = 8 / array data type NAXIS = 0 / number of array dimensions EXTEND = T OBSERVER= 'Edwige Bubble' TELESCOP= 'NuSTAR ' / Telescope (mission) name INSTRUME= 'FPMA ' / Instrument name CHECKSUM= 'Kc4GMZ1FKa1FKY1F' / HDU checksum updated 2021-05-23T20:25:05 DATASUM = '0 ' / data unit checksum updated 2021-05-23T20:25:05 END XTENSION= 'BINTABLE' / binary table extension BITPIX = 8 / array data type NAXIS = 2 / number of array dimensions NAXIS1 = 12 / length of dimension 1 NAXIS2 = 1000 / length of dimension 2 PCOUNT = 0 / number of group parameters GCOUNT = 1 / number of groups TFIELDS = 2 / number of table fields TTYPE1 = 'TIME ' TFORM1 = '1D ' TTYPE2 = 'PI ' TFORM2 = '1J ' EXTNAME = 'EVENTS ' / extension name OBSERVER= 'Edwige Bubble' TELESCOP= 'NuSTAR ' / Telescope (mission) name INSTRUME= 'FPMA ' / Instrument name OBS_ID = '00000000001' / Observation ID TARG_ID = 0 / Target ID OBJECT = 'Fake X-1' / Name of observed object RA_OBJ = 0.0 / [deg] R.A. Object DEC_OBJ = 0.0 / [deg] Dec Object RA_NOM = 0.0 / Right Ascension used for barycenter correctionsDEC_NOM = 0.0 / Declination used for barycenter corrections RA_PNT = 0.0 / [deg] RA pointing DEC_PNT = 0.0 / [deg] Dec pointing PA_PNT = 0.0 / [deg] Position angle (roll) EQUINOX = 2000.0 / Equinox of celestial coord system RADECSYS= 'FK5 ' / Coordinate Reference System TASSIGN = 'SATELLITE' / Time assigned by onboard clock TIMESYS = 'TDB ' / All times in this file are TDB MJDREFI = 55197 / TDB time reference; Modified Julian Day (int) MJDREFF = 0.00076601852 / TDB time reference; Modified Julian Day (frac) TIMEREF = 'SOLARSYSTEM' / Times are pathlength-corrected to barycenter CLOCKAPP= F / TRUE if timestamps corrected by gnd sware TIMEUNIT= 's ' / unit for time keywords TSTART = 80000000.0 / Elapsed seconds since MJDREF at start of file TSTOP = 80001025.0 / Elapsed seconds since MJDREF at end of file LIVETIME= 1025.0 / On-source time PLEPHEM = 'JPL-DE200' TIMEZERO= 0.0 / Time Zero CHECKSUM= 'SLa3ULY2SLa2SLY2' / HDU checksum updated 2021-05-23T20:25:05 DATASUM = '3443983656' / data unit checksum updated 2021-05-23T20:25:05 COMMENT FITS (Flexible Image Transport System) format is defined in 'Astronomy aCOMMENT nd Astrophysics', volume 376, page 359; bibcode: 2001A&A...376..359H COMMENT MJDREFI+MJDREFF = epoch of Jan 1, 2010, in TT time system. HISTORY File modified by user 'meo' with fv on 2015-08-17T14:10:02 HISTORY File modified by user 'meo' with fv on 2015-08-17T14:48:52 HISTORY File modified by user 'meo' with fv on 2021-05-05T10:33:13 END A:A02A#k6A (aA AQV4A_VtAA >kA A&!A)sBA+" A/~R|A7A8d)A9 +1A=QsAHU;AK[A5AO AS +aOATAX1A^JA^xw2AcnAgAh BAl[GQArc;A|CO A|ljAЂUAЃ;7AЄ}AЇg7oAЋr/AИFAЙH,AЩ@QAЮĐF +Aж@BA!AӭA )Av>AA;3A#(ZAoGA9A(8PgA ]KAg(dAA#}A%XA'B7A+A0,DA2zLBA9|82A=t8A@|AAJ8ANzAQ#fAQ]AR AVteAX@ +CA\IA\oRLA]Aa"Ab$AddAhv_Ak]Ak6S=AnApsAsA{8FA|2AтDxAуNVAъUAы"@Aь Aѐ@Aѓi% Aћ ] Aќ:PAѠ,0AѭUp@AѭAѮDAѵ»AѶ AѽAøA.ACAt=lAҁGtXA'8AAHAڒAޑ&AHA{yfA\A4(%A]{!A.#XAUA3z0A5yAAXwAЁyAv!A8A 8e?AԸ0A]'A1MAA!DaA!A!jA"c&G8A#AtA$r,A'`OA0aA6WA7nA94AAA͗AIp&{DAIAAKYqAOsm +APFAS+7AXAYAZ + AyAzCg"AҀ~AҊ5AAҎAҐ'eAґ AҔ'}Aҙy.Aҡ\\AҦWmTAҦx-%Aҫ8TAұAұAҸn"AҸ|Aһ5AһZlAq$AwAکAQ0AӪAoX/|Aظ(6HAC8A~A޻e/A?YAFe[AM;baAOGB AQAT!L{AW5υAY%AZ A[(zjA\bOA]IbSA]Ad8X9Af;AfVAnl':ArAsJAvbQVAŰAԁ3lAԃaVRAԅ""vAԆAԋ$AԋAԌmAԌ팒AԕlAԡ0-cABOj}ALAN,AP(AQr_AR QAcҭxAh(AkzApdAp%As=A{y.A};AւAք|>A։vA֑+ A֒"A֓8&nA֓tA֓mA֘r+WA֛1J A֡A֣C8RA֥ A֧x A֪^#A֬ZA֭qmCAֲ-/Aֳ]AֵASAʘA%AAKAOw'A5k+A+A5AA'BAdAAEaEAAXpA>A +/A +<A6`AB'ATAjArMAIA!+A(}4A,!6&A-<CA/A@@AAUfAI].,ALwAO|kAS'AWe?AW}" A`SAAa=MAd|Af+.ZAp!AsJ A׃Aׄ=]KAׅ}FAׇ59AׇT)fA׋-A׏nAבI`LAד9+AכfAל6XAן^.:A׭A׮A׵ogyA׹FNA׼A׽אEAAnYAӖAٽZA]A;Ay`AhA=A诈APLeJAټSNA0AҠAxApACHAAa3YAAMkAR"tUA"8A"/PxA$ A%A&GmA,;8A43A?FAL<APl8?ATAY0AY]z#A[VVA[1 A[A_AerAiHwApP.Apt%A؊ތ2A؋\sDA؍:&Aؖ!%Aؚ@~AؚhAآGAاOAحbAػKҋAػAؽ!gA[yAķAK3AʷRABA(gBArOAA$ A2GAFA~AUA +AW +A.@AATWH*A +kcQA gA -AtSvA)vArA("iA71/A=ܗ|A? AD\8AH+!dAK?]AMpqyAO*ASRw=A[xA^ZAc,cAhn/Aj5|Am}|Az +ADAAE +5sAEUAJpSAP2 ARXAWQ]k*AWXzAY AZOA\A\ +iDAah;Ab5,AdqAhWT6AzƽA|8Aڅb6Aڋ/AڐRAJAڒƌAړ7AڙAڡAڪ +AڱdahAڵAںAڻF`ABAPAÇ&AĢwA FJA2AOtÁ*[VAҨpA}AIm#Aߑ(Aj=hAtAEAA|NAAvAP[A QAA̩A=y+ATA2A%??A'A'+ A-Vֆ A.7A3AuA3ݴA4GA5 Y@A6牱LA8OA<AE^AEt"SAG)E9AHoDAIAN67 AN7AXJqMAa@AaŰAawAc >OAkjMAA &^A (@A@\+AmAAANAEs'A]yAA!l)A#A&M{A&"A( A,OA3{)A5y>VA7??JA9Vz6A:pcA=,$AFÈj7AHY AJ)xARAR[YA^+XAbAi\AtWAy"v AzOdAހ:˖Aޖ7Aޗ Aޘh`Aޟ?.@AޠaAǬRA*ޓAmߣA? 3AQAARJ!'AVqɣAVA]AdneAdTAkwApZAAsAxc7A{*CA|@A7:A߄2A߉oAߊKxcAߍ:Aߍ/Aߏ͸AߑӰAߒ;IAߔ鱰Aߕ)AߗAߛ@ZAߝAߞ_;AߠaxAߥAߪ-PA߬xA߮BRA߷+A߾x+Ab4AF +A²AΟ0AU-Ab_A AJAY|AśXTENSION= 'BINTABLE' / binary table extension BITPIX = 8 / array data type NAXIS = 2 / number of array dimensions NAXIS1 = 16 / length of dimension 1 NAXIS2 = 1 / length of dimension 2 PCOUNT = 0 / number of group parameters GCOUNT = 1 / number of groups TFIELDS = 2 / number of table fields TTYPE1 = 'START ' TFORM1 = '1D ' TTYPE2 = 'STOP ' TFORM2 = '1D ' EXTNAME = 'GTI ' / extension name CHECKSUM= 'Mah9PUh6Mah6MUh6' / HDU checksum updated 2021-05-23T20:25:05 DATASUM = '2267424176' / data unit checksum updated 2021-05-23T20:25:05 END AA \ No newline at end of file diff --git a/hendrics/tests/test_parallel.py b/hendrics/tests/test_parallel.py new file mode 100644 index 00000000..667c1e6f --- /dev/null +++ b/hendrics/tests/test_parallel.py @@ -0,0 +1,127 @@ +import importlib +import os +import subprocess as sp +import tempfile + +import numpy as np +import pytest +from stingray import AveragedPowerspectrum, EventList + +from hendrics.fake import main as main_fake +from hendrics.parallel import main as main_parallel + +HAS_MPI = importlib.util.find_spec("mpi4py") is not None +HAS_HDF5 = importlib.util.find_spec("h5py") is not None + +test_cases = ["none", "multiprocessing"] +if HAS_MPI: + from mpi4py import MPI + + test_cases.append("mpi") + + +@pytest.mark.skipif(not HAS_HDF5, reason="h5py is required for this test") +class TestParallel: + def setup_class(cls): + curdir = os.path.abspath(os.path.dirname(__file__)) + cls.datadir = os.path.join(curdir, "data") + cls.fname_unsorted = os.path.join(cls.datadir, "monol_testA.evt") + cls.fname_sorted = os.path.join(cls.datadir, "monol_testA_sort.evt") + + cls.tempfile = tempfile.NamedTemporaryFile(suffix=".fits", delete=False) + cls.fname = cls.tempfile.name + print(cls.fname) + + main_fake( + ["-o", cls.fname, "-c", "10", "--tstart", "0", "--tstop", "10000", "--seed", "42"] + ) + cls.events = EventList.read(cls.fname, fmt="ogip") + cls.pds = AveragedPowerspectrum.from_events( + cls.events, dt=0.1, segment_size=10.0, use_common_mean=False, norm="leahy" + ) + + @pytest.mark.parametrize("method", test_cases) + def test_parallel_versions_on_small_file(self, method): + out_file = tempfile.NamedTemporaryFile(suffix=".hdf5", delete=False) + command = [ + self.fname_sorted, + "-o", + out_file.name, + "-b", + "0.1", + "-f", + "10.0", + "--method", + method, + "--norm", + "leahy", + ] + main_parallel(command) + + @pytest.mark.parametrize("method", test_cases) + @pytest.mark.parametrize("norm", ["leahy", "frac", "abs"]) + def test_parallel_versions(self, method, norm): + out_file = tempfile.NamedTemporaryFile(suffix=".hdf5", delete=False) + command = [ + self.fname, + "-o", + out_file.name, + "-b", + "0.1", + "-f", + "10.0", + "--method", + method, + "--norm", + norm, + ] + main_parallel(command) + if method == "mpi": + world_comm = MPI.COMM_WORLD + my_rank = world_comm.Get_rank() + if my_rank != 0: + return + + pds = AveragedPowerspectrum.read(out_file.name) + compare_pds = self.pds.to_norm(norm) if norm != "leahy" else self.pds + assert pds.m == compare_pds.m + assert np.allclose(pds.freq, compare_pds.freq) + if norm == "leahy": + power_rtol = 1e-6 + else: + power_rtol = 1e-2 + assert np.allclose(pds.power, compare_pds.power, rtol=power_rtol) + assert np.allclose(pds.power_err, compare_pds.power_err, rtol=1e-2) + assert pds.norm == compare_pds.norm + assert np.allclose(pds.unnorm_power, compare_pds.unnorm_power, rtol=1e-2) + assert np.isclose(pds.nphots, compare_pds.nphots, rtol=1e-2) + + @pytest.mark.parametrize("method", test_cases[1:]) # Skip "none" method for this test + def test_parallel_versions_fail_unsorted(self, method, caplog): + command = [ + self.fname_unsorted, + "-b", + "0.1", + "-f", + "10.0", + "--method", + method, + "--norm", + "leahy", + ] + main_parallel(command) + if method == "mpi": + world_comm = MPI.COMM_WORLD + my_rank = world_comm.Get_rank() + + if my_rank != 0: + return + + for record in caplog.records: + if ( + record.levelname == "ERROR" + and "The input file is probably not sorted." in record.message + ): + break + else: + raise AssertionError("Expected error message not found in logs") diff --git a/pyproject.toml b/pyproject.toml index 3551a7ff..b0338e09 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -56,7 +56,7 @@ test_all = [ "pint-pulsar", "imageio", "numba", - "netcdf4==1.7.0", + "netcdf4", "scikit-image", ] recommended = [ @@ -70,7 +70,7 @@ all = [ "pandas", "pint-pulsar", "imageio", - "netcdf4==1.7.0", + "netcdf4", "scikit-image", ] docs = [ @@ -115,6 +115,7 @@ HENlcurve = "hendrics.lcurve:main" HENmodel = "hendrics.modeling:main_model" HENphaseogram = "hendrics.phaseogram:main_phaseogram" HENphasetag = "hendrics.phasetag:main_phasetag" +HENparfspec = "hendrics.parallel:main" HENplot = "hendrics.plot:main" HENpowercolors = "hendrics.power_colors:main" HENreadevents = "hendrics.read_events:main" @@ -255,6 +256,7 @@ filterwarnings = [ [tool.coverage] [tool.coverage.run] + source = ["hendrics"] omit = [ "hendrics/_astropy_init*", "hendrics/conftest.py", @@ -274,6 +276,7 @@ filterwarnings = [ "*/hendrics/version*", ] + parallel="true" [tool.coverage.report] exclude_lines = [ diff --git a/tox.ini b/tox.ini index df65fa1c..a5c90662 100644 --- a/tox.ini +++ b/tox.ini @@ -36,6 +36,7 @@ description = cov: and test coverage astropy4: with astropy 4 astropy5: with astropy 5 + mpi: with MPI support setenv = devdeps: PIP_EXTRA_INDEX_URL = https://pypi.anaconda.org/scientific-python-nightly-wheels/simple https://pypi.anaconda.org/astropy/simple https://pypi.anaconda.org/liberfa/simple @@ -57,7 +58,15 @@ deps = devdeps: pyerfa>=0.0.dev devpint: git+https://github.com/nanograv/pint.git#egg=pint-pulsar devdeps: git+https://github.com/stingraysoftware/stingray.git#egg=stingray - alldeps: netcdf4==1.7.0 + ; alldeps: netcdf4==1.7.0 + mpi: mpi4py + mpi: h5py + mpi: pytest-mpi + + +allowlist_externals = + mpi: mpirun + mpi: mpiexec # The following indicates which extras_require will be installed extras = @@ -67,11 +76,17 @@ extras = commands = # Force numpy reinstall to work around upper version limits dependencies put on numpy devdeps: pip install -U --pre --extra-index-url https://pypi.anaconda.org/scientific-python-nightly-wheels/simple numpy + mpi: pip install mpi4py h5py pytest-mpi pip freeze - !cov: pytest --pyargs hendrics {toxinidir}/docs {posargs} - cov: pytest --pyargs hendrics {toxinidir}/docs --cov hendrics --cov-config={toxinidir}/pyproject.toml {posargs} - cov: coverage xml -o {toxinidir}/coverage.xml + mpi-cov: mpiexec -n 2 coverage run --rcfile={toxinidir}/pyproject.toml -m pytest --pyargs hendrics -k mpi --with-mpi {posargs} + mpi-!cov: mpirun -n 2 pytest --pyargs hendrics -k mpi --with-mpi {posargs} + + !mpi-!cov: pytest --pyargs hendrics {toxinidir}/docs {posargs} + !mpi-cov: pytest --pyargs hendrics {toxinidir}/docs --cov hendrics --cov-config={toxinidir}/pyproject.toml {posargs} + mpi-cov: coverage combine --rcfile={toxinidir}/pyproject.toml + mpi-cov: coverage report --rcfile={toxinidir}/pyproject.toml + cov: coverage xml -o {toxinidir}/coverage.xml --rcfile={toxinidir}/pyproject.toml [testenv:build_docs] changedir = docs