diff --git a/pysp2/util/__init__.py b/pysp2/util/__init__.py index 0a224ee..e35fb92 100644 --- a/pysp2/util/__init__.py +++ b/pysp2/util/__init__.py @@ -18,4 +18,4 @@ from .particle_properties import calc_diams_masses, process_psds from .deadtime import deadtime from .leo_fit import beam_shape,leo_fit -from .normalized_derivative_method import central_difference +from .normalized_derivative_method import central_difference, plot_normalized_derivative, mle_tau_moteki_kondo diff --git a/pysp2/util/normalized_derivative_method.py b/pysp2/util/normalized_derivative_method.py index 27aaa0b..1a91266 100644 --- a/pysp2/util/normalized_derivative_method.py +++ b/pysp2/util/normalized_derivative_method.py @@ -1,6 +1,12 @@ +from __future__ import annotations + import numpy as np import xarray as xr +from dataclasses import dataclass +from typing import Optional, Union + + def central_difference(S, num_records=None, normalize=True, baseline_to_zero=True): """ @@ -23,6 +29,8 @@ def central_difference(S, num_records=None, normalize=True, baseline_to_zero=Tru normalize: bool If True, normalize the derivative by the scattering signal S(t) to get (1/S) * dS/dt. + baseline_to_zero: bool + If True, shift each record's minimum to zero before differentiation. 
Returns ------- @@ -110,15 +118,307 @@ def plot_normalized_derivative(ds, record_no, chn=0): spectra['Data_ch' + str(chn)].values[np.newaxis, :], dims=['time', 'bins']) inp_data = xr.Dataset(inp_data) - bins = np.linspace(0, 100, 100) + + bins = np.arange(0, 0.00004-0.3e-6, 0.4e-6) # 0 to 0.00004 seconds (about 40 microseconds) in steps of 0.4e-6 seconds + bins = bins*1e6 # convert to microseconds for plotting ch_name = f'Data_ch{chn}' plt.figure(figsize=(10, 6)) ax = plt.gca() - inp_data[ch_name].plot(ax=ax) + # Plot using bins for x-axis + ax.plot(bins, spectra['Data_ch' + str(chn)].values, label=ch_name) + ax.set_xlim([bins[0], bins[-1]]) ax.set_title(f'Normalized Derivative of Scattering Signal - Channel {chn} Record {record_no}') - ax.set_xlabel('Time (s)') + ax.set_xlabel(r'Time ($\mu$s)') ax.set_ylabel('Normalized Derivative') plt.grid() + ax.legend() return ax + + +@dataclass(frozen=True) +class MLEConfig: + h: float + sigma_bar: float + delta_sigma: float + A1: float + A2: float + A3: float + grid_size: int = 401 + grid_margin: float = 0.5 + +def mle_tau_moteki_kondo( + S: Union[xr.DataArray, xr.Dataset], + norm_deriv: Union[xr.DataArray, xr.Dataset], + p: int, + *, + ch: Optional[str] = None, + event_index: int, + event_dim: str = "event_index", + S_sample_dim: Optional[str] = None, + y_sample_dim: Optional[str] = None, + tau_grid: Optional[Union[np.ndarray, xr.DataArray]] = None, + k_end: Optional[int] = None, + config: Optional[MLEConfig] = None, +) -> xr.DataArray: + """ + Estimate tau_hat using the Moteki & Kondo grid-search MLE. + + Parameters + ---------- + S : xr.DataArray or xr.Dataset + Scattering signal. + norm_deriv : xr.DataArray or xr.Dataset + Normalized derivative. + p : int + Number of consecutive points in each k-subset. + ch : str, optional + Variable to select when S and/or norm_deriv are Datasets. + Required if a Dataset contains multiple variables and no unique choice exists. + event_index : int + Event index to select. 
This function returns tau_hat(k) for one event only. + event_dim : str + Name of event dimension. + S_sample_dim : str, optional + Sample dimension in S. + y_sample_dim : str, optional + Sample dimension in norm_deriv. + tau_grid : 1D array-like, optional + Global tau grid for all subsets. + k_end : int, optional + Largest starting k. + config : MLEConfig + Calibration / noise / grid settings. + """ + if config is None: + raise ValueError("config must be provided.") + + def _to_dataarray(obj: Union[xr.DataArray, xr.Dataset], name: str) -> xr.DataArray: + """ + Accept either a DataArray or Dataset. + If a Dataset is provided, select the variable named `ch`. + """ + if isinstance(obj, xr.DataArray): + return obj + if isinstance(obj, xr.Dataset): + if ch is not None: + # Use the user input channel. + if ch not in obj.data_vars: + raise ValueError( + f"{ch!r} not found in {name}.data_vars={list(obj.data_vars)}" + ) + return obj[ch] + if len(obj.data_vars) == 1: + only_var = next(iter(obj.data_vars)) + return obj[only_var] + raise ValueError( + f"{name} is a Dataset with multiple variables. " + f"Provide ch. Available: {list(obj.data_vars)}" + ) + raise TypeError(f"{name} must be an xarray DataArray or Dataset.") + + # Convert datasets to the selected DataArrays. + S = _to_dataarray(S, "S") + norm_deriv = _to_dataarray(norm_deriv, "norm_deriv") + + # The method requires one event axis and one sample axis. + if event_dim not in S.dims: + raise ValueError(f"{event_dim!r} not found in S.dims={S.dims}") + if event_dim not in norm_deriv.dims: + raise ValueError(f"{event_dim!r} not found in norm_deriv.dims={norm_deriv.dims}") + + # Infer the sample dimension if the user did not specify it. + if S_sample_dim is None: + s_non_event_dims = [d for d in S.dims if d != event_dim] + if len(s_non_event_dims) != 1: + raise ValueError( + f"Could not infer S sample dim. 
Non-event dims in S: {s_non_event_dims}" + ) + S_sample_dim = s_non_event_dims[0] + + if y_sample_dim is None: + y_non_event_dims = [d for d in norm_deriv.dims if d != event_dim] + if len(y_non_event_dims) != 1: + raise ValueError( + f"Could not infer norm_deriv sample dim. Non-event dims in norm_deriv: {y_non_event_dims}" + ) + y_sample_dim = y_non_event_dims[0] + + # Rename the sample dimensions to a common internal name. + S_std = S.rename({S_sample_dim: "sample"}) + y_std = norm_deriv.rename({y_sample_dim: "sample"}) + + # Align the arrays so the same event/sample positions are used in both inputs. + S_std, y_std = xr.align(S_std, y_std, join="inner") + + if event_index < 0 or event_index >= S_std.sizes[event_dim]: + raise ValueError( + f"event_index must be in [0, {S_std.sizes[event_dim] - 1}], got {event_index}" + ) + + n_samples = S_std.sizes["sample"] + + if p < 2 or p > n_samples: + raise ValueError(f"p must be in [2, {n_samples}], got {p}") + + if k_end is None: + k_end = n_samples - p + if k_end < 0 or k_end > n_samples - p: + raise ValueError(f"k_end must be in [0, {n_samples - p}], got {k_end}") + + # Optional tau grid for the 1D grid search in tau. + # Moteki & Kondo determine tau numerically by maximizing L_k(tau). + if tau_grid is not None: + tau_grid_np = np.asarray( + tau_grid.data if isinstance(tau_grid, xr.DataArray) else tau_grid, + dtype=float, + ) + if tau_grid_np.ndim != 1: + raise ValueError("tau_grid must be 1D.") + else: + tau_grid_np = None + + # Parameters from Appendix A. + h = float(config.h) + sigma_bar = float(config.sigma_bar) + delta_sigma = float(config.delta_sigma) + A1, A2, A3 = float(config.A1), float(config.A2), float(config.A3) + + # Time axis used in the fit. + # Here we use physical time spacing h so tk is in seconds (or whatever unit h uses). + # This must match sigma_bar and delta_sigma units. 
+ t = np.arange(n_samples) * h + + if h <= 0: + raise ValueError("config.h must be positive.") + if sigma_bar <= 0: + raise ValueError("config.sigma_bar must be positive.") + if delta_sigma < 0: + raise ValueError("config.delta_sigma must be >= 0.") + + # Eq. (A.7): finite-difference amplification factor for the derivative noise. + Af_d = np.sqrt(130.0) / 12.0 + + def _logL_for_tau(yk: np.ndarray, sk: np.ndarray, tk: np.ndarray, tau: float) -> float: + """ + Log-likelihood for one k-subset and one candidate tau. + + Mean model: + ybar_i(tau) = -(t_i - tau) / sigma_bar^2 [Eq. (A.4)] + where y_i = S'_i / S_i. + + Covariance: + Cov[y_i, y_j] = 4 / sigma_bar^6 * (t_i - tau)(t_j - tau) * (delta_sigma)^2 [Eq. (A.10a)] + Var[y_i] = 4 / sigma_bar^6 * (t_i - tau)^2 * (delta_sigma)^2 + + (Af_d^2 / h^2) * (1/S_i^2) * (delta S_i)^2 [Eq. (A.10b)] + with + delta S_i = sqrt(A1^2 + A2^2 S_i + A3^2 S_i^2) [Eq. (A.6)] + and + (delta y_i)_ran = Af_d * (1/h) * (1/S_i) * delta S_i [Eq. (A.7)] + + The full likelihood is the multivariate Gaussian in Eq. (A.9). + """ + # Mean vector of the normalized derivative under the Gaussian beam model. + # This is the line I'/I = -(t - tau)/sigma^2 [Eq. (5)] used as the mean [Eq. (A.4)]. + ybar = -(tk - tau) / (sigma_bar * sigma_bar) + + # Signal-noise amplitude from Appendix A [Eq. (A.6)]. + deltaS = np.sqrt(A1 * A1 + (A2 * A2) * sk + (A3 * A3) * (sk * sk)) + + # Random variance of y = S'/S from finite-difference error propagation [Eq. (A.7)]. + with np.errstate(divide="ignore", invalid="ignore"): + var_rand_k = (Af_d * Af_d) / (h * h) * (deltaS * deltaS) / (sk * sk) + + # If any term is non-finite, this tau candidate is unusable. + if not np.all(np.isfinite(var_rand_k)): + return -np.inf + if np.any(var_rand_k <= 0): + return -np.inf + + # Systematic covariance from particle-by-particle fluctuations in sigma [Eq. (A.10a)]. 
+ dt = (tk - tau).reshape(-1, 1) + sys_pref = 4.0 * (delta_sigma * delta_sigma) / (sigma_bar ** 6) + Sigma = sys_pref * (dt @ dt.T) + # Add the diagonal random variance term [Eq. (A.10b)]. + Sigma[np.diag_indices_from(Sigma)] += var_rand_k + + # Residual vector y - ybar. + r = yk - ybar + + # Use Cholesky factorization for numerical stability when evaluating Eq. (A.9). + try: + L = np.linalg.cholesky(Sigma) + except np.linalg.LinAlgError: + return -np.inf + + # Compute statistical distance. + # d^2 = (y - ybar)^T Sigma^{-1} (y - ybar) [Eq. (A.11)] + z = np.linalg.solve(L, r) + d2 = float(z.T @ z) + # log |Sigma| from the Cholesky factor. + logdet = 2.0 * np.sum(np.log(np.diag(L))) + + # Multivariate normal log-likelihood [Eq. (A.9)]. + p_local = yk.size + return float(-0.5 * (p_local * np.log(2.0 * np.pi) + logdet + d2)) + + def _tau_hat_for_one_event(s_event: np.ndarray, y_event: np.ndarray) -> np.ndarray: + """ + For one event, scan all k-subsets of length p and return tau_hat(k). + """ + tau_hat = np.full(k_end + 1, np.nan, dtype=float) + + # Skip events with missing values. + if not (np.all(np.isfinite(s_event)) and np.all(np.isfinite(y_event))): + return tau_hat + + for k in range(k_end + 1): + # Consecutive p-point subset starting at k. + # This is the subset over which Moteki & Kondo search for the leading-edge + # segment that best matches I'/I [Appendix A.5]. + yk = y_event[k : k + p] + sk = s_event[k : k + p] + tk = t[k : k + p] + + if not (np.all(np.isfinite(yk)) and np.all(np.isfinite(sk))): + continue + + # If the user did not supply a global tau grid, build a local grid for this k. + if tau_grid_np is None: + span = float(tk[-1] - tk[0]) + margin = config.grid_margin * (span + h) + grid = np.linspace(tk[0] - margin, tk[-1] + margin, config.grid_size) + else: + grid = tau_grid_np + + # Grid-search maximization of L_k(tau) [Appendix A.5]. 
+ best_ll = -np.inf + best_tau = np.nan + for tau_cand in grid: + ll = _logL_for_tau(yk, sk, tk, float(tau_cand)) + if ll > best_ll: + best_ll = ll + best_tau = float(tau_cand) + + if np.isfinite(best_ll): + tau_hat[k] = best_tau + + return tau_hat + + # Select the requested event only, and return tau_hat(k) for that one event. + s_event = np.asarray(S_std.sel({event_dim: event_index}).values, dtype=float) + y_event = np.asarray(y_std.sel({event_dim: event_index}).values, dtype=float) + + tau_hat_1d = _tau_hat_for_one_event(s_event, y_event) + + return xr.DataArray( + tau_hat_1d, + dims=("k",), + coords={"k": np.arange(k_end + 1)}, + name="tau_hat", + attrs={ + "long_name": f"MLE tau_hat(k) for {event_dim}={event_index}", + "units": "sample_index_or_time_units_of_t", + }, + ) diff --git a/pysp2/util/peak_fit.py b/pysp2/util/peak_fit.py index 86f38e2..0258d5a 100644 --- a/pysp2/util/peak_fit.py +++ b/pysp2/util/peak_fit.py @@ -84,7 +84,7 @@ def chisquare(obs, f_exp): return np.sum((obs - f_exp)**2) -def gaussian_fit(my_ds, config, parallel=False, num_records=None): +def gaussian_fit(my_ds, config, parallel=False, num_records=None, baseline_to_zero=False): """ Does Gaussian fitting for each wave in the dataset. This will do the fitting for channel 0 only. @@ -102,6 +102,8 @@ def gaussian_fit(my_ds, config, parallel=False, num_records=None): num_records: int or None Only process first num_records datapoints. Set to None to process all records. + baseline_to_zero: bool + If True, shift each record's minimum to zero before fitting (channel 0). 
Returns ------- @@ -114,6 +116,12 @@ def gaussian_fit(my_ds, config, parallel=False, num_records=None): num_trig_pts = int(config['Acquisition']['Pre-Trig Points']) start_time = time.time() + # Move baseline to zero for channel 0 if flag is set + if baseline_to_zero: + # For each event, subtract the minimum from Data_ch0 + min_vals = np.nanmin(my_ds['Data_ch0'].values, axis=1) + my_ds['Data_ch0'].values = my_ds['Data_ch0'].values - min_vals[:, np.newaxis] + for i in [3, 7]: coeffs = _split_scatter_fit(my_ds, i) Base2 = coeffs['base'] @@ -185,7 +193,7 @@ def gaussian_fit(my_ds, config, parallel=False, num_records=None): proc_records = [] for i in range(num_records): proc_records.append(_do_fit_records(my_ds, i, num_trig_pts)) - + FtAmp = np.stack([x[0] for x in proc_records]) FtPos = np.stack([x[1] for x in proc_records]) Base = np.stack([x[2] for x in proc_records]) diff --git a/tests/baseline/test_plot_normalized_derivative.png b/tests/baseline/test_plot_normalized_derivative.png index d794486..d2a361c 100644 Binary files a/tests/baseline/test_plot_normalized_derivative.png and b/tests/baseline/test_plot_normalized_derivative.png differ diff --git a/tests/baseline/test_plot_wave.png b/tests/baseline/test_plot_wave.png new file mode 100644 index 0000000..941839a Binary files /dev/null and b/tests/baseline/test_plot_wave.png differ diff --git a/tests/test_ndm.py b/tests/test_ndm.py index d670855..9e5d41a 100644 --- a/tests/test_ndm.py +++ b/tests/test_ndm.py @@ -1,5 +1,8 @@ import pysp2 import numpy as np +np.set_printoptions(threshold=np.inf) + +from pysp2.util.normalized_derivative_method import MLEConfig, mle_tau_moteki_kondo def test_central_difference(): my_sp2b = pysp2.io.read_sp2(pysp2.testing.EXAMPLE_SP2B) @@ -22,4 +25,48 @@ def test_central_difference(): 7.166666666e7/-30152, decimal=2) np.testing.assert_almost_equal(dSdt_norm['Data_ch4'].isel(event_index=5876, time=19).item(), 1.5e7/-30132, decimal=2) + +def test_mle_estimate_tau(): + my_sp2b = 
pysp2.io.read_sp2(pysp2.testing.EXAMPLE_SP2B) + my_ini = pysp2.io.read_config(pysp2.testing.EXAMPLE_INI) + my_binary = pysp2.util.gaussian_fit(my_sp2b, my_ini, parallel=False, baseline_to_zero=True) + dSdt = pysp2.util.central_difference(my_binary, normalize=False, baseline_to_zero=True) + + cfg = MLEConfig( + h=0.4e-6, # example: 0.4 microseconds + sigma_bar=26.28*0.4e-6, # example; use your measured average width + delta_sigma=1.2*0.4e-6,# example; use your measured width std dev + A1=0.37, + A2=1.6e-2, + A3=6.2e-4, +) + + ## Test one event + tau = mle_tau_moteki_kondo( + S=my_binary, + norm_deriv=dSdt, + p=21, + ch="Data_ch0", + event_index=499, + config=cfg, + ) + tau_val = my_binary['Data_ch0'].isel(event_index=499).argmax().item()*0.4e-6 + # Test that the estimated tau for a subset of results is close to the true value for the event + for i in range(21, 37): + np.testing.assert_almost_equal(tau[i], tau_val, decimal=6) + + ## Test another event + tau = mle_tau_moteki_kondo( + S=my_binary, + norm_deriv=dSdt, + p=21, + ch="Data_ch0", + event_index=1040, + config=cfg, + ) + + tau_val = my_binary['Data_ch0'].isel(event_index=1040).argmax().item()*0.4e-6 + # Test that the estimated tau for a subset of results is close to the true value for the event + for i in range(23, 38): + np.testing.assert_almost_equal(tau[i], tau_val, decimal=6) \ No newline at end of file diff --git a/tests/test_vis.py b/tests/test_vis.py index d4f109f..40fef1a 100644 --- a/tests/test_vis.py +++ b/tests/test_vis.py @@ -5,6 +5,7 @@ import pysp2 from pysp2.util.normalized_derivative_method import plot_normalized_derivative +from pysp2.vis.plot_wave import plot_wave matplotlib.use("Agg") @@ -13,11 +14,23 @@ def test_plot_normalized_derivative(): my_sp2b = pysp2.io.read_sp2(pysp2.testing.EXAMPLE_SP2B) my_ini = pysp2.io.read_config(pysp2.testing.EXAMPLE_INI) - my_binary = pysp2.util.gaussian_fit(my_sp2b, my_ini, parallel=False) + my_binary = pysp2.util.gaussian_fit(my_sp2b, my_ini, 
parallel=False, baseline_to_zero=True) dSdt_norm = pysp2.util.central_difference(my_binary, normalize=True, baseline_to_zero=True) # Test the plotting function for channel 0 and record number 2 ax = plot_normalized_derivative(dSdt_norm, record_no=499, chn=0) fig = ax.figure + return fig + +@pytest.mark.mpl_image_compare(tolerance=10) +def test_plot_wave(): + + my_sp2b = pysp2.io.read_sp2(pysp2.testing.EXAMPLE_SP2B) + my_ini = pysp2.io.read_config(pysp2.testing.EXAMPLE_INI) + my_binary = pysp2.util.gaussian_fit(my_sp2b, my_ini, parallel=False, baseline_to_zero=True) + + # Test the plotting function for channel 0 and record number 499 + display = plot_wave(my_binary, record_no=499, chn=0) + fig = display.axes[0].figure return fig \ No newline at end of file