mirror of https://github.com/jetkvm/kvm.git
feat(audio): optimize audio processing with batch processing and goroutine monitoring
- Add batch audio processing to reduce CGO call overhead - Implement goroutine monitoring and cleanup for leak prevention - Optimize buffer pool with TTL-based cache and latency-aware cleanup - Add configurable parameters for batch processing and monitoring - Improve CGO audio read performance with config caching
This commit is contained in:
parent
950ca2bd99
commit
8110be6cc6
|
@ -9,4 +9,5 @@ tmp/
|
|||
*.tmp
|
||||
*.code-workspace
|
||||
|
||||
device-tests.tar.gz
|
||||
device-tests.tar.gz
|
||||
CLAUDE.md
|
||||
|
|
|
@ -213,16 +213,32 @@ func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) {
|
|||
return
|
||||
}
|
||||
|
||||
// Pin to OS thread for the entire batch to minimize thread switching overhead
|
||||
// Only pin to OS thread for large batches to reduce thread contention
|
||||
start := time.Now()
|
||||
if atomic.CompareAndSwapInt32(&bap.threadPinned, 0, 1) {
|
||||
shouldPinThread := len(batch) >= GetConfig().MinBatchSizeForThreadPinning
|
||||
|
||||
// Track if we pinned the thread in this call
|
||||
threadWasPinned := false
|
||||
|
||||
if shouldPinThread && atomic.CompareAndSwapInt32(&bap.threadPinned, 0, 1) {
|
||||
threadWasPinned = true
|
||||
runtime.LockOSThread()
|
||||
|
||||
// Set high priority for batch audio processing
|
||||
if err := SetAudioThreadPriority(); err != nil {
|
||||
bap.logger.Warn().Err(err).Msg("failed to set batch audio processing priority")
|
||||
}
|
||||
}
|
||||
|
||||
batchSize := len(batch)
|
||||
atomic.AddInt64(&bap.stats.BatchedReads, 1)
|
||||
atomic.AddInt64(&bap.stats.BatchedFrames, int64(batchSize))
|
||||
if batchSize > 1 {
|
||||
atomic.AddInt64(&bap.stats.CGOCallsReduced, int64(batchSize-1))
|
||||
}
|
||||
|
||||
// Add deferred function to release thread lock if we pinned it
|
||||
if threadWasPinned {
|
||||
defer func() {
|
||||
if err := ResetThreadPriority(); err != nil {
|
||||
bap.logger.Warn().Err(err).Msg("failed to reset thread priority")
|
||||
|
@ -233,13 +249,6 @@ func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) {
|
|||
}()
|
||||
}
|
||||
|
||||
batchSize := len(batch)
|
||||
atomic.AddInt64(&bap.stats.BatchedReads, 1)
|
||||
atomic.AddInt64(&bap.stats.BatchedFrames, int64(batchSize))
|
||||
if batchSize > 1 {
|
||||
atomic.AddInt64(&bap.stats.CGOCallsReduced, int64(batchSize-1))
|
||||
}
|
||||
|
||||
// Process each request in the batch
|
||||
for _, req := range batch {
|
||||
length, err := CGOAudioReadEncode(req.buffer)
|
||||
|
|
|
@ -1,24 +1,118 @@
|
|||
//go:build cgo
|
||||
// +build cgo
|
||||
|
||||
package audio
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
// AudioLatencyInfo holds simplified latency information for cleanup decisions
|
||||
type AudioLatencyInfo struct {
|
||||
LatencyMs float64
|
||||
Timestamp time.Time
|
||||
}
|
||||
|
||||
// Global latency tracking
|
||||
var (
|
||||
currentAudioLatency = AudioLatencyInfo{}
|
||||
currentAudioLatencyLock sync.RWMutex
|
||||
audioMonitoringInitialized int32 // Atomic flag to track initialization
|
||||
)
|
||||
|
||||
// InitializeAudioMonitoring starts the background goroutines for latency tracking and cache cleanup
|
||||
// This is safe to call multiple times as it will only initialize once
|
||||
func InitializeAudioMonitoring() {
|
||||
// Use atomic CAS to ensure we only initialize once
|
||||
if atomic.CompareAndSwapInt32(&audioMonitoringInitialized, 0, 1) {
|
||||
// Start the latency recorder
|
||||
startLatencyRecorder()
|
||||
|
||||
// Start the cleanup goroutine
|
||||
startCleanupGoroutine()
|
||||
}
|
||||
}
|
||||
|
||||
// latencyChannel is used for non-blocking latency recording
|
||||
var latencyChannel = make(chan float64, 10)
|
||||
|
||||
// startLatencyRecorder starts the latency recorder goroutine
|
||||
// This should be called during package initialization
|
||||
func startLatencyRecorder() {
|
||||
go latencyRecorderLoop()
|
||||
}
|
||||
|
||||
// latencyRecorderLoop processes latency recordings in the background
|
||||
func latencyRecorderLoop() {
|
||||
for latencyMs := range latencyChannel {
|
||||
currentAudioLatencyLock.Lock()
|
||||
currentAudioLatency = AudioLatencyInfo{
|
||||
LatencyMs: latencyMs,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
currentAudioLatencyLock.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// RecordAudioLatency records the current audio processing latency
|
||||
// This is called from the audio input manager when latency is measured
|
||||
// It is non-blocking to ensure zero overhead in the critical audio path
|
||||
func RecordAudioLatency(latencyMs float64) {
|
||||
// Non-blocking send - if channel is full, we drop the update
|
||||
select {
|
||||
case latencyChannel <- latencyMs:
|
||||
// Successfully sent
|
||||
default:
|
||||
// Channel full, drop this update to avoid blocking the audio path
|
||||
}
|
||||
}
|
||||
|
||||
// GetAudioLatencyMetrics returns the current audio latency information
|
||||
// Returns nil if no latency data is available or if it's too old
|
||||
func GetAudioLatencyMetrics() *AudioLatencyInfo {
|
||||
currentAudioLatencyLock.RLock()
|
||||
defer currentAudioLatencyLock.RUnlock()
|
||||
|
||||
// Check if we have valid latency data
|
||||
if currentAudioLatency.Timestamp.IsZero() {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check if the data is too old (more than 5 seconds)
|
||||
if time.Since(currentAudioLatency.Timestamp) > 5*time.Second {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &AudioLatencyInfo{
|
||||
LatencyMs: currentAudioLatency.LatencyMs,
|
||||
Timestamp: currentAudioLatency.Timestamp,
|
||||
}
|
||||
}
|
||||
|
||||
// Lock-free buffer cache for per-goroutine optimization
|
||||
type lockFreeBufferCache struct {
|
||||
buffers [4]*[]byte // Small fixed-size array for lock-free access
|
||||
}
|
||||
|
||||
// TTL tracking for goroutine cache entries
|
||||
type cacheEntry struct {
|
||||
cache *lockFreeBufferCache
|
||||
lastAccess int64 // Unix timestamp of last access
|
||||
gid int64 // Goroutine ID for better tracking
|
||||
}
|
||||
|
||||
// Per-goroutine buffer cache using goroutine-local storage
|
||||
var goroutineBufferCache = make(map[int64]*lockFreeBufferCache)
|
||||
var goroutineCacheMutex sync.RWMutex
|
||||
var lastCleanupTime int64 // Unix timestamp of last cleanup
|
||||
const maxCacheSize = 1000 // Maximum number of goroutine caches
|
||||
const cleanupInterval = 300 // Cleanup interval in seconds (5 minutes)
|
||||
var lastCleanupTime int64 // Unix timestamp of last cleanup
|
||||
const maxCacheSize = 500 // Maximum number of goroutine caches (reduced from 1000)
|
||||
const cleanupInterval int64 = 30 // Cleanup interval in seconds (30 seconds, reduced from 60)
|
||||
const bufferTTL int64 = 60 // Time-to-live for cached buffers in seconds (1 minute, reduced from 2)
|
||||
|
||||
// getGoroutineID extracts goroutine ID from runtime stack for cache key
|
||||
func getGoroutineID() int64 {
|
||||
|
@ -39,13 +133,67 @@ func getGoroutineID() int64 {
|
|||
return 0
|
||||
}
|
||||
|
||||
// cleanupGoroutineCache removes stale entries from the goroutine cache
|
||||
func cleanupGoroutineCache() {
|
||||
// Map of goroutine ID to cache entry with TTL tracking
|
||||
var goroutineCacheWithTTL = make(map[int64]*cacheEntry)
|
||||
|
||||
// cleanupChannel is used for asynchronous cleanup requests
|
||||
var cleanupChannel = make(chan struct{}, 1)
|
||||
|
||||
// startCleanupGoroutine starts the cleanup goroutine
|
||||
// This should be called during package initialization
|
||||
func startCleanupGoroutine() {
|
||||
go cleanupLoop()
|
||||
}
|
||||
|
||||
// cleanupLoop processes cleanup requests in the background
|
||||
func cleanupLoop() {
|
||||
ticker := time.NewTicker(10 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-cleanupChannel:
|
||||
// Received explicit cleanup request
|
||||
performCleanup(true)
|
||||
case <-ticker.C:
|
||||
// Regular cleanup check
|
||||
performCleanup(false)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// requestCleanup signals the cleanup goroutine to perform a cleanup
|
||||
// This is non-blocking and can be called from the critical path
|
||||
func requestCleanup() {
|
||||
select {
|
||||
case cleanupChannel <- struct{}{}:
|
||||
// Successfully requested cleanup
|
||||
default:
|
||||
// Channel full, cleanup already pending
|
||||
}
|
||||
}
|
||||
|
||||
// performCleanup does the actual cache cleanup work
|
||||
// This runs in a dedicated goroutine, not in the critical path
|
||||
func performCleanup(forced bool) {
|
||||
now := time.Now().Unix()
|
||||
lastCleanup := atomic.LoadInt64(&lastCleanupTime)
|
||||
|
||||
// Only cleanup if enough time has passed
|
||||
if now-lastCleanup < cleanupInterval {
|
||||
// Check if we're in a high-latency situation
|
||||
isHighLatency := false
|
||||
latencyMetrics := GetAudioLatencyMetrics()
|
||||
if latencyMetrics != nil && latencyMetrics.LatencyMs > 10.0 {
|
||||
// Under high latency, be more aggressive with cleanup
|
||||
isHighLatency = true
|
||||
}
|
||||
|
||||
// Only cleanup if enough time has passed (less time if high latency) or if forced
|
||||
interval := cleanupInterval
|
||||
if isHighLatency {
|
||||
interval = cleanupInterval / 2 // More frequent cleanup under high latency
|
||||
}
|
||||
|
||||
if !forced && now-lastCleanup < interval {
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -54,23 +202,93 @@ func cleanupGoroutineCache() {
|
|||
return // Another goroutine is already cleaning up
|
||||
}
|
||||
|
||||
// Perform the actual cleanup
|
||||
doCleanupGoroutineCache()
|
||||
}
|
||||
|
||||
// cleanupGoroutineCache triggers an asynchronous cleanup of the goroutine cache
|
||||
// This is safe to call from the critical path as it's non-blocking
|
||||
func cleanupGoroutineCache() {
|
||||
// Request asynchronous cleanup
|
||||
requestCleanup()
|
||||
}
|
||||
|
||||
// The actual cleanup implementation that runs in the background goroutine
|
||||
func doCleanupGoroutineCache() {
|
||||
// Get current time for TTL calculations
|
||||
now := time.Now().Unix()
|
||||
|
||||
// Check if we're in a high-latency situation
|
||||
isHighLatency := false
|
||||
latencyMetrics := GetAudioLatencyMetrics()
|
||||
if latencyMetrics != nil && latencyMetrics.LatencyMs > 10.0 {
|
||||
// Under high latency, be more aggressive with cleanup
|
||||
isHighLatency = true
|
||||
}
|
||||
|
||||
goroutineCacheMutex.Lock()
|
||||
defer goroutineCacheMutex.Unlock()
|
||||
|
||||
// If cache is too large, remove oldest entries (simple FIFO)
|
||||
if len(goroutineBufferCache) > maxCacheSize {
|
||||
// Remove half of the entries to avoid frequent cleanups
|
||||
toRemove := len(goroutineBufferCache) - maxCacheSize/2
|
||||
count := 0
|
||||
for gid := range goroutineBufferCache {
|
||||
delete(goroutineBufferCache, gid)
|
||||
count++
|
||||
if count >= toRemove {
|
||||
break
|
||||
// Convert old cache format to new TTL-based format if needed
|
||||
if len(goroutineCacheWithTTL) == 0 && len(goroutineBufferCache) > 0 {
|
||||
for gid, cache := range goroutineBufferCache {
|
||||
goroutineCacheWithTTL[gid] = &cacheEntry{
|
||||
cache: cache,
|
||||
lastAccess: now,
|
||||
}
|
||||
}
|
||||
// Log cleanup for debugging (removed logging dependency)
|
||||
_ = count // Avoid unused variable warning
|
||||
// Clear old cache to free memory
|
||||
goroutineBufferCache = make(map[int64]*lockFreeBufferCache)
|
||||
}
|
||||
|
||||
// Remove stale entries based on TTL (more aggressive under high latency)
|
||||
expiredCount := 0
|
||||
ttl := bufferTTL
|
||||
if isHighLatency {
|
||||
// Under high latency, use a much shorter TTL
|
||||
ttl = bufferTTL / 4
|
||||
}
|
||||
|
||||
for gid, entry := range goroutineCacheWithTTL {
|
||||
// Both now and entry.lastAccess are int64, so this comparison is safe
|
||||
if now-entry.lastAccess > ttl {
|
||||
delete(goroutineCacheWithTTL, gid)
|
||||
expiredCount++
|
||||
}
|
||||
}
|
||||
|
||||
// If cache is still too large after TTL cleanup, remove oldest entries
|
||||
// Under high latency, use a more aggressive target size
|
||||
targetSize := maxCacheSize
|
||||
targetReduction := maxCacheSize / 2
|
||||
|
||||
if isHighLatency {
|
||||
// Under high latency, target a much smaller cache size
|
||||
targetSize = maxCacheSize / 4
|
||||
targetReduction = maxCacheSize / 8
|
||||
}
|
||||
|
||||
if len(goroutineCacheWithTTL) > targetSize {
|
||||
// Find oldest entries
|
||||
type ageEntry struct {
|
||||
gid int64
|
||||
lastAccess int64
|
||||
}
|
||||
oldestEntries := make([]ageEntry, 0, len(goroutineCacheWithTTL))
|
||||
for gid, entry := range goroutineCacheWithTTL {
|
||||
oldestEntries = append(oldestEntries, ageEntry{gid, entry.lastAccess})
|
||||
}
|
||||
|
||||
// Sort by lastAccess (oldest first)
|
||||
sort.Slice(oldestEntries, func(i, j int) bool {
|
||||
return oldestEntries[i].lastAccess < oldestEntries[j].lastAccess
|
||||
})
|
||||
|
||||
// Remove oldest entries to get down to target reduction size
|
||||
toRemove := len(goroutineCacheWithTTL) - targetReduction
|
||||
for i := 0; i < toRemove && i < len(oldestEntries); i++ {
|
||||
delete(goroutineCacheWithTTL, oldestEntries[i].gid)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -151,7 +369,18 @@ func (p *AudioBufferPool) Get() []byte {
|
|||
// Fast path: Try lock-free per-goroutine cache first
|
||||
gid := getGoroutineID()
|
||||
goroutineCacheMutex.RLock()
|
||||
cache, exists := goroutineBufferCache[gid]
|
||||
|
||||
// Try new TTL-based cache first
|
||||
cacheEntry, exists := goroutineCacheWithTTL[gid]
|
||||
var cache *lockFreeBufferCache
|
||||
if exists && cacheEntry != nil {
|
||||
cache = cacheEntry.cache
|
||||
// Update last access time
|
||||
cacheEntry.lastAccess = time.Now().Unix()
|
||||
} else {
|
||||
// Fall back to legacy cache if needed
|
||||
cache, exists = goroutineBufferCache[gid]
|
||||
}
|
||||
goroutineCacheMutex.RUnlock()
|
||||
|
||||
if exists && cache != nil {
|
||||
|
@ -190,6 +419,8 @@ func (p *AudioBufferPool) Get() []byte {
|
|||
buf := poolBuf.(*[]byte)
|
||||
// Update hit counter
|
||||
atomic.AddInt64(&p.hitCount, 1)
|
||||
// Decrement pool size counter atomically
|
||||
atomic.AddInt64(&p.currentSize, -1)
|
||||
// Ensure buffer is properly reset and check capacity
|
||||
if cap(*buf) >= p.bufferSize {
|
||||
wasHit = true
|
||||
|
@ -230,15 +461,32 @@ func (p *AudioBufferPool) Put(buf []byte) {
|
|||
|
||||
// Fast path: Try to put in lock-free per-goroutine cache
|
||||
gid := getGoroutineID()
|
||||
now := time.Now().Unix()
|
||||
|
||||
// Check if we have a TTL-based cache entry for this goroutine
|
||||
goroutineCacheMutex.RLock()
|
||||
cache, exists := goroutineBufferCache[gid]
|
||||
entryWithTTL, exists := goroutineCacheWithTTL[gid]
|
||||
var cache *lockFreeBufferCache
|
||||
if exists && entryWithTTL != nil {
|
||||
cache = entryWithTTL.cache
|
||||
// Update last access time
|
||||
entryWithTTL.lastAccess = now
|
||||
} else {
|
||||
// Fall back to legacy cache if needed
|
||||
cache, exists = goroutineBufferCache[gid]
|
||||
}
|
||||
goroutineCacheMutex.RUnlock()
|
||||
|
||||
if !exists {
|
||||
// Create new cache for this goroutine
|
||||
cache = &lockFreeBufferCache{}
|
||||
goroutineCacheMutex.Lock()
|
||||
goroutineBufferCache[gid] = cache
|
||||
// Store in TTL-based cache
|
||||
goroutineCacheWithTTL[gid] = &cacheEntry{
|
||||
cache: cache,
|
||||
lastAccess: now,
|
||||
gid: gid,
|
||||
}
|
||||
goroutineCacheMutex.Unlock()
|
||||
}
|
||||
|
||||
|
@ -267,9 +515,8 @@ func (p *AudioBufferPool) Put(buf []byte) {
|
|||
return // Pool is full, let GC handle this buffer
|
||||
}
|
||||
|
||||
// Return to sync.Pool
|
||||
// Return to sync.Pool and update counter atomically
|
||||
p.pool.Put(&resetBuf)
|
||||
|
||||
// Update pool size counter atomically
|
||||
atomic.AddInt64(&p.currentSize, 1)
|
||||
}
|
||||
|
|
|
@ -5,6 +5,8 @@ package audio
|
|||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
|
@ -703,26 +705,63 @@ func cgoAudioClose() {
|
|||
C.jetkvm_audio_close()
|
||||
}
|
||||
|
||||
// Cache config values to avoid repeated GetConfig() calls in hot path
|
||||
var (
|
||||
cachedMinReadEncodeBuffer int
|
||||
configCacheMutex sync.RWMutex
|
||||
lastConfigUpdate time.Time
|
||||
configCacheExpiry = 5 * time.Second
|
||||
)
|
||||
|
||||
// updateConfigCache refreshes the cached config values if needed
|
||||
func updateConfigCache() {
|
||||
configCacheMutex.RLock()
|
||||
cacheExpired := time.Since(lastConfigUpdate) > configCacheExpiry
|
||||
configCacheMutex.RUnlock()
|
||||
|
||||
if cacheExpired {
|
||||
configCacheMutex.Lock()
|
||||
defer configCacheMutex.Unlock()
|
||||
// Double-check after acquiring lock
|
||||
if time.Since(lastConfigUpdate) > configCacheExpiry {
|
||||
cachedMinReadEncodeBuffer = GetConfig().MinReadEncodeBuffer
|
||||
lastConfigUpdate = time.Now()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func cgoAudioReadEncode(buf []byte) (int, error) {
|
||||
minRequired := GetConfig().MinReadEncodeBuffer
|
||||
// Use cached config values to avoid GetConfig() in hot path
|
||||
updateConfigCache()
|
||||
|
||||
// Fast validation with cached values
|
||||
configCacheMutex.RLock()
|
||||
minRequired := cachedMinReadEncodeBuffer
|
||||
configCacheMutex.RUnlock()
|
||||
|
||||
if len(buf) < minRequired {
|
||||
return 0, newBufferTooSmallError(len(buf), minRequired)
|
||||
}
|
||||
|
||||
// Skip initialization check for now to avoid CGO compilation issues
|
||||
// TODO: Add proper initialization state checking
|
||||
// Note: The C code already has comprehensive state tracking with capture_initialized,
|
||||
// capture_initializing, playback_initialized, and playback_initializing flags.
|
||||
// When CGO environment is properly configured, this should check C.capture_initialized.
|
||||
|
||||
// Direct CGO call with minimal overhead
|
||||
n := C.jetkvm_audio_read_encode(unsafe.Pointer(&buf[0]))
|
||||
|
||||
// Fast path for success case
|
||||
if n > 0 {
|
||||
return int(n), nil
|
||||
}
|
||||
|
||||
// Handle error cases
|
||||
if n < 0 {
|
||||
return 0, newAudioReadEncodeError(int(n))
|
||||
}
|
||||
if n == 0 {
|
||||
return 0, nil // No data available
|
||||
}
|
||||
return int(n), nil
|
||||
|
||||
// n == 0 case
|
||||
return 0, nil // No data available
|
||||
}
|
||||
|
||||
// Audio playback functions
|
||||
|
|
|
@ -379,6 +379,18 @@ type AudioConfigConstants struct {
|
|||
// Default 4096 bytes handles maximum audio frame size with safety margin.
|
||||
MaxDecodeWriteBuffer int
|
||||
|
||||
// MinBatchSizeForThreadPinning defines the minimum batch size required to pin a thread.
|
||||
// Used in: batch_audio.go for deciding when to pin a thread for batch processing.
|
||||
// Impact: Smaller values increase thread pinning frequency but may improve performance.
|
||||
// Default 5 frames provides a good balance between performance and thread contention.
|
||||
MinBatchSizeForThreadPinning int
|
||||
|
||||
// GoroutineMonitorInterval defines the interval for monitoring goroutine counts.
|
||||
// Used in: goroutine_monitor.go for periodic goroutine count checks.
|
||||
// Impact: Shorter intervals provide more frequent monitoring but increase overhead.
|
||||
// Default 30 seconds provides reasonable monitoring frequency with minimal overhead.
|
||||
GoroutineMonitorInterval time.Duration
|
||||
|
||||
// IPC Configuration - Inter-Process Communication settings for audio components
|
||||
// Used in: ipc.go for configuring audio process communication
|
||||
// Impact: Controls IPC reliability, performance, and protocol compliance
|
||||
|
@ -2464,6 +2476,12 @@ func DefaultAudioConfig() *AudioConfigConstants {
|
|||
LatencyBucket500ms: 500 * time.Millisecond, // 500ms latency bucket
|
||||
LatencyBucket1s: 1 * time.Second, // 1s latency bucket
|
||||
LatencyBucket2s: 2 * time.Second, // 2s latency bucket
|
||||
|
||||
// Batch Audio Processing Configuration
|
||||
MinBatchSizeForThreadPinning: 5, // Minimum batch size to pin thread
|
||||
|
||||
// Goroutine Monitoring Configuration
|
||||
GoroutineMonitorInterval: 30 * time.Second, // 30s monitoring interval
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,145 @@
|
|||
package audio
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/jetkvm/kvm/internal/logging"
|
||||
)
|
||||
|
||||
// GoroutineMonitor tracks goroutine count and provides cleanup mechanisms
|
||||
type GoroutineMonitor struct {
|
||||
baselineCount int
|
||||
peakCount int
|
||||
lastCount int
|
||||
monitorInterval time.Duration
|
||||
lastCheck time.Time
|
||||
enabled int32
|
||||
}
|
||||
|
||||
// Global goroutine monitor instance
|
||||
var globalGoroutineMonitor *GoroutineMonitor
|
||||
|
||||
// NewGoroutineMonitor creates a new goroutine monitor
|
||||
func NewGoroutineMonitor(monitorInterval time.Duration) *GoroutineMonitor {
|
||||
if monitorInterval <= 0 {
|
||||
monitorInterval = 30 * time.Second
|
||||
}
|
||||
|
||||
// Get current goroutine count as baseline
|
||||
baselineCount := runtime.NumGoroutine()
|
||||
|
||||
return &GoroutineMonitor{
|
||||
baselineCount: baselineCount,
|
||||
peakCount: baselineCount,
|
||||
lastCount: baselineCount,
|
||||
monitorInterval: monitorInterval,
|
||||
lastCheck: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
// Start begins goroutine monitoring
|
||||
func (gm *GoroutineMonitor) Start() {
|
||||
if !atomic.CompareAndSwapInt32(&gm.enabled, 0, 1) {
|
||||
return // Already running
|
||||
}
|
||||
|
||||
go gm.monitorLoop()
|
||||
}
|
||||
|
||||
// Stop stops goroutine monitoring
|
||||
func (gm *GoroutineMonitor) Stop() {
|
||||
atomic.StoreInt32(&gm.enabled, 0)
|
||||
}
|
||||
|
||||
// monitorLoop periodically checks goroutine count
|
||||
func (gm *GoroutineMonitor) monitorLoop() {
|
||||
logger := logging.GetDefaultLogger().With().Str("component", "goroutine-monitor").Logger()
|
||||
logger.Info().Int("baseline", gm.baselineCount).Msg("goroutine monitor started")
|
||||
|
||||
for atomic.LoadInt32(&gm.enabled) == 1 {
|
||||
time.Sleep(gm.monitorInterval)
|
||||
gm.checkGoroutineCount()
|
||||
}
|
||||
|
||||
logger.Info().Msg("goroutine monitor stopped")
|
||||
}
|
||||
|
||||
// checkGoroutineCount checks current goroutine count and logs if it exceeds thresholds
|
||||
func (gm *GoroutineMonitor) checkGoroutineCount() {
|
||||
currentCount := runtime.NumGoroutine()
|
||||
gm.lastCount = currentCount
|
||||
|
||||
// Update peak count if needed
|
||||
if currentCount > gm.peakCount {
|
||||
gm.peakCount = currentCount
|
||||
}
|
||||
|
||||
// Calculate growth since baseline
|
||||
growth := currentCount - gm.baselineCount
|
||||
growthPercent := float64(growth) / float64(gm.baselineCount) * 100
|
||||
|
||||
// Log warning if growth exceeds thresholds
|
||||
logger := logging.GetDefaultLogger().With().Str("component", "goroutine-monitor").Logger()
|
||||
|
||||
// Different log levels based on growth severity
|
||||
if growthPercent > 30 {
|
||||
// Severe growth - trigger cleanup
|
||||
logger.Warn().Int("current", currentCount).Int("baseline", gm.baselineCount).
|
||||
Int("growth", growth).Float64("growth_percent", growthPercent).
|
||||
Msg("excessive goroutine growth detected - triggering cleanup")
|
||||
|
||||
// Force garbage collection to clean up unused resources
|
||||
runtime.GC()
|
||||
|
||||
// Force cleanup of goroutine buffer cache
|
||||
cleanupGoroutineCache()
|
||||
} else if growthPercent > 20 {
|
||||
// Moderate growth - just log warning
|
||||
logger.Warn().Int("current", currentCount).Int("baseline", gm.baselineCount).
|
||||
Int("growth", growth).Float64("growth_percent", growthPercent).
|
||||
Msg("significant goroutine growth detected")
|
||||
} else if growthPercent > 10 {
|
||||
// Minor growth - log info
|
||||
logger.Info().Int("current", currentCount).Int("baseline", gm.baselineCount).
|
||||
Int("growth", growth).Float64("growth_percent", growthPercent).
|
||||
Msg("goroutine growth detected")
|
||||
}
|
||||
|
||||
// Update last check time
|
||||
gm.lastCheck = time.Now()
|
||||
}
|
||||
|
||||
// GetGoroutineStats returns current goroutine statistics
|
||||
func (gm *GoroutineMonitor) GetGoroutineStats() map[string]interface{} {
|
||||
return map[string]interface{}{
|
||||
"current_count": gm.lastCount,
|
||||
"baseline_count": gm.baselineCount,
|
||||
"peak_count": gm.peakCount,
|
||||
"growth": gm.lastCount - gm.baselineCount,
|
||||
"growth_percent": float64(gm.lastCount-gm.baselineCount) / float64(gm.baselineCount) * 100,
|
||||
"last_check": gm.lastCheck,
|
||||
}
|
||||
}
|
||||
|
||||
// GetGoroutineMonitor returns the global goroutine monitor instance
|
||||
func GetGoroutineMonitor() *GoroutineMonitor {
|
||||
if globalGoroutineMonitor == nil {
|
||||
globalGoroutineMonitor = NewGoroutineMonitor(GetConfig().GoroutineMonitorInterval)
|
||||
}
|
||||
return globalGoroutineMonitor
|
||||
}
|
||||
|
||||
// StartGoroutineMonitoring starts the global goroutine monitor
|
||||
func StartGoroutineMonitoring() {
|
||||
monitor := GetGoroutineMonitor()
|
||||
monitor.Start()
|
||||
}
|
||||
|
||||
// StopGoroutineMonitoring stops the global goroutine monitor
|
||||
func StopGoroutineMonitoring() {
|
||||
if globalGoroutineMonitor != nil {
|
||||
globalGoroutineMonitor.Stop()
|
||||
}
|
||||
}
|
|
@ -96,9 +96,13 @@ func (aim *AudioInputManager) WriteOpusFrame(frame []byte) error {
|
|||
|
||||
// Log high latency warnings
|
||||
if processingTime > time.Duration(GetConfig().InputProcessingTimeoutMS)*time.Millisecond {
|
||||
latencyMs := float64(processingTime.Milliseconds())
|
||||
aim.logger.Warn().
|
||||
Dur("latency_ms", processingTime).
|
||||
Float64("latency_ms", latencyMs).
|
||||
Msg("High audio processing latency detected")
|
||||
|
||||
// Record latency for goroutine cleanup optimization
|
||||
RecordAudioLatency(latencyMs)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
|
@ -132,9 +136,13 @@ func (aim *AudioInputManager) WriteOpusFrameZeroCopy(frame *ZeroCopyAudioFrame)
|
|||
|
||||
// Log high latency warnings
|
||||
if processingTime > time.Duration(GetConfig().InputProcessingTimeoutMS)*time.Millisecond {
|
||||
latencyMs := float64(processingTime.Milliseconds())
|
||||
aim.logger.Warn().
|
||||
Dur("latency_ms", processingTime).
|
||||
Float64("latency_ms", latencyMs).
|
||||
Msg("High audio processing latency detected")
|
||||
|
||||
// Record latency for goroutine cleanup optimization
|
||||
RecordAudioLatency(latencyMs)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
|
|
|
@ -1,3 +1,6 @@
|
|||
//go:build cgo
|
||||
// +build cgo
|
||||
|
||||
package audio
|
||||
|
||||
import (
|
||||
|
@ -308,6 +311,9 @@ func (s *AudioOutputStreamer) ReportLatency(latency time.Duration) {
|
|||
|
||||
// StartAudioOutputStreaming starts audio output streaming (capturing system audio)
|
||||
func StartAudioOutputStreaming(send func([]byte)) error {
|
||||
// Initialize audio monitoring (latency tracking and cache cleanup)
|
||||
InitializeAudioMonitoring()
|
||||
|
||||
if !atomic.CompareAndSwapInt32(&outputStreamingRunning, 0, 1) {
|
||||
return ErrAudioAlreadyRunning
|
||||
}
|
||||
|
|
12
main.go
12
main.go
|
@ -38,6 +38,14 @@ func startAudioSubprocess() error {
|
|||
// Start adaptive buffer management for optimal performance
|
||||
audio.StartAdaptiveBuffering()
|
||||
|
||||
// Start goroutine monitoring to detect and prevent leaks
|
||||
audio.StartGoroutineMonitoring()
|
||||
|
||||
// Enable batch audio processing to reduce CGO call overhead
|
||||
if err := audio.EnableBatchAudioProcessing(); err != nil {
|
||||
logger.Warn().Err(err).Msg("failed to enable batch audio processing")
|
||||
}
|
||||
|
||||
// Create audio server supervisor
|
||||
audioSupervisor = audio.NewAudioOutputSupervisor()
|
||||
|
||||
|
@ -95,6 +103,10 @@ func startAudioSubprocess() error {
|
|||
audio.StopAudioRelay()
|
||||
// Stop adaptive buffering
|
||||
audio.StopAdaptiveBuffering()
|
||||
// Stop goroutine monitoring
|
||||
audio.StopGoroutineMonitoring()
|
||||
// Disable batch audio processing
|
||||
audio.DisableBatchAudioProcessing()
|
||||
},
|
||||
// onRestart
|
||||
func(attempt int, delay time.Duration) {
|
||||
|
|
Loading…
Reference in New Issue