[WIP] Cleanup: reduce PR complexity

This commit is contained in:
Alex P 2025-09-09 06:52:40 +00:00
parent 0ebfc762f7
commit 00e5148eef
17 changed files with 173 additions and 2312 deletions

View File

@ -71,7 +71,7 @@ func DefaultAdaptiveBufferConfig() AdaptiveBufferConfig {
// Latency targets
TargetLatency: Config.AdaptiveBufferTargetLatency, // Target 20ms latency
MaxLatency: Config.LatencyMonitorTarget, // Max acceptable latency
MaxLatency: Config.MaxLatencyThreshold, // Max acceptable latency
// Adaptation settings
AdaptationInterval: Config.BufferUpdateInterval, // Check every 500ms
@ -91,7 +91,6 @@ type AdaptiveBufferManager struct {
config AdaptiveBufferConfig
logger zerolog.Logger
processMonitor *ProcessMonitor
// Control channels
ctx context.Context
@ -119,7 +118,7 @@ func NewAdaptiveBufferManager(config AdaptiveBufferConfig) *AdaptiveBufferManage
currentOutputBufferSize: int64(config.DefaultBufferSize),
config: config,
logger: logger,
processMonitor: GetProcessMonitor(),
ctx: ctx,
cancel: cancel,
lastAdaptation: time.Now(),
@ -235,30 +234,9 @@ func (abm *AdaptiveBufferManager) adaptationLoop() {
// The algorithm runs periodically and only applies changes when the adaptation interval
// has elapsed, preventing excessive adjustments that could destabilize the audio pipeline.
func (abm *AdaptiveBufferManager) adaptBufferSizes() {
// Collect current system metrics
metrics := abm.processMonitor.GetCurrentMetrics()
if len(metrics) == 0 {
return // No metrics available
}
// Calculate system-wide CPU and memory usage
totalCPU := 0.0
totalMemory := 0.0
processCount := 0
for _, metric := range metrics {
totalCPU += metric.CPUPercent
totalMemory += metric.MemoryPercent
processCount++
}
if processCount == 0 {
return
}
// Store system metrics atomically
systemCPU := totalCPU // Total CPU across all monitored processes
systemMemory := totalMemory / float64(processCount) // Average memory usage
// Use fixed system metrics since monitoring is simplified
systemCPU := 50.0 // Assume moderate CPU usage
systemMemory := 60.0 // Assume moderate memory usage
atomic.StoreInt64(&abm.systemCPUPercent, int64(systemCPU*100))
atomic.StoreInt64(&abm.systemMemoryPercent, int64(systemMemory*100))

View File

@ -117,7 +117,6 @@ type AudioConfigConstants struct {
// Buffer Management
PreallocSize int
MaxPoolSize int
MessagePoolSize int
OptimalSocketBuffer int
@ -131,7 +130,7 @@ type AudioConfigConstants struct {
MinReadEncodeBuffer int
MaxDecodeWriteBuffer int
MinBatchSizeForThreadPinning int
GoroutineMonitorInterval time.Duration
MagicNumber uint32
MaxFrameSize int
WriteTimeout time.Duration
@ -172,9 +171,6 @@ type AudioConfigConstants struct {
OutputSupervisorTimeout time.Duration // 5s
BatchProcessingDelay time.Duration // 10ms
AdaptiveOptimizerStability time.Duration // 10s
LatencyMonitorTarget time.Duration // 50ms
// Adaptive Buffer Configuration
// LowCPUThreshold defines CPU usage threshold for buffer size reduction.
LowCPUThreshold float64 // 20% CPU threshold for buffer optimization
@ -186,7 +182,7 @@ type AudioConfigConstants struct {
AdaptiveBufferTargetLatency time.Duration // 20ms target latency
CooldownPeriod time.Duration // 30s cooldown period
RollbackThreshold time.Duration // 300ms rollback threshold
AdaptiveOptimizerLatencyTarget time.Duration // 50ms latency target
MaxLatencyThreshold time.Duration // 200ms max latency
JitterThreshold time.Duration // 20ms jitter threshold
LatencyOptimizationInterval time.Duration // 5s optimization interval
@ -216,27 +212,6 @@ type AudioConfigConstants struct {
CGOPCMBufferSize int // PCM buffer size for CGO audio processing
CGONanosecondsPerSecond float64 // Nanoseconds per second conversion
FrontendOperationDebounceMS int // Frontend operation debounce delay
FrontendSyncDebounceMS int // Frontend sync debounce delay
FrontendSampleRate int // Frontend sample rate
FrontendRetryDelayMS int // Frontend retry delay
FrontendShortDelayMS int // Frontend short delay
FrontendLongDelayMS int // Frontend long delay
FrontendSyncDelayMS int // Frontend sync delay
FrontendMaxRetryAttempts int // Frontend max retry attempts
FrontendAudioLevelUpdateMS int // Frontend audio level update interval
FrontendFFTSize int // Frontend FFT size
FrontendAudioLevelMax int // Frontend max audio level
FrontendReconnectIntervalMS int // Frontend reconnect interval
FrontendSubscriptionDelayMS int // Frontend subscription delay
FrontendDebugIntervalMS int // Frontend debug interval
// Process Monitoring Constants
ProcessMonitorDefaultMemoryGB int // Default memory size for fallback (4GB)
ProcessMonitorKBToBytes int // KB to bytes conversion factor (1024)
ProcessMonitorDefaultClockHz float64 // Default system clock frequency (250.0 Hz)
ProcessMonitorFallbackClockHz float64 // Fallback clock frequency (1000.0 Hz)
ProcessMonitorTraditionalHz float64 // Traditional system clock frequency (100.0 Hz)
// Batch Processing Constants
BatchProcessorFramesPerBatch int // Frames processed per batch (4)
@ -272,7 +247,14 @@ type AudioConfigConstants struct {
LatencyPercentile50 int
LatencyPercentile95 int
LatencyPercentile99 int
BufferPoolMaxOperations int
// Buffer Pool Configuration
BufferPoolDefaultSize int // Default buffer pool size when MaxPoolSize is invalid
BufferPoolControlSize int // Control buffer pool size
ZeroCopyPreallocSizeBytes int // Zero-copy frame pool preallocation size in bytes
ZeroCopyMinPreallocFrames int // Minimum preallocated frames for zero-copy pool
BufferPoolHitRateBase float64 // Base for hit rate percentage calculation
HitRateCalculationBase float64
MaxLatency time.Duration
MinMetricsUpdateInterval time.Duration
@ -329,26 +311,6 @@ type AudioConfigConstants struct {
QualityChangeSettleDelay time.Duration // Delay for quality change to settle
QualityChangeRecoveryDelay time.Duration // Delay before attempting recovery
// Buffer Pool Cache Configuration
BufferPoolCacheSize int // Buffers per goroutine cache (4)
BufferPoolCacheTTL time.Duration // Cache TTL for aggressive cleanup (5s)
BufferPoolMaxCacheEntries int // Maximum cache entries to prevent memory bloat (128)
BufferPoolCacheCleanupInterval time.Duration // Cleanup interval for frequent cleanup (15s)
BufferPoolCacheWarmupThreshold int // Warmup threshold for faster startup (25)
BufferPoolCacheHitRateTarget float64 // Target hit rate for balanced performance (0.80)
BufferPoolMaxCacheSize int // Maximum goroutine caches (256)
BufferPoolCleanupInterval int64 // Cleanup interval in seconds (15)
BufferPoolBufferTTL int64 // Buffer TTL in seconds (30)
BufferPoolControlSize int // Control pool buffer size (512)
BufferPoolMinPreallocBuffers int // Minimum preallocation buffers
BufferPoolMaxPoolSize int // Maximum pool size
BufferPoolChunkBufferCount int // Buffers per chunk
BufferPoolMinChunkSize int // Minimum chunk size (64KB)
BufferPoolInitialChunkCapacity int // Initial chunk capacity
BufferPoolAdaptiveResizeThreshold int // Threshold for adaptive resize
BufferPoolHighHitRateThreshold float64 // High hit rate threshold
BufferPoolOptimizeCacheThreshold int // Threshold for cache optimization
BufferPoolCounterResetThreshold int // Counter reset threshold
}
// DefaultAudioConfig returns the default configuration constants
@ -458,7 +420,7 @@ func DefaultAudioConfig() *AudioConfigConstants {
MaxRestartDelay: 30 * time.Second, // Maximum delay for exponential backoff
// Buffer Management
PreallocSize: 1024 * 1024, // 1MB buffer preallocation
MaxPoolSize: 100, // Maximum object pool size
MessagePoolSize: 1024, // Significantly increased message pool for quality change bursts
OptimalSocketBuffer: 262144, // 256KB optimal socket buffer
@ -521,27 +483,6 @@ func DefaultAudioConfig() *AudioConfigConstants {
QualityChangeSettleDelay: 2 * time.Second, // Delay for quality change to settle
QualityChangeRecoveryDelay: 1 * time.Second, // Delay before attempting recovery
// Buffer Pool Cache Configuration
BufferPoolCacheSize: 4, // Buffers per goroutine cache
BufferPoolCacheTTL: 5 * time.Second, // Cache TTL for aggressive cleanup
BufferPoolMaxCacheEntries: 128, // Maximum cache entries to prevent memory bloat
BufferPoolCacheCleanupInterval: 15 * time.Second, // Cleanup interval for frequent cleanup
BufferPoolCacheWarmupThreshold: 25, // Warmup threshold for faster startup
BufferPoolCacheHitRateTarget: 0.80, // Target hit rate for balanced performance
BufferPoolMaxCacheSize: 256, // Maximum goroutine caches
BufferPoolCleanupInterval: 15, // Cleanup interval in seconds
BufferPoolBufferTTL: 30, // Buffer TTL in seconds
BufferPoolControlSize: 512, // Control pool buffer size
BufferPoolMinPreallocBuffers: 16, // Minimum preallocation buffers (reduced from 50)
BufferPoolMaxPoolSize: 128, // Maximum pool size (reduced from 256)
BufferPoolChunkBufferCount: 8, // Buffers per chunk (reduced from 64 to prevent large allocations)
BufferPoolMinChunkSize: 8192, // Minimum chunk size (8KB, reduced from 64KB)
BufferPoolInitialChunkCapacity: 4, // Initial chunk capacity
BufferPoolAdaptiveResizeThreshold: 100, // Threshold for adaptive resize
BufferPoolHighHitRateThreshold: 0.95, // High hit rate threshold
BufferPoolOptimizeCacheThreshold: 100, // Threshold for cache optimization
BufferPoolCounterResetThreshold: 10000, // Counter reset threshold
// Timing Constants - Optimized for quality change stability
DefaultSleepDuration: 100 * time.Millisecond, // Balanced polling interval
ShortSleepDuration: 10 * time.Millisecond, // Balanced high-frequency polling
@ -551,9 +492,6 @@ func DefaultAudioConfig() *AudioConfigConstants {
InputSupervisorTimeout: 5 * time.Second, // Input monitoring timeout
OutputSupervisorTimeout: 5 * time.Second, // Output monitoring timeout
BatchProcessingDelay: 5 * time.Millisecond, // Reduced batch processing delay
AdaptiveOptimizerStability: 5 * time.Second, // Faster adaptive stability period
LatencyMonitorTarget: 50 * time.Millisecond, // Balanced target latency for monitoring
// Adaptive Buffer Configuration - Optimized for single-core RV1106G3
LowCPUThreshold: 0.40, // Adjusted for single-core ARM system
@ -570,7 +508,6 @@ func DefaultAudioConfig() *AudioConfigConstants {
// Adaptive Optimizer Configuration - Faster response
CooldownPeriod: 15 * time.Second, // Reduced cooldown period
RollbackThreshold: 200 * time.Millisecond, // Lower rollback threshold
AdaptiveOptimizerLatencyTarget: 30 * time.Millisecond, // Reduced latency target
// Latency Monitor Configuration - More aggressive monitoring
MaxLatencyThreshold: 150 * time.Millisecond, // Lower max latency threshold
@ -609,29 +546,6 @@ func DefaultAudioConfig() *AudioConfigConstants {
CGOPCMBufferSize: 1920, // 1920 samples for PCM buffer (max 2ch*960)
CGONanosecondsPerSecond: 1000000000.0, // 1000000000.0 for nanosecond conversions
// Frontend Constants - Balanced for stability
FrontendOperationDebounceMS: 1000, // 1000ms debounce for frontend operations
FrontendSyncDebounceMS: 1000, // 1000ms debounce for sync operations
FrontendSampleRate: 48000, // 48000Hz sample rate for frontend audio
FrontendRetryDelayMS: 500, // 500ms retry delay
FrontendShortDelayMS: 200, // 200ms short delay
FrontendLongDelayMS: 300, // 300ms long delay
FrontendSyncDelayMS: 500, // 500ms sync delay
FrontendMaxRetryAttempts: 3, // 3 maximum retry attempts
FrontendAudioLevelUpdateMS: 100, // 100ms audio level update interval
FrontendFFTSize: 256, // 256 FFT size for audio analysis
FrontendAudioLevelMax: 100, // 100 maximum audio level
FrontendReconnectIntervalMS: 3000, // 3000ms reconnect interval
FrontendSubscriptionDelayMS: 100, // 100ms subscription delay
FrontendDebugIntervalMS: 5000, // 5000ms debug interval
// Process Monitor Constants
ProcessMonitorDefaultMemoryGB: 4, // 4GB default memory for fallback
ProcessMonitorKBToBytes: 1024, // 1024 conversion factor
ProcessMonitorDefaultClockHz: 250.0, // 250.0 Hz default for ARM systems
ProcessMonitorFallbackClockHz: 1000.0, // 1000.0 Hz fallback clock
ProcessMonitorTraditionalHz: 100.0, // 100.0 Hz traditional clock
// Batch Processing Constants - Optimized for quality change bursts
BatchProcessorFramesPerBatch: 16, // Larger batches for quality changes
BatchProcessorTimeout: 20 * time.Millisecond, // Longer timeout for bursts
@ -686,8 +600,14 @@ func DefaultAudioConfig() *AudioConfigConstants {
LatencyPercentile95: 95, // 95th percentile calculation factor
LatencyPercentile99: 99, // 99th percentile calculation factor
// Buffer Pool Configuration
BufferPoolDefaultSize: 64, // Default buffer pool size when MaxPoolSize is invalid
BufferPoolControlSize: 512, // Control buffer pool size
ZeroCopyPreallocSizeBytes: 1024 * 1024, // Zero-copy frame pool preallocation size in bytes (1MB)
ZeroCopyMinPreallocFrames: 1, // Minimum preallocated frames for zero-copy pool
BufferPoolHitRateBase: 100.0, // Base for hit rate percentage calculation
// Buffer Pool Efficiency Constants
BufferPoolMaxOperations: 1000, // 1000 operations for efficiency tracking
HitRateCalculationBase: 100.0, // 100.0 base for hit rate percentage calculation
// Validation Constants
@ -733,9 +653,6 @@ func DefaultAudioConfig() *AudioConfigConstants {
// Batch Audio Processing Configuration
MinBatchSizeForThreadPinning: 5, // Minimum batch size to pin thread
// Goroutine Monitoring Configuration
GoroutineMonitorInterval: 30 * time.Second, // 30s monitoring interval
// Performance Configuration Flags - Production optimizations
}

View File

@ -158,78 +158,6 @@ var (
},
)
// Audio subprocess process metrics
audioProcessCpuPercent = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_audio_process_cpu_percent",
Help: "CPU usage percentage of audio output subprocess",
},
)
audioProcessMemoryPercent = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_audio_process_memory_percent",
Help: "Memory usage percentage of audio output subprocess",
},
)
audioProcessMemoryRssBytes = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_audio_process_memory_rss_bytes",
Help: "RSS memory usage in bytes of audio output subprocess",
},
)
audioProcessMemoryVmsBytes = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_audio_process_memory_vms_bytes",
Help: "VMS memory usage in bytes of audio output subprocess",
},
)
audioProcessRunning = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_audio_process_running",
Help: "Whether audio output subprocess is running (1=running, 0=stopped)",
},
)
// Microphone subprocess process metrics
microphoneProcessCpuPercent = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_microphone_process_cpu_percent",
Help: "CPU usage percentage of microphone input subprocess",
},
)
microphoneProcessMemoryPercent = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_microphone_process_memory_percent",
Help: "Memory usage percentage of microphone input subprocess",
},
)
microphoneProcessMemoryRssBytes = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_microphone_process_memory_rss_bytes",
Help: "RSS memory usage in bytes of microphone input subprocess",
},
)
microphoneProcessMemoryVmsBytes = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_microphone_process_memory_vms_bytes",
Help: "VMS memory usage in bytes of microphone input subprocess",
},
)
microphoneProcessRunning = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_microphone_process_running",
Help: "Whether microphone input subprocess is running (1=running, 0=stopped)",
},
)
// Device health metrics
// Removed device health metrics - functionality not used
@ -446,42 +374,6 @@ func UpdateMicrophoneMetrics(metrics UnifiedAudioMetrics) {
atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
}
// UpdateAudioProcessMetrics updates Prometheus metrics with audio subprocess data
func UpdateAudioProcessMetrics(metrics ProcessMetrics, isRunning bool) {
metricsUpdateMutex.Lock()
defer metricsUpdateMutex.Unlock()
audioProcessCpuPercent.Set(metrics.CPUPercent)
audioProcessMemoryPercent.Set(metrics.MemoryPercent)
audioProcessMemoryRssBytes.Set(float64(metrics.MemoryRSS))
audioProcessMemoryVmsBytes.Set(float64(metrics.MemoryVMS))
if isRunning {
audioProcessRunning.Set(1)
} else {
audioProcessRunning.Set(0)
}
atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
}
// UpdateMicrophoneProcessMetrics updates Prometheus metrics with microphone subprocess data
func UpdateMicrophoneProcessMetrics(metrics ProcessMetrics, isRunning bool) {
metricsUpdateMutex.Lock()
defer metricsUpdateMutex.Unlock()
microphoneProcessCpuPercent.Set(metrics.CPUPercent)
microphoneProcessMemoryPercent.Set(metrics.MemoryPercent)
microphoneProcessMemoryRssBytes.Set(float64(metrics.MemoryRSS))
microphoneProcessMemoryVmsBytes.Set(float64(metrics.MemoryVMS))
if isRunning {
microphoneProcessRunning.Set(1)
} else {
microphoneProcessRunning.Set(0)
}
atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
}
// UpdateAdaptiveBufferMetrics updates Prometheus metrics with adaptive buffer information
func UpdateAdaptiveBufferMetrics(inputBufferSize, outputBufferSize int, cpuPercent, memoryPercent float64, adjustmentMade bool) {
metricsUpdateMutex.Lock()

View File

@ -218,32 +218,6 @@ func ValidateOutputIPCConfig(sampleRate, channels, frameSize int) error {
return nil
}
// ValidateLatencyConfig validates latency monitor configuration
func ValidateLatencyConfig(config LatencyConfig) error {
if err := ValidateLatency(config.TargetLatency); err != nil {
return err
}
if err := ValidateLatency(config.MaxLatency); err != nil {
return err
}
if config.TargetLatency >= Config.MaxLatency {
return ErrInvalidLatency
}
if err := ValidateMetricsInterval(config.OptimizationInterval); err != nil {
return err
}
if config.HistorySize <= 0 {
return ErrInvalidBufferSize
}
if config.JitterThreshold < 0 {
return ErrInvalidLatency
}
if config.AdaptiveThreshold < 0 || config.AdaptiveThreshold > 1.0 {
return ErrInvalidConfiguration
}
return nil
}
// ValidateSampleRate validates audio sample rate values
// Optimized to use AudioConfigCache for frequently accessed values
func ValidateSampleRate(sampleRate int) error {

View File

@ -115,7 +115,6 @@ func (aim *AudioInputManager) WriteOpusFrame(frame []byte) error {
Msg("High audio processing latency detected")
// Record latency for goroutine cleanup optimization
RecordAudioLatency(latencyMs)
}
if err != nil {
@ -156,7 +155,6 @@ func (aim *AudioInputManager) WriteOpusFrameZeroCopy(frame *ZeroCopyAudioFrame)
Msg("High audio processing latency detected")
// Record latency for goroutine cleanup optimization
RecordAudioLatency(latencyMs)
}
if err != nil {

View File

@ -135,7 +135,6 @@ func (ais *AudioInputSupervisor) startProcess() error {
ais.logger.Info().Int("pid", ais.processPID).Strs("args", args).Strs("opus_env", ais.opusEnv).Msg("audio input server process started")
// Add process to monitoring
ais.processMonitor.AddProcess(ais.processPID, "audio-input-server")
// Connect client to the server
go ais.connectClient()

View File

@ -117,10 +117,6 @@ type UnifiedAudioServer struct {
socketPath string
magicNumber uint32
socketBufferConfig SocketBufferConfig
// Performance monitoring
latencyMonitor *LatencyMonitor
adaptiveOptimizer *AdaptiveOptimizer
}
// NewUnifiedAudioServer creates a new unified audio server
@ -148,8 +144,6 @@ func NewUnifiedAudioServer(isInput bool) (*UnifiedAudioServer, error) {
messageChan: make(chan *UnifiedIPCMessage, Config.ChannelBufferSize),
processChan: make(chan *UnifiedIPCMessage, Config.ChannelBufferSize),
socketBufferConfig: DefaultSocketBufferConfig(),
latencyMonitor: nil,
adaptiveOptimizer: nil,
}
return server, nil
@ -365,10 +359,6 @@ func (s *UnifiedAudioServer) SendFrame(frame []byte) error {
}
// Record latency for monitoring
if s.latencyMonitor != nil {
writeLatency := time.Since(start)
s.latencyMonitor.RecordLatency(writeLatency, "ipc_write")
}
atomic.AddInt64(&s.totalFrames, 1)
return nil

View File

@ -28,7 +28,6 @@ type BaseSupervisor struct {
processPID int
// Process monitoring
processMonitor *ProcessMonitor
// Exit tracking
lastExitCode int
@ -46,7 +45,7 @@ func NewBaseSupervisor(componentName string) *BaseSupervisor {
logger := logging.GetDefaultLogger().With().Str("component", componentName).Logger()
return &BaseSupervisor{
logger: &logger,
processMonitor: GetProcessMonitor(),
stopChan: make(chan struct{}),
processDone: make(chan struct{}),
}
@ -211,7 +210,6 @@ func (bs *BaseSupervisor) waitForProcessExit(processType string) {
bs.mutex.Unlock()
// Remove process from monitoring
bs.processMonitor.RemoveProcess(pid)
if exitCode != 0 {
bs.logger.Error().Int("pid", pid).Int("exit_code", exitCode).Msgf("%s process exited with error", processType)

View File

@ -1,329 +0,0 @@
package audio
import (
"context"
"sync"
"sync/atomic"
"time"
"github.com/rs/zerolog"
)
// AdaptiveOptimizer automatically adjusts audio parameters based on latency metrics
type AdaptiveOptimizer struct {
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
optimizationCount int64 // Number of optimizations performed (atomic)
lastOptimization int64 // Timestamp of last optimization (atomic)
optimizationLevel int64 // Current optimization level (0-10) (atomic)
stabilityScore int64 // Current stability score (0-100) (atomic)
optimizationInterval int64 // Current optimization interval in nanoseconds (atomic)
latencyMonitor *LatencyMonitor
bufferManager *AdaptiveBufferManager
logger zerolog.Logger
// Control channels
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
// Configuration
config OptimizerConfig
// Stability tracking
stabilityHistory []StabilityMetric
stabilityMutex sync.RWMutex
}
// StabilityMetric tracks system stability over time
type StabilityMetric struct {
Timestamp time.Time
LatencyStdev float64
CPUVariance float64
MemoryStable bool
ErrorRate float64
StabilityScore int
}
// OptimizerConfig holds configuration for the adaptive optimizer
type OptimizerConfig struct {
MaxOptimizationLevel int // Maximum optimization level (0-10)
CooldownPeriod time.Duration // Minimum time between optimizations
Aggressiveness float64 // How aggressively to optimize (0.0-1.0)
RollbackThreshold time.Duration // Latency threshold to rollback optimizations
StabilityPeriod time.Duration // Time to wait for stability after optimization
// Adaptive interval configuration
MinOptimizationInterval time.Duration // Minimum optimization interval (high stability)
MaxOptimizationInterval time.Duration // Maximum optimization interval (low stability)
StabilityThreshold int // Stability score threshold for interval adjustment
StabilityHistorySize int // Number of stability metrics to track
}
// DefaultOptimizerConfig returns a sensible default configuration
func DefaultOptimizerConfig() OptimizerConfig {
return OptimizerConfig{
MaxOptimizationLevel: 8,
CooldownPeriod: Config.CooldownPeriod,
Aggressiveness: Config.OptimizerAggressiveness,
RollbackThreshold: Config.RollbackThreshold,
StabilityPeriod: Config.AdaptiveOptimizerStability,
// Adaptive interval defaults
MinOptimizationInterval: 100 * time.Millisecond, // High stability: check every 100ms
MaxOptimizationInterval: 2 * time.Second, // Low stability: check every 2s
StabilityThreshold: 70, // Stability score threshold
StabilityHistorySize: 20, // Track last 20 stability metrics
}
}
// NewAdaptiveOptimizer creates a new adaptive optimizer
func NewAdaptiveOptimizer(latencyMonitor *LatencyMonitor, bufferManager *AdaptiveBufferManager, config OptimizerConfig, logger zerolog.Logger) *AdaptiveOptimizer {
ctx, cancel := context.WithCancel(context.Background())
optimizer := &AdaptiveOptimizer{
latencyMonitor: latencyMonitor,
bufferManager: bufferManager,
config: config,
logger: logger.With().Str("component", "adaptive-optimizer").Logger(),
ctx: ctx,
cancel: cancel,
stabilityHistory: make([]StabilityMetric, 0, config.StabilityHistorySize),
}
// Initialize stability score and optimization interval
atomic.StoreInt64(&optimizer.stabilityScore, 50) // Start with medium stability
atomic.StoreInt64(&optimizer.optimizationInterval, int64(config.MaxOptimizationInterval))
// Register as latency monitor callback
latencyMonitor.AddOptimizationCallback(optimizer.handleLatencyOptimization)
return optimizer
}
// Start begins the adaptive optimization process
func (ao *AdaptiveOptimizer) Start() {
ao.wg.Add(1)
go ao.optimizationLoop()
ao.logger.Debug().Msg("adaptive optimizer started")
}
// Stop stops the adaptive optimizer
func (ao *AdaptiveOptimizer) Stop() {
ao.cancel()
ao.wg.Wait()
ao.logger.Debug().Msg("adaptive optimizer stopped")
}
// initializeStrategies sets up the available optimization strategies
// handleLatencyOptimization is called when latency optimization is needed
func (ao *AdaptiveOptimizer) handleLatencyOptimization(metrics LatencyMetrics) error {
currentLevel := atomic.LoadInt64(&ao.optimizationLevel)
lastOpt := atomic.LoadInt64(&ao.lastOptimization)
// Check cooldown period
if time.Since(time.Unix(0, lastOpt)) < ao.config.CooldownPeriod {
return nil
}
// Determine if we need to increase or decrease optimization level
targetLevel := ao.calculateTargetOptimizationLevel(metrics)
if targetLevel > currentLevel {
return ao.increaseOptimization(int(targetLevel))
} else if targetLevel < currentLevel {
return ao.decreaseOptimization(int(targetLevel))
}
return nil
}
// calculateTargetOptimizationLevel determines the appropriate optimization level
func (ao *AdaptiveOptimizer) calculateTargetOptimizationLevel(metrics LatencyMetrics) int64 {
// Base calculation on current latency vs target
latencyRatio := float64(metrics.Current) / float64(Config.AdaptiveOptimizerLatencyTarget) // 50ms target
// Adjust based on trend
switch metrics.Trend {
case LatencyTrendIncreasing:
latencyRatio *= 1.2 // Be more aggressive
case LatencyTrendDecreasing:
latencyRatio *= 0.8 // Be less aggressive
case LatencyTrendVolatile:
latencyRatio *= 1.1 // Slightly more aggressive
}
// Apply aggressiveness factor
latencyRatio *= ao.config.Aggressiveness
// Convert to optimization level
targetLevel := int64(latencyRatio * Config.LatencyScalingFactor) // Scale to 0-10 range
if targetLevel > int64(ao.config.MaxOptimizationLevel) {
targetLevel = int64(ao.config.MaxOptimizationLevel)
}
if targetLevel < 0 {
targetLevel = 0
}
return targetLevel
}
// increaseOptimization applies optimization strategies up to the target level
func (ao *AdaptiveOptimizer) increaseOptimization(targetLevel int) error {
atomic.StoreInt64(&ao.optimizationLevel, int64(targetLevel))
atomic.StoreInt64(&ao.lastOptimization, time.Now().UnixNano())
atomic.AddInt64(&ao.optimizationCount, 1)
return nil
}
// decreaseOptimization rolls back optimization strategies to the target level
func (ao *AdaptiveOptimizer) decreaseOptimization(targetLevel int) error {
atomic.StoreInt64(&ao.optimizationLevel, int64(targetLevel))
atomic.StoreInt64(&ao.lastOptimization, time.Now().UnixNano())
return nil
}
// optimizationLoop runs the main optimization monitoring loop
func (ao *AdaptiveOptimizer) optimizationLoop() {
defer ao.wg.Done()
// Start with initial interval
currentInterval := time.Duration(atomic.LoadInt64(&ao.optimizationInterval))
ticker := time.NewTicker(currentInterval)
defer ticker.Stop()
for {
select {
case <-ao.ctx.Done():
return
case <-ticker.C:
// Update stability metrics and check for optimization needs
ao.updateStabilityMetrics()
ao.checkStability()
// Adjust optimization interval based on current stability
newInterval := ao.calculateOptimizationInterval()
if newInterval != currentInterval {
currentInterval = newInterval
ticker.Reset(currentInterval)
ao.logger.Debug().Dur("new_interval", currentInterval).Int64("stability_score", atomic.LoadInt64(&ao.stabilityScore)).Msg("adjusted optimization interval")
}
}
}
}
// checkStability monitors system stability and rolls back if needed
func (ao *AdaptiveOptimizer) checkStability() {
metrics := ao.latencyMonitor.GetMetrics()
// Check if we need to rollback due to excessive latency
if metrics.Current > ao.config.RollbackThreshold {
currentLevel := int(atomic.LoadInt64(&ao.optimizationLevel))
if currentLevel > 0 {
ao.logger.Warn().Dur("current_latency", metrics.Current).Dur("threshold", ao.config.RollbackThreshold).Msg("rolling back optimizations due to excessive latency")
if err := ao.decreaseOptimization(currentLevel - 1); err != nil {
ao.logger.Error().Err(err).Msg("failed to decrease optimization level")
}
}
}
}
// updateStabilityMetrics calculates and stores current system stability metrics
func (ao *AdaptiveOptimizer) updateStabilityMetrics() {
metrics := ao.latencyMonitor.GetMetrics()
// Calculate stability score based on multiple factors
stabilityScore := ao.calculateStabilityScore(metrics)
atomic.StoreInt64(&ao.stabilityScore, int64(stabilityScore))
// Store stability metric in history
stabilityMetric := StabilityMetric{
Timestamp: time.Now(),
LatencyStdev: float64(metrics.Jitter), // Use Jitter as variance indicator
CPUVariance: 0.0, // TODO: Get from system metrics
MemoryStable: true, // TODO: Get from system metrics
ErrorRate: 0.0, // TODO: Get from error tracking
StabilityScore: stabilityScore,
}
ao.stabilityMutex.Lock()
ao.stabilityHistory = append(ao.stabilityHistory, stabilityMetric)
if len(ao.stabilityHistory) > ao.config.StabilityHistorySize {
ao.stabilityHistory = ao.stabilityHistory[1:]
}
ao.stabilityMutex.Unlock()
}
// calculateStabilityScore computes a stability score (0-100) based on system metrics
func (ao *AdaptiveOptimizer) calculateStabilityScore(metrics LatencyMetrics) int {
// Base score starts at 100 (perfect stability)
score := 100.0
// Penalize high jitter (latency variance)
if metrics.Jitter > 0 && metrics.Average > 0 {
jitterRatio := float64(metrics.Jitter) / float64(metrics.Average)
variancePenalty := jitterRatio * 50 // Scale jitter impact
score -= variancePenalty
}
// Penalize latency trend volatility
switch metrics.Trend {
case LatencyTrendVolatile:
score -= 20
case LatencyTrendIncreasing:
score -= 10
case LatencyTrendDecreasing:
score += 5 // Slight bonus for improving latency
}
// Ensure score is within bounds
if score < 0 {
score = 0
}
if score > 100 {
score = 100
}
return int(score)
}
// calculateOptimizationInterval determines the optimization interval based on stability
func (ao *AdaptiveOptimizer) calculateOptimizationInterval() time.Duration {
stabilityScore := atomic.LoadInt64(&ao.stabilityScore)
// High stability = shorter intervals (more frequent optimization)
// Low stability = longer intervals (less frequent optimization)
if stabilityScore >= int64(ao.config.StabilityThreshold) {
// High stability: use minimum interval
interval := ao.config.MinOptimizationInterval
atomic.StoreInt64(&ao.optimizationInterval, int64(interval))
return interval
} else {
// Low stability: scale interval based on stability score
// Lower stability = longer intervals
stabilityRatio := float64(stabilityScore) / float64(ao.config.StabilityThreshold)
minInterval := float64(ao.config.MinOptimizationInterval)
maxInterval := float64(ao.config.MaxOptimizationInterval)
// Linear interpolation between min and max intervals
interval := time.Duration(minInterval + (maxInterval-minInterval)*(1.0-stabilityRatio))
atomic.StoreInt64(&ao.optimizationInterval, int64(interval))
return interval
}
}
// GetOptimizationStats returns current optimization statistics
func (ao *AdaptiveOptimizer) GetOptimizationStats() map[string]interface{} {
return map[string]interface{}{
"optimization_level": atomic.LoadInt64(&ao.optimizationLevel),
"optimization_count": atomic.LoadInt64(&ao.optimizationCount),
"last_optimization": time.Unix(0, atomic.LoadInt64(&ao.lastOptimization)),
"stability_score": atomic.LoadInt64(&ao.stabilityScore),
"optimization_interval": time.Duration(atomic.LoadInt64(&ao.optimizationInterval)),
}
}
// Strategy implementation methods (stubs for now)

View File

@ -1,144 +0,0 @@
package audio
import (
"runtime"
"sync/atomic"
"time"
"github.com/jetkvm/kvm/internal/logging"
)
// GoroutineMonitor tracks goroutine count and provides cleanup mechanisms
type GoroutineMonitor struct {
baselineCount int
peakCount int
lastCount int
monitorInterval time.Duration
lastCheck time.Time
enabled int32
}
// Global goroutine monitor instance
var globalGoroutineMonitor *GoroutineMonitor
// NewGoroutineMonitor creates a new goroutine monitor
func NewGoroutineMonitor(monitorInterval time.Duration) *GoroutineMonitor {
if monitorInterval <= 0 {
monitorInterval = 30 * time.Second
}
// Get current goroutine count as baseline
baselineCount := runtime.NumGoroutine()
return &GoroutineMonitor{
baselineCount: baselineCount,
peakCount: baselineCount,
lastCount: baselineCount,
monitorInterval: monitorInterval,
lastCheck: time.Now(),
}
}
// Start begins goroutine monitoring
func (gm *GoroutineMonitor) Start() {
if !atomic.CompareAndSwapInt32(&gm.enabled, 0, 1) {
return // Already running
}
go gm.monitorLoop()
}
// Stop stops goroutine monitoring
func (gm *GoroutineMonitor) Stop() {
atomic.StoreInt32(&gm.enabled, 0)
}
// monitorLoop periodically checks goroutine count
func (gm *GoroutineMonitor) monitorLoop() {
logger := logging.GetDefaultLogger().With().Str("component", "goroutine-monitor").Logger()
logger.Info().Int("baseline", gm.baselineCount).Msg("goroutine monitor started")
for atomic.LoadInt32(&gm.enabled) == 1 {
time.Sleep(gm.monitorInterval)
gm.checkGoroutineCount()
}
logger.Info().Msg("goroutine monitor stopped")
}
// checkGoroutineCount checks current goroutine count and logs if it exceeds thresholds
func (gm *GoroutineMonitor) checkGoroutineCount() {
currentCount := runtime.NumGoroutine()
gm.lastCount = currentCount
// Update peak count if needed
if currentCount > gm.peakCount {
gm.peakCount = currentCount
}
// Calculate growth since baseline
growth := currentCount - gm.baselineCount
growthPercent := float64(growth) / float64(gm.baselineCount) * 100
// Log warning if growth exceeds thresholds
logger := logging.GetDefaultLogger().With().Str("component", "goroutine-monitor").Logger()
// Different log levels based on growth severity
if growthPercent > 30 {
// Severe growth - trigger cleanup
logger.Warn().Int("current", currentCount).Int("baseline", gm.baselineCount).
Int("growth", growth).Float64("growth_percent", growthPercent).
Msg("excessive goroutine growth detected - triggering cleanup")
// Force garbage collection to clean up unused resources
runtime.GC()
// Force cleanup of goroutine buffer cache
cleanupGoroutineCache()
} else if growthPercent > 20 {
// Moderate growth - just log warning
logger.Warn().Int("current", currentCount).Int("baseline", gm.baselineCount).
Int("growth", growth).Float64("growth_percent", growthPercent).
Msg("significant goroutine growth detected")
} else if growthPercent > 10 {
// Minor growth - log info
logger.Info().Int("current", currentCount).Int("baseline", gm.baselineCount).
Int("growth", growth).Float64("growth_percent", growthPercent).
Msg("goroutine growth detected")
}
// Update last check time
gm.lastCheck = time.Now()
}
// GetGoroutineStats returns current goroutine statistics
func (gm *GoroutineMonitor) GetGoroutineStats() map[string]interface{} {
return map[string]interface{}{
"current_count": gm.lastCount,
"baseline_count": gm.baselineCount,
"peak_count": gm.peakCount,
"growth": gm.lastCount - gm.baselineCount,
"growth_percent": float64(gm.lastCount-gm.baselineCount) / float64(gm.baselineCount) * 100,
"last_check": gm.lastCheck,
}
}
// GetGoroutineMonitor returns the global goroutine monitor instance
func GetGoroutineMonitor() *GoroutineMonitor {
if globalGoroutineMonitor == nil {
globalGoroutineMonitor = NewGoroutineMonitor(Config.GoroutineMonitorInterval)
}
return globalGoroutineMonitor
}
// StartGoroutineMonitoring starts the global goroutine monitor
func StartGoroutineMonitoring() {
// Goroutine monitoring disabled
}
// StopGoroutineMonitoring stops the global goroutine monitor
func StopGoroutineMonitoring() {
if globalGoroutineMonitor != nil {
globalGoroutineMonitor.Stop()
}
}

View File

@ -1,333 +0,0 @@
package audio
import (
"context"
"sync"
"sync/atomic"
"time"
"github.com/rs/zerolog"
)
// LatencyMonitor tracks and optimizes audio latency in real-time
type LatencyMonitor struct {
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
currentLatency int64 // Current latency in nanoseconds (atomic)
averageLatency int64 // Rolling average latency in nanoseconds (atomic)
minLatency int64 // Minimum observed latency in nanoseconds (atomic)
maxLatency int64 // Maximum observed latency in nanoseconds (atomic)
latencySamples int64 // Number of latency samples collected (atomic)
jitterAccumulator int64 // Accumulated jitter for variance calculation (atomic)
lastOptimization int64 // Timestamp of last optimization in nanoseconds (atomic)
config LatencyConfig
logger zerolog.Logger
// Control channels
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
// Optimization callbacks
optimizationCallbacks []OptimizationCallback
mutex sync.RWMutex
// Performance tracking
latencyHistory []LatencyMeasurement
historyMutex sync.RWMutex
}
// LatencyConfig holds configuration for latency monitoring
type LatencyConfig struct {
TargetLatency time.Duration // Target latency to maintain
MaxLatency time.Duration // Maximum acceptable latency
OptimizationInterval time.Duration // How often to run optimization
HistorySize int // Number of latency measurements to keep
JitterThreshold time.Duration // Jitter threshold for optimization
AdaptiveThreshold float64 // Threshold for adaptive adjustments (0.0-1.0)
}
// LatencyMeasurement represents a single latency measurement
type LatencyMeasurement struct {
Timestamp time.Time
Latency time.Duration
Jitter time.Duration
Source string // Source of the measurement (e.g., "input", "output", "processing")
}
// OptimizationCallback is called when latency optimization is triggered
type OptimizationCallback func(metrics LatencyMetrics) error
// LatencyMetrics provides comprehensive latency statistics
type LatencyMetrics struct {
Current time.Duration
Average time.Duration
Min time.Duration
Max time.Duration
Jitter time.Duration
SampleCount int64
Trend LatencyTrend
}
// LatencyTrend indicates the direction of latency changes
type LatencyTrend int
const (
LatencyTrendStable LatencyTrend = iota
LatencyTrendIncreasing
LatencyTrendDecreasing
LatencyTrendVolatile
)
// DefaultLatencyConfig returns a sensible default configuration
func DefaultLatencyConfig() LatencyConfig {
config := Config
return LatencyConfig{
TargetLatency: config.LatencyMonitorTarget,
MaxLatency: config.MaxLatencyThreshold,
OptimizationInterval: config.LatencyOptimizationInterval,
HistorySize: config.LatencyHistorySize,
JitterThreshold: config.JitterThreshold,
AdaptiveThreshold: config.LatencyAdaptiveThreshold,
}
}
// NewLatencyMonitor creates a new latency monitoring system
func NewLatencyMonitor(config LatencyConfig, logger zerolog.Logger) *LatencyMonitor {
// Validate latency configuration
if err := ValidateLatencyConfig(config); err != nil {
// Log validation error and use default configuration
logger.Error().Err(err).Msg("Invalid latency configuration provided, using defaults")
config = DefaultLatencyConfig()
}
ctx, cancel := context.WithCancel(context.Background())
return &LatencyMonitor{
config: config,
logger: logger.With().Str("component", "latency-monitor").Logger(),
ctx: ctx,
cancel: cancel,
latencyHistory: make([]LatencyMeasurement, 0, config.HistorySize),
minLatency: int64(time.Hour), // Initialize to high value
}
}
// Start begins latency monitoring and optimization
func (lm *LatencyMonitor) Start() {
lm.wg.Add(1)
go lm.monitoringLoop()
}
// Stop stops the latency monitor
func (lm *LatencyMonitor) Stop() {
lm.cancel()
lm.wg.Wait()
}
// RecordLatency records a new latency measurement
func (lm *LatencyMonitor) RecordLatency(latency time.Duration, source string) {
now := time.Now()
latencyNanos := latency.Nanoseconds()
// Update atomic counters
atomic.StoreInt64(&lm.currentLatency, latencyNanos)
atomic.AddInt64(&lm.latencySamples, 1)
// Update min/max
for {
oldMin := atomic.LoadInt64(&lm.minLatency)
if latencyNanos >= oldMin || atomic.CompareAndSwapInt64(&lm.minLatency, oldMin, latencyNanos) {
break
}
}
for {
oldMax := atomic.LoadInt64(&lm.maxLatency)
if latencyNanos <= oldMax || atomic.CompareAndSwapInt64(&lm.maxLatency, oldMax, latencyNanos) {
break
}
}
// Update rolling average using exponential moving average
oldAvg := atomic.LoadInt64(&lm.averageLatency)
newAvg := oldAvg + (latencyNanos-oldAvg)/10 // Alpha = 0.1
atomic.StoreInt64(&lm.averageLatency, newAvg)
// Calculate jitter (difference from average)
jitter := latencyNanos - newAvg
if jitter < 0 {
jitter = -jitter
}
atomic.AddInt64(&lm.jitterAccumulator, jitter)
// Store in history
lm.historyMutex.Lock()
measurement := LatencyMeasurement{
Timestamp: now,
Latency: latency,
Jitter: time.Duration(jitter),
Source: source,
}
if len(lm.latencyHistory) >= lm.config.HistorySize {
// Remove oldest measurement
copy(lm.latencyHistory, lm.latencyHistory[1:])
lm.latencyHistory[len(lm.latencyHistory)-1] = measurement
} else {
lm.latencyHistory = append(lm.latencyHistory, measurement)
}
lm.historyMutex.Unlock()
}
// GetMetrics returns current latency metrics
func (lm *LatencyMonitor) GetMetrics() LatencyMetrics {
current := atomic.LoadInt64(&lm.currentLatency)
average := atomic.LoadInt64(&lm.averageLatency)
min := atomic.LoadInt64(&lm.minLatency)
max := atomic.LoadInt64(&lm.maxLatency)
samples := atomic.LoadInt64(&lm.latencySamples)
jitterSum := atomic.LoadInt64(&lm.jitterAccumulator)
var jitter time.Duration
if samples > 0 {
jitter = time.Duration(jitterSum / samples)
}
return LatencyMetrics{
Current: time.Duration(current),
Average: time.Duration(average),
Min: time.Duration(min),
Max: time.Duration(max),
Jitter: jitter,
SampleCount: samples,
Trend: lm.calculateTrend(),
}
}
// AddOptimizationCallback adds a callback for latency optimization
func (lm *LatencyMonitor) AddOptimizationCallback(callback OptimizationCallback) {
lm.mutex.Lock()
lm.optimizationCallbacks = append(lm.optimizationCallbacks, callback)
lm.mutex.Unlock()
}
// monitoringLoop runs the main monitoring and optimization loop
func (lm *LatencyMonitor) monitoringLoop() {
defer lm.wg.Done()
ticker := time.NewTicker(lm.config.OptimizationInterval)
defer ticker.Stop()
for {
select {
case <-lm.ctx.Done():
return
case <-ticker.C:
lm.runOptimization()
}
}
}
// runOptimization checks if optimization is needed and triggers callbacks with threshold validation.
//
// Validation Rules:
// - Current latency must not exceed MaxLatency (default: 200ms)
// - Average latency checked against adaptive threshold: TargetLatency * (1 + AdaptiveThreshold)
// - Jitter must not exceed JitterThreshold (default: 20ms)
// - All latency values must be non-negative durations
//
// Optimization Triggers:
// - Current latency > MaxLatency: Immediate optimization needed
// - Average latency > adaptive threshold: Gradual optimization needed
// - Jitter > JitterThreshold: Stability optimization needed
//
// Threshold Calculations:
// - Adaptive threshold = TargetLatency * (1.0 + AdaptiveThreshold)
// - Default: 50ms * (1.0 + 0.8) = 90ms adaptive threshold
// - Provides buffer above target before triggering optimization
//
// The function ensures real-time audio performance by monitoring multiple
// latency metrics and triggering optimization callbacks when thresholds are exceeded.
func (lm *LatencyMonitor) runOptimization() {
metrics := lm.GetMetrics()
// Check if optimization is needed
needsOptimization := false
// Check if current latency exceeds threshold
if metrics.Current > lm.config.MaxLatency {
needsOptimization = true
lm.logger.Warn().Dur("current_latency", metrics.Current).Dur("max_latency", lm.config.MaxLatency).Msg("latency exceeds maximum threshold")
}
// Check if average latency is above adaptive threshold
adaptiveThreshold := time.Duration(float64(lm.config.TargetLatency.Nanoseconds()) * (1.0 + lm.config.AdaptiveThreshold))
if metrics.Average > adaptiveThreshold {
needsOptimization = true
}
// Check if jitter is too high
if metrics.Jitter > lm.config.JitterThreshold {
needsOptimization = true
}
if needsOptimization {
atomic.StoreInt64(&lm.lastOptimization, time.Now().UnixNano())
// Run optimization callbacks
lm.mutex.RLock()
callbacks := make([]OptimizationCallback, len(lm.optimizationCallbacks))
copy(callbacks, lm.optimizationCallbacks)
lm.mutex.RUnlock()
for _, callback := range callbacks {
if err := callback(metrics); err != nil {
lm.logger.Error().Err(err).Msg("optimization callback failed")
}
}
}
}
// calculateTrend analyzes recent latency measurements to determine trend
func (lm *LatencyMonitor) calculateTrend() LatencyTrend {
lm.historyMutex.RLock()
defer lm.historyMutex.RUnlock()
if len(lm.latencyHistory) < 10 {
return LatencyTrendStable
}
// Analyze last 10 measurements
recentMeasurements := lm.latencyHistory[len(lm.latencyHistory)-10:]
var increasing, decreasing int
for i := 1; i < len(recentMeasurements); i++ {
if recentMeasurements[i].Latency > recentMeasurements[i-1].Latency {
increasing++
} else if recentMeasurements[i].Latency < recentMeasurements[i-1].Latency {
decreasing++
}
}
// Determine trend based on direction changes
if increasing > 6 {
return LatencyTrendIncreasing
} else if decreasing > 6 {
return LatencyTrendDecreasing
} else if increasing+decreasing > 7 {
return LatencyTrendVolatile
}
return LatencyTrendStable
}
// GetLatencyHistory returns a copy of recent latency measurements
func (lm *LatencyMonitor) GetLatencyHistory() []LatencyMeasurement {
lm.historyMutex.RLock()
defer lm.historyMutex.RUnlock()
history := make([]LatencyMeasurement, len(lm.latencyHistory))
copy(history, lm.latencyHistory)
return history
}

View File

@ -1,406 +0,0 @@
package audio
import (
"bufio"
"fmt"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
// Variables for process monitoring (using configuration)
var (
// System constants
maxCPUPercent = Config.MaxCPUPercent
minCPUPercent = Config.MinCPUPercent
defaultClockTicks = Config.DefaultClockTicks
defaultMemoryGB = Config.DefaultMemoryGB
// Monitoring thresholds
maxWarmupSamples = Config.MaxWarmupSamples
warmupCPUSamples = Config.WarmupCPUSamples
// Channel buffer size
metricsChannelBuffer = Config.MetricsChannelBuffer
// Clock tick detection ranges
minValidClockTicks = float64(Config.MinValidClockTicks)
maxValidClockTicks = float64(Config.MaxValidClockTicks)
)
// Variables for process monitoring
var (
pageSize = Config.PageSize
)
// ProcessMetrics represents CPU and memory usage metrics for a process
type ProcessMetrics struct {
PID int `json:"pid"`
CPUPercent float64 `json:"cpu_percent"`
MemoryRSS int64 `json:"memory_rss_bytes"`
MemoryVMS int64 `json:"memory_vms_bytes"`
MemoryPercent float64 `json:"memory_percent"`
Timestamp time.Time `json:"timestamp"`
ProcessName string `json:"process_name"`
}
type ProcessMonitor struct {
logger zerolog.Logger
mutex sync.RWMutex
monitoredPIDs map[int]*processState
running bool
stopChan chan struct{}
metricsChan chan ProcessMetrics
updateInterval time.Duration
totalMemory int64
memoryOnce sync.Once
clockTicks float64
clockTicksOnce sync.Once
}
// processState tracks the state needed for CPU calculation
type processState struct {
name string
lastCPUTime int64
lastSysTime int64
lastUserTime int64
lastSample time.Time
warmupSamples int
}
// NewProcessMonitor creates a new process monitor
func NewProcessMonitor() *ProcessMonitor {
return &ProcessMonitor{
logger: logging.GetDefaultLogger().With().Str("component", "process-monitor").Logger(),
monitoredPIDs: make(map[int]*processState),
stopChan: make(chan struct{}),
metricsChan: make(chan ProcessMetrics, metricsChannelBuffer),
updateInterval: GetMetricsUpdateInterval(),
}
}
// Start begins monitoring processes
func (pm *ProcessMonitor) Start() {
pm.mutex.Lock()
defer pm.mutex.Unlock()
if pm.running {
return
}
pm.running = true
go pm.monitorLoop()
pm.logger.Debug().Msg("process monitor started")
}
// Stop stops monitoring processes
func (pm *ProcessMonitor) Stop() {
pm.mutex.Lock()
defer pm.mutex.Unlock()
if !pm.running {
return
}
pm.running = false
close(pm.stopChan)
pm.logger.Debug().Msg("process monitor stopped")
}
// AddProcess adds a process to monitor
func (pm *ProcessMonitor) AddProcess(pid int, name string) {
pm.mutex.Lock()
defer pm.mutex.Unlock()
pm.monitoredPIDs[pid] = &processState{
name: name,
lastSample: time.Now(),
}
pm.logger.Info().Int("pid", pid).Str("name", name).Msg("Added process to monitor")
}
// RemoveProcess removes a process from monitoring
func (pm *ProcessMonitor) RemoveProcess(pid int) {
pm.mutex.Lock()
defer pm.mutex.Unlock()
delete(pm.monitoredPIDs, pid)
pm.logger.Info().Int("pid", pid).Msg("Removed process from monitor")
}
// GetMetricsChan returns the channel for receiving metrics
func (pm *ProcessMonitor) GetMetricsChan() <-chan ProcessMetrics {
return pm.metricsChan
}
// GetCurrentMetrics returns current metrics for all monitored processes
func (pm *ProcessMonitor) GetCurrentMetrics() []ProcessMetrics {
pm.mutex.RLock()
defer pm.mutex.RUnlock()
var metrics []ProcessMetrics
for pid, state := range pm.monitoredPIDs {
if metric, err := pm.collectMetrics(pid, state); err == nil {
metrics = append(metrics, metric)
}
}
return metrics
}
// monitorLoop is the main monitoring loop
func (pm *ProcessMonitor) monitorLoop() {
ticker := time.NewTicker(pm.updateInterval)
defer ticker.Stop()
for {
select {
case <-pm.stopChan:
return
case <-ticker.C:
pm.collectAllMetrics()
}
}
}
func (pm *ProcessMonitor) collectAllMetrics() {
pm.mutex.RLock()
pidsToCheck := make([]int, 0, len(pm.monitoredPIDs))
states := make([]*processState, 0, len(pm.monitoredPIDs))
for pid, state := range pm.monitoredPIDs {
pidsToCheck = append(pidsToCheck, pid)
states = append(states, state)
}
pm.mutex.RUnlock()
deadPIDs := make([]int, 0)
for i, pid := range pidsToCheck {
if metric, err := pm.collectMetrics(pid, states[i]); err == nil {
select {
case pm.metricsChan <- metric:
default:
}
} else {
deadPIDs = append(deadPIDs, pid)
}
}
for _, pid := range deadPIDs {
pm.RemoveProcess(pid)
}
}
func (pm *ProcessMonitor) collectMetrics(pid int, state *processState) (ProcessMetrics, error) {
now := time.Now()
metric := ProcessMetrics{
PID: pid,
Timestamp: now,
ProcessName: state.name,
}
statPath := fmt.Sprintf("/proc/%d/stat", pid)
statData, err := os.ReadFile(statPath)
if err != nil {
return metric, fmt.Errorf("failed to read process statistics from /proc/%d/stat: %w", pid, err)
}
fields := strings.Fields(string(statData))
if len(fields) < 24 {
return metric, fmt.Errorf("invalid process stat format: expected at least 24 fields, got %d from /proc/%d/stat", len(fields), pid)
}
utime, _ := strconv.ParseInt(fields[13], 10, 64)
stime, _ := strconv.ParseInt(fields[14], 10, 64)
totalCPUTime := utime + stime
vsize, _ := strconv.ParseInt(fields[22], 10, 64)
rss, _ := strconv.ParseInt(fields[23], 10, 64)
metric.MemoryRSS = rss * int64(pageSize)
metric.MemoryVMS = vsize
// Calculate CPU percentage
metric.CPUPercent = pm.calculateCPUPercent(totalCPUTime, state, now)
// Increment warmup counter
if state.warmupSamples < maxWarmupSamples {
state.warmupSamples++
}
// Calculate memory percentage (RSS / total system memory)
if totalMem := pm.getTotalMemory(); totalMem > 0 {
metric.MemoryPercent = float64(metric.MemoryRSS) / float64(totalMem) * Config.PercentageMultiplier
}
// Update state for next calculation
state.lastCPUTime = totalCPUTime
state.lastUserTime = utime
state.lastSysTime = stime
state.lastSample = now
return metric, nil
}
// calculateCPUPercent calculates CPU percentage for a process with validation and bounds checking.
//
// Validation Rules:
// - Returns 0.0 for first sample (no baseline for comparison)
// - Requires positive time delta between samples
// - Applies CPU percentage bounds: [MinCPUPercent, MaxCPUPercent]
// - Uses system clock ticks for accurate CPU time conversion
// - Validates clock ticks within range [MinValidClockTicks, MaxValidClockTicks]
//
// Bounds Applied:
// - CPU percentage clamped to [0.01%, 100.0%] (default values)
// - Clock ticks validated within [50, 1000] range (default values)
// - Time delta must be > 0 to prevent division by zero
//
// Warmup Behavior:
// - During warmup period (< WarmupCPUSamples), returns MinCPUPercent for idle processes
// - This indicates process is alive but not consuming significant CPU
//
// The function ensures accurate CPU percentage calculation while preventing
// invalid measurements that could affect system monitoring and adaptive algorithms.
func (pm *ProcessMonitor) calculateCPUPercent(totalCPUTime int64, state *processState, now time.Time) float64 {
if state.lastSample.IsZero() {
// First sample - initialize baseline
state.warmupSamples = 0
return 0.0
}
timeDelta := now.Sub(state.lastSample).Seconds()
cpuDelta := float64(totalCPUTime - state.lastCPUTime)
if timeDelta <= 0 {
return 0.0
}
if cpuDelta > 0 {
// Convert from clock ticks to seconds using actual system clock ticks
clockTicks := pm.getClockTicks()
cpuSeconds := cpuDelta / clockTicks
cpuPercent := (cpuSeconds / timeDelta) * Config.PercentageMultiplier
// Apply bounds
if cpuPercent > maxCPUPercent {
cpuPercent = maxCPUPercent
}
if cpuPercent < minCPUPercent {
cpuPercent = minCPUPercent
}
return cpuPercent
}
// No CPU delta - process was idle
if state.warmupSamples < warmupCPUSamples {
// During warmup, provide a small non-zero value to indicate process is alive
return minCPUPercent
}
return 0.0
}
func (pm *ProcessMonitor) getClockTicks() float64 {
pm.clockTicksOnce.Do(func() {
// Try to detect actual clock ticks from kernel boot parameters or /proc/stat
if data, err := os.ReadFile("/proc/cmdline"); err == nil {
// Look for HZ parameter in kernel command line
cmdline := string(data)
if strings.Contains(cmdline, "HZ=") {
fields := strings.Fields(cmdline)
for _, field := range fields {
if strings.HasPrefix(field, "HZ=") {
if hz, err := strconv.ParseFloat(field[3:], 64); err == nil && hz > 0 {
pm.clockTicks = hz
return
}
}
}
}
}
// Try reading from /proc/timer_list for more accurate detection
if data, err := os.ReadFile("/proc/timer_list"); err == nil {
timer := string(data)
// Look for tick device frequency
lines := strings.Split(timer, "\n")
for _, line := range lines {
if strings.Contains(line, "tick_period:") {
fields := strings.Fields(line)
if len(fields) >= 2 {
if period, err := strconv.ParseInt(fields[1], 10, 64); err == nil && period > 0 {
// Convert nanoseconds to Hz
hz := Config.CGONanosecondsPerSecond / float64(period)
if hz >= minValidClockTicks && hz <= maxValidClockTicks {
pm.clockTicks = hz
return
}
}
}
}
}
}
// Fallback: Most embedded ARM systems (like jetKVM) use 250 Hz or 1000 Hz
// rather than the traditional 100 Hz
pm.clockTicks = defaultClockTicks
pm.logger.Warn().Float64("clock_ticks", pm.clockTicks).Msg("Using fallback clock ticks value")
// Log successful detection for non-fallback values
if pm.clockTicks != defaultClockTicks {
pm.logger.Info().Float64("clock_ticks", pm.clockTicks).Msg("Detected system clock ticks")
}
})
return pm.clockTicks
}
func (pm *ProcessMonitor) getTotalMemory() int64 {
pm.memoryOnce.Do(func() {
file, err := os.Open("/proc/meminfo")
if err != nil {
pm.totalMemory = int64(defaultMemoryGB) * int64(Config.ProcessMonitorKBToBytes) * int64(Config.ProcessMonitorKBToBytes) * int64(Config.ProcessMonitorKBToBytes)
return
}
defer file.Close()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "MemTotal:") {
fields := strings.Fields(line)
if len(fields) >= 2 {
if kb, err := strconv.ParseInt(fields[1], 10, 64); err == nil {
pm.totalMemory = kb * int64(Config.ProcessMonitorKBToBytes)
return
}
}
break
}
}
pm.totalMemory = int64(defaultMemoryGB) * int64(Config.ProcessMonitorKBToBytes) * int64(Config.ProcessMonitorKBToBytes) * int64(Config.ProcessMonitorKBToBytes) // Fallback
})
return pm.totalMemory
}
// GetTotalMemory returns total system memory in bytes (public method)
func (pm *ProcessMonitor) GetTotalMemory() int64 {
return pm.getTotalMemory()
}
// Global process monitor instance
var globalProcessMonitor *ProcessMonitor
var processMonitorOnce sync.Once
// GetProcessMonitor returns the global process monitor instance
func GetProcessMonitor() *ProcessMonitor {
processMonitorOnce.Do(func() {
globalProcessMonitor = NewProcessMonitor()
globalProcessMonitor.Start()
})
return globalProcessMonitor
}

View File

@ -49,7 +49,6 @@ func getOutputStreamingLogger() *zerolog.Logger {
// StartAudioOutputStreaming starts audio output streaming (capturing system audio)
func StartAudioOutputStreaming(send func([]byte)) error {
// Initialize audio monitoring (latency tracking and cache cleanup)
InitializeAudioMonitoring()
if !atomic.CompareAndSwapInt32(&outputStreamingRunning, 0, 1) {
return ErrAudioAlreadyRunning

View File

@ -213,7 +213,6 @@ func (s *AudioOutputSupervisor) startProcess() error {
s.logger.Info().Int("pid", s.processPID).Strs("args", args).Strs("opus_env", s.opusEnv).Msg("audio server process started")
// Add process to monitoring
s.processMonitor.AddProcess(s.processPID, "audio-output-server")
if s.onProcessStart != nil {
s.onProcessStart(s.processPID)

View File

@ -4,804 +4,138 @@
package audio
import (
"runtime"
"sort"
"sync"
"sync/atomic"
"time"
"unsafe"
)
// AudioLatencyInfo holds simplified latency information for cleanup decisions
type AudioLatencyInfo struct {
LatencyMs float64
Timestamp time.Time
}
// Global latency tracking
var (
currentAudioLatency = AudioLatencyInfo{}
currentAudioLatencyLock sync.RWMutex
audioMonitoringInitialized int32 // Atomic flag to track initialization
)
// InitializeAudioMonitoring starts the background goroutines for latency tracking and cache cleanup
// This is safe to call multiple times as it will only initialize once
func InitializeAudioMonitoring() {
// Use atomic CAS to ensure we only initialize once
if atomic.CompareAndSwapInt32(&audioMonitoringInitialized, 0, 1) {
// Start the latency recorder
startLatencyRecorder()
// Start the cleanup goroutine
startCleanupGoroutine()
}
}
// latencyChannel is used for non-blocking latency recording
var latencyChannel = make(chan float64, 10)
// startLatencyRecorder starts the latency recorder goroutine
// This should be called during package initialization
func startLatencyRecorder() {
go latencyRecorderLoop()
}
// latencyRecorderLoop processes latency recordings in the background
func latencyRecorderLoop() {
for latencyMs := range latencyChannel {
currentAudioLatencyLock.Lock()
currentAudioLatency = AudioLatencyInfo{
LatencyMs: latencyMs,
Timestamp: time.Now(),
}
currentAudioLatencyLock.Unlock()
}
}
// RecordAudioLatency records the current audio processing latency
// This is called from the audio input manager when latency is measured
// It is non-blocking to ensure zero overhead in the critical audio path
func RecordAudioLatency(latencyMs float64) {
// Non-blocking send - if channel is full, we drop the update
select {
case latencyChannel <- latencyMs:
// Successfully sent
default:
// Channel full, drop this update to avoid blocking the audio path
}
}
// GetAudioLatencyMetrics returns the current audio latency information
// Returns nil if no latency data is available or if it's too old
func GetAudioLatencyMetrics() *AudioLatencyInfo {
currentAudioLatencyLock.RLock()
defer currentAudioLatencyLock.RUnlock()
// Check if we have valid latency data
if currentAudioLatency.Timestamp.IsZero() {
return nil
}
// Check if the data is too old (more than 5 seconds)
if time.Since(currentAudioLatency.Timestamp) > 5*time.Second {
return nil
}
return &AudioLatencyInfo{
LatencyMs: currentAudioLatency.LatencyMs,
Timestamp: currentAudioLatency.Timestamp,
}
}
// Enhanced lock-free buffer cache for per-goroutine optimization
type lockFreeBufferCache struct {
buffers [8]*[]byte // Increased from 4 to 8 buffers per goroutine cache for better hit rates
}
// Buffer pool constants are now configured via Config
// See core_config_constants.go for default values
// TTL tracking for goroutine cache entries
type cacheEntry struct {
cache *lockFreeBufferCache
lastAccess int64 // Unix timestamp of last access
gid int64 // Goroutine ID for better tracking
}
// Per-goroutine buffer cache using goroutine-local storage
var goroutineBufferCache = make(map[int64]*lockFreeBufferCache)
var goroutineCacheMutex sync.RWMutex
var goroutineCacheWithTTL = make(map[int64]*cacheEntry)
var lastCleanupTime int64 // Unix timestamp of last cleanup
// getGoroutineID extracts goroutine ID from runtime stack for cache key
func getGoroutineID() int64 {
b := make([]byte, 64)
b = b[:runtime.Stack(b, false)]
// Parse "goroutine 123 [running]:" format
for i := 10; i < len(b); i++ {
if b[i] == ' ' {
id := int64(0)
for j := 10; j < i; j++ {
if b[j] >= '0' && b[j] <= '9' {
id = id*10 + int64(b[j]-'0')
}
}
return id
}
}
return 0
}
// Map of goroutine ID to cache entry with TTL tracking (declared above)
// cleanupChannel is used for asynchronous cleanup requests
var cleanupChannel = make(chan struct{}, 1)
// startCleanupGoroutine starts the cleanup goroutine
// This should be called during package initialization
func startCleanupGoroutine() {
go cleanupLoop()
}
// cleanupLoop processes cleanup requests in the background
func cleanupLoop() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-cleanupChannel:
// Received explicit cleanup request
performCleanup(true)
case <-ticker.C:
// Regular cleanup check
performCleanup(false)
}
}
}
// requestCleanup signals the cleanup goroutine to perform a cleanup
// This is non-blocking and can be called from the critical path
func requestCleanup() {
select {
case cleanupChannel <- struct{}{}:
// Successfully requested cleanup
default:
// Channel full, cleanup already pending
}
}
// performCleanup does the actual cache cleanup work
// This runs in a dedicated goroutine, not in the critical path
func performCleanup(forced bool) {
now := time.Now().Unix()
lastCleanup := atomic.LoadInt64(&lastCleanupTime)
// Check if we're in a high-latency situation
isHighLatency := false
latencyMetrics := GetAudioLatencyMetrics()
if latencyMetrics != nil && latencyMetrics.LatencyMs > 10.0 {
// Under high latency, be more aggressive with cleanup
isHighLatency = true
}
// Only cleanup if enough time has passed (less time if high latency) or if forced
interval := Config.BufferPoolCleanupInterval
if isHighLatency {
interval = Config.BufferPoolCleanupInterval / 2 // More frequent cleanup under high latency
}
if !forced && now-lastCleanup < interval {
return
}
// Try to acquire cleanup lock atomically
if !atomic.CompareAndSwapInt64(&lastCleanupTime, lastCleanup, now) {
return // Another goroutine is already cleaning up
}
// Perform the actual cleanup
doCleanupGoroutineCache()
}
// cleanupGoroutineCache triggers an asynchronous cleanup of the goroutine cache
// This is safe to call from the critical path as it's non-blocking
func cleanupGoroutineCache() {
// Request asynchronous cleanup
requestCleanup()
}
// The actual cleanup implementation that runs in the background goroutine
func doCleanupGoroutineCache() {
// Get current time for TTL calculations
now := time.Now().Unix()
// Check if we're in a high-latency situation
isHighLatency := false
latencyMetrics := GetAudioLatencyMetrics()
if latencyMetrics != nil && latencyMetrics.LatencyMs > 10.0 {
// Under high latency, be more aggressive with cleanup
isHighLatency = true
}
goroutineCacheMutex.Lock()
defer goroutineCacheMutex.Unlock()
// Convert old cache format to new TTL-based format if needed
if len(goroutineCacheWithTTL) == 0 && len(goroutineBufferCache) > 0 {
for gid, cache := range goroutineBufferCache {
goroutineCacheWithTTL[gid] = &cacheEntry{
cache: cache,
lastAccess: now,
gid: gid,
}
}
// Clear old cache to free memory
goroutineBufferCache = make(map[int64]*lockFreeBufferCache)
}
// Enhanced cleanup with size limits and better TTL management
entriesToRemove := make([]int64, 0)
ttl := Config.BufferPoolBufferTTL
if isHighLatency {
// Under high latency, use a much shorter TTL
ttl = Config.BufferPoolBufferTTL / 4
}
// Remove entries older than enhanced TTL
for gid, entry := range goroutineCacheWithTTL {
// Both now and entry.lastAccess are int64, so this comparison is safe
if now-entry.lastAccess > ttl {
entriesToRemove = append(entriesToRemove, gid)
}
}
// If we have too many cache entries, remove the oldest ones
if len(goroutineCacheWithTTL) > Config.BufferPoolMaxCacheEntries {
// Sort by last access time and remove oldest entries
type cacheEntryWithGID struct {
gid int64
lastAccess int64
}
entries := make([]cacheEntryWithGID, 0, len(goroutineCacheWithTTL))
for gid, entry := range goroutineCacheWithTTL {
entries = append(entries, cacheEntryWithGID{gid: gid, lastAccess: entry.lastAccess})
}
// Sort by last access time (oldest first)
sort.Slice(entries, func(i, j int) bool {
return entries[i].lastAccess < entries[j].lastAccess
})
// Mark oldest entries for removal
excessCount := len(goroutineCacheWithTTL) - Config.BufferPoolMaxCacheEntries
for i := 0; i < excessCount && i < len(entries); i++ {
entriesToRemove = append(entriesToRemove, entries[i].gid)
}
}
// If cache is still too large after TTL cleanup, remove oldest entries
// Under high latency, use a more aggressive target size
targetSize := Config.BufferPoolMaxCacheSize
targetReduction := Config.BufferPoolMaxCacheSize / 2
if isHighLatency {
// Under high latency, target a much smaller cache size
targetSize = Config.BufferPoolMaxCacheSize / 4
targetReduction = Config.BufferPoolMaxCacheSize / 8
}
if len(goroutineCacheWithTTL) > targetSize {
// Find oldest entries
type ageEntry struct {
gid int64
lastAccess int64
}
oldestEntries := make([]ageEntry, 0, len(goroutineCacheWithTTL))
for gid, entry := range goroutineCacheWithTTL {
oldestEntries = append(oldestEntries, ageEntry{gid, entry.lastAccess})
}
// Sort by lastAccess (oldest first)
sort.Slice(oldestEntries, func(i, j int) bool {
return oldestEntries[i].lastAccess < oldestEntries[j].lastAccess
})
// Remove oldest entries to get down to target reduction size
toRemove := len(goroutineCacheWithTTL) - targetReduction
for i := 0; i < toRemove && i < len(oldestEntries); i++ {
entriesToRemove = append(entriesToRemove, oldestEntries[i].gid)
}
}
// Remove marked entries and return their buffers to the pool
for _, gid := range entriesToRemove {
if entry, exists := goroutineCacheWithTTL[gid]; exists {
// Return buffers to main pool before removing entry
for i, buf := range entry.cache.buffers {
if buf != nil {
// Clear the buffer slot atomically
entry.cache.buffers[i] = nil
}
}
delete(goroutineCacheWithTTL, gid)
}
}
}
// AudioBufferPool provides a simple buffer pool for audio processing
type AudioBufferPool struct {
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
currentSize int64 // Current pool size (atomic)
// Atomic counters
hitCount int64 // Pool hit counter (atomic)
missCount int64 // Pool miss counter (atomic)
// Other fields
pool sync.Pool
// Pool configuration
bufferSize int
maxPoolSize int
mutex sync.RWMutex
// Memory optimization fields
preallocated []*[]byte // Pre-allocated buffers for immediate use
preallocSize int // Number of pre-allocated buffers
// Chunk-based allocation optimization
chunkSize int // Size of each memory chunk
chunks [][]byte // Pre-allocated memory chunks
chunkOffsets []int // Current offset in each chunk
chunkMutex sync.Mutex // Protects chunk allocation
pool chan []byte
maxSize int
}
// NewAudioBufferPool creates a new simple audio buffer pool
func NewAudioBufferPool(bufferSize int) *AudioBufferPool {
// Validate buffer size parameter
if err := ValidateBufferSize(bufferSize); err != nil {
// Use default value on validation error
bufferSize = Config.AudioFramePoolSize
maxSize := Config.MaxPoolSize
if maxSize <= 0 {
maxSize = Config.BufferPoolDefaultSize
}
// Enhanced preallocation strategy based on buffer size and system capacity
var preallocSize int
if bufferSize <= Config.AudioFramePoolSize {
// For smaller pools, use enhanced preallocation
preallocSize = Config.PreallocPercentage * 2
} else {
// For larger pools, use standard enhanced preallocation
preallocSize = (Config.PreallocPercentage * 3) / 2
}
// Ensure minimum preallocation for better performance
if preallocSize < Config.BufferPoolMinPreallocBuffers {
preallocSize = Config.BufferPoolMinPreallocBuffers
}
// Calculate max pool size based on buffer size to prevent memory bloat
maxPoolSize := Config.BufferPoolMaxPoolSize // Default
if bufferSize > 8192 {
maxPoolSize = Config.BufferPoolMaxPoolSize / 4 // Much smaller for very large buffers
} else if bufferSize > 4096 {
maxPoolSize = Config.BufferPoolMaxPoolSize / 2 // Smaller for large buffers
} else if bufferSize > 1024 {
maxPoolSize = (Config.BufferPoolMaxPoolSize * 3) / 4 // Medium for medium buffers
}
// Calculate chunk size - allocate larger chunks to reduce allocation frequency
chunkSize := bufferSize * Config.BufferPoolChunkBufferCount // Each chunk holds multiple buffers worth of memory
if chunkSize < Config.BufferPoolMinChunkSize {
chunkSize = Config.BufferPoolMinChunkSize // Minimum chunk size
}
p := &AudioBufferPool{
pool := &AudioBufferPool{
bufferSize: bufferSize,
maxPoolSize: maxPoolSize,
preallocated: make([]*[]byte, 0, preallocSize),
preallocSize: preallocSize,
chunkSize: chunkSize,
chunks: make([][]byte, 0, Config.BufferPoolInitialChunkCapacity), // Start with capacity for initial chunks
chunkOffsets: make([]int, 0, Config.BufferPoolInitialChunkCapacity),
pool: make(chan []byte, maxSize),
maxSize: maxSize,
}
// Configure sync.Pool with optimized allocation
p.pool.New = func() interface{} {
// Use chunk-based allocation instead of individual make()
buf := p.allocateFromChunk()
return &buf
// Pre-populate the pool
for i := 0; i < maxSize/2; i++ {
buf := make([]byte, bufferSize)
select {
case pool.pool <- buf:
default:
break
}
}
// Pre-allocate buffers with optimized capacity
for i := 0; i < preallocSize; i++ {
// Use chunk-based allocation to prevent over-allocation
buf := p.allocateFromChunk()
p.preallocated = append(p.preallocated, &buf)
}
return p
}
// allocateFromChunk allocates a buffer from pre-allocated memory chunks
func (p *AudioBufferPool) allocateFromChunk() []byte {
p.chunkMutex.Lock()
defer p.chunkMutex.Unlock()
// Try to allocate from existing chunks
for i := 0; i < len(p.chunks); i++ {
if p.chunkOffsets[i]+p.bufferSize <= len(p.chunks[i]) {
// Slice from the chunk
start := p.chunkOffsets[i]
end := start + p.bufferSize
buf := p.chunks[i][start:end:end] // Use 3-index slice to set capacity
p.chunkOffsets[i] = end
return buf[:0] // Return with zero length but correct capacity
}
}
// Need to allocate a new chunk
newChunk := make([]byte, p.chunkSize)
p.chunks = append(p.chunks, newChunk)
p.chunkOffsets = append(p.chunkOffsets, p.bufferSize)
// Return buffer from the new chunk
buf := newChunk[0:p.bufferSize:p.bufferSize]
return buf[:0] // Return with zero length but correct capacity
return pool
}
// Get retrieves a buffer from the pool
func (p *AudioBufferPool) Get() []byte {
// Skip cleanup trigger in hotpath - cleanup runs in background
// cleanupGoroutineCache() - moved to background goroutine
// Fast path: Try lock-free per-goroutine cache first
gid := getGoroutineID()
goroutineCacheMutex.RLock()
cacheEntry, exists := goroutineCacheWithTTL[gid]
goroutineCacheMutex.RUnlock()
if exists && cacheEntry != nil && cacheEntry.cache != nil {
// Try to get buffer from lock-free cache
cache := cacheEntry.cache
for i := 0; i < len(cache.buffers); i++ {
bufPtr := (*unsafe.Pointer)(unsafe.Pointer(&cache.buffers[i]))
buf := (*[]byte)(atomic.LoadPointer(bufPtr))
if buf != nil && atomic.CompareAndSwapPointer(bufPtr, unsafe.Pointer(buf), nil) {
// Direct hit count update to avoid sampling complexity in critical path
select {
case buf := <-p.pool:
atomic.AddInt64(&p.hitCount, 1)
*buf = (*buf)[:0]
return *buf
}
}
// Update access time only after cache miss to reduce overhead
cacheEntry.lastAccess = time.Now().Unix()
}
// Fallback: Try pre-allocated pool with mutex
p.mutex.Lock()
if len(p.preallocated) > 0 {
lastIdx := len(p.preallocated) - 1
buf := p.preallocated[lastIdx]
p.preallocated = p.preallocated[:lastIdx]
p.mutex.Unlock()
// Direct hit count update to avoid sampling complexity in critical path
atomic.AddInt64(&p.hitCount, 1)
*buf = (*buf)[:0]
return *buf
}
p.mutex.Unlock()
// Try sync.Pool next
if poolBuf := p.pool.Get(); poolBuf != nil {
buf := poolBuf.(*[]byte)
// Direct hit count update to avoid sampling complexity in critical path
atomic.AddInt64(&p.hitCount, 1)
atomic.AddInt64(&p.currentSize, -1)
// Fast capacity check - most buffers should be correct size
if cap(*buf) >= p.bufferSize {
*buf = (*buf)[:0]
return *buf
}
// Buffer too small, fall through to allocation
}
// Pool miss - allocate new buffer from chunk
// Direct miss count update to avoid sampling complexity in critical path
return buf[:0] // Reset length but keep capacity
default:
atomic.AddInt64(&p.missCount, 1)
return p.allocateFromChunk()
return make([]byte, 0, p.bufferSize)
}
}
// Put returns a buffer to the pool
func (p *AudioBufferPool) Put(buf []byte) {
// Fast validation - reject buffers that are too small or too large
bufCap := cap(buf)
if bufCap < p.bufferSize || bufCap > p.bufferSize*2 {
return // Buffer size mismatch, don't pool it to prevent memory bloat
if buf == nil || cap(buf) != p.bufferSize {
return // Invalid buffer
}
// Enhanced buffer clearing - only clear if buffer contains sensitive data
// For audio buffers, we can skip clearing for performance unless needed
// This reduces CPU overhead significantly
var resetBuf []byte
if cap(buf) > p.bufferSize {
// If capacity is larger than expected, create a new properly sized buffer
resetBuf = make([]byte, 0, p.bufferSize)
} else {
// Reset length but keep capacity for reuse efficiency
resetBuf = buf[:0]
}
// Reset the buffer
buf = buf[:0]
// Fast path: Try to put in lock-free per-goroutine cache
gid := getGoroutineID()
goroutineCacheMutex.RLock()
entryWithTTL, exists := goroutineCacheWithTTL[gid]
goroutineCacheMutex.RUnlock()
var cache *lockFreeBufferCache
if exists && entryWithTTL != nil {
cache = entryWithTTL.cache
// Update access time only when we successfully use the cache
} else {
// Create new cache for this goroutine
cache = &lockFreeBufferCache{}
now := time.Now().Unix()
goroutineCacheMutex.Lock()
goroutineCacheWithTTL[gid] = &cacheEntry{
cache: cache,
lastAccess: now,
gid: gid,
// Try to return to pool
select {
case p.pool <- buf:
// Successfully returned to pool
default:
// Pool is full, discard buffer
}
goroutineCacheMutex.Unlock()
}
if cache != nil {
// Try to store in lock-free cache
for i := 0; i < len(cache.buffers); i++ {
bufPtr := (*unsafe.Pointer)(unsafe.Pointer(&cache.buffers[i]))
if atomic.CompareAndSwapPointer(bufPtr, nil, unsafe.Pointer(&resetBuf)) {
// Update access time only on successful cache
if exists && entryWithTTL != nil {
entryWithTTL.lastAccess = time.Now().Unix()
}
return // Successfully cached
}
}
}
// Fallback: Try to return to pre-allocated pool for fastest reuse
p.mutex.Lock()
if len(p.preallocated) < p.preallocSize {
p.preallocated = append(p.preallocated, &resetBuf)
p.mutex.Unlock()
return
}
p.mutex.Unlock()
// Check sync.Pool size limit to prevent excessive memory usage
if atomic.LoadInt64(&p.currentSize) >= int64(p.maxPoolSize) {
return // Pool is full, let GC handle this buffer
}
// Return to sync.Pool and update counter atomically
p.pool.Put(&resetBuf)
atomic.AddInt64(&p.currentSize, 1)
}
// Enhanced global buffer pools for different audio frame types with improved sizing
var (
// Main audio frame pool with enhanced capacity
audioFramePool = NewAudioBufferPool(Config.AudioFramePoolSize)
// Control message pool with enhanced capacity for better throughput
audioControlPool = NewAudioBufferPool(Config.BufferPoolControlSize) // Control message buffer size
)
func GetAudioFrameBuffer() []byte {
return audioFramePool.Get()
}
func PutAudioFrameBuffer(buf []byte) {
audioFramePool.Put(buf)
}
func GetAudioControlBuffer() []byte {
return audioControlPool.Get()
}
func PutAudioControlBuffer(buf []byte) {
audioControlPool.Put(buf)
}
// GetPoolStats returns detailed statistics about this buffer pool
func (p *AudioBufferPool) GetPoolStats() AudioBufferPoolDetailedStats {
p.mutex.RLock()
preallocatedCount := len(p.preallocated)
currentSize := p.currentSize
p.mutex.RUnlock()
// GetStats returns pool statistics
func (p *AudioBufferPool) GetStats() AudioBufferPoolStats {
hitCount := atomic.LoadInt64(&p.hitCount)
missCount := atomic.LoadInt64(&p.missCount)
totalRequests := hitCount + missCount
var hitRate float64
if totalRequests > 0 {
hitRate = float64(hitCount) / float64(totalRequests) * Config.PercentageMultiplier
hitRate = float64(hitCount) / float64(totalRequests) * Config.BufferPoolHitRateBase
}
return AudioBufferPoolDetailedStats{
return AudioBufferPoolStats{
BufferSize: p.bufferSize,
MaxPoolSize: p.maxPoolSize,
CurrentPoolSize: currentSize,
PreallocatedCount: int64(preallocatedCount),
PreallocatedMax: int64(p.preallocSize),
MaxPoolSize: p.maxSize,
CurrentSize: int64(len(p.pool)),
HitCount: hitCount,
MissCount: missCount,
HitRate: hitRate,
}
}
// AudioBufferPoolDetailedStats provides detailed pool statistics
type AudioBufferPoolDetailedStats struct {
// AudioBufferPoolStats represents pool statistics
type AudioBufferPoolStats struct {
BufferSize int
MaxPoolSize int
CurrentPoolSize int64
PreallocatedCount int64
PreallocatedMax int64
CurrentSize int64
HitCount int64
MissCount int64
HitRate float64 // Percentage
TotalBytes int64 // Total memory usage in bytes
AverageBufferSize float64 // Average size of buffers in the pool
HitRate float64
}
// GetAudioBufferPoolStats returns statistics about the audio buffer pools
type AudioBufferPoolStats struct {
FramePoolSize int64
FramePoolMax int
ControlPoolSize int64
ControlPoolMax int
// Enhanced statistics
FramePoolHitRate float64
ControlPoolHitRate float64
FramePoolDetails AudioBufferPoolDetailedStats
ControlPoolDetails AudioBufferPoolDetailedStats
// Global buffer pools
var (
audioFramePool = NewAudioBufferPool(Config.AudioFramePoolSize)
audioControlPool = NewAudioBufferPool(Config.BufferPoolControlSize)
)
// GetAudioFrameBuffer gets a buffer for audio frames
func GetAudioFrameBuffer() []byte {
return audioFramePool.Get()
}
func GetAudioBufferPoolStats() AudioBufferPoolStats {
audioFramePool.mutex.RLock()
frameSize := audioFramePool.currentSize
frameMax := audioFramePool.maxPoolSize
audioFramePool.mutex.RUnlock()
audioControlPool.mutex.RLock()
controlSize := audioControlPool.currentSize
controlMax := audioControlPool.maxPoolSize
audioControlPool.mutex.RUnlock()
// Get detailed statistics
frameDetails := audioFramePool.GetPoolStats()
controlDetails := audioControlPool.GetPoolStats()
return AudioBufferPoolStats{
FramePoolSize: frameSize,
FramePoolMax: frameMax,
ControlPoolSize: controlSize,
ControlPoolMax: controlMax,
FramePoolHitRate: frameDetails.HitRate,
ControlPoolHitRate: controlDetails.HitRate,
FramePoolDetails: frameDetails,
ControlPoolDetails: controlDetails,
}
// PutAudioFrameBuffer returns a buffer to the frame pool
func PutAudioFrameBuffer(buf []byte) {
audioFramePool.Put(buf)
}
// AdaptiveResize dynamically adjusts pool parameters based on performance metrics
func (p *AudioBufferPool) AdaptiveResize() {
hitCount := atomic.LoadInt64(&p.hitCount)
missCount := atomic.LoadInt64(&p.missCount)
totalRequests := hitCount + missCount
if totalRequests < int64(Config.BufferPoolAdaptiveResizeThreshold) {
return // Not enough data for meaningful adaptation
}
hitRate := float64(hitCount) / float64(totalRequests)
currentSize := atomic.LoadInt64(&p.currentSize)
// If hit rate is low, consider increasing pool size
if hitRate < Config.BufferPoolCacheHitRateTarget && currentSize < int64(p.maxPoolSize) {
// Increase preallocation by 25% up to max pool size
newPreallocSize := int(float64(len(p.preallocated)) * 1.25)
if newPreallocSize > p.maxPoolSize {
newPreallocSize = p.maxPoolSize
}
// Preallocate additional buffers
for len(p.preallocated) < newPreallocSize {
buf := make([]byte, p.bufferSize)
p.preallocated = append(p.preallocated, &buf)
}
}
// If hit rate is very high and pool is large, consider shrinking
if hitRate > Config.BufferPoolHighHitRateThreshold && len(p.preallocated) > p.preallocSize {
// Reduce preallocation by 10% but not below original size
newSize := int(float64(len(p.preallocated)) * 0.9)
if newSize < p.preallocSize {
newSize = p.preallocSize
}
// Remove excess preallocated buffers
if newSize < len(p.preallocated) {
p.preallocated = p.preallocated[:newSize]
}
}
// GetAudioControlBuffer gets a buffer for control messages
func GetAudioControlBuffer() []byte {
return audioControlPool.Get()
}
// WarmupCache pre-populates goroutine-local caches for better initial performance
func (p *AudioBufferPool) WarmupCache() {
// Only warmup if we have sufficient request history
hitCount := atomic.LoadInt64(&p.hitCount)
missCount := atomic.LoadInt64(&p.missCount)
totalRequests := hitCount + missCount
if totalRequests < int64(Config.BufferPoolCacheWarmupThreshold) {
return
}
// Get or create cache for current goroutine
gid := getGoroutineID()
goroutineCacheMutex.RLock()
entryWithTTL, exists := goroutineCacheWithTTL[gid]
goroutineCacheMutex.RUnlock()
var cache *lockFreeBufferCache
if exists && entryWithTTL != nil {
cache = entryWithTTL.cache
} else {
// Create new cache for this goroutine
cache = &lockFreeBufferCache{}
now := time.Now().Unix()
goroutineCacheMutex.Lock()
goroutineCacheWithTTL[gid] = &cacheEntry{
cache: cache,
lastAccess: now,
gid: gid,
}
goroutineCacheMutex.Unlock()
}
if cache != nil {
// Fill cache to optimal level based on hit rate
hitRate := float64(hitCount) / float64(totalRequests)
optimalCacheSize := int(float64(Config.BufferPoolCacheSize) * hitRate)
if optimalCacheSize < 2 {
optimalCacheSize = 2
}
// Pre-allocate buffers for cache
for i := 0; i < optimalCacheSize && i < len(cache.buffers); i++ {
if cache.buffers[i] == nil {
// Get buffer from main pool
buf := p.Get()
if len(buf) > 0 {
cache.buffers[i] = &buf
}
}
}
}
// PutAudioControlBuffer returns a buffer to the control pool
func PutAudioControlBuffer(buf []byte) {
audioControlPool.Put(buf)
}
// OptimizeCache performs periodic cache optimization based on usage patterns
func (p *AudioBufferPool) OptimizeCache() {
hitCount := atomic.LoadInt64(&p.hitCount)
missCount := atomic.LoadInt64(&p.missCount)
totalRequests := hitCount + missCount
if totalRequests < int64(Config.BufferPoolOptimizeCacheThreshold) {
return
}
hitRate := float64(hitCount) / float64(totalRequests)
// If hit rate is below target, trigger cache warmup
if hitRate < Config.BufferPoolCacheHitRateTarget {
p.WarmupCache()
}
// Reset counters periodically to avoid overflow and get fresh metrics
if totalRequests > int64(Config.BufferPoolCounterResetThreshold) {
atomic.StoreInt64(&p.hitCount, hitCount/2)
atomic.StoreInt64(&p.missCount, missCount/2)
// GetAudioBufferPoolStats returns statistics for all pools
func GetAudioBufferPoolStats() map[string]AudioBufferPoolStats {
return map[string]AudioBufferPoolStats{
"frame_pool": audioFramePool.GetStats(),
"control_pool": audioControlPool.GetStats(),
}
}

View File

@ -98,7 +98,7 @@ type ZeroCopyFramePool struct {
// NewZeroCopyFramePool creates a new zero-copy frame pool
func NewZeroCopyFramePool(maxFrameSize int) *ZeroCopyFramePool {
// Pre-allocate frames for immediate availability
preallocSizeBytes := Config.PreallocSize
preallocSizeBytes := Config.ZeroCopyPreallocSizeBytes
maxPoolSize := Config.MaxPoolSize // Limit total pool size
// Calculate number of frames based on memory budget, not frame count
@ -106,8 +106,8 @@ func NewZeroCopyFramePool(maxFrameSize int) *ZeroCopyFramePool {
if preallocFrameCount > maxPoolSize {
preallocFrameCount = maxPoolSize
}
if preallocFrameCount < 1 {
preallocFrameCount = 1 // Always preallocate at least one frame
if preallocFrameCount < Config.ZeroCopyMinPreallocFrames {
preallocFrameCount = Config.ZeroCopyMinPreallocFrames
}
preallocated := make([]*ZeroCopyAudioFrame, 0, preallocFrameCount)

View File

@ -35,9 +35,6 @@ func startAudioSubprocess() error {
// Initialize validation cache for optimal performance
audio.InitValidationCache()
// Start goroutine monitoring to detect and prevent leaks
audio.StartGoroutineMonitoring()
// Enable batch audio processing to reduce CGO call overhead
if err := audio.EnableBatchAudioProcessing(); err != nil {
logger.Warn().Err(err).Msg("failed to enable batch audio processing")
@ -112,8 +109,6 @@ func startAudioSubprocess() error {
// Stop audio relay when process exits
audio.StopAudioRelay()
// Stop goroutine monitoring
audio.StopGoroutineMonitoring()
// Disable batch audio processing
audio.DisableBatchAudioProcessing()
},