Compare commits

..

20 Commits

Author SHA1 Message Date
Alex P 140a803ccf perf(audio): add ARM NEON SIMD optimizations for audio processing
Implement SIMD-optimized audio operations using ARM NEON for Cortex-A7 targets
Update Makefile and CI configuration to support NEON compilation flags
Add SIMD implementations for common audio operations including:
- Sample clearing and interleaving
- Volume scaling and format conversion
- Channel manipulation and balance adjustment
- Endianness swapping and prefetching
2025-09-16 18:18:19 +00:00
Alex P eca3c52513 PR Review Optimization: As recommended, use ternary operators instead of if/else for better readability 2025-09-16 16:17:56 +03:00
Alex P 55bcfb5a22 Consistency: keep if block multi-line 2025-09-16 16:08:16 +03:00
Alex P 0027001390 Cleanup: removed redundant code 2025-09-16 16:03:20 +03:00
Alex P caa0a60ebb Cleanup: removed redundant code 2025-09-16 16:00:55 +03:00
Alex P a5fb3bf30c Fix: remove misplaced const 2025-09-16 15:52:53 +03:00
Alex P 26e71806cb Cleanup, Optimizations: Small aaudio optimizations 2025-09-16 15:46:55 +03:00
Alex P 2f7bf55f22 Cleanup, Optimizations: Small aaudio optimizations 2025-09-16 15:45:03 +03:00
Alex P 8a3f1b6c32 Cleanup, Optimizations: Small aaudio optimizations 2025-09-16 15:37:23 +03:00
Alex P 7ffb9e1d59 Cleanup: removed redundant code, comments, etc. 2025-09-16 15:31:10 +03:00
Alex P 647eca4292 Cleanup: removed redundant code, comments, etc. 2025-09-16 15:23:16 +03:00
Alex P a8b58b5d34 [WIP] Cleanup: removed redundant code 2025-09-16 15:17:49 +03:00
Alex P b23cc50d6c [WIP] Cleanup: removed redundant code 2025-09-16 15:14:00 +03:00
Alex P 1f88dab95f [WIP] Maintainability Improvement: Add debug logging throughout the audio system for easy debugging and troubleshooting 2025-09-16 15:05:08 +03:00
Alex P 0944c886e5 [WIP] Maintainability Improvement: Add debug logging throughout the audio system for easy debugging and troubleshooting 2025-09-16 11:27:18 +03:00
Alex P 5e257b3144 [WIP] Add debug logging throughout the audio system 2025-09-16 11:26:48 +03:00
Alex P fb98c4edcb [WIP] Maintainability: Add debug / trace logs to make it easy to debug audio input issues 2025-09-16 11:11:18 +03:00
Alex P e894470ca8 [WIP] Cleanup: function naming 2025-09-16 07:33:34 +00:00
Alex P 996016b0da [WIP] Cleanup: remove unnecessary complexity 2025-09-15 23:00:03 +00:00
Alex P 7ab4a0e41d [WIP] Simplification: PR Simplification 2025-09-16 00:44:26 +03:00
31 changed files with 1159 additions and 3264 deletions

View File

@ -84,7 +84,10 @@ jobs:
version: v2.0.2 version: v2.0.2
env: env:
CGO_ENABLED: 1 CGO_ENABLED: 1
ALSA_VERSION: ${{ env.ALSA_VERSION }} GOOS: linux
OPUS_VERSION: ${{ env.OPUS_VERSION }} GOARCH: arm
CGO_CFLAGS: "-I${{ steps.build-env.outputs.cache_path }}/alsa-lib-${{ steps.build-env.outputs.alsa_version }}/include -I${{ steps.build-env.outputs.cache_path }}/opus-${{ steps.build-env.outputs.opus_version }}/include -I${{ steps.build-env.outputs.cache_path }}/opus-${{ steps.build-env.outputs.opus_version }}/celt" GOARM: 7
CC: ${{ steps.build-env.outputs.cache_path }}/../rv1106-system/tools/linux/toolchain/arm-rockchip830-linux-uclibcgnueabihf/bin/arm-rockchip830-linux-uclibcgnueabihf-gcc
PKG_CONFIG_PATH: ${{ steps.build-env.outputs.cache_path }}/alsa-lib-${{ steps.build-env.outputs.alsa_version }}/utils:${{ steps.build-env.outputs.cache_path }}/opus-${{ steps.build-env.outputs.opus_version }}
CGO_CFLAGS: "-O3 -mfpu=neon -mtune=cortex-a7 -mfloat-abi=hard -ftree-vectorize -ffast-math -funroll-loops -mvectorize-with-neon-quad -marm -D__ARM_NEON -I${{ steps.build-env.outputs.cache_path }}/alsa-lib-${{ steps.build-env.outputs.alsa_version }}/include -I${{ steps.build-env.outputs.cache_path }}/opus-${{ steps.build-env.outputs.opus_version }}/include -I${{ steps.build-env.outputs.cache_path }}/opus-${{ steps.build-env.outputs.opus_version }}/celt"
CGO_LDFLAGS: "-L${{ steps.build-env.outputs.cache_path }}/alsa-lib-${{ steps.build-env.outputs.alsa_version }}/src/.libs -lasound -L${{ steps.build-env.outputs.cache_path }}/opus-${{ steps.build-env.outputs.opus_version }}/.libs -lopus -lm -ldl -static" CGO_LDFLAGS: "-L${{ steps.build-env.outputs.cache_path }}/alsa-lib-${{ steps.build-env.outputs.alsa_version }}/src/.libs -lasound -L${{ steps.build-env.outputs.cache_path }}/opus-${{ steps.build-env.outputs.opus_version }}/.libs -lopus -lm -ldl -static"

1
.gitignore vendored
View File

@ -11,3 +11,4 @@ tmp/
device-tests.tar.gz device-tests.tar.gz
CLAUDE.md CLAUDE.md
.claude/

View File

@ -409,7 +409,7 @@ npm install
```bash ```bash
# Enable debug logging # Enable debug logging
export LOG_TRACE_SCOPES="jetkvm,cloud,websocket,native,jsonrpc" export LOG_TRACE_SCOPES="jetkvm,cloud,websocket,native,jsonrpc,audio"
# Frontend development # Frontend development
export JETKVM_PROXY_URL="ws://<IP>" export JETKVM_PROXY_URL="ws://<IP>"
@ -461,7 +461,7 @@ curl http://api:$JETKVM_PASSWORD@YOUR_DEVICE_IP/developer/pprof/
```bash ```bash
# Enable trace logging (useful for debugging) # Enable trace logging (useful for debugging)
export LOG_TRACE_SCOPES="jetkvm,cloud,websocket,native,jsonrpc" export LOG_TRACE_SCOPES="jetkvm,cloud,websocket,native,jsonrpc,audio"
# For frontend development # For frontend development
export JETKVM_PROXY_URL="ws://<JETKVM_IP>" export JETKVM_PROXY_URL="ws://<JETKVM_IP>"

View File

@ -36,8 +36,8 @@ export PKG_CONFIG_PATH := $(AUDIO_LIBS_DIR)/alsa-lib-$(ALSA_VERSION)/utils:$(AUD
# Common command to clean Go cache with verbose output for all Go builds # Common command to clean Go cache with verbose output for all Go builds
CLEAN_GO_CACHE := @echo "Cleaning Go cache..."; go clean -cache -v CLEAN_GO_CACHE := @echo "Cleaning Go cache..."; go clean -cache -v
# Optimization flags for ARM Cortex-A7 with NEON # Optimization flags for ARM Cortex-A7 with NEON SIMD
OPTIM_CFLAGS := -O3 -mfpu=neon -mtune=cortex-a7 -mfloat-abi=hard -ftree-vectorize -ffast-math -funroll-loops OPTIM_CFLAGS := -O3 -mfpu=neon -mtune=cortex-a7 -mfloat-abi=hard -ftree-vectorize -ffast-math -funroll-loops -mvectorize-with-neon-quad -marm -D__ARM_NEON
# Cross-compilation environment for ARM - exported globally # Cross-compilation environment for ARM - exported globally
export GOOS := linux export GOOS := linux

View File

@ -41,7 +41,7 @@ REMOTE_USER="root"
REMOTE_PATH="/userdata/jetkvm/bin" REMOTE_PATH="/userdata/jetkvm/bin"
SKIP_UI_BUILD=false SKIP_UI_BUILD=false
RESET_USB_HID_DEVICE=false RESET_USB_HID_DEVICE=false
LOG_TRACE_SCOPES="${LOG_TRACE_SCOPES:-jetkvm,cloud,websocket,native,jsonrpc}" LOG_TRACE_SCOPES="${LOG_TRACE_SCOPES:-jetkvm,cloud,websocket,native,jsonrpc,audio}"
RUN_GO_TESTS=false RUN_GO_TESTS=false
RUN_GO_TESTS_ONLY=false RUN_GO_TESTS_ONLY=false
INSTALL_APP=false INSTALL_APP=false

View File

@ -1,447 +0,0 @@
package audio
import (
"context"
"math"
"sync"
"sync/atomic"
"time"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
// AdaptiveBufferConfig holds configuration for the adaptive buffer sizing algorithm.
//
// The adaptive buffer system dynamically adjusts audio buffer sizes based on real-time
// system conditions to optimize the trade-off between latency and stability. The algorithm
// uses multiple factors to make decisions:
//
// 1. System Load Monitoring:
// - CPU usage: High CPU load increases buffer sizes to prevent underruns
// - Memory usage: High memory pressure reduces buffer sizes to conserve RAM
//
// 2. Latency Tracking:
// - Target latency: Optimal latency for the current quality setting
// - Max latency: Hard limit beyond which buffers are aggressively reduced
//
// 3. Adaptation Strategy:
// - Exponential smoothing: Prevents oscillation and provides stable adjustments
// - Discrete steps: Buffer sizes change in fixed increments to avoid instability
// - Hysteresis: Different thresholds for increasing vs decreasing buffer sizes
//
// The algorithm is specifically tuned for embedded ARM systems with limited resources,
// prioritizing stability over absolute minimum latency.
type AdaptiveBufferConfig struct {
// Buffer size limits (in frames)
MinBufferSize int
MaxBufferSize int
DefaultBufferSize int
// System load thresholds
LowCPUThreshold float64 // Below this, increase buffer size
HighCPUThreshold float64 // Above this, decrease buffer size
LowMemoryThreshold float64 // Below this, increase buffer size
HighMemoryThreshold float64 // Above this, decrease buffer size
// Latency thresholds (in milliseconds)
TargetLatency time.Duration
MaxLatency time.Duration
// Adaptation parameters
AdaptationInterval time.Duration
SmoothingFactor float64 // 0.0-1.0, higher = more responsive
}
// DefaultAdaptiveBufferConfig returns optimized config for JetKVM hardware
func DefaultAdaptiveBufferConfig() AdaptiveBufferConfig {
return AdaptiveBufferConfig{
// Conservative buffer sizes for 256MB RAM constraint
MinBufferSize: Config.AdaptiveMinBufferSize,
MaxBufferSize: Config.AdaptiveMaxBufferSize,
DefaultBufferSize: Config.AdaptiveDefaultBufferSize,
// CPU thresholds optimized for single-core ARM Cortex A7 under load
LowCPUThreshold: Config.LowCPUThreshold * 100, // Below 20% CPU
HighCPUThreshold: Config.HighCPUThreshold * 100, // Above 60% CPU (lowered to be more responsive)
// Memory thresholds for 256MB total RAM
LowMemoryThreshold: Config.LowMemoryThreshold * 100, // Below 35% memory usage
HighMemoryThreshold: Config.HighMemoryThreshold * 100, // Above 75% memory usage (lowered for earlier response)
// Latency targets
TargetLatency: Config.AdaptiveBufferTargetLatency, // Target 20ms latency
MaxLatency: Config.MaxLatencyThreshold, // Max acceptable latency
// Adaptation settings
AdaptationInterval: Config.BufferUpdateInterval, // Check every 500ms
SmoothingFactor: Config.SmoothingFactor, // Moderate responsiveness
}
}
// AdaptiveBufferManager manages dynamic buffer sizing based on system conditions
type AdaptiveBufferManager struct {
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
currentInputBufferSize int64 // Current input buffer size (atomic)
currentOutputBufferSize int64 // Current output buffer size (atomic)
averageLatency int64 // Average latency in nanoseconds (atomic)
systemCPUPercent int64 // System CPU percentage * 100 (atomic)
systemMemoryPercent int64 // System memory percentage * 100 (atomic)
adaptationCount int64 // Metrics tracking (atomic)
config AdaptiveBufferConfig
logger zerolog.Logger
// Control channels
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
// Metrics tracking
lastAdaptation time.Time
mutex sync.RWMutex
}
// NewAdaptiveBufferManager creates a new adaptive buffer manager
func NewAdaptiveBufferManager(config AdaptiveBufferConfig) *AdaptiveBufferManager {
logger := logging.GetDefaultLogger().With().Str("component", "adaptive-buffer").Logger()
if err := ValidateAdaptiveBufferConfig(config.MinBufferSize, config.MaxBufferSize, config.DefaultBufferSize); err != nil {
logger.Warn().Err(err).Msg("invalid adaptive buffer config, using defaults")
config = DefaultAdaptiveBufferConfig()
}
ctx, cancel := context.WithCancel(context.Background())
return &AdaptiveBufferManager{
currentInputBufferSize: int64(config.DefaultBufferSize),
currentOutputBufferSize: int64(config.DefaultBufferSize),
config: config,
logger: logger,
ctx: ctx,
cancel: cancel,
lastAdaptation: time.Now(),
}
}
// Start begins the adaptive buffer management
func (abm *AdaptiveBufferManager) Start() {
abm.wg.Add(1)
go abm.adaptationLoop()
abm.logger.Info().Msg("adaptive buffer manager started")
}
// Stop stops the adaptive buffer management
func (abm *AdaptiveBufferManager) Stop() {
abm.cancel()
abm.wg.Wait()
abm.logger.Info().Msg("adaptive buffer manager stopped")
}
// GetInputBufferSize returns the current recommended input buffer size
func (abm *AdaptiveBufferManager) GetInputBufferSize() int {
return int(atomic.LoadInt64(&abm.currentInputBufferSize))
}
// GetOutputBufferSize returns the current recommended output buffer size
func (abm *AdaptiveBufferManager) GetOutputBufferSize() int {
return int(atomic.LoadInt64(&abm.currentOutputBufferSize))
}
// UpdateLatency updates the current latency measurement
func (abm *AdaptiveBufferManager) UpdateLatency(latency time.Duration) {
// Use exponential moving average for latency tracking
// Weight: 90% historical, 10% current (for smoother averaging)
currentAvg := atomic.LoadInt64(&abm.averageLatency)
newLatencyNs := latency.Nanoseconds()
if currentAvg == 0 {
// First measurement
atomic.StoreInt64(&abm.averageLatency, newLatencyNs)
} else {
// Exponential moving average
newAvg := (currentAvg*9 + newLatencyNs) / 10
atomic.StoreInt64(&abm.averageLatency, newAvg)
}
// Log high latency warnings only for truly problematic latencies
// Use a more reasonable threshold: 10ms for audio processing is concerning
highLatencyThreshold := 10 * time.Millisecond
if latency > highLatencyThreshold {
abm.logger.Debug().
Dur("latency_ms", latency/time.Millisecond).
Dur("threshold_ms", highLatencyThreshold/time.Millisecond).
Msg("High audio processing latency detected")
}
}
// BoostBuffersForQualityChange immediately increases buffer sizes to handle quality change bursts
// This bypasses the normal adaptive algorithm for emergency situations
func (abm *AdaptiveBufferManager) BoostBuffersForQualityChange() {
// Immediately set buffers to maximum size to handle quality change frame bursts
maxSize := int64(abm.config.MaxBufferSize)
atomic.StoreInt64(&abm.currentInputBufferSize, maxSize)
atomic.StoreInt64(&abm.currentOutputBufferSize, maxSize)
abm.logger.Info().
Int("buffer_size", int(maxSize)).
Msg("Boosted buffers to maximum size for quality change")
}
// adaptationLoop is the main loop that adjusts buffer sizes
func (abm *AdaptiveBufferManager) adaptationLoop() {
defer abm.wg.Done()
ticker := time.NewTicker(abm.config.AdaptationInterval)
defer ticker.Stop()
for {
select {
case <-abm.ctx.Done():
return
case <-ticker.C:
abm.adaptBufferSizes()
}
}
}
// adaptBufferSizes analyzes system conditions and adjusts buffer sizes
// adaptBufferSizes implements the core adaptive buffer sizing algorithm.
//
// This function uses a multi-factor approach to determine optimal buffer sizes:
//
// Mathematical Model:
// 1. Factor Calculation:
//
// - CPU Factor: Sigmoid function that increases buffer size under high CPU load
//
// - Memory Factor: Inverse relationship that decreases buffer size under memory pressure
//
// - Latency Factor: Exponential decay that aggressively reduces buffers when latency exceeds targets
//
// 2. Combined Factor:
// Combined = (CPU_factor * Memory_factor * Latency_factor)
// This multiplicative approach ensures any single critical factor can override others
//
// 3. Exponential Smoothing:
// New_size = Current_size + smoothing_factor * (Target_size - Current_size)
// This prevents rapid oscillations and provides stable convergence
//
// 4. Discrete Quantization:
// Final sizes are rounded to frame boundaries and clamped to configured limits
//
// The algorithm runs periodically and only applies changes when the adaptation interval
// has elapsed, preventing excessive adjustments that could destabilize the audio pipeline.
func (abm *AdaptiveBufferManager) adaptBufferSizes() {
// Use fixed system metrics for stability
systemCPU := 50.0 // Assume moderate CPU usage
systemMemory := 60.0 // Assume moderate memory usage
atomic.StoreInt64(&abm.systemCPUPercent, int64(systemCPU*100))
atomic.StoreInt64(&abm.systemMemoryPercent, int64(systemMemory*100))
// Get current latency
currentLatencyNs := atomic.LoadInt64(&abm.averageLatency)
currentLatency := time.Duration(currentLatencyNs)
// Calculate adaptation factors
cpuFactor := abm.calculateCPUFactor(systemCPU)
memoryFactor := abm.calculateMemoryFactor(systemMemory)
latencyFactor := abm.calculateLatencyFactor(currentLatency)
// Combine factors with weights (CPU has highest priority for KVM coexistence)
combinedFactor := Config.CPUMemoryWeight*cpuFactor + Config.MemoryWeight*memoryFactor + Config.LatencyWeight*latencyFactor
// Apply adaptation with smoothing
currentInput := float64(atomic.LoadInt64(&abm.currentInputBufferSize))
currentOutput := float64(atomic.LoadInt64(&abm.currentOutputBufferSize))
// Calculate new buffer sizes
newInputSize := abm.applyAdaptation(currentInput, combinedFactor)
newOutputSize := abm.applyAdaptation(currentOutput, combinedFactor)
// Update buffer sizes if they changed significantly
adjustmentMade := false
if math.Abs(newInputSize-currentInput) >= 0.5 || math.Abs(newOutputSize-currentOutput) >= 0.5 {
atomic.StoreInt64(&abm.currentInputBufferSize, int64(math.Round(newInputSize)))
atomic.StoreInt64(&abm.currentOutputBufferSize, int64(math.Round(newOutputSize)))
atomic.AddInt64(&abm.adaptationCount, 1)
abm.mutex.Lock()
abm.lastAdaptation = time.Now()
abm.mutex.Unlock()
adjustmentMade = true
abm.logger.Debug().
Float64("cpu_percent", systemCPU).
Float64("memory_percent", systemMemory).
Dur("latency", currentLatency).
Float64("combined_factor", combinedFactor).
Int("new_input_size", int(newInputSize)).
Int("new_output_size", int(newOutputSize)).
Msg("Adapted buffer sizes")
}
// Update metrics with current state
currentInputSize := int(atomic.LoadInt64(&abm.currentInputBufferSize))
currentOutputSize := int(atomic.LoadInt64(&abm.currentOutputBufferSize))
UpdateAdaptiveBufferMetrics(currentInputSize, currentOutputSize, systemCPU, systemMemory, adjustmentMade)
}
// calculateCPUFactor returns adaptation factor based on CPU usage with threshold validation.
//
// Validation Rules:
// - CPU percentage must be within valid range [0.0, 100.0]
// - Uses LowCPUThreshold and HighCPUThreshold from config for decision boundaries
// - Default thresholds: Low=20.0%, High=80.0%
//
// Adaptation Logic:
// - CPU > HighCPUThreshold: Return -1.0 (decrease buffers to reduce CPU load)
// - CPU < LowCPUThreshold: Return +1.0 (increase buffers for better quality)
// - Between thresholds: Linear interpolation based on distance from midpoint
//
// Returns: Adaptation factor in range [-1.0, +1.0]
// - Negative values: Decrease buffer sizes to reduce CPU usage
// - Positive values: Increase buffer sizes for better audio quality
// - Zero: No adaptation needed
//
// The function ensures CPU-aware buffer management to balance audio quality
// with system performance, preventing CPU starvation of the KVM process.
func (abm *AdaptiveBufferManager) calculateCPUFactor(cpuPercent float64) float64 {
if cpuPercent > abm.config.HighCPUThreshold {
// High CPU: decrease buffers to reduce latency and give CPU to KVM
return -1.0
} else if cpuPercent < abm.config.LowCPUThreshold {
// Low CPU: increase buffers for better quality
return 1.0
}
// Medium CPU: linear interpolation
midpoint := (abm.config.HighCPUThreshold + abm.config.LowCPUThreshold) / 2
return (midpoint - cpuPercent) / (midpoint - abm.config.LowCPUThreshold)
}
// calculateMemoryFactor returns adaptation factor based on memory usage with threshold validation.
//
// Validation Rules:
// - Memory percentage must be within valid range [0.0, 100.0]
// - Uses LowMemoryThreshold and HighMemoryThreshold from config for decision boundaries
// - Default thresholds: Low=30.0%, High=85.0%
//
// Adaptation Logic:
// - Memory > HighMemoryThreshold: Return -1.0 (decrease buffers to free memory)
// - Memory < LowMemoryThreshold: Return +1.0 (increase buffers for performance)
// - Between thresholds: Linear interpolation based on distance from midpoint
//
// Returns: Adaptation factor in range [-1.0, +1.0]
// - Negative values: Decrease buffer sizes to reduce memory usage
// - Positive values: Increase buffer sizes for better performance
// - Zero: No adaptation needed
//
// The function prevents memory exhaustion while optimizing buffer sizes
// for audio processing performance and system stability.
func (abm *AdaptiveBufferManager) calculateMemoryFactor(memoryPercent float64) float64 {
if memoryPercent > abm.config.HighMemoryThreshold {
// High memory: decrease buffers to free memory
return -1.0
} else if memoryPercent < abm.config.LowMemoryThreshold {
// Low memory: increase buffers for better performance
return 1.0
}
// Medium memory: linear interpolation
midpoint := (abm.config.HighMemoryThreshold + abm.config.LowMemoryThreshold) / 2
return (midpoint - memoryPercent) / (midpoint - abm.config.LowMemoryThreshold)
}
// calculateLatencyFactor returns adaptation factor based on latency with threshold validation.
//
// Validation Rules:
// - Latency must be non-negative duration
// - Uses TargetLatency and MaxLatency from config for decision boundaries
// - Default thresholds: Target=50ms, Max=200ms
//
// Adaptation Logic:
// - Latency > MaxLatency: Return -1.0 (decrease buffers to reduce latency)
// - Latency < TargetLatency: Return +1.0 (increase buffers for quality)
// - Between thresholds: Linear interpolation based on distance from midpoint
//
// Returns: Adaptation factor in range [-1.0, +1.0]
// - Negative values: Decrease buffer sizes to reduce audio latency
// - Positive values: Increase buffer sizes for better audio quality
// - Zero: Latency is at optimal level
//
// The function balances audio latency with quality, ensuring real-time
// performance while maintaining acceptable audio processing quality.
func (abm *AdaptiveBufferManager) calculateLatencyFactor(latency time.Duration) float64 {
if latency > abm.config.MaxLatency {
// High latency: decrease buffers
return -1.0
} else if latency < abm.config.TargetLatency {
// Low latency: can increase buffers
return 1.0
}
// Medium latency: linear interpolation
midLatency := (abm.config.MaxLatency + abm.config.TargetLatency) / 2
return float64(midLatency-latency) / float64(midLatency-abm.config.TargetLatency)
}
// applyAdaptation applies the adaptation factor to current buffer size
func (abm *AdaptiveBufferManager) applyAdaptation(currentSize, factor float64) float64 {
// Calculate target size based on factor
var targetSize float64
if factor > 0 {
// Increase towards max
targetSize = currentSize + factor*(float64(abm.config.MaxBufferSize)-currentSize)
} else {
// Decrease towards min
targetSize = currentSize + factor*(currentSize-float64(abm.config.MinBufferSize))
}
// Apply smoothing
newSize := currentSize + abm.config.SmoothingFactor*(targetSize-currentSize)
// Clamp to valid range
return math.Max(float64(abm.config.MinBufferSize),
math.Min(float64(abm.config.MaxBufferSize), newSize))
}
// GetStats returns current adaptation statistics
func (abm *AdaptiveBufferManager) GetStats() map[string]interface{} {
abm.mutex.RLock()
lastAdaptation := abm.lastAdaptation
abm.mutex.RUnlock()
return map[string]interface{}{
"input_buffer_size": abm.GetInputBufferSize(),
"output_buffer_size": abm.GetOutputBufferSize(),
"average_latency_ms": float64(atomic.LoadInt64(&abm.averageLatency)) / 1e6,
"system_cpu_percent": float64(atomic.LoadInt64(&abm.systemCPUPercent)) / Config.PercentageMultiplier,
"system_memory_percent": float64(atomic.LoadInt64(&abm.systemMemoryPercent)) / Config.PercentageMultiplier,
"adaptation_count": atomic.LoadInt64(&abm.adaptationCount),
"last_adaptation": lastAdaptation,
}
}
// Global adaptive buffer manager instance
var globalAdaptiveBufferManager *AdaptiveBufferManager
var adaptiveBufferOnce sync.Once
// GetAdaptiveBufferManager returns the global adaptive buffer manager instance
func GetAdaptiveBufferManager() *AdaptiveBufferManager {
adaptiveBufferOnce.Do(func() {
globalAdaptiveBufferManager = NewAdaptiveBufferManager(DefaultAdaptiveBufferConfig())
})
return globalAdaptiveBufferManager
}
// StartAdaptiveBuffering starts the global adaptive buffer manager
func StartAdaptiveBuffering() {
GetAdaptiveBufferManager().Start()
}
// StopAdaptiveBuffering stops the global adaptive buffer manager
func StopAdaptiveBuffering() {
if globalAdaptiveBufferManager != nil {
globalAdaptiveBufferManager.Stop()
}
}

View File

@ -1,626 +0,0 @@
//go:build cgo
package audio
import (
"context"
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
// BatchAudioProcessor manages batched CGO operations to reduce syscall overhead
type BatchAudioProcessor struct {
// Statistics - MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
stats BatchAudioStats
// Control
ctx context.Context
cancel context.CancelFunc
logger *zerolog.Logger
batchSize int
batchDuration time.Duration
// Batch queues and state (atomic for lock-free access)
readQueue chan batchReadRequest
writeQueue chan batchWriteRequest
initialized int32
running int32
threadPinned int32
writePinned int32
// Buffers (pre-allocated to avoid allocation overhead)
readBufPool *sync.Pool
writeBufPool *sync.Pool
}
type BatchAudioStats struct {
// int64 fields MUST be first for ARM32 alignment
BatchedReads int64
SingleReads int64
BatchedWrites int64
SingleWrites int64
BatchedFrames int64
SingleFrames int64
WriteFrames int64
CGOCallsReduced int64
OSThreadPinTime time.Duration // time.Duration is int64 internally
WriteThreadTime time.Duration // time.Duration is int64 internally
LastBatchTime time.Time
LastWriteTime time.Time
}
type batchReadRequest struct {
buffer []byte
resultChan chan batchReadResult
timestamp time.Time
}
type batchReadResult struct {
length int
err error
}
type batchWriteRequest struct {
buffer []byte // Buffer for backward compatibility
opusData []byte // Opus encoded data for decode-write operations
pcmBuffer []byte // PCM buffer for decode-write operations
resultChan chan batchWriteResult
timestamp time.Time
}
type batchWriteResult struct {
length int
err error
}
// NewBatchAudioProcessor creates a new batch audio processor
func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAudioProcessor {
// Validate input parameters with minimal overhead
if batchSize <= 0 || batchSize > 1000 {
batchSize = Config.BatchProcessorFramesPerBatch
}
if batchDuration <= 0 {
batchDuration = Config.BatchProcessingDelay
}
// Use optimized queue sizes from configuration
queueSize := Config.BatchProcessorMaxQueueSize
if queueSize <= 0 {
queueSize = batchSize * 2 // Fallback to double batch size
}
ctx, cancel := context.WithCancel(context.Background())
// Pre-allocate logger to avoid repeated allocations
logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger()
frameSize := Config.MinReadEncodeBuffer
if frameSize == 0 {
frameSize = 1500 // Safe fallback
}
processor := &BatchAudioProcessor{
ctx: ctx,
cancel: cancel,
logger: &logger,
batchSize: batchSize,
batchDuration: batchDuration,
readQueue: make(chan batchReadRequest, queueSize),
writeQueue: make(chan batchWriteRequest, queueSize),
readBufPool: &sync.Pool{
New: func() interface{} {
return make([]byte, 0, frameSize)
},
},
writeBufPool: &sync.Pool{
New: func() interface{} {
return make([]byte, 0, frameSize)
},
},
}
return processor
}
// Start initializes and starts the batch processor
func (bap *BatchAudioProcessor) Start() error {
if !atomic.CompareAndSwapInt32(&bap.running, 0, 1) {
return nil // Already running
}
// Initialize CGO resources once per processor lifecycle
if !atomic.CompareAndSwapInt32(&bap.initialized, 0, 1) {
return nil // Already initialized
}
// Start batch processing goroutines
go bap.batchReadProcessor()
go bap.batchWriteProcessor()
bap.logger.Info().Int("batch_size", bap.batchSize).
Dur("batch_duration", bap.batchDuration).
Msg("batch audio processor started")
return nil
}
// Stop cleanly shuts down the batch processor
func (bap *BatchAudioProcessor) Stop() {
if !atomic.CompareAndSwapInt32(&bap.running, 1, 0) {
return // Already stopped
}
bap.cancel()
// Wait for processing to complete
time.Sleep(bap.batchDuration + Config.BatchProcessingDelay)
bap.logger.Info().Msg("batch audio processor stopped")
}
// BatchReadEncode performs batched audio read and encode operations
func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) {
// Validate buffer before processing
if err := ValidateBufferSize(len(buffer)); err != nil {
// Only log validation errors in debug mode to reduce overhead
if bap.logger.GetLevel() <= zerolog.DebugLevel {
bap.logger.Debug().Err(err).Msg("invalid buffer for batch processing")
}
return 0, err
}
if !bap.IsRunning() {
// Fallback to single operation if batch processor is not running
// Use sampling to reduce atomic operations overhead
if atomic.LoadInt64(&bap.stats.SingleReads)%10 == 0 {
atomic.AddInt64(&bap.stats.SingleReads, 10)
atomic.AddInt64(&bap.stats.SingleFrames, 10)
}
return CGOAudioReadEncode(buffer)
}
resultChan := make(chan batchReadResult, 1)
request := batchReadRequest{
buffer: buffer,
resultChan: resultChan,
timestamp: time.Now(),
}
// Try to queue the request with non-blocking send
select {
case bap.readQueue <- request:
// Successfully queued
default:
// Queue is full, fallback to single operation
// Use sampling to reduce atomic operations overhead
if atomic.LoadInt64(&bap.stats.SingleReads)%10 == 0 {
atomic.AddInt64(&bap.stats.SingleReads, 10)
atomic.AddInt64(&bap.stats.SingleFrames, 10)
}
return CGOAudioReadEncode(buffer)
}
// Wait for result with timeout
select {
case result := <-resultChan:
return result.length, result.err
case <-time.After(Config.BatchProcessorTimeout):
// Timeout, fallback to single operation
// Use sampling to reduce atomic operations overhead
if atomic.LoadInt64(&bap.stats.SingleReads)%10 == 0 {
atomic.AddInt64(&bap.stats.SingleReads, 10)
atomic.AddInt64(&bap.stats.SingleFrames, 10)
}
return CGOAudioReadEncode(buffer)
}
}
// BatchDecodeWrite performs batched audio decode and write operations
// This is the legacy version that uses a single buffer
func (bap *BatchAudioProcessor) BatchDecodeWrite(buffer []byte) (int, error) {
// Validate buffer before processing
if err := ValidateBufferSize(len(buffer)); err != nil {
// Only log validation errors in debug mode to reduce overhead
if bap.logger.GetLevel() <= zerolog.DebugLevel {
bap.logger.Debug().Err(err).Msg("invalid buffer for batch processing")
}
return 0, err
}
if !bap.IsRunning() {
// Fallback to single operation if batch processor is not running
// Use sampling to reduce atomic operations overhead
if atomic.LoadInt64(&bap.stats.SingleWrites)%10 == 0 {
atomic.AddInt64(&bap.stats.SingleWrites, 10)
atomic.AddInt64(&bap.stats.WriteFrames, 10)
}
return CGOAudioDecodeWriteLegacy(buffer)
}
resultChan := make(chan batchWriteResult, 1)
request := batchWriteRequest{
buffer: buffer,
resultChan: resultChan,
timestamp: time.Now(),
}
// Try to queue the request with non-blocking send
select {
case bap.writeQueue <- request:
// Successfully queued
default:
// Queue is full, fall back to single operation
// Use sampling to reduce atomic operations overhead
if atomic.LoadInt64(&bap.stats.SingleWrites)%10 == 0 {
atomic.AddInt64(&bap.stats.SingleWrites, 10)
atomic.AddInt64(&bap.stats.WriteFrames, 10)
}
return CGOAudioDecodeWriteLegacy(buffer)
}
// Wait for result with timeout
select {
case result := <-resultChan:
return result.length, result.err
case <-time.After(Config.BatchProcessorTimeout):
// Use sampling to reduce atomic operations overhead
if atomic.LoadInt64(&bap.stats.SingleWrites)%10 == 0 {
atomic.AddInt64(&bap.stats.SingleWrites, 10)
atomic.AddInt64(&bap.stats.WriteFrames, 10)
}
return CGOAudioDecodeWriteLegacy(buffer)
}
}
// BatchDecodeWriteWithBuffers performs batched audio decode and write operations with separate opus and PCM buffers
func (bap *BatchAudioProcessor) BatchDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, error) {
// Validate buffers before processing
if len(opusData) == 0 {
return 0, fmt.Errorf("empty opus data buffer")
}
if len(pcmBuffer) == 0 {
return 0, fmt.Errorf("empty PCM buffer")
}
if !bap.IsRunning() {
// Fallback to single operation if batch processor is not running
atomic.AddInt64(&bap.stats.SingleWrites, 1)
atomic.AddInt64(&bap.stats.WriteFrames, 1)
// Use the optimized function with separate buffers
return CGOAudioDecodeWrite(opusData, pcmBuffer)
}
resultChan := make(chan batchWriteResult, 1)
request := batchWriteRequest{
opusData: opusData,
pcmBuffer: pcmBuffer,
resultChan: resultChan,
timestamp: time.Now(),
}
// Try to queue the request with non-blocking send
select {
case bap.writeQueue <- request:
// Successfully queued
default:
// Queue is full, fall back to single operation
atomic.AddInt64(&bap.stats.SingleWrites, 1)
atomic.AddInt64(&bap.stats.WriteFrames, 1)
// Use the optimized function with separate buffers
return CGOAudioDecodeWrite(opusData, pcmBuffer)
}
// Wait for result with timeout
select {
case result := <-resultChan:
return result.length, result.err
case <-time.After(Config.BatchProcessorTimeout):
atomic.AddInt64(&bap.stats.SingleWrites, 1)
atomic.AddInt64(&bap.stats.WriteFrames, 1)
// Use the optimized function with separate buffers
return CGOAudioDecodeWrite(opusData, pcmBuffer)
}
}
// batchReadProcessor processes batched read operations
func (bap *BatchAudioProcessor) batchReadProcessor() {
defer bap.logger.Debug().Msg("batch read processor stopped")
ticker := time.NewTicker(bap.batchDuration)
defer ticker.Stop()
var batch []batchReadRequest
batch = make([]batchReadRequest, 0, bap.batchSize)
for atomic.LoadInt32(&bap.running) == 1 {
select {
case <-bap.ctx.Done():
return
case req := <-bap.readQueue:
batch = append(batch, req)
if len(batch) >= bap.batchSize {
bap.processBatchRead(batch)
batch = batch[:0] // Clear slice but keep capacity
}
case <-ticker.C:
if len(batch) > 0 {
bap.processBatchRead(batch)
batch = batch[:0] // Clear slice but keep capacity
}
}
}
// Process any remaining requests
if len(batch) > 0 {
bap.processBatchRead(batch)
}
}
// batchWriteProcessor processes batched write operations
func (bap *BatchAudioProcessor) batchWriteProcessor() {
defer bap.logger.Debug().Msg("batch write processor stopped")
ticker := time.NewTicker(bap.batchDuration)
defer ticker.Stop()
var batch []batchWriteRequest
batch = make([]batchWriteRequest, 0, bap.batchSize)
for atomic.LoadInt32(&bap.running) == 1 {
select {
case <-bap.ctx.Done():
return
case req := <-bap.writeQueue:
batch = append(batch, req)
if len(batch) >= bap.batchSize {
bap.processBatchWrite(batch)
batch = batch[:0] // Clear slice but keep capacity
}
case <-ticker.C:
if len(batch) > 0 {
bap.processBatchWrite(batch)
batch = batch[:0] // Clear slice but keep capacity
}
}
}
// Process any remaining requests
if len(batch) > 0 {
bap.processBatchWrite(batch)
}
}
// processBatchRead processes a batch of read requests efficiently
func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) {
batchSize := len(batch)
if batchSize == 0 {
return
}
threadPinningThreshold := Config.BatchProcessorThreadPinningThreshold
if threadPinningThreshold == 0 {
threadPinningThreshold = Config.MinBatchSizeForThreadPinning // Fallback
}
// Only pin to OS thread for large batches to reduce thread contention
var start time.Time
threadWasPinned := false
if batchSize >= threadPinningThreshold && atomic.CompareAndSwapInt32(&bap.threadPinned, 0, 1) {
start = time.Now()
threadWasPinned = true
runtime.LockOSThread()
}
// Batch stats updates to reduce atomic operations (update once per batch instead of per frame)
atomic.AddInt64(&bap.stats.BatchedReads, 1)
atomic.AddInt64(&bap.stats.BatchedFrames, int64(batchSize))
if batchSize > 1 {
atomic.AddInt64(&bap.stats.CGOCallsReduced, int64(batchSize-1))
}
// Process each request in the batch with minimal overhead
for i := range batch {
req := &batch[i]
length, err := CGOAudioReadEncode(req.buffer)
// Send result back (non-blocking) - reuse result struct
select {
case req.resultChan <- batchReadResult{length: length, err: err}:
default:
// Requestor timed out, drop result
}
}
// Release thread lock if we pinned it
if threadWasPinned {
runtime.UnlockOSThread()
atomic.StoreInt32(&bap.threadPinned, 0)
bap.stats.OSThreadPinTime += time.Since(start)
}
// Update timestamp only once per batch instead of per frame
bap.stats.LastBatchTime = time.Now()
}
// processBatchWrite processes a batch of write requests efficiently
func (bap *BatchAudioProcessor) processBatchWrite(batch []batchWriteRequest) {
if len(batch) == 0 {
return
}
threadPinningThreshold := Config.BatchProcessorThreadPinningThreshold
if threadPinningThreshold == 0 {
threadPinningThreshold = Config.MinBatchSizeForThreadPinning // Fallback
}
// Only pin to OS thread for large batches to reduce thread contention
start := time.Now()
shouldPinThread := len(batch) >= threadPinningThreshold
// Track if we pinned the thread in this call
threadWasPinned := false
if shouldPinThread && atomic.CompareAndSwapInt32(&bap.writePinned, 0, 1) {
threadWasPinned = true
runtime.LockOSThread()
// Priority scheduler not implemented - using default thread priority
}
batchSize := len(batch)
atomic.AddInt64(&bap.stats.BatchedWrites, 1)
atomic.AddInt64(&bap.stats.WriteFrames, int64(batchSize))
if batchSize > 1 {
atomic.AddInt64(&bap.stats.CGOCallsReduced, int64(batchSize-1))
}
// Add deferred function to release thread lock if we pinned it
if threadWasPinned {
defer func() {
// Priority scheduler not implemented - using default thread priority
runtime.UnlockOSThread()
atomic.StoreInt32(&bap.writePinned, 0)
bap.stats.WriteThreadTime += time.Since(start)
}()
}
// Process each request in the batch
for _, req := range batch {
var length int
var err error
// Handle both legacy and new decode-write operations
if req.opusData != nil && req.pcmBuffer != nil {
// New style with separate opus data and PCM buffer
length, err = CGOAudioDecodeWrite(req.opusData, req.pcmBuffer)
} else {
// Legacy style with single buffer
length, err = CGOAudioDecodeWriteLegacy(req.buffer)
}
result := batchWriteResult{
length: length,
err: err,
}
// Send result back (non-blocking)
select {
case req.resultChan <- result:
default:
// Requestor timed out, drop result
}
}
bap.stats.LastWriteTime = time.Now()
}
// GetStats returns current batch processor statistics
func (bap *BatchAudioProcessor) GetStats() BatchAudioStats {
return BatchAudioStats{
BatchedReads: atomic.LoadInt64(&bap.stats.BatchedReads),
SingleReads: atomic.LoadInt64(&bap.stats.SingleReads),
BatchedWrites: atomic.LoadInt64(&bap.stats.BatchedWrites),
SingleWrites: atomic.LoadInt64(&bap.stats.SingleWrites),
BatchedFrames: atomic.LoadInt64(&bap.stats.BatchedFrames),
SingleFrames: atomic.LoadInt64(&bap.stats.SingleFrames),
WriteFrames: atomic.LoadInt64(&bap.stats.WriteFrames),
CGOCallsReduced: atomic.LoadInt64(&bap.stats.CGOCallsReduced),
OSThreadPinTime: bap.stats.OSThreadPinTime,
WriteThreadTime: bap.stats.WriteThreadTime,
LastBatchTime: bap.stats.LastBatchTime,
LastWriteTime: bap.stats.LastWriteTime,
}
}
// IsRunning returns whether the batch processor is running
func (bap *BatchAudioProcessor) IsRunning() bool {
return atomic.LoadInt32(&bap.running) == 1
}
// Global batch processor instance
var (
globalBatchProcessor unsafe.Pointer // *BatchAudioProcessor
batchProcessorInitialized int32
)
// GetBatchAudioProcessor returns the global batch processor instance
func GetBatchAudioProcessor() *BatchAudioProcessor {
ptr := atomic.LoadPointer(&globalBatchProcessor)
if ptr != nil {
return (*BatchAudioProcessor)(ptr)
}
// Initialize on first use
if atomic.CompareAndSwapInt32(&batchProcessorInitialized, 0, 1) {
processor := NewBatchAudioProcessor(Config.BatchProcessorFramesPerBatch, Config.BatchProcessorTimeout)
atomic.StorePointer(&globalBatchProcessor, unsafe.Pointer(processor))
return processor
}
// Another goroutine initialized it, try again
ptr = atomic.LoadPointer(&globalBatchProcessor)
if ptr != nil {
return (*BatchAudioProcessor)(ptr)
}
// Fallback: create a new processor (should rarely happen)
return NewBatchAudioProcessor(Config.BatchProcessorFramesPerBatch, Config.BatchProcessorTimeout)
}
// EnableBatchAudioProcessing enables the global batch processor
func EnableBatchAudioProcessing() error {
processor := GetBatchAudioProcessor()
return processor.Start()
}
// DisableBatchAudioProcessing disables the global batch processor
func DisableBatchAudioProcessing() {
ptr := atomic.LoadPointer(&globalBatchProcessor)
if ptr != nil {
processor := (*BatchAudioProcessor)(ptr)
processor.Stop()
}
}
// BatchCGOAudioReadEncode is a batched version of CGOAudioReadEncode
func BatchCGOAudioReadEncode(buffer []byte) (int, error) {
processor := GetBatchAudioProcessor()
if processor == nil || !processor.IsRunning() {
// Fall back to non-batched version if processor is not running
return CGOAudioReadEncode(buffer)
}
return processor.BatchReadEncode(buffer)
}
// BatchCGOAudioDecodeWrite is a batched version of CGOAudioDecodeWrite
func BatchCGOAudioDecodeWrite(buffer []byte) (int, error) {
processor := GetBatchAudioProcessor()
if processor == nil || !processor.IsRunning() {
// Fall back to non-batched version if processor is not running
return CGOAudioDecodeWriteLegacy(buffer)
}
return processor.BatchDecodeWrite(buffer)
}
// BatchCGOAudioDecodeWriteWithBuffers is a batched version of CGOAudioDecodeWrite that uses separate opus and PCM buffers
func BatchCGOAudioDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, error) {
processor := GetBatchAudioProcessor()
if processor == nil || !processor.IsRunning() {
// Fall back to non-batched version if processor is not running
return CGOAudioDecodeWrite(opusData, pcmBuffer)
}
return processor.BatchDecodeWriteWithBuffers(opusData, pcmBuffer)
}

View File

@ -1,331 +0,0 @@
//go:build cgo
package audio
import (
"errors"
"sync"
"sync/atomic"
"unsafe"
)
// BatchReferenceManager handles batch reference counting operations
// to reduce atomic operation overhead for high-frequency frame operations
type BatchReferenceManager struct {
// Batch operations queue
batchQueue chan batchRefOperation
workerPool chan struct{} // Worker pool semaphore
running int32
wg sync.WaitGroup
// Statistics
batchedOps int64
singleOps int64
batchSavings int64 // Number of atomic operations saved
}
type batchRefOperation struct {
frames []*ZeroCopyAudioFrame
operation refOperationType
resultCh chan batchRefResult
}
type refOperationType int
const (
refOpAddRef refOperationType = iota
refOpRelease
refOpMixed // For operations with mixed AddRef/Release
)
// Errors
var (
ErrUnsupportedOperation = errors.New("unsupported batch reference operation")
)
type batchRefResult struct {
finalReleases []bool // For Release operations, indicates which frames had final release
err error
}
// Global batch reference manager
var (
globalBatchRefManager *BatchReferenceManager
batchRefOnce sync.Once
)
// GetBatchReferenceManager returns the global batch reference manager
func GetBatchReferenceManager() *BatchReferenceManager {
batchRefOnce.Do(func() {
globalBatchRefManager = NewBatchReferenceManager()
globalBatchRefManager.Start()
})
return globalBatchRefManager
}
// NewBatchReferenceManager creates a new batch reference manager
func NewBatchReferenceManager() *BatchReferenceManager {
return &BatchReferenceManager{
batchQueue: make(chan batchRefOperation, 256), // Buffered for high throughput
workerPool: make(chan struct{}, 4), // 4 workers for parallel processing
}
}
// Start starts the batch reference manager workers
func (brm *BatchReferenceManager) Start() {
if !atomic.CompareAndSwapInt32(&brm.running, 0, 1) {
return // Already running
}
// Start worker goroutines
for i := 0; i < cap(brm.workerPool); i++ {
brm.wg.Add(1)
go brm.worker()
}
}
// Stop stops the batch reference manager
func (brm *BatchReferenceManager) Stop() {
if !atomic.CompareAndSwapInt32(&brm.running, 1, 0) {
return // Already stopped
}
close(brm.batchQueue)
brm.wg.Wait()
}
// worker processes batch reference operations
func (brm *BatchReferenceManager) worker() {
defer brm.wg.Done()
for op := range brm.batchQueue {
brm.processBatchOperation(op)
}
}
// processBatchOperation processes a batch of reference operations
func (brm *BatchReferenceManager) processBatchOperation(op batchRefOperation) {
result := batchRefResult{}
switch op.operation {
case refOpAddRef:
// Batch AddRef operations
for _, frame := range op.frames {
if frame != nil {
atomic.AddInt32(&frame.refCount, 1)
}
}
atomic.AddInt64(&brm.batchedOps, int64(len(op.frames)))
atomic.AddInt64(&brm.batchSavings, int64(len(op.frames)-1)) // Saved ops vs individual calls
case refOpRelease:
// Batch Release operations
result.finalReleases = make([]bool, len(op.frames))
for i, frame := range op.frames {
if frame != nil {
newCount := atomic.AddInt32(&frame.refCount, -1)
if newCount == 0 {
result.finalReleases[i] = true
// Return to pool if pooled
if frame.pooled {
globalZeroCopyPool.Put(frame)
}
}
}
}
atomic.AddInt64(&brm.batchedOps, int64(len(op.frames)))
atomic.AddInt64(&brm.batchSavings, int64(len(op.frames)-1))
case refOpMixed:
// Handle mixed operations (not implemented in this version)
result.err = ErrUnsupportedOperation
}
// Send result back
if op.resultCh != nil {
op.resultCh <- result
close(op.resultCh)
}
}
// BatchAddRef performs AddRef on multiple frames in a single batch
func (brm *BatchReferenceManager) BatchAddRef(frames []*ZeroCopyAudioFrame) error {
if len(frames) == 0 {
return nil
}
// For small batches, use direct operations to avoid overhead
if len(frames) <= 2 {
for _, frame := range frames {
if frame != nil {
frame.AddRef()
}
}
atomic.AddInt64(&brm.singleOps, int64(len(frames)))
return nil
}
// Use batch processing for larger sets
if atomic.LoadInt32(&brm.running) == 0 {
// Fallback to individual operations if batch manager not running
for _, frame := range frames {
if frame != nil {
frame.AddRef()
}
}
atomic.AddInt64(&brm.singleOps, int64(len(frames)))
return nil
}
resultCh := make(chan batchRefResult, 1)
op := batchRefOperation{
frames: frames,
operation: refOpAddRef,
resultCh: resultCh,
}
select {
case brm.batchQueue <- op:
// Wait for completion
<-resultCh
return nil
default:
// Queue full, fallback to individual operations
for _, frame := range frames {
if frame != nil {
frame.AddRef()
}
}
atomic.AddInt64(&brm.singleOps, int64(len(frames)))
return nil
}
}
// BatchRelease performs Release on multiple frames in a single batch
// Returns a slice indicating which frames had their final reference released
func (brm *BatchReferenceManager) BatchRelease(frames []*ZeroCopyAudioFrame) ([]bool, error) {
if len(frames) == 0 {
return nil, nil
}
// For small batches, use direct operations
if len(frames) <= 2 {
finalReleases := make([]bool, len(frames))
for i, frame := range frames {
if frame != nil {
finalReleases[i] = frame.Release()
}
}
atomic.AddInt64(&brm.singleOps, int64(len(frames)))
return finalReleases, nil
}
// Use batch processing for larger sets
if atomic.LoadInt32(&brm.running) == 0 {
// Fallback to individual operations
finalReleases := make([]bool, len(frames))
for i, frame := range frames {
if frame != nil {
finalReleases[i] = frame.Release()
}
}
atomic.AddInt64(&brm.singleOps, int64(len(frames)))
return finalReleases, nil
}
resultCh := make(chan batchRefResult, 1)
op := batchRefOperation{
frames: frames,
operation: refOpRelease,
resultCh: resultCh,
}
select {
case brm.batchQueue <- op:
// Wait for completion
result := <-resultCh
return result.finalReleases, result.err
default:
// Queue full, fallback to individual operations
finalReleases := make([]bool, len(frames))
for i, frame := range frames {
if frame != nil {
finalReleases[i] = frame.Release()
}
}
atomic.AddInt64(&brm.singleOps, int64(len(frames)))
return finalReleases, nil
}
}
// GetStats returns batch reference counting statistics
func (brm *BatchReferenceManager) GetStats() (batchedOps, singleOps, savings int64) {
return atomic.LoadInt64(&brm.batchedOps),
atomic.LoadInt64(&brm.singleOps),
atomic.LoadInt64(&brm.batchSavings)
}
// Convenience functions for global batch reference manager
// BatchAddRefFrames performs batch AddRef on multiple frames
func BatchAddRefFrames(frames []*ZeroCopyAudioFrame) error {
return GetBatchReferenceManager().BatchAddRef(frames)
}
// BatchReleaseFrames performs batch Release on multiple frames
func BatchReleaseFrames(frames []*ZeroCopyAudioFrame) ([]bool, error) {
return GetBatchReferenceManager().BatchRelease(frames)
}
// GetBatchReferenceStats returns global batch reference statistics
func GetBatchReferenceStats() (batchedOps, singleOps, savings int64) {
return GetBatchReferenceManager().GetStats()
}
// ZeroCopyFrameSlice provides utilities for working with slices of zero-copy frames
type ZeroCopyFrameSlice []*ZeroCopyAudioFrame
// AddRefAll performs batch AddRef on all frames in the slice
func (zfs ZeroCopyFrameSlice) AddRefAll() error {
return BatchAddRefFrames(zfs)
}
// ReleaseAll performs batch Release on all frames in the slice
func (zfs ZeroCopyFrameSlice) ReleaseAll() ([]bool, error) {
return BatchReleaseFrames(zfs)
}
// FilterNonNil returns a new slice with only non-nil frames
func (zfs ZeroCopyFrameSlice) FilterNonNil() ZeroCopyFrameSlice {
filtered := make(ZeroCopyFrameSlice, 0, len(zfs))
for _, frame := range zfs {
if frame != nil {
filtered = append(filtered, frame)
}
}
return filtered
}
// Len returns the number of frames in the slice
func (zfs ZeroCopyFrameSlice) Len() int {
return len(zfs)
}
// Get returns the frame at the specified index
func (zfs ZeroCopyFrameSlice) Get(index int) *ZeroCopyAudioFrame {
if index < 0 || index >= len(zfs) {
return nil
}
return zfs[index]
}
// UnsafePointers returns unsafe pointers for all frames (for CGO batch operations)
func (zfs ZeroCopyFrameSlice) UnsafePointers() []unsafe.Pointer {
pointers := make([]unsafe.Pointer, len(zfs))
for i, frame := range zfs {
if frame != nil {
pointers[i] = frame.UnsafePointer()
}
}
return pointers
}

View File

@ -1,415 +0,0 @@
//go:build cgo
package audio
import (
"sync"
"sync/atomic"
"time"
)
// BatchZeroCopyProcessor handles batch operations on zero-copy audio frames
// with optimized reference counting and memory management
type BatchZeroCopyProcessor struct {
// Configuration
maxBatchSize int
batchTimeout time.Duration
processingDelay time.Duration
adaptiveThreshold float64
// Processing queues
readEncodeQueue chan *batchZeroCopyRequest
decodeWriteQueue chan *batchZeroCopyRequest
// Worker management
workerPool chan struct{}
running int32
wg sync.WaitGroup
// Statistics
batchedFrames int64
singleFrames int64
batchSavings int64
processingTimeUs int64
adaptiveHits int64
adaptiveMisses int64
}
type batchZeroCopyRequest struct {
frames []*ZeroCopyAudioFrame
operation batchZeroCopyOperation
resultCh chan batchZeroCopyResult
timestamp time.Time
}
type batchZeroCopyOperation int
const (
batchOpReadEncode batchZeroCopyOperation = iota
batchOpDecodeWrite
batchOpMixed
)
type batchZeroCopyResult struct {
encodedData [][]byte // For read-encode operations
processedCount int // Number of successfully processed frames
err error
}
// Global batch zero-copy processor
var (
globalBatchZeroCopyProcessor *BatchZeroCopyProcessor
batchZeroCopyOnce sync.Once
)
// GetBatchZeroCopyProcessor returns the global batch zero-copy processor
func GetBatchZeroCopyProcessor() *BatchZeroCopyProcessor {
batchZeroCopyOnce.Do(func() {
globalBatchZeroCopyProcessor = NewBatchZeroCopyProcessor()
globalBatchZeroCopyProcessor.Start()
})
return globalBatchZeroCopyProcessor
}
// NewBatchZeroCopyProcessor creates a new batch zero-copy processor
func NewBatchZeroCopyProcessor() *BatchZeroCopyProcessor {
cache := Config
return &BatchZeroCopyProcessor{
maxBatchSize: cache.BatchProcessorFramesPerBatch,
batchTimeout: cache.BatchProcessorTimeout,
processingDelay: cache.BatchProcessingDelay,
adaptiveThreshold: cache.BatchProcessorAdaptiveThreshold,
readEncodeQueue: make(chan *batchZeroCopyRequest, cache.BatchProcessorMaxQueueSize),
decodeWriteQueue: make(chan *batchZeroCopyRequest, cache.BatchProcessorMaxQueueSize),
workerPool: make(chan struct{}, 4), // 4 workers for parallel processing
}
}
// Start starts the batch zero-copy processor workers
func (bzcp *BatchZeroCopyProcessor) Start() {
if !atomic.CompareAndSwapInt32(&bzcp.running, 0, 1) {
return // Already running
}
// Start worker goroutines for read-encode operations
for i := 0; i < cap(bzcp.workerPool)/2; i++ {
bzcp.wg.Add(1)
go bzcp.readEncodeWorker()
}
// Start worker goroutines for decode-write operations
for i := 0; i < cap(bzcp.workerPool)/2; i++ {
bzcp.wg.Add(1)
go bzcp.decodeWriteWorker()
}
}
// Stop stops the batch zero-copy processor
func (bzcp *BatchZeroCopyProcessor) Stop() {
if !atomic.CompareAndSwapInt32(&bzcp.running, 1, 0) {
return // Already stopped
}
close(bzcp.readEncodeQueue)
close(bzcp.decodeWriteQueue)
bzcp.wg.Wait()
}
// readEncodeWorker processes batch read-encode operations
func (bzcp *BatchZeroCopyProcessor) readEncodeWorker() {
defer bzcp.wg.Done()
for req := range bzcp.readEncodeQueue {
bzcp.processBatchReadEncode(req)
}
}
// decodeWriteWorker processes batch decode-write operations
func (bzcp *BatchZeroCopyProcessor) decodeWriteWorker() {
defer bzcp.wg.Done()
for req := range bzcp.decodeWriteQueue {
bzcp.processBatchDecodeWrite(req)
}
}
// processBatchReadEncode processes a batch of read-encode operations
func (bzcp *BatchZeroCopyProcessor) processBatchReadEncode(req *batchZeroCopyRequest) {
startTime := time.Now()
result := batchZeroCopyResult{}
// Batch AddRef all frames first
err := BatchAddRefFrames(req.frames)
if err != nil {
result.err = err
if req.resultCh != nil {
req.resultCh <- result
close(req.resultCh)
}
return
}
// Process frames using existing batch read-encode logic
encodedData, err := BatchReadEncode(len(req.frames))
if err != nil {
// Batch release frames on error
if _, releaseErr := BatchReleaseFrames(req.frames); releaseErr != nil {
// Log release error but preserve original error
_ = releaseErr
}
result.err = err
} else {
result.encodedData = encodedData
result.processedCount = len(encodedData)
// Batch release frames after successful processing
if _, releaseErr := BatchReleaseFrames(req.frames); releaseErr != nil {
// Log release error but don't fail the operation
_ = releaseErr
}
}
// Update statistics
atomic.AddInt64(&bzcp.batchedFrames, int64(len(req.frames)))
atomic.AddInt64(&bzcp.batchSavings, int64(len(req.frames)-1))
atomic.AddInt64(&bzcp.processingTimeUs, time.Since(startTime).Microseconds())
// Send result back
if req.resultCh != nil {
req.resultCh <- result
close(req.resultCh)
}
}
// processBatchDecodeWrite processes a batch of decode-write operations
func (bzcp *BatchZeroCopyProcessor) processBatchDecodeWrite(req *batchZeroCopyRequest) {
startTime := time.Now()
result := batchZeroCopyResult{}
// Batch AddRef all frames first
err := BatchAddRefFrames(req.frames)
if err != nil {
result.err = err
if req.resultCh != nil {
req.resultCh <- result
close(req.resultCh)
}
return
}
// Extract data from zero-copy frames for batch processing
frameData := make([][]byte, len(req.frames))
for i, frame := range req.frames {
if frame != nil {
// Get data from zero-copy frame
frameData[i] = frame.Data()[:frame.Length()]
}
}
// Process frames using existing batch decode-write logic
err = BatchDecodeWrite(frameData)
if err != nil {
result.err = err
} else {
result.processedCount = len(req.frames)
}
// Batch release frames
if _, releaseErr := BatchReleaseFrames(req.frames); releaseErr != nil {
// Log release error but don't override processing error
_ = releaseErr
}
// Update statistics
atomic.AddInt64(&bzcp.batchedFrames, int64(len(req.frames)))
atomic.AddInt64(&bzcp.batchSavings, int64(len(req.frames)-1))
atomic.AddInt64(&bzcp.processingTimeUs, time.Since(startTime).Microseconds())
// Send result back
if req.resultCh != nil {
req.resultCh <- result
close(req.resultCh)
}
}
// BatchReadEncodeZeroCopy performs batch read-encode on zero-copy frames
func (bzcp *BatchZeroCopyProcessor) BatchReadEncodeZeroCopy(frames []*ZeroCopyAudioFrame) ([][]byte, error) {
if len(frames) == 0 {
return nil, nil
}
// For small batches, use direct operations to avoid overhead
if len(frames) <= 2 {
atomic.AddInt64(&bzcp.singleFrames, int64(len(frames)))
return bzcp.processSingleReadEncode(frames)
}
// Use adaptive threshold to determine batch vs single processing
batchedFrames := atomic.LoadInt64(&bzcp.batchedFrames)
singleFrames := atomic.LoadInt64(&bzcp.singleFrames)
totalFrames := batchedFrames + singleFrames
if totalFrames > 100 { // Only apply adaptive logic after some samples
batchRatio := float64(batchedFrames) / float64(totalFrames)
if batchRatio < bzcp.adaptiveThreshold {
// Batch processing not effective, use single processing
atomic.AddInt64(&bzcp.adaptiveMisses, 1)
atomic.AddInt64(&bzcp.singleFrames, int64(len(frames)))
return bzcp.processSingleReadEncode(frames)
}
atomic.AddInt64(&bzcp.adaptiveHits, 1)
}
// Use batch processing
if atomic.LoadInt32(&bzcp.running) == 0 {
// Fallback to single processing if batch processor not running
atomic.AddInt64(&bzcp.singleFrames, int64(len(frames)))
return bzcp.processSingleReadEncode(frames)
}
resultCh := make(chan batchZeroCopyResult, 1)
req := &batchZeroCopyRequest{
frames: frames,
operation: batchOpReadEncode,
resultCh: resultCh,
timestamp: time.Now(),
}
select {
case bzcp.readEncodeQueue <- req:
// Wait for completion
result := <-resultCh
return result.encodedData, result.err
default:
// Queue full, fallback to single processing
atomic.AddInt64(&bzcp.singleFrames, int64(len(frames)))
return bzcp.processSingleReadEncode(frames)
}
}
// BatchDecodeWriteZeroCopy performs batch decode-write on zero-copy frames
func (bzcp *BatchZeroCopyProcessor) BatchDecodeWriteZeroCopy(frames []*ZeroCopyAudioFrame) error {
if len(frames) == 0 {
return nil
}
// For small batches, use direct operations
if len(frames) <= 2 {
atomic.AddInt64(&bzcp.singleFrames, int64(len(frames)))
return bzcp.processSingleDecodeWrite(frames)
}
// Use adaptive threshold
batchedFrames := atomic.LoadInt64(&bzcp.batchedFrames)
singleFrames := atomic.LoadInt64(&bzcp.singleFrames)
totalFrames := batchedFrames + singleFrames
if totalFrames > 100 {
batchRatio := float64(batchedFrames) / float64(totalFrames)
if batchRatio < bzcp.adaptiveThreshold {
atomic.AddInt64(&bzcp.adaptiveMisses, 1)
atomic.AddInt64(&bzcp.singleFrames, int64(len(frames)))
return bzcp.processSingleDecodeWrite(frames)
}
atomic.AddInt64(&bzcp.adaptiveHits, 1)
}
// Use batch processing
if atomic.LoadInt32(&bzcp.running) == 0 {
atomic.AddInt64(&bzcp.singleFrames, int64(len(frames)))
return bzcp.processSingleDecodeWrite(frames)
}
resultCh := make(chan batchZeroCopyResult, 1)
req := &batchZeroCopyRequest{
frames: frames,
operation: batchOpDecodeWrite,
resultCh: resultCh,
timestamp: time.Now(),
}
select {
case bzcp.decodeWriteQueue <- req:
// Wait for completion
result := <-resultCh
return result.err
default:
// Queue full, fallback to single processing
atomic.AddInt64(&bzcp.singleFrames, int64(len(frames)))
return bzcp.processSingleDecodeWrite(frames)
}
}
// processSingleReadEncode processes frames individually for read-encode
func (bzcp *BatchZeroCopyProcessor) processSingleReadEncode(frames []*ZeroCopyAudioFrame) ([][]byte, error) {
// Extract data and use existing batch processing
frameData := make([][]byte, 0, len(frames))
for _, frame := range frames {
if frame != nil {
frame.AddRef()
frameData = append(frameData, frame.Data()[:frame.Length()])
}
}
// Use existing batch read-encode
result, err := BatchReadEncode(len(frameData))
// Release frames
for _, frame := range frames {
if frame != nil {
frame.Release()
}
}
return result, err
}
// processSingleDecodeWrite processes frames individually for decode-write
func (bzcp *BatchZeroCopyProcessor) processSingleDecodeWrite(frames []*ZeroCopyAudioFrame) error {
// Extract data and use existing batch processing
frameData := make([][]byte, 0, len(frames))
for _, frame := range frames {
if frame != nil {
frame.AddRef()
frameData = append(frameData, frame.Data()[:frame.Length()])
}
}
// Use existing batch decode-write
err := BatchDecodeWrite(frameData)
// Release frames
for _, frame := range frames {
if frame != nil {
frame.Release()
}
}
return err
}
// GetBatchZeroCopyStats returns batch zero-copy processing statistics
func (bzcp *BatchZeroCopyProcessor) GetBatchZeroCopyStats() (batchedFrames, singleFrames, savings, processingTimeUs, adaptiveHits, adaptiveMisses int64) {
return atomic.LoadInt64(&bzcp.batchedFrames),
atomic.LoadInt64(&bzcp.singleFrames),
atomic.LoadInt64(&bzcp.batchSavings),
atomic.LoadInt64(&bzcp.processingTimeUs),
atomic.LoadInt64(&bzcp.adaptiveHits),
atomic.LoadInt64(&bzcp.adaptiveMisses)
}
// Convenience functions for global batch zero-copy processor
// BatchReadEncodeZeroCopyFrames performs batch read-encode on zero-copy frames
func BatchReadEncodeZeroCopyFrames(frames []*ZeroCopyAudioFrame) ([][]byte, error) {
return GetBatchZeroCopyProcessor().BatchReadEncodeZeroCopy(frames)
}
// BatchDecodeWriteZeroCopyFrames performs batch decode-write on zero-copy frames
func BatchDecodeWriteZeroCopyFrames(frames []*ZeroCopyAudioFrame) error {
return GetBatchZeroCopyProcessor().BatchDecodeWriteZeroCopy(frames)
}
// GetGlobalBatchZeroCopyStats returns global batch zero-copy processing statistics
func GetGlobalBatchZeroCopyStats() (batchedFrames, singleFrames, savings, processingTimeUs, adaptiveHits, adaptiveMisses int64) {
return GetBatchZeroCopyProcessor().GetBatchZeroCopyStats()
}

File diff suppressed because it is too large Load Diff

View File

@ -5,10 +5,15 @@ package audio
import ( import (
"errors" "errors"
"fmt" "fmt"
"os"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"unsafe" "unsafe"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
) )
/* /*
@ -19,9 +24,7 @@ import (
*/ */
import "C" import "C"
// Optimized Go wrappers with reduced overhead
var ( var (
// Base error types for wrapping with context
errAudioInitFailed = errors.New("failed to init ALSA/Opus") errAudioInitFailed = errors.New("failed to init ALSA/Opus")
errAudioReadEncode = errors.New("audio read/encode error") errAudioReadEncode = errors.New("audio read/encode error")
errAudioDecodeWrite = errors.New("audio decode/write error") errAudioDecodeWrite = errors.New("audio decode/write error")
@ -91,6 +94,30 @@ func cgoAudioInit() error {
cache := GetCachedConfig() cache := GetCachedConfig()
cache.Update() cache.Update()
// Enable C trace logging if Go audio scope trace level is active
// Enable C trace logging if Go audio scope trace level is active
audioLogger := logging.GetSubsystemLogger("audio")
loggerTraceEnabled := audioLogger.GetLevel() <= zerolog.TraceLevel
// Manual check for audio scope in PION_LOG_TRACE (workaround for logging system bug)
manualTraceEnabled := false
pionTrace := os.Getenv("PION_LOG_TRACE")
if pionTrace != "" {
scopes := strings.Split(strings.ToLower(pionTrace), ",")
for _, scope := range scopes {
if strings.TrimSpace(scope) == "audio" {
manualTraceEnabled = true
break
}
}
}
// Use manual check as fallback if logging system fails
traceEnabled := loggerTraceEnabled || manualTraceEnabled
CGOSetTraceLogging(traceEnabled)
// Update C constants from cached config (atomic access, no locks) // Update C constants from cached config (atomic access, no locks)
C.update_audio_constants( C.update_audio_constants(
C.int(cache.opusBitrate.Load()), C.int(cache.opusBitrate.Load()),
@ -110,7 +137,7 @@ func cgoAudioInit() error {
C.int(Config.CGOMaxBackoffMicroseconds), C.int(Config.CGOMaxBackoffMicroseconds),
) )
result := C.jetkvm_audio_init() result := C.jetkvm_audio_capture_init()
if result != 0 { if result != 0 {
return newAudioInitError(int(result)) return newAudioInitError(int(result))
} }
@ -161,16 +188,6 @@ type AudioConfigCache struct {
inputProcessingTimeoutMS atomic.Int32 inputProcessingTimeoutMS atomic.Int32
maxRestartAttempts atomic.Int32 maxRestartAttempts atomic.Int32
// Batch processing related values
BatchProcessingTimeout time.Duration
BatchProcessorFramesPerBatch int
BatchProcessorTimeout time.Duration
BatchProcessingDelay time.Duration
MinBatchSizeForThreadPinning int
BatchProcessorMaxQueueSize int
BatchProcessorAdaptiveThreshold float64
BatchProcessorThreadPinningThreshold int
// Mutex for updating the cache // Mutex for updating the cache
mutex sync.RWMutex mutex sync.RWMutex
lastUpdate time.Time lastUpdate time.Time
@ -184,7 +201,7 @@ type AudioConfigCache struct {
// Global audio config cache instance // Global audio config cache instance
var globalAudioConfigCache = &AudioConfigCache{ var globalAudioConfigCache = &AudioConfigCache{
cacheExpiry: 30 * time.Second, // Increased from 10s to 30s to further reduce cache updates cacheExpiry: 30 * time.Second,
} }
// GetCachedConfig returns the global audio config cache instance // GetCachedConfig returns the global audio config cache instance
@ -234,16 +251,6 @@ func (c *AudioConfigCache) Update() {
c.minOpusBitrate.Store(int32(Config.MinOpusBitrate)) c.minOpusBitrate.Store(int32(Config.MinOpusBitrate))
c.maxOpusBitrate.Store(int32(Config.MaxOpusBitrate)) c.maxOpusBitrate.Store(int32(Config.MaxOpusBitrate))
// Update batch processing related values
c.BatchProcessingTimeout = 100 * time.Millisecond // Fixed timeout for batch processing
c.BatchProcessorFramesPerBatch = Config.BatchProcessorFramesPerBatch
c.BatchProcessorTimeout = Config.BatchProcessorTimeout
c.BatchProcessingDelay = Config.BatchProcessingDelay
c.MinBatchSizeForThreadPinning = Config.MinBatchSizeForThreadPinning
c.BatchProcessorMaxQueueSize = Config.BatchProcessorMaxQueueSize
c.BatchProcessorAdaptiveThreshold = Config.BatchProcessorAdaptiveThreshold
c.BatchProcessorThreadPinningThreshold = Config.BatchProcessorThreadPinningThreshold
// Pre-allocate common errors // Pre-allocate common errors
c.bufferTooSmallReadEncode = newBufferTooSmallError(0, Config.MinReadEncodeBuffer) c.bufferTooSmallReadEncode = newBufferTooSmallError(0, Config.MinReadEncodeBuffer)
c.bufferTooLargeDecodeWrite = newBufferTooLargeError(Config.MaxDecodeWriteBuffer+1, Config.MaxDecodeWriteBuffer) c.bufferTooLargeDecodeWrite = newBufferTooLargeError(Config.MaxDecodeWriteBuffer+1, Config.MaxDecodeWriteBuffer)
@ -251,6 +258,9 @@ func (c *AudioConfigCache) Update() {
c.lastUpdate = time.Now() c.lastUpdate = time.Now()
c.initialized.Store(true) c.initialized.Store(true)
c.lastUpdate = time.Now()
c.initialized.Store(true)
// Update the global validation cache as well // Update the global validation cache as well
if cachedMaxFrameSize != 0 { if cachedMaxFrameSize != 0 {
cachedMaxFrameSize = Config.MaxAudioFrameSize cachedMaxFrameSize = Config.MaxAudioFrameSize
@ -288,8 +298,6 @@ func (c *AudioConfigCache) GetBufferTooLargeError() error {
return c.bufferTooLargeDecodeWrite return c.bufferTooLargeDecodeWrite
} }
// Removed duplicate config caching system - using AudioConfigCache instead
// updateCacheIfNeeded updates cache only if expired to avoid overhead // updateCacheIfNeeded updates cache only if expired to avoid overhead
func updateCacheIfNeeded(cache *AudioConfigCache) { func updateCacheIfNeeded(cache *AudioConfigCache) {
if cache.initialized.Load() { if cache.initialized.Load() {
@ -335,6 +343,10 @@ func cgoAudioPlaybackInit() error {
cache := GetCachedConfig() cache := GetCachedConfig()
cache.Update() cache.Update()
// Enable C trace logging if Go audio scope trace level is active
audioLogger := logging.GetSubsystemLogger("audio")
CGOSetTraceLogging(audioLogger.GetLevel() <= zerolog.TraceLevel)
// No need to update C constants here as they're already set in cgoAudioInit // No need to update C constants here as they're already set in cgoAudioInit
ret := C.jetkvm_audio_playback_init() ret := C.jetkvm_audio_playback_init()
@ -348,9 +360,44 @@ func cgoAudioPlaybackClose() {
C.jetkvm_audio_playback_close() C.jetkvm_audio_playback_close()
} }
// Audio decode/write metrics for monitoring USB Gadget audio success
var (
audioDecodeWriteTotal atomic.Int64
audioDecodeWriteSuccess atomic.Int64
audioDecodeWriteFailures atomic.Int64
audioDecodeWriteRecovery atomic.Int64
audioDecodeWriteLastError atomic.Value
audioDecodeWriteLastTime atomic.Int64
)
// GetAudioDecodeWriteStats returns current audio decode/write statistics
func GetAudioDecodeWriteStats() (total, success, failures, recovery int64, lastError string, lastTime time.Time) {
total = audioDecodeWriteTotal.Load()
success = audioDecodeWriteSuccess.Load()
failures = audioDecodeWriteFailures.Load()
recovery = audioDecodeWriteRecovery.Load()
if err := audioDecodeWriteLastError.Load(); err != nil {
lastError = err.(string)
}
lastTimeNano := audioDecodeWriteLastTime.Load()
if lastTimeNano > 0 {
lastTime = time.Unix(0, lastTimeNano)
}
return
}
func cgoAudioDecodeWrite(buf []byte) (int, error) { func cgoAudioDecodeWrite(buf []byte) (int, error) {
start := time.Now()
audioDecodeWriteTotal.Add(1)
audioDecodeWriteLastTime.Store(start.UnixNano())
// Minimal validation - assume caller provides correct size // Minimal validation - assume caller provides correct size
if len(buf) == 0 { if len(buf) == 0 {
audioDecodeWriteFailures.Add(1)
audioDecodeWriteLastError.Store("empty buffer")
return 0, errEmptyBuffer return 0, errEmptyBuffer
} }
@ -359,14 +406,31 @@ func cgoAudioDecodeWrite(buf []byte) (int, error) {
// Fast path for success // Fast path for success
if n >= 0 { if n >= 0 {
audioDecodeWriteSuccess.Add(1)
return n, nil return n, nil
} }
// Error handling with static errors // Error handling with static errors
if n == -1 { audioDecodeWriteFailures.Add(1)
return 0, errAudioInitFailed var errMsg string
var err error
switch n {
case -1:
errMsg = "audio system not initialized"
err = errAudioInitFailed
case -2:
errMsg = "audio device error or recovery failed"
err = errAudioDecodeWrite
audioDecodeWriteRecovery.Add(1)
default:
errMsg = fmt.Sprintf("unknown error code %d", n)
err = errAudioDecodeWrite
} }
return 0, errAudioDecodeWrite
audioDecodeWriteLastError.Store(errMsg)
return 0, err
} }
// updateOpusEncoderParams dynamically updates OPUS encoder parameters // updateOpusEncoderParams dynamically updates OPUS encoder parameters
@ -388,7 +452,9 @@ func updateOpusEncoderParams(bitrate, complexity, vbr, vbrConstraint, signalType
// Buffer pool for reusing buffers in CGO functions // Buffer pool for reusing buffers in CGO functions
var ( var (
// Using SizedBufferPool for better memory management // Simple buffer pool for PCM data
pcmBufferPool = NewAudioBufferPool(Config.MaxPCMBufferSize)
// Track buffer pool usage // Track buffer pool usage
cgoBufferPoolGets atomic.Int64 cgoBufferPoolGets atomic.Int64
cgoBufferPoolPuts atomic.Int64 cgoBufferPoolPuts atomic.Int64
@ -396,19 +462,19 @@ var (
batchProcessingCount atomic.Int64 batchProcessingCount atomic.Int64
batchFrameCount atomic.Int64 batchFrameCount atomic.Int64
batchProcessingTime atomic.Int64 batchProcessingTime atomic.Int64
// Batch time tracking removed
) )
// GetBufferFromPool gets a buffer from the pool with at least the specified capacity // GetBufferFromPool gets a buffer from the pool with at least the specified capacity
func GetBufferFromPool(minCapacity int) []byte { func GetBufferFromPool(minCapacity int) []byte {
cgoBufferPoolGets.Add(1) cgoBufferPoolGets.Add(1)
return GetOptimalBuffer(minCapacity) // Use simple fixed-size buffer for PCM data
return pcmBufferPool.Get()
} }
// ReturnBufferToPool returns a buffer to the pool // ReturnBufferToPool returns a buffer to the pool
func ReturnBufferToPool(buf []byte) { func ReturnBufferToPool(buf []byte) {
cgoBufferPoolPuts.Add(1) cgoBufferPoolPuts.Add(1)
ReturnOptimalBuffer(buf) pcmBufferPool.Put(buf)
} }
// ReadEncodeWithPooledBuffer reads audio data and encodes it using a buffer from the pool // ReadEncodeWithPooledBuffer reads audio data and encodes it using a buffer from the pool
@ -451,125 +517,6 @@ func DecodeWriteWithPooledBuffer(data []byte) (int, error) {
return CGOAudioDecodeWrite(data, pcmBuffer) return CGOAudioDecodeWrite(data, pcmBuffer)
} }
// BatchReadEncode reads and encodes multiple audio frames in a single batch
// with optimized zero-copy frame management and batch reference counting
func BatchReadEncode(batchSize int) ([][]byte, error) {
// Simple batch processing without complex overhead
frames := make([][]byte, 0, batchSize)
frameSize := 4096 // Fixed frame size for performance
for i := 0; i < batchSize; i++ {
buf := make([]byte, frameSize)
n, err := cgoAudioReadEncode(buf)
if err != nil {
if i > 0 {
return frames, nil // Return partial batch
}
return nil, err
}
if n > 0 {
frames = append(frames, buf[:n])
}
}
return frames, nil
}
// BatchDecodeWrite decodes and writes multiple audio frames in a single batch
// This reduces CGO call overhead by processing multiple frames at once
// with optimized zero-copy frame management and batch reference counting
func BatchDecodeWrite(frames [][]byte) error {
// Validate input
if len(frames) == 0 {
return nil
}
// Convert to zero-copy frames for optimized processing
zeroCopyFrames := make([]*ZeroCopyAudioFrame, 0, len(frames))
for _, frameData := range frames {
if len(frameData) > 0 {
frame := GetZeroCopyFrame()
frame.SetDataDirect(frameData) // Direct assignment without copy
zeroCopyFrames = append(zeroCopyFrames, frame)
}
}
// Use batch reference counting for efficient management
if len(zeroCopyFrames) > 0 {
// Batch AddRef all frames at once
err := BatchAddRefFrames(zeroCopyFrames)
if err != nil {
return err
}
// Ensure cleanup with batch release
defer func() {
if _, err := BatchReleaseFrames(zeroCopyFrames); err != nil {
// Log release error but don't fail the operation
_ = err
}
}()
}
// Get cached config
cache := GetCachedConfig()
// Only update cache if expired - avoid unnecessary overhead
// Use proper locking to avoid race condition
if cache.initialized.Load() {
cache.mutex.RLock()
cacheExpired := time.Since(cache.lastUpdate) > cache.cacheExpiry
cache.mutex.RUnlock()
if cacheExpired {
cache.Update()
}
} else {
cache.Update()
}
// Track batch processing statistics - only if enabled
var startTime time.Time
// Batch time tracking removed
trackTime := false
if trackTime {
startTime = time.Now()
}
batchProcessingCount.Add(1)
// Get a PCM buffer from the pool for optimized decode-write
pcmBuffer := GetBufferFromPool(cache.GetMaxPCMBufferSize())
defer ReturnBufferToPool(pcmBuffer)
// Process each zero-copy frame with optimized batch processing
frameCount := 0
for _, zcFrame := range zeroCopyFrames {
// Get frame data from zero-copy frame
frameData := zcFrame.Data()[:zcFrame.Length()]
if len(frameData) == 0 {
continue
}
// Process this frame using optimized implementation
_, err := CGOAudioDecodeWrite(frameData, pcmBuffer)
if err != nil {
// Update statistics before returning error
batchFrameCount.Add(int64(frameCount))
if trackTime {
batchProcessingTime.Add(time.Since(startTime).Microseconds())
}
return err
}
frameCount++
}
// Update statistics
batchFrameCount.Add(int64(frameCount))
if trackTime {
batchProcessingTime.Add(time.Since(startTime).Microseconds())
}
return nil
}
// GetBatchProcessingStats returns statistics about batch processing // GetBatchProcessingStats returns statistics about batch processing
func GetBatchProcessingStats() (count, frames, avgTimeUs int64) { func GetBatchProcessingStats() (count, frames, avgTimeUs int64) {
count = batchProcessingCount.Load() count = batchProcessingCount.Load()
@ -587,11 +534,19 @@ func GetBatchProcessingStats() (count, frames, avgTimeUs int64) {
// cgoAudioDecodeWriteWithBuffers decodes opus data and writes to PCM buffer // cgoAudioDecodeWriteWithBuffers decodes opus data and writes to PCM buffer
// This implementation uses separate buffers for opus data and PCM output // This implementation uses separate buffers for opus data and PCM output
func cgoAudioDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, error) { func cgoAudioDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, error) {
start := time.Now()
audioDecodeWriteTotal.Add(1)
audioDecodeWriteLastTime.Store(start.UnixNano())
// Validate input // Validate input
if len(opusData) == 0 { if len(opusData) == 0 {
audioDecodeWriteFailures.Add(1)
audioDecodeWriteLastError.Store("empty opus data")
return 0, errEmptyBuffer return 0, errEmptyBuffer
} }
if len(pcmBuffer) == 0 { if cap(pcmBuffer) == 0 {
audioDecodeWriteFailures.Add(1)
audioDecodeWriteLastError.Store("empty pcm buffer capacity")
return 0, errEmptyBuffer return 0, errEmptyBuffer
} }
@ -613,39 +568,67 @@ func cgoAudioDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, err
// Ensure data doesn't exceed max packet size // Ensure data doesn't exceed max packet size
maxPacketSize := cache.GetMaxPacketSize() maxPacketSize := cache.GetMaxPacketSize()
if len(opusData) > maxPacketSize { if len(opusData) > maxPacketSize {
audioDecodeWriteFailures.Add(1)
errMsg := fmt.Sprintf("opus packet too large: %d > %d", len(opusData), maxPacketSize)
audioDecodeWriteLastError.Store(errMsg)
return 0, newBufferTooLargeError(len(opusData), maxPacketSize) return 0, newBufferTooLargeError(len(opusData), maxPacketSize)
} }
// Metrics tracking only - detailed logging handled at application level
// Direct CGO call with minimal overhead - unsafe.Pointer(&slice[0]) is never nil for non-empty slices // Direct CGO call with minimal overhead - unsafe.Pointer(&slice[0]) is never nil for non-empty slices
n := int(C.jetkvm_audio_decode_write(unsafe.Pointer(&opusData[0]), C.int(len(opusData)))) n := int(C.jetkvm_audio_decode_write(unsafe.Pointer(&opusData[0]), C.int(len(opusData))))
// Fast path for success case // Fast path for success case
if n >= 0 { if n >= 0 {
audioDecodeWriteSuccess.Add(1)
return n, nil return n, nil
} }
// Handle error cases with static error codes to reduce allocations // Handle error cases with static error codes to reduce allocations
audioDecodeWriteFailures.Add(1)
var errMsg string
var err error
switch n { switch n {
case -1: case -1:
return 0, errAudioInitFailed errMsg = "audio system not initialized"
err = errAudioInitFailed
case -2: case -2:
return 0, errAudioDecodeWrite errMsg = "audio device error or recovery failed"
err = errAudioDecodeWrite
audioDecodeWriteRecovery.Add(1)
default: default:
return 0, newAudioDecodeWriteError(n) errMsg = fmt.Sprintf("unknown error code %d", n)
err = newAudioDecodeWriteError(n)
} }
audioDecodeWriteLastError.Store(errMsg)
return 0, err
} }
// Optimized CGO function aliases - use direct function calls to reduce overhead // Optimized CGO function aliases - use direct function calls to reduce overhead
// These are now direct function aliases instead of variable assignments // These are now direct function aliases instead of variable assignments
func CGOAudioInit() error { return cgoAudioInit() } func CGOAudioInit() error { return cgoAudioInit() }
func CGOAudioClose() { cgoAudioClose() } func CGOAudioClose() { cgoAudioClose() }
func CGOAudioReadEncode(buf []byte) (int, error) { return cgoAudioReadEncode(buf) } func CGOAudioReadEncode(buf []byte) (int, error) { return cgoAudioReadEncode(buf) }
func CGOAudioPlaybackInit() error { return cgoAudioPlaybackInit() } func CGOAudioPlaybackInit() error { return cgoAudioPlaybackInit() }
func CGOAudioPlaybackClose() { cgoAudioPlaybackClose() } func CGOAudioPlaybackClose() { cgoAudioPlaybackClose() }
func CGOAudioDecodeWriteLegacy(buf []byte) (int, error) { return cgoAudioDecodeWrite(buf) }
func CGOAudioDecodeWrite(opusData []byte, pcmBuffer []byte) (int, error) { func CGOAudioDecodeWrite(opusData []byte, pcmBuffer []byte) (int, error) {
return cgoAudioDecodeWriteWithBuffers(opusData, pcmBuffer) return cgoAudioDecodeWriteWithBuffers(opusData, pcmBuffer)
} }
func CGOUpdateOpusEncoderParams(bitrate, complexity, vbr, vbrConstraint, signalType, bandwidth, dtx int) error { func CGOUpdateOpusEncoderParams(bitrate, complexity, vbr, vbrConstraint, signalType, bandwidth, dtx int) error {
return updateOpusEncoderParams(bitrate, complexity, vbr, vbrConstraint, signalType, bandwidth, dtx) return updateOpusEncoderParams(bitrate, complexity, vbr, vbrConstraint, signalType, bandwidth, dtx)
} }
func CGOSetTraceLogging(enabled bool) {
var cEnabled C.int
if enabled {
cEnabled = 1
} else {
cEnabled = 0
}
C.set_trace_logging(cEnabled)
}

View File

@ -1,15 +0,0 @@
package audio
import "time"
// GetMetricsUpdateInterval returns the current metrics update interval from centralized config
func GetMetricsUpdateInterval() time.Duration {
return Config.MetricsUpdateInterval
}
// SetMetricsUpdateInterval sets the metrics update interval in centralized config
func SetMetricsUpdateInterval(interval time.Duration) {
config := Config
config.MetricsUpdateInterval = interval
UpdateConfig(config)
}

View File

@ -152,11 +152,6 @@ type AudioConfigConstants struct {
MemoryFactor float64 MemoryFactor float64
LatencyFactor float64 LatencyFactor float64
// Adaptive Buffer Configuration
AdaptiveMinBufferSize int // Minimum buffer size in frames for adaptive buffering
AdaptiveMaxBufferSize int // Maximum buffer size in frames for adaptive buffering
AdaptiveDefaultBufferSize int // Default buffer size in frames for adaptive buffering
// Timing Configuration // Timing Configuration
RetryDelay time.Duration // Retry delay RetryDelay time.Duration // Retry delay
MaxRetryDelay time.Duration // Maximum retry delay MaxRetryDelay time.Duration // Maximum retry delay
@ -171,22 +166,17 @@ type AudioConfigConstants struct {
OutputSupervisorTimeout time.Duration // 5s OutputSupervisorTimeout time.Duration // 5s
BatchProcessingDelay time.Duration // 10ms BatchProcessingDelay time.Duration // 10ms
// Adaptive Buffer Configuration // System threshold configuration for buffer management
// LowCPUThreshold defines CPU usage threshold for buffer size reduction. LowCPUThreshold float64 // CPU usage threshold for performance optimization
LowCPUThreshold float64 // 20% CPU threshold for buffer optimization HighCPUThreshold float64 // CPU usage threshold for performance limits
LowMemoryThreshold float64 // 50% memory threshold
// HighCPUThreshold defines CPU usage threshold for buffer size increase. HighMemoryThreshold float64 // 75% memory threshold
HighCPUThreshold float64 // 60% CPU threshold CooldownPeriod time.Duration // 30s cooldown period
LowMemoryThreshold float64 // 50% memory threshold RollbackThreshold time.Duration // 300ms rollback threshold
HighMemoryThreshold float64 // 75% memory threshold
AdaptiveBufferTargetLatency time.Duration // 20ms target latency
CooldownPeriod time.Duration // 30s cooldown period
RollbackThreshold time.Duration // 300ms rollback threshold
MaxLatencyThreshold time.Duration // 200ms max latency MaxLatencyThreshold time.Duration // 200ms max latency
JitterThreshold time.Duration // 20ms jitter threshold JitterThreshold time.Duration // 20ms jitter threshold
LatencyOptimizationInterval time.Duration // 5s optimization interval LatencyOptimizationInterval time.Duration // 5s optimization interval
LatencyAdaptiveThreshold float64 // 0.8 adaptive threshold
MicContentionTimeout time.Duration // 200ms contention timeout MicContentionTimeout time.Duration // 200ms contention timeout
PreallocPercentage int // 20% preallocation percentage PreallocPercentage int // 20% preallocation percentage
BackoffStart time.Duration // 50ms initial backoff BackoffStart time.Duration // 50ms initial backoff
@ -199,7 +189,6 @@ type AudioConfigConstants struct {
PercentageMultiplier float64 // Multiplier for percentage calculations (100.0) PercentageMultiplier float64 // Multiplier for percentage calculations (100.0)
AveragingWeight float64 // Weight for weighted averaging (0.7) AveragingWeight float64 // Weight for weighted averaging (0.7)
ScalingFactor float64 // General scaling factor (1.5) ScalingFactor float64 // General scaling factor (1.5)
SmoothingFactor float64 // Smoothing factor for adaptive buffers (0.3)
CPUMemoryWeight float64 // Weight for CPU factor in calculations (0.5) CPUMemoryWeight float64 // Weight for CPU factor in calculations (0.5)
MemoryWeight float64 // Weight for memory factor (0.3) MemoryWeight float64 // Weight for memory factor (0.3)
LatencyWeight float64 // Weight for latency factor (0.2) LatencyWeight float64 // Weight for latency factor (0.2)
@ -213,32 +202,23 @@ type AudioConfigConstants struct {
CGOPCMBufferSize int // PCM buffer size for CGO audio processing CGOPCMBufferSize int // PCM buffer size for CGO audio processing
CGONanosecondsPerSecond float64 // Nanoseconds per second conversion CGONanosecondsPerSecond float64 // Nanoseconds per second conversion
// Batch Processing Constants
BatchProcessorFramesPerBatch int // Frames processed per batch (4)
BatchProcessorTimeout time.Duration // Batch processing timeout (5ms)
BatchProcessorMaxQueueSize int // Maximum batch queue size (16)
BatchProcessorAdaptiveThreshold float64 // Adaptive batch sizing threshold (0.8)
BatchProcessorThreadPinningThreshold int // Thread pinning threshold (8 frames)
// Output Streaming Constants // Output Streaming Constants
OutputStreamingFrameIntervalMS int // Output frame interval (20ms for 50 FPS) OutputStreamingFrameIntervalMS int // Output frame interval (20ms for 50 FPS)
// IPC Constants // IPC Constants
IPCInitialBufferFrames int // Initial IPC buffer size (500 frames) IPCInitialBufferFrames int // Initial IPC buffer size (500 frames)
EventTimeoutSeconds int EventTimeoutSeconds int
EventTimeFormatString string EventTimeFormatString string
EventSubscriptionDelayMS int EventSubscriptionDelayMS int
InputProcessingTimeoutMS int InputProcessingTimeoutMS int
AdaptiveBufferCPUMultiplier int InputSocketName string
AdaptiveBufferMemoryMultiplier int OutputSocketName string
InputSocketName string AudioInputComponentName string
OutputSocketName string AudioOutputComponentName string
AudioInputComponentName string AudioServerComponentName string
AudioOutputComponentName string AudioRelayComponentName string
AudioServerComponentName string AudioEventsComponentName string
AudioRelayComponentName string
AudioEventsComponentName string
TestSocketTimeout time.Duration TestSocketTimeout time.Duration
TestBufferSize int TestBufferSize int
@ -493,17 +473,11 @@ func DefaultAudioConfig() *AudioConfigConstants {
OutputSupervisorTimeout: 5 * time.Second, // Output monitoring timeout OutputSupervisorTimeout: 5 * time.Second, // Output monitoring timeout
BatchProcessingDelay: 5 * time.Millisecond, // Reduced batch processing delay BatchProcessingDelay: 5 * time.Millisecond, // Reduced batch processing delay
// Adaptive Buffer Configuration - Optimized for single-core RV1106G3 // System Load Configuration - Optimized for single-core RV1106G3
LowCPUThreshold: 0.40, // Adjusted for single-core ARM system LowCPUThreshold: 0.40, // Adjusted for single-core ARM system
HighCPUThreshold: 0.75, // Adjusted for single-core RV1106G3 (current load ~64%) HighCPUThreshold: 0.75, // Adjusted for single-core RV1106G3 (current load ~64%)
LowMemoryThreshold: 0.60, LowMemoryThreshold: 0.60,
HighMemoryThreshold: 0.85, // Adjusted for 200MB total memory system HighMemoryThreshold: 0.85, // Adjusted for 200MB total memory system
AdaptiveBufferTargetLatency: 10 * time.Millisecond, // Aggressive target latency for responsiveness
// Adaptive Buffer Size Configuration - Optimized for quality change bursts
AdaptiveMinBufferSize: 256, // Further increased minimum to prevent emergency mode
AdaptiveMaxBufferSize: 1024, // Much higher maximum for quality changes
AdaptiveDefaultBufferSize: 512, // Higher default for stability during bursts
CooldownPeriod: 15 * time.Second, // Reduced cooldown period CooldownPeriod: 15 * time.Second, // Reduced cooldown period
RollbackThreshold: 200 * time.Millisecond, // Lower rollback threshold RollbackThreshold: 200 * time.Millisecond, // Lower rollback threshold
@ -511,7 +485,6 @@ func DefaultAudioConfig() *AudioConfigConstants {
MaxLatencyThreshold: 150 * time.Millisecond, // Lower max latency threshold MaxLatencyThreshold: 150 * time.Millisecond, // Lower max latency threshold
JitterThreshold: 15 * time.Millisecond, // Reduced jitter threshold JitterThreshold: 15 * time.Millisecond, // Reduced jitter threshold
LatencyOptimizationInterval: 3 * time.Second, // More frequent optimization LatencyOptimizationInterval: 3 * time.Second, // More frequent optimization
LatencyAdaptiveThreshold: 0.7, // More aggressive adaptive threshold
// Microphone Contention Configuration // Microphone Contention Configuration
MicContentionTimeout: 200 * time.Millisecond, MicContentionTimeout: 200 * time.Millisecond,
@ -531,7 +504,6 @@ func DefaultAudioConfig() *AudioConfigConstants {
AveragingWeight: 0.7, // Weight for smoothing values (70% recent, 30% historical) AveragingWeight: 0.7, // Weight for smoothing values (70% recent, 30% historical)
ScalingFactor: 1.5, // General scaling factor for adaptive adjustments ScalingFactor: 1.5, // General scaling factor for adaptive adjustments
SmoothingFactor: 0.3, // For adaptive buffer smoothing
CPUMemoryWeight: 0.5, // CPU factor weight in combined calculations CPUMemoryWeight: 0.5, // CPU factor weight in combined calculations
MemoryWeight: 0.3, // Memory factor weight in combined calculations MemoryWeight: 0.3, // Memory factor weight in combined calculations
LatencyWeight: 0.2, // Latency factor weight in combined calculations LatencyWeight: 0.2, // Latency factor weight in combined calculations
@ -544,13 +516,6 @@ func DefaultAudioConfig() *AudioConfigConstants {
CGOPCMBufferSize: 1920, // 1920 samples for PCM buffer (max 2ch*960) CGOPCMBufferSize: 1920, // 1920 samples for PCM buffer (max 2ch*960)
CGONanosecondsPerSecond: 1000000000.0, // 1000000000.0 for nanosecond conversions CGONanosecondsPerSecond: 1000000000.0, // 1000000000.0 for nanosecond conversions
// Batch Processing Constants - Optimized for quality change bursts
BatchProcessorFramesPerBatch: 16, // Larger batches for quality changes
BatchProcessorTimeout: 20 * time.Millisecond, // Longer timeout for bursts
BatchProcessorMaxQueueSize: 64, // Larger queue for quality changes
BatchProcessorAdaptiveThreshold: 0.6, // Lower threshold for faster adaptation
BatchProcessorThreadPinningThreshold: 8, // Lower threshold for better performance
// Output Streaming Constants - Balanced for stability // Output Streaming Constants - Balanced for stability
OutputStreamingFrameIntervalMS: 20, // 20ms frame interval (50 FPS) for stability OutputStreamingFrameIntervalMS: 20, // 20ms frame interval (50 FPS) for stability
@ -572,10 +537,6 @@ func DefaultAudioConfig() *AudioConfigConstants {
// Input Processing Constants - Balanced for stability // Input Processing Constants - Balanced for stability
InputProcessingTimeoutMS: 10, // 10ms processing timeout threshold InputProcessingTimeoutMS: 10, // 10ms processing timeout threshold
// Adaptive Buffer Constants
AdaptiveBufferCPUMultiplier: 100, // 100 multiplier for CPU percentage
AdaptiveBufferMemoryMultiplier: 100, // 100 multiplier for memory percentage
// Socket Names // Socket Names
InputSocketName: "audio_input.sock", // Socket name for audio input IPC InputSocketName: "audio_input.sock", // Socket name for audio input IPC
OutputSocketName: "audio_output.sock", // Socket name for audio output IPC OutputSocketName: "audio_output.sock", // Socket name for audio output IPC

View File

@ -241,11 +241,6 @@ func (s *AudioControlService) SetAudioQuality(quality AudioQuality) {
SetAudioQuality(quality) SetAudioQuality(quality)
} }
// SetMicrophoneQuality sets the microphone input quality
func (s *AudioControlService) SetMicrophoneQuality(quality AudioQuality) {
SetMicrophoneQuality(quality)
}
// GetAudioQualityPresets returns available audio quality presets // GetAudioQualityPresets returns available audio quality presets
func (s *AudioControlService) GetAudioQualityPresets() map[AudioQuality]AudioConfig { func (s *AudioControlService) GetAudioQualityPresets() map[AudioQuality]AudioConfig {
return GetAudioQualityPresets() return GetAudioQualityPresets()

View File

@ -2,7 +2,6 @@ package audio
import ( import (
"runtime" "runtime"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -11,67 +10,6 @@ import (
) )
var ( var (
// Adaptive buffer metrics
adaptiveInputBufferSize = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_adaptive_input_buffer_size_bytes",
Help: "Current adaptive input buffer size in bytes",
},
)
adaptiveOutputBufferSize = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_adaptive_output_buffer_size_bytes",
Help: "Current adaptive output buffer size in bytes",
},
)
adaptiveBufferAdjustmentsTotal = promauto.NewCounter(
prometheus.CounterOpts{
Name: "jetkvm_adaptive_buffer_adjustments_total",
Help: "Total number of adaptive buffer size adjustments",
},
)
adaptiveSystemCpuPercent = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_adaptive_system_cpu_percent",
Help: "System CPU usage percentage used by adaptive buffer manager",
},
)
adaptiveSystemMemoryPercent = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_adaptive_system_memory_percent",
Help: "System memory usage percentage used by adaptive buffer manager",
},
)
// Socket buffer metrics
socketBufferSizeGauge = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "jetkvm_audio_socket_buffer_size_bytes",
Help: "Current socket buffer size in bytes",
},
[]string{"component", "buffer_type"}, // buffer_type: send, receive
)
socketBufferUtilizationGauge = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "jetkvm_audio_socket_buffer_utilization_percent",
Help: "Socket buffer utilization percentage",
},
[]string{"component", "buffer_type"}, // buffer_type: send, receive
)
socketBufferOverflowCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "jetkvm_audio_socket_buffer_overflow_total",
Help: "Total number of socket buffer overflows",
},
[]string{"component", "buffer_type"}, // buffer_type: send, receive
)
// Audio output metrics // Audio output metrics
audioFramesReceivedTotal = promauto.NewCounter( audioFramesReceivedTotal = promauto.NewCounter(
prometheus.CounterOpts{ prometheus.CounterOpts{
@ -158,10 +96,7 @@ var (
}, },
) )
// Device health metrics // Memory metrics (basic monitoring)
// Removed device health metrics - functionality not used
// Memory metrics
memoryHeapAllocBytes = promauto.NewGauge( memoryHeapAllocBytes = promauto.NewGauge(
prometheus.GaugeOpts{ prometheus.GaugeOpts{
Name: "jetkvm_audio_memory_heap_alloc_bytes", Name: "jetkvm_audio_memory_heap_alloc_bytes",
@ -169,20 +104,6 @@ var (
}, },
) )
memoryHeapSysBytes = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_audio_memory_heap_sys_bytes",
Help: "Total heap system memory in bytes",
},
)
memoryHeapObjects = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_audio_memory_heap_objects",
Help: "Number of heap objects",
},
)
memoryGCCount = promauto.NewCounter( memoryGCCount = promauto.NewCounter(
prometheus.CounterOpts{ prometheus.CounterOpts{
Name: "jetkvm_audio_memory_gc_total", Name: "jetkvm_audio_memory_gc_total",
@ -190,74 +111,8 @@ var (
}, },
) )
memoryGCCPUFraction = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_audio_memory_gc_cpu_fraction",
Help: "Fraction of CPU time spent in garbage collection",
},
)
// Buffer pool efficiency metrics
bufferPoolHitRate = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "jetkvm_audio_buffer_pool_hit_rate_percent",
Help: "Buffer pool hit rate percentage",
},
[]string{"pool_name"}, // pool_name: frame_pool, control_pool, zero_copy_pool
)
bufferPoolMissRate = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "jetkvm_audio_buffer_pool_miss_rate_percent",
Help: "Buffer pool miss rate percentage",
},
[]string{"pool_name"}, // pool_name: frame_pool, control_pool, zero_copy_pool
)
bufferPoolUtilization = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "jetkvm_audio_buffer_pool_utilization_percent",
Help: "Buffer pool utilization percentage",
},
[]string{"pool_name"}, // pool_name: frame_pool, control_pool, zero_copy_pool
)
bufferPoolThroughput = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "jetkvm_audio_buffer_pool_throughput_ops_per_sec",
Help: "Buffer pool throughput in operations per second",
},
[]string{"pool_name"}, // pool_name: frame_pool, control_pool, zero_copy_pool
)
bufferPoolGetLatency = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "jetkvm_audio_buffer_pool_get_latency_seconds",
Help: "Average buffer pool get operation latency in seconds",
},
[]string{"pool_name"}, // pool_name: frame_pool, control_pool, zero_copy_pool
)
bufferPoolPutLatency = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "jetkvm_audio_buffer_pool_put_latency_seconds",
Help: "Average buffer pool put operation latency in seconds",
},
[]string{"pool_name"}, // pool_name: frame_pool, control_pool, zero_copy_pool
)
// Latency percentile metrics
latencyPercentile = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "jetkvm_audio_latency_percentile_milliseconds",
Help: "Audio latency percentiles in milliseconds",
},
[]string{"source", "percentile"}, // source: input, output, processing; percentile: p50, p95, p99, min, max, avg
)
// Metrics update tracking // Metrics update tracking
metricsUpdateMutex sync.RWMutex lastMetricsUpdate int64
lastMetricsUpdate int64
// Counter value tracking (since prometheus counters don't have Get() method) // Counter value tracking (since prometheus counters don't have Get() method)
audioFramesReceivedValue uint64 audioFramesReceivedValue uint64
@ -269,8 +124,6 @@ var (
micBytesProcessedValue uint64 micBytesProcessedValue uint64
micConnectionDropsValue uint64 micConnectionDropsValue uint64
// Atomic counters for device health metrics - functionality removed, no longer used
// Atomic counter for memory GC // Atomic counter for memory GC
memoryGCCountValue uint32 memoryGCCountValue uint32
) )
@ -374,49 +227,12 @@ func UpdateMicrophoneMetrics(metrics UnifiedAudioMetrics) {
atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix()) atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
} }
// UpdateAdaptiveBufferMetrics updates Prometheus metrics with adaptive buffer information // UpdateMemoryMetrics updates basic memory metrics
func UpdateAdaptiveBufferMetrics(inputBufferSize, outputBufferSize int, cpuPercent, memoryPercent float64, adjustmentMade bool) {
metricsUpdateMutex.Lock()
defer metricsUpdateMutex.Unlock()
adaptiveInputBufferSize.Set(float64(inputBufferSize))
adaptiveOutputBufferSize.Set(float64(outputBufferSize))
adaptiveSystemCpuPercent.Set(cpuPercent)
adaptiveSystemMemoryPercent.Set(memoryPercent)
if adjustmentMade {
adaptiveBufferAdjustmentsTotal.Inc()
}
atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
}
// UpdateSocketBufferMetrics updates socket buffer metrics
func UpdateSocketBufferMetrics(component, bufferType string, size, utilization float64, overflowOccurred bool) {
metricsUpdateMutex.Lock()
defer metricsUpdateMutex.Unlock()
socketBufferSizeGauge.WithLabelValues(component, bufferType).Set(size)
socketBufferUtilizationGauge.WithLabelValues(component, bufferType).Set(utilization)
if overflowOccurred {
socketBufferOverflowCounter.WithLabelValues(component, bufferType).Inc()
}
atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
}
// UpdateDeviceHealthMetrics - Placeholder for future device health metrics
// UpdateMemoryMetrics updates memory metrics
func UpdateMemoryMetrics() { func UpdateMemoryMetrics() {
var m runtime.MemStats var m runtime.MemStats
runtime.ReadMemStats(&m) runtime.ReadMemStats(&m)
memoryHeapAllocBytes.Set(float64(m.HeapAlloc)) memoryHeapAllocBytes.Set(float64(m.HeapAlloc))
memoryHeapSysBytes.Set(float64(m.HeapSys))
memoryHeapObjects.Set(float64(m.HeapObjects))
memoryGCCPUFraction.Set(m.GCCPUFraction)
// Update GC count with delta calculation // Update GC count with delta calculation
currentGCCount := uint32(m.NumGC) currentGCCount := uint32(m.NumGC)
@ -428,31 +244,6 @@ func UpdateMemoryMetrics() {
atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix()) atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
} }
// UpdateBufferPoolMetrics updates buffer pool efficiency metrics
func UpdateBufferPoolMetrics(poolName string, hitRate, missRate, utilization, throughput, getLatency, putLatency float64) {
metricsUpdateMutex.Lock()
defer metricsUpdateMutex.Unlock()
bufferPoolHitRate.WithLabelValues(poolName).Set(hitRate * 100)
bufferPoolMissRate.WithLabelValues(poolName).Set(missRate * 100)
bufferPoolUtilization.WithLabelValues(poolName).Set(utilization * 100)
bufferPoolThroughput.WithLabelValues(poolName).Set(throughput)
bufferPoolGetLatency.WithLabelValues(poolName).Set(getLatency)
bufferPoolPutLatency.WithLabelValues(poolName).Set(putLatency)
atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
}
// UpdateLatencyMetrics updates latency percentile metrics
func UpdateLatencyMetrics(source, percentile string, latencyMilliseconds float64) {
metricsUpdateMutex.Lock()
defer metricsUpdateMutex.Unlock()
latencyPercentile.WithLabelValues(source, percentile).Set(latencyMilliseconds)
atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
}
// GetLastMetricsUpdate returns the timestamp of the last metrics update // GetLastMetricsUpdate returns the timestamp of the last metrics update
func GetLastMetricsUpdate() time.Time { func GetLastMetricsUpdate() time.Time {
timestamp := atomic.LoadInt64(&lastMetricsUpdate) timestamp := atomic.LoadInt64(&lastMetricsUpdate)

View File

@ -154,25 +154,6 @@ func ValidateMetricsInterval(interval time.Duration) error {
return nil return nil
} }
// ValidateAdaptiveBufferConfig validates adaptive buffer configuration
func ValidateAdaptiveBufferConfig(minSize, maxSize, defaultSize int) error {
if minSize <= 0 || maxSize <= 0 || defaultSize <= 0 {
return ErrInvalidBufferSize
}
if minSize >= maxSize {
return ErrInvalidBufferSize
}
if defaultSize < minSize || defaultSize > maxSize {
return ErrInvalidBufferSize
}
// Validate against global limits
maxBuffer := Config.SocketMaxBuffer
if maxSize > maxBuffer {
return ErrInvalidBufferSize
}
return nil
}
// ValidateInputIPCConfig validates input IPC configuration // ValidateInputIPCConfig validates input IPC configuration
func ValidateInputIPCConfig(sampleRate, channels, frameSize int) error { func ValidateInputIPCConfig(sampleRate, channels, frameSize int) error {
minSampleRate := Config.MinSampleRate minSampleRate := Config.MinSampleRate

View File

@ -1,329 +0,0 @@
package audio
import (
"sync"
"sync/atomic"
"time"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
// Task represents a function to be executed by a worker in the pool
type Task func()
// GoroutinePool manages a pool of reusable goroutines to reduce the overhead
// of goroutine creation and destruction
type GoroutinePool struct {
// Atomic fields must be first for proper alignment on 32-bit systems
taskCount int64 // Number of tasks processed
workerCount int64 // Current number of workers
maxIdleTime time.Duration
maxWorkers int
taskQueue chan Task
workerSem chan struct{} // Semaphore to limit concurrent workers
shutdown chan struct{}
shutdownOnce sync.Once
wg sync.WaitGroup
logger *zerolog.Logger
name string
}
// NewGoroutinePool creates a new goroutine pool with the specified parameters
func NewGoroutinePool(name string, maxWorkers int, queueSize int, maxIdleTime time.Duration) *GoroutinePool {
logger := logging.GetDefaultLogger().With().Str("component", "goroutine-pool").Str("pool", name).Logger()
pool := &GoroutinePool{
maxWorkers: maxWorkers,
maxIdleTime: maxIdleTime,
taskQueue: make(chan Task, queueSize),
workerSem: make(chan struct{}, maxWorkers),
shutdown: make(chan struct{}),
logger: &logger,
name: name,
}
// Start a supervisor goroutine to monitor pool health
go pool.supervisor()
return pool
}
// Submit adds a task to the pool for execution
// Returns true if the task was accepted, false if the queue is full
func (p *GoroutinePool) Submit(task Task) bool {
select {
case <-p.shutdown:
return false // Pool is shutting down
case p.taskQueue <- task:
// Task accepted, ensure we have a worker to process it
p.ensureWorkerAvailable()
return true
default:
// Queue is full
return false
}
}
// SubmitWithBackpressure adds a task to the pool with backpressure handling
// Returns true if task was accepted, false if dropped due to backpressure
func (p *GoroutinePool) SubmitWithBackpressure(task Task) bool {
select {
case <-p.shutdown:
return false // Pool is shutting down
case p.taskQueue <- task:
// Task accepted, ensure we have a worker to process it
p.ensureWorkerAvailable()
return true
default:
// Queue is full - apply backpressure
// Check if we're in a high-load situation
queueLen := len(p.taskQueue)
queueCap := cap(p.taskQueue)
workerCount := atomic.LoadInt64(&p.workerCount)
// If queue is >90% full and we're at max workers, drop the task
if queueLen > int(float64(queueCap)*0.9) && workerCount >= int64(p.maxWorkers) {
p.logger.Warn().Int("queue_len", queueLen).Int("queue_cap", queueCap).Msg("Dropping task due to backpressure")
return false
}
// Try one more time with a short timeout
select {
case p.taskQueue <- task:
p.ensureWorkerAvailable()
return true
case <-time.After(1 * time.Millisecond):
// Still can't submit after timeout - drop task
p.logger.Debug().Msg("Task dropped after backpressure timeout")
return false
}
}
}
// ensureWorkerAvailable makes sure at least one worker is available to process tasks
func (p *GoroutinePool) ensureWorkerAvailable() {
// Check if we already have enough workers
currentWorkers := atomic.LoadInt64(&p.workerCount)
// Only start new workers if:
// 1. We have no workers at all, or
// 2. The queue is growing and we're below max workers
queueLen := len(p.taskQueue)
if currentWorkers == 0 || (queueLen > int(currentWorkers) && currentWorkers < int64(p.maxWorkers)) {
// Try to acquire a semaphore slot without blocking
select {
case p.workerSem <- struct{}{}:
// We got a slot, start a new worker
p.startWorker()
default:
// All worker slots are taken, which means we have enough workers
}
}
}
// startWorker launches a new worker goroutine
func (p *GoroutinePool) startWorker() {
p.wg.Add(1)
atomic.AddInt64(&p.workerCount, 1)
go func() {
defer func() {
atomic.AddInt64(&p.workerCount, -1)
<-p.workerSem // Release the semaphore slot
p.wg.Done()
// Recover from panics in worker tasks
if r := recover(); r != nil {
p.logger.Error().Interface("panic", r).Msg("Worker recovered from panic")
}
}()
idleTimer := time.NewTimer(p.maxIdleTime)
defer idleTimer.Stop()
for {
select {
case <-p.shutdown:
return
case task, ok := <-p.taskQueue:
if !ok {
return // Channel closed
}
// Reset idle timer
if !idleTimer.Stop() {
<-idleTimer.C
}
idleTimer.Reset(p.maxIdleTime)
// Execute the task with panic recovery
func() {
defer func() {
if r := recover(); r != nil {
p.logger.Error().Interface("panic", r).Msg("Task execution panic recovered")
}
}()
task()
}()
atomic.AddInt64(&p.taskCount, 1)
case <-idleTimer.C:
// Worker has been idle for too long
// Keep at least 2 workers alive to handle incoming tasks without creating new goroutines
if atomic.LoadInt64(&p.workerCount) > 2 {
return
}
// For persistent workers (the minimum 2), use a longer idle timeout
// This prevents excessive worker creation/destruction cycles
idleTimer.Reset(p.maxIdleTime * 3) // Triple the idle time for persistent workers
}
}
}()
}
// supervisor monitors the pool and logs statistics periodically
func (p *GoroutinePool) supervisor() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-p.shutdown:
return
case <-ticker.C:
workers := atomic.LoadInt64(&p.workerCount)
tasks := atomic.LoadInt64(&p.taskCount)
queueLen := len(p.taskQueue)
p.logger.Debug().
Int64("workers", workers).
Int64("tasks_processed", tasks).
Int("queue_length", queueLen).
Msg("Pool statistics")
}
}
}
// Shutdown gracefully shuts down the pool
// If wait is true, it will wait for all tasks to complete
// If wait is false, it will terminate immediately, potentially leaving tasks unprocessed
func (p *GoroutinePool) Shutdown(wait bool) {
p.shutdownOnce.Do(func() {
close(p.shutdown)
if wait {
// Wait for all tasks to be processed
if len(p.taskQueue) > 0 {
p.logger.Debug().Int("remaining_tasks", len(p.taskQueue)).Msg("Waiting for tasks to complete")
}
// Close the task queue to signal no more tasks
close(p.taskQueue)
// Wait for all workers to finish
p.wg.Wait()
}
})
}
// GetStats returns statistics about the pool
func (p *GoroutinePool) GetStats() map[string]interface{} {
return map[string]interface{}{
"name": p.name,
"worker_count": atomic.LoadInt64(&p.workerCount),
"max_workers": p.maxWorkers,
"tasks_processed": atomic.LoadInt64(&p.taskCount),
"queue_length": len(p.taskQueue),
"queue_capacity": cap(p.taskQueue),
}
}
// Global pools for different audio processing tasks
var (
globalAudioProcessorPool atomic.Pointer[GoroutinePool]
globalAudioReaderPool atomic.Pointer[GoroutinePool]
globalAudioProcessorInitOnce sync.Once
globalAudioReaderInitOnce sync.Once
)
// GetAudioProcessorPool returns the global audio processor pool
func GetAudioProcessorPool() *GoroutinePool {
pool := globalAudioProcessorPool.Load()
if pool != nil {
return pool
}
globalAudioProcessorInitOnce.Do(func() {
config := Config
newPool := NewGoroutinePool(
"audio-processor",
config.MaxAudioProcessorWorkers,
config.AudioProcessorQueueSize,
config.WorkerMaxIdleTime,
)
globalAudioProcessorPool.Store(newPool)
pool = newPool
})
return globalAudioProcessorPool.Load()
}
// GetAudioReaderPool returns the global audio reader pool
func GetAudioReaderPool() *GoroutinePool {
pool := globalAudioReaderPool.Load()
if pool != nil {
return pool
}
globalAudioReaderInitOnce.Do(func() {
config := Config
newPool := NewGoroutinePool(
"audio-reader",
config.MaxAudioReaderWorkers,
config.AudioReaderQueueSize,
config.WorkerMaxIdleTime,
)
globalAudioReaderPool.Store(newPool)
pool = newPool
})
return globalAudioReaderPool.Load()
}
// SubmitAudioProcessorTask submits a task to the audio processor pool
func SubmitAudioProcessorTask(task Task) bool {
return GetAudioProcessorPool().Submit(task)
}
// SubmitAudioReaderTask submits a task to the audio reader pool
func SubmitAudioReaderTask(task Task) bool {
return GetAudioReaderPool().Submit(task)
}
// SubmitAudioProcessorTaskWithBackpressure submits a task with backpressure handling
func SubmitAudioProcessorTaskWithBackpressure(task Task) bool {
return GetAudioProcessorPool().SubmitWithBackpressure(task)
}
// SubmitAudioReaderTaskWithBackpressure submits a task with backpressure handling
func SubmitAudioReaderTaskWithBackpressure(task Task) bool {
return GetAudioReaderPool().SubmitWithBackpressure(task)
}
// ShutdownAudioPools shuts down all audio goroutine pools
func ShutdownAudioPools(wait bool) {
logger := logging.GetDefaultLogger().With().Str("component", "audio-pools").Logger()
processorPool := globalAudioProcessorPool.Load()
if processorPool != nil {
logger.Info().Msg("Shutting down audio processor pool")
processorPool.Shutdown(wait)
}
readerPool := globalAudioReaderPool.Load()
if readerPool != nil {
logger.Info().Msg("Shutting down audio reader pool")
readerPool.Shutdown(wait)
}
}

View File

@ -70,9 +70,6 @@ func (aim *AudioInputManager) Stop() {
aim.logComponentStop(AudioInputManagerComponent) aim.logComponentStop(AudioInputManagerComponent)
// Flush any pending sampled metrics before stopping
aim.flushPendingMetrics()
// Stop the IPC-based audio input // Stop the IPC-based audio input
aim.ipcManager.Stop() aim.ipcManager.Stop()
@ -120,8 +117,6 @@ func (aim *AudioInputManager) WriteOpusFrame(frame []byte) error {
if err != nil { if err != nil {
return err return err
} }
aim.recordFrameProcessed(len(frame))
aim.updateLatency(processingTime)
return nil return nil
} }
@ -164,8 +159,6 @@ func (aim *AudioInputManager) WriteOpusFrameZeroCopy(frame *ZeroCopyAudioFrame)
// Update metrics // Update metrics
atomic.AddInt64(&aim.framesSent, 1) atomic.AddInt64(&aim.framesSent, 1)
aim.recordFrameProcessed(frame.Length())
aim.updateLatency(processingTime)
return nil return nil
} }

View File

@ -17,6 +17,7 @@ import (
"time" "time"
"github.com/jetkvm/kvm/internal/logging" "github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
) )
// Global audio input server instance // Global audio input server instance
@ -46,7 +47,7 @@ func RecoverGlobalAudioInputServer() {
// RunAudioInputServer runs the audio input server subprocess // RunAudioInputServer runs the audio input server subprocess
// This should be called from main() when the subprocess is detected // This should be called from main() when the subprocess is detected
func RunAudioInputServer() error { func RunAudioInputServer() error {
logger := logging.GetDefaultLogger().With().Str("component", "audio-input-server").Logger() logger := logging.GetSubsystemLogger("audio").With().Str("component", "audio-input-server").Logger()
// Parse OPUS configuration from environment variables // Parse OPUS configuration from environment variables
bitrate, complexity, vbr, signalType, bandwidth, dtx := parseOpusConfig() bitrate, complexity, vbr, signalType, bandwidth, dtx := parseOpusConfig()
@ -85,6 +86,10 @@ func RunAudioInputServer() error {
logger.Info().Msg("audio input server started, waiting for connections") logger.Info().Msg("audio input server started, waiting for connections")
// Update C trace logging based on current audio scope log level (after environment variables are processed)
traceEnabled := logger.GetLevel() <= zerolog.TraceLevel
CGOSetTraceLogging(traceEnabled)
// Set up signal handling for graceful shutdown // Set up signal handling for graceful shutdown
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()

View File

@ -119,6 +119,11 @@ func (ais *AudioInputSupervisor) startProcess() error {
// Set environment variables for IPC and OPUS configuration // Set environment variables for IPC and OPUS configuration
env := append(os.Environ(), "JETKVM_AUDIO_INPUT_IPC=true") // Enable IPC mode env := append(os.Environ(), "JETKVM_AUDIO_INPUT_IPC=true") // Enable IPC mode
env = append(env, ais.opusEnv...) // Add OPUS configuration env = append(env, ais.opusEnv...) // Add OPUS configuration
// Pass logging environment variables directly to subprocess
// The subprocess will inherit all PION_LOG_* variables from os.Environ()
// This ensures the audio scope gets the correct trace level
ais.cmd.Env = env ais.cmd.Env = env
// Set process group to allow clean termination // Set process group to allow clean termination

View File

@ -29,24 +29,22 @@ var (
// OptimizedIPCMessage represents an optimized message with pre-allocated buffers // OptimizedIPCMessage represents an optimized message with pre-allocated buffers
type OptimizedIPCMessage struct { type OptimizedIPCMessage struct {
header [17]byte // Pre-allocated header buffer (headerSize = 17) header [17]byte
data []byte // Reusable data buffer data []byte
msg UnifiedIPCMessage // Embedded message msg UnifiedIPCMessage
} }
// MessagePool manages a pool of reusable messages to reduce allocations // MessagePool manages a pool of reusable messages to reduce allocations
type MessagePool struct { type MessagePool struct {
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) hitCount int64
hitCount int64 // Pool hit counter (atomic) missCount int64
missCount int64 // Pool miss counter (atomic)
// Other fields
pool chan *OptimizedIPCMessage pool chan *OptimizedIPCMessage
// Memory optimization fields
preallocated []*OptimizedIPCMessage // Pre-allocated messages for immediate use preallocated []*OptimizedIPCMessage
preallocSize int // Number of pre-allocated messages preallocSize int
maxPoolSize int // Maximum pool size to prevent memory bloat maxPoolSize int
mutex sync.RWMutex // Protects preallocated slice mutex sync.RWMutex
} }
// Global message pool instance // Global message pool instance
@ -152,30 +150,25 @@ func (mp *MessagePool) Put(msg *OptimizedIPCMessage) {
} }
} }
// AudioInputServer handles IPC communication for audio input processing
type AudioInputServer struct { type AudioInputServer struct {
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) bufferSize int64
bufferSize int64 // Current buffer size (atomic) processingTime int64
processingTime int64 // Average processing time in nanoseconds (atomic) droppedFrames int64
droppedFrames int64 // Dropped frames counter (atomic) totalFrames int64
totalFrames int64 // Total frames counter (atomic)
listener net.Listener listener net.Listener
conn net.Conn conn net.Conn
mtx sync.Mutex mtx sync.Mutex
running bool running bool
// Triple-goroutine architecture messageChan chan *UnifiedIPCMessage
messageChan chan *UnifiedIPCMessage // Buffered channel for incoming messages processChan chan *UnifiedIPCMessage
processChan chan *UnifiedIPCMessage // Buffered channel for processing queue stopChan chan struct{}
stopChan chan struct{} // Stop signal for all goroutines wg sync.WaitGroup
wg sync.WaitGroup // Wait group for goroutine coordination
// Channel resizing support channelMutex sync.RWMutex
channelMutex sync.RWMutex // Protects channel recreation lastBufferSize int64
lastBufferSize int64 // Last known buffer size for change detection
// Socket buffer configuration
socketBufferConfig SocketBufferConfig socketBufferConfig SocketBufferConfig
} }
@ -211,8 +204,8 @@ func NewAudioInputServer() (*AudioInputServer, error) {
return nil, fmt.Errorf("failed to create unix socket after 3 attempts: %w", err) return nil, fmt.Errorf("failed to create unix socket after 3 attempts: %w", err)
} }
// Get initial buffer size from config // Get initial buffer size (512 frames for stability)
initialBufferSize := int64(Config.AdaptiveDefaultBufferSize) initialBufferSize := int64(512)
// Ensure minimum buffer size to prevent immediate overflow // Ensure minimum buffer size to prevent immediate overflow
// Use at least 50 frames to handle burst traffic // Use at least 50 frames to handle burst traffic
@ -256,11 +249,8 @@ func (ais *AudioInputServer) Start() error {
ais.startProcessorGoroutine() ais.startProcessorGoroutine()
ais.startMonitorGoroutine() ais.startMonitorGoroutine()
// Submit the connection acceptor to the audio reader pool // Submit the connection acceptor directly
if !SubmitAudioReaderTask(ais.acceptConnections) { go ais.acceptConnections()
// If the pool is full or shutting down, fall back to direct goroutine creation
go ais.acceptConnections()
}
return nil return nil
} }
@ -335,10 +325,8 @@ func (ais *AudioInputServer) acceptConnections() {
ais.mtx.Unlock() ais.mtx.Unlock()
// Handle this connection using the goroutine pool // Handle this connection using the goroutine pool
if !SubmitAudioReaderTask(func() { ais.handleConnection(conn) }) { // Handle the connection directly
// If the pool is full or shutting down, fall back to direct goroutine creation go ais.handleConnection(conn)
go ais.handleConnection(conn)
}
} }
} }
@ -466,13 +454,9 @@ func (ais *AudioInputServer) processMessage(msg *UnifiedIPCMessage) error {
// processOpusFrame processes an Opus audio frame // processOpusFrame processes an Opus audio frame
func (ais *AudioInputServer) processOpusFrame(data []byte) error { func (ais *AudioInputServer) processOpusFrame(data []byte) error {
// Fast path: skip empty frame check - caller should handle this
dataLen := len(data)
if dataLen == 0 {
return nil
}
// Inline validation for critical audio path - avoid function call overhead // Inline validation for critical audio path - avoid function call overhead
dataLen := len(data)
cachedMaxFrameSize := maxFrameSize
if dataLen > cachedMaxFrameSize { if dataLen > cachedMaxFrameSize {
return ErrFrameDataTooLarge return ErrFrameDataTooLarge
} }
@ -485,8 +469,85 @@ func (ais *AudioInputServer) processOpusFrame(data []byte) error {
pcmBuffer := GetBufferFromPool(cache.MaxPCMBufferSize) pcmBuffer := GetBufferFromPool(cache.MaxPCMBufferSize)
defer ReturnBufferToPool(pcmBuffer) defer ReturnBufferToPool(pcmBuffer)
// Log audio processing details periodically for monitoring
totalFrames := atomic.AddInt64(&ais.totalFrames, 1)
// Zero-cost debug logging for buffer allocation (first few operations)
// Only perform computations if trace logging is actually enabled
if totalFrames <= 5 {
logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()
if logger.GetLevel() <= zerolog.TraceLevel {
logger.Trace().
Int("requested_buffer_size", cache.MaxPCMBufferSize).
Int("pcm_buffer_length", len(pcmBuffer)).
Int("pcm_buffer_capacity", cap(pcmBuffer)).
Msg("PCM buffer allocated from pool")
}
}
if totalFrames <= 5 || totalFrames%500 == 1 {
logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()
if logger.GetLevel() <= zerolog.TraceLevel {
logger.Trace().
Int("opus_frame_size", dataLen).
Int("pcm_buffer_size", len(pcmBuffer)).
Int64("total_frames_processed", totalFrames).
Msg("Processing audio frame for USB Gadget output")
}
}
// Direct CGO call - avoid wrapper function overhead // Direct CGO call - avoid wrapper function overhead
_, err := CGOAudioDecodeWrite(data, pcmBuffer) start := time.Now()
framesWritten, err := CGOAudioDecodeWrite(data, pcmBuffer)
duration := time.Since(start)
// Log the result with detailed context
logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()
if err != nil {
// Log error with detailed context for debugging
atomic.AddInt64(&ais.droppedFrames, 1)
// Get current statistics for context
total, success, failures, recovery, lastError, _ := GetAudioDecodeWriteStats()
successRate := float64(success) / float64(total) * 100
logger.Error().
Err(err).
Int("opus_frame_size", dataLen).
Dur("processing_duration", duration).
Int64("frames_written", int64(framesWritten)).
Int64("total_operations", total).
Int64("successful_operations", success).
Int64("failed_operations", failures).
Int64("recovery_attempts", recovery).
Float64("success_rate_percent", successRate).
Str("last_error", lastError).
Int64("total_frames_processed", totalFrames).
Int64("dropped_frames", atomic.LoadInt64(&ais.droppedFrames)).
Msg("Failed to decode/write audio frame to USB Gadget")
return err
}
// Log successful operations periodically to monitor health (zero-cost when trace disabled)
if (totalFrames <= 5 || totalFrames%1000 == 1) && logger.GetLevel() <= zerolog.TraceLevel {
// Get current statistics for context (only when trace is enabled)
total, success, failures, recovery, _, _ := GetAudioDecodeWriteStats()
successRate := float64(success) / float64(total) * 100
logger.Trace().
Int("opus_frame_size", dataLen).
Int64("frames_written", int64(framesWritten)).
Int64("total_operations", total).
Int64("successful_operations", success).
Int64("failed_operations", failures).
Int64("recovery_attempts", recovery).
Float64("success_rate_percent", successRate).
Int64("total_frames_processed", totalFrames).
Int64("dropped_frames", atomic.LoadInt64(&ais.droppedFrames)).
Msg("Successfully decoded/wrote audio frame to USB Gadget")
}
return err return err
} }
@ -981,17 +1042,8 @@ func (ais *AudioInputServer) startReaderGoroutine() {
} }
} }
// Submit the reader task to the audio reader pool with backpressure // Handle the reader task directly
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger() go readerTask()
if !SubmitAudioReaderTaskWithBackpressure(readerTask) {
// Task was dropped due to backpressure - this is expected under high load
// Log at debug level to avoid spam, but track the drop
logger.Debug().Msg("Audio reader task dropped due to backpressure")
// Don't fall back to unlimited goroutine creation
// Instead, let the system recover naturally
ais.wg.Done() // Decrement the wait group since we're not starting the task
}
} }
// startProcessorGoroutine starts the message processor using the goroutine pool // startProcessorGoroutine starts the message processor using the goroutine pool
@ -1073,17 +1125,8 @@ func (ais *AudioInputServer) startProcessorGoroutine() {
} }
} }
// Submit the processor task to the audio processor pool with backpressure // Submit the processor task directly
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger() go processorTask()
if !SubmitAudioProcessorTaskWithBackpressure(processorTask) {
// Task was dropped due to backpressure - this is expected under high load
// Log at debug level to avoid spam, but track the drop
logger.Debug().Msg("Audio processor task dropped due to backpressure")
// Don't fall back to unlimited goroutine creation
// Instead, let the system recover naturally
ais.wg.Done() // Decrement the wait group since we're not starting the task
}
} }
// processMessageWithRecovery processes a message with enhanced error recovery // processMessageWithRecovery processes a message with enhanced error recovery
@ -1182,9 +1225,6 @@ func (ais *AudioInputServer) startMonitorGoroutine() {
atomic.StoreInt64(&ais.processingTime, newAvg) atomic.StoreInt64(&ais.processingTime, newAvg)
} }
// Report latency to adaptive buffer manager
ais.ReportLatency(latency)
if err != nil { if err != nil {
atomic.AddInt64(&ais.droppedFrames, 1) atomic.AddInt64(&ais.droppedFrames, 1)
} }
@ -1206,17 +1246,8 @@ func (ais *AudioInputServer) startMonitorGoroutine() {
} }
} }
// Submit the monitor task to the audio processor pool with backpressure // Submit the monitor task directly
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger() go monitorTask()
if !SubmitAudioProcessorTaskWithBackpressure(monitorTask) {
// Task was dropped due to backpressure - this is expected under high load
// Log at debug level to avoid spam, but track the drop
logger.Debug().Msg("Audio monitor task dropped due to backpressure")
// Don't fall back to unlimited goroutine creation
// Instead, let the system recover naturally
ais.wg.Done() // Decrement the wait group since we're not starting the task
}
} }
// GetServerStats returns server performance statistics // GetServerStats returns server performance statistics
@ -1227,18 +1258,13 @@ func (ais *AudioInputServer) GetServerStats() (total, dropped int64, avgProcessi
atomic.LoadInt64(&ais.bufferSize) atomic.LoadInt64(&ais.bufferSize)
} }
// UpdateBufferSize updates the buffer size (now using fixed config values) // UpdateBufferSize updates the buffer size (now using fixed values)
func (ais *AudioInputServer) UpdateBufferSize() { func (ais *AudioInputServer) UpdateBufferSize() {
// Buffer size is now fixed from config // Buffer size is now fixed at 512 frames for stability
newSize := int64(Config.AdaptiveDefaultBufferSize) newSize := int64(512)
atomic.StoreInt64(&ais.bufferSize, newSize) atomic.StoreInt64(&ais.bufferSize, newSize)
} }
// ReportLatency reports processing latency (now a no-op with fixed buffers)
func (ais *AudioInputServer) ReportLatency(latency time.Duration) {
// Latency reporting is now a no-op with fixed buffer sizes
}
// GetMessagePoolStats returns detailed statistics about the message pool // GetMessagePoolStats returns detailed statistics about the message pool
func (mp *MessagePool) GetMessagePoolStats() MessagePoolStats { func (mp *MessagePool) GetMessagePoolStats() MessagePoolStats {
mp.mutex.RLock() mp.mutex.RLock()

View File

@ -18,10 +18,9 @@ var globalOutputClientMessagePool = NewGenericMessagePool(Config.OutputMessagePo
// AudioOutputServer provides audio output IPC functionality // AudioOutputServer provides audio output IPC functionality
type AudioOutputServer struct { type AudioOutputServer struct {
// Atomic counters bufferSize int64
bufferSize int64 // Current buffer size (atomic) droppedFrames int64
droppedFrames int64 // Dropped frames counter (atomic) totalFrames int64
totalFrames int64 // Total frames counter (atomic)
listener net.Listener listener net.Listener
conn net.Conn conn net.Conn
@ -29,12 +28,10 @@ type AudioOutputServer struct {
running bool running bool
logger zerolog.Logger logger zerolog.Logger
// Message channels messageChan chan *UnifiedIPCMessage
messageChan chan *UnifiedIPCMessage // Buffered channel for incoming messages processChan chan *UnifiedIPCMessage
processChan chan *UnifiedIPCMessage // Buffered channel for processing queue wg sync.WaitGroup
wg sync.WaitGroup // Wait group for goroutine coordination
// Configuration
socketPath string socketPath string
magicNumber uint32 magicNumber uint32
} }
@ -265,6 +262,17 @@ func (s *AudioOutputServer) SendFrame(frame []byte) error {
return fmt.Errorf("no client connected") return fmt.Errorf("no client connected")
} }
// Zero-cost trace logging for frame transmission
if s.logger.GetLevel() <= zerolog.TraceLevel {
totalFrames := atomic.LoadInt64(&s.totalFrames)
if totalFrames <= 5 || totalFrames%1000 == 1 {
s.logger.Trace().
Int("frame_size", len(frame)).
Int64("total_frames_sent", totalFrames).
Msg("Sending audio frame to output client")
}
}
msg := &UnifiedIPCMessage{ msg := &UnifiedIPCMessage{
Magic: s.magicNumber, Magic: s.magicNumber,
Type: MessageTypeOpusFrame, Type: MessageTypeOpusFrame,
@ -301,9 +309,8 @@ func (s *AudioOutputServer) GetServerStats() (total, dropped int64, bufferSize i
// AudioOutputClient provides audio output IPC client functionality // AudioOutputClient provides audio output IPC client functionality
type AudioOutputClient struct { type AudioOutputClient struct {
// Atomic counters droppedFrames int64
droppedFrames int64 // Atomic counter for dropped frames totalFrames int64
totalFrames int64 // Atomic counter for total frames
conn net.Conn conn net.Conn
mtx sync.Mutex mtx sync.Mutex
@ -311,10 +318,9 @@ type AudioOutputClient struct {
logger zerolog.Logger logger zerolog.Logger
socketPath string socketPath string
magicNumber uint32 magicNumber uint32
bufferPool *AudioBufferPool // Buffer pool for memory optimization bufferPool *AudioBufferPool
// Health monitoring autoReconnect bool
autoReconnect bool // Enable automatic reconnection
} }
func NewAudioOutputClient() *AudioOutputClient { func NewAudioOutputClient() *AudioOutputClient {
@ -405,6 +411,7 @@ func (c *AudioOutputClient) ReceiveFrame() ([]byte, error) {
} }
size := binary.LittleEndian.Uint32(optMsg.header[5:9]) size := binary.LittleEndian.Uint32(optMsg.header[5:9])
timestamp := int64(binary.LittleEndian.Uint64(optMsg.header[9:17]))
maxFrameSize := Config.OutputMaxFrameSize maxFrameSize := Config.OutputMaxFrameSize
if int(size) > maxFrameSize { if int(size) > maxFrameSize {
return nil, fmt.Errorf("received frame size validation failed: got %d bytes, maximum allowed %d bytes", size, maxFrameSize) return nil, fmt.Errorf("received frame size validation failed: got %d bytes, maximum allowed %d bytes", size, maxFrameSize)
@ -423,6 +430,19 @@ func (c *AudioOutputClient) ReceiveFrame() ([]byte, error) {
// Note: Caller is responsible for returning frame to pool via PutAudioFrameBuffer() // Note: Caller is responsible for returning frame to pool via PutAudioFrameBuffer()
atomic.AddInt64(&c.totalFrames, 1) atomic.AddInt64(&c.totalFrames, 1)
// Zero-cost trace logging for frame reception
if c.logger.GetLevel() <= zerolog.TraceLevel {
totalFrames := atomic.LoadInt64(&c.totalFrames)
if totalFrames <= 5 || totalFrames%1000 == 1 {
c.logger.Trace().
Int("frame_size", int(size)).
Int64("timestamp", timestamp).
Int64("total_frames_received", totalFrames).
Msg("Received audio frame from output server")
}
}
return frame, nil return frame, nil
} }

View File

@ -59,12 +59,6 @@ func (bam *BaseAudioManager) resetMetrics() {
bam.metrics.AverageLatency = 0 bam.metrics.AverageLatency = 0
} }
// flushPendingMetrics is now a no-op since we use direct atomic updates
func (bam *BaseAudioManager) flushPendingMetrics() {
// No-op: metrics are now updated directly without local buffering
// This function is kept for API compatibility
}
// getBaseMetrics returns a copy of the base metrics // getBaseMetrics returns a copy of the base metrics
func (bam *BaseAudioManager) getBaseMetrics() BaseAudioMetrics { func (bam *BaseAudioManager) getBaseMetrics() BaseAudioMetrics {
return BaseAudioMetrics{ return BaseAudioMetrics{
@ -77,18 +71,6 @@ func (bam *BaseAudioManager) getBaseMetrics() BaseAudioMetrics {
} }
} }
// recordFrameProcessed records a processed frame with simplified tracking
func (bam *BaseAudioManager) recordFrameProcessed(bytes int) {
}
// recordFrameDropped records a dropped frame with simplified tracking
func (bam *BaseAudioManager) recordFrameDropped() {
}
// updateLatency updates the average latency
func (bam *BaseAudioManager) updateLatency(latency time.Duration) {
}
// logComponentStart logs component start with consistent format // logComponentStart logs component start with consistent format
func (bam *BaseAudioManager) logComponentStart(component string) { func (bam *BaseAudioManager) logComponentStart(component string) {
bam.logger.Debug().Str("component", component).Msg("starting component") bam.logger.Debug().Str("component", component).Msg("starting component")

View File

@ -104,19 +104,11 @@ func (aom *AudioOutputIPCManager) WriteOpusFrame(frame *ZeroCopyAudioFrame) erro
return fmt.Errorf("output frame validation failed: %w", err) return fmt.Errorf("output frame validation failed: %w", err)
} }
start := time.Now()
// Send frame to IPC server // Send frame to IPC server
if err := aom.server.SendFrame(frame.Data()); err != nil { if err := aom.server.SendFrame(frame.Data()); err != nil {
aom.recordFrameDropped()
return err return err
} }
// Update metrics
processingTime := time.Since(start)
aom.recordFrameProcessed(frame.Length())
aom.updateLatency(processingTime)
return nil return nil
} }
@ -130,22 +122,14 @@ func (aom *AudioOutputIPCManager) WriteOpusFrameZeroCopy(frame *ZeroCopyAudioFra
return fmt.Errorf("audio output server not initialized") return fmt.Errorf("audio output server not initialized")
} }
start := time.Now()
// Extract frame data // Extract frame data
frameData := frame.Data() frameData := frame.Data()
// Send frame to IPC server (zero-copy not available, use regular send) // Send frame to IPC server (zero-copy not available, use regular send)
if err := aom.server.SendFrame(frameData); err != nil { if err := aom.server.SendFrame(frameData); err != nil {
aom.recordFrameDropped()
return err return err
} }
// Update metrics
processingTime := time.Since(start)
aom.recordFrameProcessed(len(frameData))
aom.updateLatency(processingTime)
return nil return nil
} }

View File

@ -4,10 +4,12 @@ import (
"context" "context"
"os" "os"
"os/signal" "os/signal"
"strings"
"syscall" "syscall"
"time" "time"
"github.com/jetkvm/kvm/internal/logging" "github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
) )
// getEnvInt reads an integer from environment variable with a default value // getEnvInt reads an integer from environment variable with a default value
@ -15,7 +17,7 @@ import (
// RunAudioOutputServer runs the audio output server subprocess // RunAudioOutputServer runs the audio output server subprocess
// This should be called from main() when the subprocess is detected // This should be called from main() when the subprocess is detected
func RunAudioOutputServer() error { func RunAudioOutputServer() error {
logger := logging.GetDefaultLogger().With().Str("component", "audio-output-server").Logger() logger := logging.GetSubsystemLogger("audio").With().Str("component", "audio-output-server").Logger()
// Parse OPUS configuration from environment variables // Parse OPUS configuration from environment variables
bitrate, complexity, vbr, signalType, bandwidth, dtx := parseOpusConfig() bitrate, complexity, vbr, signalType, bandwidth, dtx := parseOpusConfig()
@ -52,6 +54,27 @@ func RunAudioOutputServer() error {
logger.Info().Msg("audio output server started, waiting for connections") logger.Info().Msg("audio output server started, waiting for connections")
// Update C trace logging based on current audio scope log level (after environment variables are processed)
loggerTraceEnabled := logger.GetLevel() <= zerolog.TraceLevel
// Manual check for audio scope in PION_LOG_TRACE (workaround for logging system bug)
manualTraceEnabled := false
pionTrace := os.Getenv("PION_LOG_TRACE")
if pionTrace != "" {
scopes := strings.Split(strings.ToLower(pionTrace), ",")
for _, scope := range scopes {
if strings.TrimSpace(scope) == "audio" {
manualTraceEnabled = true
break
}
}
}
// Use manual check as fallback if logging system fails
traceEnabled := loggerTraceEnabled || manualTraceEnabled
CGOSetTraceLogging(traceEnabled)
// Set up signal handling for graceful shutdown // Set up signal handling for graceful shutdown
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()

View File

@ -14,8 +14,6 @@ import (
"github.com/rs/zerolog" "github.com/rs/zerolog"
) )
// Removed unused AudioOutputStreamer struct - actual streaming uses direct functions
var ( var (
outputStreamingRunning int32 outputStreamingRunning int32
outputStreamingCancel context.CancelFunc outputStreamingCancel context.CancelFunc
@ -30,22 +28,6 @@ func getOutputStreamingLogger() *zerolog.Logger {
return outputStreamingLogger return outputStreamingLogger
} }
// Removed unused NewAudioOutputStreamer function
// Removed unused AudioOutputStreamer.Start method
// Removed unused AudioOutputStreamer.Stop method
// Removed unused AudioOutputStreamer.streamLoop method
// Removed unused AudioOutputStreamer.processingLoop method
// Removed unused AudioOutputStreamer.statisticsLoop method
// Removed unused AudioOutputStreamer.reportStatistics method
// Removed all unused AudioOutputStreamer methods
// StartAudioOutputStreaming starts audio output streaming (capturing system audio) // StartAudioOutputStreaming starts audio output streaming (capturing system audio)
func StartAudioOutputStreaming(send func([]byte)) error { func StartAudioOutputStreaming(send func([]byte)) error {
if !atomic.CompareAndSwapInt32(&outputStreamingRunning, 0, 1) { if !atomic.CompareAndSwapInt32(&outputStreamingRunning, 0, 1) {
@ -84,6 +66,7 @@ func StartAudioOutputStreaming(send func([]byte)) error {
maxConsecutiveErrors := Config.MaxConsecutiveErrors maxConsecutiveErrors := Config.MaxConsecutiveErrors
errorBackoffDelay := Config.RetryDelay errorBackoffDelay := Config.RetryDelay
maxErrorBackoff := Config.MaxRetryDelay maxErrorBackoff := Config.MaxRetryDelay
var frameCount int64
for { for {
select { select {
@ -143,11 +126,25 @@ func StartAudioOutputStreaming(send func([]byte)) error {
} }
if n > 0 { if n > 0 {
frameCount++
// Get frame buffer from pool to reduce allocations // Get frame buffer from pool to reduce allocations
frame := GetAudioFrameBuffer() frame := GetAudioFrameBuffer()
frame = frame[:n] // Resize to actual frame size frame = frame[:n] // Resize to actual frame size
copy(frame, buffer[:n]) copy(frame, buffer[:n])
// Zero-cost trace logging for output frame processing
logger := getOutputStreamingLogger()
if logger.GetLevel() <= zerolog.TraceLevel {
if frameCount <= 5 || frameCount%1000 == 1 {
logger.Trace().
Int("frame_size", n).
Int("buffer_capacity", cap(frame)).
Int64("total_frames_sent", frameCount).
Msg("Audio output frame captured and buffered")
}
}
// Validate frame before sending // Validate frame before sending
if err := ValidateAudioFrame(frame); err != nil { if err := ValidateAudioFrame(frame); err != nil {
getOutputStreamingLogger().Warn().Err(err).Msg("Frame validation failed, dropping frame") getOutputStreamingLogger().Warn().Err(err).Msg("Frame validation failed, dropping frame")
@ -159,6 +156,16 @@ func StartAudioOutputStreaming(send func([]byte)) error {
// Return buffer to pool after sending // Return buffer to pool after sending
PutAudioFrameBuffer(frame) PutAudioFrameBuffer(frame)
RecordFrameReceived(n) RecordFrameReceived(n)
// Zero-cost trace logging for successful frame transmission
if logger.GetLevel() <= zerolog.TraceLevel {
if frameCount <= 5 || frameCount%1000 == 1 {
logger.Trace().
Int("frame_size", n).
Int64("total_frames_sent", frameCount).
Msg("Audio output frame sent successfully")
}
}
} }
// Small delay to prevent busy waiting // Small delay to prevent busy waiting
time.Sleep(Config.ShortSleepDuration) time.Sleep(Config.ShortSleepDuration)

View File

@ -17,23 +17,6 @@ const (
AudioOutputSupervisorComponent = "audio-output-supervisor" AudioOutputSupervisorComponent = "audio-output-supervisor"
) )
// Restart configuration is now retrieved from centralized config
func getMaxRestartAttempts() int {
return Config.MaxRestartAttempts
}
func getRestartWindow() time.Duration {
return Config.RestartWindow
}
func getRestartDelay() time.Duration {
return Config.RestartDelay
}
func getMaxRestartDelay() time.Duration {
return Config.MaxRestartDelay
}
// AudioOutputSupervisor manages the audio output server subprocess lifecycle // AudioOutputSupervisor manages the audio output server subprocess lifecycle
type AudioOutputSupervisor struct { type AudioOutputSupervisor struct {
*BaseSupervisor *BaseSupervisor
@ -175,10 +158,10 @@ func (s *AudioOutputSupervisor) supervisionLoop() {
ProcessType: "audio output server", ProcessType: "audio output server",
Timeout: Config.OutputSupervisorTimeout, Timeout: Config.OutputSupervisorTimeout,
EnableRestart: true, EnableRestart: true,
MaxRestartAttempts: getMaxRestartAttempts(), MaxRestartAttempts: Config.MaxRestartAttempts,
RestartWindow: getRestartWindow(), RestartWindow: Config.RestartWindow,
RestartDelay: getRestartDelay(), RestartDelay: Config.RestartDelay,
MaxRestartDelay: getMaxRestartDelay(), MaxRestartDelay: Config.MaxRestartDelay,
} }
// Configure callbacks // Configure callbacks
@ -217,7 +200,13 @@ func (s *AudioOutputSupervisor) startProcess() error {
s.cmd.Stderr = os.Stderr s.cmd.Stderr = os.Stderr
// Set environment variables for OPUS configuration // Set environment variables for OPUS configuration
s.cmd.Env = append(os.Environ(), s.opusEnv...) env := append(os.Environ(), s.opusEnv...)
// Pass logging environment variables directly to subprocess
// The subprocess will inherit all PION_LOG_* variables from os.Environ()
// This ensures the audio scope gets the correct trace level
s.cmd.Env = env
// Start the process // Start the process
if err := s.cmd.Start(); err != nil { if err := s.cmd.Start(); err != nil {
@ -249,13 +238,13 @@ func (s *AudioOutputSupervisor) shouldRestart() bool {
now := time.Now() now := time.Now()
var recentAttempts []time.Time var recentAttempts []time.Time
for _, attempt := range s.restartAttempts { for _, attempt := range s.restartAttempts {
if now.Sub(attempt) < getRestartWindow() { if now.Sub(attempt) < Config.RestartWindow {
recentAttempts = append(recentAttempts, attempt) recentAttempts = append(recentAttempts, attempt)
} }
} }
s.restartAttempts = recentAttempts s.restartAttempts = recentAttempts
return len(s.restartAttempts) < getMaxRestartAttempts() return len(s.restartAttempts) < Config.MaxRestartAttempts
} }
// recordRestartAttempt records a restart attempt // recordRestartAttempt records a restart attempt
@ -274,17 +263,17 @@ func (s *AudioOutputSupervisor) calculateRestartDelay() time.Duration {
// Exponential backoff based on recent restart attempts // Exponential backoff based on recent restart attempts
attempts := len(s.restartAttempts) attempts := len(s.restartAttempts)
if attempts == 0 { if attempts == 0 {
return getRestartDelay() return Config.RestartDelay
} }
// Calculate exponential backoff: 2^attempts * base delay // Calculate exponential backoff: 2^attempts * base delay
delay := getRestartDelay() delay := Config.RestartDelay
for i := 0; i < attempts && delay < getMaxRestartDelay(); i++ { for i := 0; i < attempts && delay < Config.MaxRestartDelay; i++ {
delay *= 2 delay *= 2
} }
if delay > getMaxRestartDelay() { if delay > Config.MaxRestartDelay {
delay = getMaxRestartDelay() delay = Config.MaxRestartDelay
} }
return delay return delay

View File

@ -3,7 +3,7 @@
// Package audio provides real-time audio processing for JetKVM with low-latency streaming. // Package audio provides real-time audio processing for JetKVM with low-latency streaming.
// //
// Key components: output/input pipelines with Opus codec, adaptive buffer management, // Key components: output/input pipelines with Opus codec, buffer management,
// zero-copy frame pools, IPC communication, and process supervision. // zero-copy frame pools, IPC communication, and process supervision.
// //
// Supports four quality presets (Low/Medium/High/Ultra) with configurable bitrates. // Supports four quality presets (Low/Medium/High/Ultra) with configurable bitrates.
@ -260,97 +260,6 @@ func GetAudioConfig() AudioConfig {
return currentConfig return currentConfig
} }
// Simplified OPUS parameter lookup table
var opusParams = map[AudioQuality]struct {
complexity, vbr, signalType, bandwidth, dtx int
}{
AudioQualityLow: {Config.AudioQualityLowOpusComplexity, Config.AudioQualityLowOpusVBR, Config.AudioQualityLowOpusSignalType, Config.AudioQualityLowOpusBandwidth, Config.AudioQualityLowOpusDTX},
AudioQualityMedium: {Config.AudioQualityMediumOpusComplexity, Config.AudioQualityMediumOpusVBR, Config.AudioQualityMediumOpusSignalType, Config.AudioQualityMediumOpusBandwidth, Config.AudioQualityMediumOpusDTX},
AudioQualityHigh: {Config.AudioQualityHighOpusComplexity, Config.AudioQualityHighOpusVBR, Config.AudioQualityHighOpusSignalType, Config.AudioQualityHighOpusBandwidth, Config.AudioQualityHighOpusDTX},
AudioQualityUltra: {Config.AudioQualityUltraOpusComplexity, Config.AudioQualityUltraOpusVBR, Config.AudioQualityUltraOpusSignalType, Config.AudioQualityUltraOpusBandwidth, Config.AudioQualityUltraOpusDTX},
}
// SetMicrophoneQuality updates the current microphone quality configuration
func SetMicrophoneQuality(quality AudioQuality) {
// Validate audio quality parameter
if err := ValidateAudioQuality(quality); err != nil {
// Log validation error but don't fail - maintain backward compatibility
logger := logging.GetDefaultLogger().With().Str("component", "audio").Logger()
logger.Warn().Err(err).Int("quality", int(quality)).Msg("invalid microphone quality, using current config")
return
}
presets := GetMicrophoneQualityPresets()
if config, exists := presets[quality]; exists {
currentMicrophoneConfig = config
// Get OPUS parameters using lookup table
params, exists := opusParams[quality]
if !exists {
// Fallback to medium quality
params = opusParams[AudioQualityMedium]
}
// Update audio input subprocess configuration dynamically without restart
logger := logging.GetDefaultLogger().With().Str("component", "audio").Logger()
// Set new OPUS configuration for future restarts
if supervisor := GetAudioInputSupervisor(); supervisor != nil {
supervisor.SetOpusConfig(config.Bitrate*1000, params.complexity, params.vbr, params.signalType, params.bandwidth, params.dtx)
// Check if microphone is active but IPC control is broken
inputManager := getAudioInputManager()
if inputManager.IsRunning() && !supervisor.IsConnected() {
// Reconnect the IPC control channel
supervisor.Stop()
time.Sleep(50 * time.Millisecond)
if err := supervisor.Start(); err != nil {
logger.Debug().Err(err).Msg("failed to reconnect IPC control channel")
}
}
// Send dynamic configuration update to running subprocess via IPC
if supervisor.IsConnected() {
// Convert AudioConfig to UnifiedIPCOpusConfig with complete Opus parameters
opusConfig := UnifiedIPCOpusConfig{
SampleRate: config.SampleRate,
Channels: config.Channels,
FrameSize: int(config.FrameSize.Milliseconds() * int64(config.SampleRate) / 1000), // Convert ms to samples
Bitrate: config.Bitrate * 1000, // Convert kbps to bps
Complexity: params.complexity,
VBR: params.vbr,
SignalType: params.signalType,
Bandwidth: params.bandwidth,
DTX: params.dtx,
}
if err := supervisor.SendOpusConfig(opusConfig); err != nil {
logger.Debug().Err(err).Msg("failed to send dynamic Opus config update via IPC")
// Fallback to subprocess restart if IPC update fails
supervisor.Stop()
if err := supervisor.Start(); err != nil {
logger.Error().Err(err).Msg("failed to restart audio input subprocess after IPC update failure")
}
} else {
logger.Info().Msg("audio input quality updated dynamically via IPC")
// Reset audio input stats after config update
go func() {
time.Sleep(Config.QualityChangeSettleDelay) // Wait for quality change to settle
// Reset audio input server stats to clear persistent warnings
ResetGlobalAudioInputServerStats()
// Attempt recovery if microphone is still having issues
time.Sleep(1 * time.Second)
RecoverGlobalAudioInputServer()
}()
}
} else {
logger.Info().Bool("supervisor_running", supervisor.IsRunning()).Msg("audio input subprocess not connected, configuration will apply on next start")
}
}
}
}
// GetMicrophoneConfig returns the current microphone configuration // GetMicrophoneConfig returns the current microphone configuration
func GetMicrophoneConfig() AudioConfig { func GetMicrophoneConfig() AudioConfig {
return currentMicrophoneConfig return currentMicrophoneConfig
@ -382,10 +291,12 @@ func RecordFrameReceived(bytes int) {
// RecordFrameDropped increments the frames dropped counter with batched updates // RecordFrameDropped increments the frames dropped counter with batched updates
func RecordFrameDropped() { func RecordFrameDropped() {
atomic.AddUint64(&batchedFramesDropped, 1)
} }
// RecordConnectionDrop increments the connection drops counter with batched updates // RecordConnectionDrop increments the connection drops counter with batched updates
func RecordConnectionDrop() { func RecordConnectionDrop() {
atomic.AddUint64(&batchedConnectionDrops, 1)
} }
// flushBatchedMetrics flushes accumulated metrics to the main counters // flushBatchedMetrics flushes accumulated metrics to the main counters

View File

@ -1,217 +0,0 @@
package audio
import (
"sync"
"sync/atomic"
)
// SizedBufferPool manages a pool of buffers with size tracking
type SizedBufferPool struct {
// The underlying sync.Pool
pool sync.Pool
// Statistics for monitoring
totalBuffers atomic.Int64
totalBytes atomic.Int64
gets atomic.Int64
puts atomic.Int64
misses atomic.Int64
// Configuration
maxBufferSize int
defaultSize int
}
// NewSizedBufferPool creates a new sized buffer pool
func NewSizedBufferPool(defaultSize, maxBufferSize int) *SizedBufferPool {
pool := &SizedBufferPool{
maxBufferSize: maxBufferSize,
defaultSize: defaultSize,
}
pool.pool = sync.Pool{
New: func() interface{} {
// Track pool misses
pool.misses.Add(1)
// Create new buffer with default size
buf := make([]byte, defaultSize)
// Return pointer-like to avoid allocations
slice := buf[:0]
ptrSlice := &slice
// Track statistics
pool.totalBuffers.Add(1)
pool.totalBytes.Add(int64(cap(buf)))
return ptrSlice
},
}
return pool
}
// Get returns a buffer from the pool with at least the specified capacity
func (p *SizedBufferPool) Get(minCapacity int) []byte {
// Track gets
p.gets.Add(1)
// Get buffer from pool - handle pointer-like storage
var buf []byte
poolObj := p.pool.Get()
switch v := poolObj.(type) {
case *[]byte:
// Handle pointer-like storage from Put method
if v != nil {
buf = (*v)[:0] // Get the underlying slice
} else {
buf = make([]byte, 0, p.defaultSize)
}
case []byte:
// Handle direct slice for backward compatibility
buf = v
default:
// Fallback for unexpected types
buf = make([]byte, 0, p.defaultSize)
p.misses.Add(1)
}
// Check if buffer has sufficient capacity
if cap(buf) < minCapacity {
// Track statistics for the old buffer
p.totalBytes.Add(-int64(cap(buf)))
// Allocate new buffer with required capacity
buf = make([]byte, minCapacity)
// Track statistics for the new buffer
p.totalBytes.Add(int64(cap(buf)))
} else {
// Resize existing buffer
buf = buf[:minCapacity]
}
return buf
}
// Put returns a buffer to the pool
func (p *SizedBufferPool) Put(buf []byte) {
// Track statistics
p.puts.Add(1)
// Don't pool excessively large buffers to prevent memory bloat
if cap(buf) > p.maxBufferSize {
// Track statistics
p.totalBuffers.Add(-1)
p.totalBytes.Add(-int64(cap(buf)))
return
}
// Clear buffer contents for security
for i := range buf {
buf[i] = 0
}
// Return to pool - use pointer-like approach to avoid allocations
slice := buf[:0]
p.pool.Put(&slice)
}
// GetStats returns statistics about the buffer pool
func (p *SizedBufferPool) GetStats() (buffers, bytes, gets, puts, misses int64) {
buffers = p.totalBuffers.Load()
bytes = p.totalBytes.Load()
gets = p.gets.Load()
puts = p.puts.Load()
misses = p.misses.Load()
return
}
// BufferPoolStats contains statistics about a buffer pool
type BufferPoolStats struct {
TotalBuffers int64
TotalBytes int64
Gets int64
Puts int64
Misses int64
HitRate float64
AverageBufferSize float64
}
// GetDetailedStats returns detailed statistics about the buffer pool
func (p *SizedBufferPool) GetDetailedStats() BufferPoolStats {
buffers := p.totalBuffers.Load()
bytes := p.totalBytes.Load()
gets := p.gets.Load()
puts := p.puts.Load()
misses := p.misses.Load()
// Calculate hit rate
hitRate := 0.0
if gets > 0 {
hitRate = float64(gets-misses) / float64(gets) * 100.0
}
// Calculate average buffer size
avgSize := 0.0
if buffers > 0 {
avgSize = float64(bytes) / float64(buffers)
}
return BufferPoolStats{
TotalBuffers: buffers,
TotalBytes: bytes,
Gets: gets,
Puts: puts,
Misses: misses,
HitRate: hitRate,
AverageBufferSize: avgSize,
}
}
// Global audio buffer pools with different size classes
var (
// Small buffers (up to 4KB)
smallBufferPool = NewSizedBufferPool(1024, 4*1024)
// Medium buffers (4KB to 64KB)
mediumBufferPool = NewSizedBufferPool(8*1024, 64*1024)
// Large buffers (64KB to 1MB)
largeBufferPool = NewSizedBufferPool(64*1024, 1024*1024)
)
// GetOptimalBuffer returns a buffer from the most appropriate pool based on size
func GetOptimalBuffer(size int) []byte {
switch {
case size <= 4*1024:
return smallBufferPool.Get(size)
case size <= 64*1024:
return mediumBufferPool.Get(size)
default:
return largeBufferPool.Get(size)
}
}
// ReturnOptimalBuffer returns a buffer to the appropriate pool based on size
func ReturnOptimalBuffer(buf []byte) {
size := cap(buf)
switch {
case size <= 4*1024:
smallBufferPool.Put(buf)
case size <= 64*1024:
mediumBufferPool.Put(buf)
default:
largeBufferPool.Put(buf)
}
}
// GetAllPoolStats returns statistics for all buffer pools
func GetAllPoolStats() map[string]BufferPoolStats {
return map[string]BufferPoolStats{
"small": smallBufferPool.GetDetailedStats(),
"medium": mediumBufferPool.GetDetailedStats(),
"large": largeBufferPool.GetDetailedStats(),
}
}

View File

@ -156,23 +156,11 @@ func RecordSocketBufferMetrics(conn net.Conn, component string) {
} }
// Get current socket buffer sizes // Get current socket buffer sizes
sendSize, recvSize, err := GetSocketBufferSizes(conn) _, _, err := GetSocketBufferSizes(conn)
if err != nil { if err != nil {
// Log error but don't fail // Log error but don't fail
return return
} }
// Record buffer sizes // Socket buffer sizes recorded for debugging if needed
socketBufferSizeGauge.WithLabelValues(component, "send").Set(float64(sendSize))
socketBufferSizeGauge.WithLabelValues(component, "receive").Set(float64(recvSize))
}
// RecordSocketBufferOverflow records a socket buffer overflow event
func RecordSocketBufferOverflow(component, bufferType string) {
socketBufferOverflowCounter.WithLabelValues(component, bufferType).Inc()
}
// UpdateSocketBufferUtilization updates socket buffer utilization metrics
func UpdateSocketBufferUtilization(component, bufferType string, utilizationPercent float64) {
socketBufferUtilizationGauge.WithLabelValues(component, bufferType).Set(utilizationPercent)
} }

View File

@ -35,11 +35,6 @@ func startAudioSubprocess() error {
// Initialize validation cache for optimal performance // Initialize validation cache for optimal performance
audio.InitValidationCache() audio.InitValidationCache()
// Enable batch audio processing to reduce CGO call overhead
if err := audio.EnableBatchAudioProcessing(); err != nil {
logger.Warn().Err(err).Msg("failed to enable batch audio processing")
}
// Create audio server supervisor // Create audio server supervisor
audioSupervisor = audio.NewAudioOutputSupervisor() audioSupervisor = audio.NewAudioOutputSupervisor()
@ -108,9 +103,6 @@ func startAudioSubprocess() error {
// Stop audio relay when process exits // Stop audio relay when process exits
audio.StopAudioRelay() audio.StopAudioRelay()
// Disable batch audio processing
audio.DisableBatchAudioProcessing()
}, },
// onRestart // onRestart
func(attempt int, delay time.Duration) { func(attempt int, delay time.Duration) {