Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 170 additions & 1 deletion crates/integration_tests/tests/read_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use arrow_array::{Int32Array, RecordBatch, StringArray};
use futures::TryStreamExt;
use paimon::catalog::Identifier;
use paimon::{Catalog, FileSystemCatalog, Plan};
use paimon::{Catalog, Error, FileSystemCatalog, Plan};
use std::collections::HashSet;

fn get_test_warehouse() -> String {
Expand Down Expand Up @@ -293,3 +293,172 @@ async fn test_read_partitioned_dv_pk_table() {
]
);
}

/// Open `table_name` from the `default` database of the test warehouse.
async fn get_test_table(table_name: &str) -> paimon::Table {
    let catalog =
        FileSystemCatalog::new(get_test_warehouse()).expect("Failed to create catalog");
    catalog
        .get_table(&Identifier::new("default", table_name))
        .await
        .expect("Failed to get table")
}

#[tokio::test]
async fn test_read_with_column_projection() {
    let table = get_test_table("partitioned_log_table").await;

    let plan = table
        .new_read_builder()
        .new_scan()
        .plan()
        .await
        .expect("Failed to plan scan");

    // Project ["name", "id"]: output must follow this caller order, not schema order.
    let read = table
        .new_read_builder()
        .with_projection(&["name", "id"])
        .new_read()
        .expect("Failed to create projected read");

    let projected: Vec<&str> = read.read_type().iter().map(|f| f.name()).collect();
    assert_eq!(
        projected,
        ["name", "id"],
        "read_type should preserve caller-specified order"
    );

    let batches: Vec<RecordBatch> = read
        .to_arrow(plan.splits())
        .expect("Failed to create arrow stream")
        .try_collect()
        .await
        .expect("Failed to collect batches");
    assert!(!batches.is_empty());

    for batch in &batches {
        // Bind the schema so the borrowed field names outlive the temporary.
        let schema = batch.schema();
        let column_names: Vec<&str> =
            schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            column_names,
            ["name", "id"],
            "RecordBatch schema should preserve caller-specified order"
        );
        assert!(
            batch.column_by_name("dt").is_none(),
            "Non-projected column 'dt' should be absent"
        );
    }

    let expected = vec![
        (1, "alice".to_string()),
        (2, "bob".to_string()),
        (3, "carol".to_string()),
    ];
    assert_eq!(extract_id_name(&batches), expected);
}

#[tokio::test]
async fn test_read_projection_empty() {
    let table = get_test_table("simple_log_table").await;

    // Plan first: the scan is independent of the projection on the read.
    let plan = table
        .new_read_builder()
        .new_scan()
        .plan()
        .await
        .expect("Failed to plan scan");

    let read = table
        .new_read_builder()
        .with_projection(&[])
        .new_read()
        .expect("Empty projection should succeed");

    assert!(
        read.read_type().is_empty(),
        "Empty projection should produce empty read_type"
    );

    let batches: Vec<RecordBatch> = read
        .to_arrow(plan.splits())
        .expect("Failed to create arrow stream")
        .try_collect()
        .await
        .expect("Failed to collect batches");
    assert!(!batches.is_empty());

    for batch in &batches {
        assert_eq!(
            batch.num_columns(),
            0,
            "Empty projection should produce 0-column batches"
        );
    }
}

#[tokio::test]
async fn test_read_projection_unknown_column() {
let table = get_test_table("simple_log_table").await;

let err = table
.new_read_builder()
.with_projection(&["id", "nonexistent_column"])
.new_read()
.expect_err("Unknown columns should fail");

assert!(
matches!(
&err,
Error::ColumnNotExist {
full_name,
column,
} if full_name == "default.simple_log_table" && column == "nonexistent_column"
),
"Expected ColumnNotExist for nonexistent_column, got: {err:?}"
);
}

#[tokio::test]
async fn test_read_projection_all_invalid() {
let table = get_test_table("simple_log_table").await;

let err = table
.new_read_builder()
.with_projection(&["nonexistent_a", "nonexistent_b"])
.new_read()
.expect_err("All-invalid projection should fail");

assert!(
matches!(
&err,
Error::ColumnNotExist {
full_name,
column,
} if full_name == "default.simple_log_table" && column == "nonexistent_a"
),
"Expected ColumnNotExist for nonexistent_a, got: {err:?}"
);
}

#[tokio::test]
async fn test_read_projection_duplicate_column() {
let table = get_test_table("simple_log_table").await;

let err = table
.new_read_builder()
.with_projection(&["id", "id"])
.new_read()
.expect_err("Duplicate projection should fail");

assert!(
matches!(&err, Error::ConfigInvalid { message } if message.contains("Duplicate projection column 'id'")),
"Expected ConfigInvalid for duplicate projection, got: {err:?}"
);
}
51 changes: 35 additions & 16 deletions crates/paimon/src/arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,10 @@ impl ArrowReader {
// Owned list of splits so the stream does not hold references.
let splits: Vec<DataSplit> = data_splits.to_vec();
let read_type = self.read_type;
let projected_column_names = (!read_type.is_empty()).then(|| {
read_type
.iter()
.map(|field| field.name().to_string())
.collect::<Vec<_>>()
});
let projected_column_names: Vec<String> = read_type
.iter()
.map(|field| field.name().to_string())
.collect();
Ok(try_stream! {
for split in splits {
// Create DV factory for this split only (like Java createReader(partition, bucket, files, deletionFiles)).
Expand Down Expand Up @@ -134,15 +132,13 @@ impl ArrowReader {
let mut batch_stream_builder =
ParquetRecordBatchStreamBuilder::new(arrow_file_reader)
.await?;
// Clip to read type columns; file-only columns are dropped.
if let Some(projected_column_names) = projected_column_names.as_ref() {
let parquet_schema = batch_stream_builder.parquet_schema();
let mask = ProjectionMask::columns(
parquet_schema,
projected_column_names.iter().map(String::as_str),
);
batch_stream_builder = batch_stream_builder.with_projection(mask);
}
// ProjectionMask preserves parquet-schema order; read_type order is restored below.
let parquet_schema = batch_stream_builder.parquet_schema().clone();
let mask = ProjectionMask::columns(
&parquet_schema,
projected_column_names.iter().map(String::as_str),
);
batch_stream_builder = batch_stream_builder.with_projection(mask);

if let Some(dv) = dv {
if !dv.is_empty() {
Expand All @@ -157,7 +153,30 @@ impl ArrowReader {
let mut batch_stream = batch_stream_builder.build()?;

while let Some(batch) = batch_stream.next().await {
yield batch?
let batch = batch?;
// Reorder columns from parquet-schema order to read_type order.
// Every projected column must exist in the batch; a missing
// column indicates schema mismatch and must not be silenced.
let reorder_indices: Vec<usize> = projected_column_names
.iter()
.map(|name| {
batch.schema().index_of(name).map_err(|_| {
Error::UnexpectedError {
message: format!(
"Projected column '{}' not found in Parquet batch schema",
name
),
source: None,
}
})
})
.collect::<crate::Result<Vec<_>>>()?;
yield batch.project(&reorder_indices).map_err(|e| {
Error::UnexpectedError {
message: "Failed to reorder projected columns".to_string(),
source: Some(Box::new(e)),
}
})?;
}
}
}
Expand Down
71 changes: 64 additions & 7 deletions crates/paimon/src/table/read_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,40 @@

//! ReadBuilder and TableRead for table read API.
//!
//! Reference: [pypaimon.read.read_builder.ReadBuilder](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/read/read_builder.py)
//! and [pypaimon.table.file_store_table.FileStoreTable.new_read_builder](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/table/file_store_table.py).
//! Reference: [Java ReadBuilder.withProjection](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java)
//! and [TypeUtils.project](https://github.com/apache/paimon/blob/master/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java).

use super::{ArrowRecordBatchStream, Table, TableScan};
use crate::arrow::ArrowReaderBuilder;
use crate::spec::{CoreOptions, DataField};
use crate::Result;
use crate::{DataSplit, Error};
use std::collections::{HashMap, HashSet};

/// Builder for table scan and table read (new_scan, new_read).
///
/// Reference: [pypaimon.read.read_builder.ReadBuilder](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/read/read_builder.py)
/// Rust keeps a names-based projection API for ergonomics, while aligning the
/// resulting read semantics with Java Paimon's order-preserving projection.
#[derive(Debug, Clone)]
pub struct ReadBuilder<'a> {
table: &'a Table,
projected_fields: Option<Vec<String>>,
}

impl<'a> ReadBuilder<'a> {
pub(crate) fn new(table: &'a Table) -> Self {
Self { table }
Self {
table,
projected_fields: None,
}
}

/// Select output columns by name; the read schema follows the given order.
/// An empty slice is a valid zero-column projection. Unknown or duplicate
/// names are not rejected here — they surface as errors from `new_read()`.
pub fn with_projection(mut self, columns: &[&str]) -> Self {
    let owned: Vec<String> = columns.iter().map(|c| c.to_string()).collect();
    self.projected_fields = Some(owned);
    self
}

/// Create a table scan. Call [TableScan::plan] to get splits.
Expand All @@ -46,12 +60,55 @@ impl<'a> ReadBuilder<'a> {

/// Create a table read for consuming splits (e.g. from a scan plan).
pub fn new_read(&self) -> Result<TableRead<'a>> {
let read_type = self.table.schema.fields();
let read_type = match &self.projected_fields {
None => self.table.schema.fields().to_vec(),
Some(projected) => self.resolve_projected_fields(projected)?,
};

Ok(TableRead {
table: self.table,
read_type,
})
}

fn resolve_projected_fields(&self, projected_fields: &[String]) -> Result<Vec<DataField>> {
if projected_fields.is_empty() {
return Ok(Vec::new());
}

let full_name = self.table.identifier().full_name();
let field_map: HashMap<&str, &DataField> = self
.table
.schema
.fields()
.iter()
.map(|field| (field.name(), field))
.collect();

let mut seen = HashSet::with_capacity(projected_fields.len());
let mut resolved = Vec::with_capacity(projected_fields.len());

for name in projected_fields {
if !seen.insert(name.as_str()) {
return Err(Error::ConfigInvalid {
message: format!(
"Duplicate projection column '{}' for table {}",
name, full_name
),
});
}

let field = field_map
.get(name.as_str())
.ok_or_else(|| Error::ColumnNotExist {
full_name: full_name.clone(),
column: name.clone(),
})?;
resolved.push((*field).clone());
}

Ok(resolved)
}
}

/// Table read: reads data from splits (e.g. produced by [TableScan::plan]).
Expand All @@ -60,13 +117,13 @@ impl<'a> ReadBuilder<'a> {
#[derive(Debug, Clone)]
pub struct TableRead<'a> {
table: &'a Table,
read_type: &'a [DataField],
read_type: Vec<DataField>,
}

impl<'a> TableRead<'a> {
/// Schema (fields) that this read will produce.
pub fn read_type(&self) -> &[DataField] {
self.read_type
&self.read_type
}

/// Table for this read.
Expand Down
Loading