diff --git a/cmd/main.go b/cmd/main.go index fdd79eba..4281daf0 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -22,16 +22,14 @@ const ( errorDumpTemplate = "jetkvm-%s.log" ) -func program(audioOutputServer, audioInputServer *bool) { +func program() { gspt.SetProcTitle(os.Args[0] + " [app]") - kvm.Main(*audioOutputServer, *audioInputServer) + kvm.Main() } func main() { versionPtr := flag.Bool("version", false, "print version and exit") versionJSONPtr := flag.Bool("version-json", false, "print version as json and exit") - audioOutputServerPtr := flag.Bool("audio-output-server", false, "Run as audio server subprocess") - audioInputServerPtr := flag.Bool("audio-input-server", false, "Run as audio input server subprocess") flag.Parse() @@ -50,7 +48,7 @@ func main() { case "": doSupervise() case kvm.GetBuiltAppVersion(): - program(audioOutputServerPtr, audioInputServerPtr) + program() default: fmt.Printf("Invalid build version: %s != %s\n", childID, kvm.GetBuiltAppVersion()) os.Exit(1) diff --git a/internal/audio/cgo_audio.go b/internal/audio/cgo_audio.go deleted file mode 100644 index 0ace0c62..00000000 --- a/internal/audio/cgo_audio.go +++ /dev/null @@ -1,607 +0,0 @@ -//go:build cgo - -package audio - -import ( - "errors" - "fmt" - "os" - "strings" - "sync" - "sync/atomic" - "time" - "unsafe" - - "github.com/jetkvm/kvm/internal/logging" - "github.com/rs/zerolog" -) - -/* -#include "c/audio.c" -*/ -import "C" - -var ( - errAudioInitFailed = errors.New("failed to init ALSA/Opus") - errAudioReadEncode = errors.New("audio read/encode error") - errAudioDecodeWrite = errors.New("audio decode/write error") - errAudioPlaybackInit = errors.New("failed to init ALSA playback/Opus decoder") - errEmptyBuffer = errors.New("empty buffer") - errNilBuffer = errors.New("nil buffer") - errInvalidBufferPtr = errors.New("invalid buffer pointer") -) - -// Error creation functions with enhanced context -func newBufferTooSmallError(actual, required int) error { - baseErr := fmt.Errorf("buffer too small: got %d bytes, need at least %d bytes", actual, required) - return WrapWithMetadata(baseErr, "cgo_audio", "buffer_validation", map[string]interface{}{ - "actual_size": actual, - "required_size": required, - "error_type": "buffer_undersize", - }) -} - -func newBufferTooLargeError(actual, max int) error { - baseErr := fmt.Errorf("buffer too large: got %d bytes, maximum allowed %d bytes", actual, max) - return WrapWithMetadata(baseErr, "cgo_audio", "buffer_validation", map[string]interface{}{ - "actual_size": actual, - "max_size": max, - "error_type": "buffer_oversize", - }) -} - -func newAudioInitError(cErrorCode int) error { - baseErr := fmt.Errorf("%w: C error code %d", errAudioInitFailed, cErrorCode) - return WrapWithMetadata(baseErr, "cgo_audio", "initialization", map[string]interface{}{ - "c_error_code": cErrorCode, - "error_type": "init_failure", - "severity": "critical", - }) -} - -func newAudioPlaybackInitError(cErrorCode int) error { - baseErr := fmt.Errorf("%w: C error code %d", errAudioPlaybackInit, cErrorCode) - return WrapWithMetadata(baseErr, "cgo_audio", "playback_init", map[string]interface{}{ - "c_error_code": cErrorCode, - "error_type": "playback_init_failure", - "severity": "high", - }) -} - -func newAudioReadEncodeError(cErrorCode int) error { - baseErr := fmt.Errorf("%w: C error code %d", errAudioReadEncode, cErrorCode) - return WrapWithMetadata(baseErr, "cgo_audio", "read_encode", map[string]interface{}{ - "c_error_code": cErrorCode, - "error_type": "read_encode_failure", - "severity": "medium", - }) -} - -func newAudioDecodeWriteError(cErrorCode int) error { - baseErr := fmt.Errorf("%w: C error code %d", errAudioDecodeWrite, cErrorCode) - return WrapWithMetadata(baseErr, "cgo_audio", "decode_write", map[string]interface{}{ - "c_error_code": cErrorCode, - "error_type": "decode_write_failure", - "severity": "medium", - }) -} - -func cgoAudioInit() error { - // Get cached config and ensure it's updated - cache := GetCachedConfig() - cache.Update() - - // Enable C trace logging if Go audio scope trace level is active - audioLogger := logging.GetSubsystemLogger("audio") - loggerTraceEnabled := audioLogger.GetLevel() <= zerolog.TraceLevel - - // Manual check for audio scope in PION_LOG_TRACE (workaround for logging system bug) - traceEnabled := loggerTraceEnabled - if !loggerTraceEnabled { - pionTrace := os.Getenv("PION_LOG_TRACE") - if pionTrace != "" { - scopes := strings.Split(strings.ToLower(pionTrace), ",") - for _, scope := range scopes { - if strings.TrimSpace(scope) == "audio" { - traceEnabled = true - break - } - } - } - } - - CGOSetTraceLogging(traceEnabled) - - // Update C constants from cached config (atomic access, no locks) - C.update_audio_constants( - C.int(cache.opusBitrate.Load()), - C.int(cache.opusComplexity.Load()), - C.int(cache.opusVBR.Load()), - C.int(cache.opusVBRConstraint.Load()), - C.int(cache.opusSignalType.Load()), - C.int(cache.opusBandwidth.Load()), - C.int(cache.opusDTX.Load()), - C.int(16), // LSB depth for improved bit allocation - C.int(cache.sampleRate.Load()), - C.int(cache.channels.Load()), - C.int(cache.frameSize.Load()), - C.int(cache.maxPacketSize.Load()), - C.int(Config.CGOUsleepMicroseconds), - C.int(Config.CGOMaxAttempts), - C.int(Config.CGOMaxBackoffMicroseconds), - ) - - result := C.jetkvm_audio_capture_init() - if result != 0 { - return newAudioInitError(int(result)) - } - return nil -} - -func cgoAudioClose() { - C.jetkvm_audio_capture_close() -} - -// AudioConfigCache provides a comprehensive caching system for audio configuration -type AudioConfigCache struct { - // All duration fields use int32 by storing as milliseconds for optimal ARM NEON performance - maxMetricsUpdateInterval atomic.Int32 // Store as milliseconds (10s = 10K ms < int32 max) - restartWindow atomic.Int32 // Store as milliseconds (5min = 300K ms < int32 max) - restartDelay atomic.Int32 // Store as milliseconds - maxRestartDelay atomic.Int32 // Store as milliseconds - - // Short-duration fields stored as milliseconds with int32 - minFrameDuration atomic.Int32 // Store as milliseconds (10ms = 10 ms < int32 max) - maxFrameDuration atomic.Int32 // Store as milliseconds (100ms = 100 ms < int32 max) - maxLatency atomic.Int32 // Store as milliseconds (500ms = 500 ms < int32 max) - minMetricsUpdateInterval atomic.Int32 // Store as milliseconds (100ms = 100 ms < int32 max) - - // Atomic int32 fields for lock-free access to frequently used values - minReadEncodeBuffer atomic.Int32 - maxDecodeWriteBuffer atomic.Int32 - maxPacketSize atomic.Int32 - maxPCMBufferSize atomic.Int32 - opusBitrate atomic.Int32 - opusComplexity atomic.Int32 - opusVBR atomic.Int32 - opusVBRConstraint atomic.Int32 - opusSignalType atomic.Int32 - opusBandwidth atomic.Int32 - opusDTX atomic.Int32 - sampleRate atomic.Int32 - channels atomic.Int32 - frameSize atomic.Int32 - - // Additional cached values for validation functions - maxAudioFrameSize atomic.Int32 - maxChannels atomic.Int32 - minOpusBitrate atomic.Int32 - maxOpusBitrate atomic.Int32 - - // Socket and buffer configuration values - socketMaxBuffer atomic.Int32 - socketMinBuffer atomic.Int32 - inputProcessingTimeoutMS atomic.Int32 - maxRestartAttempts atomic.Int32 - - // Mutex for updating the cache - mutex sync.RWMutex - lastUpdate time.Time - cacheExpiry time.Duration - initialized atomic.Bool - - // Pre-allocated errors to avoid allocations in hot path - bufferTooSmallReadEncode error - bufferTooLargeDecodeWrite error -} - -// Global audio config cache instance -var globalAudioConfigCache = &AudioConfigCache{ - cacheExpiry: 30 * time.Second, -} - -// GetCachedConfig returns the global audio config cache instance -func GetCachedConfig() *AudioConfigCache { - return globalAudioConfigCache -} - -// Update refreshes the cached config values if needed -func (c *AudioConfigCache) Update() { - // Fast path: if cache is initialized and not expired, return immediately - if c.initialized.Load() { - c.mutex.RLock() - cacheExpired := time.Since(c.lastUpdate) > c.cacheExpiry - c.mutex.RUnlock() - if !cacheExpired { - return - } - } - - // Slow path: update cache - c.mutex.Lock() - defer c.mutex.Unlock() - - // Double-check after acquiring lock - if !c.initialized.Load() || time.Since(c.lastUpdate) > c.cacheExpiry { - // Update atomic values for lock-free access - CGO values - c.minReadEncodeBuffer.Store(int32(Config.MinReadEncodeBuffer)) - c.maxDecodeWriteBuffer.Store(int32(Config.MaxDecodeWriteBuffer)) - c.maxPacketSize.Store(int32(Config.CGOMaxPacketSize)) - c.maxPCMBufferSize.Store(int32(Config.MaxPCMBufferSize)) - c.opusBitrate.Store(int32(Config.CGOOpusBitrate)) - c.opusComplexity.Store(int32(Config.CGOOpusComplexity)) - c.opusVBR.Store(int32(Config.CGOOpusVBR)) - c.opusVBRConstraint.Store(int32(Config.CGOOpusVBRConstraint)) - c.opusSignalType.Store(int32(Config.CGOOpusSignalType)) - c.opusBandwidth.Store(int32(Config.CGOOpusBandwidth)) - c.opusDTX.Store(int32(Config.CGOOpusDTX)) - c.sampleRate.Store(int32(Config.CGOSampleRate)) - c.channels.Store(int32(Config.CGOChannels)) - c.frameSize.Store(int32(Config.CGOFrameSize)) - - // Update additional validation values - c.maxAudioFrameSize.Store(int32(Config.MaxAudioFrameSize)) - c.maxChannels.Store(int32(Config.MaxChannels)) - - // Store duration fields as milliseconds for int32 optimization - c.minFrameDuration.Store(int32(Config.MinFrameDuration / time.Millisecond)) - c.maxFrameDuration.Store(int32(Config.MaxFrameDuration / time.Millisecond)) - c.maxLatency.Store(int32(Config.MaxLatency / time.Millisecond)) - c.minMetricsUpdateInterval.Store(int32(Config.MinMetricsUpdateInterval / time.Millisecond)) - c.maxMetricsUpdateInterval.Store(int32(Config.MaxMetricsUpdateInterval / time.Millisecond)) - c.restartWindow.Store(int32(Config.RestartWindow / time.Millisecond)) - c.restartDelay.Store(int32(Config.RestartDelay / time.Millisecond)) - c.maxRestartDelay.Store(int32(Config.MaxRestartDelay / time.Millisecond)) - c.minOpusBitrate.Store(int32(Config.MinOpusBitrate)) - c.maxOpusBitrate.Store(int32(Config.MaxOpusBitrate)) - - // Pre-allocate common errors - c.bufferTooSmallReadEncode = newBufferTooSmallError(0, Config.MinReadEncodeBuffer) - c.bufferTooLargeDecodeWrite = newBufferTooLargeError(Config.MaxDecodeWriteBuffer+1, Config.MaxDecodeWriteBuffer) - - c.lastUpdate = time.Now() - c.initialized.Store(true) - - c.lastUpdate = time.Now() - c.initialized.Store(true) - - // Update the global validation cache as well - if cachedMaxFrameSize != 0 { - cachedMaxFrameSize = Config.MaxAudioFrameSize - } - } -} - -// GetMinReadEncodeBuffer returns the cached MinReadEncodeBuffer value -func (c *AudioConfigCache) GetMinReadEncodeBuffer() int { - return int(c.minReadEncodeBuffer.Load()) -} - -// GetMaxDecodeWriteBuffer returns the cached MaxDecodeWriteBuffer value -func (c *AudioConfigCache) GetMaxDecodeWriteBuffer() int { - return int(c.maxDecodeWriteBuffer.Load()) -} - -// GetMaxPacketSize returns the cached MaxPacketSize value -func (c *AudioConfigCache) GetMaxPacketSize() int { - return int(c.maxPacketSize.Load()) -} - -// GetMaxPCMBufferSize returns the cached MaxPCMBufferSize value -func (c *AudioConfigCache) GetMaxPCMBufferSize() int { - return int(c.maxPCMBufferSize.Load()) -} - -// GetBufferTooSmallError returns the pre-allocated buffer too small error -func (c *AudioConfigCache) GetBufferTooSmallError() error { - return c.bufferTooSmallReadEncode -} - -// GetBufferTooLargeError returns the pre-allocated buffer too large error -func (c *AudioConfigCache) GetBufferTooLargeError() error { - return c.bufferTooLargeDecodeWrite -} - -func cgoAudioReadEncode(buf []byte) (int, error) { - // Minimal buffer validation - assume caller provides correct size - if len(buf) == 0 { - return 0, errEmptyBuffer - } - - // Direct CGO call - hotpath optimization - n := C.jetkvm_audio_read_encode(unsafe.Pointer(&buf[0])) - - // Fast path for success - if n > 0 { - return int(n), nil - } - - // Error handling with static errors - if n < 0 { - if n == -1 { - return 0, errAudioInitFailed - } - return 0, errAudioReadEncode - } - - return 0, nil -} - -// Audio playback functions -func cgoAudioPlaybackInit() error { - // Get cached config and ensure it's updated - cache := GetCachedConfig() - cache.Update() - - // Enable C trace logging if Go audio scope trace level is active - audioLogger := logging.GetSubsystemLogger("audio") - CGOSetTraceLogging(audioLogger.GetLevel() <= zerolog.TraceLevel) - - // No need to update C constants here as they're already set in cgoAudioInit - - ret := C.jetkvm_audio_playback_init() - if ret != 0 { - return newAudioPlaybackInitError(int(ret)) - } - return nil -} - -func cgoAudioPlaybackClose() { - C.jetkvm_audio_playback_close() -} - -// Audio decode/write metrics for monitoring USB Gadget audio success -var ( - audioDecodeWriteTotal atomic.Int64 - audioDecodeWriteSuccess atomic.Int64 - audioDecodeWriteFailures atomic.Int64 - audioDecodeWriteRecovery atomic.Int64 - audioDecodeWriteLastError atomic.Value - audioDecodeWriteLastTime atomic.Int64 -) - -// GetAudioDecodeWriteStats returns current audio decode/write statistics -func GetAudioDecodeWriteStats() (total, success, failures, recovery int64, lastError string, lastTime time.Time) { - total = audioDecodeWriteTotal.Load() - success = audioDecodeWriteSuccess.Load() - failures = audioDecodeWriteFailures.Load() - recovery = audioDecodeWriteRecovery.Load() - - if err := audioDecodeWriteLastError.Load(); err != nil { - lastError = err.(string) - } - - lastTimeNano := audioDecodeWriteLastTime.Load() - if lastTimeNano > 0 { - lastTime = time.Unix(0, lastTimeNano) - } - - return -} - -func cgoAudioDecodeWrite(buf []byte) (int, error) { - start := time.Now() - audioDecodeWriteTotal.Add(1) - audioDecodeWriteLastTime.Store(start.UnixNano()) - - // Minimal validation - assume caller provides correct size - if len(buf) == 0 { - audioDecodeWriteFailures.Add(1) - audioDecodeWriteLastError.Store("empty buffer") - return 0, errEmptyBuffer - } - - // Direct CGO call - hotpath optimization - n := int(C.jetkvm_audio_decode_write(unsafe.Pointer(&buf[0]), C.int(len(buf)))) - - // Fast path for success - if n >= 0 { - audioDecodeWriteSuccess.Add(1) - return n, nil - } - - audioDecodeWriteFailures.Add(1) - var errMsg string - var err error - - switch n { - case -1: - errMsg = "audio system not initialized" - err = errAudioInitFailed - case -2: - errMsg = "audio device error or recovery failed" - err = errAudioDecodeWrite - audioDecodeWriteRecovery.Add(1) - default: - errMsg = fmt.Sprintf("unknown error code %d", n) - err = errAudioDecodeWrite - } - - audioDecodeWriteLastError.Store(errMsg) - - return 0, err -} - -// updateOpusEncoderParams dynamically updates OPUS encoder parameters -func updateOpusEncoderParams(bitrate, complexity, vbr, vbrConstraint, signalType, bandwidth, dtx int) error { - result := C.update_opus_encoder_params( - C.int(bitrate), - C.int(complexity), - C.int(vbr), - C.int(vbrConstraint), - C.int(signalType), - C.int(bandwidth), - C.int(dtx), - ) - if result != 0 { - return fmt.Errorf("failed to update OPUS encoder parameters: C error code %d", result) - } - return nil -} - -// Buffer pool for reusing buffers in CGO functions -var ( - // Simple buffer pool for PCM data - pcmBufferPool = NewAudioBufferPool(Config.MaxPCMBufferSize) - - // Track buffer pool usage - cgoBufferPoolGets atomic.Int64 - cgoBufferPoolPuts atomic.Int64 - // Batch processing statistics - only enabled in debug builds - batchProcessingCount atomic.Int64 - batchFrameCount atomic.Int64 - batchProcessingTime atomic.Int64 -) - -// GetBufferFromPool gets a buffer from the pool with at least the specified capacity -func GetBufferFromPool(minCapacity int) []byte { - cgoBufferPoolGets.Add(1) - // Use simple fixed-size buffer for PCM data - return pcmBufferPool.Get() -} - -// ReturnBufferToPool returns a buffer to the pool -func ReturnBufferToPool(buf []byte) { - cgoBufferPoolPuts.Add(1) - pcmBufferPool.Put(buf) -} - -// ReadEncodeWithPooledBuffer reads audio data and encodes it using a buffer from the pool -func ReadEncodeWithPooledBuffer() ([]byte, int, error) { - cache := GetCachedConfig() - cache.Update() - - bufferSize := cache.GetMinReadEncodeBuffer() - if bufferSize == 0 { - bufferSize = 1500 - } - - buf := GetBufferFromPool(bufferSize) - n, err := cgoAudioReadEncode(buf) - if err != nil { - ReturnBufferToPool(buf) - return nil, 0, err - } - - return buf[:n], n, nil -} - -// DecodeWriteWithPooledBuffer decodes and writes audio data using a pooled buffer -func DecodeWriteWithPooledBuffer(data []byte) (int, error) { - if len(data) == 0 { - return 0, errEmptyBuffer - } - - cache := GetCachedConfig() - cache.Update() - - maxPacketSize := cache.GetMaxPacketSize() - if len(data) > maxPacketSize { - return 0, newBufferTooLargeError(len(data), maxPacketSize) - } - - pcmBuffer := GetBufferFromPool(cache.GetMaxPCMBufferSize()) - defer ReturnBufferToPool(pcmBuffer) - - return CGOAudioDecodeWrite(data, pcmBuffer) -} - -// GetBatchProcessingStats returns statistics about batch processing -func GetBatchProcessingStats() (count, frames, avgTimeUs int64) { - count = batchProcessingCount.Load() - frames = batchFrameCount.Load() - totalTime := batchProcessingTime.Load() - - // Calculate average time per batch - if count > 0 { - avgTimeUs = totalTime / count - } - - return count, frames, avgTimeUs -} - -// cgoAudioDecodeWriteWithBuffers decodes opus data and writes to PCM buffer -// This implementation uses separate buffers for opus data and PCM output -func cgoAudioDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, error) { - start := time.Now() - audioDecodeWriteTotal.Add(1) - audioDecodeWriteLastTime.Store(start.UnixNano()) - - // Validate input - if len(opusData) == 0 { - audioDecodeWriteFailures.Add(1) - audioDecodeWriteLastError.Store("empty opus data") - return 0, errEmptyBuffer - } - if cap(pcmBuffer) == 0 { - audioDecodeWriteFailures.Add(1) - audioDecodeWriteLastError.Store("empty pcm buffer capacity") - return 0, errEmptyBuffer - } - - // Get cached config - cache := GetCachedConfig() - cache.Update() - - // Ensure data doesn't exceed max packet size - maxPacketSize := cache.GetMaxPacketSize() - if len(opusData) > maxPacketSize { - audioDecodeWriteFailures.Add(1) - errMsg := fmt.Sprintf("opus packet too large: %d > %d", len(opusData), maxPacketSize) - audioDecodeWriteLastError.Store(errMsg) - return 0, newBufferTooLargeError(len(opusData), maxPacketSize) - } - - // Direct CGO call with minimal overhead - unsafe.Pointer(&slice[0]) is never nil for non-empty slices - n := int(C.jetkvm_audio_decode_write(unsafe.Pointer(&opusData[0]), C.int(len(opusData)))) - - // Fast path for success case - if n >= 0 { - audioDecodeWriteSuccess.Add(1) - return n, nil - } - - audioDecodeWriteFailures.Add(1) - var errMsg string - var err error - - switch n { - case -1: - errMsg = "audio system not initialized" - err = errAudioInitFailed - case -2: - errMsg = "audio device error or recovery failed" - err = errAudioDecodeWrite - audioDecodeWriteRecovery.Add(1) - default: - errMsg = fmt.Sprintf("unknown error code %d", n) - err = newAudioDecodeWriteError(n) - } - - audioDecodeWriteLastError.Store(errMsg) - - return 0, err -} - -func CGOAudioInit() error { return cgoAudioInit() } -func CGOAudioClose() { cgoAudioClose() } -func CGOAudioReadEncode(buf []byte) (int, error) { return cgoAudioReadEncode(buf) } -func CGOAudioPlaybackInit() error { return cgoAudioPlaybackInit() } -func CGOAudioPlaybackClose() { cgoAudioPlaybackClose() } - -func CGOAudioDecodeWrite(opusData []byte, pcmBuffer []byte) (int, error) { - return cgoAudioDecodeWriteWithBuffers(opusData, pcmBuffer) -} -func CGOUpdateOpusEncoderParams(bitrate, complexity, vbr, vbrConstraint, signalType, bandwidth, dtx int) error { - return updateOpusEncoderParams(bitrate, complexity, vbr, vbrConstraint, signalType, bandwidth, dtx) -} - -func CGOSetTraceLogging(enabled bool) { - var cEnabled C.int - if enabled { - cEnabled = 1 - } else { - cEnabled = 0 - } - C.set_trace_logging(cEnabled) -} diff --git a/internal/audio/core_handlers.go b/internal/audio/core_handlers.go index d38256d2..71e1e5aa 100644 --- a/internal/audio/core_handlers.go +++ b/internal/audio/core_handlers.go @@ -236,31 +236,6 @@ func (s *AudioControlService) GetMicrophoneStatus() map[string]interface{} { } } -// SetAudioQuality is deprecated - audio quality is now fixed at optimal settings -func (s *AudioControlService) SetAudioQuality(quality int) { - // No-op: quality is fixed at optimal configuration -} - -// GetAudioQualityPresets is deprecated - returns empty map -func (s *AudioControlService) GetAudioQualityPresets() map[int]AudioConfig { - return map[int]AudioConfig{} -} - -// GetMicrophoneQualityPresets is deprecated - returns empty map -func (s *AudioControlService) GetMicrophoneQualityPresets() map[int]AudioConfig { - return map[int]AudioConfig{} -} - -// GetCurrentAudioQuality returns the current audio quality configuration -func (s *AudioControlService) GetCurrentAudioQuality() AudioConfig { - return GetAudioConfig() -} - -// GetCurrentMicrophoneQuality returns the current microphone quality configuration -func (s *AudioControlService) GetCurrentMicrophoneQuality() AudioConfig { - return GetMicrophoneConfig() -} - // SubscribeToAudioEvents subscribes to audio events via WebSocket func (s *AudioControlService) SubscribeToAudioEvents(connectionID string, wsCon *websocket.Conn, runCtx context.Context, logger *zerolog.Logger) { logger.Info().Msg("client subscribing to audio events") diff --git a/internal/audio/core_metrics.go b/internal/audio/core_metrics.go index ab71ab88..f7f7eec5 100644 --- a/internal/audio/core_metrics.go +++ b/internal/audio/core_metrics.go @@ -139,19 +139,6 @@ type UnifiedAudioMetrics struct { AverageLatency time.Duration `json:"average_latency"` } -// convertAudioMetricsToUnified converts AudioMetrics to UnifiedAudioMetrics -func convertAudioMetricsToUnified(metrics AudioMetrics) UnifiedAudioMetrics { - return UnifiedAudioMetrics{ - FramesReceived: metrics.FramesReceived, - FramesDropped: metrics.FramesDropped, - FramesSent: 0, // AudioMetrics doesn't have FramesSent - BytesProcessed: metrics.BytesProcessed, - ConnectionDrops: metrics.ConnectionDrops, - LastFrameTime: metrics.LastFrameTime, - AverageLatency: metrics.AverageLatency, - } -} - // convertAudioInputMetricsToUnified converts AudioInputMetrics to UnifiedAudioMetrics func convertAudioInputMetricsToUnified(metrics AudioInputMetrics) UnifiedAudioMetrics { return UnifiedAudioMetrics{ diff --git a/internal/audio/core_metrics_registry.go b/internal/audio/core_metrics_registry.go index b842af08..2573d29c 100644 --- a/internal/audio/core_metrics_registry.go +++ b/internal/audio/core_metrics_registry.go @@ -12,7 +12,6 @@ import ( // This eliminates duplication between session-specific and global managers type MetricsRegistry struct { mu sync.RWMutex - audioMetrics AudioMetrics audioInputMetrics AudioInputMetrics lastUpdate int64 // Unix timestamp } @@ -32,17 +31,6 @@ func GetMetricsRegistry() *MetricsRegistry { return globalMetricsRegistry } -// UpdateAudioMetrics updates the centralized audio output metrics -func (mr *MetricsRegistry) UpdateAudioMetrics(metrics AudioMetrics) { - mr.mu.Lock() - mr.audioMetrics = metrics - mr.lastUpdate = time.Now().Unix() - mr.mu.Unlock() - - // Update Prometheus metrics directly to avoid circular dependency - UpdateAudioMetrics(convertAudioMetricsToUnified(metrics)) -} - // UpdateAudioInputMetrics updates the centralized audio input metrics func (mr *MetricsRegistry) UpdateAudioInputMetrics(metrics AudioInputMetrics) { mr.mu.Lock() @@ -54,13 +42,6 @@ func (mr *MetricsRegistry) UpdateAudioInputMetrics(metrics AudioInputMetrics) { UpdateMicrophoneMetrics(convertAudioInputMetricsToUnified(metrics)) } -// GetAudioMetrics returns the current audio output metrics -func (mr *MetricsRegistry) GetAudioMetrics() AudioMetrics { - mr.mu.RLock() - defer mr.mu.RUnlock() - return mr.audioMetrics -} - // GetAudioInputMetrics returns the current audio input metrics func (mr *MetricsRegistry) GetAudioInputMetrics() AudioInputMetrics { mr.mu.RLock() @@ -93,12 +74,6 @@ func (mr *MetricsRegistry) StartMetricsCollector() { metrics := globalManager.GetMetrics() mr.UpdateAudioInputMetrics(metrics) } - - // Collect audio output metrics from global audio output manager - // Note: We need to get metrics from the actual audio output system - // For now, we'll use the global metrics variable from quality_presets.go - globalAudioMetrics := GetGlobalAudioMetrics() - mr.UpdateAudioMetrics(globalAudioMetrics) } }() } diff --git a/internal/audio/core_validation.go b/internal/audio/core_validation.go index 3fa296cc..5f695d2f 100644 --- a/internal/audio/core_validation.go +++ b/internal/audio/core_validation.go @@ -287,45 +287,6 @@ func ValidateFrameDuration(duration time.Duration) error { return nil } -// ValidateAudioConfigComplete performs comprehensive audio configuration validation -// Uses optimized validation functions that leverage AudioConfigCache -func ValidateAudioConfigComplete(config AudioConfig) error { - // Fast path: Check if all values match the current cached configuration - cache := Config - cachedSampleRate := cache.SampleRate - cachedChannels := cache.Channels - cachedBitrate := cache.OpusBitrate / 1000 // Convert from bps to kbps - cachedFrameSize := cache.FrameSize - - // Only do this calculation if we have valid cached values - if cachedSampleRate > 0 && cachedChannels > 0 && cachedBitrate > 0 && cachedFrameSize > 0 { - cachedDuration := time.Duration(cachedFrameSize) * time.Second / time.Duration(cachedSampleRate) - - // Most common case: validating the current configuration - if config.SampleRate == cachedSampleRate && - config.Channels == cachedChannels && - config.Bitrate == cachedBitrate && - config.FrameSize == cachedDuration { - return nil - } - } - - // Slower path: validate each parameter individually - if err := ValidateBitrate(config.Bitrate); err != nil { - return fmt.Errorf("bitrate validation failed: %w", err) - } - if err := ValidateSampleRate(config.SampleRate); err != nil { - return fmt.Errorf("sample rate validation failed: %w", err) - } - if err := ValidateChannelCount(config.Channels); err != nil { - return fmt.Errorf("channel count validation failed: %w", err) - } - if err := ValidateFrameDuration(config.FrameSize); err != nil { - return fmt.Errorf("frame duration validation failed: %w", err) - } - return nil -} - // ValidateAudioConfigConstants validates audio configuration constants func ValidateAudioConfigConstants(config *AudioConfigConstants) error { // Quality validation removed - using fixed optimal configuration diff --git a/internal/audio/input_api.go b/internal/audio/input_api.go index a6398263..66f64d6d 100644 --- a/internal/audio/input_api.go +++ b/internal/audio/input_api.go @@ -21,7 +21,7 @@ type AudioInputInterface interface { // GetSupervisor returns the audio input supervisor for advanced management func (m *AudioInputManager) GetSupervisor() *AudioInputSupervisor { - return m.ipcManager.GetSupervisor() + return GetAudioInputSupervisor() } // getAudioInputManager returns the audio input manager diff --git a/internal/audio/input_microphone_manager.go b/internal/audio/input_microphone_manager.go index 355b6d77..1ec702e4 100644 --- a/internal/audio/input_microphone_manager.go +++ b/internal/audio/input_microphone_manager.go @@ -26,7 +26,6 @@ type AudioInputMetrics struct { // AudioInputManager manages microphone input stream using IPC mode only type AudioInputManager struct { *BaseAudioManager - ipcManager *AudioInputIPCManager framesSent int64 // Input-specific metric } @@ -35,10 +34,18 @@ func NewAudioInputManager() *AudioInputManager { logger := logging.GetDefaultLogger().With().Str("component", AudioInputManagerComponent).Logger() return &AudioInputManager{ BaseAudioManager: NewBaseAudioManager(logger), - ipcManager: NewAudioInputIPCManager(), } } +// getClient returns the audio input client from the global supervisor +func (aim *AudioInputManager) getClient() *AudioInputClient { + supervisor := GetAudioInputSupervisor() + if supervisor == nil { + return nil + } + return supervisor.GetClient() +} + // Start begins processing microphone input func (aim *AudioInputManager) Start() error { if !aim.setRunning(true) { @@ -47,15 +54,22 @@ func (aim *AudioInputManager) Start() error { aim.logComponentStart(AudioInputManagerComponent) - // Start the IPC-based audio input - err := aim.ipcManager.Start() - if err != nil { - aim.logComponentError(AudioInputManagerComponent, err, "failed to start component") - // Ensure proper cleanup on error + // Ensure supervisor and client are available + supervisor := GetAudioInputSupervisor() + if supervisor == nil { aim.setRunning(false) - // Reset metrics on failed start - aim.resetMetrics() - return err + return fmt.Errorf("audio input supervisor not available") + } + + // Start the supervisor if not already running + if !supervisor.IsRunning() { + err := supervisor.Start() + if err != nil { + aim.logComponentError(AudioInputManagerComponent, err, "failed to start supervisor") + aim.setRunning(false) + aim.resetMetrics() + return err + } } aim.logComponentStarted(AudioInputManagerComponent) @@ -70,8 +84,8 @@ func (aim *AudioInputManager) Stop() { aim.logComponentStop(AudioInputManagerComponent) - // Stop the IPC-based audio input - aim.ipcManager.Stop() + // Note: We don't stop the supervisor here as it may be shared + // The supervisor lifecycle is managed by the main process aim.logComponentStopped(AudioInputManagerComponent) } @@ -99,9 +113,15 @@ func (aim *AudioInputManager) WriteOpusFrame(frame []byte) error { return fmt.Errorf("input frame validation failed: %w", err) } + // Get client from supervisor + client := aim.getClient() + if client == nil { + return fmt.Errorf("audio input client not available") + } + // Track end-to-end latency from WebRTC to IPC startTime := time.Now() - err := aim.ipcManager.WriteOpusFrame(frame) + err := client.SendFrame(frame) processingTime := time.Since(startTime) // Log high latency warnings @@ -135,9 +155,16 @@ func (aim *AudioInputManager) WriteOpusFrameZeroCopy(frame *ZeroCopyAudioFrame) return nil } + // Get client from supervisor + client := aim.getClient() + if client == nil { + atomic.AddInt64(&aim.metrics.FramesDropped, 1) + return fmt.Errorf("audio input client not available") + } + // Track end-to-end latency from WebRTC to IPC startTime := time.Now() - err := aim.ipcManager.WriteOpusFrameZeroCopy(frame) + err := client.SendFrameZeroCopy(frame) processingTime := time.Since(startTime) // Log high latency warnings @@ -172,8 +199,21 @@ func (aim *AudioInputManager) GetComprehensiveMetrics() map[string]interface{} { // Get base metrics baseMetrics := aim.GetMetrics() - // Get detailed IPC metrics - ipcMetrics, detailedStats := aim.ipcManager.GetDetailedMetrics() + // Get client stats if available + var clientStats map[string]interface{} + client := aim.getClient() + if client != nil { + total, dropped := client.GetFrameStats() + clientStats = map[string]interface{}{ + "frames_sent": total, + "frames_dropped": dropped, + } + } else { + clientStats = map[string]interface{}{ + "frames_sent": 0, + "frames_dropped": 0, + } + } comprehensiveMetrics := map[string]interface{}{ "manager": map[string]interface{}{ @@ -184,14 +224,7 @@ func (aim *AudioInputManager) GetComprehensiveMetrics() map[string]interface{} { "last_frame_time": baseMetrics.LastFrameTime, "running": aim.IsRunning(), }, - "ipc": map[string]interface{}{ - "frames_sent": ipcMetrics.FramesSent, - "frames_dropped": ipcMetrics.FramesDropped, - "bytes_processed": ipcMetrics.BytesProcessed, - "average_latency_ms": float64(ipcMetrics.AverageLatency.Nanoseconds()) / 1e6, - "last_frame_time": ipcMetrics.LastFrameTime, - }, - "detailed": detailedStats, + "client": clientStats, } return comprehensiveMetrics @@ -205,17 +238,14 @@ func (aim *AudioInputManager) IsRunning() bool { return true } - // If internal state says not running, check for existing system processes - // This prevents duplicate subprocess creation when a process already exists - if aim.ipcManager != nil { - supervisor := aim.ipcManager.GetSupervisor() - if supervisor != nil { - if existingPID, exists := supervisor.HasExistingProcess(); exists { - aim.logger.Info().Int("existing_pid", existingPID).Msg("Found existing audio input server process") - // Update internal state to reflect reality - aim.setRunning(true) - return true - } + // If internal state says not running, check supervisor + supervisor := GetAudioInputSupervisor() + if supervisor != nil { + if existingPID, exists := supervisor.HasExistingProcess(); exists { + aim.logger.Info().Int("existing_pid", existingPID).Msg("Found existing audio input server process") + // Update internal state to reflect reality + aim.setRunning(true) + return true } } @@ -228,5 +258,12 @@ func (aim *AudioInputManager) IsReady() bool { if !aim.IsRunning() { return false } - return aim.ipcManager.IsReady() + + // Check if client is connected + client := aim.getClient() + if client == nil { + return false + } + + return client.IsConnected() } diff --git a/internal/audio/input_server_main.go b/internal/audio/input_server_main.go deleted file mode 100644 index 8b67e0f4..00000000 --- a/internal/audio/input_server_main.go +++ /dev/null @@ -1,114 +0,0 @@ -//go:build cgo -// +build cgo - -package audio - -/* -#cgo pkg-config: alsa -#cgo LDFLAGS: -lopus -*/ -import "C" - -import ( - "context" - "os" - "os/signal" - "syscall" - "time" - - "github.com/jetkvm/kvm/internal/logging" - "github.com/rs/zerolog" -) - -// Global audio input server instance -var globalAudioInputServer *AudioInputServer - -// GetGlobalAudioInputServer returns the global audio input server instance -func GetGlobalAudioInputServer() *AudioInputServer { - return globalAudioInputServer -} - -// ResetGlobalAudioInputServerStats resets the global audio input server stats -func ResetGlobalAudioInputServerStats() { - if globalAudioInputServer != nil { - globalAudioInputServer.ResetServerStats() - } -} - -// RecoverGlobalAudioInputServer attempts to recover from dropped frames -func RecoverGlobalAudioInputServer() { - if globalAudioInputServer != nil { - globalAudioInputServer.RecoverFromDroppedFrames() - } -} - -// getEnvInt reads an integer from environment variable with a default value - -// RunAudioInputServer runs the audio input server subprocess -// This should be called from main() when the subprocess is detected -func RunAudioInputServer() error { - logger := logging.GetSubsystemLogger("audio").With().Str("component", "audio-input-server").Logger() - - // Parse OPUS configuration from environment variables - bitrate, complexity, vbr, signalType, bandwidth, dtx := parseOpusConfig() - applyOpusConfig(bitrate, complexity, vbr, signalType, bandwidth, dtx, "audio-input-server", false) - - // Initialize validation cache for optimal performance - InitValidationCache() - - // Initialize CGO audio playback (optional for input server) - // This is used for audio loopback/monitoring features - err := CGOAudioPlaybackInit() - if err != nil { - logger.Warn().Err(err).Msg("failed to initialize CGO audio playback - audio monitoring disabled") - // Continue without playback - input functionality doesn't require it - } else { - defer CGOAudioPlaybackClose() - logger.Info().Msg("CGO audio playback initialized successfully") - } - - // Create and start the IPC server - server, err := NewAudioInputServer() - if err != nil { - logger.Error().Err(err).Msg("failed to create audio input server") - return err - } - defer server.Close() - - // Store globally for access by other functions - globalAudioInputServer = server - - err = server.Start() - if err != nil { - logger.Error().Err(err).Msg("failed to start audio input server") - return err - } - - logger.Info().Msg("audio input server started, waiting for connections") - - // Update C trace logging based on current audio scope log level (after environment variables are processed) - traceEnabled := logger.GetLevel() <= zerolog.TraceLevel - CGOSetTraceLogging(traceEnabled) - - // Set up signal handling for graceful shutdown - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - - // Wait for shutdown signal - select { - case sig := <-sigChan: - logger.Info().Str("signal", sig.String()).Msg("received shutdown signal") - case <-ctx.Done(): - } - - // Graceful shutdown - server.Stop() - - // Give some time for cleanup - time.Sleep(Config.DefaultSleepDuration) - - return nil -} diff --git a/internal/audio/ipc_input.go b/internal/audio/ipc_input.go index 668c74c7..0316eb3f 100644 --- a/internal/audio/ipc_input.go +++ b/internal/audio/ipc_input.go @@ -1,23 +1,17 @@ package audio import ( - "encoding/binary" "fmt" - "io" "net" - "os" - "runtime" "sync" "sync/atomic" "time" "github.com/jetkvm/kvm/internal/logging" - "github.com/rs/zerolog" ) -// Component name constants for logging +// Component name constant for logging const ( - AudioInputServerComponent = "audio-input-server" AudioInputClientComponent = "audio-input-client" ) @@ -27,633 +21,6 @@ var ( messagePoolSize = Config.MessagePoolSize // Pre-allocated message pool size ) -// OptimizedIPCMessage represents an optimized message with pre-allocated buffers -type OptimizedIPCMessage struct { - header [17]byte - data []byte - msg UnifiedIPCMessage -} - -// MessagePool manages a pool of reusable messages to reduce allocations -type MessagePool struct { - hitCount int64 - missCount int64 - - pool chan *OptimizedIPCMessage - - preallocated []*OptimizedIPCMessage - preallocSize int - maxPoolSize int - mutex sync.RWMutex -} - -// Global message pool instance -var globalMessagePool = &MessagePool{ - pool: make(chan *OptimizedIPCMessage, messagePoolSize), -} - -var messagePoolInitOnce sync.Once - -// initializeMessagePool initializes the global message pool with pre-allocated messages -func initializeMessagePool() { - messagePoolInitOnce.Do(func() { - preallocSize := messagePoolSize / 4 // 25% pre-allocated for immediate use - globalMessagePool.preallocSize = preallocSize - globalMessagePool.maxPoolSize = messagePoolSize * Config.PoolGrowthMultiplier // Allow growth up to 2x - globalMessagePool.preallocated = make([]*OptimizedIPCMessage, 0, preallocSize) - - // Pre-allocate messages for immediate use - for i := 0; i < preallocSize; i++ { - msg := &OptimizedIPCMessage{ - data: make([]byte, 0, maxFrameSize), - } - globalMessagePool.preallocated = append(globalMessagePool.preallocated, msg) - } - - // Fill the channel with remaining messages - for i := preallocSize; i < messagePoolSize; i++ { - globalMessagePool.pool <- &OptimizedIPCMessage{ - data: make([]byte, 0, maxFrameSize), - } - } - }) -} - -// Get retrieves a message from the pool -func (mp *MessagePool) Get() *OptimizedIPCMessage { - initializeMessagePool() - // First try pre-allocated messages for fastest access - mp.mutex.Lock() - if len(mp.preallocated) > 0 { - msg := mp.preallocated[len(mp.preallocated)-1] - mp.preallocated = mp.preallocated[:len(mp.preallocated)-1] - mp.mutex.Unlock() - atomic.AddInt64(&mp.hitCount, 1) - // Reset message for reuse - msg.data = msg.data[:0] - msg.msg = UnifiedIPCMessage{} - return msg - } - mp.mutex.Unlock() - - // Try channel pool next - select { - case msg := <-mp.pool: - atomic.AddInt64(&mp.hitCount, 1) - // Reset message for reuse and ensure proper capacity - msg.data = msg.data[:0] - msg.msg = UnifiedIPCMessage{} - // Ensure data buffer has sufficient capacity - if cap(msg.data) < maxFrameSize { - msg.data = make([]byte, 0, maxFrameSize) - } - return msg - default: - // Pool exhausted, create new message with exact capacity - atomic.AddInt64(&mp.missCount, 1) - return &OptimizedIPCMessage{ - data: make([]byte, 0, maxFrameSize), - } - } -} - -// Put returns a message to the pool -func (mp *MessagePool) Put(msg *OptimizedIPCMessage) { - if msg == nil { - return - } - - // Validate buffer capacity - reject if too small or too large - if cap(msg.data) < maxFrameSize/2 || cap(msg.data) > maxFrameSize*2 { - return // Let GC handle oversized or undersized buffers - } - - // Reset the message for reuse - msg.data = msg.data[:0] - msg.msg = UnifiedIPCMessage{} - - // First try to return to pre-allocated pool for fastest reuse - mp.mutex.Lock() - if len(mp.preallocated) < mp.preallocSize { - mp.preallocated = append(mp.preallocated, msg) - mp.mutex.Unlock() - return - } - mp.mutex.Unlock() - - // Try channel pool next - select { - case mp.pool <- msg: - // Successfully returned to pool - default: - // Pool full, let GC handle it - } -} - -type AudioInputServer struct { - bufferSize int64 - processingTime int64 - droppedFrames int64 - totalFrames int64 - - listener net.Listener - conn net.Conn - mtx sync.Mutex - running bool - - messageChan chan *UnifiedIPCMessage - processChan chan *UnifiedIPCMessage - stopChan chan struct{} - wg sync.WaitGroup - - channelMutex sync.RWMutex - lastBufferSize int64 - - socketBufferConfig SocketBufferConfig -} - -// NewAudioInputServer creates a new audio input server -func NewAudioInputServer() (*AudioInputServer, error) { - socketPath := getInputSocketPath() - - // Retry socket creation with cleanup to handle race conditions - var listener net.Listener - var err error - for i := 0; i < 3; i++ { - // Remove existing socket if any - os.Remove(socketPath) - - // Small delay to ensure cleanup completes - if i > 0 { - time.Sleep(10 * time.Millisecond) - } - - listener, err = net.Listen("unix", socketPath) - if err == nil { - break - } - - // Log retry attempt - if i < 2 { - logger := logging.GetDefaultLogger().With().Str("component", "audio-input").Logger() - logger.Warn().Err(err).Int("attempt", i+1).Msg("Failed to create unix socket, retrying") - } - } - - if err != nil { - return nil, fmt.Errorf("failed to create unix socket after 3 attempts: %w", err) - } - - // Get initial buffer size (512 frames for stability) - initialBufferSize := int64(512) - - // Ensure minimum buffer size to prevent immediate overflow - // Use at least 50 frames to handle burst traffic - minBufferSize := int64(50) - if initialBufferSize < minBufferSize { - initialBufferSize = minBufferSize - } - - // Initialize socket buffer configuration - socketBufferConfig := DefaultSocketBufferConfig() - - return &AudioInputServer{ - listener: listener, - messageChan: make(chan *UnifiedIPCMessage, initialBufferSize), - processChan: make(chan *UnifiedIPCMessage, initialBufferSize), - stopChan: make(chan struct{}), - bufferSize: initialBufferSize, - lastBufferSize: initialBufferSize, - socketBufferConfig: socketBufferConfig, - }, nil -} - -// Start starts the audio input server -func (ais *AudioInputServer) Start() error { - ais.mtx.Lock() - defer ais.mtx.Unlock() - - if ais.running { - return fmt.Errorf("server already running") - } - - ais.running = true - - // Reset counters on start - atomic.StoreInt64(&ais.totalFrames, 0) - atomic.StoreInt64(&ais.droppedFrames, 0) - atomic.StoreInt64(&ais.processingTime, 0) - - // Start triple-goroutine architecture - ais.startReaderGoroutine() - ais.startProcessorGoroutine() - ais.startMonitorGoroutine() - - // Submit the connection acceptor directly - go ais.acceptConnections() - - return nil -} - -// Stop stops the audio input server -func (ais *AudioInputServer) Stop() { - ais.mtx.Lock() - defer ais.mtx.Unlock() - - if !ais.running { - return - } - - ais.running = false - - // Signal all goroutines to stop - close(ais.stopChan) - ais.wg.Wait() - - if ais.conn != nil { - ais.conn.Close() - ais.conn = nil - } - - if ais.listener != nil { - ais.listener.Close() - ais.listener = nil - } - - // Remove socket file to prevent restart issues - os.Remove(getInputSocketPath()) -} - -// Close closes the server and cleans up resources -func (ais *AudioInputServer) Close() { - ais.Stop() - // Remove socket file - os.Remove(getInputSocketPath()) -} - -// acceptConnections accepts incoming connections -func (ais *AudioInputServer) acceptConnections() { - for ais.running { - conn, err := ais.listener.Accept() - if err != nil { - if ais.running { - // Log error and continue accepting - logger := logging.GetDefaultLogger().With().Str("component", "audio-input").Logger() - logger.Warn().Err(err).Msg("failed to accept connection, retrying") - continue - } - return - } - - // Configure socket buffers for optimal performance - if err := ConfigureSocketBuffers(conn, ais.socketBufferConfig); err != nil { - // Log warning but don't fail - socket buffer optimization is not critical - logger := logging.GetDefaultLogger().With().Str("component", "audio-input").Logger() - logger.Warn().Err(err).Msg("failed to configure socket buffers, using defaults") - } else { - // Record socket buffer metrics for monitoring - RecordSocketBufferMetrics(conn, "audio-input") - } - - ais.mtx.Lock() - // Close existing connection if any to prevent resource leaks - if ais.conn != nil { - ais.conn.Close() - ais.conn = nil - } - ais.conn = conn - ais.mtx.Unlock() - - // Handle this connection using the goroutine pool - // Handle the connection directly - go ais.handleConnection(conn) - } -} - -// handleConnection handles a single client connection -func (ais *AudioInputServer) handleConnection(conn net.Conn) { - defer conn.Close() - - // Connection is now handled by the reader goroutine - // Just wait for connection to close or stop signal - for { - select { - case <-ais.stopChan: - return - default: - // Check if connection is still alive - if ais.conn == nil { - return - } - time.Sleep(Config.DefaultSleepDuration) - } - } -} - -// readMessage reads a message from the connection using optimized pooled buffers with validation. -// -// Validation Rules: -// - Magic number must match InputMagicNumber ("JKMI" - JetKVM Microphone Input) -// - Message length must not exceed MaxFrameSize (default: 4096 bytes) -// - Header size is fixed at 17 bytes (4+1+4+8: Magic+Type+Length+Timestamp) -// - Data length validation prevents buffer overflow attacks -// -// Message Format: -// - Magic (4 bytes): Identifies valid JetKVM audio messages -// - Type (1 byte): InputMessageType (OpusFrame, Config, Stop, Heartbeat, Ack) -// - Length (4 bytes): Data payload size in bytes -// - Timestamp (8 bytes): Message timestamp for latency tracking -// - Data (variable): Message payload up to MaxFrameSize -// -// Error Conditions: -// - Invalid magic number: Rejects non-JetKVM messages -// - Message too large: Prevents memory exhaustion -// - Connection errors: Network/socket failures -// - Incomplete reads: Partial message reception -// -// The function uses pooled buffers for efficient memory management and -// ensures all messages conform to the JetKVM audio protocol specification. -func (ais *AudioInputServer) readMessage(conn net.Conn) (*UnifiedIPCMessage, error) { - // Get optimized message from pool - optMsg := globalMessagePool.Get() - defer globalMessagePool.Put(optMsg) - - // Read header directly into pre-allocated buffer - _, err := io.ReadFull(conn, optMsg.header[:]) - if err != nil { - return nil, err - } - - // Parse header using optimized access - msg := &optMsg.msg - msg.Magic = binary.LittleEndian.Uint32(optMsg.header[0:4]) - msg.Type = UnifiedMessageType(optMsg.header[4]) - msg.Length = binary.LittleEndian.Uint32(optMsg.header[5:9]) - msg.Timestamp = int64(binary.LittleEndian.Uint64(optMsg.header[9:17])) - - // Validate magic number - if msg.Magic != inputMagicNumber { - return nil, fmt.Errorf("invalid magic number: got 0x%x, expected 0x%x", msg.Magic, inputMagicNumber) - } - - // Validate message length - if msg.Length > uint32(maxFrameSize) { - return nil, fmt.Errorf("message too large: got %d bytes, maximum allowed %d bytes", msg.Length, maxFrameSize) - } - - // Read data if present using pooled buffer - if msg.Length > 0 { - // Ensure buffer capacity - if cap(optMsg.data) < int(msg.Length) { - optMsg.data = make([]byte, msg.Length) - } else { - optMsg.data = optMsg.data[:msg.Length] - } - - _, err = io.ReadFull(conn, optMsg.data) - if err != nil { - return nil, err - } - msg.Data = optMsg.data - } - - // Return a copy of the message (data will be copied by caller if needed) - result := &UnifiedIPCMessage{ - Magic: msg.Magic, - Type: msg.Type, - Length: msg.Length, - Timestamp: msg.Timestamp, - } - - if msg.Length > 0 { - // Copy data to ensure it's not affected by buffer reuse - result.Data = make([]byte, msg.Length) - copy(result.Data, msg.Data) - } - - return result, nil -} - -// processMessage processes a received message -func (ais *AudioInputServer) processMessage(msg *UnifiedIPCMessage) error { - switch msg.Type { - case MessageTypeOpusFrame: - return ais.processOpusFrame(msg.Data) - case MessageTypeConfig: - return ais.processConfig(msg.Data) - case MessageTypeOpusConfig: - return ais.processOpusConfig(msg.Data) - case MessageTypeStop: - return fmt.Errorf("stop message received") - case MessageTypeHeartbeat: - return ais.sendAck() - default: - return fmt.Errorf("unknown message type: %d", msg.Type) - } -} - -// processOpusFrame processes an Opus audio frame -func (ais *AudioInputServer) processOpusFrame(data []byte) error { - // Inline validation for critical audio path - avoid function call overhead - dataLen := len(data) - cachedMaxFrameSize := maxFrameSize - if dataLen > cachedMaxFrameSize { - return ErrFrameDataTooLarge - } - - // Get cached config once - avoid repeated calls and locking - cache := Config - // Skip cache expiry check in hotpath - background updates handle this - - // Get a PCM buffer from the pool for optimized decode-write - pcmBuffer := GetBufferFromPool(cache.MaxPCMBufferSize) - defer ReturnBufferToPool(pcmBuffer) - - // Log audio processing details periodically for monitoring - totalFrames := atomic.AddInt64(&ais.totalFrames, 1) - - // Zero-cost debug logging for buffer allocation (first few operations) - // Only perform computations if trace logging is actually enabled - if totalFrames <= 5 { - logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger() - if logger.GetLevel() <= zerolog.TraceLevel { - logger.Trace(). - Int("requested_buffer_size", cache.MaxPCMBufferSize). - Int("pcm_buffer_length", len(pcmBuffer)). - Int("pcm_buffer_capacity", cap(pcmBuffer)). - Msg("PCM buffer allocated from pool") - } - } - if totalFrames <= 5 || totalFrames%500 == 1 { - logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger() - if logger.GetLevel() <= zerolog.TraceLevel { - logger.Trace(). - Int("opus_frame_size", dataLen). - Int("pcm_buffer_size", len(pcmBuffer)). - Int64("total_frames_processed", totalFrames). - Msg("Processing audio frame for USB Gadget output") - } - } - - // Direct CGO call - avoid wrapper function overhead - start := time.Now() - framesWritten, err := CGOAudioDecodeWrite(data, pcmBuffer) - duration := time.Since(start) - - // Log the result with detailed context - logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger() - - if err != nil { - // Log error with detailed context for debugging - atomic.AddInt64(&ais.droppedFrames, 1) - - // Get current statistics for context - total, success, failures, recovery, lastError, _ := GetAudioDecodeWriteStats() - successRate := float64(success) / float64(total) * 100 - - logger.Error(). - Err(err). - Int("opus_frame_size", dataLen). - Dur("processing_duration", duration). - Int64("frames_written", int64(framesWritten)). - Int64("total_operations", total). - Int64("successful_operations", success). - Int64("failed_operations", failures). - Int64("recovery_attempts", recovery). - Float64("success_rate_percent", successRate). - Str("last_error", lastError). - Int64("total_frames_processed", totalFrames). - Int64("dropped_frames", atomic.LoadInt64(&ais.droppedFrames)). - Msg("Failed to decode/write audio frame to USB Gadget") - - return err - } - - // Log successful operations periodically to monitor health (zero-cost when trace disabled) - if (totalFrames <= 5 || totalFrames%1000 == 1) && logger.GetLevel() <= zerolog.TraceLevel { - // Get current statistics for context (only when trace is enabled) - total, success, failures, recovery, _, _ := GetAudioDecodeWriteStats() - successRate := float64(success) / float64(total) * 100 - - logger.Trace(). - Int("opus_frame_size", dataLen). - Int64("frames_written", int64(framesWritten)). - Int64("total_operations", total). - Int64("successful_operations", success). - Int64("failed_operations", failures). - Int64("recovery_attempts", recovery). - Float64("success_rate_percent", successRate). - Int64("total_frames_processed", totalFrames). - Int64("dropped_frames", atomic.LoadInt64(&ais.droppedFrames)). - Msg("Successfully decoded/wrote audio frame to USB Gadget") - } - - return err -} - -// processConfig processes a configuration update -func (ais *AudioInputServer) processConfig(data []byte) error { - // Validate configuration data - if len(data) == 0 { - return fmt.Errorf("empty configuration data") - } - - // Basic validation for configuration size - if err := ValidateBufferSize(len(data)); err != nil { - logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger() - logger.Error().Err(err).Msg("Configuration buffer validation failed") - return fmt.Errorf("configuration validation failed: %w", err) - } - - // Acknowledge configuration receipt - return ais.sendAck() -} - -// processOpusConfig processes a complete Opus encoder configuration update -func (ais *AudioInputServer) processOpusConfig(data []byte) error { - logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger() - - // Validate configuration data size (9 * int32 = 36 bytes) - if len(data) != 36 { - return fmt.Errorf("invalid Opus configuration data size: expected 36 bytes, got %d", len(data)) - } - - // Deserialize Opus configuration - config := UnifiedIPCOpusConfig{ - SampleRate: int(binary.LittleEndian.Uint32(data[0:4])), - Channels: int(binary.LittleEndian.Uint32(data[4:8])), - FrameSize: int(binary.LittleEndian.Uint32(data[8:12])), - Bitrate: int(binary.LittleEndian.Uint32(data[12:16])), - Complexity: int(binary.LittleEndian.Uint32(data[16:20])), - VBR: int(binary.LittleEndian.Uint32(data[20:24])), - SignalType: int(binary.LittleEndian.Uint32(data[24:28])), - Bandwidth: int(binary.LittleEndian.Uint32(data[28:32])), - DTX: int(binary.LittleEndian.Uint32(data[32:36])), - } - - logger.Info().Interface("config", config).Msg("applying dynamic Opus encoder configuration") - - // Note: We don't call CGOAudioInit() here as it would destroy and recreate the encoder, - // causing temporary unavailability. The encoder should already be initialized when - // the audio input server starts. - - // Apply the Opus encoder configuration dynamically with retry logic - var err error - for attempt := 0; attempt < 3; attempt++ { - err = CGOUpdateOpusEncoderParams( - config.Bitrate, - config.Complexity, - config.VBR, - 0, // VBR constraint - using default - config.SignalType, - config.Bandwidth, - config.DTX, - ) - if err == nil { - break - } - logger.Warn().Err(err).Int("attempt", attempt+1).Msg("Failed to update Opus encoder parameters, retrying") - if attempt < 2 { - time.Sleep(time.Duration(attempt+1) * 50 * time.Millisecond) - } - } - - if err != nil { - logger.Error().Err(err).Msg("failed to apply Opus encoder configuration after retries") - return fmt.Errorf("failed to apply Opus configuration: %w", err) - } - - logger.Info().Msg("Opus encoder configuration applied successfully") - return ais.sendAck() -} - -// sendAck sends an acknowledgment message -func (ais *AudioInputServer) sendAck() error { - ais.mtx.Lock() - defer ais.mtx.Unlock() - - if ais.conn == nil { - return fmt.Errorf("no connection") - } - - msg := &UnifiedIPCMessage{ - Magic: inputMagicNumber, - Type: MessageTypeAck, - Length: 0, - Timestamp: time.Now().UnixNano(), - } - - return ais.writeMessage(ais.conn, msg) -} - -// Global shared message pool for input IPC server -var globalInputServerMessagePool = NewGenericMessagePool(messagePoolSize) - -// writeMessage writes a message to the connection using shared common utilities -func (ais *AudioInputServer) writeMessage(conn net.Conn, msg *UnifiedIPCMessage) error { - // Use shared WriteIPCMessage function with global message pool - return WriteIPCMessage(conn, msg, globalInputServerMessagePool, &ais.droppedFrames) -} // AudioInputClient handles IPC communication from the main process type AudioInputClient struct { @@ -916,414 +283,3 @@ func (aic *AudioInputClient) ResetStats() { ResetFrameStats(&aic.totalFrames, &aic.droppedFrames) } -// ResetServerStats resets server frame statistics -func (ais *AudioInputServer) ResetServerStats() { - atomic.StoreInt64(&ais.totalFrames, 0) - atomic.StoreInt64(&ais.droppedFrames, 0) -} - -// RecoverFromDroppedFrames attempts to recover when too many frames are dropped -func (ais *AudioInputServer) RecoverFromDroppedFrames() { - total := atomic.LoadInt64(&ais.totalFrames) - dropped := atomic.LoadInt64(&ais.droppedFrames) - - // If more than 50% of frames are dropped, attempt recovery - if total > 100 && dropped > total/2 { - logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger() - logger.Warn().Int64("total", total).Int64("dropped", dropped).Msg("high drop rate detected, attempting recovery") - - // Reset stats and update buffer size from adaptive manager - ais.ResetServerStats() - ais.UpdateBufferSize() - } -} - -// startReaderGoroutine starts the message reader using the goroutine pool -func (ais *AudioInputServer) startReaderGoroutine() { - ais.wg.Add(1) - - // Create a reader task that will run in the goroutine pool - readerTask := func() { - defer ais.wg.Done() - - // Enhanced error tracking and recovery - var consecutiveErrors int - var lastErrorTime time.Time - maxConsecutiveErrors := Config.MaxConsecutiveErrors - errorResetWindow := Config.RestartWindow // Use existing restart window - baseBackoffDelay := Config.RetryDelay - maxBackoffDelay := Config.MaxRetryDelay - - logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger() - - for ais.running { - ais.mtx.Lock() - conn := ais.conn - ais.mtx.Unlock() - - if conn == nil { - time.Sleep(10 * time.Millisecond) - continue - } - - msg, err := ais.readMessage(conn) - if err != nil { - if ais.running { - // Enhanced error handling with progressive backoff - now := time.Now() - - // Reset error counter if enough time has passed - if now.Sub(lastErrorTime) > errorResetWindow { - consecutiveErrors = 0 - } - - consecutiveErrors++ - lastErrorTime = now - - // Skip logging in hotpath for performance - only log critical errors - - // Progressive backoff based on error count - if consecutiveErrors > 1 { - backoffDelay := time.Duration(consecutiveErrors-1) * baseBackoffDelay - if backoffDelay > maxBackoffDelay { - backoffDelay = maxBackoffDelay - } - time.Sleep(backoffDelay) - } - - // If too many consecutive errors, close connection to force reconnect - if consecutiveErrors >= maxConsecutiveErrors { - // Only log critical errors to reduce hotpath overhead - if logger.GetLevel() <= zerolog.ErrorLevel { - logger.Error(). - Int("consecutive_errors", consecutiveErrors). - Msg("Too many consecutive read errors, closing connection") - } - - ais.mtx.Lock() - if ais.conn != nil { - ais.conn.Close() - ais.conn = nil - } - ais.mtx.Unlock() - - consecutiveErrors = 0 // Reset for next connection - } - } - continue - } - - // Reset error counter on successful read - if consecutiveErrors > 0 { - consecutiveErrors = 0 - // Only log recovery info if debug level enabled to reduce overhead - if logger.GetLevel() <= zerolog.InfoLevel { - logger.Info().Msg("Input connection recovered") - } - } - - // Send to message channel with non-blocking write (use read lock for channel access) - ais.channelMutex.RLock() - messageChan := ais.messageChan - ais.channelMutex.RUnlock() - - select { - case messageChan <- msg: - atomic.AddInt64(&ais.totalFrames, 1) - default: - // Channel full, drop message - atomic.AddInt64(&ais.droppedFrames, 1) - // Avoid sampling logic in critical path - only log if warn level enabled - if logger.GetLevel() <= zerolog.WarnLevel { - droppedCount := atomic.LoadInt64(&ais.droppedFrames) - logger.Warn().Int64("total_dropped", droppedCount).Msg("Message channel full, dropping frame") - } - } - } - } - - // Handle the reader task directly - go readerTask() -} - -// startProcessorGoroutine starts the message processor using the goroutine pool -func (ais *AudioInputServer) startProcessorGoroutine() { - ais.wg.Add(1) - - // Create a processor task that will run in the goroutine pool - processorTask := func() { - // Only lock OS thread and set priority for high-load scenarios - // This reduces interference with input processing threads - config := Config - useThreadOptimizations := config.MaxAudioProcessorWorkers > 8 - - if useThreadOptimizations { - runtime.LockOSThread() - defer runtime.UnlockOSThread() - - // Priority scheduler not implemented - using default thread priority - } - - // Create logger for this goroutine - logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger() - - // Enhanced error tracking for processing - var processingErrors int - var lastProcessingError time.Time - maxProcessingErrors := config.MaxConsecutiveErrors - errorResetWindow := config.RestartWindow - - defer ais.wg.Done() - for { - select { - case <-ais.stopChan: - return - case msg := <-ais.getMessageChan(): - // Process message with error handling - start := time.Now() - err := ais.processMessageWithRecovery(msg, logger) - processingTime := time.Since(start) - - if err != nil { - // Track processing errors - now := time.Now() - if now.Sub(lastProcessingError) > errorResetWindow { - processingErrors = 0 - } - - processingErrors++ - lastProcessingError = now - - // Skip logging in hotpath for performance - - // If too many processing errors, drop frames more aggressively - if processingErrors >= maxProcessingErrors { - // Clear processing queue to recover - processChan := ais.getProcessChan() - for len(processChan) > 0 { - select { - case <-processChan: - atomic.AddInt64(&ais.droppedFrames, 1) - default: - break - } - } - processingErrors = 0 // Reset after clearing queue - } - continue - } - - // Reset error counter on successful processing - if processingErrors > 0 { - processingErrors = 0 - // Skip logging in hotpath for performance - } - - // Update processing time metrics - atomic.StoreInt64(&ais.processingTime, processingTime.Nanoseconds()) - } - } - } - - // Submit the processor task directly - go processorTask() -} - -// processMessageWithRecovery processes a message with enhanced error recovery -func (ais *AudioInputServer) processMessageWithRecovery(msg *UnifiedIPCMessage, logger zerolog.Logger) error { - // Intelligent frame dropping: prioritize recent frames - if msg.Type == MessageTypeOpusFrame { - // Check if processing queue is getting full - processChan := ais.getProcessChan() - queueLen := len(processChan) - bufferSize := int(atomic.LoadInt64(&ais.bufferSize)) - - if queueLen > bufferSize*3/4 { - // Drop oldest frames, keep newest - select { - case <-processChan: // Remove oldest - atomic.AddInt64(&ais.droppedFrames, 1) - logger.Debug().Msg("Dropped oldest frame to make room") - default: - } - } - } - - // Send to processing queue with timeout (use read lock for channel access) - ais.channelMutex.RLock() - processChan := ais.processChan - ais.channelMutex.RUnlock() - - select { - case processChan <- msg: - return nil - case <-time.After(Config.WriteTimeout): - // Processing queue full and timeout reached, drop frame - atomic.AddInt64(&ais.droppedFrames, 1) - return fmt.Errorf("processing queue timeout") - default: - // Processing queue full, drop frame immediately - atomic.AddInt64(&ais.droppedFrames, 1) - return fmt.Errorf("processing queue full") - } -} - -// startMonitorGoroutine starts the performance monitoring using the goroutine pool -func (ais *AudioInputServer) startMonitorGoroutine() { - ais.wg.Add(1) - - // Create a monitor task that will run in the goroutine pool - monitorTask := func() { - // Monitor goroutine doesn't need thread locking for most scenarios - // Only use thread optimizations for high-throughput scenarios - config := Config - useThreadOptimizations := config.MaxAudioProcessorWorkers > 8 - - if useThreadOptimizations { - runtime.LockOSThread() - defer runtime.UnlockOSThread() - - // Priority scheduler not implemented - using default thread priority - } - - defer ais.wg.Done() - ticker := time.NewTicker(Config.DefaultTickerInterval) - defer ticker.Stop() - - // Buffer size update ticker (less frequent) - bufferUpdateTicker := time.NewTicker(Config.BufferUpdateInterval) - defer bufferUpdateTicker.Stop() - - for { - select { - case <-ais.stopChan: - return - case <-ticker.C: - // Process frames from processing queue - for { - select { - case msg := <-ais.getProcessChan(): - start := time.Now() - err := ais.processMessage(msg) - processingTime := time.Since(start) - - // Calculate end-to-end latency using message timestamp - var latency time.Duration - if msg.Type == MessageTypeOpusFrame && msg.Timestamp > 0 { - msgTime := time.Unix(0, msg.Timestamp) - latency = time.Since(msgTime) - // Use exponential moving average for end-to-end latency tracking - currentAvg := atomic.LoadInt64(&ais.processingTime) - // Weight: 90% historical, 10% current (for smoother averaging) - newAvg := (currentAvg*9 + latency.Nanoseconds()) / 10 - atomic.StoreInt64(&ais.processingTime, newAvg) - } else { - // Fallback to processing time only - latency = processingTime - currentAvg := atomic.LoadInt64(&ais.processingTime) - newAvg := (currentAvg + processingTime.Nanoseconds()) / 2 - atomic.StoreInt64(&ais.processingTime, newAvg) - } - - if err != nil { - atomic.AddInt64(&ais.droppedFrames, 1) - } - default: - // No more messages to process - goto checkBufferUpdate - } - } - - checkBufferUpdate: - // Check if we need to update buffer size - select { - case <-bufferUpdateTicker.C: - // Buffer size is now fixed from config - default: - // No buffer update needed - } - } - } - } - - // Submit the monitor task directly - go monitorTask() -} - -// GetServerStats returns server performance statistics -func (ais *AudioInputServer) GetServerStats() (total, dropped int64, avgProcessingTime time.Duration, bufferSize int64) { - return atomic.LoadInt64(&ais.totalFrames), - atomic.LoadInt64(&ais.droppedFrames), - time.Duration(atomic.LoadInt64(&ais.processingTime)), - atomic.LoadInt64(&ais.bufferSize) -} - -// UpdateBufferSize updates the buffer size (now using fixed values) -func (ais *AudioInputServer) UpdateBufferSize() { - // Buffer size is now fixed at 512 frames for stability - newSize := int64(512) - atomic.StoreInt64(&ais.bufferSize, newSize) -} - -// GetMessagePoolStats returns detailed statistics about the message pool -func (mp *MessagePool) GetMessagePoolStats() MessagePoolStats { - mp.mutex.RLock() - preallocatedCount := len(mp.preallocated) - mp.mutex.RUnlock() - - hitCount := atomic.LoadInt64(&mp.hitCount) - missCount := atomic.LoadInt64(&mp.missCount) - totalRequests := hitCount + missCount - - var hitRate float64 - if totalRequests > 0 { - hitRate = float64(hitCount) / float64(totalRequests) * Config.PercentageMultiplier - } - - // Calculate channel pool size - channelPoolSize := len(mp.pool) - - return MessagePoolStats{ - MaxPoolSize: mp.maxPoolSize, - ChannelPoolSize: channelPoolSize, - PreallocatedCount: int64(preallocatedCount), - PreallocatedMax: int64(mp.preallocSize), - HitCount: hitCount, - MissCount: missCount, - HitRate: hitRate, - } -} - -// MessagePoolStats provides detailed message pool statistics -type MessagePoolStats struct { - MaxPoolSize int - ChannelPoolSize int - PreallocatedCount int64 - PreallocatedMax int64 - HitCount int64 - MissCount int64 - HitRate float64 // Percentage -} - -// GetGlobalMessagePoolStats returns statistics for the global message pool -func GetGlobalMessagePoolStats() MessagePoolStats { - return globalMessagePool.GetMessagePoolStats() -} - -// getMessageChan safely returns the current message channel -func (ais *AudioInputServer) getMessageChan() chan *UnifiedIPCMessage { - ais.channelMutex.RLock() - defer ais.channelMutex.RUnlock() - return ais.messageChan -} - -// getProcessChan safely returns the current process channel -func (ais *AudioInputServer) getProcessChan() chan *UnifiedIPCMessage { - ais.channelMutex.RLock() - defer ais.channelMutex.RUnlock() - return ais.processChan -} - -// Helper functions - -// getInputSocketPath is now defined in unified_ipc.go diff --git a/internal/audio/ipc_output.go b/internal/audio/ipc_output.go index f5588371..95dd61cb 100644 --- a/internal/audio/ipc_output.go +++ b/internal/audio/ipc_output.go @@ -16,297 +16,6 @@ import ( // Global shared message pool for output IPC client header reading var globalOutputClientMessagePool = NewGenericMessagePool(Config.OutputMessagePoolSize) -// AudioOutputServer provides audio output IPC functionality -type AudioOutputServer struct { - bufferSize int64 - droppedFrames int64 - totalFrames int64 - - listener net.Listener - conn net.Conn - mtx sync.Mutex - running bool - logger zerolog.Logger - - messageChan chan *UnifiedIPCMessage - processChan chan *UnifiedIPCMessage - wg sync.WaitGroup - - socketPath string - magicNumber uint32 -} - -func NewAudioOutputServer() (*AudioOutputServer, error) { - socketPath := getOutputSocketPath() - logger := logging.GetDefaultLogger().With().Str("component", "audio-output-server").Logger() - - server := &AudioOutputServer{ - socketPath: socketPath, - magicNumber: Config.OutputMagicNumber, - logger: logger, - messageChan: make(chan *UnifiedIPCMessage, Config.ChannelBufferSize), - processChan: make(chan *UnifiedIPCMessage, Config.ChannelBufferSize), - } - - return server, nil -} - -// GetServerStats returns server performance statistics -// Start starts the audio output server -func (s *AudioOutputServer) Start() error { - s.mtx.Lock() - defer s.mtx.Unlock() - - if s.running { - return fmt.Errorf("audio output server is already running") - } - - // Create Unix socket - listener, err := net.Listen("unix", s.socketPath) - if err != nil { - return fmt.Errorf("failed to create unix socket: %w", err) - } - - s.listener = listener - s.running = true - - // Start goroutines - s.wg.Add(1) - go s.acceptConnections() - - s.logger.Info().Str("socket_path", s.socketPath).Msg("Audio output server started") - return nil -} - -// Stop stops the audio output server -func (s *AudioOutputServer) Stop() { - s.mtx.Lock() - defer s.mtx.Unlock() - - if !s.running { - return - } - - s.running = false - - if s.listener != nil { - s.listener.Close() - s.listener = nil - } - - if s.conn != nil { - s.conn.Close() - } - - // Close channels - close(s.messageChan) - close(s.processChan) - - s.wg.Wait() - s.logger.Info().Msg("Audio output server stopped") -} - -// acceptConnections handles incoming connections -func (s *AudioOutputServer) acceptConnections() { - defer s.wg.Done() - - for s.running { - conn, err := s.listener.Accept() - if err != nil { - if s.running { - s.logger.Error().Err(err).Msg("Failed to accept connection") - } - return - } - - s.mtx.Lock() - s.conn = conn - s.mtx.Unlock() - - s.logger.Info().Msg("Client connected to audio output server") - // Start message processing for this connection - s.wg.Add(1) - go s.handleConnection(conn) - } -} - -// handleConnection processes messages from a client connection -func (s *AudioOutputServer) handleConnection(conn net.Conn) { - defer s.wg.Done() - defer conn.Close() - - for s.running { - msg, err := s.readMessage(conn) - if err != nil { - if s.running { - s.logger.Error().Err(err).Msg("Failed to read message from client") - } - return - } - - if err := s.processMessage(msg); err != nil { - s.logger.Error().Err(err).Msg("Failed to process message") - } - } -} - -// readMessage reads a message from the connection -func (s *AudioOutputServer) readMessage(conn net.Conn) (*UnifiedIPCMessage, error) { - header := make([]byte, 17) - if _, err := io.ReadFull(conn, header); err != nil { - return nil, fmt.Errorf("failed to read header: %w", err) - } - - magic := binary.LittleEndian.Uint32(header[0:4]) - if magic != s.magicNumber { - return nil, fmt.Errorf("invalid magic number: expected %d, got %d", s.magicNumber, magic) - } - - msgType := UnifiedMessageType(header[4]) - length := binary.LittleEndian.Uint32(header[5:9]) - timestamp := int64(binary.LittleEndian.Uint64(header[9:17])) - - var data []byte - if length > 0 { - data = make([]byte, length) - if _, err := io.ReadFull(conn, data); err != nil { - return nil, fmt.Errorf("failed to read data: %w", err) - } - } - - return &UnifiedIPCMessage{ - Magic: magic, - Type: msgType, - Length: length, - Timestamp: timestamp, - Data: data, - }, nil -} - -// processMessage processes a received message -func (s *AudioOutputServer) processMessage(msg *UnifiedIPCMessage) error { - switch msg.Type { - case MessageTypeOpusConfig: - return s.processOpusConfig(msg.Data) - case MessageTypeStop: - s.logger.Info().Msg("Received stop message") - return nil - case MessageTypeHeartbeat: - s.logger.Debug().Msg("Received heartbeat") - return nil - default: - s.logger.Warn().Int("type", int(msg.Type)).Msg("Unknown message type") - return nil - } -} - -// processOpusConfig processes Opus configuration updates -func (s *AudioOutputServer) processOpusConfig(data []byte) error { - // Validate configuration data size (9 * int32 = 36 bytes) - if len(data) != 36 { - return fmt.Errorf("invalid Opus configuration data size: expected 36 bytes, got %d", len(data)) - } - - // Decode Opus configuration - config := UnifiedIPCOpusConfig{ - SampleRate: int(binary.LittleEndian.Uint32(data[0:4])), - Channels: int(binary.LittleEndian.Uint32(data[4:8])), - FrameSize: int(binary.LittleEndian.Uint32(data[8:12])), - Bitrate: int(binary.LittleEndian.Uint32(data[12:16])), - Complexity: int(binary.LittleEndian.Uint32(data[16:20])), - VBR: int(binary.LittleEndian.Uint32(data[20:24])), - SignalType: int(binary.LittleEndian.Uint32(data[24:28])), - Bandwidth: int(binary.LittleEndian.Uint32(data[28:32])), - DTX: int(binary.LittleEndian.Uint32(data[32:36])), - } - - s.logger.Info().Interface("config", config).Msg("Received Opus configuration update") - - // Ensure we're running in the audio server subprocess - if !isAudioServerProcess() { - s.logger.Warn().Msg("Opus configuration update ignored - not running in audio server subprocess") - return nil - } - - // Check if audio output streaming is currently active - if atomic.LoadInt32(&outputStreamingRunning) == 0 { - s.logger.Info().Msg("Audio output streaming not active, configuration will be applied when streaming starts") - return nil - } - - // Ensure capture is initialized before updating encoder parameters - // The C function requires both encoder and capture_initialized to be true - if err := cgoAudioInit(); err != nil { - s.logger.Debug().Err(err).Msg("Audio capture already initialized or initialization failed") - // Continue anyway - capture may already be initialized - } - - // Apply configuration using CGO function (only if audio system is running) - vbrConstraint := Config.CGOOpusVBRConstraint - if err := updateOpusEncoderParams(config.Bitrate, config.Complexity, config.VBR, vbrConstraint, config.SignalType, config.Bandwidth, config.DTX); err != nil { - s.logger.Error().Err(err).Msg("Failed to update Opus encoder parameters - encoder may not be initialized") - return err - } - - s.logger.Info().Msg("Opus encoder parameters updated successfully") - return nil -} - -// SendFrame sends an audio frame to the client -func (s *AudioOutputServer) SendFrame(frame []byte) error { - s.mtx.Lock() - conn := s.conn - s.mtx.Unlock() - - if conn == nil { - return fmt.Errorf("no client connected") - } - - // Zero-cost trace logging for frame transmission - if s.logger.GetLevel() <= zerolog.TraceLevel { - totalFrames := atomic.LoadInt64(&s.totalFrames) - if totalFrames <= 5 || totalFrames%1000 == 1 { - s.logger.Trace(). - Int("frame_size", len(frame)). - Int64("total_frames_sent", totalFrames). - Msg("Sending audio frame to output client") - } - } - - msg := &UnifiedIPCMessage{ - Magic: s.magicNumber, - Type: MessageTypeOpusFrame, - Length: uint32(len(frame)), - Timestamp: time.Now().UnixNano(), - Data: frame, - } - - return s.writeMessage(conn, msg) -} - -// writeMessage writes a message to the connection -func (s *AudioOutputServer) writeMessage(conn net.Conn, msg *UnifiedIPCMessage) error { - header := make([]byte, 17) - EncodeMessageHeader(header, msg.Magic, uint8(msg.Type), msg.Length, msg.Timestamp) - - if _, err := conn.Write(header); err != nil { - return fmt.Errorf("failed to write header: %w", err) - } - - if msg.Length > 0 && msg.Data != nil { - if _, err := conn.Write(msg.Data); err != nil { - return fmt.Errorf("failed to write data: %w", err) - } - } - - atomic.AddInt64(&s.totalFrames, 1) - return nil -} - -func (s *AudioOutputServer) GetServerStats() (total, dropped int64, bufferSize int64) { - return atomic.LoadInt64(&s.totalFrames), atomic.LoadInt64(&s.droppedFrames), atomic.LoadInt64(&s.bufferSize) -} - // AudioOutputClient provides audio output IPC client functionality type AudioOutputClient struct { droppedFrames int64 diff --git a/internal/audio/mgmt_input_ipc_manager.go b/internal/audio/mgmt_input_ipc_manager.go deleted file mode 100644 index acfdd89c..00000000 --- a/internal/audio/mgmt_input_ipc_manager.go +++ /dev/null @@ -1,365 +0,0 @@ -package audio - -import ( - "fmt" - "sync/atomic" - "time" - - "github.com/jetkvm/kvm/internal/logging" - "github.com/rs/zerolog" -) - -// Component name constant for logging -const ( - AudioInputIPCComponent = "audio-input-ipc" -) - -// AudioInputIPCManager manages microphone input using IPC when enabled -type AudioInputIPCManager struct { - metrics AudioInputMetrics - - supervisor *AudioInputSupervisor - logger zerolog.Logger - running int32 - - // Connection monitoring and recovery - monitoringEnabled bool - lastConnectionCheck time.Time - connectionFailures int32 - recoveryInProgress int32 -} - -// NewAudioInputIPCManager creates a new IPC-based audio input manager -func NewAudioInputIPCManager() *AudioInputIPCManager { - return &AudioInputIPCManager{ - supervisor: GetAudioInputSupervisor(), // Use global shared supervisor - logger: logging.GetDefaultLogger().With().Str("component", AudioInputIPCComponent).Logger(), - } -} - -// Start starts the IPC-based audio input system -func (aim *AudioInputIPCManager) Start() error { - if !atomic.CompareAndSwapInt32(&aim.running, 0, 1) { - return nil - } - - aim.logger.Debug().Str("component", AudioInputIPCComponent).Msg("starting component") - - // Initialize connection monitoring - aim.monitoringEnabled = true - aim.lastConnectionCheck = time.Now() - atomic.StoreInt32(&aim.connectionFailures, 0) - atomic.StoreInt32(&aim.recoveryInProgress, 0) - - err := aim.supervisor.Start() - if err != nil { - // Ensure proper cleanup on supervisor start failure - atomic.StoreInt32(&aim.running, 0) - aim.monitoringEnabled = false - // Reset metrics on failed start - aim.resetMetrics() - aim.logger.Error().Err(err).Str("component", AudioInputIPCComponent).Msg("failed to start audio input supervisor") - return err - } - - config := UnifiedIPCConfig{ - SampleRate: Config.InputIPCSampleRate, - Channels: Config.InputIPCChannels, - FrameSize: Config.InputIPCFrameSize, - } - - // Validate configuration before using it - if err := ValidateInputIPCConfig(config.SampleRate, config.Channels, config.FrameSize); err != nil { - aim.logger.Warn().Err(err).Msg("invalid input IPC config from constants, using defaults") - // Use safe defaults if config validation fails - config = UnifiedIPCConfig{ - SampleRate: 48000, - Channels: 2, - FrameSize: 960, - } - } - - // Wait for subprocess readiness - time.Sleep(Config.LongSleepDuration) - - err = aim.supervisor.SendConfig(config) - if err != nil { - // Config send failure is not critical, log warning and continue - aim.logger.Warn().Err(err).Str("component", AudioInputIPCComponent).Msg("failed to send initial config, will retry later") - } - - aim.logger.Debug().Str("component", AudioInputIPCComponent).Msg("component started successfully") - return nil -} - -// Stop stops the IPC-based audio input system -func (aim *AudioInputIPCManager) Stop() { - if !atomic.CompareAndSwapInt32(&aim.running, 1, 0) { - return - } - - aim.logger.Debug().Str("component", AudioInputIPCComponent).Msg("stopping component") - - // Disable connection monitoring - aim.monitoringEnabled = false - - aim.supervisor.Stop() - aim.logger.Debug().Str("component", AudioInputIPCComponent).Msg("component stopped") -} - -// resetMetrics resets all metrics to zero -func (aim *AudioInputIPCManager) resetMetrics() { - atomic.StoreInt64(&aim.metrics.FramesSent, 0) - atomic.StoreInt64(&aim.metrics.FramesDropped, 0) - atomic.StoreInt64(&aim.metrics.BytesProcessed, 0) - atomic.StoreInt64(&aim.metrics.ConnectionDrops, 0) -} - -// WriteOpusFrame sends an Opus frame to the audio input server via IPC -func (aim *AudioInputIPCManager) WriteOpusFrame(frame []byte) error { - if atomic.LoadInt32(&aim.running) == 0 { - return nil // Not running, silently ignore - } - - if len(frame) == 0 { - return nil // Empty frame, ignore - } - - // Check connection health periodically - if aim.monitoringEnabled { - aim.checkConnectionHealth() - } - - // Validate frame data - if err := ValidateAudioFrame(frame); err != nil { - atomic.AddInt64(&aim.metrics.FramesDropped, 1) - aim.logger.Debug().Err(err).Msg("invalid frame data") - return err - } - - // Start latency measurement - startTime := time.Now() - - // Update metrics - atomic.AddInt64(&aim.metrics.FramesSent, 1) - atomic.AddInt64(&aim.metrics.BytesProcessed, int64(len(frame))) - aim.metrics.LastFrameTime = startTime - - // Send frame via IPC - err := aim.supervisor.SendFrame(frame) - if err != nil { - // Count as dropped frame - atomic.AddInt64(&aim.metrics.FramesDropped, 1) - - // Handle connection failure - if aim.monitoringEnabled { - aim.handleConnectionFailure(err) - } - - aim.logger.Debug().Err(err).Msg("failed to send frame via IPC") - return err - } - - // Reset connection failure counter on successful send - if aim.monitoringEnabled { - atomic.StoreInt32(&aim.connectionFailures, 0) - } - - // Calculate and update latency (end-to-end IPC transmission time) - latency := time.Since(startTime) - aim.updateLatencyMetrics(latency) - - return nil -} - -// WriteOpusFrameZeroCopy sends an Opus frame via IPC using zero-copy optimization -func (aim *AudioInputIPCManager) WriteOpusFrameZeroCopy(frame *ZeroCopyAudioFrame) error { - if atomic.LoadInt32(&aim.running) == 0 { - return nil // Not running, silently ignore - } - - if frame == nil || frame.Length() == 0 { - return nil // Empty frame, ignore - } - - // Validate zero-copy frame - if err := ValidateZeroCopyFrame(frame); err != nil { - atomic.AddInt64(&aim.metrics.FramesDropped, 1) - aim.logger.Debug().Err(err).Msg("invalid zero-copy frame") - return err - } - - // Start latency measurement - startTime := time.Now() - - // Update metrics - atomic.AddInt64(&aim.metrics.FramesSent, 1) - atomic.AddInt64(&aim.metrics.BytesProcessed, int64(frame.Length())) - aim.metrics.LastFrameTime = startTime - - // Send frame via IPC using zero-copy data - err := aim.supervisor.SendFrameZeroCopy(frame) - if err != nil { - // Count as dropped frame - atomic.AddInt64(&aim.metrics.FramesDropped, 1) - aim.logger.Debug().Err(err).Msg("failed to send zero-copy frame via IPC") - return err - } - - // Calculate and update latency (end-to-end IPC transmission time) - latency := time.Since(startTime) - aim.updateLatencyMetrics(latency) - - return nil -} - -// IsRunning returns whether the IPC manager is running -func (aim *AudioInputIPCManager) IsRunning() bool { - return atomic.LoadInt32(&aim.running) == 1 -} - -// IsReady returns whether the IPC manager is ready to receive frames -// This checks that the supervisor is connected to the audio input server -func (aim *AudioInputIPCManager) IsReady() bool { - if !aim.IsRunning() { - return false - } - return aim.supervisor.IsConnected() -} - -// GetMetrics returns current metrics -func (aim *AudioInputIPCManager) GetMetrics() AudioInputMetrics { - return AudioInputMetrics{ - FramesSent: atomic.LoadInt64(&aim.metrics.FramesSent), - BaseAudioMetrics: BaseAudioMetrics{ - FramesProcessed: atomic.LoadInt64(&aim.metrics.FramesProcessed), - FramesDropped: atomic.LoadInt64(&aim.metrics.FramesDropped), - BytesProcessed: atomic.LoadInt64(&aim.metrics.BytesProcessed), - ConnectionDrops: atomic.LoadInt64(&aim.metrics.ConnectionDrops), - AverageLatency: aim.metrics.AverageLatency, - LastFrameTime: aim.metrics.LastFrameTime, - }, - } -} - -// updateLatencyMetrics updates the latency metrics with exponential moving average -func (aim *AudioInputIPCManager) updateLatencyMetrics(latency time.Duration) { - // Use exponential moving average for smooth latency calculation - currentAvg := aim.metrics.AverageLatency - if currentAvg == 0 { - aim.metrics.AverageLatency = latency - } else { - // EMA with alpha = 0.1 for smooth averaging - aim.metrics.AverageLatency = time.Duration(float64(currentAvg)*0.9 + float64(latency)*0.1) - } -} - -// checkConnectionHealth monitors the IPC connection health -func (aim *AudioInputIPCManager) checkConnectionHealth() { - now := time.Now() - - // Check connection every 5 seconds - if now.Sub(aim.lastConnectionCheck) < 5*time.Second { - return - } - - aim.lastConnectionCheck = now - - // Check if supervisor and client are connected - if !aim.supervisor.IsConnected() { - aim.logger.Warn().Str("component", AudioInputIPCComponent).Msg("IPC connection lost, attempting recovery") - aim.handleConnectionFailure(fmt.Errorf("connection health check failed")) - } -} - -// handleConnectionFailure manages connection failure recovery -func (aim *AudioInputIPCManager) handleConnectionFailure(err error) { - // Increment failure counter - failures := atomic.AddInt32(&aim.connectionFailures, 1) - - // Prevent multiple concurrent recovery attempts - if !atomic.CompareAndSwapInt32(&aim.recoveryInProgress, 0, 1) { - return // Recovery already in progress - } - - // Start recovery in a separate goroutine to avoid blocking audio processing - go func() { - defer atomic.StoreInt32(&aim.recoveryInProgress, 0) - - aim.logger.Info(). - Int32("failures", failures). - Err(err). - Str("component", AudioInputIPCComponent). - Msg("attempting IPC connection recovery") - - // Stop and restart the supervisor to recover the connection - aim.supervisor.Stop() - - // Brief delay before restart - time.Sleep(100 * time.Millisecond) - - // Attempt to restart - if restartErr := aim.supervisor.Start(); restartErr != nil { - aim.logger.Error(). - Err(restartErr). - Str("component", AudioInputIPCComponent). - Msg("failed to recover IPC connection") - } else { - aim.logger.Info(). - Str("component", AudioInputIPCComponent). - Msg("IPC connection recovered successfully") - - // Reset failure counter on successful recovery - atomic.StoreInt32(&aim.connectionFailures, 0) - } - }() -} - -// GetDetailedMetrics returns comprehensive performance metrics -func (aim *AudioInputIPCManager) GetDetailedMetrics() (AudioInputMetrics, map[string]interface{}) { - metrics := aim.GetMetrics() - - // Get client frame statistics - client := aim.supervisor.GetClient() - totalFrames, droppedFrames := int64(0), int64(0) - dropRate := 0.0 - if client != nil { - totalFrames, droppedFrames = client.GetFrameStats() - dropRate = client.GetDropRate() - } - - // Get server statistics if available - serverStats := make(map[string]interface{}) - if aim.supervisor.IsRunning() { - serverStats["status"] = "running" - } else { - serverStats["status"] = "stopped" - } - - detailedStats := map[string]interface{}{ - "client_total_frames": totalFrames, - "client_dropped_frames": droppedFrames, - "client_drop_rate": dropRate, - "server_stats": serverStats, - "ipc_latency_ms": float64(metrics.AverageLatency.Nanoseconds()) / 1e6, - "frames_per_second": aim.calculateFrameRate(), - } - - return metrics, detailedStats -} - -// calculateFrameRate calculates the current frame rate -func (aim *AudioInputIPCManager) calculateFrameRate() float64 { - framesSent := atomic.LoadInt64(&aim.metrics.FramesSent) - if framesSent == 0 { - return 0.0 - } - - // Return typical Opus frame rate - return 50.0 -} - -// GetSupervisor returns the supervisor for advanced operations -func (aim *AudioInputIPCManager) GetSupervisor() *AudioInputSupervisor { - return aim.supervisor -} diff --git a/internal/audio/mgmt_output_ipc_manager.go b/internal/audio/mgmt_output_ipc_manager.go deleted file mode 100644 index 3d8dfac5..00000000 --- a/internal/audio/mgmt_output_ipc_manager.go +++ /dev/null @@ -1,207 +0,0 @@ -package audio - -import ( - "fmt" - "sync/atomic" - "time" - - "github.com/jetkvm/kvm/internal/logging" -) - -// Component name constant for logging -const ( - AudioOutputIPCComponent = "audio-output-ipc" -) - -// AudioOutputMetrics represents metrics for audio output operations -type AudioOutputMetrics struct { - // Atomic int64 field first for proper ARM32 alignment - FramesReceived int64 `json:"frames_received"` // Total frames received (output-specific) - - // Embedded struct with atomic fields properly aligned - BaseAudioMetrics -} - -// AudioOutputIPCManager manages audio output using IPC when enabled -type AudioOutputIPCManager struct { - *BaseAudioManager - server *AudioOutputServer -} - -// NewAudioOutputIPCManager creates a new IPC-based audio output manager -func NewAudioOutputIPCManager() *AudioOutputIPCManager { - return &AudioOutputIPCManager{ - BaseAudioManager: NewBaseAudioManager(logging.GetDefaultLogger().With().Str("component", AudioOutputIPCComponent).Logger()), - } -} - -// Start initializes and starts the audio output IPC manager -func (aom *AudioOutputIPCManager) Start() error { - aom.logComponentStart(AudioOutputIPCComponent) - - // Create and start the IPC server - server, err := NewAudioOutputServer() - if err != nil { - aom.logComponentError(AudioOutputIPCComponent, err, "failed to create IPC server") - return err - } - - if err := server.Start(); err != nil { - aom.logComponentError(AudioOutputIPCComponent, err, "failed to start IPC server") - return err - } - - aom.server = server - aom.setRunning(true) - aom.logComponentStarted(AudioOutputIPCComponent) - - // Send initial configuration - config := UnifiedIPCConfig{ - SampleRate: Config.SampleRate, - Channels: Config.Channels, - FrameSize: 20, // Fixed 20ms frame size for optimal audio - } - - if err := aom.SendConfig(config); err != nil { - aom.logger.Warn().Err(err).Msg("Failed to send initial configuration") - } - - return nil -} - -// Stop gracefully shuts down the audio output IPC manager -func (aom *AudioOutputIPCManager) Stop() { - aom.logComponentStop(AudioOutputIPCComponent) - - if aom.server != nil { - aom.server.Stop() - aom.server = nil - } - - aom.setRunning(false) - aom.resetMetrics() - aom.logComponentStopped(AudioOutputIPCComponent) -} - -// resetMetrics resets all metrics to zero -func (aom *AudioOutputIPCManager) resetMetrics() { - aom.BaseAudioManager.resetMetrics() -} - -// WriteOpusFrame sends an Opus frame to the output server -func (aom *AudioOutputIPCManager) WriteOpusFrame(frame *ZeroCopyAudioFrame) error { - if !aom.IsRunning() { - return fmt.Errorf("audio output IPC manager not running") - } - - if aom.server == nil { - return fmt.Errorf("audio output server not initialized") - } - - // Validate frame before processing - if err := ValidateZeroCopyFrame(frame); err != nil { - aom.logComponentError(AudioOutputIPCComponent, err, "Frame validation failed") - return fmt.Errorf("output frame validation failed: %w", err) - } - - // Send frame to IPC server - if err := aom.server.SendFrame(frame.Data()); err != nil { - return err - } - - return nil -} - -// WriteOpusFrameZeroCopy writes an Opus audio frame using zero-copy optimization -func (aom *AudioOutputIPCManager) WriteOpusFrameZeroCopy(frame *ZeroCopyAudioFrame) error { - if !aom.IsRunning() { - return fmt.Errorf("audio output IPC manager not running") - } - - if aom.server == nil { - return fmt.Errorf("audio output server not initialized") - } - - // Extract frame data - frameData := frame.Data() - - // Send frame to IPC server (zero-copy not available, use regular send) - if err := aom.server.SendFrame(frameData); err != nil { - return err - } - - return nil -} - -// IsReady returns true if the IPC manager is ready to process frames -func (aom *AudioOutputIPCManager) IsReady() bool { - return aom.IsRunning() && aom.server != nil -} - -// GetMetrics returns current audio output metrics -func (aom *AudioOutputIPCManager) GetMetrics() AudioOutputMetrics { - baseMetrics := aom.getBaseMetrics() - return AudioOutputMetrics{ - FramesReceived: atomic.LoadInt64(&baseMetrics.FramesProcessed), // For output, processed = received - BaseAudioMetrics: baseMetrics, - } -} - -// GetDetailedMetrics returns detailed metrics including server statistics -func (aom *AudioOutputIPCManager) GetDetailedMetrics() (AudioOutputMetrics, map[string]interface{}) { - metrics := aom.GetMetrics() - detailed := make(map[string]interface{}) - - if aom.server != nil { - total, dropped, bufferSize := aom.server.GetServerStats() - detailed["server_total_frames"] = total - detailed["server_dropped_frames"] = dropped - detailed["server_buffer_size"] = bufferSize - detailed["server_frame_rate"] = aom.calculateFrameRate() - } - - return metrics, detailed -} - -// calculateFrameRate calculates the current frame processing rate -func (aom *AudioOutputIPCManager) calculateFrameRate() float64 { - baseMetrics := aom.getBaseMetrics() - framesProcessed := atomic.LoadInt64(&baseMetrics.FramesProcessed) - if framesProcessed == 0 { - return 0.0 - } - - // Calculate rate based on last frame time - baseMetrics = aom.getBaseMetrics() - if baseMetrics.LastFrameTime.IsZero() { - return 0.0 - } - - elapsed := time.Since(baseMetrics.LastFrameTime) - if elapsed.Seconds() == 0 { - return 0.0 - } - - return float64(framesProcessed) / elapsed.Seconds() -} - -// SendConfig sends configuration to the IPC server -func (aom *AudioOutputIPCManager) SendConfig(config UnifiedIPCConfig) error { - if aom.server == nil { - return fmt.Errorf("audio output server not initialized") - } - - // Validate configuration parameters - if err := ValidateOutputIPCConfig(config.SampleRate, config.Channels, config.FrameSize); err != nil { - aom.logger.Error().Err(err).Msg("Configuration validation failed") - return fmt.Errorf("output configuration validation failed: %w", err) - } - - aom.logger.Info().Interface("config", config).Msg("configuration received") - return nil -} - -// GetServer returns the underlying IPC server (for testing) -func (aom *AudioOutputIPCManager) GetServer() *AudioOutputServer { - return aom.server -} diff --git a/internal/audio/output_server_main.go b/internal/audio/output_server_main.go deleted file mode 100644 index 2863fd8c..00000000 --- a/internal/audio/output_server_main.go +++ /dev/null @@ -1,99 +0,0 @@ -package audio - -import ( - "context" - "os" - "os/signal" - "strings" - "syscall" - "time" - - "github.com/jetkvm/kvm/internal/logging" - "github.com/rs/zerolog" -) - -// getEnvInt reads an integer from environment variable with a default value - -// RunAudioOutputServer runs the audio output server subprocess -// This should be called from main() when the subprocess is detected -func RunAudioOutputServer() error { - logger := logging.GetSubsystemLogger("audio").With().Str("component", "audio-output-server").Logger() - - // Parse OPUS configuration from environment variables - bitrate, complexity, vbr, signalType, bandwidth, dtx := parseOpusConfig() - applyOpusConfig(bitrate, complexity, vbr, signalType, bandwidth, dtx, "audio-output-server", true) - - // Initialize validation cache for optimal performance - InitValidationCache() - - // Create audio server - server, err := NewAudioOutputServer() - if err != nil { - logger.Error().Err(err).Msg("failed to create audio server") - return err - } - defer server.Stop() - - // Start accepting connections - if err := server.Start(); err != nil { - logger.Error().Err(err).Msg("failed to start audio server") - return err - } - - // Initialize audio processing - err = StartNonBlockingAudioStreaming(func(frame []byte) { - if err := server.SendFrame(frame); err != nil { - logger.Warn().Err(err).Msg("failed to send audio frame") - RecordFrameDropped() - } - }) - if err != nil { - logger.Error().Err(err).Msg("failed to start audio processing") - return err - } - - logger.Info().Msg("audio output server started, waiting for connections") - - // Update C trace logging based on current audio scope log level (after environment variables are processed) - loggerTraceEnabled := logger.GetLevel() <= zerolog.TraceLevel - - // Manual check for audio scope in PION_LOG_TRACE (workaround for logging system bug) - manualTraceEnabled := false - pionTrace := os.Getenv("PION_LOG_TRACE") - if pionTrace != "" { - scopes := strings.Split(strings.ToLower(pionTrace), ",") - for _, scope := range scopes { - if strings.TrimSpace(scope) == "audio" { - manualTraceEnabled = true - break - } - } - } - - // Use manual check as fallback if logging system fails - traceEnabled := loggerTraceEnabled || manualTraceEnabled - - CGOSetTraceLogging(traceEnabled) - - // Set up signal handling for graceful shutdown - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - - // Wait for shutdown signal - select { - case sig := <-sigChan: - logger.Info().Str("signal", sig.String()).Msg("received shutdown signal") - case <-ctx.Done(): - } - - // Graceful shutdown - StopNonBlockingAudioStreaming() - - // Give some time for cleanup - time.Sleep(Config.DefaultSleepDuration) - - return nil -} diff --git a/internal/audio/output_streaming.go b/internal/audio/output_streaming.go deleted file mode 100644 index c2d952ce..00000000 --- a/internal/audio/output_streaming.go +++ /dev/null @@ -1,194 +0,0 @@ -//go:build cgo -// +build cgo - -package audio - -import ( - "context" - "fmt" - "strings" - "sync/atomic" - "time" - - "github.com/jetkvm/kvm/internal/logging" - "github.com/rs/zerolog" -) - -var ( - outputStreamingRunning int32 - outputStreamingCancel context.CancelFunc - outputStreamingLogger *zerolog.Logger -) - -func getOutputStreamingLogger() *zerolog.Logger { - if outputStreamingLogger == nil { - logger := logging.GetDefaultLogger().With().Str("component", "audio-output-streaming").Logger() - outputStreamingLogger = &logger - } - return outputStreamingLogger -} - -// StartAudioOutputStreaming starts audio output streaming (capturing system audio) -func StartAudioOutputStreaming(send func([]byte)) error { - if !atomic.CompareAndSwapInt32(&outputStreamingRunning, 0, 1) { - return ErrAudioAlreadyRunning - } - - // Initialize CGO audio capture with retry logic - var initErr error - for attempt := 0; attempt < 3; attempt++ { - if initErr = CGOAudioInit(); initErr == nil { - break - } - getOutputStreamingLogger().Warn().Err(initErr).Int("attempt", attempt+1).Msg("Audio initialization failed, retrying") - time.Sleep(time.Duration(attempt+1) * 100 * time.Millisecond) - } - if initErr != nil { - atomic.StoreInt32(&outputStreamingRunning, 0) - return fmt.Errorf("failed to initialize audio after 3 attempts: %w", initErr) - } - - ctx, cancel := context.WithCancel(context.Background()) - outputStreamingCancel = cancel - - // Start audio capture loop - go func() { - defer func() { - CGOAudioClose() - atomic.StoreInt32(&outputStreamingRunning, 0) - getOutputStreamingLogger().Info().Msg("Audio output streaming stopped") - }() - - getOutputStreamingLogger().Info().Str("socket_path", getOutputSocketPath()).Msg("Audio output streaming started, connected to output server") - buffer := make([]byte, GetMaxAudioFrameSize()) - - consecutiveErrors := 0 - maxConsecutiveErrors := Config.MaxConsecutiveErrors - errorBackoffDelay := Config.RetryDelay - maxErrorBackoff := Config.MaxRetryDelay - var frameCount int64 - - for { - select { - case <-ctx.Done(): - return - default: - // Capture audio frame with enhanced error handling and initialization checking - n, err := CGOAudioReadEncode(buffer) - if err != nil { - consecutiveErrors++ - getOutputStreamingLogger().Warn(). - Err(err). - Int("consecutive_errors", consecutiveErrors). - Msg("Failed to read/encode audio") - - // Check if this is an initialization error (C error code -1) - if strings.Contains(err.Error(), "C error code -1") { - getOutputStreamingLogger().Error().Msg("Audio system not initialized properly, forcing reinitialization") - // Force immediate reinitialization for init errors - consecutiveErrors = maxConsecutiveErrors - } - - // Implement progressive backoff for consecutive errors - if consecutiveErrors >= maxConsecutiveErrors { - getOutputStreamingLogger().Error(). - Int("consecutive_errors", consecutiveErrors). - Msg("Too many consecutive audio errors, attempting recovery") - - // Try to reinitialize audio system - CGOAudioClose() - time.Sleep(errorBackoffDelay) - if initErr := CGOAudioInit(); initErr != nil { - getOutputStreamingLogger().Error(). - Err(initErr). - Msg("Failed to reinitialize audio system") - // Exponential backoff for reinitialization failures - errorBackoffDelay = time.Duration(float64(errorBackoffDelay) * Config.BackoffMultiplier) - if errorBackoffDelay > maxErrorBackoff { - errorBackoffDelay = maxErrorBackoff - } - } else { - getOutputStreamingLogger().Info().Msg("Audio system reinitialized successfully") - consecutiveErrors = 0 - errorBackoffDelay = Config.RetryDelay // Reset backoff - } - } else { - // Brief delay for transient errors - time.Sleep(Config.ShortSleepDuration) - } - continue - } - - // Success - reset error counters - if consecutiveErrors > 0 { - consecutiveErrors = 0 - errorBackoffDelay = Config.RetryDelay - } - - if n > 0 { - frameCount++ - - // Get frame buffer from pool to reduce allocations - frame := GetAudioFrameBuffer() - frame = frame[:n] // Resize to actual frame size - copy(frame, buffer[:n]) - - // Zero-cost trace logging for output frame processing - logger := getOutputStreamingLogger() - if logger.GetLevel() <= zerolog.TraceLevel { - if frameCount <= 5 || frameCount%1000 == 1 { - logger.Trace(). - Int("frame_size", n). - Int("buffer_capacity", cap(frame)). - Int64("total_frames_sent", frameCount). - Msg("Audio output frame captured and buffered") - } - } - - // Validate frame before sending - if err := ValidateAudioFrame(frame); err != nil { - getOutputStreamingLogger().Warn().Err(err).Msg("Frame validation failed, dropping frame") - PutAudioFrameBuffer(frame) - continue - } - - send(frame) - // Return buffer to pool after sending - PutAudioFrameBuffer(frame) - RecordFrameReceived(n) - - // Zero-cost trace logging for successful frame transmission - if logger.GetLevel() <= zerolog.TraceLevel { - if frameCount <= 5 || frameCount%1000 == 1 { - logger.Trace(). - Int("frame_size", n). - Int64("total_frames_sent", frameCount). - Msg("Audio output frame sent successfully") - } - } - } - // Small delay to prevent busy waiting - time.Sleep(Config.ShortSleepDuration) - } - } - }() - - return nil -} - -// StopAudioOutputStreaming stops audio output streaming -func StopAudioOutputStreaming() { - if atomic.LoadInt32(&outputStreamingRunning) == 0 { - return - } - - if outputStreamingCancel != nil { - outputStreamingCancel() - outputStreamingCancel = nil - } - - // Wait for streaming to stop - for atomic.LoadInt32(&outputStreamingRunning) == 1 { - time.Sleep(Config.ShortSleepDuration) - } -} diff --git a/internal/audio/quality_presets.go b/internal/audio/quality_presets.go deleted file mode 100644 index 25cf603f..00000000 --- a/internal/audio/quality_presets.go +++ /dev/null @@ -1,152 +0,0 @@ -//go:build cgo -// +build cgo - -// Package audio provides real-time audio processing for JetKVM with low-latency streaming. -// -// Key components: output/input pipelines with Opus codec, buffer management, -// zero-copy frame pools, IPC communication, and process supervision. -// -// Optimized for S16_LE @ 48kHz stereo HDMI audio with minimal CPU usage. -// All APIs are thread-safe with comprehensive error handling and metrics collection. -// -// # Performance Characteristics -// -// Designed for embedded ARM systems with limited resources: -// - Sub-50ms end-to-end latency under normal conditions -// - Memory usage scales with buffer configuration -// - CPU usage optimized through zero-copy operations and complexity=1 Opus -// - Fixed optimal configuration (96 kbps output, 48 kbps input) -// -// # Usage Example -// -// config := GetAudioConfig() -// -// // Audio output will automatically start when frames are received -package audio - -import ( - "errors" - "sync/atomic" - "time" -) - -var ( - ErrAudioAlreadyRunning = errors.New("audio already running") -) - -// MaxAudioFrameSize is now retrieved from centralized config -func GetMaxAudioFrameSize() int { - return Config.MaxAudioFrameSize -} - -// AudioConfig holds the optimal audio configuration -// All settings are fixed for S16_LE @ 48kHz HDMI audio -type AudioConfig struct { - Bitrate int // kbps (96 for output, 48 for input) - SampleRate int // Hz (always 48000) - Channels int // 2 for output (stereo), 1 for input (mono) - FrameSize time.Duration // ms (always 20ms) -} - -// AudioMetrics tracks audio performance metrics -type AudioMetrics struct { - FramesReceived uint64 - FramesDropped uint64 - BytesProcessed uint64 - ConnectionDrops uint64 - LastFrameTime time.Time - AverageLatency time.Duration -} - -var ( - // Optimal configuration for audio output (HDMI → client) - currentConfig = AudioConfig{ - Bitrate: Config.OptimalOutputBitrate, - SampleRate: Config.SampleRate, - Channels: Config.Channels, - FrameSize: 20 * time.Millisecond, - } - // Optimal configuration for microphone input (client → target) - currentMicrophoneConfig = AudioConfig{ - Bitrate: Config.OptimalInputBitrate, - SampleRate: Config.SampleRate, - Channels: 1, - FrameSize: 20 * time.Millisecond, - } - metrics AudioMetrics -) - -// GetAudioConfig returns the current optimal audio configuration -func GetAudioConfig() AudioConfig { - return currentConfig -} - -// GetMicrophoneConfig returns the current optimal microphone configuration -func GetMicrophoneConfig() AudioConfig { - return currentMicrophoneConfig -} - -// GetGlobalAudioMetrics returns the current global audio metrics -func GetGlobalAudioMetrics() AudioMetrics { - return metrics -} - -// Batched metrics to reduce atomic operations frequency -var ( - batchedFramesReceived uint64 - batchedBytesProcessed uint64 - batchedFramesDropped uint64 - batchedConnectionDrops uint64 - - lastFlushTime int64 // Unix timestamp in nanoseconds -) - -// RecordFrameReceived increments the frames received counter with batched updates -func RecordFrameReceived(bytes int) { - // Use local batching to reduce atomic operations frequency - atomic.AddUint64(&batchedBytesProcessed, uint64(bytes)) - - // Update timestamp immediately for accurate tracking - metrics.LastFrameTime = time.Now() -} - -// RecordFrameDropped increments the frames dropped counter with batched updates -func RecordFrameDropped() { - atomic.AddUint64(&batchedFramesDropped, 1) -} - -// RecordConnectionDrop increments the connection drops counter with batched updates -func RecordConnectionDrop() { - atomic.AddUint64(&batchedConnectionDrops, 1) -} - -// flushBatchedMetrics flushes accumulated metrics to the main counters -func flushBatchedMetrics() { - // Atomically move batched metrics to main metrics - framesReceived := atomic.SwapUint64(&batchedFramesReceived, 0) - bytesProcessed := atomic.SwapUint64(&batchedBytesProcessed, 0) - framesDropped := atomic.SwapUint64(&batchedFramesDropped, 0) - connectionDrops := atomic.SwapUint64(&batchedConnectionDrops, 0) - - // Update main metrics if we have any batched data - if framesReceived > 0 { - atomic.AddUint64(&metrics.FramesReceived, framesReceived) - } - if bytesProcessed > 0 { - atomic.AddUint64(&metrics.BytesProcessed, bytesProcessed) - } - if framesDropped > 0 { - atomic.AddUint64(&metrics.FramesDropped, framesDropped) - } - if connectionDrops > 0 { - atomic.AddUint64(&metrics.ConnectionDrops, connectionDrops) - } - - // Update last flush time - atomic.StoreInt64(&lastFlushTime, time.Now().UnixNano()) -} - -// FlushPendingMetrics forces a flush of all batched metrics -func FlushPendingMetrics() { - flushBatchedMetrics() -} diff --git a/internal/audio/relay_api.go b/internal/audio/relay_api.go index 6feb07e0..666cb69e 100644 --- a/internal/audio/relay_api.go +++ b/internal/audio/relay_api.go @@ -27,9 +27,6 @@ func StartAudioRelay(audioTrack AudioTrackWriter) error { // Create new relay relay := NewAudioRelay() - // Get current audio config - config := GetAudioConfig() - // Retry starting the relay with exponential backoff // This handles cases where the subprocess hasn't created its socket yet maxAttempts := 5 @@ -38,7 +35,7 @@ func StartAudioRelay(audioTrack AudioTrackWriter) error { var lastErr error for i := 0; i < maxAttempts; i++ { - if err := relay.Start(audioTrack, config); err != nil { + if err := relay.Start(audioTrack); err != nil { lastErr = err if i < maxAttempts-1 { // Calculate exponential backoff delay @@ -122,8 +119,7 @@ func UpdateAudioRelayTrack(audioTrack AudioTrackWriter) error { if globalRelay == nil { // No relay running, start one with the provided track relay := NewAudioRelay() - config := GetAudioConfig() - if err := relay.Start(audioTrack, config); err != nil { + if err := relay.Start(audioTrack); err != nil { relayMutex.Unlock() return err } diff --git a/internal/audio/rpc_handlers.go b/internal/audio/rpc_handlers.go index b56759fd..b19738db 100644 --- a/internal/audio/rpc_handlers.go +++ b/internal/audio/rpc_handlers.go @@ -29,14 +29,6 @@ func RPCAudioMute(muted bool) error { return service.MuteAudio(muted) } -// RPCAudioQuality is deprecated - quality is now fixed at optimal settings -// Returns current config for backward compatibility -func RPCAudioQuality(quality int) (map[string]any, error) { - // Quality is now fixed - return current optimal configuration - currentConfig := GetAudioConfig() - return map[string]any{"config": currentConfig}, nil -} - // RPCMicrophoneStart handles microphone start RPC requests func RPCMicrophoneStart() error { if getAudioControlServiceFunc == nil { @@ -73,19 +65,6 @@ func RPCAudioStatus() (map[string]interface{}, error) { return service.GetAudioStatus(), nil } -// RPCAudioQualityPresets is deprecated - returns single optimal configuration -// Kept for backward compatibility with UI -func RPCAudioQualityPresets() (map[string]any, error) { - // Return single optimal configuration as both preset and current - current := GetAudioConfig() - - // Return empty presets map (UI will handle this gracefully) - return map[string]any{ - "presets": map[string]any{}, - "current": current, - }, nil -} - // RPCMicrophoneStatus handles microphone status RPC requests (read-only) func RPCMicrophoneStatus() (map[string]interface{}, error) { if getAudioControlServiceFunc == nil { diff --git a/internal/audio/supervisor_api.go b/internal/audio/supervisor_api.go index 5d9fe5fa..4980a4c0 100644 --- a/internal/audio/supervisor_api.go +++ b/internal/audio/supervisor_api.go @@ -1,8 +1,6 @@ package audio import ( - "os" - "strings" "sync/atomic" "unsafe" ) @@ -12,51 +10,6 @@ var ( globalInputSupervisor unsafe.Pointer // *AudioInputSupervisor ) -// isAudioServerProcess detects if we're running as the audio server subprocess -func isAudioServerProcess() bool { - for _, arg := range os.Args { - if strings.Contains(arg, "--audio-output-server") { - return true - } - } - return false -} - -// StartAudioStreaming launches the audio stream. -// In audio server subprocess: uses CGO-based audio streaming -// In main process: this should not be called (use StartAudioRelay instead) -func StartAudioStreaming(send func([]byte)) error { - if isAudioServerProcess() { - // Audio server subprocess: use CGO audio processing - return StartAudioOutputStreaming(send) - } else { - // Main process: should use relay system instead - // This is kept for backward compatibility but not recommended - return StartAudioOutputStreaming(send) - } -} - -// StopAudioStreaming stops the audio stream. -func StopAudioStreaming() { - if isAudioServerProcess() { - // Audio server subprocess: stop CGO audio processing - StopAudioOutputStreaming() - } else { - // Main process: stop relay if running - StopAudioRelay() - } -} - -// StartNonBlockingAudioStreaming is an alias for backward compatibility -func StartNonBlockingAudioStreaming(send func([]byte)) error { - return StartAudioOutputStreaming(send) -} - -// StopNonBlockingAudioStreaming is an alias for backward compatibility -func StopNonBlockingAudioStreaming() { - StopAudioOutputStreaming() -} - // SetAudioOutputSupervisor sets the global audio output supervisor func SetAudioOutputSupervisor(supervisor *AudioOutputSupervisor) { atomic.StorePointer(&globalOutputSupervisor, unsafe.Pointer(supervisor)) diff --git a/internal/audio/webrtc_relay.go b/internal/audio/webrtc_relay.go index a8c37a19..2784cfc0 100644 --- a/internal/audio/webrtc_relay.go +++ b/internal/audio/webrtc_relay.go @@ -31,7 +31,6 @@ type AudioRelay struct { // WebRTC integration audioTrack AudioTrackWriter - config AudioConfig muted bool } @@ -49,12 +48,12 @@ func NewAudioRelay() *AudioRelay { ctx: ctx, cancel: cancel, logger: &logger, - bufferPool: NewAudioBufferPool(GetMaxAudioFrameSize()), + bufferPool: NewAudioBufferPool(Config.MaxAudioFrameSize), } } // Start begins the audio relay process -func (r *AudioRelay) Start(audioTrack AudioTrackWriter, config AudioConfig) error { +func (r *AudioRelay) Start(audioTrack AudioTrackWriter) error { r.mutex.Lock() defer r.mutex.Unlock() @@ -66,7 +65,6 @@ func (r *AudioRelay) Start(audioTrack AudioTrackWriter, config AudioConfig) erro client := NewAudioOutputClient() r.client = client r.audioTrack = audioTrack - r.config = config // Connect to the audio output server if err := client.Connect(); err != nil { @@ -189,7 +187,6 @@ func (r *AudioRelay) forwardToWebRTC(frame []byte) error { defer r.mutex.RUnlock() audioTrack := r.audioTrack - config := r.config muted := r.muted // Comprehensive nil check for audioTrack to prevent panic @@ -218,9 +215,10 @@ func (r *AudioRelay) forwardToWebRTC(frame []byte) error { } // Write sample to WebRTC track while holding the read lock + // Frame size is fixed at 20ms for HDMI audio return audioTrack.WriteSample(media.Sample{ Data: sampleData, - Duration: config.FrameSize, + Duration: 20 * time.Millisecond, }) } diff --git a/internal/audio/zero_copy.go b/internal/audio/zero_copy.go index 38c57592..9af02302 100644 --- a/internal/audio/zero_copy.go +++ b/internal/audio/zero_copy.go @@ -357,7 +357,7 @@ type ZeroCopyFramePoolStats struct { } var ( - globalZeroCopyPool = NewZeroCopyFramePool(GetMaxAudioFrameSize()) + globalZeroCopyPool = NewZeroCopyFramePool(Config.MaxAudioFrameSize) ) // GetZeroCopyFrame gets a frame from the global pool @@ -375,36 +375,3 @@ func PutZeroCopyFrame(frame *ZeroCopyAudioFrame) { globalZeroCopyPool.Put(frame) } -// ZeroCopyAudioReadEncode performs audio read and encode with zero-copy optimization -func ZeroCopyAudioReadEncode() (*ZeroCopyAudioFrame, error) { - frame := GetZeroCopyFrame() - - maxFrameSize := GetMaxAudioFrameSize() - // Ensure frame has enough capacity - if frame.Capacity() < maxFrameSize { - // Reallocate if needed - frame.data = make([]byte, maxFrameSize) - frame.capacity = maxFrameSize - frame.pooled = false - } - - // Use unsafe pointer for direct CGO call - n, err := CGOAudioReadEncode(frame.data[:maxFrameSize]) - if err != nil { - PutZeroCopyFrame(frame) - return nil, err - } - - if n == 0 { - PutZeroCopyFrame(frame) - return nil, nil - } - - // Set the actual data length - frame.mutex.Lock() - frame.length = n - frame.data = frame.data[:n] - frame.mutex.Unlock() - - return frame, nil -} diff --git a/jsonrpc.go b/jsonrpc.go index 918d59d6..c6025865 100644 --- a/jsonrpc.go +++ b/jsonrpc.go @@ -1322,10 +1322,6 @@ func rpcAudioMute(muted bool) error { return audio.RPCAudioMute(muted) } -func rpcAudioQuality(quality int) (map[string]any, error) { - return audio.RPCAudioQuality(quality) -} - func rpcMicrophoneStart() error { return audio.RPCMicrophoneStart() } @@ -1338,10 +1334,6 @@ func rpcAudioStatus() (map[string]interface{}, error) { return audio.RPCAudioStatus() } -func rpcAudioQualityPresets() (map[string]any, error) { - return audio.RPCAudioQualityPresets() -} - func rpcMicrophoneStatus() (map[string]interface{}, error) { return audio.RPCMicrophoneStatus() } @@ -1405,9 +1397,7 @@ var rpcHandlers = map[string]RPCHandler{ "getUsbEmulationState": {Func: rpcGetUsbEmulationState}, "setUsbEmulationState": {Func: rpcSetUsbEmulationState, Params: []string{"enabled"}}, "audioMute": {Func: rpcAudioMute, Params: []string{"muted"}}, - "audioQuality": {Func: rpcAudioQuality, Params: []string{"quality"}}, "audioStatus": {Func: rpcAudioStatus}, - "audioQualityPresets": {Func: rpcAudioQualityPresets}, "microphoneStart": {Func: rpcMicrophoneStart}, "microphoneStop": {Func: rpcMicrophoneStop}, "microphoneStatus": {Func: rpcMicrophoneStatus}, diff --git a/main.go b/main.go index 5675a2ea..c079d5ed 100644 --- a/main.go +++ b/main.go @@ -16,7 +16,6 @@ import ( var ( appCtx context.Context - isAudioServer bool audioProcessDone chan struct{} audioSupervisor *audio.AudioOutputSupervisor ) @@ -126,30 +125,8 @@ func startAudioSubprocess() error { return nil } -func Main(audioServer bool, audioInputServer bool) { - // Initialize channel and set audio server flag - isAudioServer = audioServer +func Main() { audioProcessDone = make(chan struct{}) - - // If running as audio server, only initialize audio processing - if isAudioServer { - err := audio.RunAudioOutputServer() - if err != nil { - logger.Error().Err(err).Msg("audio output server failed") - os.Exit(1) - } - return - } - - // If running as audio input server, only initialize audio input processing - if audioInputServer { - err := audio.RunAudioInputServer() - if err != nil { - logger.Error().Err(err).Msg("audio input server failed") - os.Exit(1) - } - return - } LoadConfig() var cancel context.CancelFunc @@ -274,16 +251,12 @@ func Main(audioServer bool, audioInputServer bool) { <-sigs logger.Info().Msg("JetKVM Shutting Down") - // Stop audio subprocess and wait for cleanup - if !isAudioServer { - if audioSupervisor != nil { - logger.Info().Msg("stopping audio supervisor") - audioSupervisor.Stop() - } - <-audioProcessDone - } else { - audio.StopNonBlockingAudioStreaming() + // Stop audio supervisor and wait for cleanup + if audioSupervisor != nil { + logger.Info().Msg("stopping audio supervisor") + audioSupervisor.Stop() } + <-audioProcessDone //if fuseServer != nil { // err := setMassStorageImage(" ") // if err != nil {