Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,18 @@
import io.pixelsdb.pixels.common.physical.StorageFactory;
import io.pixelsdb.pixels.common.retina.RetinaService;
import io.pixelsdb.pixels.common.utils.ConfigFactory;
import io.pixelsdb.pixels.common.utils.DateUtil;
import io.pixelsdb.pixels.common.utils.NetUtils;
import io.pixelsdb.pixels.common.utils.PixelsFileNameUtils;
import io.pixelsdb.pixels.core.compactor.CompactLayout;
import io.pixelsdb.pixels.core.compactor.PixelsCompactor;
import net.sourceforge.argparse4j.inf.Namespace;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -114,16 +117,52 @@ public void execute(Namespace ns, String command) throws Exception
}
}

// Issue #1305: obtain local hostname for the unified compact file naming.
String hostName = NetUtils.getLocalHostName();

/**
* Issue #1305:
* Group files by virtualNodeId before compaction.
* GC-eligible files (retina/ordered/compact) carry a real virtualNodeId and must
* not be mixed across groups to preserve create_ts/delete_ts monotonicity.
* single-type files fall into the VNODE_ID_NONE (-1) bucket and are compacted freely.
* copy-type files and unrecognised-format files are skipped.
*/
Map<Integer, List<Status>> groupedStatuses = new LinkedHashMap<>();
for (Status status : statuses)
{
String path = status.getPath();
PixelsFileNameUtils.PxlFileType fileType = PixelsFileNameUtils.extractFileType(path);
if (fileType == null)
{
System.err.println("Skipping file with unrecognized naming format: " + path);
continue;
}
if (fileType == PixelsFileNameUtils.PxlFileType.COPY)
{
System.err.println("Skipping copy file (test/benchmark data, not for compaction): " + path);
continue;
}
int vNodeId = PixelsFileNameUtils.extractVirtualNodeId(path);
groupedStatuses.computeIfAbsent(vNodeId, k -> new ArrayList<>()).add(status);
}

List<Path> targetPaths = layout.getCompactPaths();
ConcurrentLinkedQueue<File> compactFiles = new ConcurrentLinkedQueue<>();
ConcurrentLinkedQueue<Path> compactPaths = new ConcurrentLinkedQueue<>();
int targetPathId = 0;

// compact
// Issue #1305: iterate over each virtualNodeId group independently to preserve
// the monotonicity invariants required by Storage GC.
long startTime = System.currentTimeMillis();
for (int i = 0, thdId = 0; i < statuses.size(); i += numRowGroupInBlock, ++thdId)
int thdId = 0;
for (Map.Entry<Integer, List<Status>> entry : groupedStatuses.entrySet())
{
if (i + numRowGroupInBlock > statuses.size())
int vNodeId = entry.getKey();
List<Status> groupStatuses = entry.getValue();
int groupSize = groupStatuses.size();

for (int i = 0; i < groupSize; ++thdId)
{
/**
* Issue #160:
Expand All @@ -136,34 +175,38 @@ public void execute(Namespace ns, String command) throws Exception
* and rebuild a pure compactLayout for the tail files as the
* compactLayout in metadata does not work for the tail files.
*/
numRowGroupInBlock = statuses.size() - i;
compactLayout = CompactLayout.buildPure(numRowGroupInBlock, numColumn);
}
// Issue #1305: use local batchSize/batchLayout instead of mutating
// numRowGroupInBlock/compactLayout, so they stay unchanged across batches and groups.
int batchSize = Math.min(numRowGroupInBlock, groupSize - i);
CompactLayout batchLayout = (batchSize < numRowGroupInBlock)
? CompactLayout.buildPure(batchSize, numColumn)
: compactLayout;

List<String> sourcePaths = new ArrayList<>();
for (int j = 0; j < numRowGroupInBlock; ++j)
{
if (!statuses.get(i+j).getPath().endsWith("/"))
List<String> sourcePaths = new ArrayList<>();
for (int j = 0; j < batchSize; ++j)
{
sourcePaths.add(statuses.get(i + j).getPath());
if (!groupStatuses.get(i + j).getPath().endsWith("/"))
{
sourcePaths.add(groupStatuses.get(i + j).getPath());
}
}
}

Path targetPath = targetPaths.get(targetPathId++);
String targetDirPath = targetPath.getUri();
targetPathId %= targetPaths.size();
if (!targetDirPath.endsWith("/"))
{
targetDirPath += "/";
}
String targetFileName = DateUtil.getCurTime() + "_compact.pxl";
String targetFilePath = targetDirPath + targetFileName;
Path targetPath = targetPaths.get(targetPathId++);
String targetDirPath = targetPath.getUri();
targetPathId %= targetPaths.size();
if (!targetDirPath.endsWith("/"))
{
targetDirPath += "/";
}
// Issue #1305: use unified naming format (hostName + vNodeId) instead of DateUtil timestamp only.
String targetFileName = PixelsFileNameUtils.buildCompactFileName(hostName, vNodeId);
String targetFilePath = targetDirPath + targetFileName;

System.out.println("(" + thdId + ") " + sourcePaths.size() +
" ordered files to be compacted into '" + targetFilePath + "'.");
System.out.println("(" + thdId + ") vNodeId=" + vNodeId + ", " + sourcePaths.size() +
" ordered files to be compacted into '" + targetFilePath + "'.");

PixelsCompactor.Builder compactorBuilder = PixelsCompactor.newBuilder()
.setSourcePaths(sourcePaths)
PixelsCompactor.Builder compactorBuilder = PixelsCompactor.newBuilder()
.setSourcePaths(sourcePaths)
/**
* Issue #192:
* No need to deep copy compactLayout as it is never modified in-place
Expand All @@ -173,40 +216,46 @@ public void execute(Namespace ns, String command) throws Exception
*
* Deep copy it if it is in-place modified in the future.
*/
.setCompactLayout(compactLayout)
.setInputStorage(orderStorage)
.setOutputStorage(compactStorage)
.setPath(targetFilePath)
.setBlockSize(blockSize)
.setReplication(replication)
.setBlockPadding(false)
.setHasHiddenColumn(true);

long threadStart = System.currentTimeMillis();
compactExecutor.execute(() -> {
// Issue #192: run compaction in threads.
try
{
// build() spends some time to read file footers and should be called inside sub-thread.
PixelsCompactor pixelsCompactor = compactorBuilder.build();
pixelsCompactor.compact();
pixelsCompactor.close();
File compactFile = new File();
compactFile.setName(targetFileName);
compactFile.setType(File.Type.REGULAR);
compactFile.setNumRowGroup(pixelsCompactor.getNumRowGroup());
compactFile.setPathId(targetPath.getId());
compactFiles.offer(compactFile);
compactPaths.offer(targetPath);
} catch (IOException e)
{
System.err.println("write compact file '" + targetFilePath + "' failed");
e.printStackTrace();
return;
}
System.out.println("Compact file '" + targetFilePath + "' is built in " +
((System.currentTimeMillis() - threadStart) / 1000.0) + "s");
});
.setCompactLayout(batchLayout)
.setInputStorage(orderStorage)
.setOutputStorage(compactStorage)
.setPath(targetFilePath)
.setBlockSize(blockSize)
.setReplication(replication)
.setBlockPadding(false)
.setHasHiddenColumn(true);

final String finalTargetFileName = targetFileName;
final String finalTargetFilePath = targetFilePath;
final Path finalTargetPath = targetPath;
long threadStart = System.currentTimeMillis();

compactExecutor.execute(() -> {
// Issue #192: run compaction in threads.
try
{
PixelsCompactor pixelsCompactor = compactorBuilder.build();
pixelsCompactor.compact();
pixelsCompactor.close();
File compactFile = new File();
compactFile.setName(finalTargetFileName);
compactFile.setType(File.Type.REGULAR);
compactFile.setNumRowGroup(pixelsCompactor.getNumRowGroup());
compactFile.setPathId(finalTargetPath.getId());
compactFiles.offer(compactFile);
compactPaths.offer(finalTargetPath);
} catch (IOException e)
{
System.err.println("write compact file '" + finalTargetFilePath + "' failed");
e.printStackTrace();
return;
}
System.out.println("Compact file '" + finalTargetFilePath + "' is built in " +
((System.currentTimeMillis() - threadStart) / 1000.0) + "s");
});

i += batchSize;
}
}

// Issue #192: wait for the compaction to complete.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import io.pixelsdb.pixels.common.physical.StorageFactory;
import io.pixelsdb.pixels.common.utils.ConfigFactory;
import io.pixelsdb.pixels.common.utils.Constants;
import io.pixelsdb.pixels.common.utils.DateUtil;
import io.pixelsdb.pixels.common.utils.PixelsFileNameUtils;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.hadoop.io.IOUtils;

Expand Down Expand Up @@ -82,8 +82,7 @@ public void execute(Namespace ns, String command) throws Exception
continue;
}
String destPath = destination_ +
sourceName.substring(0, sourceName.indexOf(postfix)) +
"_copy_" + DateUtil.getCurTime() + postfix;
PixelsFileNameUtils.buildCopyFileName(sourceName);
copyExecutor.execute(() -> {
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import io.pixelsdb.pixels.common.physical.Storage;
import io.pixelsdb.pixels.common.physical.StorageFactory;
import io.pixelsdb.pixels.common.utils.ConfigFactory;
import io.pixelsdb.pixels.common.utils.DateUtil;
import io.pixelsdb.pixels.common.utils.IndexUtils;
import io.pixelsdb.pixels.common.utils.PixelsFileNameUtils;
import io.pixelsdb.pixels.common.utils.RetinaUtils;
import io.pixelsdb.pixels.core.PixelsWriter;
import io.pixelsdb.pixels.core.TypeDescription;
Expand Down Expand Up @@ -183,7 +183,7 @@ private PerVirtualNodeWriter initializeRetinaWriter(int bucketId) throws IOExcep
{
targetDirPath += "/";
}
String targetFileName = targetNode.getAddress() + "_" + DateUtil.getCurTime() + "_" + bucketId + ".pxl";
String targetFileName = PixelsFileNameUtils.buildOrderedFileName(targetNode.getAddress(), targetNode.getVirtualNodeId());
String targetFilePath = targetDirPath + targetFileName;

PixelsWriter pixelsWriter = getPixelsWriter(targetStorage, targetFilePath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import io.pixelsdb.pixels.common.metadata.domain.Path;
import io.pixelsdb.pixels.common.physical.Storage;
import io.pixelsdb.pixels.common.physical.StorageFactory;
import io.pixelsdb.pixels.common.utils.Constants;
import io.pixelsdb.pixels.common.utils.DateUtil;
import io.pixelsdb.pixels.core.PixelsWriter;
import io.pixelsdb.pixels.common.utils.NetUtils;
import io.pixelsdb.pixels.common.utils.PixelsFileNameUtils;
import io.pixelsdb.pixels.core.PixelsWriter;
import io.pixelsdb.pixels.core.PixelsWriterImpl;
import io.pixelsdb.pixels.core.TypeDescription;
import io.pixelsdb.pixels.core.vector.VectorizedRowBatch;
Expand Down Expand Up @@ -126,7 +126,7 @@ private void initializePixelsWriter() throws MetadataException, IOException
{
targetDirPath += "/";
}
String targetFileName = Constants.LOAD_DEFAULT_RETINA_PREFIX + "_" + DateUtil.getCurTime() + ".pxl";
String targetFileName = PixelsFileNameUtils.buildSingleFileName(NetUtils.getLocalHostName());
String targetFilePath = targetDirPath + targetFileName;

pixelsWriter = PixelsWriterImpl.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2026 PixelsDB.
*
* This file is part of Pixels.
*
* Pixels is free software: you can redistribute it and/or modify
* it under the terms of the Affero GNU General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* Pixels is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Affero GNU General Public License for more details.
*
* You should have received a copy of the Affero GNU General Public
* License along with Pixels. If not, see
* <https://www.gnu.org/licenses/>.
*/
package io.pixelsdb.pixels.common.utils;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.net.InetAddress;
import java.net.UnknownHostException;

/**
* Common network utilities.
*
* @author hank
* @create 2026-03-16
*/
public final class NetUtils
{
private static final Logger logger = LogManager.getLogger(NetUtils.class);

private NetUtils() {}

/**
* Returns the hostname of the local machine.
*
* <p>Resolution order:
* <ol>
* <li>The {@code HOSTNAME} environment variable — reliable in container / k8s environments
* where {@link InetAddress#getLocalHost()} may return an unpredictable pod address.</li>
* <li>{@link InetAddress#getLocalHost()#getHostName()} — standard JVM resolution.</li>
* <li>{@code "localhost"} — last-resort fallback; a warning is logged.</li>
* </ol>
*
* @return the local hostname, never {@code null}
*/
public static String getLocalHostName()
{
String hostName = System.getenv("HOSTNAME");
if (hostName != null && !hostName.isEmpty())
{
return hostName;
}
try
{
return InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e)
{
logger.warn("Failed to resolve local hostname via InetAddress, falling back to 'localhost'", e);
return "localhost";
}
}
}
Loading
Loading