fix: use single channel for broadcasting events, as there won't be multiple StreamEventsClient anyway

This commit is contained in:
Siyuan 2025-11-20 14:50:40 +00:00
parent e3d2f96afd
commit 6ee260f0b8
1 changed files with 47 additions and 11 deletions

View File

@ -15,18 +15,20 @@ import (
// grpcServer wraps the Native instance and implements the gRPC service // grpcServer wraps the Native instance and implements the gRPC service
type grpcServer struct { type grpcServer struct {
pb.UnimplementedNativeServiceServer pb.UnimplementedNativeServiceServer
native *Native native *Native
logger *zerolog.Logger logger *zerolog.Logger
eventCh chan *pb.Event eventStreamChan chan *pb.Event
eventM sync.Mutex eventStreamMu sync.Mutex
eventStreamCtx context.Context
eventStreamCancel context.CancelFunc
} }
// NewGRPCServer creates a new gRPC server for the native service // NewGRPCServer creates a new gRPC server for the native service
func NewGRPCServer(n *Native, logger *zerolog.Logger) *grpcServer { func NewGRPCServer(n *Native, logger *zerolog.Logger) *grpcServer {
s := &grpcServer{ s := &grpcServer{
native: n, native: n,
logger: logger, logger: logger,
eventCh: make(chan *pb.Event, 100), eventStreamChan: make(chan *pb.Event, 100),
} }
// Store original callbacks and wrap them to also broadcast events // Store original callbacks and wrap them to also broadcast events
@ -82,7 +84,7 @@ func NewGRPCServer(n *Native, logger *zerolog.Logger) *grpcServer {
} }
func (s *grpcServer) broadcastEvent(event *pb.Event) { func (s *grpcServer) broadcastEvent(event *pb.Event) {
s.eventCh <- event s.eventStreamChan <- event
} }
func (s *grpcServer) IsReady(ctx context.Context, req *pb.IsReadyRequest) (*pb.IsReadyResponse, error) { func (s *grpcServer) IsReady(ctx context.Context, req *pb.IsReadyRequest) (*pb.IsReadyResponse, error) {
@ -94,15 +96,49 @@ func (s *grpcServer) StreamEvents(req *pb.Empty, stream pb.NativeService_StreamE
setProcTitle("connected") setProcTitle("connected")
defer setProcTitle("waiting") defer setProcTitle("waiting")
// Cancel previous stream if exists
s.eventStreamMu.Lock()
if s.eventStreamCancel != nil {
s.logger.Debug().Msg("cancelling previous StreamEvents call")
s.eventStreamCancel()
}
// Create a cancellable context for this stream
ctx, cancel := context.WithCancel(stream.Context())
s.eventStreamCtx = ctx
s.eventStreamCancel = cancel
s.eventStreamMu.Unlock()
// Clean up when this stream ends
defer func() {
s.eventStreamMu.Lock()
defer s.eventStreamMu.Unlock()
if s.eventStreamCtx == ctx {
s.eventStreamCancel = nil
s.eventStreamCtx = nil
}
cancel()
}()
// Stream events // Stream events
for { for {
select { select {
case event := <-s.eventCh: case event := <-s.eventStreamChan:
// Check if this stream is still the active one
s.eventStreamMu.Lock()
isActive := s.eventStreamCtx == ctx
s.eventStreamMu.Unlock()
if !isActive {
s.logger.Debug().Msg("stream replaced by new call, exiting")
return context.Canceled
}
if err := stream.Send(event); err != nil { if err := stream.Send(event); err != nil {
return err return err
} }
case <-stream.Context().Done(): case <-ctx.Done():
return stream.Context().Err() return ctx.Err()
} }
} }
} }