From 86a263496211d271a723f183cd682b18fcbaf801 Mon Sep 17 00:00:00 2001 From: Taylor Jones Date: Fri, 20 Mar 2026 14:12:36 -0500 Subject: [PATCH 1/3] rust(feat): Support calculated channels in sift-cli --- rust/crates/sift_cli/src/cli/mod.rs | 12 ++ rust/crates/sift_cli/src/cmd/export.rs | 113 +++++++++++++++++- .../sift_cli/src/util/calculated_channel.rs | 87 ++++++++++++++ rust/crates/sift_cli/src/util/mod.rs | 1 + 4 files changed, 208 insertions(+), 5 deletions(-) create mode 100644 rust/crates/sift_cli/src/util/calculated_channel.rs diff --git a/rust/crates/sift_cli/src/cli/mod.rs b/rust/crates/sift_cli/src/cli/mod.rs index 0a879fb1d..92fc87336 100644 --- a/rust/crates/sift_cli/src/cli/mod.rs +++ b/rust/crates/sift_cli/src/cli/mod.rs @@ -108,6 +108,18 @@ pub struct ExportArgs { #[arg(long)] pub channel_id: Vec, + /// Regular expression used to filter calculated channels to include in the export + #[arg(long)] + pub calc_channel_regex: Option, + + /// Name of calculated channel to include in the export; can be specified multiple times + #[arg(long)] + pub calc_channel: Vec, + + /// ID of calculated channel to include in the export; can be specified multiple times + #[arg(long)] + pub calc_channel_id: Vec, + /// Start time in RFC 3339 format (required for asset exports) #[arg(long)] pub start: Option, diff --git a/rust/crates/sift_cli/src/cmd/export.rs b/rust/crates/sift_cli/src/cmd/export.rs index 0b69e3339..064c15d0a 100644 --- a/rust/crates/sift_cli/src/cmd/export.rs +++ b/rust/crates/sift_cli/src/cmd/export.rs @@ -17,8 +17,8 @@ use sift_rs::{ SiftChannel, assets::v1::{ListAssetsRequest, ListAssetsResponse, asset_service_client::AssetServiceClient}, exports::v1::{ - AssetsAndTimeRange, ExportDataRequest, ExportDataResponse, ExportOutputFormat, - GetDownloadUrlRequest, GetDownloadUrlResponse, RunsAndTimeRange, + AssetsAndTimeRange, CalculatedChannelConfig, ExportDataRequest, ExportDataResponse, + ExportOutputFormat, GetDownloadUrlRequest, GetDownloadUrlResponse, RunsAndTimeRange, export_data_request::TimeSelection, export_service_client::ExportServiceClient, }, jobs::v1::JobStatus, @@ -30,8 +30,14 @@ use zip::ZipArchive; use crate::{ cli::{ExportAssetArgs, ExportRunArgs}, util::{ - api::create_grpc_channel, channel::filter_channels, job::JobServiceWrapper, - progress::Spinner, tty::Output, + api::create_grpc_channel, + calculated_channel::{ + channel_applies_to_assets, filter_calculated_channels, to_calculated_channel_config, + }, + channel::filter_channels, + job::JobServiceWrapper, + progress::Spinner, + tty::Output, }, }; @@ -120,6 +126,53 @@ pub async fn run(ctx: Context, args: ExportRunArgs) -> Result { } } + let mut calculated_channel_configs: Vec = Vec::new(); + + if !args.common.calc_channel.is_empty() { + let names_cel = args + .common + .calc_channel + .iter() + .map(|c| format!("'{c}'")) + .collect::>() + .join(","); + let filter = format!("name in [{names_cel}]"); + let query_res = filter_calculated_channels(grpc_channel.clone(), &filter).await?; + + for channel in query_res { + if channel_applies_to_assets(&channel, &run.asset_ids) { + calculated_channel_configs.push(to_calculated_channel_config(channel)?); + } + } + } + + if let Some(re) = args.common.calc_channel_regex { + let filter = format!("name.matches(\"{re}\")"); + let query_res = filter_calculated_channels(grpc_channel.clone(), &filter).await?; + + for channel in query_res { + if channel_applies_to_assets(&channel, &run.asset_ids) { + calculated_channel_configs.push(to_calculated_channel_config(channel)?); + } + } + } + + if !args.common.calc_channel_id.is_empty() { + let ids_cel = args + .common + .calc_channel_id + .iter() + .map(|id| format!("'{id}'")) + .collect::>() + .join(","); + let filter = format!("calculated_channel_id in [{ids_cel}]"); + let query_res = filter_calculated_channels(grpc_channel.clone(), &filter).await?; + + for channel in query_res { + calculated_channel_configs.push(to_calculated_channel_config(channel)?); + } + } + let start_time = args .common .start @@ -144,6 +197,7 @@ pub async fn run(ctx: Context, args: ExportRunArgs) -> Result { let export_req = ExportDataRequest { channel_ids, + calculated_channel_configs, output_format: ExportOutputFormat::from(args.common.format).into(), time_selection: Some(TimeSelection::RunsAndTimeRange(RunsAndTimeRange { start_time, @@ -227,11 +281,60 @@ pub async fn asset(ctx: Context, args: ExportAssetArgs) -> Result { } } + let asset_ids = vec![asset_id.to_string()]; + let mut calculated_channel_configs: Vec = Vec::new(); + + if !args.common.calc_channel.is_empty() { + let names_cel = args + .common + .calc_channel + .iter() + .map(|c| format!("'{c}'")) + .collect::>() + .join(","); + let filter = format!("name in [{names_cel}]"); + let query_res = filter_calculated_channels(grpc_channel.clone(), &filter).await?; + + for channel in query_res { + if channel_applies_to_assets(&channel, &asset_ids) { + calculated_channel_configs.push(to_calculated_channel_config(channel)?); + } + } + } + + if let Some(re) = args.common.calc_channel_regex { + let filter = format!("name.matches(\"{re}\")"); + let query_res = filter_calculated_channels(grpc_channel.clone(), &filter).await?; + + for channel in query_res { + if channel_applies_to_assets(&channel, &asset_ids) { + calculated_channel_configs.push(to_calculated_channel_config(channel)?); + } + } + } + + if !args.common.calc_channel_id.is_empty() { + let ids_cel = args + .common + .calc_channel_id + .iter() + .map(|id| format!("'{id}'")) + .collect::>() + .join(","); + let filter = format!("calculated_channel_id in [{ids_cel}]"); + let query_res = filter_calculated_channels(grpc_channel.clone(), &filter).await?; + + for channel in query_res { + calculated_channel_configs.push(to_calculated_channel_config(channel)?); + } + } + let export_req = ExportDataRequest { channel_ids, + calculated_channel_configs, output_format: ExportOutputFormat::from(args.common.format).into(), time_selection: Some(TimeSelection::AssetsAndTimeRange(AssetsAndTimeRange { - asset_ids: vec![asset_id.to_string()], + asset_ids, start_time: Some(start_time), stop_time: Some(stop_time), })), diff --git a/rust/crates/sift_cli/src/util/calculated_channel.rs b/rust/crates/sift_cli/src/util/calculated_channel.rs new file mode 100644 index 000000000..413de0f6e --- /dev/null +++ b/rust/crates/sift_cli/src/util/calculated_channel.rs @@ -0,0 +1,87 @@ +use anyhow::{Context, Result, anyhow}; +use sift_rs::{ + SiftChannel, + calculated_channels::v2::{ + CalculatedChannel, ListCalculatedChannelsRequest, ListCalculatedChannelsResponse, + calculated_channel_query_configuration::Query, + calculated_channel_service_client::CalculatedChannelServiceClient, + }, + exports::v1::CalculatedChannelConfig, +}; + +pub async fn filter_calculated_channels( + grpc_channel: SiftChannel, + filter: &str, +) -> Result> { + let mut service = CalculatedChannelServiceClient::new(grpc_channel); + let mut page_token = String::new(); + let mut query_result = Vec::new(); + + loop { + let ListCalculatedChannelsResponse { + calculated_channels, + next_page_token, + .. + } = service + .list_calculated_channels(ListCalculatedChannelsRequest { + page_token, + filter: filter.to_string(), + page_size: 1000, + ..Default::default() + }) + .await + .context("failed to query calculated channels")? + .into_inner(); + + query_result.extend(calculated_channels.into_iter()); + + if next_page_token.is_empty() { + break; + } + page_token = next_page_token; + } + Ok(query_result) +} + +pub fn channel_applies_to_assets(channel: &CalculatedChannel, asset_ids: &[String]) -> bool { + use sift_rs::calculated_channels::v2::calculated_channel_asset_configuration::AssetScope; + + let Some(config) = &channel.calculated_channel_configuration else { + return true; + }; + let Some(asset_config) = &config.asset_configuration else { + return true; + }; + match &asset_config.asset_scope { + None => true, + Some(AssetScope::AllAssets(_)) => true, + Some(AssetScope::Selection(selection)) => { + selection.asset_ids.iter().any(|id| asset_ids.contains(id)) + } + } +} + +pub fn to_calculated_channel_config(channel: CalculatedChannel) -> Result { + let name = channel.name.clone(); + let units = channel.units.clone(); + + let config = channel + .calculated_channel_configuration + .ok_or_else(|| anyhow!("calculated channel '{name}' has no configuration"))?; + + let query_config = config + .query_configuration + .ok_or_else(|| anyhow!("calculated channel '{name}' has no query configuration"))?; + + let sel = match query_config.query { + Some(Query::Sel(sel)) => sel, + None => return Err(anyhow!("calculated channel '{name}' has no query")), + }; + + Ok(CalculatedChannelConfig { + name, + expression: sel.expression, + channel_references: sel.expression_channel_references, + units, + }) +} diff --git a/rust/crates/sift_cli/src/util/mod.rs b/rust/crates/sift_cli/src/util/mod.rs index 3165a95f8..e1e3fe2f3 100644 --- a/rust/crates/sift_cli/src/util/mod.rs +++ b/rust/crates/sift_cli/src/util/mod.rs @@ -1,4 +1,5 @@ pub mod api; +pub mod calculated_channel; pub mod channel; pub mod job; pub mod progress; From 8c418293b11ed391692160b98fcf93fd5dc1e9bc Mon Sep 17 00:00:00 2001 From: Taylor Jones Date: Fri, 20 Mar 2026 15:40:52 -0500 Subject: [PATCH 2/3] rust(feat): Add calculated-channel export support --- rust/crates/sift_cli/src/cmd/export.rs | 35 ++++-- .../sift_cli/src/util/calculated_channel.rs | 110 ++++++++++++++---- 2 files changed, 115 insertions(+), 30 deletions(-) diff --git a/rust/crates/sift_cli/src/cmd/export.rs b/rust/crates/sift_cli/src/cmd/export.rs index 064c15d0a..4f9b4170e 100644 --- a/rust/crates/sift_cli/src/cmd/export.rs +++ b/rust/crates/sift_cli/src/cmd/export.rs @@ -32,7 +32,8 @@ use crate::{ util::{ api::create_grpc_channel, calculated_channel::{ - channel_applies_to_assets, filter_calculated_channels, to_calculated_channel_config, + ResolveScope, channel_applies_to_assets, filter_calculated_channels, + resolve_to_calculated_channel_configs, }, channel::filter_channels, job::JobServiceWrapper, @@ -126,6 +127,7 @@ pub async fn run(ctx: Context, args: ExportRunArgs) -> Result { } } + let scope = ResolveScope::Run(&run.run_id); let mut calculated_channel_configs: Vec = Vec::new(); if !args.common.calc_channel.is_empty() { @@ -141,7 +143,10 @@ pub async fn run(ctx: Context, args: ExportRunArgs) -> Result { for channel in query_res { if channel_applies_to_assets(&channel, &run.asset_ids) { - calculated_channel_configs.push(to_calculated_channel_config(channel)?); + let configs = + resolve_to_calculated_channel_configs(grpc_channel.clone(), &channel, &scope) + .await?; + calculated_channel_configs.extend(configs); } } } @@ -152,7 +157,10 @@ pub async fn run(ctx: Context, args: ExportRunArgs) -> Result { for channel in query_res { if channel_applies_to_assets(&channel, &run.asset_ids) { - calculated_channel_configs.push(to_calculated_channel_config(channel)?); + let configs = + resolve_to_calculated_channel_configs(grpc_channel.clone(), &channel, &scope) + .await?; + calculated_channel_configs.extend(configs); } } } @@ -169,7 +177,10 @@ pub async fn run(ctx: Context, args: ExportRunArgs) -> Result { let query_res = filter_calculated_channels(grpc_channel.clone(), &filter).await?; for channel in query_res { - calculated_channel_configs.push(to_calculated_channel_config(channel)?); + let configs = + resolve_to_calculated_channel_configs(grpc_channel.clone(), &channel, &scope) + .await?; + calculated_channel_configs.extend(configs); } } @@ -282,6 +293,7 @@ pub async fn asset(ctx: Context, args: ExportAssetArgs) -> Result { } let asset_ids = vec![asset_id.to_string()]; + let scope = ResolveScope::Assets(&asset_ids); let mut calculated_channel_configs: Vec = Vec::new(); if !args.common.calc_channel.is_empty() { @@ -297,7 +309,10 @@ pub async fn asset(ctx: Context, args: ExportAssetArgs) -> Result { for channel in query_res { if channel_applies_to_assets(&channel, &asset_ids) { - calculated_channel_configs.push(to_calculated_channel_config(channel)?); + let configs = + resolve_to_calculated_channel_configs(grpc_channel.clone(), &channel, &scope) + .await?; + calculated_channel_configs.extend(configs); } } } @@ -308,7 +323,10 @@ pub async fn asset(ctx: Context, args: ExportAssetArgs) -> Result { for channel in query_res { if channel_applies_to_assets(&channel, &asset_ids) { - calculated_channel_configs.push(to_calculated_channel_config(channel)?); + let configs = + resolve_to_calculated_channel_configs(grpc_channel.clone(), &channel, &scope) + .await?; + calculated_channel_configs.extend(configs); } } } @@ -325,7 +343,10 @@ pub async fn asset(ctx: Context, args: ExportAssetArgs) -> Result { let query_res = filter_calculated_channels(grpc_channel.clone(), &filter).await?; for channel in query_res { - calculated_channel_configs.push(to_calculated_channel_config(channel)?); + let configs = + resolve_to_calculated_channel_configs(grpc_channel.clone(), &channel, &scope) + .await?; + calculated_channel_configs.extend(configs); } } diff --git a/rust/crates/sift_cli/src/util/calculated_channel.rs b/rust/crates/sift_cli/src/util/calculated_channel.rs index 413de0f6e..90ca941ed 100644 --- a/rust/crates/sift_cli/src/util/calculated_channel.rs +++ b/rust/crates/sift_cli/src/util/calculated_channel.rs @@ -2,13 +2,24 @@ use anyhow::{Context, Result, anyhow}; use sift_rs::{ SiftChannel, calculated_channels::v2::{ - CalculatedChannel, ListCalculatedChannelsRequest, ListCalculatedChannelsResponse, - calculated_channel_query_configuration::Query, + CalculatedChannel, CalculatedChannelAbstractChannelReference, + ListCalculatedChannelsRequest, ListCalculatedChannelsResponse, + ResolveCalculatedChannelRequest, calculated_channel_asset_configuration::AssetScope, calculated_channel_service_client::CalculatedChannelServiceClient, + resolve_calculated_channel_request::CalculatedChannel as RequestCalculatedChannel, + }, + common::r#type::v1::{ + Ids, NamedResources, ResourceIdentifier, named_resources::Resources, + resource_identifier::Identifier, }, exports::v1::CalculatedChannelConfig, }; +pub enum ResolveScope<'a> { + Run(&'a str), + Assets(&'a [String]), +} + pub async fn filter_calculated_channels( grpc_channel: SiftChannel, filter: &str, @@ -44,8 +55,6 @@ pub async fn filter_calculated_channels( } pub fn channel_applies_to_assets(channel: &CalculatedChannel, asset_ids: &[String]) -> bool { - use sift_rs::calculated_channels::v2::calculated_channel_asset_configuration::AssetScope; - let Some(config) = &channel.calculated_channel_configuration else { return true; }; @@ -61,27 +70,82 @@ pub fn channel_applies_to_assets(channel: &CalculatedChannel, asset_ids: &[Strin } } -pub fn to_calculated_channel_config(channel: CalculatedChannel) -> Result { - let name = channel.name.clone(); - let units = channel.units.clone(); +pub async fn resolve_to_calculated_channel_configs( + grpc_channel: SiftChannel, + channel: &CalculatedChannel, + scope: &ResolveScope<'_>, +) -> Result> { + let mut service = CalculatedChannelServiceClient::new(grpc_channel); + + let (assets, run) = match scope { + ResolveScope::Run(run_id) => ( + None, + Some(ResourceIdentifier { + identifier: Some(Identifier::Id(run_id.to_string())), + }), + ), + ResolveScope::Assets(asset_ids) => ( + Some(NamedResources { + resources: Some(Resources::Ids(Ids { + ids: asset_ids.to_vec(), + })), + }), + None, + ), + }; + + let response = service + .resolve_calculated_channel(ResolveCalculatedChannelRequest { + assets, + run, + calculated_channel: Some(RequestCalculatedChannel::Identifier(ResourceIdentifier { + identifier: Some(Identifier::Id(channel.calculated_channel_id.clone())), + })), + ..Default::default() + }) + .await + .with_context(|| format!("failed to resolve calculated channel '{}'", channel.name))? + .into_inner(); - let config = channel - .calculated_channel_configuration - .ok_or_else(|| anyhow!("calculated channel '{name}' has no configuration"))?; + if !response.unresolved.is_empty() { + let assets: Vec<_> = response + .unresolved + .iter() + .map(|u| format!("'{}': {}", u.asset_name, u.error_message)) + .collect(); + return Err(anyhow!( + "calculated channel '{}' could not be resolved for the following assets:\n{}", + channel.name, + assets.join("\n") + )); + } - let query_config = config - .query_configuration - .ok_or_else(|| anyhow!("calculated channel '{name}' has no query configuration"))?; + response + .resolved + .into_iter() + .map(|resolved| { + let expr = resolved.expression_request.ok_or_else(|| { + anyhow!( + "resolved calculated channel '{}' has no expression request", + channel.name + ) + })?; - let sel = match query_config.query { - Some(Query::Sel(sel)) => sel, - None => return Err(anyhow!("calculated channel '{name}' has no query")), - }; + let channel_references = expr + .expression_channel_references + .into_iter() + .map(|r| CalculatedChannelAbstractChannelReference { + channel_reference: r.channel_reference, + channel_identifier: r.channel_id, + }) + .collect(); - Ok(CalculatedChannelConfig { - name, - expression: sel.expression, - channel_references: sel.expression_channel_references, - units, - }) + Ok(CalculatedChannelConfig { + name: channel.name.clone(), + expression: expr.expression, + channel_references, + units: channel.units.clone(), + }) + }) + .collect() } From 240cb2dc9738e68eaf9a5377d5ff57dcd152105a Mon Sep 17 00:00:00 2001 From: Taylor Jones Date: Fri, 20 Mar 2026 17:12:53 -0500 Subject: [PATCH 3/3] PR feedback --- rust/crates/sift_cli/src/cli/mod.rs | 6 +++--- rust/crates/sift_cli/src/cmd/export.rs | 28 +++++++++++++------------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/rust/crates/sift_cli/src/cli/mod.rs b/rust/crates/sift_cli/src/cli/mod.rs index 92fc87336..4af508ae0 100644 --- a/rust/crates/sift_cli/src/cli/mod.rs +++ b/rust/crates/sift_cli/src/cli/mod.rs @@ -110,15 +110,15 @@ pub struct ExportArgs { /// Regular expression used to filter calculated channels to include in the export #[arg(long)] - pub calc_channel_regex: Option, + pub calculated_channel_regex: Option, /// Name of calculated channel to include in the export; can be specified multiple times #[arg(long)] - pub calc_channel: Vec, + pub calculated_channel: Vec, /// ID of calculated channel to include in the export; can be specified multiple times #[arg(long)] - pub calc_channel_id: Vec, + pub calculated_channel_id: Vec, /// Start time in RFC 3339 format (required for asset exports) #[arg(long)] diff --git a/rust/crates/sift_cli/src/cmd/export.rs b/rust/crates/sift_cli/src/cmd/export.rs index 4f9b4170e..90340e85f 100644 --- a/rust/crates/sift_cli/src/cmd/export.rs +++ b/rust/crates/sift_cli/src/cmd/export.rs @@ -17,8 +17,8 @@ use sift_rs::{ SiftChannel, assets::v1::{ListAssetsRequest, ListAssetsResponse, asset_service_client::AssetServiceClient}, exports::v1::{ - AssetsAndTimeRange, CalculatedChannelConfig, ExportDataRequest, ExportDataResponse, - ExportOutputFormat, GetDownloadUrlRequest, GetDownloadUrlResponse, RunsAndTimeRange, + AssetsAndTimeRange, ExportDataRequest, ExportDataResponse, ExportOutputFormat, + GetDownloadUrlRequest, GetDownloadUrlResponse, RunsAndTimeRange, export_data_request::TimeSelection, export_service_client::ExportServiceClient, }, jobs::v1::JobStatus, @@ -128,12 +128,12 @@ pub async fn run(ctx: Context, args: ExportRunArgs) -> Result { } let scope = ResolveScope::Run(&run.run_id); - let mut calculated_channel_configs: Vec = Vec::new(); + let mut calculated_channel_configs = Vec::new(); - if !args.common.calc_channel.is_empty() { + if !args.common.calculated_channel.is_empty() { let names_cel = args .common - .calc_channel + .calculated_channel .iter() .map(|c| format!("'{c}'")) .collect::>() @@ -151,7 +151,7 @@ pub async fn run(ctx: Context, args: ExportRunArgs) -> Result { } } - if let Some(re) = args.common.calc_channel_regex { + if let Some(re) = args.common.calculated_channel_regex { let filter = format!("name.matches(\"{re}\")"); let query_res = filter_calculated_channels(grpc_channel.clone(), &filter).await?; @@ -165,10 +165,10 @@ pub async fn run(ctx: Context, args: ExportRunArgs) -> Result { } } - if !args.common.calc_channel_id.is_empty() { + if !args.common.calculated_channel_id.is_empty() { let ids_cel = args .common - .calc_channel_id + .calculated_channel_id .iter() .map(|id| format!("'{id}'")) .collect::>() @@ -294,12 +294,12 @@ pub async fn asset(ctx: Context, args: ExportAssetArgs) -> Result { let asset_ids = vec![asset_id.to_string()]; let scope = ResolveScope::Assets(&asset_ids); - let mut calculated_channel_configs: Vec = Vec::new(); + let mut calculated_channel_configs = Vec::new(); - if !args.common.calc_channel.is_empty() { + if !args.common.calculated_channel.is_empty() { let names_cel = args .common - .calc_channel + .calculated_channel .iter() .map(|c| format!("'{c}'")) .collect::>() @@ -317,7 +317,7 @@ pub async fn asset(ctx: Context, args: ExportAssetArgs) -> Result { } } - if let Some(re) = args.common.calc_channel_regex { + if let Some(re) = args.common.calculated_channel_regex { let filter = format!("name.matches(\"{re}\")"); let query_res = filter_calculated_channels(grpc_channel.clone(), &filter).await?; @@ -331,10 +331,10 @@ pub async fn asset(ctx: Context, args: ExportAssetArgs) -> Result { } } - if !args.common.calc_channel_id.is_empty() { + if !args.common.calculated_channel_id.is_empty() { let ids_cel = args .common - .calc_channel_id + .calculated_channel_id .iter() .map(|id| format!("'{id}'")) .collect::>()