diff --git a/rust/crates/sift_cli/src/cli/mod.rs b/rust/crates/sift_cli/src/cli/mod.rs index 0a879fb1d..4af508ae0 100644 --- a/rust/crates/sift_cli/src/cli/mod.rs +++ b/rust/crates/sift_cli/src/cli/mod.rs @@ -108,6 +108,18 @@ pub struct ExportArgs { #[arg(long)] pub channel_id: Vec, + /// Regular expression used to filter calculated channels to include in the export + #[arg(long)] + pub calculated_channel_regex: Option, + + /// Name of calculated channel to include in the export; can be specified multiple times + #[arg(long)] + pub calculated_channel: Vec, + + /// ID of calculated channel to include in the export; can be specified multiple times + #[arg(long)] + pub calculated_channel_id: Vec, + /// Start time in RFC 3339 format (required for asset exports) #[arg(long)] pub start: Option, diff --git a/rust/crates/sift_cli/src/cmd/export.rs b/rust/crates/sift_cli/src/cmd/export.rs index 0b69e3339..90340e85f 100644 --- a/rust/crates/sift_cli/src/cmd/export.rs +++ b/rust/crates/sift_cli/src/cmd/export.rs @@ -30,8 +30,15 @@ use zip::ZipArchive; use crate::{ cli::{ExportAssetArgs, ExportRunArgs}, util::{ - api::create_grpc_channel, channel::filter_channels, job::JobServiceWrapper, - progress::Spinner, tty::Output, + api::create_grpc_channel, + calculated_channel::{ + ResolveScope, channel_applies_to_assets, filter_calculated_channels, + resolve_to_calculated_channel_configs, + }, + channel::filter_channels, + job::JobServiceWrapper, + progress::Spinner, + tty::Output, }, }; @@ -120,6 +127,63 @@ pub async fn run(ctx: Context, args: ExportRunArgs) -> Result { } } + let scope = ResolveScope::Run(&run.run_id); + let mut calculated_channel_configs = Vec::new(); + + if !args.common.calculated_channel.is_empty() { + let names_cel = args + .common + .calculated_channel + .iter() + .map(|c| format!("'{c}'")) + .collect::>() + .join(","); + let filter = format!("name in [{names_cel}]"); + let query_res = filter_calculated_channels(grpc_channel.clone(), &filter).await?; + + for channel in query_res { + if channel_applies_to_assets(&channel, &run.asset_ids) { + let configs = + resolve_to_calculated_channel_configs(grpc_channel.clone(), &channel, &scope) + .await?; + calculated_channel_configs.extend(configs); + } + } + } + + if let Some(re) = args.common.calculated_channel_regex { + let filter = format!("name.matches(\"{re}\")"); + let query_res = filter_calculated_channels(grpc_channel.clone(), &filter).await?; + + for channel in query_res { + if channel_applies_to_assets(&channel, &run.asset_ids) { + let configs = + resolve_to_calculated_channel_configs(grpc_channel.clone(), &channel, &scope) + .await?; + calculated_channel_configs.extend(configs); + } + } + } + + if !args.common.calculated_channel_id.is_empty() { + let ids_cel = args + .common + .calculated_channel_id + .iter() + .map(|id| format!("'{id}'")) + .collect::>() + .join(","); + let filter = format!("calculated_channel_id in [{ids_cel}]"); + let query_res = filter_calculated_channels(grpc_channel.clone(), &filter).await?; + + for channel in query_res { + let configs = + resolve_to_calculated_channel_configs(grpc_channel.clone(), &channel, &scope) + .await?; + calculated_channel_configs.extend(configs); + } + } + let start_time = args .common .start @@ -144,6 +208,7 @@ pub async fn run(ctx: Context, args: ExportRunArgs) -> Result { let export_req = ExportDataRequest { channel_ids, + calculated_channel_configs, output_format: ExportOutputFormat::from(args.common.format).into(), time_selection: Some(TimeSelection::RunsAndTimeRange(RunsAndTimeRange { start_time, @@ -227,11 +292,70 @@ pub async fn asset(ctx: Context, args: ExportAssetArgs) -> Result { } } + let asset_ids = vec![asset_id.to_string()]; + let scope = ResolveScope::Assets(&asset_ids); + let mut calculated_channel_configs = Vec::new(); + + if !args.common.calculated_channel.is_empty() { + let names_cel = args + .common + .calculated_channel + .iter() + .map(|c| format!("'{c}'")) + .collect::>() + .join(","); + let filter = format!("name in [{names_cel}]"); + let query_res = filter_calculated_channels(grpc_channel.clone(), &filter).await?; + + for channel in query_res { + if channel_applies_to_assets(&channel, &asset_ids) { + let configs = + resolve_to_calculated_channel_configs(grpc_channel.clone(), &channel, &scope) + .await?; + calculated_channel_configs.extend(configs); + } + } + } + + if let Some(re) = args.common.calculated_channel_regex { + let filter = format!("name.matches(\"{re}\")"); + let query_res = filter_calculated_channels(grpc_channel.clone(), &filter).await?; + + for channel in query_res { + if channel_applies_to_assets(&channel, &asset_ids) { + let configs = + resolve_to_calculated_channel_configs(grpc_channel.clone(), &channel, &scope) + .await?; + calculated_channel_configs.extend(configs); + } + } + } + + if !args.common.calculated_channel_id.is_empty() { + let ids_cel = args + .common + .calculated_channel_id + .iter() + .map(|id| format!("'{id}'")) + .collect::>() + .join(","); + let filter = format!("calculated_channel_id in [{ids_cel}]"); + let query_res = filter_calculated_channels(grpc_channel.clone(), &filter).await?; + + for channel in query_res { + let configs = + resolve_to_calculated_channel_configs(grpc_channel.clone(), &channel, &scope) + .await?; + calculated_channel_configs.extend(configs); + } + } + let export_req = ExportDataRequest { channel_ids, + calculated_channel_configs, output_format: ExportOutputFormat::from(args.common.format).into(), time_selection: Some(TimeSelection::AssetsAndTimeRange(AssetsAndTimeRange { - asset_ids: vec![asset_id.to_string()], + asset_ids, start_time: Some(start_time), stop_time: Some(stop_time), })), diff --git a/rust/crates/sift_cli/src/util/calculated_channel.rs b/rust/crates/sift_cli/src/util/calculated_channel.rs new file mode 100644 index 000000000..90ca941ed --- /dev/null +++ b/rust/crates/sift_cli/src/util/calculated_channel.rs @@ -0,0 +1,151 @@ +use anyhow::{Context, Result, anyhow}; +use sift_rs::{ + SiftChannel, + calculated_channels::v2::{ + CalculatedChannel, CalculatedChannelAbstractChannelReference, + ListCalculatedChannelsRequest, ListCalculatedChannelsResponse, + ResolveCalculatedChannelRequest, calculated_channel_asset_configuration::AssetScope, + calculated_channel_service_client::CalculatedChannelServiceClient, + resolve_calculated_channel_request::CalculatedChannel as RequestCalculatedChannel, + }, + common::r#type::v1::{ + Ids, NamedResources, ResourceIdentifier, named_resources::Resources, + resource_identifier::Identifier, + }, + exports::v1::CalculatedChannelConfig, +}; + +pub enum ResolveScope<'a> { + Run(&'a str), + Assets(&'a [String]), +} + +pub async fn filter_calculated_channels( + grpc_channel: SiftChannel, + filter: &str, +) -> Result> { + let mut service = CalculatedChannelServiceClient::new(grpc_channel); + let mut page_token = String::new(); + let mut query_result = Vec::new(); + + loop { + let ListCalculatedChannelsResponse { + calculated_channels, + next_page_token, + .. + } = service + .list_calculated_channels(ListCalculatedChannelsRequest { + page_token, + filter: filter.to_string(), + page_size: 1000, + ..Default::default() + }) + .await + .context("failed to query calculated channels")? + .into_inner(); + + query_result.extend(calculated_channels.into_iter()); + + if next_page_token.is_empty() { + break; + } + page_token = next_page_token; + } + Ok(query_result) +} + +pub fn channel_applies_to_assets(channel: &CalculatedChannel, asset_ids: &[String]) -> bool { + let Some(config) = &channel.calculated_channel_configuration else { + return true; + }; + let Some(asset_config) = &config.asset_configuration else { + return true; + }; + match &asset_config.asset_scope { + None => true, + Some(AssetScope::AllAssets(_)) => true, + Some(AssetScope::Selection(selection)) => { + selection.asset_ids.iter().any(|id| asset_ids.contains(id)) + } + } +} + +pub async fn resolve_to_calculated_channel_configs( + grpc_channel: SiftChannel, + channel: &CalculatedChannel, + scope: &ResolveScope<'_>, +) -> Result> { + let mut service = CalculatedChannelServiceClient::new(grpc_channel); + + let (assets, run) = match scope { + ResolveScope::Run(run_id) => ( + None, + Some(ResourceIdentifier { + identifier: Some(Identifier::Id(run_id.to_string())), + }), + ), + ResolveScope::Assets(asset_ids) => ( + Some(NamedResources { + resources: Some(Resources::Ids(Ids { + ids: asset_ids.to_vec(), + })), + }), + None, + ), + }; + + let response = service + .resolve_calculated_channel(ResolveCalculatedChannelRequest { + assets, + run, + calculated_channel: Some(RequestCalculatedChannel::Identifier(ResourceIdentifier { + identifier: Some(Identifier::Id(channel.calculated_channel_id.clone())), + })), + ..Default::default() + }) + .await + .with_context(|| format!("failed to resolve calculated channel '{}'", channel.name))? + .into_inner(); + + if !response.unresolved.is_empty() { + let assets: Vec<_> = response + .unresolved + .iter() + .map(|u| format!("'{}': {}", u.asset_name, u.error_message)) + .collect(); + return Err(anyhow!( + "calculated channel '{}' could not be resolved for the following assets:\n{}", + channel.name, + assets.join("\n") + )); + } + + response + .resolved + .into_iter() + .map(|resolved| { + let expr = resolved.expression_request.ok_or_else(|| { + anyhow!( + "resolved calculated channel '{}' has no expression request", + channel.name + ) + })?; + + let channel_references = expr + .expression_channel_references + .into_iter() + .map(|r| CalculatedChannelAbstractChannelReference { + channel_reference: r.channel_reference, + channel_identifier: r.channel_id, + }) + .collect(); + + Ok(CalculatedChannelConfig { + name: channel.name.clone(), + expression: expr.expression, + channel_references, + units: channel.units.clone(), + }) + }) + .collect() +} diff --git a/rust/crates/sift_cli/src/util/mod.rs b/rust/crates/sift_cli/src/util/mod.rs index 3165a95f8..e1e3fe2f3 100644 --- a/rust/crates/sift_cli/src/util/mod.rs +++ b/rust/crates/sift_cli/src/util/mod.rs @@ -1,4 +1,5 @@ pub mod api; +pub mod calculated_channel; pub mod channel; pub mod job; pub mod progress;