Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .sdkmanrc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
java=17.0.18-amzn
18 changes: 16 additions & 2 deletions CLAUDE.md → AGENTS.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# CLAUDE.md
# AGENTS.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
This file provides guidance to AI coding agents when working with code in this repository.

## Project Overview

Expand Down Expand Up @@ -102,3 +102,17 @@ New modules should:
2. Include proper Maven coordinates under `com.datasqrl.flinkrunner`
3. Add to parent POM modules section
4. Include common dependencies from parent (Lombok, JUnit, etc.)

## AI Decision Records (ADR)

When working on non-trivial changes, create an ADR in `adr/<branch-name>.md` to maintain context across sessions:

- **Location**: `adr/<branch-name>.md`
- **When**: Before creating a PR or when context needs to be preserved
- **Structure**:
  - Summary: Goal and what was implemented (2-3 sentences)
  - Key decisions: Technical choices and rationale
  - Related: Links to issues, PRs, or other context
- **Size**: Keep concise — aim for 300-500 words, never exceed 800
- **Content**: Document WHY decisions were made, not WHAT was changed (that's in git diff)
- **Commit**: Always commit and push ADR along with the code changes
18 changes: 18 additions & 0 deletions adr/fix-multiple-statement-sets-session-mode.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Fix: Multiple EXECUTE STATEMENT SET blocks in session mode

## Summary

When running SQL scripts with multiple `EXECUTE STATEMENT SET` blocks via FlinkSessionJob (session mode), only the first block executes. The fix wraps `tableResult.await()` in a new `awaitCompletion()` helper, which still calls `await()` first and falls back to job status polling when `await()` is not supported.

## Key Decisions

**Why `await()` fails in session mode**: FlinkSessionJob submits jobs via REST API, which returns a `WebSubmissionJobClient`. This client does not support `getJobExecutionResult()`, which `await()` internally calls. The first statement set runs fine, but the runner throws when trying to wait for it before submitting the second.

**Fix approach — try/catch with polling fallback**: `awaitCompletion()` first tries `await()` (works in application mode). If that throws, it falls back to polling `jobClient.getJobStatus()` until a terminal state (FINISHED, FAILED, CANCELED). This preserves the serialization between statement sets that Ferenc identified as essential, while working in both application and session modes.

**Why not remove `await()` entirely**: The await/polling is necessary to serialize multiple statement sets. Without it, the second set would submit before the first completes, which breaks the intended execution order.

## Related

- Issue: https://github.com/DataSQRL/flink-sql-runner/issues/308
- Test manifest: https://github.com/DataSQRL/cloud-compilation/blob/add-dedup-test/test-case.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@
import java.util.Arrays;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
Expand Down Expand Up @@ -146,14 +149,18 @@ TableResult executeCompiledPlan(String planJson) {
}
}

/**
 * Job statuses at which {@code pollJobUntilTerminal} stops polling. Of these, only {@code FAILED}
 * is turned into an exception there; {@code FINISHED} and {@code CANCELED} return normally.
 */
private static final Set<JobStatus> TERMINAL_STATES =
Set.of(JobStatus.FINISHED, JobStatus.CANCELED, JobStatus.FAILED);

/** Sleep time between two consecutive job status polls, in milliseconds. */
private static final long JOB_POLL_INTERVAL_MS = 1000;

private TableResult executeStatement(String statement, boolean intermediate) {
TableResult tableResult = null;
try {
var setMatcher = SET_PATTERN.matcher(statement.trim());
var statementSetMatcher = STATEMENT_SET_PATTERN.matcher(statement.trim());

if (setMatcher.matches()) {
// Handle SET statements
var key = setMatcher.group(1);
var value = setMatcher.group(2);
tEnv.getConfig().getConfiguration().setString(key, value);
Expand All @@ -163,8 +170,8 @@ private TableResult executeStatement(String statement, boolean intermediate) {
log.info("Executing statement:\n{}", statement);
tableResult = tEnv.executeSql(statement);
if (statementSetMatcher.find() && intermediate) {
log.debug("Make sure to wait intermediate statement set to finish...");
tableResult.await();
log.debug("Waiting for intermediate statement set to finish...");
awaitCompletion(tableResult);
}
}
} catch (Exception e) {
Expand All @@ -173,6 +180,48 @@ private TableResult executeStatement(String statement, boolean intermediate) {
return tableResult;
}

/**
 * Waits for a {@link TableResult} to complete. First attempts {@link TableResult#await()}, which
 * works in application mode. If that fails (e.g., in session mode where {@code
 * WebSubmissionJobClient} does not support {@code getJobExecutionResult()}), falls back to
 * polling the job status via {@code JobClient.getJobStatus()}.
 *
 * <p>An {@link InterruptedException} raised by {@code await()} is never treated as "await not
 * supported": it is propagated immediately so that an interrupted caller does not start polling.
 *
 * @param tableResult the result of the submitted statement to wait for
 * @throws InterruptedException if the calling thread is interrupted while waiting or polling
 * @throws ExecutionException if retrieving the job status fails during the polling fallback
 * @throws IllegalStateException if the fallback is needed but no {@code JobClient} is available
 */
@VisibleForTesting
static void awaitCompletion(TableResult tableResult)
    throws InterruptedException, ExecutionException {
  try {
    tableResult.await();
  } catch (InterruptedException e) {
    // An interrupt is a request to stop waiting, not a sign that await() is unsupported.
    throw e;
  } catch (Exception awaitFailure) {
    log.info(
        "tableResult.await() not supported (likely session mode), falling back to job status polling");
    // Keep the reason for the fallback discoverable without adding noise at info level.
    log.debug("tableResult.await() failed with:", awaitFailure);
    try {
      pollJobUntilTerminal(tableResult);
    } catch (Exception pollFailure) {
      // Preserve the original await() failure so the root cause is not lost.
      pollFailure.addSuppressed(awaitFailure);
      throw pollFailure;
    }
  }
}

/**
 * Repeatedly queries the status of the job behind {@code tableResult} until it is one of {@code
 * TERMINAL_STATES}, sleeping {@code JOB_POLL_INTERVAL_MS} milliseconds between queries.
 *
 * @param tableResult the result whose job client is used for status queries
 * @throws InterruptedException if the thread is interrupted while waiting for a status or sleeping
 * @throws ExecutionException if the status future completes exceptionally
 * @throws IllegalStateException if {@code tableResult} carries no job client
 * @throws RuntimeException if the job ends in {@code FAILED}
 */
private static void pollJobUntilTerminal(TableResult tableResult)
    throws InterruptedException, ExecutionException {
  var maybeClient = tableResult.getJobClient();
  if (maybeClient.isEmpty()) {
    throw new IllegalStateException("No JobClient available to poll job status");
  }
  var client = maybeClient.get();

  JobStatus current;
  for (; ; ) {
    current = client.getJobStatus().get();
    log.debug("Polling job status: {}", current);
    if (TERMINAL_STATES.contains(current)) {
      break;
    }
    Thread.sleep(JOB_POLL_INTERVAL_MS);
  }

  // Only FAILED is reported as an error; the other terminal states return normally.
  if (current == JobStatus.FAILED) {
    throw new RuntimeException("Job " + client.getJobID() + " failed during execution");
  }
  log.info("Job {} completed with status: {}", client.getJobID(), current);
}

/**
* Sets up the UDF path in the TableEnvironment.
*
Expand Down
Loading
Loading