diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 73f47c9..df0aebd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -96,6 +96,12 @@ jobs: RUST_LOG: DEBUG RUST_BACKTRACE: full + - name: DataFusion Integration Test + run: cargo test -p paimon-datafusion --test read_tables + env: + RUST_LOG: DEBUG + RUST_BACKTRACE: full + - name: Go Integration Test working-directory: bindings/go run: make test diff --git a/Cargo.toml b/Cargo.toml index 7384d66..6282c09 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ [workspace] resolver = "2" -members = ["crates/paimon", "crates/integration_tests", "bindings/c"] +members = ["crates/paimon", "crates/integration_tests", "bindings/c", "crates/integrations/datafusion"] [workspace.package] version = "0.0.0" diff --git a/crates/integrations/datafusion/Cargo.toml b/crates/integrations/datafusion/Cargo.toml new file mode 100644 index 0000000..c9673ec --- /dev/null +++ b/crates/integrations/datafusion/Cargo.toml @@ -0,0 +1,34 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +[package] +name = "paimon-datafusion" +edition.workspace = true +version.workspace = true +license.workspace = true +description = "Apache Paimon DataFusion Integration (read-only)" +categories = ["database"] +keywords = ["paimon", "datafusion", "integrations"] + +[dependencies] +async-trait = "0.1" +datafusion = { version = "52.3.0"} +paimon = { path = "../../paimon" } +futures = "0.3" + +[dev-dependencies] +tokio = { version = "1", features = ["macros", "rt-multi-thread"] } diff --git a/crates/integrations/datafusion/src/error.rs b/crates/integrations/datafusion/src/error.rs new file mode 100644 index 0000000..92b2728 --- /dev/null +++ b/crates/integrations/datafusion/src/error.rs @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion::common::error::GenericError; + +/// Converts a Paimon error into a DataFusion error. 
+pub fn to_datafusion_error(error: paimon::Error) -> datafusion::error::DataFusionError { + datafusion::error::DataFusionError::External(GenericError::from(error)) +} diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs new file mode 100644 index 0000000..edfe1ed --- /dev/null +++ b/crates/integrations/datafusion/src/lib.rs @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Apache Paimon DataFusion Integration (read-only). +//! +//! Register a Paimon table as a DataFusion table provider to query it with SQL or DataFrame API. +//! +//! # Example +//! +//! ```ignore +//! use std::sync::Arc; +//! use datafusion::prelude::SessionContext; +//! use paimon_datafusion::PaimonTableProvider; +//! +//! // Obtain a Paimon Table (e.g. from your catalog), then: +//! let provider = PaimonTableProvider::try_new(table)?; +//! let ctx = SessionContext::new(); +//! ctx.register_table("my_table", Arc::new(provider))?; +//! let df = ctx.sql("SELECT * FROM my_table").await?; +//! ``` +//! +//! This version does not support write, column projection, or predicate pushdown. 
+ +mod error; +mod physical_plan; +mod schema; +mod table; + +pub use error::to_datafusion_error; +pub use physical_plan::PaimonTableScan; +pub use schema::paimon_schema_to_arrow; +pub use table::PaimonTableProvider; diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs b/crates/integrations/datafusion/src/physical_plan/mod.rs new file mode 100644 index 0000000..48aa546 --- /dev/null +++ b/crates/integrations/datafusion/src/physical_plan/mod.rs @@ -0,0 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub(crate) mod scan; + +pub use scan::PaimonTableScan; diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs new file mode 100644 index 0000000..e567d15 --- /dev/null +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::sync::Arc; + +use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; +use datafusion::error::Result as DFResult; +use datafusion::execution::{SendableRecordBatchStream, TaskContext}; +use datafusion::physical_expr::EquivalenceProperties; +use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; +use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties}; +use futures::{StreamExt, TryStreamExt}; +use paimon::table::Table; + +use crate::error::to_datafusion_error; + +/// Execution plan that scans a Paimon table (read-only, no projection, no predicate, no limit). +#[derive(Debug)] +pub struct PaimonTableScan { + table: Table, + plan_properties: PlanProperties, +} + +impl PaimonTableScan { + pub(crate) fn new(schema: ArrowSchemaRef, table: Table) -> Self { + let plan_properties = PlanProperties::new( + EquivalenceProperties::new(schema.clone()), + // TODO: Currently all Paimon splits are read in a single DataFusion partition, + // which means we lose DataFusion parallelism. A follow-up should expose one + // execution partition per Paimon split so that DataFusion can schedule them + // across threads. 
+ Partitioning::UnknownPartitioning(1), + EmissionType::Incremental, + Boundedness::Bounded, + ); + Self { + table, + plan_properties, + } + } + + pub fn table(&self) -> &Table { + &self.table + } +} + +impl ExecutionPlan for PaimonTableScan { + fn name(&self) -> &str { + "PaimonTableScan" + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.plan_properties + } + + fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { + vec![] + } + + fn with_new_children( + self: Arc<Self>, + _children: Vec<Arc<dyn ExecutionPlan>>, + ) -> DFResult<Arc<dyn ExecutionPlan>> { + Ok(self) + } + + fn execute( + &self, + _partition: usize, + _context: Arc<TaskContext>, + ) -> DFResult<SendableRecordBatchStream> { + let table = self.table.clone(); + let schema = self.schema(); + + let fut = async move { + let read_builder = table.new_read_builder(); + let scan = read_builder.new_scan(); + let plan = scan.plan().await.map_err(to_datafusion_error)?; + let read = read_builder.new_read().map_err(to_datafusion_error)?; + let stream = read.to_arrow(plan.splits()).map_err(to_datafusion_error)?; + let stream = stream.map(|r| r.map_err(to_datafusion_error)); + + Ok::<_, datafusion::error::DataFusionError>(RecordBatchStreamAdapter::new( + schema, + Box::pin(stream), + )) + }; + + let stream = futures::stream::once(fut).try_flatten(); + Ok(Box::pin(RecordBatchStreamAdapter::new( + self.schema(), + stream, + ))) + } +} + +impl DisplayAs for PaimonTableScan { + fn fmt_as( + &self, + _t: datafusion::physical_plan::DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + write!(f, "PaimonTableScan") + } +} diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs new file mode 100644 index 0000000..231431b --- /dev/null +++ b/crates/integrations/datafusion/src/schema.rs @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion::arrow::datatypes::{DataType, Field, Schema}; +use datafusion::common::DataFusionError; +use datafusion::common::Result as DFResult; +use std::sync::Arc; + +use paimon::spec::{DataField, DataType as PaimonDataType}; + +/// Converts Paimon table schema (logical row type fields) to DataFusion Arrow schema. 
+pub fn paimon_schema_to_arrow(fields: &[DataField]) -> DFResult<Arc<Schema>> { + let arrow_fields: Vec<Field> = fields + .iter() + .map(|f| { + let arrow_type = paimon_data_type_to_arrow(f.data_type())?; + Ok(Field::new( + f.name(), + arrow_type, + f.data_type().is_nullable(), + )) + }) + .collect::<DFResult<Vec<Field>>>()?; + Ok(Arc::new(Schema::new(arrow_fields))) +} + +fn paimon_data_type_to_arrow(dt: &PaimonDataType) -> DFResult<DataType> { + use datafusion::arrow::datatypes::TimeUnit; + + Ok(match dt { + PaimonDataType::Boolean(_) => DataType::Boolean, + PaimonDataType::TinyInt(_) => DataType::Int8, + PaimonDataType::SmallInt(_) => DataType::Int16, + PaimonDataType::Int(_) => DataType::Int32, + PaimonDataType::BigInt(_) => DataType::Int64, + PaimonDataType::Float(_) => DataType::Float32, + PaimonDataType::Double(_) => DataType::Float64, + PaimonDataType::VarChar(_) | PaimonDataType::Char(_) => DataType::Utf8, + PaimonDataType::Binary(_) | PaimonDataType::VarBinary(_) => DataType::Binary, + PaimonDataType::Date(_) => DataType::Date32, + PaimonDataType::Time(t) => match t.precision() { + // `read.to_arrow(...)` goes through the Parquet Arrow reader, which exposes INT32 + // TIME values as millisecond precision only. Mirror that here so provider schema and + // runtime RecordBatch schema stay identical. 
+ 0..=3 => DataType::Time32(TimeUnit::Millisecond), + 4..=6 => DataType::Time64(TimeUnit::Microsecond), + 7..=9 => DataType::Time64(TimeUnit::Nanosecond), + precision => { + return Err(DataFusionError::Internal(format!( + "Unsupported TIME precision {precision}" + ))); + } + }, + PaimonDataType::Timestamp(t) => { + DataType::Timestamp(timestamp_time_unit(t.precision())?, None) + } + PaimonDataType::LocalZonedTimestamp(t) => { + DataType::Timestamp(timestamp_time_unit(t.precision())?, Some("UTC".into())) + } + PaimonDataType::Decimal(d) => { + let p = u8::try_from(d.precision()).map_err(|_| { + DataFusionError::Internal("Decimal precision exceeds u8".to_string()) + })?; + let s = i8::try_from(d.scale() as i32).map_err(|_| { + DataFusionError::Internal("Decimal scale out of i8 range".to_string()) + })?; + match d.precision() { + // The Parquet Arrow reader normalizes DECIMAL columns to Decimal128 regardless of + // Parquet physical storage width. Mirror that here to avoid DataFusion schema + // mismatch between `TableProvider::schema()` and `execute()` output. 
+ 1..=38 => DataType::Decimal128(p, s), + precision => { + return Err(DataFusionError::Internal(format!( + "Unsupported DECIMAL precision {precision}" + ))); + } + } + } + PaimonDataType::Array(_) + | PaimonDataType::Map(_) + | PaimonDataType::Multiset(_) + | PaimonDataType::Row(_) => { + return Err(DataFusionError::NotImplemented( + "Paimon DataFusion integration does not yet support nested types (Array/Map/Row)" + .to_string(), + )); + } + }) +} + +fn timestamp_time_unit(precision: u32) -> DFResult<TimeUnit> { + use datafusion::arrow::datatypes::TimeUnit; + + match precision { + 0..=3 => Ok(TimeUnit::Millisecond), + 4..=6 => Ok(TimeUnit::Microsecond), + 7..=9 => Ok(TimeUnit::Nanosecond), + _ => Err(DataFusionError::Internal(format!( + "Unsupported TIMESTAMP precision {precision}" + ))), + } +} diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs new file mode 100644 index 0000000..04ccab6 --- /dev/null +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Paimon table provider for DataFusion (read-only). 
+ +use std::any::Any; +use std::sync::Arc; + +use async_trait::async_trait; +use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; +use datafusion::catalog::Session; +use datafusion::common::DataFusionError; +use datafusion::datasource::{TableProvider, TableType}; +use datafusion::error::Result as DFResult; +use datafusion::logical_expr::Expr; +use datafusion::physical_plan::ExecutionPlan; +use paimon::table::Table; + +use crate::physical_plan::PaimonTableScan; +use crate::schema::paimon_schema_to_arrow; + +/// Read-only table provider for a Paimon table. +/// +/// Supports full table scan only (no write, no subset/reordered projection, no predicate +/// pushdown). +#[derive(Debug, Clone)] +pub struct PaimonTableProvider { + table: Table, + schema: ArrowSchemaRef, +} + +impl PaimonTableProvider { + /// Create a table provider from a Paimon table. + /// + /// Loads the table schema and converts it to Arrow for DataFusion. + pub fn try_new(table: Table) -> DFResult<Self> { + let fields = table.schema().fields(); + let schema = paimon_schema_to_arrow(fields)?; + Ok(Self { table, schema }) + } + + pub fn table(&self) -> &Table { + &self.table + } +} + +#[async_trait] +impl TableProvider for PaimonTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> ArrowSchemaRef { + self.schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + _state: &dyn Session, + projection: Option<&Vec<usize>>, + _filters: &[Expr], + _limit: Option<usize>, + ) -> DFResult<Arc<dyn ExecutionPlan>> { + if let Some(projection) = projection { + let is_full_schema_projection = projection.len() == self.schema.fields().len() + && projection.iter().copied().eq(0..self.schema.fields().len()); + + if !is_full_schema_projection { + return Err(DataFusionError::NotImplemented( + "Paimon DataFusion integration does not yet support subset or reordered projections; use SELECT * until apache/paimon-rust#146 is implemented".to_string(), + )); + } + } + + 
Ok(Arc::new(PaimonTableScan::new( + self.schema.clone(), + self.table.clone(), + ))) + } +} diff --git a/crates/integrations/datafusion/tests/read_tables.rs b/crates/integrations/datafusion/tests/read_tables.rs new file mode 100644 index 0000000..f813d78 --- /dev/null +++ b/crates/integrations/datafusion/tests/read_tables.rs @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::sync::Arc; + +use datafusion::arrow::array::{Int32Array, StringArray}; +use datafusion::prelude::SessionContext; +use paimon::catalog::Identifier; +use paimon::{Catalog, FileSystemCatalog}; +use paimon_datafusion::PaimonTableProvider; + +fn get_test_warehouse() -> String { + std::env::var("PAIMON_TEST_WAREHOUSE").unwrap_or_else(|_| "/tmp/paimon-warehouse".to_string()) +} + +async fn create_context(table_name: &str) -> SessionContext { + let warehouse = get_test_warehouse(); + let catalog = FileSystemCatalog::new(warehouse).expect("Failed to create catalog"); + let identifier = Identifier::new("default", table_name); + let table = catalog + .get_table(&identifier) + .await + .expect("Failed to get table"); + + let provider = PaimonTableProvider::try_new(table).expect("Failed to create table provider"); + let ctx = SessionContext::new(); + ctx.register_table(table_name, Arc::new(provider)) + .expect("Failed to register table"); + + ctx +} + +async fn read_rows(table_name: &str) -> Vec<(i32, String)> { + let batches = collect_query(table_name, &format!("SELECT id, name FROM {table_name}")) + .await + .expect("Failed to collect query result"); + + assert!( + !batches.is_empty(), + "Expected at least one batch from table {table_name}" + ); + + let mut actual_rows = Vec::new(); + for batch in &batches { + let id_array = batch + .column_by_name("id") + .and_then(|column| column.as_any().downcast_ref::<Int32Array>()) + .expect("Expected Int32Array for id column"); + let name_array = batch + .column_by_name("name") + .and_then(|column| column.as_any().downcast_ref::<StringArray>()) + .expect("Expected StringArray for name column"); + + for row_index in 0..batch.num_rows() { + actual_rows.push(( + id_array.value(row_index), + name_array.value(row_index).to_string(), + )); + } + } + + actual_rows.sort_by_key(|(id, _)| *id); + actual_rows +} + +async fn collect_query( + table_name: &str, + sql: &str, +) -> datafusion::error::Result<Vec<datafusion::arrow::record_batch::RecordBatch>> { + let ctx = create_context(table_name).await; + + 
ctx.sql(sql).await?.collect().await +} + +#[tokio::test] +async fn test_read_log_table_via_datafusion() { + let actual_rows = read_rows("simple_log_table").await; + let expected_rows = vec![ + (1, "alice".to_string()), + (2, "bob".to_string()), + (3, "carol".to_string()), + ]; + + assert_eq!( + actual_rows, expected_rows, + "Rows should match expected values" + ); +} + +#[tokio::test] +async fn test_read_primary_key_table_via_datafusion() { + let actual_rows = read_rows("simple_dv_pk_table").await; + let expected_rows = vec![ + (1, "alice-v2".to_string()), + (2, "bob-v2".to_string()), + (3, "carol-v2".to_string()), + (4, "dave-v2".to_string()), + (5, "eve-v2".to_string()), + (6, "frank-v1".to_string()), + ]; + + assert_eq!( + actual_rows, expected_rows, + "Primary key table rows should match expected values" + ); +} + +#[tokio::test] +async fn test_subset_projection_returns_not_implemented() { + let error = collect_query("simple_log_table", "SELECT id FROM simple_log_table") + .await + .expect_err("Subset projection should be rejected until projection support lands"); + + assert!( + error + .to_string() + .contains("does not yet support subset or reordered projections"), + "Expected explicit unsupported projection error, got: {error}" + ); +} diff --git a/crates/paimon/examples/rest_list_databases_example.rs b/crates/paimon/examples/rest_list_databases_example.rs index 5140c17..2a5e435 100644 --- a/crates/paimon/examples/rest_list_databases_example.rs +++ b/crates/paimon/examples/rest_list_databases_example.rs @@ -48,7 +48,7 @@ async fn main() { let api = match RESTApi::new(options, true).await { Ok(api) => api, Err(e) => { - eprintln!("Failed to create RESTApi: {}", e); + eprintln!("Failed to create RESTApi: {e}"); return; } }; @@ -57,11 +57,11 @@ async fn main() { println!("Calling list_databases()..."); match api.list_databases().await { Ok(databases) => { - println!("Databases found: {:?}", databases); + println!("Databases found: {databases:?}"); 
println!("Total count: {}", databases.len()); } Err(e) => { - eprintln!("Failed to list databases: {}", e); + eprintln!("Failed to list databases: {e}"); } } } diff --git a/crates/paimon/src/api/auth/factory.rs b/crates/paimon/src/api/auth/factory.rs index d073640..58234b2 100644 --- a/crates/paimon/src/api/auth/factory.rs +++ b/crates/paimon/src/api/auth/factory.rs @@ -53,7 +53,7 @@ impl AuthProviderFactory { message: "auth provider is required".to_string(), }), Some(unknown) => Err(Error::ConfigInvalid { - message: format!("Unknown auth provider: {}", unknown), + message: format!("Unknown auth provider: {unknown}"), }), } } diff --git a/crates/paimon/src/api/rest_client.rs b/crates/paimon/src/api/rest_client.rs index 13f59be..b348918 100644 --- a/crates/paimon/src/api/rest_client.rs +++ b/crates/paimon/src/api/rest_client.rs @@ -48,7 +48,7 @@ impl HttpClient { .timeout(Duration::from_secs(30)) .build() .map_err(|e| Error::ConfigInvalid { - message: format!("Failed to create HTTP client: {}", e), + message: format!("Failed to create HTTP client: {e}"), })?; Ok(HttpClient { @@ -78,7 +78,7 @@ impl HttpClient { let normalized_url = if uri.starts_with("http://") || uri.starts_with("https://") { uri.to_string() } else { - format!("http://{}", uri) + format!("http://{uri}") }; // Remove trailing slash diff --git a/crates/paimon/src/spec/partition_utils.rs b/crates/paimon/src/spec/partition_utils.rs index 409e79d..418ecfc 100644 --- a/crates/paimon/src/spec/partition_utils.rs +++ b/crates/paimon/src/spec/partition_utils.rs @@ -189,7 +189,7 @@ fn resolve_partition_fields<'a>( .iter() .find(|f| f.name() == key) .ok_or_else(|| Error::UnexpectedError { - message: format!("Partition key '{}' not found in schema fields", key), + message: format!("Partition key '{key}' not found in schema fields"), source: None, }) }) @@ -298,7 +298,7 @@ fn format_partition_value( | DataType::Multiset(_) | DataType::Row(_) => { return Err(Error::Unsupported { - message: format!("{:?} type is 
not supported as partition key", data_type), + message: format!("{data_type:?} type is not supported as partition key"), }); } }; @@ -330,7 +330,7 @@ fn format_time(millis_of_day: i32, precision: u32) -> String { let s = (ms % 60_000) / 1_000; let mut frac_ms = ms % 1_000; - let hms = format!("{:02}:{:02}:{:02}", h, m, s); + let hms = format!("{h:02}:{m:02}:{s:02}"); if precision == 0 || frac_ms == 0 { return hms; } @@ -348,7 +348,7 @@ fn format_time(millis_of_day: i32, precision: u32) -> String { remaining -= 1; } - format!("{}.{}", hms, frac) + format!("{hms}.{frac}") } /// Format an unscaled i128 value with the given scale to a plain decimal string. @@ -448,9 +448,9 @@ fn format_timestamp_legacy(dt: NaiveDateTime) -> String { return date_hour_min; } - let mut result = format!("{}:{:02}", date_hour_min, sec); + let mut result = format!("{date_hour_min}:{sec:02}"); if nano > 0 { - let frac = format!("{:09}", nano); + let frac = format!("{nano:09}"); let trimmed = frac.trim_end_matches('0'); result.push('.'); result.push_str(trimmed); @@ -471,7 +471,7 @@ fn format_timestamp_non_legacy(dt: NaiveDateTime, precision: u32) -> String { } // Pad nano to 9 digits, then strip trailing zeros but keep at least up to `precision` digits. - let nano_str = format!("{:09}", nano); + let nano_str = format!("{nano:09}"); let mut fraction = &nano_str[..]; // Strip trailing zeros, but don't go below the precision boundary. 
@@ -482,7 +482,7 @@ fn format_timestamp_non_legacy(dt: NaiveDateTime, precision: u32) -> String { if fraction.is_empty() { ymdhms } else { - format!("{}.{}", ymdhms, fraction) + format!("{ymdhms}.{fraction}") } } diff --git a/crates/paimon/tests/mock_server.rs b/crates/paimon/tests/mock_server.rs index bedb46f..32ced4d 100644 --- a/crates/paimon/tests/mock_server.rs +++ b/crates/paimon/tests/mock_server.rs @@ -90,7 +90,7 @@ impl RESTServer { let err = ErrorResponse::new( None, None, - Some(format!("Warehouse {} not found", warehouse)), + Some(format!("Warehouse {warehouse} not found")), Some(404), ); return (StatusCode::NOT_FOUND, Json(err)).into_response(); @@ -130,7 +130,7 @@ impl RESTServer { /// Get the server URL. pub fn url(&self) -> Option<String> { - self.addr.map(|a| format!("http://{}", a)) + self.addr.map(|a| format!("http://{a}")) } /// Get the server address. @@ -174,7 +174,7 @@ pub async fn start_mock_server( .route("/v1/config", get(RESTServer::get_config)) // Database routes .route( - &format!("{}/databases", prefix), + &format!("{prefix}/databases"), get(RESTServer::list_databases), ) .layer(Extension(state)); @@ -186,7 +186,7 @@ pub async fn start_mock_server( let server_handle = tokio::spawn(async move { if let Err(e) = axum::serve(listener, app.into_make_service()).await { - eprintln!("mock server error: {}", e); + eprintln!("mock server error: {e}"); } }); diff --git a/deny.toml b/deny.toml index c11cec6..02dd81b 100644 --- a/deny.toml +++ b/deny.toml @@ -26,6 +26,7 @@ allow = [ "MIT", "Unicode-3.0", "Zlib", + "bzip2-1.0.6", ] exceptions = [