chore: use ordered unreliable channel for pointer events

This commit is contained in:
Siyuan Miao 2025-09-11 17:24:10 +02:00
parent 3f83efa830
commit f10a2283d4
5 changed files with 110 additions and 56 deletions

View File

@ -66,8 +66,13 @@ func handleHidRPCMessage(message hidrpc.Message, session *Session) {
} }
} }
func onHidMessage(data []byte, session *Session) { func onHidMessage(msg hidQueueMessage, session *Session) {
scopedLogger := hidRPCLogger.With().Bytes("data", data).Logger() data := msg.Data
scopedLogger := hidRPCLogger.With().
Str("channel", msg.channel).
Bytes("data", data).
Logger()
scopedLogger.Debug().Msg("HID RPC message received") scopedLogger.Debug().Msg("HID RPC message received")
if len(data) < 1 { if len(data) < 1 {

View File

@ -117,6 +117,9 @@ export interface RTCState {
rpcHidUnreliableChannel: RTCDataChannel | null; rpcHidUnreliableChannel: RTCDataChannel | null;
setRpcHidUnreliableChannel: (channel: RTCDataChannel) => void; setRpcHidUnreliableChannel: (channel: RTCDataChannel) => void;
rpcHidUnreliableNonOrderedChannel: RTCDataChannel | null;
setRpcHidUnreliableNonOrderedChannel: (channel: RTCDataChannel) => void;
peerConnectionState: RTCPeerConnectionState | null; peerConnectionState: RTCPeerConnectionState | null;
setPeerConnectionState: (state: RTCPeerConnectionState) => void; setPeerConnectionState: (state: RTCPeerConnectionState) => void;
@ -175,6 +178,9 @@ export const useRTCStore = create<RTCState>(set => ({
rpcHidUnreliableChannel: null, rpcHidUnreliableChannel: null,
setRpcHidUnreliableChannel: (channel: RTCDataChannel) => set({ rpcHidUnreliableChannel: channel }), setRpcHidUnreliableChannel: (channel: RTCDataChannel) => set({ rpcHidUnreliableChannel: channel }),
rpcHidUnreliableNonOrderedChannel: null,
setRpcHidUnreliableNonOrderedChannel: (channel: RTCDataChannel) => set({ rpcHidUnreliableNonOrderedChannel: channel }),
transceiver: null, transceiver: null,
setTransceiver: (transceiver: RTCRtpTransceiver) => set({ transceiver }), setTransceiver: (transceiver: RTCRtpTransceiver) => set({ transceiver }),

View File

@ -22,12 +22,14 @@ const KEEPALIVE_MESSAGE = new KeypressKeepAliveMessage();
interface sendMessageParams { interface sendMessageParams {
ignoreHandshakeState?: boolean; ignoreHandshakeState?: boolean;
useUnreliableChannel?: boolean; useUnreliableChannel?: boolean;
requireOrdered?: boolean;
} }
export function useHidRpc(onHidRpcMessage?: (payload: RpcMessage) => void) { export function useHidRpc(onHidRpcMessage?: (payload: RpcMessage) => void) {
const { const {
rpcHidChannel, rpcHidChannel,
rpcHidUnreliableChannel, rpcHidUnreliableChannel,
rpcHidUnreliableNonOrderedChannel,
setRpcHidProtocolVersion, setRpcHidProtocolVersion,
rpcHidProtocolVersion, hidRpcDisabled, rpcHidProtocolVersion, hidRpcDisabled,
} = useRTCStore(); } = useRTCStore();
@ -41,6 +43,10 @@ export function useHidRpc(onHidRpcMessage?: (payload: RpcMessage) => void) {
return rpcHidUnreliableChannel?.readyState === "open" && rpcHidProtocolVersion !== null; return rpcHidUnreliableChannel?.readyState === "open" && rpcHidProtocolVersion !== null;
}, [rpcHidUnreliableChannel, rpcHidProtocolVersion]); }, [rpcHidUnreliableChannel, rpcHidProtocolVersion]);
const rpcHidUnreliableNonOrderedReady = useMemo(() => {
return rpcHidUnreliableNonOrderedChannel?.readyState === "open" && rpcHidProtocolVersion !== null;
}, [rpcHidUnreliableNonOrderedChannel, rpcHidProtocolVersion]);
const rpcHidStatus = useMemo(() => { const rpcHidStatus = useMemo(() => {
if (hidRpcDisabled) return "disabled"; if (hidRpcDisabled) return "disabled";
@ -50,7 +56,7 @@ export function useHidRpc(onHidRpcMessage?: (payload: RpcMessage) => void) {
return `ready (v${rpcHidProtocolVersion}${rpcHidUnreliableReady ? "+u" : ""})`; return `ready (v${rpcHidProtocolVersion}${rpcHidUnreliableReady ? "+u" : ""})`;
}, [rpcHidChannel, rpcHidUnreliableReady, rpcHidProtocolVersion, hidRpcDisabled]); }, [rpcHidChannel, rpcHidUnreliableReady, rpcHidProtocolVersion, hidRpcDisabled]);
const sendMessage = useCallback((message: RpcMessage, { ignoreHandshakeState, useUnreliableChannel }: sendMessageParams = {}) => { const sendMessage = useCallback((message: RpcMessage, { ignoreHandshakeState, useUnreliableChannel, requireOrdered = true }: sendMessageParams = {}) => {
if (hidRpcDisabled) return; if (hidRpcDisabled) return;
if (rpcHidChannel?.readyState !== "open") return; if (rpcHidChannel?.readyState !== "open") return;
if (!rpcHidReady && !ignoreHandshakeState) return; if (!rpcHidReady && !ignoreHandshakeState) return;
@ -63,12 +69,24 @@ export function useHidRpc(onHidRpcMessage?: (payload: RpcMessage) => void) {
} }
if (!data) return; if (!data) return;
if (useUnreliableChannel && rpcHidUnreliableReady) { if (useUnreliableChannel) {
if (requireOrdered && rpcHidUnreliableReady) {
rpcHidUnreliableChannel?.send(data as unknown as ArrayBuffer); rpcHidUnreliableChannel?.send(data as unknown as ArrayBuffer);
} else { } else if (!requireOrdered && rpcHidUnreliableNonOrderedReady) {
rpcHidChannel?.send(data as unknown as ArrayBuffer); rpcHidUnreliableNonOrderedChannel?.send(data as unknown as ArrayBuffer);
} }
}, [rpcHidChannel, rpcHidReady, hidRpcDisabled, rpcHidUnreliableChannel, rpcHidUnreliableReady]); return;
}
rpcHidChannel?.send(data as unknown as ArrayBuffer);
}, [
rpcHidChannel,
rpcHidUnreliableChannel,
hidRpcDisabled, rpcHidUnreliableNonOrderedChannel,
rpcHidReady,
rpcHidUnreliableReady,
rpcHidUnreliableNonOrderedReady,
]);
const reportKeyboardEvent = useCallback( const reportKeyboardEvent = useCallback(
(keys: number[], modifier: number) => { (keys: number[], modifier: number) => {
@ -85,7 +103,7 @@ export function useHidRpc(onHidRpcMessage?: (payload: RpcMessage) => void) {
const reportAbsMouseEvent = useCallback( const reportAbsMouseEvent = useCallback(
(x: number, y: number, buttons: number) => { (x: number, y: number, buttons: number) => {
sendMessage(new PointerReportMessage(x, y, buttons)); sendMessage(new PointerReportMessage(x, y, buttons), { useUnreliableChannel: true });
}, },
[sendMessage], [sendMessage],
); );
@ -112,7 +130,7 @@ export function useHidRpc(onHidRpcMessage?: (payload: RpcMessage) => void) {
); );
const reportKeypressKeepAlive = useCallback(() => { const reportKeypressKeepAlive = useCallback(() => {
sendMessage(KEEPALIVE_MESSAGE, { useUnreliableChannel: true }); sendMessage(KEEPALIVE_MESSAGE, { useUnreliableChannel: true, requireOrdered: false });
}, [sendMessage]); }, [sendMessage]);
const sendHandshake = useCallback(() => { const sendHandshake = useCallback(() => {

View File

@ -136,6 +136,7 @@ export default function KvmIdRoute() {
rpcDataChannel, rpcDataChannel,
setTransceiver, setTransceiver,
setRpcHidChannel, setRpcHidChannel,
setRpcHidUnreliableNonOrderedChannel,
setRpcHidUnreliableChannel, setRpcHidUnreliableChannel,
} = useRTCStore(); } = useRTCStore();
@ -489,9 +490,8 @@ export default function KvmIdRoute() {
setRpcHidChannel(rpcHidChannel); setRpcHidChannel(rpcHidChannel);
}; };
const rpcHidUnreliableChannel = pc.createDataChannel("hidrpc-unreliable", { const rpcHidUnreliableChannel = pc.createDataChannel("hidrpc-unreliable-ordered", {
// We don't need to be ordered, as we're using the unreliable channel for keepalive messages ordered: true,
ordered: false,
maxRetransmits: 0, maxRetransmits: 0,
}); });
rpcHidUnreliableChannel.binaryType = "arraybuffer"; rpcHidUnreliableChannel.binaryType = "arraybuffer";
@ -499,6 +499,15 @@ export default function KvmIdRoute() {
setRpcHidUnreliableChannel(rpcHidUnreliableChannel); setRpcHidUnreliableChannel(rpcHidUnreliableChannel);
}; };
const rpcHidUnreliableNonOrderedChannel = pc.createDataChannel("hidrpc-unreliable-nonordered", {
ordered: false,
maxRetransmits: 0,
});
rpcHidUnreliableNonOrderedChannel.binaryType = "arraybuffer";
rpcHidUnreliableNonOrderedChannel.onopen = () => {
setRpcHidUnreliableNonOrderedChannel(rpcHidUnreliableNonOrderedChannel);
};
setPeerConnection(pc); setPeerConnection(pc);
}, [ }, [
cleanupAndStopReconnecting, cleanupAndStopReconnecting,
@ -510,6 +519,7 @@ export default function KvmIdRoute() {
setPeerConnectionState, setPeerConnectionState,
setRpcDataChannel, setRpcDataChannel,
setRpcHidChannel, setRpcHidChannel,
setRpcHidUnreliableNonOrderedChannel,
setRpcHidUnreliableChannel, setRpcHidUnreliableChannel,
setTransceiver, setTransceiver,
]); ]);

101
webrtc.go
View File

@ -29,7 +29,12 @@ type Session struct {
hidRPCAvailable bool hidRPCAvailable bool
hidQueueLock sync.Mutex hidQueueLock sync.Mutex
hidQueue []chan webrtc.DataChannelMessage hidQueue []chan hidQueueMessage
}
type hidQueueMessage struct {
webrtc.DataChannelMessage
channel string
} }
type SessionConfig struct { type SessionConfig struct {
@ -78,16 +83,59 @@ func (s *Session) initQueues() {
s.hidQueueLock.Lock() s.hidQueueLock.Lock()
defer s.hidQueueLock.Unlock() defer s.hidQueueLock.Unlock()
s.hidQueue = make([]chan webrtc.DataChannelMessage, 0) s.hidQueue = make([]chan hidQueueMessage, 0)
for i := 0; i < 4; i++ { for i := 0; i < 4; i++ {
q := make(chan webrtc.DataChannelMessage, 256) q := make(chan hidQueueMessage, 256)
s.hidQueue = append(s.hidQueue, q) s.hidQueue = append(s.hidQueue, q)
} }
} }
func (s *Session) handleQueues(index int) { func (s *Session) handleQueues(index int) {
for msg := range s.hidQueue[index] { for msg := range s.hidQueue[index] {
onHidMessage(msg.Data, s) onHidMessage(msg, s)
}
}
func getOnHidMessageHandler(session *Session, scopedLogger *zerolog.Logger, channel string) func(msg webrtc.DataChannelMessage) {
return func(msg webrtc.DataChannelMessage) {
l := scopedLogger.With().
Str("channel", channel).
Int("length", len(msg.Data)).
Logger()
// only log data if the log level is debug or lower
if scopedLogger.GetLevel() > zerolog.DebugLevel {
l = l.With().Str("data", string(msg.Data)).Logger()
}
if msg.IsString {
l.Warn().Msg("received string data in HID RPC message handler")
return
}
if len(msg.Data) < 1 {
l.Warn().Msg("received empty data in HID RPC message handler")
return
}
l.Trace().Msg("received data in HID RPC message handler")
// Enqueue to ensure ordered processing
queueIndex := hidrpc.GetQueueIndex(hidrpc.MessageType(msg.Data[0]))
if queueIndex >= len(session.hidQueue) || queueIndex < 0 {
l.Warn().Int("queueIndex", queueIndex).Msg("received data in HID RPC message handler, but queue index not found")
queueIndex = 3
}
queue := session.hidQueue[queueIndex]
if queue != nil {
queue <- hidQueueMessage{
DataChannelMessage: msg,
channel: channel,
}
} else {
l.Warn().Int("queueIndex", queueIndex).Msg("received data in HID RPC message handler, but queue is nil")
return
}
} }
} }
@ -145,41 +193,6 @@ func newSession(config SessionConfig) (*Session, error) {
go session.handleQueues(i) go session.handleQueues(i)
} }
onHidMessage := func(msg webrtc.DataChannelMessage) {
l := scopedLogger.With().Int("length", len(msg.Data)).Logger()
// only log data if the log level is debug or lower
if scopedLogger.GetLevel() > zerolog.DebugLevel {
l = l.With().Str("data", string(msg.Data)).Logger()
}
if msg.IsString {
l.Warn().Msg("received string data in HID RPC message handler")
return
}
if len(msg.Data) < 1 {
l.Warn().Msg("received empty data in HID RPC message handler")
return
}
l.Trace().Msg("received data in HID RPC message handler")
// Enqueue to ensure ordered processing
queueIndex := hidrpc.GetQueueIndex(hidrpc.MessageType(msg.Data[0]))
if queueIndex >= len(session.hidQueue) || queueIndex < 0 {
l.Warn().Int("queueIndex", queueIndex).Msg("received data in HID RPC message handler, but queue index not found")
queueIndex = 3
}
queue := session.hidQueue[queueIndex]
if queue != nil {
queue <- msg
} else {
l.Warn().Int("queueIndex", queueIndex).Msg("received data in HID RPC message handler, but queue is nil")
return
}
}
peerConnection.OnDataChannel(func(d *webrtc.DataChannel) { peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
@ -192,10 +205,12 @@ func newSession(config SessionConfig) (*Session, error) {
switch d.Label() { switch d.Label() {
case "hidrpc": case "hidrpc":
session.HidChannel = d session.HidChannel = d
d.OnMessage(onHidMessage) d.OnMessage(getOnHidMessageHandler(session, scopedLogger, "hidrpc"))
// we won't send anything over the unreliable channel // we won't send anything over the unreliable channels
case "hidrpc-unreliable": case "hidrpc-unreliable-ordered":
d.OnMessage(onHidMessage) d.OnMessage(getOnHidMessageHandler(session, scopedLogger, "hidrpc-unreliable-ordered"))
case "hidrpc-unreliable-nonordered":
d.OnMessage(getOnHidMessageHandler(session, scopedLogger, "hidrpc-unreliable-nonordered"))
case "rpc": case "rpc":
session.RPCChannel = d session.RPCChannel = d
d.OnMessage(func(msg webrtc.DataChannelMessage) { d.OnMessage(func(msg webrtc.DataChannelMessage) {