Compare commits

..

No commits in common. "274854b19861e1742453b00ee420e10ed277c04f" and "432303e22830fed4b9a7e7344ec0162c654db6c7" have entirely different histories.

4 changed files with 73 additions and 34 deletions

View File

@ -94,25 +94,28 @@ func cgoAudioInit() error {
cache := GetCachedConfig()
cache.Update()
// Enable C trace logging if Go audio scope trace level is active
// Enable C trace logging if Go audio scope trace level is active
audioLogger := logging.GetSubsystemLogger("audio")
loggerTraceEnabled := audioLogger.GetLevel() <= zerolog.TraceLevel
// Manual check for audio scope in PION_LOG_TRACE (workaround for logging system bug)
traceEnabled := loggerTraceEnabled
if !loggerTraceEnabled {
pionTrace := os.Getenv("PION_LOG_TRACE")
if pionTrace != "" {
scopes := strings.Split(strings.ToLower(pionTrace), ",")
for _, scope := range scopes {
if strings.TrimSpace(scope) == "audio" {
traceEnabled = true
break
}
manualTraceEnabled := false
pionTrace := os.Getenv("PION_LOG_TRACE")
if pionTrace != "" {
scopes := strings.Split(strings.ToLower(pionTrace), ",")
for _, scope := range scopes {
if strings.TrimSpace(scope) == "audio" {
manualTraceEnabled = true
break
}
}
}
// Use manual check as fallback if logging system fails
traceEnabled := loggerTraceEnabled || manualTraceEnabled
CGOSetTraceLogging(traceEnabled)
// Update C constants from cached config (atomic access, no locks)
@ -147,17 +150,15 @@ func cgoAudioClose() {
// AudioConfigCache provides a comprehensive caching system for audio configuration
type AudioConfigCache struct {
// All duration fields use int32 by storing as milliseconds for optimal ARM NEON performance
maxMetricsUpdateInterval atomic.Int32 // Store as milliseconds (10s = 10K ms < int32 max)
restartWindow atomic.Int32 // Store as milliseconds (5min = 300K ms < int32 max)
restartDelay atomic.Int32 // Store as milliseconds
maxRestartDelay atomic.Int32 // Store as milliseconds
// Short-duration fields stored as milliseconds with int32
minFrameDuration atomic.Int32 // Store as milliseconds (10ms = 10 ms < int32 max)
maxFrameDuration atomic.Int32 // Store as milliseconds (100ms = 100 ms < int32 max)
maxLatency atomic.Int32 // Store as milliseconds (500ms = 500 ms < int32 max)
minMetricsUpdateInterval atomic.Int32 // Store as milliseconds (100ms = 100 ms < int32 max)
// Atomic int64 fields MUST be first for ARM32 alignment (8-byte alignment required)
minFrameDuration atomic.Int64 // Store as nanoseconds
maxFrameDuration atomic.Int64 // Store as nanoseconds
maxLatency atomic.Int64 // Store as nanoseconds
minMetricsUpdateInterval atomic.Int64 // Store as nanoseconds
maxMetricsUpdateInterval atomic.Int64 // Store as nanoseconds
restartWindow atomic.Int64 // Store as nanoseconds
restartDelay atomic.Int64 // Store as nanoseconds
maxRestartDelay atomic.Int64 // Store as nanoseconds
// Atomic int32 fields for lock-free access to frequently used values
minReadEncodeBuffer atomic.Int32
@ -245,16 +246,8 @@ func (c *AudioConfigCache) Update() {
// Update additional validation values
c.maxAudioFrameSize.Store(int32(Config.MaxAudioFrameSize))
c.maxChannels.Store(int32(Config.MaxChannels))
// Store duration fields as milliseconds for int32 optimization
c.minFrameDuration.Store(int32(Config.MinFrameDuration / time.Millisecond))
c.maxFrameDuration.Store(int32(Config.MaxFrameDuration / time.Millisecond))
c.maxLatency.Store(int32(Config.MaxLatency / time.Millisecond))
c.minMetricsUpdateInterval.Store(int32(Config.MinMetricsUpdateInterval / time.Millisecond))
c.maxMetricsUpdateInterval.Store(int32(Config.MaxMetricsUpdateInterval / time.Millisecond))
c.restartWindow.Store(int32(Config.RestartWindow / time.Millisecond))
c.restartDelay.Store(int32(Config.RestartDelay / time.Millisecond))
c.maxRestartDelay.Store(int32(Config.MaxRestartDelay / time.Millisecond))
c.minFrameDuration.Store(int64(Config.MinFrameDuration))
c.maxFrameDuration.Store(int64(Config.MaxFrameDuration))
c.minOpusBitrate.Store(int32(Config.MinOpusBitrate))
c.maxOpusBitrate.Store(int32(Config.MaxOpusBitrate))
@ -305,6 +298,20 @@ func (c *AudioConfigCache) GetBufferTooLargeError() error {
return c.bufferTooLargeDecodeWrite
}
// updateCacheIfNeeded updates cache only if expired to avoid overhead
func updateCacheIfNeeded(cache *AudioConfigCache) {
if cache.initialized.Load() {
cache.mutex.RLock()
cacheExpired := time.Since(cache.lastUpdate) > cache.cacheExpiry
cache.mutex.RUnlock()
if cacheExpired {
cache.Update()
}
} else {
cache.Update()
}
}
func cgoAudioReadEncode(buf []byte) (int, error) {
// Minimal buffer validation - assume caller provides correct size
if len(buf) == 0 {
@ -403,6 +410,7 @@ func cgoAudioDecodeWrite(buf []byte) (int, error) {
return n, nil
}
// Error handling with static errors
audioDecodeWriteFailures.Add(1)
var errMsg string
var err error
@ -472,7 +480,7 @@ func ReturnBufferToPool(buf []byte) {
// ReadEncodeWithPooledBuffer reads audio data and encodes it using a buffer from the pool
func ReadEncodeWithPooledBuffer() ([]byte, int, error) {
cache := GetCachedConfig()
cache.Update()
updateCacheIfNeeded(cache)
bufferSize := cache.GetMinReadEncodeBuffer()
if bufferSize == 0 {
@ -496,7 +504,7 @@ func DecodeWriteWithPooledBuffer(data []byte) (int, error) {
}
cache := GetCachedConfig()
cache.Update()
updateCacheIfNeeded(cache)
maxPacketSize := cache.GetMaxPacketSize()
if len(data) > maxPacketSize {
@ -544,7 +552,18 @@ func cgoAudioDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, err
// Get cached config
cache := GetCachedConfig()
cache.Update()
// Only update cache if expired - avoid unnecessary overhead
// Use proper locking to avoid race condition
if cache.initialized.Load() {
cache.mutex.RLock()
cacheExpired := time.Since(cache.lastUpdate) > cache.cacheExpiry
cache.mutex.RUnlock()
if cacheExpired {
cache.Update()
}
} else {
cache.Update()
}
// Ensure data doesn't exceed max packet size
maxPacketSize := cache.GetMaxPacketSize()
@ -555,6 +574,8 @@ func cgoAudioDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, err
return 0, newBufferTooLargeError(len(opusData), maxPacketSize)
}
// Metrics tracking only - detailed logging handled at application level
// Direct CGO call with minimal overhead - unsafe.Pointer(&slice[0]) is never nil for non-empty slices
n := int(C.jetkvm_audio_decode_write(unsafe.Pointer(&opusData[0]), C.int(len(opusData))))
@ -564,6 +585,7 @@ func cgoAudioDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, err
return n, nil
}
// Handle error cases with static error codes to reduce allocations
audioDecodeWriteFailures.Add(1)
var errMsg string
var err error
@ -586,6 +608,8 @@ func cgoAudioDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, err
return 0, err
}
// Optimized CGO function aliases - use direct function calls to reduce overhead
// These are now direct function aliases instead of variable assignments
func CGOAudioInit() error { return cgoAudioInit() }
func CGOAudioClose() { cgoAudioClose() }
func CGOAudioReadEncode(buf []byte) (int, error) { return cgoAudioReadEncode(buf) }

View File

@ -110,6 +110,8 @@ func (aim *AudioInputManager) WriteOpusFrame(frame []byte) error {
aim.logger.Warn().
Float64("latency_ms", latencyMs).
Msg("High audio processing latency detected")
// Record latency for goroutine cleanup optimization
}
if err != nil {
@ -146,6 +148,8 @@ func (aim *AudioInputManager) WriteOpusFrameZeroCopy(frame *ZeroCopyAudioFrame)
aim.logger.Warn().
Float64("latency_ms", latencyMs).
Msg("High audio processing latency detected")
// Record latency for goroutine cleanup optimization
}
if err != nil {

View File

@ -139,6 +139,8 @@ func (ais *AudioInputSupervisor) startProcess() error {
ais.processPID = ais.cmd.Process.Pid
ais.logger.Info().Int("pid", ais.processPID).Strs("args", args).Strs("opus_env", ais.opusEnv).Msg("audio input server process started")
// Add process to monitoring
// Connect client to the server synchronously to avoid race condition
ais.connectClient()

View File

@ -3,6 +3,7 @@ package kvm
import (
"bufio"
"io"
"runtime"
"strconv"
"strings"
"time"
@ -141,6 +142,10 @@ func unmountDCControl() error {
var dcState DCPowerState
func runDCControl() {
// Lock to OS thread to isolate DC control serial I/O
runtime.LockOSThread()
defer runtime.UnlockOSThread()
scopedLogger := serialLogger.With().Str("service", "dc_control").Logger()
reader := bufio.NewReader(port)
hasRestoreFeature := false
@ -290,6 +295,10 @@ func handleSerialChannel(d *webrtc.DataChannel) {
d.OnOpen(func() {
go func() {
// Lock to OS thread to isolate serial I/O
runtime.LockOSThread()
defer runtime.UnlockOSThread()
buf := make([]byte, 1024)
for {
n, err := port.Read(buf)