diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc
index fbcf4e6f1..1bafd80aa 100644
--- a/cpp/src/cwrapper/tsfile_cwrapper.cc
+++ b/cpp/src/cwrapper/tsfile_cwrapper.cc
@@ -24,10 +24,12 @@
 #include
 #include
+#include <cstdint>
 #include

 #include "common/tablet.h"
 #include "reader/result_set.h"
+#include "reader/row_range_result_set.h"
 #include "reader/tsfile_reader.h"
 #include "writer/tsfile_writer.h"

@@ -770,6 +772,44 @@ ERRNO _tsfile_writer_flush(TsFileWriter writer) {
     return w->flush();
 }

+ResultSet tsfile_reader_query_tree_by_row(TsFileReader reader,
+                                          char** device_ids, int device_ids_len,
+                                          char** measurement_names,
+                                          int measurement_names_len, int offset,
+                                          int limit, ERRNO* err_code) {
+    auto* r = static_cast<storage::TsFileReader*>(reader);
+    std::vector<std::string> path_list;
+    path_list.reserve(device_ids_len * measurement_names_len);
+    for (int i = 0; i < device_ids_len; i++) {
+        for (int j = 0; j < measurement_names_len; j++) {
+            path_list.push_back(std::string(device_ids[i]) + "." +
+                                std::string(measurement_names[j]));
+        }
+    }
+    storage::ResultSet* inner = nullptr;
+    *err_code = r->query(path_list, INT64_MIN, INT64_MAX, inner);
+    if (*err_code != common::E_OK) return nullptr;
+    return new storage::RowRangeResultSet(inner, offset, limit);
+}
+
+ResultSet tsfile_reader_query_table_by_row(TsFileReader reader,
+                                           const char* table_name,
+                                           char** column_names,
+                                           int column_names_len, int offset,
+                                           int limit, ERRNO* err_code) {
+    auto* r = static_cast<storage::TsFileReader*>(reader);
+    std::vector<std::string> cols;
+    cols.reserve(column_names_len);
+    for (int i = 0; i < column_names_len; i++) {
+        cols.emplace_back(column_names[i]);
+    }
+    storage::ResultSet* inner = nullptr;
+    *err_code =
+        r->query(std::string(table_name), cols, INT64_MIN, INT64_MAX, inner);
+    if (*err_code != common::E_OK) return nullptr;
+    return new storage::RowRangeResultSet(inner, offset, limit);
+}
+
 ResultSet _tsfile_reader_query_device(TsFileReader reader,
                                       const char* device_name,
                                       char** sensor_name, uint32_t sensor_num,
diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h b/cpp/src/cwrapper/tsfile_cwrapper.h
index 643b4e52b..2f317fb90 100644
--- a/cpp/src/cwrapper/tsfile_cwrapper.h
+++ b/cpp/src/cwrapper/tsfile_cwrapper.h
@@ -444,6 +444,52 @@ ResultSet tsfile_query_table(TsFileReader reader, const char* table_name,
 ResultSet tsfile_query_table_on_tree(TsFileReader reader, char** columns,
                                      uint32_t column_num, Timestamp start_time,
                                      Timestamp end_time, ERRNO* err_code);
+
+/**
+ * @brief Query tree model time series data by row range (offset + limit).
+ *
+ * Combines all device/measurement paths, queries the full time range, then
+ * skips the first `offset` rows and returns at most `limit` rows. Once
+ * `limit` rows have been returned, underlying data loading stops immediately.
+ *
+ * @param reader [in] Valid TsFileReader handle.
+ * @param device_ids [in] Array of device IDs.
+ * @param device_ids_len [in] Number of device IDs.
+ * @param measurement_names [in] Array of measurement names.
+ * @param measurement_names_len [in] Number of measurement names.
+ * @param offset [in] Rows to skip (>= 0).
+ * @param limit [in] Max rows to return (< 0 = unlimited).
+ * @param err_code [out] Error code.
+ * @return ResultSet handle. Must be freed with free_tsfile_result_set().
+ */
+ResultSet tsfile_reader_query_tree_by_row(TsFileReader reader,
+                                          char** device_ids, int device_ids_len,
+                                          char** measurement_names,
+                                          int measurement_names_len, int offset,
+                                          int limit, ERRNO* err_code);
+
+/**
+ * @brief Query table model data by row range (offset + limit).
+ *
+ * Queries the full time range, skips the first `offset` rows, and returns at
+ * most `limit` rows. Once `limit` rows have been returned, underlying data
+ * loading stops immediately.
+ *
+ * @param reader [in] Valid TsFileReader handle.
+ * @param table_name [in] Target table name.
+ * @param column_names [in] Array of column names.
+ * @param column_names_len [in] Number of column names.
+ * @param offset [in] Rows to skip (>= 0).
+ * @param limit [in] Max rows to return (< 0 = unlimited).
+ * @param err_code [out] Error code.
+ * @return ResultSet handle. Must be freed with free_tsfile_result_set().
+ */
+ResultSet tsfile_reader_query_table_by_row(TsFileReader reader,
+                                           const char* table_name,
+                                           char** column_names,
+                                           int column_names_len, int offset,
+                                           int limit, ERRNO* err_code);
+
 // ResultSet tsfile_reader_query_device(TsFileReader reader,
 //                                      const char* device_name,
 //                                      char** sensor_name, uint32_t sensor_num,
diff --git a/cpp/src/reader/row_range_result_set.cc b/cpp/src/reader/row_range_result_set.cc
new file mode 100644
index 000000000..9d8b8823d
--- /dev/null
+++ b/cpp/src/reader/row_range_result_set.cc
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "reader/row_range_result_set.h"
+
+namespace storage {
+
+RowRangeResultSet::RowRangeResultSet(ResultSet* inner, int offset, int limit)
+    : inner_(inner),
+      offset_(offset < 0 ? 0 : offset),
+      limit_(limit),
+      returned_count_(0),
+      offset_skipped_(false) {}
+
+RowRangeResultSet::~RowRangeResultSet() { close(); }
+
+int RowRangeResultSet::next(bool& has_next) {
+    // ① Skip the first `offset_` rows on the first call.
+    if (!offset_skipped_) {
+        for (int i = 0; i < offset_; i++) {
+            int ret = inner_->next(has_next);
+            if (ret != common::E_OK) return ret;
+            if (!has_next) {
+                has_next = false;
+                offset_skipped_ = true;
+                return common::E_OK;
+            }
+        }
+        offset_skipped_ = true;
+    }
+
+    // ② Limit reached: return immediately without touching the inner
+    // ResultSet. This is the key "pushdown" effect: no further chunk/page
+    // loading occurs.
+    if (limit_ >= 0 && returned_count_ >= limit_) {
+        has_next = false;
+        return common::E_OK;
+    }
+
+    // ③ Normal delegation.
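+    // Every row the inner result set yields from here on consumes one unit
+    // of the remaining limit; returned_count_ only advances on success.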
+    int ret = inner_->next(has_next);
+    if (ret == common::E_OK && has_next) {
+        returned_count_++;
+    }
+    return ret;
+}
+
+bool RowRangeResultSet::is_null(const std::string& column_name) {
+    return inner_->is_null(column_name);
+}
+
+bool RowRangeResultSet::is_null(uint32_t column_index) {
+    return inner_->is_null(column_index);
+}
+
+RowRecord* RowRangeResultSet::get_row_record() {
+    return inner_->get_row_record();
+}
+
+std::shared_ptr<ResultSetMetadata> RowRangeResultSet::get_metadata() {
+    return inner_->get_metadata();
+}
+
+void RowRangeResultSet::close() {
+    if (inner_ != nullptr) {
+        delete inner_;
+        inner_ = nullptr;
+    }
+}
+
+}  // namespace storage
diff --git a/cpp/src/reader/row_range_result_set.h b/cpp/src/reader/row_range_result_set.h
new file mode 100644
index 000000000..a5f0b97c4
--- /dev/null
+++ b/cpp/src/reader/row_range_result_set.h
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef READER_ROW_RANGE_RESULT_SET_H
+#define READER_ROW_RANGE_RESULT_SET_H
+
+#include <memory>
+#include <string>
+
+#include "reader/result_set.h"
+
+namespace storage {
+
+/**
+ * @brief A ResultSet wrapper that applies row-level offset and limit.
+ *
+ * Takes ownership of the inner ResultSet and releases it on close().
+ * Once the limit is reached, next() returns has_next=false immediately
+ * without calling the underlying ResultSet, avoiding unnecessary data
+ * loading.
+ *
+ * The constructor takes the number of leading rows to skip (offset;
+ * negative values are clamped to 0) and the maximum number of rows to
+ * return (limit; a value < 0 means no limit, i.e. all remaining rows are
+ * returned).
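+ *
+ * A minimal usage sketch (error handling omitted; assumes `inner` came
+ * from a full-range TsFileReader::query() call):
+ *
+ *     RowRangeResultSet* rs =
+ *         new RowRangeResultSet(inner, /*offset=*/10, /*limit=*/5);
+ *     bool has_next = false;
+ *     while (rs->next(has_next) == common::E_OK && has_next) {
+ *         // consume rows 10..14
+ *     }
+ *     delete rs;  // also deletes `inner`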
+ */ +class RowRangeResultSet : public ResultSet { + public: + RowRangeResultSet(ResultSet* inner, int offset, int limit); + ~RowRangeResultSet() override; + + int next(bool& has_next) override; + bool is_null(const std::string& column_name) override; + bool is_null(uint32_t column_index) override; + RowRecord* get_row_record() override; + std::shared_ptr get_metadata() override; + void close() override; + + private: + ResultSet* inner_; + int offset_; + int limit_; + int returned_count_; + bool offset_skipped_; +}; + +} // namespace storage +#endif // READER_ROW_RANGE_RESULT_SET_H diff --git a/cpp/src/reader/tsfile_reader.cc b/cpp/src/reader/tsfile_reader.cc index f97570885..10e5a515f 100644 --- a/cpp/src/reader/tsfile_reader.cc +++ b/cpp/src/reader/tsfile_reader.cc @@ -18,8 +18,11 @@ */ #include "tsfile_reader.h" +#include + #include "common/schema.h" #include "filter/time_operator.h" +#include "reader/row_range_result_set.h" #include "tsfile_executor.h" using namespace common; @@ -114,6 +117,16 @@ int TsFileReader::query(const std::string& table_name, return ret; } +int TsFileReader::queryByRow(const std::string& table_name, + const std::vector& column_names, + int offset, int limit, ResultSet*& result_set) { + ResultSet* inner = nullptr; + int ret = query(table_name, column_names, INT64_MIN, INT64_MAX, inner); + if (ret != common::E_OK) return ret; + result_set = new RowRangeResultSet(inner, offset, limit); + return common::E_OK; +} + int TsFileReader::query_table_on_tree( const std::vector& measurement_names, int64_t star_time, int64_t end_time, ResultSet*& result_set) { diff --git a/cpp/src/reader/tsfile_reader.h b/cpp/src/reader/tsfile_reader.h index 8a6ba2264..15534e503 100644 --- a/cpp/src/reader/tsfile_reader.h +++ b/cpp/src/reader/tsfile_reader.h @@ -113,6 +113,24 @@ class TsFileReader { const std::vector& columns_names, int64_t start_time, int64_t end_time, ResultSet*& result_set, Filter* tag_filter); + /** + * @brief Query table model data by row range. + * + * Internally queries the full time range [INT64_MIN, INT64_MAX] and + * applies offset/limit at the result-set level. Once `limit` rows have + * been returned, no further data is loaded from storage. + * + * @param table_name Target table name. + * @param column_names Columns to query. + * @param offset Number of leading rows to skip (>= 0). + * @param limit Maximum rows to return. < 0 means unlimited. + * @param[out] result_set The result set containing query results. + * @return Returns 0 on success, or a non-zero error code on failure. 
+ */ + int queryByRow(const std::string& table_name, + const std::vector& column_names, int offset, + int limit, ResultSet*& result_set); + int query_table_on_tree(const std::vector& measurement_names, int64_t star_time, int64_t end_time, ResultSet*& result_set); diff --git a/cpp/src/reader/tsfile_tree_reader.cc b/cpp/src/reader/tsfile_tree_reader.cc index 2b28c8647..1b66646d4 100644 --- a/cpp/src/reader/tsfile_tree_reader.cc +++ b/cpp/src/reader/tsfile_tree_reader.cc @@ -19,6 +19,10 @@ #include "reader/tsfile_tree_reader.h" +#include + +#include "reader/row_range_result_set.h" + namespace storage { TsFileTreeReader::TsFileTreeReader() { @@ -47,6 +51,17 @@ int TsFileTreeReader::query(const std::vector& device_ids, return tsfile_reader_->query(path_list, start_time, end_time, result_set); } +int TsFileTreeReader::queryByRow( + const std::vector& device_ids, + const std::vector& measurement_names, int offset, int limit, + ResultSet*& result_set) { + ResultSet* inner = nullptr; + int ret = query(device_ids, measurement_names, INT64_MIN, INT64_MAX, inner); + if (ret != common::E_OK) return ret; + result_set = new RowRangeResultSet(inner, offset, limit); + return common::E_OK; +} + void TsFileTreeReader::destroy_query_data_set(ResultSet* qds) { tsfile_reader_->destroy_query_data_set(qds); } diff --git a/cpp/src/reader/tsfile_tree_reader.h b/cpp/src/reader/tsfile_tree_reader.h index 66341b7ed..7d81058e4 100644 --- a/cpp/src/reader/tsfile_tree_reader.h +++ b/cpp/src/reader/tsfile_tree_reader.h @@ -67,6 +67,26 @@ class TsFileTreeReader { const std::vector& measurement_names, int64_t start_time, int64_t end_time, ResultSet*& result_set); + /** + * @brief Query tree model data by row range. + * + * Internally queries the full time range [INT64_MIN, INT64_MAX] and + * applies offset/limit at the result-set level. Once `limit` rows have + * been returned, no further data is loaded from storage. + * + * @param device_ids List of device identifiers to query. + * @param measurement_names List of measurement names to query. + * @param offset Number of leading rows to skip (>= 0). + * @param limit Maximum rows to return. < 0 means unlimited. + * @param[out] result_set The result set containing query results. + * @return Returns 0 on success, or a non-zero error code on failure. + * The caller is responsible for destroying the result set using + * destroy_query_data_set(). + */ + int queryByRow(const std::vector& device_ids, + const std::vector& measurement_names, + int offset, int limit, ResultSet*& result_set); + /** * @brief Destroy and deallocate the query result set * diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc index 2c2e46b97..856e19698 100644 --- a/cpp/src/writer/tsfile_writer.cc +++ b/cpp/src/writer/tsfile_writer.cc @@ -808,26 +808,38 @@ int TsFileWriter::write_table(Tablet& tablet) { value_chunk_writers))) { return ret; } + // Row-by-row write so that when time page seals (e.g. by memory + // threshold), we can seal all value pages together (Java + // semantics). 
         for (int i = start_idx; i < end_idx; i++) {
+            int32_t time_pages_before = time_chunk_writer->num_of_pages();
             if (RET_FAIL(time_chunk_writer->write(tablet.timestamps_[i]))) {
                 return ret;
             }
-        }
-        uint32_t field_col_count = 0;
-        for (uint32_t i = 0; i < tablet.get_column_count(); ++i) {
-            if (tablet.column_categories_[i] ==
-                common::ColumnCategory::FIELD) {
-                ValueChunkWriter* value_chunk_writer =
-                    value_chunk_writers[field_col_count];
-                if (IS_NULL(value_chunk_writer)) {
-                    continue;
+            uint32_t field_col_count = 0;
+            for (uint32_t col = 0; col < tablet.get_column_count(); ++col) {
+                if (tablet.column_categories_[col] ==
+                    common::ColumnCategory::FIELD) {
+                    ValueChunkWriter* value_chunk_writer =
+                        value_chunk_writers[field_col_count];
+                    if (!IS_NULL(value_chunk_writer) &&
+                        RET_FAIL(value_write_column(
+                            value_chunk_writer, tablet, col, i, i + 1))) {
+                        return ret;
+                    }
+                    field_col_count++;
                 }
-
-                if (RET_FAIL(value_write_column(value_chunk_writer, tablet,
-                                                i, start_idx, end_idx))) {
-                    return ret;
+            }
+            int32_t time_pages_after = time_chunk_writer->num_of_pages();
+            if (time_pages_after > time_pages_before) {
+                for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
+                    if (!IS_NULL(value_chunk_writers[k]) &&
+                        value_chunk_writers[k]->has_current_page_data() &&
+                        RET_FAIL(
+                            value_chunk_writers[k]->seal_current_page())) {
+                        return ret;
+                    }
                 }
-                field_col_count++;
             }
         }
         start_idx = end_idx;
diff --git a/cpp/src/writer/value_chunk_writer.cc b/cpp/src/writer/value_chunk_writer.cc
index e4bb52658..43a7122d1 100644
--- a/cpp/src/writer/value_chunk_writer.cc
+++ b/cpp/src/writer/value_chunk_writer.cc
@@ -110,7 +110,7 @@ int ValueChunkWriter::seal_cur_page(bool end_chunk) {
                                  /*stat*/ false, /*data*/ false);
     if (IS_SUCC(ret)) {
         save_first_page_data(value_page_writer_);
-        // value_page_writer_.destroy_page_data();
+        value_page_writer_.clear_page_data();
         value_page_writer_.reset();
     }
 }
diff --git a/cpp/src/writer/value_chunk_writer.h b/cpp/src/writer/value_chunk_writer.h
index 859fb57b0..ad46fce6f 100644
--- a/cpp/src/writer/value_chunk_writer.h
+++ b/cpp/src/writer/value_chunk_writer.h
@@ -118,6 +118,18 @@ class ValueChunkWriter {

     bool hasData();

+    /** True if the current (unsealed) page has at least one point. */
+    bool has_current_page_data() const {
+        return value_page_writer_.get_point_numer() > 0;
+    }
+
+    /**
+     * Force-seal the current page (for the aligned table model: when the time
+     * page seals due to the memory/point threshold, all value pages must be
+     * sealed together).
+     * @return E_OK on success.
+     */
+    int seal_current_page() { return seal_cur_page(false); }
+
   private:
     FORCE_INLINE bool is_cur_page_full() const {
         // FIXME
diff --git a/cpp/src/writer/value_page_writer.h b/cpp/src/writer/value_page_writer.h
index 60d75b0b8..9057102cb 100644
--- a/cpp/src/writer/value_page_writer.h
+++ b/cpp/src/writer/value_page_writer.h
@@ -51,7 +51,7 @@ struct ValuePageData {
                 common::ByteStream& value_bs, Compressor* compressor,
                 uint32_t size);
     void destroy() {
-        // Be careful about the memory
+        // Be careful about the memory; only free if we own valid pointers
         if (uncompressed_buf_ != nullptr) {
             common::mem_free(uncompressed_buf_);
             uncompressed_buf_ = nullptr;
         }
             compressor_->after_compress(compressed_buf_);
             compressed_buf_ = nullptr;
         }
+        compressor_ = nullptr;
+    }
+
+    /** Clear pointers without freeing (transfer ownership to another holder).
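+     *  Intended sequence (see ValueChunkWriter::seal_cur_page): the caller
+     *  first hands the page buffers to another holder (save_first_page_data),
+     *  then calls clear() so a later destroy() cannot double-free them.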
+ */ + void clear() { + col_notnull_bitmap_buf_size_ = 0; + value_buf_size_ = 0; + uncompressed_size_ = 0; + compressed_size_ = 0; + uncompressed_buf_ = nullptr; + compressed_buf_ = nullptr; + compressor_ = nullptr; } }; @@ -183,6 +196,8 @@ class ValuePageWriter { FORCE_INLINE Statistic* get_statistic() { return statistic_; } ValuePageData get_cur_page_data() { return cur_page_data_; } void destroy_page_data() { cur_page_data_.destroy(); } + /** Clear cur_page_data_ without freeing (after ownership transferred). */ + void clear_page_data() { cur_page_data_.clear(); } private: FORCE_INLINE int prepare_end_page() { diff --git a/cpp/test/cwrapper/cwrapper_test.cc b/cpp/test/cwrapper/cwrapper_test.cc index 5998939af..a2601e228 100644 --- a/cpp/test/cwrapper/cwrapper_test.cc +++ b/cpp/test/cwrapper/cwrapper_test.cc @@ -310,4 +310,230 @@ TEST_F(CWrapperTest, WriterFlushTabletAndReadData) { free(data_types); free_write_file(&file); } + +// ───────────────────────────────────────────────────────────────────────────── +// tsfile_reader_query_tree_by_row +// ───────────────────────────────────────────────────────────────────────────── + +TEST_F(CWrapperTest, QueryTreeByRow_LimitAndOffset) { + ERRNO code = 0; + const char* filename = "cwrapper_tree_row_query_test.tsfile"; + remove(filename); + + // ---- Write 30 records for "device"."s1" (INT64) ---- + const int total_rows = 30; + TsFileWriter writer = + _tsfile_writer_new(filename, 128 * 1024 * 1024, &code); + ASSERT_EQ(code, RET_OK); + + timeseries_schema ts_schema; + ts_schema.timeseries_name = const_cast("s1"); + ts_schema.data_type = TS_DATATYPE_INT64; + ts_schema.encoding = TS_ENCODING_PLAIN; + ts_schema.compression = TS_COMPRESSION_UNCOMPRESSED; + ASSERT_OK(_tsfile_writer_register_timeseries(writer, "device", &ts_schema)); + + for (int i = 0; i < total_rows; i++) { + TsRecord rec = _ts_record_new("device", static_cast(i), 1); + _insert_data_into_ts_record_by_name_int64_t( + rec, "s1", static_cast(i * 10)); + ASSERT_OK(_tsfile_writer_write_ts_record(writer, rec)); + _free_tsfile_ts_record(&rec); + } + ASSERT_OK(_tsfile_writer_flush(writer)); + ASSERT_OK(_tsfile_writer_close(writer)); + + // ---- Read phase ---- + TsFileReader reader = tsfile_reader_new(filename, &code); + ASSERT_EQ(code, RET_OK); + + char* devs[] = {const_cast("device")}; + char* meas[] = {const_cast("s1")}; + + // ① limit=0 → empty result + { + ResultSet rs = tsfile_reader_query_tree_by_row(reader, devs, 1, meas, 1, + 0, 0, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) cnt++; + EXPECT_EQ(cnt, 0); + free_tsfile_result_set(&rs); + } + // ② limit < total → exactly `limit` rows + { + ResultSet rs = tsfile_reader_query_tree_by_row(reader, devs, 1, meas, 1, + 0, 10, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) cnt++; + EXPECT_EQ(cnt, 10); + free_tsfile_result_set(&rs); + } + // ③ limit=-1 → unlimited (all rows) + { + ResultSet rs = tsfile_reader_query_tree_by_row(reader, devs, 1, meas, 1, + 0, -1, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) cnt++; + EXPECT_EQ(cnt, total_rows); + free_tsfile_result_set(&rs); + } + // ④ offset=20, limit=20 → 10 rows remain + { + ResultSet rs = tsfile_reader_query_tree_by_row(reader, devs, 1, meas, 1, + 20, 20, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) cnt++; + EXPECT_EQ(cnt, 10); + 
free_tsfile_result_set(&rs); + } + // ⑤ offset beyond total → empty + { + ResultSet rs = tsfile_reader_query_tree_by_row(reader, devs, 1, meas, 1, + 1000, 10, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) cnt++; + EXPECT_EQ(cnt, 0); + free_tsfile_result_set(&rs); + } + // ⑥ data correctness: offset=5, limit=5 → timestamps 5..9 + { + ResultSet rs = tsfile_reader_query_tree_by_row(reader, devs, 1, meas, 1, + 5, 5, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) { + // column 1 = time + int64_t ts = tsfile_result_set_get_value_by_index_int64_t(rs, 1); + EXPECT_EQ(ts, static_cast(5 + cnt)); + cnt++; + } + EXPECT_EQ(cnt, 5); + free_tsfile_result_set(&rs); + } + + tsfile_reader_close(reader); + remove(filename); +} + +// ───────────────────────────────────────────────────────────────────────────── +// tsfile_reader_query_table_by_row +// ───────────────────────────────────────────────────────────────────────────── + +TEST_F(CWrapperTest, QueryTableByRow_LimitAndOffset) { + ERRNO code = 0; + const char* filename = "cwrapper_table_row_query_test.tsfile"; + remove(filename); + + // ---- Write 30 rows into table "t1" with column "s0" INT64 ---- + const int total_rows = 30; + + ColumnSchema col_schema; + col_schema.column_name = const_cast("s0"); + col_schema.data_type = TS_DATATYPE_INT64; + col_schema.column_category = FIELD; + + TableSchema schema; + schema.table_name = const_cast("t1"); + schema.column_num = 1; + schema.column_schemas = &col_schema; + + WriteFile wf = write_file_new(filename, &code); + ASSERT_EQ(code, RET_OK); + TsFileWriter writer = tsfile_writer_new(wf, &schema, &code); + ASSERT_EQ(code, RET_OK); + + char* col_name_arr[] = {const_cast("s0")}; + TSDataType dtype_arr[] = {TS_DATATYPE_INT64}; + Tablet tablet = tablet_new(col_name_arr, dtype_arr, 1, total_rows); + for (int i = 0; i < total_rows; i++) { + tablet_add_timestamp(tablet, i, static_cast(i)); + tablet_add_value_by_index_int64_t(tablet, i, 0, + static_cast(i)); + } + ASSERT_OK(tsfile_writer_write(writer, tablet)); + free_tablet(&tablet); + ASSERT_OK(tsfile_writer_close(writer)); + free_write_file(&wf); + + // ---- Read phase ---- + TsFileReader reader = tsfile_reader_new(filename, &code); + ASSERT_EQ(code, RET_OK); + + char* cols[] = {const_cast("s0")}; + + // ① limit=0 → empty result + { + ResultSet rs = tsfile_reader_query_table_by_row(reader, "t1", cols, 1, + 0, 0, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) cnt++; + EXPECT_EQ(cnt, 0); + free_tsfile_result_set(&rs); + } + // ② limit < total → exactly `limit` rows + { + ResultSet rs = tsfile_reader_query_table_by_row(reader, "t1", cols, 1, + 0, 10, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) cnt++; + EXPECT_EQ(cnt, 10); + free_tsfile_result_set(&rs); + } + // ③ limit=-1 → unlimited (all rows) + { + ResultSet rs = tsfile_reader_query_table_by_row(reader, "t1", cols, 1, + 0, -1, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) cnt++; + EXPECT_EQ(cnt, total_rows); + free_tsfile_result_set(&rs); + } + // ④ offset=20, limit=20 → 10 rows remain + { + ResultSet rs = tsfile_reader_query_table_by_row(reader, "t1", cols, 1, + 20, 20, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) cnt++; + 
EXPECT_EQ(cnt, 10); + free_tsfile_result_set(&rs); + } + // ⑤ offset beyond total → empty + { + ResultSet rs = tsfile_reader_query_table_by_row(reader, "t1", cols, 1, + 1000, 10, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) cnt++; + EXPECT_EQ(cnt, 0); + free_tsfile_result_set(&rs); + } + // ⑥ data correctness: offset=5, limit=5 → timestamps 5..9 + { + ResultSet rs = tsfile_reader_query_table_by_row(reader, "t1", cols, 1, + 5, 5, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) { + // column 1 = time + int64_t ts = tsfile_result_set_get_value_by_index_int64_t(rs, 1); + EXPECT_EQ(ts, static_cast(5 + cnt)); + cnt++; + } + EXPECT_EQ(cnt, 5); + free_tsfile_result_set(&rs); + } + + tsfile_reader_close(reader); + remove(filename); +} + } // namespace cwrapper \ No newline at end of file diff --git a/cpp/test/reader/table_view/tsfile_reader_table_test.cc b/cpp/test/reader/table_view/tsfile_reader_table_test.cc index b9f0eb213..4b1a8259f 100644 --- a/cpp/test/reader/table_view/tsfile_reader_table_test.cc +++ b/cpp/test/reader/table_view/tsfile_reader_table_test.cc @@ -216,6 +216,21 @@ TEST_F(TsFileTableReaderTest, TableModelQueryOneSmallPage) { g_config_value_.page_writer_max_point_num_ = prev_config; } +// Triggers memory-based seal in aligned table: time page seals by size while +// value pages may not; ensure value pages are sealed together with time (no +// time-page-sealed / value-page-not-sealed inconsistency). +// Use 512 bytes so time seals by size before point count; 128 was too small +// and could produce misaligned time/value pages on some encodings. +TEST_F(TsFileTableReaderTest, TableModelQueryMemoryBasedSeal) { + uint32_t prev_point_num = g_config_value_.page_writer_max_point_num_; + uint32_t prev_mem_bytes = g_config_value_.page_writer_max_memory_bytes_; + g_config_value_.page_writer_max_point_num_ = 10000; + g_config_value_.page_writer_max_memory_bytes_ = 512; + test_table_model_query(50, 1); + g_config_value_.page_writer_max_point_num_ = prev_point_num; + g_config_value_.page_writer_max_memory_bytes_ = prev_mem_bytes; +} + TEST_F(TsFileTableReaderTest, TableModelQueryOneLargePage) { int prev_config = g_config_value_.page_writer_max_point_num_; g_config_value_.page_writer_max_point_num_ = 10000; diff --git a/cpp/test/reader/table_view/tsfile_table_reader_row_query_test.cc b/cpp/test/reader/table_view/tsfile_table_reader_row_query_test.cc new file mode 100644 index 000000000..e8549db05 --- /dev/null +++ b/cpp/test/reader/table_view/tsfile_table_reader_row_query_test.cc @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +#include + +#include +#include + +#include "common/config/config.h" +#include "common/schema.h" +#include "common/tablet.h" +#include "file/write_file.h" +#include "reader/result_set.h" +#include "reader/tsfile_reader.h" +#include "writer/tsfile_table_writer.h" + +using namespace storage; +using namespace common; + +// ───────────────────────────────────────────────────────────────────────────── +// Fixture +// ───────────────────────────────────────────────────────────────────────────── + +class TsFileTableReaderRowQueryTest : public ::testing::Test { + protected: + static std::string generate_random_string(int length) { + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> dis(0, 61); + const std::string chars = + "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; + std::string s; + for (int i = 0; i < length; ++i) s += chars[dis(gen)]; + return s; + } + + void SetUp() override { + libtsfile_init(); + file_name_ = "tsfile_table_row_query_test_" + + generate_random_string(10) + ".tsfile"; + remove(file_name_.c_str()); + } + + void TearDown() override { + remove(file_name_.c_str()); + libtsfile_destroy(); + } + + // Write a simple table "t1" with columns ["s0" INT64] and `num_rows` rows. + // Timestamps and values are both 0..num_rows-1. + void write_simple_table(int num_rows) { + WriteFile wf; + int flags = O_WRONLY | O_CREAT | O_TRUNC; +#ifdef _WIN32 + flags |= O_BINARY; +#endif + wf.create(file_name_, flags, 0666); + + std::vector col_schemas; + col_schemas.emplace_back("s0", TSDataType::INT64, + CompressionType::UNCOMPRESSED, + TSEncoding::PLAIN, ColumnCategory::FIELD); + auto* schema = new TableSchema(TABLE_NAME, col_schemas); + auto writer = std::make_shared(&wf, schema); + + Tablet tablet(TABLE_NAME, {"s0"}, {TSDataType::INT64}, + {ColumnCategory::FIELD}, num_rows); + for (int i = 0; i < num_rows; i++) { + tablet.add_timestamp(i, static_cast(i)); + tablet.add_value(i, 0, static_cast(i)); + } + writer->write_table(tablet); + writer->flush(); + writer->close(); + delete schema; + } + + // Count rows returned by queryByRow. 
+    int count_rows(int offset, int limit) {
+        TsFileReader reader;
+        EXPECT_EQ(reader.open(file_name_), E_OK);
+        ResultSet* rs = nullptr;
+        EXPECT_EQ(reader.queryByRow(TABLE_NAME, {"s0"}, offset, limit, rs),
+                  E_OK);
+        EXPECT_NE(rs, nullptr);
+        int count = 0;
+        bool has_next = false;
+        while (IS_SUCC(rs->next(has_next)) && has_next) count++;
+        reader.destroy_query_data_set(rs);
+        reader.close();
+        return count;
+    }
+
+    std::string file_name_;
+    enum { TOTAL_ROWS = 50 };
+    static constexpr const char* TABLE_NAME = "t1";
+};
+
+// ─────────────────────────────────────────────────────────────────────────────
+// Tests
+// ─────────────────────────────────────────────────────────────────────────────
+
+// ① limit = 0: empty result
+TEST_F(TsFileTableReaderRowQueryTest, LimitZeroReturnsEmpty) {
+    write_simple_table(TOTAL_ROWS);
+    ASSERT_EQ(count_rows(0, 0), 0);
+}
+
+// ② limit < total: only `limit` rows
+TEST_F(TsFileTableReaderRowQueryTest, LimitLessThanTotal) {
+    write_simple_table(TOTAL_ROWS);
+    ASSERT_EQ(count_rows(0, 10), 10);
+}
+
+// ③ limit > total: all rows
+TEST_F(TsFileTableReaderRowQueryTest, LimitExceedsTotal) {
+    write_simple_table(TOTAL_ROWS);
+    ASSERT_EQ(count_rows(0, 9999), TOTAL_ROWS);
+}
+
+// ④ limit < 0: unlimited, returns all rows
+TEST_F(TsFileTableReaderRowQueryTest, NegativeLimitMeansUnlimited) {
+    write_simple_table(TOTAL_ROWS);
+    ASSERT_EQ(count_rows(0, -1), TOTAL_ROWS);
+}
+
+// ⑤ offset in the middle
+TEST_F(TsFileTableReaderRowQueryTest, OffsetPlusLimit) {
+    write_simple_table(TOTAL_ROWS);
+    ASSERT_EQ(count_rows(10, 15), 15);
+}
+
+// ⑥ offset >= total: empty result
+TEST_F(TsFileTableReaderRowQueryTest, OffsetBeyondTotal) {
+    write_simple_table(TOTAL_ROWS);
+    ASSERT_EQ(count_rows(1000, 10), 0);
+}
+
+// ⑦ offset + limit > total: return only remaining rows
+TEST_F(TsFileTableReaderRowQueryTest, OffsetPlusLimitExceedsTotal) {
+    write_simple_table(TOTAL_ROWS);
+    // offset=40, limit=20 → 10 rows remain
+    ASSERT_EQ(count_rows(40, 20), 10);
+}
+
+// ⑧ Data correctness: time column starts at offset
+TEST_F(TsFileTableReaderRowQueryTest, OffsetDataCorrectness) {
+    write_simple_table(TOTAL_ROWS);
+
+    TsFileReader reader;
+    ASSERT_EQ(reader.open(file_name_), E_OK);
+    ResultSet* rs = nullptr;
+    ASSERT_EQ(reader.queryByRow(TABLE_NAME, {"s0"}, 5, 10, rs), E_OK);
+
+    int count = 0;
+    bool has_next = false;
+    while (IS_SUCC(rs->next(has_next)) && has_next) {
+        // column 1 = time
+        int64_t ts = rs->get_value<int64_t>(1);
+        EXPECT_EQ(ts, static_cast<int64_t>(5 + count));
+        // column 2 = s0 (same value as the timestamp in our write helper)
+        int64_t val = rs->get_value<int64_t>(2);
+        EXPECT_EQ(val, static_cast<int64_t>(5 + count));
+        count++;
+    }
+    EXPECT_EQ(count, 10);
+    reader.destroy_query_data_set(rs);
+    reader.close();
+}
+
+// ⑨ Large dataset, small page size: limit stops loading early (no hang/OOM)
+TEST_F(TsFileTableReaderRowQueryTest, LimitStopsEarlyAcrossPages) {
+    int prev = g_config_value_.page_writer_max_point_num_;
+    g_config_value_.page_writer_max_point_num_ = 5;
+
+    write_simple_table(200);
+
+    ASSERT_EQ(count_rows(0, 50), 50);
+
+    g_config_value_.page_writer_max_point_num_ = prev;
+}
+
+// ⑩ Metadata accessible via queryByRow result set
+TEST_F(TsFileTableReaderRowQueryTest, MetadataAccessible) {
+    write_simple_table(10);
+
+    TsFileReader reader;
+    ASSERT_EQ(reader.open(file_name_), E_OK);
+    ResultSet* rs = nullptr;
+    ASSERT_EQ(reader.queryByRow(TABLE_NAME, {"s0"}, 0, 5, rs), E_OK);
+
+    auto meta = rs->get_metadata();
+    ASSERT_NE(meta, nullptr);
+    EXPECT_EQ(meta->get_column_name(1), "time");
+    EXPECT_EQ(meta->get_column_name(2), "s0");
+    EXPECT_EQ(meta->get_column_count(), 2u);
+
+    reader.destroy_query_data_set(rs);
+    reader.close();
+}
+
+// ⑪ Paging consistency: two pages together equal the full result
+TEST_F(TsFileTableReaderRowQueryTest, PaginationConsistency) {
+    write_simple_table(40);
+    ASSERT_EQ(count_rows(0, 20) + count_rows(20, 20), 40);
+}
+
+// ⑫ queryByRow result equivalent to full query with limit applied
+TEST_F(TsFileTableReaderRowQueryTest, EquivalentToFullQueryWithLimit) {
+    write_simple_table(TOTAL_ROWS);
+
+    // Full query row count
+    TsFileReader reader;
+    ASSERT_EQ(reader.open(file_name_), E_OK);
+    ResultSet* rs_full = nullptr;
+    ASSERT_EQ(reader.query(TABLE_NAME, {"s0"}, INT64_MIN, INT64_MAX, rs_full),
+              E_OK);
+    int full_count = 0;
+    bool has_next = false;
+    while (IS_SUCC(rs_full->next(has_next)) && has_next) full_count++;
+    reader.destroy_query_data_set(rs_full);
+
+    // queryByRow with limit < 0 should return the same count
+    ResultSet* rs_unlimited = nullptr;
+    ASSERT_EQ(reader.queryByRow(TABLE_NAME, {"s0"}, 0, -1, rs_unlimited), E_OK);
+    int unlimited_count = 0;
+    has_next = false;
+    while (IS_SUCC(rs_unlimited->next(has_next)) && has_next) unlimited_count++;
+    reader.destroy_query_data_set(rs_unlimited);
+    reader.close();
+
+    ASSERT_EQ(full_count, unlimited_count);
+}
+
+// ⑬ Multiple flushes (multiple chunks): offset/limit still correct
+TEST_F(TsFileTableReaderRowQueryTest, MultipleChunksCorrectness) {
+    // Write in two batches to ensure multiple chunks
+    WriteFile wf;
+    int flags = O_WRONLY | O_CREAT | O_TRUNC;
+#ifdef _WIN32
+    flags |= O_BINARY;
+#endif
+    wf.create(file_name_, flags, 0666);
+
+    std::vector<ColumnSchema> col_schemas;
+    col_schemas.emplace_back("s0", TSDataType::INT64,
+                             CompressionType::UNCOMPRESSED, TSEncoding::PLAIN,
+                             ColumnCategory::FIELD);
+    auto* schema = new TableSchema(TABLE_NAME, col_schemas);
+    auto writer = std::make_shared<TsFileTableWriter>(&wf, schema);
+
+    // First batch: rows 0..29
+    Tablet tablet1(TABLE_NAME, {"s0"}, {TSDataType::INT64},
+                   {ColumnCategory::FIELD}, 30);
+    for (int i = 0; i < 30; i++) {
+        tablet1.add_timestamp(i, static_cast<int64_t>(i));
+        tablet1.add_value(i, 0, static_cast<int64_t>(i));
+    }
+    writer->write_table(tablet1);
+    writer->flush();
+
+    // Second batch: rows 30..59
+    Tablet tablet2(TABLE_NAME, {"s0"}, {TSDataType::INT64},
+                   {ColumnCategory::FIELD}, 30);
+    for (int i = 0; i < 30; i++) {
+        tablet2.add_timestamp(i, static_cast<int64_t>(30 + i));
+        tablet2.add_value(i, 0, static_cast<int64_t>(30 + i));
+    }
+    writer->write_table(tablet2);
+    writer->flush();
+    writer->close();
+    delete schema;
+
+    // offset=25, limit=20 → rows 25..44
+    TsFileReader reader;
+    ASSERT_EQ(reader.open(file_name_), E_OK);
+    ResultSet* rs = nullptr;
+    ASSERT_EQ(reader.queryByRow(TABLE_NAME, {"s0"}, 25, 20, rs), E_OK);
+
+    int count = 0;
+    bool has_next = false;
+    while (IS_SUCC(rs->next(has_next)) && has_next) {
+        int64_t ts = rs->get_value<int64_t>(1);
+        EXPECT_EQ(ts, static_cast<int64_t>(25 + count));
+        count++;
+    }
+    EXPECT_EQ(count, 20);
+    reader.destroy_query_data_set(rs);
+    reader.close();
+}
diff --git a/cpp/test/reader/tree_view/tsfile_tree_reader_row_query_test.cc b/cpp/test/reader/tree_view/tsfile_tree_reader_row_query_test.cc
new file mode 100644
index 000000000..bc1930204
--- /dev/null
+++ b/cpp/test/reader/tree_view/tsfile_tree_reader_row_query_test.cc
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+
+#include <random>
+#include <string>
+
+#include "common/config/config.h"
+#include "common/record.h"
+#include "common/schema.h"
+#include "file/write_file.h"
+#include "reader/result_set.h"
+#include "reader/tsfile_tree_reader.h"
+#include "writer/tsfile_tree_writer.h"
+
+using namespace storage;
+using namespace common;
+
+// ─────────────────────────────────────────────────────────────────────────────
+// Fixture
+// ─────────────────────────────────────────────────────────────────────────────
+
+class TsFileTreeReaderRowQueryTest : public ::testing::Test {
+   protected:
+    static std::string generate_random_string(int length) {
+        std::random_device rd;
+        std::mt19937 gen(rd());
+        std::uniform_int_distribution<> dis(0, 61);
+        const std::string chars =
+            "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
+        std::string s;
+        for (int i = 0; i < length; ++i) s += chars[dis(gen)];
+        return s;
+    }
+
+    void SetUp() override {
+        libtsfile_init();
+        file_name_ = "tsfile_tree_row_query_test_" +
+                     generate_random_string(10) + ".tsfile";
+        remove(file_name_.c_str());
+    }
+
+    void TearDown() override {
+        remove(file_name_.c_str());
+        libtsfile_destroy();
+    }
+
+    // Write `num_rows` records for each device in `device_ids`,
+    // using measurement `mea` (INT64). Timestamps are 0..num_rows-1,
+    // values are timestamp * 10.
+    void write_data(const std::vector<std::string>& device_ids,
+                    const std::string& mea, int num_rows) {
+        WriteFile wf;
+        int flags = O_WRONLY | O_CREAT | O_TRUNC;
+#ifdef _WIN32
+        flags |= O_BINARY;
+#endif
+        wf.create(file_name_, flags, 0666);
+        TsFileTreeWriter writer(&wf);
+        for (auto dev :
+             device_ids) {  // copy: register_timeseries needs a non-const ref
+            auto* schema = new MeasurementSchema(mea, INT64);
+            writer.register_timeseries(dev, schema);
+            delete schema;
+            for (int i = 0; i < num_rows; i++) {
+                TsRecord record(dev, static_cast<int64_t>(i));
+                record.add_point(mea, static_cast<int64_t>(i * 10));
+                writer.write(record);
+            }
+        }
+        writer.flush();
+        writer.close();
+    }
+
+    // Count the number of rows returned by queryByRow.
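+    // Each call opens its own TsFileTreeReader, runs queryByRow, and drains
+    // the result set, so repeated calls do not share cursor state.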
+    int count_rows(const std::string& file,
+                   const std::vector<std::string>& devs, const std::string& mea,
+                   int offset, int limit) {
+        TsFileTreeReader reader;
+        EXPECT_EQ(reader.open(file), E_OK);
+        ResultSet* rs = nullptr;
+        EXPECT_EQ(reader.queryByRow(devs, {mea}, offset, limit, rs), E_OK);
+        EXPECT_NE(rs, nullptr);
+        int count = 0;
+        bool has_next = false;
+        while (IS_SUCC(rs->next(has_next)) && has_next) count++;
+        reader.destroy_query_data_set(rs);
+        reader.close();
+        return count;
+    }
+
+    std::string file_name_;
+    enum { TOTAL_ROWS = 50 };
+    const std::string DEV = "device";
+    const std::string MEA = "s1";
+};
+
+// ─────────────────────────────────────────────────────────────────────────────
+// Tests
+// ─────────────────────────────────────────────────────────────────────────────
+
+// ① limit = 0: empty result
+TEST_F(TsFileTreeReaderRowQueryTest, LimitZeroReturnsEmpty) {
+    write_data({DEV}, MEA, TOTAL_ROWS);
+    ASSERT_EQ(count_rows(file_name_, {DEV}, MEA, 0, 0), 0);
+}
+
+// ② limit < total: only `limit` rows returned
+TEST_F(TsFileTreeReaderRowQueryTest, LimitLessThanTotal) {
+    write_data({DEV}, MEA, TOTAL_ROWS);
+    ASSERT_EQ(count_rows(file_name_, {DEV}, MEA, 0, 20), 20);
+}
+
+// ③ limit > total: all rows returned
+TEST_F(TsFileTreeReaderRowQueryTest, LimitExceedsTotal) {
+    write_data({DEV}, MEA, TOTAL_ROWS);
+    ASSERT_EQ(count_rows(file_name_, {DEV}, MEA, 0, 1000), TOTAL_ROWS);
+}
+
+// ④ limit < 0: unlimited, returns all rows
+TEST_F(TsFileTreeReaderRowQueryTest, NegativeLimitMeansUnlimited) {
+    write_data({DEV}, MEA, TOTAL_ROWS);
+    ASSERT_EQ(count_rows(file_name_, {DEV}, MEA, 0, -1), TOTAL_ROWS);
+}
+
+// ⑤ offset + limit in the middle of the data
+TEST_F(TsFileTreeReaderRowQueryTest, OffsetPlusLimit) {
+    write_data({DEV}, MEA, TOTAL_ROWS);
+    // offset=10, limit=15 → should return exactly 15 rows
+    ASSERT_EQ(count_rows(file_name_, {DEV}, MEA, 10, 15), 15);
+}
+
+// ⑥ offset >= total: empty result
+TEST_F(TsFileTreeReaderRowQueryTest, OffsetBeyondTotal) {
+    write_data({DEV}, MEA, TOTAL_ROWS);
+    ASSERT_EQ(count_rows(file_name_, {DEV}, MEA, 1000, 10), 0);
+}
+
+// ⑦ offset + limit > total: return remaining rows from offset
+TEST_F(TsFileTreeReaderRowQueryTest, OffsetPlusLimitExceedsTotal) {
+    write_data({DEV}, MEA, TOTAL_ROWS);
+    // offset=40, limit=20 → only 10 rows remain
+    ASSERT_EQ(count_rows(file_name_, {DEV}, MEA, 40, 20), 10);
+}
+
+// ⑧ Data correctness: verify timestamps start from `offset`
+TEST_F(TsFileTreeReaderRowQueryTest, OffsetDataCorrectness) {
+    write_data({DEV}, MEA, TOTAL_ROWS);
+    TsFileTreeReader reader;
+    ASSERT_EQ(reader.open(file_name_), E_OK);
+    ResultSet* rs = nullptr;
+    ASSERT_EQ(reader.queryByRow({DEV}, {MEA}, 5, 10, rs), E_OK);
+
+    int count = 0;
+    bool has_next = false;
+    while (IS_SUCC(rs->next(has_next)) && has_next) {
+        int64_t ts = rs->get_row_record()->get_timestamp();
+        EXPECT_EQ(ts, static_cast<int64_t>(5 + count));
+        int64_t val = rs->get_value<int64_t>(2);
+        EXPECT_EQ(val, (5 + count) * 10);
+        count++;
+    }
+    EXPECT_EQ(count, 10);
+    reader.destroy_query_data_set(rs);
+    reader.close();
+}
+
+// ⑨ Cross-chunk: small page size to force multiple chunks, verify correctness
+TEST_F(TsFileTreeReaderRowQueryTest, CorrectnessAcrossChunks) {
+    int prev = g_config_value_.page_writer_max_point_num_;
+    g_config_value_.page_writer_max_point_num_ = 5;  // tiny pages
+
+    write_data({DEV}, MEA, 30);
+
+    TsFileTreeReader reader;
+    ASSERT_EQ(reader.open(file_name_), E_OK);
+    ResultSet* rs = nullptr;
+    ASSERT_EQ(reader.queryByRow({DEV}, {MEA}, 5, 10, rs), E_OK);
+
+    int count = 0;
+    bool has_next = false;
+    while (IS_SUCC(rs->next(has_next)) && has_next) {
+        int64_t ts = rs->get_row_record()->get_timestamp();
+        EXPECT_EQ(ts, static_cast<int64_t>(5 + count));
+        count++;
+    }
+    EXPECT_EQ(count, 10);
+    reader.destroy_query_data_set(rs);
+    reader.close();
+
+    g_config_value_.page_writer_max_point_num_ = prev;
+}
+
+// ⑩ Multiple devices: verify total row count with offset/limit
+TEST_F(TsFileTreeReaderRowQueryTest, MultipleDevicesOffsetLimit) {
+    // device_1 and device_2 each have 20 rows with timestamps 0..19.
+    // QDSWithoutTimeGenerator merges them by time: at each timestamp both
+    // devices' values appear in one RowRecord, so total distinct timestamps
+    // = 20.
+    write_data({"device_1", "device_2"}, MEA, 20);
+
+    ASSERT_EQ(count_rows(file_name_, {"device_1", "device_2"}, MEA, 5, 10), 10);
+}
+
+// ⑪ queryByRow result set supports metadata inspection
+TEST_F(TsFileTreeReaderRowQueryTest, MetadataAccessible) {
+    write_data({DEV}, MEA, 10);
+
+    TsFileTreeReader reader;
+    ASSERT_EQ(reader.open(file_name_), E_OK);
+    ResultSet* rs = nullptr;
+    ASSERT_EQ(reader.queryByRow({DEV}, {MEA}, 0, 5, rs), E_OK);
+
+    auto meta = rs->get_metadata();
+    ASSERT_NE(meta, nullptr);
+    // column 1 = "time", column 2 = measurement
+    EXPECT_EQ(meta->get_column_name(1), "time");
+    EXPECT_EQ(meta->get_column_count(), 2u);
+
+    reader.destroy_query_data_set(rs);
+    reader.close();
+}
+
+// ⑫ Paging consistency: two pages together equal the full result
+TEST_F(TsFileTreeReaderRowQueryTest, PaginationConsistency) {
+    write_data({DEV}, MEA, 40);
+
+    int page1 = count_rows(file_name_, {DEV}, MEA, 0, 20);
+    int page2 = count_rows(file_name_, {DEV}, MEA, 20, 20);
+    ASSERT_EQ(page1 + page2, 40);
+}
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/RowRangeResultSet.java b/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/RowRangeResultSet.java
new file mode 100644
index 000000000..eb077efdd
--- /dev/null
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/RowRangeResultSet.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.read.query.dataset;
+
+import org.apache.tsfile.write.record.TSRecord;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.util.Iterator;
+
+/**
+ * A {@link ResultSet} wrapper that applies row-level offset and limit.
+ *
+ * <p>Takes the wrapped ResultSet and closes it on {@link #close()}. Once the limit is reached,
+ * {@link #next()} returns {@code false} immediately without calling the underlying ResultSet,
+ * avoiding unnecessary data loading.
+ *
+ * <p>The constructor arguments: {@code offset} is the number of leading rows to skip (negative
+ * values are clamped to 0); {@code limit} is the maximum number of rows to return, where a
+ * negative value means no limit.
+ */
+public class RowRangeResultSet implements ResultSet {
+
+  private final ResultSet inner;
+  private final int offset;
+  private final int limit;
+  private int returnedCount;
+  private boolean offsetSkipped;
+
+  public RowRangeResultSet(ResultSet inner, int offset, int limit) {
+    this.inner = inner;
+    this.offset = Math.max(0, offset);
+    this.limit = limit;
+    this.returnedCount = 0;
+    this.offsetSkipped = false;
+  }
+
+  @Override
+  public ResultSetMetadata getMetadata() {
+    return inner.getMetadata();
+  }
+
+  @Override
+  public boolean next() throws IOException {
+    // Skip the first `offset` rows on the first call.
+    if (!offsetSkipped) {
+      for (int i = 0; i < offset; i++) {
+        if (!inner.next()) {
+          offsetSkipped = true;
+          return false;
+        }
+      }
+      offsetSkipped = true;
+    }
+
+    // Limit reached: return false without touching the inner ResultSet.
+    // This is the key "pushdown" effect: no further chunk/page loading occurs.
+    if (limit >= 0 && returnedCount >= limit) {
+      return false;
+    }
+
+    boolean hasNext = inner.next();
+    if (hasNext) {
+      returnedCount++;
+    }
+    return hasNext;
+  }
+
+  @Override
+  public int getInt(String columnName) {
+    return inner.getInt(columnName);
+  }
+
+  @Override
+  public int getInt(int columnIndex) {
+    return inner.getInt(columnIndex);
+  }
+
+  @Override
+  public long getLong(String columnName) {
+    return inner.getLong(columnName);
+  }
+
+  @Override
+  public long getLong(int columnIndex) {
+    return inner.getLong(columnIndex);
+  }
+
+  @Override
+  public float getFloat(String columnName) {
+    return inner.getFloat(columnName);
+  }
+
+  @Override
+  public float getFloat(int columnIndex) {
+    return inner.getFloat(columnIndex);
+  }
+
+  @Override
+  public double getDouble(String columnName) {
+    return inner.getDouble(columnName);
+  }
+
+  @Override
+  public double getDouble(int columnIndex) {
+    return inner.getDouble(columnIndex);
+  }
+
+  @Override
+  public boolean getBoolean(String columnName) {
+    return inner.getBoolean(columnName);
+  }
+
+  @Override
+  public boolean getBoolean(int columnIndex) {
+    return inner.getBoolean(columnIndex);
+  }
+
+  @Override
+  public String getString(String columnName) {
+    return inner.getString(columnName);
+  }
+
+  @Override
+  public String getString(int columnIndex) {
+    return inner.getString(columnIndex);
+  }
+
+  @Override
+  public LocalDate getDate(String columnName) {
+    return inner.getDate(columnName);
+  }
+
+  @Override
+  public LocalDate getDate(int columnIndex) {
+    return inner.getDate(columnIndex);
+  }
+
+  @Override
+  public byte[] getBinary(String columnName) {
+    return inner.getBinary(columnName);
+  }
+
+  @Override
+  public byte[] getBinary(int columnIndex) {
+    return inner.getBinary(columnIndex);
+  }
+
+  @Override
+  public boolean isNull(String columnName) {
+    return inner.isNull(columnName);
+  }
+
+  @Override
+  public boolean isNull(int columnIndex) {
+    return inner.isNull(columnIndex);
+  }
+
+  @Override
+  public void close() {
+    inner.close();
+  }
+
+  @Override
+  public Iterator iterator() {
+    return inner.iterator();
+  }
+}
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/DeviceTableModelReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/DeviceTableModelReader.java
index 927ac8954..72e3a4a0a 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/DeviceTableModelReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/DeviceTableModelReader.java
@@ -33,6 +33,7 @@
 import org.apache.tsfile.read.expression.ExpressionTree;
 import org.apache.tsfile.read.filter.basic.Filter;
 import org.apache.tsfile.read.query.dataset.ResultSet;
+import org.apache.tsfile.read.query.dataset.RowRangeResultSet;
 import org.apache.tsfile.read.query.dataset.TableResultSet;
 import org.apache.tsfile.read.query.executor.TableQueryExecutor;
 import org.apache.tsfile.read.reader.block.TsBlockReader;
@@ -113,6 +114,14 @@ public ResultSet query(
     return new TableResultSet(tsBlockReader, columnNames, dataTypeList, tableName);
   }

+  @TsFileApi
+  @Override
+  public ResultSet queryByRow(String tableName, List<String> columnNames, int offset, int limit)
+      throws ReadProcessException, IOException, NoTableException, NoMeasurementException {
+    ResultSet inner = query(tableName, columnNames, Long.MIN_VALUE, Long.MAX_VALUE);
+    return new RowRangeResultSet(inner, offset, limit);
+  }
+
   @Override
   public void close() {
     try {
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileReader.java
index 995f73f64..bf29ffcee 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileReader.java
@@ -48,6 +48,26 @@ ResultSet query(

   @TsFileApi
   List<TableSchema> getAllTableSchema() throws IOException;

+  /**
+   * Query table model data by row range.
+   *
+   * <p>Internally queries the full time range and applies offset/limit at the result-set level.
+   * Once {@code limit} rows are returned, no further data is loaded from storage.
+   *
+   * @param tableName target table name
+   * @param columnNames list of column names to query
+   * @param offset number of leading rows to skip (>= 0)
+   * @param limit maximum number of rows to return; < 0 means unlimited
+   * @return a {@link ResultSet} containing the query results
+   * @throws ReadProcessException if a read processing error occurs
+   * @throws IOException if an I/O error occurs
+   * @throws NoTableException if the table does not exist
+   * @throws NoMeasurementException if a column does not exist
+   */
+  @TsFileApi
+  ResultSet queryByRow(String tableName, List<String> columnNames, int offset, int limit)
+      throws ReadProcessException, IOException, NoTableException, NoMeasurementException;
+
   @TsFileApi
   void close();
 }
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileTreeReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileTreeReader.java
index 202553ea8..40ccbad70 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileTreeReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileTreeReader.java
@@ -47,6 +47,24 @@ ResultSet query(

   @TreeModel
   List<MeasurementSchema> getDeviceSchema(String deviceId) throws IOException;

+  /**
+   * Query tree model data by row range.
+   *
+   * <p>Internally queries the full time range and applies offset/limit at the result-set level.
+   * Once {@code limit} rows are returned, no further data is loaded from storage.
+   *
+   * @param deviceIds list of device identifiers to query
+   * @param measurementNames list of measurement names to query
+   * @param offset number of leading rows to skip (>= 0)
+   * @param limit maximum number of rows to return; < 0 means unlimited
+   * @return a {@link ResultSet} containing the query results
+   * @throws IOException if an I/O error occurs during query execution
+   */
+  @TsFileApi
+  @TreeModel
+  ResultSet queryByRow(List<String> deviceIds, List<String> measurementNames, int offset, int limit)
+      throws IOException;
+
   /** Close underlying resources. */
   @TsFileApi
   @TreeModel
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/TsFileTreeReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/TsFileTreeReader.java
index fe7a0c35b..c8b659b1b 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/TsFileTreeReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/TsFileTreeReader.java
@@ -30,6 +30,7 @@
 import org.apache.tsfile.read.filter.operator.TimeFilterOperators;
 import org.apache.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.tsfile.read.query.dataset.ResultSet;
+import org.apache.tsfile.read.query.dataset.RowRangeResultSet;
 import org.apache.tsfile.read.query.dataset.TreeResultSet;
 import org.apache.tsfile.write.schema.MeasurementSchema;

@@ -113,6 +114,29 @@ public List<MeasurementSchema> getDeviceSchema(String deviceId) throws IOExcepti
     return tsfileReader.getMeasurement(new StringArrayDeviceID(deviceId));
   }

+  /**
+   * Query tree model data by row range.
+   *
+   * <p>Internally queries the full time range and applies offset/limit at the result-set level.
+   * Once {@code limit} rows are returned, no further data is loaded from storage.
+   *
+   * @param deviceIds list of device identifiers to query
+   * @param measurementNames list of measurement names to query
+   * @param offset number of leading rows to skip (>= 0)
+   * @param limit maximum number of rows to return; < 0 means unlimited
+   * @return a {@link ResultSet} containing the query results
+   * @throws IOException if an I/O error occurs during query execution
+   */
+  @TsFileApi
+  @TreeModel
+  @Override
+  public ResultSet queryByRow(
+      List<String> deviceIds, List<String> measurementNames, int offset, int limit)
+      throws IOException {
+    ResultSet inner = query(deviceIds, measurementNames, Long.MIN_VALUE, Long.MAX_VALUE);
+    return new RowRangeResultSet(inner, offset, limit);
+  }
+
   /**
    * Close the TsFileTreeReader and release resources.
    *
diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/DeviceTableModelReaderRowQueryTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/DeviceTableModelReaderRowQueryTest.java
new file mode 100644
index 000000000..54671eec5
--- /dev/null
+++ b/java/tsfile/src/test/java/org/apache/tsfile/read/DeviceTableModelReaderRowQueryTest.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.read;
+
+import org.apache.tsfile.enums.ColumnCategory;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.read.ReadProcessException;
+import org.apache.tsfile.exception.write.NoMeasurementException;
+import org.apache.tsfile.exception.write.NoTableException;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.read.query.dataset.ResultSet;
+import org.apache.tsfile.read.v4.DeviceTableModelReader;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.v4.ITsFileWriter;
+import org.apache.tsfile.write.v4.TsFileWriterBuilder;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Unit tests for {@link DeviceTableModelReader#queryByRow} (table-model row-range query).
+ *
+ * <p>The test table has {@value #TOTAL} rows with timestamps 0..TOTAL-1 and field values equal to
+ * their timestamps.
+ */
+public class DeviceTableModelReaderRowQueryTest {
+
+  private static final String FILE_PATH = "test_table_reader_row_query.tsfile";
+  private static final int TOTAL = 50;
+  private static final String TABLE = "t1";
+  private static final String FIELD = "s0";
+
+  private DeviceTableModelReader reader;
+
+  @Before
+  public void setUp() throws IOException, WriteProcessException {
+    writeTableFile(FILE_PATH, TABLE, FIELD, TOTAL);
+    reader = new DeviceTableModelReader(new File(FILE_PATH));
+  }
+
+  @After
+  public void tearDown() {
+    if (reader != null) {
+      reader.close();
+    }
+    new File(FILE_PATH).delete();
+  }
+
+  // ① limit=0 → empty result
+  @Test
+  public void testLimitZeroReturnsEmpty()
+      throws ReadProcessException, IOException, NoTableException, NoMeasurementException {
+    Assert.assertEquals(0, countRows(0, 0));
+  }
+
+  // ② limit < total → exactly `limit` rows
+  @Test
+  public void testLimitLessThanTotal()
+      throws ReadProcessException, IOException, NoTableException, NoMeasurementException {
+    Assert.assertEquals(10, countRows(0, 10));
+  }
+
+  // ③ limit > total → all rows
+  @Test
+  public void testLimitExceedsTotal()
+      throws ReadProcessException, IOException, NoTableException, NoMeasurementException {
+    Assert.assertEquals(TOTAL, countRows(0, 9999));
+  }
+
+  // ④ limit=-1 → unlimited, returns all rows
+  @Test
+  public void testNegativeLimitMeansUnlimited()
+      throws ReadProcessException, IOException, NoTableException, NoMeasurementException {
+    Assert.assertEquals(TOTAL, countRows(0, -1));
+  }
+
+  // ⑤ offset + limit in the middle
+  @Test
+  public void testOffsetPlusLimit()
+      throws ReadProcessException, IOException, NoTableException, NoMeasurementException {
+    Assert.assertEquals(15, countRows(10, 15));
+  }
+
+  // ⑥ offset >= total → empty result
+  @Test
+  public void testOffsetBeyondTotal()
+      throws ReadProcessException, IOException, NoTableException, NoMeasurementException {
+    Assert.assertEquals(0, countRows(1000, 10));
+  }
+
+  // ⑦ offset + limit > total → return remaining rows from offset
+  @Test
+  public void testOffsetPlusLimitExceedsTotal()
+      throws ReadProcessException, IOException, NoTableException, NoMeasurementException {
+    // offset=40, limit=20 → only 10 rows remain
+    Assert.assertEquals(10, countRows(40, 20));
+  }
+
+  // ⑧ data correctness: timestamps and values start from `offset`
+  @Test
+  public void testDataCorrectness()
+      throws ReadProcessException, IOException, NoTableException, NoMeasurementException {
+    List<String> columns = Collections.singletonList(FIELD);
+    ResultSet rs = reader.queryByRow(TABLE, columns, 5, 10);
+    int count = 0;
+    while (rs.next()) {
+      long ts = rs.getLong(1); // column 1 = Time
+      long val = rs.getLong(2); // column 2 = s0
+      Assert.assertEquals(5 + count, ts);
+      Assert.assertEquals(5 + count, val);
+      count++;
+    }
+    rs.close();
+    Assert.assertEquals(10, count);
+  }
+
+  // ⑨ metadata is accessible via the result set
+  @Test
+  public void testMetadataAccessible()
+      throws ReadProcessException, IOException, NoTableException, NoMeasurementException {
+    List<String> columns = Collections.singletonList(FIELD);
+    ResultSet rs = reader.queryByRow(TABLE, columns, 0, 5);
+    Assert.assertNotNull(rs.getMetadata());
+    Assert.assertEquals("Time", rs.getMetadata().getColumnName(1));
+    Assert.assertEquals(FIELD, rs.getMetadata().getColumnName(2));
+    rs.close();
+  }
+
+  // ⑩ paging consistency: two pages together equal the full result
+  @Test
+  public
void testPaginationConsistency()
+      throws ReadProcessException, IOException, NoTableException, NoMeasurementException {
+    int page1 = countRows(0, 25);
+    int page2 = countRows(25, 25);
+    Assert.assertEquals(TOTAL, page1 + page2);
+  }
+
+  // ⑪ multiple chunks: offset/limit spans chunk boundary correctly
+  @Test
+  public void testMultipleChunksCorrectness()
+      throws IOException,
+          WriteProcessException,
+          ReadProcessException,
+          NoTableException,
+          NoMeasurementException {
+    String filePath = "test_table_reader_row_query_multi_chunk.tsfile";
+    writeTableFileMultiChunk(filePath, TABLE, FIELD, 30, 30);
+    try (DeviceTableModelReader r = new DeviceTableModelReader(new File(filePath))) {
+      // offset=25, limit=20 → rows 25..44
+      List<String> columns = Collections.singletonList(FIELD);
+      ResultSet rs = r.queryByRow(TABLE, columns, 25, 20);
+      int count = 0;
+      while (rs.next()) {
+        long ts = rs.getLong(1);
+        Assert.assertEquals(25 + count, ts);
+        count++;
+      }
+      rs.close();
+      Assert.assertEquals(20, count);
+    } finally {
+      new File(filePath).delete();
+    }
+  }
+
+  // ─────────────────────────────────────────────────────────────────────────────
+  // Helpers
+  // ─────────────────────────────────────────────────────────────────────────────
+
+  private int countRows(int offset, int limit)
+      throws ReadProcessException, IOException, NoTableException, NoMeasurementException {
+    List<String> columns = Collections.singletonList(FIELD);
+    ResultSet rs = reader.queryByRow(TABLE, columns, offset, limit);
+    int count = 0;
+    while (rs.next()) {
+      count++;
+    }
+    rs.close();
+    return count;
+  }
+
+  /**
+   * Write a single-chunk table file with {@code numRows} rows. Timestamps are 0..numRows-1 and
+   * field values equal their timestamps.
+   */
+  private static void writeTableFile(
+      String filePath, String tableName, String fieldName, int numRows)
+      throws IOException, WriteProcessException {
+    TableSchema tableSchema =
+        new TableSchema(
+            tableName,
+            Collections.singletonList(new MeasurementSchema(fieldName, TSDataType.INT64)),
+            Collections.singletonList(ColumnCategory.FIELD));
+    try (ITsFileWriter writer =
+        new TsFileWriterBuilder().file(new File(filePath)).tableSchema(tableSchema).build()) {
+      Tablet tablet =
+          new Tablet(
+              tableName,
+              Collections.singletonList(fieldName),
+              Collections.singletonList(TSDataType.INT64),
+              Collections.singletonList(ColumnCategory.FIELD),
+              numRows);
+      for (int i = 0; i < numRows; i++) {
+        tablet.addTimestamp(i, i);
+        tablet.addValue(fieldName, i, (long) i);
+      }
+      writer.write(tablet);
+    }
+  }
+
+  /**
+   * Write a two-chunk table file by using a tiny memory threshold to force a flush between the two
+   * tablets. First chunk has rows 0..chunk1Rows-1, second chunk has rows
+   * chunk1Rows..chunk1Rows+chunk2Rows-1. Field values equal their timestamps.
+   */
+  private static void writeTableFileMultiChunk(
+      String filePath, String tableName, String fieldName, int chunk1Rows, int chunk2Rows)
+      throws IOException, WriteProcessException {
+    TableSchema tableSchema =
+        new TableSchema(
+            tableName,
+            Collections.singletonList(new MeasurementSchema(fieldName, TSDataType.INT64)),
+            Collections.singletonList(ColumnCategory.FIELD));
+    // memoryThreshold(1) forces a flush after every write, producing multiple chunks.
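+    // With chunk1Rows = chunk2Rows = 30, rows 0..29 land in the first chunk and
+    // rows 30..59 in the second, so the offset=25/limit=20 test above must skip
+    // across the chunk boundary and stop inside the second chunk.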
+    try (ITsFileWriter writer =
+        new TsFileWriterBuilder()
+            .file(new File(filePath))
+            .tableSchema(tableSchema)
+            .memoryThreshold(1)
+            .build()) {
+      Tablet tablet1 =
+          new Tablet(
+              tableName,
+              Collections.singletonList(fieldName),
+              Collections.singletonList(TSDataType.INT64),
+              Collections.singletonList(ColumnCategory.FIELD),
+              chunk1Rows);
+      for (int i = 0; i < chunk1Rows; i++) {
+        tablet1.addTimestamp(i, i);
+        tablet1.addValue(fieldName, i, (long) i);
+      }
+      writer.write(tablet1);
+
+      Tablet tablet2 =
+          new Tablet(
+              tableName,
+              Collections.singletonList(fieldName),
+              Collections.singletonList(TSDataType.INT64),
+              Collections.singletonList(ColumnCategory.FIELD),
+              chunk2Rows);
+      for (int i = 0; i < chunk2Rows; i++) {
+        tablet2.addTimestamp(i, chunk1Rows + i);
+        tablet2.addValue(fieldName, i, (long) (chunk1Rows + i));
+      }
+      writer.write(tablet2);
+    }
+  }
+}
diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileTreeReaderRowQueryTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileTreeReaderRowQueryTest.java
new file mode 100644
index 000000000..e6a9b4850
--- /dev/null
+++ b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileTreeReaderRowQueryTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.read;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.read.query.dataset.ResultSet;
+import org.apache.tsfile.read.v4.ITsFileTreeReader;
+import org.apache.tsfile.read.v4.TsFileTreeReaderBuilder;
+import org.apache.tsfile.write.record.TSRecord;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.v4.TsFileTreeWriter;
+import org.apache.tsfile.write.v4.TsFileTreeWriterBuilder;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Unit tests for {@link ITsFileTreeReader#queryByRow} (tree-model row-range query).
+ *
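+ * <p>The cases largely mirror {@link DeviceTableModelReaderRowQueryTest}; the write path, the
+ * expected values, and the multi-source case (multiple devices instead of multiple chunks) differ.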
+ * <p>Each device has {@value #TOTAL} rows with timestamps 0..TOTAL-1 and values timestamp * 10.
+ */
+public class TsFileTreeReaderRowQueryTest {
+
+  private static final String FILE_PATH = "test_tree_reader_row_query.tsfile";
+  private static final int TOTAL = 50;
+  private static final String DEVICE = "device";
+  private static final String MEA = "s1";
+
+  private ITsFileTreeReader reader;
+
+  @Before
+  public void setUp() throws IOException, WriteProcessException {
+    writeTreeFile(FILE_PATH, Collections.singletonList(DEVICE), MEA, TOTAL);
+    reader = new TsFileTreeReaderBuilder().file(new File(FILE_PATH)).build();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (reader != null) {
+      reader.close();
+    }
+    new File(FILE_PATH).delete();
+  }
+
+  // ① limit=0 → empty result
+  @Test
+  public void testLimitZeroReturnsEmpty() throws IOException {
+    Assert.assertEquals(0, countRows(reader, DEVICE, MEA, 0, 0));
+  }
+
+  // ② limit < total → exactly `limit` rows
+  @Test
+  public void testLimitLessThanTotal() throws IOException {
+    Assert.assertEquals(20, countRows(reader, DEVICE, MEA, 0, 20));
+  }
+
+  // ③ limit > total → all rows
+  @Test
+  public void testLimitExceedsTotal() throws IOException {
+    Assert.assertEquals(TOTAL, countRows(reader, DEVICE, MEA, 0, 1000));
+  }
+
+  // ④ limit=-1 → unlimited, returns all rows
+  @Test
+  public void testNegativeLimitMeansUnlimited() throws IOException {
+    Assert.assertEquals(TOTAL, countRows(reader, DEVICE, MEA, 0, -1));
+  }
+
+  // ⑤ offset + limit in the middle
+  @Test
+  public void testOffsetPlusLimit() throws IOException {
+    Assert.assertEquals(15, countRows(reader, DEVICE, MEA, 10, 15));
+  }
+
+  // ⑥ offset >= total → empty result
+  @Test
+  public void testOffsetBeyondTotal() throws IOException {
+    Assert.assertEquals(0, countRows(reader, DEVICE, MEA, 1000, 10));
+  }
+
+  // ⑦ offset + limit > total → return remaining rows from offset
+  @Test
+  public void testOffsetPlusLimitExceedsTotal() throws IOException {
+    // offset=40, limit=20 → only 10 rows remain
+    Assert.assertEquals(10, countRows(reader, DEVICE, MEA, 40, 20));
+  }
+
+  // ⑧ data correctness: timestamps and values start from `offset`
+  @Test
+  public void testDataCorrectness() throws IOException {
+    List<String> deviceIds = Collections.singletonList(DEVICE);
+    List<String> measurements = Collections.singletonList(MEA);
+    ResultSet rs = reader.queryByRow(deviceIds, measurements, 5, 10);
+    int count = 0;
+    while (rs.next()) {
+      long ts = rs.getLong(1); // column 1 = Time
+      long val = rs.getLong(2); // column 2 = measurement
+      Assert.assertEquals(5 + count, ts);
+      Assert.assertEquals((5 + count) * 10L, val);
+      count++;
+    }
+    rs.close();
+    Assert.assertEquals(10, count);
+  }
+
+  // ⑨ metadata is accessible via the result set
+  @Test
+  public void testMetadataAccessible() throws IOException {
+    List<String> deviceIds = Collections.singletonList(DEVICE);
+    List<String> measurements = Collections.singletonList(MEA);
+    ResultSet rs = reader.queryByRow(deviceIds, measurements, 0, 5);
+    Assert.assertNotNull(rs.getMetadata());
+    // Column 1 is always "Time"; column 2 is the full path "device.s1"
+    Assert.assertEquals("Time", rs.getMetadata().getColumnName(1));
+    Assert.assertEquals(DEVICE + "."
+        + MEA, rs.getMetadata().getColumnName(2));
+    rs.close();
+  }
+
+  // ⑩ paging consistency: two pages together equal the full result
+  @Test
+  public void testPaginationConsistency() throws IOException {
+    int page1 = countRows(reader, DEVICE, MEA, 0, 25);
+    int page2 = countRows(reader, DEVICE, MEA, 25, 25);
+    Assert.assertEquals(TOTAL, page1 + page2);
+  }
+
+  // ⑪ multiple devices: offset/limit applied to merged result
+  @Test
+  public void testMultipleDevices() throws IOException, WriteProcessException {
+    String filePath = "test_tree_reader_row_query_multi.tsfile";
+    writeTreeFile(filePath, Arrays.asList("dev1", "dev2"), MEA, 20);
+    try (ITsFileTreeReader r = new TsFileTreeReaderBuilder().file(new File(filePath)).build()) {
+      int count = countRows(r, null, MEA, 5, 10);
+      Assert.assertEquals(10, count);
+    } finally {
+      new File(filePath).delete();
+    }
+  }
+
+  // ─────────────────────────────────────────────────────────────────────────────
+  // Helpers
+  // ─────────────────────────────────────────────────────────────────────────────
+
+  /**
+   * Count rows returned by {@code queryByRow}.
+   *
+   * @param r reader (already open)
+   * @param device device ID; if {@code null}, both "dev1" and "dev2" are queried
+   * @param mea measurement name
+   * @param offset row offset
+   * @param limit row limit
+   */
+  private int countRows(ITsFileTreeReader r, String device, String mea, int offset, int limit)
+      throws IOException {
+    List<String> deviceIds =
+        device != null ? Collections.singletonList(device) : Arrays.asList("dev1", "dev2");
+    List<String> measurements = Collections.singletonList(mea);
+    ResultSet rs = r.queryByRow(deviceIds, measurements, offset, limit);
+    int count = 0;
+    while (rs.next()) {
+      count++;
+    }
+    rs.close();
+    return count;
+  }
+
+  /**
+   * Write a tree-model TsFile with the given devices and measurement. Timestamps are 0..numRows-1,
+   * values are timestamp * 10 (INT64).
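+   * <p>Devices are written one after another, so rows from different devices never interleave in
+   * the file; the multi-device test above therefore pages across a merged, per-device-ordered
+   * result.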
+   */
+  private static void writeTreeFile(
+      String filePath, List<String> deviceIds, String measurement, int numRows)
+      throws IOException, WriteProcessException {
+    File file = new File(filePath);
+    MeasurementSchema schema = new MeasurementSchema(measurement, TSDataType.INT64);
+    try (TsFileTreeWriter writer = new TsFileTreeWriterBuilder().file(file).build()) {
+      for (String deviceId : deviceIds) {
+        writer.registerTimeseries(deviceId, schema);
+      }
+      for (String deviceId : deviceIds) {
+        for (int i = 0; i < numRows; i++) {
+          TSRecord record = new TSRecord(deviceId, i);
+          record.addPoint(measurement, (long) i * 10);
+          writer.write(record);
+        }
+      }
+    }
+  }
+}
diff --git a/python/tsfile/tsfile_cpp.pxd b/python/tsfile/tsfile_cpp.pxd
index 9c65fb26f..c394c09bb 100644
--- a/python/tsfile/tsfile_cpp.pxd
+++ b/python/tsfile/tsfile_cpp.pxd
@@ -189,6 +189,20 @@ cdef extern from "./tsfile_cwrapper.h":
                                     char ** sensor_name, uint32_t sensor_num,
                                     int64_t start_time, int64_t end_time, ErrorCode *err_code)
 
+    ResultSet tsfile_reader_query_tree_by_row(TsFileReader reader,
+                                              char** device_ids, int device_ids_len,
+                                              char** measurement_names,
+                                              int measurement_names_len,
+                                              int offset, int limit,
+                                              ErrorCode* err_code)
+
+    ResultSet tsfile_reader_query_table_by_row(TsFileReader reader,
+                                               const char* table_name,
+                                               char** column_names,
+                                               int column_names_len,
+                                               int offset, int limit,
+                                               ErrorCode* err_code)
+
     TableSchema tsfile_reader_get_table_schema(TsFileReader reader, const char * table_name);
 
diff --git a/python/tsfile/tsfile_py_cpp.pxd b/python/tsfile/tsfile_py_cpp.pxd
index 2389aa9a6..16540effa 100644
--- a/python/tsfile/tsfile_py_cpp.pxd
+++ b/python/tsfile/tsfile_py_cpp.pxd
@@ -54,6 +54,10 @@ cdef public api ResultSet tsfile_reader_query_table_on_tree_c(TsFileReader reade
                                                               int64_t start_time, int64_t end_time)
 cdef public api ResultSet tsfile_reader_query_paths_c(TsFileReader reader, object device_name,
                                                       object sensor_list, int64_t start_time, int64_t end_time)
+cdef public api ResultSet tsfile_reader_query_tree_by_row_c(TsFileReader reader, object device_ids,
+                                                            object measurement_names, int offset, int limit)
+cdef public api ResultSet tsfile_reader_query_table_by_row_c(TsFileReader reader, object table_name,
+                                                             object column_names, int offset, int limit)
 cdef public api object get_table_schema(TsFileReader reader, object table_name)
 cdef public api object get_all_table_schema(TsFileReader reader)
 cdef public api object get_all_timeseries_schema(TsFileReader reader)
diff --git a/python/tsfile/tsfile_py_cpp.pyx b/python/tsfile/tsfile_py_cpp.pyx
index 3ca79a2a1..179d95c25 100644
--- a/python/tsfile/tsfile_py_cpp.pyx
+++ b/python/tsfile/tsfile_py_cpp.pyx
@@ -779,6 +779,81 @@ cdef ResultSet tsfile_reader_query_paths_c(TsFileReader reader, object device_na
         free(<void*> sensor_list_c)
         sensor_list_c = NULL
 
+cdef ResultSet tsfile_reader_query_tree_by_row_c(
+        TsFileReader reader, object device_ids, object measurement_names,
+        int offset, int limit):
+    cdef int dev_num = len(device_ids)
+    cdef int mea_num = len(measurement_names)
+    cdef char** c_devs = <char**> malloc(sizeof(char*) * dev_num)
+    cdef char** c_meas = <char**> malloc(sizeof(char*) * mea_num)
+    cdef ErrorCode code = 0
+    cdef int i
+    if c_devs == NULL or c_meas == NULL:
+        if c_devs != NULL:
+            free(<void*> c_devs)
+        if c_meas != NULL:
+            free(<void*> c_meas)
+        raise MemoryError("Failed to allocate memory for query arrays")
+    for i in range(dev_num):
+        c_devs[i] = NULL
+    for i in range(mea_num):
+        c_meas[i] = NULL
+    try:
+        for i in range(dev_num):
+            c_devs[i] = strdup((<str> device_ids[i]).encode('utf-8'))
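+            # strdup copies the buffer of the temporary bytes object produced by
+            # encode(), so the C string stays valid after the temporary is freed.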
+            if c_devs[i] == NULL:
+                raise MemoryError("Failed to allocate memory for device id")
+        for i in range(mea_num):
+            c_meas[i] = strdup((<str> measurement_names[i]).encode('utf-8'))
+            if c_meas[i] == NULL:
+                raise MemoryError("Failed to allocate memory for measurement name")
+        result = tsfile_reader_query_tree_by_row(
+            reader, c_devs, dev_num, c_meas, mea_num, offset, limit, &code)
+        check_error(code)
+        return result
+    finally:
+        for i in range(dev_num):
+            if c_devs[i] != NULL:
+                free(<void*> c_devs[i])
+                c_devs[i] = NULL
+        free(<void*> c_devs)
+        for i in range(mea_num):
+            if c_meas[i] != NULL:
+                free(<void*> c_meas[i])
+                c_meas[i] = NULL
+        free(<void*> c_meas)
+
+
+cdef ResultSet tsfile_reader_query_table_by_row_c(
+        TsFileReader reader, object table_name, object column_names,
+        int offset, int limit):
+    cdef int col_num = len(column_names)
+    cdef bytes table_name_bytes = PyUnicode_AsUTF8String(table_name)
+    cdef const char* table_name_c = table_name_bytes
+    cdef char** c_cols = <char**> malloc(sizeof(char*) * col_num)
+    cdef ErrorCode code = 0
+    cdef int i
+    if c_cols == NULL:
+        raise MemoryError("Failed to allocate memory for column names")
+    for i in range(col_num):
+        c_cols[i] = NULL
+    try:
+        for i in range(col_num):
+            c_cols[i] = strdup((<str> column_names[i]).encode('utf-8'))
+            if c_cols[i] == NULL:
+                raise MemoryError("Failed to allocate memory for column name")
+        result = tsfile_reader_query_table_by_row(
+            reader, table_name_c, c_cols, col_num, offset, limit, &code)
+        check_error(code)
+        return result
+    finally:
+        for i in range(col_num):
+            if c_cols[i] != NULL:
+                free(<void*> c_cols[i])
+                c_cols[i] = NULL
+        free(<void*> c_cols)
+
+
 cdef object get_table_schema(TsFileReader reader, object table_name):
     cdef bytes table_name_bytes = PyUnicode_AsUTF8String(table_name)
     cdef const char * table_name_c = table_name_bytes
diff --git a/python/tsfile/tsfile_reader.pyx b/python/tsfile/tsfile_reader.pyx
index 4476d24dc..637444571 100644
--- a/python/tsfile/tsfile_reader.pyx
+++ b/python/tsfile/tsfile_reader.pyx
@@ -320,6 +320,54 @@
         self.activate_result_set_list.add(pyresult)
         return pyresult
 
+    def query_tree_by_row(self, device_ids: List[str], measurement_names: List[str],
+                          offset: int, limit: int) -> ResultSetPy:
+        """
+        Query tree model data by row range.
+
+        Internally queries the full time range and applies offset/limit at the
+        result-set level. Once ``limit`` rows are returned, no further data is
+        loaded from storage.
+
+        :param device_ids: List of device identifiers to query.
+        :param measurement_names: List of measurement names to query.
+        :param offset: Number of leading rows to skip (>= 0).
+        :param limit: Maximum number of rows to return. < 0 means unlimited.
+        :return: ResultSet containing query results.
+        """
+        cdef ResultSet result
+        result = tsfile_reader_query_tree_by_row_c(
+            self.reader, device_ids, measurement_names, offset, limit)
+        pyresult = ResultSetPy(self, True)
+        pyresult.init_c(result, device_ids[0] if device_ids else "")
+        self.activate_result_set_list.add(pyresult)
+        return pyresult
+
+    def query_table_by_row(self, table_name: str, column_names: List[str],
+                           offset: int, limit: int) -> ResultSetPy:
+        """
+        Query table model data by row range.
+
+        Internally queries the full time range and applies offset/limit at the
+        result-set level. Once ``limit`` rows are returned, no further data is
+        loaded from storage.
+
+        :param table_name: Target table name.
+        :param column_names: List of column names to query.
+        :param offset: Number of leading rows to skip (>= 0).
+        :param limit: Maximum number of rows to return. < 0 means unlimited.
+        :return: ResultSet containing query results.
+        """
+        cdef ResultSet result
+        result = tsfile_reader_query_table_by_row_c(
+            self.reader, table_name.lower(),
+            [column_name.lower() for column_name in column_names],
+            offset, limit)
+        pyresult = ResultSetPy(self)
+        pyresult.init_c(result, table_name)
+        self.activate_result_set_list.add(pyresult)
+        return pyresult
+
     def notify_result_set_discard(self, result_set: ResultSetPy):
         """
        Remove an active result set from activate_result_set_list; called when a result set closes.
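For orientation, here is a minimal end-to-end sketch of the new Java tree-model API. It is not part of the patch: the file name, device/measurement names, and row counts are illustrative, and it only combines calls that already appear in the tests above.

import java.io.File;
import java.util.Collections;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.query.dataset.ResultSet;
import org.apache.tsfile.read.v4.ITsFileTreeReader;
import org.apache.tsfile.read.v4.TsFileTreeReaderBuilder;
import org.apache.tsfile.write.record.TSRecord;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.apache.tsfile.write.v4.TsFileTreeWriter;
import org.apache.tsfile.write.v4.TsFileTreeWriterBuilder;

public class RowRangeQueryExample {
  public static void main(String[] args) throws Exception {
    File file = new File("example.tsfile");

    // Write 100 rows for device "d1", measurement "s1" (values equal timestamps).
    try (TsFileTreeWriter writer = new TsFileTreeWriterBuilder().file(file).build()) {
      writer.registerTimeseries("d1", new MeasurementSchema("s1", TSDataType.INT64));
      for (int i = 0; i < 100; i++) {
        TSRecord record = new TSRecord("d1", i);
        record.addPoint("s1", (long) i);
        writer.write(record);
      }
    }

    // Page through the file 25 rows at a time. Each call skips `offset` rows and
    // stops loading chunks/pages once 25 rows have been produced.
    ITsFileTreeReader reader = new TsFileTreeReaderBuilder().file(file).build();
    for (int offset = 0; offset < 100; offset += 25) {
      ResultSet rs =
          reader.queryByRow(
              Collections.singletonList("d1"), Collections.singletonList("s1"), offset, 25);
      while (rs.next()) {
        System.out.println(rs.getLong(1) + " -> " + rs.getLong(2)); // Time -> s1
      }
      rs.close();
    }
    reader.close();
  }
}

Note how offset advances while limit stays fixed; the Python bindings follow the same paging pattern via query_tree_by_row and query_table_by_row.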