diff --git a/alembic/versions/545a5b77e5e8_add_chemistry_backfill_columns_to_.py b/alembic/versions/545a5b77e5e8_add_chemistry_backfill_columns_to_.py new file mode 100644 index 00000000..a8fb5373 --- /dev/null +++ b/alembic/versions/545a5b77e5e8_add_chemistry_backfill_columns_to_.py @@ -0,0 +1,204 @@ +"""add chemistry backfill columns to observation and sample + +Revision ID: 545a5b77e5e8 +Revises: d5e6f7a8b9c0 +Create Date: 2026-02-27 11:30:45.380002 + +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic. +revision: str = "545a5b77e5e8" +down_revision: Union[str, Sequence[str], None] = "d5e6f7a8b9c0" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Add chemistry backfill columns to observation and sample tables.""" + # Observation table: 4 new columns + op.add_column( + "observation", + sa.Column( + "nma_pk_chemistryresults", + sa.String(), + nullable=True, + comment="NM_Aquifer GlobalID for chemistry results — transfer audit and idempotent upsert key", + ), + ) + op.add_column( + "observation", + sa.Column( + "detect_flag", + sa.Boolean(), + nullable=True, + comment="True=detected, False=below detection limit (legacy Symbol '<'), None=no qualifier", + ), + ) + op.add_column( + "observation", + sa.Column( + "uncertainty", + sa.Float(), + nullable=True, + comment="Measurement uncertainty for the observation value", + ), + ) + op.add_column( + "observation", + sa.Column( + "analysis_agency", + sa.String(), + nullable=True, + comment="Agency or lab that performed the analysis", + ), + ) + op.create_unique_constraint( + "uq_observation_nma_pk_chemistryresults", + "observation", + ["nma_pk_chemistryresults"], + ) + + # Observation version table (sqlalchemy-continuum) + op.add_column( + "observation_version", + sa.Column( + "nma_pk_chemistryresults", + sa.String(), + autoincrement=False, + 
nullable=True, + comment="NM_Aquifer GlobalID for chemistry results — transfer audit and idempotent upsert key", + ), + ) + op.add_column( + "observation_version", + sa.Column( + "detect_flag", + sa.Boolean(), + autoincrement=False, + nullable=True, + comment="True=detected, False=below detection limit (legacy Symbol '<'), None=no qualifier", + ), + ) + op.add_column( + "observation_version", + sa.Column( + "uncertainty", + sa.Float(), + autoincrement=False, + nullable=True, + comment="Measurement uncertainty for the observation value", + ), + ) + op.add_column( + "observation_version", + sa.Column( + "analysis_agency", + sa.String(), + autoincrement=False, + nullable=True, + comment="Agency or lab that performed the analysis", + ), + ) + + # Sample table: 3 new columns + op.add_column( + "sample", + sa.Column( + "nma_pk_chemistrysample", + sa.String(), + nullable=True, + comment="NM_Aquifer SamplePtID for chemistry samples — transfer audit key", + ), + ) + op.add_column( + "sample", + sa.Column( + "volume", + sa.Float(), + nullable=True, + comment="Volume of the sample collected", + ), + ) + op.add_column( + "sample", + sa.Column( + "volume_unit", + sa.String(), + nullable=True, + comment="Unit for the sample volume (e.g. mL, L)", + ), + ) + op.create_unique_constraint( + "uq_sample_nma_pk_chemistrysample", + "sample", + ["nma_pk_chemistrysample"], + ) + + # Drop stale thing_id column from NMA_Radionuclides (model no longer defines it; + # relationships go through NMA_Chemistry_SampleInfo.thing_id instead). + # Guard against environments where the column was already removed. + conn = op.get_bind() + has_thing_id = conn.execute( + sa.text( + "SELECT 1 FROM information_schema.columns " + "WHERE table_name = 'NMA_Radionuclides' AND column_name = 'thing_id'" + ) + ).scalar() + if has_thing_id: + # FK name may differ across environments; look it up dynamically. 
+ fks = sa.inspect(conn).get_foreign_keys("NMA_Radionuclides") + for fk in fks: + if "thing_id" in fk["constrained_columns"] and fk.get("name"): + op.drop_constraint(fk["name"], "NMA_Radionuclides", type_="foreignkey") + op.drop_column("NMA_Radionuclides", "thing_id") + + +def downgrade() -> None: + """Remove chemistry backfill columns.""" + # Restore NMA_Radionuclides.thing_id (add nullable, backfill, then enforce) + op.add_column( + "NMA_Radionuclides", + sa.Column( + "thing_id", + sa.Integer(), + nullable=True, + ), + ) + op.execute( + 'UPDATE "NMA_Radionuclides" r ' + "SET thing_id = csi.thing_id " + 'FROM "NMA_Chemistry_SampleInfo" csi ' + "WHERE r.chemistry_sample_info_id = csi.id" + ) + op.alter_column("NMA_Radionuclides", "thing_id", nullable=False) + op.create_foreign_key( + "NMA_Radionuclides_thing_id_fkey", + "NMA_Radionuclides", + "thing", + ["thing_id"], + ["id"], + ondelete="CASCADE", + ) + + op.drop_constraint("uq_sample_nma_pk_chemistrysample", "sample", type_="unique") + op.drop_column("sample", "volume_unit") + op.drop_column("sample", "volume") + op.drop_column("sample", "nma_pk_chemistrysample") + + op.drop_column("observation_version", "analysis_agency") + op.drop_column("observation_version", "uncertainty") + op.drop_column("observation_version", "detect_flag") + op.drop_column("observation_version", "nma_pk_chemistryresults") + + op.drop_constraint( + "uq_observation_nma_pk_chemistryresults", "observation", type_="unique" + ) + op.drop_column("observation", "analysis_agency") + op.drop_column("observation", "uncertainty") + op.drop_column("observation", "detect_flag") + op.drop_column("observation", "nma_pk_chemistryresults") diff --git a/core/initializers.py b/core/initializers.py index 13a066fd..c3a32d6f 100644 --- a/core/initializers.py +++ b/core/initializers.py @@ -73,12 +73,8 @@ def erase_and_rebuild_db(): ")" ) ).scalar() - if not pg_cron_available: - raise RuntimeError( - "Cannot erase and rebuild database: pg_cron extension is not " 
- "available on this PostgreSQL server." - ) - session.execute(text("CREATE EXTENSION IF NOT EXISTS pg_cron")) + if pg_cron_available: + session.execute(text("CREATE EXTENSION IF NOT EXISTS pg_cron")) session.commit() Base.metadata.drop_all(session.bind) Base.metadata.create_all(session.bind) diff --git a/core/lexicon.json b/core/lexicon.json index 32757116..c5512cc2 100644 --- a/core/lexicon.json +++ b/core/lexicon.json @@ -857,6 +857,13 @@ "term": "mg/L", "definition": "Milligrams per Liter" }, + { + "categories": [ + "unit" + ], + "term": "pCi/L", + "definition": "Picocuries per Liter" + }, { "categories": [ "unit" @@ -8213,6 +8220,13 @@ "term": "Site Notes (legacy)", "definition": "Legacy site notes field from WaterLevels" }, + { + "categories": [ + "note_type" + ], + "term": "Chemistry Observation", + "definition": "Notes from chemistry observation results" + }, { "categories": [ "well_pump_type" diff --git a/db/initialization.py b/db/initialization.py index a9c5516d..ea3b0c88 100644 --- a/db/initialization.py +++ b/db/initialization.py @@ -69,12 +69,8 @@ def recreate_public_schema(session: Session) -> None: ")" ) ).scalar() - if not pg_cron_available: - raise RuntimeError( - "Cannot initialize database schema: pg_cron extension is not available " - "on this PostgreSQL server." 
- ) - session.execute(text("CREATE EXTENSION IF NOT EXISTS pg_cron")) + if pg_cron_available: + session.execute(text("CREATE EXTENSION IF NOT EXISTS pg_cron")) session.execute(APP_READ_GRANT_SQL) grant_app_read_members(session) session.commit() diff --git a/db/observation.py b/db/observation.py index d716f908..cdda1cdf 100644 --- a/db/observation.py +++ b/db/observation.py @@ -37,6 +37,25 @@ class Observation(Base, AutoBaseMixin, ReleaseMixin): # NM_Aquifer fields for audits nma_pk_waterlevels: Mapped[str] = mapped_column(nullable=True) + nma_pk_chemistryresults: Mapped[str] = mapped_column( + nullable=True, + unique=True, + comment="NM_Aquifer GlobalID for chemistry results — transfer audit and idempotent upsert key", + ) + + # Chemistry-specific columns + detect_flag: Mapped[bool] = mapped_column( + nullable=True, + comment="True=detected, False=below detection limit (legacy Symbol '<'), None=no qualifier", + ) + uncertainty: Mapped[float] = mapped_column( + nullable=True, + comment="Measurement uncertainty for the observation value", + ) + analysis_agency: Mapped[str] = mapped_column( + nullable=True, + comment="Agency or lab that performed the analysis", + ) # --- Foreign Keys --- sample_id: Mapped[int] = mapped_column( diff --git a/db/sample.py b/db/sample.py index cdb65b68..32c17c27 100644 --- a/db/sample.py +++ b/db/sample.py @@ -89,6 +89,21 @@ class Sample(Base, AutoBaseMixin, ReleaseMixin): nullable=True, comment="NM_Aquifer primary key for waterlevels - to be used for transfer audits", ) + nma_pk_chemistrysample: Mapped[str] = mapped_column( + nullable=True, + unique=True, + comment="NM_Aquifer SamplePtID for chemistry samples — transfer audit key", + ) + + # Chemistry sample attributes + volume: Mapped[float] = mapped_column( + nullable=True, + comment="Volume of the sample collected", + ) + volume_unit: Mapped[str] = mapped_column( + nullable=True, + comment="Unit for the sample volume (e.g. 
mL, L)", + ) # --- Relationship Definitions --- field_activity: Mapped["FieldActivity"] = relationship(back_populates="samples") diff --git a/run_backfill.sh b/run_backfill.sh index 984a9fe2..f316662e 100755 --- a/run_backfill.sh +++ b/run_backfill.sh @@ -6,7 +6,7 @@ # e.g. state, county, quad_name,etc. It will also be used to handle data refactors/corrections in the future. # Load environment variables from .env and run the staging backfill. -# Usage: ./run_backfill.sh [--batch-size N] +# Usage: ./run_backfill.sh # github workflow equivalent: for reference only #- name: Run backfill script on staging database @@ -38,5 +38,4 @@ set +a uv run alembic upgrade head -# Forward any args (e.g., --batch-size 500) -python -m transfers.backfill.backfill "$@" +python -m transfers.backfill.backfill diff --git a/schemas/observation.py b/schemas/observation.py index 6f645b13..fe6585bd 100644 --- a/schemas/observation.py +++ b/schemas/observation.py @@ -112,6 +112,10 @@ class BaseObservationResponse(BaseResponseModel): value: float | None unit: Unit nma_data_quality: str | None = None + nma_pk_chemistryresults: str | None = None + detect_flag: bool | None = None + uncertainty: float | None = None + analysis_agency: str | None = None class GroundwaterLevelObservationResponse(BaseObservationResponse): diff --git a/schemas/sample.py b/schemas/sample.py index 8dce646b..c9056c30 100644 --- a/schemas/sample.py +++ b/schemas/sample.py @@ -139,6 +139,9 @@ class SampleResponse(BaseResponseModel): notes: str | None depth_top: float | None depth_bottom: float | None + nma_pk_chemistrysample: str | None = None + volume: float | None = None + volume_unit: str | None = None # ============= EOF ============================================= diff --git a/tests/features/environment.py b/tests/features/environment.py index 4f3a6d2b..7aa81848 100644 --- a/tests/features/environment.py +++ b/tests/features/environment.py @@ -51,6 +51,11 @@ Sample, Base, ) +from db.analysis_method import 
AnalysisMethod +from db.field import FieldEvent, FieldActivity +from db.nma_legacy import NMA_Radionuclides, NMA_Chemistry_SampleInfo +from db.notes import Notes +from db.observation import Observation from db.engine import session_ctx from db.initialization import recreate_public_schema, sync_search_vector_triggers from services.util import get_bool_env @@ -741,15 +746,118 @@ def before_scenario(context, scenario): def after_scenario(context, scenario): + # Chemistry backfill cleanup — runs regardless of DROP_AND_REBUILD_DB + # because the backfill steps create their own fixture data. + if hasattr(context, "_backfill_created"): + try: + with session_ctx() as session: + created = context._backfill_created + + # Delete in FK order: Notes → Observations → Samples → FieldActivities → FieldEvents → NMA rows + # Scope to observations linked to this scenario's samples + scenario_sample_ids = created.get("sample_ids", []) + obs_ids = [] + if scenario_sample_ids: + obs_ids = [ + row[0] + for row in session.execute( + select(Observation.id).where( + Observation.sample_id.in_(scenario_sample_ids) + ) + ).all() + ] + + # Delete Notes linked to those observations + if obs_ids: + for note in ( + session.query(Notes) + .filter( + Notes.target_table == "observation", + Notes.target_id.in_(obs_ids), + ) + .all() + ): + session.delete(note) + + # Delete the observations themselves + for oid in obs_ids: + obs = session.get(Observation, oid) + if obs: + session.delete(obs) + + # Delete NMA_Radionuclides created in this scenario + for rid in created.get("nma_radionuclide_ids", []): + row = session.get(NMA_Radionuclides, rid) + if row: + session.delete(row) + + # Delete NMA_Chemistry_SampleInfo created in this scenario + for sid in created.get("nma_sampleinfo_ids", []): + row = session.get(NMA_Chemistry_SampleInfo, sid) + if row: + session.delete(row) + + # Delete Samples (cascades to observations already deleted) + for sid in created.get("sample_ids", []): + row = session.get(Sample, 
sid) + if row: + session.delete(row) + + # Delete FieldActivities + for faid in created.get("field_activity_ids", []): + row = session.get(FieldActivity, faid) + if row: + session.delete(row) + + # Delete FieldEvents + for feid in created.get("field_event_ids", []): + row = session.get(FieldEvent, feid) + if row: + session.delete(row) + + # Clean up AnalysisMethods created during this scenario's backfill + for amid in created.get("analysis_method_ids", []): + row = session.get(AnalysisMethod, amid) + if row: + session.delete(row) + + # Delete Things (wells) created by _ensure_well + for wid in created.get("well_ids", []): + # Delete LocationThingAssociation first + session.execute( + LocationThingAssociation.__table__.delete().where( + LocationThingAssociation.thing_id == wid + ) + ) + row = session.get(Thing, wid) + if row: + session.delete(row) + + # Delete Locations created by _ensure_well + for lid in created.get("location_ids", []): + row = session.get(Location, lid) + if row: + session.delete(row) + + session.commit() + except Exception: + import logging + + logging.getLogger(__name__).error( + "Chemistry backfill cleanup failed for scenario '%s'", + scenario.name, + exc_info=True, + ) + finally: + del context._backfill_created + if not get_bool_env("DROP_AND_REBUILD_DB"): return - # runs after EVERY scenario - # e.g. 
clean up temp files, close db sessions + # runs after EVERY scenario (only when DROP_AND_REBUILD_DB is set) if scenario.name.startswith( "Successfully upload and associate assets from a valid manifest" ): - # delete all the assets uploaded for this scenario with session_ctx() as session: for uri in context.uris: sql = select(Asset).where(Asset.uri == uri) @@ -757,7 +865,6 @@ def after_scenario(context, scenario): session.delete(asset) session.commit() elif "cleanup_samples" in scenario.tags: - # delete all samples created during happy path tests with session_ctx() as session: samples = session.query(Sample).all() for sample in samples: diff --git a/tests/features/nma-chemistry-radionuclides-refactor.feature b/tests/features/nma-chemistry-radionuclides-refactor.feature index 060407e4..d1c1840f 100644 --- a/tests/features/nma-chemistry-radionuclides-refactor.feature +++ b/tests/features/nma-chemistry-radionuclides-refactor.feature @@ -78,6 +78,7 @@ Feature: Refactor legacy Radionuclides into the Ocotillo schema via backfill job | 0C354D8D-5404-41CE-9C95-002213371C4F | 77F1E3CF-A961-440E-966C-DD2E3675044B | GB | 5 | pCi/L| 2005-01-18 | E900.0 | | 095DA2E3-79E3-4BF2-B096-025C6D9A64B7 | BC50F55E-5BF1-471D-931D-03501081B4FD | Ra228 | 2.6 | pCi/L| 2003-11-26 | EPA 904.0 Mod | And a Sample record exists with nma_pk_chemistrysample "77F1E3CF-A961-440E-966C-DD2E3675044B" + And a Sample record exists with nma_pk_chemistrysample "BC50F55E-5BF1-471D-931D-03501081B4FD" When I run the Radionuclides backfill job Then the Observation for GlobalID "0C354D8D-5404-41CE-9C95-002213371C4F" should set analysis_method_name to "E900.0" And the Observation for GlobalID "095DA2E3-79E3-4BF2-B096-025C6D9A64B7" should set analysis_method_name to "EPA 904.0 Mod" @@ -123,6 +124,7 @@ Feature: Refactor legacy Radionuclides into the Ocotillo schema via backfill job | field | value | | GlobalID | 76F3A993-A29B-413B-83E0-00ADF51D15A2 | | SamplePtID | 7758D992-0394-42B1-BE96-734FCACB6412 | + | Analyte 
| Radium-226 | | SamplePointID| EB-490A | | OBJECTID | 333 | | WCLab_ID | null | diff --git a/tests/features/steps/chemistry-backfill.py b/tests/features/steps/chemistry-backfill.py new file mode 100644 index 00000000..d588da78 --- /dev/null +++ b/tests/features/steps/chemistry-backfill.py @@ -0,0 +1,701 @@ +# =============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== +""" +BDD step definitions for chemistry backfill features. + +These steps are designed to be reusable across all chemistry types +(Radionuclides, MajorChemistry, MinorTraceChemistry, FieldParameters). 
+""" + +import uuid +from datetime import datetime, timezone + +from behave import given, when, then +from behave.runner import Context +from sqlalchemy import select + +from db import Location, Thing, LocationThingAssociation +from db.analysis_method import AnalysisMethod +from db.engine import session_ctx +from db.field import FieldEvent, FieldActivity +from db.nma_legacy import NMA_Chemistry_SampleInfo, NMA_Radionuclides +from db.notes import Notes +from db.observation import Observation +from db.parameter import Parameter +from db.sample import Sample + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- +def _ensure_backfill_tracking(context: Context): + """Initialize per-scenario tracking lists for cleanup.""" + if not hasattr(context, "_backfill_created"): + context._backfill_created = { + "observation_ids": [], + "note_ids": [], + "analysis_method_ids": [], + "sample_ids": [], + "field_activity_ids": [], + "field_event_ids": [], + "nma_radionuclide_ids": [], + "nma_sampleinfo_ids": [], + "location_ids": [], + "well_ids": [], + } + + +def _ensure_well(context: Context): + """Ensure context.objects['wells'] has at least one well. + + When running without DROP_AND_REBUILD_DB the before_all fixture doesn't + create wells. This helper creates a minimal Location → Thing chain so + the chemistry-backfill steps can link NMA_Chemistry_SampleInfo rows to a + Thing. 
+ """ + if not hasattr(context, "objects"): + context.objects = {} + if context.objects.get("wells"): + return # already have one + + _ensure_backfill_tracking(context) + with session_ctx() as session: + loc = Location( + point="POINT(-107.949533 33.809665)", + elevation=2464.9, + release_status="draft", + ) + session.add(loc) + session.flush() + context._backfill_created["location_ids"].append(loc.id) + + well = Thing( + name="CHEM-TEST-0001", + first_visit_date="2023-03-03", + thing_type="water well", + release_status="draft", + well_depth=10, + hole_depth=10, + ) + session.add(well) + session.flush() + context._backfill_created["well_ids"].append(well.id) + + assoc = LocationThingAssociation(location=loc, thing=well) + assoc.effective_start = "2025-02-01T00:00:00Z" + session.add(assoc) + session.commit() + session.refresh(well) + + if "wells" not in context.objects: + context.objects["wells"] = [] + context.objects["wells"].append(well) + + +def _parse_table_value(raw: str): + """Parse a table cell value, handling 'null' and numeric coercion.""" + if raw.strip().lower() == "null": + return None + return raw.strip() + + +def _get_observation_by_globalid(session, global_id: str) -> Observation: + obs = session.execute( + select(Observation).where( + Observation.nma_pk_chemistryresults == global_id.lower() + ) + ).scalar_one_or_none() + assert ( + obs is not None + ), f"No Observation found with nma_pk_chemistryresults='{global_id.lower()}'" + return obs + + +# --------------------------------------------------------------------------- +# GIVEN steps +# --------------------------------------------------------------------------- +@given("a database session is available") +def step_given_db_session(context: Context): + """Verify session_ctx works and init context attributes.""" + _ensure_backfill_tracking(context) + with session_ctx() as session: + assert session is not None, "Database session should be available" + context.backfill_result = None + 
context.chemistry_type = None + + +@given("legacy NMA_Radionuclides records exist in the database") +def step_given_legacy_radionuclides_exist(context: Context): + """Marker step — sets chemistry_type on context.""" + _ensure_backfill_tracking(context) + context.chemistry_type = "Radionuclides" + + +@given( + 'lexicon terms exist for parameter_name, unit, analysis_method_type, and sample_matrix "water"' +) +def step_given_lexicon_terms_exist(context: Context): + """Fail-fast check that critical lexicon terms are seeded. + + Also seeds test-only analyte names so the backfill can create + Parameter records without requiring production lexicon data. + """ + _ensure_backfill_tracking(context) + with session_ctx() as session: + from db.lexicon import LexiconTerm + + # Terms that must already exist (seeded by init_lexicon) + for term in ("water", "pCi/L", "Chemistry Observation"): + row = session.execute( + select(LexiconTerm).where(LexiconTerm.term == term) + ).scalar_one_or_none() + assert row is not None, ( + f"Required lexicon term '{term}' not found. " + "Ensure init_lexicon() ran during before_all." + ) + + # Test-only analyte names used by scenarios — seed if absent. + # In production these would be pre-seeded via a vocabulary + # preparation step. 
+ test_analytes = ( + "GB", + "Uranium", + "GA", + "Ra228", + "Uranium-238", + "Radium-226", + "Nitrate", + "Unknown", + ) + for analyte in test_analytes: + existing = session.execute( + select(LexiconTerm).where(LexiconTerm.term == analyte) + ).scalar_one_or_none() + if existing is None: + session.add(LexiconTerm(term=analyte, definition=analyte)) + session.commit() + + +@given("a legacy NMA_Radionuclides record exists with:") +def step_given_single_radionuclide(context: Context): + """Create a single NMA_Radionuclides row from a vertical Behave table.""" + _ensure_backfill_tracking(context) + fields = {row["field"]: _parse_table_value(row["value"]) for row in context.table} + _create_radionuclide_from_fields(context, fields) + + +@given("legacy NMA_Radionuclides records exist with:") +def step_given_multi_radionuclides(context: Context): + """Create multiple NMA_Radionuclides rows from a horizontal Behave table.""" + _ensure_backfill_tracking(context) + for row in context.table: + fields = { + heading: _parse_table_value(row[heading]) + for heading in context.table.headings + } + _create_radionuclide_from_fields(context, fields) + + +@given("a legacy Chemistry_SampleInfo record exists with:") +def step_given_sampleinfo(context: Context): + """Create an NMA_Chemistry_SampleInfo row from a vertical Behave table.""" + _ensure_backfill_tracking(context) + _ensure_well(context) + fields = {row["field"]: _parse_table_value(row["value"]) for row in context.table} + + with session_ctx() as session: + well = context.objects["wells"][0] + well = session.merge(well) + + sampleinfo = NMA_Chemistry_SampleInfo( + thing_id=well.id, + nma_sample_pt_id=fields.get("SamplePtID"), + nma_sample_point_id=fields.get("SamplePointID", "UNKNOWN"), + ) + session.add(sampleinfo) + session.commit() + session.refresh(sampleinfo) + context._backfill_created["nma_sampleinfo_ids"].append(sampleinfo.id) + + # Store for later use + if not hasattr(context, "_sampleinfo_map"): + 
context._sampleinfo_map = {} + context._sampleinfo_map[fields["SamplePtID"]] = sampleinfo.id + + +@given('a Sample record exists with nma_pk_chemistrysample "{sample_pt_id}"') +def step_given_sample_with_chemistry_key(context: Context, sample_pt_id: str): + """Create FieldEvent → FieldActivity → Sample chain linked to first well.""" + _ensure_backfill_tracking(context) + _ensure_well(context) + + with session_ctx() as session: + well = context.objects["wells"][0] + well = session.merge(well) + + fe = FieldEvent( + thing_id=well.id, + event_date=datetime(2005, 1, 1, tzinfo=timezone.utc), + release_status="draft", + ) + session.add(fe) + session.flush() + context._backfill_created["field_event_ids"].append(fe.id) + + fa = FieldActivity( + field_event_id=fe.id, + activity_type="water chemistry", + release_status="draft", + ) + session.add(fa) + session.flush() + context._backfill_created["field_activity_ids"].append(fa.id) + + sample = Sample( + field_activity_id=fa.id, + sample_date=datetime(2005, 1, 1, tzinfo=timezone.utc), + sample_name=f"CHEM-{uuid.uuid4().hex[:8]}", + sample_matrix="water", + sample_method="Estimated", + qc_type="Normal", + nma_pk_chemistrysample=sample_pt_id, + release_status="draft", + ) + session.add(sample) + session.commit() + session.refresh(sample) + context._backfill_created["sample_ids"].append(sample.id) + + +# --------------------------------------------------------------------------- +# WHEN steps +# --------------------------------------------------------------------------- +def _snapshot_analysis_method_ids() -> set[int]: + """Return the set of all AnalysisMethod IDs that exist before the backfill.""" + with session_ctx() as session: + return {row[0] for row in session.execute(select(AnalysisMethod.id)).all()} + + +def _track_created_analysis_methods(context: Context, pre_ids: set[int]): + """Record only AnalysisMethod IDs that were actually created by the backfill.""" + _ensure_backfill_tracking(context) + with session_ctx() as 
session: + post_ids = {row[0] for row in session.execute(select(AnalysisMethod.id)).all()} + new_ids = post_ids - pre_ids + context._backfill_created["analysis_method_ids"] = list(new_ids) + + +@when("I run the Radionuclides backfill job") +def step_when_run_backfill(context: Context): + """Execute the backfill and store the result on context.""" + from transfers.backfill.chemistry_backfill import backfill_radionuclides + + pre_ids = _snapshot_analysis_method_ids() + context.backfill_result = backfill_radionuclides() + _track_created_analysis_methods(context, pre_ids) + + +@when("I run the Radionuclides backfill job again") +def step_when_run_backfill_again(context: Context): + """Re-run to verify idempotency.""" + from transfers.backfill.chemistry_backfill import backfill_radionuclides + + pre_ids = _snapshot_analysis_method_ids() + context.backfill_result = backfill_radionuclides() + _track_created_analysis_methods(context, pre_ids) + + +# --------------------------------------------------------------------------- +# THEN steps +# --------------------------------------------------------------------------- +@then( + 'exactly {n:d} Observation record should exist with nma_pk_chemistryresults "{global_id}"' +) +def step_then_observation_count(context: Context, n: int, global_id: str): + with session_ctx() as session: + count = session.execute( + select(Observation).where( + Observation.nma_pk_chemistryresults == global_id.lower() + ) + ).all() + assert ( + len(count) == n + ), f"Expected {n} Observation(s) for GlobalID {global_id}, found {len(count)}" + if len(count) == 1: + context._last_observation = count[0][0] + + +@then( + 'the Observation should reference the Sample with nma_pk_chemistrysample "{sample_pt_id}"' +) +def step_then_observation_refs_sample(context: Context, sample_pt_id: str): + obs = context._last_observation + with session_ctx() as session: + obs = session.merge(obs) + sample = session.get(Sample, obs.sample_id) + assert sample is not None, 
"Observation must reference a Sample" + assert sample.nma_pk_chemistrysample == sample_pt_id, ( + f"Expected Sample.nma_pk_chemistrysample={sample_pt_id}, " + f"got {sample.nma_pk_chemistrysample}" + ) + + +@then('the Observation should set observation_datetime to "{date_str}"') +def step_then_observation_datetime(context: Context, date_str: str): + obs = context._last_observation + with session_ctx() as session: + obs = session.merge(obs) + dt = obs.observation_datetime + # Normalize to UTC for comparison (pg may return in server tz) + if dt.tzinfo is not None: + dt = dt.astimezone(timezone.utc) + actual_date = dt.strftime("%Y-%m-%d") + assert ( + actual_date == date_str + ), f"Expected observation_datetime={date_str}, got {actual_date}" + + +@then("the Observation should set value to {value}") +def step_then_observation_value(context: Context, value: str): + obs = context._last_observation + expected = float(value) + with session_ctx() as session: + obs = session.merge(obs) + assert obs.value == expected, f"Expected value={expected}, got {obs.value}" + + +@then('the Observation should set unit to "{unit}"') +def step_then_observation_unit(context: Context, unit: str): + obs = context._last_observation + with session_ctx() as session: + obs = session.merge(obs) + assert obs.unit == unit, f"Expected unit={unit}, got {obs.unit}" + + +@then( + 'a Parameter record should exist with parameter_name "{name}" and matrix "{matrix}"' +) +def step_then_parameter_exists(context: Context, name: str, matrix: str): + with session_ctx() as session: + param = session.execute( + select(Parameter).where( + Parameter.parameter_name == name, Parameter.matrix == matrix + ) + ).scalar_one_or_none() + assert ( + param is not None + ), f"No Parameter found with parameter_name='{name}' and matrix='{matrix}'" + context._last_parameter = param + + +@then( + 'the Observation should reference the Parameter with parameter_name "{name}" and matrix "{matrix}"' +) +def 
step_then_observation_refs_parameter(context: Context, name: str, matrix: str): + obs = context._last_observation + with session_ctx() as session: + obs = session.merge(obs) + param = session.get(Parameter, obs.parameter_id) + assert param is not None, "Observation must reference a Parameter" + assert ( + param.parameter_name == name + ), f"Expected parameter_name='{name}', got '{param.parameter_name}'" + assert ( + param.matrix == matrix + ), f"Expected matrix='{matrix}', got '{param.matrix}'" + + +@then('the Observation should set analysis_method_name to "{method}"') +def step_then_observation_analysis_method(context: Context, method: str): + obs = context._last_observation + with session_ctx() as session: + obs = session.merge(obs) + assert ( + obs.analysis_method_id is not None + ), "Observation should have an analysis_method_id" + am = session.get(AnalysisMethod, obs.analysis_method_id) + assert am is not None, "AnalysisMethod should exist" + assert ( + am.analysis_method_name == method + ), f"Expected analysis_method_name='{method}', got '{am.analysis_method_name}'" + + +@then("the Observation should set uncertainty to {value}") +def step_then_observation_uncertainty(context: Context, value: str): + obs = context._last_observation + expected = float(value) + with session_ctx() as session: + obs = session.merge(obs) + assert ( + obs.uncertainty == expected + ), f"Expected uncertainty={expected}, got {obs.uncertainty}" + + +@then('the Observation should set analysis_agency to "{agency}"') +def step_then_observation_analysis_agency(context: Context, agency: str): + obs = context._last_observation + with session_ctx() as session: + obs = session.merge(obs) + assert ( + obs.analysis_agency == agency + ), f"Expected analysis_agency='{agency}', got '{obs.analysis_agency}'" + + +# --- GlobalID-variant steps --- +@then( + 'the Observation for GlobalID "{global_id}" should reference the Sample with nma_pk_chemistrysample "{sample_pt_id}"' +) +def 
@then(
    'the Observation for GlobalID "{global_id}" should reference the Sample with nma_pk_chemistrysample "{sample_pt_id}"'
)
def step_then_obs_by_gid_refs_sample(
    context: Context, global_id: str, sample_pt_id: str
):
    """Assert the Observation (looked up by GlobalID) links the expected Sample."""
    with session_ctx() as session:
        obs = _get_observation_by_globalid(session, global_id)
        linked = session.get(Sample, obs.sample_id)
        assert linked is not None
        assert linked.nma_pk_chemistrysample == sample_pt_id, (
            f"Expected Sample.nma_pk_chemistrysample={sample_pt_id}, "
            f"got {linked.nma_pk_chemistrysample}"
        )


@then(
    'the Observation for GlobalID "{global_id}" should reference the Thing associated with that Sample'
)
def step_then_obs_by_gid_refs_thing(context: Context, global_id: str):
    """Walk Observation → Sample → FieldActivity → FieldEvent → Thing."""
    with session_ctx() as session:
        obs = _get_observation_by_globalid(session, global_id)
        linked = session.get(Sample, obs.sample_id)
        assert linked is not None
        activity = linked.field_activity
        assert activity is not None, "Sample must have a field_activity"
        event = activity.field_event
        assert event is not None, "FieldActivity must have a field_event"
        assert event.thing_id is not None, "FieldEvent must reference a Thing"


@then(
    'the Observation for GlobalID "{global_id}" should set analysis_method_name to "{method}"'
)
def step_then_obs_by_gid_analysis_method(context: Context, global_id: str, method: str):
    """Assert the Observation (by GlobalID) resolves to the named AnalysisMethod."""
    with session_ctx() as session:
        obs = _get_observation_by_globalid(session, global_id)
        assert obs.analysis_method_id is not None
        method_row = session.get(AnalysisMethod, obs.analysis_method_id)
        assert method_row is not None
        assert method_row.analysis_method_name == method, (
            f"Expected analysis_method_name='{method}', "
            f"got '{method_row.analysis_method_name}'"
        )


@then(
    'the Observation for GlobalID "{global_id}" should reference the Parameter with parameter_name "{name}" and matrix "{matrix}"'
)
def step_then_obs_by_gid_refs_parameter(
    context: Context, global_id: str, name: str, matrix: str
):
    """Assert the Observation (by GlobalID) points at the expected Parameter."""
    with session_ctx() as session:
        obs = _get_observation_by_globalid(session, global_id)
        param = session.get(Parameter, obs.parameter_id)
        assert param is not None
        assert param.parameter_name == name
        assert param.matrix == matrix


@then('the Observation for GlobalID "{global_id}" should set detect_flag to {flag}')
def step_then_obs_by_gid_detect_flag(context: Context, global_id: str, flag: str):
    """Assert the Observation's detect_flag matches the feature-file boolean."""
    expected = flag.lower() == "true"
    with session_ctx() as session:
        obs = _get_observation_by_globalid(session, global_id)
        assert obs.detect_flag == expected, (
            f"Expected detect_flag={expected}, got {obs.detect_flag}"
        )


@then(
    'the Observation for GlobalID "{global_id}" should not store SamplePointID, OBJECTID, or WCLab_ID'
)
def step_then_obs_by_gid_no_extra_columns(context: Context, global_id: str):
    """Assert legacy-only columns were not carried onto the Observation model."""
    legacy_attrs = ("nma_sample_point_id", "nma_object_id", "nma_wclab_id")
    with session_ctx() as session:
        obs = _get_observation_by_globalid(session, global_id)
        for attr in legacy_attrs:
            assert not hasattr(obs, attr), (
                f"Observation should not have attribute '{attr}'"
            )


# --- Sample volume steps ---
@then("the Sample should set volume to {value}")
def step_then_sample_volume(context: Context, value: str):
    """Assert a scenario-created Sample carries the expected volume."""
    expected = float(value)
    tracked_ids = context._backfill_created.get("sample_ids", [])
    assert tracked_ids, "No sample_ids tracked for this scenario"
    with session_ctx() as session:
        sample = (
            session.execute(select(Sample).where(Sample.id.in_(tracked_ids)))
            .scalars()
            .first()
        )
        assert sample is not None, "No Sample found for this scenario"
        assert sample.volume == expected, (
            f"Expected volume={expected}, got {sample.volume}"
        )
@then('the Sample should set volume_unit to "{unit}"')
def step_then_sample_volume_unit(context: Context, unit: str):
    """Assert a scenario-created Sample carries the expected volume_unit."""
    scenario_sample_ids = context._backfill_created.get("sample_ids", [])
    assert scenario_sample_ids, "No sample_ids tracked for this scenario"
    with session_ctx() as session:
        sample = (
            session.execute(select(Sample).where(Sample.id.in_(scenario_sample_ids)))
            .scalars()
            .first()
        )
        assert sample is not None, "No Sample found for this scenario"
        assert (
            sample.volume_unit == unit
        ), f"Expected volume_unit='{unit}', got '{sample.volume_unit}'"


def _extract_globalid_from_target_id(raw: str) -> str:
    """Extract the GlobalID from a dynamic target_id cell.

    Feature tables reference the observation id as
    ``(observation.id for GlobalID "abc-123")``. The previous parser only
    did ``.strip().rstrip(")")``, which left the surrounding double quotes
    attached to the GlobalID, so the subsequent lookup could never match a
    stored (unquoted) key. Strip the trailing ')' and any quote characters.
    """
    gid = raw.split("GlobalID", 1)[1].strip()
    return gid.rstrip(")").strip().strip("\"'")


# --- Notes steps ---
@then("a Notes record should exist with:")
def step_then_notes_exist(context: Context):
    """Assert a Notes row matching the step table exists.

    ``target_id`` may be a literal integer or the dynamic form
    ``(observation.id for GlobalID "...")``, which is resolved against the
    Observation table first.
    """
    fields = {row["field"]: row["value"] for row in context.table}
    target_table = fields["target_table"]
    note_type = fields["note_type"]
    content = fields["content"]

    # Resolve dynamic target_id
    target_id_raw = fields["target_id"]
    if target_id_raw.startswith("(observation.id for GlobalID"):
        gid = _extract_globalid_from_target_id(target_id_raw)
        with session_ctx() as session:
            obs = _get_observation_by_globalid(session, gid)
            target_id = obs.id
    else:
        target_id = int(target_id_raw)

    with session_ctx() as session:
        note = session.execute(
            select(Notes).where(
                Notes.target_id == target_id,
                Notes.target_table == target_table,
                Notes.note_type == note_type,
                Notes.content == content,
            )
        ).scalar_one_or_none()
        assert note is not None, (
            f"No Notes found with target_table={target_table}, target_id={target_id}, "
            f"note_type={note_type}, content={content}"
        )


# --- Orphan prevention steps ---
@then(
    "the backfill job should report {n:d} skipped record due to missing Sample linkage (SamplePtID)"
)
def step_then_skipped_orphans(context: Context, n: int):
    """Assert the backfill result reported exactly n orphaned legacy rows."""
    assert context.backfill_result is not None, "Backfill result not found on context"
    assert (
        context.backfill_result.skipped_orphans == n
    ), f"Expected {n} skipped orphans, got {context.backfill_result.skipped_orphans}"


@then('no Observation record should exist with nma_pk_chemistryresults "{global_id}"')
def step_then_no_observation(context: Context, global_id: str):
    """Assert no Observation was created for the given legacy GlobalID.

    The backfill stores GlobalIDs lower-cased, so the lookup lower-cases too.
    """
    with session_ctx() as session:
        rows = session.execute(
            select(Observation).where(
                Observation.nma_pk_chemistryresults == global_id.lower()
            )
        ).all()
        assert (
            len(rows) == 0
        ), f"Expected 0 Observations for GlobalID {global_id}, found {len(rows)}"


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _create_radionuclide_from_fields(context: Context, fields: dict):
    """Create NMA_Chemistry_SampleInfo (if needed) and an NMA_Radionuclides row.

    Ensures a Well exists, gets-or-creates the SampleInfo keyed on SamplePtID,
    then inserts the radionuclide row; created ids are tracked on the context
    for scenario cleanup.
    """
    _ensure_well(context)
    global_id = fields.get("GlobalID")
    sample_pt_id = fields.get("SamplePtID")

    with session_ctx() as session:
        well = context.objects["wells"][0]
        well = session.merge(well)

        # Get or create SampleInfo for this SamplePtID
        sampleinfo = session.execute(
            select(NMA_Chemistry_SampleInfo).where(
                NMA_Chemistry_SampleInfo.nma_sample_pt_id == sample_pt_id
            )
        ).scalar_one_or_none()

        if sampleinfo is None:
            sampleinfo = NMA_Chemistry_SampleInfo(
                thing_id=well.id,
                nma_sample_pt_id=sample_pt_id,
                nma_sample_point_id=fields.get("SamplePointID", "UNKNOWN"),
            )
            session.add(sampleinfo)
            session.flush()
            context._backfill_created["nma_sampleinfo_ids"].append(sampleinfo.id)

        # Parse analysis date (store as UTC so the backfill's tz handling is a no-op)
        analysis_date = None
        if fields.get("AnalysisDate"):
            analysis_date = datetime.strptime(
                fields["AnalysisDate"], "%Y-%m-%d"
            ).replace(tzinfo=timezone.utc)

        # Parse numeric fields
        sample_value = (
            float(fields["SampleValue"]) if fields.get("SampleValue") else None
        )
        uncertainty_val = (
            float(fields["Uncertainty"]) if fields.get("Uncertainty") else None
        )
        # NOTE(review): int() raises on decimal strings like "2.5" — confirm the
        # legacy Volume column is integral before feature files use fractions.
        volume_val = int(fields["Volume"]) if fields.get("Volume") else None

        rad = NMA_Radionuclides(
            chemistry_sample_info_id=sampleinfo.id,
            nma_global_id=global_id,
            nma_sample_pt_id=sample_pt_id,
            nma_sample_point_id=fields.get("SamplePointID"),
            nma_object_id=int(fields["OBJECTID"]) if fields.get("OBJECTID") else None,
            nma_wclab_id=fields.get("WCLab_ID"),
            analyte=fields.get("Analyte"),
            symbol=fields.get("Symbol"),
            sample_value=sample_value,
            units=fields.get("Units"),
            uncertainty=uncertainty_val,
            analysis_method=fields.get("AnalysisMethod"),
            analysis_date=analysis_date,
            notes=fields.get("Notes"),
            volume=volume_val,
            volume_unit=fields.get("VolumeUnit"),
            analyses_agency=fields.get("AnalysesAgency"),
        )
        session.add(rad)
        session.commit()
        session.refresh(rad)
        context._backfill_created["nma_radionuclide_ids"].append(rad.id)
if __name__ == "__main__":
    # The --batch-size CLI flag was removed; reject any leftover arguments
    # loudly so CD pipelines fail fast instead of silently ignoring them.
    extra_args = sys.argv[1:]
    if extra_args:
        logger.error(
            "Unknown arguments: %s. "
            "CLI options (--batch-size) were removed; "
            "use BACKFILL_* env vars to control execution.",
            " ".join(extra_args),
        )
        sys.exit(2)
    try:
        run()
    except Exception:
        # Log the full traceback and signal failure to the caller.
        logger.critical("Backfill orchestration failed", exc_info=True)
        sys.exit(1)
@dataclass
class BackfillResult:
    """Aggregate counters for a single backfill run.

    inserted/updated count upserted Observations, skipped_orphans counts
    legacy rows with no matching Sample, and errors collects per-row
    failure messages.
    """

    inserted: int = 0
    updated: int = 0
    skipped_orphans: int = 0
    errors: list[str] = field(default_factory=list)


def _build_sample_cache(session: Session) -> dict[str, int]:
    """Map lower-cased Sample.nma_pk_chemistrysample → Sample.id.

    Only samples that actually carry the legacy key are included.
    """
    keyed_rows = session.execute(
        select(Sample.nma_pk_chemistrysample, Sample.id).where(
            Sample.nma_pk_chemistrysample.isnot(None)
        )
    ).all()
    cache: dict[str, int] = {}
    for key, sample_id in keyed_rows:
        cache[str(key).lower()] = sample_id
    return cache
def _get_or_create_parameter(
    session: Session, parameter_name: str, matrix: str = "water"
) -> Parameter:
    """Return the Parameter for (parameter_name, matrix), creating it if absent.

    The parameter_name must already exist as a LexiconTerm (FK constraint).
    If it doesn't, the insert will fail — callers should ensure vocabulary
    is seeded before running the backfill.
    """
    existing = session.execute(
        select(Parameter).where(
            Parameter.parameter_name == parameter_name,
            Parameter.matrix == matrix,
        )
    ).scalar_one_or_none()
    if existing is not None:
        return existing
    created = Parameter(
        parameter_name=parameter_name,
        matrix=matrix,
        default_unit="pCi/L",
        release_status="draft",
    )
    session.add(created)
    # Flush so the new row has a primary key the caller can reference.
    session.flush()
    return created


def _get_or_create_analysis_method(
    session: Session, method_code: str
) -> AnalysisMethod:
    """Return the AnalysisMethod for method_code, creating it if absent."""
    existing = session.execute(
        select(AnalysisMethod).where(AnalysisMethod.analysis_method_code == method_code)
    ).scalar_one_or_none()
    if existing is not None:
        return existing
    # Legacy data carries only a code, so it doubles as the display name.
    created = AnalysisMethod(
        analysis_method_code=method_code,
        analysis_method_name=method_code,
        release_status="draft",
    )
    session.add(created)
    session.flush()
    return created
+ + Rules: + - Symbol == '<' → False (below detection limit) + - Symbol is non-null and != '<' → True (detected, other qualifier) + - Symbol is null and value is not None → True (detected, no qualifier) + - Symbol is null and value is None → None (unknown) + """ + if symbol is not None: + return symbol != "<" + if value is not None: + return True + return None + + +def _backfill_radionuclides_impl(session: Session) -> BackfillResult: + result = BackfillResult() + + sample_cache = _build_sample_cache(session) + logger.info("Sample cache built with %d entries", len(sample_cache)) + + # Track existing observation keys so we know insert vs update + existing_keys = set( + row[0].lower() if row[0] else row[0] + for row in session.execute( + select(Observation.nma_pk_chemistryresults).where( + Observation.nma_pk_chemistryresults.isnot(None) + ) + ).all() + ) + + # Query all radionuclides joined with sample info for nma_sample_pt_id. + # ORDER BY id for deterministic processing — when multiple rows map to + # the same Sample, the lowest-PK row's volume/volume_unit wins. 
+ legacy_rows = session.execute( + select(NMA_Radionuclides, NMA_Chemistry_SampleInfo.nma_sample_pt_id) + .join( + NMA_Chemistry_SampleInfo, + NMA_Radionuclides.chemistry_sample_info_id == NMA_Chemistry_SampleInfo.id, + ) + .order_by(NMA_Radionuclides.id) + ).all() + + logger.info("Processing %d legacy Radionuclides rows", len(legacy_rows)) + + for row, nma_sample_pt_id in legacy_rows: + global_id_str = str(row.nma_global_id).lower() if row.nma_global_id else None + if global_id_str is None: + result.errors.append(f"Row id={row.id} has no nma_global_id") + continue + + # Resolve Sample via nma_pk_chemistrysample + sample_pt_key = str(nma_sample_pt_id).lower() if nma_sample_pt_id else None + if sample_pt_key is None or sample_pt_key not in sample_cache: + result.skipped_orphans += 1 + logger.debug( + "Skipping Radionuclide GlobalID=%s — no matching Sample for SamplePtID=%s", + global_id_str, + sample_pt_key, + ) + continue + + sample_id = sample_cache[sample_pt_key] + + analyte = row.analyte + if not analyte: + result.errors.append( + f"Row GlobalID={global_id_str} has no Analyte — skipping" + ) + continue + + # Build observation_datetime + obs_dt = row.analysis_date + if obs_dt is not None: + if obs_dt.tzinfo is None: + obs_dt = obs_dt.replace(tzinfo=timezone.utc) + else: + result.errors.append( + f"Row GlobalID={global_id_str} has no AnalysisDate — skipping" + ) + continue + + # Per-row savepoint so one bad row doesn't abort the entire backfill + savepoint = session.begin_nested() + try: + # Get-or-create Parameter + param = _get_or_create_parameter(session, analyte, "water") + + # Get-or-create AnalysisMethod + analysis_method_id = None + if row.analysis_method: + am = _get_or_create_analysis_method(session, row.analysis_method) + analysis_method_id = am.id + + # Resolve detect_flag + detect_flag = _resolve_detect_flag(row.symbol, row.sample_value) + + # Determine unit + unit = row.units if row.units else "pCi/L" + + # Upsert Observation + obs_values = { + 
"nma_pk_chemistryresults": global_id_str, + "sample_id": sample_id, + "parameter_id": param.id, + "analysis_method_id": analysis_method_id, + "observation_datetime": obs_dt, + "value": row.sample_value, + "unit": unit, + "detect_flag": detect_flag, + "uncertainty": row.uncertainty, + "analysis_agency": row.analyses_agency, + "release_status": "draft", + } + + stmt = pg_insert(Observation).values(**obs_values) + stmt = stmt.on_conflict_do_update( + index_elements=["nma_pk_chemistryresults"], + set_={ + "sample_id": stmt.excluded.sample_id, + "parameter_id": stmt.excluded.parameter_id, + "analysis_method_id": stmt.excluded.analysis_method_id, + "observation_datetime": stmt.excluded.observation_datetime, + "value": stmt.excluded.value, + "unit": stmt.excluded.unit, + "detect_flag": stmt.excluded.detect_flag, + "uncertainty": stmt.excluded.uncertainty, + "analysis_agency": stmt.excluded.analysis_agency, + }, + ) + session.execute(stmt) + + if global_id_str in existing_keys: + result.updated += 1 + else: + result.inserted += 1 + existing_keys.add(global_id_str) + + # Update Sample volume/volume_unit if present on legacy row. + # Only set when the sample doesn't already have a value to avoid + # nondeterministic overwrites when multiple rows share a sample. 
+ if row.volume is not None or row.volume_unit is not None: + sample = session.get(Sample, sample_id) + if sample: + if row.volume is not None: + if sample.volume is None: + sample.volume = float(row.volume) + elif float(sample.volume) != float(row.volume): + logger.warning( + "Sample id=%s already has volume=%s, " + "skipping conflicting value %s from GlobalID=%s", + sample_id, + sample.volume, + row.volume, + global_id_str, + ) + if row.volume_unit is not None: + if sample.volume_unit is None: + sample.volume_unit = row.volume_unit + elif sample.volume_unit != row.volume_unit: + logger.warning( + "Sample id=%s already has volume_unit=%s, " + "skipping conflicting value %s from GlobalID=%s", + sample_id, + sample.volume_unit, + row.volume_unit, + global_id_str, + ) + + # Create Notes if present + if row.notes: + obs = session.execute( + select(Observation).where( + Observation.nma_pk_chemistryresults == global_id_str + ) + ).scalar_one_or_none() + + if obs is None: + result.errors.append( + f"Row GlobalID={global_id_str}: upserted Observation not found " + f"when creating Notes — skipping note" + ) + else: + existing_note = session.execute( + select(Notes).where( + Notes.target_id == obs.id, + Notes.target_table == "observation", + Notes.note_type == "Chemistry Observation", + Notes.content == row.notes, + ) + ).scalar_one_or_none() + + if existing_note is None: + note = Notes( + target_id=obs.id, + target_table="observation", + note_type="Chemistry Observation", + content=row.notes, + release_status="draft", + ) + session.add(note) + + savepoint.commit() + except Exception as exc: + savepoint.rollback() + logger.exception("Row GlobalID=%s failed", global_id_str) + result.errors.append( + f"Row GlobalID={global_id_str}: {type(exc).__name__}: {exc}" + ) + continue + + session.commit() + logger.info( + "Backfill complete: inserted=%d updated=%d skipped_orphans=%d errors=%d", + result.inserted, + result.updated, + result.skipped_orphans, + len(result.errors), + ) + 
return result + + +def backfill_radionuclides() -> BackfillResult: + """Top-level runner for the radionuclides backfill.""" + with session_ctx() as session: + return _backfill_radionuclides_impl(session) + + +# ============= EOF =============================================