kvm/internal/native/grpc_server.go

165 lines
4.0 KiB
Go

package native
import (
"context"
"fmt"
"net"
"sync"
"github.com/rs/zerolog"
"google.golang.org/grpc"
pb "github.com/jetkvm/kvm/internal/native/proto"
)
// grpcServer wraps the Native instance and implements the gRPC service
type grpcServer struct {
pb.UnimplementedNativeServiceServer
native *Native
logger *zerolog.Logger
eventStreamChan chan *pb.Event
eventStreamMu sync.Mutex
eventStreamCtx context.Context
eventStreamCancel context.CancelFunc
}
// NewGRPCServer creates a new gRPC server for the native service
func NewGRPCServer(n *Native, logger *zerolog.Logger) *grpcServer {
s := &grpcServer{
native: n,
logger: logger,
eventStreamChan: make(chan *pb.Event, 100),
}
// Store original callbacks and wrap them to also broadcast events
originalVideoStateChange := n.onVideoStateChange
originalIndevEvent := n.onIndevEvent
originalRpcEvent := n.onRpcEvent
// Wrap callbacks to both call original and broadcast events
n.onVideoStateChange = func(state VideoState) {
if originalVideoStateChange != nil {
originalVideoStateChange(state)
}
event := &pb.Event{
Type: "video_state_change",
Data: &pb.Event_VideoState{
VideoState: &pb.VideoState{
Ready: state.Ready,
Error: state.Error,
Width: int32(state.Width),
Height: int32(state.Height),
FramePerSecond: state.FramePerSecond,
},
},
}
s.broadcastEvent(event)
}
n.onIndevEvent = func(event string) {
if originalIndevEvent != nil {
originalIndevEvent(event)
}
s.broadcastEvent(&pb.Event{
Type: "indev_event",
Data: &pb.Event_IndevEvent{
IndevEvent: event,
},
})
}
n.onRpcEvent = func(event string) {
if originalRpcEvent != nil {
originalRpcEvent(event)
}
s.broadcastEvent(&pb.Event{
Type: "rpc_event",
Data: &pb.Event_RpcEvent{
RpcEvent: event,
},
})
}
return s
}
func (s *grpcServer) broadcastEvent(event *pb.Event) {
s.eventStreamChan <- event
}
func (s *grpcServer) IsReady(ctx context.Context, req *pb.IsReadyRequest) (*pb.IsReadyResponse, error) {
return &pb.IsReadyResponse{Ready: true, VideoReady: true}, nil
}
// StreamEvents streams events from the native process
func (s *grpcServer) StreamEvents(req *pb.Empty, stream pb.NativeService_StreamEventsServer) error {
setProcTitle("connected")
defer setProcTitle("waiting")
// Cancel previous stream if exists
s.eventStreamMu.Lock()
if s.eventStreamCancel != nil {
s.logger.Debug().Msg("cancelling previous StreamEvents call")
s.eventStreamCancel()
}
// Create a cancellable context for this stream
ctx, cancel := context.WithCancel(stream.Context())
s.eventStreamCtx = ctx
s.eventStreamCancel = cancel
s.eventStreamMu.Unlock()
// Clean up when this stream ends
defer func() {
s.eventStreamMu.Lock()
defer s.eventStreamMu.Unlock()
if s.eventStreamCtx == ctx {
s.eventStreamCancel = nil
s.eventStreamCtx = nil
}
cancel()
}()
// Stream events
for {
select {
case event := <-s.eventStreamChan:
// Check if this stream is still the active one
s.eventStreamMu.Lock()
isActive := s.eventStreamCtx == ctx
s.eventStreamMu.Unlock()
if !isActive {
s.logger.Debug().Msg("stream replaced by new call, exiting")
return context.Canceled
}
if err := stream.Send(event); err != nil {
return err
}
case <-ctx.Done():
return ctx.Err()
}
}
}
// StartGRPCServer starts the gRPC server on a Unix domain socket
func StartGRPCServer(server *grpcServer, socketPath string, logger *zerolog.Logger) (*grpc.Server, net.Listener, error) {
lis, err := net.Listen("unix", socketPath)
if err != nil {
return nil, nil, fmt.Errorf("failed to listen on socket: %w", err)
}
s := grpc.NewServer()
pb.RegisterNativeServiceServer(s, server)
go func() {
if err := s.Serve(lis); err != nil {
logger.Error().Err(err).Msg("gRPC server error")
}
}()
logger.Info().Str("socket", socketPath).Msg("gRPC server started")
return s, lis, nil
}