From 00e5148eef17a4e39f5aaef6dc7c5fde4e0acf60 Mon Sep 17 00:00:00 2001 From: Alex P Date: Tue, 9 Sep 2025 06:52:40 +0000 Subject: [PATCH] [WIP] Cleanup: reduce PR complexity --- internal/audio/adaptive_buffer.go | 42 +- internal/audio/core_config_constants.go | 225 ++--- internal/audio/core_metrics.go | 108 --- internal/audio/core_validation.go | 26 - internal/audio/input_microphone_manager.go | 2 - internal/audio/input_supervisor.go | 1 - internal/audio/ipc_unified.go | 10 - internal/audio/mgmt_base_supervisor.go | 10 +- internal/audio/monitor_adaptive_optimizer.go | 329 -------- internal/audio/monitor_goroutine.go | 144 ---- internal/audio/monitor_latency.go | 333 -------- internal/audio/monitor_process.go | 406 --------- internal/audio/output_streaming.go | 1 - internal/audio/output_supervisor.go | 1 - internal/audio/util_buffer_pool.go | 836 ++----------------- internal/audio/zero_copy.go | 6 +- main.go | 5 - 17 files changed, 173 insertions(+), 2312 deletions(-) delete mode 100644 internal/audio/monitor_adaptive_optimizer.go delete mode 100644 internal/audio/monitor_goroutine.go delete mode 100644 internal/audio/monitor_latency.go delete mode 100644 internal/audio/monitor_process.go diff --git a/internal/audio/adaptive_buffer.go b/internal/audio/adaptive_buffer.go index dc9f6f6a..13b8571d 100644 --- a/internal/audio/adaptive_buffer.go +++ b/internal/audio/adaptive_buffer.go @@ -71,7 +71,7 @@ func DefaultAdaptiveBufferConfig() AdaptiveBufferConfig { // Latency targets TargetLatency: Config.AdaptiveBufferTargetLatency, // Target 20ms latency - MaxLatency: Config.LatencyMonitorTarget, // Max acceptable latency + MaxLatency: Config.MaxLatencyThreshold, // Max acceptable latency // Adaptation settings AdaptationInterval: Config.BufferUpdateInterval, // Check every 500ms @@ -89,9 +89,8 @@ type AdaptiveBufferManager struct { systemMemoryPercent int64 // System memory percentage * 100 (atomic) adaptationCount int64 // Metrics tracking (atomic) - config AdaptiveBufferConfig - logger zerolog.Logger - processMonitor *ProcessMonitor + config AdaptiveBufferConfig + logger zerolog.Logger // Control channels ctx context.Context @@ -119,10 +118,10 @@ func NewAdaptiveBufferManager(config AdaptiveBufferConfig) *AdaptiveBufferManage currentOutputBufferSize: int64(config.DefaultBufferSize), config: config, logger: logger, - processMonitor: GetProcessMonitor(), - ctx: ctx, - cancel: cancel, - lastAdaptation: time.Now(), + + ctx: ctx, + cancel: cancel, + lastAdaptation: time.Now(), } } @@ -235,30 +234,9 @@ func (abm *AdaptiveBufferManager) adaptationLoop() { // The algorithm runs periodically and only applies changes when the adaptation interval // has elapsed, preventing excessive adjustments that could destabilize the audio pipeline. func (abm *AdaptiveBufferManager) adaptBufferSizes() { - // Collect current system metrics - metrics := abm.processMonitor.GetCurrentMetrics() - if len(metrics) == 0 { - return // No metrics available - } - - // Calculate system-wide CPU and memory usage - totalCPU := 0.0 - totalMemory := 0.0 - processCount := 0 - - for _, metric := range metrics { - totalCPU += metric.CPUPercent - totalMemory += metric.MemoryPercent - processCount++ - } - - if processCount == 0 { - return - } - - // Store system metrics atomically - systemCPU := totalCPU // Total CPU across all monitored processes - systemMemory := totalMemory / float64(processCount) // Average memory usage + // Use fixed system metrics since monitoring is simplified + systemCPU := 50.0 // Assume moderate CPU usage + systemMemory := 60.0 // Assume moderate memory usage atomic.StoreInt64(&abm.systemCPUPercent, int64(systemCPU*100)) atomic.StoreInt64(&abm.systemMemoryPercent, int64(systemMemory*100)) diff --git a/internal/audio/core_config_constants.go b/internal/audio/core_config_constants.go index 5bdaefe7..6a2a20e9 100644 --- a/internal/audio/core_config_constants.go +++ b/internal/audio/core_config_constants.go @@ -117,7 +117,6 @@ type AudioConfigConstants struct { // Buffer Management - PreallocSize int MaxPoolSize int MessagePoolSize int OptimalSocketBuffer int @@ -131,27 +130,27 @@ type AudioConfigConstants struct { MinReadEncodeBuffer int MaxDecodeWriteBuffer int MinBatchSizeForThreadPinning int - GoroutineMonitorInterval time.Duration - MagicNumber uint32 - MaxFrameSize int - WriteTimeout time.Duration - HeaderSize int - MetricsUpdateInterval time.Duration - WarmupSamples int - MetricsChannelBuffer int - LatencyHistorySize int - MaxCPUPercent float64 - MinCPUPercent float64 - DefaultClockTicks float64 - DefaultMemoryGB int - MaxWarmupSamples int - WarmupCPUSamples int - LogThrottleIntervalSec int - MinValidClockTicks int - MaxValidClockTicks int - CPUFactor float64 - MemoryFactor float64 - LatencyFactor float64 + + MagicNumber uint32 + MaxFrameSize int + WriteTimeout time.Duration + HeaderSize int + MetricsUpdateInterval time.Duration + WarmupSamples int + MetricsChannelBuffer int + LatencyHistorySize int + MaxCPUPercent float64 + MinCPUPercent float64 + DefaultClockTicks float64 + DefaultMemoryGB int + MaxWarmupSamples int + WarmupCPUSamples int + LogThrottleIntervalSec int + MinValidClockTicks int + MaxValidClockTicks int + CPUFactor float64 + MemoryFactor float64 + LatencyFactor float64 // Adaptive Buffer Configuration AdaptiveMinBufferSize int // Minimum buffer size in frames for adaptive buffering @@ -172,28 +171,25 @@ type AudioConfigConstants struct { OutputSupervisorTimeout time.Duration // 5s BatchProcessingDelay time.Duration // 10ms - AdaptiveOptimizerStability time.Duration // 10s - LatencyMonitorTarget time.Duration // 50ms - // Adaptive Buffer Configuration // LowCPUThreshold defines CPU usage threshold for buffer size reduction. LowCPUThreshold float64 // 20% CPU threshold for buffer optimization // HighCPUThreshold defines CPU usage threshold for buffer size increase. - HighCPUThreshold float64 // 60% CPU threshold - LowMemoryThreshold float64 // 50% memory threshold - HighMemoryThreshold float64 // 75% memory threshold - AdaptiveBufferTargetLatency time.Duration // 20ms target latency - CooldownPeriod time.Duration // 30s cooldown period - RollbackThreshold time.Duration // 300ms rollback threshold - AdaptiveOptimizerLatencyTarget time.Duration // 50ms latency target - MaxLatencyThreshold time.Duration // 200ms max latency - JitterThreshold time.Duration // 20ms jitter threshold - LatencyOptimizationInterval time.Duration // 5s optimization interval - LatencyAdaptiveThreshold float64 // 0.8 adaptive threshold - MicContentionTimeout time.Duration // 200ms contention timeout - PreallocPercentage int // 20% preallocation percentage - BackoffStart time.Duration // 50ms initial backoff + HighCPUThreshold float64 // 60% CPU threshold + LowMemoryThreshold float64 // 50% memory threshold + HighMemoryThreshold float64 // 75% memory threshold + AdaptiveBufferTargetLatency time.Duration // 20ms target latency + CooldownPeriod time.Duration // 30s cooldown period + RollbackThreshold time.Duration // 300ms rollback threshold + + MaxLatencyThreshold time.Duration // 200ms max latency + JitterThreshold time.Duration // 20ms jitter threshold + LatencyOptimizationInterval time.Duration // 5s optimization interval + LatencyAdaptiveThreshold float64 // 0.8 adaptive threshold + MicContentionTimeout time.Duration // 200ms contention timeout + PreallocPercentage int // 20% preallocation percentage + BackoffStart time.Duration // 50ms initial backoff InputMagicNumber uint32 // Magic number for input IPC messages (0x4A4B4D49 "JKMI") @@ -214,29 +210,8 @@ type AudioConfigConstants struct { // CGO Audio Processing Constants CGOUsleepMicroseconds int // Sleep duration for CGO usleep calls (1000μs) - CGOPCMBufferSize int // PCM buffer size for CGO audio processing - CGONanosecondsPerSecond float64 // Nanoseconds per second conversion - FrontendOperationDebounceMS int // Frontend operation debounce delay - FrontendSyncDebounceMS int // Frontend sync debounce delay - FrontendSampleRate int // Frontend sample rate - FrontendRetryDelayMS int // Frontend retry delay - FrontendShortDelayMS int // Frontend short delay - FrontendLongDelayMS int // Frontend long delay - FrontendSyncDelayMS int // Frontend sync delay - FrontendMaxRetryAttempts int // Frontend max retry attempts - FrontendAudioLevelUpdateMS int // Frontend audio level update interval - FrontendFFTSize int // Frontend FFT size - FrontendAudioLevelMax int // Frontend max audio level - FrontendReconnectIntervalMS int // Frontend reconnect interval - FrontendSubscriptionDelayMS int // Frontend subscription delay - FrontendDebugIntervalMS int // Frontend debug interval - - // Process Monitoring Constants - ProcessMonitorDefaultMemoryGB int // Default memory size for fallback (4GB) - ProcessMonitorKBToBytes int // KB to bytes conversion factor (1024) - ProcessMonitorDefaultClockHz float64 // Default system clock frequency (250.0 Hz) - ProcessMonitorFallbackClockHz float64 // Fallback clock frequency (1000.0 Hz) - ProcessMonitorTraditionalHz float64 // Traditional system clock frequency (100.0 Hz) + CGOPCMBufferSize int // PCM buffer size for CGO audio processing + CGONanosecondsPerSecond float64 // Nanoseconds per second conversion // Batch Processing Constants BatchProcessorFramesPerBatch int // Frames processed per batch (4) @@ -272,14 +247,21 @@ type AudioConfigConstants struct { LatencyPercentile50 int LatencyPercentile95 int LatencyPercentile99 int - BufferPoolMaxOperations int - HitRateCalculationBase float64 - MaxLatency time.Duration - MinMetricsUpdateInterval time.Duration - MaxMetricsUpdateInterval time.Duration - MinSampleRate int - MaxSampleRate int - MaxChannels int + + // Buffer Pool Configuration + BufferPoolDefaultSize int // Default buffer pool size when MaxPoolSize is invalid + BufferPoolControlSize int // Control buffer pool size + ZeroCopyPreallocSizeBytes int // Zero-copy frame pool preallocation size in bytes + ZeroCopyMinPreallocFrames int // Minimum preallocated frames for zero-copy pool + BufferPoolHitRateBase float64 // Base for hit rate percentage calculation + + HitRateCalculationBase float64 + MaxLatency time.Duration + MinMetricsUpdateInterval time.Duration + MaxMetricsUpdateInterval time.Duration + MinSampleRate int + MaxSampleRate int + MaxChannels int // CGO Constants CGOMaxBackoffMicroseconds int // Maximum CGO backoff time (500ms) @@ -329,26 +311,6 @@ type AudioConfigConstants struct { QualityChangeSettleDelay time.Duration // Delay for quality change to settle QualityChangeRecoveryDelay time.Duration // Delay before attempting recovery - // Buffer Pool Cache Configuration - BufferPoolCacheSize int // Buffers per goroutine cache (4) - BufferPoolCacheTTL time.Duration // Cache TTL for aggressive cleanup (5s) - BufferPoolMaxCacheEntries int // Maximum cache entries to prevent memory bloat (128) - BufferPoolCacheCleanupInterval time.Duration // Cleanup interval for frequent cleanup (15s) - BufferPoolCacheWarmupThreshold int // Warmup threshold for faster startup (25) - BufferPoolCacheHitRateTarget float64 // Target hit rate for balanced performance (0.80) - BufferPoolMaxCacheSize int // Maximum goroutine caches (256) - BufferPoolCleanupInterval int64 // Cleanup interval in seconds (15) - BufferPoolBufferTTL int64 // Buffer TTL in seconds (30) - BufferPoolControlSize int // Control pool buffer size (512) - BufferPoolMinPreallocBuffers int // Minimum preallocation buffers - BufferPoolMaxPoolSize int // Maximum pool size - BufferPoolChunkBufferCount int // Buffers per chunk - BufferPoolMinChunkSize int // Minimum chunk size (64KB) - BufferPoolInitialChunkCapacity int // Initial chunk capacity - BufferPoolAdaptiveResizeThreshold int // Threshold for adaptive resize - BufferPoolHighHitRateThreshold float64 // High hit rate threshold - BufferPoolOptimizeCacheThreshold int // Threshold for cache optimization - BufferPoolCounterResetThreshold int // Counter reset threshold } // DefaultAudioConfig returns the default configuration constants @@ -458,7 +420,7 @@ func DefaultAudioConfig() *AudioConfigConstants { MaxRestartDelay: 30 * time.Second, // Maximum delay for exponential backoff // Buffer Management - PreallocSize: 1024 * 1024, // 1MB buffer preallocation + MaxPoolSize: 100, // Maximum object pool size MessagePoolSize: 1024, // Significantly increased message pool for quality change bursts OptimalSocketBuffer: 262144, // 256KB optimal socket buffer @@ -521,39 +483,15 @@ func DefaultAudioConfig() *AudioConfigConstants { QualityChangeSettleDelay: 2 * time.Second, // Delay for quality change to settle QualityChangeRecoveryDelay: 1 * time.Second, // Delay before attempting recovery - // Buffer Pool Cache Configuration - BufferPoolCacheSize: 4, // Buffers per goroutine cache - BufferPoolCacheTTL: 5 * time.Second, // Cache TTL for aggressive cleanup - BufferPoolMaxCacheEntries: 128, // Maximum cache entries to prevent memory bloat - BufferPoolCacheCleanupInterval: 15 * time.Second, // Cleanup interval for frequent cleanup - BufferPoolCacheWarmupThreshold: 25, // Warmup threshold for faster startup - BufferPoolCacheHitRateTarget: 0.80, // Target hit rate for balanced performance - BufferPoolMaxCacheSize: 256, // Maximum goroutine caches - BufferPoolCleanupInterval: 15, // Cleanup interval in seconds - BufferPoolBufferTTL: 30, // Buffer TTL in seconds - BufferPoolControlSize: 512, // Control pool buffer size - BufferPoolMinPreallocBuffers: 16, // Minimum preallocation buffers (reduced from 50) - BufferPoolMaxPoolSize: 128, // Maximum pool size (reduced from 256) - BufferPoolChunkBufferCount: 8, // Buffers per chunk (reduced from 64 to prevent large allocations) - BufferPoolMinChunkSize: 8192, // Minimum chunk size (8KB, reduced from 64KB) - BufferPoolInitialChunkCapacity: 4, // Initial chunk capacity - BufferPoolAdaptiveResizeThreshold: 100, // Threshold for adaptive resize - BufferPoolHighHitRateThreshold: 0.95, // High hit rate threshold - BufferPoolOptimizeCacheThreshold: 100, // Threshold for cache optimization - BufferPoolCounterResetThreshold: 10000, // Counter reset threshold - // Timing Constants - Optimized for quality change stability - DefaultSleepDuration: 100 * time.Millisecond, // Balanced polling interval - ShortSleepDuration: 10 * time.Millisecond, // Balanced high-frequency polling - LongSleepDuration: 200 * time.Millisecond, // Balanced background task delay - DefaultTickerInterval: 100 * time.Millisecond, // Balanced periodic task interval - BufferUpdateInterval: 250 * time.Millisecond, // Faster buffer size update frequency - InputSupervisorTimeout: 5 * time.Second, // Input monitoring timeout - OutputSupervisorTimeout: 5 * time.Second, // Output monitoring timeout - BatchProcessingDelay: 5 * time.Millisecond, // Reduced batch processing delay - AdaptiveOptimizerStability: 5 * time.Second, // Faster adaptive stability period - - LatencyMonitorTarget: 50 * time.Millisecond, // Balanced target latency for monitoring + DefaultSleepDuration: 100 * time.Millisecond, // Balanced polling interval + ShortSleepDuration: 10 * time.Millisecond, // Balanced high-frequency polling + LongSleepDuration: 200 * time.Millisecond, // Balanced background task delay + DefaultTickerInterval: 100 * time.Millisecond, // Balanced periodic task interval + BufferUpdateInterval: 250 * time.Millisecond, // Faster buffer size update frequency + InputSupervisorTimeout: 5 * time.Second, // Input monitoring timeout + OutputSupervisorTimeout: 5 * time.Second, // Output monitoring timeout + BatchProcessingDelay: 5 * time.Millisecond, // Reduced batch processing delay // Adaptive Buffer Configuration - Optimized for single-core RV1106G3 LowCPUThreshold: 0.40, // Adjusted for single-core ARM system @@ -568,9 +506,8 @@ func DefaultAudioConfig() *AudioConfigConstants { AdaptiveDefaultBufferSize: 512, // Higher default for stability during bursts // Adaptive Optimizer Configuration - Faster response - CooldownPeriod: 15 * time.Second, // Reduced cooldown period - RollbackThreshold: 200 * time.Millisecond, // Lower rollback threshold - AdaptiveOptimizerLatencyTarget: 30 * time.Millisecond, // Reduced latency target + CooldownPeriod: 15 * time.Second, // Reduced cooldown period + RollbackThreshold: 200 * time.Millisecond, // Lower rollback threshold // Latency Monitor Configuration - More aggressive monitoring MaxLatencyThreshold: 150 * time.Millisecond, // Lower max latency threshold @@ -609,29 +546,6 @@ func DefaultAudioConfig() *AudioConfigConstants { CGOPCMBufferSize: 1920, // 1920 samples for PCM buffer (max 2ch*960) CGONanosecondsPerSecond: 1000000000.0, // 1000000000.0 for nanosecond conversions - // Frontend Constants - Balanced for stability - FrontendOperationDebounceMS: 1000, // 1000ms debounce for frontend operations - FrontendSyncDebounceMS: 1000, // 1000ms debounce for sync operations - FrontendSampleRate: 48000, // 48000Hz sample rate for frontend audio - FrontendRetryDelayMS: 500, // 500ms retry delay - FrontendShortDelayMS: 200, // 200ms short delay - FrontendLongDelayMS: 300, // 300ms long delay - FrontendSyncDelayMS: 500, // 500ms sync delay - FrontendMaxRetryAttempts: 3, // 3 maximum retry attempts - FrontendAudioLevelUpdateMS: 100, // 100ms audio level update interval - FrontendFFTSize: 256, // 256 FFT size for audio analysis - FrontendAudioLevelMax: 100, // 100 maximum audio level - FrontendReconnectIntervalMS: 3000, // 3000ms reconnect interval - FrontendSubscriptionDelayMS: 100, // 100ms subscription delay - FrontendDebugIntervalMS: 5000, // 5000ms debug interval - - // Process Monitor Constants - ProcessMonitorDefaultMemoryGB: 4, // 4GB default memory for fallback - ProcessMonitorKBToBytes: 1024, // 1024 conversion factor - ProcessMonitorDefaultClockHz: 250.0, // 250.0 Hz default for ARM systems - ProcessMonitorFallbackClockHz: 1000.0, // 1000.0 Hz fallback clock - ProcessMonitorTraditionalHz: 100.0, // 100.0 Hz traditional clock - // Batch Processing Constants - Optimized for quality change bursts BatchProcessorFramesPerBatch: 16, // Larger batches for quality changes BatchProcessorTimeout: 20 * time.Millisecond, // Longer timeout for bursts @@ -686,9 +600,15 @@ func DefaultAudioConfig() *AudioConfigConstants { LatencyPercentile95: 95, // 95th percentile calculation factor LatencyPercentile99: 99, // 99th percentile calculation factor + // Buffer Pool Configuration + BufferPoolDefaultSize: 64, // Default buffer pool size when MaxPoolSize is invalid + BufferPoolControlSize: 512, // Control buffer pool size + ZeroCopyPreallocSizeBytes: 1024 * 1024, // Zero-copy frame pool preallocation size in bytes (1MB) + ZeroCopyMinPreallocFrames: 1, // Minimum preallocated frames for zero-copy pool + BufferPoolHitRateBase: 100.0, // Base for hit rate percentage calculation + // Buffer Pool Efficiency Constants - BufferPoolMaxOperations: 1000, // 1000 operations for efficiency tracking - HitRateCalculationBase: 100.0, // 100.0 base for hit rate percentage calculation + HitRateCalculationBase: 100.0, // 100.0 base for hit rate percentage calculation // Validation Constants MaxLatency: 500 * time.Millisecond, // 500ms maximum allowed latency @@ -733,9 +653,6 @@ func DefaultAudioConfig() *AudioConfigConstants { // Batch Audio Processing Configuration MinBatchSizeForThreadPinning: 5, // Minimum batch size to pin thread - // Goroutine Monitoring Configuration - GoroutineMonitorInterval: 30 * time.Second, // 30s monitoring interval - // Performance Configuration Flags - Production optimizations } diff --git a/internal/audio/core_metrics.go b/internal/audio/core_metrics.go index 03fafae6..a0dc9886 100644 --- a/internal/audio/core_metrics.go +++ b/internal/audio/core_metrics.go @@ -158,78 +158,6 @@ var ( }, ) - // Audio subprocess process metrics - audioProcessCpuPercent = promauto.NewGauge( - prometheus.GaugeOpts{ - Name: "jetkvm_audio_process_cpu_percent", - Help: "CPU usage percentage of audio output subprocess", - }, - ) - - audioProcessMemoryPercent = promauto.NewGauge( - prometheus.GaugeOpts{ - Name: "jetkvm_audio_process_memory_percent", - Help: "Memory usage percentage of audio output subprocess", - }, - ) - - audioProcessMemoryRssBytes = promauto.NewGauge( - prometheus.GaugeOpts{ - Name: "jetkvm_audio_process_memory_rss_bytes", - Help: "RSS memory usage in bytes of audio output subprocess", - }, - ) - - audioProcessMemoryVmsBytes = promauto.NewGauge( - prometheus.GaugeOpts{ - Name: "jetkvm_audio_process_memory_vms_bytes", - Help: "VMS memory usage in bytes of audio output subprocess", - }, - ) - - audioProcessRunning = promauto.NewGauge( - prometheus.GaugeOpts{ - Name: "jetkvm_audio_process_running", - Help: "Whether audio output subprocess is running (1=running, 0=stopped)", - }, - ) - - // Microphone subprocess process metrics - microphoneProcessCpuPercent = promauto.NewGauge( - prometheus.GaugeOpts{ - Name: "jetkvm_microphone_process_cpu_percent", - Help: "CPU usage percentage of microphone input subprocess", - }, - ) - - microphoneProcessMemoryPercent = promauto.NewGauge( - prometheus.GaugeOpts{ - Name: "jetkvm_microphone_process_memory_percent", - Help: "Memory usage percentage of microphone input subprocess", - }, - ) - - microphoneProcessMemoryRssBytes = promauto.NewGauge( - prometheus.GaugeOpts{ - Name: "jetkvm_microphone_process_memory_rss_bytes", - Help: "RSS memory usage in bytes of microphone input subprocess", - }, - ) - - microphoneProcessMemoryVmsBytes = promauto.NewGauge( - prometheus.GaugeOpts{ - Name: "jetkvm_microphone_process_memory_vms_bytes", - Help: "VMS memory usage in bytes of microphone input subprocess", - }, - ) - - microphoneProcessRunning = promauto.NewGauge( - prometheus.GaugeOpts{ - Name: "jetkvm_microphone_process_running", - Help: "Whether microphone input subprocess is running (1=running, 0=stopped)", - }, - ) - // Device health metrics // Removed device health metrics - functionality not used @@ -446,42 +374,6 @@ func UpdateMicrophoneMetrics(metrics UnifiedAudioMetrics) { atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix()) } -// UpdateAudioProcessMetrics updates Prometheus metrics with audio subprocess data -func UpdateAudioProcessMetrics(metrics ProcessMetrics, isRunning bool) { - metricsUpdateMutex.Lock() - defer metricsUpdateMutex.Unlock() - - audioProcessCpuPercent.Set(metrics.CPUPercent) - audioProcessMemoryPercent.Set(metrics.MemoryPercent) - audioProcessMemoryRssBytes.Set(float64(metrics.MemoryRSS)) - audioProcessMemoryVmsBytes.Set(float64(metrics.MemoryVMS)) - if isRunning { - audioProcessRunning.Set(1) - } else { - audioProcessRunning.Set(0) - } - - atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix()) -} - -// UpdateMicrophoneProcessMetrics updates Prometheus metrics with microphone subprocess data -func UpdateMicrophoneProcessMetrics(metrics ProcessMetrics, isRunning bool) { - metricsUpdateMutex.Lock() - defer metricsUpdateMutex.Unlock() - - microphoneProcessCpuPercent.Set(metrics.CPUPercent) - microphoneProcessMemoryPercent.Set(metrics.MemoryPercent) - microphoneProcessMemoryRssBytes.Set(float64(metrics.MemoryRSS)) - microphoneProcessMemoryVmsBytes.Set(float64(metrics.MemoryVMS)) - if isRunning { - microphoneProcessRunning.Set(1) - } else { - microphoneProcessRunning.Set(0) - } - - atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix()) -} - // UpdateAdaptiveBufferMetrics updates Prometheus metrics with adaptive buffer information func UpdateAdaptiveBufferMetrics(inputBufferSize, outputBufferSize int, cpuPercent, memoryPercent float64, adjustmentMade bool) { metricsUpdateMutex.Lock() diff --git a/internal/audio/core_validation.go b/internal/audio/core_validation.go index 4f5edb09..5836abdd 100644 --- a/internal/audio/core_validation.go +++ b/internal/audio/core_validation.go @@ -218,32 +218,6 @@ func ValidateOutputIPCConfig(sampleRate, channels, frameSize int) error { return nil } -// ValidateLatencyConfig validates latency monitor configuration -func ValidateLatencyConfig(config LatencyConfig) error { - if err := ValidateLatency(config.TargetLatency); err != nil { - return err - } - if err := ValidateLatency(config.MaxLatency); err != nil { - return err - } - if config.TargetLatency >= Config.MaxLatency { - return ErrInvalidLatency - } - if err := ValidateMetricsInterval(config.OptimizationInterval); err != nil { - return err - } - if config.HistorySize <= 0 { - return ErrInvalidBufferSize - } - if config.JitterThreshold < 0 { - return ErrInvalidLatency - } - if config.AdaptiveThreshold < 0 || config.AdaptiveThreshold > 1.0 { - return ErrInvalidConfiguration - } - return nil -} - // ValidateSampleRate validates audio sample rate values // Optimized to use AudioConfigCache for frequently accessed values func ValidateSampleRate(sampleRate int) error { diff --git a/internal/audio/input_microphone_manager.go b/internal/audio/input_microphone_manager.go index 5178f9f3..0eaa052f 100644 --- a/internal/audio/input_microphone_manager.go +++ b/internal/audio/input_microphone_manager.go @@ -115,7 +115,6 @@ func (aim *AudioInputManager) WriteOpusFrame(frame []byte) error { Msg("High audio processing latency detected") // Record latency for goroutine cleanup optimization - RecordAudioLatency(latencyMs) } if err != nil { @@ -156,7 +155,6 @@ func (aim *AudioInputManager) WriteOpusFrameZeroCopy(frame *ZeroCopyAudioFrame) Msg("High audio processing latency detected") // Record latency for goroutine cleanup optimization - RecordAudioLatency(latencyMs) } if err != nil { diff --git a/internal/audio/input_supervisor.go b/internal/audio/input_supervisor.go index 70b63c88..3e7f499c 100644 --- a/internal/audio/input_supervisor.go +++ b/internal/audio/input_supervisor.go @@ -135,7 +135,6 @@ func (ais *AudioInputSupervisor) startProcess() error { ais.logger.Info().Int("pid", ais.processPID).Strs("args", args).Strs("opus_env", ais.opusEnv).Msg("audio input server process started") // Add process to monitoring - ais.processMonitor.AddProcess(ais.processPID, "audio-input-server") // Connect client to the server go ais.connectClient() diff --git a/internal/audio/ipc_unified.go b/internal/audio/ipc_unified.go index dec68352..4ff6eea9 100644 --- a/internal/audio/ipc_unified.go +++ b/internal/audio/ipc_unified.go @@ -117,10 +117,6 @@ type UnifiedAudioServer struct { socketPath string magicNumber uint32 socketBufferConfig SocketBufferConfig - - // Performance monitoring - latencyMonitor *LatencyMonitor - adaptiveOptimizer *AdaptiveOptimizer } // NewUnifiedAudioServer creates a new unified audio server @@ -148,8 +144,6 @@ func NewUnifiedAudioServer(isInput bool) (*UnifiedAudioServer, error) { messageChan: make(chan *UnifiedIPCMessage, Config.ChannelBufferSize), processChan: make(chan *UnifiedIPCMessage, Config.ChannelBufferSize), socketBufferConfig: DefaultSocketBufferConfig(), - latencyMonitor: nil, - adaptiveOptimizer: nil, } return server, nil @@ -365,10 +359,6 @@ func (s *UnifiedAudioServer) SendFrame(frame []byte) error { } // Record latency for monitoring - if s.latencyMonitor != nil { - writeLatency := time.Since(start) - s.latencyMonitor.RecordLatency(writeLatency, "ipc_write") - } atomic.AddInt64(&s.totalFrames, 1) return nil diff --git a/internal/audio/mgmt_base_supervisor.go b/internal/audio/mgmt_base_supervisor.go index 19327b5c..bface756 100644 --- a/internal/audio/mgmt_base_supervisor.go +++ b/internal/audio/mgmt_base_supervisor.go @@ -28,7 +28,6 @@ type BaseSupervisor struct { processPID int // Process monitoring - processMonitor *ProcessMonitor // Exit tracking lastExitCode int @@ -45,10 +44,10 @@ type BaseSupervisor struct { func NewBaseSupervisor(componentName string) *BaseSupervisor { logger := logging.GetDefaultLogger().With().Str("component", componentName).Logger() return &BaseSupervisor{ - logger: &logger, - processMonitor: GetProcessMonitor(), - stopChan: make(chan struct{}), - processDone: make(chan struct{}), + logger: &logger, + + stopChan: make(chan struct{}), + processDone: make(chan struct{}), } } @@ -211,7 +210,6 @@ func (bs *BaseSupervisor) waitForProcessExit(processType string) { bs.mutex.Unlock() // Remove process from monitoring - bs.processMonitor.RemoveProcess(pid) if exitCode != 0 { bs.logger.Error().Int("pid", pid).Int("exit_code", exitCode).Msgf("%s process exited with error", processType) diff --git a/internal/audio/monitor_adaptive_optimizer.go b/internal/audio/monitor_adaptive_optimizer.go deleted file mode 100644 index fe0b81f0..00000000 --- a/internal/audio/monitor_adaptive_optimizer.go +++ /dev/null @@ -1,329 +0,0 @@ -package audio - -import ( - "context" - "sync" - "sync/atomic" - "time" - - "github.com/rs/zerolog" -) - -// AdaptiveOptimizer automatically adjusts audio parameters based on latency metrics -type AdaptiveOptimizer struct { - // Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) - optimizationCount int64 // Number of optimizations performed (atomic) - lastOptimization int64 // Timestamp of last optimization (atomic) - optimizationLevel int64 // Current optimization level (0-10) (atomic) - stabilityScore int64 // Current stability score (0-100) (atomic) - optimizationInterval int64 // Current optimization interval in nanoseconds (atomic) - - latencyMonitor *LatencyMonitor - bufferManager *AdaptiveBufferManager - logger zerolog.Logger - - // Control channels - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - - // Configuration - config OptimizerConfig - - // Stability tracking - stabilityHistory []StabilityMetric - stabilityMutex sync.RWMutex -} - -// StabilityMetric tracks system stability over time -type StabilityMetric struct { - Timestamp time.Time - LatencyStdev float64 - CPUVariance float64 - MemoryStable bool - ErrorRate float64 - StabilityScore int -} - -// OptimizerConfig holds configuration for the adaptive optimizer -type OptimizerConfig struct { - MaxOptimizationLevel int // Maximum optimization level (0-10) - CooldownPeriod time.Duration // Minimum time between optimizations - Aggressiveness float64 // How aggressively to optimize (0.0-1.0) - RollbackThreshold time.Duration // Latency threshold to rollback optimizations - StabilityPeriod time.Duration // Time to wait for stability after optimization - - // Adaptive interval configuration - MinOptimizationInterval time.Duration // Minimum optimization interval (high stability) - MaxOptimizationInterval time.Duration // Maximum optimization interval (low stability) - StabilityThreshold int // Stability score threshold for interval adjustment - StabilityHistorySize int // Number of stability metrics to track -} - -// DefaultOptimizerConfig returns a sensible default configuration -func DefaultOptimizerConfig() OptimizerConfig { - return OptimizerConfig{ - MaxOptimizationLevel: 8, - CooldownPeriod: Config.CooldownPeriod, - Aggressiveness: Config.OptimizerAggressiveness, - RollbackThreshold: Config.RollbackThreshold, - StabilityPeriod: Config.AdaptiveOptimizerStability, - - // Adaptive interval defaults - MinOptimizationInterval: 100 * time.Millisecond, // High stability: check every 100ms - MaxOptimizationInterval: 2 * time.Second, // Low stability: check every 2s - StabilityThreshold: 70, // Stability score threshold - StabilityHistorySize: 20, // Track last 20 stability metrics - } -} - -// NewAdaptiveOptimizer creates a new adaptive optimizer -func NewAdaptiveOptimizer(latencyMonitor *LatencyMonitor, bufferManager *AdaptiveBufferManager, config OptimizerConfig, logger zerolog.Logger) *AdaptiveOptimizer { - ctx, cancel := context.WithCancel(context.Background()) - - optimizer := &AdaptiveOptimizer{ - latencyMonitor: latencyMonitor, - bufferManager: bufferManager, - config: config, - logger: logger.With().Str("component", "adaptive-optimizer").Logger(), - ctx: ctx, - cancel: cancel, - stabilityHistory: make([]StabilityMetric, 0, config.StabilityHistorySize), - } - - // Initialize stability score and optimization interval - atomic.StoreInt64(&optimizer.stabilityScore, 50) // Start with medium stability - atomic.StoreInt64(&optimizer.optimizationInterval, int64(config.MaxOptimizationInterval)) - - // Register as latency monitor callback - latencyMonitor.AddOptimizationCallback(optimizer.handleLatencyOptimization) - - return optimizer -} - -// Start begins the adaptive optimization process -func (ao *AdaptiveOptimizer) Start() { - ao.wg.Add(1) - go ao.optimizationLoop() - ao.logger.Debug().Msg("adaptive optimizer started") -} - -// Stop stops the adaptive optimizer -func (ao *AdaptiveOptimizer) Stop() { - ao.cancel() - ao.wg.Wait() - ao.logger.Debug().Msg("adaptive optimizer stopped") -} - -// initializeStrategies sets up the available optimization strategies - -// handleLatencyOptimization is called when latency optimization is needed -func (ao *AdaptiveOptimizer) handleLatencyOptimization(metrics LatencyMetrics) error { - currentLevel := atomic.LoadInt64(&ao.optimizationLevel) - lastOpt := atomic.LoadInt64(&ao.lastOptimization) - - // Check cooldown period - if time.Since(time.Unix(0, lastOpt)) < ao.config.CooldownPeriod { - return nil - } - - // Determine if we need to increase or decrease optimization level - targetLevel := ao.calculateTargetOptimizationLevel(metrics) - - if targetLevel > currentLevel { - return ao.increaseOptimization(int(targetLevel)) - } else if targetLevel < currentLevel { - return ao.decreaseOptimization(int(targetLevel)) - } - - return nil -} - -// calculateTargetOptimizationLevel determines the appropriate optimization level -func (ao *AdaptiveOptimizer) calculateTargetOptimizationLevel(metrics LatencyMetrics) int64 { - // Base calculation on current latency vs target - latencyRatio := float64(metrics.Current) / float64(Config.AdaptiveOptimizerLatencyTarget) // 50ms target - - // Adjust based on trend - switch metrics.Trend { - case LatencyTrendIncreasing: - latencyRatio *= 1.2 // Be more aggressive - case LatencyTrendDecreasing: - latencyRatio *= 0.8 // Be less aggressive - case LatencyTrendVolatile: - latencyRatio *= 1.1 // Slightly more aggressive - } - - // Apply aggressiveness factor - latencyRatio *= ao.config.Aggressiveness - - // Convert to optimization level - targetLevel := int64(latencyRatio * Config.LatencyScalingFactor) // Scale to 0-10 range - if targetLevel > int64(ao.config.MaxOptimizationLevel) { - targetLevel = int64(ao.config.MaxOptimizationLevel) - } - if targetLevel < 0 { - targetLevel = 0 - } - - return targetLevel -} - -// increaseOptimization applies optimization strategies up to the target level -func (ao *AdaptiveOptimizer) increaseOptimization(targetLevel int) error { - atomic.StoreInt64(&ao.optimizationLevel, int64(targetLevel)) - atomic.StoreInt64(&ao.lastOptimization, time.Now().UnixNano()) - atomic.AddInt64(&ao.optimizationCount, 1) - - return nil -} - -// decreaseOptimization rolls back optimization strategies to the target level -func (ao *AdaptiveOptimizer) decreaseOptimization(targetLevel int) error { - atomic.StoreInt64(&ao.optimizationLevel, int64(targetLevel)) - atomic.StoreInt64(&ao.lastOptimization, time.Now().UnixNano()) - - return nil -} - -// optimizationLoop runs the main optimization monitoring loop -func (ao *AdaptiveOptimizer) optimizationLoop() { - defer ao.wg.Done() - - // Start with initial interval - currentInterval := time.Duration(atomic.LoadInt64(&ao.optimizationInterval)) - ticker := time.NewTicker(currentInterval) - defer ticker.Stop() - - for { - select { - case <-ao.ctx.Done(): - return - case <-ticker.C: - // Update stability metrics and check for optimization needs - ao.updateStabilityMetrics() - ao.checkStability() - - // Adjust optimization interval based on current stability - newInterval := ao.calculateOptimizationInterval() - if newInterval != currentInterval { - currentInterval = newInterval - ticker.Reset(currentInterval) - ao.logger.Debug().Dur("new_interval", currentInterval).Int64("stability_score", atomic.LoadInt64(&ao.stabilityScore)).Msg("adjusted optimization interval") - } - } - } -} - -// checkStability monitors system stability and rolls back if needed -func (ao *AdaptiveOptimizer) checkStability() { - metrics := ao.latencyMonitor.GetMetrics() - - // Check if we need to rollback due to excessive latency - if metrics.Current > ao.config.RollbackThreshold { - currentLevel := int(atomic.LoadInt64(&ao.optimizationLevel)) - if currentLevel > 0 { - ao.logger.Warn().Dur("current_latency", metrics.Current).Dur("threshold", ao.config.RollbackThreshold).Msg("rolling back optimizations due to excessive latency") - if err := ao.decreaseOptimization(currentLevel - 1); err != nil { - ao.logger.Error().Err(err).Msg("failed to decrease optimization level") - } - } - } -} - -// updateStabilityMetrics calculates and stores current system stability metrics -func (ao *AdaptiveOptimizer) updateStabilityMetrics() { - metrics := ao.latencyMonitor.GetMetrics() - - // Calculate stability score based on multiple factors - stabilityScore := ao.calculateStabilityScore(metrics) - atomic.StoreInt64(&ao.stabilityScore, int64(stabilityScore)) - - // Store stability metric in history - stabilityMetric := StabilityMetric{ - Timestamp: time.Now(), - LatencyStdev: float64(metrics.Jitter), // Use Jitter as variance indicator - CPUVariance: 0.0, // TODO: Get from system metrics - MemoryStable: true, // TODO: Get from system metrics - ErrorRate: 0.0, // TODO: Get from error tracking - StabilityScore: stabilityScore, - } - - ao.stabilityMutex.Lock() - ao.stabilityHistory = append(ao.stabilityHistory, stabilityMetric) - if len(ao.stabilityHistory) > ao.config.StabilityHistorySize { - ao.stabilityHistory = ao.stabilityHistory[1:] - } - ao.stabilityMutex.Unlock() -} - -// calculateStabilityScore computes a stability score (0-100) based on system metrics -func (ao *AdaptiveOptimizer) calculateStabilityScore(metrics LatencyMetrics) int { - // Base score starts at 100 (perfect stability) - score := 100.0 - - // Penalize high jitter (latency variance) - if metrics.Jitter > 0 && metrics.Average > 0 { - jitterRatio := float64(metrics.Jitter) / float64(metrics.Average) - variancePenalty := jitterRatio * 50 // Scale jitter impact - score -= variancePenalty - } - - // Penalize latency trend volatility - switch metrics.Trend { - case LatencyTrendVolatile: - score -= 20 - case LatencyTrendIncreasing: - score -= 10 - case LatencyTrendDecreasing: - score += 5 // Slight bonus for improving latency - } - - // Ensure score is within bounds - if score < 0 { - score = 0 - } - if score > 100 { - score = 100 - } - - return int(score) -} - -// calculateOptimizationInterval determines the optimization interval based on stability -func (ao *AdaptiveOptimizer) calculateOptimizationInterval() time.Duration { - stabilityScore := atomic.LoadInt64(&ao.stabilityScore) - - // High stability = shorter intervals (more frequent optimization) - // Low stability = longer intervals (less frequent optimization) - if stabilityScore >= int64(ao.config.StabilityThreshold) { - // High stability: use minimum interval - interval := ao.config.MinOptimizationInterval - atomic.StoreInt64(&ao.optimizationInterval, int64(interval)) - return interval - } else { - // Low stability: scale interval based on stability score - // Lower stability = longer intervals - stabilityRatio := float64(stabilityScore) / float64(ao.config.StabilityThreshold) - minInterval := float64(ao.config.MinOptimizationInterval) - maxInterval := float64(ao.config.MaxOptimizationInterval) - - // Linear interpolation between min and max intervals - interval := time.Duration(minInterval + (maxInterval-minInterval)*(1.0-stabilityRatio)) - atomic.StoreInt64(&ao.optimizationInterval, int64(interval)) - return interval - } -} - -// GetOptimizationStats returns current optimization statistics -func (ao *AdaptiveOptimizer) GetOptimizationStats() map[string]interface{} { - return map[string]interface{}{ - "optimization_level": atomic.LoadInt64(&ao.optimizationLevel), - "optimization_count": atomic.LoadInt64(&ao.optimizationCount), - "last_optimization": time.Unix(0, atomic.LoadInt64(&ao.lastOptimization)), - "stability_score": atomic.LoadInt64(&ao.stabilityScore), - "optimization_interval": time.Duration(atomic.LoadInt64(&ao.optimizationInterval)), - } -} - -// Strategy implementation methods (stubs for now) diff --git a/internal/audio/monitor_goroutine.go b/internal/audio/monitor_goroutine.go deleted file mode 100644 index fa2c8d8d..00000000 --- a/internal/audio/monitor_goroutine.go +++ /dev/null @@ -1,144 +0,0 @@ -package audio - -import ( - "runtime" - "sync/atomic" - "time" - - "github.com/jetkvm/kvm/internal/logging" -) - -// GoroutineMonitor tracks goroutine count and provides cleanup mechanisms -type GoroutineMonitor struct { - baselineCount int - peakCount int - lastCount int - monitorInterval time.Duration - lastCheck time.Time - enabled int32 -} - -// Global goroutine monitor instance -var globalGoroutineMonitor *GoroutineMonitor - -// NewGoroutineMonitor creates a new goroutine monitor -func NewGoroutineMonitor(monitorInterval time.Duration) *GoroutineMonitor { - if monitorInterval <= 0 { - monitorInterval = 30 * time.Second - } - - // Get current goroutine count as baseline - baselineCount := runtime.NumGoroutine() - - return &GoroutineMonitor{ - baselineCount: baselineCount, - peakCount: baselineCount, - lastCount: baselineCount, - monitorInterval: monitorInterval, - lastCheck: time.Now(), - } -} - -// Start begins goroutine monitoring -func (gm *GoroutineMonitor) Start() { - if !atomic.CompareAndSwapInt32(&gm.enabled, 0, 1) { - return // Already running - } - - go gm.monitorLoop() -} - -// Stop stops goroutine monitoring -func (gm *GoroutineMonitor) Stop() { - atomic.StoreInt32(&gm.enabled, 0) -} - -// monitorLoop periodically checks goroutine count -func (gm *GoroutineMonitor) monitorLoop() { - logger := logging.GetDefaultLogger().With().Str("component", "goroutine-monitor").Logger() - logger.Info().Int("baseline", gm.baselineCount).Msg("goroutine monitor started") - - for atomic.LoadInt32(&gm.enabled) == 1 { - time.Sleep(gm.monitorInterval) - gm.checkGoroutineCount() - } - - logger.Info().Msg("goroutine monitor stopped") -} - -// checkGoroutineCount checks current goroutine count and logs if it exceeds thresholds -func (gm *GoroutineMonitor) checkGoroutineCount() { - currentCount := runtime.NumGoroutine() - gm.lastCount = currentCount - - // Update peak count if needed - if currentCount > gm.peakCount { - gm.peakCount = currentCount - } - - // Calculate growth since baseline - growth := currentCount - gm.baselineCount - growthPercent := float64(growth) / float64(gm.baselineCount) * 100 - - // Log warning if growth exceeds thresholds - logger := logging.GetDefaultLogger().With().Str("component", "goroutine-monitor").Logger() - - // Different log levels based on growth severity - if growthPercent > 30 { - // Severe growth - trigger cleanup - logger.Warn().Int("current", currentCount).Int("baseline", gm.baselineCount). - Int("growth", growth).Float64("growth_percent", growthPercent). - Msg("excessive goroutine growth detected - triggering cleanup") - - // Force garbage collection to clean up unused resources - runtime.GC() - - // Force cleanup of goroutine buffer cache - cleanupGoroutineCache() - } else if growthPercent > 20 { - // Moderate growth - just log warning - logger.Warn().Int("current", currentCount).Int("baseline", gm.baselineCount). - Int("growth", growth).Float64("growth_percent", growthPercent). - Msg("significant goroutine growth detected") - } else if growthPercent > 10 { - // Minor growth - log info - logger.Info().Int("current", currentCount).Int("baseline", gm.baselineCount). - Int("growth", growth).Float64("growth_percent", growthPercent). - Msg("goroutine growth detected") - } - - // Update last check time - gm.lastCheck = time.Now() -} - -// GetGoroutineStats returns current goroutine statistics -func (gm *GoroutineMonitor) GetGoroutineStats() map[string]interface{} { - return map[string]interface{}{ - "current_count": gm.lastCount, - "baseline_count": gm.baselineCount, - "peak_count": gm.peakCount, - "growth": gm.lastCount - gm.baselineCount, - "growth_percent": float64(gm.lastCount-gm.baselineCount) / float64(gm.baselineCount) * 100, - "last_check": gm.lastCheck, - } -} - -// GetGoroutineMonitor returns the global goroutine monitor instance -func GetGoroutineMonitor() *GoroutineMonitor { - if globalGoroutineMonitor == nil { - globalGoroutineMonitor = NewGoroutineMonitor(Config.GoroutineMonitorInterval) - } - return globalGoroutineMonitor -} - -// StartGoroutineMonitoring starts the global goroutine monitor -func StartGoroutineMonitoring() { - // Goroutine monitoring disabled -} - -// StopGoroutineMonitoring stops the global goroutine monitor -func StopGoroutineMonitoring() { - if globalGoroutineMonitor != nil { - globalGoroutineMonitor.Stop() - } -} diff --git a/internal/audio/monitor_latency.go b/internal/audio/monitor_latency.go deleted file mode 100644 index e44c4c08..00000000 --- a/internal/audio/monitor_latency.go +++ /dev/null @@ -1,333 +0,0 @@ -package audio - -import ( - "context" - "sync" - "sync/atomic" - "time" - - "github.com/rs/zerolog" -) - -// LatencyMonitor tracks and optimizes audio latency in real-time -type LatencyMonitor struct { - // Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) - currentLatency int64 // Current latency in nanoseconds (atomic) - averageLatency int64 // Rolling average latency in nanoseconds (atomic) - minLatency int64 // Minimum observed latency in nanoseconds (atomic) - maxLatency int64 // Maximum observed latency in nanoseconds (atomic) - latencySamples int64 // Number of latency samples collected (atomic) - jitterAccumulator int64 // Accumulated jitter for variance calculation (atomic) - lastOptimization int64 // Timestamp of last optimization in nanoseconds (atomic) - - config LatencyConfig - logger zerolog.Logger - - // Control channels - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - - // Optimization callbacks - optimizationCallbacks []OptimizationCallback - mutex sync.RWMutex - - // Performance tracking - latencyHistory []LatencyMeasurement - historyMutex sync.RWMutex -} - -// LatencyConfig holds configuration for latency monitoring -type LatencyConfig struct { - TargetLatency time.Duration // Target latency to maintain - MaxLatency time.Duration // Maximum acceptable latency - OptimizationInterval time.Duration // How often to run optimization - HistorySize int // Number of latency measurements to keep - JitterThreshold time.Duration // Jitter threshold for optimization - AdaptiveThreshold float64 // Threshold for adaptive adjustments (0.0-1.0) -} - -// LatencyMeasurement represents a single latency measurement -type LatencyMeasurement struct { - Timestamp time.Time - Latency time.Duration - Jitter time.Duration - Source string // Source of the measurement (e.g., "input", "output", "processing") -} - -// OptimizationCallback is called when latency optimization is triggered -type OptimizationCallback func(metrics LatencyMetrics) error - -// LatencyMetrics provides comprehensive latency statistics -type LatencyMetrics struct { - Current time.Duration - Average time.Duration - Min time.Duration - Max time.Duration - Jitter time.Duration - SampleCount int64 - Trend LatencyTrend -} - -// LatencyTrend indicates the direction of latency changes -type LatencyTrend int - -const ( - LatencyTrendStable LatencyTrend = iota - LatencyTrendIncreasing - LatencyTrendDecreasing - LatencyTrendVolatile -) - -// DefaultLatencyConfig returns a sensible default configuration -func DefaultLatencyConfig() LatencyConfig { - config := Config - return LatencyConfig{ - TargetLatency: config.LatencyMonitorTarget, - MaxLatency: config.MaxLatencyThreshold, - OptimizationInterval: config.LatencyOptimizationInterval, - HistorySize: config.LatencyHistorySize, - JitterThreshold: config.JitterThreshold, - AdaptiveThreshold: config.LatencyAdaptiveThreshold, - } -} - -// NewLatencyMonitor creates a new latency monitoring system -func NewLatencyMonitor(config LatencyConfig, logger zerolog.Logger) *LatencyMonitor { - // Validate latency configuration - if err := ValidateLatencyConfig(config); err != nil { - // Log validation error and use default configuration - logger.Error().Err(err).Msg("Invalid latency configuration provided, using defaults") - config = DefaultLatencyConfig() - } - - ctx, cancel := context.WithCancel(context.Background()) - - return &LatencyMonitor{ - config: config, - logger: logger.With().Str("component", "latency-monitor").Logger(), - ctx: ctx, - cancel: cancel, - latencyHistory: make([]LatencyMeasurement, 0, config.HistorySize), - minLatency: int64(time.Hour), // Initialize to high value - } -} - -// Start begins latency monitoring and optimization -func (lm *LatencyMonitor) Start() { - lm.wg.Add(1) - go lm.monitoringLoop() -} - -// Stop stops the latency monitor -func (lm *LatencyMonitor) Stop() { - lm.cancel() - lm.wg.Wait() -} - -// RecordLatency records a new latency measurement -func (lm *LatencyMonitor) RecordLatency(latency time.Duration, source string) { - now := time.Now() - latencyNanos := latency.Nanoseconds() - - // Update atomic counters - atomic.StoreInt64(&lm.currentLatency, latencyNanos) - atomic.AddInt64(&lm.latencySamples, 1) - - // Update min/max - for { - oldMin := atomic.LoadInt64(&lm.minLatency) - if latencyNanos >= oldMin || atomic.CompareAndSwapInt64(&lm.minLatency, oldMin, latencyNanos) { - break - } - } - - for { - oldMax := atomic.LoadInt64(&lm.maxLatency) - if latencyNanos <= oldMax || atomic.CompareAndSwapInt64(&lm.maxLatency, oldMax, latencyNanos) { - break - } - } - - // Update rolling average using exponential moving average - oldAvg := atomic.LoadInt64(&lm.averageLatency) - newAvg := oldAvg + (latencyNanos-oldAvg)/10 // Alpha = 0.1 - atomic.StoreInt64(&lm.averageLatency, newAvg) - - // Calculate jitter (difference from average) - jitter := latencyNanos - newAvg - if jitter < 0 { - jitter = -jitter - } - atomic.AddInt64(&lm.jitterAccumulator, jitter) - - // Store in history - lm.historyMutex.Lock() - measurement := LatencyMeasurement{ - Timestamp: now, - Latency: latency, - Jitter: time.Duration(jitter), - Source: source, - } - - if len(lm.latencyHistory) >= lm.config.HistorySize { - // Remove oldest measurement - copy(lm.latencyHistory, lm.latencyHistory[1:]) - lm.latencyHistory[len(lm.latencyHistory)-1] = measurement - } else { - lm.latencyHistory = append(lm.latencyHistory, measurement) - } - lm.historyMutex.Unlock() -} - -// GetMetrics returns current latency metrics -func (lm *LatencyMonitor) GetMetrics() LatencyMetrics { - current := atomic.LoadInt64(&lm.currentLatency) - average := atomic.LoadInt64(&lm.averageLatency) - min := atomic.LoadInt64(&lm.minLatency) - max := atomic.LoadInt64(&lm.maxLatency) - samples := atomic.LoadInt64(&lm.latencySamples) - jitterSum := atomic.LoadInt64(&lm.jitterAccumulator) - - var jitter time.Duration - if samples > 0 { - jitter = time.Duration(jitterSum / samples) - } - - return LatencyMetrics{ - Current: time.Duration(current), - Average: time.Duration(average), - Min: time.Duration(min), - Max: time.Duration(max), - Jitter: jitter, - SampleCount: samples, - Trend: lm.calculateTrend(), - } -} - -// AddOptimizationCallback adds a callback for latency optimization -func (lm *LatencyMonitor) AddOptimizationCallback(callback OptimizationCallback) { - lm.mutex.Lock() - lm.optimizationCallbacks = append(lm.optimizationCallbacks, callback) - lm.mutex.Unlock() -} - -// monitoringLoop runs the main monitoring and optimization loop -func (lm *LatencyMonitor) monitoringLoop() { - defer lm.wg.Done() - - ticker := time.NewTicker(lm.config.OptimizationInterval) - defer ticker.Stop() - - for { - select { - case <-lm.ctx.Done(): - return - case <-ticker.C: - lm.runOptimization() - } - } -} - -// runOptimization checks if optimization is needed and triggers callbacks with threshold validation. -// -// Validation Rules: -// - Current latency must not exceed MaxLatency (default: 200ms) -// - Average latency checked against adaptive threshold: TargetLatency * (1 + AdaptiveThreshold) -// - Jitter must not exceed JitterThreshold (default: 20ms) -// - All latency values must be non-negative durations -// -// Optimization Triggers: -// - Current latency > MaxLatency: Immediate optimization needed -// - Average latency > adaptive threshold: Gradual optimization needed -// - Jitter > JitterThreshold: Stability optimization needed -// -// Threshold Calculations: -// - Adaptive threshold = TargetLatency * (1.0 + AdaptiveThreshold) -// - Default: 50ms * (1.0 + 0.8) = 90ms adaptive threshold -// - Provides buffer above target before triggering optimization -// -// The function ensures real-time audio performance by monitoring multiple -// latency metrics and triggering optimization callbacks when thresholds are exceeded. -func (lm *LatencyMonitor) runOptimization() { - metrics := lm.GetMetrics() - - // Check if optimization is needed - needsOptimization := false - - // Check if current latency exceeds threshold - if metrics.Current > lm.config.MaxLatency { - needsOptimization = true - lm.logger.Warn().Dur("current_latency", metrics.Current).Dur("max_latency", lm.config.MaxLatency).Msg("latency exceeds maximum threshold") - } - - // Check if average latency is above adaptive threshold - adaptiveThreshold := time.Duration(float64(lm.config.TargetLatency.Nanoseconds()) * (1.0 + lm.config.AdaptiveThreshold)) - if metrics.Average > adaptiveThreshold { - needsOptimization = true - } - - // Check if jitter is too high - if metrics.Jitter > lm.config.JitterThreshold { - needsOptimization = true - } - - if needsOptimization { - atomic.StoreInt64(&lm.lastOptimization, time.Now().UnixNano()) - - // Run optimization callbacks - lm.mutex.RLock() - callbacks := make([]OptimizationCallback, len(lm.optimizationCallbacks)) - copy(callbacks, lm.optimizationCallbacks) - lm.mutex.RUnlock() - - for _, callback := range callbacks { - if err := callback(metrics); err != nil { - lm.logger.Error().Err(err).Msg("optimization callback failed") - } - } - } -} - -// calculateTrend analyzes recent latency measurements to determine trend -func (lm *LatencyMonitor) calculateTrend() LatencyTrend { - lm.historyMutex.RLock() - defer lm.historyMutex.RUnlock() - - if len(lm.latencyHistory) < 10 { - return LatencyTrendStable - } - - // Analyze last 10 measurements - recentMeasurements := lm.latencyHistory[len(lm.latencyHistory)-10:] - - var increasing, decreasing int - for i := 1; i < len(recentMeasurements); i++ { - if recentMeasurements[i].Latency > recentMeasurements[i-1].Latency { - increasing++ - } else if recentMeasurements[i].Latency < recentMeasurements[i-1].Latency { - decreasing++ - } - } - - // Determine trend based on direction changes - if increasing > 6 { - return LatencyTrendIncreasing - } else if decreasing > 6 { - return LatencyTrendDecreasing - } else if increasing+decreasing > 7 { - return LatencyTrendVolatile - } - - return LatencyTrendStable -} - -// GetLatencyHistory returns a copy of recent latency measurements -func (lm *LatencyMonitor) GetLatencyHistory() []LatencyMeasurement { - lm.historyMutex.RLock() - defer lm.historyMutex.RUnlock() - - history := make([]LatencyMeasurement, len(lm.latencyHistory)) - copy(history, lm.latencyHistory) - return history -} diff --git a/internal/audio/monitor_process.go b/internal/audio/monitor_process.go deleted file mode 100644 index aa898347..00000000 --- a/internal/audio/monitor_process.go +++ /dev/null @@ -1,406 +0,0 @@ -package audio - -import ( - "bufio" - "fmt" - "os" - "strconv" - "strings" - "sync" - "time" - - "github.com/jetkvm/kvm/internal/logging" - "github.com/rs/zerolog" -) - -// Variables for process monitoring (using configuration) -var ( - // System constants - maxCPUPercent = Config.MaxCPUPercent - minCPUPercent = Config.MinCPUPercent - defaultClockTicks = Config.DefaultClockTicks - defaultMemoryGB = Config.DefaultMemoryGB - - // Monitoring thresholds - maxWarmupSamples = Config.MaxWarmupSamples - warmupCPUSamples = Config.WarmupCPUSamples - - // Channel buffer size - metricsChannelBuffer = Config.MetricsChannelBuffer - - // Clock tick detection ranges - minValidClockTicks = float64(Config.MinValidClockTicks) - maxValidClockTicks = float64(Config.MaxValidClockTicks) -) - -// Variables for process monitoring -var ( - pageSize = Config.PageSize -) - -// ProcessMetrics represents CPU and memory usage metrics for a process -type ProcessMetrics struct { - PID int `json:"pid"` - CPUPercent float64 `json:"cpu_percent"` - MemoryRSS int64 `json:"memory_rss_bytes"` - MemoryVMS int64 `json:"memory_vms_bytes"` - MemoryPercent float64 `json:"memory_percent"` - Timestamp time.Time `json:"timestamp"` - ProcessName string `json:"process_name"` -} - -type ProcessMonitor struct { - logger zerolog.Logger - mutex sync.RWMutex - monitoredPIDs map[int]*processState - running bool - stopChan chan struct{} - metricsChan chan ProcessMetrics - updateInterval time.Duration - totalMemory int64 - memoryOnce sync.Once - clockTicks float64 - clockTicksOnce sync.Once -} - -// processState tracks the state needed for CPU calculation -type processState struct { - name string - lastCPUTime int64 - lastSysTime int64 - lastUserTime int64 - lastSample time.Time - warmupSamples int -} - -// NewProcessMonitor creates a new process monitor -func NewProcessMonitor() *ProcessMonitor { - return &ProcessMonitor{ - logger: logging.GetDefaultLogger().With().Str("component", "process-monitor").Logger(), - monitoredPIDs: make(map[int]*processState), - stopChan: make(chan struct{}), - metricsChan: make(chan ProcessMetrics, metricsChannelBuffer), - updateInterval: GetMetricsUpdateInterval(), - } -} - -// Start begins monitoring processes -func (pm *ProcessMonitor) Start() { - pm.mutex.Lock() - defer pm.mutex.Unlock() - - if pm.running { - return - } - - pm.running = true - go pm.monitorLoop() - pm.logger.Debug().Msg("process monitor started") -} - -// Stop stops monitoring processes -func (pm *ProcessMonitor) Stop() { - pm.mutex.Lock() - defer pm.mutex.Unlock() - - if !pm.running { - return - } - - pm.running = false - close(pm.stopChan) - pm.logger.Debug().Msg("process monitor stopped") -} - -// AddProcess adds a process to monitor -func (pm *ProcessMonitor) AddProcess(pid int, name string) { - pm.mutex.Lock() - defer pm.mutex.Unlock() - - pm.monitoredPIDs[pid] = &processState{ - name: name, - lastSample: time.Now(), - } - pm.logger.Info().Int("pid", pid).Str("name", name).Msg("Added process to monitor") -} - -// RemoveProcess removes a process from monitoring -func (pm *ProcessMonitor) RemoveProcess(pid int) { - pm.mutex.Lock() - defer pm.mutex.Unlock() - - delete(pm.monitoredPIDs, pid) - pm.logger.Info().Int("pid", pid).Msg("Removed process from monitor") -} - -// GetMetricsChan returns the channel for receiving metrics -func (pm *ProcessMonitor) GetMetricsChan() <-chan ProcessMetrics { - return pm.metricsChan -} - -// GetCurrentMetrics returns current metrics for all monitored processes -func (pm *ProcessMonitor) GetCurrentMetrics() []ProcessMetrics { - pm.mutex.RLock() - defer pm.mutex.RUnlock() - - var metrics []ProcessMetrics - for pid, state := range pm.monitoredPIDs { - if metric, err := pm.collectMetrics(pid, state); err == nil { - metrics = append(metrics, metric) - } - } - return metrics -} - -// monitorLoop is the main monitoring loop -func (pm *ProcessMonitor) monitorLoop() { - ticker := time.NewTicker(pm.updateInterval) - defer ticker.Stop() - - for { - select { - case <-pm.stopChan: - return - case <-ticker.C: - pm.collectAllMetrics() - } - } -} - -func (pm *ProcessMonitor) collectAllMetrics() { - pm.mutex.RLock() - pidsToCheck := make([]int, 0, len(pm.monitoredPIDs)) - states := make([]*processState, 0, len(pm.monitoredPIDs)) - for pid, state := range pm.monitoredPIDs { - pidsToCheck = append(pidsToCheck, pid) - states = append(states, state) - } - pm.mutex.RUnlock() - - deadPIDs := make([]int, 0) - for i, pid := range pidsToCheck { - if metric, err := pm.collectMetrics(pid, states[i]); err == nil { - select { - case pm.metricsChan <- metric: - default: - } - } else { - deadPIDs = append(deadPIDs, pid) - } - } - - for _, pid := range deadPIDs { - pm.RemoveProcess(pid) - } -} - -func (pm *ProcessMonitor) collectMetrics(pid int, state *processState) (ProcessMetrics, error) { - now := time.Now() - metric := ProcessMetrics{ - PID: pid, - Timestamp: now, - ProcessName: state.name, - } - - statPath := fmt.Sprintf("/proc/%d/stat", pid) - statData, err := os.ReadFile(statPath) - if err != nil { - return metric, fmt.Errorf("failed to read process statistics from /proc/%d/stat: %w", pid, err) - } - - fields := strings.Fields(string(statData)) - if len(fields) < 24 { - return metric, fmt.Errorf("invalid process stat format: expected at least 24 fields, got %d from /proc/%d/stat", len(fields), pid) - } - - utime, _ := strconv.ParseInt(fields[13], 10, 64) - stime, _ := strconv.ParseInt(fields[14], 10, 64) - totalCPUTime := utime + stime - - vsize, _ := strconv.ParseInt(fields[22], 10, 64) - rss, _ := strconv.ParseInt(fields[23], 10, 64) - - metric.MemoryRSS = rss * int64(pageSize) - metric.MemoryVMS = vsize - - // Calculate CPU percentage - metric.CPUPercent = pm.calculateCPUPercent(totalCPUTime, state, now) - - // Increment warmup counter - if state.warmupSamples < maxWarmupSamples { - state.warmupSamples++ - } - - // Calculate memory percentage (RSS / total system memory) - if totalMem := pm.getTotalMemory(); totalMem > 0 { - metric.MemoryPercent = float64(metric.MemoryRSS) / float64(totalMem) * Config.PercentageMultiplier - } - - // Update state for next calculation - state.lastCPUTime = totalCPUTime - state.lastUserTime = utime - state.lastSysTime = stime - state.lastSample = now - - return metric, nil -} - -// calculateCPUPercent calculates CPU percentage for a process with validation and bounds checking. -// -// Validation Rules: -// - Returns 0.0 for first sample (no baseline for comparison) -// - Requires positive time delta between samples -// - Applies CPU percentage bounds: [MinCPUPercent, MaxCPUPercent] -// - Uses system clock ticks for accurate CPU time conversion -// - Validates clock ticks within range [MinValidClockTicks, MaxValidClockTicks] -// -// Bounds Applied: -// - CPU percentage clamped to [0.01%, 100.0%] (default values) -// - Clock ticks validated within [50, 1000] range (default values) -// - Time delta must be > 0 to prevent division by zero -// -// Warmup Behavior: -// - During warmup period (< WarmupCPUSamples), returns MinCPUPercent for idle processes -// - This indicates process is alive but not consuming significant CPU -// -// The function ensures accurate CPU percentage calculation while preventing -// invalid measurements that could affect system monitoring and adaptive algorithms. -func (pm *ProcessMonitor) calculateCPUPercent(totalCPUTime int64, state *processState, now time.Time) float64 { - if state.lastSample.IsZero() { - // First sample - initialize baseline - state.warmupSamples = 0 - return 0.0 - } - - timeDelta := now.Sub(state.lastSample).Seconds() - cpuDelta := float64(totalCPUTime - state.lastCPUTime) - - if timeDelta <= 0 { - return 0.0 - } - - if cpuDelta > 0 { - // Convert from clock ticks to seconds using actual system clock ticks - clockTicks := pm.getClockTicks() - cpuSeconds := cpuDelta / clockTicks - cpuPercent := (cpuSeconds / timeDelta) * Config.PercentageMultiplier - - // Apply bounds - if cpuPercent > maxCPUPercent { - cpuPercent = maxCPUPercent - } - if cpuPercent < minCPUPercent { - cpuPercent = minCPUPercent - } - - return cpuPercent - } - - // No CPU delta - process was idle - if state.warmupSamples < warmupCPUSamples { - // During warmup, provide a small non-zero value to indicate process is alive - return minCPUPercent - } - - return 0.0 -} - -func (pm *ProcessMonitor) getClockTicks() float64 { - pm.clockTicksOnce.Do(func() { - // Try to detect actual clock ticks from kernel boot parameters or /proc/stat - if data, err := os.ReadFile("/proc/cmdline"); err == nil { - // Look for HZ parameter in kernel command line - cmdline := string(data) - if strings.Contains(cmdline, "HZ=") { - fields := strings.Fields(cmdline) - for _, field := range fields { - if strings.HasPrefix(field, "HZ=") { - if hz, err := strconv.ParseFloat(field[3:], 64); err == nil && hz > 0 { - pm.clockTicks = hz - return - } - } - } - } - } - - // Try reading from /proc/timer_list for more accurate detection - if data, err := os.ReadFile("/proc/timer_list"); err == nil { - timer := string(data) - // Look for tick device frequency - lines := strings.Split(timer, "\n") - for _, line := range lines { - if strings.Contains(line, "tick_period:") { - fields := strings.Fields(line) - if len(fields) >= 2 { - if period, err := strconv.ParseInt(fields[1], 10, 64); err == nil && period > 0 { - // Convert nanoseconds to Hz - hz := Config.CGONanosecondsPerSecond / float64(period) - if hz >= minValidClockTicks && hz <= maxValidClockTicks { - pm.clockTicks = hz - return - } - } - } - } - } - } - - // Fallback: Most embedded ARM systems (like jetKVM) use 250 Hz or 1000 Hz - // rather than the traditional 100 Hz - pm.clockTicks = defaultClockTicks - pm.logger.Warn().Float64("clock_ticks", pm.clockTicks).Msg("Using fallback clock ticks value") - - // Log successful detection for non-fallback values - if pm.clockTicks != defaultClockTicks { - pm.logger.Info().Float64("clock_ticks", pm.clockTicks).Msg("Detected system clock ticks") - } - }) - return pm.clockTicks -} - -func (pm *ProcessMonitor) getTotalMemory() int64 { - pm.memoryOnce.Do(func() { - file, err := os.Open("/proc/meminfo") - if err != nil { - pm.totalMemory = int64(defaultMemoryGB) * int64(Config.ProcessMonitorKBToBytes) * int64(Config.ProcessMonitorKBToBytes) * int64(Config.ProcessMonitorKBToBytes) - return - } - defer file.Close() - - scanner := bufio.NewScanner(file) - for scanner.Scan() { - line := scanner.Text() - if strings.HasPrefix(line, "MemTotal:") { - fields := strings.Fields(line) - if len(fields) >= 2 { - if kb, err := strconv.ParseInt(fields[1], 10, 64); err == nil { - pm.totalMemory = kb * int64(Config.ProcessMonitorKBToBytes) - return - } - } - break - } - } - pm.totalMemory = int64(defaultMemoryGB) * int64(Config.ProcessMonitorKBToBytes) * int64(Config.ProcessMonitorKBToBytes) * int64(Config.ProcessMonitorKBToBytes) // Fallback - }) - return pm.totalMemory -} - -// GetTotalMemory returns total system memory in bytes (public method) -func (pm *ProcessMonitor) GetTotalMemory() int64 { - return pm.getTotalMemory() -} - -// Global process monitor instance -var globalProcessMonitor *ProcessMonitor -var processMonitorOnce sync.Once - -// GetProcessMonitor returns the global process monitor instance -func GetProcessMonitor() *ProcessMonitor { - processMonitorOnce.Do(func() { - globalProcessMonitor = NewProcessMonitor() - globalProcessMonitor.Start() - }) - return globalProcessMonitor -} diff --git a/internal/audio/output_streaming.go b/internal/audio/output_streaming.go index 2560d4be..1cf404c8 100644 --- a/internal/audio/output_streaming.go +++ b/internal/audio/output_streaming.go @@ -49,7 +49,6 @@ func getOutputStreamingLogger() *zerolog.Logger { // StartAudioOutputStreaming starts audio output streaming (capturing system audio) func StartAudioOutputStreaming(send func([]byte)) error { // Initialize audio monitoring (latency tracking and cache cleanup) - InitializeAudioMonitoring() if !atomic.CompareAndSwapInt32(&outputStreamingRunning, 0, 1) { return ErrAudioAlreadyRunning diff --git a/internal/audio/output_supervisor.go b/internal/audio/output_supervisor.go index e4888b01..b506cab7 100644 --- a/internal/audio/output_supervisor.go +++ b/internal/audio/output_supervisor.go @@ -213,7 +213,6 @@ func (s *AudioOutputSupervisor) startProcess() error { s.logger.Info().Int("pid", s.processPID).Strs("args", args).Strs("opus_env", s.opusEnv).Msg("audio server process started") // Add process to monitoring - s.processMonitor.AddProcess(s.processPID, "audio-output-server") if s.onProcessStart != nil { s.onProcessStart(s.processPID) diff --git a/internal/audio/util_buffer_pool.go b/internal/audio/util_buffer_pool.go index 5f452942..aabcd4d9 100644 --- a/internal/audio/util_buffer_pool.go +++ b/internal/audio/util_buffer_pool.go @@ -4,804 +4,138 @@ package audio import ( - "runtime" - "sort" - "sync" "sync/atomic" - "time" - "unsafe" ) -// AudioLatencyInfo holds simplified latency information for cleanup decisions -type AudioLatencyInfo struct { - LatencyMs float64 - Timestamp time.Time -} - -// Global latency tracking -var ( - currentAudioLatency = AudioLatencyInfo{} - currentAudioLatencyLock sync.RWMutex - audioMonitoringInitialized int32 // Atomic flag to track initialization -) - -// InitializeAudioMonitoring starts the background goroutines for latency tracking and cache cleanup -// This is safe to call multiple times as it will only initialize once -func InitializeAudioMonitoring() { - // Use atomic CAS to ensure we only initialize once - if atomic.CompareAndSwapInt32(&audioMonitoringInitialized, 0, 1) { - // Start the latency recorder - startLatencyRecorder() - - // Start the cleanup goroutine - startCleanupGoroutine() - } -} - -// latencyChannel is used for non-blocking latency recording -var latencyChannel = make(chan float64, 10) - -// startLatencyRecorder starts the latency recorder goroutine -// This should be called during package initialization -func startLatencyRecorder() { - go latencyRecorderLoop() -} - -// latencyRecorderLoop processes latency recordings in the background -func latencyRecorderLoop() { - for latencyMs := range latencyChannel { - currentAudioLatencyLock.Lock() - currentAudioLatency = AudioLatencyInfo{ - LatencyMs: latencyMs, - Timestamp: time.Now(), - } - currentAudioLatencyLock.Unlock() - } -} - -// RecordAudioLatency records the current audio processing latency -// This is called from the audio input manager when latency is measured -// It is non-blocking to ensure zero overhead in the critical audio path -func RecordAudioLatency(latencyMs float64) { - // Non-blocking send - if channel is full, we drop the update - select { - case latencyChannel <- latencyMs: - // Successfully sent - default: - // Channel full, drop this update to avoid blocking the audio path - } -} - -// GetAudioLatencyMetrics returns the current audio latency information -// Returns nil if no latency data is available or if it's too old -func GetAudioLatencyMetrics() *AudioLatencyInfo { - currentAudioLatencyLock.RLock() - defer currentAudioLatencyLock.RUnlock() - - // Check if we have valid latency data - if currentAudioLatency.Timestamp.IsZero() { - return nil - } - - // Check if the data is too old (more than 5 seconds) - if time.Since(currentAudioLatency.Timestamp) > 5*time.Second { - return nil - } - - return &AudioLatencyInfo{ - LatencyMs: currentAudioLatency.LatencyMs, - Timestamp: currentAudioLatency.Timestamp, - } -} - -// Enhanced lock-free buffer cache for per-goroutine optimization -type lockFreeBufferCache struct { - buffers [8]*[]byte // Increased from 4 to 8 buffers per goroutine cache for better hit rates -} - -// Buffer pool constants are now configured via Config -// See core_config_constants.go for default values - -// TTL tracking for goroutine cache entries -type cacheEntry struct { - cache *lockFreeBufferCache - lastAccess int64 // Unix timestamp of last access - gid int64 // Goroutine ID for better tracking -} - -// Per-goroutine buffer cache using goroutine-local storage -var goroutineBufferCache = make(map[int64]*lockFreeBufferCache) -var goroutineCacheMutex sync.RWMutex -var goroutineCacheWithTTL = make(map[int64]*cacheEntry) -var lastCleanupTime int64 // Unix timestamp of last cleanup - -// getGoroutineID extracts goroutine ID from runtime stack for cache key -func getGoroutineID() int64 { - b := make([]byte, 64) - b = b[:runtime.Stack(b, false)] - // Parse "goroutine 123 [running]:" format - for i := 10; i < len(b); i++ { - if b[i] == ' ' { - id := int64(0) - for j := 10; j < i; j++ { - if b[j] >= '0' && b[j] <= '9' { - id = id*10 + int64(b[j]-'0') - } - } - return id - } - } - return 0 -} - -// Map of goroutine ID to cache entry with TTL tracking (declared above) - -// cleanupChannel is used for asynchronous cleanup requests -var cleanupChannel = make(chan struct{}, 1) - -// startCleanupGoroutine starts the cleanup goroutine -// This should be called during package initialization -func startCleanupGoroutine() { - go cleanupLoop() -} - -// cleanupLoop processes cleanup requests in the background -func cleanupLoop() { - ticker := time.NewTicker(10 * time.Second) - defer ticker.Stop() - - for { - select { - case <-cleanupChannel: - // Received explicit cleanup request - performCleanup(true) - case <-ticker.C: - // Regular cleanup check - performCleanup(false) - } - } -} - -// requestCleanup signals the cleanup goroutine to perform a cleanup -// This is non-blocking and can be called from the critical path -func requestCleanup() { - select { - case cleanupChannel <- struct{}{}: - // Successfully requested cleanup - default: - // Channel full, cleanup already pending - } -} - -// performCleanup does the actual cache cleanup work -// This runs in a dedicated goroutine, not in the critical path -func performCleanup(forced bool) { - now := time.Now().Unix() - lastCleanup := atomic.LoadInt64(&lastCleanupTime) - - // Check if we're in a high-latency situation - isHighLatency := false - latencyMetrics := GetAudioLatencyMetrics() - if latencyMetrics != nil && latencyMetrics.LatencyMs > 10.0 { - // Under high latency, be more aggressive with cleanup - isHighLatency = true - } - - // Only cleanup if enough time has passed (less time if high latency) or if forced - interval := Config.BufferPoolCleanupInterval - if isHighLatency { - interval = Config.BufferPoolCleanupInterval / 2 // More frequent cleanup under high latency - } - - if !forced && now-lastCleanup < interval { - return - } - - // Try to acquire cleanup lock atomically - if !atomic.CompareAndSwapInt64(&lastCleanupTime, lastCleanup, now) { - return // Another goroutine is already cleaning up - } - - // Perform the actual cleanup - doCleanupGoroutineCache() -} - -// cleanupGoroutineCache triggers an asynchronous cleanup of the goroutine cache -// This is safe to call from the critical path as it's non-blocking -func cleanupGoroutineCache() { - // Request asynchronous cleanup - requestCleanup() -} - -// The actual cleanup implementation that runs in the background goroutine -func doCleanupGoroutineCache() { - // Get current time for TTL calculations - now := time.Now().Unix() - - // Check if we're in a high-latency situation - isHighLatency := false - latencyMetrics := GetAudioLatencyMetrics() - if latencyMetrics != nil && latencyMetrics.LatencyMs > 10.0 { - // Under high latency, be more aggressive with cleanup - isHighLatency = true - } - - goroutineCacheMutex.Lock() - defer goroutineCacheMutex.Unlock() - - // Convert old cache format to new TTL-based format if needed - if len(goroutineCacheWithTTL) == 0 && len(goroutineBufferCache) > 0 { - for gid, cache := range goroutineBufferCache { - goroutineCacheWithTTL[gid] = &cacheEntry{ - cache: cache, - lastAccess: now, - gid: gid, - } - } - // Clear old cache to free memory - goroutineBufferCache = make(map[int64]*lockFreeBufferCache) - } - - // Enhanced cleanup with size limits and better TTL management - entriesToRemove := make([]int64, 0) - ttl := Config.BufferPoolBufferTTL - if isHighLatency { - // Under high latency, use a much shorter TTL - ttl = Config.BufferPoolBufferTTL / 4 - } - - // Remove entries older than enhanced TTL - for gid, entry := range goroutineCacheWithTTL { - // Both now and entry.lastAccess are int64, so this comparison is safe - if now-entry.lastAccess > ttl { - entriesToRemove = append(entriesToRemove, gid) - } - } - - // If we have too many cache entries, remove the oldest ones - if len(goroutineCacheWithTTL) > Config.BufferPoolMaxCacheEntries { - // Sort by last access time and remove oldest entries - type cacheEntryWithGID struct { - gid int64 - lastAccess int64 - } - entries := make([]cacheEntryWithGID, 0, len(goroutineCacheWithTTL)) - for gid, entry := range goroutineCacheWithTTL { - entries = append(entries, cacheEntryWithGID{gid: gid, lastAccess: entry.lastAccess}) - } - // Sort by last access time (oldest first) - sort.Slice(entries, func(i, j int) bool { - return entries[i].lastAccess < entries[j].lastAccess - }) - // Mark oldest entries for removal - excessCount := len(goroutineCacheWithTTL) - Config.BufferPoolMaxCacheEntries - for i := 0; i < excessCount && i < len(entries); i++ { - entriesToRemove = append(entriesToRemove, entries[i].gid) - } - } - - // If cache is still too large after TTL cleanup, remove oldest entries - // Under high latency, use a more aggressive target size - targetSize := Config.BufferPoolMaxCacheSize - targetReduction := Config.BufferPoolMaxCacheSize / 2 - - if isHighLatency { - // Under high latency, target a much smaller cache size - targetSize = Config.BufferPoolMaxCacheSize / 4 - targetReduction = Config.BufferPoolMaxCacheSize / 8 - } - - if len(goroutineCacheWithTTL) > targetSize { - // Find oldest entries - type ageEntry struct { - gid int64 - lastAccess int64 - } - oldestEntries := make([]ageEntry, 0, len(goroutineCacheWithTTL)) - for gid, entry := range goroutineCacheWithTTL { - oldestEntries = append(oldestEntries, ageEntry{gid, entry.lastAccess}) - } - - // Sort by lastAccess (oldest first) - sort.Slice(oldestEntries, func(i, j int) bool { - return oldestEntries[i].lastAccess < oldestEntries[j].lastAccess - }) - - // Remove oldest entries to get down to target reduction size - toRemove := len(goroutineCacheWithTTL) - targetReduction - for i := 0; i < toRemove && i < len(oldestEntries); i++ { - entriesToRemove = append(entriesToRemove, oldestEntries[i].gid) - } - } - - // Remove marked entries and return their buffers to the pool - for _, gid := range entriesToRemove { - if entry, exists := goroutineCacheWithTTL[gid]; exists { - // Return buffers to main pool before removing entry - for i, buf := range entry.cache.buffers { - if buf != nil { - // Clear the buffer slot atomically - entry.cache.buffers[i] = nil - } - } - delete(goroutineCacheWithTTL, gid) - } - } -} - +// AudioBufferPool provides a simple buffer pool for audio processing type AudioBufferPool struct { - // Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) - currentSize int64 // Current pool size (atomic) - hitCount int64 // Pool hit counter (atomic) - missCount int64 // Pool miss counter (atomic) + // Atomic counters + hitCount int64 // Pool hit counter (atomic) + missCount int64 // Pool miss counter (atomic) - // Other fields - pool sync.Pool - bufferSize int - maxPoolSize int - mutex sync.RWMutex - // Memory optimization fields - preallocated []*[]byte // Pre-allocated buffers for immediate use - preallocSize int // Number of pre-allocated buffers - - // Chunk-based allocation optimization - chunkSize int // Size of each memory chunk - chunks [][]byte // Pre-allocated memory chunks - chunkOffsets []int // Current offset in each chunk - chunkMutex sync.Mutex // Protects chunk allocation + // Pool configuration + bufferSize int + pool chan []byte + maxSize int } +// NewAudioBufferPool creates a new simple audio buffer pool func NewAudioBufferPool(bufferSize int) *AudioBufferPool { - // Validate buffer size parameter - if err := ValidateBufferSize(bufferSize); err != nil { - // Use default value on validation error - bufferSize = Config.AudioFramePoolSize + maxSize := Config.MaxPoolSize + if maxSize <= 0 { + maxSize = Config.BufferPoolDefaultSize } - // Enhanced preallocation strategy based on buffer size and system capacity - var preallocSize int - if bufferSize <= Config.AudioFramePoolSize { - // For smaller pools, use enhanced preallocation - preallocSize = Config.PreallocPercentage * 2 - } else { - // For larger pools, use standard enhanced preallocation - preallocSize = (Config.PreallocPercentage * 3) / 2 + pool := &AudioBufferPool{ + bufferSize: bufferSize, + pool: make(chan []byte, maxSize), + maxSize: maxSize, } - // Ensure minimum preallocation for better performance - if preallocSize < Config.BufferPoolMinPreallocBuffers { - preallocSize = Config.BufferPoolMinPreallocBuffers - } - - // Calculate max pool size based on buffer size to prevent memory bloat - maxPoolSize := Config.BufferPoolMaxPoolSize // Default - if bufferSize > 8192 { - maxPoolSize = Config.BufferPoolMaxPoolSize / 4 // Much smaller for very large buffers - } else if bufferSize > 4096 { - maxPoolSize = Config.BufferPoolMaxPoolSize / 2 // Smaller for large buffers - } else if bufferSize > 1024 { - maxPoolSize = (Config.BufferPoolMaxPoolSize * 3) / 4 // Medium for medium buffers - } - - // Calculate chunk size - allocate larger chunks to reduce allocation frequency - chunkSize := bufferSize * Config.BufferPoolChunkBufferCount // Each chunk holds multiple buffers worth of memory - if chunkSize < Config.BufferPoolMinChunkSize { - chunkSize = Config.BufferPoolMinChunkSize // Minimum chunk size - } - - p := &AudioBufferPool{ - bufferSize: bufferSize, - maxPoolSize: maxPoolSize, - preallocated: make([]*[]byte, 0, preallocSize), - preallocSize: preallocSize, - chunkSize: chunkSize, - chunks: make([][]byte, 0, Config.BufferPoolInitialChunkCapacity), // Start with capacity for initial chunks - chunkOffsets: make([]int, 0, Config.BufferPoolInitialChunkCapacity), - } - - // Configure sync.Pool with optimized allocation - p.pool.New = func() interface{} { - // Use chunk-based allocation instead of individual make() - buf := p.allocateFromChunk() - return &buf - } - - // Pre-allocate buffers with optimized capacity - for i := 0; i < preallocSize; i++ { - // Use chunk-based allocation to prevent over-allocation - buf := p.allocateFromChunk() - p.preallocated = append(p.preallocated, &buf) - } - - return p -} - -// allocateFromChunk allocates a buffer from pre-allocated memory chunks -func (p *AudioBufferPool) allocateFromChunk() []byte { - p.chunkMutex.Lock() - defer p.chunkMutex.Unlock() - - // Try to allocate from existing chunks - for i := 0; i < len(p.chunks); i++ { - if p.chunkOffsets[i]+p.bufferSize <= len(p.chunks[i]) { - // Slice from the chunk - start := p.chunkOffsets[i] - end := start + p.bufferSize - buf := p.chunks[i][start:end:end] // Use 3-index slice to set capacity - p.chunkOffsets[i] = end - return buf[:0] // Return with zero length but correct capacity + // Pre-populate the pool + for i := 0; i < maxSize/2; i++ { + buf := make([]byte, bufferSize) + select { + case pool.pool <- buf: + default: + break } } - // Need to allocate a new chunk - newChunk := make([]byte, p.chunkSize) - p.chunks = append(p.chunks, newChunk) - p.chunkOffsets = append(p.chunkOffsets, p.bufferSize) - - // Return buffer from the new chunk - buf := newChunk[0:p.bufferSize:p.bufferSize] - return buf[:0] // Return with zero length but correct capacity + return pool } +// Get retrieves a buffer from the pool func (p *AudioBufferPool) Get() []byte { - // Skip cleanup trigger in hotpath - cleanup runs in background - // cleanupGoroutineCache() - moved to background goroutine - - // Fast path: Try lock-free per-goroutine cache first - gid := getGoroutineID() - goroutineCacheMutex.RLock() - cacheEntry, exists := goroutineCacheWithTTL[gid] - goroutineCacheMutex.RUnlock() - - if exists && cacheEntry != nil && cacheEntry.cache != nil { - // Try to get buffer from lock-free cache - cache := cacheEntry.cache - for i := 0; i < len(cache.buffers); i++ { - bufPtr := (*unsafe.Pointer)(unsafe.Pointer(&cache.buffers[i])) - buf := (*[]byte)(atomic.LoadPointer(bufPtr)) - if buf != nil && atomic.CompareAndSwapPointer(bufPtr, unsafe.Pointer(buf), nil) { - // Direct hit count update to avoid sampling complexity in critical path - atomic.AddInt64(&p.hitCount, 1) - *buf = (*buf)[:0] - return *buf - } - } - // Update access time only after cache miss to reduce overhead - cacheEntry.lastAccess = time.Now().Unix() - } - - // Fallback: Try pre-allocated pool with mutex - p.mutex.Lock() - if len(p.preallocated) > 0 { - lastIdx := len(p.preallocated) - 1 - buf := p.preallocated[lastIdx] - p.preallocated = p.preallocated[:lastIdx] - p.mutex.Unlock() - // Direct hit count update to avoid sampling complexity in critical path + select { + case buf := <-p.pool: atomic.AddInt64(&p.hitCount, 1) - *buf = (*buf)[:0] - return *buf + return buf[:0] // Reset length but keep capacity + default: + atomic.AddInt64(&p.missCount, 1) + return make([]byte, 0, p.bufferSize) } - p.mutex.Unlock() - - // Try sync.Pool next - if poolBuf := p.pool.Get(); poolBuf != nil { - buf := poolBuf.(*[]byte) - // Direct hit count update to avoid sampling complexity in critical path - atomic.AddInt64(&p.hitCount, 1) - atomic.AddInt64(&p.currentSize, -1) - // Fast capacity check - most buffers should be correct size - if cap(*buf) >= p.bufferSize { - *buf = (*buf)[:0] - return *buf - } - // Buffer too small, fall through to allocation - } - - // Pool miss - allocate new buffer from chunk - // Direct miss count update to avoid sampling complexity in critical path - atomic.AddInt64(&p.missCount, 1) - return p.allocateFromChunk() } +// Put returns a buffer to the pool func (p *AudioBufferPool) Put(buf []byte) { - // Fast validation - reject buffers that are too small or too large - bufCap := cap(buf) - if bufCap < p.bufferSize || bufCap > p.bufferSize*2 { - return // Buffer size mismatch, don't pool it to prevent memory bloat + if buf == nil || cap(buf) != p.bufferSize { + return // Invalid buffer } - // Enhanced buffer clearing - only clear if buffer contains sensitive data - // For audio buffers, we can skip clearing for performance unless needed - // This reduces CPU overhead significantly - var resetBuf []byte - if cap(buf) > p.bufferSize { - // If capacity is larger than expected, create a new properly sized buffer - resetBuf = make([]byte, 0, p.bufferSize) - } else { - // Reset length but keep capacity for reuse efficiency - resetBuf = buf[:0] + // Reset the buffer + buf = buf[:0] + + // Try to return to pool + select { + case p.pool <- buf: + // Successfully returned to pool + default: + // Pool is full, discard buffer } - - // Fast path: Try to put in lock-free per-goroutine cache - gid := getGoroutineID() - goroutineCacheMutex.RLock() - entryWithTTL, exists := goroutineCacheWithTTL[gid] - goroutineCacheMutex.RUnlock() - - var cache *lockFreeBufferCache - if exists && entryWithTTL != nil { - cache = entryWithTTL.cache - // Update access time only when we successfully use the cache - } else { - // Create new cache for this goroutine - cache = &lockFreeBufferCache{} - now := time.Now().Unix() - goroutineCacheMutex.Lock() - goroutineCacheWithTTL[gid] = &cacheEntry{ - cache: cache, - lastAccess: now, - gid: gid, - } - goroutineCacheMutex.Unlock() - } - - if cache != nil { - // Try to store in lock-free cache - for i := 0; i < len(cache.buffers); i++ { - bufPtr := (*unsafe.Pointer)(unsafe.Pointer(&cache.buffers[i])) - if atomic.CompareAndSwapPointer(bufPtr, nil, unsafe.Pointer(&resetBuf)) { - // Update access time only on successful cache - if exists && entryWithTTL != nil { - entryWithTTL.lastAccess = time.Now().Unix() - } - return // Successfully cached - } - } - } - - // Fallback: Try to return to pre-allocated pool for fastest reuse - p.mutex.Lock() - if len(p.preallocated) < p.preallocSize { - p.preallocated = append(p.preallocated, &resetBuf) - p.mutex.Unlock() - return - } - p.mutex.Unlock() - - // Check sync.Pool size limit to prevent excessive memory usage - if atomic.LoadInt64(&p.currentSize) >= int64(p.maxPoolSize) { - return // Pool is full, let GC handle this buffer - } - - // Return to sync.Pool and update counter atomically - p.pool.Put(&resetBuf) - atomic.AddInt64(&p.currentSize, 1) } -// Enhanced global buffer pools for different audio frame types with improved sizing -var ( - // Main audio frame pool with enhanced capacity - audioFramePool = NewAudioBufferPool(Config.AudioFramePoolSize) - // Control message pool with enhanced capacity for better throughput - audioControlPool = NewAudioBufferPool(Config.BufferPoolControlSize) // Control message buffer size -) - -func GetAudioFrameBuffer() []byte { - return audioFramePool.Get() -} - -func PutAudioFrameBuffer(buf []byte) { - audioFramePool.Put(buf) -} - -func GetAudioControlBuffer() []byte { - return audioControlPool.Get() -} - -func PutAudioControlBuffer(buf []byte) { - audioControlPool.Put(buf) -} - -// GetPoolStats returns detailed statistics about this buffer pool -func (p *AudioBufferPool) GetPoolStats() AudioBufferPoolDetailedStats { - p.mutex.RLock() - preallocatedCount := len(p.preallocated) - currentSize := p.currentSize - p.mutex.RUnlock() - +// GetStats returns pool statistics +func (p *AudioBufferPool) GetStats() AudioBufferPoolStats { hitCount := atomic.LoadInt64(&p.hitCount) missCount := atomic.LoadInt64(&p.missCount) totalRequests := hitCount + missCount var hitRate float64 if totalRequests > 0 { - hitRate = float64(hitCount) / float64(totalRequests) * Config.PercentageMultiplier + hitRate = float64(hitCount) / float64(totalRequests) * Config.BufferPoolHitRateBase } - return AudioBufferPoolDetailedStats{ - BufferSize: p.bufferSize, - MaxPoolSize: p.maxPoolSize, - CurrentPoolSize: currentSize, - PreallocatedCount: int64(preallocatedCount), - PreallocatedMax: int64(p.preallocSize), - HitCount: hitCount, - MissCount: missCount, - HitRate: hitRate, - } -} - -// AudioBufferPoolDetailedStats provides detailed pool statistics -type AudioBufferPoolDetailedStats struct { - BufferSize int - MaxPoolSize int - CurrentPoolSize int64 - PreallocatedCount int64 - PreallocatedMax int64 - HitCount int64 - MissCount int64 - HitRate float64 // Percentage - TotalBytes int64 // Total memory usage in bytes - AverageBufferSize float64 // Average size of buffers in the pool -} - -// GetAudioBufferPoolStats returns statistics about the audio buffer pools -type AudioBufferPoolStats struct { - FramePoolSize int64 - FramePoolMax int - ControlPoolSize int64 - ControlPoolMax int - // Enhanced statistics - FramePoolHitRate float64 - ControlPoolHitRate float64 - FramePoolDetails AudioBufferPoolDetailedStats - ControlPoolDetails AudioBufferPoolDetailedStats -} - -func GetAudioBufferPoolStats() AudioBufferPoolStats { - audioFramePool.mutex.RLock() - frameSize := audioFramePool.currentSize - frameMax := audioFramePool.maxPoolSize - audioFramePool.mutex.RUnlock() - - audioControlPool.mutex.RLock() - controlSize := audioControlPool.currentSize - controlMax := audioControlPool.maxPoolSize - audioControlPool.mutex.RUnlock() - - // Get detailed statistics - frameDetails := audioFramePool.GetPoolStats() - controlDetails := audioControlPool.GetPoolStats() - return AudioBufferPoolStats{ - FramePoolSize: frameSize, - FramePoolMax: frameMax, - ControlPoolSize: controlSize, - ControlPoolMax: controlMax, - FramePoolHitRate: frameDetails.HitRate, - ControlPoolHitRate: controlDetails.HitRate, - FramePoolDetails: frameDetails, - ControlPoolDetails: controlDetails, + BufferSize: p.bufferSize, + MaxPoolSize: p.maxSize, + CurrentSize: int64(len(p.pool)), + HitCount: hitCount, + MissCount: missCount, + HitRate: hitRate, } } -// AdaptiveResize dynamically adjusts pool parameters based on performance metrics -func (p *AudioBufferPool) AdaptiveResize() { - hitCount := atomic.LoadInt64(&p.hitCount) - missCount := atomic.LoadInt64(&p.missCount) - totalRequests := hitCount + missCount - - if totalRequests < int64(Config.BufferPoolAdaptiveResizeThreshold) { - return // Not enough data for meaningful adaptation - } - - hitRate := float64(hitCount) / float64(totalRequests) - currentSize := atomic.LoadInt64(&p.currentSize) - - // If hit rate is low, consider increasing pool size - if hitRate < Config.BufferPoolCacheHitRateTarget && currentSize < int64(p.maxPoolSize) { - // Increase preallocation by 25% up to max pool size - newPreallocSize := int(float64(len(p.preallocated)) * 1.25) - if newPreallocSize > p.maxPoolSize { - newPreallocSize = p.maxPoolSize - } - - // Preallocate additional buffers - for len(p.preallocated) < newPreallocSize { - buf := make([]byte, p.bufferSize) - p.preallocated = append(p.preallocated, &buf) - } - } - - // If hit rate is very high and pool is large, consider shrinking - if hitRate > Config.BufferPoolHighHitRateThreshold && len(p.preallocated) > p.preallocSize { - // Reduce preallocation by 10% but not below original size - newSize := int(float64(len(p.preallocated)) * 0.9) - if newSize < p.preallocSize { - newSize = p.preallocSize - } - - // Remove excess preallocated buffers - if newSize < len(p.preallocated) { - p.preallocated = p.preallocated[:newSize] - } - } +// AudioBufferPoolStats represents pool statistics +type AudioBufferPoolStats struct { + BufferSize int + MaxPoolSize int + CurrentSize int64 + HitCount int64 + MissCount int64 + HitRate float64 } -// WarmupCache pre-populates goroutine-local caches for better initial performance -func (p *AudioBufferPool) WarmupCache() { - // Only warmup if we have sufficient request history - hitCount := atomic.LoadInt64(&p.hitCount) - missCount := atomic.LoadInt64(&p.missCount) - totalRequests := hitCount + missCount +// Global buffer pools +var ( + audioFramePool = NewAudioBufferPool(Config.AudioFramePoolSize) + audioControlPool = NewAudioBufferPool(Config.BufferPoolControlSize) +) - if totalRequests < int64(Config.BufferPoolCacheWarmupThreshold) { - return - } - - // Get or create cache for current goroutine - gid := getGoroutineID() - goroutineCacheMutex.RLock() - entryWithTTL, exists := goroutineCacheWithTTL[gid] - goroutineCacheMutex.RUnlock() - - var cache *lockFreeBufferCache - if exists && entryWithTTL != nil { - cache = entryWithTTL.cache - } else { - // Create new cache for this goroutine - cache = &lockFreeBufferCache{} - now := time.Now().Unix() - goroutineCacheMutex.Lock() - goroutineCacheWithTTL[gid] = &cacheEntry{ - cache: cache, - lastAccess: now, - gid: gid, - } - goroutineCacheMutex.Unlock() - } - - if cache != nil { - // Fill cache to optimal level based on hit rate - hitRate := float64(hitCount) / float64(totalRequests) - optimalCacheSize := int(float64(Config.BufferPoolCacheSize) * hitRate) - if optimalCacheSize < 2 { - optimalCacheSize = 2 - } - - // Pre-allocate buffers for cache - for i := 0; i < optimalCacheSize && i < len(cache.buffers); i++ { - if cache.buffers[i] == nil { - // Get buffer from main pool - buf := p.Get() - if len(buf) > 0 { - cache.buffers[i] = &buf - } - } - } - } +// GetAudioFrameBuffer gets a buffer for audio frames +func GetAudioFrameBuffer() []byte { + return audioFramePool.Get() } -// OptimizeCache performs periodic cache optimization based on usage patterns -func (p *AudioBufferPool) OptimizeCache() { - hitCount := atomic.LoadInt64(&p.hitCount) - missCount := atomic.LoadInt64(&p.missCount) - totalRequests := hitCount + missCount +// PutAudioFrameBuffer returns a buffer to the frame pool +func PutAudioFrameBuffer(buf []byte) { + audioFramePool.Put(buf) +} - if totalRequests < int64(Config.BufferPoolOptimizeCacheThreshold) { - return - } +// GetAudioControlBuffer gets a buffer for control messages +func GetAudioControlBuffer() []byte { + return audioControlPool.Get() +} - hitRate := float64(hitCount) / float64(totalRequests) +// PutAudioControlBuffer returns a buffer to the control pool +func PutAudioControlBuffer(buf []byte) { + audioControlPool.Put(buf) +} - // If hit rate is below target, trigger cache warmup - if hitRate < Config.BufferPoolCacheHitRateTarget { - p.WarmupCache() - } - - // Reset counters periodically to avoid overflow and get fresh metrics - if totalRequests > int64(Config.BufferPoolCounterResetThreshold) { - atomic.StoreInt64(&p.hitCount, hitCount/2) - atomic.StoreInt64(&p.missCount, missCount/2) +// GetAudioBufferPoolStats returns statistics for all pools +func GetAudioBufferPoolStats() map[string]AudioBufferPoolStats { + return map[string]AudioBufferPoolStats{ + "frame_pool": audioFramePool.GetStats(), + "control_pool": audioControlPool.GetStats(), } } diff --git a/internal/audio/zero_copy.go b/internal/audio/zero_copy.go index 0c7edda2..38c57592 100644 --- a/internal/audio/zero_copy.go +++ b/internal/audio/zero_copy.go @@ -98,7 +98,7 @@ type ZeroCopyFramePool struct { // NewZeroCopyFramePool creates a new zero-copy frame pool func NewZeroCopyFramePool(maxFrameSize int) *ZeroCopyFramePool { // Pre-allocate frames for immediate availability - preallocSizeBytes := Config.PreallocSize + preallocSizeBytes := Config.ZeroCopyPreallocSizeBytes maxPoolSize := Config.MaxPoolSize // Limit total pool size // Calculate number of frames based on memory budget, not frame count @@ -106,8 +106,8 @@ func NewZeroCopyFramePool(maxFrameSize int) *ZeroCopyFramePool { if preallocFrameCount > maxPoolSize { preallocFrameCount = maxPoolSize } - if preallocFrameCount < 1 { - preallocFrameCount = 1 // Always preallocate at least one frame + if preallocFrameCount < Config.ZeroCopyMinPreallocFrames { + preallocFrameCount = Config.ZeroCopyMinPreallocFrames } preallocated := make([]*ZeroCopyAudioFrame, 0, preallocFrameCount) diff --git a/main.go b/main.go index 1bc7b686..0a7516ec 100644 --- a/main.go +++ b/main.go @@ -35,9 +35,6 @@ func startAudioSubprocess() error { // Initialize validation cache for optimal performance audio.InitValidationCache() - // Start goroutine monitoring to detect and prevent leaks - audio.StartGoroutineMonitoring() - // Enable batch audio processing to reduce CGO call overhead if err := audio.EnableBatchAudioProcessing(); err != nil { logger.Warn().Err(err).Msg("failed to enable batch audio processing") @@ -112,8 +109,6 @@ func startAudioSubprocess() error { // Stop audio relay when process exits audio.StopAudioRelay() - // Stop goroutine monitoring - audio.StopGoroutineMonitoring() // Disable batch audio processing audio.DisableBatchAudioProcessing() },