From da7b4862375de7a25c0e254d6e90aa2c1e64c546 Mon Sep 17 00:00:00 2001 From: Adrian Henry Date: Sat, 21 Mar 2026 03:19:43 +0000 Subject: [PATCH 01/21] feat(server): Add optional snapshot and cursor-based filtering to WebSocket protocol Implement HYP-148 to make snapshots optional and support cursor-based filtering: - Add with_snapshot, after, and snapshot_limit fields to Subscription struct - Add seq field to Frame for live update cursor tracking - Implement EntityCache::get_after() for filtering entities by cursor - Update WebSocket server to respect with_snapshot flag - Update projector to include seq in Frame from SlotContext Clients can now: - Skip initial snapshot with with_snapshot: false - Resume from specific point with after cursor (_seq value) - Limit snapshot size with snapshot_limit - Track position via seq field in live updates --- rust/hyperstack-server/src/cache.rs | 160 ++++++++++++ rust/hyperstack-server/src/projector.rs | 4 + rust/hyperstack-server/src/websocket/frame.rs | 38 +++ .../hyperstack-server/src/websocket/server.rs | 232 +++++++++++------- .../src/websocket/subscription.rs | 89 +++++++ 5 files changed, 431 insertions(+), 92 deletions(-) diff --git a/rust/hyperstack-server/src/cache.rs b/rust/hyperstack-server/src/cache.rs index 544c2415..c217e6c9 100644 --- a/rust/hyperstack-server/src/cache.rs +++ b/rust/hyperstack-server/src/cache.rs @@ -109,6 +109,50 @@ impl EntityCache { .unwrap_or_default() } + /// Get entities with _seq greater than the provided cursor. + /// + /// Returns entities that have been updated after the given cursor, + /// sorted by _seq in ascending order. Useful for resuming from + /// a specific point in the stream. + pub async fn get_after( + &self, + view_id: &str, + cursor: &str, + limit: Option, + ) -> Vec<(String, Value)> { + let caches = self.caches.read().await; + + if let Some(cache) = caches.get(view_id) { + let mut results: Vec<(String, Value)> = cache + .iter() + .filter(|(_, entity)| { + entity + .get("_seq") + .and_then(|s| s.as_str()) + .map(|seq| seq > cursor) + .unwrap_or(false) + }) + .map(|(k, v)| (k.clone(), v.clone())) + .collect(); + + // Sort by _seq (ascending) + results.sort_by(|a, b| { + let seq_a = a.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); + let seq_b = b.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); + seq_a.cmp(seq_b) + }); + + // Apply limit if provided + if let Some(limit) = limit { + results.truncate(limit); + } + + results + } else { + vec![] + } + } + /// Get a specific entity from the cache pub async fn get(&self, view_id: &str, key: &str) -> Option { let caches = self.caches.read().await; @@ -554,4 +598,120 @@ mod tests { assert_eq!(snapshot_config.initial_batch_size, 25); assert_eq!(snapshot_config.subsequent_batch_size, 75); } + + #[tokio::test] + async fn test_get_after() { + let cache = EntityCache::new(); + + // Insert entities with _seq values + cache + .upsert( + "tokens/list", + "key1", + json!({"id": 1, "_seq": "100:000000000001"}), + ) + .await; + cache + .upsert( + "tokens/list", + "key2", + json!({"id": 2, "_seq": "100:000000000002"}), + ) + .await; + cache + .upsert( + "tokens/list", + "key3", + json!({"id": 3, "_seq": "100:000000000003"}), + ) + .await; + cache + .upsert( + "tokens/list", + "key4", + json!({"id": 4, "_seq": "101:000000000001"}), + ) + .await; + + // Get all entities after "100:000000000002" + let after = cache.get_after("tokens/list", "100:000000000002", None).await; + + // Should return key3 and key4 (sorted by _seq) + assert_eq!(after.len(), 2); + assert_eq!(after[0].0, "key3"); + assert_eq!(after[1].0, "key4"); + } + + #[tokio::test] + async fn test_get_after_with_limit() { + let cache = EntityCache::new(); + + // Insert entities with _seq values + cache + .upsert( + "tokens/list", + "key1", + json!({"id": 1, "_seq": "100:000000000001"}), + ) + .await; + cache + .upsert( + "tokens/list", + "key2", + json!({"id": 2, "_seq": "100:000000000002"}), + ) + .await; + cache + .upsert( + "tokens/list", + "key3", + json!({"id": 3, "_seq": "100:000000000003"}), + ) + .await; + + // Get entities after "100:000000000000" with limit 2 + let after = cache.get_after("tokens/list", "100:000000000000", Some(2)).await; + + // Should return only first 2 (key1 and key2) + assert_eq!(after.len(), 2); + assert_eq!(after[0].0, "key1"); + assert_eq!(after[1].0, "key2"); + } + + #[tokio::test] + async fn test_get_after_empty_result() { + let cache = EntityCache::new(); + + cache + .upsert( + "tokens/list", + "key1", + json!({"id": 1, "_seq": "100:000000000001"}), + ) + .await; + + // Get entities after a future cursor + let after = cache.get_after("tokens/list", "999:000000000000", None).await; + + assert!(after.is_empty()); + } + + #[tokio::test] + async fn test_get_after_missing_seq() { + let cache = EntityCache::new(); + + // Insert entity without _seq + cache + .upsert( + "tokens/list", + "key1", + json!({"id": 1}), + ) + .await; + + // Get entities after any cursor - entity without _seq should not be included + let after = cache.get_after("tokens/list", "0:000000000000", None).await; + + assert!(after.is_empty()); + } } diff --git a/rust/hyperstack-server/src/projector.rs b/rust/hyperstack-server/src/projector.rs index 0bf0d6de..874b9cd2 100644 --- a/rust/hyperstack-server/src/projector.rs +++ b/rust/hyperstack-server/src/projector.rs @@ -168,6 +168,9 @@ impl Projector { let mut projected = spec.projection.apply(patch_data); transform_large_u64_to_strings(&mut projected); + // Extract _seq from the patch data to include in the frame + let seq = slot_context.map(|ctx| ctx.to_seq_string()); + let frame = Frame { mode: spec.mode, export: spec.id.clone(), @@ -175,6 +178,7 @@ impl Projector { key: key.clone(), data: projected, append: append.clone(), + seq, }; json_buffer.clear(); diff --git a/rust/hyperstack-server/src/websocket/frame.rs b/rust/hyperstack-server/src/websocket/frame.rs index 8196a6c7..3b6dd32f 100644 --- a/rust/hyperstack-server/src/websocket/frame.rs +++ b/rust/hyperstack-server/src/websocket/frame.rs @@ -65,6 +65,9 @@ pub struct Frame { pub data: serde_json::Value, #[serde(skip_serializing_if = "Vec::is_empty", default)] pub append: Vec, + /// Sequence cursor for ordering and resume capability + #[serde(skip_serializing_if = "Option::is_none")] + pub seq: Option, } /// A single entity within a snapshot @@ -149,6 +152,7 @@ mod tests { key: "123".to_string(), data: serde_json::json!({}), append: vec![], + seq: None, }; assert_eq!(frame.entity(), "SettlementGame/list"); @@ -164,6 +168,7 @@ mod tests { key: "123".to_string(), data: serde_json::json!({"gameId": "123"}), append: vec![], + seq: None, }; let json = serde_json::to_value(&frame).unwrap(); @@ -173,6 +178,39 @@ mod tests { assert_eq!(json["key"], "123"); } + #[test] + fn test_frame_with_seq() { + let frame = Frame { + mode: Mode::List, + export: "SettlementGame/list".to_string(), + op: "upsert", + key: "123".to_string(), + data: serde_json::json!({"gameId": "123"}), + append: vec![], + seq: Some("123456789:000000000042".to_string()), + }; + + let json = serde_json::to_value(&frame).unwrap(); + assert_eq!(json["op"], "upsert"); + assert_eq!(json["seq"], "123456789:000000000042"); + } + + #[test] + fn test_frame_seq_skipped_when_none() { + let frame = Frame { + mode: Mode::List, + export: "SettlementGame/list".to_string(), + op: "upsert", + key: "123".to_string(), + data: serde_json::json!({"gameId": "123"}), + append: vec![], + seq: None, + }; + + let json = serde_json::to_value(&frame).unwrap(); + assert!(json.get("seq").is_none()); + } + #[test] fn test_snapshot_frame_complete_serialization() { let frame = SnapshotFrame { diff --git a/rust/hyperstack-server/src/websocket/server.rs b/rust/hyperstack-server/src/websocket/server.rs index 39f9efda..572c650c 100644 --- a/rust/hyperstack-server/src/websocket/server.rs +++ b/rust/hyperstack-server/src/websocket/server.rs @@ -581,28 +581,35 @@ async fn attach_client_to_bus( let mut rx = ctx.bus_manager.get_or_create_state_bus(view_id, key).await; - if let Some(mut cached_entity) = ctx.entity_cache.get(view_id, key).await { - transform_large_u64_to_strings(&mut cached_entity); - let snapshot_entities = vec![SnapshotEntity { - key: key.to_string(), - data: cached_entity, - }]; - let batch_config = ctx.entity_cache.snapshot_config(); - let _ = send_snapshot_batches( - ctx.client_id, - &snapshot_entities, - view_spec.mode, - view_id, - ctx.client_manager, - &batch_config, - #[cfg(feature = "otel")] - ctx.metrics.as_ref(), - ) - .await; - rx.borrow_and_update(); - } else if !rx.borrow().is_empty() { - let data = rx.borrow_and_update().clone(); - let _ = ctx.client_manager.send_to_client(ctx.client_id, data); + // Check if we should send snapshot (defaults to true for backward compatibility) + let should_send_snapshot = subscription.with_snapshot.unwrap_or(true); + + if should_send_snapshot { + if let Some(mut cached_entity) = ctx.entity_cache.get(view_id, key).await { + transform_large_u64_to_strings(&mut cached_entity); + let snapshot_entities = vec![SnapshotEntity { + key: key.to_string(), + data: cached_entity, + }]; + let batch_config = ctx.entity_cache.snapshot_config(); + let _ = send_snapshot_batches( + ctx.client_id, + &snapshot_entities, + view_spec.mode, + view_id, + ctx.client_manager, + &batch_config, + #[cfg(feature = "otel")] + ctx.metrics.as_ref(), + ) + .await; + rx.borrow_and_update(); + } else if !rx.borrow().is_empty() { + let data = rx.borrow_and_update().clone(); + let _ = ctx.client_manager.send_to_client(ctx.client_id, data); + } + } else { + info!("Client {} subscribed to {} without snapshot", ctx.client_id, view_id); } let client_id = ctx.client_id; @@ -639,33 +646,46 @@ async fn attach_client_to_bus( Mode::List | Mode::Append => { let mut rx = ctx.bus_manager.get_or_create_list_bus(view_id).await; - let snapshots = ctx.entity_cache.get_all(view_id).await; - let snapshot_entities: Vec = snapshots - .into_iter() - .filter(|(key, _)| subscription.matches_key(key)) - .map(|(key, mut data)| { - transform_large_u64_to_strings(&mut data); - SnapshotEntity { key, data } - }) - .collect(); - - if !snapshot_entities.is_empty() { - let batch_config = ctx.entity_cache.snapshot_config(); - if send_snapshot_batches( - ctx.client_id, - &snapshot_entities, - view_spec.mode, - view_id, - ctx.client_manager, - &batch_config, - #[cfg(feature = "otel")] - ctx.metrics.as_ref(), - ) - .await - .is_err() - { - return; + // Check if we should send snapshot (defaults to true for backward compatibility) + let should_send_snapshot = subscription.with_snapshot.unwrap_or(true); + + if should_send_snapshot { + // Determine which entities to send based on cursor + let snapshots = if let Some(ref cursor) = subscription.after { + ctx.entity_cache.get_after(view_id, cursor, subscription.snapshot_limit).await + } else { + ctx.entity_cache.get_all(view_id).await + }; + + let snapshot_entities: Vec = snapshots + .into_iter() + .filter(|(key, _)| subscription.matches_key(key)) + .map(|(key, mut data)| { + transform_large_u64_to_strings(&mut data); + SnapshotEntity { key, data } + }) + .collect(); + + if !snapshot_entities.is_empty() { + let batch_config = ctx.entity_cache.snapshot_config(); + if send_snapshot_batches( + ctx.client_id, + &snapshot_entities, + view_spec.mode, + view_id, + ctx.client_manager, + &batch_config, + #[cfg(feature = "otel")] + ctx.metrics.as_ref(), + ) + .await + .is_err() + { + return; + } } + } else { + info!("Client {} subscribed to {} without snapshot", ctx.client_id, view_id); } let client_id = ctx.client_id; @@ -820,6 +840,7 @@ async fn attach_derived_view_subscription_otel( if let Some((new_key, data)) = new_window.first() { for old_key in current_window_keys.difference(&new_keys) { let delete_frame = Frame { + seq: None, mode: frame_mode, export: view_id_clone.clone(), op: "delete", @@ -841,6 +862,7 @@ async fn attach_derived_view_subscription_otel( let mut transformed_data = data.clone(); transform_large_u64_to_strings(&mut transformed_data); let frame = Frame { + seq: None, mode: frame_mode, export: view_id_clone.clone(), op: "upsert", @@ -862,6 +884,7 @@ async fn attach_derived_view_subscription_otel( } else { for key in current_window_keys.difference(&new_keys) { let delete_frame = Frame { + seq: None, mode: frame_mode, export: view_id_clone.clone(), op: "delete", @@ -884,6 +907,7 @@ async fn attach_derived_view_subscription_otel( let mut transformed_data = data.clone(); transform_large_u64_to_strings(&mut transformed_data); let frame = Frame { + seq: None, mode: frame_mode, export: view_id_clone.clone(), op: "upsert", @@ -959,26 +983,33 @@ async fn attach_client_to_bus( let mut rx = ctx.bus_manager.get_or_create_state_bus(view_id, key).await; - if let Some(mut cached_entity) = ctx.entity_cache.get(view_id, key).await { - transform_large_u64_to_strings(&mut cached_entity); - let snapshot_entities = vec![SnapshotEntity { - key: key.to_string(), - data: cached_entity, - }]; - let batch_config = ctx.entity_cache.snapshot_config(); - let _ = send_snapshot_batches( - ctx.client_id, - &snapshot_entities, - view_spec.mode, - view_id, - ctx.client_manager, - &batch_config, - ) - .await; - rx.borrow_and_update(); - } else if !rx.borrow().is_empty() { - let data = rx.borrow_and_update().clone(); - let _ = ctx.client_manager.send_to_client(ctx.client_id, data); + // Check if we should send snapshot (defaults to true for backward compatibility) + let should_send_snapshot = subscription.with_snapshot.unwrap_or(true); + + if should_send_snapshot { + if let Some(mut cached_entity) = ctx.entity_cache.get(view_id, key).await { + transform_large_u64_to_strings(&mut cached_entity); + let snapshot_entities = vec![SnapshotEntity { + key: key.to_string(), + data: cached_entity, + }]; + let batch_config = ctx.entity_cache.snapshot_config(); + let _ = send_snapshot_batches( + ctx.client_id, + &snapshot_entities, + view_spec.mode, + view_id, + ctx.client_manager, + &batch_config, + ) + .await; + rx.borrow_and_update(); + } else if !rx.borrow().is_empty() { + let data = rx.borrow_and_update().clone(); + let _ = ctx.client_manager.send_to_client(ctx.client_id, data); + } + } else { + info!("Client {} subscribed to {} without snapshot", ctx.client_id, view_id); } let client_id = ctx.client_id; @@ -1011,31 +1042,44 @@ async fn attach_client_to_bus( Mode::List | Mode::Append => { let mut rx = ctx.bus_manager.get_or_create_list_bus(view_id).await; - let snapshots = ctx.entity_cache.get_all(view_id).await; - let snapshot_entities: Vec = snapshots - .into_iter() - .filter(|(key, _)| subscription.matches_key(key)) - .map(|(key, mut data)| { - transform_large_u64_to_strings(&mut data); - SnapshotEntity { key, data } - }) - .collect(); - - if !snapshot_entities.is_empty() { - let batch_config = ctx.entity_cache.snapshot_config(); - if send_snapshot_batches( - ctx.client_id, - &snapshot_entities, - view_spec.mode, - view_id, - ctx.client_manager, - &batch_config, - ) - .await - .is_err() - { - return; + // Check if we should send snapshot (defaults to true for backward compatibility) + let should_send_snapshot = subscription.with_snapshot.unwrap_or(true); + + if should_send_snapshot { + // Determine which entities to send based on cursor + let snapshots = if let Some(ref cursor) = subscription.after { + ctx.entity_cache.get_after(view_id, cursor, subscription.snapshot_limit).await + } else { + ctx.entity_cache.get_all(view_id).await + }; + + let snapshot_entities: Vec = snapshots + .into_iter() + .filter(|(key, _)| subscription.matches_key(key)) + .map(|(key, mut data)| { + transform_large_u64_to_strings(&mut data); + SnapshotEntity { key, data } + }) + .collect(); + + if !snapshot_entities.is_empty() { + let batch_config = ctx.entity_cache.snapshot_config(); + if send_snapshot_batches( + ctx.client_id, + &snapshot_entities, + view_spec.mode, + view_id, + ctx.client_manager, + &batch_config, + ) + .await + .is_err() + { + return; + } } + } else { + info!("Client {} subscribed to {} without snapshot", ctx.client_id, view_id); } let client_id = ctx.client_id; @@ -1183,6 +1227,7 @@ async fn attach_derived_view_subscription( if let Some((new_key, data)) = new_window.first() { for old_key in current_window_keys.difference(&new_keys) { let delete_frame = Frame { + seq: None, mode: frame_mode, export: view_id_clone.clone(), op: "delete", @@ -1201,6 +1246,7 @@ async fn attach_derived_view_subscription( let mut transformed_data = data.clone(); transform_large_u64_to_strings(&mut transformed_data); let frame = Frame { + seq: None, mode: frame_mode, export: view_id_clone.clone(), op: "upsert", @@ -1218,6 +1264,7 @@ async fn attach_derived_view_subscription( } else { for key in current_window_keys.difference(&new_keys) { let delete_frame = Frame { + seq: None, mode: frame_mode, export: view_id_clone.clone(), op: "delete", @@ -1237,6 +1284,7 @@ async fn attach_derived_view_subscription( let mut transformed_data = data.clone(); transform_large_u64_to_strings(&mut transformed_data); let frame = Frame { + seq: None, mode: frame_mode, export: view_id_clone.clone(), op: "upsert", diff --git a/rust/hyperstack-server/src/websocket/subscription.rs b/rust/hyperstack-server/src/websocket/subscription.rs index 75ed6f40..38a05c83 100644 --- a/rust/hyperstack-server/src/websocket/subscription.rs +++ b/rust/hyperstack-server/src/websocket/subscription.rs @@ -26,6 +26,15 @@ pub struct Subscription { /// Number of items to skip (for windowed subscriptions) #[serde(skip_serializing_if = "Option::is_none")] pub skip: Option, + /// Whether to include initial snapshot (defaults to true for backward compatibility) + #[serde(skip_serializing_if = "Option::is_none")] + pub with_snapshot: Option, + /// Cursor for resuming from a specific point (_seq value) + #[serde(skip_serializing_if = "Option::is_none")] + pub after: Option, + /// Maximum number of entities to include in snapshot (pagination hint) + #[serde(skip_serializing_if = "Option::is_none")] + pub snapshot_limit: Option, } /// Client unsubscription request @@ -103,6 +112,9 @@ mod tests { partition: None, take: None, skip: None, + with_snapshot: None, + after: None, + snapshot_limit: None, }; assert!(sub.matches("SettlementGame/list", "835")); @@ -118,6 +130,9 @@ mod tests { partition: None, take: None, skip: None, + with_snapshot: None, + after: None, + snapshot_limit: None, }; assert!(sub.matches("SettlementGame/list", "835")); @@ -189,6 +204,9 @@ mod tests { partition: None, take: None, skip: None, + with_snapshot: None, + after: None, + snapshot_limit: None, }; assert_eq!(sub.sub_key(), "SettlementGame/list:835"); } @@ -201,6 +219,9 @@ mod tests { partition: None, take: None, skip: None, + with_snapshot: None, + after: None, + snapshot_limit: None, }; assert_eq!(sub.sub_key(), "SettlementGame/list:*"); } @@ -239,4 +260,72 @@ mod tests { _ => panic!("Expected Subscribe"), } } + + #[test] + fn test_subscription_with_optional_snapshot() { + let json = json!({ + "type": "subscribe", + "view": "SettlementGame/list", + "with_snapshot": false + }); + + let msg: ClientMessage = serde_json::from_value(json).unwrap(); + match msg { + ClientMessage::Subscribe(sub) => { + assert_eq!(sub.view, "SettlementGame/list"); + assert_eq!(sub.with_snapshot, Some(false)); + } + _ => panic!("Expected Subscribe"), + } + } + + #[test] + fn test_subscription_with_after_cursor() { + let json = json!({ + "type": "subscribe", + "view": "SettlementGame/list", + "after": "123456789:000000000042" + }); + + let msg: ClientMessage = serde_json::from_value(json).unwrap(); + match msg { + ClientMessage::Subscribe(sub) => { + assert_eq!(sub.view, "SettlementGame/list"); + assert_eq!(sub.after, Some("123456789:000000000042".to_string())); + } + _ => panic!("Expected Subscribe"), + } + } + + #[test] + fn test_subscription_with_snapshot_limit() { + let json = json!({ + "type": "subscribe", + "view": "SettlementGame/list", + "after": "123456789:000000000042", + "snapshot_limit": 100 + }); + + let msg: ClientMessage = serde_json::from_value(json).unwrap(); + match msg { + ClientMessage::Subscribe(sub) => { + assert_eq!(sub.view, "SettlementGame/list"); + assert_eq!(sub.after, Some("123456789:000000000042".to_string())); + assert_eq!(sub.snapshot_limit, Some(100)); + } + _ => panic!("Expected Subscribe"), + } + } + + #[test] + fn test_subscription_defaults_with_snapshot_to_true() { + let json = json!({ + "view": "SettlementGame/list" + }); + + let sub: Subscription = serde_json::from_value(json).unwrap(); + assert_eq!(sub.with_snapshot, None); + // When None, server should default to true + assert!(sub.with_snapshot.unwrap_or(true)); + } } From 9c5fcc0a063f8696277bb190ab4fa14e9e0f8e73 Mon Sep 17 00:00:00 2001 From: Adrian Henry Date: Sat, 21 Mar 2026 03:19:48 +0000 Subject: [PATCH 02/21] feat(rust-sdk): Support optional snapshots and cursor-based resume Update Rust SDK for HYP-148 protocol changes: - Add with_snapshot, after, and snapshot_limit fields to Subscription - Add builder methods: with_snapshot(), after(), with_snapshot_limit() - Add seq field to Frame struct for cursor tracking - Update ConnectionManager to send new subscription fields - Maintain backward compatibility with defaults Usage: let sub = Subscription::new("Game/list") .with_snapshot(false) .after("123:000000000042") .with_snapshot_limit(100); --- rust/hyperstack-sdk/src/connection.rs | 11 ++++++++- rust/hyperstack-sdk/src/frame.rs | 3 +++ rust/hyperstack-sdk/src/stream.rs | 6 ++--- rust/hyperstack-sdk/src/subscription.rs | 30 +++++++++++++++++++++++++ 4 files changed, 46 insertions(+), 4 deletions(-) diff --git a/rust/hyperstack-sdk/src/connection.rs b/rust/hyperstack-sdk/src/connection.rs index 55a7eb8e..d1def253 100644 --- a/rust/hyperstack-sdk/src/connection.rs +++ b/rust/hyperstack-sdk/src/connection.rs @@ -63,7 +63,7 @@ impl ConnectionManager { } pub async fn ensure_subscription(&self, view: &str, key: Option<&str>) { - self.ensure_subscription_with_opts(view, key, None, None) + self.ensure_subscription_with_opts(view, key, None, None, None, None, None) .await } @@ -73,6 +73,9 @@ impl ConnectionManager { key: Option<&str>, take: Option, skip: Option, + with_snapshot: Option, + after: Option<&str>, + snapshot_limit: Option, ) { let sub = Subscription { view: view.to_string(), @@ -81,6 +84,9 @@ impl ConnectionManager { filters: None, take, skip, + with_snapshot, + after: after.map(|s| s.to_string()), + snapshot_limit, }; if !self.inner.subscriptions.read().await.contains(&sub) { @@ -196,6 +202,9 @@ fn spawn_connection_loop( filters: None, take: None, skip: None, + with_snapshot: None, + after: None, + snapshot_limit: None, }; subscriptions.write().await.remove(&sub); let client_msg = ClientMessage::Unsubscribe(unsub); diff --git a/rust/hyperstack-sdk/src/frame.rs b/rust/hyperstack-sdk/src/frame.rs index 29c7a703..f2773a69 100644 --- a/rust/hyperstack-sdk/src/frame.rs +++ b/rust/hyperstack-sdk/src/frame.rs @@ -81,6 +81,9 @@ pub struct Frame { pub data: serde_json::Value, #[serde(default)] pub append: Vec, + /// Sequence cursor for ordering and resume capability + #[serde(skip_serializing_if = "Option::is_none")] + pub seq: Option, } impl Frame { diff --git a/rust/hyperstack-sdk/src/stream.rs b/rust/hyperstack-sdk/src/stream.rs index 21cc7282..0561ecd9 100644 --- a/rust/hyperstack-sdk/src/stream.rs +++ b/rust/hyperstack-sdk/src/stream.rs @@ -326,7 +326,7 @@ impl Stream for EntityStre let view = subscription_view.clone(); let key = subscription_key.clone(); let fut = Box::pin(async move { - conn.ensure_subscription_with_opts(&view, key.as_deref(), take, skip) + conn.ensure_subscription_with_opts(&view, key.as_deref(), take, skip, None, None, None) .await; }); @@ -540,7 +540,7 @@ impl Stream for RichEntity let view = subscription_view.clone(); let key = subscription_key.clone(); let fut = Box::pin(async move { - conn.ensure_subscription_with_opts(&view, key.as_deref(), take, skip) + conn.ensure_subscription_with_opts(&view, key.as_deref(), take, skip, None, None, None) .await; }); @@ -1024,7 +1024,7 @@ impl Stream for UseStream< let view = subscription_view.clone(); let key = subscription_key.clone(); let fut = Box::pin(async move { - conn.ensure_subscription_with_opts(&view, key.as_deref(), take, skip) + conn.ensure_subscription_with_opts(&view, key.as_deref(), take, skip, None, None, None) .await; }); diff --git a/rust/hyperstack-sdk/src/subscription.rs b/rust/hyperstack-sdk/src/subscription.rs index d6fae06d..a085764d 100644 --- a/rust/hyperstack-sdk/src/subscription.rs +++ b/rust/hyperstack-sdk/src/subscription.rs @@ -22,6 +22,15 @@ pub struct Subscription { pub take: Option, #[serde(skip_serializing_if = "Option::is_none")] pub skip: Option, + /// Whether to include initial snapshot (defaults to true for backward compatibility) + #[serde(skip_serializing_if = "Option::is_none")] + pub with_snapshot: Option, + /// Cursor for resuming from a specific point (_seq value) + #[serde(skip_serializing_if = "Option::is_none")] + pub after: Option, + /// Maximum number of entities to include in snapshot (pagination hint) + #[serde(skip_serializing_if = "Option::is_none")] + pub snapshot_limit: Option, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -67,6 +76,9 @@ impl Subscription { filters: None, take: None, skip: None, + with_snapshot: None, + after: None, + snapshot_limit: None, } } @@ -90,6 +102,24 @@ impl Subscription { self } + /// Set whether to include the initial snapshot (defaults to true) + pub fn with_snapshot(mut self, with_snapshot: bool) -> Self { + self.with_snapshot = Some(with_snapshot); + self + } + + /// Set the cursor to resume from (for reconnecting and getting only newer data) + pub fn after(mut self, cursor: impl Into) -> Self { + self.after = Some(cursor.into()); + self + } + + /// Set the maximum number of entities to include in the snapshot + pub fn with_snapshot_limit(mut self, limit: usize) -> Self { + self.snapshot_limit = Some(limit); + self + } + pub fn sub_key(&self) -> String { let filters_str = self .filters From 3f239e9d9fa2f992b7d8eaa1c467c541af9a8a9a Mon Sep 17 00:00:00 2001 From: Adrian Henry Date: Sat, 21 Mar 2026 03:19:52 +0000 Subject: [PATCH 03/21] feat(typescript-sdk): Support optional snapshots and cursor-based resume Update TypeScript and React SDKs for HYP-148 protocol changes: - Add withSnapshot, after, and snapshotLimit to Subscription and WatchOptions - Add seq field to EntityFrame for cursor tracking - Add new fields to ViewHookOptions and ListParamsBase for React hooks - Connection automatically includes new fields via spread operator Usage: // Skip snapshot view.watch({ withSnapshot: false }) // Resume from cursor view.watch({ after: "123:000000000042", snapshotLimit: 100 }) // React hooks useListView(Game.list, { after: lastSeq, withSnapshot: false }) --- typescript/core/src/frame.ts | 2 ++ typescript/core/src/types.ts | 12 ++++++++++++ typescript/react/src/types.ts | 12 ++++++++++++ 3 files changed, 26 insertions(+) diff --git a/typescript/core/src/frame.ts b/typescript/core/src/frame.ts index 95e4f03e..066a9def 100644 --- a/typescript/core/src/frame.ts +++ b/typescript/core/src/frame.ts @@ -23,6 +23,8 @@ export interface EntityFrame { key: string; data: T; append?: string[]; + /** Sequence cursor for ordering and resume capability */ + seq?: string; } export interface SnapshotEntity { diff --git a/typescript/core/src/types.ts b/typescript/core/src/types.ts index 41e232f4..c144cada 100644 --- a/typescript/core/src/types.ts +++ b/typescript/core/src/types.ts @@ -41,6 +41,12 @@ export interface Subscription { filters?: Record; take?: number; skip?: number; + /** Whether to include initial snapshot (defaults to true for backward compatibility) */ + withSnapshot?: boolean; + /** Cursor for resuming from a specific point (_seq value) */ + after?: string; + /** Maximum number of entities to include in snapshot (pagination hint) */ + snapshotLimit?: number; } export type SchemaResult = @@ -56,6 +62,12 @@ export interface WatchOptions { skip?: number; filters?: Record; schema?: Schema; + /** Whether to include initial snapshot (defaults to true) */ + withSnapshot?: boolean; + /** Cursor for resuming from a specific point (_seq value) */ + after?: string; + /** Maximum number of entities to include in snapshot */ + snapshotLimit?: number; } export interface HyperStackOptions { diff --git a/typescript/react/src/types.ts b/typescript/react/src/types.ts index d52b51c0..e2a1ccb2 100644 --- a/typescript/react/src/types.ts +++ b/typescript/react/src/types.ts @@ -60,6 +60,12 @@ export interface ViewHookOptions { refreshOnReconnect?: boolean; /** Schema to validate entities. Returns undefined if validation fails. */ schema?: Schema; + /** Whether to include initial snapshot (defaults to true) */ + withSnapshot?: boolean; + /** Cursor for resuming from a specific point (_seq value) */ + after?: string; + /** Maximum number of entities to include in snapshot */ + snapshotLimit?: number; } export interface ViewHookResult { @@ -77,6 +83,12 @@ export interface ListParamsBase { skip?: number; /** Schema to validate/filter entities. Only entities passing safeParse will be returned. */ schema?: Schema; + /** Whether to include initial snapshot (defaults to true) */ + withSnapshot?: boolean; + /** Cursor for resuming from a specific point (_seq value) */ + after?: string; + /** Maximum number of entities to include in snapshot */ + snapshotLimit?: number; } export interface ListParamsSingle extends ListParamsBase { From 1f7f95be29e70391c74cec425ee2badd1f87e0bc Mon Sep 17 00:00:00 2001 From: Adrian Henry Date: Sat, 21 Mar 2026 03:29:22 +0000 Subject: [PATCH 04/21] feat(sdk): Add builder methods and React hooks for new subscription options Update SDKs to fully expose HYP-148 subscription options: Rust SDK: - Add with_snapshot, after, and snapshot_limit fields to UseBuilder - Add builder methods: with_snapshot(), after(), with_snapshot_limit() - Similar updates to WatchBuilder and RichWatchBuilder React SDK: - Update useListView to pass withSnapshot, after, snapshotLimit through - Update useStateView to pass withSnapshot, after, snapshotLimit through - Ensure all subscription options are included in useEffect and refresh dependencies --- rust/hyperstack-sdk/src/view.rs | 24 ++++++++++++++++++ typescript/react/src/view-hooks.ts | 40 ++++++++++++++++++++++++------ 2 files changed, 56 insertions(+), 8 deletions(-) diff --git a/rust/hyperstack-sdk/src/view.rs b/rust/hyperstack-sdk/src/view.rs index 96fdf5d0..4c1afef3 100644 --- a/rust/hyperstack-sdk/src/view.rs +++ b/rust/hyperstack-sdk/src/view.rs @@ -149,6 +149,9 @@ where take: Option, skip: Option, filters: Option>, + with_snapshot: Option, + after: Option, + snapshot_limit: Option, stream: Option>, } @@ -170,6 +173,9 @@ where take: None, skip: None, filters: None, + with_snapshot: None, + after: None, + snapshot_limit: None, stream: None, } } @@ -193,6 +199,24 @@ where .insert(key.into(), value.into()); self } + + /// Set whether to include the initial snapshot (defaults to true). + pub fn with_snapshot(mut self, with_snapshot: bool) -> Self { + self.with_snapshot = Some(with_snapshot); + self + } + + /// Set the cursor to resume from (for reconnecting and getting only newer data). + pub fn after(mut self, cursor: impl Into) -> Self { + self.after = Some(cursor.into()); + self + } + + /// Set the maximum number of entities to include in the snapshot. + pub fn with_snapshot_limit(mut self, limit: usize) -> Self { + self.snapshot_limit = Some(limit); + self + } } impl Stream for UseBuilder diff --git a/typescript/react/src/view-hooks.ts b/typescript/react/src/view-hooks.ts index 842681d7..b3e1bfbb 100644 --- a/typescript/react/src/view-hooks.ts +++ b/typescript/react/src/view-hooks.ts @@ -27,13 +27,22 @@ export function useStateView( const keyString = key ? Object.values(key)[0] : undefined; const enabled = options?.enabled !== false; const schema = options?.schema as Schema | undefined; + const withSnapshot = options?.withSnapshot; + const after = options?.after; + const snapshotLimit = options?.snapshotLimit; useEffect(() => { if (!enabled || !clientRef.current) return undefined; try { const registry = clientRef.current.getSubscriptionRegistry(); - const unsubscribe = registry.subscribe({ view: viewDef.view, key: keyString }); + const unsubscribe = registry.subscribe({ + view: viewDef.view, + key: keyString, + withSnapshot, + after, + snapshotLimit + }); setIsLoading(true); return () => { @@ -48,14 +57,20 @@ export function useStateView( setIsLoading(false); return undefined; } - }, [viewDef.view, keyString, enabled, client]); + }, [viewDef.view, keyString, enabled, withSnapshot, after, snapshotLimit, client]); const refresh = useCallback(() => { if (!enabled || !clientRef.current) return; try { const registry = clientRef.current.getSubscriptionRegistry(); - const unsubscribe = registry.subscribe({ view: viewDef.view, key: keyString }); + const unsubscribe = registry.subscribe({ + view: viewDef.view, + key: keyString, + withSnapshot, + after, + snapshotLimit + }); setIsLoading(true); setTimeout(() => { @@ -69,7 +84,7 @@ export function useStateView( setError(err instanceof Error ? err : new Error('Refresh failed')); setIsLoading(false); } - }, [viewDef.view, keyString, enabled]); + }, [viewDef.view, keyString, enabled, withSnapshot, after, snapshotLimit]); const subscribe = useCallback((callback: () => void) => { if (!clientRef.current) return () => {}; @@ -128,6 +143,9 @@ export function useListView( const filtersJson = params?.filters ? JSON.stringify(params.filters) : undefined; const limit = params?.limit; const schema = params?.schema as Schema | undefined; + const withSnapshot = params?.withSnapshot; + const after = params?.after; + const snapshotLimit = params?.snapshotLimit; useEffect(() => { if (!enabled || !clientRef.current) return undefined; @@ -139,7 +157,10 @@ export function useListView( key, filters: params?.filters, take, - skip + skip, + withSnapshot, + after, + snapshotLimit }); setIsLoading(true); @@ -155,7 +176,7 @@ export function useListView( setIsLoading(false); return undefined; } - }, [viewDef.view, enabled, key, filtersJson, take, skip, client]); + }, [viewDef.view, enabled, key, filtersJson, take, skip, withSnapshot, after, snapshotLimit, client]); const refresh = useCallback(() => { if (!enabled || !clientRef.current) return; @@ -167,7 +188,10 @@ export function useListView( key, filters: params?.filters, take, - skip + skip, + withSnapshot, + after, + snapshotLimit }); setIsLoading(true); @@ -182,7 +206,7 @@ export function useListView( setError(err instanceof Error ? err : new Error('Refresh failed')); setIsLoading(false); } - }, [viewDef.view, enabled, key, filtersJson, take, skip]); + }, [viewDef.view, enabled, key, filtersJson, take, skip, withSnapshot, after, snapshotLimit]); const subscribe = useCallback((callback: () => void) => { if (!clientRef.current) return () => {}; From 8f4fba016c6a530d6462b4711b19acc1ca670452 Mon Sep 17 00:00:00 2001 From: Adrian Henry Date: Sat, 21 Mar 2026 03:33:11 +0000 Subject: [PATCH 05/21] fix(rust-sdk): Wire up new subscription fields through stream layer Ensure with_snapshot, after, and snapshot_limit are properly passed from builders through to actual WebSocket subscriptions: - Update UseStream, EntityStream, and RichEntityStream to accept and store new fields - Update all builder poll_next implementations to pass new fields when creating streams - Update connection.ensure_subscription_with_opts calls to use actual values instead of None - Ensure UseBuilder, WatchBuilder, and RichWatchBuilder all properly wire up the fields --- rust/hyperstack-sdk/src/stream.rs | 51 ++++++++++++++++++++++++-- rust/hyperstack-sdk/src/view.rs | 60 +++++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+), 3 deletions(-) diff --git a/rust/hyperstack-sdk/src/stream.rs b/rust/hyperstack-sdk/src/stream.rs index 0561ecd9..fd320e23 100644 --- a/rust/hyperstack-sdk/src/stream.rs +++ b/rust/hyperstack-sdk/src/stream.rs @@ -179,6 +179,9 @@ enum EntityStreamState { subscription_key: Option, take: Option, skip: Option, + with_snapshot: Option, + after: Option, + snapshot_limit: Option, }, Active { inner: BroadcastStream, @@ -246,6 +249,9 @@ impl EntityStream { subscription_key, None, None, + None, + None, + None, ) } @@ -259,6 +265,9 @@ impl EntityStream { subscription_key: Option, take: Option, skip: Option, + with_snapshot: Option, + after: Option, + snapshot_limit: Option, ) -> Self { Self { state: EntityStreamState::Lazy { @@ -268,6 +277,9 @@ impl EntityStream { subscription_key, take, skip, + with_snapshot, + after, + snapshot_limit, }, view: entity_name, key_filter, @@ -313,6 +325,9 @@ impl Stream for EntityStre subscription_key, take, skip, + with_snapshot, + after, + snapshot_limit, } = std::mem::replace(&mut this.state, EntityStreamState::Invalid) else { unreachable!() @@ -326,7 +341,7 @@ impl Stream for EntityStre let view = subscription_view.clone(); let key = subscription_key.clone(); let fut = Box::pin(async move { - conn.ensure_subscription_with_opts(&view, key.as_deref(), take, skip, None, None, None) + conn.ensure_subscription_with_opts(&view, key.as_deref(), take, skip, with_snapshot, after.as_deref(), snapshot_limit) .await; }); @@ -429,6 +444,9 @@ enum RichEntityStreamState { subscription_key: Option, take: Option, skip: Option, + with_snapshot: Option, + after: Option, + snapshot_limit: Option, }, Active { inner: BroadcastStream, @@ -481,6 +499,9 @@ impl RichEntityStream { subscription_key, None, None, + None, + None, + None, ) } @@ -494,6 +515,9 @@ impl RichEntityStream { subscription_key: Option, take: Option, skip: Option, + with_snapshot: Option, + after: Option, + snapshot_limit: Option, ) -> Self { Self { state: RichEntityStreamState::Lazy { @@ -503,6 +527,9 @@ impl RichEntityStream { subscription_key, take, skip, + with_snapshot, + after, + snapshot_limit, }, view: entity_name, key_filter, @@ -527,6 +554,9 @@ impl Stream for RichEntity subscription_key, take, skip, + with_snapshot, + after, + snapshot_limit, } = std::mem::replace(&mut this.state, RichEntityStreamState::Invalid) else { unreachable!() @@ -540,7 +570,7 @@ impl Stream for RichEntity let view = subscription_view.clone(); let key = subscription_key.clone(); let fut = Box::pin(async move { - conn.ensure_subscription_with_opts(&view, key.as_deref(), take, skip, None, None, None) + conn.ensure_subscription_with_opts(&view, key.as_deref(), take, skip, with_snapshot, after.as_deref(), snapshot_limit) .await; }); @@ -890,6 +920,9 @@ enum UseStreamState { subscription_key: Option, take: Option, skip: Option, + with_snapshot: Option, + after: Option, + snapshot_limit: Option, }, Active { inner: BroadcastStream, @@ -942,6 +975,9 @@ impl UseStream { subscription_key, None, None, + None, + None, + None, ) } @@ -955,6 +991,9 @@ impl UseStream { subscription_key: Option, take: Option, skip: Option, + with_snapshot: Option, + after: Option, + snapshot_limit: Option, ) -> Self { Self { state: UseStreamState::Lazy { @@ -964,6 +1003,9 @@ impl UseStream { subscription_key, take, skip, + with_snapshot, + after, + snapshot_limit, }, view: entity_name, key_filter, @@ -1012,6 +1054,9 @@ impl Stream for UseStream< subscription_key, take, skip, + with_snapshot, + after, + snapshot_limit, } = std::mem::replace(&mut this.state, UseStreamState::Invalid) else { unreachable!() @@ -1024,7 +1069,7 @@ impl Stream for UseStream< let view = subscription_view.clone(); let key = subscription_key.clone(); let fut = Box::pin(async move { - conn.ensure_subscription_with_opts(&view, key.as_deref(), take, skip, None, None, None) + conn.ensure_subscription_with_opts(&view, key.as_deref(), take, skip, with_snapshot, after.as_deref(), snapshot_limit) .await; }); diff --git a/rust/hyperstack-sdk/src/view.rs b/rust/hyperstack-sdk/src/view.rs index 4c1afef3..1b379129 100644 --- a/rust/hyperstack-sdk/src/view.rs +++ b/rust/hyperstack-sdk/src/view.rs @@ -238,6 +238,9 @@ where None, this.take, this.skip, + this.with_snapshot, + this.after.clone(), + this.snapshot_limit, )); } @@ -257,6 +260,9 @@ where take: Option, skip: Option, filters: Option>, + with_snapshot: Option, + after: Option, + snapshot_limit: Option, stream: Option>, } @@ -278,6 +284,9 @@ where take: None, skip: None, filters: None, + with_snapshot: None, + after: None, + snapshot_limit: None, stream: None, } } @@ -302,6 +311,24 @@ where self } + /// Set whether to include the initial snapshot (defaults to true). + pub fn with_snapshot(mut self, with_snapshot: bool) -> Self { + self.with_snapshot = Some(with_snapshot); + self + } + + /// Set the cursor to resume from (for reconnecting and getting only newer data). + pub fn after(mut self, cursor: impl Into) -> Self { + self.after = Some(cursor.into()); + self + } + + /// Set the maximum number of entities to include in the snapshot. + pub fn with_snapshot_limit(mut self, limit: usize) -> Self { + self.snapshot_limit = Some(limit); + self + } + /// Get a rich stream with before/after diffs instead. pub fn rich(self) -> RichEntityStream { RichEntityStream::new_lazy_with_opts( @@ -313,6 +340,9 @@ where None, self.take, self.skip, + self.with_snapshot, + self.after, + self.snapshot_limit, ) } } @@ -336,6 +366,9 @@ where None, this.take, this.skip, + this.with_snapshot, + this.after.clone(), + this.snapshot_limit, )); } @@ -355,6 +388,9 @@ where take: Option, skip: Option, filters: Option>, + with_snapshot: Option, + after: Option, + snapshot_limit: Option, stream: Option>, } @@ -376,6 +412,9 @@ where take: None, skip: None, filters: None, + with_snapshot: None, + after: None, + snapshot_limit: None, stream: None, } } @@ -396,6 +435,24 @@ where .insert(key.into(), value.into()); self } + + /// Set whether to include the initial snapshot (defaults to true). + pub fn with_snapshot(mut self, with_snapshot: bool) -> Self { + self.with_snapshot = Some(with_snapshot); + self + } + + /// Set the cursor to resume from (for reconnecting and getting only newer data). + pub fn after(mut self, cursor: impl Into) -> Self { + self.after = Some(cursor.into()); + self + } + + /// Set the maximum number of entities to include in the snapshot. + pub fn with_snapshot_limit(mut self, limit: usize) -> Self { + self.snapshot_limit = Some(limit); + self + } } impl Stream for RichWatchBuilder @@ -417,6 +474,9 @@ where None, this.take, this.skip, + this.with_snapshot, + this.after.clone(), + this.snapshot_limit, )); } From 865f953120814777cd2707f2074d2b5cfaf43c38 Mon Sep 17 00:00:00 2001 From: Adrian Henry Date: Sat, 21 Mar 2026 03:42:59 +0000 Subject: [PATCH 06/21] refactor: fix clippy too_many_arguments warning Group 5 optional subscription parameters into SubscriptionOptions struct to reduce ensure_subscription_with_opts from 8 args to 3. --- rust/hyperstack-sdk/src/connection.rs | 28 ++++++++++++++++---------- rust/hyperstack-sdk/src/stream.rs | 29 +++++++++++++++++++++++---- stacks/ore/Cargo.lock | 10 ++++----- 3 files changed, 47 insertions(+), 20 deletions(-) diff --git a/rust/hyperstack-sdk/src/connection.rs b/rust/hyperstack-sdk/src/connection.rs index d1def253..7545b7b6 100644 --- a/rust/hyperstack-sdk/src/connection.rs +++ b/rust/hyperstack-sdk/src/connection.rs @@ -22,6 +22,16 @@ pub enum ConnectionCommand { Disconnect, } +/// Options for subscribing to a view with specific parameters +#[derive(Debug, Clone, Default)] +pub struct SubscriptionOptions { + pub take: Option, + pub skip: Option, + pub with_snapshot: Option, + pub after: Option, + pub snapshot_limit: Option, +} + struct ConnectionManagerInner { #[allow(dead_code)] url: String, @@ -63,7 +73,7 @@ impl ConnectionManager { } pub async fn ensure_subscription(&self, view: &str, key: Option<&str>) { - self.ensure_subscription_with_opts(view, key, None, None, None, None, None) + self.ensure_subscription_with_opts(view, key, SubscriptionOptions::default()) .await } @@ -71,22 +81,18 @@ impl ConnectionManager { &self, view: &str, key: Option<&str>, - take: Option, - skip: Option, - with_snapshot: Option, - after: Option<&str>, - snapshot_limit: Option, + opts: SubscriptionOptions, ) { let sub = Subscription { view: view.to_string(), key: key.map(|s| s.to_string()), partition: None, filters: None, - take, - skip, - with_snapshot, - after: after.map(|s| s.to_string()), - snapshot_limit, + take: opts.take, + skip: opts.skip, + with_snapshot: opts.with_snapshot, + after: opts.after, + snapshot_limit: opts.snapshot_limit, }; if !self.inner.subscriptions.read().await.contains(&sub) { diff --git a/rust/hyperstack-sdk/src/stream.rs b/rust/hyperstack-sdk/src/stream.rs index fd320e23..725521c0 100644 --- a/rust/hyperstack-sdk/src/stream.rs +++ b/rust/hyperstack-sdk/src/stream.rs @@ -1,4 +1,4 @@ -use crate::connection::ConnectionManager; +use crate::connection::{ConnectionManager, SubscriptionOptions}; use crate::frame::Operation; use crate::store::{SharedStore, StoreUpdate}; use futures_util::Stream; @@ -341,7 +341,14 @@ impl Stream for EntityStre let view = subscription_view.clone(); let key = subscription_key.clone(); let fut = Box::pin(async move { - conn.ensure_subscription_with_opts(&view, key.as_deref(), take, skip, with_snapshot, after.as_deref(), snapshot_limit) + let opts = SubscriptionOptions { + take, + skip, + with_snapshot, + after, + snapshot_limit, + }; + conn.ensure_subscription_with_opts(&view, key.as_deref(), opts) .await; }); @@ -570,7 +577,14 @@ impl Stream for RichEntity let view = subscription_view.clone(); let key = subscription_key.clone(); let fut = Box::pin(async move { - conn.ensure_subscription_with_opts(&view, key.as_deref(), take, skip, with_snapshot, after.as_deref(), snapshot_limit) + let opts = SubscriptionOptions { + take, + skip, + with_snapshot, + after, + snapshot_limit, + }; + conn.ensure_subscription_with_opts(&view, key.as_deref(), opts) .await; }); @@ -1069,7 +1083,14 @@ impl Stream for UseStream< let view = subscription_view.clone(); let key = subscription_key.clone(); let fut = Box::pin(async move { - conn.ensure_subscription_with_opts(&view, key.as_deref(), take, skip, with_snapshot, after.as_deref(), snapshot_limit) + let opts = SubscriptionOptions { + take, + skip, + with_snapshot, + after, + snapshot_limit, + }; + conn.ensure_subscription_with_opts(&view, key.as_deref(), opts) .await; }); diff --git a/stacks/ore/Cargo.lock b/stacks/ore/Cargo.lock index 085f1c24..16c2c2e1 100644 --- a/stacks/ore/Cargo.lock +++ b/stacks/ore/Cargo.lock @@ -1436,7 +1436,7 @@ dependencies = [ [[package]] name = "hyperstack" -version = "0.5.8" +version = "0.5.10" dependencies = [ "anyhow", "bs58", @@ -1474,7 +1474,7 @@ dependencies = [ [[package]] name = "hyperstack-interpreter" -version = "0.5.8" +version = "0.5.10" dependencies = [ "bs58", "dashmap", @@ -1501,7 +1501,7 @@ dependencies = [ [[package]] name = "hyperstack-macros" -version = "0.5.8" +version = "0.5.10" dependencies = [ "bs58", "hex", @@ -1516,7 +1516,7 @@ dependencies = [ [[package]] name = "hyperstack-sdk" -version = "0.5.6" +version = "0.5.10" dependencies = [ "anyhow", "flate2", @@ -1533,7 +1533,7 @@ dependencies = [ [[package]] name = "hyperstack-server" -version = "0.5.8" +version = "0.5.10" dependencies = [ "anyhow", "base64 0.22.1", From b3850e6e411576c31bb846508230dd7afda91cb1 Mon Sep 17 00:00:00 2001 From: Adrian Henry Date: Sat, 21 Mar 2026 03:45:15 +0000 Subject: [PATCH 07/21] fix: apply snapshot_limit when no after cursor is provided The snapshot_limit subscription parameter was only being honored when an after cursor was set. When after was None, get_all() was called without any limit, returning the full unlimited snapshot. This fix adds the limit parameter to get_all() and passes snapshot_limit in both otel and non-otel code paths for List/Append mode. --- rust/hyperstack-server/src/cache.rs | 15 +++++++++++---- rust/hyperstack-server/src/materialized_view.rs | 2 +- rust/hyperstack-server/src/websocket/server.rs | 4 ++-- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/rust/hyperstack-server/src/cache.rs b/rust/hyperstack-server/src/cache.rs index c217e6c9..120458fa 100644 --- a/rust/hyperstack-server/src/cache.rs +++ b/rust/hyperstack-server/src/cache.rs @@ -100,13 +100,20 @@ impl EntityCache { /// /// Returns a vector of (key, entity) pairs for sending as snapshots /// to new subscribers. - pub async fn get_all(&self, view_id: &str) -> Vec<(String, Value)> { + pub async fn get_all(&self, view_id: &str, limit: Option) -> Vec<(String, Value)> { let caches = self.caches.read().await; - caches + let mut results: Vec<(String, Value)> = caches .get(view_id) .map(|cache| cache.iter().map(|(k, v)| (k.clone(), v.clone())).collect()) - .unwrap_or_default() + .unwrap_or_default(); + + // Apply limit if provided + if let Some(limit) = limit { + results.truncate(limit); + } + + results } /// Get entities with _seq greater than the provided cursor. @@ -499,7 +506,7 @@ mod tests { cache.upsert("tokens/list", "key1", json!({"id": 1})).await; cache.upsert("tokens/list", "key2", json!({"id": 2})).await; - let all = cache.get_all("tokens/list").await; + let all = cache.get_all("tokens/list", None).await; assert_eq!(all.len(), 2); } diff --git a/rust/hyperstack-server/src/materialized_view.rs b/rust/hyperstack-server/src/materialized_view.rs index 44b68e30..6e00d215 100644 --- a/rust/hyperstack-server/src/materialized_view.rs +++ b/rust/hyperstack-server/src/materialized_view.rs @@ -96,7 +96,7 @@ impl MaterializedView { /// Evaluate initial state from cache pub async fn evaluate_initial(&self, cache: &EntityCache) -> Vec<(String, Value)> { - let entities = cache.get_all(&self.source_id).await; + let entities = cache.get_all(&self.source_id, None).await; self.evaluate_pipeline(entities).await } diff --git a/rust/hyperstack-server/src/websocket/server.rs b/rust/hyperstack-server/src/websocket/server.rs index 572c650c..7877b6c3 100644 --- a/rust/hyperstack-server/src/websocket/server.rs +++ b/rust/hyperstack-server/src/websocket/server.rs @@ -654,7 +654,7 @@ async fn attach_client_to_bus( let snapshots = if let Some(ref cursor) = subscription.after { ctx.entity_cache.get_after(view_id, cursor, subscription.snapshot_limit).await } else { - ctx.entity_cache.get_all(view_id).await + ctx.entity_cache.get_all(view_id, subscription.snapshot_limit).await }; let snapshot_entities: Vec = snapshots @@ -1050,7 +1050,7 @@ async fn attach_client_to_bus( let snapshots = if let Some(ref cursor) = subscription.after { ctx.entity_cache.get_after(view_id, cursor, subscription.snapshot_limit).await } else { - ctx.entity_cache.get_all(view_id).await + ctx.entity_cache.get_all(view_id, subscription.snapshot_limit).await }; let snapshot_entities: Vec = snapshots From fc68ea51cf1074e4442f7264322908901f1e041e Mon Sep 17 00:00:00 2001 From: Adrian Henry Date: Sat, 21 Mar 2026 04:05:45 +0000 Subject: [PATCH 08/21] fix: add camelCase serde rename to Subscription struct Fixes field name mismatch between TypeScript client and Rust server. The TypeScript Subscription interface uses camelCase (withSnapshot, snapshotLimit), but the Rust struct used snake_case. Added #[serde(rename_all = "camelCase")] attribute so the server properly recognizes these fields. Updated tests to use camelCase JSON keys. --- rust/hyperstack-server/src/websocket/subscription.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rust/hyperstack-server/src/websocket/subscription.rs b/rust/hyperstack-server/src/websocket/subscription.rs index 38a05c83..b9842691 100644 --- a/rust/hyperstack-server/src/websocket/subscription.rs +++ b/rust/hyperstack-server/src/websocket/subscription.rs @@ -14,6 +14,7 @@ pub enum ClientMessage { /// Client subscription to a specific view #[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] pub struct Subscription { pub view: String, #[serde(skip_serializing_if = "Option::is_none")] @@ -266,7 +267,7 @@ mod tests { let json = json!({ "type": "subscribe", "view": "SettlementGame/list", - "with_snapshot": false + "withSnapshot": false }); let msg: ClientMessage = serde_json::from_value(json).unwrap(); @@ -303,7 +304,7 @@ mod tests { "type": "subscribe", "view": "SettlementGame/list", "after": "123456789:000000000042", - "snapshot_limit": 100 + "snapshotLimit": 100 }); let msg: ClientMessage = serde_json::from_value(json).unwrap(); From 1d629172fc023aff7576bd79c13351ce12535df7 Mon Sep 17 00:00:00 2001 From: Adrian Henry Date: Sat, 21 Mar 2026 04:07:26 +0000 Subject: [PATCH 09/21] fix: make snapshot_limit deterministic by sorting before truncation --- rust/hyperstack-server/src/cache.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/rust/hyperstack-server/src/cache.rs b/rust/hyperstack-server/src/cache.rs index 120458fa..4e099cdc 100644 --- a/rust/hyperstack-server/src/cache.rs +++ b/rust/hyperstack-server/src/cache.rs @@ -108,6 +108,15 @@ impl EntityCache { .map(|cache| cache.iter().map(|(k, v)| (k.clone(), v.clone())).collect()) .unwrap_or_default(); + // Sort by _seq descending to return the most recent entities when limit is applied + if limit.is_some() { + results.sort_by(|a, b| { + let seq_a = a.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); + let seq_b = b.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); + seq_b.cmp(seq_a) + }); + } + // Apply limit if provided if let Some(limit) = limit { results.truncate(limit); From 12f8f755d2a125f4a255c4da79e699dab842cdd4 Mon Sep 17 00:00:00 2001 From: Adrian Henry Date: Sat, 21 Mar 2026 04:09:24 +0000 Subject: [PATCH 10/21] fix: apply snapshot_limit after key filter in websocket subscriptions The snapshot_limit was being applied to the unfiltered cache before the matches_key filter, causing clients to receive fewer entities than the limit when filtering was in use. Now the limit is applied after filtering so clients get up to snapshot_limit matching entities. --- rust/hyperstack-server/src/websocket/server.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/rust/hyperstack-server/src/websocket/server.rs b/rust/hyperstack-server/src/websocket/server.rs index 7877b6c3..82dd9a06 100644 --- a/rust/hyperstack-server/src/websocket/server.rs +++ b/rust/hyperstack-server/src/websocket/server.rs @@ -652,14 +652,15 @@ async fn attach_client_to_bus( if should_send_snapshot { // Determine which entities to send based on cursor let snapshots = if let Some(ref cursor) = subscription.after { - ctx.entity_cache.get_after(view_id, cursor, subscription.snapshot_limit).await + ctx.entity_cache.get_after(view_id, cursor, None).await } else { - ctx.entity_cache.get_all(view_id, subscription.snapshot_limit).await + ctx.entity_cache.get_all(view_id, None).await }; let snapshot_entities: Vec = snapshots .into_iter() .filter(|(key, _)| subscription.matches_key(key)) + .take(subscription.snapshot_limit.unwrap_or(usize::MAX)) .map(|(key, mut data)| { transform_large_u64_to_strings(&mut data); SnapshotEntity { key, data } @@ -1048,14 +1049,15 @@ async fn attach_client_to_bus( if should_send_snapshot { // Determine which entities to send based on cursor let snapshots = if let Some(ref cursor) = subscription.after { - ctx.entity_cache.get_after(view_id, cursor, subscription.snapshot_limit).await + ctx.entity_cache.get_after(view_id, cursor, None).await } else { - ctx.entity_cache.get_all(view_id, subscription.snapshot_limit).await + ctx.entity_cache.get_all(view_id, None).await }; let snapshot_entities: Vec = snapshots .into_iter() .filter(|(key, _)| subscription.matches_key(key)) + .take(subscription.snapshot_limit.unwrap_or(usize::MAX)) .map(|(key, mut data)| { transform_large_u64_to_strings(&mut data); SnapshotEntity { key, data } From 21338051a31c984e2afb94983dc5553bfc0f016c Mon Sep 17 00:00:00 2001 From: Adrian Henry Date: Sat, 21 Mar 2026 04:31:23 +0000 Subject: [PATCH 11/21] refactor: remove unused limit parameter from get_all --- rust/hyperstack-server/src/cache.rs | 24 ++++--------------- .../src/materialized_view.rs | 2 +- .../hyperstack-server/src/websocket/server.rs | 4 ++-- 3 files changed, 7 insertions(+), 23 deletions(-) diff --git a/rust/hyperstack-server/src/cache.rs b/rust/hyperstack-server/src/cache.rs index 4e099cdc..c217e6c9 100644 --- a/rust/hyperstack-server/src/cache.rs +++ b/rust/hyperstack-server/src/cache.rs @@ -100,29 +100,13 @@ impl EntityCache { /// /// Returns a vector of (key, entity) pairs for sending as snapshots /// to new subscribers. - pub async fn get_all(&self, view_id: &str, limit: Option) -> Vec<(String, Value)> { + pub async fn get_all(&self, view_id: &str) -> Vec<(String, Value)> { let caches = self.caches.read().await; - let mut results: Vec<(String, Value)> = caches + caches .get(view_id) .map(|cache| cache.iter().map(|(k, v)| (k.clone(), v.clone())).collect()) - .unwrap_or_default(); - - // Sort by _seq descending to return the most recent entities when limit is applied - if limit.is_some() { - results.sort_by(|a, b| { - let seq_a = a.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); - let seq_b = b.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); - seq_b.cmp(seq_a) - }); - } - - // Apply limit if provided - if let Some(limit) = limit { - results.truncate(limit); - } - - results + .unwrap_or_default() } /// Get entities with _seq greater than the provided cursor. @@ -515,7 +499,7 @@ mod tests { cache.upsert("tokens/list", "key1", json!({"id": 1})).await; cache.upsert("tokens/list", "key2", json!({"id": 2})).await; - let all = cache.get_all("tokens/list", None).await; + let all = cache.get_all("tokens/list").await; assert_eq!(all.len(), 2); } diff --git a/rust/hyperstack-server/src/materialized_view.rs b/rust/hyperstack-server/src/materialized_view.rs index 6e00d215..44b68e30 100644 --- a/rust/hyperstack-server/src/materialized_view.rs +++ b/rust/hyperstack-server/src/materialized_view.rs @@ -96,7 +96,7 @@ impl MaterializedView { /// Evaluate initial state from cache pub async fn evaluate_initial(&self, cache: &EntityCache) -> Vec<(String, Value)> { - let entities = cache.get_all(&self.source_id, None).await; + let entities = cache.get_all(&self.source_id).await; self.evaluate_pipeline(entities).await } diff --git a/rust/hyperstack-server/src/websocket/server.rs b/rust/hyperstack-server/src/websocket/server.rs index 82dd9a06..31d47ee4 100644 --- a/rust/hyperstack-server/src/websocket/server.rs +++ b/rust/hyperstack-server/src/websocket/server.rs @@ -654,7 +654,7 @@ async fn attach_client_to_bus( let snapshots = if let Some(ref cursor) = subscription.after { ctx.entity_cache.get_after(view_id, cursor, None).await } else { - ctx.entity_cache.get_all(view_id, None).await + ctx.entity_cache.get_all(view_id).await }; let snapshot_entities: Vec = snapshots @@ -1051,7 +1051,7 @@ async fn attach_client_to_bus( let snapshots = if let Some(ref cursor) = subscription.after { ctx.entity_cache.get_after(view_id, cursor, None).await } else { - ctx.entity_cache.get_all(view_id, None).await + ctx.entity_cache.get_all(view_id).await }; let snapshot_entities: Vec = snapshots From 6abba2a923bb725465f579e6f4ee25e2d68ec03e Mon Sep 17 00:00:00 2001 From: Adrian Henry Date: Sat, 21 Mar 2026 04:34:26 +0000 Subject: [PATCH 12/21] fix: sort entities by _seq before applying snapshot_limit When only snapshot_limit is set (no cursor), entities were being taken from DashMap in non-deterministic order. Now we sort by _seq descending to ensure deterministic results, matching the behavior when an after cursor is provided. This applies to both the otel and non-otel code paths for List/Append mode subscriptions. --- .../hyperstack-server/src/websocket/server.rs | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/rust/hyperstack-server/src/websocket/server.rs b/rust/hyperstack-server/src/websocket/server.rs index 31d47ee4..f40c431e 100644 --- a/rust/hyperstack-server/src/websocket/server.rs +++ b/rust/hyperstack-server/src/websocket/server.rs @@ -651,12 +651,21 @@ async fn attach_client_to_bus( if should_send_snapshot { // Determine which entities to send based on cursor - let snapshots = if let Some(ref cursor) = subscription.after { + let mut snapshots = if let Some(ref cursor) = subscription.after { ctx.entity_cache.get_after(view_id, cursor, None).await } else { ctx.entity_cache.get_all(view_id).await }; + // Sort by _seq descending when snapshot_limit is set to ensure deterministic results + if subscription.snapshot_limit.is_some() { + snapshots.sort_by(|a, b| { + let sa = a.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); + let sb = b.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); + sb.cmp(sa) // descending: most-recent N + }); + } + let snapshot_entities: Vec = snapshots .into_iter() .filter(|(key, _)| subscription.matches_key(key)) @@ -1048,12 +1057,21 @@ async fn attach_client_to_bus( if should_send_snapshot { // Determine which entities to send based on cursor - let snapshots = if let Some(ref cursor) = subscription.after { + let mut snapshots = if let Some(ref cursor) = subscription.after { ctx.entity_cache.get_after(view_id, cursor, None).await } else { ctx.entity_cache.get_all(view_id).await }; + // Sort by _seq descending when snapshot_limit is set to ensure deterministic results + if subscription.snapshot_limit.is_some() { + snapshots.sort_by(|a, b| { + let sa = a.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); + let sb = b.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); + sb.cmp(sa) // descending: most-recent N + }); + } + let snapshot_entities: Vec = snapshots .into_iter() .filter(|(key, _)| subscription.matches_key(key)) From 522d7ae2b3d77bbd8cbd9c3ca92764138c826e9c Mon Sep 17 00:00:00 2001 From: Adrian Henry Date: Sat, 21 Mar 2026 04:35:44 +0000 Subject: [PATCH 13/21] fix: add camelCase serde rename to Subscription struct --- rust/hyperstack-sdk/src/subscription.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rust/hyperstack-sdk/src/subscription.rs b/rust/hyperstack-sdk/src/subscription.rs index a085764d..f3cc0edd 100644 --- a/rust/hyperstack-sdk/src/subscription.rs +++ b/rust/hyperstack-sdk/src/subscription.rs @@ -10,6 +10,7 @@ pub enum ClientMessage { } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] pub struct Subscription { pub view: String, #[serde(skip_serializing_if = "Option::is_none")] From 6f4a4d36132809ec91cd4a542e4239b82ca83eca Mon Sep 17 00:00:00 2001 From: Adrian Henry Date: Sat, 21 Mar 2026 04:50:10 +0000 Subject: [PATCH 14/21] fix: correct snapshot ordering when using cursor with limit When both `after` (cursor) and `snapshot_limit` are set, the descending sort was overwriting the ascending order from `get_after`, causing the wrong subset to be returned (most recent N instead of next N). - Only apply descending sort when there's no cursor - Pass snapshot_limit to get_after to avoid loading unnecessary data - Remove redundant .take() call since limiting now happens in get_after --- rust/hyperstack-server/src/websocket/server.rs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/rust/hyperstack-server/src/websocket/server.rs b/rust/hyperstack-server/src/websocket/server.rs index f40c431e..18dd8d55 100644 --- a/rust/hyperstack-server/src/websocket/server.rs +++ b/rust/hyperstack-server/src/websocket/server.rs @@ -652,13 +652,13 @@ async fn attach_client_to_bus( if should_send_snapshot { // Determine which entities to send based on cursor let mut snapshots = if let Some(ref cursor) = subscription.after { - ctx.entity_cache.get_after(view_id, cursor, None).await + ctx.entity_cache.get_after(view_id, cursor, subscription.snapshot_limit).await } else { ctx.entity_cache.get_all(view_id).await }; - // Sort by _seq descending when snapshot_limit is set to ensure deterministic results - if subscription.snapshot_limit.is_some() { + // Sort by _seq descending only when there is no cursor (to get most-recent N from full cache) + if subscription.snapshot_limit.is_some() && subscription.after.is_none() { snapshots.sort_by(|a, b| { let sa = a.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); let sb = b.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); @@ -669,7 +669,6 @@ async fn attach_client_to_bus( let snapshot_entities: Vec = snapshots .into_iter() .filter(|(key, _)| subscription.matches_key(key)) - .take(subscription.snapshot_limit.unwrap_or(usize::MAX)) .map(|(key, mut data)| { transform_large_u64_to_strings(&mut data); SnapshotEntity { key, data } @@ -1058,13 +1057,13 @@ async fn attach_client_to_bus( if should_send_snapshot { // Determine which entities to send based on cursor let mut snapshots = if let Some(ref cursor) = subscription.after { - ctx.entity_cache.get_after(view_id, cursor, None).await + ctx.entity_cache.get_after(view_id, cursor, subscription.snapshot_limit).await } else { ctx.entity_cache.get_all(view_id).await }; - // Sort by _seq descending when snapshot_limit is set to ensure deterministic results - if subscription.snapshot_limit.is_some() { + // Sort by _seq descending only when there is no cursor (to get most-recent N from full cache) + if subscription.snapshot_limit.is_some() && subscription.after.is_none() { snapshots.sort_by(|a, b| { let sa = a.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); let sb = b.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); @@ -1075,7 +1074,6 @@ async fn attach_client_to_bus( let snapshot_entities: Vec = snapshots .into_iter() .filter(|(key, _)| subscription.matches_key(key)) - .take(subscription.snapshot_limit.unwrap_or(usize::MAX)) .map(|(key, mut data)| { transform_large_u64_to_strings(&mut data); SnapshotEntity { key, data } From 3808e59d65ccaa1af24477cf29b64e38c718b340 Mon Sep 17 00:00:00 2001 From: Adrian Henry Date: Sat, 21 Mar 2026 05:15:11 +0000 Subject: [PATCH 15/21] fix: guard setIsLoading(true) when withSnapshot is false --- typescript/react/src/view-hooks.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/typescript/react/src/view-hooks.ts b/typescript/react/src/view-hooks.ts index b3e1bfbb..076198de 100644 --- a/typescript/react/src/view-hooks.ts +++ b/typescript/react/src/view-hooks.ts @@ -43,7 +43,9 @@ export function useStateView( after, snapshotLimit }); - setIsLoading(true); + if (withSnapshot !== false) { + setIsLoading(true); + } return () => { try { @@ -162,7 +164,9 @@ export function useListView( after, snapshotLimit }); - setIsLoading(true); + if (withSnapshot !== false) { + setIsLoading(true); + } return () => { try { From 0782e666a33ac9326f53c0b9f5556d66a663badc Mon Sep 17 00:00:00 2001 From: Adrian Henry Date: Sat, 21 Mar 2026 05:15:12 +0000 Subject: [PATCH 16/21] fix: add truncate after sorting by _seq to respect snapshot_limit --- .../hyperstack-server/src/websocket/server.rs | 30 +++++++++++-------- 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/rust/hyperstack-server/src/websocket/server.rs b/rust/hyperstack-server/src/websocket/server.rs index 18dd8d55..9befcc27 100644 --- a/rust/hyperstack-server/src/websocket/server.rs +++ b/rust/hyperstack-server/src/websocket/server.rs @@ -658,12 +658,15 @@ async fn attach_client_to_bus( }; // Sort by _seq descending only when there is no cursor (to get most-recent N from full cache) - if subscription.snapshot_limit.is_some() && subscription.after.is_none() { - snapshots.sort_by(|a, b| { - let sa = a.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); - let sb = b.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); - sb.cmp(sa) // descending: most-recent N - }); + if let Some(limit) = subscription.snapshot_limit { + if subscription.after.is_none() { + snapshots.sort_by(|a, b| { + let sa = a.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); + let sb = b.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); + sb.cmp(sa) // descending: most-recent N + }); + snapshots.truncate(limit); + } } let snapshot_entities: Vec = snapshots @@ -1063,12 +1066,15 @@ async fn attach_client_to_bus( }; // Sort by _seq descending only when there is no cursor (to get most-recent N from full cache) - if subscription.snapshot_limit.is_some() && subscription.after.is_none() { - snapshots.sort_by(|a, b| { - let sa = a.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); - let sb = b.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); - sb.cmp(sa) // descending: most-recent N - }); + if let Some(limit) = subscription.snapshot_limit { + if subscription.after.is_none() { + snapshots.sort_by(|a, b| { + let sa = a.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); + let sb = b.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); + sb.cmp(sa) // descending: most-recent N + }); + snapshots.truncate(limit); + } } let snapshot_entities: Vec = snapshots From 4935c7a3d006774bedf476738e7bdf82177aca82 Mon Sep 17 00:00:00 2001 From: Adrian Henry Date: Sat, 21 Mar 2026 05:28:05 +0000 Subject: [PATCH 17/21] docs: clarify after and snapshot_limit ignored for State mode --- rust/hyperstack-server/src/websocket/subscription.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/rust/hyperstack-server/src/websocket/subscription.rs b/rust/hyperstack-server/src/websocket/subscription.rs index b9842691..65c7fd22 100644 --- a/rust/hyperstack-server/src/websocket/subscription.rs +++ b/rust/hyperstack-server/src/websocket/subscription.rs @@ -30,10 +30,12 @@ pub struct Subscription { /// Whether to include initial snapshot (defaults to true for backward compatibility) #[serde(skip_serializing_if = "Option::is_none")] pub with_snapshot: Option, - /// Cursor for resuming from a specific point (_seq value) + /// Cursor for resuming from a specific point (_seq value). + /// Note: Ignored for State mode subscriptions (single entity, no pagination). #[serde(skip_serializing_if = "Option::is_none")] pub after: Option, - /// Maximum number of entities to include in snapshot (pagination hint) + /// Maximum number of entities to include in snapshot (pagination hint). + /// Note: Ignored for State mode subscriptions (single entity). #[serde(skip_serializing_if = "Option::is_none")] pub snapshot_limit: Option, } From 18ec45d17f7851e4dce85897a2bf90dff8f0c1da Mon Sep 17 00:00:00 2001 From: Adrian Henry Date: Sat, 21 Mar 2026 05:28:06 +0000 Subject: [PATCH 18/21] fix: guard refresh() isLoading behind withSnapshot check --- typescript/react/src/view-hooks.ts | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/typescript/react/src/view-hooks.ts b/typescript/react/src/view-hooks.ts index 076198de..14f14b8e 100644 --- a/typescript/react/src/view-hooks.ts +++ b/typescript/react/src/view-hooks.ts @@ -73,7 +73,10 @@ export function useStateView( after, snapshotLimit }); - setIsLoading(true); + const shouldLoad = withSnapshot ?? true; + if (shouldLoad) { + setIsLoading(true); + } setTimeout(() => { try { @@ -197,7 +200,10 @@ export function useListView( after, snapshotLimit }); - setIsLoading(true); + const shouldLoad = withSnapshot ?? true; + if (shouldLoad) { + setIsLoading(true); + } setTimeout(() => { try { From 7b2c06cb699b1f8bc1503a31a190e08a91996158 Mon Sep 17 00:00:00 2001 From: Adrian Henry Date: Sat, 21 Mar 2026 11:57:32 +0000 Subject: [PATCH 19/21] fix: Fix _seq numeric comparison and missing borrow_and_update in cache and WebSocket handlers --- rust/hyperstack-server/src/cache.rs | 17 +++++++++++++++-- rust/hyperstack-server/src/websocket/server.rs | 8 +++++--- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/rust/hyperstack-server/src/cache.rs b/rust/hyperstack-server/src/cache.rs index c217e6c9..266de235 100644 --- a/rust/hyperstack-server/src/cache.rs +++ b/rust/hyperstack-server/src/cache.rs @@ -16,6 +16,19 @@ const DEFAULT_MAX_ARRAY_LENGTH: usize = 100; const DEFAULT_INITIAL_SNAPSHOT_BATCH_SIZE: usize = 50; const DEFAULT_SUBSEQUENT_SNAPSHOT_BATCH_SIZE: usize = 100; +/// Compare two `_seq` values numerically. +/// `_seq` format is "{slot}:{offset}" where slot is not zero-padded. +/// This handles digit-count boundaries correctly (e.g., 99999999 < 100000000). +pub fn cmp_seq(a: &str, b: &str) -> std::cmp::Ordering { + fn parse(s: &str) -> (u64, u64) { + let mut parts = s.splitn(2, ':'); + let slot = parts.next().and_then(|p| p.parse().ok()).unwrap_or(0); + let offset = parts.next().and_then(|p| p.parse().ok()).unwrap_or(0); + (slot, offset) + } + parse(a).cmp(&parse(b)) +} + /// Configuration for the entity cache #[derive(Debug, Clone)] pub struct EntityCacheConfig { @@ -129,7 +142,7 @@ impl EntityCache { entity .get("_seq") .and_then(|s| s.as_str()) - .map(|seq| seq > cursor) + .map(|seq| cmp_seq(seq, cursor) == std::cmp::Ordering::Greater) .unwrap_or(false) }) .map(|(k, v)| (k.clone(), v.clone())) @@ -139,7 +152,7 @@ impl EntityCache { results.sort_by(|a, b| { let seq_a = a.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); let seq_b = b.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); - seq_a.cmp(seq_b) + cmp_seq(seq_a, seq_b) }); // Apply limit if provided diff --git a/rust/hyperstack-server/src/websocket/server.rs b/rust/hyperstack-server/src/websocket/server.rs index 9befcc27..d2b7d290 100644 --- a/rust/hyperstack-server/src/websocket/server.rs +++ b/rust/hyperstack-server/src/websocket/server.rs @@ -1,5 +1,5 @@ use crate::bus::BusManager; -use crate::cache::{EntityCache, SnapshotBatchConfig}; +use crate::cache::{cmp_seq, EntityCache, SnapshotBatchConfig}; use crate::compression::maybe_compress; use crate::view::{ViewIndex, ViewSpec}; use crate::websocket::client_manager::ClientManager; @@ -610,6 +610,7 @@ async fn attach_client_to_bus( } } else { info!("Client {} subscribed to {} without snapshot", ctx.client_id, view_id); + rx.borrow_and_update(); } let client_id = ctx.client_id; @@ -663,7 +664,7 @@ async fn attach_client_to_bus( snapshots.sort_by(|a, b| { let sa = a.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); let sb = b.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); - sb.cmp(sa) // descending: most-recent N + cmp_seq(sb, sa) // descending: most-recent N }); snapshots.truncate(limit); } @@ -1022,6 +1023,7 @@ async fn attach_client_to_bus( } } else { info!("Client {} subscribed to {} without snapshot", ctx.client_id, view_id); + rx.borrow_and_update(); } let client_id = ctx.client_id; @@ -1071,7 +1073,7 @@ async fn attach_client_to_bus( snapshots.sort_by(|a, b| { let sa = a.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); let sb = b.1.get("_seq").and_then(|s| s.as_str()).unwrap_or(""); - sb.cmp(sa) // descending: most-recent N + cmp_seq(sb, sa) // descending: most-recent N }); snapshots.truncate(limit); } From b8a369264feba1245a53c8b550fb09875382a03c Mon Sep 17 00:00:00 2001 From: Adrian Henry Date: Sat, 21 Mar 2026 12:32:09 +0000 Subject: [PATCH 20/21] fix: prevent isLoading stuck when withSnapshot is false in view hooks --- typescript/react/src/view-hooks.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/typescript/react/src/view-hooks.ts b/typescript/react/src/view-hooks.ts index 14f14b8e..6c214d8b 100644 --- a/typescript/react/src/view-hooks.ts +++ b/typescript/react/src/view-hooks.ts @@ -18,7 +18,7 @@ export function useStateView( key?: Record, options?: ViewHookOptions ): ViewHookResult { - const [isLoading, setIsLoading] = useState(!options?.initialData); + const [isLoading, setIsLoading] = useState(!options?.initialData && options?.withSnapshot !== false); const [error, setError] = useState(); const clientRef = useRef(client); clientRef.current = client; @@ -134,7 +134,7 @@ export function useListView( params?: ListParams, options?: ViewHookOptions ): ViewHookResult { - const [isLoading, setIsLoading] = useState(!options?.initialData); + const [isLoading, setIsLoading] = useState(!options?.initialData && params?.withSnapshot !== false); const [error, setError] = useState(); const clientRef = useRef(client); clientRef.current = client; From 81adab0925d5ec7e5171d3ccace88349e6d0e0b1 Mon Sep 17 00:00:00 2001 From: Adrian Henry Date: Sat, 21 Mar 2026 12:32:10 +0000 Subject: [PATCH 21/21] docs: document that after cursor is unsupported for derived views --- rust/hyperstack-server/src/websocket/subscription.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rust/hyperstack-server/src/websocket/subscription.rs b/rust/hyperstack-server/src/websocket/subscription.rs index 65c7fd22..8785acb7 100644 --- a/rust/hyperstack-server/src/websocket/subscription.rs +++ b/rust/hyperstack-server/src/websocket/subscription.rs @@ -32,6 +32,8 @@ pub struct Subscription { pub with_snapshot: Option, /// Cursor for resuming from a specific point (_seq value). /// Note: Ignored for State mode subscriptions (single entity, no pagination). + /// Note: Not supported for derived views (windowed aggregations with sort). Derived views + /// always emit `seq: None` in live update frames, so cursor-based reconnection is unavailable. #[serde(skip_serializing_if = "Option::is_none")] pub after: Option, /// Maximum number of entities to include in snapshot (pagination hint).