diff --git a/jsonrpc.go b/jsonrpc.go index e3cded4a..952884e2 100644 --- a/jsonrpc.go +++ b/jsonrpc.go @@ -163,213 +163,27 @@ func onRPCMessage(message webrtc.DataChannelMessage, session *Session) { scopedLogger.Trace().Msg("Received RPC request") - // Handle session-specific RPC methods first var result any var handlerErr error + // Handle session management RPC methods switch request.Method { - case "approvePrimaryRequest": - if err := RequirePermission(session, PermissionSessionTransfer); err != nil { - handlerErr = err - } else if requesterID, ok := request.Params["requesterID"].(string); ok { - handlerErr = sessionManager.ApprovePrimaryRequest(session.ID, requesterID) - if handlerErr == nil { - result = map[string]interface{}{"status": "approved"} - } - } else { - handlerErr = errors.New("invalid requesterID parameter") - } - case "denyPrimaryRequest": - if err := RequirePermission(session, PermissionSessionTransfer); err != nil { - handlerErr = err - } else if requesterID, ok := request.Params["requesterID"].(string); ok { - handlerErr = sessionManager.DenyPrimaryRequest(session.ID, requesterID) - if handlerErr == nil { - result = map[string]interface{}{"status": "denied"} - } - } else { - handlerErr = errors.New("invalid requesterID parameter") - } - case "approveNewSession": - if err := RequirePermission(session, PermissionSessionApprove); err != nil { - handlerErr = err - } else if sessionID, ok := request.Params["sessionId"].(string); ok { - handlerErr = sessionManager.ApproveSession(sessionID) - if handlerErr == nil { - go sessionManager.broadcastSessionListUpdate() - result = map[string]interface{}{"status": "approved"} - } - } else { - handlerErr = errors.New("invalid sessionId parameter") - } - case "denyNewSession": - if err := RequirePermission(session, PermissionSessionApprove); err != nil { - handlerErr = err - } else if sessionID, ok := request.Params["sessionId"].(string); ok { - handlerErr = sessionManager.DenySession(sessionID) - if handlerErr == nil { - // Notify the denied session - if targetSession := sessionManager.GetSession(sessionID); targetSession != nil { - go func() { - writeJSONRPCEvent("sessionAccessDenied", map[string]interface{}{ - "message": "Access denied by primary session", - }, targetSession) - sessionManager.broadcastSessionListUpdate() - }() - } - result = map[string]interface{}{"status": "denied"} - } - } else { - handlerErr = errors.New("invalid sessionId parameter") - } + case "approvePrimaryRequest", "denyPrimaryRequest": + result, handlerErr = handleSessionTransferRPC(request.Method, request.Params, session) + case "approveNewSession", "denyNewSession": + result, handlerErr = handleSessionApprovalRPC(request.Method, request.Params, session) case "requestSessionApproval": - if session.Mode != SessionModePending { - handlerErr = errors.New("only pending sessions can request approval") - } else if currentSessionSettings != nil && currentSessionSettings.RequireApproval { - if primary := sessionManager.GetPrimarySession(); primary != nil { - go func() { - writeJSONRPCEvent("newSessionPending", map[string]interface{}{ - "sessionId": session.ID, - "source": session.Source, - "identity": session.Identity, - "nickname": session.Nickname, - }, primary) - }() - result = map[string]interface{}{"status": "requested"} - } else { - handlerErr = errors.New("no primary session available") - } - } else { - handlerErr = errors.New("session approval not required") - } + result, handlerErr = handleRequestSessionApprovalRPC(session) case "updateSessionNickname": - sessionID, _ := request.Params["sessionId"].(string) - nickname, _ := request.Params["nickname"].(string) - // Validate nickname to match frontend validation - if len(nickname) < 2 { - handlerErr = errors.New("nickname must be at least 2 characters") - } else if len(nickname) > 30 { - handlerErr = errors.New("nickname must be 30 characters or less") - } else if !isValidNickname(nickname) { - handlerErr = errors.New("nickname can only contain letters, numbers, spaces, and - _ . @") - } else if targetSession := sessionManager.GetSession(sessionID); targetSession != nil { - // Users can update their own nickname, or admins can update any - if targetSession.ID == session.ID || session.HasPermission(PermissionSessionManage) { - // Check nickname uniqueness - allSessions := sessionManager.GetAllSessions() - for _, existingSession := range allSessions { - if existingSession.ID != sessionID && existingSession.Nickname == nickname { - handlerErr = fmt.Errorf("nickname '%s' is already in use by another session", nickname) - break - } - } - - if handlerErr == nil { - targetSession.Nickname = nickname - - // If session is pending and approval is required, send the approval request now that we have a nickname - if targetSession.Mode == SessionModePending && currentSessionSettings != nil && currentSessionSettings.RequireApproval { - if primary := sessionManager.GetPrimarySession(); primary != nil { - go func() { - writeJSONRPCEvent("newSessionPending", map[string]interface{}{ - "sessionId": targetSession.ID, - "source": targetSession.Source, - "identity": targetSession.Identity, - "nickname": targetSession.Nickname, - }, primary) - }() - } - } - - sessionManager.broadcastSessionListUpdate() - result = map[string]interface{}{"status": "updated"} - } - } else { - handlerErr = errors.New("permission denied: can only update own nickname") - } - } else { - handlerErr = errors.New("session not found") - } + result, handlerErr = handleUpdateSessionNicknameRPC(request.Params, session) case "getSessions": - sessions := sessionManager.GetAllSessions() - result = sessions + result = sessionManager.GetAllSessions() case "getPermissions": - permissions := session.GetPermissions() - permMap := make(map[string]bool) - for perm, allowed := range permissions { - permMap[string(perm)] = allowed - } - result = GetPermissionsResponse{ - Mode: string(session.Mode), - Permissions: permMap, - } - case "getSessionSettings": - if err := RequirePermission(session, PermissionSettingsRead); err != nil { - handlerErr = err - } else { - result = currentSessionSettings - } - case "setSessionSettings": - if err := RequirePermission(session, PermissionSessionManage); err != nil { - handlerErr = err - } else { - if settings, ok := request.Params["settings"].(map[string]interface{}); ok { - if requireApproval, ok := settings["requireApproval"].(bool); ok { - currentSessionSettings.RequireApproval = requireApproval - } - if requireNickname, ok := settings["requireNickname"].(bool); ok { - currentSessionSettings.RequireNickname = requireNickname - } - if reconnectGrace, ok := settings["reconnectGrace"].(float64); ok { - currentSessionSettings.ReconnectGrace = int(reconnectGrace) - } - if primaryTimeout, ok := settings["primaryTimeout"].(float64); ok { - currentSessionSettings.PrimaryTimeout = int(primaryTimeout) - } - if privateKeystrokes, ok := settings["privateKeystrokes"].(bool); ok { - currentSessionSettings.PrivateKeystrokes = privateKeystrokes - } - if maxRejectionAttempts, ok := settings["maxRejectionAttempts"].(float64); ok { - currentSessionSettings.MaxRejectionAttempts = int(maxRejectionAttempts) - } - if maxSessions, ok := settings["maxSessions"].(float64); ok { - currentSessionSettings.MaxSessions = int(maxSessions) - } - if observerTimeout, ok := settings["observerTimeout"].(float64); ok { - currentSessionSettings.ObserverTimeout = int(observerTimeout) - } - - // Trigger nickname auto-generation for sessions when RequireNickname changes - if sessionManager != nil { - sessionManager.updateAllSessionNicknames() - } - - // Save to persistent config - if err := SaveConfig(); err != nil { - handlerErr = errors.New("failed to save session settings") - } - result = currentSessionSettings - } else { - handlerErr = errors.New("invalid settings parameter") - } - } + result, handlerErr = handleGetPermissionsRPC(session) + case "getSessionSettings", "setSessionSettings": + result, handlerErr = handleSessionSettingsRPC(request.Method, request.Params, session) case "generateNickname": - // Generate a nickname based on user agent (no permissions required) - userAgent := "" - if request.Params != nil { - if ua, ok := request.Params["userAgent"].(string); ok { - userAgent = ua - } - } - - // Use browser as fallback if no user agent provided - if userAgent == "" { - userAgent = "Mozilla/5.0 (Unknown) Browser" - } - - result = map[string]string{ - "nickname": generateNicknameFromUserAgent(userAgent), - } + result, handlerErr = handleGenerateNicknameRPC(request.Params) default: // Check method permissions using centralized permission system if requiredPerm, exists := GetMethodPermission(request.Method); exists { diff --git a/jsonrpc_session_handlers.go b/jsonrpc_session_handlers.go new file mode 100644 index 00000000..0b4d887e --- /dev/null +++ b/jsonrpc_session_handlers.go @@ -0,0 +1,236 @@ +package kvm + +import ( + "errors" + "fmt" +) + +// handleSessionTransferRPC handles primary control transfer requests (approve/deny) +func handleSessionTransferRPC(method string, params map[string]any, session *Session) (any, error) { + requesterID, ok := params["requesterID"].(string) + if !ok { + return nil, errors.New("invalid requesterID parameter") + } + + if err := RequirePermission(session, PermissionSessionTransfer); err != nil { + return nil, err + } + + var err error + switch method { + case "approvePrimaryRequest": + err = sessionManager.ApprovePrimaryRequest(session.ID, requesterID) + if err == nil { + return map[string]interface{}{"status": "approved"}, nil + } + case "denyPrimaryRequest": + err = sessionManager.DenyPrimaryRequest(session.ID, requesterID) + if err == nil { + return map[string]interface{}{"status": "denied"}, nil + } + } + return nil, err +} + +// handleSessionApprovalRPC handles new session approval requests (approve/deny) +func handleSessionApprovalRPC(method string, params map[string]any, session *Session) (any, error) { + sessionID, ok := params["sessionId"].(string) + if !ok { + return nil, errors.New("invalid sessionId parameter") + } + + if err := RequirePermission(session, PermissionSessionApprove); err != nil { + return nil, err + } + + var err error + switch method { + case "approveNewSession": + err = sessionManager.ApproveSession(sessionID) + if err == nil { + go sessionManager.broadcastSessionListUpdate() + return map[string]interface{}{"status": "approved"}, nil + } + case "denyNewSession": + err = sessionManager.DenySession(sessionID) + if err == nil { + if targetSession := sessionManager.GetSession(sessionID); targetSession != nil { + go func() { + writeJSONRPCEvent("sessionAccessDenied", map[string]interface{}{ + "message": "Access denied by primary session", + }, targetSession) + sessionManager.broadcastSessionListUpdate() + }() + } + return map[string]interface{}{"status": "denied"}, nil + } + } + return nil, err +} + +// handleRequestSessionApprovalRPC handles pending sessions requesting approval from primary +func handleRequestSessionApprovalRPC(session *Session) (any, error) { + if session.Mode != SessionModePending { + return nil, errors.New("only pending sessions can request approval") + } + + if currentSessionSettings == nil || !currentSessionSettings.RequireApproval { + return nil, errors.New("session approval not required") + } + + primary := sessionManager.GetPrimarySession() + if primary == nil { + return nil, errors.New("no primary session available") + } + + go func() { + writeJSONRPCEvent("newSessionPending", map[string]interface{}{ + "sessionId": session.ID, + "source": session.Source, + "identity": session.Identity, + "nickname": session.Nickname, + }, primary) + }() + + return map[string]interface{}{"status": "requested"}, nil +} + +// handleUpdateSessionNicknameRPC handles nickname updates for sessions +func handleUpdateSessionNicknameRPC(params map[string]any, session *Session) (any, error) { + sessionID, _ := params["sessionId"].(string) + nickname, _ := params["nickname"].(string) + + if len(nickname) < 2 { + return nil, errors.New("nickname must be at least 2 characters") + } + if len(nickname) > 30 { + return nil, errors.New("nickname must be 30 characters or less") + } + if !isValidNickname(nickname) { + return nil, errors.New("nickname can only contain letters, numbers, spaces, and - _ . @") + } + + targetSession := sessionManager.GetSession(sessionID) + if targetSession == nil { + return nil, errors.New("session not found") + } + + if targetSession.ID != session.ID && !session.HasPermission(PermissionSessionManage) { + return nil, errors.New("permission denied: can only update own nickname") + } + + // Check nickname uniqueness + allSessions := sessionManager.GetAllSessions() + for _, existingSession := range allSessions { + if existingSession.ID != sessionID && existingSession.Nickname == nickname { + return nil, fmt.Errorf("nickname '%s' is already in use by another session", nickname) + } + } + + targetSession.Nickname = nickname + + // If session is pending and approval is required, send the approval request now that we have a nickname + if targetSession.Mode == SessionModePending && currentSessionSettings != nil && currentSessionSettings.RequireApproval { + if primary := sessionManager.GetPrimarySession(); primary != nil { + go func() { + writeJSONRPCEvent("newSessionPending", map[string]interface{}{ + "sessionId": targetSession.ID, + "source": targetSession.Source, + "identity": targetSession.Identity, + "nickname": targetSession.Nickname, + }, primary) + }() + } + } + + sessionManager.broadcastSessionListUpdate() + return map[string]interface{}{"status": "updated"}, nil +} + +// handleGetPermissionsRPC returns permissions for the current session +func handleGetPermissionsRPC(session *Session) (any, error) { + permissions := session.GetPermissions() + permMap := make(map[string]bool) + for perm, allowed := range permissions { + permMap[string(perm)] = allowed + } + return GetPermissionsResponse{ + Mode: string(session.Mode), + Permissions: permMap, + }, nil +} + +// handleSessionSettingsRPC handles getting or setting session settings +func handleSessionSettingsRPC(method string, params map[string]any, session *Session) (any, error) { + switch method { + case "getSessionSettings": + if err := RequirePermission(session, PermissionSettingsRead); err != nil { + return nil, err + } + return currentSessionSettings, nil + + case "setSessionSettings": + if err := RequirePermission(session, PermissionSessionManage); err != nil { + return nil, err + } + + settings, ok := params["settings"].(map[string]interface{}) + if !ok { + return nil, errors.New("invalid settings parameter") + } + + if requireApproval, ok := settings["requireApproval"].(bool); ok { + currentSessionSettings.RequireApproval = requireApproval + } + if requireNickname, ok := settings["requireNickname"].(bool); ok { + currentSessionSettings.RequireNickname = requireNickname + } + if reconnectGrace, ok := settings["reconnectGrace"].(float64); ok { + currentSessionSettings.ReconnectGrace = int(reconnectGrace) + } + if primaryTimeout, ok := settings["primaryTimeout"].(float64); ok { + currentSessionSettings.PrimaryTimeout = int(primaryTimeout) + } + if privateKeystrokes, ok := settings["privateKeystrokes"].(bool); ok { + currentSessionSettings.PrivateKeystrokes = privateKeystrokes + } + if maxRejectionAttempts, ok := settings["maxRejectionAttempts"].(float64); ok { + currentSessionSettings.MaxRejectionAttempts = int(maxRejectionAttempts) + } + if maxSessions, ok := settings["maxSessions"].(float64); ok { + currentSessionSettings.MaxSessions = int(maxSessions) + } + if observerTimeout, ok := settings["observerTimeout"].(float64); ok { + currentSessionSettings.ObserverTimeout = int(observerTimeout) + } + + if sessionManager != nil { + sessionManager.updateAllSessionNicknames() + } + + if err := SaveConfig(); err != nil { + return nil, errors.New("failed to save session settings") + } + return currentSessionSettings, nil + } + + return nil, fmt.Errorf("unknown session settings method: %s", method) +} + +// handleGenerateNicknameRPC generates a nickname based on user agent +func handleGenerateNicknameRPC(params map[string]any) (any, error) { + userAgent := "" + if params != nil { + if ua, ok := params["userAgent"].(string); ok { + userAgent = ua + } + } + + if userAgent == "" { + userAgent = "Mozilla/5.0 (Unknown) Browser" + } + + return map[string]string{ + "nickname": generateNicknameFromUserAgent(userAgent), + }, nil +} diff --git a/session_cleanup_handlers.go b/session_cleanup_handlers.go new file mode 100644 index 00000000..51740bfa --- /dev/null +++ b/session_cleanup_handlers.go @@ -0,0 +1,279 @@ +package kvm + +import ( + "time" +) + +// emergencyPromotionContext holds context for emergency promotion attempts +type emergencyPromotionContext struct { + triggerSessionID string + triggerReason string + now time.Time +} + +// attemptEmergencyPromotion tries to promote a session using emergency or normal promotion logic +// Returns (promotedSessionID, isEmergency, shouldSkip) +func (sm *SessionManager) attemptEmergencyPromotion(ctx emergencyPromotionContext, excludeSessionID string) (string, bool, bool) { + // Check if emergency promotion is needed + if currentSessionSettings == nil || !currentSessionSettings.RequireApproval { + // Normal promotion - reset consecutive counter + sm.consecutiveEmergencyPromotions = 0 + promotedID := sm.findNextSessionToPromote() + return promotedID, false, false + } + + // Emergency promotion path + hasPrimary := sm.primarySessionID != "" + if !hasPrimary { + sm.logger.Error(). + Str("triggerSessionID", ctx.triggerSessionID). + Msg("CRITICAL: No primary session exists - bypassing all rate limits") + } else { + // Rate limiting (only when we have a primary) + if ctx.now.Sub(sm.lastEmergencyPromotion) < 30*time.Second { + sm.logger.Warn(). + Str("triggerSessionID", ctx.triggerSessionID). + Dur("timeSinceLastEmergency", ctx.now.Sub(sm.lastEmergencyPromotion)). + Msgf("Emergency promotion rate limit exceeded - potential attack (%s)", ctx.triggerReason) + return "", false, true // shouldSkip = true + } + + // Limit consecutive emergency promotions + if sm.consecutiveEmergencyPromotions >= 3 { + sm.logger.Error(). + Str("triggerSessionID", ctx.triggerSessionID). + Int("consecutiveCount", sm.consecutiveEmergencyPromotions). + Msgf("Too many consecutive emergency promotions - blocking for security (%s)", ctx.triggerReason) + return "", false, true // shouldSkip = true + } + } + + // Find best session for emergency promotion + var promotedSessionID string + if excludeSessionID != "" { + // Need to exclude a specific session (e.g., timed-out session) + bestSessionID := "" + bestScore := -1 + for id, session := range sm.sessions { + if id != excludeSessionID && + !sm.isSessionBlacklisted(id) && + (session.Mode == SessionModeObserver || session.Mode == SessionModeQueued) { + score := sm.getSessionTrustScore(id) + if score > bestScore { + bestScore = score + bestSessionID = id + } + } + } + promotedSessionID = bestSessionID + } else { + promotedSessionID = sm.findMostTrustedSessionForEmergency() + } + + return promotedSessionID, true, false +} + +// handleGracePeriodExpiration checks and handles expired grace periods +// Returns true if any grace period expired +func (sm *SessionManager) handleGracePeriodExpiration(now time.Time) bool { + gracePeriodExpired := false + for sessionID, graceTime := range sm.reconnectGrace { + if now.After(graceTime) { + delete(sm.reconnectGrace, sessionID) + gracePeriodExpired = true + + wasHoldingPrimarySlot := (sm.lastPrimaryID == sessionID) + + if wasHoldingPrimarySlot { + sm.primarySessionID = "" + sm.lastPrimaryID = "" + + sm.logger.Info(). + Str("expiredSessionID", sessionID). + Msg("Primary session grace period expired - slot now available") + + // Promote next eligible session using emergency logic if needed + sm.promoteAfterGraceExpiration(sessionID, now) + } else { + sm.logger.Debug(). + Str("expiredSessionID", sessionID). + Msg("Non-primary session grace period expired") + } + + delete(sm.reconnectInfo, sessionID) + } + } + return gracePeriodExpired +} + +// promoteAfterGraceExpiration handles promotion after grace period expiration +func (sm *SessionManager) promoteAfterGraceExpiration(expiredSessionID string, now time.Time) { + ctx := emergencyPromotionContext{ + triggerSessionID: expiredSessionID, + triggerReason: "grace_expiration", + now: now, + } + + promotedSessionID, isEmergency, shouldSkip := sm.attemptEmergencyPromotion(ctx, "") + if shouldSkip { + return + } + + if promotedSessionID != "" { + reason := "grace_expiration_promotion" + if isEmergency { + reason = "emergency_promotion_deadlock_prevention" + sm.lastEmergencyPromotion = now + sm.consecutiveEmergencyPromotions++ + + sm.logger.Warn(). + Str("expiredSessionID", expiredSessionID). + Str("promotedSessionID", promotedSessionID). + Bool("requireApproval", true). + Int("consecutiveEmergencyPromotions", sm.consecutiveEmergencyPromotions). + Int("trustScore", sm.getSessionTrustScore(promotedSessionID)). + Msg("EMERGENCY: Bypassing approval requirement to prevent deadlock") + } + + err := sm.transferPrimaryRole("", promotedSessionID, reason, "primary grace period expired") + if err == nil { + logEvent := sm.logger.Info() + if isEmergency { + logEvent = sm.logger.Warn() + } + logEvent. + Str("expiredSessionID", expiredSessionID). + Str("promotedSessionID", promotedSessionID). + Str("reason", reason). + Bool("isEmergencyPromotion", isEmergency). + Msg("Auto-promoted session after primary grace period expiration") + } else { + sm.logger.Error(). + Err(err). + Str("expiredSessionID", expiredSessionID). + Str("promotedSessionID", promotedSessionID). + Str("reason", reason). + Bool("isEmergencyPromotion", isEmergency). + Msg("Failed to promote session after grace period expiration") + } + } else { + logLevel := sm.logger.Info() + if isEmergency { + logLevel = sm.logger.Error() + } + logLevel. + Str("expiredSessionID", expiredSessionID). + Bool("isEmergencyPromotion", isEmergency). + Msg("Primary grace period expired but no eligible sessions to promote") + } +} + +// handlePendingSessionTimeout removes timed-out pending sessions (DoS protection) +// Returns true if any pending session was removed +func (sm *SessionManager) handlePendingSessionTimeout(now time.Time) bool { + needsCleanup := false + for id, session := range sm.sessions { + if session.Mode == SessionModePending && + now.Sub(session.CreatedAt) > defaultPendingSessionTimeout { + websocketLogger.Info(). + Str("sessionId", id). + Dur("age", now.Sub(session.CreatedAt)). + Msg("Removing timed-out pending session") + delete(sm.sessions, id) + needsCleanup = true + } + } + return needsCleanup +} + +// handleObserverSessionCleanup removes inactive observer sessions with closed RPC channels +// Returns true if any observer session was removed +func (sm *SessionManager) handleObserverSessionCleanup(now time.Time) bool { + observerTimeout := defaultObserverSessionTimeout + if currentSessionSettings != nil && currentSessionSettings.ObserverTimeout > 0 { + observerTimeout = time.Duration(currentSessionSettings.ObserverTimeout) * time.Second + } + + needsCleanup := false + for id, session := range sm.sessions { + if session.Mode == SessionModeObserver { + if session.RPCChannel == nil && now.Sub(session.LastActive) > observerTimeout { + sm.logger.Info(). + Str("sessionId", id). + Dur("inactiveFor", now.Sub(session.LastActive)). + Dur("observerTimeout", observerTimeout). + Msg("Removing inactive observer session with closed RPC channel") + delete(sm.sessions, id) + needsCleanup = true + } + } + } + return needsCleanup +} + +// handlePrimarySessionTimeout checks and handles primary session timeout +// Returns true if primary session was timed out and cleanup is needed +func (sm *SessionManager) handlePrimarySessionTimeout(now time.Time) bool { + if sm.primarySessionID == "" { + return false + } + + primary, exists := sm.sessions[sm.primarySessionID] + if !exists { + sm.primarySessionID = "" + return true + } + + currentTimeout := sm.getCurrentPrimaryTimeout() + if now.Sub(primary.LastActive) <= currentTimeout { + return false + } + + // Timeout detected - demote primary + timedOutSessionID := primary.ID + primary.Mode = SessionModeObserver + sm.primarySessionID = "" + + ctx := emergencyPromotionContext{ + triggerSessionID: timedOutSessionID, + triggerReason: "timeout", + now: now, + } + + promotedSessionID, isEmergency, shouldSkip := sm.attemptEmergencyPromotion(ctx, timedOutSessionID) + if shouldSkip { + return false + } + + if promotedSessionID != "" { + reason := "timeout_promotion" + if isEmergency { + reason = "emergency_timeout_promotion" + sm.lastEmergencyPromotion = now + sm.consecutiveEmergencyPromotions++ + + sm.logger.Warn(). + Str("timedOutSessionID", timedOutSessionID). + Str("promotedSessionID", promotedSessionID). + Bool("requireApproval", true). + Int("trustScore", sm.getSessionTrustScore(promotedSessionID)). + Msg("EMERGENCY: Timeout promotion bypassing approval requirement") + } + + err := sm.transferPrimaryRole(timedOutSessionID, promotedSessionID, reason, "primary session timeout") + if err == nil { + logEvent := sm.logger.Info() + if isEmergency { + logEvent = sm.logger.Warn() + } + logEvent. + Str("timedOutSessionID", timedOutSessionID). + Str("promotedSessionID", promotedSessionID). + Bool("isEmergencyPromotion", isEmergency). + Msg("Auto-promoted session after primary timeout") + return true + } + } + + return false +} diff --git a/session_manager.go b/session_manager.go index 174203a1..a8d93f25 100644 --- a/session_manager.go +++ b/session_manager.go @@ -280,12 +280,14 @@ func (sm *SessionManager) AddSession(session *Session, clientSettings *SessionSe } isBlacklisted := sm.isSessionBlacklisted(session.ID) + isOnlySession := len(sm.sessions) == 0 // Determine if this session should become primary - // If there's no primary AND this is the ONLY session, ALWAYS promote regardless of blacklist - isOnlySession := len(sm.sessions) == 0 - shouldBecomePrimary := (wasWithinGracePeriod && wasPreviouslyPrimary && !primaryExists && !hasActivePrimaryGracePeriod) || - (!wasWithinGracePeriod && !hasActivePrimaryGracePeriod && !primaryExists && (!isBlacklisted || isOnlySession)) + canBecomePrimary := !primaryExists && !hasActivePrimaryGracePeriod + isReconnectingPrimary := wasWithinGracePeriod && wasPreviouslyPrimary + isNewEligibleSession := !wasWithinGracePeriod && (!isBlacklisted || isOnlySession) + + shouldBecomePrimary := canBecomePrimary && (isReconnectingPrimary || isNewEligibleSession) if shouldBecomePrimary { if sm.primarySessionID == "" || sm.sessions[sm.primarySessionID] == nil { @@ -1565,10 +1567,10 @@ func (sm *SessionManager) Shutdown() { } func (sm *SessionManager) cleanupInactiveSessions(ctx context.Context) { - ticker := time.NewTicker(1 * time.Second) // Check every second for grace periods + ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() - validationCounter := 0 // Counter for periodic validateSinglePrimary calls + validationCounter := 0 for { select { @@ -1579,313 +1581,33 @@ func (sm *SessionManager) cleanupInactiveSessions(ctx context.Context) { now := time.Now() needsBroadcast := false - // Check for expired grace periods and promote if needed - gracePeriodExpired := false - for sessionID, graceTime := range sm.reconnectGrace { - if now.After(graceTime) { - delete(sm.reconnectGrace, sessionID) - gracePeriodExpired = true - - wasHoldingPrimarySlot := (sm.lastPrimaryID == sessionID) - - // Check if this expired session was the primary holding the slot - if wasHoldingPrimarySlot { - // The primary didn't reconnect in time, now we can clear the slot and promote - sm.primarySessionID = "" - sm.lastPrimaryID = "" - needsBroadcast = true - - sm.logger.Info(). - Str("expiredSessionID", sessionID). - Msg("Primary session grace period expired - slot now available") - - // Always try to promote when possible - approval is only for new pending sessions - // Use enhanced emergency promotion system for better security - isEmergencyPromotion := false - var promotedSessionID string - - // === EMERGENCY PROMOTION ALGORITHM === - // - // When RequireApproval is enabled, we face a potential deadlock scenario: - // - Primary session disconnects (grace period expires) - // - All other sessions are pending (waiting for approval from primary) - // - No primary exists to approve pending sessions - // - Result: System is stuck with no primary and no way to get one - // - // Solution: Emergency promotion bypasses approval requirement to select - // the most trustworthy pending/observer session as primary. This ensures - // the system ALWAYS has a primary session for KVM functionality. - // - // Security measures to prevent abuse: - // 1. Rate limiting: Max 1 emergency promotion per 30 seconds - // 2. Consecutive limit: Max 3 consecutive emergency promotions - // 3. Trust-based selection: Sessions scored on age, history, nickname - // 4. Audit logging: All emergency promotions logged at WARN level - // - // Trust scoring criteria (see getSessionTrustScore): - // - Session age: +1 point per minute (capped at 100) - // - Was previous primary: +50 points - // - Observer mode: +20 points (more trustworthy than queued/pending) - // - Queued mode: +10 points - // - Has required nickname: +15 points / missing: -30 points - // - // This algorithm prioritizes long-lived, previously-primary sessions - // with proper nicknames over newly-connected anonymous sessions. - // - // Check if this is an emergency scenario (RequireApproval enabled) - if currentSessionSettings != nil && currentSessionSettings.RequireApproval { - isEmergencyPromotion = true - - // CRITICAL: Ensure we ALWAYS have a primary session - // If there's NO primary, bypass rate limits entirely - hasPrimary := sm.primarySessionID != "" - if !hasPrimary { - sm.logger.Error(). - Str("expiredSessionID", sessionID). - Msg("CRITICAL: No primary session exists - bypassing all rate limits") - } else { - // Rate limiting for emergency promotions (only when we have a primary) - if now.Sub(sm.lastEmergencyPromotion) < 30*time.Second { - sm.logger.Warn(). - Str("expiredSessionID", sessionID). - Dur("timeSinceLastEmergency", now.Sub(sm.lastEmergencyPromotion)). - Msg("Emergency promotion rate limit exceeded - potential attack") - continue // Skip this grace period expiration - } - - // Limit consecutive emergency promotions - if sm.consecutiveEmergencyPromotions >= 3 { - sm.logger.Error(). - Str("expiredSessionID", sessionID). - Int("consecutiveCount", sm.consecutiveEmergencyPromotions). - Msg("Too many consecutive emergency promotions - blocking for security") - continue // Skip this grace period expiration - } - } - - promotedSessionID = sm.findMostTrustedSessionForEmergency() - } else { - // Normal promotion - reset consecutive counter - sm.consecutiveEmergencyPromotions = 0 - promotedSessionID = sm.findNextSessionToPromote() - } - - if promotedSessionID != "" { - // Determine reason and log appropriately - reason := "grace_expiration_promotion" - if isEmergencyPromotion { - reason = "emergency_promotion_deadlock_prevention" - sm.lastEmergencyPromotion = now - sm.consecutiveEmergencyPromotions++ - - // Enhanced logging for emergency promotions - sm.logger.Warn(). - Str("expiredSessionID", sessionID). - Str("promotedSessionID", promotedSessionID). - Bool("requireApproval", true). - Int("consecutiveEmergencyPromotions", sm.consecutiveEmergencyPromotions). - Int("trustScore", sm.getSessionTrustScore(promotedSessionID)). - Msg("EMERGENCY: Bypassing approval requirement to prevent deadlock") - } - - err := sm.transferPrimaryRole("", promotedSessionID, reason, "primary grace period expired") - if err == nil { - logEvent := sm.logger.Info() - if isEmergencyPromotion { - logEvent = sm.logger.Warn() - } - logEvent. - Str("expiredSessionID", sessionID). - Str("promotedSessionID", promotedSessionID). - Str("reason", reason). - Bool("isEmergencyPromotion", isEmergencyPromotion). - Msg("Auto-promoted session after primary grace period expiration") - } else { - sm.logger.Error(). - Err(err). - Str("expiredSessionID", sessionID). - Str("promotedSessionID", promotedSessionID). - Str("reason", reason). - Bool("isEmergencyPromotion", isEmergencyPromotion). - Msg("Failed to promote session after grace period expiration") - } - } else { - logLevel := sm.logger.Info() - if isEmergencyPromotion { - logLevel = sm.logger.Error() // Emergency with no eligible sessions is critical - } - logLevel. - Str("expiredSessionID", sessionID). - Bool("isEmergencyPromotion", isEmergencyPromotion). - Msg("Primary grace period expired but no eligible sessions to promote") - } - } else { - // Non-primary session grace period expired - just cleanup - sm.logger.Debug(). - Str("expiredSessionID", sessionID). - Msg("Non-primary session grace period expired") - } - - // Also clean up reconnect info for expired sessions - delete(sm.reconnectInfo, sessionID) - } + // Handle expired grace periods + gracePeriodExpired := sm.handleGracePeriodExpiration(now) + if gracePeriodExpired { + needsBroadcast = true } - // Clean up pending sessions that have timed out (DoS protection) - for id, session := range sm.sessions { - if session.Mode == SessionModePending && - now.Sub(session.CreatedAt) > defaultPendingSessionTimeout { - websocketLogger.Info(). - Str("sessionId", id). - Dur("age", now.Sub(session.CreatedAt)). - Msg("Removing timed-out pending session") - delete(sm.sessions, id) - needsBroadcast = true - } + // Clean up timed-out pending sessions (DoS protection) + if sm.handlePendingSessionTimeout(now) { + needsBroadcast = true } - // Clean up observer sessions with closed RPC channels (stale connections) - // This prevents accumulation of zombie observer sessions that are no longer connected - observerTimeout := defaultObserverSessionTimeout - if currentSessionSettings != nil && currentSessionSettings.ObserverTimeout > 0 { - observerTimeout = time.Duration(currentSessionSettings.ObserverTimeout) * time.Second - } - for id, session := range sm.sessions { - if session.Mode == SessionModeObserver { - // Check if RPC channel is nil/closed AND session has been inactive - if session.RPCChannel == nil && now.Sub(session.LastActive) > observerTimeout { - sm.logger.Info(). - Str("sessionId", id). - Dur("inactiveFor", now.Sub(session.LastActive)). - Dur("observerTimeout", observerTimeout). - Msg("Removing inactive observer session with closed RPC channel") - delete(sm.sessions, id) - needsBroadcast = true - } - } + // Clean up inactive observer sessions + if sm.handleObserverSessionCleanup(now) { + needsBroadcast = true } - // Check primary session timeout (every 30 iterations = 30 seconds) - if sm.primarySessionID != "" { - if primary, exists := sm.sessions[sm.primarySessionID]; exists { - currentTimeout := sm.getCurrentPrimaryTimeout() - if now.Sub(primary.LastActive) > currentTimeout { - timedOutSessionID := primary.ID - primary.Mode = SessionModeObserver - sm.primarySessionID = "" - - // === TIMEOUT-BASED EMERGENCY PROMOTION === - // - // Similar to grace period expiration, primary session timeout can create - // a deadlock when RequireApproval is enabled. The timeout detection happens - // every 30 seconds (based on ticker iterations) and demotes inactive primaries. - // - // Without emergency promotion: - // - Primary becomes inactive and times out - // - Primary is demoted to observer - // - All other sessions are pending (awaiting approval) - // - No primary exists to approve them - // - System deadlocked with no KVM control - // - // This uses the same trust-based selection and security measures as - // grace period emergency promotion to ensure system availability. - isEmergencyPromotion := false - var promotedSessionID string - - // Check if this requires emergency promotion due to approval requirements - if currentSessionSettings != nil && currentSessionSettings.RequireApproval { - isEmergencyPromotion = true - - // CRITICAL: Ensure we ALWAYS have a primary session - // primarySessionID was just cleared above, so this will always be empty - // But check anyway for completeness - hasPrimary := sm.primarySessionID != "" - if !hasPrimary { - sm.logger.Error(). - Str("timedOutSessionID", timedOutSessionID). - Msg("CRITICAL: No primary session after timeout - bypassing all rate limits") - } else { - // Rate limiting for emergency promotions (only when we have a primary) - if now.Sub(sm.lastEmergencyPromotion) < 30*time.Second { - sm.logger.Warn(). - Str("timedOutSessionID", timedOutSessionID). - Dur("timeSinceLastEmergency", now.Sub(sm.lastEmergencyPromotion)). - Msg("Emergency promotion rate limit exceeded during timeout - potential attack") - continue // Skip this timeout - } - } - - // Use trust-based selection but exclude the timed-out session - bestSessionID := "" - bestScore := -1 - for id, session := range sm.sessions { - if id != timedOutSessionID && - !sm.isSessionBlacklisted(id) && - (session.Mode == SessionModeObserver || session.Mode == SessionModeQueued) { - score := sm.getSessionTrustScore(id) - if score > bestScore { - bestScore = score - bestSessionID = id - } - } - } - promotedSessionID = bestSessionID - } else { - // Normal timeout promotion - find any observer except the timed-out one - for id, session := range sm.sessions { - if id != timedOutSessionID && session.Mode == SessionModeObserver && !sm.isSessionBlacklisted(id) { - promotedSessionID = id - break - } - } - } - - // If found a session to promote - if promotedSessionID != "" { - reason := "timeout_promotion" - if isEmergencyPromotion { - reason = "emergency_timeout_promotion" - sm.lastEmergencyPromotion = now - sm.consecutiveEmergencyPromotions++ - - // Enhanced logging for emergency timeout promotions - sm.logger.Warn(). - Str("timedOutSessionID", timedOutSessionID). - Str("promotedSessionID", promotedSessionID). - Bool("requireApproval", true). - Int("trustScore", sm.getSessionTrustScore(promotedSessionID)). - Msg("EMERGENCY: Timeout promotion bypassing approval requirement") - } - - err := sm.transferPrimaryRole(timedOutSessionID, promotedSessionID, reason, "primary session timeout") - if err == nil { - needsBroadcast = true - logEvent := sm.logger.Info() - if isEmergencyPromotion { - logEvent = sm.logger.Warn() - } - logEvent. - Str("timedOutSessionID", timedOutSessionID). - Str("promotedSessionID", promotedSessionID). - Bool("isEmergencyPromotion", isEmergencyPromotion). - Msg("Auto-promoted session after primary timeout") - } - } - } - } else { - // Primary session no longer exists, clear it - sm.primarySessionID = "" - needsBroadcast = true - } + // Handle primary session timeout + if sm.handlePrimarySessionTimeout(now) { + needsBroadcast = true } - // Run validation immediately if a grace period expired, otherwise run periodically + // Run validation immediately if grace period expired, otherwise periodically if gracePeriodExpired { sm.validateSinglePrimary() } else { - // Periodic validateSinglePrimary to catch deadlock states validationCounter++ - if validationCounter >= 10 { // Every 10 seconds + if validationCounter >= 10 { validationCounter = 0 sm.validateSinglePrimary() } @@ -1893,7 +1615,6 @@ func (sm *SessionManager) cleanupInactiveSessions(ctx context.Context) { sm.mu.Unlock() - // Broadcast outside of lock if needed if needsBroadcast { go sm.broadcastSessionListUpdate() }