diff --git a/audio_handlers.go b/audio_handlers.go new file mode 100644 index 00000000..049026f4 --- /dev/null +++ b/audio_handlers.go @@ -0,0 +1,136 @@ +package kvm + +import ( + "context" + "time" + + "github.com/coder/websocket" + "github.com/gin-gonic/gin" + "github.com/jetkvm/kvm/internal/audio" + "github.com/rs/zerolog" +) + +// handleAudioMute handles POST /audio/mute requests +func handleAudioMute(c *gin.Context) { + type muteReq struct { + Muted bool `json:"muted"` + } + var req muteReq + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(400, gin.H{"error": "invalid request"}) + return + } + audio.SetAudioMuted(req.Muted) + // Also set relay mute state if in main process + audio.SetAudioRelayMuted(req.Muted) + + // Broadcast audio mute state change via WebSocket + broadcaster := audio.GetAudioEventBroadcaster() + broadcaster.BroadcastAudioDeviceChanged(!req.Muted, "audio_mute_changed") + + c.JSON(200, gin.H{ + "status": "audio mute state updated", + "muted": req.Muted, + }) +} + +// handleMicrophoneStart handles POST /microphone/start requests +func handleMicrophoneStart(c *gin.Context) { + if currentSession == nil { + c.JSON(400, gin.H{"error": "no active session"}) + return + } + + if currentSession.AudioInputManager == nil { + c.JSON(500, gin.H{"error": "audio input manager not available"}) + return + } + + // Check cooldown using atomic operations + // Note: Cooldown check would be implemented in audio package if needed + + logger.Info().Msg("starting microphone via HTTP request") + + err := currentSession.AudioInputManager.Start() + if err != nil { + c.JSON(500, gin.H{"error": err.Error()}) + return + } + + // Broadcast microphone state change via WebSocket + broadcaster := audio.GetAudioEventBroadcaster() + broadcaster.BroadcastAudioDeviceChanged(true, "microphone_started") + + c.JSON(200, gin.H{ + "status": "microphone started", + "is_running": currentSession.AudioInputManager.IsRunning(), + }) +} + +// handleMicrophoneMute handles POST /microphone/mute requests +func handleMicrophoneMute(c *gin.Context) { + var req struct { + Muted bool `json:"muted"` + } + + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(400, gin.H{"error": "invalid request body"}) + return + } + + // Note: Microphone muting is typically handled at the frontend level + // This endpoint is provided for consistency but doesn't affect backend processing + c.JSON(200, gin.H{ + "status": "mute state updated", + "muted": req.Muted, + }) +} + + + +// handleMicrophoneReset handles POST /microphone/reset requests +func handleMicrophoneReset(c *gin.Context) { + if currentSession == nil { + c.JSON(400, gin.H{"error": "no active session"}) + return + } + + if currentSession.AudioInputManager == nil { + c.JSON(500, gin.H{"error": "audio input manager not available"}) + return + } + + // Check cooldown using atomic operations + // Note: Cooldown check would be implemented in audio package if needed + + logger.Info().Msg("forcing microphone state reset") + + // Force stop the AudioInputManager + currentSession.AudioInputManager.Stop() + + // Wait a bit to ensure everything is stopped + time.Sleep(100 * time.Millisecond) + + // Broadcast microphone state change via WebSocket + broadcaster := audio.GetAudioEventBroadcaster() + broadcaster.BroadcastAudioDeviceChanged(false, "microphone_reset") + + c.JSON(200, gin.H{ + "status": "microphone reset completed", + "is_running": currentSession.AudioInputManager.IsRunning(), + }) +} + +// handleSubscribeAudioEvents handles WebSocket audio event subscription +func handleSubscribeAudioEvents(connectionID string, wsCon *websocket.Conn, runCtx context.Context, l *zerolog.Logger) { + l.Info().Msg("client subscribing to audio events") + broadcaster := audio.GetAudioEventBroadcaster() + broadcaster.Subscribe(connectionID, wsCon, runCtx, l) +} + +// handleUnsubscribeAudioEvents handles WebSocket audio event unsubscription +func handleUnsubscribeAudioEvents(connectionID string, l *zerolog.Logger) { + l.Info().Msg("client unsubscribing from audio events") + broadcaster := audio.GetAudioEventBroadcaster() + broadcaster.Unsubscribe(connectionID) +} diff --git a/cloud.go b/cloud.go index e3e1d5da..cec749e4 100644 --- a/cloud.go +++ b/cloud.go @@ -39,8 +39,7 @@ const ( // should be lower than the websocket response timeout set in cloud-api CloudOidcRequestTimeout = 10 * time.Second // WebsocketPingInterval is the interval at which the websocket client sends ping messages to the cloud - // Increased to 30 seconds for constrained environments to reduce overhead - WebsocketPingInterval = 30 * time.Second + WebsocketPingInterval = 15 * time.Second ) var ( @@ -77,6 +76,23 @@ var ( }, []string{"type", "source"}, ) + metricConnectionPingDuration = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "jetkvm_connection_ping_duration_seconds", + Help: "The duration of the ping response", + Buckets: []float64{ + 0.1, 0.5, 1, 10, + }, + }, + []string{"type", "source"}, + ) + metricConnectionTotalPingSentCount = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "jetkvm_connection_ping_sent_total", + Help: "The total number of pings sent to the connection", + }, + []string{"type", "source"}, + ) metricConnectionTotalPingReceivedCount = promauto.NewCounterVec( prometheus.CounterOpts{ Name: "jetkvm_connection_ping_received_total", @@ -84,6 +100,13 @@ var ( }, []string{"type", "source"}, ) + metricConnectionSessionRequestCount = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "jetkvm_connection_session_requests_total", + Help: "The total number of session requests received", + }, + []string{"type", "source"}, + ) metricConnectionSessionRequestDuration = promauto.NewHistogramVec( prometheus.HistogramOpts{ Name: "jetkvm_connection_session_request_duration_seconds", @@ -424,70 +447,35 @@ func handleSessionRequest( } } - var session *Session - var err error - var sd string + session, err := newSession(SessionConfig{ + ws: c, + IsCloud: isCloudConnection, + LocalIP: req.IP, + ICEServers: req.ICEServers, + Logger: scopedLogger, + }) + if err != nil { + _ = wsjson.Write(context.Background(), c, gin.H{"error": err}) + return err + } - // Check if we have an existing session + sd, err := session.ExchangeOffer(req.Sd) + if err != nil { + _ = wsjson.Write(context.Background(), c, gin.H{"error": err}) + return err + } if currentSession != nil { - scopedLogger.Info().Msg("existing session detected, creating new session and notifying old session") - - // Always create a new session when there's an existing one - // This ensures the "otherSessionConnected" prompt is shown - session, err = newSession(SessionConfig{ - ws: c, - IsCloud: isCloudConnection, - LocalIP: req.IP, - ICEServers: req.ICEServers, - Logger: scopedLogger, - }) - if err != nil { - _ = wsjson.Write(context.Background(), c, gin.H{"error": err}) - return err - } - - sd, err = session.ExchangeOffer(req.Sd) - if err != nil { - _ = wsjson.Write(context.Background(), c, gin.H{"error": err}) - return err - } - - // Notify the old session about the takeover writeJSONRPCEvent("otherSessionConnected", nil, currentSession) peerConn := currentSession.peerConnection go func() { time.Sleep(1 * time.Second) _ = peerConn.Close() }() - - currentSession = session - scopedLogger.Info().Interface("session", session).Msg("new session created, old session notified") - } else { - // No existing session, create a new one - scopedLogger.Info().Msg("creating new session") - session, err = newSession(SessionConfig{ - ws: c, - IsCloud: isCloudConnection, - LocalIP: req.IP, - ICEServers: req.ICEServers, - Logger: scopedLogger, - }) - if err != nil { - _ = wsjson.Write(context.Background(), c, gin.H{"error": err}) - return err - } - - sd, err = session.ExchangeOffer(req.Sd) - if err != nil { - _ = wsjson.Write(context.Background(), c, gin.H{"error": err}) - return err - } - - currentSession = session - cloudLogger.Info().Interface("session", session).Msg("new session accepted") - cloudLogger.Trace().Interface("session", session).Msg("new session accepted") } + cloudLogger.Info().Interface("session", session).Msg("new session accepted") + cloudLogger.Trace().Interface("session", session).Msg("new session accepted") + currentSession = session _ = wsjson.Write(context.Background(), c, gin.H{"type": "answer", "data": sd}) return nil } diff --git a/web.go b/web.go index 10a58fc6..5f0e488c 100644 --- a/web.go +++ b/web.go @@ -14,17 +14,15 @@ import ( "strings" "time" - "github.com/jetkvm/kvm/internal/audio" - "github.com/coder/websocket" "github.com/coder/websocket/wsjson" - gin_logger "github.com/gin-contrib/logger" "github.com/gin-gonic/gin" "github.com/google/uuid" "github.com/jetkvm/kvm/internal/logging" "github.com/pion/webrtc/v4" - + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rs/zerolog" "golang.org/x/crypto/bcrypt" ) @@ -102,6 +100,9 @@ func setupRouter() *gin.Engine { // We use this to setup the device in the welcome page r.POST("/device/setup", handleSetup) + // A Prometheus metrics endpoint. + r.GET("/metrics", gin.WrapH(promhttp.Handler())) + // Developer mode protected routes developerModeRouter := r.Group("/developer/") developerModeRouter.Use(basicAuthProtectedMiddleware(true)) @@ -153,274 +154,14 @@ func setupRouter() *gin.Engine { protected.PUT("/auth/password-local", handleUpdatePassword) protected.DELETE("/auth/local-password", handleDeletePassword) protected.POST("/storage/upload", handleUploadHttp) + + // Audio handlers + protected.POST("/audio/mute", handleAudioMute) + protected.POST("/microphone/start", handleMicrophoneStart) + protected.POST("/microphone/mute", handleMicrophoneMute) + protected.POST("/microphone/reset", handleMicrophoneReset) } - protected.POST("/audio/mute", func(c *gin.Context) { - type muteReq struct { - Muted bool `json:"muted"` - } - var req muteReq - if err := c.ShouldBindJSON(&req); err != nil { - c.JSON(400, gin.H{"error": "invalid request"}) - return - } - audio.SetAudioMuted(req.Muted) - // Also set relay mute state if in main process - audio.SetAudioRelayMuted(req.Muted) - - // Broadcast audio mute state change via WebSocket - broadcaster := audio.GetAudioEventBroadcaster() - broadcaster.BroadcastAudioMuteChanged(req.Muted) - - c.JSON(200, gin.H{"muted": req.Muted}) - }) - - protected.GET("/audio/quality", func(c *gin.Context) { - config := audio.GetAudioConfig() - presets := audio.GetAudioQualityPresets() - c.JSON(200, gin.H{ - "current": config, - "presets": presets, - }) - }) - - protected.POST("/audio/quality", func(c *gin.Context) { - type qualityReq struct { - Quality int `json:"quality"` - } - var req qualityReq - if err := c.ShouldBindJSON(&req); err != nil { - c.JSON(400, gin.H{"error": "invalid request"}) - return - } - - // Validate quality level - if req.Quality < 0 || req.Quality > 3 { - c.JSON(400, gin.H{"error": "invalid quality level (0-3)"}) - return - } - - audio.SetAudioQuality(audio.AudioQuality(req.Quality)) - c.JSON(200, gin.H{ - "quality": req.Quality, - "config": audio.GetAudioConfig(), - }) - }) - - protected.GET("/microphone/quality", func(c *gin.Context) { - config := audio.GetMicrophoneConfig() - presets := audio.GetMicrophoneQualityPresets() - c.JSON(200, gin.H{ - "current": config, - "presets": presets, - }) - }) - - protected.POST("/microphone/quality", func(c *gin.Context) { - type qualityReq struct { - Quality int `json:"quality"` - } - var req qualityReq - if err := c.ShouldBindJSON(&req); err != nil { - c.JSON(400, gin.H{"error": "invalid request"}) - return - } - - // Validate quality level - if req.Quality < 0 || req.Quality > 3 { - c.JSON(400, gin.H{"error": "invalid quality level (0-3)"}) - return - } - - audio.SetMicrophoneQuality(audio.AudioQuality(req.Quality)) - c.JSON(200, gin.H{ - "quality": req.Quality, - "config": audio.GetMicrophoneConfig(), - }) - }) - - // Microphone API endpoints - protected.GET("/microphone/status", func(c *gin.Context) { - sessionActive := currentSession != nil - var running bool - - if sessionActive && currentSession.AudioInputManager != nil { - running = currentSession.AudioInputManager.IsRunning() - } - - c.JSON(200, gin.H{ - "running": running, - "session_active": sessionActive, - }) - }) - - protected.POST("/microphone/start", func(c *gin.Context) { - if currentSession == nil { - c.JSON(400, gin.H{"error": "no active session"}) - return - } - - if currentSession.AudioInputManager == nil { - c.JSON(500, gin.H{"error": "audio input manager not available"}) - return - } - - // Optimized server-side cooldown using atomic operations - opResult := audio.TryMicrophoneOperation() - if !opResult.Allowed { - running := currentSession.AudioInputManager.IsRunning() - c.JSON(200, gin.H{ - "status": "cooldown", - "running": running, - "cooldown_ms_remaining": opResult.RemainingCooldown.Milliseconds(), - "operation_id": opResult.OperationID, - }) - return - } - - // Check if already running before attempting to start - if currentSession.AudioInputManager.IsRunning() { - c.JSON(200, gin.H{ - "status": "already running", - "running": true, - }) - return - } - - err := currentSession.AudioInputManager.Start() - if err != nil { - // Log the error for debugging but don't expose internal details - logger.Warn().Err(err).Msg("failed to start microphone") - - // Check if it's already running after the failed start attempt - // This handles race conditions where another request started it - if currentSession.AudioInputManager.IsRunning() { - c.JSON(200, gin.H{ - "status": "started by concurrent request", - "running": true, - }) - return - } - - c.JSON(500, gin.H{"error": "failed to start microphone"}) - return - } - - // Broadcast microphone state change via WebSocket - broadcaster := audio.GetAudioEventBroadcaster() - broadcaster.BroadcastMicrophoneStateChanged(true, true) - - c.JSON(200, gin.H{ - "status": "started", - "running": currentSession.AudioInputManager.IsRunning(), - }) - }) - - protected.POST("/microphone/stop", func(c *gin.Context) { - if currentSession == nil { - c.JSON(400, gin.H{"error": "no active session"}) - return - } - - if currentSession.AudioInputManager == nil { - c.JSON(500, gin.H{"error": "audio input manager not available"}) - return - } - - // Optimized server-side cooldown using atomic operations - opResult := audio.TryMicrophoneOperation() - if !opResult.Allowed { - running := currentSession.AudioInputManager.IsRunning() - c.JSON(200, gin.H{ - "status": "cooldown", - "running": running, - "cooldown_ms_remaining": opResult.RemainingCooldown.Milliseconds(), - "operation_id": opResult.OperationID, - }) - return - } - - // Check if already stopped before attempting to stop - if !currentSession.AudioInputManager.IsRunning() { - c.JSON(200, gin.H{ - "status": "already stopped", - "running": false, - }) - return - } - - currentSession.AudioInputManager.Stop() - - // AudioInputManager.Stop() already coordinates a clean stop via IPC audio input system - // so we don't need to call it again here - - // Broadcast microphone state change via WebSocket - broadcaster := audio.GetAudioEventBroadcaster() - broadcaster.BroadcastMicrophoneStateChanged(false, true) - - c.JSON(200, gin.H{ - "status": "stopped", - "running": currentSession.AudioInputManager.IsRunning(), - }) - }) - - protected.POST("/microphone/mute", func(c *gin.Context) { - var req struct { - Muted bool `json:"muted"` - } - - if err := c.ShouldBindJSON(&req); err != nil { - c.JSON(400, gin.H{"error": "invalid request body"}) - return - } - - // Note: Microphone muting is typically handled at the frontend level - // This endpoint is provided for consistency but doesn't affect backend processing - c.JSON(200, gin.H{ - "status": "mute state updated", - "muted": req.Muted, - }) - }) - - // System memory information endpoint - protected.GET("/system/memory", func(c *gin.Context) { - processMonitor := audio.GetProcessMonitor() - totalMemory := processMonitor.GetTotalMemory() - c.JSON(200, gin.H{ - "total_memory_bytes": totalMemory, - "total_memory_mb": totalMemory / (1024 * 1024), - }) - }) - - protected.POST("/microphone/reset", func(c *gin.Context) { - if currentSession == nil { - c.JSON(400, gin.H{"error": "no active session"}) - return - } - - if currentSession.AudioInputManager == nil { - c.JSON(500, gin.H{"error": "audio input manager not available"}) - return - } - - logger.Info().Msg("forcing microphone state reset") - - // Force stop the AudioInputManager - currentSession.AudioInputManager.Stop() - - // Wait a bit to ensure everything is stopped - time.Sleep(100 * time.Millisecond) - - // Broadcast microphone state change via WebSocket - broadcaster := audio.GetAudioEventBroadcaster() - broadcaster.BroadcastMicrophoneStateChanged(false, true) - - c.JSON(200, gin.H{ - "status": "reset", - "audio_input_running": currentSession.AudioInputManager.IsRunning(), - }) - }) - // Catch-all route for SPA r.NoRoute(func(c *gin.Context) { if c.Request.Method == "GET" && c.NegotiateFormat(gin.MIMEHTML) == gin.MIMEHTML { @@ -444,57 +185,26 @@ func handleWebRTCSession(c *gin.Context) { return } - var session *Session - var err error - var sd string + session, err := newSession(SessionConfig{}) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err}) + return + } - // Check if we have an existing session + sd, err := session.ExchangeOffer(req.Sd) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err}) + return + } if currentSession != nil { - logger.Info().Msg("existing session detected, creating new session and notifying old session") - - // Always create a new session when there's an existing one - // This ensures the "otherSessionConnected" prompt is shown - session, err = newSession(SessionConfig{}) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": err}) - return - } - - sd, err = session.ExchangeOffer(req.Sd) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": err}) - return - } - - // Notify the old session about the takeover writeJSONRPCEvent("otherSessionConnected", nil, currentSession) peerConn := currentSession.peerConnection go func() { time.Sleep(1 * time.Second) _ = peerConn.Close() }() - - currentSession = session - logger.Info().Interface("session", session).Msg("new session created, old session notified") - } else { - // No existing session, create a new one - logger.Info().Msg("creating new session") - session, err = newSession(SessionConfig{}) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": err}) - return - } - - sd, err = session.ExchangeOffer(req.Sd) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": err}) - return - } - - currentSession = session - logger.Info().Interface("session", session).Msg("new session accepted") } - + currentSession = session c.JSON(http.StatusOK, gin.H{"sd": sd}) } @@ -563,9 +273,6 @@ func handleWebRTCSignalWsMessages( if isCloudConnection { setCloudConnectionState(CloudConnectionStateDisconnected) } - // Clean up audio event subscription - broadcaster := audio.GetAudioEventBroadcaster() - broadcaster.Unsubscribe(connectionID) cancelRun() }() @@ -598,7 +305,11 @@ func handleWebRTCSignalWsMessages( return } - // Metrics collection disabled + // set the timer for the ping duration + timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) { + metricConnectionLastPingDuration.WithLabelValues(sourceType, source).Set(v) + metricConnectionPingDuration.WithLabelValues(sourceType, source).Observe(v) + })) l.Trace().Msg("sending ping frame") err := wsCon.Ping(runCtx) @@ -609,9 +320,13 @@ func handleWebRTCSignalWsMessages( return } - // Metrics collection disabled + // dont use `defer` here because we want to observe the duration of the ping + duration := timer.ObserveDuration() - l.Trace().Msg("received pong frame") + metricConnectionTotalPingSentCount.WithLabelValues(sourceType, source).Inc() + metricConnectionLastPingTimestamp.WithLabelValues(sourceType, source).SetToCurrentTime() + + l.Trace().Str("duration", duration.String()).Msg("received pong frame") } }() @@ -657,7 +372,8 @@ func handleWebRTCSignalWsMessages( return err } - // Metrics collection disabled + metricConnectionTotalPingReceivedCount.WithLabelValues(sourceType, source).Inc() + metricConnectionLastPingReceivedTimestamp.WithLabelValues(sourceType, source).SetToCurrentTime() continue } @@ -681,7 +397,8 @@ func handleWebRTCSignalWsMessages( l.Info().Str("oidcGoogle", req.OidcGoogle).Msg("new session request with OIDC Google") } - // Metrics collection disabled + metricConnectionSessionRequestCount.WithLabelValues(sourceType, source).Inc() + metricConnectionLastSessionRequestTimestamp.WithLabelValues(sourceType, source).SetToCurrentTime() err = handleSessionRequest(runCtx, wsCon, req, isCloudConnection, source, &l) if err != nil { l.Warn().Str("error", err.Error()).Msg("error starting new session") @@ -714,13 +431,9 @@ func handleWebRTCSignalWsMessages( l.Warn().Str("error", err.Error()).Msg("failed to add incoming ICE candidate to our peer connection") } } else if message.Type == "subscribe-audio-events" { - l.Info().Msg("client subscribing to audio events") - broadcaster := audio.GetAudioEventBroadcaster() - broadcaster.Subscribe(connectionID, wsCon, runCtx, &l) + handleSubscribeAudioEvents(connectionID, wsCon, runCtx, &l) } else if message.Type == "unsubscribe-audio-events" { - l.Info().Msg("client unsubscribing from audio events") - broadcaster := audio.GetAudioEventBroadcaster() - broadcaster.Unsubscribe(connectionID) + handleUnsubscribeAudioEvents(connectionID, &l) } } }