mirror of https://github.com/jetkvm/kvm.git
fix: address critical issues in multi-session management
- Fix nickname index stale pointer during session reconnection - Reset LastActive for all emergency promotions to prevent cascade timeouts - Bypass rate limits when no primary exists to prevent system deadlock - Replace manual mutex with atomic.Int32 for session counter (fixes race condition) - Implement collect-then-delete pattern for safe map iteration - Reduce logging verbosity for routine cleanup operations
This commit is contained in:
parent
08b0dd0c37
commit
ba2fa34385
|
|
@ -70,7 +70,7 @@ func updateDisplay() {
|
|||
nativeInstance.UpdateLabelIfChanged("hdmi_status_label", "Disconnected")
|
||||
_, _ = nativeInstance.UIObjClearState("hdmi_status_label", "LV_STATE_CHECKED")
|
||||
}
|
||||
nativeInstance.UpdateLabelIfChanged("cloud_status_label", fmt.Sprintf("%d active", actionSessions))
|
||||
nativeInstance.UpdateLabelIfChanged("cloud_status_label", fmt.Sprintf("%d active", getActiveSessions()))
|
||||
|
||||
if networkManager != nil && networkManager.IsUp() {
|
||||
nativeInstance.UISetVar("main_screen", "home_screen")
|
||||
|
|
|
|||
|
|
@ -25,6 +25,18 @@ func (sm *SessionManager) attemptEmergencyPromotion(ctx emergencyPromotionContex
|
|||
sm.emergencyWindowMutex.Lock()
|
||||
defer sm.emergencyWindowMutex.Unlock()
|
||||
|
||||
// CRITICAL: Bypass all rate limits if no primary exists to prevent deadlock
|
||||
// System availability takes priority over DoS protection
|
||||
noPrimaryExists := (sm.primarySessionID == "")
|
||||
if noPrimaryExists {
|
||||
sm.logger.Info().
|
||||
Str("triggerSessionID", ctx.triggerSessionID).
|
||||
Str("triggerReason", ctx.triggerReason).
|
||||
Msg("Bypassing emergency promotion rate limits - no primary exists")
|
||||
promotedSessionID := sm.findMostTrustedSessionForEmergency()
|
||||
return promotedSessionID, true, false
|
||||
}
|
||||
|
||||
const slidingWindowDuration = 60 * time.Second
|
||||
const maxEmergencyPromotionsPerMinute = 3
|
||||
|
||||
|
|
@ -187,19 +199,21 @@ func (sm *SessionManager) promoteAfterGraceExpiration(expiredSessionID string, n
|
|||
// handlePendingSessionTimeout removes timed-out pending sessions (DoS protection)
|
||||
// Returns true if any pending session was removed
|
||||
func (sm *SessionManager) handlePendingSessionTimeout(now time.Time) bool {
|
||||
needsCleanup := false
|
||||
toDelete := make([]string, 0)
|
||||
for id, session := range sm.sessions {
|
||||
if session.Mode == SessionModePending &&
|
||||
now.Sub(session.CreatedAt) > defaultPendingSessionTimeout {
|
||||
websocketLogger.Info().
|
||||
websocketLogger.Debug().
|
||||
Str("sessionId", id).
|
||||
Dur("age", now.Sub(session.CreatedAt)).
|
||||
Msg("Removing timed-out pending session")
|
||||
delete(sm.sessions, id)
|
||||
needsCleanup = true
|
||||
toDelete = append(toDelete, id)
|
||||
}
|
||||
}
|
||||
return needsCleanup
|
||||
for _, id := range toDelete {
|
||||
delete(sm.sessions, id)
|
||||
}
|
||||
return len(toDelete) > 0
|
||||
}
|
||||
|
||||
// handleObserverSessionCleanup removes inactive observer sessions with closed RPC channels
|
||||
|
|
@ -210,21 +224,23 @@ func (sm *SessionManager) handleObserverSessionCleanup(now time.Time) bool {
|
|||
observerTimeout = time.Duration(currentSessionSettings.ObserverTimeout) * time.Second
|
||||
}
|
||||
|
||||
needsCleanup := false
|
||||
toDelete := make([]string, 0)
|
||||
for id, session := range sm.sessions {
|
||||
if session.Mode == SessionModeObserver {
|
||||
if session.RPCChannel == nil && now.Sub(session.LastActive) > observerTimeout {
|
||||
sm.logger.Info().
|
||||
sm.logger.Debug().
|
||||
Str("sessionId", id).
|
||||
Dur("inactiveFor", now.Sub(session.LastActive)).
|
||||
Dur("observerTimeout", observerTimeout).
|
||||
Msg("Removing inactive observer session with closed RPC channel")
|
||||
delete(sm.sessions, id)
|
||||
needsCleanup = true
|
||||
toDelete = append(toDelete, id)
|
||||
}
|
||||
}
|
||||
}
|
||||
return needsCleanup
|
||||
for _, id := range toDelete {
|
||||
delete(sm.sessions, id)
|
||||
}
|
||||
return len(toDelete) > 0
|
||||
}
|
||||
|
||||
// handlePrimarySessionTimeout checks and handles primary session timeout
|
||||
|
|
|
|||
|
|
@ -250,6 +250,10 @@ func (sm *SessionManager) AddSession(session *Session, clientSettings *SessionSe
|
|||
// Ensure session has auto-generated nickname if needed
|
||||
sm.ensureNickname(session)
|
||||
|
||||
if !nicknameReserved && session.Nickname != "" {
|
||||
sm.nicknameIndex[session.Nickname] = session
|
||||
}
|
||||
|
||||
sm.sessions[session.ID] = session
|
||||
|
||||
// If this was the primary, try to restore primary status
|
||||
|
|
@ -1200,8 +1204,8 @@ func (sm *SessionManager) transferPrimaryRole(fromSessionID, toSessionID, transf
|
|||
// Promote target session
|
||||
toSession.Mode = SessionModePrimary
|
||||
toSession.hidRPCAvailable = false
|
||||
// Reset LastActive only for emergency promotions to prevent immediate re-timeout
|
||||
if transferType == "emergency_timeout_promotion" || transferType == "emergency_promotion_deadlock_prevention" {
|
||||
// Reset LastActive for all emergency promotions to prevent immediate re-timeout
|
||||
if strings.HasPrefix(transferType, "emergency_") {
|
||||
toSession.LastActive = time.Now()
|
||||
}
|
||||
sm.primarySessionID = toSessionID
|
||||
|
|
|
|||
25
webrtc.go
25
webrtc.go
|
|
@ -7,6 +7,7 @@ import (
|
|||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/coder/websocket"
|
||||
|
|
@ -66,24 +67,14 @@ type Session struct {
|
|||
keysDownStateQueue chan usbgadget.KeysDownState
|
||||
}
|
||||
|
||||
var (
|
||||
actionSessions int = 0
|
||||
activeSessionsMutex = &sync.Mutex{}
|
||||
)
|
||||
var actionSessions atomic.Int32
|
||||
|
||||
func incrActiveSessions() int {
|
||||
activeSessionsMutex.Lock()
|
||||
defer activeSessionsMutex.Unlock()
|
||||
|
||||
actionSessions++
|
||||
return actionSessions
|
||||
func incrActiveSessions() int32 {
|
||||
return actionSessions.Add(1)
|
||||
}
|
||||
|
||||
func getActiveSessions() int {
|
||||
activeSessionsMutex.Lock()
|
||||
defer activeSessionsMutex.Unlock()
|
||||
|
||||
return actionSessions
|
||||
func getActiveSessions() int32 {
|
||||
return actionSessions.Load()
|
||||
}
|
||||
|
||||
// CheckRPCRateLimit checks if the session has exceeded RPC rate limits (DoS protection)
|
||||
|
|
@ -494,9 +485,9 @@ func newSession(config SessionConfig) (*Session, error) {
|
|||
|
||||
if isConnected {
|
||||
isConnected = false
|
||||
actionSessions--
|
||||
newCount := actionSessions.Add(-1)
|
||||
onActiveSessionsChanged()
|
||||
if actionSessions == 0 {
|
||||
if newCount == 0 {
|
||||
onLastSessionDisconnected()
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue