c_cols)
+
+
cdef object get_table_schema(TsFileReader reader, object table_name):
cdef bytes table_name_bytes = PyUnicode_AsUTF8String(table_name)
cdef const char * table_name_c = table_name_bytes
diff --git a/python/tsfile/tsfile_reader.pyx b/python/tsfile/tsfile_reader.pyx
index 4476d24dc..637444571 100644
--- a/python/tsfile/tsfile_reader.pyx
+++ b/python/tsfile/tsfile_reader.pyx
@@ -320,6 +320,54 @@ cdef class TsFileReaderPy:
self.activate_result_set_list.add(pyresult)
return pyresult
+ def query_tree_by_row(self, device_ids: List[str], measurement_names: List[str],
+ offset: int, limit: int) -> ResultSetPy:
+ """
+ Query tree model data by row range.
+
+ Internally queries the full time range and applies offset/limit at the
+ result-set level. Once ``limit`` rows are returned, no further data is
+ loaded from storage.
+
+ :param device_ids: List of device identifiers to query.
+ :param measurement_names: List of measurement names to query.
+ :param offset: Number of leading rows to skip (>= 0).
+ :param limit: Maximum number of rows to return. < 0 means unlimited.
+ :return: ResultSet containing query results.
+ """
+ cdef ResultSet result
+ result = tsfile_reader_query_tree_by_row_c(
+ self.reader, device_ids, measurement_names, offset, limit)
+ pyresult = ResultSetPy(self, True)
+ pyresult.init_c(result, device_ids[0] if device_ids else "")
+ self.activate_result_set_list.add(pyresult)
+ return pyresult
+
+ def query_table_by_row(self, table_name: str, column_names: List[str],
+ offset: int, limit: int) -> ResultSetPy:
+ """
+ Query table model data by row range.
+
+ Internally queries the full time range and applies offset/limit at the
+ result-set level. Once ``limit`` rows are returned, no further data is
+ loaded from storage.
+
+ :param table_name: Target table name.
+ :param column_names: List of column names to query.
+ :param offset: Number of leading rows to skip (>= 0).
+ :param limit: Maximum number of rows to return. < 0 means unlimited.
+ :return: ResultSet containing query results.
+ """
+ cdef ResultSet result
+ result = tsfile_reader_query_table_by_row_c(
+ self.reader, table_name.lower(),
+ [column_name.lower() for column_name in column_names],
+ offset, limit)
+ pyresult = ResultSetPy(self)
+ pyresult.init_c(result, table_name)
+ self.activate_result_set_list.add(pyresult)
+ return pyresult
+
def notify_result_set_discard(self, result_set: ResultSetPy):
"""
Remove activate result set from activate_result_set_list, called when a result set close.
From ce5adc6aa354f05f0f15e94003e21d43b5570d44 Mon Sep 17 00:00:00 2001
From: 761417898 <761417898@qq.com>
Date: Wed, 11 Mar 2026 16:20:58 +0800
Subject: [PATCH 8/9] java query by row
---
.../read/query/dataset/RowRangeResultSet.java | 184 ++++++++++++
.../read/v4/DeviceTableModelReader.java | 9 +
.../apache/tsfile/read/v4/ITsFileReader.java | 20 ++
.../tsfile/read/v4/ITsFileTreeReader.java | 18 ++
.../tsfile/read/v4/TsFileTreeReader.java | 24 ++
.../DeviceTableModelReaderRowQueryTest.java | 283 ++++++++++++++++++
.../read/TsFileTreeReaderRowQueryTest.java | 215 +++++++++++++
7 files changed, 753 insertions(+)
create mode 100644 java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/RowRangeResultSet.java
create mode 100644 java/tsfile/src/test/java/org/apache/tsfile/read/DeviceTableModelReaderRowQueryTest.java
create mode 100644 java/tsfile/src/test/java/org/apache/tsfile/read/TsFileTreeReaderRowQueryTest.java
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/RowRangeResultSet.java b/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/RowRangeResultSet.java
new file mode 100644
index 000000000..eb077efdd
--- /dev/null
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/RowRangeResultSet.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.read.query.dataset;
+
+import org.apache.tsfile.write.record.TSRecord;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.util.Iterator;
+
+/**
+ * A {@link ResultSet} wrapper that applies row-level offset and limit.
+ *
+ * Takes the inner ResultSet and releases it on {@link #close()}. Once the limit is reached,
+ * {@link #next()} returns {@code false} immediately without calling the underlying ResultSet,
+ * avoiding unnecessary data loading.
+ *
+ * <p>The constructor takes {@code offset} (number of leading rows to skip, must be &gt;= 0)
+ * and {@code limit} (maximum number of rows to return; a negative value means no limit).
+ */
+public class RowRangeResultSet implements ResultSet {
+
+ private final ResultSet inner;
+ private final int offset;
+ private final int limit;
+ private int returnedCount;
+ private boolean offsetSkipped;
+
+ public RowRangeResultSet(ResultSet inner, int offset, int limit) {
+ this.inner = inner;
+ this.offset = Math.max(0, offset);
+ this.limit = limit;
+ this.returnedCount = 0;
+ this.offsetSkipped = false;
+ }
+
+ @Override
+ public ResultSetMetadata getMetadata() {
+ return inner.getMetadata();
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ // Skip the first `offset` rows on the first call.
+ if (!offsetSkipped) {
+ for (int i = 0; i < offset; i++) {
+ if (!inner.next()) {
+ offsetSkipped = true;
+ return false;
+ }
+ }
+ offsetSkipped = true;
+ }
+
+ // Limit reached: return false without touching inner ResultSet.
+ // This is the key "pushdown" effect: no further chunk/page loading occurs.
+ if (limit >= 0 && returnedCount >= limit) {
+ return false;
+ }
+
+ boolean hasNext = inner.next();
+ if (hasNext) {
+ returnedCount++;
+ }
+ return hasNext;
+ }
+
+ @Override
+ public int getInt(String columnName) {
+ return inner.getInt(columnName);
+ }
+
+ @Override
+ public int getInt(int columnIndex) {
+ return inner.getInt(columnIndex);
+ }
+
+ @Override
+ public long getLong(String columnName) {
+ return inner.getLong(columnName);
+ }
+
+ @Override
+ public long getLong(int columnIndex) {
+ return inner.getLong(columnIndex);
+ }
+
+ @Override
+ public float getFloat(String columnName) {
+ return inner.getFloat(columnName);
+ }
+
+ @Override
+ public float getFloat(int columnIndex) {
+ return inner.getFloat(columnIndex);
+ }
+
+ @Override
+ public double getDouble(String columnName) {
+ return inner.getDouble(columnName);
+ }
+
+ @Override
+ public double getDouble(int columnIndex) {
+ return inner.getDouble(columnIndex);
+ }
+
+ @Override
+ public boolean getBoolean(String columnName) {
+ return inner.getBoolean(columnName);
+ }
+
+ @Override
+ public boolean getBoolean(int columnIndex) {
+ return inner.getBoolean(columnIndex);
+ }
+
+ @Override
+ public String getString(String columnName) {
+ return inner.getString(columnName);
+ }
+
+ @Override
+ public String getString(int columnIndex) {
+ return inner.getString(columnIndex);
+ }
+
+ @Override
+ public LocalDate getDate(String columnName) {
+ return inner.getDate(columnName);
+ }
+
+ @Override
+ public LocalDate getDate(int columnIndex) {
+ return inner.getDate(columnIndex);
+ }
+
+ @Override
+ public byte[] getBinary(String columnName) {
+ return inner.getBinary(columnName);
+ }
+
+ @Override
+ public byte[] getBinary(int columnIndex) {
+ return inner.getBinary(columnIndex);
+ }
+
+ @Override
+ public boolean isNull(String columnName) {
+ return inner.isNull(columnName);
+ }
+
+ @Override
+ public boolean isNull(int columnIndex) {
+ return inner.isNull(columnIndex);
+ }
+
+ @Override
+ public void close() {
+ inner.close();
+ }
+
+ @Override
+ public Iterator iterator() {
+ return inner.iterator();
+ }
+}
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/DeviceTableModelReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/DeviceTableModelReader.java
index 927ac8954..72e3a4a0a 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/DeviceTableModelReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/DeviceTableModelReader.java
@@ -33,6 +33,7 @@
import org.apache.tsfile.read.expression.ExpressionTree;
import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.read.query.dataset.ResultSet;
+import org.apache.tsfile.read.query.dataset.RowRangeResultSet;
import org.apache.tsfile.read.query.dataset.TableResultSet;
import org.apache.tsfile.read.query.executor.TableQueryExecutor;
import org.apache.tsfile.read.reader.block.TsBlockReader;
@@ -113,6 +114,14 @@ public ResultSet query(
return new TableResultSet(tsBlockReader, columnNames, dataTypeList, tableName);
}
+ @TsFileApi
+ @Override
+ public ResultSet queryByRow(String tableName, List columnNames, int offset, int limit)
+ throws ReadProcessException, IOException, NoTableException, NoMeasurementException {
+ ResultSet inner = query(tableName, columnNames, Long.MIN_VALUE, Long.MAX_VALUE);
+ return new RowRangeResultSet(inner, offset, limit);
+ }
+
@Override
public void close() {
try {
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileReader.java
index 995f73f64..bf29ffcee 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileReader.java
@@ -48,6 +48,26 @@ ResultSet query(
@TsFileApi
List getAllTableSchema() throws IOException;
+ /**
+ * Query table model data by row range.
+ *
+ * Internally queries the full time range and applies offset/limit at the result-set level.
+ * Once {@code limit} rows are returned, no further data is loaded from storage.
+ *
+ * @param tableName target table name
+ * @param columnNames list of column names to query
+ * @param offset number of leading rows to skip (>= 0)
+ * @param limit maximum number of rows to return; < 0 means unlimited
+ * @return a {@link ResultSet} containing the query results
+ * @throws ReadProcessException if a read processing error occurs
+ * @throws IOException if an I/O error occurs
+ * @throws NoTableException if the table does not exist
+ * @throws NoMeasurementException if a column does not exist
+ */
+ @TsFileApi
+ ResultSet queryByRow(String tableName, List columnNames, int offset, int limit)
+ throws ReadProcessException, IOException, NoTableException, NoMeasurementException;
+
@TsFileApi
void close();
}
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileTreeReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileTreeReader.java
index 202553ea8..40ccbad70 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileTreeReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileTreeReader.java
@@ -47,6 +47,24 @@ ResultSet query(
@TreeModel
List getDeviceSchema(String deviceId) throws IOException;
+ /**
+ * Query tree model data by row range.
+ *
+ * Internally queries the full time range and applies offset/limit at the result-set level.
+ * Once {@code limit} rows are returned, no further data is loaded from storage.
+ *
+ * @param deviceIds list of device identifiers to query
+ * @param measurementNames list of measurement names to query
+ * @param offset number of leading rows to skip (>= 0)
+ * @param limit maximum number of rows to return; < 0 means unlimited
+ * @return a {@link ResultSet} containing the query results
+ * @throws IOException if an I/O error occurs during query execution
+ */
+ @TsFileApi
+ @TreeModel
+ ResultSet queryByRow(List deviceIds, List measurementNames, int offset, int limit)
+ throws IOException;
+
/** Close underlying resources. */
@TsFileApi
@TreeModel
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/TsFileTreeReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/TsFileTreeReader.java
index fe7a0c35b..c8b659b1b 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/TsFileTreeReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/TsFileTreeReader.java
@@ -30,6 +30,7 @@
import org.apache.tsfile.read.filter.operator.TimeFilterOperators;
import org.apache.tsfile.read.query.dataset.QueryDataSet;
import org.apache.tsfile.read.query.dataset.ResultSet;
+import org.apache.tsfile.read.query.dataset.RowRangeResultSet;
import org.apache.tsfile.read.query.dataset.TreeResultSet;
import org.apache.tsfile.write.schema.MeasurementSchema;
@@ -113,6 +114,29 @@ public List getDeviceSchema(String deviceId) throws IOExcepti
return tsfileReader.getMeasurement(new StringArrayDeviceID(deviceId));
}
+ /**
+ * Query tree model data by row range.
+ *
+ * Internally queries the full time range and applies offset/limit at the result-set level.
+ * Once {@code limit} rows are returned, no further data is loaded from storage.
+ *
+ * @param deviceIds list of device identifiers to query
+ * @param measurementNames list of measurement names to query
+ * @param offset number of leading rows to skip (>= 0)
+ * @param limit maximum number of rows to return; < 0 means unlimited
+ * @return a {@link ResultSet} containing the query results
+ * @throws IOException if an I/O error occurs during query execution
+ */
+ @TsFileApi
+ @TreeModel
+ @Override
+ public ResultSet queryByRow(
+ List deviceIds, List measurementNames, int offset, int limit)
+ throws IOException {
+ ResultSet inner = query(deviceIds, measurementNames, Long.MIN_VALUE, Long.MAX_VALUE);
+ return new RowRangeResultSet(inner, offset, limit);
+ }
+
/**
* Close the TsFileTreeReader and release resources.
*
diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/DeviceTableModelReaderRowQueryTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/DeviceTableModelReaderRowQueryTest.java
new file mode 100644
index 000000000..54671eec5
--- /dev/null
+++ b/java/tsfile/src/test/java/org/apache/tsfile/read/DeviceTableModelReaderRowQueryTest.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.read;
+
+import org.apache.tsfile.enums.ColumnCategory;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.read.ReadProcessException;
+import org.apache.tsfile.exception.write.NoMeasurementException;
+import org.apache.tsfile.exception.write.NoTableException;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.read.query.dataset.ResultSet;
+import org.apache.tsfile.read.v4.DeviceTableModelReader;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.v4.ITsFileWriter;
+import org.apache.tsfile.write.v4.TsFileWriterBuilder;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Unit tests for {@link DeviceTableModelReader#queryByRow} (table-model row-range query).
+ *
+ * The test table has {@value #TOTAL} rows with timestamps 0..TOTAL-1 and field values equal to
+ * their timestamps.
+ */
+public class DeviceTableModelReaderRowQueryTest {
+
+ private static final String FILE_PATH = "test_table_reader_row_query.tsfile";
+ private static final int TOTAL = 50;
+ private static final String TABLE = "t1";
+ private static final String FIELD = "s0";
+
+ private DeviceTableModelReader reader;
+
+ @Before
+ public void setUp() throws IOException, WriteProcessException {
+ writeTableFile(FILE_PATH, TABLE, FIELD, TOTAL);
+ reader = new DeviceTableModelReader(new File(FILE_PATH));
+ }
+
+ @After
+ public void tearDown() {
+ if (reader != null) {
+ reader.close();
+ }
+ new File(FILE_PATH).delete();
+ }
+
+ // ① limit=0 → empty result
+ @Test
+ public void testLimitZeroReturnsEmpty()
+ throws ReadProcessException, IOException, NoTableException, NoMeasurementException {
+ Assert.assertEquals(0, countRows(0, 0));
+ }
+
+ // ② limit < total → exactly `limit` rows
+ @Test
+ public void testLimitLessThanTotal()
+ throws ReadProcessException, IOException, NoTableException, NoMeasurementException {
+ Assert.assertEquals(10, countRows(0, 10));
+ }
+
+ // ③ limit > total → all rows
+ @Test
+ public void testLimitExceedsTotal()
+ throws ReadProcessException, IOException, NoTableException, NoMeasurementException {
+ Assert.assertEquals(TOTAL, countRows(0, 9999));
+ }
+
+ // ④ limit=-1 → unlimited, returns all rows
+ @Test
+ public void testNegativeLimitMeansUnlimited()
+ throws ReadProcessException, IOException, NoTableException, NoMeasurementException {
+ Assert.assertEquals(TOTAL, countRows(0, -1));
+ }
+
+ // ⑤ offset + limit in the middle
+ @Test
+ public void testOffsetPlusLimit()
+ throws ReadProcessException, IOException, NoTableException, NoMeasurementException {
+ Assert.assertEquals(15, countRows(10, 15));
+ }
+
+ // ⑥ offset >= total → empty result
+ @Test
+ public void testOffsetBeyondTotal()
+ throws ReadProcessException, IOException, NoTableException, NoMeasurementException {
+ Assert.assertEquals(0, countRows(1000, 10));
+ }
+
+ // ⑦ offset + limit > total → return remaining rows from offset
+ @Test
+ public void testOffsetPlusLimitExceedsTotal()
+ throws ReadProcessException, IOException, NoTableException, NoMeasurementException {
+ // offset=40, limit=20 → only 10 rows remain
+ Assert.assertEquals(10, countRows(40, 20));
+ }
+
+ // ⑧ data correctness: timestamps and values start from `offset`
+ @Test
+ public void testDataCorrectness()
+ throws ReadProcessException, IOException, NoTableException, NoMeasurementException {
+ List columns = Collections.singletonList(FIELD);
+ ResultSet rs = reader.queryByRow(TABLE, columns, 5, 10);
+ int count = 0;
+ while (rs.next()) {
+ long ts = rs.getLong(1); // column 1 = Time
+ long val = rs.getLong(2); // column 2 = s0
+ Assert.assertEquals(5 + count, ts);
+ Assert.assertEquals(5 + count, val);
+ count++;
+ }
+ rs.close();
+ Assert.assertEquals(10, count);
+ }
+
+ // ⑨ metadata is accessible via the result set
+ @Test
+ public void testMetadataAccessible()
+ throws ReadProcessException, IOException, NoTableException, NoMeasurementException {
+ List columns = Collections.singletonList(FIELD);
+ ResultSet rs = reader.queryByRow(TABLE, columns, 0, 5);
+ Assert.assertNotNull(rs.getMetadata());
+ Assert.assertEquals("Time", rs.getMetadata().getColumnName(1));
+ Assert.assertEquals(FIELD, rs.getMetadata().getColumnName(2));
+ rs.close();
+ }
+
+ // ⑩ paging consistency: two pages together equal the full result
+ @Test
+ public void testPaginationConsistency()
+ throws ReadProcessException, IOException, NoTableException, NoMeasurementException {
+ int page1 = countRows(0, 25);
+ int page2 = countRows(25, 25);
+ Assert.assertEquals(TOTAL, page1 + page2);
+ }
+
+ // ⑪ multiple chunks: offset/limit spans chunk boundary correctly
+ @Test
+ public void testMultipleChunksCorrectness()
+ throws IOException,
+ WriteProcessException,
+ ReadProcessException,
+ NoTableException,
+ NoMeasurementException {
+ String filePath = "test_table_reader_row_query_multi_chunk.tsfile";
+ writeTableFileMultiChunk(filePath, TABLE, FIELD, 30, 30);
+ try (DeviceTableModelReader r = new DeviceTableModelReader(new File(filePath))) {
+ // offset=25, limit=20 → rows 25..44
+ List columns = Collections.singletonList(FIELD);
+ ResultSet rs = r.queryByRow(TABLE, columns, 25, 20);
+ int count = 0;
+ while (rs.next()) {
+ long ts = rs.getLong(1);
+ Assert.assertEquals(25 + count, ts);
+ count++;
+ }
+ rs.close();
+ Assert.assertEquals(20, count);
+ } finally {
+ new File(filePath).delete();
+ }
+ }
+
+ // ─────────────────────────────────────────────────────────────────────────────
+ // Helpers
+ // ─────────────────────────────────────────────────────────────────────────────
+
+ private int countRows(int offset, int limit)
+ throws ReadProcessException, IOException, NoTableException, NoMeasurementException {
+ List columns = Collections.singletonList(FIELD);
+ ResultSet rs = reader.queryByRow(TABLE, columns, offset, limit);
+ int count = 0;
+ while (rs.next()) {
+ count++;
+ }
+ rs.close();
+ return count;
+ }
+
+ /**
+ * Write a single-chunk table file with {@code numRows} rows. Timestamps are 0..numRows-1 and
+ * field values equal their timestamps.
+ */
+ private static void writeTableFile(
+ String filePath, String tableName, String fieldName, int numRows)
+ throws IOException, WriteProcessException {
+ TableSchema tableSchema =
+ new TableSchema(
+ tableName,
+ Collections.singletonList(new MeasurementSchema(fieldName, TSDataType.INT64)),
+ Collections.singletonList(ColumnCategory.FIELD));
+ try (ITsFileWriter writer =
+ new TsFileWriterBuilder().file(new File(filePath)).tableSchema(tableSchema).build()) {
+ Tablet tablet =
+ new Tablet(
+ tableName,
+ Collections.singletonList(fieldName),
+ Collections.singletonList(TSDataType.INT64),
+ Collections.singletonList(ColumnCategory.FIELD),
+ numRows);
+ for (int i = 0; i < numRows; i++) {
+ tablet.addTimestamp(i, i);
+ tablet.addValue(fieldName, i, (long) i);
+ }
+ writer.write(tablet);
+ }
+ }
+
+ /**
+ * Write a two-chunk table file by using a tiny memory threshold to force a flush between the two
+ * tablets. First chunk has rows 0..chunk1Rows-1, second chunk has rows
+ * chunk1Rows..chunk1Rows+chunk2Rows-1. Field values equal their timestamps.
+ */
+ private static void writeTableFileMultiChunk(
+ String filePath, String tableName, String fieldName, int chunk1Rows, int chunk2Rows)
+ throws IOException, WriteProcessException {
+ TableSchema tableSchema =
+ new TableSchema(
+ tableName,
+ Collections.singletonList(new MeasurementSchema(fieldName, TSDataType.INT64)),
+ Collections.singletonList(ColumnCategory.FIELD));
+ // memoryThreshold(1) forces a flush after every write, producing multiple chunks.
+ try (ITsFileWriter writer =
+ new TsFileWriterBuilder()
+ .file(new File(filePath))
+ .tableSchema(tableSchema)
+ .memoryThreshold(1)
+ .build()) {
+ Tablet tablet1 =
+ new Tablet(
+ tableName,
+ Collections.singletonList(fieldName),
+ Collections.singletonList(TSDataType.INT64),
+ Collections.singletonList(ColumnCategory.FIELD),
+ chunk1Rows);
+ for (int i = 0; i < chunk1Rows; i++) {
+ tablet1.addTimestamp(i, i);
+ tablet1.addValue(fieldName, i, (long) i);
+ }
+ writer.write(tablet1);
+
+ Tablet tablet2 =
+ new Tablet(
+ tableName,
+ Collections.singletonList(fieldName),
+ Collections.singletonList(TSDataType.INT64),
+ Collections.singletonList(ColumnCategory.FIELD),
+ chunk2Rows);
+ for (int i = 0; i < chunk2Rows; i++) {
+ tablet2.addTimestamp(i, chunk1Rows + i);
+ tablet2.addValue(fieldName, i, (long) (chunk1Rows + i));
+ }
+ writer.write(tablet2);
+ }
+ }
+}
diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileTreeReaderRowQueryTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileTreeReaderRowQueryTest.java
new file mode 100644
index 000000000..e6a9b4850
--- /dev/null
+++ b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileTreeReaderRowQueryTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.read;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.read.query.dataset.ResultSet;
+import org.apache.tsfile.read.v4.ITsFileTreeReader;
+import org.apache.tsfile.read.v4.TsFileTreeReaderBuilder;
+import org.apache.tsfile.write.record.TSRecord;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.v4.TsFileTreeWriter;
+import org.apache.tsfile.write.v4.TsFileTreeWriterBuilder;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Unit tests for {@link ITsFileTreeReader#queryByRow} (tree-model row-range query).
+ *
+ * Each device has {@value #TOTAL} rows with timestamps 0..TOTAL-1 and values timestamp * 10.
+ */
+public class TsFileTreeReaderRowQueryTest {
+
+ private static final String FILE_PATH = "test_tree_reader_row_query.tsfile";
+ private static final int TOTAL = 50;
+ private static final String DEVICE = "device";
+ private static final String MEA = "s1";
+
+ private ITsFileTreeReader reader;
+
+ @Before
+ public void setUp() throws IOException, WriteProcessException {
+ writeTreeFile(FILE_PATH, Collections.singletonList(DEVICE), MEA, TOTAL);
+ reader = new TsFileTreeReaderBuilder().file(new File(FILE_PATH)).build();
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ if (reader != null) {
+ reader.close();
+ }
+ new File(FILE_PATH).delete();
+ }
+
+ // ① limit=0 → empty result
+ @Test
+ public void testLimitZeroReturnsEmpty() throws IOException {
+ Assert.assertEquals(0, countRows(reader, DEVICE, MEA, 0, 0));
+ }
+
+ // ② limit < total → exactly `limit` rows
+ @Test
+ public void testLimitLessThanTotal() throws IOException {
+ Assert.assertEquals(20, countRows(reader, DEVICE, MEA, 0, 20));
+ }
+
+ // ③ limit > total → all rows
+ @Test
+ public void testLimitExceedsTotal() throws IOException {
+ Assert.assertEquals(TOTAL, countRows(reader, DEVICE, MEA, 0, 1000));
+ }
+
+ // ④ limit=-1 → unlimited, returns all rows
+ @Test
+ public void testNegativeLimitMeansUnlimited() throws IOException {
+ Assert.assertEquals(TOTAL, countRows(reader, DEVICE, MEA, 0, -1));
+ }
+
+ // ⑤ offset + limit in the middle
+ @Test
+ public void testOffsetPlusLimit() throws IOException {
+ Assert.assertEquals(15, countRows(reader, DEVICE, MEA, 10, 15));
+ }
+
+ // ⑥ offset >= total → empty result
+ @Test
+ public void testOffsetBeyondTotal() throws IOException {
+ Assert.assertEquals(0, countRows(reader, DEVICE, MEA, 1000, 10));
+ }
+
+ // ⑦ offset + limit > total → return remaining rows from offset
+ @Test
+ public void testOffsetPlusLimitExceedsTotal() throws IOException {
+ // offset=40, limit=20 → only 10 rows remain
+ Assert.assertEquals(10, countRows(reader, DEVICE, MEA, 40, 20));
+ }
+
+ // ⑧ data correctness: timestamps and values start from `offset`
+ @Test
+ public void testDataCorrectness() throws IOException {
+ List deviceIds = Collections.singletonList(DEVICE);
+ List measurements = Collections.singletonList(MEA);
+ ResultSet rs = reader.queryByRow(deviceIds, measurements, 5, 10);
+ int count = 0;
+ while (rs.next()) {
+ long ts = rs.getLong(1); // column 1 = Time
+ long val = rs.getLong(2); // column 2 = measurement
+ Assert.assertEquals(5 + count, ts);
+ Assert.assertEquals((5 + count) * 10L, val);
+ count++;
+ }
+ rs.close();
+ Assert.assertEquals(10, count);
+ }
+
+ // ⑨ metadata is accessible via the result set
+ @Test
+ public void testMetadataAccessible() throws IOException {
+ List deviceIds = Collections.singletonList(DEVICE);
+ List measurements = Collections.singletonList(MEA);
+ ResultSet rs = reader.queryByRow(deviceIds, measurements, 0, 5);
+ Assert.assertNotNull(rs.getMetadata());
+ // Column 1 is always "Time"; column 2 is the full path "device.s1"
+ Assert.assertEquals("Time", rs.getMetadata().getColumnName(1));
+ Assert.assertEquals(DEVICE + "." + MEA, rs.getMetadata().getColumnName(2));
+ rs.close();
+ }
+
+ // ⑩ paging consistency: two pages together equal the full result
+ @Test
+ public void testPaginationConsistency() throws IOException {
+ int page1 = countRows(reader, DEVICE, MEA, 0, 25);
+ int page2 = countRows(reader, DEVICE, MEA, 25, 25);
+ Assert.assertEquals(TOTAL, page1 + page2);
+ }
+
+ // ⑪ multiple devices: offset/limit applied to merged result
+ @Test
+ public void testMultipleDevices() throws IOException, WriteProcessException {
+ String filePath = "test_tree_reader_row_query_multi.tsfile";
+ writeTreeFile(filePath, Arrays.asList("dev1", "dev2"), MEA, 20);
+ try (ITsFileTreeReader r = new TsFileTreeReaderBuilder().file(new File(filePath)).build()) {
+ int count = countRows(r, null, MEA, 5, 10);
+ Assert.assertEquals(10, count);
+ } finally {
+ new File(filePath).delete();
+ }
+ }
+
+ // ─────────────────────────────────────────────────────────────────────────────
+ // Helpers
+ // ─────────────────────────────────────────────────────────────────────────────
+
+ /**
+ * Count rows returned by {@code queryByRow}.
+ *
+ * @param r reader (already open)
+ * @param device device ID; if {@code null}, both "dev1" and "dev2" are queried
+ * @param mea measurement name
+ * @param offset row offset
+ * @param limit row limit
+ */
+ private int countRows(ITsFileTreeReader r, String device, String mea, int offset, int limit)
+ throws IOException {
+ List deviceIds =
+ device != null ? Collections.singletonList(device) : Arrays.asList("dev1", "dev2");
+ List measurements = Collections.singletonList(mea);
+ ResultSet rs = r.queryByRow(deviceIds, measurements, offset, limit);
+ int count = 0;
+ while (rs.next()) {
+ count++;
+ }
+ rs.close();
+ return count;
+ }
+
+ /**
+ * Write a tree-model TsFile with the given devices and measurement. Timestamps are 0..numRows-1,
+ * values are timestamp * 10 (INT64).
+ */
+ private static void writeTreeFile(
+ String filePath, List deviceIds, String measurement, int numRows)
+ throws IOException, WriteProcessException {
+ File file = new File(filePath);
+ MeasurementSchema schema = new MeasurementSchema(measurement, TSDataType.INT64);
+ try (TsFileTreeWriter writer = new TsFileTreeWriterBuilder().file(file).build()) {
+ for (String deviceId : deviceIds) {
+ writer.registerTimeseries(deviceId, schema);
+ }
+ for (String deviceId : deviceIds) {
+ for (int i = 0; i < numRows; i++) {
+ TSRecord record = new TSRecord(deviceId, i);
+ record.addPoint(measurement, (long) i * 10);
+ writer.write(record);
+ }
+ }
+ }
+ }
+}
From 98a0bec549166e7d8f2dd1f9840dc19fe18a2825 Mon Sep 17 00:00:00 2001
From: 761417898 <761417898@qq.com>
Date: Wed, 11 Mar 2026 16:55:19 +0800
Subject: [PATCH 9/9] mvn spotless:apply
---
cpp/src/reader/aligned_chunk_reader.cc | 3 +++
cpp/src/writer/tsfile_writer.cc | 11 ++++++-----
cpp/src/writer/value_page_writer.h | 3 ++-
3 files changed, 11 insertions(+), 6 deletions(-)
diff --git a/cpp/src/reader/aligned_chunk_reader.cc b/cpp/src/reader/aligned_chunk_reader.cc
index 60d9c819c..14250e7f8 100644
--- a/cpp/src/reader/aligned_chunk_reader.cc
+++ b/cpp/src/reader/aligned_chunk_reader.cc
@@ -550,6 +550,7 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock(
row_appender.append_null(1); \
continue; \
} \
+ assert(value_decoder_->has_remaining(value_in)); \
if (!value_decoder_->has_remaining(value_in)) { \
return common::E_DATA_INCONSISTENCY; \
} \
@@ -596,6 +597,7 @@ int AlignedChunkReader::i32_DECODE_TYPED_TV_INTO_TSBLOCK(
row_appender.append_null(1);
continue;
}
+ assert(value_decoder_->has_remaining(value_in));
if (!value_decoder_->has_remaining(value_in)) {
return common::E_DATA_INCONSISTENCY;
}
@@ -681,6 +683,7 @@ int AlignedChunkReader::STRING_DECODE_TYPED_TV_INTO_TSBLOCK(
}
if (should_read_data) {
+ assert(value_decoder_->has_remaining(value_in));
if (!value_decoder_->has_remaining(value_in)) {
return E_DATA_INCONSISTENCY;
}
diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc
index 52db8ea48..856e19698 100644
--- a/cpp/src/writer/tsfile_writer.cc
+++ b/cpp/src/writer/tsfile_writer.cc
@@ -809,7 +809,8 @@ int TsFileWriter::write_table(Tablet& tablet) {
return ret;
}
// Row-by-row write so that when time page seals (e.g. by memory
- // threshold), we can seal all value pages together (Java semantics).
+ // threshold), we can seal all value pages together (Java
+ // semantics).
for (int i = start_idx; i < end_idx; i++) {
int32_t time_pages_before = time_chunk_writer->num_of_pages();
if (RET_FAIL(time_chunk_writer->write(tablet.timestamps_[i]))) {
@@ -822,9 +823,8 @@ int TsFileWriter::write_table(Tablet& tablet) {
ValueChunkWriter* value_chunk_writer =
value_chunk_writers[field_col_count];
if (!IS_NULL(value_chunk_writer) &&
- RET_FAIL(value_write_column(value_chunk_writer,
- tablet, col, i,
- i + 1))) {
+ RET_FAIL(value_write_column(
+ value_chunk_writer, tablet, col, i, i + 1))) {
return ret;
}
field_col_count++;
@@ -835,7 +835,8 @@ int TsFileWriter::write_table(Tablet& tablet) {
for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
if (!IS_NULL(value_chunk_writers[k]) &&
value_chunk_writers[k]->has_current_page_data() &&
- RET_FAIL(value_chunk_writers[k]->seal_current_page())) {
+ RET_FAIL(
+ value_chunk_writers[k]->seal_current_page())) {
return ret;
}
}
diff --git a/cpp/src/writer/value_page_writer.h b/cpp/src/writer/value_page_writer.h
index a0a6839c6..9057102cb 100644
--- a/cpp/src/writer/value_page_writer.h
+++ b/cpp/src/writer/value_page_writer.h
@@ -63,7 +63,8 @@ struct ValuePageData {
compressor_ = nullptr;
}
- /** Clear pointers without freeing (transfer ownership to another holder). */
+ /** Clear pointers without freeing (transfer ownership to another holder).
+ */
void clear() {
col_notnull_bitmap_buf_size_ = 0;
value_buf_size_ = 0;