Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,36 @@ object LakeFSStorageClient {
.sortBy(_.getCreationDate)(Ordering[java.lang.Long].reverse) // Sort in descending order
}

/**
* Fetches all pages from a paginated LakeFS API call.
*
* @param fetch A function that takes a pagination cursor and returns (results, pagination).
* @return All results across all pages.
*/
private def fetchAllPages[T](
fetch: String => (java.util.List[T], Pagination)
): List[T] = {
val allResults = scala.collection.mutable.ListBuffer[T]()
var hasMore = true
var after = ""

while (hasMore) {
val (results, pagination) = fetch(after)
allResults ++= results.asScala
hasMore = pagination.getHasMore
if (hasMore) after = pagination.getNextOffset
}

allResults.toList
}

def retrieveObjectsOfVersion(repoName: String, commitHash: String): List[ObjectStats] = {
objectsApi.listObjects(repoName, commitHash).execute().getResults.asScala.toList
fetchAllPages[ObjectStats] { after =>
val request = objectsApi.listObjects(repoName, commitHash).amount(1000)
if (after.nonEmpty) request.after(after)
val response = request.execute()
(response.getResults, response.getPagination)
}
}

def retrieveRepositorySize(repoName: String, commitHash: String = ""): Long = {
Expand Down Expand Up @@ -334,12 +362,12 @@ object LakeFSStorageClient {
* @return List of uncommitted object stats.
*/
def retrieveUncommittedObjects(repoName: String): List[Diff] = {
branchesApi
.diffBranch(repoName, branchName)
.execute()
.getResults
.asScala
.toList
fetchAllPages[Diff] { after =>
val request = branchesApi.diffBranch(repoName, branchName).amount(1000)
if (after.nonEmpty) request.after(after)
val response = request.execute()
(response.getResults, response.getPagination)
}
}

def createCommit(repoName: String, branch: String, commitMessage: String): Commit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2445,4 +2445,30 @@ class DatasetResourceSpec
fetchSession(filePath) shouldBe null
fetchPartRows(uploadId) shouldBe empty
}

// ===========================================================================
// Pagination test – verify that listing APIs return more than the default (100 items)
// ===========================================================================

"LakeFS pagination" should "return all files when count exceeds one page for both uncommitted and committed objects" taggedAs Slow in {
val repoName =
s"pagination-${System.nanoTime()}-${Random.alphanumeric.take(6).mkString.toLowerCase}"
LakeFSStorageClient.initRepo(repoName)

val totalFiles = 110
(1 to totalFiles).foreach { i =>
LakeFSStorageClient.writeFileToRepo(
repoName,
s"file-$i.txt",
new ByteArrayInputStream(s"content-$i".getBytes(StandardCharsets.UTF_8))
)
}

// before commit: 110 files should appear as uncommitted diffs
LakeFSStorageClient.retrieveUncommittedObjects(repoName).size shouldEqual totalFiles

// after commit: 110 files should appear as committed objects
val commit = LakeFSStorageClient.withCreateVersion(repoName, "commit all files") {}
LakeFSStorageClient.retrieveObjectsOfVersion(repoName, commit.getId).size shouldEqual totalFiles
}
}
Loading