diff --git a/webrtc.go b/webrtc.go index 7d0c52c..8966fb4 100644 --- a/webrtc.go +++ b/webrtc.go @@ -29,14 +29,13 @@ type Session struct { DiskChannel *webrtc.DataChannel AudioInputManager *audio.AudioInputManager shouldUmountVirtualMedia bool - // Microphone operation throttling micCooldown time.Duration - // Audio frame processing audioFrameChan chan []byte audioStopChan chan struct{} audioWg sync.WaitGroup + rpcQueue chan webrtc.DataChannelMessage } type SessionConfig struct { @@ -118,6 +117,7 @@ func newSession(config SessionConfig) (*Session, error) { if err != nil { return nil, err } + session := &Session{ peerConnection: peerConnection, AudioInputManager: audio.NewAudioInputManager(), @@ -129,13 +129,21 @@ func newSession(config SessionConfig) (*Session, error) { // Start audio processing goroutine session.startAudioProcessor(*logger) + session.rpcQueue = make(chan webrtc.DataChannelMessage, 256) + go func() { + for msg := range session.rpcQueue { + onRPCMessage(msg, session) + } + }() + peerConnection.OnDataChannel(func(d *webrtc.DataChannel) { scopedLogger.Info().Str("label", d.Label()).Uint16("id", *d.ID()).Msg("New DataChannel") switch d.Label() { case "rpc": session.RPCChannel = d d.OnMessage(func(msg webrtc.DataChannelMessage) { - go onRPCMessage(msg, session) + // Enqueue to ensure ordered processing + session.rpcQueue <- msg }) triggerOTAStateUpdate() triggerVideoStateUpdate() @@ -259,6 +267,11 @@ func newSession(config SessionConfig) (*Session, error) { if session == currentSession { currentSession = nil } + // Stop RPC processor + if session.rpcQueue != nil { + close(session.rpcQueue) + session.rpcQueue = nil + } if session.shouldUmountVirtualMedia { err := rpcUnmountImage() scopedLogger.Warn().Err(err).Msg("unmount image failed on connection close")