From 8110be6cc6e5848fa15c2ddf3c2b1eb5677f6d70 Mon Sep 17 00:00:00 2001 From: Alex P Date: Wed, 3 Sep 2025 12:10:54 +0000 Subject: [PATCH] feat(audio): optimize audio processing with batch processing and goroutine monitoring - Add batch audio processing to reduce CGO call overhead - Implement goroutine monitoring and cleanup for leak prevention - Optimize buffer pool with TTL-based cache and latency-aware cleanup - Add configurable parameters for batch processing and monitoring - Improve CGO audio read performance with config caching --- .gitignore | 3 +- internal/audio/batch_audio.go | 27 ++- internal/audio/buffer_pool.go | 295 +++++++++++++++++++++++++--- internal/audio/cgo_audio.go | 53 ++++- internal/audio/config_constants.go | 18 ++ internal/audio/goroutine_monitor.go | 145 ++++++++++++++ internal/audio/input.go | 12 +- internal/audio/output_streaming.go | 6 + main.go | 12 ++ 9 files changed, 528 insertions(+), 43 deletions(-) create mode 100644 internal/audio/goroutine_monitor.go diff --git a/.gitignore b/.gitignore index 21e27b25..99f80f9c 100644 --- a/.gitignore +++ b/.gitignore @@ -9,4 +9,5 @@ tmp/ *.tmp *.code-workspace -device-tests.tar.gz \ No newline at end of file +device-tests.tar.gz +CLAUDE.md diff --git a/internal/audio/batch_audio.go b/internal/audio/batch_audio.go index 556b6ab1..a7b7537f 100644 --- a/internal/audio/batch_audio.go +++ b/internal/audio/batch_audio.go @@ -213,16 +213,32 @@ func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) { return } - // Pin to OS thread for the entire batch to minimize thread switching overhead + // Only pin to OS thread for large batches to reduce thread contention start := time.Now() - if atomic.CompareAndSwapInt32(&bap.threadPinned, 0, 1) { + shouldPinThread := len(batch) >= GetConfig().MinBatchSizeForThreadPinning + + // Track if we pinned the thread in this call + threadWasPinned := false + + if shouldPinThread && atomic.CompareAndSwapInt32(&bap.threadPinned, 0, 1) { + threadWasPinned = true runtime.LockOSThread() // Set high priority for batch audio processing if err := SetAudioThreadPriority(); err != nil { bap.logger.Warn().Err(err).Msg("failed to set batch audio processing priority") } + } + batchSize := len(batch) + atomic.AddInt64(&bap.stats.BatchedReads, 1) + atomic.AddInt64(&bap.stats.BatchedFrames, int64(batchSize)) + if batchSize > 1 { + atomic.AddInt64(&bap.stats.CGOCallsReduced, int64(batchSize-1)) + } + + // Add deferred function to release thread lock if we pinned it + if threadWasPinned { defer func() { if err := ResetThreadPriority(); err != nil { bap.logger.Warn().Err(err).Msg("failed to reset thread priority") @@ -233,13 +249,6 @@ func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) { }() } - batchSize := len(batch) - atomic.AddInt64(&bap.stats.BatchedReads, 1) - atomic.AddInt64(&bap.stats.BatchedFrames, int64(batchSize)) - if batchSize > 1 { - atomic.AddInt64(&bap.stats.CGOCallsReduced, int64(batchSize-1)) - } - // Process each request in the batch for _, req := range batch { length, err := CGOAudioReadEncode(req.buffer) diff --git a/internal/audio/buffer_pool.go b/internal/audio/buffer_pool.go index e4638f4e..15915347 100644 --- a/internal/audio/buffer_pool.go +++ b/internal/audio/buffer_pool.go @@ -1,24 +1,118 @@ +//go:build cgo +// +build cgo + package audio import ( "runtime" + "sort" "sync" "sync/atomic" "time" "unsafe" ) +// AudioLatencyInfo holds simplified latency information for cleanup decisions +type AudioLatencyInfo struct { + LatencyMs float64 + Timestamp time.Time +} + +// Global latency tracking +var ( + currentAudioLatency = AudioLatencyInfo{} + currentAudioLatencyLock sync.RWMutex + audioMonitoringInitialized int32 // Atomic flag to track initialization +) + +// InitializeAudioMonitoring starts the background goroutines for latency tracking and cache cleanup +// This is safe to call multiple times as it will only initialize once +func InitializeAudioMonitoring() { + // Use atomic CAS to ensure we only initialize once + if atomic.CompareAndSwapInt32(&audioMonitoringInitialized, 0, 1) { + // Start the latency recorder + startLatencyRecorder() + + // Start the cleanup goroutine + startCleanupGoroutine() + } +} + +// latencyChannel is used for non-blocking latency recording +var latencyChannel = make(chan float64, 10) + +// startLatencyRecorder starts the latency recorder goroutine +// This should be called during package initialization +func startLatencyRecorder() { + go latencyRecorderLoop() +} + +// latencyRecorderLoop processes latency recordings in the background +func latencyRecorderLoop() { + for latencyMs := range latencyChannel { + currentAudioLatencyLock.Lock() + currentAudioLatency = AudioLatencyInfo{ + LatencyMs: latencyMs, + Timestamp: time.Now(), + } + currentAudioLatencyLock.Unlock() + } +} + +// RecordAudioLatency records the current audio processing latency +// This is called from the audio input manager when latency is measured +// It is non-blocking to ensure zero overhead in the critical audio path +func RecordAudioLatency(latencyMs float64) { + // Non-blocking send - if channel is full, we drop the update + select { + case latencyChannel <- latencyMs: + // Successfully sent + default: + // Channel full, drop this update to avoid blocking the audio path + } +} + +// GetAudioLatencyMetrics returns the current audio latency information +// Returns nil if no latency data is available or if it's too old +func GetAudioLatencyMetrics() *AudioLatencyInfo { + currentAudioLatencyLock.RLock() + defer currentAudioLatencyLock.RUnlock() + + // Check if we have valid latency data + if currentAudioLatency.Timestamp.IsZero() { + return nil + } + + // Check if the data is too old (more than 5 seconds) + if time.Since(currentAudioLatency.Timestamp) > 5*time.Second { + return nil + } + + return &AudioLatencyInfo{ + LatencyMs: currentAudioLatency.LatencyMs, + Timestamp: currentAudioLatency.Timestamp, + } +} + // Lock-free buffer cache for per-goroutine optimization type lockFreeBufferCache struct { buffers [4]*[]byte // Small fixed-size array for lock-free access } +// TTL tracking for goroutine cache entries +type cacheEntry struct { + cache *lockFreeBufferCache + lastAccess int64 // Unix timestamp of last access + gid int64 // Goroutine ID for better tracking +} + // Per-goroutine buffer cache using goroutine-local storage var goroutineBufferCache = make(map[int64]*lockFreeBufferCache) var goroutineCacheMutex sync.RWMutex -var lastCleanupTime int64 // Unix timestamp of last cleanup -const maxCacheSize = 1000 // Maximum number of goroutine caches -const cleanupInterval = 300 // Cleanup interval in seconds (5 minutes) +var lastCleanupTime int64 // Unix timestamp of last cleanup +const maxCacheSize = 500 // Maximum number of goroutine caches (reduced from 1000) +const cleanupInterval int64 = 30 // Cleanup interval in seconds (30 seconds, reduced from 60) +const bufferTTL int64 = 60 // Time-to-live for cached buffers in seconds (1 minute, reduced from 2) // getGoroutineID extracts goroutine ID from runtime stack for cache key func getGoroutineID() int64 { @@ -39,13 +133,67 @@ func getGoroutineID() int64 { return 0 } -// cleanupGoroutineCache removes stale entries from the goroutine cache -func cleanupGoroutineCache() { +// Map of goroutine ID to cache entry with TTL tracking +var goroutineCacheWithTTL = make(map[int64]*cacheEntry) + +// cleanupChannel is used for asynchronous cleanup requests +var cleanupChannel = make(chan struct{}, 1) + +// startCleanupGoroutine starts the cleanup goroutine +// This should be called during package initialization +func startCleanupGoroutine() { + go cleanupLoop() +} + +// cleanupLoop processes cleanup requests in the background +func cleanupLoop() { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-cleanupChannel: + // Received explicit cleanup request + performCleanup(true) + case <-ticker.C: + // Regular cleanup check + performCleanup(false) + } + } +} + +// requestCleanup signals the cleanup goroutine to perform a cleanup +// This is non-blocking and can be called from the critical path +func requestCleanup() { + select { + case cleanupChannel <- struct{}{}: + // Successfully requested cleanup + default: + // Channel full, cleanup already pending + } +} + +// performCleanup does the actual cache cleanup work +// This runs in a dedicated goroutine, not in the critical path +func performCleanup(forced bool) { now := time.Now().Unix() lastCleanup := atomic.LoadInt64(&lastCleanupTime) - // Only cleanup if enough time has passed - if now-lastCleanup < cleanupInterval { + // Check if we're in a high-latency situation + isHighLatency := false + latencyMetrics := GetAudioLatencyMetrics() + if latencyMetrics != nil && latencyMetrics.LatencyMs > 10.0 { + // Under high latency, be more aggressive with cleanup + isHighLatency = true + } + + // Only cleanup if enough time has passed (less time if high latency) or if forced + interval := cleanupInterval + if isHighLatency { + interval = cleanupInterval / 2 // More frequent cleanup under high latency + } + + if !forced && now-lastCleanup < interval { return } @@ -54,23 +202,93 @@ func cleanupGoroutineCache() { return // Another goroutine is already cleaning up } + // Perform the actual cleanup + doCleanupGoroutineCache() +} + +// cleanupGoroutineCache triggers an asynchronous cleanup of the goroutine cache +// This is safe to call from the critical path as it's non-blocking +func cleanupGoroutineCache() { + // Request asynchronous cleanup + requestCleanup() +} + +// The actual cleanup implementation that runs in the background goroutine +func doCleanupGoroutineCache() { + // Get current time for TTL calculations + now := time.Now().Unix() + + // Check if we're in a high-latency situation + isHighLatency := false + latencyMetrics := GetAudioLatencyMetrics() + if latencyMetrics != nil && latencyMetrics.LatencyMs > 10.0 { + // Under high latency, be more aggressive with cleanup + isHighLatency = true + } + goroutineCacheMutex.Lock() defer goroutineCacheMutex.Unlock() - // If cache is too large, remove oldest entries (simple FIFO) - if len(goroutineBufferCache) > maxCacheSize { - // Remove half of the entries to avoid frequent cleanups - toRemove := len(goroutineBufferCache) - maxCacheSize/2 - count := 0 - for gid := range goroutineBufferCache { - delete(goroutineBufferCache, gid) - count++ - if count >= toRemove { - break + // Convert old cache format to new TTL-based format if needed + if len(goroutineCacheWithTTL) == 0 && len(goroutineBufferCache) > 0 { + for gid, cache := range goroutineBufferCache { + goroutineCacheWithTTL[gid] = &cacheEntry{ + cache: cache, + lastAccess: now, } } - // Log cleanup for debugging (removed logging dependency) - _ = count // Avoid unused variable warning + // Clear old cache to free memory + goroutineBufferCache = make(map[int64]*lockFreeBufferCache) + } + + // Remove stale entries based on TTL (more aggressive under high latency) + expiredCount := 0 + ttl := bufferTTL + if isHighLatency { + // Under high latency, use a much shorter TTL + ttl = bufferTTL / 4 + } + + for gid, entry := range goroutineCacheWithTTL { + // Both now and entry.lastAccess are int64, so this comparison is safe + if now-entry.lastAccess > ttl { + delete(goroutineCacheWithTTL, gid) + expiredCount++ + } + } + + // If cache is still too large after TTL cleanup, remove oldest entries + // Under high latency, use a more aggressive target size + targetSize := maxCacheSize + targetReduction := maxCacheSize / 2 + + if isHighLatency { + // Under high latency, target a much smaller cache size + targetSize = maxCacheSize / 4 + targetReduction = maxCacheSize / 8 + } + + if len(goroutineCacheWithTTL) > targetSize { + // Find oldest entries + type ageEntry struct { + gid int64 + lastAccess int64 + } + oldestEntries := make([]ageEntry, 0, len(goroutineCacheWithTTL)) + for gid, entry := range goroutineCacheWithTTL { + oldestEntries = append(oldestEntries, ageEntry{gid, entry.lastAccess}) + } + + // Sort by lastAccess (oldest first) + sort.Slice(oldestEntries, func(i, j int) bool { + return oldestEntries[i].lastAccess < oldestEntries[j].lastAccess + }) + + // Remove oldest entries to get down to target reduction size + toRemove := len(goroutineCacheWithTTL) - targetReduction + for i := 0; i < toRemove && i < len(oldestEntries); i++ { + delete(goroutineCacheWithTTL, oldestEntries[i].gid) + } } } @@ -151,7 +369,18 @@ func (p *AudioBufferPool) Get() []byte { // Fast path: Try lock-free per-goroutine cache first gid := getGoroutineID() goroutineCacheMutex.RLock() - cache, exists := goroutineBufferCache[gid] + + // Try new TTL-based cache first + cacheEntry, exists := goroutineCacheWithTTL[gid] + var cache *lockFreeBufferCache + if exists && cacheEntry != nil { + cache = cacheEntry.cache + // Update last access time + cacheEntry.lastAccess = time.Now().Unix() + } else { + // Fall back to legacy cache if needed + cache, exists = goroutineBufferCache[gid] + } goroutineCacheMutex.RUnlock() if exists && cache != nil { @@ -190,6 +419,8 @@ func (p *AudioBufferPool) Get() []byte { buf := poolBuf.(*[]byte) // Update hit counter atomic.AddInt64(&p.hitCount, 1) + // Decrement pool size counter atomically + atomic.AddInt64(&p.currentSize, -1) // Ensure buffer is properly reset and check capacity if cap(*buf) >= p.bufferSize { wasHit = true @@ -230,15 +461,32 @@ func (p *AudioBufferPool) Put(buf []byte) { // Fast path: Try to put in lock-free per-goroutine cache gid := getGoroutineID() + now := time.Now().Unix() + + // Check if we have a TTL-based cache entry for this goroutine goroutineCacheMutex.RLock() - cache, exists := goroutineBufferCache[gid] + entryWithTTL, exists := goroutineCacheWithTTL[gid] + var cache *lockFreeBufferCache + if exists && entryWithTTL != nil { + cache = entryWithTTL.cache + // Update last access time + entryWithTTL.lastAccess = now + } else { + // Fall back to legacy cache if needed + cache, exists = goroutineBufferCache[gid] + } goroutineCacheMutex.RUnlock() if !exists { // Create new cache for this goroutine cache = &lockFreeBufferCache{} goroutineCacheMutex.Lock() - goroutineBufferCache[gid] = cache + // Store in TTL-based cache + goroutineCacheWithTTL[gid] = &cacheEntry{ + cache: cache, + lastAccess: now, + gid: gid, + } goroutineCacheMutex.Unlock() } @@ -267,9 +515,8 @@ func (p *AudioBufferPool) Put(buf []byte) { return // Pool is full, let GC handle this buffer } - // Return to sync.Pool + // Return to sync.Pool and update counter atomically p.pool.Put(&resetBuf) - // Update pool size counter atomically atomic.AddInt64(&p.currentSize, 1) } diff --git a/internal/audio/cgo_audio.go b/internal/audio/cgo_audio.go index 0eb24e41..13839a5a 100644 --- a/internal/audio/cgo_audio.go +++ b/internal/audio/cgo_audio.go @@ -5,6 +5,8 @@ package audio import ( "errors" "fmt" + "sync" + "time" "unsafe" ) @@ -703,26 +705,63 @@ func cgoAudioClose() { C.jetkvm_audio_close() } +// Cache config values to avoid repeated GetConfig() calls in hot path +var ( + cachedMinReadEncodeBuffer int + configCacheMutex sync.RWMutex + lastConfigUpdate time.Time + configCacheExpiry = 5 * time.Second +) + +// updateConfigCache refreshes the cached config values if needed +func updateConfigCache() { + configCacheMutex.RLock() + cacheExpired := time.Since(lastConfigUpdate) > configCacheExpiry + configCacheMutex.RUnlock() + + if cacheExpired { + configCacheMutex.Lock() + defer configCacheMutex.Unlock() + // Double-check after acquiring lock + if time.Since(lastConfigUpdate) > configCacheExpiry { + cachedMinReadEncodeBuffer = GetConfig().MinReadEncodeBuffer + lastConfigUpdate = time.Now() + } + } +} + func cgoAudioReadEncode(buf []byte) (int, error) { - minRequired := GetConfig().MinReadEncodeBuffer + // Use cached config values to avoid GetConfig() in hot path + updateConfigCache() + + // Fast validation with cached values + configCacheMutex.RLock() + minRequired := cachedMinReadEncodeBuffer + configCacheMutex.RUnlock() + if len(buf) < minRequired { return 0, newBufferTooSmallError(len(buf), minRequired) } // Skip initialization check for now to avoid CGO compilation issues - // TODO: Add proper initialization state checking // Note: The C code already has comprehensive state tracking with capture_initialized, // capture_initializing, playback_initialized, and playback_initializing flags. - // When CGO environment is properly configured, this should check C.capture_initialized. + // Direct CGO call with minimal overhead n := C.jetkvm_audio_read_encode(unsafe.Pointer(&buf[0])) + + // Fast path for success case + if n > 0 { + return int(n), nil + } + + // Handle error cases if n < 0 { return 0, newAudioReadEncodeError(int(n)) } - if n == 0 { - return 0, nil // No data available - } - return int(n), nil + + // n == 0 case + return 0, nil // No data available } // Audio playback functions diff --git a/internal/audio/config_constants.go b/internal/audio/config_constants.go index fc84b895..1c81c770 100644 --- a/internal/audio/config_constants.go +++ b/internal/audio/config_constants.go @@ -379,6 +379,18 @@ type AudioConfigConstants struct { // Default 4096 bytes handles maximum audio frame size with safety margin. MaxDecodeWriteBuffer int + // MinBatchSizeForThreadPinning defines the minimum batch size required to pin a thread. + // Used in: batch_audio.go for deciding when to pin a thread for batch processing. + // Impact: Smaller values increase thread pinning frequency but may improve performance. + // Default 5 frames provides a good balance between performance and thread contention. + MinBatchSizeForThreadPinning int + + // GoroutineMonitorInterval defines the interval for monitoring goroutine counts. + // Used in: goroutine_monitor.go for periodic goroutine count checks. + // Impact: Shorter intervals provide more frequent monitoring but increase overhead. + // Default 30 seconds provides reasonable monitoring frequency with minimal overhead. + GoroutineMonitorInterval time.Duration + // IPC Configuration - Inter-Process Communication settings for audio components // Used in: ipc.go for configuring audio process communication // Impact: Controls IPC reliability, performance, and protocol compliance @@ -2464,6 +2476,12 @@ func DefaultAudioConfig() *AudioConfigConstants { LatencyBucket500ms: 500 * time.Millisecond, // 500ms latency bucket LatencyBucket1s: 1 * time.Second, // 1s latency bucket LatencyBucket2s: 2 * time.Second, // 2s latency bucket + + // Batch Audio Processing Configuration + MinBatchSizeForThreadPinning: 5, // Minimum batch size to pin thread + + // Goroutine Monitoring Configuration + GoroutineMonitorInterval: 30 * time.Second, // 30s monitoring interval } } diff --git a/internal/audio/goroutine_monitor.go b/internal/audio/goroutine_monitor.go new file mode 100644 index 00000000..2c1d6d4d --- /dev/null +++ b/internal/audio/goroutine_monitor.go @@ -0,0 +1,145 @@ +package audio + +import ( + "runtime" + "sync/atomic" + "time" + + "github.com/jetkvm/kvm/internal/logging" +) + +// GoroutineMonitor tracks goroutine count and provides cleanup mechanisms +type GoroutineMonitor struct { + baselineCount int + peakCount int + lastCount int + monitorInterval time.Duration + lastCheck time.Time + enabled int32 +} + +// Global goroutine monitor instance +var globalGoroutineMonitor *GoroutineMonitor + +// NewGoroutineMonitor creates a new goroutine monitor +func NewGoroutineMonitor(monitorInterval time.Duration) *GoroutineMonitor { + if monitorInterval <= 0 { + monitorInterval = 30 * time.Second + } + + // Get current goroutine count as baseline + baselineCount := runtime.NumGoroutine() + + return &GoroutineMonitor{ + baselineCount: baselineCount, + peakCount: baselineCount, + lastCount: baselineCount, + monitorInterval: monitorInterval, + lastCheck: time.Now(), + } +} + +// Start begins goroutine monitoring +func (gm *GoroutineMonitor) Start() { + if !atomic.CompareAndSwapInt32(&gm.enabled, 0, 1) { + return // Already running + } + + go gm.monitorLoop() +} + +// Stop stops goroutine monitoring +func (gm *GoroutineMonitor) Stop() { + atomic.StoreInt32(&gm.enabled, 0) +} + +// monitorLoop periodically checks goroutine count +func (gm *GoroutineMonitor) monitorLoop() { + logger := logging.GetDefaultLogger().With().Str("component", "goroutine-monitor").Logger() + logger.Info().Int("baseline", gm.baselineCount).Msg("goroutine monitor started") + + for atomic.LoadInt32(&gm.enabled) == 1 { + time.Sleep(gm.monitorInterval) + gm.checkGoroutineCount() + } + + logger.Info().Msg("goroutine monitor stopped") +} + +// checkGoroutineCount checks current goroutine count and logs if it exceeds thresholds +func (gm *GoroutineMonitor) checkGoroutineCount() { + currentCount := runtime.NumGoroutine() + gm.lastCount = currentCount + + // Update peak count if needed + if currentCount > gm.peakCount { + gm.peakCount = currentCount + } + + // Calculate growth since baseline + growth := currentCount - gm.baselineCount + growthPercent := float64(growth) / float64(gm.baselineCount) * 100 + + // Log warning if growth exceeds thresholds + logger := logging.GetDefaultLogger().With().Str("component", "goroutine-monitor").Logger() + + // Different log levels based on growth severity + if growthPercent > 30 { + // Severe growth - trigger cleanup + logger.Warn().Int("current", currentCount).Int("baseline", gm.baselineCount). + Int("growth", growth).Float64("growth_percent", growthPercent). + Msg("excessive goroutine growth detected - triggering cleanup") + + // Force garbage collection to clean up unused resources + runtime.GC() + + // Force cleanup of goroutine buffer cache + cleanupGoroutineCache() + } else if growthPercent > 20 { + // Moderate growth - just log warning + logger.Warn().Int("current", currentCount).Int("baseline", gm.baselineCount). + Int("growth", growth).Float64("growth_percent", growthPercent). + Msg("significant goroutine growth detected") + } else if growthPercent > 10 { + // Minor growth - log info + logger.Info().Int("current", currentCount).Int("baseline", gm.baselineCount). + Int("growth", growth).Float64("growth_percent", growthPercent). + Msg("goroutine growth detected") + } + + // Update last check time + gm.lastCheck = time.Now() +} + +// GetGoroutineStats returns current goroutine statistics +func (gm *GoroutineMonitor) GetGoroutineStats() map[string]interface{} { + return map[string]interface{}{ + "current_count": gm.lastCount, + "baseline_count": gm.baselineCount, + "peak_count": gm.peakCount, + "growth": gm.lastCount - gm.baselineCount, + "growth_percent": float64(gm.lastCount-gm.baselineCount) / float64(gm.baselineCount) * 100, + "last_check": gm.lastCheck, + } +} + +// GetGoroutineMonitor returns the global goroutine monitor instance +func GetGoroutineMonitor() *GoroutineMonitor { + if globalGoroutineMonitor == nil { + globalGoroutineMonitor = NewGoroutineMonitor(GetConfig().GoroutineMonitorInterval) + } + return globalGoroutineMonitor +} + +// StartGoroutineMonitoring starts the global goroutine monitor +func StartGoroutineMonitoring() { + monitor := GetGoroutineMonitor() + monitor.Start() +} + +// StopGoroutineMonitoring stops the global goroutine monitor +func StopGoroutineMonitoring() { + if globalGoroutineMonitor != nil { + globalGoroutineMonitor.Stop() + } +} diff --git a/internal/audio/input.go b/internal/audio/input.go index 0c0c505b..6ef31d9f 100644 --- a/internal/audio/input.go +++ b/internal/audio/input.go @@ -96,9 +96,13 @@ func (aim *AudioInputManager) WriteOpusFrame(frame []byte) error { // Log high latency warnings if processingTime > time.Duration(GetConfig().InputProcessingTimeoutMS)*time.Millisecond { + latencyMs := float64(processingTime.Milliseconds()) aim.logger.Warn(). - Dur("latency_ms", processingTime). + Float64("latency_ms", latencyMs). Msg("High audio processing latency detected") + + // Record latency for goroutine cleanup optimization + RecordAudioLatency(latencyMs) } if err != nil { @@ -132,9 +136,13 @@ func (aim *AudioInputManager) WriteOpusFrameZeroCopy(frame *ZeroCopyAudioFrame) // Log high latency warnings if processingTime > time.Duration(GetConfig().InputProcessingTimeoutMS)*time.Millisecond { + latencyMs := float64(processingTime.Milliseconds()) aim.logger.Warn(). - Dur("latency_ms", processingTime). + Float64("latency_ms", latencyMs). Msg("High audio processing latency detected") + + // Record latency for goroutine cleanup optimization + RecordAudioLatency(latencyMs) } if err != nil { diff --git a/internal/audio/output_streaming.go b/internal/audio/output_streaming.go index ed876538..88a57e36 100644 --- a/internal/audio/output_streaming.go +++ b/internal/audio/output_streaming.go @@ -1,3 +1,6 @@ +//go:build cgo +// +build cgo + package audio import ( @@ -308,6 +311,9 @@ func (s *AudioOutputStreamer) ReportLatency(latency time.Duration) { // StartAudioOutputStreaming starts audio output streaming (capturing system audio) func StartAudioOutputStreaming(send func([]byte)) error { + // Initialize audio monitoring (latency tracking and cache cleanup) + InitializeAudioMonitoring() + if !atomic.CompareAndSwapInt32(&outputStreamingRunning, 0, 1) { return ErrAudioAlreadyRunning } diff --git a/main.go b/main.go index 412d6eb3..06a1cc2f 100644 --- a/main.go +++ b/main.go @@ -38,6 +38,14 @@ func startAudioSubprocess() error { // Start adaptive buffer management for optimal performance audio.StartAdaptiveBuffering() + // Start goroutine monitoring to detect and prevent leaks + audio.StartGoroutineMonitoring() + + // Enable batch audio processing to reduce CGO call overhead + if err := audio.EnableBatchAudioProcessing(); err != nil { + logger.Warn().Err(err).Msg("failed to enable batch audio processing") + } + // Create audio server supervisor audioSupervisor = audio.NewAudioOutputSupervisor() @@ -95,6 +103,10 @@ func startAudioSubprocess() error { audio.StopAudioRelay() // Stop adaptive buffering audio.StopAdaptiveBuffering() + // Stop goroutine monitoring + audio.StopGoroutineMonitoring() + // Disable batch audio processing + audio.DisableBatchAudioProcessing() }, // onRestart func(attempt int, delay time.Duration) {