Summary
ColumnData::Time fails during bulk_insert() when a DATE column immediately precedes a TIME column in a multi-column BCP row.
SQL Server returns:
4816: Invalid column type from bcp client
This appears to be a metadata encoding issue in the TDS bulk protocol implementation.
The issue is reproducible on:
- SQL Server 2022
- tiberius 0.12.3
- Rust 1.75+
- Linux (Fedora)
Environment
tiberius = 0.12.3
SQL Server 2022 (RTM-CU23)
Microsoft SQL Server 2022 - 16.0.4236.2
Connection uses standard TCP + TDS:
AuthMethod::sql_server(user, password)
Minimal failing schema
CREATE TABLE test_bug (
id BIGINT NULL,
d DATE NULL,
t TIME(7) NULL
);
Minimal reproduction
use tiberius::{Client, ColumnData, TokenRow};
use tiberius::time::{Date, Time};
// `client` is an already-connected tiberius Client; the table name matches the schema above.
let mut req = client.bulk_insert("test_bug").await?;
let mut row = TokenRow::new();
row.push(ColumnData::I64(Some(1)));
row.push(ColumnData::Date(Some(Date::new(738535))));
row.push(ColumnData::Time(Some(Time::new(79101234560, 7))));
req.send(row).await?;
req.finalize().await?;
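The two literals in the reproduction are easier to audit when they are derived rather than hard-coded. The sketch below assumes that `Date::new` takes the number of days since 0001-01-01 and that `Time::new` takes a count of 10^(-scale)-second increments, as the probe's doc comment describes. The helper names `days_since_year_one` and `time_increments` are hypothetical and are not part of tiberius.

```rust
/// Days elapsed since 0001-01-01 in the proleptic Gregorian calendar.
/// Hypothetical helper; assumes this is the unit `Date::new` expects.
fn days_since_year_one(year: i64, month: i64, day: i64) -> u32 {
    // Treat the year as starting in March so the leap day falls at the end.
    let y = if month <= 2 { year - 1 } else { year };
    let era = y.div_euclid(400);
    let year_of_era = y - era * 400;
    let month_from_march = (month + 9) % 12;
    let day_of_year = (153 * month_from_march + 2) / 5 + day - 1;
    let day_of_era = year_of_era * 365 + year_of_era / 4 - year_of_era / 100 + day_of_year;
    // Day 0 of era 0 is 0000-03-01, which is 306 days before 0001-01-01.
    (era * 146_097 + day_of_era - 306) as u32
}

/// Time of day expressed in units of 10^(-scale) seconds.
/// Hypothetical helper; `nanos` is the sub-second part in nanoseconds.
fn time_increments(hour: u64, minute: u64, second: u64, nanos: u64, scale: u32) -> u64 {
    assert!(scale <= 7, "SQL Server TIME supports scale 0..=7");
    let whole_seconds = hour * 3600 + minute * 60 + second;
    whole_seconds * 10u64.pow(scale) + nanos / 10u64.pow(9 - scale)
}
```

Under those assumptions, `days_since_year_one(2023, 1, 16)` gives 738535 and `time_increments(2, 11, 50, 123_456_000, 7)` gives 79101234560, which are the values used in the reproduction.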
Observed behavior
SQL Server returns:
Token error: 'Invalid column type from bcp client for colid 3.'
(code: 4816)
The failure occurs during finalize().
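To tie the reported `colid` back to a column, the message text can be parsed. This is a minimal sketch that assumes `colid` is the 1-based ordinal of the column in table-definition order, which matches the probe results (TIME is the third column and the error names colid 3). The function names are hypothetical; the code only inspects the message string and does not use any tiberius API.

```rust
/// Extract the column id from a 4816 message such as
/// "Invalid column type from bcp client for colid 3."
/// Returns None when the message contains no "colid <number>" part.
fn failing_colid(message: &str) -> Option<usize> {
    let rest = message.split("colid ").nth(1)?;
    let digits: String = rest.chars().take_while(|c| c.is_ascii_digit()).collect();
    digits.parse().ok()
}

/// Map a 1-based colid onto the column names of the target table.
/// Assumes colid follows table-definition order.
fn column_for_colid<'a>(columns: &[&'a str], colid: usize) -> Option<&'a str> {
    columns.get(colid.checked_sub(1)?).copied()
}
```

For the failing schema, `column_for_colid(&["id", "d", "t"], 3)` resolves to `t`, the TIME(7) column.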
Expected behavior
The row should insert successfully.
The same schema and values work correctly when inserted via:
- a normal INSERT
- the bcp CLI tool
- ColumnData::Time in a single-column table
- ColumnData::Time when only fixed-length types precede it
Additional investigation
I wrote a probe that tests many combinations of column types during BCP.
Results show:
| Schema | Result |
|---|---|
| TIME(7) single column | PASS |
| BIGINT → TIME(7) | PASS |
| BIGINT → NVARCHAR → TIME(7) | PASS |
| BIGINT → BIT → TIME(7) | PASS |
| BIGINT → DATE → TIME(7) | FAIL |
Example failing case:
(BIGINT, DATE, TIME(7))
Example working case:
(BIGINT, NVARCHAR(255), TIME(7))
This strongly suggests the issue occurs specifically when DateN precedes TimeN in the bulk metadata stream.
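The matrix above can also be written as data, which makes it simple to add further bisection cases. The following is a sketch only: the `Case` struct, the case ids, and the `c1..cN` column names are hypothetical, and NVARCHAR is given the length 255 used in the working example.

```rust
/// One probe configuration: an id plus the SQL types of its columns, in order.
struct Case {
    id: &'static str,
    column_types: &'static [&'static str],
}

/// The five configurations from the results table.
const CASES: &[Case] = &[
    Case { id: "time_only", column_types: &["TIME(7)"] },
    Case { id: "bigint_time", column_types: &["BIGINT", "TIME(7)"] },
    Case { id: "bigint_nvarchar_time", column_types: &["BIGINT", "NVARCHAR(255)", "TIME(7)"] },
    Case { id: "bigint_bit_time", column_types: &["BIGINT", "BIT", "TIME(7)"] },
    Case { id: "bigint_date_time", column_types: &["BIGINT", "DATE", "TIME(7)"] },
];

/// Build the CREATE TABLE statement for a case, with nullable columns c1..cN.
fn create_table_sql(table: &str, case: &Case) -> String {
    let columns: Vec<String> = case
        .column_types
        .iter()
        .enumerate()
        .map(|(i, ty)| format!("[c{}] {} NULL", i + 1, ty))
        .collect();
    format!("CREATE TABLE {} ({})", table, columns.join(", "))
}

/// 1-based position of the first TIME column in a case.
fn time_colid(case: &Case) -> Option<usize> {
    case.column_types
        .iter()
        .position(|ty| ty.starts_with("TIME"))
        .map(|i| i + 1)
}
```

Each entry can then be fed to the same create, bulk-insert, read-back sequence the probe uses, so a new combination costs one line.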
SQL Server error
Invalid column type from bcp client
Error 4816
This indicates SQL Server rejected the BCP column metadata, not the row value itself.
Workaround
Using a staging table avoids the issue:
BCP -> NVARCHAR(MAX)
INSERT INTO target
SELECT CAST(col AS TIME(7))
FROM staging
This confirms the issue occurs specifically when sending TimeN via BCP in the failing column configuration.
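For the staging route, the time value has to be sent as text. A minimal sketch of that conversion follows, assuming the same `(increments, scale)` representation that `Time::new` takes in the probe. The helper names are hypothetical, and the SQL mirrors the INSERT ... SELECT CAST statement shown above.

```rust
/// Render a time of day, given as 10^(-scale)-second increments, as an
/// "HH:MM:SS[.fraction]" string suitable for an NVARCHAR staging column.
fn format_time_literal(increments: u64, scale: u32) -> String {
    assert!(scale <= 7, "SQL Server TIME supports scale 0..=7");
    let per_second = 10u64.pow(scale);
    let total_seconds = increments / per_second;
    let fraction = increments % per_second;
    let (h, m, s) = (total_seconds / 3600, (total_seconds / 60) % 60, total_seconds % 60);
    if scale == 0 {
        format!("{:02}:{:02}:{:02}", h, m, s)
    } else {
        // Zero-pad the fraction to exactly `scale` digits.
        format!("{:02}:{:02}:{:02}.{:0width$}", h, m, s, fraction, width = scale as usize)
    }
}

/// Build the statement that moves staged text into the real TIME(7) column.
fn staging_insert_sql(target: &str, stage: &str, column: &str) -> String {
    format!(
        "INSERT INTO {} SELECT CAST([{}] AS TIME(7)) FROM {}",
        target, column, stage
    )
}
```

The staged row then carries `ColumnData::String` instead of `ColumnData::Time`, at the cost of one extra statement per batch.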
Full probe program
A full reproduction probe (≈600 lines) that isolates the issue across many column configurations is available here:
Cargo.toml:
[package]
name = "tiberius-bcp-probe"
version = "0.1.0"
edition = "2021"
description = "Minimal reproducer for tiberius BCP type-compatibility with TIME / NVARCHAR columns"

[[bin]]
name = "probe"
path = "src/main.rs"
[dependencies]
# Same tiberius line as etl-core — do not bump independently.
# "tds73" — required by etl-core; test with the same feature set.
# "time" — re-exports tiberius::time::Time (tiberius's own internal type,
# NOT the time crate). Enables correct multi-column BCP encoding
# for ColumnData::Time. Without this, tiberius sends a malformed
# COLMETADATA token for TIME columns in multi-column BCP streams,
# causing SQL Server error 4816.
# ColumnData::Time(Option<tiberius::time::Time>); note: NOT time::Time from the time crate.
tiberius = { version = "0.12", features = ["native-tls", "tds73", "time"] }
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.7", features = ["compat"] }
anyhow = "1"
# No external "time" crate needed — tiberius::time::Time is tiberius's own type.
main.rs:
//! tiberius BCP probe -- systematically tests which (SQL column type, ColumnData variant)
//! combinations tiberius 0.12.x bulk-insert accepts without SQL Server error 4816.
//!
//! # Usage
//!
//! MSSQL_HOST=localhost MSSQL_PORT=1433 \
//! MSSQL_USER=sa MSSQL_PASSWORD=YourPassword \
//! MSSQL_DATABASE=etl_target \
//! cargo run
//!
//! # Key discovery (from compile errors)
//!
//! `ColumnData::Time` wraps `Option<tiberius::time::Time>` -- tiberius's OWN internal
//! struct (the old `MssqlTime` renamed), NOT `time::Time` from the external `time` crate.
//! The two types share the same name but are completely unrelated.
//!
//! `tiberius::time::Time::new(increments: u64, scale: u8)`:
//! scale = precision (0-7), matching SQL Server TIME(n)
//! increments = time-of-day in units of 10^(-scale) seconds
//!
//! Example -- 02:11:50.123456 (scale = 6, units = microseconds):
//! 2*3_600_000_000 + 11*60_000_000 + 50_000_000 + 123_456 = 7_910_123_456
//!
//! The `"time"` feature on tiberius is what makes `tiberius::time::Time` public.
//! Without it the module path exists but the type is not accessible.
use anyhow::{Context, Result};
use std::borrow::Cow;
use tiberius::time::Time as TibTime; // tiberius's own type, NOT the `time` crate
use tiberius::{AuthMethod, Client, ColumnData, Config, TokenRow};
use tokio::net::TcpStream;
use tokio_util::compat::TokioAsyncWriteCompatExt;
// -- type alias ----------------------------------------------------------------
type Conn = Client<tokio_util::compat::Compat<TcpStream>>;
// -- connection ----------------------------------------------------------------
fn env(key: &str, default: &str) -> String {
std::env::var(key).unwrap_or_else(|_| default.to_owned())
}
async fn connect() -> Result<Conn> {
let host = env("MSSQL_HOST", "localhost");
let port: u16 = env("MSSQL_PORT", "1433").parse().unwrap_or(1433);
let user = env("MSSQL_USER", "sa");
let password = env("MSSQL_PASSWORD", "");
let database = env("MSSQL_DATABASE", "etl_target");
let mut cfg = Config::new();
cfg.host(&host);
cfg.port(port);
cfg.authentication(AuthMethod::sql_server(&user, &password));
cfg.database(&database);
cfg.trust_cert();
let tcp = TcpStream::connect(cfg.get_addr())
.await
.with_context(|| format!("TCP connect -- host={host} port={port}"))?;
tcp.set_nodelay(true)?;
Client::connect(cfg, tcp.compat_write())
.await
.with_context(|| format!("TDS handshake -- user={user} db={database}"))
}
// -- test result ---------------------------------------------------------------
struct TestResult {
id: String,
col_type: String,
val_desc: String,
outcome: Outcome,
}
enum Outcome {
Pass { read_back: String },
Fail { error: String },
}
impl TestResult {
fn print(&self) {
let (badge, detail) = match &self.outcome {
Outcome::Pass { read_back } => (
"\x1b[32mPASS\x1b[0m",
format!("-> {read_back}"),
),
Outcome::Fail { error } => (
"\x1b[31mFAIL\x1b[0m",
error.clone(),
),
};
println!(" {:>3} {:<36} {:<50} {badge} {detail}",
self.id, self.col_type, self.val_desc);
}
fn passed(&self) -> bool { matches!(self.outcome, Outcome::Pass { .. }) }
}
// -- core: single-column BCP test ----------------------------------------------
//
// Each test reconnects independently so that a tiberius 4816 error on one test
// cannot corrupt the connection state for the next test.
async fn bcp_single(
id: impl Into<String>,
col_type: impl Into<String>,
value: ColumnData<'static>,
val_desc: impl Into<String>,
) -> TestResult {
let id = id.into();
let col_type = col_type.into();
let val_desc = val_desc.into();
let mut c = match connect().await {
Ok(c) => c,
Err(e) => return fail(id, col_type, val_desc, format!("connect: {e}")),
};
let table = "#tmp_bcp";
let _ = c.simple_query(
format!("IF OBJECT_ID('tempdb..{table}') IS NOT NULL DROP TABLE {table}")
).await;
if let Err(e) = c.simple_query(
format!("CREATE TABLE {table} ([val] {col_type} NULL)")
).await {
return fail(id, col_type, val_desc, format!("CREATE TABLE: {e}"));
}
if let Some(err) = do_bcp_one_row(&mut c, table, value).await {
return fail(id, col_type, val_desc, err);
}
// Cast to NVARCHAR for display regardless of actual column type.
let read_back = select_scalar(&mut c,
&format!("SELECT CONVERT(NVARCHAR(100), [val], 121) FROM {table}")
).await;
TestResult { id, col_type, val_desc, outcome: Outcome::Pass { read_back } }
}
/// Attempt one BCP row; returns `Some(error_string)` on failure, `None` on success.
async fn do_bcp_one_row(
c: &mut Conn,
table: &str,
value: ColumnData<'static>,
) -> Option<String> {
let mut req = match c.bulk_insert(table).await {
Ok(r) => r,
Err(e) => return Some(format!("bulk_insert(): {e}")),
};
let mut row = TokenRow::new();
row.push(value);
if let Err(e) = req.send(row).await {
return Some(format!("send(): {e}"));
}
if let Err(e) = req.finalize().await {
return Some(format!("finalize(): {e}"));
}
None
}
// -- multi-column BCP test (E1) ------------------------------------------------
async fn bcp_multi_column() -> TestResult {
let id = "E1".to_owned();
let col_type = "(BIGINT, NVARCHAR(MAX), NVARCHAR(MAX))".to_owned();
let val_desc = "I64(42) + String(ts) + String(other)".to_owned();
let mut c = match connect().await {
Ok(c) => c,
Err(e) => return fail(id, col_type, val_desc, format!("connect: {e}")),
};
let table = "#tmp_multi";
let _ = c.simple_query(
format!("IF OBJECT_ID('tempdb..{table}') IS NOT NULL DROP TABLE {table}")
).await;
let _ = c.simple_query(format!(
"CREATE TABLE {table} ([id] BIGINT NULL, [col_time] NVARCHAR(MAX) NULL, [col_other] NVARCHAR(MAX) NULL)"
)).await;
let bcp_err: Option<String> = 'bcp: {
let mut req = match c.bulk_insert(table).await {
Ok(r) => r,
Err(e) => break 'bcp Some(format!("bulk_insert(): {e}")),
};
let mut row = TokenRow::new();
row.push(ColumnData::I64(Some(42)));
row.push(ColumnData::String(Some(Cow::Owned("02:11:50.123456".to_owned()))));
row.push(ColumnData::String(Some(Cow::Owned("other_value".to_owned()))));
if let Err(e) = req.send(row).await { break 'bcp Some(format!("send(): {e}")); }
if let Err(e) = req.finalize().await { break 'bcp Some(format!("finalize(): {e}")); }
None
};
if let Some(err) = bcp_err {
return fail(id, col_type, val_desc, err);
}
let read_back = select_scalar(&mut c,
&format!("SELECT CONVERT(NVARCHAR(50), [col_time]) FROM {table}")
).await;
TestResult { id, col_type, val_desc, outcome: Outcome::Pass { read_back } }
}
// -- staging end-to-end (F1) ---------------------------------------------------
async fn staging_end_to_end() -> TestResult {
let id = "F1".to_owned();
let col_type = "NVARCHAR(MAX) -> TIME(7)".to_owned();
let val_desc = "BCP String + INSERT SELECT CAST(AS TIME(7))".to_owned();
let mut c = match connect().await {
Ok(c) => c,
Err(e) => return fail(id, col_type, val_desc, format!("connect: {e}")),
};
let stage = "#f_stage";
let target = "#f_target";
for t in [stage, target] {
let _ = c.simple_query(
format!("IF OBJECT_ID('tempdb..{t}') IS NOT NULL DROP TABLE {t}")
).await;
}
let _ = c.simple_query(format!("CREATE TABLE {stage} ([val] NVARCHAR(MAX) NULL)")).await;
let _ = c.simple_query(format!("CREATE TABLE {target} ([val] TIME(7) NULL)")).await;
if let Some(err) = do_bcp_one_row(
&mut c, stage,
ColumnData::String(Some(Cow::Owned("02:11:50.123456".to_owned())))
).await {
return fail(id, col_type, val_desc, format!("stage BCP: {err}"));
}
if let Err(e) = c.simple_query(
format!("INSERT INTO {target} SELECT CAST([val] AS TIME(7)) FROM {stage}")
).await {
return fail(id, col_type, val_desc, format!("INSERT SELECT CAST: {e}"));
}
let read_back = select_scalar(&mut c,
&format!("SELECT CONVERT(NVARCHAR(50), [val], 121) FROM {target}")
).await;
TestResult { id, col_type, val_desc, outcome: Outcome::Pass { read_back } }
}
// -- Group G: everything -> real TIME(7) column --------------------------------
//
// Tests both ColumnData::Time variants and string representations
// against a single-column TIME(7) target.
async fn bcp_into_time7_column() -> Vec<TestResult> {
let us: u64 = 2 * 3_600_000_000 + 11 * 60_000_000 + 50_000_000 + 123_456;
let hns: u64 = us * 10;
let secs: u64 = 2 * 3600 + 11 * 60 + 50;
let t_scale7 = TibTime::new(hns, 7);
let t_scale6 = TibTime::new(us, 6);
let t_scale0 = TibTime::new(secs, 0);
let ts_iso = "02:11:50.123456";
let ts_hms = "02:11:50";
let ts_bad = "2:11:50.123456"; // no leading zero
let cases: Vec<(&str, &str, ColumnData<'static>)> = vec![
("G1", "Time(TibTime scale=7, 100ns units)", ColumnData::Time(Some(t_scale7))),
("G2", "Time(TibTime scale=6, us units)", ColumnData::Time(Some(t_scale6))),
("G3", "Time(TibTime scale=0, second units)", ColumnData::Time(Some(t_scale0))),
("G4", "Time(None / NULL)", ColumnData::Time(None)),
("G5", "String(\"02:11:50.123456\") ISO 8601", ColumnData::String(Some(Cow::Owned(ts_iso.into())))),
("G6", "String(\"02:11:50\") no microseconds", ColumnData::String(Some(Cow::Owned(ts_hms.into())))),
("G7", "String(\"2:11:50.123456\") no lead-0", ColumnData::String(Some(Cow::Owned(ts_bad.into())))),
("G8", "String(None / NULL)", ColumnData::String(None)),
];
let mut results = Vec::with_capacity(cases.len());
for (id, val_desc, value) in cases {
results.push(bcp_single(id, "TIME(7)", value, val_desc).await);
}
results
}
// -- Group H: multi-column BCP with TIME at non-first position -----------------
//
// The etl_lib failure scenario: a table with DATE (or other types) before TIME.
// H1: mixed types (BIGINT, NVARCHAR, BIT, DATE, TIME) -- mirrors etl_lib schema
// H2: uniform BIGINT x16 then TIME -- passes, isolates that mixed types are the issue
// H3: just (BIGINT, DATE, TIME) -- bisect: is DATE alone the culprit?
// H4: just (BIGINT, NVARCHAR, TIME) -- bisect: is NVARCHAR alone the culprit?
// H5: just (BIGINT, BIT, TIME) -- bisect: is BIT alone the culprit?
async fn bcp_multi_column_time() -> Vec<TestResult> {
let us: u64 = 2 * 3_600_000_000 + 11 * 60_000_000 + 50_000_000 + 123_456;
let hns: u64 = us * 10;
let mut results = Vec::new();
// H1: (BIGINT, NVARCHAR(255), BIT, DATE, TIME(7)) -- TIME at colid=5
{
let id = "H1".to_owned();
let col_type = "(BIGINT, NVARCHAR(255), BIT, DATE, TIME(7))".to_owned();
let val_desc = "Time(scale=7) at colid=5; mirrors etl_lib mixed-type schema".to_owned();
let mut c = match connect().await {
Ok(c) => c,
Err(e) => {
results.push(fail(id, col_type, val_desc, format!("connect: {e}")));
for id in ["H2","H3","H4","H5"] {
results.push(fail(id.into(), "skipped".into(), "skipped".into(), "H1 connect failed".into()));
}
return results;
}
};
let table = "#h1";
let _ = c.simple_query(format!("IF OBJECT_ID('tempdb..{table}') IS NOT NULL DROP TABLE {table}")).await;
let _ = c.simple_query(format!(
"CREATE TABLE {table} ([id] BIGINT NULL, [s] NVARCHAR(255) NULL, [b] BIT NULL, [d] DATE NULL, [t] TIME(7) NULL)"
)).await;
use tiberius::time::{Date as TibDate, Time as TibTime};
let err: Option<String> = 'bcp: {
let mut req = match c.bulk_insert(table).await { Ok(r) => r, Err(e) => break 'bcp Some(format!("bulk_insert(): {e}")) };
let mut row = TokenRow::new();
row.push(ColumnData::I64(Some(1)));
row.push(ColumnData::String(Some(Cow::Borrowed("hello"))));
row.push(ColumnData::Bit(Some(true)));
row.push(ColumnData::Date(Some(TibDate::new(738535))));
row.push(ColumnData::Time(Some(TibTime::new(hns, 7))));
if let Err(e) = req.send(row).await { break 'bcp Some(format!("send(): {e}")) }
if let Err(e) = req.finalize().await { break 'bcp Some(format!("finalize(): {e}")) }
None
};
if let Some(e) = err { results.push(fail(id, col_type, val_desc, e)); }
else {
let rb = select_scalar(&mut c, &format!("SELECT CONVERT(NVARCHAR(50),[t],121) FROM {table}")).await;
results.push(TestResult { id, col_type, val_desc, outcome: Outcome::Pass { read_back: rb } });
}
}
// H2: BIGINT x16 then TIME(7) at colid=17 -- all fixed-length, mirrors count of etl_lib
{
let id = "H2".to_owned();
let col_type = "BIGINT x16 + TIME(7) -- TIME at colid=17".to_owned();
let val_desc = "Time(scale=7) at colid=17; only fixed-length BIGINT before it".to_owned();
let mut c = match connect().await {
Ok(c) => c,
Err(e) => { results.push(fail(id, col_type, val_desc, format!("connect: {e}"))); return results; }
};
let table = "#h2";
let _ = c.simple_query(format!("IF OBJECT_ID('tempdb..{table}') IS NOT NULL DROP TABLE {table}")).await;
let padding = (1..=16).map(|i| format!("[c{i}] BIGINT NULL")).collect::<Vec<_>>().join(", ");
let _ = c.simple_query(format!("CREATE TABLE {table} ({padding}, [t] TIME(7) NULL)")).await;
use tiberius::time::Time as TibTime;
let err: Option<String> = 'bcp: {
let mut req = match c.bulk_insert(table).await { Ok(r) => r, Err(e) => break 'bcp Some(format!("bulk_insert(): {e}")) };
let mut row = TokenRow::new();
for i in 1_i64..=16 { row.push(ColumnData::I64(Some(i))); }
row.push(ColumnData::Time(Some(TibTime::new(hns, 7))));
if let Err(e) = req.send(row).await { break 'bcp Some(format!("send(): {e}")) }
if let Err(e) = req.finalize().await { break 'bcp Some(format!("finalize(): {e}")) }
None
};
if let Some(e) = err { results.push(fail(id, col_type, val_desc, e)); }
else {
let rb = select_scalar(&mut c, &format!("SELECT CONVERT(NVARCHAR(50),[t],121) FROM {table}")).await;
results.push(TestResult { id, col_type, val_desc, outcome: Outcome::Pass { read_back: rb } });
}
}
// H3: (BIGINT, DATE, TIME) -- is DateN alone before TimeN the trigger?
{
let id = "H3".to_owned();
let col_type = "(BIGINT, DATE, TIME(7)) -- DateN alone before TimeN".to_owned();
let val_desc = "Time(scale=7) at colid=3; DATE at colid=2".to_owned();
let mut c = match connect().await {
Ok(c) => c,
Err(e) => { results.push(fail(id, col_type, val_desc, format!("connect: {e}"))); return results; }
};
let table = "#h3";
let _ = c.simple_query(format!("IF OBJECT_ID('tempdb..{table}') IS NOT NULL DROP TABLE {table}")).await;
let _ = c.simple_query(format!("CREATE TABLE {table} ([id] BIGINT NULL, [d] DATE NULL, [t] TIME(7) NULL)")).await;
use tiberius::time::{Date as TibDate, Time as TibTime};
let err: Option<String> = 'bcp: {
let mut req = match c.bulk_insert(table).await { Ok(r) => r, Err(e) => break 'bcp Some(format!("bulk_insert(): {e}")) };
let mut row = TokenRow::new();
row.push(ColumnData::I64(Some(1)));
row.push(ColumnData::Date(Some(TibDate::new(738535))));
row.push(ColumnData::Time(Some(TibTime::new(hns, 7))));
if let Err(e) = req.send(row).await { break 'bcp Some(format!("send(): {e}")) }
if let Err(e) = req.finalize().await { break 'bcp Some(format!("finalize(): {e}")) }
None
};
if let Some(e) = err { results.push(fail(id, col_type, val_desc, e)); }
else {
let rb = select_scalar(&mut c, &format!("SELECT CONVERT(NVARCHAR(50),[t],121) FROM {table}")).await;
results.push(TestResult { id, col_type, val_desc, outcome: Outcome::Pass { read_back: rb } });
}
}
// H4: (BIGINT, NVARCHAR(255), TIME) -- is variable-length NVARCHAR alone the trigger?
{
let id = "H4".to_owned();
let col_type = "(BIGINT, NVARCHAR(255), TIME(7)) -- NVARCHAR alone before TIME".to_owned();
let val_desc = "Time(scale=7) at colid=3; NVARCHAR(255) at colid=2".to_owned();
let mut c = match connect().await {
Ok(c) => c,
Err(e) => { results.push(fail(id, col_type, val_desc, format!("connect: {e}"))); return results; }
};
let table = "#h4";
let _ = c.simple_query(format!("IF OBJECT_ID('tempdb..{table}') IS NOT NULL DROP TABLE {table}")).await;
let _ = c.simple_query(format!("CREATE TABLE {table} ([id] BIGINT NULL, [s] NVARCHAR(255) NULL, [t] TIME(7) NULL)")).await;
use tiberius::time::Time as TibTime;
let err: Option<String> = 'bcp: {
let mut req = match c.bulk_insert(table).await { Ok(r) => r, Err(e) => break 'bcp Some(format!("bulk_insert(): {e}")) };
let mut row = TokenRow::new();
row.push(ColumnData::I64(Some(1)));
row.push(ColumnData::String(Some(Cow::Borrowed("hello"))));
row.push(ColumnData::Time(Some(TibTime::new(hns, 7))));
if let Err(e) = req.send(row).await { break 'bcp Some(format!("send(): {e}")) }
if let Err(e) = req.finalize().await { break 'bcp Some(format!("finalize(): {e}")) }
None
};
if let Some(e) = err { results.push(fail(id, col_type, val_desc, e)); }
else {
let rb = select_scalar(&mut c, &format!("SELECT CONVERT(NVARCHAR(50),[t],121) FROM {table}")).await;
results.push(TestResult { id, col_type, val_desc, outcome: Outcome::Pass { read_back: rb } });
}
}
// H5: (BIGINT, BIT, TIME) -- is BIT alone the trigger?
{
let id = "H5".to_owned();
let col_type = "(BIGINT, BIT, TIME(7)) -- BIT alone before TIME".to_owned();
let val_desc = "Time(scale=7) at colid=3; BIT at colid=2".to_owned();
let mut c = match connect().await {
Ok(c) => c,
Err(e) => { results.push(fail(id, col_type, val_desc, format!("connect: {e}"))); return results; }
};
let table = "#h5";
let _ = c.simple_query(format!("IF OBJECT_ID('tempdb..{table}') IS NOT NULL DROP TABLE {table}")).await;
let _ = c.simple_query(format!("CREATE TABLE {table} ([id] BIGINT NULL, [b] BIT NULL, [t] TIME(7) NULL)")).await;
use tiberius::time::Time as TibTime;
let err: Option<String> = 'bcp: {
let mut req = match c.bulk_insert(table).await { Ok(r) => r, Err(e) => break 'bcp Some(format!("bulk_insert(): {e}")) };
let mut row = TokenRow::new();
row.push(ColumnData::I64(Some(1)));
row.push(ColumnData::Bit(Some(true)));
row.push(ColumnData::Time(Some(TibTime::new(hns, 7))));
if let Err(e) = req.send(row).await { break 'bcp Some(format!("send(): {e}")) }
if let Err(e) = req.finalize().await { break 'bcp Some(format!("finalize(): {e}")) }
None
};
if let Some(e) = err { results.push(fail(id, col_type, val_desc, e)); }
else {
let rb = select_scalar(&mut c, &format!("SELECT CONVERT(NVARCHAR(50),[t],121) FROM {table}")).await;
results.push(TestResult { id, col_type, val_desc, outcome: Outcome::Pass { read_back: rb } });
}
}
results
}
// -- helpers -------------------------------------------------------------------
fn fail(id: String, col_type: String, val_desc: String, error: String) -> TestResult {
TestResult { id, col_type, val_desc, outcome: Outcome::Fail { error } }
}
async fn select_scalar(c: &mut Conn, sql: &str) -> String {
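match c.simple_query(sql).await {
// The placeholder strings lost their original text; these are stand-ins that keep the error visible.
Err(e) => format!("<query error: {e}>"),
Ok(s) => match s.into_results().await {
Err(e) => format!("<results error: {e}>"),
Ok(results) => results
.first()
.and_then(|rows| rows.first())
.and_then(|row| row.get::<&str, _>(0))
.unwrap_or("<NULL>")
.to_owned(),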
},
}
}
fn separator() {
println!("{}", "-".repeat(120));
}
// -- main ----------------------------------------------------------------------
#[tokio::main]
async fn main() -> Result<()> {
{
let host = env("MSSQL_HOST", "localhost");
let port = env("MSSQL_PORT", "1433");
let db = env("MSSQL_DATABASE", "etl_target");
let user = env("MSSQL_USER", "sa");
println!("Connecting to mssql://{}@{}:{}/{} (trust_cert) ...", user, host, port, db);
let mut c = connect().await?;
let ver = select_scalar(&mut c, "SELECT @@VERSION").await;
println!("Server : {}\n", ver.lines().next().unwrap_or(&ver));
}
let ts = "02:11:50.123456"; // 15 chars
separator();
println!(" {:>3} {:<36} {:<50} {} {}", "ID", "COL TYPE", "ColumnData", "RESULT", "detail");
separator();
// A: baseline plain strings
println!("\n-- A baseline strings -- all should PASS ----------------------------------------------------\n");
let a: Vec<TestResult> = tokio::join!(
bcp_single("A1", "NVARCHAR(MAX)", ColumnData::String(Some(Cow::Owned("hello".into()))), "String(\"hello\")"),
bcp_single("A2", "NVARCHAR(50)", ColumnData::String(Some(Cow::Owned("hello".into()))), "String(\"hello\")"),
bcp_single("A3", "NVARCHAR(20)", ColumnData::String(Some(Cow::Owned("hello".into()))), "String(\"hello\")"),
bcp_single("A4", "NVARCHAR(MAX)", ColumnData::String(None), "String(None/NULL)"),
).into_vec();
for r in &a { r.print(); }
// B: time-formatted strings into NVARCHAR
println!("\n-- B time-formatted strings into NVARCHAR -- does bounded width matter? -------------------\n");
let b4_desc = format!("String(ts) -> NVARCHAR(15) (ts is {} chars)", ts.len());
let b: Vec<TestResult> = tokio::join!(
bcp_single("B1", "NVARCHAR(MAX)", ColumnData::String(Some(Cow::Owned(ts.into()))), "String(ts) -> NVARCHAR(MAX)"),
bcp_single("B2", "NVARCHAR(50)", ColumnData::String(Some(Cow::Owned(ts.into()))), "String(ts) -> NVARCHAR(50)"),
bcp_single("B3", "NVARCHAR(20)", ColumnData::String(Some(Cow::Owned(ts.into()))), "String(ts) -> NVARCHAR(20)"),
bcp_single("B4", "NVARCHAR(15)", ColumnData::String(Some(Cow::Owned(ts.into()))), &b4_desc),
).into_vec();
for r in &b { r.print(); }
// C: ColumnData::Time into single-column TIME tables
println!("\n-- C ColumnData::Time into single-column TIME tables ----------------------------------------\n");
println!(" C1/C2 pass (TIME(7) target, scale 7 or 6). C3/C4 fail: column scale mismatch.\n");
let us: u64 = 2 * 3_600_000_000 + 11 * 60_000_000 + 50_000_000 + 123_456;
let hns: u64 = us * 10;
let secs: u64 = 2 * 3600 + 11 * 60 + 50;
let c1 = bcp_single("C1", "TIME(7)", ColumnData::Time(Some(TibTime::new(hns, 7))), "Time(TibTime scale=7 / 100ns)").await;
let c2 = bcp_single("C2", "TIME(7)", ColumnData::Time(Some(TibTime::new(us, 6))), "Time(TibTime scale=6 / us) -> TIME(7)").await;
let c3 = bcp_single("C3", "TIME(6)", ColumnData::Time(Some(TibTime::new(us, 6))), "Time(TibTime scale=6 / us) -> TIME(6)").await;
let c4 = bcp_single("C4", "TIME(0)", ColumnData::Time(Some(TibTime::new(secs, 0))), "Time(TibTime scale=0 / secs) -> TIME(0)").await;
let c5 = bcp_single("C5", "TIME(7)", ColumnData::Time(None), "Time(None / NULL) -> TIME(7)").await;
for r in &[&c1, &c2, &c3, &c4, &c5] { r.print(); }
// D: String directly into TIME(7)
println!("\n-- D ColumnData::String directly into TIME(7) -- expected: FAIL ----------------------------\n");
let d1 = bcp_single("D1", "TIME(7)", ColumnData::String(Some(Cow::Owned(ts.into()))),
"String(ts) -> TIME(7) (no staging)").await;
d1.print();
// E: multi-column NVARCHAR staging row
println!("\n-- E multi-column row (BIGINT + NVARCHAR(MAX)x2) mirroring #etl_bulk_stage ----------------\n");
let e1 = bcp_multi_column().await;
e1.print();
// F: full staging pattern
println!("\n-- F full staging pattern: BCP String->NVARCHAR(MAX), INSERT SELECT CAST(->TIME(7)) --------\n");
let f1 = staging_end_to_end().await;
f1.print();
// G: everything -> single-column real TIME(7) column
println!("\n-- G all strategies vs a real single-column TIME(7) target ----------------------------------\n");
let g = bcp_into_time7_column().await;
for r in &g { r.print(); }
// H: multi-column BCP -- TIME at non-first position + bisection
println!("\n-- H multi-column BCP with TIME at non-first position + bisection ---------------------------\n");
println!(" H1: mixed types (BIGINT, NVARCHAR, BIT, DATE, TIME) -- the etl_lib scenario");
println!(" H2: BIGINT x16 then TIME -- all fixed-length, no mixed types");
println!(" H3: BIGINT + DATE + TIME -- is DateN alone the culprit?");
println!(" H4: BIGINT + NVARCHAR + TIME -- is variable-length NVARCHAR the culprit?");
println!(" H5: BIGINT + BIT + TIME -- is BIT the culprit?\n");
let h = bcp_multi_column_time().await;
for r in &h { r.print(); }
// summary
let all: Vec<&TestResult> = a.iter()
.chain(b.iter())
.chain([&c1, &c2, &c3, &c4, &c5].iter().copied())
.chain([&d1, &e1, &f1].iter().copied())
.chain(g.iter())
.chain(h.iter())
.collect();
let passed = all.iter().filter(|r| r.passed()).count();
let failed = all.iter().filter(|r| !r.passed()).count();
println!();
separator();
println!(" {} passed {} failed ({} total)", passed, failed, all.len());
separator();
println!();
Ok(())
}
// -- helper: turn the 4-tuple of results returned by tokio::join! into a Vec ---
trait IntoVec { fn into_vec(self) -> Vec<TestResult>; }
impl IntoVec for (TestResult, TestResult, TestResult, TestResult) {
fn into_vec(self) -> Vec<TestResult> { vec![self.0, self.1, self.2, self.3] }
}
Hypothesis
This appears to be a bug in the bulk metadata encoding for:
DateN → TimeN
Possibly:
- incorrect column metadata ordering
- incorrect type token
- incorrect length metadata
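To make the "incorrect length metadata" possibility concrete: to the best of my reading of the MS-TDS specification, the TYPE_INFO for DATENTYPE (0x28) is the type byte alone, while TIMENTYPE (0x29) is followed by a one-byte scale and INTNTYPE (0x26) by a one-byte length. An encoder that wrote or skipped an extra byte for DateN would shift everything after it, which could explain why the column following DATE is the one rejected. The sketch below illustrates that layout only. It is not tiberius's code, it omits the other per-column COLMETADATA fields (user type, flags, column name), and it does not establish where the bug actually is; the `BulkColumn` enum and function names are hypothetical.

```rust
/// The three nullable column kinds in the failing schema, as TDS type tokens.
#[derive(Debug, Clone, Copy, PartialEq)]
enum BulkColumn {
    /// INTNTYPE with its maximum length in bytes (8 for BIGINT).
    IntN(u8),
    /// DATENTYPE; carries neither length nor scale in TYPE_INFO.
    DateN,
    /// TIMENTYPE with its scale (0..=7).
    TimeN(u8),
}

/// TYPE_INFO bytes for one column, assuming the MS-TDS layout described above.
fn type_info(col: BulkColumn) -> Vec<u8> {
    match col {
        BulkColumn::IntN(len) => vec![0x26, len],
        BulkColumn::DateN => vec![0x28],
        BulkColumn::TimeN(scale) => vec![0x29, scale],
    }
}

/// Number of value bytes a TIME(scale) occupies in a row, per MS-TDS.
fn time_value_len(scale: u8) -> usize {
    match scale {
        0..=2 => 3,
        3..=4 => 4,
        _ => 5,
    }
}

/// Concatenated TYPE_INFO bytes for a sequence of columns.
fn type_info_stream(cols: &[BulkColumn]) -> Vec<u8> {
    cols.iter().flat_map(|c| type_info(*c)).collect()
}
```

Comparing a packet capture of the failing (BIGINT, DATE, TIME(7)) insert against these expected bytes would be one way to confirm or rule out this hypothesis.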
Impact
This affects ETL pipelines that use bulk_insert() for tables in which a DATE column is followed by a TIME column.
This schema pattern is common in event or audit tables.