diff --git a/jsonrpc.go b/jsonrpc.go index 33c7974a..0b2d8d84 100644 --- a/jsonrpc.go +++ b/jsonrpc.go @@ -81,6 +81,11 @@ func writeJSONRPCResponse(response JSONRPCResponse, session *Session) { } func writeJSONRPCEvent(event string, params any, session *Session) { + // Defensive checks: skip if session or RPC channel is not ready + if session == nil || session.RPCChannel == nil { + return // Channel not ready or already closed - this is expected during cleanup + } + request := JSONRPCEvent{ JSONRPC: "2.0", Method: event, @@ -91,10 +96,6 @@ func writeJSONRPCEvent(event string, params any, session *Session) { jsonRpcLogger.Warn().Err(err).Msg("Error marshalling JSONRPC event") return } - if session == nil || session.RPCChannel == nil { - jsonRpcLogger.Info().Msg("RPC channel not available") - return - } requestString := string(requestBytes) scopedLogger := jsonRpcLogger.With(). @@ -105,7 +106,8 @@ func writeJSONRPCEvent(event string, params any, session *Session) { err = session.RPCChannel.SendText(requestString) if err != nil { - scopedLogger.Warn().Err(err).Msg("error sending JSONRPC event") + // Only log at debug level - closed pipe errors are expected during reconnection + scopedLogger.Debug().Err(err).Str("event", event).Msg("Could not send JSONRPC event (channel may be closing)") return } } @@ -327,6 +329,15 @@ func onRPCMessage(message webrtc.DataChannelMessage, session *Session) { if privateKeystrokes, ok := settings["privateKeystrokes"].(bool); ok { currentSessionSettings.PrivateKeystrokes = privateKeystrokes } + if maxRejectionAttempts, ok := settings["maxRejectionAttempts"].(float64); ok { + currentSessionSettings.MaxRejectionAttempts = int(maxRejectionAttempts) + } + if maxSessions, ok := settings["maxSessions"].(float64); ok { + currentSessionSettings.MaxSessions = int(maxSessions) + } + if observerTimeout, ok := settings["observerTimeout"].(float64); ok { + currentSessionSettings.ObserverTimeout = int(observerTimeout) + } // Trigger nickname auto-generation for sessions when RequireNickname changes if sessionManager != nil { diff --git a/session_manager.go b/session_manager.go index f50bc32a..7aa38379 100644 --- a/session_manager.go +++ b/session_manager.go @@ -102,8 +102,13 @@ func NewSessionManager(logger *zerolog.Logger) *SessionManager { } // Override with session settings if available - if currentSessionSettings != nil && currentSessionSettings.PrimaryTimeout > 0 { - primaryTimeout = time.Duration(currentSessionSettings.PrimaryTimeout) * time.Second + if currentSessionSettings != nil { + if currentSessionSettings.PrimaryTimeout > 0 { + primaryTimeout = time.Duration(currentSessionSettings.PrimaryTimeout) * time.Second + } + if currentSessionSettings.MaxSessions > 0 { + maxSessions = currentSessionSettings.MaxSessions + } } sm := &SessionManager{ @@ -1674,6 +1679,27 @@ func (sm *SessionManager) cleanupInactiveSessions(ctx context.Context) { } } + // Clean up observer sessions with closed RPC channels (stale connections) + // This prevents accumulation of zombie observer sessions that are no longer connected + observerTimeout := 2 * time.Minute // Default: 2 minutes + if currentSessionSettings != nil && currentSessionSettings.ObserverTimeout > 0 { + observerTimeout = time.Duration(currentSessionSettings.ObserverTimeout) * time.Second + } + for id, session := range sm.sessions { + if session.Mode == SessionModeObserver { + // Check if RPC channel is nil/closed AND session has been inactive + if session.RPCChannel == nil && now.Sub(session.LastActive) > observerTimeout { + sm.logger.Info(). + Str("sessionId", id). + Dur("inactiveFor", now.Sub(session.LastActive)). + Dur("observerTimeout", observerTimeout). + Msg("Removing inactive observer session with closed RPC channel") + delete(sm.sessions, id) + needsBroadcast = true + } + } + } + // Check primary session timeout (every 30 iterations = 30 seconds) if sm.primarySessionID != "" { if primary, exists := sm.sessions[sm.primarySessionID]; exists { diff --git a/ui/src/routes/devices.$id.settings.multi-session.tsx b/ui/src/routes/devices.$id.settings.multi-session.tsx index 09375086..4691876e 100644 --- a/ui/src/routes/devices.$id.settings.multi-session.tsx +++ b/ui/src/routes/devices.$id.settings.multi-session.tsx @@ -30,6 +30,8 @@ export default function SessionsSettings() { const [reconnectGrace, setReconnectGrace] = useState(10); const [primaryTimeout, setPrimaryTimeout] = useState(300); const [privateKeystrokes, setPrivateKeystrokes] = useState(false); + const [maxSessions, setMaxSessions] = useState(10); + const [observerTimeout, setObserverTimeout] = useState(120); useEffect(() => { send("getSessionSettings", {}, (response: JsonRpcResponse) => { @@ -43,6 +45,8 @@ export default function SessionsSettings() { primaryTimeout?: number; privateKeystrokes?: boolean; maxRejectionAttempts?: number; + maxSessions?: number; + observerTimeout?: number; }; setRequireSessionApproval(settings.requireApproval); setRequireSessionNickname(settings.requireNickname); @@ -58,6 +62,12 @@ export default function SessionsSettings() { if (settings.maxRejectionAttempts !== undefined) { setMaxRejectionAttempts(settings.maxRejectionAttempts); } + if (settings.maxSessions !== undefined) { + setMaxSessions(settings.maxSessions); + } + if (settings.observerTimeout !== undefined) { + setObserverTimeout(settings.observerTimeout); + } } }); }, [send, setRequireSessionApproval, setRequireSessionNickname, setMaxRejectionAttempts]); @@ -69,6 +79,8 @@ export default function SessionsSettings() { primaryTimeout: number; privateKeystrokes: boolean; maxRejectionAttempts: number; + maxSessions: number; + observerTimeout: number; }>) => { if (!canModifySettings) { notify.error("Only the primary session can change this setting"); @@ -83,6 +95,8 @@ export default function SessionsSettings() { primaryTimeout: primaryTimeout, privateKeystrokes: privateKeystrokes, maxRejectionAttempts: maxRejectionAttempts, + maxSessions: maxSessions, + observerTimeout: observerTimeout, ...updates } }, (response: JsonRpcResponse) => { @@ -248,6 +262,65 @@ export default function SessionsSettings() { + +
+ { + const newValue = parseInt(e.target.value) || 10; + if (newValue < 1 || newValue > 20) { + notify.error("Max sessions must be between 1 and 20"); + return; + } + setMaxSessions(newValue); + updateSessionSettings({ maxSessions: newValue }); + notify.success( + `Maximum concurrent sessions set to ${newValue}` + ); + }} + className="w-20 px-2 py-1.5 border rounded-md bg-white dark:bg-slate-800 border-slate-300 dark:border-slate-600 text-slate-900 dark:text-white disabled:opacity-50 disabled:cursor-not-allowed text-sm" + /> + sessions +
+
+ + +
+ { + const newValue = parseInt(e.target.value) || 120; + if (newValue < 30 || newValue > 600) { + notify.error("Timeout must be between 30 and 600 seconds"); + return; + } + setObserverTimeout(newValue); + updateSessionSettings({ observerTimeout: newValue }); + notify.success( + `Observer cleanup timeout set to ${Math.round(newValue / 60)} minute${Math.round(newValue / 60) === 1 ? '' : 's'}` + ); + }} + className="w-20 px-2 py-1.5 border rounded-md bg-white dark:bg-slate-800 border-slate-300 dark:border-slate-600 text-slate-900 dark:text-white disabled:opacity-50 disabled:cursor-not-allowed text-sm" + /> + seconds +
+
+