mirror of https://github.com/jetkvm/kvm.git
chore: use single channel for broadcasting events, as there won't be multiple StreamEventsClient anyway
This commit is contained in:
parent
08937b1197
commit
e3d2f96afd
|
|
@ -17,7 +17,7 @@ type grpcServer struct {
|
|||
pb.UnimplementedNativeServiceServer
|
||||
native *Native
|
||||
logger *zerolog.Logger
|
||||
eventChs []chan *pb.Event
|
||||
eventCh chan *pb.Event
|
||||
eventM sync.Mutex
|
||||
}
|
||||
|
||||
|
|
@ -26,7 +26,7 @@ func NewGRPCServer(n *Native, logger *zerolog.Logger) *grpcServer {
|
|||
s := &grpcServer{
|
||||
native: n,
|
||||
logger: logger,
|
||||
eventChs: make([]chan *pb.Event, 0),
|
||||
eventCh: make(chan *pb.Event, 100),
|
||||
}
|
||||
|
||||
// Store original callbacks and wrap them to also broadcast events
|
||||
|
|
@ -82,16 +82,7 @@ func NewGRPCServer(n *Native, logger *zerolog.Logger) *grpcServer {
|
|||
}
|
||||
|
||||
func (s *grpcServer) broadcastEvent(event *pb.Event) {
|
||||
s.eventM.Lock()
|
||||
defer s.eventM.Unlock()
|
||||
|
||||
for _, ch := range s.eventChs {
|
||||
select {
|
||||
case ch <- event:
|
||||
default:
|
||||
// Channel full, skip
|
||||
}
|
||||
}
|
||||
s.eventCh <- event
|
||||
}
|
||||
|
||||
func (s *grpcServer) IsReady(ctx context.Context, req *pb.IsReadyRequest) (*pb.IsReadyResponse, error) {
|
||||
|
|
@ -103,30 +94,10 @@ func (s *grpcServer) StreamEvents(req *pb.Empty, stream pb.NativeService_StreamE
|
|||
setProcTitle("connected")
|
||||
defer setProcTitle("waiting")
|
||||
|
||||
eventCh := make(chan *pb.Event, 100)
|
||||
|
||||
// Register this channel for events
|
||||
s.eventM.Lock()
|
||||
s.eventChs = append(s.eventChs, eventCh)
|
||||
s.eventM.Unlock()
|
||||
|
||||
// Unregister on exit
|
||||
defer func() {
|
||||
s.eventM.Lock()
|
||||
defer s.eventM.Unlock()
|
||||
for i, ch := range s.eventChs {
|
||||
if ch == eventCh {
|
||||
s.eventChs = append(s.eventChs[:i], s.eventChs[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
close(eventCh)
|
||||
}()
|
||||
|
||||
// Stream events
|
||||
for {
|
||||
select {
|
||||
case event := <-eventCh:
|
||||
case event := <-s.eventCh:
|
||||
if err := stream.Send(event); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue