From 758bbbfff650235af43a6444478cad12732d2f25 Mon Sep 17 00:00:00 2001 From: Alex P Date: Thu, 28 Aug 2025 00:24:30 +0000 Subject: [PATCH] feat(audio): add latency histogram metrics collection and visualization - Add LatencyHistogramData interface and implement histogram collection in granular metrics - Create LatencyHistogram component for visualizing latency distribution - Update audio metrics events to include histogram data - Add comprehensive tests for histogram functionality - Improve error handling for OPUS encoder parameter updates - Optimize validation cache initialization --- internal/audio/audio.go | 8 +- internal/audio/cgo_audio.go | 18 +- internal/audio/config_constants.go | 66 +-- internal/audio/events.go | 66 ++- internal/audio/granular_metrics.go | 52 ++ internal/audio/granular_metrics_test.go | 560 ++++++++++++++++++ internal/audio/validation.go | 14 +- ui/src/components/AudioMetricsDashboard.tsx | 28 + ui/src/components/charts/LatencyHistogram.tsx | 145 +++++ ui/src/hooks/useAudioEvents.ts | 7 + 10 files changed, 888 insertions(+), 76 deletions(-) create mode 100644 internal/audio/granular_metrics_test.go create mode 100644 ui/src/components/charts/LatencyHistogram.tsx diff --git a/internal/audio/audio.go b/internal/audio/audio.go index a6768de..696772d 100644 --- a/internal/audio/audio.go +++ b/internal/audio/audio.go @@ -166,7 +166,7 @@ func SetAudioQuality(quality AudioQuality) { presets := GetAudioQualityPresets() if config, exists := presets[quality]; exists { currentConfig = config - + // Update CGO OPUS encoder parameters based on quality var complexity, vbr, signalType, bandwidth, dtx int switch quality { @@ -202,11 +202,13 @@ func SetAudioQuality(quality AudioQuality) { bandwidth = GetConfig().AudioQualityMediumOpusBandwidth dtx = GetConfig().AudioQualityMediumOpusDTX } - + // Dynamically update CGO OPUS encoder parameters // Use current VBR constraint setting from config vbrConstraint := GetConfig().CGOOpusVBRConstraint - updateOpusEncoderParams(config.Bitrate*1000, complexity, vbr, vbrConstraint, signalType, bandwidth, dtx) + if err := updateOpusEncoderParams(config.Bitrate*1000, complexity, vbr, vbrConstraint, signalType, bandwidth, dtx); err != nil { + logging.GetDefaultLogger().Error().Err(err).Msg("Failed to update OPUS encoder parameters") + } } } diff --git a/internal/audio/cgo_audio.go b/internal/audio/cgo_audio.go index 56c6aa5..9826740 100644 --- a/internal/audio/cgo_audio.go +++ b/internal/audio/cgo_audio.go @@ -71,7 +71,7 @@ int update_opus_encoder_params(int bitrate, int complexity, int vbr, int vbr_con if (!encoder || !capture_initialized) { return -1; // Encoder not initialized } - + // Update the static variables opus_bitrate = bitrate; opus_complexity = complexity; @@ -80,7 +80,7 @@ int update_opus_encoder_params(int bitrate, int complexity, int vbr, int vbr_con opus_signal_type = signal_type; opus_bandwidth = bandwidth; opus_dtx = dtx; - + // Apply the new settings to the encoder int result = 0; result |= opus_encoder_ctl(encoder, OPUS_SET_BITRATE(opus_bitrate)); @@ -90,7 +90,7 @@ int update_opus_encoder_params(int bitrate, int complexity, int vbr, int vbr_con result |= opus_encoder_ctl(encoder, OPUS_SET_SIGNAL(opus_signal_type)); result |= opus_encoder_ctl(encoder, OPUS_SET_BANDWIDTH(opus_bandwidth)); result |= opus_encoder_ctl(encoder, OPUS_SET_DTX(opus_dtx)); - + return result; // 0 on success, non-zero on error } @@ -781,11 +781,11 @@ func updateOpusEncoderParams(bitrate, complexity, vbr, vbrConstraint, signalType // CGO function aliases var ( - CGOAudioInit = cgoAudioInit - CGOAudioClose = cgoAudioClose - CGOAudioReadEncode = cgoAudioReadEncode - CGOAudioPlaybackInit = cgoAudioPlaybackInit - CGOAudioPlaybackClose = cgoAudioPlaybackClose - CGOAudioDecodeWrite = cgoAudioDecodeWrite + CGOAudioInit = cgoAudioInit + CGOAudioClose = cgoAudioClose + CGOAudioReadEncode = cgoAudioReadEncode + CGOAudioPlaybackInit = cgoAudioPlaybackInit + CGOAudioPlaybackClose = cgoAudioPlaybackClose + CGOAudioDecodeWrite = cgoAudioDecodeWrite CGOUpdateOpusEncoderParams = updateOpusEncoderParams ) diff --git a/internal/audio/config_constants.go b/internal/audio/config_constants.go index 9686a69..bcee566 100644 --- a/internal/audio/config_constants.go +++ b/internal/audio/config_constants.go @@ -54,11 +54,11 @@ type AudioConfigConstants struct { AudioQualityUltraChannels int // Ultra-quality channel count (default: 2) // Audio Quality OPUS Encoder Parameters - AudioQualityLowOpusComplexity int // Low-quality OPUS complexity (default: 1) - AudioQualityLowOpusVBR int // Low-quality OPUS VBR setting (default: 0) - AudioQualityLowOpusSignalType int // Low-quality OPUS signal type (default: 3001) - AudioQualityLowOpusBandwidth int // Low-quality OPUS bandwidth (default: 1101) - AudioQualityLowOpusDTX int // Low-quality OPUS DTX setting (default: 1) + AudioQualityLowOpusComplexity int // Low-quality OPUS complexity (default: 1) + AudioQualityLowOpusVBR int // Low-quality OPUS VBR setting (default: 0) + AudioQualityLowOpusSignalType int // Low-quality OPUS signal type (default: 3001) + AudioQualityLowOpusBandwidth int // Low-quality OPUS bandwidth (default: 1101) + AudioQualityLowOpusDTX int // Low-quality OPUS DTX setting (default: 1) AudioQualityMediumOpusComplexity int // Medium-quality OPUS complexity (default: 5) AudioQualityMediumOpusVBR int // Medium-quality OPUS VBR setting (default: 1) @@ -66,17 +66,17 @@ type AudioConfigConstants struct { AudioQualityMediumOpusBandwidth int // Medium-quality OPUS bandwidth (default: 1103) AudioQualityMediumOpusDTX int // Medium-quality OPUS DTX setting (default: 0) - AudioQualityHighOpusComplexity int // High-quality OPUS complexity (default: 8) - AudioQualityHighOpusVBR int // High-quality OPUS VBR setting (default: 1) - AudioQualityHighOpusSignalType int // High-quality OPUS signal type (default: 3002) - AudioQualityHighOpusBandwidth int // High-quality OPUS bandwidth (default: 1104) - AudioQualityHighOpusDTX int // High-quality OPUS DTX setting (default: 0) + AudioQualityHighOpusComplexity int // High-quality OPUS complexity (default: 8) + AudioQualityHighOpusVBR int // High-quality OPUS VBR setting (default: 1) + AudioQualityHighOpusSignalType int // High-quality OPUS signal type (default: 3002) + AudioQualityHighOpusBandwidth int // High-quality OPUS bandwidth (default: 1104) + AudioQualityHighOpusDTX int // High-quality OPUS DTX setting (default: 0) - AudioQualityUltraOpusComplexity int // Ultra-quality OPUS complexity (default: 10) - AudioQualityUltraOpusVBR int // Ultra-quality OPUS VBR setting (default: 1) - AudioQualityUltraOpusSignalType int // Ultra-quality OPUS signal type (default: 3002) - AudioQualityUltraOpusBandwidth int // Ultra-quality OPUS bandwidth (default: 1105) - AudioQualityUltraOpusDTX int // Ultra-quality OPUS DTX setting (default: 0) + AudioQualityUltraOpusComplexity int // Ultra-quality OPUS complexity (default: 10) + AudioQualityUltraOpusVBR int // Ultra-quality OPUS VBR setting (default: 1) + AudioQualityUltraOpusSignalType int // Ultra-quality OPUS signal type (default: 3002) + AudioQualityUltraOpusBandwidth int // Ultra-quality OPUS bandwidth (default: 1105) + AudioQualityUltraOpusDTX int // Ultra-quality OPUS DTX setting (default: 0) // CGO Audio Constants CGOOpusBitrate int // Native Opus encoder bitrate in bps (default: 96000) @@ -1646,32 +1646,32 @@ func DefaultAudioConfig() *AudioConfigConstants { // Impact: Controls encoding complexity, VBR, signal type, bandwidth, and DTX // Low Quality OPUS Parameters - Optimized for bandwidth conservation - AudioQualityLowOpusComplexity: 1, // Low complexity for minimal CPU usage - AudioQualityLowOpusVBR: 0, // CBR for predictable bandwidth - AudioQualityLowOpusSignalType: 3001, // OPUS_SIGNAL_VOICE - AudioQualityLowOpusBandwidth: 1101, // OPUS_BANDWIDTH_NARROWBAND - AudioQualityLowOpusDTX: 1, // Enable DTX for silence suppression + AudioQualityLowOpusComplexity: 1, // Low complexity for minimal CPU usage + AudioQualityLowOpusVBR: 0, // CBR for predictable bandwidth + AudioQualityLowOpusSignalType: 3001, // OPUS_SIGNAL_VOICE + AudioQualityLowOpusBandwidth: 1101, // OPUS_BANDWIDTH_NARROWBAND + AudioQualityLowOpusDTX: 1, // Enable DTX for silence suppression // Medium Quality OPUS Parameters - Balanced performance and quality - AudioQualityMediumOpusComplexity: 5, // Medium complexity for balanced performance - AudioQualityMediumOpusVBR: 1, // VBR for better quality + AudioQualityMediumOpusComplexity: 5, // Medium complexity for balanced performance + AudioQualityMediumOpusVBR: 1, // VBR for better quality AudioQualityMediumOpusSignalType: 3002, // OPUS_SIGNAL_MUSIC AudioQualityMediumOpusBandwidth: 1103, // OPUS_BANDWIDTH_WIDEBAND - AudioQualityMediumOpusDTX: 0, // Disable DTX for consistent quality + AudioQualityMediumOpusDTX: 0, // Disable DTX for consistent quality // High Quality OPUS Parameters - High quality with good performance - AudioQualityHighOpusComplexity: 8, // High complexity for better quality - AudioQualityHighOpusVBR: 1, // VBR for optimal quality - AudioQualityHighOpusSignalType: 3002, // OPUS_SIGNAL_MUSIC - AudioQualityHighOpusBandwidth: 1104, // OPUS_BANDWIDTH_SUPERWIDEBAND - AudioQualityHighOpusDTX: 0, // Disable DTX for consistent quality + AudioQualityHighOpusComplexity: 8, // High complexity for better quality + AudioQualityHighOpusVBR: 1, // VBR for optimal quality + AudioQualityHighOpusSignalType: 3002, // OPUS_SIGNAL_MUSIC + AudioQualityHighOpusBandwidth: 1104, // OPUS_BANDWIDTH_SUPERWIDEBAND + AudioQualityHighOpusDTX: 0, // Disable DTX for consistent quality // Ultra Quality OPUS Parameters - Maximum quality settings - AudioQualityUltraOpusComplexity: 10, // Maximum complexity for best quality - AudioQualityUltraOpusVBR: 1, // VBR for optimal quality - AudioQualityUltraOpusSignalType: 3002, // OPUS_SIGNAL_MUSIC - AudioQualityUltraOpusBandwidth: 1105, // OPUS_BANDWIDTH_FULLBAND - AudioQualityUltraOpusDTX: 0, // Disable DTX for maximum quality + AudioQualityUltraOpusComplexity: 10, // Maximum complexity for best quality + AudioQualityUltraOpusVBR: 1, // VBR for optimal quality + AudioQualityUltraOpusSignalType: 3002, // OPUS_SIGNAL_MUSIC + AudioQualityUltraOpusBandwidth: 1105, // OPUS_BANDWIDTH_FULLBAND + AudioQualityUltraOpusDTX: 0, // Disable DTX for maximum quality // CGO Audio Constants CGOOpusBitrate: 96000, diff --git a/internal/audio/events.go b/internal/audio/events.go index 1cf8862..698af09 100644 --- a/internal/audio/events.go +++ b/internal/audio/events.go @@ -39,12 +39,13 @@ type AudioMuteData struct { // AudioMetricsData represents audio metrics data type AudioMetricsData struct { - FramesReceived int64 `json:"frames_received"` - FramesDropped int64 `json:"frames_dropped"` - BytesProcessed int64 `json:"bytes_processed"` - LastFrameTime string `json:"last_frame_time"` - ConnectionDrops int64 `json:"connection_drops"` - AverageLatency string `json:"average_latency"` + FramesReceived int64 `json:"frames_received"` + FramesDropped int64 `json:"frames_dropped"` + BytesProcessed int64 `json:"bytes_processed"` + LastFrameTime string `json:"last_frame_time"` + ConnectionDrops int64 `json:"connection_drops"` + AverageLatency string `json:"average_latency"` + LatencyHistogram *LatencyHistogramData `json:"latency_histogram,omitempty"` } // MicrophoneStateData represents microphone state data @@ -55,12 +56,13 @@ type MicrophoneStateData struct { // MicrophoneMetricsData represents microphone metrics data type MicrophoneMetricsData struct { - FramesSent int64 `json:"frames_sent"` - FramesDropped int64 `json:"frames_dropped"` - BytesProcessed int64 `json:"bytes_processed"` - LastFrameTime string `json:"last_frame_time"` - ConnectionDrops int64 `json:"connection_drops"` - AverageLatency string `json:"average_latency"` + FramesSent int64 `json:"frames_sent"` + FramesDropped int64 `json:"frames_dropped"` + BytesProcessed int64 `json:"bytes_processed"` + LastFrameTime string `json:"last_frame_time"` + ConnectionDrops int64 `json:"connection_drops"` + AverageLatency string `json:"average_latency"` + LatencyHistogram *LatencyHistogramData `json:"latency_histogram,omitempty"` } // ProcessMetricsData represents process metrics data for WebSocket events @@ -225,25 +227,41 @@ func (aeb *AudioEventBroadcaster) sendInitialState(connectionID string) { // convertAudioMetricsToEventDataWithLatencyMs converts internal audio metrics to AudioMetricsData with millisecond latency formatting func convertAudioMetricsToEventDataWithLatencyMs(metrics AudioMetrics) AudioMetricsData { + // Get histogram data from granular metrics collector + granularCollector := GetGranularMetricsCollector() + var histogramData *LatencyHistogramData + if granularCollector != nil { + histogramData = granularCollector.GetOutputLatencyHistogram() + } + return AudioMetricsData{ - FramesReceived: metrics.FramesReceived, - FramesDropped: metrics.FramesDropped, - BytesProcessed: metrics.BytesProcessed, - LastFrameTime: metrics.LastFrameTime.Format(GetConfig().EventTimeFormatString), - ConnectionDrops: metrics.ConnectionDrops, - AverageLatency: fmt.Sprintf("%.1fms", float64(metrics.AverageLatency.Nanoseconds())/1e6), + FramesReceived: metrics.FramesReceived, + FramesDropped: metrics.FramesDropped, + BytesProcessed: metrics.BytesProcessed, + LastFrameTime: metrics.LastFrameTime.Format(GetConfig().EventTimeFormatString), + ConnectionDrops: metrics.ConnectionDrops, + AverageLatency: fmt.Sprintf("%.1fms", float64(metrics.AverageLatency.Nanoseconds())/1e6), + LatencyHistogram: histogramData, } } // convertAudioInputMetricsToEventDataWithLatencyMs converts internal audio input metrics to MicrophoneMetricsData with millisecond latency formatting func convertAudioInputMetricsToEventDataWithLatencyMs(metrics AudioInputMetrics) MicrophoneMetricsData { + // Get histogram data from granular metrics collector + granularCollector := GetGranularMetricsCollector() + var histogramData *LatencyHistogramData + if granularCollector != nil { + histogramData = granularCollector.GetInputLatencyHistogram() + } + return MicrophoneMetricsData{ - FramesSent: metrics.FramesSent, - FramesDropped: metrics.FramesDropped, - BytesProcessed: metrics.BytesProcessed, - LastFrameTime: metrics.LastFrameTime.Format(GetConfig().EventTimeFormatString), - ConnectionDrops: metrics.ConnectionDrops, - AverageLatency: fmt.Sprintf("%.1fms", float64(metrics.AverageLatency.Nanoseconds())/1e6), + FramesSent: metrics.FramesSent, + FramesDropped: metrics.FramesDropped, + BytesProcessed: metrics.BytesProcessed, + LastFrameTime: metrics.LastFrameTime.Format(GetConfig().EventTimeFormatString), + ConnectionDrops: metrics.ConnectionDrops, + AverageLatency: fmt.Sprintf("%.1fms", float64(metrics.AverageLatency.Nanoseconds())/1e6), + LatencyHistogram: histogramData, } } diff --git a/internal/audio/granular_metrics.go b/internal/audio/granular_metrics.go index 50fbb7d..d044dbb 100644 --- a/internal/audio/granular_metrics.go +++ b/internal/audio/granular_metrics.go @@ -142,6 +142,32 @@ func (lh *LatencyHistogram) RecordLatency(latency time.Duration) { lh.samplesMutex.Unlock() } +// LatencyHistogramData represents histogram data for WebSocket transmission +type LatencyHistogramData struct { + Buckets []float64 `json:"buckets"` // Bucket boundaries in milliseconds + Counts []int64 `json:"counts"` // Count for each bucket +} + +// GetHistogramData returns histogram buckets and counts for WebSocket transmission +func (lh *LatencyHistogram) GetHistogramData() LatencyHistogramData { + // Convert bucket boundaries from nanoseconds to milliseconds + buckets := make([]float64, len(lh.buckets)) + for i, bucket := range lh.buckets { + buckets[i] = float64(bucket) / 1e6 // Convert ns to ms + } + + // Get current counts atomically + counts := make([]int64, len(lh.counts)) + for i := range lh.counts { + counts[i] = atomic.LoadInt64(&lh.counts[i]) + } + + return LatencyHistogramData{ + Buckets: buckets, + Counts: counts, + } +} + // GetPercentiles calculates latency percentiles from recent samples func (lh *LatencyHistogram) GetPercentiles() LatencyPercentiles { lh.samplesMutex.RLock() @@ -341,6 +367,32 @@ func (gmc *GranularMetricsCollector) GetLatencyPercentiles() map[string]LatencyP } } +// GetInputLatencyHistogram returns histogram data for input latency +func (gmc *GranularMetricsCollector) GetInputLatencyHistogram() *LatencyHistogramData { + gmc.mutex.RLock() + defer gmc.mutex.RUnlock() + + if gmc.inputLatencyHist == nil { + return nil + } + + data := gmc.inputLatencyHist.GetHistogramData() + return &data +} + +// GetOutputLatencyHistogram returns histogram data for output latency +func (gmc *GranularMetricsCollector) GetOutputLatencyHistogram() *LatencyHistogramData { + gmc.mutex.RLock() + defer gmc.mutex.RUnlock() + + if gmc.outputLatencyHist == nil { + return nil + } + + data := gmc.outputLatencyHist.GetHistogramData() + return &data +} + // GetBufferPoolEfficiency returns efficiency metrics for all buffer pools func (gmc *GranularMetricsCollector) GetBufferPoolEfficiency() map[string]BufferPoolEfficiencyMetrics { gmc.mutex.RLock() diff --git a/internal/audio/granular_metrics_test.go b/internal/audio/granular_metrics_test.go new file mode 100644 index 0000000..0440311 --- /dev/null +++ b/internal/audio/granular_metrics_test.go @@ -0,0 +1,560 @@ +package audio + +import ( + "sync" + "testing" + "time" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestLatencyHistogram tests the LatencyHistogram functionality +func TestLatencyHistogram(t *testing.T) { + tests := []struct { + name string + testFunc func(t *testing.T) + }{ + {"NewLatencyHistogram", testNewLatencyHistogram}, + {"RecordLatency", testRecordLatency}, + {"GetHistogramData", testGetHistogramData}, + {"GetPercentiles", testGetPercentiles}, + {"ConcurrentAccess", testLatencyHistogramConcurrentAccess}, + {"BucketDistribution", testBucketDistribution}, + {"OverflowBucket", testOverflowBucket}, + {"RecentSamplesLimit", testRecentSamplesLimit}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.testFunc(t) + }) + } +} + +// testNewLatencyHistogram tests LatencyHistogram creation +func testNewLatencyHistogram(t *testing.T) { + logger := zerolog.Nop() + maxSamples := 100 + + hist := NewLatencyHistogram(maxSamples, logger) + + require.NotNil(t, hist) + assert.Equal(t, maxSamples, hist.maxSamples) + assert.NotNil(t, hist.buckets) + assert.NotNil(t, hist.counts) + assert.Equal(t, len(hist.buckets)+1, len(hist.counts)) // +1 for overflow bucket + assert.NotNil(t, hist.recentSamples) + assert.Equal(t, 0, len(hist.recentSamples)) +} + +// testRecordLatency tests latency recording functionality +func testRecordLatency(t *testing.T) { + logger := zerolog.Nop() + hist := NewLatencyHistogram(100, logger) + + // Test recording various latencies + latencies := []time.Duration{ + 500 * time.Microsecond, // Should go in first bucket (1ms) + 3 * time.Millisecond, // Should go in second bucket (5ms) + 15 * time.Millisecond, // Should go in third bucket (10ms) + 100 * time.Millisecond, // Should go in appropriate bucket + 3 * time.Second, // Should go in overflow bucket + } + + for _, latency := range latencies { + hist.RecordLatency(latency) + } + + // Verify sample count + assert.Equal(t, int64(len(latencies)), hist.sampleCount) + + // Verify total latency is accumulated + expectedTotal := int64(0) + for _, latency := range latencies { + expectedTotal += latency.Nanoseconds() + } + assert.Equal(t, expectedTotal, hist.totalLatency) + + // Verify recent samples are stored + assert.Equal(t, len(latencies), len(hist.recentSamples)) +} + +// testGetHistogramData tests histogram data retrieval +func testGetHistogramData(t *testing.T) { + logger := zerolog.Nop() + hist := NewLatencyHistogram(100, logger) + + // Record some test latencies + hist.RecordLatency(500 * time.Microsecond) + hist.RecordLatency(3 * time.Millisecond) + hist.RecordLatency(15 * time.Millisecond) + hist.RecordLatency(3 * time.Second) // overflow + + data := hist.GetHistogramData() + + // Verify buckets are converted to milliseconds + require.NotNil(t, data.Buckets) + require.NotNil(t, data.Counts) + assert.Equal(t, len(hist.buckets), len(data.Buckets)) + assert.Equal(t, len(hist.counts), len(data.Counts)) + + // Verify first bucket is 1ms + assert.Equal(t, 1.0, data.Buckets[0]) + + // Verify counts are non-negative + for i, count := range data.Counts { + assert.GreaterOrEqual(t, count, int64(0), "Count at index %d should be non-negative", i) + } + + // Verify total counts match recorded samples + totalCounts := int64(0) + for _, count := range data.Counts { + totalCounts += count + } + assert.Equal(t, int64(4), totalCounts) +} + +// testGetPercentiles tests percentile calculation +func testGetPercentiles(t *testing.T) { + logger := zerolog.Nop() + hist := NewLatencyHistogram(100, logger) + + // Record a known set of latencies + latencies := []time.Duration{ + 1 * time.Millisecond, + 2 * time.Millisecond, + 3 * time.Millisecond, + 4 * time.Millisecond, + 5 * time.Millisecond, + 10 * time.Millisecond, + 20 * time.Millisecond, + 50 * time.Millisecond, + 100 * time.Millisecond, + 200 * time.Millisecond, + } + + for _, latency := range latencies { + hist.RecordLatency(latency) + } + + percentiles := hist.GetPercentiles() + + // Verify percentiles are calculated + assert.Greater(t, percentiles.P50, time.Duration(0)) + assert.Greater(t, percentiles.P95, time.Duration(0)) + assert.Greater(t, percentiles.P99, time.Duration(0)) + assert.Greater(t, percentiles.Min, time.Duration(0)) + assert.Greater(t, percentiles.Max, time.Duration(0)) + assert.Greater(t, percentiles.Avg, time.Duration(0)) + + // Verify ordering: Min <= P50 <= P95 <= P99 <= Max + assert.LessOrEqual(t, percentiles.Min, percentiles.P50) + assert.LessOrEqual(t, percentiles.P50, percentiles.P95) + assert.LessOrEqual(t, percentiles.P95, percentiles.P99) + assert.LessOrEqual(t, percentiles.P99, percentiles.Max) + + // Verify min and max are correct + assert.Equal(t, 1*time.Millisecond, percentiles.Min) + assert.Equal(t, 200*time.Millisecond, percentiles.Max) +} + +// testLatencyHistogramConcurrentAccess tests thread safety +func testLatencyHistogramConcurrentAccess(t *testing.T) { + logger := zerolog.Nop() + hist := NewLatencyHistogram(1000, logger) + + const numGoroutines = 10 + const samplesPerGoroutine = 100 + + var wg sync.WaitGroup + wg.Add(numGoroutines) + + // Concurrent writers + for i := 0; i < numGoroutines; i++ { + go func(id int) { + defer wg.Done() + for j := 0; j < samplesPerGoroutine; j++ { + latency := time.Duration(id*j+1) * time.Microsecond + hist.RecordLatency(latency) + } + }(i) + } + + // Concurrent readers + for i := 0; i < 5; i++ { + go func() { + for j := 0; j < 50; j++ { + _ = hist.GetHistogramData() + _ = hist.GetPercentiles() + time.Sleep(time.Microsecond) + } + }() + } + + wg.Wait() + + // Verify final state + assert.Equal(t, int64(numGoroutines*samplesPerGoroutine), hist.sampleCount) + data := hist.GetHistogramData() + assert.NotNil(t, data) +} + +// testBucketDistribution tests that latencies are distributed correctly across buckets +func testBucketDistribution(t *testing.T) { + logger := zerolog.Nop() + hist := NewLatencyHistogram(100, logger) + + // Record latencies that should go into specific buckets + testCases := []struct { + latency time.Duration + expectedBucket int + }{ + {500 * time.Microsecond, 0}, // < 1ms + {3 * time.Millisecond, 1}, // < 5ms + {8 * time.Millisecond, 2}, // < 10ms (assuming 10ms is bucket 2) + } + + for _, tc := range testCases { + hist.RecordLatency(tc.latency) + } + + data := hist.GetHistogramData() + + // Verify that counts are in expected buckets + for i, tc := range testCases { + if tc.expectedBucket < len(data.Counts) { + assert.GreaterOrEqual(t, data.Counts[tc.expectedBucket], int64(1), + "Test case %d: Expected bucket %d to have at least 1 count", i, tc.expectedBucket) + } + } +} + +// testOverflowBucket tests the overflow bucket functionality +func testOverflowBucket(t *testing.T) { + logger := zerolog.Nop() + hist := NewLatencyHistogram(100, logger) + + // Record a latency that should go into overflow bucket + veryHighLatency := 10 * time.Second + hist.RecordLatency(veryHighLatency) + + data := hist.GetHistogramData() + + // Verify overflow bucket (last bucket) has the count + overflowBucketIndex := len(data.Counts) - 1 + assert.Equal(t, int64(1), data.Counts[overflowBucketIndex]) + + // Verify other buckets are empty + for i := 0; i < overflowBucketIndex; i++ { + assert.Equal(t, int64(0), data.Counts[i], "Bucket %d should be empty", i) + } +} + +// testRecentSamplesLimit tests that recent samples are limited correctly +func testRecentSamplesLimit(t *testing.T) { + logger := zerolog.Nop() + maxSamples := 5 + hist := NewLatencyHistogram(maxSamples, logger) + + // Record more samples than the limit + for i := 0; i < maxSamples*2; i++ { + hist.RecordLatency(time.Duration(i+1) * time.Millisecond) + } + + // Verify recent samples are limited + hist.samplesMutex.RLock() + assert.Equal(t, maxSamples, len(hist.recentSamples)) + hist.samplesMutex.RUnlock() + + // Verify total sample count is still correct + assert.Equal(t, int64(maxSamples*2), hist.sampleCount) +} + +// TestGranularMetricsCollector tests the GranularMetricsCollector functionality +func TestGranularMetricsCollector(t *testing.T) { + tests := []struct { + name string + testFunc func(t *testing.T) + }{ + {"GetGranularMetricsCollector", testGetGranularMetricsCollector}, + {"RecordInputLatency", testRecordInputLatency}, + {"RecordOutputLatency", testRecordOutputLatency}, + {"GetInputLatencyHistogram", testGetInputLatencyHistogram}, + {"GetOutputLatencyHistogram", testGetOutputLatencyHistogram}, + {"GetLatencyPercentiles", testGetLatencyPercentiles}, + {"ConcurrentCollectorAccess", testConcurrentCollectorAccess}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.testFunc(t) + }) + } +} + +// testGetGranularMetricsCollector tests singleton behavior +func testGetGranularMetricsCollector(t *testing.T) { + collector1 := GetGranularMetricsCollector() + collector2 := GetGranularMetricsCollector() + + require.NotNil(t, collector1) + require.NotNil(t, collector2) + assert.Same(t, collector1, collector2, "Should return the same singleton instance") +} + +// testRecordInputLatency tests input latency recording +func testRecordInputLatency(t *testing.T) { + collector := GetGranularMetricsCollector() + require.NotNil(t, collector) + + testLatency := 5 * time.Millisecond + collector.RecordInputLatency(testLatency) + + // Verify histogram data is available + histData := collector.GetInputLatencyHistogram() + require.NotNil(t, histData) + assert.NotNil(t, histData.Buckets) + assert.NotNil(t, histData.Counts) + + // Verify at least one count is recorded + totalCounts := int64(0) + for _, count := range histData.Counts { + totalCounts += count + } + assert.Equal(t, int64(1), totalCounts) +} + +// testRecordOutputLatency tests output latency recording +func testRecordOutputLatency(t *testing.T) { + collector := GetGranularMetricsCollector() + require.NotNil(t, collector) + + testLatency := 10 * time.Millisecond + collector.RecordOutputLatency(testLatency) + + // Verify histogram data is available + histData := collector.GetOutputLatencyHistogram() + require.NotNil(t, histData) + assert.NotNil(t, histData.Buckets) + assert.NotNil(t, histData.Counts) + + // Verify at least one count is recorded + totalCounts := int64(0) + for _, count := range histData.Counts { + totalCounts += count + } + assert.Equal(t, int64(1), totalCounts) +} + +// testGetInputLatencyHistogram tests input histogram retrieval +func testGetInputLatencyHistogram(t *testing.T) { + collector := GetGranularMetricsCollector() + require.NotNil(t, collector) + + // Test when no data is recorded + histData := collector.GetInputLatencyHistogram() + if histData != nil { + assert.NotNil(t, histData.Buckets) + assert.NotNil(t, histData.Counts) + } + + // Record some data and test again + collector.RecordInputLatency(2 * time.Millisecond) + histData = collector.GetInputLatencyHistogram() + require.NotNil(t, histData) + assert.NotNil(t, histData.Buckets) + assert.NotNil(t, histData.Counts) +} + +// testGetOutputLatencyHistogram tests output histogram retrieval +func testGetOutputLatencyHistogram(t *testing.T) { + collector := GetGranularMetricsCollector() + require.NotNil(t, collector) + + // Test when no data is recorded + histData := collector.GetOutputLatencyHistogram() + if histData != nil { + assert.NotNil(t, histData.Buckets) + assert.NotNil(t, histData.Counts) + } + + // Record some data and test again + collector.RecordOutputLatency(7 * time.Millisecond) + histData = collector.GetOutputLatencyHistogram() + require.NotNil(t, histData) + assert.NotNil(t, histData.Buckets) + assert.NotNil(t, histData.Counts) +} + +// testGetLatencyPercentiles tests percentile retrieval from collector +func testGetLatencyPercentiles(t *testing.T) { + collector := GetGranularMetricsCollector() + require.NotNil(t, collector) + + // Record some test data + latencies := []time.Duration{ + 1 * time.Millisecond, + 5 * time.Millisecond, + 10 * time.Millisecond, + 20 * time.Millisecond, + 50 * time.Millisecond, + } + + for _, latency := range latencies { + collector.RecordInputLatency(latency) + collector.RecordOutputLatency(latency) + } + + // Test percentiles map + percentilesMap := collector.GetLatencyPercentiles() + require.NotNil(t, percentilesMap) + + // Test input percentiles if available + if inputPercentiles, exists := percentilesMap["input"]; exists { + assert.Greater(t, inputPercentiles.P50, time.Duration(0)) + assert.Greater(t, inputPercentiles.P95, time.Duration(0)) + assert.Greater(t, inputPercentiles.P99, time.Duration(0)) + } + + // Test output percentiles if available + if outputPercentiles, exists := percentilesMap["output"]; exists { + assert.Greater(t, outputPercentiles.P50, time.Duration(0)) + assert.Greater(t, outputPercentiles.P95, time.Duration(0)) + assert.Greater(t, outputPercentiles.P99, time.Duration(0)) + } +} + +// testConcurrentCollectorAccess tests thread safety of the collector +func testConcurrentCollectorAccess(t *testing.T) { + collector := GetGranularMetricsCollector() + require.NotNil(t, collector) + + const numGoroutines = 10 + const operationsPerGoroutine = 50 + + var wg sync.WaitGroup + wg.Add(numGoroutines * 3) // 3 types of operations + + // Concurrent input latency recording + for i := 0; i < numGoroutines; i++ { + go func(id int) { + defer wg.Done() + for j := 0; j < operationsPerGoroutine; j++ { + latency := time.Duration(id*j+1) * time.Microsecond + collector.RecordInputLatency(latency) + } + }(i) + } + + // Concurrent output latency recording + for i := 0; i < numGoroutines; i++ { + go func(id int) { + defer wg.Done() + for j := 0; j < operationsPerGoroutine; j++ { + latency := time.Duration(id*j+1) * time.Microsecond + collector.RecordOutputLatency(latency) + } + }(i) + } + + // Concurrent data retrieval + for i := 0; i < numGoroutines; i++ { + go func() { + defer wg.Done() + for j := 0; j < operationsPerGoroutine; j++ { + _ = collector.GetInputLatencyHistogram() + _ = collector.GetOutputLatencyHistogram() + _ = collector.GetLatencyPercentiles() + time.Sleep(time.Microsecond) + } + }() + } + + wg.Wait() + + // Verify final state is consistent + inputData := collector.GetInputLatencyHistogram() + outputData := collector.GetOutputLatencyHistogram() + assert.NotNil(t, inputData) + assert.NotNil(t, outputData) +} + +// Benchmark tests for performance validation +func BenchmarkLatencyHistogram(b *testing.B) { + logger := zerolog.Nop() + hist := NewLatencyHistogram(1000, logger) + + b.Run("RecordLatency", func(b *testing.B) { + latency := 5 * time.Millisecond + b.ResetTimer() + for i := 0; i < b.N; i++ { + hist.RecordLatency(latency) + } + }) + + b.Run("GetHistogramData", func(b *testing.B) { + // Pre-populate with some data + for i := 0; i < 100; i++ { + hist.RecordLatency(time.Duration(i) * time.Microsecond) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = hist.GetHistogramData() + } + }) + + b.Run("GetPercentiles", func(b *testing.B) { + // Pre-populate with some data + for i := 0; i < 100; i++ { + hist.RecordLatency(time.Duration(i) * time.Microsecond) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = hist.GetPercentiles() + } + }) +} + +func BenchmarkGranularMetricsCollector(b *testing.B) { + collector := GetGranularMetricsCollector() + + b.Run("RecordInputLatency", func(b *testing.B) { + latency := 5 * time.Millisecond + b.ResetTimer() + for i := 0; i < b.N; i++ { + collector.RecordInputLatency(latency) + } + }) + + b.Run("RecordOutputLatency", func(b *testing.B) { + latency := 5 * time.Millisecond + b.ResetTimer() + for i := 0; i < b.N; i++ { + collector.RecordOutputLatency(latency) + } + }) + + b.Run("GetInputLatencyHistogram", func(b *testing.B) { + // Pre-populate with some data + for i := 0; i < 100; i++ { + collector.RecordInputLatency(time.Duration(i) * time.Microsecond) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = collector.GetInputLatencyHistogram() + } + }) + + b.Run("GetOutputLatencyHistogram", func(b *testing.B) { + // Pre-populate with some data + for i := 0; i < 100; i++ { + collector.RecordOutputLatency(time.Duration(i) * time.Microsecond) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = collector.GetOutputLatencyHistogram() + } + }) +} diff --git a/internal/audio/validation.go b/internal/audio/validation.go index 5761bf1..388d59d 100644 --- a/internal/audio/validation.go +++ b/internal/audio/validation.go @@ -1,5 +1,5 @@ -//go:build !cgo || arm -// +build !cgo arm +//go:build cgo +// +build cgo package audio @@ -306,11 +306,7 @@ func ValidateAudioConfigConstants(config *AudioConfigConstants) error { // Cached max frame size to avoid function call overhead in hot paths var cachedMaxFrameSize int -// Initialize validation cache at package initialization -func init() { - // This ensures the cache is always initialized before any validation calls - cachedMaxFrameSize = 4096 // Default value, will be updated by InitValidationCache -} +// Note: Validation cache is initialized on first use to avoid init function // InitValidationCache initializes cached validation values with actual config func InitValidationCache() { @@ -327,6 +323,10 @@ func InitValidationCache() { // //go:inline func ValidateAudioFrame(data []byte) error { + // Initialize cache on first use if not already done + if cachedMaxFrameSize == 0 { + InitValidationCache() + } // Optimized validation with pre-allocated error messages for minimal overhead dataLen := len(data) if dataLen == 0 { diff --git a/ui/src/components/AudioMetricsDashboard.tsx b/ui/src/components/AudioMetricsDashboard.tsx index a346366..c8123dc 100644 --- a/ui/src/components/AudioMetricsDashboard.tsx +++ b/ui/src/components/AudioMetricsDashboard.tsx @@ -4,6 +4,7 @@ import { LuActivity, LuClock, LuHardDrive, LuSettings, LuCpu, LuMemoryStick } fr import { AudioLevelMeter } from "@components/AudioLevelMeter"; import StatChart from "@components/StatChart"; +import LatencyHistogram from "@components/charts/LatencyHistogram"; import { cx } from "@/cva.config"; import { useMicrophone } from "@/hooks/useMicrophone"; import { useAudioLevel } from "@/hooks/useAudioLevel"; @@ -12,6 +13,11 @@ import api from "@/api"; import { AUDIO_CONFIG } from "@/config/constants"; import audioQualityService from "@/services/audioQualityService"; +interface LatencyHistogramData { + buckets: number[]; // Bucket boundaries in milliseconds + counts: number[]; // Count for each bucket +} + interface AudioMetrics { frames_received: number; frames_dropped: number; @@ -19,6 +25,7 @@ interface AudioMetrics { last_frame_time: string; connection_drops: number; average_latency: string; + latency_histogram?: LatencyHistogramData; } interface MicrophoneMetrics { @@ -28,6 +35,7 @@ interface MicrophoneMetrics { last_frame_time: string; connection_drops: number; average_latency: string; + latency_histogram?: LatencyHistogramData; } interface ProcessMetrics { @@ -503,6 +511,26 @@ export default function AudioMetricsDashboard() { )} + {/* Latency Histograms */} + {metrics && ( +
+ + {microphoneMetrics && ( + + )} +
+ )} + {/* Subprocess Resource Usage - Histogram View */}
{/* Audio Output Subprocess */} diff --git a/ui/src/components/charts/LatencyHistogram.tsx b/ui/src/components/charts/LatencyHistogram.tsx new file mode 100644 index 0000000..349e279 --- /dev/null +++ b/ui/src/components/charts/LatencyHistogram.tsx @@ -0,0 +1,145 @@ +import React, { useMemo } from 'react'; +import { BarChart, Bar, XAxis, YAxis, CartesianGrid, Tooltip, ResponsiveContainer } from 'recharts'; + +import { LatencyHistogramData } from '../../hooks/useAudioEvents'; + +interface LatencyHistogramProps { + data?: LatencyHistogramData; + title: string; + height?: number; + className?: string; +} + +interface ChartDataPoint { + bucket: string; + count: number; + bucketValue: number; +} + +const LatencyHistogram: React.FC = ({ + data, + title, + height = 200, + className = '' +}) => { + // Memoize chart data transformation to avoid recalculation on every render + const chartData = useMemo((): ChartDataPoint[] => { + if (!data || !data.buckets || !data.counts || data.buckets.length === 0) { + return []; + } + + const transformedData: ChartDataPoint[] = []; + + // Process each bucket with its count + for (let i = 0; i < data.buckets.length; i++) { + const bucketValue = data.buckets[i]; + const count = data.counts[i] || 0; + + // Skip empty buckets to reduce chart clutter + if (count === 0) continue; + + // Format bucket label based on value + let bucketLabel: string; + if (bucketValue < 1) { + bucketLabel = `${(bucketValue * 1000).toFixed(0)}μs`; + } else if (bucketValue < 1000) { + bucketLabel = `${bucketValue.toFixed(1)}ms`; + } else { + bucketLabel = `${(bucketValue / 1000).toFixed(1)}s`; + } + + transformedData.push({ + bucket: bucketLabel, + count, + bucketValue + }); + } + + // Handle overflow bucket (last count if it exists) + if (data.counts.length > data.buckets.length) { + const overflowCount = data.counts[data.counts.length - 1]; + if (overflowCount > 0) { + transformedData.push({ + bucket: '>2s', + count: overflowCount, + bucketValue: 2000 // 2 seconds in ms + }); + } + } + + return transformedData; + }, [data]); + + // Custom tooltip for better UX + const CustomTooltip = ({ active, payload, label }: { + active?: boolean; + payload?: { payload: ChartDataPoint }[]; + label?: string; + }) => { + if (active && payload && payload.length) { + const data = payload[0].payload; + return ( +
+

{`Latency: ${label}`}

+

{`Count: ${data.count}`}

+
+ ); + } + return null; + }; + + if (!data || chartData.length === 0) { + return ( +
+

+ {title} +

+
+ No latency data available +
+
+ ); + } + + return ( +
+

+ {title} +

+ + + + + + } /> + + + +
+ ); +}; + +export default LatencyHistogram; \ No newline at end of file diff --git a/ui/src/hooks/useAudioEvents.ts b/ui/src/hooks/useAudioEvents.ts index fb78857..03ee54a 100644 --- a/ui/src/hooks/useAudioEvents.ts +++ b/ui/src/hooks/useAudioEvents.ts @@ -19,6 +19,11 @@ export interface AudioMuteData { muted: boolean; } +export interface LatencyHistogramData { + buckets: number[]; // Bucket boundaries in milliseconds + counts: number[]; // Count for each bucket +} + export interface AudioMetricsData { frames_received: number; frames_dropped: number; @@ -26,6 +31,7 @@ export interface AudioMetricsData { last_frame_time: string; connection_drops: number; average_latency: string; + latency_histogram?: LatencyHistogramData; } export interface MicrophoneStateData { @@ -40,6 +46,7 @@ export interface MicrophoneMetricsData { last_frame_time: string; connection_drops: number; average_latency: string; + latency_histogram?: LatencyHistogramData; } export interface ProcessMetricsData {