From 8e27cd6b60a6229f0a370bd2e652b3a7a6329bfa Mon Sep 17 00:00:00 2001 From: Alex Ballas Date: Fri, 22 Aug 2025 21:15:46 +0300 Subject: [PATCH] chore: ensure that rpc messages get processed sequentially and avoid phantom and repeated key presses (#744) --- webrtc.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/webrtc.go b/webrtc.go index f6c8529..7a11e5c 100644 --- a/webrtc.go +++ b/webrtc.go @@ -23,6 +23,7 @@ type Session struct { HidChannel *webrtc.DataChannel DiskChannel *webrtc.DataChannel shouldUmountVirtualMedia bool + rpcQueue chan webrtc.DataChannelMessage } type SessionConfig struct { @@ -105,6 +106,12 @@ func newSession(config SessionConfig) (*Session, error) { return nil, err } session := &Session{peerConnection: peerConnection} + session.rpcQueue = make(chan webrtc.DataChannelMessage, 256) + go func() { + for msg := range session.rpcQueue { + onRPCMessage(msg, session) + } + }() peerConnection.OnDataChannel(func(d *webrtc.DataChannel) { scopedLogger.Info().Str("label", d.Label()).Uint16("id", *d.ID()).Msg("New DataChannel") @@ -112,7 +119,8 @@ func newSession(config SessionConfig) (*Session, error) { case "rpc": session.RPCChannel = d d.OnMessage(func(msg webrtc.DataChannelMessage) { - go onRPCMessage(msg, session) + // Enqueue to ensure ordered processing + session.rpcQueue <- msg }) triggerOTAStateUpdate() triggerVideoStateUpdate() @@ -186,6 +194,11 @@ func newSession(config SessionConfig) (*Session, error) { if session == currentSession { currentSession = nil } + // Stop RPC processor + if session.rpcQueue != nil { + close(session.rpcQueue) + session.rpcQueue = nil + } if session.shouldUmountVirtualMedia { err := rpcUnmountImage() scopedLogger.Warn().Err(err).Msg("unmount image failed on connection close")