diff --git a/src-tauri/src/commands/mls.rs b/src-tauri/src/commands/mls.rs
index 674768a..3326624 100644
--- a/src-tauri/src/commands/mls.rs
+++ b/src-tauri/src/commands/mls.rs
@@ -653,8 +653,7 @@ pub async fn invite_member_to_group(
         .await
         .map_err(|e| format!("Task join error: {}", e))??;

-    // Sync participants array after adding members
-    sync_mls_group_participants(group_id).await?;
+    // Participant sync happens inside the background task after merge completes

     Ok(())
 }
@@ -681,8 +680,7 @@ pub async fn remove_mls_member_device(
         .await
         .map_err(|e| format!("Task join error: {}", e))??;

-    // Sync participants array after removing member
-    sync_mls_group_participants(group_id).await?;
+    // Participant sync happens inside the background task after merge completes

     Ok(())
 }
diff --git a/src-tauri/src/mls/mod.rs b/src-tauri/src/mls/mod.rs
index 850258f..6bcfc8c 100644
--- a/src-tauri/src/mls/mod.rs
+++ b/src-tauri/src/mls/mod.rs
@@ -43,6 +43,51 @@ pub fn get_group_sync_lock(group_id: &str) -> Arc<Mutex<()>> {
         .clone()
 }

+/// Publish a nostr event to TRUSTED_RELAYS with retries and exponential backoff.
+///
+/// Matches the retry pattern used in pika core's `publish_evolution_event`:
+/// 5 attempts, 250ms base backoff, retries on NIP-42 auth/protected errors.
+/// Returns `Ok(())` when at least one relay confirms, `Err` after exhausting retries.
+async fn publish_event_with_retries(
+    client: &nostr_sdk::Client,
+    event: &nostr_sdk::Event,
+) -> Result<(), String> {
+    use std::time::Duration;
+
+    let mut last_err: Option<String> = None;
+    for attempt in 0..5u8 {
+        match client
+            .send_event_to(TRUSTED_RELAYS.iter().copied(), event)
+            .await
+        {
+            Ok(output) if !output.success.is_empty() => {
+                return Ok(());
+            }
+            Ok(output) => {
+                let errors: Vec<&str> = output.failed.values().map(|s| s.as_str()).collect();
+                let summary = if errors.is_empty() {
+                    "no relay accepted event".to_string()
+                } else {
+                    errors.join("; ")
+                };
+                let any_retryable = errors.iter().any(|e| {
+                    e.contains("protected") || e.contains("auth") || e.contains("AUTH")
+                });
+                last_err = Some(summary);
+                if !any_retryable {
+                    break;
+                }
+            }
+            Err(e) => {
+                last_err = Some(e.to_string());
+            }
+        }
+        let delay_ms = 250u64.saturating_mul(1u64 << attempt);
+        tokio::time::sleep(Duration::from_millis(delay_ms)).await;
+    }
+    Err(last_err.unwrap_or_else(|| "unknown error".to_string()))
+}
+
 /// Main MLS service facade
 ///
 /// Responsibilities:
@@ -494,9 +539,11 @@ impl MlsService {
     ///
     /// This will:
     /// 1. Fetch the device's keypackage from the network
-    /// 2. Add the device to the group via nostr-mls
-    /// 3. Send the welcome message
-    /// 4. Update group metadata
+    /// 2. Create the add-member commit via MDK (does not merge yet)
+    /// 3. Return immediately — relay publish, merge, welcome, and metadata
+    ///    update happen in a background task (MIP-02 / MIP-03 ordering)
+    ///
+    /// Background ordering: relay confirm → merge_pending_commit → send welcomes → UI update
     pub async fn add_member_device(
         &self,
         group_id: &str,
@@ -575,12 +622,11 @@ impl MlsService {
         // Convert engine_group_id hex to GroupId
         let mls_group_id = GroupId::from_slice(&hex_string_to_bytes(&group_meta.engine_group_id));

-        // Perform engine operations: add member and merge commit BEFORE publishing
-        // This ensures our local state is correct before announcing to the network
+        // Create the commit but do NOT merge yet — merge only after relay confirmation
+        // (MIP-02: commit must be on relay before welcome; MIP-03: relay ack before local merge)
         let (evolution_event, welcome_rumors) = {
             let engine = self.engine()?;

-            // Add member to group - returns AddMembersResult with evolution_event and welcome_rumors
             let add_result = engine
                 .add_members(&mls_group_id, std::slice::from_ref(&kp_event))
                 .map_err(|e| {
@@ -588,62 +634,104 @@ impl MlsService {
                     MlsError::NostrMlsError(format!("Failed to add member: {}", e))
                 })?;

-            // CRITICAL: Merge the pending commit immediately after creating it
-            // This ensures our local state is correct BEFORE publishing to the network
-            // If we publish first and merge fails, remote and local state will desync
-            engine
-                .merge_pending_commit(&mls_group_id)
-                .map_err(|e| {
-                    eprintln!("[MLS] Failed to merge commit: {}", e);
-                    MlsError::NostrMlsError(format!("Failed to merge commit: {}", e))
-                })?;
-
             (add_result.evolution_event, add_result.welcome_rumors)
         };

-        // Publish evolution event (commit) in the background — local state is already updated
-        let group_id_clone = group_id.to_string();
+        // Spawn background task: relay publish → merge → welcomes → UI update.
+        // The Tauri command returns immediately so the frontend isn't blocked.
+        let db_path = self.db_path.clone();
+        let group_id_owned = group_id.to_string();
+        let engine_group_id = group_meta.engine_group_id.clone();
         tokio::spawn(async move {
             let client = NOSTR_CLIENT.get().unwrap();
-            match client.send_event(&evolution_event).await {
-                Ok(_) => {
+
+            // Hold per-group lock for the entire publish → merge → welcome flow
+            let group_lock = get_group_sync_lock(&group_id_owned);
+            let _guard = group_lock.lock().await;
+
+            // 1. Publish evolution event with retries
+            if let Err(e) = publish_event_with_retries(client, &evolution_event).await {
+                eprintln!("[MLS] Failed to publish commit after retries: {}", e);
+                if let Some(handle) = TAURI_APP.get() {
+                    handle.emit("mls_error", serde_json::json!({
+                        "group_id": group_id_owned,
+                        "error": format!("Failed to publish invite: {}", e)
+                    })).ok();
+                }
+                // Don't merge — MIP-03 requires relay ack before local epoch advance.
+                // Pending commit remains; next sync or retry can recover.
+                return;
+            }
+
+            // 2. Merge pending commit now that relay confirmed
+            let mls_group_id = GroupId::from_slice(&hex_string_to_bytes(&engine_group_id));
+            let storage = match MdkSqliteStorage::new_unencrypted(&db_path) {
+                Ok(s) => s,
+                Err(e) => {
+                    eprintln!("[MLS] Failed to open storage for merge: {}", e);
                     if let Some(handle) = TAURI_APP.get() {
-                        let _ = track_mls_event_processed(handle, &evolution_event.id.to_hex(), &group_id_clone, evolution_event.created_at.as_secs());
+                        handle.emit("mls_error", serde_json::json!({
+                            "group_id": group_id_owned,
+                            "error": format!("Failed to open storage for merge: {}", e)
+                        })).ok();
                     }
+                    return;
                 }
-                Err(e) => eprintln!("[MLS] Failed to publish commit: {}", e),
+            };
+            let engine = MDK::new(storage);
+            if let Err(e) = engine.merge_pending_commit(&mls_group_id) {
+                eprintln!("[MLS] Failed to merge commit after relay confirm: {}", e);
+                if let Some(handle) = TAURI_APP.get() {
+                    handle.emit("mls_error", serde_json::json!({
+                        "group_id": group_id_owned,
+                        "error": format!("Failed to merge commit: {}", e)
+                    })).ok();
+                }
+                return;
             }
-        });

-        // Send welcome messages to the new member (concurrently)
-        if let Some(welcome_rumors) = welcome_rumors {
-            let futs: Vec<_> = welcome_rumors
-                .into_iter()
-                .map(|welcome| async {
-                    if let Err(e) = client.gift_wrap_to(TRUSTED_RELAYS.iter().copied(), &member_pk, welcome, []).await {
-                        eprintln!("[MLS] Failed to send welcome: {}", e);
-                    }
-                })
-                .collect();
-            futures_util::future::join_all(futs).await;
-        }
+            // Track the event as processed only after merge succeeds
+            if let Some(handle) = TAURI_APP.get() {
+                let _ = track_mls_event_processed(
+                    handle,
+                    &evolution_event.id.to_hex(),
+                    &group_id_owned,
+                    evolution_event.created_at.as_secs(),
+                );
+            }

-        // Update group metadata timestamp
-        let mut groups = self.read_groups().await?;
-        if let Some(group) = groups.iter_mut().find(|g| g.group_id == group_id) {
-            group.updated_at = std::time::SystemTime::now()
-                .duration_since(std::time::UNIX_EPOCH)
-                .unwrap()
-                .as_secs();
-            self.write_groups(&groups).await?;
-        }
+            // 3. Send welcome messages (only after commit is on relay and merged)
+            if let Some(welcome_rumors) = welcome_rumors {
+                let futs: Vec<_> = welcome_rumors
+                    .into_iter()
+                    .map(|welcome| async move {
+                        if let Err(e) = client.gift_wrap_to(TRUSTED_RELAYS.iter().copied(), &member_pk, welcome, []).await {
+                            eprintln!("[MLS] Failed to send welcome: {}", e);
+                        }
+                    })
+                    .collect();
+                futures_util::future::join_all(futs).await;
+            }

-        // Emit event to refresh UI
-        if let Some(handle) = TAURI_APP.get() {
-            handle.emit("mls_group_updated", serde_json::json!({
-                "group_id": group_id
-            })).ok();
-        }
+            // 4. Sync participants + update metadata + emit UI refresh
+            if let Err(e) = crate::commands::mls::sync_mls_group_participants(group_id_owned.clone()).await {
+                eprintln!("[MLS] Failed to sync participants after add: {}", e);
+            }
+            if let Some(handle) = TAURI_APP.get() {
+                if let Ok(mut groups) = crate::db::load_mls_groups(handle).await {
+                    if let Some(group) = groups.iter_mut().find(|g| g.group_id == group_id_owned) {
+                        group.updated_at = std::time::SystemTime::now()
+                            .duration_since(std::time::UNIX_EPOCH)
+                            .unwrap()
+                            .as_secs();
+                        let _ = crate::db::save_mls_groups(handle.clone(), &groups).await;
+                    }
+                }
+                handle.emit("mls_group_updated", serde_json::json!({
+                    "group_id": group_id_owned
+                })).ok();
+            }
+        });

         Ok(())
     }
@@ -653,11 +741,11 @@ impl MlsService {
     ///
     /// This will:
     /// 1. Fetch all members' keypackages from the network
-    /// 2. Add all members to the group via a single engine.add_members() call
-    /// 3. Merge pending commit once
-    /// 4. Publish evolution event
-    /// 5. Send all welcome messages concurrently
-    /// 6. Update group metadata
+    /// 2. Create the add-members commit via MDK (does not merge yet)
+    /// 3. Return immediately — relay publish, merge, welcomes, and metadata
+    ///    update happen in a background task (MIP-02 / MIP-03 ordering)
+    ///
+    /// Background ordering: relay confirm → merge_pending_commit → send welcomes → UI update
     pub async fn add_member_devices(
         &self,
         group_id: &str,
@@ -737,7 +825,8 @@ impl MlsService {
             .ok_or(MlsError::GroupNotFound)?;
         let mls_group_id = GroupId::from_slice(&hex_string_to_bytes(&group_meta.engine_group_id));

-        // Add all members in a single engine call
+        // Create the commit but do NOT merge yet — merge only after relay confirmation
+        // (MIP-02: commit must be on relay before welcome; MIP-03: relay ack before local merge)
         let (evolution_event, welcome_rumors) = {
             let engine = self.engine()?;

@@ -748,91 +837,136 @@ impl MlsService {
                     MlsError::NostrMlsError(format!("Failed to add members: {}", e))
                 })?;

-            engine
-                .merge_pending_commit(&mls_group_id)
-                .map_err(|e| {
-                    eprintln!("[MLS] Failed to merge commit: {}", e);
-                    MlsError::NostrMlsError(format!("Failed to merge commit: {}", e))
-                })?;
-
             (add_result.evolution_event, add_result.welcome_rumors)
         };

-        // Publish evolution event in the background
-        let group_id_clone = group_id.to_string();
+        // Spawn background task: relay publish → merge → welcomes → UI update.
+        // The Tauri command returns immediately so the frontend isn't blocked.
+        let db_path = self.db_path.clone();
+        let group_id_owned = group_id.to_string();
+        let engine_group_id = group_meta.engine_group_id.clone();
         tokio::spawn(async move {
             let client = NOSTR_CLIENT.get().unwrap();
-            match client.send_event(&evolution_event).await {
-                Ok(_) => {
+
+            // Hold per-group lock for the entire publish → merge → welcome flow
+            let group_lock = get_group_sync_lock(&group_id_owned);
+            let _guard = group_lock.lock().await;
+
+            // 1. Publish evolution event with retries
+            if let Err(e) = publish_event_with_retries(client, &evolution_event).await {
+                eprintln!("[MLS] Failed to publish commit after retries: {}", e);
+                if let Some(handle) = TAURI_APP.get() {
+                    handle.emit("mls_error", serde_json::json!({
+                        "group_id": group_id_owned,
+                        "error": format!("Failed to publish invite: {}", e)
+                    })).ok();
+                }
+                // Don't merge — MIP-03 requires relay ack before local epoch advance.
+                // Pending commit remains; next sync or retry can recover.
+                return;
+            }
+
+            // 2. Merge pending commit now that relay confirmed
+            let mls_group_id = GroupId::from_slice(&hex_string_to_bytes(&engine_group_id));
+            let storage = match MdkSqliteStorage::new_unencrypted(&db_path) {
+                Ok(s) => s,
+                Err(e) => {
+                    eprintln!("[MLS] Failed to open storage for merge: {}", e);
                     if let Some(handle) = TAURI_APP.get() {
-                        let _ = track_mls_event_processed(handle, &evolution_event.id.to_hex(), &group_id_clone, evolution_event.created_at.as_secs());
+                        handle.emit("mls_error", serde_json::json!({
+                            "group_id": group_id_owned,
+                            "error": format!("Failed to open storage for merge: {}", e)
+                        })).ok();
                     }
+                    return;
+                }
+            };
+            let engine = MDK::new(storage);
+            if let Err(e) = engine.merge_pending_commit(&mls_group_id) {
+                eprintln!("[MLS] Failed to merge commit after relay confirm: {}", e);
+                if let Some(handle) = TAURI_APP.get() {
+                    handle.emit("mls_error", serde_json::json!({
+                        "group_id": group_id_owned,
+                        "error": format!("Failed to merge commit: {}", e)
+                    })).ok();
                 }
-                Err(e) => eprintln!("[MLS] Failed to publish commit: {}", e),
+                return;
             }
-        });

-        // Send welcome messages concurrently, pairing each welcome with its recipient
-        if let Some(welcome_rumors) = welcome_rumors {
-            let invited_count = invited_recipients.len();
-            if welcome_rumors.len() != invited_count {
-                eprintln!(
-                    "[MLS] welcome/member count mismatch: welcomes={}, invited={}",
-                    welcome_rumors.len(),
-                    invited_count
+            // Track the event as processed only after merge succeeds
+            if let Some(handle) = TAURI_APP.get() {
+                let _ = track_mls_event_processed(
+                    handle,
+                    &evolution_event.id.to_hex(),
+                    &group_id_owned,
+                    evolution_event.created_at.as_secs(),
                 );
             }
-            let min_len = std::cmp::min(welcome_rumors.len(), invited_recipients.len());
-            let futs: Vec<_> = (0..min_len)
-                .map(|i| {
-                    let welcome = welcome_rumors[i].clone();
-                    let target = invited_recipients[i];
-                    async move {
-                        match client
-                            .gift_wrap_to(TRUSTED_RELAYS.iter().copied(), &target, welcome, [])
-                            .await
-                        {
-                            Ok(wrapper_id) => {
-                                let recipient = target.to_bech32().unwrap_or_default();
-                                println!(
-                                    "[MLS][welcome][published] wrapper_id={}, recipient={}, relays={:?}",
-                                    wrapper_id.to_hex(),
-                                    recipient,
-                                    TRUSTED_RELAYS
-                                );
-                            }
-                            Err(e) => {
-                                let recipient = target.to_bech32().unwrap_or_default();
-                                eprintln!(
-                                    "[MLS][welcome][publish_error] recipient={}, relays={:?}, err={}",
-                                    recipient,
-                                    TRUSTED_RELAYS,
-                                    e
-                                );
+
+            // 3. Send welcome messages concurrently, pairing each welcome with its recipient
+            if let Some(welcome_rumors) = welcome_rumors {
+                let invited_count = invited_recipients.len();
+                if welcome_rumors.len() != invited_count {
+                    eprintln!(
+                        "[MLS] welcome/member count mismatch: welcomes={}, invited={}",
+                        welcome_rumors.len(),
+                        invited_count
+                    );
+                }
+                let min_len = std::cmp::min(welcome_rumors.len(), invited_recipients.len());
+                let futs: Vec<_> = (0..min_len)
+                    .map(|i| {
+                        let welcome = welcome_rumors[i].clone();
+                        let target = invited_recipients[i];
+                        async move {
+                            match client
+                                .gift_wrap_to(TRUSTED_RELAYS.iter().copied(), &target, welcome, [])
+                                .await
+                            {
+                                Ok(wrapper_id) => {
+                                    let recipient = target.to_bech32().unwrap_or_default();
+                                    println!(
+                                        "[MLS][welcome][published] wrapper_id={}, recipient={}, relays={:?}",
+                                        wrapper_id.to_hex(),
+                                        recipient,
+                                        TRUSTED_RELAYS
+                                    );
+                                }
+                                Err(e) => {
+                                    let recipient = target.to_bech32().unwrap_or_default();
+                                    eprintln!(
+                                        "[MLS][welcome][publish_error] recipient={}, relays={:?}, err={}",
+                                        recipient,
+                                        TRUSTED_RELAYS,
+                                        e
+                                    );
+                                }
                             }
                         }
-                    }
-                })
-                .collect();
-            futures_util::future::join_all(futs).await;
-        }
-
-        // Update group metadata timestamp
-        let mut groups = self.read_groups().await?;
-        if let Some(group) = groups.iter_mut().find(|g| g.group_id == group_id) {
-            group.updated_at = std::time::SystemTime::now()
-                .duration_since(std::time::UNIX_EPOCH)
-                .unwrap()
-                .as_secs();
-            self.write_groups(&groups).await?;
-        }
+                    })
+                    .collect();
+                futures_util::future::join_all(futs).await;
+            }

-        // Emit event to refresh UI
-        if let Some(handle) = TAURI_APP.get() {
-            handle.emit("mls_group_updated", serde_json::json!({
-                "group_id": group_id
-            })).ok();
-        }
+            // 4. Sync participants + update metadata + emit UI refresh
+            if let Err(e) = crate::commands::mls::sync_mls_group_participants(group_id_owned.clone()).await {
+                eprintln!("[MLS] Failed to sync participants after add: {}", e);
+            }
+            if let Some(handle) = TAURI_APP.get() {
+                if let Ok(mut groups) = crate::db::load_mls_groups(handle).await {
+                    if let Some(group) = groups.iter_mut().find(|g| g.group_id == group_id_owned) {
+                        group.updated_at = std::time::SystemTime::now()
+                            .duration_since(std::time::UNIX_EPOCH)
+                            .unwrap()
+                            .as_secs();
+                        let _ = crate::db::save_mls_groups(handle.clone(), &groups).await;
+                    }
+                }
+                handle.emit("mls_group_updated", serde_json::json!({
+                    "group_id": group_id_owned
+                })).ok();
+            }
+        });

         Ok(())
     }
@@ -939,10 +1073,11 @@ impl MlsService {
     /// Remove a member device from a group (admin only)
     ///
     /// This will:
-    /// 1. Remove the member using MDK's remove_members()
-    /// 2. Publish the commit message to remaining group members
-    /// 3. Merge the pending commit locally
-    /// 4. Emit UI update event
+    /// 1. Remove the member using MDK's remove_members() (does not merge yet)
+    /// 2. Return immediately — relay publish, merge, and UI update happen in
+    ///    a background task (MIP-03 ordering)
+    ///
+    /// Background ordering: relay confirm → merge_pending_commit → UI update
     pub async fn remove_member_device(
         &self,
         group_id: &str,
@@ -967,8 +1102,9 @@ impl MlsService {
         // Convert engine_group_id hex to GroupId
         let mls_group_id = GroupId::from_slice(&hex_string_to_bytes(&group_meta.engine_group_id));

-        // Perform engine operation: remove member and merge commit BEFORE publishing
-        // This ensures our local state is correct before announcing to the network
+        // Create the commit but do NOT merge yet — merge only after relay confirmation
+        // (MIP-03: relay ack before local merge)
+        //
         // Note: We intentionally do NOT sync before removal. Syncing can re-process
         // our own commits from the relay, which may corrupt the tree state after
         // multiple kick/re-invite cycles. A fresh engine reads the latest SQLite state.
@@ -989,7 +1125,6 @@ impl MlsService {
                 ));
             }

-        // Remove member from group - returns RemoveMembersResult with evolution_event
         let remove_result = engine
             .remove_members(&mls_group_id, &[member_pk])
             .map_err(|e| {
@@ -997,39 +1132,83 @@ impl MlsService {
                 MlsError::NostrMlsError(format!("Failed to remove member: {}", e))
             })?;

-            // CRITICAL: Merge the pending commit immediately after creating it
-            // This ensures our local state is correct BEFORE publishing to the network
-            // If we publish first and merge fails, remote and local state will desync
-            engine
-                .merge_pending_commit(&mls_group_id)
-                .map_err(|e| {
-                    eprintln!("[MLS] Failed to merge commit: {}", e);
-                    MlsError::NostrMlsError(format!("Failed to merge commit: {}", e))
-                })?;
-
             remove_result.evolution_event
         };

-        // Publish evolution event (commit) in the background — local state is already updated
-        let group_id_clone = group_id.to_string();
+        // Spawn background task: relay publish → merge → UI update.
+        // The Tauri command returns immediately so the frontend isn't blocked.
+        let db_path = self.db_path.clone();
+        let group_id_owned = group_id.to_string();
+        let engine_group_id = group_meta.engine_group_id.clone();
         tokio::spawn(async move {
             let client = NOSTR_CLIENT.get().unwrap();
-            match client.send_event(&evolution_event).await {
-                Ok(_) => {
+
+            // Hold per-group lock for the entire publish → merge flow
+            let group_lock = get_group_sync_lock(&group_id_owned);
+            let _guard = group_lock.lock().await;
+
+            // 1. Publish evolution event with retries
+            if let Err(e) = publish_event_with_retries(client, &evolution_event).await {
+                eprintln!("[MLS] Failed to publish remove commit after retries: {}", e);
+                if let Some(handle) = TAURI_APP.get() {
+                    handle.emit("mls_error", serde_json::json!({
+                        "group_id": group_id_owned,
+                        "error": format!("Failed to publish remove commit: {}", e)
+                    })).ok();
+                }
+                // Don't merge — MIP-03 requires relay ack before local epoch advance.
+                // Pending commit remains; next sync or retry can recover.
+                return;
+            }
+
+            // 2. Merge pending commit now that relay confirmed
+            let mls_group_id = GroupId::from_slice(&hex_string_to_bytes(&engine_group_id));
+            let storage = match MdkSqliteStorage::new_unencrypted(&db_path) {
+                Ok(s) => s,
+                Err(e) => {
+                    eprintln!("[MLS] Failed to open storage for merge: {}", e);
                     if let Some(handle) = TAURI_APP.get() {
-                        let _ = track_mls_event_processed(handle, &evolution_event.id.to_hex(), &group_id_clone, evolution_event.created_at.as_secs());
+                        handle.emit("mls_error", serde_json::json!({
+                            "group_id": group_id_owned,
+                            "error": format!("Failed to open storage for merge: {}", e)
+                        })).ok();
                     }
+                    return;
                 }
-                Err(e) => eprintln!("[MLS] Failed to publish commit: {}", e),
+            };
+            let engine = MDK::new(storage);
+            if let Err(e) = engine.merge_pending_commit(&mls_group_id) {
+                eprintln!("[MLS] Failed to merge commit after relay confirm: {}", e);
+                if let Some(handle) = TAURI_APP.get() {
+                    handle.emit("mls_error", serde_json::json!({
+                        "group_id": group_id_owned,
+                        "error": format!("Failed to merge commit: {}", e)
+                    })).ok();
+                }
+                return;
+            }
+
+            // Track the event as processed only after merge succeeds
+            if let Some(handle) = TAURI_APP.get() {
+                let _ = track_mls_event_processed(
+                    handle,
+                    &evolution_event.id.to_hex(),
+                    &group_id_owned,
+                    evolution_event.created_at.as_secs(),
+                );
+            }
+
+            // 3. Sync participants + emit UI refresh
+            if let Err(e) = crate::commands::mls::sync_mls_group_participants(group_id_owned.clone()).await {
+                eprintln!("[MLS] Failed to sync participants after remove: {}", e);
+            }
+            if let Some(handle) = TAURI_APP.get() {
+                handle.emit("mls_group_updated", serde_json::json!({
+                    "group_id": group_id_owned
+                })).ok();
             }
         });

-        // Emit event to refresh UI member list
-        if let Some(handle) = TAURI_APP.get() {
-            handle.emit("mls_group_updated", serde_json::json!({
-                "group_id": group_id
-            })).ok();
-        }

         Ok(())
     }
diff --git a/src/main.js b/src/main.js
index 1f11e36..ebb5579 100644
--- a/src/main.js
+++ b/src/main.js
@@ -5112,7 +5112,14 @@ async function setupRustListeners() {
       console.error('Error handling mls_group_left event:', e);
     }
   });
-
+
+  // Listen for async MLS operation failures (background publish/merge errors)
+  _on('mls_error', (evt) => {
+    const { group_id, error } = evt.payload || {};
+    console.error('[MLS] Background operation failed:', group_id, error);
+    showToast(error || 'Group operation failed');
+  });
+
   // Listen for MLS initial sync completion after joining a group
   _on('mls_group_initial_sync', async (evt) => {
     try {
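---

Note for reviewers: all three flows above (add one member, add many, remove) funnel through the same publish-then-merge sequence. Below is a condensed sketch of that sequence, reusing names from the diff (NOSTR_CLIENT, get_group_sync_lock, publish_event_with_retries, MdkSqliteStorage, MDK, GroupId, hex_string_to_bytes); the function itself is illustrative only and is not part of the patch:

    // Illustrative only: the MIP-03 ordering the background tasks above enforce.
    // Assumes the same crate context as src-tauri/src/mls/mod.rs.
    async fn publish_then_merge(
        group_id: &str,
        engine_group_id: &str,
        evolution_event: nostr_sdk::Event,
        db_path: std::path::PathBuf,
    ) -> Result<(), String> {
        let client = NOSTR_CLIENT.get().unwrap();

        // Serialize against any concurrent sync for this group.
        let lock = get_group_sync_lock(group_id);
        let _guard = lock.lock().await;

        // MIP-03: a trusted relay must ack the commit before the local epoch
        // advances. On failure the pending commit is left in place for a retry.
        publish_event_with_retries(client, &evolution_event).await?;

        // Only after the ack: merge the pending commit into local MLS state.
        let storage = MdkSqliteStorage::new_unencrypted(&db_path).map_err(|e| e.to_string())?;
        let engine = MDK::new(storage);
        let mls_group_id = GroupId::from_slice(&hex_string_to_bytes(engine_group_id));
        engine
            .merge_pending_commit(&mls_group_id)
            .map_err(|e| e.to_string())?;

        // MIP-02: welcomes (and the UI refresh) follow only once the commit
        // is durable on a relay and merged locally.
        Ok(())
    }

One timing consequence worth noting: with the 250 ms base, publish_event_with_retries sleeps 250 ms, 500 ms, 1 s, 2 s, and 4 s across its five attempts, so a fully exhausted retry loop holds the per-group lock for roughly eight seconds before mls_error reaches the frontend.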