Compare commits

...

5 Commits

Author SHA1 Message Date
Alex P 08b0dd0c37 chore: restore jiggler.go from dev branch
Replaced custom jiggler implementation with dev branch version:
- Uses rpcAbsMouseReport() instead of gadget.RelMouseReport()
- Maintains same behavior: does NOT call UpdateLastActive()
- Ensures jiggler activity doesn't interfere with session timeouts
- Preserves all multi-session timeout fixes

This change does not affect multi-session functionality.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-17 16:21:29 +03:00
Alex P f2431e9bbf fix: jiggler should not prevent primary session timeout
Problem: The jiggler was calling sessionManager.UpdateLastActive() which
prevented the primary session timeout from ever triggering. This made it
impossible to automatically demote inactive primary sessions.

Root cause analysis:
- Jiggler is automated mouse movement to prevent remote PC sleep
- It was incorrectly updating LastActive timestamp as if it were user input
- This reset the inactivity timer every time jiggler ran
- Primary session timeout requires LastActive to remain unchanged during
  actual user inactivity

Changes:
- Removed sessionManager.UpdateLastActive() call from jiggler.go:145
- Added comment explaining why jiggler should not update LastActive
- Session timeout now correctly tracks only REAL user input:
  * Keyboard events (via USB HID)
  * Mouse events (via USB HID)
  * Native operations
- Jiggler mouse movement is explicitly excluded from activity tracking

This works together with the previous fix that removed LastActive reset
during WebSocket reconnections.

Impact:
- Primary sessions will now correctly timeout after configured inactivity
- Jiggler continues to prevent remote PC sleep as intended
- Only genuine user input resets the inactivity timer

Test:
1. Enable jiggler with short interval (e.g., every 10 seconds)
2. Set primary timeout to 60 seconds
3. Leave primary tab in background with no user input
4. Jiggler will keep remote PC awake
5. After 60 seconds, primary session is correctly demoted
2025-10-17 15:30:55 +03:00
Alex P c9d8dcb553 fix: primary session timeout not triggering due to reconnection resets
Fixed critical bug where primary session timeout was never triggered even
after configured inactivity period (e.g., 60 seconds with no input).

Root cause: LastActive timestamp was being reset during WebSocket
reconnections and session promotions, preventing the inactivity timer
from ever reaching the timeout threshold.

Changes:
- session_manager.go:245: Removed LastActive reset during reconnection
  in AddSession(). Reconnections should NOT reset the activity timer
  since timeout is based on input activity, not connection activity.

- session_manager.go:1207-1209: Made LastActive reset conditional in
  transferPrimaryRole(). Only emergency promotions reset the timer to
  prevent immediate re-timeout. Manual transfers preserve existing
  LastActive for accurate timeout tracking.

Impact:
- Primary sessions will now correctly timeout after configured inactivity
- LastActive only updated by actual user input (keyboard/mouse events)
- Emergency promotions still get fresh timer to prevent rapid re-timeout
- Manual transfers maintain accurate activity tracking

Test scenario:
1. User becomes primary and leaves tab in background
2. No keyboard/mouse input for 60+ seconds (timeout configured)
3. WebSocket stays connected but LastActive is not reset
4. handlePrimarySessionTimeout() detects inactivity and demotes primary
5. Next eligible observer is automatically promoted
2025-10-17 15:15:35 +03:00
Alex P 711f7818bf Cleanup: remove unnecessary md file 2025-10-17 14:31:10 +03:00
Alex P 40ccecc902 fix: address critical race conditions and security issues in multi-session
This commit resolves multiple critical issues in the multi-session implementation:

Race Conditions Fixed:
- Add primaryPromotionLock mutex to prevent dual-primary corruption
- Implement atomic nickname reservation before session addition
- Add corruption detection and auto-fix in transferPrimaryRole
- Implement broadcast coalescing to prevent storms

Security Improvements:
- Add permission check for HID RPC handshake
- Implement sliding window rate limiting for emergency promotions
- Add global RPC rate limiter (2000 req/sec across all sessions)
- Enhance nickname validation (control chars, zero-width chars, unicode)

Reliability Enhancements:
- Add 5-second timeouts to all WebSocket writes
- Add RPC queue monitoring (warns at 200+ messages)
- Verify grace period memory leak protection
- Verify goroutine cleanup on session removal

Technical Details:
- Use double-locking pattern (primaryPromotionLock → mu)
- Implement deferred cleanup for failed nickname reservations
- Use atomic.Bool for broadcast coalescing
- Add trust scoring for emergency promotion selection

Files Modified:
- session_manager.go: Core session management fixes
- session_cleanup_handlers.go: Rate limiting for emergency promotions
- hidrpc.go: Permission checks for handshake
- jsonrpc_session_handlers.go: Enhanced nickname validation
- jsonrpc.go: Global RPC rate limiting
- webrtc.go: WebSocket timeouts and queue monitoring

Total: 266 insertions, 73 deletions across 6 files
2025-10-17 14:28:16 +03:00
7 changed files with 313 additions and 126 deletions

View File

@ -16,6 +16,13 @@ func handleHidRPCMessage(message hidrpc.Message, session *Session) {
switch message.Type() {
case hidrpc.TypeHandshake:
if !session.HasPermission(PermissionVideoView) {
logger.Debug().
Str("sessionID", session.ID).
Str("mode", string(session.Mode)).
Msg("handshake blocked: session lacks PermissionVideoView")
return
}
message, err := hidrpc.NewHandshakeMessage().Marshal()
if err != nil {
logger.Warn().Err(err).Msg("failed to marshal handshake message")

View File

@ -129,22 +129,18 @@ func runJiggler() {
}
inactivitySeconds := config.JigglerConfig.InactivityLimitSeconds
timeSinceLastInput := time.Since(gadget.GetLastUserInputTime())
logger.Debug().Msgf("Time since last user input %v", timeSinceLastInput)
if timeSinceLastInput > time.Duration(inactivitySeconds)*time.Second {
err := gadget.RelMouseReport(1, 0, 0)
logger.Debug().Msg("Jiggling mouse...")
//TODO: change to rel mouse
err := rpcAbsMouseReport(1, 1, 0)
if err != nil {
logger.Warn().Msgf("Failed to jiggle mouse: %v", err)
}
time.Sleep(50 * time.Millisecond)
err = gadget.RelMouseReport(-1, 0, 0)
err = rpcAbsMouseReport(0, 0, 0)
if err != nil {
logger.Warn().Msgf("Failed to reset mouse position: %v", err)
}
if sessionManager != nil {
if primarySession := sessionManager.GetPrimarySession(); primarySession != nil {
sessionManager.UpdateLastActive(primarySession.ID)
}
}
}
}
}

View File

@ -32,6 +32,32 @@ func isValidNickname(nickname string) bool {
return nicknameRegex.MatchString(nickname)
}
// Global RPC rate limiting (protects against coordinated DoS from multiple sessions)
var (
globalRPCRateLimitMu sync.Mutex
globalRPCRateLimit int
globalRPCRateLimitWin time.Time
)
func checkGlobalRPCRateLimit() bool {
const (
maxGlobalRPCPerSecond = 2000
rateLimitWindow = time.Second
)
globalRPCRateLimitMu.Lock()
defer globalRPCRateLimitMu.Unlock()
now := time.Now()
if now.Sub(globalRPCRateLimitWin) > rateLimitWindow {
globalRPCRateLimit = 0
globalRPCRateLimitWin = now
}
globalRPCRateLimit++
return globalRPCRateLimit <= maxGlobalRPCPerSecond
}
type JSONRPCRequest struct {
JSONRPC string `json:"jsonrpc"`
Method string `json:"method"`
@ -119,7 +145,24 @@ func broadcastJSONRPCEvent(event string, params any) {
}
func onRPCMessage(message webrtc.DataChannelMessage, session *Session) {
// Rate limit check (DoS protection)
// Global rate limit check (protects against coordinated DoS from multiple sessions)
if !checkGlobalRPCRateLimit() {
jsonRpcLogger.Warn().
Str("sessionId", session.ID).
Msg("Global RPC rate limit exceeded")
errorResponse := JSONRPCResponse{
JSONRPC: "2.0",
Error: map[string]any{
"code": -32000,
"message": "Global rate limit exceeded",
},
ID: 0,
}
writeJSONRPCResponse(errorResponse, session)
return
}
// Per-session rate limit check (DoS protection)
if !session.CheckRPCRateLimit() {
jsonRpcLogger.Warn().
Str("sessionId", session.ID).

View File

@ -95,19 +95,43 @@ func handleRequestSessionApprovalRPC(session *Session) (any, error) {
return map[string]interface{}{"status": "requested"}, nil
}
// handleUpdateSessionNicknameRPC handles nickname updates for sessions
func validateNickname(nickname string) error {
if len(nickname) < 2 {
return errors.New("nickname must be at least 2 characters")
}
if len(nickname) > 30 {
return errors.New("nickname must be 30 characters or less")
}
if !isValidNickname(nickname) {
return errors.New("nickname can only contain letters, numbers, spaces, and - _ . @")
}
for i, r := range nickname {
if r < 32 || r == 127 {
return fmt.Errorf("nickname contains control character at position %d", i)
}
if r >= 0x200B && r <= 0x200D {
return errors.New("nickname contains zero-width character")
}
}
trimmed := ""
for _, r := range nickname {
trimmed += string(r)
}
if trimmed != nickname {
return errors.New("nickname contains disallowed unicode")
}
return nil
}
func handleUpdateSessionNicknameRPC(params map[string]any, session *Session) (any, error) {
sessionID, _ := params["sessionId"].(string)
nickname, _ := params["nickname"].(string)
if len(nickname) < 2 {
return nil, errors.New("nickname must be at least 2 characters")
}
if len(nickname) > 30 {
return nil, errors.New("nickname must be 30 characters or less")
}
if !isValidNickname(nickname) {
return nil, errors.New("nickname can only contain letters, numbers, spaces, and - _ . @")
if err := validateNickname(nickname); err != nil {
return nil, err
}
targetSession := sessionManager.GetSession(sessionID)

View File

@ -22,30 +22,43 @@ func (sm *SessionManager) attemptEmergencyPromotion(ctx emergencyPromotionContex
return promotedID, false, false
}
// Emergency promotion path
hasPrimary := sm.primarySessionID != ""
if !hasPrimary {
sm.emergencyWindowMutex.Lock()
defer sm.emergencyWindowMutex.Unlock()
const slidingWindowDuration = 60 * time.Second
const maxEmergencyPromotionsPerMinute = 3
cutoff := ctx.now.Add(-slidingWindowDuration)
validEntries := make([]time.Time, 0, len(sm.emergencyPromotionWindow))
for _, t := range sm.emergencyPromotionWindow {
if t.After(cutoff) {
validEntries = append(validEntries, t)
}
}
sm.emergencyPromotionWindow = validEntries
if len(sm.emergencyPromotionWindow) >= maxEmergencyPromotionsPerMinute {
sm.logger.Error().
Str("triggerSessionID", ctx.triggerSessionID).
Msg("CRITICAL: No primary session exists - bypassing all rate limits")
} else {
// Rate limiting (only when we have a primary)
if ctx.now.Sub(sm.lastEmergencyPromotion) < 30*time.Second {
Int("promotionsInLastMinute", len(sm.emergencyPromotionWindow)).
Msg("Emergency promotion rate limit exceeded - potential attack")
return "", false, true
}
if ctx.now.Sub(sm.lastEmergencyPromotion) < 10*time.Second {
sm.logger.Warn().
Str("triggerSessionID", ctx.triggerSessionID).
Dur("timeSinceLastEmergency", ctx.now.Sub(sm.lastEmergencyPromotion)).
Msgf("Emergency promotion rate limit exceeded - potential attack (%s)", ctx.triggerReason)
return "", false, true // shouldSkip = true
Msg("Emergency promotion cooldown active")
return "", false, true
}
// Limit consecutive emergency promotions
if sm.consecutiveEmergencyPromotions >= 3 {
sm.logger.Error().
Str("triggerSessionID", ctx.triggerSessionID).
Int("consecutiveCount", sm.consecutiveEmergencyPromotions).
Msgf("Too many consecutive emergency promotions - blocking for security (%s)", ctx.triggerReason)
return "", false, true // shouldSkip = true
}
Msg("Too many consecutive emergency promotions - blocking")
return "", false, true
}
// Find best session for emergency promotion
@ -123,6 +136,9 @@ func (sm *SessionManager) promoteAfterGraceExpiration(expiredSessionID string, n
reason := "grace_expiration_promotion"
if isEmergency {
reason = "emergency_promotion_deadlock_prevention"
sm.emergencyWindowMutex.Lock()
sm.emergencyPromotionWindow = append(sm.emergencyPromotionWindow, now)
sm.emergencyWindowMutex.Unlock()
sm.lastEmergencyPromotion = now
sm.consecutiveEmergencyPromotions++
@ -249,6 +265,9 @@ func (sm *SessionManager) handlePrimarySessionTimeout(now time.Time) bool {
reason := "timeout_promotion"
if isEmergency {
reason = "emergency_timeout_promotion"
sm.emergencyWindowMutex.Lock()
sm.emergencyPromotionWindow = append(sm.emergencyPromotionWindow, now)
sm.emergencyWindowMutex.Unlock()
sm.lastEmergencyPromotion = now
sm.consecutiveEmergencyPromotions++

View File

@ -6,6 +6,7 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/google/uuid"
@ -30,12 +31,23 @@ const (
// Session timeout defaults
defaultPendingSessionTimeout = 1 * time.Minute // Timeout for pending sessions (DoS protection)
defaultObserverSessionTimeout = 2 * time.Minute // Timeout for inactive observer sessions
disabledTimeoutValue = 24 * time.Hour // Value used when timeout is disabled (0 setting)
// Transfer and blacklist settings
transferBlacklistDuration = 60 * time.Second // Duration to blacklist sessions after manual transfer
// Grace period limits
maxGracePeriodEntries = 10 // Maximum number of grace period entries to prevent memory exhaustion
// Emergency promotion limits (DoS protection)
emergencyWindowDuration = 60 * time.Second // Sliding window duration for emergency promotion rate limiting
maxEmergencyPromotionsPerMinute = 3 // Maximum emergency promotions allowed within the sliding window
emergencyPromotionCooldown = 10 * time.Second // Minimum time between individual emergency promotions
maxConsecutiveEmergencyPromotions = 3 // Maximum consecutive emergency promotions before blocking
emergencyPromotionWindowCleanupAge = 60 * time.Second // Age at which emergency window entries are cleaned up
// Trust scoring constants
invalidSessionTrustScore = -1000 // Trust score for non-existent sessions
)
var (
@ -80,30 +92,31 @@ type TransferBlacklistEntry struct {
ExpiresAt time.Time
}
// Broadcast throttling state (DoS protection)
var (
lastBroadcast time.Time
broadcastMutex sync.Mutex
)
type SessionManager struct {
mu sync.RWMutex // 24 bytes - place first for better alignment
primaryTimeout time.Duration // 8 bytes
logger *zerolog.Logger // 8 bytes
sessions map[string]*Session // 8 bytes
nicknameIndex map[string]*Session // 8 bytes - O(1) nickname uniqueness lookups
reconnectGrace map[string]time.Time // 8 bytes
reconnectInfo map[string]*SessionData // 8 bytes
transferBlacklist []TransferBlacklistEntry // Prevent demoted sessions from immediate re-promotion
queueOrder []string // 24 bytes (slice header)
primarySessionID string // 16 bytes
lastPrimaryID string // 16 bytes
maxSessions int // 8 bytes
cleanupCancel context.CancelFunc // For stopping cleanup goroutine
mu sync.RWMutex
primaryPromotionLock sync.Mutex
primaryTimeout time.Duration
logger *zerolog.Logger
sessions map[string]*Session
nicknameIndex map[string]*Session
reconnectGrace map[string]time.Time
reconnectInfo map[string]*SessionData
transferBlacklist []TransferBlacklistEntry
queueOrder []string
primarySessionID string
lastPrimaryID string
maxSessions int
cleanupCancel context.CancelFunc
// Emergency promotion tracking for safety
lastEmergencyPromotion time.Time
consecutiveEmergencyPromotions int
emergencyPromotionWindow []time.Time
emergencyWindowMutex sync.Mutex
lastBroadcast time.Time
broadcastMutex sync.Mutex
broadcastQueue chan struct{}
broadcastPending atomic.Bool
}
// NewSessionManager creates a new session manager
@ -141,12 +154,13 @@ func NewSessionManager(logger *zerolog.Logger) *SessionManager {
logger: logger,
maxSessions: maxSessions,
primaryTimeout: primaryTimeout,
broadcastQueue: make(chan struct{}, 100),
}
// Start background cleanup of inactive sessions
ctx, cancel := context.WithCancel(context.Background())
sm.cleanupCancel = cancel
go sm.cleanupInactiveSessions(ctx)
go sm.broadcastWorker(ctx)
return sm
}
@ -175,13 +189,26 @@ func (sm *SessionManager) AddSession(session *Session, clientSettings *SessionSe
sm.mu.Lock()
defer sm.mu.Unlock()
// Check nickname uniqueness using O(1) index (only for non-empty nicknames)
nicknameReserved := false
defer func() {
if r := recover(); r != nil {
if nicknameReserved && session.Nickname != "" {
if sm.nicknameIndex[session.Nickname] == session {
delete(sm.nicknameIndex, session.Nickname)
}
}
panic(r)
}
}()
if session.Nickname != "" {
if existingSession, exists := sm.nicknameIndex[session.Nickname]; exists {
if existingSession.ID != session.ID {
return fmt.Errorf("nickname '%s' is already in use by another session", session.Nickname)
}
}
sm.nicknameIndex[session.Nickname] = session
nicknameReserved = true
}
wasWithinGracePeriod := false
@ -199,7 +226,6 @@ func (sm *SessionManager) AddSession(session *Session, clientSettings *SessionSe
// Check if a session with this ID already exists (reconnection)
if existing, exists := sm.sessions[session.ID]; exists {
// SECURITY: Verify identity matches to prevent session hijacking
if existing.Identity != session.Identity || existing.Source != session.Source {
return fmt.Errorf("session ID already in use by different user (identity mismatch)")
}
@ -215,9 +241,8 @@ func (sm *SessionManager) AddSession(session *Session, clientSettings *SessionSe
existing.ControlChannel = session.ControlChannel
existing.RPCChannel = session.RPCChannel
existing.HidChannel = session.HidChannel
existing.LastActive = time.Now()
existing.flushCandidates = session.flushCandidates
// Preserve existing mode and nickname
// Preserve mode and nickname
session.Mode = existing.Mode
session.Nickname = existing.Nickname
session.CreatedAt = existing.CreatedAt
@ -348,11 +373,9 @@ func (sm *SessionManager) AddSession(session *Session, clientSettings *SessionSe
Int("totalSessions", len(sm.sessions)).
Msg("Session added to manager")
// Ensure session has auto-generated nickname if needed
sm.ensureNickname(session)
// Add to nickname index
if session.Nickname != "" {
if !nicknameReserved && session.Nickname != "" {
sm.nicknameIndex[session.Nickname] = session
}
@ -383,13 +406,18 @@ func (sm *SessionManager) RemoveSession(sessionID string) {
wasPrimary := session.Mode == SessionModePrimary
delete(sm.sessions, sessionID)
if session.Nickname != "" {
if sm.nicknameIndex[session.Nickname] == session {
delete(sm.nicknameIndex, session.Nickname)
}
}
sm.logger.Info().
Str("sessionID", sessionID).
Bool("wasPrimary", wasPrimary).
Int("remainingSessions", len(sm.sessions)).
Msg("Session removed from manager")
// Remove from queue if present
sm.removeFromQueue(sessionID)
// Check if this session was marked for immediate removal (intentional logout)
@ -1063,9 +1091,10 @@ func (sm *SessionManager) validateSinglePrimary() {
}
}
// transferPrimaryRole is the centralized method for all primary role transfers
// It handles bidirectional blacklisting and logging consistently across all transfer types
func (sm *SessionManager) transferPrimaryRole(fromSessionID, toSessionID, transferType, context string) error {
sm.primaryPromotionLock.Lock()
defer sm.primaryPromotionLock.Unlock()
// Validate sessions exist
toSession, toExists := sm.sessions[toSessionID]
if !toExists {
@ -1107,22 +1136,74 @@ func (sm *SessionManager) transferPrimaryRole(fromSessionID, toSessionID, transf
Msg("Demoted existing primary session")
}
// SECURITY: Before promoting, verify there are no other primary sessions
primaryCount := 0
var existingPrimaryID string
for id, sess := range sm.sessions {
if sess.Mode == SessionModePrimary {
primaryCount++
if id != toSessionID {
existingPrimaryID = id
}
}
}
if primaryCount > 1 || (primaryCount == 1 && existingPrimaryID != "" && existingPrimaryID != sm.primarySessionID) {
sm.logger.Error().
Int("primaryCount", primaryCount).
Str("existingPrimaryID", existingPrimaryID).
Str("targetPromotionID", toSessionID).
Str("managerPrimaryID", sm.primarySessionID).
Str("transferType", transferType).
Msg("CRITICAL: Dual-primary corruption detected - forcing fix")
for id, sess := range sm.sessions {
if sess.Mode == SessionModePrimary {
if id != sm.primarySessionID && id != toSessionID {
sess.Mode = SessionModeObserver
sm.logger.Warn().
Str("demotedSessionID", id).
Msg("Force-demoted session due to dual-primary corruption")
}
}
}
if sm.primarySessionID != "" && sm.sessions[sm.primarySessionID] != nil {
if sm.sessions[sm.primarySessionID].Mode != SessionModePrimary {
sm.primarySessionID = ""
}
}
existingPrimaryID = ""
for id, sess := range sm.sessions {
if id != toSessionID && sess.Mode == SessionModePrimary {
existingPrimaryID = id
break
}
}
if existingPrimaryID != "" {
sm.logger.Error().
Str("existingPrimaryID", id).
Str("existingPrimaryID", existingPrimaryID).
Str("targetPromotionID", toSessionID).
Msg("CRITICAL: Cannot fix dual-primary corruption - blocking promotion")
return fmt.Errorf("cannot promote: dual-primary corruption detected and fix failed (%s)", existingPrimaryID)
}
} else if existingPrimaryID != "" {
sm.logger.Error().
Str("existingPrimaryID", existingPrimaryID).
Str("targetPromotionID", toSessionID).
Str("transferType", transferType).
Msg("CRITICAL: Attempted to create second primary - blocking promotion")
return fmt.Errorf("cannot promote: another primary session exists (%s)", id)
}
return fmt.Errorf("cannot promote: another primary session exists (%s)", existingPrimaryID)
}
// Promote target session
toSession.Mode = SessionModePrimary
toSession.hidRPCAvailable = false // Force re-handshake
toSession.LastActive = time.Now() // Reset activity timestamp to prevent immediate timeout
toSession.hidRPCAvailable = false
// Reset LastActive only for emergency promotions to prevent immediate re-timeout
if transferType == "emergency_timeout_promotion" || transferType == "emergency_promotion_deadlock_prevention" {
toSession.LastActive = time.Now()
}
sm.primarySessionID = toSessionID
// ALWAYS set lastPrimaryID to the new primary to support WebRTC reconnections
@ -1197,7 +1278,7 @@ func (sm *SessionManager) transferPrimaryRole(fromSessionID, toSessionID, transf
// Send reconnection signal for emergency promotions via WebSocket (more reliable than RPC when channel is stale)
if toExists && (transferType == "emergency_timeout_promotion" || transferType == "emergency_auto_promotion") {
go func() {
time.Sleep(100 * time.Millisecond)
time.Sleep(globalBroadcastDelay)
eventData := map[string]interface{}{
"sessionId": toSessionID,
@ -1287,8 +1368,7 @@ func (sm *SessionManager) getCurrentPrimaryTimeout() time.Duration {
// Use session settings if available
if currentSessionSettings != nil {
if currentSessionSettings.PrimaryTimeout == 0 {
// 0 means disabled - return a very large duration
return 24 * time.Hour
return disabledTimeoutValue
} else if currentSessionSettings.PrimaryTimeout > 0 {
return time.Duration(currentSessionSettings.PrimaryTimeout) * time.Second
}
@ -1301,7 +1381,7 @@ func (sm *SessionManager) getCurrentPrimaryTimeout() time.Duration {
func (sm *SessionManager) getSessionTrustScore(sessionID string) int {
session, exists := sm.sessions[sessionID]
if !exists {
return -1000 // Session doesn't exist
return invalidSessionTrustScore
}
score := 0
@ -1347,9 +1427,7 @@ func (sm *SessionManager) findMostTrustedSessionForEmergency() string {
bestSessionID := ""
bestScore := -1
// First pass: try to find observers or queued sessions (preferred)
for sessionID, session := range sm.sessions {
// Skip if blacklisted, primary, or not eligible modes
if sm.isSessionBlacklisted(sessionID) ||
session.Mode == SessionModePrimary ||
(session.Mode != SessionModeObserver && session.Mode != SessionModeQueued) {
@ -1363,24 +1441,6 @@ func (sm *SessionManager) findMostTrustedSessionForEmergency() string {
}
}
// If no observers/queued found, try pending sessions as last resort
if bestSessionID == "" {
for sessionID, session := range sm.sessions {
if sm.isSessionBlacklisted(sessionID) || session.Mode == SessionModePrimary {
continue
}
if session.Mode == SessionModePending {
score := sm.getSessionTrustScore(sessionID)
if score > bestScore {
bestScore = score
bestSessionID = sessionID
}
}
}
}
// Log the selection decision for audit trail
if bestSessionID != "" {
sm.logger.Info().
Str("selectedSession", bestSessionID).
@ -1492,21 +1552,37 @@ func (sm *SessionManager) updateAllSessionNicknames() {
}
}
func (sm *SessionManager) broadcastSessionListUpdate() {
// Throttle broadcasts to prevent DoS
broadcastMutex.Lock()
if time.Since(lastBroadcast) < globalBroadcastDelay {
broadcastMutex.Unlock()
return // Skip this broadcast to prevent storm
func (sm *SessionManager) broadcastWorker(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-sm.broadcastQueue:
sm.broadcastPending.Store(false)
sm.executeBroadcast()
}
}
}
lastBroadcast = time.Now()
broadcastMutex.Unlock()
// Must be called in a goroutine to avoid deadlock
// Get all sessions first - use read lock only, no validation during broadcasts
func (sm *SessionManager) broadcastSessionListUpdate() {
if sm.broadcastPending.CompareAndSwap(false, true) {
select {
case sm.broadcastQueue <- struct{}{}:
default:
}
}
}
func (sm *SessionManager) executeBroadcast() {
sm.broadcastMutex.Lock()
if time.Since(sm.lastBroadcast) < globalBroadcastDelay {
sm.broadcastMutex.Unlock()
return
}
sm.lastBroadcast = time.Now()
sm.broadcastMutex.Unlock()
sm.mu.RLock()
// Build session infos and collect active sessions in one pass
infos := make([]SessionData, 0, len(sm.sessions))
activeSessions := make([]*Session, 0, len(sm.sessions))
@ -1521,17 +1597,13 @@ func (sm *SessionManager) broadcastSessionListUpdate() {
LastActive: session.LastActive,
})
// Only collect sessions ready for broadcast
if session.RPCChannel != nil {
activeSessions = append(activeSessions, session)
}
}
sm.mu.RUnlock()
// Now send events without holding lock
for _, session := range activeSessions {
// Per-session throttling to prevent broadcast storms
session.lastBroadcastMu.Lock()
shouldSkip := time.Since(session.LastBroadcast) < sessionBroadcastDelay
if !shouldSkip {
@ -1560,7 +1632,8 @@ func (sm *SessionManager) Shutdown() {
sm.mu.Lock()
defer sm.mu.Unlock()
// Clean up all sessions
close(sm.broadcastQueue)
for id := range sm.sessions {
delete(sm.sessions, id)
}
@ -1581,6 +1654,18 @@ func (sm *SessionManager) cleanupInactiveSessions(ctx context.Context) {
now := time.Now()
needsBroadcast := false
// Clean up expired emergency promotion window entries
sm.emergencyWindowMutex.Lock()
cutoff := now.Add(-emergencyPromotionWindowCleanupAge)
validEntries := make([]time.Time, 0, len(sm.emergencyPromotionWindow))
for _, t := range sm.emergencyPromotionWindow {
if t.After(cutoff) {
validEntries = append(validEntries, t)
}
}
sm.emergencyPromotionWindow = validEntries
sm.emergencyWindowMutex.Unlock()
// Handle expired grace periods
gracePeriodExpired := sm.handleGracePeriodExpiration(now)
if gracePeriodExpired {

View File

@ -123,7 +123,10 @@ func (s *Session) sendWebSocketSignal(messageType string, data map[string]interf
return nil
}
err := wsjson.Write(context.Background(), s.ws, gin.H{"type": messageType, "data": data})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := wsjson.Write(ctx, s.ws, gin.H{"type": messageType, "data": data})
if err != nil {
webrtcLogger.Debug().Err(err).Str("sessionId", s.ID).Msg("Failed to send WebSocket signal")
return err
@ -347,7 +350,13 @@ func newSession(config SessionConfig) (*Session, error) {
case "rpc":
session.RPCChannel = d
d.OnMessage(func(msg webrtc.DataChannelMessage) {
// Enqueue to ensure ordered processing
queueLen := len(session.rpcQueue)
if queueLen > 200 {
scopedLogger.Warn().
Str("sessionID", session.ID).
Int("queueLen", queueLen).
Msg("RPC queue approaching capacity")
}
session.rpcQueue <- msg
})
triggerOTAStateUpdate()
@ -406,7 +415,9 @@ func newSession(config SessionConfig) (*Session, error) {
}
candidateBufferMutex.Unlock()
err := wsjson.Write(context.Background(), config.ws, gin.H{"type": "new-ice-candidate", "data": candidate.ToJSON()})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := wsjson.Write(ctx, config.ws, gin.H{"type": "new-ice-candidate", "data": candidate.ToJSON()})
if err != nil {
scopedLogger.Warn().Err(err).Msg("failed to write new-ice-candidate to WebRTC signaling channel")
}
@ -419,7 +430,9 @@ func newSession(config SessionConfig) (*Session, error) {
answerSent = true
// Send all buffered candidates
for _, candidate := range candidateBuffer {
err := wsjson.Write(context.Background(), config.ws, gin.H{"type": "new-ice-candidate", "data": candidate})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
err := wsjson.Write(ctx, config.ws, gin.H{"type": "new-ice-candidate", "data": candidate})
cancel()
if err != nil {
scopedLogger.Warn().Err(err).Msg("failed to write buffered new-ice-candidate to WebRTC signaling channel")
}