mirror of https://github.com/jetkvm/kvm.git
fix: primary session timeout not triggering due to reconnection resets
Fixed a critical bug where the primary session timeout was never triggered, even after the configured inactivity period (e.g., 60 seconds with no input).

Root cause: the LastActive timestamp was being reset during WebSocket reconnections and session promotions, preventing the inactivity timer from ever reaching the timeout threshold.

Changes:
- session_manager.go:245: Removed the LastActive reset during reconnection in AddSession(). Reconnections should NOT reset the activity timer, since the timeout is based on input activity, not connection activity.
- session_manager.go:1207-1209: Made the LastActive reset conditional in transferPrimaryRole(). Only emergency promotions reset the timer, to prevent an immediate re-timeout. Manual transfers preserve the existing LastActive for accurate timeout tracking.

Impact:
- Primary sessions will now correctly time out after the configured inactivity period
- LastActive is only updated by actual user input (keyboard/mouse events)
- Emergency promotions still get a fresh timer to prevent rapid re-timeout
- Manual transfers maintain accurate activity tracking

Test scenario:
1. User becomes primary and leaves the tab in the background
2. No keyboard/mouse input for 60+ seconds (timeout configured)
3. WebSocket stays connected but LastActive is not reset
4. handlePrimarySessionTimeout() detects inactivity and demotes the primary
5. The next eligible observer is automatically promoted
This commit is contained in:
parent
711f7818bf
commit
c9d8dcb553
|
|
@ -31,12 +31,23 @@ const (
|
||||||
// Session timeout defaults
|
// Session timeout defaults
|
||||||
defaultPendingSessionTimeout = 1 * time.Minute // Timeout for pending sessions (DoS protection)
|
defaultPendingSessionTimeout = 1 * time.Minute // Timeout for pending sessions (DoS protection)
|
||||||
defaultObserverSessionTimeout = 2 * time.Minute // Timeout for inactive observer sessions
|
defaultObserverSessionTimeout = 2 * time.Minute // Timeout for inactive observer sessions
|
||||||
|
disabledTimeoutValue = 24 * time.Hour // Value used when timeout is disabled (0 setting)
|
||||||
|
|
||||||
// Transfer and blacklist settings
|
// Transfer and blacklist settings
|
||||||
transferBlacklistDuration = 60 * time.Second // Duration to blacklist sessions after manual transfer
|
transferBlacklistDuration = 60 * time.Second // Duration to blacklist sessions after manual transfer
|
||||||
|
|
||||||
// Grace period limits
|
// Grace period limits
|
||||||
maxGracePeriodEntries = 10 // Maximum number of grace period entries to prevent memory exhaustion
|
maxGracePeriodEntries = 10 // Maximum number of grace period entries to prevent memory exhaustion
|
||||||
|
|
||||||
|
// Emergency promotion limits (DoS protection)
|
||||||
|
emergencyWindowDuration = 60 * time.Second // Sliding window duration for emergency promotion rate limiting
|
||||||
|
maxEmergencyPromotionsPerMinute = 3 // Maximum emergency promotions allowed within the sliding window
|
||||||
|
emergencyPromotionCooldown = 10 * time.Second // Minimum time between individual emergency promotions
|
||||||
|
maxConsecutiveEmergencyPromotions = 3 // Maximum consecutive emergency promotions before blocking
|
||||||
|
emergencyPromotionWindowCleanupAge = 60 * time.Second // Age at which emergency window entries are cleaned up
|
||||||
|
|
||||||
|
// Trust scoring constants
|
||||||
|
invalidSessionTrustScore = -1000 // Trust score for non-existent sessions
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
@ -81,12 +92,6 @@ type TransferBlacklistEntry struct {
|
||||||
ExpiresAt time.Time
|
ExpiresAt time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// Broadcast throttling state (DoS protection)
|
|
||||||
var (
|
|
||||||
lastBroadcast time.Time
|
|
||||||
broadcastMutex sync.Mutex
|
|
||||||
)
|
|
||||||
|
|
||||||
type SessionManager struct {
|
type SessionManager struct {
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
primaryPromotionLock sync.Mutex
|
primaryPromotionLock sync.Mutex
|
||||||
|
|
@ -108,6 +113,8 @@ type SessionManager struct {
|
||||||
emergencyPromotionWindow []time.Time
|
emergencyPromotionWindow []time.Time
|
||||||
emergencyWindowMutex sync.Mutex
|
emergencyWindowMutex sync.Mutex
|
||||||
|
|
||||||
|
lastBroadcast time.Time
|
||||||
|
broadcastMutex sync.Mutex
|
||||||
broadcastQueue chan struct{}
|
broadcastQueue chan struct{}
|
||||||
broadcastPending atomic.Bool
|
broadcastPending atomic.Bool
|
||||||
}
|
}
|
||||||
|
|
@ -184,15 +191,13 @@ func (sm *SessionManager) AddSession(session *Session, clientSettings *SessionSe
|
||||||
|
|
||||||
nicknameReserved := false
|
nicknameReserved := false
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil || nicknameReserved {
|
if r := recover(); r != nil {
|
||||||
if nicknameReserved && session.Nickname != "" {
|
if nicknameReserved && session.Nickname != "" {
|
||||||
if sm.nicknameIndex[session.Nickname] == session {
|
if sm.nicknameIndex[session.Nickname] == session {
|
||||||
delete(sm.nicknameIndex, session.Nickname)
|
delete(sm.nicknameIndex, session.Nickname)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if r != nil {
|
panic(r)
|
||||||
panic(r)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
|
@ -221,7 +226,6 @@ func (sm *SessionManager) AddSession(session *Session, clientSettings *SessionSe
|
||||||
|
|
||||||
// Check if a session with this ID already exists (reconnection)
|
// Check if a session with this ID already exists (reconnection)
|
||||||
if existing, exists := sm.sessions[session.ID]; exists {
|
if existing, exists := sm.sessions[session.ID]; exists {
|
||||||
// SECURITY: Verify identity matches to prevent session hijacking
|
|
||||||
if existing.Identity != session.Identity || existing.Source != session.Source {
|
if existing.Identity != session.Identity || existing.Source != session.Source {
|
||||||
return fmt.Errorf("session ID already in use by different user (identity mismatch)")
|
return fmt.Errorf("session ID already in use by different user (identity mismatch)")
|
||||||
}
|
}
|
||||||
|
|
@ -237,7 +241,6 @@ func (sm *SessionManager) AddSession(session *Session, clientSettings *SessionSe
|
||||||
existing.ControlChannel = session.ControlChannel
|
existing.ControlChannel = session.ControlChannel
|
||||||
existing.RPCChannel = session.RPCChannel
|
existing.RPCChannel = session.RPCChannel
|
||||||
existing.HidChannel = session.HidChannel
|
existing.HidChannel = session.HidChannel
|
||||||
existing.LastActive = time.Now()
|
|
||||||
existing.flushCandidates = session.flushCandidates
|
existing.flushCandidates = session.flushCandidates
|
||||||
// Preserve existing mode and nickname
|
// Preserve existing mode and nickname
|
||||||
session.Mode = existing.Mode
|
session.Mode = existing.Mode
|
||||||
|
|
@ -1197,7 +1200,11 @@ func (sm *SessionManager) transferPrimaryRole(fromSessionID, toSessionID, transf
|
||||||
// Promote target session
|
// Promote target session
|
||||||
toSession.Mode = SessionModePrimary
|
toSession.Mode = SessionModePrimary
|
||||||
toSession.hidRPCAvailable = false // Force re-handshake
|
toSession.hidRPCAvailable = false // Force re-handshake
|
||||||
toSession.LastActive = time.Now() // Reset activity timestamp to prevent immediate timeout
|
// Only reset LastActive for emergency promotions to prevent immediate re-timeout
|
||||||
|
// For manual transfers, preserve existing LastActive to maintain timeout accuracy
|
||||||
|
if transferType == "emergency_timeout_promotion" || transferType == "emergency_promotion_deadlock_prevention" {
|
||||||
|
toSession.LastActive = time.Now() // Reset for emergency promotions only
|
||||||
|
}
|
||||||
sm.primarySessionID = toSessionID
|
sm.primarySessionID = toSessionID
|
||||||
|
|
||||||
// ALWAYS set lastPrimaryID to the new primary to support WebRTC reconnections
|
// ALWAYS set lastPrimaryID to the new primary to support WebRTC reconnections
|
||||||
|
|
@ -1272,7 +1279,7 @@ func (sm *SessionManager) transferPrimaryRole(fromSessionID, toSessionID, transf
|
||||||
// Send reconnection signal for emergency promotions via WebSocket (more reliable than RPC when channel is stale)
|
// Send reconnection signal for emergency promotions via WebSocket (more reliable than RPC when channel is stale)
|
||||||
if toExists && (transferType == "emergency_timeout_promotion" || transferType == "emergency_auto_promotion") {
|
if toExists && (transferType == "emergency_timeout_promotion" || transferType == "emergency_auto_promotion") {
|
||||||
go func() {
|
go func() {
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(globalBroadcastDelay)
|
||||||
|
|
||||||
eventData := map[string]interface{}{
|
eventData := map[string]interface{}{
|
||||||
"sessionId": toSessionID,
|
"sessionId": toSessionID,
|
||||||
|
|
@ -1362,8 +1369,7 @@ func (sm *SessionManager) getCurrentPrimaryTimeout() time.Duration {
|
||||||
// Use session settings if available
|
// Use session settings if available
|
||||||
if currentSessionSettings != nil {
|
if currentSessionSettings != nil {
|
||||||
if currentSessionSettings.PrimaryTimeout == 0 {
|
if currentSessionSettings.PrimaryTimeout == 0 {
|
||||||
// 0 means disabled - return a very large duration
|
return disabledTimeoutValue
|
||||||
return 24 * time.Hour
|
|
||||||
} else if currentSessionSettings.PrimaryTimeout > 0 {
|
} else if currentSessionSettings.PrimaryTimeout > 0 {
|
||||||
return time.Duration(currentSessionSettings.PrimaryTimeout) * time.Second
|
return time.Duration(currentSessionSettings.PrimaryTimeout) * time.Second
|
||||||
}
|
}
|
||||||
|
|
@ -1376,7 +1382,7 @@ func (sm *SessionManager) getCurrentPrimaryTimeout() time.Duration {
|
||||||
func (sm *SessionManager) getSessionTrustScore(sessionID string) int {
|
func (sm *SessionManager) getSessionTrustScore(sessionID string) int {
|
||||||
session, exists := sm.sessions[sessionID]
|
session, exists := sm.sessions[sessionID]
|
||||||
if !exists {
|
if !exists {
|
||||||
return -1000 // Session doesn't exist
|
return invalidSessionTrustScore
|
||||||
}
|
}
|
||||||
|
|
||||||
score := 0
|
score := 0
|
||||||
|
|
@ -1422,9 +1428,7 @@ func (sm *SessionManager) findMostTrustedSessionForEmergency() string {
|
||||||
bestSessionID := ""
|
bestSessionID := ""
|
||||||
bestScore := -1
|
bestScore := -1
|
||||||
|
|
||||||
// First pass: try to find observers or queued sessions (preferred)
|
|
||||||
for sessionID, session := range sm.sessions {
|
for sessionID, session := range sm.sessions {
|
||||||
// Skip if blacklisted, primary, or not eligible modes
|
|
||||||
if sm.isSessionBlacklisted(sessionID) ||
|
if sm.isSessionBlacklisted(sessionID) ||
|
||||||
session.Mode == SessionModePrimary ||
|
session.Mode == SessionModePrimary ||
|
||||||
(session.Mode != SessionModeObserver && session.Mode != SessionModeQueued) {
|
(session.Mode != SessionModeObserver && session.Mode != SessionModeQueued) {
|
||||||
|
|
@ -1438,24 +1442,6 @@ func (sm *SessionManager) findMostTrustedSessionForEmergency() string {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If no observers/queued found, try pending sessions as last resort
|
|
||||||
if bestSessionID == "" {
|
|
||||||
for sessionID, session := range sm.sessions {
|
|
||||||
if sm.isSessionBlacklisted(sessionID) || session.Mode == SessionModePrimary {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if session.Mode == SessionModePending {
|
|
||||||
score := sm.getSessionTrustScore(sessionID)
|
|
||||||
if score > bestScore {
|
|
||||||
bestScore = score
|
|
||||||
bestSessionID = sessionID
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Log the selection decision for audit trail
|
|
||||||
if bestSessionID != "" {
|
if bestSessionID != "" {
|
||||||
sm.logger.Info().
|
sm.logger.Info().
|
||||||
Str("selectedSession", bestSessionID).
|
Str("selectedSession", bestSessionID).
|
||||||
|
|
@ -1589,13 +1575,13 @@ func (sm *SessionManager) broadcastSessionListUpdate() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sm *SessionManager) executeBroadcast() {
|
func (sm *SessionManager) executeBroadcast() {
|
||||||
broadcastMutex.Lock()
|
sm.broadcastMutex.Lock()
|
||||||
if time.Since(lastBroadcast) < globalBroadcastDelay {
|
if time.Since(sm.lastBroadcast) < globalBroadcastDelay {
|
||||||
broadcastMutex.Unlock()
|
sm.broadcastMutex.Unlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
lastBroadcast = time.Now()
|
sm.lastBroadcast = time.Now()
|
||||||
broadcastMutex.Unlock()
|
sm.broadcastMutex.Unlock()
|
||||||
|
|
||||||
sm.mu.RLock()
|
sm.mu.RLock()
|
||||||
infos := make([]SessionData, 0, len(sm.sessions))
|
infos := make([]SessionData, 0, len(sm.sessions))
|
||||||
|
|
@ -1647,7 +1633,8 @@ func (sm *SessionManager) Shutdown() {
|
||||||
sm.mu.Lock()
|
sm.mu.Lock()
|
||||||
defer sm.mu.Unlock()
|
defer sm.mu.Unlock()
|
||||||
|
|
||||||
// Clean up all sessions
|
close(sm.broadcastQueue)
|
||||||
|
|
||||||
for id := range sm.sessions {
|
for id := range sm.sessions {
|
||||||
delete(sm.sessions, id)
|
delete(sm.sessions, id)
|
||||||
}
|
}
|
||||||
|
|
@ -1668,6 +1655,18 @@ func (sm *SessionManager) cleanupInactiveSessions(ctx context.Context) {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
needsBroadcast := false
|
needsBroadcast := false
|
||||||
|
|
||||||
|
// Clean up expired emergency promotion window entries
|
||||||
|
sm.emergencyWindowMutex.Lock()
|
||||||
|
cutoff := now.Add(-emergencyPromotionWindowCleanupAge)
|
||||||
|
validEntries := make([]time.Time, 0, len(sm.emergencyPromotionWindow))
|
||||||
|
for _, t := range sm.emergencyPromotionWindow {
|
||||||
|
if t.After(cutoff) {
|
||||||
|
validEntries = append(validEntries, t)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sm.emergencyPromotionWindow = validEntries
|
||||||
|
sm.emergencyWindowMutex.Unlock()
|
||||||
|
|
||||||
// Handle expired grace periods
|
// Handle expired grace periods
|
||||||
gracePeriodExpired := sm.handleGracePeriodExpiration(now)
|
gracePeriodExpired := sm.handleGracePeriodExpiration(now)
|
||||||
if gracePeriodExpired {
|
if gracePeriodExpired {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue