diff --git a/.github/workflows/unit-test-cpp.yml b/.github/workflows/unit-test-cpp.yml index 15a629914..0fd23ac02 100644 --- a/.github/workflows/unit-test-cpp.yml +++ b/.github/workflows/unit-test-cpp.yml @@ -121,6 +121,8 @@ jobs: fi # Run the actual maven build including all tests. + # On Windows, prepend MinGW bin to PATH so test exe can find runtime DLLs + # (e.g. libstdc++-6.dll) when gtest_discover_tests runs it; avoids 0xc0000139. - name: Build and test with Maven shell: bash run: | @@ -129,6 +131,9 @@ jobs: else ASAN_VALUE="OFF" fi + if [[ "$RUNNER_OS" == "Windows" ]]; then + export PATH="/c/mingw64/bin:$PATH" + fi ./mvnw${{ steps.platform_suffix.outputs.platform_suffix }} -P with-cpp \ -Denable.asan=$ASAN_VALUE -Dbuild.type=${{ matrix.build_type }} clean verify diff --git a/.github/workflows/unit-test-python.yml b/.github/workflows/unit-test-python.yml index 9392f43cd..da8275e20 100644 --- a/.github/workflows/unit-test-python.yml +++ b/.github/workflows/unit-test-python.yml @@ -96,6 +96,11 @@ jobs: - name: Build and test with Maven shell: bash run: | + # On Windows, prepend MinGW bin so CTest/test exe uses the correct + # runtime DLLs instead of Git-for-Windows mingw runtime from PATH. + if [[ "$RUNNER_OS" == "Windows" ]]; then + export PATH="/c/mingw64/bin:$PATH" + fi ./mvnw${{ steps.platform_suffix.outputs.platform_suffix }} -P with-python -Denable.asan=OFF -Dbuild.type=Release clean verify -Dspotless.skip=true - name: Upload whl Artifact diff --git a/cpp/src/common/allocator/byte_stream.h b/cpp/src/common/allocator/byte_stream.h index 570aa1c13..4e1029ea4 100644 --- a/cpp/src/common/allocator/byte_stream.h +++ b/cpp/src/common/allocator/byte_stream.h @@ -459,6 +459,27 @@ class ByteStream { total_size_.atomic_aaf(used_bytes); } + /** + * Advance write position without copying payload bytes. + * Recovery path can use this to rebuild logical stream offset from file + * size directly. + */ + int advance_write_pos(uint32_t len) { + int ret = common::E_OK; + uint32_t advanced = 0; + while (advanced < len) { + if (RET_FAIL(prepare_space())) { + return ret; + } + uint32_t remainder = page_size_ - (total_size_.load() % page_size_); + uint32_t step = + remainder < (len - advanced) ? remainder : (len - advanced); + total_size_.atomic_aaf(step); + advanced += step; + } + return ret; + } + /* ================ Part 4: reading internal buffers ================ */ /* * one-shot reader iterator diff --git a/cpp/src/common/device_id.h b/cpp/src/common/device_id.h index 50d0d0105..323df9d47 100644 --- a/cpp/src/common/device_id.h +++ b/cpp/src/common/device_id.h @@ -148,8 +148,9 @@ class StringArrayDeviceID : public IDeviceID { if (prefix_segments_.size() == 0 || prefix_segments_.size() == 1) { return segments_[pos]; } else { - if (pos < prefix_segments_.size()) { - return prefix_segments_[pos]; + if (pos >= 0 && + static_cast(pos) < prefix_segments_.size()) { + return prefix_segments_[static_cast(pos)]; } else { return segments_[pos - prefix_segments_.size() + 1]; } diff --git a/cpp/src/common/schema.h b/cpp/src/common/schema.h index 499dd5bc7..a2c989af2 100644 --- a/cpp/src/common/schema.h +++ b/cpp/src/common/schema.h @@ -182,7 +182,7 @@ struct MeasurementSchemaGroup { */ class TableSchema { public: - TableSchema() = default; + TableSchema() : updatable_(true) {} /** * Constructs a TableSchema object with the given table name, column @@ -197,7 +197,7 @@ class TableSchema { */ TableSchema(const std::string& table_name, const std::vector& column_schemas) - : table_name_(table_name) { + : table_name_(table_name), updatable_(false) { to_lowercase_inplace(table_name_); for (const common::ColumnSchema& column_schema : column_schemas) { column_schemas_.emplace_back(std::make_shared( @@ -217,7 +217,9 @@ class TableSchema { TableSchema(const std::string& table_name, const std::vector& column_schemas, const std::vector& column_categories) - : table_name_(table_name), column_categories_(column_categories) { + : table_name_(table_name), + column_categories_(column_categories), + updatable_(false) { to_lowercase_inplace(table_name_); for (const auto column_schema : column_schemas) { if (column_schema != nullptr) { @@ -236,11 +238,13 @@ class TableSchema { TableSchema(TableSchema&& other) noexcept : table_name_(std::move(other.table_name_)), column_schemas_(std::move(other.column_schemas_)), - column_categories_(std::move(other.column_categories_)) {} + column_categories_(std::move(other.column_categories_)), + updatable_(other.updatable_) {} TableSchema(const TableSchema& other) noexcept : table_name_(other.table_name_), - column_categories_(other.column_categories_) { + column_categories_(other.column_categories_), + updatable_(false) { for (const auto& column_schema : other.column_schemas_) { // Just call default construction column_schemas_.emplace_back( @@ -342,6 +346,14 @@ class TableSchema { size_t get_column_pos_index_num() const { return column_pos_index_.size(); } void update(ChunkGroupMeta* chunk_group_meta) { + if (!updatable_) { + return; + } + std::shared_ptr device_id = chunk_group_meta->device_id_; + const int seg_num = device_id ? device_id->segment_num() : 0; + if (seg_num > max_level_) { + max_level_ = seg_num; + } for (auto iter = chunk_group_meta->chunk_meta_list_.begin(); iter != chunk_group_meta->chunk_meta_list_.end(); iter++) { auto& chunk_meta = iter.get(); @@ -371,6 +383,29 @@ class TableSchema { } } + void finalize_column_schema() { + if (!updatable_) { + return; + } + std::vector> id_columns; + for (int i = 1; i < max_level_; i++) { + std::string col_name = "__level" + std::to_string(i); + id_columns.push_back(std::make_shared( + col_name, common::STRING, common::PLAIN, + common::CompressionType::UNCOMPRESSED)); + } + column_schemas_.insert(column_schemas_.begin(), id_columns.begin(), + id_columns.end()); + column_categories_.insert(column_categories_.begin(), id_columns.size(), + common::ColumnCategory::TAG); + column_pos_index_.clear(); + for (size_t i = 0; i < column_schemas_.size(); i++) { + column_pos_index_[to_lower(column_schemas_[i]->measurement_name_)] = + static_cast(i); + } + updatable_ = false; + } + std::vector get_data_types() const { std::vector ret; for (const auto& measurement_schema : column_schemas_) { @@ -424,6 +459,8 @@ class TableSchema { std::vector column_categories_; std::map column_pos_index_; bool is_virtual_table_ = false; + int max_level_ = 0; + bool updatable_ = false; }; struct Schema { @@ -433,11 +470,19 @@ struct Schema { void update_table_schema(ChunkGroupMeta* chunk_group_meta) { std::shared_ptr device_id = chunk_group_meta->device_id_; - auto table_name = device_id->get_table_name(); + std::string table_name = device_id->get_table_name(); if (table_schema_map_.find(table_name) == table_schema_map_.end()) { table_schema_map_[table_name] = std::make_shared(); } - table_schema_map_[table_name]->update(chunk_group_meta); + auto& ts = table_schema_map_[table_name]; + ts->set_table_name(table_name); + ts->update(chunk_group_meta); + } + + void finalize_table_schemas() { + for (auto& kv : table_schema_map_) { + kv.second->finalize_column_schema(); + } } void register_table_schema( const std::shared_ptr& table_schema) { diff --git a/cpp/src/common/tsfile_common.h b/cpp/src/common/tsfile_common.h index d12c6ed8c..ad2fa5911 100644 --- a/cpp/src/common/tsfile_common.h +++ b/cpp/src/common/tsfile_common.h @@ -23,9 +23,11 @@ #include #include #include +#include #include #include #include +#include #include "common/allocator/my_string.h" #include "common/allocator/page_arena.h" @@ -322,6 +324,12 @@ class ITimeseriesIndex { virtual Statistic* get_statistic() const { return nullptr; } }; +/** Map: IDeviceID -> list of timeseries metadata (ITimeseriesIndex). */ +using DeviceTimeseriesMetadataMap = + std::map, + std::vector>, + IDeviceIDComparator>; + /* * A TimeseriesIndex may have one or more chunk metas, * that means we have such a map: >. diff --git a/cpp/src/encoding/fire.h b/cpp/src/encoding/fire.h index 9b319a175..0ca3d1680 100644 --- a/cpp/src/encoding/fire.h +++ b/cpp/src/encoding/fire.h @@ -92,7 +92,7 @@ class LongFire : public Fire { int64_t predict(int64_t value) override { int64_t alpha = accumulator_ >> learn_shift_; - int64_t diff = (alpha * delta_) >> bit_width_; + int64_t diff = safe_mul_shift(alpha, delta_, bit_width_); return value + diff; } @@ -101,6 +101,22 @@ class LongFire : public Fire { accumulator_ -= gradient; delta_ = val - pre; } + + private: + /** (alpha * delta_) >> shift without signed overflow; both args are + * int64_t. */ + static int64_t safe_mul_shift(int64_t alpha, int64_t delta, int shift) { +#if defined(__SIZEOF_INT128__) && __SIZEOF_INT128__ >= 16 + __int128 product = static_cast<__int128>(alpha) * delta; + return static_cast(product >> shift); +#else + /* Portable fallback: use double for product. Exact for |alpha|,|delta| + * < 2^53. */ + double prod = static_cast(alpha) * static_cast(delta); + double div = static_cast(1LL << shift); + return static_cast(prod / div); +#endif + } }; #endif // ENCODING_FIRE_H diff --git a/cpp/src/encoding/ts2diff_encoder.h b/cpp/src/encoding/ts2diff_encoder.h index 18272e3e2..8c5ddafc7 100644 --- a/cpp/src/encoding/ts2diff_encoder.h +++ b/cpp/src/encoding/ts2diff_encoder.h @@ -93,7 +93,7 @@ class TS2DIFFEncoder : public Encoder { public: TS2DIFFEncoder() { init(); } - ~TS2DIFFEncoder() {} + ~TS2DIFFEncoder() { destroy(); } void reset() { write_index_ = -1; } diff --git a/cpp/src/file/restorable_tsfile_io_writer.cc b/cpp/src/file/restorable_tsfile_io_writer.cc new file mode 100644 index 000000000..d98cdff65 --- /dev/null +++ b/cpp/src/file/restorable_tsfile_io_writer.cc @@ -0,0 +1,845 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "file/restorable_tsfile_io_writer.h" + +#include + +#include +#include +#include +#include + +#include "common/allocator/byte_stream.h" +#include "common/device_id.h" +#include "common/statistic.h" +#include "common/tsfile_common.h" +#include "compress/compressor_factory.h" +#include "encoding/decoder_factory.h" +#include "utils/errno_define.h" + +#ifdef _WIN32 +#include +#include +#include +ssize_t pread(int fd, void* buf, size_t count, uint64_t offset); +#else +#include +#include +#endif + +using namespace common; + +namespace storage { + +namespace { + +const int HEADER_LEN = MAGIC_STRING_TSFILE_LEN + 1; // magic + version +const int BUF_SIZE = 4096; +const unsigned char kTimeChunkTypeMask = 0x80; + +// ----------------------------------------------------------------------------- +// Self-check helpers: read file, parse chunk header, recover chunk statistics +// ----------------------------------------------------------------------------- + +/** + * Lightweight read-only file handle for self-check only. + * Use init_from_fd() when WriteFile is already open to avoid opening the file + * twice (fixes Windows file sharing and ensures we read the same content). + */ +struct SelfCheckReader { + int fd_; + int32_t file_size_; + bool own_fd_; // if false, do not close fd_ + + SelfCheckReader() : fd_(-1), file_size_(-1), own_fd_(true) {} + + int init_from_fd(int fd) { + fd_ = fd; + own_fd_ = false; + if (fd_ < 0) { + return E_FILE_OPEN_ERR; + } +#ifdef _WIN32 + struct _stat st; + if (_fstat(fd_, &st) < 0) { + return E_FILE_STAT_ERR; + } + file_size_ = static_cast(st.st_size); +#else + struct stat st; + if (fstat(fd_, &st) < 0) { + return E_FILE_STAT_ERR; + } + file_size_ = static_cast(st.st_size); +#endif + return E_OK; + } + + int open(const std::string& path) { +#ifdef _WIN32 + fd_ = ::_open(path.c_str(), _O_RDONLY | _O_BINARY); +#else + fd_ = ::open(path.c_str(), O_RDONLY); +#endif + if (fd_ < 0) { + return E_FILE_OPEN_ERR; + } + own_fd_ = true; +#ifdef _WIN32 + struct _stat st; + if (_fstat(fd_, &st) < 0) { + close(); + return E_FILE_STAT_ERR; + } + file_size_ = static_cast(st.st_size); +#else + struct stat st; + if (fstat(fd_, &st) < 0) { + close(); + return E_FILE_STAT_ERR; + } + file_size_ = static_cast(st.st_size); +#endif + return E_OK; + } + + void close() { + if (own_fd_ && fd_ >= 0) { +#ifdef _WIN32 + ::_close(fd_); +#else + ::close(fd_); +#endif + } + fd_ = -1; + file_size_ = -1; + } + + int32_t file_size() const { return file_size_; } + + int read(int32_t offset, char* buf, int32_t buf_size, int32_t& read_len) { + read_len = 0; + if (fd_ < 0) { + return E_FILE_READ_ERR; + } + ssize_t n = ::pread(fd_, buf, buf_size, offset); + if (n < 0) { + return E_FILE_READ_ERR; + } + read_len = static_cast(n); + return E_OK; + } +}; + +#ifdef _WIN32 +ssize_t pread(int fd, void* buf, size_t count, uint64_t offset) { + DWORD read_bytes = 0; + OVERLAPPED ov = {}; + ov.OffsetHigh = (DWORD)((offset >> 32) & 0xFFFFFFFF); + ov.Offset = (DWORD)(offset & 0xFFFFFFFF); + HANDLE h = (HANDLE)_get_osfhandle(fd); + if (!ReadFile(h, buf, (DWORD)count, &read_bytes, &ov)) { + if (GetLastError() != ERROR_HANDLE_EOF) { + return -1; + } + } + return (ssize_t)read_bytes; +} +#endif + +/** + * Parse chunk header at chunk_start and compute total chunk size (header + + * data). Does not read full chunk data; used to advance scan position. + * @param header_out If non-null, filled with the deserialized chunk header. + * @param bytes_consumed Set to header_len + data_size on success. + */ +static int parse_chunk_header_and_skip(SelfCheckReader& reader, + int64_t chunk_start, + int64_t& bytes_consumed, + ChunkHeader* header_out = nullptr) { + int32_t file_size = reader.file_size(); + int32_t max_read = static_cast( + std::min(static_cast(BUF_SIZE), file_size - chunk_start)); + if (max_read < ChunkHeader::MIN_SERIALIZED_SIZE) { + return E_TSFILE_CORRUPTED; + } + + std::vector buf(max_read); + int32_t read_len = 0; + int ret = reader.read(static_cast(chunk_start), buf.data(), + max_read, read_len); + if (ret != E_OK || read_len < ChunkHeader::MIN_SERIALIZED_SIZE) { + return E_TSFILE_CORRUPTED; + } + + ByteStream bs; + bs.wrap_from(buf.data(), read_len); + + ChunkHeader header; + ret = header.deserialize_from(bs); + if (ret != E_OK) { + return E_TSFILE_CORRUPTED; + } + + int header_len = bs.read_pos(); + int64_t total = header_len + header.data_size_; + if (chunk_start + total > file_size) { + return E_TSFILE_CORRUPTED; + } + + if (header_out != nullptr) { + *header_out = header; + } + bytes_consumed = total; + return E_OK; +} + +/** + * Recover chunk-level statistic from chunk data so that tail metadata can be + * generated correctly after recovery (aligned with Java TsFileSequenceReader + * selfCheck). Multi-page: merge each page header's statistic. Single-page: + * decode page data and update stat. For aligned value chunks, time_batch + * (from the time chunk in the same group) must be provided. + */ +static int recover_chunk_statistic( + const ChunkHeader& chdr, const char* chunk_data, int32_t data_size, + Statistic* out_stat, common::PageArena* pa, + const std::vector* time_batch = nullptr, + std::vector* out_time_batch = nullptr) { + if (chunk_data == nullptr || data_size <= 0 || out_stat == nullptr) { + return E_OK; + } + common::ByteStream bs; + bs.wrap_from(const_cast(chunk_data), + static_cast(data_size)); + // Multi-page chunk: high bits of chunk_type_ are 0x00, low 6 bits = + // CHUNK_HEADER_MARKER + const bool multi_page = + (static_cast(chdr.chunk_type_) & 0x3F) == + static_cast(CHUNK_HEADER_MARKER); + + if (multi_page) { + while (bs.remaining_size() > 0) { + PageHeader ph; + int ret = ph.deserialize_from(bs, true, chdr.data_type_); + if (ret != common::E_OK) { + return ret; + } + uint32_t comp = ph.compressed_size_; + if (ph.statistic_ != nullptr) { + if (out_stat->merge_with(ph.statistic_) != common::E_OK) { + ph.reset(); + return common::E_TSFILE_CORRUPTED; + } + } + ph.reset(); + bs.wrapped_buf_advance_read_pos(comp); + } + return E_OK; + } + + // Single-page chunk: statistic is not in page header; decompress and decode + // to fill out_stat. is_time_column: bit 0x80 in chunk_type_ indicates time + // column (aligned model). + const bool is_time_column = (static_cast(chdr.chunk_type_) & + kTimeChunkTypeMask) != 0; + PageHeader ph; + int ret = ph.deserialize_from(bs, false, chdr.data_type_); + if (ret != common::E_OK || ph.compressed_size_ == 0 || + bs.remaining_size() < ph.compressed_size_) { + // Align with Java selfCheck behavior: malformed/incomplete page in this + // chunk is treated as corrupted data. + return common::E_TSFILE_CORRUPTED; + } + const char* compressed_ptr = + chunk_data + (data_size - static_cast(bs.remaining_size())); + char* uncompressed_buf = nullptr; + uint32_t uncompressed_size = 0; + Compressor* compressor = + CompressorFactory::alloc_compressor(chdr.compression_type_); + if (compressor == nullptr) { + return common::E_OOM; + } + ret = compressor->reset(false); + if (ret != common::E_OK) { + CompressorFactory::free(compressor); + return ret; + } + ret = compressor->uncompress(const_cast(compressed_ptr), + ph.compressed_size_, uncompressed_buf, + uncompressed_size); + if (ret != common::E_OK || uncompressed_buf == nullptr || + uncompressed_size != ph.uncompressed_size_) { + if (uncompressed_buf != nullptr) { + compressor->after_uncompress(uncompressed_buf); + } + CompressorFactory::free(compressor); + return (ret == common::E_OK) ? common::E_TSFILE_CORRUPTED : ret; + } + if (is_time_column) { + /* Time chunk: uncompressed = raw time stream only (no var_uint). */ + Decoder* time_decoder = DecoderFactory::alloc_time_decoder(); + if (time_decoder == nullptr) { + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return common::E_OOM; + } + common::ByteStream time_in; + time_in.wrap_from(uncompressed_buf, uncompressed_size); + time_decoder->reset(); + int64_t t; + if (out_time_batch != nullptr) { + out_time_batch->clear(); + } + while (time_decoder->has_remaining(time_in)) { + if (time_decoder->read_int64(t, time_in) != common::E_OK) { + break; + } + out_stat->update(t); + if (out_time_batch != nullptr) { + out_time_batch->push_back(t); + } + } + DecoderFactory::free(time_decoder); + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return E_OK; + } + + /* Value chunk: parse layout and decode. */ + const char* value_buf = nullptr; + uint32_t value_buf_size = 0; + std::vector time_decode_buf; + const std::vector* times = nullptr; + + if (time_batch != nullptr && !time_batch->empty()) { + // Aligned value page: uncompressed layout = uint32(num_values) + bitmap + // + value_buf + if (uncompressed_size < 4) { + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return E_OK; + } + uint32_t num_values = + (static_cast( + static_cast(uncompressed_buf[0])) + << 24) | + (static_cast( + static_cast(uncompressed_buf[1])) + << 16) | + (static_cast( + static_cast(uncompressed_buf[2])) + << 8) | + (static_cast( + static_cast(uncompressed_buf[3]))); + uint32_t bitmap_size = (num_values + 7) / 8; + if (uncompressed_size < 4 + bitmap_size) { + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return E_OK; + } + value_buf = uncompressed_buf + 4 + bitmap_size; + value_buf_size = uncompressed_size - 4 - bitmap_size; + times = time_batch; + } else { + // Non-aligned value page: var_uint(time_buf_size) + time_buf + + // value_buf + int var_size = 0; + uint32_t time_buf_size = 0; + ret = common::SerializationUtil::read_var_uint( + time_buf_size, uncompressed_buf, + static_cast(uncompressed_size), &var_size); + if (ret != common::E_OK || + static_cast(var_size) + time_buf_size > + uncompressed_size) { + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return (ret == common::E_OK) ? common::E_TSFILE_CORRUPTED : ret; + } + const char* time_buf = uncompressed_buf + var_size; + value_buf = time_buf + time_buf_size; + value_buf_size = + uncompressed_size - static_cast(var_size) - time_buf_size; + Decoder* time_decoder = DecoderFactory::alloc_time_decoder(); + if (time_decoder == nullptr) { + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return common::E_OOM; + } + common::ByteStream time_in; + time_in.wrap_from(const_cast(time_buf), time_buf_size); + time_decoder->reset(); + time_decode_buf.clear(); + int64_t t; + while (time_decoder->has_remaining(time_in)) { + if (time_decoder->read_int64(t, time_in) != common::E_OK) { + break; + } + time_decode_buf.push_back(t); + } + DecoderFactory::free(time_decoder); + times = &time_decode_buf; + } + + Decoder* value_decoder = DecoderFactory::alloc_value_decoder( + chdr.encoding_type_, chdr.data_type_); + if (value_decoder == nullptr) { + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return common::E_OOM; + } + common::ByteStream value_in; + value_in.wrap_from(const_cast(value_buf), value_buf_size); + value_decoder->reset(); + size_t idx = 0; + const size_t num_times = times->size(); + while (idx < num_times && value_decoder->has_remaining(value_in)) { + int64_t t = (*times)[idx]; + switch (chdr.data_type_) { + case common::BOOLEAN: { + bool v; + if (value_decoder->read_boolean(v, value_in) == common::E_OK) { + out_stat->update(t, v); + } + break; + } + case common::INT32: + case common::DATE: { + int32_t v; + if (value_decoder->read_int32(v, value_in) == common::E_OK) { + out_stat->update(t, v); + } + break; + } + case common::INT64: + case common::TIMESTAMP: { + int64_t v; + if (value_decoder->read_int64(v, value_in) == common::E_OK) { + out_stat->update(t, v); + } + break; + } + case common::FLOAT: { + float v; + if (value_decoder->read_float(v, value_in) == common::E_OK) { + out_stat->update(t, v); + } + break; + } + case common::DOUBLE: { + double v; + if (value_decoder->read_double(v, value_in) == common::E_OK) { + out_stat->update(t, v); + } + break; + } + case common::TEXT: + case common::BLOB: + case common::STRING: { + common::String v; + if (pa != nullptr && value_decoder->read_String( + v, *pa, value_in) == common::E_OK) { + out_stat->update(t, v); + } + break; + } + default: + break; + } + idx++; + } + DecoderFactory::free(value_decoder); + compressor->after_uncompress(uncompressed_buf); + CompressorFactory::free(compressor); + return E_OK; +} + +} // namespace + +RestorableTsFileIOWriter::RestorableTsFileIOWriter() + : TsFileIOWriter(), + write_file_(nullptr), + write_file_owned_(false), + truncated_size_(-1), + crashed_(false), + can_write_(false) { + self_check_arena_.init(512, MOD_TSFILE_READER); +} + +RestorableTsFileIOWriter::~RestorableTsFileIOWriter() { close(); } + +void RestorableTsFileIOWriter::close() { + if (write_file_owned_ && write_file_ != nullptr) { + write_file_->close(); + delete write_file_; + write_file_ = nullptr; + write_file_owned_ = false; + } + for (ChunkGroupMeta* cgm : self_check_recovered_cgm_) { + cgm->device_id_.reset(); + } + self_check_recovered_cgm_.clear(); + self_check_arena_.destroy(); +} + +int RestorableTsFileIOWriter::open(const std::string& file_path, + bool truncate_corrupted) { + if (write_file_ != nullptr) { + return E_ALREADY_EXIST; + } + + file_path_ = file_path; + write_file_ = new WriteFile(); + write_file_owned_ = true; + + // O_RDWR|O_CREAT without O_TRUNC: preserve existing file content +#ifdef _WIN32 + const int flags = O_RDWR | O_CREAT | O_BINARY; +#else + const int flags = O_RDWR | O_CREAT; +#endif + const mode_t mode = 0644; + + int ret = write_file_->create(file_path_, flags, mode); + if (ret != E_OK) { + close(); + return ret; + } + + ret = self_check(truncate_corrupted); + if (ret != E_OK) { + close(); + return ret; + } + + return E_OK; +} + +int RestorableTsFileIOWriter::self_check(bool truncate_corrupted) { + SelfCheckReader reader; + // Use a separate read-only handle for self-check: on Windows, sharing the + // O_RDWR fd can cause stale/cached reads when detecting a complete file. + int ret = reader.open(file_path_); + if (ret != E_OK) { + return ret; + } + + int32_t file_size = reader.file_size(); + + // --- Empty file: treat as crashed, allow writing from scratch --- + if (file_size == 0) { + reader.close(); + truncated_size_ = 0; + crashed_ = true; + can_write_ = true; + if (write_file_->seek_to_end() != E_OK) { + return E_FILE_READ_ERR; + } + ret = init(write_file_); + if (ret != E_OK) { + return ret; + } + ret = start_file(); + if (ret != E_OK) { + return ret; + } + return E_OK; + } + + // --- File too short or invalid header => not a valid TsFile --- + if (file_size < HEADER_LEN) { + reader.close(); + truncated_size_ = TSFILE_CHECK_INCOMPATIBLE; + return E_TSFILE_CORRUPTED; + } + + char header_buf[HEADER_LEN]; + int32_t read_len = 0; + ret = reader.read(0, header_buf, HEADER_LEN, read_len); + if (ret != E_OK || read_len != HEADER_LEN) { + reader.close(); + truncated_size_ = TSFILE_CHECK_INCOMPATIBLE; + return E_TSFILE_CORRUPTED; + } + + if (memcmp(header_buf, MAGIC_STRING_TSFILE, MAGIC_STRING_TSFILE_LEN) != 0) { + reader.close(); + truncated_size_ = TSFILE_CHECK_INCOMPATIBLE; + return E_TSFILE_CORRUPTED; + } + + if (header_buf[MAGIC_STRING_TSFILE_LEN] != VERSION_NUM_BYTE) { + reader.close(); + truncated_size_ = TSFILE_CHECK_INCOMPATIBLE; + return E_TSFILE_CORRUPTED; + } + + // --- Completeness check (aligned with Java isComplete()) --- + // Require size >= 2*magic + version_byte and tail magic same as head magic. + bool is_complete = false; + if (file_size >= static_cast(MAGIC_STRING_TSFILE_LEN * 2 + 1)) { + char tail_buf[MAGIC_STRING_TSFILE_LEN]; + ret = reader.read(file_size - MAGIC_STRING_TSFILE_LEN, tail_buf, + MAGIC_STRING_TSFILE_LEN, read_len); + if (ret == E_OK && read_len == MAGIC_STRING_TSFILE_LEN && + memcmp(tail_buf, MAGIC_STRING_TSFILE, MAGIC_STRING_TSFILE_LEN) == + 0) { + is_complete = true; + } + } + + // --- File is complete: no recovery, close write handle and return --- + if (is_complete) { + reader.close(); + truncated_size_ = TSFILE_CHECK_COMPLETE; + crashed_ = false; + can_write_ = false; + write_file_->close(); + delete write_file_; + write_file_ = nullptr; + write_file_owned_ = false; + return E_OK; + } + + // --- Recovery path: scan from header to find last valid truncation point + // --- + int64_t truncated = HEADER_LEN; + int64_t pos = HEADER_LEN; + std::vector buf(BUF_SIZE); + + // Recover schema and chunk group meta (aligned with Java selfCheck). + // cur_group_time_batch: timestamps decoded from time chunk, used by aligned + // value chunks. + std::shared_ptr cur_device_id; + ChunkGroupMeta* cur_cgm = nullptr; + std::vector recovered_cgm_list; + std::vector cur_group_time_batch; + + auto flush_chunk_group = [this, &cur_device_id, &cur_cgm, + &recovered_cgm_list]() { + if (cur_cgm != nullptr && cur_device_id != nullptr) { + get_schema()->update_table_schema(cur_cgm); + recovered_cgm_list.push_back(cur_cgm); + self_check_recovered_cgm_.push_back(cur_cgm); + cur_cgm = nullptr; + } + }; + + while (pos < file_size) { + unsigned char marker; + ret = reader.read(static_cast(pos), + reinterpret_cast(&marker), 1, read_len); + if (ret != E_OK || read_len != 1) { + break; + } + pos += 1; + + if (marker == static_cast(SEPARATOR_MARKER)) { + truncated = pos - 1; + flush_chunk_group(); + break; + } + + if (marker == static_cast(CHUNK_GROUP_HEADER_MARKER)) { + truncated = pos - 1; + flush_chunk_group(); + cur_group_time_batch.clear(); + int seg_len = 0; + ret = reader.read(static_cast(pos), buf.data(), BUF_SIZE, + read_len); + if (ret != E_OK || read_len < 1) { + break; + } + ByteStream bs; + bs.wrap_from(buf.data(), read_len); + cur_device_id = std::make_shared("init"); + ret = cur_device_id->deserialize(bs); + if (ret != E_OK) { + break; + } + seg_len = bs.read_pos(); + pos += seg_len; + cur_cgm = new (self_check_arena_.alloc(sizeof(ChunkGroupMeta))) + ChunkGroupMeta(&self_check_arena_); + cur_cgm->init(cur_device_id); + continue; + } + + if (marker == static_cast(OPERATION_INDEX_RANGE)) { + truncated = pos - 1; + flush_chunk_group(); + cur_device_id.reset(); + if (pos + 2 * 8 > static_cast(file_size)) { + break; + } + char range_buf[16]; + ret = + reader.read(static_cast(pos), range_buf, 16, read_len); + if (ret != E_OK || read_len != 16) { + break; + } + pos += 16; + truncated = pos; + continue; + } + + if (marker == static_cast(CHUNK_HEADER_MARKER) || + marker == + static_cast(ONLY_ONE_PAGE_CHUNK_HEADER_MARKER) || + (marker & 0x3F) == + static_cast(CHUNK_HEADER_MARKER) || + (marker & 0x3F) == + static_cast(ONLY_ONE_PAGE_CHUNK_HEADER_MARKER)) { + int64_t chunk_start = pos - 1; + int64_t consumed = 0; + ChunkHeader chdr; + ret = parse_chunk_header_and_skip(reader, chunk_start, consumed, + &chdr); + if (ret != E_OK) { + break; + } + pos = chunk_start + consumed; + truncated = pos; + if (cur_cgm != nullptr) { + void* cm_buf = self_check_arena_.alloc(sizeof(ChunkMeta)); + if (IS_NULL(cm_buf)) { + ret = common::E_OOM; + break; + } + auto* cm = new (cm_buf) ChunkMeta(); + common::String mname; + mname.dup_from(chdr.measurement_name_, self_check_arena_); + Statistic* stat = StatisticFactory::alloc_statistic_with_pa( + static_cast(chdr.data_type_), + &self_check_arena_); + if (IS_NULL(stat)) { + ret = common::E_OOM; + break; + } + stat->reset(); + if (chdr.data_size_ > 0) { + const int32_t header_len = + static_cast(consumed) - chdr.data_size_; + if (header_len > 0 && chunk_start + consumed <= + static_cast(file_size)) { + std::vector chunk_data(chdr.data_size_); + int32_t read_len = 0; + ret = reader.read( + static_cast(chunk_start + header_len), + chunk_data.data(), chdr.data_size_, read_len); + if (ret == E_OK && read_len == chdr.data_size_) { + ret = recover_chunk_statistic( + chdr, chunk_data.data(), chdr.data_size_, stat, + &self_check_arena_, &cur_group_time_batch, + &cur_group_time_batch); + } + if (ret != E_OK) { + break; + } + } + } + cm->init(mname, + static_cast(chdr.data_type_), + chunk_start, stat, 0, + static_cast(chdr.encoding_type_), + static_cast( + chdr.compression_type_), + self_check_arena_); + cur_cgm->push(cm); + if (cur_device_id != nullptr && + (static_cast(chdr.chunk_type_) & + kTimeChunkTypeMask) != 0) { + // For aligned series, a time chunk implies this device + // uses aligned layout. Record it so recovered writer state + // can keep alignment behavior consistent. + aligned_devices_.insert(cur_device_id->get_table_name()); + } + } + continue; + } + + truncated_size_ = TSFILE_CHECK_INCOMPATIBLE; + flush_chunk_group(); + reader.close(); + return E_TSFILE_CORRUPTED; + } + + flush_chunk_group(); + get_schema()->finalize_table_schemas(); + reader.close(); + truncated_size_ = truncated; + + // --- Optionally truncate file to last valid offset --- + if (truncate_corrupted && truncated < static_cast(file_size)) { + ret = write_file_->truncate(truncated); + if (ret != E_OK) { + return ret; + } + } + + if (write_file_->seek_to_end() != E_OK) { + return E_FILE_READ_ERR; + } + + crashed_ = true; + can_write_ = true; + + ret = init(write_file_); + if (ret != E_OK) { + return ret; + } + + // --- Restore write_stream_ logical position from existing file size --- + const int64_t restored_size = write_file_->get_position(); + if (restored_size > 0) { + ret = restore_recovered_file_position(restored_size); + if (ret != E_OK) { + return ret; + } + } + + // --- Attach recovered ChunkGroupMeta to writer; destroy() will not free + // them --- + for (ChunkGroupMeta* cgm : recovered_cgm_list) { + push_chunk_group_meta(cgm); + } + chunk_group_meta_from_recovery_ = true; + + return E_OK; +} + +bool RestorableTsFileIOWriter::is_device_aligned( + const std::string& device) const { + return aligned_devices_.find(device) != aligned_devices_.end(); +} + +TsFileIOWriter* RestorableTsFileIOWriter::get_tsfile_io_writer() { + return can_write_ ? this : nullptr; +} + +WriteFile* RestorableTsFileIOWriter::get_write_file() { + return can_write_ ? write_file_ : nullptr; +} + +std::string RestorableTsFileIOWriter::get_file_path() const { + return file_path_; +} + +} // namespace storage diff --git a/cpp/src/file/restorable_tsfile_io_writer.h b/cpp/src/file/restorable_tsfile_io_writer.h new file mode 100644 index 000000000..051bf7d8c --- /dev/null +++ b/cpp/src/file/restorable_tsfile_io_writer.h @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef FILE_RESTORABLE_TSFILE_IO_WRITER_H +#define FILE_RESTORABLE_TSFILE_IO_WRITER_H + +#include +#include +#include +#include + +#include "common/schema.h" +#include "common/tsfile_common.h" +#include "file/tsfile_io_writer.h" +#include "file/write_file.h" + +namespace storage { + +/** + * TsFile check status constants for self-check result. + * COMPLETE_FILE (0): File is complete, no recovery needed. + * INCOMPATIBLE_FILE (-2): File is not in TsFile format. + */ +constexpr int64_t TSFILE_CHECK_COMPLETE = 0; +constexpr int64_t TSFILE_CHECK_INCOMPATIBLE = -2; + +/** + * RestorableTsFileIOWriter opens and optionally recovers a TsFile. + * Inherits from TsFileIOWriter for continued writing after recovery. + * + * (1) If the TsFile is closed normally: has_crashed()=false, can_write()=false + * + * (2) If the TsFile is incomplete/crashed: has_crashed()=true, + * can_write()=true, the writer truncates corrupted data and allows continued + * writing. + * + * Uses standard C++11 and avoids memory leaks via RAII and smart pointers. + */ +class RestorableTsFileIOWriter : public TsFileIOWriter { + public: + RestorableTsFileIOWriter(); + ~RestorableTsFileIOWriter(); + + // Non-copyable + RestorableTsFileIOWriter(const RestorableTsFileIOWriter&) = delete; + RestorableTsFileIOWriter& operator=(const RestorableTsFileIOWriter&) = + delete; + + /** + * Open a TsFile for recovery/append. + * Uses O_RDWR|O_CREAT without O_TRUNC, so existing file content is + * preserved. + * + * @param file_path Path to the TsFile + * @param truncate_corrupted If true, truncate corrupted data. If false, + * do not truncate (incomplete file will remain as-is). + * @return E_OK on success, error code otherwise. + */ + int open(const std::string& file_path, bool truncate_corrupted = true); + + void close(); + + bool can_write() const { return can_write_; } + bool has_crashed() const { return crashed_; } + int64_t get_truncated_size() const { return truncated_size_; } + std::shared_ptr get_known_schema() { return get_schema(); } + + /** True if the device was recovered as aligned (has time column). */ + bool is_device_aligned(const std::string& device) const; + + /** + * Recovered chunk group metas from self_check (actual device_id and chunk + * metas from file). TsFileWriter::init() uses this to rebuild schemas_ + * with the real device keys (aligned with Java). Valid until close(). + */ + const std::vector& get_recovered_chunk_group_metas() + const { + return self_check_recovered_cgm_; + } + + /** + * Get the TsFileIOWriter for continued writing. Only valid when + * can_write() is true. Returns this (since we inherit TsFileIOWriter). + */ + TsFileIOWriter* get_tsfile_io_writer(); + + /** + * Get the WriteFile for TsFileWriter::init(). Only valid when can_write(). + * Caller must not destroy the returned pointer. + */ + WriteFile* get_write_file(); + + std::string get_file_path() const; + + private: + int self_check(bool truncate_corrupted); + + private: + std::string file_path_; + WriteFile* write_file_; + bool write_file_owned_; + + int64_t truncated_size_; + bool crashed_; + bool can_write_; + + std::set aligned_devices_; + common::PageArena self_check_arena_; + /** ChunkGroupMeta* allocated from self_check_arena_; reset device_id before + * arena destroy to avoid leak. */ + std::vector self_check_recovered_cgm_; +}; + +} // namespace storage + +#endif // FILE_RESTORABLE_TSFILE_IO_WRITER_H diff --git a/cpp/src/file/tsfile_io_writer.cc b/cpp/src/file/tsfile_io_writer.cc index 200ab84f9..2aba1e39c 100644 --- a/cpp/src/file/tsfile_io_writer.cc +++ b/cpp/src/file/tsfile_io_writer.cc @@ -50,17 +50,21 @@ int TsFileIOWriter::init(WriteFile* write_file) { } void TsFileIOWriter::destroy() { - for (auto iter = chunk_group_meta_list_.begin(); - iter != chunk_group_meta_list_.end(); iter++) { - if (iter.get() && iter.get()->device_id_) { - iter.get()->device_id_.reset(); - } - if (iter.get()) { - for (auto chunk_meta = iter.get()->chunk_meta_list_.begin(); - chunk_meta != iter.get()->chunk_meta_list_.end(); - chunk_meta++) { - if (chunk_meta.get()) { - chunk_meta.get()->statistic_->destroy(); + // When meta came from RestorableTsFileIOWriter recovery, entries live in + // an arena there; do not release device_id_/statistic_ here. + if (!chunk_group_meta_from_recovery_) { + for (auto iter = chunk_group_meta_list_.begin(); + iter != chunk_group_meta_list_.end(); iter++) { + if (iter.get() && iter.get()->device_id_) { + iter.get()->device_id_.reset(); + } + if (iter.get()) { + for (auto chunk_meta = iter.get()->chunk_meta_list_.begin(); + chunk_meta != iter.get()->chunk_meta_list_.end(); + chunk_meta++) { + if (chunk_meta.get()) { + chunk_meta.get()->statistic_->destroy(); + } } } } @@ -812,6 +816,14 @@ int TsFileIOWriter::clone_node_list( return ret; } +int TsFileIOWriter::restore_recovered_file_position(int64_t recovered_size) { + if (recovered_size < 0) { + return E_INVALID_ARG; + } + file_base_offset_ = recovered_size; + return E_OK; +} + // #if DEBUG_SE // void DEBUG_print_byte_stream_buf(const char *tag, // const char *buf, @@ -844,10 +856,9 @@ int TsFileIOWriter::flush_stream_to_file() { write_stream_consumer_.get_next_buf(write_stream_); if (b.buf_ == nullptr) { break; - } else { - if (RET_FAIL(file_->write(b.buf_, b.len_))) { - break; - } + } + if (b.len_ > 0 && RET_FAIL(file_->write(b.buf_, b.len_))) { + break; } } diff --git a/cpp/src/file/tsfile_io_writer.h b/cpp/src/file/tsfile_io_writer.h index a7d0a1404..8fcc8fa55 100644 --- a/cpp/src/file/tsfile_io_writer.h +++ b/cpp/src/file/tsfile_io_writer.h @@ -117,7 +117,7 @@ class TsFileIOWriter { int flush_stream_to_file(); int write_chunk_data(common::ByteStream& chunk_data); FORCE_INLINE int64_t cur_file_position() const { - return write_stream_.total_size(); + return file_base_offset_ + write_stream_.total_size(); } FORCE_INLINE int write_buf(const char* buf, uint32_t len) { return write_stream_.write_buf(buf, len); @@ -184,6 +184,25 @@ class TsFileIOWriter { int init_bloom_filter(BloomFilter& filter); int32_t get_path_count(common::SimpleList& cgm_list); + // for open file + void add_ts_time_index_entry(TimeseriesIndex& ts_index); + + protected: + /** For RestorableTsFileIOWriter: append a recovered ChunkGroupMeta. */ + void push_chunk_group_meta(ChunkGroupMeta* cgm) { + chunk_group_meta_list_.push_back(cgm); + } + /** True when chunk_group_meta_list_ entries are from recovery arena; + * destroy() must not free them. */ + bool chunk_group_meta_from_recovery_ = false; + /** + * Recovery only: set file_base_offset_ so that cur_file_position() returns + * correct absolute offsets. After recovery the writer behaves as if the + * file was just flushed — write_stream_ starts empty and only holds new + * data. + */ + int restore_recovered_file_position(int64_t recovered_size); + private: common::PageArena meta_allocator_; common::ByteStream write_stream_; @@ -202,6 +221,11 @@ class TsFileIOWriter { std::string encrypt_type_; std::string encrypt_key_; bool is_aligned_; + /** Recovery only: absolute file offset at which write_stream_ logically + * begins. Normal (non-recovery) path keeps this at 0. */ + int64_t file_base_offset_ = 0; + + friend class RestorableTsFileIOWriter; // uses push_chunk_group_meta }; } // end namespace storage diff --git a/cpp/src/file/write_file.cc b/cpp/src/file/write_file.cc index d8a17aa56..8ad96fab2 100644 --- a/cpp/src/file/write_file.cc +++ b/cpp/src/file/write_file.cc @@ -32,6 +32,7 @@ #include "utils/errno_define.h" #ifdef _WIN32 +#include int fsync(int); #endif @@ -105,7 +106,13 @@ int WriteFile::sync() { } int WriteFile::close() { - ASSERT(fd_ > 0); + // Idempotent: already closed is not an error + if (fd_ < 0) { +#ifdef DEBUG_SE + std::cout << "file already closed, path=" << path_; +#endif + return E_OK; + } if (::close(fd_) < 0) { #ifdef DEBUG_SE std::cout << "failed to close " << path_ << " errorno " << errno @@ -121,6 +128,48 @@ int WriteFile::close() { return E_OK; } +int WriteFile::truncate(int64_t size) { + ASSERT(fd_ > 0); +#ifdef _WIN32 + if (_chsize_s(fd_, static_cast(size)) != 0) { + return E_FILE_WRITE_ERR; + } +#else + if (::ftruncate(fd_, static_cast(size)) < 0) { + return E_FILE_WRITE_ERR; + } +#endif + return E_OK; +} + +int WriteFile::seek_to_end() { + ASSERT(fd_ > 0); +#ifdef _WIN32 + if (_lseeki64(fd_, 0, SEEK_END) < 0) { + return E_FILE_READ_ERR; + } +#else + if (::lseek(fd_, 0, SEEK_END) < 0) { + return E_FILE_READ_ERR; + } +#endif + return E_OK; +} + +int64_t WriteFile::get_position() { + if (fd_ < 0) { + return 0; + } + // SEEK_CUR with offset 0 returns current position without moving +#ifdef _WIN32 + int64_t pos = _lseeki64(fd_, 0, SEEK_CUR); + return (pos < 0) ? 0 : pos; +#else + off_t pos = ::lseek(fd_, 0, SEEK_CUR); + return (pos < 0) ? 0 : static_cast(pos); +#endif +} + } // end namespace storage #ifdef _WIN32 diff --git a/cpp/src/file/write_file.h b/cpp/src/file/write_file.h index 6b5a506a8..9a5bce6e8 100644 --- a/cpp/src/file/write_file.h +++ b/cpp/src/file/write_file.h @@ -36,7 +36,14 @@ class WriteFile { FORCE_INLINE int get_fd() const { return fd_; } int sync(); int close(); + /** Truncate file to the given size (bytes). File must be open. */ + int truncate(int64_t size); + /** Seek to end of file. Used after open to position for append. */ + int seek_to_end(); FORCE_INLINE std::string get_file_path() { return path_; } + /** Current file offset. After seek_to_end(), equals file size (for + * recovery). */ + int64_t get_position(); private: int do_create(int flags, mode_t mode); diff --git a/cpp/src/reader/tsfile_reader.cc b/cpp/src/reader/tsfile_reader.cc index f97570885..84188b6a3 100644 --- a/cpp/src/reader/tsfile_reader.cc +++ b/cpp/src/reader/tsfile_reader.cc @@ -29,7 +29,9 @@ namespace storage { TsFileReader::TsFileReader() : read_file_(nullptr), tsfile_executor_(nullptr), - table_query_executor_(nullptr) {} + table_query_executor_(nullptr) { + tsfile_reader_meta_pa_.init(512, MOD_TSFILE_READER); +} TsFileReader::~TsFileReader() { close(); } @@ -224,6 +226,10 @@ std::vector> TsFileReader::get_all_device_ids() { return device_ids; } +std::vector> TsFileReader::get_all_devices() { + return get_all_device_ids(); +} + int TsFileReader::get_all_devices( std::vector>& device_ids, std::shared_ptr index_node, PageArena& pa) { @@ -291,6 +297,53 @@ int TsFileReader::get_timeseries_schema( return E_OK; } +int TsFileReader::get_timeseries_metadata_impl( + std::shared_ptr device_id, + std::vector>& result) { + int ret = E_OK; + std::vector timeseries_indexs; + tsfile_reader_meta_pa_.init(512, MOD_TSFILE_READER); + // Pointers are owned by tsfile_reader_meta_pa_; shared_ptr must not delete + auto noop_deleter = [](ITimeseriesIndex*) {}; + if (RET_FAIL( + tsfile_executor_->get_tsfile_io_reader() + ->get_device_timeseries_meta_without_chunk_meta( + device_id, timeseries_indexs, tsfile_reader_meta_pa_))) { + } else { + for (auto timeseries_index : timeseries_indexs) { + result.emplace_back(std::shared_ptr( + timeseries_index, noop_deleter)); + } + } + return ret; +} + +DeviceTimeseriesMetadataMap TsFileReader::get_timeseries_metadata( + const std::vector>& device_ids) { + DeviceTimeseriesMetadataMap result; + for (const auto& device_id : device_ids) { + std::vector> list; + if (get_timeseries_metadata_impl(device_id, list) == E_OK) { + result.insert(std::make_pair(device_id, std::move(list))); + } + // Skip non-existent devices (not inserted) + } + return result; +} + +DeviceTimeseriesMetadataMap TsFileReader::get_timeseries_metadata() { + // Collect metadata for all devices present in the file + DeviceTimeseriesMetadataMap result; + auto device_ids = get_all_device_ids(); + for (const auto& device_id : device_ids) { + std::vector> list; + if (get_timeseries_metadata_impl(device_id, list) == E_OK) { + result.insert(std::make_pair(device_id, std::move(list))); + } + } + return result; +} + ResultSet* TsFileReader::read_timeseries( const std::shared_ptr& device_id, const std::vector& measurement_name) { diff --git a/cpp/src/reader/tsfile_reader.h b/cpp/src/reader/tsfile_reader.h index 8a6ba2264..6c8563563 100644 --- a/cpp/src/reader/tsfile_reader.h +++ b/cpp/src/reader/tsfile_reader.h @@ -142,6 +142,13 @@ class TsFileReader { */ std::vector> get_all_device_ids(); + /** + * @brief Get all device IDs in the file (same as get_all_device_ids). + * + * @return std::vector> the device list + */ + std::vector> get_all_devices(); + /** * @brief get the timeseries schema by the device id and measurement name * @@ -152,6 +159,26 @@ class TsFileReader { */ int get_timeseries_schema(std::shared_ptr device_id, std::vector& result); + + /** + * @brief Get timeseries metadata for specified devices. + * + * Only devices that exist in the file are included in the result. + * If device_ids is empty, returns an empty map. + * + * @param device_ids device list to query + * @return map: IDeviceID -> list of timeseries metadata (only existing) + */ + DeviceTimeseriesMetadataMap get_timeseries_metadata( + const std::vector>& device_ids); + + /** + * @brief Get timeseries metadata for all devices in the file. + * + * @return map: IDeviceID -> list of timeseries metadata + */ + DeviceTimeseriesMetadataMap get_timeseries_metadata(); + /** * @brief get the table schema by the table name * @@ -168,12 +195,16 @@ class TsFileReader { std::vector> get_all_table_schemas(); private: + int get_timeseries_metadata_impl( + std::shared_ptr device_id, + std::vector>& result); int get_all_devices(std::vector>& device_ids, std::shared_ptr index_node, common::PageArena& pa); storage::ReadFile* read_file_; storage::TsFileExecutor* tsfile_executor_; storage::TableQueryExecutor* table_query_executor_; + common::PageArena tsfile_reader_meta_pa_; }; } // namespace storage diff --git a/cpp/src/reader/tsfile_tree_reader.cc b/cpp/src/reader/tsfile_tree_reader.cc index 2b28c8647..1b58c359d 100644 --- a/cpp/src/reader/tsfile_tree_reader.cc +++ b/cpp/src/reader/tsfile_tree_reader.cc @@ -68,4 +68,17 @@ std::vector TsFileTreeReader::get_all_device_ids() { return ret_device_ids; } -} // namespace storage \ No newline at end of file +std::vector> TsFileTreeReader::get_all_devices() { + return tsfile_reader_->get_all_devices(); +} + +DeviceTimeseriesMetadataMap TsFileTreeReader::get_timeseries_metadata( + const std::vector>& device_ids) { + return tsfile_reader_->get_timeseries_metadata(device_ids); +} + +DeviceTimeseriesMetadataMap TsFileTreeReader::get_timeseries_metadata() { + return tsfile_reader_->get_timeseries_metadata(); +} + +} // namespace storage diff --git a/cpp/src/reader/tsfile_tree_reader.h b/cpp/src/reader/tsfile_tree_reader.h index 66341b7ed..535180409 100644 --- a/cpp/src/reader/tsfile_tree_reader.h +++ b/cpp/src/reader/tsfile_tree_reader.h @@ -89,14 +89,37 @@ class TsFileTreeReader { const std::string& device_id); /** - * @brief Get all device identifiers in the TsFile + * @brief Get all device identifiers in the TsFile (string form). * - * @return Vector containing all device identifiers found in the TsFile - * @note The returned vector will be empty if no devices are found or file - * is not opened + * @return Vector of device identifier strings */ std::vector get_all_device_ids(); + /** + * @brief Get all devices in the file (IDeviceID form). + * + * @return Vector of IDeviceID for all devices + */ + std::vector> get_all_devices(); + + /** + * @brief Get timeseries metadata for specified devices. + * + * Only devices that exist in the file are included. + * + * @param device_ids device list to query + * @return map: IDeviceID -> list of timeseries metadata (only existing) + */ + DeviceTimeseriesMetadataMap get_timeseries_metadata( + const std::vector>& device_ids); + + /** + * @brief Get timeseries metadata for all devices in the file. + * + * @return map: IDeviceID -> list of timeseries metadata + */ + DeviceTimeseriesMetadataMap get_timeseries_metadata(); + private: std::shared_ptr tsfile_reader_; ///< Underlying TsFile reader implementation diff --git a/cpp/src/writer/tsfile_table_writer.cc b/cpp/src/writer/tsfile_table_writer.cc index 6dd990188..eb0319af8 100644 --- a/cpp/src/writer/tsfile_table_writer.cc +++ b/cpp/src/writer/tsfile_table_writer.cc @@ -19,6 +19,32 @@ #include "tsfile_table_writer.h" +#include "file/restorable_tsfile_io_writer.h" + +namespace storage { + +// Constructor for appending after recovery: schema comes from restored file. +TsFileTableWriter::TsFileTableWriter( + storage::RestorableTsFileIOWriter* restorable_writer, + uint64_t memory_threshold) + : error_number(common::E_OK) { + tsfile_writer_ = std::make_shared(); + error_number = tsfile_writer_->init(restorable_writer); + if (error_number != common::E_OK) { + return; + } + tsfile_writer_->set_generate_table_schema(false); + std::shared_ptr schema = restorable_writer->get_known_schema(); + if (schema && schema->table_schema_map_.size() == 1) { + exclusive_table_name_ = schema->table_schema_map_.begin()->first; + } else { + exclusive_table_name_.clear(); + } + common::g_config_value_.chunk_group_size_threshold_ = memory_threshold; +} + +} // namespace storage + storage::TsFileTableWriter::~TsFileTableWriter() = default; int storage::TsFileTableWriter::register_table( diff --git a/cpp/src/writer/tsfile_table_writer.h b/cpp/src/writer/tsfile_table_writer.h index d3fc918b7..ce18bc007 100644 --- a/cpp/src/writer/tsfile_table_writer.h +++ b/cpp/src/writer/tsfile_table_writer.h @@ -22,6 +22,7 @@ #include "writer/tsfile_writer.h" namespace storage { +class RestorableTsFileIOWriter; /** * @brief Facilitates writing structured table data into a TsFile with a @@ -66,6 +67,19 @@ class TsFileTableWriter { common::g_config_value_.chunk_group_size_threshold_ = memory_threshold; } + /** + * Constructs a TsFileTableWriter from a RestorableTsFileIOWriter so that + * table data can be appended after recovery. Schema is taken from the + * restored file; do not pass a TableSchema. + * + * @param restorable_writer Restored I/O writer; must not be null and must + * have been opened with truncate so that can_write() is true. + * @param memory_threshold Optional memory threshold for buffered data. + */ + explicit TsFileTableWriter( + storage::RestorableTsFileIOWriter* restorable_writer, + uint64_t memory_threshold = 128 * 1024 * 1024); + ~TsFileTableWriter(); /** * Registers a table schema with the writer. diff --git a/cpp/src/writer/tsfile_tree_writer.cc b/cpp/src/writer/tsfile_tree_writer.cc index 59c11914d..28913c2bd 100644 --- a/cpp/src/writer/tsfile_tree_writer.cc +++ b/cpp/src/writer/tsfile_tree_writer.cc @@ -19,6 +19,8 @@ #include "writer/tsfile_tree_writer.h" +#include "file/restorable_tsfile_io_writer.h" + namespace storage { TsFileTreeWriter::TsFileTreeWriter(storage::WriteFile* writer_file, @@ -28,6 +30,16 @@ TsFileTreeWriter::TsFileTreeWriter(storage::WriteFile* writer_file, common::g_config_value_.chunk_group_size_threshold_ = memory_threshold; } +// Constructor for appending after recovery: schema and alignment from restored +// file. +TsFileTreeWriter::TsFileTreeWriter( + storage::RestorableTsFileIOWriter* restorable_writer, + uint64_t memory_threshold) { + tsfile_writer_ = std::make_shared(); + tsfile_writer_->init(restorable_writer); + common::g_config_value_.chunk_group_size_threshold_ = memory_threshold; +} + int TsFileTreeWriter::register_timeseries(std::string& device_id, MeasurementSchema* schema) { return tsfile_writer_->register_timeseries(device_id, *schema); diff --git a/cpp/src/writer/tsfile_tree_writer.h b/cpp/src/writer/tsfile_tree_writer.h index 90ef0d76f..3c21c23da 100644 --- a/cpp/src/writer/tsfile_tree_writer.h +++ b/cpp/src/writer/tsfile_tree_writer.h @@ -26,6 +26,7 @@ #include "tsfile_writer.h" namespace storage { +class RestorableTsFileIOWriter; /** * @brief Provides an interface for writing hierarchical (tree-structured) @@ -56,6 +57,19 @@ class TsFileTreeWriter { explicit TsFileTreeWriter(storage::WriteFile* writer_file, uint64_t memory_threshold = 128 * 1024 * 1024); + /** + * Constructs a TsFileTreeWriter from a RestorableTsFileIOWriter so that + * data can be appended after recovery (schema and alignment are taken from + * the restored file). + * + * @param restorable_writer Restored I/O writer; must not be null and must + * have been opened and scanned (e.g. after truncate recovery). + * @param memory_threshold Optional memory threshold for buffered data. + */ + explicit TsFileTreeWriter( + storage::RestorableTsFileIOWriter* restorable_writer, + uint64_t memory_threshold = 128 * 1024 * 1024); + /** * Registers a single (non-aligned) time series under the given device ID. * diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc index 2c2e46b97..9a087a82f 100644 --- a/cpp/src/writer/tsfile_writer.cc +++ b/cpp/src/writer/tsfile_writer.cc @@ -23,6 +23,7 @@ #include "chunk_writer.h" #include "common/config/config.h" +#include "file/restorable_tsfile_io_writer.h" #include "file/tsfile_io_writer.h" #include "file/write_file.h" #include "utils/errno_define.h" @@ -68,7 +69,8 @@ TsFileWriter::TsFileWriter() record_count_since_last_flush_(0), record_count_for_next_mem_check_( g_config_value_.record_count_for_next_mem_check_), - write_file_created_(false) {} + write_file_created_(false), + io_writer_owned_(true) {} TsFileWriter::~TsFileWriter() { destroy(); } @@ -77,10 +79,10 @@ void TsFileWriter::destroy() { delete write_file_; write_file_ = nullptr; } - if (io_writer_) { + if (io_writer_owned_ && io_writer_) { delete io_writer_; - io_writer_ = nullptr; } + io_writer_ = nullptr; DeviceSchemasMapIter dev_iter; // cppcheck-suppress postfixOperator for (dev_iter = schemas_.begin(); dev_iter != schemas_.end(); dev_iter++) { @@ -113,11 +115,80 @@ int TsFileWriter::init(WriteFile* write_file) { } write_file_ = write_file; write_file_created_ = false; + io_writer_owned_ = true; io_writer_ = new TsFileIOWriter(); io_writer_->init(write_file_); return E_OK; } +// ----------------------------------------------------------------------------- +// Recovery init: rebuild schemas_ from recovered chunk group metas (aligned +// with Java). Use each CGM's actual device_id from file as key so tree and +// table model both get correct lookups. Table model can still lazy-create from +// table_schema_map_ in do_check_schema_table when a new device appears. +// All new MeasurementSchemaGroup/MeasurementSchema are freed in destroy(). +// ----------------------------------------------------------------------------- +int TsFileWriter::init(RestorableTsFileIOWriter* rw) { + if (rw == nullptr || !rw->can_write()) { + return E_INVALID_ARG; + } + write_file_ = rw->get_write_file(); + write_file_created_ = false; + io_writer_owned_ = false; + io_writer_ = rw; + + const std::vector& recovered = + rw->get_recovered_chunk_group_metas(); + for (ChunkGroupMeta* cgm : recovered) { + if (cgm == nullptr || cgm->device_id_ == nullptr) { + continue; + } + std::shared_ptr device_id = cgm->device_id_; + + // Find existing group for same device (same device may have multiple + // CGMs from multiple flushes). + DeviceSchemasMapIter it = schemas_.begin(); + for (; it != schemas_.end(); ++it) { + if (it->first != nullptr && *it->first == *device_id) { + break; + } + } + + MeasurementSchemaGroup* group = nullptr; + if (it != schemas_.end()) { + group = it->second; + } else { + group = new MeasurementSchemaGroup; + group->is_aligned_ = + rw->is_device_aligned(device_id->get_table_name()); + schemas_.insert(std::make_pair(device_id, group)); + } + + // Add measurement schemas from this CGM (skip time column: empty name). + for (auto iter = cgm->chunk_meta_list_.begin(); + iter != cgm->chunk_meta_list_.end(); iter++) { + ChunkMeta* cm = iter.get(); + if (cm == nullptr) { + continue; + } + std::string mname = cm->measurement_name_.to_std_string(); + if (mname.empty()) { + continue; + } + if (group->measurement_schema_map_.find(mname) != + group->measurement_schema_map_.end()) { + continue; + } + MeasurementSchema* ms = new MeasurementSchema( + mname, cm->data_type_, cm->encoding_, cm->compression_type_); + group->measurement_schema_map_.insert(std::make_pair(mname, ms)); + } + } + + start_file_done_ = true; + return E_OK; +} + void TsFileWriter::set_generate_table_schema(bool generate_table_schema) { io_writer_->set_generate_table_schema(generate_table_schema); } @@ -495,6 +566,15 @@ int TsFileWriter::do_check_schema_table( schemas_[device_id] = device_schema; } + // After recovery, device_schema may exist but time_chunk_writer_ not yet + // created + if (IS_NULL(device_schema->time_chunk_writer_)) { + device_schema->time_chunk_writer_ = new TimeChunkWriter(); + device_schema->time_chunk_writer_->init( + "", g_config_value_.time_encoding_type_, + g_config_value_.time_compress_type_); + } + uint32_t column_cnt = tablet.get_column_count(); time_chunk_writer = device_schema->time_chunk_writer_; MeasurementSchemaMap& msm = device_schema->measurement_schema_map_; diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h index e80a1232b..85c47db7f 100644 --- a/cpp/src/writer/tsfile_writer.h +++ b/cpp/src/writer/tsfile_writer.h @@ -37,6 +37,7 @@ namespace storage { class WriteFile; class ChunkWriter; class TsFileIOWriter; +class RestorableTsFileIOWriter; } // namespace storage namespace storage { @@ -55,6 +56,7 @@ class TsFileWriter { int open(const std::string& file_path, int flags, mode_t mode); int open(const std::string& file_path); int init(storage::WriteFile* write_file); + int init(storage::RestorableTsFileIOWriter* rw); void set_generate_table_schema(bool generate_table_schema); int register_timeseries(const std::string& device_id, @@ -183,6 +185,7 @@ class TsFileWriter { // record count for next memory check int64_t record_count_for_next_mem_check_; bool write_file_created_; + bool io_writer_owned_; // false when init(RestorableTsFileIOWriter*) bool table_aligned_ = true; int write_typed_column(ValueChunkWriter* value_chunk_writer, diff --git a/cpp/test/CMakeLists.txt b/cpp/test/CMakeLists.txt index 423381e4f..2be9c1b2c 100644 --- a/cpp/test/CMakeLists.txt +++ b/cpp/test/CMakeLists.txt @@ -154,10 +154,13 @@ target_link_libraries( set_target_properties(TsFile_Test PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${LIB_TSFILE_SDK_DIR}) +# On Windows, copy tsfile DLL next to the test exe so it can load at runtime +# (and when gtest_discover_tests runs the exe). Use TARGET_FILE so the path +# is correct for the current build config (e.g. Release). if (WIN32) add_custom_command(TARGET TsFile_Test POST_BUILD - COMMAND ${CMAKE_COMMAND} -E copy - "${LIBRARY_OUTPUT_PATH}/libtsfile.dll" + COMMAND ${CMAKE_COMMAND} -E copy_if_different + $ "$" COMMENT "Copying libtsfile.dll to test executable directory" VERBATIM @@ -165,4 +168,11 @@ if (WIN32) endif () include(GoogleTest) -gtest_discover_tests(TsFile_Test) \ No newline at end of file +# On Windows, delay test discovery until ctest runs (PRE_TEST) so the test exe +# runs with the correct env (e.g. PATH has MinGW, libtsfile.dll is present). +# Avoids 0xc0000139 when discovery runs at build time. +if(WIN32) + gtest_discover_tests(TsFile_Test DISCOVERY_MODE PRE_TEST) +else() + gtest_discover_tests(TsFile_Test) +endif() \ No newline at end of file diff --git a/cpp/test/file/restorable_tsfile_io_writer_test.cc b/cpp/test/file/restorable_tsfile_io_writer_test.cc new file mode 100644 index 000000000..655995d35 --- /dev/null +++ b/cpp/test/file/restorable_tsfile_io_writer_test.cc @@ -0,0 +1,497 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Unit tests for RestorableTsFileIOWriter. + * Covers: empty/invalid/complete file open, truncate recovery, continued write + * with TsFileWriter/TsFileTreeWriter/TsFileTableWriter, and read-back verify. + */ + +#include "file/restorable_tsfile_io_writer.h" + +#include + +#include +#include + +#include "common/record.h" +#include "common/schema.h" +#include "common/tablet.h" +#include "common/tsfile_common.h" +#include "file/write_file.h" +#include "reader/tsfile_reader.h" +#include "reader/tsfile_tree_reader.h" +#include "writer/tsfile_table_writer.h" +#include "writer/tsfile_tree_writer.h" +#include "writer/tsfile_writer.h" + +namespace storage { +class ResultSet; +} +using namespace storage; +using namespace common; + +// ----------------------------------------------------------------------------- +// Helpers used by multiple tests (file flags, file size, corrupt tail) +// ----------------------------------------------------------------------------- + +static int GetWriteCreateFlags() { + int flags = O_WRONLY | O_CREAT | O_TRUNC; +#ifdef _WIN32 + flags |= O_BINARY; +#endif + return flags; +} + +static int64_t GetFileSize(const std::string& path) { + std::ifstream f(path, std::ios::binary | std::ios::ate); + return static_cast(f.tellg()); +} + +/** Overwrite the last num_bytes of the file with zeros to simulate corruption. + */ +static void CorruptFileTail(const std::string& path, int num_bytes) { + const int64_t full_size = GetFileSize(path); + std::ofstream out(path, std::ios::binary | std::ios::in); + out.seekp(full_size - static_cast(num_bytes)); + for (int i = 0; i < num_bytes; ++i) { + out.put(0); + } + out.close(); +} + +/** Query tree reader and return row count; destroys query result. */ +static int CountTreeReaderRows( + TsFileTreeReader& reader, const std::vector& measurement_ids) { + auto device_ids = reader.get_all_device_ids(); + ResultSet* result = nullptr; + int ret = + reader.query(device_ids, measurement_ids, INT64_MIN, INT64_MAX, result); + if (ret != E_OK || result == nullptr) { + return -1; + } + int count = 0; + for (auto it = result->iterator(); it.hasNext(); it.next()) { + ++count; + } + reader.destroy_query_data_set(result); + return count; +} + +// ----------------------------------------------------------------------------- +// Test fixture +// ----------------------------------------------------------------------------- + +class RestorableTsFileIOWriterTest : public ::testing::Test { + protected: + void SetUp() override { + libtsfile_init(); + file_name_ = std::string("restorable_tsfile_io_writer_test_") + + generate_random_string(10) + std::string(".tsfile"); + remove(file_name_.c_str()); + } + + void TearDown() override { + remove(file_name_.c_str()); + libtsfile_destroy(); + } + + int64_t GetCurrentFileSize() const { return GetFileSize(file_name_); } + void CorruptCurrentFileTail(int num_bytes) { + CorruptFileTail(file_name_, num_bytes); + } + + std::string file_name_; + + static std::string generate_random_string(int length) { + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> dis(0, 61); + const std::string chars = + "0123456789" + "abcdefghijklmnopqrstuvwxyz" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + std::string s; + s.reserve(static_cast(length)); + for (int i = 0; i < length; ++i) { + s += chars[static_cast(dis(gen))]; + } + return s; + } +}; + +// ----------------------------------------------------------------------------- +// Open behavior: empty file, bad magic, complete file, truncated file, +// header-only +// ----------------------------------------------------------------------------- + +TEST_F(RestorableTsFileIOWriterTest, OpenEmptyFile) { + RestorableTsFileIOWriter writer; + ASSERT_EQ(writer.open(file_name_, true), E_OK); + EXPECT_TRUE(writer.can_write()); + EXPECT_TRUE(writer.has_crashed()); + EXPECT_EQ(writer.get_truncated_size(), 0); + EXPECT_NE(writer.get_tsfile_io_writer(), nullptr); + writer.close(); +} + +TEST_F(RestorableTsFileIOWriterTest, OpenBadMagicFile) { + std::ofstream f(file_name_); + f.write("BadFile", 7); + f.close(); + + RestorableTsFileIOWriter writer; + EXPECT_NE(writer.open(file_name_, true), E_OK); + EXPECT_EQ(writer.get_truncated_size(), TSFILE_CHECK_INCOMPATIBLE); + writer.close(); +} + +TEST_F(RestorableTsFileIOWriterTest, OpenCompleteFile) { + TsFileWriter tw; + ASSERT_EQ(tw.open(file_name_, GetWriteCreateFlags(), 0666), E_OK); + tw.register_timeseries( + "d1", + MeasurementSchema("s1", FLOAT, GORILLA, CompressionType::UNCOMPRESSED)); + TsRecord record(1, "d1"); + record.add_point("s1", 1.0f); + tw.write_record(record); + record.timestamp_ = 2; + tw.write_record(record); + tw.flush(); + tw.close(); + + RestorableTsFileIOWriter writer; + ASSERT_EQ(writer.open(file_name_, true), E_OK); + EXPECT_FALSE(writer.can_write()); + EXPECT_FALSE(writer.has_crashed()); + EXPECT_EQ(writer.get_truncated_size(), TSFILE_CHECK_COMPLETE); + EXPECT_EQ(writer.get_tsfile_io_writer(), nullptr); + writer.close(); +} + +TEST_F(RestorableTsFileIOWriterTest, OpenTruncatedFile) { + TsFileWriter tw; + ASSERT_EQ(tw.open(file_name_, GetWriteCreateFlags(), 0666), E_OK); + tw.register_timeseries( + "d1", + MeasurementSchema("s1", FLOAT, RLE, CompressionType::UNCOMPRESSED)); + TsRecord record(1, "d1"); + record.add_point("s1", 1.0f); + tw.write_record(record); + tw.flush(); + tw.close(); + + const int64_t full_size = GetCurrentFileSize(); + CorruptCurrentFileTail(5); + + RestorableTsFileIOWriter writer; + ASSERT_EQ(writer.open(file_name_, true), E_OK); + EXPECT_TRUE(writer.can_write()); + EXPECT_TRUE(writer.has_crashed()); + EXPECT_GE(writer.get_truncated_size(), + static_cast(MAGIC_STRING_TSFILE_LEN + 1)); + EXPECT_LE(writer.get_truncated_size(), full_size); + EXPECT_NE(writer.get_tsfile_io_writer(), nullptr); + writer.close(); +} + +TEST_F(RestorableTsFileIOWriterTest, OpenFileWithOnlyHeader) { + int flags = O_RDWR | O_CREAT | O_TRUNC; +#ifdef _WIN32 + flags |= O_BINARY; +#endif + WriteFile wf; + ASSERT_EQ(wf.create(file_name_, flags, 0666), E_OK); + wf.write(MAGIC_STRING_TSFILE, MAGIC_STRING_TSFILE_LEN); + wf.write(&VERSION_NUM_BYTE, 1); + wf.close(); + + RestorableTsFileIOWriter writer; + ASSERT_EQ(writer.open(file_name_, true), E_OK); + EXPECT_TRUE(writer.can_write()); + EXPECT_TRUE(writer.has_crashed()); + EXPECT_EQ(writer.get_truncated_size(), MAGIC_STRING_TSFILE_LEN + 1); + writer.close(); +} + +// ----------------------------------------------------------------------------- +// Recovery + continued write: TsFileWriter::init(rw) rebuilds schemas_ from +// recovered chunk group metas using actual device_id from file (not +// table_name), so both tree and table model get correct lookups. +// ----------------------------------------------------------------------------- + +TEST_F(RestorableTsFileIOWriterTest, TruncateRecoversAndProvidesWriter) { + TsFileWriter tw; + ASSERT_EQ(tw.open(file_name_, GetWriteCreateFlags(), 0666), E_OK); + tw.register_timeseries( + "d1", + MeasurementSchema("s1", FLOAT, GORILLA, CompressionType::UNCOMPRESSED)); + TsRecord record(1, "d1"); + record.add_point("s1", 1.0f); + tw.write_record(record); + record.timestamp_ = 2; + tw.write_record(record); + tw.flush(); + tw.close(); + + CorruptCurrentFileTail(3); + + RestorableTsFileIOWriter rw; + ASSERT_EQ(rw.open(file_name_, true), E_OK); + ASSERT_TRUE(rw.can_write()); + ASSERT_NE(rw.get_tsfile_io_writer(), nullptr); + ASSERT_NE(rw.get_write_file(), nullptr); + EXPECT_EQ(rw.get_file_path(), file_name_); + + TsFileWriter tw2; + ASSERT_EQ(tw2.init(&rw), E_OK); + TsRecord record2(3, "d1"); + record2.add_point("s1", 3.0f); + ASSERT_EQ(tw2.write_record(record2), E_OK); + tw2.close(); + rw.close(); +} + +// Multi-segment device path: recovery must use actual device_id from file so +// that subsequent write to the same path finds the schema (no table_name key). +TEST_F(RestorableTsFileIOWriterTest, + TreeModelMultiSegmentDeviceRecoverAndWrite) { + TsFileWriter tw; + ASSERT_EQ(tw.open(file_name_, GetWriteCreateFlags(), 0666), E_OK); + tw.register_timeseries( + "root.d1", + MeasurementSchema("s1", FLOAT, GORILLA, CompressionType::UNCOMPRESSED)); + TsRecord record(1, "root.d1"); + record.add_point("s1", 1.0f); + ASSERT_EQ(tw.write_record(record), E_OK); + tw.flush(); + tw.close(); + + CorruptCurrentFileTail(3); + + RestorableTsFileIOWriter rw; + ASSERT_EQ(rw.open(file_name_, true), E_OK); + ASSERT_TRUE(rw.can_write()); + + TsFileWriter tw2; + ASSERT_EQ(tw2.init(&rw), E_OK); + TsRecord record2(2, "root.d1"); + record2.add_point("s1", 2.0f); + ASSERT_EQ(tw2.write_record(record2), E_OK); + tw2.flush(); + tw2.close(); + rw.close(); + + TsFileTreeReader reader; + reader.open(file_name_); + ASSERT_EQ(reader.get_all_device_ids().size(), 1u); + ASSERT_EQ(CountTreeReaderRows(reader, {"s1"}), 2); + reader.close(); +} + +// ----------------------------------------------------------------------------- +// Recovery + continued write with TsFileTreeWriter, then read-back verify +// ----------------------------------------------------------------------------- + +TEST_F(RestorableTsFileIOWriterTest, MultiDeviceRecoverAndWriteWithTreeWriter) { + TsFileWriter tw; + ASSERT_EQ(tw.open(file_name_, GetWriteCreateFlags(), 0666), E_OK); + tw.register_timeseries("d1", MeasurementSchema("s1", FLOAT)); + tw.register_timeseries("d1", MeasurementSchema("s2", INT32)); + tw.register_timeseries("d2", MeasurementSchema("s1", FLOAT)); + tw.register_timeseries("d2", MeasurementSchema("s2", DOUBLE)); + + TsRecord r1(1, "d1"); + r1.add_point("s1", 1.0f); + r1.add_point("s2", 10); + ASSERT_EQ(tw.write_record(r1), E_OK); + TsRecord r2(2, "d2"); + r2.add_point("s1", 2.0f); + r2.add_point("s2", 20.0); + ASSERT_EQ(tw.write_record(r2), E_OK); + tw.flush(); + tw.close(); + + CorruptCurrentFileTail(3); + + RestorableTsFileIOWriter rw; + ASSERT_EQ(rw.open(file_name_, true), E_OK); + ASSERT_TRUE(rw.can_write()); + + TsFileTreeWriter tree_writer(&rw); + TsRecord r3(3, "d1"); + r3.add_point("s1", 3.0f); + r3.add_point("s2", 30); + ASSERT_EQ(tree_writer.write(r3), E_OK); + TsRecord r4(4, "d2"); + r4.add_point("s1", 4.0f); + r4.add_point("s2", 40.0); + ASSERT_EQ(tree_writer.write(r4), E_OK); + tree_writer.flush(); + tree_writer.close(); + + TsFileTreeReader reader; + reader.open(file_name_); + ASSERT_EQ(reader.get_all_device_ids().size(), 2u); + ASSERT_EQ(CountTreeReaderRows(reader, {"s1", "s2"}), 4); + reader.close(); +} + +// ----------------------------------------------------------------------------- +// Tree model + Recovery + continued write with aligned timeseries, then +// read-back verify +// ----------------------------------------------------------------------------- + +TEST_F(RestorableTsFileIOWriterTest, AlignedTimeseriesRecoverAndWrite) { + TsFileWriter tw; + ASSERT_EQ(tw.open(file_name_, GetWriteCreateFlags(), 0666), E_OK); + std::vector aligned_schemas; + aligned_schemas.push_back(new MeasurementSchema("s1", FLOAT)); + aligned_schemas.push_back(new MeasurementSchema("s2", FLOAT)); + tw.register_aligned_timeseries("d1", aligned_schemas); + + TsRecord r1(1, "d1"); + r1.add_point("s1", 1.0f); + r1.add_point("s2", 2.0f); + ASSERT_EQ(tw.write_record_aligned(r1), E_OK); + TsRecord r2(2, "d1"); + r2.add_point("s1", 3.0f); + r2.add_point("s2", 4.0f); + ASSERT_EQ(tw.write_record_aligned(r2), E_OK); + tw.flush(); + tw.close(); + + CorruptCurrentFileTail(3); + + RestorableTsFileIOWriter rw; + ASSERT_EQ(rw.open(file_name_, true), E_OK); + ASSERT_TRUE(rw.can_write()); + + TsFileTreeWriter tw2(&rw); + TsRecord r3(3, "d1"); + r3.add_point("s1", 5.0f); + r3.add_point("s2", 6.0f); + ASSERT_EQ(tw2.write(r3), E_OK); + tw2.flush(); + tw2.close(); + + TsFileTreeReader reader; + reader.open(file_name_); + ASSERT_EQ(reader.get_all_device_ids().size(), 1u); + ASSERT_EQ(CountTreeReaderRows(reader, {"s1", "s2"}), 3); + reader.close(); +} + +// ----------------------------------------------------------------------------- +// Recovery + continued write with TsFileTableWriter (table model), then +// read-back +// ----------------------------------------------------------------------------- + +TEST_F(RestorableTsFileIOWriterTest, TableWriterRecoverAndWrite) { + std::vector measurement_schemas; + measurement_schemas.push_back(new MeasurementSchema("device", STRING)); + measurement_schemas.push_back(new MeasurementSchema("value", DOUBLE)); + std::vector column_categories = {ColumnCategory::TAG, + ColumnCategory::FIELD}; + TableSchema table_schema("test_table", measurement_schemas, + column_categories); + + WriteFile write_file; + write_file.create(file_name_, GetWriteCreateFlags(), 0666); + TsFileTableWriter table_writer(&write_file, &table_schema); + const std::string table_name = "test_table"; + + { + Tablet tablet(table_schema.get_measurement_names(), + table_schema.get_data_types(), 10); + tablet.set_table_name(table_name); + for (int i = 0; i < 10; i++) { + tablet.add_timestamp(i, static_cast(i)); + tablet.add_value(i, "device", "device0"); + tablet.add_value(i, "value", i * 1.1); + } + ASSERT_EQ(table_writer.write_table(tablet), E_OK); + ASSERT_EQ(table_writer.flush(), E_OK); + } + { + Tablet tablet(table_schema.get_measurement_names(), + table_schema.get_data_types(), 10); + tablet.set_table_name(table_name); + for (int i = 0; i < 10; i++) { + tablet.add_timestamp(i, static_cast(i + 10)); + tablet.add_value(i, "device", "device1"); + tablet.add_value(i, "value", i * 1.1); + } + ASSERT_EQ(table_writer.write_table(tablet), E_OK); + ASSERT_EQ(table_writer.flush(), E_OK); + } + + table_writer.close(); + write_file.close(); + + CorruptCurrentFileTail(3); + + RestorableTsFileIOWriter rw; + ASSERT_EQ(rw.open(file_name_, true), E_OK); + ASSERT_TRUE(rw.can_write()); + + TsFileTableWriter table_writer2(&rw); + std::vector value_col = {"__level1", "value"}; + std::vector value_types = {STRING, DOUBLE}; + { + Tablet tablet2(value_col, value_types, 10); + tablet2.set_table_name(table_name); + for (int i = 0; i < 10; i++) { + tablet2.add_timestamp(i, static_cast(i + 20)); + tablet2.add_value(i, "__level1", "device0"); + tablet2.add_value(i, "value", (i + 10) * 1.1); + } + ASSERT_EQ(table_writer2.write_table(tablet2), E_OK); + table_writer2.flush(); + } + { + Tablet tablet2(value_col, value_types, 10); + tablet2.set_table_name(table_name); + for (int i = 0; i < 10; i++) { + tablet2.add_timestamp(i, static_cast(i + 30)); + tablet2.add_value(i, "__level1", "device1"); + tablet2.add_value(i, "value", (i + 10) * 1.1); + } + ASSERT_EQ(table_writer2.write_table(tablet2), E_OK); + table_writer2.flush(); + } + + table_writer2.close(); + + TsFileReader table_reader; + ASSERT_EQ(table_reader.open(file_name_), E_OK); + ResultSet* tmp_result_set = nullptr; + table_reader.query("test_table", {"__level1", "value"}, 0, 10000, + tmp_result_set, nullptr); + auto* table_result_set = static_cast(tmp_result_set); + bool has_next = false; + int64_t row_num = 0; + while (IS_SUCC(table_result_set->next(has_next)) && has_next) { + (void)table_result_set->get_row_record(); + row_num++; + } + ASSERT_EQ(row_num, 40); + table_reader.destroy_query_data_set(tmp_result_set); + table_reader.close(); +} diff --git a/cpp/test/file/write_file_test.cc b/cpp/test/file/write_file_test.cc index 1345b7bee..3cb9edd25 100644 --- a/cpp/test/file/write_file_test.cc +++ b/cpp/test/file/write_file_test.cc @@ -112,3 +112,32 @@ TEST_F(WriteFileTest, CloseFile) { EXPECT_EQ(write_file.write(content, content_len), E_OK); EXPECT_EQ(write_file.close(), E_OK); } + +// Truncate file to a given size (used by RestorableTsFileIOWriter after +// recovery). +TEST_F(WriteFileTest, TruncateFile) { + WriteFile write_file; + std::string file_name = "test_file_truncate.dat"; + + remove(file_name.c_str()); + + int flags = O_RDWR | O_CREAT | O_TRUNC; +#ifdef _WIN32 + flags |= O_BINARY; +#endif + mode_t mode = 0666; + EXPECT_EQ(write_file.create(file_name, flags, mode), E_OK); + EXPECT_TRUE(write_file.file_opened()); + + const char* content = "Hello, Truncate World!"; + uint32_t content_len = strlen(content); + EXPECT_EQ(write_file.write(content, content_len), E_OK); + EXPECT_EQ(write_file.truncate(7), E_OK); + write_file.close(); + + std::ifstream file(file_name); + std::string file_content((std::istreambuf_iterator(file)), + std::istreambuf_iterator()); + EXPECT_EQ(file_content, "Hello, "); + remove(file_name.c_str()); +} diff --git a/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc b/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc index ffcaa20fa..aa4ff2544 100644 --- a/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc +++ b/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc @@ -307,9 +307,12 @@ TEST_F(TsFileTreeReaderTest, ExtendedRowsAndColumnsTest) { } const int NUM_ROWS = 100; + int start_time = 0, end_time = -1; for (int row = 0; row < NUM_ROWS; ++row) { for (const auto& device_id : device_ids) { - TsRecord record(device_id, row * 1000); + int timestamp = row * 1000; + TsRecord record(device_id, timestamp); + end_time = timestamp; for (size_t i = 0; i < measurement_ids.size(); ++i) { switch (data_types[i]) { case INT64: @@ -342,6 +345,18 @@ TEST_F(TsFileTreeReaderTest, ExtendedRowsAndColumnsTest) { TsFileTreeReader reader; reader.open(file_name_); + auto device_timeseries_map = reader.get_timeseries_metadata(); + ASSERT_EQ(device_timeseries_map.size(), device_ids.size()); + auto device_timeseries = device_timeseries_map.at( + std::make_shared(device_ids[0])); + ASSERT_EQ(device_timeseries.size(), measurement_ids.size()); + ASSERT_EQ( + device_timeseries[0]->get_measurement_name().to_std_string(), + *std::min_element(measurement_ids.begin(), measurement_ids.end())); + ASSERT_EQ(device_timeseries[0]->get_statistic()->start_time_, start_time); + ASSERT_EQ(device_timeseries[0]->get_statistic()->end_time_, end_time); + ASSERT_EQ(device_timeseries[0]->get_statistic()->count_, NUM_ROWS); + // Verify get_all_device_ids / get_all_devices auto read_device_ids = reader.get_all_device_ids(); ASSERT_EQ(read_device_ids.size(), device_ids.size()); for (size_t i = 0; i < device_ids.size(); ++i) { diff --git a/cpp/test/reader/tsfile_reader_test.cc b/cpp/test/reader/tsfile_reader_test.cc index b426d7ec7..54127e072 100644 --- a/cpp/test/reader/tsfile_reader_test.cc +++ b/cpp/test/reader/tsfile_reader_test.cc @@ -198,7 +198,7 @@ TEST_F(TsFileReaderTest, GetAllDevice) { } TEST_F(TsFileReaderTest, GetTimeseriesSchema) { - std::vector device_path = {"device", "device.ln"}; + std::vector device_path = {"device.ln1", "device.ln2 "}; std::vector measurement_name = {"temperature", "humidity"}; common::TSDataType data_type = common::TSDataType::INT32; common::TSEncoding encoding = common::TSEncoding::PLAIN; @@ -236,6 +236,31 @@ TEST_F(TsFileReaderTest, GetTimeseriesSchema) { measurement_schemas); ASSERT_EQ(measurement_schemas[1].measurement_name_, measurement_name[1]); ASSERT_EQ(measurement_schemas[1].data_type_, TSDataType::INT32); + + std::vector> one_device = { + std::make_shared(device_path[0])}; + auto one_meta = reader.get_timeseries_metadata(one_device); + ASSERT_EQ(one_meta.size(), 1u); + auto timeseries_list = one_meta.begin()->second; + ASSERT_EQ(timeseries_list.size(), 1u); + ASSERT_EQ(timeseries_list[0]->get_measurement_name().to_std_string(), + measurement_name[0]); + ASSERT_EQ(timeseries_list[0]->get_statistic()->start_time_, 1622505600000); + ASSERT_EQ(timeseries_list[0]->get_statistic()->end_time_, 1622505600000); + ASSERT_EQ(timeseries_list[0]->get_statistic()->count_, 1); + + auto device_timeseries_map = reader.get_timeseries_metadata(); + ASSERT_EQ(device_timeseries_map.size(), 2u); + auto device_timeseries_1 = device_timeseries_map.at( + std::make_shared(device_path[1])); + ASSERT_EQ(device_timeseries_1.size(), 1u); + ASSERT_EQ(device_timeseries_1[0]->get_measurement_name().to_std_string(), + measurement_name[1]); + ASSERT_EQ(device_timeseries_1[0]->get_statistic()->start_time_, + 1622505600000); + ASSERT_EQ(device_timeseries_1[0]->get_statistic()->end_time_, + 1622505600000); + ASSERT_EQ(device_timeseries_1[0]->get_statistic()->count_, 1); reader.close(); } diff --git a/cpp/test/writer/tsfile_writer_test.cc b/cpp/test/writer/tsfile_writer_test.cc index 25684e726..30fded6eb 100644 --- a/cpp/test/writer/tsfile_writer_test.cc +++ b/cpp/test/writer/tsfile_writer_test.cc @@ -115,11 +115,6 @@ class TsFileWriterTest : public ::testing::Test { class TsFileWriterTestSimple : public ::testing::Test {}; -TEST_F(TsFileWriterTestSimple, InitWithNullWriteFile) { - TsFileWriter writer; - ASSERT_EQ(writer.init(nullptr), E_INVALID_ARG); -} - TEST_F(TsFileWriterTest, WriteDiffDataType) { std::string device_name = "test_table"; common::TSEncoding encoding = common::TSEncoding::PLAIN;