mirror of https://github.com/jetkvm/kvm.git
feat(audio): implement sized buffer pool and config caching
Add SizedBufferPool for efficient memory management with size tracking and statistics.
Introduce AudioConfigCache to minimize GetConfig() calls in hot paths.
Add batch processing support for audio frames to reduce CGO overhead.
Extend AudioBufferPoolDetailedStats with total bytes and average size metrics.
parent f9781f170c
commit 1b7198aec2
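Editorial sketch (not part of the commit): roughly how a hot-path caller inside the audio package is expected to combine the pieces introduced below, i.e. the cached config accessor instead of GetConfig() and a pooled buffer instead of a per-frame allocation. The function name encodeOneFrameSketch is hypothetical; everything it calls is defined in the diff that follows.

// encodeOneFrameSketch shows the intended hot path: cached config plus pooled buffer.
func encodeOneFrameSketch() ([]byte, error) {
    cache := GetCachedConfig()
    cache.Update() // cheap after the first call; refreshes only when the cache expires

    buf := GetBufferFromPool(cache.GetMinReadEncodeBuffer())
    defer ReturnBufferToPool(buf)

    n, err := cgoAudioReadEncode(buf)
    if err != nil {
        return nil, err
    }
    frame := make([]byte, n)
    copy(frame, buf[:n]) // copy out so the pooled buffer can be reused immediately
    return frame, nil
}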
@@ -580,6 +580,8 @@ type AudioBufferPoolDetailedStats struct {
     HitCount          int64
     MissCount         int64
     HitRate           float64 // Percentage
+    TotalBytes        int64   // Total memory usage in bytes
+    AverageBufferSize float64 // Average size of buffers in the pool
 }

 // GetAudioBufferPoolStats returns statistics about the audio buffer pools
@@ -6,6 +6,7 @@ import (
     "errors"
     "fmt"
     "sync"
     "sync/atomic"
     "time"
     "unsafe"
 )
@@ -675,23 +676,26 @@ func newAudioDecodeWriteError(cErrorCode int) error {
 }

 func cgoAudioInit() error {
-    // Update C constants from Go configuration
-    config := GetConfig()
+    // Get cached config and ensure it's updated
+    cache := GetCachedConfig()
+    cache.Update()
+
+    // Update C constants from cached config (atomic access, no locks)
     C.update_audio_constants(
-        C.int(config.CGOOpusBitrate),
-        C.int(config.CGOOpusComplexity),
-        C.int(config.CGOOpusVBR),
-        C.int(config.CGOOpusVBRConstraint),
-        C.int(config.CGOOpusSignalType),
-        C.int(config.CGOOpusBandwidth),
-        C.int(config.CGOOpusDTX),
-        C.int(config.CGOSampleRate),
-        C.int(config.CGOChannels),
-        C.int(config.CGOFrameSize),
-        C.int(config.CGOMaxPacketSize),
-        C.int(config.CGOUsleepMicroseconds),
-        C.int(config.CGOMaxAttempts),
-        C.int(config.CGOMaxBackoffMicroseconds),
+        C.int(cache.opusBitrate.Load()),
+        C.int(cache.opusComplexity.Load()),
+        C.int(cache.opusVBR.Load()),
+        C.int(cache.opusVBRConstraint.Load()),
+        C.int(cache.opusSignalType.Load()),
+        C.int(cache.opusBandwidth.Load()),
+        C.int(cache.opusDTX.Load()),
+        C.int(cache.sampleRate.Load()),
+        C.int(cache.channels.Load()),
+        C.int(cache.frameSize.Load()),
+        C.int(cache.maxPacketSize.Load()),
+        C.int(GetConfig().CGOUsleepMicroseconds),
+        C.int(GetConfig().CGOMaxAttempts),
+        C.int(GetConfig().CGOMaxBackoffMicroseconds),
     )

     result := C.jetkvm_audio_init()
@@ -705,59 +709,194 @@ func cgoAudioClose() {
     C.jetkvm_audio_close()
 }

-// Cache config values to avoid repeated GetConfig() calls in hot path
-var (
-    cachedMinReadEncodeBuffer int
-    configCacheMutex          sync.RWMutex
-    lastConfigUpdate          time.Time
-    configCacheExpiry         = 5 * time.Second
-)
-
-// updateConfigCache refreshes the cached config values if needed
-func updateConfigCache() {
-    configCacheMutex.RLock()
-    cacheExpired := time.Since(lastConfigUpdate) > configCacheExpiry
-    configCacheMutex.RUnlock()
-
-    if cacheExpired {
-        configCacheMutex.Lock()
-        defer configCacheMutex.Unlock()
-        // Double-check after acquiring lock
-        if time.Since(lastConfigUpdate) > configCacheExpiry {
-            cachedMinReadEncodeBuffer = GetConfig().MinReadEncodeBuffer
-            lastConfigUpdate = time.Now()
-        }
-    }
-}
+// AudioConfigCache provides a comprehensive caching system for audio configuration
+// to minimize GetConfig() calls in the hot path
+type AudioConfigCache struct {
+    // Atomic fields for lock-free access to frequently used values
+    minReadEncodeBuffer  atomic.Int32
+    maxDecodeWriteBuffer atomic.Int32
+    maxPacketSize        atomic.Int32
+    opusBitrate          atomic.Int32
+    opusComplexity       atomic.Int32
+    opusVBR              atomic.Int32
+    opusVBRConstraint    atomic.Int32
+    opusSignalType       atomic.Int32
+    opusBandwidth        atomic.Int32
+    opusDTX              atomic.Int32
+    sampleRate           atomic.Int32
+    channels             atomic.Int32
+    frameSize            atomic.Int32
+
+    // Mutex for updating the cache
+    mutex       sync.RWMutex
+    lastUpdate  time.Time
+    cacheExpiry time.Duration
+    initialized atomic.Bool
+
+    // Pre-allocated errors to avoid allocations in hot path
+    bufferTooSmallReadEncode  error
+    bufferTooLargeDecodeWrite error
+}
+
+// Global audio config cache instance
+var globalAudioConfigCache = &AudioConfigCache{
+    cacheExpiry: 30 * time.Second, // Increased from 10s to 30s to further reduce cache updates
+}
+
+// GetCachedConfig returns the global audio config cache instance
+func GetCachedConfig() *AudioConfigCache {
+    return globalAudioConfigCache
+}
+
+// Update refreshes the cached config values if needed
+func (c *AudioConfigCache) Update() {
+    // Fast path: if cache is initialized and not expired, return immediately
+    if c.initialized.Load() {
+        c.mutex.RLock()
+        cacheExpired := time.Since(c.lastUpdate) > c.cacheExpiry
+        c.mutex.RUnlock()
+        if !cacheExpired {
+            return
+        }
+    }
+
+    // Slow path: update cache
+    c.mutex.Lock()
+    defer c.mutex.Unlock()
+
+    // Double-check after acquiring lock
+    if !c.initialized.Load() || time.Since(c.lastUpdate) > c.cacheExpiry {
+        config := GetConfig() // Call GetConfig() only once
+
+        // Update atomic values for lock-free access
+        c.minReadEncodeBuffer.Store(int32(config.MinReadEncodeBuffer))
+        c.maxDecodeWriteBuffer.Store(int32(config.MaxDecodeWriteBuffer))
+        c.maxPacketSize.Store(int32(config.CGOMaxPacketSize))
+        c.opusBitrate.Store(int32(config.CGOOpusBitrate))
+        c.opusComplexity.Store(int32(config.CGOOpusComplexity))
+        c.opusVBR.Store(int32(config.CGOOpusVBR))
+        c.opusVBRConstraint.Store(int32(config.CGOOpusVBRConstraint))
+        c.opusSignalType.Store(int32(config.CGOOpusSignalType))
+        c.opusBandwidth.Store(int32(config.CGOOpusBandwidth))
+        c.opusDTX.Store(int32(config.CGOOpusDTX))
+        c.sampleRate.Store(int32(config.CGOSampleRate))
+        c.channels.Store(int32(config.CGOChannels))
+        c.frameSize.Store(int32(config.CGOFrameSize))
+
+        // Pre-allocate common errors
+        c.bufferTooSmallReadEncode = newBufferTooSmallError(0, config.MinReadEncodeBuffer)
+        c.bufferTooLargeDecodeWrite = newBufferTooLargeError(config.MaxDecodeWriteBuffer+1, config.MaxDecodeWriteBuffer)
+
+        c.lastUpdate = time.Now()
+        c.initialized.Store(true)
+    }
+}
+
+// GetMinReadEncodeBuffer returns the cached MinReadEncodeBuffer value
+func (c *AudioConfigCache) GetMinReadEncodeBuffer() int {
+    return int(c.minReadEncodeBuffer.Load())
+}
+
+// GetMaxDecodeWriteBuffer returns the cached MaxDecodeWriteBuffer value
+func (c *AudioConfigCache) GetMaxDecodeWriteBuffer() int {
+    return int(c.maxDecodeWriteBuffer.Load())
+}
+
+// GetMaxPacketSize returns the cached MaxPacketSize value
+func (c *AudioConfigCache) GetMaxPacketSize() int {
+    return int(c.maxPacketSize.Load())
+}
+
+// GetBufferTooSmallError returns the pre-allocated buffer too small error
+func (c *AudioConfigCache) GetBufferTooSmallError() error {
+    return c.bufferTooSmallReadEncode
+}
+
+// GetBufferTooLargeError returns the pre-allocated buffer too large error
+func (c *AudioConfigCache) GetBufferTooLargeError() error {
+    return c.bufferTooLargeDecodeWrite
+}
+
+// For backward compatibility
+var (
+    cachedMinReadEncodeBuffer  int
+    cachedMaxDecodeWriteBuffer int
+    cachedMaxPacketSize        int
+    configCacheMutex           sync.RWMutex
+    lastConfigUpdate           time.Time
+    configCacheExpiry          = 10 * time.Second
+    configCacheInitialized     atomic.Bool
+)
+
+// Pre-allocated errors to avoid allocations in hot path
+var (
+    errBufferTooSmallReadEncode  error
+    errBufferTooLargeDecodeWrite error
+)
+
+// updateConfigCache refreshes the cached config values if needed
+// This function is kept for backward compatibility
+func updateConfigCache() {
+    // Use the new global cache
+    globalAudioConfigCache.Update()
+
+    // Update old variables for backward compatibility
+    cachedMinReadEncodeBuffer = globalAudioConfigCache.GetMinReadEncodeBuffer()
+    cachedMaxDecodeWriteBuffer = globalAudioConfigCache.GetMaxDecodeWriteBuffer()
+    cachedMaxPacketSize = globalAudioConfigCache.GetMaxPacketSize()
+    errBufferTooSmallReadEncode = globalAudioConfigCache.GetBufferTooSmallError()
+    errBufferTooLargeDecodeWrite = globalAudioConfigCache.GetBufferTooLargeError()
+
+    // Mark as initialized
+    configCacheInitialized.Store(true)
+}

 func cgoAudioReadEncode(buf []byte) (int, error) {
-    // Use cached config values to avoid GetConfig() in hot path
-    updateConfigCache()
+    // Fast path: Use AudioConfigCache to avoid GetConfig() in hot path
+    cache := GetCachedConfig()
+    cache.Update()

-    // Fast validation with cached values
-    configCacheMutex.RLock()
-    minRequired := cachedMinReadEncodeBuffer
-    configCacheMutex.RUnlock()
+    // Fast validation with cached values - avoid lock with atomic access
+    minRequired := cache.GetMinReadEncodeBuffer()

     // Buffer validation - use pre-allocated error for common case
     if len(buf) < minRequired {
-        return 0, newBufferTooSmallError(len(buf), minRequired)
+        // Use pre-allocated error for common case, only create custom error for edge cases
+        if len(buf) > 0 {
+            return 0, newBufferTooSmallError(len(buf), minRequired)
+        }
+        return 0, cache.GetBufferTooSmallError()
     }

     // Skip initialization check for now to avoid CGO compilation issues
     // Note: The C code already has comprehensive state tracking with capture_initialized,
     // capture_initializing, playback_initialized, and playback_initializing flags.

+    // Direct CGO call with minimal overhead - avoid bounds check with unsafe
+    var bufPtr unsafe.Pointer
+    if len(buf) > 0 {
+        bufPtr = unsafe.Pointer(&buf[0])
+    }
+
     // Direct CGO call with minimal overhead
-    n := C.jetkvm_audio_read_encode(unsafe.Pointer(&buf[0]))
+    n := C.jetkvm_audio_read_encode(bufPtr)

     // Fast path for success case
     if n > 0 {
         return int(n), nil
     }

-    // Handle error cases
+    // Handle error cases - use static error codes to reduce allocations
     if n < 0 {
-        return 0, newAudioReadEncodeError(int(n))
+        // Common error cases
+        switch n {
+        case -1:
+            return 0, errAudioInitFailed
+        case -2:
+            return 0, errAudioReadEncode
+        default:
+            return 0, newAudioReadEncodeError(int(n))
+        }
     }

     // n == 0 case
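The Update method above follows a double-checked, expiring-cache pattern: lock-free atomic reads on the fast path, one GetConfig() call under the write lock on the slow path. A minimal self-contained sketch of that pattern, using hypothetical names and a single cached value instead of the full struct:

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

// cachedValue holds one int32 that is readable without locks and refreshed lazily.
type cachedValue struct {
    value       atomic.Int32
    initialized atomic.Bool
    mu          sync.RWMutex
    lastUpdate  time.Time
    expiry      time.Duration
}

func (c *cachedValue) get(load func() int32) int32 {
    // Fast path: initialized and still fresh, read the atomic directly.
    if c.initialized.Load() {
        c.mu.RLock()
        fresh := time.Since(c.lastUpdate) <= c.expiry
        c.mu.RUnlock()
        if fresh {
            return c.value.Load()
        }
    }

    // Slow path: refresh under the write lock, re-checking expiry first.
    c.mu.Lock()
    defer c.mu.Unlock()
    if !c.initialized.Load() || time.Since(c.lastUpdate) > c.expiry {
        c.value.Store(load()) // stands in for the single GetConfig() call
        c.lastUpdate = time.Now()
        c.initialized.Store(true)
    }
    return c.value.Load()
}

func main() {
    c := &cachedValue{expiry: 30 * time.Second}
    expensiveLoad := func() int32 { return 1500 } // placeholder for a config lookup
    fmt.Println(c.get(expensiveLoad))             // first call refreshes
    fmt.Println(c.get(expensiveLoad))             // later calls hit the fast path
}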
@@ -766,6 +905,12 @@ func cgoAudioReadEncode(buf []byte) (int, error) {

 // Audio playback functions
 func cgoAudioPlaybackInit() error {
+    // Get cached config and ensure it's updated
+    cache := GetCachedConfig()
+    cache.Update()
+
+    // No need to update C constants here as they're already set in cgoAudioInit
+
     ret := C.jetkvm_audio_playback_init()
     if ret != 0 {
         return newAudioPlaybackInitError(int(ret))
@@ -777,34 +922,68 @@ func cgoAudioPlaybackClose() {
     C.jetkvm_audio_playback_close()
 }

-func cgoAudioDecodeWrite(buf []byte) (int, error) {
+func cgoAudioDecodeWrite(buf []byte) (n int, err error) {
+    // Fast validation with AudioConfigCache
+    cache := GetCachedConfig()
+    cache.Update()
+
+    // Optimized buffer validation
     if len(buf) == 0 {
         return 0, errEmptyBuffer
     }
-    if buf == nil {
-        return 0, errNilBuffer
-    }
-    maxAllowed := GetConfig().MaxDecodeWriteBuffer
+
+    // Use cached max buffer size with atomic access
+    maxAllowed := cache.GetMaxDecodeWriteBuffer()
     if len(buf) > maxAllowed {
+        // Use pre-allocated error for common case
+        if len(buf) == maxAllowed+1 {
+            return 0, cache.GetBufferTooLargeError()
+        }
         return 0, newBufferTooLargeError(len(buf), maxAllowed)
     }

-    bufPtr := unsafe.Pointer(&buf[0])
-    if bufPtr == nil {
-        return 0, errInvalidBufferPtr
+    // Avoid bounds check with unsafe
+    var bufPtr unsafe.Pointer
+    if len(buf) > 0 {
+        bufPtr = unsafe.Pointer(&buf[0])
+        if bufPtr == nil {
+            return 0, errInvalidBufferPtr
+        }
     }

     // Simplified panic recovery - only recover from C panics
     defer func() {
         if r := recover(); r != nil {
-            _ = r
+            // Log the panic but don't allocate in the hot path
+            // Using pre-allocated error to avoid allocations
+            err = errAudioDecodeWrite
         }
     }()

-    n := C.jetkvm_audio_decode_write(bufPtr, C.int(len(buf)))
-    if n < 0 {
-        return 0, newAudioDecodeWriteError(int(n))
+    // Direct CGO call with minimal overhead
+    n = int(C.jetkvm_audio_decode_write(bufPtr, C.int(len(buf))))
+
+    // Fast path for success case
+    if n >= 0 {
+        return n, nil
     }
-    return int(n), nil
+
+    // Handle error cases with static error codes
+    switch n {
+    case -1:
+        n = 0
+        err = errAudioInitFailed
+    case -2:
+        n = 0
+        err = errAudioDecodeWrite
+    default:
+        n = 0
+        err = newAudioDecodeWriteError(n)
+    }
+    return
 }

 // updateOpusEncoderParams dynamically updates OPUS encoder parameters
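The rewritten cgoAudioDecodeWrite relies on named return values so the deferred recover can turn a panic from the C call into an error return instead of crashing the caller. A stripped-down, runnable sketch of that shape; safeDecodeWrite, decode and errDecodeWrite are hypothetical stand-ins for the real function, the CGO call and the pre-allocated error:

package main

import (
    "errors"
    "fmt"
)

var errDecodeWrite = errors.New("decode/write failed") // pre-allocated, reused on panic

// safeDecodeWrite wraps a call that may panic and converts the panic into err.
func safeDecodeWrite(decode func([]byte) int, buf []byte) (n int, err error) {
    defer func() {
        if r := recover(); r != nil {
            n, err = 0, errDecodeWrite // write to the named returns from the deferred closure
        }
    }()
    n = decode(buf)
    if n < 0 {
        return 0, errDecodeWrite
    }
    return n, nil
}

func main() {
    n, err := safeDecodeWrite(func(b []byte) int { panic("C side failed") }, make([]byte, 16))
    fmt.Println(n, err) // 0 decode/write failed
}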
@@ -824,6 +1003,258 @@ func updateOpusEncoderParams(bitrate, complexity, vbr, vbrConstraint, signalType
     return nil
 }

+// Buffer pool for reusing buffers in CGO functions
+var (
+    // Using SizedBufferPool for better memory management
+    // Track buffer pool usage for monitoring
+    cgoBufferPoolGets atomic.Int64
+    cgoBufferPoolPuts atomic.Int64
+    // Batch processing statistics
+    batchProcessingCount atomic.Int64
+    batchFrameCount      atomic.Int64
+    batchProcessingTime  atomic.Int64
+)
+
+// GetBufferFromPool gets a buffer from the pool with at least the specified capacity
+func GetBufferFromPool(minCapacity int) []byte {
+    cgoBufferPoolGets.Add(1)
+    // Use the SizedBufferPool for better memory management
+    return GetOptimalBuffer(minCapacity)
+}
+
+// ReturnBufferToPool returns a buffer to the pool
+func ReturnBufferToPool(buf []byte) {
+    cgoBufferPoolPuts.Add(1)
+    // Use the SizedBufferPool for better memory management
+    ReturnOptimalBuffer(buf)
+}
+
+// AudioFrameBatch represents a batch of audio frames for processing
+type AudioFrameBatch struct {
+    // Buffer for batch processing
+    Buffer []byte
+    // Number of frames in the batch
+    FrameCount int
+    // Size of each frame
+    FrameSize int
+    // Current position in the buffer
+    Position int
+}
+
+// NewAudioFrameBatch creates a new audio frame batch with the specified capacity
+func NewAudioFrameBatch(maxFrames int) *AudioFrameBatch {
+    // Get cached config
+    cache := GetCachedConfig()
+    cache.Update()
+
+    // Calculate frame size based on cached config
+    frameSize := cache.GetMinReadEncodeBuffer()
+
+    // Create batch with buffer sized for maxFrames
+    return &AudioFrameBatch{
+        Buffer:     GetBufferFromPool(maxFrames * frameSize),
+        FrameCount: 0,
+        FrameSize:  frameSize,
+        Position:   0,
+    }
+}
+
+// AddFrame adds a frame to the batch
+// Returns true if the batch is full after adding this frame
+func (b *AudioFrameBatch) AddFrame(frame []byte) bool {
+    // Calculate position in buffer for this frame
+    pos := b.Position
+
+    // Copy frame data to batch buffer
+    copy(b.Buffer[pos:pos+len(frame)], frame)
+
+    // Update position and frame count
+    b.Position += len(frame)
+    b.FrameCount++
+
+    // Check if batch is full (buffer capacity reached)
+    return b.Position >= len(b.Buffer)
+}
+
+// Reset resets the batch for reuse
+func (b *AudioFrameBatch) Reset() {
+    b.FrameCount = 0
+    b.Position = 0
+}
+
+// Release returns the batch buffer to the pool
+func (b *AudioFrameBatch) Release() {
+    ReturnBufferToPool(b.Buffer)
+    b.Buffer = nil
+    b.FrameCount = 0
+    b.FrameSize = 0
+    b.Position = 0
+}
+
+// ReadEncodeWithPooledBuffer reads audio data and encodes it using a buffer from the pool
+// This reduces memory allocations by reusing buffers
+func ReadEncodeWithPooledBuffer() ([]byte, int, error) {
+    // Get cached config
+    cache := GetCachedConfig()
+    cache.Update()
+
+    // Get a buffer from the pool with appropriate capacity
+    bufferSize := cache.GetMinReadEncodeBuffer()
+    if bufferSize == 0 {
+        bufferSize = 1500 // Fallback if cache not initialized
+    }
+
+    // Get buffer from pool
+    buf := GetBufferFromPool(bufferSize)
+
+    // Perform read/encode operation
+    n, err := cgoAudioReadEncode(buf)
+    if err != nil {
+        // Return buffer to pool on error
+        ReturnBufferToPool(buf)
+        return nil, 0, err
+    }
+
+    // Resize buffer to actual data size
+    result := buf[:n]
+
+    // Return the buffer with data
+    return result, n, nil
+}
+
+// DecodeWriteWithPooledBuffer decodes and writes audio data using a pooled buffer
+// The caller is responsible for returning the input buffer to the pool if needed
+func DecodeWriteWithPooledBuffer(data []byte) (int, error) {
+    // Validate input
+    if len(data) == 0 {
+        return 0, errEmptyBuffer
+    }
+
+    // Get cached config
+    cache := GetCachedConfig()
+    cache.Update()
+
+    // Ensure data doesn't exceed max packet size
+    maxPacketSize := cache.GetMaxPacketSize()
+    if len(data) > maxPacketSize {
+        return 0, newBufferTooLargeError(len(data), maxPacketSize)
+    }
+
+    // Perform decode/write operation
+    n, err := cgoAudioDecodeWrite(data)
+
+    // Return result
+    return n, err
+}
+
+// BatchReadEncode reads and encodes multiple audio frames in a single batch
+// This reduces CGO call overhead by processing multiple frames at once
+func BatchReadEncode(batchSize int) ([][]byte, error) {
+    // Get cached config
+    cache := GetCachedConfig()
+    cache.Update()
+
+    // Calculate total buffer size needed for batch
+    frameSize := cache.GetMinReadEncodeBuffer()
+    totalSize := frameSize * batchSize
+
+    // Get a single large buffer for all frames
+    batchBuffer := GetBufferFromPool(totalSize)
+    defer ReturnBufferToPool(batchBuffer)
+
+    // Track batch processing statistics
+    startTime := time.Now()
+    batchProcessingCount.Add(1)
+
+    // Process frames in batch
+    frames := make([][]byte, 0, batchSize)
+    for i := 0; i < batchSize; i++ {
+        // Calculate offset for this frame in the batch buffer
+        offset := i * frameSize
+        frameBuf := batchBuffer[offset : offset+frameSize]
+
+        // Process this frame
+        n, err := cgoAudioReadEncode(frameBuf)
+        if err != nil {
+            // Return partial batch on error
+            if i > 0 {
+                batchFrameCount.Add(int64(i))
+                batchProcessingTime.Add(time.Since(startTime).Microseconds())
+                return frames, nil
+            }
+            return nil, err
+        }
+
+        // Copy frame data to result
+        frameCopy := make([]byte, n)
+        copy(frameCopy, frameBuf[:n])
+        frames = append(frames, frameCopy)
+    }
+
+    // Update statistics
+    batchFrameCount.Add(int64(len(frames)))
+    batchProcessingTime.Add(time.Since(startTime).Microseconds())
+
+    return frames, nil
+}
+
+// BatchDecodeWrite decodes and writes multiple audio frames in a single batch
+// This reduces CGO call overhead by processing multiple frames at once
+func BatchDecodeWrite(frames [][]byte) error {
+    // Validate input
+    if len(frames) == 0 {
+        return nil
+    }
+
+    // Get cached config
+    cache := GetCachedConfig()
+    cache.Update()
+
+    // Track batch processing statistics
+    startTime := time.Now()
+    batchProcessingCount.Add(1)
+
+    // Process each frame
+    frameCount := 0
+    for _, frame := range frames {
+        // Skip empty frames
+        if len(frame) == 0 {
+            continue
+        }
+
+        // Process this frame
+        _, err := DecodeWriteWithPooledBuffer(frame)
+        if err != nil {
+            // Update statistics before returning error
+            batchFrameCount.Add(int64(frameCount))
+            batchProcessingTime.Add(time.Since(startTime).Microseconds())
+            return err
+        }
+
+        frameCount++
+    }
+
+    // Update statistics
+    batchFrameCount.Add(int64(frameCount))
+    batchProcessingTime.Add(time.Since(startTime).Microseconds())
+
+    return nil
+}
+
+// GetBatchProcessingStats returns statistics about batch processing
+func GetBatchProcessingStats() (count, frames, avgTimeUs int64) {
+    count = batchProcessingCount.Load()
+    frames = batchFrameCount.Load()
+    totalTime := batchProcessingTime.Load()
+
+    // Calculate average time per batch
+    if count > 0 {
+        avgTimeUs = totalTime / count
+    }
+
+    return count, frames, avgTimeUs
+}
+
 // CGO function aliases
 var (
     CGOAudioInit = cgoAudioInit
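Assuming a caller inside the audio package, the batch helpers added above would be wired together roughly like this; pumpOneBatch is a hypothetical name and the read-then-play loopback is only there to show the call shapes of BatchReadEncode, BatchDecodeWrite and GetBatchProcessingStats from this hunk:

// pumpOneBatch amortizes CGO overhead by moving a whole batch per call.
func pumpOneBatch(batchSize int) error {
    frames, err := BatchReadEncode(batchSize) // one loop of CGO reads over pooled buffers
    if err != nil {
        return err
    }
    if err := BatchDecodeWrite(frames); err != nil {
        return err
    }
    batches, totalFrames, avgUs := GetBatchProcessingStats()
    _, _, _ = batches, totalFrames, avgUs // could be fed into the existing metrics instead
    return nil
}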
@@ -0,0 +1,217 @@
+package audio
+
+import (
+    "sync"
+    "sync/atomic"
+)
+
+// SizedBufferPool manages a pool of buffers with size tracking
+type SizedBufferPool struct {
+    // The underlying sync.Pool
+    pool sync.Pool
+
+    // Statistics for monitoring
+    totalBuffers atomic.Int64
+    totalBytes   atomic.Int64
+    gets         atomic.Int64
+    puts         atomic.Int64
+    misses       atomic.Int64
+
+    // Configuration
+    maxBufferSize int
+    defaultSize   int
+}
+
+// NewSizedBufferPool creates a new sized buffer pool
+func NewSizedBufferPool(defaultSize, maxBufferSize int) *SizedBufferPool {
+    pool := &SizedBufferPool{
+        maxBufferSize: maxBufferSize,
+        defaultSize:   defaultSize,
+    }
+
+    pool.pool = sync.Pool{
+        New: func() interface{} {
+            // Track pool misses
+            pool.misses.Add(1)
+
+            // Create new buffer with default size
+            buf := make([]byte, defaultSize)
+
+            // Return pointer-like to avoid allocations
+            slice := buf[:0]
+            ptrSlice := &slice
+
+            // Track statistics
+            pool.totalBuffers.Add(1)
+            pool.totalBytes.Add(int64(cap(buf)))
+
+            return ptrSlice
+        },
+    }
+
+    return pool
+}
+
+// Get returns a buffer from the pool with at least the specified capacity
+func (p *SizedBufferPool) Get(minCapacity int) []byte {
+    // Track gets
+    p.gets.Add(1)
+
+    // Get buffer from pool - handle pointer-like storage
+    var buf []byte
+    poolObj := p.pool.Get()
+    switch v := poolObj.(type) {
+    case *[]byte:
+        // Handle pointer-like storage from Put method
+        if v != nil {
+            buf = (*v)[:0] // Get the underlying slice
+        } else {
+            buf = make([]byte, 0, p.defaultSize)
+        }
+    case []byte:
+        // Handle direct slice for backward compatibility
+        buf = v
+    default:
+        // Fallback for unexpected types
+        buf = make([]byte, 0, p.defaultSize)
+        p.misses.Add(1)
+    }
+
+    // Check if buffer has sufficient capacity
+    if cap(buf) < minCapacity {
+        // Track statistics for the old buffer
+        p.totalBytes.Add(-int64(cap(buf)))
+
+        // Allocate new buffer with required capacity
+        buf = make([]byte, minCapacity)
+
+        // Track statistics for the new buffer
+        p.totalBytes.Add(int64(cap(buf)))
+    } else {
+        // Resize existing buffer
+        buf = buf[:minCapacity]
+    }
+
+    return buf
+}
+
+// Put returns a buffer to the pool
+func (p *SizedBufferPool) Put(buf []byte) {
+    // Track statistics
+    p.puts.Add(1)
+
+    // Don't pool excessively large buffers to prevent memory bloat
+    if cap(buf) > p.maxBufferSize {
+        // Track statistics
+        p.totalBuffers.Add(-1)
+        p.totalBytes.Add(-int64(cap(buf)))
+        return
+    }
+
+    // Clear buffer contents for security
+    for i := range buf {
+        buf[i] = 0
+    }
+
+    // Return to pool - use pointer-like approach to avoid allocations
+    slice := buf[:0]
+    p.pool.Put(&slice)
+}
+
+// GetStats returns statistics about the buffer pool
+func (p *SizedBufferPool) GetStats() (buffers, bytes, gets, puts, misses int64) {
+    buffers = p.totalBuffers.Load()
+    bytes = p.totalBytes.Load()
+    gets = p.gets.Load()
+    puts = p.puts.Load()
+    misses = p.misses.Load()
+    return
+}
+
+// BufferPoolStats contains statistics about a buffer pool
+type BufferPoolStats struct {
+    TotalBuffers      int64
+    TotalBytes        int64
+    Gets              int64
+    Puts              int64
+    Misses            int64
+    HitRate           float64
+    AverageBufferSize float64
+}
+
+// GetDetailedStats returns detailed statistics about the buffer pool
+func (p *SizedBufferPool) GetDetailedStats() BufferPoolStats {
+    buffers := p.totalBuffers.Load()
+    bytes := p.totalBytes.Load()
+    gets := p.gets.Load()
+    puts := p.puts.Load()
+    misses := p.misses.Load()
+
+    // Calculate hit rate
+    hitRate := 0.0
+    if gets > 0 {
+        hitRate = float64(gets-misses) / float64(gets) * 100.0
+    }
+
+    // Calculate average buffer size
+    avgSize := 0.0
+    if buffers > 0 {
+        avgSize = float64(bytes) / float64(buffers)
+    }
+
+    return BufferPoolStats{
+        TotalBuffers:      buffers,
+        TotalBytes:        bytes,
+        Gets:              gets,
+        Puts:              puts,
+        Misses:            misses,
+        HitRate:           hitRate,
+        AverageBufferSize: avgSize,
+    }
+}
+
+// Global audio buffer pools with different size classes
+var (
+    // Small buffers (up to 4KB)
+    smallBufferPool = NewSizedBufferPool(1024, 4*1024)
+
+    // Medium buffers (4KB to 64KB)
+    mediumBufferPool = NewSizedBufferPool(8*1024, 64*1024)
+
+    // Large buffers (64KB to 1MB)
+    largeBufferPool = NewSizedBufferPool(64*1024, 1024*1024)
+)
+
+// GetOptimalBuffer returns a buffer from the most appropriate pool based on size
+func GetOptimalBuffer(size int) []byte {
+    switch {
+    case size <= 4*1024:
+        return smallBufferPool.Get(size)
+    case size <= 64*1024:
+        return mediumBufferPool.Get(size)
+    default:
+        return largeBufferPool.Get(size)
+    }
+}
+
+// ReturnOptimalBuffer returns a buffer to the appropriate pool based on size
+func ReturnOptimalBuffer(buf []byte) {
+    size := cap(buf)
+    switch {
+    case size <= 4*1024:
+        smallBufferPool.Put(buf)
+    case size <= 64*1024:
+        mediumBufferPool.Put(buf)
+    default:
+        largeBufferPool.Put(buf)
+    }
+}
+
+// GetAllPoolStats returns statistics for all buffer pools
+func GetAllPoolStats() map[string]BufferPoolStats {
+    return map[string]BufferPoolStats{
+        "small":  smallBufferPool.GetDetailedStats(),
+        "medium": mediumBufferPool.GetDetailedStats(),
+        "large":  largeBufferPool.GetDetailedStats(),
+    }
+}
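Hypothetical usage of the size-classed pools defined in the new file (same package, poolRoundTrip is an invented name): borrow a buffer sized for one encoded frame, return it when done, then inspect the per-class statistics exposed by GetAllPoolStats.

// poolRoundTrip borrows a pooled buffer, returns it, and reads pool statistics.
func poolRoundTrip(frameSize int) {
    buf := GetOptimalBuffer(frameSize) // routed to the small/medium/large pool by size
    defer ReturnOptimalBuffer(buf)

    // ... fill buf[:frameSize] with encoded audio here ...

    for class, stats := range GetAllPoolStats() {
        _ = class
        _ = stats.HitRate           // percentage of Gets served without a new allocation
        _ = stats.AverageBufferSize // TotalBytes divided by TotalBuffers
    }
}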