Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 204 additions & 0 deletions alembic/versions/545a5b77e5e8_add_chemistry_backfill_columns_to_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
"""add chemistry backfill columns to observation and sample

Revision ID: 545a5b77e5e8
Revises: d5e6f7a8b9c0
Create Date: 2026-02-27 11:30:45.380002

"""

from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
# `revision` is this migration's own ID and `down_revision` is the parent
# migration it revises; both match the "Revision ID" / "Revises" lines in the
# module docstring above.
revision: str = "545a5b77e5e8"
down_revision: Union[str, Sequence[str], None] = "d5e6f7a8b9c0"
# No branch label and no cross-revision dependency is declared here.
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Add chemistry backfill columns to observation and sample tables.

    Steps, in order:

    1. ``observation``: add ``nma_pk_chemistryresults``, ``detect_flag``,
       ``uncertainty`` and ``analysis_agency`` (all nullable) and a unique
       constraint on ``nma_pk_chemistryresults``.
    2. ``observation_version`` (sqlalchemy-continuum): add the same four
       columns with ``autoincrement=False``; no unique constraint is created
       on the version table.
    3. ``sample``: add ``nma_pk_chemistrysample``, ``volume`` and
       ``volume_unit`` (all nullable) and a unique constraint on
       ``nma_pk_chemistrysample``.
    4. ``NMA_Radionuclides``: drop the stale ``thing_id`` column (and any
       foreign key constraining it) when it is still present.
    """

    def chemistry_observation_columns(**column_kwargs):
        """Build fresh Column objects for the four observation columns.

        A new list is built on every call because the same definitions are
        added to two different tables. ``column_kwargs`` is forwarded to every
        ``sa.Column`` (the version table passes ``autoincrement=False``).
        """
        return [
            sa.Column(
                "nma_pk_chemistryresults",
                sa.String(),
                nullable=True,
                comment="NM_Aquifer GlobalID for chemistry results — transfer audit and idempotent upsert key",
                **column_kwargs,
            ),
            sa.Column(
                "detect_flag",
                sa.Boolean(),
                nullable=True,
                comment="True=detected, False=below detection limit (legacy Symbol '<'), None=no qualifier",
                **column_kwargs,
            ),
            sa.Column(
                "uncertainty",
                sa.Float(),
                nullable=True,
                comment="Measurement uncertainty for the observation value",
                **column_kwargs,
            ),
            sa.Column(
                "analysis_agency",
                sa.String(),
                nullable=True,
                comment="Agency or lab that performed the analysis",
                **column_kwargs,
            ),
        ]

    # Observation table: 4 new columns + unique upsert key
    for column in chemistry_observation_columns():
        op.add_column("observation", column)
    op.create_unique_constraint(
        "uq_observation_nma_pk_chemistryresults",
        "observation",
        ["nma_pk_chemistryresults"],
    )

    # Observation version table (sqlalchemy-continuum): same columns, no
    # unique constraint.
    for column in chemistry_observation_columns(autoincrement=False):
        op.add_column("observation_version", column)

    # Sample table: 3 new columns + unique audit key
    sample_columns = [
        sa.Column(
            "nma_pk_chemistrysample",
            sa.String(),
            nullable=True,
            comment="NM_Aquifer SamplePtID for chemistry samples — transfer audit key",
        ),
        sa.Column(
            "volume",
            sa.Float(),
            nullable=True,
            comment="Volume of the sample collected",
        ),
        sa.Column(
            "volume_unit",
            sa.String(),
            nullable=True,
            comment="Unit for the sample volume (e.g. mL, L)",
        ),
    ]
    for column in sample_columns:
        op.add_column("sample", column)
    op.create_unique_constraint(
        "uq_sample_nma_pk_chemistrysample",
        "sample",
        ["nma_pk_chemistrysample"],
    )

    # Drop stale thing_id column from NMA_Radionuclides (model no longer defines it;
    # relationships go through NMA_Chemistry_SampleInfo.thing_id instead).
    # Guard against environments where the column was already removed.
    #
    # The existence check goes through the same Inspector (and therefore the
    # same default schema) that the FK lookup and op.drop_column operate on.
    # An unscoped information_schema query could match a same-named table in
    # another schema and make the drop below fail.
    radionuclides = "NMA_Radionuclides"
    inspector = sa.inspect(op.get_bind())
    has_thing_id = inspector.has_table(radionuclides) and any(
        column["name"] == "thing_id"
        for column in inspector.get_columns(radionuclides)
    )
    if has_thing_id:
        # FK name may differ across environments; look it up dynamically.
        # Unnamed FKs are skipped because drop_constraint needs a name.
        for fk in inspector.get_foreign_keys(radionuclides):
            if "thing_id" in fk["constrained_columns"] and fk.get("name"):
                op.drop_constraint(fk["name"], radionuclides, type_="foreignkey")
        op.drop_column(radionuclides, "thing_id")


def downgrade() -> None:
    """Remove chemistry backfill columns.

    Runs in this order: restore ``NMA_Radionuclides.thing_id``, then drop the
    additions on ``sample``, ``observation_version`` and ``observation``.
    Unique constraints are dropped before the columns they cover.
    """
    radionuclides = "NMA_Radionuclides"

    # Restore NMA_Radionuclides.thing_id: the column is added as nullable,
    # populated from NMA_Chemistry_SampleInfo, and only then made NOT NULL.
    thing_id_column = sa.Column("thing_id", sa.Integer(), nullable=True)
    op.add_column(radionuclides, thing_id_column)

    backfill_sql = (
        'UPDATE "NMA_Radionuclides" r '
        "SET thing_id = csi.thing_id "
        'FROM "NMA_Chemistry_SampleInfo" csi '
        "WHERE r.chemistry_sample_info_id = csi.id"
    )
    op.execute(backfill_sql)

    op.alter_column(radionuclides, "thing_id", nullable=False)
    op.create_foreign_key(
        "NMA_Radionuclides_thing_id_fkey",
        radionuclides,
        "thing",
        ["thing_id"],
        ["id"],
        ondelete="CASCADE",
    )

    # Sample table: constraint first, then its three columns.
    op.drop_constraint("uq_sample_nma_pk_chemistrysample", "sample", type_="unique")
    for column_name in ("volume_unit", "volume", "nma_pk_chemistrysample"):
        op.drop_column("sample", column_name)

    # The same four columns exist on the version table and the live table.
    chemistry_columns = (
        "analysis_agency",
        "uncertainty",
        "detect_flag",
        "nma_pk_chemistryresults",
    )

    # Observation version table: columns only, it has no unique constraint.
    for column_name in chemistry_columns:
        op.drop_column("observation_version", column_name)

    # Observation table: constraint first, then the columns.
    op.drop_constraint(
        "uq_observation_nma_pk_chemistryresults", "observation", type_="unique"
    )
    for column_name in chemistry_columns:
        op.drop_column("observation", column_name)
8 changes: 2 additions & 6 deletions core/initializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,8 @@ def erase_and_rebuild_db():
")"
)
).scalar()
if not pg_cron_available:
raise RuntimeError(
"Cannot erase and rebuild database: pg_cron extension is not "
"available on this PostgreSQL server."
)
session.execute(text("CREATE EXTENSION IF NOT EXISTS pg_cron"))
if pg_cron_available:
session.execute(text("CREATE EXTENSION IF NOT EXISTS pg_cron"))
session.commit()
Base.metadata.drop_all(session.bind)
Base.metadata.create_all(session.bind)
Expand Down
14 changes: 14 additions & 0 deletions core/lexicon.json
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,13 @@
"term": "mg/L",
"definition": "Milligrams per Liter"
},
{
"categories": [
"unit"
],
"term": "pCi/L",
"definition": "Picocuries per Liter"
},
{
"categories": [
"unit"
Expand Down Expand Up @@ -8213,6 +8220,13 @@
"term": "Site Notes (legacy)",
"definition": "Legacy site notes field from WaterLevels"
},
{
"categories": [
"note_type"
],
"term": "Chemistry Observation",
"definition": "Notes from chemistry observation results"
},
{
"categories": [
"well_pump_type"
Expand Down
8 changes: 2 additions & 6 deletions db/initialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,8 @@ def recreate_public_schema(session: Session) -> None:
")"
)
).scalar()
if not pg_cron_available:
raise RuntimeError(
"Cannot initialize database schema: pg_cron extension is not available "
"on this PostgreSQL server."
)
session.execute(text("CREATE EXTENSION IF NOT EXISTS pg_cron"))
if pg_cron_available:
session.execute(text("CREATE EXTENSION IF NOT EXISTS pg_cron"))
session.execute(APP_READ_GRANT_SQL)
grant_app_read_members(session)
session.commit()
Expand Down
19 changes: 19 additions & 0 deletions db/observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,25 @@ class Observation(Base, AutoBaseMixin, ReleaseMixin):

# NM_Aquifer fields for audits
nma_pk_waterlevels: Mapped[str] = mapped_column(nullable=True)
nma_pk_chemistryresults: Mapped[str] = mapped_column(
nullable=True,
unique=True,
comment="NM_Aquifer GlobalID for chemistry results — transfer audit and idempotent upsert key",
)

# Chemistry-specific columns
detect_flag: Mapped[bool] = mapped_column(
nullable=True,
comment="True=detected, False=below detection limit (legacy Symbol '<'), None=no qualifier",
)
uncertainty: Mapped[float] = mapped_column(
nullable=True,
comment="Measurement uncertainty for the observation value",
)
analysis_agency: Mapped[str] = mapped_column(
nullable=True,
comment="Agency or lab that performed the analysis",
)

# --- Foreign Keys ---
sample_id: Mapped[int] = mapped_column(
Expand Down
15 changes: 15 additions & 0 deletions db/sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,21 @@ class Sample(Base, AutoBaseMixin, ReleaseMixin):
nullable=True,
comment="NM_Aquifer primary key for waterlevels - to be used for transfer audits",
)
nma_pk_chemistrysample: Mapped[str] = mapped_column(
nullable=True,
unique=True,
comment="NM_Aquifer SamplePtID for chemistry samples — transfer audit key",
)

# Chemistry sample attributes
volume: Mapped[float] = mapped_column(
nullable=True,
comment="Volume of the sample collected",
)
volume_unit: Mapped[str] = mapped_column(
nullable=True,
comment="Unit for the sample volume (e.g. mL, L)",
)

# --- Relationship Definitions ---
field_activity: Mapped["FieldActivity"] = relationship(back_populates="samples")
Expand Down
5 changes: 2 additions & 3 deletions run_backfill.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# e.g. state, county, quad_name,etc. It will also be used to handle data refactors/corrections in the future.

# Load environment variables from .env and run the staging backfill.
# Usage: ./run_backfill.sh [--batch-size N]
# Usage: ./run_backfill.sh

# github workflow equivalent: for reference only
#- name: Run backfill script on staging database
Expand Down Expand Up @@ -38,5 +38,4 @@ set +a

uv run alembic upgrade head

# Forward any args (e.g., --batch-size 500)
python -m transfers.backfill.backfill "$@"
python -m transfers.backfill.backfill
4 changes: 4 additions & 0 deletions schemas/observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ class BaseObservationResponse(BaseResponseModel):
value: float | None
unit: Unit
nma_data_quality: str | None = None
nma_pk_chemistryresults: str | None = None
detect_flag: bool | None = None
uncertainty: float | None = None
analysis_agency: str | None = None


class GroundwaterLevelObservationResponse(BaseObservationResponse):
Expand Down
3 changes: 3 additions & 0 deletions schemas/sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ class SampleResponse(BaseResponseModel):
notes: str | None
depth_top: float | None
depth_bottom: float | None
nma_pk_chemistrysample: str | None = None
volume: float | None = None
volume_unit: str | None = None


# ============= EOF =============================================
Loading