diff --git a/internal/native/grpc_client.go b/internal/native/grpc_client.go index ffb6e5b7..86345935 100644 --- a/internal/native/grpc_client.go +++ b/internal/native/grpc_client.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "io" "sync" "time" @@ -60,52 +61,53 @@ func NewGRPCClient(socketPath string, logger *zerolog.Logger) (*GRPCClient, erro } func (c *GRPCClient) startEventStream() { - // for { - // return - // c.closeM.Lock() - // if c.closed { - // c.closeM.Unlock() - // return - // } - // c.closeM.Unlock() + for { + c.closeM.Lock() + if c.closed { + c.closeM.Unlock() + return + } + c.closeM.Unlock() - // ctx := context.Background() - // stream, err := c.client.StreamEvents(ctx, &pb.Empty{}) - // if err != nil { - // c.logger.Warn().Err(err).Msg("failed to start event stream, retrying...") - // time.Sleep(1 * time.Second) - // continue - // } + ctx := context.Background() + stream, err := c.client.StreamEvents(ctx, &pb.Empty{}) + if err != nil { + c.logger.Warn().Err(err).Msg("failed to start event stream, retrying...") + time.Sleep(1 * time.Second) + continue + } - // c.eventM.Lock() - // c.eventStream = stream - // c.eventM.Unlock() + c.eventM.Lock() + c.eventStream = stream + c.eventM.Unlock() - // for { - // event, err := stream.Recv() - // if err == io.EOF { - // c.logger.Debug().Msg("event stream closed") - // break - // } - // if err != nil { - // c.logger.Warn().Err(err).Msg("event stream error") - // break - // } + for { + event, err := stream.Recv() + if err == io.EOF { + c.logger.Debug().Msg("event stream closed") + break + } + if err != nil { + c.logger.Warn().Err(err).Msg("event stream error") + break + } - // select { - // case c.eventCh <- event: - // default: - // c.logger.Warn().Msg("event channel full, dropping event") - // } - // } + c.logger.Info().Str("type", event.Type).Msg("received event") - // c.eventM.Lock() - // c.eventStream = nil - // c.eventM.Unlock() + select { + case c.eventCh <- event: + default: + c.logger.Warn().Msg("event channel full, dropping event") + } + } - // // Wait before retrying - // time.Sleep(1 * time.Second) - // } + c.eventM.Lock() + c.eventStream = nil + c.eventM.Unlock() + + // Wait before retrying + time.Sleep(1 * time.Second) + } } func (c *GRPCClient) checkIsReady(ctx context.Context) error { diff --git a/internal/native/grpc_server.go b/internal/native/grpc_server.go index e00a3908..bfa2fc25 100644 --- a/internal/native/grpc_server.go +++ b/internal/native/grpc_server.go @@ -5,7 +5,6 @@ import ( "fmt" "net" "sync" - "time" "github.com/rs/zerolog" "google.golang.org/grpc" @@ -36,7 +35,6 @@ func NewGRPCServer(n *Native, logger *zerolog.Logger) *grpcServer { originalVideoStateChange := n.onVideoStateChange originalIndevEvent := n.onIndevEvent originalRpcEvent := n.onRpcEvent - originalVideoFrameReceived := n.onVideoFrameReceived // Wrap callbacks to both call original and broadcast events n.onVideoStateChange = func(state VideoState) { @@ -82,21 +80,6 @@ func NewGRPCServer(n *Native, logger *zerolog.Logger) *grpcServer { }) } - n.onVideoFrameReceived = func(frame []byte, duration time.Duration) { - if originalVideoFrameReceived != nil { - originalVideoFrameReceived(frame, duration) - } - s.broadcastEvent(&pb.Event{ - Type: "video_frame", - Data: &pb.Event_VideoFrame{ - VideoFrame: &pb.VideoFrame{ - Frame: frame, - DurationNs: duration.Nanoseconds(), - }, - }, - }) - } - return s } diff --git a/internal/native/log.go b/internal/native/log.go index d74b81a2..41ae4df9 100644 --- a/internal/native/log.go +++ b/internal/native/log.go @@ -1,14 +1,11 @@ package native import ( - "os" - "github.com/jetkvm/kvm/internal/logging" "github.com/rs/zerolog" ) -var nativeL = logging.GetSubsystemLogger("native").With().Int("pid", os.Getpid()).Logger() -var nativeLogger = &nativeL +var nativeLogger = logging.GetSubsystemLogger("native") var displayLogger = logging.GetSubsystemLogger("display") type nativeLogMessage struct { diff --git a/internal/native/native.go b/internal/native/native.go index f60573d6..5d126f32 100644 --- a/internal/native/native.go +++ b/internal/native/native.go @@ -40,6 +40,9 @@ type NativeOptions struct { } func NewNative(opts NativeOptions) *Native { + nativeSubLogger := nativeLogger.With().Str("scope", "native").Logger() + displaySubLogger := displayLogger.With().Str("scope", "native").Logger() + onVideoStateChange := opts.OnVideoStateChange if onVideoStateChange == nil { onVideoStateChange = func(state VideoState) { @@ -78,8 +81,8 @@ func NewNative(opts NativeOptions) *Native { return &Native{ disable: opts.Disable, ready: make(chan struct{}), - l: nativeLogger, - lD: displayLogger, + l: &nativeSubLogger, + lD: &displaySubLogger, systemVersion: opts.SystemVersion, appVersion: opts.AppVersion, displayRotation: opts.DisplayRotation, diff --git a/internal/native/proxy.go b/internal/native/proxy.go index 60e3bea2..e31a53c0 100644 --- a/internal/native/proxy.go +++ b/internal/native/proxy.go @@ -162,7 +162,7 @@ func (p *NativeProxy) spawnProcess() (*cmdWrapper, error) { p.binaryPath, "-subcomponent=native", ) - cmd.Stdout = os.Stdout // Forward stdout to parent + // cmd.Stdout = os.Stdout // Forward stdout to parent cmd.Stderr = os.Stderr // Forward stderr to parent // Set environment variable to indicate native process mode cmd.Env = append(