From 2491e159657571329aa6bac7d6c1d8f37695e0de Mon Sep 17 00:00:00 2001 From: Kimball Bighorse Date: Fri, 27 Feb 2026 19:37:48 -0800 Subject: [PATCH 01/13] feat: implement Radionuclides backfill job and BDD step definitions Add chemistry backfill that maps legacy NMA_Radionuclides rows into Observation records keyed on nma_pk_chemistryresults (idempotent upsert). - Schema: add chemistry columns to Observation and Sample models/schemas - Migration: add columns + unique constraints, drop stale NMA_Radionuclides.thing_id - Lexicon: add pCi/L unit and Chemistry Observation note type - Backfill: resolve Samples via nma_pk_chemistrysample, create Parameters, AnalysisMethods, Notes; detect_flag from legacy Symbol qualifier - Tests: BDD step definitions for all 7 scenarios, feature file fixes - Orchestrator: wire radionuclides step into backfill pipeline Refs #558 Co-Authored-By: Claude Opus 4.6 --- ...e5e8_add_chemistry_backfill_columns_to_.py | 114 +++ core/lexicon.json | 14 + db/observation.py | 19 + db/sample.py | 15 + schemas/observation.py | 4 + schemas/sample.py | 3 + ...a-chemistry-radionuclides-refactor.feature | 2 + tests/features/steps/chemistry-backfill.py | 671 ++++++++++++++++++ transfers/backfill/backfill.py | 11 +- transfers/backfill/chemistry_backfill.py | 305 ++++++++ 10 files changed, 1151 insertions(+), 7 deletions(-) create mode 100644 alembic/versions/545a5b77e5e8_add_chemistry_backfill_columns_to_.py create mode 100644 tests/features/steps/chemistry-backfill.py create mode 100644 transfers/backfill/chemistry_backfill.py diff --git a/alembic/versions/545a5b77e5e8_add_chemistry_backfill_columns_to_.py b/alembic/versions/545a5b77e5e8_add_chemistry_backfill_columns_to_.py new file mode 100644 index 000000000..850847747 --- /dev/null +++ b/alembic/versions/545a5b77e5e8_add_chemistry_backfill_columns_to_.py @@ -0,0 +1,114 @@ +"""add chemistry backfill columns to observation and sample + +Revision ID: 545a5b77e5e8 +Revises: d5e6f7a8b9c0 +Create Date: 
2026-02-27 11:30:45.380002 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = '545a5b77e5e8' +down_revision: Union[str, Sequence[str], None] = 'd5e6f7a8b9c0' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Add chemistry backfill columns to observation and sample tables.""" + # Observation table: 4 new columns + op.add_column('observation', sa.Column( + 'nma_pk_chemistryresults', sa.String(), nullable=True, + comment='NM_Aquifer GlobalID for chemistry results — transfer audit and idempotent upsert key', + )) + op.add_column('observation', sa.Column( + 'detect_flag', sa.Boolean(), nullable=True, + comment="True=detected, False=below detection limit (legacy Symbol '<'), None=no qualifier", + )) + op.add_column('observation', sa.Column( + 'uncertainty', sa.Float(), nullable=True, + comment='Measurement uncertainty for the observation value', + )) + op.add_column('observation', sa.Column( + 'analysis_agency', sa.String(), nullable=True, + comment='Agency or lab that performed the analysis', + )) + op.create_unique_constraint( + 'uq_observation_nma_pk_chemistryresults', + 'observation', ['nma_pk_chemistryresults'], + ) + + # Observation version table (sqlalchemy-continuum) + op.add_column('observation_version', sa.Column( + 'nma_pk_chemistryresults', sa.String(), autoincrement=False, nullable=True, + comment='NM_Aquifer GlobalID for chemistry results — transfer audit and idempotent upsert key', + )) + op.add_column('observation_version', sa.Column( + 'detect_flag', sa.Boolean(), autoincrement=False, nullable=True, + comment="True=detected, False=below detection limit (legacy Symbol '<'), None=no qualifier", + )) + op.add_column('observation_version', sa.Column( + 'uncertainty', sa.Float(), autoincrement=False, nullable=True, + comment='Measurement uncertainty for the 
observation value', + )) + op.add_column('observation_version', sa.Column( + 'analysis_agency', sa.String(), autoincrement=False, nullable=True, + comment='Agency or lab that performed the analysis', + )) + + # Sample table: 3 new columns + # NOTE(review): observation_version is extended above but no sample_version + # columns are added in this migration — confirm whether Sample is versioned too + op.add_column('sample', sa.Column( + 'nma_pk_chemistrysample', sa.String(), nullable=True, + comment='NM_Aquifer SamplePtID for chemistry samples — transfer audit key', + )) + op.add_column('sample', sa.Column( + 'volume', sa.Float(), nullable=True, + comment='Volume of the sample collected', + )) + op.add_column('sample', sa.Column( + 'volume_unit', sa.String(), nullable=True, + comment='Unit for the sample volume (e.g. mL, L)', + )) + op.create_unique_constraint( + 'uq_sample_nma_pk_chemistrysample', + 'sample', ['nma_pk_chemistrysample'], + ) + + # Drop stale thing_id column from NMA_Radionuclides (model no longer defines it; + # relationships go through NMA_Chemistry_SampleInfo.thing_id instead) + op.drop_constraint( + 'NMA_Radionuclides_thing_id_fkey', 'NMA_Radionuclides', type_='foreignkey', + ) + op.drop_column('NMA_Radionuclides', 'thing_id') + + +def downgrade() -> None: + """Remove chemistry backfill columns and restore NMA_Radionuclides.thing_id.""" + # Restore NMA_Radionuclides.thing_id. The column is re-added as nullable: + # existing rows have no value for it, and a NOT NULL column without a + # default cannot be added to a populated table. + op.add_column('NMA_Radionuclides', sa.Column( + 'thing_id', sa.Integer(), nullable=True, + )) + op.create_foreign_key( + 'NMA_Radionuclides_thing_id_fkey', 'NMA_Radionuclides', 'thing', + ['thing_id'], ['id'], ondelete='CASCADE', + ) + + op.drop_constraint('uq_sample_nma_pk_chemistrysample', 'sample', type_='unique') + op.drop_column('sample', 'volume_unit') + op.drop_column('sample', 'volume') + op.drop_column('sample', 'nma_pk_chemistrysample') + + op.drop_column('observation_version', 'analysis_agency') + op.drop_column('observation_version', 'uncertainty') + op.drop_column('observation_version', 'detect_flag') + op.drop_column('observation_version', 'nma_pk_chemistryresults') + + op.drop_constraint('uq_observation_nma_pk_chemistryresults', 'observation', 
type_='unique') + op.drop_column('observation', 'analysis_agency') + op.drop_column('observation', 'uncertainty') + op.drop_column('observation', 'detect_flag') + op.drop_column('observation', 'nma_pk_chemistryresults') diff --git a/core/lexicon.json b/core/lexicon.json index 32757116b..c5512cc2b 100644 --- a/core/lexicon.json +++ b/core/lexicon.json @@ -857,6 +857,13 @@ "term": "mg/L", "definition": "Milligrams per Liter" }, + { + "categories": [ + "unit" + ], + "term": "pCi/L", + "definition": "Picocuries per Liter" + }, { "categories": [ "unit" @@ -8213,6 +8220,13 @@ "term": "Site Notes (legacy)", "definition": "Legacy site notes field from WaterLevels" }, + { + "categories": [ + "note_type" + ], + "term": "Chemistry Observation", + "definition": "Notes from chemistry observation results" + }, { "categories": [ "well_pump_type" diff --git a/db/observation.py b/db/observation.py index d716f9084..cdda1cdfa 100644 --- a/db/observation.py +++ b/db/observation.py @@ -37,6 +37,25 @@ class Observation(Base, AutoBaseMixin, ReleaseMixin): # NM_Aquifer fields for audits nma_pk_waterlevels: Mapped[str] = mapped_column(nullable=True) + nma_pk_chemistryresults: Mapped[str] = mapped_column( + nullable=True, + unique=True, + comment="NM_Aquifer GlobalID for chemistry results — transfer audit and idempotent upsert key", + ) + + # Chemistry-specific columns + detect_flag: Mapped[bool] = mapped_column( + nullable=True, + comment="True=detected, False=below detection limit (legacy Symbol '<'), None=no qualifier", + ) + uncertainty: Mapped[float] = mapped_column( + nullable=True, + comment="Measurement uncertainty for the observation value", + ) + analysis_agency: Mapped[str] = mapped_column( + nullable=True, + comment="Agency or lab that performed the analysis", + ) # --- Foreign Keys --- sample_id: Mapped[int] = mapped_column( diff --git a/db/sample.py b/db/sample.py index cdb65b684..32c17c271 100644 --- a/db/sample.py +++ b/db/sample.py @@ -89,6 +89,21 @@ class Sample(Base, 
AutoBaseMixin, ReleaseMixin): nullable=True, comment="NM_Aquifer primary key for waterlevels - to be used for transfer audits", ) + nma_pk_chemistrysample: Mapped[str] = mapped_column( + nullable=True, + unique=True, + comment="NM_Aquifer SamplePtID for chemistry samples — transfer audit key", + ) + + # Chemistry sample attributes + volume: Mapped[float] = mapped_column( + nullable=True, + comment="Volume of the sample collected", + ) + volume_unit: Mapped[str] = mapped_column( + nullable=True, + comment="Unit for the sample volume (e.g. mL, L)", + ) # --- Relationship Definitions --- field_activity: Mapped["FieldActivity"] = relationship(back_populates="samples") diff --git a/schemas/observation.py b/schemas/observation.py index 6f645b13f..fe6585bdc 100644 --- a/schemas/observation.py +++ b/schemas/observation.py @@ -112,6 +112,10 @@ class BaseObservationResponse(BaseResponseModel): value: float | None unit: Unit nma_data_quality: str | None = None + nma_pk_chemistryresults: str | None = None + detect_flag: bool | None = None + uncertainty: float | None = None + analysis_agency: str | None = None class GroundwaterLevelObservationResponse(BaseObservationResponse): diff --git a/schemas/sample.py b/schemas/sample.py index 8dce646bd..c9056c301 100644 --- a/schemas/sample.py +++ b/schemas/sample.py @@ -139,6 +139,9 @@ class SampleResponse(BaseResponseModel): notes: str | None depth_top: float | None depth_bottom: float | None + nma_pk_chemistrysample: str | None = None + volume: float | None = None + volume_unit: str | None = None # ============= EOF ============================================= diff --git a/tests/features/nma-chemistry-radionuclides-refactor.feature b/tests/features/nma-chemistry-radionuclides-refactor.feature index 060407e4b..d1c1840fe 100644 --- a/tests/features/nma-chemistry-radionuclides-refactor.feature +++ b/tests/features/nma-chemistry-radionuclides-refactor.feature @@ -78,6 +78,7 @@ Feature: Refactor legacy Radionuclides into the Ocotillo 
schema via backfill job | 0C354D8D-5404-41CE-9C95-002213371C4F | 77F1E3CF-A961-440E-966C-DD2E3675044B | GB | 5 | pCi/L| 2005-01-18 | E900.0 | | 095DA2E3-79E3-4BF2-B096-025C6D9A64B7 | BC50F55E-5BF1-471D-931D-03501081B4FD | Ra228 | 2.6 | pCi/L| 2003-11-26 | EPA 904.0 Mod | And a Sample record exists with nma_pk_chemistrysample "77F1E3CF-A961-440E-966C-DD2E3675044B" + And a Sample record exists with nma_pk_chemistrysample "BC50F55E-5BF1-471D-931D-03501081B4FD" When I run the Radionuclides backfill job Then the Observation for GlobalID "0C354D8D-5404-41CE-9C95-002213371C4F" should set analysis_method_name to "E900.0" And the Observation for GlobalID "095DA2E3-79E3-4BF2-B096-025C6D9A64B7" should set analysis_method_name to "EPA 904.0 Mod" @@ -123,6 +124,7 @@ Feature: Refactor legacy Radionuclides into the Ocotillo schema via backfill job | field | value | | GlobalID | 76F3A993-A29B-413B-83E0-00ADF51D15A2 | | SamplePtID | 7758D992-0394-42B1-BE96-734FCACB6412 | + | Analyte | Radium-226 | | SamplePointID| EB-490A | | OBJECTID | 333 | | WCLab_ID | null | diff --git a/tests/features/steps/chemistry-backfill.py b/tests/features/steps/chemistry-backfill.py new file mode 100644 index 000000000..b80e9e722 --- /dev/null +++ b/tests/features/steps/chemistry-backfill.py @@ -0,0 +1,671 @@ +# =============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# =============================================================================== +""" +BDD step definitions for chemistry backfill features. + +These steps are designed to be reusable across all chemistry types +(Radionuclides, MajorChemistry, MinorTraceChemistry, FieldParameters). +""" + +import uuid +from datetime import datetime, timezone + +from behave import given, when, then +from behave.runner import Context +from sqlalchemy import select + +from db import Location, Thing, LocationThingAssociation +from db.analysis_method import AnalysisMethod +from db.engine import session_ctx +from db.field import FieldEvent, FieldActivity +from db.nma_legacy import NMA_Chemistry_SampleInfo, NMA_Radionuclides +from db.notes import Notes +from db.observation import Observation +from db.parameter import Parameter +from db.sample import Sample + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- +def _ensure_backfill_tracking(context: Context): + """Initialize per-scenario tracking lists for cleanup.""" + if not hasattr(context, "_backfill_created"): + context._backfill_created = { + "observation_ids": [], + "note_ids": [], + "analysis_method_ids": [], + "sample_ids": [], + "field_activity_ids": [], + "field_event_ids": [], + "nma_radionuclide_ids": [], + "nma_sampleinfo_ids": [], + "location_ids": [], + "well_ids": [], + } + + +def _ensure_well(context: Context): + """Ensure context.objects['wells'] has at least one well. + + When running without DROP_AND_REBUILD_DB the before_all fixture doesn't + create wells. This helper creates a minimal Location → Thing chain so + the chemistry-backfill steps can link NMA_Chemistry_SampleInfo rows to a + Thing. 
+ """ + if not hasattr(context, "objects"): + context.objects = {} + if context.objects.get("wells"): + return # already have one + + _ensure_backfill_tracking(context) + with session_ctx() as session: + loc = Location( + point="POINT(-107.949533 33.809665)", + elevation=2464.9, + release_status="draft", + ) + session.add(loc) + session.flush() + context._backfill_created["location_ids"].append(loc.id) + + well = Thing( + name="CHEM-TEST-0001", + first_visit_date="2023-03-03", + thing_type="water well", + release_status="draft", + well_depth=10, + hole_depth=10, + ) + session.add(well) + session.flush() + context._backfill_created["well_ids"].append(well.id) + + assoc = LocationThingAssociation(location=loc, thing=well) + assoc.effective_start = "2025-02-01T00:00:00Z" + session.add(assoc) + session.commit() + session.refresh(well) + + if "wells" not in context.objects: + context.objects["wells"] = [] + context.objects["wells"].append(well) + + +def _parse_table_value(raw: str): + """Strip a table cell and map the literal 'null' (any case) to None. + + No numeric coercion happens here; every other cell is returned as a + stripped string and callers convert it themselves. + """ + if raw.strip().lower() == "null": + return None + return raw.strip() + + +def _get_observation_by_globalid(session, global_id: str) -> Observation: + """Return the Observation whose nma_pk_chemistryresults equals the lower-cased GlobalID. + + Raises AssertionError when no matching Observation exists. + """ + obs = session.execute( + select(Observation).where( + Observation.nma_pk_chemistryresults == global_id.lower() + ) + ).scalar_one_or_none() + assert obs is not None, ( + f"No Observation found with nma_pk_chemistryresults='{global_id.lower()}'" + ) + return obs + + +# --------------------------------------------------------------------------- +# GIVEN steps +# --------------------------------------------------------------------------- +@given("a database session is available") +def step_given_db_session(context: Context): + """Verify session_ctx works and init context attributes.""" + _ensure_backfill_tracking(context) + with session_ctx() as session: + assert session is not None, "Database session should be available" + context.backfill_result = None + 
context.chemistry_type = None + + +@given("legacy NMA_Radionuclides records exist in the database") +def step_given_legacy_radionuclides_exist(context: Context): + """Marker step — sets chemistry_type on context.""" + _ensure_backfill_tracking(context) + context.chemistry_type = "Radionuclides" + + +@given( + 'lexicon terms exist for parameter_name, unit, analysis_method_type, and sample_matrix "water"' +) +def step_given_lexicon_terms_exist(context: Context): + """Fail-fast check that critical lexicon terms are seeded. + + Also seeds test-only analyte names so the backfill can create + Parameter records without requiring production lexicon data. + """ + _ensure_backfill_tracking(context) + with session_ctx() as session: + from db.lexicon import LexiconTerm + + # Terms that must already exist (seeded by init_lexicon) + for term in ("water", "pCi/L", "Chemistry Observation"): + row = session.execute( + select(LexiconTerm).where(LexiconTerm.term == term) + ).scalar_one_or_none() + assert row is not None, ( + f"Required lexicon term '{term}' not found. " + "Ensure init_lexicon() ran during before_all." + ) + + # Test-only analyte names used by scenarios — seed if absent. + # In production these would be pre-seeded via a vocabulary + # preparation step (see issue #XXX). 
+ test_analytes = ( + "GB", "Uranium", "GA", "Ra228", "Uranium-238", + "Radium-226", "Nitrate", "Unknown", + ) + for analyte in test_analytes: + existing = session.execute( + select(LexiconTerm).where(LexiconTerm.term == analyte) + ).scalar_one_or_none() + if existing is None: + session.add(LexiconTerm(term=analyte, definition=analyte)) + session.commit() + + +@given("a legacy NMA_Radionuclides record exists with:") +def step_given_single_radionuclide(context: Context): + """Create a single NMA_Radionuclides row from a vertical Behave table.""" + _ensure_backfill_tracking(context) + fields = {row["field"]: _parse_table_value(row["value"]) for row in context.table} + _create_radionuclide_from_fields(context, fields) + + +@given("legacy NMA_Radionuclides records exist with:") +def step_given_multi_radionuclides(context: Context): + """Create multiple NMA_Radionuclides rows from a horizontal Behave table.""" + _ensure_backfill_tracking(context) + for row in context.table: + fields = {heading: _parse_table_value(row[heading]) for heading in context.table.headings} + _create_radionuclide_from_fields(context, fields) + + +@given("a legacy Chemistry_SampleInfo record exists with:") +def step_given_sampleinfo(context: Context): + """Create an NMA_Chemistry_SampleInfo row from a vertical Behave table.""" + _ensure_backfill_tracking(context) + _ensure_well(context) + fields = {row["field"]: _parse_table_value(row["value"]) for row in context.table} + + with session_ctx() as session: + well = context.objects["wells"][0] + well = session.merge(well) + + sampleinfo = NMA_Chemistry_SampleInfo( + thing_id=well.id, + nma_sample_pt_id=fields.get("SamplePtID"), + nma_sample_point_id=fields.get("SamplePointID", "UNKNOWN"), + ) + session.add(sampleinfo) + session.commit() + session.refresh(sampleinfo) + context._backfill_created["nma_sampleinfo_ids"].append(sampleinfo.id) + + # Store for later use + if not hasattr(context, "_sampleinfo_map"): + context._sampleinfo_map = {} + 
context._sampleinfo_map[fields["SamplePtID"]] = sampleinfo.id + + +@given('a Sample record exists with nma_pk_chemistrysample "{sample_pt_id}"') +def step_given_sample_with_chemistry_key(context: Context, sample_pt_id: str): + """Create FieldEvent → FieldActivity → Sample chain linked to first well.""" + _ensure_backfill_tracking(context) + _ensure_well(context) + + with session_ctx() as session: + well = context.objects["wells"][0] + well = session.merge(well) + + fe = FieldEvent( + thing_id=well.id, + event_date=datetime(2005, 1, 1, tzinfo=timezone.utc), + release_status="draft", + ) + session.add(fe) + session.flush() + context._backfill_created["field_event_ids"].append(fe.id) + + fa = FieldActivity( + field_event_id=fe.id, + activity_type="water chemistry", + release_status="draft", + ) + session.add(fa) + session.flush() + context._backfill_created["field_activity_ids"].append(fa.id) + + sample = Sample( + field_activity_id=fa.id, + sample_date=datetime(2005, 1, 1, tzinfo=timezone.utc), + sample_name=f"CHEM-{uuid.uuid4().hex[:8]}", + sample_matrix="water", + sample_method="Estimated", + qc_type="Normal", + nma_pk_chemistrysample=sample_pt_id, + release_status="draft", + ) + session.add(sample) + session.commit() + session.refresh(sample) + context._backfill_created["sample_ids"].append(sample.id) + + +# --------------------------------------------------------------------------- +# WHEN steps +# --------------------------------------------------------------------------- +@when("I run the Radionuclides backfill job") +def step_when_run_backfill(context: Context): + """Execute the backfill and store the result on context.""" + from transfers.backfill.chemistry_backfill import backfill_radionuclides + + context.backfill_result = backfill_radionuclides() + + +@when("I run the Radionuclides backfill job again") +def step_when_run_backfill_again(context: Context): + """Re-run to verify idempotency.""" + from transfers.backfill.chemistry_backfill import 
backfill_radionuclides + + context.backfill_result = backfill_radionuclides() + + +# --------------------------------------------------------------------------- +# THEN steps +# --------------------------------------------------------------------------- +@then( + 'exactly {n:d} Observation record should exist with nma_pk_chemistryresults "{global_id}"' +) +def step_then_observation_count(context: Context, n: int, global_id: str): + with session_ctx() as session: + count = session.execute( + select(Observation) + .where(Observation.nma_pk_chemistryresults == global_id.lower()) + ).all() + assert len(count) == n, ( + f"Expected {n} Observation(s) for GlobalID {global_id}, found {len(count)}" + ) + if len(count) == 1: + context._last_observation = count[0][0] + + +@then( + 'the Observation should reference the Sample with nma_pk_chemistrysample "{sample_pt_id}"' +) +def step_then_observation_refs_sample(context: Context, sample_pt_id: str): + obs = context._last_observation + with session_ctx() as session: + obs = session.merge(obs) + sample = session.get(Sample, obs.sample_id) + assert sample is not None, "Observation must reference a Sample" + assert sample.nma_pk_chemistrysample == sample_pt_id, ( + f"Expected Sample.nma_pk_chemistrysample={sample_pt_id}, " + f"got {sample.nma_pk_chemistrysample}" + ) + + +@then('the Observation should set observation_datetime to "{date_str}"') +def step_then_observation_datetime(context: Context, date_str: str): + obs = context._last_observation + with session_ctx() as session: + obs = session.merge(obs) + dt = obs.observation_datetime + # Normalize to UTC for comparison (pg may return in server tz) + if dt.tzinfo is not None: + dt = dt.astimezone(timezone.utc) + actual_date = dt.strftime("%Y-%m-%d") + assert actual_date == date_str, ( + f"Expected observation_datetime={date_str}, got {actual_date}" + ) + + +@then("the Observation should set value to {value}") +def step_then_observation_value(context: Context, value: str): + obs = 
context._last_observation + expected = float(value) + with session_ctx() as session: + obs = session.merge(obs) + assert obs.value == expected, ( + f"Expected value={expected}, got {obs.value}" + ) + + +@then('the Observation should set unit to "{unit}"') +def step_then_observation_unit(context: Context, unit: str): + obs = context._last_observation + with session_ctx() as session: + obs = session.merge(obs) + assert obs.unit == unit, f"Expected unit={unit}, got {obs.unit}" + + +@then( + 'a Parameter record should exist with parameter_name "{name}" and matrix "{matrix}"' +) +def step_then_parameter_exists(context: Context, name: str, matrix: str): + with session_ctx() as session: + param = session.execute( + select(Parameter).where( + Parameter.parameter_name == name, Parameter.matrix == matrix + ) + ).scalar_one_or_none() + assert param is not None, ( + f"No Parameter found with parameter_name='{name}' and matrix='{matrix}'" + ) + context._last_parameter = param + + +@then( + 'the Observation should reference the Parameter with parameter_name "{name}" and matrix "{matrix}"' +) +def step_then_observation_refs_parameter(context: Context, name: str, matrix: str): + obs = context._last_observation + with session_ctx() as session: + obs = session.merge(obs) + param = session.get(Parameter, obs.parameter_id) + assert param is not None, "Observation must reference a Parameter" + assert param.parameter_name == name, ( + f"Expected parameter_name='{name}', got '{param.parameter_name}'" + ) + assert param.matrix == matrix, ( + f"Expected matrix='{matrix}', got '{param.matrix}'" + ) + + +@then('the Observation should set analysis_method_name to "{method}"') +def step_then_observation_analysis_method(context: Context, method: str): + obs = context._last_observation + with session_ctx() as session: + obs = session.merge(obs) + assert obs.analysis_method_id is not None, ( + "Observation should have an analysis_method_id" + ) + am = session.get(AnalysisMethod, 
obs.analysis_method_id) + assert am is not None, "AnalysisMethod should exist" + assert am.analysis_method_name == method, ( + f"Expected analysis_method_name='{method}', got '{am.analysis_method_name}'" + ) + + +@then("the Observation should set uncertainty to {value}") +def step_then_observation_uncertainty(context: Context, value: str): + obs = context._last_observation + expected = float(value) + with session_ctx() as session: + obs = session.merge(obs) + assert obs.uncertainty == expected, ( + f"Expected uncertainty={expected}, got {obs.uncertainty}" + ) + + +@then('the Observation should set analysis_agency to "{agency}"') +def step_then_observation_analysis_agency(context: Context, agency: str): + obs = context._last_observation + with session_ctx() as session: + obs = session.merge(obs) + assert obs.analysis_agency == agency, ( + f"Expected analysis_agency='{agency}', got '{obs.analysis_agency}'" + ) + + +# --- GlobalID-variant steps --- +@then( + 'the Observation for GlobalID "{global_id}" should reference the Sample with nma_pk_chemistrysample "{sample_pt_id}"' +) +def step_then_obs_by_gid_refs_sample( + context: Context, global_id: str, sample_pt_id: str +): + with session_ctx() as session: + obs = _get_observation_by_globalid(session, global_id) + sample = session.get(Sample, obs.sample_id) + assert sample is not None + assert sample.nma_pk_chemistrysample == sample_pt_id, ( + f"Expected Sample.nma_pk_chemistrysample={sample_pt_id}, " + f"got {sample.nma_pk_chemistrysample}" + ) + + +@then( + 'the Observation for GlobalID "{global_id}" should reference the Thing associated with that Sample' +) +def step_then_obs_by_gid_refs_thing(context: Context, global_id: str): + with session_ctx() as session: + obs = _get_observation_by_globalid(session, global_id) + sample = session.get(Sample, obs.sample_id) + assert sample is not None + fa = sample.field_activity + assert fa is not None, "Sample must have a field_activity" + fe = fa.field_event + assert fe is not 
None, "FieldActivity must have a field_event" + assert fe.thing_id is not None, "FieldEvent must reference a Thing" + + +@then( + 'the Observation for GlobalID "{global_id}" should set analysis_method_name to "{method}"' +) +def step_then_obs_by_gid_analysis_method( + context: Context, global_id: str, method: str +): + with session_ctx() as session: + obs = _get_observation_by_globalid(session, global_id) + assert obs.analysis_method_id is not None + am = session.get(AnalysisMethod, obs.analysis_method_id) + assert am is not None + assert am.analysis_method_name == method, ( + f"Expected analysis_method_name='{method}', got '{am.analysis_method_name}'" + ) + + +@then( + 'the Observation for GlobalID "{global_id}" should reference the Parameter with parameter_name "{name}" and matrix "{matrix}"' +) +def step_then_obs_by_gid_refs_parameter( + context: Context, global_id: str, name: str, matrix: str +): + with session_ctx() as session: + obs = _get_observation_by_globalid(session, global_id) + param = session.get(Parameter, obs.parameter_id) + assert param is not None + assert param.parameter_name == name + assert param.matrix == matrix + + +@then( + 'the Observation for GlobalID "{global_id}" should set detect_flag to {flag}' +) +def step_then_obs_by_gid_detect_flag(context: Context, global_id: str, flag: str): + """Assert the tri-state detect_flag of the Observation for a GlobalID. + + The step text "true"/"false" (any case) maps to True/False; any other + text such as "null" maps to None, the "no qualifier" state. + """ + expected = {"true": True, "false": False}.get(flag.strip().lower()) + with session_ctx() as session: + obs = _get_observation_by_globalid(session, global_id) + # Identity comparison keeps None, False and True distinct. + assert obs.detect_flag is expected, ( + f"Expected detect_flag={expected}, got {obs.detect_flag}" + ) + + +@then( + 'the Observation for GlobalID "{global_id}" should not store SamplePointID, OBJECTID, or WCLab_ID' +) +def step_then_obs_by_gid_no_extra_columns(context: Context, global_id: str): + with session_ctx() as session: + obs = _get_observation_by_globalid(session, global_id) + for attr in ("nma_sample_point_id", "nma_object_id", "nma_wclab_id"): + assert not hasattr(obs, attr), ( + f"Observation should not have attribute 
'{attr}'" + ) + + +# --- Sample volume steps --- +@then("the Sample should set volume to {value}") +def step_then_sample_volume(context: Context, value: str): + """Assert the volume of the highest-id Sample that has a volume set.""" + expected = float(value) + with session_ctx() as session: + # Order by id so samples[-1] is deterministically the newest Sample; + # without ORDER BY the database may return rows in any order. + samples = session.execute( + select(Sample).where(Sample.volume.isnot(None)).order_by(Sample.id) + ).scalars().all() + assert len(samples) >= 1, "Expected at least one Sample with volume set" + sample = samples[-1] + assert sample.volume == expected, ( + f"Expected volume={expected}, got {sample.volume}" + ) + + +@then('the Sample should set volume_unit to "{unit}"') +def step_then_sample_volume_unit(context: Context, unit: str): + """Assert the volume_unit of the highest-id Sample that has a volume_unit set.""" + with session_ctx() as session: + # Same deterministic ordering as the volume step above in this file. + samples = session.execute( + select(Sample).where(Sample.volume_unit.isnot(None)).order_by(Sample.id) + ).scalars().all() + assert len(samples) >= 1, "Expected at least one Sample with volume_unit set" + sample = samples[-1] + assert sample.volume_unit == unit, ( + f"Expected volume_unit='{unit}', got '{sample.volume_unit}'" + ) + + +# --- Notes steps --- +@then("a Notes record should exist with:") +def step_then_notes_exist(context: Context): + fields = {row["field"]: row["value"] for row in context.table} + target_table = fields["target_table"] + note_type = fields["note_type"] + content = fields["content"] + + # Resolve dynamic target_id + target_id_raw = fields["target_id"] + if target_id_raw.startswith("(observation.id for GlobalID"): + gid = target_id_raw.split("GlobalID")[1].strip().rstrip(")") + with session_ctx() as session: + obs = _get_observation_by_globalid(session, gid) + target_id = obs.id + else: + target_id = int(target_id_raw) + + with session_ctx() as session: + note = session.execute( + select(Notes).where( + Notes.target_id == target_id, + Notes.target_table == target_table, + Notes.note_type == note_type, + Notes.content == content, + ) + ).scalar_one_or_none() + assert note is not None, ( + f"No Notes found with target_table={target_table}, target_id={target_id}, 
" + f"note_type={note_type}, content={content}" + ) + + +# --- Orphan prevention steps --- +@then( + "the backfill job should report {n:d} skipped record due to missing Sample linkage (SamplePtID)" +) +def step_then_skipped_orphans(context: Context, n: int): + assert context.backfill_result is not None, "Backfill result not found on context" + assert context.backfill_result.skipped_orphans == n, ( + f"Expected {n} skipped orphans, got {context.backfill_result.skipped_orphans}" + ) + + +@then( + 'no Observation record should exist with nma_pk_chemistryresults "{global_id}"' +) +def step_then_no_observation(context: Context, global_id: str): + with session_ctx() as session: + count = session.execute( + select(Observation).where( + Observation.nma_pk_chemistryresults == global_id.lower() + ) + ).all() + assert len(count) == 0, ( + f"Expected 0 Observations for GlobalID {global_id}, found {len(count)}" + ) + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- +def _create_radionuclide_from_fields(context: Context, fields: dict): + """Create NMA_Chemistry_SampleInfo (if needed) and NMA_Radionuclides row.""" + _ensure_well(context) + global_id = fields.get("GlobalID") + sample_pt_id = fields.get("SamplePtID") + + with session_ctx() as session: + well = context.objects["wells"][0] + well = session.merge(well) + + # Get or create SampleInfo for this SamplePtID + sampleinfo = session.execute( + select(NMA_Chemistry_SampleInfo).where( + NMA_Chemistry_SampleInfo.nma_sample_pt_id == sample_pt_id + ) + ).scalar_one_or_none() + + if sampleinfo is None: + sampleinfo = NMA_Chemistry_SampleInfo( + thing_id=well.id, + nma_sample_pt_id=sample_pt_id, + nma_sample_point_id=fields.get("SamplePointID", "UNKNOWN"), + ) + session.add(sampleinfo) + session.flush() + context._backfill_created["nma_sampleinfo_ids"].append(sampleinfo.id) + + # Parse analysis date 
(store as UTC so the backfill's tz handling is a no-op) + analysis_date = None + if fields.get("AnalysisDate"): + analysis_date = datetime.strptime( + fields["AnalysisDate"], "%Y-%m-%d" + ).replace(tzinfo=timezone.utc) + + # Parse numeric fields + sample_value = float(fields["SampleValue"]) if fields.get("SampleValue") else None + uncertainty_val = float(fields["Uncertainty"]) if fields.get("Uncertainty") else None + volume_val = int(fields["Volume"]) if fields.get("Volume") else None + + rad = NMA_Radionuclides( + chemistry_sample_info_id=sampleinfo.id, + nma_global_id=global_id, + nma_sample_pt_id=sample_pt_id, + nma_sample_point_id=fields.get("SamplePointID"), + nma_object_id=int(fields["OBJECTID"]) if fields.get("OBJECTID") else None, + nma_wclab_id=fields.get("WCLab_ID"), + analyte=fields.get("Analyte"), + symbol=fields.get("Symbol"), + sample_value=sample_value, + units=fields.get("Units"), + uncertainty=uncertainty_val, + analysis_method=fields.get("AnalysisMethod"), + analysis_date=analysis_date, + notes=fields.get("Notes"), + volume=volume_val, + volume_unit=fields.get("VolumeUnit"), + analyses_agency=fields.get("AnalysesAgency"), + ) + session.add(rad) + session.commit() + session.refresh(rad) + context._backfill_created["nma_radionuclide_ids"].append(rad.id) + + +# ============= EOF ============================================= diff --git a/transfers/backfill/backfill.py b/transfers/backfill/backfill.py index fc7f50268..94734621b 100644 --- a/transfers/backfill/backfill.py +++ b/transfers/backfill/backfill.py @@ -30,20 +30,17 @@ sys.path.insert(0, str(ROOT)) from services.util import get_bool_env +from transfers.backfill.chemistry_backfill import backfill_radionuclides from transfers.logger import logger def run(batch_size: int = 1000) -> None: """ Execute all backfill steps in a deterministic order. - - Currently, no concrete backfill steps are registered. 
This function is kept - as a stable orchestration entry point (used by CD/CLI) and will be wired - up to real backfill steps in future refactoring work. """ - # NOTE: Intentionally empty; this serves as a placeholder until concrete - # backfill steps are implemented and registered in this tuple. - steps = () + steps = ( + ("Radionuclides", backfill_radionuclides, "BACKFILL_RADIONUCLIDES"), + ) for name, fn, flag in steps: if not get_bool_env(flag, True): logger.info(f"Skipping backfill: {name} ({flag}=false)") diff --git a/transfers/backfill/chemistry_backfill.py b/transfers/backfill/chemistry_backfill.py new file mode 100644 index 000000000..a1cbff0b2 --- /dev/null +++ b/transfers/backfill/chemistry_backfill.py @@ -0,0 +1,305 @@ +# =============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== +""" +Backfill NMA_Radionuclides rows into the Ocotillo Observation schema. + +Reads legacy NMA_Radionuclides + NMA_Chemistry_SampleInfo, resolves each +row to an existing Sample (via nma_pk_chemistrysample), and upserts +Observation records keyed on nma_pk_chemistryresults. 
+""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import timezone + +from sqlalchemy import select +from sqlalchemy.dialects.postgresql import insert as pg_insert +from sqlalchemy.orm import Session + +from db.analysis_method import AnalysisMethod +from db.notes import Notes +from db.nma_legacy import NMA_Radionuclides, NMA_Chemistry_SampleInfo +from db.observation import Observation +from db.parameter import Parameter +from db.sample import Sample +from db.engine import session_ctx +from transfers.logger import logger + + +@dataclass +class BackfillResult: + inserted: int = 0 + updated: int = 0 + skipped_orphans: int = 0 + errors: list[str] = field(default_factory=list) + + +def _build_sample_cache(session: Session) -> dict[str, int]: + """Map Sample.nma_pk_chemistrysample → Sample.id for rows that have the key.""" + rows = session.execute( + select(Sample.nma_pk_chemistrysample, Sample.id).where( + Sample.nma_pk_chemistrysample.isnot(None) + ) + ).all() + return {str(key).lower(): sid for key, sid in rows} + + +def _get_or_create_parameter( + session: Session, parameter_name: str, matrix: str = "water" +) -> Parameter: + """Return existing Parameter or create a new one. + + The parameter_name must already exist as a LexiconTerm (FK constraint). + If it doesn't, the insert will fail — callers should ensure vocabulary + is seeded before running the backfill. 
+ """ + param = session.execute( + select(Parameter).where( + Parameter.parameter_name == parameter_name, + Parameter.matrix == matrix, + ) + ).scalar_one_or_none() + if param is None: + param = Parameter( + parameter_name=parameter_name, + matrix=matrix, + default_unit="pCi/L", + release_status="draft", + ) + session.add(param) + session.flush() + return param + + +def _get_or_create_analysis_method( + session: Session, method_code: str +) -> AnalysisMethod: + """Return existing AnalysisMethod or create a new one.""" + am = session.execute( + select(AnalysisMethod).where( + AnalysisMethod.analysis_method_code == method_code + ) + ).scalar_one_or_none() + if am is None: + am = AnalysisMethod( + analysis_method_code=method_code, + analysis_method_name=method_code, + release_status="draft", + ) + session.add(am) + session.flush() + return am + + +def _resolve_detect_flag(symbol: str | None, value: float | None) -> bool | None: + """Map legacy Symbol qualifier to detect_flag. + + Rules: + - Symbol == '<' → False (below detection limit) + - Symbol is non-null and != '<' → True (detected, other qualifier) + - Symbol is null and value is not None → True (detected, no qualifier) + - Symbol is null and value is None → None (unknown) + """ + if symbol is not None: + return symbol != "<" + if value is not None: + return True + return None + + +def _backfill_radionuclides_impl(session: Session, batch_size: int) -> BackfillResult: + result = BackfillResult() + + sample_cache = _build_sample_cache(session) + logger.info("Sample cache built with %d entries", len(sample_cache)) + + # Track existing observation keys so we know insert vs update + existing_keys = set( + row[0] + for row in session.execute( + select(Observation.nma_pk_chemistryresults).where( + Observation.nma_pk_chemistryresults.isnot(None) + ) + ).all() + ) + + # Query all radionuclides joined with sample info for nma_sample_pt_id + legacy_rows = session.execute( + select(NMA_Radionuclides, 
NMA_Chemistry_SampleInfo.nma_sample_pt_id).join( + NMA_Chemistry_SampleInfo, + NMA_Radionuclides.chemistry_sample_info_id + == NMA_Chemistry_SampleInfo.id, + ) + ).all() + + logger.info("Processing %d legacy Radionuclides rows", len(legacy_rows)) + + for row, nma_sample_pt_id in legacy_rows: + global_id_str = str(row.nma_global_id).lower() if row.nma_global_id else None + if global_id_str is None: + result.errors.append(f"Row id={row.id} has no nma_global_id") + continue + + # Resolve Sample via nma_pk_chemistrysample + sample_pt_key = str(nma_sample_pt_id).lower() if nma_sample_pt_id else None + if sample_pt_key is None or sample_pt_key not in sample_cache: + result.skipped_orphans += 1 + logger.debug( + "Skipping Radionuclide GlobalID=%s — no matching Sample for SamplePtID=%s", + global_id_str, + sample_pt_key, + ) + continue + + sample_id = sample_cache[sample_pt_key] + + # Get-or-create Parameter + analyte = row.analyte + if not analyte: + result.errors.append( + f"Row GlobalID={global_id_str} has no Analyte — skipping" + ) + continue + param = _get_or_create_parameter(session, analyte, "water") + + # Get-or-create AnalysisMethod + analysis_method_id = None + if row.analysis_method: + am = _get_or_create_analysis_method(session, row.analysis_method) + analysis_method_id = am.id + + # Resolve detect_flag + detect_flag = _resolve_detect_flag(row.symbol, row.sample_value) + + # Build observation_datetime — use analysis_date or fallback + obs_dt = row.analysis_date + if obs_dt is not None: + if obs_dt.tzinfo is None: + obs_dt = obs_dt.replace(tzinfo=timezone.utc) + else: + # If no analysis date, we cannot create an observation + result.errors.append( + f"Row GlobalID={global_id_str} has no AnalysisDate — using epoch" + ) + from datetime import datetime + + obs_dt = datetime(1970, 1, 1, tzinfo=timezone.utc) + + # Determine unit + unit = row.units if row.units else "pCi/L" + + # Upsert Observation + obs_values = { + "nma_pk_chemistryresults": global_id_str, + 
"sample_id": sample_id, + "parameter_id": param.id, + "analysis_method_id": analysis_method_id, + "observation_datetime": obs_dt, + "value": row.sample_value, + "unit": unit, + "detect_flag": detect_flag, + "uncertainty": row.uncertainty, + "analysis_agency": row.analyses_agency, + "release_status": "draft", + } + + stmt = pg_insert(Observation).values(**obs_values) + stmt = stmt.on_conflict_do_update( + index_elements=["nma_pk_chemistryresults"], + set_={ + "sample_id": stmt.excluded.sample_id, + "parameter_id": stmt.excluded.parameter_id, + "analysis_method_id": stmt.excluded.analysis_method_id, + "observation_datetime": stmt.excluded.observation_datetime, + "value": stmt.excluded.value, + "unit": stmt.excluded.unit, + "detect_flag": stmt.excluded.detect_flag, + "uncertainty": stmt.excluded.uncertainty, + "analysis_agency": stmt.excluded.analysis_agency, + }, + ) + session.execute(stmt) + + if global_id_str in existing_keys: + result.updated += 1 + else: + result.inserted += 1 + existing_keys.add(global_id_str) + + # Update Sample volume/volume_unit if present on legacy row + if row.volume is not None or row.volume_unit is not None: + sample = session.get(Sample, sample_id) + if sample: + if row.volume is not None: + sample.volume = float(row.volume) + if row.volume_unit is not None: + sample.volume_unit = row.volume_unit + + # Create Notes if present + if row.notes: + # Look up the observation we just upserted + obs = session.execute( + select(Observation).where( + Observation.nma_pk_chemistryresults == global_id_str + ) + ).scalar_one_or_none() + + if obs is None: + result.errors.append( + f"Row GlobalID={global_id_str}: upserted Observation not found " + f"when creating Notes — skipping note" + ) + continue + + # Check if note already exists for this observation + existing_note = session.execute( + select(Notes).where( + Notes.target_id == obs.id, + Notes.target_table == "observation", + Notes.note_type == "Chemistry Observation", + Notes.content == row.notes, 
+ ) + ).scalar_one_or_none() + + if existing_note is None: + note = Notes( + target_id=obs.id, + target_table="observation", + note_type="Chemistry Observation", + content=row.notes, + release_status="draft", + ) + session.add(note) + + session.commit() + logger.info( + "Backfill complete: inserted=%d updated=%d skipped_orphans=%d errors=%d", + result.inserted, + result.updated, + result.skipped_orphans, + len(result.errors), + ) + return result + + +def backfill_radionuclides(batch_size: int = 1000) -> BackfillResult: + """Top-level runner for the radionuclides backfill.""" + with session_ctx() as session: + return _backfill_radionuclides_impl(session, batch_size) + + +# ============= EOF ============================================= From 90e87b7e255c993d4ed0ea13f9c746f436bfc120 Mon Sep 17 00:00:00 2001 From: Kimball Bighorse Date: Fri, 27 Feb 2026 19:38:21 -0800 Subject: [PATCH 02/13] fix: harden chemistry backfill test cleanup - Wrap cleanup chain in try/except to prevent orphan data on failure (#566) - Scope AnalysisMethod cleanup to test-created IDs only (#567) Also addresses #565 (scalar_one hardening) and #568 (remove unused parameter_ids tracker) which were applied to the new files in the preceding feature commit. 
Refs #558 Co-Authored-By: Claude Opus 4.6 --- tests/features/environment.py | 104 ++++++++++++++++++++++++++++++++-- 1 file changed, 100 insertions(+), 4 deletions(-) diff --git a/tests/features/environment.py b/tests/features/environment.py index 4f3a6d2b5..3eb5c3bb5 100644 --- a/tests/features/environment.py +++ b/tests/features/environment.py @@ -51,6 +51,11 @@ Sample, Base, ) +from db.analysis_method import AnalysisMethod +from db.field import FieldEvent, FieldActivity +from db.nma_legacy import NMA_Radionuclides, NMA_Chemistry_SampleInfo +from db.notes import Notes +from db.observation import Observation from db.engine import session_ctx from db.initialization import recreate_public_schema, sync_search_vector_triggers from services.util import get_bool_env @@ -741,15 +746,107 @@ def before_scenario(context, scenario): def after_scenario(context, scenario): + # Chemistry backfill cleanup — runs regardless of DROP_AND_REBUILD_DB + # because the backfill steps create their own fixture data. 
+ if hasattr(context, "_backfill_created"): + try: + with session_ctx() as session: + created = context._backfill_created + + # Delete in FK order: Notes → Observations → Samples → FieldActivities → FieldEvents → NMA rows + # First, delete Notes linked to observations we created + obs_ids = [ + row[0] + for row in session.execute( + select(Observation.id).where( + Observation.nma_pk_chemistryresults.isnot(None) + ) + ).all() + ] + if obs_ids: + for note in session.query(Notes).filter( + Notes.target_table == "observation", + Notes.target_id.in_(obs_ids), + ).all(): + session.delete(note) + + # Delete observations created by backfill + for obs in session.query(Observation).filter( + Observation.nma_pk_chemistryresults.isnot(None) + ).all(): + session.delete(obs) + + # Delete NMA_Radionuclides created in this scenario + for rid in created.get("nma_radionuclide_ids", []): + row = session.get(NMA_Radionuclides, rid) + if row: + session.delete(row) + + # Delete NMA_Chemistry_SampleInfo created in this scenario + for sid in created.get("nma_sampleinfo_ids", []): + row = session.get(NMA_Chemistry_SampleInfo, sid) + if row: + session.delete(row) + + # Delete Samples (cascades to observations already deleted) + for sid in created.get("sample_ids", []): + row = session.get(Sample, sid) + if row: + session.delete(row) + + # Delete FieldActivities + for faid in created.get("field_activity_ids", []): + row = session.get(FieldActivity, faid) + if row: + session.delete(row) + + # Delete FieldEvents + for feid in created.get("field_event_ids", []): + row = session.get(FieldEvent, feid) + if row: + session.delete(row) + + # Clean up AnalysisMethods created during this scenario's backfill + for amid in created.get("analysis_method_ids", []): + row = session.get(AnalysisMethod, amid) + if row: + session.delete(row) + + # Delete Things (wells) created by _ensure_well + for wid in created.get("well_ids", []): + # Delete LocationThingAssociation first + session.execute( + 
LocationThingAssociation.__table__.delete().where( + LocationThingAssociation.thing_id == wid + ) + ) + row = session.get(Thing, wid) + if row: + session.delete(row) + + # Delete Locations created by _ensure_well + for lid in created.get("location_ids", []): + row = session.get(Location, lid) + if row: + session.delete(row) + + session.commit() + except Exception: + import logging + + logging.getLogger(__name__).error( + "Chemistry backfill cleanup failed for scenario '%s'", + scenario.name, + exc_info=True, + ) + if not get_bool_env("DROP_AND_REBUILD_DB"): return - # runs after EVERY scenario - # e.g. clean up temp files, close db sessions + # runs after EVERY scenario (only when DROP_AND_REBUILD_DB is set) if scenario.name.startswith( "Successfully upload and associate assets from a valid manifest" ): - # delete all the assets uploaded for this scenario with session_ctx() as session: for uri in context.uris: sql = select(Asset).where(Asset.uri == uri) @@ -757,7 +854,6 @@ def after_scenario(context, scenario): session.delete(asset) session.commit() elif "cleanup_samples" in scenario.tags: - # delete all samples created during happy path tests with session_ctx() as session: samples = session.query(Sample).all() for sample in samples: From 03df1f1b79b5ee728c84fff532cc5d40f237f303 Mon Sep 17 00:00:00 2001 From: Kimball Bighorse Date: Fri, 27 Feb 2026 19:48:25 -0800 Subject: [PATCH 03/13] fix: harden backfill robustness and remove misleading batch_size - Add per-row savepoint isolation so one bad row doesn't abort the run - Skip rows with missing AnalysisDate instead of inserting epoch sentinel - Remove unused batch_size parameter from backfill signatures and CLI - Fix migration downgrade to backfill thing_id before setting NOT NULL - Track analysis_method_ids in test step defs for scoped cleanup Refs #558 Co-Authored-By: Claude Opus 4.6 --- ...e5e8_add_chemistry_backfill_columns_to_.py | 11 +- tests/features/steps/chemistry-backfill.py | 19 ++ 
transfers/backfill/backfill.py | 25 +- transfers/backfill/chemistry_backfill.py | 217 +++++++++--------- 4 files changed, 145 insertions(+), 127 deletions(-) diff --git a/alembic/versions/545a5b77e5e8_add_chemistry_backfill_columns_to_.py b/alembic/versions/545a5b77e5e8_add_chemistry_backfill_columns_to_.py index 850847747..2b67f23de 100644 --- a/alembic/versions/545a5b77e5e8_add_chemistry_backfill_columns_to_.py +++ b/alembic/versions/545a5b77e5e8_add_chemistry_backfill_columns_to_.py @@ -88,10 +88,17 @@ def upgrade() -> None: def downgrade() -> None: """Remove chemistry backfill columns.""" - # Restore NMA_Radionuclides.thing_id + # Restore NMA_Radionuclides.thing_id (add nullable, backfill, then enforce) op.add_column('NMA_Radionuclides', sa.Column( - 'thing_id', sa.Integer(), nullable=False, + 'thing_id', sa.Integer(), nullable=True, )) + op.execute( + 'UPDATE "NMA_Radionuclides" r ' + 'SET thing_id = csi.thing_id ' + 'FROM "NMA_Chemistry_SampleInfo" csi ' + 'WHERE r.chemistry_sample_info_id = csi.id' + ) + op.alter_column('NMA_Radionuclides', 'thing_id', nullable=False) op.create_foreign_key( 'NMA_Radionuclides_thing_id_fkey', 'NMA_Radionuclides', 'thing', ['thing_id'], ['id'], ondelete='CASCADE', diff --git a/tests/features/steps/chemistry-backfill.py b/tests/features/steps/chemistry-backfill.py index b80e9e722..a54b6460f 100644 --- a/tests/features/steps/chemistry-backfill.py +++ b/tests/features/steps/chemistry-backfill.py @@ -274,12 +274,30 @@ def step_given_sample_with_chemistry_key(context: Context, sample_pt_id: str): # --------------------------------------------------------------------------- # WHEN steps # --------------------------------------------------------------------------- +def _track_created_analysis_methods(context: Context): + """Record AnalysisMethod IDs created by the backfill for cleanup.""" + _ensure_backfill_tracking(context) + with session_ctx() as session: + # Find methods referenced by backfill-created observations + obs_am_ids = [ 
+ row[0] + for row in session.execute( + select(Observation.analysis_method_id).where( + Observation.nma_pk_chemistryresults.isnot(None), + Observation.analysis_method_id.isnot(None), + ) + ).all() + ] + context._backfill_created["analysis_method_ids"] = list(set(obs_am_ids)) + + @when("I run the Radionuclides backfill job") def step_when_run_backfill(context: Context): """Execute the backfill and store the result on context.""" from transfers.backfill.chemistry_backfill import backfill_radionuclides context.backfill_result = backfill_radionuclides() + _track_created_analysis_methods(context) @when("I run the Radionuclides backfill job again") @@ -288,6 +306,7 @@ def step_when_run_backfill_again(context: Context): from transfers.backfill.chemistry_backfill import backfill_radionuclides context.backfill_result = backfill_radionuclides() + _track_created_analysis_methods(context) # --------------------------------------------------------------------------- diff --git a/transfers/backfill/backfill.py b/transfers/backfill/backfill.py index 94734621b..1d1d4b6ce 100644 --- a/transfers/backfill/backfill.py +++ b/transfers/backfill/backfill.py @@ -17,10 +17,9 @@ Orchestrates the backfill pipeline used in CD workflows. Preferred usage (avoids import path issues): - python -m transfers.backfill.backfill --batch-size 1000 + python -m transfers.backfill.backfill """ -import argparse import sys from pathlib import Path @@ -34,10 +33,8 @@ from transfers.logger import logger -def run(batch_size: int = 1000) -> None: - """ - Execute all backfill steps in a deterministic order. 
- """ +def run() -> None: + """Execute all backfill steps in a deterministic order.""" steps = ( ("Radionuclides", backfill_radionuclides, "BACKFILL_RADIONUCLIDES"), ) @@ -46,7 +43,7 @@ def run(batch_size: int = 1000) -> None: logger.info(f"Skipping backfill: {name} ({flag}=false)") continue logger.info(f"Starting backfill: {name}") - result = fn(batch_size) + result = fn() logger.info( f"Completed backfill: {name} — " f"inserted={result.inserted} updated={result.updated} " @@ -57,21 +54,9 @@ def run(batch_size: int = 1000) -> None: logger.warning(f" {name}: {err}") -def _parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser(description="Run backfill pipeline.") - parser.add_argument( - "--batch-size", - type=int, - default=1000, - help="Number of rows to insert per batch.", - ) - return parser.parse_args() - - if __name__ == "__main__": - args = _parse_args() try: - run(batch_size=args.batch_size) + run() except Exception: logger.critical("Backfill orchestration failed", exc_info=True) sys.exit(1) diff --git a/transfers/backfill/chemistry_backfill.py b/transfers/backfill/chemistry_backfill.py index a1cbff0b2..e61d32c2c 100644 --- a/transfers/backfill/chemistry_backfill.py +++ b/transfers/backfill/chemistry_backfill.py @@ -28,6 +28,7 @@ from sqlalchemy import select from sqlalchemy.dialects.postgresql import insert as pg_insert +from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm import Session from db.analysis_method import AnalysisMethod @@ -121,7 +122,7 @@ def _resolve_detect_flag(symbol: str | None, value: float | None) -> bool | None return None -def _backfill_radionuclides_impl(session: Session, batch_size: int) -> BackfillResult: +def _backfill_radionuclides_impl(session: Session) -> BackfillResult: result = BackfillResult() sample_cache = _build_sample_cache(session) @@ -167,123 +168,129 @@ def _backfill_radionuclides_impl(session: Session, batch_size: int) -> BackfillR sample_id = sample_cache[sample_pt_key] - # Get-or-create 
Parameter analyte = row.analyte if not analyte: result.errors.append( f"Row GlobalID={global_id_str} has no Analyte — skipping" ) continue - param = _get_or_create_parameter(session, analyte, "water") - # Get-or-create AnalysisMethod - analysis_method_id = None - if row.analysis_method: - am = _get_or_create_analysis_method(session, row.analysis_method) - analysis_method_id = am.id - - # Resolve detect_flag - detect_flag = _resolve_detect_flag(row.symbol, row.sample_value) - - # Build observation_datetime — use analysis_date or fallback + # Build observation_datetime obs_dt = row.analysis_date if obs_dt is not None: if obs_dt.tzinfo is None: obs_dt = obs_dt.replace(tzinfo=timezone.utc) else: - # If no analysis date, we cannot create an observation result.errors.append( - f"Row GlobalID={global_id_str} has no AnalysisDate — using epoch" + f"Row GlobalID={global_id_str} has no AnalysisDate — skipping" ) - from datetime import datetime - - obs_dt = datetime(1970, 1, 1, tzinfo=timezone.utc) - - # Determine unit - unit = row.units if row.units else "pCi/L" - - # Upsert Observation - obs_values = { - "nma_pk_chemistryresults": global_id_str, - "sample_id": sample_id, - "parameter_id": param.id, - "analysis_method_id": analysis_method_id, - "observation_datetime": obs_dt, - "value": row.sample_value, - "unit": unit, - "detect_flag": detect_flag, - "uncertainty": row.uncertainty, - "analysis_agency": row.analyses_agency, - "release_status": "draft", - } - - stmt = pg_insert(Observation).values(**obs_values) - stmt = stmt.on_conflict_do_update( - index_elements=["nma_pk_chemistryresults"], - set_={ - "sample_id": stmt.excluded.sample_id, - "parameter_id": stmt.excluded.parameter_id, - "analysis_method_id": stmt.excluded.analysis_method_id, - "observation_datetime": stmt.excluded.observation_datetime, - "value": stmt.excluded.value, - "unit": stmt.excluded.unit, - "detect_flag": stmt.excluded.detect_flag, - "uncertainty": stmt.excluded.uncertainty, - "analysis_agency": 
stmt.excluded.analysis_agency, - }, - ) - session.execute(stmt) + continue - if global_id_str in existing_keys: - result.updated += 1 - else: - result.inserted += 1 - existing_keys.add(global_id_str) - - # Update Sample volume/volume_unit if present on legacy row - if row.volume is not None or row.volume_unit is not None: - sample = session.get(Sample, sample_id) - if sample: - if row.volume is not None: - sample.volume = float(row.volume) - if row.volume_unit is not None: - sample.volume_unit = row.volume_unit - - # Create Notes if present - if row.notes: - # Look up the observation we just upserted - obs = session.execute( - select(Observation).where( - Observation.nma_pk_chemistryresults == global_id_str - ) - ).scalar_one_or_none() - - if obs is None: - result.errors.append( - f"Row GlobalID={global_id_str}: upserted Observation not found " - f"when creating Notes — skipping note" - ) - continue - - # Check if note already exists for this observation - existing_note = session.execute( - select(Notes).where( - Notes.target_id == obs.id, - Notes.target_table == "observation", - Notes.note_type == "Chemistry Observation", - Notes.content == row.notes, - ) - ).scalar_one_or_none() - - if existing_note is None: - note = Notes( - target_id=obs.id, - target_table="observation", - note_type="Chemistry Observation", - content=row.notes, - release_status="draft", - ) - session.add(note) + # Per-row savepoint so one bad row doesn't abort the entire backfill + savepoint = session.begin_nested() + try: + # Get-or-create Parameter + param = _get_or_create_parameter(session, analyte, "water") + + # Get-or-create AnalysisMethod + analysis_method_id = None + if row.analysis_method: + am = _get_or_create_analysis_method(session, row.analysis_method) + analysis_method_id = am.id + + # Resolve detect_flag + detect_flag = _resolve_detect_flag(row.symbol, row.sample_value) + + # Determine unit + unit = row.units if row.units else "pCi/L" + + # Upsert Observation + obs_values = { + 
"nma_pk_chemistryresults": global_id_str, + "sample_id": sample_id, + "parameter_id": param.id, + "analysis_method_id": analysis_method_id, + "observation_datetime": obs_dt, + "value": row.sample_value, + "unit": unit, + "detect_flag": detect_flag, + "uncertainty": row.uncertainty, + "analysis_agency": row.analyses_agency, + "release_status": "draft", + } + + stmt = pg_insert(Observation).values(**obs_values) + stmt = stmt.on_conflict_do_update( + index_elements=["nma_pk_chemistryresults"], + set_={ + "sample_id": stmt.excluded.sample_id, + "parameter_id": stmt.excluded.parameter_id, + "analysis_method_id": stmt.excluded.analysis_method_id, + "observation_datetime": stmt.excluded.observation_datetime, + "value": stmt.excluded.value, + "unit": stmt.excluded.unit, + "detect_flag": stmt.excluded.detect_flag, + "uncertainty": stmt.excluded.uncertainty, + "analysis_agency": stmt.excluded.analysis_agency, + }, + ) + session.execute(stmt) + + if global_id_str in existing_keys: + result.updated += 1 + else: + result.inserted += 1 + existing_keys.add(global_id_str) + + # Update Sample volume/volume_unit if present on legacy row + if row.volume is not None or row.volume_unit is not None: + sample = session.get(Sample, sample_id) + if sample: + if row.volume is not None: + sample.volume = float(row.volume) + if row.volume_unit is not None: + sample.volume_unit = row.volume_unit + + # Create Notes if present + if row.notes: + obs = session.execute( + select(Observation).where( + Observation.nma_pk_chemistryresults == global_id_str + ) + ).scalar_one_or_none() + + if obs is None: + result.errors.append( + f"Row GlobalID={global_id_str}: upserted Observation not found " + f"when creating Notes — skipping note" + ) + else: + existing_note = session.execute( + select(Notes).where( + Notes.target_id == obs.id, + Notes.target_table == "observation", + Notes.note_type == "Chemistry Observation", + Notes.content == row.notes, + ) + ).scalar_one_or_none() + + if existing_note is None: 
+ note = Notes( + target_id=obs.id, + target_table="observation", + note_type="Chemistry Observation", + content=row.notes, + release_status="draft", + ) + session.add(note) + + savepoint.commit() + except SQLAlchemyError as exc: + savepoint.rollback() + result.errors.append( + f"Row GlobalID={global_id_str}: {exc}" + ) + continue session.commit() logger.info( @@ -296,10 +303,10 @@ def _backfill_radionuclides_impl(session: Session, batch_size: int) -> BackfillR return result -def backfill_radionuclides(batch_size: int = 1000) -> BackfillResult: +def backfill_radionuclides() -> BackfillResult: """Top-level runner for the radionuclides backfill.""" with session_ctx() as session: - return _backfill_radionuclides_impl(session, batch_size) + return _backfill_radionuclides_impl(session) # ============= EOF ============================================= From 2cba0ae74716bf2581cb29b533f9abea42455738 Mon Sep 17 00:00:00 2001 From: Kimball Bighorse Date: Fri, 27 Feb 2026 19:59:39 -0800 Subject: [PATCH 04/13] fix: make migration idempotent, remove batch-size drift, scope test cleanup - Migration upgrade now checks information_schema before dropping thing_id column, guarding against environments where it was already removed - run_backfill.sh no longer references removed --batch-size parameter - Test observation cleanup scoped to scenario sample_ids instead of blanket nma_pk_chemistryresults IS NOT NULL query Refs #558, #570 Co-Authored-By: Claude Opus 4.6 --- ...e5e8_add_chemistry_backfill_columns_to_.py | 17 +++++++--- run_backfill.sh | 5 ++- tests/features/environment.py | 33 +++++++++++-------- 3 files changed, 33 insertions(+), 22 deletions(-) diff --git a/alembic/versions/545a5b77e5e8_add_chemistry_backfill_columns_to_.py b/alembic/versions/545a5b77e5e8_add_chemistry_backfill_columns_to_.py index 2b67f23de..23904610e 100644 --- a/alembic/versions/545a5b77e5e8_add_chemistry_backfill_columns_to_.py +++ b/alembic/versions/545a5b77e5e8_add_chemistry_backfill_columns_to_.py @@ 
-79,11 +79,18 @@ def upgrade() -> None: ) # Drop stale thing_id column from NMA_Radionuclides (model no longer defines it; - # relationships go through NMA_Chemistry_SampleInfo.thing_id instead) - op.drop_constraint( - 'NMA_Radionuclides_thing_id_fkey', 'NMA_Radionuclides', type_='foreignkey', - ) - op.drop_column('NMA_Radionuclides', 'thing_id') + # relationships go through NMA_Chemistry_SampleInfo.thing_id instead). + # Guard against environments where the column was already removed. + conn = op.get_bind() + has_thing_id = conn.execute(sa.text( + "SELECT 1 FROM information_schema.columns " + "WHERE table_name = 'NMA_Radionuclides' AND column_name = 'thing_id'" + )).scalar() + if has_thing_id: + op.drop_constraint( + 'NMA_Radionuclides_thing_id_fkey', 'NMA_Radionuclides', type_='foreignkey', + ) + op.drop_column('NMA_Radionuclides', 'thing_id') def downgrade() -> None: diff --git a/run_backfill.sh b/run_backfill.sh index 984a9fe27..f316662e2 100755 --- a/run_backfill.sh +++ b/run_backfill.sh @@ -6,7 +6,7 @@ # e.g. state, county, quad_name,etc. It will also be used to handle data refactors/corrections in the future. # Load environment variables from .env and run the staging backfill. 
-# Usage: ./run_backfill.sh [--batch-size N] +# Usage: ./run_backfill.sh # github workflow equivalent: for reference only #- name: Run backfill script on staging database @@ -38,5 +38,4 @@ set +a uv run alembic upgrade head -# Forward any args (e.g., --batch-size 500) -python -m transfers.backfill.backfill "$@" +python -m transfers.backfill.backfill diff --git a/tests/features/environment.py b/tests/features/environment.py index 3eb5c3bb5..d5c306854 100644 --- a/tests/features/environment.py +++ b/tests/features/environment.py @@ -754,15 +754,20 @@ def after_scenario(context, scenario): created = context._backfill_created # Delete in FK order: Notes → Observations → Samples → FieldActivities → FieldEvents → NMA rows - # First, delete Notes linked to observations we created - obs_ids = [ - row[0] - for row in session.execute( - select(Observation.id).where( - Observation.nma_pk_chemistryresults.isnot(None) - ) - ).all() - ] + # Scope to observations linked to this scenario's samples + scenario_sample_ids = created.get("sample_ids", []) + obs_ids = [] + if scenario_sample_ids: + obs_ids = [ + row[0] + for row in session.execute( + select(Observation.id).where( + Observation.sample_id.in_(scenario_sample_ids) + ) + ).all() + ] + + # Delete Notes linked to those observations if obs_ids: for note in session.query(Notes).filter( Notes.target_table == "observation", @@ -770,11 +775,11 @@ def after_scenario(context, scenario): ).all(): session.delete(note) - # Delete observations created by backfill - for obs in session.query(Observation).filter( - Observation.nma_pk_chemistryresults.isnot(None) - ).all(): - session.delete(obs) + # Delete the observations themselves + for oid in obs_ids: + obs = session.get(Observation, oid) + if obs: + session.delete(obs) # Delete NMA_Radionuclides created in this scenario for rid in created.get("nma_radionuclide_ids", []): From 20cee26d72e3194e2945d33414b682fa25280470 Mon Sep 17 00:00:00 2001 From: Kimball Bighorse Date: Fri, 27 Feb 2026 
20:07:41 -0800 Subject: [PATCH 05/13] fix: harden migration FK lookup, add row-error tracebacks, scope test tracking - Migration upgrade dynamically discovers FK names via sa.inspect() with guard for missing name entries - Per-row exception handler now logs full traceback via logger.exception() before appending summary to result.errors - Analysis method tracking scoped to scenario sample_ids - Removed incorrect issue reference from test comment Refs #558 Co-Authored-By: Claude Opus 4.6 --- .../545a5b77e5e8_add_chemistry_backfill_columns_to_.py | 8 +++++--- tests/features/steps/chemistry-backfill.py | 9 ++++++--- transfers/backfill/chemistry_backfill.py | 6 +++--- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/alembic/versions/545a5b77e5e8_add_chemistry_backfill_columns_to_.py b/alembic/versions/545a5b77e5e8_add_chemistry_backfill_columns_to_.py index 23904610e..11f462467 100644 --- a/alembic/versions/545a5b77e5e8_add_chemistry_backfill_columns_to_.py +++ b/alembic/versions/545a5b77e5e8_add_chemistry_backfill_columns_to_.py @@ -87,9 +87,11 @@ def upgrade() -> None: "WHERE table_name = 'NMA_Radionuclides' AND column_name = 'thing_id'" )).scalar() if has_thing_id: - op.drop_constraint( - 'NMA_Radionuclides_thing_id_fkey', 'NMA_Radionuclides', type_='foreignkey', - ) + # FK name may differ across environments; look it up dynamically. 
+ fks = sa.inspect(conn).get_foreign_keys('NMA_Radionuclides') + for fk in fks: + if 'thing_id' in fk['constrained_columns'] and fk.get('name'): + op.drop_constraint(fk['name'], 'NMA_Radionuclides', type_='foreignkey') op.drop_column('NMA_Radionuclides', 'thing_id') diff --git a/tests/features/steps/chemistry-backfill.py b/tests/features/steps/chemistry-backfill.py index a54b6460f..dfa3ea75a 100644 --- a/tests/features/steps/chemistry-backfill.py +++ b/tests/features/steps/chemistry-backfill.py @@ -169,7 +169,7 @@ def step_given_lexicon_terms_exist(context: Context): # Test-only analyte names used by scenarios — seed if absent. # In production these would be pre-seeded via a vocabulary - # preparation step (see issue #XXX). + # preparation step. test_analytes = ( "GB", "Uranium", "GA", "Ra228", "Uranium-238", "Radium-226", "Nitrate", "Unknown", @@ -277,13 +277,16 @@ def step_given_sample_with_chemistry_key(context: Context, sample_pt_id: str): def _track_created_analysis_methods(context: Context): """Record AnalysisMethod IDs created by the backfill for cleanup.""" _ensure_backfill_tracking(context) + scenario_sample_ids = context._backfill_created.get("sample_ids", []) + if not scenario_sample_ids: + return with session_ctx() as session: - # Find methods referenced by backfill-created observations + # Scope to observations linked to this scenario's samples only obs_am_ids = [ row[0] for row in session.execute( select(Observation.analysis_method_id).where( - Observation.nma_pk_chemistryresults.isnot(None), + Observation.sample_id.in_(scenario_sample_ids), Observation.analysis_method_id.isnot(None), ) ).all() diff --git a/transfers/backfill/chemistry_backfill.py b/transfers/backfill/chemistry_backfill.py index e61d32c2c..472d15409 100644 --- a/transfers/backfill/chemistry_backfill.py +++ b/transfers/backfill/chemistry_backfill.py @@ -28,7 +28,6 @@ from sqlalchemy import select from sqlalchemy.dialects.postgresql import insert as pg_insert -from sqlalchemy.exc import 
SQLAlchemyError from sqlalchemy.orm import Session from db.analysis_method import AnalysisMethod @@ -285,10 +284,11 @@ def _backfill_radionuclides_impl(session: Session) -> BackfillResult: session.add(note) savepoint.commit() - except SQLAlchemyError as exc: + except Exception as exc: savepoint.rollback() + logger.exception("Row GlobalID=%s failed", global_id_str) result.errors.append( - f"Row GlobalID={global_id_str}: {exc}" + f"Row GlobalID={global_id_str}: {type(exc).__name__}: {exc}" ) continue From 56fa0a60e77db5a4c36b982e3d4a0eab76a00b9a Mon Sep 17 00:00:00 2001 From: kbighorse <93388+kbighorse@users.noreply.github.com> Date: Sat, 28 Feb 2026 04:07:12 +0000 Subject: [PATCH 06/13] Formatting changes --- ...e5e8_add_chemistry_backfill_columns_to_.py | 234 ++++++++++++------ tests/features/environment.py | 12 +- tests/features/steps/chemistry-backfill.py | 162 ++++++------ transfers/backfill/backfill.py | 4 +- transfers/backfill/chemistry_backfill.py | 7 +- 5 files changed, 251 insertions(+), 168 deletions(-) diff --git a/alembic/versions/545a5b77e5e8_add_chemistry_backfill_columns_to_.py b/alembic/versions/545a5b77e5e8_add_chemistry_backfill_columns_to_.py index 11f462467..a8fb53737 100644 --- a/alembic/versions/545a5b77e5e8_add_chemistry_backfill_columns_to_.py +++ b/alembic/versions/545a5b77e5e8_add_chemistry_backfill_columns_to_.py @@ -5,15 +5,15 @@ Create Date: 2026-02-27 11:30:45.380002 """ + from typing import Sequence, Union from alembic import op import sqlalchemy as sa - # revision identifiers, used by Alembic. 
-revision: str = '545a5b77e5e8' -down_revision: Union[str, Sequence[str], None] = 'd5e6f7a8b9c0' +revision: str = "545a5b77e5e8" +down_revision: Union[str, Sequence[str], None] = "d5e6f7a8b9c0" branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None @@ -21,110 +21,184 @@ def upgrade() -> None: """Add chemistry backfill columns to observation and sample tables.""" # Observation table: 4 new columns - op.add_column('observation', sa.Column( - 'nma_pk_chemistryresults', sa.String(), nullable=True, - comment='NM_Aquifer GlobalID for chemistry results — transfer audit and idempotent upsert key', - )) - op.add_column('observation', sa.Column( - 'detect_flag', sa.Boolean(), nullable=True, - comment="True=detected, False=below detection limit (legacy Symbol '<'), None=no qualifier", - )) - op.add_column('observation', sa.Column( - 'uncertainty', sa.Float(), nullable=True, - comment='Measurement uncertainty for the observation value', - )) - op.add_column('observation', sa.Column( - 'analysis_agency', sa.String(), nullable=True, - comment='Agency or lab that performed the analysis', - )) + op.add_column( + "observation", + sa.Column( + "nma_pk_chemistryresults", + sa.String(), + nullable=True, + comment="NM_Aquifer GlobalID for chemistry results — transfer audit and idempotent upsert key", + ), + ) + op.add_column( + "observation", + sa.Column( + "detect_flag", + sa.Boolean(), + nullable=True, + comment="True=detected, False=below detection limit (legacy Symbol '<'), None=no qualifier", + ), + ) + op.add_column( + "observation", + sa.Column( + "uncertainty", + sa.Float(), + nullable=True, + comment="Measurement uncertainty for the observation value", + ), + ) + op.add_column( + "observation", + sa.Column( + "analysis_agency", + sa.String(), + nullable=True, + comment="Agency or lab that performed the analysis", + ), + ) op.create_unique_constraint( - 'uq_observation_nma_pk_chemistryresults', - 'observation', 
['nma_pk_chemistryresults'], + "uq_observation_nma_pk_chemistryresults", + "observation", + ["nma_pk_chemistryresults"], ) # Observation version table (sqlalchemy-continuum) - op.add_column('observation_version', sa.Column( - 'nma_pk_chemistryresults', sa.String(), autoincrement=False, nullable=True, - comment='NM_Aquifer GlobalID for chemistry results — transfer audit and idempotent upsert key', - )) - op.add_column('observation_version', sa.Column( - 'detect_flag', sa.Boolean(), autoincrement=False, nullable=True, - comment="True=detected, False=below detection limit (legacy Symbol '<'), None=no qualifier", - )) - op.add_column('observation_version', sa.Column( - 'uncertainty', sa.Float(), autoincrement=False, nullable=True, - comment='Measurement uncertainty for the observation value', - )) - op.add_column('observation_version', sa.Column( - 'analysis_agency', sa.String(), autoincrement=False, nullable=True, - comment='Agency or lab that performed the analysis', - )) + op.add_column( + "observation_version", + sa.Column( + "nma_pk_chemistryresults", + sa.String(), + autoincrement=False, + nullable=True, + comment="NM_Aquifer GlobalID for chemistry results — transfer audit and idempotent upsert key", + ), + ) + op.add_column( + "observation_version", + sa.Column( + "detect_flag", + sa.Boolean(), + autoincrement=False, + nullable=True, + comment="True=detected, False=below detection limit (legacy Symbol '<'), None=no qualifier", + ), + ) + op.add_column( + "observation_version", + sa.Column( + "uncertainty", + sa.Float(), + autoincrement=False, + nullable=True, + comment="Measurement uncertainty for the observation value", + ), + ) + op.add_column( + "observation_version", + sa.Column( + "analysis_agency", + sa.String(), + autoincrement=False, + nullable=True, + comment="Agency or lab that performed the analysis", + ), + ) # Sample table: 3 new columns - op.add_column('sample', sa.Column( - 'nma_pk_chemistrysample', sa.String(), nullable=True, - 
comment='NM_Aquifer SamplePtID for chemistry samples — transfer audit key', - )) - op.add_column('sample', sa.Column( - 'volume', sa.Float(), nullable=True, - comment='Volume of the sample collected', - )) - op.add_column('sample', sa.Column( - 'volume_unit', sa.String(), nullable=True, - comment='Unit for the sample volume (e.g. mL, L)', - )) + op.add_column( + "sample", + sa.Column( + "nma_pk_chemistrysample", + sa.String(), + nullable=True, + comment="NM_Aquifer SamplePtID for chemistry samples — transfer audit key", + ), + ) + op.add_column( + "sample", + sa.Column( + "volume", + sa.Float(), + nullable=True, + comment="Volume of the sample collected", + ), + ) + op.add_column( + "sample", + sa.Column( + "volume_unit", + sa.String(), + nullable=True, + comment="Unit for the sample volume (e.g. mL, L)", + ), + ) op.create_unique_constraint( - 'uq_sample_nma_pk_chemistrysample', - 'sample', ['nma_pk_chemistrysample'], + "uq_sample_nma_pk_chemistrysample", + "sample", + ["nma_pk_chemistrysample"], ) # Drop stale thing_id column from NMA_Radionuclides (model no longer defines it; # relationships go through NMA_Chemistry_SampleInfo.thing_id instead). # Guard against environments where the column was already removed. conn = op.get_bind() - has_thing_id = conn.execute(sa.text( - "SELECT 1 FROM information_schema.columns " - "WHERE table_name = 'NMA_Radionuclides' AND column_name = 'thing_id'" - )).scalar() + has_thing_id = conn.execute( + sa.text( + "SELECT 1 FROM information_schema.columns " + "WHERE table_name = 'NMA_Radionuclides' AND column_name = 'thing_id'" + ) + ).scalar() if has_thing_id: # FK name may differ across environments; look it up dynamically. 
- fks = sa.inspect(conn).get_foreign_keys('NMA_Radionuclides') + fks = sa.inspect(conn).get_foreign_keys("NMA_Radionuclides") for fk in fks: - if 'thing_id' in fk['constrained_columns'] and fk.get('name'): - op.drop_constraint(fk['name'], 'NMA_Radionuclides', type_='foreignkey') - op.drop_column('NMA_Radionuclides', 'thing_id') + if "thing_id" in fk["constrained_columns"] and fk.get("name"): + op.drop_constraint(fk["name"], "NMA_Radionuclides", type_="foreignkey") + op.drop_column("NMA_Radionuclides", "thing_id") def downgrade() -> None: """Remove chemistry backfill columns.""" # Restore NMA_Radionuclides.thing_id (add nullable, backfill, then enforce) - op.add_column('NMA_Radionuclides', sa.Column( - 'thing_id', sa.Integer(), nullable=True, - )) + op.add_column( + "NMA_Radionuclides", + sa.Column( + "thing_id", + sa.Integer(), + nullable=True, + ), + ) op.execute( 'UPDATE "NMA_Radionuclides" r ' - 'SET thing_id = csi.thing_id ' + "SET thing_id = csi.thing_id " 'FROM "NMA_Chemistry_SampleInfo" csi ' - 'WHERE r.chemistry_sample_info_id = csi.id' + "WHERE r.chemistry_sample_info_id = csi.id" ) - op.alter_column('NMA_Radionuclides', 'thing_id', nullable=False) + op.alter_column("NMA_Radionuclides", "thing_id", nullable=False) op.create_foreign_key( - 'NMA_Radionuclides_thing_id_fkey', 'NMA_Radionuclides', 'thing', - ['thing_id'], ['id'], ondelete='CASCADE', + "NMA_Radionuclides_thing_id_fkey", + "NMA_Radionuclides", + "thing", + ["thing_id"], + ["id"], + ondelete="CASCADE", ) - op.drop_constraint('uq_sample_nma_pk_chemistrysample', 'sample', type_='unique') - op.drop_column('sample', 'volume_unit') - op.drop_column('sample', 'volume') - op.drop_column('sample', 'nma_pk_chemistrysample') + op.drop_constraint("uq_sample_nma_pk_chemistrysample", "sample", type_="unique") + op.drop_column("sample", "volume_unit") + op.drop_column("sample", "volume") + op.drop_column("sample", "nma_pk_chemistrysample") - op.drop_column('observation_version', 'analysis_agency') - 
op.drop_column('observation_version', 'uncertainty') - op.drop_column('observation_version', 'detect_flag') - op.drop_column('observation_version', 'nma_pk_chemistryresults') + op.drop_column("observation_version", "analysis_agency") + op.drop_column("observation_version", "uncertainty") + op.drop_column("observation_version", "detect_flag") + op.drop_column("observation_version", "nma_pk_chemistryresults") - op.drop_constraint('uq_observation_nma_pk_chemistryresults', 'observation', type_='unique') - op.drop_column('observation', 'analysis_agency') - op.drop_column('observation', 'uncertainty') - op.drop_column('observation', 'detect_flag') - op.drop_column('observation', 'nma_pk_chemistryresults') + op.drop_constraint( + "uq_observation_nma_pk_chemistryresults", "observation", type_="unique" + ) + op.drop_column("observation", "analysis_agency") + op.drop_column("observation", "uncertainty") + op.drop_column("observation", "detect_flag") + op.drop_column("observation", "nma_pk_chemistryresults") diff --git a/tests/features/environment.py b/tests/features/environment.py index d5c306854..731b24820 100644 --- a/tests/features/environment.py +++ b/tests/features/environment.py @@ -769,10 +769,14 @@ def after_scenario(context, scenario): # Delete Notes linked to those observations if obs_ids: - for note in session.query(Notes).filter( - Notes.target_table == "observation", - Notes.target_id.in_(obs_ids), - ).all(): + for note in ( + session.query(Notes) + .filter( + Notes.target_table == "observation", + Notes.target_id.in_(obs_ids), + ) + .all() + ): session.delete(note) # Delete the observations themselves diff --git a/tests/features/steps/chemistry-backfill.py b/tests/features/steps/chemistry-backfill.py index dfa3ea75a..5afa024cc 100644 --- a/tests/features/steps/chemistry-backfill.py +++ b/tests/features/steps/chemistry-backfill.py @@ -118,9 +118,9 @@ def _get_observation_by_globalid(session, global_id: str) -> Observation: Observation.nma_pk_chemistryresults == 
global_id.lower() ) ).scalar_one_or_none() - assert obs is not None, ( - f"No Observation found with nma_pk_chemistryresults='{global_id.lower()}'" - ) + assert ( + obs is not None + ), f"No Observation found with nma_pk_chemistryresults='{global_id.lower()}'" return obs @@ -171,8 +171,14 @@ def step_given_lexicon_terms_exist(context: Context): # In production these would be pre-seeded via a vocabulary # preparation step. test_analytes = ( - "GB", "Uranium", "GA", "Ra228", "Uranium-238", - "Radium-226", "Nitrate", "Unknown", + "GB", + "Uranium", + "GA", + "Ra228", + "Uranium-238", + "Radium-226", + "Nitrate", + "Unknown", ) for analyte in test_analytes: existing = session.execute( @@ -196,7 +202,10 @@ def step_given_multi_radionuclides(context: Context): """Create multiple NMA_Radionuclides rows from a horizontal Behave table.""" _ensure_backfill_tracking(context) for row in context.table: - fields = {heading: _parse_table_value(row[heading]) for heading in context.table.headings} + fields = { + heading: _parse_table_value(row[heading]) + for heading in context.table.headings + } _create_radionuclide_from_fields(context, fields) @@ -321,12 +330,13 @@ def step_when_run_backfill_again(context: Context): def step_then_observation_count(context: Context, n: int, global_id: str): with session_ctx() as session: count = session.execute( - select(Observation) - .where(Observation.nma_pk_chemistryresults == global_id.lower()) + select(Observation).where( + Observation.nma_pk_chemistryresults == global_id.lower() + ) ).all() - assert len(count) == n, ( - f"Expected {n} Observation(s) for GlobalID {global_id}, found {len(count)}" - ) + assert ( + len(count) == n + ), f"Expected {n} Observation(s) for GlobalID {global_id}, found {len(count)}" if len(count) == 1: context._last_observation = count[0][0] @@ -356,9 +366,9 @@ def step_then_observation_datetime(context: Context, date_str: str): if dt.tzinfo is not None: dt = dt.astimezone(timezone.utc) actual_date = 
dt.strftime("%Y-%m-%d") - assert actual_date == date_str, ( - f"Expected observation_datetime={date_str}, got {actual_date}" - ) + assert ( + actual_date == date_str + ), f"Expected observation_datetime={date_str}, got {actual_date}" @then("the Observation should set value to {value}") @@ -367,9 +377,7 @@ def step_then_observation_value(context: Context, value: str): expected = float(value) with session_ctx() as session: obs = session.merge(obs) - assert obs.value == expected, ( - f"Expected value={expected}, got {obs.value}" - ) + assert obs.value == expected, f"Expected value={expected}, got {obs.value}" @then('the Observation should set unit to "{unit}"') @@ -390,9 +398,9 @@ def step_then_parameter_exists(context: Context, name: str, matrix: str): Parameter.parameter_name == name, Parameter.matrix == matrix ) ).scalar_one_or_none() - assert param is not None, ( - f"No Parameter found with parameter_name='{name}' and matrix='{matrix}'" - ) + assert ( + param is not None + ), f"No Parameter found with parameter_name='{name}' and matrix='{matrix}'" context._last_parameter = param @@ -405,12 +413,12 @@ def step_then_observation_refs_parameter(context: Context, name: str, matrix: st obs = session.merge(obs) param = session.get(Parameter, obs.parameter_id) assert param is not None, "Observation must reference a Parameter" - assert param.parameter_name == name, ( - f"Expected parameter_name='{name}', got '{param.parameter_name}'" - ) - assert param.matrix == matrix, ( - f"Expected matrix='{matrix}', got '{param.matrix}'" - ) + assert ( + param.parameter_name == name + ), f"Expected parameter_name='{name}', got '{param.parameter_name}'" + assert ( + param.matrix == matrix + ), f"Expected matrix='{matrix}', got '{param.matrix}'" @then('the Observation should set analysis_method_name to "{method}"') @@ -418,14 +426,14 @@ def step_then_observation_analysis_method(context: Context, method: str): obs = context._last_observation with session_ctx() as session: obs = 
session.merge(obs) - assert obs.analysis_method_id is not None, ( - "Observation should have an analysis_method_id" - ) + assert ( + obs.analysis_method_id is not None + ), "Observation should have an analysis_method_id" am = session.get(AnalysisMethod, obs.analysis_method_id) assert am is not None, "AnalysisMethod should exist" - assert am.analysis_method_name == method, ( - f"Expected analysis_method_name='{method}', got '{am.analysis_method_name}'" - ) + assert ( + am.analysis_method_name == method + ), f"Expected analysis_method_name='{method}', got '{am.analysis_method_name}'" @then("the Observation should set uncertainty to {value}") @@ -434,9 +442,9 @@ def step_then_observation_uncertainty(context: Context, value: str): expected = float(value) with session_ctx() as session: obs = session.merge(obs) - assert obs.uncertainty == expected, ( - f"Expected uncertainty={expected}, got {obs.uncertainty}" - ) + assert ( + obs.uncertainty == expected + ), f"Expected uncertainty={expected}, got {obs.uncertainty}" @then('the Observation should set analysis_agency to "{agency}"') @@ -444,9 +452,9 @@ def step_then_observation_analysis_agency(context: Context, agency: str): obs = context._last_observation with session_ctx() as session: obs = session.merge(obs) - assert obs.analysis_agency == agency, ( - f"Expected analysis_agency='{agency}', got '{obs.analysis_agency}'" - ) + assert ( + obs.analysis_agency == agency + ), f"Expected analysis_agency='{agency}', got '{obs.analysis_agency}'" # --- GlobalID-variant steps --- @@ -484,17 +492,15 @@ def step_then_obs_by_gid_refs_thing(context: Context, global_id: str): @then( 'the Observation for GlobalID "{global_id}" should set analysis_method_name to "{method}"' ) -def step_then_obs_by_gid_analysis_method( - context: Context, global_id: str, method: str -): +def step_then_obs_by_gid_analysis_method(context: Context, global_id: str, method: str): with session_ctx() as session: obs = _get_observation_by_globalid(session, 
global_id) assert obs.analysis_method_id is not None am = session.get(AnalysisMethod, obs.analysis_method_id) assert am is not None - assert am.analysis_method_name == method, ( - f"Expected analysis_method_name='{method}', got '{am.analysis_method_name}'" - ) + assert ( + am.analysis_method_name == method + ), f"Expected analysis_method_name='{method}', got '{am.analysis_method_name}'" @then( @@ -511,16 +517,14 @@ def step_then_obs_by_gid_refs_parameter( assert param.matrix == matrix -@then( - 'the Observation for GlobalID "{global_id}" should set detect_flag to {flag}' -) +@then('the Observation for GlobalID "{global_id}" should set detect_flag to {flag}') def step_then_obs_by_gid_detect_flag(context: Context, global_id: str, flag: str): expected = flag.lower() == "true" with session_ctx() as session: obs = _get_observation_by_globalid(session, global_id) - assert obs.detect_flag == expected, ( - f"Expected detect_flag={expected}, got {obs.detect_flag}" - ) + assert ( + obs.detect_flag == expected + ), f"Expected detect_flag={expected}, got {obs.detect_flag}" @then( @@ -530,9 +534,9 @@ def step_then_obs_by_gid_no_extra_columns(context: Context, global_id: str): with session_ctx() as session: obs = _get_observation_by_globalid(session, global_id) for attr in ("nma_sample_point_id", "nma_object_id", "nma_wclab_id"): - assert not hasattr(obs, attr), ( - f"Observation should not have attribute '{attr}'" - ) + assert not hasattr( + obs, attr + ), f"Observation should not have attribute '{attr}'" # --- Sample volume steps --- @@ -541,27 +545,31 @@ def step_then_sample_volume(context: Context, value: str): expected = float(value) with session_ctx() as session: # Find the sample that was just modified - samples = session.execute( - select(Sample).where(Sample.volume.isnot(None)) - ).scalars().all() + samples = ( + session.execute(select(Sample).where(Sample.volume.isnot(None))) + .scalars() + .all() + ) assert len(samples) >= 1, "Expected at least one Sample with volume 
set" sample = samples[-1] - assert sample.volume == expected, ( - f"Expected volume={expected}, got {sample.volume}" - ) + assert ( + sample.volume == expected + ), f"Expected volume={expected}, got {sample.volume}" @then('the Sample should set volume_unit to "{unit}"') def step_then_sample_volume_unit(context: Context, unit: str): with session_ctx() as session: - samples = session.execute( - select(Sample).where(Sample.volume_unit.isnot(None)) - ).scalars().all() + samples = ( + session.execute(select(Sample).where(Sample.volume_unit.isnot(None))) + .scalars() + .all() + ) assert len(samples) >= 1, "Expected at least one Sample with volume_unit set" sample = samples[-1] - assert sample.volume_unit == unit, ( - f"Expected volume_unit='{unit}', got '{sample.volume_unit}'" - ) + assert ( + sample.volume_unit == unit + ), f"Expected volume_unit='{unit}', got '{sample.volume_unit}'" # --- Notes steps --- @@ -603,14 +611,12 @@ def step_then_notes_exist(context: Context): ) def step_then_skipped_orphans(context: Context, n: int): assert context.backfill_result is not None, "Backfill result not found on context" - assert context.backfill_result.skipped_orphans == n, ( - f"Expected {n} skipped orphans, got {context.backfill_result.skipped_orphans}" - ) + assert ( + context.backfill_result.skipped_orphans == n + ), f"Expected {n} skipped orphans, got {context.backfill_result.skipped_orphans}" -@then( - 'no Observation record should exist with nma_pk_chemistryresults "{global_id}"' -) +@then('no Observation record should exist with nma_pk_chemistryresults "{global_id}"') def step_then_no_observation(context: Context, global_id: str): with session_ctx() as session: count = session.execute( @@ -618,9 +624,9 @@ def step_then_no_observation(context: Context, global_id: str): Observation.nma_pk_chemistryresults == global_id.lower() ) ).all() - assert len(count) == 0, ( - f"Expected 0 Observations for GlobalID {global_id}, found {len(count)}" - ) + assert ( + len(count) == 0 + ), 
f"Expected 0 Observations for GlobalID {global_id}, found {len(count)}" # --------------------------------------------------------------------------- @@ -661,8 +667,12 @@ def _create_radionuclide_from_fields(context: Context, fields: dict): ).replace(tzinfo=timezone.utc) # Parse numeric fields - sample_value = float(fields["SampleValue"]) if fields.get("SampleValue") else None - uncertainty_val = float(fields["Uncertainty"]) if fields.get("Uncertainty") else None + sample_value = ( + float(fields["SampleValue"]) if fields.get("SampleValue") else None + ) + uncertainty_val = ( + float(fields["Uncertainty"]) if fields.get("Uncertainty") else None + ) volume_val = int(fields["Volume"]) if fields.get("Volume") else None rad = NMA_Radionuclides( diff --git a/transfers/backfill/backfill.py b/transfers/backfill/backfill.py index 1d1d4b6ce..e86f6a613 100644 --- a/transfers/backfill/backfill.py +++ b/transfers/backfill/backfill.py @@ -35,9 +35,7 @@ def run() -> None: """Execute all backfill steps in a deterministic order.""" - steps = ( - ("Radionuclides", backfill_radionuclides, "BACKFILL_RADIONUCLIDES"), - ) + steps = (("Radionuclides", backfill_radionuclides, "BACKFILL_RADIONUCLIDES"),) for name, fn, flag in steps: if not get_bool_env(flag, True): logger.info(f"Skipping backfill: {name} ({flag}=false)") diff --git a/transfers/backfill/chemistry_backfill.py b/transfers/backfill/chemistry_backfill.py index 472d15409..058e3ecfe 100644 --- a/transfers/backfill/chemistry_backfill.py +++ b/transfers/backfill/chemistry_backfill.py @@ -90,9 +90,7 @@ def _get_or_create_analysis_method( ) -> AnalysisMethod: """Return existing AnalysisMethod or create a new one.""" am = session.execute( - select(AnalysisMethod).where( - AnalysisMethod.analysis_method_code == method_code - ) + select(AnalysisMethod).where(AnalysisMethod.analysis_method_code == method_code) ).scalar_one_or_none() if am is None: am = AnalysisMethod( @@ -141,8 +139,7 @@ def _backfill_radionuclides_impl(session: 
Session) -> BackfillResult: legacy_rows = session.execute( select(NMA_Radionuclides, NMA_Chemistry_SampleInfo.nma_sample_pt_id).join( NMA_Chemistry_SampleInfo, - NMA_Radionuclides.chemistry_sample_info_id - == NMA_Chemistry_SampleInfo.id, + NMA_Radionuclides.chemistry_sample_info_id == NMA_Chemistry_SampleInfo.id, ) ).all() From c1d1b3f1d6ff7f437e8e33c2d98b374d29b2e8e6 Mon Sep 17 00:00:00 2001 From: Kimball Bighorse Date: Sat, 28 Feb 2026 00:46:25 -0800 Subject: [PATCH 07/13] fix: scope sample assertions, normalize existing_keys case, broaden row catch - Volume/volume_unit test assertions query by scenario sample_ids - existing_keys set lowercases DB values to match global_id_str normalization - Per-row catch broadened to Exception with logger.exception() for tracebacks - Analysis method tracking scoped to scenario samples - Migration FK drop guarded against missing name entries - Resolve stash merge conflicts in sample volume steps Refs #558 Co-Authored-By: Claude Opus 4.6 --- tests/features/steps/chemistry-backfill.py | 27 ++++++++++------------ transfers/backfill/chemistry_backfill.py | 2 +- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/tests/features/steps/chemistry-backfill.py b/tests/features/steps/chemistry-backfill.py index 5afa024cc..0c317cbd9 100644 --- a/tests/features/steps/chemistry-backfill.py +++ b/tests/features/steps/chemistry-backfill.py @@ -543,15 +543,13 @@ def step_then_obs_by_gid_no_extra_columns(context: Context, global_id: str): @then("the Sample should set volume to {value}") def step_then_sample_volume(context: Context, value: str): expected = float(value) + scenario_sample_ids = context._backfill_created.get("sample_ids", []) + assert scenario_sample_ids, "No sample_ids tracked for this scenario" with session_ctx() as session: - # Find the sample that was just modified - samples = ( - session.execute(select(Sample).where(Sample.volume.isnot(None))) - .scalars() - .all() - ) - assert len(samples) >= 1, "Expected at least 
one Sample with volume set" - sample = samples[-1] + sample = session.execute( + select(Sample).where(Sample.id.in_(scenario_sample_ids)) + ).scalars().first() + assert sample is not None, "No Sample found for this scenario" assert ( sample.volume == expected ), f"Expected volume={expected}, got {sample.volume}" @@ -559,14 +557,13 @@ def step_then_sample_volume(context: Context, value: str): @then('the Sample should set volume_unit to "{unit}"') def step_then_sample_volume_unit(context: Context, unit: str): + scenario_sample_ids = context._backfill_created.get("sample_ids", []) + assert scenario_sample_ids, "No sample_ids tracked for this scenario" with session_ctx() as session: - samples = ( - session.execute(select(Sample).where(Sample.volume_unit.isnot(None))) - .scalars() - .all() - ) - assert len(samples) >= 1, "Expected at least one Sample with volume_unit set" - sample = samples[-1] + sample = session.execute( + select(Sample).where(Sample.id.in_(scenario_sample_ids)) + ).scalars().first() + assert sample is not None, "No Sample found for this scenario" assert ( sample.volume_unit == unit ), f"Expected volume_unit='{unit}', got '{sample.volume_unit}'" diff --git a/transfers/backfill/chemistry_backfill.py b/transfers/backfill/chemistry_backfill.py index 058e3ecfe..df01284b5 100644 --- a/transfers/backfill/chemistry_backfill.py +++ b/transfers/backfill/chemistry_backfill.py @@ -127,7 +127,7 @@ def _backfill_radionuclides_impl(session: Session) -> BackfillResult: # Track existing observation keys so we know insert vs update existing_keys = set( - row[0] + row[0].lower() if row[0] else row[0] for row in session.execute( select(Observation.nma_pk_chemistryresults).where( Observation.nma_pk_chemistryresults.isnot(None) From 848524ab933952f411cbc0a1e8ad1f8b40102449 Mon Sep 17 00:00:00 2001 From: kbighorse <93388+kbighorse@users.noreply.github.com> Date: Sat, 28 Feb 2026 08:45:55 +0000 Subject: [PATCH 08/13] Formatting changes --- 
tests/features/steps/chemistry-backfill.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/tests/features/steps/chemistry-backfill.py b/tests/features/steps/chemistry-backfill.py index 0c317cbd9..25e5426b9 100644 --- a/tests/features/steps/chemistry-backfill.py +++ b/tests/features/steps/chemistry-backfill.py @@ -546,9 +546,11 @@ def step_then_sample_volume(context: Context, value: str): scenario_sample_ids = context._backfill_created.get("sample_ids", []) assert scenario_sample_ids, "No sample_ids tracked for this scenario" with session_ctx() as session: - sample = session.execute( - select(Sample).where(Sample.id.in_(scenario_sample_ids)) - ).scalars().first() + sample = ( + session.execute(select(Sample).where(Sample.id.in_(scenario_sample_ids))) + .scalars() + .first() + ) assert sample is not None, "No Sample found for this scenario" assert ( sample.volume == expected @@ -560,9 +562,11 @@ def step_then_sample_volume_unit(context: Context, unit: str): scenario_sample_ids = context._backfill_created.get("sample_ids", []) assert scenario_sample_ids, "No sample_ids tracked for this scenario" with session_ctx() as session: - sample = session.execute( - select(Sample).where(Sample.id.in_(scenario_sample_ids)) - ).scalars().first() + sample = ( + session.execute(select(Sample).where(Sample.id.in_(scenario_sample_ids))) + .scalars() + .first() + ) assert sample is not None, "No Sample found for this scenario" assert ( sample.volume_unit == unit From ac44f6bba768ef6ac9f4c2296ce4b3e57531b62f Mon Sep 17 00:00:00 2001 From: Kimball Bighorse Date: Mon, 2 Mar 2026 09:36:18 -0800 Subject: [PATCH 09/13] fix(tests): scope analysis method cleanup to only backfill-created rows Snapshot AnalysisMethod IDs before/after backfill to track only actually-created methods, preventing deletion of pre-existing rows. 
Co-Authored-By: Claude Opus 4.6 --- tests/features/steps/chemistry-backfill.py | 37 ++++++++++++---------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/tests/features/steps/chemistry-backfill.py b/tests/features/steps/chemistry-backfill.py index 25e5426b9..3fcab8951 100644 --- a/tests/features/steps/chemistry-backfill.py +++ b/tests/features/steps/chemistry-backfill.py @@ -283,24 +283,25 @@ def step_given_sample_with_chemistry_key(context: Context, sample_pt_id: str): # --------------------------------------------------------------------------- # WHEN steps # --------------------------------------------------------------------------- -def _track_created_analysis_methods(context: Context): - """Record AnalysisMethod IDs created by the backfill for cleanup.""" +def _snapshot_analysis_method_ids() -> set[int]: + """Return the set of all AnalysisMethod IDs that exist before the backfill.""" + with session_ctx() as session: + return { + row[0] + for row in session.execute(select(AnalysisMethod.id)).all() + } + + +def _track_created_analysis_methods(context: Context, pre_ids: set[int]): + """Record only AnalysisMethod IDs that were actually created by the backfill.""" _ensure_backfill_tracking(context) - scenario_sample_ids = context._backfill_created.get("sample_ids", []) - if not scenario_sample_ids: - return with session_ctx() as session: - # Scope to observations linked to this scenario's samples only - obs_am_ids = [ + post_ids = { row[0] - for row in session.execute( - select(Observation.analysis_method_id).where( - Observation.sample_id.in_(scenario_sample_ids), - Observation.analysis_method_id.isnot(None), - ) - ).all() - ] - context._backfill_created["analysis_method_ids"] = list(set(obs_am_ids)) + for row in session.execute(select(AnalysisMethod.id)).all() + } + new_ids = post_ids - pre_ids + context._backfill_created["analysis_method_ids"] = list(new_ids) @when("I run the Radionuclides backfill job") @@ -308,8 +309,9 @@ def 
step_when_run_backfill(context: Context): """Execute the backfill and store the result on context.""" from transfers.backfill.chemistry_backfill import backfill_radionuclides + pre_ids = _snapshot_analysis_method_ids() context.backfill_result = backfill_radionuclides() - _track_created_analysis_methods(context) + _track_created_analysis_methods(context, pre_ids) @when("I run the Radionuclides backfill job again") @@ -317,8 +319,9 @@ def step_when_run_backfill_again(context: Context): """Re-run to verify idempotency.""" from transfers.backfill.chemistry_backfill import backfill_radionuclides + pre_ids = _snapshot_analysis_method_ids() context.backfill_result = backfill_radionuclides() - _track_created_analysis_methods(context) + _track_created_analysis_methods(context, pre_ids) # --------------------------------------------------------------------------- From 10f3e413f7239b0ddb7b79301870289e098d5fd0 Mon Sep 17 00:00:00 2001 From: Kimball Bighorse Date: Mon, 2 Mar 2026 09:36:29 -0800 Subject: [PATCH 10/13] fix(tests): clear backfill tracking after scenario cleanup Delete context._backfill_created in finally block to prevent ID accumulation across scenarios causing redundant deletes. 
Co-Authored-By: Claude Opus 4.6 --- tests/features/environment.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/features/environment.py b/tests/features/environment.py index 731b24820..7aa818488 100644 --- a/tests/features/environment.py +++ b/tests/features/environment.py @@ -848,6 +848,8 @@ def after_scenario(context, scenario): scenario.name, exc_info=True, ) + finally: + del context._backfill_created if not get_bool_env("DROP_AND_REBUILD_DB"): return From 1f09e20217bc499dddfc24ee63c15e1aeab29411 Mon Sep 17 00:00:00 2001 From: Kimball Bighorse Date: Mon, 2 Mar 2026 10:28:00 -0800 Subject: [PATCH 11/13] fix: deterministic volume handling and explicit CLI arg rejection - Add ORDER BY id to legacy query so lowest-PK row wins when multiple radionuclide rows map to the same Sample (volume/volume_unit) - Skip overwrites with warning when a conflict is detected - Reject unknown CLI args in backfill entrypoint instead of silently ignoring them Co-Authored-By: Claude Opus 4.6 --- transfers/backfill/backfill.py | 8 ++++++ transfers/backfill/chemistry_backfill.py | 36 ++++++++++++++++++++---- 2 files changed, 39 insertions(+), 5 deletions(-) diff --git a/transfers/backfill/backfill.py b/transfers/backfill/backfill.py index e86f6a613..304846a79 100644 --- a/transfers/backfill/backfill.py +++ b/transfers/backfill/backfill.py @@ -53,6 +53,14 @@ def run() -> None: if __name__ == "__main__": + if len(sys.argv) > 1: + logger.error( + "Unknown arguments: %s. 
" + "CLI options (--batch-size) were removed; " + "use BACKFILL_* env vars to control execution.", + " ".join(sys.argv[1:]), + ) + sys.exit(2) try: run() except Exception: diff --git a/transfers/backfill/chemistry_backfill.py b/transfers/backfill/chemistry_backfill.py index df01284b5..471e2cc3e 100644 --- a/transfers/backfill/chemistry_backfill.py +++ b/transfers/backfill/chemistry_backfill.py @@ -135,12 +135,16 @@ def _backfill_radionuclides_impl(session: Session) -> BackfillResult: ).all() ) - # Query all radionuclides joined with sample info for nma_sample_pt_id + # Query all radionuclides joined with sample info for nma_sample_pt_id. + # ORDER BY id for deterministic processing — when multiple rows map to + # the same Sample, the lowest-PK row's volume/volume_unit wins. legacy_rows = session.execute( - select(NMA_Radionuclides, NMA_Chemistry_SampleInfo.nma_sample_pt_id).join( + select(NMA_Radionuclides, NMA_Chemistry_SampleInfo.nma_sample_pt_id) + .join( NMA_Chemistry_SampleInfo, NMA_Radionuclides.chemistry_sample_info_id == NMA_Chemistry_SampleInfo.id, ) + .order_by(NMA_Radionuclides.id) ).all() logger.info("Processing %d legacy Radionuclides rows", len(legacy_rows)) @@ -238,14 +242,36 @@ def _backfill_radionuclides_impl(session: Session) -> BackfillResult: result.inserted += 1 existing_keys.add(global_id_str) - # Update Sample volume/volume_unit if present on legacy row + # Update Sample volume/volume_unit if present on legacy row. + # Only set when the sample doesn't already have a value to avoid + # nondeterministic overwrites when multiple rows share a sample. 
if row.volume is not None or row.volume_unit is not None: sample = session.get(Sample, sample_id) if sample: if row.volume is not None: - sample.volume = float(row.volume) + if sample.volume is None: + sample.volume = float(row.volume) + elif float(sample.volume) != float(row.volume): + logger.warning( + "Sample id=%s already has volume=%s, " + "skipping conflicting value %s from GlobalID=%s", + sample_id, + sample.volume, + row.volume, + global_id_str, + ) if row.volume_unit is not None: - sample.volume_unit = row.volume_unit + if sample.volume_unit is None: + sample.volume_unit = row.volume_unit + elif sample.volume_unit != row.volume_unit: + logger.warning( + "Sample id=%s already has volume_unit=%s, " + "skipping conflicting value %s from GlobalID=%s", + sample_id, + sample.volume_unit, + row.volume_unit, + global_id_str, + ) # Create Notes if present if row.notes: From ae60c1f1f0bc2e6fb44f2e125a7bd4227462df61 Mon Sep 17 00:00:00 2001 From: kbighorse <93388+kbighorse@users.noreply.github.com> Date: Mon, 2 Mar 2026 18:29:49 +0000 Subject: [PATCH 12/13] Formatting changes --- tests/features/steps/chemistry-backfill.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/tests/features/steps/chemistry-backfill.py b/tests/features/steps/chemistry-backfill.py index 3fcab8951..d588da780 100644 --- a/tests/features/steps/chemistry-backfill.py +++ b/tests/features/steps/chemistry-backfill.py @@ -286,20 +286,14 @@ def step_given_sample_with_chemistry_key(context: Context, sample_pt_id: str): def _snapshot_analysis_method_ids() -> set[int]: """Return the set of all AnalysisMethod IDs that exist before the backfill.""" with session_ctx() as session: - return { - row[0] - for row in session.execute(select(AnalysisMethod.id)).all() - } + return {row[0] for row in session.execute(select(AnalysisMethod.id)).all()} def _track_created_analysis_methods(context: Context, pre_ids: set[int]): """Record only AnalysisMethod IDs that were actually created by 
the backfill.""" _ensure_backfill_tracking(context) with session_ctx() as session: - post_ids = { - row[0] - for row in session.execute(select(AnalysisMethod.id)).all() - } + post_ids = {row[0] for row in session.execute(select(AnalysisMethod.id)).all()} new_ids = post_ids - pre_ids context._backfill_created["analysis_method_ids"] = list(new_ids) From 09c31ff7d6e3e0aaf306ac83dd8aac4f97d0ae40 Mon Sep 17 00:00:00 2001 From: Kimball Bighorse Date: Mon, 2 Mar 2026 11:33:41 -0800 Subject: [PATCH 13/13] fix: make pg_cron optional for local development Skip pg_cron extension creation gracefully when unavailable instead of hard-failing, unblocking local dev and test environments. Refs #576 Co-Authored-By: Claude Opus 4.6 --- core/initializers.py | 8 ++------ db/initialization.py | 8 ++------ 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/core/initializers.py b/core/initializers.py index 13a066fd3..c3a32d6f4 100644 --- a/core/initializers.py +++ b/core/initializers.py @@ -73,12 +73,8 @@ def erase_and_rebuild_db(): ")" ) ).scalar() - if not pg_cron_available: - raise RuntimeError( - "Cannot erase and rebuild database: pg_cron extension is not " - "available on this PostgreSQL server." - ) - session.execute(text("CREATE EXTENSION IF NOT EXISTS pg_cron")) + if pg_cron_available: + session.execute(text("CREATE EXTENSION IF NOT EXISTS pg_cron")) session.commit() Base.metadata.drop_all(session.bind) Base.metadata.create_all(session.bind) diff --git a/db/initialization.py b/db/initialization.py index a9c5516d1..ea3b0c88b 100644 --- a/db/initialization.py +++ b/db/initialization.py @@ -69,12 +69,8 @@ def recreate_public_schema(session: Session) -> None: ")" ) ).scalar() - if not pg_cron_available: - raise RuntimeError( - "Cannot initialize database schema: pg_cron extension is not available " - "on this PostgreSQL server." 
- ) - session.execute(text("CREATE EXTENSION IF NOT EXISTS pg_cron")) + if pg_cron_available: + session.execute(text("CREATE EXTENSION IF NOT EXISTS pg_cron")) session.execute(APP_READ_GRANT_SQL) grant_app_read_members(session) session.commit()