diff --git a/internal/audio/audio.go b/internal/audio/audio.go index 5c3fb56..a6768de 100644 --- a/internal/audio/audio.go +++ b/internal/audio/audio.go @@ -166,6 +166,47 @@ func SetAudioQuality(quality AudioQuality) { presets := GetAudioQualityPresets() if config, exists := presets[quality]; exists { currentConfig = config + + // Update CGO OPUS encoder parameters based on quality + var complexity, vbr, signalType, bandwidth, dtx int + switch quality { + case AudioQualityLow: + complexity = GetConfig().AudioQualityLowOpusComplexity + vbr = GetConfig().AudioQualityLowOpusVBR + signalType = GetConfig().AudioQualityLowOpusSignalType + bandwidth = GetConfig().AudioQualityLowOpusBandwidth + dtx = GetConfig().AudioQualityLowOpusDTX + case AudioQualityMedium: + complexity = GetConfig().AudioQualityMediumOpusComplexity + vbr = GetConfig().AudioQualityMediumOpusVBR + signalType = GetConfig().AudioQualityMediumOpusSignalType + bandwidth = GetConfig().AudioQualityMediumOpusBandwidth + dtx = GetConfig().AudioQualityMediumOpusDTX + case AudioQualityHigh: + complexity = GetConfig().AudioQualityHighOpusComplexity + vbr = GetConfig().AudioQualityHighOpusVBR + signalType = GetConfig().AudioQualityHighOpusSignalType + bandwidth = GetConfig().AudioQualityHighOpusBandwidth + dtx = GetConfig().AudioQualityHighOpusDTX + case AudioQualityUltra: + complexity = GetConfig().AudioQualityUltraOpusComplexity + vbr = GetConfig().AudioQualityUltraOpusVBR + signalType = GetConfig().AudioQualityUltraOpusSignalType + bandwidth = GetConfig().AudioQualityUltraOpusBandwidth + dtx = GetConfig().AudioQualityUltraOpusDTX + default: + // Use medium quality as fallback + complexity = GetConfig().AudioQualityMediumOpusComplexity + vbr = GetConfig().AudioQualityMediumOpusVBR + signalType = GetConfig().AudioQualityMediumOpusSignalType + bandwidth = GetConfig().AudioQualityMediumOpusBandwidth + dtx = GetConfig().AudioQualityMediumOpusDTX + } + + // Dynamically update CGO OPUS encoder parameters + // Use current VBR constraint setting from config + vbrConstraint := GetConfig().CGOOpusVBRConstraint + updateOpusEncoderParams(config.Bitrate*1000, complexity, vbr, vbrConstraint, signalType, bandwidth, dtx) } } diff --git a/internal/audio/cgo_audio.go b/internal/audio/cgo_audio.go index f8df3c8..56c6aa5 100644 --- a/internal/audio/cgo_audio.go +++ b/internal/audio/cgo_audio.go @@ -65,6 +65,35 @@ static volatile int capture_initialized = 0; static volatile int playback_initializing = 0; static volatile int playback_initialized = 0; +// Function to dynamically update Opus encoder parameters +int update_opus_encoder_params(int bitrate, int complexity, int vbr, int vbr_constraint, + int signal_type, int bandwidth, int dtx) { + if (!encoder || !capture_initialized) { + return -1; // Encoder not initialized + } + + // Update the static variables + opus_bitrate = bitrate; + opus_complexity = complexity; + opus_vbr = vbr; + opus_vbr_constraint = vbr_constraint; + opus_signal_type = signal_type; + opus_bandwidth = bandwidth; + opus_dtx = dtx; + + // Apply the new settings to the encoder + int result = 0; + result |= opus_encoder_ctl(encoder, OPUS_SET_BITRATE(opus_bitrate)); + result |= opus_encoder_ctl(encoder, OPUS_SET_COMPLEXITY(opus_complexity)); + result |= opus_encoder_ctl(encoder, OPUS_SET_VBR(opus_vbr)); + result |= opus_encoder_ctl(encoder, OPUS_SET_VBR_CONSTRAINT(opus_vbr_constraint)); + result |= opus_encoder_ctl(encoder, OPUS_SET_SIGNAL(opus_signal_type)); + result |= opus_encoder_ctl(encoder, OPUS_SET_BANDWIDTH(opus_bandwidth)); + result |= opus_encoder_ctl(encoder, OPUS_SET_DTX(opus_dtx)); + + return result; // 0 on success, non-zero on error +} + // Enhanced ALSA device opening with exponential backoff retry logic static int safe_alsa_open(snd_pcm_t **handle, const char *device, snd_pcm_stream_t stream) { int attempt = 0; @@ -733,12 +762,30 @@ func cgoAudioDecodeWrite(buf []byte) (int, error) { return int(n), nil } +// updateOpusEncoderParams dynamically updates OPUS encoder parameters +func updateOpusEncoderParams(bitrate, complexity, vbr, vbrConstraint, signalType, bandwidth, dtx int) error { + result := C.update_opus_encoder_params( + C.int(bitrate), + C.int(complexity), + C.int(vbr), + C.int(vbrConstraint), + C.int(signalType), + C.int(bandwidth), + C.int(dtx), + ) + if result != 0 { + return fmt.Errorf("failed to update OPUS encoder parameters: C error code %d", result) + } + return nil +} + // CGO function aliases var ( - CGOAudioInit = cgoAudioInit - CGOAudioClose = cgoAudioClose - CGOAudioReadEncode = cgoAudioReadEncode - CGOAudioPlaybackInit = cgoAudioPlaybackInit - CGOAudioPlaybackClose = cgoAudioPlaybackClose - CGOAudioDecodeWrite = cgoAudioDecodeWrite + CGOAudioInit = cgoAudioInit + CGOAudioClose = cgoAudioClose + CGOAudioReadEncode = cgoAudioReadEncode + CGOAudioPlaybackInit = cgoAudioPlaybackInit + CGOAudioPlaybackClose = cgoAudioPlaybackClose + CGOAudioDecodeWrite = cgoAudioDecodeWrite + CGOUpdateOpusEncoderParams = updateOpusEncoderParams ) diff --git a/internal/audio/config_constants.go b/internal/audio/config_constants.go index 3f1c617..9686a69 100644 --- a/internal/audio/config_constants.go +++ b/internal/audio/config_constants.go @@ -53,6 +53,31 @@ type AudioConfigConstants struct { AudioQualityHighChannels int // High-quality channel count (default: 2) AudioQualityUltraChannels int // Ultra-quality channel count (default: 2) + // Audio Quality OPUS Encoder Parameters + AudioQualityLowOpusComplexity int // Low-quality OPUS complexity (default: 1) + AudioQualityLowOpusVBR int // Low-quality OPUS VBR setting (default: 0) + AudioQualityLowOpusSignalType int // Low-quality OPUS signal type (default: 3001) + AudioQualityLowOpusBandwidth int // Low-quality OPUS bandwidth (default: 1101) + AudioQualityLowOpusDTX int // Low-quality OPUS DTX setting (default: 1) + + AudioQualityMediumOpusComplexity int // Medium-quality OPUS complexity (default: 5) + AudioQualityMediumOpusVBR int // Medium-quality OPUS VBR setting (default: 1) + AudioQualityMediumOpusSignalType int // Medium-quality OPUS signal type (default: 3002) + AudioQualityMediumOpusBandwidth int // Medium-quality OPUS bandwidth (default: 1103) + AudioQualityMediumOpusDTX int // Medium-quality OPUS DTX setting (default: 0) + + AudioQualityHighOpusComplexity int // High-quality OPUS complexity (default: 8) + AudioQualityHighOpusVBR int // High-quality OPUS VBR setting (default: 1) + AudioQualityHighOpusSignalType int // High-quality OPUS signal type (default: 3002) + AudioQualityHighOpusBandwidth int // High-quality OPUS bandwidth (default: 1104) + AudioQualityHighOpusDTX int // High-quality OPUS DTX setting (default: 0) + + AudioQualityUltraOpusComplexity int // Ultra-quality OPUS complexity (default: 10) + AudioQualityUltraOpusVBR int // Ultra-quality OPUS VBR setting (default: 1) + AudioQualityUltraOpusSignalType int // Ultra-quality OPUS signal type (default: 3002) + AudioQualityUltraOpusBandwidth int // Ultra-quality OPUS bandwidth (default: 1105) + AudioQualityUltraOpusDTX int // Ultra-quality OPUS DTX setting (default: 0) + // CGO Audio Constants CGOOpusBitrate int // Native Opus encoder bitrate in bps (default: 96000) @@ -1616,6 +1641,38 @@ func DefaultAudioConfig() *AudioConfigConstants { AudioQualityHighChannels: 2, AudioQualityUltraChannels: 2, + // Audio Quality OPUS Encoder Parameters - Quality-specific encoder settings + // Used in: Dynamic OPUS encoder configuration based on quality presets + // Impact: Controls encoding complexity, VBR, signal type, bandwidth, and DTX + + // Low Quality OPUS Parameters - Optimized for bandwidth conservation + AudioQualityLowOpusComplexity: 1, // Low complexity for minimal CPU usage + AudioQualityLowOpusVBR: 0, // CBR for predictable bandwidth + AudioQualityLowOpusSignalType: 3001, // OPUS_SIGNAL_VOICE + AudioQualityLowOpusBandwidth: 1101, // OPUS_BANDWIDTH_NARROWBAND + AudioQualityLowOpusDTX: 1, // Enable DTX for silence suppression + + // Medium Quality OPUS Parameters - Balanced performance and quality + AudioQualityMediumOpusComplexity: 5, // Medium complexity for balanced performance + AudioQualityMediumOpusVBR: 1, // VBR for better quality + AudioQualityMediumOpusSignalType: 3002, // OPUS_SIGNAL_MUSIC + AudioQualityMediumOpusBandwidth: 1103, // OPUS_BANDWIDTH_WIDEBAND + AudioQualityMediumOpusDTX: 0, // Disable DTX for consistent quality + + // High Quality OPUS Parameters - High quality with good performance + AudioQualityHighOpusComplexity: 8, // High complexity for better quality + AudioQualityHighOpusVBR: 1, // VBR for optimal quality + AudioQualityHighOpusSignalType: 3002, // OPUS_SIGNAL_MUSIC + AudioQualityHighOpusBandwidth: 1104, // OPUS_BANDWIDTH_SUPERWIDEBAND + AudioQualityHighOpusDTX: 0, // Disable DTX for consistent quality + + // Ultra Quality OPUS Parameters - Maximum quality settings + AudioQualityUltraOpusComplexity: 10, // Maximum complexity for best quality + AudioQualityUltraOpusVBR: 1, // VBR for optimal quality + AudioQualityUltraOpusSignalType: 3002, // OPUS_SIGNAL_MUSIC + AudioQualityUltraOpusBandwidth: 1105, // OPUS_BANDWIDTH_FULLBAND + AudioQualityUltraOpusDTX: 0, // Disable DTX for maximum quality + // CGO Audio Constants CGOOpusBitrate: 96000, CGOOpusComplexity: 3, diff --git a/internal/audio/input.go b/internal/audio/input.go index 192e367..fc6f5c6 100644 --- a/internal/audio/input.go +++ b/internal/audio/input.go @@ -84,7 +84,7 @@ func (aim *AudioInputManager) WriteOpusFrame(frame []byte) error { } // Use ultra-fast validation for critical audio path - if err := ValidateAudioFrameUltraFast(frame); err != nil { + if err := ValidateAudioFrame(frame); err != nil { aim.logComponentError(AudioInputManagerComponent, err, "Frame validation failed") return fmt.Errorf("input frame validation failed: %w", err) } diff --git a/internal/audio/input_ipc.go b/internal/audio/input_ipc.go index a7cece1..d56eac6 100644 --- a/internal/audio/input_ipc.go +++ b/internal/audio/input_ipc.go @@ -478,7 +478,7 @@ func (ais *AudioInputServer) processOpusFrame(data []byte) error { } // Use ultra-fast validation for critical audio path - if err := ValidateAudioFrameUltraFast(data); err != nil { + if err := ValidateAudioFrame(data); err != nil { logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger() logger.Error().Err(err).Msg("Frame validation failed") return fmt.Errorf("input frame validation failed: %w", err) @@ -635,7 +635,7 @@ func (aic *AudioInputClient) SendFrame(frame []byte) error { } // Validate frame data before sending - if err := ValidateAudioFrameUltraFast(frame); err != nil { + if err := ValidateAudioFrame(frame); err != nil { logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger() logger.Error().Err(err).Msg("Frame validation failed") return fmt.Errorf("input frame validation failed: %w", err) diff --git a/internal/audio/input_ipc_manager.go b/internal/audio/input_ipc_manager.go index 7ac0c49..ec57181 100644 --- a/internal/audio/input_ipc_manager.go +++ b/internal/audio/input_ipc_manager.go @@ -103,7 +103,7 @@ func (aim *AudioInputIPCManager) WriteOpusFrame(frame []byte) error { } // Validate frame data - if err := ValidateAudioFrameUltraFast(frame); err != nil { + if err := ValidateAudioFrame(frame); err != nil { atomic.AddInt64(&aim.metrics.FramesDropped, 1) aim.logger.Debug().Err(err).Msg("invalid frame data") return err diff --git a/internal/audio/input_server_main.go b/internal/audio/input_server_main.go index 55a6639..6846d59 100644 --- a/internal/audio/input_server_main.go +++ b/internal/audio/input_server_main.go @@ -16,6 +16,9 @@ func RunAudioInputServer() error { logger := logging.GetDefaultLogger().With().Str("component", "audio-input-server").Logger() logger.Debug().Msg("audio input server subprocess starting") + // Initialize validation cache for optimal performance + InitValidationCache() + // Start adaptive buffer management for optimal performance StartAdaptiveBuffering() defer StopAdaptiveBuffering() diff --git a/internal/audio/ipc.go b/internal/audio/ipc.go index 2cd2efb..208f7aa 100644 --- a/internal/audio/ipc.go +++ b/internal/audio/ipc.go @@ -261,7 +261,7 @@ func (s *AudioOutputServer) Close() error { func (s *AudioOutputServer) SendFrame(frame []byte) error { // Use ultra-fast validation for critical audio path - if err := ValidateAudioFrameUltraFast(frame); err != nil { + if err := ValidateAudioFrame(frame); err != nil { logger := logging.GetDefaultLogger().With().Str("component", AudioOutputServerComponent).Logger() logger.Error().Err(err).Msg("Frame validation failed") return fmt.Errorf("output frame validation failed: %w", err) diff --git a/internal/audio/latency_profiler.go b/internal/audio/latency_profiler.go new file mode 100644 index 0000000..f723594 --- /dev/null +++ b/internal/audio/latency_profiler.go @@ -0,0 +1,535 @@ +package audio + +import ( + "context" + "fmt" + "runtime" + "sync" + "sync/atomic" + "time" + "unsafe" + + "github.com/jetkvm/kvm/internal/logging" + "github.com/rs/zerolog" +) + +// LatencyProfiler provides comprehensive end-to-end audio latency profiling +// with nanosecond precision across the entire WebRTC->IPC->CGO->ALSA pipeline +type LatencyProfiler struct { + // Atomic counters for thread-safe access (MUST be first for ARM32 alignment) + totalMeasurements int64 // Total number of measurements taken + webrtcLatencySum int64 // Sum of WebRTC processing latencies (nanoseconds) + ipcLatencySum int64 // Sum of IPC communication latencies (nanoseconds) + cgoLatencySum int64 // Sum of CGO call latencies (nanoseconds) + alsaLatencySum int64 // Sum of ALSA device latencies (nanoseconds) + endToEndLatencySum int64 // Sum of complete end-to-end latencies (nanoseconds) + validationLatencySum int64 // Sum of validation overhead (nanoseconds) + serializationLatencySum int64 // Sum of serialization overhead (nanoseconds) + + // Peak latency tracking + maxWebrtcLatency int64 // Maximum WebRTC latency observed (nanoseconds) + maxIpcLatency int64 // Maximum IPC latency observed (nanoseconds) + maxCgoLatency int64 // Maximum CGO latency observed (nanoseconds) + maxAlsaLatency int64 // Maximum ALSA latency observed (nanoseconds) + maxEndToEndLatency int64 // Maximum end-to-end latency observed (nanoseconds) + + // Configuration and control + config LatencyProfilerConfig + logger zerolog.Logger + ctx context.Context + cancel context.CancelFunc + running int32 // Atomic flag for profiler state + enabled int32 // Atomic flag for measurement collection + + // Detailed measurement storage + measurements []DetailedLatencyMeasurement + measurementMutex sync.RWMutex + measurementIndex int + + // High-resolution timing + timeSource func() int64 // Nanosecond precision time source +} + +// LatencyProfilerConfig defines profiler configuration +type LatencyProfilerConfig struct { + MaxMeasurements int // Maximum measurements to store in memory + SamplingRate float64 // Sampling rate (0.0-1.0, 1.0 = profile every frame) + ReportingInterval time.Duration // How often to log profiling reports + ThresholdWarning time.Duration // Latency threshold for warnings + ThresholdCritical time.Duration // Latency threshold for critical alerts + EnableDetailedTrace bool // Enable detailed per-component tracing + EnableHistogram bool // Enable latency histogram collection +} + +// DetailedLatencyMeasurement captures comprehensive latency breakdown +type DetailedLatencyMeasurement struct { + Timestamp time.Time // When the measurement was taken + FrameID uint64 // Unique frame identifier for tracing + WebRTCLatency time.Duration // WebRTC processing time + IPCLatency time.Duration // IPC communication time + CGOLatency time.Duration // CGO call overhead + ALSALatency time.Duration // ALSA device processing time + ValidationLatency time.Duration // Frame validation overhead + SerializationLatency time.Duration // Data serialization overhead + EndToEndLatency time.Duration // Complete pipeline latency + Source string // Source component (input/output) + FrameSize int // Size of the audio frame in bytes + CPUUsage float64 // CPU usage at time of measurement + MemoryUsage uint64 // Memory usage at time of measurement +} + +// LatencyProfileReport contains aggregated profiling results +type LatencyProfileReport struct { + TotalMeasurements int64 // Total measurements taken + TimeRange time.Duration // Time span of measurements + + // Average latencies + AvgWebRTCLatency time.Duration + AvgIPCLatency time.Duration + AvgCGOLatency time.Duration + AvgALSALatency time.Duration + AvgEndToEndLatency time.Duration + AvgValidationLatency time.Duration + AvgSerializationLatency time.Duration + + // Peak latencies + MaxWebRTCLatency time.Duration + MaxIPCLatency time.Duration + MaxCGOLatency time.Duration + MaxALSALatency time.Duration + MaxEndToEndLatency time.Duration + + // Performance analysis + BottleneckComponent string // Component with highest average latency + LatencyDistribution map[string]int // Histogram of latency ranges + Throughput float64 // Frames per second processed +} + +// FrameLatencyTracker tracks latency for a single audio frame through the pipeline +type FrameLatencyTracker struct { + frameID uint64 + startTime int64 // Nanosecond timestamp + webrtcStartTime int64 + ipcStartTime int64 + cgoStartTime int64 + alsaStartTime int64 + validationStartTime int64 + serializationStartTime int64 + frameSize int + source string +} + +// Global profiler instance +var ( + globalLatencyProfiler unsafe.Pointer // *LatencyProfiler + profilerInitialized int32 +) + +// DefaultLatencyProfilerConfig returns default profiler configuration +func DefaultLatencyProfilerConfig() LatencyProfilerConfig { + return LatencyProfilerConfig{ + MaxMeasurements: 10000, + SamplingRate: 0.1, // Profile 10% of frames to minimize overhead + ReportingInterval: 30 * time.Second, + ThresholdWarning: 50 * time.Millisecond, + ThresholdCritical: 100 * time.Millisecond, + EnableDetailedTrace: false, // Disabled by default for performance + EnableHistogram: true, + } +} + +// NewLatencyProfiler creates a new latency profiler +func NewLatencyProfiler(config LatencyProfilerConfig) *LatencyProfiler { + ctx, cancel := context.WithCancel(context.Background()) + logger := logging.GetDefaultLogger().With().Str("component", "latency-profiler").Logger() + + // Validate configuration + if config.MaxMeasurements <= 0 { + config.MaxMeasurements = 10000 + } + if config.SamplingRate < 0.0 || config.SamplingRate > 1.0 { + config.SamplingRate = 0.1 + } + if config.ReportingInterval <= 0 { + config.ReportingInterval = 30 * time.Second + } + + profiler := &LatencyProfiler{ + config: config, + logger: logger, + ctx: ctx, + cancel: cancel, + measurements: make([]DetailedLatencyMeasurement, config.MaxMeasurements), + timeSource: func() int64 { return time.Now().UnixNano() }, + } + + // Initialize peak latencies to zero + atomic.StoreInt64(&profiler.maxWebrtcLatency, 0) + atomic.StoreInt64(&profiler.maxIpcLatency, 0) + atomic.StoreInt64(&profiler.maxCgoLatency, 0) + atomic.StoreInt64(&profiler.maxAlsaLatency, 0) + atomic.StoreInt64(&profiler.maxEndToEndLatency, 0) + + return profiler +} + +// Start begins latency profiling +func (lp *LatencyProfiler) Start() error { + if !atomic.CompareAndSwapInt32(&lp.running, 0, 1) { + return fmt.Errorf("latency profiler already running") + } + + // Enable measurement collection + atomic.StoreInt32(&lp.enabled, 1) + + // Start reporting goroutine + go lp.reportingLoop() + + lp.logger.Info().Float64("sampling_rate", lp.config.SamplingRate).Msg("latency profiler started") + return nil +} + +// Stop stops latency profiling +func (lp *LatencyProfiler) Stop() { + if !atomic.CompareAndSwapInt32(&lp.running, 1, 0) { + return + } + + // Disable measurement collection + atomic.StoreInt32(&lp.enabled, 0) + + // Cancel context to stop reporting + lp.cancel() + + lp.logger.Info().Msg("latency profiler stopped") +} + +// IsEnabled returns whether profiling is currently enabled +func (lp *LatencyProfiler) IsEnabled() bool { + return atomic.LoadInt32(&lp.enabled) == 1 +} + +// StartFrameTracking begins tracking latency for a new audio frame +func (lp *LatencyProfiler) StartFrameTracking(frameID uint64, frameSize int, source string) *FrameLatencyTracker { + if !lp.IsEnabled() { + return nil + } + + // Apply sampling rate to reduce profiling overhead + if lp.config.SamplingRate < 1.0 { + // Simple sampling based on frame ID + if float64(frameID%100)/100.0 > lp.config.SamplingRate { + return nil + } + } + + now := lp.timeSource() + return &FrameLatencyTracker{ + frameID: frameID, + startTime: now, + frameSize: frameSize, + source: source, + } +} + +// TrackWebRTCStart marks the start of WebRTC processing +func (tracker *FrameLatencyTracker) TrackWebRTCStart() { + if tracker != nil { + tracker.webrtcStartTime = time.Now().UnixNano() + } +} + +// TrackIPCStart marks the start of IPC communication +func (tracker *FrameLatencyTracker) TrackIPCStart() { + if tracker != nil { + tracker.ipcStartTime = time.Now().UnixNano() + } +} + +// TrackCGOStart marks the start of CGO processing +func (tracker *FrameLatencyTracker) TrackCGOStart() { + if tracker != nil { + tracker.cgoStartTime = time.Now().UnixNano() + } +} + +// TrackALSAStart marks the start of ALSA device processing +func (tracker *FrameLatencyTracker) TrackALSAStart() { + if tracker != nil { + tracker.alsaStartTime = time.Now().UnixNano() + } +} + +// TrackValidationStart marks the start of frame validation +func (tracker *FrameLatencyTracker) TrackValidationStart() { + if tracker != nil { + tracker.validationStartTime = time.Now().UnixNano() + } +} + +// TrackSerializationStart marks the start of data serialization +func (tracker *FrameLatencyTracker) TrackSerializationStart() { + if tracker != nil { + tracker.serializationStartTime = time.Now().UnixNano() + } +} + +// FinishTracking completes frame tracking and records the measurement +func (lp *LatencyProfiler) FinishTracking(tracker *FrameLatencyTracker) { + if tracker == nil || !lp.IsEnabled() { + return + } + + endTime := lp.timeSource() + + // Calculate component latencies + var webrtcLatency, ipcLatency, cgoLatency, alsaLatency, validationLatency, serializationLatency time.Duration + + if tracker.webrtcStartTime > 0 { + webrtcLatency = time.Duration(tracker.ipcStartTime - tracker.webrtcStartTime) + } + if tracker.ipcStartTime > 0 { + ipcLatency = time.Duration(tracker.cgoStartTime - tracker.ipcStartTime) + } + if tracker.cgoStartTime > 0 { + cgoLatency = time.Duration(tracker.alsaStartTime - tracker.cgoStartTime) + } + if tracker.alsaStartTime > 0 { + alsaLatency = time.Duration(endTime - tracker.alsaStartTime) + } + if tracker.validationStartTime > 0 { + validationLatency = time.Duration(tracker.ipcStartTime - tracker.validationStartTime) + } + if tracker.serializationStartTime > 0 { + serializationLatency = time.Duration(tracker.cgoStartTime - tracker.serializationStartTime) + } + + endToEndLatency := time.Duration(endTime - tracker.startTime) + + // Update atomic counters + atomic.AddInt64(&lp.totalMeasurements, 1) + atomic.AddInt64(&lp.webrtcLatencySum, webrtcLatency.Nanoseconds()) + atomic.AddInt64(&lp.ipcLatencySum, ipcLatency.Nanoseconds()) + atomic.AddInt64(&lp.cgoLatencySum, cgoLatency.Nanoseconds()) + atomic.AddInt64(&lp.alsaLatencySum, alsaLatency.Nanoseconds()) + atomic.AddInt64(&lp.endToEndLatencySum, endToEndLatency.Nanoseconds()) + atomic.AddInt64(&lp.validationLatencySum, validationLatency.Nanoseconds()) + atomic.AddInt64(&lp.serializationLatencySum, serializationLatency.Nanoseconds()) + + // Update peak latencies + lp.updatePeakLatency(&lp.maxWebrtcLatency, webrtcLatency.Nanoseconds()) + lp.updatePeakLatency(&lp.maxIpcLatency, ipcLatency.Nanoseconds()) + lp.updatePeakLatency(&lp.maxCgoLatency, cgoLatency.Nanoseconds()) + lp.updatePeakLatency(&lp.maxAlsaLatency, alsaLatency.Nanoseconds()) + lp.updatePeakLatency(&lp.maxEndToEndLatency, endToEndLatency.Nanoseconds()) + + // Store detailed measurement if enabled + if lp.config.EnableDetailedTrace { + lp.storeMeasurement(DetailedLatencyMeasurement{ + Timestamp: time.Now(), + FrameID: tracker.frameID, + WebRTCLatency: webrtcLatency, + IPCLatency: ipcLatency, + CGOLatency: cgoLatency, + ALSALatency: alsaLatency, + ValidationLatency: validationLatency, + SerializationLatency: serializationLatency, + EndToEndLatency: endToEndLatency, + Source: tracker.source, + FrameSize: tracker.frameSize, + CPUUsage: lp.getCurrentCPUUsage(), + MemoryUsage: lp.getCurrentMemoryUsage(), + }) + } + + // Check for threshold violations + if endToEndLatency > lp.config.ThresholdCritical { + lp.logger.Error().Dur("latency", endToEndLatency).Uint64("frame_id", tracker.frameID). + Str("source", tracker.source).Msg("critical latency threshold exceeded") + } else if endToEndLatency > lp.config.ThresholdWarning { + lp.logger.Warn().Dur("latency", endToEndLatency).Uint64("frame_id", tracker.frameID). + Str("source", tracker.source).Msg("warning latency threshold exceeded") + } +} + +// updatePeakLatency atomically updates peak latency if new value is higher +func (lp *LatencyProfiler) updatePeakLatency(peakPtr *int64, newLatency int64) { + for { + current := atomic.LoadInt64(peakPtr) + if newLatency <= current || atomic.CompareAndSwapInt64(peakPtr, current, newLatency) { + break + } + } +} + +// storeMeasurement stores a detailed measurement in the circular buffer +func (lp *LatencyProfiler) storeMeasurement(measurement DetailedLatencyMeasurement) { + lp.measurementMutex.Lock() + defer lp.measurementMutex.Unlock() + + lp.measurements[lp.measurementIndex] = measurement + lp.measurementIndex = (lp.measurementIndex + 1) % len(lp.measurements) +} + +// GetReport generates a comprehensive latency profiling report +func (lp *LatencyProfiler) GetReport() LatencyProfileReport { + totalMeasurements := atomic.LoadInt64(&lp.totalMeasurements) + if totalMeasurements == 0 { + return LatencyProfileReport{} + } + + // Calculate averages + avgWebRTC := time.Duration(atomic.LoadInt64(&lp.webrtcLatencySum) / totalMeasurements) + avgIPC := time.Duration(atomic.LoadInt64(&lp.ipcLatencySum) / totalMeasurements) + avgCGO := time.Duration(atomic.LoadInt64(&lp.cgoLatencySum) / totalMeasurements) + avgALSA := time.Duration(atomic.LoadInt64(&lp.alsaLatencySum) / totalMeasurements) + avgEndToEnd := time.Duration(atomic.LoadInt64(&lp.endToEndLatencySum) / totalMeasurements) + avgValidation := time.Duration(atomic.LoadInt64(&lp.validationLatencySum) / totalMeasurements) + avgSerialization := time.Duration(atomic.LoadInt64(&lp.serializationLatencySum) / totalMeasurements) + + // Get peak latencies + maxWebRTC := time.Duration(atomic.LoadInt64(&lp.maxWebrtcLatency)) + maxIPC := time.Duration(atomic.LoadInt64(&lp.maxIpcLatency)) + maxCGO := time.Duration(atomic.LoadInt64(&lp.maxCgoLatency)) + maxALSA := time.Duration(atomic.LoadInt64(&lp.maxAlsaLatency)) + maxEndToEnd := time.Duration(atomic.LoadInt64(&lp.maxEndToEndLatency)) + + // Determine bottleneck component + bottleneck := "WebRTC" + maxAvg := avgWebRTC + if avgIPC > maxAvg { + bottleneck = "IPC" + maxAvg = avgIPC + } + if avgCGO > maxAvg { + bottleneck = "CGO" + maxAvg = avgCGO + } + if avgALSA > maxAvg { + bottleneck = "ALSA" + } + + return LatencyProfileReport{ + TotalMeasurements: totalMeasurements, + AvgWebRTCLatency: avgWebRTC, + AvgIPCLatency: avgIPC, + AvgCGOLatency: avgCGO, + AvgALSALatency: avgALSA, + AvgEndToEndLatency: avgEndToEnd, + AvgValidationLatency: avgValidation, + AvgSerializationLatency: avgSerialization, + MaxWebRTCLatency: maxWebRTC, + MaxIPCLatency: maxIPC, + MaxCGOLatency: maxCGO, + MaxALSALatency: maxALSA, + MaxEndToEndLatency: maxEndToEnd, + BottleneckComponent: bottleneck, + } +} + +// reportingLoop periodically logs profiling reports +func (lp *LatencyProfiler) reportingLoop() { + ticker := time.NewTicker(lp.config.ReportingInterval) + defer ticker.Stop() + + for { + select { + case <-lp.ctx.Done(): + return + case <-ticker.C: + report := lp.GetReport() + if report.TotalMeasurements > 0 { + lp.logReport(report) + } + } + } +} + +// logReport logs a comprehensive profiling report +func (lp *LatencyProfiler) logReport(report LatencyProfileReport) { + lp.logger.Info(). + Int64("total_measurements", report.TotalMeasurements). + Dur("avg_webrtc_latency", report.AvgWebRTCLatency). + Dur("avg_ipc_latency", report.AvgIPCLatency). + Dur("avg_cgo_latency", report.AvgCGOLatency). + Dur("avg_alsa_latency", report.AvgALSALatency). + Dur("avg_end_to_end_latency", report.AvgEndToEndLatency). + Dur("avg_validation_latency", report.AvgValidationLatency). + Dur("avg_serialization_latency", report.AvgSerializationLatency). + Dur("max_webrtc_latency", report.MaxWebRTCLatency). + Dur("max_ipc_latency", report.MaxIPCLatency). + Dur("max_cgo_latency", report.MaxCGOLatency). + Dur("max_alsa_latency", report.MaxALSALatency). + Dur("max_end_to_end_latency", report.MaxEndToEndLatency). + Str("bottleneck_component", report.BottleneckComponent). + Msg("latency profiling report") +} + +// getCurrentCPUUsage returns current CPU usage percentage +func (lp *LatencyProfiler) getCurrentCPUUsage() float64 { + // Simplified CPU usage - in production, this would use more sophisticated monitoring + var m runtime.MemStats + runtime.ReadMemStats(&m) + return float64(runtime.NumGoroutine()) / 100.0 // Rough approximation +} + +// getCurrentMemoryUsage returns current memory usage in bytes +func (lp *LatencyProfiler) getCurrentMemoryUsage() uint64 { + var m runtime.MemStats + runtime.ReadMemStats(&m) + return m.Alloc +} + +// GetGlobalLatencyProfiler returns the global latency profiler instance +func GetGlobalLatencyProfiler() *LatencyProfiler { + ptr := atomic.LoadPointer(&globalLatencyProfiler) + if ptr != nil { + return (*LatencyProfiler)(ptr) + } + + // Initialize on first use + if atomic.CompareAndSwapInt32(&profilerInitialized, 0, 1) { + config := DefaultLatencyProfilerConfig() + profiler := NewLatencyProfiler(config) + atomic.StorePointer(&globalLatencyProfiler, unsafe.Pointer(profiler)) + return profiler + } + + // Another goroutine initialized it, try again + ptr = atomic.LoadPointer(&globalLatencyProfiler) + if ptr != nil { + return (*LatencyProfiler)(ptr) + } + + // Fallback: create a new profiler + config := DefaultLatencyProfilerConfig() + return NewLatencyProfiler(config) +} + +// EnableLatencyProfiling enables the global latency profiler +func EnableLatencyProfiling() error { + profiler := GetGlobalLatencyProfiler() + return profiler.Start() +} + +// DisableLatencyProfiling disables the global latency profiler +func DisableLatencyProfiling() { + ptr := atomic.LoadPointer(&globalLatencyProfiler) + if ptr != nil { + profiler := (*LatencyProfiler)(ptr) + profiler.Stop() + } +} + +// ProfileFrameLatency is a convenience function to profile a single frame's latency +func ProfileFrameLatency(frameID uint64, frameSize int, source string, fn func(*FrameLatencyTracker)) { + profiler := GetGlobalLatencyProfiler() + if !profiler.IsEnabled() { + fn(nil) + return + } + + tracker := profiler.StartFrameTracking(frameID, frameSize, source) + defer profiler.FinishTracking(tracker) + fn(tracker) +} diff --git a/internal/audio/output_server_main.go b/internal/audio/output_server_main.go index 149ac83..be9e2e5 100644 --- a/internal/audio/output_server_main.go +++ b/internal/audio/output_server_main.go @@ -16,6 +16,9 @@ func RunAudioOutputServer() error { logger := logging.GetDefaultLogger().With().Str("component", "audio-output-server").Logger() logger.Debug().Msg("audio output server subprocess starting") + // Initialize validation cache for optimal performance + InitValidationCache() + // Create audio server server, err := NewAudioOutputServer() if err != nil { diff --git a/internal/audio/output_streaming.go b/internal/audio/output_streaming.go index 4057c53..7c5b1d1 100644 --- a/internal/audio/output_streaming.go +++ b/internal/audio/output_streaming.go @@ -393,7 +393,7 @@ func StartAudioOutputStreaming(send func([]byte)) error { copy(frame, buffer[:n]) // Validate frame before sending - if err := ValidateAudioFrameFast(frame); err != nil { + if err := ValidateAudioFrame(frame); err != nil { getOutputStreamingLogger().Warn().Err(err).Msg("Frame validation failed, dropping frame") PutAudioFrameBuffer(frame) continue diff --git a/internal/audio/performance_critical_test.go b/internal/audio/performance_critical_test.go index adca1d8..73db6a5 100644 --- a/internal/audio/performance_critical_test.go +++ b/internal/audio/performance_critical_test.go @@ -17,6 +17,13 @@ import ( // TestPerformanceCriticalPaths tests the most frequently executed code paths // to ensure they remain efficient and don't interfere with KVM functionality func TestPerformanceCriticalPaths(t *testing.T) { + if testing.Short() { + t.Skip("Skipping performance tests in short mode") + } + + // Initialize validation cache for performance testing + InitValidationCache() + tests := []struct { name string testFunc func(t *testing.T) @@ -33,9 +40,6 @@ func TestPerformanceCriticalPaths(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if testing.Short() { - t.Skip("Skipping performance test in short mode") - } tt.testFunc(t) }) } @@ -59,7 +63,7 @@ func testAudioFrameProcessingLatency(t *testing.T) { start := time.Now() for i := 0; i < frameCount; i++ { // Simulate the critical path: validation + metrics update - err := ValidateAudioFrameFast(frameData) + err := ValidateAudioFrame(frameData) require.NoError(t, err) // Record frame received (atomic operation) @@ -139,10 +143,10 @@ func testValidationFunctionSpeed(t *testing.T) { const iterations = 10000 frameData := make([]byte, 1920) - // Test ValidateAudioFrameFast (most critical) + // Test ValidateAudioFrame (most critical) start := time.Now() for i := 0; i < iterations; i++ { - err := ValidateAudioFrameFast(frameData) + err := ValidateAudioFrame(frameData) require.NoError(t, err) } fastValidationLatency := time.Since(start) / iterations @@ -163,13 +167,13 @@ func testValidationFunctionSpeed(t *testing.T) { } bufferValidationLatency := time.Since(start) / iterations - t.Logf("ValidateAudioFrameFast latency: %v", fastValidationLatency) + t.Logf("ValidateAudioFrame latency: %v", fastValidationLatency) t.Logf("ValidateAudioQuality latency: %v", qualityValidationLatency) t.Logf("ValidateBufferSize latency: %v", bufferValidationLatency) // Validation functions optimized for ARM Cortex-A7 single core @ 1GHz // Conservative thresholds to ensure KVM functionality isn't impacted - assert.Less(t, fastValidationLatency, 100*time.Microsecond, "ValidateAudioFrameFast too slow") + assert.Less(t, fastValidationLatency, 100*time.Microsecond, "ValidateAudioFrame too slow") assert.Less(t, qualityValidationLatency, 50*time.Microsecond, "ValidateAudioQuality too slow") assert.Less(t, bufferValidationLatency, 50*time.Microsecond, "ValidateBufferSize too slow") } @@ -218,7 +222,7 @@ func testConcurrentAccessPerformance(t *testing.T) { for j := 0; j < operationsPerGoroutine; j++ { // Simulate concurrent audio processing - _ = ValidateAudioFrameFast(frameData) + _ = ValidateAudioFrame(frameData) RecordFrameReceived(len(frameData)) _ = GetAudioMetrics() _ = GetAudioConfig() @@ -305,7 +309,7 @@ func TestRegressionDetection(t *testing.T) { frameData := make([]byte, 1920) start := time.Now() for i := 0; i < 100; i++ { - _ = ValidateAudioFrameFast(frameData) + _ = ValidateAudioFrame(frameData) RecordFrameReceived(len(frameData)) } frameProcessingTime := time.Since(start) / 100 @@ -367,7 +371,7 @@ func TestMemoryLeakDetection(t *testing.T) { for cycle := 0; cycle < 10; cycle++ { for i := 0; i < 1000; i++ { frameData := make([]byte, 1920) - _ = ValidateAudioFrameFast(frameData) + _ = ValidateAudioFrame(frameData) RecordFrameReceived(len(frameData)) _ = GetAudioMetrics() _ = GetAudioConfig() diff --git a/internal/audio/relay.go b/internal/audio/relay.go index 0913599..43ccbfe 100644 --- a/internal/audio/relay.go +++ b/internal/audio/relay.go @@ -171,7 +171,7 @@ func (r *AudioRelay) relayLoop() { // forwardToWebRTC forwards a frame to the WebRTC audio track func (r *AudioRelay) forwardToWebRTC(frame []byte) error { // Use ultra-fast validation for critical audio path - if err := ValidateAudioFrameUltraFast(frame); err != nil { + if err := ValidateAudioFrame(frame); err != nil { r.incrementDropped() r.logger.Debug().Err(err).Msg("invalid frame data in relay") return err diff --git a/internal/audio/validation.go b/internal/audio/validation.go index b2d682c..b20df30 100644 --- a/internal/audio/validation.go +++ b/internal/audio/validation.go @@ -1,3 +1,6 @@ +//go:build !cgo || arm +// +build !cgo arm + package audio import ( @@ -298,54 +301,34 @@ func ValidateAudioConfigConstants(config *AudioConfigConstants) error { return nil } -// ValidateAudioFrameFast performs fast validation of audio frame data -// ValidateAudioFrameFast provides minimal validation for critical audio processing paths -// This function is optimized for performance and only checks essential safety bounds -func ValidateAudioFrameFast(data []byte) error { - if len(data) == 0 { - return fmt.Errorf("%w: frame data is empty", ErrInvalidFrameData) - } - maxFrameSize := GetConfig().MaxAudioFrameSize - if len(data) > maxFrameSize { - return fmt.Errorf("%w: frame size %d exceeds maximum %d", ErrInvalidFrameSize, len(data), maxFrameSize) - } - return nil -} +// Cached max frame size to avoid function call overhead in hot paths +var cachedMaxFrameSize int -// Cached constants for ultra-fast validation (initialized once at startup) -var ( - maxFrameSizeCache = 8192 // Will be updated from config during init -) - -// init initializes cached validation constants for optimal performance -// -//nolint:gochecknoinits // Required for performance-critical config caching +// Initialize validation cache at package initialization func init() { - // Cache the maximum frame size to avoid function calls in hot paths - if config := GetConfig(); config != nil { - maxFrameSizeCache = config.MaxAudioFrameSize - } - // Fallback to safe default if config unavailable - if maxFrameSizeCache <= 0 { - maxFrameSizeCache = 8192 - } + // This ensures the cache is always initialized before any validation calls + cachedMaxFrameSize = 4096 // Default value, will be updated by InitValidationCache } -// ValidateAudioFrameUltraFast provides zero-overhead validation for ultra-critical paths -// This function only checks for nil/empty data and maximum size to prevent buffer overruns -// Use this in hot audio processing loops where every microsecond matters +// InitValidationCache initializes cached validation values with actual config +func InitValidationCache() { + cachedMaxFrameSize = GetConfig().MaxAudioFrameSize +} + +// ValidateAudioFrame provides optimized validation for audio frame data +// This is the primary validation function used in all audio processing paths // // Performance optimizations: -// - Uses cached max frame size to avoid config function calls +// - Uses cached config value to eliminate function call overhead // - Single branch condition for optimal CPU pipeline efficiency // - Inlined length checks for minimal overhead // //go:inline -func ValidateAudioFrameUltraFast(data []byte) error { +func ValidateAudioFrame(data []byte) error { // Single optimized check: empty data OR exceeds cached maximum // This branch prediction friendly pattern minimizes CPU pipeline stalls dataLen := len(data) - if dataLen == 0 || dataLen > maxFrameSizeCache { + if dataLen == 0 || dataLen > cachedMaxFrameSize { return ErrInvalidFrameData } return nil diff --git a/internal/audio/validation_test.go b/internal/audio/validation_test.go index d24d4b2..b271605 100644 --- a/internal/audio/validation_test.go +++ b/internal/audio/validation_test.go @@ -15,6 +15,9 @@ import ( // TestValidationFunctions provides comprehensive testing of all validation functions // to ensure they catch breaking changes and regressions effectively func TestValidationFunctions(t *testing.T) { + // Initialize validation cache for testing + InitValidationCache() + tests := []struct { name string testFunc func(t *testing.T) @@ -67,20 +70,20 @@ func testFrameDataValidation(t *testing.T) { config := GetConfig() // Test empty data - err := ValidateAudioFrameFast([]byte{}) + err := ValidateAudioFrame([]byte{}) assert.Error(t, err) assert.Contains(t, err.Error(), "frame data is empty") // Test data above maximum size largeData := make([]byte, config.MaxAudioFrameSize+1) - err = ValidateAudioFrameFast(largeData) + err = ValidateAudioFrame(largeData) assert.Error(t, err) assert.Contains(t, err.Error(), "exceeds maximum") // Test valid data validData := make([]byte, 1000) // Within bounds if len(validData) <= config.MaxAudioFrameSize { - err = ValidateAudioFrameFast(validData) + err = ValidateAudioFrame(validData) assert.NoError(t, err) } } @@ -439,19 +442,19 @@ func testAudioFrameFastValidation(t *testing.T) { config := GetConfig() // Test empty data - err := ValidateAudioFrameFast([]byte{}) + err := ValidateAudioFrame([]byte{}) assert.Error(t, err) assert.Contains(t, err.Error(), "frame data is empty") // Test data exceeding maximum size largeData := make([]byte, config.MaxAudioFrameSize+1) - err = ValidateAudioFrameFast(largeData) + err = ValidateAudioFrame(largeData) assert.Error(t, err) assert.Contains(t, err.Error(), "exceeds maximum") // Test valid data validData := make([]byte, 1000) - err = ValidateAudioFrameFast(validData) + err = ValidateAudioFrame(validData) assert.NoError(t, err) } @@ -513,6 +516,9 @@ func TestValidationPerformance(t *testing.T) { t.Skip("Skipping performance test in short mode") } + // Initialize validation cache for performance testing + InitValidationCache() + // Test that validation functions complete quickly start := time.Now() iterations := 10000 diff --git a/main.go b/main.go index 1ce0493..a6dc98d 100644 --- a/main.go +++ b/main.go @@ -32,6 +32,9 @@ func runAudioServer() { } func startAudioSubprocess() error { + // Initialize validation cache for optimal performance + audio.InitValidationCache() + // Start adaptive buffer management for optimal performance audio.StartAdaptiveBuffering()