//go:build cgo // +build cgo // Package audio provides real-time audio processing for JetKVM with low-latency streaming. // // Key components: output/input pipelines with Opus codec, buffer management, // zero-copy frame pools, IPC communication, and process supervision. // // Supports four quality presets (Low/Medium/High/Ultra) with configurable bitrates. // All APIs are thread-safe with comprehensive error handling and metrics collection. // // # Performance Characteristics // // Designed for embedded ARM systems with limited resources: // - Sub-50ms end-to-end latency under normal conditions // - Memory usage scales with buffer configuration // - CPU usage optimized through zero-copy operations // - Network bandwidth adapts to quality settings // // # Usage Example // // config := GetAudioConfig() // SetAudioQuality(AudioQualityHigh) // // // Audio output will automatically start when frames are received package audio import ( "errors" "sync/atomic" "time" "github.com/jetkvm/kvm/internal/logging" ) var ( ErrAudioAlreadyRunning = errors.New("audio already running") ) // MaxAudioFrameSize is now retrieved from centralized config func GetMaxAudioFrameSize() int { return Config.MaxAudioFrameSize } // AudioQuality represents different audio quality presets type AudioQuality int const ( AudioQualityLow AudioQuality = iota AudioQualityMedium AudioQualityHigh AudioQualityUltra ) // AudioConfig holds configuration for audio processing type AudioConfig struct { Quality AudioQuality Bitrate int // kbps SampleRate int // Hz Channels int FrameSize time.Duration // ms } // AudioMetrics tracks audio performance metrics type AudioMetrics struct { FramesReceived uint64 FramesDropped uint64 BytesProcessed uint64 ConnectionDrops uint64 LastFrameTime time.Time AverageLatency time.Duration } var ( currentConfig = AudioConfig{ Quality: AudioQualityMedium, Bitrate: Config.AudioQualityMediumOutputBitrate, SampleRate: Config.SampleRate, Channels: Config.Channels, FrameSize: Config.AudioQualityMediumFrameSize, } currentMicrophoneConfig = AudioConfig{ Quality: AudioQualityMedium, Bitrate: Config.AudioQualityMediumInputBitrate, SampleRate: Config.SampleRate, Channels: 1, FrameSize: Config.AudioQualityMediumFrameSize, } metrics AudioMetrics ) // qualityPresets defines the base quality configurations var qualityPresets = map[AudioQuality]struct { outputBitrate, inputBitrate int sampleRate, channels int frameSize time.Duration }{ AudioQualityLow: { outputBitrate: Config.AudioQualityLowOutputBitrate, inputBitrate: Config.AudioQualityLowInputBitrate, sampleRate: Config.AudioQualityLowSampleRate, channels: Config.AudioQualityLowChannels, frameSize: Config.AudioQualityLowFrameSize, }, AudioQualityMedium: { outputBitrate: Config.AudioQualityMediumOutputBitrate, inputBitrate: Config.AudioQualityMediumInputBitrate, sampleRate: Config.AudioQualityMediumSampleRate, channels: Config.AudioQualityMediumChannels, frameSize: Config.AudioQualityMediumFrameSize, }, AudioQualityHigh: { outputBitrate: Config.AudioQualityHighOutputBitrate, inputBitrate: Config.AudioQualityHighInputBitrate, sampleRate: Config.SampleRate, channels: Config.AudioQualityHighChannels, frameSize: Config.AudioQualityHighFrameSize, }, AudioQualityUltra: { outputBitrate: Config.AudioQualityUltraOutputBitrate, inputBitrate: Config.AudioQualityUltraInputBitrate, sampleRate: Config.SampleRate, channels: Config.AudioQualityUltraChannels, frameSize: Config.AudioQualityUltraFrameSize, }, } // GetAudioQualityPresets returns predefined quality configurations for audio output func GetAudioQualityPresets() map[AudioQuality]AudioConfig { result := make(map[AudioQuality]AudioConfig) for quality, preset := range qualityPresets { config := AudioConfig{ Quality: quality, Bitrate: preset.outputBitrate, SampleRate: preset.sampleRate, Channels: preset.channels, FrameSize: preset.frameSize, } result[quality] = config } return result } // GetMicrophoneQualityPresets returns predefined quality configurations for microphone input func GetMicrophoneQualityPresets() map[AudioQuality]AudioConfig { result := make(map[AudioQuality]AudioConfig) for quality, preset := range qualityPresets { config := AudioConfig{ Quality: quality, Bitrate: preset.inputBitrate, SampleRate: func() int { if quality == AudioQualityLow { return Config.AudioQualityMicLowSampleRate } return preset.sampleRate }(), Channels: 1, // Microphone is always mono FrameSize: preset.frameSize, } result[quality] = config } return result } // SetAudioQuality updates the current audio quality configuration func SetAudioQuality(quality AudioQuality) { // Validate audio quality parameter if err := ValidateAudioQuality(quality); err != nil { // Log validation error but don't fail - maintain backward compatibility logger := logging.GetDefaultLogger().With().Str("component", "audio").Logger() logger.Warn().Err(err).Int("quality", int(quality)).Msg("invalid audio quality, using current config") return } presets := GetAudioQualityPresets() if config, exists := presets[quality]; exists { currentConfig = config // Get OPUS encoder parameters based on quality var complexity, vbr, signalType, bandwidth, dtx int switch quality { case AudioQualityLow: complexity = Config.AudioQualityLowOpusComplexity vbr = Config.AudioQualityLowOpusVBR signalType = Config.AudioQualityLowOpusSignalType bandwidth = Config.AudioQualityLowOpusBandwidth dtx = Config.AudioQualityLowOpusDTX case AudioQualityMedium: complexity = Config.AudioQualityMediumOpusComplexity vbr = Config.AudioQualityMediumOpusVBR signalType = Config.AudioQualityMediumOpusSignalType bandwidth = Config.AudioQualityMediumOpusBandwidth dtx = Config.AudioQualityMediumOpusDTX case AudioQualityHigh: complexity = Config.AudioQualityHighOpusComplexity vbr = Config.AudioQualityHighOpusVBR signalType = Config.AudioQualityHighOpusSignalType bandwidth = Config.AudioQualityHighOpusBandwidth dtx = Config.AudioQualityHighOpusDTX case AudioQualityUltra: complexity = Config.AudioQualityUltraOpusComplexity vbr = Config.AudioQualityUltraOpusVBR signalType = Config.AudioQualityUltraOpusSignalType bandwidth = Config.AudioQualityUltraOpusBandwidth dtx = Config.AudioQualityUltraOpusDTX default: // Use medium quality as fallback complexity = Config.AudioQualityMediumOpusComplexity vbr = Config.AudioQualityMediumOpusVBR signalType = Config.AudioQualityMediumOpusSignalType bandwidth = Config.AudioQualityMediumOpusBandwidth dtx = Config.AudioQualityMediumOpusDTX } // Update audio output subprocess configuration dynamically without restart logger := logging.GetDefaultLogger().With().Str("component", "audio").Logger() logger.Info().Int("quality", int(quality)).Msg("updating audio output quality settings dynamically") // Set new OPUS configuration for future restarts if supervisor := GetAudioOutputSupervisor(); supervisor != nil { supervisor.SetOpusConfig(config.Bitrate*1000, complexity, vbr, signalType, bandwidth, dtx) // Send dynamic configuration update to running subprocess via IPC if supervisor.IsConnected() { // Convert AudioConfig to UnifiedIPCOpusConfig with complete Opus parameters opusConfig := UnifiedIPCOpusConfig{ SampleRate: config.SampleRate, Channels: config.Channels, FrameSize: int(config.FrameSize.Milliseconds() * int64(config.SampleRate) / 1000), // Convert ms to samples Bitrate: config.Bitrate * 1000, // Convert kbps to bps Complexity: complexity, VBR: vbr, SignalType: signalType, Bandwidth: bandwidth, DTX: dtx, } logger.Info().Interface("opusConfig", opusConfig).Msg("sending Opus configuration to audio output subprocess") if err := supervisor.SendOpusConfig(opusConfig); err != nil { logger.Warn().Err(err).Msg("failed to send dynamic Opus config update via IPC, falling back to subprocess restart") // Fallback to subprocess restart if IPC update fails supervisor.Stop() if err := supervisor.Start(); err != nil { logger.Error().Err(err).Msg("failed to restart audio output subprocess after IPC update failure") } } else { logger.Info().Msg("audio output quality updated dynamically via IPC") // Reset audio output stats after config update go func() { time.Sleep(Config.QualityChangeSettleDelay) // Wait for quality change to settle // Reset audio input server stats to clear persistent warnings ResetGlobalAudioInputServerStats() // Attempt recovery if there are still issues time.Sleep(1 * time.Second) RecoverGlobalAudioInputServer() }() } } else { logger.Info().Bool("supervisor_running", supervisor.IsRunning()).Msg("audio output subprocess not connected, configuration will apply on next start") } } } } // GetAudioConfig returns the current audio configuration func GetAudioConfig() AudioConfig { return currentConfig } // Simplified OPUS parameter lookup table var opusParams = map[AudioQuality]struct { complexity, vbr, signalType, bandwidth, dtx int }{ AudioQualityLow: {Config.AudioQualityLowOpusComplexity, Config.AudioQualityLowOpusVBR, Config.AudioQualityLowOpusSignalType, Config.AudioQualityLowOpusBandwidth, Config.AudioQualityLowOpusDTX}, AudioQualityMedium: {Config.AudioQualityMediumOpusComplexity, Config.AudioQualityMediumOpusVBR, Config.AudioQualityMediumOpusSignalType, Config.AudioQualityMediumOpusBandwidth, Config.AudioQualityMediumOpusDTX}, AudioQualityHigh: {Config.AudioQualityHighOpusComplexity, Config.AudioQualityHighOpusVBR, Config.AudioQualityHighOpusSignalType, Config.AudioQualityHighOpusBandwidth, Config.AudioQualityHighOpusDTX}, AudioQualityUltra: {Config.AudioQualityUltraOpusComplexity, Config.AudioQualityUltraOpusVBR, Config.AudioQualityUltraOpusSignalType, Config.AudioQualityUltraOpusBandwidth, Config.AudioQualityUltraOpusDTX}, } // SetMicrophoneQuality updates the current microphone quality configuration func SetMicrophoneQuality(quality AudioQuality) { // Validate audio quality parameter if err := ValidateAudioQuality(quality); err != nil { // Log validation error but don't fail - maintain backward compatibility logger := logging.GetDefaultLogger().With().Str("component", "audio").Logger() logger.Warn().Err(err).Int("quality", int(quality)).Msg("invalid microphone quality, using current config") return } presets := GetMicrophoneQualityPresets() if config, exists := presets[quality]; exists { currentMicrophoneConfig = config // Get OPUS parameters using lookup table params, exists := opusParams[quality] if !exists { // Fallback to medium quality params = opusParams[AudioQualityMedium] } // Update audio input subprocess configuration dynamically without restart logger := logging.GetDefaultLogger().With().Str("component", "audio").Logger() // Set new OPUS configuration for future restarts if supervisor := GetAudioInputSupervisor(); supervisor != nil { supervisor.SetOpusConfig(config.Bitrate*1000, params.complexity, params.vbr, params.signalType, params.bandwidth, params.dtx) // Check if microphone is active but IPC control is broken inputManager := getAudioInputManager() if inputManager.IsRunning() && !supervisor.IsConnected() { // Reconnect the IPC control channel supervisor.Stop() time.Sleep(50 * time.Millisecond) if err := supervisor.Start(); err != nil { logger.Debug().Err(err).Msg("failed to reconnect IPC control channel") } } // Send dynamic configuration update to running subprocess via IPC if supervisor.IsConnected() { // Convert AudioConfig to UnifiedIPCOpusConfig with complete Opus parameters opusConfig := UnifiedIPCOpusConfig{ SampleRate: config.SampleRate, Channels: config.Channels, FrameSize: int(config.FrameSize.Milliseconds() * int64(config.SampleRate) / 1000), // Convert ms to samples Bitrate: config.Bitrate * 1000, // Convert kbps to bps Complexity: params.complexity, VBR: params.vbr, SignalType: params.signalType, Bandwidth: params.bandwidth, DTX: params.dtx, } if err := supervisor.SendOpusConfig(opusConfig); err != nil { logger.Debug().Err(err).Msg("failed to send dynamic Opus config update via IPC") // Fallback to subprocess restart if IPC update fails supervisor.Stop() if err := supervisor.Start(); err != nil { logger.Error().Err(err).Msg("failed to restart audio input subprocess after IPC update failure") } } else { logger.Info().Msg("audio input quality updated dynamically via IPC") // Reset audio input stats after config update go func() { time.Sleep(Config.QualityChangeSettleDelay) // Wait for quality change to settle // Reset audio input server stats to clear persistent warnings ResetGlobalAudioInputServerStats() // Attempt recovery if microphone is still having issues time.Sleep(1 * time.Second) RecoverGlobalAudioInputServer() }() } } else { logger.Info().Bool("supervisor_running", supervisor.IsRunning()).Msg("audio input subprocess not connected, configuration will apply on next start") } } } } // GetMicrophoneConfig returns the current microphone configuration func GetMicrophoneConfig() AudioConfig { return currentMicrophoneConfig } // GetGlobalAudioMetrics returns the current global audio metrics func GetGlobalAudioMetrics() AudioMetrics { return metrics } // Batched metrics to reduce atomic operations frequency var ( batchedFramesReceived uint64 batchedBytesProcessed uint64 batchedFramesDropped uint64 batchedConnectionDrops uint64 lastFlushTime int64 // Unix timestamp in nanoseconds ) // RecordFrameReceived increments the frames received counter with batched updates func RecordFrameReceived(bytes int) { // Use local batching to reduce atomic operations frequency atomic.AddUint64(&batchedBytesProcessed, uint64(bytes)) // Update timestamp immediately for accurate tracking metrics.LastFrameTime = time.Now() } // RecordFrameDropped increments the frames dropped counter with batched updates func RecordFrameDropped() { } // RecordConnectionDrop increments the connection drops counter with batched updates func RecordConnectionDrop() { } // flushBatchedMetrics flushes accumulated metrics to the main counters func flushBatchedMetrics() { // Atomically move batched metrics to main metrics framesReceived := atomic.SwapUint64(&batchedFramesReceived, 0) bytesProcessed := atomic.SwapUint64(&batchedBytesProcessed, 0) framesDropped := atomic.SwapUint64(&batchedFramesDropped, 0) connectionDrops := atomic.SwapUint64(&batchedConnectionDrops, 0) // Update main metrics if we have any batched data if framesReceived > 0 { atomic.AddUint64(&metrics.FramesReceived, framesReceived) } if bytesProcessed > 0 { atomic.AddUint64(&metrics.BytesProcessed, bytesProcessed) } if framesDropped > 0 { atomic.AddUint64(&metrics.FramesDropped, framesDropped) } if connectionDrops > 0 { atomic.AddUint64(&metrics.ConnectionDrops, connectionDrops) } // Update last flush time atomic.StoreInt64(&lastFlushTime, time.Now().UnixNano()) } // FlushPendingMetrics forces a flush of all batched metrics func FlushPendingMetrics() { flushBatchedMetrics() }