perf(audio): optimize opus decode-write with separate buffers

- Add PCM buffer pool and config for optimized decode-write operations
- Implement separate buffer handling in CGO audio processing
- Update batch processor to support both legacy and optimized paths
This commit is contained in:
Alex P 2025-09-03 16:28:25 +00:00
parent 370178e43b
commit 5353c1cab2
5 changed files with 189 additions and 14 deletions

View File

@ -4,6 +4,7 @@ package audio
import (
"context"
"fmt"
"runtime"
"sync"
"sync/atomic"
@ -67,7 +68,9 @@ type batchReadResult struct {
}
type batchWriteRequest struct {
buffer []byte
buffer []byte // Buffer for backward compatibility
opusData []byte // Opus encoded data for decode-write operations
pcmBuffer []byte // PCM buffer for decode-write operations
resultChan chan batchWriteResult
timestamp time.Time
}
@ -207,6 +210,7 @@ func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) {
}
// BatchDecodeWrite performs batched audio decode and write operations
// This is the legacy version that uses a single buffer
func (bap *BatchAudioProcessor) BatchDecodeWrite(buffer []byte) (int, error) {
// Get cached config to avoid GetConfig() calls in hot path
cache := GetCachedConfig()
@ -222,7 +226,7 @@ func (bap *BatchAudioProcessor) BatchDecodeWrite(buffer []byte) (int, error) {
// Fallback to single operation if batch processor is not running
atomic.AddInt64(&bap.stats.SingleWrites, 1)
atomic.AddInt64(&bap.stats.WriteFrames, 1)
return CGOAudioDecodeWrite(buffer)
return CGOAudioDecodeWriteLegacy(buffer)
}
resultChan := make(chan batchWriteResult, 1)
@ -240,7 +244,7 @@ func (bap *BatchAudioProcessor) BatchDecodeWrite(buffer []byte) (int, error) {
// Queue is full, fall back to single operation
atomic.AddInt64(&bap.stats.SingleWrites, 1)
atomic.AddInt64(&bap.stats.WriteFrames, 1)
return CGOAudioDecodeWrite(buffer)
return CGOAudioDecodeWriteLegacy(buffer)
}
// Wait for result with timeout
@ -250,7 +254,61 @@ func (bap *BatchAudioProcessor) BatchDecodeWrite(buffer []byte) (int, error) {
case <-time.After(cache.BatchProcessingTimeout):
atomic.AddInt64(&bap.stats.SingleWrites, 1)
atomic.AddInt64(&bap.stats.WriteFrames, 1)
return CGOAudioDecodeWrite(buffer)
return CGOAudioDecodeWriteLegacy(buffer)
}
}
// BatchDecodeWriteWithBuffers performs batched audio decode and write operations with separate opus and PCM buffers
func (bap *BatchAudioProcessor) BatchDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, error) {
// Get cached config to avoid GetConfig() calls in hot path
cache := GetCachedConfig()
cache.Update()
// Validate buffers before processing
if len(opusData) == 0 {
return 0, fmt.Errorf("empty opus data buffer")
}
if len(pcmBuffer) == 0 {
return 0, fmt.Errorf("empty PCM buffer")
}
if !bap.IsRunning() {
// Fallback to single operation if batch processor is not running
atomic.AddInt64(&bap.stats.SingleWrites, 1)
atomic.AddInt64(&bap.stats.WriteFrames, 1)
// Use the optimized function with separate buffers
return CGOAudioDecodeWrite(opusData, pcmBuffer)
}
resultChan := make(chan batchWriteResult, 1)
request := batchWriteRequest{
opusData: opusData,
pcmBuffer: pcmBuffer,
resultChan: resultChan,
timestamp: time.Now(),
}
// Try to queue the request with non-blocking send
select {
case bap.writeQueue <- request:
// Successfully queued
default:
// Queue is full, fall back to single operation
atomic.AddInt64(&bap.stats.SingleWrites, 1)
atomic.AddInt64(&bap.stats.WriteFrames, 1)
// Use the optimized function with separate buffers
return CGOAudioDecodeWrite(opusData, pcmBuffer)
}
// Wait for result with timeout
select {
case result := <-resultChan:
return result.length, result.err
case <-time.After(cache.BatchProcessingTimeout):
atomic.AddInt64(&bap.stats.SingleWrites, 1)
atomic.AddInt64(&bap.stats.WriteFrames, 1)
// Use the optimized function with separate buffers
return CGOAudioDecodeWrite(opusData, pcmBuffer)
}
}
@ -437,7 +495,18 @@ func (bap *BatchAudioProcessor) processBatchWrite(batch []batchWriteRequest) {
// Process each request in the batch
for _, req := range batch {
length, err := CGOAudioDecodeWrite(req.buffer)
var length int
var err error
// Handle both legacy and new decode-write operations
if req.opusData != nil && req.pcmBuffer != nil {
// New style with separate opus data and PCM buffer
length, err = CGOAudioDecodeWrite(req.opusData, req.pcmBuffer)
} else {
// Legacy style with single buffer
length, err = CGOAudioDecodeWriteLegacy(req.buffer)
}
result := batchWriteResult{
length: length,
err: err,
@ -543,8 +612,19 @@ func BatchCGOAudioDecodeWrite(buffer []byte) (int, error) {
processor := GetBatchAudioProcessor()
if processor == nil || !processor.IsRunning() {
// Fall back to non-batched version if processor is not running
return CGOAudioDecodeWrite(buffer)
return CGOAudioDecodeWriteLegacy(buffer)
}
return processor.BatchDecodeWrite(buffer)
}
// BatchCGOAudioDecodeWriteWithBuffers is a batched version of CGOAudioDecodeWrite that uses separate opus and PCM buffers
func BatchCGOAudioDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, error) {
processor := GetBatchAudioProcessor()
if processor == nil || !processor.IsRunning() {
// Fall back to non-batched version if processor is not running
return CGOAudioDecodeWrite(opusData, pcmBuffer)
}
return processor.BatchDecodeWriteWithBuffers(opusData, pcmBuffer)
}

View File

@ -716,6 +716,7 @@ type AudioConfigCache struct {
minReadEncodeBuffer atomic.Int32
maxDecodeWriteBuffer atomic.Int32
maxPacketSize atomic.Int32
maxPCMBufferSize atomic.Int32
opusBitrate atomic.Int32
opusComplexity atomic.Int32
opusVBR atomic.Int32
@ -787,6 +788,7 @@ func (c *AudioConfigCache) Update() {
c.minReadEncodeBuffer.Store(int32(config.MinReadEncodeBuffer))
c.maxDecodeWriteBuffer.Store(int32(config.MaxDecodeWriteBuffer))
c.maxPacketSize.Store(int32(config.CGOMaxPacketSize))
c.maxPCMBufferSize.Store(int32(config.MaxPCMBufferSize))
c.opusBitrate.Store(int32(config.CGOOpusBitrate))
c.opusComplexity.Store(int32(config.CGOOpusComplexity))
c.opusVBR.Store(int32(config.CGOOpusVBR))
@ -842,6 +844,11 @@ func (c *AudioConfigCache) GetMaxPacketSize() int {
return int(c.maxPacketSize.Load())
}
// GetMaxPCMBufferSize returns the cached MaxPCMBufferSize value
func (c *AudioConfigCache) GetMaxPCMBufferSize() int {
return int(c.maxPCMBufferSize.Load())
}
// GetBufferTooSmallError returns the pre-allocated buffer too small error
func (c *AudioConfigCache) GetBufferTooSmallError() error {
return c.bufferTooSmallReadEncode
@ -1179,8 +1186,12 @@ func DecodeWriteWithPooledBuffer(data []byte) (int, error) {
return 0, newBufferTooLargeError(len(data), maxPacketSize)
}
// Perform decode/write operation
n, err := cgoAudioDecodeWrite(data)
// Get a PCM buffer from the pool for optimized decode-write
pcmBuffer := GetBufferFromPool(cache.GetMaxPCMBufferSize())
defer ReturnBufferToPool(pcmBuffer)
// Perform decode/write operation using optimized implementation
n, err := CGOAudioDecodeWrite(data, pcmBuffer)
// Return result
return n, err
@ -1253,6 +1264,10 @@ func BatchDecodeWrite(frames [][]byte) error {
startTime := time.Now()
batchProcessingCount.Add(1)
// Get a PCM buffer from the pool for optimized decode-write
pcmBuffer := GetBufferFromPool(cache.GetMaxPCMBufferSize())
defer ReturnBufferToPool(pcmBuffer)
// Process each frame
frameCount := 0
for _, frame := range frames {
@ -1261,8 +1276,8 @@ func BatchDecodeWrite(frames [][]byte) error {
continue
}
// Process this frame
_, err := DecodeWriteWithPooledBuffer(frame)
// Process this frame using optimized implementation
_, err := CGOAudioDecodeWrite(frame, pcmBuffer)
if err != nil {
// Update statistics before returning error
batchFrameCount.Add(int64(frameCount))
@ -1294,6 +1309,69 @@ func GetBatchProcessingStats() (count, frames, avgTimeUs int64) {
return count, frames, avgTimeUs
}
// cgoAudioDecodeWriteWithBuffers decodes opus data and writes to PCM buffer
// This implementation uses separate buffers for opus data and PCM output
func cgoAudioDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, error) {
// Validate input
if len(opusData) == 0 {
return 0, errEmptyBuffer
}
if len(pcmBuffer) == 0 {
return 0, errEmptyBuffer
}
// Get cached config
cache := GetCachedConfig()
cache.Update()
// Ensure data doesn't exceed max packet size
maxPacketSize := cache.GetMaxPacketSize()
if len(opusData) > maxPacketSize {
return 0, newBufferTooLargeError(len(opusData), maxPacketSize)
}
// Avoid bounds check with unsafe
var opusPtr unsafe.Pointer
if len(opusData) > 0 {
opusPtr = unsafe.Pointer(&opusData[0])
if opusPtr == nil {
return 0, errInvalidBufferPtr
}
}
// Simplified panic recovery - only recover from C panics
var n int
var err error
defer func() {
if r := recover(); r != nil {
// Using pre-allocated error to avoid allocations
err = errAudioDecodeWrite
}
}()
// Direct CGO call with minimal overhead
n = int(C.jetkvm_audio_decode_write(opusPtr, C.int(len(opusData))))
// Fast path for success case
if n >= 0 {
return n, nil
}
// Handle error cases with static error codes
switch n {
case -1:
n = 0
err = errAudioInitFailed
case -2:
n = 0
err = errAudioDecodeWrite
default:
n = 0
err = newAudioDecodeWriteError(n)
}
return n, err
}
// CGO function aliases
var (
CGOAudioInit = cgoAudioInit
@ -1301,6 +1379,7 @@ var (
CGOAudioReadEncode = cgoAudioReadEncode
CGOAudioPlaybackInit = cgoAudioPlaybackInit
CGOAudioPlaybackClose = cgoAudioPlaybackClose
CGOAudioDecodeWrite = cgoAudioDecodeWrite
CGOAudioDecodeWriteLegacy = cgoAudioDecodeWrite
CGOAudioDecodeWrite = cgoAudioDecodeWriteWithBuffers
CGOUpdateOpusEncoderParams = updateOpusEncoderParams
)

View File

@ -30,6 +30,11 @@ func cgoAudioDecodeWrite(buf []byte) (int, error) {
return 0, errors.New("audio not available in lint mode")
}
// cgoAudioDecodeWriteWithBuffers is a stub implementation for the optimized decode-write function
func cgoAudioDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, error) {
return 0, errors.New("audio not available in lint mode")
}
// Uppercase aliases for external API compatibility
var (
@ -38,5 +43,6 @@ var (
CGOAudioReadEncode = cgoAudioReadEncode
CGOAudioPlaybackInit = cgoAudioPlaybackInit
CGOAudioPlaybackClose = cgoAudioPlaybackClose
CGOAudioDecodeWrite = cgoAudioDecodeWrite
CGOAudioDecodeWriteLegacy = cgoAudioDecodeWrite
CGOAudioDecodeWrite = cgoAudioDecodeWriteWithBuffers
)

View File

@ -11,6 +11,7 @@ import (
type AudioConfigConstants struct {
// Audio Quality Presets
MaxAudioFrameSize int // Maximum audio frame size in bytes (default: 4096)
MaxPCMBufferSize int // Maximum PCM buffer size in bytes for separate buffer optimization
// Opus Encoding Parameters
OpusBitrate int // Target bitrate for Opus encoding in bps (default: 128000)
@ -1586,6 +1587,7 @@ func DefaultAudioConfig() *AudioConfigConstants {
return &AudioConfigConstants{
// Audio Quality Presets
MaxAudioFrameSize: 4096,
MaxPCMBufferSize: 8192, // Default PCM buffer size (2x MaxAudioFrameSize for safety)
// Opus Encoding Parameters
OpusBitrate: 128000,

View File

@ -512,8 +512,16 @@ func (ais *AudioInputServer) processOpusFrame(data []byte) error {
return fmt.Errorf("input frame validation failed: %w", err)
}
// Process the Opus frame using CGO
_, err := CGOAudioDecodeWrite(data)
// Get cached config for optimal performance
cache := GetCachedConfig()
cache.Update()
// Get a PCM buffer from the pool for optimized decode-write
pcmBuffer := GetBufferFromPool(cache.GetMaxPCMBufferSize())
defer ReturnBufferToPool(pcmBuffer)
// Process the Opus frame using optimized CGO implementation with separate buffers
_, err := CGOAudioDecodeWrite(data, pcmBuffer)
return err
}