Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 163 additions & 82 deletions src-tauri/src/mls/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,51 @@ pub use tracking::{is_mls_event_processed, track_mls_event_processed, cleanup_ol
use types::{has_encoding_tag, KeyPackageIndexEntry};
use tracking::wipe_legacy_mls_database;

/// Publish a nostr event to TRUSTED_RELAYS with retries and exponential backoff.
///
/// Matches the retry pattern used in pika core's `publish_evolution_event`:
/// 5 attempts, 250ms base backoff, retries on NIP-42 auth/protected errors.
/// Returns `Ok(())` when at least one relay confirms, `Err` after exhausting retries.
async fn publish_event_with_retries(
client: &nostr_sdk::Client,
event: &nostr_sdk::Event,
) -> Result<(), String> {
use std::time::Duration;

let mut last_err: Option<String> = None;
for attempt in 0..5u8 {
match client
.send_event_to(TRUSTED_RELAYS.iter().copied(), event)
.await
{
Ok(output) if !output.success.is_empty() => {
return Ok(());
}
Ok(output) => {
let errors: Vec<&str> = output.failed.values().map(|s| s.as_str()).collect();
let summary = if errors.is_empty() {
"no relay accepted event".to_string()
} else {
errors.join("; ")
};
let any_retryable = errors.iter().any(|e| {
e.contains("protected") || e.contains("auth") || e.contains("AUTH")
});
last_err = Some(summary);
if !any_retryable {
break;
}
}
Err(e) => {
last_err = Some(e.to_string());
}
}
let delay_ms = 250u64.saturating_mul(1u64 << attempt);
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
}
Err(last_err.unwrap_or_else(|| "unknown error".to_string()))
}

/// Main MLS service facade
///
/// Responsibilities:
Expand Down Expand Up @@ -478,9 +523,11 @@ impl MlsService {
///
/// This will:
/// 1. Fetch the device's keypackage from the network
/// 2. Add the device to the group via nostr-mls
/// 3. Send the welcome message
/// 4. Update group metadata
/// 2. Create the add-member commit via MDK (does not merge yet)
/// 3. Return immediately — relay publish, merge, welcome, and metadata
/// update happen in a background task (MIP-02 / MIP-03 ordering)
///
/// Background ordering: relay confirm → merge_pending_commit → send welcomes → UI update
pub async fn add_member_device(
&self,
group_id: &str,
Expand Down Expand Up @@ -559,75 +606,89 @@ impl MlsService {
// Convert engine_group_id hex to GroupId
let mls_group_id = GroupId::from_slice(&hex_string_to_bytes(&group_meta.engine_group_id));

// Perform engine operations: add member and merge commit BEFORE publishing
// This ensures our local state is correct before announcing to the network
// Create the commit but do NOT merge yet — merge only after relay confirmation
// (MIP-02: commit must be on relay before welcome; MIP-03: relay ack before local merge)
let (evolution_event, welcome_rumors) = {
let engine = self.engine()?;

// Add member to group - returns AddMembersResult with evolution_event and welcome_rumors
let add_result = engine
.add_members(&mls_group_id, std::slice::from_ref(&kp_event))
.map_err(|e| {
eprintln!("[MLS] Failed to add member: {}", e);
MlsError::NostrMlsError(format!("Failed to add member: {}", e))
})?;

// CRITICAL: Merge the pending commit immediately after creating it
// This ensures our local state is correct BEFORE publishing to the network
// If we publish first and merge fails, remote and local state will desync
engine
.merge_pending_commit(&mls_group_id)
.map_err(|e| {
eprintln!("[MLS] Failed to merge commit: {}", e);
MlsError::NostrMlsError(format!("Failed to merge commit: {}", e))
})?;

(add_result.evolution_event, add_result.welcome_rumors)
};

// Publish evolution event (commit) in the background — local state is already updated
let group_id_clone = group_id.to_string();
// Spawn background task: relay publish → merge → welcomes → UI update.
// The Tauri command returns immediately so the frontend isn't blocked.
let db_path = self.db_path.clone();
let group_id_owned = group_id.to_string();
let engine_group_id = group_meta.engine_group_id.clone();
tokio::spawn(async move {
let client = NOSTR_CLIENT.get().unwrap();
match client.send_event(&evolution_event).await {
Ok(_) => {
if let Some(handle) = TAURI_APP.get() {
let _ = track_mls_event_processed(handle, &evolution_event.id.to_hex(), &group_id_clone, evolution_event.created_at.as_secs());
}
}
Err(e) => eprintln!("[MLS] Failed to publish commit: {}", e),

// 1. Publish evolution event with retries
if let Err(e) = publish_event_with_retries(client, &evolution_event).await {
eprintln!("[MLS] Failed to publish commit after retries: {}", e);
return;
}
});

// Send welcome messages to the new member (concurrently)
if let Some(welcome_rumors) = welcome_rumors {
let futs: Vec<_> = welcome_rumors
.into_iter()
.map(|welcome| async {
if let Err(e) = client.gift_wrap_to(TRUSTED_RELAYS.iter().copied(), &member_pk, welcome, []).await {
eprintln!("[MLS] Failed to send welcome: {}", e);
}
})
.collect();
futures_util::future::join_all(futs).await;
}
// Track the published event
if let Some(handle) = TAURI_APP.get() {
let _ = track_mls_event_processed(
handle,
&evolution_event.id.to_hex(),
&group_id_owned,
evolution_event.created_at.as_secs(),
);
}

// Update group metadata timestamp
let mut groups = self.read_groups().await?;
if let Some(group) = groups.iter_mut().find(|g| g.group_id == group_id) {
group.updated_at = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
self.write_groups(&groups).await?;
}
// 2. Merge pending commit now that relay confirmed
let mls_group_id = GroupId::from_slice(&hex_string_to_bytes(&engine_group_id));
let storage = match MdkSqliteStorage::new_unencrypted(&db_path) {
Ok(s) => s,
Err(e) => {
eprintln!("[MLS] Failed to open storage for merge: {}", e);
return;
}
};
let engine = MDK::new(storage);
if let Err(e) = engine.merge_pending_commit(&mls_group_id) {
eprintln!("[MLS] Failed to merge commit after relay confirm: {}", e);
return;
}

// Emit event to refresh UI
if let Some(handle) = TAURI_APP.get() {
handle.emit("mls_group_updated", serde_json::json!({
"group_id": group_id
})).ok();
}
// 3. Send welcome messages (only after commit is on relay)
if let Some(welcome_rumors) = welcome_rumors {
let futs: Vec<_> = welcome_rumors
.into_iter()
.map(|welcome| async move {
if let Err(e) = client.gift_wrap_to(TRUSTED_RELAYS.iter().copied(), &member_pk, welcome, []).await {
eprintln!("[MLS] Failed to send welcome: {}", e);
}
})
.collect();
futures_util::future::join_all(futs).await;
}

// 4. Update group metadata timestamp and emit UI refresh
if let Some(handle) = TAURI_APP.get() {
if let Ok(mut groups) = crate::db::load_mls_groups(handle).await {
if let Some(group) = groups.iter_mut().find(|g| g.group_id == group_id_owned) {
group.updated_at = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let _ = crate::db::save_mls_groups(handle.clone(), &groups).await;
}
}
handle.emit("mls_group_updated", serde_json::json!({
"group_id": group_id_owned
})).ok();
}
});

Ok(())
}
Expand Down Expand Up @@ -735,10 +796,11 @@ impl MlsService {
/// Remove a member device from a group (admin only)
///
/// This will:
/// 1. Remove the member using MDK's remove_members()
/// 2. Publish the commit message to remaining group members
/// 3. Merge the pending commit locally
/// 4. Emit UI update event
/// 1. Remove the member using MDK's remove_members() (does not merge yet)
/// 2. Return immediately — relay publish, merge, and UI update happen in
/// a background task (MIP-03 ordering)
///
/// Background ordering: relay confirm → merge_pending_commit → UI update
pub async fn remove_member_device(
&self,
group_id: &str,
Expand All @@ -765,6 +827,9 @@ impl MlsService {

// Perform engine operation: remove member and merge commit BEFORE publishing
// This ensures our local state is correct before announcing to the network
// Create the commit but do NOT merge yet — merge only after relay confirmation
// (MIP-03: relay ack before local merge)
//
// Note: We intentionally do NOT sync before removal. Syncing can re-process
// our own commits from the relay, which may corrupt the tree state after
// multiple kick/re-invite cycles. A fresh engine reads the latest SQLite state.
Expand All @@ -785,47 +850,63 @@ impl MlsService {
));
}

// Remove member from group - returns RemoveMembersResult with evolution_event
let remove_result = engine
.remove_members(&mls_group_id, &[member_pk])
.map_err(|e| {
eprintln!("[MLS] Failed to remove member: {}", e);
MlsError::NostrMlsError(format!("Failed to remove member: {}", e))
})?;

// CRITICAL: Merge the pending commit immediately after creating it
// This ensures our local state is correct BEFORE publishing to the network
// If we publish first and merge fails, remote and local state will desync
engine
.merge_pending_commit(&mls_group_id)
.map_err(|e| {
eprintln!("[MLS] Failed to merge commit: {}", e);
MlsError::NostrMlsError(format!("Failed to merge commit: {}", e))
})?;

remove_result.evolution_event
};

// Publish evolution event (commit) in the background — local state is already updated
let group_id_clone = group_id.to_string();
// Spawn background task: relay publish → merge → UI update.
// The Tauri command returns immediately so the frontend isn't blocked.
let db_path = self.db_path.clone();
let group_id_owned = group_id.to_string();
let engine_group_id = group_meta.engine_group_id.clone();
tokio::spawn(async move {
let client = NOSTR_CLIENT.get().unwrap();
match client.send_event(&evolution_event).await {
Ok(_) => {
if let Some(handle) = TAURI_APP.get() {
let _ = track_mls_event_processed(handle, &evolution_event.id.to_hex(), &group_id_clone, evolution_event.created_at.as_secs());
}

// 1. Publish evolution event with retries
if let Err(e) = publish_event_with_retries(client, &evolution_event).await {
eprintln!("[MLS] Failed to publish remove commit after retries: {}", e);
return;
}

// Track the published event
if let Some(handle) = TAURI_APP.get() {
let _ = track_mls_event_processed(
handle,
&evolution_event.id.to_hex(),
&group_id_owned,
evolution_event.created_at.as_secs(),
);
}

// 2. Merge pending commit now that relay confirmed
let mls_group_id = GroupId::from_slice(&hex_string_to_bytes(&engine_group_id));
let storage = match MdkSqliteStorage::new_unencrypted(&db_path) {
Ok(s) => s,
Err(e) => {
eprintln!("[MLS] Failed to open storage for merge: {}", e);
return;
}
Err(e) => eprintln!("[MLS] Failed to publish commit: {}", e),
};
let engine = MDK::new(storage);
if let Err(e) = engine.merge_pending_commit(&mls_group_id) {
eprintln!("[MLS] Failed to merge commit after relay confirm: {}", e);
return;
}

// 3. Emit event to refresh UI member list
if let Some(handle) = TAURI_APP.get() {
handle.emit("mls_group_updated", serde_json::json!({
"group_id": group_id_owned
})).ok();
}
});

// Emit event to refresh UI member list
if let Some(handle) = TAURI_APP.get() {
handle.emit("mls_group_updated", serde_json::json!({
"group_id": group_id
})).ok();
}
Ok(())
}

Expand Down
Loading