From b66e4351b8a785ea1b8dca1057b58219dab783f6 Mon Sep 17 00:00:00 2001 From: Marvin Froeder Date: Fri, 20 Mar 2026 14:53:31 -0300 Subject: [PATCH 1/3] fix: support multiple EXECUTE STATEMENT SET blocks in session mode (#308) Signed-off-by: Marvin Froeder --- .sdkmanrc | 1 + CLAUDE.md => AGENTS.md | 18 +++- ...ix-multiple-statement-sets-session-mode.md | 18 ++++ .../com/datasqrl/flinkrunner/SqlExecutor.java | 55 +++++++++++- .../flinkrunner/MultipleStatementSetsIT.java | 67 ++++++++++++++ .../datasqrl/flinkrunner/SqlExecutorTest.java | 88 +++++++++++++++++++ .../sql/test_multiple_statement_sets.sql | 41 +++++++++ 7 files changed, 283 insertions(+), 5 deletions(-) create mode 100644 .sdkmanrc rename CLAUDE.md => AGENTS.md (80%) create mode 100644 adr/fix-multiple-statement-sets-session-mode.md create mode 100644 flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/MultipleStatementSetsIT.java create mode 100644 flink-sql-runner/src/test/resources/sql/test_multiple_statement_sets.sql diff --git a/.sdkmanrc b/.sdkmanrc new file mode 100644 index 00000000..a4417cdc --- /dev/null +++ b/.sdkmanrc @@ -0,0 +1 @@ +java=17.0.18-amzn diff --git a/CLAUDE.md b/AGENTS.md similarity index 80% rename from CLAUDE.md rename to AGENTS.md index 84e75c55..917516d1 100644 --- a/CLAUDE.md +++ b/AGENTS.md @@ -1,6 +1,6 @@ -# CLAUDE.md +# AGENTS.md -This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. +This file provides guidance to AI coding agents when working with code in this repository. ## Project Overview @@ -102,3 +102,17 @@ New modules should: 2. Include proper Maven coordinates under `com.datasqrl.flinkrunner` 3. Add to parent POM modules section 4. Include common dependencies from parent (Lombok, JUnit, etc.) 
+ +## AI Decision Records (ADR) + +When working on non-trivial changes, create an ADR in `adr/.md` to maintain context across sessions: + +- **Location**: `adr/branch-name.md` +- **When**: Before creating a PR or when context needs to be preserved +- **Structure**: + - Summary: Goal and what was implemented (2-3 sentences) + - Key decisions: Technical choices and rationale + - Related: Links to issues, PRs, or other context +- **Size**: Keep concise — aim for 300-500 words, never exceed 800 +- **Content**: Document WHY decisions were made, not WHAT was changed (that's in git diff) +- **Commit**: Always commit and push ADR along with the code changes diff --git a/adr/fix-multiple-statement-sets-session-mode.md b/adr/fix-multiple-statement-sets-session-mode.md new file mode 100644 index 00000000..3aa5cf26 --- /dev/null +++ b/adr/fix-multiple-statement-sets-session-mode.md @@ -0,0 +1,18 @@ +# Fix: Multiple EXECUTE STATEMENT SET blocks in session mode + +## Summary + +When running SQL scripts with multiple `EXECUTE STATEMENT SET` blocks via FlinkSessionJob (session mode), only the first block executes. The fix replaces `tableResult.await()` with a fallback to job status polling when `await()` is not supported. + +## Key Decisions + +**Why `await()` fails in session mode**: FlinkSessionJob submits jobs via REST API, which returns a `WebSubmissionJobClient`. This client does not support `getJobExecutionResult()`, which `await()` internally calls. The first statement set runs fine, but the runner throws when trying to wait for it before submitting the second. + +**Fix approach — try/catch with polling fallback**: `awaitCompletion()` first tries `await()` (works in application mode). If that throws, it falls back to polling `jobClient.getJobStatus()` until a terminal state (FINISHED, FAILED, CANCELED). This preserves the serialization between statement sets that Ferenc identified as essential, while working in both application and session modes. 
+ +**Why not remove `await()` entirely**: The await/polling is necessary to serialize multiple statement sets. Without it, the second set would submit before the first completes, which breaks the intended execution order. + +## Related + +- Issue: https://github.com/DataSQRL/flink-sql-runner/issues/308 +- Test manifest: https://github.com/DataSQRL/cloud-compilation/blob/add-dedup-test/test-case.yaml diff --git a/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/SqlExecutor.java b/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/SqlExecutor.java index 95489a48..6166d44b 100644 --- a/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/SqlExecutor.java +++ b/flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/SqlExecutor.java @@ -23,6 +23,8 @@ import java.util.Arrays; import java.util.Optional; import java.util.ServiceLoader; +import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.regex.Pattern; import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -30,6 +32,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.PipelineOptions; @@ -146,6 +149,11 @@ TableResult executeCompiledPlan(String planJson) { } } + private static final Set TERMINAL_STATES = + Set.of(JobStatus.FINISHED, JobStatus.CANCELED, JobStatus.FAILED); + + private static final long JOB_POLL_INTERVAL_MS = 1000; + private TableResult executeStatement(String statement, boolean intermediate) { TableResult tableResult = null; try { @@ -153,7 +161,6 @@ private TableResult executeStatement(String statement, boolean intermediate) { var statementSetMatcher = STATEMENT_SET_PATTERN.matcher(statement.trim()); if (setMatcher.matches()) { - // Handle SET statements var key = 
setMatcher.group(1); var value = setMatcher.group(2); tEnv.getConfig().getConfiguration().setString(key, value); @@ -163,8 +170,8 @@ private TableResult executeStatement(String statement, boolean intermediate) { log.info("Executing statement:\n{}", statement); tableResult = tEnv.executeSql(statement); if (statementSetMatcher.find() && intermediate) { - log.debug("Make sure to wait intermediate statement set to finish..."); - tableResult.await(); + log.debug("Waiting for intermediate statement set to finish..."); + awaitCompletion(tableResult); } } } catch (Exception e) { @@ -173,6 +180,48 @@ private TableResult executeStatement(String statement, boolean intermediate) { return tableResult; } + /** + * Waits for a {@link TableResult} to complete. First attempts {@link TableResult#await()}, which + * works in application mode. If that fails (e.g., in session mode where {@code + * WebSubmissionJobClient} does not support {@code getJobExecutionResult()}), falls back to + * polling the job status via {@code JobClient.getJobStatus()}. 
+ */ + @VisibleForTesting + static void awaitCompletion(TableResult tableResult) + throws InterruptedException, ExecutionException { + try { + tableResult.await(); + } catch (Exception e) { + log.info( + "tableResult.await() not supported (likely session mode), falling back to job status polling"); + pollJobUntilTerminal(tableResult); + } + } + + private static void pollJobUntilTerminal(TableResult tableResult) + throws InterruptedException, ExecutionException { + var jobClient = + tableResult + .getJobClient() + .orElseThrow( + () -> new IllegalStateException("No JobClient available to poll job status")); + + while (true) { + var status = jobClient.getJobStatus().get(); + log.debug("Polling job status: {}", status); + + if (TERMINAL_STATES.contains(status)) { + if (status == JobStatus.FAILED) { + throw new RuntimeException("Job " + jobClient.getJobID() + " failed during execution"); + } + log.info("Job {} completed with status: {}", jobClient.getJobID(), status); + return; + } + + Thread.sleep(JOB_POLL_INTERVAL_MS); + } + } + /** * Sets up the UDF path in the TableEnvironment. * diff --git a/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/MultipleStatementSetsIT.java b/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/MultipleStatementSetsIT.java new file mode 100644 index 00000000..5d59a3db --- /dev/null +++ b/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/MultipleStatementSetsIT.java @@ -0,0 +1,67 @@ +/* + * Copyright © 2026 DataSQRL (contact@datasqrl.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datasqrl.flinkrunner; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.Test; + +class MultipleStatementSetsIT extends AbstractITSupport { + + @Test + void given_multipleStatementSets_when_batchExecution_then_allStatementSetsComplete() + throws Exception { + var execCmd = + new ArrayList<>( + List.of( + "flink", + "run", + "./plugins/flink-sql-runner/flink-sql-runner.uber.jar", + "--sqlfile", + "/it/sqlfile/test_multiple_statement_sets.sql", + "--mode", + "BATCH")); + + var execRes = flinkContainer.execInContainer(execCmd.toArray(new String[0])); + var stdOut = execRes.getStdout(); + var stdErr = execRes.getStderr(); + + assertThat(stdErr) + .withFailMessage("Execution failed with: %s", stdErr) + .doesNotContain("The program finished with the following exception:"); + + var jobIds = + stdOut + .lines() + .filter(line -> line.startsWith("Job has been submitted with JobID")) + .map(line -> line.substring(line.lastIndexOf(" ") + 1).trim()) + .toList(); + + assertThat(jobIds).as("Expected two jobs submitted (one per EXECUTE STATEMENT SET)").hasSize(2); + + var tmLogs = flinkContainer.execInContainer("bash", "-c", "cat /opt/flink/log/*taskexecutor*"); + + assertThat(tmLogs.getStdout()) + .as("First statement set output (PrintOutputA) should be present") + .contains("PrintOutputA"); + + assertThat(tmLogs.getStdout()) + .as("Second statement set output (PrintOutputB) should be present") + .contains("PrintOutputB"); + } +} diff --git a/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/SqlExecutorTest.java b/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/SqlExecutorTest.java index 495382e7..f17324cf 100644 --- a/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/SqlExecutorTest.java +++ 
b/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/SqlExecutorTest.java @@ -16,14 +16,20 @@ package com.datasqrl.flinkrunner; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.*; import java.io.File; import java.net.URL; import java.net.URLClassLoader; import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.core.execution.JobClient; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableResult; @@ -125,4 +131,86 @@ void testSetConfigClassPaths_setsClasspathOption() throws Exception { List paths = config.get(PipelineOptions.CLASSPATHS); assertThat(paths).contains("file:/test.jar"); } + + @Test + void given_awaitWorks_when_awaitCompletion_then_usesAwaitDirectly() throws Exception { + TableResult tableResult = mock(TableResult.class); + + SqlExecutor.awaitCompletion(tableResult); + + verify(tableResult).await(); + verify(tableResult, never()).getJobClient(); + } + + @Test + void given_awaitThrows_when_awaitCompletion_then_fallsBackToPolling() throws Exception { + TableResult tableResult = mock(TableResult.class); + JobClient jobClient = mock(JobClient.class); + + doThrow(new RuntimeException("Web Submission does not support await")) + .when(tableResult) + .await(); + when(tableResult.getJobClient()).thenReturn(Optional.of(jobClient)); + when(jobClient.getJobID()).thenReturn(new JobID()); + when(jobClient.getJobStatus()) + .thenReturn(CompletableFuture.completedFuture(JobStatus.RUNNING)) + .thenReturn(CompletableFuture.completedFuture(JobStatus.FINISHED)); + + SqlExecutor.awaitCompletion(tableResult); + + 
verify(tableResult).getJobClient(); + verify(jobClient, atLeast(2)).getJobStatus(); + } + + @Test + void given_jobFails_when_awaitCompletion_then_throwsRuntimeException() throws Exception { + TableResult tableResult = mock(TableResult.class); + JobClient jobClient = mock(JobClient.class); + var jobId = new JobID(); + + doThrow(new RuntimeException("Web Submission does not support await")) + .when(tableResult) + .await(); + when(tableResult.getJobClient()).thenReturn(Optional.of(jobClient)); + when(jobClient.getJobID()).thenReturn(jobId); + when(jobClient.getJobStatus()).thenReturn(CompletableFuture.completedFuture(JobStatus.FAILED)); + + assertThatThrownBy(() -> SqlExecutor.awaitCompletion(tableResult)) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("failed during execution"); + } + + @Test + void given_multipleStatementSets_when_executeScript_then_bothExecuted() throws Exception { + TableEnvironment mockTableEnv = mock(TableEnvironment.class); + TableResult firstResult = mock(TableResult.class); + TableResult secondResult = mock(TableResult.class); + JobClient jobClient = mock(JobClient.class); + + when(mockTableEnv.executeSql(contains("OutA"))).thenReturn(firstResult); + when(mockTableEnv.executeSql(contains("OutB"))).thenReturn(secondResult); + when(firstResult.getJobClient()).thenReturn(Optional.of(jobClient)); + when(jobClient.getJobID()).thenReturn(new JobID()); + when(jobClient.getJobStatus()) + .thenReturn(CompletableFuture.completedFuture(JobStatus.FINISHED)); + doThrow(new RuntimeException("Web Submission does not support await")) + .when(firstResult) + .await(); + + SqlExecutor executor = new SqlExecutor(mockTableEnv); + + String script = + "EXECUTE STATEMENT SET BEGIN\n" + + "INSERT INTO OutA SELECT * FROM InA;\n" + + "END;\n" + + "EXECUTE STATEMENT SET BEGIN\n" + + "INSERT INTO OutB SELECT * FROM InB;\n" + + "END"; + + TableResult result = executor.executeScript(script); + + assertThat(result).isSameAs(secondResult); + 
verify(mockTableEnv).executeSql(contains("OutA")); + verify(mockTableEnv).executeSql(contains("OutB")); + } } diff --git a/flink-sql-runner/src/test/resources/sql/test_multiple_statement_sets.sql b/flink-sql-runner/src/test/resources/sql/test_multiple_statement_sets.sql new file mode 100644 index 00000000..ca9f1cb4 --- /dev/null +++ b/flink-sql-runner/src/test/resources/sql/test_multiple_statement_sets.sql @@ -0,0 +1,41 @@ +CREATE TABLE InA ( + id BIGINT, + name STRING +) WITH ( + 'connector' = 'datagen', + 'number-of-rows' = '10' +); + +CREATE TABLE InB ( + id BIGINT, + name STRING +) WITH ( + 'connector' = 'datagen', + 'number-of-rows' = '10' +); + +CREATE TABLE OutA ( + id BIGINT, + name STRING +) WITH ( + 'connector' = 'print', + 'print-identifier' = 'PrintOutputA' +); + +CREATE TABLE OutB ( + id BIGINT, + name STRING +) WITH ( + 'connector' = 'print', + 'print-identifier' = 'PrintOutputB' +); + +EXECUTE STATEMENT SET BEGIN +INSERT INTO OutA +SELECT * FROM InA; +END; + +EXECUTE STATEMENT SET BEGIN +INSERT INTO OutB +SELECT * FROM InB; +END From c0a42000e5b6ba7ae5eaaa223c2554435f104f3c Mon Sep 17 00:00:00 2001 From: Marvin Froeder Date: Fri, 20 Mar 2026 15:04:44 -0300 Subject: [PATCH 2/3] fix: wait for batch jobs to complete before asserting logs in IT Signed-off-by: Marvin Froeder --- .../flinkrunner/MultipleStatementSetsIT.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/MultipleStatementSetsIT.java b/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/MultipleStatementSetsIT.java index 5d59a3db..baccb011 100644 --- a/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/MultipleStatementSetsIT.java +++ b/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/MultipleStatementSetsIT.java @@ -15,8 +15,12 @@ */ package com.datasqrl.flinkrunner; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; import static 
org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import com.nextbreakpoint.flink.client.model.JobStatus; import java.util.ArrayList; import java.util.List; import org.junit.jupiter.api.Test; @@ -54,6 +58,14 @@ void given_multipleStatementSets_when_batchExecution_then_allStatementSetsComple assertThat(jobIds).as("Expected two jobs submitted (one per EXECUTE STATEMENT SET)").hasSize(2); + for (String jobId : jobIds) { + await() + .atMost(30, SECONDS) + .pollInterval(500, MILLISECONDS) + .ignoreExceptions() + .until(() -> client.getJobStatusInfo(jobId).getStatus() == JobStatus.FINISHED); + } + var tmLogs = flinkContainer.execInContainer("bash", "-c", "cat /opt/flink/log/*taskexecutor*"); assertThat(tmLogs.getStdout()) From a099fbbae4fb3f0046050a1cfebe10c0a8e776d0 Mon Sep 17 00:00:00 2001 From: Marvin Froeder Date: Fri, 20 Mar 2026 16:33:34 -0300 Subject: [PATCH 3/3] test: add K8s E2E test for multiple statement sets in session mode Signed-off-by: Marvin Froeder --- .../flinkrunner/MultipleStatementSetsCT.java | 405 ++++++++++++++++++ 1 file changed, 405 insertions(+) create mode 100644 flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/MultipleStatementSetsCT.java diff --git a/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/MultipleStatementSetsCT.java b/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/MultipleStatementSetsCT.java new file mode 100644 index 00000000..31837ae4 --- /dev/null +++ b/flink-sql-runner/src/test/java/com/datasqrl/flinkrunner/MultipleStatementSetsCT.java @@ -0,0 +1,405 @@ +/* + * Copyright © 2026 DataSQRL (contact@datasqrl.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datasqrl.flinkrunner; + +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; + +/** + * K8s-based E2E test that verifies multiple EXECUTE STATEMENT SET blocks work correctly when + * submitted via FlinkSessionJob (Flink Operator session mode). + * + *

<p>This test requires:
+ *
+ * <ul>
+ *   <li>kubectl configured with access to a K8s cluster with the Flink operator installed
+ *   <li>The flink-sql-runner image accessible from the cluster
+ * </ul>
+ *
+ * <p>
Run with: {@code mvn verify -Dit.test=MultipleStatementSetsCT + * -Dflink.runner.image=:} + */ +@Slf4j +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class MultipleStatementSetsCT { + + private static final String NAMESPACE_PREFIX = "flink-runner-ct-"; + private static final String FLINK_DEPLOYMENT_NAME = "flink-application-test-0"; + private static final String SESSION_JOB_NAME = "flink-session-job-test-0"; + + private static final String DEFAULT_IMAGE = + "819957634547.dkr.ecr.us-east-1.amazonaws.com/cache/dockerhub/datasqrl/flink-sql-runner:0.9.5-flink-2.2"; + + private String flinkImage; + private String namespace; + + @BeforeAll + void setUp() throws Exception { + flinkImage = System.getProperty("flink.runner.image", DEFAULT_IMAGE); + namespace = NAMESPACE_PREFIX + Long.toHexString(System.currentTimeMillis()); + log.info("Using Flink image: {}", flinkImage); + log.info("Using namespace: {}", namespace); + + kubectl("create", "namespace", namespace); + + kubectl("create", "serviceaccount", "flink", "-n", namespace); + + kubectl( + "create", + "rolebinding", + "flink-edit", + "-n", + namespace, + "--clusterrole=edit", + "--serviceaccount=" + namespace + ":flink"); + + applyFlinkSqlConfigMap(); + applyHadoopConfigMap(); + applyFlinkDeployment(); + applyFlinkSessionJob(); + + kubectl( + "annotate", + "flinkdeployment", + FLINK_DEPLOYMENT_NAME, + "-n", + namespace, + "force-reconcile=" + System.currentTimeMillis(), + "--overwrite"); + + log.info("All resources applied, waiting for pods..."); + } + + @AfterAll + void tearDown() { + try { + collectDiagnostics(); + cleanupNamespace(); + } catch (Exception e) { + log.warn("Cleanup failed", e); + } + } + + @Test + void given_multipleStatementSets_when_sessionMode_then_allStatementSetsComplete() + throws Exception { + await("first-statement-set-completed") + .atMost(5, MINUTES) + .pollInterval(10, SECONDS) + .ignoreExceptions() + .untilAsserted( + () -> { + var pods = kubectl("get", "pods", "-n", namespace, 
"--no-headers"); + log.info("Pods:\n{}", pods); + var tmLogs = getTaskManagerLogs(); + assertThat(tmLogs).contains("PrintOutputA"); + }); + + log.info("First statement set completed, checking for second..."); + + var tmLogs = getTaskManagerLogs(); + + assertThat(tmLogs) + .as("First statement set output (PrintOutputA) should be present") + .contains("PrintOutputA"); + + assertThat(tmLogs) + .as("Second statement set output (PrintOutputB) should be present") + .contains("PrintOutputB"); + } + + private String getTaskManagerLogs() throws Exception { + var tmPod = + kubectl( + "get", + "pods", + "-n", + namespace, + "-l", + "component=taskmanager", + "-o", + "jsonpath={.items[0].metadata.name}") + .trim(); + + if (tmPod.isEmpty()) { + var allPods = kubectl("get", "pods", "-n", namespace, "--no-headers"); + var tmLine = allPods.lines().filter(l -> l.contains("taskmanager")).findFirst().orElse(""); + tmPod = tmLine.split("\\s+")[0]; + } + + assertThat(tmPod).as("TaskManager pod should exist").isNotEmpty(); + return kubectl("logs", tmPod, "-n", namespace, "-c", "flink-main-container"); + } + + private void applyFlinkSqlConfigMap() throws Exception { + var yaml = + """ + apiVersion: v1 + kind: ConfigMap + metadata: + name: flink-sql-test + namespace: %s + data: + config.yaml: | + "table.exec.source.idle-timeout": "1 ms" + "execution.runtime-mode": "BATCH" + flink.sql: | + CREATE TABLE `InA` ( + `id` BIGINT, + `name` STRING + ) WITH ( + 'connector' = 'datagen', + 'number-of-rows' = '10' + ); + CREATE TABLE `InB` ( + `id` BIGINT, + `name` STRING + ) WITH ( + 'connector' = 'datagen', + 'number-of-rows' = '10' + ); + CREATE TABLE `OutA` ( + `id` BIGINT, + `name` STRING + ) WITH ( + 'connector' = 'print', + 'print-identifier' = 'PrintOutputA' + ); + CREATE TABLE `OutB` ( + `id` BIGINT, + `name` STRING + ) WITH ( + 'connector' = 'print', + 'print-identifier' = 'PrintOutputB' + ); + EXECUTE STATEMENT SET BEGIN + INSERT INTO `OutA` + SELECT * + FROM `InA` + ORDER BY `id` + ; + 
END; + EXECUTE STATEMENT SET BEGIN + INSERT INTO `OutB` + SELECT * + FROM `InB` + ORDER BY `id` + ; + END + """ + .formatted(namespace); + kubectlApply(yaml); + } + + private void applyHadoopConfigMap() throws Exception { + var yaml = + """ + apiVersion: v1 + kind: ConfigMap + metadata: + name: hadoop-config-%s + namespace: %s + data: + core-site.xml: | + + + """ + .formatted(FLINK_DEPLOYMENT_NAME, namespace); + kubectlApply(yaml); + } + + private void applyFlinkDeployment() throws Exception { + var yaml = + """ + apiVersion: flink.apache.org/v1beta1 + kind: FlinkDeployment + metadata: + name: %s + namespace: %s + spec: + image: %s + flinkVersion: v2_2 + flinkConfiguration: + taskmanager.numberOfTaskSlots: "1" + taskmanager.memory.process.size: "2000m" + jobmanager.memory.process.size: "1000m" + execution.runtime-mode: "BATCH" + pipeline.classpaths: "file:///opt/flink/flink-sql-runner.jar" + security.delegation.tokens.enabled: "false" + serviceAccount: flink + jobManager: + replicas: 1 + resource: + cpu: 0.5 + taskManager: + replicas: 1 + resource: + cpu: 0.5 + podTemplate: + spec: + nodeSelector: + sqrlpipeline/tier: standard + sqrlpipeline/type: sqrlpipeline + containers: + - name: flink-main-container + volumeMounts: + - name: flink-sql + mountPath: /opt/flink/usrlib/flink-files + - name: udf-jars + mountPath: /opt/flink/usrlib/flink-files/jars + volumes: + - name: flink-sql + configMap: + name: flink-sql-test + - name: udf-jars + emptyDir: {} + """ + .formatted(FLINK_DEPLOYMENT_NAME, namespace, flinkImage); + kubectlApply(yaml); + } + + private void applyFlinkSessionJob() throws Exception { + var yaml = + """ + apiVersion: flink.apache.org/v1beta1 + kind: FlinkSessionJob + metadata: + name: %s + namespace: %s + spec: + deploymentName: %s + job: + entryClass: com.datasqrl.flinkrunner.CliRunner + args: + - "--sqlfile" + - "/opt/flink/usrlib/flink-files/flink.sql" + - "--config-dir" + - "/opt/flink/usrlib/flink-files" + - "--udfpath" + - 
"/opt/flink/usrlib/flink-files/jars" + parallelism: 1 + upgradeMode: stateless + """ + .formatted(SESSION_JOB_NAME, namespace, FLINK_DEPLOYMENT_NAME); + kubectlApply(yaml); + } + + private void cleanupNamespace() { + try { + kubectl( + "patch", + "flinkdeployment", + FLINK_DEPLOYMENT_NAME, + "-n", + namespace, + "-p", + "{\"metadata\":{\"finalizers\":null}}", + "--type=merge"); + } catch (Exception ignored) { + } + try { + kubectl( + "patch", + "flinksessionjob", + SESSION_JOB_NAME, + "-n", + namespace, + "-p", + "{\"metadata\":{\"finalizers\":null}}", + "--type=merge"); + } catch (Exception ignored) { + } + try { + kubectl("delete", "namespace", namespace, "--force", "--grace-period=0", "--wait=false"); + await("namespace-deleted") + .atMost(60, SECONDS) + .pollInterval(2, SECONDS) + .ignoreExceptions() + .untilAsserted( + () -> { + var ns = kubectl("get", "namespace", namespace, "--ignore-not-found"); + assertThat(ns.trim()).isEmpty(); + }); + } catch (Exception ignored) { + } + } + + private void collectDiagnostics() { + try { + log.info("=== Diagnostics ==="); + log.info( + "FlinkDeployment status:\n{}", + kubectl("get", "flinkdeployment", FLINK_DEPLOYMENT_NAME, "-n", namespace, "-o", "yaml")); + log.info( + "FlinkSessionJob status:\n{}", + kubectl("get", "flinksessionjob", SESSION_JOB_NAME, "-n", namespace, "-o", "yaml")); + log.info("Pods:\n{}", kubectl("get", "pods", "-n", namespace, "-o", "wide")); + log.info( + "Events:\n{}", kubectl("get", "events", "-n", namespace, "--sort-by=.lastTimestamp")); + } catch (Exception e) { + log.warn("Failed to collect diagnostics", e); + } + } + + private String kubectl(String... 
args) throws Exception { + var cmd = new String[args.length + 1]; + cmd[0] = "kubectl"; + System.arraycopy(args, 0, cmd, 1, args.length); + return exec(cmd); + } + + private void kubectlApply(String yaml) throws Exception { + var process = + new ProcessBuilder("kubectl", "apply", "-f", "-").redirectErrorStream(true).start(); + process.getOutputStream().write(yaml.getBytes(StandardCharsets.UTF_8)); + process.getOutputStream().close(); + var output = + new BufferedReader(new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8)) + .lines() + .collect(Collectors.joining("\n")); + var exitCode = process.waitFor(); + if (exitCode != 0) { + throw new RuntimeException("kubectl apply failed: " + output); + } + log.info("Applied: {}", output); + } + + private String exec(String... cmd) throws Exception { + var process = new ProcessBuilder(cmd).redirectErrorStream(true).start(); + var output = + new BufferedReader(new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8)) + .lines() + .collect(Collectors.joining("\n")); + var exitCode = process.waitFor(); + if (exitCode != 0) { + throw new RuntimeException( + "Command failed (exit " + exitCode + "): " + String.join(" ", cmd) + "\n" + output); + } + return output; + } +}