fix: event handler

This commit is contained in:
Siyuan 2025-11-12 15:56:48 +00:00
parent a494f2f15f
commit d6c97d17ef
5 changed files with 49 additions and 64 deletions

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"io"
"sync" "sync"
"time" "time"
@ -60,52 +61,53 @@ func NewGRPCClient(socketPath string, logger *zerolog.Logger) (*GRPCClient, erro
} }
func (c *GRPCClient) startEventStream() { func (c *GRPCClient) startEventStream() {
// for { for {
// return c.closeM.Lock()
// c.closeM.Lock() if c.closed {
// if c.closed { c.closeM.Unlock()
// c.closeM.Unlock() return
// return }
// } c.closeM.Unlock()
// c.closeM.Unlock()
// ctx := context.Background() ctx := context.Background()
// stream, err := c.client.StreamEvents(ctx, &pb.Empty{}) stream, err := c.client.StreamEvents(ctx, &pb.Empty{})
// if err != nil { if err != nil {
// c.logger.Warn().Err(err).Msg("failed to start event stream, retrying...") c.logger.Warn().Err(err).Msg("failed to start event stream, retrying...")
// time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
// continue continue
// } }
// c.eventM.Lock() c.eventM.Lock()
// c.eventStream = stream c.eventStream = stream
// c.eventM.Unlock() c.eventM.Unlock()
// for { for {
// event, err := stream.Recv() event, err := stream.Recv()
// if err == io.EOF { if err == io.EOF {
// c.logger.Debug().Msg("event stream closed") c.logger.Debug().Msg("event stream closed")
// break break
// } }
// if err != nil { if err != nil {
// c.logger.Warn().Err(err).Msg("event stream error") c.logger.Warn().Err(err).Msg("event stream error")
// break break
// } }
// select { c.logger.Info().Str("type", event.Type).Msg("received event")
// case c.eventCh <- event:
// default:
// c.logger.Warn().Msg("event channel full, dropping event")
// }
// }
// c.eventM.Lock() select {
// c.eventStream = nil case c.eventCh <- event:
// c.eventM.Unlock() default:
c.logger.Warn().Msg("event channel full, dropping event")
}
}
// // Wait before retrying c.eventM.Lock()
// time.Sleep(1 * time.Second) c.eventStream = nil
// } c.eventM.Unlock()
// Wait before retrying
time.Sleep(1 * time.Second)
}
} }
func (c *GRPCClient) checkIsReady(ctx context.Context) error { func (c *GRPCClient) checkIsReady(ctx context.Context) error {

View File

@ -5,7 +5,6 @@ import (
"fmt" "fmt"
"net" "net"
"sync" "sync"
"time"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -36,7 +35,6 @@ func NewGRPCServer(n *Native, logger *zerolog.Logger) *grpcServer {
originalVideoStateChange := n.onVideoStateChange originalVideoStateChange := n.onVideoStateChange
originalIndevEvent := n.onIndevEvent originalIndevEvent := n.onIndevEvent
originalRpcEvent := n.onRpcEvent originalRpcEvent := n.onRpcEvent
originalVideoFrameReceived := n.onVideoFrameReceived
// Wrap callbacks to both call original and broadcast events // Wrap callbacks to both call original and broadcast events
n.onVideoStateChange = func(state VideoState) { n.onVideoStateChange = func(state VideoState) {
@ -82,21 +80,6 @@ func NewGRPCServer(n *Native, logger *zerolog.Logger) *grpcServer {
}) })
} }
n.onVideoFrameReceived = func(frame []byte, duration time.Duration) {
if originalVideoFrameReceived != nil {
originalVideoFrameReceived(frame, duration)
}
s.broadcastEvent(&pb.Event{
Type: "video_frame",
Data: &pb.Event_VideoFrame{
VideoFrame: &pb.VideoFrame{
Frame: frame,
DurationNs: duration.Nanoseconds(),
},
},
})
}
return s return s
} }

View File

@ -1,14 +1,11 @@
package native package native
import ( import (
"os"
"github.com/jetkvm/kvm/internal/logging" "github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog" "github.com/rs/zerolog"
) )
var nativeL = logging.GetSubsystemLogger("native").With().Int("pid", os.Getpid()).Logger() var nativeLogger = logging.GetSubsystemLogger("native")
var nativeLogger = &nativeL
var displayLogger = logging.GetSubsystemLogger("display") var displayLogger = logging.GetSubsystemLogger("display")
type nativeLogMessage struct { type nativeLogMessage struct {

View File

@ -40,6 +40,9 @@ type NativeOptions struct {
} }
func NewNative(opts NativeOptions) *Native { func NewNative(opts NativeOptions) *Native {
nativeSubLogger := nativeLogger.With().Str("scope", "native").Logger()
displaySubLogger := displayLogger.With().Str("scope", "native").Logger()
onVideoStateChange := opts.OnVideoStateChange onVideoStateChange := opts.OnVideoStateChange
if onVideoStateChange == nil { if onVideoStateChange == nil {
onVideoStateChange = func(state VideoState) { onVideoStateChange = func(state VideoState) {
@ -78,8 +81,8 @@ func NewNative(opts NativeOptions) *Native {
return &Native{ return &Native{
disable: opts.Disable, disable: opts.Disable,
ready: make(chan struct{}), ready: make(chan struct{}),
l: nativeLogger, l: &nativeSubLogger,
lD: displayLogger, lD: &displaySubLogger,
systemVersion: opts.SystemVersion, systemVersion: opts.SystemVersion,
appVersion: opts.AppVersion, appVersion: opts.AppVersion,
displayRotation: opts.DisplayRotation, displayRotation: opts.DisplayRotation,

View File

@ -162,7 +162,7 @@ func (p *NativeProxy) spawnProcess() (*cmdWrapper, error) {
p.binaryPath, p.binaryPath,
"-subcomponent=native", "-subcomponent=native",
) )
cmd.Stdout = os.Stdout // Forward stdout to parent // cmd.Stdout = os.Stdout // Forward stdout to parent
cmd.Stderr = os.Stderr // Forward stderr to parent cmd.Stderr = os.Stderr // Forward stderr to parent
// Set environment variable to indicate native process mode // Set environment variable to indicate native process mode
cmd.Env = append( cmd.Env = append(