kvm/internal/audio/batch_zero_copy.go

416 lines
12 KiB
Go

//go:build cgo
package audio
import (
"sync"
"sync/atomic"
"time"
)
// BatchZeroCopyProcessor dispatches batches of zero-copy audio frames to
// background workers and keeps running counters about that work.
//
// The int64 counters are updated with sync/atomic functions and are declared
// first on purpose: on 32-bit platforms sync/atomic only guarantees 64-bit
// alignment for the first word of an allocated struct, so keeping the counters
// at the top makes their alignment independent of the fields that follow.
type BatchZeroCopyProcessor struct {
	// Statistics. Within this file they are only touched through sync/atomic.
	batchedFrames    int64 // frames handled by the batch workers
	singleFrames     int64 // frames handled directly on the caller's goroutine
	batchSavings     int64 // accumulated (batch size - 1) per processed batch
	processingTimeUs int64 // accumulated worker processing time in microseconds
	adaptiveHits     int64 // adaptive checks that kept the batch path
	adaptiveMisses   int64 // adaptive checks that fell back to direct processing

	// Configuration, filled in by NewBatchZeroCopyProcessor.
	maxBatchSize      int
	batchTimeout      time.Duration
	processingDelay   time.Duration
	adaptiveThreshold float64 // minimum batched/total frame ratio for batching

	// Processing queues, one per operation kind.
	readEncodeQueue  chan *batchZeroCopyRequest
	decodeWriteQueue chan *batchZeroCopyRequest

	// Worker management.
	workerPool chan struct{} // in this file only its capacity is read, to size the worker set
	running    int32         // 1 while started, 0 otherwise; accessed atomically
	wg         sync.WaitGroup
}
// batchZeroCopyRequest is one unit of work placed on a processor queue.
type batchZeroCopyRequest struct {
// frames is the batch to process; the workers tolerate nil entries when
// extracting frame data.
frames []*ZeroCopyAudioFrame
// operation records which kind of work was requested.
// NOTE(review): set by the submit paths but not read anywhere in this file.
operation batchZeroCopyOperation
// resultCh receives exactly one result and is then closed by the worker.
// A nil channel means the submitter does not want a result.
resultCh chan batchZeroCopyResult
// timestamp is the time the request was created.
// NOTE(review): not read anywhere in this file.
timestamp time.Time
}
// batchZeroCopyOperation identifies the kind of work a batch request carries.
type batchZeroCopyOperation int

// Operation kinds, numbered consecutively from zero.
const (
	batchOpReadEncode = batchZeroCopyOperation(iota) // read-encode batch
	batchOpDecodeWrite                               // decode-write batch
	batchOpMixed                                     // mixed batch; not used in this file
)
// batchZeroCopyResult is what a worker sends back on a request's resultCh.
type batchZeroCopyResult struct {
// encodedData holds the encoder output; only read-encode requests set it.
encodedData [][]byte
// processedCount is the number of successfully processed frames; it stays
// zero when err is set.
processedCount int
// err is the reference-counting or processing error, if any.
err error
}
// Process-wide batch zero-copy processor. It is created and started lazily by
// GetBatchZeroCopyProcessor; batchZeroCopyOnce guards that one-time setup.
var (
globalBatchZeroCopyProcessor *BatchZeroCopyProcessor
batchZeroCopyOnce sync.Once
)
// GetBatchZeroCopyProcessor returns the shared batch zero-copy processor.
// The first call constructs the processor and starts its workers; every later
// call returns that same instance.
func GetBatchZeroCopyProcessor() *BatchZeroCopyProcessor {
	batchZeroCopyOnce.Do(func() {
		proc := NewBatchZeroCopyProcessor()
		globalBatchZeroCopyProcessor = proc
		proc.Start()
	})
	return globalBatchZeroCopyProcessor
}
// NewBatchZeroCopyProcessor builds a processor from the package-level Config.
// The returned processor is not running; call Start to launch its workers.
func NewBatchZeroCopyProcessor() *BatchZeroCopyProcessor {
	cfg := Config

	proc := &BatchZeroCopyProcessor{
		maxBatchSize:      cfg.BatchProcessorFramesPerBatch,
		batchTimeout:      cfg.BatchProcessorTimeout,
		processingDelay:   cfg.BatchProcessingDelay,
		adaptiveThreshold: cfg.BatchProcessorAdaptiveThreshold,
	}

	// Both queues share the same configured depth.
	queueDepth := cfg.BatchProcessorMaxQueueSize
	proc.readEncodeQueue = make(chan *batchZeroCopyRequest, queueDepth)
	proc.decodeWriteQueue = make(chan *batchZeroCopyRequest, queueDepth)

	// Capacity 4: Start splits it evenly between the two worker kinds.
	proc.workerPool = make(chan struct{}, 4)

	return proc
}
// Start launches the worker goroutines: half of the workerPool capacity as
// read-encode workers and the same number as decode-write workers. It does
// nothing when the processor is already running.
//
// NOTE(review): Stop closes both queues and nothing in this file recreates
// them, so a processor does not look restartable after Stop — confirm no
// caller relies on Start following Stop.
func (bzcp *BatchZeroCopyProcessor) Start() {
	started := atomic.CompareAndSwapInt32(&bzcp.running, 0, 1)
	if !started {
		return
	}

	perKind := cap(bzcp.workerPool) / 2

	// Register every worker with the wait group before it is spawned.
	bzcp.wg.Add(2 * perKind)
	for n := perKind; n > 0; n-- {
		go bzcp.readEncodeWorker()
	}
	for n := perKind; n > 0; n-- {
		go bzcp.decodeWriteWorker()
	}
}
// Stop shuts the processor down: it clears the running flag, closes both
// queues so the workers finish whatever is still buffered and then exit, and
// waits for all workers to return. It does nothing when the processor is not
// running.
//
// NOTE(review): the submit paths check the running flag and then send on a
// queue without further synchronization; a send that happens after the close
// below would panic. Confirm callers never race Stop with submissions.
func (bzcp *BatchZeroCopyProcessor) Stop() {
	wasRunning := atomic.CompareAndSwapInt32(&bzcp.running, 1, 0)
	if !wasRunning {
		return
	}

	close(bzcp.readEncodeQueue)
	close(bzcp.decodeWriteQueue)

	bzcp.wg.Wait()
}
// readEncodeWorker handles read-encode requests one at a time until the
// read-encode queue is closed and empty, then signals the wait group.
func (bzcp *BatchZeroCopyProcessor) readEncodeWorker() {
	defer bzcp.wg.Done()

	for {
		job, open := <-bzcp.readEncodeQueue
		if !open {
			return
		}
		bzcp.processBatchReadEncode(job)
	}
}
// decodeWriteWorker handles decode-write requests one at a time until the
// decode-write queue is closed and empty, then signals the wait group.
func (bzcp *BatchZeroCopyProcessor) decodeWriteWorker() {
	defer bzcp.wg.Done()

	for {
		job, open := <-bzcp.decodeWriteQueue
		if !open {
			return
		}
		bzcp.processBatchDecodeWrite(job)
	}
}
// processBatchReadEncode runs one read-encode request on a worker goroutine.
//
// It takes a reference on every frame, calls BatchReadEncode for as many
// frames as the request holds, releases the references again and updates the
// batch counters. If the request has a result channel, exactly one result is
// sent on it and the channel is closed. When taking the references fails, the
// error is reported straight away and the counters are left untouched.
func (bzcp *BatchZeroCopyProcessor) processBatchReadEncode(req *batchZeroCopyRequest) {
	began := time.Now()
	var outcome batchZeroCopyResult

	// deliver hands the current outcome to the submitter, if one is listening.
	deliver := func() {
		if req.resultCh == nil {
			return
		}
		req.resultCh <- outcome
		close(req.resultCh)
	}

	if refErr := BatchAddRefFrames(req.frames); refErr != nil {
		outcome.err = refErr
		deliver()
		return
	}

	encoded, encodeErr := BatchReadEncode(len(req.frames))

	// The references are dropped whether or not encoding worked. A release
	// failure is deliberately discarded so that it neither masks the encode
	// error nor fails an otherwise successful batch.
	_, _ = BatchReleaseFrames(req.frames)

	if encodeErr != nil {
		outcome.err = encodeErr
	} else {
		outcome.encodedData = encoded
		outcome.processedCount = len(encoded)
	}

	frameCount := int64(len(req.frames))
	atomic.AddInt64(&bzcp.batchedFrames, frameCount)
	atomic.AddInt64(&bzcp.batchSavings, frameCount-1)
	atomic.AddInt64(&bzcp.processingTimeUs, time.Since(began).Microseconds())

	deliver()
}
// processBatchDecodeWrite runs one decode-write request on a worker goroutine.
//
// It takes a reference on every frame, collects each frame's payload (a nil
// frame contributes a nil payload at the same index), passes the payloads to
// BatchDecodeWrite, releases the references and updates the batch counters.
// If the request has a result channel, exactly one result is sent on it and
// the channel is closed. When taking the references fails, the error is
// reported straight away and the counters are left untouched.
func (bzcp *BatchZeroCopyProcessor) processBatchDecodeWrite(req *batchZeroCopyRequest) {
	began := time.Now()
	var outcome batchZeroCopyResult

	// deliver hands the current outcome to the submitter, if one is listening.
	deliver := func() {
		if req.resultCh == nil {
			return
		}
		req.resultCh <- outcome
		close(req.resultCh)
	}

	if refErr := BatchAddRefFrames(req.frames); refErr != nil {
		outcome.err = refErr
		deliver()
		return
	}

	// One payload slot per frame, trimmed to the frame's reported length.
	payloads := make([][]byte, len(req.frames))
	for idx := range req.frames {
		if f := req.frames[idx]; f != nil {
			payloads[idx] = f.Data()[:f.Length()]
		}
	}

	if writeErr := BatchDecodeWrite(payloads); writeErr != nil {
		outcome.err = writeErr
	} else {
		outcome.processedCount = len(req.frames)
	}

	// A release failure is deliberately discarded so that it never overrides
	// the processing result.
	_, _ = BatchReleaseFrames(req.frames)

	frameCount := int64(len(req.frames))
	atomic.AddInt64(&bzcp.batchedFrames, frameCount)
	atomic.AddInt64(&bzcp.batchSavings, frameCount-1)
	atomic.AddInt64(&bzcp.processingTimeUs, time.Since(began).Microseconds())

	deliver()
}
// BatchReadEncodeZeroCopy read-encodes the given frames and returns the
// encoded data, choosing between the worker queue and direct processing on the
// caller's goroutine.
//
// An empty input returns (nil, nil). Direct processing is used when:
//   - the batch has at most two frames;
//   - more than 100 frames have been counted so far and the share handled by
//     the batch workers is below adaptiveThreshold;
//   - the processor is not running;
//   - the read-encode queue is full.
//
// Otherwise the request is queued and the call blocks until a worker posts
// the result.
func (bzcp *BatchZeroCopyProcessor) BatchReadEncodeZeroCopy(frames []*ZeroCopyAudioFrame) ([][]byte, error) {
	frameCount := len(frames)
	if frameCount == 0 {
		return nil, nil
	}

	// direct processes the frames inline and books them as non-batched work.
	direct := func() ([][]byte, error) {
		atomic.AddInt64(&bzcp.singleFrames, int64(frameCount))
		return bzcp.processSingleReadEncode(frames)
	}

	// Tiny batches are not worth the queue round trip.
	if frameCount <= 2 {
		return direct()
	}

	// The adaptive check only applies once enough frames have been observed.
	batched := atomic.LoadInt64(&bzcp.batchedFrames)
	single := atomic.LoadInt64(&bzcp.singleFrames)
	if total := batched + single; total > 100 {
		if float64(batched)/float64(total) < bzcp.adaptiveThreshold {
			atomic.AddInt64(&bzcp.adaptiveMisses, 1)
			return direct()
		}
		atomic.AddInt64(&bzcp.adaptiveHits, 1)
	}

	if atomic.LoadInt32(&bzcp.running) == 0 {
		return direct()
	}

	// Buffered so the worker can post its result without waiting for us.
	done := make(chan batchZeroCopyResult, 1)
	job := &batchZeroCopyRequest{
		frames:    frames,
		operation: batchOpReadEncode,
		resultCh:  done,
		timestamp: time.Now(),
	}

	select {
	case bzcp.readEncodeQueue <- job:
	default:
		// Queue full: do the work here instead of waiting for a slot.
		return direct()
	}

	outcome := <-done
	return outcome.encodedData, outcome.err
}
// BatchDecodeWriteZeroCopy decode-writes the given frames, choosing between
// the worker queue and direct processing on the caller's goroutine.
//
// An empty input returns nil. Direct processing is used when:
//   - the batch has at most two frames;
//   - more than 100 frames have been counted so far and the share handled by
//     the batch workers is below adaptiveThreshold;
//   - the processor is not running;
//   - the decode-write queue is full.
//
// Otherwise the request is queued and the call blocks until a worker posts
// the result, whose error is returned.
func (bzcp *BatchZeroCopyProcessor) BatchDecodeWriteZeroCopy(frames []*ZeroCopyAudioFrame) error {
	frameCount := len(frames)
	if frameCount == 0 {
		return nil
	}

	// direct processes the frames inline and books them as non-batched work.
	direct := func() error {
		atomic.AddInt64(&bzcp.singleFrames, int64(frameCount))
		return bzcp.processSingleDecodeWrite(frames)
	}

	// Tiny batches are not worth the queue round trip.
	if frameCount <= 2 {
		return direct()
	}

	// The adaptive check only applies once enough frames have been observed.
	batched := atomic.LoadInt64(&bzcp.batchedFrames)
	single := atomic.LoadInt64(&bzcp.singleFrames)
	if total := batched + single; total > 100 {
		if float64(batched)/float64(total) < bzcp.adaptiveThreshold {
			atomic.AddInt64(&bzcp.adaptiveMisses, 1)
			return direct()
		}
		atomic.AddInt64(&bzcp.adaptiveHits, 1)
	}

	if atomic.LoadInt32(&bzcp.running) == 0 {
		return direct()
	}

	// Buffered so the worker can post its result without waiting for us.
	done := make(chan batchZeroCopyResult, 1)
	job := &batchZeroCopyRequest{
		frames:    frames,
		operation: batchOpDecodeWrite,
		resultCh:  done,
		timestamp: time.Now(),
	}

	select {
	case bzcp.decodeWriteQueue <- job:
	default:
		// Queue full: do the work here instead of waiting for a slot.
		return direct()
	}

	outcome := <-done
	return outcome.err
}
// processSingleReadEncode performs read-encode on the caller's goroutine,
// without going through the worker queue.
//
// Every non-nil frame is referenced for the duration of the call. The
// read-encode itself only needs to know how many frames are involved, so the
// non-nil frames are simply counted; no per-call slice of frame payloads is
// built. The result and error of BatchReadEncode are returned unchanged.
func (bzcp *BatchZeroCopyProcessor) processSingleReadEncode(frames []*ZeroCopyAudioFrame) ([][]byte, error) {
	// Hold a reference on each usable frame and count them as we go.
	held := 0
	for _, frame := range frames {
		if frame != nil {
			frame.AddRef()
			held++
		}
	}

	encoded, err := BatchReadEncode(held)

	// Drop the references taken above, whether or not encoding succeeded.
	for _, frame := range frames {
		if frame != nil {
			frame.Release()
		}
	}

	return encoded, err
}
// processSingleDecodeWrite performs decode-write on the caller's goroutine,
// without going through the worker queue.
//
// Every non-nil frame is referenced, its payload (trimmed to the frame's
// reported length) is collected, the payloads are handed to BatchDecodeWrite,
// and the references are released again. Nil frames are skipped entirely. The
// error from BatchDecodeWrite is returned unchanged.
func (bzcp *BatchZeroCopyProcessor) processSingleDecodeWrite(frames []*ZeroCopyAudioFrame) error {
	payloads := make([][]byte, 0, len(frames))
	for i := range frames {
		f := frames[i]
		if f == nil {
			continue
		}
		f.AddRef()
		payloads = append(payloads, f.Data()[:f.Length()])
	}

	writeErr := BatchDecodeWrite(payloads)

	// Drop the references taken above, whether or not the write succeeded.
	for i := range frames {
		if f := frames[i]; f != nil {
			f.Release()
		}
	}

	return writeErr
}
// GetBatchZeroCopyStats returns the current values of the processor's
// counters. Each counter is loaded atomically on its own, so the six values
// are not a single consistent snapshot.
func (bzcp *BatchZeroCopyProcessor) GetBatchZeroCopyStats() (batchedFrames, singleFrames, savings, processingTimeUs, adaptiveHits, adaptiveMisses int64) {
	batchedFrames = atomic.LoadInt64(&bzcp.batchedFrames)
	singleFrames = atomic.LoadInt64(&bzcp.singleFrames)
	savings = atomic.LoadInt64(&bzcp.batchSavings)
	processingTimeUs = atomic.LoadInt64(&bzcp.processingTimeUs)
	adaptiveHits = atomic.LoadInt64(&bzcp.adaptiveHits)
	adaptiveMisses = atomic.LoadInt64(&bzcp.adaptiveMisses)
	return batchedFrames, singleFrames, savings, processingTimeUs, adaptiveHits, adaptiveMisses
}
// Convenience functions for global batch zero-copy processor
// BatchReadEncodeZeroCopyFrames read-encodes the frames using the shared
// processor, creating and starting it on first use.
func BatchReadEncodeZeroCopyFrames(frames []*ZeroCopyAudioFrame) ([][]byte, error) {
	processor := GetBatchZeroCopyProcessor()
	return processor.BatchReadEncodeZeroCopy(frames)
}
// BatchDecodeWriteZeroCopyFrames decode-writes the frames using the shared
// processor, creating and starting it on first use.
func BatchDecodeWriteZeroCopyFrames(frames []*ZeroCopyAudioFrame) error {
	processor := GetBatchZeroCopyProcessor()
	return processor.BatchDecodeWriteZeroCopy(frames)
}
// GetGlobalBatchZeroCopyStats returns the counters of the shared processor,
// creating and starting it on first use.
func GetGlobalBatchZeroCopyStats() (batchedFrames, singleFrames, savings, processingTimeUs, adaptiveHits, adaptiveMisses int64) {
	processor := GetBatchZeroCopyProcessor()
	return processor.GetBatchZeroCopyStats()
}