diff --git a/rust/hyperstack-sdk/src/connection.rs b/rust/hyperstack-sdk/src/connection.rs index 55a7eb8e..7545b7b6 100644 --- a/rust/hyperstack-sdk/src/connection.rs +++ b/rust/hyperstack-sdk/src/connection.rs @@ -22,6 +22,16 @@ pub enum ConnectionCommand { Disconnect, } +/// Options for subscribing to a view with specific parameters +#[derive(Debug, Clone, Default)] +pub struct SubscriptionOptions { + pub take: Option, + pub skip: Option, + pub with_snapshot: Option, + pub after: Option, + pub snapshot_limit: Option, +} + struct ConnectionManagerInner { #[allow(dead_code)] url: String, @@ -63,7 +73,7 @@ impl ConnectionManager { } pub async fn ensure_subscription(&self, view: &str, key: Option<&str>) { - self.ensure_subscription_with_opts(view, key, None, None) + self.ensure_subscription_with_opts(view, key, SubscriptionOptions::default()) .await } @@ -71,16 +81,18 @@ impl ConnectionManager { &self, view: &str, key: Option<&str>, - take: Option, - skip: Option, + opts: SubscriptionOptions, ) { let sub = Subscription { view: view.to_string(), key: key.map(|s| s.to_string()), partition: None, filters: None, - take, - skip, + take: opts.take, + skip: opts.skip, + with_snapshot: opts.with_snapshot, + after: opts.after, + snapshot_limit: opts.snapshot_limit, }; if !self.inner.subscriptions.read().await.contains(&sub) { @@ -196,6 +208,9 @@ fn spawn_connection_loop( filters: None, take: None, skip: None, + with_snapshot: None, + after: None, + snapshot_limit: None, }; subscriptions.write().await.remove(&sub); let client_msg = ClientMessage::Unsubscribe(unsub); diff --git a/rust/hyperstack-sdk/src/frame.rs b/rust/hyperstack-sdk/src/frame.rs index 29c7a703..f2773a69 100644 --- a/rust/hyperstack-sdk/src/frame.rs +++ b/rust/hyperstack-sdk/src/frame.rs @@ -81,6 +81,9 @@ pub struct Frame { pub data: serde_json::Value, #[serde(default)] pub append: Vec, + /// Sequence cursor for ordering and resume capability + #[serde(skip_serializing_if = "Option::is_none")] + pub seq: Option, } impl Frame { diff --git a/rust/hyperstack-sdk/src/stream.rs b/rust/hyperstack-sdk/src/stream.rs index 21cc7282..725521c0 100644 --- a/rust/hyperstack-sdk/src/stream.rs +++ b/rust/hyperstack-sdk/src/stream.rs @@ -1,4 +1,4 @@ -use crate::connection::ConnectionManager; +use crate::connection::{ConnectionManager, SubscriptionOptions}; use crate::frame::Operation; use crate::store::{SharedStore, StoreUpdate}; use futures_util::Stream; @@ -179,6 +179,9 @@ enum EntityStreamState { subscription_key: Option, take: Option, skip: Option, + with_snapshot: Option, + after: Option, + snapshot_limit: Option, }, Active { inner: BroadcastStream, @@ -246,6 +249,9 @@ impl EntityStream { subscription_key, None, None, + None, + None, + None, ) } @@ -259,6 +265,9 @@ impl EntityStream { subscription_key: Option, take: Option, skip: Option, + with_snapshot: Option, + after: Option, + snapshot_limit: Option, ) -> Self { Self { state: EntityStreamState::Lazy { @@ -268,6 +277,9 @@ impl EntityStream { subscription_key, take, skip, + with_snapshot, + after, + snapshot_limit, }, view: entity_name, key_filter, @@ -313,6 +325,9 @@ impl Stream for EntityStre subscription_key, take, skip, + with_snapshot, + after, + snapshot_limit, } = std::mem::replace(&mut this.state, EntityStreamState::Invalid) else { unreachable!() @@ -326,7 +341,14 @@ impl Stream for EntityStre let view = subscription_view.clone(); let key = subscription_key.clone(); let fut = Box::pin(async move { - conn.ensure_subscription_with_opts(&view, key.as_deref(), take, skip) + let opts = SubscriptionOptions { + take, + skip, + with_snapshot, + after, + snapshot_limit, + }; + conn.ensure_subscription_with_opts(&view, key.as_deref(), opts) .await; }); @@ -429,6 +451,9 @@ enum RichEntityStreamState { subscription_key: Option, take: Option, skip: Option, + with_snapshot: Option, + after: Option, + snapshot_limit: Option, }, Active { inner: BroadcastStream, @@ -481,6 +506,9 @@ impl RichEntityStream { subscription_key, None, None, + None, + None, + None, ) } @@ -494,6 +522,9 @@ impl RichEntityStream { subscription_key: Option, take: Option, skip: Option, + with_snapshot: Option, + after: Option, + snapshot_limit: Option, ) -> Self { Self { state: RichEntityStreamState::Lazy { @@ -503,6 +534,9 @@ impl RichEntityStream { subscription_key, take, skip, + with_snapshot, + after, + snapshot_limit, }, view: entity_name, key_filter, @@ -527,6 +561,9 @@ impl Stream for RichEntity subscription_key, take, skip, + with_snapshot, + after, + snapshot_limit, } = std::mem::replace(&mut this.state, RichEntityStreamState::Invalid) else { unreachable!() @@ -540,7 +577,14 @@ impl Stream for RichEntity let view = subscription_view.clone(); let key = subscription_key.clone(); let fut = Box::pin(async move { - conn.ensure_subscription_with_opts(&view, key.as_deref(), take, skip) + let opts = SubscriptionOptions { + take, + skip, + with_snapshot, + after, + snapshot_limit, + }; + conn.ensure_subscription_with_opts(&view, key.as_deref(), opts) .await; }); @@ -890,6 +934,9 @@ enum UseStreamState { subscription_key: Option, take: Option, skip: Option, + with_snapshot: Option, + after: Option, + snapshot_limit: Option, }, Active { inner: BroadcastStream, @@ -942,6 +989,9 @@ impl UseStream { subscription_key, None, None, + None, + None, + None, ) } @@ -955,6 +1005,9 @@ impl UseStream { subscription_key: Option, take: Option, skip: Option, + with_snapshot: Option, + after: Option, + snapshot_limit: Option, ) -> Self { Self { state: UseStreamState::Lazy { @@ -964,6 +1017,9 @@ impl UseStream { subscription_key, take, skip, + with_snapshot, + after, + snapshot_limit, }, view: entity_name, key_filter, @@ -1012,6 +1068,9 @@ impl Stream for UseStream< subscription_key, take, skip, + with_snapshot, + after, + snapshot_limit, } = std::mem::replace(&mut this.state, UseStreamState::Invalid) else { unreachable!() @@ -1024,7 +1083,14 @@ impl Stream for UseStream< let view = subscription_view.clone(); let key = subscription_key.clone(); let fut = Box::pin(async move { - conn.ensure_subscription_with_opts(&view, key.as_deref(), take, skip) + let opts = SubscriptionOptions { + take, + skip, + with_snapshot, + after, + snapshot_limit, + }; + conn.ensure_subscription_with_opts(&view, key.as_deref(), opts) .await; }); diff --git a/rust/hyperstack-sdk/src/subscription.rs b/rust/hyperstack-sdk/src/subscription.rs index d6fae06d..f3cc0edd 100644 --- a/rust/hyperstack-sdk/src/subscription.rs +++ b/rust/hyperstack-sdk/src/subscription.rs @@ -10,6 +10,7 @@ pub enum ClientMessage { } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] pub struct Subscription { pub view: String, #[serde(skip_serializing_if = "Option::is_none")] @@ -22,6 +23,15 @@ pub struct Subscription { pub take: Option, #[serde(skip_serializing_if = "Option::is_none")] pub skip: Option, + /// Whether to include initial snapshot (defaults to true for backward compatibility) + #[serde(skip_serializing_if = "Option::is_none")] + pub with_snapshot: Option, + /// Cursor for resuming from a specific point (_seq value) + #[serde(skip_serializing_if = "Option::is_none")] + pub after: Option, + /// Maximum number of entities to include in snapshot (pagination hint) + #[serde(skip_serializing_if = "Option::is_none")] + pub snapshot_limit: Option, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -67,6 +77,9 @@ impl Subscription { filters: None, take: None, skip: None, + with_snapshot: None, + after: None, + snapshot_limit: None, } } @@ -90,6 +103,24 @@ impl Subscription { self } + /// Set whether to include the initial snapshot (defaults to true) + pub fn with_snapshot(mut self, with_snapshot: bool) -> Self { + self.with_snapshot = Some(with_snapshot); + self + } + + /// Set the cursor to resume from (for reconnecting and getting only newer data) + pub fn after(mut self, cursor: impl Into) -> Self { + self.after = Some(cursor.into()); + self + } + + /// Set the maximum number of entities to include in the snapshot + pub fn with_snapshot_limit(mut self, limit: usize) -> Self { + self.snapshot_limit = Some(limit); + self + } + pub fn sub_key(&self) -> String { let filters_str = self .filters diff --git a/rust/hyperstack-sdk/src/view.rs b/rust/hyperstack-sdk/src/view.rs index 96fdf5d0..1b379129 100644 --- a/rust/hyperstack-sdk/src/view.rs +++ b/rust/hyperstack-sdk/src/view.rs @@ -149,6 +149,9 @@ where take: Option, skip: Option, filters: Option>, + with_snapshot: Option, + after: Option, + snapshot_limit: Option, stream: Option>, } @@ -170,6 +173,9 @@ where take: None, skip: None, filters: None, + with_snapshot: None, + after: None, + snapshot_limit: None, stream: None, } } @@ -193,6 +199,24 @@ where .insert(key.into(), value.into()); self } + + /// Set whether to include the initial snapshot (defaults to true). + pub fn with_snapshot(mut self, with_snapshot: bool) -> Self { + self.with_snapshot = Some(with_snapshot); + self + } + + /// Set the cursor to resume from (for reconnecting and getting only newer data). + pub fn after(mut self, cursor: impl Into) -> Self { + self.after = Some(cursor.into()); + self + } + + /// Set the maximum number of entities to include in the snapshot. + pub fn with_snapshot_limit(mut self, limit: usize) -> Self { + self.snapshot_limit = Some(limit); + self + } } impl Stream for UseBuilder @@ -214,6 +238,9 @@ where None, this.take, this.skip, + this.with_snapshot, + this.after.clone(), + this.snapshot_limit, )); } @@ -233,6 +260,9 @@ where take: Option, skip: Option, filters: Option>, + with_snapshot: Option, + after: Option, + snapshot_limit: Option, stream: Option>, } @@ -254,6 +284,9 @@ where take: None, skip: None, filters: None, + with_snapshot: None, + after: None, + snapshot_limit: None, stream: None, } } @@ -278,6 +311,24 @@ where self } + /// Set whether to include the initial snapshot (defaults to true). + pub fn with_snapshot(mut self, with_snapshot: bool) -> Self { + self.with_snapshot = Some(with_snapshot); + self + } + + /// Set the cursor to resume from (for reconnecting and getting only newer data). + pub fn after(mut self, cursor: impl Into) -> Self { + self.after = Some(cursor.into()); + self + } + + /// Set the maximum number of entities to include in the snapshot. + pub fn with_snapshot_limit(mut self, limit: usize) -> Self { + self.snapshot_limit = Some(limit); + self + } + /// Get a rich stream with before/after diffs instead. pub fn rich(self) -> RichEntityStream { RichEntityStream::new_lazy_with_opts( @@ -289,6 +340,9 @@ where None, self.take, self.skip, + self.with_snapshot, + self.after, + self.snapshot_limit, ) } } @@ -312,6 +366,9 @@ where None, this.take, this.skip, + this.with_snapshot, + this.after.clone(), + this.snapshot_limit, )); } @@ -331,6 +388,9 @@ where take: Option, skip: Option, filters: Option>, + with_snapshot: Option, + after: Option, + snapshot_limit: Option, stream: Option>, } @@ -352,6 +412,9 @@ where take: None, skip: None, filters: None, + with_snapshot: None, + after: None, + snapshot_limit: None, stream: None, } } @@ -372,6 +435,24 @@ where .insert(key.into(), value.into()); self } + + /// Set whether to include the initial snapshot (defaults to true). + pub fn with_snapshot(mut self, with_snapshot: bool) -> Self { + self.with_snapshot = Some(with_snapshot); + self + } + + /// Set the cursor to resume from (for reconnecting and getting only newer data). + pub fn after(mut self, cursor: impl Into) -> Self { + self.after = Some(cursor.into()); + self + } + + /// Set the maximum number of entities to include in the snapshot. + pub fn with_snapshot_limit(mut self, limit: usize) -> Self { + self.snapshot_limit = Some(limit); + self + } } impl Stream for RichWatchBuilder @@ -393,6 +474,9 @@ where None, this.take, this.skip, + this.with_snapshot, + this.after.clone(), + this.snapshot_limit, )); } diff --git a/rust/hyperstack-server/src/cache.rs b/rust/hyperstack-server/src/cache.rs index 544c2415..266de235 100644 --- a/rust/hyperstack-server/src/cache.rs +++ b/rust/hyperstack-server/src/cache.rs @@ -16,6 +16,19 @@ const DEFAULT_MAX_ARRAY_LENGTH: usize = 100; const DEFAULT_INITIAL_SNAPSHOT_BATCH_SIZE: usize = 50; const DEFAULT_SUBSEQUENT_SNAPSHOT_BATCH_SIZE: usize = 100; +/// Compare two `_seq` values numerically. +/// `_seq` format is "{slot}:{offset}" where slot is not zero-padded. +/// This handles digit-count boundaries correctly (e.g., 99999999 < 100000000). +pub fn cmp_seq(a: &str, b: &str) -> std::cmp::Ordering { + fn parse(s: &str) -> (u64, u64) { + let mut parts = s.splitn(2, ':'); + let slot = parts.next().and_then(|p| p.parse().ok()).unwrap_or(0); + let offset = parts.next().and_then(|p| p.parse().ok()).unwrap_or(0); + (slot, offset) + } + parse(a).cmp(&parse(b)) +} + /// Configuration for the entity cache #[derive(Debug, Clone)] pub struct EntityCacheConfig { @@ -109,6 +122,50 @@ impl EntityCache { .unwrap_or_default() } + /// Get entities with _seq greater than the provided cursor. + /// + /// Returns entities that have been updated after the given cursor, + /// sorted by _seq in ascending order. Useful for resuming from + /// a specific point in the stream. + pub async fn get_after( + &self, + view_id: &str, + cursor: &str, + limit: Option, + ) -> Vec<(String, Value)> { + let caches = self.caches.read().await; + + if let Some(cache) = caches.get(view_id) { + let mut results: Vec<(String, Value)> = cache + .iter() + .filter(|(_, entity)| { + entity + .get("_seq") + .and_then(|s| s.as_str()) + .map(|seq| cmp_seq(seq, cursor) == std::cmp::Ordering::Greater) + .unwrap_or(false) + }) + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + + // Sort by _seq (ascending) + results.sort_by(|a, b| { + let seq_a = a.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); + let seq_b = b.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); + cmp_seq(seq_a, seq_b) + }); + + // Apply limit if provided + if let Some(limit) = limit { + results.truncate(limit); + } + + results + } else { + vec![] + } + } + /// Get a specific entity from the cache pub async fn get(&self, view_id: &str, key: &str) -> Option { let caches = self.caches.read().await; @@ -554,4 +611,120 @@ mod tests { assert_eq!(snapshot_config.initial_batch_size, 25); assert_eq!(snapshot_config.subsequent_batch_size, 75); } + + #[tokio::test] + async fn test_get_after() { + let cache = EntityCache::new(); + + // Insert entities with _seq values + cache + .upsert( + "tokens/list", + "key1", + json!({"id": 1, "_seq": "100:000000000001"}), + ) + .await; + cache + .upsert( + "tokens/list", + "key2", + json!({"id": 2, "_seq": "100:000000000002"}), + ) + .await; + cache + .upsert( + "tokens/list", + "key3", + json!({"id": 3, "_seq": "100:000000000003"}), + ) + .await; + cache + .upsert( + "tokens/list", + "key4", + json!({"id": 4, "_seq": "101:000000000001"}), + ) + .await; + + // Get all entities after "100:000000000002" + let after = cache.get_after("tokens/list", "100:000000000002", None).await; + + // Should return key3 and key4 (sorted by _seq) + assert_eq!(after.len(), 2); + assert_eq!(after[0].0, "key3"); + assert_eq!(after[1].0, "key4"); + } + + #[tokio::test] + async fn test_get_after_with_limit() { + let cache = EntityCache::new(); + + // Insert entities with _seq values + cache + .upsert( + "tokens/list", + "key1", + json!({"id": 1, "_seq": "100:000000000001"}), + ) + .await; + cache + .upsert( + "tokens/list", + "key2", + json!({"id": 2, "_seq": "100:000000000002"}), + ) + .await; + cache + .upsert( + "tokens/list", + "key3", + json!({"id": 3, "_seq": "100:000000000003"}), + ) + .await; + + // Get entities after "100:000000000000" with limit 2 + let after = cache.get_after("tokens/list", "100:000000000000", Some(2)).await; + + // Should return only first 2 (key1 and key2) + assert_eq!(after.len(), 2); + assert_eq!(after[0].0, "key1"); + assert_eq!(after[1].0, "key2"); + } + + #[tokio::test] + async fn test_get_after_empty_result() { + let cache = EntityCache::new(); + + cache + .upsert( + "tokens/list", + "key1", + json!({"id": 1, "_seq": "100:000000000001"}), + ) + .await; + + // Get entities after a future cursor + let after = cache.get_after("tokens/list", "999:000000000000", None).await; + + assert!(after.is_empty()); + } + + #[tokio::test] + async fn test_get_after_missing_seq() { + let cache = EntityCache::new(); + + // Insert entity without _seq + cache + .upsert( + "tokens/list", + "key1", + json!({"id": 1}), + ) + .await; + + // Get entities after any cursor - entity without _seq should not be included + let after = cache.get_after("tokens/list", "0:000000000000", None).await; + + assert!(after.is_empty()); + } } diff --git a/rust/hyperstack-server/src/projector.rs b/rust/hyperstack-server/src/projector.rs index 0bf0d6de..874b9cd2 100644 --- a/rust/hyperstack-server/src/projector.rs +++ b/rust/hyperstack-server/src/projector.rs @@ -168,6 +168,9 @@ impl Projector { let mut projected = spec.projection.apply(patch_data); transform_large_u64_to_strings(&mut projected); + // Extract _seq from the patch data to include in the frame + let seq = slot_context.map(|ctx| ctx.to_seq_string()); + let frame = Frame { mode: spec.mode, export: spec.id.clone(), @@ -175,6 +178,7 @@ impl Projector { key: key.clone(), data: projected, append: append.clone(), + seq, }; json_buffer.clear(); diff --git a/rust/hyperstack-server/src/websocket/frame.rs b/rust/hyperstack-server/src/websocket/frame.rs index 8196a6c7..3b6dd32f 100644 --- a/rust/hyperstack-server/src/websocket/frame.rs +++ b/rust/hyperstack-server/src/websocket/frame.rs @@ -65,6 +65,9 @@ pub struct Frame { pub data: serde_json::Value, #[serde(skip_serializing_if = "Vec::is_empty", default)] pub append: Vec, + /// Sequence cursor for ordering and resume capability + #[serde(skip_serializing_if = "Option::is_none")] + pub seq: Option, } /// A single entity within a snapshot @@ -149,6 +152,7 @@ mod tests { key: "123".to_string(), data: serde_json::json!({}), append: vec![], + seq: None, }; assert_eq!(frame.entity(), "SettlementGame/list"); @@ -164,6 +168,7 @@ mod tests { key: "123".to_string(), data: serde_json::json!({"gameId": "123"}), append: vec![], + seq: None, }; let json = serde_json::to_value(&frame).unwrap(); @@ -173,6 +178,39 @@ mod tests { assert_eq!(json["key"], "123"); } + #[test] + fn test_frame_with_seq() { + let frame = Frame { + mode: Mode::List, + export: "SettlementGame/list".to_string(), + op: "upsert", + key: "123".to_string(), + data: serde_json::json!({"gameId": "123"}), + append: vec![], + seq: Some("123456789:000000000042".to_string()), + }; + + let json = serde_json::to_value(&frame).unwrap(); + assert_eq!(json["op"], "upsert"); + assert_eq!(json["seq"], "123456789:000000000042"); + } + + #[test] + fn test_frame_seq_skipped_when_none() { + let frame = Frame { + mode: Mode::List, + export: "SettlementGame/list".to_string(), + op: "upsert", + key: "123".to_string(), + data: serde_json::json!({"gameId": "123"}), + append: vec![], + seq: None, + }; + + let json = serde_json::to_value(&frame).unwrap(); + assert!(json.get("seq").is_none()); + } + #[test] fn test_snapshot_frame_complete_serialization() { let frame = SnapshotFrame { diff --git a/rust/hyperstack-server/src/websocket/server.rs b/rust/hyperstack-server/src/websocket/server.rs index 39f9efda..d2b7d290 100644 --- a/rust/hyperstack-server/src/websocket/server.rs +++ b/rust/hyperstack-server/src/websocket/server.rs @@ -1,5 +1,5 @@ use crate::bus::BusManager; -use crate::cache::{EntityCache, SnapshotBatchConfig}; +use crate::cache::{cmp_seq, EntityCache, SnapshotBatchConfig}; use crate::compression::maybe_compress; use crate::view::{ViewIndex, ViewSpec}; use crate::websocket::client_manager::ClientManager; @@ -581,28 +581,36 @@ async fn attach_client_to_bus( let mut rx = ctx.bus_manager.get_or_create_state_bus(view_id, key).await; - if let Some(mut cached_entity) = ctx.entity_cache.get(view_id, key).await { - transform_large_u64_to_strings(&mut cached_entity); - let snapshot_entities = vec![SnapshotEntity { - key: key.to_string(), - data: cached_entity, - }]; - let batch_config = ctx.entity_cache.snapshot_config(); - let _ = send_snapshot_batches( - ctx.client_id, - &snapshot_entities, - view_spec.mode, - view_id, - ctx.client_manager, - &batch_config, - #[cfg(feature = "otel")] - ctx.metrics.as_ref(), - ) - .await; + // Check if we should send snapshot (defaults to true for backward compatibility) + let should_send_snapshot = subscription.with_snapshot.unwrap_or(true); + + if should_send_snapshot { + if let Some(mut cached_entity) = ctx.entity_cache.get(view_id, key).await { + transform_large_u64_to_strings(&mut cached_entity); + let snapshot_entities = vec![SnapshotEntity { + key: key.to_string(), + data: cached_entity, + }]; + let batch_config = ctx.entity_cache.snapshot_config(); + let _ = send_snapshot_batches( + ctx.client_id, + &snapshot_entities, + view_spec.mode, + view_id, + ctx.client_manager, + &batch_config, + #[cfg(feature = "otel")] + ctx.metrics.as_ref(), + ) + .await; + rx.borrow_and_update(); + } else if !rx.borrow().is_empty() { + let data = rx.borrow_and_update().clone(); + let _ = ctx.client_manager.send_to_client(ctx.client_id, data); + } + } else { + info!("Client {} subscribed to {} without snapshot", ctx.client_id, view_id); rx.borrow_and_update(); - } else if !rx.borrow().is_empty() { - let data = rx.borrow_and_update().clone(); - let _ = ctx.client_manager.send_to_client(ctx.client_id, data); } let client_id = ctx.client_id; @@ -639,33 +647,58 @@ async fn attach_client_to_bus( Mode::List | Mode::Append => { let mut rx = ctx.bus_manager.get_or_create_list_bus(view_id).await; - let snapshots = ctx.entity_cache.get_all(view_id).await; - let snapshot_entities: Vec = snapshots - .into_iter() - .filter(|(key, _)| subscription.matches_key(key)) - .map(|(key, mut data)| { - transform_large_u64_to_strings(&mut data); - SnapshotEntity { key, data } - }) - .collect(); - - if !snapshot_entities.is_empty() { - let batch_config = ctx.entity_cache.snapshot_config(); - if send_snapshot_batches( - ctx.client_id, - &snapshot_entities, - view_spec.mode, - view_id, - ctx.client_manager, - &batch_config, - #[cfg(feature = "otel")] - ctx.metrics.as_ref(), - ) - .await - .is_err() - { - return; + // Check if we should send snapshot (defaults to true for backward compatibility) + let should_send_snapshot = subscription.with_snapshot.unwrap_or(true); + + if should_send_snapshot { + // Determine which entities to send based on cursor + let mut snapshots = if let Some(ref cursor) = subscription.after { + ctx.entity_cache.get_after(view_id, cursor, subscription.snapshot_limit).await + } else { + ctx.entity_cache.get_all(view_id).await + }; + + // Sort by _seq descending only when there is no cursor (to get most-recent N from full cache) + if let Some(limit) = subscription.snapshot_limit { + if subscription.after.is_none() { + snapshots.sort_by(|a, b| { + let sa = a.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); + let sb = b.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); + cmp_seq(sb, sa) // descending: most-recent N + }); + snapshots.truncate(limit); + } + } + + let snapshot_entities: Vec = snapshots + .into_iter() + .filter(|(key, _)| subscription.matches_key(key)) + .map(|(key, mut data)| { + transform_large_u64_to_strings(&mut data); + SnapshotEntity { key, data } + }) + .collect(); + + if !snapshot_entities.is_empty() { + let batch_config = ctx.entity_cache.snapshot_config(); + if send_snapshot_batches( + ctx.client_id, + &snapshot_entities, + view_spec.mode, + view_id, + ctx.client_manager, + &batch_config, + #[cfg(feature = "otel")] + ctx.metrics.as_ref(), + ) + .await + .is_err() + { + return; + } } + } else { + info!("Client {} subscribed to {} without snapshot", ctx.client_id, view_id); } let client_id = ctx.client_id; @@ -820,6 +853,7 @@ async fn attach_derived_view_subscription_otel( if let Some((new_key, data)) = new_window.first() { for old_key in current_window_keys.difference(&new_keys) { let delete_frame = Frame { + seq: None, mode: frame_mode, export: view_id_clone.clone(), op: "delete", @@ -841,6 +875,7 @@ async fn attach_derived_view_subscription_otel( let mut transformed_data = data.clone(); transform_large_u64_to_strings(&mut transformed_data); let frame = Frame { + seq: None, mode: frame_mode, export: view_id_clone.clone(), op: "upsert", @@ -862,6 +897,7 @@ async fn attach_derived_view_subscription_otel( } else { for key in current_window_keys.difference(&new_keys) { let delete_frame = Frame { + seq: None, mode: frame_mode, export: view_id_clone.clone(), op: "delete", @@ -884,6 +920,7 @@ async fn attach_derived_view_subscription_otel( let mut transformed_data = data.clone(); transform_large_u64_to_strings(&mut transformed_data); let frame = Frame { + seq: None, mode: frame_mode, export: view_id_clone.clone(), op: "upsert", @@ -959,26 +996,34 @@ async fn attach_client_to_bus( let mut rx = ctx.bus_manager.get_or_create_state_bus(view_id, key).await; - if let Some(mut cached_entity) = ctx.entity_cache.get(view_id, key).await { - transform_large_u64_to_strings(&mut cached_entity); - let snapshot_entities = vec![SnapshotEntity { - key: key.to_string(), - data: cached_entity, - }]; - let batch_config = ctx.entity_cache.snapshot_config(); - let _ = send_snapshot_batches( - ctx.client_id, - &snapshot_entities, - view_spec.mode, - view_id, - ctx.client_manager, - &batch_config, - ) - .await; + // Check if we should send snapshot (defaults to true for backward compatibility) + let should_send_snapshot = subscription.with_snapshot.unwrap_or(true); + + if should_send_snapshot { + if let Some(mut cached_entity) = ctx.entity_cache.get(view_id, key).await { + transform_large_u64_to_strings(&mut cached_entity); + let snapshot_entities = vec![SnapshotEntity { + key: key.to_string(), + data: cached_entity, + }]; + let batch_config = ctx.entity_cache.snapshot_config(); + let _ = send_snapshot_batches( + ctx.client_id, + &snapshot_entities, + view_spec.mode, + view_id, + ctx.client_manager, + &batch_config, + ) + .await; + rx.borrow_and_update(); + } else if !rx.borrow().is_empty() { + let data = rx.borrow_and_update().clone(); + let _ = ctx.client_manager.send_to_client(ctx.client_id, data); + } + } else { + info!("Client {} subscribed to {} without snapshot", ctx.client_id, view_id); rx.borrow_and_update(); - } else if !rx.borrow().is_empty() { - let data = rx.borrow_and_update().clone(); - let _ = ctx.client_manager.send_to_client(ctx.client_id, data); } let client_id = ctx.client_id; @@ -1011,31 +1056,56 @@ async fn attach_client_to_bus( Mode::List | Mode::Append => { let mut rx = ctx.bus_manager.get_or_create_list_bus(view_id).await; - let snapshots = ctx.entity_cache.get_all(view_id).await; - let snapshot_entities: Vec = snapshots - .into_iter() - .filter(|(key, _)| subscription.matches_key(key)) - .map(|(key, mut data)| { - transform_large_u64_to_strings(&mut data); - SnapshotEntity { key, data } - }) - .collect(); - - if !snapshot_entities.is_empty() { - let batch_config = ctx.entity_cache.snapshot_config(); - if send_snapshot_batches( - ctx.client_id, - &snapshot_entities, - view_spec.mode, - view_id, - ctx.client_manager, - &batch_config, - ) - .await - .is_err() - { - return; + // Check if we should send snapshot (defaults to true for backward compatibility) + let should_send_snapshot = subscription.with_snapshot.unwrap_or(true); + + if should_send_snapshot { + // Determine which entities to send based on cursor + let mut snapshots = if let Some(ref cursor) = subscription.after { + ctx.entity_cache.get_after(view_id, cursor, subscription.snapshot_limit).await + } else { + ctx.entity_cache.get_all(view_id).await + }; + + // Sort by _seq descending only when there is no cursor (to get most-recent N from full cache) + if let Some(limit) = subscription.snapshot_limit { + if subscription.after.is_none() { + snapshots.sort_by(|a, b| { + let sa = a.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); + let sb = b.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); + cmp_seq(sb, sa) // descending: most-recent N + }); + snapshots.truncate(limit); + } + } + + let snapshot_entities: Vec = snapshots + .into_iter() + .filter(|(key, _)| subscription.matches_key(key)) + .map(|(key, mut data)| { + transform_large_u64_to_strings(&mut data); + SnapshotEntity { key, data } + }) + .collect(); + + if !snapshot_entities.is_empty() { + let batch_config = ctx.entity_cache.snapshot_config(); + if send_snapshot_batches( + ctx.client_id, + &snapshot_entities, + view_spec.mode, + view_id, + ctx.client_manager, + &batch_config, + ) + .await + .is_err() + { + return; + } } + } else { + info!("Client {} subscribed to {} without snapshot", ctx.client_id, view_id); } let client_id = ctx.client_id; @@ -1183,6 +1253,7 @@ async fn attach_derived_view_subscription( if let Some((new_key, data)) = new_window.first() { for old_key in current_window_keys.difference(&new_keys) { let delete_frame = Frame { + seq: None, mode: frame_mode, export: view_id_clone.clone(), op: "delete", @@ -1201,6 +1272,7 @@ async fn attach_derived_view_subscription( let mut transformed_data = data.clone(); transform_large_u64_to_strings(&mut transformed_data); let frame = Frame { + seq: None, mode: frame_mode, export: view_id_clone.clone(), op: "upsert", @@ -1218,6 +1290,7 @@ async fn attach_derived_view_subscription( } else { for key in current_window_keys.difference(&new_keys) { let delete_frame = Frame { + seq: None, mode: frame_mode, export: view_id_clone.clone(), op: "delete", @@ -1237,6 +1310,7 @@ async fn attach_derived_view_subscription( let mut transformed_data = data.clone(); transform_large_u64_to_strings(&mut transformed_data); let frame = Frame { + seq: None, mode: frame_mode, export: view_id_clone.clone(), op: "upsert", diff --git a/rust/hyperstack-server/src/websocket/subscription.rs b/rust/hyperstack-server/src/websocket/subscription.rs index 75ed6f40..8785acb7 100644 --- a/rust/hyperstack-server/src/websocket/subscription.rs +++ b/rust/hyperstack-server/src/websocket/subscription.rs @@ -14,6 +14,7 @@ pub enum ClientMessage { /// Client subscription to a specific view #[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] pub struct Subscription { pub view: String, #[serde(skip_serializing_if = "Option::is_none")] @@ -26,6 +27,19 @@ pub struct Subscription { /// Number of items to skip (for windowed subscriptions) #[serde(skip_serializing_if = "Option::is_none")] pub skip: Option, + /// Whether to include initial snapshot (defaults to true for backward compatibility) + #[serde(skip_serializing_if = "Option::is_none")] + pub with_snapshot: Option, + /// Cursor for resuming from a specific point (_seq value). + /// Note: Ignored for State mode subscriptions (single entity, no pagination). + /// Note: Not supported for derived views (windowed aggregations with sort). Derived views + /// always emit `seq: None` in live update frames, so cursor-based reconnection is unavailable. + #[serde(skip_serializing_if = "Option::is_none")] + pub after: Option, + /// Maximum number of entities to include in snapshot (pagination hint). + /// Note: Ignored for State mode subscriptions (single entity). + #[serde(skip_serializing_if = "Option::is_none")] + pub snapshot_limit: Option, } /// Client unsubscription request @@ -103,6 +117,9 @@ mod tests { partition: None, take: None, skip: None, + with_snapshot: None, + after: None, + snapshot_limit: None, }; assert!(sub.matches("SettlementGame/list", "835")); @@ -118,6 +135,9 @@ mod tests { partition: None, take: None, skip: None, + with_snapshot: None, + after: None, + snapshot_limit: None, }; assert!(sub.matches("SettlementGame/list", "835")); @@ -189,6 +209,9 @@ mod tests { partition: None, take: None, skip: None, + with_snapshot: None, + after: None, + snapshot_limit: None, }; assert_eq!(sub.sub_key(), "SettlementGame/list:835"); } @@ -201,6 +224,9 @@ mod tests { partition: None, take: None, skip: None, + with_snapshot: None, + after: None, + snapshot_limit: None, }; assert_eq!(sub.sub_key(), "SettlementGame/list:*"); } @@ -239,4 +265,72 @@ mod tests { _ => panic!("Expected Subscribe"), } } + + #[test] + fn test_subscription_with_optional_snapshot() { + let json = json!({ + "type": "subscribe", + "view": "SettlementGame/list", + "withSnapshot": false + }); + + let msg: ClientMessage = serde_json::from_value(json).unwrap(); + match msg { + ClientMessage::Subscribe(sub) => { + assert_eq!(sub.view, "SettlementGame/list"); + assert_eq!(sub.with_snapshot, Some(false)); + } + _ => panic!("Expected Subscribe"), + } + } + + #[test] + fn test_subscription_with_after_cursor() { + let json = json!({ + "type": "subscribe", + "view": "SettlementGame/list", + "after": "123456789:000000000042" + }); + + let msg: ClientMessage = serde_json::from_value(json).unwrap(); + match msg { + ClientMessage::Subscribe(sub) => { + assert_eq!(sub.view, "SettlementGame/list"); + assert_eq!(sub.after, Some("123456789:000000000042".to_string())); + } + _ => panic!("Expected Subscribe"), + } + } + + #[test] + fn test_subscription_with_snapshot_limit() { + let json = json!({ + "type": "subscribe", + "view": "SettlementGame/list", + "after": "123456789:000000000042", + "snapshotLimit": 100 + }); + + let msg: ClientMessage = serde_json::from_value(json).unwrap(); + match msg { + ClientMessage::Subscribe(sub) => { + assert_eq!(sub.view, "SettlementGame/list"); + assert_eq!(sub.after, Some("123456789:000000000042".to_string())); + assert_eq!(sub.snapshot_limit, Some(100)); + } + _ => panic!("Expected Subscribe"), + } + } + + #[test] + fn test_subscription_defaults_with_snapshot_to_true() { + let json = json!({ + "view": "SettlementGame/list" + }); + + let sub: Subscription = serde_json::from_value(json).unwrap(); + assert_eq!(sub.with_snapshot, None); + // When None, server should default to true + assert!(sub.with_snapshot.unwrap_or(true)); + } } diff --git a/stacks/ore/Cargo.lock b/stacks/ore/Cargo.lock index 085f1c24..16c2c2e1 100644 --- a/stacks/ore/Cargo.lock +++ b/stacks/ore/Cargo.lock @@ -1436,7 +1436,7 @@ dependencies = [ [[package]] name = "hyperstack" -version = "0.5.8" +version = "0.5.10" dependencies = [ "anyhow", "bs58", @@ -1474,7 +1474,7 @@ dependencies = [ [[package]] name = "hyperstack-interpreter" -version = "0.5.8" +version = "0.5.10" dependencies = [ "bs58", "dashmap", @@ -1501,7 +1501,7 @@ dependencies = [ [[package]] name = "hyperstack-macros" -version = "0.5.8" +version = "0.5.10" dependencies = [ "bs58", "hex", @@ -1516,7 +1516,7 @@ dependencies = [ [[package]] name = "hyperstack-sdk" -version = "0.5.6" +version = "0.5.10" dependencies = [ "anyhow", "flate2", @@ -1533,7 +1533,7 @@ dependencies = [ [[package]] name = "hyperstack-server" -version = "0.5.8" +version = "0.5.10" dependencies = [ "anyhow", "base64 0.22.1", diff --git a/typescript/core/src/frame.ts b/typescript/core/src/frame.ts index 95e4f03e..066a9def 100644 --- a/typescript/core/src/frame.ts +++ b/typescript/core/src/frame.ts @@ -23,6 +23,8 @@ export interface EntityFrame { key: string; data: T; append?: string[]; + /** Sequence cursor for ordering and resume capability */ + seq?: string; } export interface SnapshotEntity { diff --git a/typescript/core/src/types.ts b/typescript/core/src/types.ts index 41e232f4..c144cada 100644 --- a/typescript/core/src/types.ts +++ b/typescript/core/src/types.ts @@ -41,6 +41,12 @@ export interface Subscription { filters?: Record; take?: number; skip?: number; + /** Whether to include initial snapshot (defaults to true for backward compatibility) */ + withSnapshot?: boolean; + /** Cursor for resuming from a specific point (_seq value) */ + after?: string; + /** Maximum number of entities to include in snapshot (pagination hint) */ + snapshotLimit?: number; } export type SchemaResult = @@ -56,6 +62,12 @@ export interface WatchOptions { skip?: number; filters?: Record; schema?: Schema; + /** Whether to include initial snapshot (defaults to true) */ + withSnapshot?: boolean; + /** Cursor for resuming from a specific point (_seq value) */ + after?: string; + /** Maximum number of entities to include in snapshot */ + snapshotLimit?: number; } export interface HyperStackOptions { diff --git a/typescript/react/src/types.ts b/typescript/react/src/types.ts index d52b51c0..e2a1ccb2 100644 --- a/typescript/react/src/types.ts +++ b/typescript/react/src/types.ts @@ -60,6 +60,12 @@ export interface ViewHookOptions { refreshOnReconnect?: boolean; /** Schema to validate entities. Returns undefined if validation fails. */ schema?: Schema; + /** Whether to include initial snapshot (defaults to true) */ + withSnapshot?: boolean; + /** Cursor for resuming from a specific point (_seq value) */ + after?: string; + /** Maximum number of entities to include in snapshot */ + snapshotLimit?: number; } export interface ViewHookResult { @@ -77,6 +83,12 @@ export interface ListParamsBase { skip?: number; /** Schema to validate/filter entities. Only entities passing safeParse will be returned. */ schema?: Schema; + /** Whether to include initial snapshot (defaults to true) */ + withSnapshot?: boolean; + /** Cursor for resuming from a specific point (_seq value) */ + after?: string; + /** Maximum number of entities to include in snapshot */ + snapshotLimit?: number; } export interface ListParamsSingle extends ListParamsBase { diff --git a/typescript/react/src/view-hooks.ts b/typescript/react/src/view-hooks.ts index 842681d7..6c214d8b 100644 --- a/typescript/react/src/view-hooks.ts +++ b/typescript/react/src/view-hooks.ts @@ -18,7 +18,7 @@ export function useStateView( key?: Record, options?: ViewHookOptions ): ViewHookResult { - const [isLoading, setIsLoading] = useState(!options?.initialData); + const [isLoading, setIsLoading] = useState(!options?.initialData && options?.withSnapshot !== false); const [error, setError] = useState(); const clientRef = useRef(client); clientRef.current = client; @@ -27,14 +27,25 @@ export function useStateView( const keyString = key ? Object.values(key)[0] : undefined; const enabled = options?.enabled !== false; const schema = options?.schema as Schema | undefined; + const withSnapshot = options?.withSnapshot; + const after = options?.after; + const snapshotLimit = options?.snapshotLimit; useEffect(() => { if (!enabled || !clientRef.current) return undefined; try { const registry = clientRef.current.getSubscriptionRegistry(); - const unsubscribe = registry.subscribe({ view: viewDef.view, key: keyString }); - setIsLoading(true); + const unsubscribe = registry.subscribe({ + view: viewDef.view, + key: keyString, + withSnapshot, + after, + snapshotLimit + }); + if (withSnapshot !== false) { + setIsLoading(true); + } return () => { try { @@ -48,15 +59,24 @@ export function useStateView( setIsLoading(false); return undefined; } - }, [viewDef.view, keyString, enabled, client]); + }, [viewDef.view, keyString, enabled, withSnapshot, after, snapshotLimit, client]); const refresh = useCallback(() => { if (!enabled || !clientRef.current) return; try { const registry = clientRef.current.getSubscriptionRegistry(); - const unsubscribe = registry.subscribe({ view: viewDef.view, key: keyString }); - setIsLoading(true); + const unsubscribe = registry.subscribe({ + view: viewDef.view, + key: keyString, + withSnapshot, + after, + snapshotLimit + }); + const shouldLoad = withSnapshot ?? true; + if (shouldLoad) { + setIsLoading(true); + } setTimeout(() => { try { @@ -69,7 +89,7 @@ export function useStateView( setError(err instanceof Error ? err : new Error('Refresh failed')); setIsLoading(false); } - }, [viewDef.view, keyString, enabled]); + }, [viewDef.view, keyString, enabled, withSnapshot, after, snapshotLimit]); const subscribe = useCallback((callback: () => void) => { if (!clientRef.current) return () => {}; @@ -114,7 +134,7 @@ export function useListView( params?: ListParams, options?: ViewHookOptions ): ViewHookResult { - const [isLoading, setIsLoading] = useState(!options?.initialData); + const [isLoading, setIsLoading] = useState(!options?.initialData && params?.withSnapshot !== false); const [error, setError] = useState(); const clientRef = useRef(client); clientRef.current = client; @@ -128,6 +148,9 @@ export function useListView( const filtersJson = params?.filters ? JSON.stringify(params.filters) : undefined; const limit = params?.limit; const schema = params?.schema as Schema | undefined; + const withSnapshot = params?.withSnapshot; + const after = params?.after; + const snapshotLimit = params?.snapshotLimit; useEffect(() => { if (!enabled || !clientRef.current) return undefined; @@ -139,9 +162,14 @@ export function useListView( key, filters: params?.filters, take, - skip + skip, + withSnapshot, + after, + snapshotLimit }); - setIsLoading(true); + if (withSnapshot !== false) { + setIsLoading(true); + } return () => { try { @@ -155,7 +183,7 @@ export function useListView( setIsLoading(false); return undefined; } - }, [viewDef.view, enabled, key, filtersJson, take, skip, client]); + }, [viewDef.view, enabled, key, filtersJson, take, skip, withSnapshot, after, snapshotLimit, client]); const refresh = useCallback(() => { if (!enabled || !clientRef.current) return; @@ -167,9 +195,15 @@ export function useListView( key, filters: params?.filters, take, - skip + skip, + withSnapshot, + after, + snapshotLimit }); - setIsLoading(true); + const shouldLoad = withSnapshot ?? true; + if (shouldLoad) { + setIsLoading(true); + } setTimeout(() => { try { @@ -182,7 +216,7 @@ export function useListView( setError(err instanceof Error ? err : new Error('Refresh failed')); setIsLoading(false); } - }, [viewDef.view, enabled, key, filtersJson, take, skip]); + }, [viewDef.view, enabled, key, filtersJson, take, skip, withSnapshot, after, snapshotLimit]); const subscribe = useCallback((callback: () => void) => { if (!clientRef.current) return () => {};