mirror of https://github.com/jetkvm/kvm.git
Compare commits
9 Commits
c4099596f4
...
ffdbce4cc9
| Author | SHA1 | Date |
|---|---|---|
|
|
ffdbce4cc9 | |
|
|
5da357ba01 | |
|
|
eab0261344 | |
|
|
e0b6e612c0 | |
|
|
f48c3fe25a | |
|
|
d4c10aef87 | |
|
|
2a81497d34 | |
|
|
8cff7d600b | |
|
|
eca1e6a80d |
218
input_rpc.go
218
input_rpc.go
|
|
@ -1,218 +0,0 @@
|
||||||
package kvm
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Constants for input validation
|
|
||||||
const (
|
|
||||||
// MaxKeyboardKeys defines the maximum number of simultaneous key presses
|
|
||||||
// This matches the USB HID keyboard report specification
|
|
||||||
MaxKeyboardKeys = 6
|
|
||||||
)
|
|
||||||
|
|
||||||
// Input RPC Direct Handlers
|
|
||||||
// This module provides optimized direct handlers for high-frequency input events,
|
|
||||||
// bypassing the reflection-based RPC system for improved performance.
|
|
||||||
//
|
|
||||||
// Performance benefits:
|
|
||||||
// - Eliminates reflection overhead (~2-3ms per call)
|
|
||||||
// - Reduces memory allocations
|
|
||||||
// - Optimizes parameter parsing and validation
|
|
||||||
// - Provides faster code path for input methods
|
|
||||||
//
|
|
||||||
// The handlers maintain full compatibility with existing RPC interface
|
|
||||||
// while providing significant latency improvements for input events.
|
|
||||||
|
|
||||||
// Common validation helpers for parameter parsing
|
|
||||||
// These reduce code duplication and provide consistent error messages
|
|
||||||
|
|
||||||
// validateFloat64Param extracts and validates a float64 parameter from the params map
|
|
||||||
func validateFloat64Param(params map[string]interface{}, paramName, methodName string, min, max float64) (float64, error) {
|
|
||||||
value, ok := params[paramName].(float64)
|
|
||||||
if !ok {
|
|
||||||
return 0, fmt.Errorf("%s: %s parameter must be a number, got %T", methodName, paramName, params[paramName])
|
|
||||||
}
|
|
||||||
if value < min || value > max {
|
|
||||||
return 0, fmt.Errorf("%s: %s value %v out of range [%v to %v]", methodName, paramName, value, min, max)
|
|
||||||
}
|
|
||||||
return value, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// validateKeysArray extracts and validates a keys array parameter
|
|
||||||
func validateKeysArray(params map[string]interface{}, methodName string) ([]uint8, error) {
|
|
||||||
keysInterface, ok := params["keys"].([]interface{})
|
|
||||||
if !ok {
|
|
||||||
return nil, fmt.Errorf("%s: keys parameter must be an array, got %T", methodName, params["keys"])
|
|
||||||
}
|
|
||||||
if len(keysInterface) > MaxKeyboardKeys {
|
|
||||||
return nil, fmt.Errorf("%s: too many keys (%d), maximum is %d", methodName, len(keysInterface), MaxKeyboardKeys)
|
|
||||||
}
|
|
||||||
|
|
||||||
keys := make([]uint8, len(keysInterface))
|
|
||||||
for i, keyInterface := range keysInterface {
|
|
||||||
keyFloat, ok := keyInterface.(float64)
|
|
||||||
if !ok {
|
|
||||||
return nil, fmt.Errorf("%s: key at index %d must be a number, got %T", methodName, i, keyInterface)
|
|
||||||
}
|
|
||||||
if keyFloat < 0 || keyFloat > 255 {
|
|
||||||
return nil, fmt.Errorf("%s: key at index %d value %v out of range [0-255]", methodName, i, keyFloat)
|
|
||||||
}
|
|
||||||
keys[i] = uint8(keyFloat)
|
|
||||||
}
|
|
||||||
return keys, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Input parameter structures for direct RPC handlers
|
|
||||||
// These mirror the original RPC method signatures but provide
|
|
||||||
// optimized parsing from JSON map parameters.
|
|
||||||
|
|
||||||
// KeyboardReportParams represents parameters for keyboard HID report
|
|
||||||
// Matches rpcKeyboardReport(modifier uint8, keys []uint8)
|
|
||||||
type KeyboardReportParams struct {
|
|
||||||
Modifier uint8 `json:"modifier"` // Keyboard modifier keys (Ctrl, Alt, Shift, etc.)
|
|
||||||
Keys []uint8 `json:"keys"` // Array of pressed key codes (up to 6 keys)
|
|
||||||
}
|
|
||||||
|
|
||||||
// AbsMouseReportParams represents parameters for absolute mouse positioning
|
|
||||||
// Matches rpcAbsMouseReport(x, y int, buttons uint8)
|
|
||||||
type AbsMouseReportParams struct {
|
|
||||||
X int `json:"x"` // Absolute X coordinate (0-32767)
|
|
||||||
Y int `json:"y"` // Absolute Y coordinate (0-32767)
|
|
||||||
Buttons uint8 `json:"buttons"` // Mouse button state bitmask
|
|
||||||
}
|
|
||||||
|
|
||||||
// RelMouseReportParams represents parameters for relative mouse movement
|
|
||||||
// Matches rpcRelMouseReport(dx, dy int8, buttons uint8)
|
|
||||||
type RelMouseReportParams struct {
|
|
||||||
Dx int8 `json:"dx"` // Relative X movement delta (-127 to +127)
|
|
||||||
Dy int8 `json:"dy"` // Relative Y movement delta (-127 to +127)
|
|
||||||
Buttons uint8 `json:"buttons"` // Mouse button state bitmask
|
|
||||||
}
|
|
||||||
|
|
||||||
// WheelReportParams represents parameters for mouse wheel events
|
|
||||||
// Matches rpcWheelReport(wheelY int8)
|
|
||||||
type WheelReportParams struct {
|
|
||||||
WheelY int8 `json:"wheelY"` // Wheel scroll delta (-127 to +127)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Direct handler for keyboard reports
|
|
||||||
// Optimized path that bypasses reflection for keyboard input events
|
|
||||||
func handleKeyboardReportDirect(params map[string]interface{}) (interface{}, error) {
|
|
||||||
// Extract and validate modifier parameter
|
|
||||||
modifierFloat, err := validateFloat64Param(params, "modifier", "keyboardReport", 0, 255)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
modifier := uint8(modifierFloat)
|
|
||||||
|
|
||||||
// Extract and validate keys array
|
|
||||||
keys, err := validateKeysArray(params, "keyboardReport")
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = rpcKeyboardReport(modifier, keys)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Direct handler for absolute mouse reports
|
|
||||||
// Optimized path that bypasses reflection for absolute mouse positioning
|
|
||||||
func handleAbsMouseReportDirect(params map[string]interface{}) (interface{}, error) {
|
|
||||||
// Extract and validate x coordinate
|
|
||||||
xFloat, err := validateFloat64Param(params, "x", "absMouseReport", 0, 32767)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
x := int(xFloat)
|
|
||||||
|
|
||||||
// Extract and validate y coordinate
|
|
||||||
yFloat, err := validateFloat64Param(params, "y", "absMouseReport", 0, 32767)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
y := int(yFloat)
|
|
||||||
|
|
||||||
// Extract and validate buttons
|
|
||||||
buttonsFloat, err := validateFloat64Param(params, "buttons", "absMouseReport", 0, 255)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
buttons := uint8(buttonsFloat)
|
|
||||||
|
|
||||||
return nil, rpcAbsMouseReport(x, y, buttons)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Direct handler for relative mouse reports
|
|
||||||
// Optimized path that bypasses reflection for relative mouse movement
|
|
||||||
func handleRelMouseReportDirect(params map[string]interface{}) (interface{}, error) {
|
|
||||||
// Extract and validate dx (relative X movement)
|
|
||||||
dxFloat, err := validateFloat64Param(params, "dx", "relMouseReport", -127, 127)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
dx := int8(dxFloat)
|
|
||||||
|
|
||||||
// Extract and validate dy (relative Y movement)
|
|
||||||
dyFloat, err := validateFloat64Param(params, "dy", "relMouseReport", -127, 127)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
dy := int8(dyFloat)
|
|
||||||
|
|
||||||
// Extract and validate buttons
|
|
||||||
buttonsFloat, err := validateFloat64Param(params, "buttons", "relMouseReport", 0, 255)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
buttons := uint8(buttonsFloat)
|
|
||||||
|
|
||||||
return nil, rpcRelMouseReport(dx, dy, buttons)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Direct handler for wheel reports
|
|
||||||
// Optimized path that bypasses reflection for mouse wheel events
|
|
||||||
func handleWheelReportDirect(params map[string]interface{}) (interface{}, error) {
|
|
||||||
// Extract and validate wheelY (scroll delta)
|
|
||||||
wheelYFloat, err := validateFloat64Param(params, "wheelY", "wheelReport", -127, 127)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
wheelY := int8(wheelYFloat)
|
|
||||||
|
|
||||||
return nil, rpcWheelReport(wheelY)
|
|
||||||
}
|
|
||||||
|
|
||||||
// handleInputRPCDirect routes input method calls to their optimized direct handlers
|
|
||||||
// This is the main entry point for the fast path that bypasses reflection.
|
|
||||||
// It provides significant performance improvements for high-frequency input events.
|
|
||||||
//
|
|
||||||
// Performance monitoring: Consider adding metrics collection here to track
|
|
||||||
// latency improvements and call frequency for production monitoring.
|
|
||||||
func handleInputRPCDirect(method string, params map[string]interface{}) (interface{}, error) {
|
|
||||||
switch method {
|
|
||||||
case "keyboardReport":
|
|
||||||
return handleKeyboardReportDirect(params)
|
|
||||||
case "absMouseReport":
|
|
||||||
return handleAbsMouseReportDirect(params)
|
|
||||||
case "relMouseReport":
|
|
||||||
return handleRelMouseReportDirect(params)
|
|
||||||
case "wheelReport":
|
|
||||||
return handleWheelReportDirect(params)
|
|
||||||
default:
|
|
||||||
// This should never happen if isInputMethod is correctly implemented
|
|
||||||
return nil, fmt.Errorf("handleInputRPCDirect: unsupported method '%s'", method)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// isInputMethod determines if a given RPC method should use the optimized direct path
|
|
||||||
// Returns true for input-related methods that have direct handlers implemented.
|
|
||||||
// This function must be kept in sync with handleInputRPCDirect.
|
|
||||||
func isInputMethod(method string) bool {
|
|
||||||
switch method {
|
|
||||||
case "keyboardReport", "absMouseReport", "relMouseReport", "wheelReport":
|
|
||||||
return true
|
|
||||||
default:
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -16,8 +16,8 @@ var microphoneMuteState struct {
|
||||||
|
|
||||||
func SetAudioMuted(muted bool) {
|
func SetAudioMuted(muted bool) {
|
||||||
audioMuteState.mu.Lock()
|
audioMuteState.mu.Lock()
|
||||||
|
defer audioMuteState.mu.Unlock()
|
||||||
audioMuteState.muted = muted
|
audioMuteState.muted = muted
|
||||||
audioMuteState.mu.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func IsAudioMuted() bool {
|
func IsAudioMuted() bool {
|
||||||
|
|
@ -28,8 +28,8 @@ func IsAudioMuted() bool {
|
||||||
|
|
||||||
func SetMicrophoneMuted(muted bool) {
|
func SetMicrophoneMuted(muted bool) {
|
||||||
microphoneMuteState.mu.Lock()
|
microphoneMuteState.mu.Lock()
|
||||||
|
defer microphoneMuteState.mu.Unlock()
|
||||||
microphoneMuteState.muted = muted
|
microphoneMuteState.muted = muted
|
||||||
microphoneMuteState.mu.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func IsMicrophoneMuted() bool {
|
func IsMicrophoneMuted() bool {
|
||||||
|
|
|
||||||
|
|
@ -87,9 +87,9 @@ static volatile int playback_initialized = 0;
|
||||||
// Function to dynamically update Opus encoder parameters
|
// Function to dynamically update Opus encoder parameters
|
||||||
int update_opus_encoder_params(int bitrate, int complexity, int vbr, int vbr_constraint,
|
int update_opus_encoder_params(int bitrate, int complexity, int vbr, int vbr_constraint,
|
||||||
int signal_type, int bandwidth, int dtx) {
|
int signal_type, int bandwidth, int dtx) {
|
||||||
// This function works for both audio input and output encoder parameters
|
// This function updates encoder parameters for audio input (capture)
|
||||||
// Require either capture (output) or playback (input) initialization
|
// Only capture uses the encoder; playback uses a separate decoder
|
||||||
if (!encoder || (!capture_initialized && !playback_initialized)) {
|
if (!encoder || !capture_initialized) {
|
||||||
return -1; // Audio encoder not initialized
|
return -1; // Audio encoder not initialized
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -260,14 +260,14 @@ var (
|
||||||
lastMetricsUpdate int64
|
lastMetricsUpdate int64
|
||||||
|
|
||||||
// Counter value tracking (since prometheus counters don't have Get() method)
|
// Counter value tracking (since prometheus counters don't have Get() method)
|
||||||
audioFramesReceivedValue int64
|
audioFramesReceivedValue uint64
|
||||||
audioFramesDroppedValue int64
|
audioFramesDroppedValue uint64
|
||||||
audioBytesProcessedValue int64
|
audioBytesProcessedValue uint64
|
||||||
audioConnectionDropsValue int64
|
audioConnectionDropsValue uint64
|
||||||
micFramesSentValue int64
|
micFramesSentValue uint64
|
||||||
micFramesDroppedValue int64
|
micFramesDroppedValue uint64
|
||||||
micBytesProcessedValue int64
|
micBytesProcessedValue uint64
|
||||||
micConnectionDropsValue int64
|
micConnectionDropsValue uint64
|
||||||
|
|
||||||
// Atomic counters for device health metrics - functionality removed, no longer used
|
// Atomic counters for device health metrics - functionality removed, no longer used
|
||||||
|
|
||||||
|
|
@ -277,11 +277,11 @@ var (
|
||||||
|
|
||||||
// UnifiedAudioMetrics provides a common structure for both input and output audio streams
|
// UnifiedAudioMetrics provides a common structure for both input and output audio streams
|
||||||
type UnifiedAudioMetrics struct {
|
type UnifiedAudioMetrics struct {
|
||||||
FramesReceived int64 `json:"frames_received"`
|
FramesReceived uint64 `json:"frames_received"`
|
||||||
FramesDropped int64 `json:"frames_dropped"`
|
FramesDropped uint64 `json:"frames_dropped"`
|
||||||
FramesSent int64 `json:"frames_sent,omitempty"`
|
FramesSent uint64 `json:"frames_sent,omitempty"`
|
||||||
BytesProcessed int64 `json:"bytes_processed"`
|
BytesProcessed uint64 `json:"bytes_processed"`
|
||||||
ConnectionDrops int64 `json:"connection_drops"`
|
ConnectionDrops uint64 `json:"connection_drops"`
|
||||||
LastFrameTime time.Time `json:"last_frame_time"`
|
LastFrameTime time.Time `json:"last_frame_time"`
|
||||||
AverageLatency time.Duration `json:"average_latency"`
|
AverageLatency time.Duration `json:"average_latency"`
|
||||||
}
|
}
|
||||||
|
|
@ -303,10 +303,10 @@ func convertAudioMetricsToUnified(metrics AudioMetrics) UnifiedAudioMetrics {
|
||||||
func convertAudioInputMetricsToUnified(metrics AudioInputMetrics) UnifiedAudioMetrics {
|
func convertAudioInputMetricsToUnified(metrics AudioInputMetrics) UnifiedAudioMetrics {
|
||||||
return UnifiedAudioMetrics{
|
return UnifiedAudioMetrics{
|
||||||
FramesReceived: 0, // AudioInputMetrics doesn't have FramesReceived
|
FramesReceived: 0, // AudioInputMetrics doesn't have FramesReceived
|
||||||
FramesDropped: metrics.FramesDropped,
|
FramesDropped: uint64(metrics.FramesDropped),
|
||||||
FramesSent: metrics.FramesSent,
|
FramesSent: uint64(metrics.FramesSent),
|
||||||
BytesProcessed: metrics.BytesProcessed,
|
BytesProcessed: uint64(metrics.BytesProcessed),
|
||||||
ConnectionDrops: metrics.ConnectionDrops,
|
ConnectionDrops: uint64(metrics.ConnectionDrops),
|
||||||
LastFrameTime: metrics.LastFrameTime,
|
LastFrameTime: metrics.LastFrameTime,
|
||||||
AverageLatency: metrics.AverageLatency,
|
AverageLatency: metrics.AverageLatency,
|
||||||
}
|
}
|
||||||
|
|
@ -314,22 +314,22 @@ func convertAudioInputMetricsToUnified(metrics AudioInputMetrics) UnifiedAudioMe
|
||||||
|
|
||||||
// UpdateAudioMetrics updates Prometheus metrics with current audio data
|
// UpdateAudioMetrics updates Prometheus metrics with current audio data
|
||||||
func UpdateAudioMetrics(metrics UnifiedAudioMetrics) {
|
func UpdateAudioMetrics(metrics UnifiedAudioMetrics) {
|
||||||
oldReceived := atomic.SwapInt64(&audioFramesReceivedValue, metrics.FramesReceived)
|
oldReceived := atomic.SwapUint64(&audioFramesReceivedValue, metrics.FramesReceived)
|
||||||
if metrics.FramesReceived > oldReceived {
|
if metrics.FramesReceived > oldReceived {
|
||||||
audioFramesReceivedTotal.Add(float64(metrics.FramesReceived - oldReceived))
|
audioFramesReceivedTotal.Add(float64(metrics.FramesReceived - oldReceived))
|
||||||
}
|
}
|
||||||
|
|
||||||
oldDropped := atomic.SwapInt64(&audioFramesDroppedValue, metrics.FramesDropped)
|
oldDropped := atomic.SwapUint64(&audioFramesDroppedValue, metrics.FramesDropped)
|
||||||
if metrics.FramesDropped > oldDropped {
|
if metrics.FramesDropped > oldDropped {
|
||||||
audioFramesDroppedTotal.Add(float64(metrics.FramesDropped - oldDropped))
|
audioFramesDroppedTotal.Add(float64(metrics.FramesDropped - oldDropped))
|
||||||
}
|
}
|
||||||
|
|
||||||
oldBytes := atomic.SwapInt64(&audioBytesProcessedValue, metrics.BytesProcessed)
|
oldBytes := atomic.SwapUint64(&audioBytesProcessedValue, metrics.BytesProcessed)
|
||||||
if metrics.BytesProcessed > oldBytes {
|
if metrics.BytesProcessed > oldBytes {
|
||||||
audioBytesProcessedTotal.Add(float64(metrics.BytesProcessed - oldBytes))
|
audioBytesProcessedTotal.Add(float64(metrics.BytesProcessed - oldBytes))
|
||||||
}
|
}
|
||||||
|
|
||||||
oldDrops := atomic.SwapInt64(&audioConnectionDropsValue, metrics.ConnectionDrops)
|
oldDrops := atomic.SwapUint64(&audioConnectionDropsValue, metrics.ConnectionDrops)
|
||||||
if metrics.ConnectionDrops > oldDrops {
|
if metrics.ConnectionDrops > oldDrops {
|
||||||
audioConnectionDropsTotal.Add(float64(metrics.ConnectionDrops - oldDrops))
|
audioConnectionDropsTotal.Add(float64(metrics.ConnectionDrops - oldDrops))
|
||||||
}
|
}
|
||||||
|
|
@ -345,22 +345,22 @@ func UpdateAudioMetrics(metrics UnifiedAudioMetrics) {
|
||||||
|
|
||||||
// UpdateMicrophoneMetrics updates Prometheus metrics with current microphone data
|
// UpdateMicrophoneMetrics updates Prometheus metrics with current microphone data
|
||||||
func UpdateMicrophoneMetrics(metrics UnifiedAudioMetrics) {
|
func UpdateMicrophoneMetrics(metrics UnifiedAudioMetrics) {
|
||||||
oldSent := atomic.SwapInt64(&micFramesSentValue, metrics.FramesSent)
|
oldSent := atomic.SwapUint64(&micFramesSentValue, metrics.FramesSent)
|
||||||
if metrics.FramesSent > oldSent {
|
if metrics.FramesSent > oldSent {
|
||||||
microphoneFramesSentTotal.Add(float64(metrics.FramesSent - oldSent))
|
microphoneFramesSentTotal.Add(float64(metrics.FramesSent - oldSent))
|
||||||
}
|
}
|
||||||
|
|
||||||
oldDropped := atomic.SwapInt64(&micFramesDroppedValue, metrics.FramesDropped)
|
oldDropped := atomic.SwapUint64(&micFramesDroppedValue, metrics.FramesDropped)
|
||||||
if metrics.FramesDropped > oldDropped {
|
if metrics.FramesDropped > oldDropped {
|
||||||
microphoneFramesDroppedTotal.Add(float64(metrics.FramesDropped - oldDropped))
|
microphoneFramesDroppedTotal.Add(float64(metrics.FramesDropped - oldDropped))
|
||||||
}
|
}
|
||||||
|
|
||||||
oldBytes := atomic.SwapInt64(&micBytesProcessedValue, metrics.BytesProcessed)
|
oldBytes := atomic.SwapUint64(&micBytesProcessedValue, metrics.BytesProcessed)
|
||||||
if metrics.BytesProcessed > oldBytes {
|
if metrics.BytesProcessed > oldBytes {
|
||||||
microphoneBytesProcessedTotal.Add(float64(metrics.BytesProcessed - oldBytes))
|
microphoneBytesProcessedTotal.Add(float64(metrics.BytesProcessed - oldBytes))
|
||||||
}
|
}
|
||||||
|
|
||||||
oldDrops := atomic.SwapInt64(&micConnectionDropsValue, metrics.ConnectionDrops)
|
oldDrops := atomic.SwapUint64(&micConnectionDropsValue, metrics.ConnectionDrops)
|
||||||
if metrics.ConnectionDrops > oldDrops {
|
if metrics.ConnectionDrops > oldDrops {
|
||||||
microphoneConnectionDropsTotal.Add(float64(metrics.ConnectionDrops - oldDrops))
|
microphoneConnectionDropsTotal.Add(float64(metrics.ConnectionDrops - oldDrops))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -230,7 +230,7 @@ func (ais *AudioInputSupervisor) SendFrameZeroCopy(frame *ZeroCopyAudioFrame) er
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendConfig sends a configuration update to the subprocess (convenience method)
|
// SendConfig sends a configuration update to the subprocess (convenience method)
|
||||||
func (ais *AudioInputSupervisor) SendConfig(config InputIPCConfig) error {
|
func (ais *AudioInputSupervisor) SendConfig(config UnifiedIPCConfig) error {
|
||||||
if ais.client == nil {
|
if ais.client == nil {
|
||||||
return fmt.Errorf("client not initialized")
|
return fmt.Errorf("client not initialized")
|
||||||
}
|
}
|
||||||
|
|
@ -243,7 +243,7 @@ func (ais *AudioInputSupervisor) SendConfig(config InputIPCConfig) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendOpusConfig sends a complete Opus encoder configuration to the audio input server
|
// SendOpusConfig sends a complete Opus encoder configuration to the audio input server
|
||||||
func (ais *AudioInputSupervisor) SendOpusConfig(config InputIPCOpusConfig) error {
|
func (ais *AudioInputSupervisor) SendOpusConfig(config UnifiedIPCOpusConfig) error {
|
||||||
if ais.client == nil {
|
if ais.client == nil {
|
||||||
return fmt.Errorf("client not initialized")
|
return fmt.Errorf("client not initialized")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -134,14 +134,12 @@ func (mp *GenericMessagePool) GetStats() (hitCount, missCount int64, hitRate flo
|
||||||
|
|
||||||
// Helper functions
|
// Helper functions
|
||||||
|
|
||||||
// EncodeMessageHeader encodes a message header into a byte slice
|
// EncodeMessageHeader encodes a message header into a provided byte slice
|
||||||
func EncodeMessageHeader(magic uint32, msgType uint8, length uint32, timestamp int64) []byte {
|
func EncodeMessageHeader(header []byte, magic uint32, msgType uint8, length uint32, timestamp int64) {
|
||||||
header := make([]byte, 17)
|
|
||||||
binary.LittleEndian.PutUint32(header[0:4], magic)
|
binary.LittleEndian.PutUint32(header[0:4], magic)
|
||||||
header[4] = msgType
|
header[4] = msgType
|
||||||
binary.LittleEndian.PutUint32(header[5:9], length)
|
binary.LittleEndian.PutUint32(header[5:9], length)
|
||||||
binary.LittleEndian.PutUint64(header[9:17], uint64(timestamp))
|
binary.LittleEndian.PutUint64(header[9:17], uint64(timestamp))
|
||||||
return header
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// EncodeAudioConfig encodes basic audio configuration to binary format
|
// EncodeAudioConfig encodes basic audio configuration to binary format
|
||||||
|
|
@ -179,14 +177,12 @@ func WriteIPCMessage(conn net.Conn, msg IPCMessage, pool *GenericMessagePool, dr
|
||||||
defer pool.Put(optMsg)
|
defer pool.Put(optMsg)
|
||||||
|
|
||||||
// Prepare header in pre-allocated buffer
|
// Prepare header in pre-allocated buffer
|
||||||
header := EncodeMessageHeader(msg.GetMagic(), msg.GetType(), msg.GetLength(), msg.GetTimestamp())
|
EncodeMessageHeader(optMsg.header[:], msg.GetMagic(), msg.GetType(), msg.GetLength(), msg.GetTimestamp())
|
||||||
copy(optMsg.header[:], header)
|
|
||||||
|
|
||||||
// Set write deadline for timeout handling (more efficient than goroutines)
|
// Set write deadline for timeout handling (more efficient than goroutines)
|
||||||
if deadline := time.Now().Add(Config.WriteTimeout); deadline.After(time.Now()) {
|
if deadline := time.Now().Add(Config.WriteTimeout); deadline.After(time.Now()) {
|
||||||
if err := conn.SetWriteDeadline(deadline); err != nil {
|
if err := conn.SetWriteDeadline(deadline); err != nil {
|
||||||
// If we can't set deadline, proceed without it
|
// If we can't set deadline, proceed without it
|
||||||
// This maintains compatibility with connections that don't support deadlines
|
|
||||||
_ = err // Explicitly ignore error for linter
|
_ = err // Explicitly ignore error for linter
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -27,27 +27,11 @@ var (
|
||||||
messagePoolSize = Config.MessagePoolSize // Pre-allocated message pool size
|
messagePoolSize = Config.MessagePoolSize // Pre-allocated message pool size
|
||||||
)
|
)
|
||||||
|
|
||||||
// Legacy aliases for backward compatibility
|
|
||||||
type InputMessageType = UnifiedMessageType
|
|
||||||
type InputIPCMessage = UnifiedIPCMessage
|
|
||||||
|
|
||||||
// Legacy constants for backward compatibility
|
|
||||||
const (
|
|
||||||
InputMessageTypeOpusFrame = MessageTypeOpusFrame
|
|
||||||
InputMessageTypeConfig = MessageTypeConfig
|
|
||||||
InputMessageTypeOpusConfig = MessageTypeOpusConfig
|
|
||||||
InputMessageTypeStop = MessageTypeStop
|
|
||||||
InputMessageTypeHeartbeat = MessageTypeHeartbeat
|
|
||||||
InputMessageTypeAck = MessageTypeAck
|
|
||||||
)
|
|
||||||
|
|
||||||
// Methods are now inherited from UnifiedIPCMessage
|
|
||||||
|
|
||||||
// OptimizedIPCMessage represents an optimized message with pre-allocated buffers
|
// OptimizedIPCMessage represents an optimized message with pre-allocated buffers
|
||||||
type OptimizedIPCMessage struct {
|
type OptimizedIPCMessage struct {
|
||||||
header [17]byte // Pre-allocated header buffer (headerSize = 17)
|
header [17]byte // Pre-allocated header buffer (headerSize = 17)
|
||||||
data []byte // Reusable data buffer
|
data []byte // Reusable data buffer
|
||||||
msg InputIPCMessage // Embedded message
|
msg UnifiedIPCMessage // Embedded message
|
||||||
}
|
}
|
||||||
|
|
||||||
// MessagePool manages a pool of reusable messages to reduce allocations
|
// MessagePool manages a pool of reusable messages to reduce allocations
|
||||||
|
|
@ -109,7 +93,7 @@ func (mp *MessagePool) Get() *OptimizedIPCMessage {
|
||||||
atomic.AddInt64(&mp.hitCount, 1)
|
atomic.AddInt64(&mp.hitCount, 1)
|
||||||
// Reset message for reuse
|
// Reset message for reuse
|
||||||
msg.data = msg.data[:0]
|
msg.data = msg.data[:0]
|
||||||
msg.msg = InputIPCMessage{}
|
msg.msg = UnifiedIPCMessage{}
|
||||||
return msg
|
return msg
|
||||||
}
|
}
|
||||||
mp.mutex.Unlock()
|
mp.mutex.Unlock()
|
||||||
|
|
@ -120,7 +104,7 @@ func (mp *MessagePool) Get() *OptimizedIPCMessage {
|
||||||
atomic.AddInt64(&mp.hitCount, 1)
|
atomic.AddInt64(&mp.hitCount, 1)
|
||||||
// Reset message for reuse and ensure proper capacity
|
// Reset message for reuse and ensure proper capacity
|
||||||
msg.data = msg.data[:0]
|
msg.data = msg.data[:0]
|
||||||
msg.msg = InputIPCMessage{}
|
msg.msg = UnifiedIPCMessage{}
|
||||||
// Ensure data buffer has sufficient capacity
|
// Ensure data buffer has sufficient capacity
|
||||||
if cap(msg.data) < maxFrameSize {
|
if cap(msg.data) < maxFrameSize {
|
||||||
msg.data = make([]byte, 0, maxFrameSize)
|
msg.data = make([]byte, 0, maxFrameSize)
|
||||||
|
|
@ -148,7 +132,7 @@ func (mp *MessagePool) Put(msg *OptimizedIPCMessage) {
|
||||||
|
|
||||||
// Reset the message for reuse
|
// Reset the message for reuse
|
||||||
msg.data = msg.data[:0]
|
msg.data = msg.data[:0]
|
||||||
msg.msg = InputIPCMessage{}
|
msg.msg = UnifiedIPCMessage{}
|
||||||
|
|
||||||
// First try to return to pre-allocated pool for fastest reuse
|
// First try to return to pre-allocated pool for fastest reuse
|
||||||
mp.mutex.Lock()
|
mp.mutex.Lock()
|
||||||
|
|
@ -168,10 +152,6 @@ func (mp *MessagePool) Put(msg *OptimizedIPCMessage) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Legacy aliases for backward compatibility
|
|
||||||
type InputIPCConfig = UnifiedIPCConfig
|
|
||||||
type InputIPCOpusConfig = UnifiedIPCOpusConfig
|
|
||||||
|
|
||||||
// AudioInputServer handles IPC communication for audio input processing
|
// AudioInputServer handles IPC communication for audio input processing
|
||||||
type AudioInputServer struct {
|
type AudioInputServer struct {
|
||||||
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
|
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
|
||||||
|
|
@ -186,10 +166,10 @@ type AudioInputServer struct {
|
||||||
running bool
|
running bool
|
||||||
|
|
||||||
// Triple-goroutine architecture
|
// Triple-goroutine architecture
|
||||||
messageChan chan *InputIPCMessage // Buffered channel for incoming messages
|
messageChan chan *UnifiedIPCMessage // Buffered channel for incoming messages
|
||||||
processChan chan *InputIPCMessage // Buffered channel for processing queue
|
processChan chan *UnifiedIPCMessage // Buffered channel for processing queue
|
||||||
stopChan chan struct{} // Stop signal for all goroutines
|
stopChan chan struct{} // Stop signal for all goroutines
|
||||||
wg sync.WaitGroup // Wait group for goroutine coordination
|
wg sync.WaitGroup // Wait group for goroutine coordination
|
||||||
|
|
||||||
// Channel resizing support
|
// Channel resizing support
|
||||||
channelMutex sync.RWMutex // Protects channel recreation
|
channelMutex sync.RWMutex // Protects channel recreation
|
||||||
|
|
@ -246,8 +226,8 @@ func NewAudioInputServer() (*AudioInputServer, error) {
|
||||||
|
|
||||||
return &AudioInputServer{
|
return &AudioInputServer{
|
||||||
listener: listener,
|
listener: listener,
|
||||||
messageChan: make(chan *InputIPCMessage, initialBufferSize),
|
messageChan: make(chan *UnifiedIPCMessage, initialBufferSize),
|
||||||
processChan: make(chan *InputIPCMessage, initialBufferSize),
|
processChan: make(chan *UnifiedIPCMessage, initialBufferSize),
|
||||||
stopChan: make(chan struct{}),
|
stopChan: make(chan struct{}),
|
||||||
bufferSize: initialBufferSize,
|
bufferSize: initialBufferSize,
|
||||||
lastBufferSize: initialBufferSize,
|
lastBufferSize: initialBufferSize,
|
||||||
|
|
@ -405,7 +385,7 @@ func (ais *AudioInputServer) handleConnection(conn net.Conn) {
|
||||||
//
|
//
|
||||||
// The function uses pooled buffers for efficient memory management and
|
// The function uses pooled buffers for efficient memory management and
|
||||||
// ensures all messages conform to the JetKVM audio protocol specification.
|
// ensures all messages conform to the JetKVM audio protocol specification.
|
||||||
func (ais *AudioInputServer) readMessage(conn net.Conn) (*InputIPCMessage, error) {
|
func (ais *AudioInputServer) readMessage(conn net.Conn) (*UnifiedIPCMessage, error) {
|
||||||
// Get optimized message from pool
|
// Get optimized message from pool
|
||||||
optMsg := globalMessagePool.Get()
|
optMsg := globalMessagePool.Get()
|
||||||
defer globalMessagePool.Put(optMsg)
|
defer globalMessagePool.Put(optMsg)
|
||||||
|
|
@ -419,7 +399,7 @@ func (ais *AudioInputServer) readMessage(conn net.Conn) (*InputIPCMessage, error
|
||||||
// Parse header using optimized access
|
// Parse header using optimized access
|
||||||
msg := &optMsg.msg
|
msg := &optMsg.msg
|
||||||
msg.Magic = binary.LittleEndian.Uint32(optMsg.header[0:4])
|
msg.Magic = binary.LittleEndian.Uint32(optMsg.header[0:4])
|
||||||
msg.Type = InputMessageType(optMsg.header[4])
|
msg.Type = UnifiedMessageType(optMsg.header[4])
|
||||||
msg.Length = binary.LittleEndian.Uint32(optMsg.header[5:9])
|
msg.Length = binary.LittleEndian.Uint32(optMsg.header[5:9])
|
||||||
msg.Timestamp = int64(binary.LittleEndian.Uint64(optMsg.header[9:17]))
|
msg.Timestamp = int64(binary.LittleEndian.Uint64(optMsg.header[9:17]))
|
||||||
|
|
||||||
|
|
@ -450,7 +430,7 @@ func (ais *AudioInputServer) readMessage(conn net.Conn) (*InputIPCMessage, error
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return a copy of the message (data will be copied by caller if needed)
|
// Return a copy of the message (data will be copied by caller if needed)
|
||||||
result := &InputIPCMessage{
|
result := &UnifiedIPCMessage{
|
||||||
Magic: msg.Magic,
|
Magic: msg.Magic,
|
||||||
Type: msg.Type,
|
Type: msg.Type,
|
||||||
Length: msg.Length,
|
Length: msg.Length,
|
||||||
|
|
@ -467,17 +447,17 @@ func (ais *AudioInputServer) readMessage(conn net.Conn) (*InputIPCMessage, error
|
||||||
}
|
}
|
||||||
|
|
||||||
// processMessage processes a received message
|
// processMessage processes a received message
|
||||||
func (ais *AudioInputServer) processMessage(msg *InputIPCMessage) error {
|
func (ais *AudioInputServer) processMessage(msg *UnifiedIPCMessage) error {
|
||||||
switch msg.Type {
|
switch msg.Type {
|
||||||
case InputMessageTypeOpusFrame:
|
case MessageTypeOpusFrame:
|
||||||
return ais.processOpusFrame(msg.Data)
|
return ais.processOpusFrame(msg.Data)
|
||||||
case InputMessageTypeConfig:
|
case MessageTypeConfig:
|
||||||
return ais.processConfig(msg.Data)
|
return ais.processConfig(msg.Data)
|
||||||
case InputMessageTypeOpusConfig:
|
case MessageTypeOpusConfig:
|
||||||
return ais.processOpusConfig(msg.Data)
|
return ais.processOpusConfig(msg.Data)
|
||||||
case InputMessageTypeStop:
|
case MessageTypeStop:
|
||||||
return fmt.Errorf("stop message received")
|
return fmt.Errorf("stop message received")
|
||||||
case InputMessageTypeHeartbeat:
|
case MessageTypeHeartbeat:
|
||||||
return ais.sendAck()
|
return ais.sendAck()
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("unknown message type: %d", msg.Type)
|
return fmt.Errorf("unknown message type: %d", msg.Type)
|
||||||
|
|
@ -538,7 +518,7 @@ func (ais *AudioInputServer) processOpusConfig(data []byte) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Deserialize Opus configuration
|
// Deserialize Opus configuration
|
||||||
config := InputIPCOpusConfig{
|
config := UnifiedIPCOpusConfig{
|
||||||
SampleRate: int(binary.LittleEndian.Uint32(data[0:4])),
|
SampleRate: int(binary.LittleEndian.Uint32(data[0:4])),
|
||||||
Channels: int(binary.LittleEndian.Uint32(data[4:8])),
|
Channels: int(binary.LittleEndian.Uint32(data[4:8])),
|
||||||
FrameSize: int(binary.LittleEndian.Uint32(data[8:12])),
|
FrameSize: int(binary.LittleEndian.Uint32(data[8:12])),
|
||||||
|
|
@ -552,19 +532,33 @@ func (ais *AudioInputServer) processOpusConfig(data []byte) error {
|
||||||
|
|
||||||
logger.Info().Interface("config", config).Msg("applying dynamic Opus encoder configuration")
|
logger.Info().Interface("config", config).Msg("applying dynamic Opus encoder configuration")
|
||||||
|
|
||||||
// Apply the Opus encoder configuration dynamically
|
// Note: We don't call CGOAudioInit() here as it would destroy and recreate the encoder,
|
||||||
err := CGOUpdateOpusEncoderParams(
|
// causing temporary unavailability. The encoder should already be initialized when
|
||||||
config.Bitrate,
|
// the audio input server starts.
|
||||||
config.Complexity,
|
|
||||||
config.VBR,
|
// Apply the Opus encoder configuration dynamically with retry logic
|
||||||
0, // VBR constraint - using default
|
var err error
|
||||||
config.SignalType,
|
for attempt := 0; attempt < 3; attempt++ {
|
||||||
config.Bandwidth,
|
err = CGOUpdateOpusEncoderParams(
|
||||||
config.DTX,
|
config.Bitrate,
|
||||||
)
|
config.Complexity,
|
||||||
|
config.VBR,
|
||||||
|
0, // VBR constraint - using default
|
||||||
|
config.SignalType,
|
||||||
|
config.Bandwidth,
|
||||||
|
config.DTX,
|
||||||
|
)
|
||||||
|
if err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
logger.Warn().Err(err).Int("attempt", attempt+1).Msg("Failed to update Opus encoder parameters, retrying")
|
||||||
|
if attempt < 2 {
|
||||||
|
time.Sleep(time.Duration(attempt+1) * 50 * time.Millisecond)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error().Err(err).Msg("failed to apply Opus encoder configuration")
|
logger.Error().Err(err).Msg("failed to apply Opus encoder configuration after retries")
|
||||||
return fmt.Errorf("failed to apply Opus configuration: %w", err)
|
return fmt.Errorf("failed to apply Opus configuration: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -581,9 +575,9 @@ func (ais *AudioInputServer) sendAck() error {
|
||||||
return fmt.Errorf("no connection")
|
return fmt.Errorf("no connection")
|
||||||
}
|
}
|
||||||
|
|
||||||
msg := &InputIPCMessage{
|
msg := &UnifiedIPCMessage{
|
||||||
Magic: inputMagicNumber,
|
Magic: inputMagicNumber,
|
||||||
Type: InputMessageTypeAck,
|
Type: MessageTypeAck,
|
||||||
Length: 0,
|
Length: 0,
|
||||||
Timestamp: time.Now().UnixNano(),
|
Timestamp: time.Now().UnixNano(),
|
||||||
}
|
}
|
||||||
|
|
@ -595,7 +589,7 @@ func (ais *AudioInputServer) sendAck() error {
|
||||||
var globalInputServerMessagePool = NewGenericMessagePool(messagePoolSize)
|
var globalInputServerMessagePool = NewGenericMessagePool(messagePoolSize)
|
||||||
|
|
||||||
// writeMessage writes a message to the connection using shared common utilities
|
// writeMessage writes a message to the connection using shared common utilities
|
||||||
func (ais *AudioInputServer) writeMessage(conn net.Conn, msg *InputIPCMessage) error {
|
func (ais *AudioInputServer) writeMessage(conn net.Conn, msg *UnifiedIPCMessage) error {
|
||||||
// Use shared WriteIPCMessage function with global message pool
|
// Use shared WriteIPCMessage function with global message pool
|
||||||
return WriteIPCMessage(conn, msg, globalInputServerMessagePool, &ais.droppedFrames)
|
return WriteIPCMessage(conn, msg, globalInputServerMessagePool, &ais.droppedFrames)
|
||||||
}
|
}
|
||||||
|
|
@ -673,9 +667,9 @@ func (aic *AudioInputClient) Disconnect() {
|
||||||
|
|
||||||
if aic.conn != nil {
|
if aic.conn != nil {
|
||||||
// Send stop message
|
// Send stop message
|
||||||
msg := &InputIPCMessage{
|
msg := &UnifiedIPCMessage{
|
||||||
Magic: inputMagicNumber,
|
Magic: inputMagicNumber,
|
||||||
Type: InputMessageTypeStop,
|
Type: MessageTypeStop,
|
||||||
Length: 0,
|
Length: 0,
|
||||||
Timestamp: time.Now().UnixNano(),
|
Timestamp: time.Now().UnixNano(),
|
||||||
}
|
}
|
||||||
|
|
@ -700,9 +694,9 @@ func (aic *AudioInputClient) SendFrame(frame []byte) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Direct message creation without timestamp overhead
|
// Direct message creation without timestamp overhead
|
||||||
msg := &InputIPCMessage{
|
msg := &UnifiedIPCMessage{
|
||||||
Magic: inputMagicNumber,
|
Magic: inputMagicNumber,
|
||||||
Type: InputMessageTypeOpusFrame,
|
Type: MessageTypeOpusFrame,
|
||||||
Length: uint32(len(frame)),
|
Length: uint32(len(frame)),
|
||||||
Data: frame,
|
Data: frame,
|
||||||
}
|
}
|
||||||
|
|
@ -736,9 +730,9 @@ func (aic *AudioInputClient) SendFrameZeroCopy(frame *ZeroCopyAudioFrame) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// Use zero-copy data directly
|
// Use zero-copy data directly
|
||||||
msg := &InputIPCMessage{
|
msg := &UnifiedIPCMessage{
|
||||||
Magic: inputMagicNumber,
|
Magic: inputMagicNumber,
|
||||||
Type: InputMessageTypeOpusFrame,
|
Type: MessageTypeOpusFrame,
|
||||||
Length: uint32(frameLen),
|
Length: uint32(frameLen),
|
||||||
Timestamp: time.Now().UnixNano(),
|
Timestamp: time.Now().UnixNano(),
|
||||||
Data: frame.Data(), // Zero-copy data access
|
Data: frame.Data(), // Zero-copy data access
|
||||||
|
|
@ -748,7 +742,7 @@ func (aic *AudioInputClient) SendFrameZeroCopy(frame *ZeroCopyAudioFrame) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendConfig sends a configuration update to the audio input server
|
// SendConfig sends a configuration update to the audio input server
|
||||||
func (aic *AudioInputClient) SendConfig(config InputIPCConfig) error {
|
func (aic *AudioInputClient) SendConfig(config UnifiedIPCConfig) error {
|
||||||
aic.mtx.Lock()
|
aic.mtx.Lock()
|
||||||
defer aic.mtx.Unlock()
|
defer aic.mtx.Unlock()
|
||||||
|
|
||||||
|
|
@ -766,9 +760,9 @@ func (aic *AudioInputClient) SendConfig(config InputIPCConfig) error {
|
||||||
// Serialize config using common function
|
// Serialize config using common function
|
||||||
data := EncodeAudioConfig(config.SampleRate, config.Channels, config.FrameSize)
|
data := EncodeAudioConfig(config.SampleRate, config.Channels, config.FrameSize)
|
||||||
|
|
||||||
msg := &InputIPCMessage{
|
msg := &UnifiedIPCMessage{
|
||||||
Magic: inputMagicNumber,
|
Magic: inputMagicNumber,
|
||||||
Type: InputMessageTypeConfig,
|
Type: MessageTypeConfig,
|
||||||
Length: uint32(len(data)),
|
Length: uint32(len(data)),
|
||||||
Timestamp: time.Now().UnixNano(),
|
Timestamp: time.Now().UnixNano(),
|
||||||
Data: data,
|
Data: data,
|
||||||
|
|
@ -778,7 +772,7 @@ func (aic *AudioInputClient) SendConfig(config InputIPCConfig) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendOpusConfig sends a complete Opus encoder configuration update to the audio input server
|
// SendOpusConfig sends a complete Opus encoder configuration update to the audio input server
|
||||||
func (aic *AudioInputClient) SendOpusConfig(config InputIPCOpusConfig) error {
|
func (aic *AudioInputClient) SendOpusConfig(config UnifiedIPCOpusConfig) error {
|
||||||
aic.mtx.Lock()
|
aic.mtx.Lock()
|
||||||
defer aic.mtx.Unlock()
|
defer aic.mtx.Unlock()
|
||||||
|
|
||||||
|
|
@ -795,9 +789,9 @@ func (aic *AudioInputClient) SendOpusConfig(config InputIPCOpusConfig) error {
|
||||||
// Serialize Opus configuration using common function
|
// Serialize Opus configuration using common function
|
||||||
data := EncodeOpusConfig(config.SampleRate, config.Channels, config.FrameSize, config.Bitrate, config.Complexity, config.VBR, config.SignalType, config.Bandwidth, config.DTX)
|
data := EncodeOpusConfig(config.SampleRate, config.Channels, config.FrameSize, config.Bitrate, config.Complexity, config.VBR, config.SignalType, config.Bandwidth, config.DTX)
|
||||||
|
|
||||||
msg := &InputIPCMessage{
|
msg := &UnifiedIPCMessage{
|
||||||
Magic: inputMagicNumber,
|
Magic: inputMagicNumber,
|
||||||
Type: InputMessageTypeOpusConfig,
|
Type: MessageTypeOpusConfig,
|
||||||
Length: uint32(len(data)),
|
Length: uint32(len(data)),
|
||||||
Timestamp: time.Now().UnixNano(),
|
Timestamp: time.Now().UnixNano(),
|
||||||
Data: data,
|
Data: data,
|
||||||
|
|
@ -815,9 +809,9 @@ func (aic *AudioInputClient) SendHeartbeat() error {
|
||||||
return fmt.Errorf("not connected to audio input server")
|
return fmt.Errorf("not connected to audio input server")
|
||||||
}
|
}
|
||||||
|
|
||||||
msg := &InputIPCMessage{
|
msg := &UnifiedIPCMessage{
|
||||||
Magic: inputMagicNumber,
|
Magic: inputMagicNumber,
|
||||||
Type: InputMessageTypeHeartbeat,
|
Type: MessageTypeHeartbeat,
|
||||||
Length: 0,
|
Length: 0,
|
||||||
Timestamp: time.Now().UnixNano(),
|
Timestamp: time.Now().UnixNano(),
|
||||||
}
|
}
|
||||||
|
|
@ -829,7 +823,7 @@ func (aic *AudioInputClient) SendHeartbeat() error {
|
||||||
// Global shared message pool for input IPC clients
|
// Global shared message pool for input IPC clients
|
||||||
var globalInputMessagePool = NewGenericMessagePool(messagePoolSize)
|
var globalInputMessagePool = NewGenericMessagePool(messagePoolSize)
|
||||||
|
|
||||||
func (aic *AudioInputClient) writeMessage(msg *InputIPCMessage) error {
|
func (aic *AudioInputClient) writeMessage(msg *UnifiedIPCMessage) error {
|
||||||
// Increment total frames counter
|
// Increment total frames counter
|
||||||
atomic.AddInt64(&aic.totalFrames, 1)
|
atomic.AddInt64(&aic.totalFrames, 1)
|
||||||
|
|
||||||
|
|
@ -1093,9 +1087,9 @@ func (ais *AudioInputServer) startProcessorGoroutine() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// processMessageWithRecovery processes a message with enhanced error recovery
|
// processMessageWithRecovery processes a message with enhanced error recovery
|
||||||
func (ais *AudioInputServer) processMessageWithRecovery(msg *InputIPCMessage, logger zerolog.Logger) error {
|
func (ais *AudioInputServer) processMessageWithRecovery(msg *UnifiedIPCMessage, logger zerolog.Logger) error {
|
||||||
// Intelligent frame dropping: prioritize recent frames
|
// Intelligent frame dropping: prioritize recent frames
|
||||||
if msg.Type == InputMessageTypeOpusFrame {
|
if msg.Type == MessageTypeOpusFrame {
|
||||||
// Check if processing queue is getting full
|
// Check if processing queue is getting full
|
||||||
processChan := ais.getProcessChan()
|
processChan := ais.getProcessChan()
|
||||||
queueLen := len(processChan)
|
queueLen := len(processChan)
|
||||||
|
|
@ -1172,7 +1166,7 @@ func (ais *AudioInputServer) startMonitorGoroutine() {
|
||||||
|
|
||||||
// Calculate end-to-end latency using message timestamp
|
// Calculate end-to-end latency using message timestamp
|
||||||
var latency time.Duration
|
var latency time.Duration
|
||||||
if msg.Type == InputMessageTypeOpusFrame && msg.Timestamp > 0 {
|
if msg.Type == MessageTypeOpusFrame && msg.Timestamp > 0 {
|
||||||
msgTime := time.Unix(0, msg.Timestamp)
|
msgTime := time.Unix(0, msg.Timestamp)
|
||||||
latency = time.Since(msgTime)
|
latency = time.Since(msgTime)
|
||||||
// Use exponential moving average for end-to-end latency tracking
|
// Use exponential moving average for end-to-end latency tracking
|
||||||
|
|
@ -1291,14 +1285,14 @@ func GetGlobalMessagePoolStats() MessagePoolStats {
|
||||||
}
|
}
|
||||||
|
|
||||||
// getMessageChan safely returns the current message channel
|
// getMessageChan safely returns the current message channel
|
||||||
func (ais *AudioInputServer) getMessageChan() chan *InputIPCMessage {
|
func (ais *AudioInputServer) getMessageChan() chan *UnifiedIPCMessage {
|
||||||
ais.channelMutex.RLock()
|
ais.channelMutex.RLock()
|
||||||
defer ais.channelMutex.RUnlock()
|
defer ais.channelMutex.RUnlock()
|
||||||
return ais.messageChan
|
return ais.messageChan
|
||||||
}
|
}
|
||||||
|
|
||||||
// getProcessChan safely returns the current process channel
|
// getProcessChan safely returns the current process channel
|
||||||
func (ais *AudioInputServer) getProcessChan() chan *InputIPCMessage {
|
func (ais *AudioInputServer) getProcessChan() chan *UnifiedIPCMessage {
|
||||||
ais.channelMutex.RLock()
|
ais.channelMutex.RLock()
|
||||||
defer ais.channelMutex.RUnlock()
|
defer ais.channelMutex.RUnlock()
|
||||||
return ais.processChan
|
return ais.processChan
|
||||||
|
|
|
||||||
|
|
@ -13,24 +13,6 @@ import (
|
||||||
"github.com/rs/zerolog"
|
"github.com/rs/zerolog"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Legacy aliases for backward compatibility
|
|
||||||
type OutputIPCConfig = UnifiedIPCConfig
|
|
||||||
type OutputIPCOpusConfig = UnifiedIPCOpusConfig
|
|
||||||
type OutputMessageType = UnifiedMessageType
|
|
||||||
type OutputIPCMessage = UnifiedIPCMessage
|
|
||||||
|
|
||||||
// Legacy constants for backward compatibility
|
|
||||||
const (
|
|
||||||
OutputMessageTypeOpusFrame = MessageTypeOpusFrame
|
|
||||||
OutputMessageTypeConfig = MessageTypeConfig
|
|
||||||
OutputMessageTypeOpusConfig = MessageTypeOpusConfig
|
|
||||||
OutputMessageTypeStop = MessageTypeStop
|
|
||||||
OutputMessageTypeHeartbeat = MessageTypeHeartbeat
|
|
||||||
OutputMessageTypeAck = MessageTypeAck
|
|
||||||
)
|
|
||||||
|
|
||||||
// Methods are now inherited from UnifiedIPCMessage
|
|
||||||
|
|
||||||
// Global shared message pool for output IPC client header reading
|
// Global shared message pool for output IPC client header reading
|
||||||
var globalOutputClientMessagePool = NewGenericMessagePool(Config.OutputMessagePoolSize)
|
var globalOutputClientMessagePool = NewGenericMessagePool(Config.OutputMessagePoolSize)
|
||||||
|
|
||||||
|
|
@ -48,9 +30,9 @@ type AudioOutputServer struct {
|
||||||
logger zerolog.Logger
|
logger zerolog.Logger
|
||||||
|
|
||||||
// Message channels
|
// Message channels
|
||||||
messageChan chan *OutputIPCMessage // Buffered channel for incoming messages
|
messageChan chan *UnifiedIPCMessage // Buffered channel for incoming messages
|
||||||
processChan chan *OutputIPCMessage // Buffered channel for processing queue
|
processChan chan *UnifiedIPCMessage // Buffered channel for processing queue
|
||||||
wg sync.WaitGroup // Wait group for goroutine coordination
|
wg sync.WaitGroup // Wait group for goroutine coordination
|
||||||
|
|
||||||
// Configuration
|
// Configuration
|
||||||
socketPath string
|
socketPath string
|
||||||
|
|
@ -65,8 +47,8 @@ func NewAudioOutputServer() (*AudioOutputServer, error) {
|
||||||
socketPath: socketPath,
|
socketPath: socketPath,
|
||||||
magicNumber: Config.OutputMagicNumber,
|
magicNumber: Config.OutputMagicNumber,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
messageChan: make(chan *OutputIPCMessage, Config.ChannelBufferSize),
|
messageChan: make(chan *UnifiedIPCMessage, Config.ChannelBufferSize),
|
||||||
processChan: make(chan *OutputIPCMessage, Config.ChannelBufferSize),
|
processChan: make(chan *UnifiedIPCMessage, Config.ChannelBufferSize),
|
||||||
}
|
}
|
||||||
|
|
||||||
return server, nil
|
return server, nil
|
||||||
|
|
@ -112,6 +94,7 @@ func (s *AudioOutputServer) Stop() {
|
||||||
|
|
||||||
if s.listener != nil {
|
if s.listener != nil {
|
||||||
s.listener.Close()
|
s.listener.Close()
|
||||||
|
s.listener = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.conn != nil {
|
if s.conn != nil {
|
||||||
|
|
@ -171,7 +154,7 @@ func (s *AudioOutputServer) handleConnection(conn net.Conn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// readMessage reads a message from the connection
|
// readMessage reads a message from the connection
|
||||||
func (s *AudioOutputServer) readMessage(conn net.Conn) (*OutputIPCMessage, error) {
|
func (s *AudioOutputServer) readMessage(conn net.Conn) (*UnifiedIPCMessage, error) {
|
||||||
header := make([]byte, 17)
|
header := make([]byte, 17)
|
||||||
if _, err := io.ReadFull(conn, header); err != nil {
|
if _, err := io.ReadFull(conn, header); err != nil {
|
||||||
return nil, fmt.Errorf("failed to read header: %w", err)
|
return nil, fmt.Errorf("failed to read header: %w", err)
|
||||||
|
|
@ -182,7 +165,7 @@ func (s *AudioOutputServer) readMessage(conn net.Conn) (*OutputIPCMessage, error
|
||||||
return nil, fmt.Errorf("invalid magic number: expected %d, got %d", s.magicNumber, magic)
|
return nil, fmt.Errorf("invalid magic number: expected %d, got %d", s.magicNumber, magic)
|
||||||
}
|
}
|
||||||
|
|
||||||
msgType := OutputMessageType(header[4])
|
msgType := UnifiedMessageType(header[4])
|
||||||
length := binary.LittleEndian.Uint32(header[5:9])
|
length := binary.LittleEndian.Uint32(header[5:9])
|
||||||
timestamp := int64(binary.LittleEndian.Uint64(header[9:17]))
|
timestamp := int64(binary.LittleEndian.Uint64(header[9:17]))
|
||||||
|
|
||||||
|
|
@ -194,7 +177,7 @@ func (s *AudioOutputServer) readMessage(conn net.Conn) (*OutputIPCMessage, error
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return &OutputIPCMessage{
|
return &UnifiedIPCMessage{
|
||||||
Magic: magic,
|
Magic: magic,
|
||||||
Type: msgType,
|
Type: msgType,
|
||||||
Length: length,
|
Length: length,
|
||||||
|
|
@ -204,14 +187,14 @@ func (s *AudioOutputServer) readMessage(conn net.Conn) (*OutputIPCMessage, error
|
||||||
}
|
}
|
||||||
|
|
||||||
// processMessage processes a received message
|
// processMessage processes a received message
|
||||||
func (s *AudioOutputServer) processMessage(msg *OutputIPCMessage) error {
|
func (s *AudioOutputServer) processMessage(msg *UnifiedIPCMessage) error {
|
||||||
switch msg.Type {
|
switch msg.Type {
|
||||||
case OutputMessageTypeOpusConfig:
|
case MessageTypeOpusConfig:
|
||||||
return s.processOpusConfig(msg.Data)
|
return s.processOpusConfig(msg.Data)
|
||||||
case OutputMessageTypeStop:
|
case MessageTypeStop:
|
||||||
s.logger.Info().Msg("Received stop message")
|
s.logger.Info().Msg("Received stop message")
|
||||||
return nil
|
return nil
|
||||||
case OutputMessageTypeHeartbeat:
|
case MessageTypeHeartbeat:
|
||||||
s.logger.Debug().Msg("Received heartbeat")
|
s.logger.Debug().Msg("Received heartbeat")
|
||||||
return nil
|
return nil
|
||||||
default:
|
default:
|
||||||
|
|
@ -228,7 +211,7 @@ func (s *AudioOutputServer) processOpusConfig(data []byte) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Decode Opus configuration
|
// Decode Opus configuration
|
||||||
config := OutputIPCOpusConfig{
|
config := UnifiedIPCOpusConfig{
|
||||||
SampleRate: int(binary.LittleEndian.Uint32(data[0:4])),
|
SampleRate: int(binary.LittleEndian.Uint32(data[0:4])),
|
||||||
Channels: int(binary.LittleEndian.Uint32(data[4:8])),
|
Channels: int(binary.LittleEndian.Uint32(data[4:8])),
|
||||||
FrameSize: int(binary.LittleEndian.Uint32(data[8:12])),
|
FrameSize: int(binary.LittleEndian.Uint32(data[8:12])),
|
||||||
|
|
@ -282,9 +265,9 @@ func (s *AudioOutputServer) SendFrame(frame []byte) error {
|
||||||
return fmt.Errorf("no client connected")
|
return fmt.Errorf("no client connected")
|
||||||
}
|
}
|
||||||
|
|
||||||
msg := &OutputIPCMessage{
|
msg := &UnifiedIPCMessage{
|
||||||
Magic: s.magicNumber,
|
Magic: s.magicNumber,
|
||||||
Type: OutputMessageTypeOpusFrame,
|
Type: MessageTypeOpusFrame,
|
||||||
Length: uint32(len(frame)),
|
Length: uint32(len(frame)),
|
||||||
Timestamp: time.Now().UnixNano(),
|
Timestamp: time.Now().UnixNano(),
|
||||||
Data: frame,
|
Data: frame,
|
||||||
|
|
@ -294,8 +277,9 @@ func (s *AudioOutputServer) SendFrame(frame []byte) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// writeMessage writes a message to the connection
|
// writeMessage writes a message to the connection
|
||||||
func (s *AudioOutputServer) writeMessage(conn net.Conn, msg *OutputIPCMessage) error {
|
func (s *AudioOutputServer) writeMessage(conn net.Conn, msg *UnifiedIPCMessage) error {
|
||||||
header := EncodeMessageHeader(msg.Magic, uint8(msg.Type), msg.Length, msg.Timestamp)
|
header := make([]byte, 17)
|
||||||
|
EncodeMessageHeader(header, msg.Magic, uint8(msg.Type), msg.Length, msg.Timestamp)
|
||||||
|
|
||||||
if _, err := conn.Write(header); err != nil {
|
if _, err := conn.Write(header); err != nil {
|
||||||
return fmt.Errorf("failed to write header: %w", err)
|
return fmt.Errorf("failed to write header: %w", err)
|
||||||
|
|
@ -415,8 +399,8 @@ func (c *AudioOutputClient) ReceiveFrame() ([]byte, error) {
|
||||||
return nil, fmt.Errorf("invalid magic number in IPC message: got 0x%x, expected 0x%x", magic, outputMagicNumber)
|
return nil, fmt.Errorf("invalid magic number in IPC message: got 0x%x, expected 0x%x", magic, outputMagicNumber)
|
||||||
}
|
}
|
||||||
|
|
||||||
msgType := OutputMessageType(optMsg.header[4])
|
msgType := UnifiedMessageType(optMsg.header[4])
|
||||||
if msgType != OutputMessageTypeOpusFrame {
|
if msgType != MessageTypeOpusFrame {
|
||||||
return nil, fmt.Errorf("unexpected message type: %d", msgType)
|
return nil, fmt.Errorf("unexpected message type: %d", msgType)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -443,7 +427,7 @@ func (c *AudioOutputClient) ReceiveFrame() ([]byte, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendOpusConfig sends Opus configuration to the audio output server
|
// SendOpusConfig sends Opus configuration to the audio output server
|
||||||
func (c *AudioOutputClient) SendOpusConfig(config OutputIPCOpusConfig) error {
|
func (c *AudioOutputClient) SendOpusConfig(config UnifiedIPCOpusConfig) error {
|
||||||
c.mtx.Lock()
|
c.mtx.Lock()
|
||||||
defer c.mtx.Unlock()
|
defer c.mtx.Unlock()
|
||||||
|
|
||||||
|
|
@ -460,9 +444,9 @@ func (c *AudioOutputClient) SendOpusConfig(config OutputIPCOpusConfig) error {
|
||||||
// Serialize Opus configuration using common function
|
// Serialize Opus configuration using common function
|
||||||
data := EncodeOpusConfig(config.SampleRate, config.Channels, config.FrameSize, config.Bitrate, config.Complexity, config.VBR, config.SignalType, config.Bandwidth, config.DTX)
|
data := EncodeOpusConfig(config.SampleRate, config.Channels, config.FrameSize, config.Bitrate, config.Complexity, config.VBR, config.SignalType, config.Bandwidth, config.DTX)
|
||||||
|
|
||||||
msg := &OutputIPCMessage{
|
msg := &UnifiedIPCMessage{
|
||||||
Magic: c.magicNumber,
|
Magic: c.magicNumber,
|
||||||
Type: OutputMessageTypeOpusConfig,
|
Type: MessageTypeOpusConfig,
|
||||||
Length: uint32(len(data)),
|
Length: uint32(len(data)),
|
||||||
Timestamp: time.Now().UnixNano(),
|
Timestamp: time.Now().UnixNano(),
|
||||||
Data: data,
|
Data: data,
|
||||||
|
|
@ -472,8 +456,9 @@ func (c *AudioOutputClient) SendOpusConfig(config OutputIPCOpusConfig) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// writeMessage writes a message to the connection
|
// writeMessage writes a message to the connection
|
||||||
func (c *AudioOutputClient) writeMessage(msg *OutputIPCMessage) error {
|
func (c *AudioOutputClient) writeMessage(msg *UnifiedIPCMessage) error {
|
||||||
header := EncodeMessageHeader(msg.Magic, uint8(msg.Type), msg.Length, msg.Timestamp)
|
header := make([]byte, 17)
|
||||||
|
EncodeMessageHeader(header, msg.Magic, uint8(msg.Type), msg.Length, msg.Timestamp)
|
||||||
|
|
||||||
if _, err := c.conn.Write(header); err != nil {
|
if _, err := c.conn.Write(header); err != nil {
|
||||||
return fmt.Errorf("failed to write header: %w", err)
|
return fmt.Errorf("failed to write header: %w", err)
|
||||||
|
|
|
||||||
|
|
@ -389,7 +389,8 @@ func (s *UnifiedAudioServer) SendFrame(frame []byte) error {
|
||||||
|
|
||||||
// writeMessage writes a message to the connection
|
// writeMessage writes a message to the connection
|
||||||
func (s *UnifiedAudioServer) writeMessage(conn net.Conn, msg *UnifiedIPCMessage) error {
|
func (s *UnifiedAudioServer) writeMessage(conn net.Conn, msg *UnifiedIPCMessage) error {
|
||||||
header := EncodeMessageHeader(msg.Magic, uint8(msg.Type), msg.Length, msg.Timestamp)
|
header := make([]byte, 17)
|
||||||
|
EncodeMessageHeader(header, msg.Magic, uint8(msg.Type), msg.Length, msg.Timestamp)
|
||||||
|
|
||||||
// Optimize: Use single write for header+data to reduce system calls
|
// Optimize: Use single write for header+data to reduce system calls
|
||||||
if msg.Length > 0 && msg.Data != nil {
|
if msg.Length > 0 && msg.Data != nil {
|
||||||
|
|
|
||||||
|
|
@ -62,7 +62,7 @@ func (aim *AudioInputIPCManager) Start() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
config := InputIPCConfig{
|
config := UnifiedIPCConfig{
|
||||||
SampleRate: Config.InputIPCSampleRate,
|
SampleRate: Config.InputIPCSampleRate,
|
||||||
Channels: Config.InputIPCChannels,
|
Channels: Config.InputIPCChannels,
|
||||||
FrameSize: Config.InputIPCFrameSize,
|
FrameSize: Config.InputIPCFrameSize,
|
||||||
|
|
@ -72,7 +72,7 @@ func (aim *AudioInputIPCManager) Start() error {
|
||||||
if err := ValidateInputIPCConfig(config.SampleRate, config.Channels, config.FrameSize); err != nil {
|
if err := ValidateInputIPCConfig(config.SampleRate, config.Channels, config.FrameSize); err != nil {
|
||||||
aim.logger.Warn().Err(err).Msg("invalid input IPC config from constants, using defaults")
|
aim.logger.Warn().Err(err).Msg("invalid input IPC config from constants, using defaults")
|
||||||
// Use safe defaults if config validation fails
|
// Use safe defaults if config validation fails
|
||||||
config = InputIPCConfig{
|
config = UnifiedIPCConfig{
|
||||||
SampleRate: 48000,
|
SampleRate: 48000,
|
||||||
Channels: 2,
|
Channels: 2,
|
||||||
FrameSize: 960,
|
FrameSize: 960,
|
||||||
|
|
|
||||||
|
|
@ -56,7 +56,7 @@ func (aom *AudioOutputIPCManager) Start() error {
|
||||||
aom.logComponentStarted(AudioOutputIPCComponent)
|
aom.logComponentStarted(AudioOutputIPCComponent)
|
||||||
|
|
||||||
// Send initial configuration
|
// Send initial configuration
|
||||||
config := OutputIPCConfig{
|
config := UnifiedIPCConfig{
|
||||||
SampleRate: Config.SampleRate,
|
SampleRate: Config.SampleRate,
|
||||||
Channels: Config.Channels,
|
Channels: Config.Channels,
|
||||||
FrameSize: int(Config.AudioQualityMediumFrameSize.Milliseconds()),
|
FrameSize: int(Config.AudioQualityMediumFrameSize.Milliseconds()),
|
||||||
|
|
@ -202,7 +202,7 @@ func (aom *AudioOutputIPCManager) calculateFrameRate() float64 {
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendConfig sends configuration to the IPC server
|
// SendConfig sends configuration to the IPC server
|
||||||
func (aom *AudioOutputIPCManager) SendConfig(config OutputIPCConfig) error {
|
func (aom *AudioOutputIPCManager) SendConfig(config UnifiedIPCConfig) error {
|
||||||
if aom.server == nil {
|
if aom.server == nil {
|
||||||
return fmt.Errorf("audio output server not initialized")
|
return fmt.Errorf("audio output server not initialized")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -318,7 +318,7 @@ func (s *AudioOutputSupervisor) connectClient() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendOpusConfig sends Opus configuration to the audio output subprocess
|
// SendOpusConfig sends Opus configuration to the audio output subprocess
|
||||||
func (s *AudioOutputSupervisor) SendOpusConfig(config OutputIPCOpusConfig) error {
|
func (aos *AudioOutputSupervisor) SendOpusConfig(config UnifiedIPCOpusConfig) error {
|
||||||
if outputClient == nil {
|
if outputClient == nil {
|
||||||
return fmt.Errorf("client not initialized")
|
return fmt.Errorf("client not initialized")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -63,10 +63,10 @@ type AudioConfig struct {
|
||||||
|
|
||||||
// AudioMetrics tracks audio performance metrics
|
// AudioMetrics tracks audio performance metrics
|
||||||
type AudioMetrics struct {
|
type AudioMetrics struct {
|
||||||
FramesReceived int64
|
FramesReceived uint64
|
||||||
FramesDropped int64
|
FramesDropped uint64
|
||||||
BytesProcessed int64
|
BytesProcessed uint64
|
||||||
ConnectionDrops int64
|
ConnectionDrops uint64
|
||||||
LastFrameTime time.Time
|
LastFrameTime time.Time
|
||||||
AverageLatency time.Duration
|
AverageLatency time.Duration
|
||||||
}
|
}
|
||||||
|
|
@ -214,8 +214,8 @@ func SetAudioQuality(quality AudioQuality) {
|
||||||
|
|
||||||
// Send dynamic configuration update to running subprocess via IPC
|
// Send dynamic configuration update to running subprocess via IPC
|
||||||
if supervisor.IsConnected() {
|
if supervisor.IsConnected() {
|
||||||
// Convert AudioConfig to OutputIPCOpusConfig with complete Opus parameters
|
// Convert AudioConfig to UnifiedIPCOpusConfig with complete Opus parameters
|
||||||
opusConfig := OutputIPCOpusConfig{
|
opusConfig := UnifiedIPCOpusConfig{
|
||||||
SampleRate: config.SampleRate,
|
SampleRate: config.SampleRate,
|
||||||
Channels: config.Channels,
|
Channels: config.Channels,
|
||||||
FrameSize: int(config.FrameSize.Milliseconds() * int64(config.SampleRate) / 1000), // Convert ms to samples
|
FrameSize: int(config.FrameSize.Milliseconds() * int64(config.SampleRate) / 1000), // Convert ms to samples
|
||||||
|
|
@ -311,8 +311,8 @@ func SetMicrophoneQuality(quality AudioQuality) {
|
||||||
|
|
||||||
// Send dynamic configuration update to running subprocess via IPC
|
// Send dynamic configuration update to running subprocess via IPC
|
||||||
if supervisor.IsConnected() {
|
if supervisor.IsConnected() {
|
||||||
// Convert AudioConfig to InputIPCOpusConfig with complete Opus parameters
|
// Convert AudioConfig to UnifiedIPCOpusConfig with complete Opus parameters
|
||||||
opusConfig := InputIPCOpusConfig{
|
opusConfig := UnifiedIPCOpusConfig{
|
||||||
SampleRate: config.SampleRate,
|
SampleRate: config.SampleRate,
|
||||||
Channels: config.Channels,
|
Channels: config.Channels,
|
||||||
FrameSize: int(config.FrameSize.Milliseconds() * int64(config.SampleRate) / 1000), // Convert ms to samples
|
FrameSize: int(config.FrameSize.Milliseconds() * int64(config.SampleRate) / 1000), // Convert ms to samples
|
||||||
|
|
@ -363,10 +363,10 @@ func GetGlobalAudioMetrics() AudioMetrics {
|
||||||
|
|
||||||
// Batched metrics to reduce atomic operations frequency
|
// Batched metrics to reduce atomic operations frequency
|
||||||
var (
|
var (
|
||||||
batchedFramesReceived int64
|
batchedFramesReceived uint64
|
||||||
batchedBytesProcessed int64
|
batchedBytesProcessed uint64
|
||||||
batchedFramesDropped int64
|
batchedFramesDropped uint64
|
||||||
batchedConnectionDrops int64
|
batchedConnectionDrops uint64
|
||||||
|
|
||||||
lastFlushTime int64 // Unix timestamp in nanoseconds
|
lastFlushTime int64 // Unix timestamp in nanoseconds
|
||||||
)
|
)
|
||||||
|
|
@ -374,7 +374,7 @@ var (
|
||||||
// RecordFrameReceived increments the frames received counter with batched updates
|
// RecordFrameReceived increments the frames received counter with batched updates
|
||||||
func RecordFrameReceived(bytes int) {
|
func RecordFrameReceived(bytes int) {
|
||||||
// Use local batching to reduce atomic operations frequency
|
// Use local batching to reduce atomic operations frequency
|
||||||
atomic.AddInt64(&batchedBytesProcessed, int64(bytes))
|
atomic.AddUint64(&batchedBytesProcessed, uint64(bytes))
|
||||||
|
|
||||||
// Update timestamp immediately for accurate tracking
|
// Update timestamp immediately for accurate tracking
|
||||||
metrics.LastFrameTime = time.Now()
|
metrics.LastFrameTime = time.Now()
|
||||||
|
|
@ -391,23 +391,23 @@ func RecordConnectionDrop() {
|
||||||
// flushBatchedMetrics flushes accumulated metrics to the main counters
|
// flushBatchedMetrics flushes accumulated metrics to the main counters
|
||||||
func flushBatchedMetrics() {
|
func flushBatchedMetrics() {
|
||||||
// Atomically move batched metrics to main metrics
|
// Atomically move batched metrics to main metrics
|
||||||
framesReceived := atomic.SwapInt64(&batchedFramesReceived, 0)
|
framesReceived := atomic.SwapUint64(&batchedFramesReceived, 0)
|
||||||
bytesProcessed := atomic.SwapInt64(&batchedBytesProcessed, 0)
|
bytesProcessed := atomic.SwapUint64(&batchedBytesProcessed, 0)
|
||||||
framesDropped := atomic.SwapInt64(&batchedFramesDropped, 0)
|
framesDropped := atomic.SwapUint64(&batchedFramesDropped, 0)
|
||||||
connectionDrops := atomic.SwapInt64(&batchedConnectionDrops, 0)
|
connectionDrops := atomic.SwapUint64(&batchedConnectionDrops, 0)
|
||||||
|
|
||||||
// Update main metrics if we have any batched data
|
// Update main metrics if we have any batched data
|
||||||
if framesReceived > 0 {
|
if framesReceived > 0 {
|
||||||
atomic.AddInt64(&metrics.FramesReceived, framesReceived)
|
atomic.AddUint64(&metrics.FramesReceived, framesReceived)
|
||||||
}
|
}
|
||||||
if bytesProcessed > 0 {
|
if bytesProcessed > 0 {
|
||||||
atomic.AddInt64(&metrics.BytesProcessed, bytesProcessed)
|
atomic.AddUint64(&metrics.BytesProcessed, bytesProcessed)
|
||||||
}
|
}
|
||||||
if framesDropped > 0 {
|
if framesDropped > 0 {
|
||||||
atomic.AddInt64(&metrics.FramesDropped, framesDropped)
|
atomic.AddUint64(&metrics.FramesDropped, framesDropped)
|
||||||
}
|
}
|
||||||
if connectionDrops > 0 {
|
if connectionDrops > 0 {
|
||||||
atomic.AddInt64(&metrics.ConnectionDrops, connectionDrops)
|
atomic.AddUint64(&metrics.ConnectionDrops, connectionDrops)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update last flush time
|
// Update last flush time
|
||||||
|
|
|
||||||
35
jsonrpc.go
35
jsonrpc.go
|
|
@ -20,8 +20,6 @@ import (
|
||||||
"github.com/jetkvm/kvm/internal/usbgadget"
|
"github.com/jetkvm/kvm/internal/usbgadget"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Direct RPC message handling for optimal input responsiveness
|
|
||||||
|
|
||||||
type JSONRPCRequest struct {
|
type JSONRPCRequest struct {
|
||||||
JSONRPC string `json:"jsonrpc"`
|
JSONRPC string `json:"jsonrpc"`
|
||||||
Method string `json:"method"`
|
Method string `json:"method"`
|
||||||
|
|
@ -123,39 +121,6 @@ func onRPCMessage(message webrtc.DataChannelMessage, session *Session) {
|
||||||
|
|
||||||
scopedLogger.Trace().Msg("Received RPC request")
|
scopedLogger.Trace().Msg("Received RPC request")
|
||||||
|
|
||||||
// Fast path for input methods - bypass reflection for performance
|
|
||||||
// This optimization reduces latency by 3-6ms per input event by:
|
|
||||||
// - Eliminating reflection overhead
|
|
||||||
// - Reducing memory allocations
|
|
||||||
// - Optimizing parameter parsing and validation
|
|
||||||
// See input_rpc.go for implementation details
|
|
||||||
if isInputMethod(request.Method) {
|
|
||||||
result, err := handleInputRPCDirect(request.Method, request.Params)
|
|
||||||
if err != nil {
|
|
||||||
scopedLogger.Error().Err(err).Msg("Error calling direct input handler")
|
|
||||||
errorResponse := JSONRPCResponse{
|
|
||||||
JSONRPC: "2.0",
|
|
||||||
Error: map[string]interface{}{
|
|
||||||
"code": -32603,
|
|
||||||
"message": "Internal error",
|
|
||||||
"data": err.Error(),
|
|
||||||
},
|
|
||||||
ID: request.ID,
|
|
||||||
}
|
|
||||||
writeJSONRPCResponse(errorResponse, session)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
response := JSONRPCResponse{
|
|
||||||
JSONRPC: "2.0",
|
|
||||||
Result: result,
|
|
||||||
ID: request.ID,
|
|
||||||
}
|
|
||||||
writeJSONRPCResponse(response, session)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fallback to reflection-based handler for non-input methods
|
|
||||||
handler, ok := rpcHandlers[request.Method]
|
handler, ok := rpcHandlers[request.Method]
|
||||||
if !ok {
|
if !ok {
|
||||||
errorResponse := JSONRPCResponse{
|
errorResponse := JSONRPCResponse{
|
||||||
|
|
|
||||||
|
|
@ -195,12 +195,6 @@ export function useMicrophone() {
|
||||||
|
|
||||||
// Find the audio transceiver (should already exist with sendrecv direction)
|
// Find the audio transceiver (should already exist with sendrecv direction)
|
||||||
const transceivers = peerConnection.getTransceivers();
|
const transceivers = peerConnection.getTransceivers();
|
||||||
devLog("Available transceivers:", transceivers.map((t: RTCRtpTransceiver) => ({
|
|
||||||
direction: t.direction,
|
|
||||||
mid: t.mid,
|
|
||||||
senderTrack: t.sender.track?.kind,
|
|
||||||
receiverTrack: t.receiver.track?.kind
|
|
||||||
})));
|
|
||||||
|
|
||||||
// Look for an audio transceiver that can send (has sendrecv or sendonly direction)
|
// Look for an audio transceiver that can send (has sendrecv or sendonly direction)
|
||||||
const audioTransceiver = transceivers.find((transceiver: RTCRtpTransceiver) => {
|
const audioTransceiver = transceivers.find((transceiver: RTCRtpTransceiver) => {
|
||||||
|
|
|
||||||
6
usb.go
6
usb.go
|
|
@ -60,16 +60,10 @@ func rpcRelMouseReport(dx int8, dy int8, buttons uint8) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func rpcWheelReport(wheelY int8) error {
|
func rpcWheelReport(wheelY int8) error {
|
||||||
if gadget == nil {
|
|
||||||
return nil // Gracefully handle uninitialized gadget (e.g., in tests)
|
|
||||||
}
|
|
||||||
return gadget.AbsMouseWheelReport(wheelY)
|
return gadget.AbsMouseWheelReport(wheelY)
|
||||||
}
|
}
|
||||||
|
|
||||||
func rpcGetKeyboardLedState() (state usbgadget.KeyboardState) {
|
func rpcGetKeyboardLedState() (state usbgadget.KeyboardState) {
|
||||||
if gadget == nil {
|
|
||||||
return usbgadget.KeyboardState{} // Return empty state for uninitialized gadget
|
|
||||||
}
|
|
||||||
return gadget.GetKeyboardState()
|
return gadget.GetKeyboardState()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue