feat(audio): optimize validation and add dynamic opus encoder configuration

Consolidate audio frame validation functions into a single optimized implementation and add dynamic OPUS encoder parameter updates based on quality settings. Initialize validation cache at startup for consistent performance.

Add latency profiler for end-to-end audio pipeline monitoring. Update test cases to use unified validation function and initialize cache.

The changes improve performance by reducing function call overhead and enabling runtime optimization of audio encoding parameters based on quality presets.
This commit is contained in:
Alex P 2025-08-27 23:44:16 +00:00
parent cdf0b20bc7
commit ece36ce5fd
16 changed files with 747 additions and 65 deletions

View File

@ -166,6 +166,47 @@ func SetAudioQuality(quality AudioQuality) {
presets := GetAudioQualityPresets()
if config, exists := presets[quality]; exists {
currentConfig = config
// Update CGO OPUS encoder parameters based on quality
var complexity, vbr, signalType, bandwidth, dtx int
switch quality {
case AudioQualityLow:
complexity = GetConfig().AudioQualityLowOpusComplexity
vbr = GetConfig().AudioQualityLowOpusVBR
signalType = GetConfig().AudioQualityLowOpusSignalType
bandwidth = GetConfig().AudioQualityLowOpusBandwidth
dtx = GetConfig().AudioQualityLowOpusDTX
case AudioQualityMedium:
complexity = GetConfig().AudioQualityMediumOpusComplexity
vbr = GetConfig().AudioQualityMediumOpusVBR
signalType = GetConfig().AudioQualityMediumOpusSignalType
bandwidth = GetConfig().AudioQualityMediumOpusBandwidth
dtx = GetConfig().AudioQualityMediumOpusDTX
case AudioQualityHigh:
complexity = GetConfig().AudioQualityHighOpusComplexity
vbr = GetConfig().AudioQualityHighOpusVBR
signalType = GetConfig().AudioQualityHighOpusSignalType
bandwidth = GetConfig().AudioQualityHighOpusBandwidth
dtx = GetConfig().AudioQualityHighOpusDTX
case AudioQualityUltra:
complexity = GetConfig().AudioQualityUltraOpusComplexity
vbr = GetConfig().AudioQualityUltraOpusVBR
signalType = GetConfig().AudioQualityUltraOpusSignalType
bandwidth = GetConfig().AudioQualityUltraOpusBandwidth
dtx = GetConfig().AudioQualityUltraOpusDTX
default:
// Use medium quality as fallback
complexity = GetConfig().AudioQualityMediumOpusComplexity
vbr = GetConfig().AudioQualityMediumOpusVBR
signalType = GetConfig().AudioQualityMediumOpusSignalType
bandwidth = GetConfig().AudioQualityMediumOpusBandwidth
dtx = GetConfig().AudioQualityMediumOpusDTX
}
// Dynamically update CGO OPUS encoder parameters
// Use current VBR constraint setting from config
vbrConstraint := GetConfig().CGOOpusVBRConstraint
updateOpusEncoderParams(config.Bitrate*1000, complexity, vbr, vbrConstraint, signalType, bandwidth, dtx)
}
}

View File

@ -65,6 +65,35 @@ static volatile int capture_initialized = 0;
static volatile int playback_initializing = 0;
static volatile int playback_initialized = 0;
// Function to dynamically update Opus encoder parameters
int update_opus_encoder_params(int bitrate, int complexity, int vbr, int vbr_constraint,
int signal_type, int bandwidth, int dtx) {
if (!encoder || !capture_initialized) {
return -1; // Encoder not initialized
}
// Update the static variables
opus_bitrate = bitrate;
opus_complexity = complexity;
opus_vbr = vbr;
opus_vbr_constraint = vbr_constraint;
opus_signal_type = signal_type;
opus_bandwidth = bandwidth;
opus_dtx = dtx;
// Apply the new settings to the encoder
int result = 0;
result |= opus_encoder_ctl(encoder, OPUS_SET_BITRATE(opus_bitrate));
result |= opus_encoder_ctl(encoder, OPUS_SET_COMPLEXITY(opus_complexity));
result |= opus_encoder_ctl(encoder, OPUS_SET_VBR(opus_vbr));
result |= opus_encoder_ctl(encoder, OPUS_SET_VBR_CONSTRAINT(opus_vbr_constraint));
result |= opus_encoder_ctl(encoder, OPUS_SET_SIGNAL(opus_signal_type));
result |= opus_encoder_ctl(encoder, OPUS_SET_BANDWIDTH(opus_bandwidth));
result |= opus_encoder_ctl(encoder, OPUS_SET_DTX(opus_dtx));
return result; // 0 on success, non-zero on error
}
// Enhanced ALSA device opening with exponential backoff retry logic
static int safe_alsa_open(snd_pcm_t **handle, const char *device, snd_pcm_stream_t stream) {
int attempt = 0;
@ -733,12 +762,30 @@ func cgoAudioDecodeWrite(buf []byte) (int, error) {
return int(n), nil
}
// updateOpusEncoderParams dynamically updates OPUS encoder parameters
func updateOpusEncoderParams(bitrate, complexity, vbr, vbrConstraint, signalType, bandwidth, dtx int) error {
result := C.update_opus_encoder_params(
C.int(bitrate),
C.int(complexity),
C.int(vbr),
C.int(vbrConstraint),
C.int(signalType),
C.int(bandwidth),
C.int(dtx),
)
if result != 0 {
return fmt.Errorf("failed to update OPUS encoder parameters: C error code %d", result)
}
return nil
}
// CGO function aliases
var (
CGOAudioInit = cgoAudioInit
CGOAudioClose = cgoAudioClose
CGOAudioReadEncode = cgoAudioReadEncode
CGOAudioPlaybackInit = cgoAudioPlaybackInit
CGOAudioPlaybackClose = cgoAudioPlaybackClose
CGOAudioDecodeWrite = cgoAudioDecodeWrite
CGOAudioInit = cgoAudioInit
CGOAudioClose = cgoAudioClose
CGOAudioReadEncode = cgoAudioReadEncode
CGOAudioPlaybackInit = cgoAudioPlaybackInit
CGOAudioPlaybackClose = cgoAudioPlaybackClose
CGOAudioDecodeWrite = cgoAudioDecodeWrite
CGOUpdateOpusEncoderParams = updateOpusEncoderParams
)

View File

@ -53,6 +53,31 @@ type AudioConfigConstants struct {
AudioQualityHighChannels int // High-quality channel count (default: 2)
AudioQualityUltraChannels int // Ultra-quality channel count (default: 2)
// Audio Quality OPUS Encoder Parameters
AudioQualityLowOpusComplexity int // Low-quality OPUS complexity (default: 1)
AudioQualityLowOpusVBR int // Low-quality OPUS VBR setting (default: 0)
AudioQualityLowOpusSignalType int // Low-quality OPUS signal type (default: 3001)
AudioQualityLowOpusBandwidth int // Low-quality OPUS bandwidth (default: 1101)
AudioQualityLowOpusDTX int // Low-quality OPUS DTX setting (default: 1)
AudioQualityMediumOpusComplexity int // Medium-quality OPUS complexity (default: 5)
AudioQualityMediumOpusVBR int // Medium-quality OPUS VBR setting (default: 1)
AudioQualityMediumOpusSignalType int // Medium-quality OPUS signal type (default: 3002)
AudioQualityMediumOpusBandwidth int // Medium-quality OPUS bandwidth (default: 1103)
AudioQualityMediumOpusDTX int // Medium-quality OPUS DTX setting (default: 0)
AudioQualityHighOpusComplexity int // High-quality OPUS complexity (default: 8)
AudioQualityHighOpusVBR int // High-quality OPUS VBR setting (default: 1)
AudioQualityHighOpusSignalType int // High-quality OPUS signal type (default: 3002)
AudioQualityHighOpusBandwidth int // High-quality OPUS bandwidth (default: 1104)
AudioQualityHighOpusDTX int // High-quality OPUS DTX setting (default: 0)
AudioQualityUltraOpusComplexity int // Ultra-quality OPUS complexity (default: 10)
AudioQualityUltraOpusVBR int // Ultra-quality OPUS VBR setting (default: 1)
AudioQualityUltraOpusSignalType int // Ultra-quality OPUS signal type (default: 3002)
AudioQualityUltraOpusBandwidth int // Ultra-quality OPUS bandwidth (default: 1105)
AudioQualityUltraOpusDTX int // Ultra-quality OPUS DTX setting (default: 0)
// CGO Audio Constants
CGOOpusBitrate int // Native Opus encoder bitrate in bps (default: 96000)
@ -1616,6 +1641,38 @@ func DefaultAudioConfig() *AudioConfigConstants {
AudioQualityHighChannels: 2,
AudioQualityUltraChannels: 2,
// Audio Quality OPUS Encoder Parameters - Quality-specific encoder settings
// Used in: Dynamic OPUS encoder configuration based on quality presets
// Impact: Controls encoding complexity, VBR, signal type, bandwidth, and DTX
// Low Quality OPUS Parameters - Optimized for bandwidth conservation
AudioQualityLowOpusComplexity: 1, // Low complexity for minimal CPU usage
AudioQualityLowOpusVBR: 0, // CBR for predictable bandwidth
AudioQualityLowOpusSignalType: 3001, // OPUS_SIGNAL_VOICE
AudioQualityLowOpusBandwidth: 1101, // OPUS_BANDWIDTH_NARROWBAND
AudioQualityLowOpusDTX: 1, // Enable DTX for silence suppression
// Medium Quality OPUS Parameters - Balanced performance and quality
AudioQualityMediumOpusComplexity: 5, // Medium complexity for balanced performance
AudioQualityMediumOpusVBR: 1, // VBR for better quality
AudioQualityMediumOpusSignalType: 3002, // OPUS_SIGNAL_MUSIC
AudioQualityMediumOpusBandwidth: 1103, // OPUS_BANDWIDTH_WIDEBAND
AudioQualityMediumOpusDTX: 0, // Disable DTX for consistent quality
// High Quality OPUS Parameters - High quality with good performance
AudioQualityHighOpusComplexity: 8, // High complexity for better quality
AudioQualityHighOpusVBR: 1, // VBR for optimal quality
AudioQualityHighOpusSignalType: 3002, // OPUS_SIGNAL_MUSIC
AudioQualityHighOpusBandwidth: 1104, // OPUS_BANDWIDTH_SUPERWIDEBAND
AudioQualityHighOpusDTX: 0, // Disable DTX for consistent quality
// Ultra Quality OPUS Parameters - Maximum quality settings
AudioQualityUltraOpusComplexity: 10, // Maximum complexity for best quality
AudioQualityUltraOpusVBR: 1, // VBR for optimal quality
AudioQualityUltraOpusSignalType: 3002, // OPUS_SIGNAL_MUSIC
AudioQualityUltraOpusBandwidth: 1105, // OPUS_BANDWIDTH_FULLBAND
AudioQualityUltraOpusDTX: 0, // Disable DTX for maximum quality
// CGO Audio Constants
CGOOpusBitrate: 96000,
CGOOpusComplexity: 3,

View File

@ -84,7 +84,7 @@ func (aim *AudioInputManager) WriteOpusFrame(frame []byte) error {
}
// Use ultra-fast validation for critical audio path
if err := ValidateAudioFrameUltraFast(frame); err != nil {
if err := ValidateAudioFrame(frame); err != nil {
aim.logComponentError(AudioInputManagerComponent, err, "Frame validation failed")
return fmt.Errorf("input frame validation failed: %w", err)
}

View File

@ -478,7 +478,7 @@ func (ais *AudioInputServer) processOpusFrame(data []byte) error {
}
// Use ultra-fast validation for critical audio path
if err := ValidateAudioFrameUltraFast(data); err != nil {
if err := ValidateAudioFrame(data); err != nil {
logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()
logger.Error().Err(err).Msg("Frame validation failed")
return fmt.Errorf("input frame validation failed: %w", err)
@ -635,7 +635,7 @@ func (aic *AudioInputClient) SendFrame(frame []byte) error {
}
// Validate frame data before sending
if err := ValidateAudioFrameUltraFast(frame); err != nil {
if err := ValidateAudioFrame(frame); err != nil {
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
logger.Error().Err(err).Msg("Frame validation failed")
return fmt.Errorf("input frame validation failed: %w", err)

View File

@ -103,7 +103,7 @@ func (aim *AudioInputIPCManager) WriteOpusFrame(frame []byte) error {
}
// Validate frame data
if err := ValidateAudioFrameUltraFast(frame); err != nil {
if err := ValidateAudioFrame(frame); err != nil {
atomic.AddInt64(&aim.metrics.FramesDropped, 1)
aim.logger.Debug().Err(err).Msg("invalid frame data")
return err

View File

@ -16,6 +16,9 @@ func RunAudioInputServer() error {
logger := logging.GetDefaultLogger().With().Str("component", "audio-input-server").Logger()
logger.Debug().Msg("audio input server subprocess starting")
// Initialize validation cache for optimal performance
InitValidationCache()
// Start adaptive buffer management for optimal performance
StartAdaptiveBuffering()
defer StopAdaptiveBuffering()

View File

@ -261,7 +261,7 @@ func (s *AudioOutputServer) Close() error {
func (s *AudioOutputServer) SendFrame(frame []byte) error {
// Use ultra-fast validation for critical audio path
if err := ValidateAudioFrameUltraFast(frame); err != nil {
if err := ValidateAudioFrame(frame); err != nil {
logger := logging.GetDefaultLogger().With().Str("component", AudioOutputServerComponent).Logger()
logger.Error().Err(err).Msg("Frame validation failed")
return fmt.Errorf("output frame validation failed: %w", err)

View File

@ -0,0 +1,535 @@
package audio
import (
"context"
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
// LatencyProfiler provides comprehensive end-to-end audio latency profiling
// with nanosecond precision across the entire WebRTC->IPC->CGO->ALSA pipeline
type LatencyProfiler struct {
// Atomic counters for thread-safe access (MUST be first for ARM32 alignment)
totalMeasurements int64 // Total number of measurements taken
webrtcLatencySum int64 // Sum of WebRTC processing latencies (nanoseconds)
ipcLatencySum int64 // Sum of IPC communication latencies (nanoseconds)
cgoLatencySum int64 // Sum of CGO call latencies (nanoseconds)
alsaLatencySum int64 // Sum of ALSA device latencies (nanoseconds)
endToEndLatencySum int64 // Sum of complete end-to-end latencies (nanoseconds)
validationLatencySum int64 // Sum of validation overhead (nanoseconds)
serializationLatencySum int64 // Sum of serialization overhead (nanoseconds)
// Peak latency tracking
maxWebrtcLatency int64 // Maximum WebRTC latency observed (nanoseconds)
maxIpcLatency int64 // Maximum IPC latency observed (nanoseconds)
maxCgoLatency int64 // Maximum CGO latency observed (nanoseconds)
maxAlsaLatency int64 // Maximum ALSA latency observed (nanoseconds)
maxEndToEndLatency int64 // Maximum end-to-end latency observed (nanoseconds)
// Configuration and control
config LatencyProfilerConfig
logger zerolog.Logger
ctx context.Context
cancel context.CancelFunc
running int32 // Atomic flag for profiler state
enabled int32 // Atomic flag for measurement collection
// Detailed measurement storage
measurements []DetailedLatencyMeasurement
measurementMutex sync.RWMutex
measurementIndex int
// High-resolution timing
timeSource func() int64 // Nanosecond precision time source
}
// LatencyProfilerConfig defines profiler configuration
type LatencyProfilerConfig struct {
MaxMeasurements int // Maximum measurements to store in memory
SamplingRate float64 // Sampling rate (0.0-1.0, 1.0 = profile every frame)
ReportingInterval time.Duration // How often to log profiling reports
ThresholdWarning time.Duration // Latency threshold for warnings
ThresholdCritical time.Duration // Latency threshold for critical alerts
EnableDetailedTrace bool // Enable detailed per-component tracing
EnableHistogram bool // Enable latency histogram collection
}
// DetailedLatencyMeasurement captures comprehensive latency breakdown
type DetailedLatencyMeasurement struct {
Timestamp time.Time // When the measurement was taken
FrameID uint64 // Unique frame identifier for tracing
WebRTCLatency time.Duration // WebRTC processing time
IPCLatency time.Duration // IPC communication time
CGOLatency time.Duration // CGO call overhead
ALSALatency time.Duration // ALSA device processing time
ValidationLatency time.Duration // Frame validation overhead
SerializationLatency time.Duration // Data serialization overhead
EndToEndLatency time.Duration // Complete pipeline latency
Source string // Source component (input/output)
FrameSize int // Size of the audio frame in bytes
CPUUsage float64 // CPU usage at time of measurement
MemoryUsage uint64 // Memory usage at time of measurement
}
// LatencyProfileReport contains aggregated profiling results
type LatencyProfileReport struct {
TotalMeasurements int64 // Total measurements taken
TimeRange time.Duration // Time span of measurements
// Average latencies
AvgWebRTCLatency time.Duration
AvgIPCLatency time.Duration
AvgCGOLatency time.Duration
AvgALSALatency time.Duration
AvgEndToEndLatency time.Duration
AvgValidationLatency time.Duration
AvgSerializationLatency time.Duration
// Peak latencies
MaxWebRTCLatency time.Duration
MaxIPCLatency time.Duration
MaxCGOLatency time.Duration
MaxALSALatency time.Duration
MaxEndToEndLatency time.Duration
// Performance analysis
BottleneckComponent string // Component with highest average latency
LatencyDistribution map[string]int // Histogram of latency ranges
Throughput float64 // Frames per second processed
}
// FrameLatencyTracker tracks latency for a single audio frame through the pipeline
type FrameLatencyTracker struct {
frameID uint64
startTime int64 // Nanosecond timestamp
webrtcStartTime int64
ipcStartTime int64
cgoStartTime int64
alsaStartTime int64
validationStartTime int64
serializationStartTime int64
frameSize int
source string
}
// Global profiler instance
var (
globalLatencyProfiler unsafe.Pointer // *LatencyProfiler
profilerInitialized int32
)
// DefaultLatencyProfilerConfig returns default profiler configuration
func DefaultLatencyProfilerConfig() LatencyProfilerConfig {
return LatencyProfilerConfig{
MaxMeasurements: 10000,
SamplingRate: 0.1, // Profile 10% of frames to minimize overhead
ReportingInterval: 30 * time.Second,
ThresholdWarning: 50 * time.Millisecond,
ThresholdCritical: 100 * time.Millisecond,
EnableDetailedTrace: false, // Disabled by default for performance
EnableHistogram: true,
}
}
// NewLatencyProfiler creates a new latency profiler
func NewLatencyProfiler(config LatencyProfilerConfig) *LatencyProfiler {
ctx, cancel := context.WithCancel(context.Background())
logger := logging.GetDefaultLogger().With().Str("component", "latency-profiler").Logger()
// Validate configuration
if config.MaxMeasurements <= 0 {
config.MaxMeasurements = 10000
}
if config.SamplingRate < 0.0 || config.SamplingRate > 1.0 {
config.SamplingRate = 0.1
}
if config.ReportingInterval <= 0 {
config.ReportingInterval = 30 * time.Second
}
profiler := &LatencyProfiler{
config: config,
logger: logger,
ctx: ctx,
cancel: cancel,
measurements: make([]DetailedLatencyMeasurement, config.MaxMeasurements),
timeSource: func() int64 { return time.Now().UnixNano() },
}
// Initialize peak latencies to zero
atomic.StoreInt64(&profiler.maxWebrtcLatency, 0)
atomic.StoreInt64(&profiler.maxIpcLatency, 0)
atomic.StoreInt64(&profiler.maxCgoLatency, 0)
atomic.StoreInt64(&profiler.maxAlsaLatency, 0)
atomic.StoreInt64(&profiler.maxEndToEndLatency, 0)
return profiler
}
// Start begins latency profiling
func (lp *LatencyProfiler) Start() error {
if !atomic.CompareAndSwapInt32(&lp.running, 0, 1) {
return fmt.Errorf("latency profiler already running")
}
// Enable measurement collection
atomic.StoreInt32(&lp.enabled, 1)
// Start reporting goroutine
go lp.reportingLoop()
lp.logger.Info().Float64("sampling_rate", lp.config.SamplingRate).Msg("latency profiler started")
return nil
}
// Stop stops latency profiling
func (lp *LatencyProfiler) Stop() {
if !atomic.CompareAndSwapInt32(&lp.running, 1, 0) {
return
}
// Disable measurement collection
atomic.StoreInt32(&lp.enabled, 0)
// Cancel context to stop reporting
lp.cancel()
lp.logger.Info().Msg("latency profiler stopped")
}
// IsEnabled returns whether profiling is currently enabled
func (lp *LatencyProfiler) IsEnabled() bool {
return atomic.LoadInt32(&lp.enabled) == 1
}
// StartFrameTracking begins tracking latency for a new audio frame
func (lp *LatencyProfiler) StartFrameTracking(frameID uint64, frameSize int, source string) *FrameLatencyTracker {
if !lp.IsEnabled() {
return nil
}
// Apply sampling rate to reduce profiling overhead
if lp.config.SamplingRate < 1.0 {
// Simple sampling based on frame ID
if float64(frameID%100)/100.0 > lp.config.SamplingRate {
return nil
}
}
now := lp.timeSource()
return &FrameLatencyTracker{
frameID: frameID,
startTime: now,
frameSize: frameSize,
source: source,
}
}
// TrackWebRTCStart marks the start of WebRTC processing
func (tracker *FrameLatencyTracker) TrackWebRTCStart() {
if tracker != nil {
tracker.webrtcStartTime = time.Now().UnixNano()
}
}
// TrackIPCStart marks the start of IPC communication
func (tracker *FrameLatencyTracker) TrackIPCStart() {
if tracker != nil {
tracker.ipcStartTime = time.Now().UnixNano()
}
}
// TrackCGOStart marks the start of CGO processing
func (tracker *FrameLatencyTracker) TrackCGOStart() {
if tracker != nil {
tracker.cgoStartTime = time.Now().UnixNano()
}
}
// TrackALSAStart marks the start of ALSA device processing
func (tracker *FrameLatencyTracker) TrackALSAStart() {
if tracker != nil {
tracker.alsaStartTime = time.Now().UnixNano()
}
}
// TrackValidationStart marks the start of frame validation
func (tracker *FrameLatencyTracker) TrackValidationStart() {
if tracker != nil {
tracker.validationStartTime = time.Now().UnixNano()
}
}
// TrackSerializationStart marks the start of data serialization
func (tracker *FrameLatencyTracker) TrackSerializationStart() {
if tracker != nil {
tracker.serializationStartTime = time.Now().UnixNano()
}
}
// FinishTracking completes frame tracking and records the measurement
func (lp *LatencyProfiler) FinishTracking(tracker *FrameLatencyTracker) {
if tracker == nil || !lp.IsEnabled() {
return
}
endTime := lp.timeSource()
// Calculate component latencies
var webrtcLatency, ipcLatency, cgoLatency, alsaLatency, validationLatency, serializationLatency time.Duration
if tracker.webrtcStartTime > 0 {
webrtcLatency = time.Duration(tracker.ipcStartTime - tracker.webrtcStartTime)
}
if tracker.ipcStartTime > 0 {
ipcLatency = time.Duration(tracker.cgoStartTime - tracker.ipcStartTime)
}
if tracker.cgoStartTime > 0 {
cgoLatency = time.Duration(tracker.alsaStartTime - tracker.cgoStartTime)
}
if tracker.alsaStartTime > 0 {
alsaLatency = time.Duration(endTime - tracker.alsaStartTime)
}
if tracker.validationStartTime > 0 {
validationLatency = time.Duration(tracker.ipcStartTime - tracker.validationStartTime)
}
if tracker.serializationStartTime > 0 {
serializationLatency = time.Duration(tracker.cgoStartTime - tracker.serializationStartTime)
}
endToEndLatency := time.Duration(endTime - tracker.startTime)
// Update atomic counters
atomic.AddInt64(&lp.totalMeasurements, 1)
atomic.AddInt64(&lp.webrtcLatencySum, webrtcLatency.Nanoseconds())
atomic.AddInt64(&lp.ipcLatencySum, ipcLatency.Nanoseconds())
atomic.AddInt64(&lp.cgoLatencySum, cgoLatency.Nanoseconds())
atomic.AddInt64(&lp.alsaLatencySum, alsaLatency.Nanoseconds())
atomic.AddInt64(&lp.endToEndLatencySum, endToEndLatency.Nanoseconds())
atomic.AddInt64(&lp.validationLatencySum, validationLatency.Nanoseconds())
atomic.AddInt64(&lp.serializationLatencySum, serializationLatency.Nanoseconds())
// Update peak latencies
lp.updatePeakLatency(&lp.maxWebrtcLatency, webrtcLatency.Nanoseconds())
lp.updatePeakLatency(&lp.maxIpcLatency, ipcLatency.Nanoseconds())
lp.updatePeakLatency(&lp.maxCgoLatency, cgoLatency.Nanoseconds())
lp.updatePeakLatency(&lp.maxAlsaLatency, alsaLatency.Nanoseconds())
lp.updatePeakLatency(&lp.maxEndToEndLatency, endToEndLatency.Nanoseconds())
// Store detailed measurement if enabled
if lp.config.EnableDetailedTrace {
lp.storeMeasurement(DetailedLatencyMeasurement{
Timestamp: time.Now(),
FrameID: tracker.frameID,
WebRTCLatency: webrtcLatency,
IPCLatency: ipcLatency,
CGOLatency: cgoLatency,
ALSALatency: alsaLatency,
ValidationLatency: validationLatency,
SerializationLatency: serializationLatency,
EndToEndLatency: endToEndLatency,
Source: tracker.source,
FrameSize: tracker.frameSize,
CPUUsage: lp.getCurrentCPUUsage(),
MemoryUsage: lp.getCurrentMemoryUsage(),
})
}
// Check for threshold violations
if endToEndLatency > lp.config.ThresholdCritical {
lp.logger.Error().Dur("latency", endToEndLatency).Uint64("frame_id", tracker.frameID).
Str("source", tracker.source).Msg("critical latency threshold exceeded")
} else if endToEndLatency > lp.config.ThresholdWarning {
lp.logger.Warn().Dur("latency", endToEndLatency).Uint64("frame_id", tracker.frameID).
Str("source", tracker.source).Msg("warning latency threshold exceeded")
}
}
// updatePeakLatency atomically updates peak latency if new value is higher
func (lp *LatencyProfiler) updatePeakLatency(peakPtr *int64, newLatency int64) {
for {
current := atomic.LoadInt64(peakPtr)
if newLatency <= current || atomic.CompareAndSwapInt64(peakPtr, current, newLatency) {
break
}
}
}
// storeMeasurement stores a detailed measurement in the circular buffer
func (lp *LatencyProfiler) storeMeasurement(measurement DetailedLatencyMeasurement) {
lp.measurementMutex.Lock()
defer lp.measurementMutex.Unlock()
lp.measurements[lp.measurementIndex] = measurement
lp.measurementIndex = (lp.measurementIndex + 1) % len(lp.measurements)
}
// GetReport generates a comprehensive latency profiling report
func (lp *LatencyProfiler) GetReport() LatencyProfileReport {
totalMeasurements := atomic.LoadInt64(&lp.totalMeasurements)
if totalMeasurements == 0 {
return LatencyProfileReport{}
}
// Calculate averages
avgWebRTC := time.Duration(atomic.LoadInt64(&lp.webrtcLatencySum) / totalMeasurements)
avgIPC := time.Duration(atomic.LoadInt64(&lp.ipcLatencySum) / totalMeasurements)
avgCGO := time.Duration(atomic.LoadInt64(&lp.cgoLatencySum) / totalMeasurements)
avgALSA := time.Duration(atomic.LoadInt64(&lp.alsaLatencySum) / totalMeasurements)
avgEndToEnd := time.Duration(atomic.LoadInt64(&lp.endToEndLatencySum) / totalMeasurements)
avgValidation := time.Duration(atomic.LoadInt64(&lp.validationLatencySum) / totalMeasurements)
avgSerialization := time.Duration(atomic.LoadInt64(&lp.serializationLatencySum) / totalMeasurements)
// Get peak latencies
maxWebRTC := time.Duration(atomic.LoadInt64(&lp.maxWebrtcLatency))
maxIPC := time.Duration(atomic.LoadInt64(&lp.maxIpcLatency))
maxCGO := time.Duration(atomic.LoadInt64(&lp.maxCgoLatency))
maxALSA := time.Duration(atomic.LoadInt64(&lp.maxAlsaLatency))
maxEndToEnd := time.Duration(atomic.LoadInt64(&lp.maxEndToEndLatency))
// Determine bottleneck component
bottleneck := "WebRTC"
maxAvg := avgWebRTC
if avgIPC > maxAvg {
bottleneck = "IPC"
maxAvg = avgIPC
}
if avgCGO > maxAvg {
bottleneck = "CGO"
maxAvg = avgCGO
}
if avgALSA > maxAvg {
bottleneck = "ALSA"
}
return LatencyProfileReport{
TotalMeasurements: totalMeasurements,
AvgWebRTCLatency: avgWebRTC,
AvgIPCLatency: avgIPC,
AvgCGOLatency: avgCGO,
AvgALSALatency: avgALSA,
AvgEndToEndLatency: avgEndToEnd,
AvgValidationLatency: avgValidation,
AvgSerializationLatency: avgSerialization,
MaxWebRTCLatency: maxWebRTC,
MaxIPCLatency: maxIPC,
MaxCGOLatency: maxCGO,
MaxALSALatency: maxALSA,
MaxEndToEndLatency: maxEndToEnd,
BottleneckComponent: bottleneck,
}
}
// reportingLoop periodically logs profiling reports
func (lp *LatencyProfiler) reportingLoop() {
ticker := time.NewTicker(lp.config.ReportingInterval)
defer ticker.Stop()
for {
select {
case <-lp.ctx.Done():
return
case <-ticker.C:
report := lp.GetReport()
if report.TotalMeasurements > 0 {
lp.logReport(report)
}
}
}
}
// logReport logs a comprehensive profiling report
func (lp *LatencyProfiler) logReport(report LatencyProfileReport) {
lp.logger.Info().
Int64("total_measurements", report.TotalMeasurements).
Dur("avg_webrtc_latency", report.AvgWebRTCLatency).
Dur("avg_ipc_latency", report.AvgIPCLatency).
Dur("avg_cgo_latency", report.AvgCGOLatency).
Dur("avg_alsa_latency", report.AvgALSALatency).
Dur("avg_end_to_end_latency", report.AvgEndToEndLatency).
Dur("avg_validation_latency", report.AvgValidationLatency).
Dur("avg_serialization_latency", report.AvgSerializationLatency).
Dur("max_webrtc_latency", report.MaxWebRTCLatency).
Dur("max_ipc_latency", report.MaxIPCLatency).
Dur("max_cgo_latency", report.MaxCGOLatency).
Dur("max_alsa_latency", report.MaxALSALatency).
Dur("max_end_to_end_latency", report.MaxEndToEndLatency).
Str("bottleneck_component", report.BottleneckComponent).
Msg("latency profiling report")
}
// getCurrentCPUUsage returns current CPU usage percentage
func (lp *LatencyProfiler) getCurrentCPUUsage() float64 {
// Simplified CPU usage - in production, this would use more sophisticated monitoring
var m runtime.MemStats
runtime.ReadMemStats(&m)
return float64(runtime.NumGoroutine()) / 100.0 // Rough approximation
}
// getCurrentMemoryUsage returns current memory usage in bytes
func (lp *LatencyProfiler) getCurrentMemoryUsage() uint64 {
var m runtime.MemStats
runtime.ReadMemStats(&m)
return m.Alloc
}
// GetGlobalLatencyProfiler returns the global latency profiler instance
func GetGlobalLatencyProfiler() *LatencyProfiler {
ptr := atomic.LoadPointer(&globalLatencyProfiler)
if ptr != nil {
return (*LatencyProfiler)(ptr)
}
// Initialize on first use
if atomic.CompareAndSwapInt32(&profilerInitialized, 0, 1) {
config := DefaultLatencyProfilerConfig()
profiler := NewLatencyProfiler(config)
atomic.StorePointer(&globalLatencyProfiler, unsafe.Pointer(profiler))
return profiler
}
// Another goroutine initialized it, try again
ptr = atomic.LoadPointer(&globalLatencyProfiler)
if ptr != nil {
return (*LatencyProfiler)(ptr)
}
// Fallback: create a new profiler
config := DefaultLatencyProfilerConfig()
return NewLatencyProfiler(config)
}
// EnableLatencyProfiling enables the global latency profiler
func EnableLatencyProfiling() error {
profiler := GetGlobalLatencyProfiler()
return profiler.Start()
}
// DisableLatencyProfiling disables the global latency profiler
func DisableLatencyProfiling() {
ptr := atomic.LoadPointer(&globalLatencyProfiler)
if ptr != nil {
profiler := (*LatencyProfiler)(ptr)
profiler.Stop()
}
}
// ProfileFrameLatency is a convenience function to profile a single frame's latency
func ProfileFrameLatency(frameID uint64, frameSize int, source string, fn func(*FrameLatencyTracker)) {
profiler := GetGlobalLatencyProfiler()
if !profiler.IsEnabled() {
fn(nil)
return
}
tracker := profiler.StartFrameTracking(frameID, frameSize, source)
defer profiler.FinishTracking(tracker)
fn(tracker)
}

View File

@ -16,6 +16,9 @@ func RunAudioOutputServer() error {
logger := logging.GetDefaultLogger().With().Str("component", "audio-output-server").Logger()
logger.Debug().Msg("audio output server subprocess starting")
// Initialize validation cache for optimal performance
InitValidationCache()
// Create audio server
server, err := NewAudioOutputServer()
if err != nil {

View File

@ -393,7 +393,7 @@ func StartAudioOutputStreaming(send func([]byte)) error {
copy(frame, buffer[:n])
// Validate frame before sending
if err := ValidateAudioFrameFast(frame); err != nil {
if err := ValidateAudioFrame(frame); err != nil {
getOutputStreamingLogger().Warn().Err(err).Msg("Frame validation failed, dropping frame")
PutAudioFrameBuffer(frame)
continue

View File

@ -17,6 +17,13 @@ import (
// TestPerformanceCriticalPaths tests the most frequently executed code paths
// to ensure they remain efficient and don't interfere with KVM functionality
func TestPerformanceCriticalPaths(t *testing.T) {
if testing.Short() {
t.Skip("Skipping performance tests in short mode")
}
// Initialize validation cache for performance testing
InitValidationCache()
tests := []struct {
name string
testFunc func(t *testing.T)
@ -33,9 +40,6 @@ func TestPerformanceCriticalPaths(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if testing.Short() {
t.Skip("Skipping performance test in short mode")
}
tt.testFunc(t)
})
}
@ -59,7 +63,7 @@ func testAudioFrameProcessingLatency(t *testing.T) {
start := time.Now()
for i := 0; i < frameCount; i++ {
// Simulate the critical path: validation + metrics update
err := ValidateAudioFrameFast(frameData)
err := ValidateAudioFrame(frameData)
require.NoError(t, err)
// Record frame received (atomic operation)
@ -139,10 +143,10 @@ func testValidationFunctionSpeed(t *testing.T) {
const iterations = 10000
frameData := make([]byte, 1920)
// Test ValidateAudioFrameFast (most critical)
// Test ValidateAudioFrame (most critical)
start := time.Now()
for i := 0; i < iterations; i++ {
err := ValidateAudioFrameFast(frameData)
err := ValidateAudioFrame(frameData)
require.NoError(t, err)
}
fastValidationLatency := time.Since(start) / iterations
@ -163,13 +167,13 @@ func testValidationFunctionSpeed(t *testing.T) {
}
bufferValidationLatency := time.Since(start) / iterations
t.Logf("ValidateAudioFrameFast latency: %v", fastValidationLatency)
t.Logf("ValidateAudioFrame latency: %v", fastValidationLatency)
t.Logf("ValidateAudioQuality latency: %v", qualityValidationLatency)
t.Logf("ValidateBufferSize latency: %v", bufferValidationLatency)
// Validation functions optimized for ARM Cortex-A7 single core @ 1GHz
// Conservative thresholds to ensure KVM functionality isn't impacted
assert.Less(t, fastValidationLatency, 100*time.Microsecond, "ValidateAudioFrameFast too slow")
assert.Less(t, fastValidationLatency, 100*time.Microsecond, "ValidateAudioFrame too slow")
assert.Less(t, qualityValidationLatency, 50*time.Microsecond, "ValidateAudioQuality too slow")
assert.Less(t, bufferValidationLatency, 50*time.Microsecond, "ValidateBufferSize too slow")
}
@ -218,7 +222,7 @@ func testConcurrentAccessPerformance(t *testing.T) {
for j := 0; j < operationsPerGoroutine; j++ {
// Simulate concurrent audio processing
_ = ValidateAudioFrameFast(frameData)
_ = ValidateAudioFrame(frameData)
RecordFrameReceived(len(frameData))
_ = GetAudioMetrics()
_ = GetAudioConfig()
@ -305,7 +309,7 @@ func TestRegressionDetection(t *testing.T) {
frameData := make([]byte, 1920)
start := time.Now()
for i := 0; i < 100; i++ {
_ = ValidateAudioFrameFast(frameData)
_ = ValidateAudioFrame(frameData)
RecordFrameReceived(len(frameData))
}
frameProcessingTime := time.Since(start) / 100
@ -367,7 +371,7 @@ func TestMemoryLeakDetection(t *testing.T) {
for cycle := 0; cycle < 10; cycle++ {
for i := 0; i < 1000; i++ {
frameData := make([]byte, 1920)
_ = ValidateAudioFrameFast(frameData)
_ = ValidateAudioFrame(frameData)
RecordFrameReceived(len(frameData))
_ = GetAudioMetrics()
_ = GetAudioConfig()

View File

@ -171,7 +171,7 @@ func (r *AudioRelay) relayLoop() {
// forwardToWebRTC forwards a frame to the WebRTC audio track
func (r *AudioRelay) forwardToWebRTC(frame []byte) error {
// Use ultra-fast validation for critical audio path
if err := ValidateAudioFrameUltraFast(frame); err != nil {
if err := ValidateAudioFrame(frame); err != nil {
r.incrementDropped()
r.logger.Debug().Err(err).Msg("invalid frame data in relay")
return err

View File

@ -1,3 +1,6 @@
//go:build !cgo || arm
// +build !cgo arm
package audio
import (
@ -298,54 +301,34 @@ func ValidateAudioConfigConstants(config *AudioConfigConstants) error {
return nil
}
// ValidateAudioFrameFast performs fast validation of audio frame data
// ValidateAudioFrameFast provides minimal validation for critical audio processing paths
// This function is optimized for performance and only checks essential safety bounds
func ValidateAudioFrameFast(data []byte) error {
if len(data) == 0 {
return fmt.Errorf("%w: frame data is empty", ErrInvalidFrameData)
}
maxFrameSize := GetConfig().MaxAudioFrameSize
if len(data) > maxFrameSize {
return fmt.Errorf("%w: frame size %d exceeds maximum %d", ErrInvalidFrameSize, len(data), maxFrameSize)
}
return nil
}
// Cached max frame size to avoid function call overhead in hot paths
var cachedMaxFrameSize int
// Cached constants for ultra-fast validation (initialized once at startup)
var (
maxFrameSizeCache = 8192 // Will be updated from config during init
)
// init initializes cached validation constants for optimal performance
//
//nolint:gochecknoinits // Required for performance-critical config caching
// Initialize validation cache at package initialization
func init() {
// Cache the maximum frame size to avoid function calls in hot paths
if config := GetConfig(); config != nil {
maxFrameSizeCache = config.MaxAudioFrameSize
}
// Fallback to safe default if config unavailable
if maxFrameSizeCache <= 0 {
maxFrameSizeCache = 8192
}
// This ensures the cache is always initialized before any validation calls
cachedMaxFrameSize = 4096 // Default value, will be updated by InitValidationCache
}
// ValidateAudioFrameUltraFast provides zero-overhead validation for ultra-critical paths
// This function only checks for nil/empty data and maximum size to prevent buffer overruns
// Use this in hot audio processing loops where every microsecond matters
// InitValidationCache initializes cached validation values with actual config
func InitValidationCache() {
cachedMaxFrameSize = GetConfig().MaxAudioFrameSize
}
// ValidateAudioFrame provides optimized validation for audio frame data
// This is the primary validation function used in all audio processing paths
//
// Performance optimizations:
// - Uses cached max frame size to avoid config function calls
// - Uses cached config value to eliminate function call overhead
// - Single branch condition for optimal CPU pipeline efficiency
// - Inlined length checks for minimal overhead
//
//go:inline
func ValidateAudioFrameUltraFast(data []byte) error {
func ValidateAudioFrame(data []byte) error {
// Single optimized check: empty data OR exceeds cached maximum
// This branch prediction friendly pattern minimizes CPU pipeline stalls
dataLen := len(data)
if dataLen == 0 || dataLen > maxFrameSizeCache {
if dataLen == 0 || dataLen > cachedMaxFrameSize {
return ErrInvalidFrameData
}
return nil

View File

@ -15,6 +15,9 @@ import (
// TestValidationFunctions provides comprehensive testing of all validation functions
// to ensure they catch breaking changes and regressions effectively
func TestValidationFunctions(t *testing.T) {
// Initialize validation cache for testing
InitValidationCache()
tests := []struct {
name string
testFunc func(t *testing.T)
@ -67,20 +70,20 @@ func testFrameDataValidation(t *testing.T) {
config := GetConfig()
// Test empty data
err := ValidateAudioFrameFast([]byte{})
err := ValidateAudioFrame([]byte{})
assert.Error(t, err)
assert.Contains(t, err.Error(), "frame data is empty")
// Test data above maximum size
largeData := make([]byte, config.MaxAudioFrameSize+1)
err = ValidateAudioFrameFast(largeData)
err = ValidateAudioFrame(largeData)
assert.Error(t, err)
assert.Contains(t, err.Error(), "exceeds maximum")
// Test valid data
validData := make([]byte, 1000) // Within bounds
if len(validData) <= config.MaxAudioFrameSize {
err = ValidateAudioFrameFast(validData)
err = ValidateAudioFrame(validData)
assert.NoError(t, err)
}
}
@ -439,19 +442,19 @@ func testAudioFrameFastValidation(t *testing.T) {
config := GetConfig()
// Test empty data
err := ValidateAudioFrameFast([]byte{})
err := ValidateAudioFrame([]byte{})
assert.Error(t, err)
assert.Contains(t, err.Error(), "frame data is empty")
// Test data exceeding maximum size
largeData := make([]byte, config.MaxAudioFrameSize+1)
err = ValidateAudioFrameFast(largeData)
err = ValidateAudioFrame(largeData)
assert.Error(t, err)
assert.Contains(t, err.Error(), "exceeds maximum")
// Test valid data
validData := make([]byte, 1000)
err = ValidateAudioFrameFast(validData)
err = ValidateAudioFrame(validData)
assert.NoError(t, err)
}
@ -513,6 +516,9 @@ func TestValidationPerformance(t *testing.T) {
t.Skip("Skipping performance test in short mode")
}
// Initialize validation cache for performance testing
InitValidationCache()
// Test that validation functions complete quickly
start := time.Now()
iterations := 10000

View File

@ -32,6 +32,9 @@ func runAudioServer() {
}
func startAudioSubprocess() error {
// Initialize validation cache for optimal performance
audio.InitValidationCache()
// Start adaptive buffer management for optimal performance
audio.StartAdaptiveBuffering()