From 5353c1cab26ed11900b6b4a895e21a4594c70f85 Mon Sep 17 00:00:00 2001 From: Alex P Date: Wed, 3 Sep 2025 16:28:25 +0000 Subject: [PATCH] perf(audio): optimize opus decode-write with separate buffers - Add PCM buffer pool and config for optimized decode-write operations - Implement separate buffer handling in CGO audio processing - Update batch processor to support both legacy and optimized paths --- internal/audio/batch_audio.go | 92 ++++++++++++++++++++++++++++-- internal/audio/cgo_audio.go | 89 +++++++++++++++++++++++++++-- internal/audio/cgo_audio_stub.go | 8 ++- internal/audio/config_constants.go | 2 + internal/audio/input_ipc.go | 12 +++- 5 files changed, 189 insertions(+), 14 deletions(-) diff --git a/internal/audio/batch_audio.go b/internal/audio/batch_audio.go index e50de5c2..13392232 100644 --- a/internal/audio/batch_audio.go +++ b/internal/audio/batch_audio.go @@ -4,6 +4,7 @@ package audio import ( "context" + "fmt" "runtime" "sync" "sync/atomic" @@ -67,7 +68,9 @@ type batchReadResult struct { } type batchWriteRequest struct { - buffer []byte + buffer []byte // Buffer for backward compatibility + opusData []byte // Opus encoded data for decode-write operations + pcmBuffer []byte // PCM buffer for decode-write operations resultChan chan batchWriteResult timestamp time.Time } @@ -207,6 +210,7 @@ func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) { } // BatchDecodeWrite performs batched audio decode and write operations +// This is the legacy version that uses a single buffer func (bap *BatchAudioProcessor) BatchDecodeWrite(buffer []byte) (int, error) { // Get cached config to avoid GetConfig() calls in hot path cache := GetCachedConfig() @@ -222,7 +226,7 @@ func (bap *BatchAudioProcessor) BatchDecodeWrite(buffer []byte) (int, error) { // Fallback to single operation if batch processor is not running atomic.AddInt64(&bap.stats.SingleWrites, 1) atomic.AddInt64(&bap.stats.WriteFrames, 1) - return CGOAudioDecodeWrite(buffer) + return CGOAudioDecodeWriteLegacy(buffer) } resultChan := make(chan batchWriteResult, 1) @@ -240,7 +244,7 @@ func (bap *BatchAudioProcessor) BatchDecodeWrite(buffer []byte) (int, error) { // Queue is full, fall back to single operation atomic.AddInt64(&bap.stats.SingleWrites, 1) atomic.AddInt64(&bap.stats.WriteFrames, 1) - return CGOAudioDecodeWrite(buffer) + return CGOAudioDecodeWriteLegacy(buffer) } // Wait for result with timeout @@ -250,7 +254,61 @@ func (bap *BatchAudioProcessor) BatchDecodeWrite(buffer []byte) (int, error) { case <-time.After(cache.BatchProcessingTimeout): atomic.AddInt64(&bap.stats.SingleWrites, 1) atomic.AddInt64(&bap.stats.WriteFrames, 1) - return CGOAudioDecodeWrite(buffer) + return CGOAudioDecodeWriteLegacy(buffer) + } +} + +// BatchDecodeWriteWithBuffers performs batched audio decode and write operations with separate opus and PCM buffers +func (bap *BatchAudioProcessor) BatchDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, error) { + // Get cached config to avoid GetConfig() calls in hot path + cache := GetCachedConfig() + cache.Update() + + // Validate buffers before processing + if len(opusData) == 0 { + return 0, fmt.Errorf("empty opus data buffer") + } + if len(pcmBuffer) == 0 { + return 0, fmt.Errorf("empty PCM buffer") + } + + if !bap.IsRunning() { + // Fallback to single operation if batch processor is not running + atomic.AddInt64(&bap.stats.SingleWrites, 1) + atomic.AddInt64(&bap.stats.WriteFrames, 1) + // Use the optimized function with separate buffers + return CGOAudioDecodeWrite(opusData, pcmBuffer) + } + + resultChan := make(chan batchWriteResult, 1) + request := batchWriteRequest{ + opusData: opusData, + pcmBuffer: pcmBuffer, + resultChan: resultChan, + timestamp: time.Now(), + } + + // Try to queue the request with non-blocking send + select { + case bap.writeQueue <- request: + // Successfully queued + default: + // Queue is full, fall back to single operation + atomic.AddInt64(&bap.stats.SingleWrites, 1) + atomic.AddInt64(&bap.stats.WriteFrames, 1) + // Use the optimized function with separate buffers + return CGOAudioDecodeWrite(opusData, pcmBuffer) + } + + // Wait for result with timeout + select { + case result := <-resultChan: + return result.length, result.err + case <-time.After(cache.BatchProcessingTimeout): + atomic.AddInt64(&bap.stats.SingleWrites, 1) + atomic.AddInt64(&bap.stats.WriteFrames, 1) + // Use the optimized function with separate buffers + return CGOAudioDecodeWrite(opusData, pcmBuffer) } } @@ -437,7 +495,18 @@ func (bap *BatchAudioProcessor) processBatchWrite(batch []batchWriteRequest) { // Process each request in the batch for _, req := range batch { - length, err := CGOAudioDecodeWrite(req.buffer) + var length int + var err error + + // Handle both legacy and new decode-write operations + if req.opusData != nil && req.pcmBuffer != nil { + // New style with separate opus data and PCM buffer + length, err = CGOAudioDecodeWrite(req.opusData, req.pcmBuffer) + } else { + // Legacy style with single buffer + length, err = CGOAudioDecodeWriteLegacy(req.buffer) + } + result := batchWriteResult{ length: length, err: err, @@ -543,8 +612,19 @@ func BatchCGOAudioDecodeWrite(buffer []byte) (int, error) { processor := GetBatchAudioProcessor() if processor == nil || !processor.IsRunning() { // Fall back to non-batched version if processor is not running - return CGOAudioDecodeWrite(buffer) + return CGOAudioDecodeWriteLegacy(buffer) } return processor.BatchDecodeWrite(buffer) } + +// BatchCGOAudioDecodeWriteWithBuffers is a batched version of CGOAudioDecodeWrite that uses separate opus and PCM buffers +func BatchCGOAudioDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, error) { + processor := GetBatchAudioProcessor() + if processor == nil || !processor.IsRunning() { + // Fall back to non-batched version if processor is not running + return CGOAudioDecodeWrite(opusData, pcmBuffer) + } + + return processor.BatchDecodeWriteWithBuffers(opusData, pcmBuffer) +} diff --git a/internal/audio/cgo_audio.go b/internal/audio/cgo_audio.go index 3d512434..fcd91b27 100644 --- a/internal/audio/cgo_audio.go +++ b/internal/audio/cgo_audio.go @@ -716,6 +716,7 @@ type AudioConfigCache struct { minReadEncodeBuffer atomic.Int32 maxDecodeWriteBuffer atomic.Int32 maxPacketSize atomic.Int32 + maxPCMBufferSize atomic.Int32 opusBitrate atomic.Int32 opusComplexity atomic.Int32 opusVBR atomic.Int32 @@ -787,6 +788,7 @@ func (c *AudioConfigCache) Update() { c.minReadEncodeBuffer.Store(int32(config.MinReadEncodeBuffer)) c.maxDecodeWriteBuffer.Store(int32(config.MaxDecodeWriteBuffer)) c.maxPacketSize.Store(int32(config.CGOMaxPacketSize)) + c.maxPCMBufferSize.Store(int32(config.MaxPCMBufferSize)) c.opusBitrate.Store(int32(config.CGOOpusBitrate)) c.opusComplexity.Store(int32(config.CGOOpusComplexity)) c.opusVBR.Store(int32(config.CGOOpusVBR)) @@ -842,6 +844,11 @@ func (c *AudioConfigCache) GetMaxPacketSize() int { return int(c.maxPacketSize.Load()) } +// GetMaxPCMBufferSize returns the cached MaxPCMBufferSize value +func (c *AudioConfigCache) GetMaxPCMBufferSize() int { + return int(c.maxPCMBufferSize.Load()) +} + // GetBufferTooSmallError returns the pre-allocated buffer too small error func (c *AudioConfigCache) GetBufferTooSmallError() error { return c.bufferTooSmallReadEncode @@ -1179,8 +1186,12 @@ func DecodeWriteWithPooledBuffer(data []byte) (int, error) { return 0, newBufferTooLargeError(len(data), maxPacketSize) } - // Perform decode/write operation - n, err := cgoAudioDecodeWrite(data) + // Get a PCM buffer from the pool for optimized decode-write + pcmBuffer := GetBufferFromPool(cache.GetMaxPCMBufferSize()) + defer ReturnBufferToPool(pcmBuffer) + + // Perform decode/write operation using optimized implementation + n, err := CGOAudioDecodeWrite(data, pcmBuffer) // Return result return n, err @@ -1253,6 +1264,10 @@ func BatchDecodeWrite(frames [][]byte) error { startTime := time.Now() batchProcessingCount.Add(1) + // Get a PCM buffer from the pool for optimized decode-write + pcmBuffer := GetBufferFromPool(cache.GetMaxPCMBufferSize()) + defer ReturnBufferToPool(pcmBuffer) + // Process each frame frameCount := 0 for _, frame := range frames { @@ -1261,8 +1276,8 @@ func BatchDecodeWrite(frames [][]byte) error { continue } - // Process this frame - _, err := DecodeWriteWithPooledBuffer(frame) + // Process this frame using optimized implementation + _, err := CGOAudioDecodeWrite(frame, pcmBuffer) if err != nil { // Update statistics before returning error batchFrameCount.Add(int64(frameCount)) @@ -1294,6 +1309,69 @@ func GetBatchProcessingStats() (count, frames, avgTimeUs int64) { return count, frames, avgTimeUs } +// cgoAudioDecodeWriteWithBuffers decodes opus data and writes to PCM buffer +// This implementation uses separate buffers for opus data and PCM output +func cgoAudioDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, error) { + // Validate input + if len(opusData) == 0 { + return 0, errEmptyBuffer + } + if len(pcmBuffer) == 0 { + return 0, errEmptyBuffer + } + + // Get cached config + cache := GetCachedConfig() + cache.Update() + + // Ensure data doesn't exceed max packet size + maxPacketSize := cache.GetMaxPacketSize() + if len(opusData) > maxPacketSize { + return 0, newBufferTooLargeError(len(opusData), maxPacketSize) + } + + // Avoid bounds check with unsafe + var opusPtr unsafe.Pointer + if len(opusData) > 0 { + opusPtr = unsafe.Pointer(&opusData[0]) + if opusPtr == nil { + return 0, errInvalidBufferPtr + } + } + + // Simplified panic recovery - only recover from C panics + var n int + var err error + defer func() { + if r := recover(); r != nil { + // Using pre-allocated error to avoid allocations + err = errAudioDecodeWrite + } + }() + + // Direct CGO call with minimal overhead + n = int(C.jetkvm_audio_decode_write(opusPtr, C.int(len(opusData)))) + + // Fast path for success case + if n >= 0 { + return n, nil + } + + // Handle error cases with static error codes + switch n { + case -1: + n = 0 + err = errAudioInitFailed + case -2: + n = 0 + err = errAudioDecodeWrite + default: + n = 0 + err = newAudioDecodeWriteError(n) + } + return n, err +} + // CGO function aliases var ( CGOAudioInit = cgoAudioInit @@ -1301,6 +1379,7 @@ var ( CGOAudioReadEncode = cgoAudioReadEncode CGOAudioPlaybackInit = cgoAudioPlaybackInit CGOAudioPlaybackClose = cgoAudioPlaybackClose - CGOAudioDecodeWrite = cgoAudioDecodeWrite + CGOAudioDecodeWriteLegacy = cgoAudioDecodeWrite + CGOAudioDecodeWrite = cgoAudioDecodeWriteWithBuffers CGOUpdateOpusEncoderParams = updateOpusEncoderParams ) diff --git a/internal/audio/cgo_audio_stub.go b/internal/audio/cgo_audio_stub.go index 4ddb24dd..83c22d14 100644 --- a/internal/audio/cgo_audio_stub.go +++ b/internal/audio/cgo_audio_stub.go @@ -30,6 +30,11 @@ func cgoAudioDecodeWrite(buf []byte) (int, error) { return 0, errors.New("audio not available in lint mode") } +// cgoAudioDecodeWriteWithBuffers is a stub implementation for the optimized decode-write function +func cgoAudioDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, error) { + return 0, errors.New("audio not available in lint mode") +} + // Uppercase aliases for external API compatibility var ( @@ -38,5 +43,6 @@ var ( CGOAudioReadEncode = cgoAudioReadEncode CGOAudioPlaybackInit = cgoAudioPlaybackInit CGOAudioPlaybackClose = cgoAudioPlaybackClose - CGOAudioDecodeWrite = cgoAudioDecodeWrite + CGOAudioDecodeWriteLegacy = cgoAudioDecodeWrite + CGOAudioDecodeWrite = cgoAudioDecodeWriteWithBuffers ) diff --git a/internal/audio/config_constants.go b/internal/audio/config_constants.go index 1249e016..ec837dfa 100644 --- a/internal/audio/config_constants.go +++ b/internal/audio/config_constants.go @@ -11,6 +11,7 @@ import ( type AudioConfigConstants struct { // Audio Quality Presets MaxAudioFrameSize int // Maximum audio frame size in bytes (default: 4096) + MaxPCMBufferSize int // Maximum PCM buffer size in bytes for separate buffer optimization // Opus Encoding Parameters OpusBitrate int // Target bitrate for Opus encoding in bps (default: 128000) @@ -1586,6 +1587,7 @@ func DefaultAudioConfig() *AudioConfigConstants { return &AudioConfigConstants{ // Audio Quality Presets MaxAudioFrameSize: 4096, + MaxPCMBufferSize: 8192, // Default PCM buffer size (2x MaxAudioFrameSize for safety) // Opus Encoding Parameters OpusBitrate: 128000, diff --git a/internal/audio/input_ipc.go b/internal/audio/input_ipc.go index b4279185..3862dcfa 100644 --- a/internal/audio/input_ipc.go +++ b/internal/audio/input_ipc.go @@ -512,8 +512,16 @@ func (ais *AudioInputServer) processOpusFrame(data []byte) error { return fmt.Errorf("input frame validation failed: %w", err) } - // Process the Opus frame using CGO - _, err := CGOAudioDecodeWrite(data) + // Get cached config for optimal performance + cache := GetCachedConfig() + cache.Update() + + // Get a PCM buffer from the pool for optimized decode-write + pcmBuffer := GetBufferFromPool(cache.GetMaxPCMBufferSize()) + defer ReturnBufferToPool(pcmBuffer) + + // Process the Opus frame using optimized CGO implementation with separate buffers + _, err := CGOAudioDecodeWrite(data, pcmBuffer) return err }