Compare commits


No commits in common. "260f62efc3ffb6aa95fd106db50f0d834f295e5d" and "950ca2bd9966ae69ebe0463b1c4ba2d22f73b5ec" have entirely different histories.

15 changed files with 248 additions and 2261 deletions

.gitignore vendored

@@ -10,4 +10,3 @@ tmp/
 *.code-workspace
 device-tests.tar.gz
-CLAUDE.md


@@ -4,7 +4,6 @@ package audio
 import (
 	"context"
-	"fmt"
 	"runtime"
 	"sync"
 	"sync/atomic"
@@ -29,31 +28,23 @@ type BatchAudioProcessor struct {
 	// Batch queues and state (atomic for lock-free access)
 	readQueue    chan batchReadRequest
-	writeQueue   chan batchWriteRequest
 	initialized  int32
 	running      int32
 	threadPinned int32
-	writePinned  int32

 	// Buffers (pre-allocated to avoid allocation overhead)
 	readBufPool  *sync.Pool
-	writeBufPool *sync.Pool
 }

 type BatchAudioStats struct {
 	// int64 fields MUST be first for ARM32 alignment
 	BatchedReads    int64
 	SingleReads     int64
-	BatchedWrites   int64
-	SingleWrites    int64
 	BatchedFrames   int64
 	SingleFrames    int64
-	WriteFrames     int64
 	CGOCallsReduced int64
 	OSThreadPinTime time.Duration // time.Duration is int64 internally
-	WriteThreadTime time.Duration // time.Duration is int64 internally
 	LastBatchTime   time.Time
-	LastWriteTime   time.Time
 }
 type batchReadRequest struct {
@@ -67,43 +58,23 @@ type batchReadResult struct {
 	err    error
 }

-type batchWriteRequest struct {
-	buffer     []byte // Buffer for backward compatibility
-	opusData   []byte // Opus encoded data for decode-write operations
-	pcmBuffer  []byte // PCM buffer for decode-write operations
-	resultChan chan batchWriteResult
-	timestamp  time.Time
-}
-
-type batchWriteResult struct {
-	length int
-	err    error
-}
-
 // NewBatchAudioProcessor creates a new batch audio processor
 func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAudioProcessor {
-	// Get cached config to avoid GetConfig() calls
-	cache := GetCachedConfig()
-	cache.Update()
-
-	// Validate input parameters with minimal overhead
-	if batchSize <= 0 || batchSize > 1000 {
-		batchSize = cache.BatchProcessorFramesPerBatch
+	// Validate input parameters
+	if err := ValidateBufferSize(batchSize); err != nil {
+		logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger()
+		logger.Warn().Err(err).Int("batchSize", batchSize).Msg("invalid batch size, using default")
+		batchSize = GetConfig().BatchProcessorFramesPerBatch
 	}
 	if batchDuration <= 0 {
-		batchDuration = cache.BatchProcessingDelay
+		logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger()
+		logger.Warn().Dur("batchDuration", batchDuration).Msg("invalid batch duration, using default")
+		batchDuration = GetConfig().BatchProcessingDelay
 	}

 	ctx, cancel := context.WithCancel(context.Background())
-	// Pre-allocate logger to avoid repeated allocations
 	logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger()
-	// Pre-calculate frame size to avoid repeated GetConfig() calls
-	frameSize := cache.GetMinReadEncodeBuffer()
-	if frameSize == 0 {
-		frameSize = 1500 // Safe fallback
-	}

 	processor := &BatchAudioProcessor{
 		ctx:    ctx,
 		cancel: cancel,
@@ -111,17 +82,9 @@ func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAu
 		batchSize:     batchSize,
 		batchDuration: batchDuration,
 		readQueue:     make(chan batchReadRequest, batchSize*2),
-		writeQueue:    make(chan batchWriteRequest, batchSize*2),
 		readBufPool: &sync.Pool{
 			New: func() interface{} {
-				// Use pre-calculated frame size to avoid GetConfig() calls
-				return make([]byte, 0, frameSize)
-			},
-		},
-		writeBufPool: &sync.Pool{
-			New: func() interface{} {
-				// Use pre-calculated frame size to avoid GetConfig() calls
-				return make([]byte, 0, frameSize)
+				return make([]byte, GetConfig().AudioFramePoolSize) // Max audio frame size
 			},
 		},
 	}
@@ -142,7 +105,6 @@ func (bap *BatchAudioProcessor) Start() error {
 	// Start batch processing goroutines
 	go bap.batchReadProcessor()
-	go bap.batchWriteProcessor()

 	bap.logger.Info().Int("batch_size", bap.batchSize).
 		Dur("batch_duration", bap.batchDuration).
@@ -167,17 +129,13 @@ func (bap *BatchAudioProcessor) Stop() {
 // BatchReadEncode performs batched audio read and encode operations
 func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) {
-	// Get cached config to avoid GetConfig() calls in hot path
-	cache := GetCachedConfig()
-	cache.Update()
-
 	// Validate buffer before processing
 	if err := ValidateBufferSize(len(buffer)); err != nil {
 		bap.logger.Debug().Err(err).Msg("invalid buffer for batch processing")
 		return 0, err
 	}

-	if !bap.IsRunning() {
+	if atomic.LoadInt32(&bap.running) == 0 {
 		// Fallback to single operation if batch processor is not running
 		atomic.AddInt64(&bap.stats.SingleReads, 1)
 		atomic.AddInt64(&bap.stats.SingleFrames, 1)
@@ -191,22 +149,21 @@ func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) {
 		timestamp:  time.Now(),
 	}

-	// Try to queue the request with non-blocking send
 	select {
 	case bap.readQueue <- request:
 		// Successfully queued
-	default:
-		// Queue is full, fallback to single operation
+	case <-time.After(GetConfig().ShortTimeout):
+		// Queue is full or blocked, fallback to single operation
 		atomic.AddInt64(&bap.stats.SingleReads, 1)
 		atomic.AddInt64(&bap.stats.SingleFrames, 1)
 		return CGOAudioReadEncode(buffer)
 	}

-	// Wait for result with timeout
+	// Wait for result
 	select {
 	case result := <-resultChan:
 		return result.length, result.err
-	case <-time.After(cache.BatchProcessingTimeout):
+	case <-time.After(GetConfig().MediumTimeout):
 		// Timeout, fallback to single operation
 		atomic.AddInt64(&bap.stats.SingleReads, 1)
 		atomic.AddInt64(&bap.stats.SingleFrames, 1)
@@ -214,109 +171,6 @@ func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) {
 	}
 }
// BatchDecodeWrite performs batched audio decode and write operations
// This is the legacy version that uses a single buffer
func (bap *BatchAudioProcessor) BatchDecodeWrite(buffer []byte) (int, error) {
// Get cached config to avoid GetConfig() calls in hot path
cache := GetCachedConfig()
cache.Update()
// Validate buffer before processing
if err := ValidateBufferSize(len(buffer)); err != nil {
bap.logger.Debug().Err(err).Msg("invalid buffer for batch processing")
return 0, err
}
if !bap.IsRunning() {
// Fallback to single operation if batch processor is not running
atomic.AddInt64(&bap.stats.SingleWrites, 1)
atomic.AddInt64(&bap.stats.WriteFrames, 1)
return CGOAudioDecodeWriteLegacy(buffer)
}
resultChan := make(chan batchWriteResult, 1)
request := batchWriteRequest{
buffer: buffer,
resultChan: resultChan,
timestamp: time.Now(),
}
// Try to queue the request with non-blocking send
select {
case bap.writeQueue <- request:
// Successfully queued
default:
// Queue is full, fall back to single operation
atomic.AddInt64(&bap.stats.SingleWrites, 1)
atomic.AddInt64(&bap.stats.WriteFrames, 1)
return CGOAudioDecodeWriteLegacy(buffer)
}
// Wait for result with timeout
select {
case result := <-resultChan:
return result.length, result.err
case <-time.After(cache.BatchProcessingTimeout):
atomic.AddInt64(&bap.stats.SingleWrites, 1)
atomic.AddInt64(&bap.stats.WriteFrames, 1)
return CGOAudioDecodeWriteLegacy(buffer)
}
}
// BatchDecodeWriteWithBuffers performs batched audio decode and write operations with separate opus and PCM buffers
func (bap *BatchAudioProcessor) BatchDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, error) {
// Get cached config to avoid GetConfig() calls in hot path
cache := GetCachedConfig()
cache.Update()
// Validate buffers before processing
if len(opusData) == 0 {
return 0, fmt.Errorf("empty opus data buffer")
}
if len(pcmBuffer) == 0 {
return 0, fmt.Errorf("empty PCM buffer")
}
if !bap.IsRunning() {
// Fallback to single operation if batch processor is not running
atomic.AddInt64(&bap.stats.SingleWrites, 1)
atomic.AddInt64(&bap.stats.WriteFrames, 1)
// Use the optimized function with separate buffers
return CGOAudioDecodeWrite(opusData, pcmBuffer)
}
resultChan := make(chan batchWriteResult, 1)
request := batchWriteRequest{
opusData: opusData,
pcmBuffer: pcmBuffer,
resultChan: resultChan,
timestamp: time.Now(),
}
// Try to queue the request with non-blocking send
select {
case bap.writeQueue <- request:
// Successfully queued
default:
// Queue is full, fall back to single operation
atomic.AddInt64(&bap.stats.SingleWrites, 1)
atomic.AddInt64(&bap.stats.WriteFrames, 1)
// Use the optimized function with separate buffers
return CGOAudioDecodeWrite(opusData, pcmBuffer)
}
// Wait for result with timeout
select {
case result := <-resultChan:
return result.length, result.err
case <-time.After(cache.BatchProcessingTimeout):
atomic.AddInt64(&bap.stats.SingleWrites, 1)
atomic.AddInt64(&bap.stats.WriteFrames, 1)
// Use the optimized function with separate buffers
return CGOAudioDecodeWrite(opusData, pcmBuffer)
}
}
 // batchReadProcessor processes batched read operations
 func (bap *BatchAudioProcessor) batchReadProcessor() {
 	defer bap.logger.Debug().Msg("batch read processor stopped")
@@ -353,150 +207,43 @@ func (bap *BatchAudioProcessor) batchReadProcessor() {
 	}
 }
// batchWriteProcessor processes batched write operations
func (bap *BatchAudioProcessor) batchWriteProcessor() {
defer bap.logger.Debug().Msg("batch write processor stopped")
ticker := time.NewTicker(bap.batchDuration)
defer ticker.Stop()
var batch []batchWriteRequest
batch = make([]batchWriteRequest, 0, bap.batchSize)
for atomic.LoadInt32(&bap.running) == 1 {
select {
case <-bap.ctx.Done():
return
case req := <-bap.writeQueue:
batch = append(batch, req)
if len(batch) >= bap.batchSize {
bap.processBatchWrite(batch)
batch = batch[:0] // Clear slice but keep capacity
}
case <-ticker.C:
if len(batch) > 0 {
bap.processBatchWrite(batch)
batch = batch[:0] // Clear slice but keep capacity
}
}
}
// Process any remaining requests
if len(batch) > 0 {
bap.processBatchWrite(batch)
}
}
 // processBatchRead processes a batch of read requests efficiently
 func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) {
-	batchSize := len(batch)
-	if batchSize == 0 {
+	if len(batch) == 0 {
 		return
 	}

-	// Get cached config once - avoid repeated calls
-	cache := GetCachedConfig()
-	minBatchSize := cache.MinBatchSizeForThreadPinning
-	// Only pin to OS thread for large batches to reduce thread contention
-	var start time.Time
-	threadWasPinned := false
-	if batchSize >= minBatchSize && atomic.CompareAndSwapInt32(&bap.threadPinned, 0, 1) {
-		start = time.Now()
-		threadWasPinned = true
+	// Pin to OS thread for the entire batch to minimize thread switching overhead
+	start := time.Now()
+	if atomic.CompareAndSwapInt32(&bap.threadPinned, 0, 1) {
 		runtime.LockOSThread()
-		// Skip priority setting for better performance - audio threads already have good priority
+		// Set high priority for batch audio processing
+		if err := SetAudioThreadPriority(); err != nil {
+			bap.logger.Warn().Err(err).Msg("failed to set batch audio processing priority")
+		}
+		defer func() {
+			if err := ResetThreadPriority(); err != nil {
+				bap.logger.Warn().Err(err).Msg("failed to reset thread priority")
+			}
+			runtime.UnlockOSThread()
+			atomic.StoreInt32(&bap.threadPinned, 0)
+			bap.stats.OSThreadPinTime += time.Since(start)
+		}()
 	}

-	// Update stats efficiently
+	batchSize := len(batch)
 	atomic.AddInt64(&bap.stats.BatchedReads, 1)
 	atomic.AddInt64(&bap.stats.BatchedFrames, int64(batchSize))
 	if batchSize > 1 {
 		atomic.AddInt64(&bap.stats.CGOCallsReduced, int64(batchSize-1))
 	}

-	// Process each request in the batch with minimal overhead
-	for i := range batch {
-		req := &batch[i]
-		length, err := CGOAudioReadEncode(req.buffer)
-		// Send result back (non-blocking) - reuse result struct
-		select {
-		case req.resultChan <- batchReadResult{length: length, err: err}:
-		default:
-			// Requestor timed out, drop result
-		}
-	}
-
-	// Release thread lock if we pinned it
-	if threadWasPinned {
-		runtime.UnlockOSThread()
-		atomic.StoreInt32(&bap.threadPinned, 0)
-		bap.stats.OSThreadPinTime += time.Since(start)
-	}
-	bap.stats.LastBatchTime = time.Now()
-}
// processBatchWrite processes a batch of write requests efficiently
func (bap *BatchAudioProcessor) processBatchWrite(batch []batchWriteRequest) {
if len(batch) == 0 {
return
}
// Get cached config to avoid GetConfig() calls in hot path
cache := GetCachedConfig()
// Only pin to OS thread for large batches to reduce thread contention
start := time.Now()
shouldPinThread := len(batch) >= cache.MinBatchSizeForThreadPinning
// Track if we pinned the thread in this call
threadWasPinned := false
if shouldPinThread && atomic.CompareAndSwapInt32(&bap.writePinned, 0, 1) {
threadWasPinned = true
runtime.LockOSThread()
// Set high priority for batch audio processing - skip logging in hotpath
_ = SetAudioThreadPriority()
}
batchSize := len(batch)
atomic.AddInt64(&bap.stats.BatchedWrites, 1)
atomic.AddInt64(&bap.stats.WriteFrames, int64(batchSize))
if batchSize > 1 {
atomic.AddInt64(&bap.stats.CGOCallsReduced, int64(batchSize-1))
}
// Add deferred function to release thread lock if we pinned it
if threadWasPinned {
defer func() {
// Skip logging in hotpath for performance
_ = ResetThreadPriority()
runtime.UnlockOSThread()
atomic.StoreInt32(&bap.writePinned, 0)
bap.stats.WriteThreadTime += time.Since(start)
}()
}
 	// Process each request in the batch
 	for _, req := range batch {
-		var length int
-		var err error
-		// Handle both legacy and new decode-write operations
-		if req.opusData != nil && req.pcmBuffer != nil {
-			// New style with separate opus data and PCM buffer
-			length, err = CGOAudioDecodeWrite(req.opusData, req.pcmBuffer)
-		} else {
-			// Legacy style with single buffer
-			length, err = CGOAudioDecodeWriteLegacy(req.buffer)
-		}
-		result := batchWriteResult{
+		length, err := CGOAudioReadEncode(req.buffer)
+		result := batchReadResult{
 			length: length,
 			err:    err,
 		}
@@ -509,7 +256,7 @@ func (bap *BatchAudioProcessor) processBatchWrite(batch []batchWriteRequest) {
 		}
 	}

-	bap.stats.LastWriteTime = time.Now()
+	bap.stats.LastBatchTime = time.Now()
 }
 // GetStats returns current batch processor statistics
@@ -517,16 +264,11 @@ func (bap *BatchAudioProcessor) GetStats() BatchAudioStats {
 	return BatchAudioStats{
 		BatchedReads:    atomic.LoadInt64(&bap.stats.BatchedReads),
 		SingleReads:     atomic.LoadInt64(&bap.stats.SingleReads),
-		BatchedWrites:   atomic.LoadInt64(&bap.stats.BatchedWrites),
-		SingleWrites:    atomic.LoadInt64(&bap.stats.SingleWrites),
 		BatchedFrames:   atomic.LoadInt64(&bap.stats.BatchedFrames),
 		SingleFrames:    atomic.LoadInt64(&bap.stats.SingleFrames),
-		WriteFrames:     atomic.LoadInt64(&bap.stats.WriteFrames),
 		CGOCallsReduced: atomic.LoadInt64(&bap.stats.CGOCallsReduced),
 		OSThreadPinTime: bap.stats.OSThreadPinTime,
-		WriteThreadTime: bap.stats.WriteThreadTime,
 		LastBatchTime:   bap.stats.LastBatchTime,
-		LastWriteTime:   bap.stats.LastWriteTime,
 	}
 }
@@ -550,11 +292,8 @@ func GetBatchAudioProcessor() *BatchAudioProcessor {
 	// Initialize on first use
 	if atomic.CompareAndSwapInt32(&batchProcessorInitialized, 0, 1) {
-		// Get cached config to avoid GetConfig() calls
-		cache := GetCachedConfig()
-		cache.Update()
-		processor := NewBatchAudioProcessor(cache.BatchProcessorFramesPerBatch, cache.BatchProcessorTimeout)
+		config := GetConfig()
+		processor := NewBatchAudioProcessor(config.BatchProcessorFramesPerBatch, config.BatchProcessorTimeout)
 		atomic.StorePointer(&globalBatchProcessor, unsafe.Pointer(processor))
 		return processor
 	}
@@ -588,32 +327,8 @@ func DisableBatchAudioProcessing() {
 // BatchCGOAudioReadEncode is a batched version of CGOAudioReadEncode
 func BatchCGOAudioReadEncode(buffer []byte) (int, error) {
 	processor := GetBatchAudioProcessor()
-	if processor == nil || !processor.IsRunning() {
-		// Fall back to non-batched version if processor is not running
-		return CGOAudioReadEncode(buffer)
+	if processor != nil && processor.IsRunning() {
+		return processor.BatchReadEncode(buffer)
 	}
-	return processor.BatchReadEncode(buffer)
+	return CGOAudioReadEncode(buffer)
 }
// BatchCGOAudioDecodeWrite is a batched version of CGOAudioDecodeWrite
func BatchCGOAudioDecodeWrite(buffer []byte) (int, error) {
processor := GetBatchAudioProcessor()
if processor == nil || !processor.IsRunning() {
// Fall back to non-batched version if processor is not running
return CGOAudioDecodeWriteLegacy(buffer)
}
return processor.BatchDecodeWrite(buffer)
}
// BatchCGOAudioDecodeWriteWithBuffers is a batched version of CGOAudioDecodeWrite that uses separate opus and PCM buffers
func BatchCGOAudioDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, error) {
processor := GetBatchAudioProcessor()
if processor == nil || !processor.IsRunning() {
// Fall back to non-batched version if processor is not running
return CGOAudioDecodeWrite(opusData, pcmBuffer)
}
return processor.BatchDecodeWriteWithBuffers(opusData, pcmBuffer)
}
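
A side note on the pattern above: the rewritten BatchReadEncode hands a request to the batch worker and falls back to the direct single-frame call whenever the queue or the worker is too slow. A minimal, self-contained sketch of that queue-with-timeout-fallback shape; the request type, the timeouts, and the direct function here are illustrative stand-ins, not code from this repository:

    package main

    import (
    	"fmt"
    	"time"
    )

    type request struct {
    	buffer []byte
    	result chan int
    }

    // enqueueOrFallback tries to queue work for a batch worker within a bounded
    // time, then waits (also bounded) for the result; otherwise it calls direct.
    func enqueueOrFallback(queue chan request, buf []byte, direct func([]byte) int) int {
    	req := request{buffer: buf, result: make(chan int, 1)}

    	select {
    	case queue <- req:
    		// Successfully queued; fall through to wait for the result.
    	case <-time.After(5 * time.Millisecond):
    		return direct(buf) // queue full or blocked
    	}

    	select {
    	case n := <-req.result:
    		return n
    	case <-time.After(50 * time.Millisecond):
    		return direct(buf) // worker too slow
    	}
    }

    func main() {
    	queue := make(chan request, 4)
    	// Toy worker standing in for the batch processor goroutine.
    	go func() {
    		for req := range queue {
    			req.result <- len(req.buffer)
    		}
    	}()
    	n := enqueueOrFallback(queue, make([]byte, 960), func(b []byte) int { return len(b) })
    	fmt.Println("frames processed:", n)
    }

The buffered result channel keeps the worker from blocking if the caller has already taken the fallback path, mirroring the non-blocking result send in the diff above.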


@@ -1,118 +1,24 @@
//go:build cgo
// +build cgo
 package audio

 import (
 	"runtime"
-	"sort"
 	"sync"
 	"sync/atomic"
 	"time"
 	"unsafe"
 )
// AudioLatencyInfo holds simplified latency information for cleanup decisions
type AudioLatencyInfo struct {
LatencyMs float64
Timestamp time.Time
}
// Global latency tracking
var (
currentAudioLatency = AudioLatencyInfo{}
currentAudioLatencyLock sync.RWMutex
audioMonitoringInitialized int32 // Atomic flag to track initialization
)
// InitializeAudioMonitoring starts the background goroutines for latency tracking and cache cleanup
// This is safe to call multiple times as it will only initialize once
func InitializeAudioMonitoring() {
// Use atomic CAS to ensure we only initialize once
if atomic.CompareAndSwapInt32(&audioMonitoringInitialized, 0, 1) {
// Start the latency recorder
startLatencyRecorder()
// Start the cleanup goroutine
startCleanupGoroutine()
}
}
// latencyChannel is used for non-blocking latency recording
var latencyChannel = make(chan float64, 10)
// startLatencyRecorder starts the latency recorder goroutine
// This should be called during package initialization
func startLatencyRecorder() {
go latencyRecorderLoop()
}
// latencyRecorderLoop processes latency recordings in the background
func latencyRecorderLoop() {
for latencyMs := range latencyChannel {
currentAudioLatencyLock.Lock()
currentAudioLatency = AudioLatencyInfo{
LatencyMs: latencyMs,
Timestamp: time.Now(),
}
currentAudioLatencyLock.Unlock()
}
}
// RecordAudioLatency records the current audio processing latency
// This is called from the audio input manager when latency is measured
// It is non-blocking to ensure zero overhead in the critical audio path
func RecordAudioLatency(latencyMs float64) {
// Non-blocking send - if channel is full, we drop the update
select {
case latencyChannel <- latencyMs:
// Successfully sent
default:
// Channel full, drop this update to avoid blocking the audio path
}
}
// GetAudioLatencyMetrics returns the current audio latency information
// Returns nil if no latency data is available or if it's too old
func GetAudioLatencyMetrics() *AudioLatencyInfo {
currentAudioLatencyLock.RLock()
defer currentAudioLatencyLock.RUnlock()
// Check if we have valid latency data
if currentAudioLatency.Timestamp.IsZero() {
return nil
}
// Check if the data is too old (more than 5 seconds)
if time.Since(currentAudioLatency.Timestamp) > 5*time.Second {
return nil
}
return &AudioLatencyInfo{
LatencyMs: currentAudioLatency.LatencyMs,
Timestamp: currentAudioLatency.Timestamp,
}
}
 // Lock-free buffer cache for per-goroutine optimization
 type lockFreeBufferCache struct {
 	buffers [4]*[]byte // Small fixed-size array for lock-free access
 }

-// TTL tracking for goroutine cache entries
-type cacheEntry struct {
-	cache      *lockFreeBufferCache
-	lastAccess int64 // Unix timestamp of last access
-	gid        int64 // Goroutine ID for better tracking
-}
-
 // Per-goroutine buffer cache using goroutine-local storage
 var goroutineBufferCache = make(map[int64]*lockFreeBufferCache)
 var goroutineCacheMutex sync.RWMutex
 var lastCleanupTime int64 // Unix timestamp of last cleanup

-const maxCacheSize = 500         // Maximum number of goroutine caches (reduced from 1000)
-const cleanupInterval int64 = 30 // Cleanup interval in seconds (30 seconds, reduced from 60)
-const bufferTTL int64 = 60       // Time-to-live for cached buffers in seconds (1 minute, reduced from 2)
+const maxCacheSize = 1000   // Maximum number of goroutine caches
+const cleanupInterval = 300 // Cleanup interval in seconds (5 minutes)

 // getGoroutineID extracts goroutine ID from runtime stack for cache key
 func getGoroutineID() int64 {
@@ -133,67 +39,13 @@ func getGoroutineID() int64 {
 	return 0
 }
-// Map of goroutine ID to cache entry with TTL tracking
-var goroutineCacheWithTTL = make(map[int64]*cacheEntry)
+// cleanupGoroutineCache removes stale entries from the goroutine cache
+func cleanupGoroutineCache() {
// cleanupChannel is used for asynchronous cleanup requests
var cleanupChannel = make(chan struct{}, 1)
// startCleanupGoroutine starts the cleanup goroutine
// This should be called during package initialization
func startCleanupGoroutine() {
go cleanupLoop()
}
// cleanupLoop processes cleanup requests in the background
func cleanupLoop() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-cleanupChannel:
// Received explicit cleanup request
performCleanup(true)
case <-ticker.C:
// Regular cleanup check
performCleanup(false)
}
}
}
// requestCleanup signals the cleanup goroutine to perform a cleanup
// This is non-blocking and can be called from the critical path
func requestCleanup() {
select {
case cleanupChannel <- struct{}{}:
// Successfully requested cleanup
default:
// Channel full, cleanup already pending
}
}
// performCleanup does the actual cache cleanup work
// This runs in a dedicated goroutine, not in the critical path
func performCleanup(forced bool) {
 	now := time.Now().Unix()
 	lastCleanup := atomic.LoadInt64(&lastCleanupTime)

-	// Check if we're in a high-latency situation
-	isHighLatency := false
-	latencyMetrics := GetAudioLatencyMetrics()
-	if latencyMetrics != nil && latencyMetrics.LatencyMs > 10.0 {
-		// Under high latency, be more aggressive with cleanup
-		isHighLatency = true
-	}
-
-	// Only cleanup if enough time has passed (less time if high latency) or if forced
-	interval := cleanupInterval
-	if isHighLatency {
-		interval = cleanupInterval / 2 // More frequent cleanup under high latency
-	}
-	if !forced && now-lastCleanup < interval {
+	// Only cleanup if enough time has passed
+	if now-lastCleanup < cleanupInterval {
 		return
 	}
@@ -202,93 +54,23 @@ func performCleanup(forced bool) {
 		return // Another goroutine is already cleaning up
 	}
// Perform the actual cleanup
doCleanupGoroutineCache()
}
// cleanupGoroutineCache triggers an asynchronous cleanup of the goroutine cache
// This is safe to call from the critical path as it's non-blocking
func cleanupGoroutineCache() {
// Request asynchronous cleanup
requestCleanup()
}
// The actual cleanup implementation that runs in the background goroutine
func doCleanupGoroutineCache() {
// Get current time for TTL calculations
now := time.Now().Unix()
// Check if we're in a high-latency situation
isHighLatency := false
latencyMetrics := GetAudioLatencyMetrics()
if latencyMetrics != nil && latencyMetrics.LatencyMs > 10.0 {
// Under high latency, be more aggressive with cleanup
isHighLatency = true
}
 	goroutineCacheMutex.Lock()
 	defer goroutineCacheMutex.Unlock()

-	// Convert old cache format to new TTL-based format if needed
-	if len(goroutineCacheWithTTL) == 0 && len(goroutineBufferCache) > 0 {
-		for gid, cache := range goroutineBufferCache {
-			goroutineCacheWithTTL[gid] = &cacheEntry{
-				cache:      cache,
-				lastAccess: now,
+	// If cache is too large, remove oldest entries (simple FIFO)
+	if len(goroutineBufferCache) > maxCacheSize {
+		// Remove half of the entries to avoid frequent cleanups
+		toRemove := len(goroutineBufferCache) - maxCacheSize/2
+		count := 0
+		for gid := range goroutineBufferCache {
+			delete(goroutineBufferCache, gid)
+			count++
+			if count >= toRemove {
+				break
 			}
 		}
-		// Clear old cache to free memory
-		goroutineBufferCache = make(map[int64]*lockFreeBufferCache)
+		// Log cleanup for debugging (removed logging dependency)
+		_ = count // Avoid unused variable warning
}
// Remove stale entries based on TTL (more aggressive under high latency)
expiredCount := 0
ttl := bufferTTL
if isHighLatency {
// Under high latency, use a much shorter TTL
ttl = bufferTTL / 4
}
for gid, entry := range goroutineCacheWithTTL {
// Both now and entry.lastAccess are int64, so this comparison is safe
if now-entry.lastAccess > ttl {
delete(goroutineCacheWithTTL, gid)
expiredCount++
}
}
// If cache is still too large after TTL cleanup, remove oldest entries
// Under high latency, use a more aggressive target size
targetSize := maxCacheSize
targetReduction := maxCacheSize / 2
if isHighLatency {
// Under high latency, target a much smaller cache size
targetSize = maxCacheSize / 4
targetReduction = maxCacheSize / 8
}
if len(goroutineCacheWithTTL) > targetSize {
// Find oldest entries
type ageEntry struct {
gid int64
lastAccess int64
}
oldestEntries := make([]ageEntry, 0, len(goroutineCacheWithTTL))
for gid, entry := range goroutineCacheWithTTL {
oldestEntries = append(oldestEntries, ageEntry{gid, entry.lastAccess})
}
// Sort by lastAccess (oldest first)
sort.Slice(oldestEntries, func(i, j int) bool {
return oldestEntries[i].lastAccess < oldestEntries[j].lastAccess
})
// Remove oldest entries to get down to target reduction size
toRemove := len(goroutineCacheWithTTL) - targetReduction
for i := 0; i < toRemove && i < len(oldestEntries); i++ {
delete(goroutineCacheWithTTL, oldestEntries[i].gid)
}
 	}
 }
@@ -351,29 +133,39 @@ func NewAudioBufferPool(bufferSize int) *AudioBufferPool {
} }
func (p *AudioBufferPool) Get() []byte { func (p *AudioBufferPool) Get() []byte {
// Skip cleanup trigger in hotpath - cleanup runs in background // Trigger periodic cleanup of goroutine cache
// cleanupGoroutineCache() - moved to background goroutine cleanupGoroutineCache()
start := time.Now()
wasHit := false
defer func() {
latency := time.Since(start)
// Record metrics for frame pool (assuming this is the main usage)
if p.bufferSize >= GetConfig().AudioFramePoolSize {
GetGranularMetricsCollector().RecordFramePoolGet(latency, wasHit)
} else {
GetGranularMetricsCollector().RecordControlPoolGet(latency, wasHit)
}
}()
// Fast path: Try lock-free per-goroutine cache first // Fast path: Try lock-free per-goroutine cache first
gid := getGoroutineID() gid := getGoroutineID()
goroutineCacheMutex.RLock() goroutineCacheMutex.RLock()
cacheEntry, exists := goroutineCacheWithTTL[gid] cache, exists := goroutineBufferCache[gid]
goroutineCacheMutex.RUnlock() goroutineCacheMutex.RUnlock()
if exists && cacheEntry != nil && cacheEntry.cache != nil { if exists && cache != nil {
// Try to get buffer from lock-free cache // Try to get buffer from lock-free cache
cache := cacheEntry.cache
for i := 0; i < len(cache.buffers); i++ { for i := 0; i < len(cache.buffers); i++ {
bufPtr := (*unsafe.Pointer)(unsafe.Pointer(&cache.buffers[i])) bufPtr := (*unsafe.Pointer)(unsafe.Pointer(&cache.buffers[i]))
buf := (*[]byte)(atomic.LoadPointer(bufPtr)) buf := (*[]byte)(atomic.LoadPointer(bufPtr))
if buf != nil && atomic.CompareAndSwapPointer(bufPtr, unsafe.Pointer(buf), nil) { if buf != nil && atomic.CompareAndSwapPointer(bufPtr, unsafe.Pointer(buf), nil) {
atomic.AddInt64(&p.hitCount, 1) atomic.AddInt64(&p.hitCount, 1)
wasHit = true
*buf = (*buf)[:0] *buf = (*buf)[:0]
return *buf return *buf
} }
} }
// Update access time only after cache miss to reduce overhead
cacheEntry.lastAccess = time.Now().Unix()
} }
// Fallback: Try pre-allocated pool with mutex // Fallback: Try pre-allocated pool with mutex
@@ -383,7 +175,11 @@ func (p *AudioBufferPool) Get() []byte {
buf := p.preallocated[lastIdx] buf := p.preallocated[lastIdx]
p.preallocated = p.preallocated[:lastIdx] p.preallocated = p.preallocated[:lastIdx]
p.mutex.Unlock() p.mutex.Unlock()
// Update hit counter
atomic.AddInt64(&p.hitCount, 1) atomic.AddInt64(&p.hitCount, 1)
wasHit = true
// Ensure buffer is properly reset
*buf = (*buf)[:0] *buf = (*buf)[:0]
return *buf return *buf
} }
@@ -392,14 +188,18 @@ func (p *AudioBufferPool) Get() []byte {
// Try sync.Pool next // Try sync.Pool next
if poolBuf := p.pool.Get(); poolBuf != nil { if poolBuf := p.pool.Get(); poolBuf != nil {
buf := poolBuf.(*[]byte) buf := poolBuf.(*[]byte)
// Update hit counter
atomic.AddInt64(&p.hitCount, 1) atomic.AddInt64(&p.hitCount, 1)
atomic.AddInt64(&p.currentSize, -1) // Ensure buffer is properly reset and check capacity
// Fast capacity check - most buffers should be correct size
if cap(*buf) >= p.bufferSize { if cap(*buf) >= p.bufferSize {
wasHit = true
*buf = (*buf)[:0] *buf = (*buf)[:0]
return *buf return *buf
} else {
// Buffer too small, allocate new one
atomic.AddInt64(&p.missCount, 1)
return make([]byte, 0, p.bufferSize)
} }
// Buffer too small, fall through to allocation
} }
// Pool miss - allocate new buffer with exact capacity // Pool miss - allocate new buffer with exact capacity
@@ -408,7 +208,18 @@ func (p *AudioBufferPool) Get() []byte {
} }
func (p *AudioBufferPool) Put(buf []byte) { func (p *AudioBufferPool) Put(buf []byte) {
// Fast validation - reject buffers that are too small or too large start := time.Now()
defer func() {
latency := time.Since(start)
// Record metrics for frame pool (assuming this is the main usage)
if p.bufferSize >= GetConfig().AudioFramePoolSize {
GetGranularMetricsCollector().RecordFramePoolPut(latency, cap(buf))
} else {
GetGranularMetricsCollector().RecordControlPoolPut(latency, cap(buf))
}
}()
// Validate buffer capacity - reject buffers that are too small or too large
bufCap := cap(buf) bufCap := cap(buf)
if bufCap < p.bufferSize || bufCap > p.bufferSize*2 { if bufCap < p.bufferSize || bufCap > p.bufferSize*2 {
return // Buffer size mismatch, don't pool it to prevent memory bloat return // Buffer size mismatch, don't pool it to prevent memory bloat
@@ -420,23 +231,14 @@ func (p *AudioBufferPool) Put(buf []byte) {
// Fast path: Try to put in lock-free per-goroutine cache // Fast path: Try to put in lock-free per-goroutine cache
gid := getGoroutineID() gid := getGoroutineID()
goroutineCacheMutex.RLock() goroutineCacheMutex.RLock()
entryWithTTL, exists := goroutineCacheWithTTL[gid] cache, exists := goroutineBufferCache[gid]
goroutineCacheMutex.RUnlock() goroutineCacheMutex.RUnlock()
var cache *lockFreeBufferCache if !exists {
if exists && entryWithTTL != nil {
cache = entryWithTTL.cache
// Update access time only when we successfully use the cache
} else {
// Create new cache for this goroutine // Create new cache for this goroutine
cache = &lockFreeBufferCache{} cache = &lockFreeBufferCache{}
now := time.Now().Unix()
goroutineCacheMutex.Lock() goroutineCacheMutex.Lock()
goroutineCacheWithTTL[gid] = &cacheEntry{ goroutineBufferCache[gid] = cache
cache: cache,
lastAccess: now,
gid: gid,
}
goroutineCacheMutex.Unlock() goroutineCacheMutex.Unlock()
} }
@@ -445,10 +247,6 @@ func (p *AudioBufferPool) Put(buf []byte) {
for i := 0; i < len(cache.buffers); i++ { for i := 0; i < len(cache.buffers); i++ {
bufPtr := (*unsafe.Pointer)(unsafe.Pointer(&cache.buffers[i])) bufPtr := (*unsafe.Pointer)(unsafe.Pointer(&cache.buffers[i]))
if atomic.CompareAndSwapPointer(bufPtr, nil, unsafe.Pointer(&buf)) { if atomic.CompareAndSwapPointer(bufPtr, nil, unsafe.Pointer(&buf)) {
// Update access time only on successful cache
if exists && entryWithTTL != nil {
entryWithTTL.lastAccess = time.Now().Unix()
}
return // Successfully cached return // Successfully cached
} }
} }
@@ -464,12 +262,15 @@ func (p *AudioBufferPool) Put(buf []byte) {
p.mutex.Unlock() p.mutex.Unlock()
// Check sync.Pool size limit to prevent excessive memory usage // Check sync.Pool size limit to prevent excessive memory usage
if atomic.LoadInt64(&p.currentSize) >= int64(p.maxPoolSize) { currentSize := atomic.LoadInt64(&p.currentSize)
if currentSize >= int64(p.maxPoolSize) {
return // Pool is full, let GC handle this buffer return // Pool is full, let GC handle this buffer
} }
// Return to sync.Pool and update counter atomically // Return to sync.Pool
p.pool.Put(&resetBuf) p.pool.Put(&resetBuf)
// Update pool size counter atomically
atomic.AddInt64(&p.currentSize, 1) atomic.AddInt64(&p.currentSize, 1)
} }
@@ -532,8 +333,6 @@ type AudioBufferPoolDetailedStats struct {
 	HitCount          int64
 	MissCount         int64
 	HitRate           float64 // Percentage
-	TotalBytes        int64   // Total memory usage in bytes
-	AverageBufferSize float64 // Average size of buffers in the pool
 }
// GetAudioBufferPoolStats returns statistics about the audio buffer pools // GetAudioBufferPoolStats returns statistics about the audio buffer pools
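
The Get/Put fast path in this file claims and releases per-goroutine buffer slots with atomic pointer compare-and-swap. Below is a reduced, standalone sketch of just that slot mechanism, with the pool bookkeeping, metrics, and goroutine-ID lookup left out; the type and method names are illustrative, not from the diff:

    package main

    import (
    	"fmt"
    	"sync/atomic"
    	"unsafe"
    )

    // bufferSlots is a tiny fixed-size, lock-free cache: each slot holds either
    // nil or a pointer to a reusable []byte.
    type bufferSlots struct {
    	slots [4]*[]byte
    }

    // tryGet claims any non-nil slot by CAS-ing it back to nil.
    func (c *bufferSlots) tryGet() ([]byte, bool) {
    	for i := range c.slots {
    		p := (*unsafe.Pointer)(unsafe.Pointer(&c.slots[i]))
    		buf := (*[]byte)(atomic.LoadPointer(p))
    		if buf != nil && atomic.CompareAndSwapPointer(p, unsafe.Pointer(buf), nil) {
    			return (*buf)[:0], true // reset length, keep capacity
    		}
    	}
    	return nil, false
    }

    // tryPut parks a buffer in the first empty slot; returns false if all slots are full.
    func (c *bufferSlots) tryPut(buf []byte) bool {
    	for i := range c.slots {
    		p := (*unsafe.Pointer)(unsafe.Pointer(&c.slots[i]))
    		if atomic.CompareAndSwapPointer(p, nil, unsafe.Pointer(&buf)) {
    			return true
    		}
    	}
    	return false
    }

    func main() {
    	var c bufferSlots
    	c.tryPut(make([]byte, 0, 1500))
    	if b, ok := c.tryGet(); ok {
    		fmt.Println("got buffer with capacity", cap(b))
    	}
    }

Because each slot transition is a single CAS, concurrent getters and putters never block each other; a failed CAS simply moves on to the next slot or falls back to the shared pool, as in the code above.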


@@ -5,9 +5,6 @@ package audio
 import (
 	"errors"
 	"fmt"
-	"sync"
-	"sync/atomic"
-	"time"
 	"unsafe"
 )
@@ -676,26 +673,23 @@ func newAudioDecodeWriteError(cErrorCode int) error {
 }

 func cgoAudioInit() error {
-	// Get cached config and ensure it's updated
-	cache := GetCachedConfig()
-	cache.Update()
-
-	// Update C constants from cached config (atomic access, no locks)
+	// Update C constants from Go configuration
+	config := GetConfig()
 	C.update_audio_constants(
-		C.int(cache.opusBitrate.Load()),
-		C.int(cache.opusComplexity.Load()),
-		C.int(cache.opusVBR.Load()),
-		C.int(cache.opusVBRConstraint.Load()),
-		C.int(cache.opusSignalType.Load()),
-		C.int(cache.opusBandwidth.Load()),
-		C.int(cache.opusDTX.Load()),
-		C.int(cache.sampleRate.Load()),
-		C.int(cache.channels.Load()),
-		C.int(cache.frameSize.Load()),
-		C.int(cache.maxPacketSize.Load()),
-		C.int(GetConfig().CGOUsleepMicroseconds),
-		C.int(GetConfig().CGOMaxAttempts),
-		C.int(GetConfig().CGOMaxBackoffMicroseconds),
+		C.int(config.CGOOpusBitrate),
+		C.int(config.CGOOpusComplexity),
+		C.int(config.CGOOpusVBR),
+		C.int(config.CGOOpusVBRConstraint),
+		C.int(config.CGOOpusSignalType),
+		C.int(config.CGOOpusBandwidth),
+		C.int(config.CGOOpusDTX),
+		C.int(config.CGOSampleRate),
+		C.int(config.CGOChannels),
+		C.int(config.CGOFrameSize),
+		C.int(config.CGOMaxPacketSize),
+		C.int(config.CGOUsleepMicroseconds),
+		C.int(config.CGOMaxAttempts),
+		C.int(config.CGOMaxBackoffMicroseconds),
 	)

 	result := C.jetkvm_audio_init()
@@ -709,223 +703,30 @@ func cgoAudioClose() {
 	C.jetkvm_audio_close()
 }
// AudioConfigCache provides a comprehensive caching system for audio configuration
// to minimize GetConfig() calls in the hot path
type AudioConfigCache struct {
// Atomic fields for lock-free access to frequently used values
minReadEncodeBuffer atomic.Int32
maxDecodeWriteBuffer atomic.Int32
maxPacketSize atomic.Int32
maxPCMBufferSize atomic.Int32
opusBitrate atomic.Int32
opusComplexity atomic.Int32
opusVBR atomic.Int32
opusVBRConstraint atomic.Int32
opusSignalType atomic.Int32
opusBandwidth atomic.Int32
opusDTX atomic.Int32
sampleRate atomic.Int32
channels atomic.Int32
frameSize atomic.Int32
// Additional cached values for validation functions
maxAudioFrameSize atomic.Int32
maxChannels atomic.Int32
minFrameDuration atomic.Int64 // Store as nanoseconds
maxFrameDuration atomic.Int64 // Store as nanoseconds
minOpusBitrate atomic.Int32
maxOpusBitrate atomic.Int32
// Batch processing related values
BatchProcessingTimeout time.Duration
BatchProcessorFramesPerBatch int
BatchProcessorTimeout time.Duration
BatchProcessingDelay time.Duration
MinBatchSizeForThreadPinning int
// Mutex for updating the cache
mutex sync.RWMutex
lastUpdate time.Time
cacheExpiry time.Duration
initialized atomic.Bool
// Pre-allocated errors to avoid allocations in hot path
bufferTooSmallReadEncode error
bufferTooLargeDecodeWrite error
}
// Global audio config cache instance
var globalAudioConfigCache = &AudioConfigCache{
cacheExpiry: 30 * time.Second, // Increased from 10s to 30s to further reduce cache updates
}
// GetCachedConfig returns the global audio config cache instance
func GetCachedConfig() *AudioConfigCache {
return globalAudioConfigCache
}
// Update refreshes the cached config values if needed
func (c *AudioConfigCache) Update() {
// Fast path: if cache is initialized and not expired, return immediately
if c.initialized.Load() {
c.mutex.RLock()
cacheExpired := time.Since(c.lastUpdate) > c.cacheExpiry
c.mutex.RUnlock()
if !cacheExpired {
return
}
}
// Slow path: update cache
c.mutex.Lock()
defer c.mutex.Unlock()
// Double-check after acquiring lock
if !c.initialized.Load() || time.Since(c.lastUpdate) > c.cacheExpiry {
config := GetConfig() // Call GetConfig() only once
// Update atomic values for lock-free access - CGO values
c.minReadEncodeBuffer.Store(int32(config.MinReadEncodeBuffer))
c.maxDecodeWriteBuffer.Store(int32(config.MaxDecodeWriteBuffer))
c.maxPacketSize.Store(int32(config.CGOMaxPacketSize))
c.maxPCMBufferSize.Store(int32(config.MaxPCMBufferSize))
c.opusBitrate.Store(int32(config.CGOOpusBitrate))
c.opusComplexity.Store(int32(config.CGOOpusComplexity))
c.opusVBR.Store(int32(config.CGOOpusVBR))
c.opusVBRConstraint.Store(int32(config.CGOOpusVBRConstraint))
c.opusSignalType.Store(int32(config.CGOOpusSignalType))
c.opusBandwidth.Store(int32(config.CGOOpusBandwidth))
c.opusDTX.Store(int32(config.CGOOpusDTX))
c.sampleRate.Store(int32(config.CGOSampleRate))
c.channels.Store(int32(config.CGOChannels))
c.frameSize.Store(int32(config.CGOFrameSize))
// Update additional validation values
c.maxAudioFrameSize.Store(int32(config.MaxAudioFrameSize))
c.maxChannels.Store(int32(config.MaxChannels))
c.minFrameDuration.Store(int64(config.MinFrameDuration))
c.maxFrameDuration.Store(int64(config.MaxFrameDuration))
c.minOpusBitrate.Store(int32(config.MinOpusBitrate))
c.maxOpusBitrate.Store(int32(config.MaxOpusBitrate))
// Update batch processing related values
c.BatchProcessingTimeout = 100 * time.Millisecond // Fixed timeout for batch processing
c.BatchProcessorFramesPerBatch = config.BatchProcessorFramesPerBatch
c.BatchProcessorTimeout = config.BatchProcessorTimeout
c.BatchProcessingDelay = config.BatchProcessingDelay
c.MinBatchSizeForThreadPinning = config.MinBatchSizeForThreadPinning
// Pre-allocate common errors
c.bufferTooSmallReadEncode = newBufferTooSmallError(0, config.MinReadEncodeBuffer)
c.bufferTooLargeDecodeWrite = newBufferTooLargeError(config.MaxDecodeWriteBuffer+1, config.MaxDecodeWriteBuffer)
c.lastUpdate = time.Now()
c.initialized.Store(true)
// Update the global validation cache as well
if cachedMaxFrameSize != 0 {
cachedMaxFrameSize = config.MaxAudioFrameSize
}
}
}
// GetMinReadEncodeBuffer returns the cached MinReadEncodeBuffer value
func (c *AudioConfigCache) GetMinReadEncodeBuffer() int {
return int(c.minReadEncodeBuffer.Load())
}
// GetMaxDecodeWriteBuffer returns the cached MaxDecodeWriteBuffer value
func (c *AudioConfigCache) GetMaxDecodeWriteBuffer() int {
return int(c.maxDecodeWriteBuffer.Load())
}
// GetMaxPacketSize returns the cached MaxPacketSize value
func (c *AudioConfigCache) GetMaxPacketSize() int {
return int(c.maxPacketSize.Load())
}
// GetMaxPCMBufferSize returns the cached MaxPCMBufferSize value
func (c *AudioConfigCache) GetMaxPCMBufferSize() int {
return int(c.maxPCMBufferSize.Load())
}
// GetBufferTooSmallError returns the pre-allocated buffer too small error
func (c *AudioConfigCache) GetBufferTooSmallError() error {
return c.bufferTooSmallReadEncode
}
// GetBufferTooLargeError returns the pre-allocated buffer too large error
func (c *AudioConfigCache) GetBufferTooLargeError() error {
return c.bufferTooLargeDecodeWrite
}
// Removed duplicate config caching system - using AudioConfigCache instead
 func cgoAudioReadEncode(buf []byte) (int, error) {
-	// Fast path: Use AudioConfigCache to avoid GetConfig() in hot path
-	cache := GetCachedConfig()
-	// Only update cache if expired - avoid unnecessary overhead
-	// Use proper locking to avoid race condition
-	if cache.initialized.Load() {
-		cache.mutex.RLock()
-		cacheExpired := time.Since(cache.lastUpdate) > cache.cacheExpiry
-		cache.mutex.RUnlock()
-		if cacheExpired {
-			cache.Update()
-		}
-	} else {
-		cache.Update()
-	}
-
-	// Fast validation with cached values - avoid lock with atomic access
-	minRequired := cache.GetMinReadEncodeBuffer()
-	// Buffer validation - use pre-allocated error for common case
+	minRequired := GetConfig().MinReadEncodeBuffer
 	if len(buf) < minRequired {
-		// Use pre-allocated error for common case, only create custom error for edge cases
-		if len(buf) > 0 {
-			return 0, newBufferTooSmallError(len(buf), minRequired)
-		}
-		return 0, cache.GetBufferTooSmallError()
+		return 0, newBufferTooSmallError(len(buf), minRequired)
 	}

 	// Skip initialization check for now to avoid CGO compilation issues
-	// TODO: Add proper initialization state checking
 	// Note: The C code already has comprehensive state tracking with capture_initialized,
 	// capture_initializing, playback_initialized, and playback_initializing flags.
-	// When CGO environment is properly configured, this should check C.capture_initialized.
-	// Direct CGO call with minimal overhead - unsafe.Pointer(&slice[0]) is safe for validated non-empty buffers
 	n := C.jetkvm_audio_read_encode(unsafe.Pointer(&buf[0]))
-	// Fast path for success case
-	if n > 0 {
-		return int(n), nil
-	}
-	// Handle error cases - use static error codes to reduce allocations
 	if n < 0 {
-		// Common error cases
-		switch n {
-		case -1:
-			return 0, errAudioInitFailed
-		case -2:
-			return 0, errAudioReadEncode
-		default:
-			return 0, newAudioReadEncodeError(int(n))
-		}
+		return 0, newAudioReadEncodeError(int(n))
 	}
-	// n == 0 case
-	return 0, nil // No data available
+	if n == 0 {
+		return 0, nil // No data available
+	}
+	return int(n), nil
 }
 // Audio playback functions
 func cgoAudioPlaybackInit() error {
-	// Get cached config and ensure it's updated
-	cache := GetCachedConfig()
-	cache.Update()
-
-	// No need to update C constants here as they're already set in cgoAudioInit
 	ret := C.jetkvm_audio_playback_init()
 	if ret != 0 {
 		return newAudioPlaybackInitError(int(ret))
@@ -937,58 +738,34 @@ func cgoAudioPlaybackClose() {
 	C.jetkvm_audio_playback_close()
 }
-func cgoAudioDecodeWrite(buf []byte) (n int, err error) {
-	// Fast validation with AudioConfigCache
-	cache := GetCachedConfig()
-	// Only update cache if expired - avoid unnecessary overhead
-	// Use proper locking to avoid race condition
-	if cache.initialized.Load() {
-		cache.mutex.RLock()
-		cacheExpired := time.Since(cache.lastUpdate) > cache.cacheExpiry
-		cache.mutex.RUnlock()
-		if cacheExpired {
-			cache.Update()
-		}
-	} else {
-		cache.Update()
-	}
-
-	// Optimized buffer validation
+func cgoAudioDecodeWrite(buf []byte) (int, error) {
 	if len(buf) == 0 {
 		return 0, errEmptyBuffer
 	}
+	if buf == nil {
+		return 0, errNilBuffer
+	}

-	// Use cached max buffer size with atomic access
-	maxAllowed := cache.GetMaxDecodeWriteBuffer()
+	maxAllowed := GetConfig().MaxDecodeWriteBuffer
 	if len(buf) > maxAllowed {
-		// Use pre-allocated error for common case
-		if len(buf) == maxAllowed+1 {
-			return 0, cache.GetBufferTooLargeError()
-		}
 		return 0, newBufferTooLargeError(len(buf), maxAllowed)
 	}

-	// Direct CGO call with minimal overhead - unsafe.Pointer(&slice[0]) is safe for validated non-empty buffers
-	n = int(C.jetkvm_audio_decode_write(unsafe.Pointer(&buf[0]), C.int(len(buf))))
-
-	// Fast path for success case
-	if n >= 0 {
-		return n, nil
-	}
-
-	// Handle error cases with static error codes
-	switch n {
-	case -1:
-		n = 0
-		err = errAudioInitFailed
-	case -2:
-		n = 0
-		err = errAudioDecodeWrite
-	default:
-		n = 0
-		err = newAudioDecodeWriteError(n)
-	}
-	return
+	bufPtr := unsafe.Pointer(&buf[0])
+	if bufPtr == nil {
+		return 0, errInvalidBufferPtr
+	}
+
+	defer func() {
+		if r := recover(); r != nil {
+			_ = r
+		}
+	}()
+
+	n := C.jetkvm_audio_decode_write(bufPtr, C.int(len(buf)))
+	if n < 0 {
+		return 0, newAudioDecodeWriteError(int(n))
+	}
+	return int(n), nil
 }
 // updateOpusEncoderParams dynamically updates OPUS encoder parameters
@@ -1008,406 +785,13 @@ func updateOpusEncoderParams(bitrate, complexity, vbr, vbrConstraint, signalType
 	return nil
 }

-// Buffer pool for reusing buffers in CGO functions
+// CGO function aliases
 var (
-	// Using SizedBufferPool for better memory management
-	// Track buffer pool usage for monitoring
-	cgoBufferPoolGets atomic.Int64
-	cgoBufferPoolPuts atomic.Int64
-	// Batch processing statistics - only enabled in debug builds
-	batchProcessingCount atomic.Int64
-	batchFrameCount      atomic.Int64
-	batchProcessingTime  atomic.Int64
-	// Flag to control time tracking overhead
-	enableBatchTimeTracking atomic.Bool
+	CGOAudioInit               = cgoAudioInit
+	CGOAudioClose              = cgoAudioClose
+	CGOAudioReadEncode         = cgoAudioReadEncode
+	CGOAudioPlaybackInit       = cgoAudioPlaybackInit
+	CGOAudioPlaybackClose      = cgoAudioPlaybackClose
+	CGOAudioDecodeWrite        = cgoAudioDecodeWrite
+	CGOUpdateOpusEncoderParams = updateOpusEncoderParams
 )
// GetBufferFromPool gets a buffer from the pool with at least the specified capacity
func GetBufferFromPool(minCapacity int) []byte {
cgoBufferPoolGets.Add(1)
// Use the SizedBufferPool for better memory management
return GetOptimalBuffer(minCapacity)
}
// ReturnBufferToPool returns a buffer to the pool
func ReturnBufferToPool(buf []byte) {
cgoBufferPoolPuts.Add(1)
// Use the SizedBufferPool for better memory management
ReturnOptimalBuffer(buf)
}
// Note: AudioFrameBatch is now defined in batch_audio.go
// This is kept here for reference but commented out to avoid conflicts
/*
// AudioFrameBatch represents a batch of audio frames for processing
type AudioFrameBatch struct {
// Buffer for batch processing
buffer []byte
// Number of frames in the batch
frameCount int
// Size of each frame
frameSize int
// Current position in the buffer
position int
}
// NewAudioFrameBatch creates a new audio frame batch with the specified capacity
func NewAudioFrameBatch(maxFrames int) *AudioFrameBatch {
// Get cached config
cache := GetCachedConfig()
cache.Update()
// Calculate frame size based on cached config
frameSize := cache.GetMinReadEncodeBuffer()
// Create batch with buffer sized for maxFrames
return &AudioFrameBatch{
buffer: GetBufferFromPool(maxFrames * frameSize),
frameCount: 0,
frameSize: frameSize,
position: 0,
}
}
// AddFrame adds a frame to the batch
// Returns true if the batch is full after adding this frame
func (b *AudioFrameBatch) AddFrame(frame []byte) bool {
// Calculate position in buffer for this frame
pos := b.position
// Copy frame data to batch buffer
copy(b.buffer[pos:pos+len(frame)], frame)
// Update position and frame count
b.position += len(frame)
b.frameCount++
// Check if batch is full (buffer capacity reached)
return b.position >= len(b.buffer)
}
// Reset resets the batch for reuse
func (b *AudioFrameBatch) Reset() {
b.frameCount = 0
b.position = 0
}
// Release returns the batch buffer to the pool
func (b *AudioFrameBatch) Release() {
ReturnBufferToPool(b.buffer)
b.buffer = nil
b.frameCount = 0
b.frameSize = 0
b.position = 0
}
*/
// ReadEncodeWithPooledBuffer reads audio data and encodes it using a buffer from the pool
// This reduces memory allocations by reusing buffers
func ReadEncodeWithPooledBuffer() ([]byte, int, error) {
// Get cached config
cache := GetCachedConfig()
// Only update cache if expired - avoid unnecessary overhead
// Use proper locking to avoid race condition
if cache.initialized.Load() {
cache.mutex.RLock()
cacheExpired := time.Since(cache.lastUpdate) > cache.cacheExpiry
cache.mutex.RUnlock()
if cacheExpired {
cache.Update()
}
} else {
cache.Update()
}
// Get a buffer from the pool with appropriate capacity
bufferSize := cache.GetMinReadEncodeBuffer()
if bufferSize == 0 {
bufferSize = 1500 // Fallback if cache not initialized
}
// Get buffer from pool
buf := GetBufferFromPool(bufferSize)
// Perform read/encode operation
n, err := cgoAudioReadEncode(buf)
if err != nil {
// Return buffer to pool on error
ReturnBufferToPool(buf)
return nil, 0, err
}
// Resize buffer to actual data size
result := buf[:n]
// Return the buffer with data
return result, n, nil
}
// DecodeWriteWithPooledBuffer decodes and writes audio data using a pooled buffer
// The caller is responsible for returning the input buffer to the pool if needed
func DecodeWriteWithPooledBuffer(data []byte) (int, error) {
// Validate input
if len(data) == 0 {
return 0, errEmptyBuffer
}
// Get cached config
cache := GetCachedConfig()
// Only update cache if expired - avoid unnecessary overhead
// Use proper locking to avoid race condition
if cache.initialized.Load() {
cache.mutex.RLock()
cacheExpired := time.Since(cache.lastUpdate) > cache.cacheExpiry
cache.mutex.RUnlock()
if cacheExpired {
cache.Update()
}
} else {
cache.Update()
}
// Ensure data doesn't exceed max packet size
maxPacketSize := cache.GetMaxPacketSize()
if len(data) > maxPacketSize {
return 0, newBufferTooLargeError(len(data), maxPacketSize)
}
// Get a PCM buffer from the pool for optimized decode-write
pcmBuffer := GetBufferFromPool(cache.GetMaxPCMBufferSize())
defer ReturnBufferToPool(pcmBuffer)
// Perform decode/write operation using optimized implementation
n, err := CGOAudioDecodeWrite(data, pcmBuffer)
// Return result
return n, err
}
// BatchReadEncode reads and encodes multiple audio frames in a single batch
// This reduces CGO call overhead by processing multiple frames at once
func BatchReadEncode(batchSize int) ([][]byte, error) {
// Get cached config
cache := GetCachedConfig()
// Only update cache if expired - avoid unnecessary overhead
// Use proper locking to avoid race condition
if cache.initialized.Load() {
cache.mutex.RLock()
cacheExpired := time.Since(cache.lastUpdate) > cache.cacheExpiry
cache.mutex.RUnlock()
if cacheExpired {
cache.Update()
}
} else {
cache.Update()
}
// Calculate total buffer size needed for batch
frameSize := cache.GetMinReadEncodeBuffer()
totalSize := frameSize * batchSize
// Get a single large buffer for all frames
batchBuffer := GetBufferFromPool(totalSize)
defer ReturnBufferToPool(batchBuffer)
// Pre-allocate frame result buffers from pool to avoid allocations in loop
frameBuffers := make([][]byte, 0, batchSize)
for i := 0; i < batchSize; i++ {
frameBuffers = append(frameBuffers, GetBufferFromPool(frameSize))
}
defer func() {
// Return all frame buffers to pool
for _, buf := range frameBuffers {
ReturnBufferToPool(buf)
}
}()
// Track batch processing statistics - only if enabled
var startTime time.Time
trackTime := enableBatchTimeTracking.Load()
if trackTime {
startTime = time.Now()
}
batchProcessingCount.Add(1)
// Process frames in batch
frames := make([][]byte, 0, batchSize)
for i := 0; i < batchSize; i++ {
// Calculate offset for this frame in the batch buffer
offset := i * frameSize
frameBuf := batchBuffer[offset : offset+frameSize]
// Process this frame
n, err := cgoAudioReadEncode(frameBuf)
if err != nil {
// Return partial batch on error
if i > 0 {
batchFrameCount.Add(int64(i))
if trackTime {
batchProcessingTime.Add(time.Since(startTime).Microseconds())
}
return frames, nil
}
return nil, err
}
// Reuse pre-allocated buffer instead of make([]byte, n)
frameCopy := frameBuffers[i][:n] // Slice to actual size
copy(frameCopy, frameBuf[:n])
frames = append(frames, frameCopy)
}
// Update statistics
batchFrameCount.Add(int64(len(frames)))
if trackTime {
batchProcessingTime.Add(time.Since(startTime).Microseconds())
}
return frames, nil
}
// BatchDecodeWrite decodes and writes multiple audio frames in a single batch
// This reduces CGO call overhead by processing multiple frames at once
func BatchDecodeWrite(frames [][]byte) error {
// Validate input
if len(frames) == 0 {
return nil
}
// Get cached config
cache := GetCachedConfig()
// Only update cache if expired - avoid unnecessary overhead
// Use proper locking to avoid race condition
if cache.initialized.Load() {
cache.mutex.RLock()
cacheExpired := time.Since(cache.lastUpdate) > cache.cacheExpiry
cache.mutex.RUnlock()
if cacheExpired {
cache.Update()
}
} else {
cache.Update()
}
// Track batch processing statistics - only if enabled
var startTime time.Time
trackTime := enableBatchTimeTracking.Load()
if trackTime {
startTime = time.Now()
}
batchProcessingCount.Add(1)
// Get a PCM buffer from the pool for optimized decode-write
pcmBuffer := GetBufferFromPool(cache.GetMaxPCMBufferSize())
defer ReturnBufferToPool(pcmBuffer)
// Process each frame
frameCount := 0
for _, frame := range frames {
// Skip empty frames
if len(frame) == 0 {
continue
}
// Process this frame using optimized implementation
_, err := CGOAudioDecodeWrite(frame, pcmBuffer)
if err != nil {
// Update statistics before returning error
batchFrameCount.Add(int64(frameCount))
if trackTime {
batchProcessingTime.Add(time.Since(startTime).Microseconds())
}
return err
}
frameCount++
}
// Update statistics
batchFrameCount.Add(int64(frameCount))
if trackTime {
batchProcessingTime.Add(time.Since(startTime).Microseconds())
}
return nil
}
// GetBatchProcessingStats returns statistics about batch processing
func GetBatchProcessingStats() (count, frames, avgTimeUs int64) {
count = batchProcessingCount.Load()
frames = batchFrameCount.Load()
totalTime := batchProcessingTime.Load()
// Calculate average time per batch
if count > 0 {
avgTimeUs = totalTime / count
}
return count, frames, avgTimeUs
}
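// Usage sketch (illustrative only): feeding a received batch to BatchDecodeWrite
// and inspecting the aggregate counters afterwards. The frames argument is assumed
// to come from the caller's receive path.
//
//	func playReceivedFrames(frames [][]byte) error {
//		if err := BatchDecodeWrite(frames); err != nil {
//			return err
//		}
//		batches, decodedFrames, avgTimeUs := GetBatchProcessingStats()
//		_ = batches       // number of batch operations so far
//		_ = decodedFrames // frames processed across all batches
//		_ = avgTimeUs     // average microseconds per batch (0 if time tracking is disabled)
//		return nil
//	}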
// cgoAudioDecodeWriteWithBuffers decodes Opus data and writes the result to the playback path
// The caller supplies separate buffers for Opus input and PCM output; the PCM buffer is validated here,
// while the underlying CGO call below is passed only the Opus data
func cgoAudioDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, error) {
// Validate input
if len(opusData) == 0 {
return 0, errEmptyBuffer
}
if len(pcmBuffer) == 0 {
return 0, errEmptyBuffer
}
// Get cached config
cache := GetCachedConfig()
// Only update cache if expired - avoid unnecessary overhead
// Use proper locking to avoid race condition
if cache.initialized.Load() {
cache.mutex.RLock()
cacheExpired := time.Since(cache.lastUpdate) > cache.cacheExpiry
cache.mutex.RUnlock()
if cacheExpired {
cache.Update()
}
} else {
cache.Update()
}
// Ensure data doesn't exceed max packet size
maxPacketSize := cache.GetMaxPacketSize()
if len(opusData) > maxPacketSize {
return 0, newBufferTooLargeError(len(opusData), maxPacketSize)
}
// Direct CGO call with minimal overhead - unsafe.Pointer(&slice[0]) is never nil for non-empty slices
n := int(C.jetkvm_audio_decode_write(unsafe.Pointer(&opusData[0]), C.int(len(opusData))))
// Fast path for success case
if n >= 0 {
return n, nil
}
// Handle error cases with static error codes to reduce allocations
switch n {
case -1:
return 0, errAudioInitFailed
case -2:
return 0, errAudioDecodeWrite
default:
return 0, newAudioDecodeWriteError(n)
}
}
// Optimized CGO function aliases - use direct function calls to reduce overhead
// These are now direct function aliases instead of variable assignments
func CGOAudioInit() error { return cgoAudioInit() }
func CGOAudioClose() { cgoAudioClose() }
func CGOAudioReadEncode(buf []byte) (int, error) { return cgoAudioReadEncode(buf) }
func CGOAudioPlaybackInit() error { return cgoAudioPlaybackInit() }
func CGOAudioPlaybackClose() { cgoAudioPlaybackClose() }
func CGOAudioDecodeWriteLegacy(buf []byte) (int, error) { return cgoAudioDecodeWrite(buf) }
func CGOAudioDecodeWrite(opusData []byte, pcmBuffer []byte) (int, error) {
return cgoAudioDecodeWriteWithBuffers(opusData, pcmBuffer)
}
func CGOUpdateOpusEncoderParams(bitrate, complexity, vbr, vbrConstraint, signalType, bandwidth, dtx int) error {
return updateOpusEncoderParams(bitrate, complexity, vbr, vbrConstraint, signalType, bandwidth, dtx)
}

View File

@@ -30,11 +30,6 @@ func cgoAudioDecodeWrite(buf []byte) (int, error) {
  return 0, errors.New("audio not available in lint mode")
  }
- // cgoAudioDecodeWriteWithBuffers is a stub implementation for the optimized decode-write function
- func cgoAudioDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, error) {
- return 0, errors.New("audio not available in lint mode")
- }
  // Uppercase aliases for external API compatibility
  var (
@@ -43,6 +38,5 @@ var (
  CGOAudioReadEncode = cgoAudioReadEncode
  CGOAudioPlaybackInit = cgoAudioPlaybackInit
  CGOAudioPlaybackClose = cgoAudioPlaybackClose
- CGOAudioDecodeWriteLegacy = cgoAudioDecodeWrite
- CGOAudioDecodeWrite = cgoAudioDecodeWriteWithBuffers
+ CGOAudioDecodeWrite = cgoAudioDecodeWrite
  )

View File

@@ -11,7 +11,6 @@ import (
  type AudioConfigConstants struct {
  // Audio Quality Presets
  MaxAudioFrameSize int // Maximum audio frame size in bytes (default: 4096)
- MaxPCMBufferSize int // Maximum PCM buffer size in bytes for separate buffer optimization
  // Opus Encoding Parameters
  OpusBitrate int // Target bitrate for Opus encoding in bps (default: 128000)
@@ -380,18 +379,6 @@ type AudioConfigConstants struct {
  // Default 4096 bytes handles maximum audio frame size with safety margin.
  MaxDecodeWriteBuffer int
- // MinBatchSizeForThreadPinning defines the minimum batch size required to pin a thread.
- // Used in: batch_audio.go for deciding when to pin a thread for batch processing.
- // Impact: Smaller values increase thread pinning frequency but may improve performance.
- // Default 5 frames provides a good balance between performance and thread contention.
- MinBatchSizeForThreadPinning int
- // GoroutineMonitorInterval defines the interval for monitoring goroutine counts.
- // Used in: goroutine_monitor.go for periodic goroutine count checks.
- // Impact: Shorter intervals provide more frequent monitoring but increase overhead.
- // Default 30 seconds provides reasonable monitoring frequency with minimal overhead.
- GoroutineMonitorInterval time.Duration
  // IPC Configuration - Inter-Process Communication settings for audio components
  // Used in: ipc.go for configuring audio process communication
  // Impact: Controls IPC reliability, performance, and protocol compliance
@@ -1544,40 +1531,6 @@ type AudioConfigConstants struct {
  LatencyBucket500ms time.Duration // 500ms latency bucket
  LatencyBucket1s time.Duration // 1s latency bucket
  LatencyBucket2s time.Duration // 2s latency bucket
- // Goroutine Pool Configuration
- // Used in: goroutine_pool.go for managing reusable goroutines
- // Impact: Reduces goroutine creation overhead and improves performance
- // MaxAudioProcessorWorkers defines maximum number of workers in the audio processor pool.
- // Used in: goroutine_pool.go for limiting concurrent audio processing goroutines
- // Impact: Controls resource usage while ensuring sufficient processing capacity.
- // Default 8 provides good parallelism without excessive resource consumption.
- MaxAudioProcessorWorkers int
- // MaxAudioReaderWorkers defines maximum number of workers in the audio reader pool.
- // Used in: goroutine_pool.go for limiting concurrent audio reading goroutines
- // Impact: Controls resource usage while ensuring sufficient reading capacity.
- // Default 4 provides good parallelism for I/O operations.
- MaxAudioReaderWorkers int
- // AudioProcessorQueueSize defines the task queue size for the audio processor pool.
- // Used in: goroutine_pool.go for buffering audio processing tasks
- // Impact: Larger queue allows more tasks to be buffered during load spikes.
- // Default 32 provides good buffering without excessive memory usage.
- AudioProcessorQueueSize int
- // AudioReaderQueueSize defines the task queue size for the audio reader pool.
- // Used in: goroutine_pool.go for buffering audio reading tasks
- // Impact: Larger queue allows more tasks to be buffered during load spikes.
- // Default 16 provides good buffering for I/O operations.
- AudioReaderQueueSize int
- // WorkerMaxIdleTime defines how long a worker goroutine can remain idle before termination.
- // Used in: goroutine_pool.go for efficient worker lifecycle management
- // Impact: Shorter times reduce resource usage, longer times improve responsiveness.
- // Default 30s balances resource usage with startup latency.
- WorkerMaxIdleTime time.Duration
  }
  // DefaultAudioConfig returns the default configuration constants
@@ -1587,7 +1540,6 @@ func DefaultAudioConfig() *AudioConfigConstants {
  return &AudioConfigConstants{
  // Audio Quality Presets
  MaxAudioFrameSize: 4096,
- MaxPCMBufferSize: 8192, // Default PCM buffer size (2x MaxAudioFrameSize for safety)
  // Opus Encoding Parameters
  OpusBitrate: 128000,
@@ -2436,13 +2388,6 @@ func DefaultAudioConfig() *AudioConfigConstants {
  EventTimeFormatString: "2006-01-02T15:04:05.000Z", // "2006-01-02T15:04:05.000Z" time format
  EventSubscriptionDelayMS: 100, // 100ms subscription delay
- // Goroutine Pool Configuration
- MaxAudioProcessorWorkers: 16, // 16 workers for audio processing tasks
- MaxAudioReaderWorkers: 8, // 8 workers for audio reading tasks
- AudioProcessorQueueSize: 64, // 64 tasks queue size for processor pool
- AudioReaderQueueSize: 32, // 32 tasks queue size for reader pool
- WorkerMaxIdleTime: 60 * time.Second, // 60s maximum idle time before worker termination
  // Input Processing Constants
  InputProcessingTimeoutMS: 10, // 10ms processing timeout threshold
@@ -2519,12 +2464,6 @@ func DefaultAudioConfig() *AudioConfigConstants {
  LatencyBucket500ms: 500 * time.Millisecond, // 500ms latency bucket
  LatencyBucket1s: 1 * time.Second, // 1s latency bucket
  LatencyBucket2s: 2 * time.Second, // 2s latency bucket
- // Batch Audio Processing Configuration
- MinBatchSizeForThreadPinning: 5, // Minimum batch size to pin thread
- // Goroutine Monitoring Configuration
- GoroutineMonitorInterval: 30 * time.Second, // 30s monitoring interval
  }
  }

View File

@ -1,145 +0,0 @@
package audio
import (
"runtime"
"sync/atomic"
"time"
"github.com/jetkvm/kvm/internal/logging"
)
// GoroutineMonitor tracks goroutine count and provides cleanup mechanisms
type GoroutineMonitor struct {
baselineCount int
peakCount int
lastCount int
monitorInterval time.Duration
lastCheck time.Time
enabled int32
}
// Global goroutine monitor instance
var globalGoroutineMonitor *GoroutineMonitor
// NewGoroutineMonitor creates a new goroutine monitor
func NewGoroutineMonitor(monitorInterval time.Duration) *GoroutineMonitor {
if monitorInterval <= 0 {
monitorInterval = 30 * time.Second
}
// Get current goroutine count as baseline
baselineCount := runtime.NumGoroutine()
return &GoroutineMonitor{
baselineCount: baselineCount,
peakCount: baselineCount,
lastCount: baselineCount,
monitorInterval: monitorInterval,
lastCheck: time.Now(),
}
}
// Start begins goroutine monitoring
func (gm *GoroutineMonitor) Start() {
if !atomic.CompareAndSwapInt32(&gm.enabled, 0, 1) {
return // Already running
}
go gm.monitorLoop()
}
// Stop stops goroutine monitoring
func (gm *GoroutineMonitor) Stop() {
atomic.StoreInt32(&gm.enabled, 0)
}
// monitorLoop periodically checks goroutine count
func (gm *GoroutineMonitor) monitorLoop() {
logger := logging.GetDefaultLogger().With().Str("component", "goroutine-monitor").Logger()
logger.Info().Int("baseline", gm.baselineCount).Msg("goroutine monitor started")
for atomic.LoadInt32(&gm.enabled) == 1 {
time.Sleep(gm.monitorInterval)
gm.checkGoroutineCount()
}
logger.Info().Msg("goroutine monitor stopped")
}
// checkGoroutineCount checks current goroutine count and logs if it exceeds thresholds
func (gm *GoroutineMonitor) checkGoroutineCount() {
currentCount := runtime.NumGoroutine()
gm.lastCount = currentCount
// Update peak count if needed
if currentCount > gm.peakCount {
gm.peakCount = currentCount
}
// Calculate growth since baseline
growth := currentCount - gm.baselineCount
growthPercent := float64(growth) / float64(gm.baselineCount) * 100
// Log warning if growth exceeds thresholds
logger := logging.GetDefaultLogger().With().Str("component", "goroutine-monitor").Logger()
// Different log levels based on growth severity
if growthPercent > 30 {
// Severe growth - trigger cleanup
logger.Warn().Int("current", currentCount).Int("baseline", gm.baselineCount).
Int("growth", growth).Float64("growth_percent", growthPercent).
Msg("excessive goroutine growth detected - triggering cleanup")
// Force garbage collection to clean up unused resources
runtime.GC()
// Force cleanup of goroutine buffer cache
cleanupGoroutineCache()
} else if growthPercent > 20 {
// Moderate growth - just log warning
logger.Warn().Int("current", currentCount).Int("baseline", gm.baselineCount).
Int("growth", growth).Float64("growth_percent", growthPercent).
Msg("significant goroutine growth detected")
} else if growthPercent > 10 {
// Minor growth - log info
logger.Info().Int("current", currentCount).Int("baseline", gm.baselineCount).
Int("growth", growth).Float64("growth_percent", growthPercent).
Msg("goroutine growth detected")
}
// Update last check time
gm.lastCheck = time.Now()
}
// GetGoroutineStats returns current goroutine statistics
func (gm *GoroutineMonitor) GetGoroutineStats() map[string]interface{} {
return map[string]interface{}{
"current_count": gm.lastCount,
"baseline_count": gm.baselineCount,
"peak_count": gm.peakCount,
"growth": gm.lastCount - gm.baselineCount,
"growth_percent": float64(gm.lastCount-gm.baselineCount) / float64(gm.baselineCount) * 100,
"last_check": gm.lastCheck,
}
}
// GetGoroutineMonitor returns the global goroutine monitor instance
func GetGoroutineMonitor() *GoroutineMonitor {
if globalGoroutineMonitor == nil {
globalGoroutineMonitor = NewGoroutineMonitor(GetConfig().GoroutineMonitorInterval)
}
return globalGoroutineMonitor
}
// StartGoroutineMonitoring starts the global goroutine monitor
func StartGoroutineMonitoring() {
monitor := GetGoroutineMonitor()
monitor.Start()
}
// StopGoroutineMonitoring stops the global goroutine monitor
func StopGoroutineMonitoring() {
if globalGoroutineMonitor != nil {
globalGoroutineMonitor.Stop()
}
}
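For reference, a minimal lifecycle sketch for the monitor defined above (these are the same entry points that the main.go hunk at the end of this diff removes):
func exampleGoroutineMonitoring() {
	StartGoroutineMonitoring()
	defer StopGoroutineMonitoring()
	// The stats map exposes the counters tracked by checkGoroutineCount.
	stats := GetGoroutineMonitor().GetGoroutineStats()
	_ = stats["current_count"]
	_ = stats["growth_percent"]
}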

View File

@ -1,283 +0,0 @@
package audio
import (
"sync"
"sync/atomic"
"time"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
// Task represents a function to be executed by a worker in the pool
type Task func()
// GoroutinePool manages a pool of reusable goroutines to reduce the overhead
// of goroutine creation and destruction
type GoroutinePool struct {
// Atomic fields must be first for proper alignment on 32-bit systems
taskCount int64 // Number of tasks processed
workerCount int64 // Current number of workers
maxIdleTime time.Duration
maxWorkers int
taskQueue chan Task
workerSem chan struct{} // Semaphore to limit concurrent workers
shutdown chan struct{}
shutdownOnce sync.Once
wg sync.WaitGroup
logger *zerolog.Logger
name string
}
// NewGoroutinePool creates a new goroutine pool with the specified parameters
func NewGoroutinePool(name string, maxWorkers int, queueSize int, maxIdleTime time.Duration) *GoroutinePool {
logger := logging.GetDefaultLogger().With().Str("component", "goroutine-pool").Str("pool", name).Logger()
pool := &GoroutinePool{
maxWorkers: maxWorkers,
maxIdleTime: maxIdleTime,
taskQueue: make(chan Task, queueSize),
workerSem: make(chan struct{}, maxWorkers),
shutdown: make(chan struct{}),
logger: &logger,
name: name,
}
// Start a supervisor goroutine to monitor pool health
go pool.supervisor()
return pool
}
// Submit adds a task to the pool for execution
// Returns true if the task was accepted, false if the queue is full
func (p *GoroutinePool) Submit(task Task) bool {
select {
case <-p.shutdown:
return false // Pool is shutting down
case p.taskQueue <- task:
// Task accepted, ensure we have a worker to process it
p.ensureWorkerAvailable()
return true
default:
// Queue is full
return false
}
}
// ensureWorkerAvailable makes sure at least one worker is available to process tasks
func (p *GoroutinePool) ensureWorkerAvailable() {
// Check if we already have enough workers
currentWorkers := atomic.LoadInt64(&p.workerCount)
// Only start new workers if:
// 1. We have no workers at all, or
// 2. The queue is growing and we're below max workers
queueLen := len(p.taskQueue)
if currentWorkers == 0 || (queueLen > int(currentWorkers) && currentWorkers < int64(p.maxWorkers)) {
// Try to acquire a semaphore slot without blocking
select {
case p.workerSem <- struct{}{}:
// We got a slot, start a new worker
p.startWorker()
default:
// All worker slots are taken, which means we have enough workers
}
}
}
// startWorker launches a new worker goroutine
func (p *GoroutinePool) startWorker() {
p.wg.Add(1)
atomic.AddInt64(&p.workerCount, 1)
go func() {
defer func() {
atomic.AddInt64(&p.workerCount, -1)
<-p.workerSem // Release the semaphore slot
p.wg.Done()
// Recover from panics in worker tasks
if r := recover(); r != nil {
p.logger.Error().Interface("panic", r).Msg("Worker recovered from panic")
}
}()
idleTimer := time.NewTimer(p.maxIdleTime)
defer idleTimer.Stop()
for {
select {
case <-p.shutdown:
return
case task, ok := <-p.taskQueue:
if !ok {
return // Channel closed
}
// Reset idle timer
if !idleTimer.Stop() {
<-idleTimer.C
}
idleTimer.Reset(p.maxIdleTime)
// Execute the task with panic recovery
func() {
defer func() {
if r := recover(); r != nil {
p.logger.Error().Interface("panic", r).Msg("Task execution panic recovered")
}
}()
task()
}()
atomic.AddInt64(&p.taskCount, 1)
case <-idleTimer.C:
// Worker has been idle for too long
// Keep at least 2 workers alive to handle incoming tasks without creating new goroutines
if atomic.LoadInt64(&p.workerCount) > 2 {
return
}
// For persistent workers (the minimum 2), use a longer idle timeout
// This prevents excessive worker creation/destruction cycles
idleTimer.Reset(p.maxIdleTime * 3) // Triple the idle time for persistent workers
}
}
}()
}
// supervisor monitors the pool and logs statistics periodically
func (p *GoroutinePool) supervisor() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-p.shutdown:
return
case <-ticker.C:
workers := atomic.LoadInt64(&p.workerCount)
tasks := atomic.LoadInt64(&p.taskCount)
queueLen := len(p.taskQueue)
p.logger.Info().
Int64("workers", workers).
Int64("tasks_processed", tasks).
Int("queue_length", queueLen).
Msg("Pool statistics")
}
}
}
// Shutdown gracefully shuts down the pool
// If wait is true, it will wait for all tasks to complete
// If wait is false, it will terminate immediately, potentially leaving tasks unprocessed
func (p *GoroutinePool) Shutdown(wait bool) {
p.shutdownOnce.Do(func() {
close(p.shutdown)
if wait {
// Wait for all tasks to be processed
if len(p.taskQueue) > 0 {
p.logger.Info().Int("remaining_tasks", len(p.taskQueue)).Msg("Waiting for tasks to complete")
}
// Close the task queue to signal no more tasks
close(p.taskQueue)
// Wait for all workers to finish
p.wg.Wait()
}
})
}
// GetStats returns statistics about the pool
func (p *GoroutinePool) GetStats() map[string]interface{} {
return map[string]interface{}{
"name": p.name,
"worker_count": atomic.LoadInt64(&p.workerCount),
"max_workers": p.maxWorkers,
"tasks_processed": atomic.LoadInt64(&p.taskCount),
"queue_length": len(p.taskQueue),
"queue_capacity": cap(p.taskQueue),
}
}
// Global pools for different audio processing tasks
var (
globalAudioProcessorPool atomic.Pointer[GoroutinePool]
globalAudioReaderPool atomic.Pointer[GoroutinePool]
globalAudioProcessorInitOnce sync.Once
globalAudioReaderInitOnce sync.Once
)
// GetAudioProcessorPool returns the global audio processor pool
func GetAudioProcessorPool() *GoroutinePool {
pool := globalAudioProcessorPool.Load()
if pool != nil {
return pool
}
globalAudioProcessorInitOnce.Do(func() {
config := GetConfig()
newPool := NewGoroutinePool(
"audio-processor",
config.MaxAudioProcessorWorkers,
config.AudioProcessorQueueSize,
config.WorkerMaxIdleTime,
)
globalAudioProcessorPool.Store(newPool)
pool = newPool
})
return globalAudioProcessorPool.Load()
}
// GetAudioReaderPool returns the global audio reader pool
func GetAudioReaderPool() *GoroutinePool {
pool := globalAudioReaderPool.Load()
if pool != nil {
return pool
}
globalAudioReaderInitOnce.Do(func() {
config := GetConfig()
newPool := NewGoroutinePool(
"audio-reader",
config.MaxAudioReaderWorkers,
config.AudioReaderQueueSize,
config.WorkerMaxIdleTime,
)
globalAudioReaderPool.Store(newPool)
pool = newPool
})
return globalAudioReaderPool.Load()
}
// SubmitAudioProcessorTask submits a task to the audio processor pool
func SubmitAudioProcessorTask(task Task) bool {
return GetAudioProcessorPool().Submit(task)
}
// SubmitAudioReaderTask submits a task to the audio reader pool
func SubmitAudioReaderTask(task Task) bool {
return GetAudioReaderPool().Submit(task)
}
// ShutdownAudioPools shuts down all audio goroutine pools
func ShutdownAudioPools(wait bool) {
logger := logging.GetDefaultLogger().With().Str("component", "audio-pools").Logger()
processorPool := globalAudioProcessorPool.Load()
if processorPool != nil {
logger.Info().Msg("Shutting down audio processor pool")
processorPool.Shutdown(wait)
}
readerPool := globalAudioReaderPool.Load()
if readerPool != nil {
logger.Info().Msg("Shutting down audio reader pool")
readerPool.Shutdown(wait)
}
}
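A minimal usage sketch of the pool API defined above; the task body is illustrative, and the fallback mirrors the pattern used at the call sites being removed elsewhere in this diff.
func exampleSubmitAudioWork() {
	task := func() {
		// audio-related work goes here
	}
	// Submit returns false when the queue is full or the pool is shutting down.
	if !SubmitAudioProcessorTask(task) {
		go task() // fall back to a plain goroutine
	}
}
func exampleShutdownPools() {
	// Graceful shutdown: wait for queued tasks to drain before returning.
	ShutdownAudioPools(true)
}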

View File

@@ -96,13 +96,9 @@ func (aim *AudioInputManager) WriteOpusFrame(frame []byte) error {
  // Log high latency warnings
  if processingTime > time.Duration(GetConfig().InputProcessingTimeoutMS)*time.Millisecond {
- latencyMs := float64(processingTime.Milliseconds())
  aim.logger.Warn().
- Float64("latency_ms", latencyMs).
+ Dur("latency_ms", processingTime).
  Msg("High audio processing latency detected")
- // Record latency for goroutine cleanup optimization
- RecordAudioLatency(latencyMs)
  }
  if err != nil {
@@ -136,13 +132,9 @@ func (aim *AudioInputManager) WriteOpusFrameZeroCopy(frame *ZeroCopyAudioFrame)
  // Log high latency warnings
  if processingTime > time.Duration(GetConfig().InputProcessingTimeoutMS)*time.Millisecond {
- latencyMs := float64(processingTime.Milliseconds())
  aim.logger.Warn().
- Float64("latency_ms", latencyMs).
+ Dur("latency_ms", processingTime).
  Msg("High audio processing latency detected")
- // Record latency for goroutine cleanup optimization
- RecordAudioLatency(latencyMs)
  }
  if err != nil {

View File

@@ -289,14 +289,8 @@ func (ais *AudioInputServer) Start() error {
  ais.startProcessorGoroutine()
  ais.startMonitorGoroutine()
- // Submit the connection acceptor to the audio reader pool
- if !SubmitAudioReaderTask(ais.acceptConnections) {
- // If the pool is full or shutting down, fall back to direct goroutine creation
- logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
- logger.Warn().Msg("Audio reader pool full or shutting down, falling back to direct goroutine creation")
- go ais.acceptConnections()
- }
+ // Accept connections in a goroutine
+ go ais.acceptConnections()
  return nil
  }
@@ -366,14 +360,8 @@ func (ais *AudioInputServer) acceptConnections() {
  ais.conn = conn
  ais.mtx.Unlock()
- // Handle this connection using the goroutine pool
- if !SubmitAudioReaderTask(func() { ais.handleConnection(conn) }) {
- // If the pool is full or shutting down, fall back to direct goroutine creation
- logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
- logger.Warn().Msg("Audio reader pool full or shutting down, falling back to direct goroutine creation")
- go ais.handleConnection(conn)
- }
+ // Handle this connection
+ go ais.handleConnection(conn)
  }
  }
@@ -501,27 +489,19 @@ func (ais *AudioInputServer) processMessage(msg *InputIPCMessage) error {
  // processOpusFrame processes an Opus audio frame
  func (ais *AudioInputServer) processOpusFrame(data []byte) error {
- // Fast path: skip empty frame check - caller should handle this
- dataLen := len(data)
- if dataLen == 0 {
- return nil
+ if len(data) == 0 {
+ return nil // Empty frame, ignore
  }
- // Inline validation for critical audio path - avoid function call overhead
- if dataLen > cachedMaxFrameSize {
- return ErrFrameDataTooLarge
+ // Use ultra-fast validation for critical audio path
+ if err := ValidateAudioFrame(data); err != nil {
+ logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()
+ logger.Error().Err(err).Msg("Frame validation failed")
+ return fmt.Errorf("input frame validation failed: %w", err)
  }
- // Get cached config once - avoid repeated calls and locking
- cache := GetCachedConfig()
- // Skip cache expiry check in hotpath - background updates handle this
- // Get a PCM buffer from the pool for optimized decode-write
- pcmBuffer := GetBufferFromPool(cache.GetMaxPCMBufferSize())
- defer ReturnBufferToPool(pcmBuffer)
- // Direct CGO call - avoid wrapper function overhead
- _, err := CGOAudioDecodeWrite(data, pcmBuffer)
+ // Process the Opus frame using CGO
+ _, err := CGOAudioDecodeWrite(data)
  return err
  }
@@ -710,20 +690,25 @@ func (aic *AudioInputClient) SendFrame(frame []byte) error {
  return fmt.Errorf("not connected to audio input server")
  }
- frameLen := len(frame)
- if frameLen == 0 {
+ if len(frame) == 0 {
  return nil // Empty frame, ignore
  }
- // Inline frame validation to reduce function call overhead
- if frameLen > maxFrameSize {
- return ErrFrameDataTooLarge
+ // Validate frame data before sending
+ if err := ValidateAudioFrame(frame); err != nil {
+ logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
+ logger.Error().Err(err).Msg("Frame validation failed")
+ return fmt.Errorf("input frame validation failed: %w", err)
+ }
+ if len(frame) > maxFrameSize {
+ return fmt.Errorf("frame too large: got %d bytes, maximum allowed %d bytes", len(frame), maxFrameSize)
  }
  msg := &InputIPCMessage{
  Magic: inputMagicNumber,
  Type: InputMessageTypeOpusFrame,
- Length: uint32(frameLen),
+ Length: uint32(len(frame)),
  Timestamp: time.Now().UnixNano(),
  Data: frame,
  }
@@ -740,25 +725,26 @@ func (aic *AudioInputClient) SendFrameZeroCopy(frame *ZeroCopyAudioFrame) error
  return fmt.Errorf("not connected to audio input server")
  }
- if frame == nil {
- return nil // Nil frame, ignore
- }
- frameLen := frame.Length()
- if frameLen == 0 {
+ if frame == nil || frame.Length() == 0 {
  return nil // Empty frame, ignore
  }
- // Inline frame validation to reduce function call overhead
- if frameLen > maxFrameSize {
- return ErrFrameDataTooLarge
+ // Validate zero-copy frame before sending
+ if err := ValidateZeroCopyFrame(frame); err != nil {
+ logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
+ logger.Error().Err(err).Msg("Zero-copy frame validation failed")
+ return fmt.Errorf("input frame validation failed: %w", err)
+ }
+ if frame.Length() > maxFrameSize {
+ return fmt.Errorf("frame too large: got %d bytes, maximum allowed %d bytes", frame.Length(), maxFrameSize)
  }
  // Use zero-copy data directly
  msg := &InputIPCMessage{
  Magic: inputMagicNumber,
  Type: InputMessageTypeOpusFrame,
- Length: uint32(frameLen),
+ Length: uint32(frame.Length()),
  Timestamp: time.Now().UnixNano(),
  Data: frame.Data(), // Zero-copy data access
  }
@@ -892,12 +878,10 @@ func (aic *AudioInputClient) ResetStats() {
  ResetFrameStats(&aic.totalFrames, &aic.droppedFrames)
  }
- // startReaderGoroutine starts the message reader using the goroutine pool
+ // startReaderGoroutine starts the message reader goroutine
  func (ais *AudioInputServer) startReaderGoroutine() {
  ais.wg.Add(1)
- // Create a reader task that will run in the goroutine pool
- readerTask := func() {
+ go func() {
  defer ais.wg.Done()
  // Enhanced error tracking and recovery
@@ -929,7 +913,10 @@ func (ais *AudioInputServer) startReaderGoroutine() {
  consecutiveErrors++
  lastErrorTime = now
- // Skip logging in hotpath for performance - only log critical errors
+ // Log error with context
+ logger.Warn().Err(err).
+ Int("consecutive_errors", consecutiveErrors).
+ Msg("Failed to read message from input connection")
  // Progressive backoff based on error count
  if consecutiveErrors > 1 {
@@ -979,33 +966,26 @@
  }
  }
  }
- }
- // Submit the reader task to the audio reader pool
- logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
- if !SubmitAudioReaderTask(readerTask) {
- // If the pool is full or shutting down, fall back to direct goroutine creation
- logger.Warn().Msg("Audio reader pool full or shutting down, falling back to direct goroutine creation")
- go readerTask()
- }
+ }()
  }
- // startProcessorGoroutine starts the message processor using the goroutine pool
+ // startProcessorGoroutine starts the message processor goroutine
  func (ais *AudioInputServer) startProcessorGoroutine() {
  ais.wg.Add(1)
- // Create a processor task that will run in the goroutine pool
- processorTask := func() {
+ go func() {
  runtime.LockOSThread()
  defer runtime.UnlockOSThread()
- // Set high priority for audio processing - skip logging in hotpath
- _ = SetAudioThreadPriority()
- defer func() { _ = ResetThreadPriority() }()
- // Create logger for this goroutine
- logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()
+ // Set high priority for audio processing
+ logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
+ if err := SetAudioThreadPriority(); err != nil {
+ logger.Warn().Err(err).Msg("Failed to set audio processing priority")
+ }
+ defer func() {
+ if err := ResetThreadPriority(); err != nil {
+ logger.Warn().Err(err).Msg("Failed to reset thread priority")
+ }
+ }()
  // Enhanced error tracking for processing
  var processingErrors int
@@ -1034,10 +1014,17 @@ func (ais *AudioInputServer) startProcessorGoroutine() {
  processingErrors++
  lastProcessingError = now
- // Skip logging in hotpath for performance
+ logger.Warn().Err(err).
+ Int("processing_errors", processingErrors).
+ Dur("processing_time", processingTime).
+ Msg("Failed to process input message")
  // If too many processing errors, drop frames more aggressively
  if processingErrors >= maxProcessingErrors {
+ logger.Error().
+ Int("processing_errors", processingErrors).
+ Msg("Too many processing errors, entering aggressive drop mode")
  // Clear processing queue to recover
  for len(ais.processChan) > 0 {
  select {
@@ -1055,23 +1042,14 @@
  // Reset error counter on successful processing
  if processingErrors > 0 {
  processingErrors = 0
- // Skip logging in hotpath for performance
+ logger.Info().Msg("Input processing recovered")
  }
  // Update processing time metrics
  atomic.StoreInt64(&ais.processingTime, processingTime.Nanoseconds())
  }
  }
- }
- // Submit the processor task to the audio processor pool
- logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
- if !SubmitAudioProcessorTask(processorTask) {
- // If the pool is full or shutting down, fall back to direct goroutine creation
- logger.Warn().Msg("Audio processor pool full or shutting down, falling back to direct goroutine creation")
- go processorTask()
- }
+ }()
  }
  // processMessageWithRecovery processes a message with enhanced error recovery
@@ -1108,12 +1086,10 @@ func (ais *AudioInputServer) processMessageWithRecovery(msg *InputIPCMessage, lo
  }
  }
- // startMonitorGoroutine starts the performance monitoring using the goroutine pool
+ // startMonitorGoroutine starts the performance monitoring goroutine
  func (ais *AudioInputServer) startMonitorGoroutine() {
  ais.wg.Add(1)
- // Create a monitor task that will run in the goroutine pool
- monitorTask := func() {
+ go func() {
  runtime.LockOSThread()
  defer runtime.UnlockOSThread()
@@ -1190,16 +1166,7 @@
  }
  }
  }
- }
- // Submit the monitor task to the audio processor pool
- logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
- if !SubmitAudioProcessorTask(monitorTask) {
- // If the pool is full or shutting down, fall back to direct goroutine creation
- logger.Warn().Msg("Audio processor pool full or shutting down, falling back to direct goroutine creation")
- go monitorTask()
- }
+ }()
  }
  // GetServerStats returns server performance statistics

View File

@@ -159,14 +159,8 @@ func (s *AudioOutputServer) Start() error {
  // Start message processor goroutine
  s.startProcessorGoroutine()
- // Submit the connection acceptor to the audio reader pool
- if !SubmitAudioReaderTask(s.acceptConnections) {
- // If the pool is full or shutting down, fall back to direct goroutine creation
- logger := logging.GetDefaultLogger().With().Str("component", AudioOutputServerComponent).Logger()
- logger.Warn().Msg("Audio reader pool full or shutting down, falling back to direct goroutine creation")
- go s.acceptConnections()
- }
+ // Accept connections in a goroutine
+ go s.acceptConnections()
  return nil
  }
@@ -205,12 +199,10 @@ func (s *AudioOutputServer) acceptConnections() {
  }
  }
- // startProcessorGoroutine starts the message processor using the goroutine pool
+ // startProcessorGoroutine starts the message processor
  func (s *AudioOutputServer) startProcessorGoroutine() {
  s.wg.Add(1)
- // Create a processor task that will run in the goroutine pool
- processorTask := func() {
+ go func() {
  defer s.wg.Done()
  for {
  select {
@@ -226,16 +218,7 @@
  return
  }
  }
- }
- // Submit the processor task to the audio processor pool
- if !SubmitAudioProcessorTask(processorTask) {
- // If the pool is full or shutting down, fall back to direct goroutine creation
- logger := logging.GetDefaultLogger().With().Str("component", AudioOutputServerComponent).Logger()
- logger.Warn().Msg("Audio processor pool full or shutting down, falling back to direct goroutine creation")
- go processorTask()
- }
+ }()
  }
  func (s *AudioOutputServer) Stop() {

View File

@@ -1,6 +1,3 @@
- //go:build cgo
- // +build cgo
  package audio
  import (
@@ -311,9 +308,6 @@ func (s *AudioOutputStreamer) ReportLatency(latency time.Duration) {
  // StartAudioOutputStreaming starts audio output streaming (capturing system audio)
  func StartAudioOutputStreaming(send func([]byte)) error {
- // Initialize audio monitoring (latency tracking and cache cleanup)
- InitializeAudioMonitoring()
  if !atomic.CompareAndSwapInt32(&outputStreamingRunning, 0, 1) {
  return ErrAudioAlreadyRunning
  }

View File

@ -1,217 +0,0 @@
package audio
import (
"sync"
"sync/atomic"
)
// SizedBufferPool manages a pool of buffers with size tracking
type SizedBufferPool struct {
// The underlying sync.Pool
pool sync.Pool
// Statistics for monitoring
totalBuffers atomic.Int64
totalBytes atomic.Int64
gets atomic.Int64
puts atomic.Int64
misses atomic.Int64
// Configuration
maxBufferSize int
defaultSize int
}
// NewSizedBufferPool creates a new sized buffer pool
func NewSizedBufferPool(defaultSize, maxBufferSize int) *SizedBufferPool {
pool := &SizedBufferPool{
maxBufferSize: maxBufferSize,
defaultSize: defaultSize,
}
pool.pool = sync.Pool{
New: func() interface{} {
// Track pool misses
pool.misses.Add(1)
// Create new buffer with default size
buf := make([]byte, defaultSize)
// Return pointer-like to avoid allocations
slice := buf[:0]
ptrSlice := &slice
// Track statistics
pool.totalBuffers.Add(1)
pool.totalBytes.Add(int64(cap(buf)))
return ptrSlice
},
}
return pool
}
// Get returns a buffer from the pool with at least the specified capacity
func (p *SizedBufferPool) Get(minCapacity int) []byte {
// Track gets
p.gets.Add(1)
// Get buffer from pool - handle pointer-like storage
var buf []byte
poolObj := p.pool.Get()
switch v := poolObj.(type) {
case *[]byte:
// Handle pointer-like storage from Put method
if v != nil {
buf = (*v)[:0] // Get the underlying slice
} else {
buf = make([]byte, 0, p.defaultSize)
}
case []byte:
// Handle direct slice for backward compatibility
buf = v
default:
// Fallback for unexpected types
buf = make([]byte, 0, p.defaultSize)
p.misses.Add(1)
}
// Check if buffer has sufficient capacity
if cap(buf) < minCapacity {
// Track statistics for the old buffer
p.totalBytes.Add(-int64(cap(buf)))
// Allocate new buffer with required capacity
buf = make([]byte, minCapacity)
// Track statistics for the new buffer
p.totalBytes.Add(int64(cap(buf)))
} else {
// Resize existing buffer
buf = buf[:minCapacity]
}
return buf
}
// Put returns a buffer to the pool
func (p *SizedBufferPool) Put(buf []byte) {
// Track statistics
p.puts.Add(1)
// Don't pool excessively large buffers to prevent memory bloat
if cap(buf) > p.maxBufferSize {
// Track statistics
p.totalBuffers.Add(-1)
p.totalBytes.Add(-int64(cap(buf)))
return
}
// Clear buffer contents for security
for i := range buf {
buf[i] = 0
}
// Return to pool - use pointer-like approach to avoid allocations
slice := buf[:0]
p.pool.Put(&slice)
}
// GetStats returns statistics about the buffer pool
func (p *SizedBufferPool) GetStats() (buffers, bytes, gets, puts, misses int64) {
buffers = p.totalBuffers.Load()
bytes = p.totalBytes.Load()
gets = p.gets.Load()
puts = p.puts.Load()
misses = p.misses.Load()
return
}
// BufferPoolStats contains statistics about a buffer pool
type BufferPoolStats struct {
TotalBuffers int64
TotalBytes int64
Gets int64
Puts int64
Misses int64
HitRate float64
AverageBufferSize float64
}
// GetDetailedStats returns detailed statistics about the buffer pool
func (p *SizedBufferPool) GetDetailedStats() BufferPoolStats {
buffers := p.totalBuffers.Load()
bytes := p.totalBytes.Load()
gets := p.gets.Load()
puts := p.puts.Load()
misses := p.misses.Load()
// Calculate hit rate
hitRate := 0.0
if gets > 0 {
hitRate = float64(gets-misses) / float64(gets) * 100.0
}
// Calculate average buffer size
avgSize := 0.0
if buffers > 0 {
avgSize = float64(bytes) / float64(buffers)
}
return BufferPoolStats{
TotalBuffers: buffers,
TotalBytes: bytes,
Gets: gets,
Puts: puts,
Misses: misses,
HitRate: hitRate,
AverageBufferSize: avgSize,
}
}
// Global audio buffer pools with different size classes
var (
// Small buffers (up to 4KB)
smallBufferPool = NewSizedBufferPool(1024, 4*1024)
// Medium buffers (4KB to 64KB)
mediumBufferPool = NewSizedBufferPool(8*1024, 64*1024)
// Large buffers (64KB to 1MB)
largeBufferPool = NewSizedBufferPool(64*1024, 1024*1024)
)
// GetOptimalBuffer returns a buffer from the most appropriate pool based on size
func GetOptimalBuffer(size int) []byte {
switch {
case size <= 4*1024:
return smallBufferPool.Get(size)
case size <= 64*1024:
return mediumBufferPool.Get(size)
default:
return largeBufferPool.Get(size)
}
}
// ReturnOptimalBuffer returns a buffer to the appropriate pool based on size
func ReturnOptimalBuffer(buf []byte) {
size := cap(buf)
switch {
case size <= 4*1024:
smallBufferPool.Put(buf)
case size <= 64*1024:
mediumBufferPool.Put(buf)
default:
largeBufferPool.Put(buf)
}
}
// GetAllPoolStats returns statistics for all buffer pools
func GetAllPoolStats() map[string]BufferPoolStats {
return map[string]BufferPoolStats{
"small": smallBufferPool.GetDetailedStats(),
"medium": mediumBufferPool.GetDetailedStats(),
"large": largeBufferPool.GetDetailedStats(),
}
}
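A short usage sketch of the size-class helpers defined above:
func exampleOptimalBufferRoundTrip(payload []byte) {
	// GetOptimalBuffer picks the small, medium, or large pool based on the requested size
	// and returns a slice of exactly that length.
	buf := GetOptimalBuffer(len(payload))
	copy(buf, payload)
	// ... use buf ...
	ReturnOptimalBuffer(buf) // cleared and returned to the matching pool
}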

View File

@@ -58,22 +58,10 @@ func ValidateZeroCopyFrame(frame *ZeroCopyAudioFrame) error {
  }
  // ValidateBufferSize validates buffer size parameters with enhanced boundary checks
- // Optimized to use AudioConfigCache for frequently accessed values
  func ValidateBufferSize(size int) error {
  if size <= 0 {
  return fmt.Errorf("%w: buffer size %d must be positive", ErrInvalidBufferSize, size)
  }
- // Fast path: Check against cached max frame size
- cache := GetCachedConfig()
- maxFrameSize := int(cache.maxAudioFrameSize.Load())
- // Most common case: validating a buffer that's sized for audio frames
- if maxFrameSize > 0 && size <= maxFrameSize {
- return nil
- }
- // Slower path: full validation against SocketMaxBuffer
  config := GetConfig()
  // Use SocketMaxBuffer as the upper limit for general buffer validation
  // This allows for socket buffers while still preventing extremely large allocations
@@ -211,22 +199,10 @@ func ValidateLatencyConfig(config LatencyConfig) error {
  }
  // ValidateSampleRate validates audio sample rate values
- // Optimized to use AudioConfigCache for frequently accessed values
  func ValidateSampleRate(sampleRate int) error {
  if sampleRate <= 0 {
  return fmt.Errorf("%w: sample rate %d must be positive", ErrInvalidSampleRate, sampleRate)
  }
- // Fast path: Check against cached sample rate first
- cache := GetCachedConfig()
- cachedRate := int(cache.sampleRate.Load())
- // Most common case: validating against the current sample rate
- if sampleRate == cachedRate {
- return nil
- }
- // Slower path: check against all valid rates
  config := GetConfig()
  validRates := config.ValidSampleRates
  for _, rate := range validRates {
@@ -239,23 +215,10 @@ func ValidateSampleRate(sampleRate int) error {
  }
  // ValidateChannelCount validates audio channel count
- // Optimized to use AudioConfigCache for frequently accessed values
  func ValidateChannelCount(channels int) error {
  if channels <= 0 {
  return fmt.Errorf("%w: channel count %d must be positive", ErrInvalidChannels, channels)
  }
- // Fast path: Check against cached channels first
- cache := GetCachedConfig()
- cachedChannels := int(cache.channels.Load())
- // Most common case: validating against the current channel count
- if channels == cachedChannels {
- return nil
- }
- // Check against max channels - still using cache to avoid GetConfig()
- // Note: We don't have maxChannels in the cache yet, so we'll use GetConfig() for now
  config := GetConfig()
  if channels > config.MaxChannels {
  return fmt.Errorf("%w: channel count %d exceeds maximum %d",
@@ -265,33 +228,10 @@ func ValidateChannelCount(channels int) error {
  }
  // ValidateBitrate validates audio bitrate values (expects kbps)
- // Optimized to use AudioConfigCache for frequently accessed values
  func ValidateBitrate(bitrate int) error {
  if bitrate <= 0 {
  return fmt.Errorf("%w: bitrate %d must be positive", ErrInvalidBitrate, bitrate)
  }
- // Fast path: Check against cached bitrate values
- cache := GetCachedConfig()
- minBitrate := int(cache.minOpusBitrate.Load())
- maxBitrate := int(cache.maxOpusBitrate.Load())
- // If we have valid cached values, use them
- if minBitrate > 0 && maxBitrate > 0 {
- // Convert kbps to bps for comparison with config limits
- bitrateInBps := bitrate * 1000
- if bitrateInBps < minBitrate {
- return fmt.Errorf("%w: bitrate %d kbps (%d bps) below minimum %d bps",
- ErrInvalidBitrate, bitrate, bitrateInBps, minBitrate)
- }
- if bitrateInBps > maxBitrate {
- return fmt.Errorf("%w: bitrate %d kbps (%d bps) exceeds maximum %d bps",
- ErrInvalidBitrate, bitrate, bitrateInBps, maxBitrate)
- }
- return nil
- }
- // Slower path: full validation with GetConfig()
  config := GetConfig()
  // Convert kbps to bps for comparison with config limits
  bitrateInBps := bitrate * 1000
@@ -307,31 +247,10 @@ func ValidateBitrate(bitrate int) error {
  }
  // ValidateFrameDuration validates frame duration values
- // Optimized to use AudioConfigCache for frequently accessed values
  func ValidateFrameDuration(duration time.Duration) error {
  if duration <= 0 {
  return fmt.Errorf("%w: frame duration %v must be positive", ErrInvalidFrameDuration, duration)
  }
- // Fast path: Check against cached frame size first
- cache := GetCachedConfig()
- // Convert frameSize (samples) to duration for comparison
- // Note: This calculation should match how frameSize is converted to duration elsewhere
- cachedFrameSize := int(cache.frameSize.Load())
- cachedSampleRate := int(cache.sampleRate.Load())
- // Only do this calculation if we have valid cached values
- if cachedFrameSize > 0 && cachedSampleRate > 0 {
- cachedDuration := time.Duration(cachedFrameSize) * time.Second / time.Duration(cachedSampleRate)
- // Most common case: validating against the current frame duration
- if duration == cachedDuration {
- return nil
- }
- }
- // Slower path: full validation against min/max
  config := GetConfig()
  if duration < config.MinFrameDuration {
  return fmt.Errorf("%w: frame duration %v below minimum %v",
@@ -345,29 +264,7 @@ func ValidateFrameDuration(duration time.Duration) error {
  }
  // ValidateAudioConfigComplete performs comprehensive audio configuration validation
- // Uses optimized validation functions that leverage AudioConfigCache
  func ValidateAudioConfigComplete(config AudioConfig) error {
- // Fast path: Check if all values match the current cached configuration
- cache := GetCachedConfig()
- cachedSampleRate := int(cache.sampleRate.Load())
- cachedChannels := int(cache.channels.Load())
- cachedBitrate := int(cache.opusBitrate.Load()) / 1000 // Convert from bps to kbps
- cachedFrameSize := int(cache.frameSize.Load())
- // Only do this calculation if we have valid cached values
- if cachedSampleRate > 0 && cachedChannels > 0 && cachedBitrate > 0 && cachedFrameSize > 0 {
- cachedDuration := time.Duration(cachedFrameSize) * time.Second / time.Duration(cachedSampleRate)
- // Most common case: validating the current configuration
- if config.SampleRate == cachedSampleRate &&
- config.Channels == cachedChannels &&
- config.Bitrate == cachedBitrate &&
- config.FrameSize == cachedDuration {
- return nil
- }
- }
- // Slower path: validate each parameter individually
  if err := ValidateAudioQuality(config.Quality); err != nil {
  return fmt.Errorf("quality validation failed: %w", err)
  }
@@ -406,55 +303,36 @@ func ValidateAudioConfigConstants(config *AudioConfigConstants) error {
  return nil
  }
- // Note: We're transitioning from individual cached values to using AudioConfigCache
- // for better consistency and reduced maintenance overhead
- // Global variable for backward compatibility
+ // Cached max frame size to avoid function call overhead in hot paths
  var cachedMaxFrameSize int
- // Note: Validation cache is initialized on first use to avoid init function
  // InitValidationCache initializes cached validation values with actual config
  func InitValidationCache() {
- // Initialize the global cache variable for backward compatibility
- config := GetConfig()
- cachedMaxFrameSize = config.MaxAudioFrameSize
- // Update the global audio config cache
- GetCachedConfig().Update()
+ cachedMaxFrameSize = GetConfig().MaxAudioFrameSize
  }
  // ValidateAudioFrame provides optimized validation for audio frame data
  // This is the primary validation function used in all audio processing paths
  //
  // Performance optimizations:
- // - Uses cached max frame size to eliminate config lookups
+ // - Uses cached config value to eliminate function call overhead
  // - Single branch condition for optimal CPU pipeline efficiency
- // - Minimal error allocation overhead
+ // - Inlined length checks for minimal overhead
  //
  //go:inline
  func ValidateAudioFrame(data []byte) error {
- // Fast path: check length against cached max size in single operation
+ // Initialize cache on first use if not already done
+ if cachedMaxFrameSize == 0 {
+ InitValidationCache()
+ }
+ // Optimized validation with pre-allocated error messages for minimal overhead
  dataLen := len(data)
  if dataLen == 0 {
  return ErrFrameDataEmpty
  }
- // Use global cached value for fastest access - updated during initialization
- maxSize := cachedMaxFrameSize
- if maxSize == 0 {
- // Fallback: get from cache only if global cache not initialized
- cache := GetCachedConfig()
- maxSize = int(cache.maxAudioFrameSize.Load())
- if maxSize == 0 {
- // Last resort: update cache and get fresh value
- cache.Update()
- maxSize = int(cache.maxAudioFrameSize.Load())
- }
- // Cache the value globally for next calls
- cachedMaxFrameSize = maxSize
- }
- // Single comparison for validation
- if dataLen > maxSize {
+ if dataLen > cachedMaxFrameSize {
  return ErrFrameDataTooLarge
  }
  return nil
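A small sketch of how the hot-path validator above is typically consumed; the surrounding handler function is hypothetical.
func handleIncomingFrame(data []byte) error {
	if err := ValidateAudioFrame(data); err != nil {
		return fmt.Errorf("rejecting audio frame: %w", err)
	}
	// hand the validated frame to the decode/write path here
	return nil
}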

main.go
View File

@@ -38,14 +38,6 @@ func startAudioSubprocess() error {
  // Start adaptive buffer management for optimal performance
  audio.StartAdaptiveBuffering()
- // Start goroutine monitoring to detect and prevent leaks
- audio.StartGoroutineMonitoring()
- // Enable batch audio processing to reduce CGO call overhead
- if err := audio.EnableBatchAudioProcessing(); err != nil {
- logger.Warn().Err(err).Msg("failed to enable batch audio processing")
- }
  // Create audio server supervisor
  audioSupervisor = audio.NewAudioOutputSupervisor()
@@ -103,10 +95,6 @@ func startAudioSubprocess() error {
  audio.StopAudioRelay()
  // Stop adaptive buffering
  audio.StopAdaptiveBuffering()
- // Stop goroutine monitoring
- audio.StopGoroutineMonitoring()
- // Disable batch audio processing
- audio.DisableBatchAudioProcessing()
  },
  // onRestart
  func(attempt int, delay time.Duration) {