From 276d61233c4e498d83b04c83ad3250d2c382542d Mon Sep 17 00:00:00 2001
From: Dongyang Geng
Date: Tue, 3 Mar 2026 17:24:36 +0800
Subject: [PATCH 1/2] feat: add checkpoint performance test

---
 .../pixels/retina/TestRetinaCheckpoint.java   | 127 ++++++++++++++++++
 1 file changed, 127 insertions(+)

diff --git a/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java b/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java
index b1fab563a..2f0606c92 100644
--- a/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java
+++ b/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestRetinaCheckpoint.java
@@ -38,6 +38,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -384,6 +385,132 @@ else if (j % 3 == 1)
         assertFalse("Errors occurred during concurrency test", errorOccurred.get());
     }
 
+    @Test
+    public void testCheckpointPerformance() throws RetinaException, IOException, InterruptedException
+    {
+        // 1. Performance Test Configuration
+        double targetDeleteRatio = 0.0; // @TARGET_DELETE_RATIO@
+        int numFiles = 50000;
+        int rowsPerRg = 200000;
+        long totalRows = (long) numFiles * rowsPerRg;
+        long timestamp = System.currentTimeMillis();
+
+        System.out.printf("Target Delete Ratio: %.2f%%%n", targetDeleteRatio * 100);
+        System.out.printf("Total Rows: %,d%n", totalRows);
+
+        // 2. Populate Visibility Data
+        System.out.println("[Perf] Populating visibility data...");
+        for (int i = 0; i < numFiles; i++)
+        {
+            retinaManager.addVisibility(i, 0, rowsPerRg);
+        }
+
+        // 3. Delete Records based on Ratio
+        System.out.println("[Perf] Deleting records...");
+        long totalDeleted = 0;
+        if (targetDeleteRatio > 0)
+        {
+            // Delete contiguous block for performance stability
+            int rowsToDeletePerRg = (int) (rowsPerRg * targetDeleteRatio);
+            for (int i = 0; i < numFiles; i++)
+            {
+                // Delete rows 0 to rowsToDeletePerRg - 1
+                for (int j = 0; j < rowsToDeletePerRg; j++)
+                {
+                    retinaManager.deleteRecord(i, 0, j, timestamp);
+                }
+                totalDeleted += rowsToDeletePerRg;
+            }
+        }
+        double actualRatio = (double) totalDeleted / totalRows;
+        System.out.printf("Actual Ratio: %.2f%%%n", actualRatio * 100);
+
+        // Measure Memory before Offload
+        System.gc();
+        Thread.sleep(1000);
+        long memBeforeOffload = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+
+        // 4. Register Offload (Checkpoint Creation)
+        System.out.println("[Perf] Starting Offload...");
+        long startOffload = System.nanoTime();
+        retinaManager.registerOffload(timestamp);
+        long endOffload = System.nanoTime();
+        double offloadTimeMs = (endOffload - startOffload) / 1_000_000.0;
+        System.out.printf("Total Offload Time: %.2f ms%n", offloadTimeMs);
+
+        // Measure Peak Memory (Approximation: Current - Before)
+        long memAfterOffload = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+        double peakMemMb = Math.max(0, (memAfterOffload - memBeforeOffload) / (1024.0 * 1024.0));
+        System.out.printf("Offload Peak Mem Overhead: %.2f MB%n", peakMemMb);
+
+        // File Size
+        String checkpointPath = resolve(testCheckpointDir, getOffloadFileName(timestamp));
+        long fileSizeBytes = storage.getStatus(checkpointPath).getLength();
+        double fileSizeMb = fileSizeBytes / (1024.0 * 1024.0);
+        System.out.printf("Checkpoint File Size: %.2f MB%n", fileSizeMb);
+
+        // Write Throughput
+        double writeThroughput = fileSizeMb / (offloadTimeMs / 1000.0);
+        System.out.printf("Write Throughput: %.2f MB/s%n", writeThroughput);
+
+        // 5. Simulate System Restart (Cold Load)
+        System.out.println("[Perf] Simulating restart...");
+        // Rename to GC file to simulate persisted state
+        String gcPath = resolve(testCheckpointDir, getGcFileName(timestamp));
+        // Simple copy since no rename
+        try (DataInputStream in = storage.open(checkpointPath);
+             DataOutputStream out = storage.create(gcPath, true, 8 * 1024 * 1024))
+        {
+            byte[] buffer = new byte[64 * 1024]; // 64KB copy buffer
+            int bytesRead;
+            while ((bytesRead = in.read(buffer)) != -1)
+            {
+                out.write(buffer, 0, bytesRead);
+            }
+        }
+        storage.delete(checkpointPath, false);
+
+        resetSingletonState();
+        System.gc();
+        Thread.sleep(1000);
+        long memBeforeLoad = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+
+        // Recover
+        long startLoad = System.nanoTime();
+        retinaManager.recoverCheckpoints();
+        long endLoad = System.nanoTime();
+        double loadTimeMs = (endLoad - startLoad) / 1_000_000.0;
+        System.out.printf("First Load Time (Cold): %.2f ms%n", loadTimeMs);
+
+        // Load Memory Overhead
+        long memAfterLoad = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+        double loadMemMb = Math.max(0, (memAfterLoad - memBeforeLoad) / (1024.0 * 1024.0));
+        System.out.printf("Load Memory Overhead: %.2f MB%n", loadMemMb);
+
+        // Read Throughput
+        double readThroughput = fileSizeMb / (loadTimeMs / 1000.0);
+        System.out.printf("Read/Parse Throughput: %.2f MB/s%n", readThroughput);
+
+        // 6. Avg Memory Hit Latency
+        System.out.println("[Perf] Measuring Memory Hit Latency...");
+        long totalLatencyNs = 0;
+        int latencySamples = 10000;
+        for (int i = 0; i < latencySamples; i++)
+        {
+            // Random file query
+            long randomFileId = ThreadLocalRandom.current().nextInt(numFiles);
+            long startQuery = System.nanoTime();
+            retinaManager.queryVisibility(randomFileId, 0, timestamp);
+            long endQuery = System.nanoTime();
+            totalLatencyNs += (endQuery - startQuery);
+        }
+        double avgLatencyMs = (totalLatencyNs / (double) latencySamples) / 1_000_000.0;
+        System.out.printf("Avg Memory Hit Latency: %.4f ms%n", avgLatencyMs);
+
+        // Cleanup
+        storage.delete(gcPath, false);
+    }
+
     /**
      * Use reflection to reset internal state of RetinaResourceManager, simulating a restart.
      */

From 40ea77c8af340066c9cad4d67934ee653d26cb75 Mon Sep 17 00:00:00 2001
From: Dongyang Geng
Date: Tue, 3 Mar 2026 19:26:40 +0800
Subject: [PATCH 2/2] feat: implement multiload checkpoint file

---
 .../pixels/common/utils/CheckpointFileIO.java | 218 ++++++++++++++++++
 .../reader/VisibilityCheckpointCache.java     |  57 +----
 .../reader/TestVisibilityCheckpointCache.java |  39 ++--
 .../pixels/retina/RetinaResourceManager.java  |  69 ++----
 4 files changed, 259 insertions(+), 124 deletions(-)
 create mode 100644 pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/CheckpointFileIO.java

diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/CheckpointFileIO.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/CheckpointFileIO.java
new file mode 100644
index 000000000..bd78ab64e
--- /dev/null
+++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/CheckpointFileIO.java
@@ -0,0 +1,218 @@
+/*
+ * Copyright 2026 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
+ */
+package io.pixelsdb.pixels.common.utils;
+
+import io.pixelsdb.pixels.common.physical.PhysicalReader;
+import io.pixelsdb.pixels.common.physical.PhysicalReaderUtil;
+import io.pixelsdb.pixels.common.physical.Storage;
+import io.pixelsdb.pixels.common.physical.StorageFactory;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
+
+/**
+ * Unified checkpoint file read/write utility class.
+ * <p>
+ * Checkpoint file format:
+ * <pre>
+ * [4B] rgCount                         -- total number of entries
+ * Repeated rgCount times:
+ *   [8B] fileId                        -- file ID
+ *   [4B] rgId                          -- Row Group ID
+ *   [4B] recordNum                     -- number of records
+ *   [4B] bitmapLen                     -- length of bitmap array
+ *   [8B x bitmapLen] bitmap            -- visibility bitmap data
+ * </pre>
+ * <p>
+ * This class is shared by the server-side (RetinaResourceManager) and the client-side
+ * (VisibilityCheckpointCache) to eliminate duplicated read/write logic and ensure
+ * consistent file format across both sides.
+ */
+public class CheckpointFileIO
+{
+    private static final Logger logger = LogManager.getLogger(CheckpointFileIO.class);
+
+    /**
+     * A checkpoint entry containing the visibility information of a single Row Group.
+     */
+    public static class CheckpointEntry
+    {
+        public final long fileId;
+        public final int rgId;
+        public final int recordNum;
+        public final long[] bitmap;
+
+        public CheckpointEntry(long fileId, int rgId, int recordNum, long[] bitmap)
+        {
+            this.fileId = fileId;
+            this.rgId = rgId;
+            this.recordNum = recordNum;
+            this.bitmap = bitmap;
+        }
+    }
+
+    /**
+     * Functional interface for processing each CheckpointEntry during parallel reading.
+     */
+    @FunctionalInterface
+    public interface EntryConsumer
+    {
+        void accept(CheckpointEntry entry);
+    }
+
+    private CheckpointFileIO()
+    {
+    }
+
+    /**
+     * Write checkpoint entries to a file.
+     * <p>
+     * Uses a producer-consumer pattern: the caller puts CheckpointEntry objects into the queue,
+     * and this method consumes totalRgs entries from the queue and writes them sequentially.
+     *
+     * @param path the file path
+     * @param totalRgs total number of entries to write
+     * @param queue blocking queue containing CheckpointEntry objects
+     * @throws Exception if writing fails
+     */
+    public static void writeCheckpoint(String path, int totalRgs, BlockingQueue<CheckpointEntry> queue) throws Exception
+    {
+        Storage storage = StorageFactory.Instance().getStorage(path);
+        try (DataOutputStream out = storage.create(path, true, 8 * 1024 * 1024))
+        {
+            out.writeInt(totalRgs);
+            for (int i = 0; i < totalRgs; i++)
+            {
+                CheckpointEntry entry = queue.take();
+                out.writeLong(entry.fileId);
+                out.writeInt(entry.rgId);
+                out.writeInt(entry.recordNum);
+                out.writeInt(entry.bitmap.length);
+                for (long l : entry.bitmap)
+                {
+                    out.writeLong(l);
+                }
+            }
+            out.flush();
+        }
+    }
+
+    /**
+     * Read and parse a checkpoint file in parallel using the specified executor.
+     *
+     * @param path the file path
+     * @param consumer callback for each parsed CheckpointEntry
+     * @param executor the executor service for parallel parsing
+     * @return the rgCount (total number of entries) in the file
+     * @throws IOException if reading fails
+     */
+    public static int readCheckpointParallel(String path, EntryConsumer consumer, ExecutorService executor) throws IOException
+    {
+        Storage storage = StorageFactory.Instance().getStorage(path);
+        long fileLength = storage.getStatus(path).getLength();
+
+        // Step 1: Read the entire file into byte[] at once
+        byte[] content;
+        try (PhysicalReader reader = PhysicalReaderUtil.newPhysicalReader(storage, path))
+        {
+            ByteBuffer buffer = reader.readFully((int) fileLength);
+            if (buffer.hasArray())
+            {
+                content = buffer.array();
+            } else
+            {
+                content = new byte[(int) fileLength];
+                buffer.get(content);
+            }
+        }
+
+        ByteBuffer buf = ByteBuffer.wrap(content);
+        int rgCount = buf.getInt();
+
+        if (rgCount > 0)
+        {
+            // Step 2: Sequential scan to build offset index (lightweight, pointer jumps only)
+            int[] offsets = new int[rgCount];
+            for (int i = 0; i < rgCount; i++)
+            {
+                offsets[i] = buf.position();
+                buf.position(buf.position() + 8 + 4 + 4); // skip fileId(8) + rgId(4) + recordNum(4)
+                int bitmapLen = buf.getInt();
+                buf.position(buf.position() + bitmapLen * 8); // skip bitmap data
+            }
+
+            // Step 3: Parallel parsing
+            int parallelism = Math.min(Runtime.getRuntime().availableProcessors(),
+                    Math.max(1, rgCount / 64));
+            int batchSize = (rgCount + parallelism - 1) / parallelism;
+            List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+            for (int batchStart = 0; batchStart < rgCount; batchStart += batchSize)
+            {
+                int start = batchStart;
+                int end = Math.min(batchStart + batchSize, rgCount);
+                futures.add(CompletableFuture.runAsync(() -> {
+                    for (int i = start; i < end; i++)
+                    {
+                        // Each thread uses its own ByteBuffer instance (sharing the same byte[], thread-safe)
+                        ByteBuffer slice = ByteBuffer.wrap(content);
+                        slice.position(offsets[i]);
+                        long fileId = slice.getLong();
+                        int rgId = slice.getInt();
+                        int recordNum = slice.getInt();
+                        int len = slice.getInt();
+                        long[] bitmap = new long[len];
+                        for (int j = 0; j < len; j++)
+                        {
+                            bitmap[j] = slice.getLong();
+                        }
+                        consumer.accept(new CheckpointEntry(fileId, rgId, recordNum, bitmap));
+                    }
+                }, executor));
+            }
+
+            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+        }
+
+        return rgCount;
+    }
+
+    /**
+     * Read and parse a checkpoint file in parallel using the default ForkJoinPool.commonPool().
+     *
+     * @param path the file path
+     * @param consumer callback for each parsed CheckpointEntry
+     * @return the rgCount (total number of entries) in the file
+     * @throws IOException if reading fails
+     */
+    public static int readCheckpointParallel(String path, EntryConsumer consumer) throws IOException
+    {
+        return readCheckpointParallel(path, consumer, ForkJoinPool.commonPool());
+    }
+}
diff --git a/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/VisibilityCheckpointCache.java b/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/VisibilityCheckpointCache.java
index aac767ab8..9e3c525cd 100644
--- a/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/VisibilityCheckpointCache.java
+++ b/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/VisibilityCheckpointCache.java
@@ -21,18 +21,12 @@
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 
-import io.pixelsdb.pixels.common.physical.PhysicalReader;
-import io.pixelsdb.pixels.common.physical.PhysicalReaderUtil;
-import io.pixelsdb.pixels.common.physical.Storage;
-import io.pixelsdb.pixels.common.physical.StorageFactory;
+import io.pixelsdb.pixels.common.utils.CheckpointFileIO;
 import io.pixelsdb.pixels.common.utils.ConfigFactory;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
@@ -88,47 +82,18 @@ public long[] getVisibilityBitmap(long timestamp, String checkpointPath, long ta
 
     private Map<String, long[]> loadCheckpointFile(String path) throws IOException
     {
-        long start = System.currentTimeMillis();
-        Storage storage = StorageFactory.Instance().getStorage(path);
-        long fileLength = storage.getStatus(path).getLength();
+        long startTime = System.currentTimeMillis();
 
-        byte[] content;
-        try (PhysicalReader reader = PhysicalReaderUtil.newPhysicalReader(storage, path))
-        {
-            ByteBuffer buffer = reader.readFully((int) fileLength);
-            if (buffer.hasArray())
-            {
-                content = buffer.array();
-            }
-            else
-            {
-                content = new byte[(int) fileLength];
-                buffer.get(content);
-            }
-        }
+        // Use ConcurrentHashMap to support concurrent writes from parallel parsing
+        Map<String, long[]> timestampCache = new ConcurrentHashMap<>();
 
-        Map<String, long[]> timestampCache;
-        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(content)))
-        {
-            int rgCount = in.readInt();
-            // Initial capacity: count / 0.75 + 1 to avoid rehash
-            timestampCache = new ConcurrentHashMap<>((int) (rgCount / 0.75) + 1);
-
-            for (int i = 0; i < rgCount; i++)
-            {
-                long fileId = in.readLong();
-                int rgId = in.readInt();
-                int len = in.readInt();
-                long[] bitmap = new long[len];
-                for (int j = 0; j < len; j++)
-                {
-                    bitmap[j] = in.readLong();
-                }
-                timestampCache.put(fileId + "_" + rgId, bitmap);
-            }
-        }
-        long end = System.currentTimeMillis();
-        logger.info("Loaded visibility checkpoint from {} in {} ms, RG count: {}", path, (end - start), timestampCache.size());
+        // Use CheckpointFileIO for unified read + parallel parsing logic
+        int rgCount = CheckpointFileIO.readCheckpointParallel(path, entry -> {
+            timestampCache.put(entry.fileId + "_" + entry.rgId, entry.bitmap);
+        });
+
+        long endTime = System.currentTimeMillis();
+        logger.info("Loaded visibility checkpoint from {} in {} ms, RG count: {}", path, (endTime - startTime), timestampCache.size());
         return timestampCache;
     }
 }
diff --git a/pixels-core/src/test/java/io/pixelsdb/pixels/core/reader/TestVisibilityCheckpointCache.java b/pixels-core/src/test/java/io/pixelsdb/pixels/core/reader/TestVisibilityCheckpointCache.java
index e92feacfb..ed1bcbe97 100644
--- a/pixels-core/src/test/java/io/pixelsdb/pixels/core/reader/TestVisibilityCheckpointCache.java
+++ b/pixels-core/src/test/java/io/pixelsdb/pixels/core/reader/TestVisibilityCheckpointCache.java
@@ -21,16 +21,13 @@
 import io.pixelsdb.pixels.common.physical.Storage;
 import io.pixelsdb.pixels.common.physical.StorageFactory;
+import io.pixelsdb.pixels.common.utils.CheckpointFileIO;
 import io.pixelsdb.pixels.common.utils.ConfigFactory;
 
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.LongAdder;
 
 import static org.junit.Assert.assertNotNull;
@@ -65,30 +62,28 @@ private String resolve(String dir, String filename) {
     /**
      * Helper to create a dummy checkpoint file.
      */
-    private void createDummyCheckpoint(String path, int numFiles, int rgsPerFile, long[] bitmap) throws IOException
+    private void createDummyCheckpoint(String path, int numFiles, int rgsPerFile, long[] bitmap) throws Exception
     {
-        try (DataOutputStream out = storage.create(path, true, 8 * 1024 * 1024))
+        // Default recordNum = bitmap.length * 64 (each long represents 64 rows)
+        createDummyCheckpoint(path, numFiles, rgsPerFile, bitmap, bitmap.length * 64);
+    }
+
+    private void createDummyCheckpoint(String path, int numFiles, int rgsPerFile, long[] bitmap, int recordNum) throws Exception
+    {
+        int totalRgs = numFiles * rgsPerFile;
+        BlockingQueue<CheckpointFileIO.CheckpointEntry> queue = new LinkedBlockingQueue<>(totalRgs);
+        for (int f = 0; f < numFiles; f++)
         {
-            out.writeInt(numFiles * rgsPerFile);
-            for (int f = 0; f < numFiles; f++)
+            for (int r = 0; r < rgsPerFile; r++)
             {
-                for (int r = 0; r < rgsPerFile; r++)
-                {
-                    out.writeLong((long)f);
-                    out.writeInt(r);
-                    out.writeInt(bitmap.length);
-                    for (long l : bitmap)
-                    {
-                        out.writeLong(l);
-                    }
-                }
+                queue.put(new CheckpointFileIO.CheckpointEntry((long) f, r, recordNum, bitmap));
             }
-            out.flush();
         }
+        CheckpointFileIO.writeCheckpoint(path, totalRgs, queue);
     }
 
     @Test
-    public void testCacheLoading() throws IOException
+    public void testCacheLoading() throws Exception
     {
         long timestamp = 1000L;
         String checkpointPath = resolve(testCheckpointDir, "vis_gc_tencent_100.bin");
@@ -108,7 +103,7 @@
      * Migrated and adapted performance test.
      */
     @Test
-    public void testCheckpointPerformance() throws InterruptedException, IOException
+    public void testCheckpointPerformance() throws Exception
     {
         // 1. Configuration parameters
         int numFiles = 5000;
diff --git a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java
index 5f5670a41..040374e2b 100644
--- a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java
+++ b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java
@@ -30,6 +30,7 @@
 import io.pixelsdb.pixels.common.physical.Storage;
 import io.pixelsdb.pixels.common.physical.StorageFactory;
 import io.pixelsdb.pixels.common.transaction.TransService;
+import io.pixelsdb.pixels.common.utils.CheckpointFileIO;
 import io.pixelsdb.pixels.common.utils.ConfigFactory;
 import io.pixelsdb.pixels.common.utils.RetinaUtils;
 import io.pixelsdb.pixels.core.PixelsProto;
@@ -73,22 +74,6 @@
 
     private final Map<Long, Integer> checkpointRefCounts;
 
-    private static class CheckpointEntry
-    {
-        final long fileId;
-        final int rgId;
-        final int recordNum;
-        final long[] bitmap;
-
-        CheckpointEntry(long fileId, int rgId, int recordNum, long[] bitmap)
-        {
-            this.fileId = fileId;
-            this.rgId = rgId;
-            this.recordNum = recordNum;
-            this.bitmap = bitmap;
-        }
-    }
-
     private enum CheckpointType
     {
         GC,
@@ -324,7 +309,7 @@ private CompletableFuture<Void> createCheckpoint(long timestamp, CheckpointType
 
         // 2. Use a BlockingQueue for producer-consumer pattern
         // Limit capacity to avoid excessive memory usage if writing is slow
-        BlockingQueue<CheckpointEntry> queue = new LinkedBlockingQueue<>(1024);
+        BlockingQueue<CheckpointFileIO.CheckpointEntry> queue = new LinkedBlockingQueue<>(1024);
 
         // 3. Start producer tasks to fetch bitmaps in parallel
         for (Map.Entry<String, RGVisibility> entry : entries)
@@ -338,7 +323,7 @@ private CompletableFuture<Void> createCheckpoint(long timestamp, CheckpointType
                     int rgId = Integer.parseInt(parts[1]);
                     RGVisibility rgVisibility = entry.getValue();
                     long[] bitmap = rgVisibility.getVisibilityBitmap(timestamp);
-                    queue.put(new CheckpointEntry(fileId, rgId, (int) rgVisibility.getRecordNum(), bitmap));
+                    queue.put(new CheckpointFileIO.CheckpointEntry(fileId, rgId, (int) rgVisibility.getRecordNum(), bitmap));
                 } catch (Exception e)
                 {
                     logger.error("Failed to fetch visibility bitmap for checkpoint", e);
@@ -364,27 +349,8 @@
             long startWrite = System.currentTimeMillis();
             try
             {
-                Storage storage = StorageFactory.Instance().getStorage(filePath);
-                // Use a temporary file to ensure atomic commit
-                // Although LocalFS lacks rename, using a synchronized block here
-                // makes it safe within this JVM instance.
-                try (DataOutputStream out = storage.create(filePath, true, 8 * 1024 * 1024))
-                {
-                    out.writeInt(totalRgs);
-                    for (int i = 0; i < totalRgs; i++)
-                    {
-                        CheckpointEntry entry = queue.take();
-                        out.writeLong(entry.fileId);
-                        out.writeInt(entry.rgId);
-                        out.writeInt(entry.recordNum);
-                        out.writeInt(entry.bitmap.length);
-                        for (long l : entry.bitmap)
-                        {
-                            out.writeLong(l);
-                        }
-                    }
-                    out.flush();
-                }
+                // Use CheckpointFileIO for unified write logic
+                CheckpointFileIO.writeCheckpoint(filePath, totalRgs, queue);
 
                 long endWrite = System.currentTimeMillis();
                 logger.info("Writing {} checkpoint file to {} took {} ms", type, filePath, (endWrite - startWrite));
@@ -736,23 +702,14 @@
                 Storage latestStorage = StorageFactory.Instance().getStorage(latestPath);
                 if (latestStorage.exists(latestPath))
                 {
-                    try (DataInputStream in = latestStorage.open(latestPath))
-                    {
-                        int rgCount = in.readInt();
-                        for (int i = 0; i < rgCount; i++)
-                        {
-                            long fileId = in.readLong();
-                            int rgId = in.readInt();
-                            int recordNum = in.readInt();
-                            int len = in.readInt();
-                            long[] bitmap = new long[len];
-                            for (int j = 0; j < len; j++)
-                            {
-                                bitmap[j] = in.readLong();
-                            }
-                            rgVisibilityMap.put(fileId + "_" + rgId, new RGVisibility(recordNum, latestTs, bitmap));
-                        }
-                    }
+                    // Use CheckpointFileIO for unified read + parallel parsing logic
+                    final long ts = latestTs;
+                    int rgCount = CheckpointFileIO.readCheckpointParallel(latestPath, entry -> {
+                        rgVisibilityMap.put(entry.fileId + "_" + entry.rgId,
+                                new RGVisibility(entry.recordNum, ts, entry.bitmap));
+                    }, checkpointExecutor);
+
+                    logger.info("Recovered {} RG entries from GC checkpoint", rgCount);
                 }
             } catch (IOException e)
             {