feat(audio): add audio handlers and refactor session management

- Extract audio-related handlers into separate file for better organization
- Simplify session creation logic by removing duplicate code paths
- Add new Prometheus metrics for connection monitoring
- Reduce websocket ping interval from 30s to 15s for better responsiveness
This commit is contained in:
Alex P 2025-09-05 16:30:09 +00:00
parent d9072673c0
commit 0a38451c95
3 changed files with 219 additions and 382 deletions

136
audio_handlers.go Normal file
View File

@ -0,0 +1,136 @@
package kvm
import (
"context"
"time"
"github.com/coder/websocket"
"github.com/gin-gonic/gin"
"github.com/jetkvm/kvm/internal/audio"
"github.com/rs/zerolog"
)
// handleAudioMute serves POST /audio/mute.
//
// The request body is JSON of the form {"muted": <bool>}. A body that cannot
// be bound is answered with HTTP 400. Otherwise the flag is forwarded to
// audio.SetAudioMuted and audio.SetAudioRelayMuted, an "audio_mute_changed"
// event is broadcast, and the applied state is echoed back with HTTP 200.
func handleAudioMute(c *gin.Context) {
	var body struct {
		Muted bool `json:"muted"`
	}
	if bindErr := c.ShouldBindJSON(&body); bindErr != nil {
		c.JSON(400, gin.H{"error": "invalid request"})
		return
	}

	muted := body.Muted

	// Apply to the audio output and to the relay (the relay setter is the one
	// that matters when running in the main process).
	audio.SetAudioMuted(muted)
	audio.SetAudioRelayMuted(muted)

	// Tell WebSocket subscribers; note the first argument is the inverse of
	// the mute flag.
	audio.GetAudioEventBroadcaster().BroadcastAudioDeviceChanged(!muted, "audio_mute_changed")

	reply := gin.H{
		"status": "audio mute state updated",
		"muted":  muted,
	}
	c.JSON(200, reply)
}
// handleMicrophoneStart serves POST /microphone/start.
//
// Responses:
//   - 400 {"error": "no active session"} when currentSession is nil.
//   - 500 {"error": "audio input manager not available"} when the session has
//     no AudioInputManager.
//   - 500 {"error": <message>} when AudioInputManager.Start returns an error.
//   - 200 {"status": "microphone started", "is_running": <bool>} otherwise,
//     after a "microphone_started" event has been broadcast.
func handleMicrophoneStart(c *gin.Context) {
	// currentSession is a package-level variable that the signalling code
	// reassigns when a new session is accepted. Read it exactly once so the
	// nil checks below and the later calls all operate on the same session
	// instead of re-reading a value that may have changed in between.
	session := currentSession
	if session == nil {
		c.JSON(400, gin.H{"error": "no active session"})
		return
	}

	manager := session.AudioInputManager
	if manager == nil {
		c.JSON(500, gin.H{"error": "audio input manager not available"})
		return
	}

	// NOTE(review): no server-side cooldown is enforced here; if one is
	// needed it has to be added (presumably in the audio package).
	logger.Info().Msg("starting microphone via HTTP request")

	if err := manager.Start(); err != nil {
		// Record the failure server-side; previously it was only visible to
		// the HTTP client.
		logger.Warn().Err(err).Msg("failed to start microphone")
		c.JSON(500, gin.H{"error": err.Error()})
		return
	}

	// Broadcast microphone state change via WebSocket.
	audio.GetAudioEventBroadcaster().BroadcastAudioDeviceChanged(true, "microphone_started")

	c.JSON(200, gin.H{
		"status":     "microphone started",
		"is_running": manager.IsRunning(),
	})
}
// handleMicrophoneMute serves POST /microphone/mute.
//
// It only validates that the body is JSON of the form {"muted": <bool>} and
// echoes the flag back; no backend state is touched. Microphone muting is
// typically handled at the frontend level, and this endpoint is provided for
// consistency. A body that cannot be bound yields HTTP 400.
func handleMicrophoneMute(c *gin.Context) {
	type muteBody struct {
		Muted bool `json:"muted"`
	}

	payload := muteBody{}
	bindErr := c.ShouldBindJSON(&payload)
	if bindErr != nil {
		c.JSON(400, gin.H{"error": "invalid request body"})
		return
	}

	reply := gin.H{
		"status": "mute state updated",
		"muted":  payload.Muted,
	}
	c.JSON(200, reply)
}
// handleMicrophoneReset serves POST /microphone/reset.
//
// It force-stops the current session's AudioInputManager, waits briefly,
// broadcasts a "microphone_reset" event and reports the manager's running
// state.
//
// Responses:
//   - 400 {"error": "no active session"} when currentSession is nil.
//   - 500 {"error": "audio input manager not available"} when the session has
//     no AudioInputManager.
//   - 200 {"status": "microphone reset completed", "is_running": <bool>}
//     otherwise.
func handleMicrophoneReset(c *gin.Context) {
	// currentSession is a package-level variable that the signalling code
	// reassigns when a new session is accepted. This handler sleeps between
	// the nil check and the final IsRunning call, so re-reading the global
	// could observe a different (or missing) session. Snapshot it once and
	// use the same session and manager throughout.
	session := currentSession
	if session == nil {
		c.JSON(400, gin.H{"error": "no active session"})
		return
	}

	manager := session.AudioInputManager
	if manager == nil {
		c.JSON(500, gin.H{"error": "audio input manager not available"})
		return
	}

	// NOTE(review): no server-side cooldown is enforced here; if one is
	// needed it has to be added (presumably in the audio package).
	logger.Info().Msg("forcing microphone state reset")

	// Force stop the AudioInputManager.
	manager.Stop()

	// Wait a bit to ensure everything is stopped before reporting state.
	time.Sleep(100 * time.Millisecond)

	// Broadcast microphone state change via WebSocket.
	audio.GetAudioEventBroadcaster().BroadcastAudioDeviceChanged(false, "microphone_reset")

	c.JSON(200, gin.H{
		"status":     "microphone reset completed",
		"is_running": manager.IsRunning(),
	})
}
// handleSubscribeAudioEvents registers a WebSocket connection with the audio
// event broadcaster.
//
// connectionID, wsCon, runCtx and l are passed through unchanged to the
// broadcaster's Subscribe method; l is also used to log the subscription.
func handleSubscribeAudioEvents(connectionID string, wsCon *websocket.Conn, runCtx context.Context, l *zerolog.Logger) {
	l.Info().Msg("client subscribing to audio events")

	audio.GetAudioEventBroadcaster().Subscribe(connectionID, wsCon, runCtx, l)
}
// handleUnsubscribeAudioEvents removes the connection identified by
// connectionID from the audio event broadcaster, logging the request on l
// first.
func handleUnsubscribeAudioEvents(connectionID string, l *zerolog.Logger) {
	l.Info().Msg("client unsubscribing from audio events")

	audio.GetAudioEventBroadcaster().Unsubscribe(connectionID)
}

100
cloud.go
View File

@ -39,8 +39,7 @@ const (
// should be lower than the websocket response timeout set in cloud-api
CloudOidcRequestTimeout = 10 * time.Second
// WebsocketPingInterval is the interval at which the websocket client sends ping messages to the cloud
// Increased to 30 seconds for constrained environments to reduce overhead
WebsocketPingInterval = 30 * time.Second
WebsocketPingInterval = 15 * time.Second
)
var (
@ -77,6 +76,23 @@ var (
},
[]string{"type", "source"},
)
metricConnectionPingDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "jetkvm_connection_ping_duration_seconds",
Help: "The duration of the ping response",
Buckets: []float64{
0.1, 0.5, 1, 10,
},
},
[]string{"type", "source"},
)
metricConnectionTotalPingSentCount = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "jetkvm_connection_ping_sent_total",
Help: "The total number of pings sent to the connection",
},
[]string{"type", "source"},
)
metricConnectionTotalPingReceivedCount = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "jetkvm_connection_ping_received_total",
@ -84,6 +100,13 @@ var (
},
[]string{"type", "source"},
)
metricConnectionSessionRequestCount = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "jetkvm_connection_session_requests_total",
Help: "The total number of session requests received",
},
[]string{"type", "source"},
)
metricConnectionSessionRequestDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "jetkvm_connection_session_request_duration_seconds",
@ -424,70 +447,35 @@ func handleSessionRequest(
}
}
var session *Session
var err error
var sd string
session, err := newSession(SessionConfig{
ws: c,
IsCloud: isCloudConnection,
LocalIP: req.IP,
ICEServers: req.ICEServers,
Logger: scopedLogger,
})
if err != nil {
_ = wsjson.Write(context.Background(), c, gin.H{"error": err})
return err
}
// Check if we have an existing session
sd, err := session.ExchangeOffer(req.Sd)
if err != nil {
_ = wsjson.Write(context.Background(), c, gin.H{"error": err})
return err
}
if currentSession != nil {
scopedLogger.Info().Msg("existing session detected, creating new session and notifying old session")
// Always create a new session when there's an existing one
// This ensures the "otherSessionConnected" prompt is shown
session, err = newSession(SessionConfig{
ws: c,
IsCloud: isCloudConnection,
LocalIP: req.IP,
ICEServers: req.ICEServers,
Logger: scopedLogger,
})
if err != nil {
_ = wsjson.Write(context.Background(), c, gin.H{"error": err})
return err
}
sd, err = session.ExchangeOffer(req.Sd)
if err != nil {
_ = wsjson.Write(context.Background(), c, gin.H{"error": err})
return err
}
// Notify the old session about the takeover
writeJSONRPCEvent("otherSessionConnected", nil, currentSession)
peerConn := currentSession.peerConnection
go func() {
time.Sleep(1 * time.Second)
_ = peerConn.Close()
}()
currentSession = session
scopedLogger.Info().Interface("session", session).Msg("new session created, old session notified")
} else {
// No existing session, create a new one
scopedLogger.Info().Msg("creating new session")
session, err = newSession(SessionConfig{
ws: c,
IsCloud: isCloudConnection,
LocalIP: req.IP,
ICEServers: req.ICEServers,
Logger: scopedLogger,
})
if err != nil {
_ = wsjson.Write(context.Background(), c, gin.H{"error": err})
return err
}
sd, err = session.ExchangeOffer(req.Sd)
if err != nil {
_ = wsjson.Write(context.Background(), c, gin.H{"error": err})
return err
}
currentSession = session
cloudLogger.Info().Interface("session", session).Msg("new session accepted")
cloudLogger.Trace().Interface("session", session).Msg("new session accepted")
}
cloudLogger.Info().Interface("session", session).Msg("new session accepted")
cloudLogger.Trace().Interface("session", session).Msg("new session accepted")
currentSession = session
_ = wsjson.Write(context.Background(), c, gin.H{"type": "answer", "data": sd})
return nil
}

365
web.go
View File

@ -14,17 +14,15 @@ import (
"strings"
"time"
"github.com/jetkvm/kvm/internal/audio"
"github.com/coder/websocket"
"github.com/coder/websocket/wsjson"
gin_logger "github.com/gin-contrib/logger"
"github.com/gin-gonic/gin"
"github.com/google/uuid"
"github.com/jetkvm/kvm/internal/logging"
"github.com/pion/webrtc/v4"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog"
"golang.org/x/crypto/bcrypt"
)
@ -102,6 +100,9 @@ func setupRouter() *gin.Engine {
// We use this to setup the device in the welcome page
r.POST("/device/setup", handleSetup)
// A Prometheus metrics endpoint.
r.GET("/metrics", gin.WrapH(promhttp.Handler()))
// Developer mode protected routes
developerModeRouter := r.Group("/developer/")
developerModeRouter.Use(basicAuthProtectedMiddleware(true))
@ -153,274 +154,14 @@ func setupRouter() *gin.Engine {
protected.PUT("/auth/password-local", handleUpdatePassword)
protected.DELETE("/auth/local-password", handleDeletePassword)
protected.POST("/storage/upload", handleUploadHttp)
// Audio handlers
protected.POST("/audio/mute", handleAudioMute)
protected.POST("/microphone/start", handleMicrophoneStart)
protected.POST("/microphone/mute", handleMicrophoneMute)
protected.POST("/microphone/reset", handleMicrophoneReset)
}
protected.POST("/audio/mute", func(c *gin.Context) {
type muteReq struct {
Muted bool `json:"muted"`
}
var req muteReq
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(400, gin.H{"error": "invalid request"})
return
}
audio.SetAudioMuted(req.Muted)
// Also set relay mute state if in main process
audio.SetAudioRelayMuted(req.Muted)
// Broadcast audio mute state change via WebSocket
broadcaster := audio.GetAudioEventBroadcaster()
broadcaster.BroadcastAudioMuteChanged(req.Muted)
c.JSON(200, gin.H{"muted": req.Muted})
})
protected.GET("/audio/quality", func(c *gin.Context) {
config := audio.GetAudioConfig()
presets := audio.GetAudioQualityPresets()
c.JSON(200, gin.H{
"current": config,
"presets": presets,
})
})
protected.POST("/audio/quality", func(c *gin.Context) {
type qualityReq struct {
Quality int `json:"quality"`
}
var req qualityReq
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(400, gin.H{"error": "invalid request"})
return
}
// Validate quality level
if req.Quality < 0 || req.Quality > 3 {
c.JSON(400, gin.H{"error": "invalid quality level (0-3)"})
return
}
audio.SetAudioQuality(audio.AudioQuality(req.Quality))
c.JSON(200, gin.H{
"quality": req.Quality,
"config": audio.GetAudioConfig(),
})
})
protected.GET("/microphone/quality", func(c *gin.Context) {
config := audio.GetMicrophoneConfig()
presets := audio.GetMicrophoneQualityPresets()
c.JSON(200, gin.H{
"current": config,
"presets": presets,
})
})
protected.POST("/microphone/quality", func(c *gin.Context) {
type qualityReq struct {
Quality int `json:"quality"`
}
var req qualityReq
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(400, gin.H{"error": "invalid request"})
return
}
// Validate quality level
if req.Quality < 0 || req.Quality > 3 {
c.JSON(400, gin.H{"error": "invalid quality level (0-3)"})
return
}
audio.SetMicrophoneQuality(audio.AudioQuality(req.Quality))
c.JSON(200, gin.H{
"quality": req.Quality,
"config": audio.GetMicrophoneConfig(),
})
})
// Microphone API endpoints
protected.GET("/microphone/status", func(c *gin.Context) {
sessionActive := currentSession != nil
var running bool
if sessionActive && currentSession.AudioInputManager != nil {
running = currentSession.AudioInputManager.IsRunning()
}
c.JSON(200, gin.H{
"running": running,
"session_active": sessionActive,
})
})
protected.POST("/microphone/start", func(c *gin.Context) {
if currentSession == nil {
c.JSON(400, gin.H{"error": "no active session"})
return
}
if currentSession.AudioInputManager == nil {
c.JSON(500, gin.H{"error": "audio input manager not available"})
return
}
// Optimized server-side cooldown using atomic operations
opResult := audio.TryMicrophoneOperation()
if !opResult.Allowed {
running := currentSession.AudioInputManager.IsRunning()
c.JSON(200, gin.H{
"status": "cooldown",
"running": running,
"cooldown_ms_remaining": opResult.RemainingCooldown.Milliseconds(),
"operation_id": opResult.OperationID,
})
return
}
// Check if already running before attempting to start
if currentSession.AudioInputManager.IsRunning() {
c.JSON(200, gin.H{
"status": "already running",
"running": true,
})
return
}
err := currentSession.AudioInputManager.Start()
if err != nil {
// Log the error for debugging but don't expose internal details
logger.Warn().Err(err).Msg("failed to start microphone")
// Check if it's already running after the failed start attempt
// This handles race conditions where another request started it
if currentSession.AudioInputManager.IsRunning() {
c.JSON(200, gin.H{
"status": "started by concurrent request",
"running": true,
})
return
}
c.JSON(500, gin.H{"error": "failed to start microphone"})
return
}
// Broadcast microphone state change via WebSocket
broadcaster := audio.GetAudioEventBroadcaster()
broadcaster.BroadcastMicrophoneStateChanged(true, true)
c.JSON(200, gin.H{
"status": "started",
"running": currentSession.AudioInputManager.IsRunning(),
})
})
protected.POST("/microphone/stop", func(c *gin.Context) {
if currentSession == nil {
c.JSON(400, gin.H{"error": "no active session"})
return
}
if currentSession.AudioInputManager == nil {
c.JSON(500, gin.H{"error": "audio input manager not available"})
return
}
// Optimized server-side cooldown using atomic operations
opResult := audio.TryMicrophoneOperation()
if !opResult.Allowed {
running := currentSession.AudioInputManager.IsRunning()
c.JSON(200, gin.H{
"status": "cooldown",
"running": running,
"cooldown_ms_remaining": opResult.RemainingCooldown.Milliseconds(),
"operation_id": opResult.OperationID,
})
return
}
// Check if already stopped before attempting to stop
if !currentSession.AudioInputManager.IsRunning() {
c.JSON(200, gin.H{
"status": "already stopped",
"running": false,
})
return
}
currentSession.AudioInputManager.Stop()
// AudioInputManager.Stop() already coordinates a clean stop via IPC audio input system
// so we don't need to call it again here
// Broadcast microphone state change via WebSocket
broadcaster := audio.GetAudioEventBroadcaster()
broadcaster.BroadcastMicrophoneStateChanged(false, true)
c.JSON(200, gin.H{
"status": "stopped",
"running": currentSession.AudioInputManager.IsRunning(),
})
})
protected.POST("/microphone/mute", func(c *gin.Context) {
var req struct {
Muted bool `json:"muted"`
}
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(400, gin.H{"error": "invalid request body"})
return
}
// Note: Microphone muting is typically handled at the frontend level
// This endpoint is provided for consistency but doesn't affect backend processing
c.JSON(200, gin.H{
"status": "mute state updated",
"muted": req.Muted,
})
})
// System memory information endpoint
protected.GET("/system/memory", func(c *gin.Context) {
processMonitor := audio.GetProcessMonitor()
totalMemory := processMonitor.GetTotalMemory()
c.JSON(200, gin.H{
"total_memory_bytes": totalMemory,
"total_memory_mb": totalMemory / (1024 * 1024),
})
})
protected.POST("/microphone/reset", func(c *gin.Context) {
if currentSession == nil {
c.JSON(400, gin.H{"error": "no active session"})
return
}
if currentSession.AudioInputManager == nil {
c.JSON(500, gin.H{"error": "audio input manager not available"})
return
}
logger.Info().Msg("forcing microphone state reset")
// Force stop the AudioInputManager
currentSession.AudioInputManager.Stop()
// Wait a bit to ensure everything is stopped
time.Sleep(100 * time.Millisecond)
// Broadcast microphone state change via WebSocket
broadcaster := audio.GetAudioEventBroadcaster()
broadcaster.BroadcastMicrophoneStateChanged(false, true)
c.JSON(200, gin.H{
"status": "reset",
"audio_input_running": currentSession.AudioInputManager.IsRunning(),
})
})
// Catch-all route for SPA
r.NoRoute(func(c *gin.Context) {
if c.Request.Method == "GET" && c.NegotiateFormat(gin.MIMEHTML) == gin.MIMEHTML {
@ -444,57 +185,26 @@ func handleWebRTCSession(c *gin.Context) {
return
}
var session *Session
var err error
var sd string
session, err := newSession(SessionConfig{})
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err})
return
}
// Check if we have an existing session
sd, err := session.ExchangeOffer(req.Sd)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err})
return
}
if currentSession != nil {
logger.Info().Msg("existing session detected, creating new session and notifying old session")
// Always create a new session when there's an existing one
// This ensures the "otherSessionConnected" prompt is shown
session, err = newSession(SessionConfig{})
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err})
return
}
sd, err = session.ExchangeOffer(req.Sd)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err})
return
}
// Notify the old session about the takeover
writeJSONRPCEvent("otherSessionConnected", nil, currentSession)
peerConn := currentSession.peerConnection
go func() {
time.Sleep(1 * time.Second)
_ = peerConn.Close()
}()
currentSession = session
logger.Info().Interface("session", session).Msg("new session created, old session notified")
} else {
// No existing session, create a new one
logger.Info().Msg("creating new session")
session, err = newSession(SessionConfig{})
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err})
return
}
sd, err = session.ExchangeOffer(req.Sd)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err})
return
}
currentSession = session
logger.Info().Interface("session", session).Msg("new session accepted")
}
currentSession = session
c.JSON(http.StatusOK, gin.H{"sd": sd})
}
@ -563,9 +273,6 @@ func handleWebRTCSignalWsMessages(
if isCloudConnection {
setCloudConnectionState(CloudConnectionStateDisconnected)
}
// Clean up audio event subscription
broadcaster := audio.GetAudioEventBroadcaster()
broadcaster.Unsubscribe(connectionID)
cancelRun()
}()
@ -598,7 +305,11 @@ func handleWebRTCSignalWsMessages(
return
}
// Metrics collection disabled
// set the timer for the ping duration
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
metricConnectionLastPingDuration.WithLabelValues(sourceType, source).Set(v)
metricConnectionPingDuration.WithLabelValues(sourceType, source).Observe(v)
}))
l.Trace().Msg("sending ping frame")
err := wsCon.Ping(runCtx)
@ -609,9 +320,13 @@ func handleWebRTCSignalWsMessages(
return
}
// Metrics collection disabled
// don't use `defer` here because we want to observe the duration of the ping
duration := timer.ObserveDuration()
l.Trace().Msg("received pong frame")
metricConnectionTotalPingSentCount.WithLabelValues(sourceType, source).Inc()
metricConnectionLastPingTimestamp.WithLabelValues(sourceType, source).SetToCurrentTime()
l.Trace().Str("duration", duration.String()).Msg("received pong frame")
}
}()
@ -657,7 +372,8 @@ func handleWebRTCSignalWsMessages(
return err
}
// Metrics collection disabled
metricConnectionTotalPingReceivedCount.WithLabelValues(sourceType, source).Inc()
metricConnectionLastPingReceivedTimestamp.WithLabelValues(sourceType, source).SetToCurrentTime()
continue
}
@ -681,7 +397,8 @@ func handleWebRTCSignalWsMessages(
l.Info().Str("oidcGoogle", req.OidcGoogle).Msg("new session request with OIDC Google")
}
// Metrics collection disabled
metricConnectionSessionRequestCount.WithLabelValues(sourceType, source).Inc()
metricConnectionLastSessionRequestTimestamp.WithLabelValues(sourceType, source).SetToCurrentTime()
err = handleSessionRequest(runCtx, wsCon, req, isCloudConnection, source, &l)
if err != nil {
l.Warn().Str("error", err.Error()).Msg("error starting new session")
@ -714,13 +431,9 @@ func handleWebRTCSignalWsMessages(
l.Warn().Str("error", err.Error()).Msg("failed to add incoming ICE candidate to our peer connection")
}
} else if message.Type == "subscribe-audio-events" {
l.Info().Msg("client subscribing to audio events")
broadcaster := audio.GetAudioEventBroadcaster()
broadcaster.Subscribe(connectionID, wsCon, runCtx, &l)
handleSubscribeAudioEvents(connectionID, wsCon, runCtx, &l)
} else if message.Type == "unsubscribe-audio-events" {
l.Info().Msg("client unsubscribing from audio events")
broadcaster := audio.GetAudioEventBroadcaster()
broadcaster.Unsubscribe(connectionID)
handleUnsubscribeAudioEvents(connectionID, &l)
}
}
}