From 3a28105f562200a5256d3c12ef05c704fa3ae535 Mon Sep 17 00:00:00 2001 From: Alex P Date: Sun, 24 Aug 2025 23:36:29 +0000 Subject: [PATCH] Fix: linting errors --- internal/audio/adaptive_buffer.go | 128 +++++++++++++-------------- internal/audio/adaptive_optimizer.go | 64 +++++++------- internal/audio/batch_audio.go | 4 +- internal/audio/buffer_pool.go | 76 ++++++++-------- internal/audio/input_ipc.go | 89 ++++++++++--------- internal/audio/ipc.go | 44 ++++----- internal/audio/latency_monitor.go | 82 ++++++++--------- internal/audio/memory_metrics.go | 50 +++++------ internal/audio/mic_contention.go | 4 +- internal/audio/output_streaming.go | 30 +++---- internal/audio/priority_scheduler.go | 8 +- internal/audio/relay.go | 2 +- internal/audio/zero_copy.go | 44 ++++----- main.go | 2 +- webrtc.go | 10 +-- 15 files changed, 320 insertions(+), 317 deletions(-) diff --git a/internal/audio/adaptive_buffer.go b/internal/audio/adaptive_buffer.go index dbfdfac..057532b 100644 --- a/internal/audio/adaptive_buffer.go +++ b/internal/audio/adaptive_buffer.go @@ -14,20 +14,20 @@ import ( // AdaptiveBufferConfig holds configuration for adaptive buffer sizing type AdaptiveBufferConfig struct { // Buffer size limits (in frames) - MinBufferSize int - MaxBufferSize int + MinBufferSize int + MaxBufferSize int DefaultBufferSize int - + // System load thresholds - LowCPUThreshold float64 // Below this, increase buffer size - HighCPUThreshold float64 // Above this, decrease buffer size - LowMemoryThreshold float64 // Below this, increase buffer size + LowCPUThreshold float64 // Below this, increase buffer size + HighCPUThreshold float64 // Above this, decrease buffer size + LowMemoryThreshold float64 // Below this, increase buffer size HighMemoryThreshold float64 // Above this, decrease buffer size - + // Latency thresholds (in milliseconds) - TargetLatency time.Duration - MaxLatency time.Duration - + TargetLatency time.Duration + MaxLatency time.Duration + // Adaptation parameters AdaptationInterval time.Duration SmoothingFactor float64 // 0.0-1.0, higher = more responsive @@ -37,25 +37,25 @@ type AdaptiveBufferConfig struct { func DefaultAdaptiveBufferConfig() AdaptiveBufferConfig { return AdaptiveBufferConfig{ // Conservative buffer sizes for 256MB RAM constraint - MinBufferSize: 3, // Minimum 3 frames (slightly higher for stability) - MaxBufferSize: 20, // Maximum 20 frames (increased for high load scenarios) - DefaultBufferSize: 6, // Default 6 frames (increased for better stability) - + MinBufferSize: 3, // Minimum 3 frames (slightly higher for stability) + MaxBufferSize: 20, // Maximum 20 frames (increased for high load scenarios) + DefaultBufferSize: 6, // Default 6 frames (increased for better stability) + // CPU thresholds optimized for single-core ARM Cortex A7 under load - LowCPUThreshold: 20.0, // Below 20% CPU - HighCPUThreshold: 60.0, // Above 60% CPU (lowered to be more responsive) - + LowCPUThreshold: 20.0, // Below 20% CPU + HighCPUThreshold: 60.0, // Above 60% CPU (lowered to be more responsive) + // Memory thresholds for 256MB total RAM LowMemoryThreshold: 35.0, // Below 35% memory usage HighMemoryThreshold: 75.0, // Above 75% memory usage (lowered for earlier response) - + // Latency targets TargetLatency: 20 * time.Millisecond, // Target 20ms latency MaxLatency: 50 * time.Millisecond, // Max acceptable 50ms - + // Adaptation settings AdaptationInterval: 500 * time.Millisecond, // Check every 500ms - SmoothingFactor: 0.3, // Moderate responsiveness + SmoothingFactor: 0.3, // Moderate responsiveness } } @@ -64,38 +64,38 @@ type AdaptiveBufferManager struct { // Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) currentInputBufferSize int64 // Current input buffer size (atomic) currentOutputBufferSize int64 // Current output buffer size (atomic) - averageLatency int64 // Average latency in nanoseconds (atomic) - systemCPUPercent int64 // System CPU percentage * 100 (atomic) - systemMemoryPercent int64 // System memory percentage * 100 (atomic) - adaptationCount int64 // Metrics tracking (atomic) - + averageLatency int64 // Average latency in nanoseconds (atomic) + systemCPUPercent int64 // System CPU percentage * 100 (atomic) + systemMemoryPercent int64 // System memory percentage * 100 (atomic) + adaptationCount int64 // Metrics tracking (atomic) + config AdaptiveBufferConfig logger zerolog.Logger processMonitor *ProcessMonitor - + // Control channels ctx context.Context cancel context.CancelFunc wg sync.WaitGroup - + // Metrics tracking - lastAdaptation time.Time + lastAdaptation time.Time mutex sync.RWMutex } // NewAdaptiveBufferManager creates a new adaptive buffer manager func NewAdaptiveBufferManager(config AdaptiveBufferConfig) *AdaptiveBufferManager { ctx, cancel := context.WithCancel(context.Background()) - + return &AdaptiveBufferManager{ currentInputBufferSize: int64(config.DefaultBufferSize), currentOutputBufferSize: int64(config.DefaultBufferSize), - config: config, - logger: logging.GetDefaultLogger().With().Str("component", "adaptive-buffer").Logger(), - processMonitor: GetProcessMonitor(), - ctx: ctx, - cancel: cancel, - lastAdaptation: time.Now(), + config: config, + logger: logging.GetDefaultLogger().With().Str("component", "adaptive-buffer").Logger(), + processMonitor: GetProcessMonitor(), + ctx: ctx, + cancel: cancel, + lastAdaptation: time.Now(), } } @@ -128,7 +128,7 @@ func (abm *AdaptiveBufferManager) UpdateLatency(latency time.Duration) { // Use exponential moving average for latency currentAvg := atomic.LoadInt64(&abm.averageLatency) newLatency := latency.Nanoseconds() - + if currentAvg == 0 { atomic.StoreInt64(&abm.averageLatency, newLatency) } else { @@ -141,10 +141,10 @@ func (abm *AdaptiveBufferManager) UpdateLatency(latency time.Duration) { // adaptationLoop is the main loop that adjusts buffer sizes func (abm *AdaptiveBufferManager) adaptationLoop() { defer abm.wg.Done() - + ticker := time.NewTicker(abm.config.AdaptationInterval) defer ticker.Stop() - + for { select { case <-abm.ctx.Done(): @@ -162,61 +162,61 @@ func (abm *AdaptiveBufferManager) adaptBufferSizes() { if len(metrics) == 0 { return // No metrics available } - + // Calculate system-wide CPU and memory usage totalCPU := 0.0 totalMemory := 0.0 processCount := 0 - + for _, metric := range metrics { totalCPU += metric.CPUPercent totalMemory += metric.MemoryPercent processCount++ } - + if processCount == 0 { return } - + // Store system metrics atomically - systemCPU := totalCPU // Total CPU across all monitored processes + systemCPU := totalCPU // Total CPU across all monitored processes systemMemory := totalMemory / float64(processCount) // Average memory usage - + atomic.StoreInt64(&abm.systemCPUPercent, int64(systemCPU*100)) atomic.StoreInt64(&abm.systemMemoryPercent, int64(systemMemory*100)) - + // Get current latency currentLatencyNs := atomic.LoadInt64(&abm.averageLatency) currentLatency := time.Duration(currentLatencyNs) - + // Calculate adaptation factors cpuFactor := abm.calculateCPUFactor(systemCPU) memoryFactor := abm.calculateMemoryFactor(systemMemory) latencyFactor := abm.calculateLatencyFactor(currentLatency) - + // Combine factors with weights (CPU has highest priority for KVM coexistence) combinedFactor := 0.5*cpuFactor + 0.3*memoryFactor + 0.2*latencyFactor - + // Apply adaptation with smoothing currentInput := float64(atomic.LoadInt64(&abm.currentInputBufferSize)) currentOutput := float64(atomic.LoadInt64(&abm.currentOutputBufferSize)) - + // Calculate new buffer sizes newInputSize := abm.applyAdaptation(currentInput, combinedFactor) newOutputSize := abm.applyAdaptation(currentOutput, combinedFactor) - + // Update buffer sizes if they changed significantly adjustmentMade := false if math.Abs(newInputSize-currentInput) >= 0.5 || math.Abs(newOutputSize-currentOutput) >= 0.5 { atomic.StoreInt64(&abm.currentInputBufferSize, int64(math.Round(newInputSize))) atomic.StoreInt64(&abm.currentOutputBufferSize, int64(math.Round(newOutputSize))) - + atomic.AddInt64(&abm.adaptationCount, 1) abm.mutex.Lock() abm.lastAdaptation = time.Now() abm.mutex.Unlock() adjustmentMade = true - + abm.logger.Debug(). Float64("cpu_percent", systemCPU). Float64("memory_percent", systemMemory). @@ -226,7 +226,7 @@ func (abm *AdaptiveBufferManager) adaptBufferSizes() { Int("new_output_size", int(newOutputSize)). Msg("Adapted buffer sizes") } - + // Update metrics with current state currentInputSize := int(atomic.LoadInt64(&abm.currentInputBufferSize)) currentOutputSize := int(atomic.LoadInt64(&abm.currentOutputBufferSize)) @@ -287,12 +287,12 @@ func (abm *AdaptiveBufferManager) applyAdaptation(currentSize, factor float64) f // Decrease towards min targetSize = currentSize + factor*(currentSize-float64(abm.config.MinBufferSize)) } - + // Apply smoothing newSize := currentSize + abm.config.SmoothingFactor*(targetSize-currentSize) - + // Clamp to valid range - return math.Max(float64(abm.config.MinBufferSize), + return math.Max(float64(abm.config.MinBufferSize), math.Min(float64(abm.config.MaxBufferSize), newSize)) } @@ -301,15 +301,15 @@ func (abm *AdaptiveBufferManager) GetStats() map[string]interface{} { abm.mutex.RLock() lastAdaptation := abm.lastAdaptation abm.mutex.RUnlock() - + return map[string]interface{}{ - "input_buffer_size": abm.GetInputBufferSize(), - "output_buffer_size": abm.GetOutputBufferSize(), - "average_latency_ms": float64(atomic.LoadInt64(&abm.averageLatency)) / 1e6, - "system_cpu_percent": float64(atomic.LoadInt64(&abm.systemCPUPercent)) / 100, + "input_buffer_size": abm.GetInputBufferSize(), + "output_buffer_size": abm.GetOutputBufferSize(), + "average_latency_ms": float64(atomic.LoadInt64(&abm.averageLatency)) / 1e6, + "system_cpu_percent": float64(atomic.LoadInt64(&abm.systemCPUPercent)) / 100, "system_memory_percent": float64(atomic.LoadInt64(&abm.systemMemoryPercent)) / 100, - "adaptation_count": atomic.LoadInt64(&abm.adaptationCount), - "last_adaptation": lastAdaptation, + "adaptation_count": atomic.LoadInt64(&abm.adaptationCount), + "last_adaptation": lastAdaptation, } } @@ -335,4 +335,4 @@ func StopAdaptiveBuffering() { if globalAdaptiveBufferManager != nil { globalAdaptiveBufferManager.Stop() } -} \ No newline at end of file +} diff --git a/internal/audio/adaptive_optimizer.go b/internal/audio/adaptive_optimizer.go index 7aa12fa..e9ac814 100644 --- a/internal/audio/adaptive_optimizer.go +++ b/internal/audio/adaptive_optimizer.go @@ -15,47 +15,44 @@ type AdaptiveOptimizer struct { optimizationCount int64 // Number of optimizations performed (atomic) lastOptimization int64 // Timestamp of last optimization (atomic) optimizationLevel int64 // Current optimization level (0-10) (atomic) - + latencyMonitor *LatencyMonitor bufferManager *AdaptiveBufferManager logger zerolog.Logger - + // Control channels ctx context.Context cancel context.CancelFunc wg sync.WaitGroup - + // Configuration config OptimizerConfig - mutex sync.RWMutex } // OptimizerConfig holds configuration for the adaptive optimizer type OptimizerConfig struct { MaxOptimizationLevel int // Maximum optimization level (0-10) - CooldownPeriod time.Duration // Minimum time between optimizations - Aggressiveness float64 // How aggressively to optimize (0.0-1.0) - RollbackThreshold time.Duration // Latency threshold to rollback optimizations - StabilityPeriod time.Duration // Time to wait for stability after optimization + CooldownPeriod time.Duration // Minimum time between optimizations + Aggressiveness float64 // How aggressively to optimize (0.0-1.0) + RollbackThreshold time.Duration // Latency threshold to rollback optimizations + StabilityPeriod time.Duration // Time to wait for stability after optimization } - - // DefaultOptimizerConfig returns a sensible default configuration func DefaultOptimizerConfig() OptimizerConfig { return OptimizerConfig{ MaxOptimizationLevel: 8, - CooldownPeriod: 30 * time.Second, - Aggressiveness: 0.7, - RollbackThreshold: 300 * time.Millisecond, - StabilityPeriod: 10 * time.Second, + CooldownPeriod: 30 * time.Second, + Aggressiveness: 0.7, + RollbackThreshold: 300 * time.Millisecond, + StabilityPeriod: 10 * time.Second, } } // NewAdaptiveOptimizer creates a new adaptive optimizer func NewAdaptiveOptimizer(latencyMonitor *LatencyMonitor, bufferManager *AdaptiveBufferManager, config OptimizerConfig, logger zerolog.Logger) *AdaptiveOptimizer { ctx, cancel := context.WithCancel(context.Background()) - + optimizer := &AdaptiveOptimizer{ latencyMonitor: latencyMonitor, bufferManager: bufferManager, @@ -64,12 +61,10 @@ func NewAdaptiveOptimizer(latencyMonitor *LatencyMonitor, bufferManager *Adaptiv ctx: ctx, cancel: cancel, } - - // Register as latency monitor callback latencyMonitor.AddOptimizationCallback(optimizer.handleLatencyOptimization) - + return optimizer } @@ -89,26 +84,25 @@ func (ao *AdaptiveOptimizer) Stop() { // initializeStrategies sets up the available optimization strategies - // handleLatencyOptimization is called when latency optimization is needed func (ao *AdaptiveOptimizer) handleLatencyOptimization(metrics LatencyMetrics) error { currentLevel := atomic.LoadInt64(&ao.optimizationLevel) lastOpt := atomic.LoadInt64(&ao.lastOptimization) - + // Check cooldown period if time.Since(time.Unix(0, lastOpt)) < ao.config.CooldownPeriod { return nil } - + // Determine if we need to increase or decrease optimization level targetLevel := ao.calculateTargetOptimizationLevel(metrics) - + if targetLevel > currentLevel { return ao.increaseOptimization(int(targetLevel)) } else if targetLevel < currentLevel { return ao.decreaseOptimization(int(targetLevel)) } - + return nil } @@ -116,7 +110,7 @@ func (ao *AdaptiveOptimizer) handleLatencyOptimization(metrics LatencyMetrics) e func (ao *AdaptiveOptimizer) calculateTargetOptimizationLevel(metrics LatencyMetrics) int64 { // Base calculation on current latency vs target latencyRatio := float64(metrics.Current) / float64(50*time.Millisecond) // 50ms target - + // Adjust based on trend switch metrics.Trend { case LatencyTrendIncreasing: @@ -126,10 +120,10 @@ func (ao *AdaptiveOptimizer) calculateTargetOptimizationLevel(metrics LatencyMet case LatencyTrendVolatile: latencyRatio *= 1.1 // Slightly more aggressive } - + // Apply aggressiveness factor latencyRatio *= ao.config.Aggressiveness - + // Convert to optimization level targetLevel := int64(latencyRatio * 2) // Scale to 0-10 range if targetLevel > int64(ao.config.MaxOptimizationLevel) { @@ -138,7 +132,7 @@ func (ao *AdaptiveOptimizer) calculateTargetOptimizationLevel(metrics LatencyMet if targetLevel < 0 { targetLevel = 0 } - + return targetLevel } @@ -147,7 +141,7 @@ func (ao *AdaptiveOptimizer) increaseOptimization(targetLevel int) error { atomic.StoreInt64(&ao.optimizationLevel, int64(targetLevel)) atomic.StoreInt64(&ao.lastOptimization, time.Now().UnixNano()) atomic.AddInt64(&ao.optimizationCount, 1) - + return nil } @@ -155,17 +149,17 @@ func (ao *AdaptiveOptimizer) increaseOptimization(targetLevel int) error { func (ao *AdaptiveOptimizer) decreaseOptimization(targetLevel int) error { atomic.StoreInt64(&ao.optimizationLevel, int64(targetLevel)) atomic.StoreInt64(&ao.lastOptimization, time.Now().UnixNano()) - + return nil } // optimizationLoop runs the main optimization monitoring loop func (ao *AdaptiveOptimizer) optimizationLoop() { defer ao.wg.Done() - + ticker := time.NewTicker(ao.config.StabilityPeriod) defer ticker.Stop() - + for { select { case <-ao.ctx.Done(): @@ -179,13 +173,15 @@ func (ao *AdaptiveOptimizer) optimizationLoop() { // checkStability monitors system stability and rolls back if needed func (ao *AdaptiveOptimizer) checkStability() { metrics := ao.latencyMonitor.GetMetrics() - + // Check if we need to rollback due to excessive latency if metrics.Current > ao.config.RollbackThreshold { currentLevel := int(atomic.LoadInt64(&ao.optimizationLevel)) if currentLevel > 0 { ao.logger.Warn().Dur("current_latency", metrics.Current).Dur("threshold", ao.config.RollbackThreshold).Msg("Rolling back optimizations due to excessive latency") - ao.decreaseOptimization(currentLevel - 1) + if err := ao.decreaseOptimization(currentLevel - 1); err != nil { + ao.logger.Error().Err(err).Msg("Failed to decrease optimization level") + } } } } @@ -199,4 +195,4 @@ func (ao *AdaptiveOptimizer) GetOptimizationStats() map[string]interface{} { } } -// Strategy implementation methods (stubs for now) \ No newline at end of file +// Strategy implementation methods (stubs for now) diff --git a/internal/audio/batch_audio.go b/internal/audio/batch_audio.go index 3061d48..253e94c 100644 --- a/internal/audio/batch_audio.go +++ b/internal/audio/batch_audio.go @@ -199,12 +199,12 @@ func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) { start := time.Now() if atomic.CompareAndSwapInt32(&bap.threadPinned, 0, 1) { runtime.LockOSThread() - + // Set high priority for batch audio processing if err := SetAudioThreadPriority(); err != nil { bap.logger.Warn().Err(err).Msg("Failed to set batch audio processing priority") } - + defer func() { if err := ResetThreadPriority(); err != nil { bap.logger.Warn().Err(err).Msg("Failed to reset thread priority") diff --git a/internal/audio/buffer_pool.go b/internal/audio/buffer_pool.go index 953d55f..cf26e28 100644 --- a/internal/audio/buffer_pool.go +++ b/internal/audio/buffer_pool.go @@ -7,15 +7,15 @@ import ( type AudioBufferPool struct { // Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) - currentSize int64 // Current pool size (atomic) - hitCount int64 // Pool hit counter (atomic) - missCount int64 // Pool miss counter (atomic) - + currentSize int64 // Current pool size (atomic) + hitCount int64 // Pool hit counter (atomic) + missCount int64 // Pool miss counter (atomic) + // Other fields - pool sync.Pool - bufferSize int - maxPoolSize int - mutex sync.RWMutex + pool sync.Pool + bufferSize int + maxPoolSize int + mutex sync.RWMutex // Memory optimization fields preallocated []*[]byte // Pre-allocated buffers for immediate use preallocSize int // Number of pre-allocated buffers @@ -25,16 +25,16 @@ func NewAudioBufferPool(bufferSize int) *AudioBufferPool { // Pre-allocate 20% of max pool size for immediate availability preallocSize := 20 preallocated := make([]*[]byte, 0, preallocSize) - + // Pre-allocate buffers to reduce initial allocation overhead for i := 0; i < preallocSize; i++ { buf := make([]byte, 0, bufferSize) preallocated = append(preallocated, &buf) } - + return &AudioBufferPool{ - bufferSize: bufferSize, - maxPoolSize: 100, // Limit pool size to prevent excessive memory usage + bufferSize: bufferSize, + maxPoolSize: 100, // Limit pool size to prevent excessive memory usage preallocated: preallocated, preallocSize: preallocSize, pool: sync.Pool{ @@ -56,10 +56,10 @@ func (p *AudioBufferPool) Get() []byte { return (*buf)[:0] // Reset length but keep capacity } p.mutex.Unlock() - + // Try sync.Pool next if buf := p.pool.Get(); buf != nil { - bufSlice := buf.([]byte) + bufPtr := buf.(*[]byte) // Update pool size counter when retrieving from pool p.mutex.Lock() if p.currentSize > 0 { @@ -67,9 +67,9 @@ func (p *AudioBufferPool) Get() []byte { } p.mutex.Unlock() atomic.AddInt64(&p.hitCount, 1) - return bufSlice[:0] // Reset length but keep capacity + return (*bufPtr)[:0] // Reset length but keep capacity } - + // Last resort: allocate new buffer atomic.AddInt64(&p.missCount, 1) return make([]byte, 0, p.bufferSize) @@ -82,7 +82,7 @@ func (p *AudioBufferPool) Put(buf []byte) { // Reset buffer for reuse resetBuf := buf[:0] - + // First try to return to pre-allocated pool for fastest reuse p.mutex.Lock() if len(p.preallocated) < p.preallocSize { @@ -102,7 +102,7 @@ func (p *AudioBufferPool) Put(buf []byte) { } // Return to sync.Pool - p.pool.Put(resetBuf) + p.pool.Put(&resetBuf) // Update pool size counter p.mutex.Lock() @@ -137,16 +137,16 @@ func (p *AudioBufferPool) GetPoolStats() AudioBufferPoolDetailedStats { preallocatedCount := len(p.preallocated) currentSize := p.currentSize p.mutex.RUnlock() - + hitCount := atomic.LoadInt64(&p.hitCount) missCount := atomic.LoadInt64(&p.missCount) totalRequests := hitCount + missCount - + var hitRate float64 if totalRequests > 0 { hitRate = float64(hitCount) / float64(totalRequests) * 100 } - + return AudioBufferPoolDetailedStats{ BufferSize: p.bufferSize, MaxPoolSize: p.maxPoolSize, @@ -173,15 +173,15 @@ type AudioBufferPoolDetailedStats struct { // GetAudioBufferPoolStats returns statistics about the audio buffer pools type AudioBufferPoolStats struct { - FramePoolSize int64 - FramePoolMax int - ControlPoolSize int64 - ControlPoolMax int + FramePoolSize int64 + FramePoolMax int + ControlPoolSize int64 + ControlPoolMax int // Enhanced statistics - FramePoolHitRate float64 - ControlPoolHitRate float64 - FramePoolDetails AudioBufferPoolDetailedStats - ControlPoolDetails AudioBufferPoolDetailedStats + FramePoolHitRate float64 + ControlPoolHitRate float64 + FramePoolDetails AudioBufferPoolDetailedStats + ControlPoolDetails AudioBufferPoolDetailedStats } func GetAudioBufferPoolStats() AudioBufferPoolStats { @@ -194,19 +194,19 @@ func GetAudioBufferPoolStats() AudioBufferPoolStats { controlSize := audioControlPool.currentSize controlMax := audioControlPool.maxPoolSize audioControlPool.mutex.RUnlock() - + // Get detailed statistics frameDetails := audioFramePool.GetPoolStats() controlDetails := audioControlPool.GetPoolStats() return AudioBufferPoolStats{ - FramePoolSize: frameSize, - FramePoolMax: frameMax, - ControlPoolSize: controlSize, - ControlPoolMax: controlMax, - FramePoolHitRate: frameDetails.HitRate, - ControlPoolHitRate: controlDetails.HitRate, - FramePoolDetails: frameDetails, - ControlPoolDetails: controlDetails, + FramePoolSize: frameSize, + FramePoolMax: frameMax, + ControlPoolSize: controlSize, + ControlPoolMax: controlMax, + FramePoolHitRate: frameDetails.HitRate, + ControlPoolHitRate: controlDetails.HitRate, + FramePoolDetails: frameDetails, + ControlPoolDetails: controlDetails, } } diff --git a/internal/audio/input_ipc.go b/internal/audio/input_ipc.go index 45a20e5..18cef09 100644 --- a/internal/audio/input_ipc.go +++ b/internal/audio/input_ipc.go @@ -49,18 +49,18 @@ type InputIPCMessage struct { // OptimizedIPCMessage represents an optimized message with pre-allocated buffers type OptimizedIPCMessage struct { header [headerSize]byte // Pre-allocated header buffer - data []byte // Reusable data buffer - msg InputIPCMessage // Embedded message + data []byte // Reusable data buffer + msg InputIPCMessage // Embedded message } // MessagePool manages a pool of reusable messages to reduce allocations type MessagePool struct { // Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) - hitCount int64 // Pool hit counter (atomic) - missCount int64 // Pool miss counter (atomic) - + hitCount int64 // Pool hit counter (atomic) + missCount int64 // Pool miss counter (atomic) + // Other fields - pool chan *OptimizedIPCMessage + pool chan *OptimizedIPCMessage // Memory optimization fields preallocated []*OptimizedIPCMessage // Pre-allocated messages for immediate use preallocSize int // Number of pre-allocated messages @@ -73,32 +73,37 @@ var globalMessagePool = &MessagePool{ pool: make(chan *OptimizedIPCMessage, messagePoolSize), } -// Initialize the message pool with pre-allocated messages -func init() { - // Pre-allocate 30% of pool size for immediate availability - preallocSize := messagePoolSize * 30 / 100 - globalMessagePool.preallocSize = preallocSize - globalMessagePool.maxPoolSize = messagePoolSize * 2 // Allow growth up to 2x - globalMessagePool.preallocated = make([]*OptimizedIPCMessage, 0, preallocSize) - - // Pre-allocate messages to reduce initial allocation overhead - for i := 0; i < preallocSize; i++ { - msg := &OptimizedIPCMessage{ - data: make([]byte, 0, maxFrameSize), +var messagePoolInitOnce sync.Once + +// initializeMessagePool initializes the message pool with pre-allocated messages +func initializeMessagePool() { + messagePoolInitOnce.Do(func() { + // Pre-allocate 30% of pool size for immediate availability + preallocSize := messagePoolSize * 30 / 100 + globalMessagePool.preallocSize = preallocSize + globalMessagePool.maxPoolSize = messagePoolSize * 2 // Allow growth up to 2x + globalMessagePool.preallocated = make([]*OptimizedIPCMessage, 0, preallocSize) + + // Pre-allocate messages to reduce initial allocation overhead + for i := 0; i < preallocSize; i++ { + msg := &OptimizedIPCMessage{ + data: make([]byte, 0, maxFrameSize), + } + globalMessagePool.preallocated = append(globalMessagePool.preallocated, msg) } - globalMessagePool.preallocated = append(globalMessagePool.preallocated, msg) - } - - // Fill the channel pool with remaining messages - for i := preallocSize; i < messagePoolSize; i++ { - globalMessagePool.pool <- &OptimizedIPCMessage{ - data: make([]byte, 0, maxFrameSize), + + // Fill the channel pool with remaining messages + for i := preallocSize; i < messagePoolSize; i++ { + globalMessagePool.pool <- &OptimizedIPCMessage{ + data: make([]byte, 0, maxFrameSize), + } } - } + }) } // Get retrieves a message from the pool func (mp *MessagePool) Get() *OptimizedIPCMessage { + initializeMessagePool() // First try pre-allocated messages for fastest access mp.mutex.Lock() if len(mp.preallocated) > 0 { @@ -109,7 +114,7 @@ func (mp *MessagePool) Get() *OptimizedIPCMessage { return msg } mp.mutex.Unlock() - + // Try channel pool next select { case msg := <-mp.pool: @@ -129,7 +134,7 @@ func (mp *MessagePool) Put(msg *OptimizedIPCMessage) { // Reset the message for reuse msg.data = msg.data[:0] msg.msg = InputIPCMessage{} - + // First try to return to pre-allocated pool for fastest reuse mp.mutex.Lock() if len(mp.preallocated) < mp.preallocSize { @@ -138,7 +143,7 @@ func (mp *MessagePool) Put(msg *OptimizedIPCMessage) { return } mp.mutex.Unlock() - + // Try channel pool next select { case mp.pool <- msg: @@ -335,7 +340,7 @@ func (ais *AudioInputServer) readMessage(conn net.Conn) (*InputIPCMessage, error } else { optMsg.data = optMsg.data[:msg.Length] } - + _, err = io.ReadFull(conn, optMsg.data) if err != nil { return nil, err @@ -350,7 +355,7 @@ func (ais *AudioInputServer) readMessage(conn net.Conn) (*InputIPCMessage, error Length: msg.Length, Timestamp: msg.Timestamp, } - + if msg.Length > 0 { // Copy data to ensure it's not affected by buffer reuse result.Data = make([]byte, msg.Length) @@ -733,7 +738,7 @@ func (ais *AudioInputServer) startProcessorGoroutine() { go func() { runtime.LockOSThread() defer runtime.UnlockOSThread() - + // Set high priority for audio processing logger := logging.GetDefaultLogger().With().Str("component", "audio-input-processor").Logger() if err := SetAudioThreadPriority(); err != nil { @@ -744,7 +749,7 @@ func (ais *AudioInputServer) startProcessorGoroutine() { logger.Warn().Err(err).Msg("Failed to reset thread priority") } }() - + defer ais.wg.Done() for { select { @@ -785,7 +790,7 @@ func (ais *AudioInputServer) startMonitorGoroutine() { go func() { runtime.LockOSThread() defer runtime.UnlockOSThread() - + // Set I/O priority for monitoring logger := logging.GetDefaultLogger().With().Str("component", "audio-input-monitor").Logger() if err := SetAudioIOThreadPriority(); err != nil { @@ -796,11 +801,11 @@ func (ais *AudioInputServer) startMonitorGoroutine() { logger.Warn().Err(err).Msg("Failed to reset thread priority") } }() - + defer ais.wg.Done() ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() - + // Buffer size update ticker (less frequent) bufferUpdateTicker := time.NewTicker(500 * time.Millisecond) defer bufferUpdateTicker.Stop() @@ -835,7 +840,7 @@ func (ais *AudioInputServer) startMonitorGoroutine() { newAvg := (currentAvg + processingTime.Nanoseconds()) / 2 atomic.StoreInt64(&ais.processingTime, newAvg) } - + // Report latency to adaptive buffer manager ais.ReportLatency(latency) @@ -847,7 +852,7 @@ func (ais *AudioInputServer) startMonitorGoroutine() { goto checkBufferUpdate } } - + checkBufferUpdate: // Check if we need to update buffer size select { @@ -888,19 +893,19 @@ func (mp *MessagePool) GetMessagePoolStats() MessagePoolStats { mp.mutex.RLock() preallocatedCount := len(mp.preallocated) mp.mutex.RUnlock() - + hitCount := atomic.LoadInt64(&mp.hitCount) missCount := atomic.LoadInt64(&mp.missCount) totalRequests := hitCount + missCount - + var hitRate float64 if totalRequests > 0 { hitRate = float64(hitCount) / float64(totalRequests) * 100 } - + // Calculate channel pool size channelPoolSize := len(mp.pool) - + return MessagePoolStats{ MaxPoolSize: mp.maxPoolSize, ChannelPoolSize: channelPoolSize, diff --git a/internal/audio/ipc.go b/internal/audio/ipc.go index d58878e..c30bcb1 100644 --- a/internal/audio/ipc.go +++ b/internal/audio/ipc.go @@ -11,18 +11,18 @@ import ( "sync" "sync/atomic" "time" - + "github.com/rs/zerolog" ) const ( - outputMagicNumber uint32 = 0x4A4B4F55 // "JKOU" (JetKVM Output) - outputSocketName = "audio_output.sock" - outputMaxFrameSize = 4096 // Maximum Opus frame size - outputWriteTimeout = 10 * time.Millisecond // Non-blocking write timeout (increased for high load) - outputMaxDroppedFrames = 50 // Maximum consecutive dropped frames - outputHeaderSize = 17 // Fixed header size: 4+1+4+8 bytes - outputMessagePoolSize = 128 // Pre-allocated message pool size + outputMagicNumber uint32 = 0x4A4B4F55 // "JKOU" (JetKVM Output) + outputSocketName = "audio_output.sock" + outputMaxFrameSize = 4096 // Maximum Opus frame size + outputWriteTimeout = 10 * time.Millisecond // Non-blocking write timeout (increased for high load) + outputMaxDroppedFrames = 50 // Maximum consecutive dropped frames + outputHeaderSize = 17 // Fixed header size: 4+1+4+8 bytes + outputMessagePoolSize = 128 // Pre-allocated message pool size ) // OutputMessageType represents the type of IPC message @@ -61,7 +61,7 @@ func NewOutputMessagePool(size int) *OutputMessagePool { pool := &OutputMessagePool{ pool: make(chan *OutputOptimizedMessage, size), } - + // Pre-allocate messages for i := 0; i < size; i++ { msg := &OutputOptimizedMessage{ @@ -69,7 +69,7 @@ func NewOutputMessagePool(size int) *OutputMessagePool { } pool.pool <- msg } - + return pool } @@ -101,10 +101,9 @@ var globalOutputMessagePool = NewOutputMessagePool(outputMessagePoolSize) type AudioServer struct { // Atomic fields must be first for proper alignment on ARM - bufferSize int64 // Current buffer size (atomic) - processingTime int64 // Average processing time in nanoseconds (atomic) - droppedFrames int64 // Dropped frames counter (atomic) - totalFrames int64 // Total frames counter (atomic) + bufferSize int64 // Current buffer size (atomic) + droppedFrames int64 // Dropped frames counter (atomic) + totalFrames int64 // Total frames counter (atomic) listener net.Listener conn net.Conn @@ -115,9 +114,9 @@ type AudioServer struct { messageChan chan *OutputIPCMessage // Buffered channel for incoming messages stopChan chan struct{} // Stop signal wg sync.WaitGroup // Wait group for goroutine coordination - + // Latency monitoring - latencyMonitor *LatencyMonitor + latencyMonitor *LatencyMonitor adaptiveOptimizer *AdaptiveOptimizer } @@ -138,11 +137,11 @@ func NewAudioServer() (*AudioServer, error) { latencyConfig := DefaultLatencyConfig() logger := zerolog.New(os.Stderr).With().Timestamp().Str("component", "audio-server").Logger() latencyMonitor := NewLatencyMonitor(latencyConfig, logger) - + // Initialize adaptive buffer manager with default config bufferConfig := DefaultAdaptiveBufferConfig() bufferManager := NewAdaptiveBufferManager(bufferConfig) - + // Initialize adaptive optimizer optimizerConfig := DefaultOptimizerConfig() adaptiveOptimizer := NewAdaptiveOptimizer(latencyMonitor, bufferManager, optimizerConfig, logger) @@ -216,7 +215,10 @@ func (s *AudioServer) startProcessorGoroutine() { case msg := <-s.messageChan: // Process message (currently just frame sending) if msg.Type == OutputMessageTypeOpusFrame { - s.sendFrameToClient(msg.Data) + if err := s.sendFrameToClient(msg.Data); err != nil { + // Log error but continue processing + atomic.AddInt64(&s.droppedFrames, 1) + } } case <-s.stopChan: return @@ -283,13 +285,13 @@ func (s *AudioServer) SendFrame(frame []byte) error { select { case s.messageChan <- msg: atomic.AddInt64(&s.totalFrames, 1) - + // Record latency for monitoring if s.latencyMonitor != nil { processingTime := time.Since(start) s.latencyMonitor.RecordLatency(processingTime, "ipc_send") } - + return nil default: // Channel full, drop frame to prevent blocking diff --git a/internal/audio/latency_monitor.go b/internal/audio/latency_monitor.go index ec97f68..963ee28 100644 --- a/internal/audio/latency_monitor.go +++ b/internal/audio/latency_monitor.go @@ -19,19 +19,19 @@ type LatencyMonitor struct { latencySamples int64 // Number of latency samples collected (atomic) jitterAccumulator int64 // Accumulated jitter for variance calculation (atomic) lastOptimization int64 // Timestamp of last optimization in nanoseconds (atomic) - + config LatencyConfig logger zerolog.Logger - + // Control channels ctx context.Context cancel context.CancelFunc wg sync.WaitGroup - + // Optimization callbacks optimizationCallbacks []OptimizationCallback - mutex sync.RWMutex - + mutex sync.RWMutex + // Performance tracking latencyHistory []LatencyMeasurement historyMutex sync.RWMutex @@ -39,12 +39,12 @@ type LatencyMonitor struct { // LatencyConfig holds configuration for latency monitoring type LatencyConfig struct { - TargetLatency time.Duration // Target latency to maintain - MaxLatency time.Duration // Maximum acceptable latency + TargetLatency time.Duration // Target latency to maintain + MaxLatency time.Duration // Maximum acceptable latency OptimizationInterval time.Duration // How often to run optimization - HistorySize int // Number of latency measurements to keep - JitterThreshold time.Duration // Jitter threshold for optimization - AdaptiveThreshold float64 // Threshold for adaptive adjustments (0.0-1.0) + HistorySize int // Number of latency measurements to keep + JitterThreshold time.Duration // Jitter threshold for optimization + AdaptiveThreshold float64 // Threshold for adaptive adjustments (0.0-1.0) } // LatencyMeasurement represents a single latency measurement @@ -83,18 +83,18 @@ const ( func DefaultLatencyConfig() LatencyConfig { return LatencyConfig{ TargetLatency: 50 * time.Millisecond, - MaxLatency: 200 * time.Millisecond, + MaxLatency: 200 * time.Millisecond, OptimizationInterval: 5 * time.Second, - HistorySize: 100, - JitterThreshold: 20 * time.Millisecond, - AdaptiveThreshold: 0.8, // Trigger optimization when 80% above target + HistorySize: 100, + JitterThreshold: 20 * time.Millisecond, + AdaptiveThreshold: 0.8, // Trigger optimization when 80% above target } } // NewLatencyMonitor creates a new latency monitoring system func NewLatencyMonitor(config LatencyConfig, logger zerolog.Logger) *LatencyMonitor { ctx, cancel := context.WithCancel(context.Background()) - + return &LatencyMonitor{ config: config, logger: logger.With().Str("component", "latency-monitor").Logger(), @@ -123,11 +123,11 @@ func (lm *LatencyMonitor) Stop() { func (lm *LatencyMonitor) RecordLatency(latency time.Duration, source string) { now := time.Now() latencyNanos := latency.Nanoseconds() - + // Update atomic counters atomic.StoreInt64(&lm.currentLatency, latencyNanos) atomic.AddInt64(&lm.latencySamples, 1) - + // Update min/max for { oldMin := atomic.LoadInt64(&lm.minLatency) @@ -135,26 +135,26 @@ func (lm *LatencyMonitor) RecordLatency(latency time.Duration, source string) { break } } - + for { oldMax := atomic.LoadInt64(&lm.maxLatency) if latencyNanos <= oldMax || atomic.CompareAndSwapInt64(&lm.maxLatency, oldMax, latencyNanos) { break } } - + // Update rolling average using exponential moving average oldAvg := atomic.LoadInt64(&lm.averageLatency) newAvg := oldAvg + (latencyNanos-oldAvg)/10 // Alpha = 0.1 atomic.StoreInt64(&lm.averageLatency, newAvg) - + // Calculate jitter (difference from average) jitter := latencyNanos - newAvg if jitter < 0 { jitter = -jitter } atomic.AddInt64(&lm.jitterAccumulator, jitter) - + // Store in history lm.historyMutex.Lock() measurement := LatencyMeasurement{ @@ -163,7 +163,7 @@ func (lm *LatencyMonitor) RecordLatency(latency time.Duration, source string) { Jitter: time.Duration(jitter), Source: source, } - + if len(lm.latencyHistory) >= lm.config.HistorySize { // Remove oldest measurement copy(lm.latencyHistory, lm.latencyHistory[1:]) @@ -182,12 +182,12 @@ func (lm *LatencyMonitor) GetMetrics() LatencyMetrics { max := atomic.LoadInt64(&lm.maxLatency) samples := atomic.LoadInt64(&lm.latencySamples) jitterSum := atomic.LoadInt64(&lm.jitterAccumulator) - + var jitter time.Duration if samples > 0 { jitter = time.Duration(jitterSum / samples) } - + return LatencyMetrics{ Current: time.Duration(current), Average: time.Duration(average), @@ -209,10 +209,10 @@ func (lm *LatencyMonitor) AddOptimizationCallback(callback OptimizationCallback) // monitoringLoop runs the main monitoring and optimization loop func (lm *LatencyMonitor) monitoringLoop() { defer lm.wg.Done() - + ticker := time.NewTicker(lm.config.OptimizationInterval) defer ticker.Stop() - + for { select { case <-lm.ctx.Done(): @@ -226,44 +226,44 @@ func (lm *LatencyMonitor) monitoringLoop() { // runOptimization checks if optimization is needed and triggers callbacks func (lm *LatencyMonitor) runOptimization() { metrics := lm.GetMetrics() - + // Check if optimization is needed needsOptimization := false - + // Check if current latency exceeds threshold if metrics.Current > lm.config.MaxLatency { needsOptimization = true lm.logger.Warn().Dur("current_latency", metrics.Current).Dur("max_latency", lm.config.MaxLatency).Msg("Latency exceeds maximum threshold") } - + // Check if average latency is above adaptive threshold adaptiveThreshold := time.Duration(float64(lm.config.TargetLatency.Nanoseconds()) * (1.0 + lm.config.AdaptiveThreshold)) if metrics.Average > adaptiveThreshold { needsOptimization = true lm.logger.Info().Dur("average_latency", metrics.Average).Dur("threshold", adaptiveThreshold).Msg("Average latency above adaptive threshold") } - + // Check if jitter is too high if metrics.Jitter > lm.config.JitterThreshold { needsOptimization = true lm.logger.Info().Dur("jitter", metrics.Jitter).Dur("threshold", lm.config.JitterThreshold).Msg("Jitter above threshold") } - + if needsOptimization { atomic.StoreInt64(&lm.lastOptimization, time.Now().UnixNano()) - + // Run optimization callbacks lm.mutex.RLock() callbacks := make([]OptimizationCallback, len(lm.optimizationCallbacks)) copy(callbacks, lm.optimizationCallbacks) lm.mutex.RUnlock() - + for _, callback := range callbacks { if err := callback(metrics); err != nil { lm.logger.Error().Err(err).Msg("Optimization callback failed") } } - + lm.logger.Info().Interface("metrics", metrics).Msg("Latency optimization triggered") } } @@ -272,14 +272,14 @@ func (lm *LatencyMonitor) runOptimization() { func (lm *LatencyMonitor) calculateTrend() LatencyTrend { lm.historyMutex.RLock() defer lm.historyMutex.RUnlock() - + if len(lm.latencyHistory) < 10 { return LatencyTrendStable } - + // Analyze last 10 measurements recentMeasurements := lm.latencyHistory[len(lm.latencyHistory)-10:] - + var increasing, decreasing int for i := 1; i < len(recentMeasurements); i++ { if recentMeasurements[i].Latency > recentMeasurements[i-1].Latency { @@ -288,7 +288,7 @@ func (lm *LatencyMonitor) calculateTrend() LatencyTrend { decreasing++ } } - + // Determine trend based on direction changes if increasing > 6 { return LatencyTrendIncreasing @@ -297,7 +297,7 @@ func (lm *LatencyMonitor) calculateTrend() LatencyTrend { } else if increasing+decreasing > 7 { return LatencyTrendVolatile } - + return LatencyTrendStable } @@ -305,8 +305,8 @@ func (lm *LatencyMonitor) calculateTrend() LatencyTrend { func (lm *LatencyMonitor) GetLatencyHistory() []LatencyMeasurement { lm.historyMutex.RLock() defer lm.historyMutex.RUnlock() - + history := make([]LatencyMeasurement, len(lm.latencyHistory)) copy(history, lm.latencyHistory) return history -} \ No newline at end of file +} diff --git a/internal/audio/memory_metrics.go b/internal/audio/memory_metrics.go index 6732d56..a5129b1 100644 --- a/internal/audio/memory_metrics.go +++ b/internal/audio/memory_metrics.go @@ -13,17 +13,17 @@ import ( // MemoryMetrics provides comprehensive memory allocation statistics type MemoryMetrics struct { // Runtime memory statistics - RuntimeStats RuntimeMemoryStats `json:"runtime_stats"` + RuntimeStats RuntimeMemoryStats `json:"runtime_stats"` // Audio buffer pool statistics - BufferPools AudioBufferPoolStats `json:"buffer_pools"` + BufferPools AudioBufferPoolStats `json:"buffer_pools"` // Zero-copy frame pool statistics - ZeroCopyPool ZeroCopyFramePoolStats `json:"zero_copy_pool"` + ZeroCopyPool ZeroCopyFramePoolStats `json:"zero_copy_pool"` // Message pool statistics - MessagePool MessagePoolStats `json:"message_pool"` + MessagePool MessagePoolStats `json:"message_pool"` // Batch processor statistics - BatchProcessor BatchProcessorMemoryStats `json:"batch_processor,omitempty"` + BatchProcessor BatchProcessorMemoryStats `json:"batch_processor,omitempty"` // Collection timestamp - Timestamp time.Time `json:"timestamp"` + Timestamp time.Time `json:"timestamp"` } // RuntimeMemoryStats provides Go runtime memory statistics @@ -59,10 +59,10 @@ type RuntimeMemoryStats struct { // BatchProcessorMemoryStats provides batch processor memory statistics type BatchProcessorMemoryStats struct { - Initialized bool `json:"initialized"` - Running bool `json:"running"` - Stats BatchAudioStats `json:"stats"` - BufferPool AudioBufferPoolDetailedStats `json:"buffer_pool,omitempty"` + Initialized bool `json:"initialized"` + Running bool `json:"running"` + Stats BatchAudioStats `json:"stats"` + BufferPool AudioBufferPoolDetailedStats `json:"buffer_pool,omitempty"` } // GetBatchAudioProcessor is defined in batch_audio.go @@ -83,7 +83,7 @@ func CollectMemoryMetrics() MemoryMetrics { // Collect runtime memory statistics var m runtime.MemStats runtime.ReadMemStats(&m) - + runtimeStats := RuntimeMemoryStats{ Alloc: m.Alloc, TotalAlloc: m.TotalAlloc, @@ -113,16 +113,16 @@ func CollectMemoryMetrics() MemoryMetrics { NumForcedGC: m.NumForcedGC, GCCPUFraction: m.GCCPUFraction, } - + // Collect audio buffer pool statistics bufferPoolStats := GetAudioBufferPoolStats() - + // Collect zero-copy frame pool statistics zeroCopyStats := GetGlobalZeroCopyPoolStats() - + // Collect message pool statistics messagePoolStats := GetGlobalMessagePoolStats() - + // Collect batch processor statistics if available var batchStats BatchProcessorMemoryStats if processor := GetBatchAudioProcessor(); processor != nil { @@ -131,7 +131,7 @@ func CollectMemoryMetrics() MemoryMetrics { batchStats.Stats = processor.GetStats() // Note: BatchAudioProcessor uses sync.Pool, detailed stats not available } - + return MemoryMetrics{ RuntimeStats: runtimeStats, BufferPools: bufferPoolStats, @@ -145,23 +145,23 @@ func CollectMemoryMetrics() MemoryMetrics { // HandleMemoryMetrics provides an HTTP handler for memory metrics func HandleMemoryMetrics(w http.ResponseWriter, r *http.Request) { logger := getMemoryMetricsLogger() - + if r.Method != http.MethodGet { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } - + metrics := CollectMemoryMetrics() - + w.Header().Set("Content-Type", "application/json") w.Header().Set("Cache-Control", "no-cache") - + if err := json.NewEncoder(w).Encode(metrics); err != nil { logger.Error().Err(err).Msg("failed to encode memory metrics") http.Error(w, "Internal server error", http.StatusInternalServerError) return } - + logger.Debug().Msg("memory metrics served") } @@ -169,7 +169,7 @@ func HandleMemoryMetrics(w http.ResponseWriter, r *http.Request) { func LogMemoryMetrics() { logger := getMemoryMetricsLogger() metrics := CollectMemoryMetrics() - + logger.Info(). Uint64("heap_alloc_mb", metrics.RuntimeStats.HeapAlloc/1024/1024). Uint64("heap_sys_mb", metrics.RuntimeStats.HeapSys/1024/1024). @@ -186,13 +186,13 @@ func LogMemoryMetrics() { func StartMemoryMetricsLogging(interval time.Duration) { logger := getMemoryMetricsLogger() logger.Info().Dur("interval", interval).Msg("starting memory metrics logging") - + go func() { ticker := time.NewTicker(interval) defer ticker.Stop() - + for range ticker.C { LogMemoryMetrics() } }() -} \ No newline at end of file +} diff --git a/internal/audio/mic_contention.go b/internal/audio/mic_contention.go index a62c1dc..8b06630 100644 --- a/internal/audio/mic_contention.go +++ b/internal/audio/mic_contention.go @@ -12,8 +12,8 @@ type MicrophoneContentionManager struct { lastOpNano int64 cooldownNanos int64 operationID int64 - - lockPtr unsafe.Pointer + + lockPtr unsafe.Pointer } func NewMicrophoneContentionManager(cooldown time.Duration) *MicrophoneContentionManager { diff --git a/internal/audio/output_streaming.go b/internal/audio/output_streaming.go index 78ac33e..bef3eae 100644 --- a/internal/audio/output_streaming.go +++ b/internal/audio/output_streaming.go @@ -61,9 +61,9 @@ func NewOutputStreamer() (*OutputStreamer, error) { bufferPool: NewAudioBufferPool(MaxAudioFrameSize), // Use existing buffer pool ctx: ctx, cancel: cancel, - batchSize: initialBatchSize, // Use adaptive batch size - processingChan: make(chan []byte, 500), // Large buffer for smooth processing - statsInterval: 5 * time.Second, // Statistics every 5 seconds + batchSize: initialBatchSize, // Use adaptive batch size + processingChan: make(chan []byte, 500), // Large buffer for smooth processing + statsInterval: 5 * time.Second, // Statistics every 5 seconds lastStatsTime: time.Now().UnixNano(), }, nil } @@ -85,9 +85,9 @@ func (s *OutputStreamer) Start() error { // Start multiple goroutines for optimal performance s.wg.Add(3) - go s.streamLoop() // Main streaming loop - go s.processingLoop() // Frame processing loop - go s.statisticsLoop() // Performance monitoring loop + go s.streamLoop() // Main streaming loop + go s.processingLoop() // Frame processing loop + go s.statisticsLoop() // Performance monitoring loop return nil } @@ -125,7 +125,7 @@ func (s *OutputStreamer) streamLoop() { frameInterval := time.Duration(20) * time.Millisecond // 50 FPS base rate ticker := time.NewTicker(frameInterval) defer ticker.Stop() - + // Batch size update ticker batchUpdateTicker := time.NewTicker(500 * time.Millisecond) defer batchUpdateTicker.Stop() @@ -181,7 +181,7 @@ func (s *OutputStreamer) processingLoop() { // Pin goroutine to OS thread for consistent performance runtime.LockOSThread() defer runtime.UnlockOSThread() - + // Set high priority for audio output processing if err := SetAudioThreadPriority(); err != nil { getOutputStreamingLogger().Warn().Err(err).Msg("Failed to set audio output processing priority") @@ -192,7 +192,7 @@ func (s *OutputStreamer) processingLoop() { } }() - for _ = range s.processingChan { + for range s.processingChan { // Process frame (currently just receiving, but can be extended) if _, err := s.client.ReceiveFrame(); err != nil { if s.client.IsConnected() { @@ -260,13 +260,13 @@ func (s *OutputStreamer) GetDetailedStats() map[string]interface{} { processingTime := atomic.LoadInt64(&s.processingTime) stats := map[string]interface{}{ - "processed_frames": processed, - "dropped_frames": dropped, + "processed_frames": processed, + "dropped_frames": dropped, "avg_processing_time_ns": processingTime, - "batch_size": s.batchSize, - "channel_buffer_size": cap(s.processingChan), - "channel_current_size": len(s.processingChan), - "connected": s.client.IsConnected(), + "batch_size": s.batchSize, + "channel_buffer_size": cap(s.processingChan), + "channel_current_size": len(s.processingChan), + "connected": s.client.IsConnected(), } if processed+dropped > 0 { diff --git a/internal/audio/priority_scheduler.go b/internal/audio/priority_scheduler.go index c119d55..ffd8595 100644 --- a/internal/audio/priority_scheduler.go +++ b/internal/audio/priority_scheduler.go @@ -22,7 +22,7 @@ const ( AudioHighPriority = 80 // High priority for critical audio processing AudioMediumPriority = 60 // Medium priority for regular audio processing AudioLowPriority = 40 // Low priority for background audio tasks - + // SCHED_NORMAL is the default (priority 0) NormalPriority = 0 ) @@ -36,14 +36,14 @@ const ( // PriorityScheduler manages thread priorities for audio processing type PriorityScheduler struct { - logger zerolog.Logger + logger zerolog.Logger enabled bool } // NewPriorityScheduler creates a new priority scheduler func NewPriorityScheduler() *PriorityScheduler { return &PriorityScheduler{ - logger: logging.GetDefaultLogger().With().Str("component", "priority-scheduler").Logger(), + logger: logging.GetDefaultLogger().With().Str("component", "priority-scheduler").Logger(), enabled: true, } } @@ -162,4 +162,4 @@ func SetAudioIOThreadPriority() error { // ResetThreadPriority is a convenience function to reset thread priority func ResetThreadPriority() error { return GetPriorityScheduler().ResetPriority() -} \ No newline at end of file +} diff --git a/internal/audio/relay.go b/internal/audio/relay.go index 93d1bca..c87f94b 100644 --- a/internal/audio/relay.go +++ b/internal/audio/relay.go @@ -17,7 +17,7 @@ type AudioRelay struct { // Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) framesRelayed int64 framesDropped int64 - + client *AudioClient ctx context.Context cancel context.CancelFunc diff --git a/internal/audio/zero_copy.go b/internal/audio/zero_copy.go index 5a7cb95..5d0cfb9 100644 --- a/internal/audio/zero_copy.go +++ b/internal/audio/zero_copy.go @@ -20,14 +20,14 @@ type ZeroCopyAudioFrame struct { // ZeroCopyFramePool manages reusable zero-copy audio frames type ZeroCopyFramePool struct { // Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) - counter int64 // Frame counter (atomic) - hitCount int64 // Pool hit counter (atomic) - missCount int64 // Pool miss counter (atomic) - + counter int64 // Frame counter (atomic) + hitCount int64 // Pool hit counter (atomic) + missCount int64 // Pool miss counter (atomic) + // Other fields - pool sync.Pool - maxSize int - mutex sync.RWMutex + pool sync.Pool + maxSize int + mutex sync.RWMutex // Memory optimization fields preallocated []*ZeroCopyAudioFrame // Pre-allocated frames for immediate use preallocSize int // Number of pre-allocated frames @@ -40,7 +40,7 @@ func NewZeroCopyFramePool(maxFrameSize int) *ZeroCopyFramePool { preallocSize := 15 maxPoolSize := 50 // Limit total pool size preallocated := make([]*ZeroCopyAudioFrame, 0, preallocSize) - + // Pre-allocate frames to reduce initial allocation overhead for i := 0; i < preallocSize; i++ { frame := &ZeroCopyAudioFrame{ @@ -50,7 +50,7 @@ func NewZeroCopyFramePool(maxFrameSize int) *ZeroCopyFramePool { } preallocated = append(preallocated, frame) } - + return &ZeroCopyFramePool{ maxSize: maxFrameSize, preallocated: preallocated, @@ -76,18 +76,18 @@ func (p *ZeroCopyFramePool) Get() *ZeroCopyAudioFrame { frame := p.preallocated[len(p.preallocated)-1] p.preallocated = p.preallocated[:len(p.preallocated)-1] p.mutex.Unlock() - + frame.mutex.Lock() frame.refCount = 1 frame.length = 0 frame.data = frame.data[:0] frame.mutex.Unlock() - + atomic.AddInt64(&p.hitCount, 1) return frame } p.mutex.Unlock() - + // Try sync.Pool next frame := p.pool.Get().(*ZeroCopyAudioFrame) frame.mutex.Lock() @@ -95,7 +95,7 @@ func (p *ZeroCopyFramePool) Get() *ZeroCopyAudioFrame { frame.length = 0 frame.data = frame.data[:0] frame.mutex.Unlock() - + atomic.AddInt64(&p.hitCount, 1) return frame } @@ -113,7 +113,7 @@ func (p *ZeroCopyFramePool) Put(frame *ZeroCopyAudioFrame) { frame.length = 0 frame.data = frame.data[:0] frame.mutex.Unlock() - + // First try to return to pre-allocated pool for fastest reuse p.mutex.Lock() if len(p.preallocated) < p.preallocSize { @@ -122,16 +122,16 @@ func (p *ZeroCopyFramePool) Put(frame *ZeroCopyAudioFrame) { return } p.mutex.Unlock() - + // Check pool size limit to prevent excessive memory usage p.mutex.RLock() currentCount := atomic.LoadInt64(&p.counter) p.mutex.RUnlock() - + if currentCount >= int64(p.maxPoolSize) { return // Pool is full, let GC handle this frame } - + // Return to sync.Pool p.pool.Put(frame) atomic.AddInt64(&p.counter, 1) @@ -227,16 +227,16 @@ func (p *ZeroCopyFramePool) GetZeroCopyPoolStats() ZeroCopyFramePoolStats { preallocatedCount := len(p.preallocated) currentCount := atomic.LoadInt64(&p.counter) p.mutex.RUnlock() - + hitCount := atomic.LoadInt64(&p.hitCount) missCount := atomic.LoadInt64(&p.missCount) totalRequests := hitCount + missCount - + var hitRate float64 if totalRequests > 0 { hitRate = float64(hitCount) / float64(totalRequests) * 100 } - + return ZeroCopyFramePoolStats{ MaxFrameSize: p.maxSize, MaxPoolSize: p.maxPoolSize, @@ -283,7 +283,7 @@ func PutZeroCopyFrame(frame *ZeroCopyAudioFrame) { // ZeroCopyAudioReadEncode performs audio read and encode with zero-copy optimization func ZeroCopyAudioReadEncode() (*ZeroCopyAudioFrame, error) { frame := GetZeroCopyFrame() - + // Ensure frame has enough capacity if frame.Capacity() < MaxAudioFrameSize { // Reallocate if needed @@ -311,4 +311,4 @@ func ZeroCopyAudioReadEncode() (*ZeroCopyAudioFrame, error) { frame.mutex.Unlock() return frame, nil -} \ No newline at end of file +} diff --git a/main.go b/main.go index b2d2be9..56f6917 100644 --- a/main.go +++ b/main.go @@ -33,7 +33,7 @@ func runAudioServer() { func startAudioSubprocess() error { // Start adaptive buffer management for optimal performance audio.StartAdaptiveBuffering() - + // Create audio server supervisor audioSupervisor = audio.NewAudioServerSupervisor() diff --git a/webrtc.go b/webrtc.go index 415eb0d..8966fb4 100644 --- a/webrtc.go +++ b/webrtc.go @@ -30,12 +30,12 @@ type Session struct { AudioInputManager *audio.AudioInputManager shouldUmountVirtualMedia bool // Microphone operation throttling - micCooldown time.Duration + micCooldown time.Duration // Audio frame processing - audioFrameChan chan []byte - audioStopChan chan struct{} - audioWg sync.WaitGroup - rpcQueue chan webrtc.DataChannelMessage + audioFrameChan chan []byte + audioStopChan chan struct{} + audioWg sync.WaitGroup + rpcQueue chan webrtc.DataChannelMessage } type SessionConfig struct {