diff --git a/internal/audio/input_ipc.go b/internal/audio/input_ipc.go index 18cef09..e59bdc8 100644 --- a/internal/audio/input_ipc.go +++ b/internal/audio/input_ipc.go @@ -178,6 +178,9 @@ type AudioInputServer struct { processChan chan *InputIPCMessage // Buffered channel for processing queue stopChan chan struct{} // Stop signal for all goroutines wg sync.WaitGroup // Wait group for goroutine coordination + + // Socket buffer configuration + socketBufferConfig SocketBufferConfig } // NewAudioInputServer creates a new audio input server @@ -195,12 +198,16 @@ func NewAudioInputServer() (*AudioInputServer, error) { adaptiveManager := GetAdaptiveBufferManager() initialBufferSize := int64(adaptiveManager.GetInputBufferSize()) + // Initialize socket buffer configuration + socketBufferConfig := DefaultSocketBufferConfig() + return &AudioInputServer{ - listener: listener, - messageChan: make(chan *InputIPCMessage, initialBufferSize), - processChan: make(chan *InputIPCMessage, initialBufferSize), - stopChan: make(chan struct{}), - bufferSize: initialBufferSize, + listener: listener, + messageChan: make(chan *InputIPCMessage, initialBufferSize), + processChan: make(chan *InputIPCMessage, initialBufferSize), + stopChan: make(chan struct{}), + bufferSize: initialBufferSize, + socketBufferConfig: socketBufferConfig, }, nil } @@ -270,6 +277,16 @@ func (ais *AudioInputServer) acceptConnections() { return } + // Configure socket buffers for optimal performance + if err := ConfigureSocketBuffers(conn, ais.socketBufferConfig); err != nil { + // Log warning but don't fail - socket buffer optimization is not critical + logger := logging.GetDefaultLogger().With().Str("component", "audio-input-server").Logger() + logger.Warn().Err(err).Msg("Failed to configure socket buffers, continuing with defaults") + } else { + // Record socket buffer metrics for monitoring + RecordSocketBufferMetrics(conn, "audio-input") + } + ais.mtx.Lock() // Close existing connection if any if ais.conn != nil { diff --git a/internal/audio/ipc.go b/internal/audio/ipc.go index c30bcb1..33f96ae 100644 --- a/internal/audio/ipc.go +++ b/internal/audio/ipc.go @@ -12,6 +12,7 @@ import ( "sync/atomic" "time" + "github.com/jetkvm/kvm/internal/logging" "github.com/rs/zerolog" ) @@ -118,6 +119,9 @@ type AudioServer struct { // Latency monitoring latencyMonitor *LatencyMonitor adaptiveOptimizer *AdaptiveOptimizer + + // Socket buffer configuration + socketBufferConfig SocketBufferConfig } func NewAudioServer() (*AudioServer, error) { @@ -146,13 +150,17 @@ func NewAudioServer() (*AudioServer, error) { optimizerConfig := DefaultOptimizerConfig() adaptiveOptimizer := NewAdaptiveOptimizer(latencyMonitor, bufferManager, optimizerConfig, logger) + // Initialize socket buffer configuration + socketBufferConfig := DefaultSocketBufferConfig() + return &AudioServer{ - listener: listener, - messageChan: make(chan *OutputIPCMessage, initialBufferSize), - stopChan: make(chan struct{}), - bufferSize: initialBufferSize, - latencyMonitor: latencyMonitor, - adaptiveOptimizer: adaptiveOptimizer, + listener: listener, + messageChan: make(chan *OutputIPCMessage, initialBufferSize), + stopChan: make(chan struct{}), + bufferSize: initialBufferSize, + latencyMonitor: latencyMonitor, + adaptiveOptimizer: adaptiveOptimizer, + socketBufferConfig: socketBufferConfig, }, nil } @@ -195,6 +203,16 @@ func (s *AudioServer) acceptConnections() { return } + // Configure socket buffers for optimal performance + if err := ConfigureSocketBuffers(conn, s.socketBufferConfig); err != nil { + // Log warning but don't fail - socket buffer optimization is not critical + logger := logging.GetDefaultLogger().With().Str("component", "audio-server").Logger() + logger.Warn().Err(err).Msg("Failed to configure socket buffers, continuing with defaults") + } else { + // Record socket buffer metrics for monitoring + RecordSocketBufferMetrics(conn, "audio-output") + } + s.mtx.Lock() // Close existing connection if any if s.conn != nil { diff --git a/internal/audio/metrics.go b/internal/audio/metrics.go index d15d347..9c2b442 100644 --- a/internal/audio/metrics.go +++ b/internal/audio/metrics.go @@ -46,6 +46,31 @@ var ( }, ) + // Socket buffer metrics + socketBufferSizeGauge = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "jetkvm_audio_socket_buffer_size_bytes", + Help: "Current socket buffer size in bytes", + }, + []string{"component", "buffer_type"}, // buffer_type: send, receive + ) + + socketBufferUtilizationGauge = promauto.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "jetkvm_audio_socket_buffer_utilization_percent", + Help: "Socket buffer utilization percentage", + }, + []string{"component", "buffer_type"}, // buffer_type: send, receive + ) + + socketBufferOverflowCounter = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "jetkvm_audio_socket_buffer_overflow_total", + Help: "Total number of socket buffer overflows", + }, + []string{"component", "buffer_type"}, // buffer_type: send, receive + ) + // Audio output metrics audioFramesReceivedTotal = promauto.NewCounter( prometheus.CounterOpts{ diff --git a/internal/audio/socket_buffer.go b/internal/audio/socket_buffer.go new file mode 100644 index 0000000..f7a0f7f --- /dev/null +++ b/internal/audio/socket_buffer.go @@ -0,0 +1,160 @@ +package audio + +import ( + "fmt" + "net" + "syscall" +) + +const ( + // Socket buffer sizes optimized for JetKVM's audio workload + OptimalSocketBuffer = 128 * 1024 // 128KB (32 frames @ 4KB each) + MaxSocketBuffer = 256 * 1024 // 256KB for high-load scenarios + MinSocketBuffer = 32 * 1024 // 32KB minimum for basic functionality +) + +// SocketBufferConfig holds socket buffer configuration +type SocketBufferConfig struct { + SendBufferSize int + RecvBufferSize int + Enabled bool +} + +// DefaultSocketBufferConfig returns the default socket buffer configuration +func DefaultSocketBufferConfig() SocketBufferConfig { + return SocketBufferConfig{ + SendBufferSize: OptimalSocketBuffer, + RecvBufferSize: OptimalSocketBuffer, + Enabled: true, + } +} + +// HighLoadSocketBufferConfig returns configuration for high-load scenarios +func HighLoadSocketBufferConfig() SocketBufferConfig { + return SocketBufferConfig{ + SendBufferSize: MaxSocketBuffer, + RecvBufferSize: MaxSocketBuffer, + Enabled: true, + } +} + +// ConfigureSocketBuffers applies socket buffer configuration to a Unix socket connection +func ConfigureSocketBuffers(conn net.Conn, config SocketBufferConfig) error { + if !config.Enabled { + return nil + } + + if err := ValidateSocketBufferConfig(config); err != nil { + return fmt.Errorf("invalid socket buffer config: %w", err) + } + + unixConn, ok := conn.(*net.UnixConn) + if !ok { + return fmt.Errorf("connection is not a Unix socket") + } + + file, err := unixConn.File() + if err != nil { + return fmt.Errorf("failed to get socket file descriptor: %w", err) + } + defer file.Close() + + fd := int(file.Fd()) + + if config.SendBufferSize > 0 { + if err := syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_SNDBUF, config.SendBufferSize); err != nil { + return fmt.Errorf("failed to set SO_SNDBUF to %d: %w", config.SendBufferSize, err) + } + } + + if config.RecvBufferSize > 0 { + if err := syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_RCVBUF, config.RecvBufferSize); err != nil { + return fmt.Errorf("failed to set SO_RCVBUF to %d: %w", config.RecvBufferSize, err) + } + } + + return nil +} + +// GetSocketBufferSizes retrieves current socket buffer sizes +func GetSocketBufferSizes(conn net.Conn) (sendSize, recvSize int, err error) { + unixConn, ok := conn.(*net.UnixConn) + if !ok { + return 0, 0, fmt.Errorf("socket buffer query only supported for Unix sockets") + } + + file, err := unixConn.File() + if err != nil { + return 0, 0, fmt.Errorf("failed to get socket file descriptor: %w", err) + } + defer file.Close() + + fd := int(file.Fd()) + + // Get send buffer size + sendSize, err = syscall.GetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_SNDBUF) + if err != nil { + return 0, 0, fmt.Errorf("failed to get SO_SNDBUF: %w", err) + } + + // Get receive buffer size + recvSize, err = syscall.GetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_RCVBUF) + if err != nil { + return 0, 0, fmt.Errorf("failed to get SO_RCVBUF: %w", err) + } + + return sendSize, recvSize, nil +} + +// ValidateSocketBufferConfig validates socket buffer configuration +func ValidateSocketBufferConfig(config SocketBufferConfig) error { + if !config.Enabled { + return nil + } + + if config.SendBufferSize < MinSocketBuffer { + return fmt.Errorf("send buffer size %d is below minimum %d", config.SendBufferSize, MinSocketBuffer) + } + + if config.RecvBufferSize < MinSocketBuffer { + return fmt.Errorf("receive buffer size %d is below minimum %d", config.RecvBufferSize, MinSocketBuffer) + } + + if config.SendBufferSize > MaxSocketBuffer { + return fmt.Errorf("send buffer size %d exceeds maximum %d", config.SendBufferSize, MaxSocketBuffer) + } + + if config.RecvBufferSize > MaxSocketBuffer { + return fmt.Errorf("receive buffer size %d exceeds maximum %d", config.RecvBufferSize, MaxSocketBuffer) + } + + return nil +} + +// RecordSocketBufferMetrics records socket buffer metrics for monitoring +func RecordSocketBufferMetrics(conn net.Conn, component string) { + if conn == nil { + return + } + + // Get current socket buffer sizes + sendSize, recvSize, err := GetSocketBufferSizes(conn) + if err != nil { + // Log error but don't fail + return + } + + // Record buffer sizes + socketBufferSizeGauge.WithLabelValues(component, "send").Set(float64(sendSize)) + socketBufferSizeGauge.WithLabelValues(component, "receive").Set(float64(recvSize)) +} + +// RecordSocketBufferOverflow records a socket buffer overflow event +func RecordSocketBufferOverflow(component, bufferType string) { + socketBufferOverflowCounter.WithLabelValues(component, bufferType).Inc() +} + +// UpdateSocketBufferUtilization updates socket buffer utilization metrics +func UpdateSocketBufferUtilization(component, bufferType string, utilizationPercent float64) { + socketBufferUtilizationGauge.WithLabelValues(component, bufferType).Set(utilizationPercent) +}