Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions cpp/src/cwrapper/tsfile_cwrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
#include <unistd.h>
#include <writer/tsfile_table_writer.h>

#include <climits>
#include <set>

#include "common/tablet.h"
#include "reader/result_set.h"
#include "reader/row_range_result_set.h"
#include "reader/tsfile_reader.h"
#include "writer/tsfile_writer.h"

Expand Down Expand Up @@ -770,6 +772,44 @@ ERRNO _tsfile_writer_flush(TsFileWriter writer) {
return w->flush();
}

ResultSet tsfile_reader_query_tree_by_row(TsFileReader reader,
char** device_ids, int device_ids_len,
char** measurement_names,
int measurement_names_len, int offset,
int limit, ERRNO* err_code) {
auto* r = static_cast<storage::TsFileReader*>(reader);
std::vector<std::string> path_list;
path_list.reserve(device_ids_len * measurement_names_len);
for (int i = 0; i < device_ids_len; i++) {
for (int j = 0; j < measurement_names_len; j++) {
path_list.push_back(std::string(device_ids[i]) + "." +
std::string(measurement_names[j]));
}
}
storage::ResultSet* inner = nullptr;
*err_code = r->query(path_list, INT64_MIN, INT64_MAX, inner);
if (*err_code != common::E_OK) return nullptr;
return new storage::RowRangeResultSet(inner, offset, limit);
}

ResultSet tsfile_reader_query_table_by_row(TsFileReader reader,
const char* table_name,
char** column_names,
int column_names_len, int offset,
int limit, ERRNO* err_code) {
auto* r = static_cast<storage::TsFileReader*>(reader);
std::vector<std::string> cols;
cols.reserve(column_names_len);
for (int i = 0; i < column_names_len; i++) {
cols.emplace_back(column_names[i]);
}
storage::ResultSet* inner = nullptr;
*err_code =
r->query(std::string(table_name), cols, INT64_MIN, INT64_MAX, inner);
if (*err_code != common::E_OK) return nullptr;
return new storage::RowRangeResultSet(inner, offset, limit);
}

ResultSet _tsfile_reader_query_device(TsFileReader reader,
const char* device_name,
char** sensor_name, uint32_t sensor_num,
Expand Down
46 changes: 46 additions & 0 deletions cpp/src/cwrapper/tsfile_cwrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,52 @@ ResultSet tsfile_query_table(TsFileReader reader, const char* table_name,
ResultSet tsfile_query_table_on_tree(TsFileReader reader, char** columns,
uint32_t column_num, Timestamp start_time,
Timestamp end_time, ERRNO* err_code);

/**
* @brief Query tree model time series data by row range (offset + limit).
*
* Combines all device/measurement paths, queries the full time range, then
* skips the first `offset` rows and returns at most `limit` rows. Once
* `limit` rows are returned, underlying data loading stops immediately.
*
* @param reader [in] Valid TsFileReader handle.
* @param device_ids [in] Array of device IDs.
* @param device_ids_len [in] Number of device IDs.
* @param measurement_names [in] Array of measurement names.
* @param measurement_names_len [in] Number of measurement names.
* @param offset [in] Rows to skip (>= 0).
* @param limit [in] Max rows to return (< 0 = unlimited).
* @param err_code [out] Error code.
* @return ResultSet handle. Must be freed with free_tsfile_result_set().
*/
ResultSet tsfile_reader_query_tree_by_row(TsFileReader reader,
char** device_ids, int device_ids_len,
char** measurement_names,
int measurement_names_len, int offset,
int limit, ERRNO* err_code);

/**
* @brief Query table model data by row range (offset + limit).
*
* Queries the full time range, skips the first `offset` rows, and returns at
* most `limit` rows. Once `limit` rows are returned, underlying data loading
* stops immediately.
*
* @param reader [in] Valid TsFileReader handle.
* @param table_name [in] Target table name.
* @param column_names [in] Array of column names.
* @param column_names_len [in] Number of column names.
* @param offset [in] Rows to skip (>= 0).
* @param limit [in] Max rows to return (< 0 = unlimited).
* @param err_code [out] Error code.
* @return ResultSet handle. Must be freed with free_tsfile_result_set().
*/
ResultSet tsfile_reader_query_table_by_row(TsFileReader reader,
const char* table_name,
char** column_names,
int column_names_len, int offset,
int limit, ERRNO* err_code);

// ResultSet tsfile_reader_query_device(TsFileReader reader,
// const char* device_name,
// char** sensor_name, uint32_t sensor_num,
Expand Down
87 changes: 87 additions & 0 deletions cpp/src/reader/row_range_result_set.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "reader/row_range_result_set.h"

namespace storage {

RowRangeResultSet::RowRangeResultSet(ResultSet* inner, int offset, int limit)
: inner_(inner),
offset_(offset < 0 ? 0 : offset),
limit_(limit),
returned_count_(0),
offset_skipped_(false) {}

RowRangeResultSet::~RowRangeResultSet() { close(); }

int RowRangeResultSet::next(bool& has_next) {
// ① Skip the first `offset_` rows on the first call.
if (!offset_skipped_) {
for (int i = 0; i < offset_; i++) {
int ret = inner_->next(has_next);
if (ret != common::E_OK) return ret;
if (!has_next) {
has_next = false;
offset_skipped_ = true;
return common::E_OK;
}
}
offset_skipped_ = true;
}

// ② Limit reached: return immediately without touching inner ResultSet.
// This is the key "pushdown" effect: no further chunk/page loading
// occurs.
if (limit_ >= 0 && returned_count_ >= limit_) {
has_next = false;
return common::E_OK;
}

// ③ Normal delegation.
int ret = inner_->next(has_next);
if (ret == common::E_OK && has_next) {
returned_count_++;
}
return ret;
}

bool RowRangeResultSet::is_null(const std::string& column_name) {
return inner_->is_null(column_name);
}

bool RowRangeResultSet::is_null(uint32_t column_index) {
return inner_->is_null(column_index);
}

RowRecord* RowRangeResultSet::get_row_record() {
return inner_->get_row_record();
}

std::shared_ptr<ResultSetMetadata> RowRangeResultSet::get_metadata() {
return inner_->get_metadata();
}

void RowRangeResultSet::close() {
if (inner_ != nullptr) {
delete inner_;
inner_ = nullptr;
}
}

} // namespace storage
62 changes: 62 additions & 0 deletions cpp/src/reader/row_range_result_set.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#ifndef READER_ROW_RANGE_RESULT_SET_H
#define READER_ROW_RANGE_RESULT_SET_H

#include <memory>
#include <string>

#include "reader/result_set.h"

namespace storage {

/**
* @brief A ResultSet wrapper that applies row-level offset and limit.
*
* Takes ownership of the inner ResultSet and releases it on close().
* Once the limit is reached, next() returns has_next=false immediately
* without calling the underlying ResultSet, avoiding unnecessary data loading.
*
* @param offset Number of leading rows to skip (must be >= 0).
* @param limit Maximum number of rows to return. A value < 0 means
* no limit (all remaining rows are returned).
*/
class RowRangeResultSet : public ResultSet {
public:
RowRangeResultSet(ResultSet* inner, int offset, int limit);
~RowRangeResultSet() override;

int next(bool& has_next) override;
bool is_null(const std::string& column_name) override;
bool is_null(uint32_t column_index) override;
RowRecord* get_row_record() override;
std::shared_ptr<ResultSetMetadata> get_metadata() override;
void close() override;

private:
ResultSet* inner_;
int offset_;
int limit_;
int returned_count_;
bool offset_skipped_;
};

} // namespace storage
#endif // READER_ROW_RANGE_RESULT_SET_H
13 changes: 13 additions & 0 deletions cpp/src/reader/tsfile_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
*/
#include "tsfile_reader.h"

#include <climits>

#include "common/schema.h"
#include "filter/time_operator.h"
#include "reader/row_range_result_set.h"
#include "tsfile_executor.h"

using namespace common;
Expand Down Expand Up @@ -114,6 +117,16 @@ int TsFileReader::query(const std::string& table_name,
return ret;
}

int TsFileReader::queryByRow(const std::string& table_name,
const std::vector<std::string>& column_names,
int offset, int limit, ResultSet*& result_set) {
ResultSet* inner = nullptr;
int ret = query(table_name, column_names, INT64_MIN, INT64_MAX, inner);
if (ret != common::E_OK) return ret;
result_set = new RowRangeResultSet(inner, offset, limit);
return common::E_OK;
}

int TsFileReader::query_table_on_tree(
const std::vector<std::string>& measurement_names, int64_t star_time,
int64_t end_time, ResultSet*& result_set) {
Expand Down
18 changes: 18 additions & 0 deletions cpp/src/reader/tsfile_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,24 @@ class TsFileReader {
const std::vector<std::string>& columns_names, int64_t start_time,
int64_t end_time, ResultSet*& result_set, Filter* tag_filter);

/**
* @brief Query table model data by row range.
*
* Internally queries the full time range [INT64_MIN, INT64_MAX] and
* applies offset/limit at the result-set level. Once `limit` rows have
* been returned, no further data is loaded from storage.
*
* @param table_name Target table name.
* @param column_names Columns to query.
* @param offset Number of leading rows to skip (>= 0).
* @param limit Maximum rows to return. < 0 means unlimited.
* @param[out] result_set The result set containing query results.
* @return Returns 0 on success, or a non-zero error code on failure.
*/
int queryByRow(const std::string& table_name,
const std::vector<std::string>& column_names, int offset,
int limit, ResultSet*& result_set);

int query_table_on_tree(const std::vector<std::string>& measurement_names,
int64_t star_time, int64_t end_time,
ResultSet*& result_set);
Expand Down
15 changes: 15 additions & 0 deletions cpp/src/reader/tsfile_tree_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@

#include "reader/tsfile_tree_reader.h"

#include <climits>

#include "reader/row_range_result_set.h"

namespace storage {

TsFileTreeReader::TsFileTreeReader() {
Expand Down Expand Up @@ -47,6 +51,17 @@ int TsFileTreeReader::query(const std::vector<std::string>& device_ids,
return tsfile_reader_->query(path_list, start_time, end_time, result_set);
}

int TsFileTreeReader::queryByRow(
const std::vector<std::string>& device_ids,
const std::vector<std::string>& measurement_names, int offset, int limit,
ResultSet*& result_set) {
ResultSet* inner = nullptr;
int ret = query(device_ids, measurement_names, INT64_MIN, INT64_MAX, inner);
if (ret != common::E_OK) return ret;
result_set = new RowRangeResultSet(inner, offset, limit);
return common::E_OK;
}

void TsFileTreeReader::destroy_query_data_set(ResultSet* qds) {
tsfile_reader_->destroy_query_data_set(qds);
}
Expand Down
20 changes: 20 additions & 0 deletions cpp/src/reader/tsfile_tree_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,26 @@ class TsFileTreeReader {
const std::vector<std::string>& measurement_names,
int64_t start_time, int64_t end_time, ResultSet*& result_set);

/**
* @brief Query tree model data by row range.
*
* Internally queries the full time range [INT64_MIN, INT64_MAX] and
* applies offset/limit at the result-set level. Once `limit` rows have
* been returned, no further data is loaded from storage.
*
* @param device_ids List of device identifiers to query.
* @param measurement_names List of measurement names to query.
* @param offset Number of leading rows to skip (>= 0).
* @param limit Maximum rows to return. < 0 means unlimited.
* @param[out] result_set The result set containing query results.
* @return Returns 0 on success, or a non-zero error code on failure.
* The caller is responsible for destroying the result set using
* destroy_query_data_set().
*/
int queryByRow(const std::vector<std::string>& device_ids,
const std::vector<std::string>& measurement_names,
int offset, int limit, ResultSet*& result_set);

/**
* @brief Destroy and deallocate the query result set
*
Expand Down
Loading
Loading