Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
da7b486
feat(server): Add optional snapshot and cursor-based filtering to Web…
adiman9 Mar 21, 2026
9c5fcc0
feat(rust-sdk): Support optional snapshots and cursor-based resume
adiman9 Mar 21, 2026
3f239e9
feat(typescript-sdk): Support optional snapshots and cursor-based resume
adiman9 Mar 21, 2026
1f7f95b
feat(sdk): Add builder methods and React hooks for new subscription o…
adiman9 Mar 21, 2026
8f4fba0
fix(rust-sdk): Wire up new subscription fields through stream layer
adiman9 Mar 21, 2026
865f953
refactor: fix clippy too_many_arguments warning
adiman9 Mar 21, 2026
b3850e6
fix: apply snapshot_limit when no after cursor is provided
adiman9 Mar 21, 2026
fc68ea5
fix: add camelCase serde rename to Subscription struct
adiman9 Mar 21, 2026
1d62917
fix: make snapshot_limit deterministic by sorting before truncation
adiman9 Mar 21, 2026
12f8f75
fix: apply snapshot_limit after key filter in websocket subscriptions
adiman9 Mar 21, 2026
2133805
refactor: remove unused limit parameter from get_all
adiman9 Mar 21, 2026
6abba2a
fix: sort entities by _seq before applying snapshot_limit
adiman9 Mar 21, 2026
522d7ae
fix: add camelCase serde rename to Subscription struct
adiman9 Mar 21, 2026
6f4a4d3
fix: correct snapshot ordering when using cursor with limit
adiman9 Mar 21, 2026
3808e59
fix: guard setIsLoading(true) when withSnapshot is false
adiman9 Mar 21, 2026
0782e66
fix: add truncate after sorting by _seq to respect snapshot_limit
adiman9 Mar 21, 2026
4935c7a
docs: clarify after and snapshot_limit ignored for State mode
adiman9 Mar 21, 2026
18ec45d
fix: guard refresh() isLoading behind withSnapshot check
adiman9 Mar 21, 2026
7b2c06c
fix: Fix _seq numeric comparison and missing borrow_and_update in cac…
adiman9 Mar 21, 2026
b8a3692
fix: prevent isLoading stuck when withSnapshot is false in view hooks
adiman9 Mar 21, 2026
81adab0
docs: document that after cursor is unsupported for derived views
adiman9 Mar 21, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions rust/hyperstack-sdk/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ pub enum ConnectionCommand {
Disconnect,
}

/// Options for subscribing to a view with specific parameters
#[derive(Debug, Clone, Default)]
pub struct SubscriptionOptions {
pub take: Option<u32>,
pub skip: Option<u32>,
pub with_snapshot: Option<bool>,
pub after: Option<String>,
pub snapshot_limit: Option<usize>,
}

struct ConnectionManagerInner {
#[allow(dead_code)]
url: String,
Expand Down Expand Up @@ -63,24 +73,26 @@ impl ConnectionManager {
}

pub async fn ensure_subscription(&self, view: &str, key: Option<&str>) {
self.ensure_subscription_with_opts(view, key, None, None)
self.ensure_subscription_with_opts(view, key, SubscriptionOptions::default())
.await
}

pub async fn ensure_subscription_with_opts(
&self,
view: &str,
key: Option<&str>,
take: Option<u32>,
skip: Option<u32>,
opts: SubscriptionOptions,
) {
let sub = Subscription {
view: view.to_string(),
key: key.map(|s| s.to_string()),
partition: None,
filters: None,
take,
skip,
take: opts.take,
skip: opts.skip,
with_snapshot: opts.with_snapshot,
after: opts.after,
snapshot_limit: opts.snapshot_limit,
};

if !self.inner.subscriptions.read().await.contains(&sub) {
Expand Down Expand Up @@ -196,6 +208,9 @@ fn spawn_connection_loop(
filters: None,
take: None,
skip: None,
with_snapshot: None,
after: None,
snapshot_limit: None,
};
subscriptions.write().await.remove(&sub);
let client_msg = ClientMessage::Unsubscribe(unsub);
Expand Down
3 changes: 3 additions & 0 deletions rust/hyperstack-sdk/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ pub struct Frame {
pub data: serde_json::Value,
#[serde(default)]
pub append: Vec<String>,
/// Sequence cursor for ordering and resume capability
#[serde(skip_serializing_if = "Option::is_none")]
pub seq: Option<String>,
}

impl Frame {
Expand Down
74 changes: 70 additions & 4 deletions rust/hyperstack-sdk/src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::connection::ConnectionManager;
use crate::connection::{ConnectionManager, SubscriptionOptions};
use crate::frame::Operation;
use crate::store::{SharedStore, StoreUpdate};
use futures_util::Stream;
Expand Down Expand Up @@ -179,6 +179,9 @@ enum EntityStreamState<T> {
subscription_key: Option<String>,
take: Option<u32>,
skip: Option<u32>,
with_snapshot: Option<bool>,
after: Option<String>,
snapshot_limit: Option<usize>,
},
Active {
inner: BroadcastStream<StoreUpdate>,
Expand Down Expand Up @@ -246,6 +249,9 @@ impl<T: DeserializeOwned + Clone + Send + 'static> EntityStream<T> {
subscription_key,
None,
None,
None,
None,
None,
)
}

Expand All @@ -259,6 +265,9 @@ impl<T: DeserializeOwned + Clone + Send + 'static> EntityStream<T> {
subscription_key: Option<String>,
take: Option<u32>,
skip: Option<u32>,
with_snapshot: Option<bool>,
after: Option<String>,
snapshot_limit: Option<usize>,
) -> Self {
Self {
state: EntityStreamState::Lazy {
Expand All @@ -268,6 +277,9 @@ impl<T: DeserializeOwned + Clone + Send + 'static> EntityStream<T> {
subscription_key,
take,
skip,
with_snapshot,
after,
snapshot_limit,
},
view: entity_name,
key_filter,
Expand Down Expand Up @@ -313,6 +325,9 @@ impl<T: DeserializeOwned + Clone + Send + Unpin + 'static> Stream for EntityStre
subscription_key,
take,
skip,
with_snapshot,
after,
snapshot_limit,
} = std::mem::replace(&mut this.state, EntityStreamState::Invalid)
else {
unreachable!()
Expand All @@ -326,7 +341,14 @@ impl<T: DeserializeOwned + Clone + Send + Unpin + 'static> Stream for EntityStre
let view = subscription_view.clone();
let key = subscription_key.clone();
let fut = Box::pin(async move {
conn.ensure_subscription_with_opts(&view, key.as_deref(), take, skip)
let opts = SubscriptionOptions {
take,
skip,
with_snapshot,
after,
snapshot_limit,
};
conn.ensure_subscription_with_opts(&view, key.as_deref(), opts)
.await;
});

Expand Down Expand Up @@ -429,6 +451,9 @@ enum RichEntityStreamState<T> {
subscription_key: Option<String>,
take: Option<u32>,
skip: Option<u32>,
with_snapshot: Option<bool>,
after: Option<String>,
snapshot_limit: Option<usize>,
},
Active {
inner: BroadcastStream<StoreUpdate>,
Expand Down Expand Up @@ -481,6 +506,9 @@ impl<T: DeserializeOwned + Clone + Send + 'static> RichEntityStream<T> {
subscription_key,
None,
None,
None,
None,
None,
)
}

Expand All @@ -494,6 +522,9 @@ impl<T: DeserializeOwned + Clone + Send + 'static> RichEntityStream<T> {
subscription_key: Option<String>,
take: Option<u32>,
skip: Option<u32>,
with_snapshot: Option<bool>,
after: Option<String>,
snapshot_limit: Option<usize>,
) -> Self {
Self {
state: RichEntityStreamState::Lazy {
Expand All @@ -503,6 +534,9 @@ impl<T: DeserializeOwned + Clone + Send + 'static> RichEntityStream<T> {
subscription_key,
take,
skip,
with_snapshot,
after,
snapshot_limit,
},
view: entity_name,
key_filter,
Expand All @@ -527,6 +561,9 @@ impl<T: DeserializeOwned + Clone + Send + Unpin + 'static> Stream for RichEntity
subscription_key,
take,
skip,
with_snapshot,
after,
snapshot_limit,
} = std::mem::replace(&mut this.state, RichEntityStreamState::Invalid)
else {
unreachable!()
Expand All @@ -540,7 +577,14 @@ impl<T: DeserializeOwned + Clone + Send + Unpin + 'static> Stream for RichEntity
let view = subscription_view.clone();
let key = subscription_key.clone();
let fut = Box::pin(async move {
conn.ensure_subscription_with_opts(&view, key.as_deref(), take, skip)
let opts = SubscriptionOptions {
take,
skip,
with_snapshot,
after,
snapshot_limit,
};
conn.ensure_subscription_with_opts(&view, key.as_deref(), opts)
.await;
});

Expand Down Expand Up @@ -890,6 +934,9 @@ enum UseStreamState<T> {
subscription_key: Option<String>,
take: Option<u32>,
skip: Option<u32>,
with_snapshot: Option<bool>,
after: Option<String>,
snapshot_limit: Option<usize>,
},
Active {
inner: BroadcastStream<StoreUpdate>,
Expand Down Expand Up @@ -942,6 +989,9 @@ impl<T: DeserializeOwned + Clone + Send + 'static> UseStream<T> {
subscription_key,
None,
None,
None,
None,
None,
)
}

Expand All @@ -955,6 +1005,9 @@ impl<T: DeserializeOwned + Clone + Send + 'static> UseStream<T> {
subscription_key: Option<String>,
take: Option<u32>,
skip: Option<u32>,
with_snapshot: Option<bool>,
after: Option<String>,
snapshot_limit: Option<usize>,
) -> Self {
Self {
state: UseStreamState::Lazy {
Expand All @@ -964,6 +1017,9 @@ impl<T: DeserializeOwned + Clone + Send + 'static> UseStream<T> {
subscription_key,
take,
skip,
with_snapshot,
after,
snapshot_limit,
},
view: entity_name,
key_filter,
Expand Down Expand Up @@ -1012,6 +1068,9 @@ impl<T: DeserializeOwned + Clone + Send + Unpin + 'static> Stream for UseStream<
subscription_key,
take,
skip,
with_snapshot,
after,
snapshot_limit,
} = std::mem::replace(&mut this.state, UseStreamState::Invalid)
else {
unreachable!()
Expand All @@ -1024,7 +1083,14 @@ impl<T: DeserializeOwned + Clone + Send + Unpin + 'static> Stream for UseStream<
let view = subscription_view.clone();
let key = subscription_key.clone();
let fut = Box::pin(async move {
conn.ensure_subscription_with_opts(&view, key.as_deref(), take, skip)
let opts = SubscriptionOptions {
take,
skip,
with_snapshot,
after,
snapshot_limit,
};
conn.ensure_subscription_with_opts(&view, key.as_deref(), opts)
.await;
});

Expand Down
31 changes: 31 additions & 0 deletions rust/hyperstack-sdk/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub enum ClientMessage {
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct Subscription {
pub view: String,
#[serde(skip_serializing_if = "Option::is_none")]
Expand All @@ -22,6 +23,15 @@ pub struct Subscription {
pub take: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub skip: Option<u32>,
/// Whether to include initial snapshot (defaults to true for backward compatibility)
#[serde(skip_serializing_if = "Option::is_none")]
pub with_snapshot: Option<bool>,
/// Cursor for resuming from a specific point (_seq value)
#[serde(skip_serializing_if = "Option::is_none")]
pub after: Option<String>,
/// Maximum number of entities to include in snapshot (pagination hint)
#[serde(skip_serializing_if = "Option::is_none")]
pub snapshot_limit: Option<usize>,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
Expand Down Expand Up @@ -67,6 +77,9 @@ impl Subscription {
filters: None,
take: None,
skip: None,
with_snapshot: None,
after: None,
snapshot_limit: None,
}
}

Expand All @@ -90,6 +103,24 @@ impl Subscription {
self
}

/// Set whether to include the initial snapshot (defaults to true)
pub fn with_snapshot(mut self, with_snapshot: bool) -> Self {
self.with_snapshot = Some(with_snapshot);
self
}

/// Set the cursor to resume from (for reconnecting and getting only newer data)
pub fn after(mut self, cursor: impl Into<String>) -> Self {
self.after = Some(cursor.into());
self
}

/// Set the maximum number of entities to include in the snapshot
pub fn with_snapshot_limit(mut self, limit: usize) -> Self {
self.snapshot_limit = Some(limit);
self
}

pub fn sub_key(&self) -> String {
let filters_str = self
.filters
Expand Down
Loading
Loading