From 9db59ea45c6fe592fddc819cd2ad1343995de0f6 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Fri, 6 Feb 2026 22:17:11 +0100 Subject: [PATCH] cover more edge cases --- .../0100_function_archive_task_message.sql | 23 +++ ...00_function_cascade_resolve_conditions.sql | 37 +++++ .../schemas/0100_function_complete_task.sql | 8 +- .../0100_function_create_flow_from_shape.sql | 4 +- pkgs/core/schemas/0100_function_fail_task.sql | 84 +++++----- pkgs/core/src/database-types.ts | 4 + .../20260206115746_pgflow_step_conditions.sql | 148 +++++++++++++----- pkgs/core/supabase/migrations/atlas.sum | 4 +- ...met_fail_archives_active_messages.test.sql | 79 ++++++++++ ...t_unmet_fail_emits_failure_events.test.sql | 88 +++++++++++ .../missing_condition_modes_defaults.test.sql | 67 ++++++++ ...ayed_fail_task_skip_is_idempotent.test.sql | 121 ++++++++++++++ ...il_archives_parent_failed_message.test.sql | 79 ++++++++++ ...tion_fail_returns_failed_task_row.test.sql | 89 +++++++++++ ...conditions_before_start_fail_mode.test.sql | 87 ++++++++++ ...conditions_before_start_skip_mode.test.sql | 87 ++++++++++ ...en_taskless_map_starts_downstream.test.sql | 101 ++++++++++++ ...type_violation_returns_failed_row.test.sql | 83 ++++++++++ .../docs/build/conditional-steps/index.mdx | 17 +- .../conditional-steps/pattern-matching.mdx | 6 +- .../build/conditional-steps/skip-modes.mdx | 9 +- 21 files changed, 1128 insertions(+), 97 deletions(-) create mode 100644 pkgs/core/schemas/0100_function_archive_task_message.sql create mode 100644 pkgs/core/supabase/tests/condition_evaluation/dependent_unmet_fail_archives_active_messages.test.sql create mode 100644 pkgs/core/supabase/tests/condition_evaluation/root_unmet_fail_emits_failure_events.test.sql create mode 100644 pkgs/core/supabase/tests/create_flow_from_shape/missing_condition_modes_defaults.test.sql create mode 100644 pkgs/core/supabase/tests/fail_task_when_exhausted/replayed_fail_task_skip_is_idempotent.test.sql create mode 100644 pkgs/core/supabase/tests/fail_task_when_exhausted/skip_condition_fail_archives_parent_failed_message.test.sql create mode 100644 pkgs/core/supabase/tests/fail_task_when_exhausted/skip_condition_fail_returns_failed_task_row.test.sql create mode 100644 pkgs/core/supabase/tests/fail_task_when_exhausted/skip_resolves_dependent_conditions_before_start_fail_mode.test.sql create mode 100644 pkgs/core/supabase/tests/fail_task_when_exhausted/skip_resolves_dependent_conditions_before_start_skip_mode.test.sql create mode 100644 pkgs/core/supabase/tests/fail_task_when_exhausted/skip_then_taskless_map_starts_downstream.test.sql create mode 100644 pkgs/core/supabase/tests/type_violations/complete_task_type_violation_returns_failed_row.test.sql diff --git a/pkgs/core/schemas/0100_function_archive_task_message.sql b/pkgs/core/schemas/0100_function_archive_task_message.sql new file mode 100644 index 000000000..3ed804114 --- /dev/null +++ b/pkgs/core/schemas/0100_function_archive_task_message.sql @@ -0,0 +1,23 @@ +create or replace function pgflow._archive_task_message( + p_run_id uuid, + p_step_slug text, + p_task_index int +) +returns void +language sql +volatile +set search_path to '' +as $$ + SELECT pgmq.archive( + r.flow_slug, + ARRAY_AGG(st.message_id) + ) + FROM pgflow.step_tasks st + JOIN pgflow.runs r ON st.run_id = r.run_id + WHERE st.run_id = p_run_id + AND st.step_slug = p_step_slug + AND st.task_index = p_task_index + AND st.message_id IS NOT NULL + GROUP BY r.flow_slug + HAVING COUNT(st.message_id) > 0; +$$; diff --git a/pkgs/core/schemas/0100_function_cascade_resolve_conditions.sql b/pkgs/core/schemas/0100_function_cascade_resolve_conditions.sql index 2a86d4861..8a953f179 100644 --- a/pkgs/core/schemas/0100_function_cascade_resolve_conditions.sql +++ b/pkgs/core/schemas/0100_function_cascade_resolve_conditions.sql @@ -116,6 +116,43 @@ BEGIN failed_at = now() WHERE pgflow.runs.run_id = cascade_resolve_conditions.run_id; + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'step:failed', + 'run_id', cascade_resolve_conditions.run_id, + 'step_slug', v_first_fail.step_slug, + 'status', 'failed', + 'error_message', 'Condition not met', + 'failed_at', now() + ), + concat('step:', v_first_fail.step_slug, ':failed'), + concat('pgflow:run:', cascade_resolve_conditions.run_id), + false + ); + + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'run:failed', + 'run_id', cascade_resolve_conditions.run_id, + 'flow_slug', v_first_fail.flow_slug, + 'status', 'failed', + 'error_message', 'Condition not met', + 'failed_at', now() + ), + 'run:failed', + concat('pgflow:run:', cascade_resolve_conditions.run_id), + false + ); + + PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id)) + FROM pgflow.step_tasks st + JOIN pgflow.runs r ON st.run_id = r.run_id + WHERE st.run_id = cascade_resolve_conditions.run_id + AND st.status IN ('queued', 'started') + AND st.message_id IS NOT NULL + GROUP BY r.flow_slug + HAVING COUNT(st.message_id) > 0; + RETURN false; END IF; diff --git a/pkgs/core/schemas/0100_function_complete_task.sql b/pkgs/core/schemas/0100_function_complete_task.sql index 521e8a3af..f4d5242d4 100644 --- a/pkgs/core/schemas/0100_function_complete_task.sql +++ b/pkgs/core/schemas/0100_function_complete_task.sql @@ -146,8 +146,12 @@ IF v_dependent_map_slug IS NOT NULL THEN AND st.task_index = complete_task.task_index AND st.message_id IS NOT NULL; - -- Return empty result - RETURN QUERY SELECT * FROM pgflow.step_tasks WHERE false; + -- Return the failed task row (API contract: always return task row) + RETURN QUERY + SELECT * FROM pgflow.step_tasks st + WHERE st.run_id = complete_task.run_id + AND st.step_slug = complete_task.step_slug + AND st.task_index = complete_task.task_index; RETURN; END IF; diff --git a/pkgs/core/schemas/0100_function_create_flow_from_shape.sql b/pkgs/core/schemas/0100_function_create_flow_from_shape.sql index c9501921f..a6006432d 100644 --- a/pkgs/core/schemas/0100_function_create_flow_from_shape.sql +++ b/pkgs/core/schemas/0100_function_create_flow_from_shape.sql @@ -48,8 +48,8 @@ BEGIN timeout => (v_step_options->>'timeout')::int, start_delay => (v_step_options->>'startDelay')::int, step_type => v_step->>'stepType', - when_unmet => v_step->>'whenUnmet', - when_exhausted => v_step->>'whenExhausted', + when_unmet => COALESCE(v_step->>'whenUnmet', 'skip'), + when_exhausted => COALESCE(v_step->>'whenExhausted', 'fail'), required_input_pattern => CASE WHEN (v_step->'requiredInputPattern'->>'defined')::boolean THEN v_step->'requiredInputPattern'->'value' diff --git a/pkgs/core/schemas/0100_function_fail_task.sql b/pkgs/core/schemas/0100_function_fail_task.sql index e926dbd18..9eac65d60 100644 --- a/pkgs/core/schemas/0100_function_fail_task.sql +++ b/pkgs/core/schemas/0100_function_fail_task.sql @@ -140,45 +140,42 @@ maybe_fail_step AS ( WHERE pgflow.step_states.run_id = fail_task.run_id AND pgflow.step_states.step_slug = fail_task.step_slug RETURNING pgflow.step_states.* +), +run_update AS ( + -- Update run status: only fail when when_exhausted='fail' and step was failed + UPDATE pgflow.runs + SET status = CASE + WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed' + ELSE status + END, + failed_at = CASE + WHEN (select status from maybe_fail_step) = 'failed' THEN now() + ELSE NULL + END, + -- Decrement remaining_steps when step was skipped (not failed, run continues) + remaining_steps = CASE + WHEN (select status from maybe_fail_step) = 'skipped' THEN pgflow.runs.remaining_steps - 1 + ELSE pgflow.runs.remaining_steps + END + WHERE pgflow.runs.run_id = fail_task.run_id + RETURNING pgflow.runs.status ) - -- Update run status: only fail when when_exhausted='fail' and step was failed -UPDATE pgflow.runs -SET status = CASE - WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed' - ELSE status - END, - failed_at = CASE - WHEN (select status from maybe_fail_step) = 'failed' THEN now() - ELSE NULL - END, - -- Decrement remaining_steps when step was skipped (not failed, run continues) - remaining_steps = CASE - WHEN (select status from maybe_fail_step) = 'skipped' THEN pgflow.runs.remaining_steps - 1 - ELSE pgflow.runs.remaining_steps - END -WHERE pgflow.runs.run_id = fail_task.run_id -RETURNING (status = 'failed') INTO v_run_failed; +SELECT + COALESCE((SELECT status = 'failed' FROM run_update), false), + COALESCE((SELECT status = 'failed' FROM maybe_fail_step), false), + COALESCE((SELECT status = 'skipped' FROM maybe_fail_step), false), + COALESCE((SELECT is_exhausted FROM task_status), false) +INTO v_run_failed, v_step_failed, v_step_skipped, v_task_exhausted; - -- Capture when_exhausted mode and check if step was skipped for later processing + -- Capture when_exhausted mode for later skip handling SELECT s.when_exhausted INTO v_when_exhausted FROM pgflow.steps s JOIN pgflow.runs r ON r.flow_slug = s.flow_slug -WHERE r.run_id = fail_task.run_id - AND s.step_slug = fail_task.step_slug; - -SELECT (status = 'skipped') INTO v_step_skipped -FROM pgflow.step_states -WHERE pgflow.step_states.run_id = fail_task.run_id - AND pgflow.step_states.step_slug = fail_task.step_slug; - --- Check if step failed by querying the step_states table -SELECT (status = 'failed') INTO v_step_failed -FROM pgflow.step_states -WHERE pgflow.step_states.run_id = fail_task.run_id - AND pgflow.step_states.step_slug = fail_task.step_slug; + WHERE r.run_id = fail_task.run_id + AND s.step_slug = fail_task.step_slug; -- Send broadcast event for step failure if the step was failed -IF v_step_failed THEN +IF v_task_exhausted AND v_step_failed THEN PERFORM realtime.send( jsonb_build_object( 'event_type', 'step:failed', @@ -194,8 +191,8 @@ IF v_step_failed THEN ); END IF; - -- Handle step skipping (when_exhausted = 'skip' or 'skip-cascade') - IF v_step_skipped THEN +-- Handle step skipping (when_exhausted = 'skip' or 'skip-cascade') + IF v_task_exhausted AND v_step_skipped THEN -- Send broadcast event for step skipped PERFORM realtime.send( jsonb_build_object( @@ -237,11 +234,26 @@ END IF; AND dep.dep_slug = fail_task.step_slug AND child_state.step_slug = dep.step_slug; - -- Start any steps that became ready after decrementing remaining_deps - PERFORM pgflow.start_ready_steps(fail_task.run_id); + -- Evaluate conditions on newly-ready dependent steps + -- This must happen before cascade_complete_taskless_steps so that + -- skipped steps can set initial_tasks=0 for their map dependents + IF NOT pgflow.cascade_resolve_conditions(fail_task.run_id) THEN + -- Run was failed due to a condition with when_unmet='fail' + -- Archive the failed task's message before returning + PERFORM pgflow._archive_task_message(fail_task.run_id, fail_task.step_slug, fail_task.task_index); + -- Return the task row (API contract) + RETURN QUERY SELECT * FROM pgflow.step_tasks + WHERE pgflow.step_tasks.run_id = fail_task.run_id + AND pgflow.step_tasks.step_slug = fail_task.step_slug + AND pgflow.step_tasks.task_index = fail_task.task_index; + RETURN; + END IF; -- Auto-complete taskless steps (e.g., map steps with initial_tasks=0 from skipped dep) PERFORM pgflow.cascade_complete_taskless_steps(fail_task.run_id); + + -- Start steps that became ready after condition resolution and taskless completion + PERFORM pgflow.start_ready_steps(fail_task.run_id); END IF; -- Try to complete the run (remaining_steps may now be 0) diff --git a/pkgs/core/src/database-types.ts b/pkgs/core/src/database-types.ts index 8252adb2a..7d7ed85b9 100644 --- a/pkgs/core/src/database-types.ts +++ b/pkgs/core/src/database-types.ts @@ -406,6 +406,10 @@ export type Database = { [_ in never]: never } Functions: { + _archive_task_message: { + Args: { p_run_id: string; p_step_slug: string; p_task_index: number } + Returns: undefined + } _cascade_force_skip_steps: { Args: { run_id: string; skip_reason: string; step_slug: string } Returns: number diff --git a/pkgs/core/supabase/migrations/20260206115746_pgflow_step_conditions.sql b/pkgs/core/supabase/migrations/20260206115746_pgflow_step_conditions.sql index 7550a3c86..809d2e18e 100644 --- a/pkgs/core/supabase/migrations/20260206115746_pgflow_step_conditions.sql +++ b/pkgs/core/supabase/migrations/20260206115746_pgflow_step_conditions.sql @@ -273,8 +273,8 @@ BEGIN timeout => (v_step_options->>'timeout')::int, start_delay => (v_step_options->>'startDelay')::int, step_type => v_step->>'stepType', - when_unmet => v_step->>'whenUnmet', - when_exhausted => v_step->>'whenExhausted', + when_unmet => COALESCE(v_step->>'whenUnmet', 'skip'), + when_exhausted => COALESCE(v_step->>'whenExhausted', 'fail'), required_input_pattern => CASE WHEN (v_step->'requiredInputPattern'->>'defined')::boolean THEN v_step->'requiredInputPattern'->'value' @@ -533,6 +533,43 @@ BEGIN failed_at = now() WHERE pgflow.runs.run_id = cascade_resolve_conditions.run_id; + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'step:failed', + 'run_id', cascade_resolve_conditions.run_id, + 'step_slug', v_first_fail.step_slug, + 'status', 'failed', + 'error_message', 'Condition not met', + 'failed_at', now() + ), + concat('step:', v_first_fail.step_slug, ':failed'), + concat('pgflow:run:', cascade_resolve_conditions.run_id), + false + ); + + PERFORM realtime.send( + jsonb_build_object( + 'event_type', 'run:failed', + 'run_id', cascade_resolve_conditions.run_id, + 'flow_slug', v_first_fail.flow_slug, + 'status', 'failed', + 'error_message', 'Condition not met', + 'failed_at', now() + ), + 'run:failed', + concat('pgflow:run:', cascade_resolve_conditions.run_id), + false + ); + + PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id)) + FROM pgflow.step_tasks st + JOIN pgflow.runs r ON st.run_id = r.run_id + WHERE st.run_id = cascade_resolve_conditions.run_id + AND st.status IN ('queued', 'started') + AND st.message_id IS NOT NULL + GROUP BY r.flow_slug + HAVING COUNT(st.message_id) > 0; + RETURN false; END IF; @@ -958,8 +995,12 @@ IF v_dependent_map_slug IS NOT NULL THEN AND st.task_index = complete_task.task_index AND st.message_id IS NOT NULL; - -- Return empty result - RETURN QUERY SELECT * FROM pgflow.step_tasks WHERE false; + -- Return the failed task row (API contract: always return task row) + RETURN QUERY + SELECT * FROM pgflow.step_tasks st + WHERE st.run_id = complete_task.run_id + AND st.step_slug = complete_task.step_slug + AND st.task_index = complete_task.task_index; RETURN; END IF; @@ -1182,6 +1223,21 @@ WHERE step_task.run_id = complete_task.run_id end; $$; +-- Create "_archive_task_message" function +CREATE FUNCTION "pgflow"."_archive_task_message" ("p_run_id" uuid, "p_step_slug" text, "p_task_index" integer) RETURNS void LANGUAGE sql SET "search_path" = '' AS $$ +SELECT pgmq.archive( + r.flow_slug, + ARRAY_AGG(st.message_id) + ) + FROM pgflow.step_tasks st + JOIN pgflow.runs r ON st.run_id = r.run_id + WHERE st.run_id = p_run_id + AND st.step_slug = p_step_slug + AND st.task_index = p_task_index + AND st.message_id IS NOT NULL + GROUP BY r.flow_slug + HAVING COUNT(st.message_id) > 0; +$$; -- Modify "fail_task" function CREATE OR REPLACE FUNCTION "pgflow"."fail_task" ("run_id" uuid, "step_slug" text, "task_index" integer, "error_message" text) RETURNS SETOF "pgflow"."step_tasks" LANGUAGE plpgsql SET "search_path" = '' AS $$ DECLARE @@ -1315,45 +1371,42 @@ maybe_fail_step AS ( WHERE pgflow.step_states.run_id = fail_task.run_id AND pgflow.step_states.step_slug = fail_task.step_slug RETURNING pgflow.step_states.* +), +run_update AS ( + -- Update run status: only fail when when_exhausted='fail' and step was failed + UPDATE pgflow.runs + SET status = CASE + WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed' + ELSE status + END, + failed_at = CASE + WHEN (select status from maybe_fail_step) = 'failed' THEN now() + ELSE NULL + END, + -- Decrement remaining_steps when step was skipped (not failed, run continues) + remaining_steps = CASE + WHEN (select status from maybe_fail_step) = 'skipped' THEN pgflow.runs.remaining_steps - 1 + ELSE pgflow.runs.remaining_steps + END + WHERE pgflow.runs.run_id = fail_task.run_id + RETURNING pgflow.runs.status ) - -- Update run status: only fail when when_exhausted='fail' and step was failed -UPDATE pgflow.runs -SET status = CASE - WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed' - ELSE status - END, - failed_at = CASE - WHEN (select status from maybe_fail_step) = 'failed' THEN now() - ELSE NULL - END, - -- Decrement remaining_steps when step was skipped (not failed, run continues) - remaining_steps = CASE - WHEN (select status from maybe_fail_step) = 'skipped' THEN pgflow.runs.remaining_steps - 1 - ELSE pgflow.runs.remaining_steps - END -WHERE pgflow.runs.run_id = fail_task.run_id -RETURNING (status = 'failed') INTO v_run_failed; +SELECT + COALESCE((SELECT status = 'failed' FROM run_update), false), + COALESCE((SELECT status = 'failed' FROM maybe_fail_step), false), + COALESCE((SELECT status = 'skipped' FROM maybe_fail_step), false), + COALESCE((SELECT is_exhausted FROM task_status), false) +INTO v_run_failed, v_step_failed, v_step_skipped, v_task_exhausted; - -- Capture when_exhausted mode and check if step was skipped for later processing + -- Capture when_exhausted mode for later skip handling SELECT s.when_exhausted INTO v_when_exhausted FROM pgflow.steps s JOIN pgflow.runs r ON r.flow_slug = s.flow_slug -WHERE r.run_id = fail_task.run_id - AND s.step_slug = fail_task.step_slug; - -SELECT (status = 'skipped') INTO v_step_skipped -FROM pgflow.step_states -WHERE pgflow.step_states.run_id = fail_task.run_id - AND pgflow.step_states.step_slug = fail_task.step_slug; - --- Check if step failed by querying the step_states table -SELECT (status = 'failed') INTO v_step_failed -FROM pgflow.step_states -WHERE pgflow.step_states.run_id = fail_task.run_id - AND pgflow.step_states.step_slug = fail_task.step_slug; + WHERE r.run_id = fail_task.run_id + AND s.step_slug = fail_task.step_slug; -- Send broadcast event for step failure if the step was failed -IF v_step_failed THEN +IF v_task_exhausted AND v_step_failed THEN PERFORM realtime.send( jsonb_build_object( 'event_type', 'step:failed', @@ -1369,8 +1422,8 @@ IF v_step_failed THEN ); END IF; - -- Handle step skipping (when_exhausted = 'skip' or 'skip-cascade') - IF v_step_skipped THEN +-- Handle step skipping (when_exhausted = 'skip' or 'skip-cascade') + IF v_task_exhausted AND v_step_skipped THEN -- Send broadcast event for step skipped PERFORM realtime.send( jsonb_build_object( @@ -1412,11 +1465,26 @@ END IF; AND dep.dep_slug = fail_task.step_slug AND child_state.step_slug = dep.step_slug; - -- Start any steps that became ready after decrementing remaining_deps - PERFORM pgflow.start_ready_steps(fail_task.run_id); + -- Evaluate conditions on newly-ready dependent steps + -- This must happen before cascade_complete_taskless_steps so that + -- skipped steps can set initial_tasks=0 for their map dependents + IF NOT pgflow.cascade_resolve_conditions(fail_task.run_id) THEN + -- Run was failed due to a condition with when_unmet='fail' + -- Archive the failed task's message before returning + PERFORM pgflow._archive_task_message(fail_task.run_id, fail_task.step_slug, fail_task.task_index); + -- Return the task row (API contract) + RETURN QUERY SELECT * FROM pgflow.step_tasks + WHERE pgflow.step_tasks.run_id = fail_task.run_id + AND pgflow.step_tasks.step_slug = fail_task.step_slug + AND pgflow.step_tasks.task_index = fail_task.task_index; + RETURN; + END IF; -- Auto-complete taskless steps (e.g., map steps with initial_tasks=0 from skipped dep) PERFORM pgflow.cascade_complete_taskless_steps(fail_task.run_id); + + -- Start steps that became ready after condition resolution and taskless completion + PERFORM pgflow.start_ready_steps(fail_task.run_id); END IF; -- Try to complete the run (remaining_steps may now be 0) diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum index c98885fa9..f9596b341 100644 --- a/pkgs/core/supabase/migrations/atlas.sum +++ b/pkgs/core/supabase/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:ThrUAu9izqXh7CYZpi1VC17rNHGXdQh4yX5fwrTmygU= +h1:PKFaEWem6gnojibWcUFpjaSxvZfZ5mbqd1JLuMuV0NA= 20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s= 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY= 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg= @@ -18,4 +18,4 @@ h1:ThrUAu9izqXh7CYZpi1VC17rNHGXdQh4yX5fwrTmygU= 20260103145141_pgflow_step_output_storage.sql h1:mgVHSFDLdtYy//SZ6C03j9Str1iS9xCM8Rz/wyFwn3o= 20260120205547_pgflow_requeue_stalled_tasks.sql h1:4wCBBvjtETCgJf1eXmlH5wCTKDUhiLi0uzsFG1V528E= 20260124113408_pgflow_auth_secret_support.sql h1:i/s1JkBqRElN6FOYFQviJt685W08SuSo30aP25lNlLc= -20260206115746_pgflow_step_conditions.sql h1:rIoXVl0SoVFGHdCFpAQnD6DRSHugzQODZa+UjAhA0ow= +20260206115746_pgflow_step_conditions.sql h1:ek5nBPfTIzDc4tioCQWBshSPwTj4HFRdL+njLFGlJKc= diff --git a/pkgs/core/supabase/tests/condition_evaluation/dependent_unmet_fail_archives_active_messages.test.sql b/pkgs/core/supabase/tests/condition_evaluation/dependent_unmet_fail_archives_active_messages.test.sql new file mode 100644 index 000000000..b43dcf413 --- /dev/null +++ b/pkgs/core/supabase/tests/condition_evaluation/dependent_unmet_fail_archives_active_messages.test.sql @@ -0,0 +1,79 @@ +begin; +select plan(4); + +select pgflow_tests.reset_db(); + +select pgflow.create_flow('dependent_fail_archive'); +select pgflow.add_step( + flow_slug => 'dependent_fail_archive', + step_slug => 'first' +); +select pgflow.add_step( + flow_slug => 'dependent_fail_archive', + step_slug => 'second' +); +select pgflow.add_step( + flow_slug => 'dependent_fail_archive', + step_slug => 'checker', + deps_slugs => array['first'], + required_input_pattern => '{"ok": true}'::jsonb, + when_unmet => 'fail' +); + +with run as ( + select * + from pgflow.start_flow('dependent_fail_archive', '{}'::jsonb) +) +select run_id into temporary run_ids from run; + +select pgflow_tests.read_and_start('dependent_fail_archive'); + +select pgflow.complete_task( + run_id => (select run_id from run_ids), + step_slug => 'first', + task_index => 0, + output => '{"ok": false}'::jsonb +); + +select is( + ( + select status + from pgflow.runs + where run_id = (select run_id from run_ids) + ), + 'failed', + 'run should fail when dependent fail-condition is unmet' +); + +select is( + ( + select status + from pgflow.step_states + where run_id = (select run_id from run_ids) + and step_slug = 'checker' + ), + 'failed', + 'checker should fail due to unmet condition' +); + +select is( + ( + select count(*) + from pgmq.q_dependent_fail_archive + ), + 0::bigint, + 'run failure should archive all active queue messages' +); + +select ok( + ( + select count(*) + from pgmq.a_dependent_fail_archive + ) >= 2, + 'archive queue should contain completed and run-failure archived messages' +); + +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/condition_evaluation/root_unmet_fail_emits_failure_events.test.sql b/pkgs/core/supabase/tests/condition_evaluation/root_unmet_fail_emits_failure_events.test.sql new file mode 100644 index 000000000..cde264bbf --- /dev/null +++ b/pkgs/core/supabase/tests/condition_evaluation/root_unmet_fail_emits_failure_events.test.sql @@ -0,0 +1,88 @@ +begin; +select plan(6); + +select pgflow_tests.reset_db(); + +select pgflow.create_flow('root_fail_events'); +select pgflow.add_step( + flow_slug => 'root_fail_events', + step_slug => 'guarded', + required_input_pattern => '{"ok": true}'::jsonb, + when_unmet => 'fail' +); + +with run as ( + select * + from pgflow.start_flow('root_fail_events', '{}'::jsonb) +) +select run_id into temporary run_ids from run; + +select is( + ( + select status + from pgflow.step_states + where run_id = (select run_id from run_ids) + and step_slug = 'guarded' + ), + 'failed', + 'guarded step should fail when root condition is unmet' +); + +select is( + ( + select status + from pgflow.runs + where run_id = (select run_id from run_ids) + ), + 'failed', + 'run should fail when root fail-condition is unmet' +); + +select is( + pgflow_tests.count_realtime_events( + event_type => 'step:failed', + run_id => (select run_id from run_ids), + step_slug => 'guarded' + ), + 1::integer, + 'should emit one step:failed event' +); + +select is( + pgflow_tests.count_realtime_events( + event_type => 'run:failed', + run_id => (select run_id from run_ids) + ), + 1::integer, + 'should emit one run:failed event' +); + +select is( + ( + select payload->>'status' + from pgflow_tests.get_realtime_message( + event_type => 'step:failed', + run_id => (select run_id from run_ids), + step_slug => 'guarded' + ) + ), + 'failed', + 'step:failed payload should include failed status' +); + +select is( + ( + select payload->>'status' + from pgflow_tests.get_realtime_message( + event_type => 'run:failed', + run_id => (select run_id from run_ids) + ) + ), + 'failed', + 'run:failed payload should include failed status' +); + +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/create_flow_from_shape/missing_condition_modes_defaults.test.sql b/pkgs/core/supabase/tests/create_flow_from_shape/missing_condition_modes_defaults.test.sql new file mode 100644 index 000000000..59f997bf6 --- /dev/null +++ b/pkgs/core/supabase/tests/create_flow_from_shape/missing_condition_modes_defaults.test.sql @@ -0,0 +1,67 @@ +begin; +select plan(5); + +select pgflow_tests.reset_db(); + +select lives_ok( + $$ + select pgflow._create_flow_from_shape( + p_flow_slug => 'legacy_shape_defaults', + p_shape => '{ + "steps": [ + { + "slug": "first", + "stepType": "single", + "dependencies": [] + } + ] + }'::jsonb + ) + $$, + 'legacy shape without condition mode fields should compile' +); + +select is( + ( + select when_unmet + from pgflow.steps + where flow_slug = 'legacy_shape_defaults' + and step_slug = 'first' + ), + 'skip', + 'missing whenUnmet should default to skip' +); + +select is( + ( + select when_exhausted + from pgflow.steps + where flow_slug = 'legacy_shape_defaults' + and step_slug = 'first' + ), + 'fail', + 'missing whenExhausted should default to fail' +); + +select ok( + ( + select required_input_pattern is null + from pgflow.steps + where flow_slug = 'legacy_shape_defaults' + and step_slug = 'first' + ), + 'required_input_pattern should remain null when omitted' +); + +select ok( + ( + select forbidden_input_pattern is null + from pgflow.steps + where flow_slug = 'legacy_shape_defaults' + and step_slug = 'first' + ), + 'forbidden_input_pattern should remain null when omitted' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/fail_task_when_exhausted/replayed_fail_task_skip_is_idempotent.test.sql b/pkgs/core/supabase/tests/fail_task_when_exhausted/replayed_fail_task_skip_is_idempotent.test.sql new file mode 100644 index 000000000..729b37040 --- /dev/null +++ b/pkgs/core/supabase/tests/fail_task_when_exhausted/replayed_fail_task_skip_is_idempotent.test.sql @@ -0,0 +1,121 @@ +begin; +select plan(7); + +select pgflow_tests.reset_db(); + +select pgflow.create_flow('replayed_skip_idempotent'); +select pgflow.add_step( + flow_slug => 'replayed_skip_idempotent', + step_slug => 'a', + max_attempts => 0, + when_exhausted => 'skip' +); +select pgflow.add_step( + flow_slug => 'replayed_skip_idempotent', + step_slug => 'd' +); +select pgflow.add_step( + flow_slug => 'replayed_skip_idempotent', + step_slug => 'c', + deps_slugs => array['d'] +); +select pgflow.add_step( + flow_slug => 'replayed_skip_idempotent', + step_slug => 'b', + deps_slugs => array['a', 'c'] +); + +with run as ( + select * + from pgflow.start_flow('replayed_skip_idempotent', '{}'::jsonb) +) +select run_id into temporary run_ids from run; + +select pgflow_tests.poll_and_fail('replayed_skip_idempotent'); + +select is( + ( + select status + from pgflow.step_states + where run_id = (select run_id from run_ids) + and step_slug = 'a' + ), + 'skipped', + 'first fail should skip step a' +); + +select is( + ( + select remaining_deps + from pgflow.step_states + where run_id = (select run_id from run_ids) + and step_slug = 'b' + ), + 1, + 'first fail should decrement b remaining deps from 2 to 1' +); + +select is( + ( + select status + from pgflow.step_states + where run_id = (select run_id from run_ids) + and step_slug = 'b' + ), + 'created', + 'b should still be waiting on c after first fail' +); + +select pgflow.fail_task( + run_id => (select run_id from run_ids), + step_slug => 'a', + task_index => 0, + error_message => 'replayed failure' +); + +select is( + ( + select remaining_deps + from pgflow.step_states + where run_id = (select run_id from run_ids) + and step_slug = 'b' + ), + 1, + 'replayed fail_task should not decrement b remaining deps again' +); + +select is( + ( + select status + from pgflow.step_states + where run_id = (select run_id from run_ids) + and step_slug = 'b' + ), + 'created', + 'replayed fail_task should not start b prematurely' +); + +select is( + pgflow_tests.count_realtime_events( + event_type => 'step:skipped', + run_id => (select run_id from run_ids), + step_slug => 'a' + ), + 1::integer, + 'replayed fail_task should not emit duplicate step:skipped event' +); + +select is( + ( + select status + from pgflow.runs + where run_id = (select run_id from run_ids) + ), + 'started', + 'run should remain started while c is incomplete' +); + +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/fail_task_when_exhausted/skip_condition_fail_archives_parent_failed_message.test.sql b/pkgs/core/supabase/tests/fail_task_when_exhausted/skip_condition_fail_archives_parent_failed_message.test.sql new file mode 100644 index 000000000..5cb3705c5 --- /dev/null +++ b/pkgs/core/supabase/tests/fail_task_when_exhausted/skip_condition_fail_archives_parent_failed_message.test.sql @@ -0,0 +1,79 @@ +-- Test: Plain skip archives parent task message when condition fails the run +-- +-- Flow structure: +-- parent (when_exhausted='skip', max_attempts=0) -> child (condition with when_unmet='fail') +-- +-- Expected behavior: +-- 1. parent fails and gets skipped +-- 2. child's condition fails -> run fails +-- 3. Parent's message should be archived (not orphaned) +-- +-- This is a regression test for the bug where early RETURN in fail_task +-- left the parent task's message orphaned in the queue. + +begin; +select plan(4); +select pgflow_tests.reset_db(); + +-- Create flow: parent -> child with condition and fail mode +select pgflow.create_flow('skip_condition_fail_archive'); +select pgflow.add_step('skip_condition_fail_archive', 'parent', max_attempts => 0, when_exhausted => 'skip'); +select pgflow.add_step( + flow_slug => 'skip_condition_fail_archive', + step_slug => 'child', + deps_slugs => ARRAY['parent'], + required_input_pattern => '{"parent": {"success": true}}'::jsonb, + when_unmet => 'fail' +); + +-- Start flow and capture run_id +with flow as ( + select * from pgflow.start_flow('skip_condition_fail_archive', '{}'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Start the parent task +select pgflow_tests.read_and_start('skip_condition_fail_archive'); + +-- Fail parent task (triggers skip -> condition resolution -> run failure) +select pgflow.fail_task( + (select run_id from run_ids), + 'parent', + 0, + 'handler failed' +); + +-- Test 1: parent should be skipped +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'parent'), + 'skipped', + 'parent should be skipped after failure' +); + +-- Test 2: run should be failed +select is( + (select status from pgflow.runs where run_id = (select run_id from run_ids)), + 'failed', + 'Run should fail when child condition fails' +); + +-- Test 3: queue should be empty (parent message archived, not orphaned) +select is( + (select count(*) from pgmq.q_skip_condition_fail_archive), + 0::bigint, + 'Queue should be empty - parent message archived, not orphaned' +); + +-- Test 4: message should exist in archive table (prove it was archived, not just deleted) +select is( + (select count(*)::int from pgmq.a_skip_condition_fail_archive), + 1, + 'Message should exist in archive table - proves proper archival' +); + +-- Cleanup +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/fail_task_when_exhausted/skip_condition_fail_returns_failed_task_row.test.sql b/pkgs/core/supabase/tests/fail_task_when_exhausted/skip_condition_fail_returns_failed_task_row.test.sql new file mode 100644 index 000000000..bce6fa16a --- /dev/null +++ b/pkgs/core/supabase/tests/fail_task_when_exhausted/skip_condition_fail_returns_failed_task_row.test.sql @@ -0,0 +1,89 @@ +-- Test: fail_task returns task row when condition fails the run +-- +-- Flow structure: +-- parent (when_exhausted='skip', max_attempts=0) -> child (condition with when_unmet='fail') +-- +-- Expected behavior: +-- 1. parent fails and gets skipped +-- 2. child's condition fails -> run fails +-- 3. fail_task should return the task row (API contract) +-- +-- This is a regression test for the bug where early RETURN in fail_task +-- didn't return any row, breaking the API contract. + +begin; +select plan(5); +select pgflow_tests.reset_db(); + +-- Create flow: parent -> child with condition and fail mode +select pgflow.create_flow('skip_condition_fail_return'); +select pgflow.add_step('skip_condition_fail_return', 'parent', max_attempts => 0, when_exhausted => 'skip'); +select pgflow.add_step( + flow_slug => 'skip_condition_fail_return', + step_slug => 'child', + deps_slugs => ARRAY['parent'], + required_input_pattern => '{"parent": {"success": true}}'::jsonb, + when_unmet => 'fail' +); + +-- Start flow and capture run_id +with flow as ( + select * from pgflow.start_flow('skip_condition_fail_return', '{}'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Start the parent task +select pgflow_tests.read_and_start('skip_condition_fail_return'); + +-- Fail parent task (triggers skip -> condition resolution -> run failure) +-- Capture the return value +select * into temporary fail_result +from pgflow.fail_task( + (select run_id from run_ids), + 'parent', + 0, + 'handler failed' +); + +-- Test 1: fail_task should return exactly one row +select is( + (select count(*)::int from fail_result), + 1, + 'fail_task should return exactly one row even when condition fails run' +); + +-- Test 2: returned row should have correct step_slug +select is( + (select step_slug from fail_result), + 'parent', + 'Returned row should have correct step_slug' +); + +-- Test 3: returned row should have status 'failed' (task was failed) +select is( + (select status from fail_result), + 'failed', + 'Returned row should have status failed' +); + +-- Test 4: run should be failed (proves condition-fail path executed) +select is( + (select status from pgflow.runs where run_id = (select run_id from run_ids)), + 'failed', + 'Run should be failed - proves condition-fail branch executed' +); + +-- Test 5: parent step state should be skipped (proves skip->condition path) +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'parent'), + 'skipped', + 'Parent step should be skipped - proves skip->condition-fail path' +); + +-- Cleanup +drop table if exists run_ids; +drop table if exists fail_result; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/fail_task_when_exhausted/skip_resolves_dependent_conditions_before_start_fail_mode.test.sql b/pkgs/core/supabase/tests/fail_task_when_exhausted/skip_resolves_dependent_conditions_before_start_fail_mode.test.sql new file mode 100644 index 000000000..5e94a6b52 --- /dev/null +++ b/pkgs/core/supabase/tests/fail_task_when_exhausted/skip_resolves_dependent_conditions_before_start_fail_mode.test.sql @@ -0,0 +1,87 @@ +-- Test: Plain skip resolves dependent conditions before starting steps (child when_unmet='fail') +-- +-- Flow structure: +-- parent (when_exhausted='skip', max_attempts=0) -> child +-- +-- The child has a required_input_pattern that won't be met when parent is skipped. +-- Expected behavior: +-- 1. parent fails and gets skipped +-- 2. child should be condition-resolved (failed due to unmet condition), NOT started +-- 3. Run fails + +begin; +select plan(5); +select pgflow_tests.reset_db(); + +-- Create flow: parent -> child with condition and fail mode +select pgflow.create_flow('skip_parent_conditional_child_fail'); +select pgflow.add_step('skip_parent_conditional_child_fail', 'parent', max_attempts => 0, when_exhausted => 'skip'); +select pgflow.add_step( + flow_slug => 'skip_parent_conditional_child_fail', + step_slug => 'child', + deps_slugs => ARRAY['parent'], + required_input_pattern => '{"parent": {"success": true}}'::jsonb, + when_unmet => 'fail' +); + +-- Start flow and capture run_id +with flow as ( + select * from pgflow.start_flow('skip_parent_conditional_child_fail', '{}'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Start the parent task +select pgflow_tests.read_and_start('skip_parent_conditional_child_fail'); + +-- Fail parent task +select pgflow.fail_task( + (select run_id from run_ids), + 'parent', + 0, + 'handler failed' +); + +-- Test 1: parent should be skipped +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'parent'), + 'skipped', + 'parent should be skipped after failure' +); + +-- Test 2: child should be condition-failed, NOT started +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'child'), + 'failed', + 'child should be failed due to unmet condition, not started' +); + +-- Test 3: child error_message should indicate condition failure +select ok( + (select error_message like '%Condition not met%' + from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'child'), + 'child should have error_message about condition not met' +); + +-- Test 4: No task should be created for child +select is( + (select count(*)::int from pgflow.step_tasks + where run_id = (select run_id from run_ids) and step_slug = 'child'), + 0, + 'No task should be created for child' +); + +-- Test 5: Run should fail +select is( + (select status from pgflow.runs where run_id = (select run_id from run_ids)), + 'failed', + 'Run should fail when child has unmet condition with when_unmet=fail' +); + +-- Clean up +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/fail_task_when_exhausted/skip_resolves_dependent_conditions_before_start_skip_mode.test.sql b/pkgs/core/supabase/tests/fail_task_when_exhausted/skip_resolves_dependent_conditions_before_start_skip_mode.test.sql new file mode 100644 index 000000000..c7fbc36eb --- /dev/null +++ b/pkgs/core/supabase/tests/fail_task_when_exhausted/skip_resolves_dependent_conditions_before_start_skip_mode.test.sql @@ -0,0 +1,87 @@ +-- Test: Plain skip resolves dependent conditions before starting steps (child when_unmet='skip') +-- +-- Flow structure: +-- parent (when_exhausted='skip', max_attempts=0) -> child +-- +-- The child has a required_input_pattern that won't be met when parent is skipped. +-- Expected behavior: +-- 1. parent fails and gets skipped +-- 2. child should be condition-resolved (skipped due to unmet condition), NOT started +-- 3. Run completes successfully + +begin; +select plan(5); +select pgflow_tests.reset_db(); + +-- Create flow: parent -> child with condition +select pgflow.create_flow('skip_parent_conditional_child_skip'); +select pgflow.add_step('skip_parent_conditional_child_skip', 'parent', max_attempts => 0, when_exhausted => 'skip'); +select pgflow.add_step( + flow_slug => 'skip_parent_conditional_child_skip', + step_slug => 'child', + deps_slugs => ARRAY['parent'], + required_input_pattern => '{"parent": {"success": true}}'::jsonb, + when_unmet => 'skip' +); + +-- Start flow and capture run_id +with flow as ( + select * from pgflow.start_flow('skip_parent_conditional_child_skip', '{}'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Start the parent task +select pgflow_tests.read_and_start('skip_parent_conditional_child_skip'); + +-- Fail parent task +select pgflow.fail_task( + (select run_id from run_ids), + 'parent', + 0, + 'handler failed' +); + +-- Test 1: parent should be skipped +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'parent'), + 'skipped', + 'parent should be skipped after failure' +); + +-- Test 2: child should be condition-skipped, NOT started +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'child'), + 'skipped', + 'child should be skipped due to unmet condition, not started' +); + +-- Test 3: child skip_reason should be 'condition_unmet' +select is( + (select skip_reason from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'child'), + 'condition_unmet', + 'child should have skip_reason = condition_unmet' +); + +-- Test 4: No task should be created for child +select is( + (select count(*)::int from pgflow.step_tasks + where run_id = (select run_id from run_ids) and step_slug = 'child'), + 0, + 'No task should be created for child' +); + +-- Test 5: Run should complete successfully +select is( + (select status from pgflow.runs where run_id = (select run_id from run_ids)), + 'completed', + 'Run should complete when child is skipped' +); + +-- Clean up +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/fail_task_when_exhausted/skip_then_taskless_map_starts_downstream.test.sql b/pkgs/core/supabase/tests/fail_task_when_exhausted/skip_then_taskless_map_starts_downstream.test.sql new file mode 100644 index 000000000..12df9f679 --- /dev/null +++ b/pkgs/core/supabase/tests/fail_task_when_exhausted/skip_then_taskless_map_starts_downstream.test.sql @@ -0,0 +1,101 @@ +begin; +select plan(6); + +select pgflow_tests.reset_db(); + +select pgflow.create_flow('skip_map_downstream'); +select pgflow.add_step( + flow_slug => 'skip_map_downstream', + step_slug => 'a', + max_attempts => 0, + when_exhausted => 'skip' +); +select pgflow.add_step( + flow_slug => 'skip_map_downstream', + step_slug => 'm', + step_type => 'map', + deps_slugs => array['a'] +); +select pgflow.add_step( + flow_slug => 'skip_map_downstream', + step_slug => 'z', + deps_slugs => array['m'] +); + +with run as ( + select * + from pgflow.start_flow('skip_map_downstream', '{}'::jsonb) +) +select run_id into temporary run_ids from run; + +select pgflow_tests.poll_and_fail('skip_map_downstream'); + +select is( + ( + select status + from pgflow.step_states + where run_id = (select run_id from run_ids) + and step_slug = 'a' + ), + 'skipped', + 'source step should be skipped after exhaustion' +); + +select is( + ( + select status + from pgflow.step_states + where run_id = (select run_id from run_ids) + and step_slug = 'm' + ), + 'completed', + 'taskless map should auto-complete after dependency skip' +); + +select is( + ( + select output + from pgflow.step_states + where run_id = (select run_id from run_ids) + and step_slug = 'm' + ), + '[]'::jsonb, + 'auto-completed map should emit empty array output' +); + +select is( + ( + select remaining_deps + from pgflow.step_states + where run_id = (select run_id from run_ids) + and step_slug = 'z' + ), + 0, + 'downstream step should have all dependencies resolved' +); + +select is( + ( + select status + from pgflow.step_states + where run_id = (select run_id from run_ids) + and step_slug = 'z' + ), + 'started', + 'downstream step should be started after map auto-completion' +); + +select is( + ( + select status + from pgflow.runs + where run_id = (select run_id from run_ids) + ), + 'started', + 'run should remain started while downstream work is in progress' +); + +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/type_violations/complete_task_type_violation_returns_failed_row.test.sql b/pkgs/core/supabase/tests/type_violations/complete_task_type_violation_returns_failed_row.test.sql new file mode 100644 index 000000000..c1cc14a04 --- /dev/null +++ b/pkgs/core/supabase/tests/type_violations/complete_task_type_violation_returns_failed_row.test.sql @@ -0,0 +1,83 @@ +-- Test: complete_task returns task row on type violation +-- +-- Flow structure: +-- producer -> consumer_map (expects array input) +-- +-- Expected behavior: +-- 1. producer completes with non-array output +-- 2. consumer_map triggers type violation -> fails +-- 3. complete_task should return the task row (API contract) +-- +-- This is a regression test for the bug where type violation path +-- returned empty result instead of the task row. + +begin; +select plan(5); +select pgflow_tests.reset_db(); + +-- Create flow: producer -> consumer_map (expects array) +select pgflow.create_flow('type_violation_return'); +select pgflow.add_step( + flow_slug => 'type_violation_return', + step_slug => 'producer', + step_type => 'single' +); +select pgflow.add_step( + flow_slug => 'type_violation_return', + step_slug => 'consumer_map', + deps_slugs => ARRAY['producer'], + step_type => 'map' +); + +-- Start flow +select run_id as test_run_id from pgflow.start_flow('type_violation_return', '{}'::jsonb) \gset + +-- Start producer task +select pgflow_tests.ensure_worker('type_violation_return', '11111111-1111-1111-1111-111111111111'::uuid); +SELECT * FROM pgflow_tests.read_and_start('type_violation_return', 1, 1) LIMIT 1; + +-- Trigger type violation by completing producer with non-array (consumer_map expects array) +-- Capture the return value +select * into temporary complete_result +from pgflow.complete_task(:'test_run_id'::uuid, 'producer', 0, '{"not": "an array"}'::jsonb); + +-- Test 1: complete_task should return exactly one row +select is( + (select count(*)::int from complete_result), + 1, + 'complete_task should return exactly one row even on type violation' +); + +-- Test 2: returned row should have correct step_slug +select is( + (select step_slug from complete_result), + 'producer', + 'Returned row should have correct step_slug' +); + +-- Test 3: returned row should have status 'failed' (task was failed due to type violation) +select is( + (select status from complete_result), + 'failed', + 'Returned row should have status failed' +); + +-- Test 4: run should be failed (proves type-violation path executed) +select is( + (select status from pgflow.runs where run_id = :'test_run_id'::uuid), + 'failed', + 'Run should be failed - proves type-violation branch executed' +); + +-- Test 5: error message should contain type violation signature +select matches( + (select error_message from complete_result), + '^\[TYPE_VIOLATION\].*', + 'Error message should start with [TYPE_VIOLATION] signature' +); + +-- Cleanup +drop table if exists complete_result; + +select * from finish(); +rollback; diff --git a/pkgs/website/src/content/docs/build/conditional-steps/index.mdx b/pkgs/website/src/content/docs/build/conditional-steps/index.mdx index de131c68c..3f7c4dfc3 100644 --- a/pkgs/website/src/content/docs/build/conditional-steps/index.mdx +++ b/pkgs/website/src/content/docs/build/conditional-steps/index.mdx @@ -19,10 +19,10 @@ pgflow lets you skip steps based on input patterns or when handlers fail. pgflow provides two ways to skip steps: -| Feature | When Evaluated | Purpose | -| ------------------------- | ---------------- | -------------------------------- | -| `if`/`ifNot` conditions | Before step runs | Route based on input data | -| `whenExhausted` option | After step fails | Recover gracefully from failures | +| Feature | When Evaluated | Purpose | +| ----------------------- | ---------------- | -------------------------------- | +| `if`/`ifNot` conditions | Before step runs | Route based on input data | +| `whenExhausted` option | After step fails | Recover gracefully from failures | Both use the same three modes: `fail`, `skip`, and `skip-cascade`. @@ -38,7 +38,7 @@ When a condition is unmet or a step fails, you control what happens: | Mode | Behavior | | -------------- | ----------------------------------------------------------------------------------------------- | -| `fail` | Step fails, entire run fails (default for `whenExhausted`) | +| `fail` | Step fails, entire run fails (default for `whenExhausted`) | | `skip` | Step marked as skipped, run continues, dependents receive `undefined` (default for `whenUnmet`) | | `skip-cascade` | Step AND all downstream dependents skipped, run continues | @@ -53,7 +53,7 @@ new Flow<{ userId: string; plan: 'free' | 'premium' }>({ slug: 'userOnboarding', }) .step({ slug: 'createAccount' }, async (input) => { - return { accountId: await createUser(input.run.userId) }; + return { accountId: await createUser(input.userId) }; }) .step( { @@ -99,12 +99,13 @@ pgflow's type system tracks which steps may be skipped: .step({ slug: 'processResults', dependsOn: ['optionalEnrichment'], -}, async (input) => { +}, async (input, ctx) => { // TypeScript knows this may be undefined if (input.optionalEnrichment) { return processWithEnrichment(input.optionalEnrichment); } - return processBasic(input.run); + const flowInput = await ctx.flowInput; + return processBasic(flowInput); }) ``` diff --git a/pkgs/website/src/content/docs/build/conditional-steps/pattern-matching.mdx b/pkgs/website/src/content/docs/build/conditional-steps/pattern-matching.mdx index e6b2d418f..5d0f601a9 100644 --- a/pkgs/website/src/content/docs/build/conditional-steps/pattern-matching.mdx +++ b/pkgs/website/src/content/docs/build/conditional-steps/pattern-matching.mdx @@ -60,8 +60,8 @@ new Flow({ slug: 'onboarding' }).step( whenUnmet: 'skip', }, async (input) => { - // input.run.plan is guaranteed to be 'premium' here - return await enablePremiumFeatures(input.run.userId); + // input.plan is guaranteed to be 'premium' here + return await enablePremiumFeatures(input.userId); } ); ``` @@ -73,7 +73,7 @@ For dependent steps, the pattern matches against an object containing all depend ```typescript new Flow<{ url: string }>({ slug: 'contentPipeline' }) .step({ slug: 'analyze' }, async (input) => { - const result = await analyzeContent(input.run.url); + const result = await analyzeContent(input.url); return { needsModeration: result.flagged, content: result.text }; }) .step( diff --git a/pkgs/website/src/content/docs/build/conditional-steps/skip-modes.mdx b/pkgs/website/src/content/docs/build/conditional-steps/skip-modes.mdx index 657eb1549..2591204ad 100644 --- a/pkgs/website/src/content/docs/build/conditional-steps/skip-modes.mdx +++ b/pkgs/website/src/content/docs/build/conditional-steps/skip-modes.mdx @@ -69,17 +69,18 @@ The default behavior for `whenUnmet`. The step is marked as skipped, but the run if: { includeEnrichment: true }, // whenUnmet: 'skip' is the default }, async (input) => { - return await fetchEnrichment(input.run.id); + return await fetchEnrichment(input.id); }) .step({ slug: 'processResults', dependsOn: ['enrichData'], -}, async (input) => { +}, async (input, ctx) => { // TypeScript knows enrichData may be undefined if (input.enrichData) { return processWithEnrichment(input.enrichData); } - return processBasic(input.run); + const flowInput = await ctx.flowInput; + return processBasic(flowInput); }) ``` @@ -236,7 +237,7 @@ When a step is skipped, pgflow records why in the `skip_reason` field: | -------------------- | ------------------------------------------------- | | `condition_unmet` | Step's `if` or `ifNot` condition wasn't satisfied | | `dependency_skipped` | A dependency was skipped with `skip-cascade` | -| `handler_failed` | Handler failed with `whenExhausted: 'skip'` | +| `handler_failed` | Handler failed with `whenExhausted: 'skip'` | Query skipped steps: