diff --git a/crates/integration_tests/tests/read_tables.rs b/crates/integration_tests/tests/read_tables.rs index 69a05f7..f7f7b57 100644 --- a/crates/integration_tests/tests/read_tables.rs +++ b/crates/integration_tests/tests/read_tables.rs @@ -9,28 +9,27 @@ // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on -// an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. //! Integration tests for reading Paimon tables provisioned by Spark. -use arrow_array::{Int32Array, StringArray}; +use arrow_array::{Int32Array, RecordBatch, StringArray}; use futures::TryStreamExt; use paimon::catalog::Identifier; -use paimon::{Catalog, FileSystemCatalog}; +use paimon::{Catalog, FileSystemCatalog, Plan}; +use std::collections::HashSet; -/// Get the test warehouse path from environment variable or use default. 
fn get_test_warehouse() -> String { std::env::var("PAIMON_TEST_WAREHOUSE").unwrap_or_else(|_| "/tmp/paimon-warehouse".to_string()) } -async fn read_rows(table_name: &str) -> Vec<(i32, String)> { +async fn scan_and_read(table_name: &str) -> (Plan, Vec<RecordBatch>) { let warehouse = get_test_warehouse(); let catalog = FileSystemCatalog::new(warehouse).expect("Failed to create catalog"); - let identifier = Identifier::new("default", table_name); let table = catalog .get_table(&identifier) @@ -38,14 +37,13 @@ async fn read_rows(table_name: &str) -> Vec<(i32, String)> { .expect("Failed to get table"); let read_builder = table.new_read_builder(); - let read = read_builder.new_read().expect("Failed to create read"); let scan = read_builder.new_scan(); let plan = scan.plan().await.expect("Failed to plan scan"); + let read = read_builder.new_read().expect("Failed to create read"); let stream = read .to_arrow(plan.splits()) .expect("Failed to create arrow stream"); - let batches: Vec<_> = stream .try_collect() .await @@ -55,47 +53,57 @@ async fn read_rows(table_name: &str) -> Vec<(i32, String)> { !batches.is_empty(), "Expected at least one batch from table {table_name}" ); + (plan, batches) +} - let mut actual_rows: Vec<(i32, String)> = Vec::new(); - - for batch in &batches { - let id_array = batch +fn extract_id_name(batches: &[RecordBatch]) -> Vec<(i32, String)> { + let mut rows = Vec::new(); + for batch in batches { + let id = batch .column_by_name("id") .and_then(|c| c.as_any().downcast_ref::<Int32Array>()) - .expect("Expected Int32Array for id column"); - let name_array = batch + .expect("Expected Int32Array for id"); + let name = batch .column_by_name("name") .and_then(|c| c.as_any().downcast_ref::<StringArray>()) - .expect("Expected StringArray for name column"); - + .expect("Expected StringArray for name"); for i in 0..batch.num_rows() { - actual_rows.push((id_array.value(i), name_array.value(i).to_string())); + rows.push((id.value(i), name.value(i).to_string())); } } - - actual_rows.sort_by_key(|(id, 
_)| *id); - actual_rows + rows.sort_by_key(|(id, _)| *id); + rows } #[tokio::test] async fn test_read_log_table() { - let actual_rows = read_rows("simple_log_table").await; - let expected_rows = vec![ + let (plan, batches) = scan_and_read("simple_log_table").await; + + // Non-partitioned table: partition should be a valid arity=0 BinaryRow + // deserialized from manifest bytes, not a stub without backing data. + for split in plan.splits() { + let partition = split.partition(); + assert_eq!(partition.arity(), 0); + assert!( + !partition.is_empty(), + "Non-partitioned split should have backing data from manifest deserialization" + ); + } + + let actual = extract_id_name(&batches); + let expected = vec![ (1, "alice".to_string()), (2, "bob".to_string()), (3, "carol".to_string()), ]; - - assert_eq!( - actual_rows, expected_rows, - "Rows should match expected values" - ); + assert_eq!(actual, expected, "Rows should match expected values"); } #[tokio::test] async fn test_read_dv_primary_key_table() { - let actual_rows = read_rows("simple_dv_pk_table").await; - let expected_rows = vec![ + let (_, batches) = scan_and_read("simple_dv_pk_table").await; + let actual = extract_id_name(&batches); + let expected = vec![ (1, "alice-v2".to_string()), (2, "bob-v2".to_string()), (3, "carol-v2".to_string()), @@ -103,9 +111,185 @@ async fn test_read_dv_primary_key_table() { (5, "eve-v2".to_string()), (6, "frank-v1".to_string()), ]; - assert_eq!( - actual_rows, expected_rows, + actual, expected, "DV-enabled PK table should only expose the latest row per key" ); } + +#[tokio::test] +async fn test_read_partitioned_log_table() { + let (plan, batches) = scan_and_read("partitioned_log_table").await; + + let mut seen_partitions: HashSet<String> = HashSet::new(); + for split in plan.splits() { + let partition = split.partition(); + assert_eq!(partition.arity(), 1); + assert!(!partition.is_empty()); + let dt = partition.get_string(0).expect("Failed to decode dt"); + let expected_suffix = 
format!("dt={dt}/bucket-{}", split.bucket()); + assert!( + split.bucket_path().ends_with(&expected_suffix), + "bucket_path should end with '{expected_suffix}', got: {}", + split.bucket_path() + ); + seen_partitions.insert(dt.to_string()); + } + assert_eq!( + seen_partitions, + HashSet::from(["2024-01-01".into(), "2024-01-02".into()]) + ); + + let mut rows: Vec<(i32, String, String)> = Vec::new(); + for batch in &batches { + let id = batch + .column_by_name("id") + .and_then(|c| c.as_any().downcast_ref::<Int32Array>()) + .expect("id"); + let name = batch + .column_by_name("name") + .and_then(|c| c.as_any().downcast_ref::<StringArray>()) + .expect("name"); + let dt = batch + .column_by_name("dt") + .and_then(|c| c.as_any().downcast_ref::<StringArray>()) + .expect("dt"); + for i in 0..batch.num_rows() { + rows.push((id.value(i), name.value(i).into(), dt.value(i).into())); + } + } + rows.sort_by_key(|(id, _, _)| *id); + + assert_eq!( + rows, + vec![ + (1, "alice".into(), "2024-01-01".into()), + (2, "bob".into(), "2024-01-01".into()), + (3, "carol".into(), "2024-01-02".into()), + ] + ); +} + +#[tokio::test] +async fn test_read_multi_partitioned_log_table() { + let (plan, batches) = scan_and_read("multi_partitioned_log_table").await; + + let mut seen_partitions: HashSet<(String, i32)> = HashSet::new(); + for split in plan.splits() { + let partition = split.partition(); + assert_eq!(partition.arity(), 2); + assert!(!partition.is_empty()); + let dt = partition.get_string(0).expect("Failed to decode dt"); + let hr = partition.get_int(1).expect("Failed to decode hr"); + let expected_suffix = format!("dt={dt}/hr={hr}/bucket-{}", split.bucket()); + assert!( + split.bucket_path().ends_with(&expected_suffix), + "bucket_path should end with '{expected_suffix}', got: {}", + split.bucket_path() + ); + seen_partitions.insert((dt.to_string(), hr)); + } + assert_eq!( + seen_partitions, + HashSet::from([ + ("2024-01-01".into(), 10), + ("2024-01-01".into(), 20), + ("2024-01-02".into(), 10), + ]) + ); + + let mut rows: 
Vec<(i32, String, String, i32)> = Vec::new(); + for batch in &batches { + let id = batch + .column_by_name("id") + .and_then(|c| c.as_any().downcast_ref::<Int32Array>()) + .expect("id"); + let name = batch + .column_by_name("name") + .and_then(|c| c.as_any().downcast_ref::<StringArray>()) + .expect("name"); + let dt = batch + .column_by_name("dt") + .and_then(|c| c.as_any().downcast_ref::<StringArray>()) + .expect("dt"); + let hr = batch + .column_by_name("hr") + .and_then(|c| c.as_any().downcast_ref::<Int32Array>()) + .expect("hr"); + for i in 0..batch.num_rows() { + rows.push(( + id.value(i), + name.value(i).into(), + dt.value(i).into(), + hr.value(i), + )); + } + } + rows.sort_by_key(|(id, _, _, _)| *id); + + assert_eq!( + rows, + vec![ + (1, "alice".into(), "2024-01-01".into(), 10), + (2, "bob".into(), "2024-01-01".into(), 10), + (3, "carol".into(), "2024-01-01".into(), 20), + (4, "dave".into(), "2024-01-02".into(), 10), + ] + ); +} + +#[tokio::test] +async fn test_read_partitioned_dv_pk_table() { + let (plan, batches) = scan_and_read("partitioned_dv_pk_table").await; + + // Verify partition metadata on each split. 
+ let mut seen_partitions: HashSet<String> = HashSet::new(); + for split in plan.splits() { + let partition = split.partition(); + assert_eq!(partition.arity(), 1); + assert!(!partition.is_empty()); + let dt = partition.get_string(0).expect("Failed to decode dt"); + let expected_suffix = format!("dt={dt}/bucket-{}", split.bucket()); + assert!( + split.bucket_path().ends_with(&expected_suffix), + "bucket_path should end with '{expected_suffix}', got: {}", + split.bucket_path() + ); + seen_partitions.insert(dt.to_string()); + } + assert_eq!( + seen_partitions, + HashSet::from(["2024-01-01".into(), "2024-01-02".into()]) + ); + + let mut rows: Vec<(i32, String, String)> = Vec::new(); + for batch in &batches { + let id = batch + .column_by_name("id") + .and_then(|c| c.as_any().downcast_ref::<Int32Array>()) + .expect("id"); + let name = batch + .column_by_name("name") + .and_then(|c| c.as_any().downcast_ref::<StringArray>()) + .expect("name"); + let dt = batch + .column_by_name("dt") + .and_then(|c| c.as_any().downcast_ref::<StringArray>()) + .expect("dt"); + for i in 0..batch.num_rows() { + rows.push((id.value(i), name.value(i).into(), dt.value(i).into())); + } + } + rows.sort_by(|a, b| a.0.cmp(&b.0).then(a.2.cmp(&b.2))); + + assert_eq!( + rows, + vec![ + (1, "alice-v2".into(), "2024-01-01".into()), + (1, "alice-v1".into(), "2024-01-02".into()), + (2, "bob-v2".into(), "2024-01-01".into()), + (3, "carol-v2".into(), "2024-01-02".into()), + (4, "dave-v2".into(), "2024-01-02".into()), + ] + ); +} diff --git a/crates/paimon/src/spec/core_options.rs b/crates/paimon/src/spec/core_options.rs index 484ab57..f6164d0 100644 --- a/crates/paimon/src/spec/core_options.rs +++ b/crates/paimon/src/spec/core_options.rs @@ -20,8 +20,11 @@ use std::collections::HashMap; const DELETION_VECTORS_ENABLED_OPTION: &str = "deletion-vectors.enabled"; const SOURCE_SPLIT_TARGET_SIZE_OPTION: &str = "source.split.target-size"; const SOURCE_SPLIT_OPEN_FILE_COST_OPTION: &str = "source.split.open-file-cost"; +const PARTITION_DEFAULT_NAME_OPTION: 
&str = "partition.default-name"; +const PARTITION_LEGACY_NAME_OPTION: &str = "partition.legacy-name"; const DEFAULT_SOURCE_SPLIT_TARGET_SIZE: i64 = 128 * 1024 * 1024; const DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST: i64 = 4 * 1024 * 1024; +const DEFAULT_PARTITION_DEFAULT_NAME: &str = "__DEFAULT_PARTITION__"; /// Typed accessors for common table options. /// @@ -39,7 +42,7 @@ impl<'a> CoreOptions<'a> { pub fn deletion_vectors_enabled(&self) -> bool { self.options .get(DELETION_VECTORS_ENABLED_OPTION) - .map(|value| matches!(value.to_ascii_lowercase().as_str(), "true")) + .map(|value| value.eq_ignore_ascii_case("true")) .unwrap_or(false) } @@ -56,6 +59,27 @@ impl<'a> CoreOptions<'a> { .and_then(|value| parse_memory_size(value)) .unwrap_or(DEFAULT_SOURCE_SPLIT_OPEN_FILE_COST) } + + /// The default partition name for null/blank partition values. + /// + /// Corresponds to Java `CoreOptions.PARTITION_DEFAULT_NAME`. + pub fn partition_default_name(&self) -> &str { + self.options + .get(PARTITION_DEFAULT_NAME_OPTION) + .map(String::as_str) + .unwrap_or(DEFAULT_PARTITION_DEFAULT_NAME) + } + + /// Whether to use legacy partition name formatting (toString semantics). + /// + /// Corresponds to Java `CoreOptions.PARTITION_GENERATE_LEGACY_NAME`. + /// Default: `true` to match Java Paimon. + pub fn legacy_partition_name(&self) -> bool { + self.options + .get(PARTITION_LEGACY_NAME_OPTION) + .map(|v| v.eq_ignore_ascii_case("true")) + .unwrap_or(true) + } } /// Parse a memory size string to bytes using binary (1024-based) semantics. 
@@ -131,4 +155,29 @@ mod tests { assert_eq!(parse_memory_size(""), None); assert_eq!(parse_memory_size("abc"), None); } + + #[test] + fn test_partition_options_defaults() { + let options = HashMap::new(); + let core = CoreOptions::new(&options); + assert_eq!(core.partition_default_name(), "__DEFAULT_PARTITION__"); + assert!(core.legacy_partition_name()); + } + + #[test] + fn test_partition_options_custom() { + let options = HashMap::from([ + ( + PARTITION_DEFAULT_NAME_OPTION.to_string(), + "NULL_PART".to_string(), + ), + ( + PARTITION_LEGACY_NAME_OPTION.to_string(), + "false".to_string(), + ), + ]); + let core = CoreOptions::new(&options); + assert_eq!(core.partition_default_name(), "NULL_PART"); + assert!(!core.legacy_partition_name()); + } } diff --git a/crates/paimon/src/spec/data_file.rs b/crates/paimon/src/spec/data_file.rs index 9934e63..8733b1d 100644 --- a/crates/paimon/src/spec/data_file.rs +++ b/crates/paimon/src/spec/data_file.rs @@ -83,10 +83,11 @@ impl BinaryRow { } } - /// Create a BinaryRow from raw binary bytes (e.g. from `ManifestEntry._PARTITION`). + /// Create a BinaryRow from raw binary bytes. /// /// The `data` must contain the full binary row content: /// header + null bit set + fixed-length part + variable-length part. + /// Does NOT include the 4-byte arity prefix (use `from_serialized_bytes` for that). pub fn from_bytes(arity: i32, data: Vec<u8>) -> Self { let null_bits_size_in_bytes = Self::cal_bit_set_width_in_bytes(arity); Self { @@ -96,6 +97,28 @@ } } + /// Create a BinaryRow from Paimon's serialized format (e.g. `ManifestEntry._PARTITION`). + /// + /// Java `SerializationUtils.serializeBinaryRow()` prepends a 4-byte big-endian arity + /// before the raw BinaryRow content. This method reads that prefix and strips it, + /// matching Java `SerializationUtils.deserializeBinaryRow()`. 
+ /// + /// Variable-length field offsets inside the BinaryRow content are relative to the + /// BinaryRow base, so they remain valid after stripping the 4-byte prefix. + pub fn from_serialized_bytes(data: &[u8]) -> crate::Result<Self> { + if data.len() < 4 { + return Err(crate::Error::UnexpectedError { + message: format!( + "BinaryRow: serialized data too short for arity prefix: {} bytes", + data.len() + ), + source: None, + }); + } + let arity = i32::from_be_bytes([data[0], data[1], data[2], data[3]]); + Ok(Self::from_bytes(arity, data[4..].to_vec())) + } + /// Number of fields in this row. pub fn arity(&self) -> i32 { self.arity @@ -591,6 +614,30 @@ assert_eq!(BinaryRow::cal_bit_set_width_in_bytes(57), 16); } + #[test] + fn test_from_serialized_bytes() { + // Build a raw BinaryRow with arity=1, int value 42, then prepend 4-byte BE arity prefix + // to simulate Java SerializationUtils.serializeBinaryRow() format. + let mut builder = BinaryRowBuilder::new(1); + builder.write_int(0, 42); + let raw_row = builder.build(); + let raw_data = raw_row.data(); + + let mut serialized = Vec::with_capacity(4 + raw_data.len()); + serialized.extend_from_slice(&1_i32.to_be_bytes()); + serialized.extend_from_slice(raw_data); + + let row = BinaryRow::from_serialized_bytes(&serialized).unwrap(); + assert_eq!(row.arity(), 1); + assert!(!row.is_null_at(0)); + assert_eq!(row.get_int(0).unwrap(), 42); + } + + #[test] + fn test_from_serialized_bytes_too_short() { + assert!(BinaryRow::from_serialized_bytes(&[0, 0]).is_err()); + } + #[test] fn test_get_int() { let mut builder = BinaryRowBuilder::new(2); diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs index f1ef422..83af8be 100644 --- a/crates/paimon/src/spec/mod.rs +++ b/crates/paimon/src/spec/mod.rs @@ -54,3 +54,4 @@ pub(crate) mod stats; mod types; pub use types::*; mod partition_utils; +pub(crate) use partition_utils::PartitionComputer; diff --git a/crates/paimon/src/spec/partition_utils.rs 
b/crates/paimon/src/spec/partition_utils.rs index 409e79d..332a810 100644 --- a/crates/paimon/src/spec/partition_utils.rs +++ b/crates/paimon/src/spec/partition_utils.rs @@ -29,10 +29,6 @@ use crate::spec::BinaryRow; use crate::spec::DataField; use chrono::{Local, NaiveDate, NaiveDateTime, TimeZone, Timelike}; -// TODO: remove after #131 consumes the pub(crate) API. -#[allow(dead_code)] -pub const DEFAULT_PARTITION_NAME: &str = "__DEFAULT_PARTITION__"; - const MILLIS_PER_DAY: i64 = 86_400_000; /// Computes partition string values and directory paths from a partition `BinaryRow`. @@ -42,8 +38,6 @@ const MILLIS_PER_DAY: i64 = 86_400_000; /// (escaped directory path). /// /// Reference: `org.apache.paimon.utils.InternalRowPartitionComputer` in Java Paimon. -// TODO: remove after #131 consumes the pub(crate) API. -#[allow(dead_code)] pub(crate) struct PartitionComputer { partition_keys: Vec<String>, partition_fields: Vec<DataField>, @@ -51,7 +45,6 @@ legacy_partition_name: bool, } -#[allow(dead_code)] impl PartitionComputer { /// Create a new `PartitionComputer`. /// @@ -155,28 +148,6 @@ } } -/// Backward-compatible free function that delegates to `PartitionComputer`. -// TODO: remove after #131 consumes the pub(crate) API. -#[allow(dead_code)] -pub(crate) fn generate_partition_path( - partition_keys: &[String], - schema_fields: &[DataField], - row: &BinaryRow, - default_partition_name: &str, - legacy_partition_name: bool, -) -> crate::Result<String> { - if partition_keys.is_empty() { - return Ok(String::new()); - } - let computer = PartitionComputer::new( - partition_keys, - schema_fields, - default_partition_name, - legacy_partition_name, - )?; - computer.generate_partition_path(row) -} - /// Resolve the `DataField` for each partition key from the schema fields, preserving order. 
fn resolve_partition_fields<'a>( partition_keys: &[String], @@ -622,6 +593,8 @@ mod tests { DataField::new(0, name.to_string(), data_type) } + const TEST_DEFAULT_PARTITION_NAME: &str = "__DEFAULT_PARTITION__"; + /// Helper: assert single-column partition path for a given type and row writer. fn assert_single_partition( name: &str, @@ -634,11 +607,12 @@ mod tests { { let fields = vec![make_field(name, data_type)]; let keys = vec![name.to_string()]; + let computer = + PartitionComputer::new(&keys, &fields, TEST_DEFAULT_PARTITION_NAME, legacy).unwrap(); let mut builder = TestRowBuilder::new(1); write_fn(&mut builder); let row = builder.build(); - let result = - generate_partition_path(&keys, &fields, &row, DEFAULT_PARTITION_NAME, legacy).unwrap(); + let result = computer.generate_partition_path(&row).unwrap(); assert_eq!(result, expected); } @@ -649,12 +623,12 @@ mod tests { { let fields = vec![make_field(name, data_type)]; let keys = vec![name.to_string()]; + let computer = + PartitionComputer::new(&keys, &fields, TEST_DEFAULT_PARTITION_NAME, legacy).unwrap(); let mut builder = TestRowBuilder::new(1); write_fn(&mut builder); let row = builder.build(); - assert!( - generate_partition_path(&keys, &fields, &row, DEFAULT_PARTITION_NAME, legacy).is_err() - ); + assert!(computer.generate_partition_path(&row).is_err()); } // ======================== Escape tests ======================== @@ -689,7 +663,7 @@ mod tests { ]; let keys = vec!["dt".to_string(), "hr".to_string()]; let computer = - PartitionComputer::new(&keys, &fields, DEFAULT_PARTITION_NAME, true).unwrap(); + PartitionComputer::new(&keys, &fields, TEST_DEFAULT_PARTITION_NAME, true).unwrap(); let mut builder = TestRowBuilder::new(2); builder.write_string(0, "2024-01-01"); @@ -710,7 +684,7 @@ mod tests { ]; let keys = vec!["dt".to_string(), "hr".to_string()]; let computer = - PartitionComputer::new(&keys, &fields, DEFAULT_PARTITION_NAME, true).unwrap(); + PartitionComputer::new(&keys, &fields, 
TEST_DEFAULT_PARTITION_NAME, true).unwrap(); let mut builder = TestRowBuilder::new(2); builder.write_string(0, "2024-01-01"); @@ -726,7 +700,8 @@ mod tests { #[test] fn test_empty_partition_keys() { let row = BinaryRow::new(0); - let result = generate_partition_path(&[], &[], &row, DEFAULT_PARTITION_NAME, true).unwrap(); + let computer = PartitionComputer::new(&[], &[], TEST_DEFAULT_PARTITION_NAME, true).unwrap(); + let result = computer.generate_partition_path(&row).unwrap(); assert_eq!(result, ""); } @@ -748,14 +723,15 @@ mod tests { make_field("hr", DataType::Int(IntType::new())), ]; let keys = vec!["dt".to_string(), "hr".to_string()]; + let computer = + PartitionComputer::new(&keys, &fields, TEST_DEFAULT_PARTITION_NAME, true).unwrap(); let mut builder = TestRowBuilder::new(2); builder.write_string(0, "2024-01-01"); builder.write_int(1, 12); let row = builder.build(); - let result = - generate_partition_path(&keys, &fields, &row, DEFAULT_PARTITION_NAME, true).unwrap(); + let result = computer.generate_partition_path(&row).unwrap(); assert_eq!(result, "dt=2024-01-01/hr=12/"); } @@ -763,13 +739,14 @@ mod tests { fn test_null_partition_value() { let fields = vec![make_field("dt", DataType::VarChar(VarCharType::default()))]; let keys = vec!["dt".to_string()]; + let computer = + PartitionComputer::new(&keys, &fields, TEST_DEFAULT_PARTITION_NAME, true).unwrap(); let mut builder = TestRowBuilder::new(1); builder.set_null_at(0); let row = builder.build(); - let result = - generate_partition_path(&keys, &fields, &row, DEFAULT_PARTITION_NAME, true).unwrap(); + let result = computer.generate_partition_path(&row).unwrap(); assert_eq!(result, "dt=__DEFAULT_PARTITION__/"); } @@ -952,8 +929,12 @@ mod tests { builder.write_int(0, 1); let row = builder.build(); - let result = generate_partition_path(&keys, &fields, &row, DEFAULT_PARTITION_NAME, true); - assert!(result.is_err()); + let result = PartitionComputer::new(&keys, &fields, TEST_DEFAULT_PARTITION_NAME, true); + // 
Construction succeeds (field resolution fails for hr), or path generation fails due to arity mismatch + match result { + Err(_) => {} // field resolution failed — expected + Ok(computer) => assert!(computer.generate_partition_path(&row).is_err()), + } } #[test] @@ -961,11 +942,7 @@ mod tests { let fields = vec![make_field("other", DataType::Int(IntType::new()))]; let keys = vec!["dt".to_string()]; - let mut builder = TestRowBuilder::new(1); - builder.write_int(0, 1); - let row = builder.build(); - - let result = generate_partition_path(&keys, &fields, &row, DEFAULT_PARTITION_NAME, true); + let result = PartitionComputer::new(&keys, &fields, TEST_DEFAULT_PARTITION_NAME, true); assert!(result.is_err()); } @@ -1005,9 +982,11 @@ mod tests { fn test_empty_row_with_partition_keys() { let fields = vec![make_field("dt", DataType::Int(IntType::new()))]; let keys = vec!["dt".to_string()]; + let computer = + PartitionComputer::new(&keys, &fields, TEST_DEFAULT_PARTITION_NAME, true).unwrap(); let row = BinaryRow::new(1); // empty backing data - let result = generate_partition_path(&keys, &fields, &row, DEFAULT_PARTITION_NAME, true); + let result = computer.generate_partition_path(&row); assert!(result.is_err()); } @@ -1059,11 +1038,13 @@ mod tests { fn test_truncated_row_returns_error() { let fields = vec![make_field("dt", DataType::Int(IntType::new()))]; let keys = vec!["dt".to_string()]; + let computer = + PartitionComputer::new(&keys, &fields, TEST_DEFAULT_PARTITION_NAME, true).unwrap(); // Create a BinaryRow with arity=1 but truncated backing data (too short). 
let row = BinaryRow::from_bytes(1, vec![0u8; 4]); // needs >= 16 bytes - let result = generate_partition_path(&keys, &fields, &row, DEFAULT_PARTITION_NAME, true); + let result = computer.generate_partition_path(&row); assert!(result.is_err()); let msg = result.unwrap_err().to_string(); assert!(msg.contains("too short"), "Expected 'too short' in: {msg}"); diff --git a/crates/paimon/src/table/read_builder.rs b/crates/paimon/src/table/read_builder.rs index dded84c..1f640cd 100644 --- a/crates/paimon/src/table/read_builder.rs +++ b/crates/paimon/src/table/read_builder.rs @@ -90,14 +90,6 @@ impl<'a> TableRead<'a> { }); } - if !self.table.schema.partition_keys().is_empty() { - return Err(Error::Unsupported { - message: format!( - "Reading partitioned tables is not yet supported. Partition keys: {:?}", - self.table.schema.partition_keys() - ), - }); - } let reader = ArrowReaderBuilder::new(self.table.file_io.clone()).build(self.read_type().to_vec()); reader.read(data_splits) diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs index 5d5bc4a..1c3454a 100644 --- a/crates/paimon/src/table/table_scan.rs +++ b/crates/paimon/src/table/table_scan.rs @@ -22,7 +22,9 @@ use super::Table; use crate::io::FileIO; -use crate::spec::{BinaryRow, CoreOptions, FileKind, IndexManifest, ManifestEntry, Snapshot}; +use crate::spec::{ + BinaryRow, CoreOptions, FileKind, IndexManifest, ManifestEntry, PartitionComputer, Snapshot, +}; use crate::table::bin_pack::split_for_batch; use crate::table::source::{DataSplitBuilder, DeletionFile, PartitionBucket, Plan}; use crate::table::SnapshotManager; @@ -172,29 +174,16 @@ impl<'a> TableScan<'a> { Some(s) => s, None => return Ok(Plan::new(Vec::new())), }; + self.plan_snapshot(snapshot).await + } + + async fn plan_snapshot(&self, snapshot: Snapshot) -> crate::Result<Plan> { + let file_io = self.table.file_io(); + let table_path = self.table.location(); let core_options = CoreOptions::new(self.table.schema().options()); + 
let deletion_vectors_enabled = core_options.deletion_vectors_enabled(); let target_split_size = core_options.source_split_target_size(); let open_file_cost = core_options.source_split_open_file_cost(); - let deletion_vectors_enabled = core_options.deletion_vectors_enabled(); - Self::plan_snapshot( - snapshot, - file_io, - table_path, - target_split_size, - open_file_cost, - deletion_vectors_enabled, - ) - .await - } - - async fn plan_snapshot( - snapshot: Snapshot, - file_io: &FileIO, - table_path: &str, - target_split_size: i64, - open_file_cost: i64, - deletion_vectors_enabled: bool, - ) -> crate::Result<Plan> { let entries = read_all_manifest_entries(file_io, table_path, &snapshot).await?; let entries = filter_manifest_entries(entries, deletion_vectors_enabled); let entries = merge_manifest_entries(entries); @@ -210,13 +199,24 @@ } let snapshot_id = snapshot.id(); - let base_path = table_path; + let base_path = table_path.trim_end_matches('/'); let mut splits = Vec::new(); + let partition_keys = self.table.schema().partition_keys(); + let partition_computer = if !partition_keys.is_empty() { + Some(PartitionComputer::new( + partition_keys, + self.table.schema().fields(), + core_options.partition_default_name(), + core_options.legacy_partition_name(), + )?) + } else { + None + }; + // Read deletion vector index manifest once (like Java generateSplits / scanDvIndex). 
let deletion_files_map = if let Some(index_manifest_name) = snapshot.index_manifest() { - let index_manifest_path = - format!("{}/{}", base_path.trim_end_matches('/'), MANIFEST_DIR); + let index_manifest_path = format!("{base_path}/{MANIFEST_DIR}"); let path = format!("{index_manifest_path}/{index_manifest_name}"); let index_entries = IndexManifest::read(file_io, &path).await?; Some(build_deletion_files_map(&index_entries, base_path)) @@ -225,6 +225,8 @@ impl<'a> TableScan<'a> { }; for ((partition, bucket), group_entries) in groups { + let partition_row = BinaryRow::from_serialized_bytes(&partition)?; + let total_buckets = group_entries .first() .map(|e| e.total_buckets()) @@ -240,18 +242,20 @@ impl<'a> TableScan<'a> { }) .collect(); - // todo: consider partitioned table - let bucket_path = format!("{base_path}/bucket-{bucket}"); + let bucket_path = if let Some(ref computer) = partition_computer { + let partition_path = computer.generate_partition_path(&partition_row)?; + format!("{base_path}/{partition_path}bucket-{bucket}") + } else { + format!("{base_path}/bucket-{bucket}") + }; - // Get the per-bucket deletion file map for looking up by file name after bin packing. + // Original `partition` Vec consumed by PartitionBucket for DV map lookup. let per_bucket_deletion_map = deletion_files_map .as_ref() .and_then(|map| map.get(&PartitionBucket::new(partition, bucket))); let file_groups = split_for_batch(data_files, target_split_size, open_file_cost); for file_group in file_groups { - // Build deletion files list for this specific file group (by file name lookup), - // matching Python's _get_deletion_files_for_split. 
let data_deletion_files = per_bucket_deletion_map.map(|per_bucket| { file_group .iter() @@ -261,8 +265,7 @@ impl<'a> TableScan<'a> { let mut builder = DataSplitBuilder::new() .with_snapshot(snapshot_id) - // todo: consider pass real partition - .with_partition(BinaryRow::new(0)) + .with_partition(partition_row.clone()) .with_bucket(bucket) .with_bucket_path(bucket_path.clone()) .with_total_buckets(total_buckets) diff --git a/dev/spark/provision.py b/dev/spark/provision.py index 21a7a52..a813238 100644 --- a/dev/spark/provision.py +++ b/dev/spark/provision.py @@ -110,6 +110,92 @@ def main(): """ ) + # ===== Partitioned table: single partition key (dt) ===== + spark.sql( + """ + CREATE TABLE IF NOT EXISTS partitioned_log_table ( + id INT, + name STRING, + dt STRING + ) USING paimon + PARTITIONED BY (dt) + """ + ) + spark.sql( + """ + INSERT INTO partitioned_log_table VALUES + (1, 'alice', '2024-01-01'), + (2, 'bob', '2024-01-01'), + (3, 'carol', '2024-01-02') + """ + ) + + # ===== Partitioned table: multiple partition keys (dt, hr) ===== + spark.sql( + """ + CREATE TABLE IF NOT EXISTS multi_partitioned_log_table ( + id INT, + name STRING, + dt STRING, + hr INT + ) USING paimon + PARTITIONED BY (dt, hr) + """ + ) + spark.sql( + """ + INSERT INTO multi_partitioned_log_table VALUES + (1, 'alice', '2024-01-01', 10), + (2, 'bob', '2024-01-01', 10), + (3, 'carol', '2024-01-01', 20), + (4, 'dave', '2024-01-02', 10) + """ + ) + + # ===== Partitioned table: PK + DV enabled ===== + spark.sql( + """ + CREATE TABLE IF NOT EXISTS partitioned_dv_pk_table ( + id INT, + name STRING, + dt STRING + ) USING paimon + PARTITIONED BY (dt) + TBLPROPERTIES ( + 'primary-key' = 'id,dt', + 'bucket' = '1', + 'deletion-vectors.enabled' = 'true' + ) + """ + ) + + spark.sql( + """ + INSERT INTO partitioned_dv_pk_table VALUES + (1, 'alice-v1', '2024-01-01'), + (2, 'bob-v1', '2024-01-01'), + (1, 'alice-v1', '2024-01-02'), + (3, 'carol-v1', '2024-01-02') + """ + ) + + spark.sql( + """ + INSERT INTO 
partitioned_dv_pk_table VALUES + (1, 'alice-v2', '2024-01-01'), + (3, 'carol-v2', '2024-01-02'), + (4, 'dave-v1', '2024-01-02') + """ + ) + + spark.sql( + """ + INSERT INTO partitioned_dv_pk_table VALUES + (2, 'bob-v2', '2024-01-01'), + (4, 'dave-v2', '2024-01-02') + """ + ) + if __name__ == "__main__": main()