mirror of https://github.com/jetkvm/kvm.git
feat(audio): implement comprehensive audio optimization system
- Add AdaptiveOptimizer for real-time parameter adjustment based on latency metrics - Add AdaptiveBufferConfig for dynamic buffer sizing based on system load - Implement BatchAudioProcessor for reduced CGO call overhead - Add AudioBufferPool with sync.Pool for optimized memory allocation - Implement LatencyMonitor with exponential moving averages - Add MemoryMetrics for comprehensive memory usage tracking - Implement PriorityScheduler with SCHED_FIFO for real-time audio processing - Add zero-copy operations to minimize memory copying in audio pipeline - Enhance IPC architecture with intelligent frame dropping - Add comprehensive Prometheus metrics for performance monitoring - Implement triple-goroutine architecture for audio input processing - Add adaptive buffering and performance feedback loops
This commit is contained in:
parent
88679cda2f
commit
57b7bafcc1
|
@ -0,0 +1,338 @@
|
|||
package audio
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/jetkvm/kvm/internal/logging"
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
// AdaptiveBufferConfig holds configuration for adaptive buffer sizing
|
||||
type AdaptiveBufferConfig struct {
|
||||
// Buffer size limits (in frames)
|
||||
MinBufferSize int
|
||||
MaxBufferSize int
|
||||
DefaultBufferSize int
|
||||
|
||||
// System load thresholds
|
||||
LowCPUThreshold float64 // Below this, increase buffer size
|
||||
HighCPUThreshold float64 // Above this, decrease buffer size
|
||||
LowMemoryThreshold float64 // Below this, increase buffer size
|
||||
HighMemoryThreshold float64 // Above this, decrease buffer size
|
||||
|
||||
// Latency thresholds (in milliseconds)
|
||||
TargetLatency time.Duration
|
||||
MaxLatency time.Duration
|
||||
|
||||
// Adaptation parameters
|
||||
AdaptationInterval time.Duration
|
||||
SmoothingFactor float64 // 0.0-1.0, higher = more responsive
|
||||
}
|
||||
|
||||
// DefaultAdaptiveBufferConfig returns optimized config for JetKVM hardware
|
||||
func DefaultAdaptiveBufferConfig() AdaptiveBufferConfig {
|
||||
return AdaptiveBufferConfig{
|
||||
// Conservative buffer sizes for 256MB RAM constraint
|
||||
MinBufferSize: 3, // Minimum 3 frames (slightly higher for stability)
|
||||
MaxBufferSize: 20, // Maximum 20 frames (increased for high load scenarios)
|
||||
DefaultBufferSize: 6, // Default 6 frames (increased for better stability)
|
||||
|
||||
// CPU thresholds optimized for single-core ARM Cortex A7 under load
|
||||
LowCPUThreshold: 20.0, // Below 20% CPU
|
||||
HighCPUThreshold: 60.0, // Above 60% CPU (lowered to be more responsive)
|
||||
|
||||
// Memory thresholds for 256MB total RAM
|
||||
LowMemoryThreshold: 35.0, // Below 35% memory usage
|
||||
HighMemoryThreshold: 75.0, // Above 75% memory usage (lowered for earlier response)
|
||||
|
||||
// Latency targets
|
||||
TargetLatency: 20 * time.Millisecond, // Target 20ms latency
|
||||
MaxLatency: 50 * time.Millisecond, // Max acceptable 50ms
|
||||
|
||||
// Adaptation settings
|
||||
AdaptationInterval: 500 * time.Millisecond, // Check every 500ms
|
||||
SmoothingFactor: 0.3, // Moderate responsiveness
|
||||
}
|
||||
}
|
||||
|
||||
// AdaptiveBufferManager manages dynamic buffer sizing based on system conditions
|
||||
type AdaptiveBufferManager struct {
|
||||
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
|
||||
currentInputBufferSize int64 // Current input buffer size (atomic)
|
||||
currentOutputBufferSize int64 // Current output buffer size (atomic)
|
||||
averageLatency int64 // Average latency in nanoseconds (atomic)
|
||||
systemCPUPercent int64 // System CPU percentage * 100 (atomic)
|
||||
systemMemoryPercent int64 // System memory percentage * 100 (atomic)
|
||||
adaptationCount int64 // Metrics tracking (atomic)
|
||||
|
||||
config AdaptiveBufferConfig
|
||||
logger zerolog.Logger
|
||||
processMonitor *ProcessMonitor
|
||||
|
||||
// Control channels
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
|
||||
// Metrics tracking
|
||||
lastAdaptation time.Time
|
||||
mutex sync.RWMutex
|
||||
}
|
||||
|
||||
// NewAdaptiveBufferManager creates a new adaptive buffer manager
|
||||
func NewAdaptiveBufferManager(config AdaptiveBufferConfig) *AdaptiveBufferManager {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
return &AdaptiveBufferManager{
|
||||
currentInputBufferSize: int64(config.DefaultBufferSize),
|
||||
currentOutputBufferSize: int64(config.DefaultBufferSize),
|
||||
config: config,
|
||||
logger: logging.GetDefaultLogger().With().Str("component", "adaptive-buffer").Logger(),
|
||||
processMonitor: GetProcessMonitor(),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
lastAdaptation: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
// Start begins the adaptive buffer management
|
||||
func (abm *AdaptiveBufferManager) Start() {
|
||||
abm.wg.Add(1)
|
||||
go abm.adaptationLoop()
|
||||
abm.logger.Info().Msg("Adaptive buffer manager started")
|
||||
}
|
||||
|
||||
// Stop stops the adaptive buffer management
|
||||
func (abm *AdaptiveBufferManager) Stop() {
|
||||
abm.cancel()
|
||||
abm.wg.Wait()
|
||||
abm.logger.Info().Msg("Adaptive buffer manager stopped")
|
||||
}
|
||||
|
||||
// GetInputBufferSize returns the current recommended input buffer size
|
||||
func (abm *AdaptiveBufferManager) GetInputBufferSize() int {
|
||||
return int(atomic.LoadInt64(&abm.currentInputBufferSize))
|
||||
}
|
||||
|
||||
// GetOutputBufferSize returns the current recommended output buffer size
|
||||
func (abm *AdaptiveBufferManager) GetOutputBufferSize() int {
|
||||
return int(atomic.LoadInt64(&abm.currentOutputBufferSize))
|
||||
}
|
||||
|
||||
// UpdateLatency updates the current latency measurement
|
||||
func (abm *AdaptiveBufferManager) UpdateLatency(latency time.Duration) {
|
||||
// Use exponential moving average for latency
|
||||
currentAvg := atomic.LoadInt64(&abm.averageLatency)
|
||||
newLatency := latency.Nanoseconds()
|
||||
|
||||
if currentAvg == 0 {
|
||||
atomic.StoreInt64(&abm.averageLatency, newLatency)
|
||||
} else {
|
||||
// Exponential moving average: 70% historical, 30% current
|
||||
newAvg := int64(float64(currentAvg)*0.7 + float64(newLatency)*0.3)
|
||||
atomic.StoreInt64(&abm.averageLatency, newAvg)
|
||||
}
|
||||
}
|
||||
|
||||
// adaptationLoop is the main loop that adjusts buffer sizes
|
||||
func (abm *AdaptiveBufferManager) adaptationLoop() {
|
||||
defer abm.wg.Done()
|
||||
|
||||
ticker := time.NewTicker(abm.config.AdaptationInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-abm.ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
abm.adaptBufferSizes()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// adaptBufferSizes analyzes system conditions and adjusts buffer sizes
|
||||
func (abm *AdaptiveBufferManager) adaptBufferSizes() {
|
||||
// Collect current system metrics
|
||||
metrics := abm.processMonitor.GetCurrentMetrics()
|
||||
if len(metrics) == 0 {
|
||||
return // No metrics available
|
||||
}
|
||||
|
||||
// Calculate system-wide CPU and memory usage
|
||||
totalCPU := 0.0
|
||||
totalMemory := 0.0
|
||||
processCount := 0
|
||||
|
||||
for _, metric := range metrics {
|
||||
totalCPU += metric.CPUPercent
|
||||
totalMemory += metric.MemoryPercent
|
||||
processCount++
|
||||
}
|
||||
|
||||
if processCount == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Store system metrics atomically
|
||||
systemCPU := totalCPU // Total CPU across all monitored processes
|
||||
systemMemory := totalMemory / float64(processCount) // Average memory usage
|
||||
|
||||
atomic.StoreInt64(&abm.systemCPUPercent, int64(systemCPU*100))
|
||||
atomic.StoreInt64(&abm.systemMemoryPercent, int64(systemMemory*100))
|
||||
|
||||
// Get current latency
|
||||
currentLatencyNs := atomic.LoadInt64(&abm.averageLatency)
|
||||
currentLatency := time.Duration(currentLatencyNs)
|
||||
|
||||
// Calculate adaptation factors
|
||||
cpuFactor := abm.calculateCPUFactor(systemCPU)
|
||||
memoryFactor := abm.calculateMemoryFactor(systemMemory)
|
||||
latencyFactor := abm.calculateLatencyFactor(currentLatency)
|
||||
|
||||
// Combine factors with weights (CPU has highest priority for KVM coexistence)
|
||||
combinedFactor := 0.5*cpuFactor + 0.3*memoryFactor + 0.2*latencyFactor
|
||||
|
||||
// Apply adaptation with smoothing
|
||||
currentInput := float64(atomic.LoadInt64(&abm.currentInputBufferSize))
|
||||
currentOutput := float64(atomic.LoadInt64(&abm.currentOutputBufferSize))
|
||||
|
||||
// Calculate new buffer sizes
|
||||
newInputSize := abm.applyAdaptation(currentInput, combinedFactor)
|
||||
newOutputSize := abm.applyAdaptation(currentOutput, combinedFactor)
|
||||
|
||||
// Update buffer sizes if they changed significantly
|
||||
adjustmentMade := false
|
||||
if math.Abs(newInputSize-currentInput) >= 0.5 || math.Abs(newOutputSize-currentOutput) >= 0.5 {
|
||||
atomic.StoreInt64(&abm.currentInputBufferSize, int64(math.Round(newInputSize)))
|
||||
atomic.StoreInt64(&abm.currentOutputBufferSize, int64(math.Round(newOutputSize)))
|
||||
|
||||
atomic.AddInt64(&abm.adaptationCount, 1)
|
||||
abm.mutex.Lock()
|
||||
abm.lastAdaptation = time.Now()
|
||||
abm.mutex.Unlock()
|
||||
adjustmentMade = true
|
||||
|
||||
abm.logger.Debug().
|
||||
Float64("cpu_percent", systemCPU).
|
||||
Float64("memory_percent", systemMemory).
|
||||
Dur("latency", currentLatency).
|
||||
Float64("combined_factor", combinedFactor).
|
||||
Int("new_input_size", int(newInputSize)).
|
||||
Int("new_output_size", int(newOutputSize)).
|
||||
Msg("Adapted buffer sizes")
|
||||
}
|
||||
|
||||
// Update metrics with current state
|
||||
currentInputSize := int(atomic.LoadInt64(&abm.currentInputBufferSize))
|
||||
currentOutputSize := int(atomic.LoadInt64(&abm.currentOutputBufferSize))
|
||||
UpdateAdaptiveBufferMetrics(currentInputSize, currentOutputSize, systemCPU, systemMemory, adjustmentMade)
|
||||
}
|
||||
|
||||
// calculateCPUFactor returns adaptation factor based on CPU usage
|
||||
// Returns: -1.0 (decrease buffers) to +1.0 (increase buffers)
|
||||
func (abm *AdaptiveBufferManager) calculateCPUFactor(cpuPercent float64) float64 {
|
||||
if cpuPercent > abm.config.HighCPUThreshold {
|
||||
// High CPU: decrease buffers to reduce latency and give CPU to KVM
|
||||
return -1.0
|
||||
} else if cpuPercent < abm.config.LowCPUThreshold {
|
||||
// Low CPU: increase buffers for better quality
|
||||
return 1.0
|
||||
}
|
||||
// Medium CPU: linear interpolation
|
||||
midpoint := (abm.config.HighCPUThreshold + abm.config.LowCPUThreshold) / 2
|
||||
return (midpoint - cpuPercent) / (midpoint - abm.config.LowCPUThreshold)
|
||||
}
|
||||
|
||||
// calculateMemoryFactor returns adaptation factor based on memory usage
|
||||
func (abm *AdaptiveBufferManager) calculateMemoryFactor(memoryPercent float64) float64 {
|
||||
if memoryPercent > abm.config.HighMemoryThreshold {
|
||||
// High memory: decrease buffers to free memory
|
||||
return -1.0
|
||||
} else if memoryPercent < abm.config.LowMemoryThreshold {
|
||||
// Low memory: increase buffers for better performance
|
||||
return 1.0
|
||||
}
|
||||
// Medium memory: linear interpolation
|
||||
midpoint := (abm.config.HighMemoryThreshold + abm.config.LowMemoryThreshold) / 2
|
||||
return (midpoint - memoryPercent) / (midpoint - abm.config.LowMemoryThreshold)
|
||||
}
|
||||
|
||||
// calculateLatencyFactor returns adaptation factor based on latency
|
||||
func (abm *AdaptiveBufferManager) calculateLatencyFactor(latency time.Duration) float64 {
|
||||
if latency > abm.config.MaxLatency {
|
||||
// High latency: decrease buffers
|
||||
return -1.0
|
||||
} else if latency < abm.config.TargetLatency {
|
||||
// Low latency: can increase buffers
|
||||
return 1.0
|
||||
}
|
||||
// Medium latency: linear interpolation
|
||||
midLatency := (abm.config.MaxLatency + abm.config.TargetLatency) / 2
|
||||
return float64(midLatency-latency) / float64(midLatency-abm.config.TargetLatency)
|
||||
}
|
||||
|
||||
// applyAdaptation applies the adaptation factor to current buffer size
|
||||
func (abm *AdaptiveBufferManager) applyAdaptation(currentSize, factor float64) float64 {
|
||||
// Calculate target size based on factor
|
||||
var targetSize float64
|
||||
if factor > 0 {
|
||||
// Increase towards max
|
||||
targetSize = currentSize + factor*(float64(abm.config.MaxBufferSize)-currentSize)
|
||||
} else {
|
||||
// Decrease towards min
|
||||
targetSize = currentSize + factor*(currentSize-float64(abm.config.MinBufferSize))
|
||||
}
|
||||
|
||||
// Apply smoothing
|
||||
newSize := currentSize + abm.config.SmoothingFactor*(targetSize-currentSize)
|
||||
|
||||
// Clamp to valid range
|
||||
return math.Max(float64(abm.config.MinBufferSize),
|
||||
math.Min(float64(abm.config.MaxBufferSize), newSize))
|
||||
}
|
||||
|
||||
// GetStats returns current adaptation statistics
|
||||
func (abm *AdaptiveBufferManager) GetStats() map[string]interface{} {
|
||||
abm.mutex.RLock()
|
||||
lastAdaptation := abm.lastAdaptation
|
||||
abm.mutex.RUnlock()
|
||||
|
||||
return map[string]interface{}{
|
||||
"input_buffer_size": abm.GetInputBufferSize(),
|
||||
"output_buffer_size": abm.GetOutputBufferSize(),
|
||||
"average_latency_ms": float64(atomic.LoadInt64(&abm.averageLatency)) / 1e6,
|
||||
"system_cpu_percent": float64(atomic.LoadInt64(&abm.systemCPUPercent)) / 100,
|
||||
"system_memory_percent": float64(atomic.LoadInt64(&abm.systemMemoryPercent)) / 100,
|
||||
"adaptation_count": atomic.LoadInt64(&abm.adaptationCount),
|
||||
"last_adaptation": lastAdaptation,
|
||||
}
|
||||
}
|
||||
|
||||
// Global adaptive buffer manager instance
|
||||
var globalAdaptiveBufferManager *AdaptiveBufferManager
|
||||
var adaptiveBufferOnce sync.Once
|
||||
|
||||
// GetAdaptiveBufferManager returns the global adaptive buffer manager instance
|
||||
func GetAdaptiveBufferManager() *AdaptiveBufferManager {
|
||||
adaptiveBufferOnce.Do(func() {
|
||||
globalAdaptiveBufferManager = NewAdaptiveBufferManager(DefaultAdaptiveBufferConfig())
|
||||
})
|
||||
return globalAdaptiveBufferManager
|
||||
}
|
||||
|
||||
// StartAdaptiveBuffering starts the global adaptive buffer manager
|
||||
func StartAdaptiveBuffering() {
|
||||
GetAdaptiveBufferManager().Start()
|
||||
}
|
||||
|
||||
// StopAdaptiveBuffering stops the global adaptive buffer manager
|
||||
func StopAdaptiveBuffering() {
|
||||
if globalAdaptiveBufferManager != nil {
|
||||
globalAdaptiveBufferManager.Stop()
|
||||
}
|
||||
}
|
|
@ -0,0 +1,202 @@
|
|||
package audio
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
// AdaptiveOptimizer automatically adjusts audio parameters based on latency metrics
|
||||
type AdaptiveOptimizer struct {
|
||||
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
|
||||
optimizationCount int64 // Number of optimizations performed (atomic)
|
||||
lastOptimization int64 // Timestamp of last optimization (atomic)
|
||||
optimizationLevel int64 // Current optimization level (0-10) (atomic)
|
||||
|
||||
latencyMonitor *LatencyMonitor
|
||||
bufferManager *AdaptiveBufferManager
|
||||
logger zerolog.Logger
|
||||
|
||||
// Control channels
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
|
||||
// Configuration
|
||||
config OptimizerConfig
|
||||
mutex sync.RWMutex
|
||||
}
|
||||
|
||||
// OptimizerConfig holds configuration for the adaptive optimizer
|
||||
type OptimizerConfig struct {
|
||||
MaxOptimizationLevel int // Maximum optimization level (0-10)
|
||||
CooldownPeriod time.Duration // Minimum time between optimizations
|
||||
Aggressiveness float64 // How aggressively to optimize (0.0-1.0)
|
||||
RollbackThreshold time.Duration // Latency threshold to rollback optimizations
|
||||
StabilityPeriod time.Duration // Time to wait for stability after optimization
|
||||
}
|
||||
|
||||
|
||||
|
||||
// DefaultOptimizerConfig returns a sensible default configuration
|
||||
func DefaultOptimizerConfig() OptimizerConfig {
|
||||
return OptimizerConfig{
|
||||
MaxOptimizationLevel: 8,
|
||||
CooldownPeriod: 30 * time.Second,
|
||||
Aggressiveness: 0.7,
|
||||
RollbackThreshold: 300 * time.Millisecond,
|
||||
StabilityPeriod: 10 * time.Second,
|
||||
}
|
||||
}
|
||||
|
||||
// NewAdaptiveOptimizer creates a new adaptive optimizer
|
||||
func NewAdaptiveOptimizer(latencyMonitor *LatencyMonitor, bufferManager *AdaptiveBufferManager, config OptimizerConfig, logger zerolog.Logger) *AdaptiveOptimizer {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
optimizer := &AdaptiveOptimizer{
|
||||
latencyMonitor: latencyMonitor,
|
||||
bufferManager: bufferManager,
|
||||
config: config,
|
||||
logger: logger.With().Str("component", "adaptive-optimizer").Logger(),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
|
||||
|
||||
|
||||
// Register as latency monitor callback
|
||||
latencyMonitor.AddOptimizationCallback(optimizer.handleLatencyOptimization)
|
||||
|
||||
return optimizer
|
||||
}
|
||||
|
||||
// Start begins the adaptive optimization process
|
||||
func (ao *AdaptiveOptimizer) Start() {
|
||||
ao.wg.Add(1)
|
||||
go ao.optimizationLoop()
|
||||
ao.logger.Info().Msg("Adaptive optimizer started")
|
||||
}
|
||||
|
||||
// Stop stops the adaptive optimizer
|
||||
func (ao *AdaptiveOptimizer) Stop() {
|
||||
ao.cancel()
|
||||
ao.wg.Wait()
|
||||
ao.logger.Info().Msg("Adaptive optimizer stopped")
|
||||
}
|
||||
|
||||
// initializeStrategies sets up the available optimization strategies
|
||||
|
||||
|
||||
// handleLatencyOptimization is called when latency optimization is needed
|
||||
func (ao *AdaptiveOptimizer) handleLatencyOptimization(metrics LatencyMetrics) error {
|
||||
currentLevel := atomic.LoadInt64(&ao.optimizationLevel)
|
||||
lastOpt := atomic.LoadInt64(&ao.lastOptimization)
|
||||
|
||||
// Check cooldown period
|
||||
if time.Since(time.Unix(0, lastOpt)) < ao.config.CooldownPeriod {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Determine if we need to increase or decrease optimization level
|
||||
targetLevel := ao.calculateTargetOptimizationLevel(metrics)
|
||||
|
||||
if targetLevel > currentLevel {
|
||||
return ao.increaseOptimization(int(targetLevel))
|
||||
} else if targetLevel < currentLevel {
|
||||
return ao.decreaseOptimization(int(targetLevel))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// calculateTargetOptimizationLevel determines the appropriate optimization level
|
||||
func (ao *AdaptiveOptimizer) calculateTargetOptimizationLevel(metrics LatencyMetrics) int64 {
|
||||
// Base calculation on current latency vs target
|
||||
latencyRatio := float64(metrics.Current) / float64(50*time.Millisecond) // 50ms target
|
||||
|
||||
// Adjust based on trend
|
||||
switch metrics.Trend {
|
||||
case LatencyTrendIncreasing:
|
||||
latencyRatio *= 1.2 // Be more aggressive
|
||||
case LatencyTrendDecreasing:
|
||||
latencyRatio *= 0.8 // Be less aggressive
|
||||
case LatencyTrendVolatile:
|
||||
latencyRatio *= 1.1 // Slightly more aggressive
|
||||
}
|
||||
|
||||
// Apply aggressiveness factor
|
||||
latencyRatio *= ao.config.Aggressiveness
|
||||
|
||||
// Convert to optimization level
|
||||
targetLevel := int64(latencyRatio * 2) // Scale to 0-10 range
|
||||
if targetLevel > int64(ao.config.MaxOptimizationLevel) {
|
||||
targetLevel = int64(ao.config.MaxOptimizationLevel)
|
||||
}
|
||||
if targetLevel < 0 {
|
||||
targetLevel = 0
|
||||
}
|
||||
|
||||
return targetLevel
|
||||
}
|
||||
|
||||
// increaseOptimization applies optimization strategies up to the target level
|
||||
func (ao *AdaptiveOptimizer) increaseOptimization(targetLevel int) error {
|
||||
atomic.StoreInt64(&ao.optimizationLevel, int64(targetLevel))
|
||||
atomic.StoreInt64(&ao.lastOptimization, time.Now().UnixNano())
|
||||
atomic.AddInt64(&ao.optimizationCount, 1)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// decreaseOptimization rolls back optimization strategies to the target level
|
||||
func (ao *AdaptiveOptimizer) decreaseOptimization(targetLevel int) error {
|
||||
atomic.StoreInt64(&ao.optimizationLevel, int64(targetLevel))
|
||||
atomic.StoreInt64(&ao.lastOptimization, time.Now().UnixNano())
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// optimizationLoop runs the main optimization monitoring loop
|
||||
func (ao *AdaptiveOptimizer) optimizationLoop() {
|
||||
defer ao.wg.Done()
|
||||
|
||||
ticker := time.NewTicker(ao.config.StabilityPeriod)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ao.ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
ao.checkStability()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// checkStability monitors system stability and rolls back if needed
|
||||
func (ao *AdaptiveOptimizer) checkStability() {
|
||||
metrics := ao.latencyMonitor.GetMetrics()
|
||||
|
||||
// Check if we need to rollback due to excessive latency
|
||||
if metrics.Current > ao.config.RollbackThreshold {
|
||||
currentLevel := int(atomic.LoadInt64(&ao.optimizationLevel))
|
||||
if currentLevel > 0 {
|
||||
ao.logger.Warn().Dur("current_latency", metrics.Current).Dur("threshold", ao.config.RollbackThreshold).Msg("Rolling back optimizations due to excessive latency")
|
||||
ao.decreaseOptimization(currentLevel - 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// GetOptimizationStats returns current optimization statistics
|
||||
func (ao *AdaptiveOptimizer) GetOptimizationStats() map[string]interface{} {
|
||||
return map[string]interface{}{
|
||||
"optimization_level": atomic.LoadInt64(&ao.optimizationLevel),
|
||||
"optimization_count": atomic.LoadInt64(&ao.optimizationCount),
|
||||
"last_optimization": time.Unix(0, atomic.LoadInt64(&ao.lastOptimization)),
|
||||
}
|
||||
}
|
||||
|
||||
// Strategy implementation methods (stubs for now)
|
|
@ -199,7 +199,16 @@ func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) {
|
|||
start := time.Now()
|
||||
if atomic.CompareAndSwapInt32(&bap.threadPinned, 0, 1) {
|
||||
runtime.LockOSThread()
|
||||
|
||||
// Set high priority for batch audio processing
|
||||
if err := SetAudioThreadPriority(); err != nil {
|
||||
bap.logger.Warn().Err(err).Msg("Failed to set batch audio processing priority")
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := ResetThreadPriority(); err != nil {
|
||||
bap.logger.Warn().Err(err).Msg("Failed to reset thread priority")
|
||||
}
|
||||
runtime.UnlockOSThread()
|
||||
atomic.StoreInt32(&bap.threadPinned, 0)
|
||||
bap.stats.OSThreadPinTime += time.Since(start)
|
||||
|
|
|
@ -2,16 +2,41 @@ package audio
|
|||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
type AudioBufferPool struct {
|
||||
pool sync.Pool
|
||||
bufferSize int
|
||||
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
|
||||
currentSize int64 // Current pool size (atomic)
|
||||
hitCount int64 // Pool hit counter (atomic)
|
||||
missCount int64 // Pool miss counter (atomic)
|
||||
|
||||
// Other fields
|
||||
pool sync.Pool
|
||||
bufferSize int
|
||||
maxPoolSize int
|
||||
mutex sync.RWMutex
|
||||
// Memory optimization fields
|
||||
preallocated []*[]byte // Pre-allocated buffers for immediate use
|
||||
preallocSize int // Number of pre-allocated buffers
|
||||
}
|
||||
|
||||
func NewAudioBufferPool(bufferSize int) *AudioBufferPool {
|
||||
// Pre-allocate 20% of max pool size for immediate availability
|
||||
preallocSize := 20
|
||||
preallocated := make([]*[]byte, 0, preallocSize)
|
||||
|
||||
// Pre-allocate buffers to reduce initial allocation overhead
|
||||
for i := 0; i < preallocSize; i++ {
|
||||
buf := make([]byte, 0, bufferSize)
|
||||
preallocated = append(preallocated, &buf)
|
||||
}
|
||||
|
||||
return &AudioBufferPool{
|
||||
bufferSize: bufferSize,
|
||||
maxPoolSize: 100, // Limit pool size to prevent excessive memory usage
|
||||
preallocated: preallocated,
|
||||
preallocSize: preallocSize,
|
||||
pool: sync.Pool{
|
||||
New: func() interface{} {
|
||||
return make([]byte, 0, bufferSize)
|
||||
|
@ -21,17 +46,68 @@ func NewAudioBufferPool(bufferSize int) *AudioBufferPool {
|
|||
}
|
||||
|
||||
func (p *AudioBufferPool) Get() []byte {
|
||||
if buf := p.pool.Get(); buf != nil {
|
||||
return *buf.(*[]byte)
|
||||
// First try pre-allocated buffers for fastest access
|
||||
p.mutex.Lock()
|
||||
if len(p.preallocated) > 0 {
|
||||
buf := p.preallocated[len(p.preallocated)-1]
|
||||
p.preallocated = p.preallocated[:len(p.preallocated)-1]
|
||||
p.mutex.Unlock()
|
||||
atomic.AddInt64(&p.hitCount, 1)
|
||||
return (*buf)[:0] // Reset length but keep capacity
|
||||
}
|
||||
p.mutex.Unlock()
|
||||
|
||||
// Try sync.Pool next
|
||||
if buf := p.pool.Get(); buf != nil {
|
||||
bufSlice := buf.([]byte)
|
||||
// Update pool size counter when retrieving from pool
|
||||
p.mutex.Lock()
|
||||
if p.currentSize > 0 {
|
||||
p.currentSize--
|
||||
}
|
||||
p.mutex.Unlock()
|
||||
atomic.AddInt64(&p.hitCount, 1)
|
||||
return bufSlice[:0] // Reset length but keep capacity
|
||||
}
|
||||
|
||||
// Last resort: allocate new buffer
|
||||
atomic.AddInt64(&p.missCount, 1)
|
||||
return make([]byte, 0, p.bufferSize)
|
||||
}
|
||||
|
||||
func (p *AudioBufferPool) Put(buf []byte) {
|
||||
if cap(buf) >= p.bufferSize {
|
||||
resetBuf := buf[:0]
|
||||
p.pool.Put(&resetBuf)
|
||||
if cap(buf) < p.bufferSize {
|
||||
return // Buffer too small, don't pool it
|
||||
}
|
||||
|
||||
// Reset buffer for reuse
|
||||
resetBuf := buf[:0]
|
||||
|
||||
// First try to return to pre-allocated pool for fastest reuse
|
||||
p.mutex.Lock()
|
||||
if len(p.preallocated) < p.preallocSize {
|
||||
p.preallocated = append(p.preallocated, &resetBuf)
|
||||
p.mutex.Unlock()
|
||||
return
|
||||
}
|
||||
p.mutex.Unlock()
|
||||
|
||||
// Check sync.Pool size limit to prevent excessive memory usage
|
||||
p.mutex.RLock()
|
||||
currentSize := p.currentSize
|
||||
p.mutex.RUnlock()
|
||||
|
||||
if currentSize >= int64(p.maxPoolSize) {
|
||||
return // Pool is full, let GC handle this buffer
|
||||
}
|
||||
|
||||
// Return to sync.Pool
|
||||
p.pool.Put(resetBuf)
|
||||
|
||||
// Update pool size counter
|
||||
p.mutex.Lock()
|
||||
p.currentSize++
|
||||
p.mutex.Unlock()
|
||||
}
|
||||
|
||||
var (
|
||||
|
@ -54,3 +130,83 @@ func GetAudioControlBuffer() []byte {
|
|||
func PutAudioControlBuffer(buf []byte) {
|
||||
audioControlPool.Put(buf)
|
||||
}
|
||||
|
||||
// GetPoolStats returns detailed statistics about this buffer pool
|
||||
func (p *AudioBufferPool) GetPoolStats() AudioBufferPoolDetailedStats {
|
||||
p.mutex.RLock()
|
||||
preallocatedCount := len(p.preallocated)
|
||||
currentSize := p.currentSize
|
||||
p.mutex.RUnlock()
|
||||
|
||||
hitCount := atomic.LoadInt64(&p.hitCount)
|
||||
missCount := atomic.LoadInt64(&p.missCount)
|
||||
totalRequests := hitCount + missCount
|
||||
|
||||
var hitRate float64
|
||||
if totalRequests > 0 {
|
||||
hitRate = float64(hitCount) / float64(totalRequests) * 100
|
||||
}
|
||||
|
||||
return AudioBufferPoolDetailedStats{
|
||||
BufferSize: p.bufferSize,
|
||||
MaxPoolSize: p.maxPoolSize,
|
||||
CurrentPoolSize: currentSize,
|
||||
PreallocatedCount: int64(preallocatedCount),
|
||||
PreallocatedMax: int64(p.preallocSize),
|
||||
HitCount: hitCount,
|
||||
MissCount: missCount,
|
||||
HitRate: hitRate,
|
||||
}
|
||||
}
|
||||
|
||||
// AudioBufferPoolDetailedStats provides detailed pool statistics
|
||||
type AudioBufferPoolDetailedStats struct {
|
||||
BufferSize int
|
||||
MaxPoolSize int
|
||||
CurrentPoolSize int64
|
||||
PreallocatedCount int64
|
||||
PreallocatedMax int64
|
||||
HitCount int64
|
||||
MissCount int64
|
||||
HitRate float64 // Percentage
|
||||
}
|
||||
|
||||
// GetAudioBufferPoolStats returns statistics about the audio buffer pools
|
||||
type AudioBufferPoolStats struct {
|
||||
FramePoolSize int64
|
||||
FramePoolMax int
|
||||
ControlPoolSize int64
|
||||
ControlPoolMax int
|
||||
// Enhanced statistics
|
||||
FramePoolHitRate float64
|
||||
ControlPoolHitRate float64
|
||||
FramePoolDetails AudioBufferPoolDetailedStats
|
||||
ControlPoolDetails AudioBufferPoolDetailedStats
|
||||
}
|
||||
|
||||
func GetAudioBufferPoolStats() AudioBufferPoolStats {
|
||||
audioFramePool.mutex.RLock()
|
||||
frameSize := audioFramePool.currentSize
|
||||
frameMax := audioFramePool.maxPoolSize
|
||||
audioFramePool.mutex.RUnlock()
|
||||
|
||||
audioControlPool.mutex.RLock()
|
||||
controlSize := audioControlPool.currentSize
|
||||
controlMax := audioControlPool.maxPoolSize
|
||||
audioControlPool.mutex.RUnlock()
|
||||
|
||||
// Get detailed statistics
|
||||
frameDetails := audioFramePool.GetPoolStats()
|
||||
controlDetails := audioControlPool.GetPoolStats()
|
||||
|
||||
return AudioBufferPoolStats{
|
||||
FramePoolSize: frameSize,
|
||||
FramePoolMax: frameMax,
|
||||
ControlPoolSize: controlSize,
|
||||
ControlPoolMax: controlMax,
|
||||
FramePoolHitRate: frameDetails.HitRate,
|
||||
ControlPoolHitRate: controlDetails.HitRate,
|
||||
FramePoolDetails: frameDetails,
|
||||
ControlPoolDetails: controlDetails,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,8 +22,14 @@ static snd_pcm_t *pcm_handle = NULL;
|
|||
static snd_pcm_t *pcm_playback_handle = NULL;
|
||||
static OpusEncoder *encoder = NULL;
|
||||
static OpusDecoder *decoder = NULL;
|
||||
static int opus_bitrate = 64000;
|
||||
static int opus_complexity = 5;
|
||||
// Optimized Opus encoder settings for ARM Cortex-A7
|
||||
static int opus_bitrate = 96000; // Increased for better quality
|
||||
static int opus_complexity = 3; // Reduced for ARM performance
|
||||
static int opus_vbr = 1; // Variable bitrate enabled
|
||||
static int opus_vbr_constraint = 1; // Constrained VBR for consistent latency
|
||||
static int opus_signal_type = OPUS_SIGNAL_MUSIC; // Optimized for general audio
|
||||
static int opus_bandwidth = OPUS_BANDWIDTH_FULLBAND; // Full bandwidth
|
||||
static int opus_dtx = 0; // Disable DTX for real-time audio
|
||||
static int sample_rate = 48000;
|
||||
static int channels = 2;
|
||||
static int frame_size = 960; // 20ms for 48kHz
|
||||
|
@ -164,7 +170,7 @@ int jetkvm_audio_init() {
|
|||
return -1;
|
||||
}
|
||||
|
||||
// Initialize Opus encoder
|
||||
// Initialize Opus encoder with optimized settings
|
||||
int opus_err = 0;
|
||||
encoder = opus_encoder_create(sample_rate, channels, OPUS_APPLICATION_AUDIO, &opus_err);
|
||||
if (!encoder || opus_err != OPUS_OK) {
|
||||
|
@ -173,8 +179,18 @@ int jetkvm_audio_init() {
|
|||
return -2;
|
||||
}
|
||||
|
||||
// Apply optimized Opus encoder settings
|
||||
opus_encoder_ctl(encoder, OPUS_SET_BITRATE(opus_bitrate));
|
||||
opus_encoder_ctl(encoder, OPUS_SET_COMPLEXITY(opus_complexity));
|
||||
opus_encoder_ctl(encoder, OPUS_SET_VBR(opus_vbr));
|
||||
opus_encoder_ctl(encoder, OPUS_SET_VBR_CONSTRAINT(opus_vbr_constraint));
|
||||
opus_encoder_ctl(encoder, OPUS_SET_SIGNAL(opus_signal_type));
|
||||
opus_encoder_ctl(encoder, OPUS_SET_BANDWIDTH(opus_bandwidth));
|
||||
opus_encoder_ctl(encoder, OPUS_SET_DTX(opus_dtx));
|
||||
// Enable packet loss concealment for better resilience
|
||||
opus_encoder_ctl(encoder, OPUS_SET_PACKET_LOSS_PERC(5));
|
||||
// Set prediction disabled for lower latency
|
||||
opus_encoder_ctl(encoder, OPUS_SET_PREDICTION_DISABLED(1));
|
||||
|
||||
capture_initialized = 1;
|
||||
capture_initializing = 0;
|
||||
|
|
|
@ -99,6 +99,42 @@ func (aim *AudioInputManager) WriteOpusFrame(frame []byte) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// WriteOpusFrameZeroCopy writes an Opus frame using zero-copy optimization
|
||||
func (aim *AudioInputManager) WriteOpusFrameZeroCopy(frame *ZeroCopyAudioFrame) error {
|
||||
if !aim.IsRunning() {
|
||||
return nil // Not running, silently drop
|
||||
}
|
||||
|
||||
if frame == nil {
|
||||
atomic.AddInt64(&aim.metrics.FramesDropped, 1)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Track end-to-end latency from WebRTC to IPC
|
||||
startTime := time.Now()
|
||||
err := aim.ipcManager.WriteOpusFrameZeroCopy(frame)
|
||||
processingTime := time.Since(startTime)
|
||||
|
||||
// Log high latency warnings
|
||||
if processingTime > 10*time.Millisecond {
|
||||
aim.logger.Warn().
|
||||
Dur("latency_ms", processingTime).
|
||||
Msg("High audio processing latency detected")
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
atomic.AddInt64(&aim.metrics.FramesDropped, 1)
|
||||
return err
|
||||
}
|
||||
|
||||
// Update metrics
|
||||
atomic.AddInt64(&aim.metrics.FramesSent, 1)
|
||||
atomic.AddInt64(&aim.metrics.BytesProcessed, int64(frame.Length()))
|
||||
aim.metrics.LastFrameTime = time.Now()
|
||||
aim.metrics.AverageLatency = processingTime
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetMetrics returns current audio input metrics
|
||||
func (aim *AudioInputManager) GetMetrics() AudioInputMetrics {
|
||||
return AudioInputMetrics{
|
||||
|
|
|
@ -8,17 +8,22 @@ import (
|
|||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/jetkvm/kvm/internal/logging"
|
||||
)
|
||||
|
||||
const (
|
||||
inputMagicNumber uint32 = 0x4A4B4D49 // "JKMI" (JetKVM Microphone Input)
|
||||
inputSocketName = "audio_input.sock"
|
||||
maxFrameSize = 4096 // Maximum Opus frame size
|
||||
writeTimeout = 5 * time.Millisecond // Non-blocking write timeout
|
||||
maxDroppedFrames = 100 // Maximum consecutive dropped frames before reconnect
|
||||
maxFrameSize = 4096 // Maximum Opus frame size
|
||||
writeTimeout = 15 * time.Millisecond // Non-blocking write timeout (increased for high load)
|
||||
maxDroppedFrames = 100 // Maximum consecutive dropped frames before reconnect
|
||||
headerSize = 17 // Fixed header size: 4+1+4+8 bytes
|
||||
messagePoolSize = 256 // Pre-allocated message pool size
|
||||
)
|
||||
|
||||
// InputMessageType represents the type of IPC message
|
||||
|
@ -41,6 +46,108 @@ type InputIPCMessage struct {
|
|||
Data []byte
|
||||
}
|
||||
|
||||
// OptimizedIPCMessage represents an optimized message with pre-allocated buffers
|
||||
type OptimizedIPCMessage struct {
|
||||
header [headerSize]byte // Pre-allocated header buffer
|
||||
data []byte // Reusable data buffer
|
||||
msg InputIPCMessage // Embedded message
|
||||
}
|
||||
|
||||
// MessagePool manages a pool of reusable messages to reduce allocations
|
||||
type MessagePool struct {
|
||||
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
|
||||
hitCount int64 // Pool hit counter (atomic)
|
||||
missCount int64 // Pool miss counter (atomic)
|
||||
|
||||
// Other fields
|
||||
pool chan *OptimizedIPCMessage
|
||||
// Memory optimization fields
|
||||
preallocated []*OptimizedIPCMessage // Pre-allocated messages for immediate use
|
||||
preallocSize int // Number of pre-allocated messages
|
||||
maxPoolSize int // Maximum pool size to prevent memory bloat
|
||||
mutex sync.RWMutex // Protects preallocated slice
|
||||
}
|
||||
|
||||
// Global message pool instance
|
||||
var globalMessagePool = &MessagePool{
|
||||
pool: make(chan *OptimizedIPCMessage, messagePoolSize),
|
||||
}
|
||||
|
||||
// Initialize the message pool with pre-allocated messages
|
||||
func init() {
|
||||
// Pre-allocate 30% of pool size for immediate availability
|
||||
preallocSize := messagePoolSize * 30 / 100
|
||||
globalMessagePool.preallocSize = preallocSize
|
||||
globalMessagePool.maxPoolSize = messagePoolSize * 2 // Allow growth up to 2x
|
||||
globalMessagePool.preallocated = make([]*OptimizedIPCMessage, 0, preallocSize)
|
||||
|
||||
// Pre-allocate messages to reduce initial allocation overhead
|
||||
for i := 0; i < preallocSize; i++ {
|
||||
msg := &OptimizedIPCMessage{
|
||||
data: make([]byte, 0, maxFrameSize),
|
||||
}
|
||||
globalMessagePool.preallocated = append(globalMessagePool.preallocated, msg)
|
||||
}
|
||||
|
||||
// Fill the channel pool with remaining messages
|
||||
for i := preallocSize; i < messagePoolSize; i++ {
|
||||
globalMessagePool.pool <- &OptimizedIPCMessage{
|
||||
data: make([]byte, 0, maxFrameSize),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Get retrieves a message from the pool
|
||||
func (mp *MessagePool) Get() *OptimizedIPCMessage {
|
||||
// First try pre-allocated messages for fastest access
|
||||
mp.mutex.Lock()
|
||||
if len(mp.preallocated) > 0 {
|
||||
msg := mp.preallocated[len(mp.preallocated)-1]
|
||||
mp.preallocated = mp.preallocated[:len(mp.preallocated)-1]
|
||||
mp.mutex.Unlock()
|
||||
atomic.AddInt64(&mp.hitCount, 1)
|
||||
return msg
|
||||
}
|
||||
mp.mutex.Unlock()
|
||||
|
||||
// Try channel pool next
|
||||
select {
|
||||
case msg := <-mp.pool:
|
||||
atomic.AddInt64(&mp.hitCount, 1)
|
||||
return msg
|
||||
default:
|
||||
// Pool exhausted, create new message
|
||||
atomic.AddInt64(&mp.missCount, 1)
|
||||
return &OptimizedIPCMessage{
|
||||
data: make([]byte, 0, maxFrameSize),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Put returns a message to the pool
|
||||
func (mp *MessagePool) Put(msg *OptimizedIPCMessage) {
|
||||
// Reset the message for reuse
|
||||
msg.data = msg.data[:0]
|
||||
msg.msg = InputIPCMessage{}
|
||||
|
||||
// First try to return to pre-allocated pool for fastest reuse
|
||||
mp.mutex.Lock()
|
||||
if len(mp.preallocated) < mp.preallocSize {
|
||||
mp.preallocated = append(mp.preallocated, msg)
|
||||
mp.mutex.Unlock()
|
||||
return
|
||||
}
|
||||
mp.mutex.Unlock()
|
||||
|
||||
// Try channel pool next
|
||||
select {
|
||||
case mp.pool <- msg:
|
||||
// Successfully returned to pool
|
||||
default:
|
||||
// Pool full, let GC handle it
|
||||
}
|
||||
}
|
||||
|
||||
// InputIPCConfig represents configuration for audio input
|
||||
type InputIPCConfig struct {
|
||||
SampleRate int
|
||||
|
@ -79,8 +186,9 @@ func NewAudioInputServer() (*AudioInputServer, error) {
|
|||
return nil, fmt.Errorf("failed to create unix socket: %w", err)
|
||||
}
|
||||
|
||||
// Initialize with adaptive buffer size (start with 1000 frames)
|
||||
initialBufferSize := int64(1000)
|
||||
// Get initial buffer size from adaptive buffer manager
|
||||
adaptiveManager := GetAdaptiveBufferManager()
|
||||
initialBufferSize := int64(adaptiveManager.GetInputBufferSize())
|
||||
|
||||
return &AudioInputServer{
|
||||
listener: listener,
|
||||
|
@ -192,21 +300,22 @@ func (ais *AudioInputServer) handleConnection(conn net.Conn) {
|
|||
|
||||
// readMessage reads a complete message from the connection
|
||||
func (ais *AudioInputServer) readMessage(conn net.Conn) (*InputIPCMessage, error) {
|
||||
// Read header (magic + type + length + timestamp)
|
||||
headerSize := 4 + 1 + 4 + 8 // uint32 + uint8 + uint32 + int64
|
||||
header := make([]byte, headerSize)
|
||||
// Get optimized message from pool
|
||||
optMsg := globalMessagePool.Get()
|
||||
defer globalMessagePool.Put(optMsg)
|
||||
|
||||
_, err := io.ReadFull(conn, header)
|
||||
// Read header directly into pre-allocated buffer
|
||||
_, err := io.ReadFull(conn, optMsg.header[:])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Parse header
|
||||
msg := &InputIPCMessage{}
|
||||
msg.Magic = binary.LittleEndian.Uint32(header[0:4])
|
||||
msg.Type = InputMessageType(header[4])
|
||||
msg.Length = binary.LittleEndian.Uint32(header[5:9])
|
||||
msg.Timestamp = int64(binary.LittleEndian.Uint64(header[9:17]))
|
||||
// Parse header using optimized access
|
||||
msg := &optMsg.msg
|
||||
msg.Magic = binary.LittleEndian.Uint32(optMsg.header[0:4])
|
||||
msg.Type = InputMessageType(optMsg.header[4])
|
||||
msg.Length = binary.LittleEndian.Uint32(optMsg.header[5:9])
|
||||
msg.Timestamp = int64(binary.LittleEndian.Uint64(optMsg.header[9:17]))
|
||||
|
||||
// Validate magic number
|
||||
if msg.Magic != inputMagicNumber {
|
||||
|
@ -218,16 +327,37 @@ func (ais *AudioInputServer) readMessage(conn net.Conn) (*InputIPCMessage, error
|
|||
return nil, fmt.Errorf("message too large: %d bytes", msg.Length)
|
||||
}
|
||||
|
||||
// Read data if present
|
||||
// Read data if present using pooled buffer
|
||||
if msg.Length > 0 {
|
||||
msg.Data = make([]byte, msg.Length)
|
||||
_, err = io.ReadFull(conn, msg.Data)
|
||||
// Ensure buffer capacity
|
||||
if cap(optMsg.data) < int(msg.Length) {
|
||||
optMsg.data = make([]byte, msg.Length)
|
||||
} else {
|
||||
optMsg.data = optMsg.data[:msg.Length]
|
||||
}
|
||||
|
||||
_, err = io.ReadFull(conn, optMsg.data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
msg.Data = optMsg.data
|
||||
}
|
||||
|
||||
return msg, nil
|
||||
// Return a copy of the message (data will be copied by caller if needed)
|
||||
result := &InputIPCMessage{
|
||||
Magic: msg.Magic,
|
||||
Type: msg.Type,
|
||||
Length: msg.Length,
|
||||
Timestamp: msg.Timestamp,
|
||||
}
|
||||
|
||||
if msg.Length > 0 {
|
||||
// Copy data to ensure it's not affected by buffer reuse
|
||||
result.Data = make([]byte, msg.Length)
|
||||
copy(result.Data, msg.Data)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// processMessage processes a received message
|
||||
|
@ -282,19 +412,20 @@ func (ais *AudioInputServer) sendAck() error {
|
|||
return ais.writeMessage(ais.conn, msg)
|
||||
}
|
||||
|
||||
// writeMessage writes a message to the connection
|
||||
// writeMessage writes a message to the connection using optimized buffers
|
||||
func (ais *AudioInputServer) writeMessage(conn net.Conn, msg *InputIPCMessage) error {
|
||||
// Prepare header
|
||||
headerSize := 4 + 1 + 4 + 8
|
||||
header := make([]byte, headerSize)
|
||||
// Get optimized message from pool for header preparation
|
||||
optMsg := globalMessagePool.Get()
|
||||
defer globalMessagePool.Put(optMsg)
|
||||
|
||||
binary.LittleEndian.PutUint32(header[0:4], msg.Magic)
|
||||
header[4] = byte(msg.Type)
|
||||
binary.LittleEndian.PutUint32(header[5:9], msg.Length)
|
||||
binary.LittleEndian.PutUint64(header[9:17], uint64(msg.Timestamp))
|
||||
// Prepare header in pre-allocated buffer
|
||||
binary.LittleEndian.PutUint32(optMsg.header[0:4], msg.Magic)
|
||||
optMsg.header[4] = byte(msg.Type)
|
||||
binary.LittleEndian.PutUint32(optMsg.header[5:9], msg.Length)
|
||||
binary.LittleEndian.PutUint64(optMsg.header[9:17], uint64(msg.Timestamp))
|
||||
|
||||
// Write header
|
||||
_, err := conn.Write(header)
|
||||
_, err := conn.Write(optMsg.header[:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -312,7 +443,7 @@ func (ais *AudioInputServer) writeMessage(conn net.Conn, msg *InputIPCMessage) e
|
|||
|
||||
// AudioInputClient handles IPC communication from the main process
|
||||
type AudioInputClient struct {
|
||||
// Atomic fields must be first for proper alignment on ARM
|
||||
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
|
||||
droppedFrames int64 // Atomic counter for dropped frames
|
||||
totalFrames int64 // Atomic counter for total frames
|
||||
|
||||
|
@ -410,6 +541,35 @@ func (aic *AudioInputClient) SendFrame(frame []byte) error {
|
|||
return aic.writeMessage(msg)
|
||||
}
|
||||
|
||||
// SendFrameZeroCopy sends a zero-copy Opus frame to the audio input server
|
||||
func (aic *AudioInputClient) SendFrameZeroCopy(frame *ZeroCopyAudioFrame) error {
|
||||
aic.mtx.Lock()
|
||||
defer aic.mtx.Unlock()
|
||||
|
||||
if !aic.running || aic.conn == nil {
|
||||
return fmt.Errorf("not connected")
|
||||
}
|
||||
|
||||
if frame == nil || frame.Length() == 0 {
|
||||
return nil // Empty frame, ignore
|
||||
}
|
||||
|
||||
if frame.Length() > maxFrameSize {
|
||||
return fmt.Errorf("frame too large: %d bytes", frame.Length())
|
||||
}
|
||||
|
||||
// Use zero-copy data directly
|
||||
msg := &InputIPCMessage{
|
||||
Magic: inputMagicNumber,
|
||||
Type: InputMessageTypeOpusFrame,
|
||||
Length: uint32(frame.Length()),
|
||||
Timestamp: time.Now().UnixNano(),
|
||||
Data: frame.Data(), // Zero-copy data access
|
||||
}
|
||||
|
||||
return aic.writeMessage(msg)
|
||||
}
|
||||
|
||||
// SendConfig sends a configuration update to the audio input server
|
||||
func (aic *AudioInputClient) SendConfig(config InputIPCConfig) error {
|
||||
aic.mtx.Lock()
|
||||
|
@ -460,14 +620,15 @@ func (aic *AudioInputClient) writeMessage(msg *InputIPCMessage) error {
|
|||
// Increment total frames counter
|
||||
atomic.AddInt64(&aic.totalFrames, 1)
|
||||
|
||||
// Prepare header
|
||||
headerSize := 4 + 1 + 4 + 8
|
||||
header := make([]byte, headerSize)
|
||||
// Get optimized message from pool for header preparation
|
||||
optMsg := globalMessagePool.Get()
|
||||
defer globalMessagePool.Put(optMsg)
|
||||
|
||||
binary.LittleEndian.PutUint32(header[0:4], msg.Magic)
|
||||
header[4] = byte(msg.Type)
|
||||
binary.LittleEndian.PutUint32(header[5:9], msg.Length)
|
||||
binary.LittleEndian.PutUint64(header[9:17], uint64(msg.Timestamp))
|
||||
// Prepare header in pre-allocated buffer
|
||||
binary.LittleEndian.PutUint32(optMsg.header[0:4], msg.Magic)
|
||||
optMsg.header[4] = byte(msg.Type)
|
||||
binary.LittleEndian.PutUint32(optMsg.header[5:9], msg.Length)
|
||||
binary.LittleEndian.PutUint64(optMsg.header[9:17], uint64(msg.Timestamp))
|
||||
|
||||
// Use non-blocking write with timeout
|
||||
ctx, cancel := context.WithTimeout(context.Background(), writeTimeout)
|
||||
|
@ -476,8 +637,8 @@ func (aic *AudioInputClient) writeMessage(msg *InputIPCMessage) error {
|
|||
// Create a channel to signal write completion
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
// Write header
|
||||
_, err := aic.conn.Write(header)
|
||||
// Write header using pre-allocated buffer
|
||||
_, err := aic.conn.Write(optMsg.header[:])
|
||||
if err != nil {
|
||||
done <- err
|
||||
return
|
||||
|
@ -570,6 +731,20 @@ func (ais *AudioInputServer) startReaderGoroutine() {
|
|||
func (ais *AudioInputServer) startProcessorGoroutine() {
|
||||
ais.wg.Add(1)
|
||||
go func() {
|
||||
runtime.LockOSThread()
|
||||
defer runtime.UnlockOSThread()
|
||||
|
||||
// Set high priority for audio processing
|
||||
logger := logging.GetDefaultLogger().With().Str("component", "audio-input-processor").Logger()
|
||||
if err := SetAudioThreadPriority(); err != nil {
|
||||
logger.Warn().Err(err).Msg("Failed to set audio processing priority")
|
||||
}
|
||||
defer func() {
|
||||
if err := ResetThreadPriority(); err != nil {
|
||||
logger.Warn().Err(err).Msg("Failed to reset thread priority")
|
||||
}
|
||||
}()
|
||||
|
||||
defer ais.wg.Done()
|
||||
for {
|
||||
select {
|
||||
|
@ -608,9 +783,27 @@ func (ais *AudioInputServer) startProcessorGoroutine() {
|
|||
func (ais *AudioInputServer) startMonitorGoroutine() {
|
||||
ais.wg.Add(1)
|
||||
go func() {
|
||||
runtime.LockOSThread()
|
||||
defer runtime.UnlockOSThread()
|
||||
|
||||
// Set I/O priority for monitoring
|
||||
logger := logging.GetDefaultLogger().With().Str("component", "audio-input-monitor").Logger()
|
||||
if err := SetAudioIOThreadPriority(); err != nil {
|
||||
logger.Warn().Err(err).Msg("Failed to set audio I/O priority")
|
||||
}
|
||||
defer func() {
|
||||
if err := ResetThreadPriority(); err != nil {
|
||||
logger.Warn().Err(err).Msg("Failed to reset thread priority")
|
||||
}
|
||||
}()
|
||||
|
||||
defer ais.wg.Done()
|
||||
ticker := time.NewTicker(100 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
|
||||
// Buffer size update ticker (less frequent)
|
||||
bufferUpdateTicker := time.NewTicker(500 * time.Millisecond)
|
||||
defer bufferUpdateTicker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
|
@ -623,52 +816,46 @@ func (ais *AudioInputServer) startMonitorGoroutine() {
|
|||
case msg := <-ais.processChan:
|
||||
start := time.Now()
|
||||
err := ais.processMessage(msg)
|
||||
processingTime := time.Since(start).Nanoseconds()
|
||||
processingTime := time.Since(start)
|
||||
|
||||
// Calculate end-to-end latency using message timestamp
|
||||
var latency time.Duration
|
||||
if msg.Type == InputMessageTypeOpusFrame && msg.Timestamp > 0 {
|
||||
msgTime := time.Unix(0, msg.Timestamp)
|
||||
endToEndLatency := time.Since(msgTime).Nanoseconds()
|
||||
latency = time.Since(msgTime)
|
||||
// Use exponential moving average for end-to-end latency tracking
|
||||
currentAvg := atomic.LoadInt64(&ais.processingTime)
|
||||
// Weight: 90% historical, 10% current (for smoother averaging)
|
||||
newAvg := (currentAvg*9 + endToEndLatency) / 10
|
||||
newAvg := (currentAvg*9 + latency.Nanoseconds()) / 10
|
||||
atomic.StoreInt64(&ais.processingTime, newAvg)
|
||||
} else {
|
||||
// Fallback to processing time only
|
||||
latency = processingTime
|
||||
currentAvg := atomic.LoadInt64(&ais.processingTime)
|
||||
newAvg := (currentAvg + processingTime) / 2
|
||||
newAvg := (currentAvg + processingTime.Nanoseconds()) / 2
|
||||
atomic.StoreInt64(&ais.processingTime, newAvg)
|
||||
}
|
||||
|
||||
// Report latency to adaptive buffer manager
|
||||
ais.ReportLatency(latency)
|
||||
|
||||
if err != nil {
|
||||
atomic.AddInt64(&ais.droppedFrames, 1)
|
||||
}
|
||||
default:
|
||||
// No more messages to process
|
||||
goto adaptiveBuffering
|
||||
goto checkBufferUpdate
|
||||
}
|
||||
}
|
||||
|
||||
adaptiveBuffering:
|
||||
// Adaptive buffer sizing based on processing time
|
||||
avgTime := atomic.LoadInt64(&ais.processingTime)
|
||||
currentSize := atomic.LoadInt64(&ais.bufferSize)
|
||||
|
||||
if avgTime > 10*1000*1000 { // > 10ms processing time
|
||||
// Increase buffer size
|
||||
newSize := currentSize * 2
|
||||
if newSize > 1000 {
|
||||
newSize = 1000
|
||||
}
|
||||
atomic.StoreInt64(&ais.bufferSize, newSize)
|
||||
} else if avgTime < 1*1000*1000 { // < 1ms processing time
|
||||
// Decrease buffer size
|
||||
newSize := currentSize / 2
|
||||
if newSize < 50 {
|
||||
newSize = 50
|
||||
}
|
||||
atomic.StoreInt64(&ais.bufferSize, newSize)
|
||||
|
||||
checkBufferUpdate:
|
||||
// Check if we need to update buffer size
|
||||
select {
|
||||
case <-bufferUpdateTicker.C:
|
||||
// Update buffer size from adaptive buffer manager
|
||||
ais.UpdateBufferSize()
|
||||
default:
|
||||
// No buffer update needed
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -683,6 +870,64 @@ func (ais *AudioInputServer) GetServerStats() (total, dropped int64, avgProcessi
|
|||
atomic.LoadInt64(&ais.bufferSize)
|
||||
}
|
||||
|
||||
// UpdateBufferSize updates the buffer size from adaptive buffer manager
|
||||
func (ais *AudioInputServer) UpdateBufferSize() {
|
||||
adaptiveManager := GetAdaptiveBufferManager()
|
||||
newSize := int64(adaptiveManager.GetInputBufferSize())
|
||||
atomic.StoreInt64(&ais.bufferSize, newSize)
|
||||
}
|
||||
|
||||
// ReportLatency reports processing latency to adaptive buffer manager
|
||||
func (ais *AudioInputServer) ReportLatency(latency time.Duration) {
|
||||
adaptiveManager := GetAdaptiveBufferManager()
|
||||
adaptiveManager.UpdateLatency(latency)
|
||||
}
|
||||
|
||||
// GetMessagePoolStats returns detailed statistics about the message pool
|
||||
func (mp *MessagePool) GetMessagePoolStats() MessagePoolStats {
|
||||
mp.mutex.RLock()
|
||||
preallocatedCount := len(mp.preallocated)
|
||||
mp.mutex.RUnlock()
|
||||
|
||||
hitCount := atomic.LoadInt64(&mp.hitCount)
|
||||
missCount := atomic.LoadInt64(&mp.missCount)
|
||||
totalRequests := hitCount + missCount
|
||||
|
||||
var hitRate float64
|
||||
if totalRequests > 0 {
|
||||
hitRate = float64(hitCount) / float64(totalRequests) * 100
|
||||
}
|
||||
|
||||
// Calculate channel pool size
|
||||
channelPoolSize := len(mp.pool)
|
||||
|
||||
return MessagePoolStats{
|
||||
MaxPoolSize: mp.maxPoolSize,
|
||||
ChannelPoolSize: channelPoolSize,
|
||||
PreallocatedCount: int64(preallocatedCount),
|
||||
PreallocatedMax: int64(mp.preallocSize),
|
||||
HitCount: hitCount,
|
||||
MissCount: missCount,
|
||||
HitRate: hitRate,
|
||||
}
|
||||
}
|
||||
|
||||
// MessagePoolStats provides detailed message pool statistics
|
||||
type MessagePoolStats struct {
|
||||
MaxPoolSize int
|
||||
ChannelPoolSize int
|
||||
PreallocatedCount int64
|
||||
PreallocatedMax int64
|
||||
HitCount int64
|
||||
MissCount int64
|
||||
HitRate float64 // Percentage
|
||||
}
|
||||
|
||||
// GetGlobalMessagePoolStats returns statistics for the global message pool
|
||||
func GetGlobalMessagePoolStats() MessagePoolStats {
|
||||
return globalMessagePool.GetMessagePoolStats()
|
||||
}
|
||||
|
||||
// Helper functions
|
||||
|
||||
// getInputSocketPath returns the path to the input socket
|
||||
|
|
|
@ -116,6 +116,40 @@ func (aim *AudioInputIPCManager) WriteOpusFrame(frame []byte) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// WriteOpusFrameZeroCopy sends an Opus frame via IPC using zero-copy optimization
|
||||
func (aim *AudioInputIPCManager) WriteOpusFrameZeroCopy(frame *ZeroCopyAudioFrame) error {
|
||||
if atomic.LoadInt32(&aim.running) == 0 {
|
||||
return nil // Not running, silently ignore
|
||||
}
|
||||
|
||||
if frame == nil || frame.Length() == 0 {
|
||||
return nil // Empty frame, ignore
|
||||
}
|
||||
|
||||
// Start latency measurement
|
||||
startTime := time.Now()
|
||||
|
||||
// Update metrics
|
||||
atomic.AddInt64(&aim.metrics.FramesSent, 1)
|
||||
atomic.AddInt64(&aim.metrics.BytesProcessed, int64(frame.Length()))
|
||||
aim.metrics.LastFrameTime = startTime
|
||||
|
||||
// Send frame via IPC using zero-copy data
|
||||
err := aim.supervisor.SendFrameZeroCopy(frame)
|
||||
if err != nil {
|
||||
// Count as dropped frame
|
||||
atomic.AddInt64(&aim.metrics.FramesDropped, 1)
|
||||
aim.logger.Debug().Err(err).Msg("Failed to send zero-copy frame via IPC")
|
||||
return err
|
||||
}
|
||||
|
||||
// Calculate and update latency (end-to-end IPC transmission time)
|
||||
latency := time.Since(startTime)
|
||||
aim.updateLatencyMetrics(latency)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsRunning returns whether the IPC manager is running
|
||||
func (aim *AudioInputIPCManager) IsRunning() bool {
|
||||
return atomic.LoadInt32(&aim.running) == 1
|
||||
|
|
|
@ -16,6 +16,10 @@ func RunAudioInputServer() error {
|
|||
logger := logging.GetDefaultLogger().With().Str("component", "audio-input-server").Logger()
|
||||
logger.Info().Msg("Starting audio input server subprocess")
|
||||
|
||||
// Start adaptive buffer management for optimal performance
|
||||
StartAdaptiveBuffering()
|
||||
defer StopAdaptiveBuffering()
|
||||
|
||||
// Initialize CGO audio system
|
||||
err := CGOAudioPlaybackInit()
|
||||
if err != nil {
|
||||
|
|
|
@ -244,6 +244,19 @@ func (ais *AudioInputSupervisor) SendFrame(frame []byte) error {
|
|||
return ais.client.SendFrame(frame)
|
||||
}
|
||||
|
||||
// SendFrameZeroCopy sends a zero-copy frame to the subprocess
|
||||
func (ais *AudioInputSupervisor) SendFrameZeroCopy(frame *ZeroCopyAudioFrame) error {
|
||||
if ais.client == nil {
|
||||
return fmt.Errorf("client not initialized")
|
||||
}
|
||||
|
||||
if !ais.client.IsConnected() {
|
||||
return fmt.Errorf("client not connected")
|
||||
}
|
||||
|
||||
return ais.client.SendFrameZeroCopy(frame)
|
||||
}
|
||||
|
||||
// SendConfig sends a configuration update to the subprocess (convenience method)
|
||||
func (ais *AudioInputSupervisor) SendConfig(config InputIPCConfig) error {
|
||||
if ais.client == nil {
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package audio
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
|
@ -8,22 +9,120 @@ import (
|
|||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
const (
|
||||
magicNumber uint32 = 0x4A4B564D // "JKVM"
|
||||
socketName = "audio_output.sock"
|
||||
outputMagicNumber uint32 = 0x4A4B4F55 // "JKOU" (JetKVM Output)
|
||||
outputSocketName = "audio_output.sock"
|
||||
outputMaxFrameSize = 4096 // Maximum Opus frame size
|
||||
outputWriteTimeout = 10 * time.Millisecond // Non-blocking write timeout (increased for high load)
|
||||
outputMaxDroppedFrames = 50 // Maximum consecutive dropped frames
|
||||
outputHeaderSize = 17 // Fixed header size: 4+1+4+8 bytes
|
||||
outputMessagePoolSize = 128 // Pre-allocated message pool size
|
||||
)
|
||||
|
||||
// OutputMessageType represents the type of IPC message
|
||||
type OutputMessageType uint8
|
||||
|
||||
const (
|
||||
OutputMessageTypeOpusFrame OutputMessageType = iota
|
||||
OutputMessageTypeConfig
|
||||
OutputMessageTypeStop
|
||||
OutputMessageTypeHeartbeat
|
||||
OutputMessageTypeAck
|
||||
)
|
||||
|
||||
// OutputIPCMessage represents an IPC message for audio output
|
||||
type OutputIPCMessage struct {
|
||||
Magic uint32
|
||||
Type OutputMessageType
|
||||
Length uint32
|
||||
Timestamp int64
|
||||
Data []byte
|
||||
}
|
||||
|
||||
// OutputOptimizedMessage represents a pre-allocated message for zero-allocation operations
|
||||
type OutputOptimizedMessage struct {
|
||||
header [outputHeaderSize]byte // Pre-allocated header buffer
|
||||
data []byte // Reusable data buffer
|
||||
}
|
||||
|
||||
// OutputMessagePool manages pre-allocated messages for zero-allocation IPC
|
||||
type OutputMessagePool struct {
|
||||
pool chan *OutputOptimizedMessage
|
||||
}
|
||||
|
||||
// NewOutputMessagePool creates a new message pool
|
||||
func NewOutputMessagePool(size int) *OutputMessagePool {
|
||||
pool := &OutputMessagePool{
|
||||
pool: make(chan *OutputOptimizedMessage, size),
|
||||
}
|
||||
|
||||
// Pre-allocate messages
|
||||
for i := 0; i < size; i++ {
|
||||
msg := &OutputOptimizedMessage{
|
||||
data: make([]byte, outputMaxFrameSize),
|
||||
}
|
||||
pool.pool <- msg
|
||||
}
|
||||
|
||||
return pool
|
||||
}
|
||||
|
||||
// Get retrieves a message from the pool
|
||||
func (p *OutputMessagePool) Get() *OutputOptimizedMessage {
|
||||
select {
|
||||
case msg := <-p.pool:
|
||||
return msg
|
||||
default:
|
||||
// Pool exhausted, create new message
|
||||
return &OutputOptimizedMessage{
|
||||
data: make([]byte, outputMaxFrameSize),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Put returns a message to the pool
|
||||
func (p *OutputMessagePool) Put(msg *OutputOptimizedMessage) {
|
||||
select {
|
||||
case p.pool <- msg:
|
||||
// Successfully returned to pool
|
||||
default:
|
||||
// Pool full, let GC handle it
|
||||
}
|
||||
}
|
||||
|
||||
// Global message pool for output IPC
|
||||
var globalOutputMessagePool = NewOutputMessagePool(outputMessagePoolSize)
|
||||
|
||||
type AudioServer struct {
|
||||
// Atomic fields must be first for proper alignment on ARM
|
||||
bufferSize int64 // Current buffer size (atomic)
|
||||
processingTime int64 // Average processing time in nanoseconds (atomic)
|
||||
droppedFrames int64 // Dropped frames counter (atomic)
|
||||
totalFrames int64 // Total frames counter (atomic)
|
||||
|
||||
listener net.Listener
|
||||
conn net.Conn
|
||||
mtx sync.Mutex
|
||||
running bool
|
||||
|
||||
// Advanced message handling
|
||||
messageChan chan *OutputIPCMessage // Buffered channel for incoming messages
|
||||
stopChan chan struct{} // Stop signal
|
||||
wg sync.WaitGroup // Wait group for goroutine coordination
|
||||
|
||||
// Latency monitoring
|
||||
latencyMonitor *LatencyMonitor
|
||||
adaptiveOptimizer *AdaptiveOptimizer
|
||||
}
|
||||
|
||||
func NewAudioServer() (*AudioServer, error) {
|
||||
socketPath := filepath.Join("/var/run", socketName)
|
||||
socketPath := getOutputSocketPath()
|
||||
// Remove existing socket if any
|
||||
os.Remove(socketPath)
|
||||
|
||||
|
@ -32,26 +131,175 @@ func NewAudioServer() (*AudioServer, error) {
|
|||
return nil, fmt.Errorf("failed to create unix socket: %w", err)
|
||||
}
|
||||
|
||||
return &AudioServer{listener: listener}, nil
|
||||
// Initialize with adaptive buffer size (start with 500 frames)
|
||||
initialBufferSize := int64(500)
|
||||
|
||||
// Initialize latency monitoring
|
||||
latencyConfig := DefaultLatencyConfig()
|
||||
logger := zerolog.New(os.Stderr).With().Timestamp().Str("component", "audio-server").Logger()
|
||||
latencyMonitor := NewLatencyMonitor(latencyConfig, logger)
|
||||
|
||||
// Initialize adaptive buffer manager with default config
|
||||
bufferConfig := DefaultAdaptiveBufferConfig()
|
||||
bufferManager := NewAdaptiveBufferManager(bufferConfig)
|
||||
|
||||
// Initialize adaptive optimizer
|
||||
optimizerConfig := DefaultOptimizerConfig()
|
||||
adaptiveOptimizer := NewAdaptiveOptimizer(latencyMonitor, bufferManager, optimizerConfig, logger)
|
||||
|
||||
return &AudioServer{
|
||||
listener: listener,
|
||||
messageChan: make(chan *OutputIPCMessage, initialBufferSize),
|
||||
stopChan: make(chan struct{}),
|
||||
bufferSize: initialBufferSize,
|
||||
latencyMonitor: latencyMonitor,
|
||||
adaptiveOptimizer: adaptiveOptimizer,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *AudioServer) Start() error {
|
||||
conn, err := s.listener.Accept()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to accept connection: %w", err)
|
||||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
|
||||
if s.running {
|
||||
return fmt.Errorf("server already running")
|
||||
}
|
||||
s.conn = conn
|
||||
|
||||
s.running = true
|
||||
|
||||
// Start latency monitoring and adaptive optimization
|
||||
if s.latencyMonitor != nil {
|
||||
s.latencyMonitor.Start()
|
||||
}
|
||||
if s.adaptiveOptimizer != nil {
|
||||
s.adaptiveOptimizer.Start()
|
||||
}
|
||||
|
||||
// Start message processor goroutine
|
||||
s.startProcessorGoroutine()
|
||||
|
||||
// Accept connections in a goroutine
|
||||
go s.acceptConnections()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *AudioServer) Close() error {
|
||||
// acceptConnections accepts incoming connections
|
||||
func (s *AudioServer) acceptConnections() {
|
||||
for s.running {
|
||||
conn, err := s.listener.Accept()
|
||||
if err != nil {
|
||||
if s.running {
|
||||
// Only log error if we're still supposed to be running
|
||||
continue
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
s.mtx.Lock()
|
||||
// Close existing connection if any
|
||||
if s.conn != nil {
|
||||
s.conn.Close()
|
||||
}
|
||||
s.conn = conn
|
||||
s.mtx.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// startProcessorGoroutine starts the message processor
|
||||
func (s *AudioServer) startProcessorGoroutine() {
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
for {
|
||||
select {
|
||||
case msg := <-s.messageChan:
|
||||
// Process message (currently just frame sending)
|
||||
if msg.Type == OutputMessageTypeOpusFrame {
|
||||
s.sendFrameToClient(msg.Data)
|
||||
}
|
||||
case <-s.stopChan:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *AudioServer) Stop() {
|
||||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
|
||||
if !s.running {
|
||||
return
|
||||
}
|
||||
|
||||
s.running = false
|
||||
|
||||
// Stop latency monitoring and adaptive optimization
|
||||
if s.adaptiveOptimizer != nil {
|
||||
s.adaptiveOptimizer.Stop()
|
||||
}
|
||||
if s.latencyMonitor != nil {
|
||||
s.latencyMonitor.Stop()
|
||||
}
|
||||
|
||||
// Signal processor to stop
|
||||
close(s.stopChan)
|
||||
s.wg.Wait()
|
||||
|
||||
if s.conn != nil {
|
||||
s.conn.Close()
|
||||
s.conn = nil
|
||||
}
|
||||
return s.listener.Close()
|
||||
}
|
||||
|
||||
func (s *AudioServer) Close() error {
|
||||
s.Stop()
|
||||
if s.listener != nil {
|
||||
s.listener.Close()
|
||||
}
|
||||
// Remove socket file
|
||||
os.Remove(getOutputSocketPath())
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *AudioServer) SendFrame(frame []byte) error {
|
||||
if len(frame) > outputMaxFrameSize {
|
||||
return fmt.Errorf("frame size %d exceeds maximum %d", len(frame), outputMaxFrameSize)
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
|
||||
// Create IPC message
|
||||
msg := &OutputIPCMessage{
|
||||
Magic: outputMagicNumber,
|
||||
Type: OutputMessageTypeOpusFrame,
|
||||
Length: uint32(len(frame)),
|
||||
Timestamp: start.UnixNano(),
|
||||
Data: frame,
|
||||
}
|
||||
|
||||
// Try to send via message channel (non-blocking)
|
||||
select {
|
||||
case s.messageChan <- msg:
|
||||
atomic.AddInt64(&s.totalFrames, 1)
|
||||
|
||||
// Record latency for monitoring
|
||||
if s.latencyMonitor != nil {
|
||||
processingTime := time.Since(start)
|
||||
s.latencyMonitor.RecordLatency(processingTime, "ipc_send")
|
||||
}
|
||||
|
||||
return nil
|
||||
default:
|
||||
// Channel full, drop frame to prevent blocking
|
||||
atomic.AddInt64(&s.droppedFrames, 1)
|
||||
return fmt.Errorf("message channel full - frame dropped")
|
||||
}
|
||||
}
|
||||
|
||||
// sendFrameToClient sends frame data directly to the connected client
|
||||
func (s *AudioServer) sendFrameToClient(frame []byte) error {
|
||||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
|
||||
|
@ -59,70 +307,199 @@ func (s *AudioServer) SendFrame(frame []byte) error {
|
|||
return fmt.Errorf("no client connected")
|
||||
}
|
||||
|
||||
// Write magic number
|
||||
if err := binary.Write(s.conn, binary.BigEndian, magicNumber); err != nil {
|
||||
return fmt.Errorf("failed to write magic number: %w", err)
|
||||
}
|
||||
start := time.Now()
|
||||
|
||||
// Write frame size
|
||||
if err := binary.Write(s.conn, binary.BigEndian, uint32(len(frame))); err != nil {
|
||||
return fmt.Errorf("failed to write frame size: %w", err)
|
||||
}
|
||||
// Get optimized message from pool
|
||||
optMsg := globalOutputMessagePool.Get()
|
||||
defer globalOutputMessagePool.Put(optMsg)
|
||||
|
||||
// Write frame data
|
||||
if _, err := s.conn.Write(frame); err != nil {
|
||||
return fmt.Errorf("failed to write frame data: %w", err)
|
||||
}
|
||||
// Prepare header in pre-allocated buffer
|
||||
binary.LittleEndian.PutUint32(optMsg.header[0:4], outputMagicNumber)
|
||||
optMsg.header[4] = byte(OutputMessageTypeOpusFrame)
|
||||
binary.LittleEndian.PutUint32(optMsg.header[5:9], uint32(len(frame)))
|
||||
binary.LittleEndian.PutUint64(optMsg.header[9:17], uint64(start.UnixNano()))
|
||||
|
||||
return nil
|
||||
// Use non-blocking write with timeout
|
||||
ctx, cancel := context.WithTimeout(context.Background(), outputWriteTimeout)
|
||||
defer cancel()
|
||||
|
||||
// Create a channel to signal write completion
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
// Write header using pre-allocated buffer
|
||||
_, err := s.conn.Write(optMsg.header[:])
|
||||
if err != nil {
|
||||
done <- err
|
||||
return
|
||||
}
|
||||
|
||||
// Write frame data
|
||||
if len(frame) > 0 {
|
||||
_, err = s.conn.Write(frame)
|
||||
if err != nil {
|
||||
done <- err
|
||||
return
|
||||
}
|
||||
}
|
||||
done <- nil
|
||||
}()
|
||||
|
||||
// Wait for completion or timeout
|
||||
select {
|
||||
case err := <-done:
|
||||
if err != nil {
|
||||
atomic.AddInt64(&s.droppedFrames, 1)
|
||||
return err
|
||||
}
|
||||
// Record latency for monitoring
|
||||
if s.latencyMonitor != nil {
|
||||
writeLatency := time.Since(start)
|
||||
s.latencyMonitor.RecordLatency(writeLatency, "ipc_write")
|
||||
}
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
// Timeout occurred - drop frame to prevent blocking
|
||||
atomic.AddInt64(&s.droppedFrames, 1)
|
||||
return fmt.Errorf("write timeout - frame dropped")
|
||||
}
|
||||
}
|
||||
|
||||
// GetServerStats returns server performance statistics
|
||||
func (s *AudioServer) GetServerStats() (total, dropped int64, bufferSize int64) {
|
||||
return atomic.LoadInt64(&s.totalFrames),
|
||||
atomic.LoadInt64(&s.droppedFrames),
|
||||
atomic.LoadInt64(&s.bufferSize)
|
||||
}
|
||||
|
||||
type AudioClient struct {
|
||||
conn net.Conn
|
||||
mtx sync.Mutex
|
||||
// Atomic fields must be first for proper alignment on ARM
|
||||
droppedFrames int64 // Atomic counter for dropped frames
|
||||
totalFrames int64 // Atomic counter for total frames
|
||||
|
||||
conn net.Conn
|
||||
mtx sync.Mutex
|
||||
running bool
|
||||
}
|
||||
|
||||
func NewAudioClient() (*AudioClient, error) {
|
||||
socketPath := filepath.Join("/var/run", socketName)
|
||||
func NewAudioClient() *AudioClient {
|
||||
return &AudioClient{}
|
||||
}
|
||||
|
||||
// Connect connects to the audio output server
|
||||
func (c *AudioClient) Connect() error {
|
||||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
|
||||
if c.running {
|
||||
return nil // Already connected
|
||||
}
|
||||
|
||||
socketPath := getOutputSocketPath()
|
||||
// Try connecting multiple times as the server might not be ready
|
||||
for i := 0; i < 5; i++ {
|
||||
// Reduced retry count and delay for faster startup
|
||||
for i := 0; i < 8; i++ {
|
||||
conn, err := net.Dial("unix", socketPath)
|
||||
if err == nil {
|
||||
return &AudioClient{conn: conn}, nil
|
||||
c.conn = conn
|
||||
c.running = true
|
||||
return nil
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
// Exponential backoff starting at 50ms
|
||||
delay := time.Duration(50*(1<<uint(i/3))) * time.Millisecond
|
||||
if delay > 400*time.Millisecond {
|
||||
delay = 400 * time.Millisecond
|
||||
}
|
||||
time.Sleep(delay)
|
||||
}
|
||||
return nil, fmt.Errorf("failed to connect to audio server")
|
||||
|
||||
return fmt.Errorf("failed to connect to audio output server")
|
||||
}
|
||||
|
||||
// Disconnect disconnects from the audio output server
|
||||
func (c *AudioClient) Disconnect() {
|
||||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
|
||||
if !c.running {
|
||||
return
|
||||
}
|
||||
|
||||
c.running = false
|
||||
if c.conn != nil {
|
||||
c.conn.Close()
|
||||
c.conn = nil
|
||||
}
|
||||
}
|
||||
|
||||
// IsConnected returns whether the client is connected
|
||||
func (c *AudioClient) IsConnected() bool {
|
||||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
return c.running && c.conn != nil
|
||||
}
|
||||
|
||||
func (c *AudioClient) Close() error {
|
||||
return c.conn.Close()
|
||||
c.Disconnect()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *AudioClient) ReceiveFrame() ([]byte, error) {
|
||||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
|
||||
// Read magic number
|
||||
var magic uint32
|
||||
if err := binary.Read(c.conn, binary.BigEndian, &magic); err != nil {
|
||||
return nil, fmt.Errorf("failed to read magic number: %w", err)
|
||||
if !c.running || c.conn == nil {
|
||||
return nil, fmt.Errorf("not connected")
|
||||
}
|
||||
if magic != magicNumber {
|
||||
|
||||
// Get optimized message from pool for header reading
|
||||
optMsg := globalOutputMessagePool.Get()
|
||||
defer globalOutputMessagePool.Put(optMsg)
|
||||
|
||||
// Read header
|
||||
if _, err := io.ReadFull(c.conn, optMsg.header[:]); err != nil {
|
||||
return nil, fmt.Errorf("failed to read header: %w", err)
|
||||
}
|
||||
|
||||
// Parse header
|
||||
magic := binary.LittleEndian.Uint32(optMsg.header[0:4])
|
||||
if magic != outputMagicNumber {
|
||||
return nil, fmt.Errorf("invalid magic number: %x", magic)
|
||||
}
|
||||
|
||||
// Read frame size
|
||||
var size uint32
|
||||
if err := binary.Read(c.conn, binary.BigEndian, &size); err != nil {
|
||||
return nil, fmt.Errorf("failed to read frame size: %w", err)
|
||||
msgType := OutputMessageType(optMsg.header[4])
|
||||
if msgType != OutputMessageTypeOpusFrame {
|
||||
return nil, fmt.Errorf("unexpected message type: %d", msgType)
|
||||
}
|
||||
|
||||
size := binary.LittleEndian.Uint32(optMsg.header[5:9])
|
||||
if size > outputMaxFrameSize {
|
||||
return nil, fmt.Errorf("frame size %d exceeds maximum %d", size, outputMaxFrameSize)
|
||||
}
|
||||
|
||||
// Read frame data
|
||||
frame := make([]byte, size)
|
||||
if _, err := io.ReadFull(c.conn, frame); err != nil {
|
||||
return nil, fmt.Errorf("failed to read frame data: %w", err)
|
||||
if size > 0 {
|
||||
if _, err := io.ReadFull(c.conn, frame); err != nil {
|
||||
return nil, fmt.Errorf("failed to read frame data: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
atomic.AddInt64(&c.totalFrames, 1)
|
||||
return frame, nil
|
||||
}
|
||||
|
||||
// GetClientStats returns client performance statistics
|
||||
func (c *AudioClient) GetClientStats() (total, dropped int64) {
|
||||
return atomic.LoadInt64(&c.totalFrames),
|
||||
atomic.LoadInt64(&c.droppedFrames)
|
||||
}
|
||||
|
||||
// Helper functions
|
||||
|
||||
// getOutputSocketPath returns the path to the output socket
|
||||
func getOutputSocketPath() string {
|
||||
if path := os.Getenv("JETKVM_AUDIO_OUTPUT_SOCKET"); path != "" {
|
||||
return path
|
||||
}
|
||||
return filepath.Join("/var/run", outputSocketName)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,312 @@
|
|||
package audio
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
// LatencyMonitor tracks and optimizes audio latency in real-time
|
||||
type LatencyMonitor struct {
|
||||
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
|
||||
currentLatency int64 // Current latency in nanoseconds (atomic)
|
||||
averageLatency int64 // Rolling average latency in nanoseconds (atomic)
|
||||
minLatency int64 // Minimum observed latency in nanoseconds (atomic)
|
||||
maxLatency int64 // Maximum observed latency in nanoseconds (atomic)
|
||||
latencySamples int64 // Number of latency samples collected (atomic)
|
||||
jitterAccumulator int64 // Accumulated jitter for variance calculation (atomic)
|
||||
lastOptimization int64 // Timestamp of last optimization in nanoseconds (atomic)
|
||||
|
||||
config LatencyConfig
|
||||
logger zerolog.Logger
|
||||
|
||||
// Control channels
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
|
||||
// Optimization callbacks
|
||||
optimizationCallbacks []OptimizationCallback
|
||||
mutex sync.RWMutex
|
||||
|
||||
// Performance tracking
|
||||
latencyHistory []LatencyMeasurement
|
||||
historyMutex sync.RWMutex
|
||||
}
|
||||
|
||||
// LatencyConfig holds configuration for latency monitoring
|
||||
type LatencyConfig struct {
|
||||
TargetLatency time.Duration // Target latency to maintain
|
||||
MaxLatency time.Duration // Maximum acceptable latency
|
||||
OptimizationInterval time.Duration // How often to run optimization
|
||||
HistorySize int // Number of latency measurements to keep
|
||||
JitterThreshold time.Duration // Jitter threshold for optimization
|
||||
AdaptiveThreshold float64 // Threshold for adaptive adjustments (0.0-1.0)
|
||||
}
|
||||
|
||||
// LatencyMeasurement represents a single latency measurement
|
||||
type LatencyMeasurement struct {
|
||||
Timestamp time.Time
|
||||
Latency time.Duration
|
||||
Jitter time.Duration
|
||||
Source string // Source of the measurement (e.g., "input", "output", "processing")
|
||||
}
|
||||
|
||||
// OptimizationCallback is called when latency optimization is triggered
|
||||
type OptimizationCallback func(metrics LatencyMetrics) error
|
||||
|
||||
// LatencyMetrics provides comprehensive latency statistics
|
||||
type LatencyMetrics struct {
|
||||
Current time.Duration
|
||||
Average time.Duration
|
||||
Min time.Duration
|
||||
Max time.Duration
|
||||
Jitter time.Duration
|
||||
SampleCount int64
|
||||
Trend LatencyTrend
|
||||
}
|
||||
|
||||
// LatencyTrend indicates the direction of latency changes
|
||||
type LatencyTrend int
|
||||
|
||||
const (
|
||||
LatencyTrendStable LatencyTrend = iota
|
||||
LatencyTrendIncreasing
|
||||
LatencyTrendDecreasing
|
||||
LatencyTrendVolatile
|
||||
)
|
||||
|
||||
// DefaultLatencyConfig returns a sensible default configuration
|
||||
func DefaultLatencyConfig() LatencyConfig {
|
||||
return LatencyConfig{
|
||||
TargetLatency: 50 * time.Millisecond,
|
||||
MaxLatency: 200 * time.Millisecond,
|
||||
OptimizationInterval: 5 * time.Second,
|
||||
HistorySize: 100,
|
||||
JitterThreshold: 20 * time.Millisecond,
|
||||
AdaptiveThreshold: 0.8, // Trigger optimization when 80% above target
|
||||
}
|
||||
}
|
||||
|
||||
// NewLatencyMonitor creates a new latency monitoring system
|
||||
func NewLatencyMonitor(config LatencyConfig, logger zerolog.Logger) *LatencyMonitor {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
return &LatencyMonitor{
|
||||
config: config,
|
||||
logger: logger.With().Str("component", "latency-monitor").Logger(),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
latencyHistory: make([]LatencyMeasurement, 0, config.HistorySize),
|
||||
minLatency: int64(time.Hour), // Initialize to high value
|
||||
}
|
||||
}
|
||||
|
||||
// Start begins latency monitoring and optimization
|
||||
func (lm *LatencyMonitor) Start() {
|
||||
lm.wg.Add(1)
|
||||
go lm.monitoringLoop()
|
||||
lm.logger.Info().Msg("Latency monitor started")
|
||||
}
|
||||
|
||||
// Stop stops the latency monitor
|
||||
func (lm *LatencyMonitor) Stop() {
|
||||
lm.cancel()
|
||||
lm.wg.Wait()
|
||||
lm.logger.Info().Msg("Latency monitor stopped")
|
||||
}
|
||||
|
||||
// RecordLatency records a new latency measurement
|
||||
func (lm *LatencyMonitor) RecordLatency(latency time.Duration, source string) {
|
||||
now := time.Now()
|
||||
latencyNanos := latency.Nanoseconds()
|
||||
|
||||
// Update atomic counters
|
||||
atomic.StoreInt64(&lm.currentLatency, latencyNanos)
|
||||
atomic.AddInt64(&lm.latencySamples, 1)
|
||||
|
||||
// Update min/max
|
||||
for {
|
||||
oldMin := atomic.LoadInt64(&lm.minLatency)
|
||||
if latencyNanos >= oldMin || atomic.CompareAndSwapInt64(&lm.minLatency, oldMin, latencyNanos) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
oldMax := atomic.LoadInt64(&lm.maxLatency)
|
||||
if latencyNanos <= oldMax || atomic.CompareAndSwapInt64(&lm.maxLatency, oldMax, latencyNanos) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Update rolling average using exponential moving average
|
||||
oldAvg := atomic.LoadInt64(&lm.averageLatency)
|
||||
newAvg := oldAvg + (latencyNanos-oldAvg)/10 // Alpha = 0.1
|
||||
atomic.StoreInt64(&lm.averageLatency, newAvg)
|
||||
|
||||
// Calculate jitter (difference from average)
|
||||
jitter := latencyNanos - newAvg
|
||||
if jitter < 0 {
|
||||
jitter = -jitter
|
||||
}
|
||||
atomic.AddInt64(&lm.jitterAccumulator, jitter)
|
||||
|
||||
// Store in history
|
||||
lm.historyMutex.Lock()
|
||||
measurement := LatencyMeasurement{
|
||||
Timestamp: now,
|
||||
Latency: latency,
|
||||
Jitter: time.Duration(jitter),
|
||||
Source: source,
|
||||
}
|
||||
|
||||
if len(lm.latencyHistory) >= lm.config.HistorySize {
|
||||
// Remove oldest measurement
|
||||
copy(lm.latencyHistory, lm.latencyHistory[1:])
|
||||
lm.latencyHistory[len(lm.latencyHistory)-1] = measurement
|
||||
} else {
|
||||
lm.latencyHistory = append(lm.latencyHistory, measurement)
|
||||
}
|
||||
lm.historyMutex.Unlock()
|
||||
}
|
||||
|
||||
// GetMetrics returns current latency metrics
|
||||
func (lm *LatencyMonitor) GetMetrics() LatencyMetrics {
|
||||
current := atomic.LoadInt64(&lm.currentLatency)
|
||||
average := atomic.LoadInt64(&lm.averageLatency)
|
||||
min := atomic.LoadInt64(&lm.minLatency)
|
||||
max := atomic.LoadInt64(&lm.maxLatency)
|
||||
samples := atomic.LoadInt64(&lm.latencySamples)
|
||||
jitterSum := atomic.LoadInt64(&lm.jitterAccumulator)
|
||||
|
||||
var jitter time.Duration
|
||||
if samples > 0 {
|
||||
jitter = time.Duration(jitterSum / samples)
|
||||
}
|
||||
|
||||
return LatencyMetrics{
|
||||
Current: time.Duration(current),
|
||||
Average: time.Duration(average),
|
||||
Min: time.Duration(min),
|
||||
Max: time.Duration(max),
|
||||
Jitter: jitter,
|
||||
SampleCount: samples,
|
||||
Trend: lm.calculateTrend(),
|
||||
}
|
||||
}
|
||||
|
||||
// AddOptimizationCallback adds a callback for latency optimization
|
||||
func (lm *LatencyMonitor) AddOptimizationCallback(callback OptimizationCallback) {
|
||||
lm.mutex.Lock()
|
||||
lm.optimizationCallbacks = append(lm.optimizationCallbacks, callback)
|
||||
lm.mutex.Unlock()
|
||||
}
|
||||
|
||||
// monitoringLoop runs the main monitoring and optimization loop
|
||||
func (lm *LatencyMonitor) monitoringLoop() {
|
||||
defer lm.wg.Done()
|
||||
|
||||
ticker := time.NewTicker(lm.config.OptimizationInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-lm.ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
lm.runOptimization()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// runOptimization checks if optimization is needed and triggers callbacks
|
||||
func (lm *LatencyMonitor) runOptimization() {
|
||||
metrics := lm.GetMetrics()
|
||||
|
||||
// Check if optimization is needed
|
||||
needsOptimization := false
|
||||
|
||||
// Check if current latency exceeds threshold
|
||||
if metrics.Current > lm.config.MaxLatency {
|
||||
needsOptimization = true
|
||||
lm.logger.Warn().Dur("current_latency", metrics.Current).Dur("max_latency", lm.config.MaxLatency).Msg("Latency exceeds maximum threshold")
|
||||
}
|
||||
|
||||
// Check if average latency is above adaptive threshold
|
||||
adaptiveThreshold := time.Duration(float64(lm.config.TargetLatency.Nanoseconds()) * (1.0 + lm.config.AdaptiveThreshold))
|
||||
if metrics.Average > adaptiveThreshold {
|
||||
needsOptimization = true
|
||||
lm.logger.Info().Dur("average_latency", metrics.Average).Dur("threshold", adaptiveThreshold).Msg("Average latency above adaptive threshold")
|
||||
}
|
||||
|
||||
// Check if jitter is too high
|
||||
if metrics.Jitter > lm.config.JitterThreshold {
|
||||
needsOptimization = true
|
||||
lm.logger.Info().Dur("jitter", metrics.Jitter).Dur("threshold", lm.config.JitterThreshold).Msg("Jitter above threshold")
|
||||
}
|
||||
|
||||
if needsOptimization {
|
||||
atomic.StoreInt64(&lm.lastOptimization, time.Now().UnixNano())
|
||||
|
||||
// Run optimization callbacks
|
||||
lm.mutex.RLock()
|
||||
callbacks := make([]OptimizationCallback, len(lm.optimizationCallbacks))
|
||||
copy(callbacks, lm.optimizationCallbacks)
|
||||
lm.mutex.RUnlock()
|
||||
|
||||
for _, callback := range callbacks {
|
||||
if err := callback(metrics); err != nil {
|
||||
lm.logger.Error().Err(err).Msg("Optimization callback failed")
|
||||
}
|
||||
}
|
||||
|
||||
lm.logger.Info().Interface("metrics", metrics).Msg("Latency optimization triggered")
|
||||
}
|
||||
}
|
||||
|
||||
// calculateTrend analyzes recent latency measurements to determine trend
|
||||
func (lm *LatencyMonitor) calculateTrend() LatencyTrend {
|
||||
lm.historyMutex.RLock()
|
||||
defer lm.historyMutex.RUnlock()
|
||||
|
||||
if len(lm.latencyHistory) < 10 {
|
||||
return LatencyTrendStable
|
||||
}
|
||||
|
||||
// Analyze last 10 measurements
|
||||
recentMeasurements := lm.latencyHistory[len(lm.latencyHistory)-10:]
|
||||
|
||||
var increasing, decreasing int
|
||||
for i := 1; i < len(recentMeasurements); i++ {
|
||||
if recentMeasurements[i].Latency > recentMeasurements[i-1].Latency {
|
||||
increasing++
|
||||
} else if recentMeasurements[i].Latency < recentMeasurements[i-1].Latency {
|
||||
decreasing++
|
||||
}
|
||||
}
|
||||
|
||||
// Determine trend based on direction changes
|
||||
if increasing > 6 {
|
||||
return LatencyTrendIncreasing
|
||||
} else if decreasing > 6 {
|
||||
return LatencyTrendDecreasing
|
||||
} else if increasing+decreasing > 7 {
|
||||
return LatencyTrendVolatile
|
||||
}
|
||||
|
||||
return LatencyTrendStable
|
||||
}
|
||||
|
||||
// GetLatencyHistory returns a copy of recent latency measurements
|
||||
func (lm *LatencyMonitor) GetLatencyHistory() []LatencyMeasurement {
|
||||
lm.historyMutex.RLock()
|
||||
defer lm.historyMutex.RUnlock()
|
||||
|
||||
history := make([]LatencyMeasurement, len(lm.latencyHistory))
|
||||
copy(history, lm.latencyHistory)
|
||||
return history
|
||||
}
|
|
@ -0,0 +1,198 @@
|
|||
package audio
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
"github.com/jetkvm/kvm/internal/logging"
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
// MemoryMetrics provides comprehensive memory allocation statistics
|
||||
type MemoryMetrics struct {
|
||||
// Runtime memory statistics
|
||||
RuntimeStats RuntimeMemoryStats `json:"runtime_stats"`
|
||||
// Audio buffer pool statistics
|
||||
BufferPools AudioBufferPoolStats `json:"buffer_pools"`
|
||||
// Zero-copy frame pool statistics
|
||||
ZeroCopyPool ZeroCopyFramePoolStats `json:"zero_copy_pool"`
|
||||
// Message pool statistics
|
||||
MessagePool MessagePoolStats `json:"message_pool"`
|
||||
// Batch processor statistics
|
||||
BatchProcessor BatchProcessorMemoryStats `json:"batch_processor,omitempty"`
|
||||
// Collection timestamp
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
}
|
||||
|
||||
// RuntimeMemoryStats provides Go runtime memory statistics
|
||||
type RuntimeMemoryStats struct {
|
||||
Alloc uint64 `json:"alloc"` // Bytes allocated and not yet freed
|
||||
TotalAlloc uint64 `json:"total_alloc"` // Total bytes allocated (cumulative)
|
||||
Sys uint64 `json:"sys"` // Total bytes obtained from OS
|
||||
Lookups uint64 `json:"lookups"` // Number of pointer lookups
|
||||
Mallocs uint64 `json:"mallocs"` // Number of mallocs
|
||||
Frees uint64 `json:"frees"` // Number of frees
|
||||
HeapAlloc uint64 `json:"heap_alloc"` // Bytes allocated and not yet freed (heap)
|
||||
HeapSys uint64 `json:"heap_sys"` // Bytes obtained from OS for heap
|
||||
HeapIdle uint64 `json:"heap_idle"` // Bytes in idle spans
|
||||
HeapInuse uint64 `json:"heap_inuse"` // Bytes in non-idle spans
|
||||
HeapReleased uint64 `json:"heap_released"` // Bytes released to OS
|
||||
HeapObjects uint64 `json:"heap_objects"` // Total number of allocated objects
|
||||
StackInuse uint64 `json:"stack_inuse"` // Bytes used by stack spans
|
||||
StackSys uint64 `json:"stack_sys"` // Bytes obtained from OS for stack
|
||||
MSpanInuse uint64 `json:"mspan_inuse"` // Bytes used by mspan structures
|
||||
MSpanSys uint64 `json:"mspan_sys"` // Bytes obtained from OS for mspan
|
||||
MCacheInuse uint64 `json:"mcache_inuse"` // Bytes used by mcache structures
|
||||
MCacheSys uint64 `json:"mcache_sys"` // Bytes obtained from OS for mcache
|
||||
BuckHashSys uint64 `json:"buck_hash_sys"` // Bytes used by profiling bucket hash table
|
||||
GCSys uint64 `json:"gc_sys"` // Bytes used for garbage collection metadata
|
||||
OtherSys uint64 `json:"other_sys"` // Bytes used for other system allocations
|
||||
NextGC uint64 `json:"next_gc"` // Target heap size for next GC
|
||||
LastGC uint64 `json:"last_gc"` // Time of last GC (nanoseconds since epoch)
|
||||
PauseTotalNs uint64 `json:"pause_total_ns"` // Total GC pause time
|
||||
NumGC uint32 `json:"num_gc"` // Number of completed GC cycles
|
||||
NumForcedGC uint32 `json:"num_forced_gc"` // Number of forced GC cycles
|
||||
GCCPUFraction float64 `json:"gc_cpu_fraction"` // Fraction of CPU time used by GC
|
||||
}
|
||||
|
||||
// BatchProcessorMemoryStats provides batch processor memory statistics
|
||||
type BatchProcessorMemoryStats struct {
|
||||
Initialized bool `json:"initialized"`
|
||||
Running bool `json:"running"`
|
||||
Stats BatchAudioStats `json:"stats"`
|
||||
BufferPool AudioBufferPoolDetailedStats `json:"buffer_pool,omitempty"`
|
||||
}
|
||||
|
||||
// GetBatchAudioProcessor is defined in batch_audio.go
|
||||
// BatchAudioStats is defined in batch_audio.go
|
||||
|
||||
var memoryMetricsLogger *zerolog.Logger
|
||||
|
||||
func getMemoryMetricsLogger() *zerolog.Logger {
|
||||
if memoryMetricsLogger == nil {
|
||||
logger := logging.GetDefaultLogger().With().Str("component", "memory-metrics").Logger()
|
||||
memoryMetricsLogger = &logger
|
||||
}
|
||||
return memoryMetricsLogger
|
||||
}
|
||||
|
||||
// CollectMemoryMetrics gathers comprehensive memory allocation statistics
|
||||
func CollectMemoryMetrics() MemoryMetrics {
|
||||
// Collect runtime memory statistics
|
||||
var m runtime.MemStats
|
||||
runtime.ReadMemStats(&m)
|
||||
|
||||
runtimeStats := RuntimeMemoryStats{
|
||||
Alloc: m.Alloc,
|
||||
TotalAlloc: m.TotalAlloc,
|
||||
Sys: m.Sys,
|
||||
Lookups: m.Lookups,
|
||||
Mallocs: m.Mallocs,
|
||||
Frees: m.Frees,
|
||||
HeapAlloc: m.HeapAlloc,
|
||||
HeapSys: m.HeapSys,
|
||||
HeapIdle: m.HeapIdle,
|
||||
HeapInuse: m.HeapInuse,
|
||||
HeapReleased: m.HeapReleased,
|
||||
HeapObjects: m.HeapObjects,
|
||||
StackInuse: m.StackInuse,
|
||||
StackSys: m.StackSys,
|
||||
MSpanInuse: m.MSpanInuse,
|
||||
MSpanSys: m.MSpanSys,
|
||||
MCacheInuse: m.MCacheInuse,
|
||||
MCacheSys: m.MCacheSys,
|
||||
BuckHashSys: m.BuckHashSys,
|
||||
GCSys: m.GCSys,
|
||||
OtherSys: m.OtherSys,
|
||||
NextGC: m.NextGC,
|
||||
LastGC: m.LastGC,
|
||||
PauseTotalNs: m.PauseTotalNs,
|
||||
NumGC: m.NumGC,
|
||||
NumForcedGC: m.NumForcedGC,
|
||||
GCCPUFraction: m.GCCPUFraction,
|
||||
}
|
||||
|
||||
// Collect audio buffer pool statistics
|
||||
bufferPoolStats := GetAudioBufferPoolStats()
|
||||
|
||||
// Collect zero-copy frame pool statistics
|
||||
zeroCopyStats := GetGlobalZeroCopyPoolStats()
|
||||
|
||||
// Collect message pool statistics
|
||||
messagePoolStats := GetGlobalMessagePoolStats()
|
||||
|
||||
// Collect batch processor statistics if available
|
||||
var batchStats BatchProcessorMemoryStats
|
||||
if processor := GetBatchAudioProcessor(); processor != nil {
|
||||
batchStats.Initialized = true
|
||||
batchStats.Running = processor.IsRunning()
|
||||
batchStats.Stats = processor.GetStats()
|
||||
// Note: BatchAudioProcessor uses sync.Pool, detailed stats not available
|
||||
}
|
||||
|
||||
return MemoryMetrics{
|
||||
RuntimeStats: runtimeStats,
|
||||
BufferPools: bufferPoolStats,
|
||||
ZeroCopyPool: zeroCopyStats,
|
||||
MessagePool: messagePoolStats,
|
||||
BatchProcessor: batchStats,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
// HandleMemoryMetrics provides an HTTP handler for memory metrics
|
||||
func HandleMemoryMetrics(w http.ResponseWriter, r *http.Request) {
|
||||
logger := getMemoryMetricsLogger()
|
||||
|
||||
if r.Method != http.MethodGet {
|
||||
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
metrics := CollectMemoryMetrics()
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Header().Set("Cache-Control", "no-cache")
|
||||
|
||||
if err := json.NewEncoder(w).Encode(metrics); err != nil {
|
||||
logger.Error().Err(err).Msg("failed to encode memory metrics")
|
||||
http.Error(w, "Internal server error", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
logger.Debug().Msg("memory metrics served")
|
||||
}
|
||||
|
||||
// LogMemoryMetrics logs current memory metrics for debugging
|
||||
func LogMemoryMetrics() {
|
||||
logger := getMemoryMetricsLogger()
|
||||
metrics := CollectMemoryMetrics()
|
||||
|
||||
logger.Info().
|
||||
Uint64("heap_alloc_mb", metrics.RuntimeStats.HeapAlloc/1024/1024).
|
||||
Uint64("heap_sys_mb", metrics.RuntimeStats.HeapSys/1024/1024).
|
||||
Uint64("heap_objects", metrics.RuntimeStats.HeapObjects).
|
||||
Uint32("num_gc", metrics.RuntimeStats.NumGC).
|
||||
Float64("gc_cpu_fraction", metrics.RuntimeStats.GCCPUFraction).
|
||||
Float64("buffer_pool_hit_rate", metrics.BufferPools.FramePoolHitRate).
|
||||
Float64("zero_copy_hit_rate", metrics.ZeroCopyPool.HitRate).
|
||||
Float64("message_pool_hit_rate", metrics.MessagePool.HitRate).
|
||||
Msg("memory metrics snapshot")
|
||||
}
|
||||
|
||||
// StartMemoryMetricsLogging starts periodic memory metrics logging
|
||||
func StartMemoryMetricsLogging(interval time.Duration) {
|
||||
logger := getMemoryMetricsLogger()
|
||||
logger.Info().Dur("interval", interval).Msg("starting memory metrics logging")
|
||||
|
||||
go func() {
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for range ticker.C {
|
||||
LogMemoryMetrics()
|
||||
}
|
||||
}()
|
||||
}
|
|
@ -10,6 +10,42 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
// Adaptive buffer metrics
|
||||
adaptiveInputBufferSize = promauto.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "jetkvm_adaptive_input_buffer_size_bytes",
|
||||
Help: "Current adaptive input buffer size in bytes",
|
||||
},
|
||||
)
|
||||
|
||||
adaptiveOutputBufferSize = promauto.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "jetkvm_adaptive_output_buffer_size_bytes",
|
||||
Help: "Current adaptive output buffer size in bytes",
|
||||
},
|
||||
)
|
||||
|
||||
adaptiveBufferAdjustmentsTotal = promauto.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Name: "jetkvm_adaptive_buffer_adjustments_total",
|
||||
Help: "Total number of adaptive buffer size adjustments",
|
||||
},
|
||||
)
|
||||
|
||||
adaptiveSystemCpuPercent = promauto.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "jetkvm_adaptive_system_cpu_percent",
|
||||
Help: "System CPU usage percentage used by adaptive buffer manager",
|
||||
},
|
||||
)
|
||||
|
||||
adaptiveSystemMemoryPercent = promauto.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "jetkvm_adaptive_system_memory_percent",
|
||||
Help: "System memory usage percentage used by adaptive buffer manager",
|
||||
},
|
||||
)
|
||||
|
||||
// Audio output metrics
|
||||
audioFramesReceivedTotal = promauto.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
|
@ -364,6 +400,23 @@ func UpdateMicrophoneConfigMetrics(config AudioConfig) {
|
|||
atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
|
||||
}
|
||||
|
||||
// UpdateAdaptiveBufferMetrics updates Prometheus metrics with adaptive buffer information
|
||||
func UpdateAdaptiveBufferMetrics(inputBufferSize, outputBufferSize int, cpuPercent, memoryPercent float64, adjustmentMade bool) {
|
||||
metricsUpdateMutex.Lock()
|
||||
defer metricsUpdateMutex.Unlock()
|
||||
|
||||
adaptiveInputBufferSize.Set(float64(inputBufferSize))
|
||||
adaptiveOutputBufferSize.Set(float64(outputBufferSize))
|
||||
adaptiveSystemCpuPercent.Set(cpuPercent)
|
||||
adaptiveSystemMemoryPercent.Set(memoryPercent)
|
||||
|
||||
if adjustmentMade {
|
||||
adaptiveBufferAdjustmentsTotal.Inc()
|
||||
}
|
||||
|
||||
atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
|
||||
}
|
||||
|
||||
// GetLastMetricsUpdate returns the timestamp of the last metrics update
|
||||
func GetLastMetricsUpdate() time.Time {
|
||||
timestamp := atomic.LoadInt64(&lastMetricsUpdate)
|
||||
|
|
|
@ -8,9 +8,11 @@ import (
|
|||
|
||||
// MicrophoneContentionManager manages microphone access with cooldown periods
|
||||
type MicrophoneContentionManager struct {
|
||||
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
|
||||
lastOpNano int64
|
||||
cooldownNanos int64
|
||||
operationID int64
|
||||
|
||||
lockPtr unsafe.Pointer
|
||||
}
|
||||
|
||||
|
|
|
@ -2,6 +2,9 @@ package audio
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
|
@ -9,6 +12,28 @@ import (
|
|||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
// OutputStreamer manages high-performance audio output streaming
|
||||
type OutputStreamer struct {
|
||||
// Atomic fields must be first for proper alignment on ARM
|
||||
processedFrames int64 // Total processed frames counter (atomic)
|
||||
droppedFrames int64 // Dropped frames counter (atomic)
|
||||
processingTime int64 // Average processing time in nanoseconds (atomic)
|
||||
lastStatsTime int64 // Last statistics update time (atomic)
|
||||
|
||||
client *AudioClient
|
||||
bufferPool *AudioBufferPool
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
running bool
|
||||
mtx sync.Mutex
|
||||
|
||||
// Performance optimization fields
|
||||
batchSize int // Adaptive batch size for frame processing
|
||||
processingChan chan []byte // Buffered channel for frame processing
|
||||
statsInterval time.Duration // Statistics reporting interval
|
||||
}
|
||||
|
||||
var (
|
||||
outputStreamingRunning int32
|
||||
outputStreamingCancel context.CancelFunc
|
||||
|
@ -23,6 +48,253 @@ func getOutputStreamingLogger() *zerolog.Logger {
|
|||
return outputStreamingLogger
|
||||
}
|
||||
|
||||
func NewOutputStreamer() (*OutputStreamer, error) {
|
||||
client := NewAudioClient()
|
||||
|
||||
// Get initial batch size from adaptive buffer manager
|
||||
adaptiveManager := GetAdaptiveBufferManager()
|
||||
initialBatchSize := adaptiveManager.GetOutputBufferSize()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return &OutputStreamer{
|
||||
client: client,
|
||||
bufferPool: NewAudioBufferPool(MaxAudioFrameSize), // Use existing buffer pool
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
batchSize: initialBatchSize, // Use adaptive batch size
|
||||
processingChan: make(chan []byte, 500), // Large buffer for smooth processing
|
||||
statsInterval: 5 * time.Second, // Statistics every 5 seconds
|
||||
lastStatsTime: time.Now().UnixNano(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *OutputStreamer) Start() error {
|
||||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
|
||||
if s.running {
|
||||
return fmt.Errorf("output streamer already running")
|
||||
}
|
||||
|
||||
// Connect to audio output server
|
||||
if err := s.client.Connect(); err != nil {
|
||||
return fmt.Errorf("failed to connect to audio output server: %w", err)
|
||||
}
|
||||
|
||||
s.running = true
|
||||
|
||||
// Start multiple goroutines for optimal performance
|
||||
s.wg.Add(3)
|
||||
go s.streamLoop() // Main streaming loop
|
||||
go s.processingLoop() // Frame processing loop
|
||||
go s.statisticsLoop() // Performance monitoring loop
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *OutputStreamer) Stop() {
|
||||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
|
||||
if !s.running {
|
||||
return
|
||||
}
|
||||
|
||||
s.running = false
|
||||
s.cancel()
|
||||
|
||||
// Close processing channel to signal goroutines
|
||||
close(s.processingChan)
|
||||
|
||||
// Wait for all goroutines to finish
|
||||
s.wg.Wait()
|
||||
|
||||
if s.client != nil {
|
||||
s.client.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *OutputStreamer) streamLoop() {
|
||||
defer s.wg.Done()
|
||||
|
||||
// Pin goroutine to OS thread for consistent performance
|
||||
runtime.LockOSThread()
|
||||
defer runtime.UnlockOSThread()
|
||||
|
||||
// Adaptive timing for frame reading
|
||||
frameInterval := time.Duration(20) * time.Millisecond // 50 FPS base rate
|
||||
ticker := time.NewTicker(frameInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
// Batch size update ticker
|
||||
batchUpdateTicker := time.NewTicker(500 * time.Millisecond)
|
||||
defer batchUpdateTicker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
return
|
||||
case <-batchUpdateTicker.C:
|
||||
// Update batch size from adaptive buffer manager
|
||||
s.UpdateBatchSize()
|
||||
case <-ticker.C:
|
||||
// Read audio data from CGO with timing measurement
|
||||
startTime := time.Now()
|
||||
frameBuf := s.bufferPool.Get()
|
||||
n, err := CGOAudioReadEncode(frameBuf)
|
||||
processingDuration := time.Since(startTime)
|
||||
|
||||
if err != nil {
|
||||
getOutputStreamingLogger().Warn().Err(err).Msg("Failed to read audio data")
|
||||
s.bufferPool.Put(frameBuf)
|
||||
atomic.AddInt64(&s.droppedFrames, 1)
|
||||
continue
|
||||
}
|
||||
|
||||
if n > 0 {
|
||||
// Send frame for processing (non-blocking)
|
||||
frameData := make([]byte, n)
|
||||
copy(frameData, frameBuf[:n])
|
||||
|
||||
select {
|
||||
case s.processingChan <- frameData:
|
||||
atomic.AddInt64(&s.processedFrames, 1)
|
||||
// Update processing time statistics
|
||||
atomic.StoreInt64(&s.processingTime, int64(processingDuration))
|
||||
// Report latency to adaptive buffer manager
|
||||
s.ReportLatency(processingDuration)
|
||||
default:
|
||||
// Processing channel full, drop frame
|
||||
atomic.AddInt64(&s.droppedFrames, 1)
|
||||
}
|
||||
}
|
||||
|
||||
s.bufferPool.Put(frameBuf)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// processingLoop handles frame processing in a separate goroutine
|
||||
func (s *OutputStreamer) processingLoop() {
|
||||
defer s.wg.Done()
|
||||
|
||||
// Pin goroutine to OS thread for consistent performance
|
||||
runtime.LockOSThread()
|
||||
defer runtime.UnlockOSThread()
|
||||
|
||||
// Set high priority for audio output processing
|
||||
if err := SetAudioThreadPriority(); err != nil {
|
||||
getOutputStreamingLogger().Warn().Err(err).Msg("Failed to set audio output processing priority")
|
||||
}
|
||||
defer func() {
|
||||
if err := ResetThreadPriority(); err != nil {
|
||||
getOutputStreamingLogger().Warn().Err(err).Msg("Failed to reset thread priority")
|
||||
}
|
||||
}()
|
||||
|
||||
for _ = range s.processingChan {
|
||||
// Process frame (currently just receiving, but can be extended)
|
||||
if _, err := s.client.ReceiveFrame(); err != nil {
|
||||
if s.client.IsConnected() {
|
||||
getOutputStreamingLogger().Warn().Err(err).Msg("Failed to receive frame")
|
||||
atomic.AddInt64(&s.droppedFrames, 1)
|
||||
}
|
||||
// Try to reconnect if disconnected
|
||||
if !s.client.IsConnected() {
|
||||
if err := s.client.Connect(); err != nil {
|
||||
getOutputStreamingLogger().Warn().Err(err).Msg("Failed to reconnect")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// statisticsLoop monitors and reports performance statistics
|
||||
func (s *OutputStreamer) statisticsLoop() {
|
||||
defer s.wg.Done()
|
||||
|
||||
ticker := time.NewTicker(s.statsInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
s.reportStatistics()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// reportStatistics logs current performance statistics
|
||||
func (s *OutputStreamer) reportStatistics() {
|
||||
processed := atomic.LoadInt64(&s.processedFrames)
|
||||
dropped := atomic.LoadInt64(&s.droppedFrames)
|
||||
processingTime := atomic.LoadInt64(&s.processingTime)
|
||||
|
||||
if processed > 0 {
|
||||
dropRate := float64(dropped) / float64(processed+dropped) * 100
|
||||
avgProcessingTime := time.Duration(processingTime)
|
||||
|
||||
getOutputStreamingLogger().Info().Int64("processed", processed).Int64("dropped", dropped).Float64("drop_rate", dropRate).Dur("avg_processing", avgProcessingTime).Msg("Output Audio Stats")
|
||||
|
||||
// Get client statistics
|
||||
clientTotal, clientDropped := s.client.GetClientStats()
|
||||
getOutputStreamingLogger().Info().Int64("total", clientTotal).Int64("dropped", clientDropped).Msg("Client Stats")
|
||||
}
|
||||
}
|
||||
|
||||
// GetStats returns streaming statistics
|
||||
func (s *OutputStreamer) GetStats() (processed, dropped int64, avgProcessingTime time.Duration) {
|
||||
processed = atomic.LoadInt64(&s.processedFrames)
|
||||
dropped = atomic.LoadInt64(&s.droppedFrames)
|
||||
processingTimeNs := atomic.LoadInt64(&s.processingTime)
|
||||
avgProcessingTime = time.Duration(processingTimeNs)
|
||||
return
|
||||
}
|
||||
|
||||
// GetDetailedStats returns comprehensive streaming statistics
|
||||
func (s *OutputStreamer) GetDetailedStats() map[string]interface{} {
|
||||
processed := atomic.LoadInt64(&s.processedFrames)
|
||||
dropped := atomic.LoadInt64(&s.droppedFrames)
|
||||
processingTime := atomic.LoadInt64(&s.processingTime)
|
||||
|
||||
stats := map[string]interface{}{
|
||||
"processed_frames": processed,
|
||||
"dropped_frames": dropped,
|
||||
"avg_processing_time_ns": processingTime,
|
||||
"batch_size": s.batchSize,
|
||||
"channel_buffer_size": cap(s.processingChan),
|
||||
"channel_current_size": len(s.processingChan),
|
||||
"connected": s.client.IsConnected(),
|
||||
}
|
||||
|
||||
if processed+dropped > 0 {
|
||||
stats["drop_rate_percent"] = float64(dropped) / float64(processed+dropped) * 100
|
||||
}
|
||||
|
||||
// Add client statistics
|
||||
clientTotal, clientDropped := s.client.GetClientStats()
|
||||
stats["client_total_frames"] = clientTotal
|
||||
stats["client_dropped_frames"] = clientDropped
|
||||
|
||||
return stats
|
||||
}
|
||||
|
||||
// UpdateBatchSize updates the batch size from adaptive buffer manager
|
||||
func (s *OutputStreamer) UpdateBatchSize() {
|
||||
s.mtx.Lock()
|
||||
adaptiveManager := GetAdaptiveBufferManager()
|
||||
s.batchSize = adaptiveManager.GetOutputBufferSize()
|
||||
s.mtx.Unlock()
|
||||
}
|
||||
|
||||
// ReportLatency reports processing latency to adaptive buffer manager
|
||||
func (s *OutputStreamer) ReportLatency(latency time.Duration) {
|
||||
adaptiveManager := GetAdaptiveBufferManager()
|
||||
adaptiveManager.UpdateLatency(latency)
|
||||
}
|
||||
|
||||
// StartAudioOutputStreaming starts audio output streaming (capturing system audio)
|
||||
func StartAudioOutputStreaming(send func([]byte)) error {
|
||||
if !atomic.CompareAndSwapInt32(&outputStreamingRunning, 0, 1) {
|
||||
|
@ -61,10 +333,13 @@ func StartAudioOutputStreaming(send func([]byte)) error {
|
|||
continue
|
||||
}
|
||||
if n > 0 {
|
||||
// Send frame to callback
|
||||
frame := make([]byte, n)
|
||||
// Get frame buffer from pool to reduce allocations
|
||||
frame := GetAudioFrameBuffer()
|
||||
frame = frame[:n] // Resize to actual frame size
|
||||
copy(frame, buffer[:n])
|
||||
send(frame)
|
||||
// Return buffer to pool after sending
|
||||
PutAudioFrameBuffer(frame)
|
||||
RecordFrameReceived(n)
|
||||
}
|
||||
// Small delay to prevent busy waiting
|
||||
|
|
|
@ -0,0 +1,165 @@
|
|||
//go:build linux
|
||||
|
||||
package audio
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"syscall"
|
||||
"unsafe"
|
||||
|
||||
"github.com/jetkvm/kvm/internal/logging"
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
// SchedParam represents scheduling parameters for Linux
|
||||
type SchedParam struct {
|
||||
Priority int32
|
||||
}
|
||||
|
||||
// Priority levels for audio processing
|
||||
const (
|
||||
// SCHED_FIFO priorities (1-99, higher = more priority)
|
||||
AudioHighPriority = 80 // High priority for critical audio processing
|
||||
AudioMediumPriority = 60 // Medium priority for regular audio processing
|
||||
AudioLowPriority = 40 // Low priority for background audio tasks
|
||||
|
||||
// SCHED_NORMAL is the default (priority 0)
|
||||
NormalPriority = 0
|
||||
)
|
||||
|
||||
// Scheduling policies
|
||||
const (
|
||||
SCHED_NORMAL = 0
|
||||
SCHED_FIFO = 1
|
||||
SCHED_RR = 2
|
||||
)
|
||||
|
||||
// PriorityScheduler manages thread priorities for audio processing
|
||||
type PriorityScheduler struct {
|
||||
logger zerolog.Logger
|
||||
enabled bool
|
||||
}
|
||||
|
||||
// NewPriorityScheduler creates a new priority scheduler
|
||||
func NewPriorityScheduler() *PriorityScheduler {
|
||||
return &PriorityScheduler{
|
||||
logger: logging.GetDefaultLogger().With().Str("component", "priority-scheduler").Logger(),
|
||||
enabled: true,
|
||||
}
|
||||
}
|
||||
|
||||
// SetThreadPriority sets the priority of the current thread
|
||||
func (ps *PriorityScheduler) SetThreadPriority(priority int, policy int) error {
|
||||
if !ps.enabled {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Lock to OS thread to ensure we're setting priority for the right thread
|
||||
runtime.LockOSThread()
|
||||
|
||||
// Get current thread ID
|
||||
tid := syscall.Gettid()
|
||||
|
||||
// Set scheduling parameters
|
||||
param := &SchedParam{
|
||||
Priority: int32(priority),
|
||||
}
|
||||
|
||||
// Use syscall to set scheduler
|
||||
_, _, errno := syscall.Syscall(syscall.SYS_SCHED_SETSCHEDULER,
|
||||
uintptr(tid),
|
||||
uintptr(policy),
|
||||
uintptr(unsafe.Pointer(param)))
|
||||
|
||||
if errno != 0 {
|
||||
// If we can't set real-time priority, try nice value instead
|
||||
if policy != SCHED_NORMAL {
|
||||
ps.logger.Warn().Int("errno", int(errno)).Msg("Failed to set real-time priority, falling back to nice")
|
||||
return ps.setNicePriority(priority)
|
||||
}
|
||||
return errno
|
||||
}
|
||||
|
||||
ps.logger.Debug().Int("tid", tid).Int("priority", priority).Int("policy", policy).Msg("Thread priority set")
|
||||
return nil
|
||||
}
|
||||
|
||||
// setNicePriority sets nice value as fallback when real-time scheduling is not available
|
||||
func (ps *PriorityScheduler) setNicePriority(rtPriority int) error {
|
||||
// Convert real-time priority to nice value (inverse relationship)
|
||||
// RT priority 80 -> nice -10, RT priority 40 -> nice 0
|
||||
niceValue := (40 - rtPriority) / 4
|
||||
if niceValue < -20 {
|
||||
niceValue = -20
|
||||
}
|
||||
if niceValue > 19 {
|
||||
niceValue = 19
|
||||
}
|
||||
|
||||
err := syscall.Setpriority(syscall.PRIO_PROCESS, 0, niceValue)
|
||||
if err != nil {
|
||||
ps.logger.Warn().Err(err).Int("nice", niceValue).Msg("Failed to set nice priority")
|
||||
return err
|
||||
}
|
||||
|
||||
ps.logger.Debug().Int("nice", niceValue).Msg("Nice priority set as fallback")
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetAudioProcessingPriority sets high priority for audio processing threads
|
||||
func (ps *PriorityScheduler) SetAudioProcessingPriority() error {
|
||||
return ps.SetThreadPriority(AudioHighPriority, SCHED_FIFO)
|
||||
}
|
||||
|
||||
// SetAudioIOPriority sets medium priority for audio I/O threads
|
||||
func (ps *PriorityScheduler) SetAudioIOPriority() error {
|
||||
return ps.SetThreadPriority(AudioMediumPriority, SCHED_FIFO)
|
||||
}
|
||||
|
||||
// SetAudioBackgroundPriority sets low priority for background audio tasks
|
||||
func (ps *PriorityScheduler) SetAudioBackgroundPriority() error {
|
||||
return ps.SetThreadPriority(AudioLowPriority, SCHED_FIFO)
|
||||
}
|
||||
|
||||
// ResetPriority resets thread to normal scheduling
|
||||
func (ps *PriorityScheduler) ResetPriority() error {
|
||||
return ps.SetThreadPriority(NormalPriority, SCHED_NORMAL)
|
||||
}
|
||||
|
||||
// Disable disables priority scheduling (useful for testing or fallback)
|
||||
func (ps *PriorityScheduler) Disable() {
|
||||
ps.enabled = false
|
||||
ps.logger.Info().Msg("Priority scheduling disabled")
|
||||
}
|
||||
|
||||
// Enable enables priority scheduling
|
||||
func (ps *PriorityScheduler) Enable() {
|
||||
ps.enabled = true
|
||||
ps.logger.Info().Msg("Priority scheduling enabled")
|
||||
}
|
||||
|
||||
// Global priority scheduler instance
|
||||
var globalPriorityScheduler *PriorityScheduler
|
||||
|
||||
// GetPriorityScheduler returns the global priority scheduler instance
|
||||
func GetPriorityScheduler() *PriorityScheduler {
|
||||
if globalPriorityScheduler == nil {
|
||||
globalPriorityScheduler = NewPriorityScheduler()
|
||||
}
|
||||
return globalPriorityScheduler
|
||||
}
|
||||
|
||||
// SetAudioThreadPriority is a convenience function to set audio processing priority
|
||||
func SetAudioThreadPriority() error {
|
||||
return GetPriorityScheduler().SetAudioProcessingPriority()
|
||||
}
|
||||
|
||||
// SetAudioIOThreadPriority is a convenience function to set audio I/O priority
|
||||
func SetAudioIOThreadPriority() error {
|
||||
return GetPriorityScheduler().SetAudioIOPriority()
|
||||
}
|
||||
|
||||
// ResetThreadPriority is a convenience function to reset thread priority
|
||||
func ResetThreadPriority() error {
|
||||
return GetPriorityScheduler().ResetPriority()
|
||||
}
|
|
@ -2,6 +2,7 @@ package audio
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -13,6 +14,10 @@ import (
|
|||
// AudioRelay handles forwarding audio frames from the audio server subprocess
|
||||
// to WebRTC without any CGO audio processing. This runs in the main process.
|
||||
type AudioRelay struct {
|
||||
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
|
||||
framesRelayed int64
|
||||
framesDropped int64
|
||||
|
||||
client *AudioClient
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
|
@ -25,10 +30,6 @@ type AudioRelay struct {
|
|||
audioTrack AudioTrackWriter
|
||||
config AudioConfig
|
||||
muted bool
|
||||
|
||||
// Statistics
|
||||
framesRelayed int64
|
||||
framesDropped int64
|
||||
}
|
||||
|
||||
// AudioTrackWriter interface for WebRTC audio track
|
||||
|
@ -58,14 +59,16 @@ func (r *AudioRelay) Start(audioTrack AudioTrackWriter, config AudioConfig) erro
|
|||
}
|
||||
|
||||
// Create audio client to connect to subprocess
|
||||
client, err := NewAudioClient()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
client := NewAudioClient()
|
||||
r.client = client
|
||||
r.audioTrack = audioTrack
|
||||
r.config = config
|
||||
|
||||
// Connect to the audio output server
|
||||
if err := client.Connect(); err != nil {
|
||||
return fmt.Errorf("failed to connect to audio output server: %w", err)
|
||||
}
|
||||
|
||||
// Start relay goroutine
|
||||
r.wg.Add(1)
|
||||
go r.relayLoop()
|
||||
|
@ -88,7 +91,7 @@ func (r *AudioRelay) Stop() {
|
|||
r.wg.Wait()
|
||||
|
||||
if r.client != nil {
|
||||
r.client.Close()
|
||||
r.client.Disconnect()
|
||||
r.client = nil
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,314 @@
|
|||
package audio
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
// ZeroCopyAudioFrame represents an audio frame that can be passed between
|
||||
// components without copying the underlying data
|
||||
type ZeroCopyAudioFrame struct {
|
||||
data []byte
|
||||
length int
|
||||
capacity int
|
||||
refCount int32
|
||||
mutex sync.RWMutex
|
||||
pooled bool
|
||||
}
|
||||
|
||||
// ZeroCopyFramePool manages reusable zero-copy audio frames
|
||||
type ZeroCopyFramePool struct {
|
||||
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
|
||||
counter int64 // Frame counter (atomic)
|
||||
hitCount int64 // Pool hit counter (atomic)
|
||||
missCount int64 // Pool miss counter (atomic)
|
||||
|
||||
// Other fields
|
||||
pool sync.Pool
|
||||
maxSize int
|
||||
mutex sync.RWMutex
|
||||
// Memory optimization fields
|
||||
preallocated []*ZeroCopyAudioFrame // Pre-allocated frames for immediate use
|
||||
preallocSize int // Number of pre-allocated frames
|
||||
maxPoolSize int // Maximum pool size to prevent memory bloat
|
||||
}
|
||||
|
||||
// NewZeroCopyFramePool creates a new zero-copy frame pool
|
||||
func NewZeroCopyFramePool(maxFrameSize int) *ZeroCopyFramePool {
|
||||
// Pre-allocate 15 frames for immediate availability
|
||||
preallocSize := 15
|
||||
maxPoolSize := 50 // Limit total pool size
|
||||
preallocated := make([]*ZeroCopyAudioFrame, 0, preallocSize)
|
||||
|
||||
// Pre-allocate frames to reduce initial allocation overhead
|
||||
for i := 0; i < preallocSize; i++ {
|
||||
frame := &ZeroCopyAudioFrame{
|
||||
data: make([]byte, 0, maxFrameSize),
|
||||
capacity: maxFrameSize,
|
||||
pooled: true,
|
||||
}
|
||||
preallocated = append(preallocated, frame)
|
||||
}
|
||||
|
||||
return &ZeroCopyFramePool{
|
||||
maxSize: maxFrameSize,
|
||||
preallocated: preallocated,
|
||||
preallocSize: preallocSize,
|
||||
maxPoolSize: maxPoolSize,
|
||||
pool: sync.Pool{
|
||||
New: func() interface{} {
|
||||
return &ZeroCopyAudioFrame{
|
||||
data: make([]byte, 0, maxFrameSize),
|
||||
capacity: maxFrameSize,
|
||||
pooled: true,
|
||||
}
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Get retrieves a zero-copy frame from the pool
|
||||
func (p *ZeroCopyFramePool) Get() *ZeroCopyAudioFrame {
|
||||
// First try pre-allocated frames for fastest access
|
||||
p.mutex.Lock()
|
||||
if len(p.preallocated) > 0 {
|
||||
frame := p.preallocated[len(p.preallocated)-1]
|
||||
p.preallocated = p.preallocated[:len(p.preallocated)-1]
|
||||
p.mutex.Unlock()
|
||||
|
||||
frame.mutex.Lock()
|
||||
frame.refCount = 1
|
||||
frame.length = 0
|
||||
frame.data = frame.data[:0]
|
||||
frame.mutex.Unlock()
|
||||
|
||||
atomic.AddInt64(&p.hitCount, 1)
|
||||
return frame
|
||||
}
|
||||
p.mutex.Unlock()
|
||||
|
||||
// Try sync.Pool next
|
||||
frame := p.pool.Get().(*ZeroCopyAudioFrame)
|
||||
frame.mutex.Lock()
|
||||
frame.refCount = 1
|
||||
frame.length = 0
|
||||
frame.data = frame.data[:0]
|
||||
frame.mutex.Unlock()
|
||||
|
||||
atomic.AddInt64(&p.hitCount, 1)
|
||||
return frame
|
||||
}
|
||||
|
||||
// Put returns a zero-copy frame to the pool
|
||||
func (p *ZeroCopyFramePool) Put(frame *ZeroCopyAudioFrame) {
|
||||
if frame == nil || !frame.pooled {
|
||||
return
|
||||
}
|
||||
|
||||
frame.mutex.Lock()
|
||||
frame.refCount--
|
||||
if frame.refCount <= 0 {
|
||||
frame.refCount = 0
|
||||
frame.length = 0
|
||||
frame.data = frame.data[:0]
|
||||
frame.mutex.Unlock()
|
||||
|
||||
// First try to return to pre-allocated pool for fastest reuse
|
||||
p.mutex.Lock()
|
||||
if len(p.preallocated) < p.preallocSize {
|
||||
p.preallocated = append(p.preallocated, frame)
|
||||
p.mutex.Unlock()
|
||||
return
|
||||
}
|
||||
p.mutex.Unlock()
|
||||
|
||||
// Check pool size limit to prevent excessive memory usage
|
||||
p.mutex.RLock()
|
||||
currentCount := atomic.LoadInt64(&p.counter)
|
||||
p.mutex.RUnlock()
|
||||
|
||||
if currentCount >= int64(p.maxPoolSize) {
|
||||
return // Pool is full, let GC handle this frame
|
||||
}
|
||||
|
||||
// Return to sync.Pool
|
||||
p.pool.Put(frame)
|
||||
atomic.AddInt64(&p.counter, 1)
|
||||
} else {
|
||||
frame.mutex.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// Data returns the frame data as a slice (zero-copy view)
|
||||
func (f *ZeroCopyAudioFrame) Data() []byte {
|
||||
f.mutex.RLock()
|
||||
defer f.mutex.RUnlock()
|
||||
return f.data[:f.length]
|
||||
}
|
||||
|
||||
// SetData sets the frame data (zero-copy if possible)
|
||||
func (f *ZeroCopyAudioFrame) SetData(data []byte) error {
|
||||
f.mutex.Lock()
|
||||
defer f.mutex.Unlock()
|
||||
|
||||
if len(data) > f.capacity {
|
||||
// Need to reallocate - not zero-copy but necessary
|
||||
f.data = make([]byte, len(data))
|
||||
f.capacity = len(data)
|
||||
f.pooled = false // Can't return to pool anymore
|
||||
}
|
||||
|
||||
// Zero-copy assignment when data fits in existing buffer
|
||||
if cap(f.data) >= len(data) {
|
||||
f.data = f.data[:len(data)]
|
||||
copy(f.data, data)
|
||||
} else {
|
||||
f.data = append(f.data[:0], data...)
|
||||
}
|
||||
f.length = len(data)
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetDataDirect sets frame data using direct buffer assignment (true zero-copy)
|
||||
// WARNING: The caller must ensure the buffer remains valid for the frame's lifetime
|
||||
func (f *ZeroCopyAudioFrame) SetDataDirect(data []byte) {
|
||||
f.mutex.Lock()
|
||||
defer f.mutex.Unlock()
|
||||
f.data = data
|
||||
f.length = len(data)
|
||||
f.capacity = cap(data)
|
||||
f.pooled = false // Direct assignment means we can't pool this frame
|
||||
}
|
||||
|
||||
// AddRef increments the reference count for shared access
|
||||
func (f *ZeroCopyAudioFrame) AddRef() {
|
||||
f.mutex.Lock()
|
||||
f.refCount++
|
||||
f.mutex.Unlock()
|
||||
}
|
||||
|
||||
// Release decrements the reference count
|
||||
func (f *ZeroCopyAudioFrame) Release() {
|
||||
f.mutex.Lock()
|
||||
f.refCount--
|
||||
f.mutex.Unlock()
|
||||
}
|
||||
|
||||
// Length returns the current data length
|
||||
func (f *ZeroCopyAudioFrame) Length() int {
|
||||
f.mutex.RLock()
|
||||
defer f.mutex.RUnlock()
|
||||
return f.length
|
||||
}
|
||||
|
||||
// Capacity returns the buffer capacity
|
||||
func (f *ZeroCopyAudioFrame) Capacity() int {
|
||||
f.mutex.RLock()
|
||||
defer f.mutex.RUnlock()
|
||||
return f.capacity
|
||||
}
|
||||
|
||||
// UnsafePointer returns an unsafe pointer to the data for CGO calls
|
||||
// WARNING: Only use this for CGO interop, ensure frame lifetime
|
||||
func (f *ZeroCopyAudioFrame) UnsafePointer() unsafe.Pointer {
|
||||
f.mutex.RLock()
|
||||
defer f.mutex.RUnlock()
|
||||
if len(f.data) == 0 {
|
||||
return nil
|
||||
}
|
||||
return unsafe.Pointer(&f.data[0])
|
||||
}
|
||||
|
||||
// Global zero-copy frame pool
|
||||
// GetZeroCopyPoolStats returns detailed statistics about the zero-copy frame pool
|
||||
func (p *ZeroCopyFramePool) GetZeroCopyPoolStats() ZeroCopyFramePoolStats {
|
||||
p.mutex.RLock()
|
||||
preallocatedCount := len(p.preallocated)
|
||||
currentCount := atomic.LoadInt64(&p.counter)
|
||||
p.mutex.RUnlock()
|
||||
|
||||
hitCount := atomic.LoadInt64(&p.hitCount)
|
||||
missCount := atomic.LoadInt64(&p.missCount)
|
||||
totalRequests := hitCount + missCount
|
||||
|
||||
var hitRate float64
|
||||
if totalRequests > 0 {
|
||||
hitRate = float64(hitCount) / float64(totalRequests) * 100
|
||||
}
|
||||
|
||||
return ZeroCopyFramePoolStats{
|
||||
MaxFrameSize: p.maxSize,
|
||||
MaxPoolSize: p.maxPoolSize,
|
||||
CurrentPoolSize: currentCount,
|
||||
PreallocatedCount: int64(preallocatedCount),
|
||||
PreallocatedMax: int64(p.preallocSize),
|
||||
HitCount: hitCount,
|
||||
MissCount: missCount,
|
||||
HitRate: hitRate,
|
||||
}
|
||||
}
|
||||
|
||||
// ZeroCopyFramePoolStats provides detailed zero-copy pool statistics
|
||||
type ZeroCopyFramePoolStats struct {
|
||||
MaxFrameSize int
|
||||
MaxPoolSize int
|
||||
CurrentPoolSize int64
|
||||
PreallocatedCount int64
|
||||
PreallocatedMax int64
|
||||
HitCount int64
|
||||
MissCount int64
|
||||
HitRate float64 // Percentage
|
||||
}
|
||||
|
||||
var (
|
||||
globalZeroCopyPool = NewZeroCopyFramePool(MaxAudioFrameSize)
|
||||
)
|
||||
|
||||
// GetZeroCopyFrame gets a frame from the global pool
|
||||
func GetZeroCopyFrame() *ZeroCopyAudioFrame {
|
||||
return globalZeroCopyPool.Get()
|
||||
}
|
||||
|
||||
// GetGlobalZeroCopyPoolStats returns statistics for the global zero-copy pool
|
||||
func GetGlobalZeroCopyPoolStats() ZeroCopyFramePoolStats {
|
||||
return globalZeroCopyPool.GetZeroCopyPoolStats()
|
||||
}
|
||||
|
||||
// PutZeroCopyFrame returns a frame to the global pool
|
||||
func PutZeroCopyFrame(frame *ZeroCopyAudioFrame) {
|
||||
globalZeroCopyPool.Put(frame)
|
||||
}
|
||||
|
||||
// ZeroCopyAudioReadEncode performs audio read and encode with zero-copy optimization
|
||||
func ZeroCopyAudioReadEncode() (*ZeroCopyAudioFrame, error) {
|
||||
frame := GetZeroCopyFrame()
|
||||
|
||||
// Ensure frame has enough capacity
|
||||
if frame.Capacity() < MaxAudioFrameSize {
|
||||
// Reallocate if needed
|
||||
frame.data = make([]byte, MaxAudioFrameSize)
|
||||
frame.capacity = MaxAudioFrameSize
|
||||
frame.pooled = false
|
||||
}
|
||||
|
||||
// Use unsafe pointer for direct CGO call
|
||||
n, err := CGOAudioReadEncode(frame.data[:MaxAudioFrameSize])
|
||||
if err != nil {
|
||||
PutZeroCopyFrame(frame)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if n == 0 {
|
||||
PutZeroCopyFrame(frame)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Set the actual data length
|
||||
frame.mutex.Lock()
|
||||
frame.length = n
|
||||
frame.data = frame.data[:n]
|
||||
frame.mutex.Unlock()
|
||||
|
||||
return frame, nil
|
||||
}
|
5
main.go
5
main.go
|
@ -31,6 +31,9 @@ func runAudioServer() {
|
|||
}
|
||||
|
||||
func startAudioSubprocess() error {
|
||||
// Start adaptive buffer management for optimal performance
|
||||
audio.StartAdaptiveBuffering()
|
||||
|
||||
// Create audio server supervisor
|
||||
audioSupervisor = audio.NewAudioServerSupervisor()
|
||||
|
||||
|
@ -59,6 +62,8 @@ func startAudioSubprocess() error {
|
|||
|
||||
// Stop audio relay when process exits
|
||||
audio.StopAudioRelay()
|
||||
// Stop adaptive buffering
|
||||
audio.StopAdaptiveBuffering()
|
||||
},
|
||||
// onRestart
|
||||
func(attempt int, delay time.Duration) {
|
||||
|
|
3
web.go
3
web.go
|
@ -457,6 +457,9 @@ func setupRouter() *gin.Engine {
|
|||
})
|
||||
})
|
||||
|
||||
// Audio memory allocation metrics endpoint
|
||||
protected.GET("/audio/memory-metrics", gin.WrapF(audio.HandleMemoryMetrics))
|
||||
|
||||
protected.GET("/microphone/process-metrics", func(c *gin.Context) {
|
||||
if currentSession == nil || currentSession.AudioInputManager == nil {
|
||||
c.JSON(200, gin.H{
|
||||
|
|
Loading…
Reference in New Issue