diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index 58fc32e0..005c85d3 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -616,6 +616,31 @@ pub struct VideoFrame { } impl VideoFrame { + /// Validate that `layout` is consistent with the given dimensions/format + /// and that `data_len` is large enough. + fn validate_layout( + width: u32, + height: u32, + pixel_format: PixelFormat, + layout: &VideoLayout, + data_len: usize, + ) -> Result<(), StreamKitError> { + let expected_layout = + VideoLayout::aligned(width, height, pixel_format, layout.stride_align()); + if *layout != expected_layout { + return Err(StreamKitError::Runtime(format!( + "VideoFrame layout mismatch: expected {expected_layout:?}, got {layout:?}" + ))); + } + if data_len < layout.total_bytes() { + return Err(StreamKitError::Runtime(format!( + "VideoFrame data buffer too small: need {} bytes, have {data_len}", + layout.total_bytes(), + ))); + } + Ok(()) + } + pub fn from_pooled( width: u32, height: u32, @@ -635,20 +660,7 @@ impl VideoFrame { mut data: PooledVideoData, metadata: Option, ) -> Result { - let expected_layout = - VideoLayout::aligned(width, height, pixel_format, layout.stride_align()); - if layout != expected_layout { - return Err(StreamKitError::Runtime(format!( - "VideoFrame layout mismatch: expected {expected_layout:?}, got {layout:?}" - ))); - } - if data.len() < layout.total_bytes() { - return Err(StreamKitError::Runtime(format!( - "VideoFrame data buffer too small: need {} bytes, have {}", - layout.total_bytes(), - data.len() - ))); - } + Self::validate_layout(width, height, pixel_format, &layout, data.len())?; data.truncate(layout.total_bytes()); Ok(Self { width, height, pixel_format, layout, data: Arc::new(data), metadata }) } @@ -691,20 +703,7 @@ impl VideoFrame { data: Arc, metadata: Option, ) -> Result { - let expected_layout = - VideoLayout::aligned(width, height, pixel_format, layout.stride_align()); - if layout != expected_layout { - 
return Err(StreamKitError::Runtime(format!( - "VideoFrame layout mismatch: expected {expected_layout:?}, got {layout:?}" - ))); - } - if data.len() < layout.total_bytes() { - return Err(StreamKitError::Runtime(format!( - "VideoFrame data buffer too small: need {} bytes, have {}", - layout.total_bytes(), - data.len() - ))); - } + Self::validate_layout(width, height, pixel_format, &layout, data.len())?; Ok(Self { width, height, pixel_format, layout, data, metadata }) } diff --git a/crates/engine/benches/compositor_only.rs b/crates/engine/benches/compositor_only.rs index 78ff6264..0808f365 100644 --- a/crates/engine/benches/compositor_only.rs +++ b/crates/engine/benches/compositor_only.rs @@ -48,7 +48,7 @@ use streamkit_core::VideoFramePool; use streamkit_nodes::video::compositor::config::Rect; use streamkit_nodes::video::compositor::kernel::{composite_frame, ConversionCache, LayerSnapshot}; use streamkit_nodes::video::compositor::overlay::DecodedOverlay; -use streamkit_nodes::video::compositor::pixel_ops::{rgba8_to_i420_buf, rgba8_to_nv12_buf}; +use streamkit_nodes::video::pixel_ops::{rgba8_to_i420_buf, rgba8_to_nv12_buf}; // ── Default benchmark parameters ──────────────────────────────────────────── diff --git a/crates/engine/benches/pixel_convert.rs b/crates/engine/benches/pixel_convert.rs index b8f17dea..dc16639c 100644 --- a/crates/engine/benches/pixel_convert.rs +++ b/crates/engine/benches/pixel_convert.rs @@ -33,7 +33,7 @@ use std::time::Instant; -use streamkit_nodes::video::compositor::pixel_ops::{ +use streamkit_nodes::video::pixel_ops::{ i420_to_rgba8_buf, nv12_to_rgba8_buf, rgba8_to_i420_buf, rgba8_to_nv12_buf, }; diff --git a/crates/nodes/src/audio/codecs/opus.rs b/crates/nodes/src/audio/codecs/opus.rs index fe53712d..c2cfd262 100644 --- a/crates/nodes/src/audio/codecs/opus.rs +++ b/crates/nodes/src/audio/codecs/opus.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use bytes::Bytes; -use opentelemetry::{global, KeyValue}; +use opentelemetry::global; use 
schemars::JsonSchema; use serde::Deserialize; use std::sync::Arc; @@ -100,13 +100,12 @@ impl ProcessorNode for OpusDecoderNode { Bytes, Option, )>(get_codec_channel_capacity()); - let (result_tx, mut result_rx) = mpsc::channel::<( - Result, - Option, - )>(get_codec_channel_capacity()); + let (result_tx, mut result_rx) = + mpsc::channel::>(get_codec_channel_capacity()); - // Spawn a single blocking task that will handle all decode operations - // Uses blocking_recv/blocking_send for efficiency - no need for block_on + // Spawn a single blocking task that will handle all decode operations. + // Constructs AudioFrame packets directly so the forward loop can use + // the generic codec_forward_loop helper. let decode_task = tokio::task::spawn_blocking(move || { let mut decoder = match opus::Decoder::new(OPUS_SAMPLE_RATE, opus::Channels::Mono) { Ok(d) => d, @@ -117,44 +116,47 @@ impl ProcessorNode for OpusDecoderNode { }; // Reusable decode buffer - avoids allocation per frame (~7.5KB savings per decode) - // This buffer lives for the lifetime of the decode task let mut decode_buffer = vec![0f32; OPUS_MAX_FRAME_SIZE]; - // Use blocking_recv - efficient for spawn_blocking context while let Some((data, metadata)) = decode_rx.blocking_recv() { let decode_start_time = Instant::now(); - let result = { - // Note: No need to zero the buffer - opus writes to it and we only - // copy out decoded_len samples, so stale data is never read. 
- match decoder.decode_float(&data, &mut decode_buffer, false) { - Ok(decoded_len) => audio_pool.as_ref().map_or_else( - || Ok(PooledSamples::from_vec(decode_buffer[..decoded_len].to_vec())), + let result = match decoder.decode_float(&data, &mut decode_buffer, false) { + Ok(decoded_len) => { + // Skip empty decode results + if decoded_len == 0 { + decode_duration_histogram + .record(decode_start_time.elapsed().as_secs_f64(), &[]); + continue; + } + let samples = audio_pool.as_ref().map_or_else( + || PooledSamples::from_vec(decode_buffer[..decoded_len].to_vec()), |pool| { - let mut samples = pool.get(decoded_len); - samples - .as_mut_slice() - .copy_from_slice(&decode_buffer[..decoded_len]); - Ok(samples) + let mut s = pool.get(decoded_len); + s.as_mut_slice().copy_from_slice(&decode_buffer[..decoded_len]); + s }, - ), - Err(e) => Err(e.to_string()), - } + ); + Ok(Packet::Audio(AudioFrame::from_pooled( + OPUS_SAMPLE_RATE, + 1, + samples, + metadata, + ))) + }, + Err(e) => Err(e.to_string()), }; decode_duration_histogram.record(decode_start_time.elapsed().as_secs_f64(), &[]); - // Use blocking_send - efficient for spawn_blocking context - if result_tx.blocking_send((result, metadata)).is_err() { - break; // Main task has shut down + if result_tx.blocking_send(result).is_err() { + break; } } }); state_helpers::emit_running(&context.state_tx, &node_name); - let mut audio_packet_count = 0; - // Stats tracking let mut stats_tracker = NodeStatsTracker::new(node_name.clone(), context.stats_tx.clone()); @@ -211,153 +213,22 @@ impl ProcessorNode for OpusDecoderNode { tracing::info!("OpusDecoderNode input stream closed"); }); - // Process results from the blocking task - loop { - tokio::select! 
{ - maybe_result = result_rx.recv() => { - match maybe_result { - Some((Ok(decoded_samples), metadata)) => { - packets_processed_counter.add(1, &[KeyValue::new("status", "ok")]); - stats_tracker.received(); - - if !decoded_samples.is_empty() { - audio_packet_count += 1; - - let output_frame = AudioFrame::from_pooled( - OPUS_SAMPLE_RATE, - 1, - decoded_samples, - metadata, // Propagate metadata from input packet - ); - if context - .output_sender - .send("out", Packet::Audio(output_frame)) - .await - .is_err() - { - tracing::debug!("Output channel closed, stopping node"); - break; - } - stats_tracker.sent(); - } - stats_tracker.maybe_send(); - } - Some((Err(e), _metadata)) => { - packets_processed_counter.add(1, &[KeyValue::new("status", "error")]); - stats_tracker.received(); - stats_tracker.errored(); - stats_tracker.maybe_send(); - tracing::warn!("Decode error for packet: {}", e); - // Don't fail the entire node for decode errors, just skip the packet - } - None => { - // Result channel closed, blocking task is done - break; - } - } - } - Some(control_msg) = context.control_rx.recv() => { - if matches!(control_msg, streamkit_core::control::NodeControlMessage::Shutdown) { - tracing::info!("OpusDecoderNode received shutdown signal"); - // Abort input task - input_task.abort(); - // Abort blocking decode task for immediate shutdown - decode_task.abort(); - // Signal blocking task to shut down (in case it's still running) - drop(decode_tx); - // Break out of main loop - break; - } - // Ignore other control messages - } - _ = &mut input_task => { - // Input task finished, signal blocking task to shut down - drop(decode_tx); - - // Continue processing any remaining results, but also check for shutdown - loop { - tokio::select! 
{ - maybe_result = result_rx.recv() => { - match maybe_result { - Some((Ok(decoded_samples), metadata)) => { - packets_processed_counter.add(1, &[KeyValue::new("status", "ok")]); - stats_tracker.received(); - - if !decoded_samples.is_empty() { - audio_packet_count += 1; - - let output_frame = AudioFrame::from_pooled( - OPUS_SAMPLE_RATE, - 1, - decoded_samples, - metadata, // Propagate metadata - ); - if context - .output_sender - .send("out", Packet::Audio(output_frame)) - .await - .is_err() - { - tracing::debug!("Output channel closed, stopping node"); - break; - } - stats_tracker.sent(); - } - stats_tracker.maybe_send(); - } - Some((Err(e), _metadata)) => { - packets_processed_counter.add(1, &[KeyValue::new("status", "error")]); - stats_tracker.received(); - stats_tracker.errored(); - stats_tracker.maybe_send(); - tracing::warn!("Decode error for packet: {}", e); - } - None => { - // Result channel closed, all results processed - break; - } - } - } - Some(ctrl_msg) = context.control_rx.recv() => { - if matches!(ctrl_msg, streamkit_core::control::NodeControlMessage::Shutdown) { - tracing::info!("OpusDecoderNode received shutdown signal during drain"); - // Abort blocking decode task for immediate shutdown - decode_task.abort(); - break; - } - } - } - } - break; - } - } - } - - // Abort the blocking task if not already aborted (for immediate shutdown) - decode_task.abort(); - - // Wait for the blocking task to complete with timeout (blocking I/O may not abort immediately) - match tokio::time::timeout(std::time::Duration::from_millis(100), decode_task).await { - Ok(Ok(())) => { - // Task completed successfully - }, - Ok(Err(e)) => { - // Task panicked or was aborted - if !e.is_cancelled() { - tracing::error!("Decode task panicked: {}", e); - } - }, - Err(_) => { - // Timeout - blocking task is stuck in I/O, this is expected on abort - tracing::debug!( - "Decode task did not respond to abort within 100ms (stuck in blocking I/O)" - ); - }, - } + 
crate::codec_utils::codec_forward_loop( + &mut context, + &mut result_rx, + &mut input_task, + decode_task, + decode_tx, + &packets_processed_counter, + &mut stats_tracker, + |packet| packet, // Already a Packet, pass through + "OpusDecoderNode", + ) + .await; state_helpers::emit_stopped(&context.state_tx, &node_name, "input_closed"); - tracing::info!("OpusDecoderNode finished after {} audio packets", audio_packet_count); + tracing::info!("OpusDecoderNode finished"); Ok(()) } } @@ -586,114 +457,27 @@ impl ProcessorNode for OpusEncoderNode { tracing::info!("OpusEncoderNode input stream closed after {} frames", frame_count); }); - // Process results from the blocking task - loop { - tokio::select! { - maybe_result = result_rx.recv() => { - match maybe_result { - Some(Ok(encoded_data)) => { - packets_processed_counter.add(1, &[KeyValue::new("status", "ok")]); - stats_tracker.received(); - - // Calculate packet duration: Opus typically uses 20ms frames at 48kHz - // (960 samples per frame). Set duration for pacing nodes downstream. 
- let duration_us = 20_000u64; // 20ms = 20,000 microseconds - - let output_packet = Packet::Binary { - data: Bytes::from(encoded_data), - content_type: None, // Opus packets don't have a content-type - metadata: Some(streamkit_core::types::PacketMetadata { - timestamp_us: None, // No absolute timestamp - duration_us: Some(duration_us), - sequence: None, // No sequence tracking yet - keyframe: None, - }), - }; - if context - .output_sender - .send("out", output_packet) - .await - .is_err() - { - tracing::debug!("Output channel closed, stopping node"); - break; - } - stats_tracker.sent(); - stats_tracker.maybe_send(); - } - Some(Err(e)) => { - packets_processed_counter.add(1, &[KeyValue::new("status", "error")]); - stats_tracker.received(); - stats_tracker.errored(); - stats_tracker.maybe_send(); - tracing::error!("Encode error: {}", e); - // Don't fail the entire pipeline for encode errors (e.g., last frame with invalid size) - // Just skip the frame and continue - } - None => { - // Result channel closed, blocking task is done - break; - } - } - } - Some(control_msg) = context.control_rx.recv() => { - if matches!(control_msg, streamkit_core::control::NodeControlMessage::Shutdown) { - tracing::info!("OpusEncoderNode received shutdown signal"); - // Abort input task - input_task.abort(); - // Signal blocking task to shut down - drop(encode_tx); - // Break out of main loop - break; - } - // Ignore other control messages - } - _ = &mut input_task => { - // Input task finished, signal blocking task to shut down - drop(encode_tx); - - // Continue processing any remaining results - while let Some(maybe_result) = result_rx.recv().await { - match maybe_result { - Ok(encoded_data) => { - packets_processed_counter.add(1, &[KeyValue::new("status", "ok")]); - stats_tracker.received(); - - let output_packet = Packet::Binary { - data: Bytes::from(encoded_data), - content_type: None, // Opus packets don't have a content-type - metadata: None, - }; - if context - .output_sender - 
.send("out", output_packet) - .await - .is_err() - { - tracing::debug!("Output channel closed, stopping node"); - break; - } - stats_tracker.sent(); - stats_tracker.maybe_send(); - } - Err(e) => { - packets_processed_counter.add(1, &[KeyValue::new("status", "error")]); - stats_tracker.received(); - stats_tracker.errored(); - stats_tracker.maybe_send(); - tracing::error!("Encode error: {}", e); - // Don't fail the entire pipeline for encode errors (e.g., last frame with invalid size) - // Just skip the frame and continue processing remaining results - } - } - } - break; - } - } - } - - // Wait for the blocking task to complete - let _ = encode_task.await; + crate::codec_utils::codec_forward_loop( + &mut context, + &mut result_rx, + &mut input_task, + encode_task, + encode_tx, + &packets_processed_counter, + &mut stats_tracker, + |encoded_data| Packet::Binary { + data: Bytes::from(encoded_data), + content_type: None, + metadata: Some(streamkit_core::types::PacketMetadata { + timestamp_us: None, + duration_us: Some(20_000), // 20ms Opus frame + sequence: None, + keyframe: None, + }), + }, + "OpusEncoderNode", + ) + .await; state_helpers::emit_stopped(&context.state_tx, &node_name, "input_closed"); diff --git a/crates/nodes/src/codec_utils.rs b/crates/nodes/src/codec_utils.rs new file mode 100644 index 00000000..a65ab154 --- /dev/null +++ b/crates/nodes/src/codec_utils.rs @@ -0,0 +1,117 @@ +// SPDX-FileCopyrightText: © 2025 StreamKit Contributors +// +// SPDX-License-Identifier: MPL-2.0 + +//! Shared async select-loop for codec-style nodes. +//! +//! Many nodes follow the same pattern: a blocking task produces +//! `Result` items, an input task feeds packets into the blocking +//! task, and an async select-loop forwards results to the output sender while +//! handling shutdown and input completion. [`codec_forward_loop`] captures +//! this pattern so individual nodes don't need to duplicate it. 
+ +use opentelemetry::KeyValue; +use streamkit_core::stats::NodeStatsTracker; +use streamkit_core::types::Packet; +use streamkit_core::NodeContext; +use tokio::sync::mpsc; + +/// Shared select-loop that forwards codec results to the output sender. +/// +/// Handles three concurrent events: +/// 1. Results arriving from the blocking codec task. +/// 2. Shutdown control messages. +/// 3. Input task completion (triggers drain of remaining results). +/// +/// `to_packet` converts a codec-specific result `T` into a [`Packet`]. +#[allow(clippy::too_many_arguments)] +pub async fn codec_forward_loop( + context: &mut NodeContext, + result_rx: &mut mpsc::Receiver>, + input_task: &mut tokio::task::JoinHandle<()>, + codec_task: tokio::task::JoinHandle<()>, + codec_tx: mpsc::Sender, + counter: &opentelemetry::metrics::Counter, + stats: &mut NodeStatsTracker, + to_packet: impl Fn(T) -> Packet, + label: &str, +) { + /// Forwards a single successful codec result to the output sender. + /// Returns `true` if the output channel is closed (caller should break). + async fn forward_one( + packet: Packet, + context: &mut NodeContext, + counter: &opentelemetry::metrics::Counter, + stats: &mut NodeStatsTracker, + ) -> bool { + counter.add(1, &[KeyValue::new("status", "ok")]); + stats.received(); + if context.output_sender.send("out", packet).await.is_err() { + tracing::debug!("Output channel closed, stopping node"); + return true; + } + stats.sent(); + stats.maybe_send(); + false + } + + /// Handles a codec error result by updating counters and logging. + fn handle_error( + err: &str, + counter: &opentelemetry::metrics::Counter, + stats: &mut NodeStatsTracker, + label: &str, + ) { + counter.add(1, &[KeyValue::new("status", "error")]); + stats.received(); + stats.errored(); + stats.maybe_send(); + tracing::warn!("{label} codec error: {err}"); + } + + loop { + tokio::select! 
{ + maybe_result = result_rx.recv() => { + match maybe_result { + Some(Ok(item)) => { + if forward_one(to_packet(item), context, counter, stats).await { + break; + } + } + Some(Err(err)) => handle_error(&err, counter, stats, label), + None => break, + } + } + Some(control_msg) = context.control_rx.recv() => { + if matches!(control_msg, streamkit_core::control::NodeControlMessage::Shutdown) { + tracing::info!("{label} received shutdown signal"); + // NOTE: Aborting the input task and dropping codec_tx causes + // the codec thread to exit/flush, but because we break out + // here those flushed results are never sent downstream. + // Data loss on explicit shutdown is acceptable. + input_task.abort(); + codec_task.abort(); + drop(codec_tx); + break; + } + } + _ = &mut *input_task => { + drop(codec_tx); + while let Some(maybe_result) = result_rx.recv().await { + match maybe_result { + Ok(item) => { + if forward_one(to_packet(item), context, counter, stats).await { + break; + } + } + Err(err) => handle_error(&err, counter, stats, label), + } + } + break; + } + } + } + + codec_task.abort(); + let _ = codec_task.await; +} diff --git a/crates/nodes/src/lib.rs b/crates/nodes/src/lib.rs index 724f70b4..76fa5ac4 100644 --- a/crates/nodes/src/lib.rs +++ b/crates/nodes/src/lib.rs @@ -12,6 +12,7 @@ pub mod transport; pub mod video; // Shared utilities +pub mod codec_utils; pub mod streaming_utils; #[cfg(test)] diff --git a/crates/nodes/src/video/compositor/kernel.rs b/crates/nodes/src/video/compositor/kernel.rs index cb34ba62..e902b93a 100644 --- a/crates/nodes/src/video/compositor/kernel.rs +++ b/crates/nodes/src/video/compositor/kernel.rs @@ -13,10 +13,16 @@ use streamkit_core::types::PixelFormat; use super::config::Rect; use super::overlay::DecodedOverlay; -use super::pixel_ops::{ - all_alpha_opaque, i420_to_rgba8_buf, nv12_to_rgba8_buf, scale_blit_rgba_rotated, +use crate::video::pixel_ops::{ + all_alpha_opaque, i420_to_rgba8_buf, nv12_to_rgba8_buf, scale_blit_rgba_rotated, 
BlitRect, }; +impl From for BlitRect { + fn from(r: Rect) -> Self { + Self { x: r.x, y: r.y, width: r.width, height: r.height } + } +} + // ── Compositing kernel (runs on a persistent blocking thread) ──────────────── // ── YUV → RGBA conversion cache ───────────────────────────────────────────── @@ -196,7 +202,7 @@ struct CompositeItem<'a> { src_data: &'a [u8], src_width: u32, src_height: u32, - dst_rect: Rect, + dst_rect: BlitRect, opacity: f32, rotation_degrees: f32, /// When `true`, all source pixels have alpha == 255. Allows the blit @@ -312,8 +318,11 @@ pub fn composite_frame( // Video layers. for (layer, src_data) in resolved.iter().flatten() { - let dst_rect = - layer.rect.clone().unwrap_or(Rect { x: 0, y: 0, width: canvas_w, height: canvas_h }); + let dst_rect: BlitRect = layer + .rect + .clone() + .unwrap_or(Rect { x: 0, y: 0, width: canvas_w, height: canvas_h }) + .into(); // NV12/I420 → RGBA8 conversion always writes alpha = 255. let src_opaque = layer.pixel_format != PixelFormat::Rgba8; items.push(CompositeItem { @@ -337,7 +346,7 @@ pub fn composite_frame( src_data: &ov.rgba_data, src_width: ov.width, src_height: ov.height, - dst_rect: ov.rect.clone(), + dst_rect: ov.rect.clone().into(), opacity: ov.opacity, rotation_degrees: ov.rotation_degrees, src_opaque: false, @@ -354,7 +363,7 @@ pub fn composite_frame( src_data: &ov.rgba_data, src_width: ov.width, src_height: ov.height, - dst_rect: ov.rect.clone(), + dst_rect: ov.rect.clone().into(), opacity: ov.opacity, rotation_degrees: ov.rotation_degrees, src_opaque: false, diff --git a/crates/nodes/src/video/compositor/mod.rs b/crates/nodes/src/video/compositor/mod.rs index d7de3d11..20034edb 100644 --- a/crates/nodes/src/video/compositor/mod.rs +++ b/crates/nodes/src/video/compositor/mod.rs @@ -991,1048 +991,5 @@ pub fn register_compositor_nodes(registry: &mut NodeRegistry) { clippy::cast_possible_truncation, clippy::cast_sign_loss )] -mod tests { - use super::*; - use crate::test_utils::{ - 
assert_state_initializing, assert_state_running, assert_state_stopped, create_test_context, - }; - use config::{LayerConfig, Rect}; - use pixel_ops::{scale_blit_rgba, scale_blit_rgba_rotated}; - use std::collections::HashMap; - use tokio::sync::mpsc; - - /// Create a solid-colour RGBA8 VideoFrame. - fn make_rgba_frame(width: u32, height: u32, r: u8, g: u8, b: u8, a: u8) -> VideoFrame { - let total = (width as usize) * (height as usize) * 4; - let mut data = vec![0u8; total]; - for pixel in data.chunks_exact_mut(4) { - pixel[0] = r; - pixel[1] = g; - pixel[2] = b; - pixel[3] = a; - } - VideoFrame::new(width, height, PixelFormat::Rgba8, data).unwrap() - } - - // ── Unit tests for compositing helpers ─────────────────────────────── - - #[test] - fn test_scale_blit_identity() { - // 2x2 red source blitted onto a 4x4 canvas at (1,1) 2x2 rect. - let src = vec![255, 0, 0, 255, 0, 255, 0, 255, 0, 0, 255, 255, 128, 128, 128, 255]; - let mut dst = vec![0u8; 4 * 4 * 4]; // 4x4 RGBA, all transparent black - - scale_blit_rgba( - &mut dst, - 4, - 4, - &src, - 2, - 2, - &Rect { x: 1, y: 1, width: 2, height: 2 }, - 1.0, - false, - false, - false, - ); - - // Pixel at (1,1) should be red. - let x = 1usize; - let y = 1usize; - let idx = (y * 4 + x) * 4; - assert_eq!(dst[idx], 255); - assert_eq!(dst[idx + 1], 0); - assert_eq!(dst[idx + 2], 0); - assert_eq!(dst[idx + 3], 255); - - // Pixel at (0,0) should remain transparent black. - assert_eq!(dst[0], 0); - assert_eq!(dst[3], 0); - } - - #[test] - fn test_scale_blit_with_opacity() { - // White source at 50% opacity over black background. - let src = vec![255, 255, 255, 255]; // 1x1 white - let mut dst = vec![0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255]; // 2x2 black - - scale_blit_rgba( - &mut dst, - 2, - 2, - &src, - 1, - 1, - &Rect { x: 0, y: 0, width: 1, height: 1 }, - 0.5, - false, - false, - false, - ); - - // Pixel (0,0): white at 50% over opaque black -> ~128 grey. 
- let r = dst[0]; - assert!(r > 120 && r < 135, "Expected ~128, got {r}"); - } - - #[test] - fn test_scale_blit_scaling() { - // 1x1 red source scaled to 4x4 rect on an 8x8 canvas. - let src = vec![255, 0, 0, 255]; - let mut dst = vec![0u8; 8 * 8 * 4]; - - scale_blit_rgba( - &mut dst, - 8, - 8, - &src, - 1, - 1, - &Rect { x: 2, y: 2, width: 4, height: 4 }, - 1.0, - false, - false, - false, - ); - - // All pixels in the 4x4 destination rect should be red. - for y in 2..6u32 { - for x in 2..6u32 { - let idx = ((y * 8 + x) * 4) as usize; - assert_eq!(dst[idx], 255, "Red at ({x},{y})"); - assert_eq!(dst[idx + 1], 0, "Green at ({x},{y})"); - } - } - // Outside should remain black. - assert_eq!(dst[0], 0); - } - - #[test] - fn test_rotated_blit_stretch_to_fill() { - // A wide 4×2 red source blitted into a square 20×20 rect with 45° - // rotation on a 40×40 canvas. - // - // The source is stretched to fill the 20×20 rect (no aspect-ratio - // fit), then rotated 45°. The centre of the rect (canvas pixel - // 20,20) should be covered by red source pixels, while the rect - // corner (10,10) — outside the rotated area — should remain - // transparent. - let src = [255u8, 0, 0, 255].repeat(4 * 2); // 4×2 solid red - let mut dst = vec![0u8; 40 * 40 * 4]; - - scale_blit_rgba_rotated( - &mut dst, - 40, - 40, - &src, - 4, - 2, - &Rect { x: 10, y: 10, width: 20, height: 20 }, - 1.0, - 45.0, - false, - false, - false, - ); - - // The centre of the rect (canvas pixel 20,20) should be covered - // by source content (red). - let cx = 20usize; - let cy = 20usize; - let idx = (cy * 40 + cx) * 4; - assert_eq!(dst[idx], 255, "Centre R"); - assert_eq!(dst[idx + 1], 0, "Centre G"); - assert_eq!(dst[idx + 2], 0, "Centre B"); - assert!(dst[idx + 3] > 200, "Centre A should be mostly opaque"); - - // The rect corner (10,10) is outside the rotated content area - // and should remain transparent. 
- let corner_idx = (10usize * 40 + 10) * 4; - assert_eq!(dst[corner_idx + 3], 0, "Rect corner should be transparent"); - } - - #[test] - fn test_composite_frame_empty_layers() { - // No layers, no overlays -> transparent black canvas. - let mut cache = ConversionCache::new(); - let result = composite_frame(4, 4, &[], &[], &[], None, &mut cache); - let buf = result.as_slice(); - assert_eq!(buf.len(), 4 * 4 * 4); - assert!(buf.iter().all(|&b| b == 0)); - } - - #[test] - fn test_composite_frame_single_layer() { - let data = make_rgba_frame(2, 2, 255, 0, 0, 255); - let layer = LayerSnapshot { - data: data.data, - width: 2, - height: 2, - pixel_format: PixelFormat::Rgba8, - rect: Some(Rect { x: 0, y: 0, width: 4, height: 4 }), - opacity: 1.0, - z_index: 0, - rotation_degrees: 0.0, - mirror_horizontal: false, - mirror_vertical: false, - }; - - let mut cache = ConversionCache::new(); - let result = composite_frame(4, 4, &[Some(layer)], &[], &[], None, &mut cache); - let buf = result.as_slice(); - - // Entire canvas should be red (scaled from 2x2 to 4x4). - for pixel in buf.chunks_exact(4) { - assert_eq!(pixel[0], 255, "Red channel"); - assert_eq!(pixel[1], 0, "Green channel"); - assert_eq!(pixel[2], 0, "Blue channel"); - assert_eq!(pixel[3], 255, "Alpha channel"); - } - } - - #[test] - fn test_composite_frame_two_layers() { - // Bottom: full-canvas red. Top: small green square at (1,1) 2x2. 
- let red = make_rgba_frame(4, 4, 255, 0, 0, 255); - let green = make_rgba_frame(2, 2, 0, 255, 0, 255); - - let layer0 = LayerSnapshot { - data: red.data, - width: 4, - height: 4, - pixel_format: PixelFormat::Rgba8, - rect: None, - opacity: 1.0, - z_index: 0, - rotation_degrees: 0.0, - mirror_horizontal: false, - mirror_vertical: false, - }; - let layer1 = LayerSnapshot { - data: green.data, - width: 2, - height: 2, - pixel_format: PixelFormat::Rgba8, - rect: Some(Rect { x: 1, y: 1, width: 2, height: 2 }), - opacity: 1.0, - z_index: 1, - rotation_degrees: 0.0, - mirror_horizontal: false, - mirror_vertical: false, - }; - - let mut cache = ConversionCache::new(); - let result = - composite_frame(4, 4, &[Some(layer0), Some(layer1)], &[], &[], None, &mut cache); - let buf = result.as_slice(); - - // (0,0) should be red. - assert_eq!(buf[0], 255); - assert_eq!(buf[1], 0); - - // (1,1) should be green (overwritten by top layer). - let x = 1usize; - let y = 1usize; - let idx = (y * 4 + x) * 4; - assert_eq!(buf[idx], 0); - assert_eq!(buf[idx + 1], 255); - assert_eq!(buf[idx + 2], 0); - } - - #[test] - fn test_rasterize_text_overlay_produces_pixels() { - let cfg = config::TextOverlayConfig { - text: "Hi".to_string(), - transform: config::OverlayTransform { - rect: Rect { x: 0, y: 0, width: 64, height: 32 }, - opacity: 1.0, - rotation_degrees: 0.0, - z_index: 0, - mirror_horizontal: false, - mirror_vertical: false, - }, - color: [255, 255, 0, 255], - font_size: 24, - font_path: None, - font_data_base64: None, - font_name: None, - }; - let overlay = rasterize_text_overlay(&cfg); - // Width and height should be at least the original rect dimensions. - assert!(overlay.width >= 64); - assert!(overlay.height >= 32); - // The rect in the returned overlay should match the bitmap dimensions. - assert_eq!(overlay.rect.width, overlay.width); - assert_eq!(overlay.rect.height, overlay.height); - // Should have some non-zero pixels (text was drawn). 
- assert!(overlay.rgba_data.iter().any(|&b| b > 0)); - } - - #[test] - fn test_fit_rect_preserving_aspect() { - // 4:3 source into 16:9 bounds → pillarboxed (width-limited) - let bounds = Rect { x: 100, y: 50, width: 426, height: 240 }; - let fitted = fit_rect_preserving_aspect(640, 480, &bounds); - // Scale = min(426/640, 240/480) = min(0.666, 0.5) = 0.5 - // Fitted: 320×240, centred within 426×240 - assert_eq!(fitted.width, 320); - assert_eq!(fitted.height, 240); - assert_eq!(fitted.x, 100 + (426 - 320) / 2); - assert_eq!(fitted.y, 50); - - // 16:9 source into 4:3 bounds → letterboxed (height-limited) - let bounds = Rect { x: 0, y: 0, width: 400, height: 400 }; - let fitted = fit_rect_preserving_aspect(1280, 720, &bounds); - // Scale = min(400/1280, 400/720) = min(0.3125, 0.555) = 0.3125 - // Fitted: 400×225, centred within 400×400 - assert_eq!(fitted.width, 400); - assert_eq!(fitted.height, 225); - assert_eq!(fitted.x, 0); - assert_eq!(fitted.y, (400 - 225) / 2); - - // Exact match → no change - let bounds = Rect { x: 10, y: 20, width: 640, height: 480 }; - let fitted = fit_rect_preserving_aspect(640, 480, &bounds); - assert_eq!(fitted.width, 640); - assert_eq!(fitted.height, 480); - assert_eq!(fitted.x, 10); - assert_eq!(fitted.y, 20); - } - - #[test] - fn test_config_validate_ok() { - let cfg = CompositorConfig::default(); - assert!(cfg.validate().is_ok()); - } - - #[test] - fn test_config_validate_zero_dimensions() { - let cfg = CompositorConfig { width: 0, height: 720, ..Default::default() }; - assert!(cfg.validate().is_err()); - } - - #[test] - fn test_config_validate_bad_opacity() { - let mut cfg = CompositorConfig::default(); - cfg.layers.insert("in_0".to_string(), LayerConfig { opacity: 1.5, ..Default::default() }); - assert!(cfg.validate().is_err()); - } - - // ── Integration test: node run() with mock context ────────────────── - - #[tokio::test] - async fn test_compositor_node_run_main_only() { - let (input_tx, input_rx) = mpsc::channel(10); - let mut 
inputs = HashMap::new(); - inputs.insert("in_0".to_string(), input_rx); - - let (context, mock_sender, mut state_rx) = create_test_context(inputs, 10); - - let config = CompositorConfig { width: 4, height: 4, ..Default::default() }; - let node = CompositorNode::new(config); - - let node_handle = tokio::spawn(async move { Box::new(node).run(context).await }); - - assert_state_initializing(&mut state_rx).await; - assert_state_running(&mut state_rx).await; - - // Send a red frame. - let frame = make_rgba_frame(2, 2, 255, 0, 0, 255); - input_tx.send(Packet::Video(frame)).await.unwrap(); - - // Give time for processing. - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - - // Close input. - drop(input_tx); - - assert_state_stopped(&mut state_rx).await; - node_handle.await.unwrap().unwrap(); - - let output_packets = mock_sender.get_packets_for_pin("out").await; - assert!(!output_packets.is_empty(), "Expected at least 1 output frame"); - - // Verify output is 4x4 RGBA. - if let Packet::Video(ref out_frame) = output_packets[0] { - assert_eq!(out_frame.width, 4); - assert_eq!(out_frame.height, 4); - assert_eq!(out_frame.pixel_format, PixelFormat::Rgba8); - // Should be red (2x2 scaled to fill 4x4). 
- assert_eq!(out_frame.data()[0], 255); // R - assert_eq!(out_frame.data()[1], 0); // G - } else { - panic!("Expected video packet"); - } - } - - #[tokio::test] - async fn test_compositor_node_preserves_metadata() { - let (input_tx, input_rx) = mpsc::channel(10); - let mut inputs = HashMap::new(); - inputs.insert("in_0".to_string(), input_rx); - - let (context, mock_sender, mut state_rx) = create_test_context(inputs, 10); - - let config = CompositorConfig { width: 2, height: 2, ..Default::default() }; - let node = CompositorNode::new(config); - - let node_handle = tokio::spawn(async move { Box::new(node).run(context).await }); - - assert_state_initializing(&mut state_rx).await; - assert_state_running(&mut state_rx).await; - - let mut frame = make_rgba_frame(2, 2, 100, 100, 100, 255); - frame.metadata = Some(PacketMetadata { - timestamp_us: Some(42_000), - duration_us: Some(33_333), - sequence: Some(7), - keyframe: Some(true), - }); - input_tx.send(Packet::Video(frame)).await.unwrap(); - - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - drop(input_tx); - - assert_state_stopped(&mut state_rx).await; - node_handle.await.unwrap().unwrap(); - - let output_packets = mock_sender.get_packets_for_pin("out").await; - assert!(!output_packets.is_empty()); - - if let Packet::Video(ref out_frame) = output_packets[0] { - let meta = out_frame.metadata.as_ref().expect("metadata should be preserved"); - assert_eq!(meta.timestamp_us, Some(42_000)); - assert_eq!(meta.duration_us, Some(33_333)); - assert_eq!(meta.sequence, Some(0)); // output sequence starts at 0 - } else { - panic!("Expected video packet"); - } - } - - #[test] - fn test_compositor_definition_pins() { - let (inputs, outputs) = CompositorNode::definition_pins(); - assert_eq!(inputs.len(), 1); - assert_eq!(inputs[0].name, "in"); - assert!(matches!(inputs[0].cardinality, PinCardinality::Dynamic { .. 
})); - assert_eq!(outputs.len(), 1); - assert_eq!(outputs[0].name, "out"); - } - - #[test] - fn test_compositor_pool_usage() { - use streamkit_core::frame_pool::FramePool; - - let canvas_w = 4u32; - let canvas_h = 4u32; - let total = (canvas_w as usize) * (canvas_h as usize) * 4; // 64 bytes - - let pool = FramePool::::preallocated(&[total], 2); - assert_eq!(pool.stats().buckets[0].available, 2); - - let mut cache = ConversionCache::new(); - let result = composite_frame(canvas_w, canvas_h, &[], &[], &[], Some(&pool), &mut cache); - assert_eq!(result.as_slice().len(), total); - // One buffer was taken from the pool. - assert_eq!(pool.stats().buckets[0].available, 1); - - // Drop returns to pool. - drop(result); - assert_eq!(pool.stats().buckets[0].available, 2); - } - - // ── SIMD vs scalar equivalence tests ──────────────────────────────── - - /// Helper: scalar I420→RGBA8 conversion for a single pixel (reference). - #[allow(clippy::many_single_char_names)] - fn scalar_i420_to_rgba8(y: u8, u: u8, v: u8) -> [u8; 4] { - let c = i32::from(y) - 16; - let d = i32::from(u) - 128; - let e = i32::from(v) - 128; - let r = ((298 * c + 409 * e + 128) >> 8).clamp(0, 255) as u8; - let g = ((298 * c - 100 * d - 208 * e + 128) >> 8).clamp(0, 255) as u8; - let b = ((298 * c + 516 * d + 128) >> 8).clamp(0, 255) as u8; - [r, g, b, 255] - } - - /// Helper: scalar RGBA8→Y for a single pixel (reference). - fn scalar_rgba8_to_y(r: u8, g: u8, b: u8) -> u8 { - let y = ((66 * i32::from(r) + 129 * i32::from(g) + 25 * i32::from(b) + 128) >> 8) + 16; - y.clamp(0, 255) as u8 - } - - #[test] - fn test_i420_to_rgba8_simd_matches_scalar() { - // Test a variety of YUV values, including edge cases that trigger - // i16 overflow with the BT.601 coefficients. 
- let test_cases: Vec<(u8, u8, u8)> = vec![ - (16, 128, 128), // black - (235, 128, 128), // white - (81, 90, 240), // pure red - (145, 54, 34), // pure green - (41, 240, 110), // pure blue - (255, 128, 128), // max Y - (0, 0, 0), // min everything - (255, 255, 255), // max everything - (16, 0, 255), // extreme chroma - (235, 255, 0), // extreme chroma - ]; - - let width = test_cases.len() as u32; - // Build I420 buffer. - let mut y_plane = Vec::new(); - let mut u_plane = Vec::new(); - let mut v_plane = Vec::new(); - for &(y, u, v) in &test_cases { - y_plane.push(y); - // Each chroma sample covers 2 luma pixels horizontally. - if y_plane.len() % 2 == 1 { - u_plane.push(u); - v_plane.push(v); - } - } - let chroma_w = (width as usize).div_ceil(2); - // Pad if needed. - while u_plane.len() < chroma_w { - u_plane.push(128); - v_plane.push(128); - } - - let mut i420_data = Vec::new(); - i420_data.extend_from_slice(&y_plane); - i420_data.extend_from_slice(&u_plane); - i420_data.extend_from_slice(&v_plane); - - // Convert using the public function (which uses SIMD internally). - let mut simd_out = vec![0u8; width as usize * 4]; - pixel_ops::i420_to_rgba8_buf(&i420_data, width, 1, &mut simd_out); - - // Compare with scalar reference. - for (i, &(y, _u, _v)) in test_cases.iter().enumerate() { - // For chroma, each sample covers 2 pixels, so use the chroma - // value from the corresponding pair. - let chroma_idx = i / 2; - let actual_u = u_plane[chroma_idx]; - let actual_v = v_plane[chroma_idx]; - let expected = scalar_i420_to_rgba8(y, actual_u, actual_v); - let got = &simd_out[i * 4..(i + 1) * 4]; - assert_eq!( - got, &expected, - "pixel {i}: Y={y} U={actual_u} V={actual_v} → expected {expected:?}, got {got:?}" - ); - } - } - - #[test] - fn test_rgba8_to_i420_simd_matches_scalar() { - // Test RGBA→Y conversion with values that trigger i16 overflow - // (129 * 255 = 32895 > i16::MAX). 
- let test_pixels: Vec<(u8, u8, u8)> = vec![ - (0, 0, 0), // black - (255, 255, 255), // white - (255, 0, 0), // red - (0, 255, 0), // green - (0, 0, 255), // blue - (128, 128, 128), // mid grey - (0, 254, 0), // just below overflow threshold - (0, 255, 0), // at overflow threshold - ]; - - let width = test_pixels.len() as u32; - let mut rgba_data = Vec::with_capacity(width as usize * 4); - for &(r, g, b) in &test_pixels { - rgba_data.extend_from_slice(&[r, g, b, 255]); - } - - // Convert using the public function (SIMD internally). - let i420_size = width as usize + 2 * (width as usize).div_ceil(2); - let mut i420_out = vec![0u8; i420_size]; - pixel_ops::rgba8_to_i420_buf(&rgba_data, width, 1, &mut i420_out); - - // Check Y plane matches scalar. - for (i, &(r, g, b)) in test_pixels.iter().enumerate() { - let expected_y = scalar_rgba8_to_y(r, g, b); - let got_y = i420_out[i]; - assert_eq!( - got_y, expected_y, - "pixel {i}: R={r} G={g} B={b} → expected Y={expected_y}, got Y={got_y}" - ); - } - } - - #[test] - fn test_i420_rgba8_roundtrip_preserves_values() { - // A full I420→RGBA8→I420 round-trip should produce values close - // to the originals (within ±2 due to integer rounding). - let width: u32 = 8; - let height: u32 = 2; - let w = width as usize; - let h = height as usize; - let chroma_w = w.div_ceil(2); - - // Build a simple I420 test pattern. - let mut i420_data = vec![0u8; w * h + 2 * chroma_w * (h / 2)]; - // Y plane: gradient. - for (i, val) in i420_data[..w * h].iter_mut().enumerate() { - *val = (16 + (i * 219 / (w * h))) as u8; - } - // U/V planes: mid-range. 
- let u_offset = w * h; - let v_offset = u_offset + chroma_w * (h / 2); - for i in 0..chroma_w * (h / 2) { - i420_data[u_offset + i] = 128; - i420_data[v_offset + i] = 128; - } - - // I420 → RGBA8 → I420 - let mut rgba = vec![0u8; w * h * 4]; - pixel_ops::i420_to_rgba8_buf(&i420_data, width, height, &mut rgba); - let mut i420_roundtrip = vec![0u8; i420_data.len()]; - pixel_ops::rgba8_to_i420_buf(&rgba, width, height, &mut i420_roundtrip); - - // Y values should be close (within ±2 of originals due to rounding). - for (idx, orig_val) in i420_data[..w * h].iter().enumerate() { - let orig = i32::from(*orig_val); - let rt = i32::from(i420_roundtrip[idx]); - assert!( - (orig - rt).abs() <= 2, - "Y[{idx}]: original={orig}, roundtrip={rt}, diff={}", - (orig - rt).abs() - ); - } - } - - /// Test that `scale_blit_rgba` with opacity < 1.0 writes all rows correctly - /// on a buffer wide enough to exercise the AVX2 blend path (32 pixels). - /// This verifies the AVX2 → SSE2 → scalar cascade in `blit_row_alpha`. - #[test] - fn test_scale_blit_opacity_all_rows_written() { - let w = 32usize; - let h = 32usize; - // Fully opaque red source. - let src: Vec = [200, 50, 30, 255].repeat(w * h); - // All-black destination (simulates cleared canvas). - let mut dst = vec![0u8; w * h * 4]; - - scale_blit_rgba( - &mut dst, - w as u32, - h as u32, - &src, - w as u32, - h as u32, - &Rect { x: 0, y: 0, width: w as u32, height: h as u32 }, - 0.9, - false, - false, - false, - ); - - // Every single row should have been written to (non-zero pixels). - for row in 0..h { - let row_start = row * w * 4; - let row_slice = &dst[row_start..row_start + w * 4]; - let any_written = row_slice.iter().any(|&b| b != 0); - assert!(any_written, "Row {row} was not written to (all zeros)"); - - // Verify each pixel matches the expected scalar blend. - // opacity_u16 = (0.9 * 255 + 0.5) as u16 = 230 - // sa_eff = (255 * 230 + 128) >> 8 = 229 - // Dst is black (0), so blended = src * sa_eff / 255. 
- let opacity_u16: u16 = 230; - let sa_eff = ((255u16 * opacity_u16 + 128) >> 8).min(255); - let expected_r = { - let blend = 200u16 * sa_eff + 128; - ((blend + (blend >> 8)) >> 8) as u8 - }; - let expected_g = { - let blend = 50u16 * sa_eff + 128; - ((blend + (blend >> 8)) >> 8) as u8 - }; - let expected_b = { - let blend = 30u16 * sa_eff + 128; - ((blend + (blend >> 8)) >> 8) as u8 - }; - for col in 0..w { - let idx = row_start + col * 4; - let got_r = dst[idx]; - let got_g = dst[idx + 1]; - let got_b = dst[idx + 2]; - let got_a = dst[idx + 3]; - - // Allow ±1 for rounding differences between SIMD and scalar paths. - assert!( - (i16::from(got_r) - i16::from(expected_r)).abs() <= 1, - "Row {row}, Col {col}: R={got_r}, expected ~{expected_r}" - ); - assert!( - (i16::from(got_g) - i16::from(expected_g)).abs() <= 1, - "Row {row}, Col {col}: G={got_g}, expected ~{expected_g}" - ); - assert!( - (i16::from(got_b) - i16::from(expected_b)).abs() <= 1, - "Row {row}, Col {col}: B={got_b}, expected ~{expected_b}" - ); - assert!(got_a > 200, "Row {row}, Col {col}: A={got_a}, expected >200"); - } - } - } - - /// Test I420→RGBA8 AVX2 kernel correctness with a multi-row buffer wide - /// enough to exercise the 8-pixel AVX2 path plus scalar remainder. - /// Verifies the OOB-safe scalar chroma reads produce identical output to - /// the scalar reference for every pixel. - #[test] - fn test_i420_to_rgba8_avx2_wide_multirow() { - // 24 pixels wide = 3 AVX2 iterations (8px each) with 0 remainder. - // 4 rows to exercise multi-row chroma subsampling. - let width: u32 = 24; - let height: u32 = 4; - let w = width as usize; - let h = height as usize; - let chroma_w = w / 2; - - // Build a varied I420 test pattern. - let mut i420_data = vec![0u8; w * h + 2 * chroma_w * (h / 2)]; - // Y plane: gradient across rows and columns. - for row in 0..h { - for col in 0..w { - i420_data[row * w + col] = (16 + ((row * w + col) * 219) / (w * h)) as u8; - } - } - // U/V planes: varying chroma values. 
- let u_offset = w * h; - let v_offset = u_offset + chroma_w * (h / 2); - for i in 0..chroma_w * (h / 2) { - i420_data[u_offset + i] = (64 + (i * 3) % 192) as u8; - i420_data[v_offset + i] = (32 + (i * 7) % 224) as u8; - } - - // Convert using the public function (dispatches to AVX2 on this machine). - let mut simd_out = vec![0u8; w * h * 4]; - pixel_ops::i420_to_rgba8_buf(&i420_data, width, height, &mut simd_out); - - // Compare every pixel against the scalar reference. - for row in 0..h { - for col in 0..w { - let luma = i420_data[row * w + col]; - let chroma_r = row / 2; - let chroma_c = col / 2; - let u_val = i420_data[u_offset + chroma_r * chroma_w + chroma_c]; - let v_val = i420_data[v_offset + chroma_r * chroma_w + chroma_c]; - let expected = scalar_i420_to_rgba8(luma, u_val, v_val); - let got_idx = (row * w + col) * 4; - let got = &simd_out[got_idx..got_idx + 4]; - assert_eq!( - got, &expected, - "row={row} col={col}: Y={luma} U={u_val} V={v_val} → expected {expected:?}, got {got:?}" - ); - } - } - } - - /// Test that opacity < 1.0 through `composite_frame` produces correct - /// output with no black borders when source matches canvas dimensions. - #[test] - fn test_composite_frame_opacity_no_black_borders() { - let w = 32u32; - let h = 32u32; - let frame = make_rgba_frame(w, h, 200, 100, 50, 255); - - let layer = LayerSnapshot { - data: frame.data, - width: w, - height: h, - pixel_format: PixelFormat::Rgba8, - rect: Some(Rect { x: 0, y: 0, width: w, height: h }), - opacity: 0.8, - z_index: 0, - rotation_degrees: 0.0, - mirror_horizontal: false, - mirror_vertical: false, - }; - - let mut cache = ConversionCache::new(); - let result = composite_frame(w, h, &[Some(layer)], &[], &[], None, &mut cache); - let buf = result.as_slice(); - - // Every row should have non-zero content (no black borders). 
- for row in 0..h as usize { - let row_start = row * w as usize * 4; - let row_end = row_start + w as usize * 4; - let any_nonzero = buf[row_start..row_end].iter().any(|&b| b != 0); - assert!(any_nonzero, "Row {row} is all zeros — black border detected"); - } - } - - /// Full-pipeline test at real dimensions (640×480): compositor blit with - /// opacity < 1.0, then RGBA→NV12→RGBA roundtrip, checking for black bands. - /// This exercises the exact pipeline the VP9 encoder sees. - #[test] - #[allow(clippy::many_single_char_names)] // Standard image-processing shorthand (w, h, r, g, b, etc.) - fn test_full_pipeline_opacity_nv12_roundtrip_no_black_bands() { - let w = 640u32; - let h = 480u32; - let wu = w as usize; - let hu = h as usize; - - // Create a colorbars-like pattern: 7 vertical bars of different colors. - let colors: [(u8, u8, u8); 7] = [ - (255, 255, 255), // white - (255, 255, 0), // yellow - (0, 255, 255), // cyan - (0, 255, 0), // green - (255, 0, 255), // magenta - (255, 0, 0), // red - (0, 0, 255), // blue - ]; - let mut src_rgba = vec![0u8; wu * hu * 4]; - for row in 0..hu { - for col in 0..wu { - let bar_idx = (col * 7) / wu; - let (r, g, b) = colors[bar_idx]; - let off = (row * wu + col) * 4; - src_rgba[off] = r; - src_rgba[off + 1] = g; - src_rgba[off + 2] = b; - src_rgba[off + 3] = 255; - } - } - - // Step 1: Blit onto canvas with opacity 0.9 (through scale_blit_rgba_rotated, - // exactly as the compositor does). - let mut canvas = vec![0u8; wu * hu * 4]; - pixel_ops::scale_blit_rgba_rotated( - &mut canvas, - w, - h, - &src_rgba, - w, - h, - &Rect { x: 0, y: 0, width: w, height: h }, - 0.9, - 0.0, - false, - false, - false, - ); - - // Verify compositor output: every row should have non-zero pixels. 
- for row in 0..hu { - let row_start = row * wu * 4; - let any_nonzero = canvas[row_start..row_start + wu * 4].iter().any(|&b| b != 0); - assert!(any_nonzero, "Compositor output row {row} is all zeros (black band)"); - } - - // Step 2: Convert RGBA → NV12 (exactly as the VP9 encoder does). - let chroma_w = wu.div_ceil(2); - let chroma_h = hu.div_ceil(2); - let nv12_size = wu * hu + chroma_w * 2 * chroma_h; - let mut nv12 = vec![0u8; nv12_size]; - pixel_ops::rgba8_to_nv12_buf(&canvas, w, h, &mut nv12); - - // Verify Y plane: no rows should be all-zero (Y=0 is below black level). - // With opacity 0.9 on colored bars, Y values should be well above 0. - for row in 0..hu { - let y_row = &nv12[row * wu..(row + 1) * wu]; - let max_y = *y_row.iter().max().unwrap(); - assert!(max_y > 16, "NV12 Y-plane row {row}: max Y={max_y}, expected >16 (not black)"); - } - - // Step 3: Convert NV12 → RGBA (simulates decoder display). - let mut decoded_rgba = vec![0u8; wu * hu * 4]; - pixel_ops::nv12_to_rgba8_buf(&nv12, w, h, &mut decoded_rgba); - - // Verify decoded output: every row should have non-black pixels. - for row in 0..hu { - let row_start = row * wu * 4; - let row_slice = &decoded_rgba[row_start..row_start + wu * 4]; - // Check that at least some pixels have R, G, or B > 10 (not near-black). - let has_visible = - row_slice.chunks_exact(4).any(|px| px[0] > 10 || px[1] > 10 || px[2] > 10); - assert!(has_visible, "Decoded row {row} has no visible pixels (all near-black)"); - } - } - - /// Regression test: a 4:3 source blitted onto a 16:9 canvas with opacity < 1.0 - /// must cover the entire canvas (stretch-to-fill) with no black bars. - /// Previously the near-zero rotation fast path applied an aspect-ratio-preserving - /// fit that left letterbox gaps visible as black bands when opacity < 1.0. 
- #[test] - fn test_mismatched_aspect_ratio_opacity_no_black_bars() { - let src_w = 640u32; - let src_h = 480u32; // 4:3 - let canvas_w = 1280u32; - let canvas_h = 720u32; // 16:9 - - // Solid green source. - let src = [0u8, 255, 0, 255].repeat((src_w * src_h) as usize); - let mut canvas = vec![0u8; (canvas_w * canvas_h * 4) as usize]; - - pixel_ops::scale_blit_rgba_rotated( - &mut canvas, - canvas_w, - canvas_h, - &src, - src_w, - src_h, - &Rect { x: 0, y: 0, width: canvas_w, height: canvas_h }, - 0.9, - 0.0, // no rotation — exercises the near-zero fast path - false, - false, - false, - ); - - // Every row should have non-zero pixels (no black bars on left/right). - for row in 0..canvas_h as usize { - let row_start = row * canvas_w as usize * 4; - let row_end = row_start + canvas_w as usize * 4; - let any_nonzero = canvas[row_start..row_end].iter().any(|&b| b != 0); - assert!(any_nonzero, "Row {row} is all zeros — black bar detected"); - } - - // Every column should have non-zero pixels (no black bars on top/bottom). - for col in 0..canvas_w as usize { - let any_nonzero = (0..canvas_h as usize).any(|row| { - let idx = (row * canvas_w as usize + col) * 4; - canvas[idx] != 0 || canvas[idx + 1] != 0 || canvas[idx + 2] != 0 - }); - assert!(any_nonzero, "Column {col} is all zeros — black bar detected"); - } - } - - /// Regression test: a 4:3 source blitted into a non-square rect with 15° - /// rotation must cover the centre of the rect (stretch-to-fill, not - /// aspect-ratio fit). Exercises the rotated path's per-axis inverse - /// scaling (`inv_scale_x` / `inv_scale_y`). - #[test] - fn test_rotated_blit_mismatched_aspect_ratio_covers_centre() { - // 4×2 red source into a 40×20 rect (2:1 aspect mismatch) at 15° on - // a 60×40 canvas. The centre of the rect (canvas pixel 30,20) must - // be covered by red source content. 
- let src = [255u8, 0, 0, 255].repeat(4 * 2); // 4×2 solid red - let mut dst = vec![0u8; 60 * 40 * 4]; - - scale_blit_rgba_rotated( - &mut dst, - 60, - 40, - &src, - 4, - 2, - &Rect { x: 10, y: 10, width: 40, height: 20 }, - 1.0, - 15.0, - false, - false, - false, - ); - - // Centre of the rect (canvas pixel 30, 20) should be red. - let cx = 30usize; - let cy = 20usize; - let idx = (cy * 60 + cx) * 4; - assert_eq!(dst[idx], 255, "Centre R"); - assert_eq!(dst[idx + 1], 0, "Centre G"); - assert_eq!(dst[idx + 2], 0, "Centre B"); - assert!(dst[idx + 3] > 200, "Centre A should be mostly opaque"); - } - - /// Test RGBA→NV12 AVX2 chroma conversion matches scalar reference. - /// Uses a 640-wide frame to fully exercise the AVX2 path (8 chroma samples/iter). - #[test] - #[allow(clippy::many_single_char_names)] // Standard image-processing shorthand (w, h, r, g, b, etc.) - fn test_rgba8_to_nv12_avx2_chroma_matches_scalar() { - let w = 640u32; - let h = 4u32; - let wu = w as usize; - let hu = h as usize; - let chroma_w = wu / 2; - let chroma_h = hu / 2; - - // Create a varied RGBA pattern. - let mut rgba = vec![0u8; wu * hu * 4]; - for row in 0..hu { - for col in 0..wu { - let off = (row * wu + col) * 4; - rgba[off] = ((col * 3 + row * 7) % 256) as u8; // R - rgba[off + 1] = ((col * 5 + row * 11) % 256) as u8; // G - rgba[off + 2] = ((col * 7 + row * 13) % 256) as u8; // B - rgba[off + 3] = 255; // A - } - } - - // Convert using the public function (dispatches to AVX2). - let nv12_size = wu * hu + chroma_w * 2 * chroma_h; - let mut nv12_simd = vec![0u8; nv12_size]; - pixel_ops::rgba8_to_nv12_buf(&rgba, w, h, &mut nv12_simd); - - // Compute scalar reference for the chroma plane. 
- let y_size = wu * hu; - for crow in 0..chroma_h { - let r0 = crow * 2; - for ccol in 0..chroma_w { - let c0 = ccol * 2; - let mut sr = 0i32; - let mut sg = 0i32; - let mut sb = 0i32; - let mut count = 0i32; - for dr in 0..2u32 { - let rr = r0 + dr as usize; - if rr >= hu { - continue; - } - for dc in 0..2u32 { - let cc = c0 + dc as usize; - if cc < wu { - let off = (rr * wu + cc) * 4; - sr += i32::from(rgba[off]); - sg += i32::from(rgba[off + 1]); - sb += i32::from(rgba[off + 2]); - count += 1; - } - } - } - let r_avg = sr / count; - let g_avg = sg / count; - let b_avg = sb / count; - let expected_u = ((-38 * r_avg - 74 * g_avg + 112 * b_avg + 128) >> 8) + 128; - let expected_v = ((112 * r_avg - 94 * g_avg - 18 * b_avg + 128) >> 8) + 128; - let expected_u = expected_u.clamp(0, 255) as u8; - let expected_v = expected_v.clamp(0, 255) as u8; - - let uv_off = y_size + crow * chroma_w * 2 + ccol * 2; - let got_u = nv12_simd[uv_off]; - let got_v = nv12_simd[uv_off + 1]; - - // Allow ±2 for rounding differences between SIMD and scalar. - assert!( - (i16::from(got_u) - i16::from(expected_u)).abs() <= 2, - "crow={crow} ccol={ccol}: U got={got_u}, expected={expected_u}" - ); - assert!( - (i16::from(got_v) - i16::from(expected_v)).abs() <= 2, - "crow={crow} ccol={ccol}: V got={got_v}, expected={expected_v}" - ); - } - } - - // Also verify Y plane matches scalar reference. 
- for row in 0..hu { - for col in 0..wu { - let off = (row * wu + col) * 4; - let r = i32::from(rgba[off]); - let g = i32::from(rgba[off + 1]); - let b = i32::from(rgba[off + 2]); - let expected_y = - (((66 * r + 129 * g + 25 * b + 128) >> 8) + 16).clamp(0, 255) as u8; - let got_y = nv12_simd[row * wu + col]; - assert!( - (i16::from(got_y) - i16::from(expected_y)).abs() <= 1, - "row={row} col={col}: Y got={got_y}, expected={expected_y}" - ); - } - } - } -} +#[path = "tests.rs"] +mod tests; diff --git a/crates/nodes/src/video/compositor/pixel_ops/mod.rs b/crates/nodes/src/video/compositor/pixel_ops/mod.rs index 9f45c2df..52428fd7 100644 --- a/crates/nodes/src/video/compositor/pixel_ops/mod.rs +++ b/crates/nodes/src/video/compositor/pixel_ops/mod.rs @@ -2,102 +2,11 @@ // // SPDX-License-Identifier: MPL-2.0 -//! Pixel-level operations for the video compositor. +//! Re-exports from [`crate::video::pixel_ops`]. //! -//! Contains RGBA8 blitting (with nearest-neighbor scaling), alpha blending, -//! overlay compositing, and I420 / NV12 ↔ RGBA8 colour-space conversion. -//! -//! All hot loops use row-level parallelism via `rayon` when the region is -//! large enough to amortise the thread-pool dispatch overhead. Below the -//! threshold the same per-row closures run sequentially. -//! -//! # Module structure -//! -//! - [`blit`] — axis-aligned and rotated scale + blit operations. -//! - [`convert`] — colour-space conversion (I420, NV12 ↔ RGBA8). -//! - [`simd`] (x86-64 only) — SIMD kernels for both blitting and conversion. - -mod blit; -mod convert; - -#[cfg(target_arch = "x86_64")] -mod simd_x86_64; - -/// Re-export the x86-64 SIMD module under a shorter name for internal use. -#[cfg(target_arch = "x86_64")] -use simd_x86_64 as simd; - -// ── Shared constants and helpers ──────────────────────────────────────────── - -/// Minimum number of output rows before we dispatch to rayon. 
Below this -/// threshold the per-row work is small enough that the rayon scheduling -/// overhead (work-stealing queue push/pop, thread wake-up) dominates. -/// 64 rows at 1280-wide RGBA8 ≈ 320 KiB — a reasonable crossover point -/// on modern x86-64 cores. -const RAYON_ROW_THRESHOLD: usize = 64; - -/// Number of rows to bundle into a single rayon task once parallel mode is -/// entered. Reduces work-stealing overhead from ~1 task/row to -/// ~rows/chunk tasks. -/// -/// [`rayon_chunk_rows`] auto-tunes the chunk size based on workload: -/// wider or taller frames produce fewer, larger chunks, keeping -/// scheduling cost proportional to the actual parallelism available. -/// -/// Formula: `max(8, total_rows / (num_cpus * 4))`, clamped to `[8, 64]`. -/// This keeps chunk counts proportional to hardware parallelism while -/// avoiding both excessive scheduling overhead (too many tiny chunks) -/// and poor load-balancing (too few large chunks). -/// -/// The CPU count is cached in a `LazyLock` so we avoid a `sysconf` syscall -/// (~40 µs on Linux) on every call. -fn rayon_chunk_rows(total_rows: usize) -> usize { - static CPUS: std::sync::LazyLock = std::sync::LazyLock::new(|| { - std::thread::available_parallelism().map(std::num::NonZero::get).unwrap_or(1) - }); - let ideal = total_rows.div_ceil(*CPUS * 4); - ideal.clamp(8, 64) -} - -/// Fixed-point alpha blend: `(src * alpha + dst * (255 - alpha) + 128) / 255` -/// using the well-known `((x + (x >> 8)) >> 8)` fast approximation of `x / 255`. -#[allow(clippy::inline_always)] -#[inline(always)] -const fn blend_u8(src: u8, dst: u8, alpha: u16) -> u8 { - let inv = 255 - alpha; - let val = src as u16 * alpha + dst as u16 * inv + 128; - ((val + (val >> 8)) >> 8) as u8 -} - -/// Check whether every pixel's alpha byte in an RGBA8 buffer is `0xFF`. -/// -/// Dispatches to AVX2 / SSE2 kernels on x86-64 (8 / 4 pixels per iteration) -/// and falls back to a scalar scan on other architectures. 
Safe wrapper -/// around the `target_feature`-gated SIMD helpers so callers outside this -/// module don't need their own `unsafe` + `cfg` scaffolding. -/// -/// Assumes `rgba.len()` is a multiple of 4 — always true for valid RGBA8 data. -pub fn all_alpha_opaque(rgba: &[u8]) -> bool { - #[cfg(target_arch = "x86_64")] - { - // SAFETY: `all_alpha_opaque_{avx2,sse2}` require the input length to be - // a multiple of 4 bytes, which holds for any valid RGBA8 buffer. Both - // helpers handle arbitrary tails internally (scalar fall-through for - // trailing bytes past the last full SIMD chunk). Feature availability - // is checked at runtime via `is_x86_feature_detected!`. - if is_x86_feature_detected!("avx2") { - unsafe { simd::all_alpha_opaque_avx2(rgba) } - } else { - unsafe { simd::all_alpha_opaque_sse2(rgba) } - } - } - #[cfg(not(target_arch = "x86_64"))] - { - rgba.chunks_exact(4).all(|px| px[3] == 255) - } -} - -// ── Public API re-exports ─────────────────────────────────────────────────── +//! The pixel-operation implementations have moved to `video::pixel_ops` so +//! they can be shared across the compositor, pixel-convert node, and any +//! future video nodes. This shim keeps existing `super::pixel_ops::*` +//! imports inside the compositor compiling without changes. 
-pub use blit::{scale_blit_rgba, scale_blit_rgba_rotated}; -pub use convert::{i420_to_rgba8_buf, nv12_to_rgba8_buf, rgba8_to_i420_buf, rgba8_to_nv12_buf}; +pub use crate::video::pixel_ops::*; diff --git a/crates/nodes/src/video/compositor/tests.rs b/crates/nodes/src/video/compositor/tests.rs new file mode 100644 index 00000000..30d2f091 --- /dev/null +++ b/crates/nodes/src/video/compositor/tests.rs @@ -0,0 +1,1045 @@ +// SPDX-FileCopyrightText: © 2025 StreamKit Contributors +// +// SPDX-License-Identifier: MPL-2.0 + +use super::*; +use crate::test_utils::{ + assert_state_initializing, assert_state_running, assert_state_stopped, create_test_context, +}; +use config::{LayerConfig, Rect}; +use pixel_ops::{scale_blit_rgba, scale_blit_rgba_rotated, BlitRect}; +use std::collections::HashMap; +use tokio::sync::mpsc; + +/// Create a solid-colour RGBA8 VideoFrame. +fn make_rgba_frame(width: u32, height: u32, r: u8, g: u8, b: u8, a: u8) -> VideoFrame { + let total = (width as usize) * (height as usize) * 4; + let mut data = vec![0u8; total]; + for pixel in data.chunks_exact_mut(4) { + pixel[0] = r; + pixel[1] = g; + pixel[2] = b; + pixel[3] = a; + } + VideoFrame::new(width, height, PixelFormat::Rgba8, data).unwrap() +} + +// ── Unit tests for compositing helpers ─────────────────────────────── + +#[test] +fn test_scale_blit_identity() { + // 2x2 red source blitted onto a 4x4 canvas at (1,1) 2x2 rect. + let src = vec![255, 0, 0, 255, 0, 255, 0, 255, 0, 0, 255, 255, 128, 128, 128, 255]; + let mut dst = vec![0u8; 4 * 4 * 4]; // 4x4 RGBA, all transparent black + + scale_blit_rgba( + &mut dst, + 4, + 4, + &src, + 2, + 2, + &BlitRect { x: 1, y: 1, width: 2, height: 2 }, + 1.0, + false, + false, + false, + ); + + // Pixel at (1,1) should be red. 
+ let x = 1usize; + let y = 1usize; + let idx = (y * 4 + x) * 4; + assert_eq!(dst[idx], 255); + assert_eq!(dst[idx + 1], 0); + assert_eq!(dst[idx + 2], 0); + assert_eq!(dst[idx + 3], 255); + + // Pixel at (0,0) should remain transparent black. + assert_eq!(dst[0], 0); + assert_eq!(dst[3], 0); +} + +#[test] +fn test_scale_blit_with_opacity() { + // White source at 50% opacity over black background. + let src = vec![255, 255, 255, 255]; // 1x1 white + let mut dst = vec![0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255]; // 2x2 black + + scale_blit_rgba( + &mut dst, + 2, + 2, + &src, + 1, + 1, + &BlitRect { x: 0, y: 0, width: 1, height: 1 }, + 0.5, + false, + false, + false, + ); + + // Pixel (0,0): white at 50% over opaque black -> ~128 grey. + let r = dst[0]; + assert!(r > 120 && r < 135, "Expected ~128, got {r}"); +} + +#[test] +fn test_scale_blit_scaling() { + // 1x1 red source scaled to 4x4 rect on an 8x8 canvas. + let src = vec![255, 0, 0, 255]; + let mut dst = vec![0u8; 8 * 8 * 4]; + + scale_blit_rgba( + &mut dst, + 8, + 8, + &src, + 1, + 1, + &BlitRect { x: 2, y: 2, width: 4, height: 4 }, + 1.0, + false, + false, + false, + ); + + // All pixels in the 4x4 destination rect should be red. + for y in 2..6u32 { + for x in 2..6u32 { + let idx = ((y * 8 + x) * 4) as usize; + assert_eq!(dst[idx], 255, "Red at ({x},{y})"); + assert_eq!(dst[idx + 1], 0, "Green at ({x},{y})"); + } + } + // Outside should remain black. + assert_eq!(dst[0], 0); +} + +#[test] +fn test_rotated_blit_stretch_to_fill() { + // A wide 4×2 red source blitted into a square 20×20 rect with 45° + // rotation on a 40×40 canvas. + // + // The source is stretched to fill the 20×20 rect (no aspect-ratio + // fit), then rotated 45°. The centre of the rect (canvas pixel + // 20,20) should be covered by red source pixels, while the rect + // corner (10,10) — outside the rotated area — should remain + // transparent. 
+ let src = [255u8, 0, 0, 255].repeat(4 * 2); // 4×2 solid red + let mut dst = vec![0u8; 40 * 40 * 4]; + + scale_blit_rgba_rotated( + &mut dst, + 40, + 40, + &src, + 4, + 2, + &BlitRect { x: 10, y: 10, width: 20, height: 20 }, + 1.0, + 45.0, + false, + false, + false, + ); + + // The centre of the rect (canvas pixel 20,20) should be covered + // by source content (red). + let cx = 20usize; + let cy = 20usize; + let idx = (cy * 40 + cx) * 4; + assert_eq!(dst[idx], 255, "Centre R"); + assert_eq!(dst[idx + 1], 0, "Centre G"); + assert_eq!(dst[idx + 2], 0, "Centre B"); + assert!(dst[idx + 3] > 200, "Centre A should be mostly opaque"); + + // The rect corner (10,10) is outside the rotated content area + // and should remain transparent. + let corner_idx = (10usize * 40 + 10) * 4; + assert_eq!(dst[corner_idx + 3], 0, "Rect corner should be transparent"); +} + +#[test] +fn test_composite_frame_empty_layers() { + // No layers, no overlays -> transparent black canvas. + let mut cache = ConversionCache::new(); + let result = composite_frame(4, 4, &[], &[], &[], None, &mut cache); + let buf = result.as_slice(); + assert_eq!(buf.len(), 4 * 4 * 4); + assert!(buf.iter().all(|&b| b == 0)); +} + +#[test] +fn test_composite_frame_single_layer() { + let data = make_rgba_frame(2, 2, 255, 0, 0, 255); + let layer = LayerSnapshot { + data: data.data, + width: 2, + height: 2, + pixel_format: PixelFormat::Rgba8, + rect: Some(Rect { x: 0, y: 0, width: 4, height: 4 }), + opacity: 1.0, + z_index: 0, + rotation_degrees: 0.0, + mirror_horizontal: false, + mirror_vertical: false, + }; + + let mut cache = ConversionCache::new(); + let result = composite_frame(4, 4, &[Some(layer)], &[], &[], None, &mut cache); + let buf = result.as_slice(); + + // Entire canvas should be red (scaled from 2x2 to 4x4). 
+ for pixel in buf.chunks_exact(4) { + assert_eq!(pixel[0], 255, "Red channel"); + assert_eq!(pixel[1], 0, "Green channel"); + assert_eq!(pixel[2], 0, "Blue channel"); + assert_eq!(pixel[3], 255, "Alpha channel"); + } +} + +#[test] +fn test_composite_frame_two_layers() { + // Bottom: full-canvas red. Top: small green square at (1,1) 2x2. + let red = make_rgba_frame(4, 4, 255, 0, 0, 255); + let green = make_rgba_frame(2, 2, 0, 255, 0, 255); + + let layer0 = LayerSnapshot { + data: red.data, + width: 4, + height: 4, + pixel_format: PixelFormat::Rgba8, + rect: None, + opacity: 1.0, + z_index: 0, + rotation_degrees: 0.0, + mirror_horizontal: false, + mirror_vertical: false, + }; + let layer1 = LayerSnapshot { + data: green.data, + width: 2, + height: 2, + pixel_format: PixelFormat::Rgba8, + rect: Some(Rect { x: 1, y: 1, width: 2, height: 2 }), + opacity: 1.0, + z_index: 1, + rotation_degrees: 0.0, + mirror_horizontal: false, + mirror_vertical: false, + }; + + let mut cache = ConversionCache::new(); + let result = composite_frame(4, 4, &[Some(layer0), Some(layer1)], &[], &[], None, &mut cache); + let buf = result.as_slice(); + + // (0,0) should be red. + assert_eq!(buf[0], 255); + assert_eq!(buf[1], 0); + + // (1,1) should be green (overwritten by top layer). 
+ let x = 1usize; + let y = 1usize; + let idx = (y * 4 + x) * 4; + assert_eq!(buf[idx], 0); + assert_eq!(buf[idx + 1], 255); + assert_eq!(buf[idx + 2], 0); +} + +#[test] +fn test_rasterize_text_overlay_produces_pixels() { + let cfg = config::TextOverlayConfig { + text: "Hi".to_string(), + transform: config::OverlayTransform { + rect: Rect { x: 0, y: 0, width: 64, height: 32 }, + opacity: 1.0, + rotation_degrees: 0.0, + z_index: 0, + mirror_horizontal: false, + mirror_vertical: false, + }, + color: [255, 255, 0, 255], + font_size: 24, + font_path: None, + font_data_base64: None, + font_name: None, + }; + let overlay = rasterize_text_overlay(&cfg); + // Width and height should be at least the original rect dimensions. + assert!(overlay.width >= 64); + assert!(overlay.height >= 32); + // The rect in the returned overlay should match the bitmap dimensions. + assert_eq!(overlay.rect.width, overlay.width); + assert_eq!(overlay.rect.height, overlay.height); + // Should have some non-zero pixels (text was drawn). 
+ assert!(overlay.rgba_data.iter().any(|&b| b > 0)); +} + +#[test] +fn test_fit_rect_preserving_aspect() { + // 4:3 source into 16:9 bounds → pillarboxed (width-limited) + let bounds = Rect { x: 100, y: 50, width: 426, height: 240 }; + let fitted = fit_rect_preserving_aspect(640, 480, &bounds); + // Scale = min(426/640, 240/480) = min(0.666, 0.5) = 0.5 + // Fitted: 320×240, centred within 426×240 + assert_eq!(fitted.width, 320); + assert_eq!(fitted.height, 240); + assert_eq!(fitted.x, 100 + (426 - 320) / 2); + assert_eq!(fitted.y, 50); + + // 16:9 source into 4:3 bounds → letterboxed (height-limited) + let bounds = Rect { x: 0, y: 0, width: 400, height: 400 }; + let fitted = fit_rect_preserving_aspect(1280, 720, &bounds); + // Scale = min(400/1280, 400/720) = min(0.3125, 0.555) = 0.3125 + // Fitted: 400×225, centred within 400×400 + assert_eq!(fitted.width, 400); + assert_eq!(fitted.height, 225); + assert_eq!(fitted.x, 0); + assert_eq!(fitted.y, (400 - 225) / 2); + + // Exact match → no change + let bounds = Rect { x: 10, y: 20, width: 640, height: 480 }; + let fitted = fit_rect_preserving_aspect(640, 480, &bounds); + assert_eq!(fitted.width, 640); + assert_eq!(fitted.height, 480); + assert_eq!(fitted.x, 10); + assert_eq!(fitted.y, 20); +} + +#[test] +fn test_config_validate_ok() { + let cfg = CompositorConfig::default(); + assert!(cfg.validate().is_ok()); +} + +#[test] +fn test_config_validate_zero_dimensions() { + let cfg = CompositorConfig { width: 0, height: 720, ..Default::default() }; + assert!(cfg.validate().is_err()); +} + +#[test] +fn test_config_validate_bad_opacity() { + let mut cfg = CompositorConfig::default(); + cfg.layers.insert("in_0".to_string(), LayerConfig { opacity: 1.5, ..Default::default() }); + assert!(cfg.validate().is_err()); +} + +// ── Integration test: node run() with mock context ────────────────── + +#[tokio::test] +async fn test_compositor_node_run_main_only() { + let (input_tx, input_rx) = mpsc::channel(10); + let mut inputs = 
HashMap::new(); + inputs.insert("in_0".to_string(), input_rx); + + let (context, mock_sender, mut state_rx) = create_test_context(inputs, 10); + + let config = CompositorConfig { width: 4, height: 4, ..Default::default() }; + let node = CompositorNode::new(config); + + let node_handle = tokio::spawn(async move { Box::new(node).run(context).await }); + + assert_state_initializing(&mut state_rx).await; + assert_state_running(&mut state_rx).await; + + // Send a red frame. + let frame = make_rgba_frame(2, 2, 255, 0, 0, 255); + input_tx.send(Packet::Video(frame)).await.unwrap(); + + // Give time for processing. + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + // Close input. + drop(input_tx); + + assert_state_stopped(&mut state_rx).await; + node_handle.await.unwrap().unwrap(); + + let output_packets = mock_sender.get_packets_for_pin("out").await; + assert!(!output_packets.is_empty(), "Expected at least 1 output frame"); + + // Verify output is 4x4 RGBA. + if let Packet::Video(ref out_frame) = output_packets[0] { + assert_eq!(out_frame.width, 4); + assert_eq!(out_frame.height, 4); + assert_eq!(out_frame.pixel_format, PixelFormat::Rgba8); + // Should be red (2x2 scaled to fill 4x4). 
+ assert_eq!(out_frame.data()[0], 255); // R + assert_eq!(out_frame.data()[1], 0); // G + } else { + panic!("Expected video packet"); + } +} + +#[tokio::test] +async fn test_compositor_node_preserves_metadata() { + let (input_tx, input_rx) = mpsc::channel(10); + let mut inputs = HashMap::new(); + inputs.insert("in_0".to_string(), input_rx); + + let (context, mock_sender, mut state_rx) = create_test_context(inputs, 10); + + let config = CompositorConfig { width: 2, height: 2, ..Default::default() }; + let node = CompositorNode::new(config); + + let node_handle = tokio::spawn(async move { Box::new(node).run(context).await }); + + assert_state_initializing(&mut state_rx).await; + assert_state_running(&mut state_rx).await; + + let mut frame = make_rgba_frame(2, 2, 100, 100, 100, 255); + frame.metadata = Some(PacketMetadata { + timestamp_us: Some(42_000), + duration_us: Some(33_333), + sequence: Some(7), + keyframe: Some(true), + }); + input_tx.send(Packet::Video(frame)).await.unwrap(); + + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + drop(input_tx); + + assert_state_stopped(&mut state_rx).await; + node_handle.await.unwrap().unwrap(); + + let output_packets = mock_sender.get_packets_for_pin("out").await; + assert!(!output_packets.is_empty()); + + if let Packet::Video(ref out_frame) = output_packets[0] { + let meta = out_frame.metadata.as_ref().expect("metadata should be preserved"); + assert_eq!(meta.timestamp_us, Some(42_000)); + assert_eq!(meta.duration_us, Some(33_333)); + assert_eq!(meta.sequence, Some(0)); // output sequence starts at 0 + } else { + panic!("Expected video packet"); + } +} + +#[test] +fn test_compositor_definition_pins() { + let (inputs, outputs) = CompositorNode::definition_pins(); + assert_eq!(inputs.len(), 1); + assert_eq!(inputs[0].name, "in"); + assert!(matches!(inputs[0].cardinality, PinCardinality::Dynamic { .. 
})); + assert_eq!(outputs.len(), 1); + assert_eq!(outputs[0].name, "out"); +} + +#[test] +fn test_compositor_pool_usage() { + use streamkit_core::frame_pool::FramePool; + + let canvas_w = 4u32; + let canvas_h = 4u32; + let total = (canvas_w as usize) * (canvas_h as usize) * 4; // 64 bytes + + let pool = FramePool::<u8>::preallocated(&[total], 2); + assert_eq!(pool.stats().buckets[0].available, 2); + + let mut cache = ConversionCache::new(); + let result = composite_frame(canvas_w, canvas_h, &[], &[], &[], Some(&pool), &mut cache); + assert_eq!(result.as_slice().len(), total); + // One buffer was taken from the pool. + assert_eq!(pool.stats().buckets[0].available, 1); + + // Drop returns to pool. + drop(result); + assert_eq!(pool.stats().buckets[0].available, 2); +} + +// ── SIMD vs scalar equivalence tests ──────────────────────────────── + +/// Helper: scalar I420→RGBA8 conversion for a single pixel (reference). +#[allow(clippy::many_single_char_names)] +fn scalar_i420_to_rgba8(y: u8, u: u8, v: u8) -> [u8; 4] { + let c = i32::from(y) - 16; + let d = i32::from(u) - 128; + let e = i32::from(v) - 128; + let r = ((298 * c + 409 * e + 128) >> 8).clamp(0, 255) as u8; + let g = ((298 * c - 100 * d - 208 * e + 128) >> 8).clamp(0, 255) as u8; + let b = ((298 * c + 516 * d + 128) >> 8).clamp(0, 255) as u8; + [r, g, b, 255] +} + +/// Helper: scalar RGBA8→Y for a single pixel (reference). +fn scalar_rgba8_to_y(r: u8, g: u8, b: u8) -> u8 { + let y = ((66 * i32::from(r) + 129 * i32::from(g) + 25 * i32::from(b) + 128) >> 8) + 16; + y.clamp(0, 255) as u8 +} + +#[test] +fn test_i420_to_rgba8_simd_matches_scalar() { + // Test a variety of YUV values, including edge cases that trigger + // i16 overflow with the BT.601 coefficients. 
+ let test_cases: Vec<(u8, u8, u8)> = vec![ + (16, 128, 128), // black + (235, 128, 128), // white + (81, 90, 240), // pure red + (145, 54, 34), // pure green + (41, 240, 110), // pure blue + (255, 128, 128), // max Y + (0, 0, 0), // min everything + (255, 255, 255), // max everything + (16, 0, 255), // extreme chroma + (235, 255, 0), // extreme chroma + ]; + + let width = test_cases.len() as u32; + // Build I420 buffer. + let mut y_plane = Vec::new(); + let mut u_plane = Vec::new(); + let mut v_plane = Vec::new(); + for &(y, u, v) in &test_cases { + y_plane.push(y); + // Each chroma sample covers 2 luma pixels horizontally. + if y_plane.len() % 2 == 1 { + u_plane.push(u); + v_plane.push(v); + } + } + let chroma_w = (width as usize).div_ceil(2); + // Pad if needed. + while u_plane.len() < chroma_w { + u_plane.push(128); + v_plane.push(128); + } + + let mut i420_data = Vec::new(); + i420_data.extend_from_slice(&y_plane); + i420_data.extend_from_slice(&u_plane); + i420_data.extend_from_slice(&v_plane); + + // Convert using the public function (which uses SIMD internally). + let mut simd_out = vec![0u8; width as usize * 4]; + pixel_ops::i420_to_rgba8_buf(&i420_data, width, 1, &mut simd_out); + + // Compare with scalar reference. + for (i, &(y, _u, _v)) in test_cases.iter().enumerate() { + // For chroma, each sample covers 2 pixels, so use the chroma + // value from the corresponding pair. + let chroma_idx = i / 2; + let actual_u = u_plane[chroma_idx]; + let actual_v = v_plane[chroma_idx]; + let expected = scalar_i420_to_rgba8(y, actual_u, actual_v); + let got = &simd_out[i * 4..(i + 1) * 4]; + assert_eq!( + got, &expected, + "pixel {i}: Y={y} U={actual_u} V={actual_v} → expected {expected:?}, got {got:?}" + ); + } +} + +#[test] +fn test_rgba8_to_i420_simd_matches_scalar() { + // Test RGBA→Y conversion with values that trigger i16 overflow + // (129 * 255 = 32895 > i16::MAX). 
+ let test_pixels: Vec<(u8, u8, u8)> = vec![ + (0, 0, 0), // black + (255, 255, 255), // white + (255, 0, 0), // red + (0, 255, 0), // green + (0, 0, 255), // blue + (128, 128, 128), // mid grey + (0, 254, 0), // just below overflow threshold + (0, 255, 0), // at overflow threshold + ]; + + let width = test_pixels.len() as u32; + let mut rgba_data = Vec::with_capacity(width as usize * 4); + for &(r, g, b) in &test_pixels { + rgba_data.extend_from_slice(&[r, g, b, 255]); + } + + // Convert using the public function (SIMD internally). + let i420_size = width as usize + 2 * (width as usize).div_ceil(2); + let mut i420_out = vec![0u8; i420_size]; + pixel_ops::rgba8_to_i420_buf(&rgba_data, width, 1, &mut i420_out); + + // Check Y plane matches scalar. + for (i, &(r, g, b)) in test_pixels.iter().enumerate() { + let expected_y = scalar_rgba8_to_y(r, g, b); + let got_y = i420_out[i]; + assert_eq!( + got_y, expected_y, + "pixel {i}: R={r} G={g} B={b} → expected Y={expected_y}, got Y={got_y}" + ); + } +} + +#[test] +fn test_i420_rgba8_roundtrip_preserves_values() { + // A full I420→RGBA8→I420 round-trip should produce values close + // to the originals (within ±2 due to integer rounding). + let width: u32 = 8; + let height: u32 = 2; + let w = width as usize; + let h = height as usize; + let chroma_w = w.div_ceil(2); + + // Build a simple I420 test pattern. + let mut i420_data = vec![0u8; w * h + 2 * chroma_w * (h / 2)]; + // Y plane: gradient. + for (i, val) in i420_data[..w * h].iter_mut().enumerate() { + *val = (16 + (i * 219 / (w * h))) as u8; + } + // U/V planes: mid-range. 
+ let u_offset = w * h; + let v_offset = u_offset + chroma_w * (h / 2); + for i in 0..chroma_w * (h / 2) { + i420_data[u_offset + i] = 128; + i420_data[v_offset + i] = 128; + } + + // I420 → RGBA8 → I420 + let mut rgba = vec![0u8; w * h * 4]; + pixel_ops::i420_to_rgba8_buf(&i420_data, width, height, &mut rgba); + let mut i420_roundtrip = vec![0u8; i420_data.len()]; + pixel_ops::rgba8_to_i420_buf(&rgba, width, height, &mut i420_roundtrip); + + // Y values should be close (within ±2 of originals due to rounding). + for (idx, orig_val) in i420_data[..w * h].iter().enumerate() { + let orig = i32::from(*orig_val); + let rt = i32::from(i420_roundtrip[idx]); + assert!( + (orig - rt).abs() <= 2, + "Y[{idx}]: original={orig}, roundtrip={rt}, diff={}", + (orig - rt).abs() + ); + } +} + +/// Test that `scale_blit_rgba` with opacity < 1.0 writes all rows correctly +/// on a buffer wide enough to exercise the AVX2 blend path (32 pixels). +/// This verifies the AVX2 → SSE2 → scalar cascade in `blit_row_alpha`. +#[test] +fn test_scale_blit_opacity_all_rows_written() { + let w = 32usize; + let h = 32usize; + // Fully opaque red source. + let src: Vec<u8> = [200, 50, 30, 255].repeat(w * h); + // All-black destination (simulates cleared canvas). + let mut dst = vec![0u8; w * h * 4]; + + scale_blit_rgba( + &mut dst, + w as u32, + h as u32, + &src, + w as u32, + h as u32, + &BlitRect { x: 0, y: 0, width: w as u32, height: h as u32 }, + 0.9, + false, + false, + false, + ); + + // Every single row should have been written to (non-zero pixels). + for row in 0..h { + let row_start = row * w * 4; + let row_slice = &dst[row_start..row_start + w * 4]; + let any_written = row_slice.iter().any(|&b| b != 0); + assert!(any_written, "Row {row} was not written to (all zeros)"); + + // Verify each pixel matches the expected scalar blend. + // opacity_u16 = (0.9 * 255 + 0.5) as u16 = 230 + // sa_eff = (255 * 230 + 128) >> 8 = 229 + // Dst is black (0), so blended = src * sa_eff / 255. 
+ let opacity_u16: u16 = 230; + let sa_eff = ((255u16 * opacity_u16 + 128) >> 8).min(255); + let expected_r = { + let blend = 200u16 * sa_eff + 128; + ((blend + (blend >> 8)) >> 8) as u8 + }; + let expected_g = { + let blend = 50u16 * sa_eff + 128; + ((blend + (blend >> 8)) >> 8) as u8 + }; + let expected_b = { + let blend = 30u16 * sa_eff + 128; + ((blend + (blend >> 8)) >> 8) as u8 + }; + for col in 0..w { + let idx = row_start + col * 4; + let got_r = dst[idx]; + let got_g = dst[idx + 1]; + let got_b = dst[idx + 2]; + let got_a = dst[idx + 3]; + + // Allow ±1 for rounding differences between SIMD and scalar paths. + assert!( + (i16::from(got_r) - i16::from(expected_r)).abs() <= 1, + "Row {row}, Col {col}: R={got_r}, expected ~{expected_r}" + ); + assert!( + (i16::from(got_g) - i16::from(expected_g)).abs() <= 1, + "Row {row}, Col {col}: G={got_g}, expected ~{expected_g}" + ); + assert!( + (i16::from(got_b) - i16::from(expected_b)).abs() <= 1, + "Row {row}, Col {col}: B={got_b}, expected ~{expected_b}" + ); + assert!(got_a > 200, "Row {row}, Col {col}: A={got_a}, expected >200"); + } + } +} + +/// Test I420→RGBA8 AVX2 kernel correctness with a multi-row buffer wide +/// enough to exercise the 8-pixel AVX2 path plus scalar remainder. +/// Verifies the OOB-safe scalar chroma reads produce identical output to +/// the scalar reference for every pixel. +#[test] +fn test_i420_to_rgba8_avx2_wide_multirow() { + // 24 pixels wide = 3 AVX2 iterations (8px each) with 0 remainder. + // 4 rows to exercise multi-row chroma subsampling. + let width: u32 = 24; + let height: u32 = 4; + let w = width as usize; + let h = height as usize; + let chroma_w = w / 2; + + // Build a varied I420 test pattern. + let mut i420_data = vec![0u8; w * h + 2 * chroma_w * (h / 2)]; + // Y plane: gradient across rows and columns. + for row in 0..h { + for col in 0..w { + i420_data[row * w + col] = (16 + ((row * w + col) * 219) / (w * h)) as u8; + } + } + // U/V planes: varying chroma values. 
+ let u_offset = w * h; + let v_offset = u_offset + chroma_w * (h / 2); + for i in 0..chroma_w * (h / 2) { + i420_data[u_offset + i] = (64 + (i * 3) % 192) as u8; + i420_data[v_offset + i] = (32 + (i * 7) % 224) as u8; + } + + // Convert using the public function (dispatches to AVX2 on this machine). + let mut simd_out = vec![0u8; w * h * 4]; + pixel_ops::i420_to_rgba8_buf(&i420_data, width, height, &mut simd_out); + + // Compare every pixel against the scalar reference. + for row in 0..h { + for col in 0..w { + let luma = i420_data[row * w + col]; + let chroma_r = row / 2; + let chroma_c = col / 2; + let u_val = i420_data[u_offset + chroma_r * chroma_w + chroma_c]; + let v_val = i420_data[v_offset + chroma_r * chroma_w + chroma_c]; + let expected = scalar_i420_to_rgba8(luma, u_val, v_val); + let got_idx = (row * w + col) * 4; + let got = &simd_out[got_idx..got_idx + 4]; + assert_eq!( + got, &expected, + "row={row} col={col}: Y={luma} U={u_val} V={v_val} → expected {expected:?}, got {got:?}" + ); + } + } +} + +/// Test that opacity < 1.0 through `composite_frame` produces correct +/// output with no black borders when source matches canvas dimensions. +#[test] +fn test_composite_frame_opacity_no_black_borders() { + let w = 32u32; + let h = 32u32; + let frame = make_rgba_frame(w, h, 200, 100, 50, 255); + + let layer = LayerSnapshot { + data: frame.data, + width: w, + height: h, + pixel_format: PixelFormat::Rgba8, + rect: Some(Rect { x: 0, y: 0, width: w, height: h }), + opacity: 0.8, + z_index: 0, + rotation_degrees: 0.0, + mirror_horizontal: false, + mirror_vertical: false, + }; + + let mut cache = ConversionCache::new(); + let result = composite_frame(w, h, &[Some(layer)], &[], &[], None, &mut cache); + let buf = result.as_slice(); + + // Every row should have non-zero content (no black borders). 
+ for row in 0..h as usize { + let row_start = row * w as usize * 4; + let row_end = row_start + w as usize * 4; + let any_nonzero = buf[row_start..row_end].iter().any(|&b| b != 0); + assert!(any_nonzero, "Row {row} is all zeros — black border detected"); + } +} + +/// Full-pipeline test at real dimensions (640×480): compositor blit with +/// opacity < 1.0, then RGBA→NV12→RGBA roundtrip, checking for black bands. +/// This exercises the exact pipeline the VP9 encoder sees. +#[test] +#[allow(clippy::many_single_char_names)] // Standard image-processing shorthand (w, h, r, g, b, etc.) +fn test_full_pipeline_opacity_nv12_roundtrip_no_black_bands() { + let w = 640u32; + let h = 480u32; + let wu = w as usize; + let hu = h as usize; + + // Create a colorbars-like pattern: 7 vertical bars of different colors. + let colors: [(u8, u8, u8); 7] = [ + (255, 255, 255), // white + (255, 255, 0), // yellow + (0, 255, 255), // cyan + (0, 255, 0), // green + (255, 0, 255), // magenta + (255, 0, 0), // red + (0, 0, 255), // blue + ]; + let mut src_rgba = vec![0u8; wu * hu * 4]; + for row in 0..hu { + for col in 0..wu { + let bar_idx = (col * 7) / wu; + let (r, g, b) = colors[bar_idx]; + let off = (row * wu + col) * 4; + src_rgba[off] = r; + src_rgba[off + 1] = g; + src_rgba[off + 2] = b; + src_rgba[off + 3] = 255; + } + } + + // Step 1: Blit onto canvas with opacity 0.9 (through scale_blit_rgba_rotated, + // exactly as the compositor does). + let mut canvas = vec![0u8; wu * hu * 4]; + pixel_ops::scale_blit_rgba_rotated( + &mut canvas, + w, + h, + &src_rgba, + w, + h, + &BlitRect { x: 0, y: 0, width: w, height: h }, + 0.9, + 0.0, + false, + false, + false, + ); + + // Verify compositor output: every row should have non-zero pixels. 
+ for row in 0..hu { + let row_start = row * wu * 4; + let any_nonzero = canvas[row_start..row_start + wu * 4].iter().any(|&b| b != 0); + assert!(any_nonzero, "Compositor output row {row} is all zeros (black band)"); + } + + // Step 2: Convert RGBA → NV12 (exactly as the VP9 encoder does). + let chroma_w = wu.div_ceil(2); + let chroma_h = hu.div_ceil(2); + let nv12_size = wu * hu + chroma_w * 2 * chroma_h; + let mut nv12 = vec![0u8; nv12_size]; + pixel_ops::rgba8_to_nv12_buf(&canvas, w, h, &mut nv12); + + // Verify Y plane: no rows should be all-zero (Y=0 is below black level). + // With opacity 0.9 on colored bars, Y values should be well above 0. + for row in 0..hu { + let y_row = &nv12[row * wu..(row + 1) * wu]; + let max_y = *y_row.iter().max().unwrap(); + assert!(max_y > 16, "NV12 Y-plane row {row}: max Y={max_y}, expected >16 (not black)"); + } + + // Step 3: Convert NV12 → RGBA (simulates decoder display). + let mut decoded_rgba = vec![0u8; wu * hu * 4]; + pixel_ops::nv12_to_rgba8_buf(&nv12, w, h, &mut decoded_rgba); + + // Verify decoded output: every row should have non-black pixels. + for row in 0..hu { + let row_start = row * wu * 4; + let row_slice = &decoded_rgba[row_start..row_start + wu * 4]; + // Check that at least some pixels have R, G, or B > 10 (not near-black). + let has_visible = + row_slice.chunks_exact(4).any(|px| px[0] > 10 || px[1] > 10 || px[2] > 10); + assert!(has_visible, "Decoded row {row} has no visible pixels (all near-black)"); + } +} + +/// Regression test: a 4:3 source blitted onto a 16:9 canvas with opacity < 1.0 +/// must cover the entire canvas (stretch-to-fill) with no black bars. +/// Previously the near-zero rotation fast path applied an aspect-ratio-preserving +/// fit that left letterbox gaps visible as black bands when opacity < 1.0. 
+#[test] +fn test_mismatched_aspect_ratio_opacity_no_black_bars() { + let src_w = 640u32; + let src_h = 480u32; // 4:3 + let canvas_w = 1280u32; + let canvas_h = 720u32; // 16:9 + + // Solid green source. + let src = [0u8, 255, 0, 255].repeat((src_w * src_h) as usize); + let mut canvas = vec![0u8; (canvas_w * canvas_h * 4) as usize]; + + pixel_ops::scale_blit_rgba_rotated( + &mut canvas, + canvas_w, + canvas_h, + &src, + src_w, + src_h, + &BlitRect { x: 0, y: 0, width: canvas_w, height: canvas_h }, + 0.9, + 0.0, // no rotation — exercises the near-zero fast path + false, + false, + false, + ); + + // Every row should have non-zero pixels (no black bars on left/right). + for row in 0..canvas_h as usize { + let row_start = row * canvas_w as usize * 4; + let row_end = row_start + canvas_w as usize * 4; + let any_nonzero = canvas[row_start..row_end].iter().any(|&b| b != 0); + assert!(any_nonzero, "Row {row} is all zeros — black bar detected"); + } + + // Every column should have non-zero pixels (no black bars on top/bottom). + for col in 0..canvas_w as usize { + let any_nonzero = (0..canvas_h as usize).any(|row| { + let idx = (row * canvas_w as usize + col) * 4; + canvas[idx] != 0 || canvas[idx + 1] != 0 || canvas[idx + 2] != 0 + }); + assert!(any_nonzero, "Column {col} is all zeros — black bar detected"); + } +} + +/// Regression test: a 4:3 source blitted into a non-square rect with 15° +/// rotation must cover the centre of the rect (stretch-to-fill, not +/// aspect-ratio fit). Exercises the rotated path's per-axis inverse +/// scaling (`inv_scale_x` / `inv_scale_y`). +#[test] +fn test_rotated_blit_mismatched_aspect_ratio_covers_centre() { + // 4×2 red source into a 40×20 rect (2:1 aspect mismatch) at 15° on + // a 60×40 canvas. The centre of the rect (canvas pixel 30,20) must + // be covered by red source content. 
+ let src = [255u8, 0, 0, 255].repeat(4 * 2); // 4×2 solid red + let mut dst = vec![0u8; 60 * 40 * 4]; + + scale_blit_rgba_rotated( + &mut dst, + 60, + 40, + &src, + 4, + 2, + &BlitRect { x: 10, y: 10, width: 40, height: 20 }, + 1.0, + 15.0, + false, + false, + false, + ); + + // Centre of the rect (canvas pixel 30, 20) should be red. + let cx = 30usize; + let cy = 20usize; + let idx = (cy * 60 + cx) * 4; + assert_eq!(dst[idx], 255, "Centre R"); + assert_eq!(dst[idx + 1], 0, "Centre G"); + assert_eq!(dst[idx + 2], 0, "Centre B"); + assert!(dst[idx + 3] > 200, "Centre A should be mostly opaque"); +} + +/// Test RGBA→NV12 AVX2 chroma conversion matches scalar reference. +/// Uses a 640-wide frame to fully exercise the AVX2 path (8 chroma samples/iter). +#[test] +#[allow(clippy::many_single_char_names)] // Standard image-processing shorthand (w, h, r, g, b, etc.) +fn test_rgba8_to_nv12_avx2_chroma_matches_scalar() { + let w = 640u32; + let h = 4u32; + let wu = w as usize; + let hu = h as usize; + let chroma_w = wu / 2; + let chroma_h = hu / 2; + + // Create a varied RGBA pattern. + let mut rgba = vec![0u8; wu * hu * 4]; + for row in 0..hu { + for col in 0..wu { + let off = (row * wu + col) * 4; + rgba[off] = ((col * 3 + row * 7) % 256) as u8; // R + rgba[off + 1] = ((col * 5 + row * 11) % 256) as u8; // G + rgba[off + 2] = ((col * 7 + row * 13) % 256) as u8; // B + rgba[off + 3] = 255; // A + } + } + + // Convert using the public function (dispatches to AVX2). + let nv12_size = wu * hu + chroma_w * 2 * chroma_h; + let mut nv12_simd = vec![0u8; nv12_size]; + pixel_ops::rgba8_to_nv12_buf(&rgba, w, h, &mut nv12_simd); + + // Compute scalar reference for the chroma plane. 
+ let y_size = wu * hu; + for crow in 0..chroma_h { + let r0 = crow * 2; + for ccol in 0..chroma_w { + let c0 = ccol * 2; + let mut sr = 0i32; + let mut sg = 0i32; + let mut sb = 0i32; + let mut count = 0i32; + for dr in 0..2u32 { + let rr = r0 + dr as usize; + if rr >= hu { + continue; + } + for dc in 0..2u32 { + let cc = c0 + dc as usize; + if cc < wu { + let off = (rr * wu + cc) * 4; + sr += i32::from(rgba[off]); + sg += i32::from(rgba[off + 1]); + sb += i32::from(rgba[off + 2]); + count += 1; + } + } + } + let r_avg = sr / count; + let g_avg = sg / count; + let b_avg = sb / count; + let expected_u = ((-38 * r_avg - 74 * g_avg + 112 * b_avg + 128) >> 8) + 128; + let expected_v = ((112 * r_avg - 94 * g_avg - 18 * b_avg + 128) >> 8) + 128; + let expected_u = expected_u.clamp(0, 255) as u8; + let expected_v = expected_v.clamp(0, 255) as u8; + + let uv_off = y_size + crow * chroma_w * 2 + ccol * 2; + let got_u = nv12_simd[uv_off]; + let got_v = nv12_simd[uv_off + 1]; + + // Allow ±2 for rounding differences between SIMD and scalar. + assert!( + (i16::from(got_u) - i16::from(expected_u)).abs() <= 2, + "crow={crow} ccol={ccol}: U got={got_u}, expected={expected_u}" + ); + assert!( + (i16::from(got_v) - i16::from(expected_v)).abs() <= 2, + "crow={crow} ccol={ccol}: V got={got_v}, expected={expected_v}" + ); + } + } + + // Also verify Y plane matches scalar reference. 
+ for row in 0..hu { + for col in 0..wu { + let off = (row * wu + col) * 4; + let r = i32::from(rgba[off]); + let g = i32::from(rgba[off + 1]); + let b = i32::from(rgba[off + 2]); + let expected_y = (((66 * r + 129 * g + 25 * b + 128) >> 8) + 16).clamp(0, 255) as u8; + let got_y = nv12_simd[row * wu + col]; + assert!( + (i16::from(got_y) - i16::from(expected_y)).abs() <= 1, + "row={row} col={col}: Y got={got_y}, expected={expected_y}" + ); + } + } +} diff --git a/crates/nodes/src/video/mod.rs b/crates/nodes/src/video/mod.rs index 4cbb6d86..d103b923 100644 --- a/crates/nodes/src/video/mod.rs +++ b/crates/nodes/src/video/mod.rs @@ -37,6 +37,9 @@ pub mod colorbars; #[cfg(feature = "compositor")] pub mod compositor; +#[cfg(any(feature = "video", feature = "compositor"))] +pub mod pixel_ops; + #[cfg(feature = "compositor")] pub mod pixel_convert; diff --git a/crates/nodes/src/video/pixel_convert.rs b/crates/nodes/src/video/pixel_convert.rs index 2ba64490..4aec8b8a 100644 --- a/crates/nodes/src/video/pixel_convert.rs +++ b/crates/nodes/src/video/pixel_convert.rs @@ -25,7 +25,6 @@ use schemars::JsonSchema; use serde::Deserialize; use std::sync::Arc; use std::time::Instant; -use streamkit_core::control::NodeControlMessage; use streamkit_core::stats::NodeStatsTracker; use streamkit_core::types::{Packet, PacketType, PixelFormat, VideoFormat, VideoFrame}; use streamkit_core::{ @@ -36,7 +35,7 @@ use streamkit_core::{ use tokio::sync::mpsc; use super::parse_pixel_format; -use crate::video::compositor::pixel_ops::{ +use crate::video::pixel_ops::{ i420_to_rgba8_buf, nv12_to_rgba8_buf, rgba8_to_i420_buf, rgba8_to_nv12_buf, }; @@ -127,6 +126,10 @@ impl ProcessorNode for PixelConvertNode { tracing::info!("PixelConvertNode starting: target_format={:?}", self.target_format); + let meter = global::meter("skit_nodes"); + let packets_processed_counter = + meter.u64_counter("pixel_convert_packets_processed").build(); + // ── Blocking conversion thread ────────────────────────────────── 
let target_format = self.target_format; let otel_node_name = node_name.clone(); @@ -263,89 +266,18 @@ impl ProcessorNode for PixelConvertNode { tracing::info!("PixelConvertNode input stream closed"); }); - // ── Forward loop (mirrors codec_forward_loop pattern) ─────────── - loop { - tokio::select! { - maybe_result = result_rx.recv() => { - match maybe_result { - Some(Ok(out_frame)) => { - // Determine if this was a passthrough or conversion. - // We check the pixel format: if it matches AND the frame - // wasn't converted, it's a passthrough. - // Since the blocking thread only sends Ok when it - // succeeded, we count based on whether conversion happened. - // The simplest reliable signal: check if the output format - // differs from what we'd expect on passthrough... but - // actually both paths produce target_format. We'll count - // in the blocking thread instead by sending a flag. - // For simplicity, count all successful outputs here and - // rely on the metrics from the blocking thread. - - stats_tracker.received(); - if context - .output_sender - .send("out", Packet::Video(out_frame)) - .await - .is_err() - { - tracing::debug!("Output channel closed, stopping node"); - break; - } - stats_tracker.sent(); - stats_tracker.maybe_send(); - }, - Some(Err(err)) => { - stats_tracker.received(); - stats_tracker.errored(); - stats_tracker.maybe_send(); - tracing::warn!("PixelConvertNode conversion error: {err}"); - }, - None => break, - } - } - Some(control_msg) = context.control_rx.recv() => { - if matches!(control_msg, NodeControlMessage::Shutdown) { - tracing::info!("PixelConvertNode received shutdown signal"); - input_task.abort(); - convert_task.abort(); - drop(convert_tx); - break; - } - } - _ = &mut input_task => { - // Input finished — drop sender to signal blocking thread, - // then drain remaining results. 
- drop(convert_tx); - while let Some(maybe_result) = result_rx.recv().await { - match maybe_result { - Ok(out_frame) => { - stats_tracker.received(); - if context - .output_sender - .send("out", Packet::Video(out_frame)) - .await - .is_err() - { - break; - } - stats_tracker.sent(); - stats_tracker.maybe_send(); - }, - Err(err) => { - stats_tracker.received(); - stats_tracker.errored(); - stats_tracker.maybe_send(); - tracing::warn!("PixelConvertNode conversion error: {err}"); - }, - } - } - break; - } - } - } - - convert_task.abort(); - let _ = convert_task.await; + crate::codec_utils::codec_forward_loop( + &mut context, + &mut result_rx, + &mut input_task, + convert_task, + convert_tx, + &packets_processed_counter, + &mut stats_tracker, + Packet::Video, + "PixelConvertNode", + ) + .await; state_helpers::emit_stopped(&context.state_tx, &node_name, "input_closed"); tracing::info!("PixelConvertNode shutting down."); diff --git a/crates/nodes/src/video/compositor/pixel_ops/blit.rs b/crates/nodes/src/video/pixel_ops/blit.rs similarity index 99% rename from crates/nodes/src/video/compositor/pixel_ops/blit.rs rename to crates/nodes/src/video/pixel_ops/blit.rs index 9f7a9655..079bf4c9 100644 --- a/crates/nodes/src/video/compositor/pixel_ops/blit.rs +++ b/crates/nodes/src/video/pixel_ops/blit.rs @@ -12,7 +12,17 @@ //! is large enough to amortise the thread-pool dispatch overhead. use super::{blend_u8, rayon_chunk_rows, RAYON_ROW_THRESHOLD}; -use crate::video::compositor::config::Rect; +/// Pixel-space rectangle for positioning a layer on the output canvas. +/// +/// `x` and `y` are signed to allow off-screen positioning (e.g. for +/// slide-in effects or rotation around the rect centre). 
+#[derive(Debug, Clone)] +pub struct BlitRect { + pub x: i32, + pub y: i32, + pub width: u32, + pub height: u32, +} #[cfg(target_arch = "x86_64")] use super::simd::{ @@ -80,7 +90,7 @@ pub fn scale_blit_rgba( src: &[u8], src_width: u32, src_height: u32, - dst_rect: &Rect, + dst_rect: &BlitRect, opacity: f32, #[allow(unused_variables)] src_opaque: bool, mirror_h: bool, @@ -735,7 +745,7 @@ pub fn scale_blit_rgba_rotated( src: &[u8], src_width: u32, src_height: u32, - dst_rect: &Rect, + dst_rect: &BlitRect, opacity: f32, rotation_deg: f32, src_opaque: bool, diff --git a/crates/nodes/src/video/compositor/pixel_ops/convert.rs b/crates/nodes/src/video/pixel_ops/convert.rs similarity index 100% rename from crates/nodes/src/video/compositor/pixel_ops/convert.rs rename to crates/nodes/src/video/pixel_ops/convert.rs diff --git a/crates/nodes/src/video/pixel_ops/mod.rs b/crates/nodes/src/video/pixel_ops/mod.rs new file mode 100644 index 00000000..fcfc8476 --- /dev/null +++ b/crates/nodes/src/video/pixel_ops/mod.rs @@ -0,0 +1,106 @@ +// SPDX-FileCopyrightText: © 2025 StreamKit Contributors +// +// SPDX-License-Identifier: MPL-2.0 + +//! Pixel-level operations for video processing. +//! +//! Contains RGBA8 blitting (with nearest-neighbor scaling), alpha blending, +//! overlay compositing, and I420 / NV12 ↔ RGBA8 colour-space conversion. +//! +//! This module is shared between the compositor and standalone nodes such as +//! [`super::pixel_convert`]. +//! +//! All hot loops use row-level parallelism via `rayon` when the region is +//! large enough to amortise the thread-pool dispatch overhead. Below the +//! threshold the same per-row closures run sequentially. +//! +//! # Module structure +//! +//! - [`blit`] — axis-aligned and rotated scale + blit operations. +//! - [`convert`] — colour-space conversion (I420, NV12 ↔ RGBA8). +//! - [`simd`] (x86-64 only) — SIMD kernels for both blitting and conversion. 
+
+mod blit;
+mod convert;
+
+#[cfg(target_arch = "x86_64")]
+mod simd_x86_64;
+
+/// Re-export the x86-64 SIMD module under a shorter name for internal use.
+#[cfg(target_arch = "x86_64")]
+use simd_x86_64 as simd;
+
+// ── Shared constants and helpers ────────────────────────────────────────────
+
+/// Minimum number of output rows before we dispatch to rayon. Below this
+/// threshold the per-row work is small enough that the rayon scheduling
+/// overhead (work-stealing queue push/pop, thread wake-up) dominates.
+/// 64 rows at 1280-wide RGBA8 ≈ 320 KiB — a reasonable crossover point
+/// on modern x86-64 cores.
+const RAYON_ROW_THRESHOLD: usize = 64;
+
+/// Number of rows to bundle into a single rayon task once parallel mode is
+/// entered. Reduces work-stealing overhead from ~1 task/row to
+/// ~rows/chunk tasks.
+///
+/// The chunk size is auto-tuned based on workload: wider or taller frames
+/// produce fewer, larger chunks, keeping scheduling cost proportional to
+/// the actual parallelism available.
+///
+/// Formula: `clamp(ceil(total_rows / (num_cpus * 4)), 8, 64)`.
+/// This keeps chunk counts proportional to hardware parallelism while
+/// avoiding both excessive scheduling overhead (too many tiny chunks)
+/// and poor load-balancing (too few large chunks).
+///
+/// The CPU count is cached in a `LazyLock` so we avoid a `sysconf` syscall
+/// (~40 µs on Linux) on every call.
+fn rayon_chunk_rows(total_rows: usize) -> usize {
+    static CPUS: std::sync::LazyLock<usize> = std::sync::LazyLock::new(|| {
+        std::thread::available_parallelism().map(std::num::NonZero::get).unwrap_or(1)
+    });
+    let ideal = total_rows.div_ceil(*CPUS * 4);
+    ideal.clamp(8, 64)
+}
+
+/// Fixed-point alpha blend: `(src * alpha + dst * (255 - alpha) + 128) / 255`
+/// using the well-known `((x + (x >> 8)) >> 8)` fast approximation of `x / 255`.
+#[allow(clippy::inline_always)] +#[inline(always)] +const fn blend_u8(src: u8, dst: u8, alpha: u16) -> u8 { + let inv = 255 - alpha; + let val = src as u16 * alpha + dst as u16 * inv + 128; + ((val + (val >> 8)) >> 8) as u8 +} + +/// Check whether every pixel's alpha byte in an RGBA8 buffer is `0xFF`. +/// +/// Dispatches to AVX2 / SSE2 kernels on x86-64 (8 / 4 pixels per iteration) +/// and falls back to a scalar scan on other architectures. Safe wrapper +/// around the `target_feature`-gated SIMD helpers so callers outside this +/// module don't need their own `unsafe` + `cfg` scaffolding. +/// +/// Assumes `rgba.len()` is a multiple of 4 — always true for valid RGBA8 data. +pub fn all_alpha_opaque(rgba: &[u8]) -> bool { + #[cfg(target_arch = "x86_64")] + { + // SAFETY: `all_alpha_opaque_{avx2,sse2}` require the input length to be + // a multiple of 4 bytes, which holds for any valid RGBA8 buffer. Both + // helpers handle arbitrary tails internally (scalar fall-through for + // trailing bytes past the last full SIMD chunk). Feature availability + // is checked at runtime via `is_x86_feature_detected!`. 
+ if is_x86_feature_detected!("avx2") { + unsafe { simd::all_alpha_opaque_avx2(rgba) } + } else { + unsafe { simd::all_alpha_opaque_sse2(rgba) } + } + } + #[cfg(not(target_arch = "x86_64"))] + { + rgba.chunks_exact(4).all(|px| px[3] == 255) + } +} + +// ── Public API re-exports ─────────────────────────────────────────────────── + +pub use blit::{scale_blit_rgba, scale_blit_rgba_rotated, BlitRect}; +pub use convert::{i420_to_rgba8_buf, nv12_to_rgba8_buf, rgba8_to_i420_buf, rgba8_to_nv12_buf}; diff --git a/crates/nodes/src/video/compositor/pixel_ops/simd_x86_64.rs b/crates/nodes/src/video/pixel_ops/simd_x86_64.rs similarity index 100% rename from crates/nodes/src/video/compositor/pixel_ops/simd_x86_64.rs rename to crates/nodes/src/video/pixel_ops/simd_x86_64.rs diff --git a/crates/nodes/src/video/vp9.rs b/crates/nodes/src/video/vp9.rs index f35677e7..9f87f1c0 100644 --- a/crates/nodes/src/video/vp9.rs +++ b/crates/nodes/src/video/vp9.rs @@ -6,7 +6,7 @@ use async_trait::async_trait; use bytes::Bytes; -use opentelemetry::{global, KeyValue}; +use opentelemetry::global; use schemars::JsonSchema; use serde::Deserialize; use std::borrow::Cow; @@ -228,7 +228,7 @@ impl ProcessorNode for Vp9DecoderNode { tracing::info!("Vp9DecoderNode input stream closed"); }); - codec_forward_loop( + crate::codec_utils::codec_forward_loop( &mut context, &mut result_rx, &mut input_task, @@ -415,7 +415,7 @@ impl ProcessorNode for Vp9EncoderNode { tracing::info!("Vp9EncoderNode input stream closed"); }); - codec_forward_loop( + crate::codec_utils::codec_forward_loop( &mut context, &mut result_rx, &mut input_task, @@ -438,106 +438,6 @@ impl ProcessorNode for Vp9EncoderNode { } } -/// Shared select-loop that forwards codec results to the output sender. -/// -/// Handles three concurrent events: -/// 1. Results arriving from the blocking codec task. -/// 2. Shutdown control messages. -/// 3. Input task completion (triggers drain of remaining results). 
-/// -/// `to_packet` converts a codec-specific result `T` into a [`Packet`]. -#[allow(clippy::too_many_arguments)] -async fn codec_forward_loop( - context: &mut NodeContext, - result_rx: &mut mpsc::Receiver>, - input_task: &mut tokio::task::JoinHandle<()>, - codec_task: tokio::task::JoinHandle<()>, - codec_tx: mpsc::Sender, - counter: &opentelemetry::metrics::Counter, - stats: &mut NodeStatsTracker, - to_packet: impl Fn(T) -> Packet, - label: &str, -) { - /// Forwards a single successful codec result to the output sender. - /// Returns `true` if the output channel is closed (caller should break). - async fn forward_one( - packet: Packet, - context: &mut NodeContext, - counter: &opentelemetry::metrics::Counter, - stats: &mut NodeStatsTracker, - ) -> bool { - counter.add(1, &[KeyValue::new("status", "ok")]); - stats.received(); - if context.output_sender.send("out", packet).await.is_err() { - tracing::debug!("Output channel closed, stopping node"); - return true; - } - stats.sent(); - stats.maybe_send(); - false - } - - /// Handles a codec error result by updating counters and logging. - fn handle_error( - err: &str, - counter: &opentelemetry::metrics::Counter, - stats: &mut NodeStatsTracker, - label: &str, - ) { - counter.add(1, &[KeyValue::new("status", "error")]); - stats.received(); - stats.errored(); - stats.maybe_send(); - tracing::warn!("{label} codec error: {err}"); - } - - loop { - tokio::select! 
{ - maybe_result = result_rx.recv() => { - match maybe_result { - Some(Ok(item)) => { - if forward_one(to_packet(item), context, counter, stats).await { - break; - } - } - Some(Err(err)) => handle_error(&err, counter, stats, label), - None => break, - } - } - Some(control_msg) = context.control_rx.recv() => { - if matches!(control_msg, streamkit_core::control::NodeControlMessage::Shutdown) { - tracing::info!("{label} received shutdown signal"); - // NOTE: Aborting the input task and dropping codec_tx causes - // the codec thread to exit/flush, but because we break out - // here those flushed results are never sent downstream. - // Data loss on explicit shutdown is acceptable. - input_task.abort(); - codec_task.abort(); - drop(codec_tx); - break; - } - } - _ = &mut *input_task => { - drop(codec_tx); - while let Some(maybe_result) = result_rx.recv().await { - match maybe_result { - Ok(item) => { - if forward_one(to_packet(item), context, counter, stats).await { - break; - } - } - Err(err) => handle_error(&err, counter, stats, label), - } - } - break; - } - } - } - - codec_task.abort(); - let _ = codec_task.await; -} - struct EncodedPacket { data: Bytes, metadata: Option,