Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions rust/crates/sift_cli/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,18 @@ pub struct ExportArgs {
#[arg(long)]
pub channel_id: Vec<String>,

/// Regular expression used to filter calculated channels to include in the export
#[arg(long)]
pub calculated_channel_regex: Option<String>,

/// Name of calculated channel to include in the export; can be specified multiple times
#[arg(long)]
pub calculated_channel: Vec<String>,

/// ID of calculated channel to include in the export; can be specified multiple times
#[arg(long)]
pub calculated_channel_id: Vec<String>,

/// Start time in RFC 3339 format (required for asset exports)
#[arg(long)]
pub start: Option<String>,
Expand Down
130 changes: 127 additions & 3 deletions rust/crates/sift_cli/src/cmd/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,15 @@ use zip::ZipArchive;
use crate::{
cli::{ExportAssetArgs, ExportRunArgs},
util::{
api::create_grpc_channel, channel::filter_channels, job::JobServiceWrapper,
progress::Spinner, tty::Output,
api::create_grpc_channel,
calculated_channel::{
ResolveScope, channel_applies_to_assets, filter_calculated_channels,
resolve_to_calculated_channel_configs,
},
channel::filter_channels,
job::JobServiceWrapper,
progress::Spinner,
tty::Output,
},
};

Expand Down Expand Up @@ -120,6 +127,63 @@ pub async fn run(ctx: Context, args: ExportRunArgs) -> Result<ExitCode> {
}
}

let scope = ResolveScope::Run(&run.run_id);
let mut calculated_channel_configs = Vec::new();

if !args.common.calculated_channel.is_empty() {
let names_cel = args
.common
.calculated_channel
.iter()
.map(|c| format!("'{c}'"))
.collect::<Vec<_>>()
.join(",");
let filter = format!("name in [{names_cel}]");
let query_res = filter_calculated_channels(grpc_channel.clone(), &filter).await?;

for channel in query_res {
if channel_applies_to_assets(&channel, &run.asset_ids) {
let configs =
resolve_to_calculated_channel_configs(grpc_channel.clone(), &channel, &scope)
.await?;
calculated_channel_configs.extend(configs);
}
}
}

if let Some(re) = args.common.calculated_channel_regex {
let filter = format!("name.matches(\"{re}\")");
let query_res = filter_calculated_channels(grpc_channel.clone(), &filter).await?;

for channel in query_res {
if channel_applies_to_assets(&channel, &run.asset_ids) {
let configs =
resolve_to_calculated_channel_configs(grpc_channel.clone(), &channel, &scope)
.await?;
calculated_channel_configs.extend(configs);
}
}
}

if !args.common.calculated_channel_id.is_empty() {
let ids_cel = args
.common
.calculated_channel_id
.iter()
.map(|id| format!("'{id}'"))
.collect::<Vec<_>>()
.join(",");
let filter = format!("calculated_channel_id in [{ids_cel}]");
let query_res = filter_calculated_channels(grpc_channel.clone(), &filter).await?;

for channel in query_res {
let configs =
resolve_to_calculated_channel_configs(grpc_channel.clone(), &channel, &scope)
.await?;
calculated_channel_configs.extend(configs);
}
}

let start_time = args
.common
.start
Expand All @@ -144,6 +208,7 @@ pub async fn run(ctx: Context, args: ExportRunArgs) -> Result<ExitCode> {

let export_req = ExportDataRequest {
channel_ids,
calculated_channel_configs,
output_format: ExportOutputFormat::from(args.common.format).into(),
time_selection: Some(TimeSelection::RunsAndTimeRange(RunsAndTimeRange {
start_time,
Expand Down Expand Up @@ -227,11 +292,70 @@ pub async fn asset(ctx: Context, args: ExportAssetArgs) -> Result<ExitCode> {
}
}

let asset_ids = vec![asset_id.to_string()];
let scope = ResolveScope::Assets(&asset_ids);
let mut calculated_channel_configs = Vec::new();

if !args.common.calculated_channel.is_empty() {
let names_cel = args
.common
.calculated_channel
.iter()
.map(|c| format!("'{c}'"))
.collect::<Vec<_>>()
.join(",");
let filter = format!("name in [{names_cel}]");
let query_res = filter_calculated_channels(grpc_channel.clone(), &filter).await?;

for channel in query_res {
if channel_applies_to_assets(&channel, &asset_ids) {
let configs =
resolve_to_calculated_channel_configs(grpc_channel.clone(), &channel, &scope)
.await?;
calculated_channel_configs.extend(configs);
}
}
}

if let Some(re) = args.common.calculated_channel_regex {
let filter = format!("name.matches(\"{re}\")");
let query_res = filter_calculated_channels(grpc_channel.clone(), &filter).await?;

for channel in query_res {
if channel_applies_to_assets(&channel, &asset_ids) {
let configs =
resolve_to_calculated_channel_configs(grpc_channel.clone(), &channel, &scope)
.await?;
calculated_channel_configs.extend(configs);
}
}
}

if !args.common.calculated_channel_id.is_empty() {
let ids_cel = args
.common
.calculated_channel_id
.iter()
.map(|id| format!("'{id}'"))
.collect::<Vec<_>>()
.join(",");
let filter = format!("calculated_channel_id in [{ids_cel}]");
let query_res = filter_calculated_channels(grpc_channel.clone(), &filter).await?;

for channel in query_res {
let configs =
resolve_to_calculated_channel_configs(grpc_channel.clone(), &channel, &scope)
.await?;
calculated_channel_configs.extend(configs);
}
}

let export_req = ExportDataRequest {
channel_ids,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a user provides only invalid channels or calculated channels, it looks like we revert back to exporting the whole run if I understand this correctly. Should we in that case just return an error instead? Otherwise we're kicking off a huge job the user probably doesn't want.

calculated_channel_configs,
output_format: ExportOutputFormat::from(args.common.format).into(),
time_selection: Some(TimeSelection::AssetsAndTimeRange(AssetsAndTimeRange {
asset_ids: vec![asset_id.to_string()],
asset_ids,
start_time: Some(start_time),
stop_time: Some(stop_time),
})),
Expand Down
151 changes: 151 additions & 0 deletions rust/crates/sift_cli/src/util/calculated_channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
use anyhow::{Context, Result, anyhow};
use sift_rs::{
SiftChannel,
calculated_channels::v2::{
CalculatedChannel, CalculatedChannelAbstractChannelReference,
ListCalculatedChannelsRequest, ListCalculatedChannelsResponse,
ResolveCalculatedChannelRequest, calculated_channel_asset_configuration::AssetScope,
calculated_channel_service_client::CalculatedChannelServiceClient,
resolve_calculated_channel_request::CalculatedChannel as RequestCalculatedChannel,
},
common::r#type::v1::{
Ids, NamedResources, ResourceIdentifier, named_resources::Resources,
resource_identifier::Identifier,
},
exports::v1::CalculatedChannelConfig,
};

pub enum ResolveScope<'a> {
Run(&'a str),
Assets(&'a [String]),
}

pub async fn filter_calculated_channels(
grpc_channel: SiftChannel,
filter: &str,
) -> Result<Vec<CalculatedChannel>> {
let mut service = CalculatedChannelServiceClient::new(grpc_channel);
let mut page_token = String::new();
let mut query_result = Vec::new();

loop {
let ListCalculatedChannelsResponse {
calculated_channels,
next_page_token,
..
} = service
.list_calculated_channels(ListCalculatedChannelsRequest {
page_token,
filter: filter.to_string(),
page_size: 1000,
..Default::default()
})
.await
.context("failed to query calculated channels")?
.into_inner();

query_result.extend(calculated_channels.into_iter());

if next_page_token.is_empty() {
break;
}
page_token = next_page_token;
}
Ok(query_result)
}

pub fn channel_applies_to_assets(channel: &CalculatedChannel, asset_ids: &[String]) -> bool {
let Some(config) = &channel.calculated_channel_configuration else {
return true;
};
let Some(asset_config) = &config.asset_configuration else {
return true;
};
match &asset_config.asset_scope {
None => true,
Some(AssetScope::AllAssets(_)) => true,
Some(AssetScope::Selection(selection)) => {
selection.asset_ids.iter().any(|id| asset_ids.contains(id))
}
}
}

pub async fn resolve_to_calculated_channel_configs(
grpc_channel: SiftChannel,
channel: &CalculatedChannel,
scope: &ResolveScope<'_>,
) -> Result<Vec<CalculatedChannelConfig>> {
let mut service = CalculatedChannelServiceClient::new(grpc_channel);

let (assets, run) = match scope {
ResolveScope::Run(run_id) => (
None,
Some(ResourceIdentifier {
identifier: Some(Identifier::Id(run_id.to_string())),
}),
),
ResolveScope::Assets(asset_ids) => (
Some(NamedResources {
resources: Some(Resources::Ids(Ids {
ids: asset_ids.to_vec(),
})),
}),
None,
),
};

let response = service
.resolve_calculated_channel(ResolveCalculatedChannelRequest {
assets,
run,
calculated_channel: Some(RequestCalculatedChannel::Identifier(ResourceIdentifier {
identifier: Some(Identifier::Id(channel.calculated_channel_id.clone())),
})),
..Default::default()
})
.await
.with_context(|| format!("failed to resolve calculated channel '{}'", channel.name))?
.into_inner();

if !response.unresolved.is_empty() {
let assets: Vec<_> = response
.unresolved
.iter()
.map(|u| format!("'{}': {}", u.asset_name, u.error_message))
.collect();
return Err(anyhow!(
"calculated channel '{}' could not be resolved for the following assets:\n{}",
channel.name,
assets.join("\n")
));
}

response
.resolved
.into_iter()
.map(|resolved| {
let expr = resolved.expression_request.ok_or_else(|| {
anyhow!(
"resolved calculated channel '{}' has no expression request",
channel.name
)
})?;

let channel_references = expr
.expression_channel_references
.into_iter()
.map(|r| CalculatedChannelAbstractChannelReference {
channel_reference: r.channel_reference,
channel_identifier: r.channel_id,
})
.collect();

Ok(CalculatedChannelConfig {
name: channel.name.clone(),
expression: expr.expression,
channel_references,
units: channel.units.clone(),
})
})
.collect()
}
1 change: 1 addition & 0 deletions rust/crates/sift_cli/src/util/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod api;
pub mod calculated_channel;
pub mod channel;
pub mod job;
pub mod progress;
Expand Down
Loading