From 78b266327acba810943646c8071859ca7d2cb00f Mon Sep 17 00:00:00 2001 From: Seongjin Yoon Date: Sun, 5 Apr 2026 11:03:21 -0700 Subject: [PATCH] Fix dataset export --- .../web/service/ResultExportService.scala | 64 +++++++++++++++++++ .../amber/config/EnvironmentalVariable.scala | 2 + 2 files changed, 66 insertions(+) diff --git a/amber/src/main/scala/org/apache/texera/web/service/ResultExportService.scala b/amber/src/main/scala/org/apache/texera/web/service/ResultExportService.scala index e4fdc92da94..1247678c720 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/ResultExportService.scala +++ b/amber/src/main/scala/org/apache/texera/web/service/ResultExportService.scala @@ -86,6 +86,14 @@ object ResultExportService { "http://localhost:9092/api/dataset/did/upload" ) .trim + + lazy val fileServiceCreateDatasetVersionEndpoint: String = + sys.env + .getOrElse( + EnvironmentalVariable.ENV_FILE_SERVICE_CREATE_DATASET_VERSION_ENDPOINT, + "http://localhost:9092/api/dataset/did/version/create" + ) + .trim } class ResultExportService(workflowIdentity: WorkflowIdentity, computingUnitId: Int) { @@ -113,6 +121,18 @@ class ResultExportService(workflowIdentity: WorkflowIdentity, computingUnitId: I } } + // After all operators are uploaded, commit each dataset as a new version + if (successMessages.nonEmpty) { + request.datasetIds.foreach { did => + try { + commitDatasetVersion(user, did, s"Export from workflow ${request.workflowName}") + } catch { + case ex: Exception => + errorMessages += s"Failed to create version for dataset $did: ${ex.getMessage}" + } + } + } + var exportResponse: ResultExportResponse = null if (errorMessages.isEmpty) { exportResponse = ResultExportResponse("success", successMessages.mkString("\n")) @@ -571,6 +591,50 @@ class ResultExportService(workflowIdentity: WorkflowIdentity, computingUnitId: I } } + /** + * Commit staged files in a dataset as a new version by calling the file service. + */ + private def commitDatasetVersion( + user: User, + did: Int, + versionName: String + ): Unit = { + val createVersionUrl = s"$fileServiceCreateDatasetVersionEndpoint" + .replace("did", did.toString) + + var connection: HttpURLConnection = null + try { + val url = new URL(createVersionUrl) + connection = url.openConnection().asInstanceOf[HttpURLConnection] + connection.setDoOutput(true) + connection.setRequestMethod("POST") + connection.setRequestProperty("Content-Type", "text/plain") + connection.setRequestProperty( + "Authorization", + s"Bearer ${JwtAuth.jwtToken(jwtClaims(user, TOKEN_EXPIRE_TIME_IN_MINUTES))}" + ) + + val outputStream = connection.getOutputStream + outputStream.write(versionName.getBytes(StandardCharsets.UTF_8)) + outputStream.close() + + val responseCode = connection.getResponseCode + if (responseCode != HttpURLConnection.HTTP_OK) { + throw new RuntimeException( + s"Failed to create dataset version. Server responded with: $responseCode" + ) + } + } catch { + case e: Exception => + throw new RuntimeException( + s"Error creating version for dataset $did: ${e.getMessage}", + e + ) + } finally { + if (connection != null) connection.disconnect() + } + } + /** * Generate a file name for an operator's exported file */ diff --git a/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala b/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala index 1adc3233055..c0a37b181b9 100644 --- a/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala +++ b/common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala @@ -38,6 +38,8 @@ object EnvironmentalVariable { val ENV_FILE_SERVICE_GET_PRESIGNED_URL_ENDPOINT = "FILE_SERVICE_GET_PRESIGNED_URL_ENDPOINT" val ENV_FILE_SERVICE_UPLOAD_ONE_FILE_TO_DATASET_ENDPOINT = "FILE_SERVICE_UPLOAD_ONE_FILE_TO_DATASET_ENDPOINT" + val ENV_FILE_SERVICE_CREATE_DATASET_VERSION_ENDPOINT = + "FILE_SERVICE_CREATE_DATASET_VERSION_ENDPOINT" /** * Auth related vars