Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
77f2207
Implement interface get_all_timeseries_metadata for Retrieve metadata…
hongzhi-gao Feb 9, 2026
fa3470e
mvn spotless apply
hongzhi-gao Feb 9, 2026
311b9c1
Implement RestorableTsFileIOWriter
hongzhi-gao Feb 9, 2026
f364734
Support continued writing to restored files in the tree model & table…
hongzhi-gao Feb 10, 2026
cd3dab2
fix readme logo
hongzhi-gao Feb 24, 2026
c643961
fix readme logo
hongzhi-gao Feb 24, 2026
7a61f3e
fix readme badge
hongzhi-gao Feb 24, 2026
4efad3b
fix recovery tsfile statistic
hongzhi-gao Feb 26, 2026
38230ad
fix recovery tsfile append and reader
hongzhi-gao Feb 26, 2026
cb1f9d1
Refactor RestorableTsFileIOWriterTest
hongzhi-gao Feb 27, 2026
9b995ba
Refactor get_timeseries_metadata
hongzhi-gao Feb 27, 2026
419fbcd
Refactor get_timeseries_metadata & restorable_tsfile_io_writer.cc
hongzhi-gao Feb 27, 2026
e7766d2
Merge branch 'develop' into feature/refactor-get-sensor-statistic
hongzhi-gao Feb 27, 2026
5fb0580
Merge branch 'apache:develop' into develop
hongzhi-gao Feb 27, 2026
d72171c
Merge remote-tracking branch 'origin/develop' into feature/refactor-g…
hongzhi-gao Feb 27, 2026
2c7cabc
mvn spotless apply
hongzhi-gao Feb 27, 2026
e9b511a
fix mem leak and overflow warning
hongzhi-gao Feb 27, 2026
aaf36b9
mvn spotless:apply
hongzhi-gao Feb 27, 2026
ac09535
try fix ci
hongzhi-gao Feb 27, 2026
3e02fc4
try fix ci
hongzhi-gao Feb 27, 2026
0ff41b3
try fix ci
hongzhi-gao Feb 27, 2026
d5d5ac6
removed the replay loop and added a recovery API to restore the logic…
hongzhi-gao Mar 3, 2026
0f9bd6a
spotless apply
hongzhi-gao Mar 3, 2026
5245b30
fix restore_recovered_file_position
hongzhi-gao Mar 5, 2026
dc36335
fix TsFileWriter::init(RestorableTsFileIOWriter* rw)
hongzhi-gao Mar 6, 2026
7b84e2c
fix RestorableTsFileIOWriter ut
hongzhi-gao Mar 10, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/unit-test-cpp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ jobs:
fi

# Run the actual maven build including all tests.
# On Windows, prepend MinGW bin to PATH so test exe can find runtime DLLs
# (e.g. libstdc++-6.dll) when gtest_discover_tests runs it; avoids 0xc0000139.
- name: Build and test with Maven
shell: bash
run: |
Expand All @@ -129,6 +131,9 @@ jobs:
else
ASAN_VALUE="OFF"
fi
if [[ "$RUNNER_OS" == "Windows" ]]; then
export PATH="/c/mingw64/bin:$PATH"
fi
./mvnw${{ steps.platform_suffix.outputs.platform_suffix }} -P with-cpp \
-Denable.asan=$ASAN_VALUE -Dbuild.type=${{ matrix.build_type }} clean verify

5 changes: 5 additions & 0 deletions .github/workflows/unit-test-python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ jobs:
- name: Build and test with Maven
shell: bash
run: |
# On Windows, prepend MinGW bin so CTest/test exe uses the correct
# runtime DLLs instead of Git-for-Windows mingw runtime from PATH.
if [[ "$RUNNER_OS" == "Windows" ]]; then
export PATH="/c/mingw64/bin:$PATH"
fi
./mvnw${{ steps.platform_suffix.outputs.platform_suffix }} -P with-python -Denable.asan=OFF -Dbuild.type=Release clean verify -Dspotless.skip=true

- name: Upload whl Artifact
Expand Down
21 changes: 21 additions & 0 deletions cpp/src/common/allocator/byte_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,27 @@ class ByteStream {
total_size_.atomic_aaf(used_bytes);
}

/**
* Advance write position without copying payload bytes.
* Recovery path can use this to rebuild logical stream offset from file
* size directly.
*/
int advance_write_pos(uint32_t len) {
int ret = common::E_OK;
uint32_t advanced = 0;
while (advanced < len) {
if (RET_FAIL(prepare_space())) {
return ret;
}
uint32_t remainder = page_size_ - (total_size_.load() % page_size_);
uint32_t step =
remainder < (len - advanced) ? remainder : (len - advanced);
total_size_.atomic_aaf(step);
advanced += step;
}
return ret;
}

/* ================ Part 4: reading internal buffers ================ */
/*
* one-shot reader iterator
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/common/device_id.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,9 @@ class StringArrayDeviceID : public IDeviceID {
if (prefix_segments_.size() == 0 || prefix_segments_.size() == 1) {
return segments_[pos];
} else {
if (pos < prefix_segments_.size()) {
return prefix_segments_[pos];
if (pos >= 0 &&
static_cast<size_t>(pos) < prefix_segments_.size()) {
return prefix_segments_[static_cast<size_t>(pos)];
} else {
return segments_[pos - prefix_segments_.size() + 1];
}
Expand Down
59 changes: 52 additions & 7 deletions cpp/src/common/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ struct MeasurementSchemaGroup {
*/
class TableSchema {
public:
TableSchema() = default;
TableSchema() : updatable_(true) {}

/**
* Constructs a TableSchema object with the given table name, column
Expand All @@ -197,7 +197,7 @@ class TableSchema {
*/
TableSchema(const std::string& table_name,
const std::vector<common::ColumnSchema>& column_schemas)
: table_name_(table_name) {
: table_name_(table_name), updatable_(false) {
to_lowercase_inplace(table_name_);
for (const common::ColumnSchema& column_schema : column_schemas) {
column_schemas_.emplace_back(std::make_shared<MeasurementSchema>(
Expand All @@ -217,7 +217,9 @@ class TableSchema {
TableSchema(const std::string& table_name,
const std::vector<MeasurementSchema*>& column_schemas,
const std::vector<common::ColumnCategory>& column_categories)
: table_name_(table_name), column_categories_(column_categories) {
: table_name_(table_name),
column_categories_(column_categories),
updatable_(false) {
to_lowercase_inplace(table_name_);
for (const auto column_schema : column_schemas) {
if (column_schema != nullptr) {
Expand All @@ -236,11 +238,13 @@ class TableSchema {
TableSchema(TableSchema&& other) noexcept
: table_name_(std::move(other.table_name_)),
column_schemas_(std::move(other.column_schemas_)),
column_categories_(std::move(other.column_categories_)) {}
column_categories_(std::move(other.column_categories_)),
updatable_(other.updatable_) {}

TableSchema(const TableSchema& other) noexcept
: table_name_(other.table_name_),
column_categories_(other.column_categories_) {
column_categories_(other.column_categories_),
updatable_(false) {
for (const auto& column_schema : other.column_schemas_) {
// Just call default construction
column_schemas_.emplace_back(
Expand Down Expand Up @@ -342,6 +346,14 @@ class TableSchema {
size_t get_column_pos_index_num() const { return column_pos_index_.size(); }

void update(ChunkGroupMeta* chunk_group_meta) {
if (!updatable_) {
return;
}
std::shared_ptr<IDeviceID> device_id = chunk_group_meta->device_id_;
const int seg_num = device_id ? device_id->segment_num() : 0;
if (seg_num > max_level_) {
max_level_ = seg_num;
}
for (auto iter = chunk_group_meta->chunk_meta_list_.begin();
iter != chunk_group_meta->chunk_meta_list_.end(); iter++) {
auto& chunk_meta = iter.get();
Expand Down Expand Up @@ -371,6 +383,29 @@ class TableSchema {
}
}

void finalize_column_schema() {
if (!updatable_) {
return;
}
std::vector<std::shared_ptr<MeasurementSchema>> id_columns;
for (int i = 1; i < max_level_; i++) {
std::string col_name = "__level" + std::to_string(i);
id_columns.push_back(std::make_shared<MeasurementSchema>(
col_name, common::STRING, common::PLAIN,
common::CompressionType::UNCOMPRESSED));
}
column_schemas_.insert(column_schemas_.begin(), id_columns.begin(),
id_columns.end());
column_categories_.insert(column_categories_.begin(), id_columns.size(),
common::ColumnCategory::TAG);
column_pos_index_.clear();
for (size_t i = 0; i < column_schemas_.size(); i++) {
column_pos_index_[to_lower(column_schemas_[i]->measurement_name_)] =
static_cast<int>(i);
}
updatable_ = false;
}

std::vector<common::TSDataType> get_data_types() const {
std::vector<common::TSDataType> ret;
for (const auto& measurement_schema : column_schemas_) {
Expand Down Expand Up @@ -424,6 +459,8 @@ class TableSchema {
std::vector<common::ColumnCategory> column_categories_;
std::map<std::string, int> column_pos_index_;
bool is_virtual_table_ = false;
int max_level_ = 0;
bool updatable_ = false;
};

struct Schema {
Expand All @@ -433,11 +470,19 @@ struct Schema {

void update_table_schema(ChunkGroupMeta* chunk_group_meta) {
std::shared_ptr<IDeviceID> device_id = chunk_group_meta->device_id_;
auto table_name = device_id->get_table_name();
std::string table_name = device_id->get_table_name();
if (table_schema_map_.find(table_name) == table_schema_map_.end()) {
table_schema_map_[table_name] = std::make_shared<TableSchema>();
}
table_schema_map_[table_name]->update(chunk_group_meta);
auto& ts = table_schema_map_[table_name];
ts->set_table_name(table_name);
ts->update(chunk_group_meta);
}

void finalize_table_schemas() {
for (auto& kv : table_schema_map_) {
kv.second->finalize_column_schema();
}
}
void register_table_schema(
const std::shared_ptr<TableSchema>& table_schema) {
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/common/tsfile_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
#include <cstring>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "common/allocator/my_string.h"
#include "common/allocator/page_arena.h"
Expand Down Expand Up @@ -322,6 +324,12 @@ class ITimeseriesIndex {
virtual Statistic* get_statistic() const { return nullptr; }
};

/** Map: IDeviceID -> list of timeseries metadata (ITimeseriesIndex). */
using DeviceTimeseriesMetadataMap =
std::map<std::shared_ptr<IDeviceID>,
std::vector<std::shared_ptr<ITimeseriesIndex>>,
IDeviceIDComparator>;

/*
* A TimeseriesIndex may have one or more chunk metas,
* that means we have such a map: <Timeseries, List<ChunkMeta>>.
Expand Down
18 changes: 17 additions & 1 deletion cpp/src/encoding/fire.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class LongFire : public Fire<int64_t> {

int64_t predict(int64_t value) override {
int64_t alpha = accumulator_ >> learn_shift_;
int64_t diff = (alpha * delta_) >> bit_width_;
int64_t diff = safe_mul_shift(alpha, delta_, bit_width_);
return value + diff;
}

Expand All @@ -101,6 +101,22 @@ class LongFire : public Fire<int64_t> {
accumulator_ -= gradient;
delta_ = val - pre;
}

private:
/** (alpha * delta_) >> shift without signed overflow; both args are
* int64_t. */
static int64_t safe_mul_shift(int64_t alpha, int64_t delta, int shift) {
#if defined(__SIZEOF_INT128__) && __SIZEOF_INT128__ >= 16
__int128 product = static_cast<__int128>(alpha) * delta;
return static_cast<int64_t>(product >> shift);
#else
/* Portable fallback: use double for product. Exact for |alpha|,|delta|
* < 2^53. */
double prod = static_cast<double>(alpha) * static_cast<double>(delta);
double div = static_cast<double>(1LL << shift);
return static_cast<int64_t>(prod / div);
#endif
}
};

#endif // ENCODING_FIRE_H
2 changes: 1 addition & 1 deletion cpp/src/encoding/ts2diff_encoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class TS2DIFFEncoder : public Encoder {
public:
TS2DIFFEncoder() { init(); }

~TS2DIFFEncoder() {}
~TS2DIFFEncoder() { destroy(); }

void reset() { write_index_ = -1; }

Expand Down
Loading