mirror of https://github.com/jetkvm/kvm.git
Compare commits
4 Commits
0e040a9b54
...
9a10d3ed38
| Author | SHA1 | Date |
|---|---|---|
|
|
9a10d3ed38 | |
|
|
846caf77ce | |
|
|
da85b54fc2 | |
|
|
8f17bbd1f9 |
|
|
@ -69,7 +69,7 @@ func handleHidRPCMessage(message hidrpc.Message, session *Session) {
|
|||
logger.Warn().Err(err).Msg("failed to get pointer report")
|
||||
return
|
||||
}
|
||||
rpcErr = rpcAbsMouseReport(int16(pointerReport.X), int16(pointerReport.Y), pointerReport.Button)
|
||||
rpcErr = rpcAbsMouseReport(pointerReport.X, pointerReport.Y, pointerReport.Button)
|
||||
case hidrpc.TypeMouseReport:
|
||||
if !session.HasPermission(PermissionMouseInput) {
|
||||
logger.Debug().
|
||||
|
|
@ -146,8 +146,8 @@ const baseExtension = expectedRate + maxLateness // 100ms extension on perfect t
|
|||
const maxStaleness = 225 * time.Millisecond // discard ancient packets outright
|
||||
|
||||
func handleHidRPCKeypressKeepAlive(session *Session) error {
|
||||
// Update LastActive to prevent session timeout (jiggler sends every 50ms)
|
||||
sessionManager.UpdateLastActive(session.ID)
|
||||
// NOTE: Do NOT update LastActive here - jiggler keep-alives are automated,
|
||||
// not human input. Only actual keyboard/mouse input should prevent timeout.
|
||||
|
||||
session.keepAliveJitterLock.Lock()
|
||||
defer session.keepAliveJitterLock.Unlock()
|
||||
|
|
|
|||
|
|
@ -354,7 +354,7 @@ func (u *UsbGadget) UpdateKeysDown(modifier byte, keys []byte) KeysDownState {
|
|||
u.keyboardStateLock.Unlock()
|
||||
|
||||
if u.onKeysDownChange != nil {
|
||||
(*u.onKeysDownChange)(state)
|
||||
(*u.onKeysDownChange)(state) // this enques to the outgoing hidrpc queue via usb.go → currentSession.enqueueKeysDownState(...)
|
||||
}
|
||||
return state
|
||||
}
|
||||
|
|
|
|||
|
|
@ -85,7 +85,7 @@ func (u *UsbGadget) absMouseWriteHidFile(data []byte) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (u *UsbGadget) AbsMouseReport(x int16, y int16, buttons uint8) error {
|
||||
func (u *UsbGadget) AbsMouseReport(x int, y int, buttons uint8) error {
|
||||
u.absMouseLock.Lock()
|
||||
defer u.absMouseLock.Unlock()
|
||||
|
||||
|
|
|
|||
210
jsonrpc.go
210
jsonrpc.go
|
|
@ -163,213 +163,27 @@ func onRPCMessage(message webrtc.DataChannelMessage, session *Session) {
|
|||
|
||||
scopedLogger.Trace().Msg("Received RPC request")
|
||||
|
||||
// Handle session-specific RPC methods first
|
||||
var result any
|
||||
var handlerErr error
|
||||
|
||||
// Handle session management RPC methods
|
||||
switch request.Method {
|
||||
case "approvePrimaryRequest":
|
||||
if err := RequirePermission(session, PermissionSessionTransfer); err != nil {
|
||||
handlerErr = err
|
||||
} else if requesterID, ok := request.Params["requesterID"].(string); ok {
|
||||
handlerErr = sessionManager.ApprovePrimaryRequest(session.ID, requesterID)
|
||||
if handlerErr == nil {
|
||||
result = map[string]interface{}{"status": "approved"}
|
||||
}
|
||||
} else {
|
||||
handlerErr = errors.New("invalid requesterID parameter")
|
||||
}
|
||||
case "denyPrimaryRequest":
|
||||
if err := RequirePermission(session, PermissionSessionTransfer); err != nil {
|
||||
handlerErr = err
|
||||
} else if requesterID, ok := request.Params["requesterID"].(string); ok {
|
||||
handlerErr = sessionManager.DenyPrimaryRequest(session.ID, requesterID)
|
||||
if handlerErr == nil {
|
||||
result = map[string]interface{}{"status": "denied"}
|
||||
}
|
||||
} else {
|
||||
handlerErr = errors.New("invalid requesterID parameter")
|
||||
}
|
||||
case "approveNewSession":
|
||||
if err := RequirePermission(session, PermissionSessionApprove); err != nil {
|
||||
handlerErr = err
|
||||
} else if sessionID, ok := request.Params["sessionId"].(string); ok {
|
||||
handlerErr = sessionManager.ApproveSession(sessionID)
|
||||
if handlerErr == nil {
|
||||
go sessionManager.broadcastSessionListUpdate()
|
||||
result = map[string]interface{}{"status": "approved"}
|
||||
}
|
||||
} else {
|
||||
handlerErr = errors.New("invalid sessionId parameter")
|
||||
}
|
||||
case "denyNewSession":
|
||||
if err := RequirePermission(session, PermissionSessionApprove); err != nil {
|
||||
handlerErr = err
|
||||
} else if sessionID, ok := request.Params["sessionId"].(string); ok {
|
||||
handlerErr = sessionManager.DenySession(sessionID)
|
||||
if handlerErr == nil {
|
||||
// Notify the denied session
|
||||
if targetSession := sessionManager.GetSession(sessionID); targetSession != nil {
|
||||
go func() {
|
||||
writeJSONRPCEvent("sessionAccessDenied", map[string]interface{}{
|
||||
"message": "Access denied by primary session",
|
||||
}, targetSession)
|
||||
sessionManager.broadcastSessionListUpdate()
|
||||
}()
|
||||
}
|
||||
result = map[string]interface{}{"status": "denied"}
|
||||
}
|
||||
} else {
|
||||
handlerErr = errors.New("invalid sessionId parameter")
|
||||
}
|
||||
case "approvePrimaryRequest", "denyPrimaryRequest":
|
||||
result, handlerErr = handleSessionTransferRPC(request.Method, request.Params, session)
|
||||
case "approveNewSession", "denyNewSession":
|
||||
result, handlerErr = handleSessionApprovalRPC(request.Method, request.Params, session)
|
||||
case "requestSessionApproval":
|
||||
if session.Mode != SessionModePending {
|
||||
handlerErr = errors.New("only pending sessions can request approval")
|
||||
} else if currentSessionSettings != nil && currentSessionSettings.RequireApproval {
|
||||
if primary := sessionManager.GetPrimarySession(); primary != nil {
|
||||
go func() {
|
||||
writeJSONRPCEvent("newSessionPending", map[string]interface{}{
|
||||
"sessionId": session.ID,
|
||||
"source": session.Source,
|
||||
"identity": session.Identity,
|
||||
"nickname": session.Nickname,
|
||||
}, primary)
|
||||
}()
|
||||
result = map[string]interface{}{"status": "requested"}
|
||||
} else {
|
||||
handlerErr = errors.New("no primary session available")
|
||||
}
|
||||
} else {
|
||||
handlerErr = errors.New("session approval not required")
|
||||
}
|
||||
result, handlerErr = handleRequestSessionApprovalRPC(session)
|
||||
case "updateSessionNickname":
|
||||
sessionID, _ := request.Params["sessionId"].(string)
|
||||
nickname, _ := request.Params["nickname"].(string)
|
||||
// Validate nickname to match frontend validation
|
||||
if len(nickname) < 2 {
|
||||
handlerErr = errors.New("nickname must be at least 2 characters")
|
||||
} else if len(nickname) > 30 {
|
||||
handlerErr = errors.New("nickname must be 30 characters or less")
|
||||
} else if !isValidNickname(nickname) {
|
||||
handlerErr = errors.New("nickname can only contain letters, numbers, spaces, and - _ . @")
|
||||
} else if targetSession := sessionManager.GetSession(sessionID); targetSession != nil {
|
||||
// Users can update their own nickname, or admins can update any
|
||||
if targetSession.ID == session.ID || session.HasPermission(PermissionSessionManage) {
|
||||
// Check nickname uniqueness
|
||||
allSessions := sessionManager.GetAllSessions()
|
||||
for _, existingSession := range allSessions {
|
||||
if existingSession.ID != sessionID && existingSession.Nickname == nickname {
|
||||
handlerErr = fmt.Errorf("nickname '%s' is already in use by another session", nickname)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if handlerErr == nil {
|
||||
targetSession.Nickname = nickname
|
||||
|
||||
// If session is pending and approval is required, send the approval request now that we have a nickname
|
||||
if targetSession.Mode == SessionModePending && currentSessionSettings != nil && currentSessionSettings.RequireApproval {
|
||||
if primary := sessionManager.GetPrimarySession(); primary != nil {
|
||||
go func() {
|
||||
writeJSONRPCEvent("newSessionPending", map[string]interface{}{
|
||||
"sessionId": targetSession.ID,
|
||||
"source": targetSession.Source,
|
||||
"identity": targetSession.Identity,
|
||||
"nickname": targetSession.Nickname,
|
||||
}, primary)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
sessionManager.broadcastSessionListUpdate()
|
||||
result = map[string]interface{}{"status": "updated"}
|
||||
}
|
||||
} else {
|
||||
handlerErr = errors.New("permission denied: can only update own nickname")
|
||||
}
|
||||
} else {
|
||||
handlerErr = errors.New("session not found")
|
||||
}
|
||||
result, handlerErr = handleUpdateSessionNicknameRPC(request.Params, session)
|
||||
case "getSessions":
|
||||
sessions := sessionManager.GetAllSessions()
|
||||
result = sessions
|
||||
result = sessionManager.GetAllSessions()
|
||||
case "getPermissions":
|
||||
permissions := session.GetPermissions()
|
||||
permMap := make(map[string]bool)
|
||||
for perm, allowed := range permissions {
|
||||
permMap[string(perm)] = allowed
|
||||
}
|
||||
result = GetPermissionsResponse{
|
||||
Mode: string(session.Mode),
|
||||
Permissions: permMap,
|
||||
}
|
||||
case "getSessionSettings":
|
||||
if err := RequirePermission(session, PermissionSettingsRead); err != nil {
|
||||
handlerErr = err
|
||||
} else {
|
||||
result = currentSessionSettings
|
||||
}
|
||||
case "setSessionSettings":
|
||||
if err := RequirePermission(session, PermissionSessionManage); err != nil {
|
||||
handlerErr = err
|
||||
} else {
|
||||
if settings, ok := request.Params["settings"].(map[string]interface{}); ok {
|
||||
if requireApproval, ok := settings["requireApproval"].(bool); ok {
|
||||
currentSessionSettings.RequireApproval = requireApproval
|
||||
}
|
||||
if requireNickname, ok := settings["requireNickname"].(bool); ok {
|
||||
currentSessionSettings.RequireNickname = requireNickname
|
||||
}
|
||||
if reconnectGrace, ok := settings["reconnectGrace"].(float64); ok {
|
||||
currentSessionSettings.ReconnectGrace = int(reconnectGrace)
|
||||
}
|
||||
if primaryTimeout, ok := settings["primaryTimeout"].(float64); ok {
|
||||
currentSessionSettings.PrimaryTimeout = int(primaryTimeout)
|
||||
}
|
||||
if privateKeystrokes, ok := settings["privateKeystrokes"].(bool); ok {
|
||||
currentSessionSettings.PrivateKeystrokes = privateKeystrokes
|
||||
}
|
||||
if maxRejectionAttempts, ok := settings["maxRejectionAttempts"].(float64); ok {
|
||||
currentSessionSettings.MaxRejectionAttempts = int(maxRejectionAttempts)
|
||||
}
|
||||
if maxSessions, ok := settings["maxSessions"].(float64); ok {
|
||||
currentSessionSettings.MaxSessions = int(maxSessions)
|
||||
}
|
||||
if observerTimeout, ok := settings["observerTimeout"].(float64); ok {
|
||||
currentSessionSettings.ObserverTimeout = int(observerTimeout)
|
||||
}
|
||||
|
||||
// Trigger nickname auto-generation for sessions when RequireNickname changes
|
||||
if sessionManager != nil {
|
||||
sessionManager.updateAllSessionNicknames()
|
||||
}
|
||||
|
||||
// Save to persistent config
|
||||
if err := SaveConfig(); err != nil {
|
||||
handlerErr = errors.New("failed to save session settings")
|
||||
}
|
||||
result = currentSessionSettings
|
||||
} else {
|
||||
handlerErr = errors.New("invalid settings parameter")
|
||||
}
|
||||
}
|
||||
result, handlerErr = handleGetPermissionsRPC(session)
|
||||
case "getSessionSettings", "setSessionSettings":
|
||||
result, handlerErr = handleSessionSettingsRPC(request.Method, request.Params, session)
|
||||
case "generateNickname":
|
||||
// Generate a nickname based on user agent (no permissions required)
|
||||
userAgent := ""
|
||||
if request.Params != nil {
|
||||
if ua, ok := request.Params["userAgent"].(string); ok {
|
||||
userAgent = ua
|
||||
}
|
||||
}
|
||||
|
||||
// Use browser as fallback if no user agent provided
|
||||
if userAgent == "" {
|
||||
userAgent = "Mozilla/5.0 (Unknown) Browser"
|
||||
}
|
||||
|
||||
result = map[string]string{
|
||||
"nickname": generateNicknameFromUserAgent(userAgent),
|
||||
}
|
||||
result, handlerErr = handleGenerateNicknameRPC(request.Params)
|
||||
default:
|
||||
// Check method permissions using centralized permission system
|
||||
if requiredPerm, exists := GetMethodPermission(request.Method); exists {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,236 @@
|
|||
package kvm
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// handleSessionTransferRPC handles primary control transfer requests (approve/deny)
|
||||
func handleSessionTransferRPC(method string, params map[string]any, session *Session) (any, error) {
|
||||
requesterID, ok := params["requesterID"].(string)
|
||||
if !ok {
|
||||
return nil, errors.New("invalid requesterID parameter")
|
||||
}
|
||||
|
||||
if err := RequirePermission(session, PermissionSessionTransfer); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var err error
|
||||
switch method {
|
||||
case "approvePrimaryRequest":
|
||||
err = sessionManager.ApprovePrimaryRequest(session.ID, requesterID)
|
||||
if err == nil {
|
||||
return map[string]interface{}{"status": "approved"}, nil
|
||||
}
|
||||
case "denyPrimaryRequest":
|
||||
err = sessionManager.DenyPrimaryRequest(session.ID, requesterID)
|
||||
if err == nil {
|
||||
return map[string]interface{}{"status": "denied"}, nil
|
||||
}
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// handleSessionApprovalRPC handles new session approval requests (approve/deny)
|
||||
func handleSessionApprovalRPC(method string, params map[string]any, session *Session) (any, error) {
|
||||
sessionID, ok := params["sessionId"].(string)
|
||||
if !ok {
|
||||
return nil, errors.New("invalid sessionId parameter")
|
||||
}
|
||||
|
||||
if err := RequirePermission(session, PermissionSessionApprove); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var err error
|
||||
switch method {
|
||||
case "approveNewSession":
|
||||
err = sessionManager.ApproveSession(sessionID)
|
||||
if err == nil {
|
||||
go sessionManager.broadcastSessionListUpdate()
|
||||
return map[string]interface{}{"status": "approved"}, nil
|
||||
}
|
||||
case "denyNewSession":
|
||||
err = sessionManager.DenySession(sessionID)
|
||||
if err == nil {
|
||||
if targetSession := sessionManager.GetSession(sessionID); targetSession != nil {
|
||||
go func() {
|
||||
writeJSONRPCEvent("sessionAccessDenied", map[string]interface{}{
|
||||
"message": "Access denied by primary session",
|
||||
}, targetSession)
|
||||
sessionManager.broadcastSessionListUpdate()
|
||||
}()
|
||||
}
|
||||
return map[string]interface{}{"status": "denied"}, nil
|
||||
}
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// handleRequestSessionApprovalRPC handles pending sessions requesting approval from primary
|
||||
func handleRequestSessionApprovalRPC(session *Session) (any, error) {
|
||||
if session.Mode != SessionModePending {
|
||||
return nil, errors.New("only pending sessions can request approval")
|
||||
}
|
||||
|
||||
if currentSessionSettings == nil || !currentSessionSettings.RequireApproval {
|
||||
return nil, errors.New("session approval not required")
|
||||
}
|
||||
|
||||
primary := sessionManager.GetPrimarySession()
|
||||
if primary == nil {
|
||||
return nil, errors.New("no primary session available")
|
||||
}
|
||||
|
||||
go func() {
|
||||
writeJSONRPCEvent("newSessionPending", map[string]interface{}{
|
||||
"sessionId": session.ID,
|
||||
"source": session.Source,
|
||||
"identity": session.Identity,
|
||||
"nickname": session.Nickname,
|
||||
}, primary)
|
||||
}()
|
||||
|
||||
return map[string]interface{}{"status": "requested"}, nil
|
||||
}
|
||||
|
||||
// handleUpdateSessionNicknameRPC handles nickname updates for sessions
|
||||
func handleUpdateSessionNicknameRPC(params map[string]any, session *Session) (any, error) {
|
||||
sessionID, _ := params["sessionId"].(string)
|
||||
nickname, _ := params["nickname"].(string)
|
||||
|
||||
if len(nickname) < 2 {
|
||||
return nil, errors.New("nickname must be at least 2 characters")
|
||||
}
|
||||
if len(nickname) > 30 {
|
||||
return nil, errors.New("nickname must be 30 characters or less")
|
||||
}
|
||||
if !isValidNickname(nickname) {
|
||||
return nil, errors.New("nickname can only contain letters, numbers, spaces, and - _ . @")
|
||||
}
|
||||
|
||||
targetSession := sessionManager.GetSession(sessionID)
|
||||
if targetSession == nil {
|
||||
return nil, errors.New("session not found")
|
||||
}
|
||||
|
||||
if targetSession.ID != session.ID && !session.HasPermission(PermissionSessionManage) {
|
||||
return nil, errors.New("permission denied: can only update own nickname")
|
||||
}
|
||||
|
||||
// Check nickname uniqueness
|
||||
allSessions := sessionManager.GetAllSessions()
|
||||
for _, existingSession := range allSessions {
|
||||
if existingSession.ID != sessionID && existingSession.Nickname == nickname {
|
||||
return nil, fmt.Errorf("nickname '%s' is already in use by another session", nickname)
|
||||
}
|
||||
}
|
||||
|
||||
targetSession.Nickname = nickname
|
||||
|
||||
// If session is pending and approval is required, send the approval request now that we have a nickname
|
||||
if targetSession.Mode == SessionModePending && currentSessionSettings != nil && currentSessionSettings.RequireApproval {
|
||||
if primary := sessionManager.GetPrimarySession(); primary != nil {
|
||||
go func() {
|
||||
writeJSONRPCEvent("newSessionPending", map[string]interface{}{
|
||||
"sessionId": targetSession.ID,
|
||||
"source": targetSession.Source,
|
||||
"identity": targetSession.Identity,
|
||||
"nickname": targetSession.Nickname,
|
||||
}, primary)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
sessionManager.broadcastSessionListUpdate()
|
||||
return map[string]interface{}{"status": "updated"}, nil
|
||||
}
|
||||
|
||||
// handleGetPermissionsRPC returns permissions for the current session
|
||||
func handleGetPermissionsRPC(session *Session) (any, error) {
|
||||
permissions := session.GetPermissions()
|
||||
permMap := make(map[string]bool)
|
||||
for perm, allowed := range permissions {
|
||||
permMap[string(perm)] = allowed
|
||||
}
|
||||
return GetPermissionsResponse{
|
||||
Mode: string(session.Mode),
|
||||
Permissions: permMap,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// handleSessionSettingsRPC handles getting or setting session settings
|
||||
func handleSessionSettingsRPC(method string, params map[string]any, session *Session) (any, error) {
|
||||
switch method {
|
||||
case "getSessionSettings":
|
||||
if err := RequirePermission(session, PermissionSettingsRead); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return currentSessionSettings, nil
|
||||
|
||||
case "setSessionSettings":
|
||||
if err := RequirePermission(session, PermissionSessionManage); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
settings, ok := params["settings"].(map[string]interface{})
|
||||
if !ok {
|
||||
return nil, errors.New("invalid settings parameter")
|
||||
}
|
||||
|
||||
if requireApproval, ok := settings["requireApproval"].(bool); ok {
|
||||
currentSessionSettings.RequireApproval = requireApproval
|
||||
}
|
||||
if requireNickname, ok := settings["requireNickname"].(bool); ok {
|
||||
currentSessionSettings.RequireNickname = requireNickname
|
||||
}
|
||||
if reconnectGrace, ok := settings["reconnectGrace"].(float64); ok {
|
||||
currentSessionSettings.ReconnectGrace = int(reconnectGrace)
|
||||
}
|
||||
if primaryTimeout, ok := settings["primaryTimeout"].(float64); ok {
|
||||
currentSessionSettings.PrimaryTimeout = int(primaryTimeout)
|
||||
}
|
||||
if privateKeystrokes, ok := settings["privateKeystrokes"].(bool); ok {
|
||||
currentSessionSettings.PrivateKeystrokes = privateKeystrokes
|
||||
}
|
||||
if maxRejectionAttempts, ok := settings["maxRejectionAttempts"].(float64); ok {
|
||||
currentSessionSettings.MaxRejectionAttempts = int(maxRejectionAttempts)
|
||||
}
|
||||
if maxSessions, ok := settings["maxSessions"].(float64); ok {
|
||||
currentSessionSettings.MaxSessions = int(maxSessions)
|
||||
}
|
||||
if observerTimeout, ok := settings["observerTimeout"].(float64); ok {
|
||||
currentSessionSettings.ObserverTimeout = int(observerTimeout)
|
||||
}
|
||||
|
||||
if sessionManager != nil {
|
||||
sessionManager.updateAllSessionNicknames()
|
||||
}
|
||||
|
||||
if err := SaveConfig(); err != nil {
|
||||
return nil, errors.New("failed to save session settings")
|
||||
}
|
||||
return currentSessionSettings, nil
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("unknown session settings method: %s", method)
|
||||
}
|
||||
|
||||
// handleGenerateNicknameRPC generates a nickname based on user agent
|
||||
func handleGenerateNicknameRPC(params map[string]any) (any, error) {
|
||||
userAgent := ""
|
||||
if params != nil {
|
||||
if ua, ok := params["userAgent"].(string); ok {
|
||||
userAgent = ua
|
||||
}
|
||||
}
|
||||
|
||||
if userAgent == "" {
|
||||
userAgent = "Mozilla/5.0 (Unknown) Browser"
|
||||
}
|
||||
|
||||
return map[string]string{
|
||||
"nickname": generateNicknameFromUserAgent(userAgent),
|
||||
}, nil
|
||||
}
|
||||
|
|
@ -62,6 +62,9 @@ func initNative(systemVersion *semver.Version, appVersion *semver.Version) {
|
|||
Str("sessionID", s.ID).
|
||||
Err(err).
|
||||
Msg("error writing sample to session")
|
||||
} else {
|
||||
// Update LastActive when video frame successfully sent (prevents observer timeout)
|
||||
sessionManager.UpdateLastActive(s.ID)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
|
|
|||
|
|
@ -0,0 +1,279 @@
|
|||
package kvm
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// emergencyPromotionContext holds context for emergency promotion attempts
|
||||
type emergencyPromotionContext struct {
|
||||
triggerSessionID string
|
||||
triggerReason string
|
||||
now time.Time
|
||||
}
|
||||
|
||||
// attemptEmergencyPromotion tries to promote a session using emergency or normal promotion logic
|
||||
// Returns (promotedSessionID, isEmergency, shouldSkip)
|
||||
func (sm *SessionManager) attemptEmergencyPromotion(ctx emergencyPromotionContext, excludeSessionID string) (string, bool, bool) {
|
||||
// Check if emergency promotion is needed
|
||||
if currentSessionSettings == nil || !currentSessionSettings.RequireApproval {
|
||||
// Normal promotion - reset consecutive counter
|
||||
sm.consecutiveEmergencyPromotions = 0
|
||||
promotedID := sm.findNextSessionToPromote()
|
||||
return promotedID, false, false
|
||||
}
|
||||
|
||||
// Emergency promotion path
|
||||
hasPrimary := sm.primarySessionID != ""
|
||||
if !hasPrimary {
|
||||
sm.logger.Error().
|
||||
Str("triggerSessionID", ctx.triggerSessionID).
|
||||
Msg("CRITICAL: No primary session exists - bypassing all rate limits")
|
||||
} else {
|
||||
// Rate limiting (only when we have a primary)
|
||||
if ctx.now.Sub(sm.lastEmergencyPromotion) < 30*time.Second {
|
||||
sm.logger.Warn().
|
||||
Str("triggerSessionID", ctx.triggerSessionID).
|
||||
Dur("timeSinceLastEmergency", ctx.now.Sub(sm.lastEmergencyPromotion)).
|
||||
Msgf("Emergency promotion rate limit exceeded - potential attack (%s)", ctx.triggerReason)
|
||||
return "", false, true // shouldSkip = true
|
||||
}
|
||||
|
||||
// Limit consecutive emergency promotions
|
||||
if sm.consecutiveEmergencyPromotions >= 3 {
|
||||
sm.logger.Error().
|
||||
Str("triggerSessionID", ctx.triggerSessionID).
|
||||
Int("consecutiveCount", sm.consecutiveEmergencyPromotions).
|
||||
Msgf("Too many consecutive emergency promotions - blocking for security (%s)", ctx.triggerReason)
|
||||
return "", false, true // shouldSkip = true
|
||||
}
|
||||
}
|
||||
|
||||
// Find best session for emergency promotion
|
||||
var promotedSessionID string
|
||||
if excludeSessionID != "" {
|
||||
// Need to exclude a specific session (e.g., timed-out session)
|
||||
bestSessionID := ""
|
||||
bestScore := -1
|
||||
for id, session := range sm.sessions {
|
||||
if id != excludeSessionID &&
|
||||
!sm.isSessionBlacklisted(id) &&
|
||||
(session.Mode == SessionModeObserver || session.Mode == SessionModeQueued) {
|
||||
score := sm.getSessionTrustScore(id)
|
||||
if score > bestScore {
|
||||
bestScore = score
|
||||
bestSessionID = id
|
||||
}
|
||||
}
|
||||
}
|
||||
promotedSessionID = bestSessionID
|
||||
} else {
|
||||
promotedSessionID = sm.findMostTrustedSessionForEmergency()
|
||||
}
|
||||
|
||||
return promotedSessionID, true, false
|
||||
}
|
||||
|
||||
// handleGracePeriodExpiration checks and handles expired grace periods
|
||||
// Returns true if any grace period expired
|
||||
func (sm *SessionManager) handleGracePeriodExpiration(now time.Time) bool {
|
||||
gracePeriodExpired := false
|
||||
for sessionID, graceTime := range sm.reconnectGrace {
|
||||
if now.After(graceTime) {
|
||||
delete(sm.reconnectGrace, sessionID)
|
||||
gracePeriodExpired = true
|
||||
|
||||
wasHoldingPrimarySlot := (sm.lastPrimaryID == sessionID)
|
||||
|
||||
if wasHoldingPrimarySlot {
|
||||
sm.primarySessionID = ""
|
||||
sm.lastPrimaryID = ""
|
||||
|
||||
sm.logger.Info().
|
||||
Str("expiredSessionID", sessionID).
|
||||
Msg("Primary session grace period expired - slot now available")
|
||||
|
||||
// Promote next eligible session using emergency logic if needed
|
||||
sm.promoteAfterGraceExpiration(sessionID, now)
|
||||
} else {
|
||||
sm.logger.Debug().
|
||||
Str("expiredSessionID", sessionID).
|
||||
Msg("Non-primary session grace period expired")
|
||||
}
|
||||
|
||||
delete(sm.reconnectInfo, sessionID)
|
||||
}
|
||||
}
|
||||
return gracePeriodExpired
|
||||
}
|
||||
|
||||
// promoteAfterGraceExpiration handles promotion after grace period expiration
|
||||
func (sm *SessionManager) promoteAfterGraceExpiration(expiredSessionID string, now time.Time) {
|
||||
ctx := emergencyPromotionContext{
|
||||
triggerSessionID: expiredSessionID,
|
||||
triggerReason: "grace_expiration",
|
||||
now: now,
|
||||
}
|
||||
|
||||
promotedSessionID, isEmergency, shouldSkip := sm.attemptEmergencyPromotion(ctx, "")
|
||||
if shouldSkip {
|
||||
return
|
||||
}
|
||||
|
||||
if promotedSessionID != "" {
|
||||
reason := "grace_expiration_promotion"
|
||||
if isEmergency {
|
||||
reason = "emergency_promotion_deadlock_prevention"
|
||||
sm.lastEmergencyPromotion = now
|
||||
sm.consecutiveEmergencyPromotions++
|
||||
|
||||
sm.logger.Warn().
|
||||
Str("expiredSessionID", expiredSessionID).
|
||||
Str("promotedSessionID", promotedSessionID).
|
||||
Bool("requireApproval", true).
|
||||
Int("consecutiveEmergencyPromotions", sm.consecutiveEmergencyPromotions).
|
||||
Int("trustScore", sm.getSessionTrustScore(promotedSessionID)).
|
||||
Msg("EMERGENCY: Bypassing approval requirement to prevent deadlock")
|
||||
}
|
||||
|
||||
err := sm.transferPrimaryRole("", promotedSessionID, reason, "primary grace period expired")
|
||||
if err == nil {
|
||||
logEvent := sm.logger.Info()
|
||||
if isEmergency {
|
||||
logEvent = sm.logger.Warn()
|
||||
}
|
||||
logEvent.
|
||||
Str("expiredSessionID", expiredSessionID).
|
||||
Str("promotedSessionID", promotedSessionID).
|
||||
Str("reason", reason).
|
||||
Bool("isEmergencyPromotion", isEmergency).
|
||||
Msg("Auto-promoted session after primary grace period expiration")
|
||||
} else {
|
||||
sm.logger.Error().
|
||||
Err(err).
|
||||
Str("expiredSessionID", expiredSessionID).
|
||||
Str("promotedSessionID", promotedSessionID).
|
||||
Str("reason", reason).
|
||||
Bool("isEmergencyPromotion", isEmergency).
|
||||
Msg("Failed to promote session after grace period expiration")
|
||||
}
|
||||
} else {
|
||||
logLevel := sm.logger.Info()
|
||||
if isEmergency {
|
||||
logLevel = sm.logger.Error()
|
||||
}
|
||||
logLevel.
|
||||
Str("expiredSessionID", expiredSessionID).
|
||||
Bool("isEmergencyPromotion", isEmergency).
|
||||
Msg("Primary grace period expired but no eligible sessions to promote")
|
||||
}
|
||||
}
|
||||
|
||||
// handlePendingSessionTimeout removes timed-out pending sessions (DoS protection)
|
||||
// Returns true if any pending session was removed
|
||||
func (sm *SessionManager) handlePendingSessionTimeout(now time.Time) bool {
|
||||
needsCleanup := false
|
||||
for id, session := range sm.sessions {
|
||||
if session.Mode == SessionModePending &&
|
||||
now.Sub(session.CreatedAt) > defaultPendingSessionTimeout {
|
||||
websocketLogger.Info().
|
||||
Str("sessionId", id).
|
||||
Dur("age", now.Sub(session.CreatedAt)).
|
||||
Msg("Removing timed-out pending session")
|
||||
delete(sm.sessions, id)
|
||||
needsCleanup = true
|
||||
}
|
||||
}
|
||||
return needsCleanup
|
||||
}
|
||||
|
||||
// handleObserverSessionCleanup removes inactive observer sessions with closed RPC channels
|
||||
// Returns true if any observer session was removed
|
||||
func (sm *SessionManager) handleObserverSessionCleanup(now time.Time) bool {
|
||||
observerTimeout := defaultObserverSessionTimeout
|
||||
if currentSessionSettings != nil && currentSessionSettings.ObserverTimeout > 0 {
|
||||
observerTimeout = time.Duration(currentSessionSettings.ObserverTimeout) * time.Second
|
||||
}
|
||||
|
||||
needsCleanup := false
|
||||
for id, session := range sm.sessions {
|
||||
if session.Mode == SessionModeObserver {
|
||||
if session.RPCChannel == nil && now.Sub(session.LastActive) > observerTimeout {
|
||||
sm.logger.Info().
|
||||
Str("sessionId", id).
|
||||
Dur("inactiveFor", now.Sub(session.LastActive)).
|
||||
Dur("observerTimeout", observerTimeout).
|
||||
Msg("Removing inactive observer session with closed RPC channel")
|
||||
delete(sm.sessions, id)
|
||||
needsCleanup = true
|
||||
}
|
||||
}
|
||||
}
|
||||
return needsCleanup
|
||||
}
|
||||
|
||||
// handlePrimarySessionTimeout checks and handles primary session timeout
|
||||
// Returns true if primary session was timed out and cleanup is needed
|
||||
func (sm *SessionManager) handlePrimarySessionTimeout(now time.Time) bool {
|
||||
if sm.primarySessionID == "" {
|
||||
return false
|
||||
}
|
||||
|
||||
primary, exists := sm.sessions[sm.primarySessionID]
|
||||
if !exists {
|
||||
sm.primarySessionID = ""
|
||||
return true
|
||||
}
|
||||
|
||||
currentTimeout := sm.getCurrentPrimaryTimeout()
|
||||
if now.Sub(primary.LastActive) <= currentTimeout {
|
||||
return false
|
||||
}
|
||||
|
||||
// Timeout detected - demote primary
|
||||
timedOutSessionID := primary.ID
|
||||
primary.Mode = SessionModeObserver
|
||||
sm.primarySessionID = ""
|
||||
|
||||
ctx := emergencyPromotionContext{
|
||||
triggerSessionID: timedOutSessionID,
|
||||
triggerReason: "timeout",
|
||||
now: now,
|
||||
}
|
||||
|
||||
promotedSessionID, isEmergency, shouldSkip := sm.attemptEmergencyPromotion(ctx, timedOutSessionID)
|
||||
if shouldSkip {
|
||||
return false
|
||||
}
|
||||
|
||||
if promotedSessionID != "" {
|
||||
reason := "timeout_promotion"
|
||||
if isEmergency {
|
||||
reason = "emergency_timeout_promotion"
|
||||
sm.lastEmergencyPromotion = now
|
||||
sm.consecutiveEmergencyPromotions++
|
||||
|
||||
sm.logger.Warn().
|
||||
Str("timedOutSessionID", timedOutSessionID).
|
||||
Str("promotedSessionID", promotedSessionID).
|
||||
Bool("requireApproval", true).
|
||||
Int("trustScore", sm.getSessionTrustScore(promotedSessionID)).
|
||||
Msg("EMERGENCY: Timeout promotion bypassing approval requirement")
|
||||
}
|
||||
|
||||
err := sm.transferPrimaryRole(timedOutSessionID, promotedSessionID, reason, "primary session timeout")
|
||||
if err == nil {
|
||||
logEvent := sm.logger.Info()
|
||||
if isEmergency {
|
||||
logEvent = sm.logger.Warn()
|
||||
}
|
||||
logEvent.
|
||||
Str("timedOutSessionID", timedOutSessionID).
|
||||
Str("promotedSessionID", promotedSessionID).
|
||||
Bool("isEmergencyPromotion", isEmergency).
|
||||
Msg("Auto-promoted session after primary timeout")
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
|
@ -84,10 +84,6 @@ type TransferBlacklistEntry struct {
|
|||
var (
|
||||
lastBroadcast time.Time
|
||||
broadcastMutex sync.Mutex
|
||||
|
||||
// Pre-allocated event maps to reduce allocations
|
||||
modePrimaryEvent = map[string]string{"mode": "primary"}
|
||||
modeObserverEvent = map[string]string{"mode": "observer"}
|
||||
)
|
||||
|
||||
type SessionManager struct {
|
||||
|
|
@ -95,6 +91,7 @@ type SessionManager struct {
|
|||
primaryTimeout time.Duration // 8 bytes
|
||||
logger *zerolog.Logger // 8 bytes
|
||||
sessions map[string]*Session // 8 bytes
|
||||
nicknameIndex map[string]*Session // 8 bytes - O(1) nickname uniqueness lookups
|
||||
reconnectGrace map[string]time.Time // 8 bytes
|
||||
reconnectInfo map[string]*SessionData // 8 bytes
|
||||
transferBlacklist []TransferBlacklistEntry // Prevent demoted sessions from immediate re-promotion
|
||||
|
|
@ -136,6 +133,7 @@ func NewSessionManager(logger *zerolog.Logger) *SessionManager {
|
|||
|
||||
sm := &SessionManager{
|
||||
sessions: make(map[string]*Session),
|
||||
nicknameIndex: make(map[string]*Session),
|
||||
reconnectGrace: make(map[string]time.Time),
|
||||
reconnectInfo: make(map[string]*SessionData),
|
||||
transferBlacklist: make([]TransferBlacklistEntry, 0),
|
||||
|
|
@ -177,10 +175,10 @@ func (sm *SessionManager) AddSession(session *Session, clientSettings *SessionSe
|
|||
sm.mu.Lock()
|
||||
defer sm.mu.Unlock()
|
||||
|
||||
// Check nickname uniqueness (only for non-empty nicknames)
|
||||
// Check nickname uniqueness using O(1) index (only for non-empty nicknames)
|
||||
if session.Nickname != "" {
|
||||
for id, existingSession := range sm.sessions {
|
||||
if id != session.ID && existingSession.Nickname == session.Nickname {
|
||||
if existingSession, exists := sm.nicknameIndex[session.Nickname]; exists {
|
||||
if existingSession.ID != session.ID {
|
||||
return fmt.Errorf("nickname '%s' is already in use by another session", session.Nickname)
|
||||
}
|
||||
}
|
||||
|
|
@ -282,12 +280,14 @@ func (sm *SessionManager) AddSession(session *Session, clientSettings *SessionSe
|
|||
}
|
||||
|
||||
isBlacklisted := sm.isSessionBlacklisted(session.ID)
|
||||
isOnlySession := len(sm.sessions) == 0
|
||||
|
||||
// Determine if this session should become primary
|
||||
// If there's no primary AND this is the ONLY session, ALWAYS promote regardless of blacklist
|
||||
isOnlySession := len(sm.sessions) == 0
|
||||
shouldBecomePrimary := (wasWithinGracePeriod && wasPreviouslyPrimary && !primaryExists && !hasActivePrimaryGracePeriod) ||
|
||||
(!wasWithinGracePeriod && !hasActivePrimaryGracePeriod && !primaryExists && (!isBlacklisted || isOnlySession))
|
||||
canBecomePrimary := !primaryExists && !hasActivePrimaryGracePeriod
|
||||
isReconnectingPrimary := wasWithinGracePeriod && wasPreviouslyPrimary
|
||||
isNewEligibleSession := !wasWithinGracePeriod && (!isBlacklisted || isOnlySession)
|
||||
|
||||
shouldBecomePrimary := canBecomePrimary && (isReconnectingPrimary || isNewEligibleSession)
|
||||
|
||||
if shouldBecomePrimary {
|
||||
if sm.primarySessionID == "" || sm.sessions[sm.primarySessionID] == nil {
|
||||
|
|
@ -351,6 +351,11 @@ func (sm *SessionManager) AddSession(session *Session, clientSettings *SessionSe
|
|||
// Ensure session has auto-generated nickname if needed
|
||||
sm.ensureNickname(session)
|
||||
|
||||
// Add to nickname index
|
||||
if session.Nickname != "" {
|
||||
sm.nicknameIndex[session.Nickname] = session
|
||||
}
|
||||
|
||||
sm.validateSinglePrimary()
|
||||
|
||||
// Clean up grace period after validation completes
|
||||
|
|
@ -406,18 +411,20 @@ func (sm *SessionManager) RemoveSession(sessionID string) {
|
|||
// Only add grace period if this is NOT an intentional logout
|
||||
if !isIntentionalLogout {
|
||||
// Limit grace period entries to prevent memory exhaustion
|
||||
// Evict the entry that will expire soonest (oldest expiration time)
|
||||
for len(sm.reconnectGrace) >= maxGracePeriodEntries {
|
||||
var oldestID string
|
||||
var oldestTime time.Time
|
||||
var evictID string
|
||||
var earliestExpiration time.Time
|
||||
for id, graceTime := range sm.reconnectGrace {
|
||||
if oldestTime.IsZero() || graceTime.Before(oldestTime) {
|
||||
oldestID = id
|
||||
oldestTime = graceTime
|
||||
// Find the grace period that expires first (earliest time)
|
||||
if earliestExpiration.IsZero() || graceTime.Before(earliestExpiration) {
|
||||
evictID = id
|
||||
earliestExpiration = graceTime
|
||||
}
|
||||
}
|
||||
if oldestID != "" {
|
||||
delete(sm.reconnectGrace, oldestID)
|
||||
delete(sm.reconnectInfo, oldestID)
|
||||
if evictID != "" {
|
||||
delete(sm.reconnectGrace, evictID)
|
||||
delete(sm.reconnectInfo, evictID)
|
||||
} else {
|
||||
break
|
||||
}
|
||||
|
|
@ -531,20 +538,26 @@ func (sm *SessionManager) ClearGracePeriod(sessionID string) {
|
|||
// isSessionBlacklisted checks if a session was recently demoted via transfer and should not become primary
|
||||
func (sm *SessionManager) isSessionBlacklisted(sessionID string) bool {
|
||||
now := time.Now()
|
||||
isBlacklisted := false
|
||||
|
||||
// Clean expired entries while we're here
|
||||
validEntries := make([]TransferBlacklistEntry, 0, len(sm.transferBlacklist))
|
||||
for _, entry := range sm.transferBlacklist {
|
||||
// Clean expired entries in-place (zero allocations)
|
||||
writeIndex := 0
|
||||
for readIndex := 0; readIndex < len(sm.transferBlacklist); readIndex++ {
|
||||
entry := sm.transferBlacklist[readIndex]
|
||||
if now.Before(entry.ExpiresAt) {
|
||||
validEntries = append(validEntries, entry)
|
||||
// Keep this entry - still valid
|
||||
sm.transferBlacklist[writeIndex] = entry
|
||||
writeIndex++
|
||||
if entry.SessionID == sessionID {
|
||||
return true // Found active blacklist entry
|
||||
isBlacklisted = true
|
||||
}
|
||||
}
|
||||
// Expired entries are automatically skipped (not copied forward)
|
||||
}
|
||||
sm.transferBlacklist = validEntries // Update with only non-expired entries
|
||||
// Truncate to only valid entries
|
||||
sm.transferBlacklist = sm.transferBlacklist[:writeIndex]
|
||||
|
||||
return false
|
||||
return isBlacklisted
|
||||
}
|
||||
|
||||
// GetPrimarySession returns the current primary session
|
||||
|
|
@ -655,7 +668,7 @@ func (sm *SessionManager) RequestPrimary(sessionID string) error {
|
|||
err := sm.transferPrimaryRole("", sessionID, "initial_promotion", "first session auto-promotion")
|
||||
if err == nil {
|
||||
// Send mode change event after promoting
|
||||
writeJSONRPCEvent("modeChanged", modePrimaryEvent, session)
|
||||
writeJSONRPCEvent("modeChanged", map[string]string{"mode": "primary"}, session)
|
||||
go sm.broadcastSessionListUpdate()
|
||||
}
|
||||
return err
|
||||
|
|
@ -742,7 +755,7 @@ func (sm *SessionManager) ReleasePrimary(sessionID string) error {
|
|||
// Send mode change event for promoted session
|
||||
go func() {
|
||||
if promotedSession := sessionManager.GetSession(promotedSessionID); promotedSession != nil {
|
||||
writeJSONRPCEvent("modeChanged", modePrimaryEvent, promotedSession)
|
||||
writeJSONRPCEvent("modeChanged", map[string]string{"mode": "primary"}, promotedSession)
|
||||
}
|
||||
}()
|
||||
} else {
|
||||
|
|
@ -784,13 +797,13 @@ func (sm *SessionManager) TransferPrimary(fromID, toID string) error {
|
|||
// Send events in goroutines to avoid holding lock
|
||||
go func() {
|
||||
if fromSession := sessionManager.GetSession(fromID); fromSession != nil {
|
||||
writeJSONRPCEvent("modeChanged", modeObserverEvent, fromSession)
|
||||
writeJSONRPCEvent("modeChanged", map[string]string{"mode": "observer"}, fromSession)
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
if toSession := sessionManager.GetSession(toID); toSession != nil {
|
||||
writeJSONRPCEvent("modeChanged", modePrimaryEvent, toSession)
|
||||
writeJSONRPCEvent("modeChanged", map[string]string{"mode": "primary"}, toSession)
|
||||
}
|
||||
sm.broadcastSessionListUpdate()
|
||||
}()
|
||||
|
|
@ -848,13 +861,13 @@ func (sm *SessionManager) ApprovePrimaryRequest(currentPrimaryID, requesterID st
|
|||
// Send events after releasing lock to avoid deadlock
|
||||
go func() {
|
||||
if demotedSession := sessionManager.GetSession(currentPrimaryID); demotedSession != nil {
|
||||
writeJSONRPCEvent("modeChanged", modeObserverEvent, demotedSession)
|
||||
writeJSONRPCEvent("modeChanged", map[string]string{"mode": "observer"}, demotedSession)
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
if promotedSession := sessionManager.GetSession(requesterID); promotedSession != nil {
|
||||
writeJSONRPCEvent("modeChanged", modePrimaryEvent, promotedSession)
|
||||
writeJSONRPCEvent("modeChanged", map[string]string{"mode": "primary"}, promotedSession)
|
||||
}
|
||||
sm.broadcastSessionListUpdate()
|
||||
}()
|
||||
|
|
@ -1519,10 +1532,17 @@ func (sm *SessionManager) broadcastSessionListUpdate() {
|
|||
// Now send events without holding lock
|
||||
for _, session := range activeSessions {
|
||||
// Per-session throttling to prevent broadcast storms
|
||||
if time.Since(session.LastBroadcast) < sessionBroadcastDelay {
|
||||
session.lastBroadcastMu.Lock()
|
||||
shouldSkip := time.Since(session.LastBroadcast) < sessionBroadcastDelay
|
||||
if !shouldSkip {
|
||||
session.LastBroadcast = time.Now()
|
||||
}
|
||||
session.lastBroadcastMu.Unlock()
|
||||
|
||||
if shouldSkip {
|
||||
continue
|
||||
}
|
||||
session.LastBroadcast = time.Now()
|
||||
|
||||
event := SessionsUpdateEvent{
|
||||
Sessions: infos,
|
||||
YourMode: session.Mode,
|
||||
|
|
@ -1547,10 +1567,10 @@ func (sm *SessionManager) Shutdown() {
|
|||
}
|
||||
|
||||
func (sm *SessionManager) cleanupInactiveSessions(ctx context.Context) {
|
||||
ticker := time.NewTicker(1 * time.Second) // Check every second for grace periods
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
validationCounter := 0 // Counter for periodic validateSinglePrimary calls
|
||||
validationCounter := 0
|
||||
|
||||
for {
|
||||
select {
|
||||
|
|
@ -1561,313 +1581,33 @@ func (sm *SessionManager) cleanupInactiveSessions(ctx context.Context) {
|
|||
now := time.Now()
|
||||
needsBroadcast := false
|
||||
|
||||
// Check for expired grace periods and promote if needed
|
||||
gracePeriodExpired := false
|
||||
for sessionID, graceTime := range sm.reconnectGrace {
|
||||
if now.After(graceTime) {
|
||||
delete(sm.reconnectGrace, sessionID)
|
||||
gracePeriodExpired = true
|
||||
|
||||
wasHoldingPrimarySlot := (sm.lastPrimaryID == sessionID)
|
||||
|
||||
// Check if this expired session was the primary holding the slot
|
||||
if wasHoldingPrimarySlot {
|
||||
// The primary didn't reconnect in time, now we can clear the slot and promote
|
||||
sm.primarySessionID = ""
|
||||
sm.lastPrimaryID = ""
|
||||
needsBroadcast = true
|
||||
|
||||
sm.logger.Info().
|
||||
Str("expiredSessionID", sessionID).
|
||||
Msg("Primary session grace period expired - slot now available")
|
||||
|
||||
// Always try to promote when possible - approval is only for new pending sessions
|
||||
// Use enhanced emergency promotion system for better security
|
||||
isEmergencyPromotion := false
|
||||
var promotedSessionID string
|
||||
|
||||
// === EMERGENCY PROMOTION ALGORITHM ===
|
||||
//
|
||||
// When RequireApproval is enabled, we face a potential deadlock scenario:
|
||||
// - Primary session disconnects (grace period expires)
|
||||
// - All other sessions are pending (waiting for approval from primary)
|
||||
// - No primary exists to approve pending sessions
|
||||
// - Result: System is stuck with no primary and no way to get one
|
||||
//
|
||||
// Solution: Emergency promotion bypasses approval requirement to select
|
||||
// the most trustworthy pending/observer session as primary. This ensures
|
||||
// the system ALWAYS has a primary session for KVM functionality.
|
||||
//
|
||||
// Security measures to prevent abuse:
|
||||
// 1. Rate limiting: Max 1 emergency promotion per 30 seconds
|
||||
// 2. Consecutive limit: Max 3 consecutive emergency promotions
|
||||
// 3. Trust-based selection: Sessions scored on age, history, nickname
|
||||
// 4. Audit logging: All emergency promotions logged at WARN level
|
||||
//
|
||||
// Trust scoring criteria (see getSessionTrustScore):
|
||||
// - Session age: +1 point per minute (capped at 100)
|
||||
// - Was previous primary: +50 points
|
||||
// - Observer mode: +20 points (more trustworthy than queued/pending)
|
||||
// - Queued mode: +10 points
|
||||
// - Has required nickname: +15 points / missing: -30 points
|
||||
//
|
||||
// This algorithm prioritizes long-lived, previously-primary sessions
|
||||
// with proper nicknames over newly-connected anonymous sessions.
|
||||
//
|
||||
// Check if this is an emergency scenario (RequireApproval enabled)
|
||||
if currentSessionSettings != nil && currentSessionSettings.RequireApproval {
|
||||
isEmergencyPromotion = true
|
||||
|
||||
// CRITICAL: Ensure we ALWAYS have a primary session
|
||||
// If there's NO primary, bypass rate limits entirely
|
||||
hasPrimary := sm.primarySessionID != ""
|
||||
if !hasPrimary {
|
||||
sm.logger.Error().
|
||||
Str("expiredSessionID", sessionID).
|
||||
Msg("CRITICAL: No primary session exists - bypassing all rate limits")
|
||||
} else {
|
||||
// Rate limiting for emergency promotions (only when we have a primary)
|
||||
if now.Sub(sm.lastEmergencyPromotion) < 30*time.Second {
|
||||
sm.logger.Warn().
|
||||
Str("expiredSessionID", sessionID).
|
||||
Dur("timeSinceLastEmergency", now.Sub(sm.lastEmergencyPromotion)).
|
||||
Msg("Emergency promotion rate limit exceeded - potential attack")
|
||||
continue // Skip this grace period expiration
|
||||
}
|
||||
|
||||
// Limit consecutive emergency promotions
|
||||
if sm.consecutiveEmergencyPromotions >= 3 {
|
||||
sm.logger.Error().
|
||||
Str("expiredSessionID", sessionID).
|
||||
Int("consecutiveCount", sm.consecutiveEmergencyPromotions).
|
||||
Msg("Too many consecutive emergency promotions - blocking for security")
|
||||
continue // Skip this grace period expiration
|
||||
}
|
||||
}
|
||||
|
||||
promotedSessionID = sm.findMostTrustedSessionForEmergency()
|
||||
} else {
|
||||
// Normal promotion - reset consecutive counter
|
||||
sm.consecutiveEmergencyPromotions = 0
|
||||
promotedSessionID = sm.findNextSessionToPromote()
|
||||
}
|
||||
|
||||
if promotedSessionID != "" {
|
||||
// Determine reason and log appropriately
|
||||
reason := "grace_expiration_promotion"
|
||||
if isEmergencyPromotion {
|
||||
reason = "emergency_promotion_deadlock_prevention"
|
||||
sm.lastEmergencyPromotion = now
|
||||
sm.consecutiveEmergencyPromotions++
|
||||
|
||||
// Enhanced logging for emergency promotions
|
||||
sm.logger.Warn().
|
||||
Str("expiredSessionID", sessionID).
|
||||
Str("promotedSessionID", promotedSessionID).
|
||||
Bool("requireApproval", true).
|
||||
Int("consecutiveEmergencyPromotions", sm.consecutiveEmergencyPromotions).
|
||||
Int("trustScore", sm.getSessionTrustScore(promotedSessionID)).
|
||||
Msg("EMERGENCY: Bypassing approval requirement to prevent deadlock")
|
||||
}
|
||||
|
||||
err := sm.transferPrimaryRole("", promotedSessionID, reason, "primary grace period expired")
|
||||
if err == nil {
|
||||
logEvent := sm.logger.Info()
|
||||
if isEmergencyPromotion {
|
||||
logEvent = sm.logger.Warn()
|
||||
}
|
||||
logEvent.
|
||||
Str("expiredSessionID", sessionID).
|
||||
Str("promotedSessionID", promotedSessionID).
|
||||
Str("reason", reason).
|
||||
Bool("isEmergencyPromotion", isEmergencyPromotion).
|
||||
Msg("Auto-promoted session after primary grace period expiration")
|
||||
} else {
|
||||
sm.logger.Error().
|
||||
Err(err).
|
||||
Str("expiredSessionID", sessionID).
|
||||
Str("promotedSessionID", promotedSessionID).
|
||||
Str("reason", reason).
|
||||
Bool("isEmergencyPromotion", isEmergencyPromotion).
|
||||
Msg("Failed to promote session after grace period expiration")
|
||||
}
|
||||
} else {
|
||||
logLevel := sm.logger.Info()
|
||||
if isEmergencyPromotion {
|
||||
logLevel = sm.logger.Error() // Emergency with no eligible sessions is critical
|
||||
}
|
||||
logLevel.
|
||||
Str("expiredSessionID", sessionID).
|
||||
Bool("isEmergencyPromotion", isEmergencyPromotion).
|
||||
Msg("Primary grace period expired but no eligible sessions to promote")
|
||||
}
|
||||
} else {
|
||||
// Non-primary session grace period expired - just cleanup
|
||||
sm.logger.Debug().
|
||||
Str("expiredSessionID", sessionID).
|
||||
Msg("Non-primary session grace period expired")
|
||||
}
|
||||
|
||||
// Also clean up reconnect info for expired sessions
|
||||
delete(sm.reconnectInfo, sessionID)
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up pending sessions that have timed out (DoS protection)
|
||||
for id, session := range sm.sessions {
|
||||
if session.Mode == SessionModePending &&
|
||||
now.Sub(session.CreatedAt) > defaultPendingSessionTimeout {
|
||||
websocketLogger.Info().
|
||||
Str("sessionId", id).
|
||||
Dur("age", now.Sub(session.CreatedAt)).
|
||||
Msg("Removing timed-out pending session")
|
||||
delete(sm.sessions, id)
|
||||
// Handle expired grace periods
|
||||
gracePeriodExpired := sm.handleGracePeriodExpiration(now)
|
||||
if gracePeriodExpired {
|
||||
needsBroadcast = true
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up observer sessions with closed RPC channels (stale connections)
|
||||
// This prevents accumulation of zombie observer sessions that are no longer connected
|
||||
observerTimeout := defaultObserverSessionTimeout
|
||||
if currentSessionSettings != nil && currentSessionSettings.ObserverTimeout > 0 {
|
||||
observerTimeout = time.Duration(currentSessionSettings.ObserverTimeout) * time.Second
|
||||
}
|
||||
for id, session := range sm.sessions {
|
||||
if session.Mode == SessionModeObserver {
|
||||
// Check if RPC channel is nil/closed AND session has been inactive
|
||||
if session.RPCChannel == nil && now.Sub(session.LastActive) > observerTimeout {
|
||||
sm.logger.Info().
|
||||
Str("sessionId", id).
|
||||
Dur("inactiveFor", now.Sub(session.LastActive)).
|
||||
Dur("observerTimeout", observerTimeout).
|
||||
Msg("Removing inactive observer session with closed RPC channel")
|
||||
delete(sm.sessions, id)
|
||||
// Clean up timed-out pending sessions (DoS protection)
|
||||
if sm.handlePendingSessionTimeout(now) {
|
||||
needsBroadcast = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check primary session timeout (every 30 iterations = 30 seconds)
|
||||
if sm.primarySessionID != "" {
|
||||
if primary, exists := sm.sessions[sm.primarySessionID]; exists {
|
||||
currentTimeout := sm.getCurrentPrimaryTimeout()
|
||||
if now.Sub(primary.LastActive) > currentTimeout {
|
||||
timedOutSessionID := primary.ID
|
||||
primary.Mode = SessionModeObserver
|
||||
sm.primarySessionID = ""
|
||||
|
||||
// === TIMEOUT-BASED EMERGENCY PROMOTION ===
|
||||
//
|
||||
// Similar to grace period expiration, primary session timeout can create
|
||||
// a deadlock when RequireApproval is enabled. The timeout detection happens
|
||||
// every 30 seconds (based on ticker iterations) and demotes inactive primaries.
|
||||
//
|
||||
// Without emergency promotion:
|
||||
// - Primary becomes inactive and times out
|
||||
// - Primary is demoted to observer
|
||||
// - All other sessions are pending (awaiting approval)
|
||||
// - No primary exists to approve them
|
||||
// - System deadlocked with no KVM control
|
||||
//
|
||||
// This uses the same trust-based selection and security measures as
|
||||
// grace period emergency promotion to ensure system availability.
|
||||
isEmergencyPromotion := false
|
||||
var promotedSessionID string
|
||||
|
||||
// Check if this requires emergency promotion due to approval requirements
|
||||
if currentSessionSettings != nil && currentSessionSettings.RequireApproval {
|
||||
isEmergencyPromotion = true
|
||||
|
||||
// CRITICAL: Ensure we ALWAYS have a primary session
|
||||
// primarySessionID was just cleared above, so this will always be empty
|
||||
// But check anyway for completeness
|
||||
hasPrimary := sm.primarySessionID != ""
|
||||
if !hasPrimary {
|
||||
sm.logger.Error().
|
||||
Str("timedOutSessionID", timedOutSessionID).
|
||||
Msg("CRITICAL: No primary session after timeout - bypassing all rate limits")
|
||||
} else {
|
||||
// Rate limiting for emergency promotions (only when we have a primary)
|
||||
if now.Sub(sm.lastEmergencyPromotion) < 30*time.Second {
|
||||
sm.logger.Warn().
|
||||
Str("timedOutSessionID", timedOutSessionID).
|
||||
Dur("timeSinceLastEmergency", now.Sub(sm.lastEmergencyPromotion)).
|
||||
Msg("Emergency promotion rate limit exceeded during timeout - potential attack")
|
||||
continue // Skip this timeout
|
||||
}
|
||||
}
|
||||
|
||||
// Use trust-based selection but exclude the timed-out session
|
||||
bestSessionID := ""
|
||||
bestScore := -1
|
||||
for id, session := range sm.sessions {
|
||||
if id != timedOutSessionID &&
|
||||
!sm.isSessionBlacklisted(id) &&
|
||||
(session.Mode == SessionModeObserver || session.Mode == SessionModeQueued) {
|
||||
score := sm.getSessionTrustScore(id)
|
||||
if score > bestScore {
|
||||
bestScore = score
|
||||
bestSessionID = id
|
||||
}
|
||||
}
|
||||
}
|
||||
promotedSessionID = bestSessionID
|
||||
} else {
|
||||
// Normal timeout promotion - find any observer except the timed-out one
|
||||
for id, session := range sm.sessions {
|
||||
if id != timedOutSessionID && session.Mode == SessionModeObserver && !sm.isSessionBlacklisted(id) {
|
||||
promotedSessionID = id
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If found a session to promote
|
||||
if promotedSessionID != "" {
|
||||
reason := "timeout_promotion"
|
||||
if isEmergencyPromotion {
|
||||
reason = "emergency_timeout_promotion"
|
||||
sm.lastEmergencyPromotion = now
|
||||
sm.consecutiveEmergencyPromotions++
|
||||
|
||||
// Enhanced logging for emergency timeout promotions
|
||||
sm.logger.Warn().
|
||||
Str("timedOutSessionID", timedOutSessionID).
|
||||
Str("promotedSessionID", promotedSessionID).
|
||||
Bool("requireApproval", true).
|
||||
Int("trustScore", sm.getSessionTrustScore(promotedSessionID)).
|
||||
Msg("EMERGENCY: Timeout promotion bypassing approval requirement")
|
||||
}
|
||||
|
||||
err := sm.transferPrimaryRole(timedOutSessionID, promotedSessionID, reason, "primary session timeout")
|
||||
if err == nil {
|
||||
needsBroadcast = true
|
||||
logEvent := sm.logger.Info()
|
||||
if isEmergencyPromotion {
|
||||
logEvent = sm.logger.Warn()
|
||||
}
|
||||
logEvent.
|
||||
Str("timedOutSessionID", timedOutSessionID).
|
||||
Str("promotedSessionID", promotedSessionID).
|
||||
Bool("isEmergencyPromotion", isEmergencyPromotion).
|
||||
Msg("Auto-promoted session after primary timeout")
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Primary session no longer exists, clear it
|
||||
sm.primarySessionID = ""
|
||||
// Clean up inactive observer sessions
|
||||
if sm.handleObserverSessionCleanup(now) {
|
||||
needsBroadcast = true
|
||||
}
|
||||
|
||||
// Handle primary session timeout
|
||||
if sm.handlePrimarySessionTimeout(now) {
|
||||
needsBroadcast = true
|
||||
}
|
||||
|
||||
// Run validation immediately if a grace period expired, otherwise run periodically
|
||||
// Run validation immediately if grace period expired, otherwise periodically
|
||||
if gracePeriodExpired {
|
||||
sm.validateSinglePrimary()
|
||||
} else {
|
||||
// Periodic validateSinglePrimary to catch deadlock states
|
||||
validationCounter++
|
||||
if validationCounter >= 10 { // Every 10 seconds
|
||||
if validationCounter >= 10 {
|
||||
validationCounter = 0
|
||||
sm.validateSinglePrimary()
|
||||
}
|
||||
|
|
@ -1875,7 +1615,6 @@ func (sm *SessionManager) cleanupInactiveSessions(ctx context.Context) {
|
|||
|
||||
sm.mu.Unlock()
|
||||
|
||||
// Broadcast outside of lock if needed
|
||||
if needsBroadcast {
|
||||
go sm.broadcastSessionListUpdate()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -52,7 +52,6 @@ export default function Actionbar({
|
|||
const response = JSON.parse(event.data);
|
||||
if (response.id === id && response.result) {
|
||||
setSessions(response.result);
|
||||
rpcDataChannel.removeEventListener("message", handler);
|
||||
}
|
||||
} catch {
|
||||
// Ignore parse errors for non-JSON messages
|
||||
|
|
@ -62,10 +61,14 @@ export default function Actionbar({
|
|||
rpcDataChannel.addEventListener("message", handler);
|
||||
rpcDataChannel.send(message);
|
||||
|
||||
// Clean up after timeout
|
||||
setTimeout(() => {
|
||||
const timeoutId = setTimeout(() => {
|
||||
rpcDataChannel.removeEventListener("message", handler);
|
||||
}, 5000);
|
||||
|
||||
return () => {
|
||||
clearTimeout(timeoutId);
|
||||
rpcDataChannel.removeEventListener("message", handler);
|
||||
};
|
||||
}
|
||||
}, [rpcDataChannel, sessions.length, setSessions]);
|
||||
|
||||
|
|
|
|||
4
usb.go
4
usb.go
|
|
@ -89,7 +89,7 @@ func (s *Session) rpcKeypressReport(key byte, press bool) error {
|
|||
return gadget.KeypressReport(key, press)
|
||||
}
|
||||
|
||||
func (s *Session) rpcAbsMouseReport(x int16, y int16, buttons uint8) error {
|
||||
func (s *Session) rpcAbsMouseReport(x int, y int, buttons uint8) error {
|
||||
if s == nil || !s.HasPermission(PermissionMouseInput) {
|
||||
return ErrPermissionDeniedMouse
|
||||
}
|
||||
|
|
@ -128,7 +128,7 @@ func rpcKeypressReport(key byte, press bool) error {
|
|||
return ErrNotPrimarySession
|
||||
}
|
||||
|
||||
func rpcAbsMouseReport(x int16, y int16, buttons uint8) error {
|
||||
func rpcAbsMouseReport(x int, y int, buttons uint8) error {
|
||||
if primary := sessionManager.GetPrimarySession(); primary != nil {
|
||||
return primary.rpcAbsMouseReport(x, y, buttons)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -44,6 +44,7 @@ type Session struct {
|
|||
rpcRateLimitMu sync.Mutex // Protects rate limit fields
|
||||
rpcRateLimit int // Count of RPCs in current window
|
||||
rpcRateLimitWin time.Time // Start of current rate limit window
|
||||
lastBroadcastMu sync.Mutex // Protects LastBroadcast field
|
||||
|
||||
peerConnection *webrtc.PeerConnection
|
||||
VideoTrack *webrtc.TrackLocalStaticSample
|
||||
|
|
|
|||
Loading…
Reference in New Issue