Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions cpp/src/reader/aligned_chunk_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,6 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock(
row_appender.append_null(1); \
continue; \
} \
assert(value_decoder_->has_remaining(value_in)); \
if (!value_decoder_->has_remaining(value_in)) { \
return common::E_DATA_INCONSISTENCY; \
} \
Expand Down Expand Up @@ -597,7 +596,6 @@ int AlignedChunkReader::i32_DECODE_TYPED_TV_INTO_TSBLOCK(
row_appender.append_null(1);
continue;
}
assert(value_decoder_->has_remaining(value_in));
if (!value_decoder_->has_remaining(value_in)) {
return common::E_DATA_INCONSISTENCY;
}
Expand Down Expand Up @@ -683,7 +681,6 @@ int AlignedChunkReader::STRING_DECODE_TYPED_TV_INTO_TSBLOCK(
}

if (should_read_data) {
assert(value_decoder_->has_remaining(value_in));
if (!value_decoder_->has_remaining(value_in)) {
return E_DATA_INCONSISTENCY;
}
Expand Down
9 changes: 6 additions & 3 deletions cpp/src/reader/qds_without_timegenerator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,13 @@ int QDSWithoutTimeGenerator::next(bool& has_next) {
std::multimap<int64_t, uint32_t>::iterator iter = heap_time_.find(time);
for (uint32_t i = 0; i < count; ++i) {
uint32_t len = 0;
bool is_null_val = false;
auto val_datatype = value_iters_[iter->second]->get_data_type();
void* val_ptr = value_iters_[iter->second]->read(&len);
row_record_->get_field(iter->second + 1)
->set_value(val_datatype, val_ptr, len, pa_);
void* val_ptr = value_iters_[iter->second]->read(&len, &is_null_val);
if (!is_null_val) {
row_record_->get_field(iter->second + 1)
->set_value(val_datatype, val_ptr, len, pa_);
}
value_iters_[iter->second]->next();
if (!time_iters_[iter->second]->end()) {
int64_t timev = *(int64_t*)(time_iters_[iter->second]->read(&len));
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/writer/time_chunk_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ int TimeChunkWriter::end_encode_chunk() {
chunk_header_.data_size_ = chunk_data_.total_size();
chunk_header_.num_of_pages_ = num_of_pages_;
}
} else if (num_of_pages_ > 0) {
chunk_header_.data_size_ = chunk_data_.total_size();
chunk_header_.num_of_pages_ = num_of_pages_;
}
#if DEBUG_SE
std::cout << "end_encode_time_chunk: num_of_pages_=" << num_of_pages_
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/writer/time_chunk_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,18 @@ class TimeChunkWriter {

bool hasData();

/** True if the current (unsealed) page has at least one point. */
bool has_current_page_data() const {
return time_page_writer_.get_point_numer() > 0;
}

/**
* Force seal the current page (for aligned model: when any aligned page
* seals due to memory/point threshold, all pages must seal together).
* @return E_OK on success.
*/
int seal_current_page() { return seal_cur_page(false); }

private:
FORCE_INLINE bool is_cur_page_full() const {
// FIXME
Expand Down
122 changes: 100 additions & 22 deletions cpp/src/writer/tsfile_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,14 @@ int TsFileWriter::write_record_aligned(const TsRecord& record) {
if (value_chunk_writers.size() != record.points_.size()) {
return E_INVALID_ARG;
}
int32_t time_pages_before = time_chunk_writer->num_of_pages();
std::vector<int32_t> value_pages_before(value_chunk_writers.size(), 0);
for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
ValueChunkWriter* value_chunk_writer = value_chunk_writers[c];
if (!IS_NULL(value_chunk_writer)) {
value_pages_before[c] = value_chunk_writer->num_of_pages();
}
}
time_chunk_writer->write(record.timestamp_);
for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
ValueChunkWriter* value_chunk_writer = value_chunk_writers[c];
Expand All @@ -643,6 +651,11 @@ int TsFileWriter::write_record_aligned(const TsRecord& record) {
write_point_aligned(value_chunk_writer, record.timestamp_,
data_types[c], record.points_[c]);
}
if (RET_FAIL(maybe_seal_aligned_pages_together(
time_chunk_writer, value_chunk_writers, time_pages_before,
value_pages_before))) {
return ret;
}
return ret;
}

Expand Down Expand Up @@ -704,6 +717,41 @@ int TsFileWriter::write_point_aligned(ValueChunkWriter* value_chunk_writer,
}
}

int TsFileWriter::maybe_seal_aligned_pages_together(
TimeChunkWriter* time_chunk_writer,
common::SimpleVector<ValueChunkWriter*>& value_chunk_writers,
int32_t time_pages_before, const std::vector<int32_t>& value_pages_before) {
bool should_seal_all =
time_chunk_writer->num_of_pages() > time_pages_before;
for (uint32_t c = 0; c < value_chunk_writers.size() && !should_seal_all;
c++) {
ValueChunkWriter* value_chunk_writer = value_chunk_writers[c];
if (!IS_NULL(value_chunk_writer) &&
value_chunk_writer->num_of_pages() > value_pages_before[c]) {
should_seal_all = true;
break;
}
}
if (!should_seal_all) {
return E_OK;
}

int ret = E_OK;
if (time_chunk_writer->has_current_page_data() &&
RET_FAIL(time_chunk_writer->seal_current_page())) {
return ret;
}
for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
ValueChunkWriter* value_chunk_writer = value_chunk_writers[c];
if (!IS_NULL(value_chunk_writer) &&
value_chunk_writer->has_current_page_data() &&
RET_FAIL(value_chunk_writer->seal_current_page())) {
return ret;
}
}
return ret;
}

int TsFileWriter::write_tablet_aligned(const Tablet& tablet) {
int ret = E_OK;
SimpleVector<ValueChunkWriter*> value_chunk_writers;
Expand All @@ -716,15 +764,33 @@ int TsFileWriter::write_tablet_aligned(const Tablet& tablet) {
data_types))) {
return ret;
}
time_write_column(time_chunk_writer, tablet);
ASSERT(value_chunk_writers.size() == tablet.get_column_count());
for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
ValueChunkWriter* value_chunk_writer = value_chunk_writers[c];
if (IS_NULL(value_chunk_writer)) {
continue;
for (uint32_t row = 0; row < tablet.get_cur_row_size(); row++) {
int32_t time_pages_before = time_chunk_writer->num_of_pages();
std::vector<int32_t> value_pages_before(value_chunk_writers.size(), 0);
for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
ValueChunkWriter* value_chunk_writer = value_chunk_writers[c];
if (!IS_NULL(value_chunk_writer)) {
value_pages_before[c] = value_chunk_writer->num_of_pages();
}
}
Comment on lines -719 to +775
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing from a column-based order to a row-based order may cause a performance downgrade.
May add a configuration to optionally sacrifice the page size limit:
If strict_page_size=true, use the row-based insertion with checking the page size each time.
Otherwise, insert each column wholly without the page size check, and perform it at the very end.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or, when there is no string/blob/text column, the memory size of each column can be directly computed with its length. Thus, we can divide the incoming column into splits that will cause a page seal, and write them one by one in a column-based order.


if (RET_FAIL(time_chunk_writer->write(tablet.timestamps_[row]))) {
return ret;
}
ASSERT(value_chunk_writers.size() == tablet.get_column_count());
for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
ValueChunkWriter* value_chunk_writer = value_chunk_writers[c];
if (IS_NULL(value_chunk_writer)) {
continue;
}
if (RET_FAIL(value_write_column(value_chunk_writer, tablet, c, row,
row + 1))) {
return ret;
}
}
if (RET_FAIL(value_write_column(value_chunk_writer, tablet, c, 0,
tablet.get_cur_row_size()))) {
if (RET_FAIL(maybe_seal_aligned_pages_together(
time_chunk_writer, value_chunk_writers, time_pages_before,
value_pages_before))) {
return ret;
}
}
Expand Down Expand Up @@ -808,26 +874,38 @@ int TsFileWriter::write_table(Tablet& tablet) {
value_chunk_writers))) {
return ret;
}
// Row-by-row write so that when time page seals (e.g. by memory
// threshold), we can seal all value pages together (Java
// semantics).
for (int i = start_idx; i < end_idx; i++) {
int32_t time_pages_before = time_chunk_writer->num_of_pages();
if (RET_FAIL(time_chunk_writer->write(tablet.timestamps_[i]))) {
return ret;
}
}
uint32_t field_col_count = 0;
for (uint32_t i = 0; i < tablet.get_column_count(); ++i) {
if (tablet.column_categories_[i] ==
common::ColumnCategory::FIELD) {
ValueChunkWriter* value_chunk_writer =
value_chunk_writers[field_col_count];
if (IS_NULL(value_chunk_writer)) {
continue;
uint32_t field_col_count = 0;
for (uint32_t col = 0; col < tablet.get_column_count(); ++col) {
if (tablet.column_categories_[col] ==
common::ColumnCategory::FIELD) {
ValueChunkWriter* value_chunk_writer =
value_chunk_writers[field_col_count];
if (!IS_NULL(value_chunk_writer) &&
RET_FAIL(value_write_column(
value_chunk_writer, tablet, col, i, i + 1))) {
return ret;
}
field_col_count++;
}

if (RET_FAIL(value_write_column(value_chunk_writer, tablet,
i, start_idx, end_idx))) {
return ret;
}
int32_t time_pages_after = time_chunk_writer->num_of_pages();
if (time_pages_after > time_pages_before) {
for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
if (!IS_NULL(value_chunk_writers[k]) &&
value_chunk_writers[k]->get_point_numer() > 0 &&
RET_FAIL(
value_chunk_writers[k]->seal_current_page())) {
return ret;
}
}
Comment on lines +898 to 908
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the number of value pages not checked.

field_col_count++;
}
}
start_idx = end_idx;
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/writer/tsfile_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ class TsFileWriter {
int write_point_aligned(ValueChunkWriter* value_chunk_writer,
int64_t timestamp, common::TSDataType data_type,
const DataPoint& point);
int maybe_seal_aligned_pages_together(
TimeChunkWriter* time_chunk_writer,
common::SimpleVector<ValueChunkWriter*>& value_chunk_writers,
int32_t time_pages_before,
const std::vector<int32_t>& value_pages_before);
int flush_chunk_group(MeasurementSchemaGroup* chunk_group, bool is_aligned);

int write_typed_column(storage::ChunkWriter* chunk_writer,
Expand Down
12 changes: 7 additions & 5 deletions cpp/src/writer/value_chunk_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ int ValueChunkWriter::seal_cur_page(bool end_chunk) {
/*stat*/ false, /*data*/ false);
if (IS_SUCC(ret)) {
save_first_page_data(value_page_writer_);
// value_page_writer_.destroy_page_data();
value_page_writer_.clear_page_data();
value_page_writer_.reset();
}
}
Expand Down Expand Up @@ -161,7 +161,8 @@ int ValueChunkWriter::write_first_page_data(ByteStream& pages_data,

int ValueChunkWriter::end_encode_chunk() {
int ret = E_OK;
if (value_page_writer_.get_statistic()->count_ > 0) {
if (value_page_writer_.get_point_numer() > 0 ||
(has_current_page_data() && num_of_pages_ == 0)) {
ret = seal_cur_page(/*end_chunk*/ true);
if (E_OK == ret) {
chunk_header_.data_size_ = chunk_data_.total_size();
Expand All @@ -174,6 +175,9 @@ int ValueChunkWriter::end_encode_chunk() {
chunk_header_.data_size_ = chunk_data_.total_size();
chunk_header_.num_of_pages_ = num_of_pages_;
}
} else if (num_of_pages_ > 0) {
chunk_header_.data_size_ = chunk_data_.total_size();
chunk_header_.num_of_pages_ = num_of_pages_;
}
#if DEBUG_SE
std::cout << "end_encode_chunk: num_of_pages_=" << num_of_pages_
Expand All @@ -193,9 +197,7 @@ int64_t ValueChunkWriter::estimate_max_series_mem_size() {
}

bool ValueChunkWriter::hasData() {
return num_of_pages_ > 0 ||
(value_page_writer_.get_statistic() != nullptr &&
value_page_writer_.get_statistic()->count_ > 0);
return num_of_pages_ > 0 || has_current_page_data();
}

} // end namespace storage
17 changes: 17 additions & 0 deletions cpp/src/writer/value_chunk_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,23 @@ class ValueChunkWriter {

bool hasData();

/** True if the current (unsealed) page has at least one write (including
* nulls). */
bool has_current_page_data() const {
return value_page_writer_.get_total_write_count() > 0;
}

FORCE_INLINE uint32_t get_point_numer() const {
return value_page_writer_.get_point_numer();
}

/**
* Force seal the current page (for aligned table model: when time page
* seals due to memory/point threshold, all value pages must seal together).
* @return E_OK on success.
*/
int seal_current_page() { return seal_cur_page(false); }

private:
FORCE_INLINE bool is_cur_page_full() const {
// FIXME
Expand Down
12 changes: 6 additions & 6 deletions cpp/src/writer/value_page_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ int ValuePageData::init(ByteStream& col_notnull_bitmap_bs, ByteStream& value_bs,
if (IS_NULL(uncompressed_buf_)) {
return E_OOM;
}
if (col_notnull_bitmap_buf_size_ == 0 || value_buf_size_ == 0) {
if (col_notnull_bitmap_buf_size_ == 0) {
return E_INVALID_ARG;
}
uncompressed_buf_[0] = (unsigned char)((size >> 24) & 0xFF);
Expand All @@ -54,11 +54,11 @@ int ValuePageData::init(ByteStream& col_notnull_bitmap_bs, ByteStream& value_bs,
if (RET_FAIL(common::copy_bs_to_buf(col_notnull_bitmap_bs,
uncompressed_buf_ + sizeof(size),
col_notnull_bitmap_buf_size_))) {
} else if (RET_FAIL(common::copy_bs_to_buf(value_bs,
uncompressed_buf_ +
sizeof(size) +
col_notnull_bitmap_buf_size_,
value_buf_size_))) {
} else if (value_buf_size_ > 0 && RET_FAIL(common::copy_bs_to_buf(
value_bs,
uncompressed_buf_ + sizeof(size) +
col_notnull_bitmap_buf_size_,
value_buf_size_))) {
} else {
// TODO
// NOTE: different compressor may have different compress API
Expand Down
17 changes: 16 additions & 1 deletion cpp/src/writer/value_page_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ struct ValuePageData {
common::ByteStream& value_bs, Compressor* compressor,
uint32_t size);
void destroy() {
// Be careful about the memory
if (uncompressed_buf_ != nullptr) {
common::mem_free(uncompressed_buf_);
uncompressed_buf_ = nullptr;
Expand All @@ -60,6 +59,19 @@ struct ValuePageData {
compressor_->after_compress(compressed_buf_);
compressed_buf_ = nullptr;
}
compressor_ = nullptr;
}

/** Clear pointers without freeing (transfer ownership to another holder).
*/
void clear() {
col_notnull_bitmap_buf_size_ = 0;
value_buf_size_ = 0;
uncompressed_size_ = 0;
compressed_size_ = 0;
uncompressed_buf_ = nullptr;
compressed_buf_ = nullptr;
compressor_ = nullptr;
}
};

Expand Down Expand Up @@ -152,6 +164,7 @@ class ValuePageWriter {
}

FORCE_INLINE uint32_t get_point_numer() const { return statistic_->count_; }
FORCE_INLINE uint32_t get_total_write_count() const { return size_; }
FORCE_INLINE uint32_t get_col_notnull_bitmap_out_stream_size() const {
return col_notnull_bitmap_out_stream_.total_size();
}
Expand Down Expand Up @@ -183,6 +196,8 @@ class ValuePageWriter {
FORCE_INLINE Statistic* get_statistic() { return statistic_; }
ValuePageData get_cur_page_data() { return cur_page_data_; }
void destroy_page_data() { cur_page_data_.destroy(); }
/** Clear cur_page_data_ without freeing (after ownership transferred). */
void clear_page_data() { cur_page_data_.clear(); }

private:
FORCE_INLINE int prepare_end_page() {
Expand Down
Loading