diff --git a/internal/audio/atomic_utils.go b/internal/audio/atomic_utils.go deleted file mode 100644 index 0a898191..00000000 --- a/internal/audio/atomic_utils.go +++ /dev/null @@ -1,204 +0,0 @@ -package audio - -import ( - "sync/atomic" - "time" -) - -// AtomicCounter provides thread-safe counter operations -type AtomicCounter struct { - value int64 -} - -// NewAtomicCounter creates a new atomic counter -func NewAtomicCounter() *AtomicCounter { - return &AtomicCounter{} -} - -// Add atomically adds delta to the counter and returns the new value -func (c *AtomicCounter) Add(delta int64) int64 { - return atomic.AddInt64(&c.value, delta) -} - -// Increment atomically increments the counter by 1 -func (c *AtomicCounter) Increment() int64 { - return atomic.AddInt64(&c.value, 1) -} - -// Load atomically loads the counter value -func (c *AtomicCounter) Load() int64 { - return atomic.LoadInt64(&c.value) -} - -// Store atomically stores a new value -func (c *AtomicCounter) Store(value int64) { - atomic.StoreInt64(&c.value, value) -} - -// Reset atomically resets the counter to zero -func (c *AtomicCounter) Reset() { - atomic.StoreInt64(&c.value, 0) -} - -// Swap atomically swaps the value and returns the old value -func (c *AtomicCounter) Swap(new int64) int64 { - return atomic.SwapInt64(&c.value, new) -} - -// FrameMetrics provides common frame tracking metrics -type FrameMetrics struct { - Total *AtomicCounter - Dropped *AtomicCounter - Bytes *AtomicCounter -} - -// NewFrameMetrics creates a new frame metrics tracker -func NewFrameMetrics() *FrameMetrics { - return &FrameMetrics{ - Total: NewAtomicCounter(), - Dropped: NewAtomicCounter(), - Bytes: NewAtomicCounter(), - } -} - -// RecordFrame atomically records a successful frame with its size -func (fm *FrameMetrics) RecordFrame(size int64) { - fm.Total.Increment() - fm.Bytes.Add(size) -} - -// RecordDrop atomically records a dropped frame -func (fm *FrameMetrics) RecordDrop() { - fm.Dropped.Increment() -} - -// GetStats returns current metrics values -func (fm *FrameMetrics) GetStats() (total, dropped, bytes int64) { - return fm.Total.Load(), fm.Dropped.Load(), fm.Bytes.Load() -} - -// Reset resets all metrics to zero -func (fm *FrameMetrics) Reset() { - fm.Total.Reset() - fm.Dropped.Reset() - fm.Bytes.Reset() -} - -// GetDropRate calculates the drop rate as a percentage -func (fm *FrameMetrics) GetDropRate() float64 { - total := fm.Total.Load() - if total == 0 { - return 0.0 - } - dropped := fm.Dropped.Load() - return float64(dropped) / float64(total) * 100.0 -} - -// LatencyTracker provides atomic latency tracking -type LatencyTracker struct { - current *AtomicCounter - min *AtomicCounter - max *AtomicCounter - average *AtomicCounter - samples *AtomicCounter -} - -// NewLatencyTracker creates a new latency tracker -func NewLatencyTracker() *LatencyTracker { - lt := &LatencyTracker{ - current: NewAtomicCounter(), - min: NewAtomicCounter(), - max: NewAtomicCounter(), - average: NewAtomicCounter(), - samples: NewAtomicCounter(), - } - // Initialize min to max value so first measurement sets it properly - lt.min.Store(int64(^uint64(0) >> 1)) // Max int64 - return lt -} - -// RecordLatency atomically records a new latency measurement -func (lt *LatencyTracker) RecordLatency(latency time.Duration) { - latencyNanos := latency.Nanoseconds() - lt.current.Store(latencyNanos) - lt.samples.Increment() - - // Update min - for { - oldMin := lt.min.Load() - if latencyNanos >= oldMin { - break - } - if atomic.CompareAndSwapInt64(<.min.value, oldMin, latencyNanos) { - break - } - } - - // Update max - for { - oldMax := lt.max.Load() - if latencyNanos <= oldMax { - break - } - if atomic.CompareAndSwapInt64(<.max.value, oldMax, latencyNanos) { - break - } - } - - // Update average using exponential moving average - oldAvg := lt.average.Load() - newAvg := (oldAvg*7 + latencyNanos) / 8 // 87.5% weight to old average - lt.average.Store(newAvg) -} - -// GetLatencyStats returns current latency statistics -func (lt *LatencyTracker) GetLatencyStats() (current, min, max, average time.Duration, samples int64) { - return time.Duration(lt.current.Load()), - time.Duration(lt.min.Load()), - time.Duration(lt.max.Load()), - time.Duration(lt.average.Load()), - lt.samples.Load() -} - -// PoolMetrics provides common pool performance metrics -type PoolMetrics struct { - Hits *AtomicCounter - Misses *AtomicCounter -} - -// NewPoolMetrics creates a new pool metrics tracker -func NewPoolMetrics() *PoolMetrics { - return &PoolMetrics{ - Hits: NewAtomicCounter(), - Misses: NewAtomicCounter(), - } -} - -// RecordHit atomically records a pool hit -func (pm *PoolMetrics) RecordHit() { - pm.Hits.Increment() -} - -// RecordMiss atomically records a pool miss -func (pm *PoolMetrics) RecordMiss() { - pm.Misses.Increment() -} - -// GetHitRate calculates the hit rate as a percentage -func (pm *PoolMetrics) GetHitRate() float64 { - hits := pm.Hits.Load() - misses := pm.Misses.Load() - total := hits + misses - if total == 0 { - return 0.0 - } - return float64(hits) / float64(total) * 100.0 -} - -// GetStats returns hit and miss counts -func (pm *PoolMetrics) GetStats() (hits, misses int64, hitRate float64) { - hits = pm.Hits.Load() - misses = pm.Misses.Load() - hitRate = pm.GetHitRate() - return -} diff --git a/internal/audio/cgo_audio.go b/internal/audio/cgo_audio.go index fdc1cb7e..b50ef1d3 100644 --- a/internal/audio/cgo_audio.go +++ b/internal/audio/cgo_audio.go @@ -910,11 +910,8 @@ func (c *AudioConfigCache) GetBufferTooLargeError() error { // Removed duplicate config caching system - using AudioConfigCache instead -func cgoAudioReadEncode(buf []byte) (int, error) { - // Fast path: Use AudioConfigCache to avoid GetConfig() in hot path - cache := GetCachedConfig() - // Only update cache if expired - avoid unnecessary overhead - // Use proper locking to avoid race condition +// updateCacheIfNeeded updates cache only if expired to avoid overhead +func updateCacheIfNeeded(cache *AudioConfigCache) { if cache.initialized.Load() { cache.mutex.RLock() cacheExpired := time.Since(cache.lastUpdate) > cache.cacheExpiry @@ -925,6 +922,11 @@ func cgoAudioReadEncode(buf []byte) (int, error) { } else { cache.Update() } +} + +func cgoAudioReadEncode(buf []byte) (int, error) { + cache := GetCachedConfig() + updateCacheIfNeeded(cache) // Fast validation with cached values - avoid lock with atomic access minRequired := cache.GetMinReadEncodeBuffer() @@ -1073,14 +1075,12 @@ var ( // GetBufferFromPool gets a buffer from the pool with at least the specified capacity func GetBufferFromPool(minCapacity int) []byte { cgoBufferPoolGets.Add(1) - // Use the SizedBufferPool for better memory management return GetOptimalBuffer(minCapacity) } // ReturnBufferToPool returns a buffer to the pool func ReturnBufferToPool(buf []byte) { cgoBufferPoolPuts.Add(1) - // Use the SizedBufferPool for better memory management ReturnOptimalBuffer(buf) } @@ -1151,104 +1151,49 @@ func (b *AudioFrameBatch) Release() { */ // ReadEncodeWithPooledBuffer reads audio data and encodes it using a buffer from the pool -// This reduces memory allocations by reusing buffers func ReadEncodeWithPooledBuffer() ([]byte, int, error) { - // Get cached config cache := GetCachedConfig() - // Only update cache if expired - avoid unnecessary overhead - // Use proper locking to avoid race condition - if cache.initialized.Load() { - cache.mutex.RLock() - cacheExpired := time.Since(cache.lastUpdate) > cache.cacheExpiry - cache.mutex.RUnlock() - if cacheExpired { - cache.Update() - } - } else { - cache.Update() - } + updateCacheIfNeeded(cache) - // Get a buffer from the pool with appropriate capacity bufferSize := cache.GetMinReadEncodeBuffer() if bufferSize == 0 { - bufferSize = 1500 // Fallback if cache not initialized + bufferSize = 1500 } - // Get buffer from pool buf := GetBufferFromPool(bufferSize) - - // Perform read/encode operation n, err := cgoAudioReadEncode(buf) if err != nil { - // Return buffer to pool on error ReturnBufferToPool(buf) return nil, 0, err } - // Resize buffer to actual data size - result := buf[:n] - - // Return the buffer with data - return result, n, nil + return buf[:n], n, nil } // DecodeWriteWithPooledBuffer decodes and writes audio data using a pooled buffer -// The caller is responsible for returning the input buffer to the pool if needed func DecodeWriteWithPooledBuffer(data []byte) (int, error) { - // Validate input if len(data) == 0 { return 0, errEmptyBuffer } - // Get cached config cache := GetCachedConfig() - // Only update cache if expired - avoid unnecessary overhead - // Use proper locking to avoid race condition - if cache.initialized.Load() { - cache.mutex.RLock() - cacheExpired := time.Since(cache.lastUpdate) > cache.cacheExpiry - cache.mutex.RUnlock() - if cacheExpired { - cache.Update() - } - } else { - cache.Update() - } + updateCacheIfNeeded(cache) - // Ensure data doesn't exceed max packet size maxPacketSize := cache.GetMaxPacketSize() if len(data) > maxPacketSize { return 0, newBufferTooLargeError(len(data), maxPacketSize) } - // Get a PCM buffer from the pool for optimized decode-write pcmBuffer := GetBufferFromPool(cache.GetMaxPCMBufferSize()) defer ReturnBufferToPool(pcmBuffer) - // Perform decode/write operation using optimized implementation - n, err := CGOAudioDecodeWrite(data, pcmBuffer) - - // Return result - return n, err + return CGOAudioDecodeWrite(data, pcmBuffer) } // BatchReadEncode reads and encodes multiple audio frames in a single batch -// This reduces CGO call overhead by processing multiple frames at once func BatchReadEncode(batchSize int) ([][]byte, error) { - // Get cached config cache := GetCachedConfig() - // Only update cache if expired - avoid unnecessary overhead - // Use proper locking to avoid race condition - if cache.initialized.Load() { - cache.mutex.RLock() - cacheExpired := time.Since(cache.lastUpdate) > cache.cacheExpiry - cache.mutex.RUnlock() - if cacheExpired { - cache.Update() - } - } else { - cache.Update() - } + updateCacheIfNeeded(cache) // Calculate total buffer size needed for batch frameSize := cache.GetMinReadEncodeBuffer() diff --git a/internal/audio/env_utils.go b/internal/audio/env_utils.go new file mode 100644 index 00000000..8c01d4f1 --- /dev/null +++ b/internal/audio/env_utils.go @@ -0,0 +1,56 @@ +package audio + +import ( + "os" + "strconv" + + "github.com/jetkvm/kvm/internal/logging" +) + +// getEnvInt reads an integer value from environment variable with fallback to default +func getEnvInt(key string, defaultValue int) int { + if value := os.Getenv(key); value != "" { + if intValue, err := strconv.Atoi(value); err == nil { + return intValue + } + } + return defaultValue +} + +// parseOpusConfig reads OPUS configuration from environment variables +// with fallback to default config values +func parseOpusConfig() (bitrate, complexity, vbr, signalType, bandwidth, dtx int) { + // Read configuration from environment variables with config defaults + bitrate = getEnvInt("JETKVM_OPUS_BITRATE", GetConfig().CGOOpusBitrate) + complexity = getEnvInt("JETKVM_OPUS_COMPLEXITY", GetConfig().CGOOpusComplexity) + vbr = getEnvInt("JETKVM_OPUS_VBR", GetConfig().CGOOpusVBR) + signalType = getEnvInt("JETKVM_OPUS_SIGNAL_TYPE", GetConfig().CGOOpusSignalType) + bandwidth = getEnvInt("JETKVM_OPUS_BANDWIDTH", GetConfig().CGOOpusBandwidth) + dtx = getEnvInt("JETKVM_OPUS_DTX", GetConfig().CGOOpusDTX) + + return bitrate, complexity, vbr, signalType, bandwidth, dtx +} + +// applyOpusConfig applies OPUS configuration to the global config +// with optional logging for the specified component +func applyOpusConfig(bitrate, complexity, vbr, signalType, bandwidth, dtx int, component string, enableLogging bool) { + config := GetConfig() + config.CGOOpusBitrate = bitrate + config.CGOOpusComplexity = complexity + config.CGOOpusVBR = vbr + config.CGOOpusSignalType = signalType + config.CGOOpusBandwidth = bandwidth + config.CGOOpusDTX = dtx + + if enableLogging { + logger := logging.GetDefaultLogger().With().Str("component", component).Logger() + logger.Info(). + Int("bitrate", bitrate). + Int("complexity", complexity). + Int("vbr", vbr). + Int("signal_type", signalType). + Int("bandwidth", bandwidth). + Int("dtx", dtx). + Msg("applied OPUS configuration") + } +} diff --git a/internal/audio/input_ipc.go b/internal/audio/input_ipc.go index d07ff7c2..8dc4f197 100644 --- a/internal/audio/input_ipc.go +++ b/internal/audio/input_ipc.go @@ -6,7 +6,6 @@ import ( "io" "net" "os" - "path/filepath" "runtime" "sync" "sync/atomic" @@ -16,67 +15,33 @@ import ( "github.com/rs/zerolog" ) -var ( - inputMagicNumber uint32 = GetConfig().InputMagicNumber // "JKMI" (JetKVM Microphone Input) - inputSocketName = "audio_input.sock" -) - -const ( - headerSize = 17 // Fixed header size: 4+1+4+8 bytes - matches GetConfig().HeaderSize -) - +// Constants are now defined in unified_ipc.go var ( maxFrameSize = GetConfig().MaxFrameSize // Maximum Opus frame size messagePoolSize = GetConfig().MessagePoolSize // Pre-allocated message pool size ) -// InputMessageType represents the type of IPC message -type InputMessageType uint8 +// Legacy aliases for backward compatibility +type InputMessageType = UnifiedMessageType +type InputIPCMessage = UnifiedIPCMessage +// Legacy constants for backward compatibility const ( - InputMessageTypeOpusFrame InputMessageType = iota - InputMessageTypeConfig - InputMessageTypeOpusConfig - InputMessageTypeStop - InputMessageTypeHeartbeat - InputMessageTypeAck + InputMessageTypeOpusFrame = MessageTypeOpusFrame + InputMessageTypeConfig = MessageTypeConfig + InputMessageTypeOpusConfig = MessageTypeOpusConfig + InputMessageTypeStop = MessageTypeStop + InputMessageTypeHeartbeat = MessageTypeHeartbeat + InputMessageTypeAck = MessageTypeAck ) -// InputIPCMessage represents a message sent over IPC -type InputIPCMessage struct { - Magic uint32 - Type InputMessageType - Length uint32 - Timestamp int64 - Data []byte -} - -// Implement IPCMessage interface -func (msg *InputIPCMessage) GetMagic() uint32 { - return msg.Magic -} - -func (msg *InputIPCMessage) GetType() uint8 { - return uint8(msg.Type) -} - -func (msg *InputIPCMessage) GetLength() uint32 { - return msg.Length -} - -func (msg *InputIPCMessage) GetTimestamp() int64 { - return msg.Timestamp -} - -func (msg *InputIPCMessage) GetData() []byte { - return msg.Data -} +// Methods are now inherited from UnifiedIPCMessage // OptimizedIPCMessage represents an optimized message with pre-allocated buffers type OptimizedIPCMessage struct { - header [headerSize]byte // Pre-allocated header buffer - data []byte // Reusable data buffer - msg InputIPCMessage // Embedded message + header [17]byte // Pre-allocated header buffer (headerSize = 17) + data []byte // Reusable data buffer + msg InputIPCMessage // Embedded message } // MessagePool manages a pool of reusable messages to reduce allocations @@ -197,25 +162,9 @@ func (mp *MessagePool) Put(msg *OptimizedIPCMessage) { } } -// InputIPCConfig represents configuration for audio input -type InputIPCConfig struct { - SampleRate int - Channels int - FrameSize int -} - -// InputIPCOpusConfig contains complete Opus encoder configuration -type InputIPCOpusConfig struct { - SampleRate int - Channels int - FrameSize int - Bitrate int - Complexity int - VBR int - SignalType int - Bandwidth int - DTX int -} +// Legacy aliases for backward compatibility +type InputIPCConfig = UnifiedIPCConfig +type InputIPCOpusConfig = UnifiedIPCOpusConfig // AudioInputServer handles IPC communication for audio input processing type AudioInputServer struct { @@ -1305,10 +1254,4 @@ func GetGlobalMessagePoolStats() MessagePoolStats { // Helper functions -// getInputSocketPath returns the path to the input socket -func getInputSocketPath() string { - if path := os.Getenv("JETKVM_AUDIO_INPUT_SOCKET"); path != "" { - return path - } - return filepath.Join("/var/run", inputSocketName) -} +// getInputSocketPath is now defined in unified_ipc.go diff --git a/internal/audio/input_server_main.go b/internal/audio/input_server_main.go index 808c1667..84d64b2e 100644 --- a/internal/audio/input_server_main.go +++ b/internal/audio/input_server_main.go @@ -13,7 +13,6 @@ import ( "context" "os" "os/signal" - "strconv" "syscall" "time" @@ -21,39 +20,6 @@ import ( ) // getEnvInt reads an integer from environment variable with a default value -func getEnvIntInput(key string, defaultValue int) int { - if value := os.Getenv(key); value != "" { - if intValue, err := strconv.Atoi(value); err == nil { - return intValue - } - } - return defaultValue -} - -// parseOpusConfigInput reads OPUS configuration from environment variables -// with fallback to default config values for input server -func parseOpusConfigInput() (bitrate, complexity, vbr, signalType, bandwidth, dtx int) { - // Read configuration from environment variables with config defaults - bitrate = getEnvIntInput("JETKVM_OPUS_BITRATE", GetConfig().CGOOpusBitrate) - complexity = getEnvIntInput("JETKVM_OPUS_COMPLEXITY", GetConfig().CGOOpusComplexity) - vbr = getEnvIntInput("JETKVM_OPUS_VBR", GetConfig().CGOOpusVBR) - signalType = getEnvIntInput("JETKVM_OPUS_SIGNAL_TYPE", GetConfig().CGOOpusSignalType) - bandwidth = getEnvIntInput("JETKVM_OPUS_BANDWIDTH", GetConfig().CGOOpusBandwidth) - dtx = getEnvIntInput("JETKVM_OPUS_DTX", GetConfig().CGOOpusDTX) - - return bitrate, complexity, vbr, signalType, bandwidth, dtx -} - -// applyOpusConfigInput applies OPUS configuration to the global config for input server -func applyOpusConfigInput(bitrate, complexity, vbr, signalType, bandwidth, dtx int) { - config := GetConfig() - config.CGOOpusBitrate = bitrate - config.CGOOpusComplexity = complexity - config.CGOOpusVBR = vbr - config.CGOOpusSignalType = signalType - config.CGOOpusBandwidth = bandwidth - config.CGOOpusDTX = dtx -} // RunAudioInputServer runs the audio input server subprocess // This should be called from main() when the subprocess is detected @@ -62,8 +28,8 @@ func RunAudioInputServer() error { logger.Debug().Msg("audio input server subprocess starting") // Parse OPUS configuration from environment variables - bitrate, complexity, vbr, signalType, bandwidth, dtx := parseOpusConfigInput() - applyOpusConfigInput(bitrate, complexity, vbr, signalType, bandwidth, dtx) + bitrate, complexity, vbr, signalType, bandwidth, dtx := parseOpusConfig() + applyOpusConfig(bitrate, complexity, vbr, signalType, bandwidth, dtx, "audio-input-server", false) // Initialize validation cache for optimal performance InitValidationCache() diff --git a/internal/audio/input_supervisor.go b/internal/audio/input_supervisor.go index 509f4e6f..889b124c 100644 --- a/internal/audio/input_supervisor.go +++ b/internal/audio/input_supervisor.go @@ -48,8 +48,6 @@ func (ais *AudioInputSupervisor) SetOpusConfig(bitrate, complexity, vbr, signalT } } - - // Start starts the audio input server subprocess func (ais *AudioInputSupervisor) Start() error { ais.mutex.Lock() @@ -122,15 +120,11 @@ func (ais *AudioInputSupervisor) Start() error { return nil } - - // Stop stops the audio input server subprocess func (ais *AudioInputSupervisor) Stop() { ais.mutex.Lock() defer ais.mutex.Unlock() - - if !ais.IsRunning() { return } diff --git a/internal/audio/ipc.go b/internal/audio/ipc.go index d172144e..ccef9318 100644 --- a/internal/audio/ipc.go +++ b/internal/audio/ipc.go @@ -4,359 +4,46 @@ import ( "encoding/binary" "fmt" "io" - "net" - "os" - "path/filepath" - "sync" "sync/atomic" - "time" - - "github.com/jetkvm/kvm/internal/logging" - "github.com/rs/zerolog" ) -var ( - outputMagicNumber uint32 = GetConfig().OutputMagicNumber // "JKOU" (JetKVM Output) - outputSocketName = "audio_output.sock" -) - -// Output IPC constants are now centralized in config_constants.go -// outputMaxFrameSize, outputWriteTimeout, outputMaxDroppedFrames, outputHeaderSize, outputMessagePoolSize - -// OutputIPCConfig represents configuration for audio output -type OutputIPCConfig struct { - SampleRate int - Channels int - FrameSize int -} - -// OutputMessageType represents the type of IPC message -type OutputMessageType uint8 +// Legacy aliases for backward compatibility +type OutputIPCConfig = UnifiedIPCConfig +type OutputMessageType = UnifiedMessageType +type OutputIPCMessage = UnifiedIPCMessage +// Legacy constants for backward compatibility const ( - OutputMessageTypeOpusFrame OutputMessageType = iota - OutputMessageTypeConfig - OutputMessageTypeStop - OutputMessageTypeHeartbeat - OutputMessageTypeAck + OutputMessageTypeOpusFrame = MessageTypeOpusFrame + OutputMessageTypeConfig = MessageTypeConfig + OutputMessageTypeStop = MessageTypeStop + OutputMessageTypeHeartbeat = MessageTypeHeartbeat + OutputMessageTypeAck = MessageTypeAck ) -// OutputIPCMessage represents a message sent over IPC -type OutputIPCMessage struct { - Magic uint32 - Type OutputMessageType - Length uint32 - Timestamp int64 - Data []byte -} - -// Implement IPCMessage interface -func (msg *OutputIPCMessage) GetMagic() uint32 { - return msg.Magic -} - -func (msg *OutputIPCMessage) GetType() uint8 { - return uint8(msg.Type) -} - -func (msg *OutputIPCMessage) GetLength() uint32 { - return msg.Length -} - -func (msg *OutputIPCMessage) GetTimestamp() int64 { - return msg.Timestamp -} - -func (msg *OutputIPCMessage) GetData() []byte { - return msg.Data -} +// Methods are now inherited from UnifiedIPCMessage // Global shared message pool for output IPC client header reading var globalOutputClientMessagePool = NewGenericMessagePool(GetConfig().OutputMessagePoolSize) -type AudioOutputServer struct { - // Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) - bufferSize int64 // Current buffer size (atomic) - droppedFrames int64 // Dropped frames counter (atomic) - totalFrames int64 // Total frames counter (atomic) - - listener net.Listener - conn net.Conn - mtx sync.Mutex - running bool - - // Advanced message handling - messageChan chan *OutputIPCMessage // Buffered channel for incoming messages - stopChan chan struct{} // Stop signal - wg sync.WaitGroup // Wait group for goroutine coordination - - // Latency monitoring - latencyMonitor *LatencyMonitor - adaptiveOptimizer *AdaptiveOptimizer - - // Socket buffer configuration - socketBufferConfig SocketBufferConfig -} +// AudioOutputServer is now an alias for UnifiedAudioServer +type AudioOutputServer = UnifiedAudioServer func NewAudioOutputServer() (*AudioOutputServer, error) { - socketPath := getOutputSocketPath() - // Remove existing socket if any - os.Remove(socketPath) - - listener, err := net.Listen("unix", socketPath) - if err != nil { - return nil, fmt.Errorf("failed to create unix socket: %w", err) - } - - // Initialize with adaptive buffer size (start with 500 frames) - initialBufferSize := int64(GetConfig().InitialBufferFrames) - - // Initialize latency monitoring - latencyConfig := DefaultLatencyConfig() - logger := zerolog.New(os.Stderr).With().Timestamp().Str("component", AudioOutputServerComponent).Logger() - latencyMonitor := NewLatencyMonitor(latencyConfig, logger) - - // Initialize adaptive buffer manager with default config - bufferConfig := DefaultAdaptiveBufferConfig() - bufferManager := NewAdaptiveBufferManager(bufferConfig) - - // Initialize adaptive optimizer - optimizerConfig := DefaultOptimizerConfig() - adaptiveOptimizer := NewAdaptiveOptimizer(latencyMonitor, bufferManager, optimizerConfig, logger) - - // Initialize socket buffer configuration - socketBufferConfig := DefaultSocketBufferConfig() - - return &AudioOutputServer{ - listener: listener, - messageChan: make(chan *OutputIPCMessage, initialBufferSize), - stopChan: make(chan struct{}), - bufferSize: initialBufferSize, - latencyMonitor: latencyMonitor, - adaptiveOptimizer: adaptiveOptimizer, - socketBufferConfig: socketBufferConfig, - }, nil + return NewUnifiedAudioServer(false) // false = output server } -func (s *AudioOutputServer) Start() error { - s.mtx.Lock() - defer s.mtx.Unlock() +// Start method is now inherited from UnifiedAudioServer - if s.running { - return fmt.Errorf("server already running") - } +// acceptConnections method is now inherited from UnifiedAudioServer - s.running = true +// startProcessorGoroutine method is now inherited from UnifiedAudioServer - // Start latency monitoring and adaptive optimization - if s.latencyMonitor != nil { - s.latencyMonitor.Start() - } - if s.adaptiveOptimizer != nil { - s.adaptiveOptimizer.Start() - } +// Stop method is now inherited from UnifiedAudioServer - // Start message processor goroutine - s.startProcessorGoroutine() +// Close method is now inherited from UnifiedAudioServer - // Submit the connection acceptor to the audio reader pool - if !SubmitAudioReaderTask(s.acceptConnections) { - // If the pool is full or shutting down, fall back to direct goroutine creation - logger := logging.GetDefaultLogger().With().Str("component", AudioOutputServerComponent).Logger() - logger.Warn().Msg("Audio reader pool full or shutting down, falling back to direct goroutine creation") - - go s.acceptConnections() - } - - return nil -} - -// acceptConnections accepts incoming connections -func (s *AudioOutputServer) acceptConnections() { - logger := logging.GetDefaultLogger().With().Str("component", AudioOutputServerComponent).Logger() - for s.running { - conn, err := s.listener.Accept() - if err != nil { - if s.running { - // Log warning and retry on accept failure - logger.Warn().Err(err).Msg("Failed to accept connection, retrying") - continue - } - return - } - - // Configure socket buffers for optimal performance - if err := ConfigureSocketBuffers(conn, s.socketBufferConfig); err != nil { - // Log warning but don't fail - socket buffer optimization is not critical - logger.Warn().Err(err).Msg("Failed to configure socket buffers, continuing with defaults") - } else { - // Record socket buffer metrics for monitoring - RecordSocketBufferMetrics(conn, "audio-output") - } - - s.mtx.Lock() - // Close existing connection if any - if s.conn != nil { - s.conn.Close() - s.conn = nil - } - s.conn = conn - s.mtx.Unlock() - } -} - -// startProcessorGoroutine starts the message processor using the goroutine pool -func (s *AudioOutputServer) startProcessorGoroutine() { - s.wg.Add(1) - - // Create a processor task that will run in the goroutine pool - processorTask := func() { - defer s.wg.Done() - for { - select { - case msg := <-s.messageChan: - // Process message (currently just frame sending) - if msg.Type == OutputMessageTypeOpusFrame { - if err := s.sendFrameToClient(msg.Data); err != nil { - // Log error but continue processing - atomic.AddInt64(&s.droppedFrames, 1) - } - } - case <-s.stopChan: - return - } - } - } - - // Submit the processor task to the audio processor pool - if !SubmitAudioProcessorTask(processorTask) { - // If the pool is full or shutting down, fall back to direct goroutine creation - logger := logging.GetDefaultLogger().With().Str("component", AudioOutputServerComponent).Logger() - logger.Warn().Msg("Audio processor pool full or shutting down, falling back to direct goroutine creation") - - go processorTask() - } -} - -func (s *AudioOutputServer) Stop() { - s.mtx.Lock() - defer s.mtx.Unlock() - - if !s.running { - return - } - - s.running = false - - // Stop latency monitoring and adaptive optimization - if s.adaptiveOptimizer != nil { - s.adaptiveOptimizer.Stop() - } - if s.latencyMonitor != nil { - s.latencyMonitor.Stop() - } - - // Signal processor to stop - close(s.stopChan) - s.wg.Wait() - - if s.conn != nil { - s.conn.Close() - s.conn = nil - } -} - -func (s *AudioOutputServer) Close() error { - s.Stop() - if s.listener != nil { - s.listener.Close() - } - // Remove socket file - os.Remove(getOutputSocketPath()) - return nil -} - -func (s *AudioOutputServer) SendFrame(frame []byte) error { - // Use ultra-fast validation for critical audio path - if err := ValidateAudioFrame(frame); err != nil { - logger := logging.GetDefaultLogger().With().Str("component", AudioOutputServerComponent).Logger() - logger.Error().Err(err).Msg("Frame validation failed") - return fmt.Errorf("output frame validation failed: %w", err) - } - - // Additional output-specific size check - maxFrameSize := GetConfig().OutputMaxFrameSize - if len(frame) > maxFrameSize { - return fmt.Errorf("output frame size validation failed: got %d bytes, maximum allowed %d bytes", len(frame), maxFrameSize) - } - - start := time.Now() - - // Create IPC message - msg := &OutputIPCMessage{ - Magic: outputMagicNumber, - Type: OutputMessageTypeOpusFrame, - Length: uint32(len(frame)), - Timestamp: start.UnixNano(), - Data: frame, - } - - // Try to send via message channel (non-blocking) - select { - case s.messageChan <- msg: - atomic.AddInt64(&s.totalFrames, 1) - - // Record latency for monitoring - if s.latencyMonitor != nil { - processingTime := time.Since(start) - s.latencyMonitor.RecordLatency(processingTime, "ipc_send") - } - - return nil - default: - // Channel full, drop frame to prevent blocking - atomic.AddInt64(&s.droppedFrames, 1) - return fmt.Errorf("output message channel full (capacity: %d) - frame dropped to prevent blocking", cap(s.messageChan)) - } -} - -// sendFrameToClient sends frame data directly to the connected client -// Global shared message pool for output IPC server -var globalOutputServerMessagePool = NewGenericMessagePool(GetConfig().OutputMessagePoolSize) - -func (s *AudioOutputServer) sendFrameToClient(frame []byte) error { - s.mtx.Lock() - defer s.mtx.Unlock() - - if s.conn == nil { - return fmt.Errorf("no audio output client connected to server") - } - - start := time.Now() - - // Create output IPC message - msg := &OutputIPCMessage{ - Magic: outputMagicNumber, - Type: OutputMessageTypeOpusFrame, - Length: uint32(len(frame)), - Timestamp: start.UnixNano(), - Data: frame, - } - - // Use shared WriteIPCMessage function - err := WriteIPCMessage(s.conn, msg, globalOutputServerMessagePool, &s.droppedFrames) - if err != nil { - return err - } - - // Record latency for monitoring - if s.latencyMonitor != nil { - writeLatency := time.Since(start) - s.latencyMonitor.RecordLatency(writeLatency, "ipc_write") - } - - return nil -} +// SendFrame method is now inherited from UnifiedAudioServer // GetServerStats returns server performance statistics func (s *AudioOutputServer) GetServerStats() (total, dropped int64, bufferSize int64) { @@ -364,82 +51,20 @@ func (s *AudioOutputServer) GetServerStats() (total, dropped int64, bufferSize i return stats.Total, stats.Dropped, atomic.LoadInt64(&s.bufferSize) } -type AudioOutputClient struct { - // Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) - droppedFrames int64 // Atomic counter for dropped frames - totalFrames int64 // Atomic counter for total frames - - conn net.Conn - mtx sync.Mutex - running bool - bufferPool *AudioBufferPool // Buffer pool for memory optimization -} +// AudioOutputClient is now an alias for UnifiedAudioClient +type AudioOutputClient = UnifiedAudioClient func NewAudioOutputClient() *AudioOutputClient { - return &AudioOutputClient{ - bufferPool: NewAudioBufferPool(GetMaxAudioFrameSize()), - } + return NewUnifiedAudioClient(false) // false = output client } -// Connect connects to the audio output server -func (c *AudioOutputClient) Connect() error { - c.mtx.Lock() - defer c.mtx.Unlock() +// Connect method is now inherited from UnifiedAudioClient - if c.running { - return nil // Already connected - } +// Disconnect method is now inherited from UnifiedAudioClient - socketPath := getOutputSocketPath() - // Try connecting multiple times as the server might not be ready - // Reduced retry count and delay for faster startup - for i := 0; i < 8; i++ { - conn, err := net.Dial("unix", socketPath) - if err == nil { - c.conn = conn - c.running = true - return nil - } - // Exponential backoff starting from config - backoffStart := GetConfig().BackoffStart - delay := time.Duration(backoffStart.Nanoseconds()*(1< maxDelay { - delay = maxDelay - } - time.Sleep(delay) - } +// IsConnected method is now inherited from UnifiedAudioClient - return fmt.Errorf("failed to connect to audio output server at %s after %d retries", socketPath, 8) -} - -// Disconnect disconnects from the audio output server -func (c *AudioOutputClient) Disconnect() { - c.mtx.Lock() - defer c.mtx.Unlock() - - if !c.running { - return - } - - c.running = false - if c.conn != nil { - c.conn.Close() - c.conn = nil - } -} - -// IsConnected returns whether the client is connected -func (c *AudioOutputClient) IsConnected() bool { - c.mtx.Lock() - defer c.mtx.Unlock() - return c.running && c.conn != nil -} - -func (c *AudioOutputClient) Close() error { - c.Disconnect() - return nil -} +// Close method is now inherited from UnifiedAudioClient func (c *AudioOutputClient) ReceiveFrame() ([]byte, error) { c.mtx.Lock() @@ -499,10 +124,4 @@ func (c *AudioOutputClient) GetClientStats() (total, dropped int64) { // Helper functions -// getOutputSocketPath returns the path to the output socket -func getOutputSocketPath() string { - if path := os.Getenv("JETKVM_AUDIO_OUTPUT_SOCKET"); path != "" { - return path - } - return filepath.Join("/var/run", outputSocketName) -} +// getOutputSocketPath is now defined in unified_ipc.go diff --git a/internal/audio/input.go b/internal/audio/microphone_manager.go similarity index 100% rename from internal/audio/input.go rename to internal/audio/microphone_manager.go diff --git a/internal/audio/naming_standards.go b/internal/audio/naming_standards.go index 2e5be5b6..aa6da1be 100644 --- a/internal/audio/naming_standards.go +++ b/internal/audio/naming_standards.go @@ -25,11 +25,9 @@ STANDARDIZED NAMING CONVENTIONS: - AudioInputStreamer (new: for consistency with OutputStreamer) Output Components: - - AudioOutputManager (new: missing high-level manager) - AudioOutputSupervisor (replaces: AudioOutputSupervisor) ✓ - AudioOutputServer (replaces: AudioOutputServer) ✓ - AudioOutputClient (replaces: AudioOutputClient) ✓ - - AudioOutputStreamer (replaces: OutputStreamer) 3. IPC NAMING: - AudioInputIPCManager (replaces: AudioInputIPCManager) ✓ @@ -46,18 +44,14 @@ STANDARDIZED NAMING CONVENTIONS: - OutputMessageType (replaces: OutputMessageType) ✓ ISSUES IDENTIFIED: -1. Missing AudioOutputManager (high-level output management) -2. Inconsistent naming: OutputStreamer vs AudioInputSupervisor -3. Missing AudioOutputIPCManager for symmetry -4. Missing OutputIPCConfig for consistency -5. Component names in logging should be standardized +1. Missing AudioOutputIPCManager for symmetry +2. Missing OutputIPCConfig for consistency +3. Component names in logging should be standardized IMPLEMENTATION PLAN: -1. Create AudioOutputManager to match AudioInputManager -2. Rename OutputStreamer to AudioOutputStreamer -3. Create AudioOutputIPCManager for symmetry -4. Standardize all component logging names -5. Update all references consistently +1. Create AudioOutputIPCManager for symmetry +2. Standardize all component logging names +3. Update all references consistently */ // Component name constants for consistent logging @@ -70,11 +64,9 @@ const ( AudioInputIPCComponent = "audio-input-ipc" // Output component names - AudioOutputManagerComponent = "audio-output-manager" AudioOutputSupervisorComponent = "audio-output-supervisor" AudioOutputServerComponent = "audio-output-server" AudioOutputClientComponent = "audio-output-client" - AudioOutputStreamerComponent = "audio-output-streamer" AudioOutputIPCComponent = "audio-output-ipc" // Common component names diff --git a/internal/audio/output_ipc_manager.go b/internal/audio/output_ipc_manager.go index 2c64c918..cbc736bb 100644 --- a/internal/audio/output_ipc_manager.go +++ b/internal/audio/output_ipc_manager.go @@ -8,6 +8,15 @@ import ( "github.com/jetkvm/kvm/internal/logging" ) +// AudioOutputMetrics represents metrics for audio output operations +type AudioOutputMetrics struct { + // Atomic int64 field first for proper ARM32 alignment + FramesReceived int64 `json:"frames_received"` // Total frames received (output-specific) + + // Embedded struct with atomic fields properly aligned + BaseAudioMetrics +} + // AudioOutputIPCManager manages audio output using IPC when enabled type AudioOutputIPCManager struct { *BaseAudioManager diff --git a/internal/audio/output_manager.go b/internal/audio/output_manager.go deleted file mode 100644 index 883cb47d..00000000 --- a/internal/audio/output_manager.go +++ /dev/null @@ -1,151 +0,0 @@ -package audio - -import ( - "sync/atomic" - - "github.com/jetkvm/kvm/internal/logging" -) - -// AudioOutputManager manages audio output stream using IPC mode -type AudioOutputManager struct { - *BaseAudioManager - streamer *AudioOutputStreamer - framesReceived int64 // Output-specific metric -} - -// AudioOutputMetrics tracks output-specific metrics -// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) -type AudioOutputMetrics struct { - // Atomic int64 field first for proper ARM32 alignment - FramesReceived int64 `json:"frames_received"` // Total frames received (output-specific) - - // Embedded struct with atomic fields properly aligned - BaseAudioMetrics -} - -// NewAudioOutputManager creates a new audio output manager -func NewAudioOutputManager() *AudioOutputManager { - logger := logging.GetDefaultLogger().With().Str("component", AudioOutputManagerComponent).Logger() - streamer, err := NewAudioOutputStreamer() - if err != nil { - // Log error but continue with nil streamer - will be handled gracefully - logger.Error().Err(err).Msg("Failed to create audio output streamer") - } - - return &AudioOutputManager{ - BaseAudioManager: NewBaseAudioManager(logger), - streamer: streamer, - } -} - -// Start starts the audio output manager -func (aom *AudioOutputManager) Start() error { - if !aom.setRunning(true) { - return nil // Already running - } - - aom.logComponentStart(AudioOutputManagerComponent) - - if aom.streamer == nil { - // Try to recreate streamer if it was nil - streamer, err := NewAudioOutputStreamer() - if err != nil { - aom.setRunning(false) - aom.logComponentError(AudioOutputManagerComponent, err, "failed to create audio output streamer") - return err - } - aom.streamer = streamer - } - - err := aom.streamer.Start() - if err != nil { - aom.setRunning(false) - // Reset metrics on failed start - aom.resetMetrics() - aom.logComponentError(AudioOutputManagerComponent, err, "failed to start component") - return err - } - - aom.logComponentStarted(AudioOutputManagerComponent) - return nil -} - -// Stop stops the audio output manager -func (aom *AudioOutputManager) Stop() { - if !aom.setRunning(false) { - return // Already stopped - } - - aom.logComponentStop(AudioOutputManagerComponent) - - if aom.streamer != nil { - aom.streamer.Stop() - } - - aom.logComponentStopped(AudioOutputManagerComponent) -} - -// resetMetrics resets all metrics to zero -func (aom *AudioOutputManager) resetMetrics() { - aom.BaseAudioManager.resetMetrics() - atomic.StoreInt64(&aom.framesReceived, 0) -} - -// Note: IsRunning() is inherited from BaseAudioManager - -// IsReady returns whether the audio output manager is ready to receive frames -func (aom *AudioOutputManager) IsReady() bool { - if !aom.IsRunning() || aom.streamer == nil { - return false - } - // For output, we consider it ready if the streamer is running - // This could be enhanced with connection status checks - return true -} - -// GetMetrics returns current metrics -func (aom *AudioOutputManager) GetMetrics() AudioOutputMetrics { - return AudioOutputMetrics{ - FramesReceived: atomic.LoadInt64(&aom.framesReceived), - BaseAudioMetrics: aom.getBaseMetrics(), - } -} - -// GetComprehensiveMetrics returns detailed performance metrics -func (aom *AudioOutputManager) GetComprehensiveMetrics() map[string]interface{} { - baseMetrics := aom.GetMetrics() - - comprehensiveMetrics := map[string]interface{}{ - "manager": map[string]interface{}{ - "frames_received": baseMetrics.FramesReceived, - "frames_processed": baseMetrics.FramesProcessed, - "frames_dropped": baseMetrics.FramesDropped, - "bytes_processed": baseMetrics.BytesProcessed, - "connection_drops": baseMetrics.ConnectionDrops, - "average_latency_ms": float64(baseMetrics.AverageLatency.Nanoseconds()) / 1e6, - "last_frame_time": baseMetrics.LastFrameTime, - "running": aom.IsRunning(), - "ready": aom.IsReady(), - }, - } - - if aom.streamer != nil { - processed, dropped, avgTime := aom.streamer.GetStats() - comprehensiveMetrics["streamer"] = map[string]interface{}{ - "frames_processed": processed, - "frames_dropped": dropped, - "avg_processing_time_ms": float64(avgTime.Nanoseconds()) / 1e6, - } - - if detailedStats := aom.streamer.GetDetailedStats(); detailedStats != nil { - comprehensiveMetrics["detailed"] = detailedStats - } - } - - return comprehensiveMetrics -} - -// GetStreamer returns the streamer for advanced operations -func (aom *AudioOutputManager) GetStreamer() *AudioOutputStreamer { - return aom.streamer -} diff --git a/internal/audio/output_server_main.go b/internal/audio/output_server_main.go index 32169f97..c857fef6 100644 --- a/internal/audio/output_server_main.go +++ b/internal/audio/output_server_main.go @@ -4,7 +4,6 @@ import ( "context" "os" "os/signal" - "strconv" "syscall" "time" @@ -12,50 +11,6 @@ import ( ) // getEnvInt reads an integer from environment variable with a default value -func getEnvInt(key string, defaultValue int) int { - if value := os.Getenv(key); value != "" { - if intValue, err := strconv.Atoi(value); err == nil { - return intValue - } - } - return defaultValue -} - -// parseOpusConfig reads OPUS configuration from environment variables -// with fallback to default config values -func parseOpusConfig() (bitrate, complexity, vbr, signalType, bandwidth, dtx int) { - // Read configuration from environment variables with config defaults - bitrate = getEnvInt("JETKVM_OPUS_BITRATE", GetConfig().CGOOpusBitrate) - complexity = getEnvInt("JETKVM_OPUS_COMPLEXITY", GetConfig().CGOOpusComplexity) - vbr = getEnvInt("JETKVM_OPUS_VBR", GetConfig().CGOOpusVBR) - signalType = getEnvInt("JETKVM_OPUS_SIGNAL_TYPE", GetConfig().CGOOpusSignalType) - bandwidth = getEnvInt("JETKVM_OPUS_BANDWIDTH", GetConfig().CGOOpusBandwidth) - dtx = getEnvInt("JETKVM_OPUS_DTX", GetConfig().CGOOpusDTX) - - return bitrate, complexity, vbr, signalType, bandwidth, dtx -} - -// applyOpusConfig applies OPUS configuration to the global config -func applyOpusConfig(bitrate, complexity, vbr, signalType, bandwidth, dtx int) { - logger := logging.GetDefaultLogger().With().Str("component", "audio-output-server").Logger() - - config := GetConfig() - config.CGOOpusBitrate = bitrate - config.CGOOpusComplexity = complexity - config.CGOOpusVBR = vbr - config.CGOOpusSignalType = signalType - config.CGOOpusBandwidth = bandwidth - config.CGOOpusDTX = dtx - - logger.Info(). - Int("bitrate", bitrate). - Int("complexity", complexity). - Int("vbr", vbr). - Int("signal_type", signalType). - Int("bandwidth", bandwidth). - Int("dtx", dtx). - Msg("applied OPUS configuration") -} // RunAudioOutputServer runs the audio output server subprocess // This should be called from main() when the subprocess is detected @@ -65,7 +20,7 @@ func RunAudioOutputServer() error { // Parse OPUS configuration from environment variables bitrate, complexity, vbr, signalType, bandwidth, dtx := parseOpusConfig() - applyOpusConfig(bitrate, complexity, vbr, signalType, bandwidth, dtx) + applyOpusConfig(bitrate, complexity, vbr, signalType, bandwidth, dtx, "audio-output-server", true) // Initialize validation cache for optimal performance InitValidationCache() @@ -76,7 +31,7 @@ func RunAudioOutputServer() error { logger.Error().Err(err).Msg("failed to create audio server") return err } - defer server.Close() + defer server.Stop() // Start accepting connections if err := server.Start(); err != nil { diff --git a/internal/audio/output_streaming.go b/internal/audio/output_streaming.go index c9690ab0..4d3b3b34 100644 --- a/internal/audio/output_streaming.go +++ b/internal/audio/output_streaming.go @@ -6,9 +6,7 @@ package audio import ( "context" "fmt" - "runtime" "strings" - "sync" "sync/atomic" "time" @@ -16,31 +14,7 @@ import ( "github.com/rs/zerolog" ) -// AudioOutputStreamer manages high-performance audio output streaming -type AudioOutputStreamer struct { - // Atomic int64 fields MUST be first for ARM32 alignment (8-byte alignment required) - processedFrames int64 // Total processed frames counter (atomic) - droppedFrames int64 // Dropped frames counter (atomic) - processingTime int64 // Average processing time in nanoseconds (atomic) - lastStatsTime int64 // Last statistics update time (atomic) - - // Other fields after atomic int64 fields - sampleRate int32 // Sample every N frames (default: 10) - - client *AudioOutputClient - bufferPool *AudioBufferPool - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - running bool - mtx sync.Mutex - chanClosed bool // Track if processing channel is closed - - // Adaptive processing configuration - batchSize int // Adaptive batch size for frame processing - processingChan chan []byte // Buffered channel for frame processing - statsInterval time.Duration // Statistics reporting interval -} +// Removed unused AudioOutputStreamer struct - actual streaming uses direct functions var ( outputStreamingRunning int32 @@ -50,298 +24,27 @@ var ( func getOutputStreamingLogger() *zerolog.Logger { if outputStreamingLogger == nil { - logger := logging.GetDefaultLogger().With().Str("component", AudioOutputStreamerComponent).Logger() + logger := logging.GetDefaultLogger().With().Str("component", "audio-output-streaming").Logger() outputStreamingLogger = &logger } return outputStreamingLogger } -func NewAudioOutputStreamer() (*AudioOutputStreamer, error) { - client := NewAudioOutputClient() +// Removed unused NewAudioOutputStreamer function - // Get initial batch size from adaptive buffer manager - adaptiveManager := GetAdaptiveBufferManager() - initialBatchSize := adaptiveManager.GetOutputBufferSize() +// Removed unused AudioOutputStreamer.Start method - ctx, cancel := context.WithCancel(context.Background()) - return &AudioOutputStreamer{ - client: client, - bufferPool: NewAudioBufferPool(GetMaxAudioFrameSize()), // Use existing buffer pool - ctx: ctx, - cancel: cancel, - batchSize: initialBatchSize, // Use adaptive batch size - processingChan: make(chan []byte, GetConfig().ChannelBufferSize), // Large buffer for smooth processing - statsInterval: GetConfig().StatsUpdateInterval, // Statistics interval from config - lastStatsTime: time.Now().UnixNano(), - sampleRate: 10, // Update metrics every 10 frames to reduce atomic ops - }, nil -} +// Removed unused AudioOutputStreamer.Stop method -func (s *AudioOutputStreamer) Start() error { - s.mtx.Lock() - defer s.mtx.Unlock() +// Removed unused AudioOutputStreamer.streamLoop method - if s.running { - return fmt.Errorf("output streamer already running") - } +// Removed unused AudioOutputStreamer.processingLoop method - // Connect to audio output server - if err := s.client.Connect(); err != nil { - return fmt.Errorf("failed to connect to audio output server at %s: %w", getOutputSocketPath(), err) - } +// Removed unused AudioOutputStreamer.statisticsLoop method - s.running = true +// Removed unused AudioOutputStreamer.reportStatistics method - // Start multiple goroutines for optimal performance - s.wg.Add(3) - go s.streamLoop() // Main streaming loop - go s.processingLoop() // Frame processing loop - go s.statisticsLoop() // Performance monitoring loop - - return nil -} - -func (s *AudioOutputStreamer) Stop() { - s.mtx.Lock() - defer s.mtx.Unlock() - - if !s.running { - return - } - - s.running = false - s.cancel() - - // Flush any pending sampled metrics before stopping - s.flushPendingMetrics() - - // Close processing channel to signal goroutines (only if not already closed) - if !s.chanClosed { - close(s.processingChan) - s.chanClosed = true - } - - // Wait for all goroutines to finish - s.wg.Wait() - - if s.client != nil { - s.client.Close() - } -} - -func (s *AudioOutputStreamer) streamLoop() { - defer s.wg.Done() - - // Only pin to OS thread for high-throughput scenarios to reduce scheduler interference - config := GetConfig() - useThreadOptimizations := config.MaxAudioProcessorWorkers > 8 - - if useThreadOptimizations { - runtime.LockOSThread() - defer runtime.UnlockOSThread() - } - - // Adaptive timing for frame reading - frameInterval := time.Duration(GetConfig().OutputStreamingFrameIntervalMS) * time.Millisecond // 50 FPS base rate - ticker := time.NewTicker(frameInterval) - defer ticker.Stop() - - // Batch size update ticker - batchUpdateTicker := time.NewTicker(GetConfig().BufferUpdateInterval) - defer batchUpdateTicker.Stop() - - for { - select { - case <-s.ctx.Done(): - return - case <-batchUpdateTicker.C: - // Update batch size from adaptive buffer manager - s.UpdateBatchSize() - case <-ticker.C: - // Read audio data from CGO with timing measurement - startTime := time.Now() - frameBuf := s.bufferPool.Get() - n, err := CGOAudioReadEncode(frameBuf) - processingDuration := time.Since(startTime) - - if err != nil { - getOutputStreamingLogger().Warn().Err(err).Msg("Failed to read audio data") - s.bufferPool.Put(frameBuf) - atomic.AddInt64(&s.droppedFrames, 1) - continue - } - - if n > 0 { - // Send frame for processing (non-blocking) - // Use buffer pool to avoid allocation - frameData := s.bufferPool.Get() - frameData = frameData[:n] - copy(frameData, frameBuf[:n]) - - select { - case s.processingChan <- frameData: - atomic.AddInt64(&s.processedFrames, 1) - // Update processing time statistics - atomic.StoreInt64(&s.processingTime, int64(processingDuration)) - // Report latency to adaptive buffer manager - s.ReportLatency(processingDuration) - default: - // Processing channel full, drop frame - atomic.AddInt64(&s.droppedFrames, 1) - } - } - - s.bufferPool.Put(frameBuf) - } - } -} - -// processingLoop handles frame processing in a separate goroutine -func (s *AudioOutputStreamer) processingLoop() { - defer s.wg.Done() - - // Only use thread optimizations for high-throughput scenarios - config := GetConfig() - useThreadOptimizations := config.MaxAudioProcessorWorkers > 8 - - if useThreadOptimizations { - // Pin goroutine to OS thread for consistent performance - runtime.LockOSThread() - defer runtime.UnlockOSThread() - } - - for frameData := range s.processingChan { - // Process frame and return buffer to pool after processing - func() { - defer s.bufferPool.Put(frameData) - - if _, err := s.client.ReceiveFrame(); err != nil { - if s.client.IsConnected() { - // Sample logging to reduce overhead - log every 50th error - if atomic.LoadInt64(&s.droppedFrames)%50 == 0 && getOutputStreamingLogger().GetLevel() <= zerolog.WarnLevel { - getOutputStreamingLogger().Warn().Err(err).Msg("Error reading audio frame from output server") - } - s.recordFrameDropped() - } - // Try to reconnect if disconnected - if !s.client.IsConnected() { - if err := s.client.Connect(); err != nil { - // Only log reconnection failures if warn level enabled - if getOutputStreamingLogger().GetLevel() <= zerolog.WarnLevel { - getOutputStreamingLogger().Warn().Err(err).Msg("Failed to reconnect") - } - } - } - } else { - s.recordFrameProcessed() - } - }() - } -} - -// statisticsLoop monitors and reports performance statistics -func (s *AudioOutputStreamer) statisticsLoop() { - defer s.wg.Done() - - ticker := time.NewTicker(s.statsInterval) - defer ticker.Stop() - - for { - select { - case <-s.ctx.Done(): - return - case <-ticker.C: - s.reportStatistics() - } - } -} - -// reportStatistics logs current performance statistics -func (s *AudioOutputStreamer) reportStatistics() { - processed := atomic.LoadInt64(&s.processedFrames) - dropped := atomic.LoadInt64(&s.droppedFrames) - processingTime := atomic.LoadInt64(&s.processingTime) - - if processed > 0 { - dropRate := float64(dropped) / float64(processed+dropped) * GetConfig().PercentageMultiplier - avgProcessingTime := time.Duration(processingTime) - - getOutputStreamingLogger().Info().Int64("processed", processed).Int64("dropped", dropped).Float64("drop_rate", dropRate).Dur("avg_processing", avgProcessingTime).Msg("Output Audio Stats") - - // Get client statistics - clientTotal, clientDropped := s.client.GetClientStats() - getOutputStreamingLogger().Info().Int64("total", clientTotal).Int64("dropped", clientDropped).Msg("Client Stats") - } -} - -// recordFrameProcessed records a processed frame with sampling optimization -func (s *AudioOutputStreamer) recordFrameProcessed() { -} - -// recordFrameDropped records a dropped frame with sampling optimization -func (s *AudioOutputStreamer) recordFrameDropped() { -} - -// flushPendingMetrics flushes any pending sampled metrics to atomic counters -func (s *AudioOutputStreamer) flushPendingMetrics() { -} - -// GetStats returns streaming statistics with pending metrics flushed -func (s *AudioOutputStreamer) GetStats() (processed, dropped int64, avgProcessingTime time.Duration) { - // Flush pending metrics for accurate reading - s.flushPendingMetrics() - - processed = atomic.LoadInt64(&s.processedFrames) - dropped = atomic.LoadInt64(&s.droppedFrames) - processingTimeNs := atomic.LoadInt64(&s.processingTime) - avgProcessingTime = time.Duration(processingTimeNs) - return -} - -// GetDetailedStats returns comprehensive streaming statistics -func (s *AudioOutputStreamer) GetDetailedStats() map[string]interface{} { - // Flush pending metrics for accurate reading - s.flushPendingMetrics() - - processed := atomic.LoadInt64(&s.processedFrames) - dropped := atomic.LoadInt64(&s.droppedFrames) - processingTime := atomic.LoadInt64(&s.processingTime) - - stats := map[string]interface{}{ - "processed_frames": processed, - "dropped_frames": dropped, - "avg_processing_time_ns": processingTime, - "batch_size": s.batchSize, - "channel_buffer_size": cap(s.processingChan), - "channel_current_size": len(s.processingChan), - "connected": s.client.IsConnected(), - } - - if processed+dropped > 0 { - stats["drop_rate_percent"] = float64(dropped) / float64(processed+dropped) * GetConfig().PercentageMultiplier - } - - // Add client statistics - clientTotal, clientDropped := s.client.GetClientStats() - stats["client_total_frames"] = clientTotal - stats["client_dropped_frames"] = clientDropped - - return stats -} - -// UpdateBatchSize updates the batch size from adaptive buffer manager -func (s *AudioOutputStreamer) UpdateBatchSize() { - s.mtx.Lock() - adaptiveManager := GetAdaptiveBufferManager() - s.batchSize = adaptiveManager.GetOutputBufferSize() - s.mtx.Unlock() -} - -// ReportLatency reports processing latency to adaptive buffer manager -func (s *AudioOutputStreamer) ReportLatency(latency time.Duration) { - adaptiveManager := GetAdaptiveBufferManager() - adaptiveManager.UpdateLatency(latency) -} +// Removed all unused AudioOutputStreamer methods // StartAudioOutputStreaming starts audio output streaming (capturing system audio) func StartAudioOutputStreaming(send func([]byte)) error { diff --git a/internal/audio/audio.go b/internal/audio/quality_presets.go similarity index 100% rename from internal/audio/audio.go rename to internal/audio/quality_presets.go diff --git a/internal/audio/session.go b/internal/audio/session_provider.go similarity index 100% rename from internal/audio/session.go rename to internal/audio/session_provider.go diff --git a/internal/audio/api.go b/internal/audio/supervisor_api.go similarity index 100% rename from internal/audio/api.go rename to internal/audio/supervisor_api.go diff --git a/internal/audio/unified_ipc.go b/internal/audio/unified_ipc.go new file mode 100644 index 00000000..2b293d5f --- /dev/null +++ b/internal/audio/unified_ipc.go @@ -0,0 +1,510 @@ +package audio + +import ( + "encoding/binary" + "fmt" + "io" + "net" + "os" + "path/filepath" + "sync" + "sync/atomic" + "time" + + "github.com/jetkvm/kvm/internal/logging" + "github.com/rs/zerolog" +) + +// Unified IPC constants +var ( + outputMagicNumber uint32 = GetConfig().OutputMagicNumber // "JKOU" (JetKVM Output) + inputMagicNumber uint32 = GetConfig().InputMagicNumber // "JKMI" (JetKVM Microphone Input) + outputSocketName = "audio_output.sock" + inputSocketName = "audio_input.sock" + headerSize = 17 // Fixed header size: 4+1+4+8 bytes +) + +// UnifiedMessageType represents the type of IPC message for both input and output +type UnifiedMessageType uint8 + +const ( + MessageTypeOpusFrame UnifiedMessageType = iota + MessageTypeConfig + MessageTypeOpusConfig + MessageTypeStop + MessageTypeHeartbeat + MessageTypeAck +) + +// UnifiedIPCMessage represents a message sent over IPC for both input and output +type UnifiedIPCMessage struct { + Magic uint32 + Type UnifiedMessageType + Length uint32 + Timestamp int64 + Data []byte +} + +// Implement IPCMessage interface +func (msg *UnifiedIPCMessage) GetMagic() uint32 { + return msg.Magic +} + +func (msg *UnifiedIPCMessage) GetType() uint8 { + return uint8(msg.Type) +} + +func (msg *UnifiedIPCMessage) GetLength() uint32 { + return msg.Length +} + +func (msg *UnifiedIPCMessage) GetTimestamp() int64 { + return msg.Timestamp +} + +func (msg *UnifiedIPCMessage) GetData() []byte { + return msg.Data +} + +// UnifiedIPCConfig represents configuration for audio +type UnifiedIPCConfig struct { + SampleRate int + Channels int + FrameSize int +} + +// UnifiedIPCOpusConfig represents Opus-specific configuration +type UnifiedIPCOpusConfig struct { + SampleRate int + Channels int + FrameSize int + Bitrate int + Complexity int + VBR int + SignalType int + Bandwidth int + DTX int +} + +// UnifiedAudioServer provides common functionality for both input and output servers +type UnifiedAudioServer struct { + // Atomic counters for performance monitoring + bufferSize int64 // Current buffer size (atomic) + droppedFrames int64 // Dropped frames counter (atomic) + totalFrames int64 // Total frames counter (atomic) + + listener net.Listener + conn net.Conn + mtx sync.Mutex + running bool + logger zerolog.Logger + + // Message channels + messageChan chan *UnifiedIPCMessage // Buffered channel for incoming messages + processChan chan *UnifiedIPCMessage // Buffered channel for processing queue + wg sync.WaitGroup // Wait group for goroutine coordination + + // Configuration + socketPath string + magicNumber uint32 + socketBufferConfig SocketBufferConfig + + // Performance monitoring + latencyMonitor *LatencyMonitor + adaptiveOptimizer *AdaptiveOptimizer +} + +// NewUnifiedAudioServer creates a new unified audio server +func NewUnifiedAudioServer(isInput bool) (*UnifiedAudioServer, error) { + var socketPath string + var magicNumber uint32 + var componentName string + + if isInput { + socketPath = getInputSocketPath() + magicNumber = inputMagicNumber + componentName = "audio-input-server" + } else { + socketPath = getOutputSocketPath() + magicNumber = outputMagicNumber + componentName = "audio-output-server" + } + + logger := logging.GetDefaultLogger().With().Str("component", componentName).Logger() + + server := &UnifiedAudioServer{ + logger: logger, + socketPath: socketPath, + magicNumber: magicNumber, + messageChan: make(chan *UnifiedIPCMessage, GetConfig().ChannelBufferSize), + processChan: make(chan *UnifiedIPCMessage, GetConfig().ChannelBufferSize), + socketBufferConfig: DefaultSocketBufferConfig(), + latencyMonitor: nil, + adaptiveOptimizer: nil, + } + + return server, nil +} + +// Start starts the unified audio server +func (s *UnifiedAudioServer) Start() error { + s.mtx.Lock() + defer s.mtx.Unlock() + + if s.running { + return fmt.Errorf("server already running") + } + + // Remove existing socket file + if err := os.Remove(s.socketPath); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to remove existing socket: %w", err) + } + + // Create listener + listener, err := net.Listen("unix", s.socketPath) + if err != nil { + return fmt.Errorf("failed to create listener: %w", err) + } + + s.listener = listener + s.running = true + + // Start goroutines + s.wg.Add(3) + go s.acceptConnections() + go s.startReaderGoroutine() + go s.startProcessorGoroutine() + + s.logger.Info().Str("socket_path", s.socketPath).Msg("Unified audio server started") + return nil +} + +// Stop stops the unified audio server +func (s *UnifiedAudioServer) Stop() { + s.mtx.Lock() + defer s.mtx.Unlock() + + if !s.running { + return + } + + s.running = false + + if s.listener != nil { + s.listener.Close() + } + + if s.conn != nil { + s.conn.Close() + } + + // Close channels + close(s.messageChan) + close(s.processChan) + + // Wait for goroutines to finish + s.wg.Wait() + + // Remove socket file + os.Remove(s.socketPath) + + s.logger.Info().Msg("Unified audio server stopped") +} + +// acceptConnections handles incoming connections +func (s *UnifiedAudioServer) acceptConnections() { + defer s.wg.Done() + + for s.running { + conn, err := AcceptConnectionWithRetry(s.listener, 3, 100*time.Millisecond) + if err != nil { + if s.running { + s.logger.Error().Err(err).Msg("Failed to accept connection") + } + continue + } + + s.mtx.Lock() + if s.conn != nil { + s.conn.Close() + } + s.conn = conn + s.mtx.Unlock() + + s.logger.Info().Msg("Client connected") + } +} + +// startReaderGoroutine handles reading messages from the connection +func (s *UnifiedAudioServer) startReaderGoroutine() { + defer s.wg.Done() + + for s.running { + s.mtx.Lock() + conn := s.conn + s.mtx.Unlock() + + if conn == nil { + time.Sleep(10 * time.Millisecond) + continue + } + + msg, err := s.readMessage(conn) + if err != nil { + if s.running { + s.logger.Error().Err(err).Msg("Failed to read message") + } + continue + } + + select { + case s.messageChan <- msg: + default: + atomic.AddInt64(&s.droppedFrames, 1) + s.logger.Warn().Msg("Message channel full, dropping message") + } + } +} + +// startProcessorGoroutine handles processing messages +func (s *UnifiedAudioServer) startProcessorGoroutine() { + defer s.wg.Done() + + for msg := range s.messageChan { + select { + case s.processChan <- msg: + atomic.AddInt64(&s.totalFrames, 1) + default: + atomic.AddInt64(&s.droppedFrames, 1) + s.logger.Warn().Msg("Process channel full, dropping message") + } + } +} + +// readMessage reads a message from the connection +func (s *UnifiedAudioServer) readMessage(conn net.Conn) (*UnifiedIPCMessage, error) { + // Read header + header := make([]byte, headerSize) + if _, err := io.ReadFull(conn, header); err != nil { + return nil, fmt.Errorf("failed to read header: %w", err) + } + + // Parse header + magic := binary.LittleEndian.Uint32(header[0:4]) + if magic != s.magicNumber { + return nil, fmt.Errorf("invalid magic number: expected %d, got %d", s.magicNumber, magic) + } + + msgType := UnifiedMessageType(header[4]) + length := binary.LittleEndian.Uint32(header[5:9]) + timestamp := int64(binary.LittleEndian.Uint64(header[9:17])) + + // Validate length + if length > uint32(GetConfig().MaxFrameSize) { + return nil, fmt.Errorf("message too large: %d bytes", length) + } + + // Read data + var data []byte + if length > 0 { + data = make([]byte, length) + if _, err := io.ReadFull(conn, data); err != nil { + return nil, fmt.Errorf("failed to read data: %w", err) + } + } + + return &UnifiedIPCMessage{ + Magic: magic, + Type: msgType, + Length: length, + Timestamp: timestamp, + Data: data, + }, nil +} + +// SendFrame sends a frame to the connected client +func (s *UnifiedAudioServer) SendFrame(frame []byte) error { + s.mtx.Lock() + defer s.mtx.Unlock() + + if !s.running || s.conn == nil { + return fmt.Errorf("no client connected") + } + + start := time.Now() + + // Create message + msg := &UnifiedIPCMessage{ + Magic: s.magicNumber, + Type: MessageTypeOpusFrame, + Length: uint32(len(frame)), + Timestamp: start.UnixNano(), + Data: frame, + } + + // Write message to connection + err := s.writeMessage(s.conn, msg) + if err != nil { + atomic.AddInt64(&s.droppedFrames, 1) + return err + } + + // Record latency for monitoring + if s.latencyMonitor != nil { + writeLatency := time.Since(start) + s.latencyMonitor.RecordLatency(writeLatency, "ipc_write") + } + + atomic.AddInt64(&s.totalFrames, 1) + return nil +} + +// writeMessage writes a message to the connection +func (s *UnifiedAudioServer) writeMessage(conn net.Conn, msg *UnifiedIPCMessage) error { + // Write header + header := make([]byte, headerSize) + binary.LittleEndian.PutUint32(header[0:4], msg.Magic) + header[4] = uint8(msg.Type) + binary.LittleEndian.PutUint32(header[5:9], msg.Length) + binary.LittleEndian.PutUint64(header[9:17], uint64(msg.Timestamp)) + + if _, err := conn.Write(header); err != nil { + return fmt.Errorf("failed to write header: %w", err) + } + + // Write data if present + if msg.Length > 0 && msg.Data != nil { + if _, err := conn.Write(msg.Data); err != nil { + return fmt.Errorf("failed to write data: %w", err) + } + } + + return nil +} + +// UnifiedAudioClient provides common functionality for both input and output clients +type UnifiedAudioClient struct { + // Atomic fields first for ARM32 alignment + droppedFrames int64 // Atomic counter for dropped frames + totalFrames int64 // Atomic counter for total frames + + conn net.Conn + mtx sync.Mutex + running bool + logger zerolog.Logger + socketPath string + magicNumber uint32 + bufferPool *AudioBufferPool // Buffer pool for memory optimization +} + +// NewUnifiedAudioClient creates a new unified audio client +func NewUnifiedAudioClient(isInput bool) *UnifiedAudioClient { + var socketPath string + var magicNumber uint32 + var componentName string + + if isInput { + socketPath = getInputSocketPath() + magicNumber = inputMagicNumber + componentName = "audio-input-client" + } else { + socketPath = getOutputSocketPath() + magicNumber = outputMagicNumber + componentName = "audio-output-client" + } + + logger := logging.GetDefaultLogger().With().Str("component", componentName).Logger() + + return &UnifiedAudioClient{ + logger: logger, + socketPath: socketPath, + magicNumber: magicNumber, + bufferPool: NewAudioBufferPool(GetConfig().MaxFrameSize), + } +} + +// Connect connects the client to the server +func (c *UnifiedAudioClient) Connect() error { + c.mtx.Lock() + defer c.mtx.Unlock() + + if c.running { + return nil // Already connected + } + + // Ensure clean state before connecting + if c.conn != nil { + c.conn.Close() + c.conn = nil + } + + // Try connecting multiple times as the server might not be ready + // Reduced retry count and delay for faster startup + for i := 0; i < 10; i++ { + conn, err := net.Dial("unix", c.socketPath) + if err == nil { + c.conn = conn + c.running = true + // Reset frame counters on successful connection + atomic.StoreInt64(&c.totalFrames, 0) + atomic.StoreInt64(&c.droppedFrames, 0) + c.logger.Info().Str("socket_path", c.socketPath).Msg("Connected to server") + return nil + } + // Exponential backoff starting from config + backoffStart := GetConfig().BackoffStart + delay := time.Duration(backoffStart.Nanoseconds()*(1< maxDelay { + delay = maxDelay + } + time.Sleep(delay) + } + + // Ensure clean state on connection failure + c.conn = nil + c.running = false + return fmt.Errorf("failed to connect to audio server after 10 attempts") +} + +// Disconnect disconnects the client from the server +func (c *UnifiedAudioClient) Disconnect() { + c.mtx.Lock() + defer c.mtx.Unlock() + + if !c.running { + return + } + + c.running = false + + if c.conn != nil { + c.conn.Close() + c.conn = nil + } + + c.logger.Info().Msg("Disconnected from server") +} + +// IsConnected returns whether the client is connected +func (c *UnifiedAudioClient) IsConnected() bool { + c.mtx.Lock() + defer c.mtx.Unlock() + return c.running && c.conn != nil +} + +// GetFrameStats returns frame statistics +func (c *UnifiedAudioClient) GetFrameStats() (total, dropped int64) { + total = atomic.LoadInt64(&c.totalFrames) + dropped = atomic.LoadInt64(&c.droppedFrames) + return total, dropped +} + +// Helper functions for socket paths +func getInputSocketPath() string { + return filepath.Join(os.TempDir(), inputSocketName) +} + +func getOutputSocketPath() string { + return filepath.Join(os.TempDir(), outputSocketName) +} diff --git a/internal/audio/validation.go b/internal/audio/validation.go index 278352f1..be1cc976 100644 --- a/internal/audio/validation.go +++ b/internal/audio/validation.go @@ -11,13 +11,13 @@ import ( // Validation errors var ( - ErrInvalidAudioQuality = errors.New("invalid audio quality level") - ErrInvalidFrameSize = errors.New("invalid frame size") - ErrInvalidFrameData = errors.New("invalid frame data") - ErrFrameDataEmpty = errors.New("invalid frame data: frame data is empty") - ErrFrameDataTooLarge = errors.New("invalid frame data: exceeds maximum") - ErrInvalidBufferSize = errors.New("invalid buffer size") - ErrInvalidPriority = errors.New("invalid priority value") + ErrInvalidAudioQuality = errors.New("invalid audio quality level") + ErrInvalidFrameSize = errors.New("invalid frame size") + ErrInvalidFrameData = errors.New("invalid frame data") + ErrFrameDataEmpty = errors.New("invalid frame data: frame data is empty") + ErrFrameDataTooLarge = errors.New("invalid frame data: exceeds maximum") + ErrInvalidBufferSize = errors.New("invalid buffer size") + ErrInvalidLatency = errors.New("invalid latency value") ErrInvalidConfiguration = errors.New("invalid configuration") ErrInvalidSocketConfig = errors.New("invalid socket configuration") @@ -99,16 +99,6 @@ func ValidateBufferSize(size int) error { return nil } -// ValidateThreadPriority validates thread priority values with system limits -func ValidateThreadPriority(priority int) error { - const minPriority, maxPriority = -20, 19 - if priority < minPriority || priority > maxPriority { - return fmt.Errorf("%w: priority %d outside valid range [%d, %d]", - ErrInvalidPriority, priority, minPriority, maxPriority) - } - return nil -} - // ValidateLatency validates latency duration values with reasonable bounds // Optimized to use AudioConfigCache for frequently accessed values func ValidateLatency(latency time.Duration) error { diff --git a/internal/audio/relay.go b/internal/audio/webrtc_relay.go similarity index 100% rename from internal/audio/relay.go rename to internal/audio/webrtc_relay.go diff --git a/internal/audio/events.go b/internal/audio/websocket_events.go similarity index 100% rename from internal/audio/events.go rename to internal/audio/websocket_events.go diff --git a/main.go b/main.go index 0245ae4f..06a1cc2f 100644 --- a/main.go +++ b/main.go @@ -68,8 +68,6 @@ func startAudioSubprocess() error { config.AudioQualityLowOpusDTX, ) - - // Note: Audio input supervisor is NOT started here - it will be started on-demand // when the user activates microphone input through the UI