diff --git a/cpp/src/common/tsblock/tsblock.h b/cpp/src/common/tsblock/tsblock.h index 859ad393d..2082fea79 100644 --- a/cpp/src/common/tsblock/tsblock.h +++ b/cpp/src/common/tsblock/tsblock.h @@ -60,6 +60,11 @@ class TsBlock { FORCE_INLINE uint32_t get_row_count() const { return row_count_; } + FORCE_INLINE void set_row_count(uint32_t count) { + ASSERT(count <= row_count_); + row_count_ = count; + } + FORCE_INLINE TupleDesc* get_tuple_desc() const { return tuple_desc_; } FORCE_INLINE Vector* get_vector(uint32_t index) { return vectors_[index]; } diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc index fbcf4e6f1..1bafd80aa 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.cc +++ b/cpp/src/cwrapper/tsfile_cwrapper.cc @@ -24,10 +24,12 @@ #include #include +#include #include #include "common/tablet.h" #include "reader/result_set.h" +#include "reader/row_range_result_set.h" #include "reader/tsfile_reader.h" #include "writer/tsfile_writer.h" @@ -770,6 +772,44 @@ ERRNO _tsfile_writer_flush(TsFileWriter writer) { return w->flush(); } +ResultSet tsfile_reader_query_tree_by_row(TsFileReader reader, + char** device_ids, int device_ids_len, + char** measurement_names, + int measurement_names_len, int offset, + int limit, ERRNO* err_code) { + auto* r = static_cast(reader); + std::vector path_list; + path_list.reserve(device_ids_len * measurement_names_len); + for (int i = 0; i < device_ids_len; i++) { + for (int j = 0; j < measurement_names_len; j++) { + path_list.push_back(std::string(device_ids[i]) + "." + + std::string(measurement_names[j])); + } + } + storage::ResultSet* inner = nullptr; + *err_code = r->query(path_list, INT64_MIN, INT64_MAX, inner); + if (*err_code != common::E_OK) return nullptr; + return new storage::RowRangeResultSet(inner, offset, limit); +} + +ResultSet tsfile_reader_query_table_by_row(TsFileReader reader, + const char* table_name, + char** column_names, + int column_names_len, int offset, + int limit, ERRNO* err_code) { + auto* r = static_cast(reader); + std::vector cols; + cols.reserve(column_names_len); + for (int i = 0; i < column_names_len; i++) { + cols.emplace_back(column_names[i]); + } + storage::ResultSet* inner = nullptr; + *err_code = + r->query(std::string(table_name), cols, INT64_MIN, INT64_MAX, inner); + if (*err_code != common::E_OK) return nullptr; + return new storage::RowRangeResultSet(inner, offset, limit); +} + ResultSet _tsfile_reader_query_device(TsFileReader reader, const char* device_name, char** sensor_name, uint32_t sensor_num, diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h b/cpp/src/cwrapper/tsfile_cwrapper.h index 643b4e52b..2f317fb90 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.h +++ b/cpp/src/cwrapper/tsfile_cwrapper.h @@ -444,6 +444,52 @@ ResultSet tsfile_query_table(TsFileReader reader, const char* table_name, ResultSet tsfile_query_table_on_tree(TsFileReader reader, char** columns, uint32_t column_num, Timestamp start_time, Timestamp end_time, ERRNO* err_code); + +/** + * @brief Query tree model time series data by row range (offset + limit). + * + * Combines all device/measurement paths, queries the full time range, then + * skips the first `offset` rows and returns at most `limit` rows. Once + * `limit` rows are returned, underlying data loading stops immediately. + * + * @param reader [in] Valid TsFileReader handle. + * @param device_ids [in] Array of device IDs. + * @param device_ids_len [in] Number of device IDs. + * @param measurement_names [in] Array of measurement names. + * @param measurement_names_len [in] Number of measurement names. + * @param offset [in] Rows to skip (>= 0). + * @param limit [in] Max rows to return (< 0 = unlimited). + * @param err_code [out] Error code. + * @return ResultSet handle. Must be freed with free_tsfile_result_set(). + */ +ResultSet tsfile_reader_query_tree_by_row(TsFileReader reader, + char** device_ids, int device_ids_len, + char** measurement_names, + int measurement_names_len, int offset, + int limit, ERRNO* err_code); + +/** + * @brief Query table model data by row range (offset + limit). + * + * Queries the full time range, skips the first `offset` rows, and returns at + * most `limit` rows. Once `limit` rows are returned, underlying data loading + * stops immediately. + * + * @param reader [in] Valid TsFileReader handle. + * @param table_name [in] Target table name. + * @param column_names [in] Array of column names. + * @param column_names_len [in] Number of column names. + * @param offset [in] Rows to skip (>= 0). + * @param limit [in] Max rows to return (< 0 = unlimited). + * @param err_code [out] Error code. + * @return ResultSet handle. Must be freed with free_tsfile_result_set(). + */ +ResultSet tsfile_reader_query_table_by_row(TsFileReader reader, + const char* table_name, + char** column_names, + int column_names_len, int offset, + int limit, ERRNO* err_code); + // ResultSet tsfile_reader_query_device(TsFileReader reader, // const char* device_name, // char** sensor_name, uint32_t sensor_num, diff --git a/cpp/src/file/tsfile_io_reader.cc b/cpp/src/file/tsfile_io_reader.cc index 405c55534..ab84f57d4 100644 --- a/cpp/src/file/tsfile_io_reader.cc +++ b/cpp/src/file/tsfile_io_reader.cc @@ -58,7 +58,8 @@ void TsFileIOReader::reset() { int TsFileIOReader::alloc_ssi(std::shared_ptr device_id, const std::string& measurement_name, TsFileSeriesScanIterator*& ssi, - common::PageArena& pa, Filter* time_filter) { + common::PageArena& pa, Filter* time_filter, + int32_t offset, int32_t limit) { int ret = E_OK; if (RET_FAIL(load_tsfile_meta_if_necessary())) { } else { @@ -69,7 +70,12 @@ int TsFileIOReader::alloc_ssi(std::shared_ptr device_id, } else if (time_filter != nullptr && !filter_stasify(ssi->itimeseries_index_, time_filter)) { ret = E_NO_MORE_DATA; - } else if (RET_FAIL(ssi->init_chunk_reader())) { + } else { + if (offset > 0 || limit >= 0) { + ssi->set_row_range(offset, limit); + } + if (RET_FAIL(ssi->init_chunk_reader())) { + } } if (ret != E_OK) { ssi->destroy(); diff --git a/cpp/src/file/tsfile_io_reader.h b/cpp/src/file/tsfile_io_reader.h index 19bcfea0b..97c181c92 100644 --- a/cpp/src/file/tsfile_io_reader.h +++ b/cpp/src/file/tsfile_io_reader.h @@ -57,7 +57,8 @@ class TsFileIOReader { int alloc_ssi(std::shared_ptr device_id, const std::string& measurement_name, TsFileSeriesScanIterator*& ssi, common::PageArena& pa, - Filter* time_filter = nullptr); + Filter* time_filter = nullptr, int32_t offset = 0, + int32_t limit = -1); void revert_ssi(TsFileSeriesScanIterator* ssi); diff --git a/cpp/src/reader/aligned_chunk_reader.cc b/cpp/src/reader/aligned_chunk_reader.cc index 14250e7f8..12942b796 100644 --- a/cpp/src/reader/aligned_chunk_reader.cc +++ b/cpp/src/reader/aligned_chunk_reader.cc @@ -243,6 +243,41 @@ int AlignedChunkReader::get_next_page(TsBlock* ret_tsblock, return ret; } +int AlignedChunkReader::skip_pages(int32_t rows_to_skip, + int32_t& rows_skipped) { + int ret = E_OK; + rows_skipped = 0; + if (chunk_has_only_one_page(time_chunk_header_)) { + return ret; + } + while (rows_to_skip > 0 && has_more_data() && + !prev_time_page_not_finish() && !prev_value_page_not_finish()) { + if (RET_FAIL(get_cur_page_header( + time_chunk_meta_, time_in_stream_, cur_time_page_header_, + time_chunk_visit_offset_, time_chunk_header_))) { + break; + } + if (RET_FAIL(get_cur_page_header( + value_chunk_meta_, value_in_stream_, cur_value_page_header_, + value_chunk_visit_offset_, value_chunk_header_))) { + break; + } + int32_t page_count = cur_time_page_header_.statistic_ != nullptr + ? cur_time_page_header_.statistic_->get_count() + : 0; + if (page_count > 0 && rows_to_skip >= page_count) { + if (RET_FAIL(skip_cur_page())) { + break; + } + rows_to_skip -= page_count; + rows_skipped += page_count; + } else { + break; + } + } + return ret; +} + int AlignedChunkReader::get_cur_page_header(ChunkMeta*& chunk_meta, common::ByteStream& in_stream, PageHeader& cur_page_header, diff --git a/cpp/src/reader/aligned_chunk_reader.h b/cpp/src/reader/aligned_chunk_reader.h index aefb7bc58..6190a630c 100644 --- a/cpp/src/reader/aligned_chunk_reader.h +++ b/cpp/src/reader/aligned_chunk_reader.h @@ -78,6 +78,7 @@ class AlignedChunkReader : public IChunkReader { int get_next_page(common::TsBlock* tsblock, Filter* oneshoot_filter, common::PageArena& pa) override; + int skip_pages(int32_t rows_to_skip, int32_t& rows_skipped) override; private: FORCE_INLINE bool chunk_has_only_one_page( diff --git a/cpp/src/reader/block/device_ordered_tsblock_reader.cc b/cpp/src/reader/block/device_ordered_tsblock_reader.cc index 6fb540954..252ff8ea3 100644 --- a/cpp/src/reader/block/device_ordered_tsblock_reader.cc +++ b/cpp/src/reader/block/device_ordered_tsblock_reader.cc @@ -23,6 +23,10 @@ namespace storage { int DeviceOrderedTsBlockReader::has_next(bool& has_next) { int ret = common::E_OK; + if (remaining_limit_ == 0) { + has_next = false; + return common::E_OK; + } if (current_reader_ != nullptr && IS_SUCC(current_reader_->has_next(has_next)) && has_next) { return common::E_OK; @@ -47,22 +51,24 @@ int DeviceOrderedTsBlockReader::has_next(bool& has_next) { return common::E_OOM; } if (RET_FAIL(current_reader_->init(task, block_size_, time_filter_, - field_filter_))) { + field_filter_, remaining_offset_, + remaining_limit_))) { delete current_reader_; current_reader_ = nullptr; return ret; } + // Update remaining offset/limit from what SingleDeviceTsBlockReader + // consumed (for aligned data, SSIs handle it; retrieve remaining) + remaining_offset_ = current_reader_->get_remaining_offset(); + remaining_limit_ = current_reader_->get_remaining_limit(); if (RET_FAIL(current_reader_->has_next(has_next))) { return ret; } - // If current device has data, just return. if (has_next) { return ret; } - // If current device does not have data, get next device. - // Free current device reader. if (current_reader_) { delete current_reader_; current_reader_ = nullptr; diff --git a/cpp/src/reader/block/device_ordered_tsblock_reader.h b/cpp/src/reader/block/device_ordered_tsblock_reader.h index b00d751a1..a0b82483c 100644 --- a/cpp/src/reader/block/device_ordered_tsblock_reader.h +++ b/cpp/src/reader/block/device_ordered_tsblock_reader.h @@ -33,13 +33,15 @@ class DeviceOrderedTsBlockReader : public TsBlockReader { std::unique_ptr device_task_iterator, IMetadataQuerier* metadata_querier, int32_t block_size, TsFileIOReader* tsfile_io_reader, Filter* time_filter, - Filter* field_filter) + Filter* field_filter, int32_t offset = 0, int32_t limit = -1) : device_task_iterator_(std::move(device_task_iterator)), metadata_querier_(metadata_querier), block_size_(block_size), tsfile_io_reader_(tsfile_io_reader), time_filter_(time_filter), - field_filter_(field_filter) {} + field_filter_(field_filter), + remaining_offset_(offset), + remaining_limit_(limit) {} ~DeviceOrderedTsBlockReader() override { close(); } int has_next(bool& has_next) override; @@ -54,6 +56,8 @@ class DeviceOrderedTsBlockReader : public TsBlockReader { TsFileIOReader* tsfile_io_reader_; Filter* time_filter_ = nullptr; Filter* field_filter_ = nullptr; + int32_t remaining_offset_; + int32_t remaining_limit_; }; } // namespace storage diff --git a/cpp/src/reader/block/single_device_tsblock_reader.cc b/cpp/src/reader/block/single_device_tsblock_reader.cc index 836ab6956..aca2ceafc 100644 --- a/cpp/src/reader/block/single_device_tsblock_reader.cc +++ b/cpp/src/reader/block/single_device_tsblock_reader.cc @@ -33,8 +33,11 @@ SingleDeviceTsBlockReader::SingleDeviceTsBlockReader( int SingleDeviceTsBlockReader::init(DeviceQueryTask* device_query_task, uint32_t block_size, Filter* time_filter, - Filter* field_filter) { + Filter* field_filter, int32_t offset, + int32_t limit) { int ret = common::E_OK; + remaining_offset_ = offset; + remaining_limit_ = limit; pa_.init(512, common::AllocModID::MOD_TSFILE_READER); tuple_desc_.reset(); auto table_schema = device_query_task->get_table_schema(); @@ -67,8 +70,19 @@ int SingleDeviceTsBlockReader::init(DeviceQueryTask* device_query_task, time_series_indexs, pa_))) { return ret; } + // Check if all timeseries are aligned (VECTOR type) + all_aligned_ = true; + for (const auto& ts_index : time_series_indexs) { + if (ts_index != nullptr && + ts_index->get_data_type() != common::VECTOR) { + all_aligned_ = false; + break; + } + } + // Handle offset/limit at row level in has_next() for correctness. + // SSI-level skip has partial-page offset bugs, so keep offset/limit here. for (const auto& time_series_index : time_series_indexs) { - construct_column_context(time_series_index, time_filter); + construct_column_context(time_series_index, time_filter, 0, -1); } // There is no data in this single device tsblock reader. @@ -102,6 +116,11 @@ int SingleDeviceTsBlockReader::has_next(bool& has_next) { return common::E_OK; } + if (remaining_limit_ == 0) { + has_next = false; + return common::E_OK; + } + for (auto col_appender : col_appenders_) { col_appender->reset(); } @@ -113,6 +132,9 @@ int SingleDeviceTsBlockReader::has_next(bool& has_next) { std::vector min_time_columns; while (current_block_->get_row_count() < block_size_) { + if (remaining_limit_ == 0) { + break; + } for (auto& column_context : field_column_contexts_) { int64_t time; if (IS_FAIL(column_context.second->get_current_time(time))) { @@ -127,6 +149,24 @@ int SingleDeviceTsBlockReader::has_next(bool& has_next) { min_time_columns.push_back(column_context.second); } } + + // Skip rows covered by offset + if (remaining_offset_ > 0) { + for (auto& column_context : min_time_columns) { + if (IS_FAIL(advance_column(column_context))) { + break; + } + } + remaining_offset_--; + next_time_set = false; + next_time_ = -1; + min_time_columns.clear(); + if (field_column_contexts_.empty()) { + break; + } + continue; + } + if (IS_FAIL(fill_measurements(min_time_columns))) { has_next = false; return common::E_OK; @@ -135,6 +175,11 @@ int SingleDeviceTsBlockReader::has_next(bool& has_next) { next_time_ = -1; } + // Decrement remaining limit + if (remaining_limit_ > 0) { + remaining_limit_--; + } + if (field_column_contexts_.empty()) { break; } @@ -283,7 +328,8 @@ void SingleDeviceTsBlockReader::close() { } int SingleDeviceTsBlockReader::construct_column_context( - const ITimeseriesIndex* time_series_index, Filter* time_filter) { + const ITimeseriesIndex* time_series_index, Filter* time_filter, + int32_t offset, int32_t limit) { int ret = common::E_OK; if (time_series_index == nullptr || (time_series_index->get_data_type() != common::TSDataType::VECTOR && @@ -294,17 +340,13 @@ int SingleDeviceTsBlockReader::construct_column_context( if (aligned_time_series_index == nullptr) { assert(false); } - // Todo: when multi value index is supported in aligned time series - // index, we need to change the column context to - // VectorMeasurementColumnContext SingleMeasurementColumnContext* column_context = new SingleMeasurementColumnContext(tsfile_io_reader_); - // May no more data. just return to avoid null pointer. if (RET_FAIL(column_context->init( device_query_task_, time_series_index, time_filter, device_query_task_->get_column_mapping()->get_column_pos( time_series_index->get_measurement_name().to_std_string()), - pa_))) { + pa_, offset, limit))) { delete column_context; return ret; } @@ -318,7 +360,7 @@ int SingleDeviceTsBlockReader::construct_column_context( device_query_task_, time_series_index, time_filter, device_query_task_->get_column_mapping()->get_column_pos( time_series_index->get_measurement_name().to_std_string()), - pa_))) { + pa_, offset, limit))) { delete column_context; return ret; } @@ -333,14 +375,15 @@ int SingleDeviceTsBlockReader::construct_column_context( int SingleMeasurementColumnContext::init( DeviceQueryTask* device_query_task, const ITimeseriesIndex* time_series_index, Filter* time_filter, - const std::vector& pos_in_result, common::PageArena& pa) { + const std::vector& pos_in_result, common::PageArena& pa, + int32_t offset, int32_t limit) { int ret = common::E_OK; pos_in_result_ = pos_in_result; column_name_ = time_series_index->get_measurement_name().to_std_string(); if (RET_FAIL(tsfile_io_reader_->alloc_ssi( device_query_task->get_device_id(), time_series_index->get_measurement_name().to_std_string(), ssi_, pa, - time_filter))) { + time_filter, offset, limit))) { } else if (RET_FAIL(get_next_tsblock(true))) { } return ret; diff --git a/cpp/src/reader/block/single_device_tsblock_reader.h b/cpp/src/reader/block/single_device_tsblock_reader.h index 46ac8c417..57a6701d2 100644 --- a/cpp/src/reader/block/single_device_tsblock_reader.h +++ b/cpp/src/reader/block/single_device_tsblock_reader.h @@ -43,12 +43,16 @@ class SingleDeviceTsBlockReader : public TsBlockReader { int has_next(bool& has_next) override; int next(common::TsBlock*& ret_block) override; int init(DeviceQueryTask* device_query_task, uint32_t block_size, - Filter* time_filter, Filter* field_filter); + Filter* time_filter, Filter* field_filter, int32_t offset = 0, + int32_t limit = -1); void close() override; + int32_t get_remaining_offset() const { return remaining_offset_; } + int32_t get_remaining_limit() const { return remaining_limit_; } private: int construct_column_context(const ITimeseriesIndex* time_series_index, - Filter* time_filter); + Filter* time_filter, int32_t offset = 0, + int32_t limit = -1); int fill_measurements( std::vector& column_contexts); int fill_ids(); @@ -68,6 +72,9 @@ class SingleDeviceTsBlockReader : public TsBlockReader { int64_t time_column_index_ = 0; TsFileIOReader* tsfile_io_reader_; common::PageArena pa_; + int32_t remaining_offset_ = 0; + int32_t remaining_limit_ = -1; + bool all_aligned_ = true; }; class MeasurementColumnContext { @@ -124,7 +131,8 @@ class SingleMeasurementColumnContext final : public MeasurementColumnContext { column_context_map) override; int init(DeviceQueryTask* device_query_task, const ITimeseriesIndex* time_series_index, Filter* time_filter, - const std::vector& pos_in_result, common::PageArena& pa); + const std::vector& pos_in_result, common::PageArena& pa, + int32_t offset = 0, int32_t limit = -1); int get_next_tsblock(bool alloc_mem) override; int get_current_time(int64_t& time) override; int get_current_value(char*& value, uint32_t& len) override; diff --git a/cpp/src/reader/chunk_reader.cc b/cpp/src/reader/chunk_reader.cc index 1b3160b72..dce88092d 100644 --- a/cpp/src/reader/chunk_reader.cc +++ b/cpp/src/reader/chunk_reader.cc @@ -188,6 +188,32 @@ int ChunkReader::get_next_page(TsBlock* ret_tsblock, Filter* oneshoot_filter, return ret; } +int ChunkReader::skip_pages(int32_t rows_to_skip, int32_t& rows_skipped) { + int ret = E_OK; + rows_skipped = 0; + if (chunk_has_only_one_page()) { + return ret; + } + while (rows_to_skip > 0 && has_more_data() && !prev_page_not_finish()) { + if (RET_FAIL(get_cur_page_header())) { + break; + } + int32_t page_count = cur_page_header_.statistic_ != nullptr + ? cur_page_header_.statistic_->get_count() + : 0; + if (page_count > 0 && rows_to_skip >= page_count) { + if (RET_FAIL(skip_cur_page())) { + break; + } + rows_to_skip -= page_count; + rows_skipped += page_count; + } else { + break; + } + } + return ret; +} + int ChunkReader::get_cur_page_header() { int ret = E_OK; bool retry = true; diff --git a/cpp/src/reader/chunk_reader.h b/cpp/src/reader/chunk_reader.h index 52c7f7a59..fce7274aa 100644 --- a/cpp/src/reader/chunk_reader.h +++ b/cpp/src/reader/chunk_reader.h @@ -69,6 +69,7 @@ class ChunkReader : public IChunkReader { int get_next_page(common::TsBlock* tsblock, Filter* oneshoot_filter, common::PageArena& pa) override; + int skip_pages(int32_t rows_to_skip, int32_t& rows_skipped) override; private: FORCE_INLINE bool chunk_has_only_one_page() const { diff --git a/cpp/src/reader/ichunk_reader.h b/cpp/src/reader/ichunk_reader.h index ffb841fab..25af0dab6 100644 --- a/cpp/src/reader/ichunk_reader.h +++ b/cpp/src/reader/ichunk_reader.h @@ -52,6 +52,11 @@ class IChunkReader { return common::E_OK; } + virtual int skip_pages(int32_t rows_to_skip, int32_t& rows_skipped) { + rows_skipped = 0; + return common::E_OK; + } + virtual ChunkHeader& get_chunk_header() { return chunk_header_; } protected: diff --git a/cpp/src/reader/row_range_result_set.cc b/cpp/src/reader/row_range_result_set.cc new file mode 100644 index 000000000..9d8b8823d --- /dev/null +++ b/cpp/src/reader/row_range_result_set.cc @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "reader/row_range_result_set.h" + +namespace storage { + +RowRangeResultSet::RowRangeResultSet(ResultSet* inner, int offset, int limit) + : inner_(inner), + offset_(offset < 0 ? 0 : offset), + limit_(limit), + returned_count_(0), + offset_skipped_(false) {} + +RowRangeResultSet::~RowRangeResultSet() { close(); } + +int RowRangeResultSet::next(bool& has_next) { + // ① Skip the first `offset_` rows on the first call. + if (!offset_skipped_) { + for (int i = 0; i < offset_; i++) { + int ret = inner_->next(has_next); + if (ret != common::E_OK) return ret; + if (!has_next) { + has_next = false; + offset_skipped_ = true; + return common::E_OK; + } + } + offset_skipped_ = true; + } + + // ② Limit reached: return immediately without touching inner ResultSet. + // This is the key "pushdown" effect: no further chunk/page loading + // occurs. + if (limit_ >= 0 && returned_count_ >= limit_) { + has_next = false; + return common::E_OK; + } + + // ③ Normal delegation. + int ret = inner_->next(has_next); + if (ret == common::E_OK && has_next) { + returned_count_++; + } + return ret; +} + +bool RowRangeResultSet::is_null(const std::string& column_name) { + return inner_->is_null(column_name); +} + +bool RowRangeResultSet::is_null(uint32_t column_index) { + return inner_->is_null(column_index); +} + +RowRecord* RowRangeResultSet::get_row_record() { + return inner_->get_row_record(); +} + +std::shared_ptr RowRangeResultSet::get_metadata() { + return inner_->get_metadata(); +} + +void RowRangeResultSet::close() { + if (inner_ != nullptr) { + delete inner_; + inner_ = nullptr; + } +} + +} // namespace storage diff --git a/cpp/src/reader/row_range_result_set.h b/cpp/src/reader/row_range_result_set.h new file mode 100644 index 000000000..a5f0b97c4 --- /dev/null +++ b/cpp/src/reader/row_range_result_set.h @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef READER_ROW_RANGE_RESULT_SET_H +#define READER_ROW_RANGE_RESULT_SET_H + +#include +#include + +#include "reader/result_set.h" + +namespace storage { + +/** + * @brief A ResultSet wrapper that applies row-level offset and limit. + * + * Takes ownership of the inner ResultSet and releases it on close(). + * Once the limit is reached, next() returns has_next=false immediately + * without calling the underlying ResultSet, avoiding unnecessary data loading. + * + * @param offset Number of leading rows to skip (must be >= 0). + * @param limit Maximum number of rows to return. A value < 0 means + * no limit (all remaining rows are returned). + */ +class RowRangeResultSet : public ResultSet { + public: + RowRangeResultSet(ResultSet* inner, int offset, int limit); + ~RowRangeResultSet() override; + + int next(bool& has_next) override; + bool is_null(const std::string& column_name) override; + bool is_null(uint32_t column_index) override; + RowRecord* get_row_record() override; + std::shared_ptr get_metadata() override; + void close() override; + + private: + ResultSet* inner_; + int offset_; + int limit_; + int returned_count_; + bool offset_skipped_; +}; + +} // namespace storage +#endif // READER_ROW_RANGE_RESULT_SET_H diff --git a/cpp/src/reader/table_query_executor.cc b/cpp/src/reader/table_query_executor.cc index 2a01a6d5c..a23f4d25f 100644 --- a/cpp/src/reader/table_query_executor.cc +++ b/cpp/src/reader/table_query_executor.cc @@ -88,6 +88,73 @@ int TableQueryExecutor::query(const std::string& table_name, return ret; } +int TableQueryExecutor::query(const std::string& table_name, + const std::vector& columns, + Filter* time_filter, Filter* id_filter, + Filter* field_filter, int32_t offset, + int32_t limit, ResultSet*& ret_qds) { + int ret = common::E_OK; + TsFileMeta* file_metadata = nullptr; + file_metadata = tsfile_io_reader_->get_tsfile_meta(); + common::PageArena pa; + pa.init(512, common::MOD_TSFILE_READER); + MetaIndexNode* table_root = nullptr; + std::shared_ptr table_schema; + if (RET_FAIL( + file_metadata->get_table_metaindex_node(table_name, table_root))) { + } else if (RET_FAIL( + file_metadata->get_table_schema(table_name, table_schema))) { + } + + if (IS_FAIL(ret)) { + ret_qds = nullptr; + return ret; + } + std::vector lower_case_column_names(columns); + for (auto& column : lower_case_column_names) { + to_lowercase_inplace(column); + } + std::shared_ptr column_mapping = + std::make_shared(); + for (size_t i = 0; i < lower_case_column_names.size(); ++i) { + column_mapping->add(lower_case_column_names[i], static_cast(i), + *table_schema); + } + std::vector data_types; + data_types.reserve(lower_case_column_names.size()); + for (size_t i = 0; i < lower_case_column_names.size(); ++i) { + auto ind = table_schema->find_column_index(lower_case_column_names[i]); + if (ind < 0) { + delete time_filter; + return common::E_COLUMN_NOT_EXIST; + } + data_types.push_back(table_schema->get_data_types()[ind]); + } + + auto device_task_iterator = + std::unique_ptr(new DeviceTaskIterator( + lower_case_column_names, table_root, column_mapping, + meta_data_querier_, id_filter, table_schema)); + + std::unique_ptr tsblock_reader; + switch (table_query_ordering_) { + case TableQueryOrdering::DEVICE: + tsblock_reader = std::unique_ptr( + new DeviceOrderedTsBlockReader(std::move(device_task_iterator), + meta_data_querier_, block_size_, + tsfile_io_reader_, time_filter, + field_filter, offset, limit)); + break; + case TableQueryOrdering::TIME: + default: + ret = common::E_UNSUPPORTED_ORDER; + } + assert(tsblock_reader != nullptr); + ret_qds = new TableResultSet(std::move(tsblock_reader), + lower_case_column_names, data_types); + return ret; +} + int TableQueryExecutor::query_on_tree( const std::vector>& devices, const std::vector& tag_columns, @@ -191,5 +258,108 @@ int TableQueryExecutor::query_on_tree( return ret; } +int TableQueryExecutor::query_on_tree( + const std::vector>& devices, + const std::vector& tag_columns, + const std::vector& field_columns, Filter* time_filter, + int32_t offset, int32_t limit, ResultSet*& ret_qds) { + common::PageArena pa; + pa.init(512, common::MOD_TSFILE_READER); + int ret = common::E_OK; + TsFileMeta* file_meta = tsfile_io_reader_->get_tsfile_meta(); + std::unordered_set table_inodes; + for (auto const& device : devices) { + MetaIndexNode* table_inode; + if (RET_FAIL(file_meta->get_table_metaindex_node( + device->get_table_name(), table_inode))) { + }; + table_inodes.insert(table_inode); + } + + std::vector col_schema; + for (auto const& tag : tag_columns) { + col_schema.emplace_back(tag, common::TSDataType::STRING, + common::ColumnCategory::TAG); + } + + std::unordered_map column_types_map; + + for (auto const& device : devices) { + bool all_collected = true; + for (const auto& field_col : field_columns) { + if (column_types_map.find(field_col) == column_types_map.end()) { + all_collected = false; + break; + } + } + if (all_collected) { + break; + } + + std::unordered_set measurements(field_columns.begin(), + field_columns.end()); + std::vector index(measurements.size()); + if (RET_FAIL(tsfile_io_reader_->get_timeseries_indexes( + device, measurements, index, pa))) { + return ret; + } + + for (auto* ts_index : index) { + if (ts_index != nullptr) { + std::string measurement_name = + ts_index->get_measurement_name().to_std_string(); + if (column_types_map.find(measurement_name) == + column_types_map.end()) { + common::TSDataType type = ts_index->get_data_type(); + column_types_map[measurement_name] = type; + } + } + } + } + + for (const auto& field_col : field_columns) { + if (column_types_map.find(field_col) != column_types_map.end()) { + col_schema.emplace_back(field_col, column_types_map[field_col], + common::ColumnCategory::FIELD); + } else { + col_schema.emplace_back(field_col, + common::TSDataType::INVALID_DATATYPE, + common::ColumnCategory::FIELD); + } + } + + auto schema = std::make_shared("default", col_schema); + schema->set_virtual_table(); + std::shared_ptr column_mapping = + std::make_shared(); + for (size_t i = 0; i < col_schema.size(); ++i) { + column_mapping->add(col_schema[i].column_name_, i, *schema); + } + std::vector index_nodes(table_inodes.begin(), + table_inodes.end()); + auto device_task_iterator = + std::unique_ptr(new DeviceTaskIterator( + schema->get_measurement_names(), index_nodes, column_mapping, + meta_data_querier_, nullptr, schema)); + std::unique_ptr tsblock_reader; + switch (table_query_ordering_) { + case TableQueryOrdering::DEVICE: + tsblock_reader = std::unique_ptr( + new DeviceOrderedTsBlockReader(std::move(device_task_iterator), + meta_data_querier_, block_size_, + tsfile_io_reader_, time_filter, + nullptr, offset, limit)); + break; + case TableQueryOrdering::TIME: + default: + ret = common::E_UNSUPPORTED_ORDER; + } + assert(tsblock_reader != nullptr); + ret_qds = new TableResultSet(std::move(tsblock_reader), + schema->get_measurement_names(), + schema->get_data_types()); + return ret; +} + void TableQueryExecutor::destroy_query_data_set(ResultSet* qds) { delete qds; } } // end namespace storage diff --git a/cpp/src/reader/table_query_executor.h b/cpp/src/reader/table_query_executor.h index 974e6b45b..7444b9bf5 100644 --- a/cpp/src/reader/table_query_executor.h +++ b/cpp/src/reader/table_query_executor.h @@ -65,10 +65,19 @@ class TableQueryExecutor { int query(const std::string& table_name, const std::vector& columns, Filter* time_filter, Filter* id_filter, Filter* field_filter, ResultSet*& ret_qds); + int query(const std::string& table_name, + const std::vector& columns, Filter* time_filter, + Filter* id_filter, Filter* field_filter, int32_t offset, + int32_t limit, ResultSet*& ret_qds); int query_on_tree(const std::vector>& devices, const std::vector& tag_columns, const std::vector& field_columns, Filter* time_filter, ResultSet*& ret_qds); + int query_on_tree(const std::vector>& devices, + const std::vector& tag_columns, + const std::vector& field_columns, + Filter* time_filter, int32_t offset, int32_t limit, + ResultSet*& ret_qds); void destroy_query_data_set(ResultSet* qds); private: diff --git a/cpp/src/reader/tsfile_reader.cc b/cpp/src/reader/tsfile_reader.cc index 84188b6a3..c741943ff 100644 --- a/cpp/src/reader/tsfile_reader.cc +++ b/cpp/src/reader/tsfile_reader.cc @@ -18,8 +18,11 @@ */ #include "tsfile_reader.h" +#include + #include "common/schema.h" #include "filter/time_operator.h" +#include "reader/row_range_result_set.h" #include "tsfile_executor.h" using namespace common; @@ -116,6 +119,20 @@ int TsFileReader::query(const std::string& table_name, return ret; } +int TsFileReader::queryByRow(const std::string& table_name, + const std::vector& column_names, + int offset, int limit, ResultSet*& result_set) { + int ret = common::E_OK; + TsFileMeta* tsfile_meta = tsfile_executor_->get_tsfile_meta(); + if (tsfile_meta == nullptr) { + return E_TSFILE_WRITER_META_ERR; + } + ret = table_query_executor_->query(to_lower(table_name), column_names, + nullptr, nullptr, nullptr, offset, limit, + result_set); + return ret; +} + int TsFileReader::query_table_on_tree( const std::vector& measurement_names, int64_t star_time, int64_t end_time, ResultSet*& result_set) { @@ -193,6 +210,31 @@ int TsFileReader::query_table_on_tree( return ret; } +int TsFileReader::query_table_on_tree_by_row( + const std::vector>& device_ids, + const std::vector& measurement_names, int32_t offset, + int32_t limit, ResultSet*& result_set) { + int ret = E_OK; + TsFileMeta* tsfile_meta = tsfile_executor_->get_tsfile_meta(); + if (tsfile_meta == nullptr) { + return E_TSFILE_WRITER_META_ERR; + } + size_t device_max_len = 0; + for (auto& device_id : device_ids) { + if (device_id->get_split_seg_num() > device_max_len) { + device_max_len = device_id->get_split_seg_num(); + } + } + std::vector tag_columns(device_max_len); + for (size_t i = 0; i < device_max_len; i++) { + tag_columns[i] = "col_" + std::to_string(i); + } + ret = table_query_executor_->query_on_tree(device_ids, tag_columns, + measurement_names, nullptr, + offset, limit, result_set); + return ret; +} + void TsFileReader::destroy_query_data_set(storage::ResultSet* qds) { tsfile_executor_->destroy_query_data_set(qds); } diff --git a/cpp/src/reader/tsfile_reader.h b/cpp/src/reader/tsfile_reader.h index 6c8563563..c1727e52b 100644 --- a/cpp/src/reader/tsfile_reader.h +++ b/cpp/src/reader/tsfile_reader.h @@ -113,9 +113,31 @@ class TsFileReader { const std::vector& columns_names, int64_t start_time, int64_t end_time, ResultSet*& result_set, Filter* tag_filter); + /** + * @brief Query table model data by row range. + * + * Internally queries the full time range [INT64_MIN, INT64_MAX] and + * applies offset/limit at the result-set level. Once `limit` rows have + * been returned, no further data is loaded from storage. + * + * @param table_name Target table name. + * @param column_names Columns to query. + * @param offset Number of leading rows to skip (>= 0). + * @param limit Maximum rows to return. < 0 means unlimited. + * @param[out] result_set The result set containing query results. + * @return Returns 0 on success, or a non-zero error code on failure. + */ + int queryByRow(const std::string& table_name, + const std::vector& column_names, int offset, + int limit, ResultSet*& result_set); + int query_table_on_tree(const std::vector& measurement_names, int64_t star_time, int64_t end_time, ResultSet*& result_set); + int query_table_on_tree_by_row( + const std::vector>& device_ids, + const std::vector& measurement_names, int32_t offset, + int32_t limit, ResultSet*& result_set); /** * @brief destroy the result set, this method should be called after the * query is finished and result_set diff --git a/cpp/src/reader/tsfile_series_scan_iterator.cc b/cpp/src/reader/tsfile_series_scan_iterator.cc index 5e78574e7..751d4ba29 100644 --- a/cpp/src/reader/tsfile_series_scan_iterator.cc +++ b/cpp/src/reader/tsfile_series_scan_iterator.cc @@ -38,15 +38,19 @@ void TsFileSeriesScanIterator::destroy() { int TsFileSeriesScanIterator::get_next(TsBlock*& ret_tsblock, bool alloc, Filter* oneshoot_filter) { - // TODO @filter int ret = E_OK; + if (remaining_limit_ == 0) { + return E_NO_MORE_DATA; + } Filter* filter = (oneshoot_filter != nullptr) ? oneshoot_filter : time_filter_; - if (!chunk_reader_->has_more_data()) { - while (true) { - if (!has_next_chunk()) { - return E_NO_MORE_DATA; - } else { + + while (true) { + if (!chunk_reader_->has_more_data()) { + while (true) { + if (!has_next_chunk()) { + return E_NO_MORE_DATA; + } if (!is_aligned_) { ChunkMeta* cm = get_current_chunk_meta(); advance_to_next_chunk(); @@ -54,6 +58,13 @@ int TsFileSeriesScanIterator::get_next(TsBlock*& ret_tsblock, bool alloc, !filter->satisfy(cm->statistic_)) { continue; } + // Skip entire chunk if offset covers it + int32_t chunk_count = get_chunk_row_count(cm); + if (remaining_offset_ > 0 && chunk_count > 0 && + remaining_offset_ >= chunk_count) { + remaining_offset_ -= chunk_count; + continue; + } chunk_reader_->reset(); if (RET_FAIL(chunk_reader_->load_by_meta(cm))) { } @@ -66,6 +77,13 @@ int TsFileSeriesScanIterator::get_next(TsBlock*& ret_tsblock, bool alloc, !filter->satisfy(value_cm->statistic_)) { continue; } + // Skip entire chunk if offset covers it + int32_t chunk_count = get_chunk_row_count(time_cm); + if (remaining_offset_ > 0 && chunk_count > 0 && + remaining_offset_ >= chunk_count) { + remaining_offset_ -= chunk_count; + continue; + } chunk_reader_->reset(); if (RET_FAIL(chunk_reader_->load_by_aligned_meta( time_cm, value_cm))) { @@ -73,15 +91,61 @@ int TsFileSeriesScanIterator::get_next(TsBlock*& ret_tsblock, bool alloc, break; } } + if (IS_FAIL(ret)) { + return ret; + } + // Skip pages within the loaded chunk + if (remaining_offset_ > 0) { + int32_t pages_skipped = 0; + if (RET_FAIL(chunk_reader_->skip_pages(remaining_offset_, + pages_skipped))) { + return ret; + } + remaining_offset_ -= pages_skipped; + } } - } - if (IS_SUCC(ret)) { - if (alloc) { - ret_tsblock = alloc_tsblock(); + + if (IS_SUCC(ret)) { + if (alloc) { + ret_tsblock = alloc_tsblock(); + } + ret = chunk_reader_->get_next_page(ret_tsblock, filter, *data_pa_); + } + + if (IS_FAIL(ret)) { + return ret; + } + + // Handle remaining offset within decoded page + uint32_t row_count = ret_tsblock->get_row_count(); + if (remaining_offset_ > 0) { + if (remaining_offset_ >= (int32_t)row_count) { + remaining_offset_ -= row_count; + ret_tsblock->reset(); + continue; // decode next page + } else { + // Partial skip: shrink TsBlock by adjusting row_count_ + // We can't easily remove leading rows, so we re-expose + // remaining_offset_ via set_row_offset for the consumer. + // For simplicity, just keep decoding and skip in-page rows + // at the consumer level. Set remaining_offset_ = 0 here + // since consumer will handle the rest. + // Actually, let's just skip the whole page if the remaining + // offset is large enough, but if not, accept the small waste. + remaining_offset_ = 0; + } } - ret = chunk_reader_->get_next_page(ret_tsblock, filter, *data_pa_); + + // Handle limit: truncate block if needed + if (remaining_limit_ >= 0) { + if ((int32_t)ret_tsblock->get_row_count() > remaining_limit_) { + ret_tsblock->set_row_count(remaining_limit_); + } + remaining_limit_ -= ret_tsblock->get_row_count(); + } + + return ret; } - return ret; } void TsFileSeriesScanIterator::revert_tsblock() { @@ -99,14 +163,39 @@ int TsFileSeriesScanIterator::init_chunk_reader() { void* buf = common::mem_alloc(sizeof(ChunkReader), common::MOD_DEFAULT); chunk_reader_ = new (buf) ChunkReader; chunk_meta_cursor_ = itimeseries_index_->get_chunk_meta_list()->begin(); - ChunkMeta* cm = chunk_meta_cursor_.get(); ASSERT(!chunk_reader_->has_more_data()); if (RET_FAIL(chunk_reader_->init( read_file_, itimeseries_index_->get_measurement_name(), itimeseries_index_->get_data_type(), time_filter_))) { - } else if (RET_FAIL(chunk_reader_->load_by_meta(cm))) { - } else { - chunk_meta_cursor_++; + return ret; + } + // Skip chunks covered by offset + while (chunk_meta_cursor_ != + itimeseries_index_->get_chunk_meta_list()->end()) { + ChunkMeta* cm = chunk_meta_cursor_.get(); + int32_t chunk_count = get_chunk_row_count(cm); + if (remaining_offset_ > 0 && chunk_count > 0 && + remaining_offset_ >= chunk_count) { + remaining_offset_ -= chunk_count; + chunk_meta_cursor_++; + continue; + } + chunk_reader_->reset(); + if (RET_FAIL(chunk_reader_->load_by_meta(cm))) { + } else { + chunk_meta_cursor_++; + } + break; + } + // Skip pages within loaded chunk + if (IS_SUCC(ret) && remaining_offset_ > 0 && + chunk_reader_->has_more_data()) { + int32_t pages_skipped = 0; + if (RET_FAIL(chunk_reader_->skip_pages(remaining_offset_, + pages_skipped))) { + } else { + remaining_offset_ -= pages_skipped; + } } } else { void* buf = @@ -116,17 +205,45 @@ int TsFileSeriesScanIterator::init_chunk_reader() { itimeseries_index_->get_time_chunk_meta_list()->begin(); value_chunk_meta_cursor_ = itimeseries_index_->get_value_chunk_meta_list()->begin(); - ChunkMeta* time_cm = time_chunk_meta_cursor_.get(); - ChunkMeta* value_cm = value_chunk_meta_cursor_.get(); ASSERT(!chunk_reader_->has_more_data()); if (RET_FAIL(chunk_reader_->init( read_file_, itimeseries_index_->get_measurement_name(), itimeseries_index_->get_data_type(), time_filter_))) { - } else if (RET_FAIL(chunk_reader_->load_by_aligned_meta(time_cm, - value_cm))) { - } else { - time_chunk_meta_cursor_++; - value_chunk_meta_cursor_++; + return ret; + } + // Skip chunks covered by offset + while (time_chunk_meta_cursor_ != + itimeseries_index_->get_time_chunk_meta_list()->end() && + value_chunk_meta_cursor_ != + itimeseries_index_->get_value_chunk_meta_list()->end()) { + ChunkMeta* time_cm = time_chunk_meta_cursor_.get(); + int32_t chunk_count = get_chunk_row_count(time_cm); + if (remaining_offset_ > 0 && chunk_count > 0 && + remaining_offset_ >= chunk_count) { + remaining_offset_ -= chunk_count; + time_chunk_meta_cursor_++; + value_chunk_meta_cursor_++; + continue; + } + ChunkMeta* value_cm = value_chunk_meta_cursor_.get(); + chunk_reader_->reset(); + if (RET_FAIL( + chunk_reader_->load_by_aligned_meta(time_cm, value_cm))) { + } else { + time_chunk_meta_cursor_++; + value_chunk_meta_cursor_++; + } + break; + } + // Skip pages within loaded chunk + if (IS_SUCC(ret) && remaining_offset_ > 0 && + chunk_reader_->has_more_data()) { + int32_t pages_skipped = 0; + if (RET_FAIL(chunk_reader_->skip_pages(remaining_offset_, + pages_skipped))) { + } else { + remaining_offset_ -= pages_skipped; + } } } diff --git a/cpp/src/reader/tsfile_series_scan_iterator.h b/cpp/src/reader/tsfile_series_scan_iterator.h index ad6fe8d94..c26a540d7 100644 --- a/cpp/src/reader/tsfile_series_scan_iterator.h +++ b/cpp/src/reader/tsfile_series_scan_iterator.h @@ -48,7 +48,9 @@ class TsFileSeriesScanIterator { tuple_desc_(), tsblock_(nullptr), time_filter_(nullptr), - is_aligned_(false) {} + is_aligned_(false), + remaining_offset_(0), + remaining_limit_(-1) {} ~TsFileSeriesScanIterator() { destroy(); } int init(std::shared_ptr device_id, const std::string& measurement_name, ReadFile* read_file, @@ -69,6 +71,11 @@ class TsFileSeriesScanIterator { Filter* oneshoot_filter = nullptr); void revert_tsblock(); + void set_row_range(int32_t offset, int32_t limit) { + remaining_offset_ = offset; + remaining_limit_ = limit; + } + friend class TsFileIOReader; private: @@ -93,6 +100,11 @@ class TsFileSeriesScanIterator { FORCE_INLINE ChunkMeta* get_current_chunk_meta() { return chunk_meta_cursor_.get(); } + int32_t get_chunk_row_count(ChunkMeta* cm) const { + return (cm != nullptr && cm->statistic_ != nullptr) + ? cm->statistic_->get_count() + : 0; + } common::TsBlock* alloc_tsblock(); private: @@ -112,6 +124,9 @@ class TsFileSeriesScanIterator { common::TsBlock* tsblock_; Filter* time_filter_; bool is_aligned_ = false; + + int32_t remaining_offset_; + int32_t remaining_limit_; }; } // end namespace storage diff --git a/cpp/src/reader/tsfile_tree_reader.cc b/cpp/src/reader/tsfile_tree_reader.cc index 1b58c359d..73daef0e4 100644 --- a/cpp/src/reader/tsfile_tree_reader.cc +++ b/cpp/src/reader/tsfile_tree_reader.cc @@ -19,6 +19,10 @@ #include "reader/tsfile_tree_reader.h" +#include + +#include "reader/row_range_result_set.h" + namespace storage { TsFileTreeReader::TsFileTreeReader() { @@ -47,6 +51,21 @@ int TsFileTreeReader::query(const std::vector& device_ids, return tsfile_reader_->query(path_list, start_time, end_time, result_set); } +int TsFileTreeReader::queryByRow( + const std::vector& device_ids, + const std::vector& measurement_names, int offset, int limit, + ResultSet*& result_set) { + std::vector> device_id_ptrs; + device_id_ptrs.reserve(device_ids.size()); + for (const auto& device_id : device_ids) { + auto ptr = std::make_shared(device_id); + ptr->split_table_name(); + device_id_ptrs.push_back(ptr); + } + return tsfile_reader_->query_table_on_tree_by_row( + device_id_ptrs, measurement_names, offset, limit, result_set); +} + void TsFileTreeReader::destroy_query_data_set(ResultSet* qds) { tsfile_reader_->destroy_query_data_set(qds); } diff --git a/cpp/src/reader/tsfile_tree_reader.h b/cpp/src/reader/tsfile_tree_reader.h index 535180409..c69fae6a4 100644 --- a/cpp/src/reader/tsfile_tree_reader.h +++ b/cpp/src/reader/tsfile_tree_reader.h @@ -67,6 +67,26 @@ class TsFileTreeReader { const std::vector& measurement_names, int64_t start_time, int64_t end_time, ResultSet*& result_set); + /** + * @brief Query tree model data by row range. + * + * Internally queries the full time range [INT64_MIN, INT64_MAX] and + * applies offset/limit at the result-set level. Once `limit` rows have + * been returned, no further data is loaded from storage. + * + * @param device_ids List of device identifiers to query. + * @param measurement_names List of measurement names to query. + * @param offset Number of leading rows to skip (>= 0). + * @param limit Maximum rows to return. < 0 means unlimited. + * @param[out] result_set The result set containing query results. + * @return Returns 0 on success, or a non-zero error code on failure. + * The caller is responsible for destroying the result set using + * destroy_query_data_set(). + */ + int queryByRow(const std::vector& device_ids, + const std::vector& measurement_names, + int offset, int limit, ResultSet*& result_set); + /** * @brief Destroy and deallocate the query result set * diff --git a/cpp/test/cwrapper/cwrapper_test.cc b/cpp/test/cwrapper/cwrapper_test.cc index 5998939af..a2601e228 100644 --- a/cpp/test/cwrapper/cwrapper_test.cc +++ b/cpp/test/cwrapper/cwrapper_test.cc @@ -310,4 +310,230 @@ TEST_F(CWrapperTest, WriterFlushTabletAndReadData) { free(data_types); free_write_file(&file); } + +// ───────────────────────────────────────────────────────────────────────────── +// tsfile_reader_query_tree_by_row +// ───────────────────────────────────────────────────────────────────────────── + +TEST_F(CWrapperTest, QueryTreeByRow_LimitAndOffset) { + ERRNO code = 0; + const char* filename = "cwrapper_tree_row_query_test.tsfile"; + remove(filename); + + // ---- Write 30 records for "device"."s1" (INT64) ---- + const int total_rows = 30; + TsFileWriter writer = + _tsfile_writer_new(filename, 128 * 1024 * 1024, &code); + ASSERT_EQ(code, RET_OK); + + timeseries_schema ts_schema; + ts_schema.timeseries_name = const_cast("s1"); + ts_schema.data_type = TS_DATATYPE_INT64; + ts_schema.encoding = TS_ENCODING_PLAIN; + ts_schema.compression = TS_COMPRESSION_UNCOMPRESSED; + ASSERT_OK(_tsfile_writer_register_timeseries(writer, "device", &ts_schema)); + + for (int i = 0; i < total_rows; i++) { + TsRecord rec = _ts_record_new("device", static_cast(i), 1); + _insert_data_into_ts_record_by_name_int64_t( + rec, "s1", static_cast(i * 10)); + ASSERT_OK(_tsfile_writer_write_ts_record(writer, rec)); + _free_tsfile_ts_record(&rec); + } + ASSERT_OK(_tsfile_writer_flush(writer)); + ASSERT_OK(_tsfile_writer_close(writer)); + + // ---- Read phase ---- + TsFileReader reader = tsfile_reader_new(filename, &code); + ASSERT_EQ(code, RET_OK); + + char* devs[] = {const_cast("device")}; + char* meas[] = {const_cast("s1")}; + + // ① limit=0 → empty result + { + ResultSet rs = tsfile_reader_query_tree_by_row(reader, devs, 1, meas, 1, + 0, 0, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) cnt++; + EXPECT_EQ(cnt, 0); + free_tsfile_result_set(&rs); + } + // ② limit < total → exactly `limit` rows + { + ResultSet rs = tsfile_reader_query_tree_by_row(reader, devs, 1, meas, 1, + 0, 10, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) cnt++; + EXPECT_EQ(cnt, 10); + free_tsfile_result_set(&rs); + } + // ③ limit=-1 → unlimited (all rows) + { + ResultSet rs = tsfile_reader_query_tree_by_row(reader, devs, 1, meas, 1, + 0, -1, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) cnt++; + EXPECT_EQ(cnt, total_rows); + free_tsfile_result_set(&rs); + } + // ④ offset=20, limit=20 → 10 rows remain + { + ResultSet rs = tsfile_reader_query_tree_by_row(reader, devs, 1, meas, 1, + 20, 20, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) cnt++; + EXPECT_EQ(cnt, 10); + free_tsfile_result_set(&rs); + } + // ⑤ offset beyond total → empty + { + ResultSet rs = tsfile_reader_query_tree_by_row(reader, devs, 1, meas, 1, + 1000, 10, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) cnt++; + EXPECT_EQ(cnt, 0); + free_tsfile_result_set(&rs); + } + // ⑥ data correctness: offset=5, limit=5 → timestamps 5..9 + { + ResultSet rs = tsfile_reader_query_tree_by_row(reader, devs, 1, meas, 1, + 5, 5, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) { + // column 1 = time + int64_t ts = tsfile_result_set_get_value_by_index_int64_t(rs, 1); + EXPECT_EQ(ts, static_cast(5 + cnt)); + cnt++; + } + EXPECT_EQ(cnt, 5); + free_tsfile_result_set(&rs); + } + + tsfile_reader_close(reader); + remove(filename); +} + +// ───────────────────────────────────────────────────────────────────────────── +// tsfile_reader_query_table_by_row +// ───────────────────────────────────────────────────────────────────────────── + +TEST_F(CWrapperTest, QueryTableByRow_LimitAndOffset) { + ERRNO code = 0; + const char* filename = "cwrapper_table_row_query_test.tsfile"; + remove(filename); + + // ---- Write 30 rows into table "t1" with column "s0" INT64 ---- + const int total_rows = 30; + + ColumnSchema col_schema; + col_schema.column_name = const_cast("s0"); + col_schema.data_type = TS_DATATYPE_INT64; + col_schema.column_category = FIELD; + + TableSchema schema; + schema.table_name = const_cast("t1"); + schema.column_num = 1; + schema.column_schemas = &col_schema; + + WriteFile wf = write_file_new(filename, &code); + ASSERT_EQ(code, RET_OK); + TsFileWriter writer = tsfile_writer_new(wf, &schema, &code); + ASSERT_EQ(code, RET_OK); + + char* col_name_arr[] = {const_cast("s0")}; + TSDataType dtype_arr[] = {TS_DATATYPE_INT64}; + Tablet tablet = tablet_new(col_name_arr, dtype_arr, 1, total_rows); + for (int i = 0; i < total_rows; i++) { + tablet_add_timestamp(tablet, i, static_cast(i)); + tablet_add_value_by_index_int64_t(tablet, i, 0, + static_cast(i)); + } + ASSERT_OK(tsfile_writer_write(writer, tablet)); + free_tablet(&tablet); + ASSERT_OK(tsfile_writer_close(writer)); + free_write_file(&wf); + + // ---- Read phase ---- + TsFileReader reader = tsfile_reader_new(filename, &code); + ASSERT_EQ(code, RET_OK); + + char* cols[] = {const_cast("s0")}; + + // ① limit=0 → empty result + { + ResultSet rs = tsfile_reader_query_table_by_row(reader, "t1", cols, 1, + 0, 0, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) cnt++; + EXPECT_EQ(cnt, 0); + free_tsfile_result_set(&rs); + } + // ② limit < total → exactly `limit` rows + { + ResultSet rs = tsfile_reader_query_table_by_row(reader, "t1", cols, 1, + 0, 10, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) cnt++; + EXPECT_EQ(cnt, 10); + free_tsfile_result_set(&rs); + } + // ③ limit=-1 → unlimited (all rows) + { + ResultSet rs = tsfile_reader_query_table_by_row(reader, "t1", cols, 1, + 0, -1, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) cnt++; + EXPECT_EQ(cnt, total_rows); + free_tsfile_result_set(&rs); + } + // ④ offset=20, limit=20 → 10 rows remain + { + ResultSet rs = tsfile_reader_query_table_by_row(reader, "t1", cols, 1, + 20, 20, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) cnt++; + EXPECT_EQ(cnt, 10); + free_tsfile_result_set(&rs); + } + // ⑤ offset beyond total → empty + { + ResultSet rs = tsfile_reader_query_table_by_row(reader, "t1", cols, 1, + 1000, 10, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) cnt++; + EXPECT_EQ(cnt, 0); + free_tsfile_result_set(&rs); + } + // ⑥ data correctness: offset=5, limit=5 → timestamps 5..9 + { + ResultSet rs = tsfile_reader_query_table_by_row(reader, "t1", cols, 1, + 5, 5, &code); + ASSERT_EQ(code, RET_OK); + int cnt = 0; + while (tsfile_result_set_next(rs, &code) && code == RET_OK) { + // column 1 = time + int64_t ts = tsfile_result_set_get_value_by_index_int64_t(rs, 1); + EXPECT_EQ(ts, static_cast(5 + cnt)); + cnt++; + } + EXPECT_EQ(cnt, 5); + free_tsfile_result_set(&rs); + } + + tsfile_reader_close(reader); + remove(filename); +} + } // namespace cwrapper \ No newline at end of file diff --git a/cpp/test/reader/table_view/tsfile_table_reader_row_query_test.cc b/cpp/test/reader/table_view/tsfile_table_reader_row_query_test.cc new file mode 100644 index 000000000..e8549db05 --- /dev/null +++ b/cpp/test/reader/table_view/tsfile_table_reader_row_query_test.cc @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include + +#include +#include + +#include "common/config/config.h" +#include "common/schema.h" +#include "common/tablet.h" +#include "file/write_file.h" +#include "reader/result_set.h" +#include "reader/tsfile_reader.h" +#include "writer/tsfile_table_writer.h" + +using namespace storage; +using namespace common; + +// ───────────────────────────────────────────────────────────────────────────── +// Fixture +// ───────────────────────────────────────────────────────────────────────────── + +class TsFileTableReaderRowQueryTest : public ::testing::Test { + protected: + static std::string generate_random_string(int length) { + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> dis(0, 61); + const std::string chars = + "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; + std::string s; + for (int i = 0; i < length; ++i) s += chars[dis(gen)]; + return s; + } + + void SetUp() override { + libtsfile_init(); + file_name_ = "tsfile_table_row_query_test_" + + generate_random_string(10) + ".tsfile"; + remove(file_name_.c_str()); + } + + void TearDown() override { + remove(file_name_.c_str()); + libtsfile_destroy(); + } + + // Write a simple table "t1" with columns ["s0" INT64] and `num_rows` rows. + // Timestamps and values are both 0..num_rows-1. + void write_simple_table(int num_rows) { + WriteFile wf; + int flags = O_WRONLY | O_CREAT | O_TRUNC; +#ifdef _WIN32 + flags |= O_BINARY; +#endif + wf.create(file_name_, flags, 0666); + + std::vector col_schemas; + col_schemas.emplace_back("s0", TSDataType::INT64, + CompressionType::UNCOMPRESSED, + TSEncoding::PLAIN, ColumnCategory::FIELD); + auto* schema = new TableSchema(TABLE_NAME, col_schemas); + auto writer = std::make_shared(&wf, schema); + + Tablet tablet(TABLE_NAME, {"s0"}, {TSDataType::INT64}, + {ColumnCategory::FIELD}, num_rows); + for (int i = 0; i < num_rows; i++) { + tablet.add_timestamp(i, static_cast(i)); + tablet.add_value(i, 0, static_cast(i)); + } + writer->write_table(tablet); + writer->flush(); + writer->close(); + delete schema; + } + + // Count rows returned by queryByRow. + int count_rows(int offset, int limit) { + TsFileReader reader; + EXPECT_EQ(reader.open(file_name_), E_OK); + ResultSet* rs = nullptr; + EXPECT_EQ(reader.queryByRow(TABLE_NAME, {"s0"}, offset, limit, rs), + E_OK); + EXPECT_NE(rs, nullptr); + int count = 0; + bool has_next = false; + while (IS_SUCC(rs->next(has_next)) && has_next) count++; + reader.destroy_query_data_set(rs); + reader.close(); + return count; + } + + std::string file_name_; + enum { TOTAL_ROWS = 50 }; + static constexpr const char* TABLE_NAME = "t1"; +}; + +// ───────────────────────────────────────────────────────────────────────────── +// Tests +// ───────────────────────────────────────────────────────────────────────────── + +// ① limit = 0: empty result +TEST_F(TsFileTableReaderRowQueryTest, LimitZeroReturnsEmpty) { + write_simple_table(TOTAL_ROWS); + ASSERT_EQ(count_rows(0, 0), 0); +} + +// ② limit < total: only `limit` rows +TEST_F(TsFileTableReaderRowQueryTest, LimitLessThanTotal) { + write_simple_table(TOTAL_ROWS); + ASSERT_EQ(count_rows(0, 10), 10); +} + +// ③ limit > total: all rows +TEST_F(TsFileTableReaderRowQueryTest, LimitExceedsTotal) { + write_simple_table(TOTAL_ROWS); + ASSERT_EQ(count_rows(0, 9999), TOTAL_ROWS); +} + +// ④ limit < 0: unlimited, returns all rows +TEST_F(TsFileTableReaderRowQueryTest, NegativeLimitMeansUnlimited) { + write_simple_table(TOTAL_ROWS); + ASSERT_EQ(count_rows(0, -1), TOTAL_ROWS); +} + +// ⑤ offset in the middle +TEST_F(TsFileTableReaderRowQueryTest, OffsetPlusLimit) { + write_simple_table(TOTAL_ROWS); + ASSERT_EQ(count_rows(10, 15), 15); +} + +// ⑥ offset >= total: empty result +TEST_F(TsFileTableReaderRowQueryTest, OffsetBeyondTotal) { + write_simple_table(TOTAL_ROWS); + ASSERT_EQ(count_rows(1000, 10), 0); +} + +// ⑦ offset + limit > total: return only remaining rows +TEST_F(TsFileTableReaderRowQueryTest, OffsetPlusLimitExceedsTotal) { + write_simple_table(TOTAL_ROWS); + // offset=40, limit=20 → 10 rows remain + ASSERT_EQ(count_rows(40, 20), 10); +} + +// ⑧ Data correctness: time column starts at offset +TEST_F(TsFileTableReaderRowQueryTest, OffsetDataCorrectness) { + write_simple_table(TOTAL_ROWS); + + TsFileReader reader; + ASSERT_EQ(reader.open(file_name_), E_OK); + ResultSet* rs = nullptr; + ASSERT_EQ(reader.queryByRow(TABLE_NAME, {"s0"}, 5, 10, rs), E_OK); + + int count = 0; + bool has_next = false; + while (IS_SUCC(rs->next(has_next)) && has_next) { + // column 1 = time + int64_t ts = rs->get_value(1); + EXPECT_EQ(ts, static_cast(5 + count)); + // column 2 = s0 (same value as timestamp in our write helper) + int64_t val = rs->get_value(2); + EXPECT_EQ(val, static_cast(5 + count)); + count++; + } + EXPECT_EQ(count, 10); + reader.destroy_query_data_set(rs); + reader.close(); +} + +// ⑨ Large dataset, small page size: limit stops loading early (no hang/OOM) +TEST_F(TsFileTableReaderRowQueryTest, LimitStopsEarlyAcrossPages) { + int prev = g_config_value_.page_writer_max_point_num_; + g_config_value_.page_writer_max_point_num_ = 5; + + write_simple_table(200); + + ASSERT_EQ(count_rows(0, 50), 50); + + g_config_value_.page_writer_max_point_num_ = prev; +} + +// ⑩ Metadata accessible via queryByRow result set +TEST_F(TsFileTableReaderRowQueryTest, MetadataAccessible) { + write_simple_table(10); + + TsFileReader reader; + ASSERT_EQ(reader.open(file_name_), E_OK); + ResultSet* rs = nullptr; + ASSERT_EQ(reader.queryByRow(TABLE_NAME, {"s0"}, 0, 5, rs), E_OK); + + auto meta = rs->get_metadata(); + ASSERT_NE(meta, nullptr); + EXPECT_EQ(meta->get_column_name(1), "time"); + EXPECT_EQ(meta->get_column_name(2), "s0"); + EXPECT_EQ(meta->get_column_count(), 2u); + + reader.destroy_query_data_set(rs); + reader.close(); +} + +// ⑪ Paging consistency: two pages together equal full result +TEST_F(TsFileTableReaderRowQueryTest, PaginationConsistency) { + write_simple_table(40); + ASSERT_EQ(count_rows(0, 20) + count_rows(20, 20), 40); +} + +// ⑫ queryByRow result equivalent to full query with limit applied +TEST_F(TsFileTableReaderRowQueryTest, EquivalentToFullQueryWithLimit) { + write_simple_table(TOTAL_ROWS); + + // Full query row count + TsFileReader reader; + ASSERT_EQ(reader.open(file_name_), E_OK); + ResultSet* rs_full = nullptr; + ASSERT_EQ(reader.query(TABLE_NAME, {"s0"}, INT64_MIN, INT64_MAX, rs_full), + E_OK); + int full_count = 0; + bool has_next = false; + while (IS_SUCC(rs_full->next(has_next)) && has_next) full_count++; + reader.destroy_query_data_set(rs_full); + + // queryByRow with limit < 0 should return the same count + ResultSet* rs_unlimited = nullptr; + ASSERT_EQ(reader.queryByRow(TABLE_NAME, {"s0"}, 0, -1, rs_unlimited), E_OK); + int unlimited_count = 0; + has_next = false; + while (IS_SUCC(rs_unlimited->next(has_next)) && has_next) unlimited_count++; + reader.destroy_query_data_set(rs_unlimited); + reader.close(); + + ASSERT_EQ(full_count, unlimited_count); +} + +// ⑬ Multiple flushes (multiple chunks): offset/limit still correct +TEST_F(TsFileTableReaderRowQueryTest, MultipleChunksCorrectness) { + // Write in two batches to ensure multiple chunks + WriteFile wf; + int flags = O_WRONLY | O_CREAT | O_TRUNC; +#ifdef _WIN32 + flags |= O_BINARY; +#endif + wf.create(file_name_, flags, 0666); + + std::vector col_schemas; + col_schemas.emplace_back("s0", TSDataType::INT64, + CompressionType::UNCOMPRESSED, TSEncoding::PLAIN, + ColumnCategory::FIELD); + auto* schema = new TableSchema(TABLE_NAME, col_schemas); + auto writer = std::make_shared(&wf, schema); + + // First batch: rows 0..29 + Tablet tablet1(TABLE_NAME, {"s0"}, {TSDataType::INT64}, + {ColumnCategory::FIELD}, 30); + for (int i = 0; i < 30; i++) { + tablet1.add_timestamp(i, static_cast(i)); + tablet1.add_value(i, 0, static_cast(i)); + } + writer->write_table(tablet1); + writer->flush(); + + // Second batch: rows 30..59 + Tablet tablet2(TABLE_NAME, {"s0"}, {TSDataType::INT64}, + {ColumnCategory::FIELD}, 30); + for (int i = 0; i < 30; i++) { + tablet2.add_timestamp(i, static_cast(30 + i)); + tablet2.add_value(i, 0, static_cast(30 + i)); + } + writer->write_table(tablet2); + writer->flush(); + writer->close(); + delete schema; + + // offset=25, limit=20 → rows 25..44 + TsFileReader reader; + ASSERT_EQ(reader.open(file_name_), E_OK); + ResultSet* rs = nullptr; + ASSERT_EQ(reader.queryByRow(TABLE_NAME, {"s0"}, 25, 20, rs), E_OK); + + int count = 0; + bool has_next = false; + while (IS_SUCC(rs->next(has_next)) && has_next) { + int64_t ts = rs->get_value(1); + EXPECT_EQ(ts, static_cast(25 + count)); + count++; + } + EXPECT_EQ(count, 20); + reader.destroy_query_data_set(rs); + reader.close(); +} diff --git a/cpp/test/reader/tree_view/tsfile_tree_reader_row_query_test.cc b/cpp/test/reader/tree_view/tsfile_tree_reader_row_query_test.cc new file mode 100644 index 000000000..bc1930204 --- /dev/null +++ b/cpp/test/reader/tree_view/tsfile_tree_reader_row_query_test.cc @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include + +#include +#include + +#include "common/config/config.h" +#include "common/record.h" +#include "common/schema.h" +#include "file/write_file.h" +#include "reader/result_set.h" +#include "reader/tsfile_tree_reader.h" +#include "writer/tsfile_tree_writer.h" + +using namespace storage; +using namespace common; + +// ───────────────────────────────────────────────────────────────────────────── +// Fixture +// ───────────────────────────────────────────────────────────────────────────── + +class TsFileTreeReaderRowQueryTest : public ::testing::Test { + protected: + static std::string generate_random_string(int length) { + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> dis(0, 61); + const std::string chars = + "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; + std::string s; + for (int i = 0; i < length; ++i) s += chars[dis(gen)]; + return s; + } + + void SetUp() override { + libtsfile_init(); + file_name_ = "tsfile_tree_row_query_test_" + + generate_random_string(10) + ".tsfile"; + remove(file_name_.c_str()); + } + + void TearDown() override { + remove(file_name_.c_str()); + libtsfile_destroy(); + } + + // Write `num_rows` records for each device in `device_ids`, + // using measurement `mea` (INT64). Timestamps are 0..num_rows-1, + // values are timestamp * 10. + void write_data(const std::vector& device_ids, + const std::string& mea, int num_rows) { + WriteFile wf; + int flags = O_WRONLY | O_CREAT | O_TRUNC; +#ifdef _WIN32 + flags |= O_BINARY; +#endif + wf.create(file_name_, flags, 0666); + TsFileTreeWriter writer(&wf); + for (auto dev : + device_ids) { // copy: register_timeseries needs non-const ref + auto* schema = new MeasurementSchema(mea, INT64); + writer.register_timeseries(dev, schema); + delete schema; + for (int i = 0; i < num_rows; i++) { + TsRecord record(dev, static_cast(i)); + record.add_point(mea, static_cast(i * 10)); + writer.write(record); + } + } + writer.flush(); + writer.close(); + } + + // Count the number of rows returned by queryByRow. + int count_rows(const std::string& file, + const std::vector& devs, const std::string& mea, + int offset, int limit) { + TsFileTreeReader reader; + EXPECT_EQ(reader.open(file), E_OK); + ResultSet* rs = nullptr; + EXPECT_EQ(reader.queryByRow(devs, {mea}, offset, limit, rs), E_OK); + EXPECT_NE(rs, nullptr); + int count = 0; + bool has_next = false; + while (IS_SUCC(rs->next(has_next)) && has_next) count++; + reader.destroy_query_data_set(rs); + reader.close(); + return count; + } + + std::string file_name_; + enum { TOTAL_ROWS = 50 }; + const std::string DEV = "device"; + const std::string MEA = "s1"; +}; + +// ───────────────────────────────────────────────────────────────────────────── +// Tests +// ───────────────────────────────────────────────────────────────────────────── + +// ① limit = 0: empty result +TEST_F(TsFileTreeReaderRowQueryTest, LimitZeroReturnsEmpty) { + write_data({DEV}, MEA, TOTAL_ROWS); + ASSERT_EQ(count_rows(file_name_, {DEV}, MEA, 0, 0), 0); +} + +// ② limit < total: only `limit` rows returned +TEST_F(TsFileTreeReaderRowQueryTest, LimitLessThanTotal) { + write_data({DEV}, MEA, TOTAL_ROWS); + ASSERT_EQ(count_rows(file_name_, {DEV}, MEA, 0, 20), 20); +} + +// ③ limit > total: all rows returned +TEST_F(TsFileTreeReaderRowQueryTest, LimitExceedsTotal) { + write_data({DEV}, MEA, TOTAL_ROWS); + ASSERT_EQ(count_rows(file_name_, {DEV}, MEA, 0, 1000), TOTAL_ROWS); +} + +// ④ limit < 0: unlimited, returns all rows +TEST_F(TsFileTreeReaderRowQueryTest, NegativeLimitMeansUnlimited) { + write_data({DEV}, MEA, TOTAL_ROWS); + ASSERT_EQ(count_rows(file_name_, {DEV}, MEA, 0, -1), TOTAL_ROWS); +} + +// ⑤ offset + limit in the middle of the data +TEST_F(TsFileTreeReaderRowQueryTest, OffsetPlusLimit) { + write_data({DEV}, MEA, TOTAL_ROWS); + // offset=10, limit=15 → should return exactly 15 rows + ASSERT_EQ(count_rows(file_name_, {DEV}, MEA, 10, 15), 15); +} + +// ⑥ offset >= total: empty result +TEST_F(TsFileTreeReaderRowQueryTest, OffsetBeyondTotal) { + write_data({DEV}, MEA, TOTAL_ROWS); + ASSERT_EQ(count_rows(file_name_, {DEV}, MEA, 1000, 10), 0); +} + +// ⑦ offset + limit > total: return remaining rows from offset +TEST_F(TsFileTreeReaderRowQueryTest, OffsetPlusLimitExceedsTotal) { + write_data({DEV}, MEA, TOTAL_ROWS); + // offset=40, limit=20 → only 10 rows remain + ASSERT_EQ(count_rows(file_name_, {DEV}, MEA, 40, 20), 10); +} + +// ⑧ Data correctness: verify timestamps start from `offset` +TEST_F(TsFileTreeReaderRowQueryTest, OffsetDataCorrectness) { + write_data({DEV}, MEA, TOTAL_ROWS); + TsFileTreeReader reader; + ASSERT_EQ(reader.open(file_name_), E_OK); + ResultSet* rs = nullptr; + ASSERT_EQ(reader.queryByRow({DEV}, {MEA}, 5, 10, rs), E_OK); + + int count = 0; + bool has_next = false; + while (IS_SUCC(rs->next(has_next)) && has_next) { + int64_t ts = rs->get_row_record()->get_timestamp(); + EXPECT_EQ(ts, static_cast(5 + count)); + int64_t val = rs->get_value(2); + EXPECT_EQ(val, (5 + count) * 10); + count++; + } + EXPECT_EQ(count, 10); + reader.destroy_query_data_set(rs); + reader.close(); +} + +// ⑨ Cross-chunk: small page size to force multiple chunks, verify correctness +TEST_F(TsFileTreeReaderRowQueryTest, CorrectnessAcrossChunks) { + int prev = g_config_value_.page_writer_max_point_num_; + g_config_value_.page_writer_max_point_num_ = 5; // tiny pages + + write_data({DEV}, MEA, 30); + + TsFileTreeReader reader; + ASSERT_EQ(reader.open(file_name_), E_OK); + ResultSet* rs = nullptr; + ASSERT_EQ(reader.queryByRow({DEV}, {MEA}, 5, 10, rs), E_OK); + + int count = 0; + bool has_next = false; + while (IS_SUCC(rs->next(has_next)) && has_next) { + int64_t ts = rs->get_row_record()->get_timestamp(); + EXPECT_EQ(ts, static_cast(5 + count)); + count++; + } + EXPECT_EQ(count, 10); + reader.destroy_query_data_set(rs); + reader.close(); + + g_config_value_.page_writer_max_point_num_ = prev; +} + +// ⑩ Multiple devices: verify total row count with offset/limit +TEST_F(TsFileTreeReaderRowQueryTest, MultipleDevicesOffsetLimit) { + // device_1 and device_2 each have 20 rows with timestamps 0..19. + // QDSWithoutTimeGenerator merges them by time: at each timestamp both + // devices' values appear in one RowRecord, so total distinct timestamps + // = 20. + write_data({"device_1", "device_2"}, MEA, 20); + + ASSERT_EQ(count_rows(file_name_, {"device_1", "device_2"}, MEA, 5, 10), 10); +} + +// ⑪ queryByRow result set supports metadata inspection +TEST_F(TsFileTreeReaderRowQueryTest, MetadataAccessible) { + write_data({DEV}, MEA, 10); + + TsFileTreeReader reader; + ASSERT_EQ(reader.open(file_name_), E_OK); + ResultSet* rs = nullptr; + ASSERT_EQ(reader.queryByRow({DEV}, {MEA}, 0, 5, rs), E_OK); + + auto meta = rs->get_metadata(); + ASSERT_NE(meta, nullptr); + // column 1 = "time", column 2 = measurement + EXPECT_EQ(meta->get_column_name(1), "time"); + EXPECT_EQ(meta->get_column_count(), 2u); + + reader.destroy_query_data_set(rs); + reader.close(); +} + +// ⑫ Paging consistency: two pages together equal the full result +TEST_F(TsFileTreeReaderRowQueryTest, PaginationConsistency) { + write_data({DEV}, MEA, 40); + + int page1 = count_rows(file_name_, {DEV}, MEA, 0, 20); + int page2 = count_rows(file_name_, {DEV}, MEA, 20, 20); + ASSERT_EQ(page1 + page2, 40); +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/RowRangeResultSet.java b/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/RowRangeResultSet.java new file mode 100644 index 000000000..eb077efdd --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/query/dataset/RowRangeResultSet.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.read.query.dataset; + +import org.apache.tsfile.write.record.TSRecord; + +import java.io.IOException; +import java.time.LocalDate; +import java.util.Iterator; + +/** + * A {@link ResultSet} wrapper that applies row-level offset and limit. + * + *

Takes the inner ResultSet and releases it on {@link #close()}. Once the limit is reached, + * {@link #next()} returns {@code false} immediately without calling the underlying ResultSet, + * avoiding unnecessary data loading. + * + * @param offset Number of leading rows to skip (must be >= 0). + * @param limit Maximum number of rows to return. A value < 0 means no limit. + */ +public class RowRangeResultSet implements ResultSet { + + private final ResultSet inner; + private final int offset; + private final int limit; + private int returnedCount; + private boolean offsetSkipped; + + public RowRangeResultSet(ResultSet inner, int offset, int limit) { + this.inner = inner; + this.offset = Math.max(0, offset); + this.limit = limit; + this.returnedCount = 0; + this.offsetSkipped = false; + } + + @Override + public ResultSetMetadata getMetadata() { + return inner.getMetadata(); + } + + @Override + public boolean next() throws IOException { + // Skip the first `offset` rows on the first call. + if (!offsetSkipped) { + for (int i = 0; i < offset; i++) { + if (!inner.next()) { + offsetSkipped = true; + return false; + } + } + offsetSkipped = true; + } + + // Limit reached: return false without touching inner ResultSet. + // This is the key "pushdown" effect: no further chunk/page loading occurs. + if (limit >= 0 && returnedCount >= limit) { + return false; + } + + boolean hasNext = inner.next(); + if (hasNext) { + returnedCount++; + } + return hasNext; + } + + @Override + public int getInt(String columnName) { + return inner.getInt(columnName); + } + + @Override + public int getInt(int columnIndex) { + return inner.getInt(columnIndex); + } + + @Override + public long getLong(String columnName) { + return inner.getLong(columnName); + } + + @Override + public long getLong(int columnIndex) { + return inner.getLong(columnIndex); + } + + @Override + public float getFloat(String columnName) { + return inner.getFloat(columnName); + } + + @Override + public float getFloat(int columnIndex) { + return inner.getFloat(columnIndex); + } + + @Override + public double getDouble(String columnName) { + return inner.getDouble(columnName); + } + + @Override + public double getDouble(int columnIndex) { + return inner.getDouble(columnIndex); + } + + @Override + public boolean getBoolean(String columnName) { + return inner.getBoolean(columnName); + } + + @Override + public boolean getBoolean(int columnIndex) { + return inner.getBoolean(columnIndex); + } + + @Override + public String getString(String columnName) { + return inner.getString(columnName); + } + + @Override + public String getString(int columnIndex) { + return inner.getString(columnIndex); + } + + @Override + public LocalDate getDate(String columnName) { + return inner.getDate(columnName); + } + + @Override + public LocalDate getDate(int columnIndex) { + return inner.getDate(columnIndex); + } + + @Override + public byte[] getBinary(String columnName) { + return inner.getBinary(columnName); + } + + @Override + public byte[] getBinary(int columnIndex) { + return inner.getBinary(columnIndex); + } + + @Override + public boolean isNull(String columnName) { + return inner.isNull(columnName); + } + + @Override + public boolean isNull(int columnIndex) { + return inner.isNull(columnIndex); + } + + @Override + public void close() { + inner.close(); + } + + @Override + public Iterator iterator() { + return inner.iterator(); + } +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/DeviceTableModelReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/DeviceTableModelReader.java index 927ac8954..72e3a4a0a 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/DeviceTableModelReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/DeviceTableModelReader.java @@ -33,6 +33,7 @@ import org.apache.tsfile.read.expression.ExpressionTree; import org.apache.tsfile.read.filter.basic.Filter; import org.apache.tsfile.read.query.dataset.ResultSet; +import org.apache.tsfile.read.query.dataset.RowRangeResultSet; import org.apache.tsfile.read.query.dataset.TableResultSet; import org.apache.tsfile.read.query.executor.TableQueryExecutor; import org.apache.tsfile.read.reader.block.TsBlockReader; @@ -113,6 +114,14 @@ public ResultSet query( return new TableResultSet(tsBlockReader, columnNames, dataTypeList, tableName); } + @TsFileApi + @Override + public ResultSet queryByRow(String tableName, List columnNames, int offset, int limit) + throws ReadProcessException, IOException, NoTableException, NoMeasurementException { + ResultSet inner = query(tableName, columnNames, Long.MIN_VALUE, Long.MAX_VALUE); + return new RowRangeResultSet(inner, offset, limit); + } + @Override public void close() { try { diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileReader.java index 995f73f64..bf29ffcee 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileReader.java @@ -48,6 +48,26 @@ ResultSet query( @TsFileApi List getAllTableSchema() throws IOException; + /** + * Query table model data by row range. + * + *

Internally queries the full time range and applies offset/limit at the result-set level. + * Once {@code limit} rows are returned, no further data is loaded from storage. + * + * @param tableName target table name + * @param columnNames list of column names to query + * @param offset number of leading rows to skip (>= 0) + * @param limit maximum number of rows to return; < 0 means unlimited + * @return a {@link ResultSet} containing the query results + * @throws ReadProcessException if a read processing error occurs + * @throws IOException if an I/O error occurs + * @throws NoTableException if the table does not exist + * @throws NoMeasurementException if a column does not exist + */ + @TsFileApi + ResultSet queryByRow(String tableName, List columnNames, int offset, int limit) + throws ReadProcessException, IOException, NoTableException, NoMeasurementException; + @TsFileApi void close(); } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileTreeReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileTreeReader.java index 202553ea8..40ccbad70 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileTreeReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/ITsFileTreeReader.java @@ -47,6 +47,24 @@ ResultSet query( @TreeModel List getDeviceSchema(String deviceId) throws IOException; + /** + * Query tree model data by row range. + * + *

Internally queries the full time range and applies offset/limit at the result-set level. + * Once {@code limit} rows are returned, no further data is loaded from storage. + * + * @param deviceIds list of device identifiers to query + * @param measurementNames list of measurement names to query + * @param offset number of leading rows to skip (>= 0) + * @param limit maximum number of rows to return; < 0 means unlimited + * @return a {@link ResultSet} containing the query results + * @throws IOException if an I/O error occurs during query execution + */ + @TsFileApi + @TreeModel + ResultSet queryByRow(List deviceIds, List measurementNames, int offset, int limit) + throws IOException; + /** Close underlying resources. */ @TsFileApi @TreeModel diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/TsFileTreeReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/TsFileTreeReader.java index fe7a0c35b..c8b659b1b 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/v4/TsFileTreeReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/v4/TsFileTreeReader.java @@ -30,6 +30,7 @@ import org.apache.tsfile.read.filter.operator.TimeFilterOperators; import org.apache.tsfile.read.query.dataset.QueryDataSet; import org.apache.tsfile.read.query.dataset.ResultSet; +import org.apache.tsfile.read.query.dataset.RowRangeResultSet; import org.apache.tsfile.read.query.dataset.TreeResultSet; import org.apache.tsfile.write.schema.MeasurementSchema; @@ -113,6 +114,29 @@ public List getDeviceSchema(String deviceId) throws IOExcepti return tsfileReader.getMeasurement(new StringArrayDeviceID(deviceId)); } + /** + * Query tree model data by row range. + * + *

Internally queries the full time range and applies offset/limit at the result-set level. + * Once {@code limit} rows are returned, no further data is loaded from storage. + * + * @param deviceIds list of device identifiers to query + * @param measurementNames list of measurement names to query + * @param offset number of leading rows to skip (>= 0) + * @param limit maximum number of rows to return; < 0 means unlimited + * @return a {@link ResultSet} containing the query results + * @throws IOException if an I/O error occurs during query execution + */ + @TsFileApi + @TreeModel + @Override + public ResultSet queryByRow( + List deviceIds, List measurementNames, int offset, int limit) + throws IOException { + ResultSet inner = query(deviceIds, measurementNames, Long.MIN_VALUE, Long.MAX_VALUE); + return new RowRangeResultSet(inner, offset, limit); + } + /** * Close the TsFileTreeReader and release resources. * diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/DeviceTableModelReaderRowQueryTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/DeviceTableModelReaderRowQueryTest.java new file mode 100644 index 000000000..54671eec5 --- /dev/null +++ b/java/tsfile/src/test/java/org/apache/tsfile/read/DeviceTableModelReaderRowQueryTest.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.read; + +import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.read.ReadProcessException; +import org.apache.tsfile.exception.write.NoMeasurementException; +import org.apache.tsfile.exception.write.NoTableException; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.read.query.dataset.ResultSet; +import org.apache.tsfile.read.v4.DeviceTableModelReader; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.v4.ITsFileWriter; +import org.apache.tsfile.write.v4.TsFileWriterBuilder; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +/** + * Unit tests for {@link DeviceTableModelReader#queryByRow} (table-model row-range query). + * + *

The test table has {@value #TOTAL} rows with timestamps 0..TOTAL-1 and field values equal to + * their timestamps. + */ +public class DeviceTableModelReaderRowQueryTest { + + private static final String FILE_PATH = "test_table_reader_row_query.tsfile"; + private static final int TOTAL = 50; + private static final String TABLE = "t1"; + private static final String FIELD = "s0"; + + private DeviceTableModelReader reader; + + @Before + public void setUp() throws IOException, WriteProcessException { + writeTableFile(FILE_PATH, TABLE, FIELD, TOTAL); + reader = new DeviceTableModelReader(new File(FILE_PATH)); + } + + @After + public void tearDown() { + if (reader != null) { + reader.close(); + } + new File(FILE_PATH).delete(); + } + + // ① limit=0 → empty result + @Test + public void testLimitZeroReturnsEmpty() + throws ReadProcessException, IOException, NoTableException, NoMeasurementException { + Assert.assertEquals(0, countRows(0, 0)); + } + + // ② limit < total → exactly `limit` rows + @Test + public void testLimitLessThanTotal() + throws ReadProcessException, IOException, NoTableException, NoMeasurementException { + Assert.assertEquals(10, countRows(0, 10)); + } + + // ③ limit > total → all rows + @Test + public void testLimitExceedsTotal() + throws ReadProcessException, IOException, NoTableException, NoMeasurementException { + Assert.assertEquals(TOTAL, countRows(0, 9999)); + } + + // ④ limit=-1 → unlimited, returns all rows + @Test + public void testNegativeLimitMeansUnlimited() + throws ReadProcessException, IOException, NoTableException, NoMeasurementException { + Assert.assertEquals(TOTAL, countRows(0, -1)); + } + + // ⑤ offset + limit in the middle + @Test + public void testOffsetPlusLimit() + throws ReadProcessException, IOException, NoTableException, NoMeasurementException { + Assert.assertEquals(15, countRows(10, 15)); + } + + // ⑥ offset >= total → empty result + @Test + public void testOffsetBeyondTotal() + throws ReadProcessException, IOException, NoTableException, NoMeasurementException { + Assert.assertEquals(0, countRows(1000, 10)); + } + + // ⑦ offset + limit > total → return remaining rows from offset + @Test + public void testOffsetPlusLimitExceedsTotal() + throws ReadProcessException, IOException, NoTableException, NoMeasurementException { + // offset=40, limit=20 → only 10 rows remain + Assert.assertEquals(10, countRows(40, 20)); + } + + // ⑧ data correctness: timestamps and values start from `offset` + @Test + public void testDataCorrectness() + throws ReadProcessException, IOException, NoTableException, NoMeasurementException { + List columns = Collections.singletonList(FIELD); + ResultSet rs = reader.queryByRow(TABLE, columns, 5, 10); + int count = 0; + while (rs.next()) { + long ts = rs.getLong(1); // column 1 = Time + long val = rs.getLong(2); // column 2 = s0 + Assert.assertEquals(5 + count, ts); + Assert.assertEquals(5 + count, val); + count++; + } + rs.close(); + Assert.assertEquals(10, count); + } + + // ⑨ metadata is accessible via the result set + @Test + public void testMetadataAccessible() + throws ReadProcessException, IOException, NoTableException, NoMeasurementException { + List columns = Collections.singletonList(FIELD); + ResultSet rs = reader.queryByRow(TABLE, columns, 0, 5); + Assert.assertNotNull(rs.getMetadata()); + Assert.assertEquals("Time", rs.getMetadata().getColumnName(1)); + Assert.assertEquals(FIELD, rs.getMetadata().getColumnName(2)); + rs.close(); + } + + // ⑩ paging consistency: two pages together equal the full result + @Test + public void testPaginationConsistency() + throws ReadProcessException, IOException, NoTableException, NoMeasurementException { + int page1 = countRows(0, 25); + int page2 = countRows(25, 25); + Assert.assertEquals(TOTAL, page1 + page2); + } + + // ⑪ multiple chunks: offset/limit spans chunk boundary correctly + @Test + public void testMultipleChunksCorrectness() + throws IOException, + WriteProcessException, + ReadProcessException, + NoTableException, + NoMeasurementException { + String filePath = "test_table_reader_row_query_multi_chunk.tsfile"; + writeTableFileMultiChunk(filePath, TABLE, FIELD, 30, 30); + try (DeviceTableModelReader r = new DeviceTableModelReader(new File(filePath))) { + // offset=25, limit=20 → rows 25..44 + List columns = Collections.singletonList(FIELD); + ResultSet rs = r.queryByRow(TABLE, columns, 25, 20); + int count = 0; + while (rs.next()) { + long ts = rs.getLong(1); + Assert.assertEquals(25 + count, ts); + count++; + } + rs.close(); + Assert.assertEquals(20, count); + } finally { + new File(filePath).delete(); + } + } + + // ───────────────────────────────────────────────────────────────────────────── + // Helpers + // ───────────────────────────────────────────────────────────────────────────── + + private int countRows(int offset, int limit) + throws ReadProcessException, IOException, NoTableException, NoMeasurementException { + List columns = Collections.singletonList(FIELD); + ResultSet rs = reader.queryByRow(TABLE, columns, offset, limit); + int count = 0; + while (rs.next()) { + count++; + } + rs.close(); + return count; + } + + /** + * Write a single-chunk table file with {@code numRows} rows. Timestamps are 0..numRows-1 and + * field values equal their timestamps. + */ + private static void writeTableFile( + String filePath, String tableName, String fieldName, int numRows) + throws IOException, WriteProcessException { + TableSchema tableSchema = + new TableSchema( + tableName, + Collections.singletonList(new MeasurementSchema(fieldName, TSDataType.INT64)), + Collections.singletonList(ColumnCategory.FIELD)); + try (ITsFileWriter writer = + new TsFileWriterBuilder().file(new File(filePath)).tableSchema(tableSchema).build()) { + Tablet tablet = + new Tablet( + tableName, + Collections.singletonList(fieldName), + Collections.singletonList(TSDataType.INT64), + Collections.singletonList(ColumnCategory.FIELD), + numRows); + for (int i = 0; i < numRows; i++) { + tablet.addTimestamp(i, i); + tablet.addValue(fieldName, i, (long) i); + } + writer.write(tablet); + } + } + + /** + * Write a two-chunk table file by using a tiny memory threshold to force a flush between the two + * tablets. First chunk has rows 0..chunk1Rows-1, second chunk has rows + * chunk1Rows..chunk1Rows+chunk2Rows-1. Field values equal their timestamps. + */ + private static void writeTableFileMultiChunk( + String filePath, String tableName, String fieldName, int chunk1Rows, int chunk2Rows) + throws IOException, WriteProcessException { + TableSchema tableSchema = + new TableSchema( + tableName, + Collections.singletonList(new MeasurementSchema(fieldName, TSDataType.INT64)), + Collections.singletonList(ColumnCategory.FIELD)); + // memoryThreshold(1) forces a flush after every write, producing multiple chunks. + try (ITsFileWriter writer = + new TsFileWriterBuilder() + .file(new File(filePath)) + .tableSchema(tableSchema) + .memoryThreshold(1) + .build()) { + Tablet tablet1 = + new Tablet( + tableName, + Collections.singletonList(fieldName), + Collections.singletonList(TSDataType.INT64), + Collections.singletonList(ColumnCategory.FIELD), + chunk1Rows); + for (int i = 0; i < chunk1Rows; i++) { + tablet1.addTimestamp(i, i); + tablet1.addValue(fieldName, i, (long) i); + } + writer.write(tablet1); + + Tablet tablet2 = + new Tablet( + tableName, + Collections.singletonList(fieldName), + Collections.singletonList(TSDataType.INT64), + Collections.singletonList(ColumnCategory.FIELD), + chunk2Rows); + for (int i = 0; i < chunk2Rows; i++) { + tablet2.addTimestamp(i, chunk1Rows + i); + tablet2.addValue(fieldName, i, (long) (chunk1Rows + i)); + } + writer.write(tablet2); + } + } +} diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileTreeReaderRowQueryTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileTreeReaderRowQueryTest.java new file mode 100644 index 000000000..e6a9b4850 --- /dev/null +++ b/java/tsfile/src/test/java/org/apache/tsfile/read/TsFileTreeReaderRowQueryTest.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.read; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.write.WriteProcessException; +import org.apache.tsfile.read.query.dataset.ResultSet; +import org.apache.tsfile.read.v4.ITsFileTreeReader; +import org.apache.tsfile.read.v4.TsFileTreeReaderBuilder; +import org.apache.tsfile.write.record.TSRecord; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.v4.TsFileTreeWriter; +import org.apache.tsfile.write.v4.TsFileTreeWriterBuilder; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * Unit tests for {@link ITsFileTreeReader#queryByRow} (tree-model row-range query). + * + *

Each device has {@value #TOTAL} rows with timestamps 0..TOTAL-1 and values timestamp * 10. + */ +public class TsFileTreeReaderRowQueryTest { + + private static final String FILE_PATH = "test_tree_reader_row_query.tsfile"; + private static final int TOTAL = 50; + private static final String DEVICE = "device"; + private static final String MEA = "s1"; + + private ITsFileTreeReader reader; + + @Before + public void setUp() throws IOException, WriteProcessException { + writeTreeFile(FILE_PATH, Collections.singletonList(DEVICE), MEA, TOTAL); + reader = new TsFileTreeReaderBuilder().file(new File(FILE_PATH)).build(); + } + + @After + public void tearDown() throws IOException { + if (reader != null) { + reader.close(); + } + new File(FILE_PATH).delete(); + } + + // ① limit=0 → empty result + @Test + public void testLimitZeroReturnsEmpty() throws IOException { + Assert.assertEquals(0, countRows(reader, DEVICE, MEA, 0, 0)); + } + + // ② limit < total → exactly `limit` rows + @Test + public void testLimitLessThanTotal() throws IOException { + Assert.assertEquals(20, countRows(reader, DEVICE, MEA, 0, 20)); + } + + // ③ limit > total → all rows + @Test + public void testLimitExceedsTotal() throws IOException { + Assert.assertEquals(TOTAL, countRows(reader, DEVICE, MEA, 0, 1000)); + } + + // ④ limit=-1 → unlimited, returns all rows + @Test + public void testNegativeLimitMeansUnlimited() throws IOException { + Assert.assertEquals(TOTAL, countRows(reader, DEVICE, MEA, 0, -1)); + } + + // ⑤ offset + limit in the middle + @Test + public void testOffsetPlusLimit() throws IOException { + Assert.assertEquals(15, countRows(reader, DEVICE, MEA, 10, 15)); + } + + // ⑥ offset >= total → empty result + @Test + public void testOffsetBeyondTotal() throws IOException { + Assert.assertEquals(0, countRows(reader, DEVICE, MEA, 1000, 10)); + } + + // ⑦ offset + limit > total → return remaining rows from offset + @Test + public void testOffsetPlusLimitExceedsTotal() throws IOException { + // offset=40, limit=20 → only 10 rows remain + Assert.assertEquals(10, countRows(reader, DEVICE, MEA, 40, 20)); + } + + // ⑧ data correctness: timestamps and values start from `offset` + @Test + public void testDataCorrectness() throws IOException { + List deviceIds = Collections.singletonList(DEVICE); + List measurements = Collections.singletonList(MEA); + ResultSet rs = reader.queryByRow(deviceIds, measurements, 5, 10); + int count = 0; + while (rs.next()) { + long ts = rs.getLong(1); // column 1 = Time + long val = rs.getLong(2); // column 2 = measurement + Assert.assertEquals(5 + count, ts); + Assert.assertEquals((5 + count) * 10L, val); + count++; + } + rs.close(); + Assert.assertEquals(10, count); + } + + // ⑨ metadata is accessible via the result set + @Test + public void testMetadataAccessible() throws IOException { + List deviceIds = Collections.singletonList(DEVICE); + List measurements = Collections.singletonList(MEA); + ResultSet rs = reader.queryByRow(deviceIds, measurements, 0, 5); + Assert.assertNotNull(rs.getMetadata()); + // Column 1 is always "Time"; column 2 is the full path "device.s1" + Assert.assertEquals("Time", rs.getMetadata().getColumnName(1)); + Assert.assertEquals(DEVICE + "." + MEA, rs.getMetadata().getColumnName(2)); + rs.close(); + } + + // ⑩ paging consistency: two pages together equal the full result + @Test + public void testPaginationConsistency() throws IOException { + int page1 = countRows(reader, DEVICE, MEA, 0, 25); + int page2 = countRows(reader, DEVICE, MEA, 25, 25); + Assert.assertEquals(TOTAL, page1 + page2); + } + + // ⑪ multiple devices: offset/limit applied to merged result + @Test + public void testMultipleDevices() throws IOException, WriteProcessException { + String filePath = "test_tree_reader_row_query_multi.tsfile"; + writeTreeFile(filePath, Arrays.asList("dev1", "dev2"), MEA, 20); + try (ITsFileTreeReader r = new TsFileTreeReaderBuilder().file(new File(filePath)).build()) { + int count = countRows(r, null, MEA, 5, 10); + Assert.assertEquals(10, count); + } finally { + new File(filePath).delete(); + } + } + + // ───────────────────────────────────────────────────────────────────────────── + // Helpers + // ───────────────────────────────────────────────────────────────────────────── + + /** + * Count rows returned by {@code queryByRow}. + * + * @param r reader (already open) + * @param device device ID; if {@code null}, both "dev1" and "dev2" are queried + * @param mea measurement name + * @param offset row offset + * @param limit row limit + */ + private int countRows(ITsFileTreeReader r, String device, String mea, int offset, int limit) + throws IOException { + List deviceIds = + device != null ? Collections.singletonList(device) : Arrays.asList("dev1", "dev2"); + List measurements = Collections.singletonList(mea); + ResultSet rs = r.queryByRow(deviceIds, measurements, offset, limit); + int count = 0; + while (rs.next()) { + count++; + } + rs.close(); + return count; + } + + /** + * Write a tree-model TsFile with the given devices and measurement. Timestamps are 0..numRows-1, + * values are timestamp * 10 (INT64). + */ + private static void writeTreeFile( + String filePath, List deviceIds, String measurement, int numRows) + throws IOException, WriteProcessException { + File file = new File(filePath); + MeasurementSchema schema = new MeasurementSchema(measurement, TSDataType.INT64); + try (TsFileTreeWriter writer = new TsFileTreeWriterBuilder().file(file).build()) { + for (String deviceId : deviceIds) { + writer.registerTimeseries(deviceId, schema); + } + for (String deviceId : deviceIds) { + for (int i = 0; i < numRows; i++) { + TSRecord record = new TSRecord(deviceId, i); + record.addPoint(measurement, (long) i * 10); + writer.write(record); + } + } + } + } +} diff --git a/python/tests/test_row_query.py b/python/tests/test_row_query.py new file mode 100644 index 000000000..6914a8a8c --- /dev/null +++ b/python/tests/test_row_query.py @@ -0,0 +1,332 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +"""Tests for query_tree_by_row and query_table_by_row on TsFileReader.""" + +import pytest + +from tsfile import ( + TsFileWriter, + TsFileReader, + TsFileTableWriter, + TSDataType, + TimeseriesSchema, + ColumnSchema, + TableSchema, + ColumnCategory, + Tablet, + RowRecord, + Field, +) + +# ───────────────────────────────────────────────────────────────────────────── +# Shared constants +# ───────────────────────────────────────────────────────────────────────────── + +TOTAL = 50 +DEVICE = "root.device1" +MEA = "s1" +TABLE = "t1" + + +# ───────────────────────────────────────────────────────────────────────────── +# Write helpers +# ───────────────────────────────────────────────────────────────────────────── + +def _write_tree_file(filepath, device, measurement, num_rows): + """Write `num_rows` records for device/measurement (INT64). + Timestamps are 0..num_rows-1, values are timestamp * 10. + """ + writer = TsFileWriter(filepath) + writer.register_timeseries(device, TimeseriesSchema(measurement, TSDataType.INT64)) + for i in range(num_rows): + writer.write_row_record( + RowRecord(device, i, [Field(measurement, i * 10, TSDataType.INT64)]) + ) + writer.close() + + +def _write_table_file(filepath, table_name, num_rows): + """Write `num_rows` rows into a table with a single INT64 field column 's0'. + Timestamps and values are both 0..num_rows-1. + """ + schema = TableSchema( + table_name, + [ColumnSchema("s0", TSDataType.INT64, ColumnCategory.FIELD)], + ) + with TsFileTableWriter(filepath, schema) as writer: + tablet = Tablet(["s0"], [TSDataType.INT64], num_rows) + for i in range(num_rows): + tablet.add_timestamp(i, i) + tablet.add_value_by_index(0, i, i) + writer.write_table(tablet) + + +# ───────────────────────────────────────────────────────────────────────────── +# Count helpers +# ───────────────────────────────────────────────────────────────────────────── + +def _count_tree(filepath, device, measurement, offset, limit): + with TsFileReader(filepath) as r: + rs = r.query_tree_by_row([device], [measurement], offset, limit) + count = 0 + while rs.next(): + count += 1 + rs.close() + return count + + +def _count_table(filepath, table_name, offset, limit): + with TsFileReader(filepath) as r: + rs = r.query_table_by_row(table_name, ["s0"], offset, limit) + count = 0 + while rs.next(): + count += 1 + rs.close() + return count + + +# ───────────────────────────────────────────────────────────────────────────── +# Fixtures +# ───────────────────────────────────────────────────────────────────────────── + +@pytest.fixture +def tree_file(tmp_path): + fp = str(tmp_path / "tree_row_query.tsfile") + _write_tree_file(fp, DEVICE, MEA, TOTAL) + return fp + + +@pytest.fixture +def table_file(tmp_path): + fp = str(tmp_path / "table_row_query.tsfile") + _write_table_file(fp, TABLE, TOTAL) + return fp + + +# ───────────────────────────────────────────────────────────────────────────── +# Tree model tests — query_tree_by_row +# ───────────────────────────────────────────────────────────────────────────── + +class TestQueryTreeByRow: + + # ① limit=0 → empty result + def test_limit_zero(self, tree_file): + assert _count_tree(tree_file, DEVICE, MEA, 0, 0) == 0 + + # ② limit < total → exactly `limit` rows + def test_limit_less_than_total(self, tree_file): + assert _count_tree(tree_file, DEVICE, MEA, 0, 20) == 20 + + # ③ limit > total → all rows + def test_limit_exceeds_total(self, tree_file): + assert _count_tree(tree_file, DEVICE, MEA, 0, 9999) == TOTAL + + # ④ limit=-1 → unlimited, returns all rows + def test_negative_limit_means_unlimited(self, tree_file): + assert _count_tree(tree_file, DEVICE, MEA, 0, -1) == TOTAL + + # ⑤ offset + limit in the middle + def test_offset_plus_limit(self, tree_file): + assert _count_tree(tree_file, DEVICE, MEA, 10, 15) == 15 + + # ⑥ offset >= total → empty result + def test_offset_beyond_total(self, tree_file): + assert _count_tree(tree_file, DEVICE, MEA, 1000, 10) == 0 + + # ⑦ offset + limit > total → return remaining rows + def test_offset_plus_limit_exceeds_total(self, tree_file): + # offset=40, limit=20 → 10 rows remain + assert _count_tree(tree_file, DEVICE, MEA, 40, 20) == 10 + + # ⑧ data correctness: verify timestamps start from `offset` + def test_data_correctness(self, tree_file): + with TsFileReader(tree_file) as r: + rs = r.query_tree_by_row([DEVICE], [MEA], 5, 10) + count = 0 + while rs.next(): + ts = rs.get_value_by_index(1) # column 1 = time + val = rs.get_value_by_index(2) # column 2 = measurement + assert ts == 5 + count + assert val == (5 + count) * 10 + count += 1 + rs.close() + assert count == 10 + + # ⑨ paging consistency: two pages together equal the full result + def test_pagination_consistency(self, tree_file): + p1 = _count_tree(tree_file, DEVICE, MEA, 0, 25) + p2 = _count_tree(tree_file, DEVICE, MEA, 25, 25) + assert p1 + p2 == TOTAL + + # ⑩ metadata is accessible via the result set + def test_metadata_accessible(self, tree_file): + with TsFileReader(tree_file) as r: + rs = r.query_tree_by_row([DEVICE], [MEA], 0, 5) + meta = rs.get_metadata() + assert meta is not None + col_names = meta.get_column_list() + assert "time" in col_names + # Tree model returns full path as column name (e.g. "root.device1.s1") + assert f"{DEVICE}.{MEA}".lower() in col_names + rs.close() + + # ⑪ context-manager usage + def test_context_manager(self, tree_file): + count = 0 + with TsFileReader(tree_file) as r: + with r.query_tree_by_row([DEVICE], [MEA], 0, 10) as rs: + while rs.next(): + count += 1 + assert count == 10 + + # ⑫ multiple devices: offset/limit applied to merged result + def test_multiple_devices(self, tmp_path): + fp = str(tmp_path / "tree_multi_device.tsfile") + # Both devices share the same timestamps 0..19. + # The merger collapses identical timestamps → 20 distinct rows. + # Use a single writer for both devices. + writer = TsFileWriter(fp) + writer.register_timeseries("device_1", TimeseriesSchema(MEA, TSDataType.INT64)) + writer.register_timeseries("device_2", TimeseriesSchema(MEA, TSDataType.INT64)) + for i in range(20): + writer.write_row_record( + RowRecord("device_1", i, [Field(MEA, i * 10, TSDataType.INT64)]) + ) + writer.write_row_record( + RowRecord("device_2", i, [Field(MEA, i * 20, TSDataType.INT64)]) + ) + writer.close() + + with TsFileReader(fp) as r: + rs = r.query_tree_by_row(["device_1", "device_2"], [MEA], 5, 10) + count = 0 + while rs.next(): + count += 1 + rs.close() + assert count == 10 + + +# ───────────────────────────────────────────────────────────────────────────── +# Table model tests — query_table_by_row +# ───────────────────────────────────────────────────────────────────────────── + +class TestQueryTableByRow: + + # ① limit=0 → empty result + def test_limit_zero(self, table_file): + assert _count_table(table_file, TABLE, 0, 0) == 0 + + # ② limit < total → exactly `limit` rows + def test_limit_less_than_total(self, table_file): + assert _count_table(table_file, TABLE, 0, 10) == 10 + + # ③ limit > total → all rows + def test_limit_exceeds_total(self, table_file): + assert _count_table(table_file, TABLE, 0, 9999) == TOTAL + + # ④ limit=-1 → unlimited, returns all rows + def test_negative_limit_means_unlimited(self, table_file): + assert _count_table(table_file, TABLE, 0, -1) == TOTAL + + # ⑤ offset + limit in the middle + def test_offset_plus_limit(self, table_file): + assert _count_table(table_file, TABLE, 10, 15) == 15 + + # ⑥ offset >= total → empty result + def test_offset_beyond_total(self, table_file): + assert _count_table(table_file, TABLE, 1000, 10) == 0 + + # ⑦ offset + limit > total → return remaining rows + def test_offset_plus_limit_exceeds_total(self, table_file): + # offset=40, limit=20 → 10 rows remain + assert _count_table(table_file, TABLE, 40, 20) == 10 + + # ⑧ data correctness: timestamps and values start from `offset` + def test_data_correctness(self, table_file): + with TsFileReader(table_file) as r: + rs = r.query_table_by_row(TABLE, ["s0"], 5, 10) + count = 0 + while rs.next(): + ts = rs.get_value_by_index(1) # column 1 = time + val = rs.get_value_by_index(2) # column 2 = s0 + assert ts == 5 + count + assert val == 5 + count + count += 1 + rs.close() + assert count == 10 + + # ⑨ paging consistency: two pages together equal the full result + def test_pagination_consistency(self, table_file): + p1 = _count_table(table_file, TABLE, 0, 25) + p2 = _count_table(table_file, TABLE, 25, 25) + assert p1 + p2 == TOTAL + + # ⑩ metadata is accessible via the result set + def test_metadata_accessible(self, table_file): + with TsFileReader(table_file) as r: + rs = r.query_table_by_row(TABLE, ["s0"], 0, 5) + meta = rs.get_metadata() + assert meta is not None + col_names = meta.get_column_list() + assert "time" in col_names + assert "s0" in col_names + rs.close() + + # ⑪ context-manager usage + def test_context_manager(self, table_file): + count = 0 + with TsFileReader(table_file) as r: + with r.query_table_by_row(TABLE, ["s0"], 0, 10) as rs: + while rs.next(): + count += 1 + assert count == 10 + + # ⑫ multiple flushes (multiple chunks): offset/limit still correct + def test_multiple_chunks_correctness(self, tmp_path): + fp = str(tmp_path / "table_multi_chunk.tsfile") + schema = TableSchema( + TABLE, + [ColumnSchema("s0", TSDataType.INT64, ColumnCategory.FIELD)], + ) + with TsFileTableWriter(fp, schema) as writer: + # First chunk: rows 0..29 + tablet1 = Tablet(["s0"], [TSDataType.INT64], 30) + for i in range(30): + tablet1.add_timestamp(i, i) + tablet1.add_value_by_index(0, i, i) + writer.write_table(tablet1) + writer.flush() + + # Second chunk: rows 30..59 + tablet2 = Tablet(["s0"], [TSDataType.INT64], 30) + for i in range(30): + tablet2.add_timestamp(i, 30 + i) + tablet2.add_value_by_index(0, i, 30 + i) + writer.write_table(tablet2) + + # offset=25, limit=20 → rows 25..44 + with TsFileReader(fp) as r: + rs = r.query_table_by_row(TABLE, ["s0"], 25, 20) + count = 0 + while rs.next(): + ts = rs.get_value_by_index(1) + assert ts == 25 + count + count += 1 + rs.close() + assert count == 20 diff --git a/python/tsfile/tsfile_cpp.pxd b/python/tsfile/tsfile_cpp.pxd index 9c65fb26f..c394c09bb 100644 --- a/python/tsfile/tsfile_cpp.pxd +++ b/python/tsfile/tsfile_cpp.pxd @@ -189,6 +189,20 @@ cdef extern from "./tsfile_cwrapper.h": char ** sensor_name, uint32_t sensor_num, int64_t start_time, int64_t end_time, ErrorCode *err_code) + ResultSet tsfile_reader_query_tree_by_row(TsFileReader reader, + char** device_ids, int device_ids_len, + char** measurement_names, + int measurement_names_len, + int offset, int limit, + ErrorCode* err_code) + + ResultSet tsfile_reader_query_table_by_row(TsFileReader reader, + const char* table_name, + char** column_names, + int column_names_len, + int offset, int limit, + ErrorCode* err_code) + TableSchema tsfile_reader_get_table_schema(TsFileReader reader, const char * table_name); diff --git a/python/tsfile/tsfile_py_cpp.pxd b/python/tsfile/tsfile_py_cpp.pxd index 2389aa9a6..16540effa 100644 --- a/python/tsfile/tsfile_py_cpp.pxd +++ b/python/tsfile/tsfile_py_cpp.pxd @@ -54,6 +54,10 @@ cdef public api ResultSet tsfile_reader_query_table_on_tree_c(TsFileReader reade int64_t start_time, int64_t end_time) cdef public api ResultSet tsfile_reader_query_paths_c(TsFileReader reader, object device_name, object sensor_list, int64_t start_time, int64_t end_time) +cdef public api ResultSet tsfile_reader_query_tree_by_row_c(TsFileReader reader, object device_ids, + object measurement_names, int offset, int limit) +cdef public api ResultSet tsfile_reader_query_table_by_row_c(TsFileReader reader, object table_name, + object column_names, int offset, int limit) cdef public api object get_table_schema(TsFileReader reader, object table_name) cdef public api object get_all_table_schema(TsFileReader reader) cdef public api object get_all_timeseries_schema(TsFileReader reader) diff --git a/python/tsfile/tsfile_py_cpp.pyx b/python/tsfile/tsfile_py_cpp.pyx index 3ca79a2a1..179d95c25 100644 --- a/python/tsfile/tsfile_py_cpp.pyx +++ b/python/tsfile/tsfile_py_cpp.pyx @@ -779,6 +779,81 @@ cdef ResultSet tsfile_reader_query_paths_c(TsFileReader reader, object device_na free( sensor_list_c) sensor_list_c = NULL +cdef ResultSet tsfile_reader_query_tree_by_row_c( + TsFileReader reader, object device_ids, object measurement_names, + int offset, int limit): + cdef int dev_num = len(device_ids) + cdef int mea_num = len(measurement_names) + cdef char** c_devs = malloc(sizeof(char*) * dev_num) + cdef char** c_meas = malloc(sizeof(char*) * mea_num) + cdef ErrorCode code = 0 + cdef int i + if c_devs == NULL or c_meas == NULL: + if c_devs != NULL: + free( c_devs) + if c_meas != NULL: + free( c_meas) + raise MemoryError("Failed to allocate memory for query arrays") + for i in range(dev_num): + c_devs[i] = NULL + for i in range(mea_num): + c_meas[i] = NULL + try: + for i in range(dev_num): + c_devs[i] = strdup(( device_ids[i]).encode('utf-8')) + if c_devs[i] == NULL: + raise MemoryError("Failed to allocate memory for device id") + for i in range(mea_num): + c_meas[i] = strdup(( measurement_names[i]).encode('utf-8')) + if c_meas[i] == NULL: + raise MemoryError("Failed to allocate memory for measurement name") + result = tsfile_reader_query_tree_by_row( + reader, c_devs, dev_num, c_meas, mea_num, offset, limit, &code) + check_error(code) + return result + finally: + for i in range(dev_num): + if c_devs[i] != NULL: + free( c_devs[i]) + c_devs[i] = NULL + free( c_devs) + for i in range(mea_num): + if c_meas[i] != NULL: + free( c_meas[i]) + c_meas[i] = NULL + free( c_meas) + + +cdef ResultSet tsfile_reader_query_table_by_row_c( + TsFileReader reader, object table_name, object column_names, + int offset, int limit): + cdef int col_num = len(column_names) + cdef bytes table_name_bytes = PyUnicode_AsUTF8String(table_name) + cdef const char* table_name_c = table_name_bytes + cdef char** c_cols = malloc(sizeof(char*) * col_num) + cdef ErrorCode code = 0 + cdef int i + if c_cols == NULL: + raise MemoryError("Failed to allocate memory for column names") + for i in range(col_num): + c_cols[i] = NULL + try: + for i in range(col_num): + c_cols[i] = strdup(( column_names[i]).encode('utf-8')) + if c_cols[i] == NULL: + raise MemoryError("Failed to allocate memory for column name") + result = tsfile_reader_query_table_by_row( + reader, table_name_c, c_cols, col_num, offset, limit, &code) + check_error(code) + return result + finally: + for i in range(col_num): + if c_cols[i] != NULL: + free( c_cols[i]) + c_cols[i] = NULL + free( c_cols) + + cdef object get_table_schema(TsFileReader reader, object table_name): cdef bytes table_name_bytes = PyUnicode_AsUTF8String(table_name) cdef const char * table_name_c = table_name_bytes diff --git a/python/tsfile/tsfile_reader.pyx b/python/tsfile/tsfile_reader.pyx index 4476d24dc..637444571 100644 --- a/python/tsfile/tsfile_reader.pyx +++ b/python/tsfile/tsfile_reader.pyx @@ -320,6 +320,54 @@ cdef class TsFileReaderPy: self.activate_result_set_list.add(pyresult) return pyresult + def query_tree_by_row(self, device_ids: List[str], measurement_names: List[str], + offset: int, limit: int) -> ResultSetPy: + """ + Query tree model data by row range. + + Internally queries the full time range and applies offset/limit at the + result-set level. Once ``limit`` rows are returned, no further data is + loaded from storage. + + :param device_ids: List of device identifiers to query. + :param measurement_names: List of measurement names to query. + :param offset: Number of leading rows to skip (>= 0). + :param limit: Maximum number of rows to return. < 0 means unlimited. + :return: ResultSet containing query results. + """ + cdef ResultSet result + result = tsfile_reader_query_tree_by_row_c( + self.reader, device_ids, measurement_names, offset, limit) + pyresult = ResultSetPy(self, True) + pyresult.init_c(result, device_ids[0] if device_ids else "") + self.activate_result_set_list.add(pyresult) + return pyresult + + def query_table_by_row(self, table_name: str, column_names: List[str], + offset: int, limit: int) -> ResultSetPy: + """ + Query table model data by row range. + + Internally queries the full time range and applies offset/limit at the + result-set level. Once ``limit`` rows are returned, no further data is + loaded from storage. + + :param table_name: Target table name. + :param column_names: List of column names to query. + :param offset: Number of leading rows to skip (>= 0). + :param limit: Maximum number of rows to return. < 0 means unlimited. + :return: ResultSet containing query results. + """ + cdef ResultSet result + result = tsfile_reader_query_table_by_row_c( + self.reader, table_name.lower(), + [column_name.lower() for column_name in column_names], + offset, limit) + pyresult = ResultSetPy(self) + pyresult.init_c(result, table_name) + self.activate_result_set_list.add(pyresult) + return pyresult + def notify_result_set_discard(self, result_set: ResultSetPy): """ Remove activate result set from activate_result_set_list, called when a result set close.