kvm/session_manager.go

1725 lines
52 KiB
Go

package kvm
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/google/uuid"
"github.com/rs/zerolog"
)
// SessionMode and constants are now imported from internal/session via session_permissions.go
// Session validation constants
const (
minNicknameLength = 2
maxNicknameLength = 30
maxIdentityLength = 256
)
// Timing constants for session management
const (
// Broadcast throttling (DoS protection)
globalBroadcastDelay = 100 * time.Millisecond // Minimum time between global session broadcasts
sessionBroadcastDelay = 50 * time.Millisecond // Minimum time between broadcasts to a single session
// Session timeout defaults
defaultPendingSessionTimeout = 1 * time.Minute // Timeout for pending sessions (DoS protection)
defaultObserverSessionTimeout = 2 * time.Minute // Timeout for inactive observer sessions
disabledTimeoutValue = 24 * time.Hour // Value used when timeout is disabled (0 setting)
// Transfer and blacklist settings
transferBlacklistDuration = 60 * time.Second // Duration to blacklist sessions after manual transfer
// Grace period limits
maxGracePeriodEntries = 10 // Maximum number of grace period entries to prevent memory exhaustion
// Emergency promotion limits (DoS protection)
emergencyWindowDuration = 60 * time.Second // Sliding window duration for emergency promotion rate limiting
maxEmergencyPromotionsPerMinute = 3 // Maximum emergency promotions allowed within the sliding window
emergencyPromotionCooldown = 10 * time.Second // Minimum time between individual emergency promotions
maxConsecutiveEmergencyPromotions = 3 // Maximum consecutive emergency promotions before blocking
emergencyPromotionWindowCleanupAge = 60 * time.Second // Age at which emergency window entries are cleaned up
// Trust scoring constants
invalidSessionTrustScore = -1000 // Trust score for non-existent sessions
)
var (
ErrMaxSessionsReached = errors.New("maximum number of sessions reached")
)
type SessionData struct {
ID string `json:"id"`
Mode SessionMode `json:"mode"`
Source string `json:"source"`
Identity string `json:"identity"`
Nickname string `json:"nickname,omitempty"`
CreatedAt time.Time `json:"created_at"`
LastActive time.Time `json:"last_active"`
}
// Event types for JSON-RPC notifications
type (
SessionsUpdateEvent struct {
Sessions []SessionData `json:"sessions"`
YourMode SessionMode `json:"yourMode"`
}
NewSessionPendingEvent struct {
SessionID string `json:"sessionId"`
Source string `json:"source"`
Identity string `json:"identity"`
Nickname string `json:"nickname,omitempty"`
}
PrimaryRequestEvent struct {
RequestID string `json:"requestId"`
Source string `json:"source"`
Identity string `json:"identity"`
Nickname string `json:"nickname,omitempty"`
}
)
// TransferBlacklistEntry prevents recently demoted sessions from immediately becoming primary again
type TransferBlacklistEntry struct {
SessionID string
ExpiresAt time.Time
}
type SessionManager struct {
mu sync.RWMutex
primaryPromotionLock sync.Mutex
primaryTimeout time.Duration
logger *zerolog.Logger
sessions map[string]*Session
nicknameIndex map[string]*Session
reconnectGrace map[string]time.Time
reconnectInfo map[string]*SessionData
transferBlacklist []TransferBlacklistEntry
queueOrder []string
primarySessionID string
lastPrimaryID string
maxSessions int
cleanupCancel context.CancelFunc
lastEmergencyPromotion time.Time
consecutiveEmergencyPromotions int
emergencyPromotionWindow []time.Time
emergencyWindowMutex sync.Mutex
lastBroadcast time.Time
broadcastMutex sync.Mutex
broadcastQueue chan struct{}
broadcastPending atomic.Bool
}
// NewSessionManager creates a new session manager
func NewSessionManager(logger *zerolog.Logger) *SessionManager {
// Use configuration values if available
maxSessions := 10
primaryTimeout := 5 * time.Minute
if config != nil && config.MultiSession != nil {
if config.MultiSession.MaxSessions > 0 {
maxSessions = config.MultiSession.MaxSessions
}
if config.MultiSession.PrimaryTimeout > 0 {
primaryTimeout = time.Duration(config.MultiSession.PrimaryTimeout) * time.Second
}
}
// Override with session settings if available
if currentSessionSettings != nil {
if currentSessionSettings.PrimaryTimeout > 0 {
primaryTimeout = time.Duration(currentSessionSettings.PrimaryTimeout) * time.Second
}
if currentSessionSettings.MaxSessions > 0 {
maxSessions = currentSessionSettings.MaxSessions
}
}
sm := &SessionManager{
sessions: make(map[string]*Session),
nicknameIndex: make(map[string]*Session),
reconnectGrace: make(map[string]time.Time),
reconnectInfo: make(map[string]*SessionData),
transferBlacklist: make([]TransferBlacklistEntry, 0),
queueOrder: make([]string, 0),
logger: logger,
maxSessions: maxSessions,
primaryTimeout: primaryTimeout,
broadcastQueue: make(chan struct{}, 100),
}
ctx, cancel := context.WithCancel(context.Background())
sm.cleanupCancel = cancel
go sm.cleanupInactiveSessions(ctx)
go sm.broadcastWorker(ctx)
return sm
}
func (sm *SessionManager) AddSession(session *Session, clientSettings *SessionSettings) error {
// Basic input validation
if session == nil {
sm.logger.Error().Msg("AddSession: session is nil")
return errors.New("session cannot be nil")
}
// Validate nickname if provided (matching frontend validation)
if session.Nickname != "" {
if len(session.Nickname) < minNicknameLength {
return fmt.Errorf("nickname must be at least %d characters", minNicknameLength)
}
if len(session.Nickname) > maxNicknameLength {
return fmt.Errorf("nickname must be %d characters or less", maxNicknameLength)
}
// Note: Pattern validation is done in RPC layer, not here for performance
}
if len(session.Identity) > maxIdentityLength {
return fmt.Errorf("identity too long (max %d characters)", maxIdentityLength)
}
sm.mu.Lock()
defer sm.mu.Unlock()
nicknameReserved := false
defer func() {
if r := recover(); r != nil {
if nicknameReserved && session.Nickname != "" {
if sm.nicknameIndex[session.Nickname] == session {
delete(sm.nicknameIndex, session.Nickname)
}
}
panic(r)
}
}()
if session.Nickname != "" {
if existingSession, exists := sm.nicknameIndex[session.Nickname]; exists {
if existingSession.ID != session.ID {
return fmt.Errorf("nickname '%s' is already in use by another session", session.Nickname)
}
}
sm.nicknameIndex[session.Nickname] = session
nicknameReserved = true
}
wasWithinGracePeriod := false
wasPreviouslyPrimary := false
wasPreviouslyPending := false
if graceTime, exists := sm.reconnectGrace[session.ID]; exists {
if time.Now().Before(graceTime) {
wasWithinGracePeriod = true
wasPreviouslyPrimary = (sm.lastPrimaryID == session.ID)
if reconnectInfo, hasInfo := sm.reconnectInfo[session.ID]; hasInfo {
wasPreviouslyPending = (reconnectInfo.Mode == SessionModePending)
}
}
}
// Check if a session with this ID already exists (reconnection)
if existing, exists := sm.sessions[session.ID]; exists {
if existing.Identity != session.Identity || existing.Source != session.Source {
return fmt.Errorf("session ID already in use by different user (identity mismatch)")
}
// Close old connection to prevent multiple active connections for same session ID
if existing.peerConnection != nil {
existing.peerConnection.Close()
}
// Update the existing session with new connection details
existing.peerConnection = session.peerConnection
existing.VideoTrack = session.VideoTrack
existing.ControlChannel = session.ControlChannel
existing.RPCChannel = session.RPCChannel
existing.HidChannel = session.HidChannel
existing.flushCandidates = session.flushCandidates
// Preserve mode and nickname
session.Mode = existing.Mode
session.Nickname = existing.Nickname
session.CreatedAt = existing.CreatedAt
// Ensure session has auto-generated nickname if needed
sm.ensureNickname(session)
if !nicknameReserved && session.Nickname != "" {
sm.nicknameIndex[session.Nickname] = session
}
sm.sessions[session.ID] = session
// If this was the primary, try to restore primary status
if existing.Mode == SessionModePrimary {
isBlacklisted := sm.isSessionBlacklisted(session.ID)
// SECURITY: Prevent dual-primary window - only restore if no other primary exists
primaryExists := sm.primarySessionID != "" && sm.sessions[sm.primarySessionID] != nil
if sm.lastPrimaryID == session.ID && !isBlacklisted && !primaryExists {
sm.primarySessionID = session.ID
sm.lastPrimaryID = ""
delete(sm.reconnectGrace, session.ID)
} else {
// Grace period expired, another session took over, or primary already exists
session.Mode = SessionModeObserver
}
}
go sm.broadcastSessionListUpdate()
return nil
}
if len(sm.sessions) >= sm.maxSessions {
return ErrMaxSessionsReached
}
// Generate ID if not set
if session.ID == "" {
session.ID = uuid.New().String()
}
// Set nickname from client settings if provided
if clientSettings != nil && clientSettings.Nickname != "" {
session.Nickname = clientSettings.Nickname
}
// Use global settings for requirements (not client-provided)
globalSettings := currentSessionSettings
primaryExists := sm.primarySessionID != "" && sm.sessions[sm.primarySessionID] != nil
// Check if there's an active grace period for a primary session (different from this session)
hasActivePrimaryGracePeriod := false
if sm.lastPrimaryID != "" && sm.lastPrimaryID != session.ID {
if graceTime, exists := sm.reconnectGrace[sm.lastPrimaryID]; exists {
if time.Now().Before(graceTime) {
if reconnectInfo, hasInfo := sm.reconnectInfo[sm.lastPrimaryID]; hasInfo {
if reconnectInfo.Mode == SessionModePrimary {
hasActivePrimaryGracePeriod = true
}
}
}
}
}
isBlacklisted := sm.isSessionBlacklisted(session.ID)
isOnlySession := len(sm.sessions) == 0
// Determine if this session should become primary
canBecomePrimary := !primaryExists && !hasActivePrimaryGracePeriod
isReconnectingPrimary := wasWithinGracePeriod && wasPreviouslyPrimary
isNewEligibleSession := !wasWithinGracePeriod && (!isBlacklisted || isOnlySession)
shouldBecomePrimary := canBecomePrimary && (isReconnectingPrimary || isNewEligibleSession)
if shouldBecomePrimary {
if sm.primarySessionID == "" || sm.sessions[sm.primarySessionID] == nil {
session.Mode = SessionModePrimary
sm.primarySessionID = session.ID
sm.lastPrimaryID = ""
// Clear all existing grace periods when a new primary is established
for oldSessionID := range sm.reconnectGrace {
delete(sm.reconnectGrace, oldSessionID)
}
for oldSessionID := range sm.reconnectInfo {
delete(sm.reconnectInfo, oldSessionID)
}
session.hidRPCAvailable = false
} else {
session.Mode = SessionModeObserver
}
} else if wasPreviouslyPending {
session.Mode = SessionModePending
} else if globalSettings != nil && globalSettings.RequireApproval && primaryExists && !wasWithinGracePeriod {
session.Mode = SessionModePending
// Notify primary about the pending session, but only if nickname is not required OR already provided
if primary := sm.sessions[sm.primarySessionID]; primary != nil {
// Check if nickname is required and missing
requiresNickname := globalSettings.RequireNickname
hasNickname := session.Nickname != "" && len(session.Nickname) > 0
// Only send approval request if nickname is not required OR already provided
if !requiresNickname || hasNickname {
go func() {
writeJSONRPCEvent("newSessionPending", map[string]interface{}{
"sessionId": session.ID,
"source": session.Source,
"identity": session.Identity,
"nickname": session.Nickname,
}, primary)
}()
}
// If nickname is required and missing, the approval request will be sent
// later when updateSessionNickname is called (see jsonrpc.go:232-242)
}
} else {
// No primary exists and approval is required, OR approval is not required
// In either case, this session becomes an observer
session.Mode = SessionModeObserver
}
session.CreatedAt = time.Now()
session.LastActive = time.Now()
sm.sessions[session.ID] = session
sm.logger.Info().
Str("sessionID", session.ID).
Str("mode", string(session.Mode)).
Int("totalSessions", len(sm.sessions)).
Msg("Session added to manager")
sm.ensureNickname(session)
if !nicknameReserved && session.Nickname != "" {
sm.nicknameIndex[session.Nickname] = session
}
sm.validateSinglePrimary()
// Clean up grace period after validation completes
if wasWithinGracePeriod {
delete(sm.reconnectGrace, session.ID)
delete(sm.reconnectInfo, session.ID)
}
// Notify all sessions about the new connection
go sm.broadcastSessionListUpdate()
return nil
}
// RemoveSession removes a session from the manager
func (sm *SessionManager) RemoveSession(sessionID string) {
sm.mu.Lock()
defer sm.mu.Unlock()
session, exists := sm.sessions[sessionID]
if !exists {
return
}
wasPrimary := session.Mode == SessionModePrimary
delete(sm.sessions, sessionID)
if session.Nickname != "" {
if sm.nicknameIndex[session.Nickname] == session {
delete(sm.nicknameIndex, session.Nickname)
}
}
sm.logger.Info().
Str("sessionID", sessionID).
Bool("wasPrimary", wasPrimary).
Int("remainingSessions", len(sm.sessions)).
Msg("Session removed from manager")
sm.removeFromQueue(sessionID)
// Check if this session was marked for immediate removal (intentional logout)
isIntentionalLogout := false
if graceTime, exists := sm.reconnectGrace[sessionID]; exists {
if time.Now().After(graceTime) {
isIntentionalLogout = true
delete(sm.reconnectGrace, sessionID)
delete(sm.reconnectInfo, sessionID)
}
}
// Determine grace period duration (used for logging even if intentional logout)
gracePeriod := 10
if currentSessionSettings != nil && currentSessionSettings.ReconnectGrace > 0 {
gracePeriod = currentSessionSettings.ReconnectGrace
}
// Only add grace period if this is NOT an intentional logout
if !isIntentionalLogout {
// Limit grace period entries to prevent memory exhaustion
// Evict the entry that will expire soonest (oldest expiration time)
for len(sm.reconnectGrace) >= maxGracePeriodEntries {
var evictID string
var earliestExpiration time.Time
for id, graceTime := range sm.reconnectGrace {
// Find the grace period that expires first (earliest time)
if earliestExpiration.IsZero() || graceTime.Before(earliestExpiration) {
evictID = id
earliestExpiration = graceTime
}
}
if evictID != "" {
delete(sm.reconnectGrace, evictID)
delete(sm.reconnectInfo, evictID)
} else {
break
}
}
sm.reconnectGrace[sessionID] = time.Now().Add(time.Duration(gracePeriod) * time.Second)
// Store session info for potential reconnection
sm.reconnectInfo[sessionID] = &SessionData{
ID: session.ID,
Mode: session.Mode,
Source: session.Source,
Identity: session.Identity,
Nickname: session.Nickname,
CreatedAt: session.CreatedAt,
}
}
// If this was the primary session, clear primary slot and track for grace period
if wasPrimary {
if isIntentionalLogout {
// Intentional logout: clear immediately and promote right away
sm.primarySessionID = ""
sm.lastPrimaryID = ""
sm.logger.Info().
Str("sessionID", sessionID).
Int("remainingSessions", len(sm.sessions)).
Msg("Primary session removed via intentional logout - immediate promotion")
} else {
// Accidental disconnect: use grace period
sm.lastPrimaryID = sessionID // Remember this was the primary for grace period
sm.primarySessionID = "" // Clear primary slot so other sessions can be promoted
// Clear all blacklists to allow promotion after grace period expires
if len(sm.transferBlacklist) > 0 {
sm.transferBlacklist = make([]TransferBlacklistEntry, 0)
}
sm.logger.Info().
Str("sessionID", sessionID).
Dur("gracePeriod", time.Duration(gracePeriod)*time.Second).
Int("remainingSessions", len(sm.sessions)).
Msg("Primary session removed, grace period active")
}
// Trigger validation for potential promotion
if len(sm.sessions) > 0 {
sm.validateSinglePrimary()
}
}
// Notify remaining sessions
go sm.broadcastSessionListUpdate()
}
// GetSession returns a session by ID
func (sm *SessionManager) GetSession(sessionID string) *Session {
sm.mu.RLock()
session := sm.sessions[sessionID]
sm.mu.RUnlock()
return session
}
// IsValidReconnection checks if a session ID can be reused for reconnection
func (sm *SessionManager) IsValidReconnection(sessionID, source, identity string) bool {
sm.mu.RLock()
defer sm.mu.RUnlock()
// Check if session is in reconnect grace period
if info, exists := sm.reconnectInfo[sessionID]; exists {
// Verify the source and identity match
return info.Source == source && info.Identity == identity
}
return false
}
// IsInGracePeriod checks if a session ID is within the reconnection grace period
func (sm *SessionManager) IsInGracePeriod(sessionID string) bool {
sm.mu.RLock()
defer sm.mu.RUnlock()
if graceTime, exists := sm.reconnectGrace[sessionID]; exists {
return time.Now().Before(graceTime)
}
return false
}
// ClearGracePeriod removes the grace period for a session (for intentional logout/disconnect)
// This marks the session for immediate removal without grace period protection
// Actual promotion will happen in RemoveSession when it detects no grace period
func (sm *SessionManager) ClearGracePeriod(sessionID string) {
sm.mu.Lock()
defer sm.mu.Unlock()
// Clear grace period and reconnect info to prevent grace period from being added
delete(sm.reconnectGrace, sessionID)
delete(sm.reconnectInfo, sessionID)
// Mark this session with a special "immediate removal" grace period (already expired)
// This signals to RemoveSession that this was intentional and should skip grace period
sm.reconnectGrace[sessionID] = time.Now().Add(-1 * time.Second) // Already expired
sm.logger.Info().
Str("sessionID", sessionID).
Str("lastPrimaryID", sm.lastPrimaryID).
Str("primarySessionID", sm.primarySessionID).
Msg("Marked session for immediate removal (intentional logout)")
}
// isSessionBlacklisted checks if a session was recently demoted via transfer and should not become primary
func (sm *SessionManager) isSessionBlacklisted(sessionID string) bool {
now := time.Now()
isBlacklisted := false
// Clean expired entries in-place (zero allocations)
writeIndex := 0
for readIndex := 0; readIndex < len(sm.transferBlacklist); readIndex++ {
entry := sm.transferBlacklist[readIndex]
if now.Before(entry.ExpiresAt) {
// Keep this entry - still valid
sm.transferBlacklist[writeIndex] = entry
writeIndex++
if entry.SessionID == sessionID {
isBlacklisted = true
}
}
// Expired entries are automatically skipped (not copied forward)
}
// Truncate to only valid entries
sm.transferBlacklist = sm.transferBlacklist[:writeIndex]
return isBlacklisted
}
// GetPrimarySession returns the current primary session
func (sm *SessionManager) GetPrimarySession() *Session {
sm.mu.RLock()
if sm.primarySessionID == "" {
sm.mu.RUnlock()
return nil
}
session := sm.sessions[sm.primarySessionID]
sm.mu.RUnlock()
return session
}
// SetPrimarySession sets a session as primary
func (sm *SessionManager) SetPrimarySession(sessionID string) error {
sm.mu.Lock()
defer sm.mu.Unlock()
session, exists := sm.sessions[sessionID]
if !exists {
return ErrSessionNotFound
}
session.Mode = SessionModePrimary
sm.primarySessionID = sessionID
sm.lastPrimaryID = ""
return nil
}
// CanReceiveVideo checks if a session is allowed to receive video
// Sessions in pending state cannot receive video
// Sessions that require nickname but don't have one also cannot receive video (if enforced)
func (sm *SessionManager) CanReceiveVideo(session *Session, settings *SessionSettings) bool {
// Check if session has video view permission
if !session.HasPermission(PermissionVideoView) {
return false
}
// If nickname is required and session doesn't have one, block video
if settings != nil && settings.RequireNickname && session.Nickname == "" {
return false
}
return true
}
// GetAllSessions returns information about all active sessions
func (sm *SessionManager) GetAllSessions() []SessionData {
sm.mu.RLock()
defer sm.mu.RUnlock()
// Don't run validation on every getSessions call
// This was causing immediate demotion during transfers and page refreshes
// Validation should only run during state changes, not data queries
infos := make([]SessionData, 0, len(sm.sessions))
for _, session := range sm.sessions {
infos = append(infos, SessionData{
ID: session.ID,
Mode: session.Mode,
Source: session.Source,
Identity: session.Identity,
Nickname: session.Nickname,
CreatedAt: session.CreatedAt,
LastActive: session.LastActive,
})
}
return infos
}
// RequestPrimary requests primary control for a session
func (sm *SessionManager) RequestPrimary(sessionID string) error {
sm.mu.Lock()
defer sm.mu.Unlock()
session, exists := sm.sessions[sessionID]
if !exists {
return ErrSessionNotFound
}
// If already primary, nothing to do
if session.Mode == SessionModePrimary {
return nil
}
// Check if there's a primary in grace period before promoting
if sm.primarySessionID == "" {
// Don't promote immediately if there's a primary waiting in grace period
if sm.lastPrimaryID != "" {
// Check if grace period is still active
if graceTime, exists := sm.reconnectGrace[sm.lastPrimaryID]; exists {
if time.Now().Before(graceTime) {
// Primary is in grace period, queue this request instead
sm.queueOrder = append(sm.queueOrder, sessionID)
session.Mode = SessionModeQueued
sm.logger.Info().
Str("sessionID", sessionID).
Str("gracePrimaryID", sm.lastPrimaryID).
Msg("Request queued - primary session in grace period")
go sm.broadcastSessionListUpdate()
return nil
}
}
}
// No grace period conflict, promote immediately using centralized system
err := sm.transferPrimaryRole("", sessionID, "initial_promotion", "first session auto-promotion")
if err == nil {
// Send mode change event after promoting
writeJSONRPCEvent("modeChanged", map[string]string{"mode": "primary"}, session)
go sm.broadcastSessionListUpdate()
}
return err
}
// Notify the primary session about the request
if primarySession, exists := sm.sessions[sm.primarySessionID]; exists {
event := PrimaryRequestEvent{
RequestID: sessionID,
Identity: session.Identity,
Source: session.Source,
Nickname: session.Nickname,
}
writeJSONRPCEvent("primaryControlRequested", event, primarySession)
}
// Add to queue if not already there
if session.Mode != SessionModeQueued {
session.Mode = SessionModeQueued
sm.queueOrder = append(sm.queueOrder, sessionID)
}
// Broadcast update in goroutine to avoid deadlock
go sm.broadcastSessionListUpdate()
return nil
}
// ReleasePrimary releases primary control from a session
func (sm *SessionManager) ReleasePrimary(sessionID string) error {
sm.mu.Lock()
defer sm.mu.Unlock()
session, exists := sm.sessions[sessionID]
if !exists {
return ErrSessionNotFound
}
if session.Mode != SessionModePrimary {
return nil
}
// Check if there are other sessions that could take control
hasOtherEligibleSessions := false
for id, s := range sm.sessions {
if id != sessionID && (s.Mode == SessionModeObserver || s.Mode == SessionModeQueued) {
hasOtherEligibleSessions = true
break
}
}
// Don't allow releasing primary if no one else can take control
if !hasOtherEligibleSessions {
return errors.New("cannot release primary control - no other sessions available")
}
// Demote to observer
session.Mode = SessionModeObserver
sm.primarySessionID = ""
// Clear any active input state
sm.clearInputState()
// Find the next session to promote (excluding the current primary)
// For voluntary releases, ignore blacklisting since this is user-initiated
promotedSessionID := sm.findNextSessionToPromoteExcludingIgnoreBlacklist(sessionID)
// If we found someone to promote, use centralized transfer
if promotedSessionID != "" {
err := sm.transferPrimaryRole(sessionID, promotedSessionID, "release_transfer", "primary release and auto-promotion")
if err != nil {
sm.logger.Error().
Str("error", err.Error()).
Str("releasedBySessionID", sessionID).
Str("promotedSessionID", promotedSessionID).
Msg("Failed to transfer primary role after release")
return err
}
sm.logger.Info().
Str("releasedBySessionID", sessionID).
Str("promotedSessionID", promotedSessionID).
Msg("Primary control released and transferred to observer")
// Send mode change event for promoted session
go func() {
if promotedSession := sessionManager.GetSession(promotedSessionID); promotedSession != nil {
writeJSONRPCEvent("modeChanged", map[string]string{"mode": "primary"}, promotedSession)
}
}()
} else {
sm.logger.Warn().
Str("releasedBySessionID", sessionID).
Msg("Primary control released but no eligible sessions found for promotion")
}
// Broadcast update in goroutine to avoid deadlock
go sm.broadcastSessionListUpdate()
return nil
}
// TransferPrimary transfers primary control from one session to another
func (sm *SessionManager) TransferPrimary(fromID, toID string) error {
sm.mu.Lock()
defer sm.mu.Unlock()
// SECURITY: Verify fromID is the actual current primary
if sm.primarySessionID != fromID {
return fmt.Errorf("transfer denied: %s is not the current primary (current primary: %s)", fromID, sm.primarySessionID)
}
fromSession, exists := sm.sessions[fromID]
if !exists {
return ErrSessionNotFound
}
if fromSession.Mode != SessionModePrimary {
return errors.New("transfer denied: from session is not in primary mode")
}
// Use centralized transfer method
err := sm.transferPrimaryRole(fromID, toID, "direct_transfer", "manual transfer request")
if err != nil {
return err
}
// Send events in goroutines to avoid holding lock
go func() {
if fromSession := sessionManager.GetSession(fromID); fromSession != nil {
writeJSONRPCEvent("modeChanged", map[string]string{"mode": "observer"}, fromSession)
}
}()
go func() {
if toSession := sessionManager.GetSession(toID); toSession != nil {
writeJSONRPCEvent("modeChanged", map[string]string{"mode": "primary"}, toSession)
}
sm.broadcastSessionListUpdate()
}()
return nil
}
// ApprovePrimaryRequest approves a pending primary control request
func (sm *SessionManager) ApprovePrimaryRequest(currentPrimaryID, requesterID string) error {
sm.mu.Lock()
defer sm.mu.Unlock()
// Log the approval request
sm.logger.Info().
Str("currentPrimaryID", currentPrimaryID).
Str("requesterID", requesterID).
Str("actualPrimaryID", sm.primarySessionID).
Msg("ApprovePrimaryRequest called")
// Verify current primary is correct
if sm.primarySessionID != currentPrimaryID {
sm.logger.Error().
Str("currentPrimaryID", currentPrimaryID).
Str("actualPrimaryID", sm.primarySessionID).
Msg("Not the primary session")
return errors.New("not the primary session")
}
// SECURITY: Verify requester session exists and is in Queued mode
requesterSession, exists := sm.sessions[requesterID]
if !exists {
sm.logger.Error().
Str("requesterID", requesterID).
Msg("Requester session not found")
return errors.New("requester session not found")
}
if requesterSession.Mode != SessionModeQueued {
sm.logger.Error().
Str("requesterID", requesterID).
Str("actualMode", string(requesterSession.Mode)).
Msg("Requester session is not in queued mode")
return fmt.Errorf("requester session is not in queued mode (current mode: %s)", requesterSession.Mode)
}
// Remove requester from queue
sm.removeFromQueue(requesterID)
// Use centralized transfer method
err := sm.transferPrimaryRole(currentPrimaryID, requesterID, "approval_transfer", "primary approval request")
if err != nil {
return err
}
// Send events after releasing lock to avoid deadlock
go func() {
if demotedSession := sessionManager.GetSession(currentPrimaryID); demotedSession != nil {
writeJSONRPCEvent("modeChanged", map[string]string{"mode": "observer"}, demotedSession)
}
}()
go func() {
if promotedSession := sessionManager.GetSession(requesterID); promotedSession != nil {
writeJSONRPCEvent("modeChanged", map[string]string{"mode": "primary"}, promotedSession)
}
sm.broadcastSessionListUpdate()
}()
return nil
}
// DenyPrimaryRequest denies a pending primary control request
func (sm *SessionManager) DenyPrimaryRequest(currentPrimaryID, requesterID string) error {
sm.mu.Lock()
defer sm.mu.Unlock()
// Verify current primary is correct
if sm.primarySessionID != currentPrimaryID {
return errors.New("not the primary session")
}
requester, exists := sm.sessions[requesterID]
if !exists {
return ErrSessionNotFound
}
// Move requester back to observer
requester.Mode = SessionModeObserver
sm.removeFromQueue(requesterID)
// Validate session consistency after mode change
sm.validateSinglePrimary()
// Notify requester of denial in goroutine
go func() {
writeJSONRPCEvent("primaryControlDenied", map[string]interface{}{}, requester)
sm.broadcastSessionListUpdate()
}()
return nil
}
// ApproveSession approves a pending session (thread-safe)
func (sm *SessionManager) ApproveSession(sessionID string) error {
sm.mu.Lock()
defer sm.mu.Unlock()
session, exists := sm.sessions[sessionID]
if !exists {
return ErrSessionNotFound
}
if session.Mode != SessionModePending {
return errors.New("session is not in pending mode")
}
// Promote session to observer
session.Mode = SessionModeObserver
sm.logger.Info().
Str("sessionID", sessionID).
Msg("Session approved and promoted to observer")
return nil
}
// DenySession denies a pending session (thread-safe)
func (sm *SessionManager) DenySession(sessionID string) error {
sm.mu.Lock()
defer sm.mu.Unlock()
session, exists := sm.sessions[sessionID]
if !exists {
return ErrSessionNotFound
}
if session.Mode != SessionModePending {
return errors.New("session is not in pending mode")
}
sm.logger.Info().
Str("sessionID", sessionID).
Msg("Session denied - notifying session")
return nil
}
// ForEachSession executes a function for each active session
func (sm *SessionManager) ForEachSession(fn func(*Session)) {
sm.mu.RLock()
// Create a copy of sessions to avoid holding lock during callbacks
sessionsCopy := make([]*Session, 0, len(sm.sessions))
for _, session := range sm.sessions {
sessionsCopy = append(sessionsCopy, session)
}
sm.mu.RUnlock()
// Call function outside of lock to prevent deadlocks
for _, session := range sessionsCopy {
fn(session)
}
}
// UpdateLastActive updates the last active time for a session
func (sm *SessionManager) UpdateLastActive(sessionID string) {
sm.mu.Lock()
if session, exists := sm.sessions[sessionID]; exists {
session.LastActive = time.Now()
}
sm.mu.Unlock()
}
// Internal helper methods
// validateSinglePrimary ensures there's only one primary session and fixes any inconsistencies
func (sm *SessionManager) validateSinglePrimary() {
primarySessions := make([]*Session, 0)
// Find all sessions that think they're primary
for _, session := range sm.sessions {
if session.Mode == SessionModePrimary {
primarySessions = append(primarySessions, session)
}
}
// If we have multiple primaries, fix it
if len(primarySessions) > 1 {
sm.logger.Error().
Int("primaryCount", len(primarySessions)).
Msg("Multiple primary sessions detected, fixing")
// Keep the first one as primary, demote the rest
for i, session := range primarySessions {
if i == 0 {
sm.primarySessionID = session.ID
} else {
session.Mode = SessionModeObserver
}
}
}
// Ensure manager's primarySessionID matches reality
if len(primarySessions) == 1 && sm.primarySessionID != primarySessions[0].ID {
sm.logger.Warn().
Str("managerPrimaryID", sm.primarySessionID).
Str("actualPrimaryID", primarySessions[0].ID).
Msg("Manager primary ID mismatch, fixing...")
sm.primarySessionID = primarySessions[0].ID
}
// Don't clear primary slot if there's a grace period active
if len(primarySessions) == 0 && sm.primarySessionID != "" {
if sm.lastPrimaryID == sm.primarySessionID {
if graceTime, exists := sm.reconnectGrace[sm.primarySessionID]; exists {
if time.Now().Before(graceTime) {
return // Keep primary slot reserved during grace period
}
}
}
sm.primarySessionID = ""
}
// Check if there's an active grace period for any primary session
hasActivePrimaryGracePeriod := false
for sessionID, graceTime := range sm.reconnectGrace {
if time.Now().Before(graceTime) {
if reconnectInfo, hasInfo := sm.reconnectInfo[sessionID]; hasInfo {
if reconnectInfo.Mode == SessionModePrimary {
hasActivePrimaryGracePeriod = true
break
}
}
}
}
// Auto-promote if there are NO primary sessions at all AND no active grace period
if len(primarySessions) == 0 && sm.primarySessionID == "" && len(sm.sessions) > 0 && !hasActivePrimaryGracePeriod {
// Find a session to promote to primary
nextSessionID := sm.findNextSessionToPromote()
if nextSessionID != "" {
sm.logger.Info().
Str("promotedSessionID", nextSessionID).
Msg("Auto-promoting observer to primary - no primary sessions exist and no grace period active")
// Use the centralized promotion logic
err := sm.transferPrimaryRole("", nextSessionID, "emergency_auto_promotion", "no primary sessions detected")
if err != nil {
sm.logger.Error().
Err(err).
Str("sessionID", nextSessionID).
Msg("Failed to auto-promote session to primary")
}
} else {
sm.logger.Warn().
Msg("No eligible session found for emergency auto-promotion")
}
}
}
func (sm *SessionManager) transferPrimaryRole(fromSessionID, toSessionID, transferType, context string) error {
sm.primaryPromotionLock.Lock()
defer sm.primaryPromotionLock.Unlock()
// Validate sessions exist
toSession, toExists := sm.sessions[toSessionID]
if !toExists {
return ErrSessionNotFound
}
// SECURITY: Prevent promoting a session that's already primary
if toSession.Mode == SessionModePrimary {
sm.logger.Warn().
Str("sessionID", toSessionID).
Str("transferType", transferType).
Msg("Attempted to promote session that is already primary")
return errors.New("target session is already primary")
}
var fromSession *Session
var fromExists bool
if fromSessionID != "" {
fromSession, fromExists = sm.sessions[fromSessionID]
if !fromExists {
return ErrSessionNotFound
}
}
// Demote existing primary if specified
if fromExists && fromSession.Mode == SessionModePrimary {
fromSession.Mode = SessionModeObserver
fromSession.hidRPCAvailable = false
// Always delete grace period when demoting - no exceptions
// If a session times out or is manually transferred, it should not auto-reclaim primary
delete(sm.reconnectGrace, fromSessionID)
delete(sm.reconnectInfo, fromSessionID)
sm.logger.Info().
Str("demotedSessionID", fromSessionID).
Str("transferType", transferType).
Str("context", context).
Msg("Demoted existing primary session")
}
primaryCount := 0
var existingPrimaryID string
for id, sess := range sm.sessions {
if sess.Mode == SessionModePrimary {
primaryCount++
if id != toSessionID {
existingPrimaryID = id
}
}
}
if primaryCount > 1 || (primaryCount == 1 && existingPrimaryID != "" && existingPrimaryID != sm.primarySessionID) {
sm.logger.Error().
Int("primaryCount", primaryCount).
Str("existingPrimaryID", existingPrimaryID).
Str("targetPromotionID", toSessionID).
Str("managerPrimaryID", sm.primarySessionID).
Str("transferType", transferType).
Msg("CRITICAL: Dual-primary corruption detected - forcing fix")
for id, sess := range sm.sessions {
if sess.Mode == SessionModePrimary {
if id != sm.primarySessionID && id != toSessionID {
sess.Mode = SessionModeObserver
sm.logger.Warn().
Str("demotedSessionID", id).
Msg("Force-demoted session due to dual-primary corruption")
}
}
}
if sm.primarySessionID != "" && sm.sessions[sm.primarySessionID] != nil {
if sm.sessions[sm.primarySessionID].Mode != SessionModePrimary {
sm.primarySessionID = ""
}
}
existingPrimaryID = ""
for id, sess := range sm.sessions {
if id != toSessionID && sess.Mode == SessionModePrimary {
existingPrimaryID = id
break
}
}
if existingPrimaryID != "" {
sm.logger.Error().
Str("existingPrimaryID", existingPrimaryID).
Str("targetPromotionID", toSessionID).
Msg("CRITICAL: Cannot fix dual-primary corruption - blocking promotion")
return fmt.Errorf("cannot promote: dual-primary corruption detected and fix failed (%s)", existingPrimaryID)
}
} else if existingPrimaryID != "" {
sm.logger.Error().
Str("existingPrimaryID", existingPrimaryID).
Str("targetPromotionID", toSessionID).
Str("transferType", transferType).
Msg("CRITICAL: Attempted to create second primary - blocking promotion")
return fmt.Errorf("cannot promote: another primary session exists (%s)", existingPrimaryID)
}
// Promote target session
toSession.Mode = SessionModePrimary
toSession.hidRPCAvailable = false
// Reset LastActive for all emergency promotions to prevent immediate re-timeout
if strings.HasPrefix(transferType, "emergency_") {
toSession.LastActive = time.Now()
}
sm.primarySessionID = toSessionID
// ALWAYS set lastPrimaryID to the new primary to support WebRTC reconnections
// This allows the newly promoted session to handle page refreshes correctly
// The blacklist system prevents unwanted takeovers during manual transfers
sm.lastPrimaryID = toSessionID
// Clear input state
sm.clearInputState()
// Reset consecutive emergency promotion counter on successful manual transfer
if fromSessionID != "" && transferType != "emergency_promotion_deadlock_prevention" && transferType != "emergency_timeout_promotion" {
sm.consecutiveEmergencyPromotions = 0
}
// Apply bidirectional blacklisting - protect newly promoted session
// Only apply blacklisting for MANUAL transfers, not emergency promotions
// Emergency promotions need to happen immediately without blacklist interference
isManualTransfer := (transferType == "direct_transfer" || transferType == "approval_transfer" || transferType == "release_transfer")
now := time.Now()
blacklistedCount := 0
if isManualTransfer {
// First, clear any existing blacklist entries for the newly promoted session
cleanedBlacklist := make([]TransferBlacklistEntry, 0)
for _, entry := range sm.transferBlacklist {
if entry.SessionID != toSessionID { // Remove any old blacklist entries for the new primary
cleanedBlacklist = append(cleanedBlacklist, entry)
}
}
sm.transferBlacklist = cleanedBlacklist
// Then blacklist all other sessions
for sessionID := range sm.sessions {
if sessionID != toSessionID { // Don't blacklist the newly promoted session
sm.transferBlacklist = append(sm.transferBlacklist, TransferBlacklistEntry{
SessionID: sessionID,
ExpiresAt: now.Add(transferBlacklistDuration),
})
blacklistedCount++
}
}
}
// Grace periods are cleared for demoted sessions (line 519-520) to prevent them from
// auto-reclaiming primary after manual transfer. New grace periods are created when
// sessions reconnect via RemoveSession. The blacklist provides additional protection
// during the transfer window, while lastPrimaryID allows the newly promoted session
// to safely handle browser refreshes and reclaim primary if disconnected.
sm.logger.Info().
Str("fromSessionID", fromSessionID).
Str("toSessionID", toSessionID).
Str("transferType", transferType).
Str("context", context).
Int("blacklistedSessions", blacklistedCount).
Dur("blacklistDuration", transferBlacklistDuration).
Msg("Primary role transferred with bidirectional protection")
// DON'T validate here - causes recursive calls and map iteration issues
// The caller (AddSession, RemoveSession, etc.) will validate after we return
// sm.validateSinglePrimary() // REMOVED to prevent recursion
// Send reconnection signal for emergency promotions via WebSocket (more reliable than RPC when channel is stale)
if toExists && (transferType == "emergency_timeout_promotion" || transferType == "emergency_auto_promotion") {
go func() {
time.Sleep(globalBroadcastDelay)
eventData := map[string]interface{}{
"sessionId": toSessionID,
"newMode": string(toSession.Mode),
"reason": "session_promotion",
"action": "reconnect_required",
"timestamp": time.Now().Unix(),
}
err := toSession.sendWebSocketSignal("connectionModeChanged", eventData)
if err != nil {
sm.logger.Warn().Err(err).Str("sessionId", toSessionID).Msg("WebSocket signal failed, using RPC")
writeJSONRPCEvent("connectionModeChanged", eventData, toSession)
}
sm.logger.Info().Str("sessionId", toSessionID).Str("transferType", transferType).Msg("Sent reconnection signal")
}()
}
return nil
}
// findNextSessionToPromote finds the next eligible session for promotion
// Replicates the logic from promoteNextSession but just returns the session ID
func (sm *SessionManager) findNextSessionToPromote() string {
return sm.findNextSessionToPromoteExcluding("", true)
}
func (sm *SessionManager) findNextSessionToPromoteExcluding(excludeSessionID string, checkBlacklist bool) string {
// First, check if there are queued sessions (excluding the specified session)
if len(sm.queueOrder) > 0 {
nextID := sm.queueOrder[0]
if nextID != excludeSessionID {
if _, exists := sm.sessions[nextID]; exists {
if !checkBlacklist || !sm.isSessionBlacklisted(nextID) {
return nextID
}
}
}
}
// Otherwise, find any observer session (excluding the specified session)
for id, session := range sm.sessions {
if id != excludeSessionID && session.Mode == SessionModeObserver {
if !checkBlacklist || !sm.isSessionBlacklisted(id) {
return id
}
}
}
// If still no primary and there are pending sessions (edge case: all sessions are pending)
// This can happen if RequireApproval was enabled but primary left
for id, session := range sm.sessions {
if id != excludeSessionID && session.Mode == SessionModePending {
if !checkBlacklist || !sm.isSessionBlacklisted(id) {
return id
}
}
}
return "" // No eligible session found
}
func (sm *SessionManager) findNextSessionToPromoteExcludingIgnoreBlacklist(excludeSessionID string) string {
return sm.findNextSessionToPromoteExcluding(excludeSessionID, false)
}
func (sm *SessionManager) removeFromQueue(sessionID string) {
// In-place removal is more efficient
for i, id := range sm.queueOrder {
if id == sessionID {
sm.queueOrder = append(sm.queueOrder[:i], sm.queueOrder[i+1:]...)
return
}
}
}
func (sm *SessionManager) clearInputState() {
// Clear keyboard state
if gadget != nil {
_ = gadget.KeyboardReport(0, []byte{0, 0, 0, 0, 0, 0})
}
}
// getCurrentPrimaryTimeout returns the current primary timeout duration
func (sm *SessionManager) getCurrentPrimaryTimeout() time.Duration {
// Use session settings if available
if currentSessionSettings != nil {
if currentSessionSettings.PrimaryTimeout == 0 {
return disabledTimeoutValue
} else if currentSessionSettings.PrimaryTimeout > 0 {
return time.Duration(currentSessionSettings.PrimaryTimeout) * time.Second
}
}
// Fall back to config or default
return sm.primaryTimeout
}
// getSessionTrustScore calculates a trust score for session selection during emergency promotion
func (sm *SessionManager) getSessionTrustScore(sessionID string) int {
session, exists := sm.sessions[sessionID]
if !exists {
return invalidSessionTrustScore
}
score := 0
now := time.Now()
// Longer session duration = more trust (up to 100 points for 100+ minutes)
sessionAge := now.Sub(session.CreatedAt)
score += int(sessionAge.Minutes())
if score > 100 {
score = 100 // Cap age bonus at 100 points
}
// Recently successful primary sessions get higher trust
if sm.lastPrimaryID == sessionID {
score += 50
}
// Observer mode is more trustworthy than queued/pending for emergency promotion
switch session.Mode {
case SessionModeObserver:
score += 20
case SessionModeQueued:
score += 10
case SessionModePending:
// Pending sessions get no bonus and are less preferred
score += 0
}
// Check if session has nickname when required (shows engagement)
if currentSessionSettings != nil && currentSessionSettings.RequireNickname {
if session.Nickname != "" {
score += 15
} else {
score -= 30 // Penalize sessions without required nickname
}
}
return score
}
// findMostTrustedSessionForEmergency finds the most trustworthy session for emergency promotion
func (sm *SessionManager) findMostTrustedSessionForEmergency() string {
bestSessionID := ""
bestScore := -1
for sessionID, session := range sm.sessions {
if sm.isSessionBlacklisted(sessionID) ||
session.Mode == SessionModePrimary ||
(session.Mode != SessionModeObserver && session.Mode != SessionModeQueued) {
continue
}
score := sm.getSessionTrustScore(sessionID)
if score > bestScore {
bestScore = score
bestSessionID = sessionID
}
}
if bestSessionID != "" {
sm.logger.Info().
Str("selectedSession", bestSessionID).
Int("trustScore", bestScore).
Msg("Selected most trusted session for emergency promotion")
}
return bestSessionID
}
// extractBrowserFromUserAgent extracts browser name from user agent string
func extractBrowserFromUserAgent(userAgent string) *string {
ua := strings.ToLower(userAgent)
// Check for common browsers (order matters - Chrome contains Safari, etc.)
if strings.Contains(ua, "edg/") || strings.Contains(ua, "edge") {
return &BrowserEdge
}
if strings.Contains(ua, "firefox") {
return &BrowserFirefox
}
if strings.Contains(ua, "chrome") {
return &BrowserChrome
}
if strings.Contains(ua, "safari") && !strings.Contains(ua, "chrome") {
return &BrowserSafari
}
if strings.Contains(ua, "opera") || strings.Contains(ua, "opr/") {
return &BrowserOpera
}
return &BrowserUnknown
}
// generateAutoNickname creates a user-friendly auto-generated nickname
func generateAutoNickname(session *Session) string {
// Use browser type from session, fallback to "user" if not set
browser := "user"
if session.Browser != nil {
browser = *session.Browser
}
// Use last 4 chars of session ID for uniqueness (lowercase)
sessionID := strings.ToLower(session.ID)
shortID := sessionID[len(sessionID)-4:]
// Generate contextual lowercase nickname
return fmt.Sprintf("u-%s-%s", browser, shortID)
}
// generateNicknameFromUserAgent creates a nickname from user agent (for frontend use)
func generateNicknameFromUserAgent(userAgent string) string {
// Extract browser info
browserPtr := extractBrowserFromUserAgent(userAgent)
browser := "user"
if browserPtr != nil {
browser = *browserPtr
}
// Generate a random 4-character ID (lowercase)
shortID := strings.ToLower(fmt.Sprintf("%04x", time.Now().UnixNano()%0xFFFF))
// Generate contextual lowercase nickname
return fmt.Sprintf("u-%s-%s", browser, shortID)
}
// ensureNickname ensures session has a nickname, auto-generating if needed
func (sm *SessionManager) ensureNickname(session *Session) {
// Skip if session already has a nickname
if session.Nickname != "" {
return
}
// Skip if nickname is required (user must set manually)
if currentSessionSettings != nil && currentSessionSettings.RequireNickname {
return
}
// Auto-generate nickname
session.Nickname = generateAutoNickname(session)
sm.logger.Debug().
Str("sessionID", session.ID).
Str("autoNickname", session.Nickname).
Msg("Auto-generated nickname for session")
}
// updateAllSessionNicknames updates nicknames for all sessions when settings change
func (sm *SessionManager) updateAllSessionNicknames() {
sm.mu.Lock()
defer sm.mu.Unlock()
updated := 0
for _, session := range sm.sessions {
oldNickname := session.Nickname
sm.ensureNickname(session)
if session.Nickname != oldNickname {
updated++
}
}
if updated > 0 {
sm.logger.Info().
Int("updatedSessions", updated).
Msg("Auto-generated nicknames for sessions after settings change")
// Broadcast the update
go sm.broadcastSessionListUpdate()
}
}
func (sm *SessionManager) broadcastWorker(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-sm.broadcastQueue:
sm.broadcastPending.Store(false)
sm.executeBroadcast()
}
}
}
func (sm *SessionManager) broadcastSessionListUpdate() {
if sm.broadcastPending.CompareAndSwap(false, true) {
select {
case sm.broadcastQueue <- struct{}{}:
default:
sm.logger.Warn().
Int("queueLen", len(sm.broadcastQueue)).
Int("queueCap", cap(sm.broadcastQueue)).
Msg("Broadcast queue full, dropping update")
sm.broadcastPending.Store(false)
}
}
}
func (sm *SessionManager) executeBroadcast() {
sm.broadcastMutex.Lock()
if time.Since(sm.lastBroadcast) < globalBroadcastDelay {
sm.broadcastMutex.Unlock()
return
}
sm.lastBroadcast = time.Now()
sm.broadcastMutex.Unlock()
sm.mu.RLock()
infos := make([]SessionData, 0, len(sm.sessions))
activeSessions := make([]*Session, 0, len(sm.sessions))
for _, session := range sm.sessions {
infos = append(infos, SessionData{
ID: session.ID,
Mode: session.Mode,
Source: session.Source,
Identity: session.Identity,
Nickname: session.Nickname,
CreatedAt: session.CreatedAt,
LastActive: session.LastActive,
})
if session.RPCChannel != nil {
activeSessions = append(activeSessions, session)
}
}
sm.mu.RUnlock()
for _, session := range activeSessions {
session.lastBroadcastMu.Lock()
shouldSkip := time.Since(session.LastBroadcast) < sessionBroadcastDelay
if !shouldSkip {
session.LastBroadcast = time.Now()
}
session.lastBroadcastMu.Unlock()
if shouldSkip {
continue
}
event := SessionsUpdateEvent{
Sessions: infos,
YourMode: session.Mode,
}
writeJSONRPCEvent("sessionsUpdated", event, session)
}
}
// Shutdown stops the session manager and cleans up resources
func (sm *SessionManager) Shutdown() {
if sm.cleanupCancel != nil {
sm.cleanupCancel()
}
sm.mu.Lock()
defer sm.mu.Unlock()
close(sm.broadcastQueue)
for id := range sm.sessions {
delete(sm.sessions, id)
}
}
func (sm *SessionManager) cleanupInactiveSessions(ctx context.Context) {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
validationCounter := 0
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
sm.mu.Lock()
now := time.Now()
needsBroadcast := false
// Clean up expired emergency promotion window entries
sm.emergencyWindowMutex.Lock()
cutoff := now.Add(-emergencyPromotionWindowCleanupAge)
validEntries := make([]time.Time, 0, len(sm.emergencyPromotionWindow))
for _, t := range sm.emergencyPromotionWindow {
if t.After(cutoff) {
validEntries = append(validEntries, t)
}
}
sm.emergencyPromotionWindow = validEntries
sm.emergencyWindowMutex.Unlock()
// Handle expired grace periods
gracePeriodExpired := sm.handleGracePeriodExpiration(now)
if gracePeriodExpired {
needsBroadcast = true
}
// Clean up timed-out pending sessions (DoS protection)
if sm.handlePendingSessionTimeout(now) {
needsBroadcast = true
}
// Clean up inactive observer sessions
if sm.handleObserverSessionCleanup(now) {
needsBroadcast = true
}
// Handle primary session timeout
if sm.handlePrimarySessionTimeout(now) {
needsBroadcast = true
}
// Run validation immediately if grace period expired, otherwise periodically
if gracePeriodExpired {
sm.validateSinglePrimary()
} else {
validationCounter++
if validationCounter >= 10 {
validationCounter = 0
sm.validateSinglePrimary()
}
}
sm.mu.Unlock()
if needsBroadcast {
go sm.broadcastSessionListUpdate()
}
}
}
}
// Global session manager instance
var (
sessionManager *SessionManager
sessionManagerOnce sync.Once
)
func initSessionManager() {
sessionManagerOnce.Do(func() {
sessionManager = NewSessionManager(websocketLogger)
})
}
// Global session settings - references config.SessionSettings for persistence
var currentSessionSettings *SessionSettings