mirror of https://github.com/jetkvm/kvm.git
Fix: linter errors
This commit is contained in:
parent
3c1e9b8dc2
commit
1e1677b35a
|
@ -16,9 +16,9 @@ import (
|
||||||
const (
|
const (
|
||||||
inputMagicNumber uint32 = 0x4A4B4D49 // "JKMI" (JetKVM Microphone Input)
|
inputMagicNumber uint32 = 0x4A4B4D49 // "JKMI" (JetKVM Microphone Input)
|
||||||
inputSocketName = "audio_input.sock"
|
inputSocketName = "audio_input.sock"
|
||||||
maxFrameSize = 4096 // Maximum Opus frame size
|
maxFrameSize = 4096 // Maximum Opus frame size
|
||||||
writeTimeout = 5 * time.Millisecond // Non-blocking write timeout
|
writeTimeout = 5 * time.Millisecond // Non-blocking write timeout
|
||||||
maxDroppedFrames = 100 // Maximum consecutive dropped frames before reconnect
|
maxDroppedFrames = 100 // Maximum consecutive dropped frames before reconnect
|
||||||
)
|
)
|
||||||
|
|
||||||
// InputMessageType represents the type of IPC message
|
// InputMessageType represents the type of IPC message
|
||||||
|
@ -55,17 +55,17 @@ type AudioInputServer struct {
|
||||||
processingTime int64 // Average processing time in nanoseconds (atomic)
|
processingTime int64 // Average processing time in nanoseconds (atomic)
|
||||||
droppedFrames int64 // Dropped frames counter (atomic)
|
droppedFrames int64 // Dropped frames counter (atomic)
|
||||||
totalFrames int64 // Total frames counter (atomic)
|
totalFrames int64 // Total frames counter (atomic)
|
||||||
|
|
||||||
listener net.Listener
|
listener net.Listener
|
||||||
conn net.Conn
|
conn net.Conn
|
||||||
mtx sync.Mutex
|
mtx sync.Mutex
|
||||||
running bool
|
running bool
|
||||||
|
|
||||||
// Triple-goroutine architecture
|
// Triple-goroutine architecture
|
||||||
messageChan chan *InputIPCMessage // Buffered channel for incoming messages
|
messageChan chan *InputIPCMessage // Buffered channel for incoming messages
|
||||||
processChan chan *InputIPCMessage // Buffered channel for processing queue
|
processChan chan *InputIPCMessage // Buffered channel for processing queue
|
||||||
stopChan chan struct{} // Stop signal for all goroutines
|
stopChan chan struct{} // Stop signal for all goroutines
|
||||||
wg sync.WaitGroup // Wait group for goroutine coordination
|
wg sync.WaitGroup // Wait group for goroutine coordination
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewAudioInputServer creates a new audio input server
|
// NewAudioInputServer creates a new audio input server
|
||||||
|
@ -315,10 +315,10 @@ type AudioInputClient struct {
|
||||||
// Atomic fields must be first for proper alignment on ARM
|
// Atomic fields must be first for proper alignment on ARM
|
||||||
droppedFrames int64 // Atomic counter for dropped frames
|
droppedFrames int64 // Atomic counter for dropped frames
|
||||||
totalFrames int64 // Atomic counter for total frames
|
totalFrames int64 // Atomic counter for total frames
|
||||||
|
|
||||||
conn net.Conn
|
conn net.Conn
|
||||||
mtx sync.Mutex
|
mtx sync.Mutex
|
||||||
running bool
|
running bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewAudioInputClient creates a new audio input client
|
// NewAudioInputClient creates a new audio input client
|
||||||
|
@ -575,7 +575,7 @@ func (ais *AudioInputServer) startProcessorGoroutine() {
|
||||||
// Check if processing queue is getting full
|
// Check if processing queue is getting full
|
||||||
queueLen := len(ais.processChan)
|
queueLen := len(ais.processChan)
|
||||||
bufferSize := int(atomic.LoadInt64(&ais.bufferSize))
|
bufferSize := int(atomic.LoadInt64(&ais.bufferSize))
|
||||||
|
|
||||||
if queueLen > bufferSize*3/4 {
|
if queueLen > bufferSize*3/4 {
|
||||||
// Drop oldest frames, keep newest
|
// Drop oldest frames, keep newest
|
||||||
select {
|
select {
|
||||||
|
@ -585,7 +585,7 @@ func (ais *AudioInputServer) startProcessorGoroutine() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send to processing queue
|
// Send to processing queue
|
||||||
select {
|
select {
|
||||||
case ais.processChan <- msg:
|
case ais.processChan <- msg:
|
||||||
|
@ -605,7 +605,7 @@ func (ais *AudioInputServer) startMonitorGoroutine() {
|
||||||
defer ais.wg.Done()
|
defer ais.wg.Done()
|
||||||
ticker := time.NewTicker(100 * time.Millisecond)
|
ticker := time.NewTicker(100 * time.Millisecond)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ais.stopChan:
|
case <-ais.stopChan:
|
||||||
|
@ -618,7 +618,7 @@ func (ais *AudioInputServer) startMonitorGoroutine() {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
err := ais.processMessage(msg)
|
err := ais.processMessage(msg)
|
||||||
processingTime := time.Since(start).Nanoseconds()
|
processingTime := time.Since(start).Nanoseconds()
|
||||||
|
|
||||||
// Calculate end-to-end latency using message timestamp
|
// Calculate end-to-end latency using message timestamp
|
||||||
if msg.Type == InputMessageTypeOpusFrame && msg.Timestamp > 0 {
|
if msg.Type == InputMessageTypeOpusFrame && msg.Timestamp > 0 {
|
||||||
msgTime := time.Unix(0, msg.Timestamp)
|
msgTime := time.Unix(0, msg.Timestamp)
|
||||||
|
@ -634,7 +634,7 @@ func (ais *AudioInputServer) startMonitorGoroutine() {
|
||||||
newAvg := (currentAvg + processingTime) / 2
|
newAvg := (currentAvg + processingTime) / 2
|
||||||
atomic.StoreInt64(&ais.processingTime, newAvg)
|
atomic.StoreInt64(&ais.processingTime, newAvg)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
atomic.AddInt64(&ais.droppedFrames, 1)
|
atomic.AddInt64(&ais.droppedFrames, 1)
|
||||||
}
|
}
|
||||||
|
@ -643,12 +643,12 @@ func (ais *AudioInputServer) startMonitorGoroutine() {
|
||||||
goto adaptiveBuffering
|
goto adaptiveBuffering
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
adaptiveBuffering:
|
adaptiveBuffering:
|
||||||
// Adaptive buffer sizing based on processing time
|
// Adaptive buffer sizing based on processing time
|
||||||
avgTime := atomic.LoadInt64(&ais.processingTime)
|
avgTime := atomic.LoadInt64(&ais.processingTime)
|
||||||
currentSize := atomic.LoadInt64(&ais.bufferSize)
|
currentSize := atomic.LoadInt64(&ais.bufferSize)
|
||||||
|
|
||||||
if avgTime > 10*1000*1000 { // > 10ms processing time
|
if avgTime > 10*1000*1000 { // > 10ms processing time
|
||||||
// Increase buffer size
|
// Increase buffer size
|
||||||
newSize := currentSize * 2
|
newSize := currentSize * 2
|
||||||
|
@ -685,4 +685,4 @@ func getInputSocketPath() string {
|
||||||
return path
|
return path
|
||||||
}
|
}
|
||||||
return filepath.Join("/var/run", inputSocketName)
|
return filepath.Join("/var/run", inputSocketName)
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,10 +10,10 @@ import (
|
||||||
// with reduced contention using atomic operations and conditional locking
|
// with reduced contention using atomic operations and conditional locking
|
||||||
type MicrophoneContentionManager struct {
|
type MicrophoneContentionManager struct {
|
||||||
// Atomic fields (must be 64-bit aligned on 32-bit systems)
|
// Atomic fields (must be 64-bit aligned on 32-bit systems)
|
||||||
lastOpNano int64 // Unix nanoseconds of last operation
|
lastOpNano int64 // Unix nanoseconds of last operation
|
||||||
cooldownNanos int64 // Cooldown duration in nanoseconds
|
cooldownNanos int64 // Cooldown duration in nanoseconds
|
||||||
operationID int64 // Incremental operation ID for tracking
|
operationID int64 // Incremental operation ID for tracking
|
||||||
|
|
||||||
// Lock-free state flags (using atomic.Pointer for lock-free updates)
|
// Lock-free state flags (using atomic.Pointer for lock-free updates)
|
||||||
lockPtr unsafe.Pointer // *sync.Mutex - conditionally allocated
|
lockPtr unsafe.Pointer // *sync.Mutex - conditionally allocated
|
||||||
}
|
}
|
||||||
|
@ -27,61 +27,61 @@ func NewMicrophoneContentionManager(cooldown time.Duration) *MicrophoneContentio
|
||||||
|
|
||||||
// OperationResult represents the result of attempting a microphone operation
|
// OperationResult represents the result of attempting a microphone operation
|
||||||
type OperationResult struct {
|
type OperationResult struct {
|
||||||
Allowed bool
|
Allowed bool
|
||||||
RemainingCooldown time.Duration
|
RemainingCooldown time.Duration
|
||||||
OperationID int64
|
OperationID int64
|
||||||
}
|
}
|
||||||
|
|
||||||
// TryOperation attempts to perform a microphone operation with optimized contention handling
|
// TryOperation attempts to perform a microphone operation with optimized contention handling
|
||||||
func (mcm *MicrophoneContentionManager) TryOperation() OperationResult {
|
func (mcm *MicrophoneContentionManager) TryOperation() OperationResult {
|
||||||
now := time.Now().UnixNano()
|
now := time.Now().UnixNano()
|
||||||
cooldown := atomic.LoadInt64(&mcm.cooldownNanos)
|
cooldown := atomic.LoadInt64(&mcm.cooldownNanos)
|
||||||
|
|
||||||
// Fast path: check if we're clearly outside cooldown period using atomic read
|
// Fast path: check if we're clearly outside cooldown period using atomic read
|
||||||
lastOp := atomic.LoadInt64(&mcm.lastOpNano)
|
lastOp := atomic.LoadInt64(&mcm.lastOpNano)
|
||||||
elapsed := now - lastOp
|
elapsed := now - lastOp
|
||||||
|
|
||||||
if elapsed >= cooldown {
|
if elapsed >= cooldown {
|
||||||
// Attempt atomic update without locking
|
// Attempt atomic update without locking
|
||||||
if atomic.CompareAndSwapInt64(&mcm.lastOpNano, lastOp, now) {
|
if atomic.CompareAndSwapInt64(&mcm.lastOpNano, lastOp, now) {
|
||||||
opID := atomic.AddInt64(&mcm.operationID, 1)
|
opID := atomic.AddInt64(&mcm.operationID, 1)
|
||||||
return OperationResult{
|
return OperationResult{
|
||||||
Allowed: true,
|
Allowed: true,
|
||||||
RemainingCooldown: 0,
|
RemainingCooldown: 0,
|
||||||
OperationID: opID,
|
OperationID: opID,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Slow path: potential contention, check remaining cooldown
|
// Slow path: potential contention, check remaining cooldown
|
||||||
currentLastOp := atomic.LoadInt64(&mcm.lastOpNano)
|
currentLastOp := atomic.LoadInt64(&mcm.lastOpNano)
|
||||||
currentElapsed := now - currentLastOp
|
currentElapsed := now - currentLastOp
|
||||||
|
|
||||||
if currentElapsed >= cooldown {
|
if currentElapsed >= cooldown {
|
||||||
// Race condition: another operation might have updated lastOpNano
|
// Race condition: another operation might have updated lastOpNano
|
||||||
// Try once more with CAS
|
// Try once more with CAS
|
||||||
if atomic.CompareAndSwapInt64(&mcm.lastOpNano, currentLastOp, now) {
|
if atomic.CompareAndSwapInt64(&mcm.lastOpNano, currentLastOp, now) {
|
||||||
opID := atomic.AddInt64(&mcm.operationID, 1)
|
opID := atomic.AddInt64(&mcm.operationID, 1)
|
||||||
return OperationResult{
|
return OperationResult{
|
||||||
Allowed: true,
|
Allowed: true,
|
||||||
RemainingCooldown: 0,
|
RemainingCooldown: 0,
|
||||||
OperationID: opID,
|
OperationID: opID,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// If CAS failed, fall through to cooldown calculation
|
// If CAS failed, fall through to cooldown calculation
|
||||||
currentLastOp = atomic.LoadInt64(&mcm.lastOpNano)
|
currentLastOp = atomic.LoadInt64(&mcm.lastOpNano)
|
||||||
currentElapsed = now - currentLastOp
|
currentElapsed = now - currentLastOp
|
||||||
}
|
}
|
||||||
|
|
||||||
remaining := time.Duration(cooldown - currentElapsed)
|
remaining := time.Duration(cooldown - currentElapsed)
|
||||||
if remaining < 0 {
|
if remaining < 0 {
|
||||||
remaining = 0
|
remaining = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
return OperationResult{
|
return OperationResult{
|
||||||
Allowed: false,
|
Allowed: false,
|
||||||
RemainingCooldown: remaining,
|
RemainingCooldown: remaining,
|
||||||
OperationID: atomic.LoadInt64(&mcm.operationID),
|
OperationID: atomic.LoadInt64(&mcm.operationID),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -127,20 +127,20 @@ func GetMicrophoneContentionManager() *MicrophoneContentionManager {
|
||||||
if ptr != nil {
|
if ptr != nil {
|
||||||
return (*MicrophoneContentionManager)(ptr)
|
return (*MicrophoneContentionManager)(ptr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize on first use
|
// Initialize on first use
|
||||||
if atomic.CompareAndSwapInt32(&micContentionInitialized, 0, 1) {
|
if atomic.CompareAndSwapInt32(&micContentionInitialized, 0, 1) {
|
||||||
manager := NewMicrophoneContentionManager(200 * time.Millisecond)
|
manager := NewMicrophoneContentionManager(200 * time.Millisecond)
|
||||||
atomic.StorePointer(&globalMicContentionManager, unsafe.Pointer(manager))
|
atomic.StorePointer(&globalMicContentionManager, unsafe.Pointer(manager))
|
||||||
return manager
|
return manager
|
||||||
}
|
}
|
||||||
|
|
||||||
// Another goroutine initialized it, try again
|
// Another goroutine initialized it, try again
|
||||||
ptr = atomic.LoadPointer(&globalMicContentionManager)
|
ptr = atomic.LoadPointer(&globalMicContentionManager)
|
||||||
if ptr != nil {
|
if ptr != nil {
|
||||||
return (*MicrophoneContentionManager)(ptr)
|
return (*MicrophoneContentionManager)(ptr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fallback: create a new manager (should rarely happen)
|
// Fallback: create a new manager (should rarely happen)
|
||||||
return NewMicrophoneContentionManager(200 * time.Millisecond)
|
return NewMicrophoneContentionManager(200 * time.Millisecond)
|
||||||
}
|
}
|
||||||
|
@ -155,4 +155,4 @@ func TryMicrophoneOperation() OperationResult {
|
||||||
func SetMicrophoneCooldown(cooldown time.Duration) {
|
func SetMicrophoneCooldown(cooldown time.Duration) {
|
||||||
manager := GetMicrophoneContentionManager()
|
manager := GetMicrophoneContentionManager()
|
||||||
manager.SetCooldown(cooldown)
|
manager.SetCooldown(cooldown)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue