diff --git a/hidrpc.go b/hidrpc.go index c4c8c8ae..e80b996b 100644 --- a/hidrpc.go +++ b/hidrpc.go @@ -134,14 +134,15 @@ const baseExtension = expectedRate + maxLateness // 100ms extension on perfect t const maxStaleness = 225 * time.Millisecond // discard ancient packets outright func handleHidRPCKeypressKeepAlive(session *Session) error { + // Update LastActive to prevent session timeout (jiggler sends every 50ms) + sessionManager.UpdateLastActive(session.ID) + session.keepAliveJitterLock.Lock() defer session.keepAliveJitterLock.Unlock() now := time.Now() - // 1) Staleness guard: ensures packets that arrive far beyond the life of a valid key hold - // (e.g. after a network stall, retransmit burst, or machine sleep) are ignored outright. - // This prevents “zombie” keepalives from reviving a key that should already be released. + // Staleness guard: discard ancient packets after network stall/machine sleep if !session.lastTimerResetTime.IsZero() && now.Sub(session.lastTimerResetTime) > maxStaleness { return nil } diff --git a/session_manager.go b/session_manager.go index 7aa38379..ed619ed0 100644 --- a/session_manager.go +++ b/session_manager.go @@ -1159,28 +1159,26 @@ func (sm *SessionManager) transferPrimaryRole(fromSessionID, toSessionID, transf // The caller (AddSession, RemoveSession, etc.) will validate after we return // sm.validateSinglePrimary() // REMOVED to prevent recursion - // Handle WebRTC connection state for promoted sessions - // When a session changes from observer to primary, the existing WebRTC connection - // was established for observer mode and needs to be re-negotiated for primary mode + // Send reconnection signal for emergency promotions via WebSocket (more reliable than RPC when channel is stale) if toExists && (transferType == "emergency_timeout_promotion" || transferType == "emergency_auto_promotion") { go func() { - // Small delay to ensure session mode changes are committed time.Sleep(100 * time.Millisecond) - // Send connection reset signal to the promoted session - writeJSONRPCEvent("connectionModeChanged", map[string]interface{}{ + eventData := map[string]interface{}{ "sessionId": toSessionID, "newMode": string(toSession.Mode), "reason": "session_promotion", "action": "reconnect_required", "timestamp": time.Now().Unix(), - }, toSession) + } - sm.logger.Info(). - Str("sessionId", toSessionID). - Str("newMode", string(toSession.Mode)). - Str("transferType", transferType). - Msg("Sent WebRTC reconnection signal to promoted session") + err := toSession.sendWebSocketSignal("connectionModeChanged", eventData) + if err != nil { + sm.logger.Warn().Err(err).Str("sessionId", toSessionID).Msg("WebSocket signal failed, using RPC") + writeJSONRPCEvent("connectionModeChanged", eventData, toSession) + } + + sm.logger.Info().Str("sessionId", toSessionID).Str("transferType", transferType).Msg("Sent reconnection signal") }() } diff --git a/ui/src/routes/devices.$id.tsx b/ui/src/routes/devices.$id.tsx index 078d4f7e..2ad16bbe 100644 --- a/ui/src/routes/devices.$id.tsx +++ b/ui/src/routes/devices.$id.tsx @@ -395,6 +395,24 @@ export default function KvmIdRoute() { peerConnection.addIceCandidate(candidate).catch(error => { console.warn("[Websocket] Failed to add ICE candidate:", error); }); + } else if (parsedMessage.type === "connectionModeChanged") { + // Handle mode changes via WebSocket (fallback when RPC channel stale) + const { newMode, action } = parsedMessage.data; + + if (action === "reconnect_required" && newMode) { + console.log(`[Websocket] Mode changed to ${newMode}, reconnecting...`); + + if (currentSessionId) { + setCurrentSession(currentSessionId, newMode); + } + + handleRpcEvent("connectionModeChanged", parsedMessage.data); + + setTimeout(() => { + peerConnection?.close(); + setupPeerConnection(); + }, 500); + } } }, }, diff --git a/webrtc.go b/webrtc.go index e6eec581..d5edaef4 100644 --- a/webrtc.go +++ b/webrtc.go @@ -51,9 +51,9 @@ type Session struct { RPCChannel *webrtc.DataChannel HidChannel *webrtc.DataChannel shouldUmountVirtualMedia bool - flushCandidates func() // Callback to flush buffered ICE candidates - - rpcQueue chan webrtc.DataChannelMessage + flushCandidates func() // Callback to flush buffered ICE candidates + ws *websocket.Conn // WebSocket for critical signaling when RPC unavailable + rpcQueue chan webrtc.DataChannelMessage hidRPCAvailable bool lastKeepAliveArrivalTime time.Time // Track when last keep-alive packet arrived @@ -116,6 +116,22 @@ func (s *Session) resetKeepAliveTime() { s.lastTimerResetTime = time.Time{} // Reset auto-release timer tracking } +// sendWebSocketSignal sends critical state changes via WebSocket (fallback when RPC channel stale) +func (s *Session) sendWebSocketSignal(messageType string, data map[string]interface{}) error { + if s == nil || s.ws == nil { + return nil + } + + err := wsjson.Write(context.Background(), s.ws, gin.H{"type": messageType, "data": data}) + if err != nil { + webrtcLogger.Debug().Err(err).Str("sessionId", s.ID).Msg("Failed to send WebSocket signal") + return err + } + + webrtcLogger.Info().Str("sessionId", s.ID).Str("messageType", messageType).Msg("Sent WebSocket signal") + return nil +} + type hidQueueMessage struct { webrtc.DataChannelMessage channel string @@ -299,6 +315,7 @@ func newSession(config SessionConfig) (*Session, error) { session := &Session{ peerConnection: peerConnection, Browser: extractBrowserFromUserAgent(config.UserAgent), + ws: config.ws, } session.rpcQueue = make(chan webrtc.DataChannelMessage, 256) session.initQueues()