From 9871d2cd1c089da73200b26ac47fc4f3dbcf8d57 Mon Sep 17 00:00:00 2001 From: a3377596 Date: Mon, 16 Mar 2026 16:35:21 +0800 Subject: [PATCH 1/3] add new files --- cpp/include/PixelsScanFunction.hpp | 11 +- cpp/pixels-core/include/PixelsFilter.h | 40 ++- .../include/filter/comparison_operators.hpp | 45 ++++ .../include/filter/conjunction_filter.hpp | 41 ++++ .../include/filter/constant_filter.hpp | 28 +++ .../include/filter/table_filter.hpp | 58 +++++ .../include/reader/PixelsReaderOption.h | 17 +- .../include/reader/PixelsRecordReaderImpl.h | 5 +- .../include/vector/BinaryColumnVector.h | 67 ++--- .../include/vector/DecimalColumnVector.h | 57 +++-- cpp/pixels-core/lib/PixelsFilter.cpp | 201 +++++++++++---- cpp/pixels-core/lib/PixelsReaderImpl.cpp | 179 ++++---------- .../lib/vector/BinaryColumnVector.cpp | 148 ++++++----- .../lib/vector/DecimalColumnVector.cpp | 226 ++++++++--------- cpp/pixels-duckdb/PixelsScanFunction.cpp | 231 ++++++++++++++---- 15 files changed, 867 insertions(+), 487 deletions(-) create mode 100644 cpp/pixels-core/include/filter/comparison_operators.hpp create mode 100644 cpp/pixels-core/include/filter/conjunction_filter.hpp create mode 100644 cpp/pixels-core/include/filter/constant_filter.hpp create mode 100644 cpp/pixels-core/include/filter/table_filter.hpp diff --git a/cpp/include/PixelsScanFunction.hpp b/cpp/include/PixelsScanFunction.hpp index fb29fc6a1e..f320269896 100644 --- a/cpp/include/PixelsScanFunction.hpp +++ b/cpp/include/PixelsScanFunction.hpp @@ -70,10 +70,8 @@ #include "duckdb/catalog/catalog.hpp" #include "duckdb/common/constants.hpp" #include "duckdb/common/enums/file_compression_type.hpp" -//#include "duckdb/common/field_writer.hpp" #include "duckdb/common/file_system.hpp" #include "duckdb/common/hive_partitioning.hpp" -//#include "duckdb/common/types/chunk_collection.hpp" #include "duckdb/function/copy_function.hpp" #include "duckdb/function/table_function.hpp" #include "duckdb/main/client_context.hpp" @@ -87,6 +85,8 @@ #include "duckdb/storage/statistics/base_statistics.hpp" #include "duckdb/catalog/catalog_entry/table_function_catalog_entry.hpp" #include "duckdb/common/multi_file/multi_file_reader.hpp" +#include "filter/table_filter.hpp"//转换filter +#include "filter/constant_filter.hpp" #endif @@ -117,9 +117,8 @@ namespace duckdb PixelsReadLocalState &scan_data, PixelsReadGlobalState ¶llel_state, bool is_init_state = false); - static PixelsReaderOption - GetPixelsReaderOption(PixelsReadLocalState &local_state, PixelsReadGlobalState &global_state); - + static PixelsReaderOption GetPixelsReaderOption(PixelsReadLocalState &local_state, PixelsReadGlobalState &global_state); + static pixels::TableFilterSet ConvertDuckDBFilter(TableFilterSet *filters); private: static void TransformDuckdbType(const std::shared_ptr &type, vector &return_types); @@ -133,4 +132,4 @@ namespace duckdb }; } // namespace duckdb -#endif // EXAMPLE_C_PIXELSSCANFUNCTION_HPP +#endif // EXAMPLE_C_PIXELSSCANFUNCTION_HPP \ No newline at end of file diff --git a/cpp/pixels-core/include/PixelsFilter.h b/cpp/pixels-core/include/PixelsFilter.h index 282d456692..f12748f608 100644 --- a/cpp/pixels-core/include/PixelsFilter.h +++ b/cpp/pixels-core/include/PixelsFilter.h @@ -26,38 +26,54 @@ #define DUCKDB_PIXELSFILTER_H #include -#include "duckdb/planner/table_filter.hpp" -#include "duckdb/common/vector_size.hpp" -#include "duckdb/planner/filter/constant_filter.hpp" -#include "duckdb/planner/filter/null_filter.hpp" -#include "duckdb/planner/filter/conjunction_filter.hpp" -#include "duckdb/common/operator/comparison_operators.hpp" #include "PixelsBitMask.h" #include "vector/ColumnVector.h" #include "TypeDescription.h" #include #include +#include "filter/table_filter.hpp" +#include "filter/constant_filter.hpp" +#include "filter/conjunction_filter.hpp" +#include "filter/comparison_operators.hpp" #define ENABLE_SIMD_FILTER class PixelsFilter { public: - static void ApplyFilter(std::shared_ptr vector, duckdb::TableFilter &filter, + static void ApplyFilter(std::shared_ptr vector, const pixels::TableFilter &filter, PixelsBitMask &filterMask, std::shared_ptr type); template static int CompareAvx2(void *data, T constant); - template + /*template static void TemplatedFilterOperation(std::shared_ptr vector, - const duckdb::Value &constant, PixelsBitMask &filter_mask, - std::shared_ptr type); + const pixels::Scalar &constant, PixelsBitMask &filter_mask, + std::shared_ptr type);*/ + template + static void IntFilterOperation(std::shared_ptr vector, + const pixels::Scalar &constant, PixelsBitMask &filter_mask); + + template + static void LongFilterOperation(std::shared_ptr vector, + const pixels::Scalar &constant, PixelsBitMask &filter_mask); + template + static void DateFilterOperation(std::shared_ptr vector, + const pixels::Scalar &constant, PixelsBitMask &filter_mask); + + template + static void DecimalFilterOperation(std::shared_ptr vector, + const pixels::Scalar &constant, PixelsBitMask &filter_mask); + + template + static void StringFilterOperation(std::shared_ptr vector, + const pixels::Scalar &constant, PixelsBitMask &filter_mask); template - static void FilterOperationSwitch(std::shared_ptr vector, duckdb::Value &constant, + static void FilterOperationSwitch(std::shared_ptr vector, const pixels::Scalar &constant, PixelsBitMask &filter_mask, std::shared_ptr type); }; -#endif //DUCKDB_PIXELSFILTER_H +#endif //DUCKDB_PIXELSFILTER_H \ No newline at end of file diff --git a/cpp/pixels-core/include/filter/comparison_operators.hpp b/cpp/pixels-core/include/filter/comparison_operators.hpp new file mode 100644 index 0000000000..2e17c13af6 --- /dev/null +++ b/cpp/pixels-core/include/filter/comparison_operators.hpp @@ -0,0 +1,45 @@ +#pragma once +namespace pixels{ + //仿照duckdb的比较符结构体,每个结构体都有一个静态函数Operation,接受两个参数,返回比较结果。PixelsFilter.cpp中会调用这个函数来进行过滤操作。 + +struct Equals { + template + static inline bool Operation(const T &left, const T &right) { + return left == right; + } +}; +struct NotEquals { + template + static inline bool Operation(const T &left, const T &right) { + return !Equals::Operation(left, right); + } +}; + +struct GreaterThan { + template + static inline bool Operation(const T &left, const T &right) { + return left > right; + } +}; + +struct GreaterThanEquals { + template + static inline bool Operation(const T &left, const T &right) { + return !GreaterThan::Operation(right, left); + } +}; + +struct LessThan { + template + static inline bool Operation(const T &left, const T &right) { + return GreaterThan::Operation(right, left); + } +}; + +struct LessThanEquals { + template + static inline bool Operation(const T &left, const T &right) { + return !GreaterThan::Operation(left, right); + } +}; +} \ No newline at end of file diff --git a/cpp/pixels-core/include/filter/conjunction_filter.hpp b/cpp/pixels-core/include/filter/conjunction_filter.hpp new file mode 100644 index 0000000000..0d61d6ac8f --- /dev/null +++ b/cpp/pixels-core/include/filter/conjunction_filter.hpp @@ -0,0 +1,41 @@ +#pragma once + +#include "table_filter.hpp" + + +namespace pixels { + +class ConjunctionFilter : public TableFilter { +public: + explicit ConjunctionFilter(TableFilterType filter_type_p) : TableFilter(filter_type_p) { + } + + ~ConjunctionFilter() override { + } + + //! The filters of this conjunction + std::vector> child_filters; + +public: + bool Equals(const TableFilter &other) const override { + return TableFilter::Equals(other); + } +}; + +class ConjunctionOrFilter : public ConjunctionFilter { +public: + static constexpr const TableFilterType TYPE = TableFilterType::CONJUNCTION_OR; + + ConjunctionOrFilter() + : ConjunctionFilter(TYPE) {} +}; + +class ConjunctionAndFilter : public ConjunctionFilter { +public: + static constexpr const TableFilterType TYPE = TableFilterType::CONJUNCTION_AND; + + ConjunctionAndFilter() + : ConjunctionFilter(TYPE) {} +}; + +} \ No newline at end of file diff --git a/cpp/pixels-core/include/filter/constant_filter.hpp b/cpp/pixels-core/include/filter/constant_filter.hpp new file mode 100644 index 0000000000..6106577b06 --- /dev/null +++ b/cpp/pixels-core/include/filter/constant_filter.hpp @@ -0,0 +1,28 @@ +#pragma once +#include "table_filter.hpp" +#include +#include +#include +#include + +namespace pixels { + +class ConstantFilter : public TableFilter { +public: + static constexpr const TableFilterType TYPE = TableFilterType::CONSTANT_COMPARISON; + +public: + // 使用 std::move 处理可能存在的字符串拷贝 + ConstantFilter(ComparisonOperator comparison_type, Scalar constant) + : TableFilter(TYPE), + comparison_type(comparison_type), + constant(std::move(constant)) {} + + //! 比较操作符 (例如: ==, >, <, >=, <=) + ComparisonOperator comparison_type; + + //! 具体的常量值 + Scalar constant; +}; + +} \ No newline at end of file diff --git a/cpp/pixels-core/include/filter/table_filter.hpp b/cpp/pixels-core/include/filter/table_filter.hpp new file mode 100644 index 0000000000..72f0c73219 --- /dev/null +++ b/cpp/pixels-core/include/filter/table_filter.hpp @@ -0,0 +1,58 @@ +#pragma once + +#include +#include +#include "PixelsTypes.h" +#include +#include +namespace pixels { + +//! TableFilter represents a filter pushed down into the table scan. +class TableFilter { +public: + TableFilter() : filter_type(TableFilterType::DEFAULT) {} + explicit TableFilter(TableFilterType filter_type_p) : filter_type(filter_type_p) { + } + virtual ~TableFilter() { + } + + TableFilterType filter_type; + +public: + virtual bool Equals(const TableFilter &other) const { + return filter_type == other.filter_type; + } +}; + +//仿照duckdb设计 +class TableFilterSet { +public: + std::map> filters; +public: + bool Equals(TableFilterSet &other) { + if (filters.size() != other.filters.size()) { + return false; + } + for (auto &entry : filters) { + auto other_entry = other.filters.find(entry.first); + if (other_entry == other.filters.end()) { + return false; + } + if (!entry.second->Equals(*other_entry->second)) { + return false; + } + } + return true; + } + static bool Equals(TableFilterSet *left, TableFilterSet *right) { + if (left == right) { + return true; + } + if (!left || !right) { + return false; + } + return left->Equals(*right); + } +}; + +} // namespace pixels \ No newline at end of file diff --git a/cpp/pixels-core/include/reader/PixelsReaderOption.h b/cpp/pixels-core/include/reader/PixelsReaderOption.h index 72b656a547..c4530b651c 100644 --- a/cpp/pixels-core/include/reader/PixelsReaderOption.h +++ b/cpp/pixels-core/include/reader/PixelsReaderOption.h @@ -26,9 +26,11 @@ #define PIXELS_PIXELSREADEROPTION_H #include +#include #include #include -#include "duckdb/planner/table_filter.hpp" +#include "filter/table_filter.hpp" + class PixelsReaderOption { @@ -55,11 +57,13 @@ class PixelsReaderOption void setRGRange(int start, int len); - void setFilter(duckdb::TableFilterSet *filter); + void setFilter(pixels::TableFilterSet f); + + pixels::TableFilterSet extractFilter(); - void setRingIndex(int ringIndex); + int getfiltersize(); - duckdb::TableFilterSet *getFilter(); + void setRingIndex(int ringIndex); int getRGStart(); @@ -79,8 +83,9 @@ class PixelsReaderOption private: std::vector includedCols; - duckdb::TableFilterSet *filter; // TODO: pixelsPredicate + pixels::TableFilterSet filter; + //修改:不再依赖 duckdb 类型,不过看来原项目的主人是想要设计一套自己的谓词系统的 bool skipCorruptRecords; bool tolerantSchemaEvolution; // this may lead to column missing due to schema evolution bool enableEncodedColumnVector; // whether read encoded column vectors directly when possible @@ -89,6 +94,6 @@ class PixelsReaderOption int batchSize; int rgStart; int rgLen; - int ringIndex; + int ringIndex; }; #endif //PIXELS_PIXELSREADEROPTION_H diff --git a/cpp/pixels-core/include/reader/PixelsRecordReaderImpl.h b/cpp/pixels-core/include/reader/PixelsRecordReaderImpl.h index 67904ebe5a..8ad01072c3 100644 --- a/cpp/pixels-core/include/reader/PixelsRecordReaderImpl.h +++ b/cpp/pixels-core/include/reader/PixelsRecordReaderImpl.h @@ -67,7 +67,7 @@ class PixelsRecordReaderImpl : public PixelsRecordReader explicit PixelsRecordReaderImpl(std::shared_ptr reader, const pixels::proto::PostScript &pixelsPostScript, const pixels::proto::Footer &pixelsFooter, - const PixelsReaderOption &opt, + PixelsReaderOption &opt,//option不能是只读的,因为要把tablefilterset拿过来 std::shared_ptr pixelsFooterCache ); @@ -104,7 +104,8 @@ class PixelsRecordReaderImpl : public PixelsRecordReader pixels::proto::PostScript postScript; std::shared_ptr footerCache; PixelsReaderOption option; - duckdb::TableFilterSet *filter; + //pixels::predicate filter + pixels::TableFilterSet filter; long queryId; int ringIndex; int RGStart; diff --git a/cpp/pixels-core/include/vector/BinaryColumnVector.h b/cpp/pixels-core/include/vector/BinaryColumnVector.h index 448a3f843a..8c6ce8c6d4 100644 --- a/cpp/pixels-core/include/vector/BinaryColumnVector.h +++ b/cpp/pixels-core/include/vector/BinaryColumnVector.h @@ -27,57 +27,34 @@ #include "vector/ColumnVector.h" #include "vector/VectorizedRowBatch.h" -#include "duckdb.h" -#include "duckdb/common/types/vector.hpp" +#include "PixelsTypes.h" +#include +#include /** - * BinaryColumnVector derived from org.apache.hadoop.hive.ql.exec.vector. - *

- * This class supports string and binary data by value reference -- i.e. each field is - * explicitly present, as opposed to provided by a dictionary reference. - * In some cases, all the values will be in the same byte array to begin with, - * but this need not be the case. If each value is in a separate byte - * array to start with, or not all the values are in the same original - * byte array, you can still assign data by reference into this column vector. - * This gives flexibility to use this in multiple situations. - *

- * When setting data by reference, the caller - * is responsible for allocating the byte arrays used to hold the data. - * You can also set data by value, as long as you call the initBuffer() method first. - * You can mix "by value" and "by reference" in the same column vector, - * though that use is probably not typical. + * BinaryColumnVector + * 作用:存储字符串或二进制数据的引用。原本项目同时保存duckdb的string和std的string,有点和稀泥,这里先改为仅保存std的string + * 后续要实现一个类似duckdb的string_t结构体,包含指针和长度,来避免std::string的内存分配和复制开销。 */ +// BinaryColumnVector.h + class BinaryColumnVector : public ColumnVector { - public: - duckdb::string_t *vector; - - std::vector str_vec; +public: + BinaryColumnVector(uint64_t len, bool encoding); + ~BinaryColumnVector(); - /** - * Use this constructor by default. All column vectors - * should normally be the default size. - */ - explicit BinaryColumnVector(uint64_t len = VectorizedRowBatch::DEFAULT_SIZE, bool encoding = false); + void close() override; - ~BinaryColumnVector(); - /** - * Set a field by reference. - * - * @param elementNum index within column vector to set - * @param sourceBuf container of source data - * @param start start byte position within source - * @param length length of source byte sequence - */ - void setRef(int elementNum, uint8_t *const &sourceBuf, int start, int length); - void *current() override; - void close() override; - void print(int rowCount) override; - - void add(std::string &value) override; - void add(uint8_t *v, int length); - //void setVal(int elemnetNum,uint8_t* sourceBuf); - void setVal(int elementNum, uint8_t *sourceBuf, int start, int length); - void ensureSize(uint64_t size, bool preserveData) override; + void setRef(int elementNum, uint8_t *const &sourceBuf, int start, int length); + void setVal(int elementNum, uint8_t *sourceBuf, int start, int length); + void ensureSize(uint64_t size, bool preserveData) override; + void add(std::string &value); + void add(uint8_t *v, int len); + void *current() override; + const std::string &getValue(idx_t i) const; + bool isNullAt(idx_t i) const; + std::vector str_vec; // 唯一字符串存储 }; + #endif //PIXELS_BINARYCOLUMNVECTOR_H diff --git a/cpp/pixels-core/include/vector/DecimalColumnVector.h b/cpp/pixels-core/include/vector/DecimalColumnVector.h index 652e26c22b..a7d5a70d7a 100644 --- a/cpp/pixels-core/include/vector/DecimalColumnVector.h +++ b/cpp/pixels-core/include/vector/DecimalColumnVector.h @@ -27,32 +27,39 @@ #include "vector/ColumnVector.h" #include "vector/VectorizedRowBatch.h" -#include "duckdb/common/types.hpp" +#include "PixelsTypes.h" //引入中立类型 + +#pragma once +#include +#include +#include +#include +#include +#include -using PhysicalType = duckdb::PhysicalType; class DecimalColumnVector : public ColumnVector { - public: - long *vector; - int precision; - int scale; - PhysicalType physical_type_; - static long DEFAULT_UNSCALED_VALUE; - /** - * Use this constructor by default. All column vectors - * should normally be the default size. - */ - DecimalColumnVector(int precision, int scale, bool encoding = false); - DecimalColumnVector(uint64_t len, int precision, int scale, bool encoding = false); - ~DecimalColumnVector(); - void print(int rowCount) override; - void close() override; - void *current() override; - int getPrecision(); - int getScale(); - - void add(std::string &value) override; - void add(long value) override; - void ensureSize(uint64_t size, bool preserveData) override; +public: + DecimalColumnVector(uint64_t len, int precision, int scale, bool encoding); + ~DecimalColumnVector(); + + void add(long value); + void add(const std::string& value); + + void* current(); + void close(); + + int getPrecision() const { return precision_; } + int getScale() const { return scale_; } + void* getData() const { return vector; } + pixels::PhysicalType getPhysicalType() const { return physical_type_; } + size_t ElementSize() const; + void ensureSize(uint64_t size, bool preserveData); + void* vector;//这里改为void,之后根据大小调整 + int precision_; + int scale_; + + pixels::PhysicalType physical_type_; }; -#endif //PIXELS_DECIMALCOLUMNVECTOR_H + +#endif //PIXELS_DECIMALCOLUMNVECTOR_H \ No newline at end of file diff --git a/cpp/pixels-core/lib/PixelsFilter.cpp b/cpp/pixels-core/lib/PixelsFilter.cpp index 643715eedc..7f11fa8d6c 100644 --- a/cpp/pixels-core/lib/PixelsFilter.cpp +++ b/cpp/pixels-core/lib/PixelsFilter.cpp @@ -27,6 +27,7 @@ template int PixelsFilter::CompareAvx2(void *data, T constant) { + //std::cout<<"enter compare"<()) + if constexpr(std::is_same()) { mask = _mm256_cmpeq_epi32(vector, constants); return _mm256_movemask_ps((__m256) mask); - } else if constexpr(std::is_same()) + } else if constexpr(std::is_same()) { mask = _mm256_cmpgt_epi32(constants, vector); return _mm256_movemask_ps((__m256) mask); - } else if constexpr(std::is_same()) + } else if constexpr(std::is_same()) { mask = _mm256_cmpgt_epi32(vector, constants); return ~_mm256_movemask_ps((__m256) mask); - } else if constexpr(std::is_same()) + } else if constexpr(std::is_same()) { mask = _mm256_cmpgt_epi32(vector, constants); return _mm256_movemask_ps((__m256) mask); - } else if constexpr(std::is_same()) + } else if constexpr(std::is_same()) { mask = _mm256_cmpgt_epi32(constants, vector); return ~_mm256_movemask_ps((__m256) mask); @@ -62,35 +63,35 @@ int PixelsFilter::CompareAvx2(void *data, T constant) vector = _mm256_load_si256((__m256i *) data); vector_next = _mm256_load_si256((__m256i * )((uint8_t *) data + 32)); int result = 0; - if constexpr(std::is_same()) + if constexpr(std::is_same()) { mask = _mm256_cmpeq_epi64(vector, constants); result = _mm256_movemask_pd((__m256d) mask); mask = _mm256_cmpeq_epi64(vector_next, constants); result += _mm256_movemask_pd((__m256d) mask) << 4; return result; - } else if constexpr(std::is_same()) + } else if constexpr(std::is_same()) { mask = _mm256_cmpgt_epi64(constants, vector); result = _mm256_movemask_pd((__m256d) mask); mask = _mm256_cmpgt_epi64(constants, vector_next); result += _mm256_movemask_pd((__m256d) mask) << 4; return result; - } else if constexpr(std::is_same()) + } else if constexpr(std::is_same()) { mask = _mm256_cmpgt_epi64(vector, constants); result = _mm256_movemask_pd((__m256d) mask); mask = _mm256_cmpgt_epi64(vector_next, constants); result += _mm256_movemask_pd((__m256d) mask) << 4; return ~result; - } else if constexpr(std::is_same()) + } else if constexpr(std::is_same()) { mask = _mm256_cmpgt_epi64(vector, constants); result = _mm256_movemask_pd((__m256d) mask); mask = _mm256_cmpgt_epi64(vector_next, constants); result += _mm256_movemask_pd((__m256d) mask) << 4; return result; - } else if constexpr(std::is_same()) + } else if constexpr(std::is_same()) { mask = _mm256_cmpgt_epi64(constants, vector); result = _mm256_movemask_pd((__m256d) mask); @@ -103,18 +104,113 @@ int PixelsFilter::CompareAvx2(void *data, T constant) } } +template +void PixelsFilter::IntFilterOperation(std::shared_ptr vector, + const pixels::Scalar &constant, PixelsBitMask &filter_mask){ + int32_t constant_value=std::get(constant); + auto intColumnVector = std::static_pointer_cast(vector); + int i = 0; +#ifdef ENABLE_SIMD_FILTER + for (; i < vector->length - vector->length % 8; i += 8) { + uint8_t mask = CompareAvx2(intColumnVector->intVector + i, constant_value); + filter_mask.setByteAligned(i, mask); + } +#endif + for (; i < vector->length; i++) + { + filter_mask.set(i, OP::Operation((int32_t) intColumnVector->intVector[i],constant_value)); + } +} -template +template +void PixelsFilter::LongFilterOperation(std::shared_ptr vector,const pixels::Scalar &constant, PixelsBitMask &filter_mask){ + //printf("enter long filter operation\n"); + // 传入地址 &constant + int64_t constant_value=std::get(constant); + //printf("long filter constant value: %lld\n", constant_value); + auto longColumnVector = std::static_pointer_cast(vector); + int i = 0; +#ifdef ENABLE_SIMD_FILTER + for (; i < vector->length - vector->length % 8; i += 8) { + uint8_t mask = CompareAvx2(longColumnVector->longVector + i, constant_value); + filter_mask.setByteAligned(i,mask); + + } +#endif + for (; i < vector->length; i++) + { + filter_mask.set(i, OP::Operation((int64_t) longColumnVector->longVector[i],constant_value)); + } +} + +template +void PixelsFilter::DateFilterOperation(std::shared_ptr vector, + const pixels::Scalar &constant, PixelsBitMask &filter_mask){ + int32_t constant_value=std::get(constant); + auto dateColumnVector = std::static_pointer_cast(vector); + int i = 0; +#ifdef ENABLE_SIMD_FILTER + for (; i < vector->length - vector->length % 8; i += 8) { + uint8_t mask = CompareAvx2(dateColumnVector->dates + i, constant_value); + filter_mask.setByteAligned(i, mask); + } +#endif + for (; i < vector->length; i++) + { + filter_mask.set(i, OP::Operation((int32_t) dateColumnVector->dates[i],constant_value)); + } +} + +template +void PixelsFilter::DecimalFilterOperation(std::shared_ptr vector, + const pixels::Scalar &constant, PixelsBitMask &filter_mask){ + int64_t constant_value=std::get(constant); + //std::cout<<"in filter the decimal is "<(vector); + int i = 0; +#ifdef ENABLE_SIMD_FILTER + for (; i < vector->length - vector->length % 8; i += 8) { + uint8_t mask = CompareAvx2(decimalColumnVector->vector + i, constant_value); + filter_mask.setByteAligned(i, mask); + } +#endif + for (; i < vector->length; i++) + { + //std::cout<<(int64_t)((int64_t*)decimalColumnVector->vector)[i]<vector)[i], constant_value)); + } +} + +template +void PixelsFilter::StringFilterOperation(std::shared_ptr vector,//string目前只能并行比较,原项目也是这样,沿用,后续再想办法 + const pixels::Scalar &constant, PixelsBitMask &filter_mask){ + std::string constant_value=std::get(constant); + auto binaryColumnVector = std::static_pointer_cast(vector); + for (int i = 0; i < vector->length; i++) + { + filter_mask.set(i, OP::Operation((std::string) binaryColumnVector->str_vec[i],(std::string) constant_value)); + } +} + +/*template void PixelsFilter::TemplatedFilterOperation(std::shared_ptr vector, - const duckdb::Value &constant, PixelsBitMask &filter_mask, + const pixels::Scalar &constant, PixelsBitMask &filter_mask, std::shared_ptr type) { - T constant_value = constant.template GetValueUnsafe(); + T constant_value; + if (auto ptr = std::get_if(&constant)) { + constant_value = *ptr; + std::cout << "value is " << constant_value << std::endl; + } else { + std::cerr << "Invalid type in constant" << std::endl; + } + switch (type->getCategory()) { case TypeDescription::SHORT: case TypeDescription::INT: { + std::cout<<"it is int"<(vector); int i = 0; #ifdef ENABLE_SIMD_FILTER @@ -132,6 +228,7 @@ void PixelsFilter::TemplatedFilterOperation(std::shared_ptr vecto } case TypeDescription::LONG: { + std::cout<<"it is long"<(vector); int i = 0; #ifdef ENABLE_SIMD_FILTER @@ -176,8 +273,7 @@ void PixelsFilter::TemplatedFilterOperation(std::shared_ptr vecto #endif for (; i < vector->length; i++) { - filter_mask.set(i, OP::Operation((T) decimalColumnVector->vector[i], - constant_value)); + filter_mask.set(i,OP::Operation((T)((int64_t*)decimalColumnVector->vector)[i], constant_value)); } break; } @@ -190,16 +286,16 @@ void PixelsFilter::TemplatedFilterOperation(std::shared_ptr vecto auto binaryColumnVector = std::static_pointer_cast(vector); for (int i = 0; i < vector->length; i++) { - filter_mask.set(i, OP::Operation((duckdb::string_t) binaryColumnVector->vector[i], - (duckdb::string_t) constant_value)); + filter_mask.set(i, OP::Operation((std::string) binaryColumnVector->vector[i], + (std::string) constant_value)); } break; } } -} +}*/ template -void PixelsFilter::FilterOperationSwitch(std::shared_ptr vector, duckdb::Value &constant, +void PixelsFilter::FilterOperationSwitch(std::shared_ptr vector, const pixels::Scalar &constant, PixelsBitMask &filter_mask, std::shared_ptr type) { @@ -211,36 +307,41 @@ void PixelsFilter::FilterOperationSwitch(std::shared_ptr vector, { case TypeDescription::SHORT: case TypeDescription::INT: + IntFilterOperation(vector,constant,filter_mask); + break; case TypeDescription::DATE: - TemplatedFilterOperation(vector, constant, filter_mask, type); + DateFilterOperation(vector, constant, filter_mask); break; case TypeDescription::LONG: - TemplatedFilterOperation(vector, constant, filter_mask, type); + LongFilterOperation(vector, constant, filter_mask); break; case TypeDescription::DECIMAL: - TemplatedFilterOperation(vector, constant, filter_mask, type); + DecimalFilterOperation(vector, constant, filter_mask); break; case TypeDescription::STRING: case TypeDescription::BINARY: case TypeDescription::VARBINARY: case TypeDescription::CHAR: case TypeDescription::VARCHAR: - TemplatedFilterOperation(vector, constant, filter_mask, type); + StringFilterOperation(vector, constant, filter_mask); break; default: throw InvalidArgumentException("Unsupported type for filter. "); } } -void PixelsFilter::ApplyFilter(std::shared_ptr vector, duckdb::TableFilter &filter, +void PixelsFilter::ApplyFilter(std::shared_ptr vector, const pixels::TableFilter &filter, PixelsBitMask &filterMask, std::shared_ptr type) { + switch (filter.filter_type) { - case duckdb::TableFilterType::CONJUNCTION_AND: + //std::cout<<(int)filter.filter_type< vector, duckdb::Ta } break; } - case duckdb::TableFilterType::CONJUNCTION_OR: + case pixels::TableFilterType::CONJUNCTION_OR: { - auto &conjunction = (duckdb::ConjunctionOrFilter &) filter; + //std::cout<<"this a conjuntionor filter"< vector, duckdb::Ta filterMask.And(orMask); break; } - case duckdb::TableFilterType::CONSTANT_COMPARISON: + case pixels::TableFilterType::CONSTANT_COMPARISON: { - auto &constant_filter = (duckdb::ConstantFilter &) filter; + auto &constant_filter = (pixels::ConstantFilter &) filter;//constant filter里的constant本质是个scaler,variant做成的 switch (constant_filter.comparison_type) { - case duckdb::ExpressionType::COMPARE_EQUAL: - FilterOperationSwitch( - vector, constant_filter.constant, filterMask, type); + case pixels::ComparisonOperator::EQUAL: + FilterOperationSwitch( + vector,constant_filter.constant, filterMask, type); break; - case duckdb::ExpressionType::COMPARE_LESSTHAN: - FilterOperationSwitch( + case pixels::ComparisonOperator::LESS_THAN: + FilterOperationSwitch( vector, constant_filter.constant, filterMask, type); break; - case duckdb::ExpressionType::COMPARE_LESSTHANOREQUALTO: - FilterOperationSwitch( + case pixels::ComparisonOperator::LESS_THAN_OR_EQUAL: + FilterOperationSwitch( vector, constant_filter.constant, filterMask, type); break; - case duckdb::ExpressionType::COMPARE_GREATERTHAN: - FilterOperationSwitch( + case pixels::ComparisonOperator::GREATER_THAN: + FilterOperationSwitch( vector, constant_filter.constant, filterMask, type); break; - case duckdb::ExpressionType::COMPARE_GREATERTHANOREQUALTO: - FilterOperationSwitch( + case pixels::ComparisonOperator::GREATER_THAN_OR_EQUAL: + FilterOperationSwitch( vector, constant_filter.constant, filterMask, type); break; default: - D_ASSERT(0); + //D_ASSERT(0);do nothing for unsupported comparison operator for now + break; } break; } - case duckdb::TableFilterType::IS_NOT_NULL: + case pixels::TableFilterType::IS_NOT_NULL: // TODO: support is null break; - case duckdb::TableFilterType::IS_NULL: + case pixels::TableFilterType::IS_NULL: // TODO: support is null break; - case duckdb::TableFilterType::OPTIONAL_FILTER: + case pixels::TableFilterType::OPTIONAL_FILTER: // nothing to do return; default: - D_ASSERT(0); + //D_ASSERT(0); do nothing for unsupported filter type for now break; } -} - - - - +} \ No newline at end of file diff --git a/cpp/pixels-core/lib/PixelsReaderImpl.cpp b/cpp/pixels-core/lib/PixelsReaderImpl.cpp index ff01c3fb89..66b73525c2 100644 --- a/cpp/pixels-core/lib/PixelsReaderImpl.cpp +++ b/cpp/pixels-core/lib/PixelsReaderImpl.cpp @@ -22,147 +22,72 @@ * @author liyu * @create 2023-03-06 */ -#include "PixelsReaderImpl.h" +#ifndef PIXELS_PIXELSREADERIMPL_H +#define PIXELS_PIXELSREADERIMPL_H -PixelsReaderImpl::PixelsReaderImpl(std::shared_ptr fileSchema, - std::shared_ptr reader, - std::shared_ptr fileTail, - std::shared_ptr footerCache) -{ - this->fileSchema = fileSchema; - this->physicalReader = reader; - this->footer = fileTail->footer(); - this->postScript = fileTail->postscript(); - this->pixelsFooterCache = footerCache; - this->closed = false; -} +#include "PixelsReader.h" +#include "reader/PixelsRecordReaderImpl.h" +#include +#include +#include "pixels-common/pixels.pb.h" +#include "PixelsFooterCache.h" +#include "reader/PixelsReaderOption.h" -/** - * Prepare for the next row batch. This method is independent from readBatch(). - * - * @param batchSize the willing batch size - * @return the real batch size - */ -std::shared_ptr PixelsReaderImpl::read(PixelsReaderOption option) -{ - // TODO: add a function parameter, and the code before creating PixelsRecordReaderImpl - std::shared_ptr recordReader = - std::make_shared( - physicalReader, postScript, - footer, option, pixelsFooterCache); - recordReaders.emplace_back(recordReader); - return recordReader; -} - -std::shared_ptr PixelsReaderImpl::getFileSchema() -{ - return fileSchema; -} +class PixelsReaderBuilder; -PixelsVersion::Version PixelsReaderImpl::getFileVersion() +class PixelsReaderImpl : public PixelsReader { - return PixelsVersion::from(postScript.version()); -} +public: + std::shared_ptr read(PixelsReaderOption option); -long PixelsReaderImpl::getNumberOfRows() -{ - return postScript.numberofrows(); -} + PixelsReaderImpl(std::shared_ptr fileSchema, + std::shared_ptr reader, + std::shared_ptr fileTail, + std::shared_ptr footerCache); -pixels::proto::CompressionKind PixelsReaderImpl::getCompressionKind() -{ - return postScript.compression(); -} + ~PixelsReaderImpl(); -long PixelsReaderImpl::getCompressionBlockSize() -{ - return postScript.compressionblocksize(); -} + std::shared_ptr getFileSchema() override; -long PixelsReaderImpl::getPixelStride() -{ - return postScript.pixelstride(); -} + PixelsVersion::Version getFileVersion() override; -std::string PixelsReaderImpl::getWriterTimeZone() -{ - return postScript.writertimezone(); -} + long getNumberOfRows() override; -int PixelsReaderImpl::getRowGroupNum() -{ - return footer.rowgroupinfos_size(); -} + pixels::proto::CompressionKind getCompressionKind() override; -bool PixelsReaderImpl::isPartitioned() -{ - return postScript.has_partitioned() && postScript.partitioned(); -} + long getCompressionBlockSize() override; -ColumnStatisticList PixelsReaderImpl::getColumnStats() -{ - return footer.columnstats(); -} + long getPixelStride() override; -pixels::proto::ColumnStatistic PixelsReaderImpl::getColumnStat(std::string columnName) -{ - auto fieldNames = fileSchema->getFieldNames(); - auto fieldIter = std::find(fieldNames.begin(), fieldNames.end(), columnName); - if (fieldIter == fieldNames.end()) - { - throw InvalidArgumentException("the column " + - columnName + " is not the field name!"); - } - int fieldId = fieldIter - fieldNames.begin(); - return footer.columnstats().Get(fieldId); -} - -RowGroupInfoList PixelsReaderImpl::getRowGroupInfos() -{ - return footer.rowgroupinfos(); -} + std::string getWriterTimeZone() override; -pixels::proto::RowGroupInformation PixelsReaderImpl::getRowGroupInfo(int rowGroupId) -{ - if (rowGroupId < 0 || rowGroupId >= footer.columnstats_size()) - { - throw InvalidArgumentException("row group id is out of bound."); - } - return footer.rowgroupinfos().Get(rowGroupId); -} - -pixels::proto::RowGroupStatistic PixelsReaderImpl::getRowGroupStat(int rowGroupId) -{ - if (rowGroupId < 0 || rowGroupId >= footer.columnstats_size()) - { - throw InvalidArgumentException("row group id is out of bound."); - } - return footer.rowgroupstats().Get(rowGroupId); -} - -RowGroupStatList PixelsReaderImpl::getRowGroupStats() -{ - return footer.rowgroupstats(); -} + int getRowGroupNum() override; -PixelsReaderImpl::~PixelsReaderImpl() -{ - if (!closed) - { - PixelsReaderImpl::close(); - } -} + bool isPartitioned() override; -void PixelsReaderImpl::close() -{ - if (!closed) - { - for (auto recordReader: recordReaders) - { - recordReader->close(); - } - recordReaders.clear(); - physicalReader->close(); - } -} + ColumnStatisticList getColumnStats() override; + + pixels::proto::ColumnStatistic getColumnStat(std::string columnName) override; + + RowGroupInfoList getRowGroupInfos() override; + + pixels::proto::RowGroupInformation getRowGroupInfo(int rowGroupId) override; + + pixels::proto::RowGroupStatistic getRowGroupStat(int rowGroupId) override; + + RowGroupStatList getRowGroupStats() override; + + void close() override; + +private: + std::vector > recordReaders; + std::shared_ptr fileSchema; + std::shared_ptr physicalReader; + std::shared_ptr pixelsFooterCache; + pixels::proto::PostScript postScript; + pixels::proto::Footer footer; + bool closed; +}; + +#endif //PIXELS_PIXELSREADERIMPL_H \ No newline at end of file diff --git a/cpp/pixels-core/lib/vector/BinaryColumnVector.cpp b/cpp/pixels-core/lib/vector/BinaryColumnVector.cpp index dd4fe71b6e..65f2d597be 100644 --- a/cpp/pixels-core/lib/vector/BinaryColumnVector.cpp +++ b/cpp/pixels-core/lib/vector/BinaryColumnVector.cpp @@ -23,101 +23,125 @@ * @create 2023-03-17 */ #include "vector/BinaryColumnVector.h" +#include // 用于 posix_memalign 和 free +#include // 用于 memcpy -BinaryColumnVector::BinaryColumnVector(uint64_t len, bool encoding) : ColumnVector(len, encoding) +BinaryColumnVector::BinaryColumnVector(uint64_t len, bool encoding) + : ColumnVector(len, encoding) { - posix_memalign(reinterpret_cast(&vector), 32, - len * sizeof(duckdb::string_t)); - str_vec.resize(len); - memoryUsage += (long) sizeof(uint8_t) * len; + str_vec.resize(len); + memoryUsage += sizeof(std::string) * len; } void BinaryColumnVector::close() { - if (!closed) - { - ColumnVector::close(); - free(vector); - vector = nullptr; - } + if (!closed) + { + ColumnVector::close(); + str_vec.clear(); + str_vec.shrink_to_fit(); + closed = true; + } } -void BinaryColumnVector::setRef(int elementNum, uint8_t *const &sourceBuf, int start, int length) +void BinaryColumnVector::setRef(int elementNum, + uint8_t *const &sourceBuf, + int start, + int length) { - if (elementNum >= writeIndex) - { - writeIndex = elementNum + 1; - } - this->vector[elementNum] - = duckdb::string_t((char *) (sourceBuf + start), length); -// std::cout<< this->vector[elementNum].GetString()<= (int)this->length) + { + ensureSize(elementNum + 1, true); + } + if (elementNum >= writeIndex) + { + writeIndex = elementNum + 1; + } + + str_vec[elementNum] = std::string( + reinterpret_cast(sourceBuf + start), + length); + + isNull[elementNum] = false; } -void BinaryColumnVector::print(int rowCount) +void BinaryColumnVector::setVal(int elementNum, + uint8_t *sourceBuf, + int start, + int length) { - throw InvalidArgumentException("not support print binarycolumnvector."); + str_vec[elementNum] = std::string( + reinterpret_cast(sourceBuf + start), + length); + + isNull[elementNum] = false; } +void BinaryColumnVector::ensureSize(uint64_t size, bool preserveData) +{ + if (length >= size) + return; + + if (preserveData) + { + str_vec.resize(size); + } + else + { + std::vector new_vec(size); + str_vec.swap(new_vec); + } + + memoryUsage += sizeof(std::string) * (size - length); + + resize(size); // 更新基类 length +} + + BinaryColumnVector::~BinaryColumnVector() { - if (!closed) - { - BinaryColumnVector::close(); - } + if (!closed) + { + BinaryColumnVector::close(); + } } void *BinaryColumnVector::current() { - if (vector == nullptr) - { - return nullptr; - } else - { - return vector + readIndex; - } + if (readIndex >= str_vec.size()) + return nullptr; + + return (void *)&str_vec[readIndex]; } void BinaryColumnVector::add(std::string &value) { - size_t len = value.size(); - uint8_t *buffer = new uint8_t[len]; - std::memcpy(buffer, value.data(), len); - add(buffer, len); - delete[] buffer; + if (writeIndex >= (int)length) + { + ensureSize(writeIndex == 0 ? 1 : writeIndex * 2, true); + } + + str_vec[writeIndex++] = value; } void BinaryColumnVector::add(uint8_t *v, int len) { - if (writeIndex >= length) - { - ensureSize(writeIndex * 2, true); - } - setVal(writeIndex++, v, 0, len); + if (writeIndex >= (int)length) + { + ensureSize(writeIndex == 0 ? 1 : writeIndex * 2, true); + } + + str_vec[writeIndex++] = + std::string(reinterpret_cast(v), len); } -void BinaryColumnVector::setVal(int elementNum, uint8_t *sourceBuf, int start, int length) +const std::string &BinaryColumnVector::getValue(idx_t i) const { - vector[elementNum] = duckdb::string_t(reinterpret_cast(sourceBuf + start), length); - isNull[elementNum] = false; - str_vec[elementNum] = std::string(reinterpret_cast(sourceBuf + start), length); + return str_vec[i]; } -void BinaryColumnVector::ensureSize(uint64_t size, bool preserveData) +bool BinaryColumnVector::isNullAt(idx_t i) const { - ColumnVector::ensureSize(size, preserveData); - if (length < size) - { - duckdb::string_t *oldVector = vector; - posix_memalign(reinterpret_cast(&vector), 32, size * sizeof(duckdb::string_t)); - str_vec.resize(size); - if (preserveData) - { - std::copy(oldVector, oldVector + length, vector); - } - delete[] oldVector; - memoryUsage += (long) sizeof(duckdb::string_t) * (size - length); - resize(size); - } + return isNull[i]; } \ No newline at end of file diff --git a/cpp/pixels-core/lib/vector/DecimalColumnVector.cpp b/cpp/pixels-core/lib/vector/DecimalColumnVector.cpp index bd2afec54f..926e006f74 100644 --- a/cpp/pixels-core/lib/vector/DecimalColumnVector.cpp +++ b/cpp/pixels-core/lib/vector/DecimalColumnVector.cpp @@ -25,155 +25,163 @@ #include #include #include +#include +#include #include "vector/DecimalColumnVector.h" -#include "duckdb/common/types/decimal.hpp" - -/** - * The decimal column vector with precision and scale. - * The values of this column vector are the unscaled integer value - * of the decimal. For example, the unscaled value of 3.14, which is - * of the type decimal(3,2), is 314. While the precision and scale - * of this decimal are 3 and 2, respectively. - * - *

Note: it only supports short decimals with max precision - * and scale 18.

- * - * Created at: 05/03/2022 - * Author: hank - */ -DecimalColumnVector::DecimalColumnVector(int precision, int scale, bool encoding) : ColumnVector ( - VectorizedRowBatch::DEFAULT_SIZE, encoding) +DecimalColumnVector::DecimalColumnVector( + uint64_t len, + int precision, + int scale, + bool encoding) + : ColumnVector(len, encoding), + precision_(precision), + scale_(scale) { - DecimalColumnVector (VectorizedRowBatch::DEFAULT_SIZE, precision, scale, encoding); -} + if (precision <= pixels::DecimalConfig::MAX_WIDTH_INT16) + physical_type_ = pixels::PhysicalType::INT16; + else if (precision <= pixels::DecimalConfig::MAX_WIDTH_INT32) + physical_type_ = pixels::PhysicalType::INT32; + else if (precision <= pixels::DecimalConfig::MAX_WIDTH_INT64) + physical_type_ = pixels::PhysicalType::INT64; + else if (precision <= pixels::DecimalConfig::MAX_WIDTH_INT128) + physical_type_ = pixels::PhysicalType::INT128; + else + throw std::runtime_error("Decimal precision too large"); -DecimalColumnVector::DecimalColumnVector(uint64_t len, int precision, int scale, - bool encoding) - : ColumnVector (len, encoding) -{ - // decimal column vector has no encoding so we don't allocate memory to - // this->vector - this->vector = nullptr; - this->precision = precision; - this->scale = scale; - - using duckdb::Decimal; - if (precision <= Decimal::MAX_WIDTH_INT16) - { - physical_type_ = PhysicalType::INT16; - posix_memalign (reinterpret_cast(&vector), 32, - len * sizeof (int16_t)); - memoryUsage += (uint64_t) sizeof (int16_t) * len; - } else if (precision <= Decimal::MAX_WIDTH_INT32) - { - physical_type_ = PhysicalType::INT32; - posix_memalign (reinterpret_cast(&vector), 32, - len * sizeof (int32_t)); - memoryUsage += (uint64_t) sizeof (int32_t) * len; - } else if (precision <= Decimal::MAX_WIDTH_INT64) - { - physical_type_ = PhysicalType::INT64; - posix_memalign (reinterpret_cast(&vector), 32, - len * sizeof (int64_t)); - memoryUsage += (uint64_t) sizeof (uint64_t) * len; - } else if (precision <= Decimal::MAX_WIDTH_INT128) - { - physical_type_ = PhysicalType::INT128; - posix_memalign (reinterpret_cast(&vector), 32, - len * sizeof (uint64_t)); - memoryUsage += (uint64_t) sizeof (uint64_t) * len; - } else - { - throw std::runtime_error ( - "Decimal precision is bigger than the maximum supported width"); - } + size_t bytes = len * ElementSize(); + + if (posix_memalign(&vector, 32, bytes) != 0) + throw std::runtime_error("Decimal allocation failed"); + + memoryUsage += bytes; } void DecimalColumnVector::close() { if (!closed) { - ColumnVector::close (); - if (physical_type_ == PhysicalType::INT16 || - physical_type_ == PhysicalType::INT32) + ColumnVector::close(); + + if (physical_type_ == pixels::PhysicalType::INT16 || + physical_type_ == pixels::PhysicalType::INT32) { free (vector); } - vector = nullptr; - } -} -void DecimalColumnVector::print(int rowCount) -{ -// throw InvalidArgumentException("not support print Decimalcolumnvector."); - for (int i = 0; i < rowCount; i++) - { - std::cout << vector[i] << std::endl; + closed = true; } } + DecimalColumnVector::~DecimalColumnVector() { if (!closed) { - DecimalColumnVector::close (); + DecimalColumnVector::close(); } } -void *DecimalColumnVector::current() +void* DecimalColumnVector::current() { - if (vector == nullptr) - { - return nullptr; - } else - { - return vector + readIndex; - } + if (!vector) return nullptr; + + return static_cast(vector) + + readIndex * ElementSize(); } -int DecimalColumnVector::getPrecision() +void DecimalColumnVector::add(const std::string& input) { - return precision; -} + std::string value = input; + auto dot_pos = value.find('.'); + int fractional_digits = 0; -int DecimalColumnVector::getScale() -{ - return scale; -} + if (dot_pos != std::string::npos) + fractional_digits = value.size() - dot_pos - 1; -void DecimalColumnVector::add(std::string &value) -{ - std::transform (value.begin (), value.end (), value.begin (), ::tolower); - size_t pos = value.find ('.'); - long integerValue = std::stoll (value.substr (0, pos) + value.substr (pos + 1)); - long l = integerValue * std::pow (10, scale - (value.length () - pos - 1)); - add (l); + std::string merged = + dot_pos == std::string::npos ? + value : + value.substr(0, dot_pos) + value.substr(dot_pos + 1); + + __int128 integerValue = 0; + + for (char c : merged) + { + if (c == '-') continue; + integerValue = integerValue * 10 + (c - '0'); + } + + if (value[0] == '-') + integerValue = -integerValue; + + int scale_diff = scale_ - fractional_digits; + + for (int i = 0; i < scale_diff; i++) + integerValue *= 10; + + add((long)integerValue); // 自动走分支 } void DecimalColumnVector::add(long value) { if (writeIndex >= length) + ensureSize(length * 2, true); + + switch (physical_type_) { - ensureSize (writeIndex * 2, true); + case pixels::PhysicalType::INT16: + reinterpret_cast(vector)[writeIndex] = (int16_t)value; + break; + + case pixels::PhysicalType::INT32: + reinterpret_cast(vector)[writeIndex] = (int32_t)value; + break; + + case pixels::PhysicalType::INT64: + reinterpret_cast(vector)[writeIndex] = (int64_t)value; + break; + + case pixels::PhysicalType::INT128: + reinterpret_cast<__int128*>(vector)[writeIndex] = (__int128)value; + break; } - int index = writeIndex++; - vector[index] = value; - isNull[index] = false; + + isNull[writeIndex] = false; + writeIndex++; } -void DecimalColumnVector::ensureSize(uint64_t size, bool preserveData) +size_t DecimalColumnVector::ElementSize() const { - ColumnVector::ensureSize (size, preserveData); - long *oldVector = vector; - posix_memalign (reinterpret_cast(&vector), 32, - size * sizeof (long)); - if (preserveData) + switch (physical_type_) { - std::copy (oldVector, oldVector + length, vector); + case pixels::PhysicalType::INT16: return sizeof(int16_t); + case pixels::PhysicalType::INT32: return sizeof(int32_t); + case pixels::PhysicalType::INT64: return sizeof(int64_t); + case pixels::PhysicalType::INT128: return sizeof(__int128); + default: + throw std::runtime_error("Invalid decimal physical type"); } - delete[] oldVector; - memoryUsage += (long) sizeof (int) * (size - length); - resize (size); -} \ No newline at end of file +} + +void DecimalColumnVector::ensureSize(uint64_t size, bool preserveData) +{ + if (size <= length) + return; + + void* old = vector; + size_t old_bytes = length * ElementSize(); + size_t new_bytes = size * ElementSize(); + + if (posix_memalign(&vector, 32, new_bytes) != 0) + throw std::runtime_error("Decimal resize failed"); + + if (preserveData && old) + std::memcpy(vector, old, old_bytes); + + free(old); + + memoryUsage += (new_bytes - old_bytes); + resize(size); +} diff --git a/cpp/pixels-duckdb/PixelsScanFunction.cpp b/cpp/pixels-duckdb/PixelsScanFunction.cpp index d68f6ac787..73ea339911 100644 --- a/cpp/pixels-duckdb/PixelsScanFunction.cpp +++ b/cpp/pixels-duckdb/PixelsScanFunction.cpp @@ -26,6 +26,7 @@ #include "physical/StorageArrayScheduler.h" #include "profiler/CountProfiler.h" +/// @brief namespace duckdb { @@ -82,11 +83,11 @@ void PixelsScanFunction::PixelsScanImplementation(ClientContext &context, { return; } - + auto &data = (PixelsReadLocalState &) *data_p.local_state; auto &gstate = (PixelsReadGlobalState &) *data_p.global_state; auto &bind_data = (PixelsReadBindData &) *data_p.bind_data; - + //std::cout << "filters ptr: " << gstate.filters << std::endl; do { if (data.currPixelsRecordReader == nullptr || @@ -110,7 +111,7 @@ void PixelsScanFunction::PixelsScanImplementation(ClientContext &context, } if (data.vectorizedRowBatch == nullptr) { - data.vectorizedRowBatch = currPixelsRecordReader->readBatch(false); + data.vectorizedRowBatch = currPixelsRecordReader->readBatch(false);//readbatch里完成filter过滤 } uint64_t currentLoc = data.vectorizedRowBatch->position(); std::shared_ptr resultSchema = data.currPixelsRecordReader->getResultSchema(); @@ -356,8 +357,7 @@ void PixelsScanFunction::TransformDuckdbChunk(PixelsReadLocalState &data, case TypeDescription::INT: { auto intCol = std::static_pointer_cast(col); - Vector vector(LogicalType::INTEGER, - (data_ptr_t) (intCol->current()), col->currentValid(),col->getCapacity()); + Vector vector(LogicalType::INTEGER,(data_ptr_t) (intCol->current()), col->currentValid(),col->getCapacity()); output.data.at(col_id).Reference(vector); // auto result_ptr = FlatVector::GetData(output.data.at(col_id)); // memcpy(result_ptr, intCol->intVector + row_offset, thisOutputChunkRows * sizeof(int)); @@ -370,8 +370,7 @@ void PixelsScanFunction::TransformDuckdbChunk(PixelsReadLocalState &data, case TypeDescription::LONG: { auto longCol = std::static_pointer_cast(col); - Vector vector(LogicalType::BIGINT, - (data_ptr_t) (longCol->current()), col->currentValid(),col->getCapacity()); + Vector vector(LogicalType::BIGINT,(data_ptr_t) (longCol->current()), col->currentValid(),col->getCapacity()); output.data.at(col_id).Reference(vector); // auto result_ptr = FlatVector::GetData(output.data.at(col_id)); // memcpy(result_ptr, longCol->longVector + row_offset, thisOutputChunkRows * sizeof(long)); @@ -387,8 +386,7 @@ void PixelsScanFunction::TransformDuckdbChunk(PixelsReadLocalState &data, case TypeDescription::DECIMAL: { auto decimalCol = std::static_pointer_cast(col); - Vector vector(LogicalType::DECIMAL(colSchema->getPrecision(), colSchema->getScale()), - (data_ptr_t) (decimalCol->current()), col->currentValid(),col->getCapacity()); + Vector vector(LogicalType::DECIMAL(colSchema->getPrecision(), colSchema->getScale()),(data_ptr_t) (decimalCol->current()), col->currentValid(),col->getCapacity()); output.data.at(col_id).Reference(vector); // auto result_ptr = FlatVector::GetData(output.data.at(col_id)); // memcpy(result_ptr, decimalCol->vector + row_offset, thisOutputChunkRows * sizeof(long)); @@ -403,8 +401,7 @@ void PixelsScanFunction::TransformDuckdbChunk(PixelsReadLocalState &data, case TypeDescription::DATE: { auto dateCol = std::static_pointer_cast(col); - Vector vector(LogicalType::DATE, - (data_ptr_t) (dateCol->current()), col->currentValid(),col->getCapacity()); + Vector vector(LogicalType::DATE,(data_ptr_t) (dateCol->current()), col->currentValid(),col->getCapacity()); output.data.at(col_id).Reference(vector); // auto result_ptr = FlatVector::GetData(output.data.at(col_id)); // memcpy(result_ptr, dateCol->dates + row_offset, thisOutputChunkRows * sizeof(int)); @@ -419,8 +416,7 @@ void PixelsScanFunction::TransformDuckdbChunk(PixelsReadLocalState &data, case TypeDescription::TIMESTAMP: { auto tsCol = std::static_pointer_cast(col); - Vector vector(LogicalType::TIMESTAMP, - (data_ptr_t) (tsCol->current()), col->currentValid(),col->getCapacity()); + Vector vector(LogicalType::TIMESTAMP,(data_ptr_t) (tsCol->current()), col->currentValid(),col->getCapacity()); output.data.at(col_id).Reference(vector); break; } @@ -431,16 +427,25 @@ void PixelsScanFunction::TransformDuckdbChunk(PixelsReadLocalState &data, // break; case TypeDescription::VARCHAR: case TypeDescription::CHAR: - case TypeDescription::STRING: - { - auto binaryCol = std::static_pointer_cast(col); - Vector vector(LogicalType::VARCHAR, - (data_ptr_t) (binaryCol->current()), col->currentValid(),col->getCapacity()); - output.data.at(col_id).Reference(vector); -// auto result_ptr = FlatVector::GetData(output.data.at(col_id)); -// memcpy(result_ptr, binaryCol->vector + row_offset, thisOutputChunkRows * sizeof(string_t)); - break; - } + case TypeDescription::STRING://不知道怎么跑起来的,能动就不要改,别的是直接reference了,这里采用的应该是拷贝 + { + auto binaryCol = std::static_pointer_cast(col); + auto &out_vec = output.data.at(col_id); + out_vec.SetVectorType(VectorType::FLAT_VECTOR); + auto result_data = FlatVector::GetData(out_vec); + for (idx_t i = 0; i < thisOutputChunkRows; i++) + { + if (binaryCol->isNullAt(i)) + { + FlatVector::SetNull(out_vec, i, true); + continue; + } + const std::string &str = binaryCol->getValue(i); + result_data[i] = StringVector::AddString(out_vec, str); + } + + break; + } // case TypeDescription::STRUCT: // break; // default: @@ -542,8 +547,10 @@ bool PixelsScanFunction::PixelsParallelStateNext(ClientContext &context, PixelsR ->setPixelsFooterCache(footerCache) ->build(); + // 1. 使用 auto 配合 std::move(或者直接在下一步 move) PixelsReaderOption option = GetPixelsReaderOption(scan_data, parallel_state); - scan_data.nextPixelsRecordReader = scan_data.nextReader->read(option); + // 2. 传给 read 时必须显式 move + scan_data.nextPixelsRecordReader = scan_data.nextReader->read(std::move(option)); auto nextPixelsRecordReader = std::static_pointer_cast( scan_data.nextPixelsRecordReader); @@ -552,8 +559,8 @@ bool PixelsScanFunction::PixelsParallelStateNext(ClientContext &context, PixelsR //double buffer nextPixelsRecordReader->read(); } - - } else + } + else { scan_data.nextReader = nullptr; scan_data.nextPixelsRecordReader = nullptr; @@ -561,21 +568,161 @@ bool PixelsScanFunction::PixelsParallelStateNext(ClientContext &context, PixelsR return true; } -PixelsReaderOption -PixelsScanFunction::GetPixelsReaderOption(PixelsReadLocalState &local_state, PixelsReadGlobalState &global_state) -{ - PixelsReaderOption option; - option.setSkipCorruptRecords(true); - option.setTolerantSchemaEvolution(true); - option.setEnableEncodedColumnVector(true); - option.setFilter(global_state.filters); - option.setEnabledFilterPushDown(enable_filter_pushdown); - // includeCols comes from the caller of PixelsPageSource - option.setIncludeCols(local_state.column_names); - option.setRGRange(0, local_state.nextReader->getRowGroupNum()); - option.setQueryId(1); - int stride = std::stoi(ConfigFactory::Instance().getProperty("pixel.stride")); - option.setBatchSize(stride); - return option; +pixels::ConstantFilter ConvertConstantFilter(const ConstantFilter &filter) { + pixels::ComparisonOperator op; + // 转换比较操作符 + switch (filter.comparison_type) { + case ExpressionType::COMPARE_EQUAL: + op = pixels::ComparisonOperator::EQUAL; + break; + case ExpressionType::COMPARE_LESSTHAN: + op = pixels::ComparisonOperator::LESS_THAN; + break; + case ExpressionType::COMPARE_LESSTHANOREQUALTO: + op = pixels::ComparisonOperator::LESS_THAN_OR_EQUAL; + break; + case ExpressionType::COMPARE_GREATERTHAN: + op = pixels::ComparisonOperator::GREATER_THAN; + break; + case ExpressionType::COMPARE_GREATERTHANOREQUALTO: + op = pixels::ComparisonOperator::GREATER_THAN_OR_EQUAL; + break; + default: + throw std::runtime_error("Unsupported DuckDB comparison type"); + } + pixels::Scalar val; + switch (filter.constant.type().id()) { + case LogicalTypeId::INTEGER:{ + val.set(filter.constant.GetValue()); + break; + } + case LogicalTypeId::BIGINT:{ + val.set(filter.constant.GetValue()); + break; + } + case LogicalTypeId::VARCHAR:{ + val.set(filter.constant.GetValue()); + break; + } + case LogicalTypeId::DATE: { + auto d = filter.constant.GetValue(); + val.set(d.days); // 转成 int + break; + } + case duckdb::LogicalTypeId::DECIMAL: { + double decimal_value = filter.constant.GetValue(); // 整数表示 + int32_t scale = DecimalType::GetScale(filter.constant.type()); + for(int i=0;i()); + break; + } + default: + throw std::runtime_error("Unsupported DuckDB constant type"); + } + + return pixels::ConstantFilter(op, std::move(val)); +} + +static std::unique_ptr +ConvertConjunctionFilter(const ConjunctionFilter &duck_filter) { + std::unique_ptr result; + + if (duck_filter.filter_type == TableFilterType::CONJUNCTION_AND) { + result = std::make_unique(); + } else { + result = std::make_unique(); + } + for (const auto &child : duck_filter.child_filters) { + const TableFilter &child_ref = *child; + switch (child_ref.filter_type) { + case TableFilterType::CONSTANT_COMPARISON: { + const auto &cf = static_cast(child_ref); + result->child_filters.push_back(std::make_unique(ConvertConstantFilter(cf))); + break; + } + case TableFilterType::CONJUNCTION_AND: + case TableFilterType::CONJUNCTION_OR: { + const auto &child_conj =static_cast(child_ref); + result->child_filters.push_back(ConvertConjunctionFilter(child_conj)); // 递归直接接收 + break; + } + case TableFilterType::IS_NULL: + break; + case TableFilterType::IS_NOT_NULL: + break; + case TableFilterType::IN_FILTER: + break; + default: + break; + } + } + return result; } + +pixels::TableFilterSet PixelsScanFunction::ConvertDuckDBFilter(TableFilterSet* filters){ + pixels::TableFilterSet pixel_filters; + for (auto &entry : filters->filters) { + idx_t col_idx = entry.first; + const TableFilter &filter_ref = *entry.second; + switch (filter_ref.filter_type) { + case TableFilterType::CONSTANT_COMPARISON: { + const auto &cf = static_cast(filter_ref); + pixel_filters.filters[col_idx] = std::make_unique(std::move(ConvertConstantFilter(cf))); + break; + } + + case TableFilterType::CONJUNCTION_AND: + case TableFilterType::CONJUNCTION_OR: { + const auto &conj =static_cast(filter_ref); + pixel_filters.filters[col_idx] =ConvertConjunctionFilter(conj); + break; + } + + case TableFilterType::IS_NULL://以下的要么连filter层也没有实现要么就是duckdb不会下推,转换也没有必要 + break; + case TableFilterType::IS_NOT_NULL: + break; + case TableFilterType::STRUCT_EXTRACT: + break; + case TableFilterType::OPTIONAL_FILTER: + break; + case TableFilterType::IN_FILTER: + break; + case TableFilterType::DYNAMIC_FILTER: + break; + default: + break; + } + } + return pixel_filters; + } + +PixelsReaderOption PixelsScanFunction::GetPixelsReaderOption(PixelsReadLocalState &local_state, PixelsReadGlobalState &global_state) { + PixelsReaderOption option; + option.setSkipCorruptRecords(true); + option.setTolerantSchemaEvolution(true); + option.setEnableEncodedColumnVector(true); + option.setEnabledFilterPushDown(enable_filter_pushdown); + //option.setFilter(global_state.filters);完成从duckdb的filter到pixelstablefilter的转换,做一个adapter来转接 + if (global_state.filters && enable_filter_pushdown) { + auto pixels_filter = ConvertDuckDBFilter(global_state.filters); + option.setFilter(std::move(pixels_filter)); + } + option.setIncludeCols(local_state.column_names); + option.setRGRange(0, local_state.nextReader->getRowGroupNum()); + option.setQueryId(1); + + int stride = std::stoi(ConfigFactory::Instance().getProperty("pixel.stride")); + option.setBatchSize(stride); + + return option; +} + +} \ No newline at end of file From 35d6fd2fada9b22874c2fd6c0ec27e434541cc19 Mon Sep 17 00:00:00 2001 From: a3377596 Date: Mon, 16 Mar 2026 18:16:36 +0800 Subject: [PATCH 2/3] adjust file structure --- cpp/{ => etc}/pixels-cpp.properties | 3 +- cpp/pixels-common/lib/utils/ConfigFactory.cpp | 4 +- cpp/pixels-core/include/PixelsBitMask.h | 9 +- cpp/pixels-core/include/PixelsTypes.h | 155 +++++++++++++++ cpp/pixels-core/include/reader/ColumnReader.h | 18 +- .../include/reader/PixelsRecordReader.h | 2 +- .../include/reader/PixelsRecordReaderImpl.h | 2 +- cpp/pixels-core/include/vector/ColumnVector.h | 4 +- cpp/pixels-core/include/writer/ColumnWriter.h | 3 - cpp/pixels-core/lib/PixelsFilter.cpp | 10 +- cpp/pixels-core/lib/PixelsReaderImpl.cpp | 180 +++++++++++++----- .../lib/reader/PixelsReaderOption.cpp | 56 ++---- .../lib/reader/PixelsRecordReaderImpl.cpp | 59 +++--- .../lib/writer/DecimalColumnWriter.cpp | 3 +- cpp/tests/writer/PixelsWriterTest.cpp | 2 +- 15 files changed, 350 insertions(+), 160 deletions(-) rename cpp/{ => etc}/pixels-cpp.properties (98%) create mode 100644 cpp/pixels-core/include/PixelsTypes.h diff --git a/cpp/pixels-cpp.properties b/cpp/etc/pixels-cpp.properties similarity index 98% rename from cpp/pixels-cpp.properties rename to cpp/etc/pixels-cpp.properties index ca0bda4a1d..481ea3cd52 100644 --- a/cpp/pixels-cpp.properties +++ b/cpp/etc/pixels-cpp.properties @@ -21,7 +21,7 @@ pixel.column.size.path=/home/whz/pixels/clickbench-size-e0.csv # the work thread to run parquet. -1 means using all CPU cores parquet.threads=-1 - +pixels.doublebuffer=true # storage device identifier directory depth # this parameter defines the directory depth that determines the storage device. # for example, we have three SSDs, the path is /data/ssd1, /data/ssd2 and /data/ssd3, so the depth is 2 @@ -63,4 +63,3 @@ pixel.bufferpool.hugepage=true # 1 IORING_SETUP_IOPOLL # 2 IORING_SETUP_SQPOLL pixels.io_uring.mode=0 - diff --git a/cpp/pixels-common/lib/utils/ConfigFactory.cpp b/cpp/pixels-common/lib/utils/ConfigFactory.cpp index e0623f09ac..ab5996346d 100644 --- a/cpp/pixels-common/lib/utils/ConfigFactory.cpp +++ b/cpp/pixels-common/lib/utils/ConfigFactory.cpp @@ -53,8 +53,8 @@ ConfigFactory::ConfigFactory() { pixelsHome += "/"; } - std::ifstream infile(pixelsHome + "etc/pixels-cpp.properties"); - std::cout << "pixels properties file is " << pixelsHome + "etc/pixels-cpp.properties" << std::endl; + std::ifstream infile(pixelsHome + "cpp/etc/pixels-cpp.properties"); + std::cout << "pixels properties file is " << pixelsHome + "cpp/etc/pixels-cpp.properties" << std::endl; std::string line; while (std::getline(infile, line)) { diff --git a/cpp/pixels-core/include/PixelsBitMask.h b/cpp/pixels-core/include/PixelsBitMask.h index 044b0a4623..45365e1c9f 100644 --- a/cpp/pixels-core/include/PixelsBitMask.h +++ b/cpp/pixels-core/include/PixelsBitMask.h @@ -26,13 +26,6 @@ #define DUCKDB_PIXELSBITMASK_H #include -#include "duckdb/planner/table_filter.hpp" -#include "duckdb/common/vector_size.hpp" -#include "duckdb/planner/filter/constant_filter.hpp" -#include "duckdb/planner/filter/null_filter.hpp" -#include "duckdb/planner/filter/conjunction_filter.hpp" -#include "duckdb/common/operator/comparison_operators.hpp" - #include "vector/ColumnVector.h" #include "TypeDescription.h" @@ -68,4 +61,4 @@ class PixelsBitMask uint8_t get(long index); }; -#endif //DUCKDB_PIXELSBITMASK_H +#endif //DUCKDB_PIXELSBITMASK_H \ No newline at end of file diff --git a/cpp/pixels-core/include/PixelsTypes.h b/cpp/pixels-core/include/PixelsTypes.h new file mode 100644 index 0000000000..f123fee8df --- /dev/null +++ b/cpp/pixels-core/include/PixelsTypes.h @@ -0,0 +1,155 @@ +#pragma once +//放一些枚举类 + +#ifndef PIXELS_TYPES_H +#define PIXELS_TYPES_H +#include +#include +#include +#include +using idx_t = uint64_t; +namespace pixels { + +enum class PhysicalType : uint8_t {//decimal的物理存储类型 + INT16, + INT32, + INT64, + INT128, +}; + +struct DecimalConfig {//decimal不同物理存储类型的最大宽度 + static constexpr int MAX_WIDTH_INT16 = 4; + static constexpr int MAX_WIDTH_INT32 = 9; + static constexpr int MAX_WIDTH_INT64 = 18; + static constexpr int MAX_WIDTH_INT128 = 38; +}; + +enum class ComparisonOperator : uint8_t { + EQUAL, + NOT_EQUAL, + GREATER_THAN, + GREATER_THAN_OR_EQUAL, + LESS_THAN, + LESS_THAN_OR_EQUAL, + IS_NULL, + IS_NOT_NULL +}; + +enum class TableFilterType : uint8_t { + CONSTANT_COMPARISON = 0, // constant comparison (e.g. =C, >C, >=C, +#include #include "PixelsFilter.h" +#include "PixelsBitMask.h" +#include "vector/ColumnVector.h" class ColumnReader { @@ -41,10 +42,10 @@ class ColumnReader static std::shared_ptr newColumnReader(std::shared_ptr type); /** - * Closes this column reader and releases any resources associated - * with it. If the column reader is already closed then invoking this - * method has no effect. - */ + * Closes this column reader and releases any resources associated + * with it. If the column reader is already closed then invoking this + * method has no effect. + */ virtual void close() = 0; /** @@ -59,6 +60,7 @@ class ColumnReader * @param vectorIndex the index from where we start reading values into the vector * @param vector vector to read values into * @param chunkIndex the metadata of the column chunk to read. + * @param filterMask the bitmask to store filter results */ virtual void read(std::shared_ptr input, pixels::proto::ColumnEncoding &encoding, @@ -75,4 +77,4 @@ class ColumnReader std::shared_ptr type; uint32_t isNullOffset; }; -#endif //PIXELS_COLUMNREADER_H +#endif //PIXELS_COLUMNREADER_H \ No newline at end of file diff --git a/cpp/pixels-core/include/reader/PixelsRecordReader.h b/cpp/pixels-core/include/reader/PixelsRecordReader.h index b6c51cb97a..735d147b8c 100644 --- a/cpp/pixels-core/include/reader/PixelsRecordReader.h +++ b/cpp/pixels-core/include/reader/PixelsRecordReader.h @@ -48,4 +48,4 @@ class PixelsRecordReader virtual void close() = 0; }; -#endif //PIXELS_PIXELSRECORDREADER_H +#endif //PIXELS_PIXELSRECORDREADER_H \ No newline at end of file diff --git a/cpp/pixels-core/include/reader/PixelsRecordReaderImpl.h b/cpp/pixels-core/include/reader/PixelsRecordReaderImpl.h index 8ad01072c3..2003346e2e 100644 --- a/cpp/pixels-core/include/reader/PixelsRecordReaderImpl.h +++ b/cpp/pixels-core/include/reader/PixelsRecordReaderImpl.h @@ -154,4 +154,4 @@ class PixelsRecordReaderImpl : public PixelsRecordReader std::shared_ptr resultRowBatch; int asyncReadRequestNum{0}; }; -#endif //PIXELS_PIXELSRECORDREADERIMPL_H +#endif //PIXELS_PIXELSRECORDREADERIMPL_H \ No newline at end of file diff --git a/cpp/pixels-core/include/vector/ColumnVector.h b/cpp/pixels-core/include/vector/ColumnVector.h index f4f322ada4..550409d5b8 100644 --- a/cpp/pixels-core/include/vector/ColumnVector.h +++ b/cpp/pixels-core/include/vector/ColumnVector.h @@ -38,8 +38,6 @@ * structure that is used in the inner loop of query execution. */ -#include "duckdb.h" - #include "exception/InvalidArgumentException.h" #include #include @@ -56,7 +54,7 @@ * The fields are public by design since this is a performance-critical * structure that is used in the inner loop of query execution. */ - +using idx_t = uint64_t; class ColumnVector { public: diff --git a/cpp/pixels-core/include/writer/ColumnWriter.h b/cpp/pixels-core/include/writer/ColumnWriter.h index 5ef608c382..76234215ba 100644 --- a/cpp/pixels-core/include/writer/ColumnWriter.h +++ b/cpp/pixels-core/include/writer/ColumnWriter.h @@ -29,9 +29,6 @@ #include "physical/natives/ByteBuffer.h" #include "pixels-common/pixels.pb.h" #include -#include -#include "duckdb.h" -#include "duckdb/common/types/vector.hpp" #include "PixelsFilter.h" #include "writer/PixelsWriterOption.h" #include "stats/StatsRecorder.h" diff --git a/cpp/pixels-core/lib/PixelsFilter.cpp b/cpp/pixels-core/lib/PixelsFilter.cpp index 7f11fa8d6c..1e4911bd78 100644 --- a/cpp/pixels-core/lib/PixelsFilter.cpp +++ b/cpp/pixels-core/lib/PixelsFilter.cpp @@ -107,7 +107,7 @@ int PixelsFilter::CompareAvx2(void *data, T constant) template void PixelsFilter::IntFilterOperation(std::shared_ptr vector, const pixels::Scalar &constant, PixelsBitMask &filter_mask){ - int32_t constant_value=std::get(constant); + int32_t constant_value=constant.get_int32(); auto intColumnVector = std::static_pointer_cast(vector); int i = 0; #ifdef ENABLE_SIMD_FILTER @@ -126,7 +126,7 @@ template void PixelsFilter::LongFilterOperation(std::shared_ptr vector,const pixels::Scalar &constant, PixelsBitMask &filter_mask){ //printf("enter long filter operation\n"); // 传入地址 &constant - int64_t constant_value=std::get(constant); + int64_t constant_value=constant.get_int64(); //printf("long filter constant value: %lld\n", constant_value); auto longColumnVector = std::static_pointer_cast(vector); int i = 0; @@ -146,7 +146,7 @@ void PixelsFilter::LongFilterOperation(std::shared_ptr vector,con template void PixelsFilter::DateFilterOperation(std::shared_ptr vector, const pixels::Scalar &constant, PixelsBitMask &filter_mask){ - int32_t constant_value=std::get(constant); + int32_t constant_value=constant.get_int32(); //date的constant传入时已经转换为int32了 auto dateColumnVector = std::static_pointer_cast(vector); int i = 0; #ifdef ENABLE_SIMD_FILTER @@ -164,7 +164,7 @@ void PixelsFilter::DateFilterOperation(std::shared_ptr vector, template void PixelsFilter::DecimalFilterOperation(std::shared_ptr vector, const pixels::Scalar &constant, PixelsBitMask &filter_mask){ - int64_t constant_value=std::get(constant); + int64_t constant_value=constant.get_int64(); //decimal的constant传入时已经转换为int64了 //std::cout<<"in filter the decimal is "<(vector); int i = 0; @@ -184,7 +184,7 @@ void PixelsFilter::DecimalFilterOperation(std::shared_ptr vector, template void PixelsFilter::StringFilterOperation(std::shared_ptr vector,//string目前只能并行比较,原项目也是这样,沿用,后续再想办法 const pixels::Scalar &constant, PixelsBitMask &filter_mask){ - std::string constant_value=std::get(constant); + std::string constant_value=constant.get_string(); auto binaryColumnVector = std::static_pointer_cast(vector); for (int i = 0; i < vector->length; i++) { diff --git a/cpp/pixels-core/lib/PixelsReaderImpl.cpp b/cpp/pixels-core/lib/PixelsReaderImpl.cpp index 66b73525c2..4ec862e019 100644 --- a/cpp/pixels-core/lib/PixelsReaderImpl.cpp +++ b/cpp/pixels-core/lib/PixelsReaderImpl.cpp @@ -22,72 +22,148 @@ * @author liyu * @create 2023-03-06 */ -#ifndef PIXELS_PIXELSREADERIMPL_H -#define PIXELS_PIXELSREADERIMPL_H +#include "PixelsReaderImpl.h" -#include "PixelsReader.h" -#include "reader/PixelsRecordReaderImpl.h" -#include -#include -#include "pixels-common/pixels.pb.h" -#include "PixelsFooterCache.h" -#include "reader/PixelsReaderOption.h" - - -class PixelsReaderBuilder; - -class PixelsReaderImpl : public PixelsReader +PixelsReaderImpl::PixelsReaderImpl(std::shared_ptr fileSchema, + std::shared_ptr reader, + std::shared_ptr fileTail, + std::shared_ptr footerCache) { -public: - std::shared_ptr read(PixelsReaderOption option); - - PixelsReaderImpl(std::shared_ptr fileSchema, - std::shared_ptr reader, - std::shared_ptr fileTail, - std::shared_ptr footerCache); - - ~PixelsReaderImpl(); - - std::shared_ptr getFileSchema() override; - - PixelsVersion::Version getFileVersion() override; + this->fileSchema = fileSchema; + this->physicalReader = reader; + this->footer = fileTail->footer(); + this->postScript = fileTail->postscript(); + this->pixelsFooterCache = footerCache; + this->closed = false; +} - long getNumberOfRows() override; - pixels::proto::CompressionKind getCompressionKind() override; - - long getCompressionBlockSize() override; +/** + * Prepare for the next row batch. This method is independent from readBatch(). + * + * @param batchSize the willing batch size + * @return the real batch size + */ +std::shared_ptr PixelsReaderImpl::read(PixelsReaderOption option) +{ + // TODO: add a function parameter, and the code before creating PixelsRecordReaderImpl + std::shared_ptr recordReader = + std::make_shared( + physicalReader, postScript, + footer, option, pixelsFooterCache); + recordReaders.emplace_back(recordReader); + //std::cout<<"at the end of reader move the size of filters is "< PixelsReaderImpl::getFileSchema() +{ + return fileSchema; +} - long getPixelStride() override; +PixelsVersion::Version PixelsReaderImpl::getFileVersion() +{ + return PixelsVersion::from(postScript.version()); +} - std::string getWriterTimeZone() override; +long PixelsReaderImpl::getNumberOfRows() +{ + return postScript.numberofrows(); +} - int getRowGroupNum() override; +pixels::proto::CompressionKind PixelsReaderImpl::getCompressionKind() +{ + return postScript.compression(); +} - bool isPartitioned() override; +long PixelsReaderImpl::getCompressionBlockSize() +{ + return postScript.compressionblocksize(); +} - ColumnStatisticList getColumnStats() override; +long PixelsReaderImpl::getPixelStride() +{ + return postScript.pixelstride(); +} - pixels::proto::ColumnStatistic getColumnStat(std::string columnName) override; +std::string PixelsReaderImpl::getWriterTimeZone() +{ + return postScript.writertimezone(); +} - RowGroupInfoList getRowGroupInfos() override; +int PixelsReaderImpl::getRowGroupNum() +{ + return footer.rowgroupinfos_size(); +} - pixels::proto::RowGroupInformation getRowGroupInfo(int rowGroupId) override; +bool PixelsReaderImpl::isPartitioned() +{ + return postScript.has_partitioned() && postScript.partitioned(); +} - pixels::proto::RowGroupStatistic getRowGroupStat(int rowGroupId) override; +ColumnStatisticList PixelsReaderImpl::getColumnStats() +{ + return footer.columnstats(); +} - RowGroupStatList getRowGroupStats() override; +pixels::proto::ColumnStatistic PixelsReaderImpl::getColumnStat(std::string columnName) +{ + auto fieldNames = fileSchema->getFieldNames(); + auto fieldIter = std::find(fieldNames.begin(), fieldNames.end(), columnName); + if (fieldIter == fieldNames.end()) + { + throw InvalidArgumentException("the column " + + columnName + " is not the field name!"); + } + int fieldId = fieldIter - fieldNames.begin(); + return footer.columnstats().Get(fieldId); +} + +RowGroupInfoList PixelsReaderImpl::getRowGroupInfos() +{ + return footer.rowgroupinfos(); +} - void close() override; +pixels::proto::RowGroupInformation PixelsReaderImpl::getRowGroupInfo(int rowGroupId) +{ + if (rowGroupId < 0 || rowGroupId >= footer.columnstats_size()) + { + throw InvalidArgumentException("row group id is out of bound."); + } + return footer.rowgroupinfos().Get(rowGroupId); +} + +pixels::proto::RowGroupStatistic PixelsReaderImpl::getRowGroupStat(int rowGroupId) +{ + if (rowGroupId < 0 || rowGroupId >= footer.columnstats_size()) + { + throw InvalidArgumentException("row group id is out of bound."); + } + return footer.rowgroupstats().Get(rowGroupId); +} + +RowGroupStatList PixelsReaderImpl::getRowGroupStats() +{ + return footer.rowgroupstats(); +} -private: - std::vector > recordReaders; - std::shared_ptr fileSchema; - std::shared_ptr physicalReader; - std::shared_ptr pixelsFooterCache; - pixels::proto::PostScript postScript; - pixels::proto::Footer footer; - bool closed; -}; +PixelsReaderImpl::~PixelsReaderImpl() +{ + if (!closed) + { + PixelsReaderImpl::close(); + } +} -#endif //PIXELS_PIXELSREADERIMPL_H \ No newline at end of file +void PixelsReaderImpl::close() +{ + if (!closed) + { + for (auto recordReader: recordReaders) + { + recordReader->close(); + } + recordReaders.clear(); + physicalReader->close(); + } +} \ No newline at end of file diff --git a/cpp/pixels-core/lib/reader/PixelsReaderOption.cpp b/cpp/pixels-core/lib/reader/PixelsReaderOption.cpp index 631bd9527c..16779ed089 100644 --- a/cpp/pixels-core/lib/reader/PixelsReaderOption.cpp +++ b/cpp/pixels-core/lib/reader/PixelsReaderOption.cpp @@ -1,40 +1,16 @@ -/* - * Copyright 2023 PixelsDB. - * - * This file is part of Pixels. - * - * Pixels is free software: you can redistribute it and/or modify - * it under the terms of the Affero GNU General Public License as - * published by the Free Software Foundation, either version 3 of - * the License, or (at your option) any later version. - * - * Pixels is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Affero GNU General Public License for more details. - * - * You should have received a copy of the Affero GNU General Public - * License along with Pixels. If not, see - * . - */ - -/* - * @author liyu - * @create 2023-03-15 - */ #include "reader/PixelsReaderOption.h" PixelsReaderOption::PixelsReaderOption() { - // TODO: pixelsPredicate skipCorruptRecords = false; tolerantSchemaEvolution = true; enableEncodedColumnVector = true; - enableFilterPushDown = false; + enableFilterPushDown = false; queryId = -1L; batchSize = 0; rgStart = 0; rgLen = -1; // -1 means reading to the end of the file + ringIndex = 0; // 显式初始化 } void PixelsReaderOption::setIncludeCols(const std::vector &columnNames) @@ -121,14 +97,18 @@ bool PixelsReaderOption::isEnabledFilterPushDown() return this->enableFilterPushDown; } -void PixelsReaderOption::setFilter(duckdb::TableFilterSet *filter) -{ - this->filter = filter; + +void PixelsReaderOption::setFilter(pixels::TableFilterSet f) { + //std::cout<<"at set filter"<filter; +pixels::TableFilterSet PixelsReaderOption::extractFilter() { + return std::move(filter); +} + +int PixelsReaderOption::getfiltersize(){ + return filter.filters.size(); } void PixelsReaderOption::setBatchSize(int batchSize) @@ -139,14 +119,4 @@ void PixelsReaderOption::setBatchSize(int batchSize) int PixelsReaderOption::getBatchSize() const { return batchSize; -} - - - - - - - - - - +} \ No newline at end of file diff --git a/cpp/pixels-core/lib/reader/PixelsRecordReaderImpl.cpp b/cpp/pixels-core/lib/reader/PixelsRecordReaderImpl.cpp index 4055ac4d31..5ccc4cdbf0 100644 --- a/cpp/pixels-core/lib/reader/PixelsRecordReaderImpl.cpp +++ b/cpp/pixels-core/lib/reader/PixelsRecordReaderImpl.cpp @@ -29,14 +29,14 @@ std::mutex PixelsRecordReaderImpl::mutex_; PixelsRecordReaderImpl::PixelsRecordReaderImpl(std::shared_ptr reader, const pixels::proto::PostScript &pixelsPostScript, const pixels::proto::Footer &pixelsFooter, - const PixelsReaderOption &opt, + PixelsReaderOption &opt, std::shared_ptr pixelsFooterCache) { physicalReader = reader; footer = pixelsFooter; postScript = pixelsPostScript; footerCache = pixelsFooterCache; - option = opt; + option = std::move(opt); // TODO: intialize all kinds of variable queryId = option.getQueryId(); RGStart = option.getRGStart(); @@ -46,13 +46,12 @@ PixelsRecordReaderImpl::PixelsRecordReaderImpl(std::shared_ptr // for test purpose, can we comment it temporarily // assert(batchSize >= STANDARD_VECTOR_SIZE); enabledFilterPushDown = option.isEnabledFilterPushDown(); - if (enabledFilterPushDown) - { - filter = option.getFilter(); - } - else - { - filter = nullptr; + if (enabledFilterPushDown) { + // 从 option 中把含有 unique_ptr 的 map 彻底移动到当前类的成员 filter 中 + this->filter = option.extractFilter(); + } else { + // 如果 TableFilterSet 内部是 map,清除即可 + this->filter.filters.clear(); } filterMask = nullptr; everRead = false; @@ -222,38 +221,41 @@ std::shared_ptr PixelsRecordReaderImpl::readBatch(bool reus } auto columnVectors = resultRowBatch->cols; - if (filterMask != nullptr) - { + if (filterMask != nullptr) { filterMask->set(); } - if(asyncReadRequestNum > 0) - { + if(asyncReadRequestNum > 0) { asyncReadComplete(asyncReadRequestNum); } std::vector filterColumnIndex; - if (filter != nullptr) + //std::cout << "filter size: " << filter.filters.size() << std::endl; + if (!filter.filters.empty())//reader的filter没有了duckdb的tablefilterset,改成自己的tablefilterset { - for (auto &filterCol: filter->filters) + for (auto const& [col_idx, filter_ptr] : filter.filters) { - if (filterMask->isNone()) - { - break; - } - int i = filterCol.first; - int index = curChunkBufferIndex.at(i); - auto &encoding = curEncoding.at(i); - auto &chunkIndex = curChunkIndex.at(i); - readers.at(i)->read(chunkBuffers.at(index), *encoding, curRowInRG, curBatchSize, - postScript.pixelstride(), resultRowBatch->rowCount, - columnVectors.at(i), *chunkIndex, filterMask); + // col_idx 是 idx_t 类型 + // filter_ptr 是 std::unique_ptr& 类型 + //printf("filter column index: %d\n", col_idx); + int index = curChunkBufferIndex.at(col_idx); + auto &encoding = curEncoding.at(col_idx); + auto &chunkIndex = curChunkIndex.at(col_idx); + readers.at(col_idx)->read(chunkBuffers.at(index), *encoding, curRowInRG, curBatchSize, + postScript.pixelstride(), resultRowBatch->rowCount,columnVectors.at(col_idx), *chunkIndex, filterMask); filterColumnIndex.emplace_back(index); - PixelsFilter::ApplyFilter(columnVectors.at(i), *filterCol.second, *filterMask, - resultSchema->getChildren().at(i)); + + // 注意:传给 ApplyFilter 时,需要解引用指针以获取对象引用 + PixelsFilter::ApplyFilter( + columnVectors.at(col_idx), + *filter_ptr, // 这里解引用 unique_ptr 得到 TableFilter& + *filterMask, + resultSchema->getChildren().at(col_idx) + ); } } + // read vectors for (int i = 0; i < resultColumns.size(); i++) { @@ -571,4 +573,3 @@ void PixelsRecordReaderImpl::close() includedColumnTypes.clear(); endOfFile = true; } - diff --git a/cpp/pixels-core/lib/writer/DecimalColumnWriter.cpp b/cpp/pixels-core/lib/writer/DecimalColumnWriter.cpp index 298effa639..0b8effdd6f 100644 --- a/cpp/pixels-core/lib/writer/DecimalColumnWriter.cpp +++ b/cpp/pixels-core/lib/writer/DecimalColumnWriter.cpp @@ -38,7 +38,7 @@ int DecimalColumnWriter::write(std::shared_ptr vector, int size) throw std::invalid_argument("Invalid vector type"); } - long *values = columnVector->vector; + long *values = (long*)columnVector->vector; EncodingUtils encodingUtils; for (int i = 0; i < size; i++) @@ -73,4 +73,3 @@ bool DecimalColumnWriter::decideNullsPadding(std::shared_ptr { return writerOption->isNullsPadding(); } - diff --git a/cpp/tests/writer/PixelsWriterTest.cpp b/cpp/tests/writer/PixelsWriterTest.cpp index 5c60037848..255622f43e 100644 --- a/cpp/tests/writer/PixelsWriterTest.cpp +++ b/cpp/tests/writer/PixelsWriterTest.cpp @@ -153,7 +153,7 @@ TEST_F(PIXELS_WRITER_TEST, DISABLED_WRITE_AND_READ) option.setIncludeCols({"a"}); option.setBatchSize(10); option.setRGRange(0,1); - auto recordReader = pixels_reader->read(option); + auto recordReader = pixels_reader->read(std::move(option)); auto rowBatch = recordReader->readBatch(true); auto vector = std::static_pointer_cast(rowBatch->cols[0]); { From 1e26deaa30751b94900085026acfd6b34a248b5d Mon Sep 17 00:00:00 2001 From: a3377596 Date: Mon, 16 Mar 2026 20:17:11 +0800 Subject: [PATCH 3/3] fix bug --- cpp/pixels-common/CMakeLists.txt | 2 +- cpp/pixels-core/lib/PixelsBitMask.cpp | 12 +++++------- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/cpp/pixels-common/CMakeLists.txt b/cpp/pixels-common/CMakeLists.txt index 3cba67bf36..f62c77d50e 100644 --- a/cpp/pixels-common/CMakeLists.txt +++ b/cpp/pixels-common/CMakeLists.txt @@ -53,4 +53,4 @@ link_directories(${CMAKE_CURRENT_BINARY_DIR}/liburing/src) message(${CMAKE_CURRENT_BINARY_DIR}/liburing/src) target_link_libraries(pixels-common ${Protobuf_LIBRARIES} - ${CMAKE_CURRENT_BINARY_DIR}/liburing/src/liburing.a) + ${CMAKE_CURRENT_BINARY_DIR}/liburing/src/liburing.a) \ No newline at end of file diff --git a/cpp/pixels-core/lib/PixelsBitMask.cpp b/cpp/pixels-core/lib/PixelsBitMask.cpp index 2604acca07..4f3f079c33 100644 --- a/cpp/pixels-core/lib/PixelsBitMask.cpp +++ b/cpp/pixels-core/lib/PixelsBitMask.cpp @@ -89,16 +89,15 @@ void PixelsBitMask::set() void PixelsBitMask::set(long index, uint8_t value) { assert(index < maskLength); + uint8_t &byteMask = mask[index / 8]; uint8_t shiftMask = 1 << (index % 8); + if (value == 0) { - byteMask = byteMask & ~(shiftMask); - } - else - { - byteMask = byteMask | shiftMask; + byteMask &= ~shiftMask; } + // value==1 什么都不做 } @@ -133,7 +132,6 @@ void PixelsBitMask::And(long index, uint8_t value) void PixelsBitMask::setByteAligned(long index, uint8_t value) { - mask[index / 8] = value; + mask[index / 8] &= value; } -