diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 4d314f2d..79bfcd7e 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -35,7 +35,7 @@ jobs: - name: Check out source repository uses: actions/checkout@v6.0.2 - - name: Start database (PostGIS + pg_cron) + - name: Start database (PostGIS) run: | docker compose build db docker compose up -d db @@ -81,7 +81,6 @@ jobs: PGPASSWORD=postgres psql -h localhost -p 5432 -U postgres -tc "SELECT 1 FROM pg_database WHERE datname = 'ocotilloapi_test'" | grep -q 1 || \ PGPASSWORD=postgres psql -h localhost -p 5432 -U postgres -c "CREATE DATABASE ocotilloapi_test" PGPASSWORD=postgres psql -h localhost -p 5432 -U postgres -d ocotilloapi_test -c "CREATE EXTENSION IF NOT EXISTS postgis" - PGPASSWORD=postgres psql -h localhost -p 5432 -U postgres -d ocotilloapi_test -c "CREATE EXTENSION IF NOT EXISTS pg_cron" - name: Run tests run: uv run pytest -vv --durations=20 --cov --cov-report=xml --junitxml=junit.xml --ignore=tests/transfers @@ -121,7 +120,7 @@ jobs: - name: Check out source repository uses: actions/checkout@v6.0.2 - - name: Start database (PostGIS + pg_cron) + - name: Start database (PostGIS) run: | docker compose build db docker compose up -d db @@ -167,7 +166,6 @@ jobs: PGPASSWORD=postgres psql -h localhost -p 5432 -U postgres -tc "SELECT 1 FROM pg_database WHERE datname = 'ocotilloapi_test'" | grep -q 1 || \ PGPASSWORD=postgres psql -h localhost -p 5432 -U postgres -c "CREATE DATABASE ocotilloapi_test" PGPASSWORD=postgres psql -h localhost -p 5432 -U postgres -d ocotilloapi_test -c "CREATE EXTENSION IF NOT EXISTS postgis" - PGPASSWORD=postgres psql -h localhost -p 5432 -U postgres -d ocotilloapi_test -c "CREATE EXTENSION IF NOT EXISTS pg_cron" - name: Run BDD tests run: uv run behave tests/features --tags="@backend and @production and not @skip" --no-capture diff --git a/alembic/versions/d5e6f7a8b9c0_create_pygeoapi_supporting_views.py b/alembic/versions/d5e6f7a8b9c0_create_pygeoapi_supporting_views.py index 6aab7887..e11bf240 100644 --- a/alembic/versions/d5e6f7a8b9c0_create_pygeoapi_supporting_views.py +++ b/alembic/versions/d5e6f7a8b9c0_create_pygeoapi_supporting_views.py @@ -17,8 +17,6 @@ branch_labels: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None REFRESH_FUNCTION_NAME = "refresh_pygeoapi_materialized_views" -REFRESH_JOB_NAME = "refresh_pygeoapi_matviews_nightly" -REFRESH_SCHEDULE = "0 3 * * *" THING_COLLECTIONS = [ ("water_wells", "water well"), @@ -73,9 +71,7 @@ def _create_thing_view(view_id: str, thing_type: str) -> str: SELECT t.id, t.name, - t.thing_type, t.first_visit_date, - t.spring_type, t.nma_pk_welldata, t.well_depth, t.hole_depth, @@ -89,6 +85,7 @@ def _create_thing_view(view_id: str, thing_type: str) -> str: t.formation_completion_code, t.nma_formation_zone, t.release_status, + l.elevation, l.point FROM thing AS t JOIN latest_location AS ll ON ll.thing_id = t.id @@ -154,7 +151,7 @@ def _create_avg_tds_view() -> str: SELECT csi.thing_id, mc.id AS major_chemistry_id, - mc."AnalysisDate" AS analysis_date, + COALESCE(mc."AnalysisDate", csi."CollectionDate")::date AS observation_date, mc."SampleValue" AS sample_value, mc."Units" AS units FROM "NMA_MajorChemistry" AS mc @@ -178,8 +175,8 @@ def _create_avg_tds_view() -> str: t.thing_type, COUNT(to2.major_chemistry_id)::integer AS tds_observation_count, AVG(to2.sample_value)::double precision AS avg_tds_value, - MIN(to2.analysis_date) AS first_tds_observation_datetime, - MAX(to2.analysis_date) AS latest_tds_observation_datetime, + MIN(to2.observation_date) AS first_tds_observation_date, + MAX(to2.observation_date) AS last_tds_observation_date, l.point FROM tds_obs AS to2 JOIN thing AS t ON t.id = to2.thing_id @@ -231,68 +228,6 @@ def _create_refresh_function() -> str: """ -def _schedule_refresh_job() -> str: - return f""" - DO $do$ - BEGIN - BEGIN - -- Avoid direct SELECT on cron.job because managed Postgres - -- environments may deny access to the cron schema table. - PERFORM cron.unschedule('{REFRESH_JOB_NAME}'); - EXCEPTION - WHEN undefined_function THEN - NULL; - WHEN invalid_parameter_value THEN - NULL; - WHEN internal_error THEN - -- Some pg_cron builds raise internal_error when the named - -- job does not exist. Treat this as already-unscheduled. - NULL; - WHEN insufficient_privilege THEN - RAISE NOTICE - 'Skipping pg_cron unschedule for % due to insufficient privileges.', - '{REFRESH_JOB_NAME}'; - RETURN; - END; - - PERFORM cron.schedule( - '{REFRESH_JOB_NAME}', - '{REFRESH_SCHEDULE}', - $cmd$SELECT public.{REFRESH_FUNCTION_NAME}();$cmd$ - ); - EXCEPTION - WHEN insufficient_privilege THEN - RAISE NOTICE - 'Skipping pg_cron schedule for % due to insufficient privileges.', - '{REFRESH_JOB_NAME}'; - END - $do$; - """ - - -def _unschedule_refresh_job() -> str: - return f""" - DO $do$ - BEGIN - BEGIN - PERFORM cron.unschedule('{REFRESH_JOB_NAME}'); - EXCEPTION - WHEN undefined_function THEN - NULL; - WHEN invalid_parameter_value THEN - NULL; - WHEN internal_error THEN - NULL; - WHEN insufficient_privilege THEN - RAISE NOTICE - 'Skipping pg_cron unschedule for % due to insufficient privileges.', - '{REFRESH_JOB_NAME}'; - END; - END - $do$; - """ - - def upgrade() -> None: bind = op.get_bind() inspector = inspect(bind) @@ -307,16 +242,6 @@ def upgrade() -> None: f"tables are missing: {missing_tables_str}" ) - pg_cron_available = bind.execute( - text( - "SELECT EXISTS (" - "SELECT 1 FROM pg_available_extensions WHERE name = 'pg_cron'" - ")" - ) - ).scalar() - if pg_cron_available: - op.execute(text("CREATE EXTENSION IF NOT EXISTS pg_cron")) - for view_id, thing_type in THING_COLLECTIONS: safe_view_id = _safe_view_id(view_id) op.execute(text(f"DROP VIEW IF EXISTS ogc_{safe_view_id}")) @@ -360,21 +285,9 @@ def upgrade() -> None: _create_matview_indexes() op.execute(text(_create_refresh_function())) - if pg_cron_available: - op.execute(text(_schedule_refresh_job())) def downgrade() -> None: - bind = op.get_bind() - pg_cron_available = bind.execute( - text( - "SELECT EXISTS (" - "SELECT 1 FROM pg_available_extensions WHERE name = 'pg_cron'" - ")" - ) - ).scalar() - if pg_cron_available: - op.execute(text(_unschedule_refresh_job())) op.execute(text(f"DROP FUNCTION IF EXISTS public.{REFRESH_FUNCTION_NAME}()")) _drop_view_or_materialized_view("ogc_avg_tds_wells") _drop_view_or_materialized_view("ogc_latest_depth_to_water_wells") diff --git a/alembic/versions/i2b3c4d5e6f7_add_latest_tds_pygeoapi_materialized_view.py b/alembic/versions/i2b3c4d5e6f7_add_latest_tds_pygeoapi_materialized_view.py new file mode 100644 index 00000000..d5f3d7d7 --- /dev/null +++ b/alembic/versions/i2b3c4d5e6f7_add_latest_tds_pygeoapi_materialized_view.py @@ -0,0 +1,229 @@ +"""add latest tds pygeoapi materialized view + +Revision ID: i2b3c4d5e6f7 +Revises: d5e6f7a8b9c0 +Create Date: 2026-03-02 11:00:00.000000 +""" + +from typing import Sequence, Union + +from alembic import op +from sqlalchemy import inspect, text + +# revision identifiers, used by Alembic. +revision: str = "i2b3c4d5e6f7" +down_revision: Union[str, Sequence[str], None] = "d5e6f7a8b9c0" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + +LATEST_LOCATION_CTE = """ +SELECT DISTINCT ON (lta.thing_id) + lta.thing_id, + lta.location_id, + lta.effective_start +FROM location_thing_association AS lta +WHERE lta.effective_end IS NULL +ORDER BY lta.thing_id, lta.effective_start DESC +""".strip() + + +def _create_latest_tds_view() -> str: + return f""" + CREATE VIEW ogc_latest_tds_wells AS + WITH latest_location AS ( +{LATEST_LOCATION_CTE} + ), + tds_obs AS ( + SELECT + csi.thing_id, + mc.id AS major_chemistry_id, + COALESCE(mc."AnalysisDate", csi."CollectionDate") AS observation_datetime, + mc."SampleValue" AS sample_value, + mc."Units" AS units + FROM "NMA_MajorChemistry" AS mc + JOIN "NMA_Chemistry_SampleInfo" AS csi + ON csi.id = mc.chemistry_sample_info_id + JOIN thing AS t ON t.id = csi.thing_id + WHERE + t.thing_type = 'water well' + AND mc."SampleValue" IS NOT NULL + AND ( + lower(coalesce(mc."Analyte", '')) IN ( + 'tds', + 'total dissolved solids' + ) + OR lower(coalesce(mc."Symbol", '')) = 'tds' + ) + ), + ranked_tds AS ( + SELECT + to2.thing_id, + to2.major_chemistry_id, + to2.observation_datetime, + to2.sample_value, + to2.units, + ROW_NUMBER() OVER ( + PARTITION BY to2.thing_id + ORDER BY to2.observation_datetime DESC NULLS LAST, to2.major_chemistry_id DESC + ) AS rn + FROM tds_obs AS to2 + ) + SELECT + t.id AS id, + t.name, + t.thing_type, + rt.major_chemistry_id, + rt.observation_datetime::date AS latest_tds_observation_date, + rt.sample_value AS latest_tds_value, + rt.units AS latest_tds_units, + l.point + FROM ranked_tds AS rt + JOIN thing AS t ON t.id = rt.thing_id + JOIN latest_location AS ll ON ll.thing_id = t.id + JOIN location AS l ON l.id = ll.location_id + WHERE rt.rn = 1 + """ + + +def _create_avg_tds_view() -> str: + return f""" + CREATE MATERIALIZED VIEW ogc_avg_tds_wells AS + WITH latest_location AS ( +{LATEST_LOCATION_CTE} + ), + tds_obs AS ( + SELECT + csi.thing_id, + mc.id AS major_chemistry_id, + COALESCE(mc."AnalysisDate", csi."CollectionDate")::date AS observation_date, + mc."SampleValue" AS sample_value, + mc."Units" AS units + FROM "NMA_MajorChemistry" AS mc + JOIN "NMA_Chemistry_SampleInfo" AS csi + ON csi.id = mc.chemistry_sample_info_id + JOIN thing AS t ON t.id = csi.thing_id + WHERE + t.thing_type = 'water well' + AND mc."SampleValue" IS NOT NULL + AND ( + lower(coalesce(mc."Analyte", '')) IN ( + 'tds', + 'total dissolved solids' + ) + OR lower(coalesce(mc."Symbol", '')) = 'tds' + ) + ) + SELECT + t.id AS id, + t.name, + t.thing_type, + COUNT(to2.major_chemistry_id)::integer AS tds_observation_count, + AVG(to2.sample_value)::double precision AS avg_tds_value, + MIN(to2.observation_date) AS first_tds_observation_date, + MAX(to2.observation_date) AS last_tds_observation_date, + l.point + FROM tds_obs AS to2 + JOIN thing AS t ON t.id = to2.thing_id + JOIN latest_location AS ll ON ll.thing_id = t.id + JOIN location AS l ON l.id = ll.location_id + GROUP BY t.id, t.name, t.thing_type, l.point + """ + + +def _create_avg_tds_view_with_datetime_columns() -> str: + return f""" + CREATE MATERIALIZED VIEW ogc_avg_tds_wells AS + WITH latest_location AS ( +{LATEST_LOCATION_CTE} + ), + tds_obs AS ( + SELECT + csi.thing_id, + mc.id AS major_chemistry_id, + mc."AnalysisDate" AS analysis_date, + mc."SampleValue" AS sample_value, + mc."Units" AS units + FROM "NMA_MajorChemistry" AS mc + JOIN "NMA_Chemistry_SampleInfo" AS csi + ON csi.id = mc.chemistry_sample_info_id + JOIN thing AS t ON t.id = csi.thing_id + WHERE + t.thing_type = 'water well' + AND mc."SampleValue" IS NOT NULL + AND ( + lower(coalesce(mc."Analyte", '')) IN ( + 'tds', + 'total dissolved solids' + ) + OR lower(coalesce(mc."Symbol", '')) = 'tds' + ) + ) + SELECT + t.id AS id, + t.name, + t.thing_type, + COUNT(to2.major_chemistry_id)::integer AS tds_observation_count, + AVG(to2.sample_value)::double precision AS avg_tds_value, + MIN(to2.analysis_date::date) AS first_tds_observation_date, + MAX(to2.analysis_date::date) AS last_tds_observation_date, + l.point + FROM tds_obs AS to2 + JOIN thing AS t ON t.id = to2.thing_id + JOIN latest_location AS ll ON ll.thing_id = t.id + JOIN location AS l ON l.id = ll.location_id + GROUP BY t.id, t.name, t.thing_type, l.point + """ + + +def upgrade() -> None: + bind = op.get_bind() + inspector = inspect(bind) + existing_tables = set(inspector.get_table_names(schema="public")) + required_tds = { + "NMA_MajorChemistry", + "NMA_Chemistry_SampleInfo", + "thing", + "location", + "location_thing_association", + } + + if not required_tds.issubset(existing_tables): + missing_tds_tables = sorted(t for t in required_tds if t not in existing_tables) + missing_tds_tables_str = ", ".join(missing_tds_tables) + raise RuntimeError( + "Cannot create TDS views. The following required " + f"tables are missing: {missing_tds_tables_str}" + ) + + op.execute(text("DROP MATERIALIZED VIEW IF EXISTS ogc_avg_tds_wells")) + op.execute(text("DROP VIEW IF EXISTS ogc_latest_tds_wells")) + op.execute(text("DROP MATERIALIZED VIEW IF EXISTS ogc_latest_tds_wells")) + + op.execute(text(_create_avg_tds_view())) + op.execute( + text( + "COMMENT ON MATERIALIZED VIEW ogc_avg_tds_wells IS " + "'Average TDS per well from major chemistry results for pygeoapi.'" + ) + ) + op.execute( + text("CREATE UNIQUE INDEX ux_ogc_avg_tds_wells_id " "ON ogc_avg_tds_wells (id)") + ) + + op.execute(text(_create_latest_tds_view())) + op.execute( + text( + "COMMENT ON VIEW ogc_latest_tds_wells IS " + "'Latest TDS per well from major chemistry results for pygeoapi.'" + ) + ) + + +def downgrade() -> None: + op.execute(text("DROP MATERIALIZED VIEW IF EXISTS ogc_avg_tds_wells")) + op.execute(text("DROP VIEW IF EXISTS ogc_latest_tds_wells")) + op.execute(text("DROP MATERIALIZED VIEW IF EXISTS ogc_latest_tds_wells")) + op.execute(text(_create_avg_tds_view_with_datetime_columns())) + op.execute( + text("CREATE UNIQUE INDEX ux_ogc_avg_tds_wells_id " "ON ogc_avg_tds_wells (id)") + ) diff --git a/alembic/versions/k4d5e6f7a8b9_add_depth_to_water_trend_materialized_view.py b/alembic/versions/k4d5e6f7a8b9_add_depth_to_water_trend_materialized_view.py new file mode 100644 index 00000000..8180aaab --- /dev/null +++ b/alembic/versions/k4d5e6f7a8b9_add_depth_to_water_trend_materialized_view.py @@ -0,0 +1,132 @@ +"""add depth to water trend materialized view + +Revision ID: k4d5e6f7a8b9 +Revises: i2b3c4d5e6f7 +Create Date: 2026-03-02 19:15:00.000000 +""" + +from typing import Sequence, Union + +from alembic import op +from sqlalchemy import inspect, text + +# revision identifiers, used by Alembic. +revision: str = "k4d5e6f7a8b9" +down_revision: Union[str, Sequence[str], None] = "i2b3c4d5e6f7" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + +LATEST_LOCATION_CTE = """ +SELECT DISTINCT ON (lta.thing_id) + lta.thing_id, + lta.location_id, + lta.effective_start +FROM location_thing_association AS lta +WHERE lta.effective_end IS NULL +ORDER BY lta.thing_id, lta.effective_start DESC +""".strip() + + +def _create_depth_to_water_trend_view() -> str: + return f""" + CREATE MATERIALIZED VIEW ogc_depth_to_water_trend_wells AS + WITH latest_location AS ( +{LATEST_LOCATION_CTE} + ), + obs AS ( + SELECT + fe.thing_id, + o.observation_datetime, + (o.value - COALESCE(o.measuring_point_height, 0)) AS depth_to_water_bgs + FROM observation AS o + JOIN sample AS s ON s.id = o.sample_id + JOIN field_activity AS fa ON fa.id = s.field_activity_id + JOIN field_event AS fe ON fe.id = fa.field_event_id + JOIN thing AS t ON t.id = fe.thing_id + WHERE + t.thing_type = 'water well' + AND fa.activity_type = 'groundwater level' + AND o.value IS NOT NULL + AND o.observation_datetime IS NOT NULL + ), + agg AS ( + SELECT + ob.thing_id, + COUNT(*)::integer AS record_count, + MIN(ob.observation_datetime) AS first_observation_datetime, + MAX(ob.observation_datetime) AS last_observation_datetime, + EXTRACT(EPOCH FROM (MAX(ob.observation_datetime) - MIN(ob.observation_datetime))) + / 31557600.0 AS span_years, + REGR_SLOPE( + ob.depth_to_water_bgs, + EXTRACT(EPOCH FROM ob.observation_datetime) + ) * 31557600.0 AS slope_ft_per_year + FROM obs AS ob + GROUP BY ob.thing_id + ) + SELECT + t.id AS id, + t.name, + t.thing_type, + a.record_count, + a.first_observation_datetime, + a.last_observation_datetime, + a.span_years, + a.slope_ft_per_year, + CASE + WHEN a.record_count >= 10 OR (a.record_count >= 4 AND a.span_years >= 2.0) THEN + CASE + WHEN a.slope_ft_per_year IS NULL THEN 'not enough data' + WHEN a.slope_ft_per_year > 0.25 THEN 'increasing' + WHEN a.slope_ft_per_year < -0.25 THEN 'decreasing' + ELSE 'stable' + END + ELSE 'not enough data' + END AS trend_category, + l.point + FROM agg AS a + JOIN thing AS t ON t.id = a.thing_id + JOIN latest_location AS ll ON ll.thing_id = t.id + JOIN location AS l ON l.id = ll.location_id + """ + + +def upgrade() -> None: + bind = op.get_bind() + inspector = inspect(bind) + existing_tables = set(inspector.get_table_names(schema="public")) + required_tables = { + "thing", + "location", + "location_thing_association", + "observation", + "sample", + "field_activity", + "field_event", + } + + if not required_tables.issubset(existing_tables): + missing = sorted(t for t in required_tables if t not in existing_tables) + raise RuntimeError( + "Cannot create ogc_depth_to_water_trend_wells. Missing required tables: " + + ", ".join(missing) + ) + + op.execute(text("DROP MATERIALIZED VIEW IF EXISTS ogc_depth_to_water_trend_wells")) + op.execute(text(_create_depth_to_water_trend_view())) + op.execute( + text( + "COMMENT ON MATERIALIZED VIEW ogc_depth_to_water_trend_wells IS " + "'Depth-to-water trend classification for water wells.'" + ) + ) + op.execute( + text( + "CREATE UNIQUE INDEX ux_ogc_depth_to_water_trend_wells_id " + "ON ogc_depth_to_water_trend_wells (id)" + ) + ) + + +def downgrade() -> None: + op.execute(text("DROP MATERIALIZED VIEW IF EXISTS ogc_depth_to_water_trend_wells")) diff --git a/alembic/versions/l5e6f7a8b9c0_add_water_well_summary_materialized_view.py b/alembic/versions/l5e6f7a8b9c0_add_water_well_summary_materialized_view.py new file mode 100644 index 00000000..215be01b --- /dev/null +++ b/alembic/versions/l5e6f7a8b9c0_add_water_well_summary_materialized_view.py @@ -0,0 +1,157 @@ +"""add water well summary materialized view + +Revision ID: l5e6f7a8b9c0 +Revises: k4d5e6f7a8b9 +Create Date: 2026-03-02 20:05:00.000000 +""" + +from typing import Sequence, Union + +from alembic import op +from sqlalchemy import inspect, text + +# revision identifiers, used by Alembic. +revision: str = "l5e6f7a8b9c0" +down_revision: Union[str, Sequence[str], None] = "k4d5e6f7a8b9" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + +LATEST_LOCATION_CTE = """ +SELECT DISTINCT ON (lta.thing_id) + lta.thing_id, + lta.location_id, + lta.effective_start +FROM location_thing_association AS lta +WHERE lta.effective_end IS NULL +ORDER BY lta.thing_id, lta.effective_start DESC +""".strip() + + +def _create_water_well_summary_view() -> str: + return f""" + CREATE MATERIALIZED VIEW ogc_water_well_summary AS + WITH latest_location AS ( +{LATEST_LOCATION_CTE} + ), + wl_obs AS ( + SELECT + fe.thing_id, + o.id AS observation_id, + o.observation_datetime, + (o.value - COALESCE(o.measuring_point_height, 0)) AS water_level + FROM observation AS o + JOIN sample AS s ON s.id = o.sample_id + JOIN field_activity AS fa ON fa.id = s.field_activity_id + JOIN field_event AS fe ON fe.id = fa.field_event_id + JOIN thing AS t ON t.id = fe.thing_id + WHERE + t.thing_type = 'water well' + AND fa.activity_type = 'groundwater level' + AND o.value IS NOT NULL + AND o.observation_datetime IS NOT NULL + ), + wl_agg AS ( + SELECT + w.thing_id, + COUNT(*)::integer AS total_water_levels, + MIN(w.water_level) AS min_water_level, + MAX(w.water_level) AS max_water_level, + REGR_SLOPE( + w.water_level, + EXTRACT(EPOCH FROM w.observation_datetime) + ) * 31557600.0 AS water_level_trend_ft_per_year + FROM wl_obs AS w + GROUP BY w.thing_id + ), + wl_last AS ( + SELECT + ranked.thing_id, + ranked.water_level AS last_water_level, + ranked.observation_datetime AS last_water_level_datetime + FROM ( + SELECT + w.thing_id, + w.water_level, + w.observation_datetime, + ROW_NUMBER() OVER ( + PARTITION BY w.thing_id + ORDER BY w.observation_datetime DESC, w.observation_id DESC + ) AS rn + FROM wl_obs AS w + ) AS ranked + WHERE ranked.rn = 1 + ) + SELECT + t.id AS id, + t.name, + t.well_depth, + l.elevation, + dpl.collection_method AS elevation_method, + t.nma_formation_zone AS formation_zone, + wa.total_water_levels, + wl.last_water_level, + wl.last_water_level_datetime, + wa.min_water_level, + wa.max_water_level, + wa.water_level_trend_ft_per_year, + l.point + FROM thing AS t + JOIN latest_location AS ll ON ll.thing_id = t.id + JOIN location AS l ON l.id = ll.location_id + JOIN wl_agg AS wa ON wa.thing_id = t.id + LEFT JOIN wl_last AS wl ON wl.thing_id = t.id + LEFT JOIN LATERAL ( + SELECT dp.collection_method + FROM data_provenance AS dp + WHERE + dp.target_table = 'location' + AND dp.target_id = l.id + AND dp.field_name = 'elevation' + ORDER BY dp.id DESC + LIMIT 1 + ) AS dpl ON true + WHERE t.thing_type = 'water well' + AND wa.total_water_levels > 0 + """ + + +def upgrade() -> None: + bind = op.get_bind() + inspector = inspect(bind) + existing_tables = set(inspector.get_table_names(schema="public")) + required_tables = { + "thing", + "location", + "location_thing_association", + "observation", + "sample", + "field_activity", + "field_event", + "data_provenance", + } + + if not required_tables.issubset(existing_tables): + missing = sorted(t for t in required_tables if t not in existing_tables) + raise RuntimeError( + "Cannot create ogc_water_well_summary. Missing required tables: " + + ", ".join(missing) + ) + + op.execute(text("DROP MATERIALIZED VIEW IF EXISTS ogc_water_well_summary")) + op.execute(text(_create_water_well_summary_view())) + op.execute( + text( + "COMMENT ON MATERIALIZED VIEW ogc_water_well_summary IS " + "'Summary statistics for water wells including water-level trend.'" + ) + ) + op.execute( + text( + "CREATE UNIQUE INDEX ux_ogc_water_well_summary_id " + "ON ogc_water_well_summary (id)" + ) + ) + + +def downgrade() -> None: + op.execute(text("DROP MATERIALIZED VIEW IF EXISTS ogc_water_well_summary")) diff --git a/cli/cli.py b/cli/cli.py index 19b34cc9..09c20185 100644 --- a/cli/cli.py +++ b/cli/cli.py @@ -53,6 +53,8 @@ class SmokePopulation(str, Enum): PYGEOAPI_MATERIALIZED_VIEWS = ( "ogc_latest_depth_to_water_wells", "ogc_avg_tds_wells", + "ogc_depth_to_water_trend_wells", + "ogc_water_well_summary", ) diff --git a/core/initializers.py b/core/initializers.py index c3a32d6f..ba932b9b 100644 --- a/core/initializers.py +++ b/core/initializers.py @@ -13,8 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== -from pathlib import Path import os +from pathlib import Path from fastapi_pagination import add_pagination from sqlalchemy import text, select @@ -66,15 +66,6 @@ def erase_and_rebuild_db(): session.execute(text("DROP SCHEMA public CASCADE")) session.execute(text("CREATE SCHEMA public")) session.execute(text("CREATE EXTENSION IF NOT EXISTS postgis")) - pg_cron_available = session.execute( - text( - "SELECT EXISTS (" - "SELECT 1 FROM pg_available_extensions WHERE name = 'pg_cron'" - ")" - ) - ).scalar() - if pg_cron_available: - session.execute(text("CREATE EXTENSION IF NOT EXISTS pg_cron")) session.commit() Base.metadata.drop_all(session.bind) Base.metadata.create_all(session.bind) diff --git a/core/pygeoapi-config.yml b/core/pygeoapi-config.yml index 4228b6cd..1a468b13 100644 --- a/core/pygeoapi-config.yml +++ b/core/pygeoapi-config.yml @@ -59,7 +59,7 @@ resources: latest_depth_to_water_wells: type: collection - title: Latest Depth to Water (Wells) + title: Latest Depth to Water (Water Wells) description: Most recent depth-to-water below ground surface observation for each water well. keywords: [water-wells, groundwater-level, depth-to-water-bgs, latest] extents: @@ -103,4 +103,73 @@ resources: table: ogc_avg_tds_wells geom_field: point + latest_tds_wells: + type: collection + title: Latest TDS (Water Wells) + description: Most recent total dissolved solids (TDS) result from major chemistry for each water well. + keywords: [water-wells, chemistry, tds, total-dissolved-solids, latest] + extents: + spatial: + bbox: [-109.05, 31.33, -103.00, 37.00] + crs: http://www.opengis.net/def/crs/OGC/1.3/CRS84 + providers: + - type: feature + name: PostgreSQL + data: + host: {postgres_host} + port: {postgres_port} + dbname: {postgres_db} + user: {postgres_user} + password: {postgres_password_env} + search_path: [public] + id_field: id + table: ogc_latest_tds_wells + geom_field: point + + depth_to_water_trend_wells: + type: collection + title: Depth to Water Trend (Water Wells) + description: Trend classification for depth to water based on slope in feet per year. + keywords: [water-wells, groundwater-level, depth-to-water, trend, slope] + extents: + spatial: + bbox: [-109.05, 31.33, -103.00, 37.00] + crs: http://www.opengis.net/def/crs/OGC/1.3/CRS84 + providers: + - type: feature + name: PostgreSQL + data: + host: {postgres_host} + port: {postgres_port} + dbname: {postgres_db} + user: {postgres_user} + password: {postgres_password_env} + search_path: [public] + id_field: id + table: ogc_depth_to_water_trend_wells + geom_field: point + + water_well_summary: + type: collection + title: Water Well Summary + description: Summary metrics per water well, including latest, min/max, and trend for water levels. + keywords: [water-wells, summary, groundwater-level, trend] + extents: + spatial: + bbox: [-109.05, 31.33, -103.00, 37.00] + crs: http://www.opengis.net/def/crs/OGC/1.3/CRS84 + providers: + - type: feature + name: PostgreSQL + data: + host: {postgres_host} + port: {postgres_port} + dbname: {postgres_db} + user: {postgres_user} + password: {postgres_password_env} + search_path: [public] + id_field: id + table: ogc_water_well_summary + geom_field: point + {thing_collections_block} diff --git a/core/pygeoapi.py b/core/pygeoapi.py index 33fa09d7..6c679a21 100644 --- a/core/pygeoapi.py +++ b/core/pygeoapi.py @@ -265,14 +265,28 @@ def _thing_collections_block( def _pygeoapi_db_settings() -> tuple[str, str, str, str, str]: - host = (os.environ.get("PYGEOAPI_POSTGRES_HOST") or "").strip() or "127.0.0.1" - port = (os.environ.get("PYGEOAPI_POSTGRES_PORT") or "").strip() or "5432" - dbname = (os.environ.get("PYGEOAPI_POSTGRES_DB") or "").strip() or "postgres" - user = (os.environ.get("PYGEOAPI_POSTGRES_USER") or "").strip() + host = ( + (os.environ.get("PYGEOAPI_POSTGRES_HOST") or "").strip() + or (os.environ.get("POSTGRES_HOST") or "").strip() + or "127.0.0.1" + ) + port = ( + (os.environ.get("PYGEOAPI_POSTGRES_PORT") or "").strip() + or (os.environ.get("POSTGRES_PORT") or "").strip() + or "5432" + ) + dbname = ( + (os.environ.get("PYGEOAPI_POSTGRES_DB") or "").strip() + or (os.environ.get("POSTGRES_DB") or "").strip() + or "postgres" + ) + user = (os.environ.get("PYGEOAPI_POSTGRES_USER") or "").strip() or ( + os.environ.get("POSTGRES_USER") or "" + ).strip() if not user: raise RuntimeError( - "PYGEOAPI_POSTGRES_USER must be set and non-empty to generate the " - "pygeoapi configuration." + "PYGEOAPI_POSTGRES_USER or POSTGRES_USER must be set and non-empty " + "to generate the pygeoapi configuration." ) if os.environ.get("PYGEOAPI_POSTGRES_PASSWORD") is None: raise RuntimeError( @@ -310,17 +324,17 @@ def _write_config(path: Path) -> None: # * Do not expose it in logs, error messages, or diagnostics. # * Ensure filesystem permissions restrict access appropriately. path.write_text(config, encoding="utf-8") + path.chmod(0o600) -def _generate_openapi(_config_path: Path, openapi_path: Path) -> None: - openapi = f"""openapi: 3.0.2 -info: - title: Ocotillo OGC API - version: 1.0.0 -servers: - - url: {_server_url()} -paths: {{}} -""" +def _generate_openapi(config_path: Path, openapi_path: Path) -> None: + from pygeoapi.openapi import generate_openapi_document + + # Avoid startup failures when backing tables are not yet present; pygeoapi + # will skip invalid collections and still emit a standards-compliant spec. + openapi = generate_openapi_document( + config_path, "yaml", fail_on_invalid_collection=False + ) openapi_path.write_text(openapi, encoding="utf-8") diff --git a/db/initialization.py b/db/initialization.py index d44853be..262241fe 100644 --- a/db/initialization.py +++ b/db/initialization.py @@ -62,15 +62,6 @@ def recreate_public_schema(session: Session) -> None: session.execute(text("DROP SCHEMA public CASCADE")) session.execute(text("CREATE SCHEMA public")) session.execute(text("CREATE EXTENSION IF NOT EXISTS postgis")) - pg_cron_available = session.execute( - text( - "SELECT EXISTS (" - "SELECT 1 FROM pg_available_extensions WHERE name = 'pg_cron'" - ")" - ) - ).scalar() - if pg_cron_available: - session.execute(text("CREATE EXTENSION IF NOT EXISTS pg_cron")) session.execute(APP_READ_GRANT_SQL) grant_app_read_members(session) session.commit() diff --git a/docker-compose.yml b/docker-compose.yml index 5b82575a..9eb88baf 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -6,10 +6,6 @@ services: context: . dockerfile: ./docker/db/Dockerfile platform: linux/amd64 - command: > - postgres - -c shared_preload_libraries=pg_cron - -c cron.database_name=${POSTGRES_DB} environment: - POSTGRES_USER=${POSTGRES_USER} - POSTGRES_PASSWORD=${POSTGRES_PASSWORD} diff --git a/docker/db/Dockerfile b/docker/db/Dockerfile index 57f2f8ea..4a1fbd51 100644 --- a/docker/db/Dockerfile +++ b/docker/db/Dockerfile @@ -1,5 +1 @@ FROM postgis/postgis:17-3.5 - -RUN apt-get update \ - && apt-get install -y --no-install-recommends postgresql-17-cron \ - && rm -rf /var/lib/apt/lists/* diff --git a/tests/test_cli_commands.py b/tests/test_cli_commands.py index 8a89be83..0673d8ba 100644 --- a/tests/test_cli_commands.py +++ b/tests/test_cli_commands.py @@ -56,9 +56,11 @@ def __exit__(self, exc_type, exc, tb): assert executed_sql == [ "REFRESH MATERIALIZED VIEW ogc_latest_depth_to_water_wells", "REFRESH MATERIALIZED VIEW ogc_avg_tds_wells", + "REFRESH MATERIALIZED VIEW ogc_depth_to_water_trend_wells", + "REFRESH MATERIALIZED VIEW ogc_water_well_summary", ] assert commit_called["value"] is True - assert "Refreshed 2 materialized view(s)." in result.output + assert "Refreshed 4 materialized view(s)." in result.output def test_refresh_pygeoapi_materialized_views_custom_and_concurrently(monkeypatch): diff --git a/tests/test_ogc.py b/tests/test_ogc.py index 364d0066..e243c90b 100644 --- a/tests/test_ogc.py +++ b/tests/test_ogc.py @@ -13,9 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== -import pytest +from datetime import datetime from importlib.util import find_spec +import pytest +from sqlalchemy import text + from core.dependencies import ( admin_function, editor_function, @@ -24,6 +27,8 @@ viewer_function, amp_viewer_function, ) +from db import NMA_Chemistry_SampleInfo, NMA_MajorChemistry +from db.engine import session_ctx from main import app from tests import client, override_authentication @@ -71,12 +76,145 @@ def test_ogc_conformance(): assert any("ogcapi-features" in item for item in payload["conformsTo"]) +def test_ogc_openapi_has_paths(): + response = client.get("/ogcapi/openapi?f=json") + assert response.status_code == 200 + payload = response.json() + assert payload["openapi"].startswith("3.") + assert payload["paths"] + assert "/collections" in payload["paths"] + + +def test_latest_tds_observation_date_falls_back_to_collection_date(water_well_thing): + with session_ctx() as session: + csi = NMA_Chemistry_SampleInfo( + thing_id=water_well_thing.id, + nma_sample_point_id="TDS-FALLBK", + collection_date=datetime(2024, 1, 15, 12, 30, 0), + ) + session.add(csi) + session.flush() + + mc = NMA_MajorChemistry( + chemistry_sample_info_id=csi.id, + analyte="Total Dissolved Solids", + symbol="TDS", + sample_value=500.0, + units="mg/L", + analysis_date=None, + ) + session.add(mc) + session.commit() + + session.execute(text("REFRESH MATERIALIZED VIEW ogc_avg_tds_wells")) + session.commit() + + latest_dt = session.execute( + text( + "SELECT latest_tds_observation_date " + "FROM ogc_latest_tds_wells WHERE id = :thing_id" + ), + {"thing_id": water_well_thing.id}, + ).scalar_one() + avg_range = session.execute( + text( + "SELECT first_tds_observation_date, last_tds_observation_date " + "FROM ogc_avg_tds_wells WHERE id = :thing_id" + ), + {"thing_id": water_well_thing.id}, + ).one() + + assert latest_dt is not None + assert latest_dt.isoformat() == "2024-01-15" + assert avg_range.first_tds_observation_date is not None + assert avg_range.last_tds_observation_date is not None + assert avg_range.first_tds_observation_date.isoformat() == "2024-01-15" + assert avg_range.last_tds_observation_date.isoformat() == "2024-01-15" + + session.delete(mc) + session.delete(csi) + session.commit() + session.execute(text("REFRESH MATERIALIZED VIEW ogc_avg_tds_wells")) + session.commit() + + +def test_latest_tds_uses_latest_timestamp_within_same_day(water_well_thing): + with session_ctx() as session: + csi = NMA_Chemistry_SampleInfo( + thing_id=water_well_thing.id, + nma_sample_point_id="TDS-TIME", + collection_date=datetime(2024, 2, 1, 9, 0, 0), + ) + session.add(csi) + session.flush() + + mc_early = NMA_MajorChemistry( + chemistry_sample_info_id=csi.id, + analyte="Total Dissolved Solids", + symbol="TDS", + sample_value=300.0, + units="mg/L", + analysis_date=datetime(2024, 2, 1, 8, 0, 0), + ) + mc_late = NMA_MajorChemistry( + chemistry_sample_info_id=csi.id, + analyte="Total Dissolved Solids", + symbol="TDS", + sample_value=700.0, + units="mg/L", + analysis_date=datetime(2024, 2, 1, 18, 0, 0), + ) + session.add(mc_early) + session.add(mc_late) + session.commit() + + session.execute(text("REFRESH MATERIALIZED VIEW ogc_avg_tds_wells")) + session.commit() + + row = session.execute( + text( + "SELECT latest_tds_observation_date, latest_tds_value " + "FROM ogc_latest_tds_wells WHERE id = :thing_id" + ), + {"thing_id": water_well_thing.id}, + ).one() + + assert row.latest_tds_observation_date.isoformat() == "2024-02-01" + assert float(row.latest_tds_value) == 700.0 + + session.delete(mc_late) + session.delete(mc_early) + session.delete(csi) + session.commit() + session.execute(text("REFRESH MATERIALIZED VIEW ogc_avg_tds_wells")) + session.commit() + + def test_ogc_collections(): response = client.get("/ogcapi/collections") assert response.status_code == 200 payload = response.json() ids = {collection["id"] for collection in payload["collections"]} - assert {"locations", "water_wells", "springs"}.issubset(ids) + assert { + "locations", + "water_wells", + "springs", + "latest_tds_wells", + "depth_to_water_trend_wells", + "water_well_summary", + }.issubset(ids) + + +def test_ogc_new_collection_items_endpoints(): + for collection_id in ( + "latest_tds_wells", + "depth_to_water_trend_wells", + "water_well_summary", + ): + response = client.get(f"/ogcapi/collections/{collection_id}/items?limit=10") + assert response.status_code == 200 + payload = response.json() + assert payload["type"] == "FeatureCollection" @pytest.mark.skip("PostGIS spatial operators not available in CI - see issue #449")