diff --git a/cpp/src/reader/aligned_chunk_reader.cc b/cpp/src/reader/aligned_chunk_reader.cc index 14250e7f8..60d9c819c 100644 --- a/cpp/src/reader/aligned_chunk_reader.cc +++ b/cpp/src/reader/aligned_chunk_reader.cc @@ -550,7 +550,6 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock( row_appender.append_null(1); \ continue; \ } \ - assert(value_decoder_->has_remaining(value_in)); \ if (!value_decoder_->has_remaining(value_in)) { \ return common::E_DATA_INCONSISTENCY; \ } \ @@ -597,7 +596,6 @@ int AlignedChunkReader::i32_DECODE_TYPED_TV_INTO_TSBLOCK( row_appender.append_null(1); continue; } - assert(value_decoder_->has_remaining(value_in)); if (!value_decoder_->has_remaining(value_in)) { return common::E_DATA_INCONSISTENCY; } @@ -683,7 +681,6 @@ int AlignedChunkReader::STRING_DECODE_TYPED_TV_INTO_TSBLOCK( } if (should_read_data) { - assert(value_decoder_->has_remaining(value_in)); if (!value_decoder_->has_remaining(value_in)) { return E_DATA_INCONSISTENCY; } diff --git a/cpp/src/reader/qds_without_timegenerator.cc b/cpp/src/reader/qds_without_timegenerator.cc index 90c782131..4124aee45 100644 --- a/cpp/src/reader/qds_without_timegenerator.cc +++ b/cpp/src/reader/qds_without_timegenerator.cc @@ -124,10 +124,13 @@ int QDSWithoutTimeGenerator::next(bool& has_next) { std::multimap::iterator iter = heap_time_.find(time); for (uint32_t i = 0; i < count; ++i) { uint32_t len = 0; + bool is_null_val = false; auto val_datatype = value_iters_[iter->second]->get_data_type(); - void* val_ptr = value_iters_[iter->second]->read(&len); - row_record_->get_field(iter->second + 1) - ->set_value(val_datatype, val_ptr, len, pa_); + void* val_ptr = value_iters_[iter->second]->read(&len, &is_null_val); + if (!is_null_val) { + row_record_->get_field(iter->second + 1) + ->set_value(val_datatype, val_ptr, len, pa_); + } value_iters_[iter->second]->next(); if (!time_iters_[iter->second]->end()) { int64_t timev = *(int64_t*)(time_iters_[iter->second]->read(&len)); diff --git a/cpp/src/writer/time_chunk_writer.cc b/cpp/src/writer/time_chunk_writer.cc index 5f004a0f5..0c7e3b212 100644 --- a/cpp/src/writer/time_chunk_writer.cc +++ b/cpp/src/writer/time_chunk_writer.cc @@ -173,6 +173,9 @@ int TimeChunkWriter::end_encode_chunk() { chunk_header_.data_size_ = chunk_data_.total_size(); chunk_header_.num_of_pages_ = num_of_pages_; } + } else if (num_of_pages_ > 0) { + chunk_header_.data_size_ = chunk_data_.total_size(); + chunk_header_.num_of_pages_ = num_of_pages_; } #if DEBUG_SE std::cout << "end_encode_time_chunk: num_of_pages_=" << num_of_pages_ diff --git a/cpp/src/writer/time_chunk_writer.h b/cpp/src/writer/time_chunk_writer.h index ac3b374b0..b5d9f489d 100644 --- a/cpp/src/writer/time_chunk_writer.h +++ b/cpp/src/writer/time_chunk_writer.h @@ -72,6 +72,18 @@ class TimeChunkWriter { bool hasData(); + /** True if the current (unsealed) page has at least one point. */ + bool has_current_page_data() const { + return time_page_writer_.get_point_numer() > 0; + } + + /** + * Force seal the current page (for aligned model: when any aligned page + * seals due to memory/point threshold, all pages must seal together). + * @return E_OK on success. + */ + int seal_current_page() { return seal_cur_page(false); } + private: FORCE_INLINE bool is_cur_page_full() const { // FIXME diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc index 2c2e46b97..982c9b068 100644 --- a/cpp/src/writer/tsfile_writer.cc +++ b/cpp/src/writer/tsfile_writer.cc @@ -634,6 +634,14 @@ int TsFileWriter::write_record_aligned(const TsRecord& record) { if (value_chunk_writers.size() != record.points_.size()) { return E_INVALID_ARG; } + int32_t time_pages_before = time_chunk_writer->num_of_pages(); + std::vector value_pages_before(value_chunk_writers.size(), 0); + for (uint32_t c = 0; c < value_chunk_writers.size(); c++) { + ValueChunkWriter* value_chunk_writer = value_chunk_writers[c]; + if (!IS_NULL(value_chunk_writer)) { + value_pages_before[c] = value_chunk_writer->num_of_pages(); + } + } time_chunk_writer->write(record.timestamp_); for (uint32_t c = 0; c < value_chunk_writers.size(); c++) { ValueChunkWriter* value_chunk_writer = value_chunk_writers[c]; @@ -643,6 +651,11 @@ int TsFileWriter::write_record_aligned(const TsRecord& record) { write_point_aligned(value_chunk_writer, record.timestamp_, data_types[c], record.points_[c]); } + if (RET_FAIL(maybe_seal_aligned_pages_together( + time_chunk_writer, value_chunk_writers, time_pages_before, + value_pages_before))) { + return ret; + } return ret; } @@ -704,6 +717,41 @@ int TsFileWriter::write_point_aligned(ValueChunkWriter* value_chunk_writer, } } +int TsFileWriter::maybe_seal_aligned_pages_together( + TimeChunkWriter* time_chunk_writer, + common::SimpleVector& value_chunk_writers, + int32_t time_pages_before, const std::vector& value_pages_before) { + bool should_seal_all = + time_chunk_writer->num_of_pages() > time_pages_before; + for (uint32_t c = 0; c < value_chunk_writers.size() && !should_seal_all; + c++) { + ValueChunkWriter* value_chunk_writer = value_chunk_writers[c]; + if (!IS_NULL(value_chunk_writer) && + value_chunk_writer->num_of_pages() > value_pages_before[c]) { + should_seal_all = true; + break; + } + } + if (!should_seal_all) { + return E_OK; + } + + int ret = E_OK; + if (time_chunk_writer->has_current_page_data() && + RET_FAIL(time_chunk_writer->seal_current_page())) { + return ret; + } + for (uint32_t c = 0; c < value_chunk_writers.size(); c++) { + ValueChunkWriter* value_chunk_writer = value_chunk_writers[c]; + if (!IS_NULL(value_chunk_writer) && + value_chunk_writer->has_current_page_data() && + RET_FAIL(value_chunk_writer->seal_current_page())) { + return ret; + } + } + return ret; +} + int TsFileWriter::write_tablet_aligned(const Tablet& tablet) { int ret = E_OK; SimpleVector value_chunk_writers; @@ -716,15 +764,33 @@ int TsFileWriter::write_tablet_aligned(const Tablet& tablet) { data_types))) { return ret; } - time_write_column(time_chunk_writer, tablet); - ASSERT(value_chunk_writers.size() == tablet.get_column_count()); - for (uint32_t c = 0; c < value_chunk_writers.size(); c++) { - ValueChunkWriter* value_chunk_writer = value_chunk_writers[c]; - if (IS_NULL(value_chunk_writer)) { - continue; + for (uint32_t row = 0; row < tablet.get_cur_row_size(); row++) { + int32_t time_pages_before = time_chunk_writer->num_of_pages(); + std::vector value_pages_before(value_chunk_writers.size(), 0); + for (uint32_t c = 0; c < value_chunk_writers.size(); c++) { + ValueChunkWriter* value_chunk_writer = value_chunk_writers[c]; + if (!IS_NULL(value_chunk_writer)) { + value_pages_before[c] = value_chunk_writer->num_of_pages(); + } + } + + if (RET_FAIL(time_chunk_writer->write(tablet.timestamps_[row]))) { + return ret; + } + ASSERT(value_chunk_writers.size() == tablet.get_column_count()); + for (uint32_t c = 0; c < value_chunk_writers.size(); c++) { + ValueChunkWriter* value_chunk_writer = value_chunk_writers[c]; + if (IS_NULL(value_chunk_writer)) { + continue; + } + if (RET_FAIL(value_write_column(value_chunk_writer, tablet, c, row, + row + 1))) { + return ret; + } } - if (RET_FAIL(value_write_column(value_chunk_writer, tablet, c, 0, - tablet.get_cur_row_size()))) { + if (RET_FAIL(maybe_seal_aligned_pages_together( + time_chunk_writer, value_chunk_writers, time_pages_before, + value_pages_before))) { return ret; } } @@ -808,26 +874,38 @@ int TsFileWriter::write_table(Tablet& tablet) { value_chunk_writers))) { return ret; } + // Row-by-row write so that when time page seals (e.g. by memory + // threshold), we can seal all value pages together (Java + // semantics). for (int i = start_idx; i < end_idx; i++) { + int32_t time_pages_before = time_chunk_writer->num_of_pages(); if (RET_FAIL(time_chunk_writer->write(tablet.timestamps_[i]))) { return ret; } - } - uint32_t field_col_count = 0; - for (uint32_t i = 0; i < tablet.get_column_count(); ++i) { - if (tablet.column_categories_[i] == - common::ColumnCategory::FIELD) { - ValueChunkWriter* value_chunk_writer = - value_chunk_writers[field_col_count]; - if (IS_NULL(value_chunk_writer)) { - continue; + uint32_t field_col_count = 0; + for (uint32_t col = 0; col < tablet.get_column_count(); ++col) { + if (tablet.column_categories_[col] == + common::ColumnCategory::FIELD) { + ValueChunkWriter* value_chunk_writer = + value_chunk_writers[field_col_count]; + if (!IS_NULL(value_chunk_writer) && + RET_FAIL(value_write_column( + value_chunk_writer, tablet, col, i, i + 1))) { + return ret; + } + field_col_count++; } - - if (RET_FAIL(value_write_column(value_chunk_writer, tablet, - i, start_idx, end_idx))) { - return ret; + } + int32_t time_pages_after = time_chunk_writer->num_of_pages(); + if (time_pages_after > time_pages_before) { + for (uint32_t k = 0; k < value_chunk_writers.size(); k++) { + if (!IS_NULL(value_chunk_writers[k]) && + value_chunk_writers[k]->get_point_numer() > 0 && + RET_FAIL( + value_chunk_writers[k]->seal_current_page())) { + return ret; + } } - field_col_count++; } } start_idx = end_idx; diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h index e80a1232b..75f88d37b 100644 --- a/cpp/src/writer/tsfile_writer.h +++ b/cpp/src/writer/tsfile_writer.h @@ -112,6 +112,11 @@ class TsFileWriter { int write_point_aligned(ValueChunkWriter* value_chunk_writer, int64_t timestamp, common::TSDataType data_type, const DataPoint& point); + int maybe_seal_aligned_pages_together( + TimeChunkWriter* time_chunk_writer, + common::SimpleVector& value_chunk_writers, + int32_t time_pages_before, + const std::vector& value_pages_before); int flush_chunk_group(MeasurementSchemaGroup* chunk_group, bool is_aligned); int write_typed_column(storage::ChunkWriter* chunk_writer, diff --git a/cpp/src/writer/value_chunk_writer.cc b/cpp/src/writer/value_chunk_writer.cc index e4bb52658..a59cf8d3f 100644 --- a/cpp/src/writer/value_chunk_writer.cc +++ b/cpp/src/writer/value_chunk_writer.cc @@ -110,7 +110,7 @@ int ValueChunkWriter::seal_cur_page(bool end_chunk) { /*stat*/ false, /*data*/ false); if (IS_SUCC(ret)) { save_first_page_data(value_page_writer_); - // value_page_writer_.destroy_page_data(); + value_page_writer_.clear_page_data(); value_page_writer_.reset(); } } @@ -161,7 +161,8 @@ int ValueChunkWriter::write_first_page_data(ByteStream& pages_data, int ValueChunkWriter::end_encode_chunk() { int ret = E_OK; - if (value_page_writer_.get_statistic()->count_ > 0) { + if (value_page_writer_.get_point_numer() > 0 || + (has_current_page_data() && num_of_pages_ == 0)) { ret = seal_cur_page(/*end_chunk*/ true); if (E_OK == ret) { chunk_header_.data_size_ = chunk_data_.total_size(); @@ -174,6 +175,9 @@ int ValueChunkWriter::end_encode_chunk() { chunk_header_.data_size_ = chunk_data_.total_size(); chunk_header_.num_of_pages_ = num_of_pages_; } + } else if (num_of_pages_ > 0) { + chunk_header_.data_size_ = chunk_data_.total_size(); + chunk_header_.num_of_pages_ = num_of_pages_; } #if DEBUG_SE std::cout << "end_encode_chunk: num_of_pages_=" << num_of_pages_ @@ -193,9 +197,7 @@ int64_t ValueChunkWriter::estimate_max_series_mem_size() { } bool ValueChunkWriter::hasData() { - return num_of_pages_ > 0 || - (value_page_writer_.get_statistic() != nullptr && - value_page_writer_.get_statistic()->count_ > 0); + return num_of_pages_ > 0 || has_current_page_data(); } } // end namespace storage diff --git a/cpp/src/writer/value_chunk_writer.h b/cpp/src/writer/value_chunk_writer.h index 859fb57b0..6ec54022c 100644 --- a/cpp/src/writer/value_chunk_writer.h +++ b/cpp/src/writer/value_chunk_writer.h @@ -118,6 +118,23 @@ class ValueChunkWriter { bool hasData(); + /** True if the current (unsealed) page has at least one write (including + * nulls). */ + bool has_current_page_data() const { + return value_page_writer_.get_total_write_count() > 0; + } + + FORCE_INLINE uint32_t get_point_numer() const { + return value_page_writer_.get_point_numer(); + } + + /** + * Force seal the current page (for aligned table model: when time page + * seals due to memory/point threshold, all value pages must seal together). + * @return E_OK on success. + */ + int seal_current_page() { return seal_cur_page(false); } + private: FORCE_INLINE bool is_cur_page_full() const { // FIXME diff --git a/cpp/src/writer/value_page_writer.cc b/cpp/src/writer/value_page_writer.cc index feedb1870..1c8f05350 100644 --- a/cpp/src/writer/value_page_writer.cc +++ b/cpp/src/writer/value_page_writer.cc @@ -43,7 +43,7 @@ int ValuePageData::init(ByteStream& col_notnull_bitmap_bs, ByteStream& value_bs, if (IS_NULL(uncompressed_buf_)) { return E_OOM; } - if (col_notnull_bitmap_buf_size_ == 0 || value_buf_size_ == 0) { + if (col_notnull_bitmap_buf_size_ == 0) { return E_INVALID_ARG; } uncompressed_buf_[0] = (unsigned char)((size >> 24) & 0xFF); @@ -54,11 +54,11 @@ int ValuePageData::init(ByteStream& col_notnull_bitmap_bs, ByteStream& value_bs, if (RET_FAIL(common::copy_bs_to_buf(col_notnull_bitmap_bs, uncompressed_buf_ + sizeof(size), col_notnull_bitmap_buf_size_))) { - } else if (RET_FAIL(common::copy_bs_to_buf(value_bs, - uncompressed_buf_ + - sizeof(size) + - col_notnull_bitmap_buf_size_, - value_buf_size_))) { + } else if (value_buf_size_ > 0 && RET_FAIL(common::copy_bs_to_buf( + value_bs, + uncompressed_buf_ + sizeof(size) + + col_notnull_bitmap_buf_size_, + value_buf_size_))) { } else { // TODO // NOTE: different compressor may have different compress API diff --git a/cpp/src/writer/value_page_writer.h b/cpp/src/writer/value_page_writer.h index 60d75b0b8..aa2555094 100644 --- a/cpp/src/writer/value_page_writer.h +++ b/cpp/src/writer/value_page_writer.h @@ -51,7 +51,6 @@ struct ValuePageData { common::ByteStream& value_bs, Compressor* compressor, uint32_t size); void destroy() { - // Be careful about the memory if (uncompressed_buf_ != nullptr) { common::mem_free(uncompressed_buf_); uncompressed_buf_ = nullptr; @@ -60,6 +59,19 @@ struct ValuePageData { compressor_->after_compress(compressed_buf_); compressed_buf_ = nullptr; } + compressor_ = nullptr; + } + + /** Clear pointers without freeing (transfer ownership to another holder). + */ + void clear() { + col_notnull_bitmap_buf_size_ = 0; + value_buf_size_ = 0; + uncompressed_size_ = 0; + compressed_size_ = 0; + uncompressed_buf_ = nullptr; + compressed_buf_ = nullptr; + compressor_ = nullptr; } }; @@ -152,6 +164,7 @@ class ValuePageWriter { } FORCE_INLINE uint32_t get_point_numer() const { return statistic_->count_; } + FORCE_INLINE uint32_t get_total_write_count() const { return size_; } FORCE_INLINE uint32_t get_col_notnull_bitmap_out_stream_size() const { return col_notnull_bitmap_out_stream_.total_size(); } @@ -183,6 +196,8 @@ class ValuePageWriter { FORCE_INLINE Statistic* get_statistic() { return statistic_; } ValuePageData get_cur_page_data() { return cur_page_data_; } void destroy_page_data() { cur_page_data_.destroy(); } + /** Clear cur_page_data_ without freeing (after ownership transferred). */ + void clear_page_data() { cur_page_data_.clear(); } private: FORCE_INLINE int prepare_end_page() { diff --git a/cpp/test/reader/table_view/tsfile_reader_table_test.cc b/cpp/test/reader/table_view/tsfile_reader_table_test.cc index b9f0eb213..4b1a8259f 100644 --- a/cpp/test/reader/table_view/tsfile_reader_table_test.cc +++ b/cpp/test/reader/table_view/tsfile_reader_table_test.cc @@ -216,6 +216,21 @@ TEST_F(TsFileTableReaderTest, TableModelQueryOneSmallPage) { g_config_value_.page_writer_max_point_num_ = prev_config; } +// Triggers memory-based seal in aligned table: time page seals by size while +// value pages may not; ensure value pages are sealed together with time (no +// time-page-sealed / value-page-not-sealed inconsistency). +// Use 512 bytes so time seals by size before point count; 128 was too small +// and could produce misaligned time/value pages on some encodings. +TEST_F(TsFileTableReaderTest, TableModelQueryMemoryBasedSeal) { + uint32_t prev_point_num = g_config_value_.page_writer_max_point_num_; + uint32_t prev_mem_bytes = g_config_value_.page_writer_max_memory_bytes_; + g_config_value_.page_writer_max_point_num_ = 10000; + g_config_value_.page_writer_max_memory_bytes_ = 512; + test_table_model_query(50, 1); + g_config_value_.page_writer_max_point_num_ = prev_point_num; + g_config_value_.page_writer_max_memory_bytes_ = prev_mem_bytes; +} + TEST_F(TsFileTableReaderTest, TableModelQueryOneLargePage) { int prev_config = g_config_value_.page_writer_max_point_num_; g_config_value_.page_writer_max_point_num_ = 10000; diff --git a/cpp/test/writer/tsfile_writer_test.cc b/cpp/test/writer/tsfile_writer_test.cc index 25684e726..14a7b4100 100644 --- a/cpp/test/writer/tsfile_writer_test.cc +++ b/cpp/test/writer/tsfile_writer_test.cc @@ -813,6 +813,241 @@ TEST_F(TsFileWriterTest, WriteAlignedTimeseries) { reader.destroy_query_data_set(qds); } +/* + * Aligned page seal synchronization tests. + * + * In the aligned model, time page and every value page must seal together + * so that each chunk has the same number of pages. Without synchronization, + * a threshold hit on one page (point-count or memory) would seal only that + * page, producing misaligned page counts and corrupt reads. + * + * Three sub-cases: + * 1. Time page reaches point-count threshold first; value pages have + * partial nulls so their non-null statistic count is lower and they + * would NOT seal on their own. + * 2. Time page reaches memory threshold first; value pages are mostly + * null so their encoded-data memory is much smaller. + * 3. A value page (STRING, large per-row memory) reaches memory + * threshold first; time page and other value pages have not. + */ + +// Case 1: time page seals by point-count; value pages with partial nulls +// have fewer non-null points (statistic count) and would not self-seal. +// Sync mechanism must force all value pages to seal together. +TEST_F(TsFileWriterTest, AlignedSealSync_PointCountWithNulls) { + uint32_t prev_pt = g_config_value_.page_writer_max_point_num_; + uint32_t prev_mem = g_config_value_.page_writer_max_memory_bytes_; + struct Guard { + uint32_t pt, mem; + ~Guard() { + g_config_value_.page_writer_max_point_num_ = pt; + g_config_value_.page_writer_max_memory_bytes_ = mem; + } + } guard{prev_pt, prev_mem}; + g_config_value_.page_writer_max_point_num_ = 10; + g_config_value_.page_writer_max_memory_bytes_ = 1024 * 1024; + + std::string device_name = "device_pt_null"; + std::vector mnames = {"s0", "s1", "s2"}; + std::vector schemas; + for (auto& n : mnames) { + schemas.push_back(new MeasurementSchema(n, INT64, PLAIN, UNCOMPRESSED)); + } + tsfile_writer_->register_aligned_timeseries(device_name, schemas); + + // s0: always non-null -> 10 non-null per 10-row page, self-seals + // s1: null on even rows -> 5 non-null per page, won't self-seal + // s2: null except every 5th row -> 2 non-null per page, won't self-seal + int row_num = 30; + for (int i = 0; i < row_num; ++i) { + TsRecord record(1622505600000 + i, device_name); + record.add_point(mnames[0], static_cast(i)); + if (i % 2 != 0) { + record.add_point(mnames[1], static_cast(i * 10)); + } else { + record.points_.emplace_back(DataPoint(mnames[1])); + } + if (i % 5 == 0) { + record.add_point(mnames[2], static_cast(i * 100)); + } else { + record.points_.emplace_back(DataPoint(mnames[2])); + } + ASSERT_EQ(tsfile_writer_->write_record_aligned(record), E_OK); + } + ASSERT_EQ(tsfile_writer_->flush(), E_OK); + ASSERT_EQ(tsfile_writer_->close(), E_OK); + + std::vector select_list; + for (auto& n : mnames) { + select_list.emplace_back(device_name, n); + } + storage::QueryExpression* qe = + storage::QueryExpression::create(select_list, nullptr); + storage::TsFileReader reader; + ASSERT_EQ(reader.open(file_name_), E_OK); + storage::ResultSet* tmp_qds = nullptr; + ASSERT_EQ(reader.query(qe, tmp_qds), E_OK); + auto* qds = (QDSWithoutTimeGenerator*)tmp_qds; + + bool has_next = false; + int64_t cur_row = 0; + while (IS_SUCC(qds->next(has_next)) && has_next) { + auto* rec = qds->get_row_record(); + ASSERT_NE(rec, nullptr); + EXPECT_EQ(rec->get_timestamp(), 1622505600000 + cur_row); + EXPECT_EQ(field_to_string(rec->get_field(1)), std::to_string(cur_row)); + if (cur_row % 2 != 0) { + EXPECT_EQ(field_to_string(rec->get_field(2)), + std::to_string(cur_row * 10)); + } + if (cur_row % 5 == 0) { + EXPECT_EQ(field_to_string(rec->get_field(3)), + std::to_string(cur_row * 100)); + } + cur_row++; + } + EXPECT_EQ(cur_row, row_num); + reader.destroy_query_data_set(qds); + ASSERT_EQ(reader.close(), E_OK); +} + +// Case 2: time page seals by memory threshold first. Value pages are mostly +// null so their encoded-value memory grows much slower than the time page +// (INT64 PLAIN = 8 bytes/point). Time page hits 512 bytes at ~64 points; +// value pages with 1 non-null every 20 rows only have ~24 bytes of value +// data at that point. Sync must force all value pages to seal. +TEST_F(TsFileWriterTest, AlignedSealSync_TimeMemoryFirst) { + uint32_t prev_pt = g_config_value_.page_writer_max_point_num_; + uint32_t prev_mem = g_config_value_.page_writer_max_memory_bytes_; + struct Guard { + uint32_t pt, mem; + ~Guard() { + g_config_value_.page_writer_max_point_num_ = pt; + g_config_value_.page_writer_max_memory_bytes_ = mem; + } + } guard{prev_pt, prev_mem}; + g_config_value_.page_writer_max_point_num_ = 10000; + g_config_value_.page_writer_max_memory_bytes_ = 512; + + std::string device_name = "device_time_mem"; + std::vector mnames = {"s0", "s1"}; + std::vector schemas; + for (auto& n : mnames) { + schemas.push_back(new MeasurementSchema(n, INT64, PLAIN, UNCOMPRESSED)); + } + tsfile_writer_->register_aligned_timeseries(device_name, schemas); + + int row_num = 200; + for (int i = 0; i < row_num; ++i) { + TsRecord record(1622505600000 + i, device_name); + if (i % 20 == 0) { + record.add_point(mnames[0], static_cast(i)); + record.add_point(mnames[1], static_cast(i * 10)); + } else { + record.points_.emplace_back(DataPoint(mnames[0])); + record.points_.emplace_back(DataPoint(mnames[1])); + } + ASSERT_EQ(tsfile_writer_->write_record_aligned(record), E_OK); + } + ASSERT_EQ(tsfile_writer_->flush(), E_OK); + ASSERT_EQ(tsfile_writer_->close(), E_OK); + + std::vector select_list; + for (auto& n : mnames) { + select_list.emplace_back(device_name, n); + } + storage::QueryExpression* qe = + storage::QueryExpression::create(select_list, nullptr); + storage::TsFileReader reader; + ASSERT_EQ(reader.open(file_name_), E_OK); + storage::ResultSet* tmp_qds = nullptr; + ASSERT_EQ(reader.query(qe, tmp_qds), E_OK); + auto* qds = (QDSWithoutTimeGenerator*)tmp_qds; + + bool has_next = false; + int64_t cur_row = 0; + while (IS_SUCC(qds->next(has_next)) && has_next) { + auto* rec = qds->get_row_record(); + ASSERT_NE(rec, nullptr); + EXPECT_EQ(rec->get_timestamp(), 1622505600000 + cur_row); + if (cur_row % 20 == 0) { + EXPECT_EQ(field_to_string(rec->get_field(1)), + std::to_string(cur_row)); + EXPECT_EQ(field_to_string(rec->get_field(2)), + std::to_string(cur_row * 10)); + } + cur_row++; + } + EXPECT_EQ(cur_row, row_num); + reader.destroy_query_data_set(qds); + ASSERT_EQ(reader.close(), E_OK); +} + +// Case 3: a value page (STRING type, ~104 bytes/point with PLAIN encoding) +// seals by memory threshold before the time page (INT64, 8 bytes/point). +// With threshold=512, STRING value page seals at ~5 points while time page +// only has ~40 bytes. Sync must force time page and other value pages to seal. +TEST_F(TsFileWriterTest, AlignedSealSync_ValueMemoryFirst) { + uint32_t prev_pt = g_config_value_.page_writer_max_point_num_; + uint32_t prev_mem = g_config_value_.page_writer_max_memory_bytes_; + struct Guard { + uint32_t pt, mem; + ~Guard() { + g_config_value_.page_writer_max_point_num_ = pt; + g_config_value_.page_writer_max_memory_bytes_ = mem; + } + } guard{prev_pt, prev_mem}; + g_config_value_.page_writer_max_point_num_ = 10000; + g_config_value_.page_writer_max_memory_bytes_ = 512; + + std::string device_name = "device_val_mem"; + std::vector schemas; + schemas.push_back(new MeasurementSchema("s0", INT64, PLAIN, UNCOMPRESSED)); + schemas.push_back(new MeasurementSchema("s1", STRING, PLAIN, UNCOMPRESSED)); + tsfile_writer_->register_aligned_timeseries(device_name, schemas); + + char* long_buf = new char[101]; + memset(long_buf, 'A', 100); + long_buf[100] = '\0'; + common::String str_val(long_buf, 100); + + int row_num = 100; + for (int i = 0; i < row_num; ++i) { + TsRecord record(1622505600000 + i, device_name); + record.add_point(std::string("s0"), static_cast(i)); + record.add_point(std::string("s1"), str_val); + ASSERT_EQ(tsfile_writer_->write_record_aligned(record), E_OK); + } + delete[] long_buf; + ASSERT_EQ(tsfile_writer_->flush(), E_OK); + ASSERT_EQ(tsfile_writer_->close(), E_OK); + + std::string s0("s0"), s1("s1"); + std::vector select_list; + select_list.emplace_back(device_name, s0); + select_list.emplace_back(device_name, s1); + storage::QueryExpression* qe = + storage::QueryExpression::create(select_list, nullptr); + storage::TsFileReader reader; + ASSERT_EQ(reader.open(file_name_), E_OK); + storage::ResultSet* tmp_qds = nullptr; + ASSERT_EQ(reader.query(qe, tmp_qds), E_OK); + auto* qds = (QDSWithoutTimeGenerator*)tmp_qds; + + bool has_next = false; + int64_t cur_row = 0; + while (IS_SUCC(qds->next(has_next)) && has_next) { + auto* rec = qds->get_row_record(); + ASSERT_NE(rec, nullptr); + EXPECT_EQ(rec->get_timestamp(), 1622505600000 + cur_row); + EXPECT_EQ(field_to_string(rec->get_field(1)), std::to_string(cur_row)); + cur_row++; + } + EXPECT_EQ(cur_row, row_num); + reader.destroy_query_data_set(qds); + ASSERT_EQ(reader.close(), E_OK); +} + TEST_F(TsFileWriterTest, WriteAlignedMultiFlush) { int measurement_num = 100, row_num = 100; std::string device_name = "device";