Compare commits

...

4 Commits

Author SHA1 Message Date
Alex P 9a10d3ed38 refactor: revert unrelated USB gadget type changes
Remove int→int16 type signature changes from internal/usbgadget/ that were
not essential to multi-session functionality. These changes should be part
of a separate USB improvement PR.

Changes:
- Revert AbsMouseReport signature to use int instead of int16
- Remove int16 casts in hidrpc.go calling code
- Update usb.go wrapper functions to match

This keeps the multi-session PR focused on session management without
coupling unrelated USB gadget refactoring.
2025-10-17 11:51:09 +03:00
Alex P 846caf77ce refactor: improve code maintainability with focused handler functions
Extract large switch statements and functions into focused, reusable handlers
to improve code maintainability while preserving 100% functionality.

Changes:
- Extract onRPCMessage switch (200+ lines → 20 lines) into jsonrpc_session_handlers.go
- Extract cleanupInactiveSessions (343 lines → 54 lines) into session_cleanup_handlers.go
- Consolidate duplicate emergency promotion logic into attemptEmergencyPromotion()
- Simplify shouldBecomePrimary boolean logic with self-documenting variables

All changes pass linting (0 issues) and maintain complete functionality.
2025-10-17 11:29:04 +03:00
Alex P da85b54fc2 [WIP] Optimizations: code readiness optimizations 2025-10-17 10:44:18 +03:00
Alex P 8f17bbd1f9 [WIP] Optimizations: code readiness optimizations 2025-10-17 10:09:04 +03:00
11 changed files with 615 additions and 540 deletions

View File

@ -69,7 +69,7 @@ func handleHidRPCMessage(message hidrpc.Message, session *Session) {
logger.Warn().Err(err).Msg("failed to get pointer report")
return
}
rpcErr = rpcAbsMouseReport(int16(pointerReport.X), int16(pointerReport.Y), pointerReport.Button)
rpcErr = rpcAbsMouseReport(pointerReport.X, pointerReport.Y, pointerReport.Button)
case hidrpc.TypeMouseReport:
if !session.HasPermission(PermissionMouseInput) {
logger.Debug().
@ -146,8 +146,8 @@ const baseExtension = expectedRate + maxLateness // 100ms extension on perfect t
const maxStaleness = 225 * time.Millisecond // discard ancient packets outright
func handleHidRPCKeypressKeepAlive(session *Session) error {
// Update LastActive to prevent session timeout (jiggler sends every 50ms)
sessionManager.UpdateLastActive(session.ID)
// NOTE: Do NOT update LastActive here - jiggler keep-alives are automated,
// not human input. Only actual keyboard/mouse input should prevent timeout.
session.keepAliveJitterLock.Lock()
defer session.keepAliveJitterLock.Unlock()

View File

@ -354,7 +354,7 @@ func (u *UsbGadget) UpdateKeysDown(modifier byte, keys []byte) KeysDownState {
u.keyboardStateLock.Unlock()
if u.onKeysDownChange != nil {
(*u.onKeysDownChange)(state)
(*u.onKeysDownChange)(state) // this enques to the outgoing hidrpc queue via usb.go → currentSession.enqueueKeysDownState(...)
}
return state
}

View File

@ -85,7 +85,7 @@ func (u *UsbGadget) absMouseWriteHidFile(data []byte) error {
return nil
}
func (u *UsbGadget) AbsMouseReport(x int16, y int16, buttons uint8) error {
func (u *UsbGadget) AbsMouseReport(x int, y int, buttons uint8) error {
u.absMouseLock.Lock()
defer u.absMouseLock.Unlock()

View File

@ -163,213 +163,27 @@ func onRPCMessage(message webrtc.DataChannelMessage, session *Session) {
scopedLogger.Trace().Msg("Received RPC request")
// Handle session-specific RPC methods first
var result any
var handlerErr error
// Handle session management RPC methods
switch request.Method {
case "approvePrimaryRequest":
if err := RequirePermission(session, PermissionSessionTransfer); err != nil {
handlerErr = err
} else if requesterID, ok := request.Params["requesterID"].(string); ok {
handlerErr = sessionManager.ApprovePrimaryRequest(session.ID, requesterID)
if handlerErr == nil {
result = map[string]interface{}{"status": "approved"}
}
} else {
handlerErr = errors.New("invalid requesterID parameter")
}
case "denyPrimaryRequest":
if err := RequirePermission(session, PermissionSessionTransfer); err != nil {
handlerErr = err
} else if requesterID, ok := request.Params["requesterID"].(string); ok {
handlerErr = sessionManager.DenyPrimaryRequest(session.ID, requesterID)
if handlerErr == nil {
result = map[string]interface{}{"status": "denied"}
}
} else {
handlerErr = errors.New("invalid requesterID parameter")
}
case "approveNewSession":
if err := RequirePermission(session, PermissionSessionApprove); err != nil {
handlerErr = err
} else if sessionID, ok := request.Params["sessionId"].(string); ok {
handlerErr = sessionManager.ApproveSession(sessionID)
if handlerErr == nil {
go sessionManager.broadcastSessionListUpdate()
result = map[string]interface{}{"status": "approved"}
}
} else {
handlerErr = errors.New("invalid sessionId parameter")
}
case "denyNewSession":
if err := RequirePermission(session, PermissionSessionApprove); err != nil {
handlerErr = err
} else if sessionID, ok := request.Params["sessionId"].(string); ok {
handlerErr = sessionManager.DenySession(sessionID)
if handlerErr == nil {
// Notify the denied session
if targetSession := sessionManager.GetSession(sessionID); targetSession != nil {
go func() {
writeJSONRPCEvent("sessionAccessDenied", map[string]interface{}{
"message": "Access denied by primary session",
}, targetSession)
sessionManager.broadcastSessionListUpdate()
}()
}
result = map[string]interface{}{"status": "denied"}
}
} else {
handlerErr = errors.New("invalid sessionId parameter")
}
case "approvePrimaryRequest", "denyPrimaryRequest":
result, handlerErr = handleSessionTransferRPC(request.Method, request.Params, session)
case "approveNewSession", "denyNewSession":
result, handlerErr = handleSessionApprovalRPC(request.Method, request.Params, session)
case "requestSessionApproval":
if session.Mode != SessionModePending {
handlerErr = errors.New("only pending sessions can request approval")
} else if currentSessionSettings != nil && currentSessionSettings.RequireApproval {
if primary := sessionManager.GetPrimarySession(); primary != nil {
go func() {
writeJSONRPCEvent("newSessionPending", map[string]interface{}{
"sessionId": session.ID,
"source": session.Source,
"identity": session.Identity,
"nickname": session.Nickname,
}, primary)
}()
result = map[string]interface{}{"status": "requested"}
} else {
handlerErr = errors.New("no primary session available")
}
} else {
handlerErr = errors.New("session approval not required")
}
result, handlerErr = handleRequestSessionApprovalRPC(session)
case "updateSessionNickname":
sessionID, _ := request.Params["sessionId"].(string)
nickname, _ := request.Params["nickname"].(string)
// Validate nickname to match frontend validation
if len(nickname) < 2 {
handlerErr = errors.New("nickname must be at least 2 characters")
} else if len(nickname) > 30 {
handlerErr = errors.New("nickname must be 30 characters or less")
} else if !isValidNickname(nickname) {
handlerErr = errors.New("nickname can only contain letters, numbers, spaces, and - _ . @")
} else if targetSession := sessionManager.GetSession(sessionID); targetSession != nil {
// Users can update their own nickname, or admins can update any
if targetSession.ID == session.ID || session.HasPermission(PermissionSessionManage) {
// Check nickname uniqueness
allSessions := sessionManager.GetAllSessions()
for _, existingSession := range allSessions {
if existingSession.ID != sessionID && existingSession.Nickname == nickname {
handlerErr = fmt.Errorf("nickname '%s' is already in use by another session", nickname)
break
}
}
if handlerErr == nil {
targetSession.Nickname = nickname
// If session is pending and approval is required, send the approval request now that we have a nickname
if targetSession.Mode == SessionModePending && currentSessionSettings != nil && currentSessionSettings.RequireApproval {
if primary := sessionManager.GetPrimarySession(); primary != nil {
go func() {
writeJSONRPCEvent("newSessionPending", map[string]interface{}{
"sessionId": targetSession.ID,
"source": targetSession.Source,
"identity": targetSession.Identity,
"nickname": targetSession.Nickname,
}, primary)
}()
}
}
sessionManager.broadcastSessionListUpdate()
result = map[string]interface{}{"status": "updated"}
}
} else {
handlerErr = errors.New("permission denied: can only update own nickname")
}
} else {
handlerErr = errors.New("session not found")
}
result, handlerErr = handleUpdateSessionNicknameRPC(request.Params, session)
case "getSessions":
sessions := sessionManager.GetAllSessions()
result = sessions
result = sessionManager.GetAllSessions()
case "getPermissions":
permissions := session.GetPermissions()
permMap := make(map[string]bool)
for perm, allowed := range permissions {
permMap[string(perm)] = allowed
}
result = GetPermissionsResponse{
Mode: string(session.Mode),
Permissions: permMap,
}
case "getSessionSettings":
if err := RequirePermission(session, PermissionSettingsRead); err != nil {
handlerErr = err
} else {
result = currentSessionSettings
}
case "setSessionSettings":
if err := RequirePermission(session, PermissionSessionManage); err != nil {
handlerErr = err
} else {
if settings, ok := request.Params["settings"].(map[string]interface{}); ok {
if requireApproval, ok := settings["requireApproval"].(bool); ok {
currentSessionSettings.RequireApproval = requireApproval
}
if requireNickname, ok := settings["requireNickname"].(bool); ok {
currentSessionSettings.RequireNickname = requireNickname
}
if reconnectGrace, ok := settings["reconnectGrace"].(float64); ok {
currentSessionSettings.ReconnectGrace = int(reconnectGrace)
}
if primaryTimeout, ok := settings["primaryTimeout"].(float64); ok {
currentSessionSettings.PrimaryTimeout = int(primaryTimeout)
}
if privateKeystrokes, ok := settings["privateKeystrokes"].(bool); ok {
currentSessionSettings.PrivateKeystrokes = privateKeystrokes
}
if maxRejectionAttempts, ok := settings["maxRejectionAttempts"].(float64); ok {
currentSessionSettings.MaxRejectionAttempts = int(maxRejectionAttempts)
}
if maxSessions, ok := settings["maxSessions"].(float64); ok {
currentSessionSettings.MaxSessions = int(maxSessions)
}
if observerTimeout, ok := settings["observerTimeout"].(float64); ok {
currentSessionSettings.ObserverTimeout = int(observerTimeout)
}
// Trigger nickname auto-generation for sessions when RequireNickname changes
if sessionManager != nil {
sessionManager.updateAllSessionNicknames()
}
// Save to persistent config
if err := SaveConfig(); err != nil {
handlerErr = errors.New("failed to save session settings")
}
result = currentSessionSettings
} else {
handlerErr = errors.New("invalid settings parameter")
}
}
result, handlerErr = handleGetPermissionsRPC(session)
case "getSessionSettings", "setSessionSettings":
result, handlerErr = handleSessionSettingsRPC(request.Method, request.Params, session)
case "generateNickname":
// Generate a nickname based on user agent (no permissions required)
userAgent := ""
if request.Params != nil {
if ua, ok := request.Params["userAgent"].(string); ok {
userAgent = ua
}
}
// Use browser as fallback if no user agent provided
if userAgent == "" {
userAgent = "Mozilla/5.0 (Unknown) Browser"
}
result = map[string]string{
"nickname": generateNicknameFromUserAgent(userAgent),
}
result, handlerErr = handleGenerateNicknameRPC(request.Params)
default:
// Check method permissions using centralized permission system
if requiredPerm, exists := GetMethodPermission(request.Method); exists {

236
jsonrpc_session_handlers.go Normal file
View File

@ -0,0 +1,236 @@
package kvm
import (
"errors"
"fmt"
)
// handleSessionTransferRPC handles primary control transfer requests (approve/deny)
func handleSessionTransferRPC(method string, params map[string]any, session *Session) (any, error) {
requesterID, ok := params["requesterID"].(string)
if !ok {
return nil, errors.New("invalid requesterID parameter")
}
if err := RequirePermission(session, PermissionSessionTransfer); err != nil {
return nil, err
}
var err error
switch method {
case "approvePrimaryRequest":
err = sessionManager.ApprovePrimaryRequest(session.ID, requesterID)
if err == nil {
return map[string]interface{}{"status": "approved"}, nil
}
case "denyPrimaryRequest":
err = sessionManager.DenyPrimaryRequest(session.ID, requesterID)
if err == nil {
return map[string]interface{}{"status": "denied"}, nil
}
}
return nil, err
}
// handleSessionApprovalRPC handles new session approval requests (approve/deny)
func handleSessionApprovalRPC(method string, params map[string]any, session *Session) (any, error) {
sessionID, ok := params["sessionId"].(string)
if !ok {
return nil, errors.New("invalid sessionId parameter")
}
if err := RequirePermission(session, PermissionSessionApprove); err != nil {
return nil, err
}
var err error
switch method {
case "approveNewSession":
err = sessionManager.ApproveSession(sessionID)
if err == nil {
go sessionManager.broadcastSessionListUpdate()
return map[string]interface{}{"status": "approved"}, nil
}
case "denyNewSession":
err = sessionManager.DenySession(sessionID)
if err == nil {
if targetSession := sessionManager.GetSession(sessionID); targetSession != nil {
go func() {
writeJSONRPCEvent("sessionAccessDenied", map[string]interface{}{
"message": "Access denied by primary session",
}, targetSession)
sessionManager.broadcastSessionListUpdate()
}()
}
return map[string]interface{}{"status": "denied"}, nil
}
}
return nil, err
}
// handleRequestSessionApprovalRPC handles pending sessions requesting approval from primary
func handleRequestSessionApprovalRPC(session *Session) (any, error) {
if session.Mode != SessionModePending {
return nil, errors.New("only pending sessions can request approval")
}
if currentSessionSettings == nil || !currentSessionSettings.RequireApproval {
return nil, errors.New("session approval not required")
}
primary := sessionManager.GetPrimarySession()
if primary == nil {
return nil, errors.New("no primary session available")
}
go func() {
writeJSONRPCEvent("newSessionPending", map[string]interface{}{
"sessionId": session.ID,
"source": session.Source,
"identity": session.Identity,
"nickname": session.Nickname,
}, primary)
}()
return map[string]interface{}{"status": "requested"}, nil
}
// handleUpdateSessionNicknameRPC handles nickname updates for sessions
func handleUpdateSessionNicknameRPC(params map[string]any, session *Session) (any, error) {
sessionID, _ := params["sessionId"].(string)
nickname, _ := params["nickname"].(string)
if len(nickname) < 2 {
return nil, errors.New("nickname must be at least 2 characters")
}
if len(nickname) > 30 {
return nil, errors.New("nickname must be 30 characters or less")
}
if !isValidNickname(nickname) {
return nil, errors.New("nickname can only contain letters, numbers, spaces, and - _ . @")
}
targetSession := sessionManager.GetSession(sessionID)
if targetSession == nil {
return nil, errors.New("session not found")
}
if targetSession.ID != session.ID && !session.HasPermission(PermissionSessionManage) {
return nil, errors.New("permission denied: can only update own nickname")
}
// Check nickname uniqueness
allSessions := sessionManager.GetAllSessions()
for _, existingSession := range allSessions {
if existingSession.ID != sessionID && existingSession.Nickname == nickname {
return nil, fmt.Errorf("nickname '%s' is already in use by another session", nickname)
}
}
targetSession.Nickname = nickname
// If session is pending and approval is required, send the approval request now that we have a nickname
if targetSession.Mode == SessionModePending && currentSessionSettings != nil && currentSessionSettings.RequireApproval {
if primary := sessionManager.GetPrimarySession(); primary != nil {
go func() {
writeJSONRPCEvent("newSessionPending", map[string]interface{}{
"sessionId": targetSession.ID,
"source": targetSession.Source,
"identity": targetSession.Identity,
"nickname": targetSession.Nickname,
}, primary)
}()
}
}
sessionManager.broadcastSessionListUpdate()
return map[string]interface{}{"status": "updated"}, nil
}
// handleGetPermissionsRPC returns permissions for the current session
func handleGetPermissionsRPC(session *Session) (any, error) {
permissions := session.GetPermissions()
permMap := make(map[string]bool)
for perm, allowed := range permissions {
permMap[string(perm)] = allowed
}
return GetPermissionsResponse{
Mode: string(session.Mode),
Permissions: permMap,
}, nil
}
// handleSessionSettingsRPC handles getting or setting session settings
func handleSessionSettingsRPC(method string, params map[string]any, session *Session) (any, error) {
switch method {
case "getSessionSettings":
if err := RequirePermission(session, PermissionSettingsRead); err != nil {
return nil, err
}
return currentSessionSettings, nil
case "setSessionSettings":
if err := RequirePermission(session, PermissionSessionManage); err != nil {
return nil, err
}
settings, ok := params["settings"].(map[string]interface{})
if !ok {
return nil, errors.New("invalid settings parameter")
}
if requireApproval, ok := settings["requireApproval"].(bool); ok {
currentSessionSettings.RequireApproval = requireApproval
}
if requireNickname, ok := settings["requireNickname"].(bool); ok {
currentSessionSettings.RequireNickname = requireNickname
}
if reconnectGrace, ok := settings["reconnectGrace"].(float64); ok {
currentSessionSettings.ReconnectGrace = int(reconnectGrace)
}
if primaryTimeout, ok := settings["primaryTimeout"].(float64); ok {
currentSessionSettings.PrimaryTimeout = int(primaryTimeout)
}
if privateKeystrokes, ok := settings["privateKeystrokes"].(bool); ok {
currentSessionSettings.PrivateKeystrokes = privateKeystrokes
}
if maxRejectionAttempts, ok := settings["maxRejectionAttempts"].(float64); ok {
currentSessionSettings.MaxRejectionAttempts = int(maxRejectionAttempts)
}
if maxSessions, ok := settings["maxSessions"].(float64); ok {
currentSessionSettings.MaxSessions = int(maxSessions)
}
if observerTimeout, ok := settings["observerTimeout"].(float64); ok {
currentSessionSettings.ObserverTimeout = int(observerTimeout)
}
if sessionManager != nil {
sessionManager.updateAllSessionNicknames()
}
if err := SaveConfig(); err != nil {
return nil, errors.New("failed to save session settings")
}
return currentSessionSettings, nil
}
return nil, fmt.Errorf("unknown session settings method: %s", method)
}
// handleGenerateNicknameRPC generates a nickname based on user agent
func handleGenerateNicknameRPC(params map[string]any) (any, error) {
userAgent := ""
if params != nil {
if ua, ok := params["userAgent"].(string); ok {
userAgent = ua
}
}
if userAgent == "" {
userAgent = "Mozilla/5.0 (Unknown) Browser"
}
return map[string]string{
"nickname": generateNicknameFromUserAgent(userAgent),
}, nil
}

View File

@ -62,6 +62,9 @@ func initNative(systemVersion *semver.Version, appVersion *semver.Version) {
Str("sessionID", s.ID).
Err(err).
Msg("error writing sample to session")
} else {
// Update LastActive when video frame successfully sent (prevents observer timeout)
sessionManager.UpdateLastActive(s.ID)
}
}
})

279
session_cleanup_handlers.go Normal file
View File

@ -0,0 +1,279 @@
package kvm
import (
"time"
)
// emergencyPromotionContext holds context for emergency promotion attempts
type emergencyPromotionContext struct {
triggerSessionID string
triggerReason string
now time.Time
}
// attemptEmergencyPromotion tries to promote a session using emergency or normal promotion logic
// Returns (promotedSessionID, isEmergency, shouldSkip)
func (sm *SessionManager) attemptEmergencyPromotion(ctx emergencyPromotionContext, excludeSessionID string) (string, bool, bool) {
// Check if emergency promotion is needed
if currentSessionSettings == nil || !currentSessionSettings.RequireApproval {
// Normal promotion - reset consecutive counter
sm.consecutiveEmergencyPromotions = 0
promotedID := sm.findNextSessionToPromote()
return promotedID, false, false
}
// Emergency promotion path
hasPrimary := sm.primarySessionID != ""
if !hasPrimary {
sm.logger.Error().
Str("triggerSessionID", ctx.triggerSessionID).
Msg("CRITICAL: No primary session exists - bypassing all rate limits")
} else {
// Rate limiting (only when we have a primary)
if ctx.now.Sub(sm.lastEmergencyPromotion) < 30*time.Second {
sm.logger.Warn().
Str("triggerSessionID", ctx.triggerSessionID).
Dur("timeSinceLastEmergency", ctx.now.Sub(sm.lastEmergencyPromotion)).
Msgf("Emergency promotion rate limit exceeded - potential attack (%s)", ctx.triggerReason)
return "", false, true // shouldSkip = true
}
// Limit consecutive emergency promotions
if sm.consecutiveEmergencyPromotions >= 3 {
sm.logger.Error().
Str("triggerSessionID", ctx.triggerSessionID).
Int("consecutiveCount", sm.consecutiveEmergencyPromotions).
Msgf("Too many consecutive emergency promotions - blocking for security (%s)", ctx.triggerReason)
return "", false, true // shouldSkip = true
}
}
// Find best session for emergency promotion
var promotedSessionID string
if excludeSessionID != "" {
// Need to exclude a specific session (e.g., timed-out session)
bestSessionID := ""
bestScore := -1
for id, session := range sm.sessions {
if id != excludeSessionID &&
!sm.isSessionBlacklisted(id) &&
(session.Mode == SessionModeObserver || session.Mode == SessionModeQueued) {
score := sm.getSessionTrustScore(id)
if score > bestScore {
bestScore = score
bestSessionID = id
}
}
}
promotedSessionID = bestSessionID
} else {
promotedSessionID = sm.findMostTrustedSessionForEmergency()
}
return promotedSessionID, true, false
}
// handleGracePeriodExpiration checks and handles expired grace periods
// Returns true if any grace period expired
func (sm *SessionManager) handleGracePeriodExpiration(now time.Time) bool {
gracePeriodExpired := false
for sessionID, graceTime := range sm.reconnectGrace {
if now.After(graceTime) {
delete(sm.reconnectGrace, sessionID)
gracePeriodExpired = true
wasHoldingPrimarySlot := (sm.lastPrimaryID == sessionID)
if wasHoldingPrimarySlot {
sm.primarySessionID = ""
sm.lastPrimaryID = ""
sm.logger.Info().
Str("expiredSessionID", sessionID).
Msg("Primary session grace period expired - slot now available")
// Promote next eligible session using emergency logic if needed
sm.promoteAfterGraceExpiration(sessionID, now)
} else {
sm.logger.Debug().
Str("expiredSessionID", sessionID).
Msg("Non-primary session grace period expired")
}
delete(sm.reconnectInfo, sessionID)
}
}
return gracePeriodExpired
}
// promoteAfterGraceExpiration handles promotion after grace period expiration
func (sm *SessionManager) promoteAfterGraceExpiration(expiredSessionID string, now time.Time) {
ctx := emergencyPromotionContext{
triggerSessionID: expiredSessionID,
triggerReason: "grace_expiration",
now: now,
}
promotedSessionID, isEmergency, shouldSkip := sm.attemptEmergencyPromotion(ctx, "")
if shouldSkip {
return
}
if promotedSessionID != "" {
reason := "grace_expiration_promotion"
if isEmergency {
reason = "emergency_promotion_deadlock_prevention"
sm.lastEmergencyPromotion = now
sm.consecutiveEmergencyPromotions++
sm.logger.Warn().
Str("expiredSessionID", expiredSessionID).
Str("promotedSessionID", promotedSessionID).
Bool("requireApproval", true).
Int("consecutiveEmergencyPromotions", sm.consecutiveEmergencyPromotions).
Int("trustScore", sm.getSessionTrustScore(promotedSessionID)).
Msg("EMERGENCY: Bypassing approval requirement to prevent deadlock")
}
err := sm.transferPrimaryRole("", promotedSessionID, reason, "primary grace period expired")
if err == nil {
logEvent := sm.logger.Info()
if isEmergency {
logEvent = sm.logger.Warn()
}
logEvent.
Str("expiredSessionID", expiredSessionID).
Str("promotedSessionID", promotedSessionID).
Str("reason", reason).
Bool("isEmergencyPromotion", isEmergency).
Msg("Auto-promoted session after primary grace period expiration")
} else {
sm.logger.Error().
Err(err).
Str("expiredSessionID", expiredSessionID).
Str("promotedSessionID", promotedSessionID).
Str("reason", reason).
Bool("isEmergencyPromotion", isEmergency).
Msg("Failed to promote session after grace period expiration")
}
} else {
logLevel := sm.logger.Info()
if isEmergency {
logLevel = sm.logger.Error()
}
logLevel.
Str("expiredSessionID", expiredSessionID).
Bool("isEmergencyPromotion", isEmergency).
Msg("Primary grace period expired but no eligible sessions to promote")
}
}
// handlePendingSessionTimeout removes timed-out pending sessions (DoS protection)
// Returns true if any pending session was removed
func (sm *SessionManager) handlePendingSessionTimeout(now time.Time) bool {
needsCleanup := false
for id, session := range sm.sessions {
if session.Mode == SessionModePending &&
now.Sub(session.CreatedAt) > defaultPendingSessionTimeout {
websocketLogger.Info().
Str("sessionId", id).
Dur("age", now.Sub(session.CreatedAt)).
Msg("Removing timed-out pending session")
delete(sm.sessions, id)
needsCleanup = true
}
}
return needsCleanup
}
// handleObserverSessionCleanup removes inactive observer sessions with closed RPC channels
// Returns true if any observer session was removed
func (sm *SessionManager) handleObserverSessionCleanup(now time.Time) bool {
observerTimeout := defaultObserverSessionTimeout
if currentSessionSettings != nil && currentSessionSettings.ObserverTimeout > 0 {
observerTimeout = time.Duration(currentSessionSettings.ObserverTimeout) * time.Second
}
needsCleanup := false
for id, session := range sm.sessions {
if session.Mode == SessionModeObserver {
if session.RPCChannel == nil && now.Sub(session.LastActive) > observerTimeout {
sm.logger.Info().
Str("sessionId", id).
Dur("inactiveFor", now.Sub(session.LastActive)).
Dur("observerTimeout", observerTimeout).
Msg("Removing inactive observer session with closed RPC channel")
delete(sm.sessions, id)
needsCleanup = true
}
}
}
return needsCleanup
}
// handlePrimarySessionTimeout checks and handles primary session timeout
// Returns true if primary session was timed out and cleanup is needed
func (sm *SessionManager) handlePrimarySessionTimeout(now time.Time) bool {
if sm.primarySessionID == "" {
return false
}
primary, exists := sm.sessions[sm.primarySessionID]
if !exists {
sm.primarySessionID = ""
return true
}
currentTimeout := sm.getCurrentPrimaryTimeout()
if now.Sub(primary.LastActive) <= currentTimeout {
return false
}
// Timeout detected - demote primary
timedOutSessionID := primary.ID
primary.Mode = SessionModeObserver
sm.primarySessionID = ""
ctx := emergencyPromotionContext{
triggerSessionID: timedOutSessionID,
triggerReason: "timeout",
now: now,
}
promotedSessionID, isEmergency, shouldSkip := sm.attemptEmergencyPromotion(ctx, timedOutSessionID)
if shouldSkip {
return false
}
if promotedSessionID != "" {
reason := "timeout_promotion"
if isEmergency {
reason = "emergency_timeout_promotion"
sm.lastEmergencyPromotion = now
sm.consecutiveEmergencyPromotions++
sm.logger.Warn().
Str("timedOutSessionID", timedOutSessionID).
Str("promotedSessionID", promotedSessionID).
Bool("requireApproval", true).
Int("trustScore", sm.getSessionTrustScore(promotedSessionID)).
Msg("EMERGENCY: Timeout promotion bypassing approval requirement")
}
err := sm.transferPrimaryRole(timedOutSessionID, promotedSessionID, reason, "primary session timeout")
if err == nil {
logEvent := sm.logger.Info()
if isEmergency {
logEvent = sm.logger.Warn()
}
logEvent.
Str("timedOutSessionID", timedOutSessionID).
Str("promotedSessionID", promotedSessionID).
Bool("isEmergencyPromotion", isEmergency).
Msg("Auto-promoted session after primary timeout")
return true
}
}
return false
}

View File

@ -84,10 +84,6 @@ type TransferBlacklistEntry struct {
var (
lastBroadcast time.Time
broadcastMutex sync.Mutex
// Pre-allocated event maps to reduce allocations
modePrimaryEvent = map[string]string{"mode": "primary"}
modeObserverEvent = map[string]string{"mode": "observer"}
)
type SessionManager struct {
@ -95,6 +91,7 @@ type SessionManager struct {
primaryTimeout time.Duration // 8 bytes
logger *zerolog.Logger // 8 bytes
sessions map[string]*Session // 8 bytes
nicknameIndex map[string]*Session // 8 bytes - O(1) nickname uniqueness lookups
reconnectGrace map[string]time.Time // 8 bytes
reconnectInfo map[string]*SessionData // 8 bytes
transferBlacklist []TransferBlacklistEntry // Prevent demoted sessions from immediate re-promotion
@ -136,6 +133,7 @@ func NewSessionManager(logger *zerolog.Logger) *SessionManager {
sm := &SessionManager{
sessions: make(map[string]*Session),
nicknameIndex: make(map[string]*Session),
reconnectGrace: make(map[string]time.Time),
reconnectInfo: make(map[string]*SessionData),
transferBlacklist: make([]TransferBlacklistEntry, 0),
@ -177,10 +175,10 @@ func (sm *SessionManager) AddSession(session *Session, clientSettings *SessionSe
sm.mu.Lock()
defer sm.mu.Unlock()
// Check nickname uniqueness (only for non-empty nicknames)
// Check nickname uniqueness using O(1) index (only for non-empty nicknames)
if session.Nickname != "" {
for id, existingSession := range sm.sessions {
if id != session.ID && existingSession.Nickname == session.Nickname {
if existingSession, exists := sm.nicknameIndex[session.Nickname]; exists {
if existingSession.ID != session.ID {
return fmt.Errorf("nickname '%s' is already in use by another session", session.Nickname)
}
}
@ -282,12 +280,14 @@ func (sm *SessionManager) AddSession(session *Session, clientSettings *SessionSe
}
isBlacklisted := sm.isSessionBlacklisted(session.ID)
isOnlySession := len(sm.sessions) == 0
// Determine if this session should become primary
// If there's no primary AND this is the ONLY session, ALWAYS promote regardless of blacklist
isOnlySession := len(sm.sessions) == 0
shouldBecomePrimary := (wasWithinGracePeriod && wasPreviouslyPrimary && !primaryExists && !hasActivePrimaryGracePeriod) ||
(!wasWithinGracePeriod && !hasActivePrimaryGracePeriod && !primaryExists && (!isBlacklisted || isOnlySession))
canBecomePrimary := !primaryExists && !hasActivePrimaryGracePeriod
isReconnectingPrimary := wasWithinGracePeriod && wasPreviouslyPrimary
isNewEligibleSession := !wasWithinGracePeriod && (!isBlacklisted || isOnlySession)
shouldBecomePrimary := canBecomePrimary && (isReconnectingPrimary || isNewEligibleSession)
if shouldBecomePrimary {
if sm.primarySessionID == "" || sm.sessions[sm.primarySessionID] == nil {
@ -351,6 +351,11 @@ func (sm *SessionManager) AddSession(session *Session, clientSettings *SessionSe
// Ensure session has auto-generated nickname if needed
sm.ensureNickname(session)
// Add to nickname index
if session.Nickname != "" {
sm.nicknameIndex[session.Nickname] = session
}
sm.validateSinglePrimary()
// Clean up grace period after validation completes
@ -406,18 +411,20 @@ func (sm *SessionManager) RemoveSession(sessionID string) {
// Only add grace period if this is NOT an intentional logout
if !isIntentionalLogout {
// Limit grace period entries to prevent memory exhaustion
// Evict the entry that will expire soonest (oldest expiration time)
for len(sm.reconnectGrace) >= maxGracePeriodEntries {
var oldestID string
var oldestTime time.Time
var evictID string
var earliestExpiration time.Time
for id, graceTime := range sm.reconnectGrace {
if oldestTime.IsZero() || graceTime.Before(oldestTime) {
oldestID = id
oldestTime = graceTime
// Find the grace period that expires first (earliest time)
if earliestExpiration.IsZero() || graceTime.Before(earliestExpiration) {
evictID = id
earliestExpiration = graceTime
}
}
if oldestID != "" {
delete(sm.reconnectGrace, oldestID)
delete(sm.reconnectInfo, oldestID)
if evictID != "" {
delete(sm.reconnectGrace, evictID)
delete(sm.reconnectInfo, evictID)
} else {
break
}
@ -531,20 +538,26 @@ func (sm *SessionManager) ClearGracePeriod(sessionID string) {
// isSessionBlacklisted checks if a session was recently demoted via transfer and should not become primary
func (sm *SessionManager) isSessionBlacklisted(sessionID string) bool {
now := time.Now()
isBlacklisted := false
// Clean expired entries while we're here
validEntries := make([]TransferBlacklistEntry, 0, len(sm.transferBlacklist))
for _, entry := range sm.transferBlacklist {
// Clean expired entries in-place (zero allocations)
writeIndex := 0
for readIndex := 0; readIndex < len(sm.transferBlacklist); readIndex++ {
entry := sm.transferBlacklist[readIndex]
if now.Before(entry.ExpiresAt) {
validEntries = append(validEntries, entry)
// Keep this entry - still valid
sm.transferBlacklist[writeIndex] = entry
writeIndex++
if entry.SessionID == sessionID {
return true // Found active blacklist entry
isBlacklisted = true
}
}
// Expired entries are automatically skipped (not copied forward)
}
sm.transferBlacklist = validEntries // Update with only non-expired entries
// Truncate to only valid entries
sm.transferBlacklist = sm.transferBlacklist[:writeIndex]
return false
return isBlacklisted
}
// GetPrimarySession returns the current primary session
@ -655,7 +668,7 @@ func (sm *SessionManager) RequestPrimary(sessionID string) error {
err := sm.transferPrimaryRole("", sessionID, "initial_promotion", "first session auto-promotion")
if err == nil {
// Send mode change event after promoting
writeJSONRPCEvent("modeChanged", modePrimaryEvent, session)
writeJSONRPCEvent("modeChanged", map[string]string{"mode": "primary"}, session)
go sm.broadcastSessionListUpdate()
}
return err
@ -742,7 +755,7 @@ func (sm *SessionManager) ReleasePrimary(sessionID string) error {
// Send mode change event for promoted session
go func() {
if promotedSession := sessionManager.GetSession(promotedSessionID); promotedSession != nil {
writeJSONRPCEvent("modeChanged", modePrimaryEvent, promotedSession)
writeJSONRPCEvent("modeChanged", map[string]string{"mode": "primary"}, promotedSession)
}
}()
} else {
@ -784,13 +797,13 @@ func (sm *SessionManager) TransferPrimary(fromID, toID string) error {
// Send events in goroutines to avoid holding lock
go func() {
if fromSession := sessionManager.GetSession(fromID); fromSession != nil {
writeJSONRPCEvent("modeChanged", modeObserverEvent, fromSession)
writeJSONRPCEvent("modeChanged", map[string]string{"mode": "observer"}, fromSession)
}
}()
go func() {
if toSession := sessionManager.GetSession(toID); toSession != nil {
writeJSONRPCEvent("modeChanged", modePrimaryEvent, toSession)
writeJSONRPCEvent("modeChanged", map[string]string{"mode": "primary"}, toSession)
}
sm.broadcastSessionListUpdate()
}()
@ -848,13 +861,13 @@ func (sm *SessionManager) ApprovePrimaryRequest(currentPrimaryID, requesterID st
// Send events after releasing lock to avoid deadlock
go func() {
if demotedSession := sessionManager.GetSession(currentPrimaryID); demotedSession != nil {
writeJSONRPCEvent("modeChanged", modeObserverEvent, demotedSession)
writeJSONRPCEvent("modeChanged", map[string]string{"mode": "observer"}, demotedSession)
}
}()
go func() {
if promotedSession := sessionManager.GetSession(requesterID); promotedSession != nil {
writeJSONRPCEvent("modeChanged", modePrimaryEvent, promotedSession)
writeJSONRPCEvent("modeChanged", map[string]string{"mode": "primary"}, promotedSession)
}
sm.broadcastSessionListUpdate()
}()
@ -1519,10 +1532,17 @@ func (sm *SessionManager) broadcastSessionListUpdate() {
// Now send events without holding lock
for _, session := range activeSessions {
// Per-session throttling to prevent broadcast storms
if time.Since(session.LastBroadcast) < sessionBroadcastDelay {
session.lastBroadcastMu.Lock()
shouldSkip := time.Since(session.LastBroadcast) < sessionBroadcastDelay
if !shouldSkip {
session.LastBroadcast = time.Now()
}
session.lastBroadcastMu.Unlock()
if shouldSkip {
continue
}
session.LastBroadcast = time.Now()
event := SessionsUpdateEvent{
Sessions: infos,
YourMode: session.Mode,
@ -1547,10 +1567,10 @@ func (sm *SessionManager) Shutdown() {
}
func (sm *SessionManager) cleanupInactiveSessions(ctx context.Context) {
ticker := time.NewTicker(1 * time.Second) // Check every second for grace periods
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
validationCounter := 0 // Counter for periodic validateSinglePrimary calls
validationCounter := 0
for {
select {
@ -1561,313 +1581,33 @@ func (sm *SessionManager) cleanupInactiveSessions(ctx context.Context) {
now := time.Now()
needsBroadcast := false
// Check for expired grace periods and promote if needed
gracePeriodExpired := false
for sessionID, graceTime := range sm.reconnectGrace {
if now.After(graceTime) {
delete(sm.reconnectGrace, sessionID)
gracePeriodExpired = true
wasHoldingPrimarySlot := (sm.lastPrimaryID == sessionID)
// Check if this expired session was the primary holding the slot
if wasHoldingPrimarySlot {
// The primary didn't reconnect in time, now we can clear the slot and promote
sm.primarySessionID = ""
sm.lastPrimaryID = ""
needsBroadcast = true
sm.logger.Info().
Str("expiredSessionID", sessionID).
Msg("Primary session grace period expired - slot now available")
// Always try to promote when possible - approval is only for new pending sessions
// Use enhanced emergency promotion system for better security
isEmergencyPromotion := false
var promotedSessionID string
// === EMERGENCY PROMOTION ALGORITHM ===
//
// When RequireApproval is enabled, we face a potential deadlock scenario:
// - Primary session disconnects (grace period expires)
// - All other sessions are pending (waiting for approval from primary)
// - No primary exists to approve pending sessions
// - Result: System is stuck with no primary and no way to get one
//
// Solution: Emergency promotion bypasses approval requirement to select
// the most trustworthy pending/observer session as primary. This ensures
// the system ALWAYS has a primary session for KVM functionality.
//
// Security measures to prevent abuse:
// 1. Rate limiting: Max 1 emergency promotion per 30 seconds
// 2. Consecutive limit: Max 3 consecutive emergency promotions
// 3. Trust-based selection: Sessions scored on age, history, nickname
// 4. Audit logging: All emergency promotions logged at WARN level
//
// Trust scoring criteria (see getSessionTrustScore):
// - Session age: +1 point per minute (capped at 100)
// - Was previous primary: +50 points
// - Observer mode: +20 points (more trustworthy than queued/pending)
// - Queued mode: +10 points
// - Has required nickname: +15 points / missing: -30 points
//
// This algorithm prioritizes long-lived, previously-primary sessions
// with proper nicknames over newly-connected anonymous sessions.
//
// Check if this is an emergency scenario (RequireApproval enabled)
if currentSessionSettings != nil && currentSessionSettings.RequireApproval {
isEmergencyPromotion = true
// CRITICAL: Ensure we ALWAYS have a primary session
// If there's NO primary, bypass rate limits entirely
hasPrimary := sm.primarySessionID != ""
if !hasPrimary {
sm.logger.Error().
Str("expiredSessionID", sessionID).
Msg("CRITICAL: No primary session exists - bypassing all rate limits")
} else {
// Rate limiting for emergency promotions (only when we have a primary)
if now.Sub(sm.lastEmergencyPromotion) < 30*time.Second {
sm.logger.Warn().
Str("expiredSessionID", sessionID).
Dur("timeSinceLastEmergency", now.Sub(sm.lastEmergencyPromotion)).
Msg("Emergency promotion rate limit exceeded - potential attack")
continue // Skip this grace period expiration
}
// Limit consecutive emergency promotions
if sm.consecutiveEmergencyPromotions >= 3 {
sm.logger.Error().
Str("expiredSessionID", sessionID).
Int("consecutiveCount", sm.consecutiveEmergencyPromotions).
Msg("Too many consecutive emergency promotions - blocking for security")
continue // Skip this grace period expiration
}
}
promotedSessionID = sm.findMostTrustedSessionForEmergency()
} else {
// Normal promotion - reset consecutive counter
sm.consecutiveEmergencyPromotions = 0
promotedSessionID = sm.findNextSessionToPromote()
}
if promotedSessionID != "" {
// Determine reason and log appropriately
reason := "grace_expiration_promotion"
if isEmergencyPromotion {
reason = "emergency_promotion_deadlock_prevention"
sm.lastEmergencyPromotion = now
sm.consecutiveEmergencyPromotions++
// Enhanced logging for emergency promotions
sm.logger.Warn().
Str("expiredSessionID", sessionID).
Str("promotedSessionID", promotedSessionID).
Bool("requireApproval", true).
Int("consecutiveEmergencyPromotions", sm.consecutiveEmergencyPromotions).
Int("trustScore", sm.getSessionTrustScore(promotedSessionID)).
Msg("EMERGENCY: Bypassing approval requirement to prevent deadlock")
}
err := sm.transferPrimaryRole("", promotedSessionID, reason, "primary grace period expired")
if err == nil {
logEvent := sm.logger.Info()
if isEmergencyPromotion {
logEvent = sm.logger.Warn()
}
logEvent.
Str("expiredSessionID", sessionID).
Str("promotedSessionID", promotedSessionID).
Str("reason", reason).
Bool("isEmergencyPromotion", isEmergencyPromotion).
Msg("Auto-promoted session after primary grace period expiration")
} else {
sm.logger.Error().
Err(err).
Str("expiredSessionID", sessionID).
Str("promotedSessionID", promotedSessionID).
Str("reason", reason).
Bool("isEmergencyPromotion", isEmergencyPromotion).
Msg("Failed to promote session after grace period expiration")
}
} else {
logLevel := sm.logger.Info()
if isEmergencyPromotion {
logLevel = sm.logger.Error() // Emergency with no eligible sessions is critical
}
logLevel.
Str("expiredSessionID", sessionID).
Bool("isEmergencyPromotion", isEmergencyPromotion).
Msg("Primary grace period expired but no eligible sessions to promote")
}
} else {
// Non-primary session grace period expired - just cleanup
sm.logger.Debug().
Str("expiredSessionID", sessionID).
Msg("Non-primary session grace period expired")
}
// Also clean up reconnect info for expired sessions
delete(sm.reconnectInfo, sessionID)
}
}
// Clean up pending sessions that have timed out (DoS protection)
for id, session := range sm.sessions {
if session.Mode == SessionModePending &&
now.Sub(session.CreatedAt) > defaultPendingSessionTimeout {
websocketLogger.Info().
Str("sessionId", id).
Dur("age", now.Sub(session.CreatedAt)).
Msg("Removing timed-out pending session")
delete(sm.sessions, id)
// Handle expired grace periods
gracePeriodExpired := sm.handleGracePeriodExpiration(now)
if gracePeriodExpired {
needsBroadcast = true
}
}
// Clean up observer sessions with closed RPC channels (stale connections)
// This prevents accumulation of zombie observer sessions that are no longer connected
observerTimeout := defaultObserverSessionTimeout
if currentSessionSettings != nil && currentSessionSettings.ObserverTimeout > 0 {
observerTimeout = time.Duration(currentSessionSettings.ObserverTimeout) * time.Second
}
for id, session := range sm.sessions {
if session.Mode == SessionModeObserver {
// Check if RPC channel is nil/closed AND session has been inactive
if session.RPCChannel == nil && now.Sub(session.LastActive) > observerTimeout {
sm.logger.Info().
Str("sessionId", id).
Dur("inactiveFor", now.Sub(session.LastActive)).
Dur("observerTimeout", observerTimeout).
Msg("Removing inactive observer session with closed RPC channel")
delete(sm.sessions, id)
// Clean up timed-out pending sessions (DoS protection)
if sm.handlePendingSessionTimeout(now) {
needsBroadcast = true
}
}
}
// Check primary session timeout (every 30 iterations = 30 seconds)
if sm.primarySessionID != "" {
if primary, exists := sm.sessions[sm.primarySessionID]; exists {
currentTimeout := sm.getCurrentPrimaryTimeout()
if now.Sub(primary.LastActive) > currentTimeout {
timedOutSessionID := primary.ID
primary.Mode = SessionModeObserver
sm.primarySessionID = ""
// === TIMEOUT-BASED EMERGENCY PROMOTION ===
//
// Similar to grace period expiration, primary session timeout can create
// a deadlock when RequireApproval is enabled. The timeout detection happens
// every 30 seconds (based on ticker iterations) and demotes inactive primaries.
//
// Without emergency promotion:
// - Primary becomes inactive and times out
// - Primary is demoted to observer
// - All other sessions are pending (awaiting approval)
// - No primary exists to approve them
// - System deadlocked with no KVM control
//
// This uses the same trust-based selection and security measures as
// grace period emergency promotion to ensure system availability.
isEmergencyPromotion := false
var promotedSessionID string
// Check if this requires emergency promotion due to approval requirements
if currentSessionSettings != nil && currentSessionSettings.RequireApproval {
isEmergencyPromotion = true
// CRITICAL: Ensure we ALWAYS have a primary session
// primarySessionID was just cleared above, so this will always be empty
// But check anyway for completeness
hasPrimary := sm.primarySessionID != ""
if !hasPrimary {
sm.logger.Error().
Str("timedOutSessionID", timedOutSessionID).
Msg("CRITICAL: No primary session after timeout - bypassing all rate limits")
} else {
// Rate limiting for emergency promotions (only when we have a primary)
if now.Sub(sm.lastEmergencyPromotion) < 30*time.Second {
sm.logger.Warn().
Str("timedOutSessionID", timedOutSessionID).
Dur("timeSinceLastEmergency", now.Sub(sm.lastEmergencyPromotion)).
Msg("Emergency promotion rate limit exceeded during timeout - potential attack")
continue // Skip this timeout
}
}
// Use trust-based selection but exclude the timed-out session
bestSessionID := ""
bestScore := -1
for id, session := range sm.sessions {
if id != timedOutSessionID &&
!sm.isSessionBlacklisted(id) &&
(session.Mode == SessionModeObserver || session.Mode == SessionModeQueued) {
score := sm.getSessionTrustScore(id)
if score > bestScore {
bestScore = score
bestSessionID = id
}
}
}
promotedSessionID = bestSessionID
} else {
// Normal timeout promotion - find any observer except the timed-out one
for id, session := range sm.sessions {
if id != timedOutSessionID && session.Mode == SessionModeObserver && !sm.isSessionBlacklisted(id) {
promotedSessionID = id
break
}
}
}
// If found a session to promote
if promotedSessionID != "" {
reason := "timeout_promotion"
if isEmergencyPromotion {
reason = "emergency_timeout_promotion"
sm.lastEmergencyPromotion = now
sm.consecutiveEmergencyPromotions++
// Enhanced logging for emergency timeout promotions
sm.logger.Warn().
Str("timedOutSessionID", timedOutSessionID).
Str("promotedSessionID", promotedSessionID).
Bool("requireApproval", true).
Int("trustScore", sm.getSessionTrustScore(promotedSessionID)).
Msg("EMERGENCY: Timeout promotion bypassing approval requirement")
}
err := sm.transferPrimaryRole(timedOutSessionID, promotedSessionID, reason, "primary session timeout")
if err == nil {
needsBroadcast = true
logEvent := sm.logger.Info()
if isEmergencyPromotion {
logEvent = sm.logger.Warn()
}
logEvent.
Str("timedOutSessionID", timedOutSessionID).
Str("promotedSessionID", promotedSessionID).
Bool("isEmergencyPromotion", isEmergencyPromotion).
Msg("Auto-promoted session after primary timeout")
}
}
}
} else {
// Primary session no longer exists, clear it
sm.primarySessionID = ""
// Clean up inactive observer sessions
if sm.handleObserverSessionCleanup(now) {
needsBroadcast = true
}
// Handle primary session timeout
if sm.handlePrimarySessionTimeout(now) {
needsBroadcast = true
}
// Run validation immediately if a grace period expired, otherwise run periodically
// Run validation immediately if grace period expired, otherwise periodically
if gracePeriodExpired {
sm.validateSinglePrimary()
} else {
// Periodic validateSinglePrimary to catch deadlock states
validationCounter++
if validationCounter >= 10 { // Every 10 seconds
if validationCounter >= 10 {
validationCounter = 0
sm.validateSinglePrimary()
}
@ -1875,7 +1615,6 @@ func (sm *SessionManager) cleanupInactiveSessions(ctx context.Context) {
sm.mu.Unlock()
// Broadcast outside of lock if needed
if needsBroadcast {
go sm.broadcastSessionListUpdate()
}

View File

@ -52,7 +52,6 @@ export default function Actionbar({
const response = JSON.parse(event.data);
if (response.id === id && response.result) {
setSessions(response.result);
rpcDataChannel.removeEventListener("message", handler);
}
} catch {
// Ignore parse errors for non-JSON messages
@ -62,10 +61,14 @@ export default function Actionbar({
rpcDataChannel.addEventListener("message", handler);
rpcDataChannel.send(message);
// Clean up after timeout
setTimeout(() => {
const timeoutId = setTimeout(() => {
rpcDataChannel.removeEventListener("message", handler);
}, 5000);
return () => {
clearTimeout(timeoutId);
rpcDataChannel.removeEventListener("message", handler);
};
}
}, [rpcDataChannel, sessions.length, setSessions]);

4
usb.go
View File

@ -89,7 +89,7 @@ func (s *Session) rpcKeypressReport(key byte, press bool) error {
return gadget.KeypressReport(key, press)
}
func (s *Session) rpcAbsMouseReport(x int16, y int16, buttons uint8) error {
func (s *Session) rpcAbsMouseReport(x int, y int, buttons uint8) error {
if s == nil || !s.HasPermission(PermissionMouseInput) {
return ErrPermissionDeniedMouse
}
@ -128,7 +128,7 @@ func rpcKeypressReport(key byte, press bool) error {
return ErrNotPrimarySession
}
func rpcAbsMouseReport(x int16, y int16, buttons uint8) error {
func rpcAbsMouseReport(x int, y int, buttons uint8) error {
if primary := sessionManager.GetPrimarySession(); primary != nil {
return primary.rpcAbsMouseReport(x, y, buttons)
}

View File

@ -44,6 +44,7 @@ type Session struct {
rpcRateLimitMu sync.Mutex // Protects rate limit fields
rpcRateLimit int // Count of RPCs in current window
rpcRateLimitWin time.Time // Start of current rate limit window
lastBroadcastMu sync.Mutex // Protects LastBroadcast field
peerConnection *webrtc.PeerConnection
VideoTrack *webrtc.TrackLocalStaticSample