chore: ensure that rpc messages get processed sequentially and avoid phantom and repeated key presses (#744)

This commit is contained in:
Alex Ballas 2025-08-22 21:15:46 +03:00 committed by Alex P
parent 9d40263eed
commit 70e49a1cac
1 changed files with 16 additions and 3 deletions

View File

@ -29,14 +29,13 @@ type Session struct {
DiskChannel *webrtc.DataChannel DiskChannel *webrtc.DataChannel
AudioInputManager *audio.AudioInputManager AudioInputManager *audio.AudioInputManager
shouldUmountVirtualMedia bool shouldUmountVirtualMedia bool
// Microphone operation throttling // Microphone operation throttling
micCooldown time.Duration micCooldown time.Duration
// Audio frame processing // Audio frame processing
audioFrameChan chan []byte audioFrameChan chan []byte
audioStopChan chan struct{} audioStopChan chan struct{}
audioWg sync.WaitGroup audioWg sync.WaitGroup
rpcQueue chan webrtc.DataChannelMessage
} }
type SessionConfig struct { type SessionConfig struct {
@ -118,6 +117,7 @@ func newSession(config SessionConfig) (*Session, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
session := &Session{ session := &Session{
peerConnection: peerConnection, peerConnection: peerConnection,
AudioInputManager: audio.NewAudioInputManager(), AudioInputManager: audio.NewAudioInputManager(),
@ -129,13 +129,21 @@ func newSession(config SessionConfig) (*Session, error) {
// Start audio processing goroutine // Start audio processing goroutine
session.startAudioProcessor(*logger) session.startAudioProcessor(*logger)
session.rpcQueue = make(chan webrtc.DataChannelMessage, 256)
go func() {
for msg := range session.rpcQueue {
onRPCMessage(msg, session)
}
}()
peerConnection.OnDataChannel(func(d *webrtc.DataChannel) { peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
scopedLogger.Info().Str("label", d.Label()).Uint16("id", *d.ID()).Msg("New DataChannel") scopedLogger.Info().Str("label", d.Label()).Uint16("id", *d.ID()).Msg("New DataChannel")
switch d.Label() { switch d.Label() {
case "rpc": case "rpc":
session.RPCChannel = d session.RPCChannel = d
d.OnMessage(func(msg webrtc.DataChannelMessage) { d.OnMessage(func(msg webrtc.DataChannelMessage) {
go onRPCMessage(msg, session) // Enqueue to ensure ordered processing
session.rpcQueue <- msg
}) })
triggerOTAStateUpdate() triggerOTAStateUpdate()
triggerVideoStateUpdate() triggerVideoStateUpdate()
@ -259,6 +267,11 @@ func newSession(config SessionConfig) (*Session, error) {
if session == currentSession { if session == currentSession {
currentSession = nil currentSession = nil
} }
// Stop RPC processor
if session.rpcQueue != nil {
close(session.rpcQueue)
session.rpcQueue = nil
}
if session.shouldUmountVirtualMedia { if session.shouldUmountVirtualMedia {
err := rpcUnmountImage() err := rpcUnmountImage()
scopedLogger.Warn().Err(err).Msg("unmount image failed on connection close") scopedLogger.Warn().Err(err).Msg("unmount image failed on connection close")