feat(audio): add batch write processing and improve thread management

- Add batch write processing functionality to match existing read processing
- Improve thread pinning logic with separate controls for read/write
- Add new batch processing configuration parameters
- Update build tags to exclude arm architecture
This commit is contained in:
Alex P 2025-09-03 15:21:35 +00:00
parent 2ab90e76e0
commit 9f1dd28ad6
6 changed files with 245 additions and 24 deletions

View File

@ -1,5 +1,5 @@
//go:build cgo //go:build cgo && !arm
// +build cgo // +build cgo,!arm
// Package audio provides real-time audio processing for JetKVM with low-latency streaming. // Package audio provides real-time audio processing for JetKVM with low-latency streaming.
// //

View File

@ -1,4 +1,4 @@
//go:build cgo //go:build cgo && !arm
package audio package audio
@ -28,23 +28,31 @@ type BatchAudioProcessor struct {
// Batch queues and state (atomic for lock-free access) // Batch queues and state (atomic for lock-free access)
readQueue chan batchReadRequest readQueue chan batchReadRequest
writeQueue chan batchWriteRequest
initialized int32 initialized int32
running int32 running int32
threadPinned int32 threadPinned int32
writePinned int32
// Buffers (pre-allocated to avoid allocation overhead) // Buffers (pre-allocated to avoid allocation overhead)
readBufPool *sync.Pool readBufPool *sync.Pool
writeBufPool *sync.Pool
} }
type BatchAudioStats struct { type BatchAudioStats struct {
// int64 fields MUST be first for ARM32 alignment // int64 fields MUST be first for ARM32 alignment
BatchedReads int64 BatchedReads int64
SingleReads int64 SingleReads int64
BatchedWrites int64
SingleWrites int64
BatchedFrames int64 BatchedFrames int64
SingleFrames int64 SingleFrames int64
WriteFrames int64
CGOCallsReduced int64 CGOCallsReduced int64
OSThreadPinTime time.Duration // time.Duration is int64 internally OSThreadPinTime time.Duration // time.Duration is int64 internally
WriteThreadTime time.Duration // time.Duration is int64 internally
LastBatchTime time.Time LastBatchTime time.Time
LastWriteTime time.Time
} }
type batchReadRequest struct { type batchReadRequest struct {
@ -58,18 +66,33 @@ type batchReadResult struct {
err error err error
} }
type batchWriteRequest struct {
buffer []byte
resultChan chan batchWriteResult
timestamp time.Time
}
type batchWriteResult struct {
length int
err error
}
// NewBatchAudioProcessor creates a new batch audio processor // NewBatchAudioProcessor creates a new batch audio processor
func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAudioProcessor { func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAudioProcessor {
// Get cached config to avoid GetConfig() calls
cache := GetCachedConfig()
cache.Update()
// Validate input parameters // Validate input parameters
if err := ValidateBufferSize(batchSize); err != nil { if err := ValidateBufferSize(batchSize); err != nil {
logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger() logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger()
logger.Warn().Err(err).Int("batchSize", batchSize).Msg("invalid batch size, using default") logger.Warn().Err(err).Int("batchSize", batchSize).Msg("invalid batch size, using default")
batchSize = GetConfig().BatchProcessorFramesPerBatch batchSize = cache.BatchProcessorFramesPerBatch
} }
if batchDuration <= 0 { if batchDuration <= 0 {
logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger() logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger()
logger.Warn().Dur("batchDuration", batchDuration).Msg("invalid batch duration, using default") logger.Warn().Dur("batchDuration", batchDuration).Msg("invalid batch duration, using default")
batchDuration = GetConfig().BatchProcessingDelay batchDuration = cache.BatchProcessingDelay
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -82,11 +105,17 @@ func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAu
batchSize: batchSize, batchSize: batchSize,
batchDuration: batchDuration, batchDuration: batchDuration,
readQueue: make(chan batchReadRequest, batchSize*2), readQueue: make(chan batchReadRequest, batchSize*2),
writeQueue: make(chan batchWriteRequest, batchSize*2),
readBufPool: &sync.Pool{ readBufPool: &sync.Pool{
New: func() interface{} { New: func() interface{} {
return make([]byte, GetConfig().AudioFramePoolSize) // Max audio frame size return make([]byte, GetConfig().AudioFramePoolSize) // Max audio frame size
}, },
}, },
writeBufPool: &sync.Pool{
New: func() interface{} {
return make([]byte, GetConfig().AudioFramePoolSize) // Max audio frame size
},
},
} }
return processor return processor
@ -105,6 +134,7 @@ func (bap *BatchAudioProcessor) Start() error {
// Start batch processing goroutines // Start batch processing goroutines
go bap.batchReadProcessor() go bap.batchReadProcessor()
go bap.batchWriteProcessor()
bap.logger.Info().Int("batch_size", bap.batchSize). bap.logger.Info().Int("batch_size", bap.batchSize).
Dur("batch_duration", bap.batchDuration). Dur("batch_duration", bap.batchDuration).
@ -129,13 +159,17 @@ func (bap *BatchAudioProcessor) Stop() {
// BatchReadEncode performs batched audio read and encode operations // BatchReadEncode performs batched audio read and encode operations
func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) { func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) {
// Get cached config to avoid GetConfig() calls in hot path
cache := GetCachedConfig()
cache.Update()
// Validate buffer before processing // Validate buffer before processing
if err := ValidateBufferSize(len(buffer)); err != nil { if err := ValidateBufferSize(len(buffer)); err != nil {
bap.logger.Debug().Err(err).Msg("invalid buffer for batch processing") bap.logger.Debug().Err(err).Msg("invalid buffer for batch processing")
return 0, err return 0, err
} }
if atomic.LoadInt32(&bap.running) == 0 { if !bap.IsRunning() {
// Fallback to single operation if batch processor is not running // Fallback to single operation if batch processor is not running
atomic.AddInt64(&bap.stats.SingleReads, 1) atomic.AddInt64(&bap.stats.SingleReads, 1)
atomic.AddInt64(&bap.stats.SingleFrames, 1) atomic.AddInt64(&bap.stats.SingleFrames, 1)
@ -149,21 +183,22 @@ func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) {
timestamp: time.Now(), timestamp: time.Now(),
} }
// Try to queue the request with non-blocking send
select { select {
case bap.readQueue <- request: case bap.readQueue <- request:
// Successfully queued // Successfully queued
case <-time.After(GetConfig().ShortTimeout): default:
// Queue is full or blocked, fallback to single operation // Queue is full, fallback to single operation
atomic.AddInt64(&bap.stats.SingleReads, 1) atomic.AddInt64(&bap.stats.SingleReads, 1)
atomic.AddInt64(&bap.stats.SingleFrames, 1) atomic.AddInt64(&bap.stats.SingleFrames, 1)
return CGOAudioReadEncode(buffer) return CGOAudioReadEncode(buffer)
} }
// Wait for result // Wait for result with timeout
select { select {
case result := <-resultChan: case result := <-resultChan:
return result.length, result.err return result.length, result.err
case <-time.After(GetConfig().MediumTimeout): case <-time.After(cache.BatchProcessingTimeout):
// Timeout, fallback to single operation // Timeout, fallback to single operation
atomic.AddInt64(&bap.stats.SingleReads, 1) atomic.AddInt64(&bap.stats.SingleReads, 1)
atomic.AddInt64(&bap.stats.SingleFrames, 1) atomic.AddInt64(&bap.stats.SingleFrames, 1)
@ -171,6 +206,54 @@ func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) {
} }
} }
// BatchDecodeWrite performs batched audio decode and write operations
func (bap *BatchAudioProcessor) BatchDecodeWrite(buffer []byte) (int, error) {
// Get cached config to avoid GetConfig() calls in hot path
cache := GetCachedConfig()
cache.Update()
// Validate buffer before processing
if err := ValidateBufferSize(len(buffer)); err != nil {
bap.logger.Debug().Err(err).Msg("invalid buffer for batch processing")
return 0, err
}
if !bap.IsRunning() {
// Fallback to single operation if batch processor is not running
atomic.AddInt64(&bap.stats.SingleWrites, 1)
atomic.AddInt64(&bap.stats.WriteFrames, 1)
return CGOAudioDecodeWrite(buffer)
}
resultChan := make(chan batchWriteResult, 1)
request := batchWriteRequest{
buffer: buffer,
resultChan: resultChan,
timestamp: time.Now(),
}
// Try to queue the request with non-blocking send
select {
case bap.writeQueue <- request:
// Successfully queued
default:
// Queue is full, fall back to single operation
atomic.AddInt64(&bap.stats.SingleWrites, 1)
atomic.AddInt64(&bap.stats.WriteFrames, 1)
return CGOAudioDecodeWrite(buffer)
}
// Wait for result with timeout
select {
case result := <-resultChan:
return result.length, result.err
case <-time.After(cache.BatchProcessingTimeout):
atomic.AddInt64(&bap.stats.SingleWrites, 1)
atomic.AddInt64(&bap.stats.WriteFrames, 1)
return CGOAudioDecodeWrite(buffer)
}
}
// batchReadProcessor processes batched read operations // batchReadProcessor processes batched read operations
func (bap *BatchAudioProcessor) batchReadProcessor() { func (bap *BatchAudioProcessor) batchReadProcessor() {
defer bap.logger.Debug().Msg("batch read processor stopped") defer bap.logger.Debug().Msg("batch read processor stopped")
@ -207,15 +290,54 @@ func (bap *BatchAudioProcessor) batchReadProcessor() {
} }
} }
// batchWriteProcessor processes batched write operations
func (bap *BatchAudioProcessor) batchWriteProcessor() {
defer bap.logger.Debug().Msg("batch write processor stopped")
ticker := time.NewTicker(bap.batchDuration)
defer ticker.Stop()
var batch []batchWriteRequest
batch = make([]batchWriteRequest, 0, bap.batchSize)
for atomic.LoadInt32(&bap.running) == 1 {
select {
case <-bap.ctx.Done():
return
case req := <-bap.writeQueue:
batch = append(batch, req)
if len(batch) >= bap.batchSize {
bap.processBatchWrite(batch)
batch = batch[:0] // Clear slice but keep capacity
}
case <-ticker.C:
if len(batch) > 0 {
bap.processBatchWrite(batch)
batch = batch[:0] // Clear slice but keep capacity
}
}
}
// Process any remaining requests
if len(batch) > 0 {
bap.processBatchWrite(batch)
}
}
// processBatchRead processes a batch of read requests efficiently // processBatchRead processes a batch of read requests efficiently
func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) { func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) {
if len(batch) == 0 { if len(batch) == 0 {
return return
} }
// Get cached config to avoid GetConfig() calls in hot path
cache := GetCachedConfig()
// Only pin to OS thread for large batches to reduce thread contention // Only pin to OS thread for large batches to reduce thread contention
start := time.Now() start := time.Now()
shouldPinThread := len(batch) >= GetConfig().MinBatchSizeForThreadPinning shouldPinThread := len(batch) >= cache.MinBatchSizeForThreadPinning
// Track if we pinned the thread in this call // Track if we pinned the thread in this call
threadWasPinned := false threadWasPinned := false
@ -268,16 +390,85 @@ func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) {
bap.stats.LastBatchTime = time.Now() bap.stats.LastBatchTime = time.Now()
} }
// processBatchWrite processes a batch of write requests efficiently
func (bap *BatchAudioProcessor) processBatchWrite(batch []batchWriteRequest) {
if len(batch) == 0 {
return
}
// Get cached config to avoid GetConfig() calls in hot path
cache := GetCachedConfig()
// Only pin to OS thread for large batches to reduce thread contention
start := time.Now()
shouldPinThread := len(batch) >= cache.MinBatchSizeForThreadPinning
// Track if we pinned the thread in this call
threadWasPinned := false
if shouldPinThread && atomic.CompareAndSwapInt32(&bap.writePinned, 0, 1) {
threadWasPinned = true
runtime.LockOSThread()
// Set high priority for batch audio processing
if err := SetAudioThreadPriority(); err != nil {
bap.logger.Warn().Err(err).Msg("failed to set batch audio processing priority")
}
}
batchSize := len(batch)
atomic.AddInt64(&bap.stats.BatchedWrites, 1)
atomic.AddInt64(&bap.stats.WriteFrames, int64(batchSize))
if batchSize > 1 {
atomic.AddInt64(&bap.stats.CGOCallsReduced, int64(batchSize-1))
}
// Add deferred function to release thread lock if we pinned it
if threadWasPinned {
defer func() {
if err := ResetThreadPriority(); err != nil {
bap.logger.Warn().Err(err).Msg("failed to reset thread priority")
}
runtime.UnlockOSThread()
atomic.StoreInt32(&bap.writePinned, 0)
bap.stats.WriteThreadTime += time.Since(start)
}()
}
// Process each request in the batch
for _, req := range batch {
length, err := CGOAudioDecodeWrite(req.buffer)
result := batchWriteResult{
length: length,
err: err,
}
// Send result back (non-blocking)
select {
case req.resultChan <- result:
default:
// Requestor timed out, drop result
}
}
bap.stats.LastWriteTime = time.Now()
}
// GetStats returns current batch processor statistics // GetStats returns current batch processor statistics
func (bap *BatchAudioProcessor) GetStats() BatchAudioStats { func (bap *BatchAudioProcessor) GetStats() BatchAudioStats {
return BatchAudioStats{ return BatchAudioStats{
BatchedReads: atomic.LoadInt64(&bap.stats.BatchedReads), BatchedReads: atomic.LoadInt64(&bap.stats.BatchedReads),
SingleReads: atomic.LoadInt64(&bap.stats.SingleReads), SingleReads: atomic.LoadInt64(&bap.stats.SingleReads),
BatchedWrites: atomic.LoadInt64(&bap.stats.BatchedWrites),
SingleWrites: atomic.LoadInt64(&bap.stats.SingleWrites),
BatchedFrames: atomic.LoadInt64(&bap.stats.BatchedFrames), BatchedFrames: atomic.LoadInt64(&bap.stats.BatchedFrames),
SingleFrames: atomic.LoadInt64(&bap.stats.SingleFrames), SingleFrames: atomic.LoadInt64(&bap.stats.SingleFrames),
WriteFrames: atomic.LoadInt64(&bap.stats.WriteFrames),
CGOCallsReduced: atomic.LoadInt64(&bap.stats.CGOCallsReduced), CGOCallsReduced: atomic.LoadInt64(&bap.stats.CGOCallsReduced),
OSThreadPinTime: bap.stats.OSThreadPinTime, OSThreadPinTime: bap.stats.OSThreadPinTime,
WriteThreadTime: bap.stats.WriteThreadTime,
LastBatchTime: bap.stats.LastBatchTime, LastBatchTime: bap.stats.LastBatchTime,
LastWriteTime: bap.stats.LastWriteTime,
} }
} }
@ -301,8 +492,11 @@ func GetBatchAudioProcessor() *BatchAudioProcessor {
// Initialize on first use // Initialize on first use
if atomic.CompareAndSwapInt32(&batchProcessorInitialized, 0, 1) { if atomic.CompareAndSwapInt32(&batchProcessorInitialized, 0, 1) {
config := GetConfig() // Get cached config to avoid GetConfig() calls
processor := NewBatchAudioProcessor(config.BatchProcessorFramesPerBatch, config.BatchProcessorTimeout) cache := GetCachedConfig()
cache.Update()
processor := NewBatchAudioProcessor(cache.BatchProcessorFramesPerBatch, cache.BatchProcessorTimeout)
atomic.StorePointer(&globalBatchProcessor, unsafe.Pointer(processor)) atomic.StorePointer(&globalBatchProcessor, unsafe.Pointer(processor))
return processor return processor
} }
@ -336,8 +530,21 @@ func DisableBatchAudioProcessing() {
// BatchCGOAudioReadEncode is a batched version of CGOAudioReadEncode // BatchCGOAudioReadEncode is a batched version of CGOAudioReadEncode
func BatchCGOAudioReadEncode(buffer []byte) (int, error) { func BatchCGOAudioReadEncode(buffer []byte) (int, error) {
processor := GetBatchAudioProcessor() processor := GetBatchAudioProcessor()
if processor != nil && processor.IsRunning() { if processor == nil || !processor.IsRunning() {
return processor.BatchReadEncode(buffer) // Fall back to non-batched version if processor is not running
return CGOAudioReadEncode(buffer)
} }
return CGOAudioReadEncode(buffer)
return processor.BatchReadEncode(buffer)
}
// BatchCGOAudioDecodeWrite is a batched version of CGOAudioDecodeWrite
func BatchCGOAudioDecodeWrite(buffer []byte) (int, error) {
processor := GetBatchAudioProcessor()
if processor == nil || !processor.IsRunning() {
// Fall back to non-batched version if processor is not running
return CGOAudioDecodeWrite(buffer)
}
return processor.BatchDecodeWrite(buffer)
} }

View File

@ -1,5 +1,5 @@
//go:build cgo //go:build cgo && !arm
// +build cgo // +build cgo,!arm
package audio package audio

View File

@ -1,4 +1,4 @@
//go:build cgo //go:build cgo && !arm
package audio package audio
@ -735,6 +735,13 @@ type AudioConfigCache struct {
minOpusBitrate atomic.Int32 minOpusBitrate atomic.Int32
maxOpusBitrate atomic.Int32 maxOpusBitrate atomic.Int32
// Batch processing related values
BatchProcessingTimeout time.Duration
BatchProcessorFramesPerBatch int
BatchProcessorTimeout time.Duration
BatchProcessingDelay time.Duration
MinBatchSizeForThreadPinning int
// Mutex for updating the cache // Mutex for updating the cache
mutex sync.RWMutex mutex sync.RWMutex
lastUpdate time.Time lastUpdate time.Time
@ -799,6 +806,13 @@ func (c *AudioConfigCache) Update() {
c.minOpusBitrate.Store(int32(config.MinOpusBitrate)) c.minOpusBitrate.Store(int32(config.MinOpusBitrate))
c.maxOpusBitrate.Store(int32(config.MaxOpusBitrate)) c.maxOpusBitrate.Store(int32(config.MaxOpusBitrate))
// Update batch processing related values
c.BatchProcessingTimeout = 100 * time.Millisecond // Fixed timeout for batch processing
c.BatchProcessorFramesPerBatch = config.BatchProcessorFramesPerBatch
c.BatchProcessorTimeout = config.BatchProcessorTimeout
c.BatchProcessingDelay = config.BatchProcessingDelay
c.MinBatchSizeForThreadPinning = config.MinBatchSizeForThreadPinning
// Pre-allocate common errors // Pre-allocate common errors
c.bufferTooSmallReadEncode = newBufferTooSmallError(0, config.MinReadEncodeBuffer) c.bufferTooSmallReadEncode = newBufferTooSmallError(0, config.MinReadEncodeBuffer)
c.bufferTooLargeDecodeWrite = newBufferTooLargeError(config.MaxDecodeWriteBuffer+1, config.MaxDecodeWriteBuffer) c.bufferTooLargeDecodeWrite = newBufferTooLargeError(config.MaxDecodeWriteBuffer+1, config.MaxDecodeWriteBuffer)

View File

@ -1,5 +1,5 @@
//go:build cgo //go:build cgo && !arm
// +build cgo // +build cgo,!arm
package audio package audio

View File

@ -1,5 +1,5 @@
//go:build cgo //go:build cgo && !arm
// +build cgo // +build cgo,!arm
package audio package audio