Compare commits

..

9 Commits

Author SHA1 Message Date
Alex ffdbce4cc9
Merge 5da357ba01 into c8dd84c6b7 2025-09-09 23:32:05 +00:00
Alex P 5da357ba01 [WIP] Cleanup: Remove hid optimization code, as it is out of scope 2025-09-09 23:31:58 +00:00
Alex P eab0261344 Cleanup: remove devLog with calculated param 2025-09-09 22:44:33 +00:00
Alex P e0b6e612c0 Updates: defer the mutex unlock 2025-09-09 22:03:35 +00:00
Alex P f48c3fe25a [WIP] Updates, Cleanup: use uint64 for non-negative values 2025-09-09 21:54:36 +00:00
Alex P d4c10aef87 Updates: use uint64 since we won't have negative numbers here 2025-09-09 21:37:08 +00:00
Alex P 2a81497d34 Improvements: input performance 2025-09-09 20:58:34 +00:00
Alex P 8cff7d600b pr-optimizations,perf(input): optimize JSON-RPC input handling with ultra-fast path
Add ultra-fast path for input methods that completely bypasses float64 conversions and reflection
Use direct JSON unmarshaling to target types for maximum
2025-09-09 18:56:54 +00:00
Alex P eca1e6a80d Cleanup: implement PR Review suggestions 2025-09-09 18:09:19 +00:00
16 changed files with 160 additions and 449 deletions

View File

@ -1,218 +0,0 @@
package kvm
import (
"fmt"
)
// Constants for input validation
const (
// MaxKeyboardKeys defines the maximum number of simultaneous key presses
// This matches the USB HID keyboard report specification
MaxKeyboardKeys = 6
)
// Input RPC Direct Handlers
// This module provides optimized direct handlers for high-frequency input events,
// bypassing the reflection-based RPC system for improved performance.
//
// Performance benefits:
// - Eliminates reflection overhead (~2-3ms per call)
// - Reduces memory allocations
// - Optimizes parameter parsing and validation
// - Provides faster code path for input methods
//
// The handlers maintain full compatibility with existing RPC interface
// while providing significant latency improvements for input events.
// Common validation helpers for parameter parsing
// These reduce code duplication and provide consistent error messages
// validateFloat64Param extracts and validates a float64 parameter from the params map
func validateFloat64Param(params map[string]interface{}, paramName, methodName string, min, max float64) (float64, error) {
value, ok := params[paramName].(float64)
if !ok {
return 0, fmt.Errorf("%s: %s parameter must be a number, got %T", methodName, paramName, params[paramName])
}
if value < min || value > max {
return 0, fmt.Errorf("%s: %s value %v out of range [%v to %v]", methodName, paramName, value, min, max)
}
return value, nil
}
// validateKeysArray extracts and validates a keys array parameter
func validateKeysArray(params map[string]interface{}, methodName string) ([]uint8, error) {
keysInterface, ok := params["keys"].([]interface{})
if !ok {
return nil, fmt.Errorf("%s: keys parameter must be an array, got %T", methodName, params["keys"])
}
if len(keysInterface) > MaxKeyboardKeys {
return nil, fmt.Errorf("%s: too many keys (%d), maximum is %d", methodName, len(keysInterface), MaxKeyboardKeys)
}
keys := make([]uint8, len(keysInterface))
for i, keyInterface := range keysInterface {
keyFloat, ok := keyInterface.(float64)
if !ok {
return nil, fmt.Errorf("%s: key at index %d must be a number, got %T", methodName, i, keyInterface)
}
if keyFloat < 0 || keyFloat > 255 {
return nil, fmt.Errorf("%s: key at index %d value %v out of range [0-255]", methodName, i, keyFloat)
}
keys[i] = uint8(keyFloat)
}
return keys, nil
}
// Input parameter structures for direct RPC handlers
// These mirror the original RPC method signatures but provide
// optimized parsing from JSON map parameters.
// KeyboardReportParams represents parameters for keyboard HID report
// Matches rpcKeyboardReport(modifier uint8, keys []uint8)
type KeyboardReportParams struct {
Modifier uint8 `json:"modifier"` // Keyboard modifier keys (Ctrl, Alt, Shift, etc.)
Keys []uint8 `json:"keys"` // Array of pressed key codes (up to 6 keys)
}
// AbsMouseReportParams represents parameters for absolute mouse positioning
// Matches rpcAbsMouseReport(x, y int, buttons uint8)
type AbsMouseReportParams struct {
X int `json:"x"` // Absolute X coordinate (0-32767)
Y int `json:"y"` // Absolute Y coordinate (0-32767)
Buttons uint8 `json:"buttons"` // Mouse button state bitmask
}
// RelMouseReportParams represents parameters for relative mouse movement
// Matches rpcRelMouseReport(dx, dy int8, buttons uint8)
type RelMouseReportParams struct {
Dx int8 `json:"dx"` // Relative X movement delta (-127 to +127)
Dy int8 `json:"dy"` // Relative Y movement delta (-127 to +127)
Buttons uint8 `json:"buttons"` // Mouse button state bitmask
}
// WheelReportParams represents parameters for mouse wheel events
// Matches rpcWheelReport(wheelY int8)
type WheelReportParams struct {
WheelY int8 `json:"wheelY"` // Wheel scroll delta (-127 to +127)
}
// Direct handler for keyboard reports
// Optimized path that bypasses reflection for keyboard input events
func handleKeyboardReportDirect(params map[string]interface{}) (interface{}, error) {
// Extract and validate modifier parameter
modifierFloat, err := validateFloat64Param(params, "modifier", "keyboardReport", 0, 255)
if err != nil {
return nil, err
}
modifier := uint8(modifierFloat)
// Extract and validate keys array
keys, err := validateKeysArray(params, "keyboardReport")
if err != nil {
return nil, err
}
_, err = rpcKeyboardReport(modifier, keys)
return nil, err
}
// Direct handler for absolute mouse reports
// Optimized path that bypasses reflection for absolute mouse positioning
func handleAbsMouseReportDirect(params map[string]interface{}) (interface{}, error) {
// Extract and validate x coordinate
xFloat, err := validateFloat64Param(params, "x", "absMouseReport", 0, 32767)
if err != nil {
return nil, err
}
x := int(xFloat)
// Extract and validate y coordinate
yFloat, err := validateFloat64Param(params, "y", "absMouseReport", 0, 32767)
if err != nil {
return nil, err
}
y := int(yFloat)
// Extract and validate buttons
buttonsFloat, err := validateFloat64Param(params, "buttons", "absMouseReport", 0, 255)
if err != nil {
return nil, err
}
buttons := uint8(buttonsFloat)
return nil, rpcAbsMouseReport(x, y, buttons)
}
// Direct handler for relative mouse reports
// Optimized path that bypasses reflection for relative mouse movement
func handleRelMouseReportDirect(params map[string]interface{}) (interface{}, error) {
// Extract and validate dx (relative X movement)
dxFloat, err := validateFloat64Param(params, "dx", "relMouseReport", -127, 127)
if err != nil {
return nil, err
}
dx := int8(dxFloat)
// Extract and validate dy (relative Y movement)
dyFloat, err := validateFloat64Param(params, "dy", "relMouseReport", -127, 127)
if err != nil {
return nil, err
}
dy := int8(dyFloat)
// Extract and validate buttons
buttonsFloat, err := validateFloat64Param(params, "buttons", "relMouseReport", 0, 255)
if err != nil {
return nil, err
}
buttons := uint8(buttonsFloat)
return nil, rpcRelMouseReport(dx, dy, buttons)
}
// Direct handler for wheel reports
// Optimized path that bypasses reflection for mouse wheel events
func handleWheelReportDirect(params map[string]interface{}) (interface{}, error) {
// Extract and validate wheelY (scroll delta)
wheelYFloat, err := validateFloat64Param(params, "wheelY", "wheelReport", -127, 127)
if err != nil {
return nil, err
}
wheelY := int8(wheelYFloat)
return nil, rpcWheelReport(wheelY)
}
// handleInputRPCDirect routes input method calls to their optimized direct handlers
// This is the main entry point for the fast path that bypasses reflection.
// It provides significant performance improvements for high-frequency input events.
//
// Performance monitoring: Consider adding metrics collection here to track
// latency improvements and call frequency for production monitoring.
func handleInputRPCDirect(method string, params map[string]interface{}) (interface{}, error) {
switch method {
case "keyboardReport":
return handleKeyboardReportDirect(params)
case "absMouseReport":
return handleAbsMouseReportDirect(params)
case "relMouseReport":
return handleRelMouseReportDirect(params)
case "wheelReport":
return handleWheelReportDirect(params)
default:
// This should never happen if isInputMethod is correctly implemented
return nil, fmt.Errorf("handleInputRPCDirect: unsupported method '%s'", method)
}
}
// isInputMethod determines if a given RPC method should use the optimized direct path
// Returns true for input-related methods that have direct handlers implemented.
// This function must be kept in sync with handleInputRPCDirect.
func isInputMethod(method string) bool {
switch method {
case "keyboardReport", "absMouseReport", "relMouseReport", "wheelReport":
return true
default:
return false
}
}

View File

@ -16,8 +16,8 @@ var microphoneMuteState struct {
func SetAudioMuted(muted bool) {
audioMuteState.mu.Lock()
defer audioMuteState.mu.Unlock()
audioMuteState.muted = muted
audioMuteState.mu.Unlock()
}
func IsAudioMuted() bool {
@ -28,8 +28,8 @@ func IsAudioMuted() bool {
func SetMicrophoneMuted(muted bool) {
microphoneMuteState.mu.Lock()
defer microphoneMuteState.mu.Unlock()
microphoneMuteState.muted = muted
microphoneMuteState.mu.Unlock()
}
func IsMicrophoneMuted() bool {

View File

@ -87,9 +87,9 @@ static volatile int playback_initialized = 0;
// Function to dynamically update Opus encoder parameters
int update_opus_encoder_params(int bitrate, int complexity, int vbr, int vbr_constraint,
int signal_type, int bandwidth, int dtx) {
// This function works for both audio input and output encoder parameters
// Require either capture (output) or playback (input) initialization
if (!encoder || (!capture_initialized && !playback_initialized)) {
// This function updates encoder parameters for audio input (capture)
// Only capture uses the encoder; playback uses a separate decoder
if (!encoder || !capture_initialized) {
return -1; // Audio encoder not initialized
}

View File

@ -260,14 +260,14 @@ var (
lastMetricsUpdate int64
// Counter value tracking (since prometheus counters don't have Get() method)
audioFramesReceivedValue int64
audioFramesDroppedValue int64
audioBytesProcessedValue int64
audioConnectionDropsValue int64
micFramesSentValue int64
micFramesDroppedValue int64
micBytesProcessedValue int64
micConnectionDropsValue int64
audioFramesReceivedValue uint64
audioFramesDroppedValue uint64
audioBytesProcessedValue uint64
audioConnectionDropsValue uint64
micFramesSentValue uint64
micFramesDroppedValue uint64
micBytesProcessedValue uint64
micConnectionDropsValue uint64
// Atomic counters for device health metrics - functionality removed, no longer used
@ -277,11 +277,11 @@ var (
// UnifiedAudioMetrics provides a common structure for both input and output audio streams
type UnifiedAudioMetrics struct {
FramesReceived int64 `json:"frames_received"`
FramesDropped int64 `json:"frames_dropped"`
FramesSent int64 `json:"frames_sent,omitempty"`
BytesProcessed int64 `json:"bytes_processed"`
ConnectionDrops int64 `json:"connection_drops"`
FramesReceived uint64 `json:"frames_received"`
FramesDropped uint64 `json:"frames_dropped"`
FramesSent uint64 `json:"frames_sent,omitempty"`
BytesProcessed uint64 `json:"bytes_processed"`
ConnectionDrops uint64 `json:"connection_drops"`
LastFrameTime time.Time `json:"last_frame_time"`
AverageLatency time.Duration `json:"average_latency"`
}
@ -303,10 +303,10 @@ func convertAudioMetricsToUnified(metrics AudioMetrics) UnifiedAudioMetrics {
func convertAudioInputMetricsToUnified(metrics AudioInputMetrics) UnifiedAudioMetrics {
return UnifiedAudioMetrics{
FramesReceived: 0, // AudioInputMetrics doesn't have FramesReceived
FramesDropped: metrics.FramesDropped,
FramesSent: metrics.FramesSent,
BytesProcessed: metrics.BytesProcessed,
ConnectionDrops: metrics.ConnectionDrops,
FramesDropped: uint64(metrics.FramesDropped),
FramesSent: uint64(metrics.FramesSent),
BytesProcessed: uint64(metrics.BytesProcessed),
ConnectionDrops: uint64(metrics.ConnectionDrops),
LastFrameTime: metrics.LastFrameTime,
AverageLatency: metrics.AverageLatency,
}
@ -314,22 +314,22 @@ func convertAudioInputMetricsToUnified(metrics AudioInputMetrics) UnifiedAudioMe
// UpdateAudioMetrics updates Prometheus metrics with current audio data
func UpdateAudioMetrics(metrics UnifiedAudioMetrics) {
oldReceived := atomic.SwapInt64(&audioFramesReceivedValue, metrics.FramesReceived)
oldReceived := atomic.SwapUint64(&audioFramesReceivedValue, metrics.FramesReceived)
if metrics.FramesReceived > oldReceived {
audioFramesReceivedTotal.Add(float64(metrics.FramesReceived - oldReceived))
}
oldDropped := atomic.SwapInt64(&audioFramesDroppedValue, metrics.FramesDropped)
oldDropped := atomic.SwapUint64(&audioFramesDroppedValue, metrics.FramesDropped)
if metrics.FramesDropped > oldDropped {
audioFramesDroppedTotal.Add(float64(metrics.FramesDropped - oldDropped))
}
oldBytes := atomic.SwapInt64(&audioBytesProcessedValue, metrics.BytesProcessed)
oldBytes := atomic.SwapUint64(&audioBytesProcessedValue, metrics.BytesProcessed)
if metrics.BytesProcessed > oldBytes {
audioBytesProcessedTotal.Add(float64(metrics.BytesProcessed - oldBytes))
}
oldDrops := atomic.SwapInt64(&audioConnectionDropsValue, metrics.ConnectionDrops)
oldDrops := atomic.SwapUint64(&audioConnectionDropsValue, metrics.ConnectionDrops)
if metrics.ConnectionDrops > oldDrops {
audioConnectionDropsTotal.Add(float64(metrics.ConnectionDrops - oldDrops))
}
@ -345,22 +345,22 @@ func UpdateAudioMetrics(metrics UnifiedAudioMetrics) {
// UpdateMicrophoneMetrics updates Prometheus metrics with current microphone data
func UpdateMicrophoneMetrics(metrics UnifiedAudioMetrics) {
oldSent := atomic.SwapInt64(&micFramesSentValue, metrics.FramesSent)
oldSent := atomic.SwapUint64(&micFramesSentValue, metrics.FramesSent)
if metrics.FramesSent > oldSent {
microphoneFramesSentTotal.Add(float64(metrics.FramesSent - oldSent))
}
oldDropped := atomic.SwapInt64(&micFramesDroppedValue, metrics.FramesDropped)
oldDropped := atomic.SwapUint64(&micFramesDroppedValue, metrics.FramesDropped)
if metrics.FramesDropped > oldDropped {
microphoneFramesDroppedTotal.Add(float64(metrics.FramesDropped - oldDropped))
}
oldBytes := atomic.SwapInt64(&micBytesProcessedValue, metrics.BytesProcessed)
oldBytes := atomic.SwapUint64(&micBytesProcessedValue, metrics.BytesProcessed)
if metrics.BytesProcessed > oldBytes {
microphoneBytesProcessedTotal.Add(float64(metrics.BytesProcessed - oldBytes))
}
oldDrops := atomic.SwapInt64(&micConnectionDropsValue, metrics.ConnectionDrops)
oldDrops := atomic.SwapUint64(&micConnectionDropsValue, metrics.ConnectionDrops)
if metrics.ConnectionDrops > oldDrops {
microphoneConnectionDropsTotal.Add(float64(metrics.ConnectionDrops - oldDrops))
}

View File

@ -230,7 +230,7 @@ func (ais *AudioInputSupervisor) SendFrameZeroCopy(frame *ZeroCopyAudioFrame) er
}
// SendConfig sends a configuration update to the subprocess (convenience method)
func (ais *AudioInputSupervisor) SendConfig(config InputIPCConfig) error {
func (ais *AudioInputSupervisor) SendConfig(config UnifiedIPCConfig) error {
if ais.client == nil {
return fmt.Errorf("client not initialized")
}
@ -243,7 +243,7 @@ func (ais *AudioInputSupervisor) SendConfig(config InputIPCConfig) error {
}
// SendOpusConfig sends a complete Opus encoder configuration to the audio input server
func (ais *AudioInputSupervisor) SendOpusConfig(config InputIPCOpusConfig) error {
func (ais *AudioInputSupervisor) SendOpusConfig(config UnifiedIPCOpusConfig) error {
if ais.client == nil {
return fmt.Errorf("client not initialized")
}

View File

@ -134,14 +134,12 @@ func (mp *GenericMessagePool) GetStats() (hitCount, missCount int64, hitRate flo
// Helper functions
// EncodeMessageHeader encodes a message header into a byte slice
func EncodeMessageHeader(magic uint32, msgType uint8, length uint32, timestamp int64) []byte {
header := make([]byte, 17)
// EncodeMessageHeader encodes a message header into a provided byte slice
func EncodeMessageHeader(header []byte, magic uint32, msgType uint8, length uint32, timestamp int64) {
binary.LittleEndian.PutUint32(header[0:4], magic)
header[4] = msgType
binary.LittleEndian.PutUint32(header[5:9], length)
binary.LittleEndian.PutUint64(header[9:17], uint64(timestamp))
return header
}
// EncodeAudioConfig encodes basic audio configuration to binary format
@ -179,14 +177,12 @@ func WriteIPCMessage(conn net.Conn, msg IPCMessage, pool *GenericMessagePool, dr
defer pool.Put(optMsg)
// Prepare header in pre-allocated buffer
header := EncodeMessageHeader(msg.GetMagic(), msg.GetType(), msg.GetLength(), msg.GetTimestamp())
copy(optMsg.header[:], header)
EncodeMessageHeader(optMsg.header[:], msg.GetMagic(), msg.GetType(), msg.GetLength(), msg.GetTimestamp())
// Set write deadline for timeout handling (more efficient than goroutines)
if deadline := time.Now().Add(Config.WriteTimeout); deadline.After(time.Now()) {
if err := conn.SetWriteDeadline(deadline); err != nil {
// If we can't set deadline, proceed without it
// This maintains compatibility with connections that don't support deadlines
_ = err // Explicitly ignore error for linter
}
}

View File

@ -27,27 +27,11 @@ var (
messagePoolSize = Config.MessagePoolSize // Pre-allocated message pool size
)
// Legacy aliases for backward compatibility
type InputMessageType = UnifiedMessageType
type InputIPCMessage = UnifiedIPCMessage
// Legacy constants for backward compatibility
const (
InputMessageTypeOpusFrame = MessageTypeOpusFrame
InputMessageTypeConfig = MessageTypeConfig
InputMessageTypeOpusConfig = MessageTypeOpusConfig
InputMessageTypeStop = MessageTypeStop
InputMessageTypeHeartbeat = MessageTypeHeartbeat
InputMessageTypeAck = MessageTypeAck
)
// Methods are now inherited from UnifiedIPCMessage
// OptimizedIPCMessage represents an optimized message with pre-allocated buffers
type OptimizedIPCMessage struct {
header [17]byte // Pre-allocated header buffer (headerSize = 17)
data []byte // Reusable data buffer
msg InputIPCMessage // Embedded message
header [17]byte // Pre-allocated header buffer (headerSize = 17)
data []byte // Reusable data buffer
msg UnifiedIPCMessage // Embedded message
}
// MessagePool manages a pool of reusable messages to reduce allocations
@ -109,7 +93,7 @@ func (mp *MessagePool) Get() *OptimizedIPCMessage {
atomic.AddInt64(&mp.hitCount, 1)
// Reset message for reuse
msg.data = msg.data[:0]
msg.msg = InputIPCMessage{}
msg.msg = UnifiedIPCMessage{}
return msg
}
mp.mutex.Unlock()
@ -120,7 +104,7 @@ func (mp *MessagePool) Get() *OptimizedIPCMessage {
atomic.AddInt64(&mp.hitCount, 1)
// Reset message for reuse and ensure proper capacity
msg.data = msg.data[:0]
msg.msg = InputIPCMessage{}
msg.msg = UnifiedIPCMessage{}
// Ensure data buffer has sufficient capacity
if cap(msg.data) < maxFrameSize {
msg.data = make([]byte, 0, maxFrameSize)
@ -148,7 +132,7 @@ func (mp *MessagePool) Put(msg *OptimizedIPCMessage) {
// Reset the message for reuse
msg.data = msg.data[:0]
msg.msg = InputIPCMessage{}
msg.msg = UnifiedIPCMessage{}
// First try to return to pre-allocated pool for fastest reuse
mp.mutex.Lock()
@ -168,10 +152,6 @@ func (mp *MessagePool) Put(msg *OptimizedIPCMessage) {
}
}
// Legacy aliases for backward compatibility
type InputIPCConfig = UnifiedIPCConfig
type InputIPCOpusConfig = UnifiedIPCOpusConfig
// AudioInputServer handles IPC communication for audio input processing
type AudioInputServer struct {
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
@ -186,10 +166,10 @@ type AudioInputServer struct {
running bool
// Triple-goroutine architecture
messageChan chan *InputIPCMessage // Buffered channel for incoming messages
processChan chan *InputIPCMessage // Buffered channel for processing queue
stopChan chan struct{} // Stop signal for all goroutines
wg sync.WaitGroup // Wait group for goroutine coordination
messageChan chan *UnifiedIPCMessage // Buffered channel for incoming messages
processChan chan *UnifiedIPCMessage // Buffered channel for processing queue
stopChan chan struct{} // Stop signal for all goroutines
wg sync.WaitGroup // Wait group for goroutine coordination
// Channel resizing support
channelMutex sync.RWMutex // Protects channel recreation
@ -246,8 +226,8 @@ func NewAudioInputServer() (*AudioInputServer, error) {
return &AudioInputServer{
listener: listener,
messageChan: make(chan *InputIPCMessage, initialBufferSize),
processChan: make(chan *InputIPCMessage, initialBufferSize),
messageChan: make(chan *UnifiedIPCMessage, initialBufferSize),
processChan: make(chan *UnifiedIPCMessage, initialBufferSize),
stopChan: make(chan struct{}),
bufferSize: initialBufferSize,
lastBufferSize: initialBufferSize,
@ -405,7 +385,7 @@ func (ais *AudioInputServer) handleConnection(conn net.Conn) {
//
// The function uses pooled buffers for efficient memory management and
// ensures all messages conform to the JetKVM audio protocol specification.
func (ais *AudioInputServer) readMessage(conn net.Conn) (*InputIPCMessage, error) {
func (ais *AudioInputServer) readMessage(conn net.Conn) (*UnifiedIPCMessage, error) {
// Get optimized message from pool
optMsg := globalMessagePool.Get()
defer globalMessagePool.Put(optMsg)
@ -419,7 +399,7 @@ func (ais *AudioInputServer) readMessage(conn net.Conn) (*InputIPCMessage, error
// Parse header using optimized access
msg := &optMsg.msg
msg.Magic = binary.LittleEndian.Uint32(optMsg.header[0:4])
msg.Type = InputMessageType(optMsg.header[4])
msg.Type = UnifiedMessageType(optMsg.header[4])
msg.Length = binary.LittleEndian.Uint32(optMsg.header[5:9])
msg.Timestamp = int64(binary.LittleEndian.Uint64(optMsg.header[9:17]))
@ -450,7 +430,7 @@ func (ais *AudioInputServer) readMessage(conn net.Conn) (*InputIPCMessage, error
}
// Return a copy of the message (data will be copied by caller if needed)
result := &InputIPCMessage{
result := &UnifiedIPCMessage{
Magic: msg.Magic,
Type: msg.Type,
Length: msg.Length,
@ -467,17 +447,17 @@ func (ais *AudioInputServer) readMessage(conn net.Conn) (*InputIPCMessage, error
}
// processMessage processes a received message
func (ais *AudioInputServer) processMessage(msg *InputIPCMessage) error {
func (ais *AudioInputServer) processMessage(msg *UnifiedIPCMessage) error {
switch msg.Type {
case InputMessageTypeOpusFrame:
case MessageTypeOpusFrame:
return ais.processOpusFrame(msg.Data)
case InputMessageTypeConfig:
case MessageTypeConfig:
return ais.processConfig(msg.Data)
case InputMessageTypeOpusConfig:
case MessageTypeOpusConfig:
return ais.processOpusConfig(msg.Data)
case InputMessageTypeStop:
case MessageTypeStop:
return fmt.Errorf("stop message received")
case InputMessageTypeHeartbeat:
case MessageTypeHeartbeat:
return ais.sendAck()
default:
return fmt.Errorf("unknown message type: %d", msg.Type)
@ -538,7 +518,7 @@ func (ais *AudioInputServer) processOpusConfig(data []byte) error {
}
// Deserialize Opus configuration
config := InputIPCOpusConfig{
config := UnifiedIPCOpusConfig{
SampleRate: int(binary.LittleEndian.Uint32(data[0:4])),
Channels: int(binary.LittleEndian.Uint32(data[4:8])),
FrameSize: int(binary.LittleEndian.Uint32(data[8:12])),
@ -552,19 +532,33 @@ func (ais *AudioInputServer) processOpusConfig(data []byte) error {
logger.Info().Interface("config", config).Msg("applying dynamic Opus encoder configuration")
// Apply the Opus encoder configuration dynamically
err := CGOUpdateOpusEncoderParams(
config.Bitrate,
config.Complexity,
config.VBR,
0, // VBR constraint - using default
config.SignalType,
config.Bandwidth,
config.DTX,
)
// Note: We don't call CGOAudioInit() here as it would destroy and recreate the encoder,
// causing temporary unavailability. The encoder should already be initialized when
// the audio input server starts.
// Apply the Opus encoder configuration dynamically with retry logic
var err error
for attempt := 0; attempt < 3; attempt++ {
err = CGOUpdateOpusEncoderParams(
config.Bitrate,
config.Complexity,
config.VBR,
0, // VBR constraint - using default
config.SignalType,
config.Bandwidth,
config.DTX,
)
if err == nil {
break
}
logger.Warn().Err(err).Int("attempt", attempt+1).Msg("Failed to update Opus encoder parameters, retrying")
if attempt < 2 {
time.Sleep(time.Duration(attempt+1) * 50 * time.Millisecond)
}
}
if err != nil {
logger.Error().Err(err).Msg("failed to apply Opus encoder configuration")
logger.Error().Err(err).Msg("failed to apply Opus encoder configuration after retries")
return fmt.Errorf("failed to apply Opus configuration: %w", err)
}
@ -581,9 +575,9 @@ func (ais *AudioInputServer) sendAck() error {
return fmt.Errorf("no connection")
}
msg := &InputIPCMessage{
msg := &UnifiedIPCMessage{
Magic: inputMagicNumber,
Type: InputMessageTypeAck,
Type: MessageTypeAck,
Length: 0,
Timestamp: time.Now().UnixNano(),
}
@ -595,7 +589,7 @@ func (ais *AudioInputServer) sendAck() error {
var globalInputServerMessagePool = NewGenericMessagePool(messagePoolSize)
// writeMessage writes a message to the connection using shared common utilities
func (ais *AudioInputServer) writeMessage(conn net.Conn, msg *InputIPCMessage) error {
func (ais *AudioInputServer) writeMessage(conn net.Conn, msg *UnifiedIPCMessage) error {
// Use shared WriteIPCMessage function with global message pool
return WriteIPCMessage(conn, msg, globalInputServerMessagePool, &ais.droppedFrames)
}
@ -673,9 +667,9 @@ func (aic *AudioInputClient) Disconnect() {
if aic.conn != nil {
// Send stop message
msg := &InputIPCMessage{
msg := &UnifiedIPCMessage{
Magic: inputMagicNumber,
Type: InputMessageTypeStop,
Type: MessageTypeStop,
Length: 0,
Timestamp: time.Now().UnixNano(),
}
@ -700,9 +694,9 @@ func (aic *AudioInputClient) SendFrame(frame []byte) error {
}
// Direct message creation without timestamp overhead
msg := &InputIPCMessage{
msg := &UnifiedIPCMessage{
Magic: inputMagicNumber,
Type: InputMessageTypeOpusFrame,
Type: MessageTypeOpusFrame,
Length: uint32(len(frame)),
Data: frame,
}
@ -736,9 +730,9 @@ func (aic *AudioInputClient) SendFrameZeroCopy(frame *ZeroCopyAudioFrame) error
}
// Use zero-copy data directly
msg := &InputIPCMessage{
msg := &UnifiedIPCMessage{
Magic: inputMagicNumber,
Type: InputMessageTypeOpusFrame,
Type: MessageTypeOpusFrame,
Length: uint32(frameLen),
Timestamp: time.Now().UnixNano(),
Data: frame.Data(), // Zero-copy data access
@ -748,7 +742,7 @@ func (aic *AudioInputClient) SendFrameZeroCopy(frame *ZeroCopyAudioFrame) error
}
// SendConfig sends a configuration update to the audio input server
func (aic *AudioInputClient) SendConfig(config InputIPCConfig) error {
func (aic *AudioInputClient) SendConfig(config UnifiedIPCConfig) error {
aic.mtx.Lock()
defer aic.mtx.Unlock()
@ -766,9 +760,9 @@ func (aic *AudioInputClient) SendConfig(config InputIPCConfig) error {
// Serialize config using common function
data := EncodeAudioConfig(config.SampleRate, config.Channels, config.FrameSize)
msg := &InputIPCMessage{
msg := &UnifiedIPCMessage{
Magic: inputMagicNumber,
Type: InputMessageTypeConfig,
Type: MessageTypeConfig,
Length: uint32(len(data)),
Timestamp: time.Now().UnixNano(),
Data: data,
@ -778,7 +772,7 @@ func (aic *AudioInputClient) SendConfig(config InputIPCConfig) error {
}
// SendOpusConfig sends a complete Opus encoder configuration update to the audio input server
func (aic *AudioInputClient) SendOpusConfig(config InputIPCOpusConfig) error {
func (aic *AudioInputClient) SendOpusConfig(config UnifiedIPCOpusConfig) error {
aic.mtx.Lock()
defer aic.mtx.Unlock()
@ -795,9 +789,9 @@ func (aic *AudioInputClient) SendOpusConfig(config InputIPCOpusConfig) error {
// Serialize Opus configuration using common function
data := EncodeOpusConfig(config.SampleRate, config.Channels, config.FrameSize, config.Bitrate, config.Complexity, config.VBR, config.SignalType, config.Bandwidth, config.DTX)
msg := &InputIPCMessage{
msg := &UnifiedIPCMessage{
Magic: inputMagicNumber,
Type: InputMessageTypeOpusConfig,
Type: MessageTypeOpusConfig,
Length: uint32(len(data)),
Timestamp: time.Now().UnixNano(),
Data: data,
@ -815,9 +809,9 @@ func (aic *AudioInputClient) SendHeartbeat() error {
return fmt.Errorf("not connected to audio input server")
}
msg := &InputIPCMessage{
msg := &UnifiedIPCMessage{
Magic: inputMagicNumber,
Type: InputMessageTypeHeartbeat,
Type: MessageTypeHeartbeat,
Length: 0,
Timestamp: time.Now().UnixNano(),
}
@ -829,7 +823,7 @@ func (aic *AudioInputClient) SendHeartbeat() error {
// Global shared message pool for input IPC clients
var globalInputMessagePool = NewGenericMessagePool(messagePoolSize)
func (aic *AudioInputClient) writeMessage(msg *InputIPCMessage) error {
func (aic *AudioInputClient) writeMessage(msg *UnifiedIPCMessage) error {
// Increment total frames counter
atomic.AddInt64(&aic.totalFrames, 1)
@ -1093,9 +1087,9 @@ func (ais *AudioInputServer) startProcessorGoroutine() {
}
// processMessageWithRecovery processes a message with enhanced error recovery
func (ais *AudioInputServer) processMessageWithRecovery(msg *InputIPCMessage, logger zerolog.Logger) error {
func (ais *AudioInputServer) processMessageWithRecovery(msg *UnifiedIPCMessage, logger zerolog.Logger) error {
// Intelligent frame dropping: prioritize recent frames
if msg.Type == InputMessageTypeOpusFrame {
if msg.Type == MessageTypeOpusFrame {
// Check if processing queue is getting full
processChan := ais.getProcessChan()
queueLen := len(processChan)
@ -1172,7 +1166,7 @@ func (ais *AudioInputServer) startMonitorGoroutine() {
// Calculate end-to-end latency using message timestamp
var latency time.Duration
if msg.Type == InputMessageTypeOpusFrame && msg.Timestamp > 0 {
if msg.Type == MessageTypeOpusFrame && msg.Timestamp > 0 {
msgTime := time.Unix(0, msg.Timestamp)
latency = time.Since(msgTime)
// Use exponential moving average for end-to-end latency tracking
@ -1291,14 +1285,14 @@ func GetGlobalMessagePoolStats() MessagePoolStats {
}
// getMessageChan safely returns the current message channel
func (ais *AudioInputServer) getMessageChan() chan *InputIPCMessage {
func (ais *AudioInputServer) getMessageChan() chan *UnifiedIPCMessage {
ais.channelMutex.RLock()
defer ais.channelMutex.RUnlock()
return ais.messageChan
}
// getProcessChan safely returns the current process channel
func (ais *AudioInputServer) getProcessChan() chan *InputIPCMessage {
func (ais *AudioInputServer) getProcessChan() chan *UnifiedIPCMessage {
ais.channelMutex.RLock()
defer ais.channelMutex.RUnlock()
return ais.processChan

View File

@ -13,24 +13,6 @@ import (
"github.com/rs/zerolog"
)
// Legacy aliases for backward compatibility
type OutputIPCConfig = UnifiedIPCConfig
type OutputIPCOpusConfig = UnifiedIPCOpusConfig
type OutputMessageType = UnifiedMessageType
type OutputIPCMessage = UnifiedIPCMessage
// Legacy constants for backward compatibility
const (
OutputMessageTypeOpusFrame = MessageTypeOpusFrame
OutputMessageTypeConfig = MessageTypeConfig
OutputMessageTypeOpusConfig = MessageTypeOpusConfig
OutputMessageTypeStop = MessageTypeStop
OutputMessageTypeHeartbeat = MessageTypeHeartbeat
OutputMessageTypeAck = MessageTypeAck
)
// Methods are now inherited from UnifiedIPCMessage
// Global shared message pool for output IPC client header reading
var globalOutputClientMessagePool = NewGenericMessagePool(Config.OutputMessagePoolSize)
@ -48,9 +30,9 @@ type AudioOutputServer struct {
logger zerolog.Logger
// Message channels
messageChan chan *OutputIPCMessage // Buffered channel for incoming messages
processChan chan *OutputIPCMessage // Buffered channel for processing queue
wg sync.WaitGroup // Wait group for goroutine coordination
messageChan chan *UnifiedIPCMessage // Buffered channel for incoming messages
processChan chan *UnifiedIPCMessage // Buffered channel for processing queue
wg sync.WaitGroup // Wait group for goroutine coordination
// Configuration
socketPath string
@ -65,8 +47,8 @@ func NewAudioOutputServer() (*AudioOutputServer, error) {
socketPath: socketPath,
magicNumber: Config.OutputMagicNumber,
logger: logger,
messageChan: make(chan *OutputIPCMessage, Config.ChannelBufferSize),
processChan: make(chan *OutputIPCMessage, Config.ChannelBufferSize),
messageChan: make(chan *UnifiedIPCMessage, Config.ChannelBufferSize),
processChan: make(chan *UnifiedIPCMessage, Config.ChannelBufferSize),
}
return server, nil
@ -112,6 +94,7 @@ func (s *AudioOutputServer) Stop() {
if s.listener != nil {
s.listener.Close()
s.listener = nil
}
if s.conn != nil {
@ -171,7 +154,7 @@ func (s *AudioOutputServer) handleConnection(conn net.Conn) {
}
// readMessage reads a message from the connection
func (s *AudioOutputServer) readMessage(conn net.Conn) (*OutputIPCMessage, error) {
func (s *AudioOutputServer) readMessage(conn net.Conn) (*UnifiedIPCMessage, error) {
header := make([]byte, 17)
if _, err := io.ReadFull(conn, header); err != nil {
return nil, fmt.Errorf("failed to read header: %w", err)
@ -182,7 +165,7 @@ func (s *AudioOutputServer) readMessage(conn net.Conn) (*OutputIPCMessage, error
return nil, fmt.Errorf("invalid magic number: expected %d, got %d", s.magicNumber, magic)
}
msgType := OutputMessageType(header[4])
msgType := UnifiedMessageType(header[4])
length := binary.LittleEndian.Uint32(header[5:9])
timestamp := int64(binary.LittleEndian.Uint64(header[9:17]))
@ -194,7 +177,7 @@ func (s *AudioOutputServer) readMessage(conn net.Conn) (*OutputIPCMessage, error
}
}
return &OutputIPCMessage{
return &UnifiedIPCMessage{
Magic: magic,
Type: msgType,
Length: length,
@ -204,14 +187,14 @@ func (s *AudioOutputServer) readMessage(conn net.Conn) (*OutputIPCMessage, error
}
// processMessage processes a received message
func (s *AudioOutputServer) processMessage(msg *OutputIPCMessage) error {
func (s *AudioOutputServer) processMessage(msg *UnifiedIPCMessage) error {
switch msg.Type {
case OutputMessageTypeOpusConfig:
case MessageTypeOpusConfig:
return s.processOpusConfig(msg.Data)
case OutputMessageTypeStop:
case MessageTypeStop:
s.logger.Info().Msg("Received stop message")
return nil
case OutputMessageTypeHeartbeat:
case MessageTypeHeartbeat:
s.logger.Debug().Msg("Received heartbeat")
return nil
default:
@ -228,7 +211,7 @@ func (s *AudioOutputServer) processOpusConfig(data []byte) error {
}
// Decode Opus configuration
config := OutputIPCOpusConfig{
config := UnifiedIPCOpusConfig{
SampleRate: int(binary.LittleEndian.Uint32(data[0:4])),
Channels: int(binary.LittleEndian.Uint32(data[4:8])),
FrameSize: int(binary.LittleEndian.Uint32(data[8:12])),
@ -282,9 +265,9 @@ func (s *AudioOutputServer) SendFrame(frame []byte) error {
return fmt.Errorf("no client connected")
}
msg := &OutputIPCMessage{
msg := &UnifiedIPCMessage{
Magic: s.magicNumber,
Type: OutputMessageTypeOpusFrame,
Type: MessageTypeOpusFrame,
Length: uint32(len(frame)),
Timestamp: time.Now().UnixNano(),
Data: frame,
@ -294,8 +277,9 @@ func (s *AudioOutputServer) SendFrame(frame []byte) error {
}
// writeMessage writes a message to the connection
func (s *AudioOutputServer) writeMessage(conn net.Conn, msg *OutputIPCMessage) error {
header := EncodeMessageHeader(msg.Magic, uint8(msg.Type), msg.Length, msg.Timestamp)
func (s *AudioOutputServer) writeMessage(conn net.Conn, msg *UnifiedIPCMessage) error {
header := make([]byte, 17)
EncodeMessageHeader(header, msg.Magic, uint8(msg.Type), msg.Length, msg.Timestamp)
if _, err := conn.Write(header); err != nil {
return fmt.Errorf("failed to write header: %w", err)
@ -415,8 +399,8 @@ func (c *AudioOutputClient) ReceiveFrame() ([]byte, error) {
return nil, fmt.Errorf("invalid magic number in IPC message: got 0x%x, expected 0x%x", magic, outputMagicNumber)
}
msgType := OutputMessageType(optMsg.header[4])
if msgType != OutputMessageTypeOpusFrame {
msgType := UnifiedMessageType(optMsg.header[4])
if msgType != MessageTypeOpusFrame {
return nil, fmt.Errorf("unexpected message type: %d", msgType)
}
@ -443,7 +427,7 @@ func (c *AudioOutputClient) ReceiveFrame() ([]byte, error) {
}
// SendOpusConfig sends Opus configuration to the audio output server
func (c *AudioOutputClient) SendOpusConfig(config OutputIPCOpusConfig) error {
func (c *AudioOutputClient) SendOpusConfig(config UnifiedIPCOpusConfig) error {
c.mtx.Lock()
defer c.mtx.Unlock()
@ -460,9 +444,9 @@ func (c *AudioOutputClient) SendOpusConfig(config OutputIPCOpusConfig) error {
// Serialize Opus configuration using common function
data := EncodeOpusConfig(config.SampleRate, config.Channels, config.FrameSize, config.Bitrate, config.Complexity, config.VBR, config.SignalType, config.Bandwidth, config.DTX)
msg := &OutputIPCMessage{
msg := &UnifiedIPCMessage{
Magic: c.magicNumber,
Type: OutputMessageTypeOpusConfig,
Type: MessageTypeOpusConfig,
Length: uint32(len(data)),
Timestamp: time.Now().UnixNano(),
Data: data,
@ -472,8 +456,9 @@ func (c *AudioOutputClient) SendOpusConfig(config OutputIPCOpusConfig) error {
}
// writeMessage writes a message to the connection
func (c *AudioOutputClient) writeMessage(msg *OutputIPCMessage) error {
header := EncodeMessageHeader(msg.Magic, uint8(msg.Type), msg.Length, msg.Timestamp)
func (c *AudioOutputClient) writeMessage(msg *UnifiedIPCMessage) error {
header := make([]byte, 17)
EncodeMessageHeader(header, msg.Magic, uint8(msg.Type), msg.Length, msg.Timestamp)
if _, err := c.conn.Write(header); err != nil {
return fmt.Errorf("failed to write header: %w", err)

View File

@ -389,7 +389,8 @@ func (s *UnifiedAudioServer) SendFrame(frame []byte) error {
// writeMessage writes a message to the connection
func (s *UnifiedAudioServer) writeMessage(conn net.Conn, msg *UnifiedIPCMessage) error {
header := EncodeMessageHeader(msg.Magic, uint8(msg.Type), msg.Length, msg.Timestamp)
header := make([]byte, 17)
EncodeMessageHeader(header, msg.Magic, uint8(msg.Type), msg.Length, msg.Timestamp)
// Optimize: Use single write for header+data to reduce system calls
if msg.Length > 0 && msg.Data != nil {

View File

@ -62,7 +62,7 @@ func (aim *AudioInputIPCManager) Start() error {
return err
}
config := InputIPCConfig{
config := UnifiedIPCConfig{
SampleRate: Config.InputIPCSampleRate,
Channels: Config.InputIPCChannels,
FrameSize: Config.InputIPCFrameSize,
@ -72,7 +72,7 @@ func (aim *AudioInputIPCManager) Start() error {
if err := ValidateInputIPCConfig(config.SampleRate, config.Channels, config.FrameSize); err != nil {
aim.logger.Warn().Err(err).Msg("invalid input IPC config from constants, using defaults")
// Use safe defaults if config validation fails
config = InputIPCConfig{
config = UnifiedIPCConfig{
SampleRate: 48000,
Channels: 2,
FrameSize: 960,

View File

@ -56,7 +56,7 @@ func (aom *AudioOutputIPCManager) Start() error {
aom.logComponentStarted(AudioOutputIPCComponent)
// Send initial configuration
config := OutputIPCConfig{
config := UnifiedIPCConfig{
SampleRate: Config.SampleRate,
Channels: Config.Channels,
FrameSize: int(Config.AudioQualityMediumFrameSize.Milliseconds()),
@ -202,7 +202,7 @@ func (aom *AudioOutputIPCManager) calculateFrameRate() float64 {
}
// SendConfig sends configuration to the IPC server
func (aom *AudioOutputIPCManager) SendConfig(config OutputIPCConfig) error {
func (aom *AudioOutputIPCManager) SendConfig(config UnifiedIPCConfig) error {
if aom.server == nil {
return fmt.Errorf("audio output server not initialized")
}

View File

@ -318,7 +318,7 @@ func (s *AudioOutputSupervisor) connectClient() {
}
// SendOpusConfig sends Opus configuration to the audio output subprocess
func (s *AudioOutputSupervisor) SendOpusConfig(config OutputIPCOpusConfig) error {
func (aos *AudioOutputSupervisor) SendOpusConfig(config UnifiedIPCOpusConfig) error {
if outputClient == nil {
return fmt.Errorf("client not initialized")
}

View File

@ -63,10 +63,10 @@ type AudioConfig struct {
// AudioMetrics tracks audio performance metrics
type AudioMetrics struct {
FramesReceived int64
FramesDropped int64
BytesProcessed int64
ConnectionDrops int64
FramesReceived uint64
FramesDropped uint64
BytesProcessed uint64
ConnectionDrops uint64
LastFrameTime time.Time
AverageLatency time.Duration
}
@ -214,8 +214,8 @@ func SetAudioQuality(quality AudioQuality) {
// Send dynamic configuration update to running subprocess via IPC
if supervisor.IsConnected() {
// Convert AudioConfig to OutputIPCOpusConfig with complete Opus parameters
opusConfig := OutputIPCOpusConfig{
// Convert AudioConfig to UnifiedIPCOpusConfig with complete Opus parameters
opusConfig := UnifiedIPCOpusConfig{
SampleRate: config.SampleRate,
Channels: config.Channels,
FrameSize: int(config.FrameSize.Milliseconds() * int64(config.SampleRate) / 1000), // Convert ms to samples
@ -311,8 +311,8 @@ func SetMicrophoneQuality(quality AudioQuality) {
// Send dynamic configuration update to running subprocess via IPC
if supervisor.IsConnected() {
// Convert AudioConfig to InputIPCOpusConfig with complete Opus parameters
opusConfig := InputIPCOpusConfig{
// Convert AudioConfig to UnifiedIPCOpusConfig with complete Opus parameters
opusConfig := UnifiedIPCOpusConfig{
SampleRate: config.SampleRate,
Channels: config.Channels,
FrameSize: int(config.FrameSize.Milliseconds() * int64(config.SampleRate) / 1000), // Convert ms to samples
@ -363,10 +363,10 @@ func GetGlobalAudioMetrics() AudioMetrics {
// Batched metrics to reduce atomic operations frequency
var (
batchedFramesReceived int64
batchedBytesProcessed int64
batchedFramesDropped int64
batchedConnectionDrops int64
batchedFramesReceived uint64
batchedBytesProcessed uint64
batchedFramesDropped uint64
batchedConnectionDrops uint64
lastFlushTime int64 // Unix timestamp in nanoseconds
)
@ -374,7 +374,7 @@ var (
// RecordFrameReceived increments the frames received counter with batched updates
func RecordFrameReceived(bytes int) {
// Use local batching to reduce atomic operations frequency
atomic.AddInt64(&batchedBytesProcessed, int64(bytes))
atomic.AddUint64(&batchedBytesProcessed, uint64(bytes))
// Update timestamp immediately for accurate tracking
metrics.LastFrameTime = time.Now()
@ -391,23 +391,23 @@ func RecordConnectionDrop() {
// flushBatchedMetrics flushes accumulated metrics to the main counters
func flushBatchedMetrics() {
// Atomically move batched metrics to main metrics
framesReceived := atomic.SwapInt64(&batchedFramesReceived, 0)
bytesProcessed := atomic.SwapInt64(&batchedBytesProcessed, 0)
framesDropped := atomic.SwapInt64(&batchedFramesDropped, 0)
connectionDrops := atomic.SwapInt64(&batchedConnectionDrops, 0)
framesReceived := atomic.SwapUint64(&batchedFramesReceived, 0)
bytesProcessed := atomic.SwapUint64(&batchedBytesProcessed, 0)
framesDropped := atomic.SwapUint64(&batchedFramesDropped, 0)
connectionDrops := atomic.SwapUint64(&batchedConnectionDrops, 0)
// Update main metrics if we have any batched data
if framesReceived > 0 {
atomic.AddInt64(&metrics.FramesReceived, framesReceived)
atomic.AddUint64(&metrics.FramesReceived, framesReceived)
}
if bytesProcessed > 0 {
atomic.AddInt64(&metrics.BytesProcessed, bytesProcessed)
atomic.AddUint64(&metrics.BytesProcessed, bytesProcessed)
}
if framesDropped > 0 {
atomic.AddInt64(&metrics.FramesDropped, framesDropped)
atomic.AddUint64(&metrics.FramesDropped, framesDropped)
}
if connectionDrops > 0 {
atomic.AddInt64(&metrics.ConnectionDrops, connectionDrops)
atomic.AddUint64(&metrics.ConnectionDrops, connectionDrops)
}
// Update last flush time

View File

@ -20,8 +20,6 @@ import (
"github.com/jetkvm/kvm/internal/usbgadget"
)
// Direct RPC message handling for optimal input responsiveness
type JSONRPCRequest struct {
JSONRPC string `json:"jsonrpc"`
Method string `json:"method"`
@ -123,39 +121,6 @@ func onRPCMessage(message webrtc.DataChannelMessage, session *Session) {
scopedLogger.Trace().Msg("Received RPC request")
// Fast path for input methods - bypass reflection for performance
// This optimization reduces latency by 3-6ms per input event by:
// - Eliminating reflection overhead
// - Reducing memory allocations
// - Optimizing parameter parsing and validation
// See input_rpc.go for implementation details
if isInputMethod(request.Method) {
result, err := handleInputRPCDirect(request.Method, request.Params)
if err != nil {
scopedLogger.Error().Err(err).Msg("Error calling direct input handler")
errorResponse := JSONRPCResponse{
JSONRPC: "2.0",
Error: map[string]interface{}{
"code": -32603,
"message": "Internal error",
"data": err.Error(),
},
ID: request.ID,
}
writeJSONRPCResponse(errorResponse, session)
return
}
response := JSONRPCResponse{
JSONRPC: "2.0",
Result: result,
ID: request.ID,
}
writeJSONRPCResponse(response, session)
return
}
// Fallback to reflection-based handler for non-input methods
handler, ok := rpcHandlers[request.Method]
if !ok {
errorResponse := JSONRPCResponse{

View File

@ -195,12 +195,6 @@ export function useMicrophone() {
// Find the audio transceiver (should already exist with sendrecv direction)
const transceivers = peerConnection.getTransceivers();
devLog("Available transceivers:", transceivers.map((t: RTCRtpTransceiver) => ({
direction: t.direction,
mid: t.mid,
senderTrack: t.sender.track?.kind,
receiverTrack: t.receiver.track?.kind
})));
// Look for an audio transceiver that can send (has sendrecv or sendonly direction)
const audioTransceiver = transceivers.find((transceiver: RTCRtpTransceiver) => {

6
usb.go
View File

@ -60,16 +60,10 @@ func rpcRelMouseReport(dx int8, dy int8, buttons uint8) error {
}
func rpcWheelReport(wheelY int8) error {
if gadget == nil {
return nil // Gracefully handle uninitialized gadget (e.g., in tests)
}
return gadget.AbsMouseWheelReport(wheelY)
}
func rpcGetKeyboardLedState() (state usbgadget.KeyboardState) {
if gadget == nil {
return usbgadget.KeyboardState{} // Return empty state for uninitialized gadget
}
return gadget.GetKeyboardState()
}