From 6adcc26ff2259f1e834b824349e249f3baceaf53 Mon Sep 17 00:00:00 2001 From: Alex P Date: Fri, 29 Aug 2025 17:05:37 +0000 Subject: [PATCH] feat(audio): add goroutine cache cleanup and process reuse Implement periodic cleanup of stale goroutine buffer caches to prevent memory leaks Add ability to detect and reuse existing audio input server processes --- internal/audio/buffer_pool.go | 47 +++++++++++++++++--- internal/audio/input_supervisor.go | 71 +++++++++++++++++++++++++++++- 2 files changed, 111 insertions(+), 7 deletions(-) diff --git a/internal/audio/buffer_pool.go b/internal/audio/buffer_pool.go index a6a09d32..e4638f4e 100644 --- a/internal/audio/buffer_pool.go +++ b/internal/audio/buffer_pool.go @@ -6,8 +6,6 @@ import ( "sync/atomic" "time" "unsafe" - - "github.com/jetkvm/kvm/internal/logging" ) // Lock-free buffer cache for per-goroutine optimization @@ -18,6 +16,9 @@ type lockFreeBufferCache struct { // Per-goroutine buffer cache using goroutine-local storage var goroutineBufferCache = make(map[int64]*lockFreeBufferCache) var goroutineCacheMutex sync.RWMutex +var lastCleanupTime int64 // Unix timestamp of last cleanup +const maxCacheSize = 1000 // Maximum number of goroutine caches +const cleanupInterval = 300 // Cleanup interval in seconds (5 minutes) // getGoroutineID extracts goroutine ID from runtime stack for cache key func getGoroutineID() int64 { @@ -38,6 +39,41 @@ func getGoroutineID() int64 { return 0 } +// cleanupGoroutineCache removes stale entries from the goroutine cache +func cleanupGoroutineCache() { + now := time.Now().Unix() + lastCleanup := atomic.LoadInt64(&lastCleanupTime) + + // Only cleanup if enough time has passed + if now-lastCleanup < cleanupInterval { + return + } + + // Try to acquire cleanup lock atomically + if !atomic.CompareAndSwapInt64(&lastCleanupTime, lastCleanup, now) { + return // Another goroutine is already cleaning up + } + + goroutineCacheMutex.Lock() + defer goroutineCacheMutex.Unlock() + + // If cache is too large, remove oldest entries (simple FIFO) + if len(goroutineBufferCache) > maxCacheSize { + // Remove half of the entries to avoid frequent cleanups + toRemove := len(goroutineBufferCache) - maxCacheSize/2 + count := 0 + for gid := range goroutineBufferCache { + delete(goroutineBufferCache, gid) + count++ + if count >= toRemove { + break + } + } + // Log cleanup for debugging (removed logging dependency) + _ = count // Avoid unused variable warning + } +} + type AudioBufferPool struct { // Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) currentSize int64 // Current pool size (atomic) @@ -57,9 +93,7 @@ type AudioBufferPool struct { func NewAudioBufferPool(bufferSize int) *AudioBufferPool { // Validate buffer size parameter if err := ValidateBufferSize(bufferSize); err != nil { - // Log validation error and use default value - logger := logging.GetDefaultLogger().With().Str("component", "AudioBufferPool").Logger() - logger.Warn().Err(err).Int("bufferSize", bufferSize).Msg("invalid buffer size, using default") + // Use default value on validation error bufferSize = GetConfig().AudioFramePoolSize } @@ -99,6 +133,9 @@ func NewAudioBufferPool(bufferSize int) *AudioBufferPool { } func (p *AudioBufferPool) Get() []byte { + // Trigger periodic cleanup of goroutine cache + cleanupGoroutineCache() + start := time.Now() wasHit := false defer func() { diff --git a/internal/audio/input_supervisor.go b/internal/audio/input_supervisor.go index f75ad307..70587eef 100644 --- a/internal/audio/input_supervisor.go +++ b/internal/audio/input_supervisor.go @@ -1,10 +1,15 @@ +//go:build cgo +// +build cgo + package audio import ( "fmt" "os" "os/exec" + "path/filepath" "strconv" + "strings" "syscall" "time" ) @@ -49,7 +54,20 @@ func (ais *AudioInputSupervisor) Start() error { defer ais.mutex.Unlock() if ais.IsRunning() { - return fmt.Errorf("audio input supervisor already running with PID %d", ais.cmd.Process.Pid) + if ais.cmd != nil && ais.cmd.Process != nil { + return fmt.Errorf("audio input supervisor already running with PID %d", ais.cmd.Process.Pid) + } + return fmt.Errorf("audio input supervisor already running") + } + + // Check for existing audio input server process + if existingPID, err := ais.findExistingAudioInputProcess(); err == nil { + ais.logger.Info().Int("existing_pid", existingPID).Msg("Found existing audio input server process, connecting to it") + + // Try to connect to the existing process + ais.setRunning(true) + go ais.connectClient() + return nil } // Create context for subprocess management @@ -204,7 +222,7 @@ func (ais *AudioInputSupervisor) monitorSubprocess() { ais.client.Disconnect() } - // Mark as not running + // Mark as not running first to prevent race conditions ais.setRunning(false) ais.cmd = nil @@ -264,3 +282,52 @@ func (ais *AudioInputSupervisor) SendConfig(config InputIPCConfig) error { return ais.client.SendConfig(config) } + +// findExistingAudioInputProcess checks if there's already an audio input server process running +func (ais *AudioInputSupervisor) findExistingAudioInputProcess() (int, error) { + // Get current executable path + execPath, err := os.Executable() + if err != nil { + return 0, fmt.Errorf("failed to get executable path: %w", err) + } + + execName := filepath.Base(execPath) + + // Use ps to find processes with our executable name and audio-input-server argument + cmd := exec.Command("ps", "aux") + output, err := cmd.Output() + if err != nil { + return 0, fmt.Errorf("failed to run ps command: %w", err) + } + + // Parse ps output to find audio input server processes + lines := strings.Split(string(output), "\n") + for _, line := range lines { + if strings.Contains(line, execName) && strings.Contains(line, "--audio-input-server") { + // Extract PID from ps output (second column) + fields := strings.Fields(line) + if len(fields) >= 2 { + if pid, err := strconv.Atoi(fields[1]); err == nil { + // Verify the process is still running and accessible + if ais.isProcessRunning(pid) { + return pid, nil + } + } + } + } + } + + return 0, fmt.Errorf("no existing audio input server process found") +} + +// isProcessRunning checks if a process with the given PID is still running +func (ais *AudioInputSupervisor) isProcessRunning(pid int) bool { + // Try to send signal 0 to check if process exists + process, err := os.FindProcess(pid) + if err != nil { + return false + } + + err = process.Signal(syscall.Signal(0)) + return err == nil +}