From 2d2c84df0c47f4a0355883e6c3cdbff900b6b7c9 Mon Sep 17 00:00:00 2001 From: Siyuan Miao Date: Wed, 19 Nov 2025 22:55:18 +0000 Subject: [PATCH] fix: event handler not being called --- internal/native/grpc_client.go | 66 ++++++++++------------------------ 1 file changed, 19 insertions(+), 47 deletions(-) diff --git a/internal/native/grpc_client.go b/internal/native/grpc_client.go index 300a2284..340d1f56 100644 --- a/internal/native/grpc_client.go +++ b/internal/native/grpc_client.go @@ -29,8 +29,6 @@ type GRPCClient struct { eventStream pb.NativeService_StreamEventsClient eventM sync.RWMutex - eventCh chan *pb.Event - eventDone chan struct{} onVideoStateChange func(state VideoState) onIndevEvent func(event string) @@ -69,8 +67,6 @@ func NewGRPCClient(opts grpcClientOptions) (*GRPCClient, error) { conn: conn, client: client, logger: opts.Logger, - eventCh: make(chan *pb.Event, 100), - eventDone: make(chan struct{}), onVideoStateChange: opts.OnVideoStateChange, onIndevEvent: opts.OnIndevEvent, onRpcEvent: opts.OnRpcEvent, @@ -117,10 +113,26 @@ func (c *GRPCClient) handleEventStream(stream pb.NativeService_StreamEventsClien } logger.Trace().Msg("received event") - select { - case c.eventCh <- event: + switch event.Type { + case "video_state_change": + state := event.GetVideoState() + if state == nil { + logger.Warn().Msg("video state event is nil") + continue + } + go c.onVideoStateChange(VideoState{ + Ready: state.Ready, + Error: state.Error, + Width: int(state.Width), + Height: int(state.Height), + FramePerSecond: state.FramePerSecond, + }) + case "indev_event": + go c.onIndevEvent(event.GetIndevEvent()) + case "rpc_event": + go c.onRpcEvent(event.GetRpcEvent()) default: - logger.Warn().Msg("event channel full, dropping event") + logger.Warn().Str("type", event.Type).Msg("unknown event type") } } } @@ -210,44 +222,6 @@ func (c *GRPCClient) WaitReady() error { } } -func (c *GRPCClient) handleEvent(event *pb.Event) { - switch event.Type { - case "video_state_change": - state := event.GetVideoState() - if state == nil { - c.logger.Warn().Msg("video state event is nil") - return - } - c.onVideoStateChange(VideoState{ - Ready: state.Ready, - Error: state.Error, - Width: int(state.Width), - Height: int(state.Height), - FramePerSecond: state.FramePerSecond, - }) - case "indev_event": - c.onIndevEvent(event.GetIndevEvent()) - case "rpc_event": - c.onRpcEvent(event.GetRpcEvent()) - default: - c.logger.Warn().Str("type", event.Type).Msg("unknown event type") - } -} - -// OnEvent registers an event handler -func (c *GRPCClient) OnEvent(eventType string, handler func(data interface{})) { - go func() { - for { - select { - case event := <-c.eventCh: - c.handleEvent(event) - case <-c.eventDone: - return - } - } - }() -} - // Close closes the gRPC client func (c *GRPCClient) Close() error { c.closeM.Lock() @@ -260,8 +234,6 @@ func (c *GRPCClient) Close() error { // cancel all ongoing operations c.cancel() - close(c.eventDone) - c.eventM.Lock() if c.eventStream != nil { if err := c.eventStream.CloseSend(); err != nil {