mirror of https://github.com/jetkvm/kvm.git
Compare commits
11 Commits
950ca2bd99
...
260f62efc3
| Author | SHA1 | Date |
|---|---|---|
|
|
260f62efc3 | |
|
|
a741f05829 | |
|
|
a557987629 | |
|
|
5353c1cab2 | |
|
|
370178e43b | |
|
|
9f1dd28ad6 | |
|
|
2ab90e76e0 | |
|
|
1b7198aec2 | |
|
|
f9781f170c | |
|
|
d7b67e5012 | |
|
|
8110be6cc6 |
|
|
@ -9,4 +9,5 @@ tmp/
|
|||
*.tmp
|
||||
*.code-workspace
|
||||
|
||||
device-tests.tar.gz
|
||||
device-tests.tar.gz
|
||||
CLAUDE.md
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ package audio
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
|
@ -28,23 +29,31 @@ type BatchAudioProcessor struct {
|
|||
|
||||
// Batch queues and state (atomic for lock-free access)
|
||||
readQueue chan batchReadRequest
|
||||
writeQueue chan batchWriteRequest
|
||||
initialized int32
|
||||
running int32
|
||||
threadPinned int32
|
||||
writePinned int32
|
||||
|
||||
// Buffers (pre-allocated to avoid allocation overhead)
|
||||
readBufPool *sync.Pool
|
||||
readBufPool *sync.Pool
|
||||
writeBufPool *sync.Pool
|
||||
}
|
||||
|
||||
type BatchAudioStats struct {
|
||||
// int64 fields MUST be first for ARM32 alignment
|
||||
BatchedReads int64
|
||||
SingleReads int64
|
||||
BatchedWrites int64
|
||||
SingleWrites int64
|
||||
BatchedFrames int64
|
||||
SingleFrames int64
|
||||
WriteFrames int64
|
||||
CGOCallsReduced int64
|
||||
OSThreadPinTime time.Duration // time.Duration is int64 internally
|
||||
WriteThreadTime time.Duration // time.Duration is int64 internally
|
||||
LastBatchTime time.Time
|
||||
LastWriteTime time.Time
|
||||
}
|
||||
|
||||
type batchReadRequest struct {
|
||||
|
|
@ -58,23 +67,43 @@ type batchReadResult struct {
|
|||
err error
|
||||
}
|
||||
|
||||
// batchWriteRequest is a single unit of work handed to the batch write
// processor through writeQueue.
type batchWriteRequest struct {
	// buffer carries the payload for the legacy single-buffer decode-write path.
	buffer []byte
	// opusData is the Opus-encoded input for the two-buffer decode-write path.
	opusData []byte
	// pcmBuffer is the PCM buffer supplied for the two-buffer decode-write path.
	pcmBuffer []byte
	// resultChan receives the outcome of this request.
	resultChan chan batchWriteResult
	// timestamp records when the request was created.
	timestamp time.Time
}

// batchWriteResult is the outcome of one batched decode-write operation.
type batchWriteResult struct {
	// length is the length value returned by the decode-write call.
	length int
	// err is the error returned by the decode-write call, if any.
	err error
}
|
||||
|
||||
// NewBatchAudioProcessor creates a new batch audio processor
|
||||
func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAudioProcessor {
|
||||
// Validate input parameters
|
||||
if err := ValidateBufferSize(batchSize); err != nil {
|
||||
logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger()
|
||||
logger.Warn().Err(err).Int("batchSize", batchSize).Msg("invalid batch size, using default")
|
||||
batchSize = GetConfig().BatchProcessorFramesPerBatch
|
||||
// Get cached config to avoid GetConfig() calls
|
||||
cache := GetCachedConfig()
|
||||
cache.Update()
|
||||
|
||||
// Validate input parameters with minimal overhead
|
||||
if batchSize <= 0 || batchSize > 1000 {
|
||||
batchSize = cache.BatchProcessorFramesPerBatch
|
||||
}
|
||||
if batchDuration <= 0 {
|
||||
logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger()
|
||||
logger.Warn().Dur("batchDuration", batchDuration).Msg("invalid batch duration, using default")
|
||||
batchDuration = GetConfig().BatchProcessingDelay
|
||||
batchDuration = cache.BatchProcessingDelay
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
// Pre-allocate logger to avoid repeated allocations
|
||||
logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger()
|
||||
|
||||
// Pre-calculate frame size to avoid repeated GetConfig() calls
|
||||
frameSize := cache.GetMinReadEncodeBuffer()
|
||||
if frameSize == 0 {
|
||||
frameSize = 1500 // Safe fallback
|
||||
}
|
||||
|
||||
processor := &BatchAudioProcessor{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
|
|
@ -82,9 +111,17 @@ func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAu
|
|||
batchSize: batchSize,
|
||||
batchDuration: batchDuration,
|
||||
readQueue: make(chan batchReadRequest, batchSize*2),
|
||||
writeQueue: make(chan batchWriteRequest, batchSize*2),
|
||||
readBufPool: &sync.Pool{
|
||||
New: func() interface{} {
|
||||
return make([]byte, GetConfig().AudioFramePoolSize) // Max audio frame size
|
||||
// Use pre-calculated frame size to avoid GetConfig() calls
|
||||
return make([]byte, 0, frameSize)
|
||||
},
|
||||
},
|
||||
writeBufPool: &sync.Pool{
|
||||
New: func() interface{} {
|
||||
// Use pre-calculated frame size to avoid GetConfig() calls
|
||||
return make([]byte, 0, frameSize)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
@ -105,6 +142,7 @@ func (bap *BatchAudioProcessor) Start() error {
|
|||
|
||||
// Start batch processing goroutines
|
||||
go bap.batchReadProcessor()
|
||||
go bap.batchWriteProcessor()
|
||||
|
||||
bap.logger.Info().Int("batch_size", bap.batchSize).
|
||||
Dur("batch_duration", bap.batchDuration).
|
||||
|
|
@ -129,13 +167,17 @@ func (bap *BatchAudioProcessor) Stop() {
|
|||
|
||||
// BatchReadEncode performs batched audio read and encode operations
|
||||
func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) {
|
||||
// Get cached config to avoid GetConfig() calls in hot path
|
||||
cache := GetCachedConfig()
|
||||
cache.Update()
|
||||
|
||||
// Validate buffer before processing
|
||||
if err := ValidateBufferSize(len(buffer)); err != nil {
|
||||
bap.logger.Debug().Err(err).Msg("invalid buffer for batch processing")
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if atomic.LoadInt32(&bap.running) == 0 {
|
||||
if !bap.IsRunning() {
|
||||
// Fallback to single operation if batch processor is not running
|
||||
atomic.AddInt64(&bap.stats.SingleReads, 1)
|
||||
atomic.AddInt64(&bap.stats.SingleFrames, 1)
|
||||
|
|
@ -149,21 +191,22 @@ func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) {
|
|||
timestamp: time.Now(),
|
||||
}
|
||||
|
||||
// Try to queue the request with non-blocking send
|
||||
select {
|
||||
case bap.readQueue <- request:
|
||||
// Successfully queued
|
||||
case <-time.After(GetConfig().ShortTimeout):
|
||||
// Queue is full or blocked, fallback to single operation
|
||||
default:
|
||||
// Queue is full, fallback to single operation
|
||||
atomic.AddInt64(&bap.stats.SingleReads, 1)
|
||||
atomic.AddInt64(&bap.stats.SingleFrames, 1)
|
||||
return CGOAudioReadEncode(buffer)
|
||||
}
|
||||
|
||||
// Wait for result
|
||||
// Wait for result with timeout
|
||||
select {
|
||||
case result := <-resultChan:
|
||||
return result.length, result.err
|
||||
case <-time.After(GetConfig().MediumTimeout):
|
||||
case <-time.After(cache.BatchProcessingTimeout):
|
||||
// Timeout, fallback to single operation
|
||||
atomic.AddInt64(&bap.stats.SingleReads, 1)
|
||||
atomic.AddInt64(&bap.stats.SingleFrames, 1)
|
||||
|
|
@ -171,6 +214,109 @@ func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) {
|
|||
}
|
||||
}
|
||||
|
||||
// BatchDecodeWrite performs batched audio decode and write operations
|
||||
// This is the legacy version that uses a single buffer
|
||||
func (bap *BatchAudioProcessor) BatchDecodeWrite(buffer []byte) (int, error) {
|
||||
// Get cached config to avoid GetConfig() calls in hot path
|
||||
cache := GetCachedConfig()
|
||||
cache.Update()
|
||||
|
||||
// Validate buffer before processing
|
||||
if err := ValidateBufferSize(len(buffer)); err != nil {
|
||||
bap.logger.Debug().Err(err).Msg("invalid buffer for batch processing")
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if !bap.IsRunning() {
|
||||
// Fallback to single operation if batch processor is not running
|
||||
atomic.AddInt64(&bap.stats.SingleWrites, 1)
|
||||
atomic.AddInt64(&bap.stats.WriteFrames, 1)
|
||||
return CGOAudioDecodeWriteLegacy(buffer)
|
||||
}
|
||||
|
||||
resultChan := make(chan batchWriteResult, 1)
|
||||
request := batchWriteRequest{
|
||||
buffer: buffer,
|
||||
resultChan: resultChan,
|
||||
timestamp: time.Now(),
|
||||
}
|
||||
|
||||
// Try to queue the request with non-blocking send
|
||||
select {
|
||||
case bap.writeQueue <- request:
|
||||
// Successfully queued
|
||||
default:
|
||||
// Queue is full, fall back to single operation
|
||||
atomic.AddInt64(&bap.stats.SingleWrites, 1)
|
||||
atomic.AddInt64(&bap.stats.WriteFrames, 1)
|
||||
return CGOAudioDecodeWriteLegacy(buffer)
|
||||
}
|
||||
|
||||
// Wait for result with timeout
|
||||
select {
|
||||
case result := <-resultChan:
|
||||
return result.length, result.err
|
||||
case <-time.After(cache.BatchProcessingTimeout):
|
||||
atomic.AddInt64(&bap.stats.SingleWrites, 1)
|
||||
atomic.AddInt64(&bap.stats.WriteFrames, 1)
|
||||
return CGOAudioDecodeWriteLegacy(buffer)
|
||||
}
|
||||
}
|
||||
|
||||
// BatchDecodeWriteWithBuffers performs batched audio decode and write operations with separate opus and PCM buffers
|
||||
func (bap *BatchAudioProcessor) BatchDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, error) {
|
||||
// Get cached config to avoid GetConfig() calls in hot path
|
||||
cache := GetCachedConfig()
|
||||
cache.Update()
|
||||
|
||||
// Validate buffers before processing
|
||||
if len(opusData) == 0 {
|
||||
return 0, fmt.Errorf("empty opus data buffer")
|
||||
}
|
||||
if len(pcmBuffer) == 0 {
|
||||
return 0, fmt.Errorf("empty PCM buffer")
|
||||
}
|
||||
|
||||
if !bap.IsRunning() {
|
||||
// Fallback to single operation if batch processor is not running
|
||||
atomic.AddInt64(&bap.stats.SingleWrites, 1)
|
||||
atomic.AddInt64(&bap.stats.WriteFrames, 1)
|
||||
// Use the optimized function with separate buffers
|
||||
return CGOAudioDecodeWrite(opusData, pcmBuffer)
|
||||
}
|
||||
|
||||
resultChan := make(chan batchWriteResult, 1)
|
||||
request := batchWriteRequest{
|
||||
opusData: opusData,
|
||||
pcmBuffer: pcmBuffer,
|
||||
resultChan: resultChan,
|
||||
timestamp: time.Now(),
|
||||
}
|
||||
|
||||
// Try to queue the request with non-blocking send
|
||||
select {
|
||||
case bap.writeQueue <- request:
|
||||
// Successfully queued
|
||||
default:
|
||||
// Queue is full, fall back to single operation
|
||||
atomic.AddInt64(&bap.stats.SingleWrites, 1)
|
||||
atomic.AddInt64(&bap.stats.WriteFrames, 1)
|
||||
// Use the optimized function with separate buffers
|
||||
return CGOAudioDecodeWrite(opusData, pcmBuffer)
|
||||
}
|
||||
|
||||
// Wait for result with timeout
|
||||
select {
|
||||
case result := <-resultChan:
|
||||
return result.length, result.err
|
||||
case <-time.After(cache.BatchProcessingTimeout):
|
||||
atomic.AddInt64(&bap.stats.SingleWrites, 1)
|
||||
atomic.AddInt64(&bap.stats.WriteFrames, 1)
|
||||
// Use the optimized function with separate buffers
|
||||
return CGOAudioDecodeWrite(opusData, pcmBuffer)
|
||||
}
|
||||
}
|
||||
|
||||
// batchReadProcessor processes batched read operations
|
||||
func (bap *BatchAudioProcessor) batchReadProcessor() {
|
||||
defer bap.logger.Debug().Msg("batch read processor stopped")
|
||||
|
|
@ -207,43 +353,150 @@ func (bap *BatchAudioProcessor) batchReadProcessor() {
|
|||
}
|
||||
}
|
||||
|
||||
// batchWriteProcessor processes batched write operations
|
||||
func (bap *BatchAudioProcessor) batchWriteProcessor() {
|
||||
defer bap.logger.Debug().Msg("batch write processor stopped")
|
||||
|
||||
ticker := time.NewTicker(bap.batchDuration)
|
||||
defer ticker.Stop()
|
||||
|
||||
var batch []batchWriteRequest
|
||||
batch = make([]batchWriteRequest, 0, bap.batchSize)
|
||||
|
||||
for atomic.LoadInt32(&bap.running) == 1 {
|
||||
select {
|
||||
case <-bap.ctx.Done():
|
||||
return
|
||||
|
||||
case req := <-bap.writeQueue:
|
||||
batch = append(batch, req)
|
||||
if len(batch) >= bap.batchSize {
|
||||
bap.processBatchWrite(batch)
|
||||
batch = batch[:0] // Clear slice but keep capacity
|
||||
}
|
||||
|
||||
case <-ticker.C:
|
||||
if len(batch) > 0 {
|
||||
bap.processBatchWrite(batch)
|
||||
batch = batch[:0] // Clear slice but keep capacity
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Process any remaining requests
|
||||
if len(batch) > 0 {
|
||||
bap.processBatchWrite(batch)
|
||||
}
|
||||
}
|
||||
|
||||
// processBatchRead processes a batch of read requests efficiently
|
||||
func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) {
|
||||
if len(batch) == 0 {
|
||||
batchSize := len(batch)
|
||||
if batchSize == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Pin to OS thread for the entire batch to minimize thread switching overhead
|
||||
start := time.Now()
|
||||
if atomic.CompareAndSwapInt32(&bap.threadPinned, 0, 1) {
|
||||
// Get cached config once - avoid repeated calls
|
||||
cache := GetCachedConfig()
|
||||
minBatchSize := cache.MinBatchSizeForThreadPinning
|
||||
|
||||
// Only pin to OS thread for large batches to reduce thread contention
|
||||
var start time.Time
|
||||
threadWasPinned := false
|
||||
if batchSize >= minBatchSize && atomic.CompareAndSwapInt32(&bap.threadPinned, 0, 1) {
|
||||
start = time.Now()
|
||||
threadWasPinned = true
|
||||
runtime.LockOSThread()
|
||||
|
||||
// Set high priority for batch audio processing
|
||||
if err := SetAudioThreadPriority(); err != nil {
|
||||
bap.logger.Warn().Err(err).Msg("failed to set batch audio processing priority")
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := ResetThreadPriority(); err != nil {
|
||||
bap.logger.Warn().Err(err).Msg("failed to reset thread priority")
|
||||
}
|
||||
runtime.UnlockOSThread()
|
||||
atomic.StoreInt32(&bap.threadPinned, 0)
|
||||
bap.stats.OSThreadPinTime += time.Since(start)
|
||||
}()
|
||||
// Skip priority setting for better performance - audio threads already have good priority
|
||||
}
|
||||
|
||||
batchSize := len(batch)
|
||||
// Update stats efficiently
|
||||
atomic.AddInt64(&bap.stats.BatchedReads, 1)
|
||||
atomic.AddInt64(&bap.stats.BatchedFrames, int64(batchSize))
|
||||
if batchSize > 1 {
|
||||
atomic.AddInt64(&bap.stats.CGOCallsReduced, int64(batchSize-1))
|
||||
}
|
||||
|
||||
// Process each request in the batch with minimal overhead
|
||||
for i := range batch {
|
||||
req := &batch[i]
|
||||
length, err := CGOAudioReadEncode(req.buffer)
|
||||
|
||||
// Send result back (non-blocking) - reuse result struct
|
||||
select {
|
||||
case req.resultChan <- batchReadResult{length: length, err: err}:
|
||||
default:
|
||||
// Requestor timed out, drop result
|
||||
}
|
||||
}
|
||||
|
||||
// Release thread lock if we pinned it
|
||||
if threadWasPinned {
|
||||
runtime.UnlockOSThread()
|
||||
atomic.StoreInt32(&bap.threadPinned, 0)
|
||||
bap.stats.OSThreadPinTime += time.Since(start)
|
||||
}
|
||||
|
||||
bap.stats.LastBatchTime = time.Now()
|
||||
}
|
||||
|
||||
// processBatchWrite processes a batch of write requests efficiently
|
||||
func (bap *BatchAudioProcessor) processBatchWrite(batch []batchWriteRequest) {
|
||||
if len(batch) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Get cached config to avoid GetConfig() calls in hot path
|
||||
cache := GetCachedConfig()
|
||||
|
||||
// Only pin to OS thread for large batches to reduce thread contention
|
||||
start := time.Now()
|
||||
shouldPinThread := len(batch) >= cache.MinBatchSizeForThreadPinning
|
||||
|
||||
// Track if we pinned the thread in this call
|
||||
threadWasPinned := false
|
||||
|
||||
if shouldPinThread && atomic.CompareAndSwapInt32(&bap.writePinned, 0, 1) {
|
||||
threadWasPinned = true
|
||||
runtime.LockOSThread()
|
||||
|
||||
// Set high priority for batch audio processing - skip logging in hotpath
|
||||
_ = SetAudioThreadPriority()
|
||||
}
|
||||
|
||||
batchSize := len(batch)
|
||||
atomic.AddInt64(&bap.stats.BatchedWrites, 1)
|
||||
atomic.AddInt64(&bap.stats.WriteFrames, int64(batchSize))
|
||||
if batchSize > 1 {
|
||||
atomic.AddInt64(&bap.stats.CGOCallsReduced, int64(batchSize-1))
|
||||
}
|
||||
|
||||
// Add deferred function to release thread lock if we pinned it
|
||||
if threadWasPinned {
|
||||
defer func() {
|
||||
// Skip logging in hotpath for performance
|
||||
_ = ResetThreadPriority()
|
||||
runtime.UnlockOSThread()
|
||||
atomic.StoreInt32(&bap.writePinned, 0)
|
||||
bap.stats.WriteThreadTime += time.Since(start)
|
||||
}()
|
||||
}
|
||||
|
||||
// Process each request in the batch
|
||||
for _, req := range batch {
|
||||
length, err := CGOAudioReadEncode(req.buffer)
|
||||
result := batchReadResult{
|
||||
var length int
|
||||
var err error
|
||||
|
||||
// Handle both legacy and new decode-write operations
|
||||
if req.opusData != nil && req.pcmBuffer != nil {
|
||||
// New style with separate opus data and PCM buffer
|
||||
length, err = CGOAudioDecodeWrite(req.opusData, req.pcmBuffer)
|
||||
} else {
|
||||
// Legacy style with single buffer
|
||||
length, err = CGOAudioDecodeWriteLegacy(req.buffer)
|
||||
}
|
||||
|
||||
result := batchWriteResult{
|
||||
length: length,
|
||||
err: err,
|
||||
}
|
||||
|
|
@ -256,7 +509,7 @@ func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) {
|
|||
}
|
||||
}
|
||||
|
||||
bap.stats.LastBatchTime = time.Now()
|
||||
bap.stats.LastWriteTime = time.Now()
|
||||
}
|
||||
|
||||
// GetStats returns current batch processor statistics
|
||||
|
|
@ -264,11 +517,16 @@ func (bap *BatchAudioProcessor) GetStats() BatchAudioStats {
|
|||
return BatchAudioStats{
|
||||
BatchedReads: atomic.LoadInt64(&bap.stats.BatchedReads),
|
||||
SingleReads: atomic.LoadInt64(&bap.stats.SingleReads),
|
||||
BatchedWrites: atomic.LoadInt64(&bap.stats.BatchedWrites),
|
||||
SingleWrites: atomic.LoadInt64(&bap.stats.SingleWrites),
|
||||
BatchedFrames: atomic.LoadInt64(&bap.stats.BatchedFrames),
|
||||
SingleFrames: atomic.LoadInt64(&bap.stats.SingleFrames),
|
||||
WriteFrames: atomic.LoadInt64(&bap.stats.WriteFrames),
|
||||
CGOCallsReduced: atomic.LoadInt64(&bap.stats.CGOCallsReduced),
|
||||
OSThreadPinTime: bap.stats.OSThreadPinTime,
|
||||
WriteThreadTime: bap.stats.WriteThreadTime,
|
||||
LastBatchTime: bap.stats.LastBatchTime,
|
||||
LastWriteTime: bap.stats.LastWriteTime,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -292,8 +550,11 @@ func GetBatchAudioProcessor() *BatchAudioProcessor {
|
|||
|
||||
// Initialize on first use
|
||||
if atomic.CompareAndSwapInt32(&batchProcessorInitialized, 0, 1) {
|
||||
config := GetConfig()
|
||||
processor := NewBatchAudioProcessor(config.BatchProcessorFramesPerBatch, config.BatchProcessorTimeout)
|
||||
// Get cached config to avoid GetConfig() calls
|
||||
cache := GetCachedConfig()
|
||||
cache.Update()
|
||||
|
||||
processor := NewBatchAudioProcessor(cache.BatchProcessorFramesPerBatch, cache.BatchProcessorTimeout)
|
||||
atomic.StorePointer(&globalBatchProcessor, unsafe.Pointer(processor))
|
||||
return processor
|
||||
}
|
||||
|
|
@ -327,8 +588,32 @@ func DisableBatchAudioProcessing() {
|
|||
// BatchCGOAudioReadEncode is a batched version of CGOAudioReadEncode
|
||||
func BatchCGOAudioReadEncode(buffer []byte) (int, error) {
|
||||
processor := GetBatchAudioProcessor()
|
||||
if processor != nil && processor.IsRunning() {
|
||||
return processor.BatchReadEncode(buffer)
|
||||
if processor == nil || !processor.IsRunning() {
|
||||
// Fall back to non-batched version if processor is not running
|
||||
return CGOAudioReadEncode(buffer)
|
||||
}
|
||||
return CGOAudioReadEncode(buffer)
|
||||
|
||||
return processor.BatchReadEncode(buffer)
|
||||
}
|
||||
|
||||
// BatchCGOAudioDecodeWrite is a batched version of CGOAudioDecodeWrite
|
||||
func BatchCGOAudioDecodeWrite(buffer []byte) (int, error) {
|
||||
processor := GetBatchAudioProcessor()
|
||||
if processor == nil || !processor.IsRunning() {
|
||||
// Fall back to non-batched version if processor is not running
|
||||
return CGOAudioDecodeWriteLegacy(buffer)
|
||||
}
|
||||
|
||||
return processor.BatchDecodeWrite(buffer)
|
||||
}
|
||||
|
||||
// BatchCGOAudioDecodeWriteWithBuffers is a batched version of CGOAudioDecodeWrite that uses separate opus and PCM buffers
|
||||
func BatchCGOAudioDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, error) {
|
||||
processor := GetBatchAudioProcessor()
|
||||
if processor == nil || !processor.IsRunning() {
|
||||
// Fall back to non-batched version if processor is not running
|
||||
return CGOAudioDecodeWrite(opusData, pcmBuffer)
|
||||
}
|
||||
|
||||
return processor.BatchDecodeWriteWithBuffers(opusData, pcmBuffer)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,24 +1,118 @@
|
|||
//go:build cgo
|
||||
// +build cgo
|
||||
|
||||
package audio
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
// AudioLatencyInfo holds simplified latency information for cleanup decisions
|
||||
type AudioLatencyInfo struct {
|
||||
LatencyMs float64
|
||||
Timestamp time.Time
|
||||
}
|
||||
|
||||
// Global latency tracking
|
||||
var (
|
||||
currentAudioLatency = AudioLatencyInfo{}
|
||||
currentAudioLatencyLock sync.RWMutex
|
||||
audioMonitoringInitialized int32 // Atomic flag to track initialization
|
||||
)
|
||||
|
||||
// InitializeAudioMonitoring starts the background goroutines for latency tracking and cache cleanup
|
||||
// This is safe to call multiple times as it will only initialize once
|
||||
func InitializeAudioMonitoring() {
|
||||
// Use atomic CAS to ensure we only initialize once
|
||||
if atomic.CompareAndSwapInt32(&audioMonitoringInitialized, 0, 1) {
|
||||
// Start the latency recorder
|
||||
startLatencyRecorder()
|
||||
|
||||
// Start the cleanup goroutine
|
||||
startCleanupGoroutine()
|
||||
}
|
||||
}
|
||||
|
||||
// latencyChannel is used for non-blocking latency recording
|
||||
var latencyChannel = make(chan float64, 10)
|
||||
|
||||
// startLatencyRecorder starts the latency recorder goroutine
|
||||
// It is launched once, from InitializeAudioMonitoring, rather than at package initialization
|
||||
func startLatencyRecorder() {
|
||||
go latencyRecorderLoop()
|
||||
}
|
||||
|
||||
// latencyRecorderLoop processes latency recordings in the background
|
||||
func latencyRecorderLoop() {
|
||||
for latencyMs := range latencyChannel {
|
||||
currentAudioLatencyLock.Lock()
|
||||
currentAudioLatency = AudioLatencyInfo{
|
||||
LatencyMs: latencyMs,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
currentAudioLatencyLock.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// RecordAudioLatency records the current audio processing latency
|
||||
// This is called from the audio input manager when latency is measured
|
||||
// It is non-blocking to ensure zero overhead in the critical audio path
|
||||
func RecordAudioLatency(latencyMs float64) {
|
||||
// Non-blocking send - if channel is full, we drop the update
|
||||
select {
|
||||
case latencyChannel <- latencyMs:
|
||||
// Successfully sent
|
||||
default:
|
||||
// Channel full, drop this update to avoid blocking the audio path
|
||||
}
|
||||
}
|
||||
|
||||
// GetAudioLatencyMetrics returns the current audio latency information
|
||||
// Returns nil if no latency data is available or if it's too old
|
||||
func GetAudioLatencyMetrics() *AudioLatencyInfo {
|
||||
currentAudioLatencyLock.RLock()
|
||||
defer currentAudioLatencyLock.RUnlock()
|
||||
|
||||
// Check if we have valid latency data
|
||||
if currentAudioLatency.Timestamp.IsZero() {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check if the data is too old (more than 5 seconds)
|
||||
if time.Since(currentAudioLatency.Timestamp) > 5*time.Second {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &AudioLatencyInfo{
|
||||
LatencyMs: currentAudioLatency.LatencyMs,
|
||||
Timestamp: currentAudioLatency.Timestamp,
|
||||
}
|
||||
}
|
||||
|
||||
// Lock-free buffer cache for per-goroutine optimization
|
||||
type lockFreeBufferCache struct {
|
||||
buffers [4]*[]byte // Small fixed-size array for lock-free access
|
||||
}
|
||||
|
||||
// TTL tracking for goroutine cache entries
|
||||
type cacheEntry struct {
|
||||
cache *lockFreeBufferCache
|
||||
lastAccess int64 // Unix timestamp of last access
|
||||
gid int64 // Goroutine ID for better tracking
|
||||
}
|
||||
|
||||
// Per-goroutine buffer cache using goroutine-local storage
|
||||
var goroutineBufferCache = make(map[int64]*lockFreeBufferCache)
|
||||
var goroutineCacheMutex sync.RWMutex
|
||||
var lastCleanupTime int64 // Unix timestamp of last cleanup
|
||||
const maxCacheSize = 1000 // Maximum number of goroutine caches
|
||||
const cleanupInterval = 300 // Cleanup interval in seconds (5 minutes)
|
||||
var lastCleanupTime int64 // Unix timestamp of last cleanup
|
||||
const maxCacheSize = 500 // Maximum number of goroutine caches (reduced from 1000)
|
||||
const cleanupInterval int64 = 30 // Cleanup interval in seconds (30 seconds, reduced from 60)
|
||||
const bufferTTL int64 = 60 // Time-to-live for cached buffers in seconds (1 minute, reduced from 2)
|
||||
|
||||
// getGoroutineID extracts goroutine ID from runtime stack for cache key
|
||||
func getGoroutineID() int64 {
|
||||
|
|
@ -39,13 +133,67 @@ func getGoroutineID() int64 {
|
|||
return 0
|
||||
}
|
||||
|
||||
// cleanupGoroutineCache removes stale entries from the goroutine cache
|
||||
func cleanupGoroutineCache() {
|
||||
// Map of goroutine ID to cache entry with TTL tracking
|
||||
var goroutineCacheWithTTL = make(map[int64]*cacheEntry)
|
||||
|
||||
// cleanupChannel is used for asynchronous cleanup requests
|
||||
var cleanupChannel = make(chan struct{}, 1)
|
||||
|
||||
// startCleanupGoroutine starts the cleanup goroutine
|
||||
// It is launched once, from InitializeAudioMonitoring, rather than at package initialization
|
||||
func startCleanupGoroutine() {
|
||||
go cleanupLoop()
|
||||
}
|
||||
|
||||
// cleanupLoop processes cleanup requests in the background
|
||||
func cleanupLoop() {
|
||||
ticker := time.NewTicker(10 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-cleanupChannel:
|
||||
// Received explicit cleanup request
|
||||
performCleanup(true)
|
||||
case <-ticker.C:
|
||||
// Regular cleanup check
|
||||
performCleanup(false)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// requestCleanup signals the cleanup goroutine to perform a cleanup
|
||||
// This is non-blocking and can be called from the critical path
|
||||
func requestCleanup() {
|
||||
select {
|
||||
case cleanupChannel <- struct{}{}:
|
||||
// Successfully requested cleanup
|
||||
default:
|
||||
// Channel full, cleanup already pending
|
||||
}
|
||||
}
|
||||
|
||||
// performCleanup does the actual cache cleanup work
|
||||
// This runs in a dedicated goroutine, not in the critical path
|
||||
func performCleanup(forced bool) {
|
||||
now := time.Now().Unix()
|
||||
lastCleanup := atomic.LoadInt64(&lastCleanupTime)
|
||||
|
||||
// Only cleanup if enough time has passed
|
||||
if now-lastCleanup < cleanupInterval {
|
||||
// Check if we're in a high-latency situation
|
||||
isHighLatency := false
|
||||
latencyMetrics := GetAudioLatencyMetrics()
|
||||
if latencyMetrics != nil && latencyMetrics.LatencyMs > 10.0 {
|
||||
// Under high latency, be more aggressive with cleanup
|
||||
isHighLatency = true
|
||||
}
|
||||
|
||||
// Only cleanup if enough time has passed (less time if high latency) or if forced
|
||||
interval := cleanupInterval
|
||||
if isHighLatency {
|
||||
interval = cleanupInterval / 2 // More frequent cleanup under high latency
|
||||
}
|
||||
|
||||
if !forced && now-lastCleanup < interval {
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -54,23 +202,93 @@ func cleanupGoroutineCache() {
|
|||
return // Another goroutine is already cleaning up
|
||||
}
|
||||
|
||||
// Perform the actual cleanup
|
||||
doCleanupGoroutineCache()
|
||||
}
|
||||
|
||||
// cleanupGoroutineCache triggers an asynchronous cleanup of the goroutine cache
|
||||
// This is safe to call from the critical path as it's non-blocking
|
||||
func cleanupGoroutineCache() {
|
||||
// Request asynchronous cleanup
|
||||
requestCleanup()
|
||||
}
|
||||
|
||||
// The actual cleanup implementation that runs in the background goroutine
|
||||
func doCleanupGoroutineCache() {
|
||||
// Get current time for TTL calculations
|
||||
now := time.Now().Unix()
|
||||
|
||||
// Check if we're in a high-latency situation
|
||||
isHighLatency := false
|
||||
latencyMetrics := GetAudioLatencyMetrics()
|
||||
if latencyMetrics != nil && latencyMetrics.LatencyMs > 10.0 {
|
||||
// Under high latency, be more aggressive with cleanup
|
||||
isHighLatency = true
|
||||
}
|
||||
|
||||
goroutineCacheMutex.Lock()
|
||||
defer goroutineCacheMutex.Unlock()
|
||||
|
||||
// If cache is too large, remove oldest entries (simple FIFO)
|
||||
if len(goroutineBufferCache) > maxCacheSize {
|
||||
// Remove half of the entries to avoid frequent cleanups
|
||||
toRemove := len(goroutineBufferCache) - maxCacheSize/2
|
||||
count := 0
|
||||
for gid := range goroutineBufferCache {
|
||||
delete(goroutineBufferCache, gid)
|
||||
count++
|
||||
if count >= toRemove {
|
||||
break
|
||||
// Convert old cache format to new TTL-based format if needed
|
||||
if len(goroutineCacheWithTTL) == 0 && len(goroutineBufferCache) > 0 {
|
||||
for gid, cache := range goroutineBufferCache {
|
||||
goroutineCacheWithTTL[gid] = &cacheEntry{
|
||||
cache: cache,
|
||||
lastAccess: now,
|
||||
}
|
||||
}
|
||||
// Log cleanup for debugging (removed logging dependency)
|
||||
_ = count // Avoid unused variable warning
|
||||
// Clear old cache to free memory
|
||||
goroutineBufferCache = make(map[int64]*lockFreeBufferCache)
|
||||
}
|
||||
|
||||
// Remove stale entries based on TTL (more aggressive under high latency)
|
||||
expiredCount := 0
|
||||
ttl := bufferTTL
|
||||
if isHighLatency {
|
||||
// Under high latency, use a much shorter TTL
|
||||
ttl = bufferTTL / 4
|
||||
}
|
||||
|
||||
for gid, entry := range goroutineCacheWithTTL {
|
||||
// Both now and entry.lastAccess are int64, so this comparison is safe
|
||||
if now-entry.lastAccess > ttl {
|
||||
delete(goroutineCacheWithTTL, gid)
|
||||
expiredCount++
|
||||
}
|
||||
}
|
||||
|
||||
// If cache is still too large after TTL cleanup, remove oldest entries
|
||||
// Under high latency, use a more aggressive target size
|
||||
targetSize := maxCacheSize
|
||||
targetReduction := maxCacheSize / 2
|
||||
|
||||
if isHighLatency {
|
||||
// Under high latency, target a much smaller cache size
|
||||
targetSize = maxCacheSize / 4
|
||||
targetReduction = maxCacheSize / 8
|
||||
}
|
||||
|
||||
if len(goroutineCacheWithTTL) > targetSize {
|
||||
// Find oldest entries
|
||||
type ageEntry struct {
|
||||
gid int64
|
||||
lastAccess int64
|
||||
}
|
||||
oldestEntries := make([]ageEntry, 0, len(goroutineCacheWithTTL))
|
||||
for gid, entry := range goroutineCacheWithTTL {
|
||||
oldestEntries = append(oldestEntries, ageEntry{gid, entry.lastAccess})
|
||||
}
|
||||
|
||||
// Sort by lastAccess (oldest first)
|
||||
sort.Slice(oldestEntries, func(i, j int) bool {
|
||||
return oldestEntries[i].lastAccess < oldestEntries[j].lastAccess
|
||||
})
|
||||
|
||||
// Remove oldest entries to get down to target reduction size
|
||||
toRemove := len(goroutineCacheWithTTL) - targetReduction
|
||||
for i := 0; i < toRemove && i < len(oldestEntries); i++ {
|
||||
delete(goroutineCacheWithTTL, oldestEntries[i].gid)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -133,39 +351,29 @@ func NewAudioBufferPool(bufferSize int) *AudioBufferPool {
|
|||
}
|
||||
|
||||
func (p *AudioBufferPool) Get() []byte {
|
||||
// Trigger periodic cleanup of goroutine cache
|
||||
cleanupGoroutineCache()
|
||||
|
||||
start := time.Now()
|
||||
wasHit := false
|
||||
defer func() {
|
||||
latency := time.Since(start)
|
||||
// Record metrics for frame pool (assuming this is the main usage)
|
||||
if p.bufferSize >= GetConfig().AudioFramePoolSize {
|
||||
GetGranularMetricsCollector().RecordFramePoolGet(latency, wasHit)
|
||||
} else {
|
||||
GetGranularMetricsCollector().RecordControlPoolGet(latency, wasHit)
|
||||
}
|
||||
}()
|
||||
// Skip cleanup trigger in hotpath - cleanup runs in background
|
||||
// cleanupGoroutineCache() - moved to background goroutine
|
||||
|
||||
// Fast path: Try lock-free per-goroutine cache first
|
||||
gid := getGoroutineID()
|
||||
goroutineCacheMutex.RLock()
|
||||
cache, exists := goroutineBufferCache[gid]
|
||||
cacheEntry, exists := goroutineCacheWithTTL[gid]
|
||||
goroutineCacheMutex.RUnlock()
|
||||
|
||||
if exists && cache != nil {
|
||||
if exists && cacheEntry != nil && cacheEntry.cache != nil {
|
||||
// Try to get buffer from lock-free cache
|
||||
cache := cacheEntry.cache
|
||||
for i := 0; i < len(cache.buffers); i++ {
|
||||
bufPtr := (*unsafe.Pointer)(unsafe.Pointer(&cache.buffers[i]))
|
||||
buf := (*[]byte)(atomic.LoadPointer(bufPtr))
|
||||
if buf != nil && atomic.CompareAndSwapPointer(bufPtr, unsafe.Pointer(buf), nil) {
|
||||
atomic.AddInt64(&p.hitCount, 1)
|
||||
wasHit = true
|
||||
*buf = (*buf)[:0]
|
||||
return *buf
|
||||
}
|
||||
}
|
||||
// Update access time only after cache miss to reduce overhead
|
||||
cacheEntry.lastAccess = time.Now().Unix()
|
||||
}
|
||||
|
||||
// Fallback: Try pre-allocated pool with mutex
|
||||
|
|
@ -175,11 +383,7 @@ func (p *AudioBufferPool) Get() []byte {
|
|||
buf := p.preallocated[lastIdx]
|
||||
p.preallocated = p.preallocated[:lastIdx]
|
||||
p.mutex.Unlock()
|
||||
|
||||
// Update hit counter
|
||||
atomic.AddInt64(&p.hitCount, 1)
|
||||
wasHit = true
|
||||
// Ensure buffer is properly reset
|
||||
*buf = (*buf)[:0]
|
||||
return *buf
|
||||
}
|
||||
|
|
@ -188,18 +392,14 @@ func (p *AudioBufferPool) Get() []byte {
|
|||
// Try sync.Pool next
|
||||
if poolBuf := p.pool.Get(); poolBuf != nil {
|
||||
buf := poolBuf.(*[]byte)
|
||||
// Update hit counter
|
||||
atomic.AddInt64(&p.hitCount, 1)
|
||||
// Ensure buffer is properly reset and check capacity
|
||||
atomic.AddInt64(&p.currentSize, -1)
|
||||
// Fast capacity check - most buffers should be correct size
|
||||
if cap(*buf) >= p.bufferSize {
|
||||
wasHit = true
|
||||
*buf = (*buf)[:0]
|
||||
return *buf
|
||||
} else {
|
||||
// Buffer too small, allocate new one
|
||||
atomic.AddInt64(&p.missCount, 1)
|
||||
return make([]byte, 0, p.bufferSize)
|
||||
}
|
||||
// Buffer too small, fall through to allocation
|
||||
}
|
||||
|
||||
// Pool miss - allocate new buffer with exact capacity
|
||||
|
|
@ -208,18 +408,7 @@ func (p *AudioBufferPool) Get() []byte {
|
|||
}
|
||||
|
||||
func (p *AudioBufferPool) Put(buf []byte) {
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
latency := time.Since(start)
|
||||
// Record metrics for frame pool (assuming this is the main usage)
|
||||
if p.bufferSize >= GetConfig().AudioFramePoolSize {
|
||||
GetGranularMetricsCollector().RecordFramePoolPut(latency, cap(buf))
|
||||
} else {
|
||||
GetGranularMetricsCollector().RecordControlPoolPut(latency, cap(buf))
|
||||
}
|
||||
}()
|
||||
|
||||
// Validate buffer capacity - reject buffers that are too small or too large
|
||||
// Fast validation - reject buffers that are too small or too large
|
||||
bufCap := cap(buf)
|
||||
if bufCap < p.bufferSize || bufCap > p.bufferSize*2 {
|
||||
return // Buffer size mismatch, don't pool it to prevent memory bloat
|
||||
|
|
@ -231,14 +420,23 @@ func (p *AudioBufferPool) Put(buf []byte) {
|
|||
// Fast path: Try to put in lock-free per-goroutine cache
|
||||
gid := getGoroutineID()
|
||||
goroutineCacheMutex.RLock()
|
||||
cache, exists := goroutineBufferCache[gid]
|
||||
entryWithTTL, exists := goroutineCacheWithTTL[gid]
|
||||
goroutineCacheMutex.RUnlock()
|
||||
|
||||
if !exists {
|
||||
var cache *lockFreeBufferCache
|
||||
if exists && entryWithTTL != nil {
|
||||
cache = entryWithTTL.cache
|
||||
// Update access time only when we successfully use the cache
|
||||
} else {
|
||||
// Create new cache for this goroutine
|
||||
cache = &lockFreeBufferCache{}
|
||||
now := time.Now().Unix()
|
||||
goroutineCacheMutex.Lock()
|
||||
goroutineBufferCache[gid] = cache
|
||||
goroutineCacheWithTTL[gid] = &cacheEntry{
|
||||
cache: cache,
|
||||
lastAccess: now,
|
||||
gid: gid,
|
||||
}
|
||||
goroutineCacheMutex.Unlock()
|
||||
}
|
||||
|
||||
|
|
@ -247,6 +445,10 @@ func (p *AudioBufferPool) Put(buf []byte) {
|
|||
for i := 0; i < len(cache.buffers); i++ {
|
||||
bufPtr := (*unsafe.Pointer)(unsafe.Pointer(&cache.buffers[i]))
|
||||
if atomic.CompareAndSwapPointer(bufPtr, nil, unsafe.Pointer(&buf)) {
|
||||
// Update access time only on successful cache
|
||||
if exists && entryWithTTL != nil {
|
||||
entryWithTTL.lastAccess = time.Now().Unix()
|
||||
}
|
||||
return // Successfully cached
|
||||
}
|
||||
}
|
||||
|
|
@ -262,15 +464,12 @@ func (p *AudioBufferPool) Put(buf []byte) {
|
|||
p.mutex.Unlock()
|
||||
|
||||
// Check sync.Pool size limit to prevent excessive memory usage
|
||||
currentSize := atomic.LoadInt64(&p.currentSize)
|
||||
if currentSize >= int64(p.maxPoolSize) {
|
||||
if atomic.LoadInt64(&p.currentSize) >= int64(p.maxPoolSize) {
|
||||
return // Pool is full, let GC handle this buffer
|
||||
}
|
||||
|
||||
// Return to sync.Pool
|
||||
// Return to sync.Pool and update counter atomically
|
||||
p.pool.Put(&resetBuf)
|
||||
|
||||
// Update pool size counter atomically
|
||||
atomic.AddInt64(&p.currentSize, 1)
|
||||
}
|
||||
|
||||
|
|
@ -333,6 +532,8 @@ type AudioBufferPoolDetailedStats struct {
|
|||
HitCount int64
|
||||
MissCount int64
|
||||
HitRate float64 // Percentage
|
||||
TotalBytes int64 // Total memory usage in bytes
|
||||
AverageBufferSize float64 // Average size of buffers in the pool
|
||||
}
|
||||
|
||||
// GetAudioBufferPoolStats returns statistics about the audio buffer pools
|
||||
|
|
|
|||
|
|
@ -5,6 +5,9 @@ package audio
|
|||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
|
|
@ -673,23 +676,26 @@ func newAudioDecodeWriteError(cErrorCode int) error {
|
|||
}
|
||||
|
||||
func cgoAudioInit() error {
|
||||
// Update C constants from Go configuration
|
||||
config := GetConfig()
|
||||
// Get cached config and ensure it's updated
|
||||
cache := GetCachedConfig()
|
||||
cache.Update()
|
||||
|
||||
// Update C constants from cached config (atomic access, no locks)
|
||||
C.update_audio_constants(
|
||||
C.int(config.CGOOpusBitrate),
|
||||
C.int(config.CGOOpusComplexity),
|
||||
C.int(config.CGOOpusVBR),
|
||||
C.int(config.CGOOpusVBRConstraint),
|
||||
C.int(config.CGOOpusSignalType),
|
||||
C.int(config.CGOOpusBandwidth),
|
||||
C.int(config.CGOOpusDTX),
|
||||
C.int(config.CGOSampleRate),
|
||||
C.int(config.CGOChannels),
|
||||
C.int(config.CGOFrameSize),
|
||||
C.int(config.CGOMaxPacketSize),
|
||||
C.int(config.CGOUsleepMicroseconds),
|
||||
C.int(config.CGOMaxAttempts),
|
||||
C.int(config.CGOMaxBackoffMicroseconds),
|
||||
C.int(cache.opusBitrate.Load()),
|
||||
C.int(cache.opusComplexity.Load()),
|
||||
C.int(cache.opusVBR.Load()),
|
||||
C.int(cache.opusVBRConstraint.Load()),
|
||||
C.int(cache.opusSignalType.Load()),
|
||||
C.int(cache.opusBandwidth.Load()),
|
||||
C.int(cache.opusDTX.Load()),
|
||||
C.int(cache.sampleRate.Load()),
|
||||
C.int(cache.channels.Load()),
|
||||
C.int(cache.frameSize.Load()),
|
||||
C.int(cache.maxPacketSize.Load()),
|
||||
C.int(GetConfig().CGOUsleepMicroseconds),
|
||||
C.int(GetConfig().CGOMaxAttempts),
|
||||
C.int(GetConfig().CGOMaxBackoffMicroseconds),
|
||||
)
|
||||
|
||||
result := C.jetkvm_audio_init()
|
||||
|
|
@ -703,30 +709,223 @@ func cgoAudioClose() {
|
|||
C.jetkvm_audio_close()
|
||||
}
|
||||
|
||||
// AudioConfigCache keeps a snapshot of audio configuration values so that
// per-frame code can read them without calling GetConfig() each time.
//
// The atomic fields are written by Update and can be read without holding
// mutex. mutex protects lastUpdate while Update decides whether to refresh.
//
// NOTE(review): the exported Batch* fields are plain values that Update
// rewrites on every refresh; whether their readers synchronise with Update is
// not visible from this type and should be confirmed.
type AudioConfigCache struct {
	// Buffer limits and Opus/CGO parameters, readable lock-free.
	minReadEncodeBuffer  atomic.Int32
	maxDecodeWriteBuffer atomic.Int32
	maxPacketSize        atomic.Int32
	maxPCMBufferSize     atomic.Int32
	opusBitrate          atomic.Int32
	opusComplexity       atomic.Int32
	opusVBR              atomic.Int32
	opusVBRConstraint    atomic.Int32
	opusSignalType       atomic.Int32
	opusBandwidth        atomic.Int32
	opusDTX              atomic.Int32
	sampleRate           atomic.Int32
	channels             atomic.Int32
	frameSize            atomic.Int32

	// Limits consumed by the validation helpers.
	maxAudioFrameSize atomic.Int32
	maxChannels       atomic.Int32
	minFrameDuration  atomic.Int64 // nanoseconds
	maxFrameDuration  atomic.Int64 // nanoseconds
	minOpusBitrate    atomic.Int32
	maxOpusBitrate    atomic.Int32

	// Batch processing settings copied from the configuration.
	BatchProcessingTimeout       time.Duration
	BatchProcessorFramesPerBatch int
	BatchProcessorTimeout        time.Duration
	BatchProcessingDelay         time.Duration
	MinBatchSizeForThreadPinning int

	// Refresh bookkeeping: mutex serialises refreshes, lastUpdate is the time
	// of the most recent one, cacheExpiry is how long a snapshot stays valid,
	// and initialized reports whether a refresh has completed at least once.
	mutex       sync.RWMutex
	lastUpdate  time.Time
	cacheExpiry time.Duration
	initialized atomic.Bool

	// Errors built once per refresh so the hot path can return them without
	// allocating.
	bufferTooSmallReadEncode  error
	bufferTooLargeDecodeWrite error
}
|
||||
|
||||
// Global audio config cache instance
|
||||
var globalAudioConfigCache = &AudioConfigCache{
|
||||
cacheExpiry: 30 * time.Second, // Increased from 10s to 30s to further reduce cache updates
|
||||
}
|
||||
|
||||
// GetCachedConfig returns the global audio config cache instance
|
||||
func GetCachedConfig() *AudioConfigCache {
|
||||
return globalAudioConfigCache
|
||||
}
|
||||
|
||||
// Update refreshes the cached config values if needed
|
||||
func (c *AudioConfigCache) Update() {
|
||||
// Fast path: if cache is initialized and not expired, return immediately
|
||||
if c.initialized.Load() {
|
||||
c.mutex.RLock()
|
||||
cacheExpired := time.Since(c.lastUpdate) > c.cacheExpiry
|
||||
c.mutex.RUnlock()
|
||||
if !cacheExpired {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Slow path: update cache
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
|
||||
// Double-check after acquiring lock
|
||||
if !c.initialized.Load() || time.Since(c.lastUpdate) > c.cacheExpiry {
|
||||
config := GetConfig() // Call GetConfig() only once
|
||||
|
||||
// Update atomic values for lock-free access - CGO values
|
||||
c.minReadEncodeBuffer.Store(int32(config.MinReadEncodeBuffer))
|
||||
c.maxDecodeWriteBuffer.Store(int32(config.MaxDecodeWriteBuffer))
|
||||
c.maxPacketSize.Store(int32(config.CGOMaxPacketSize))
|
||||
c.maxPCMBufferSize.Store(int32(config.MaxPCMBufferSize))
|
||||
c.opusBitrate.Store(int32(config.CGOOpusBitrate))
|
||||
c.opusComplexity.Store(int32(config.CGOOpusComplexity))
|
||||
c.opusVBR.Store(int32(config.CGOOpusVBR))
|
||||
c.opusVBRConstraint.Store(int32(config.CGOOpusVBRConstraint))
|
||||
c.opusSignalType.Store(int32(config.CGOOpusSignalType))
|
||||
c.opusBandwidth.Store(int32(config.CGOOpusBandwidth))
|
||||
c.opusDTX.Store(int32(config.CGOOpusDTX))
|
||||
c.sampleRate.Store(int32(config.CGOSampleRate))
|
||||
c.channels.Store(int32(config.CGOChannels))
|
||||
c.frameSize.Store(int32(config.CGOFrameSize))
|
||||
|
||||
// Update additional validation values
|
||||
c.maxAudioFrameSize.Store(int32(config.MaxAudioFrameSize))
|
||||
c.maxChannels.Store(int32(config.MaxChannels))
|
||||
c.minFrameDuration.Store(int64(config.MinFrameDuration))
|
||||
c.maxFrameDuration.Store(int64(config.MaxFrameDuration))
|
||||
c.minOpusBitrate.Store(int32(config.MinOpusBitrate))
|
||||
c.maxOpusBitrate.Store(int32(config.MaxOpusBitrate))
|
||||
|
||||
// Update batch processing related values
|
||||
c.BatchProcessingTimeout = 100 * time.Millisecond // Fixed timeout for batch processing
|
||||
c.BatchProcessorFramesPerBatch = config.BatchProcessorFramesPerBatch
|
||||
c.BatchProcessorTimeout = config.BatchProcessorTimeout
|
||||
c.BatchProcessingDelay = config.BatchProcessingDelay
|
||||
c.MinBatchSizeForThreadPinning = config.MinBatchSizeForThreadPinning
|
||||
|
||||
// Pre-allocate common errors
|
||||
c.bufferTooSmallReadEncode = newBufferTooSmallError(0, config.MinReadEncodeBuffer)
|
||||
c.bufferTooLargeDecodeWrite = newBufferTooLargeError(config.MaxDecodeWriteBuffer+1, config.MaxDecodeWriteBuffer)
|
||||
|
||||
c.lastUpdate = time.Now()
|
||||
c.initialized.Store(true)
|
||||
|
||||
// Update the global validation cache as well
|
||||
if cachedMaxFrameSize != 0 {
|
||||
cachedMaxFrameSize = config.MaxAudioFrameSize
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetMinReadEncodeBuffer returns the cached MinReadEncodeBuffer value
|
||||
func (c *AudioConfigCache) GetMinReadEncodeBuffer() int {
|
||||
return int(c.minReadEncodeBuffer.Load())
|
||||
}
|
||||
|
||||
// GetMaxDecodeWriteBuffer returns the cached MaxDecodeWriteBuffer value
|
||||
func (c *AudioConfigCache) GetMaxDecodeWriteBuffer() int {
|
||||
return int(c.maxDecodeWriteBuffer.Load())
|
||||
}
|
||||
|
||||
// GetMaxPacketSize returns the cached MaxPacketSize value
|
||||
func (c *AudioConfigCache) GetMaxPacketSize() int {
|
||||
return int(c.maxPacketSize.Load())
|
||||
}
|
||||
|
||||
// GetMaxPCMBufferSize returns the cached MaxPCMBufferSize value
|
||||
func (c *AudioConfigCache) GetMaxPCMBufferSize() int {
|
||||
return int(c.maxPCMBufferSize.Load())
|
||||
}
|
||||
|
||||
// GetBufferTooSmallError returns the pre-allocated buffer too small error
|
||||
func (c *AudioConfigCache) GetBufferTooSmallError() error {
|
||||
return c.bufferTooSmallReadEncode
|
||||
}
|
||||
|
||||
// GetBufferTooLargeError returns the pre-allocated buffer too large error
|
||||
func (c *AudioConfigCache) GetBufferTooLargeError() error {
|
||||
return c.bufferTooLargeDecodeWrite
|
||||
}
|
||||
|
||||
// Removed duplicate config caching system - using AudioConfigCache instead
|
||||
|
||||
func cgoAudioReadEncode(buf []byte) (int, error) {
|
||||
minRequired := GetConfig().MinReadEncodeBuffer
|
||||
// Fast path: Use AudioConfigCache to avoid GetConfig() in hot path
|
||||
cache := GetCachedConfig()
|
||||
// Only update cache if expired - avoid unnecessary overhead
|
||||
// Use proper locking to avoid race condition
|
||||
if cache.initialized.Load() {
|
||||
cache.mutex.RLock()
|
||||
cacheExpired := time.Since(cache.lastUpdate) > cache.cacheExpiry
|
||||
cache.mutex.RUnlock()
|
||||
if cacheExpired {
|
||||
cache.Update()
|
||||
}
|
||||
} else {
|
||||
cache.Update()
|
||||
}
|
||||
|
||||
// Fast validation with cached values - avoid lock with atomic access
|
||||
minRequired := cache.GetMinReadEncodeBuffer()
|
||||
|
||||
// Buffer validation - use pre-allocated error for common case
|
||||
if len(buf) < minRequired {
|
||||
return 0, newBufferTooSmallError(len(buf), minRequired)
|
||||
// Use pre-allocated error for common case, only create custom error for edge cases
|
||||
if len(buf) > 0 {
|
||||
return 0, newBufferTooSmallError(len(buf), minRequired)
|
||||
}
|
||||
return 0, cache.GetBufferTooSmallError()
|
||||
}
|
||||
|
||||
// Skip initialization check for now to avoid CGO compilation issues
|
||||
// TODO: Add proper initialization state checking
|
||||
// Note: The C code already has comprehensive state tracking with capture_initialized,
|
||||
// capture_initializing, playback_initialized, and playback_initializing flags.
|
||||
// When CGO environment is properly configured, this should check C.capture_initialized.
|
||||
|
||||
// Direct CGO call with minimal overhead - unsafe.Pointer(&slice[0]) is safe for validated non-empty buffers
|
||||
n := C.jetkvm_audio_read_encode(unsafe.Pointer(&buf[0]))
|
||||
|
||||
// Fast path for success case
|
||||
if n > 0 {
|
||||
return int(n), nil
|
||||
}
|
||||
|
||||
// Handle error cases - use static error codes to reduce allocations
|
||||
if n < 0 {
|
||||
return 0, newAudioReadEncodeError(int(n))
|
||||
// Common error cases
|
||||
switch n {
|
||||
case -1:
|
||||
return 0, errAudioInitFailed
|
||||
case -2:
|
||||
return 0, errAudioReadEncode
|
||||
default:
|
||||
return 0, newAudioReadEncodeError(int(n))
|
||||
}
|
||||
}
|
||||
if n == 0 {
|
||||
return 0, nil // No data available
|
||||
}
|
||||
return int(n), nil
|
||||
|
||||
// n == 0 case
|
||||
return 0, nil // No data available
|
||||
}
|
||||
|
||||
// Audio playback functions
|
||||
func cgoAudioPlaybackInit() error {
|
||||
// Get cached config and ensure it's updated
|
||||
cache := GetCachedConfig()
|
||||
cache.Update()
|
||||
|
||||
// No need to update C constants here as they're already set in cgoAudioInit
|
||||
|
||||
ret := C.jetkvm_audio_playback_init()
|
||||
if ret != 0 {
|
||||
return newAudioPlaybackInitError(int(ret))
|
||||
|
|
@ -738,34 +937,58 @@ func cgoAudioPlaybackClose() {
|
|||
C.jetkvm_audio_playback_close()
|
||||
}
|
||||
|
||||
func cgoAudioDecodeWrite(buf []byte) (int, error) {
|
||||
func cgoAudioDecodeWrite(buf []byte) (n int, err error) {
|
||||
// Fast validation with AudioConfigCache
|
||||
cache := GetCachedConfig()
|
||||
// Only update cache if expired - avoid unnecessary overhead
|
||||
// Use proper locking to avoid race condition
|
||||
if cache.initialized.Load() {
|
||||
cache.mutex.RLock()
|
||||
cacheExpired := time.Since(cache.lastUpdate) > cache.cacheExpiry
|
||||
cache.mutex.RUnlock()
|
||||
if cacheExpired {
|
||||
cache.Update()
|
||||
}
|
||||
} else {
|
||||
cache.Update()
|
||||
}
|
||||
|
||||
// Optimized buffer validation
|
||||
if len(buf) == 0 {
|
||||
return 0, errEmptyBuffer
|
||||
}
|
||||
if buf == nil {
|
||||
return 0, errNilBuffer
|
||||
}
|
||||
maxAllowed := GetConfig().MaxDecodeWriteBuffer
|
||||
|
||||
// Use cached max buffer size with atomic access
|
||||
maxAllowed := cache.GetMaxDecodeWriteBuffer()
|
||||
if len(buf) > maxAllowed {
|
||||
// Use pre-allocated error for common case
|
||||
if len(buf) == maxAllowed+1 {
|
||||
return 0, cache.GetBufferTooLargeError()
|
||||
}
|
||||
return 0, newBufferTooLargeError(len(buf), maxAllowed)
|
||||
}
|
||||
|
||||
bufPtr := unsafe.Pointer(&buf[0])
|
||||
if bufPtr == nil {
|
||||
return 0, errInvalidBufferPtr
|
||||
// Direct CGO call with minimal overhead - unsafe.Pointer(&slice[0]) is safe for validated non-empty buffers
|
||||
n = int(C.jetkvm_audio_decode_write(unsafe.Pointer(&buf[0]), C.int(len(buf))))
|
||||
|
||||
// Fast path for success case
|
||||
if n >= 0 {
|
||||
return n, nil
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
_ = r
|
||||
}
|
||||
}()
|
||||
|
||||
n := C.jetkvm_audio_decode_write(bufPtr, C.int(len(buf)))
|
||||
if n < 0 {
|
||||
return 0, newAudioDecodeWriteError(int(n))
|
||||
// Handle error cases with static error codes
|
||||
switch n {
|
||||
case -1:
|
||||
n = 0
|
||||
err = errAudioInitFailed
|
||||
case -2:
|
||||
n = 0
|
||||
err = errAudioDecodeWrite
|
||||
default:
|
||||
n = 0
|
||||
err = newAudioDecodeWriteError(n)
|
||||
}
|
||||
return int(n), nil
|
||||
return
|
||||
}
|
||||
|
||||
// updateOpusEncoderParams dynamically updates OPUS encoder parameters
|
||||
|
|
@ -785,13 +1008,406 @@ func updateOpusEncoderParams(bitrate, complexity, vbr, vbrConstraint, signalType
|
|||
return nil
|
||||
}
|
||||
|
||||
// CGO function aliases
|
||||
// Buffer pool for reusing buffers in CGO functions
|
||||
var (
|
||||
CGOAudioInit = cgoAudioInit
|
||||
CGOAudioClose = cgoAudioClose
|
||||
CGOAudioReadEncode = cgoAudioReadEncode
|
||||
CGOAudioPlaybackInit = cgoAudioPlaybackInit
|
||||
CGOAudioPlaybackClose = cgoAudioPlaybackClose
|
||||
CGOAudioDecodeWrite = cgoAudioDecodeWrite
|
||||
CGOUpdateOpusEncoderParams = updateOpusEncoderParams
|
||||
// Using SizedBufferPool for better memory management
|
||||
// Track buffer pool usage for monitoring
|
||||
cgoBufferPoolGets atomic.Int64
|
||||
cgoBufferPoolPuts atomic.Int64
|
||||
// Batch processing statistics - only enabled in debug builds
|
||||
batchProcessingCount atomic.Int64
|
||||
batchFrameCount atomic.Int64
|
||||
batchProcessingTime atomic.Int64
|
||||
// Flag to control time tracking overhead
|
||||
enableBatchTimeTracking atomic.Bool
|
||||
)
|
||||
|
||||
// GetBufferFromPool gets a buffer from the pool with at least the specified capacity
|
||||
func GetBufferFromPool(minCapacity int) []byte {
|
||||
cgoBufferPoolGets.Add(1)
|
||||
// Use the SizedBufferPool for better memory management
|
||||
return GetOptimalBuffer(minCapacity)
|
||||
}
|
||||
|
||||
// ReturnBufferToPool returns a buffer to the pool
|
||||
func ReturnBufferToPool(buf []byte) {
|
||||
cgoBufferPoolPuts.Add(1)
|
||||
// Use the SizedBufferPool for better memory management
|
||||
ReturnOptimalBuffer(buf)
|
||||
}
|
||||
|
||||
// Note: AudioFrameBatch is now defined in batch_audio.go
|
||||
// This is kept here for reference but commented out to avoid conflicts
|
||||
/*
|
||||
// AudioFrameBatch represents a batch of audio frames for processing
|
||||
type AudioFrameBatch struct {
|
||||
// Buffer for batch processing
|
||||
buffer []byte
|
||||
// Number of frames in the batch
|
||||
frameCount int
|
||||
// Size of each frame
|
||||
frameSize int
|
||||
// Current position in the buffer
|
||||
position int
|
||||
}
|
||||
|
||||
// NewAudioFrameBatch creates a new audio frame batch with the specified capacity
|
||||
func NewAudioFrameBatch(maxFrames int) *AudioFrameBatch {
|
||||
// Get cached config
|
||||
cache := GetCachedConfig()
|
||||
cache.Update()
|
||||
|
||||
// Calculate frame size based on cached config
|
||||
frameSize := cache.GetMinReadEncodeBuffer()
|
||||
|
||||
// Create batch with buffer sized for maxFrames
|
||||
return &AudioFrameBatch{
|
||||
buffer: GetBufferFromPool(maxFrames * frameSize),
|
||||
frameCount: 0,
|
||||
frameSize: frameSize,
|
||||
position: 0,
|
||||
}
|
||||
}
|
||||
|
||||
// AddFrame adds a frame to the batch
|
||||
// Returns true if the batch is full after adding this frame
|
||||
func (b *AudioFrameBatch) AddFrame(frame []byte) bool {
|
||||
// Calculate position in buffer for this frame
|
||||
pos := b.position
|
||||
|
||||
// Copy frame data to batch buffer
|
||||
copy(b.buffer[pos:pos+len(frame)], frame)
|
||||
|
||||
// Update position and frame count
|
||||
b.position += len(frame)
|
||||
b.frameCount++
|
||||
|
||||
// Check if batch is full (buffer capacity reached)
|
||||
return b.position >= len(b.buffer)
|
||||
}
|
||||
|
||||
// Reset resets the batch for reuse
|
||||
func (b *AudioFrameBatch) Reset() {
|
||||
b.frameCount = 0
|
||||
b.position = 0
|
||||
}
|
||||
|
||||
// Release returns the batch buffer to the pool
|
||||
func (b *AudioFrameBatch) Release() {
|
||||
ReturnBufferToPool(b.buffer)
|
||||
b.buffer = nil
|
||||
b.frameCount = 0
|
||||
b.frameSize = 0
|
||||
b.position = 0
|
||||
}
|
||||
*/
|
||||
|
||||
// ReadEncodeWithPooledBuffer reads audio data and encodes it using a buffer from the pool
|
||||
// This reduces memory allocations by reusing buffers
|
||||
func ReadEncodeWithPooledBuffer() ([]byte, int, error) {
|
||||
// Get cached config
|
||||
cache := GetCachedConfig()
|
||||
// Only update cache if expired - avoid unnecessary overhead
|
||||
// Use proper locking to avoid race condition
|
||||
if cache.initialized.Load() {
|
||||
cache.mutex.RLock()
|
||||
cacheExpired := time.Since(cache.lastUpdate) > cache.cacheExpiry
|
||||
cache.mutex.RUnlock()
|
||||
if cacheExpired {
|
||||
cache.Update()
|
||||
}
|
||||
} else {
|
||||
cache.Update()
|
||||
}
|
||||
|
||||
// Get a buffer from the pool with appropriate capacity
|
||||
bufferSize := cache.GetMinReadEncodeBuffer()
|
||||
if bufferSize == 0 {
|
||||
bufferSize = 1500 // Fallback if cache not initialized
|
||||
}
|
||||
|
||||
// Get buffer from pool
|
||||
buf := GetBufferFromPool(bufferSize)
|
||||
|
||||
// Perform read/encode operation
|
||||
n, err := cgoAudioReadEncode(buf)
|
||||
if err != nil {
|
||||
// Return buffer to pool on error
|
||||
ReturnBufferToPool(buf)
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
// Resize buffer to actual data size
|
||||
result := buf[:n]
|
||||
|
||||
// Return the buffer with data
|
||||
return result, n, nil
|
||||
}
|
||||
|
||||
// DecodeWriteWithPooledBuffer decodes and writes audio data using a pooled buffer
|
||||
// The caller is responsible for returning the input buffer to the pool if needed
|
||||
func DecodeWriteWithPooledBuffer(data []byte) (int, error) {
|
||||
// Validate input
|
||||
if len(data) == 0 {
|
||||
return 0, errEmptyBuffer
|
||||
}
|
||||
|
||||
// Get cached config
|
||||
cache := GetCachedConfig()
|
||||
// Only update cache if expired - avoid unnecessary overhead
|
||||
// Use proper locking to avoid race condition
|
||||
if cache.initialized.Load() {
|
||||
cache.mutex.RLock()
|
||||
cacheExpired := time.Since(cache.lastUpdate) > cache.cacheExpiry
|
||||
cache.mutex.RUnlock()
|
||||
if cacheExpired {
|
||||
cache.Update()
|
||||
}
|
||||
} else {
|
||||
cache.Update()
|
||||
}
|
||||
|
||||
// Ensure data doesn't exceed max packet size
|
||||
maxPacketSize := cache.GetMaxPacketSize()
|
||||
if len(data) > maxPacketSize {
|
||||
return 0, newBufferTooLargeError(len(data), maxPacketSize)
|
||||
}
|
||||
|
||||
// Get a PCM buffer from the pool for optimized decode-write
|
||||
pcmBuffer := GetBufferFromPool(cache.GetMaxPCMBufferSize())
|
||||
defer ReturnBufferToPool(pcmBuffer)
|
||||
|
||||
// Perform decode/write operation using optimized implementation
|
||||
n, err := CGOAudioDecodeWrite(data, pcmBuffer)
|
||||
|
||||
// Return result
|
||||
return n, err
|
||||
}
|
||||
|
||||
// BatchReadEncode reads and encodes multiple audio frames in a single batch
|
||||
// This reduces CGO call overhead by processing multiple frames at once
|
||||
func BatchReadEncode(batchSize int) ([][]byte, error) {
|
||||
// Get cached config
|
||||
cache := GetCachedConfig()
|
||||
// Only update cache if expired - avoid unnecessary overhead
|
||||
// Use proper locking to avoid race condition
|
||||
if cache.initialized.Load() {
|
||||
cache.mutex.RLock()
|
||||
cacheExpired := time.Since(cache.lastUpdate) > cache.cacheExpiry
|
||||
cache.mutex.RUnlock()
|
||||
if cacheExpired {
|
||||
cache.Update()
|
||||
}
|
||||
} else {
|
||||
cache.Update()
|
||||
}
|
||||
|
||||
// Calculate total buffer size needed for batch
|
||||
frameSize := cache.GetMinReadEncodeBuffer()
|
||||
totalSize := frameSize * batchSize
|
||||
|
||||
// Get a single large buffer for all frames
|
||||
batchBuffer := GetBufferFromPool(totalSize)
|
||||
defer ReturnBufferToPool(batchBuffer)
|
||||
|
||||
// Pre-allocate frame result buffers from pool to avoid allocations in loop
|
||||
frameBuffers := make([][]byte, 0, batchSize)
|
||||
for i := 0; i < batchSize; i++ {
|
||||
frameBuffers = append(frameBuffers, GetBufferFromPool(frameSize))
|
||||
}
|
||||
defer func() {
|
||||
// Return all frame buffers to pool
|
||||
for _, buf := range frameBuffers {
|
||||
ReturnBufferToPool(buf)
|
||||
}
|
||||
}()
|
||||
|
||||
// Track batch processing statistics - only if enabled
|
||||
var startTime time.Time
|
||||
trackTime := enableBatchTimeTracking.Load()
|
||||
if trackTime {
|
||||
startTime = time.Now()
|
||||
}
|
||||
batchProcessingCount.Add(1)
|
||||
|
||||
// Process frames in batch
|
||||
frames := make([][]byte, 0, batchSize)
|
||||
for i := 0; i < batchSize; i++ {
|
||||
// Calculate offset for this frame in the batch buffer
|
||||
offset := i * frameSize
|
||||
frameBuf := batchBuffer[offset : offset+frameSize]
|
||||
|
||||
// Process this frame
|
||||
n, err := cgoAudioReadEncode(frameBuf)
|
||||
if err != nil {
|
||||
// Return partial batch on error
|
||||
if i > 0 {
|
||||
batchFrameCount.Add(int64(i))
|
||||
if trackTime {
|
||||
batchProcessingTime.Add(time.Since(startTime).Microseconds())
|
||||
}
|
||||
return frames, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Reuse pre-allocated buffer instead of make([]byte, n)
|
||||
frameCopy := frameBuffers[i][:n] // Slice to actual size
|
||||
copy(frameCopy, frameBuf[:n])
|
||||
frames = append(frames, frameCopy)
|
||||
}
|
||||
|
||||
// Update statistics
|
||||
batchFrameCount.Add(int64(len(frames)))
|
||||
if trackTime {
|
||||
batchProcessingTime.Add(time.Since(startTime).Microseconds())
|
||||
}
|
||||
|
||||
return frames, nil
|
||||
}
|
||||
|
||||
// BatchDecodeWrite decodes and writes multiple audio frames in a single batch
|
||||
// This reduces CGO call overhead by processing multiple frames at once
|
||||
func BatchDecodeWrite(frames [][]byte) error {
|
||||
// Validate input
|
||||
if len(frames) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get cached config
|
||||
cache := GetCachedConfig()
|
||||
// Only update cache if expired - avoid unnecessary overhead
|
||||
// Use proper locking to avoid race condition
|
||||
if cache.initialized.Load() {
|
||||
cache.mutex.RLock()
|
||||
cacheExpired := time.Since(cache.lastUpdate) > cache.cacheExpiry
|
||||
cache.mutex.RUnlock()
|
||||
if cacheExpired {
|
||||
cache.Update()
|
||||
}
|
||||
} else {
|
||||
cache.Update()
|
||||
}
|
||||
|
||||
// Track batch processing statistics - only if enabled
|
||||
var startTime time.Time
|
||||
trackTime := enableBatchTimeTracking.Load()
|
||||
if trackTime {
|
||||
startTime = time.Now()
|
||||
}
|
||||
batchProcessingCount.Add(1)
|
||||
|
||||
// Get a PCM buffer from the pool for optimized decode-write
|
||||
pcmBuffer := GetBufferFromPool(cache.GetMaxPCMBufferSize())
|
||||
defer ReturnBufferToPool(pcmBuffer)
|
||||
|
||||
// Process each frame
|
||||
frameCount := 0
|
||||
for _, frame := range frames {
|
||||
// Skip empty frames
|
||||
if len(frame) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// Process this frame using optimized implementation
|
||||
_, err := CGOAudioDecodeWrite(frame, pcmBuffer)
|
||||
if err != nil {
|
||||
// Update statistics before returning error
|
||||
batchFrameCount.Add(int64(frameCount))
|
||||
if trackTime {
|
||||
batchProcessingTime.Add(time.Since(startTime).Microseconds())
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
frameCount++
|
||||
}
|
||||
|
||||
// Update statistics
|
||||
batchFrameCount.Add(int64(frameCount))
|
||||
if trackTime {
|
||||
batchProcessingTime.Add(time.Since(startTime).Microseconds())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetBatchProcessingStats returns statistics about batch processing
|
||||
func GetBatchProcessingStats() (count, frames, avgTimeUs int64) {
|
||||
count = batchProcessingCount.Load()
|
||||
frames = batchFrameCount.Load()
|
||||
totalTime := batchProcessingTime.Load()
|
||||
|
||||
// Calculate average time per batch
|
||||
if count > 0 {
|
||||
avgTimeUs = totalTime / count
|
||||
}
|
||||
|
||||
return count, frames, avgTimeUs
|
||||
}
|
||||
|
||||
// cgoAudioDecodeWriteWithBuffers decodes opus data and writes to PCM buffer
|
||||
// This implementation uses separate buffers for opus data and PCM output
|
||||
func cgoAudioDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, error) {
|
||||
// Validate input
|
||||
if len(opusData) == 0 {
|
||||
return 0, errEmptyBuffer
|
||||
}
|
||||
if len(pcmBuffer) == 0 {
|
||||
return 0, errEmptyBuffer
|
||||
}
|
||||
|
||||
// Get cached config
|
||||
cache := GetCachedConfig()
|
||||
// Only update cache if expired - avoid unnecessary overhead
|
||||
// Use proper locking to avoid race condition
|
||||
if cache.initialized.Load() {
|
||||
cache.mutex.RLock()
|
||||
cacheExpired := time.Since(cache.lastUpdate) > cache.cacheExpiry
|
||||
cache.mutex.RUnlock()
|
||||
if cacheExpired {
|
||||
cache.Update()
|
||||
}
|
||||
} else {
|
||||
cache.Update()
|
||||
}
|
||||
|
||||
// Ensure data doesn't exceed max packet size
|
||||
maxPacketSize := cache.GetMaxPacketSize()
|
||||
if len(opusData) > maxPacketSize {
|
||||
return 0, newBufferTooLargeError(len(opusData), maxPacketSize)
|
||||
}
|
||||
|
||||
// Direct CGO call with minimal overhead - unsafe.Pointer(&slice[0]) is never nil for non-empty slices
|
||||
n := int(C.jetkvm_audio_decode_write(unsafe.Pointer(&opusData[0]), C.int(len(opusData))))
|
||||
|
||||
// Fast path for success case
|
||||
if n >= 0 {
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// Handle error cases with static error codes to reduce allocations
|
||||
switch n {
|
||||
case -1:
|
||||
return 0, errAudioInitFailed
|
||||
case -2:
|
||||
return 0, errAudioDecodeWrite
|
||||
default:
|
||||
return 0, newAudioDecodeWriteError(n)
|
||||
}
|
||||
}
|
||||
|
||||
// Optimized CGO function aliases - use direct function calls to reduce overhead
|
||||
// These are now direct function aliases instead of variable assignments
|
||||
func CGOAudioInit() error { return cgoAudioInit() }
|
||||
func CGOAudioClose() { cgoAudioClose() }
|
||||
func CGOAudioReadEncode(buf []byte) (int, error) { return cgoAudioReadEncode(buf) }
|
||||
func CGOAudioPlaybackInit() error { return cgoAudioPlaybackInit() }
|
||||
func CGOAudioPlaybackClose() { cgoAudioPlaybackClose() }
|
||||
func CGOAudioDecodeWriteLegacy(buf []byte) (int, error) { return cgoAudioDecodeWrite(buf) }
|
||||
func CGOAudioDecodeWrite(opusData []byte, pcmBuffer []byte) (int, error) {
|
||||
return cgoAudioDecodeWriteWithBuffers(opusData, pcmBuffer)
|
||||
}
|
||||
func CGOUpdateOpusEncoderParams(bitrate, complexity, vbr, vbrConstraint, signalType, bandwidth, dtx int) error {
|
||||
return updateOpusEncoderParams(bitrate, complexity, vbr, vbrConstraint, signalType, bandwidth, dtx)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,6 +30,11 @@ func cgoAudioDecodeWrite(buf []byte) (int, error) {
|
|||
return 0, errors.New("audio not available in lint mode")
|
||||
}
|
||||
|
||||
// cgoAudioDecodeWriteWithBuffers is the stub counterpart of the two-buffer
// decode-write function. It ignores both arguments, performs no decoding, and
// always returns 0 together with an "audio not available" error.
func cgoAudioDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, error) {
	unavailable := errors.New("audio not available in lint mode")
	return 0, unavailable
}
|
||||
|
||||
// Uppercase aliases for external API compatibility
|
||||
|
||||
var (
|
||||
|
|
@ -38,5 +43,6 @@ var (
|
|||
CGOAudioReadEncode = cgoAudioReadEncode
|
||||
CGOAudioPlaybackInit = cgoAudioPlaybackInit
|
||||
CGOAudioPlaybackClose = cgoAudioPlaybackClose
|
||||
CGOAudioDecodeWrite = cgoAudioDecodeWrite
|
||||
CGOAudioDecodeWriteLegacy = cgoAudioDecodeWrite
|
||||
CGOAudioDecodeWrite = cgoAudioDecodeWriteWithBuffers
|
||||
)
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import (
|
|||
type AudioConfigConstants struct {
|
||||
// Audio Quality Presets
|
||||
MaxAudioFrameSize int // Maximum audio frame size in bytes (default: 4096)
|
||||
MaxPCMBufferSize int // Maximum PCM buffer size in bytes for separate buffer optimization
|
||||
|
||||
// Opus Encoding Parameters
|
||||
OpusBitrate int // Target bitrate for Opus encoding in bps (default: 128000)
|
||||
|
|
@ -379,6 +380,18 @@ type AudioConfigConstants struct {
|
|||
// Default 4096 bytes handles maximum audio frame size with safety margin.
|
||||
MaxDecodeWriteBuffer int
|
||||
|
||||
// MinBatchSizeForThreadPinning defines the minimum batch size required to pin a thread.
|
||||
// Used in: batch_audio.go for deciding when to pin a thread for batch processing.
|
||||
// Impact: Smaller values increase thread pinning frequency but may improve performance.
|
||||
// Default 5 frames provides a good balance between performance and thread contention.
|
||||
MinBatchSizeForThreadPinning int
|
||||
|
||||
// GoroutineMonitorInterval defines the interval for monitoring goroutine counts.
|
||||
// Used in: goroutine_monitor.go for periodic goroutine count checks.
|
||||
// Impact: Shorter intervals provide more frequent monitoring but increase overhead.
|
||||
// Default 30 seconds provides reasonable monitoring frequency with minimal overhead.
|
||||
GoroutineMonitorInterval time.Duration
|
||||
|
||||
// IPC Configuration - Inter-Process Communication settings for audio components
|
||||
// Used in: ipc.go for configuring audio process communication
|
||||
// Impact: Controls IPC reliability, performance, and protocol compliance
|
||||
|
|
@ -1531,6 +1544,40 @@ type AudioConfigConstants struct {
|
|||
LatencyBucket500ms time.Duration // 500ms latency bucket
|
||||
LatencyBucket1s time.Duration // 1s latency bucket
|
||||
LatencyBucket2s time.Duration // 2s latency bucket
|
||||
|
||||
// Goroutine Pool Configuration
|
||||
// Used in: goroutine_pool.go for managing reusable goroutines
|
||||
// Impact: Reduces goroutine creation overhead and improves performance
|
||||
|
||||
// MaxAudioProcessorWorkers defines maximum number of workers in the audio processor pool.
|
||||
// Used in: goroutine_pool.go for limiting concurrent audio processing goroutines
|
||||
// Impact: Controls resource usage while ensuring sufficient processing capacity.
|
||||
// Default 16 provides good parallelism without excessive resource consumption.
|
||||
MaxAudioProcessorWorkers int
|
||||
|
||||
// MaxAudioReaderWorkers defines maximum number of workers in the audio reader pool.
|
||||
// Used in: goroutine_pool.go for limiting concurrent audio reading goroutines
|
||||
// Impact: Controls resource usage while ensuring sufficient reading capacity.
|
||||
// Default 8 provides good parallelism for I/O operations.
|
||||
MaxAudioReaderWorkers int
|
||||
|
||||
// AudioProcessorQueueSize defines the task queue size for the audio processor pool.
|
||||
// Used in: goroutine_pool.go for buffering audio processing tasks
|
||||
// Impact: Larger queue allows more tasks to be buffered during load spikes.
|
||||
// Default 64 provides good buffering without excessive memory usage.
|
||||
AudioProcessorQueueSize int
|
||||
|
||||
// AudioReaderQueueSize defines the task queue size for the audio reader pool.
|
||||
// Used in: goroutine_pool.go for buffering audio reading tasks
|
||||
// Impact: Larger queue allows more tasks to be buffered during load spikes.
|
||||
// Default 32 provides good buffering for I/O operations.
|
||||
AudioReaderQueueSize int
|
||||
|
||||
// WorkerMaxIdleTime defines how long a worker goroutine can remain idle before termination.
|
||||
// Used in: goroutine_pool.go for efficient worker lifecycle management
|
||||
// Impact: Shorter times reduce resource usage, longer times improve responsiveness.
|
||||
// Default 60s balances resource usage with startup latency.
|
||||
WorkerMaxIdleTime time.Duration
|
||||
}
|
||||
|
||||
// DefaultAudioConfig returns the default configuration constants
|
||||
|
|
@ -1540,6 +1587,7 @@ func DefaultAudioConfig() *AudioConfigConstants {
|
|||
return &AudioConfigConstants{
|
||||
// Audio Quality Presets
|
||||
MaxAudioFrameSize: 4096,
|
||||
MaxPCMBufferSize: 8192, // Default PCM buffer size (2x MaxAudioFrameSize for safety)
|
||||
|
||||
// Opus Encoding Parameters
|
||||
OpusBitrate: 128000,
|
||||
|
|
@ -2388,6 +2436,13 @@ func DefaultAudioConfig() *AudioConfigConstants {
|
|||
EventTimeFormatString: "2006-01-02T15:04:05.000Z", // "2006-01-02T15:04:05.000Z" time format
|
||||
EventSubscriptionDelayMS: 100, // 100ms subscription delay
|
||||
|
||||
// Goroutine Pool Configuration
|
||||
MaxAudioProcessorWorkers: 16, // 16 workers for audio processing tasks
|
||||
MaxAudioReaderWorkers: 8, // 8 workers for audio reading tasks
|
||||
AudioProcessorQueueSize: 64, // 64 tasks queue size for processor pool
|
||||
AudioReaderQueueSize: 32, // 32 tasks queue size for reader pool
|
||||
WorkerMaxIdleTime: 60 * time.Second, // 60s maximum idle time before worker termination
|
||||
|
||||
// Input Processing Constants
|
||||
InputProcessingTimeoutMS: 10, // 10ms processing timeout threshold
|
||||
|
||||
|
|
@ -2464,6 +2519,12 @@ func DefaultAudioConfig() *AudioConfigConstants {
|
|||
LatencyBucket500ms: 500 * time.Millisecond, // 500ms latency bucket
|
||||
LatencyBucket1s: 1 * time.Second, // 1s latency bucket
|
||||
LatencyBucket2s: 2 * time.Second, // 2s latency bucket
|
||||
|
||||
// Batch Audio Processing Configuration
|
||||
MinBatchSizeForThreadPinning: 5, // Minimum batch size to pin thread
|
||||
|
||||
// Goroutine Monitoring Configuration
|
||||
GoroutineMonitorInterval: 30 * time.Second, // 30s monitoring interval
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,145 @@
|
|||
package audio
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/jetkvm/kvm/internal/logging"
|
||||
)
|
||||
|
||||
// GoroutineMonitor samples the process-wide goroutine count at a fixed
// interval, remembers the baseline, peak, and most recent sample, and reacts
// when the count grows well beyond the baseline.
type GoroutineMonitor struct {
	// baselineCount is the count captured at construction; peakCount is the
	// highest sample seen so far; lastCount is the most recent sample.
	baselineCount, peakCount, lastCount int

	// monitorInterval is the pause between two samples.
	monitorInterval time.Duration

	// lastCheck is when the most recent sample finished.
	lastCheck time.Time

	// enabled is 1 while the monitor loop should keep running and 0
	// otherwise; it is read and written with sync/atomic.
	enabled int32
}

// globalGoroutineMonitor is the package-wide monitor instance; it stays nil
// until it is first requested.
var globalGoroutineMonitor *GoroutineMonitor
|
||||
|
||||
// NewGoroutineMonitor creates a new goroutine monitor
|
||||
func NewGoroutineMonitor(monitorInterval time.Duration) *GoroutineMonitor {
|
||||
if monitorInterval <= 0 {
|
||||
monitorInterval = 30 * time.Second
|
||||
}
|
||||
|
||||
// Get current goroutine count as baseline
|
||||
baselineCount := runtime.NumGoroutine()
|
||||
|
||||
return &GoroutineMonitor{
|
||||
baselineCount: baselineCount,
|
||||
peakCount: baselineCount,
|
||||
lastCount: baselineCount,
|
||||
monitorInterval: monitorInterval,
|
||||
lastCheck: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
// Start begins goroutine monitoring
|
||||
func (gm *GoroutineMonitor) Start() {
|
||||
if !atomic.CompareAndSwapInt32(&gm.enabled, 0, 1) {
|
||||
return // Already running
|
||||
}
|
||||
|
||||
go gm.monitorLoop()
|
||||
}
|
||||
|
||||
// Stop stops goroutine monitoring
|
||||
func (gm *GoroutineMonitor) Stop() {
|
||||
atomic.StoreInt32(&gm.enabled, 0)
|
||||
}
|
||||
|
||||
// monitorLoop periodically checks goroutine count
|
||||
func (gm *GoroutineMonitor) monitorLoop() {
|
||||
logger := logging.GetDefaultLogger().With().Str("component", "goroutine-monitor").Logger()
|
||||
logger.Info().Int("baseline", gm.baselineCount).Msg("goroutine monitor started")
|
||||
|
||||
for atomic.LoadInt32(&gm.enabled) == 1 {
|
||||
time.Sleep(gm.monitorInterval)
|
||||
gm.checkGoroutineCount()
|
||||
}
|
||||
|
||||
logger.Info().Msg("goroutine monitor stopped")
|
||||
}
|
||||
|
||||
// checkGoroutineCount checks current goroutine count and logs if it exceeds thresholds
|
||||
func (gm *GoroutineMonitor) checkGoroutineCount() {
|
||||
currentCount := runtime.NumGoroutine()
|
||||
gm.lastCount = currentCount
|
||||
|
||||
// Update peak count if needed
|
||||
if currentCount > gm.peakCount {
|
||||
gm.peakCount = currentCount
|
||||
}
|
||||
|
||||
// Calculate growth since baseline
|
||||
growth := currentCount - gm.baselineCount
|
||||
growthPercent := float64(growth) / float64(gm.baselineCount) * 100
|
||||
|
||||
// Log warning if growth exceeds thresholds
|
||||
logger := logging.GetDefaultLogger().With().Str("component", "goroutine-monitor").Logger()
|
||||
|
||||
// Different log levels based on growth severity
|
||||
if growthPercent > 30 {
|
||||
// Severe growth - trigger cleanup
|
||||
logger.Warn().Int("current", currentCount).Int("baseline", gm.baselineCount).
|
||||
Int("growth", growth).Float64("growth_percent", growthPercent).
|
||||
Msg("excessive goroutine growth detected - triggering cleanup")
|
||||
|
||||
// Force garbage collection to clean up unused resources
|
||||
runtime.GC()
|
||||
|
||||
// Force cleanup of goroutine buffer cache
|
||||
cleanupGoroutineCache()
|
||||
} else if growthPercent > 20 {
|
||||
// Moderate growth - just log warning
|
||||
logger.Warn().Int("current", currentCount).Int("baseline", gm.baselineCount).
|
||||
Int("growth", growth).Float64("growth_percent", growthPercent).
|
||||
Msg("significant goroutine growth detected")
|
||||
} else if growthPercent > 10 {
|
||||
// Minor growth - log info
|
||||
logger.Info().Int("current", currentCount).Int("baseline", gm.baselineCount).
|
||||
Int("growth", growth).Float64("growth_percent", growthPercent).
|
||||
Msg("goroutine growth detected")
|
||||
}
|
||||
|
||||
// Update last check time
|
||||
gm.lastCheck = time.Now()
|
||||
}
|
||||
|
||||
// GetGoroutineStats returns current goroutine statistics
|
||||
func (gm *GoroutineMonitor) GetGoroutineStats() map[string]interface{} {
|
||||
return map[string]interface{}{
|
||||
"current_count": gm.lastCount,
|
||||
"baseline_count": gm.baselineCount,
|
||||
"peak_count": gm.peakCount,
|
||||
"growth": gm.lastCount - gm.baselineCount,
|
||||
"growth_percent": float64(gm.lastCount-gm.baselineCount) / float64(gm.baselineCount) * 100,
|
||||
"last_check": gm.lastCheck,
|
||||
}
|
||||
}
|
||||
|
||||
// GetGoroutineMonitor returns the global goroutine monitor instance
|
||||
func GetGoroutineMonitor() *GoroutineMonitor {
|
||||
if globalGoroutineMonitor == nil {
|
||||
globalGoroutineMonitor = NewGoroutineMonitor(GetConfig().GoroutineMonitorInterval)
|
||||
}
|
||||
return globalGoroutineMonitor
|
||||
}
|
||||
|
||||
// StartGoroutineMonitoring starts the global goroutine monitor
|
||||
func StartGoroutineMonitoring() {
|
||||
monitor := GetGoroutineMonitor()
|
||||
monitor.Start()
|
||||
}
|
||||
|
||||
// StopGoroutineMonitoring stops the global goroutine monitor
|
||||
func StopGoroutineMonitoring() {
|
||||
if globalGoroutineMonitor != nil {
|
||||
globalGoroutineMonitor.Stop()
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,283 @@
|
|||
package audio
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/jetkvm/kvm/internal/logging"
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
// Task represents a function to be executed by a worker in the pool
|
||||
type Task func()
|
||||
|
||||
// GoroutinePool manages a pool of reusable goroutines to reduce the overhead
|
||||
// of goroutine creation and destruction
|
||||
type GoroutinePool struct {
|
||||
// Atomic fields must be first for proper alignment on 32-bit systems
|
||||
taskCount int64 // Number of tasks processed
|
||||
workerCount int64 // Current number of workers
|
||||
maxIdleTime time.Duration
|
||||
maxWorkers int
|
||||
taskQueue chan Task
|
||||
workerSem chan struct{} // Semaphore to limit concurrent workers
|
||||
shutdown chan struct{}
|
||||
shutdownOnce sync.Once
|
||||
wg sync.WaitGroup
|
||||
logger *zerolog.Logger
|
||||
name string
|
||||
}
|
||||
|
||||
// NewGoroutinePool creates a new goroutine pool with the specified parameters
|
||||
func NewGoroutinePool(name string, maxWorkers int, queueSize int, maxIdleTime time.Duration) *GoroutinePool {
|
||||
logger := logging.GetDefaultLogger().With().Str("component", "goroutine-pool").Str("pool", name).Logger()
|
||||
|
||||
pool := &GoroutinePool{
|
||||
maxWorkers: maxWorkers,
|
||||
maxIdleTime: maxIdleTime,
|
||||
taskQueue: make(chan Task, queueSize),
|
||||
workerSem: make(chan struct{}, maxWorkers),
|
||||
shutdown: make(chan struct{}),
|
||||
logger: &logger,
|
||||
name: name,
|
||||
}
|
||||
|
||||
// Start a supervisor goroutine to monitor pool health
|
||||
go pool.supervisor()
|
||||
|
||||
return pool
|
||||
}
|
||||
|
||||
// Submit adds a task to the pool for execution
|
||||
// Returns true if the task was accepted, false if the queue is full
|
||||
func (p *GoroutinePool) Submit(task Task) bool {
|
||||
select {
|
||||
case <-p.shutdown:
|
||||
return false // Pool is shutting down
|
||||
case p.taskQueue <- task:
|
||||
// Task accepted, ensure we have a worker to process it
|
||||
p.ensureWorkerAvailable()
|
||||
return true
|
||||
default:
|
||||
// Queue is full
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// ensureWorkerAvailable makes sure at least one worker is available to process tasks
|
||||
func (p *GoroutinePool) ensureWorkerAvailable() {
|
||||
// Check if we already have enough workers
|
||||
currentWorkers := atomic.LoadInt64(&p.workerCount)
|
||||
|
||||
// Only start new workers if:
|
||||
// 1. We have no workers at all, or
|
||||
// 2. The queue is growing and we're below max workers
|
||||
queueLen := len(p.taskQueue)
|
||||
if currentWorkers == 0 || (queueLen > int(currentWorkers) && currentWorkers < int64(p.maxWorkers)) {
|
||||
// Try to acquire a semaphore slot without blocking
|
||||
select {
|
||||
case p.workerSem <- struct{}{}:
|
||||
// We got a slot, start a new worker
|
||||
p.startWorker()
|
||||
default:
|
||||
// All worker slots are taken, which means we have enough workers
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// startWorker launches a new worker goroutine
|
||||
func (p *GoroutinePool) startWorker() {
|
||||
p.wg.Add(1)
|
||||
atomic.AddInt64(&p.workerCount, 1)
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
atomic.AddInt64(&p.workerCount, -1)
|
||||
<-p.workerSem // Release the semaphore slot
|
||||
p.wg.Done()
|
||||
|
||||
// Recover from panics in worker tasks
|
||||
if r := recover(); r != nil {
|
||||
p.logger.Error().Interface("panic", r).Msg("Worker recovered from panic")
|
||||
}
|
||||
}()
|
||||
|
||||
idleTimer := time.NewTimer(p.maxIdleTime)
|
||||
defer idleTimer.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-p.shutdown:
|
||||
return
|
||||
case task, ok := <-p.taskQueue:
|
||||
if !ok {
|
||||
return // Channel closed
|
||||
}
|
||||
|
||||
// Reset idle timer
|
||||
if !idleTimer.Stop() {
|
||||
<-idleTimer.C
|
||||
}
|
||||
idleTimer.Reset(p.maxIdleTime)
|
||||
|
||||
// Execute the task with panic recovery
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
p.logger.Error().Interface("panic", r).Msg("Task execution panic recovered")
|
||||
}
|
||||
}()
|
||||
task()
|
||||
}()
|
||||
|
||||
atomic.AddInt64(&p.taskCount, 1)
|
||||
case <-idleTimer.C:
|
||||
// Worker has been idle for too long
|
||||
// Keep at least 2 workers alive to handle incoming tasks without creating new goroutines
|
||||
if atomic.LoadInt64(&p.workerCount) > 2 {
|
||||
return
|
||||
}
|
||||
// For persistent workers (the minimum 2), use a longer idle timeout
|
||||
// This prevents excessive worker creation/destruction cycles
|
||||
idleTimer.Reset(p.maxIdleTime * 3) // Triple the idle time for persistent workers
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// supervisor monitors the pool and logs statistics periodically
|
||||
func (p *GoroutinePool) supervisor() {
|
||||
ticker := time.NewTicker(30 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-p.shutdown:
|
||||
return
|
||||
case <-ticker.C:
|
||||
workers := atomic.LoadInt64(&p.workerCount)
|
||||
tasks := atomic.LoadInt64(&p.taskCount)
|
||||
queueLen := len(p.taskQueue)
|
||||
|
||||
p.logger.Info().
|
||||
Int64("workers", workers).
|
||||
Int64("tasks_processed", tasks).
|
||||
Int("queue_length", queueLen).
|
||||
Msg("Pool statistics")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown gracefully shuts down the pool
|
||||
// If wait is true, it will wait for all tasks to complete
|
||||
// If wait is false, it will terminate immediately, potentially leaving tasks unprocessed
|
||||
func (p *GoroutinePool) Shutdown(wait bool) {
|
||||
p.shutdownOnce.Do(func() {
|
||||
close(p.shutdown)
|
||||
|
||||
if wait {
|
||||
// Wait for all tasks to be processed
|
||||
if len(p.taskQueue) > 0 {
|
||||
p.logger.Info().Int("remaining_tasks", len(p.taskQueue)).Msg("Waiting for tasks to complete")
|
||||
}
|
||||
|
||||
// Close the task queue to signal no more tasks
|
||||
close(p.taskQueue)
|
||||
|
||||
// Wait for all workers to finish
|
||||
p.wg.Wait()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// GetStats returns statistics about the pool
|
||||
func (p *GoroutinePool) GetStats() map[string]interface{} {
|
||||
return map[string]interface{}{
|
||||
"name": p.name,
|
||||
"worker_count": atomic.LoadInt64(&p.workerCount),
|
||||
"max_workers": p.maxWorkers,
|
||||
"tasks_processed": atomic.LoadInt64(&p.taskCount),
|
||||
"queue_length": len(p.taskQueue),
|
||||
"queue_capacity": cap(p.taskQueue),
|
||||
}
|
||||
}
|
||||
|
||||
// Global pools for different audio processing tasks
|
||||
var (
|
||||
globalAudioProcessorPool atomic.Pointer[GoroutinePool]
|
||||
globalAudioReaderPool atomic.Pointer[GoroutinePool]
|
||||
globalAudioProcessorInitOnce sync.Once
|
||||
globalAudioReaderInitOnce sync.Once
|
||||
)
|
||||
|
||||
// GetAudioProcessorPool returns the global audio processor pool
|
||||
func GetAudioProcessorPool() *GoroutinePool {
|
||||
pool := globalAudioProcessorPool.Load()
|
||||
if pool != nil {
|
||||
return pool
|
||||
}
|
||||
|
||||
globalAudioProcessorInitOnce.Do(func() {
|
||||
config := GetConfig()
|
||||
newPool := NewGoroutinePool(
|
||||
"audio-processor",
|
||||
config.MaxAudioProcessorWorkers,
|
||||
config.AudioProcessorQueueSize,
|
||||
config.WorkerMaxIdleTime,
|
||||
)
|
||||
globalAudioProcessorPool.Store(newPool)
|
||||
pool = newPool
|
||||
})
|
||||
|
||||
return globalAudioProcessorPool.Load()
|
||||
}
|
||||
|
||||
// GetAudioReaderPool returns the global audio reader pool
|
||||
func GetAudioReaderPool() *GoroutinePool {
|
||||
pool := globalAudioReaderPool.Load()
|
||||
if pool != nil {
|
||||
return pool
|
||||
}
|
||||
|
||||
globalAudioReaderInitOnce.Do(func() {
|
||||
config := GetConfig()
|
||||
newPool := NewGoroutinePool(
|
||||
"audio-reader",
|
||||
config.MaxAudioReaderWorkers,
|
||||
config.AudioReaderQueueSize,
|
||||
config.WorkerMaxIdleTime,
|
||||
)
|
||||
globalAudioReaderPool.Store(newPool)
|
||||
pool = newPool
|
||||
})
|
||||
|
||||
return globalAudioReaderPool.Load()
|
||||
}
|
||||
|
||||
// SubmitAudioProcessorTask submits a task to the audio processor pool
|
||||
func SubmitAudioProcessorTask(task Task) bool {
|
||||
return GetAudioProcessorPool().Submit(task)
|
||||
}
|
||||
|
||||
// SubmitAudioReaderTask submits a task to the audio reader pool
|
||||
func SubmitAudioReaderTask(task Task) bool {
|
||||
return GetAudioReaderPool().Submit(task)
|
||||
}
|
||||
|
||||
// ShutdownAudioPools shuts down all audio goroutine pools
|
||||
func ShutdownAudioPools(wait bool) {
|
||||
logger := logging.GetDefaultLogger().With().Str("component", "audio-pools").Logger()
|
||||
|
||||
processorPool := globalAudioProcessorPool.Load()
|
||||
if processorPool != nil {
|
||||
logger.Info().Msg("Shutting down audio processor pool")
|
||||
processorPool.Shutdown(wait)
|
||||
}
|
||||
|
||||
readerPool := globalAudioReaderPool.Load()
|
||||
if readerPool != nil {
|
||||
logger.Info().Msg("Shutting down audio reader pool")
|
||||
readerPool.Shutdown(wait)
|
||||
}
|
||||
}
|
||||
|
|
@ -96,9 +96,13 @@ func (aim *AudioInputManager) WriteOpusFrame(frame []byte) error {
|
|||
|
||||
// Log high latency warnings
|
||||
if processingTime > time.Duration(GetConfig().InputProcessingTimeoutMS)*time.Millisecond {
|
||||
latencyMs := float64(processingTime.Milliseconds())
|
||||
aim.logger.Warn().
|
||||
Dur("latency_ms", processingTime).
|
||||
Float64("latency_ms", latencyMs).
|
||||
Msg("High audio processing latency detected")
|
||||
|
||||
// Record latency for goroutine cleanup optimization
|
||||
RecordAudioLatency(latencyMs)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
|
|
@ -132,9 +136,13 @@ func (aim *AudioInputManager) WriteOpusFrameZeroCopy(frame *ZeroCopyAudioFrame)
|
|||
|
||||
// Log high latency warnings
|
||||
if processingTime > time.Duration(GetConfig().InputProcessingTimeoutMS)*time.Millisecond {
|
||||
latencyMs := float64(processingTime.Milliseconds())
|
||||
aim.logger.Warn().
|
||||
Dur("latency_ms", processingTime).
|
||||
Float64("latency_ms", latencyMs).
|
||||
Msg("High audio processing latency detected")
|
||||
|
||||
// Record latency for goroutine cleanup optimization
|
||||
RecordAudioLatency(latencyMs)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -289,8 +289,14 @@ func (ais *AudioInputServer) Start() error {
|
|||
ais.startProcessorGoroutine()
|
||||
ais.startMonitorGoroutine()
|
||||
|
||||
// Accept connections in a goroutine
|
||||
go ais.acceptConnections()
|
||||
// Submit the connection acceptor to the audio reader pool
|
||||
if !SubmitAudioReaderTask(ais.acceptConnections) {
|
||||
// If the pool is full or shutting down, fall back to direct goroutine creation
|
||||
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
|
||||
logger.Warn().Msg("Audio reader pool full or shutting down, falling back to direct goroutine creation")
|
||||
|
||||
go ais.acceptConnections()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
@ -360,8 +366,14 @@ func (ais *AudioInputServer) acceptConnections() {
|
|||
ais.conn = conn
|
||||
ais.mtx.Unlock()
|
||||
|
||||
// Handle this connection
|
||||
go ais.handleConnection(conn)
|
||||
// Handle this connection using the goroutine pool
|
||||
if !SubmitAudioReaderTask(func() { ais.handleConnection(conn) }) {
|
||||
// If the pool is full or shutting down, fall back to direct goroutine creation
|
||||
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
|
||||
logger.Warn().Msg("Audio reader pool full or shutting down, falling back to direct goroutine creation")
|
||||
|
||||
go ais.handleConnection(conn)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -489,19 +501,27 @@ func (ais *AudioInputServer) processMessage(msg *InputIPCMessage) error {
|
|||
|
||||
// processOpusFrame processes an Opus audio frame
|
||||
func (ais *AudioInputServer) processOpusFrame(data []byte) error {
|
||||
if len(data) == 0 {
|
||||
return nil // Empty frame, ignore
|
||||
// Fast path: ignore empty frames before doing any further work
|
||||
dataLen := len(data)
|
||||
if dataLen == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Use ultra-fast validation for critical audio path
|
||||
if err := ValidateAudioFrame(data); err != nil {
|
||||
logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()
|
||||
logger.Error().Err(err).Msg("Frame validation failed")
|
||||
return fmt.Errorf("input frame validation failed: %w", err)
|
||||
// Inline validation for critical audio path - avoid function call overhead
|
||||
if dataLen > cachedMaxFrameSize {
|
||||
return ErrFrameDataTooLarge
|
||||
}
|
||||
|
||||
// Process the Opus frame using CGO
|
||||
_, err := CGOAudioDecodeWrite(data)
|
||||
// Get cached config once - avoid repeated calls and locking
|
||||
cache := GetCachedConfig()
|
||||
// Skip cache expiry check in hotpath - background updates handle this
|
||||
|
||||
// Get a PCM buffer from the pool for optimized decode-write
|
||||
pcmBuffer := GetBufferFromPool(cache.GetMaxPCMBufferSize())
|
||||
defer ReturnBufferToPool(pcmBuffer)
|
||||
|
||||
// Direct CGO call - avoid wrapper function overhead
|
||||
_, err := CGOAudioDecodeWrite(data, pcmBuffer)
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
@ -690,25 +710,20 @@ func (aic *AudioInputClient) SendFrame(frame []byte) error {
|
|||
return fmt.Errorf("not connected to audio input server")
|
||||
}
|
||||
|
||||
if len(frame) == 0 {
|
||||
frameLen := len(frame)
|
||||
if frameLen == 0 {
|
||||
return nil // Empty frame, ignore
|
||||
}
|
||||
|
||||
// Validate frame data before sending
|
||||
if err := ValidateAudioFrame(frame); err != nil {
|
||||
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
|
||||
logger.Error().Err(err).Msg("Frame validation failed")
|
||||
return fmt.Errorf("input frame validation failed: %w", err)
|
||||
}
|
||||
|
||||
if len(frame) > maxFrameSize {
|
||||
return fmt.Errorf("frame too large: got %d bytes, maximum allowed %d bytes", len(frame), maxFrameSize)
|
||||
// Inline frame validation to reduce function call overhead
|
||||
if frameLen > maxFrameSize {
|
||||
return ErrFrameDataTooLarge
|
||||
}
|
||||
|
||||
msg := &InputIPCMessage{
|
||||
Magic: inputMagicNumber,
|
||||
Type: InputMessageTypeOpusFrame,
|
||||
Length: uint32(len(frame)),
|
||||
Length: uint32(frameLen),
|
||||
Timestamp: time.Now().UnixNano(),
|
||||
Data: frame,
|
||||
}
|
||||
|
|
@ -725,26 +740,25 @@ func (aic *AudioInputClient) SendFrameZeroCopy(frame *ZeroCopyAudioFrame) error
|
|||
return fmt.Errorf("not connected to audio input server")
|
||||
}
|
||||
|
||||
if frame == nil || frame.Length() == 0 {
|
||||
if frame == nil {
|
||||
return nil // Nil frame, ignore
|
||||
}
|
||||
|
||||
frameLen := frame.Length()
|
||||
if frameLen == 0 {
|
||||
return nil // Empty frame, ignore
|
||||
}
|
||||
|
||||
// Validate zero-copy frame before sending
|
||||
if err := ValidateZeroCopyFrame(frame); err != nil {
|
||||
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
|
||||
logger.Error().Err(err).Msg("Zero-copy frame validation failed")
|
||||
return fmt.Errorf("input frame validation failed: %w", err)
|
||||
}
|
||||
|
||||
if frame.Length() > maxFrameSize {
|
||||
return fmt.Errorf("frame too large: got %d bytes, maximum allowed %d bytes", frame.Length(), maxFrameSize)
|
||||
// Inline frame validation to reduce function call overhead
|
||||
if frameLen > maxFrameSize {
|
||||
return ErrFrameDataTooLarge
|
||||
}
|
||||
|
||||
// Use zero-copy data directly
|
||||
msg := &InputIPCMessage{
|
||||
Magic: inputMagicNumber,
|
||||
Type: InputMessageTypeOpusFrame,
|
||||
Length: uint32(frame.Length()),
|
||||
Length: uint32(frameLen),
|
||||
Timestamp: time.Now().UnixNano(),
|
||||
Data: frame.Data(), // Zero-copy data access
|
||||
}
|
||||
|
|
@ -878,10 +892,12 @@ func (aic *AudioInputClient) ResetStats() {
|
|||
ResetFrameStats(&aic.totalFrames, &aic.droppedFrames)
|
||||
}
|
||||
|
||||
// startReaderGoroutine starts the message reader goroutine
|
||||
// startReaderGoroutine starts the message reader using the goroutine pool
|
||||
func (ais *AudioInputServer) startReaderGoroutine() {
|
||||
ais.wg.Add(1)
|
||||
go func() {
|
||||
|
||||
// Create a reader task that will run in the goroutine pool
|
||||
readerTask := func() {
|
||||
defer ais.wg.Done()
|
||||
|
||||
// Enhanced error tracking and recovery
|
||||
|
|
@ -913,10 +929,7 @@ func (ais *AudioInputServer) startReaderGoroutine() {
|
|||
consecutiveErrors++
|
||||
lastErrorTime = now
|
||||
|
||||
// Log error with context
|
||||
logger.Warn().Err(err).
|
||||
Int("consecutive_errors", consecutiveErrors).
|
||||
Msg("Failed to read message from input connection")
|
||||
// Skip logging in hotpath for performance - only log critical errors
|
||||
|
||||
// Progressive backoff based on error count
|
||||
if consecutiveErrors > 1 {
|
||||
|
|
@ -966,26 +979,33 @@ func (ais *AudioInputServer) startReaderGoroutine() {
|
|||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Submit the reader task to the audio reader pool
|
||||
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
|
||||
if !SubmitAudioReaderTask(readerTask) {
|
||||
// If the pool is full or shutting down, fall back to direct goroutine creation
|
||||
logger.Warn().Msg("Audio reader pool full or shutting down, falling back to direct goroutine creation")
|
||||
|
||||
go readerTask()
|
||||
}
|
||||
}
|
||||
|
||||
// startProcessorGoroutine starts the message processor goroutine
|
||||
// startProcessorGoroutine starts the message processor using the goroutine pool
|
||||
func (ais *AudioInputServer) startProcessorGoroutine() {
|
||||
ais.wg.Add(1)
|
||||
go func() {
|
||||
|
||||
// Create a processor task that will run in the goroutine pool
|
||||
processorTask := func() {
|
||||
runtime.LockOSThread()
|
||||
defer runtime.UnlockOSThread()
|
||||
|
||||
// Set high priority for audio processing
|
||||
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
|
||||
if err := SetAudioThreadPriority(); err != nil {
|
||||
logger.Warn().Err(err).Msg("Failed to set audio processing priority")
|
||||
}
|
||||
defer func() {
|
||||
if err := ResetThreadPriority(); err != nil {
|
||||
logger.Warn().Err(err).Msg("Failed to reset thread priority")
|
||||
}
|
||||
}()
|
||||
// Set high priority for audio processing - skip logging in hotpath
|
||||
_ = SetAudioThreadPriority()
|
||||
defer func() { _ = ResetThreadPriority() }()
|
||||
|
||||
// Create logger for this goroutine
|
||||
logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()
|
||||
|
||||
// Enhanced error tracking for processing
|
||||
var processingErrors int
|
||||
|
|
@ -1014,17 +1034,10 @@ func (ais *AudioInputServer) startProcessorGoroutine() {
|
|||
processingErrors++
|
||||
lastProcessingError = now
|
||||
|
||||
logger.Warn().Err(err).
|
||||
Int("processing_errors", processingErrors).
|
||||
Dur("processing_time", processingTime).
|
||||
Msg("Failed to process input message")
|
||||
// Skip logging in hotpath for performance
|
||||
|
||||
// If too many processing errors, drop frames more aggressively
|
||||
if processingErrors >= maxProcessingErrors {
|
||||
logger.Error().
|
||||
Int("processing_errors", processingErrors).
|
||||
Msg("Too many processing errors, entering aggressive drop mode")
|
||||
|
||||
// Clear processing queue to recover
|
||||
for len(ais.processChan) > 0 {
|
||||
select {
|
||||
|
|
@ -1042,14 +1055,23 @@ func (ais *AudioInputServer) startProcessorGoroutine() {
|
|||
// Reset error counter on successful processing
|
||||
if processingErrors > 0 {
|
||||
processingErrors = 0
|
||||
logger.Info().Msg("Input processing recovered")
|
||||
// Skip logging in hotpath for performance
|
||||
}
|
||||
|
||||
// Update processing time metrics
|
||||
atomic.StoreInt64(&ais.processingTime, processingTime.Nanoseconds())
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Submit the processor task to the audio processor pool
|
||||
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
|
||||
if !SubmitAudioProcessorTask(processorTask) {
|
||||
// If the pool is full or shutting down, fall back to direct goroutine creation
|
||||
logger.Warn().Msg("Audio processor pool full or shutting down, falling back to direct goroutine creation")
|
||||
|
||||
go processorTask()
|
||||
}
|
||||
}
|
||||
|
||||
// processMessageWithRecovery processes a message with enhanced error recovery
|
||||
|
|
@ -1086,10 +1108,12 @@ func (ais *AudioInputServer) processMessageWithRecovery(msg *InputIPCMessage, lo
|
|||
}
|
||||
}
|
||||
|
||||
// startMonitorGoroutine starts the performance monitoring goroutine
|
||||
// startMonitorGoroutine starts the performance monitoring using the goroutine pool
|
||||
func (ais *AudioInputServer) startMonitorGoroutine() {
|
||||
ais.wg.Add(1)
|
||||
go func() {
|
||||
|
||||
// Create a monitor task that will run in the goroutine pool
|
||||
monitorTask := func() {
|
||||
runtime.LockOSThread()
|
||||
defer runtime.UnlockOSThread()
|
||||
|
||||
|
|
@ -1166,7 +1190,16 @@ func (ais *AudioInputServer) startMonitorGoroutine() {
|
|||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Submit the monitor task to the audio processor pool
|
||||
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
|
||||
if !SubmitAudioProcessorTask(monitorTask) {
|
||||
// If the pool is full or shutting down, fall back to direct goroutine creation
|
||||
logger.Warn().Msg("Audio processor pool full or shutting down, falling back to direct goroutine creation")
|
||||
|
||||
go monitorTask()
|
||||
}
|
||||
}
|
||||
|
||||
// GetServerStats returns server performance statistics
|
||||
|
|
|
|||
|
|
@ -159,8 +159,14 @@ func (s *AudioOutputServer) Start() error {
|
|||
// Start message processor goroutine
|
||||
s.startProcessorGoroutine()
|
||||
|
||||
// Accept connections in a goroutine
|
||||
go s.acceptConnections()
|
||||
// Submit the connection acceptor to the audio reader pool
|
||||
if !SubmitAudioReaderTask(s.acceptConnections) {
|
||||
// If the pool is full or shutting down, fall back to direct goroutine creation
|
||||
logger := logging.GetDefaultLogger().With().Str("component", AudioOutputServerComponent).Logger()
|
||||
logger.Warn().Msg("Audio reader pool full or shutting down, falling back to direct goroutine creation")
|
||||
|
||||
go s.acceptConnections()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
@ -199,10 +205,12 @@ func (s *AudioOutputServer) acceptConnections() {
|
|||
}
|
||||
}
|
||||
|
||||
// startProcessorGoroutine starts the message processor
|
||||
// startProcessorGoroutine starts the message processor using the goroutine pool
|
||||
func (s *AudioOutputServer) startProcessorGoroutine() {
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
|
||||
// Create a processor task that will run in the goroutine pool
|
||||
processorTask := func() {
|
||||
defer s.wg.Done()
|
||||
for {
|
||||
select {
|
||||
|
|
@ -218,7 +226,16 @@ func (s *AudioOutputServer) startProcessorGoroutine() {
|
|||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Submit the processor task to the audio processor pool
|
||||
if !SubmitAudioProcessorTask(processorTask) {
|
||||
// If the pool is full or shutting down, fall back to direct goroutine creation
|
||||
logger := logging.GetDefaultLogger().With().Str("component", AudioOutputServerComponent).Logger()
|
||||
logger.Warn().Msg("Audio processor pool full or shutting down, falling back to direct goroutine creation")
|
||||
|
||||
go processorTask()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *AudioOutputServer) Stop() {
|
||||
|
|
|
|||
|
|
@ -1,3 +1,6 @@
|
|||
//go:build cgo
|
||||
// +build cgo
|
||||
|
||||
package audio
|
||||
|
||||
import (
|
||||
|
|
@ -308,6 +311,9 @@ func (s *AudioOutputStreamer) ReportLatency(latency time.Duration) {
|
|||
|
||||
// StartAudioOutputStreaming starts audio output streaming (capturing system audio)
|
||||
func StartAudioOutputStreaming(send func([]byte)) error {
|
||||
// Initialize audio monitoring (latency tracking and cache cleanup)
|
||||
InitializeAudioMonitoring()
|
||||
|
||||
if !atomic.CompareAndSwapInt32(&outputStreamingRunning, 0, 1) {
|
||||
return ErrAudioAlreadyRunning
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,217 @@
|
|||
package audio
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// SizedBufferPool is a sync.Pool of byte buffers that additionally keeps
// atomic usage counters (buffer count, byte total, gets, puts and misses).
type SizedBufferPool struct {
	// pool holds the reusable buffers, stored as *[]byte.
	pool sync.Pool

	// Monitoring counters; all of them are updated atomically.
	totalBuffers atomic.Int64 // buffers accounted to this pool
	totalBytes   atomic.Int64 // summed capacity of the accounted buffers
	gets         atomic.Int64 // number of Get calls
	puts         atomic.Int64 // number of Put calls
	misses       atomic.Int64 // times a buffer had to be created instead of reused

	// Sizing limits fixed by NewSizedBufferPool.
	maxBufferSize int // buffers with a larger capacity are never pooled
	defaultSize   int // capacity of buffers created by the pool itself
}

// NewSizedBufferPool creates a pool whose freshly created buffers have
// capacity defaultSize and which refuses to retain buffers whose capacity
// exceeds maxBufferSize.
func NewSizedBufferPool(defaultSize, maxBufferSize int) *SizedBufferPool {
	p := &SizedBufferPool{
		maxBufferSize: maxBufferSize,
		defaultSize:   defaultSize,
	}

	// sync.Pool calls New when it has no buffer to hand out.
	p.pool = sync.Pool{
		New: func() interface{} {
			p.misses.Add(1)

			// Zero length, defaultSize capacity; stored behind a pointer,
			// the same representation Put uses.
			fresh := make([]byte, defaultSize)[:0]

			p.totalBuffers.Add(1)
			p.totalBytes.Add(int64(cap(fresh)))

			return &fresh
		},
	}

	return p
}

// Get returns a buffer of length minCapacity.
//
// A pooled buffer is reused when its capacity is sufficient; otherwise a new
// buffer of exactly minCapacity bytes is allocated and the byte accounting is
// moved from the discarded buffer to the new one. A negative minCapacity is
// treated as zero rather than panicking on the reslice.
func (p *SizedBufferPool) Get(minCapacity int) []byte {
	p.gets.Add(1)

	// Guard the reslice/allocation below against a negative request.
	if minCapacity < 0 {
		minCapacity = 0
	}

	var buf []byte
	switch v := p.pool.Get().(type) {
	case *[]byte:
		// Regular case: both Put and the pool's New func store *[]byte.
		if v != nil {
			buf = (*v)[:0]
		} else {
			buf = make([]byte, 0, p.defaultSize)
		}
	case []byte:
		// Direct slice, accepted for backward compatibility.
		buf = v
	default:
		// Unexpected (or nil) pool entry: allocate and record a miss.
		buf = make([]byte, 0, p.defaultSize)
		p.misses.Add(1)
	}

	if cap(buf) < minCapacity {
		// Too small: replace it and move the byte accounting to the new buffer.
		p.totalBytes.Add(-int64(cap(buf)))
		buf = make([]byte, minCapacity)
		p.totalBytes.Add(int64(cap(buf)))
		return buf
	}

	return buf[:minCapacity]
}

// Put hands a buffer back to the pool.
//
// Buffers whose capacity exceeds maxBufferSize are dropped (and removed from
// the counters) to prevent memory bloat. Every retained buffer is zeroed over
// its entire capacity, not just its current length, because Get reslices a
// reused buffer up to the requested size and would otherwise expose bytes
// left behind by the previous user.
func (p *SizedBufferPool) Put(buf []byte) {
	p.puts.Add(1)

	if cap(buf) > p.maxBufferSize {
		p.totalBuffers.Add(-1)
		p.totalBytes.Add(-int64(cap(buf)))
		return
	}

	// Wipe the whole backing array that will be reachable from the pool.
	whole := buf[:cap(buf)]
	for i := range whole {
		whole[i] = 0
	}

	// Store a pointer to the zero-length slice, matching what Get expects.
	empty := buf[:0]
	p.pool.Put(&empty)
}

// GetStats returns the current counter values: accounted buffers, accounted
// bytes, Get calls, Put calls and misses. Each counter is loaded separately,
// so the values are not a single atomic snapshot.
func (p *SizedBufferPool) GetStats() (buffers, bytes, gets, puts, misses int64) {
	return p.totalBuffers.Load(),
		p.totalBytes.Load(),
		p.gets.Load(),
		p.puts.Load(),
		p.misses.Load()
}

// BufferPoolStats contains statistics about a buffer pool.
type BufferPoolStats struct {
	TotalBuffers      int64   // buffers accounted to the pool
	TotalBytes        int64   // summed capacity of those buffers
	Gets              int64   // number of Get calls
	Puts              int64   // number of Put calls
	Misses            int64   // number of recorded misses
	HitRate           float64 // (Gets-Misses)/Gets as a percentage; 0 when Gets is 0
	AverageBufferSize float64 // TotalBytes/TotalBuffers; 0 when TotalBuffers is 0
}

// GetDetailedStats returns the raw counters together with the derived hit
// rate (in percent) and average buffer size.
func (p *SizedBufferPool) GetDetailedStats() BufferPoolStats {
	buffers, totalBytes, gets, puts, misses := p.GetStats()

	stats := BufferPoolStats{
		TotalBuffers: buffers,
		TotalBytes:   totalBytes,
		Gets:         gets,
		Puts:         puts,
		Misses:       misses,
	}

	// Derived values stay at zero when their denominator is not positive.
	if gets > 0 {
		stats.HitRate = float64(gets-misses) / float64(gets) * 100.0
	}
	if buffers > 0 {
		stats.AverageBufferSize = float64(totalBytes) / float64(buffers)
	}

	return stats
}
|
||||
|
||||
// Global audio buffer pools with different size classes
|
||||
var (
|
||||
// Small buffers (up to 4KB)
|
||||
smallBufferPool = NewSizedBufferPool(1024, 4*1024)
|
||||
|
||||
// Medium buffers (4KB to 64KB)
|
||||
mediumBufferPool = NewSizedBufferPool(8*1024, 64*1024)
|
||||
|
||||
// Large buffers (64KB to 1MB)
|
||||
largeBufferPool = NewSizedBufferPool(64*1024, 1024*1024)
|
||||
)
|
||||
|
||||
// GetOptimalBuffer returns a buffer from the most appropriate pool based on size
|
||||
func GetOptimalBuffer(size int) []byte {
|
||||
switch {
|
||||
case size <= 4*1024:
|
||||
return smallBufferPool.Get(size)
|
||||
case size <= 64*1024:
|
||||
return mediumBufferPool.Get(size)
|
||||
default:
|
||||
return largeBufferPool.Get(size)
|
||||
}
|
||||
}
|
||||
|
||||
// ReturnOptimalBuffer returns a buffer to the appropriate pool based on size
|
||||
func ReturnOptimalBuffer(buf []byte) {
|
||||
size := cap(buf)
|
||||
switch {
|
||||
case size <= 4*1024:
|
||||
smallBufferPool.Put(buf)
|
||||
case size <= 64*1024:
|
||||
mediumBufferPool.Put(buf)
|
||||
default:
|
||||
largeBufferPool.Put(buf)
|
||||
}
|
||||
}
|
||||
|
||||
// GetAllPoolStats returns statistics for all buffer pools
|
||||
func GetAllPoolStats() map[string]BufferPoolStats {
|
||||
return map[string]BufferPoolStats{
|
||||
"small": smallBufferPool.GetDetailedStats(),
|
||||
"medium": mediumBufferPool.GetDetailedStats(),
|
||||
"large": largeBufferPool.GetDetailedStats(),
|
||||
}
|
||||
}
|
||||
|
|
@ -58,10 +58,22 @@ func ValidateZeroCopyFrame(frame *ZeroCopyAudioFrame) error {
|
|||
}
|
||||
|
||||
// ValidateBufferSize validates buffer size parameters with enhanced boundary checks
|
||||
// Optimized to use AudioConfigCache for frequently accessed values
|
||||
func ValidateBufferSize(size int) error {
|
||||
if size <= 0 {
|
||||
return fmt.Errorf("%w: buffer size %d must be positive", ErrInvalidBufferSize, size)
|
||||
}
|
||||
|
||||
// Fast path: Check against cached max frame size
|
||||
cache := GetCachedConfig()
|
||||
maxFrameSize := int(cache.maxAudioFrameSize.Load())
|
||||
|
||||
// Most common case: validating a buffer that's sized for audio frames
|
||||
if maxFrameSize > 0 && size <= maxFrameSize {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Slower path: full validation against SocketMaxBuffer
|
||||
config := GetConfig()
|
||||
// Use SocketMaxBuffer as the upper limit for general buffer validation
|
||||
// This allows for socket buffers while still preventing extremely large allocations
|
||||
|
|
@ -199,10 +211,22 @@ func ValidateLatencyConfig(config LatencyConfig) error {
|
|||
}
|
||||
|
||||
// ValidateSampleRate validates audio sample rate values
|
||||
// Optimized to use AudioConfigCache for frequently accessed values
|
||||
func ValidateSampleRate(sampleRate int) error {
|
||||
if sampleRate <= 0 {
|
||||
return fmt.Errorf("%w: sample rate %d must be positive", ErrInvalidSampleRate, sampleRate)
|
||||
}
|
||||
|
||||
// Fast path: Check against cached sample rate first
|
||||
cache := GetCachedConfig()
|
||||
cachedRate := int(cache.sampleRate.Load())
|
||||
|
||||
// Most common case: validating against the current sample rate
|
||||
if sampleRate == cachedRate {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Slower path: check against all valid rates
|
||||
config := GetConfig()
|
||||
validRates := config.ValidSampleRates
|
||||
for _, rate := range validRates {
|
||||
|
|
@ -215,10 +239,23 @@ func ValidateSampleRate(sampleRate int) error {
|
|||
}
|
||||
|
||||
// ValidateChannelCount validates audio channel count
|
||||
// Optimized to use AudioConfigCache for frequently accessed values
|
||||
func ValidateChannelCount(channels int) error {
|
||||
if channels <= 0 {
|
||||
return fmt.Errorf("%w: channel count %d must be positive", ErrInvalidChannels, channels)
|
||||
}
|
||||
|
||||
// Fast path: Check against cached channels first
|
||||
cache := GetCachedConfig()
|
||||
cachedChannels := int(cache.channels.Load())
|
||||
|
||||
// Most common case: validating against the current channel count
|
||||
if channels == cachedChannels {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check against max channels - still using cache to avoid GetConfig()
|
||||
// Note: We don't have maxChannels in the cache yet, so we'll use GetConfig() for now
|
||||
config := GetConfig()
|
||||
if channels > config.MaxChannels {
|
||||
return fmt.Errorf("%w: channel count %d exceeds maximum %d",
|
||||
|
|
@ -228,10 +265,33 @@ func ValidateChannelCount(channels int) error {
|
|||
}
|
||||
|
||||
// ValidateBitrate validates audio bitrate values (expects kbps)
|
||||
// Optimized to use AudioConfigCache for frequently accessed values
|
||||
func ValidateBitrate(bitrate int) error {
|
||||
if bitrate <= 0 {
|
||||
return fmt.Errorf("%w: bitrate %d must be positive", ErrInvalidBitrate, bitrate)
|
||||
}
|
||||
|
||||
// Fast path: Check against cached bitrate values
|
||||
cache := GetCachedConfig()
|
||||
minBitrate := int(cache.minOpusBitrate.Load())
|
||||
maxBitrate := int(cache.maxOpusBitrate.Load())
|
||||
|
||||
// If we have valid cached values, use them
|
||||
if minBitrate > 0 && maxBitrate > 0 {
|
||||
// Convert kbps to bps for comparison with config limits
|
||||
bitrateInBps := bitrate * 1000
|
||||
if bitrateInBps < minBitrate {
|
||||
return fmt.Errorf("%w: bitrate %d kbps (%d bps) below minimum %d bps",
|
||||
ErrInvalidBitrate, bitrate, bitrateInBps, minBitrate)
|
||||
}
|
||||
if bitrateInBps > maxBitrate {
|
||||
return fmt.Errorf("%w: bitrate %d kbps (%d bps) exceeds maximum %d bps",
|
||||
ErrInvalidBitrate, bitrate, bitrateInBps, maxBitrate)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Slower path: full validation with GetConfig()
|
||||
config := GetConfig()
|
||||
// Convert kbps to bps for comparison with config limits
|
||||
bitrateInBps := bitrate * 1000
|
||||
|
|
@ -247,10 +307,31 @@ func ValidateBitrate(bitrate int) error {
|
|||
}
|
||||
|
||||
// ValidateFrameDuration validates frame duration values
|
||||
// Optimized to use AudioConfigCache for frequently accessed values
|
||||
func ValidateFrameDuration(duration time.Duration) error {
|
||||
if duration <= 0 {
|
||||
return fmt.Errorf("%w: frame duration %v must be positive", ErrInvalidFrameDuration, duration)
|
||||
}
|
||||
|
||||
// Fast path: Check against cached frame size first
|
||||
cache := GetCachedConfig()
|
||||
|
||||
// Convert frameSize (samples) to duration for comparison
|
||||
// Note: This calculation should match how frameSize is converted to duration elsewhere
|
||||
cachedFrameSize := int(cache.frameSize.Load())
|
||||
cachedSampleRate := int(cache.sampleRate.Load())
|
||||
|
||||
// Only do this calculation if we have valid cached values
|
||||
if cachedFrameSize > 0 && cachedSampleRate > 0 {
|
||||
cachedDuration := time.Duration(cachedFrameSize) * time.Second / time.Duration(cachedSampleRate)
|
||||
|
||||
// Most common case: validating against the current frame duration
|
||||
if duration == cachedDuration {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Slower path: full validation against min/max
|
||||
config := GetConfig()
|
||||
if duration < config.MinFrameDuration {
|
||||
return fmt.Errorf("%w: frame duration %v below minimum %v",
|
||||
|
|
@ -264,7 +345,29 @@ func ValidateFrameDuration(duration time.Duration) error {
|
|||
}
|
||||
|
||||
// ValidateAudioConfigComplete performs comprehensive audio configuration validation
|
||||
// Uses optimized validation functions that leverage AudioConfigCache
|
||||
func ValidateAudioConfigComplete(config AudioConfig) error {
|
||||
// Fast path: Check if all values match the current cached configuration
|
||||
cache := GetCachedConfig()
|
||||
cachedSampleRate := int(cache.sampleRate.Load())
|
||||
cachedChannels := int(cache.channels.Load())
|
||||
cachedBitrate := int(cache.opusBitrate.Load()) / 1000 // Convert from bps to kbps
|
||||
cachedFrameSize := int(cache.frameSize.Load())
|
||||
|
||||
// Only do this calculation if we have valid cached values
|
||||
if cachedSampleRate > 0 && cachedChannels > 0 && cachedBitrate > 0 && cachedFrameSize > 0 {
|
||||
cachedDuration := time.Duration(cachedFrameSize) * time.Second / time.Duration(cachedSampleRate)
|
||||
|
||||
// Most common case: validating the current configuration
|
||||
if config.SampleRate == cachedSampleRate &&
|
||||
config.Channels == cachedChannels &&
|
||||
config.Bitrate == cachedBitrate &&
|
||||
config.FrameSize == cachedDuration {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Slower path: validate each parameter individually
|
||||
if err := ValidateAudioQuality(config.Quality); err != nil {
|
||||
return fmt.Errorf("quality validation failed: %w", err)
|
||||
}
|
||||
|
|
@ -303,36 +406,55 @@ func ValidateAudioConfigConstants(config *AudioConfigConstants) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Cached max frame size to avoid function call overhead in hot paths
|
||||
var cachedMaxFrameSize int
|
||||
// Note: We're transitioning from individual cached values to using AudioConfigCache
|
||||
// for better consistency and reduced maintenance overhead
|
||||
|
||||
// Note: Validation cache is initialized on first use to avoid init function
|
||||
// Global variable for backward compatibility
|
||||
var cachedMaxFrameSize int
|
||||
|
||||
// InitValidationCache initializes cached validation values with actual config
|
||||
func InitValidationCache() {
|
||||
cachedMaxFrameSize = GetConfig().MaxAudioFrameSize
|
||||
// Initialize the global cache variable for backward compatibility
|
||||
config := GetConfig()
|
||||
cachedMaxFrameSize = config.MaxAudioFrameSize
|
||||
|
||||
// Update the global audio config cache
|
||||
GetCachedConfig().Update()
|
||||
}
|
||||
|
||||
// ValidateAudioFrame provides optimized validation for audio frame data
|
||||
// This is the primary validation function used in all audio processing paths
|
||||
//
|
||||
// Performance optimizations:
|
||||
// - Uses cached config value to eliminate function call overhead
|
||||
// - Uses cached max frame size to eliminate config lookups
|
||||
// - Single branch condition for optimal CPU pipeline efficiency
|
||||
// - Inlined length checks for minimal overhead
|
||||
// - Minimal error allocation overhead
|
||||
//
|
||||
//go:inline
|
||||
func ValidateAudioFrame(data []byte) error {
|
||||
// Initialize cache on first use if not already done
|
||||
if cachedMaxFrameSize == 0 {
|
||||
InitValidationCache()
|
||||
}
|
||||
// Optimized validation with pre-allocated error messages for minimal overhead
|
||||
// Fast path: check length against cached max size in single operation
|
||||
dataLen := len(data)
|
||||
if dataLen == 0 {
|
||||
return ErrFrameDataEmpty
|
||||
}
|
||||
if dataLen > cachedMaxFrameSize {
|
||||
|
||||
// Use global cached value for fastest access - updated during initialization
|
||||
maxSize := cachedMaxFrameSize
|
||||
if maxSize == 0 {
|
||||
// Fallback: get from cache only if global cache not initialized
|
||||
cache := GetCachedConfig()
|
||||
maxSize = int(cache.maxAudioFrameSize.Load())
|
||||
if maxSize == 0 {
|
||||
// Last resort: update cache and get fresh value
|
||||
cache.Update()
|
||||
maxSize = int(cache.maxAudioFrameSize.Load())
|
||||
}
|
||||
// Cache the value globally for next calls
|
||||
cachedMaxFrameSize = maxSize
|
||||
}
|
||||
|
||||
// Single comparison for validation
|
||||
if dataLen > maxSize {
|
||||
return ErrFrameDataTooLarge
|
||||
}
|
||||
return nil
|
||||
|
|
|
|||
12
main.go
12
main.go
|
|
@ -38,6 +38,14 @@ func startAudioSubprocess() error {
|
|||
// Start adaptive buffer management for optimal performance
|
||||
audio.StartAdaptiveBuffering()
|
||||
|
||||
// Start goroutine monitoring to detect and prevent leaks
|
||||
audio.StartGoroutineMonitoring()
|
||||
|
||||
// Enable batch audio processing to reduce CGO call overhead
|
||||
if err := audio.EnableBatchAudioProcessing(); err != nil {
|
||||
logger.Warn().Err(err).Msg("failed to enable batch audio processing")
|
||||
}
|
||||
|
||||
// Create audio server supervisor
|
||||
audioSupervisor = audio.NewAudioOutputSupervisor()
|
||||
|
||||
|
|
@ -95,6 +103,10 @@ func startAudioSubprocess() error {
|
|||
audio.StopAudioRelay()
|
||||
// Stop adaptive buffering
|
||||
audio.StopAdaptiveBuffering()
|
||||
// Stop goroutine monitoring
|
||||
audio.StopGoroutineMonitoring()
|
||||
// Disable batch audio processing
|
||||
audio.DisableBatchAudioProcessing()
|
||||
},
|
||||
// onRestart
|
||||
func(attempt int, delay time.Duration) {
|
||||
|
|
|
|||
Loading…
Reference in New Issue