Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cpp/src/common/tsblock/tsblock.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ class TsBlock {

FORCE_INLINE uint32_t get_row_count() const { return row_count_; }

FORCE_INLINE void set_row_count(uint32_t count) {
ASSERT(count <= row_count_);
row_count_ = count;
}

FORCE_INLINE TupleDesc* get_tuple_desc() const { return tuple_desc_; }

FORCE_INLINE Vector* get_vector(uint32_t index) { return vectors_[index]; }
Expand Down
40 changes: 40 additions & 0 deletions cpp/src/cwrapper/tsfile_cwrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
#include <unistd.h>
#include <writer/tsfile_table_writer.h>

#include <climits>
#include <set>

#include "common/tablet.h"
#include "reader/result_set.h"
#include "reader/row_range_result_set.h"
#include "reader/tsfile_reader.h"
#include "writer/tsfile_writer.h"

Expand Down Expand Up @@ -770,6 +772,44 @@ ERRNO _tsfile_writer_flush(TsFileWriter writer) {
return w->flush();
}

ResultSet tsfile_reader_query_tree_by_row(TsFileReader reader,
char** device_ids, int device_ids_len,
char** measurement_names,
int measurement_names_len, int offset,
int limit, ERRNO* err_code) {
auto* r = static_cast<storage::TsFileReader*>(reader);
std::vector<std::string> path_list;
path_list.reserve(device_ids_len * measurement_names_len);
for (int i = 0; i < device_ids_len; i++) {
for (int j = 0; j < measurement_names_len; j++) {
path_list.push_back(std::string(device_ids[i]) + "." +
std::string(measurement_names[j]));
}
}
storage::ResultSet* inner = nullptr;
*err_code = r->query(path_list, INT64_MIN, INT64_MAX, inner);
if (*err_code != common::E_OK) return nullptr;
return new storage::RowRangeResultSet(inner, offset, limit);
}

ResultSet tsfile_reader_query_table_by_row(TsFileReader reader,
const char* table_name,
char** column_names,
int column_names_len, int offset,
int limit, ERRNO* err_code) {
auto* r = static_cast<storage::TsFileReader*>(reader);
std::vector<std::string> cols;
cols.reserve(column_names_len);
for (int i = 0; i < column_names_len; i++) {
cols.emplace_back(column_names[i]);
}
storage::ResultSet* inner = nullptr;
*err_code =
r->query(std::string(table_name), cols, INT64_MIN, INT64_MAX, inner);
if (*err_code != common::E_OK) return nullptr;
return new storage::RowRangeResultSet(inner, offset, limit);
}

ResultSet _tsfile_reader_query_device(TsFileReader reader,
const char* device_name,
char** sensor_name, uint32_t sensor_num,
Expand Down
46 changes: 46 additions & 0 deletions cpp/src/cwrapper/tsfile_cwrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,52 @@ ResultSet tsfile_query_table(TsFileReader reader, const char* table_name,
ResultSet tsfile_query_table_on_tree(TsFileReader reader, char** columns,
uint32_t column_num, Timestamp start_time,
Timestamp end_time, ERRNO* err_code);

/**
* @brief Query tree model time series data by row range (offset + limit).
*
* Combines all device/measurement paths, queries the full time range, then
* skips the first `offset` rows and returns at most `limit` rows. Once
* `limit` rows are returned, underlying data loading stops immediately.
*
* @param reader [in] Valid TsFileReader handle.
* @param device_ids [in] Array of device IDs.
* @param device_ids_len [in] Number of device IDs.
* @param measurement_names [in] Array of measurement names.
* @param measurement_names_len [in] Number of measurement names.
* @param offset [in] Rows to skip (>= 0).
* @param limit [in] Max rows to return (< 0 = unlimited).
* @param err_code [out] Error code.
* @return ResultSet handle. Must be freed with free_tsfile_result_set().
*/
ResultSet tsfile_reader_query_tree_by_row(TsFileReader reader,
char** device_ids, int device_ids_len,
char** measurement_names,
int measurement_names_len, int offset,
int limit, ERRNO* err_code);

/**
* @brief Query table model data by row range (offset + limit).
*
* Queries the full time range, skips the first `offset` rows, and returns at
* most `limit` rows. Once `limit` rows are returned, underlying data loading
* stops immediately.
*
* @param reader [in] Valid TsFileReader handle.
* @param table_name [in] Target table name.
* @param column_names [in] Array of column names.
* @param column_names_len [in] Number of column names.
* @param offset [in] Rows to skip (>= 0).
* @param limit [in] Max rows to return (< 0 = unlimited).
* @param err_code [out] Error code.
* @return ResultSet handle. Must be freed with free_tsfile_result_set().
*/
ResultSet tsfile_reader_query_table_by_row(TsFileReader reader,
const char* table_name,
char** column_names,
int column_names_len, int offset,
int limit, ERRNO* err_code);

// ResultSet tsfile_reader_query_device(TsFileReader reader,
// const char* device_name,
// char** sensor_name, uint32_t sensor_num,
Expand Down
10 changes: 8 additions & 2 deletions cpp/src/file/tsfile_io_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ void TsFileIOReader::reset() {
int TsFileIOReader::alloc_ssi(std::shared_ptr<IDeviceID> device_id,
const std::string& measurement_name,
TsFileSeriesScanIterator*& ssi,
common::PageArena& pa, Filter* time_filter) {
common::PageArena& pa, Filter* time_filter,
int32_t offset, int32_t limit) {
int ret = E_OK;
if (RET_FAIL(load_tsfile_meta_if_necessary())) {
} else {
Expand All @@ -69,7 +70,12 @@ int TsFileIOReader::alloc_ssi(std::shared_ptr<IDeviceID> device_id,
} else if (time_filter != nullptr &&
!filter_stasify(ssi->itimeseries_index_, time_filter)) {
ret = E_NO_MORE_DATA;
} else if (RET_FAIL(ssi->init_chunk_reader())) {
} else {
if (offset > 0 || limit >= 0) {
ssi->set_row_range(offset, limit);
}
if (RET_FAIL(ssi->init_chunk_reader())) {
}
}
if (ret != E_OK) {
ssi->destroy();
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/file/tsfile_io_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ class TsFileIOReader {
int alloc_ssi(std::shared_ptr<IDeviceID> device_id,
const std::string& measurement_name,
TsFileSeriesScanIterator*& ssi, common::PageArena& pa,
Filter* time_filter = nullptr);
Filter* time_filter = nullptr, int32_t offset = 0,
int32_t limit = -1);

void revert_ssi(TsFileSeriesScanIterator* ssi);

Expand Down
35 changes: 35 additions & 0 deletions cpp/src/reader/aligned_chunk_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,41 @@ int AlignedChunkReader::get_next_page(TsBlock* ret_tsblock,
return ret;
}

int AlignedChunkReader::skip_pages(int32_t rows_to_skip,
int32_t& rows_skipped) {
int ret = E_OK;
rows_skipped = 0;
if (chunk_has_only_one_page(time_chunk_header_)) {
return ret;
}
while (rows_to_skip > 0 && has_more_data() &&
!prev_time_page_not_finish() && !prev_value_page_not_finish()) {
if (RET_FAIL(get_cur_page_header(
time_chunk_meta_, time_in_stream_, cur_time_page_header_,
time_chunk_visit_offset_, time_chunk_header_))) {
break;
}
if (RET_FAIL(get_cur_page_header(
value_chunk_meta_, value_in_stream_, cur_value_page_header_,
value_chunk_visit_offset_, value_chunk_header_))) {
break;
}
int32_t page_count = cur_time_page_header_.statistic_ != nullptr
? cur_time_page_header_.statistic_->get_count()
: 0;
if (page_count > 0 && rows_to_skip >= page_count) {
if (RET_FAIL(skip_cur_page())) {
break;
}
rows_to_skip -= page_count;
rows_skipped += page_count;
} else {
break;
}
}
return ret;
}

int AlignedChunkReader::get_cur_page_header(ChunkMeta*& chunk_meta,
common::ByteStream& in_stream,
PageHeader& cur_page_header,
Expand Down
1 change: 1 addition & 0 deletions cpp/src/reader/aligned_chunk_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class AlignedChunkReader : public IChunkReader {

int get_next_page(common::TsBlock* tsblock, Filter* oneshoot_filter,
common::PageArena& pa) override;
int skip_pages(int32_t rows_to_skip, int32_t& rows_skipped) override;

private:
FORCE_INLINE bool chunk_has_only_one_page(
Expand Down
14 changes: 10 additions & 4 deletions cpp/src/reader/block/device_ordered_tsblock_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ namespace storage {

int DeviceOrderedTsBlockReader::has_next(bool& has_next) {
int ret = common::E_OK;
if (remaining_limit_ == 0) {
has_next = false;
return common::E_OK;
}
if (current_reader_ != nullptr &&
IS_SUCC(current_reader_->has_next(has_next)) && has_next) {
return common::E_OK;
Expand All @@ -47,22 +51,24 @@ int DeviceOrderedTsBlockReader::has_next(bool& has_next) {
return common::E_OOM;
}
if (RET_FAIL(current_reader_->init(task, block_size_, time_filter_,
field_filter_))) {
field_filter_, remaining_offset_,
remaining_limit_))) {
delete current_reader_;
current_reader_ = nullptr;
return ret;
}
// Update remaining offset/limit from what SingleDeviceTsBlockReader
// consumed (for aligned data, SSIs handle it; retrieve remaining)
remaining_offset_ = current_reader_->get_remaining_offset();
remaining_limit_ = current_reader_->get_remaining_limit();

if (RET_FAIL(current_reader_->has_next(has_next))) {
return ret;
}
// If current device has data, just return.
if (has_next) {
return ret;
}
// If current device does not have data, get next device.

// Free current device reader.
if (current_reader_) {
delete current_reader_;
current_reader_ = nullptr;
Expand Down
8 changes: 6 additions & 2 deletions cpp/src/reader/block/device_ordered_tsblock_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ class DeviceOrderedTsBlockReader : public TsBlockReader {
std::unique_ptr<DeviceTaskIterator> device_task_iterator,
IMetadataQuerier* metadata_querier, int32_t block_size,
TsFileIOReader* tsfile_io_reader, Filter* time_filter,
Filter* field_filter)
Filter* field_filter, int32_t offset = 0, int32_t limit = -1)
: device_task_iterator_(std::move(device_task_iterator)),
metadata_querier_(metadata_querier),
block_size_(block_size),
tsfile_io_reader_(tsfile_io_reader),
time_filter_(time_filter),
field_filter_(field_filter) {}
field_filter_(field_filter),
remaining_offset_(offset),
remaining_limit_(limit) {}
~DeviceOrderedTsBlockReader() override { close(); }

int has_next(bool& has_next) override;
Expand All @@ -54,6 +56,8 @@ class DeviceOrderedTsBlockReader : public TsBlockReader {
TsFileIOReader* tsfile_io_reader_;
Filter* time_filter_ = nullptr;
Filter* field_filter_ = nullptr;
int32_t remaining_offset_;
int32_t remaining_limit_;
};
} // namespace storage

Expand Down
Loading
Loading