diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index c85150d8f..fc35de43c 100755 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -111,7 +111,7 @@ endif () option(BUILD_TEST "Build tests" ON) message("cmake using: BUILD_TEST=${BUILD_TEST}") -option(ENABLE_ANTLR4 "Enable ANTLR4 runtime" ON) +option(ENABLE_ANTLR4 "Enable ANTLR4 runtime" OFF) message("cmake using: ENABLE_ANTLR4=${ENABLE_ANTLR4}") option(ENABLE_SNAPPY "Enable Google Snappy compression" ON) diff --git a/cpp/src/common/tsblock/vector/vector.h b/cpp/src/common/tsblock/vector/vector.h index 20a765967..37a96c543 100644 --- a/cpp/src/common/tsblock/vector/vector.h +++ b/cpp/src/common/tsblock/vector/vector.h @@ -78,6 +78,10 @@ class Vector { FORCE_INLINE bool has_null() { return has_null_; } + FORCE_INLINE common::BitMap& get_bitmap() { return nulls_; } + + FORCE_INLINE common::ByteBuffer& get_value_data() { return values_; } + // We want derived class to have access to base class members, so it is // defined as protected protected: diff --git a/cpp/src/cwrapper/CMakeLists.txt b/cpp/src/cwrapper/CMakeLists.txt index 07f52eb33..d62250bf0 100644 --- a/cpp/src/cwrapper/CMakeLists.txt +++ b/cpp/src/cwrapper/CMakeLists.txt @@ -18,7 +18,7 @@ under the License. ]] message("Running in cwrapper directory") set(CMAKE_POSITION_INDEPENDENT_CODE ON) -set(CWRAPPER_SRC_LIST tsfile_cwrapper.cc) +set(CWRAPPER_SRC_LIST tsfile_cwrapper.cc arrow_c.cc) add_library(cwrapper_obj OBJECT ${CWRAPPER_SRC_LIST}) # install header files diff --git a/cpp/src/cwrapper/arrow_c.cc b/cpp/src/cwrapper/arrow_c.cc new file mode 100644 index 000000000..40fdcb639 --- /dev/null +++ b/cpp/src/cwrapper/arrow_c.cc @@ -0,0 +1,976 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include + +#include "common/allocator/alloc_base.h" +#include "common/tablet.h" +#include "common/tsblock/tsblock.h" +#include "common/tsblock/tuple_desc.h" +#include "common/tsblock/vector/vector.h" +#include "cwrapper/tsfile_cwrapper.h" +#include "utils/errno_define.h" + +namespace arrow { + +#define ARROW_FLAG_DICTIONARY_ORDERED 1 +#define ARROW_FLAG_NULLABLE 2 +#define ARROW_FLAG_MAP_KEYS_SORTED 4 + +struct ArrowArrayData { + void** buffers; + size_t n_buffers; +}; + +struct ArrowSchemaData { + std::vector* format_strings; + std::vector* name_strings; + ArrowSchema** children; + size_t n_children; +}; + +struct StructArrayData { + ArrowArray** children; + size_t n_children; +}; + +static const char* GetArrowFormatString(common::TSDataType datatype) { + switch (datatype) { + case common::BOOLEAN: + return "b"; + case common::INT32: + return "i"; + case common::INT64: + return "l"; + case common::TIMESTAMP: // nanosecond, no timezone + return "tsn:"; + case common::FLOAT: + return "f"; + case common::DOUBLE: + return "g"; + case common::TEXT: + case common::STRING: + return "u"; + case common::DATE: + return "tdD"; // date32: days since Unix epoch, stored as int32 + default: + return nullptr; + } +} + +static inline size_t GetNullBitmapSize(int64_t length) { + return (length + 7) / 8; +} + +static void ReleaseArrowArray(ArrowArray* array) { + if 
(array == nullptr || array->private_data == nullptr) { + return; + } + ArrowArrayData* data = static_cast(array->private_data); + if (data->buffers != nullptr) { + for (size_t i = 0; i < data->n_buffers; ++i) { + if (data->buffers[i] != nullptr) { + common::mem_free(data->buffers[i]); + } + } + common::mem_free(data->buffers); + } + common::mem_free(data); + + array->length = 0; + array->null_count = 0; + array->offset = 0; + array->n_buffers = 0; + array->n_children = 0; + array->buffers = nullptr; + array->children = nullptr; + array->dictionary = nullptr; + array->release = nullptr; + array->private_data = nullptr; +} + +static void ReleaseStructArrowArray(ArrowArray* array) { + if (array == nullptr || array->private_data == nullptr) { + return; + } + StructArrayData* data = static_cast(array->private_data); + if (data->children != nullptr) { + for (size_t i = 0; i < data->n_children; ++i) { + if (data->children[i] != nullptr) { + if (data->children[i]->release != nullptr) { + data->children[i]->release(data->children[i]); + } + common::mem_free(data->children[i]); + } + } + common::mem_free(data->children); + } + delete data; + + array->length = 0; + array->null_count = 0; + array->offset = 0; + array->n_buffers = 0; + array->n_children = 0; + array->buffers = nullptr; + array->children = nullptr; + array->dictionary = nullptr; + array->release = nullptr; + array->private_data = nullptr; +} + +static void ReleaseArrowSchema(ArrowSchema* schema) { + if (schema == nullptr || schema->private_data == nullptr) { + return; + } + ArrowSchemaData* data = static_cast(schema->private_data); + + // Release children schemas first + if (data->children != nullptr) { + for (size_t i = 0; i < data->n_children; ++i) { + if (data->children[i] != nullptr) { + if (data->children[i]->release != nullptr) { + data->children[i]->release(data->children[i]); + } + common::mem_free(data->children[i]); + } + } + common::mem_free(data->children); + } + + // Release string storage + if 
(data->format_strings != nullptr) { + delete data->format_strings; + } + if (data->name_strings != nullptr) { + delete data->name_strings; + } + + delete data; + + schema->format = nullptr; + schema->name = nullptr; + schema->metadata = nullptr; + schema->flags = 0; + schema->n_children = 0; + schema->children = nullptr; + schema->dictionary = nullptr; + schema->release = nullptr; + schema->private_data = nullptr; +} + +template +inline int BuildFixedLengthArrowArrayC(common::Vector* vec, uint32_t row_count, + ArrowArray* out_array) { + if (vec == nullptr || out_array == nullptr || row_count == 0) { + return common::E_INVALID_ARG; + } + + bool has_null = vec->has_null(); + size_t type_size = sizeof(CType); + // Arrow C Data Interface: fixed-width types always have 2 buffers + // buffers[0] = validity bitmap (may be NULL if no nulls) + // buffers[1] = values + static constexpr int64_t n_buffers = 2; + + ArrowArrayData* array_data = static_cast( + common::mem_alloc(sizeof(ArrowArrayData), common::MOD_TSBLOCK)); + if (array_data == nullptr) { + return common::E_OOM; + } + + array_data->n_buffers = n_buffers; + array_data->buffers = static_cast( + common::mem_alloc(n_buffers * sizeof(void*), common::MOD_TSBLOCK)); + if (array_data->buffers == nullptr) { + common::mem_free(array_data); + return common::E_OOM; + } + + for (int64_t i = 0; i < n_buffers; ++i) { + array_data->buffers[i] = nullptr; + } + + uint8_t* null_bitmap = nullptr; + if (has_null) { + size_t null_bitmap_size = GetNullBitmapSize(row_count); + null_bitmap = static_cast( + common::mem_alloc(null_bitmap_size, common::MOD_TSBLOCK)); + if (null_bitmap == nullptr) { + common::mem_free(array_data->buffers); + common::mem_free(array_data); + return common::E_OOM; + } + common::BitMap& vec_bitmap = vec->get_bitmap(); + char* vec_bitmap_data = vec_bitmap.get_bitmap(); + for (size_t i = 0; i < null_bitmap_size; ++i) { + null_bitmap[i] = ~static_cast(vec_bitmap_data[i]); + } + array_data->buffers[0] = null_bitmap; 
+ + int64_t null_count = 0; + for (uint32_t i = 0; i < row_count; ++i) { + if (vec_bitmap.test(i)) { + null_count++; + } + } + out_array->null_count = null_count; + } else { + array_data->buffers[0] = nullptr; + out_array->null_count = 0; + } + + char* vec_data = vec->get_value_data().get_data(); + void* data_buffer = nullptr; + + if (std::is_same::value) { + size_t packed_size = GetNullBitmapSize(row_count); + uint8_t* packed_buffer = static_cast( + common::mem_alloc(packed_size, common::MOD_TSBLOCK)); + if (packed_buffer == nullptr) { + if (null_bitmap != nullptr) { + common::mem_free(null_bitmap); + } + common::mem_free(array_data->buffers); + common::mem_free(array_data); + return common::E_OOM; + } + + std::memset(packed_buffer, 0, packed_size); + + const uint8_t* src = reinterpret_cast(vec_data); + for (uint32_t i = 0; i < row_count; ++i) { + if (src[i] != 0) { + uint32_t byte_idx = i / 8; + uint32_t bit_idx = i % 8; + packed_buffer[byte_idx] |= (1 << bit_idx); + } + } + + data_buffer = packed_buffer; + } else { + size_t data_size = type_size * row_count; + data_buffer = common::mem_alloc(data_size, common::MOD_TSBLOCK); + if (data_buffer == nullptr) { + if (null_bitmap != nullptr) { + common::mem_free(null_bitmap); + } + common::mem_free(array_data->buffers); + common::mem_free(array_data); + return common::E_OOM; + } + std::memcpy(data_buffer, vec_data, data_size); + } + + array_data->buffers[1] = data_buffer; + + out_array->length = row_count; + out_array->offset = 0; + out_array->n_buffers = n_buffers; + out_array->n_children = 0; + out_array->buffers = const_cast(array_data->buffers); + out_array->children = nullptr; + out_array->dictionary = nullptr; + out_array->release = ReleaseArrowArray; + out_array->private_data = array_data; + + return common::E_OK; +} + +static int BuildStringArrowArrayC(common::Vector* vec, uint32_t row_count, + ArrowArray* out_array) { + if (vec == nullptr || out_array == nullptr || row_count == 0) { + return 
common::E_INVALID_ARG; + } + + bool has_null = vec->has_null(); + int64_t n_buffers = 3; + ArrowArrayData* array_data = static_cast( + common::mem_alloc(sizeof(ArrowArrayData), common::MOD_TSBLOCK)); + if (array_data == nullptr) { + return common::E_OOM; + } + + array_data->n_buffers = n_buffers; + array_data->buffers = static_cast( + common::mem_alloc(n_buffers * sizeof(void*), common::MOD_TSBLOCK)); + if (array_data->buffers == nullptr) { + common::mem_free(array_data); + return common::E_OOM; + } + + for (int64_t i = 0; i < n_buffers; ++i) { + array_data->buffers[i] = nullptr; + } + + uint8_t* null_bitmap = nullptr; + if (has_null) { + size_t null_bitmap_size = GetNullBitmapSize(row_count); + null_bitmap = static_cast( + common::mem_alloc(null_bitmap_size, common::MOD_TSBLOCK)); + if (null_bitmap == nullptr) { + common::mem_free(array_data->buffers); + common::mem_free(array_data); + return common::E_OOM; + } + common::BitMap& vec_bitmap = vec->get_bitmap(); + char* vec_bitmap_data = vec_bitmap.get_bitmap(); + for (size_t i = 0; i < null_bitmap_size; ++i) { + null_bitmap[i] = ~static_cast(vec_bitmap_data[i]); + } + array_data->buffers[0] = null_bitmap; + + int64_t null_count = 0; + for (uint32_t i = 0; i < row_count; ++i) { + if (vec_bitmap.test(i)) { + null_count++; + } + } + out_array->null_count = null_count; + } else { + array_data->buffers[0] = nullptr; + out_array->null_count = 0; + } + size_t offsets_size = sizeof(int32_t) * (row_count + 1); + int32_t* offsets = static_cast( + common::mem_alloc(offsets_size, common::MOD_TSBLOCK)); + if (offsets == nullptr) { + if (null_bitmap != nullptr) { + common::mem_free(null_bitmap); + } + common::mem_free(array_data->buffers); + common::mem_free(array_data); + return common::E_OOM; + } + + offsets[0] = 0; + uint32_t current_offset = 0; + char* vec_data = vec->get_value_data().get_data(); + uint32_t vec_offset = 0; + + // Fetch vec_bitmap again for the per-row null checks in the loops below + common::BitMap& vec_bitmap = vec->get_bitmap(); + + for (uint32_t i = 0; 
i < row_count; ++i) { + if (has_null && vec_bitmap.test(i)) { + offsets[i + 1] = current_offset; + } else { + uint32_t len = 0; + std::memcpy(&len, vec_data + vec_offset, sizeof(uint32_t)); + vec_offset += sizeof(uint32_t); + + current_offset += len; + offsets[i + 1] = current_offset; + vec_offset += len; + } + } + + array_data->buffers[1] = offsets; + + size_t data_size = current_offset; + uint8_t* data_buffer = static_cast( + common::mem_alloc(data_size, common::MOD_TSBLOCK)); + if (data_buffer == nullptr) { + if (null_bitmap != nullptr) { + common::mem_free(null_bitmap); + } + common::mem_free(offsets); + common::mem_free(array_data->buffers); + common::mem_free(array_data); + return common::E_OOM; + } + + vec_offset = 0; + uint32_t data_offset = 0; + for (uint32_t i = 0; i < row_count; ++i) { + if (!has_null || !vec_bitmap.test(i)) { + uint32_t len = 0; + std::memcpy(&len, vec_data + vec_offset, sizeof(uint32_t)); + vec_offset += sizeof(uint32_t); + + if (len > 0) { + std::memcpy(data_buffer + data_offset, vec_data + vec_offset, + len); + data_offset += len; + } + vec_offset += len; + } + } + + array_data->buffers[2] = data_buffer; + + out_array->length = row_count; + out_array->offset = 0; + out_array->n_buffers = n_buffers; + out_array->n_children = 0; + out_array->buffers = const_cast(array_data->buffers); + out_array->children = nullptr; + out_array->dictionary = nullptr; + out_array->release = ReleaseArrowArray; + out_array->private_data = array_data; + + return common::E_OK; +} + +// Convert TsFile YYYYMMDD integer to days since Unix epoch (1970-01-01) +static int32_t YYYYMMDDToDaysSinceEpoch(int32_t yyyymmdd) { + int year = yyyymmdd / 10000; + int month = (yyyymmdd % 10000) / 100; + int day = yyyymmdd % 100; + + std::tm date = {}; + date.tm_year = year - 1900; + date.tm_mon = month - 1; + date.tm_mday = day; + date.tm_hour = 12; + date.tm_isdst = -1; + + std::tm epoch = {}; + epoch.tm_year = 70; + epoch.tm_mon = 0; + epoch.tm_mday = 1; + epoch.tm_hour = 
12; + epoch.tm_isdst = -1; + + time_t t1 = mktime(&date); + time_t t2 = mktime(&epoch); + return static_cast((t1 - t2) / (60 * 60 * 24)); +} + +static int BuildDateArrowArrayC(common::Vector* vec, uint32_t row_count, + ArrowArray* out_array) { + if (vec == nullptr || out_array == nullptr || row_count == 0) { + return common::E_INVALID_ARG; + } + + bool has_null = vec->has_null(); + static constexpr int64_t n_buffers = 2; + + ArrowArrayData* array_data = static_cast( + common::mem_alloc(sizeof(ArrowArrayData), common::MOD_TSBLOCK)); + if (array_data == nullptr) return common::E_OOM; + + array_data->n_buffers = n_buffers; + array_data->buffers = static_cast( + common::mem_alloc(n_buffers * sizeof(void*), common::MOD_TSBLOCK)); + if (array_data->buffers == nullptr) { + common::mem_free(array_data); + return common::E_OOM; + } + for (int64_t i = 0; i < n_buffers; ++i) array_data->buffers[i] = nullptr; + + common::BitMap& vec_bitmap = vec->get_bitmap(); + uint8_t* null_bitmap = nullptr; + if (has_null) { + size_t null_bitmap_size = GetNullBitmapSize(row_count); + null_bitmap = static_cast( + common::mem_alloc(null_bitmap_size, common::MOD_TSBLOCK)); + if (null_bitmap == nullptr) { + common::mem_free(array_data->buffers); + common::mem_free(array_data); + return common::E_OOM; + } + char* vec_bitmap_data = vec_bitmap.get_bitmap(); + for (size_t i = 0; i < null_bitmap_size; ++i) { + null_bitmap[i] = ~static_cast(vec_bitmap_data[i]); + } + int64_t null_count = 0; + for (uint32_t i = 0; i < row_count; ++i) { + if (vec_bitmap.test(i)) null_count++; + } + out_array->null_count = null_count; + array_data->buffers[0] = null_bitmap; + } else { + out_array->null_count = 0; + array_data->buffers[0] = nullptr; + } + + int32_t* data_buffer = static_cast( + common::mem_alloc(sizeof(int32_t) * row_count, common::MOD_TSBLOCK)); + if (data_buffer == nullptr) { + if (null_bitmap) common::mem_free(null_bitmap); + common::mem_free(array_data->buffers); + common::mem_free(array_data); + 
return common::E_OOM; + } + + char* vec_data = vec->get_value_data().get_data(); + for (uint32_t i = 0; i < row_count; ++i) { + if (has_null && vec_bitmap.test(i)) { + data_buffer[i] = 0; + } else { + int32_t yyyymmdd = 0; + std::memcpy(&yyyymmdd, vec_data + i * sizeof(int32_t), + sizeof(int32_t)); + data_buffer[i] = YYYYMMDDToDaysSinceEpoch(yyyymmdd); + } + } + + array_data->buffers[1] = data_buffer; + out_array->length = row_count; + out_array->offset = 0; + out_array->n_buffers = n_buffers; + out_array->n_children = 0; + out_array->buffers = const_cast(array_data->buffers); + out_array->children = nullptr; + out_array->dictionary = nullptr; + out_array->release = ReleaseArrowArray; + out_array->private_data = array_data; + return common::E_OK; +} + +// Helper function to build ArrowArray for a single column +static int BuildColumnArrowArray(common::Vector* vec, uint32_t row_count, + ArrowArray* out_array) { + if (vec == nullptr || out_array == nullptr || row_count == 0) { + return common::E_INVALID_ARG; + } + + common::TSDataType data_type = vec->get_vector_type(); + const char* format = GetArrowFormatString(data_type); + if (format == nullptr) { + return common::E_TYPE_NOT_SUPPORTED; + } + + int ret = common::E_OK; + switch (data_type) { + case common::BOOLEAN: + ret = BuildFixedLengthArrowArrayC(vec, row_count, out_array); + break; + case common::INT32: + ret = + BuildFixedLengthArrowArrayC(vec, row_count, out_array); + break; + case common::DATE: + ret = BuildDateArrowArrayC(vec, row_count, out_array); + break; + case common::INT64: + case common::TIMESTAMP: + ret = + BuildFixedLengthArrowArrayC(vec, row_count, out_array); + break; + case common::FLOAT: + ret = BuildFixedLengthArrowArrayC(vec, row_count, out_array); + break; + case common::DOUBLE: + ret = + BuildFixedLengthArrowArrayC(vec, row_count, out_array); + break; + case common::TEXT: + case common::STRING: + ret = BuildStringArrowArrayC(vec, row_count, out_array); + break; + default: + return 
common::E_TYPE_NOT_SUPPORTED; + } + return ret; +} + +// Build ArrowSchema for a single column +static int BuildColumnArrowSchema(common::TSDataType data_type, + const std::string& column_name, + ArrowSchema* out_schema) { + if (out_schema == nullptr) { + return common::E_INVALID_ARG; + } + + const char* format = GetArrowFormatString(data_type); + if (format == nullptr) { + return common::E_TYPE_NOT_SUPPORTED; + } + + ArrowSchemaData* schema_data = new ArrowSchemaData(); + schema_data->format_strings = new std::vector(); + schema_data->name_strings = new std::vector(); + schema_data->children = nullptr; + schema_data->n_children = 0; + + schema_data->format_strings->push_back(format); + schema_data->name_strings->push_back(column_name); + + out_schema->format = schema_data->format_strings->back().c_str(); + out_schema->name = schema_data->name_strings->back().c_str(); + out_schema->metadata = nullptr; + out_schema->flags = ARROW_FLAG_NULLABLE; + out_schema->n_children = 0; + out_schema->children = nullptr; + out_schema->dictionary = nullptr; + out_schema->release = ReleaseArrowSchema; + out_schema->private_data = schema_data; + + return common::E_OK; +} + +int TsBlockToArrowStruct(common::TsBlock& tsblock, ArrowArray* out_array, + ArrowSchema* out_schema) { + if (out_array == nullptr || out_schema == nullptr) { + return common::E_INVALID_ARG; + } + + uint32_t row_count = tsblock.get_row_count(); + uint32_t column_count = tsblock.get_column_count(); + common::TupleDesc* tuple_desc = tsblock.get_tuple_desc(); + + if (row_count == 0 || column_count == 0) { + return common::E_INVALID_ARG; + } + + // Build ArrowSchema for struct type + ArrowSchemaData* schema_data = new ArrowSchemaData(); + schema_data->format_strings = new std::vector(); + schema_data->name_strings = new std::vector(); + schema_data->n_children = column_count; + schema_data->children = static_cast(common::mem_alloc( + column_count * sizeof(ArrowSchema*), common::MOD_TSBLOCK)); + if 
(schema_data->children == nullptr) { + delete schema_data->format_strings; + delete schema_data->name_strings; + delete schema_data; + return common::E_OOM; + } + + // Store format string for struct type + schema_data->format_strings->push_back("+s"); + schema_data->name_strings->push_back(""); + + // Build schema for each column + for (uint32_t i = 0; i < column_count; ++i) { + schema_data->children[i] = static_cast( + common::mem_alloc(sizeof(ArrowSchema), common::MOD_TSBLOCK)); + if (schema_data->children[i] == nullptr) { + for (uint32_t j = 0; j < i; ++j) { + if (schema_data->children[j] != nullptr && + schema_data->children[j]->release != nullptr) { + schema_data->children[j]->release(schema_data->children[j]); + } + } + common::mem_free(schema_data->children); + delete schema_data->format_strings; + delete schema_data->name_strings; + delete schema_data; + return common::E_OOM; + } + + common::TSDataType col_type = tuple_desc->get_column_type(i); + std::string col_name = tuple_desc->get_column_name(i); + + int ret = BuildColumnArrowSchema(col_type, col_name, + schema_data->children[i]); + if (ret != common::E_OK) { + for (uint32_t j = 0; j <= i; ++j) { + if (schema_data->children[j] != nullptr && + schema_data->children[j]->release != nullptr) { + schema_data->children[j]->release(schema_data->children[j]); + } + } + common::mem_free(schema_data->children); + delete schema_data->format_strings; + delete schema_data->name_strings; + delete schema_data; + return ret; + } + } + + out_schema->format = schema_data->format_strings->at(0).c_str(); + out_schema->name = schema_data->name_strings->at(0).c_str(); + out_schema->metadata = nullptr; + out_schema->flags = 0; + out_schema->n_children = column_count; + out_schema->children = schema_data->children; + out_schema->dictionary = nullptr; + out_schema->release = ReleaseArrowSchema; + out_schema->private_data = schema_data; + + ArrowArray** children_arrays = static_cast(common::mem_alloc( + column_count * 
sizeof(ArrowArray*), common::MOD_TSBLOCK)); + if (children_arrays == nullptr) { + ReleaseArrowSchema(out_schema); + return common::E_OOM; + } + + for (uint32_t i = 0; i < column_count; ++i) { + children_arrays[i] = static_cast( + common::mem_alloc(sizeof(ArrowArray), common::MOD_TSBLOCK)); + if (children_arrays[i] == nullptr) { + for (uint32_t j = 0; j < i; ++j) { + if (children_arrays[j] != nullptr && + children_arrays[j]->release != nullptr) { + children_arrays[j]->release(children_arrays[j]); + } + } + common::mem_free(children_arrays); + ReleaseArrowSchema(out_schema); + return common::E_OOM; + } + + common::Vector* vec = tsblock.get_vector(i); + int ret = BuildColumnArrowArray(vec, row_count, children_arrays[i]); + if (ret != common::E_OK) { + for (uint32_t j = 0; j <= i; ++j) { + if (children_arrays[j] != nullptr && + children_arrays[j]->release != nullptr) { + children_arrays[j]->release(children_arrays[j]); + } + } + common::mem_free(children_arrays); + ReleaseArrowSchema(out_schema); + return ret; + } + } + + StructArrayData* struct_data = new StructArrayData(); + struct_data->children = children_arrays; + struct_data->n_children = column_count; + + // Arrow C Data Interface: struct type requires n_buffers = 1 (validity + // bitmap) buffers[0] may be NULL if there are no nulls at the struct level + static const void* struct_buffers[1] = {nullptr}; + + out_array->length = row_count; + out_array->null_count = 0; // struct itself is never null + out_array->offset = 0; + out_array->n_buffers = 1; + out_array->n_children = column_count; + out_array->buffers = struct_buffers; + out_array->children = children_arrays; + out_array->dictionary = nullptr; + out_array->release = ReleaseStructArrowArray; + out_array->private_data = struct_data; + + return common::E_OK; +} + +// Convert days since Unix epoch back to YYYYMMDD integer format +static int32_t DaysSinceEpochToYYYYMMDD(int32_t days) { + std::tm epoch = {}; + epoch.tm_year = 70; + epoch.tm_mon = 0; + 
epoch.tm_mday = 1; + epoch.tm_hour = 12; + epoch.tm_isdst = -1; + time_t epoch_t = mktime(&epoch); + time_t target_t = epoch_t + static_cast(days) * 24 * 60 * 60; + std::tm* d = localtime(&target_t); + return (d->tm_year + 1900) * 10000 + (d->tm_mon + 1) * 100 + d->tm_mday; +} + +// Check if Arrow row is valid (non-null) based on validity bitmap +static bool ArrowIsValid(const ArrowArray* arr, int64_t row) { + if (arr->null_count == 0 || arr->buffers[0] == nullptr) return true; + int64_t bit_idx = arr->offset + row; + const uint8_t* bitmap = static_cast(arr->buffers[0]); + return (bitmap[bit_idx / 8] >> (bit_idx % 8)) & 1; +} + +// Map Arrow format string to TSDataType +static common::TSDataType ArrowFormatToDataType(const char* format) { + if (strcmp(format, "b") == 0) return common::BOOLEAN; + if (strcmp(format, "i") == 0) return common::INT32; + if (strcmp(format, "l") == 0) return common::INT64; + if (strcmp(format, "tsn:") == 0) return common::TIMESTAMP; + if (strcmp(format, "f") == 0) return common::FLOAT; + if (strcmp(format, "g") == 0) return common::DOUBLE; + if (strcmp(format, "u") == 0) return common::TEXT; + if (strcmp(format, "tdD") == 0) return common::DATE; + return common::INVALID_DATATYPE; +} + +// Convert Arrow C Data Interface struct array to storage::Tablet. +// The timestamp column (format "tsn:") is used as tablet timestamps; +// all other columns become tablet data columns. +// reg_schema: optional registered TableSchema; when provided its column types +// are used in the Tablet (so they match the writer's registered schema +// exactly). Arrow format strings are still used to decode the actual buffers. 
+int ArrowStructToTablet(const char* table_name, const ArrowArray* in_array, + const ArrowSchema* in_schema, + const storage::TableSchema* reg_schema, + storage::Tablet** out_tablet) { + if (!in_array || !in_schema || !out_tablet) return common::E_INVALID_ARG; + if (strcmp(in_schema->format, "+s") != 0) return common::E_INVALID_ARG; + + int64_t n_rows = in_array->length; + int64_t n_cols = in_schema->n_children; + if (n_rows <= 0 || n_cols == 0) return common::E_INVALID_ARG; + + int time_col_idx = -1; + std::vector col_names; + // col_types: types for Tablet schema (from reg_schema when available) + std::vector col_types; + // read_modes: how to decode Arrow buffers (from Arrow format string) + std::vector read_modes; + std::vector data_col_indices; + + // Cache reg_schema data types once to avoid repeated calls + std::vector reg_data_types; + if (reg_schema) { + reg_data_types = reg_schema->get_data_types(); + } + + for (int64_t i = 0; i < n_cols; i++) { + const ArrowSchema* child = in_schema->children[i]; + common::TSDataType read_mode = ArrowFormatToDataType(child->format); + if (read_mode == common::INVALID_DATATYPE) + return common::E_TYPE_NOT_SUPPORTED; + if (read_mode == common::TIMESTAMP) { + time_col_idx = static_cast(i); + } else { + std::string col_name = child->name ? child->name : ""; + common::TSDataType col_type = read_mode; + if (reg_schema) { + int reg_idx = const_cast(reg_schema) + ->find_column_index(col_name); + if (reg_idx >= 0 && + reg_idx < static_cast(reg_data_types.size())) { + col_type = reg_data_types[reg_idx]; + } + } + col_names.emplace_back(std::move(col_name)); + col_types.push_back(col_type); + read_modes.push_back(read_mode); + data_col_indices.push_back(static_cast(i)); + } + } + + if (col_names.empty()) return common::E_INVALID_ARG; + + std::string tname = table_name ? 
table_name : ""; + auto* tablet = new storage::Tablet(tname, &col_names, &col_types, + static_cast(n_rows)); + if (tablet->err_code_ != common::E_OK) { + int err = tablet->err_code_; + delete tablet; + return err; + } + + // Fill timestamps from the time column + if (time_col_idx >= 0) { + const ArrowArray* ts_arr = in_array->children[time_col_idx]; + const int64_t* ts_buf = static_cast(ts_arr->buffers[1]); + int64_t off = ts_arr->offset; + for (int64_t r = 0; r < n_rows; r++) { + if (ArrowIsValid(ts_arr, r)) + tablet->add_timestamp(static_cast(r), + ts_buf[off + r]); + } + } + + // Fill data columns from Arrow children (use read_modes to decode buffers) + for (size_t ci = 0; ci < data_col_indices.size(); ci++) { + const ArrowArray* col_arr = in_array->children[data_col_indices[ci]]; + common::TSDataType dtype = read_modes[ci]; + uint32_t tcol = static_cast(ci); + int64_t off = col_arr->offset; + + switch (dtype) { + case common::BOOLEAN: { + // Arrow boolean: bit-packed in buffers[1] + const uint8_t* vals = + static_cast(col_arr->buffers[1]); + for (int64_t r = 0; r < n_rows; r++) { + if (!ArrowIsValid(col_arr, r)) continue; + int64_t bit = off + r; + bool v = (vals[bit / 8] >> (bit % 8)) & 1; + tablet->add_value(static_cast(r), tcol, v); + } + break; + } + case common::INT32: { + const int32_t* vals = + static_cast(col_arr->buffers[1]); + for (int64_t r = 0; r < n_rows; r++) { + if (ArrowIsValid(col_arr, r)) + tablet->add_value(static_cast(r), + tcol, vals[off + r]); + } + break; + } + case common::INT64: { + const int64_t* vals = + static_cast(col_arr->buffers[1]); + for (int64_t r = 0; r < n_rows; r++) { + if (ArrowIsValid(col_arr, r)) + tablet->add_value(static_cast(r), + tcol, vals[off + r]); + } + break; + } + case common::FLOAT: { + const float* vals = + static_cast(col_arr->buffers[1]); + for (int64_t r = 0; r < n_rows; r++) { + if (ArrowIsValid(col_arr, r)) + tablet->add_value(static_cast(r), tcol, + vals[off + r]); + } + break; + } + case common::DOUBLE: 
{ + const double* vals = + static_cast(col_arr->buffers[1]); + for (int64_t r = 0; r < n_rows; r++) { + if (ArrowIsValid(col_arr, r)) + tablet->add_value(static_cast(r), + tcol, vals[off + r]); + } + break; + } + case common::DATE: { + // Arrow stores date as int32 days-since-epoch; convert to + // YYYYMMDD + const int32_t* vals = + static_cast(col_arr->buffers[1]); + for (int64_t r = 0; r < n_rows; r++) { + if (!ArrowIsValid(col_arr, r)) continue; + int32_t yyyymmdd = DaysSinceEpochToYYYYMMDD(vals[off + r]); + tablet->add_value(static_cast(r), tcol, + yyyymmdd); + } + break; + } + case common::TEXT: + case common::STRING: { + // Arrow UTF-8 string: buffers[1]=int32 offsets, buffers[2]=char + // data + const int32_t* offsets = + static_cast(col_arr->buffers[1]); + const char* data = + static_cast(col_arr->buffers[2]); + for (int64_t r = 0; r < n_rows; r++) { + if (!ArrowIsValid(col_arr, r)) continue; + int32_t start = offsets[off + r]; + int32_t len = offsets[off + r + 1] - start; + tablet->add_value(static_cast(r), tcol, + common::String(data + start, len)); + } + break; + } + default: + delete tablet; + return common::E_TYPE_NOT_SUPPORTED; + } + } + + *out_tablet = tablet; + return common::E_OK; +} + +} // namespace arrow diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc index fbcf4e6f1..779156945 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.cc +++ b/cpp/src/cwrapper/tsfile_cwrapper.cc @@ -24,13 +24,25 @@ #include #include +#include #include #include "common/tablet.h" #include "reader/result_set.h" +#include "reader/table_result_set.h" #include "reader/tsfile_reader.h" #include "writer/tsfile_writer.h" +// Forward declarations for arrow namespace functions (defined in arrow_c.cc) +namespace arrow { +int TsBlockToArrowStruct(common::TsBlock& tsblock, ArrowArray* out_array, + ArrowSchema* out_schema); +int ArrowStructToTablet(const char* table_name, const ArrowArray* in_array, + const ArrowSchema* in_schema, + const 
storage::TableSchema* reg_schema, + storage::Tablet** out_tablet); +} // namespace arrow + #ifdef __cplusplus extern "C" { #endif @@ -361,6 +373,21 @@ ResultSet tsfile_query_table_on_tree(TsFileReader reader, char** columns, return table_result_set; } +ResultSet tsfile_query_table_batch(TsFileReader reader, const char* table_name, + char** columns, uint32_t column_num, + Timestamp start_time, Timestamp end_time, + int batch_size, ERRNO* err_code) { + auto* r = static_cast(reader); + storage::ResultSet* table_result_set = nullptr; + std::vector column_names; + for (uint32_t i = 0; i < column_num; i++) { + column_names.emplace_back(columns[i]); + } + *err_code = r->query(table_name, column_names, start_time, end_time, + table_result_set, batch_size); + return table_result_set; +} + bool tsfile_result_set_next(ResultSet result_set, ERRNO* err_code) { auto* r = static_cast(result_set); bool has_next = true; @@ -373,6 +400,34 @@ bool tsfile_result_set_next(ResultSet result_set, ERRNO* err_code) { return has_next; } +ERRNO tsfile_result_set_get_next_tsblock_as_arrow(ResultSet result_set, + ArrowArray* out_array, + ArrowSchema* out_schema) { + if (result_set == nullptr || out_array == nullptr || + out_schema == nullptr) { + return common::E_INVALID_ARG; + } + + auto* r = static_cast(result_set); + auto* table_result_set = dynamic_cast(r); + if (table_result_set == nullptr) { + return common::E_INVALID_ARG; + } + + common::TsBlock* tsblock = nullptr; + int ret = table_result_set->get_next_tsblock(tsblock); + if (ret != common::E_OK) { + return ret; + } + + if (tsblock == nullptr) { + return common::E_NO_MORE_DATA; + } + + ret = arrow::TsBlockToArrowStruct(*tsblock, out_array, out_schema); + return ret; +} + #define TSFILE_RESULT_SET_GET_VALUE_BY_NAME_DEF(type) \ type tsfile_result_set_get_value_by_name_##type(ResultSet result_set, \ const char* column_name) { \ @@ -744,6 +799,21 @@ ERRNO _tsfile_writer_write_table(TsFileWriter writer, Tablet tablet) { return 
w->write_table(*tbl); } +ERRNO _tsfile_writer_write_arrow_table(TsFileWriter writer, + const char* table_name, + ArrowArray* array, ArrowSchema* schema) { + auto* w = static_cast(writer); + std::shared_ptr reg_schema = + w->get_table_schema(table_name ? std::string(table_name) : ""); + storage::Tablet* tablet = nullptr; + int ret = arrow::ArrowStructToTablet(table_name, array, schema, + reg_schema.get(), &tablet); + if (ret != common::E_OK) return ret; + ret = w->write_table(*tablet); + delete tablet; + return ret; +} + ERRNO _tsfile_writer_write_ts_record(TsFileWriter writer, TsRecord data) { auto* w = static_cast(writer); const storage::TsRecord* record = static_cast(data); diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h b/cpp/src/cwrapper/tsfile_cwrapper.h index 643b4e52b..8d7b79d52 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.h +++ b/cpp/src/cwrapper/tsfile_cwrapper.h @@ -20,6 +20,7 @@ #ifndef SRC_CWRAPPER_TSFILE_CWRAPPER_H_ #define SRC_CWRAPPER_TSFILE_CWRAPPER_H_ #ifdef __cplusplus + extern "C" { #endif @@ -124,6 +125,39 @@ typedef void* TsRecord; typedef void* ResultSet; +typedef struct arrow_schema { + // Array type description + const char* format; + const char* name; + const char* metadata; + int64_t flags; + int64_t n_children; + struct arrow_schema** children; + struct arrow_schema* dictionary; + + // Release callback + void (*release)(struct arrow_schema*); + // Opaque producer-specific data + void* private_data; +} ArrowSchema; + +typedef struct arrow_array { + // Array data description + int64_t length; + int64_t null_count; + int64_t offset; + int64_t n_buffers; + int64_t n_children; + const void** buffers; + struct arrow_array** children; + struct arrow_array* dictionary; + + // Release callback + void (*release)(struct arrow_array*); + // Opaque producer-specific data + void* private_data; +} ArrowArray; + typedef int32_t ERRNO; typedef int64_t Timestamp; @@ -444,11 +478,15 @@ ResultSet tsfile_query_table(TsFileReader reader, const char* 
table_name, ResultSet tsfile_query_table_on_tree(TsFileReader reader, char** columns, uint32_t column_num, Timestamp start_time, Timestamp end_time, ERRNO* err_code); +ResultSet tsfile_query_table_batch(TsFileReader reader, const char* table_name, + char** columns, uint32_t column_num, + Timestamp start_time, Timestamp end_time, + int batch_size, ERRNO* err_code); // ResultSet tsfile_reader_query_device(TsFileReader reader, // const char* device_name, -// char** sensor_name, uint32_t sensor_num, -// Timestamp start_time, Timestamp -// end_time); +// char** sensor_name, uint32_t +// sensor_num, Timestamp start_time, +// Timestamp end_time); /** * @brief Check and fetch the next row in the ResultSet. @@ -458,6 +496,27 @@ ResultSet tsfile_query_table_on_tree(TsFileReader reader, char** columns, */ bool tsfile_result_set_next(ResultSet result_set, ERRNO* error_code); +/** + * @brief Gets the next TsBlock from a batch ResultSet and converts it to Arrow + * format. + * + * @param result_set [in] Valid ResultSet handle from batch query + * (tsfile_query_table_batch). + * @param out_array [out] Pointer to a caller-provided ArrowArray struct that + * is filled with the converted Arrow array. + * @param out_schema [out] Pointer to a caller-provided ArrowSchema struct that + * is filled with the converted Arrow schema. + * @return ERRNO - E_OK(0) on success, E_NO_MORE_DATA if no more blocks, + * E_INVALID_ARG on null/non-table arguments, or other error codes. + * @note Caller should release ArrowArray and ArrowSchema by calling their + * release callbacks when done. + * @note This function should only be called on ResultSet obtained from + * tsfile_query_table_batch with batch_size > 0. + */ +ERRNO tsfile_result_set_get_next_tsblock_as_arrow(ResultSet result_set, + ArrowArray* out_array, + ArrowSchema* out_schema); + /** * @brief Gets value from current row by column name (generic types). * @@ -659,6 +718,11 @@ ERRNO _tsfile_writer_write_tablet(TsFileWriter writer, Tablet tablet); // Write a tablet into a table. 
ERRNO _tsfile_writer_write_table(TsFileWriter writer, Tablet tablet); +// Write Arrow C Data Interface batch into a table (Arrow -> Tablet -> write). +ERRNO _tsfile_writer_write_arrow_table(TsFileWriter writer, + const char* table_name, + ArrowArray* array, ArrowSchema* schema); + // Write a row record into a device. ERRNO _tsfile_writer_write_ts_record(TsFileWriter writer, TsRecord record); diff --git a/cpp/src/reader/qds_with_timegenerator.h b/cpp/src/reader/qds_with_timegenerator.h index 52892df14..c0651f0b1 100644 --- a/cpp/src/reader/qds_with_timegenerator.h +++ b/cpp/src/reader/qds_with_timegenerator.h @@ -123,6 +123,7 @@ class QDSWithTimeGenerator : public ResultSet { bool is_null(uint32_t column_index); RowRecord* get_row_record(); std::shared_ptr get_metadata(); + int get_next_tsblock(common::TsBlock*& block) { return common::E_OK; }; private: int construct_node_tree(Expression* expr, Node*& node); diff --git a/cpp/src/reader/qds_without_timegenerator.h b/cpp/src/reader/qds_without_timegenerator.h index 0619fa673..f949e04b5 100644 --- a/cpp/src/reader/qds_without_timegenerator.h +++ b/cpp/src/reader/qds_without_timegenerator.h @@ -48,6 +48,7 @@ class QDSWithoutTimeGenerator : public ResultSet { bool is_null(uint32_t column_index); RowRecord* get_row_record(); std::shared_ptr get_metadata(); + int get_next_tsblock(common::TsBlock*& block) { return common::E_OK; }; private: int get_next_tsblock(uint32_t index, bool alloc_mem); diff --git a/cpp/src/reader/result_set.h b/cpp/src/reader/result_set.h index 87303cef4..26f3a3fa9 100644 --- a/cpp/src/reader/result_set.h +++ b/cpp/src/reader/result_set.h @@ -25,6 +25,7 @@ #include #include "common/row_record.h" +#include "common/tsblock/tsblock.h" namespace storage { /** @@ -155,6 +156,9 @@ class ResultSet : std::enable_shared_from_this { ASSERT(column_index >= 0 && column_index < row_record->get_col_num()); return row_record->get_field(column_index)->get_value(); } + + virtual int get_next_tsblock(common::TsBlock*& 
block) = 0; + /** * @brief Get the row record of the result set * diff --git a/cpp/src/reader/table_query_executor.cc b/cpp/src/reader/table_query_executor.cc index 2a01a6d5c..8f71d7625 100644 --- a/cpp/src/reader/table_query_executor.cc +++ b/cpp/src/reader/table_query_executor.cc @@ -83,8 +83,9 @@ int TableQueryExecutor::query(const std::string& table_name, ret = common::E_UNSUPPORTED_ORDER; } assert(tsblock_reader != nullptr); - ret_qds = new TableResultSet(std::move(tsblock_reader), - lower_case_column_names, data_types); + ret_qds = + new TableResultSet(std::move(tsblock_reader), lower_case_column_names, + data_types, batch_mode_); return ret; } diff --git a/cpp/src/reader/table_query_executor.h b/cpp/src/reader/table_query_executor.h index 974e6b45b..76427d7e8 100644 --- a/cpp/src/reader/table_query_executor.h +++ b/cpp/src/reader/table_query_executor.h @@ -44,13 +44,20 @@ class TableQueryExecutor { : meta_data_querier_(meta_data_querier), tsfile_io_reader_(tsfile_io_reader), table_query_ordering_(table_query_ordering), - block_size_(block_size) {} - TableQueryExecutor(ReadFile* read_file) { + block_size_(block_size), + batch_mode_(false) {} + TableQueryExecutor(ReadFile* read_file, const int batch_size = 0) { tsfile_io_reader_ = new TsFileIOReader(); tsfile_io_reader_->init(read_file); meta_data_querier_ = new MetadataQuerier(tsfile_io_reader_); table_query_ordering_ = TableQueryOrdering::DEVICE; - block_size_ = 1024; + if (batch_size == 0) { + block_size_ = 1024; + batch_mode_ = false; + } else { + block_size_ = batch_size; + batch_mode_ = true; + } } ~TableQueryExecutor() { if (meta_data_querier_ != nullptr) { @@ -76,6 +83,7 @@ class TableQueryExecutor { TsFileIOReader* tsfile_io_reader_; TableQueryOrdering table_query_ordering_; int32_t block_size_; + bool batch_mode_; }; } // namespace storage diff --git a/cpp/src/reader/table_result_set.cc b/cpp/src/reader/table_result_set.cc index aeeefb463..9a4281f12 100644 --- a/cpp/src/reader/table_result_set.cc +++ 
b/cpp/src/reader/table_result_set.cc @@ -37,7 +37,12 @@ void TableResultSet::init() { TableResultSet::~TableResultSet() { close(); } int TableResultSet::next(bool& has_next) { + if (batch_mode_) { + return tsblock_reader_->has_next(has_next); + } + int ret = common::E_OK; + while (row_iterator_ == nullptr || !row_iterator_->has_next()) { if (RET_FAIL(tsblock_reader_->has_next(has_next))) { return ret; @@ -103,6 +108,35 @@ std::shared_ptr TableResultSet::get_metadata() { return result_set_metadata_; } +int TableResultSet::get_next_tsblock(common::TsBlock*& block) { + int ret = common::E_OK; + block = nullptr; + + if (!batch_mode_) { + return common::E_INVALID_ARG; + } + + bool has_next = false; + if (RET_FAIL(tsblock_reader_->has_next(has_next))) { + return common::E_NO_MORE_DATA; + } + + if (!has_next) { + return common::E_NO_MORE_DATA; + } + + if (RET_FAIL(tsblock_reader_->next(tsblock_))) { + return common::E_NO_MORE_DATA; + } + + if (tsblock_ == nullptr) { + return common::E_NO_MORE_DATA; + } + + block = tsblock_; + return common::E_OK; +} + void TableResultSet::close() { tsblock_reader_->close(); pa_.destroy(); diff --git a/cpp/src/reader/table_result_set.h b/cpp/src/reader/table_result_set.h index 4192f7c2f..251c8029c 100644 --- a/cpp/src/reader/table_result_set.h +++ b/cpp/src/reader/table_result_set.h @@ -28,19 +28,22 @@ class TableResultSet : public ResultSet { public: explicit TableResultSet(std::unique_ptr tsblock_reader, std::vector column_names, - std::vector data_types) + std::vector data_types, + bool batch_mode = false) : tsblock_reader_(std::move(tsblock_reader)), column_names_(column_names), - data_types_(data_types) { + data_types_(data_types), + batch_mode_(batch_mode) { init(); } - ~TableResultSet(); + ~TableResultSet() override; int next(bool& has_next) override; bool is_null(const std::string& column_name) override; bool is_null(uint32_t column_index) override; RowRecord* get_row_record() override; std::shared_ptr get_metadata() override; void 
close() override; + int get_next_tsblock(common::TsBlock*& block) override; private: void init(); @@ -51,6 +54,7 @@ class TableResultSet : public ResultSet { std::vector> tsblock_readers_; std::vector column_names_; std::vector data_types_; + const bool batch_mode_; }; } // namespace storage #endif // TABLE_RESULT_SET_H \ No newline at end of file diff --git a/cpp/src/reader/tsfile_reader.cc b/cpp/src/reader/tsfile_reader.cc index f97570885..436f262e8 100644 --- a/cpp/src/reader/tsfile_reader.cc +++ b/cpp/src/reader/tsfile_reader.cc @@ -42,7 +42,6 @@ int TsFileReader::open(const std::string& file_path) { } else if (RET_FAIL(tsfile_executor_->init(read_file_))) { std::cout << "filed to init " << ret << std::endl; } - table_query_executor_ = new storage::TableQueryExecutor(read_file_); return ret; } @@ -87,15 +86,16 @@ int TsFileReader::query(std::vector& path_list, int64_t start_time, int TsFileReader::query(const std::string& table_name, const std::vector& columns_names, int64_t start_time, int64_t end_time, - ResultSet*& result_set) { + ResultSet*& result_set, int batch_size) { return this->query(table_name, columns_names, start_time, end_time, - result_set, nullptr); + result_set, nullptr, batch_size); } int TsFileReader::query(const std::string& table_name, const std::vector& columns_names, int64_t start_time, int64_t end_time, - ResultSet*& result_set, Filter* tag_filter) { + ResultSet*& result_set, Filter* tag_filter, + int batch_size) { int ret = E_OK; TsFileMeta* tsfile_meta = tsfile_executor_->get_tsfile_meta(); if (tsfile_meta == nullptr) { @@ -108,6 +108,9 @@ int TsFileReader::query(const std::string& table_name, } Filter* time_filter = new TimeBetween(start_time, end_time, false); + if (table_query_executor_ == nullptr) { + table_query_executor_ = new TableQueryExecutor(read_file_, batch_size); + } ret = table_query_executor_->query(to_lower(table_name), columns_names, time_filter, tag_filter, nullptr, result_set); @@ -185,6 +188,9 @@ int 
TsFileReader::query_table_on_tree( columns_names[i] = "col_" + std::to_string(i); } Filter* time_filter = new TimeBetween(star_time, end_time, false); + if (table_query_executor_ == nullptr) { + table_query_executor_ = new TableQueryExecutor(read_file_); + } ret = table_query_executor_->query_on_tree( satisfied_device_ids, columns_names, measurement_names_to_query, time_filter, result_set); diff --git a/cpp/src/reader/tsfile_reader.h b/cpp/src/reader/tsfile_reader.h index 8a6ba2264..526a96351 100644 --- a/cpp/src/reader/tsfile_reader.h +++ b/cpp/src/reader/tsfile_reader.h @@ -95,7 +95,7 @@ class TsFileReader { */ int query(const std::string& table_name, const std::vector& columns_names, int64_t start_time, - int64_t end_time, ResultSet*& result_set); + int64_t end_time, ResultSet*& result_set, int batch_size = 0); /** * @brief query the tsfile by the table name, columns names, start time @@ -111,7 +111,8 @@ class TsFileReader { */ int query(const std::string& table_name, const std::vector& columns_names, int64_t start_time, - int64_t end_time, ResultSet*& result_set, Filter* tag_filter); + int64_t end_time, ResultSet*& result_set, Filter* tag_filter, + int batch_size = 0); int query_table_on_tree(const std::vector& measurement_names, int64_t star_time, int64_t end_time, diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc index 2c2e46b97..1693a6647 100644 --- a/cpp/src/writer/tsfile_writer.cc +++ b/cpp/src/writer/tsfile_writer.cc @@ -336,6 +336,14 @@ int TsFileWriter::do_check_and_prepare_tablet(Tablet& tablet) { return common::E_OK; } +std::shared_ptr TsFileWriter::get_table_schema( + const std::string& table_name) const { + auto& schema_map = io_writer_->get_schema()->table_schema_map_; + auto it = schema_map.find(table_name); + if (it == schema_map.end()) return nullptr; + return it->second; +} + template int TsFileWriter::do_check_schema( std::shared_ptr device_id, diff --git a/cpp/src/writer/tsfile_writer.h 
b/cpp/src/writer/tsfile_writer.h index e80a1232b..106a41dce 100644 --- a/cpp/src/writer/tsfile_writer.h +++ b/cpp/src/writer/tsfile_writer.h @@ -90,6 +90,8 @@ class TsFileWriter { TableSchemasMapIter; DeviceSchemasMap* get_schema_group_map() { return &schemas_; } + std::shared_ptr get_table_schema( + const std::string& table_name) const; int64_t calculate_mem_size_for_all_group(); int check_memory_size_and_may_flush_chunks(); /* diff --git a/cpp/test/common/tsblock/arrow_tsblock_test.cc b/cpp/test/common/tsblock/arrow_tsblock_test.cc new file mode 100644 index 000000000..123efb59f --- /dev/null +++ b/cpp/test/common/tsblock/arrow_tsblock_test.cc @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +#include + +#include + +#include "common/tsblock/tsblock.h" +#include "cwrapper/tsfile_cwrapper.h" +#include "utils/db_utils.h" + +// Forward declarations for arrow namespace (functions are defined in +// arrow_c.cc) +namespace arrow { +// Type aliases for Arrow types (defined in tsfile_cwrapper.h) +using ArrowArray = ::ArrowArray; +using ArrowSchema = ::ArrowSchema; +#define ARROW_FLAG_DICTIONARY_ORDERED 1 +#define ARROW_FLAG_NULLABLE 2 +#define ARROW_FLAG_MAP_KEYS_SORTED 4 + +// Function declaration (defined in arrow_c.cc) +int TsBlockToArrowStruct(common::TsBlock& tsblock, ArrowArray* out_array, + ArrowSchema* out_schema); +} // namespace arrow + +static void VerifyArrowSchema( + const ::arrow::ArrowSchema* schema, + const std::vector& expected_names, + const std::vector& expected_formats) { + ASSERT_NE(schema, nullptr); + EXPECT_STREQ(schema->format, "+s"); + EXPECT_EQ(schema->n_children, expected_names.size()); + ASSERT_NE(schema->children, nullptr); + + for (size_t i = 0; i < expected_names.size(); ++i) { + const arrow::ArrowSchema* child = schema->children[i]; + ASSERT_NE(child, nullptr); + EXPECT_STREQ(child->name, expected_names[i].c_str()); + EXPECT_STREQ(child->format, expected_formats[i]); + EXPECT_EQ(child->flags, ARROW_FLAG_NULLABLE); + } +} + +static void VerifyArrowArrayData(const arrow::ArrowArray* array, + uint32_t expected_length) { + ASSERT_NE(array, nullptr); + EXPECT_EQ(array->length, expected_length); + EXPECT_EQ(array->n_children, 3); + ASSERT_NE(array->children, nullptr); +} + +TEST(ArrowTsBlockTest, NormalTsBlock_NoNulls) { + common::TupleDesc tuple_desc; + common::ColumnSchema col1("int_col", common::INT32, common::SNAPPY, + common::RLE); + common::ColumnSchema col2("double_col", common::DOUBLE, common::SNAPPY, + common::RLE); + common::ColumnSchema col3("string_col", common::STRING, common::SNAPPY, + common::RLE); + tuple_desc.push_back(col1); + tuple_desc.push_back(col2); + tuple_desc.push_back(col3); + + common::TsBlock 
tsblock(&tuple_desc, 10); + ASSERT_EQ(tsblock.init(), common::E_OK); + + common::RowAppender row_appender(&tsblock); + + for (int i = 0; i < 5; ++i) { + ASSERT_TRUE(row_appender.add_row()); + + int32_t int_val = 100 + i; + row_appender.append(0, reinterpret_cast(&int_val), + sizeof(int32_t)); + double double_val = 3.14 + i; + row_appender.append(1, reinterpret_cast(&double_val), + sizeof(double)); + std::string str_val = "test" + std::to_string(i); + row_appender.append(2, str_val.c_str(), str_val.length()); + } + + EXPECT_EQ(tsblock.get_row_count(), 5); + + arrow::ArrowArray array; + arrow::ArrowSchema schema; + int ret = arrow::TsBlockToArrowStruct(tsblock, &array, &schema); + ASSERT_EQ(ret, common::E_OK); + + std::vector expected_names = {"int_col", "double_col", + "string_col"}; + std::vector expected_formats = {"i", "g", "u"}; + VerifyArrowSchema(&schema, expected_names, expected_formats); + + VerifyArrowArrayData(&array, 5); + + ASSERT_NE(array.children, nullptr); + ASSERT_NE(array.children[0], nullptr); + ASSERT_NE(array.children[1], nullptr); + ASSERT_NE(array.children[2], nullptr); + + const ArrowArray* int_array = array.children[0]; + EXPECT_EQ(int_array->length, 5); + EXPECT_EQ(int_array->null_count, 0); + ASSERT_NE(int_array->buffers, nullptr); + const int32_t* int_data = reinterpret_cast( + int_array->buffers[int_array->n_buffers - 1]); + for (int i = 0; i < 5; ++i) { + EXPECT_EQ(int_data[i], 100 + i); + } + + const arrow::ArrowArray* double_array = array.children[1]; + EXPECT_EQ(double_array->length, 5); + EXPECT_EQ(double_array->null_count, 0); + const double* double_data = reinterpret_cast( + double_array->buffers[double_array->n_buffers - 1]); + for (int i = 0; i < 5; ++i) { + EXPECT_DOUBLE_EQ(double_data[i], 3.14 + i); + } + const arrow::ArrowArray* string_array = array.children[2]; + EXPECT_EQ(string_array->length, 5); + EXPECT_EQ(string_array->null_count, 0); + ASSERT_NE(string_array->buffers, nullptr); + const int32_t* offsets = + 
reinterpret_cast(string_array->buffers[1]); + const char* string_data = + reinterpret_cast(string_array->buffers[2]); + + for (int i = 0; i < 5; ++i) { + int32_t start = offsets[i]; + int32_t end = offsets[i + 1]; + std::string expected_str = "test" + std::to_string(i); + std::string actual_str(string_data + start, end - start); + EXPECT_EQ(actual_str, expected_str); + } + + if (array.release != nullptr) { + array.release(&array); + } + if (schema.release != nullptr) { + schema.release(&schema); + } +} + +TEST(ArrowTsBlockTest, TsBlock_WithNulls) { + common::TupleDesc tuple_desc; + common::ColumnSchema col1("int_col", common::INT32, common::SNAPPY, + common::RLE); + common::ColumnSchema col2("double_col", common::DOUBLE, common::SNAPPY, + common::RLE); + common::ColumnSchema col3("string_col", common::STRING, common::SNAPPY, + common::RLE); + tuple_desc.push_back(col1); + tuple_desc.push_back(col2); + tuple_desc.push_back(col3); + + common::TsBlock tsblock(&tuple_desc, 10); + ASSERT_EQ(tsblock.init(), common::E_OK); + + common::RowAppender row_appender(&tsblock); + for (int i = 0; i < 5; ++i) { + ASSERT_TRUE(row_appender.add_row()); + + if (i == 1) { + row_appender.append_null(0); + row_appender.append_null(1); + row_appender.append_null(2); + } else if (i == 3) { + row_appender.append_null(0); + double double_val = 3.14 + i; + row_appender.append(1, reinterpret_cast(&double_val), + sizeof(double)); + std::string str_val = "test" + std::to_string(i); + row_appender.append(2, str_val.c_str(), str_val.length()); + } else { + int32_t int_val = 100 + i; + row_appender.append(0, reinterpret_cast(&int_val), + sizeof(int32_t)); + double double_val = 3.14 + i; + row_appender.append(1, reinterpret_cast(&double_val), + sizeof(double)); + std::string str_val = "test" + std::to_string(i); + row_appender.append(2, str_val.c_str(), str_val.length()); + } + } + + EXPECT_EQ(tsblock.get_row_count(), 5); + + arrow::ArrowArray array; + arrow::ArrowSchema schema; + int ret = 
arrow::TsBlockToArrowStruct(tsblock, &array, &schema); + ASSERT_EQ(ret, common::E_OK); + + std::vector expected_names = {"int_col", "double_col", + "string_col"}; + std::vector expected_formats = {"i", "g", "u"}; + VerifyArrowSchema(&schema, expected_names, expected_formats); + + VerifyArrowArrayData(&array, 5); + + const arrow::ArrowArray* int_array = array.children[0]; + EXPECT_EQ(int_array->null_count, 2); + + const arrow::ArrowArray* double_array = array.children[1]; + EXPECT_EQ(double_array->null_count, 1); + + const arrow::ArrowArray* string_array = array.children[2]; + EXPECT_EQ(string_array->null_count, 1); + + ASSERT_NE(int_array->buffers[0], nullptr); + const uint8_t* null_bitmap = + reinterpret_cast(int_array->buffers[0]); + EXPECT_FALSE(null_bitmap[0] & (1 << 1)); + EXPECT_FALSE(null_bitmap[0] & (1 << 3)); + EXPECT_TRUE(null_bitmap[0] & (1 << 0)); + EXPECT_TRUE(null_bitmap[0] & (1 << 2)); + EXPECT_TRUE(null_bitmap[0] & (1 << 4)); + + const int32_t* int_data = reinterpret_cast( + int_array->buffers[int_array->n_buffers - 1]); + EXPECT_NE(int_data, nullptr); + if (array.release != nullptr) { + array.release(&array); + } + if (schema.release != nullptr) { + schema.release(&schema); + } +} + +TEST(ArrowTsBlockTest, TsBlock_EdgeCases) { + { + common::TupleDesc tuple_desc; + common::ColumnSchema col1("single_col", common::INT64, common::SNAPPY, + common::RLE); + tuple_desc.push_back(col1); + + common::TsBlock tsblock(&tuple_desc, 5); + ASSERT_EQ(tsblock.init(), common::E_OK); + + common::RowAppender row_appender(&tsblock); + for (int i = 0; i < 3; ++i) { + ASSERT_TRUE(row_appender.add_row()); + int64_t val = 1000 + i; + row_appender.append(0, reinterpret_cast(&val), + sizeof(int64_t)); + } + + arrow::ArrowArray array; + arrow::ArrowSchema schema; + int ret = arrow::TsBlockToArrowStruct(tsblock, &array, &schema); + ASSERT_EQ(ret, common::E_OK); + + EXPECT_STREQ(schema.format, "+s"); + EXPECT_EQ(schema.n_children, 1); + EXPECT_STREQ(schema.children[0]->name, 
"single_col"); + EXPECT_STREQ(schema.children[0]->format, "l"); + + EXPECT_EQ(array.length, 3); + EXPECT_EQ(array.n_children, 1); + if (array.release != nullptr) { + array.release(&array); + } + if (schema.release != nullptr) { + schema.release(&schema); + } + } + + { + common::TupleDesc tuple_desc; + common::ColumnSchema col1("int_col", common::INT32, common::SNAPPY, + common::RLE); + common::ColumnSchema col2("double_col", common::DOUBLE, common::SNAPPY, + common::RLE); + tuple_desc.push_back(col1); + tuple_desc.push_back(col2); + + const int row_count = 1000; + common::TsBlock tsblock(&tuple_desc, row_count); + ASSERT_EQ(tsblock.init(), common::E_OK); + + common::RowAppender row_appender(&tsblock); + for (int i = 0; i < row_count; ++i) { + ASSERT_TRUE(row_appender.add_row()); + int32_t int_val = i; + row_appender.append(0, reinterpret_cast(&int_val), + sizeof(int32_t)); + double double_val = i * 0.5; + row_appender.append(1, reinterpret_cast(&double_val), + sizeof(double)); + } + + arrow::ArrowArray array; + arrow::ArrowSchema schema; + int ret = arrow::TsBlockToArrowStruct(tsblock, &array, &schema); + ASSERT_EQ(ret, common::E_OK); + + EXPECT_EQ(array.length, row_count); + EXPECT_EQ(array.n_children, 2); + + const arrow::ArrowArray* int_array = array.children[0]; + const int32_t* int_data = + reinterpret_cast(int_array->buffers[1]); + EXPECT_EQ(int_data[0], 0); + EXPECT_EQ(int_data[row_count - 1], row_count - 1); + + const arrow::ArrowArray* double_array = array.children[1]; + const double* double_data = + reinterpret_cast(double_array->buffers[1]); + EXPECT_DOUBLE_EQ(double_data[0], 0.0); + EXPECT_DOUBLE_EQ(double_data[row_count - 1], (row_count - 1) * 0.5); + + if (array.release != nullptr) { + array.release(&array); + } + if (schema.release != nullptr) { + schema.release(&schema); + } + } +} diff --git a/cpp/test/reader/table_view/tsfile_reader_table_batch_test.cc b/cpp/test/reader/table_view/tsfile_reader_table_batch_test.cc new file mode 100644 index 
000000000..1298157e2 --- /dev/null +++ b/cpp/test/reader/table_view/tsfile_reader_table_batch_test.cc @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include + +#include +#include +#include + +#include "common/record.h" +#include "common/schema.h" +#include "common/tablet.h" +#include "file/tsfile_io_writer.h" +#include "file/write_file.h" +#include "reader/table_result_set.h" +#include "reader/tsfile_reader.h" +#include "writer/chunk_writer.h" +#include "writer/tsfile_table_writer.h" + +using namespace storage; +using namespace common; + +class TsFileTableReaderBatchTest : public ::testing::Test { + protected: + void SetUp() override { + libtsfile_init(); + file_name_ = std::string("tsfile_reader_table_batch_test_") + + generate_random_string(10) + std::string(".tsfile"); + remove(file_name_.c_str()); + int flags = O_WRONLY | O_CREAT | O_TRUNC; +#ifdef _WIN32 + flags |= O_BINARY; +#endif + mode_t mode = 0666; + write_file_.create(file_name_, flags, mode); + } + void TearDown() override { + remove(file_name_.c_str()); + libtsfile_destroy(); + } + std::string file_name_; + WriteFile write_file_; + + public: + static std::string generate_random_string(int length) { + std::random_device rd; + 
std::mt19937 gen(rd()); + std::uniform_int_distribution<> dis(0, 61); + + const std::string chars = + "0123456789" + "abcdefghijklmnopqrstuvwxyz" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + + std::string random_string; + + for (int i = 0; i < length; ++i) { + random_string += chars[dis(gen)]; + } + + return random_string; + } + + static TableSchema* gen_table_schema_no_tag() { + // Generate table schema with only FIELD columns (no TAG columns) + std::vector measurement_schemas; + std::vector column_categories; + int measurement_schema_num = 5; // 5 field columns + for (int i = 0; i < measurement_schema_num; i++) { + measurement_schemas.emplace_back(new MeasurementSchema( + "s" + std::to_string(i), TSDataType::INT64, TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + column_categories.emplace_back(ColumnCategory::FIELD); + } + return new TableSchema("testTableNoTag", measurement_schemas, + column_categories); + } + + static storage::Tablet gen_tablet_no_tag(TableSchema* table_schema, + int num_rows) { + // Generate tablet without tags (only field columns) + storage::Tablet tablet(table_schema->get_table_name(), + table_schema->get_measurement_names(), + table_schema->get_data_types(), + table_schema->get_column_categories(), num_rows); + + for (int i = 0; i < num_rows; i++) { + tablet.add_timestamp(i, i); + auto column_schemas = table_schema->get_measurement_schemas(); + for (const auto& column_schema : column_schemas) { + if (column_schema->data_type_ == TSDataType::INT64) { + tablet.add_value(i, column_schema->measurement_name_, + static_cast(i)); + } + } + } + return tablet; + } + + static TableSchema* gen_table_schema() { + std::vector measurement_schemas; + std::vector column_categories; + int id_schema_num = 2; + int measurement_schema_num = 3; + for (int i = 0; i < id_schema_num; i++) { + measurement_schemas.emplace_back(new MeasurementSchema( + "id" + std::to_string(i), TSDataType::STRING, TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + 
column_categories.emplace_back(ColumnCategory::TAG); + } + for (int i = 0; i < measurement_schema_num; i++) { + measurement_schemas.emplace_back(new MeasurementSchema( + "s" + std::to_string(i), TSDataType::INT64, TSEncoding::PLAIN, + CompressionType::UNCOMPRESSED)); + column_categories.emplace_back(ColumnCategory::FIELD); + } + return new TableSchema("testTable", measurement_schemas, + column_categories); + } + + static storage::Tablet gen_tablet(TableSchema* table_schema, int offset, + int device_num, + int num_timestamp_per_device = 10) { + storage::Tablet tablet(table_schema->get_table_name(), + table_schema->get_measurement_names(), + table_schema->get_data_types(), + table_schema->get_column_categories(), + device_num * num_timestamp_per_device); + + char* literal = new char[std::strlen("device_id") + 1]; + std::strcpy(literal, "device_id"); + String literal_str(literal, std::strlen("device_id")); + for (int i = 0; i < device_num; i++) { + for (int l = 0; l < num_timestamp_per_device; l++) { + int row_index = i * num_timestamp_per_device + l; + tablet.add_timestamp(row_index, row_index); + auto column_schemas = table_schema->get_measurement_schemas(); + for (const auto& column_schema : column_schemas) { + switch (column_schema->data_type_) { + case TSDataType::INT64: + tablet.add_value(row_index, + column_schema->measurement_name_, + static_cast(i)); + break; + case TSDataType::STRING: + tablet.add_value(row_index, + column_schema->measurement_name_, + literal_str); + break; + default: + break; + } + } + } + } + delete[] literal; + return tablet; + } +}; + +TEST_F(TsFileTableReaderBatchTest, BatchQueryWithSmallBatchSize) { + auto table_schema = gen_table_schema(); + auto tsfile_table_writer_ = + std::make_shared(&write_file_, table_schema); + + const int device_num = 2; + const int points_per_device = 50; + auto tablet = gen_tablet(table_schema, 0, device_num, points_per_device); + ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK); + 
ASSERT_EQ(tsfile_table_writer_->flush(), common::E_OK); + ASSERT_EQ(tsfile_table_writer_->close(), common::E_OK); + + storage::TsFileReader reader; + int ret = reader.open(file_name_); + ASSERT_EQ(ret, common::E_OK); + + ResultSet* tmp_result_set = nullptr; + const int batch_size = 20; + ret = reader.query(table_schema->get_table_name(), + table_schema->get_measurement_names(), 0, 1000000000000, + tmp_result_set, batch_size); + ASSERT_EQ(ret, common::E_OK); + ASSERT_NE(tmp_result_set, nullptr); + + auto* table_result_set = dynamic_cast(tmp_result_set); + ASSERT_NE(table_result_set, nullptr); + + int total_rows = 0; + int block_count = 0; + common::TsBlock* block = nullptr; + + char* literal = new char[std::strlen("device_id") + 1]; + std::strcpy(literal, "device_id"); + String expected_string(literal, std::strlen("device_id")); + std::vector int64_sums(3, 0); + while ((ret = table_result_set->get_next_tsblock(block)) == common::E_OK) { + ASSERT_NE(block, nullptr); + block_count++; + uint32_t row_count = block->get_row_count(); + total_rows += row_count; + ASSERT_EQ(row_count, batch_size); + + common::RowIterator row_iterator(block); + while (row_iterator.has_next()) { + uint32_t len; + bool null; + + int int64_col_idx = 0; + for (uint32_t col_idx = 1; + col_idx < row_iterator.get_column_count(); ++col_idx) { + const char* value = row_iterator.read(col_idx, &len, &null); + ASSERT_FALSE(null); + TSDataType data_type = row_iterator.get_data_type(col_idx); + if (data_type == TSDataType::INT64) { + int64_t int_val = *reinterpret_cast(value); + int64_sums[int64_col_idx] += int_val; + int64_col_idx++; + } else if (data_type == TSDataType::STRING) { + String str_value(value, len); + ASSERT_EQ(str_value.compare(expected_string), 0); + } + } + row_iterator.next(); + } + } + EXPECT_EQ(total_rows, device_num * points_per_device); + EXPECT_GT(block_count, 1); + for (size_t i = 0; i < int64_sums.size(); i++) { + EXPECT_EQ(int64_sums[i], 50); + } + + delete[] literal; + + 
reader.destroy_query_data_set(table_result_set); + ASSERT_EQ(reader.close(), common::E_OK); + delete table_schema; +} + +TEST_F(TsFileTableReaderBatchTest, BatchQueryWithLargeBatchSize) { + auto table_schema = gen_table_schema(); + auto tsfile_table_writer_ = + std::make_shared(&write_file_, table_schema); + + const int device_num = 1; + const int points_per_device = 120; + auto tablet = gen_tablet(table_schema, 0, device_num, points_per_device); + ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK); + ASSERT_EQ(tsfile_table_writer_->flush(), common::E_OK); + ASSERT_EQ(tsfile_table_writer_->close(), common::E_OK); + + storage::TsFileReader reader; + int ret = reader.open(file_name_); + ASSERT_EQ(ret, common::E_OK); + + ResultSet* tmp_result_set = nullptr; + const int batch_size = 100; + ret = reader.query(table_schema->get_table_name(), + table_schema->get_measurement_names(), 0, 1000000000000, + tmp_result_set, batch_size); + ASSERT_EQ(ret, common::E_OK); + ASSERT_NE(tmp_result_set, nullptr); + + auto* table_result_set = dynamic_cast(tmp_result_set); + ASSERT_NE(table_result_set, nullptr); + + int total_rows = 0; + int block_count = 0; + common::TsBlock* block = nullptr; + + while (table_result_set->get_next_tsblock(block) == common::E_OK) { + ASSERT_NE(block, nullptr); + block_count++; + uint32_t row_count = block->get_row_count(); + total_rows += row_count; + + ASSERT_EQ(row_count, block_count == 1 ? 
batch_size : 20); + } + + EXPECT_EQ(total_rows, device_num * points_per_device); + EXPECT_GE(block_count, 2); + + reader.destroy_query_data_set(table_result_set); + ASSERT_EQ(reader.close(), common::E_OK); + delete table_schema; +} + +TEST_F(TsFileTableReaderBatchTest, BatchQueryVerifyDataCorrectness) { + auto table_schema = gen_table_schema(); + auto tsfile_table_writer_ = + std::make_shared(&write_file_, table_schema); + + const int device_num = 1; + const int points_per_device = 30; + auto tablet = gen_tablet(table_schema, 0, device_num, points_per_device); + ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK); + ASSERT_EQ(tsfile_table_writer_->flush(), common::E_OK); + ASSERT_EQ(tsfile_table_writer_->close(), common::E_OK); + + storage::TsFileReader reader; + int ret = reader.open(file_name_); + ASSERT_EQ(ret, common::E_OK); + + ResultSet* tmp_result_set = nullptr; + const int batch_size = 10; + ret = reader.query(table_schema->get_table_name(), + table_schema->get_measurement_names(), 0, 1000000000000, + tmp_result_set, batch_size); + ASSERT_EQ(ret, common::E_OK); + ASSERT_NE(tmp_result_set, nullptr); + + auto* table_result_set = dynamic_cast(tmp_result_set); + ASSERT_NE(table_result_set, nullptr); + + int expected_timestamp = 0; + common::TsBlock* block = nullptr; + + while (table_result_set->get_next_tsblock(block) == common::E_OK) { + ASSERT_NE(block, nullptr); + + common::RowIterator row_iterator(block); + while (row_iterator.has_next()) { + uint32_t len; + bool null; + int64_t timestamp = *reinterpret_cast( + row_iterator.read(0, &len, &null)); + ASSERT_FALSE(null); + EXPECT_EQ(timestamp, expected_timestamp); + + for (uint32_t col_idx = 2; + col_idx < row_iterator.get_column_count(); ++col_idx) { + const char* value = row_iterator.read(col_idx, &len, &null); + if (!null && row_iterator.get_data_type(col_idx) == INT64) { + int64_t int_val = *reinterpret_cast(value); + EXPECT_EQ(int_val, 0); + } + } + row_iterator.next(); + 
expected_timestamp++; + } + } + + EXPECT_EQ(expected_timestamp, device_num * points_per_device); + + reader.destroy_query_data_set(table_result_set); + ASSERT_EQ(reader.close(), common::E_OK); + delete table_schema; +} + +TEST_F(TsFileTableReaderBatchTest, PerformanceComparisonSinglePointVsBatch) { + // Create table schema without tags (only fields) + auto table_schema = gen_table_schema_no_tag(); + auto tsfile_table_writer_ = + std::make_shared(&write_file_, table_schema); + + // Write a large amount of data + const int total_rows = 1000000; + auto tablet = gen_tablet_no_tag(table_schema, total_rows); + ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK); + ASSERT_EQ(tsfile_table_writer_->flush(), common::E_OK); + ASSERT_EQ(tsfile_table_writer_->close(), common::E_OK); + + // Test 1: Single point query (using next() method) + { + storage::TsFileReader reader; + int ret = reader.open(file_name_); + ASSERT_EQ(ret, common::E_OK); + + ResultSet* tmp_result_set = nullptr; + // Single point query: don't specify batch_size (or use 0) + auto start_time = std::chrono::high_resolution_clock::now(); + + ret = reader.query(table_schema->get_table_name(), + table_schema->get_measurement_names(), 0, + 1000000000000, tmp_result_set); + ASSERT_EQ(ret, common::E_OK); + ASSERT_NE(tmp_result_set, nullptr); + + auto* table_result_set = dynamic_cast(tmp_result_set); + ASSERT_NE(table_result_set, nullptr); + + int total_rows_read = 0; + bool has_next = false; + + // Use next() method for single point query + while (IS_SUCC(table_result_set->next(has_next)) && has_next) { + total_rows_read++; + } + + auto end_time = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast( + end_time - start_time); + + EXPECT_EQ(total_rows_read, total_rows); + std::cout << "\n=== Single Point Query (using next() method) ===" + << std::endl; + std::cout << "Total rows read: " << total_rows_read << std::endl; + std::cout << "Time taken: " << duration.count() 
<< " ms" << std::endl; + std::cout << "Throughput: " + << (total_rows_read * 5 * 1000.0 / duration.count()) + << " rows/sec" << std::endl; + + reader.destroy_query_data_set(table_result_set); + ASSERT_EQ(reader.close(), common::E_OK); + } + + // // Test 2: Batch query (batch_size = 1000) + // { + // storage::TsFileReader reader; + // int ret = reader.open(file_name_); + // ASSERT_EQ(ret, common::E_OK); + // + // ResultSet* tmp_result_set = nullptr; + // const int batch_size = 10000; // Batch query + // auto start_time = std::chrono::high_resolution_clock::now(); + // + // ret = reader.query(table_schema->get_table_name(), + // table_schema->get_measurement_names(), 0, + // 1000000000000, tmp_result_set, batch_size); + // ASSERT_EQ(ret, common::E_OK); + // ASSERT_NE(tmp_result_set, nullptr); + // + // auto* table_result_set = + // dynamic_cast(tmp_result_set); + // ASSERT_NE(table_result_set, nullptr); + // + // int total_rows_read = 0; + // common::TsBlock* block = nullptr; + // int block_count = 0; + // + // while ((ret = table_result_set->get_next_tsblock(block)) == + // common::E_OK) { + // ASSERT_NE(block, nullptr); + // block_count++; + // total_rows_read += block->get_row_count(); + // } + // + // auto end_time = std::chrono::high_resolution_clock::now(); + // auto duration = + // std::chrono::duration_cast( + // end_time - start_time); + // + // EXPECT_EQ(total_rows_read, total_rows); + // std::cout << "\n=== Batch Query (batch_size=10000) ===" << std::endl; + // std::cout << "Total rows read: " << total_rows_read << std::endl; + // std::cout << "Block count: " << block_count << std::endl; + // std::cout << "Time taken: " << duration.count() << " ms" << + // std::endl; std::cout << "Throughput: " + // << (total_rows_read * 5 * 1000.0 / duration.count()) + // << " rows/sec" << std::endl; + // + // reader.destroy_query_data_set(table_result_set); + // ASSERT_EQ(reader.close(), common::E_OK); + // } + + delete table_schema; +} diff --git 
a/cpp/third_party/zlib-1.3.1/treebuild.xml b/cpp/third_party/zlib-1.3.1/treebuild.xml index 930b00be4..8e030572a 100644 --- a/cpp/third_party/zlib-1.3.1/treebuild.xml +++ b/cpp/third_party/zlib-1.3.1/treebuild.xml @@ -1,103 +1,99 @@ - + - zip compression library - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + zip compression library + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + zip compression library + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + -