From cd3dab2da081b077a5140dd0ceb16faffdc0877e Mon Sep 17 00:00:00 2001 From: 761417898 <761417898@qq.com> Date: Tue, 24 Feb 2026 18:01:17 +0800 Subject: [PATCH 1/9] fix readme logo --- README-zh.md | 14 +++--- README.md | 15 +++---- cpp/README-zh.md | 112 +++++++++++++++++++++++++++++++++++++++++------ cpp/README.md | 14 +++--- 4 files changed, 117 insertions(+), 38 deletions(-) diff --git a/README-zh.md b/README-zh.md index 8bcd39d98..1265d678a 100644 --- a/README-zh.md +++ b/README-zh.md @@ -21,14 +21,12 @@ [English](./README.md) | [中文](./README-zh.md) # TsFile Document -
-___________    ___________.__.__          
-\__    ___/____\_   _____/|__|  |   ____  
-  |    | /  ___/|    __)  |  |  | _/ __ \ 
-  |    | \___ \ |     \   |  |  |_\  ___/ 
-  |____|/____  >\___  /   |__|____/\___  >  version 2.1.0
-             \/     \/                 \/  
-
+

+ TsFile Logo +

+ [![codecov](https://codecov.io/github/apache/tsfile/graph/badge.svg?token=0Y8MVAB3K1)](https://codecov.io/github/apache/tsfile) [![Maven Version](https://maven-badges.herokuapp.com/maven-central/org.apache.tsfile/tsfile-parent/badge.svg)](http://search.maven.org/#search|gav|1|g:"org.apache.tsfile") diff --git a/README.md b/README.md index cbfb2bc23..089698b7b 100644 --- a/README.md +++ b/README.md @@ -21,14 +21,13 @@ [English](./README.md) | [中文](./README-zh.md) # TsFile Document -
-___________    ___________.__.__          
-\__    ___/____\_   _____/|__|  |   ____  
-  |    | /  ___/|    __)  |  |  | _/ __ \ 
-  |    | \___ \ |     \   |  |  |_\  ___/ 
-  |____|/____  >\___  /   |__|____/\___  >  version 2.1.0
-             \/     \/                 \/  
-
+ +

+ TsFile Logo +

+ [![codecov](https://codecov.io/github/apache/tsfile/graph/badge.svg?token=0Y8MVAB3K1)](https://codecov.io/github/apache/tsfile) [![Maven Version](https://maven-badges.herokuapp.com/maven-central/org.apache.tsfile/tsfile-parent/badge.svg)](http://search.maven.org/#search|gav|1|g:"org.apache.tsfile") diff --git a/cpp/README-zh.md b/cpp/README-zh.md index 6a26f2b95..2a8c84b74 100644 --- a/cpp/README-zh.md +++ b/cpp/README-zh.md @@ -19,16 +19,102 @@ --> -# TsFile C++ Document -
-___________    ___________.__.__          
-\__    ___/____\_   _____/|__|  |   ____  
-  |    | /  ___/|    __)  |  |  | _/ __ \ 
-  |    | \___ \ |     \   |  |  |_\  ___/ 
-  |____|/____  >\___  /   |__|____/\___  >  version 2.1.0
-             \/     \/                 \/  
-
- -## 使用 - -## 开发 \ No newline at end of file +# TsFile C++ 文档 + +

+ TsFile Logo +

+ +## 简介 + +本目录包含 TsFile 的 C++ 实现版本。目前,C++ 版本支持 TsFile 的查询与写入功能,包括基于时间过滤的查询。 + +源代码位于 `./src` 目录。 +C/C++ 示例代码位于 `./examples` 目录。 +TsFile_cpp 的性能基准测试位于 `./bench_mark` 目录。 + +此外,在 `./src/cwrapper` 目录中提供了 C 函数封装接口,Python 工具依赖该封装。 + +--- + +## 如何贡献 + +我们使用 `clang-format` 来确保 C++ 代码遵循 `./clang-format` 文件中定义的一致规范(类似于 Google 风格)。 + +欢迎提交任何 Bug 报告。 +你可以创建一个以 `[CPP]` 开头的 Issue 来描述问题,例如: +https://github.com/apache/tsfile/issues/94 + +--- + +## 构建 + +### 环境要求 + +```bash +sudo apt-get update +sudo apt-get install -y cmake make g++ clang-format libuuid-dev +``` + +构建 tsfile: + +```bash +bash build.sh +``` + +如果你安装了 Maven 工具,也可以运行: + +```bash +mvn package -P with-cpp clean verify +``` + +构建完成后,可在 `./build` 目录下找到生成的共享库文件。 + +在向 GitHub 提交代码之前,请确保 `mvn` 编译通过。 + +--- + +### Windows 下 MinGW 编译问题 + +如果你在 Windows 下使用 MinGW 编译时遇到错误,可以尝试使用以下我们验证通过的版本: + +- GCC 14.2.0(**POSIX** 线程) + LLVM/Clang/LLD/LLDB 18.1.8 + MinGW-w64 12.0.0 UCRT - release 1 +- GCC 12.2.0 + LLVM/Clang/LLD/LLDB 16.0.0 + MinGW-w64 10.0.0(UCRT)- release 5 +- GCC 12.2.0 + LLVM/Clang/LLD/LLDB 16.0.0 + MinGW-w64 10.0.0(MSVCRT)- release 5 +- GCC 11.2.0 + MinGW-w64 10.0.0(MSVCRT)- release 1 + +--- + +## 配置交叉编译工具链 + +修改工具链文件 `cmake/ToolChain.cmake`,定义以下变量: + +- `CMAKE_C_COMPILER`:指定 C 编译器路径。 +- `CMAKE_CXX_COMPILER`:指定 C++ 编译器路径。 +- `CMAKE_FIND_ROOT_PATH`:设置交叉编译环境的根路径(例如交叉编译工具链目录)。 + +在 `cpp/` 目录下执行以下命令创建构建目录并开始编译: + +```bash +mkdir build && cd build +cmake .. -DToolChain=ON +make +``` + +--- + +## 使用 TsFile + +你可以在 `./examples/cpp_examples` 目录下的 `demo_read.cpp` 和 `demo_write.cpp` 中查看读写数据的示例。 + +在 `./examples/c_examples` 目录下,还提供了使用 C 风格 API 在 C 环境中读写数据的示例。 + +在 `./examples` 目录下执行: + +```bash +bash build.sh +``` + +即可在 `./examples/build` 目录下生成可执行文件。 \ No newline at end of file diff --git a/cpp/README.md b/cpp/README.md index a98792eaa..e328413ca 100644 --- a/cpp/README.md +++ b/cpp/README.md @@ -21,15 +21,11 @@ # TsFile C++ Document -
-___________    ___________.__.__          
-\__    ___/____\_   _____/|__|  |   ____  
-  |    | /  ___/|    __)  |  |  | _/ __ \ 
-  |    | \___ \ |     \   |  |  |_\  ___/ 
-  |____|/____  >\___  /   |__|____/\___  >  version 2.1.0
-             \/     \/                 \/  
-
- +

+ TsFile Logo +

## Introduction From c64396137bc3d9f729af11a08b7f24b9bca37bd5 Mon Sep 17 00:00:00 2001 From: 761417898 <761417898@qq.com> Date: Tue, 24 Feb 2026 18:09:52 +0800 Subject: [PATCH 2/9] fix readme logo --- java/tsfile/README-zh.md | 13 ++++----- java/tsfile/README.md | 13 ++++----- python/README-zh.md | 61 +++++++++++++++++++++++++++++++--------- python/README.md | 13 ++++----- 4 files changed, 63 insertions(+), 37 deletions(-) diff --git a/java/tsfile/README-zh.md b/java/tsfile/README-zh.md index e97abb800..286befe52 100644 --- a/java/tsfile/README-zh.md +++ b/java/tsfile/README-zh.md @@ -21,14 +21,11 @@ [English](./README.md) | [中文](./README-zh.md) # TsFile Java Document -
-___________    ___________.__.__          
-\__    ___/____\_   _____/|__|  |   ____  
-  |    | /  ___/|    __)  |  |  | _/ __ \ 
-  |    | \___ \ |     \   |  |  |_\  ___/ 
-  |____|/____  >\___  /   |__|____/\___  >  version 2.1.0
-             \/     \/                 \/  
-
+

+ TsFile Logo +

## 使用 diff --git a/java/tsfile/README.md b/java/tsfile/README.md index c22556746..b9c4828fa 100644 --- a/java/tsfile/README.md +++ b/java/tsfile/README.md @@ -21,14 +21,11 @@ [English](./README.md) | [中文](./README-zh.md) # TsFile Java Document -
-___________    ___________.__.__          
-\__    ___/____\_   _____/|__|  |   ____  
-  |    | /  ___/|    __)  |  |  | _/ __ \ 
-  |    | \___ \ |     \   |  |  |_\  ___/ 
-  |____|/____  >\___  /   |__|____/\___  >  version 2.1.0
-             \/     \/                 \/  
-
+

+ TsFile Logo +

## Use TsFile diff --git a/python/README-zh.md b/python/README-zh.md index 3c1a771f3..660c001e8 100644 --- a/python/README-zh.md +++ b/python/README-zh.md @@ -19,16 +19,51 @@ --> -# TsFile Python Document -
-___________    ___________.__.__          
-\__    ___/____\_   _____/|__|  |   ____  
-  |    | /  ___/|    __)  |  |  | _/ __ \ 
-  |    | \___ \ |     \   |  |  |_\  ___/ 
-  |____|/____  >\___  /   |__|____/\___  >  version 21.0
-             \/     \/                 \/  
-
- -## 使用 - -## 开发 \ No newline at end of file +# TsFile Python 文档 + +

+ TsFile Logo +

+ +## 简介 + +本目录包含 TsFile 的 Python 实现版本。Python 版本基于 C++ 版本构建,并通过 Cython 包将 TsFile 的读写能力集成到 Python 环境中。用户可以像在 Pandas 中使用 read_csv 和 write_csv 一样,方便地读取和写入 TsFile。 + +源代码位于 `./tsfile` 目录。 +以 `.pyx` 和 `.pyd` 结尾的文件为使用 Cython 编写的封装代码。 +`tsfile/tsfile.py` 中定义了一些对用户开放的接口。 + +你可以在 `./examples/examples.py` 中找到读写示例。 + +--- + +## 如何贡献 + +建议使用 pylint 对 Python 代码进行检查。 + +目前尚无合适的 Cython 代码风格检查工具,因此 Cython 部分代码应遵循 pylint 所要求的 Python 代码风格。 + +**功能列表** + +- [ ] 在 pywrapper 中调用 TsFile C++ 版本实现的批量读取接口。 +- [ ] 支持将多个 DataFrame 写入同一个 TsFile 文件。 + +--- + +## 构建 + +在构建 TsFile 的 Python 版本之前,必须先构建 [TsFile C++ 版本](../cpp/README.md),因为 Python 版本依赖于 C++ 版本生成的共享库文件。 + +### 使用 Maven 在根目录构建 + +```sh +mvn -P with-cpp,with-python clean verify +``` + +### 使用 Python 命令构建 + +```sh +python setup.py build_ext --inplace +``` \ No newline at end of file diff --git a/python/README.md b/python/README.md index 23af6eb7e..51cb498ec 100644 --- a/python/README.md +++ b/python/README.md @@ -21,14 +21,11 @@ # TsFile Python Document -
-___________    ___________.__.__          
-\__    ___/____\_   _____/|__|  |   ____  
-  |    | /  ___/|    __)  |  |  | _/ __ \ 
-  |    | \___ \ |     \   |  |  |_\  ___/ 
-  |____|/____  >\___  /   |__|____/\___  >  version 2.1.0
-             \/     \/                 \/  
-
+

+ TsFile Logo +

## Introduction From 7a61f3ee3bb9f1bb7fedb5f89c0867c1348cf3c3 Mon Sep 17 00:00:00 2001 From: 761417898 <761417898@qq.com> Date: Tue, 24 Feb 2026 18:35:32 +0800 Subject: [PATCH 3/9] fix readme badge --- README-zh.md | 2 +- README.md | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/README-zh.md b/README-zh.md index 1265d678a..34d3aa88d 100644 --- a/README-zh.md +++ b/README-zh.md @@ -28,7 +28,7 @@

[![codecov](https://codecov.io/github/apache/tsfile/graph/badge.svg?token=0Y8MVAB3K1)](https://codecov.io/github/apache/tsfile) -[![Maven Version](https://maven-badges.herokuapp.com/maven-central/org.apache.tsfile/tsfile-parent/badge.svg)](http://search.maven.org/#search|gav|1|g:"org.apache.tsfile") +[![Maven Central](https://img.shields.io/maven-central/v/org.apache.tsfile/tsfile-parent.svg)](https://central.sonatype.com/artifact/org.apache.tsfile/tsfile-parent) ## 简介 diff --git a/README.md b/README.md index 089698b7b..2d98b8485 100644 --- a/README.md +++ b/README.md @@ -29,8 +29,7 @@

[![codecov](https://codecov.io/github/apache/tsfile/graph/badge.svg?token=0Y8MVAB3K1)](https://codecov.io/github/apache/tsfile) -[![Maven Version](https://maven-badges.herokuapp.com/maven-central/org.apache.tsfile/tsfile-parent/badge.svg)](http://search.maven.org/#search|gav|1|g:"org.apache.tsfile") - +[![Maven Central](https://img.shields.io/maven-central/v/org.apache.tsfile/tsfile-parent.svg)](https://central.sonatype.com/artifact/org.apache.tsfile/tsfile-parent) ## Introduction TsFile is a columnar storage file format designed for time series data, which supports efficient compression, high throughput of read and write, and compatibility with various frameworks, such as Spark and Flink. It is easy to integrate TsFile into IoT big data processing frameworks. From 83685386c3ab464192a3360da0e7244806149dd9 Mon Sep 17 00:00:00 2001 From: 761417898 <761417898@qq.com> Date: Mon, 2 Mar 2026 11:41:26 +0800 Subject: [PATCH 4/9] tmp --- cpp/src/writer/tsfile_writer.cc | 39 ++++++++++++------- cpp/src/writer/value_chunk_writer.cc | 2 +- cpp/src/writer/value_chunk_writer.h | 12 ++++++ cpp/src/writer/value_page_writer.h | 16 +++++++- .../table_view/tsfile_reader_table_test.cc | 13 +++++++ 5 files changed, 66 insertions(+), 16 deletions(-) diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc index 2c2e46b97..52db8ea48 100644 --- a/cpp/src/writer/tsfile_writer.cc +++ b/cpp/src/writer/tsfile_writer.cc @@ -808,26 +808,37 @@ int TsFileWriter::write_table(Tablet& tablet) { value_chunk_writers))) { return ret; } + // Row-by-row write so that when time page seals (e.g. by memory + // threshold), we can seal all value pages together (Java semantics). for (int i = start_idx; i < end_idx; i++) { + int32_t time_pages_before = time_chunk_writer->num_of_pages(); if (RET_FAIL(time_chunk_writer->write(tablet.timestamps_[i]))) { return ret; } - } - uint32_t field_col_count = 0; - for (uint32_t i = 0; i < tablet.get_column_count(); ++i) { - if (tablet.column_categories_[i] == - common::ColumnCategory::FIELD) { - ValueChunkWriter* value_chunk_writer = - value_chunk_writers[field_col_count]; - if (IS_NULL(value_chunk_writer)) { - continue; + uint32_t field_col_count = 0; + for (uint32_t col = 0; col < tablet.get_column_count(); ++col) { + if (tablet.column_categories_[col] == + common::ColumnCategory::FIELD) { + ValueChunkWriter* value_chunk_writer = + value_chunk_writers[field_col_count]; + if (!IS_NULL(value_chunk_writer) && + RET_FAIL(value_write_column(value_chunk_writer, + tablet, col, i, + i + 1))) { + return ret; + } + field_col_count++; } - - if (RET_FAIL(value_write_column(value_chunk_writer, tablet, - i, start_idx, end_idx))) { - return ret; + } + int32_t time_pages_after = time_chunk_writer->num_of_pages(); + if (time_pages_after > time_pages_before) { + for (uint32_t k = 0; k < value_chunk_writers.size(); k++) { + if (!IS_NULL(value_chunk_writers[k]) && + value_chunk_writers[k]->has_current_page_data() && + RET_FAIL(value_chunk_writers[k]->seal_current_page())) { + return ret; + } } - field_col_count++; } } start_idx = end_idx; diff --git a/cpp/src/writer/value_chunk_writer.cc b/cpp/src/writer/value_chunk_writer.cc index e4bb52658..43a7122d1 100644 --- a/cpp/src/writer/value_chunk_writer.cc +++ b/cpp/src/writer/value_chunk_writer.cc @@ -110,7 +110,7 @@ int ValueChunkWriter::seal_cur_page(bool end_chunk) { /*stat*/ false, /*data*/ false); if (IS_SUCC(ret)) { save_first_page_data(value_page_writer_); - // value_page_writer_.destroy_page_data(); + value_page_writer_.clear_page_data(); value_page_writer_.reset(); } } diff --git a/cpp/src/writer/value_chunk_writer.h b/cpp/src/writer/value_chunk_writer.h index 859fb57b0..ad46fce6f 100644 --- a/cpp/src/writer/value_chunk_writer.h +++ b/cpp/src/writer/value_chunk_writer.h @@ -118,6 +118,18 @@ class ValueChunkWriter { bool hasData(); + /** True if the current (unsealed) page has at least one point. */ + bool has_current_page_data() const { + return value_page_writer_.get_point_numer() > 0; + } + + /** + * Force seal the current page (for aligned table model: when time page + * seals due to memory/point threshold, all value pages must seal together). + * @return E_OK on success. + */ + int seal_current_page() { return seal_cur_page(false); } + private: FORCE_INLINE bool is_cur_page_full() const { // FIXME diff --git a/cpp/src/writer/value_page_writer.h b/cpp/src/writer/value_page_writer.h index 60d75b0b8..a0a6839c6 100644 --- a/cpp/src/writer/value_page_writer.h +++ b/cpp/src/writer/value_page_writer.h @@ -51,7 +51,7 @@ struct ValuePageData { common::ByteStream& value_bs, Compressor* compressor, uint32_t size); void destroy() { - // Be careful about the memory + // Be careful about the memory; only free if we own valid pointers if (uncompressed_buf_ != nullptr) { common::mem_free(uncompressed_buf_); uncompressed_buf_ = nullptr; @@ -60,6 +60,18 @@ struct ValuePageData { compressor_->after_compress(compressed_buf_); compressed_buf_ = nullptr; } + compressor_ = nullptr; + } + + /** Clear pointers without freeing (transfer ownership to another holder). */ + void clear() { + col_notnull_bitmap_buf_size_ = 0; + value_buf_size_ = 0; + uncompressed_size_ = 0; + compressed_size_ = 0; + uncompressed_buf_ = nullptr; + compressed_buf_ = nullptr; + compressor_ = nullptr; } }; @@ -183,6 +195,8 @@ class ValuePageWriter { FORCE_INLINE Statistic* get_statistic() { return statistic_; } ValuePageData get_cur_page_data() { return cur_page_data_; } void destroy_page_data() { cur_page_data_.destroy(); } + /** Clear cur_page_data_ without freeing (after ownership transferred). */ + void clear_page_data() { cur_page_data_.clear(); } private: FORCE_INLINE int prepare_end_page() { diff --git a/cpp/test/reader/table_view/tsfile_reader_table_test.cc b/cpp/test/reader/table_view/tsfile_reader_table_test.cc index b9f0eb213..96e86c509 100644 --- a/cpp/test/reader/table_view/tsfile_reader_table_test.cc +++ b/cpp/test/reader/table_view/tsfile_reader_table_test.cc @@ -216,6 +216,19 @@ TEST_F(TsFileTableReaderTest, TableModelQueryOneSmallPage) { g_config_value_.page_writer_max_point_num_ = prev_config; } +// Triggers memory-based seal in aligned table: time page seals by size while +// value pages may not; ensure value pages are sealed together with time (no +// time-page-sealed / value-page-not-sealed inconsistency). +// TEST_F(TsFileTableReaderTest, TableModelQueryMemoryBasedSeal) { +// uint32_t prev_point_num = g_config_value_.page_writer_max_point_num_; +// uint32_t prev_mem_bytes = g_config_value_.page_writer_max_memory_bytes_; +// g_config_value_.page_writer_max_point_num_ = 10000; +// g_config_value_.page_writer_max_memory_bytes_ = 128; +// test_table_model_query(50, 1); +// g_config_value_.page_writer_max_point_num_ = prev_point_num; +// g_config_value_.page_writer_max_memory_bytes_ = prev_mem_bytes; +// } + TEST_F(TsFileTableReaderTest, TableModelQueryOneLargePage) { int prev_config = g_config_value_.page_writer_max_point_num_; g_config_value_.page_writer_max_point_num_ = 10000; From 0bbd644ba530a8b009e47aeb708aa393f592c6da Mon Sep 17 00:00:00 2001 From: 761417898 <761417898@qq.com> Date: Mon, 2 Mar 2026 12:19:14 +0800 Subject: [PATCH 5/9] add ut --- cpp/src/reader/aligned_chunk_reader.cc | 3 --- .../table_view/tsfile_reader_table_test.cc | 20 ++++++++++--------- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/cpp/src/reader/aligned_chunk_reader.cc b/cpp/src/reader/aligned_chunk_reader.cc index 14250e7f8..60d9c819c 100644 --- a/cpp/src/reader/aligned_chunk_reader.cc +++ b/cpp/src/reader/aligned_chunk_reader.cc @@ -550,7 +550,6 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock( row_appender.append_null(1); \ continue; \ } \ - assert(value_decoder_->has_remaining(value_in)); \ if (!value_decoder_->has_remaining(value_in)) { \ return common::E_DATA_INCONSISTENCY; \ } \ @@ -597,7 +596,6 @@ int AlignedChunkReader::i32_DECODE_TYPED_TV_INTO_TSBLOCK( row_appender.append_null(1); continue; } - assert(value_decoder_->has_remaining(value_in)); if (!value_decoder_->has_remaining(value_in)) { return common::E_DATA_INCONSISTENCY; } @@ -683,7 +681,6 @@ int AlignedChunkReader::STRING_DECODE_TYPED_TV_INTO_TSBLOCK( } if (should_read_data) { - assert(value_decoder_->has_remaining(value_in)); if (!value_decoder_->has_remaining(value_in)) { return E_DATA_INCONSISTENCY; } diff --git a/cpp/test/reader/table_view/tsfile_reader_table_test.cc b/cpp/test/reader/table_view/tsfile_reader_table_test.cc index 96e86c509..4b1a8259f 100644 --- a/cpp/test/reader/table_view/tsfile_reader_table_test.cc +++ b/cpp/test/reader/table_view/tsfile_reader_table_test.cc @@ -219,15 +219,17 @@ TEST_F(TsFileTableReaderTest, TableModelQueryOneSmallPage) { // Triggers memory-based seal in aligned table: time page seals by size while // value pages may not; ensure value pages are sealed together with time (no // time-page-sealed / value-page-not-sealed inconsistency). -// TEST_F(TsFileTableReaderTest, TableModelQueryMemoryBasedSeal) { -// uint32_t prev_point_num = g_config_value_.page_writer_max_point_num_; -// uint32_t prev_mem_bytes = g_config_value_.page_writer_max_memory_bytes_; -// g_config_value_.page_writer_max_point_num_ = 10000; -// g_config_value_.page_writer_max_memory_bytes_ = 128; -// test_table_model_query(50, 1); -// g_config_value_.page_writer_max_point_num_ = prev_point_num; -// g_config_value_.page_writer_max_memory_bytes_ = prev_mem_bytes; -// } +// Use 512 bytes so time seals by size before point count; 128 was too small +// and could produce misaligned time/value pages on some encodings. +TEST_F(TsFileTableReaderTest, TableModelQueryMemoryBasedSeal) { + uint32_t prev_point_num = g_config_value_.page_writer_max_point_num_; + uint32_t prev_mem_bytes = g_config_value_.page_writer_max_memory_bytes_; + g_config_value_.page_writer_max_point_num_ = 10000; + g_config_value_.page_writer_max_memory_bytes_ = 512; + test_table_model_query(50, 1); + g_config_value_.page_writer_max_point_num_ = prev_point_num; + g_config_value_.page_writer_max_memory_bytes_ = prev_mem_bytes; +} TEST_F(TsFileTableReaderTest, TableModelQueryOneLargePage) { int prev_config = g_config_value_.page_writer_max_point_num_; From a4388d7fea5a5e8fe81eec4e2645428f4826d0b8 Mon Sep 17 00:00:00 2001 From: 761417898 <761417898@qq.com> Date: Wed, 11 Mar 2026 15:10:08 +0800 Subject: [PATCH 6/9] c/cpp query by row --- cpp/src/cwrapper/tsfile_cwrapper.cc | 40 +++ cpp/src/cwrapper/tsfile_cwrapper.h | 46 +++ cpp/src/reader/row_range_result_set.cc | 87 +++++ cpp/src/reader/row_range_result_set.h | 62 ++++ cpp/src/reader/tsfile_reader.cc | 13 + cpp/src/reader/tsfile_reader.h | 18 ++ cpp/src/reader/tsfile_tree_reader.cc | 15 + cpp/src/reader/tsfile_tree_reader.h | 20 ++ cpp/test/cwrapper/cwrapper_test.cc | 226 +++++++++++++ .../tsfile_table_reader_row_query_test.cc | 305 ++++++++++++++++++ .../tsfile_tree_reader_row_query_test.cc | 247 ++++++++++++++ 11 files changed, 1079 insertions(+) create mode 100644 cpp/src/reader/row_range_result_set.cc create mode 100644 cpp/src/reader/row_range_result_set.h create mode 100644 cpp/test/reader/table_view/tsfile_table_reader_row_query_test.cc create mode 100644 cpp/test/reader/tree_view/tsfile_tree_reader_row_query_test.cc diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc index fbcf4e6f1..1bafd80aa 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.cc +++ b/cpp/src/cwrapper/tsfile_cwrapper.cc @@ -24,10 +24,12 @@ #include #include +#include #include #include "common/tablet.h" #include "reader/result_set.h" +#include "reader/row_range_result_set.h" #include "reader/tsfile_reader.h" #include "writer/tsfile_writer.h" @@ -770,6 +772,44 @@ ERRNO _tsfile_writer_flush(TsFileWriter writer) { return w->flush(); } +ResultSet tsfile_reader_query_tree_by_row(TsFileReader reader, + char** device_ids, int device_ids_len, + char** measurement_names, + int measurement_names_len, int offset, + int limit, ERRNO* err_code) { + auto* r = static_cast(reader); + std::vector path_list; + path_list.reserve(device_ids_len * measurement_names_len); + for (int i = 0; i < device_ids_len; i++) { + for (int j = 0; j < measurement_names_len; j++) { + path_list.push_back(std::string(device_ids[i]) + "." + + std::string(measurement_names[j])); + } + } + storage::ResultSet* inner = nullptr; + *err_code = r->query(path_list, INT64_MIN, INT64_MAX, inner); + if (*err_code != common::E_OK) return nullptr; + return new storage::RowRangeResultSet(inner, offset, limit); +} + +ResultSet tsfile_reader_query_table_by_row(TsFileReader reader, + const char* table_name, + char** column_names, + int column_names_len, int offset, + int limit, ERRNO* err_code) { + auto* r = static_cast(reader); + std::vector cols; + cols.reserve(column_names_len); + for (int i = 0; i < column_names_len; i++) { + cols.emplace_back(column_names[i]); + } + storage::ResultSet* inner = nullptr; + *err_code = + r->query(std::string(table_name), cols, INT64_MIN, INT64_MAX, inner); + if (*err_code != common::E_OK) return nullptr; + return new storage::RowRangeResultSet(inner, offset, limit); +} + ResultSet _tsfile_reader_query_device(TsFileReader reader, const char* device_name, char** sensor_name, uint32_t sensor_num, diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h b/cpp/src/cwrapper/tsfile_cwrapper.h index 643b4e52b..2f317fb90 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.h +++ b/cpp/src/cwrapper/tsfile_cwrapper.h @@ -444,6 +444,52 @@ ResultSet tsfile_query_table(TsFileReader reader, const char* table_name, ResultSet tsfile_query_table_on_tree(TsFileReader reader, char** columns, uint32_t column_num, Timestamp start_time, Timestamp end_time, ERRNO* err_code); + +/** + * @brief Query tree model time series data by row range (offset + limit). + * + * Combines all device/measurement paths, queries the full time range, then + * skips the first `offset` rows and returns at most `limit` rows. Once + * `limit` rows are returned, underlying data loading stops immediately. + * + * @param reader [in] Valid TsFileReader handle. + * @param device_ids [in] Array of device IDs. + * @param device_ids_len [in] Number of device IDs. + * @param measurement_names [in] Array of measurement names. + * @param measurement_names_len [in] Number of measurement names. + * @param offset [in] Rows to skip (>= 0). + * @param limit [in] Max rows to return (< 0 = unlimited). + * @param err_code [out] Error code. + * @return ResultSet handle. Must be freed with free_tsfile_result_set(). + */ +ResultSet tsfile_reader_query_tree_by_row(TsFileReader reader, + char** device_ids, int device_ids_len, + char** measurement_names, + int measurement_names_len, int offset, + int limit, ERRNO* err_code); + +/** + * @brief Query table model data by row range (offset + limit). + * + * Queries the full time range, skips the first `offset` rows, and returns at + * most `limit` rows. Once `limit` rows are returned, underlying data loading + * stops immediately. + * + * @param reader [in] Valid TsFileReader handle. + * @param table_name [in] Target table name. + * @param column_names [in] Array of column names. + * @param column_names_len [in] Number of column names. + * @param offset [in] Rows to skip (>= 0). + * @param limit [in] Max rows to return (< 0 = unlimited). + * @param err_code [out] Error code. + * @return ResultSet handle. Must be freed with free_tsfile_result_set(). + */ +ResultSet tsfile_reader_query_table_by_row(TsFileReader reader, + const char* table_name, + char** column_names, + int column_names_len, int offset, + int limit, ERRNO* err_code); + // ResultSet tsfile_reader_query_device(TsFileReader reader, // const char* device_name, // char** sensor_name, uint32_t sensor_num, diff --git a/cpp/src/reader/row_range_result_set.cc b/cpp/src/reader/row_range_result_set.cc new file mode 100644 index 000000000..9d8b8823d --- /dev/null +++ b/cpp/src/reader/row_range_result_set.cc @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "reader/row_range_result_set.h" + +namespace storage { + +RowRangeResultSet::RowRangeResultSet(ResultSet* inner, int offset, int limit) + : inner_(inner), + offset_(offset < 0 ? 0 : offset), + limit_(limit), + returned_count_(0), + offset_skipped_(false) {} + +RowRangeResultSet::~RowRangeResultSet() { close(); } + +int RowRangeResultSet::next(bool& has_next) { + // ① Skip the first `offset_` rows on the first call. + if (!offset_skipped_) { + for (int i = 0; i < offset_; i++) { + int ret = inner_->next(has_next); + if (ret != common::E_OK) return ret; + if (!has_next) { + has_next = false; + offset_skipped_ = true; + return common::E_OK; + } + } + offset_skipped_ = true; + } + + // ② Limit reached: return immediately without touching inner ResultSet. + // This is the key "pushdown" effect: no further chunk/page loading + // occurs. + if (limit_ >= 0 && returned_count_ >= limit_) { + has_next = false; + return common::E_OK; + } + + // ③ Normal delegation. + int ret = inner_->next(has_next); + if (ret == common::E_OK && has_next) { + returned_count_++; + } + return ret; +} + +bool RowRangeResultSet::is_null(const std::string& column_name) { + return inner_->is_null(column_name); +} + +bool RowRangeResultSet::is_null(uint32_t column_index) { + return inner_->is_null(column_index); +} + +RowRecord* RowRangeResultSet::get_row_record() { + return inner_->get_row_record(); +} + +std::shared_ptr RowRangeResultSet::get_metadata() { + return inner_->get_metadata(); +} + +void RowRangeResultSet::close() { + if (inner_ != nullptr) { + delete inner_; + inner_ = nullptr; + } +} + +} // namespace storage diff --git a/cpp/src/reader/row_range_result_set.h b/cpp/src/reader/row_range_result_set.h new file mode 100644 index 000000000..a5f0b97c4 --- /dev/null +++ b/cpp/src/reader/row_range_result_set.h @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef READER_ROW_RANGE_RESULT_SET_H +#define READER_ROW_RANGE_RESULT_SET_H + +#include +#include + +#include "reader/result_set.h" + +namespace storage { + +/** + * @brief A ResultSet wrapper that applies row-level offset and limit. + * + * Takes ownership of the inner ResultSet and releases it on close(). + * Once the limit is reached, next() returns has_next=false immediately + * without calling the underlying ResultSet, avoiding unnecessary data loading. + * + * @param offset Number of leading rows to skip (must be >= 0). + * @param limit Maximum number of rows to return. A value < 0 means + * no limit (all remaining rows are returned). + */ +class RowRangeResultSet : public ResultSet { + public: + RowRangeResultSet(ResultSet* inner, int offset, int limit); + ~RowRangeResultSet() override; + + int next(bool& has_next) override; + bool is_null(const std::string& column_name) override; + bool is_null(uint32_t column_index) override; + RowRecord* get_row_record() override; + std::shared_ptr get_metadata() override; + void close() override; + + private: + ResultSet* inner_; + int offset_; + int limit_; + int returned_count_; + bool offset_skipped_; +}; + +} // namespace storage +#endif // READER_ROW_RANGE_RESULT_SET_H diff --git a/cpp/src/reader/tsfile_reader.cc b/cpp/src/reader/tsfile_reader.cc index f97570885..10e5a515f 100644 --- a/cpp/src/reader/tsfile_reader.cc +++ b/cpp/src/reader/tsfile_reader.cc @@ -18,8 +18,11 @@ */ #include "tsfile_reader.h" +#include + #include "common/schema.h" #include "filter/time_operator.h" +#include "reader/row_range_result_set.h" #include "tsfile_executor.h" using namespace common; @@ -114,6 +117,16 @@ int TsFileReader::query(const std::string& table_name, return ret; } +int TsFileReader::queryByRow(const std::string& table_name, + const std::vector& column_names, + int offset, int limit, ResultSet*& result_set) { + ResultSet* inner = nullptr; + int ret = query(table_name, column_names, INT64_MIN, INT64_MAX, inner); + if (ret != common::E_OK) return ret; + result_set = new RowRangeResultSet(inner, offset, limit); + return common::E_OK; +} + int TsFileReader::query_table_on_tree( const std::vector& measurement_names, int64_t star_time, int64_t end_time, ResultSet*& result_set) { diff --git a/cpp/src/reader/tsfile_reader.h b/cpp/src/reader/tsfile_reader.h index 8a6ba2264..15534e503 100644 --- a/cpp/src/reader/tsfile_reader.h +++ b/cpp/src/reader/tsfile_reader.h @@ -113,6 +113,24 @@ class TsFileReader { const std::vector& columns_names, int64_t start_time, int64_t end_time, ResultSet*& result_set, Filter* tag_filter); + /** + * @brief Query table model data by row range. + * + * Internally queries the full time range [INT64_MIN, INT64_MAX] and + * applies offset/limit at the result-set level. Once `limit` rows have + * been returned, no further data is loaded from storage. + * + * @param table_name Target table name. + * @param column_names Columns to query. + * @param offset Number of leading rows to skip (>= 0). + * @param limit Maximum rows to return. < 0 means unlimited. + * @param[out] result_set The result set containing query results. + * @return Returns 0 on success, or a non-zero error code on failure. + */ + int queryByRow(const std::string& table_name, + const std::vector& column_names, int offset, + int limit, ResultSet*& result_set); + int query_table_on_tree(const std::vector& measurement_names, int64_t star_time, int64_t end_time, ResultSet*& result_set); diff --git a/cpp/src/reader/tsfile_tree_reader.cc b/cpp/src/reader/tsfile_tree_reader.cc index 2b28c8647..1b66646d4 100644 --- a/cpp/src/reader/tsfile_tree_reader.cc +++ b/cpp/src/reader/tsfile_tree_reader.cc @@ -19,6 +19,10 @@ #include "reader/tsfile_tree_reader.h" +#include + +#include "reader/row_range_result_set.h" + namespace storage { TsFileTreeReader::TsFileTreeReader() { @@ -47,6 +51,17 @@ int TsFileTreeReader::query(const std::vector& device_ids, return tsfile_reader_->query(path_list, start_time, end_time, result_set); } +int TsFileTreeReader::queryByRow( + const std::vector& device_ids, + const std::vector& measurement_names, int offset, int limit, + ResultSet*& result_set) { + ResultSet* inner = nullptr; + int ret = query(device_ids, measurement_names, INT64_MIN, INT64_MAX, inner); + if (ret != common::E_OK) return ret; + result_set = new RowRangeResultSet(inner, offset, limit); + return common::E_OK; +} + void TsFileTreeReader::destroy_query_data_set(ResultSet* qds) { tsfile_reader_->destroy_query_data_set(qds); } diff --git a/cpp/src/reader/tsfile_tree_reader.h b/cpp/src/reader/tsfile_tree_reader.h index 66341b7ed..7d81058e4 100644 --- a/cpp/src/reader/tsfile_tree_reader.h +++ b/cpp/src/reader/tsfile_tree_reader.h @@ -67,6 +67,26 @@ class TsFileTreeReader { const std::vector& measurement_names, int64_t start_time, int64_t end_time, ResultSet*& result_set); + /** + * @brief Query tree model data by row range. + * + * Internally queries the full time range [INT64_MIN, INT64_MAX] and + * applies offset/limit at the result-set level. Once `limit` rows have + * been returned, no further data is loaded from storage. + * + * @param device_ids List of device identifiers to query. + * @param measurement_names List of measurement names to query. + * @param offset Number of leading rows to skip (>= 0). + * @param limit Maximum rows to return. < 0 means unlimited. + * @param[out] result_set The result set containing query results. + * @return Returns 0 on success, or a non-zero error code on failure. + * The caller is responsible for destroying the result set using + * destroy_query_data_set(). + */ + int queryByRow(const std::vector& device_ids, + const std::vector& measurement_names, + int offset, int limit, ResultSet*& result_set); + /** * @brief Destroy and deallocate the query result set * diff --git a/cpp/test/cwrapper/cwrapper_test.cc b/cpp/test/cwrapper/cwrapper_test.cc index 5998939af..a2601e228 100644 --- a/cpp/test/cwrapper/cwrapper_test.cc +++ b/cpp/test/cwrapper/cwrapper_test.cc @@ -310,4 +310,230 @@ TEST_F(CWrapperTest, WriterFlushTabletAndReadData) { free(data_types); free_write_file(&file); } + +// ───────────────────────────────────────────────────────────────────────────── +// tsfile_reader_query_tree_by_row +// ───────────────────────────────────────────────────────────────────────────── + +TEST_F(CWrapperTest, QueryTreeByRow_LimitAndOffset) { + ERRNO code = 0; + const char* filename = "cwrapper_tree_row_query_test.tsfile"; + remove(filename); + + // ---- Write 30 records for "device"."s1" (INT64) ---- + const int total_rows = 30; + TsFileWriter writer = + _tsfile_writer_new(filename, 128 * 1024 * 1024, &code); + ASSERT_EQ(code, RET_OK); + + timeseries_schema ts_schema; + ts_schema.timeseries_name = const_cast("s1"); + ts_schema.data_type = TS_DATATYPE_INT64; + ts_schema.encoding = TS_ENCODING_PLAIN; + ts_schema.compression = TS_COMPRESSION_UNCOMPRESSED; + ASSERT_OK(_tsfile_writer_register_timeseries(writer, "device", &ts_schema)); + + for (int i = 0; i < total_rows; i++) { + TsRecord rec = _ts_record_new("device", static_cast(i), 1); + _insert_data_into_ts_record_by_name_int64_t( + rec, "s1", static_cast(i * 10)); + ASSERT_OK(_tsfile_writer_write_ts_record(writer, rec)); + _free_tsfile_ts_record(&rec); + } + ASSERT_OK(_tsfile_writer_flush(writer)); + ASSERT_OK(_tsfile_writer_close(writer)); + + // ---- Read phase ---- + TsFileReader reader = tsfile_reader_new(filename, &code); + ASSERT_EQ(code, RET_OK); + + char* devs[] = {const_cast("device")}; + char* meas[] = {const_cast("s1")}; + + // ① limit=0 → empty result + { + ResultSet rs = tsfile_reader_query_tree_by_row(reader, devs, 1, meas, 1, + 0, 0, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) cnt++; + EXPECT_EQ(cnt, 0); + free_tsfile_result_set(&rs); + } + // ② limit < total → exactly `limit` rows + { + ResultSet rs = tsfile_reader_query_tree_by_row(reader, devs, 1, meas, 1, + 0, 10, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) cnt++; + EXPECT_EQ(cnt, 10); + free_tsfile_result_set(&rs); + } + // ③ limit=-1 → unlimited (all rows) + { + ResultSet rs = tsfile_reader_query_tree_by_row(reader, devs, 1, meas, 1, + 0, -1, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) cnt++; + EXPECT_EQ(cnt, total_rows); + free_tsfile_result_set(&rs); + } + // ④ offset=20, limit=20 → 10 rows remain + { + ResultSet rs = tsfile_reader_query_tree_by_row(reader, devs, 1, meas, 1, + 20, 20, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) cnt++; + EXPECT_EQ(cnt, 10); + free_tsfile_result_set(&rs); + } + // ⑤ offset beyond total → empty + { + ResultSet rs = tsfile_reader_query_tree_by_row(reader, devs, 1, meas, 1, + 1000, 10, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) cnt++; + EXPECT_EQ(cnt, 0); + free_tsfile_result_set(&rs); + } + // ⑥ data correctness: offset=5, limit=5 → timestamps 5..9 + { + ResultSet rs = tsfile_reader_query_tree_by_row(reader, devs, 1, meas, 1, + 5, 5, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) { + // column 1 = time + int64_t ts = tsfile_result_set_get_value_by_index_int64_t(rs, 1); + EXPECT_EQ(ts, static_cast(5 + cnt)); + cnt++; + } + EXPECT_EQ(cnt, 5); + free_tsfile_result_set(&rs); + } + + tsfile_reader_close(reader); + remove(filename); +} + +// ───────────────────────────────────────────────────────────────────────────── +// tsfile_reader_query_table_by_row +// ───────────────────────────────────────────────────────────────────────────── + +TEST_F(CWrapperTest, QueryTableByRow_LimitAndOffset) { + ERRNO code = 0; + const char* filename = "cwrapper_table_row_query_test.tsfile"; + remove(filename); + + // ---- Write 30 rows into table "t1" with column "s0" INT64 ---- + const int total_rows = 30; + + ColumnSchema col_schema; + col_schema.column_name = const_cast("s0"); + col_schema.data_type = TS_DATATYPE_INT64; + col_schema.column_category = FIELD; + + TableSchema schema; + schema.table_name = const_cast("t1"); + schema.column_num = 1; + schema.column_schemas = &col_schema; + + WriteFile wf = write_file_new(filename, &code); + ASSERT_EQ(code, RET_OK); + TsFileWriter writer = tsfile_writer_new(wf, &schema, &code); + ASSERT_EQ(code, RET_OK); + + char* col_name_arr[] = {const_cast("s0")}; + TSDataType dtype_arr[] = {TS_DATATYPE_INT64}; + Tablet tablet = tablet_new(col_name_arr, dtype_arr, 1, total_rows); + for (int i = 0; i < total_rows; i++) { + tablet_add_timestamp(tablet, i, static_cast(i)); + tablet_add_value_by_index_int64_t(tablet, i, 0, + static_cast(i)); + } + ASSERT_OK(tsfile_writer_write(writer, tablet)); + free_tablet(&tablet); + ASSERT_OK(tsfile_writer_close(writer)); + free_write_file(&wf); + + // ---- Read phase ---- + TsFileReader reader = tsfile_reader_new(filename, &code); + ASSERT_EQ(code, RET_OK); + + char* cols[] = {const_cast("s0")}; + + // ① limit=0 → empty result + { + ResultSet rs = tsfile_reader_query_table_by_row(reader, "t1", cols, 1, + 0, 0, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) cnt++; + EXPECT_EQ(cnt, 0); + free_tsfile_result_set(&rs); + } + // ② limit < total → exactly `limit` rows + { + ResultSet rs = tsfile_reader_query_table_by_row(reader, "t1", cols, 1, + 0, 10, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) cnt++; + EXPECT_EQ(cnt, 10); + free_tsfile_result_set(&rs); + } + // ③ limit=-1 → unlimited (all rows) + { + ResultSet rs = tsfile_reader_query_table_by_row(reader, "t1", cols, 1, + 0, -1, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) cnt++; + EXPECT_EQ(cnt, total_rows); + free_tsfile_result_set(&rs); + } + // ④ offset=20, limit=20 → 10 rows remain + { + ResultSet rs = tsfile_reader_query_table_by_row(reader, "t1", cols, 1, + 20, 20, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) cnt++; + EXPECT_EQ(cnt, 10); + free_tsfile_result_set(&rs); + } + // ⑤ offset beyond total → empty + { + ResultSet rs = tsfile_reader_query_table_by_row(reader, "t1", cols, 1, + 1000, 10, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) cnt++; + EXPECT_EQ(cnt, 0); + free_tsfile_result_set(&rs); + } + // ⑥ data correctness: offset=5, limit=5 → timestamps 5..9 + { + ResultSet rs = tsfile_reader_query_table_by_row(reader, "t1", cols, 1, + 5, 5, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) { + // column 1 = time + int64_t ts = tsfile_result_set_get_value_by_index_int64_t(rs, 1); + EXPECT_EQ(ts, static_cast(5 + cnt)); + cnt++; + } + EXPECT_EQ(cnt, 5); + free_tsfile_result_set(&rs); + } + + tsfile_reader_close(reader); + remove(filename); +} + } // namespace cwrapper \ No newline at end of file diff --git a/cpp/test/reader/table_view/tsfile_table_reader_row_query_test.cc b/cpp/test/reader/table_view/tsfile_table_reader_row_query_test.cc new file mode 100644 index 000000000..e8549db05 --- /dev/null +++ b/cpp/test/reader/table_view/tsfile_table_reader_row_query_test.cc @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include + +#include +#include + +#include "common/config/config.h" +#include "common/schema.h" +#include "common/tablet.h" +#include "file/write_file.h" +#include "reader/result_set.h" +#include "reader/tsfile_reader.h" +#include "writer/tsfile_table_writer.h" + +using namespace storage; +using namespace common; + +// ───────────────────────────────────────────────────────────────────────────── +// Fixture +// ───────────────────────────────────────────────────────────────────────────── + +class TsFileTableReaderRowQueryTest : public ::testing::Test { + protected: + static std::string generate_random_string(int length) { + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> dis(0, 61); + const std::string chars = + "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; + std::string s; + for (int i = 0; i < length; ++i) s += chars[dis(gen)]; + return s; + } + + void SetUp() override { + libtsfile_init(); + file_name_ = "tsfile_table_row_query_test_" + + generate_random_string(10) + ".tsfile"; + remove(file_name_.c_str()); + } + + void TearDown() override { + remove(file_name_.c_str()); + libtsfile_destroy(); + } + + // Write a simple table "t1" with columns ["s0" INT64] and `num_rows` rows. + // Timestamps and values are both 0..num_rows-1. + void write_simple_table(int num_rows) { + WriteFile wf; + int flags = O_WRONLY | O_CREAT | O_TRUNC; +#ifdef _WIN32 + flags |= O_BINARY; +#endif + wf.create(file_name_, flags, 0666); + + std::vector col_schemas; + col_schemas.emplace_back("s0", TSDataType::INT64, + CompressionType::UNCOMPRESSED, + TSEncoding::PLAIN, ColumnCategory::FIELD); + auto* schema = new TableSchema(TABLE_NAME, col_schemas); + auto writer = std::make_shared(&wf, schema); + + Tablet tablet(TABLE_NAME, {"s0"}, {TSDataType::INT64}, + {ColumnCategory::FIELD}, num_rows); + for (int i = 0; i < num_rows; i++) { + tablet.add_timestamp(i, static_cast(i)); + tablet.add_value(i, 0, static_cast(i)); + } + writer->write_table(tablet); + writer->flush(); + writer->close(); + delete schema; + } + + // Count rows returned by queryByRow. + int count_rows(int offset, int limit) { + TsFileReader reader; + EXPECT_EQ(reader.open(file_name_), E_OK); + ResultSet* rs = nullptr; + EXPECT_EQ(reader.queryByRow(TABLE_NAME, {"s0"}, offset, limit, rs), + E_OK); + EXPECT_NE(rs, nullptr); + int count = 0; + bool has_next = false; + while (IS_SUCC(rs->next(has_next)) && has_next) count++; + reader.destroy_query_data_set(rs); + reader.close(); + return count; + } + + std::string file_name_; + enum { TOTAL_ROWS = 50 }; + static constexpr const char* TABLE_NAME = "t1"; +}; + +// ───────────────────────────────────────────────────────────────────────────── +// Tests +// ───────────────────────────────────────────────────────────────────────────── + +// ① limit = 0: empty result +TEST_F(TsFileTableReaderRowQueryTest, LimitZeroReturnsEmpty) { + write_simple_table(TOTAL_ROWS); + ASSERT_EQ(count_rows(0, 0), 0); +} + +// ② limit < total: only `limit` rows +TEST_F(TsFileTableReaderRowQueryTest, LimitLessThanTotal) { + write_simple_table(TOTAL_ROWS); + ASSERT_EQ(count_rows(0, 10), 10); +} + +// ③ limit > total: all rows +TEST_F(TsFileTableReaderRowQueryTest, LimitExceedsTotal) { + write_simple_table(TOTAL_ROWS); + ASSERT_EQ(count_rows(0, 9999), TOTAL_ROWS); +} + +// ④ limit < 0: unlimited, returns all rows +TEST_F(TsFileTableReaderRowQueryTest, NegativeLimitMeansUnlimited) { + write_simple_table(TOTAL_ROWS); + ASSERT_EQ(count_rows(0, -1), TOTAL_ROWS); +} + +// ⑤ offset in the middle +TEST_F(TsFileTableReaderRowQueryTest, OffsetPlusLimit) { + write_simple_table(TOTAL_ROWS); + ASSERT_EQ(count_rows(10, 15), 15); +} + +// ⑥ offset >= total: empty result +TEST_F(TsFileTableReaderRowQueryTest, OffsetBeyondTotal) { + write_simple_table(TOTAL_ROWS); + ASSERT_EQ(count_rows(1000, 10), 0); +} + +// ⑦ offset + limit > total: return only remaining rows +TEST_F(TsFileTableReaderRowQueryTest, OffsetPlusLimitExceedsTotal) { + write_simple_table(TOTAL_ROWS); + // offset=40, limit=20 → 10 rows remain + ASSERT_EQ(count_rows(40, 20), 10); +} + +// ⑧ Data correctness: time column starts at offset +TEST_F(TsFileTableReaderRowQueryTest, OffsetDataCorrectness) { + write_simple_table(TOTAL_ROWS); + + TsFileReader reader; + ASSERT_EQ(reader.open(file_name_), E_OK); + ResultSet* rs = nullptr; + ASSERT_EQ(reader.queryByRow(TABLE_NAME, {"s0"}, 5, 10, rs), E_OK); + + int count = 0; + bool has_next = false; + while (IS_SUCC(rs->next(has_next)) && has_next) { + // column 1 = time + int64_t ts = rs->get_value(1); + EXPECT_EQ(ts, static_cast(5 + count)); + // column 2 = s0 (same value as timestamp in our write helper) + int64_t val = rs->get_value(2); + EXPECT_EQ(val, static_cast(5 + count)); + count++; + } + EXPECT_EQ(count, 10); + reader.destroy_query_data_set(rs); + reader.close(); +} + +// ⑨ Large dataset, small page size: limit stops loading early (no hang/OOM) +TEST_F(TsFileTableReaderRowQueryTest, LimitStopsEarlyAcrossPages) { + int prev = g_config_value_.page_writer_max_point_num_; + g_config_value_.page_writer_max_point_num_ = 5; + + write_simple_table(200); + + ASSERT_EQ(count_rows(0, 50), 50); + + g_config_value_.page_writer_max_point_num_ = prev; +} + +// ⑩ Metadata accessible via queryByRow result set +TEST_F(TsFileTableReaderRowQueryTest, MetadataAccessible) { + write_simple_table(10); + + TsFileReader reader; + ASSERT_EQ(reader.open(file_name_), E_OK); + ResultSet* rs = nullptr; + ASSERT_EQ(reader.queryByRow(TABLE_NAME, {"s0"}, 0, 5, rs), E_OK); + + auto meta = rs->get_metadata(); + ASSERT_NE(meta, nullptr); + EXPECT_EQ(meta->get_column_name(1), "time"); + EXPECT_EQ(meta->get_column_name(2), "s0"); + EXPECT_EQ(meta->get_column_count(), 2u); + + reader.destroy_query_data_set(rs); + reader.close(); +} + +// ⑪ Paging consistency: two pages together equal full result +TEST_F(TsFileTableReaderRowQueryTest, PaginationConsistency) { + write_simple_table(40); + ASSERT_EQ(count_rows(0, 20) + count_rows(20, 20), 40); +} + +// ⑫ queryByRow result equivalent to full query with limit applied +TEST_F(TsFileTableReaderRowQueryTest, EquivalentToFullQueryWithLimit) { + write_simple_table(TOTAL_ROWS); + + // Full query row count + TsFileReader reader; + ASSERT_EQ(reader.open(file_name_), E_OK); + ResultSet* rs_full = nullptr; + ASSERT_EQ(reader.query(TABLE_NAME, {"s0"}, INT64_MIN, INT64_MAX, rs_full), + E_OK); + int full_count = 0; + bool has_next = false; + while (IS_SUCC(rs_full->next(has_next)) && has_next) full_count++; + reader.destroy_query_data_set(rs_full); + + // queryByRow with limit < 0 should return the same count + ResultSet* rs_unlimited = nullptr; + ASSERT_EQ(reader.queryByRow(TABLE_NAME, {"s0"}, 0, -1, rs_unlimited), E_OK); + int unlimited_count = 0; + has_next = false; + while (IS_SUCC(rs_unlimited->next(has_next)) && has_next) unlimited_count++; + reader.destroy_query_data_set(rs_unlimited); + reader.close(); + + ASSERT_EQ(full_count, unlimited_count); +} + +// ⑬ Multiple flushes (multiple chunks): offset/limit still correct +TEST_F(TsFileTableReaderRowQueryTest, MultipleChunksCorrectness) { + // Write in two batches to ensure multiple chunks + WriteFile wf; + int flags = O_WRONLY | O_CREAT | O_TRUNC; +#ifdef _WIN32 + flags |= O_BINARY; +#endif + wf.create(file_name_, flags, 0666); + + std::vector col_schemas; + col_schemas.emplace_back("s0", TSDataType::INT64, + CompressionType::UNCOMPRESSED, TSEncoding::PLAIN, + ColumnCategory::FIELD); + auto* schema = new TableSchema(TABLE_NAME, col_schemas); + auto writer = std::make_shared(&wf, schema); + + // First batch: rows 0..29 + Tablet tablet1(TABLE_NAME, {"s0"}, {TSDataType::INT64}, + {ColumnCategory::FIELD}, 30); + for (int i = 0; i < 30; i++) { + tablet1.add_timestamp(i, static_cast(i)); + tablet1.add_value(i, 0, static_cast(i)); + } + writer->write_table(tablet1); + writer->flush(); + + // Second batch: rows 30..59 + Tablet tablet2(TABLE_NAME, {"s0"}, {TSDataType::INT64}, + {ColumnCategory::FIELD}, 30); + for (int i = 0; i < 30; i++) { + tablet2.add_timestamp(i, static_cast(30 + i)); + tablet2.add_value(i, 0, static_cast(30 + i)); + } + writer->write_table(tablet2); + writer->flush(); + writer->close(); + delete schema; + + // offset=25, limit=20 → rows 25..44 + TsFileReader reader; + ASSERT_EQ(reader.open(file_name_), E_OK); + ResultSet* rs = nullptr; + ASSERT_EQ(reader.queryByRow(TABLE_NAME, {"s0"}, 25, 20, rs), E_OK); + + int count = 0; + bool has_next = false; + while (IS_SUCC(rs->next(has_next)) && has_next) { + int64_t ts = rs->get_value(1); + EXPECT_EQ(ts, static_cast(25 + count)); + count++; + } + EXPECT_EQ(count, 20); + reader.destroy_query_data_set(rs); + reader.close(); +} diff --git a/cpp/test/reader/tree_view/tsfile_tree_reader_row_query_test.cc b/cpp/test/reader/tree_view/tsfile_tree_reader_row_query_test.cc new file mode 100644 index 000000000..bc1930204 --- /dev/null +++ b/cpp/test/reader/tree_view/tsfile_tree_reader_row_query_test.cc @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include + +#include +#include + +#include "common/config/config.h" +#include "common/record.h" +#include "common/schema.h" +#include "file/write_file.h" +#include "reader/result_set.h" +#include "reader/tsfile_tree_reader.h" +#include "writer/tsfile_tree_writer.h" + +using namespace storage; +using namespace common; + +// ───────────────────────────────────────────────────────────────────────────── +// Fixture +// ───────────────────────────────────────────────────────────────────────────── + +class TsFileTreeReaderRowQueryTest : public ::testing::Test { + protected: + static std::string generate_random_string(int length) { + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> dis(0, 61); + const std::string chars = + "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; + std::string s; + for (int i = 0; i < length; ++i) s += chars[dis(gen)]; + return s; + } + + void SetUp() override { + libtsfile_init(); + file_name_ = "tsfile_tree_row_query_test_" + + generate_random_string(10) + ".tsfile"; + remove(file_name_.c_str()); + } + + void TearDown() override { + remove(file_name_.c_str()); + libtsfile_destroy(); + } + + // Write `num_rows` records for each device in `device_ids`, + // using measurement `mea` (INT64). Timestamps are 0..num_rows-1, + // values are timestamp * 10. + void write_data(const std::vector& device_ids, + const std::string& mea, int num_rows) { + WriteFile wf; + int flags = O_WRONLY | O_CREAT | O_TRUNC; +#ifdef _WIN32 + flags |= O_BINARY; +#endif + wf.create(file_name_, flags, 0666); + TsFileTreeWriter writer(&wf); + for (auto dev : + device_ids) { // copy: register_timeseries needs non-const ref + auto* schema = new MeasurementSchema(mea, INT64); + writer.register_timeseries(dev, schema); + delete schema; + for (int i = 0; i < num_rows; i++) { + TsRecord record(dev, static_cast(i)); + record.add_point(mea, static_cast(i * 10)); + writer.write(record); + } + } + writer.flush(); + writer.close(); + } + + // Count the number of rows returned by queryByRow. + int count_rows(const std::string& file, + const std::vector& devs, const std::string& mea, + int offset, int limit) { + TsFileTreeReader reader; + EXPECT_EQ(reader.open(file), E_OK); + ResultSet* rs = nullptr; + EXPECT_EQ(reader.queryByRow(devs, {mea}, offset, limit, rs), E_OK); + EXPECT_NE(rs, nullptr); + int count = 0; + bool has_next = false; + while (IS_SUCC(rs->next(has_next)) && has_next) count++; + reader.destroy_query_data_set(rs); + reader.close(); + return count; + } + + std::string file_name_; + enum { TOTAL_ROWS = 50 }; + const std::string DEV = "device"; + const std::string MEA = "s1"; +}; + +// ───────────────────────────────────────────────────────────────────────────── +// Tests +// ───────────────────────────────────────────────────────────────────────────── + +// ① limit = 0: empty result +TEST_F(TsFileTreeReaderRowQueryTest, LimitZeroReturnsEmpty) { + write_data({DEV}, MEA, TOTAL_ROWS); + ASSERT_EQ(count_rows(file_name_, {DEV}, MEA, 0, 0), 0); +} + +// ② limit < total: only `limit` rows returned +TEST_F(TsFileTreeReaderRowQueryTest, LimitLessThanTotal) { + write_data({DEV}, MEA, TOTAL_ROWS); + ASSERT_EQ(count_rows(file_name_, {DEV}, MEA, 0, 20), 20); +} + +// ③ limit > total: all rows returned +TEST_F(TsFileTreeReaderRowQueryTest, LimitExceedsTotal) { + write_data({DEV}, MEA, TOTAL_ROWS); + ASSERT_EQ(count_rows(file_name_, {DEV}, MEA, 0, 1000), TOTAL_ROWS); +} + +// ④ limit < 0: unlimited, returns all rows +TEST_F(TsFileTreeReaderRowQueryTest, NegativeLimitMeansUnlimited) { + write_data({DEV}, MEA, TOTAL_ROWS); + ASSERT_EQ(count_rows(file_name_, {DEV}, MEA, 0, -1), TOTAL_ROWS); +} + +// ⑤ offset + limit in the middle of the data +TEST_F(TsFileTreeReaderRowQueryTest, OffsetPlusLimit) { + write_data({DEV}, MEA, TOTAL_ROWS); + // offset=10, limit=15 → should return exactly 15 rows + ASSERT_EQ(count_rows(file_name_, {DEV}, MEA, 10, 15), 15); +} + +// ⑥ offset >= total: empty result +TEST_F(TsFileTreeReaderRowQueryTest, OffsetBeyondTotal) { + write_data({DEV}, MEA, TOTAL_ROWS); + ASSERT_EQ(count_rows(file_name_, {DEV}, MEA, 1000, 10), 0); +} + +// ⑦ offset + limit > total: return remaining rows from offset +TEST_F(TsFileTreeReaderRowQueryTest, OffsetPlusLimitExceedsTotal) { + write_data({DEV}, MEA, TOTAL_ROWS); + // offset=40, limit=20 → only 10 rows remain + ASSERT_EQ(count_rows(file_name_, {DEV}, MEA, 40, 20), 10); +} + +// ⑧ Data correctness: verify timestamps start from `offset` +TEST_F(TsFileTreeReaderRowQueryTest, OffsetDataCorrectness) { + write_data({DEV}, MEA, TOTAL_ROWS); + TsFileTreeReader reader; + ASSERT_EQ(reader.open(file_name_), E_OK); + ResultSet* rs = nullptr; + ASSERT_EQ(reader.queryByRow({DEV}, {MEA}, 5, 10, rs), E_OK); + + int count = 0; + bool has_next = false; + while (IS_SUCC(rs->next(has_next)) && has_next) { + int64_t ts = rs->get_row_record()->get_timestamp(); + EXPECT_EQ(ts, static_cast(5 + count)); + int64_t val = rs->get_value(2); + EXPECT_EQ(val, (5 + count) * 10); + count++; + } + EXPECT_EQ(count, 10); + reader.destroy_query_data_set(rs); + reader.close(); +} + +// ⑨ Cross-chunk: small page size to force multiple chunks, verify correctness +TEST_F(TsFileTreeReaderRowQueryTest, CorrectnessAcrossChunks) { + int prev = g_config_value_.page_writer_max_point_num_; + g_config_value_.page_writer_max_point_num_ = 5; // tiny pages + + write_data({DEV}, MEA, 30); + + TsFileTreeReader reader; + ASSERT_EQ(reader.open(file_name_), E_OK); + ResultSet* rs = nullptr; + ASSERT_EQ(reader.queryByRow({DEV}, {MEA}, 5, 10, rs), E_OK); + + int count = 0; + bool has_next = false; + while (IS_SUCC(rs->next(has_next)) && has_next) { + int64_t ts = rs->get_row_record()->get_timestamp(); + EXPECT_EQ(ts, static_cast(5 + count)); + count++; + } + EXPECT_EQ(count, 10); + reader.destroy_query_data_set(rs); + reader.close(); + + g_config_value_.page_writer_max_point_num_ = prev; +} + +// ⑩ Multiple devices: verify total row count with offset/limit +TEST_F(TsFileTreeReaderRowQueryTest, MultipleDevicesOffsetLimit) { + // device_1 and device_2 each have 20 rows with timestamps 0..19. + // QDSWithoutTimeGenerator merges them by time: at each timestamp both + // devices' values appear in one RowRecord, so total distinct timestamps + // = 20. + write_data({"device_1", "device_2"}, MEA, 20); + + ASSERT_EQ(count_rows(file_name_, {"device_1", "device_2"}, MEA, 5, 10), 10); +} + +// ⑪ queryByRow result set supports metadata inspection +TEST_F(TsFileTreeReaderRowQueryTest, MetadataAccessible) { + write_data({DEV}, MEA, 10); + + TsFileTreeReader reader; + ASSERT_EQ(reader.open(file_name_), E_OK); + ResultSet* rs = nullptr; + ASSERT_EQ(reader.queryByRow({DEV}, {MEA}, 0, 5, rs), E_OK); + + auto meta = rs->get_metadata(); + ASSERT_NE(meta, nullptr); + // column 1 = "time", column 2 = measurement + EXPECT_EQ(meta->get_column_name(1), "time"); + EXPECT_EQ(meta->get_column_count(), 2u); + + reader.destroy_query_data_set(rs); + reader.close(); +} + +// ⑫ Paging consistency: two pages together equal the full result +TEST_F(TsFileTreeReaderRowQueryTest, PaginationConsistency) { + write_data({DEV}, MEA, 40); + + int page1 = count_rows(file_name_, {DEV}, MEA, 0, 20); + int page2 = count_rows(file_name_, {DEV}, MEA, 20, 20); + ASSERT_EQ(page1 + page2, 40); +} From e1d8657649f9f90f50ef39229fa1698a33f453d3 Mon Sep 17 00:00:00 2001 From: 761417898 <761417898@qq.com> Date: Wed, 11 Mar 2026 15:51:00 +0800 Subject: [PATCH 7/9] python query by row --- python/tsfile/tsfile_cpp.pxd | 14 ++++++ python/tsfile/tsfile_py_cpp.pxd | 4 ++ python/tsfile/tsfile_py_cpp.pyx | 75 +++++++++++++++++++++++++++++++++ python/tsfile/tsfile_reader.pyx | 48 +++++++++++++++++++++ 4 files changed, 141 insertions(+) diff --git a/python/tsfile/tsfile_cpp.pxd b/python/tsfile/tsfile_cpp.pxd index 9c65fb26f..c394c09bb 100644 --- a/python/tsfile/tsfile_cpp.pxd +++ b/python/tsfile/tsfile_cpp.pxd @@ -189,6 +189,20 @@ cdef extern from "./tsfile_cwrapper.h": char ** sensor_name, uint32_t sensor_num, int64_t start_time, int64_t end_time, ErrorCode *err_code) + ResultSet tsfile_reader_query_tree_by_row(TsFileReader reader, + char** device_ids, int device_ids_len, + char** measurement_names, + int measurement_names_len, + int offset, int limit, + ErrorCode* err_code) + + ResultSet tsfile_reader_query_table_by_row(TsFileReader reader, + const char* table_name, + char** column_names, + int column_names_len, + int offset, int limit, + ErrorCode* err_code) + TableSchema tsfile_reader_get_table_schema(TsFileReader reader, const char * table_name); diff --git a/python/tsfile/tsfile_py_cpp.pxd b/python/tsfile/tsfile_py_cpp.pxd index 2389aa9a6..16540effa 100644 --- a/python/tsfile/tsfile_py_cpp.pxd +++ b/python/tsfile/tsfile_py_cpp.pxd @@ -54,6 +54,10 @@ cdef public api ResultSet tsfile_reader_query_table_on_tree_c(TsFileReader reade int64_t start_time, int64_t end_time) cdef public api ResultSet tsfile_reader_query_paths_c(TsFileReader reader, object device_name, object sensor_list, int64_t start_time, int64_t end_time) +cdef public api ResultSet tsfile_reader_query_tree_by_row_c(TsFileReader reader, object device_ids, + object measurement_names, int offset, int limit) +cdef public api ResultSet tsfile_reader_query_table_by_row_c(TsFileReader reader, object table_name, + object column_names, int offset, int limit) cdef public api object get_table_schema(TsFileReader reader, object table_name) cdef public api object get_all_table_schema(TsFileReader reader) cdef public api object get_all_timeseries_schema(TsFileReader reader) diff --git a/python/tsfile/tsfile_py_cpp.pyx b/python/tsfile/tsfile_py_cpp.pyx index 3ca79a2a1..179d95c25 100644 --- a/python/tsfile/tsfile_py_cpp.pyx +++ b/python/tsfile/tsfile_py_cpp.pyx @@ -779,6 +779,81 @@ cdef ResultSet tsfile_reader_query_paths_c(TsFileReader reader, object device_na free( sensor_list_c) sensor_list_c = NULL +cdef ResultSet tsfile_reader_query_tree_by_row_c( + TsFileReader reader, object device_ids, object measurement_names, + int offset, int limit): + cdef int dev_num = len(device_ids) + cdef int mea_num = len(measurement_names) + cdef char** c_devs = malloc(sizeof(char*) * dev_num) + cdef char** c_meas = malloc(sizeof(char*) * mea_num) + cdef ErrorCode code = 0 + cdef int i + if c_devs == NULL or c_meas == NULL: + if c_devs != NULL: + free( c_devs) + if c_meas != NULL: + free( c_meas) + raise MemoryError("Failed to allocate memory for query arrays") + for i in range(dev_num): + c_devs[i] = NULL + for i in range(mea_num): + c_meas[i] = NULL + try: + for i in range(dev_num): + c_devs[i] = strdup(( device_ids[i]).encode('utf-8')) + if c_devs[i] == NULL: + raise MemoryError("Failed to allocate memory for device id") + for i in range(mea_num): + c_meas[i] = strdup(( measurement_names[i]).encode('utf-8')) + if c_meas[i] == NULL: + raise MemoryError("Failed to allocate memory for measurement name") + result = tsfile_reader_query_tree_by_row( + reader, c_devs, dev_num, c_meas, mea_num, offset, limit, &code) + check_error(code) + return result + finally: + for i in range(dev_num): + if c_devs[i] != NULL: + free( c_devs[i]) + c_devs[i] = NULL + free( c_devs) + for i in range(mea_num): + if c_meas[i] != NULL: + free( c_meas[i]) + c_meas[i] = NULL + free( c_meas) + + +cdef ResultSet tsfile_reader_query_table_by_row_c( + TsFileReader reader, object table_name, object column_names, + int offset, int limit): + cdef int col_num = len(column_names) + cdef bytes table_name_bytes = PyUnicode_AsUTF8String(table_name) + cdef const char* table_name_c = table_name_bytes + cdef char** c_cols = malloc(sizeof(char*) * col_num) + cdef ErrorCode code = 0 + cdef int i + if c_cols == NULL: + raise MemoryError("Failed to allocate memory for column names") + for i in range(col_num): + c_cols[i] = NULL + try: + for i in range(col_num): + c_cols[i] = strdup(( column_names[i]).encode('utf-8')) + if c_cols[i] == NULL: + raise MemoryError("Failed to allocate memory for column name") + result = tsfile_reader_query_table_by_row( + reader, table_name_c, c_cols, col_num, offset, limit, &code) + check_error(code) + return result + finally: + for i in range(col_num): + if c_cols[i] != NULL: + free( c_cols[i]) + c_cols[i] = NULL + free( c_cols) + + cdef object get_table_schema(TsFileReader reader, object table_name): cdef bytes table_name_bytes = PyUnicode_AsUTF8String(table_name) cdef const char * table_name_c = table_name_bytes diff --git a/python/tsfile/tsfile_reader.pyx b/python/tsfile/tsfile_reader.pyx index 4476d24dc..637444571 100644 --- a/python/tsfile/tsfile_reader.pyx +++ b/python/tsfile/tsfile_reader.pyx @@ -320,6 +320,54 @@ cdef class TsFileReaderPy: self.activate_result_set_list.add(pyresult) return pyresult + def query_tree_by_row(self, device_ids: List[str], measurement_names: List[str], + offset: int, limit: int) -> ResultSetPy: + """ + Query tree model data by row range. + + Internally queries the full time range and applies offset/limit at the + result-set level. Once ``limit`` rows are returned, no further data is + loaded from storage. + + :param device_ids: List of device identifiers to query. + :param measurement_names: List of measurement names to query. + :param offset: Number of leading rows to skip (>= 0). + :param limit: Maximum number of rows to return. < 0 means unlimited. + :return: ResultSet containing query results. + """ + cdef ResultSet result + result = tsfile_reader_query_tree_by_row_c( + self.reader, device_ids, measurement_names, offset, limit) + pyresult = ResultSetPy(self, True) + pyresult.init_c(result, device_ids[0] if device_ids else "") + self.activate_result_set_list.add(pyresult) + return pyresult + + def query_table_by_row(self, table_name: str, column_names: List[str], + offset: int, limit: int) -> ResultSetPy: + """ + Query table model data by row range. + + Internally queries the full time range and applies offset/limit at the + result-set level. Once ``limit`` rows are returned, no further data is + loaded from storage. + + :param table_name: Target table name. + :param column_names: List of column names to query. + :param offset: Number of leading rows to skip (>= 0). + :param limit: Maximum number of rows to return. < 0 means unlimited. + :return: ResultSet containing query results. + """ + cdef ResultSet result + result = tsfile_reader_query_table_by_row_c( + self.reader, table_name.lower(), + [column_name.lower() for column_name in column_names], + offset, limit) + pyresult = ResultSetPy(self) + pyresult.init_c(result, table_name) + self.activate_result_set_list.add(pyresult) + return pyresult + def notify_result_set_discard(self, result_set: ResultSetPy): """ Remove activate result set from activate_result_set_list, called when a result set close. From ce5adc6aa354f05f0f15e94003e21d43b5570d44 Mon Sep 17 00:00:00 2001 From: 761417898 <761417898@qq.com> Date: Wed, 11 Mar 2026 16:20:58 +0800 Subject: [PATCH 8/9] java query by row --- .../read/query/dataset/RowRangeResultSet.java | 184 ++++++++++++ .../read/v4/DeviceTableModelReader.java | 9 + .../apache/tsfile/read/v4/ITsFileReader.java | 20 ++ .../tsfile/read/v4/ITsFileTreeReader.java | 18 ++ .../tsfile/read/v4/TsFileTreeReader.java | 24 ++ .../DeviceTableModelReaderRowQueryTest.java | 283 ++++++++++++++++++ .../read/TsFileTreeReaderRowQueryTest.java | 215 +++++++++++++ 7 files changed, 753 insertions(+) create mode 100644 java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/RowRangeResultSet.java create mode 100644 java/tsfile/src/test/java/org/apache/tsfile/read/DeviceTableModelReaderRowQueryTest.java create mode 100644 java/tsfile/src/test/java/org/apache/tsfile/read/TsFileTreeReaderRowQueryTest.java diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/RowRangeResultSet.java b/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/RowRangeResultSet.java new file mode 100644 index 000000000..eb077efdd --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/RowRangeResultSet.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.read.query.dataset; + +import org.apache.tsfile.write.record.TSRecord; + +import java.io.IOException; +import java.time.LocalDate; +import java.util.Iterator; + +/** + * A {@link ResultSet} wrapper that applies row-level offset and limit. + * + *

Takes the inner ResultSet and releases it on {@link #close()}. Once the limit is reached, + * {@link #next()} returns {@code false} immediately without calling the underlying ResultSet, + * avoiding unnecessary data loading. + * + * @param offset Number of leading rows to skip (must be >= 0). + * @param limit Maximum number of rows to return. A value < 0 means no limit. + */ +public class RowRangeResultSet implements ResultSet { + + private final ResultSet inner; + private final int offset; + private final int limit; + private int returnedCount; + private boolean offsetSkipped; + + public RowRangeResultSet(ResultSet inner, int offset, int limit) { + this.inner = inner; + this.offset = Math.max(0, offset); + this.limit = limit; + this.returnedCount = 0; + this.offsetSkipped = false; + } + + @Override + public ResultSetMetadata getMetadata() { + return inner.getMetadata(); + } + + @Override + public boolean next() throws IOException { + // Skip the first `offset` rows on the first call. + if (!offsetSkipped) { + for (int i = 0; i < offset; i++) { + if (!inner.next()) { + offsetSkipped = true; + return false; + } + } + offsetSkipped = true; + } + + // Limit reached: return false without touching inner ResultSet. + // This is the key "pushdown" effect: no further chunk/page loading occurs. + if (limit >= 0 && returnedCount >= limit) { + return false; + } + + boolean hasNext = inner.next(); + if (hasNext) { + returnedCount++; + } + return hasNext; + } + + @Override + public int getInt(String columnName) { + return inner.getInt(columnName); + } + + @Override + public int getInt(int columnIndex) { + return inner.getInt(columnIndex); + } + + @Override + public long getLong(String columnName) { + return inner.getLong(columnName); + } + + @Override + public long getLong(int columnIndex) { + return inner.getLong(columnIndex); + } + + @Override + public float getFloat(String columnName) { + return inner.getFloat(columnName); + } + + @Override + public float getFloat(int columnIndex) { + return inner.getFloat(columnIndex); + } + + @Override + public double getDouble(String columnName) { + return inner.getDouble(columnName); + } + + @Override + public double getDouble(int columnIndex) { + return inner.getDouble(columnIndex); + } + + @Override + public boolean getBoolean(String columnName) { + return inner.getBoolean(columnName); + } + + @Override + public boolean getBoolean(int columnIndex) { + return inner.getBoolean(columnIndex); + } + + @Override + public String getString(String columnName) { + return inner.getString(columnName); + } + + @Override + public String getString(int columnIndex) { + return inner.getString(columnIndex); + } + + @Override + public LocalDate getDate(String columnName) { + return inner.getDate(columnName); + } + + @Override + public LocalDate getDate(int columnIndex) { + return inner.getDate(columnIndex); + } + + @Override + public byte[] getBinary(String columnName) { + return inner.getBinary(columnName); + } + + @Override + public byte[] getBinary(int columnIndex) { + return inner.getBinary(columnIndex); + } + + @Override + public boolean isNull(String columnName) { + return inner.isNull(columnName); + } + + @Override + public boolean isNull(int columnIndex) { + return inner.isNull(columnIndex); + } + + @Override + public void close() { + inner.close(); + } + + @Override + public Iterator iterator() { + return inner.iterator(); + } +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/DeviceTableModelReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/DeviceTableModelReader.java index 927ac8954..72e3a4a0a 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/DeviceTableModelReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/DeviceTableModelReader.java @@ -33,6 +33,7 @@ import org.apache.tsfile.read.expression.ExpressionTree; import org.apache.tsfile.read.filter.basic.Filter; import org.apache.tsfile.read.query.dataset.ResultSet; +import org.apache.tsfile.read.query.dataset.RowRangeResultSet; import org.apache.tsfile.read.query.dataset.TableResultSet; import org.apache.tsfile.read.query.executor.TableQueryExecutor; import org.apache.tsfile.read.reader.block.TsBlockReader; @@ -113,6 +114,14 @@ public ResultSet query( return new TableResultSet(tsBlockReader, columnNames, dataTypeList, tableName); } + @TsFileApi + @Override + public ResultSet queryByRow(String tableName, List columnNames, int offset, int limit) + throws ReadProcessException, IOException, NoTableException, NoMeasurementException { + ResultSet inner = query(tableName, columnNames, Long.MIN_VALUE, Long.MAX_VALUE); + return new RowRangeResultSet(inner, offset, limit); + } + @Override public void close() { try { diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileReader.java index 995f73f64..bf29ffcee 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileReader.java @@ -48,6 +48,26 @@ ResultSet query( @TsFileApi List getAllTableSchema() throws IOException; + /** + * Query table model data by row range. + * + *

Internally queries the full time range and applies offset/limit at the result-set level. + * Once {@code limit} rows are returned, no further data is loaded from storage. + * + * @param tableName target table name + * @param columnNames list of column names to query + * @param offset number of leading rows to skip (>= 0) + * @param limit maximum number of rows to return; < 0 means unlimited + * @return a {@link ResultSet} containing the query results + * @throws ReadProcessException if a read processing error occurs + * @throws IOException if an I/O error occurs + * @throws NoTableException if the table does not exist + * @throws NoMeasurementException if a column does not exist + */ + @TsFileApi + ResultSet queryByRow(String tableName, List columnNames, int offset, int limit) + throws ReadProcessException, IOException, NoTableException, NoMeasurementException; + @TsFileApi void close(); } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileTreeReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileTreeReader.java index 202553ea8..40ccbad70 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileTreeReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileTreeReader.java @@ -47,6 +47,24 @@ ResultSet query( @TreeModel List getDeviceSchema(String deviceId) throws IOException; + /** + * Query tree model data by row range. + * + *

Internally queries the full time range and applies offset/limit at the result-set level. + * Once {@code limit} rows are returned, no further data is loaded from storage. + * + * @param deviceIds list of device identifiers to query + * @param measurementNames list of measurement names to query + * @param offset number of leading rows to skip (>= 0) + * @param limit maximum number of rows to return; < 0 means unlimited + * @return a {@link ResultSet} containing the query results + * @throws IOException if an I/O error occurs during query execution + */ + @TsFileApi + @TreeModel + ResultSet queryByRow(List deviceIds, List measurementNames, int offset, int limit) + throws IOException; + /** Close underlying resources. */ @TsFileApi @TreeModel diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/TsFileTreeReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/TsFileTreeReader.java index fe7a0c35b..c8b659b1b 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/TsFileTreeReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/TsFileTreeReader.java @@ -30,6 +30,7 @@ import org.apache.tsfile.read.filter.operator.TimeFilterOperators; import org.apache.tsfile.read.query.dataset.QueryDataSet; import org.apache.tsfile.read.query.dataset.ResultSet; +import org.apache.tsfile.read.query.dataset.RowRangeResultSet; import org.apache.tsfile.read.query.dataset.TreeResultSet; import org.apache.tsfile.write.schema.MeasurementSchema; @@ -113,6 +114,29 @@ public List getDeviceSchema(String deviceId) throws IOExcepti return tsfileReader.getMeasurement(new StringArrayDeviceID(deviceId)); } + /** + * Query tree model data by row range. + * + *

Internally queries the full time range and applies offset/limit at the result-set level. + * Once {@code limit} rows are returned, no further data is loaded from storage. + * + * @param deviceIds list of device identifiers to query + * @param measurementNames list of measurement names to query + * @param offset number of leading rows to skip (>= 0) + * @param limit maximum number of rows to return; < 0 means unlimited + * @return a {@link ResultSet} containing the query results + * @throws IOException if an I/O error occurs during query execution + */ + @TsFileApi + @TreeModel + @Override + public ResultSet queryByRow( + List deviceIds, List measurementNames, int offset, int limit) + throws IOException { + ResultSet inner = query(deviceIds, measurementNames, Long.MIN_VALUE, Long.MAX_VALUE); + return new RowRangeResultSet(inner, offset, limit); + } + /** * Close the TsFileTreeReader and release resources. * diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/DeviceTableModelReaderRowQueryTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/DeviceTableModelReaderRowQueryTest.java new file mode 100644 index 000000000..54671eec5 --- /dev/null +++ b/java/tsfile/src/test/java/org/apache/tsfile/read/DeviceTableModelReaderRowQueryTest.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.read; + +import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.read.ReadProcessException; +import org.apache.tsfile.exception.write.NoMeasurementException; +import org.apache.tsfile.exception.write.NoTableException; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.read.query.dataset.ResultSet; +import org.apache.tsfile.read.v4.DeviceTableModelReader; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.v4.ITsFileWriter; +import org.apache.tsfile.write.v4.TsFileWriterBuilder; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +/** + * Unit tests for {@link DeviceTableModelReader#queryByRow} (table-model row-range query). + * + *

The test table has {@value #TOTAL} rows with timestamps 0..TOTAL-1 and field values equal to + * their timestamps. + */ +public class DeviceTableModelReaderRowQueryTest { + + private static final String FILE_PATH = "test_table_reader_row_query.tsfile"; + private static final int TOTAL = 50; + private static final String TABLE = "t1"; + private static final String FIELD = "s0"; + + private DeviceTableModelReader reader; + + @Before + public void setUp() throws IOException, WriteProcessException { + writeTableFile(FILE_PATH, TABLE, FIELD, TOTAL); + reader = new DeviceTableModelReader(new File(FILE_PATH)); + } + + @After + public void tearDown() { + if (reader != null) { + reader.close(); + } + new File(FILE_PATH).delete(); + } + + // ① limit=0 → empty result + @Test + public void testLimitZeroReturnsEmpty() + throws ReadProcessException, IOException, NoTableException, NoMeasurementException { + Assert.assertEquals(0, countRows(0, 0)); + } + + // ② limit < total → exactly `limit` rows + @Test + public void testLimitLessThanTotal() + throws ReadProcessException, IOException, NoTableException, NoMeasurementException { + Assert.assertEquals(10, countRows(0, 10)); + } + + // ③ limit > total → all rows + @Test + public void testLimitExceedsTotal() + throws ReadProcessException, IOException, NoTableException, NoMeasurementException { + Assert.assertEquals(TOTAL, countRows(0, 9999)); + } + + // ④ limit=-1 → unlimited, returns all rows + @Test + public void testNegativeLimitMeansUnlimited() + throws ReadProcessException, IOException, NoTableException, NoMeasurementException { + Assert.assertEquals(TOTAL, countRows(0, -1)); + } + + // ⑤ offset + limit in the middle + @Test + public void testOffsetPlusLimit() + throws ReadProcessException, IOException, NoTableException, NoMeasurementException { + Assert.assertEquals(15, countRows(10, 15)); + } + + // ⑥ offset >= total → empty result + @Test + public void testOffsetBeyondTotal() + throws ReadProcessException, IOException, NoTableException, NoMeasurementException { + Assert.assertEquals(0, countRows(1000, 10)); + } + + // ⑦ offset + limit > total → return remaining rows from offset + @Test + public void testOffsetPlusLimitExceedsTotal() + throws ReadProcessException, IOException, NoTableException, NoMeasurementException { + // offset=40, limit=20 → only 10 rows remain + Assert.assertEquals(10, countRows(40, 20)); + } + + // ⑧ data correctness: timestamps and values start from `offset` + @Test + public void testDataCorrectness() + throws ReadProcessException, IOException, NoTableException, NoMeasurementException { + List columns = Collections.singletonList(FIELD); + ResultSet rs = reader.queryByRow(TABLE, columns, 5, 10); + int count = 0; + while (rs.next()) { + long ts = rs.getLong(1); // column 1 = Time + long val = rs.getLong(2); // column 2 = s0 + Assert.assertEquals(5 + count, ts); + Assert.assertEquals(5 + count, val); + count++; + } + rs.close(); + Assert.assertEquals(10, count); + } + + // ⑨ metadata is accessible via the result set + @Test + public void testMetadataAccessible() + throws ReadProcessException, IOException, NoTableException, NoMeasurementException { + List columns = Collections.singletonList(FIELD); + ResultSet rs = reader.queryByRow(TABLE, columns, 0, 5); + Assert.assertNotNull(rs.getMetadata()); + Assert.assertEquals("Time", rs.getMetadata().getColumnName(1)); + Assert.assertEquals(FIELD, rs.getMetadata().getColumnName(2)); + rs.close(); + } + + // ⑩ paging consistency: two pages together equal the full result + @Test + public void testPaginationConsistency() + throws ReadProcessException, IOException, NoTableException, NoMeasurementException { + int page1 = countRows(0, 25); + int page2 = countRows(25, 25); + Assert.assertEquals(TOTAL, page1 + page2); + } + + // ⑪ multiple chunks: offset/limit spans chunk boundary correctly + @Test + public void testMultipleChunksCorrectness() + throws IOException, + WriteProcessException, + ReadProcessException, + NoTableException, + NoMeasurementException { + String filePath = "test_table_reader_row_query_multi_chunk.tsfile"; + writeTableFileMultiChunk(filePath, TABLE, FIELD, 30, 30); + try (DeviceTableModelReader r = new DeviceTableModelReader(new File(filePath))) { + // offset=25, limit=20 → rows 25..44 + List columns = Collections.singletonList(FIELD); + ResultSet rs = r.queryByRow(TABLE, columns, 25, 20); + int count = 0; + while (rs.next()) { + long ts = rs.getLong(1); + Assert.assertEquals(25 + count, ts); + count++; + } + rs.close(); + Assert.assertEquals(20, count); + } finally { + new File(filePath).delete(); + } + } + + // ───────────────────────────────────────────────────────────────────────────── + // Helpers + // ───────────────────────────────────────────────────────────────────────────── + + private int countRows(int offset, int limit) + throws ReadProcessException, IOException, NoTableException, NoMeasurementException { + List columns = Collections.singletonList(FIELD); + ResultSet rs = reader.queryByRow(TABLE, columns, offset, limit); + int count = 0; + while (rs.next()) { + count++; + } + rs.close(); + return count; + } + + /** + * Write a single-chunk table file with {@code numRows} rows. Timestamps are 0..numRows-1 and + * field values equal their timestamps. + */ + private static void writeTableFile( + String filePath, String tableName, String fieldName, int numRows) + throws IOException, WriteProcessException { + TableSchema tableSchema = + new TableSchema( + tableName, + Collections.singletonList(new MeasurementSchema(fieldName, TSDataType.INT64)), + Collections.singletonList(ColumnCategory.FIELD)); + try (ITsFileWriter writer = + new TsFileWriterBuilder().file(new File(filePath)).tableSchema(tableSchema).build()) { + Tablet tablet = + new Tablet( + tableName, + Collections.singletonList(fieldName), + Collections.singletonList(TSDataType.INT64), + Collections.singletonList(ColumnCategory.FIELD), + numRows); + for (int i = 0; i < numRows; i++) { + tablet.addTimestamp(i, i); + tablet.addValue(fieldName, i, (long) i); + } + writer.write(tablet); + } + } + + /** + * Write a two-chunk table file by using a tiny memory threshold to force a flush between the two + * tablets. First chunk has rows 0..chunk1Rows-1, second chunk has rows + * chunk1Rows..chunk1Rows+chunk2Rows-1. Field values equal their timestamps. + */ + private static void writeTableFileMultiChunk( + String filePath, String tableName, String fieldName, int chunk1Rows, int chunk2Rows) + throws IOException, WriteProcessException { + TableSchema tableSchema = + new TableSchema( + tableName, + Collections.singletonList(new MeasurementSchema(fieldName, TSDataType.INT64)), + Collections.singletonList(ColumnCategory.FIELD)); + // memoryThreshold(1) forces a flush after every write, producing multiple chunks. + try (ITsFileWriter writer = + new TsFileWriterBuilder() + .file(new File(filePath)) + .tableSchema(tableSchema) + .memoryThreshold(1) + .build()) { + Tablet tablet1 = + new Tablet( + tableName, + Collections.singletonList(fieldName), + Collections.singletonList(TSDataType.INT64), + Collections.singletonList(ColumnCategory.FIELD), + chunk1Rows); + for (int i = 0; i < chunk1Rows; i++) { + tablet1.addTimestamp(i, i); + tablet1.addValue(fieldName, i, (long) i); + } + writer.write(tablet1); + + Tablet tablet2 = + new Tablet( + tableName, + Collections.singletonList(fieldName), + Collections.singletonList(TSDataType.INT64), + Collections.singletonList(ColumnCategory.FIELD), + chunk2Rows); + for (int i = 0; i < chunk2Rows; i++) { + tablet2.addTimestamp(i, chunk1Rows + i); + tablet2.addValue(fieldName, i, (long) (chunk1Rows + i)); + } + writer.write(tablet2); + } + } +} diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileTreeReaderRowQueryTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileTreeReaderRowQueryTest.java new file mode 100644 index 000000000..e6a9b4850 --- /dev/null +++ b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileTreeReaderRowQueryTest.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.read; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.read.query.dataset.ResultSet; +import org.apache.tsfile.read.v4.ITsFileTreeReader; +import org.apache.tsfile.read.v4.TsFileTreeReaderBuilder; +import org.apache.tsfile.write.record.TSRecord; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.v4.TsFileTreeWriter; +import org.apache.tsfile.write.v4.TsFileTreeWriterBuilder; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * Unit tests for {@link ITsFileTreeReader#queryByRow} (tree-model row-range query). + * + *

Each device has {@value #TOTAL} rows with timestamps 0..TOTAL-1 and values timestamp * 10. + */ +public class TsFileTreeReaderRowQueryTest { + + private static final String FILE_PATH = "test_tree_reader_row_query.tsfile"; + private static final int TOTAL = 50; + private static final String DEVICE = "device"; + private static final String MEA = "s1"; + + private ITsFileTreeReader reader; + + @Before + public void setUp() throws IOException, WriteProcessException { + writeTreeFile(FILE_PATH, Collections.singletonList(DEVICE), MEA, TOTAL); + reader = new TsFileTreeReaderBuilder().file(new File(FILE_PATH)).build(); + } + + @After + public void tearDown() throws IOException { + if (reader != null) { + reader.close(); + } + new File(FILE_PATH).delete(); + } + + // ① limit=0 → empty result + @Test + public void testLimitZeroReturnsEmpty() throws IOException { + Assert.assertEquals(0, countRows(reader, DEVICE, MEA, 0, 0)); + } + + // ② limit < total → exactly `limit` rows + @Test + public void testLimitLessThanTotal() throws IOException { + Assert.assertEquals(20, countRows(reader, DEVICE, MEA, 0, 20)); + } + + // ③ limit > total → all rows + @Test + public void testLimitExceedsTotal() throws IOException { + Assert.assertEquals(TOTAL, countRows(reader, DEVICE, MEA, 0, 1000)); + } + + // ④ limit=-1 → unlimited, returns all rows + @Test + public void testNegativeLimitMeansUnlimited() throws IOException { + Assert.assertEquals(TOTAL, countRows(reader, DEVICE, MEA, 0, -1)); + } + + // ⑤ offset + limit in the middle + @Test + public void testOffsetPlusLimit() throws IOException { + Assert.assertEquals(15, countRows(reader, DEVICE, MEA, 10, 15)); + } + + // ⑥ offset >= total → empty result + @Test + public void testOffsetBeyondTotal() throws IOException { + Assert.assertEquals(0, countRows(reader, DEVICE, MEA, 1000, 10)); + } + + // ⑦ offset + limit > total → return remaining rows from offset + @Test + public void testOffsetPlusLimitExceedsTotal() throws IOException { + // offset=40, limit=20 → only 10 rows remain + Assert.assertEquals(10, countRows(reader, DEVICE, MEA, 40, 20)); + } + + // ⑧ data correctness: timestamps and values start from `offset` + @Test + public void testDataCorrectness() throws IOException { + List deviceIds = Collections.singletonList(DEVICE); + List measurements = Collections.singletonList(MEA); + ResultSet rs = reader.queryByRow(deviceIds, measurements, 5, 10); + int count = 0; + while (rs.next()) { + long ts = rs.getLong(1); // column 1 = Time + long val = rs.getLong(2); // column 2 = measurement + Assert.assertEquals(5 + count, ts); + Assert.assertEquals((5 + count) * 10L, val); + count++; + } + rs.close(); + Assert.assertEquals(10, count); + } + + // ⑨ metadata is accessible via the result set + @Test + public void testMetadataAccessible() throws IOException { + List deviceIds = Collections.singletonList(DEVICE); + List measurements = Collections.singletonList(MEA); + ResultSet rs = reader.queryByRow(deviceIds, measurements, 0, 5); + Assert.assertNotNull(rs.getMetadata()); + // Column 1 is always "Time"; column 2 is the full path "device.s1" + Assert.assertEquals("Time", rs.getMetadata().getColumnName(1)); + Assert.assertEquals(DEVICE + "." + MEA, rs.getMetadata().getColumnName(2)); + rs.close(); + } + + // ⑩ paging consistency: two pages together equal the full result + @Test + public void testPaginationConsistency() throws IOException { + int page1 = countRows(reader, DEVICE, MEA, 0, 25); + int page2 = countRows(reader, DEVICE, MEA, 25, 25); + Assert.assertEquals(TOTAL, page1 + page2); + } + + // ⑪ multiple devices: offset/limit applied to merged result + @Test + public void testMultipleDevices() throws IOException, WriteProcessException { + String filePath = "test_tree_reader_row_query_multi.tsfile"; + writeTreeFile(filePath, Arrays.asList("dev1", "dev2"), MEA, 20); + try (ITsFileTreeReader r = new TsFileTreeReaderBuilder().file(new File(filePath)).build()) { + int count = countRows(r, null, MEA, 5, 10); + Assert.assertEquals(10, count); + } finally { + new File(filePath).delete(); + } + } + + // ───────────────────────────────────────────────────────────────────────────── + // Helpers + // ───────────────────────────────────────────────────────────────────────────── + + /** + * Count rows returned by {@code queryByRow}. + * + * @param r reader (already open) + * @param device device ID; if {@code null}, both "dev1" and "dev2" are queried + * @param mea measurement name + * @param offset row offset + * @param limit row limit + */ + private int countRows(ITsFileTreeReader r, String device, String mea, int offset, int limit) + throws IOException { + List deviceIds = + device != null ? Collections.singletonList(device) : Arrays.asList("dev1", "dev2"); + List measurements = Collections.singletonList(mea); + ResultSet rs = r.queryByRow(deviceIds, measurements, offset, limit); + int count = 0; + while (rs.next()) { + count++; + } + rs.close(); + return count; + } + + /** + * Write a tree-model TsFile with the given devices and measurement. Timestamps are 0..numRows-1, + * values are timestamp * 10 (INT64). + */ + private static void writeTreeFile( + String filePath, List deviceIds, String measurement, int numRows) + throws IOException, WriteProcessException { + File file = new File(filePath); + MeasurementSchema schema = new MeasurementSchema(measurement, TSDataType.INT64); + try (TsFileTreeWriter writer = new TsFileTreeWriterBuilder().file(file).build()) { + for (String deviceId : deviceIds) { + writer.registerTimeseries(deviceId, schema); + } + for (String deviceId : deviceIds) { + for (int i = 0; i < numRows; i++) { + TSRecord record = new TSRecord(deviceId, i); + record.addPoint(measurement, (long) i * 10); + writer.write(record); + } + } + } + } +} From 98a0bec549166e7d8f2dd1f9840dc19fe18a2825 Mon Sep 17 00:00:00 2001 From: 761417898 <761417898@qq.com> Date: Wed, 11 Mar 2026 16:55:19 +0800 Subject: [PATCH 9/9] mvn spotless:apply --- cpp/src/reader/aligned_chunk_reader.cc | 3 +++ cpp/src/writer/tsfile_writer.cc | 11 ++++++----- cpp/src/writer/value_page_writer.h | 3 ++- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/cpp/src/reader/aligned_chunk_reader.cc b/cpp/src/reader/aligned_chunk_reader.cc index 60d9c819c..14250e7f8 100644 --- a/cpp/src/reader/aligned_chunk_reader.cc +++ b/cpp/src/reader/aligned_chunk_reader.cc @@ -550,6 +550,7 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock( row_appender.append_null(1); \ continue; \ } \ + assert(value_decoder_->has_remaining(value_in)); \ if (!value_decoder_->has_remaining(value_in)) { \ return common::E_DATA_INCONSISTENCY; \ } \ @@ -596,6 +597,7 @@ int AlignedChunkReader::i32_DECODE_TYPED_TV_INTO_TSBLOCK( row_appender.append_null(1); continue; } + assert(value_decoder_->has_remaining(value_in)); if (!value_decoder_->has_remaining(value_in)) { return common::E_DATA_INCONSISTENCY; } @@ -681,6 +683,7 @@ int AlignedChunkReader::STRING_DECODE_TYPED_TV_INTO_TSBLOCK( } if (should_read_data) { + assert(value_decoder_->has_remaining(value_in)); if (!value_decoder_->has_remaining(value_in)) { return E_DATA_INCONSISTENCY; } diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc index 52db8ea48..856e19698 100644 --- a/cpp/src/writer/tsfile_writer.cc +++ b/cpp/src/writer/tsfile_writer.cc @@ -809,7 +809,8 @@ int TsFileWriter::write_table(Tablet& tablet) { return ret; } // Row-by-row write so that when time page seals (e.g. by memory - // threshold), we can seal all value pages together (Java semantics). + // threshold), we can seal all value pages together (Java + // semantics). for (int i = start_idx; i < end_idx; i++) { int32_t time_pages_before = time_chunk_writer->num_of_pages(); if (RET_FAIL(time_chunk_writer->write(tablet.timestamps_[i]))) { @@ -822,9 +823,8 @@ int TsFileWriter::write_table(Tablet& tablet) { ValueChunkWriter* value_chunk_writer = value_chunk_writers[field_col_count]; if (!IS_NULL(value_chunk_writer) && - RET_FAIL(value_write_column(value_chunk_writer, - tablet, col, i, - i + 1))) { + RET_FAIL(value_write_column( + value_chunk_writer, tablet, col, i, i + 1))) { return ret; } field_col_count++; @@ -835,7 +835,8 @@ int TsFileWriter::write_table(Tablet& tablet) { for (uint32_t k = 0; k < value_chunk_writers.size(); k++) { if (!IS_NULL(value_chunk_writers[k]) && value_chunk_writers[k]->has_current_page_data() && - RET_FAIL(value_chunk_writers[k]->seal_current_page())) { + RET_FAIL( + value_chunk_writers[k]->seal_current_page())) { return ret; } } diff --git a/cpp/src/writer/value_page_writer.h b/cpp/src/writer/value_page_writer.h index a0a6839c6..9057102cb 100644 --- a/cpp/src/writer/value_page_writer.h +++ b/cpp/src/writer/value_page_writer.h @@ -63,7 +63,8 @@ struct ValuePageData { compressor_ = nullptr; } - /** Clear pointers without freeing (transfer ownership to another holder). */ + /** Clear pointers without freeing (transfer ownership to another holder). + */ void clear() { col_notnull_bitmap_buf_size_ = 0; value_buf_size_ = 0;