diff --git a/internal/native/grpc_server.go b/internal/native/grpc_server.go index 304203ce..dc177ef9 100644 --- a/internal/native/grpc_server.go +++ b/internal/native/grpc_server.go @@ -15,18 +15,18 @@ import ( // grpcServer wraps the Native instance and implements the gRPC service type grpcServer struct { pb.UnimplementedNativeServiceServer - native *Native - logger *zerolog.Logger - eventChs []chan *pb.Event - eventM sync.Mutex + native *Native + logger *zerolog.Logger + eventCh chan *pb.Event + eventM sync.Mutex } // NewGRPCServer creates a new gRPC server for the native service func NewGRPCServer(n *Native, logger *zerolog.Logger) *grpcServer { s := &grpcServer{ - native: n, - logger: logger, - eventChs: make([]chan *pb.Event, 0), + native: n, + logger: logger, + eventCh: make(chan *pb.Event, 100), } // Store original callbacks and wrap them to also broadcast events @@ -82,16 +82,7 @@ func NewGRPCServer(n *Native, logger *zerolog.Logger) *grpcServer { } func (s *grpcServer) broadcastEvent(event *pb.Event) { - s.eventM.Lock() - defer s.eventM.Unlock() - - for _, ch := range s.eventChs { - select { - case ch <- event: - default: - // Channel full, skip - } - } + s.eventCh <- event } func (s *grpcServer) IsReady(ctx context.Context, req *pb.IsReadyRequest) (*pb.IsReadyResponse, error) { @@ -103,30 +94,10 @@ func (s *grpcServer) StreamEvents(req *pb.Empty, stream pb.NativeService_StreamE setProcTitle("connected") defer setProcTitle("waiting") - eventCh := make(chan *pb.Event, 100) - - // Register this channel for events - s.eventM.Lock() - s.eventChs = append(s.eventChs, eventCh) - s.eventM.Unlock() - - // Unregister on exit - defer func() { - s.eventM.Lock() - defer s.eventM.Unlock() - for i, ch := range s.eventChs { - if ch == eventCh { - s.eventChs = append(s.eventChs[:i], s.eventChs[i+1:]...) - break - } - } - close(eventCh) - }() - // Stream events for { select { - case event := <-eventCh: + case event := <-s.eventCh: if err := stream.Send(event); err != nil { return err }