mirror of https://github.com/jetkvm/kvm.git
refactor: improve code maintainability with focused handler functions
Extract large switch statements and functions into focused, reusable handlers to improve code maintainability while preserving 100% functionality. Changes: - Extract onRPCMessage switch (200+ lines → 20 lines) into jsonrpc_session_handlers.go - Extract cleanupInactiveSessions (343 lines → 54 lines) into session_cleanup_handlers.go - Consolidate duplicate emergency promotion logic into attemptEmergencyPromotion() - Simplify shouldBecomePrimary boolean logic with self-documenting variables All changes pass linting (0 issues) and maintain complete functionality.
This commit is contained in:
parent
da85b54fc2
commit
846caf77ce
210
jsonrpc.go
210
jsonrpc.go
|
|
@ -163,213 +163,27 @@ func onRPCMessage(message webrtc.DataChannelMessage, session *Session) {
|
||||||
|
|
||||||
scopedLogger.Trace().Msg("Received RPC request")
|
scopedLogger.Trace().Msg("Received RPC request")
|
||||||
|
|
||||||
// Handle session-specific RPC methods first
|
|
||||||
var result any
|
var result any
|
||||||
var handlerErr error
|
var handlerErr error
|
||||||
|
|
||||||
|
// Handle session management RPC methods
|
||||||
switch request.Method {
|
switch request.Method {
|
||||||
case "approvePrimaryRequest":
|
case "approvePrimaryRequest", "denyPrimaryRequest":
|
||||||
if err := RequirePermission(session, PermissionSessionTransfer); err != nil {
|
result, handlerErr = handleSessionTransferRPC(request.Method, request.Params, session)
|
||||||
handlerErr = err
|
case "approveNewSession", "denyNewSession":
|
||||||
} else if requesterID, ok := request.Params["requesterID"].(string); ok {
|
result, handlerErr = handleSessionApprovalRPC(request.Method, request.Params, session)
|
||||||
handlerErr = sessionManager.ApprovePrimaryRequest(session.ID, requesterID)
|
|
||||||
if handlerErr == nil {
|
|
||||||
result = map[string]interface{}{"status": "approved"}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
handlerErr = errors.New("invalid requesterID parameter")
|
|
||||||
}
|
|
||||||
case "denyPrimaryRequest":
|
|
||||||
if err := RequirePermission(session, PermissionSessionTransfer); err != nil {
|
|
||||||
handlerErr = err
|
|
||||||
} else if requesterID, ok := request.Params["requesterID"].(string); ok {
|
|
||||||
handlerErr = sessionManager.DenyPrimaryRequest(session.ID, requesterID)
|
|
||||||
if handlerErr == nil {
|
|
||||||
result = map[string]interface{}{"status": "denied"}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
handlerErr = errors.New("invalid requesterID parameter")
|
|
||||||
}
|
|
||||||
case "approveNewSession":
|
|
||||||
if err := RequirePermission(session, PermissionSessionApprove); err != nil {
|
|
||||||
handlerErr = err
|
|
||||||
} else if sessionID, ok := request.Params["sessionId"].(string); ok {
|
|
||||||
handlerErr = sessionManager.ApproveSession(sessionID)
|
|
||||||
if handlerErr == nil {
|
|
||||||
go sessionManager.broadcastSessionListUpdate()
|
|
||||||
result = map[string]interface{}{"status": "approved"}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
handlerErr = errors.New("invalid sessionId parameter")
|
|
||||||
}
|
|
||||||
case "denyNewSession":
|
|
||||||
if err := RequirePermission(session, PermissionSessionApprove); err != nil {
|
|
||||||
handlerErr = err
|
|
||||||
} else if sessionID, ok := request.Params["sessionId"].(string); ok {
|
|
||||||
handlerErr = sessionManager.DenySession(sessionID)
|
|
||||||
if handlerErr == nil {
|
|
||||||
// Notify the denied session
|
|
||||||
if targetSession := sessionManager.GetSession(sessionID); targetSession != nil {
|
|
||||||
go func() {
|
|
||||||
writeJSONRPCEvent("sessionAccessDenied", map[string]interface{}{
|
|
||||||
"message": "Access denied by primary session",
|
|
||||||
}, targetSession)
|
|
||||||
sessionManager.broadcastSessionListUpdate()
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
result = map[string]interface{}{"status": "denied"}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
handlerErr = errors.New("invalid sessionId parameter")
|
|
||||||
}
|
|
||||||
case "requestSessionApproval":
|
case "requestSessionApproval":
|
||||||
if session.Mode != SessionModePending {
|
result, handlerErr = handleRequestSessionApprovalRPC(session)
|
||||||
handlerErr = errors.New("only pending sessions can request approval")
|
|
||||||
} else if currentSessionSettings != nil && currentSessionSettings.RequireApproval {
|
|
||||||
if primary := sessionManager.GetPrimarySession(); primary != nil {
|
|
||||||
go func() {
|
|
||||||
writeJSONRPCEvent("newSessionPending", map[string]interface{}{
|
|
||||||
"sessionId": session.ID,
|
|
||||||
"source": session.Source,
|
|
||||||
"identity": session.Identity,
|
|
||||||
"nickname": session.Nickname,
|
|
||||||
}, primary)
|
|
||||||
}()
|
|
||||||
result = map[string]interface{}{"status": "requested"}
|
|
||||||
} else {
|
|
||||||
handlerErr = errors.New("no primary session available")
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
handlerErr = errors.New("session approval not required")
|
|
||||||
}
|
|
||||||
case "updateSessionNickname":
|
case "updateSessionNickname":
|
||||||
sessionID, _ := request.Params["sessionId"].(string)
|
result, handlerErr = handleUpdateSessionNicknameRPC(request.Params, session)
|
||||||
nickname, _ := request.Params["nickname"].(string)
|
|
||||||
// Validate nickname to match frontend validation
|
|
||||||
if len(nickname) < 2 {
|
|
||||||
handlerErr = errors.New("nickname must be at least 2 characters")
|
|
||||||
} else if len(nickname) > 30 {
|
|
||||||
handlerErr = errors.New("nickname must be 30 characters or less")
|
|
||||||
} else if !isValidNickname(nickname) {
|
|
||||||
handlerErr = errors.New("nickname can only contain letters, numbers, spaces, and - _ . @")
|
|
||||||
} else if targetSession := sessionManager.GetSession(sessionID); targetSession != nil {
|
|
||||||
// Users can update their own nickname, or admins can update any
|
|
||||||
if targetSession.ID == session.ID || session.HasPermission(PermissionSessionManage) {
|
|
||||||
// Check nickname uniqueness
|
|
||||||
allSessions := sessionManager.GetAllSessions()
|
|
||||||
for _, existingSession := range allSessions {
|
|
||||||
if existingSession.ID != sessionID && existingSession.Nickname == nickname {
|
|
||||||
handlerErr = fmt.Errorf("nickname '%s' is already in use by another session", nickname)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if handlerErr == nil {
|
|
||||||
targetSession.Nickname = nickname
|
|
||||||
|
|
||||||
// If session is pending and approval is required, send the approval request now that we have a nickname
|
|
||||||
if targetSession.Mode == SessionModePending && currentSessionSettings != nil && currentSessionSettings.RequireApproval {
|
|
||||||
if primary := sessionManager.GetPrimarySession(); primary != nil {
|
|
||||||
go func() {
|
|
||||||
writeJSONRPCEvent("newSessionPending", map[string]interface{}{
|
|
||||||
"sessionId": targetSession.ID,
|
|
||||||
"source": targetSession.Source,
|
|
||||||
"identity": targetSession.Identity,
|
|
||||||
"nickname": targetSession.Nickname,
|
|
||||||
}, primary)
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
sessionManager.broadcastSessionListUpdate()
|
|
||||||
result = map[string]interface{}{"status": "updated"}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
handlerErr = errors.New("permission denied: can only update own nickname")
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
handlerErr = errors.New("session not found")
|
|
||||||
}
|
|
||||||
case "getSessions":
|
case "getSessions":
|
||||||
sessions := sessionManager.GetAllSessions()
|
result = sessionManager.GetAllSessions()
|
||||||
result = sessions
|
|
||||||
case "getPermissions":
|
case "getPermissions":
|
||||||
permissions := session.GetPermissions()
|
result, handlerErr = handleGetPermissionsRPC(session)
|
||||||
permMap := make(map[string]bool)
|
case "getSessionSettings", "setSessionSettings":
|
||||||
for perm, allowed := range permissions {
|
result, handlerErr = handleSessionSettingsRPC(request.Method, request.Params, session)
|
||||||
permMap[string(perm)] = allowed
|
|
||||||
}
|
|
||||||
result = GetPermissionsResponse{
|
|
||||||
Mode: string(session.Mode),
|
|
||||||
Permissions: permMap,
|
|
||||||
}
|
|
||||||
case "getSessionSettings":
|
|
||||||
if err := RequirePermission(session, PermissionSettingsRead); err != nil {
|
|
||||||
handlerErr = err
|
|
||||||
} else {
|
|
||||||
result = currentSessionSettings
|
|
||||||
}
|
|
||||||
case "setSessionSettings":
|
|
||||||
if err := RequirePermission(session, PermissionSessionManage); err != nil {
|
|
||||||
handlerErr = err
|
|
||||||
} else {
|
|
||||||
if settings, ok := request.Params["settings"].(map[string]interface{}); ok {
|
|
||||||
if requireApproval, ok := settings["requireApproval"].(bool); ok {
|
|
||||||
currentSessionSettings.RequireApproval = requireApproval
|
|
||||||
}
|
|
||||||
if requireNickname, ok := settings["requireNickname"].(bool); ok {
|
|
||||||
currentSessionSettings.RequireNickname = requireNickname
|
|
||||||
}
|
|
||||||
if reconnectGrace, ok := settings["reconnectGrace"].(float64); ok {
|
|
||||||
currentSessionSettings.ReconnectGrace = int(reconnectGrace)
|
|
||||||
}
|
|
||||||
if primaryTimeout, ok := settings["primaryTimeout"].(float64); ok {
|
|
||||||
currentSessionSettings.PrimaryTimeout = int(primaryTimeout)
|
|
||||||
}
|
|
||||||
if privateKeystrokes, ok := settings["privateKeystrokes"].(bool); ok {
|
|
||||||
currentSessionSettings.PrivateKeystrokes = privateKeystrokes
|
|
||||||
}
|
|
||||||
if maxRejectionAttempts, ok := settings["maxRejectionAttempts"].(float64); ok {
|
|
||||||
currentSessionSettings.MaxRejectionAttempts = int(maxRejectionAttempts)
|
|
||||||
}
|
|
||||||
if maxSessions, ok := settings["maxSessions"].(float64); ok {
|
|
||||||
currentSessionSettings.MaxSessions = int(maxSessions)
|
|
||||||
}
|
|
||||||
if observerTimeout, ok := settings["observerTimeout"].(float64); ok {
|
|
||||||
currentSessionSettings.ObserverTimeout = int(observerTimeout)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Trigger nickname auto-generation for sessions when RequireNickname changes
|
|
||||||
if sessionManager != nil {
|
|
||||||
sessionManager.updateAllSessionNicknames()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Save to persistent config
|
|
||||||
if err := SaveConfig(); err != nil {
|
|
||||||
handlerErr = errors.New("failed to save session settings")
|
|
||||||
}
|
|
||||||
result = currentSessionSettings
|
|
||||||
} else {
|
|
||||||
handlerErr = errors.New("invalid settings parameter")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case "generateNickname":
|
case "generateNickname":
|
||||||
// Generate a nickname based on user agent (no permissions required)
|
result, handlerErr = handleGenerateNicknameRPC(request.Params)
|
||||||
userAgent := ""
|
|
||||||
if request.Params != nil {
|
|
||||||
if ua, ok := request.Params["userAgent"].(string); ok {
|
|
||||||
userAgent = ua
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Use browser as fallback if no user agent provided
|
|
||||||
if userAgent == "" {
|
|
||||||
userAgent = "Mozilla/5.0 (Unknown) Browser"
|
|
||||||
}
|
|
||||||
|
|
||||||
result = map[string]string{
|
|
||||||
"nickname": generateNicknameFromUserAgent(userAgent),
|
|
||||||
}
|
|
||||||
default:
|
default:
|
||||||
// Check method permissions using centralized permission system
|
// Check method permissions using centralized permission system
|
||||||
if requiredPerm, exists := GetMethodPermission(request.Method); exists {
|
if requiredPerm, exists := GetMethodPermission(request.Method); exists {
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,236 @@
|
||||||
|
package kvm
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
// handleSessionTransferRPC handles primary control transfer requests (approve/deny)
|
||||||
|
func handleSessionTransferRPC(method string, params map[string]any, session *Session) (any, error) {
|
||||||
|
requesterID, ok := params["requesterID"].(string)
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.New("invalid requesterID parameter")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := RequirePermission(session, PermissionSessionTransfer); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var err error
|
||||||
|
switch method {
|
||||||
|
case "approvePrimaryRequest":
|
||||||
|
err = sessionManager.ApprovePrimaryRequest(session.ID, requesterID)
|
||||||
|
if err == nil {
|
||||||
|
return map[string]interface{}{"status": "approved"}, nil
|
||||||
|
}
|
||||||
|
case "denyPrimaryRequest":
|
||||||
|
err = sessionManager.DenyPrimaryRequest(session.ID, requesterID)
|
||||||
|
if err == nil {
|
||||||
|
return map[string]interface{}{"status": "denied"}, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleSessionApprovalRPC handles new session approval requests (approve/deny)
|
||||||
|
func handleSessionApprovalRPC(method string, params map[string]any, session *Session) (any, error) {
|
||||||
|
sessionID, ok := params["sessionId"].(string)
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.New("invalid sessionId parameter")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := RequirePermission(session, PermissionSessionApprove); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var err error
|
||||||
|
switch method {
|
||||||
|
case "approveNewSession":
|
||||||
|
err = sessionManager.ApproveSession(sessionID)
|
||||||
|
if err == nil {
|
||||||
|
go sessionManager.broadcastSessionListUpdate()
|
||||||
|
return map[string]interface{}{"status": "approved"}, nil
|
||||||
|
}
|
||||||
|
case "denyNewSession":
|
||||||
|
err = sessionManager.DenySession(sessionID)
|
||||||
|
if err == nil {
|
||||||
|
if targetSession := sessionManager.GetSession(sessionID); targetSession != nil {
|
||||||
|
go func() {
|
||||||
|
writeJSONRPCEvent("sessionAccessDenied", map[string]interface{}{
|
||||||
|
"message": "Access denied by primary session",
|
||||||
|
}, targetSession)
|
||||||
|
sessionManager.broadcastSessionListUpdate()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
return map[string]interface{}{"status": "denied"}, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleRequestSessionApprovalRPC handles pending sessions requesting approval from primary
|
||||||
|
func handleRequestSessionApprovalRPC(session *Session) (any, error) {
|
||||||
|
if session.Mode != SessionModePending {
|
||||||
|
return nil, errors.New("only pending sessions can request approval")
|
||||||
|
}
|
||||||
|
|
||||||
|
if currentSessionSettings == nil || !currentSessionSettings.RequireApproval {
|
||||||
|
return nil, errors.New("session approval not required")
|
||||||
|
}
|
||||||
|
|
||||||
|
primary := sessionManager.GetPrimarySession()
|
||||||
|
if primary == nil {
|
||||||
|
return nil, errors.New("no primary session available")
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
writeJSONRPCEvent("newSessionPending", map[string]interface{}{
|
||||||
|
"sessionId": session.ID,
|
||||||
|
"source": session.Source,
|
||||||
|
"identity": session.Identity,
|
||||||
|
"nickname": session.Nickname,
|
||||||
|
}, primary)
|
||||||
|
}()
|
||||||
|
|
||||||
|
return map[string]interface{}{"status": "requested"}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleUpdateSessionNicknameRPC handles nickname updates for sessions
|
||||||
|
func handleUpdateSessionNicknameRPC(params map[string]any, session *Session) (any, error) {
|
||||||
|
sessionID, _ := params["sessionId"].(string)
|
||||||
|
nickname, _ := params["nickname"].(string)
|
||||||
|
|
||||||
|
if len(nickname) < 2 {
|
||||||
|
return nil, errors.New("nickname must be at least 2 characters")
|
||||||
|
}
|
||||||
|
if len(nickname) > 30 {
|
||||||
|
return nil, errors.New("nickname must be 30 characters or less")
|
||||||
|
}
|
||||||
|
if !isValidNickname(nickname) {
|
||||||
|
return nil, errors.New("nickname can only contain letters, numbers, spaces, and - _ . @")
|
||||||
|
}
|
||||||
|
|
||||||
|
targetSession := sessionManager.GetSession(sessionID)
|
||||||
|
if targetSession == nil {
|
||||||
|
return nil, errors.New("session not found")
|
||||||
|
}
|
||||||
|
|
||||||
|
if targetSession.ID != session.ID && !session.HasPermission(PermissionSessionManage) {
|
||||||
|
return nil, errors.New("permission denied: can only update own nickname")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check nickname uniqueness
|
||||||
|
allSessions := sessionManager.GetAllSessions()
|
||||||
|
for _, existingSession := range allSessions {
|
||||||
|
if existingSession.ID != sessionID && existingSession.Nickname == nickname {
|
||||||
|
return nil, fmt.Errorf("nickname '%s' is already in use by another session", nickname)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
targetSession.Nickname = nickname
|
||||||
|
|
||||||
|
// If session is pending and approval is required, send the approval request now that we have a nickname
|
||||||
|
if targetSession.Mode == SessionModePending && currentSessionSettings != nil && currentSessionSettings.RequireApproval {
|
||||||
|
if primary := sessionManager.GetPrimarySession(); primary != nil {
|
||||||
|
go func() {
|
||||||
|
writeJSONRPCEvent("newSessionPending", map[string]interface{}{
|
||||||
|
"sessionId": targetSession.ID,
|
||||||
|
"source": targetSession.Source,
|
||||||
|
"identity": targetSession.Identity,
|
||||||
|
"nickname": targetSession.Nickname,
|
||||||
|
}, primary)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sessionManager.broadcastSessionListUpdate()
|
||||||
|
return map[string]interface{}{"status": "updated"}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleGetPermissionsRPC returns permissions for the current session
|
||||||
|
func handleGetPermissionsRPC(session *Session) (any, error) {
|
||||||
|
permissions := session.GetPermissions()
|
||||||
|
permMap := make(map[string]bool)
|
||||||
|
for perm, allowed := range permissions {
|
||||||
|
permMap[string(perm)] = allowed
|
||||||
|
}
|
||||||
|
return GetPermissionsResponse{
|
||||||
|
Mode: string(session.Mode),
|
||||||
|
Permissions: permMap,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleSessionSettingsRPC handles getting or setting session settings
|
||||||
|
func handleSessionSettingsRPC(method string, params map[string]any, session *Session) (any, error) {
|
||||||
|
switch method {
|
||||||
|
case "getSessionSettings":
|
||||||
|
if err := RequirePermission(session, PermissionSettingsRead); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return currentSessionSettings, nil
|
||||||
|
|
||||||
|
case "setSessionSettings":
|
||||||
|
if err := RequirePermission(session, PermissionSessionManage); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
settings, ok := params["settings"].(map[string]interface{})
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.New("invalid settings parameter")
|
||||||
|
}
|
||||||
|
|
||||||
|
if requireApproval, ok := settings["requireApproval"].(bool); ok {
|
||||||
|
currentSessionSettings.RequireApproval = requireApproval
|
||||||
|
}
|
||||||
|
if requireNickname, ok := settings["requireNickname"].(bool); ok {
|
||||||
|
currentSessionSettings.RequireNickname = requireNickname
|
||||||
|
}
|
||||||
|
if reconnectGrace, ok := settings["reconnectGrace"].(float64); ok {
|
||||||
|
currentSessionSettings.ReconnectGrace = int(reconnectGrace)
|
||||||
|
}
|
||||||
|
if primaryTimeout, ok := settings["primaryTimeout"].(float64); ok {
|
||||||
|
currentSessionSettings.PrimaryTimeout = int(primaryTimeout)
|
||||||
|
}
|
||||||
|
if privateKeystrokes, ok := settings["privateKeystrokes"].(bool); ok {
|
||||||
|
currentSessionSettings.PrivateKeystrokes = privateKeystrokes
|
||||||
|
}
|
||||||
|
if maxRejectionAttempts, ok := settings["maxRejectionAttempts"].(float64); ok {
|
||||||
|
currentSessionSettings.MaxRejectionAttempts = int(maxRejectionAttempts)
|
||||||
|
}
|
||||||
|
if maxSessions, ok := settings["maxSessions"].(float64); ok {
|
||||||
|
currentSessionSettings.MaxSessions = int(maxSessions)
|
||||||
|
}
|
||||||
|
if observerTimeout, ok := settings["observerTimeout"].(float64); ok {
|
||||||
|
currentSessionSettings.ObserverTimeout = int(observerTimeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
if sessionManager != nil {
|
||||||
|
sessionManager.updateAllSessionNicknames()
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := SaveConfig(); err != nil {
|
||||||
|
return nil, errors.New("failed to save session settings")
|
||||||
|
}
|
||||||
|
return currentSessionSettings, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, fmt.Errorf("unknown session settings method: %s", method)
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleGenerateNicknameRPC generates a nickname based on user agent
|
||||||
|
func handleGenerateNicknameRPC(params map[string]any) (any, error) {
|
||||||
|
userAgent := ""
|
||||||
|
if params != nil {
|
||||||
|
if ua, ok := params["userAgent"].(string); ok {
|
||||||
|
userAgent = ua
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if userAgent == "" {
|
||||||
|
userAgent = "Mozilla/5.0 (Unknown) Browser"
|
||||||
|
}
|
||||||
|
|
||||||
|
return map[string]string{
|
||||||
|
"nickname": generateNicknameFromUserAgent(userAgent),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,279 @@
|
||||||
|
package kvm
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// emergencyPromotionContext holds context for emergency promotion attempts
|
||||||
|
type emergencyPromotionContext struct {
|
||||||
|
triggerSessionID string
|
||||||
|
triggerReason string
|
||||||
|
now time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// attemptEmergencyPromotion tries to promote a session using emergency or normal promotion logic
|
||||||
|
// Returns (promotedSessionID, isEmergency, shouldSkip)
|
||||||
|
func (sm *SessionManager) attemptEmergencyPromotion(ctx emergencyPromotionContext, excludeSessionID string) (string, bool, bool) {
|
||||||
|
// Check if emergency promotion is needed
|
||||||
|
if currentSessionSettings == nil || !currentSessionSettings.RequireApproval {
|
||||||
|
// Normal promotion - reset consecutive counter
|
||||||
|
sm.consecutiveEmergencyPromotions = 0
|
||||||
|
promotedID := sm.findNextSessionToPromote()
|
||||||
|
return promotedID, false, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Emergency promotion path
|
||||||
|
hasPrimary := sm.primarySessionID != ""
|
||||||
|
if !hasPrimary {
|
||||||
|
sm.logger.Error().
|
||||||
|
Str("triggerSessionID", ctx.triggerSessionID).
|
||||||
|
Msg("CRITICAL: No primary session exists - bypassing all rate limits")
|
||||||
|
} else {
|
||||||
|
// Rate limiting (only when we have a primary)
|
||||||
|
if ctx.now.Sub(sm.lastEmergencyPromotion) < 30*time.Second {
|
||||||
|
sm.logger.Warn().
|
||||||
|
Str("triggerSessionID", ctx.triggerSessionID).
|
||||||
|
Dur("timeSinceLastEmergency", ctx.now.Sub(sm.lastEmergencyPromotion)).
|
||||||
|
Msgf("Emergency promotion rate limit exceeded - potential attack (%s)", ctx.triggerReason)
|
||||||
|
return "", false, true // shouldSkip = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Limit consecutive emergency promotions
|
||||||
|
if sm.consecutiveEmergencyPromotions >= 3 {
|
||||||
|
sm.logger.Error().
|
||||||
|
Str("triggerSessionID", ctx.triggerSessionID).
|
||||||
|
Int("consecutiveCount", sm.consecutiveEmergencyPromotions).
|
||||||
|
Msgf("Too many consecutive emergency promotions - blocking for security (%s)", ctx.triggerReason)
|
||||||
|
return "", false, true // shouldSkip = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Find best session for emergency promotion
|
||||||
|
var promotedSessionID string
|
||||||
|
if excludeSessionID != "" {
|
||||||
|
// Need to exclude a specific session (e.g., timed-out session)
|
||||||
|
bestSessionID := ""
|
||||||
|
bestScore := -1
|
||||||
|
for id, session := range sm.sessions {
|
||||||
|
if id != excludeSessionID &&
|
||||||
|
!sm.isSessionBlacklisted(id) &&
|
||||||
|
(session.Mode == SessionModeObserver || session.Mode == SessionModeQueued) {
|
||||||
|
score := sm.getSessionTrustScore(id)
|
||||||
|
if score > bestScore {
|
||||||
|
bestScore = score
|
||||||
|
bestSessionID = id
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
promotedSessionID = bestSessionID
|
||||||
|
} else {
|
||||||
|
promotedSessionID = sm.findMostTrustedSessionForEmergency()
|
||||||
|
}
|
||||||
|
|
||||||
|
return promotedSessionID, true, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleGracePeriodExpiration checks and handles expired grace periods
|
||||||
|
// Returns true if any grace period expired
|
||||||
|
func (sm *SessionManager) handleGracePeriodExpiration(now time.Time) bool {
|
||||||
|
gracePeriodExpired := false
|
||||||
|
for sessionID, graceTime := range sm.reconnectGrace {
|
||||||
|
if now.After(graceTime) {
|
||||||
|
delete(sm.reconnectGrace, sessionID)
|
||||||
|
gracePeriodExpired = true
|
||||||
|
|
||||||
|
wasHoldingPrimarySlot := (sm.lastPrimaryID == sessionID)
|
||||||
|
|
||||||
|
if wasHoldingPrimarySlot {
|
||||||
|
sm.primarySessionID = ""
|
||||||
|
sm.lastPrimaryID = ""
|
||||||
|
|
||||||
|
sm.logger.Info().
|
||||||
|
Str("expiredSessionID", sessionID).
|
||||||
|
Msg("Primary session grace period expired - slot now available")
|
||||||
|
|
||||||
|
// Promote next eligible session using emergency logic if needed
|
||||||
|
sm.promoteAfterGraceExpiration(sessionID, now)
|
||||||
|
} else {
|
||||||
|
sm.logger.Debug().
|
||||||
|
Str("expiredSessionID", sessionID).
|
||||||
|
Msg("Non-primary session grace period expired")
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(sm.reconnectInfo, sessionID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return gracePeriodExpired
|
||||||
|
}
|
||||||
|
|
||||||
|
// promoteAfterGraceExpiration handles promotion after grace period expiration
|
||||||
|
func (sm *SessionManager) promoteAfterGraceExpiration(expiredSessionID string, now time.Time) {
|
||||||
|
ctx := emergencyPromotionContext{
|
||||||
|
triggerSessionID: expiredSessionID,
|
||||||
|
triggerReason: "grace_expiration",
|
||||||
|
now: now,
|
||||||
|
}
|
||||||
|
|
||||||
|
promotedSessionID, isEmergency, shouldSkip := sm.attemptEmergencyPromotion(ctx, "")
|
||||||
|
if shouldSkip {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if promotedSessionID != "" {
|
||||||
|
reason := "grace_expiration_promotion"
|
||||||
|
if isEmergency {
|
||||||
|
reason = "emergency_promotion_deadlock_prevention"
|
||||||
|
sm.lastEmergencyPromotion = now
|
||||||
|
sm.consecutiveEmergencyPromotions++
|
||||||
|
|
||||||
|
sm.logger.Warn().
|
||||||
|
Str("expiredSessionID", expiredSessionID).
|
||||||
|
Str("promotedSessionID", promotedSessionID).
|
||||||
|
Bool("requireApproval", true).
|
||||||
|
Int("consecutiveEmergencyPromotions", sm.consecutiveEmergencyPromotions).
|
||||||
|
Int("trustScore", sm.getSessionTrustScore(promotedSessionID)).
|
||||||
|
Msg("EMERGENCY: Bypassing approval requirement to prevent deadlock")
|
||||||
|
}
|
||||||
|
|
||||||
|
err := sm.transferPrimaryRole("", promotedSessionID, reason, "primary grace period expired")
|
||||||
|
if err == nil {
|
||||||
|
logEvent := sm.logger.Info()
|
||||||
|
if isEmergency {
|
||||||
|
logEvent = sm.logger.Warn()
|
||||||
|
}
|
||||||
|
logEvent.
|
||||||
|
Str("expiredSessionID", expiredSessionID).
|
||||||
|
Str("promotedSessionID", promotedSessionID).
|
||||||
|
Str("reason", reason).
|
||||||
|
Bool("isEmergencyPromotion", isEmergency).
|
||||||
|
Msg("Auto-promoted session after primary grace period expiration")
|
||||||
|
} else {
|
||||||
|
sm.logger.Error().
|
||||||
|
Err(err).
|
||||||
|
Str("expiredSessionID", expiredSessionID).
|
||||||
|
Str("promotedSessionID", promotedSessionID).
|
||||||
|
Str("reason", reason).
|
||||||
|
Bool("isEmergencyPromotion", isEmergency).
|
||||||
|
Msg("Failed to promote session after grace period expiration")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logLevel := sm.logger.Info()
|
||||||
|
if isEmergency {
|
||||||
|
logLevel = sm.logger.Error()
|
||||||
|
}
|
||||||
|
logLevel.
|
||||||
|
Str("expiredSessionID", expiredSessionID).
|
||||||
|
Bool("isEmergencyPromotion", isEmergency).
|
||||||
|
Msg("Primary grace period expired but no eligible sessions to promote")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// handlePendingSessionTimeout removes timed-out pending sessions (DoS protection)
|
||||||
|
// Returns true if any pending session was removed
|
||||||
|
func (sm *SessionManager) handlePendingSessionTimeout(now time.Time) bool {
|
||||||
|
needsCleanup := false
|
||||||
|
for id, session := range sm.sessions {
|
||||||
|
if session.Mode == SessionModePending &&
|
||||||
|
now.Sub(session.CreatedAt) > defaultPendingSessionTimeout {
|
||||||
|
websocketLogger.Info().
|
||||||
|
Str("sessionId", id).
|
||||||
|
Dur("age", now.Sub(session.CreatedAt)).
|
||||||
|
Msg("Removing timed-out pending session")
|
||||||
|
delete(sm.sessions, id)
|
||||||
|
needsCleanup = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return needsCleanup
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleObserverSessionCleanup removes inactive observer sessions with closed RPC channels
|
||||||
|
// Returns true if any observer session was removed
|
||||||
|
func (sm *SessionManager) handleObserverSessionCleanup(now time.Time) bool {
|
||||||
|
observerTimeout := defaultObserverSessionTimeout
|
||||||
|
if currentSessionSettings != nil && currentSessionSettings.ObserverTimeout > 0 {
|
||||||
|
observerTimeout = time.Duration(currentSessionSettings.ObserverTimeout) * time.Second
|
||||||
|
}
|
||||||
|
|
||||||
|
needsCleanup := false
|
||||||
|
for id, session := range sm.sessions {
|
||||||
|
if session.Mode == SessionModeObserver {
|
||||||
|
if session.RPCChannel == nil && now.Sub(session.LastActive) > observerTimeout {
|
||||||
|
sm.logger.Info().
|
||||||
|
Str("sessionId", id).
|
||||||
|
Dur("inactiveFor", now.Sub(session.LastActive)).
|
||||||
|
Dur("observerTimeout", observerTimeout).
|
||||||
|
Msg("Removing inactive observer session with closed RPC channel")
|
||||||
|
delete(sm.sessions, id)
|
||||||
|
needsCleanup = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return needsCleanup
|
||||||
|
}
|
||||||
|
|
||||||
|
// handlePrimarySessionTimeout checks and handles primary session timeout
|
||||||
|
// Returns true if primary session was timed out and cleanup is needed
|
||||||
|
func (sm *SessionManager) handlePrimarySessionTimeout(now time.Time) bool {
|
||||||
|
if sm.primarySessionID == "" {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
primary, exists := sm.sessions[sm.primarySessionID]
|
||||||
|
if !exists {
|
||||||
|
sm.primarySessionID = ""
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
currentTimeout := sm.getCurrentPrimaryTimeout()
|
||||||
|
if now.Sub(primary.LastActive) <= currentTimeout {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Timeout detected - demote primary
|
||||||
|
timedOutSessionID := primary.ID
|
||||||
|
primary.Mode = SessionModeObserver
|
||||||
|
sm.primarySessionID = ""
|
||||||
|
|
||||||
|
ctx := emergencyPromotionContext{
|
||||||
|
triggerSessionID: timedOutSessionID,
|
||||||
|
triggerReason: "timeout",
|
||||||
|
now: now,
|
||||||
|
}
|
||||||
|
|
||||||
|
promotedSessionID, isEmergency, shouldSkip := sm.attemptEmergencyPromotion(ctx, timedOutSessionID)
|
||||||
|
if shouldSkip {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if promotedSessionID != "" {
|
||||||
|
reason := "timeout_promotion"
|
||||||
|
if isEmergency {
|
||||||
|
reason = "emergency_timeout_promotion"
|
||||||
|
sm.lastEmergencyPromotion = now
|
||||||
|
sm.consecutiveEmergencyPromotions++
|
||||||
|
|
||||||
|
sm.logger.Warn().
|
||||||
|
Str("timedOutSessionID", timedOutSessionID).
|
||||||
|
Str("promotedSessionID", promotedSessionID).
|
||||||
|
Bool("requireApproval", true).
|
||||||
|
Int("trustScore", sm.getSessionTrustScore(promotedSessionID)).
|
||||||
|
Msg("EMERGENCY: Timeout promotion bypassing approval requirement")
|
||||||
|
}
|
||||||
|
|
||||||
|
err := sm.transferPrimaryRole(timedOutSessionID, promotedSessionID, reason, "primary session timeout")
|
||||||
|
if err == nil {
|
||||||
|
logEvent := sm.logger.Info()
|
||||||
|
if isEmergency {
|
||||||
|
logEvent = sm.logger.Warn()
|
||||||
|
}
|
||||||
|
logEvent.
|
||||||
|
Str("timedOutSessionID", timedOutSessionID).
|
||||||
|
Str("promotedSessionID", promotedSessionID).
|
||||||
|
Bool("isEmergencyPromotion", isEmergency).
|
||||||
|
Msg("Auto-promoted session after primary timeout")
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
@ -280,12 +280,14 @@ func (sm *SessionManager) AddSession(session *Session, clientSettings *SessionSe
|
||||||
}
|
}
|
||||||
|
|
||||||
isBlacklisted := sm.isSessionBlacklisted(session.ID)
|
isBlacklisted := sm.isSessionBlacklisted(session.ID)
|
||||||
|
isOnlySession := len(sm.sessions) == 0
|
||||||
|
|
||||||
// Determine if this session should become primary
|
// Determine if this session should become primary
|
||||||
// If there's no primary AND this is the ONLY session, ALWAYS promote regardless of blacklist
|
canBecomePrimary := !primaryExists && !hasActivePrimaryGracePeriod
|
||||||
isOnlySession := len(sm.sessions) == 0
|
isReconnectingPrimary := wasWithinGracePeriod && wasPreviouslyPrimary
|
||||||
shouldBecomePrimary := (wasWithinGracePeriod && wasPreviouslyPrimary && !primaryExists && !hasActivePrimaryGracePeriod) ||
|
isNewEligibleSession := !wasWithinGracePeriod && (!isBlacklisted || isOnlySession)
|
||||||
(!wasWithinGracePeriod && !hasActivePrimaryGracePeriod && !primaryExists && (!isBlacklisted || isOnlySession))
|
|
||||||
|
shouldBecomePrimary := canBecomePrimary && (isReconnectingPrimary || isNewEligibleSession)
|
||||||
|
|
||||||
if shouldBecomePrimary {
|
if shouldBecomePrimary {
|
||||||
if sm.primarySessionID == "" || sm.sessions[sm.primarySessionID] == nil {
|
if sm.primarySessionID == "" || sm.sessions[sm.primarySessionID] == nil {
|
||||||
|
|
@ -1565,10 +1567,10 @@ func (sm *SessionManager) Shutdown() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *SessionManager) cleanupInactiveSessions(ctx context.Context) {
|
func (sm *SessionManager) cleanupInactiveSessions(ctx context.Context) {
|
||||||
ticker := time.NewTicker(1 * time.Second) // Check every second for grace periods
|
ticker := time.NewTicker(1 * time.Second)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
validationCounter := 0 // Counter for periodic validateSinglePrimary calls
|
validationCounter := 0
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
|
@ -1579,313 +1581,33 @@ func (sm *SessionManager) cleanupInactiveSessions(ctx context.Context) {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
needsBroadcast := false
|
needsBroadcast := false
|
||||||
|
|
||||||
// Check for expired grace periods and promote if needed
|
// Handle expired grace periods
|
||||||
gracePeriodExpired := false
|
gracePeriodExpired := sm.handleGracePeriodExpiration(now)
|
||||||
for sessionID, graceTime := range sm.reconnectGrace {
|
if gracePeriodExpired {
|
||||||
if now.After(graceTime) {
|
needsBroadcast = true
|
||||||
delete(sm.reconnectGrace, sessionID)
|
|
||||||
gracePeriodExpired = true
|
|
||||||
|
|
||||||
wasHoldingPrimarySlot := (sm.lastPrimaryID == sessionID)
|
|
||||||
|
|
||||||
// Check if this expired session was the primary holding the slot
|
|
||||||
if wasHoldingPrimarySlot {
|
|
||||||
// The primary didn't reconnect in time, now we can clear the slot and promote
|
|
||||||
sm.primarySessionID = ""
|
|
||||||
sm.lastPrimaryID = ""
|
|
||||||
needsBroadcast = true
|
|
||||||
|
|
||||||
sm.logger.Info().
|
|
||||||
Str("expiredSessionID", sessionID).
|
|
||||||
Msg("Primary session grace period expired - slot now available")
|
|
||||||
|
|
||||||
// Always try to promote when possible - approval is only for new pending sessions
|
|
||||||
// Use enhanced emergency promotion system for better security
|
|
||||||
isEmergencyPromotion := false
|
|
||||||
var promotedSessionID string
|
|
||||||
|
|
||||||
// === EMERGENCY PROMOTION ALGORITHM ===
|
|
||||||
//
|
|
||||||
// When RequireApproval is enabled, we face a potential deadlock scenario:
|
|
||||||
// - Primary session disconnects (grace period expires)
|
|
||||||
// - All other sessions are pending (waiting for approval from primary)
|
|
||||||
// - No primary exists to approve pending sessions
|
|
||||||
// - Result: System is stuck with no primary and no way to get one
|
|
||||||
//
|
|
||||||
// Solution: Emergency promotion bypasses approval requirement to select
|
|
||||||
// the most trustworthy pending/observer session as primary. This ensures
|
|
||||||
// the system ALWAYS has a primary session for KVM functionality.
|
|
||||||
//
|
|
||||||
// Security measures to prevent abuse:
|
|
||||||
// 1. Rate limiting: Max 1 emergency promotion per 30 seconds
|
|
||||||
// 2. Consecutive limit: Max 3 consecutive emergency promotions
|
|
||||||
// 3. Trust-based selection: Sessions scored on age, history, nickname
|
|
||||||
// 4. Audit logging: All emergency promotions logged at WARN level
|
|
||||||
//
|
|
||||||
// Trust scoring criteria (see getSessionTrustScore):
|
|
||||||
// - Session age: +1 point per minute (capped at 100)
|
|
||||||
// - Was previous primary: +50 points
|
|
||||||
// - Observer mode: +20 points (more trustworthy than queued/pending)
|
|
||||||
// - Queued mode: +10 points
|
|
||||||
// - Has required nickname: +15 points / missing: -30 points
|
|
||||||
//
|
|
||||||
// This algorithm prioritizes long-lived, previously-primary sessions
|
|
||||||
// with proper nicknames over newly-connected anonymous sessions.
|
|
||||||
//
|
|
||||||
// Check if this is an emergency scenario (RequireApproval enabled)
|
|
||||||
if currentSessionSettings != nil && currentSessionSettings.RequireApproval {
|
|
||||||
isEmergencyPromotion = true
|
|
||||||
|
|
||||||
// CRITICAL: Ensure we ALWAYS have a primary session
|
|
||||||
// If there's NO primary, bypass rate limits entirely
|
|
||||||
hasPrimary := sm.primarySessionID != ""
|
|
||||||
if !hasPrimary {
|
|
||||||
sm.logger.Error().
|
|
||||||
Str("expiredSessionID", sessionID).
|
|
||||||
Msg("CRITICAL: No primary session exists - bypassing all rate limits")
|
|
||||||
} else {
|
|
||||||
// Rate limiting for emergency promotions (only when we have a primary)
|
|
||||||
if now.Sub(sm.lastEmergencyPromotion) < 30*time.Second {
|
|
||||||
sm.logger.Warn().
|
|
||||||
Str("expiredSessionID", sessionID).
|
|
||||||
Dur("timeSinceLastEmergency", now.Sub(sm.lastEmergencyPromotion)).
|
|
||||||
Msg("Emergency promotion rate limit exceeded - potential attack")
|
|
||||||
continue // Skip this grace period expiration
|
|
||||||
}
|
|
||||||
|
|
||||||
// Limit consecutive emergency promotions
|
|
||||||
if sm.consecutiveEmergencyPromotions >= 3 {
|
|
||||||
sm.logger.Error().
|
|
||||||
Str("expiredSessionID", sessionID).
|
|
||||||
Int("consecutiveCount", sm.consecutiveEmergencyPromotions).
|
|
||||||
Msg("Too many consecutive emergency promotions - blocking for security")
|
|
||||||
continue // Skip this grace period expiration
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
promotedSessionID = sm.findMostTrustedSessionForEmergency()
|
|
||||||
} else {
|
|
||||||
// Normal promotion - reset consecutive counter
|
|
||||||
sm.consecutiveEmergencyPromotions = 0
|
|
||||||
promotedSessionID = sm.findNextSessionToPromote()
|
|
||||||
}
|
|
||||||
|
|
||||||
if promotedSessionID != "" {
|
|
||||||
// Determine reason and log appropriately
|
|
||||||
reason := "grace_expiration_promotion"
|
|
||||||
if isEmergencyPromotion {
|
|
||||||
reason = "emergency_promotion_deadlock_prevention"
|
|
||||||
sm.lastEmergencyPromotion = now
|
|
||||||
sm.consecutiveEmergencyPromotions++
|
|
||||||
|
|
||||||
// Enhanced logging for emergency promotions
|
|
||||||
sm.logger.Warn().
|
|
||||||
Str("expiredSessionID", sessionID).
|
|
||||||
Str("promotedSessionID", promotedSessionID).
|
|
||||||
Bool("requireApproval", true).
|
|
||||||
Int("consecutiveEmergencyPromotions", sm.consecutiveEmergencyPromotions).
|
|
||||||
Int("trustScore", sm.getSessionTrustScore(promotedSessionID)).
|
|
||||||
Msg("EMERGENCY: Bypassing approval requirement to prevent deadlock")
|
|
||||||
}
|
|
||||||
|
|
||||||
err := sm.transferPrimaryRole("", promotedSessionID, reason, "primary grace period expired")
|
|
||||||
if err == nil {
|
|
||||||
logEvent := sm.logger.Info()
|
|
||||||
if isEmergencyPromotion {
|
|
||||||
logEvent = sm.logger.Warn()
|
|
||||||
}
|
|
||||||
logEvent.
|
|
||||||
Str("expiredSessionID", sessionID).
|
|
||||||
Str("promotedSessionID", promotedSessionID).
|
|
||||||
Str("reason", reason).
|
|
||||||
Bool("isEmergencyPromotion", isEmergencyPromotion).
|
|
||||||
Msg("Auto-promoted session after primary grace period expiration")
|
|
||||||
} else {
|
|
||||||
sm.logger.Error().
|
|
||||||
Err(err).
|
|
||||||
Str("expiredSessionID", sessionID).
|
|
||||||
Str("promotedSessionID", promotedSessionID).
|
|
||||||
Str("reason", reason).
|
|
||||||
Bool("isEmergencyPromotion", isEmergencyPromotion).
|
|
||||||
Msg("Failed to promote session after grace period expiration")
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
logLevel := sm.logger.Info()
|
|
||||||
if isEmergencyPromotion {
|
|
||||||
logLevel = sm.logger.Error() // Emergency with no eligible sessions is critical
|
|
||||||
}
|
|
||||||
logLevel.
|
|
||||||
Str("expiredSessionID", sessionID).
|
|
||||||
Bool("isEmergencyPromotion", isEmergencyPromotion).
|
|
||||||
Msg("Primary grace period expired but no eligible sessions to promote")
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Non-primary session grace period expired - just cleanup
|
|
||||||
sm.logger.Debug().
|
|
||||||
Str("expiredSessionID", sessionID).
|
|
||||||
Msg("Non-primary session grace period expired")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Also clean up reconnect info for expired sessions
|
|
||||||
delete(sm.reconnectInfo, sessionID)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clean up pending sessions that have timed out (DoS protection)
|
// Clean up timed-out pending sessions (DoS protection)
|
||||||
for id, session := range sm.sessions {
|
if sm.handlePendingSessionTimeout(now) {
|
||||||
if session.Mode == SessionModePending &&
|
needsBroadcast = true
|
||||||
now.Sub(session.CreatedAt) > defaultPendingSessionTimeout {
|
|
||||||
websocketLogger.Info().
|
|
||||||
Str("sessionId", id).
|
|
||||||
Dur("age", now.Sub(session.CreatedAt)).
|
|
||||||
Msg("Removing timed-out pending session")
|
|
||||||
delete(sm.sessions, id)
|
|
||||||
needsBroadcast = true
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clean up observer sessions with closed RPC channels (stale connections)
|
// Clean up inactive observer sessions
|
||||||
// This prevents accumulation of zombie observer sessions that are no longer connected
|
if sm.handleObserverSessionCleanup(now) {
|
||||||
observerTimeout := defaultObserverSessionTimeout
|
needsBroadcast = true
|
||||||
if currentSessionSettings != nil && currentSessionSettings.ObserverTimeout > 0 {
|
|
||||||
observerTimeout = time.Duration(currentSessionSettings.ObserverTimeout) * time.Second
|
|
||||||
}
|
|
||||||
for id, session := range sm.sessions {
|
|
||||||
if session.Mode == SessionModeObserver {
|
|
||||||
// Check if RPC channel is nil/closed AND session has been inactive
|
|
||||||
if session.RPCChannel == nil && now.Sub(session.LastActive) > observerTimeout {
|
|
||||||
sm.logger.Info().
|
|
||||||
Str("sessionId", id).
|
|
||||||
Dur("inactiveFor", now.Sub(session.LastActive)).
|
|
||||||
Dur("observerTimeout", observerTimeout).
|
|
||||||
Msg("Removing inactive observer session with closed RPC channel")
|
|
||||||
delete(sm.sessions, id)
|
|
||||||
needsBroadcast = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check primary session timeout (every 30 iterations = 30 seconds)
|
// Handle primary session timeout
|
||||||
if sm.primarySessionID != "" {
|
if sm.handlePrimarySessionTimeout(now) {
|
||||||
if primary, exists := sm.sessions[sm.primarySessionID]; exists {
|
needsBroadcast = true
|
||||||
currentTimeout := sm.getCurrentPrimaryTimeout()
|
|
||||||
if now.Sub(primary.LastActive) > currentTimeout {
|
|
||||||
timedOutSessionID := primary.ID
|
|
||||||
primary.Mode = SessionModeObserver
|
|
||||||
sm.primarySessionID = ""
|
|
||||||
|
|
||||||
// === TIMEOUT-BASED EMERGENCY PROMOTION ===
|
|
||||||
//
|
|
||||||
// Similar to grace period expiration, primary session timeout can create
|
|
||||||
// a deadlock when RequireApproval is enabled. The timeout detection happens
|
|
||||||
// every 30 seconds (based on ticker iterations) and demotes inactive primaries.
|
|
||||||
//
|
|
||||||
// Without emergency promotion:
|
|
||||||
// - Primary becomes inactive and times out
|
|
||||||
// - Primary is demoted to observer
|
|
||||||
// - All other sessions are pending (awaiting approval)
|
|
||||||
// - No primary exists to approve them
|
|
||||||
// - System deadlocked with no KVM control
|
|
||||||
//
|
|
||||||
// This uses the same trust-based selection and security measures as
|
|
||||||
// grace period emergency promotion to ensure system availability.
|
|
||||||
isEmergencyPromotion := false
|
|
||||||
var promotedSessionID string
|
|
||||||
|
|
||||||
// Check if this requires emergency promotion due to approval requirements
|
|
||||||
if currentSessionSettings != nil && currentSessionSettings.RequireApproval {
|
|
||||||
isEmergencyPromotion = true
|
|
||||||
|
|
||||||
// CRITICAL: Ensure we ALWAYS have a primary session
|
|
||||||
// primarySessionID was just cleared above, so this will always be empty
|
|
||||||
// But check anyway for completeness
|
|
||||||
hasPrimary := sm.primarySessionID != ""
|
|
||||||
if !hasPrimary {
|
|
||||||
sm.logger.Error().
|
|
||||||
Str("timedOutSessionID", timedOutSessionID).
|
|
||||||
Msg("CRITICAL: No primary session after timeout - bypassing all rate limits")
|
|
||||||
} else {
|
|
||||||
// Rate limiting for emergency promotions (only when we have a primary)
|
|
||||||
if now.Sub(sm.lastEmergencyPromotion) < 30*time.Second {
|
|
||||||
sm.logger.Warn().
|
|
||||||
Str("timedOutSessionID", timedOutSessionID).
|
|
||||||
Dur("timeSinceLastEmergency", now.Sub(sm.lastEmergencyPromotion)).
|
|
||||||
Msg("Emergency promotion rate limit exceeded during timeout - potential attack")
|
|
||||||
continue // Skip this timeout
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Use trust-based selection but exclude the timed-out session
|
|
||||||
bestSessionID := ""
|
|
||||||
bestScore := -1
|
|
||||||
for id, session := range sm.sessions {
|
|
||||||
if id != timedOutSessionID &&
|
|
||||||
!sm.isSessionBlacklisted(id) &&
|
|
||||||
(session.Mode == SessionModeObserver || session.Mode == SessionModeQueued) {
|
|
||||||
score := sm.getSessionTrustScore(id)
|
|
||||||
if score > bestScore {
|
|
||||||
bestScore = score
|
|
||||||
bestSessionID = id
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
promotedSessionID = bestSessionID
|
|
||||||
} else {
|
|
||||||
// Normal timeout promotion - find any observer except the timed-out one
|
|
||||||
for id, session := range sm.sessions {
|
|
||||||
if id != timedOutSessionID && session.Mode == SessionModeObserver && !sm.isSessionBlacklisted(id) {
|
|
||||||
promotedSessionID = id
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If found a session to promote
|
|
||||||
if promotedSessionID != "" {
|
|
||||||
reason := "timeout_promotion"
|
|
||||||
if isEmergencyPromotion {
|
|
||||||
reason = "emergency_timeout_promotion"
|
|
||||||
sm.lastEmergencyPromotion = now
|
|
||||||
sm.consecutiveEmergencyPromotions++
|
|
||||||
|
|
||||||
// Enhanced logging for emergency timeout promotions
|
|
||||||
sm.logger.Warn().
|
|
||||||
Str("timedOutSessionID", timedOutSessionID).
|
|
||||||
Str("promotedSessionID", promotedSessionID).
|
|
||||||
Bool("requireApproval", true).
|
|
||||||
Int("trustScore", sm.getSessionTrustScore(promotedSessionID)).
|
|
||||||
Msg("EMERGENCY: Timeout promotion bypassing approval requirement")
|
|
||||||
}
|
|
||||||
|
|
||||||
err := sm.transferPrimaryRole(timedOutSessionID, promotedSessionID, reason, "primary session timeout")
|
|
||||||
if err == nil {
|
|
||||||
needsBroadcast = true
|
|
||||||
logEvent := sm.logger.Info()
|
|
||||||
if isEmergencyPromotion {
|
|
||||||
logEvent = sm.logger.Warn()
|
|
||||||
}
|
|
||||||
logEvent.
|
|
||||||
Str("timedOutSessionID", timedOutSessionID).
|
|
||||||
Str("promotedSessionID", promotedSessionID).
|
|
||||||
Bool("isEmergencyPromotion", isEmergencyPromotion).
|
|
||||||
Msg("Auto-promoted session after primary timeout")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Primary session no longer exists, clear it
|
|
||||||
sm.primarySessionID = ""
|
|
||||||
needsBroadcast = true
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run validation immediately if a grace period expired, otherwise run periodically
|
// Run validation immediately if grace period expired, otherwise periodically
|
||||||
if gracePeriodExpired {
|
if gracePeriodExpired {
|
||||||
sm.validateSinglePrimary()
|
sm.validateSinglePrimary()
|
||||||
} else {
|
} else {
|
||||||
// Periodic validateSinglePrimary to catch deadlock states
|
|
||||||
validationCounter++
|
validationCounter++
|
||||||
if validationCounter >= 10 { // Every 10 seconds
|
if validationCounter >= 10 {
|
||||||
validationCounter = 0
|
validationCounter = 0
|
||||||
sm.validateSinglePrimary()
|
sm.validateSinglePrimary()
|
||||||
}
|
}
|
||||||
|
|
@ -1893,7 +1615,6 @@ func (sm *SessionManager) cleanupInactiveSessions(ctx context.Context) {
|
||||||
|
|
||||||
sm.mu.Unlock()
|
sm.mu.Unlock()
|
||||||
|
|
||||||
// Broadcast outside of lock if needed
|
|
||||||
if needsBroadcast {
|
if needsBroadcast {
|
||||||
go sm.broadcastSessionListUpdate()
|
go sm.broadcastSessionListUpdate()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue