diff --git a/cmd/server/main-server.go b/cmd/server/main-server.go index 4c28bb866d..0d49a78c37 100644 --- a/cmd/server/main-server.go +++ b/cmd/server/main-server.go @@ -600,7 +600,7 @@ func main() { // use fmt instead of log here to make sure it goes directly to stderr fmt.Fprintf(os.Stderr, "WAVESRV-ESTART ws:%s web:%s version:%s buildtime:%s\n", wsListener.Addr(), webListener.Addr(), WaveVersion, BuildTime) }() - go wshutil.RunWshRpcOverListener(unixListener) + go wshutil.RunWshRpcOverListener(unixListener, nil) web.RunWebServer(webListener) // blocking runtime.KeepAlive(waveLock) } diff --git a/cmd/wsh/cmd/wshcmd-connserver.go b/cmd/wsh/cmd/wshcmd-connserver.go index 9f63976115..86f6c2a923 100644 --- a/cmd/wsh/cmd/wshcmd-connserver.go +++ b/cmd/wsh/cmd/wshcmd-connserver.go @@ -97,7 +97,7 @@ func handleNewListenerConn(conn net.Conn, router *wshutil.WshRouter) { router.UnregisterLink(baseds.LinkId(linkId)) } }() - wshutil.AdaptStreamToMsgCh(conn, proxy.FromRemoteCh) + wshutil.AdaptStreamToMsgCh(conn, proxy.FromRemoteCh, nil) }() linkId := router.RegisterUntrustedLink(proxy) linkIdContainer.Store(int32(linkId)) @@ -265,7 +265,7 @@ func serverRunRouterDomainSocket(jwtToken string) error { log.Printf("upstream domain socket closed, shutting down") wshutil.DoShutdown("", 0, true) }() - wshutil.AdaptStreamToMsgCh(conn, upstreamProxy.FromRemoteCh) + wshutil.AdaptStreamToMsgCh(conn, upstreamProxy.FromRemoteCh, nil) }() // register the domain socket connection as upstream diff --git a/emain/emain.ts b/emain/emain.ts index 58187e5293..960a3d4d2b 100644 --- a/emain/emain.ts +++ b/emain/emain.ts @@ -412,6 +412,16 @@ async function appMain() { fireAndForget(createNewWaveWindow); } }); + electron.powerMonitor.on("resume", () => { + console.log("system resumed from sleep, notifying server"); + fireAndForget(async () => { + try { + await RpcApi.NotifySystemResumeCommand(ElectronWshClient, { noresponse: true }); + } catch (e) { + console.log("error calling NotifySystemResumeCommand", e); + } + }); + }); const rawGlobalHotKey = launchSettings?.["app:globalhotkey"]; if (rawGlobalHotKey) { registerGlobalHotkey(rawGlobalHotKey); diff --git a/frontend/app/block/block.scss b/frontend/app/block/block.scss index 41c7d9835b..65806ba78d 100644 --- a/frontend/app/block/block.scss +++ b/frontend/app/block/block.scss @@ -284,8 +284,8 @@ .connstatus-overlay { position: absolute; top: calc(var(--header-height) + 6px); - left: 6px; - right: 6px; + left: 8px; + right: 8px; z-index: var(--zindex-block-mask-inner); display: flex; align-items: center; @@ -296,6 +296,7 @@ backdrop-filter: blur(50px); border-radius: 6px; box-shadow: 0px 13px 16px 0px rgb(from var(--block-bg-color) r g b / 40%); + opacity: 0.85; .connstatus-content { display: flex; diff --git a/frontend/app/block/connectionbutton.tsx b/frontend/app/block/connectionbutton.tsx index 1f74f00967..c0a37659cb 100644 --- a/frontend/app/block/connectionbutton.tsx +++ b/frontend/app/block/connectionbutton.tsx @@ -80,6 +80,14 @@ export const ConnectionButton = React.memo( color = "var(--grey-text-color)"; titleText = "Disconnected from " + connection; showDisconnectedSlash = true; + } else if (connStatus?.connhealthstatus === "degraded" || connStatus?.connhealthstatus === "stalled") { + color = "var(--warning-color)"; + iconName = "signal-bars-slash"; + if (connStatus.connhealthstatus === "degraded") { + titleText = "Connection degraded: " + connection; + } else { + titleText = "Connection stalled: " + connection; + } } if (iconSvg != null) { connIconElem = iconSvg; diff --git a/frontend/app/block/connstatusoverlay.tsx b/frontend/app/block/connstatusoverlay.tsx index 8f0852abac..6c90511d7d 100644 --- a/frontend/app/block/connstatusoverlay.tsx +++ b/frontend/app/block/connstatusoverlay.tsx @@ -14,6 +14,100 @@ import * as jotai from "jotai"; import { OverlayScrollbarsComponent } from "overlayscrollbars-react"; import * as React from "react"; +function formatElapsedTime(elapsedMs: number): string { + if (elapsedMs <= 0) { + return ""; + } + + const elapsedSeconds = Math.floor(elapsedMs / 1000); + + if (elapsedSeconds < 60) { + return `${elapsedSeconds}s`; + } + + const elapsedMinutes = Math.floor(elapsedSeconds / 60); + if (elapsedMinutes < 60) { + return `${elapsedMinutes}m`; + } + + const elapsedHours = Math.floor(elapsedMinutes / 60); + const remainingMinutes = elapsedMinutes % 60; + + if (elapsedHours < 24) { + if (remainingMinutes === 0) { + return `${elapsedHours}h`; + } + return `${elapsedHours}h${remainingMinutes}m`; + } + + return "more than a day"; +} + +const StalledOverlay = React.memo( + ({ + connName, + connStatus, + overlayRefCallback, + }: { + connName: string; + connStatus: ConnStatus; + overlayRefCallback: (el: HTMLDivElement | null) => void; + }) => { + const [elapsedTime, setElapsedTime] = React.useState(""); + + const handleDisconnect = React.useCallback(() => { + const prtn = RpcApi.ConnDisconnectCommand(TabRpcClient, connName, { timeout: 5000 }); + prtn.catch((e) => console.log("error disconnecting", connName, e)); + }, [connName]); + + React.useEffect(() => { + if (!connStatus.lastactivitybeforestalledtime) { + return; + } + + const updateElapsed = () => { + const now = Date.now(); + const lastActivity = connStatus.lastactivitybeforestalledtime!; + const elapsed = now - lastActivity; + setElapsedTime(formatElapsedTime(elapsed)); + }; + + updateElapsed(); + const interval = setInterval(updateElapsed, 1000); + + return () => clearInterval(interval); + }, [connStatus.lastactivitybeforestalledtime]); + + return ( +
+
+ +
+ Connection to "{connName}" is stalled + {elapsedTime && ` (no activity for ${elapsedTime})`} +
+
+ +
+
+ ); + } +); +StalledOverlay.displayName = "StalledOverlay"; + export const ConnStatusOverlay = React.memo( ({ nodeModel, @@ -121,10 +215,17 @@ export const ConnStatusOverlay = React.memo( [showError, showWshError, connStatus.error, connStatus.wsherror] ); - if (!showWshError && (isLayoutMode || connStatus.status == "connected" || connModalOpen)) { + let showStalled = connStatus.status == "connected" && connStatus.connhealthstatus == "stalled"; + if (!showWshError && !showStalled && (isLayoutMode || connStatus.status == "connected" || connModalOpen)) { return null; } + if (showStalled && !showWshError) { + return ( + + ); + } + return (
diff --git a/frontend/app/store/wshclientapi.ts b/frontend/app/store/wshclientapi.ts index 72ffd8618d..1f90e5f23b 100644 --- a/frontend/app/store/wshclientapi.ts +++ b/frontend/app/store/wshclientapi.ts @@ -512,6 +512,11 @@ class RpcApiType { return client.wshRpcCall("notify", data, opts); } + // command "notifysystemresume" [call] + NotifySystemResumeCommand(client: WshClient, opts?: RpcOpts): Promise { + return client.wshRpcCall("notifysystemresume", null, opts); + } + // command "path" [call] PathCommand(client: WshClient, data: PathCommandData, opts?: RpcOpts): Promise { return client.wshRpcCall("path", data, opts); diff --git a/frontend/tailwindsetup.css b/frontend/tailwindsetup.css index a41c476052..b9f1933825 100644 --- a/frontend/tailwindsetup.css +++ b/frontend/tailwindsetup.css @@ -67,6 +67,7 @@ --container-w600: 600px; --container-w450: 450px; + --container-w350: 350px; --container-xs: 300px; --container-xxs: 200px; --container-tiny: 120px; diff --git a/frontend/types/gotypes.d.ts b/frontend/types/gotypes.d.ts index 8a30f5e9be..c46690d63b 100644 --- a/frontend/types/gotypes.d.ts +++ b/frontend/types/gotypes.d.ts @@ -785,6 +785,7 @@ declare global { // wshrpc.ConnStatus type ConnStatus = { status: string; + connhealthstatus?: string; wshenabled: boolean; connection: string; connected: boolean; @@ -794,6 +795,8 @@ declare global { wsherror?: string; nowshreason?: string; wshversion?: string; + lastactivitybeforestalledtime?: number; + keepalivesenttime?: number; }; // wshrpc.CpuDataRequest diff --git a/pkg/blockcontroller/blockcontroller.go b/pkg/blockcontroller/blockcontroller.go index d5b307e92a..524a66c10b 100644 --- a/pkg/blockcontroller/blockcontroller.go +++ b/pkg/blockcontroller/blockcontroller.go @@ -290,11 +290,31 @@ func DestroyBlockController(blockId string) { deleteController(blockId) } +func sendConnMonitorInputNotification(controller Controller) { + connName := controller.GetConnName() + if connName == "" || conncontroller.IsLocalConnName(connName) || conncontroller.IsWslConnName(connName) { + return + } + + connOpts, parseErr := remote.ParseOpts(connName) + if parseErr != nil { + return + } + sshConn := conncontroller.MaybeGetConn(connOpts) + if sshConn != nil { + monitor := sshConn.GetMonitor() + if monitor != nil { + monitor.NotifyInput() + } + } +} + func SendInput(blockId string, inputUnion *BlockInputUnion) error { controller := getController(blockId) if controller == nil { return fmt.Errorf("no controller found for block %s", blockId) } + sendConnMonitorInputNotification(controller) return controller.SendInput(inputUnion) } @@ -413,7 +433,10 @@ func CheckConnStatus(blockId string) error { if err != nil { return fmt.Errorf("error parsing connection name: %w", err) } - conn := conncontroller.GetConn(opts) + conn := conncontroller.MaybeGetConn(opts) + if conn == nil { + return fmt.Errorf("no connection found") + } connStatus := conn.DeriveConnStatus() if connStatus.Status != conncontroller.Status_Connected { return fmt.Errorf("not connected: %s", connStatus.Status) diff --git a/pkg/blockcontroller/durableshellcontroller.go b/pkg/blockcontroller/durableshellcontroller.go index 25ac22c9aa..a208a3df75 100644 --- a/pkg/blockcontroller/durableshellcontroller.go +++ b/pkg/blockcontroller/durableshellcontroller.go @@ -229,7 +229,7 @@ func (dsc *DurableShellController) startNewJob(ctx context.Context, blockMeta wa if err != nil { return "", fmt.Errorf("invalid ssh remote name (%s): %w", connName, err) } - conn := conncontroller.GetConn(opts) + conn := conncontroller.MaybeGetConn(opts) if conn == nil { return "", fmt.Errorf("connection %q not found", connName) } diff --git a/pkg/blockcontroller/shellcontroller.go b/pkg/blockcontroller/shellcontroller.go index b0c7081efc..a410225394 100644 --- a/pkg/blockcontroller/shellcontroller.go +++ b/pkg/blockcontroller/shellcontroller.go @@ -355,7 +355,7 @@ func (bc *ShellController) getConnUnion(logCtx context.Context, remoteName strin if err != nil { return ConnUnion{}, fmt.Errorf("invalid ssh remote name (%s): %w", remoteName, err) } - conn := conncontroller.GetConn(opts) + conn := conncontroller.MaybeGetConn(opts) if conn == nil { return ConnUnion{}, fmt.Errorf("ssh connection not found: %s", remoteName) } diff --git a/pkg/jobmanager/jobmanager.go b/pkg/jobmanager/jobmanager.go index cdd245af7f..7afaec047e 100644 --- a/pkg/jobmanager/jobmanager.go +++ b/pkg/jobmanager/jobmanager.go @@ -440,7 +440,7 @@ func handleJobDomainSocketClient(conn net.Conn) { panichandler.PanicHandler("handleJobDomainSocketClient:AdaptStreamToMsgCh", recover()) }() defer serverImpl.Close() - wshutil.AdaptStreamToMsgCh(conn, inputCh) + wshutil.AdaptStreamToMsgCh(conn, inputCh, nil) }() _ = wshRpc diff --git a/pkg/remote/conncontroller/conncontroller.go b/pkg/remote/conncontroller/conncontroller.go index 4e6f0aeefd..16a41809f2 100644 --- a/pkg/remote/conncontroller/conncontroller.go +++ b/pkg/remote/conncontroller/conncontroller.go @@ -61,6 +61,12 @@ const ( NoWshCode_InstallVerifyError = "install-verify-error" ) +const ( + ConnHealthStatus_Good = "good" + ConnHealthStatus_Degraded = "degraded" + ConnHealthStatus_Stalled = "stalled" +) + const DefaultConnectionTimeout = 60 * time.Second var globalLock = &sync.Mutex{} @@ -72,6 +78,7 @@ type SSHConn struct { lifecycleLock *sync.Mutex // this protects the lifecycle from concurrent calls Status string + ConnHealthStatus string WshEnabled *atomic.Bool Opts *remote.SSHOpts Client *ssh.Client @@ -84,6 +91,7 @@ type SSHConn struct { WshVersion string LastConnectTime int64 ActiveConnNum int + Monitor *ConnMonitor // will not be nil } var ConnServerCmdTemplate = strings.TrimSpace( @@ -127,17 +135,27 @@ func GetNumSSHHasConnected() int { func (conn *SSHConn) DeriveConnStatus() wshrpc.ConnStatus { conn.lock.Lock() defer conn.lock.Unlock() + var lastActivityBeforeStalledTime int64 + var keepAliveSentTime int64 + monitor := conn.Monitor + if conn.ConnHealthStatus == ConnHealthStatus_Stalled && monitor != nil { + lastActivityBeforeStalledTime = monitor.LastActivityTime.Load() + keepAliveSentTime = monitor.KeepAliveSentTime.Load() + } return wshrpc.ConnStatus{ - Status: conn.Status, - Connected: conn.Status == Status_Connected, - Connection: conn.Opts.String(), - HasConnected: (conn.LastConnectTime > 0), - ActiveConnNum: conn.ActiveConnNum, - Error: conn.Error, - WshEnabled: conn.WshEnabled.Load(), - WshError: conn.WshError, - NoWshReason: conn.NoWshReason, - WshVersion: conn.WshVersion, + Status: conn.Status, + Connected: conn.Status == Status_Connected, + Connection: conn.Opts.String(), + HasConnected: (conn.LastConnectTime > 0), + ActiveConnNum: conn.ActiveConnNum, + Error: conn.Error, + WshEnabled: conn.WshEnabled.Load(), + WshError: conn.WshError, + NoWshReason: conn.NoWshReason, + WshVersion: conn.WshVersion, + ConnHealthStatus: conn.ConnHealthStatus, + LastActivityBeforeStalledTime: lastActivityBeforeStalledTime, + KeepAliveSentTime: keepAliveSentTime, } } @@ -180,9 +198,14 @@ func (conn *SSHConn) Close() error { func (conn *SSHConn) closeInternal_withlifecyclelock() { // does not set status (that should happen at another level) - client := WithLockRtn(conn, func() *ssh.Client { - return conn.Client + conn.WithLock(func() { + if conn.Monitor != nil { + conn.Monitor.Close() + conn.Monitor = nil + } + conn.Monitor = nil }) + client := conn.GetClient() if client != nil { // this MUST go first to force close the connection. // the DomainSockListener.Close() sends SSH protocol packets which can block on a dead network conn @@ -276,7 +299,12 @@ func (conn *SSHConn) OpenDomainSocketListener(ctx context.Context) error { conn.DomainSockListener = nil conn.DomainSockName = "" }) - wshutil.RunWshRpcOverListener(listener) + monitor := conn.GetMonitor() + var updateCallback func() + if monitor != nil { + updateCallback = monitor.UpdateLastActivityTime + } + wshutil.RunWshRpcOverListener(listener, updateCallback) }() return nil } @@ -323,11 +351,11 @@ func (conn *SSHConn) GetEnvironmentMaps(ctx context.Context) (map[string]string, func runSessionWithContext(ctx context.Context, session *ssh.Session, cmd string) error { errCh := make(chan error, 1) - + go func() { errCh <- session.Run(cmd) }() - + select { case <-ctx.Done(): session.Close() @@ -527,6 +555,10 @@ func (conn *SSHConn) StartConnServer(ctx context.Context, afterUpdate bool, useR log.Printf("[conncontroller:%s:output] error: %v\n", conn.GetName(), output.Error) continue } + monitor := conn.GetMonitor() + if monitor != nil { + monitor.UpdateLastActivityTime() + } line := output.Line if !strings.HasSuffix(line, "\n") { line += "\n" @@ -663,6 +695,12 @@ func (conn *SSHConn) GetClient() *ssh.Client { return conn.Client } +func (conn *SSHConn) GetMonitor() *ConnMonitor { + conn.lock.Lock() + defer conn.lock.Unlock() + return conn.Monitor +} + func (conn *SSHConn) WaitForConnect(ctx context.Context) error { for { status := conn.DeriveConnStatus() @@ -916,7 +954,13 @@ func (conn *SSHConn) connectInternal(ctx context.Context, connFlags *wconfig.Con return err } conn.WithLock(func() { + if conn.Monitor != nil { + conn.Monitor.Close() + conn.Monitor = nil + } conn.Client = client + conn.ConnHealthStatus = ConnHealthStatus_Good + conn.Monitor = MakeConnMonitor(conn, client) }) go func() { defer func() { @@ -954,6 +998,11 @@ func (conn *SSHConn) waitForDisconnect() { return } err := client.Wait() + if err != nil { + log.Printf("[conn:%s] client.Wait() returned error: %v", conn.GetName(), err) + } else { + log.Printf("[conn:%s] client.Wait() completed (clean disconnect)", conn.GetName()) + } conn.lifecycleLock.Lock() defer conn.lifecycleLock.Unlock() conn.WithLock(func() { @@ -986,17 +1035,42 @@ func (conn *SSHConn) ClearWshError() { }) } +func (conn *SSHConn) SetConnHealthStatus(client *ssh.Client, status string) { + changed := false + conn.WithLock(func() { + if conn.Client != client { + return + } + if conn.ConnHealthStatus != status { + conn.ConnHealthStatus = status + changed = true + } + }) + if changed { + conn.FireConnChangeEvent() + } +} + +func (conn *SSHConn) GetConnHealthStatus() string { + var status string + conn.WithLock(func() { + status = conn.ConnHealthStatus + }) + return status +} + func getConnInternal(opts *remote.SSHOpts, createIfNotExists bool) *SSHConn { globalLock.Lock() defer globalLock.Unlock() rtn := clientControllerMap[*opts] if rtn == nil && createIfNotExists { rtn = &SSHConn{ - lock: &sync.Mutex{}, - lifecycleLock: &sync.Mutex{}, - Status: Status_Init, - WshEnabled: &atomic.Bool{}, - Opts: opts, + lock: &sync.Mutex{}, + lifecycleLock: &sync.Mutex{}, + Status: Status_Init, + ConnHealthStatus: ConnHealthStatus_Good, + WshEnabled: &atomic.Bool{}, + Opts: opts, } clientControllerMap[*opts] = rtn } @@ -1009,6 +1083,12 @@ func GetConn(opts *remote.SSHOpts) *SSHConn { return conn } +// does NOT connect, can return nil +func MaybeGetConn(opts *remote.SSHOpts) *SSHConn { + conn := getConnInternal(opts, false) + return conn +} + func IsConnected(connName string) (bool, error) { if IsLocalConnName(connName) { return true, nil diff --git a/pkg/remote/conncontroller/connmonitor.go b/pkg/remote/conncontroller/connmonitor.go new file mode 100644 index 0000000000..dc14561484 --- /dev/null +++ b/pkg/remote/conncontroller/connmonitor.go @@ -0,0 +1,198 @@ +// Copyright 2026, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + +package conncontroller + +import ( + "context" + "log" + "sync" + "sync/atomic" + "time" + + "github.com/wavetermdev/waveterm/pkg/panichandler" + "golang.org/x/crypto/ssh" +) + +// Lock ordering: conn.lock > cm.lock (conn.lock is outer, cm.lock is inner) +// CRITICAL: Methods that hold cm.lock must NEVER call into SSHConn (deadlock - violates ordering). +// Methods called from SSHConn while conn.lock is held should avoid acquiring cm.lock (keep locking simple). +type ConnMonitor struct { + lock *sync.Mutex + Conn *SSHConn // always non-nil, set at creation + Client *ssh.Client // always non-nil, set at creation + LastActivityTime atomic.Int64 + LastInputTime atomic.Int64 + KeepAliveSentTime atomic.Int64 + KeepAliveInFlight bool + ctx context.Context + cancelFunc context.CancelFunc + inputNotifyCh chan int64 +} + +func MakeConnMonitor(conn *SSHConn, client *ssh.Client) *ConnMonitor { + if conn == nil { + panic("conn cannot be nil") + } + if client == nil { + panic("client cannot be nil") + } + ctx, cancelFunc := context.WithCancel(context.Background()) + cm := &ConnMonitor{ + lock: &sync.Mutex{}, + Conn: conn, + Client: client, + ctx: ctx, + cancelFunc: cancelFunc, + inputNotifyCh: make(chan int64, 1), + } + go cm.keepAliveMonitor() + return cm +} + +// setConnHealthStatus calls into SSHConn.SetConnHealthStatus +// CRITICAL: cm.lock must NOT be held when calling this method (violates lock ordering) +func (cm *ConnMonitor) setConnHealthStatus(status string) { + cm.Conn.SetConnHealthStatus(cm.Client, status) +} + +func (cm *ConnMonitor) UpdateLastActivityTime() { + cm.LastActivityTime.Store(time.Now().UnixMilli()) + cm.setConnHealthStatus(ConnHealthStatus_Good) +} + +func (cm *ConnMonitor) NotifyInput() { + inputTime := time.Now().UnixMilli() + cm.LastInputTime.Store(inputTime) + select { + case cm.inputNotifyCh <- inputTime: + default: + } +} + +func (cm *ConnMonitor) isUrgent() bool { + lastInput := cm.LastInputTime.Load() + if lastInput == 0 { + return false + } + return time.Now().UnixMilli()-lastInput < 10000 +} + +func (cm *ConnMonitor) setKeepAliveInFlight() bool { + cm.lock.Lock() + defer cm.lock.Unlock() + + if cm.KeepAliveInFlight { + return false + } + cm.KeepAliveInFlight = true + cm.KeepAliveSentTime.Store(time.Now().UnixMilli()) + return true +} + +func (cm *ConnMonitor) clearKeepAliveInFlight() { + cm.lock.Lock() + defer cm.lock.Unlock() + + cm.KeepAliveInFlight = false +} + +func (cm *ConnMonitor) getTimeSinceKeepAlive() int64 { + cm.lock.Lock() + defer cm.lock.Unlock() + + if !cm.KeepAliveInFlight { + return 0 + } + return time.Now().UnixMilli() - cm.KeepAliveSentTime.Load() +} + +func (cm *ConnMonitor) SendKeepAlive() error { + client := cm.Client + if !cm.setKeepAliveInFlight() { + return nil + } + go func() { + defer func() { + panichandler.PanicHandler("conncontroller:SendKeepAlive", recover()) + }() + defer cm.clearKeepAliveInFlight() + startTime := time.Now() + _, _, err := client.SendRequest("keepalive@openssh.com", true, nil) + if err != nil { + // errors are only returned for network and I/O issues (likely disconnection). do not update last activity time + duration := time.Since(startTime).Milliseconds() + log.Printf("[conncontroller] conn:%s keepalive error (duration=%dms): %v", cm.Conn.GetName(), duration, err) + return + } + cm.UpdateLastActivityTime() + }() + return nil +} + +func (cm *ConnMonitor) checkConnection() { + lastActivity := cm.LastActivityTime.Load() + if lastActivity == 0 { + return + } + urgent := cm.isUrgent() + timeSinceActivity := time.Now().UnixMilli() - lastActivity + + keepAliveThreshold := int64(10000) + if urgent { + keepAliveThreshold = 1000 + } + if timeSinceActivity > keepAliveThreshold { + cm.SendKeepAlive() + } + + stalledThreshold := int64(10000) + if urgent { + stalledThreshold = 5000 + } + timeSinceKeepAlive := cm.getTimeSinceKeepAlive() + if timeSinceKeepAlive > stalledThreshold { + cm.setConnHealthStatus(ConnHealthStatus_Stalled) + } +} + +func (cm *ConnMonitor) keepAliveMonitor() { + defer func() { + panichandler.PanicHandler("conncontroller:keepAliveMonitor", recover()) + }() + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for { + // check if our client is still the active one + if cm.Conn.GetClient() != cm.Client { + return + } + + select { + case <-ticker.C: + cm.checkConnection() + + case inputTime := <-cm.inputNotifyCh: + select { + case <-time.After(1 * time.Second): + if cm.LastActivityTime.Load() >= inputTime { + break + } + cm.setConnHealthStatus(ConnHealthStatus_Degraded) + cm.checkConnection() + case <-cm.ctx.Done(): + return + } + + case <-cm.ctx.Done(): + return + } + } +} + +func (cm *ConnMonitor) Close() { + if cm.cancelFunc != nil { + cm.cancelFunc() + } +} diff --git a/pkg/util/shellutil/shellutil.go b/pkg/util/shellutil/shellutil.go index 3f8ee505be..e6f6c21f38 100644 --- a/pkg/util/shellutil/shellutil.go +++ b/pkg/util/shellutil/shellutil.go @@ -620,24 +620,23 @@ func FixupWaveZshHistory() error { } func GetTerminalResetSeq() string { - resetSeq := "\x1b[0m" // reset attributes - resetSeq += "\x1b[?25h" // show cursor - resetSeq += "\x1b[?1l" // normal cursor keys - resetSeq += "\x1b[?6l" // origin mode off (DECOM) - resetSeq += "\x1b[?7h" // wraparound on - resetSeq += "\x1b[?45l" // reverse wraparound off - resetSeq += "\x1b[?66l" // application keypad off (DECNKM) - resetSeq += "\x1b[4l" // insert mode off (IRM) - resetSeq += "\x1b[?9l" // X10 mouse tracking off - resetSeq += "\x1b[?1000l" // disable Send Mouse X & Y on button press - resetSeq += "\x1b[?1002l" // disable Use Cell Motion Mouse Tracking - resetSeq += "\x1b[?1003l" // disable Use All Motion Mouse Tracking - resetSeq += "\x1b[?1004l" // disable Send FocusIn/FocusOut events - resetSeq += "\x1b[?1006l" // disable Enable SGR Mouse Mode - resetSeq += "\x1b[?1007l" // disable Enable Alternate Scroll Mode - resetSeq += "\x1b[?2004l" // disable bracketed paste mode - resetSeq += "\x1b[?2026l" // synchronized output off - resetSeq += FormatOSC(16162, "R") // disable alternate screen mode + resetSeq := "\x1b[0m" // reset attributes + resetSeq += "\x1b[?25h" // show cursor + resetSeq += "\x1b[?1l" // normal cursor keys + resetSeq += "\x1b[?7h" // wraparound on + resetSeq += "\x1b[?45l" // reverse wraparound off + resetSeq += "\x1b[?66l" // application keypad off (DECNKM) + resetSeq += "\x1b[4l" // insert mode off (IRM) + resetSeq += "\x1b[?9l" // X10 mouse tracking off + resetSeq += "\x1b[?1000l" // disable Send Mouse X & Y on button press + resetSeq += "\x1b[?1002l" // disable Use Cell Motion Mouse Tracking + resetSeq += "\x1b[?1003l" // disable Use All Motion Mouse Tracking + resetSeq += "\x1b[?1004l" // disable Send FocusIn/FocusOut events + resetSeq += "\x1b[?1006l" // disable Enable SGR Mouse Mode + resetSeq += "\x1b[?1007l" // disable Enable Alternate Scroll Mode + resetSeq += "\x1b[?2004l" // disable bracketed paste mode + resetSeq += "\x1b[?2026l" // synchronized output off + resetSeq += FormatOSC(16162, "R") // disable alternate screen mode return resetSeq } diff --git a/pkg/util/utilfn/streamtolines.go b/pkg/util/utilfn/streamtolines.go index 7cfabf4cf2..51758465da 100644 --- a/pkg/util/utilfn/streamtolines.go +++ b/pkg/util/utilfn/streamtolines.go @@ -56,7 +56,7 @@ func streamToLines_processBuf(lineBuf *lineBuf, readBuf []byte, lineFn func([]by } } -func StreamToLines(input io.Reader, lineFn func([]byte)) error { +func StreamToLines(input io.Reader, lineFn func([]byte), readCallback func()) error { var lineBuf lineBuf readBuf := make([]byte, 64*1024) for { @@ -65,6 +65,9 @@ func StreamToLines(input io.Reader, lineFn func([]byte)) error { if err != nil { return err } + if readCallback != nil { + readCallback() + } } } @@ -76,7 +79,7 @@ func StreamToLinesChan(input io.Reader) chan LineOutput { defer close(ch) err := StreamToLines(input, func(line []byte) { ch <- LineOutput{Line: string(line)} - }) + }, nil) if err != nil && err != io.EOF { ch <- LineOutput{Error: err} } diff --git a/pkg/wshrpc/wshclient/wshclient.go b/pkg/wshrpc/wshclient/wshclient.go index 533fb01c24..2b6199f957 100644 --- a/pkg/wshrpc/wshclient/wshclient.go +++ b/pkg/wshrpc/wshclient/wshclient.go @@ -620,6 +620,12 @@ func NotifyCommand(w *wshutil.WshRpc, data wshrpc.WaveNotificationOptions, opts return err } +// command "notifysystemresume", wshserver.NotifySystemResumeCommand +func NotifySystemResumeCommand(w *wshutil.WshRpc, opts *wshrpc.RpcOpts) error { + _, err := sendRpcRequestCallHelper[any](w, "notifysystemresume", nil, opts) + return err +} + // command "path", wshserver.PathCommand func PathCommand(w *wshutil.WshRpc, data wshrpc.PathCommandData, opts *wshrpc.RpcOpts) (string, error) { resp, err := sendRpcRequestCallHelper[string](w, "path", data, opts) diff --git a/pkg/wshrpc/wshremote/wshremote_job.go b/pkg/wshrpc/wshremote/wshremote_job.go index df6d1d4a0a..09310e09da 100644 --- a/pkg/wshrpc/wshremote/wshremote_job.go +++ b/pkg/wshrpc/wshremote/wshremote_job.go @@ -75,7 +75,7 @@ func (impl *ServerImpl) connectToJobManager(ctx context.Context, jobId string, m close(proxy.FromRemoteCh) cleanup() }() - wshutil.AdaptStreamToMsgCh(conn, proxy.FromRemoteCh) + wshutil.AdaptStreamToMsgCh(conn, proxy.FromRemoteCh, nil) }() routeId := wshutil.MakeLinkRouteId(linkId) diff --git a/pkg/wshrpc/wshrpctypes.go b/pkg/wshrpc/wshrpctypes.go index 00cad208f0..c9b1363359 100644 --- a/pkg/wshrpc/wshrpctypes.go +++ b/pkg/wshrpc/wshrpctypes.go @@ -103,6 +103,7 @@ type WshRpcInterface interface { ConnUpdateWshCommand(ctx context.Context, remoteInfo RemoteInfo) (bool, error) FindGitBashCommand(ctx context.Context, rescan bool) (string, error) ConnServerInitCommand(ctx context.Context, data CommandConnServerInitData) error + NotifySystemResumeCommand(ctx context.Context) error // eventrecv is special, it's handled internally by WshRpc with EventListener EventRecvCommand(ctx context.Context, data wps.WaveEvent) error @@ -441,16 +442,19 @@ type ConnConfigRequest struct { } type ConnStatus struct { - Status string `json:"status"` - WshEnabled bool `json:"wshenabled"` - Connection string `json:"connection"` - Connected bool `json:"connected"` - HasConnected bool `json:"hasconnected"` // true if it has *ever* connected successfully - ActiveConnNum int `json:"activeconnnum"` - Error string `json:"error,omitempty"` - WshError string `json:"wsherror,omitempty"` - NoWshReason string `json:"nowshreason,omitempty"` - WshVersion string `json:"wshversion,omitempty"` + Status string `json:"status"` + ConnHealthStatus string `json:"connhealthstatus,omitempty"` + WshEnabled bool `json:"wshenabled"` + Connection string `json:"connection"` + Connected bool `json:"connected"` + HasConnected bool `json:"hasconnected"` // true if it has *ever* connected successfully + ActiveConnNum int `json:"activeconnnum"` + Error string `json:"error,omitempty"` + WshError string `json:"wsherror,omitempty"` + NoWshReason string `json:"nowshreason,omitempty"` + WshVersion string `json:"wshversion,omitempty"` + LastActivityBeforeStalledTime int64 `json:"lastactivitybeforestalledtime,omitempty"` + KeepAliveSentTime int64 `json:"keepalivesenttime,omitempty"` } type WebSelectorOpts struct { diff --git a/pkg/wshrpc/wshserver/wshserver.go b/pkg/wshrpc/wshserver/wshserver.go index 2777cd17f1..04b711fa0b 100644 --- a/pkg/wshrpc/wshserver/wshserver.go +++ b/pkg/wshrpc/wshserver/wshserver.go @@ -595,7 +595,7 @@ func (ws *WshServer) ConnDisconnectCommand(ctx context.Context, connName string) if err != nil { return fmt.Errorf("error parsing connection name: %w", err) } - conn := conncontroller.GetConn(connOpts) + conn := conncontroller.MaybeGetConn(connOpts) if conn == nil { return fmt.Errorf("connection not found: %s", connName) } @@ -755,6 +755,11 @@ func (ws *WshServer) DismissWshFailCommand(ctx context.Context, connName string) return nil } +func (ws *WshServer) NotifySystemResumeCommand(ctx context.Context) error { + log.Printf("NotifySystemResumeCommand called\n") + return nil +} + func (ws *WshServer) FindGitBashCommand(ctx context.Context, rescan bool) (string, error) { fullConfig := wconfig.GetWatcher().GetFullConfig() return shellutil.FindGitBash(&fullConfig, rescan), nil diff --git a/pkg/wshutil/wshrpcio.go b/pkg/wshutil/wshrpcio.go index 3345bdd9d3..2a88e987b4 100644 --- a/pkg/wshutil/wshrpcio.go +++ b/pkg/wshutil/wshrpcio.go @@ -16,10 +16,10 @@ import ( // * stream (json lines) // * websocket (json packets) -func AdaptStreamToMsgCh(input io.Reader, output chan baseds.RpcInputChType) error { +func AdaptStreamToMsgCh(input io.Reader, output chan baseds.RpcInputChType, readCallback func()) error { return utilfn.StreamToLines(input, func(line []byte) { output <- baseds.RpcInputChType{MsgBytes: line} - }) + }, readCallback) } func AdaptOutputChToStream(outputCh chan []byte, output io.Writer) error { diff --git a/pkg/wshutil/wshutil.go b/pkg/wshutil/wshutil.go index d979e7652f..2bb7e2db11 100644 --- a/pkg/wshutil/wshutil.go +++ b/pkg/wshutil/wshutil.go @@ -171,7 +171,7 @@ func SetupConnRpcClient(conn net.Conn, serverImpl ServerImpl, debugStr string) ( }() // when input is closed, close the connection defer conn.Close() - AdaptStreamToMsgCh(conn, inputCh) + AdaptStreamToMsgCh(conn, inputCh, nil) }() rtn := MakeWshRpcWithChannels(inputCh, outputCh, wshrpc.RpcContext{}, serverImpl, debugStr) return rtn, writeErrCh, nil @@ -255,7 +255,7 @@ func ValidateAndExtractRpcContextFromToken(tokenStr string) (*wshrpc.RpcContext, return claimsToRpcCtx(claims), nil } -func RunWshRpcOverListener(listener net.Listener) { +func RunWshRpcOverListener(listener net.Listener, readCallback func()) { defer log.Printf("domain socket listener shutting down\n") for { conn, err := listener.Accept() @@ -267,7 +267,7 @@ func RunWshRpcOverListener(listener net.Listener) { break } log.Print("got domain socket connection\n") - go handleDomainSocketClient(conn) + go handleDomainSocketClient(conn, readCallback) } } @@ -324,7 +324,7 @@ func HandleStdIOClient(logName string, input chan utilfn.LineOutput, output io.W <-doneCh } -func handleDomainSocketClient(conn net.Conn) { +func handleDomainSocketClient(conn net.Conn, readCallback func()) { var linkIdContainer atomic.Int32 proxy := MakeRpcProxy("domain") go func() { @@ -350,7 +350,7 @@ func handleDomainSocketClient(conn net.Conn) { DefaultRouter.UnregisterLink(baseds.LinkId(linkId)) } }() - AdaptStreamToMsgCh(conn, proxy.FromRemoteCh) + AdaptStreamToMsgCh(conn, proxy.FromRemoteCh, readCallback) }() linkId := DefaultRouter.RegisterUntrustedLink(proxy) linkIdContainer.Store(int32(linkId))