From 77f22079087bd06150feb19935431a3be75bb992 Mon Sep 17 00:00:00 2001
From: 761417898 <761417898@qq.com>
Date: Mon, 9 Feb 2026 10:20:10 +0800
Subject: [PATCH 01/23] Implement interface get_all_timeseries_metadata for
Retrieve metadata for all timeseries in the file
---
cpp/src/reader/tsfile_reader.cc | 41 ++++++++++++++++++-
cpp/src/reader/tsfile_reader.h | 27 ++++++++++++
cpp/src/reader/tsfile_tree_reader.cc | 13 ++++++
cpp/src/reader/tsfile_tree_reader.h | 28 +++++++++++++
.../tree_view/tsfile_reader_tree_test.cc | 21 +++++++++-
cpp/test/reader/tsfile_reader_test.cc | 23 ++++++++++-
6 files changed, 150 insertions(+), 3 deletions(-)
diff --git a/cpp/src/reader/tsfile_reader.cc b/cpp/src/reader/tsfile_reader.cc
index f97570885..1556d8cb0 100644
--- a/cpp/src/reader/tsfile_reader.cc
+++ b/cpp/src/reader/tsfile_reader.cc
@@ -29,7 +29,9 @@ namespace storage {
TsFileReader::TsFileReader()
: read_file_(nullptr),
tsfile_executor_(nullptr),
- table_query_executor_(nullptr) {}
+ table_query_executor_(nullptr) {
+ tsfile_reader_meta_pa_.init(512, MOD_TSFILE_READER);
+}
TsFileReader::~TsFileReader() { close(); }
@@ -291,6 +293,27 @@ int TsFileReader::get_timeseries_schema(
return E_OK;
}
+int TsFileReader::get_timeseries_metadata(
+ std::shared_ptr device_id,
+ std::vector>& result) {
+ int ret = E_OK;
+ std::vector timeseries_indexs;
+ tsfile_reader_meta_pa_.init(512, MOD_TSFILE_READER);
+ auto noop_deleter = [](ITimeseriesIndex*) {};
+ if (RET_FAIL(
+ tsfile_executor_->get_tsfile_io_reader()
+ ->get_device_timeseries_meta_without_chunk_meta(
+ device_id, timeseries_indexs, tsfile_reader_meta_pa_))) {
+ } else {
+ for (auto timeseries_index : timeseries_indexs) {
+ // wrap raw pointer with shared_ptr + noop deleter
+ result.emplace_back(std::shared_ptr(
+ timeseries_index, noop_deleter));
+ }
+ }
+ return ret;
+}
+
ResultSet* TsFileReader::read_timeseries(
const std::shared_ptr& device_id,
const std::vector& measurement_name) {
@@ -320,4 +343,20 @@ TsFileReader::get_all_table_schemas() {
return table_schemas;
}
+int TsFileReader::get_all_timeseries_metadata(
+ std::map,
+ std::vector>, IDeviceIDComparator>& result) {
+ auto device_ids = this->get_all_device_ids();
+ int ret = E_OK;
+ for (const auto& device_id : device_ids) {
+ std::vector> timeseries_list;
+ if (RET_SUCC(this->get_timeseries_metadata(device_id, timeseries_list))) {
+ result.insert(std::make_pair(device_id, timeseries_list));
+ } else {
+ break;
+ }
+ }
+ return ret;
+}
+
} // namespace storage
diff --git a/cpp/src/reader/tsfile_reader.h b/cpp/src/reader/tsfile_reader.h
index 8a6ba2264..565cf651e 100644
--- a/cpp/src/reader/tsfile_reader.h
+++ b/cpp/src/reader/tsfile_reader.h
@@ -152,6 +152,22 @@ class TsFileReader {
*/
int get_timeseries_schema(std::shared_ptr device_id,
std::vector& result);
+
+ /**
+ * @brief Retrieve metadata for all timeseries in the file
+ *
+ * Scans the entire TsFile to collect all timeseries index information
+ * organized by device ID. Returns a map where each device ID maps to
+ * a list of its timeseries indices.
+ *
+ * @param [out] result Map to receive timeseries metadata:
+ * std::shared_ptr ->
+ * std::vector>;
+ * @return 0 on success, non-zero error code on failure
+ */
+ int get_all_timeseries_metadata(
+ std::map,
+ std::vector>, IDeviceIDComparator>& result);
/**
* @brief get the table schema by the table name
*
@@ -167,6 +183,16 @@ class TsFileReader {
*/
std::vector> get_all_table_schemas();
+ /**
+ * @brief Retrieve metadata for all timeseries under a specific device
+ *
+ * @param [in] device_id Device identifier to query
+ * @param [out] result List to receive timeseries metadata for the device
+ * @return 0 on success, non-zero error code on failure
+ */
+ int get_timeseries_metadata(std::shared_ptr device_id,
+ std::vector>& result);
+
private:
int get_all_devices(std::vector>& device_ids,
std::shared_ptr index_node,
@@ -174,6 +200,7 @@ class TsFileReader {
storage::ReadFile* read_file_;
storage::TsFileExecutor* tsfile_executor_;
storage::TableQueryExecutor* table_query_executor_;
+ common::PageArena tsfile_reader_meta_pa_;
};
} // namespace storage
diff --git a/cpp/src/reader/tsfile_tree_reader.cc b/cpp/src/reader/tsfile_tree_reader.cc
index 2b28c8647..459e46a8a 100644
--- a/cpp/src/reader/tsfile_tree_reader.cc
+++ b/cpp/src/reader/tsfile_tree_reader.cc
@@ -68,4 +68,17 @@ std::vector TsFileTreeReader::get_all_device_ids() {
return ret_device_ids;
}
+int TsFileTreeReader::get_timeseries_metadata(
+ std::shared_ptr device_id,
+ std::vector>& result) {
+ return tsfile_reader_->get_timeseries_metadata(std::move(device_id), result);
+}
+
+int TsFileTreeReader::get_all_timeseries_metadata(
+ std::map,
+ std::vector>,
+ IDeviceIDComparator>& result) {
+ return tsfile_reader_->get_all_timeseries_metadata(result);
+}
+
} // namespace storage
\ No newline at end of file
diff --git a/cpp/src/reader/tsfile_tree_reader.h b/cpp/src/reader/tsfile_tree_reader.h
index 66341b7ed..19f62724b 100644
--- a/cpp/src/reader/tsfile_tree_reader.h
+++ b/cpp/src/reader/tsfile_tree_reader.h
@@ -97,6 +97,34 @@ class TsFileTreeReader {
*/
std::vector get_all_device_ids();
+ /**
+ * @brief Retrieve metadata for all timeseries under a specific device
+ *
+ * @param [in] device_id Device identifier to query
+ * @param [out] result List to receive timeseries metadata for the device
+ * @return 0 on success, non-zero error code on failure
+ */
+ int get_timeseries_metadata(
+ std::shared_ptr device_id,
+ std::vector>& result);
+
+ /**
+ * @brief Retrieve metadata for all timeseries in the file
+ *
+ * Scans the entire TsFile to collect all timeseries index information
+ * organized by device ID. Returns a map where each device ID maps to
+ * a list of its timeseries indices.
+ *
+ * @param [out] result Map to receive timeseries metadata:
+ * std::shared_ptr ->
+ * std::vector>;
+ * @return 0 on success, non-zero error code on failure
+ */
+ int get_all_timeseries_metadata(
+ std::map,
+ std::vector>,
+ IDeviceIDComparator>& result);
+
private:
std::shared_ptr
tsfile_reader_; ///< Underlying TsFile reader implementation
diff --git a/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc b/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc
index ffcaa20fa..7dd6a8877 100644
--- a/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc
+++ b/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc
@@ -307,9 +307,12 @@ TEST_F(TsFileTreeReaderTest, ExtendedRowsAndColumnsTest) {
}
const int NUM_ROWS = 100;
+ int start_time = 0, end_time = -1;
for (int row = 0; row < NUM_ROWS; ++row) {
for (const auto& device_id : device_ids) {
- TsRecord record(device_id, row * 1000);
+ int timestamp = row * 1000;
+ TsRecord record(device_id, timestamp);
+ end_time = timestamp;
for (size_t i = 0; i < measurement_ids.size(); ++i) {
switch (data_types[i]) {
case INT64:
@@ -342,6 +345,22 @@ TEST_F(TsFileTreeReaderTest, ExtendedRowsAndColumnsTest) {
TsFileTreeReader reader;
reader.open(file_name_);
+ std::map,
+ std::vector>,
+ IDeviceIDComparator>
+ device_timeseries_map;
+ ASSERT_EQ(reader.get_all_timeseries_metadata(device_timeseries_map), E_OK);
+ ASSERT_EQ(device_timeseries_map.size(), device_ids.size());
+ auto device_timeseries = device_timeseries_map.at(std::make_shared(device_ids[0]));
+ ASSERT_EQ(device_timeseries.size(), measurement_ids.size());
+ ASSERT_EQ(
+ device_timeseries[0]->get_measurement_name().to_std_string(),
+ *std::min_element(measurement_ids.begin(), measurement_ids.end()));
+ ASSERT_EQ(device_timeseries[0]->get_statistic()->start_time_,
+ start_time);
+ ASSERT_EQ(device_timeseries[0]->get_statistic()->end_time_, end_time);
+ ASSERT_EQ(device_timeseries[0]->get_statistic()->count_, NUM_ROWS);
+ // Verify get_all_device_ids
auto read_device_ids = reader.get_all_device_ids();
ASSERT_EQ(read_device_ids.size(), device_ids.size());
for (size_t i = 0; i < device_ids.size(); ++i) {
diff --git a/cpp/test/reader/tsfile_reader_test.cc b/cpp/test/reader/tsfile_reader_test.cc
index 6d4edd5a3..824eca3e5 100644
--- a/cpp/test/reader/tsfile_reader_test.cc
+++ b/cpp/test/reader/tsfile_reader_test.cc
@@ -197,7 +197,7 @@ TEST_F(TsFileReaderTest, GetAllDevice) {
}
TEST_F(TsFileReaderTest, GetTimeseriesSchema) {
- std::vector device_path = {"device", "device.ln"};
+ std::vector device_path = {"device.ln1", "device.ln2 "};
std::vector measurement_name = {"temperature", "humidity"};
common::TSDataType data_type = common::TSDataType::INT32;
common::TSEncoding encoding = common::TSEncoding::PLAIN;
@@ -235,5 +235,26 @@ TEST_F(TsFileReaderTest, GetTimeseriesSchema) {
measurement_schemas);
ASSERT_EQ(measurement_schemas[1].measurement_name_, measurement_name[1]);
ASSERT_EQ(measurement_schemas[1].data_type_, TSDataType::INT32);
+
+ std::vector> timeseries_list;
+ ASSERT_EQ(reader.get_timeseries_metadata(std::make_shared(device_path[0]),
+ timeseries_list), E_OK);
+ ASSERT_EQ(timeseries_list.size(), 1);
+ ASSERT_EQ(timeseries_list[0]->get_measurement_name().to_std_string(), measurement_name[0]);
+ ASSERT_EQ(timeseries_list[0]->get_statistic()->start_time_, 1622505600000);
+ ASSERT_EQ(timeseries_list[0]->get_statistic()->end_time_, 1622505600000);
+ ASSERT_EQ(timeseries_list[0]->get_statistic()->count_, 1);
+
+ std::map,
+ std::vector>, IDeviceIDComparator>
+ device_timeseries_map;
+ ASSERT_EQ(reader.get_all_timeseries_metadata(device_timeseries_map), E_OK);
+ ASSERT_EQ(device_timeseries_map.size(), 2);
+ auto device_timeseries_1 = device_timeseries_map.at(std::make_shared(device_path[1]));
+ ASSERT_EQ(device_timeseries_1.size(), 1);
+ ASSERT_EQ(device_timeseries_1[0]->get_measurement_name().to_std_string(), measurement_name[1]);
+ ASSERT_EQ(device_timeseries_1[0]->get_statistic()->start_time_, 1622505600000);
+ ASSERT_EQ(device_timeseries_1[0]->get_statistic()->end_time_, 1622505600000);
+ ASSERT_EQ(device_timeseries_1[0]->get_statistic()->count_, 1);
reader.close();
}
From fa3470e81b52c5cc0a2d81cd5ef32d9bcd4f9320 Mon Sep 17 00:00:00 2001
From: 761417898 <761417898@qq.com>
Date: Mon, 9 Feb 2026 10:33:29 +0800
Subject: [PATCH 02/23] mvn spotless apply
---
cpp/src/reader/tsfile_reader.cc | 8 ++++---
cpp/src/reader/tsfile_reader.h | 8 ++++---
cpp/src/reader/tsfile_tree_reader.cc | 3 ++-
.../tree_view/tsfile_reader_tree_test.cc | 6 ++---
cpp/test/reader/tsfile_reader_test.cc | 24 ++++++++++++-------
5 files changed, 31 insertions(+), 18 deletions(-)
diff --git a/cpp/src/reader/tsfile_reader.cc b/cpp/src/reader/tsfile_reader.cc
index 1556d8cb0..90e4c89cf 100644
--- a/cpp/src/reader/tsfile_reader.cc
+++ b/cpp/src/reader/tsfile_reader.cc
@@ -344,13 +344,15 @@ TsFileReader::get_all_table_schemas() {
}
int TsFileReader::get_all_timeseries_metadata(
- std::map,
- std::vector>, IDeviceIDComparator>& result) {
+ std::map,
+ std::vector>,
+ IDeviceIDComparator>& result) {
auto device_ids = this->get_all_device_ids();
int ret = E_OK;
for (const auto& device_id : device_ids) {
std::vector> timeseries_list;
- if (RET_SUCC(this->get_timeseries_metadata(device_id, timeseries_list))) {
+ if (RET_SUCC(
+ this->get_timeseries_metadata(device_id, timeseries_list))) {
result.insert(std::make_pair(device_id, timeseries_list));
} else {
break;
diff --git a/cpp/src/reader/tsfile_reader.h b/cpp/src/reader/tsfile_reader.h
index 565cf651e..1b1b4cc52 100644
--- a/cpp/src/reader/tsfile_reader.h
+++ b/cpp/src/reader/tsfile_reader.h
@@ -167,7 +167,8 @@ class TsFileReader {
*/
int get_all_timeseries_metadata(
std::map,
- std::vector>, IDeviceIDComparator>& result);
+ std::vector>,
+ IDeviceIDComparator>& result);
/**
* @brief get the table schema by the table name
*
@@ -190,8 +191,9 @@ class TsFileReader {
* @param [out] result List to receive timeseries metadata for the device
* @return 0 on success, non-zero error code on failure
*/
- int get_timeseries_metadata(std::shared_ptr device_id,
- std::vector>& result);
+ int get_timeseries_metadata(
+ std::shared_ptr device_id,
+ std::vector>& result);
private:
int get_all_devices(std::vector>& device_ids,
diff --git a/cpp/src/reader/tsfile_tree_reader.cc b/cpp/src/reader/tsfile_tree_reader.cc
index 459e46a8a..dadeb8012 100644
--- a/cpp/src/reader/tsfile_tree_reader.cc
+++ b/cpp/src/reader/tsfile_tree_reader.cc
@@ -71,7 +71,8 @@ std::vector TsFileTreeReader::get_all_device_ids() {
int TsFileTreeReader::get_timeseries_metadata(
std::shared_ptr device_id,
std::vector>& result) {
- return tsfile_reader_->get_timeseries_metadata(std::move(device_id), result);
+ return tsfile_reader_->get_timeseries_metadata(std::move(device_id),
+ result);
}
int TsFileTreeReader::get_all_timeseries_metadata(
diff --git a/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc b/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc
index 7dd6a8877..dc79b0457 100644
--- a/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc
+++ b/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc
@@ -351,13 +351,13 @@ TEST_F(TsFileTreeReaderTest, ExtendedRowsAndColumnsTest) {
device_timeseries_map;
ASSERT_EQ(reader.get_all_timeseries_metadata(device_timeseries_map), E_OK);
ASSERT_EQ(device_timeseries_map.size(), device_ids.size());
- auto device_timeseries = device_timeseries_map.at(std::make_shared(device_ids[0]));
+ auto device_timeseries = device_timeseries_map.at(
+ std::make_shared(device_ids[0]));
ASSERT_EQ(device_timeseries.size(), measurement_ids.size());
ASSERT_EQ(
device_timeseries[0]->get_measurement_name().to_std_string(),
*std::min_element(measurement_ids.begin(), measurement_ids.end()));
- ASSERT_EQ(device_timeseries[0]->get_statistic()->start_time_,
- start_time);
+ ASSERT_EQ(device_timeseries[0]->get_statistic()->start_time_, start_time);
ASSERT_EQ(device_timeseries[0]->get_statistic()->end_time_, end_time);
ASSERT_EQ(device_timeseries[0]->get_statistic()->count_, NUM_ROWS);
// Verify get_all_device_ids
diff --git a/cpp/test/reader/tsfile_reader_test.cc b/cpp/test/reader/tsfile_reader_test.cc
index 824eca3e5..c67efcc93 100644
--- a/cpp/test/reader/tsfile_reader_test.cc
+++ b/cpp/test/reader/tsfile_reader_test.cc
@@ -237,24 +237,32 @@ TEST_F(TsFileReaderTest, GetTimeseriesSchema) {
ASSERT_EQ(measurement_schemas[1].data_type_, TSDataType::INT32);
std::vector> timeseries_list;
- ASSERT_EQ(reader.get_timeseries_metadata(std::make_shared(device_path[0]),
- timeseries_list), E_OK);
+ ASSERT_EQ(reader.get_timeseries_metadata(
+ std::make_shared(device_path[0]),
+ timeseries_list),
+ E_OK);
ASSERT_EQ(timeseries_list.size(), 1);
- ASSERT_EQ(timeseries_list[0]->get_measurement_name().to_std_string(), measurement_name[0]);
+ ASSERT_EQ(timeseries_list[0]->get_measurement_name().to_std_string(),
+ measurement_name[0]);
ASSERT_EQ(timeseries_list[0]->get_statistic()->start_time_, 1622505600000);
ASSERT_EQ(timeseries_list[0]->get_statistic()->end_time_, 1622505600000);
ASSERT_EQ(timeseries_list[0]->get_statistic()->count_, 1);
std::map,
- std::vector>, IDeviceIDComparator>
+ std::vector>,
+ IDeviceIDComparator>
device_timeseries_map;
ASSERT_EQ(reader.get_all_timeseries_metadata(device_timeseries_map), E_OK);
ASSERT_EQ(device_timeseries_map.size(), 2);
- auto device_timeseries_1 = device_timeseries_map.at(std::make_shared(device_path[1]));
+ auto device_timeseries_1 = device_timeseries_map.at(
+ std::make_shared(device_path[1]));
ASSERT_EQ(device_timeseries_1.size(), 1);
- ASSERT_EQ(device_timeseries_1[0]->get_measurement_name().to_std_string(), measurement_name[1]);
- ASSERT_EQ(device_timeseries_1[0]->get_statistic()->start_time_, 1622505600000);
- ASSERT_EQ(device_timeseries_1[0]->get_statistic()->end_time_, 1622505600000);
+ ASSERT_EQ(device_timeseries_1[0]->get_measurement_name().to_std_string(),
+ measurement_name[1]);
+ ASSERT_EQ(device_timeseries_1[0]->get_statistic()->start_time_,
+ 1622505600000);
+ ASSERT_EQ(device_timeseries_1[0]->get_statistic()->end_time_,
+ 1622505600000);
ASSERT_EQ(device_timeseries_1[0]->get_statistic()->count_, 1);
reader.close();
}
From 311b9c1ea06cac68483d770283126ee638f3606d Mon Sep 17 00:00:00 2001
From: 761417898 <761417898@qq.com>
Date: Mon, 9 Feb 2026 17:54:18 +0800
Subject: [PATCH 03/23] Implement RestorableTsFileIOWriter
---
cpp/src/file/restorable_tsfile_io_writer.cc | 517 ++++++++++++++++++
cpp/src/file/restorable_tsfile_io_writer.h | 117 ++++
cpp/src/file/tsfile_io_writer.h | 6 +
cpp/src/file/write_file.cc | 36 +-
cpp/src/file/write_file.h | 2 +
cpp/src/writer/tsfile_writer.cc | 45 +-
cpp/src/writer/tsfile_writer.h | 3 +
.../file/restorable_tsfile_io_writer_test.cc | 323 +++++++++++
cpp/test/file/write_file_test.cc | 27 +
cpp/test/writer/tsfile_writer_test.cc | 5 -
10 files changed, 1072 insertions(+), 9 deletions(-)
create mode 100644 cpp/src/file/restorable_tsfile_io_writer.cc
create mode 100644 cpp/src/file/restorable_tsfile_io_writer.h
create mode 100644 cpp/test/file/restorable_tsfile_io_writer_test.cc
diff --git a/cpp/src/file/restorable_tsfile_io_writer.cc b/cpp/src/file/restorable_tsfile_io_writer.cc
new file mode 100644
index 000000000..8b44e803c
--- /dev/null
+++ b/cpp/src/file/restorable_tsfile_io_writer.cc
@@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "file/restorable_tsfile_io_writer.h"
+
+#include
+
+#include
+#include
+#include
+
+#include "common/allocator/byte_stream.h"
+#include "common/device_id.h"
+#include "common/statistic.h"
+#include "utils/errno_define.h"
+
+#ifdef _WIN32
+#include
+#include
+#include
+ssize_t pread(int fd, void* buf, size_t count, uint64_t offset);
+#else
+#include
+#include
+#endif
+
+using namespace common;
+
+namespace storage {
+
+namespace {
+
+const int HEADER_LEN = MAGIC_STRING_TSFILE_LEN + 1; // magic + version
+const int BUF_SIZE = 4096;
+
+/**
+ * Lightweight read-only file handle for self-check only.
+ * Use init_from_fd() when WriteFile is already open to avoid opening the file
+ * twice (fixes Windows file sharing and ensures we read the same content).
+ */
+struct SelfCheckReader {
+ int fd_;
+ int32_t file_size_;
+ bool own_fd_; // if false, do not close fd_
+
+ SelfCheckReader() : fd_(-1), file_size_(-1), own_fd_(true) {}
+
+ int init_from_fd(int fd) {
+ fd_ = fd;
+ own_fd_ = false;
+ if (fd_ < 0) {
+ return E_FILE_OPEN_ERR;
+ }
+#ifdef _WIN32
+ struct _stat st;
+ if (_fstat(fd_, &st) < 0) {
+ return E_FILE_STAT_ERR;
+ }
+ file_size_ = static_cast(st.st_size);
+#else
+ struct stat st;
+ if (fstat(fd_, &st) < 0) {
+ return E_FILE_STAT_ERR;
+ }
+ file_size_ = static_cast(st.st_size);
+#endif
+ return E_OK;
+ }
+
+ int open(const std::string& path) {
+#ifdef _WIN32
+ fd_ = ::_open(path.c_str(), _O_RDONLY | _O_BINARY);
+#else
+ fd_ = ::open(path.c_str(), O_RDONLY);
+#endif
+ if (fd_ < 0) {
+ return E_FILE_OPEN_ERR;
+ }
+ own_fd_ = true;
+#ifdef _WIN32
+ struct _stat st;
+ if (_fstat(fd_, &st) < 0) {
+ close();
+ return E_FILE_STAT_ERR;
+ }
+ file_size_ = static_cast(st.st_size);
+#else
+ struct stat st;
+ if (fstat(fd_, &st) < 0) {
+ close();
+ return E_FILE_STAT_ERR;
+ }
+ file_size_ = static_cast(st.st_size);
+#endif
+ return E_OK;
+ }
+
+ void close() {
+ if (own_fd_ && fd_ >= 0) {
+#ifdef _WIN32
+ ::_close(fd_);
+#else
+ ::close(fd_);
+#endif
+ }
+ fd_ = -1;
+ file_size_ = -1;
+ }
+
+ int32_t file_size() const { return file_size_; }
+
+ int read(int32_t offset, char* buf, int32_t buf_size, int32_t& read_len) {
+ read_len = 0;
+ if (fd_ < 0) {
+ return E_FILE_READ_ERR;
+ }
+ ssize_t n = ::pread(fd_, buf, buf_size, offset);
+ if (n < 0) {
+ return E_FILE_READ_ERR;
+ }
+ read_len = static_cast(n);
+ return E_OK;
+ }
+};
+
+#ifdef _WIN32
+ssize_t pread(int fd, void* buf, size_t count, uint64_t offset) {
+ DWORD read_bytes = 0;
+ OVERLAPPED ov = {};
+ ov.OffsetHigh = (DWORD)((offset >> 32) & 0xFFFFFFFF);
+ ov.Offset = (DWORD)(offset & 0xFFFFFFFF);
+ HANDLE h = (HANDLE)_get_osfhandle(fd);
+ if (!ReadFile(h, buf, (DWORD)count, &read_bytes, &ov)) {
+ if (GetLastError() != ERROR_HANDLE_EOF) {
+ return -1;
+ }
+ }
+ return (ssize_t)read_bytes;
+}
+#endif
+
+static int parse_chunk_header_and_skip(SelfCheckReader& reader,
+ int64_t chunk_start,
+ int64_t& bytes_consumed,
+ ChunkHeader* header_out = nullptr) {
+ int32_t file_size = reader.file_size();
+ int32_t max_read = static_cast(
+ std::min(static_cast(BUF_SIZE), file_size - chunk_start));
+ if (max_read < ChunkHeader::MIN_SERIALIZED_SIZE) {
+ return E_TSFILE_CORRUPTED;
+ }
+
+ std::vector buf(max_read);
+ int32_t read_len = 0;
+ int ret = reader.read(static_cast(chunk_start), buf.data(),
+ max_read, read_len);
+ if (ret != E_OK || read_len < ChunkHeader::MIN_SERIALIZED_SIZE) {
+ return E_TSFILE_CORRUPTED;
+ }
+
+ ByteStream bs;
+ bs.wrap_from(buf.data(), read_len);
+
+ ChunkHeader header;
+ ret = header.deserialize_from(bs);
+ if (ret != E_OK) {
+ return E_TSFILE_CORRUPTED;
+ }
+
+ int header_len = bs.read_pos();
+ int64_t total = header_len + header.data_size_;
+ if (chunk_start + total > file_size) {
+ return E_TSFILE_CORRUPTED;
+ }
+
+ if (header_out != nullptr) {
+ *header_out = header;
+ }
+ bytes_consumed = total;
+ return E_OK;
+}
+
+} // namespace
+
+RestorableTsFileIOWriter::RestorableTsFileIOWriter()
+ : TsFileIOWriter(),
+ write_file_(nullptr),
+ write_file_owned_(false),
+ truncated_size_(-1),
+ crashed_(false),
+ can_write_(false) {
+ self_check_arena_.init(512, MOD_TSFILE_READER);
+}
+
+RestorableTsFileIOWriter::~RestorableTsFileIOWriter() { close(); }
+
+void RestorableTsFileIOWriter::close() {
+ if (write_file_owned_ && write_file_ != nullptr) {
+ write_file_->close();
+ delete write_file_;
+ write_file_ = nullptr;
+ write_file_owned_ = false;
+ }
+ self_check_arena_.destroy();
+}
+
+int RestorableTsFileIOWriter::open(const std::string& file_path,
+ bool truncate_corrupted) {
+ if (write_file_ != nullptr) {
+ return E_ALREADY_EXIST;
+ }
+
+ file_path_ = file_path;
+ write_file_ = new WriteFile();
+ write_file_owned_ = true;
+
+ // O_RDWR|O_CREAT without O_TRUNC: preserve existing file content
+#ifdef _WIN32
+ const int flags = O_RDWR | O_CREAT | O_BINARY;
+#else
+ const int flags = O_RDWR | O_CREAT;
+#endif
+ const mode_t mode = 0644;
+
+ int ret = write_file_->create(file_path_, flags, mode);
+ if (ret != E_OK) {
+ close();
+ return ret;
+ }
+
+ ret = self_check(truncate_corrupted);
+ if (ret != E_OK) {
+ close();
+ return ret;
+ }
+
+ return E_OK;
+}
+
+int RestorableTsFileIOWriter::self_check(bool truncate_corrupted) {
+ SelfCheckReader reader;
+ // Use a separate read-only handle for self-check - on Windows, sharing
+ // the O_RDWR fd can cause stale/cached reads for complete-file detection
+ int ret = reader.open(file_path_);
+ if (ret != E_OK) {
+ return ret;
+ }
+
+ int32_t file_size = reader.file_size();
+ if (file_size == 0) {
+ reader.close();
+ truncated_size_ = 0;
+ crashed_ = true;
+ can_write_ = true;
+ if (write_file_->seek_to_end() != E_OK) {
+ return E_FILE_READ_ERR;
+ }
+ ret = init(write_file_);
+ if (ret != E_OK) {
+ return ret;
+ }
+ ret = start_file();
+ if (ret != E_OK) {
+ return ret;
+ }
+ return E_OK;
+ }
+
+ if (file_size < HEADER_LEN) {
+ reader.close();
+ truncated_size_ = TSFILE_CHECK_INCOMPATIBLE;
+ return E_TSFILE_CORRUPTED;
+ }
+
+ char header_buf[HEADER_LEN];
+ int32_t read_len = 0;
+ ret = reader.read(0, header_buf, HEADER_LEN, read_len);
+ if (ret != E_OK || read_len != HEADER_LEN) {
+ reader.close();
+ truncated_size_ = TSFILE_CHECK_INCOMPATIBLE;
+ return E_TSFILE_CORRUPTED;
+ }
+
+ if (memcmp(header_buf, MAGIC_STRING_TSFILE, MAGIC_STRING_TSFILE_LEN) != 0) {
+ reader.close();
+ truncated_size_ = TSFILE_CHECK_INCOMPATIBLE;
+ return E_TSFILE_CORRUPTED;
+ }
+
+ if (header_buf[MAGIC_STRING_TSFILE_LEN] != VERSION_NUM_BYTE) {
+ reader.close();
+ truncated_size_ = TSFILE_CHECK_INCOMPATIBLE;
+ return E_TSFILE_CORRUPTED;
+ }
+
+ // Completeness check per Java isComplete(): only header+tail magic
+ // size >= MAGIC*2 + version_byte, tail magic equals head magic
+ bool is_complete = false;
+ if (file_size >= static_cast(MAGIC_STRING_TSFILE_LEN * 2 + 1)) {
+ char tail_buf[MAGIC_STRING_TSFILE_LEN];
+ ret = reader.read(file_size - MAGIC_STRING_TSFILE_LEN, tail_buf,
+ MAGIC_STRING_TSFILE_LEN, read_len);
+ if (ret == E_OK && read_len == MAGIC_STRING_TSFILE_LEN &&
+ memcmp(tail_buf, MAGIC_STRING_TSFILE, MAGIC_STRING_TSFILE_LEN) ==
+ 0) {
+ is_complete = true;
+ }
+ }
+
+ if (is_complete) {
+ reader.close();
+ truncated_size_ = TSFILE_CHECK_COMPLETE;
+ crashed_ = false;
+ can_write_ = false;
+ write_file_->close();
+ delete write_file_;
+ write_file_ = nullptr;
+ write_file_owned_ = false;
+ return E_OK;
+ }
+
+ int64_t truncated = HEADER_LEN;
+ int64_t pos = HEADER_LEN;
+ std::vector buf(BUF_SIZE);
+
+ // Recover schema and chunk group meta per Java selfCheck
+ std::shared_ptr cur_device_id;
+ ChunkGroupMeta* cur_cgm = nullptr;
+ std::vector recovered_cgm_list;
+
+ auto flush_chunk_group = [this, &cur_device_id, &cur_cgm,
+ &recovered_cgm_list]() {
+ if (cur_cgm != nullptr && cur_device_id != nullptr) {
+ get_schema()->update_table_schema(cur_cgm);
+ recovered_cgm_list.push_back(cur_cgm);
+ cur_cgm = nullptr;
+ }
+ };
+
+ while (pos < file_size) {
+ unsigned char marker;
+ ret = reader.read(static_cast(pos),
+ reinterpret_cast(&marker), 1, read_len);
+ if (ret != E_OK || read_len != 1) {
+ break;
+ }
+ pos += 1;
+
+ if (marker == static_cast(SEPARATOR_MARKER)) {
+ truncated = pos - 1;
+ flush_chunk_group();
+ break;
+ }
+
+ if (marker == static_cast(CHUNK_GROUP_HEADER_MARKER)) {
+ truncated = pos - 1;
+ flush_chunk_group();
+ int seg_len = 0;
+ ret = reader.read(static_cast(pos), buf.data(), BUF_SIZE,
+ read_len);
+ if (ret != E_OK || read_len < 1) {
+ break;
+ }
+ ByteStream bs;
+ bs.wrap_from(buf.data(), read_len);
+ cur_device_id = std::make_shared("init");
+ ret = cur_device_id->deserialize(bs);
+ if (ret != E_OK) {
+ break;
+ }
+ seg_len = bs.read_pos();
+ pos += seg_len;
+ cur_cgm = new (self_check_arena_.alloc(sizeof(ChunkGroupMeta)))
+ ChunkGroupMeta(&self_check_arena_);
+ cur_cgm->init(cur_device_id);
+ continue;
+ }
+
+ if (marker == static_cast(OPERATION_INDEX_RANGE)) {
+ truncated = pos - 1;
+ flush_chunk_group();
+ cur_device_id.reset();
+ if (pos + 2 * 8 > static_cast(file_size)) {
+ break;
+ }
+ char range_buf[16];
+ ret =
+ reader.read(static_cast(pos), range_buf, 16, read_len);
+ if (ret != E_OK || read_len != 16) {
+ break;
+ }
+ pos += 16;
+ truncated = pos;
+ continue;
+ }
+
+ if (marker == static_cast(CHUNK_HEADER_MARKER) ||
+ marker ==
+ static_cast(ONLY_ONE_PAGE_CHUNK_HEADER_MARKER) ||
+ (marker & 0x3F) ==
+ static_cast(CHUNK_HEADER_MARKER) ||
+ (marker & 0x3F) ==
+ static_cast(ONLY_ONE_PAGE_CHUNK_HEADER_MARKER)) {
+ int64_t chunk_start = pos - 1;
+ int64_t consumed = 0;
+ ChunkHeader chdr;
+ ret = parse_chunk_header_and_skip(reader, chunk_start, consumed,
+ &chdr);
+ if (ret != E_OK) {
+ break;
+ }
+ pos = chunk_start + consumed;
+ truncated = pos;
+ if (cur_cgm != nullptr) {
+ void* cm_buf = self_check_arena_.alloc(sizeof(ChunkMeta));
+ if (IS_NULL(cm_buf)) {
+ ret = common::E_OOM;
+ break;
+ }
+ auto* cm = new (cm_buf) ChunkMeta();
+ common::String mname;
+ mname.dup_from(chdr.measurement_name_, self_check_arena_);
+ Statistic* stat = StatisticFactory::alloc_statistic_with_pa(
+ static_cast(chdr.data_type_),
+ &self_check_arena_);
+ if (IS_NULL(stat)) {
+ ret = common::E_OOM;
+ break;
+ }
+ stat->reset();
+ cm->init(mname,
+ static_cast(chdr.data_type_),
+ chunk_start, stat, 0,
+ static_cast(chdr.encoding_type_),
+ static_cast(
+ chdr.compression_type_),
+ self_check_arena_);
+ cur_cgm->push(cm);
+ if (cur_device_id != nullptr &&
+ (static_cast(chdr.chunk_type_) & 0x80) !=
+ 0) {
+ aligned_devices_.insert(cur_device_id->get_table_name());
+ }
+ }
+ continue;
+ }
+
+ truncated_size_ = TSFILE_CHECK_INCOMPATIBLE;
+ flush_chunk_group();
+ reader.close();
+ return E_TSFILE_CORRUPTED;
+ }
+
+ flush_chunk_group();
+ reader.close();
+ truncated_size_ = truncated;
+
+ if (truncate_corrupted && truncated < static_cast(file_size)) {
+ ret = write_file_->truncate(truncated);
+ if (ret != E_OK) {
+ return ret;
+ }
+ }
+
+ if (write_file_->seek_to_end() != E_OK) {
+ return E_FILE_READ_ERR;
+ }
+
+ crashed_ = true;
+ can_write_ = true;
+
+ ret = init(write_file_);
+ if (ret != E_OK) {
+ return ret;
+ }
+
+ for (ChunkGroupMeta* cgm : recovered_cgm_list) {
+ push_chunk_group_meta(cgm);
+ }
+
+ return E_OK;
+}
+
+bool RestorableTsFileIOWriter::is_device_aligned(
+ const std::string& device) const {
+ return aligned_devices_.find(device) != aligned_devices_.end();
+}
+
+TsFileIOWriter* RestorableTsFileIOWriter::get_tsfile_io_writer() {
+ return can_write_ ? this : nullptr;
+}
+
+WriteFile* RestorableTsFileIOWriter::get_write_file() {
+ return can_write_ ? write_file_ : nullptr;
+}
+
+std::string RestorableTsFileIOWriter::get_file_path() const {
+ return file_path_;
+}
+
+} // namespace storage
diff --git a/cpp/src/file/restorable_tsfile_io_writer.h b/cpp/src/file/restorable_tsfile_io_writer.h
new file mode 100644
index 000000000..4f5296b1e
--- /dev/null
+++ b/cpp/src/file/restorable_tsfile_io_writer.h
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef FILE_RESTORABLE_TSFILE_IO_WRITER_H
+#define FILE_RESTORABLE_TSFILE_IO_WRITER_H
+
+#include
+#include
+#include
+
+#include "common/schema.h"
+#include "file/tsfile_io_writer.h"
+#include "file/write_file.h"
+
+namespace storage {
+
+/**
+ * TsFile check status constants for self-check result.
+ * COMPLETE_FILE (0): File is complete, no recovery needed.
+ * INCOMPATIBLE_FILE (-2): File is not in TsFile format.
+ */
+constexpr int64_t TSFILE_CHECK_COMPLETE = 0;
+constexpr int64_t TSFILE_CHECK_INCOMPATIBLE = -2;
+
+/**
+ * RestorableTsFileIOWriter opens and optionally recovers a TsFile.
+ * Inherits from TsFileIOWriter for continued writing after recovery.
+ *
+ * (1) If the TsFile is closed normally: has_crashed()=false, can_write()=false
+ *
+ * (2) If the TsFile is incomplete/crashed: has_crashed()=true,
+ * can_write()=true, the writer truncates corrupted data and allows continued
+ * writing.
+ *
+ * Uses standard C++11 and avoids memory leaks via RAII and smart pointers.
+ */
+class RestorableTsFileIOWriter : public TsFileIOWriter {
+ public:
+ RestorableTsFileIOWriter();
+ ~RestorableTsFileIOWriter();
+
+ // Non-copyable
+ RestorableTsFileIOWriter(const RestorableTsFileIOWriter&) = delete;
+ RestorableTsFileIOWriter& operator=(const RestorableTsFileIOWriter&) =
+ delete;
+
+ /**
+ * Open a TsFile for recovery/append.
+ * Uses O_RDWR|O_CREAT without O_TRUNC, so existing file content is
+ * preserved.
+ *
+ * @param file_path Path to the TsFile
+ * @param truncate_corrupted If true, truncate corrupted data. If false,
+ * do not truncate (incomplete file will remain as-is).
+ * @return E_OK on success, error code otherwise.
+ */
+ int open(const std::string& file_path, bool truncate_corrupted = true);
+
+ void close();
+
+ bool can_write() const { return can_write_; }
+ bool has_crashed() const { return crashed_; }
+ int64_t get_truncated_size() const { return truncated_size_; }
+ std::shared_ptr get_known_schema() { return get_schema(); }
+
+ /** True if the device was recovered as aligned (has time column). */
+ bool is_device_aligned(const std::string& device) const;
+
+ /**
+ * Get the TsFileIOWriter for continued writing. Only valid when
+ * can_write() is true. Returns this (since we inherit TsFileIOWriter).
+ */
+ TsFileIOWriter* get_tsfile_io_writer();
+
+ /**
+ * Get the WriteFile for TsFileWriter::init(). Only valid when can_write().
+ * Caller must not destroy the returned pointer.
+ */
+ WriteFile* get_write_file();
+
+ std::string get_file_path() const;
+
+ private:
+ int self_check(bool truncate_corrupted);
+
+ private:
+ std::string file_path_;
+ WriteFile* write_file_;
+ bool write_file_owned_;
+
+ int64_t truncated_size_;
+ bool crashed_;
+ bool can_write_;
+
+ std::set aligned_devices_;
+ common::PageArena self_check_arena_;
+};
+
+} // namespace storage
+
+#endif // FILE_RESTORABLE_TSFILE_IO_WRITER_H
diff --git a/cpp/src/file/tsfile_io_writer.h b/cpp/src/file/tsfile_io_writer.h
index 901b2a5b5..6d1ed2bdc 100644
--- a/cpp/src/file/tsfile_io_writer.h
+++ b/cpp/src/file/tsfile_io_writer.h
@@ -197,6 +197,12 @@ class TsFileIOWriter {
// for open file
void add_ts_time_index_entry(TimeseriesIndex& ts_index);
+ protected:
+ /** For RestorableTsFileIOWriter: add recovered chunk group meta. */
+ void push_chunk_group_meta(ChunkGroupMeta* cgm) {
+ chunk_group_meta_list_.push_back(cgm);
+ }
+
private:
common::PageArena meta_allocator_;
common::ByteStream write_stream_;
diff --git a/cpp/src/file/write_file.cc b/cpp/src/file/write_file.cc
index 004a3e9d4..e0ce619da 100644
--- a/cpp/src/file/write_file.cc
+++ b/cpp/src/file/write_file.cc
@@ -32,6 +32,7 @@
#include "utils/errno_define.h"
#ifdef _WIN32
+#include
int fsync(int);
#endif
@@ -118,7 +119,12 @@ int WriteFile::sync() {
}
int WriteFile::close() {
- ASSERT(fd_ > 0);
+ if (fd_ < 0) {
+#ifdef DEBUG_SE
+ std::cout << "file already closed, path=" << path_;
+#endif
+ return E_OK;
+ }
if (::close(fd_) < 0) {
#ifdef DEBUG_SE
std::cout << "failed to close " << path_ << " errorno " << errno
@@ -134,6 +140,34 @@ int WriteFile::close() {
return E_OK;
}
+int WriteFile::truncate(int64_t size) {
+ ASSERT(fd_ > 0);
+#ifdef _WIN32
+ if (_chsize_s(fd_, static_cast(size)) != 0) {
+ return E_FILE_WRITE_ERR;
+ }
+#else
+ if (::ftruncate(fd_, static_cast(size)) < 0) {
+ return E_FILE_WRITE_ERR;
+ }
+#endif
+ return E_OK;
+}
+
+int WriteFile::seek_to_end() {
+ ASSERT(fd_ > 0);
+#ifdef _WIN32
+ if (_lseeki64(fd_, 0, SEEK_END) < 0) {
+ return E_FILE_READ_ERR;
+ }
+#else
+ if (::lseek(fd_, 0, SEEK_END) < 0) {
+ return E_FILE_READ_ERR;
+ }
+#endif
+ return E_OK;
+}
+
} // end namespace storage
#ifdef _WIN32
diff --git a/cpp/src/file/write_file.h b/cpp/src/file/write_file.h
index 1c1f85118..85260a3be 100644
--- a/cpp/src/file/write_file.h
+++ b/cpp/src/file/write_file.h
@@ -45,6 +45,8 @@ class WriteFile {
// int flush() { return common::E_OK; } // TODO
int sync();
int close();
+ int truncate(int64_t size);
+ int seek_to_end();
FORCE_INLINE std::string get_file_path() { return path_; }
private:
diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc
index 2c2e46b97..f957bc276 100644
--- a/cpp/src/writer/tsfile_writer.cc
+++ b/cpp/src/writer/tsfile_writer.cc
@@ -23,6 +23,7 @@
#include "chunk_writer.h"
#include "common/config/config.h"
+#include "file/restorable_tsfile_io_writer.h"
#include "file/tsfile_io_writer.h"
#include "file/write_file.h"
#include "utils/errno_define.h"
@@ -68,7 +69,8 @@ TsFileWriter::TsFileWriter()
record_count_since_last_flush_(0),
record_count_for_next_mem_check_(
g_config_value_.record_count_for_next_mem_check_),
- write_file_created_(false) {}
+ write_file_created_(false),
+ io_writer_owned_(true) {}
TsFileWriter::~TsFileWriter() { destroy(); }
@@ -77,10 +79,10 @@ void TsFileWriter::destroy() {
delete write_file_;
write_file_ = nullptr;
}
- if (io_writer_) {
+ if (io_writer_owned_ && io_writer_) {
delete io_writer_;
- io_writer_ = nullptr;
}
+ io_writer_ = nullptr;
DeviceSchemasMapIter dev_iter;
// cppcheck-suppress postfixOperator
for (dev_iter = schemas_.begin(); dev_iter != schemas_.end(); dev_iter++) {
@@ -113,11 +115,48 @@ int TsFileWriter::init(WriteFile* write_file) {
}
write_file_ = write_file;
write_file_created_ = false;
+ io_writer_owned_ = true;
io_writer_ = new TsFileIOWriter();
io_writer_->init(write_file_);
return E_OK;
}
+int TsFileWriter::init(RestorableTsFileIOWriter* rw) {
+ if (rw == nullptr) {
+ return E_INVALID_ARG;
+ }
+ if (!rw->can_write()) {
+ return E_INVALID_ARG;
+ }
+ write_file_ = rw->get_write_file();
+ write_file_created_ = false;
+ io_writer_owned_ = false;
+ io_writer_ = rw; // use RestorableTsFileIOWriter as io_writer_
+ std::shared_ptr known = rw->get_known_schema();
+ for (const auto& kv : known->table_schema_map_) {
+ const std::string& table_name = kv.first;
+ const std::shared_ptr& ts = kv.second;
+ if (!ts || ts->get_measurement_names().empty()) {
+ continue;
+ }
+ auto device_id = std::make_shared(table_name);
+ auto* ms_group = new MeasurementSchemaGroup;
+ ms_group->is_aligned_ = rw->is_device_aligned(table_name);
+ for (const auto& ms : ts->get_measurement_schemas()) {
+ if (ms) {
+ auto* copy =
+ new MeasurementSchema(ms->measurement_name_, ms->data_type_,
+ ms->encoding_, ms->compression_type_);
+ ms_group->measurement_schema_map_.insert(
+ std::make_pair(ms->measurement_name_, copy));
+ }
+ }
+ schemas_.insert(std::make_pair(device_id, ms_group));
+ }
+ start_file_done_ = true;
+ return E_OK;
+}
+
void TsFileWriter::set_generate_table_schema(bool generate_table_schema) {
io_writer_->set_generate_table_schema(generate_table_schema);
}
diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h
index e80a1232b..85c47db7f 100644
--- a/cpp/src/writer/tsfile_writer.h
+++ b/cpp/src/writer/tsfile_writer.h
@@ -37,6 +37,7 @@ namespace storage {
class WriteFile;
class ChunkWriter;
class TsFileIOWriter;
+class RestorableTsFileIOWriter;
} // namespace storage
namespace storage {
@@ -55,6 +56,7 @@ class TsFileWriter {
int open(const std::string& file_path, int flags, mode_t mode);
int open(const std::string& file_path);
int init(storage::WriteFile* write_file);
+ int init(storage::RestorableTsFileIOWriter* rw);
void set_generate_table_schema(bool generate_table_schema);
int register_timeseries(const std::string& device_id,
@@ -183,6 +185,7 @@ class TsFileWriter {
// record count for next memory check
int64_t record_count_for_next_mem_check_;
bool write_file_created_;
+ bool io_writer_owned_; // false when init(RestorableTsFileIOWriter*)
bool table_aligned_ = true;
int write_typed_column(ValueChunkWriter* value_chunk_writer,
diff --git a/cpp/test/file/restorable_tsfile_io_writer_test.cc b/cpp/test/file/restorable_tsfile_io_writer_test.cc
new file mode 100644
index 000000000..ff8d1dad7
--- /dev/null
+++ b/cpp/test/file/restorable_tsfile_io_writer_test.cc
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "file/restorable_tsfile_io_writer.h"
+
+#include
+
+#include
+
+#include "common/record.h"
+#include "common/tsfile_common.h"
+#include "file/write_file.h"
+#include "writer/tsfile_writer.h"
+
+using namespace storage;
+using namespace common;
+
+class RestorableTsFileIOWriterTest : public ::testing::Test {
+ protected:
+ void SetUp() override {
+ libtsfile_init();
+ file_name_ = "restorable_tsfile_io_writer_test.tsfile";
+ remove(file_name_.c_str());
+ }
+
+ void TearDown() override {
+ remove(file_name_.c_str());
+ libtsfile_destroy();
+ }
+
+ std::string file_name_;
+};
+
+TEST_F(RestorableTsFileIOWriterTest, OpenEmptyFile) {
+ RestorableTsFileIOWriter writer;
+ ASSERT_EQ(writer.open(file_name_, true), E_OK);
+ EXPECT_TRUE(writer.can_write());
+ EXPECT_TRUE(writer.has_crashed());
+ EXPECT_EQ(writer.get_truncated_size(), 0);
+ EXPECT_NE(writer.get_tsfile_io_writer(), nullptr);
+ writer.close();
+}
+
+TEST_F(RestorableTsFileIOWriterTest, OpenBadMagicFile) {
+ std::ofstream f(file_name_);
+ f.write("BadFile", 7);
+ f.close();
+
+ RestorableTsFileIOWriter writer;
+ EXPECT_NE(writer.open(file_name_, true), E_OK);
+ EXPECT_EQ(writer.get_truncated_size(), TSFILE_CHECK_INCOMPATIBLE);
+ writer.close();
+}
+
+TEST_F(RestorableTsFileIOWriterTest, OpenCompleteFile) {
+ TsFileWriter tw;
+ int flags = O_WRONLY | O_CREAT | O_TRUNC;
+#ifdef _WIN32
+ flags |= O_BINARY;
+#endif
+ ASSERT_EQ(tw.open(file_name_, flags, 0666), E_OK);
+ tw.register_timeseries(
+ "d1",
+ MeasurementSchema("s1", FLOAT, GORILLA, CompressionType::UNCOMPRESSED));
+ TsRecord record(1, "d1");
+ record.add_point("s1", 1.0f);
+ tw.write_record(record);
+
+ record.timestamp_ = 2;
+ tw.write_record(record);
+
+ tw.flush();
+ tw.close();
+
+ RestorableTsFileIOWriter writer;
+ ASSERT_EQ(writer.open(file_name_, true), E_OK);
+ EXPECT_FALSE(writer.can_write());
+ EXPECT_FALSE(writer.has_crashed());
+ EXPECT_EQ(writer.get_truncated_size(), TSFILE_CHECK_COMPLETE);
+ EXPECT_EQ(writer.get_tsfile_io_writer(), nullptr);
+ writer.close();
+}
+
+TEST_F(RestorableTsFileIOWriterTest, OpenTruncatedFile) {
+ TsFileWriter tw;
+ int flags = O_WRONLY | O_CREAT | O_TRUNC;
+#ifdef _WIN32
+ flags |= O_BINARY;
+#endif
+ ASSERT_EQ(tw.open(file_name_, flags, 0666), E_OK);
+ tw.register_timeseries(
+ "d1",
+ MeasurementSchema("s1", FLOAT, RLE, CompressionType::UNCOMPRESSED));
+ TsRecord record(1, "d1");
+ record.add_point("s1", 1.0f);
+ tw.write_record(record);
+ tw.flush();
+ tw.close();
+
+ std::streampos full_size;
+ {
+ std::ifstream f(file_name_, std::ios::binary | std::ios::ate);
+ full_size = f.tellg();
+ }
+
+ std::ofstream corrupt(file_name_, std::ios::binary | std::ios::in);
+ corrupt.seekp(full_size - std::streamoff(5));
+ corrupt.put(0);
+ corrupt.put(0);
+ corrupt.put(0);
+ corrupt.put(0);
+ corrupt.put(0);
+ corrupt.close();
+
+ RestorableTsFileIOWriter writer;
+ ASSERT_EQ(writer.open(file_name_, true), E_OK);
+ EXPECT_TRUE(writer.can_write());
+ EXPECT_TRUE(writer.has_crashed());
+ EXPECT_GE(writer.get_truncated_size(),
+ static_cast(MAGIC_STRING_TSFILE_LEN + 1));
+ EXPECT_LE(writer.get_truncated_size(), static_cast(full_size));
+ EXPECT_NE(writer.get_tsfile_io_writer(), nullptr);
+ writer.close();
+}
+
+TEST_F(RestorableTsFileIOWriterTest, OpenFileWithOnlyHeader) {
+ WriteFile wf;
+ int flags = O_RDWR | O_CREAT | O_TRUNC;
+#ifdef _WIN32
+ flags |= O_BINARY;
+#endif
+ ASSERT_EQ(wf.create(file_name_, flags, 0666), E_OK);
+ wf.write(MAGIC_STRING_TSFILE, MAGIC_STRING_TSFILE_LEN);
+ wf.write(&VERSION_NUM_BYTE, 1);
+ wf.close();
+
+ RestorableTsFileIOWriter writer;
+ ASSERT_EQ(writer.open(file_name_, true), E_OK);
+ EXPECT_TRUE(writer.can_write());
+ EXPECT_TRUE(writer.has_crashed());
+ EXPECT_EQ(writer.get_truncated_size(), MAGIC_STRING_TSFILE_LEN + 1);
+ writer.close();
+}
+
+TEST_F(RestorableTsFileIOWriterTest, TruncateRecoversAndProvidesWriter) {
+ TsFileWriter tw;
+ int flags = O_WRONLY | O_CREAT | O_TRUNC;
+#ifdef _WIN32
+ flags |= O_BINARY;
+#endif
+ ASSERT_EQ(tw.open(file_name_, flags, 0666), E_OK);
+ tw.register_timeseries(
+ "d1",
+ MeasurementSchema("s1", FLOAT, GORILLA, CompressionType::UNCOMPRESSED));
+ TsRecord record(1, "d1");
+ record.add_point("s1", 1.0f);
+ tw.write_record(record);
+
+ record.timestamp_ = 2;
+ tw.write_record(record);
+
+ tw.flush();
+ tw.close();
+
+ std::streampos full_size;
+ {
+ std::ifstream f(file_name_, std::ios::binary | std::ios::ate);
+ full_size = f.tellg();
+ }
+
+ std::ofstream corrupt(file_name_, std::ios::binary | std::ios::in);
+ corrupt.seekp(full_size - std::streamoff(3));
+ corrupt.put(0);
+ corrupt.put(0);
+ corrupt.put(0);
+ corrupt.close();
+
+ RestorableTsFileIOWriter rw;
+ ASSERT_EQ(rw.open(file_name_, true), E_OK);
+ ASSERT_TRUE(rw.can_write());
+ ASSERT_NE(rw.get_tsfile_io_writer(), nullptr);
+ ASSERT_NE(rw.get_write_file(), nullptr);
+ EXPECT_EQ(rw.get_file_path(), file_name_);
+
+ TsFileWriter tw2;
+ ASSERT_EQ(tw2.init(&rw), E_OK);
+ TsRecord record2(3, "d1");
+ record2.add_point("s1", 3.0f);
+ ASSERT_EQ(tw2.write_record(record2), E_OK);
+ tw2.close();
+ rw.close();
+}
+
+TEST_F(RestorableTsFileIOWriterTest,
+ MultiDeviceMultiMeasurementRecoverAndWrite) {
+ TsFileWriter tw;
+ int flags = O_WRONLY | O_CREAT | O_TRUNC;
+#ifdef _WIN32
+ flags |= O_BINARY;
+#endif
+ ASSERT_EQ(tw.open(file_name_, flags, 0666), E_OK);
+ tw.register_timeseries("d1", MeasurementSchema("s1", FLOAT));
+ tw.register_timeseries("d1", MeasurementSchema("s2", INT32));
+ tw.register_timeseries("d2", MeasurementSchema("s1", FLOAT));
+ tw.register_timeseries("d2", MeasurementSchema("s2", DOUBLE));
+
+ TsRecord r1(1, "d1");
+ r1.add_point("s1", 1.0f);
+ r1.add_point("s2", 10);
+ ASSERT_EQ(tw.write_record(r1), E_OK);
+
+ TsRecord r2(2, "d2");
+ r2.add_point("s1", 2.0f);
+ r2.add_point("s2", 20.0);
+ ASSERT_EQ(tw.write_record(r2), E_OK);
+
+ tw.flush();
+ tw.close();
+
+ std::streampos full_size;
+ {
+ std::ifstream f(file_name_, std::ios::binary | std::ios::ate);
+ full_size = f.tellg();
+ }
+
+ std::ofstream corrupt(file_name_, std::ios::binary | std::ios::in);
+ corrupt.seekp(full_size - std::streamoff(3));
+ corrupt.put(0);
+ corrupt.put(0);
+ corrupt.put(0);
+ corrupt.close();
+
+ RestorableTsFileIOWriter rw;
+ ASSERT_EQ(rw.open(file_name_, true), E_OK);
+ ASSERT_TRUE(rw.can_write());
+
+ TsFileWriter tw2;
+ ASSERT_EQ(tw2.init(&rw), E_OK);
+
+ TsRecord r3(3, "d1");
+ r3.add_point("s1", 3.0f);
+ r3.add_point("s2", 30);
+ ASSERT_EQ(tw2.write_record(r3), E_OK);
+
+ TsRecord r4(4, "d2");
+ r4.add_point("s1", 4.0f);
+ r4.add_point("s2", 40.0);
+ ASSERT_EQ(tw2.write_record(r4), E_OK);
+
+ tw2.close();
+ rw.close();
+}
+
+TEST_F(RestorableTsFileIOWriterTest, AlignedTimeseriesRecoverAndWrite) {
+ TsFileWriter tw;
+ int flags = O_WRONLY | O_CREAT | O_TRUNC;
+#ifdef _WIN32
+ flags |= O_BINARY;
+#endif
+ ASSERT_EQ(tw.open(file_name_, flags, 0666), E_OK);
+
+ std::vector aligned_schemas;
+ aligned_schemas.push_back(new MeasurementSchema("s1", FLOAT));
+ aligned_schemas.push_back(new MeasurementSchema("s2", FLOAT));
+ tw.register_aligned_timeseries("d1", aligned_schemas);
+
+ TsRecord r1(1, "d1");
+ r1.add_point("s1", 1.0f);
+ r1.add_point("s2", 2.0f);
+ ASSERT_EQ(tw.write_record_aligned(r1), E_OK);
+
+ TsRecord r2(2, "d1");
+ r2.add_point("s1", 3.0f);
+ r2.add_point("s2", 4.0f);
+ ASSERT_EQ(tw.write_record_aligned(r2), E_OK);
+
+ tw.flush();
+ tw.close();
+
+ std::streampos full_size;
+ {
+ std::ifstream f(file_name_, std::ios::binary | std::ios::ate);
+ full_size = f.tellg();
+ }
+
+ std::ofstream corrupt(file_name_, std::ios::binary | std::ios::in);
+ corrupt.seekp(full_size - std::streamoff(3));
+ corrupt.put(0);
+ corrupt.put(0);
+ corrupt.put(0);
+ corrupt.close();
+
+ RestorableTsFileIOWriter rw;
+ ASSERT_EQ(rw.open(file_name_, true), E_OK);
+ ASSERT_TRUE(rw.can_write());
+
+ TsFileWriter tw2;
+ ASSERT_EQ(tw2.init(&rw), E_OK);
+
+ TsRecord r3(3, "d1");
+ r3.add_point("s1", 5.0f);
+ r3.add_point("s2", 6.0f);
+ ASSERT_EQ(tw2.write_record_aligned(r3), E_OK);
+
+ tw2.close();
+ rw.close();
+}
diff --git a/cpp/test/file/write_file_test.cc b/cpp/test/file/write_file_test.cc
index 1345b7bee..802b1a8f1 100644
--- a/cpp/test/file/write_file_test.cc
+++ b/cpp/test/file/write_file_test.cc
@@ -112,3 +112,30 @@ TEST_F(WriteFileTest, CloseFile) {
EXPECT_EQ(write_file.write(content, content_len), E_OK);
EXPECT_EQ(write_file.close(), E_OK);
}
+
+TEST_F(WriteFileTest, TruncateFile) {
+ WriteFile write_file;
+ std::string file_name = "test_file_truncate.dat";
+
+ remove(file_name.c_str());
+
+ int flags = O_RDWR | O_CREAT | O_TRUNC;
+#ifdef _WIN32
+ flags |= O_BINARY;
+#endif
+ mode_t mode = 0666;
+ EXPECT_EQ(write_file.create(file_name, flags, mode), E_OK);
+ EXPECT_TRUE(write_file.file_opened());
+
+ const char* content = "Hello, Truncate World!";
+ uint32_t content_len = strlen(content);
+ EXPECT_EQ(write_file.write(content, content_len), E_OK);
+ EXPECT_EQ(write_file.truncate(7), E_OK);
+ write_file.close();
+
+ std::ifstream file(file_name);
+ std::string file_content((std::istreambuf_iterator(file)),
+ std::istreambuf_iterator());
+ EXPECT_EQ(file_content, "Hello, ");
+ remove(file_name.c_str());
+}
diff --git a/cpp/test/writer/tsfile_writer_test.cc b/cpp/test/writer/tsfile_writer_test.cc
index 25684e726..30fded6eb 100644
--- a/cpp/test/writer/tsfile_writer_test.cc
+++ b/cpp/test/writer/tsfile_writer_test.cc
@@ -115,11 +115,6 @@ class TsFileWriterTest : public ::testing::Test {
class TsFileWriterTestSimple : public ::testing::Test {};
-TEST_F(TsFileWriterTestSimple, InitWithNullWriteFile) {
- TsFileWriter writer;
- ASSERT_EQ(writer.init(nullptr), E_INVALID_ARG);
-}
-
TEST_F(TsFileWriterTest, WriteDiffDataType) {
std::string device_name = "test_table";
common::TSEncoding encoding = common::TSEncoding::PLAIN;
From f36473459b2c89665e3d71ab6fa647f9e3ac4072 Mon Sep 17 00:00:00 2001
From: 761417898 <761417898@qq.com>
Date: Tue, 10 Feb 2026 16:00:48 +0800
Subject: [PATCH 04/23] Support continued writing to restored files in the tree
model & table model interfaces.
---
cpp/src/common/schema.h | 59 +++++++++++--
cpp/src/file/restorable_tsfile_io_writer.cc | 2 +
cpp/src/file/tsfile_io_writer.cc | 24 +++---
cpp/src/file/tsfile_io_writer.h | 2 +
cpp/src/writer/tsfile_table_writer.cc | 25 ++++++
cpp/src/writer/tsfile_table_writer.h | 14 ++++
cpp/src/writer/tsfile_tree_writer.cc | 10 +++
cpp/src/writer/tsfile_tree_writer.h | 14 ++++
cpp/src/writer/tsfile_writer.cc | 7 ++
.../file/restorable_tsfile_io_writer_test.cc | 82 +++++++++++++++++--
10 files changed, 213 insertions(+), 26 deletions(-)
diff --git a/cpp/src/common/schema.h b/cpp/src/common/schema.h
index 499dd5bc7..a2c989af2 100644
--- a/cpp/src/common/schema.h
+++ b/cpp/src/common/schema.h
@@ -182,7 +182,7 @@ struct MeasurementSchemaGroup {
*/
class TableSchema {
public:
- TableSchema() = default;
+ TableSchema() : updatable_(true) {}
/**
* Constructs a TableSchema object with the given table name, column
@@ -197,7 +197,7 @@ class TableSchema {
*/
TableSchema(const std::string& table_name,
const std::vector& column_schemas)
- : table_name_(table_name) {
+ : table_name_(table_name), updatable_(false) {
to_lowercase_inplace(table_name_);
for (const common::ColumnSchema& column_schema : column_schemas) {
column_schemas_.emplace_back(std::make_shared(
@@ -217,7 +217,9 @@ class TableSchema {
TableSchema(const std::string& table_name,
const std::vector& column_schemas,
const std::vector& column_categories)
- : table_name_(table_name), column_categories_(column_categories) {
+ : table_name_(table_name),
+ column_categories_(column_categories),
+ updatable_(false) {
to_lowercase_inplace(table_name_);
for (const auto column_schema : column_schemas) {
if (column_schema != nullptr) {
@@ -236,11 +238,13 @@ class TableSchema {
TableSchema(TableSchema&& other) noexcept
: table_name_(std::move(other.table_name_)),
column_schemas_(std::move(other.column_schemas_)),
- column_categories_(std::move(other.column_categories_)) {}
+ column_categories_(std::move(other.column_categories_)),
+ updatable_(other.updatable_) {}
TableSchema(const TableSchema& other) noexcept
: table_name_(other.table_name_),
- column_categories_(other.column_categories_) {
+ column_categories_(other.column_categories_),
+ updatable_(false) {
for (const auto& column_schema : other.column_schemas_) {
// Just call default construction
column_schemas_.emplace_back(
@@ -342,6 +346,14 @@ class TableSchema {
size_t get_column_pos_index_num() const { return column_pos_index_.size(); }
void update(ChunkGroupMeta* chunk_group_meta) {
+ if (!updatable_) {
+ return;
+ }
+ std::shared_ptr device_id = chunk_group_meta->device_id_;
+ const int seg_num = device_id ? device_id->segment_num() : 0;
+ if (seg_num > max_level_) {
+ max_level_ = seg_num;
+ }
for (auto iter = chunk_group_meta->chunk_meta_list_.begin();
iter != chunk_group_meta->chunk_meta_list_.end(); iter++) {
auto& chunk_meta = iter.get();
@@ -371,6 +383,29 @@ class TableSchema {
}
}
+ void finalize_column_schema() {
+ if (!updatable_) {
+ return;
+ }
+ std::vector> id_columns;
+ for (int i = 1; i < max_level_; i++) {
+ std::string col_name = "__level" + std::to_string(i);
+ id_columns.push_back(std::make_shared(
+ col_name, common::STRING, common::PLAIN,
+ common::CompressionType::UNCOMPRESSED));
+ }
+ column_schemas_.insert(column_schemas_.begin(), id_columns.begin(),
+ id_columns.end());
+ column_categories_.insert(column_categories_.begin(), id_columns.size(),
+ common::ColumnCategory::TAG);
+ column_pos_index_.clear();
+ for (size_t i = 0; i < column_schemas_.size(); i++) {
+ column_pos_index_[to_lower(column_schemas_[i]->measurement_name_)] =
+ static_cast(i);
+ }
+ updatable_ = false;
+ }
+
std::vector get_data_types() const {
std::vector ret;
for (const auto& measurement_schema : column_schemas_) {
@@ -424,6 +459,8 @@ class TableSchema {
std::vector column_categories_;
std::map column_pos_index_;
bool is_virtual_table_ = false;
+ int max_level_ = 0;
+ bool updatable_ = false;
};
struct Schema {
@@ -433,11 +470,19 @@ struct Schema {
void update_table_schema(ChunkGroupMeta* chunk_group_meta) {
std::shared_ptr device_id = chunk_group_meta->device_id_;
- auto table_name = device_id->get_table_name();
+ std::string table_name = device_id->get_table_name();
if (table_schema_map_.find(table_name) == table_schema_map_.end()) {
table_schema_map_[table_name] = std::make_shared();
}
- table_schema_map_[table_name]->update(chunk_group_meta);
+ auto& ts = table_schema_map_[table_name];
+ ts->set_table_name(table_name);
+ ts->update(chunk_group_meta);
+ }
+
+ void finalize_table_schemas() {
+ for (auto& kv : table_schema_map_) {
+ kv.second->finalize_column_schema();
+ }
}
void register_table_schema(
const std::shared_ptr& table_schema) {
diff --git a/cpp/src/file/restorable_tsfile_io_writer.cc b/cpp/src/file/restorable_tsfile_io_writer.cc
index 8b44e803c..f563c56a4 100644
--- a/cpp/src/file/restorable_tsfile_io_writer.cc
+++ b/cpp/src/file/restorable_tsfile_io_writer.cc
@@ -468,6 +468,7 @@ int RestorableTsFileIOWriter::self_check(bool truncate_corrupted) {
}
flush_chunk_group();
+ get_schema()->finalize_table_schemas();
reader.close();
truncated_size_ = truncated;
@@ -493,6 +494,7 @@ int RestorableTsFileIOWriter::self_check(bool truncate_corrupted) {
for (ChunkGroupMeta* cgm : recovered_cgm_list) {
push_chunk_group_meta(cgm);
}
+ chunk_group_meta_from_recovery_ = true;
return E_OK;
}
diff --git a/cpp/src/file/tsfile_io_writer.cc b/cpp/src/file/tsfile_io_writer.cc
index d6888086f..dadadb5dc 100644
--- a/cpp/src/file/tsfile_io_writer.cc
+++ b/cpp/src/file/tsfile_io_writer.cc
@@ -70,17 +70,19 @@ int TsFileIOWriter::init(WriteFile* write_file) {
}
void TsFileIOWriter::destroy() {
- for (auto iter = chunk_group_meta_list_.begin();
- iter != chunk_group_meta_list_.end(); iter++) {
- if (iter.get() && iter.get()->device_id_) {
- iter.get()->device_id_.reset();
- }
- if (iter.get()) {
- for (auto chunk_meta = iter.get()->chunk_meta_list_.begin();
- chunk_meta != iter.get()->chunk_meta_list_.end();
- chunk_meta++) {
- if (chunk_meta.get()) {
- chunk_meta.get()->statistic_->destroy();
+ if (!chunk_group_meta_from_recovery_) {
+ for (auto iter = chunk_group_meta_list_.begin();
+ iter != chunk_group_meta_list_.end(); iter++) {
+ if (iter.get() && iter.get()->device_id_) {
+ iter.get()->device_id_.reset();
+ }
+ if (iter.get()) {
+ for (auto chunk_meta = iter.get()->chunk_meta_list_.begin();
+ chunk_meta != iter.get()->chunk_meta_list_.end();
+ chunk_meta++) {
+ if (chunk_meta.get()) {
+ chunk_meta.get()->statistic_->destroy();
+ }
}
}
}
diff --git a/cpp/src/file/tsfile_io_writer.h b/cpp/src/file/tsfile_io_writer.h
index 6d1ed2bdc..151af97c6 100644
--- a/cpp/src/file/tsfile_io_writer.h
+++ b/cpp/src/file/tsfile_io_writer.h
@@ -202,6 +202,8 @@ class TsFileIOWriter {
void push_chunk_group_meta(ChunkGroupMeta* cgm) {
chunk_group_meta_list_.push_back(cgm);
}
+ /** If true, destroy() skips chunk_group_meta_list_ (entries from arena). */
+ bool chunk_group_meta_from_recovery_ = false;
private:
common::PageArena meta_allocator_;
diff --git a/cpp/src/writer/tsfile_table_writer.cc b/cpp/src/writer/tsfile_table_writer.cc
index 6dd990188..7288860c3 100644
--- a/cpp/src/writer/tsfile_table_writer.cc
+++ b/cpp/src/writer/tsfile_table_writer.cc
@@ -19,6 +19,31 @@
#include "tsfile_table_writer.h"
+#include "file/restorable_tsfile_io_writer.h"
+
+namespace storage {
+
+TsFileTableWriter::TsFileTableWriter(
+ storage::RestorableTsFileIOWriter* restorable_writer,
+ uint64_t memory_threshold)
+ : error_number(common::E_OK) {
+ tsfile_writer_ = std::make_shared();
+ error_number = tsfile_writer_->init(restorable_writer);
+ if (error_number != common::E_OK) {
+ return;
+ }
+ tsfile_writer_->set_generate_table_schema(false);
+ std::shared_ptr schema = restorable_writer->get_known_schema();
+ if (schema && schema->table_schema_map_.size() == 1) {
+ exclusive_table_name_ = schema->table_schema_map_.begin()->first;
+ } else {
+ exclusive_table_name_.clear();
+ }
+ common::g_config_value_.chunk_group_size_threshold_ = memory_threshold;
+}
+
+} // namespace storage
+
storage::TsFileTableWriter::~TsFileTableWriter() = default;
int storage::TsFileTableWriter::register_table(
diff --git a/cpp/src/writer/tsfile_table_writer.h b/cpp/src/writer/tsfile_table_writer.h
index d3fc918b7..ce18bc007 100644
--- a/cpp/src/writer/tsfile_table_writer.h
+++ b/cpp/src/writer/tsfile_table_writer.h
@@ -22,6 +22,7 @@
#include "writer/tsfile_writer.h"
namespace storage {
+class RestorableTsFileIOWriter;
/**
* @brief Facilitates writing structured table data into a TsFile with a
@@ -66,6 +67,19 @@ class TsFileTableWriter {
common::g_config_value_.chunk_group_size_threshold_ = memory_threshold;
}
+ /**
+ * Constructs a TsFileTableWriter from a RestorableTsFileIOWriter so that
+ * table data can be appended after recovery. Schema is taken from the
+ * restored file; do not pass a TableSchema.
+ *
+ * @param restorable_writer Restored I/O writer; must not be null and must
+ * have been opened with truncate so that can_write() is true.
+ * @param memory_threshold Optional memory threshold for buffered data.
+ */
+ explicit TsFileTableWriter(
+ storage::RestorableTsFileIOWriter* restorable_writer,
+ uint64_t memory_threshold = 128 * 1024 * 1024);
+
~TsFileTableWriter();
/**
* Registers a table schema with the writer.
diff --git a/cpp/src/writer/tsfile_tree_writer.cc b/cpp/src/writer/tsfile_tree_writer.cc
index 59c11914d..5066678d8 100644
--- a/cpp/src/writer/tsfile_tree_writer.cc
+++ b/cpp/src/writer/tsfile_tree_writer.cc
@@ -19,6 +19,8 @@
#include "writer/tsfile_tree_writer.h"
+#include "file/restorable_tsfile_io_writer.h"
+
namespace storage {
TsFileTreeWriter::TsFileTreeWriter(storage::WriteFile* writer_file,
@@ -28,6 +30,14 @@ TsFileTreeWriter::TsFileTreeWriter(storage::WriteFile* writer_file,
common::g_config_value_.chunk_group_size_threshold_ = memory_threshold;
}
+TsFileTreeWriter::TsFileTreeWriter(
+ storage::RestorableTsFileIOWriter* restorable_writer,
+ uint64_t memory_threshold) {
+ tsfile_writer_ = std::make_shared();
+ tsfile_writer_->init(restorable_writer);
+ common::g_config_value_.chunk_group_size_threshold_ = memory_threshold;
+}
+
int TsFileTreeWriter::register_timeseries(std::string& device_id,
MeasurementSchema* schema) {
return tsfile_writer_->register_timeseries(device_id, *schema);
diff --git a/cpp/src/writer/tsfile_tree_writer.h b/cpp/src/writer/tsfile_tree_writer.h
index 90ef0d76f..3c21c23da 100644
--- a/cpp/src/writer/tsfile_tree_writer.h
+++ b/cpp/src/writer/tsfile_tree_writer.h
@@ -26,6 +26,7 @@
#include "tsfile_writer.h"
namespace storage {
+class RestorableTsFileIOWriter;
/**
* @brief Provides an interface for writing hierarchical (tree-structured)
@@ -56,6 +57,19 @@ class TsFileTreeWriter {
explicit TsFileTreeWriter(storage::WriteFile* writer_file,
uint64_t memory_threshold = 128 * 1024 * 1024);
+ /**
+ * Constructs a TsFileTreeWriter from a RestorableTsFileIOWriter so that
+ * data can be appended after recovery (schema and alignment are taken from
+ * the restored file).
+ *
+ * @param restorable_writer Restored I/O writer; must not be null and must
+ * have been opened and scanned (e.g. after truncate recovery).
+ * @param memory_threshold Optional memory threshold for buffered data.
+ */
+ explicit TsFileTreeWriter(
+ storage::RestorableTsFileIOWriter* restorable_writer,
+ uint64_t memory_threshold = 128 * 1024 * 1024);
+
/**
* Registers a single (non-aligned) time series under the given device ID.
*
diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc
index f957bc276..00870eb21 100644
--- a/cpp/src/writer/tsfile_writer.cc
+++ b/cpp/src/writer/tsfile_writer.cc
@@ -534,6 +534,13 @@ int TsFileWriter::do_check_schema_table(
schemas_[device_id] = device_schema;
}
+ if (IS_NULL(device_schema->time_chunk_writer_)) {
+ device_schema->time_chunk_writer_ = new TimeChunkWriter();
+ device_schema->time_chunk_writer_->init(
+ "", g_config_value_.time_encoding_type_,
+ g_config_value_.time_compress_type_);
+ }
+
uint32_t column_cnt = tablet.get_column_count();
time_chunk_writer = device_schema->time_chunk_writer_;
MeasurementSchemaMap& msm = device_schema->measurement_schema_map_;
diff --git a/cpp/test/file/restorable_tsfile_io_writer_test.cc b/cpp/test/file/restorable_tsfile_io_writer_test.cc
index ff8d1dad7..7a6b6f507 100644
--- a/cpp/test/file/restorable_tsfile_io_writer_test.cc
+++ b/cpp/test/file/restorable_tsfile_io_writer_test.cc
@@ -24,8 +24,12 @@
#include
#include "common/record.h"
+#include "common/schema.h"
+#include "common/tablet.h"
#include "common/tsfile_common.h"
#include "file/write_file.h"
+#include "writer/tsfile_table_writer.h"
+#include "writer/tsfile_tree_writer.h"
#include "writer/tsfile_writer.h"
using namespace storage;
@@ -207,8 +211,7 @@ TEST_F(RestorableTsFileIOWriterTest, TruncateRecoversAndProvidesWriter) {
rw.close();
}
-TEST_F(RestorableTsFileIOWriterTest,
- MultiDeviceMultiMeasurementRecoverAndWrite) {
+TEST_F(RestorableTsFileIOWriterTest, MultiDeviceRecoverAndWriteWithTreeWriter) {
TsFileWriter tw;
int flags = O_WRONLY | O_CREAT | O_TRUNC;
#ifdef _WIN32
@@ -250,20 +253,18 @@ TEST_F(RestorableTsFileIOWriterTest,
ASSERT_EQ(rw.open(file_name_, true), E_OK);
ASSERT_TRUE(rw.can_write());
- TsFileWriter tw2;
- ASSERT_EQ(tw2.init(&rw), E_OK);
-
+ TsFileTreeWriter tree_writer(&rw);
TsRecord r3(3, "d1");
r3.add_point("s1", 3.0f);
r3.add_point("s2", 30);
- ASSERT_EQ(tw2.write_record(r3), E_OK);
+ ASSERT_EQ(tree_writer.write(r3), E_OK);
TsRecord r4(4, "d2");
r4.add_point("s1", 4.0f);
r4.add_point("s2", 40.0);
- ASSERT_EQ(tw2.write_record(r4), E_OK);
+ ASSERT_EQ(tree_writer.write(r4), E_OK);
- tw2.close();
+ tree_writer.close();
rw.close();
}
@@ -321,3 +322,68 @@ TEST_F(RestorableTsFileIOWriterTest, AlignedTimeseriesRecoverAndWrite) {
tw2.close();
rw.close();
}
+
+TEST_F(RestorableTsFileIOWriterTest, TableWriterRecoverAndWrite) {
+ std::vector measurement_schemas;
+ std::vector column_categories;
+ measurement_schemas.push_back(new MeasurementSchema("device", STRING));
+ measurement_schemas.push_back(new MeasurementSchema("value", DOUBLE));
+ column_categories.push_back(ColumnCategory::TAG);
+ column_categories.push_back(ColumnCategory::FIELD);
+ TableSchema table_schema("test_table", measurement_schemas,
+ column_categories);
+
+ int flags = O_WRONLY | O_CREAT | O_TRUNC;
+#ifdef _WIN32
+ flags |= O_BINARY;
+#endif
+ WriteFile write_file;
+ write_file.create(file_name_, flags, 0666);
+
+ TsFileTableWriter table_writer(&write_file, &table_schema);
+ Tablet tablet(table_schema.get_measurement_names(),
+ table_schema.get_data_types(), 10);
+ std::string table_name = "test_table";
+ tablet.set_table_name(table_name);
+ for (int i = 0; i < 10; i++) {
+ tablet.add_timestamp(i, static_cast(i));
+ tablet.add_value(i, "device", "device0");
+ tablet.add_value(i, "value", i * 1.1);
+ }
+ ASSERT_EQ(table_writer.write_table(tablet), E_OK);
+ ASSERT_EQ(table_writer.flush(), E_OK);
+ table_writer.close();
+ write_file.close();
+
+ std::streampos full_size;
+ {
+ std::ifstream f(file_name_, std::ios::binary | std::ios::ate);
+ full_size = f.tellg();
+ }
+ std::ofstream corrupt(file_name_, std::ios::binary | std::ios::in);
+ corrupt.seekp(full_size - std::streamoff(3));
+ corrupt.put(0);
+ corrupt.put(0);
+ corrupt.put(0);
+ corrupt.close();
+
+ RestorableTsFileIOWriter rw;
+ ASSERT_EQ(rw.open(file_name_, true), E_OK);
+ ASSERT_TRUE(rw.can_write());
+
+ TsFileTableWriter table_writer2(&rw);
+ // Java 规则:key=device_id.get_table_name()="device0";1 segment 时无
+ // __level 列,仅有 FIELD "value"
+ std::vector value_col = {"value"};
+ std::vector value_types = {DOUBLE};
+ Tablet tablet2(value_col, value_types, 10);
+ tablet2.set_table_name(table_name);
+ for (int i = 0; i < 10; i++) {
+ tablet2.add_timestamp(i, static_cast(i + 10));
+ tablet.add_value(i, "device", "device0");
+ tablet2.add_value(i, "value", (i + 10) * 1.1);
+ }
+ ASSERT_EQ(table_writer2.write_table(tablet2), E_OK);
+ table_writer2.close();
+ // rw.close();
+}
From cd3dab2da081b077a5140dd0ceb16faffdc0877e Mon Sep 17 00:00:00 2001
From: 761417898 <761417898@qq.com>
Date: Tue, 24 Feb 2026 18:01:17 +0800
Subject: [PATCH 05/23] fix readme logo
---
README-zh.md | 14 +++---
README.md | 15 +++----
cpp/README-zh.md | 112 +++++++++++++++++++++++++++++++++++++++++------
cpp/README.md | 14 +++---
4 files changed, 117 insertions(+), 38 deletions(-)
diff --git a/README-zh.md b/README-zh.md
index 8bcd39d98..1265d678a 100644
--- a/README-zh.md
+++ b/README-zh.md
@@ -21,14 +21,12 @@
[English](./README.md) | [中文](./README-zh.md)
# TsFile Document
-
-___________ ___________.__.__
-\__ ___/____\_ _____/|__| | ____
- | | / ___/| __) | | | _/ __ \
- | | \___ \ | \ | | |_\ ___/
- |____|/____ >\___ / |__|____/\___ > version 2.1.0
- \/ \/ \/
-
+
+
+
+
[](https://codecov.io/github/apache/tsfile)
[](http://search.maven.org/#search|gav|1|g:"org.apache.tsfile")
diff --git a/README.md b/README.md
index cbfb2bc23..089698b7b 100644
--- a/README.md
+++ b/README.md
@@ -21,14 +21,13 @@
[English](./README.md) | [中文](./README-zh.md)
# TsFile Document
-
-___________ ___________.__.__
-\__ ___/____\_ _____/|__| | ____
- | | / ___/| __) | | | _/ __ \
- | | \___ \ | \ | | |_\ ___/
- |____|/____ >\___ / |__|____/\___ > version 2.1.0
- \/ \/ \/
-
+
+
+
+
+
[](https://codecov.io/github/apache/tsfile)
[](http://search.maven.org/#search|gav|1|g:"org.apache.tsfile")
diff --git a/cpp/README-zh.md b/cpp/README-zh.md
index 6a26f2b95..2a8c84b74 100644
--- a/cpp/README-zh.md
+++ b/cpp/README-zh.md
@@ -19,16 +19,102 @@
-->
-# TsFile C++ Document
-
-___________ ___________.__.__
-\__ ___/____\_ _____/|__| | ____
- | | / ___/| __) | | | _/ __ \
- | | \___ \ | \ | | |_\ ___/
- |____|/____ >\___ / |__|____/\___ > version 2.1.0
- \/ \/ \/
-
-
-## 使用
-
-## 开发
\ No newline at end of file
+# TsFile C++ 文档
+
+
+
+
+
+## 简介
+
+本目录包含 TsFile 的 C++ 实现版本。目前,C++ 版本支持 TsFile 的查询与写入功能,包括基于时间过滤的查询。
+
+源代码位于 `./src` 目录。
+C/C++ 示例代码位于 `./examples` 目录。
+TsFile_cpp 的性能基准测试位于 `./bench_mark` 目录。
+
+此外,在 `./src/cwrapper` 目录中提供了 C 函数封装接口,Python 工具依赖该封装。
+
+---
+
+## 如何贡献
+
+我们使用 `clang-format` 来确保 C++ 代码遵循 `./clang-format` 文件中定义的一致规范(类似于 Google 风格)。
+
+欢迎提交任何 Bug 报告。
+你可以创建一个以 `[CPP]` 开头的 Issue 来描述问题,例如:
+https://github.com/apache/tsfile/issues/94
+
+---
+
+## 构建
+
+### 环境要求
+
+```bash
+sudo apt-get update
+sudo apt-get install -y cmake make g++ clang-format libuuid-dev
+```
+
+构建 tsfile:
+
+```bash
+bash build.sh
+```
+
+如果你安装了 Maven 工具,也可以运行:
+
+```bash
+mvn package -P with-cpp clean verify
+```
+
+构建完成后,可在 `./build` 目录下找到生成的共享库文件。
+
+在向 GitHub 提交代码之前,请确保 `mvn` 编译通过。
+
+---
+
+### Windows 下 MinGW 编译问题
+
+如果你在 Windows 下使用 MinGW 编译时遇到错误,可以尝试使用以下我们验证通过的版本:
+
+- GCC 14.2.0(**POSIX** 线程) + LLVM/Clang/LLD/LLDB 18.1.8 + MinGW-w64 12.0.0 UCRT - release 1
+- GCC 12.2.0 + LLVM/Clang/LLD/LLDB 16.0.0 + MinGW-w64 10.0.0(UCRT)- release 5
+- GCC 12.2.0 + LLVM/Clang/LLD/LLDB 16.0.0 + MinGW-w64 10.0.0(MSVCRT)- release 5
+- GCC 11.2.0 + MinGW-w64 10.0.0(MSVCRT)- release 1
+
+---
+
+## 配置交叉编译工具链
+
+修改工具链文件 `cmake/ToolChain.cmake`,定义以下变量:
+
+- `CMAKE_C_COMPILER`:指定 C 编译器路径。
+- `CMAKE_CXX_COMPILER`:指定 C++ 编译器路径。
+- `CMAKE_FIND_ROOT_PATH`:设置交叉编译环境的根路径(例如交叉编译工具链目录)。
+
+在 `cpp/` 目录下执行以下命令创建构建目录并开始编译:
+
+```bash
+mkdir build && cd build
+cmake .. -DToolChain=ON
+make
+```
+
+---
+
+## 使用 TsFile
+
+你可以在 `./examples/cpp_examples` 目录下的 `demo_read.cpp` 和 `demo_write.cpp` 中查看读写数据的示例。
+
+在 `./examples/c_examples` 目录下,还提供了使用 C 风格 API 在 C 环境中读写数据的示例。
+
+在 `./examples` 目录下执行:
+
+```bash
+bash build.sh
+```
+
+即可在 `./examples/build` 目录下生成可执行文件。
\ No newline at end of file
diff --git a/cpp/README.md b/cpp/README.md
index a98792eaa..e328413ca 100644
--- a/cpp/README.md
+++ b/cpp/README.md
@@ -21,15 +21,11 @@
# TsFile C++ Document
-
-___________ ___________.__.__
-\__ ___/____\_ _____/|__| | ____
- | | / ___/| __) | | | _/ __ \
- | | \___ \ | \ | | |_\ ___/
- |____|/____ >\___ / |__|____/\___ > version 2.1.0
- \/ \/ \/
-
-
+
+
+
## Introduction
From c64396137bc3d9f729af11a08b7f24b9bca37bd5 Mon Sep 17 00:00:00 2001
From: 761417898 <761417898@qq.com>
Date: Tue, 24 Feb 2026 18:09:52 +0800
Subject: [PATCH 06/23] fix readme logo
---
java/tsfile/README-zh.md | 13 ++++-----
java/tsfile/README.md | 13 ++++-----
python/README-zh.md | 61 +++++++++++++++++++++++++++++++---------
python/README.md | 13 ++++-----
4 files changed, 63 insertions(+), 37 deletions(-)
diff --git a/java/tsfile/README-zh.md b/java/tsfile/README-zh.md
index e97abb800..286befe52 100644
--- a/java/tsfile/README-zh.md
+++ b/java/tsfile/README-zh.md
@@ -21,14 +21,11 @@
[English](./README.md) | [中文](./README-zh.md)
# TsFile Java Document
-
-___________ ___________.__.__
-\__ ___/____\_ _____/|__| | ____
- | | / ___/| __) | | | _/ __ \
- | | \___ \ | \ | | |_\ ___/
- |____|/____ >\___ / |__|____/\___ > version 2.1.0
- \/ \/ \/
-
+
+
+
## 使用
diff --git a/java/tsfile/README.md b/java/tsfile/README.md
index c22556746..b9c4828fa 100644
--- a/java/tsfile/README.md
+++ b/java/tsfile/README.md
@@ -21,14 +21,11 @@
[English](./README.md) | [中文](./README-zh.md)
# TsFile Java Document
-
-___________ ___________.__.__
-\__ ___/____\_ _____/|__| | ____
- | | / ___/| __) | | | _/ __ \
- | | \___ \ | \ | | |_\ ___/
- |____|/____ >\___ / |__|____/\___ > version 2.1.0
- \/ \/ \/
-
+
+
+
## Use TsFile
diff --git a/python/README-zh.md b/python/README-zh.md
index 3c1a771f3..660c001e8 100644
--- a/python/README-zh.md
+++ b/python/README-zh.md
@@ -19,16 +19,51 @@
-->
-# TsFile Python Document
-
-___________ ___________.__.__
-\__ ___/____\_ _____/|__| | ____
- | | / ___/| __) | | | _/ __ \
- | | \___ \ | \ | | |_\ ___/
- |____|/____ >\___ / |__|____/\___ > version 21.0
- \/ \/ \/
-
-
-## 使用
-
-## 开发
\ No newline at end of file
+# TsFile Python 文档
+
+
+
+
+
+## 简介
+
+本目录包含 TsFile 的 Python 实现版本。Python 版本基于 C++ 版本构建,并通过 Cython 包将 TsFile 的读写能力集成到 Python 环境中。用户可以像在 Pandas 中使用 read_csv 和 write_csv 一样,方便地读取和写入 TsFile。
+
+源代码位于 `./tsfile` 目录。
+以 `.pyx` 和 `.pyd` 结尾的文件为使用 Cython 编写的封装代码。
+`tsfile/tsfile.py` 中定义了一些对用户开放的接口。
+
+你可以在 `./examples/examples.py` 中找到读写示例。
+
+---
+
+## 如何贡献
+
+建议使用 pylint 对 Python 代码进行检查。
+
+目前尚无合适的 Cython 代码风格检查工具,因此 Cython 部分代码应遵循 pylint 所要求的 Python 代码风格。
+
+**功能列表**
+
+- [ ] 在 pywrapper 中调用 TsFile C++ 版本实现的批量读取接口。
+- [ ] 支持将多个 DataFrame 写入同一个 TsFile 文件。
+
+---
+
+## 构建
+
+在构建 TsFile 的 Python 版本之前,必须先构建 [TsFile C++ 版本](../cpp/README.md),因为 Python 版本依赖于 C++ 版本生成的共享库文件。
+
+### 使用 Maven 在根目录构建
+
+```sh
+mvn -P with-cpp,with-python clean verify
+```
+
+### 使用 Python 命令构建
+
+```sh
+python setup.py build_ext --inplace
+```
\ No newline at end of file
diff --git a/python/README.md b/python/README.md
index 23af6eb7e..51cb498ec 100644
--- a/python/README.md
+++ b/python/README.md
@@ -21,14 +21,11 @@
# TsFile Python Document
-
-___________ ___________.__.__
-\__ ___/____\_ _____/|__| | ____
- | | / ___/| __) | | | _/ __ \
- | | \___ \ | \ | | |_\ ___/
- |____|/____ >\___ / |__|____/\___ > version 2.1.0
- \/ \/ \/
-
+
+
+
## Introduction
From 7a61f3ee3bb9f1bb7fedb5f89c0867c1348cf3c3 Mon Sep 17 00:00:00 2001
From: 761417898 <761417898@qq.com>
Date: Tue, 24 Feb 2026 18:35:32 +0800
Subject: [PATCH 07/23] fix readme badge
---
README-zh.md | 2 +-
README.md | 3 +--
2 files changed, 2 insertions(+), 3 deletions(-)
diff --git a/README-zh.md b/README-zh.md
index 1265d678a..34d3aa88d 100644
--- a/README-zh.md
+++ b/README-zh.md
@@ -28,7 +28,7 @@
[](https://codecov.io/github/apache/tsfile)
-[](http://search.maven.org/#search|gav|1|g:"org.apache.tsfile")
+[](https://central.sonatype.com/artifact/org.apache.tsfile/tsfile-parent)
## 简介
diff --git a/README.md b/README.md
index 089698b7b..2d98b8485 100644
--- a/README.md
+++ b/README.md
@@ -29,8 +29,7 @@
[](https://codecov.io/github/apache/tsfile)
-[](http://search.maven.org/#search|gav|1|g:"org.apache.tsfile")
-
+[](https://central.sonatype.com/artifact/org.apache.tsfile/tsfile-parent)
## Introduction
TsFile is a columnar storage file format designed for time series data, which supports efficient compression, high throughput of read and write, and compatibility with various frameworks, such as Spark and Flink. It is easy to integrate TsFile into IoT big data processing frameworks.
From 4efad3b52323e7f0961a4c0e931416f16448c17e Mon Sep 17 00:00:00 2001
From: 761417898 <761417898@qq.com>
Date: Thu, 26 Feb 2026 16:00:05 +0800
Subject: [PATCH 08/23] fix recovery tsfile statistic
---
cpp/src/file/restorable_tsfile_io_writer.cc | 273 ++++++++++++++++++++
1 file changed, 273 insertions(+)
diff --git a/cpp/src/file/restorable_tsfile_io_writer.cc b/cpp/src/file/restorable_tsfile_io_writer.cc
index f563c56a4..7f651607e 100644
--- a/cpp/src/file/restorable_tsfile_io_writer.cc
+++ b/cpp/src/file/restorable_tsfile_io_writer.cc
@@ -28,6 +28,9 @@
#include "common/allocator/byte_stream.h"
#include "common/device_id.h"
#include "common/statistic.h"
+#include "common/tsfile_common.h"
+#include "compress/compressor_factory.h"
+#include "encoding/decoder_factory.h"
#include "utils/errno_define.h"
#ifdef _WIN32
@@ -196,6 +199,252 @@ static int parse_chunk_header_and_skip(SelfCheckReader& reader,
return E_OK;
}
+/**
+ * Recover chunk-level statistic from chunk data so that tail metadata can be
+ * generated correctly after recovery (aligned with Java TsFileSequenceReader
+ * selfCheck). Multi-page: merge each page header's statistic. Single-page:
+ * decode page data and update stat. For aligned value chunks, time_batch
+ * (from the time chunk in the same group) must be provided.
+ */
+static int recover_chunk_statistic(
+ const ChunkHeader& chdr, const char* chunk_data, int32_t data_size,
+ Statistic* out_stat, common::PageArena* pa,
+ const std::vector* time_batch = nullptr,
+ std::vector* out_time_batch = nullptr) {
+ if (chunk_data == nullptr || data_size <= 0 || out_stat == nullptr) {
+ return E_OK;
+ }
+ common::ByteStream bs;
+ bs.wrap_from(const_cast(chunk_data),
+ static_cast(data_size));
+ const bool multi_page =
+ (static_cast(chdr.chunk_type_) & 0x3F) ==
+ static_cast(CHUNK_HEADER_MARKER);
+
+ if (multi_page) {
+ while (bs.remaining_size() > 0) {
+ PageHeader ph;
+ int ret = ph.deserialize_from(bs, true, chdr.data_type_);
+ if (ret != common::E_OK) {
+ return ret;
+ }
+ uint32_t comp = ph.compressed_size_;
+ if (ph.statistic_ != nullptr) {
+ if (out_stat->merge_with(ph.statistic_) != common::E_OK) {
+ ph.reset();
+ return common::E_TSFILE_CORRUPTED;
+ }
+ }
+ ph.reset();
+ bs.wrapped_buf_advance_read_pos(comp);
+ }
+ return E_OK;
+ }
+
+ /* Single-page: page header has no statistic; decompress and decode. */
+ const bool is_time_column =
+ (static_cast(chdr.chunk_type_) & 0x80) != 0;
+ PageHeader ph;
+ int ret = ph.deserialize_from(bs, false, chdr.data_type_);
+ if (ret != common::E_OK ||
+ ph.compressed_size_ == 0 ||
+ bs.remaining_size() < ph.compressed_size_) {
+ return E_OK;
+ }
+ const char* compressed_ptr =
+ chunk_data + (data_size - static_cast(bs.remaining_size()));
+ char* uncompressed_buf = nullptr;
+ uint32_t uncompressed_size = 0;
+ Compressor* compressor =
+ CompressorFactory::alloc_compressor(chdr.compression_type_);
+ if (compressor == nullptr) {
+ return common::E_OOM;
+ }
+ ret = compressor->reset(false);
+ if (ret != common::E_OK) {
+ CompressorFactory::free(compressor);
+ return ret;
+ }
+ ret = compressor->uncompress(
+ const_cast(compressed_ptr), ph.compressed_size_,
+ uncompressed_buf, uncompressed_size);
+ if (ret != common::E_OK ||
+ uncompressed_buf == nullptr ||
+ uncompressed_size != ph.uncompressed_size_) {
+ if (uncompressed_buf != nullptr) {
+ compressor->after_uncompress(uncompressed_buf);
+ }
+ CompressorFactory::free(compressor);
+ return (ret == common::E_OK) ? common::E_TSFILE_CORRUPTED : ret;
+ }
+ if (is_time_column) {
+ /* Time chunk: uncompressed = raw time stream only (no var_uint). */
+ Decoder* time_decoder = DecoderFactory::alloc_time_decoder();
+ if (time_decoder == nullptr) {
+ compressor->after_uncompress(uncompressed_buf);
+ CompressorFactory::free(compressor);
+ return common::E_OOM;
+ }
+ common::ByteStream time_in;
+ time_in.wrap_from(uncompressed_buf, uncompressed_size);
+ time_decoder->reset();
+ int64_t t;
+ if (out_time_batch != nullptr) {
+ out_time_batch->clear();
+ }
+ while (time_decoder->has_remaining(time_in)) {
+ if (time_decoder->read_int64(t, time_in) != common::E_OK) {
+ break;
+ }
+ out_stat->update(t);
+ if (out_time_batch != nullptr) {
+ out_time_batch->push_back(t);
+ }
+ }
+ DecoderFactory::free(time_decoder);
+ compressor->after_uncompress(uncompressed_buf);
+ CompressorFactory::free(compressor);
+ return E_OK;
+ }
+
+ /* Value chunk: parse layout and decode. */
+ const char* value_buf = nullptr;
+ uint32_t value_buf_size = 0;
+ std::vector time_decode_buf;
+ const std::vector* times = nullptr;
+
+ if (time_batch != nullptr && !time_batch->empty()) {
+ /* Aligned value page: uncompressed = ui32(size) + bitmap + value_buf */
+ if (uncompressed_size < 4) {
+ compressor->after_uncompress(uncompressed_buf);
+ CompressorFactory::free(compressor);
+ return E_OK;
+ }
+ uint32_t num_values =
+ (static_cast(static_cast(uncompressed_buf[0])) << 24) |
+ (static_cast(static_cast(uncompressed_buf[1])) << 16) |
+ (static_cast(static_cast(uncompressed_buf[2])) << 8) |
+ (static_cast(static_cast(uncompressed_buf[3])));
+ uint32_t bitmap_size = (num_values + 7) / 8;
+ if (uncompressed_size < 4 + bitmap_size) {
+ compressor->after_uncompress(uncompressed_buf);
+ CompressorFactory::free(compressor);
+ return E_OK;
+ }
+ value_buf = uncompressed_buf + 4 + bitmap_size;
+ value_buf_size = uncompressed_size - 4 - bitmap_size;
+ times = time_batch;
+ } else {
+ /* Non-aligned: var_uint(time_buf_size) + time_buf + value_buf */
+ int var_size = 0;
+ uint32_t time_buf_size = 0;
+ ret = common::SerializationUtil::read_var_uint(
+ time_buf_size, uncompressed_buf, static_cast(uncompressed_size),
+ &var_size);
+ if (ret != common::E_OK ||
+ static_cast(var_size) + time_buf_size > uncompressed_size) {
+ compressor->after_uncompress(uncompressed_buf);
+ CompressorFactory::free(compressor);
+ return (ret == common::E_OK) ? common::E_TSFILE_CORRUPTED : ret;
+ }
+ const char* time_buf = uncompressed_buf + var_size;
+ value_buf = time_buf + time_buf_size;
+ value_buf_size =
+ uncompressed_size - static_cast(var_size) - time_buf_size;
+ Decoder* time_decoder = DecoderFactory::alloc_time_decoder();
+ if (time_decoder == nullptr) {
+ compressor->after_uncompress(uncompressed_buf);
+ CompressorFactory::free(compressor);
+ return common::E_OOM;
+ }
+ common::ByteStream time_in;
+ time_in.wrap_from(const_cast(time_buf), time_buf_size);
+ time_decoder->reset();
+ time_decode_buf.clear();
+ int64_t t;
+ while (time_decoder->has_remaining(time_in)) {
+ if (time_decoder->read_int64(t, time_in) != common::E_OK) {
+ break;
+ }
+ time_decode_buf.push_back(t);
+ }
+ DecoderFactory::free(time_decoder);
+ times = &time_decode_buf;
+ }
+
+ Decoder* value_decoder = DecoderFactory::alloc_value_decoder(
+ chdr.encoding_type_, chdr.data_type_);
+ if (value_decoder == nullptr) {
+ compressor->after_uncompress(uncompressed_buf);
+ CompressorFactory::free(compressor);
+ return common::E_OOM;
+ }
+ common::ByteStream value_in;
+ value_in.wrap_from(const_cast(value_buf), value_buf_size);
+ value_decoder->reset();
+ size_t idx = 0;
+ const size_t num_times = times->size();
+ while (idx < num_times && value_decoder->has_remaining(value_in)) {
+ int64_t t = (*times)[idx];
+ switch (chdr.data_type_) {
+ case common::BOOLEAN: {
+ bool v;
+ if (value_decoder->read_boolean(v, value_in) == common::E_OK) {
+ out_stat->update(t, v);
+ }
+ break;
+ }
+ case common::INT32:
+ case common::DATE: {
+ int32_t v;
+ if (value_decoder->read_int32(v, value_in) == common::E_OK) {
+ out_stat->update(t, v);
+ }
+ break;
+ }
+ case common::INT64:
+ case common::TIMESTAMP: {
+ int64_t v;
+ if (value_decoder->read_int64(v, value_in) == common::E_OK) {
+ out_stat->update(t, v);
+ }
+ break;
+ }
+ case common::FLOAT: {
+ float v;
+ if (value_decoder->read_float(v, value_in) == common::E_OK) {
+ out_stat->update(t, v);
+ }
+ break;
+ }
+ case common::DOUBLE: {
+ double v;
+ if (value_decoder->read_double(v, value_in) == common::E_OK) {
+ out_stat->update(t, v);
+ }
+ break;
+ }
+ case common::TEXT:
+ case common::BLOB:
+ case common::STRING: {
+ common::String v;
+ if (pa != nullptr &&
+ value_decoder->read_String(v, *pa, value_in) == common::E_OK) {
+ out_stat->update(t, v);
+ }
+ break;
+ }
+ default:
+ break;
+ }
+ idx++;
+ }
+ DecoderFactory::free(value_decoder);
+ compressor->after_uncompress(uncompressed_buf);
+ CompressorFactory::free(compressor);
+ return E_OK;
+}
+
} // namespace
RestorableTsFileIOWriter::RestorableTsFileIOWriter()
@@ -343,6 +592,7 @@ int RestorableTsFileIOWriter::self_check(bool truncate_corrupted) {
std::shared_ptr cur_device_id;
ChunkGroupMeta* cur_cgm = nullptr;
std::vector recovered_cgm_list;
+ std::vector cur_group_time_batch;
auto flush_chunk_group = [this, &cur_device_id, &cur_cgm,
&recovered_cgm_list]() {
@@ -371,6 +621,7 @@ int RestorableTsFileIOWriter::self_check(bool truncate_corrupted) {
if (marker == static_cast(CHUNK_GROUP_HEADER_MARKER)) {
truncated = pos - 1;
flush_chunk_group();
+ cur_group_time_batch.clear();
int seg_len = 0;
ret = reader.read(static_cast(pos), buf.data(), BUF_SIZE,
read_len);
@@ -444,6 +695,28 @@ int RestorableTsFileIOWriter::self_check(bool truncate_corrupted) {
break;
}
stat->reset();
+ if (chdr.data_size_ > 0) {
+ const int32_t header_len =
+ static_cast(consumed) - chdr.data_size_;
+ if (header_len > 0 &&
+ chunk_start + consumed <= static_cast(file_size)) {
+ std::vector chunk_data(chdr.data_size_);
+ int32_t read_len = 0;
+ ret = reader.read(
+ static_cast(chunk_start + header_len),
+ chunk_data.data(), chdr.data_size_, read_len);
+ if (ret == E_OK && read_len == chdr.data_size_) {
+ ret = recover_chunk_statistic(
+ chdr, chunk_data.data(), chdr.data_size_, stat,
+ &self_check_arena_,
+ &cur_group_time_batch,
+ &cur_group_time_batch);
+ }
+ if (ret != E_OK) {
+ break;
+ }
+ }
+ }
cm->init(mname,
static_cast(chdr.data_type_),
chunk_start, stat, 0,
From 38230adc40e5c9989b04bb1f5f41e451e7b7e113 Mon Sep 17 00:00:00 2001
From: 761417898 <761417898@qq.com>
Date: Thu, 26 Feb 2026 19:49:22 +0800
Subject: [PATCH 09/23] fix recovery tsfile append and reader
---
cpp/src/file/restorable_tsfile_io_writer.cc | 36 +++++++++
cpp/src/file/tsfile_io_writer.cc | 19 ++++-
cpp/src/file/tsfile_io_writer.h | 9 +++
cpp/src/file/write_file.cc | 13 +++
cpp/src/file/write_file.h | 4 +-
.../file/restorable_tsfile_io_writer_test.cc | 80 ++++++++++++++++---
6 files changed, 148 insertions(+), 13 deletions(-)
diff --git a/cpp/src/file/restorable_tsfile_io_writer.cc b/cpp/src/file/restorable_tsfile_io_writer.cc
index 7f651607e..7d3ac087a 100644
--- a/cpp/src/file/restorable_tsfile_io_writer.cc
+++ b/cpp/src/file/restorable_tsfile_io_writer.cc
@@ -19,6 +19,7 @@
#include "file/restorable_tsfile_io_writer.h"
+#include
#include
#include
@@ -764,6 +765,41 @@ int RestorableTsFileIOWriter::self_check(bool truncate_corrupted) {
return ret;
}
+ // Restore write_stream_ from file content so cur_file_position() is correct
+ // when generating tail metadata. Flush will skip these leading bytes.
+ file_size = write_file_->get_position();
+ if (file_size > 0) {
+ const int read_chunk = 65536;
+ std::vector read_buf(read_chunk);
+ int64_t offset = 0;
+ while (offset < file_size) {
+ int64_t to_read = std::min(static_cast(read_chunk),
+ file_size - offset);
+ ssize_t nr = -1;
+#ifdef _WIN32
+ nr = pread(write_file_->get_fd(), read_buf.data(),
+ static_cast(to_read), static_cast(offset));
+#else
+ nr = ::pread(write_file_->get_fd(), read_buf.data(),
+ static_cast(to_read), offset);
+#endif
+ if (nr <= 0) {
+ ret = E_FILE_READ_ERR;
+ break;
+ }
+ if (write_buf(read_buf.data(), static_cast(nr)) != E_OK) {
+ ret = E_FILE_WRITE_ERR;
+ break;
+ }
+ offset += nr;
+ }
+ if (ret == E_OK) {
+ set_flush_skip_leading(file_size);
+ } else {
+ return ret;
+ }
+ }
+
for (ChunkGroupMeta* cgm : recovered_cgm_list) {
push_chunk_group_meta(cgm);
}
diff --git a/cpp/src/file/tsfile_io_writer.cc b/cpp/src/file/tsfile_io_writer.cc
index dadadb5dc..a08ec6ed3 100644
--- a/cpp/src/file/tsfile_io_writer.cc
+++ b/cpp/src/file/tsfile_io_writer.cc
@@ -858,6 +858,8 @@ int TsFileIOWriter::clone_node_list(
/*
* TODO:
* when finish flushing stream to file, reclaim memory used by stream
+ * When flush_skip_leading_ > 0 (recovery path), the first N bytes in the
+ * stream are already on disk; only skip consuming them and write the rest.
*/
int TsFileIOWriter::flush_stream_to_file() {
int ret = E_OK;
@@ -866,10 +868,21 @@ int TsFileIOWriter::flush_stream_to_file() {
write_stream_consumer_.get_next_buf(write_stream_);
if (b.buf_ == nullptr) {
break;
- } else {
- if (RET_FAIL(file_->write(b.buf_, b.len_))) {
- break;
+ }
+ uint32_t write_off = 0;
+ uint32_t write_len = b.len_;
+ if (flush_skip_leading_ > 0) {
+ if (static_cast(b.len_) <= flush_skip_leading_) {
+ flush_skip_leading_ -= static_cast(b.len_);
+ continue;
}
+ write_off = static_cast(flush_skip_leading_);
+ write_len = b.len_ - write_off;
+ flush_skip_leading_ = 0;
+ }
+ if (write_len > 0 &&
+ RET_FAIL(file_->write(b.buf_ + write_off, write_len))) {
+ break;
}
}
diff --git a/cpp/src/file/tsfile_io_writer.h b/cpp/src/file/tsfile_io_writer.h
index 151af97c6..f44e606b4 100644
--- a/cpp/src/file/tsfile_io_writer.h
+++ b/cpp/src/file/tsfile_io_writer.h
@@ -204,6 +204,11 @@ class TsFileIOWriter {
}
/** If true, destroy() skips chunk_group_meta_list_ (entries from arena). */
bool chunk_group_meta_from_recovery_ = false;
+ /**
+ * For recovery only: number of leading bytes in write_stream_ that are
+ * already on disk. flush_stream_to_file() will skip writing these.
+ */
+ void set_flush_skip_leading(int64_t n) { flush_skip_leading_ = n; }
private:
common::PageArena meta_allocator_;
@@ -224,6 +229,10 @@ class TsFileIOWriter {
std::string encrypt_type_;
std::string encrypt_key_;
bool is_aligned_;
+ /** Recovery only: skip this many leading bytes when flushing (already on disk). */
+ int64_t flush_skip_leading_ = 0;
+
+ friend class RestorableTsFileIOWriter;
};
} // end namespace storage
diff --git a/cpp/src/file/write_file.cc b/cpp/src/file/write_file.cc
index e0ce619da..f93191664 100644
--- a/cpp/src/file/write_file.cc
+++ b/cpp/src/file/write_file.cc
@@ -168,6 +168,19 @@ int WriteFile::seek_to_end() {
return E_OK;
}
+int64_t WriteFile::get_position() {
+ if (fd_ < 0) {
+ return 0;
+ }
+#ifdef _WIN32
+ int64_t pos = _lseeki64(fd_, 0, SEEK_CUR);
+ return (pos < 0) ? 0 : pos;
+#else
+ off_t pos = ::lseek(fd_, 0, SEEK_CUR);
+ return (pos < 0) ? 0 : static_cast(pos);
+#endif
+}
+
} // end namespace storage
#ifdef _WIN32
diff --git a/cpp/src/file/write_file.h b/cpp/src/file/write_file.h
index 85260a3be..129f49962 100644
--- a/cpp/src/file/write_file.h
+++ b/cpp/src/file/write_file.h
@@ -34,7 +34,6 @@ class WriteFile {
#ifndef LIBTSFILE_SDK
WriteFile() : path_(), file_id_(), fd_(-1) {}
FORCE_INLINE common::FileID get_file_id() const { return file_id_; }
- FORCE_INLINE int get_fd() const { return fd_; }
int create(const common::FileID& file_id, int flags, mode_t mode);
#else
WriteFile() : path_(), fd_(-1) {}
@@ -48,6 +47,9 @@ class WriteFile {
int truncate(int64_t size);
int seek_to_end();
FORCE_INLINE std::string get_file_path() { return path_; }
+ /** Current file offset (for recovery: used as file size after seek_to_end). */
+ int64_t get_position();
+ FORCE_INLINE int get_fd() const { return fd_; }
private:
int do_create(int flags, mode_t mode);
diff --git a/cpp/test/file/restorable_tsfile_io_writer_test.cc b/cpp/test/file/restorable_tsfile_io_writer_test.cc
index 7a6b6f507..48c8a2ae7 100644
--- a/cpp/test/file/restorable_tsfile_io_writer_test.cc
+++ b/cpp/test/file/restorable_tsfile_io_writer_test.cc
@@ -30,8 +30,13 @@
#include "file/write_file.h"
#include "writer/tsfile_table_writer.h"
#include "writer/tsfile_tree_writer.h"
+#include "reader/tsfile_tree_reader.h"
+#include "reader/tsfile_reader.h"
#include "writer/tsfile_writer.h"
+namespace storage {
+class ResultSet;
+}
using namespace storage;
using namespace common;
@@ -264,8 +269,29 @@ TEST_F(RestorableTsFileIOWriterTest, MultiDeviceRecoverAndWriteWithTreeWriter) {
r4.add_point("s2", 40.0);
ASSERT_EQ(tree_writer.write(r4), E_OK);
+ tree_writer.flush();
tree_writer.close();
- rw.close();
+
+ TsFileTreeReader reader;
+ reader.open(file_name_);
+ auto device_ids = reader.get_all_device_ids();
+ ASSERT_EQ(device_ids.size(), 2);
+
+ std::vector measurement_ids{"s1", "s2"};
+ ResultSet* result;
+ int ret =
+ reader.query(device_ids, measurement_ids, INT64_MIN, INT64_MAX, result);
+ ASSERT_EQ(ret, E_OK);
+ auto iter = result->iterator();
+ RowRecord* read_record;
+ int row_cnt = 0;
+ while (iter.hasNext()) {
+ read_record = iter.next();
+ row_cnt += 1;
+ }
+ ASSERT_EQ(row_cnt, 4);
+ reader.destroy_query_data_set(result);
+ reader.close();
}
TEST_F(RestorableTsFileIOWriterTest, AlignedTimeseriesRecoverAndWrite) {
@@ -318,9 +344,30 @@ TEST_F(RestorableTsFileIOWriterTest, AlignedTimeseriesRecoverAndWrite) {
r3.add_point("s1", 5.0f);
r3.add_point("s2", 6.0f);
ASSERT_EQ(tw2.write_record_aligned(r3), E_OK);
-
+ tw2.flush();
tw2.close();
- rw.close();
+
+
+ TsFileTreeReader reader;
+ reader.open(file_name_);
+ auto device_ids = reader.get_all_device_ids();
+ ASSERT_EQ(device_ids.size(), 1);
+
+ std::vector measurement_ids{"s1", "s2"};
+ ResultSet* result;
+ int ret =
+ reader.query(device_ids, measurement_ids, INT64_MIN, INT64_MAX, result);
+ ASSERT_EQ(ret, E_OK);
+ auto iter = result->iterator();
+ RowRecord* read_record;
+ int row_cnt = 0;
+ while (iter.hasNext()) {
+ read_record = iter.next();
+ row_cnt += 1;
+ }
+ ASSERT_EQ(row_cnt, 3);
+ reader.destroy_query_data_set(result);
+ reader.close();
}
TEST_F(RestorableTsFileIOWriterTest, TableWriterRecoverAndWrite) {
@@ -372,18 +419,33 @@ TEST_F(RestorableTsFileIOWriterTest, TableWriterRecoverAndWrite) {
ASSERT_TRUE(rw.can_write());
TsFileTableWriter table_writer2(&rw);
- // Java 规则:key=device_id.get_table_name()="device0";1 segment 时无
- // __level 列,仅有 FIELD "value"
- std::vector value_col = {"value"};
- std::vector value_types = {DOUBLE};
+ std::vector value_col = {"__level1", "value"};
+ std::vector value_types = {STRING, DOUBLE};
Tablet tablet2(value_col, value_types, 10);
tablet2.set_table_name(table_name);
for (int i = 0; i < 10; i++) {
tablet2.add_timestamp(i, static_cast(i + 10));
- tablet.add_value(i, "device", "device0");
+ tablet2.add_value(i, "__level1", "device0");
tablet2.add_value(i, "value", (i + 10) * 1.1);
}
ASSERT_EQ(table_writer2.write_table(tablet2), E_OK);
+ table_writer2.flush();
table_writer2.close();
- // rw.close();
+
+ TsFileReader table_reader;
+ ASSERT_EQ(table_reader.open(file_name_), E_OK);
+
+
+ ResultSet* tmp_result_set = nullptr;
+ table_reader.query("test_table",
+ {"__level1", "value"}, 0, 10000,
+ tmp_result_set, nullptr);
+ auto* table_result_set = (TableResultSet*)tmp_result_set;
+ bool has_next = false;
+ int64_t row_num = 0;
+ while (IS_SUCC(table_result_set->next(has_next)) && has_next) {
+ auto record = table_result_set->get_row_record();
+ row_num++;
+ }
+ ASSERT_EQ(row_num, 20);
}
From cb1f9d186dcc1232f15ba40676d3528c36c2b59b Mon Sep 17 00:00:00 2001
From: 761417898 <761417898@qq.com>
Date: Fri, 27 Feb 2026 09:21:51 +0800
Subject: [PATCH 10/23] Refactor RestorableTsFileIOWriterTest
---
.../file/restorable_tsfile_io_writer_test.cc | 715 ++++++++----------
1 file changed, 334 insertions(+), 381 deletions(-)
diff --git a/cpp/test/file/restorable_tsfile_io_writer_test.cc b/cpp/test/file/restorable_tsfile_io_writer_test.cc
index 48c8a2ae7..548ccf637 100644
--- a/cpp/test/file/restorable_tsfile_io_writer_test.cc
+++ b/cpp/test/file/restorable_tsfile_io_writer_test.cc
@@ -17,10 +17,15 @@
* under the License.
*/
+/**
+ * Unit tests for RestorableTsFileIOWriter.
+ * Covers: empty/invalid/complete file open, truncate recovery, continued write
+ * with TsFileWriter/TsFileTreeWriter/TsFileTableWriter, and read-back verify.
+ */
+
#include "file/restorable_tsfile_io_writer.h"
#include
-
#include
#include "common/record.h"
@@ -28,10 +33,10 @@
#include "common/tablet.h"
#include "common/tsfile_common.h"
#include "file/write_file.h"
+#include "reader/tsfile_reader.h"
+#include "reader/tsfile_tree_reader.h"
#include "writer/tsfile_table_writer.h"
#include "writer/tsfile_tree_writer.h"
-#include "reader/tsfile_tree_reader.h"
-#include "reader/tsfile_reader.h"
#include "writer/tsfile_writer.h"
namespace storage {
@@ -40,412 +45,360 @@ class ResultSet;
using namespace storage;
using namespace common;
+// -----------------------------------------------------------------------------
+// Helpers used by multiple tests (file flags, file size, corrupt tail)
+// -----------------------------------------------------------------------------
+
+static int GetWriteCreateFlags() {
+ int flags = O_WRONLY | O_CREAT | O_TRUNC;
+#ifdef _WIN32
+ flags |= O_BINARY;
+#endif
+ return flags;
+}
+
+static int64_t GetFileSize(const std::string& path) {
+ std::ifstream f(path, std::ios::binary | std::ios::ate);
+ return static_cast(f.tellg());
+}
+
+/** Overwrite the last num_bytes of the file with zeros to simulate corruption. */
+static void CorruptFileTail(const std::string& path, int num_bytes) {
+ const int64_t full_size = GetFileSize(path);
+ std::ofstream out(path, std::ios::binary | std::ios::in);
+ out.seekp(full_size - static_cast(num_bytes));
+ for (int i = 0; i < num_bytes; ++i) {
+ out.put(0);
+ }
+ out.close();
+}
+
+/** Query tree reader and return row count; destroys query result. */
+static int CountTreeReaderRows(TsFileTreeReader& reader,
+ const std::vector& measurement_ids) {
+ auto device_ids = reader.get_all_device_ids();
+ ResultSet* result = nullptr;
+ int ret = reader.query(device_ids, measurement_ids, INT64_MIN, INT64_MAX,
+ result);
+ if (ret != E_OK || result == nullptr) {
+ return -1;
+ }
+ int count = 0;
+ for (auto it = result->iterator(); it.hasNext(); it.next()) {
+ ++count;
+ }
+ reader.destroy_query_data_set(result);
+ return count;
+}
+
+// -----------------------------------------------------------------------------
+// Test fixture
+// -----------------------------------------------------------------------------
+
class RestorableTsFileIOWriterTest : public ::testing::Test {
- protected:
- void SetUp() override {
- libtsfile_init();
- file_name_ = "restorable_tsfile_io_writer_test.tsfile";
- remove(file_name_.c_str());
- }
-
- void TearDown() override {
- remove(file_name_.c_str());
- libtsfile_destroy();
- }
-
- std::string file_name_;
+ protected:
+ void SetUp() override {
+ libtsfile_init();
+ file_name_ = "restorable_tsfile_io_writer_test.tsfile";
+ remove(file_name_.c_str());
+ }
+
+ void TearDown() override {
+ remove(file_name_.c_str());
+ libtsfile_destroy();
+ }
+
+ int64_t GetCurrentFileSize() const { return GetFileSize(file_name_); }
+ void CorruptCurrentFileTail(int num_bytes) {
+ CorruptFileTail(file_name_, num_bytes);
+ }
+
+ std::string file_name_;
};
+// -----------------------------------------------------------------------------
+// Open behavior: empty file, bad magic, complete file, truncated file, header-only
+// -----------------------------------------------------------------------------
+
TEST_F(RestorableTsFileIOWriterTest, OpenEmptyFile) {
- RestorableTsFileIOWriter writer;
- ASSERT_EQ(writer.open(file_name_, true), E_OK);
- EXPECT_TRUE(writer.can_write());
- EXPECT_TRUE(writer.has_crashed());
- EXPECT_EQ(writer.get_truncated_size(), 0);
- EXPECT_NE(writer.get_tsfile_io_writer(), nullptr);
- writer.close();
+ RestorableTsFileIOWriter writer;
+ ASSERT_EQ(writer.open(file_name_, true), E_OK);
+ EXPECT_TRUE(writer.can_write());
+ EXPECT_TRUE(writer.has_crashed());
+ EXPECT_EQ(writer.get_truncated_size(), 0);
+ EXPECT_NE(writer.get_tsfile_io_writer(), nullptr);
+ writer.close();
}
TEST_F(RestorableTsFileIOWriterTest, OpenBadMagicFile) {
- std::ofstream f(file_name_);
- f.write("BadFile", 7);
- f.close();
-
- RestorableTsFileIOWriter writer;
- EXPECT_NE(writer.open(file_name_, true), E_OK);
- EXPECT_EQ(writer.get_truncated_size(), TSFILE_CHECK_INCOMPATIBLE);
- writer.close();
+ std::ofstream f(file_name_);
+ f.write("BadFile", 7);
+ f.close();
+
+ RestorableTsFileIOWriter writer;
+ EXPECT_NE(writer.open(file_name_, true), E_OK);
+ EXPECT_EQ(writer.get_truncated_size(), TSFILE_CHECK_INCOMPATIBLE);
+ writer.close();
}
TEST_F(RestorableTsFileIOWriterTest, OpenCompleteFile) {
- TsFileWriter tw;
- int flags = O_WRONLY | O_CREAT | O_TRUNC;
-#ifdef _WIN32
- flags |= O_BINARY;
-#endif
- ASSERT_EQ(tw.open(file_name_, flags, 0666), E_OK);
- tw.register_timeseries(
- "d1",
- MeasurementSchema("s1", FLOAT, GORILLA, CompressionType::UNCOMPRESSED));
- TsRecord record(1, "d1");
- record.add_point("s1", 1.0f);
- tw.write_record(record);
-
- record.timestamp_ = 2;
- tw.write_record(record);
-
- tw.flush();
- tw.close();
-
- RestorableTsFileIOWriter writer;
- ASSERT_EQ(writer.open(file_name_, true), E_OK);
- EXPECT_FALSE(writer.can_write());
- EXPECT_FALSE(writer.has_crashed());
- EXPECT_EQ(writer.get_truncated_size(), TSFILE_CHECK_COMPLETE);
- EXPECT_EQ(writer.get_tsfile_io_writer(), nullptr);
- writer.close();
+ TsFileWriter tw;
+ ASSERT_EQ(tw.open(file_name_, GetWriteCreateFlags(), 0666), E_OK);
+ tw.register_timeseries(
+ "d1",
+ MeasurementSchema("s1", FLOAT, GORILLA, CompressionType::UNCOMPRESSED));
+ TsRecord record(1, "d1");
+ record.add_point("s1", 1.0f);
+ tw.write_record(record);
+ record.timestamp_ = 2;
+ tw.write_record(record);
+ tw.flush();
+ tw.close();
+
+ RestorableTsFileIOWriter writer;
+ ASSERT_EQ(writer.open(file_name_, true), E_OK);
+ EXPECT_FALSE(writer.can_write());
+ EXPECT_FALSE(writer.has_crashed());
+ EXPECT_EQ(writer.get_truncated_size(), TSFILE_CHECK_COMPLETE);
+ EXPECT_EQ(writer.get_tsfile_io_writer(), nullptr);
+ writer.close();
}
TEST_F(RestorableTsFileIOWriterTest, OpenTruncatedFile) {
- TsFileWriter tw;
- int flags = O_WRONLY | O_CREAT | O_TRUNC;
-#ifdef _WIN32
- flags |= O_BINARY;
-#endif
- ASSERT_EQ(tw.open(file_name_, flags, 0666), E_OK);
- tw.register_timeseries(
- "d1",
- MeasurementSchema("s1", FLOAT, RLE, CompressionType::UNCOMPRESSED));
- TsRecord record(1, "d1");
- record.add_point("s1", 1.0f);
- tw.write_record(record);
- tw.flush();
- tw.close();
-
- std::streampos full_size;
- {
- std::ifstream f(file_name_, std::ios::binary | std::ios::ate);
- full_size = f.tellg();
- }
-
- std::ofstream corrupt(file_name_, std::ios::binary | std::ios::in);
- corrupt.seekp(full_size - std::streamoff(5));
- corrupt.put(0);
- corrupt.put(0);
- corrupt.put(0);
- corrupt.put(0);
- corrupt.put(0);
- corrupt.close();
-
- RestorableTsFileIOWriter writer;
- ASSERT_EQ(writer.open(file_name_, true), E_OK);
- EXPECT_TRUE(writer.can_write());
- EXPECT_TRUE(writer.has_crashed());
- EXPECT_GE(writer.get_truncated_size(),
- static_cast(MAGIC_STRING_TSFILE_LEN + 1));
- EXPECT_LE(writer.get_truncated_size(), static_cast(full_size));
- EXPECT_NE(writer.get_tsfile_io_writer(), nullptr);
- writer.close();
+ TsFileWriter tw;
+ ASSERT_EQ(tw.open(file_name_, GetWriteCreateFlags(), 0666), E_OK);
+ tw.register_timeseries(
+ "d1",
+ MeasurementSchema("s1", FLOAT, RLE, CompressionType::UNCOMPRESSED));
+ TsRecord record(1, "d1");
+ record.add_point("s1", 1.0f);
+ tw.write_record(record);
+ tw.flush();
+ tw.close();
+
+ const int64_t full_size = GetCurrentFileSize();
+ CorruptCurrentFileTail(5);
+
+ RestorableTsFileIOWriter writer;
+ ASSERT_EQ(writer.open(file_name_, true), E_OK);
+ EXPECT_TRUE(writer.can_write());
+ EXPECT_TRUE(writer.has_crashed());
+ EXPECT_GE(writer.get_truncated_size(),
+ static_cast(MAGIC_STRING_TSFILE_LEN + 1));
+ EXPECT_LE(writer.get_truncated_size(), full_size);
+ EXPECT_NE(writer.get_tsfile_io_writer(), nullptr);
+ writer.close();
}
TEST_F(RestorableTsFileIOWriterTest, OpenFileWithOnlyHeader) {
- WriteFile wf;
- int flags = O_RDWR | O_CREAT | O_TRUNC;
+ int flags = O_RDWR | O_CREAT | O_TRUNC;
#ifdef _WIN32
- flags |= O_BINARY;
+ flags |= O_BINARY;
#endif
- ASSERT_EQ(wf.create(file_name_, flags, 0666), E_OK);
- wf.write(MAGIC_STRING_TSFILE, MAGIC_STRING_TSFILE_LEN);
- wf.write(&VERSION_NUM_BYTE, 1);
- wf.close();
-
- RestorableTsFileIOWriter writer;
- ASSERT_EQ(writer.open(file_name_, true), E_OK);
- EXPECT_TRUE(writer.can_write());
- EXPECT_TRUE(writer.has_crashed());
- EXPECT_EQ(writer.get_truncated_size(), MAGIC_STRING_TSFILE_LEN + 1);
- writer.close();
+ WriteFile wf;
+ ASSERT_EQ(wf.create(file_name_, flags, 0666), E_OK);
+ wf.write(MAGIC_STRING_TSFILE, MAGIC_STRING_TSFILE_LEN);
+ wf.write(&VERSION_NUM_BYTE, 1);
+ wf.close();
+
+ RestorableTsFileIOWriter writer;
+ ASSERT_EQ(writer.open(file_name_, true), E_OK);
+ EXPECT_TRUE(writer.can_write());
+ EXPECT_TRUE(writer.has_crashed());
+ EXPECT_EQ(writer.get_truncated_size(), MAGIC_STRING_TSFILE_LEN + 1);
+ writer.close();
}
+// -----------------------------------------------------------------------------
+// Recovery + continued write with TsFileWriter (tree model)
+// -----------------------------------------------------------------------------
+
TEST_F(RestorableTsFileIOWriterTest, TruncateRecoversAndProvidesWriter) {
- TsFileWriter tw;
- int flags = O_WRONLY | O_CREAT | O_TRUNC;
-#ifdef _WIN32
- flags |= O_BINARY;
-#endif
- ASSERT_EQ(tw.open(file_name_, flags, 0666), E_OK);
- tw.register_timeseries(
- "d1",
- MeasurementSchema("s1", FLOAT, GORILLA, CompressionType::UNCOMPRESSED));
- TsRecord record(1, "d1");
- record.add_point("s1", 1.0f);
- tw.write_record(record);
-
- record.timestamp_ = 2;
- tw.write_record(record);
-
- tw.flush();
- tw.close();
-
- std::streampos full_size;
- {
- std::ifstream f(file_name_, std::ios::binary | std::ios::ate);
- full_size = f.tellg();
- }
-
- std::ofstream corrupt(file_name_, std::ios::binary | std::ios::in);
- corrupt.seekp(full_size - std::streamoff(3));
- corrupt.put(0);
- corrupt.put(0);
- corrupt.put(0);
- corrupt.close();
-
- RestorableTsFileIOWriter rw;
- ASSERT_EQ(rw.open(file_name_, true), E_OK);
- ASSERT_TRUE(rw.can_write());
- ASSERT_NE(rw.get_tsfile_io_writer(), nullptr);
- ASSERT_NE(rw.get_write_file(), nullptr);
- EXPECT_EQ(rw.get_file_path(), file_name_);
-
- TsFileWriter tw2;
- ASSERT_EQ(tw2.init(&rw), E_OK);
- TsRecord record2(3, "d1");
- record2.add_point("s1", 3.0f);
- ASSERT_EQ(tw2.write_record(record2), E_OK);
- tw2.close();
- rw.close();
+ TsFileWriter tw;
+ ASSERT_EQ(tw.open(file_name_, GetWriteCreateFlags(), 0666), E_OK);
+ tw.register_timeseries(
+ "d1",
+ MeasurementSchema("s1", FLOAT, GORILLA, CompressionType::UNCOMPRESSED));
+ TsRecord record(1, "d1");
+ record.add_point("s1", 1.0f);
+ tw.write_record(record);
+ record.timestamp_ = 2;
+ tw.write_record(record);
+ tw.flush();
+ tw.close();
+
+ CorruptCurrentFileTail(3);
+
+ RestorableTsFileIOWriter rw;
+ ASSERT_EQ(rw.open(file_name_, true), E_OK);
+ ASSERT_TRUE(rw.can_write());
+ ASSERT_NE(rw.get_tsfile_io_writer(), nullptr);
+ ASSERT_NE(rw.get_write_file(), nullptr);
+ EXPECT_EQ(rw.get_file_path(), file_name_);
+
+ TsFileWriter tw2;
+ ASSERT_EQ(tw2.init(&rw), E_OK);
+ TsRecord record2(3, "d1");
+ record2.add_point("s1", 3.0f);
+ ASSERT_EQ(tw2.write_record(record2), E_OK);
+ tw2.close();
+ rw.close();
}
+// -----------------------------------------------------------------------------
+// Recovery + continued write with TsFileTreeWriter, then read-back verify
+// -----------------------------------------------------------------------------
+
TEST_F(RestorableTsFileIOWriterTest, MultiDeviceRecoverAndWriteWithTreeWriter) {
- TsFileWriter tw;
- int flags = O_WRONLY | O_CREAT | O_TRUNC;
-#ifdef _WIN32
- flags |= O_BINARY;
-#endif
- ASSERT_EQ(tw.open(file_name_, flags, 0666), E_OK);
- tw.register_timeseries("d1", MeasurementSchema("s1", FLOAT));
- tw.register_timeseries("d1", MeasurementSchema("s2", INT32));
- tw.register_timeseries("d2", MeasurementSchema("s1", FLOAT));
- tw.register_timeseries("d2", MeasurementSchema("s2", DOUBLE));
-
- TsRecord r1(1, "d1");
- r1.add_point("s1", 1.0f);
- r1.add_point("s2", 10);
- ASSERT_EQ(tw.write_record(r1), E_OK);
-
- TsRecord r2(2, "d2");
- r2.add_point("s1", 2.0f);
- r2.add_point("s2", 20.0);
- ASSERT_EQ(tw.write_record(r2), E_OK);
-
- tw.flush();
- tw.close();
-
- std::streampos full_size;
- {
- std::ifstream f(file_name_, std::ios::binary | std::ios::ate);
- full_size = f.tellg();
- }
-
- std::ofstream corrupt(file_name_, std::ios::binary | std::ios::in);
- corrupt.seekp(full_size - std::streamoff(3));
- corrupt.put(0);
- corrupt.put(0);
- corrupt.put(0);
- corrupt.close();
-
- RestorableTsFileIOWriter rw;
- ASSERT_EQ(rw.open(file_name_, true), E_OK);
- ASSERT_TRUE(rw.can_write());
-
- TsFileTreeWriter tree_writer(&rw);
- TsRecord r3(3, "d1");
- r3.add_point("s1", 3.0f);
- r3.add_point("s2", 30);
- ASSERT_EQ(tree_writer.write(r3), E_OK);
-
- TsRecord r4(4, "d2");
- r4.add_point("s1", 4.0f);
- r4.add_point("s2", 40.0);
- ASSERT_EQ(tree_writer.write(r4), E_OK);
-
- tree_writer.flush();
- tree_writer.close();
-
- TsFileTreeReader reader;
- reader.open(file_name_);
- auto device_ids = reader.get_all_device_ids();
- ASSERT_EQ(device_ids.size(), 2);
-
- std::vector measurement_ids{"s1", "s2"};
- ResultSet* result;
- int ret =
- reader.query(device_ids, measurement_ids, INT64_MIN, INT64_MAX, result);
- ASSERT_EQ(ret, E_OK);
- auto iter = result->iterator();
- RowRecord* read_record;
- int row_cnt = 0;
- while (iter.hasNext()) {
- read_record = iter.next();
- row_cnt += 1;
- }
- ASSERT_EQ(row_cnt, 4);
- reader.destroy_query_data_set(result);
- reader.close();
+ TsFileWriter tw;
+ ASSERT_EQ(tw.open(file_name_, GetWriteCreateFlags(), 0666), E_OK);
+ tw.register_timeseries("d1", MeasurementSchema("s1", FLOAT));
+ tw.register_timeseries("d1", MeasurementSchema("s2", INT32));
+ tw.register_timeseries("d2", MeasurementSchema("s1", FLOAT));
+ tw.register_timeseries("d2", MeasurementSchema("s2", DOUBLE));
+
+ TsRecord r1(1, "d1");
+ r1.add_point("s1", 1.0f);
+ r1.add_point("s2", 10);
+ ASSERT_EQ(tw.write_record(r1), E_OK);
+ TsRecord r2(2, "d2");
+ r2.add_point("s1", 2.0f);
+ r2.add_point("s2", 20.0);
+ ASSERT_EQ(tw.write_record(r2), E_OK);
+ tw.flush();
+ tw.close();
+
+ CorruptCurrentFileTail(3);
+
+ RestorableTsFileIOWriter rw;
+ ASSERT_EQ(rw.open(file_name_, true), E_OK);
+ ASSERT_TRUE(rw.can_write());
+
+ TsFileTreeWriter tree_writer(&rw);
+ TsRecord r3(3, "d1");
+ r3.add_point("s1", 3.0f);
+ r3.add_point("s2", 30);
+ ASSERT_EQ(tree_writer.write(r3), E_OK);
+ TsRecord r4(4, "d2");
+ r4.add_point("s1", 4.0f);
+ r4.add_point("s2", 40.0);
+ ASSERT_EQ(tree_writer.write(r4), E_OK);
+ tree_writer.flush();
+ tree_writer.close();
+
+ TsFileTreeReader reader;
+ reader.open(file_name_);
+ ASSERT_EQ(reader.get_all_device_ids().size(), 2u);
+ ASSERT_EQ(CountTreeReaderRows(reader, {"s1", "s2"}), 4);
+ reader.close();
}
+// -----------------------------------------------------------------------------
+// Recovery + continued write with aligned timeseries, then read-back verify
+// -----------------------------------------------------------------------------
+
TEST_F(RestorableTsFileIOWriterTest, AlignedTimeseriesRecoverAndWrite) {
- TsFileWriter tw;
- int flags = O_WRONLY | O_CREAT | O_TRUNC;
-#ifdef _WIN32
- flags |= O_BINARY;
-#endif
- ASSERT_EQ(tw.open(file_name_, flags, 0666), E_OK);
-
- std::vector aligned_schemas;
- aligned_schemas.push_back(new MeasurementSchema("s1", FLOAT));
- aligned_schemas.push_back(new MeasurementSchema("s2", FLOAT));
- tw.register_aligned_timeseries("d1", aligned_schemas);
-
- TsRecord r1(1, "d1");
- r1.add_point("s1", 1.0f);
- r1.add_point("s2", 2.0f);
- ASSERT_EQ(tw.write_record_aligned(r1), E_OK);
-
- TsRecord r2(2, "d1");
- r2.add_point("s1", 3.0f);
- r2.add_point("s2", 4.0f);
- ASSERT_EQ(tw.write_record_aligned(r2), E_OK);
-
- tw.flush();
- tw.close();
-
- std::streampos full_size;
- {
- std::ifstream f(file_name_, std::ios::binary | std::ios::ate);
- full_size = f.tellg();
- }
-
- std::ofstream corrupt(file_name_, std::ios::binary | std::ios::in);
- corrupt.seekp(full_size - std::streamoff(3));
- corrupt.put(0);
- corrupt.put(0);
- corrupt.put(0);
- corrupt.close();
-
- RestorableTsFileIOWriter rw;
- ASSERT_EQ(rw.open(file_name_, true), E_OK);
- ASSERT_TRUE(rw.can_write());
-
- TsFileWriter tw2;
- ASSERT_EQ(tw2.init(&rw), E_OK);
-
- TsRecord r3(3, "d1");
- r3.add_point("s1", 5.0f);
- r3.add_point("s2", 6.0f);
- ASSERT_EQ(tw2.write_record_aligned(r3), E_OK);
- tw2.flush();
- tw2.close();
-
-
- TsFileTreeReader reader;
- reader.open(file_name_);
- auto device_ids = reader.get_all_device_ids();
- ASSERT_EQ(device_ids.size(), 1);
-
- std::vector measurement_ids{"s1", "s2"};
- ResultSet* result;
- int ret =
- reader.query(device_ids, measurement_ids, INT64_MIN, INT64_MAX, result);
- ASSERT_EQ(ret, E_OK);
- auto iter = result->iterator();
- RowRecord* read_record;
- int row_cnt = 0;
- while (iter.hasNext()) {
- read_record = iter.next();
- row_cnt += 1;
- }
- ASSERT_EQ(row_cnt, 3);
- reader.destroy_query_data_set(result);
- reader.close();
+ TsFileWriter tw;
+ ASSERT_EQ(tw.open(file_name_, GetWriteCreateFlags(), 0666), E_OK);
+ std::vector aligned_schemas;
+ aligned_schemas.push_back(new MeasurementSchema("s1", FLOAT));
+ aligned_schemas.push_back(new MeasurementSchema("s2", FLOAT));
+ tw.register_aligned_timeseries("d1", aligned_schemas);
+
+ TsRecord r1(1, "d1");
+ r1.add_point("s1", 1.0f);
+ r1.add_point("s2", 2.0f);
+ ASSERT_EQ(tw.write_record_aligned(r1), E_OK);
+ TsRecord r2(2, "d1");
+ r2.add_point("s1", 3.0f);
+ r2.add_point("s2", 4.0f);
+ ASSERT_EQ(tw.write_record_aligned(r2), E_OK);
+ tw.flush();
+ tw.close();
+
+ CorruptCurrentFileTail(3);
+
+ RestorableTsFileIOWriter rw;
+ ASSERT_EQ(rw.open(file_name_, true), E_OK);
+ ASSERT_TRUE(rw.can_write());
+
+ TsFileWriter tw2;
+ ASSERT_EQ(tw2.init(&rw), E_OK);
+ TsRecord r3(3, "d1");
+ r3.add_point("s1", 5.0f);
+ r3.add_point("s2", 6.0f);
+ ASSERT_EQ(tw2.write_record_aligned(r3), E_OK);
+ tw2.flush();
+ tw2.close();
+
+ TsFileTreeReader reader;
+ reader.open(file_name_);
+ ASSERT_EQ(reader.get_all_device_ids().size(), 1u);
+ ASSERT_EQ(CountTreeReaderRows(reader, {"s1", "s2"}), 3);
+ reader.close();
}
+// -----------------------------------------------------------------------------
+// Recovery + continued write with TsFileTableWriter (table model), then read-back
+// -----------------------------------------------------------------------------
+
TEST_F(RestorableTsFileIOWriterTest, TableWriterRecoverAndWrite) {
- std::vector measurement_schemas;
- std::vector column_categories;
- measurement_schemas.push_back(new MeasurementSchema("device", STRING));
- measurement_schemas.push_back(new MeasurementSchema("value", DOUBLE));
- column_categories.push_back(ColumnCategory::TAG);
- column_categories.push_back(ColumnCategory::FIELD);
- TableSchema table_schema("test_table", measurement_schemas,
- column_categories);
-
- int flags = O_WRONLY | O_CREAT | O_TRUNC;
-#ifdef _WIN32
- flags |= O_BINARY;
-#endif
- WriteFile write_file;
- write_file.create(file_name_, flags, 0666);
-
- TsFileTableWriter table_writer(&write_file, &table_schema);
- Tablet tablet(table_schema.get_measurement_names(),
- table_schema.get_data_types(), 10);
- std::string table_name = "test_table";
- tablet.set_table_name(table_name);
- for (int i = 0; i < 10; i++) {
- tablet.add_timestamp(i, static_cast(i));
- tablet.add_value(i, "device", "device0");
- tablet.add_value(i, "value", i * 1.1);
- }
- ASSERT_EQ(table_writer.write_table(tablet), E_OK);
- ASSERT_EQ(table_writer.flush(), E_OK);
- table_writer.close();
- write_file.close();
-
- std::streampos full_size;
- {
- std::ifstream f(file_name_, std::ios::binary | std::ios::ate);
- full_size = f.tellg();
- }
- std::ofstream corrupt(file_name_, std::ios::binary | std::ios::in);
- corrupt.seekp(full_size - std::streamoff(3));
- corrupt.put(0);
- corrupt.put(0);
- corrupt.put(0);
- corrupt.close();
-
- RestorableTsFileIOWriter rw;
- ASSERT_EQ(rw.open(file_name_, true), E_OK);
- ASSERT_TRUE(rw.can_write());
-
- TsFileTableWriter table_writer2(&rw);
- std::vector value_col = {"__level1", "value"};
- std::vector value_types = {STRING, DOUBLE};
- Tablet tablet2(value_col, value_types, 10);
- tablet2.set_table_name(table_name);
- for (int i = 0; i < 10; i++) {
- tablet2.add_timestamp(i, static_cast(i + 10));
- tablet2.add_value(i, "__level1", "device0");
- tablet2.add_value(i, "value", (i + 10) * 1.1);
- }
- ASSERT_EQ(table_writer2.write_table(tablet2), E_OK);
- table_writer2.flush();
- table_writer2.close();
-
- TsFileReader table_reader;
- ASSERT_EQ(table_reader.open(file_name_), E_OK);
-
-
- ResultSet* tmp_result_set = nullptr;
- table_reader.query("test_table",
- {"__level1", "value"}, 0, 10000,
- tmp_result_set, nullptr);
- auto* table_result_set = (TableResultSet*)tmp_result_set;
- bool has_next = false;
- int64_t row_num = 0;
- while (IS_SUCC(table_result_set->next(has_next)) && has_next) {
- auto record = table_result_set->get_row_record();
- row_num++;
- }
- ASSERT_EQ(row_num, 20);
+ std::vector measurement_schemas;
+ measurement_schemas.push_back(new MeasurementSchema("device", STRING));
+ measurement_schemas.push_back(new MeasurementSchema("value", DOUBLE));
+ std::vector column_categories = {ColumnCategory::TAG,
+ ColumnCategory::FIELD};
+ TableSchema table_schema("test_table", measurement_schemas,
+ column_categories);
+
+ WriteFile write_file;
+ write_file.create(file_name_, GetWriteCreateFlags(), 0666);
+ TsFileTableWriter table_writer(&write_file, &table_schema);
+ Tablet tablet(table_schema.get_measurement_names(),
+ table_schema.get_data_types(), 10);
+ const std::string table_name = "test_table";
+ tablet.set_table_name(table_name);
+ for (int i = 0; i < 10; i++) {
+ tablet.add_timestamp(i, static_cast(i));
+ tablet.add_value(i, "device", "device0");
+ tablet.add_value(i, "value", i * 1.1);
+ }
+ ASSERT_EQ(table_writer.write_table(tablet), E_OK);
+ ASSERT_EQ(table_writer.flush(), E_OK);
+ table_writer.close();
+ write_file.close();
+
+ CorruptCurrentFileTail(3);
+
+ RestorableTsFileIOWriter rw;
+ ASSERT_EQ(rw.open(file_name_, true), E_OK);
+ ASSERT_TRUE(rw.can_write());
+
+ TsFileTableWriter table_writer2(&rw);
+ std::vector value_col = {"__level1", "value"};
+ std::vector value_types = {STRING, DOUBLE};
+ Tablet tablet2(value_col, value_types, 10);
+ tablet2.set_table_name(table_name);
+ for (int i = 0; i < 10; i++) {
+ tablet2.add_timestamp(i, static_cast(i + 10));
+ tablet2.add_value(i, "__level1", "device0");
+ tablet2.add_value(i, "value", (i + 10) * 1.1);
+ }
+ ASSERT_EQ(table_writer2.write_table(tablet2), E_OK);
+ table_writer2.flush();
+ table_writer2.close();
+
+ TsFileReader table_reader;
+ ASSERT_EQ(table_reader.open(file_name_), E_OK);
+ ResultSet* tmp_result_set = nullptr;
+ table_reader.query("test_table", {"__level1", "value"}, 0, 10000,
+ tmp_result_set, nullptr);
+ auto* table_result_set = static_cast(tmp_result_set);
+ bool has_next = false;
+ int64_t row_num = 0;
+ while (IS_SUCC(table_result_set->next(has_next)) && has_next) {
+ (void)table_result_set->get_row_record();
+ row_num++;
+ }
+ ASSERT_EQ(row_num, 20);
}
From 9b995badd638598b2e7849becd12990a58435a76 Mon Sep 17 00:00:00 2001
From: 761417898 <761417898@qq.com>
Date: Fri, 27 Feb 2026 09:39:10 +0800
Subject: [PATCH 11/23] Refactor get_timeseries_metadata
---
cpp/src/common/tsfile_common.h | 7 +++
cpp/src/reader/tsfile_reader.cc | 49 +++++++++++--------
cpp/src/reader/tsfile_reader.h | 46 ++++++++---------
cpp/src/reader/tsfile_tree_reader.cc | 20 ++++----
cpp/src/reader/tsfile_tree_reader.h | 41 +++++++---------
.../tree_view/tsfile_reader_tree_test.cc | 8 +--
cpp/test/reader/tsfile_reader_test.cc | 22 ++++-----
7 files changed, 99 insertions(+), 94 deletions(-)
diff --git a/cpp/src/common/tsfile_common.h b/cpp/src/common/tsfile_common.h
index 39cd027ef..64faa1acf 100644
--- a/cpp/src/common/tsfile_common.h
+++ b/cpp/src/common/tsfile_common.h
@@ -23,9 +23,11 @@
#include
#include
#include