mirror of https://github.com/jetkvm/kvm.git
perf(audio): optimize hotpath by removing redundant checks and logging
- Skip logging in frame validation to reduce overhead
- Only update cache when expired to avoid unnecessary operations
- Remove duplicate config caching system and simplify buffer handling
- Optimize batch processing with pre-allocated buffers and conditional time tracking
This commit is contained in:
parent
5353c1cab2
commit
a557987629
|
@ -859,44 +859,15 @@ func (c *AudioConfigCache) GetBufferTooLargeError() error {
|
||||||
return c.bufferTooLargeDecodeWrite
|
return c.bufferTooLargeDecodeWrite
|
||||||
}
|
}
|
||||||
|
|
||||||
// For backward compatibility
|
// Removed duplicate config caching system - using AudioConfigCache instead
|
||||||
var (
|
|
||||||
cachedMinReadEncodeBuffer int
|
|
||||||
cachedMaxDecodeWriteBuffer int
|
|
||||||
cachedMaxPacketSize int
|
|
||||||
configCacheMutex sync.RWMutex
|
|
||||||
lastConfigUpdate time.Time
|
|
||||||
configCacheExpiry = 10 * time.Second
|
|
||||||
configCacheInitialized atomic.Bool
|
|
||||||
)
|
|
||||||
|
|
||||||
// Pre-allocated errors to avoid allocations in hot path
|
|
||||||
var (
|
|
||||||
errBufferTooSmallReadEncode error
|
|
||||||
errBufferTooLargeDecodeWrite error
|
|
||||||
)
|
|
||||||
|
|
||||||
// updateConfigCache refreshes the cached config values if needed
|
|
||||||
// This function is kept for backward compatibility
|
|
||||||
func updateConfigCache() {
|
|
||||||
// Use the new global cache
|
|
||||||
globalAudioConfigCache.Update()
|
|
||||||
|
|
||||||
// Update old variables for backward compatibility
|
|
||||||
cachedMinReadEncodeBuffer = globalAudioConfigCache.GetMinReadEncodeBuffer()
|
|
||||||
cachedMaxDecodeWriteBuffer = globalAudioConfigCache.GetMaxDecodeWriteBuffer()
|
|
||||||
cachedMaxPacketSize = globalAudioConfigCache.GetMaxPacketSize()
|
|
||||||
errBufferTooSmallReadEncode = globalAudioConfigCache.GetBufferTooSmallError()
|
|
||||||
errBufferTooLargeDecodeWrite = globalAudioConfigCache.GetBufferTooLargeError()
|
|
||||||
|
|
||||||
// Mark as initialized
|
|
||||||
configCacheInitialized.Store(true)
|
|
||||||
}
|
|
||||||
|
|
||||||
func cgoAudioReadEncode(buf []byte) (int, error) {
|
func cgoAudioReadEncode(buf []byte) (int, error) {
|
||||||
// Fast path: Use AudioConfigCache to avoid GetConfig() in hot path
|
// Fast path: Use AudioConfigCache to avoid GetConfig() in hot path
|
||||||
cache := GetCachedConfig()
|
cache := GetCachedConfig()
|
||||||
|
// Only update cache if expired - avoid unnecessary overhead
|
||||||
|
if time.Since(cache.lastUpdate) > cache.cacheExpiry {
|
||||||
cache.Update()
|
cache.Update()
|
||||||
|
}
|
||||||
|
|
||||||
// Fast validation with cached values - avoid lock with atomic access
|
// Fast validation with cached values - avoid lock with atomic access
|
||||||
minRequired := cache.GetMinReadEncodeBuffer()
|
minRequired := cache.GetMinReadEncodeBuffer()
|
||||||
|
@ -914,14 +885,8 @@ func cgoAudioReadEncode(buf []byte) (int, error) {
|
||||||
// Note: The C code already has comprehensive state tracking with capture_initialized,
|
// Note: The C code already has comprehensive state tracking with capture_initialized,
|
||||||
// capture_initializing, playback_initialized, and playback_initializing flags.
|
// capture_initializing, playback_initialized, and playback_initializing flags.
|
||||||
|
|
||||||
// Direct CGO call with minimal overhead - avoid bounds check with unsafe
|
// Direct CGO call with minimal overhead - unsafe.Pointer(&slice[0]) is safe for validated non-empty buffers
|
||||||
var bufPtr unsafe.Pointer
|
n := C.jetkvm_audio_read_encode(unsafe.Pointer(&buf[0]))
|
||||||
if len(buf) > 0 {
|
|
||||||
bufPtr = unsafe.Pointer(&buf[0])
|
|
||||||
}
|
|
||||||
|
|
||||||
// Direct CGO call with minimal overhead
|
|
||||||
n := C.jetkvm_audio_read_encode(bufPtr)
|
|
||||||
|
|
||||||
// Fast path for success case
|
// Fast path for success case
|
||||||
if n > 0 {
|
if n > 0 {
|
||||||
|
@ -967,15 +932,15 @@ func cgoAudioPlaybackClose() {
|
||||||
func cgoAudioDecodeWrite(buf []byte) (n int, err error) {
|
func cgoAudioDecodeWrite(buf []byte) (n int, err error) {
|
||||||
// Fast validation with AudioConfigCache
|
// Fast validation with AudioConfigCache
|
||||||
cache := GetCachedConfig()
|
cache := GetCachedConfig()
|
||||||
|
// Only update cache if expired - avoid unnecessary overhead
|
||||||
|
if time.Since(cache.lastUpdate) > cache.cacheExpiry {
|
||||||
cache.Update()
|
cache.Update()
|
||||||
|
}
|
||||||
|
|
||||||
// Optimized buffer validation
|
// Optimized buffer validation
|
||||||
if len(buf) == 0 {
|
if len(buf) == 0 {
|
||||||
return 0, errEmptyBuffer
|
return 0, errEmptyBuffer
|
||||||
}
|
}
|
||||||
if buf == nil {
|
|
||||||
return 0, errNilBuffer
|
|
||||||
}
|
|
||||||
|
|
||||||
// Use cached max buffer size with atomic access
|
// Use cached max buffer size with atomic access
|
||||||
maxAllowed := cache.GetMaxDecodeWriteBuffer()
|
maxAllowed := cache.GetMaxDecodeWriteBuffer()
|
||||||
|
@ -987,26 +952,8 @@ func cgoAudioDecodeWrite(buf []byte) (n int, err error) {
|
||||||
return 0, newBufferTooLargeError(len(buf), maxAllowed)
|
return 0, newBufferTooLargeError(len(buf), maxAllowed)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Avoid bounds check with unsafe
|
// Direct CGO call with minimal overhead - unsafe.Pointer(&slice[0]) is safe for validated non-empty buffers
|
||||||
var bufPtr unsafe.Pointer
|
n = int(C.jetkvm_audio_decode_write(unsafe.Pointer(&buf[0]), C.int(len(buf))))
|
||||||
if len(buf) > 0 {
|
|
||||||
bufPtr = unsafe.Pointer(&buf[0])
|
|
||||||
if bufPtr == nil {
|
|
||||||
return 0, errInvalidBufferPtr
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Simplified panic recovery - only recover from C panics
|
|
||||||
defer func() {
|
|
||||||
if r := recover(); r != nil {
|
|
||||||
// Log the panic but don't allocate in the hot path
|
|
||||||
// Using pre-allocated error to avoid allocations
|
|
||||||
err = errAudioDecodeWrite
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Direct CGO call with minimal overhead
|
|
||||||
n = int(C.jetkvm_audio_decode_write(bufPtr, C.int(len(buf))))
|
|
||||||
|
|
||||||
// Fast path for success case
|
// Fast path for success case
|
||||||
if n >= 0 {
|
if n >= 0 {
|
||||||
|
@ -1051,10 +998,12 @@ var (
|
||||||
// Track buffer pool usage for monitoring
|
// Track buffer pool usage for monitoring
|
||||||
cgoBufferPoolGets atomic.Int64
|
cgoBufferPoolGets atomic.Int64
|
||||||
cgoBufferPoolPuts atomic.Int64
|
cgoBufferPoolPuts atomic.Int64
|
||||||
// Batch processing statistics
|
// Batch processing statistics - only enabled in debug builds
|
||||||
batchProcessingCount atomic.Int64
|
batchProcessingCount atomic.Int64
|
||||||
batchFrameCount atomic.Int64
|
batchFrameCount atomic.Int64
|
||||||
batchProcessingTime atomic.Int64
|
batchProcessingTime atomic.Int64
|
||||||
|
// Flag to control time tracking overhead
|
||||||
|
enableBatchTimeTracking atomic.Bool
|
||||||
)
|
)
|
||||||
|
|
||||||
// GetBufferFromPool gets a buffer from the pool with at least the specified capacity
|
// GetBufferFromPool gets a buffer from the pool with at least the specified capacity
|
||||||
|
@ -1142,7 +1091,10 @@ func (b *AudioFrameBatch) Release() {
|
||||||
func ReadEncodeWithPooledBuffer() ([]byte, int, error) {
|
func ReadEncodeWithPooledBuffer() ([]byte, int, error) {
|
||||||
// Get cached config
|
// Get cached config
|
||||||
cache := GetCachedConfig()
|
cache := GetCachedConfig()
|
||||||
|
// Only update cache if expired - avoid unnecessary overhead
|
||||||
|
if time.Since(cache.lastUpdate) > cache.cacheExpiry {
|
||||||
cache.Update()
|
cache.Update()
|
||||||
|
}
|
||||||
|
|
||||||
// Get a buffer from the pool with appropriate capacity
|
// Get a buffer from the pool with appropriate capacity
|
||||||
bufferSize := cache.GetMinReadEncodeBuffer()
|
bufferSize := cache.GetMinReadEncodeBuffer()
|
||||||
|
@ -1178,7 +1130,10 @@ func DecodeWriteWithPooledBuffer(data []byte) (int, error) {
|
||||||
|
|
||||||
// Get cached config
|
// Get cached config
|
||||||
cache := GetCachedConfig()
|
cache := GetCachedConfig()
|
||||||
|
// Only update cache if expired - avoid unnecessary overhead
|
||||||
|
if time.Since(cache.lastUpdate) > cache.cacheExpiry {
|
||||||
cache.Update()
|
cache.Update()
|
||||||
|
}
|
||||||
|
|
||||||
// Ensure data doesn't exceed max packet size
|
// Ensure data doesn't exceed max packet size
|
||||||
maxPacketSize := cache.GetMaxPacketSize()
|
maxPacketSize := cache.GetMaxPacketSize()
|
||||||
|
@ -1202,7 +1157,10 @@ func DecodeWriteWithPooledBuffer(data []byte) (int, error) {
|
||||||
func BatchReadEncode(batchSize int) ([][]byte, error) {
|
func BatchReadEncode(batchSize int) ([][]byte, error) {
|
||||||
// Get cached config
|
// Get cached config
|
||||||
cache := GetCachedConfig()
|
cache := GetCachedConfig()
|
||||||
|
// Only update cache if expired - avoid unnecessary overhead
|
||||||
|
if time.Since(cache.lastUpdate) > cache.cacheExpiry {
|
||||||
cache.Update()
|
cache.Update()
|
||||||
|
}
|
||||||
|
|
||||||
// Calculate total buffer size needed for batch
|
// Calculate total buffer size needed for batch
|
||||||
frameSize := cache.GetMinReadEncodeBuffer()
|
frameSize := cache.GetMinReadEncodeBuffer()
|
||||||
|
@ -1212,8 +1170,24 @@ func BatchReadEncode(batchSize int) ([][]byte, error) {
|
||||||
batchBuffer := GetBufferFromPool(totalSize)
|
batchBuffer := GetBufferFromPool(totalSize)
|
||||||
defer ReturnBufferToPool(batchBuffer)
|
defer ReturnBufferToPool(batchBuffer)
|
||||||
|
|
||||||
// Track batch processing statistics
|
// Pre-allocate frame result buffers from pool to avoid allocations in loop
|
||||||
startTime := time.Now()
|
frameBuffers := make([][]byte, 0, batchSize)
|
||||||
|
for i := 0; i < batchSize; i++ {
|
||||||
|
frameBuffers = append(frameBuffers, GetBufferFromPool(frameSize))
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
// Return all frame buffers to pool
|
||||||
|
for _, buf := range frameBuffers {
|
||||||
|
ReturnBufferToPool(buf)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Track batch processing statistics - only if enabled
|
||||||
|
var startTime time.Time
|
||||||
|
trackTime := enableBatchTimeTracking.Load()
|
||||||
|
if trackTime {
|
||||||
|
startTime = time.Now()
|
||||||
|
}
|
||||||
batchProcessingCount.Add(1)
|
batchProcessingCount.Add(1)
|
||||||
|
|
||||||
// Process frames in batch
|
// Process frames in batch
|
||||||
|
@ -1229,21 +1203,25 @@ func BatchReadEncode(batchSize int) ([][]byte, error) {
|
||||||
// Return partial batch on error
|
// Return partial batch on error
|
||||||
if i > 0 {
|
if i > 0 {
|
||||||
batchFrameCount.Add(int64(i))
|
batchFrameCount.Add(int64(i))
|
||||||
|
if trackTime {
|
||||||
batchProcessingTime.Add(time.Since(startTime).Microseconds())
|
batchProcessingTime.Add(time.Since(startTime).Microseconds())
|
||||||
|
}
|
||||||
return frames, nil
|
return frames, nil
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Copy frame data to result
|
// Reuse pre-allocated buffer instead of make([]byte, n)
|
||||||
frameCopy := make([]byte, n)
|
frameCopy := frameBuffers[i][:n] // Slice to actual size
|
||||||
copy(frameCopy, frameBuf[:n])
|
copy(frameCopy, frameBuf[:n])
|
||||||
frames = append(frames, frameCopy)
|
frames = append(frames, frameCopy)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update statistics
|
// Update statistics
|
||||||
batchFrameCount.Add(int64(len(frames)))
|
batchFrameCount.Add(int64(len(frames)))
|
||||||
|
if trackTime {
|
||||||
batchProcessingTime.Add(time.Since(startTime).Microseconds())
|
batchProcessingTime.Add(time.Since(startTime).Microseconds())
|
||||||
|
}
|
||||||
|
|
||||||
return frames, nil
|
return frames, nil
|
||||||
}
|
}
|
||||||
|
@ -1258,10 +1236,17 @@ func BatchDecodeWrite(frames [][]byte) error {
|
||||||
|
|
||||||
// Get cached config
|
// Get cached config
|
||||||
cache := GetCachedConfig()
|
cache := GetCachedConfig()
|
||||||
|
// Only update cache if expired - avoid unnecessary overhead
|
||||||
|
if time.Since(cache.lastUpdate) > cache.cacheExpiry {
|
||||||
cache.Update()
|
cache.Update()
|
||||||
|
}
|
||||||
|
|
||||||
// Track batch processing statistics
|
// Track batch processing statistics - only if enabled
|
||||||
startTime := time.Now()
|
var startTime time.Time
|
||||||
|
trackTime := enableBatchTimeTracking.Load()
|
||||||
|
if trackTime {
|
||||||
|
startTime = time.Now()
|
||||||
|
}
|
||||||
batchProcessingCount.Add(1)
|
batchProcessingCount.Add(1)
|
||||||
|
|
||||||
// Get a PCM buffer from the pool for optimized decode-write
|
// Get a PCM buffer from the pool for optimized decode-write
|
||||||
|
@ -1281,7 +1266,9 @@ func BatchDecodeWrite(frames [][]byte) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Update statistics before returning error
|
// Update statistics before returning error
|
||||||
batchFrameCount.Add(int64(frameCount))
|
batchFrameCount.Add(int64(frameCount))
|
||||||
|
if trackTime {
|
||||||
batchProcessingTime.Add(time.Since(startTime).Microseconds())
|
batchProcessingTime.Add(time.Since(startTime).Microseconds())
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1290,7 +1277,9 @@ func BatchDecodeWrite(frames [][]byte) error {
|
||||||
|
|
||||||
// Update statistics
|
// Update statistics
|
||||||
batchFrameCount.Add(int64(frameCount))
|
batchFrameCount.Add(int64(frameCount))
|
||||||
|
if trackTime {
|
||||||
batchProcessingTime.Add(time.Since(startTime).Microseconds())
|
batchProcessingTime.Add(time.Since(startTime).Microseconds())
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -1322,7 +1311,10 @@ func cgoAudioDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, err
|
||||||
|
|
||||||
// Get cached config
|
// Get cached config
|
||||||
cache := GetCachedConfig()
|
cache := GetCachedConfig()
|
||||||
|
// Only update cache if expired - avoid unnecessary overhead
|
||||||
|
if time.Since(cache.lastUpdate) > cache.cacheExpiry {
|
||||||
cache.Update()
|
cache.Update()
|
||||||
|
}
|
||||||
|
|
||||||
// Ensure data doesn't exceed max packet size
|
// Ensure data doesn't exceed max packet size
|
||||||
maxPacketSize := cache.GetMaxPacketSize()
|
maxPacketSize := cache.GetMaxPacketSize()
|
||||||
|
@ -1330,46 +1322,23 @@ func cgoAudioDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, err
|
||||||
return 0, newBufferTooLargeError(len(opusData), maxPacketSize)
|
return 0, newBufferTooLargeError(len(opusData), maxPacketSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Avoid bounds check with unsafe
|
// Direct CGO call with minimal overhead - unsafe.Pointer(&slice[0]) is never nil for non-empty slices
|
||||||
var opusPtr unsafe.Pointer
|
n := int(C.jetkvm_audio_decode_write(unsafe.Pointer(&opusData[0]), C.int(len(opusData))))
|
||||||
if len(opusData) > 0 {
|
|
||||||
opusPtr = unsafe.Pointer(&opusData[0])
|
|
||||||
if opusPtr == nil {
|
|
||||||
return 0, errInvalidBufferPtr
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Simplified panic recovery - only recover from C panics
|
|
||||||
var n int
|
|
||||||
var err error
|
|
||||||
defer func() {
|
|
||||||
if r := recover(); r != nil {
|
|
||||||
// Using pre-allocated error to avoid allocations
|
|
||||||
err = errAudioDecodeWrite
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Direct CGO call with minimal overhead
|
|
||||||
n = int(C.jetkvm_audio_decode_write(opusPtr, C.int(len(opusData))))
|
|
||||||
|
|
||||||
// Fast path for success case
|
// Fast path for success case
|
||||||
if n >= 0 {
|
if n >= 0 {
|
||||||
return n, nil
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle error cases with static error codes
|
// Handle error cases with static error codes to reduce allocations
|
||||||
switch n {
|
switch n {
|
||||||
case -1:
|
case -1:
|
||||||
n = 0
|
return 0, errAudioInitFailed
|
||||||
err = errAudioInitFailed
|
|
||||||
case -2:
|
case -2:
|
||||||
n = 0
|
return 0, errAudioDecodeWrite
|
||||||
err = errAudioDecodeWrite
|
|
||||||
default:
|
default:
|
||||||
n = 0
|
return 0, newAudioDecodeWriteError(n)
|
||||||
err = newAudioDecodeWriteError(n)
|
|
||||||
}
|
}
|
||||||
return n, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// CGO function aliases
|
// CGO function aliases
|
||||||
|
|
|
@ -507,14 +507,16 @@ func (ais *AudioInputServer) processOpusFrame(data []byte) error {
|
||||||
|
|
||||||
// Use ultra-fast validation for critical audio path
|
// Use ultra-fast validation for critical audio path
|
||||||
if err := ValidateAudioFrame(data); err != nil {
|
if err := ValidateAudioFrame(data); err != nil {
|
||||||
logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()
|
// Skip logging in hotpath to avoid overhead - validation errors are rare
|
||||||
logger.Error().Err(err).Msg("Frame validation failed")
|
|
||||||
return fmt.Errorf("input frame validation failed: %w", err)
|
return fmt.Errorf("input frame validation failed: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get cached config for optimal performance
|
// Get cached config for optimal performance
|
||||||
cache := GetCachedConfig()
|
cache := GetCachedConfig()
|
||||||
|
// Only update cache if expired - avoid unnecessary overhead
|
||||||
|
if time.Since(cache.lastUpdate) > cache.cacheExpiry {
|
||||||
cache.Update()
|
cache.Update()
|
||||||
|
}
|
||||||
|
|
||||||
// Get a PCM buffer from the pool for optimized decode-write
|
// Get a PCM buffer from the pool for optimized decode-write
|
||||||
pcmBuffer := GetBufferFromPool(cache.GetMaxPCMBufferSize())
|
pcmBuffer := GetBufferFromPool(cache.GetMaxPCMBufferSize())
|
||||||
|
|
Loading…
Reference in New Issue