mirror of https://github.com/jetkvm/kvm.git
feat: handle grpc events (#986)
Co-authored-by: Siyuan <siyuan@buildjet.com>
This commit is contained in:
parent
07935add15
commit
85eb4babdf
|
|
@ -79,6 +79,18 @@ func NewGRPCClient(opts grpcClientOptions) (*GRPCClient, error) {
|
||||||
// Start event stream
|
// Start event stream
|
||||||
go grpcClient.startEventStream()
|
go grpcClient.startEventStream()
|
||||||
|
|
||||||
|
// Start event handler to process events from the channel
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case event := <-grpcClient.eventCh:
|
||||||
|
grpcClient.handleEvent(event)
|
||||||
|
case <-grpcClient.eventDone:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
return grpcClient, nil
|
return grpcClient, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -234,20 +246,6 @@ func (c *GRPCClient) handleEvent(event *pb.Event) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnEvent registers an event handler
|
|
||||||
func (c *GRPCClient) OnEvent(eventType string, handler func(data interface{})) {
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case event := <-c.eventCh:
|
|
||||||
c.handleEvent(event)
|
|
||||||
case <-c.eventDone:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close closes the gRPC client
|
// Close closes the gRPC client
|
||||||
func (c *GRPCClient) Close() error {
|
func (c *GRPCClient) Close() error {
|
||||||
c.closeM.Lock()
|
c.closeM.Lock()
|
||||||
|
|
|
||||||
|
|
@ -17,8 +17,10 @@ type grpcServer struct {
|
||||||
pb.UnimplementedNativeServiceServer
|
pb.UnimplementedNativeServiceServer
|
||||||
native *Native
|
native *Native
|
||||||
logger *zerolog.Logger
|
logger *zerolog.Logger
|
||||||
eventChs []chan *pb.Event
|
eventStreamChan chan *pb.Event
|
||||||
eventM sync.Mutex
|
eventStreamMu sync.Mutex
|
||||||
|
eventStreamCtx context.Context
|
||||||
|
eventStreamCancel context.CancelFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewGRPCServer creates a new gRPC server for the native service
|
// NewGRPCServer creates a new gRPC server for the native service
|
||||||
|
|
@ -26,7 +28,7 @@ func NewGRPCServer(n *Native, logger *zerolog.Logger) *grpcServer {
|
||||||
s := &grpcServer{
|
s := &grpcServer{
|
||||||
native: n,
|
native: n,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
eventChs: make([]chan *pb.Event, 0),
|
eventStreamChan: make(chan *pb.Event, 100),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store original callbacks and wrap them to also broadcast events
|
// Store original callbacks and wrap them to also broadcast events
|
||||||
|
|
@ -82,16 +84,7 @@ func NewGRPCServer(n *Native, logger *zerolog.Logger) *grpcServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *grpcServer) broadcastEvent(event *pb.Event) {
|
func (s *grpcServer) broadcastEvent(event *pb.Event) {
|
||||||
s.eventM.Lock()
|
s.eventStreamChan <- event
|
||||||
defer s.eventM.Unlock()
|
|
||||||
|
|
||||||
for _, ch := range s.eventChs {
|
|
||||||
select {
|
|
||||||
case ch <- event:
|
|
||||||
default:
|
|
||||||
// Channel full, skip
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *grpcServer) IsReady(ctx context.Context, req *pb.IsReadyRequest) (*pb.IsReadyResponse, error) {
|
func (s *grpcServer) IsReady(ctx context.Context, req *pb.IsReadyRequest) (*pb.IsReadyResponse, error) {
|
||||||
|
|
@ -103,35 +96,49 @@ func (s *grpcServer) StreamEvents(req *pb.Empty, stream pb.NativeService_StreamE
|
||||||
setProcTitle("connected")
|
setProcTitle("connected")
|
||||||
defer setProcTitle("waiting")
|
defer setProcTitle("waiting")
|
||||||
|
|
||||||
eventCh := make(chan *pb.Event, 100)
|
// Cancel previous stream if exists
|
||||||
|
s.eventStreamMu.Lock()
|
||||||
|
if s.eventStreamCancel != nil {
|
||||||
|
s.logger.Debug().Msg("cancelling previous StreamEvents call")
|
||||||
|
s.eventStreamCancel()
|
||||||
|
}
|
||||||
|
|
||||||
// Register this channel for events
|
// Create a cancellable context for this stream
|
||||||
s.eventM.Lock()
|
ctx, cancel := context.WithCancel(stream.Context())
|
||||||
s.eventChs = append(s.eventChs, eventCh)
|
s.eventStreamCtx = ctx
|
||||||
s.eventM.Unlock()
|
s.eventStreamCancel = cancel
|
||||||
|
s.eventStreamMu.Unlock()
|
||||||
|
|
||||||
// Unregister on exit
|
// Clean up when this stream ends
|
||||||
defer func() {
|
defer func() {
|
||||||
s.eventM.Lock()
|
s.eventStreamMu.Lock()
|
||||||
defer s.eventM.Unlock()
|
defer s.eventStreamMu.Unlock()
|
||||||
for i, ch := range s.eventChs {
|
if s.eventStreamCtx == ctx {
|
||||||
if ch == eventCh {
|
s.eventStreamCancel = nil
|
||||||
s.eventChs = append(s.eventChs[:i], s.eventChs[i+1:]...)
|
s.eventStreamCtx = nil
|
||||||
break
|
|
||||||
}
|
}
|
||||||
}
|
cancel()
|
||||||
close(eventCh)
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Stream events
|
// Stream events
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case event := <-eventCh:
|
case event := <-s.eventStreamChan:
|
||||||
|
// Check if this stream is still the active one
|
||||||
|
s.eventStreamMu.Lock()
|
||||||
|
isActive := s.eventStreamCtx == ctx
|
||||||
|
s.eventStreamMu.Unlock()
|
||||||
|
|
||||||
|
if !isActive {
|
||||||
|
s.logger.Debug().Msg("stream replaced by new call, exiting")
|
||||||
|
return context.Canceled
|
||||||
|
}
|
||||||
|
|
||||||
if err := stream.Send(event); err != nil {
|
if err := stream.Send(event); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
case <-stream.Context().Done():
|
case <-ctx.Done():
|
||||||
return stream.Context().Err()
|
return ctx.Err()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue