mirror of https://github.com/jetkvm/kvm.git
332 lines
8.4 KiB
Go
332 lines
8.4 KiB
Go
//go:build cgo
|
|
|
|
package audio
|
|
|
|
import (
|
|
"errors"
|
|
"sync"
|
|
"sync/atomic"
|
|
"unsafe"
|
|
)
|
|
|
|
// BatchReferenceManager handles batch reference counting operations
|
|
// to reduce atomic operation overhead for high-frequency frame operations
|
|
type BatchReferenceManager struct {
|
|
// Batch operations queue
|
|
batchQueue chan batchRefOperation
|
|
workerPool chan struct{} // Worker pool semaphore
|
|
running int32
|
|
wg sync.WaitGroup
|
|
|
|
// Statistics
|
|
batchedOps int64
|
|
singleOps int64
|
|
batchSavings int64 // Number of atomic operations saved
|
|
}
|
|
|
|
type batchRefOperation struct {
|
|
frames []*ZeroCopyAudioFrame
|
|
operation refOperationType
|
|
resultCh chan batchRefResult
|
|
}
|
|
|
|
type refOperationType int
|
|
|
|
const (
|
|
refOpAddRef refOperationType = iota
|
|
refOpRelease
|
|
refOpMixed // For operations with mixed AddRef/Release
|
|
)
|
|
|
|
// Errors
|
|
var (
|
|
ErrUnsupportedOperation = errors.New("unsupported batch reference operation")
|
|
)
|
|
|
|
type batchRefResult struct {
|
|
finalReleases []bool // For Release operations, indicates which frames had final release
|
|
err error
|
|
}
|
|
|
|
// Global batch reference manager
|
|
var (
|
|
globalBatchRefManager *BatchReferenceManager
|
|
batchRefOnce sync.Once
|
|
)
|
|
|
|
// GetBatchReferenceManager returns the global batch reference manager
|
|
func GetBatchReferenceManager() *BatchReferenceManager {
|
|
batchRefOnce.Do(func() {
|
|
globalBatchRefManager = NewBatchReferenceManager()
|
|
globalBatchRefManager.Start()
|
|
})
|
|
return globalBatchRefManager
|
|
}
|
|
|
|
// NewBatchReferenceManager creates a new batch reference manager
|
|
func NewBatchReferenceManager() *BatchReferenceManager {
|
|
return &BatchReferenceManager{
|
|
batchQueue: make(chan batchRefOperation, 256), // Buffered for high throughput
|
|
workerPool: make(chan struct{}, 4), // 4 workers for parallel processing
|
|
}
|
|
}
|
|
|
|
// Start starts the batch reference manager workers
|
|
func (brm *BatchReferenceManager) Start() {
|
|
if !atomic.CompareAndSwapInt32(&brm.running, 0, 1) {
|
|
return // Already running
|
|
}
|
|
|
|
// Start worker goroutines
|
|
for i := 0; i < cap(brm.workerPool); i++ {
|
|
brm.wg.Add(1)
|
|
go brm.worker()
|
|
}
|
|
}
|
|
|
|
// Stop stops the batch reference manager
|
|
func (brm *BatchReferenceManager) Stop() {
|
|
if !atomic.CompareAndSwapInt32(&brm.running, 1, 0) {
|
|
return // Already stopped
|
|
}
|
|
|
|
close(brm.batchQueue)
|
|
brm.wg.Wait()
|
|
}
|
|
|
|
// worker processes batch reference operations
|
|
func (brm *BatchReferenceManager) worker() {
|
|
defer brm.wg.Done()
|
|
|
|
for op := range brm.batchQueue {
|
|
brm.processBatchOperation(op)
|
|
}
|
|
}
|
|
|
|
// processBatchOperation processes a batch of reference operations
|
|
func (brm *BatchReferenceManager) processBatchOperation(op batchRefOperation) {
|
|
result := batchRefResult{}
|
|
|
|
switch op.operation {
|
|
case refOpAddRef:
|
|
// Batch AddRef operations
|
|
for _, frame := range op.frames {
|
|
if frame != nil {
|
|
atomic.AddInt32(&frame.refCount, 1)
|
|
}
|
|
}
|
|
atomic.AddInt64(&brm.batchedOps, int64(len(op.frames)))
|
|
atomic.AddInt64(&brm.batchSavings, int64(len(op.frames)-1)) // Saved ops vs individual calls
|
|
|
|
case refOpRelease:
|
|
// Batch Release operations
|
|
result.finalReleases = make([]bool, len(op.frames))
|
|
for i, frame := range op.frames {
|
|
if frame != nil {
|
|
newCount := atomic.AddInt32(&frame.refCount, -1)
|
|
if newCount == 0 {
|
|
result.finalReleases[i] = true
|
|
// Return to pool if pooled
|
|
if frame.pooled {
|
|
globalZeroCopyPool.Put(frame)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
atomic.AddInt64(&brm.batchedOps, int64(len(op.frames)))
|
|
atomic.AddInt64(&brm.batchSavings, int64(len(op.frames)-1))
|
|
|
|
case refOpMixed:
|
|
// Handle mixed operations (not implemented in this version)
|
|
result.err = ErrUnsupportedOperation
|
|
}
|
|
|
|
// Send result back
|
|
if op.resultCh != nil {
|
|
op.resultCh <- result
|
|
close(op.resultCh)
|
|
}
|
|
}
|
|
|
|
// BatchAddRef performs AddRef on multiple frames in a single batch
|
|
func (brm *BatchReferenceManager) BatchAddRef(frames []*ZeroCopyAudioFrame) error {
|
|
if len(frames) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// For small batches, use direct operations to avoid overhead
|
|
if len(frames) <= 2 {
|
|
for _, frame := range frames {
|
|
if frame != nil {
|
|
frame.AddRef()
|
|
}
|
|
}
|
|
atomic.AddInt64(&brm.singleOps, int64(len(frames)))
|
|
return nil
|
|
}
|
|
|
|
// Use batch processing for larger sets
|
|
if atomic.LoadInt32(&brm.running) == 0 {
|
|
// Fallback to individual operations if batch manager not running
|
|
for _, frame := range frames {
|
|
if frame != nil {
|
|
frame.AddRef()
|
|
}
|
|
}
|
|
atomic.AddInt64(&brm.singleOps, int64(len(frames)))
|
|
return nil
|
|
}
|
|
|
|
resultCh := make(chan batchRefResult, 1)
|
|
op := batchRefOperation{
|
|
frames: frames,
|
|
operation: refOpAddRef,
|
|
resultCh: resultCh,
|
|
}
|
|
|
|
select {
|
|
case brm.batchQueue <- op:
|
|
// Wait for completion
|
|
<-resultCh
|
|
return nil
|
|
default:
|
|
// Queue full, fallback to individual operations
|
|
for _, frame := range frames {
|
|
if frame != nil {
|
|
frame.AddRef()
|
|
}
|
|
}
|
|
atomic.AddInt64(&brm.singleOps, int64(len(frames)))
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// BatchRelease performs Release on multiple frames in a single batch
|
|
// Returns a slice indicating which frames had their final reference released
|
|
func (brm *BatchReferenceManager) BatchRelease(frames []*ZeroCopyAudioFrame) ([]bool, error) {
|
|
if len(frames) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
// For small batches, use direct operations
|
|
if len(frames) <= 2 {
|
|
finalReleases := make([]bool, len(frames))
|
|
for i, frame := range frames {
|
|
if frame != nil {
|
|
finalReleases[i] = frame.Release()
|
|
}
|
|
}
|
|
atomic.AddInt64(&brm.singleOps, int64(len(frames)))
|
|
return finalReleases, nil
|
|
}
|
|
|
|
// Use batch processing for larger sets
|
|
if atomic.LoadInt32(&brm.running) == 0 {
|
|
// Fallback to individual operations
|
|
finalReleases := make([]bool, len(frames))
|
|
for i, frame := range frames {
|
|
if frame != nil {
|
|
finalReleases[i] = frame.Release()
|
|
}
|
|
}
|
|
atomic.AddInt64(&brm.singleOps, int64(len(frames)))
|
|
return finalReleases, nil
|
|
}
|
|
|
|
resultCh := make(chan batchRefResult, 1)
|
|
op := batchRefOperation{
|
|
frames: frames,
|
|
operation: refOpRelease,
|
|
resultCh: resultCh,
|
|
}
|
|
|
|
select {
|
|
case brm.batchQueue <- op:
|
|
// Wait for completion
|
|
result := <-resultCh
|
|
return result.finalReleases, result.err
|
|
default:
|
|
// Queue full, fallback to individual operations
|
|
finalReleases := make([]bool, len(frames))
|
|
for i, frame := range frames {
|
|
if frame != nil {
|
|
finalReleases[i] = frame.Release()
|
|
}
|
|
}
|
|
atomic.AddInt64(&brm.singleOps, int64(len(frames)))
|
|
return finalReleases, nil
|
|
}
|
|
}
|
|
|
|
// GetStats returns batch reference counting statistics
|
|
func (brm *BatchReferenceManager) GetStats() (batchedOps, singleOps, savings int64) {
|
|
return atomic.LoadInt64(&brm.batchedOps),
|
|
atomic.LoadInt64(&brm.singleOps),
|
|
atomic.LoadInt64(&brm.batchSavings)
|
|
}
|
|
|
|
// Convenience functions for global batch reference manager
|
|
|
|
// BatchAddRefFrames performs batch AddRef on multiple frames
|
|
func BatchAddRefFrames(frames []*ZeroCopyAudioFrame) error {
|
|
return GetBatchReferenceManager().BatchAddRef(frames)
|
|
}
|
|
|
|
// BatchReleaseFrames performs batch Release on multiple frames
|
|
func BatchReleaseFrames(frames []*ZeroCopyAudioFrame) ([]bool, error) {
|
|
return GetBatchReferenceManager().BatchRelease(frames)
|
|
}
|
|
|
|
// GetBatchReferenceStats returns global batch reference statistics
|
|
func GetBatchReferenceStats() (batchedOps, singleOps, savings int64) {
|
|
return GetBatchReferenceManager().GetStats()
|
|
}
|
|
|
|
// ZeroCopyFrameSlice provides utilities for working with slices of zero-copy frames
|
|
type ZeroCopyFrameSlice []*ZeroCopyAudioFrame
|
|
|
|
// AddRefAll performs batch AddRef on all frames in the slice
|
|
func (zfs ZeroCopyFrameSlice) AddRefAll() error {
|
|
return BatchAddRefFrames(zfs)
|
|
}
|
|
|
|
// ReleaseAll performs batch Release on all frames in the slice
|
|
func (zfs ZeroCopyFrameSlice) ReleaseAll() ([]bool, error) {
|
|
return BatchReleaseFrames(zfs)
|
|
}
|
|
|
|
// FilterNonNil returns a new slice with only non-nil frames
|
|
func (zfs ZeroCopyFrameSlice) FilterNonNil() ZeroCopyFrameSlice {
|
|
filtered := make(ZeroCopyFrameSlice, 0, len(zfs))
|
|
for _, frame := range zfs {
|
|
if frame != nil {
|
|
filtered = append(filtered, frame)
|
|
}
|
|
}
|
|
return filtered
|
|
}
|
|
|
|
// Len returns the number of frames in the slice
|
|
func (zfs ZeroCopyFrameSlice) Len() int {
|
|
return len(zfs)
|
|
}
|
|
|
|
// Get returns the frame at the specified index
|
|
func (zfs ZeroCopyFrameSlice) Get(index int) *ZeroCopyAudioFrame {
|
|
if index < 0 || index >= len(zfs) {
|
|
return nil
|
|
}
|
|
return zfs[index]
|
|
}
|
|
|
|
// UnsafePointers returns unsafe pointers for all frames (for CGO batch operations)
|
|
func (zfs ZeroCopyFrameSlice) UnsafePointers() []unsafe.Pointer {
|
|
pointers := make([]unsafe.Pointer, len(zfs))
|
|
for i, frame := range zfs {
|
|
if frame != nil {
|
|
pointers[i] = frame.UnsafePointer()
|
|
}
|
|
}
|
|
return pointers
|
|
}
|