diff --git a/internal/native/grpc_client.go b/internal/native/grpc_client.go index 4ea7bcb2..297bc175 100644 --- a/internal/native/grpc_client.go +++ b/internal/native/grpc_client.go @@ -82,6 +82,10 @@ func NewGRPCClient(opts grpcClientOptions) (*GRPCClient, error) { return grpcClient, nil } +func (c *GRPCClient) getContext() context.Context { + return c.ctx +} + func (c *GRPCClient) handleEventStream(stream pb.NativeService_StreamEventsClient) { c.eventM.Lock() c.eventStream = stream @@ -135,6 +139,14 @@ func (c *GRPCClient) startEventStream() { } c.closeM.Unlock() + // check if the context is done + select { + case <-c.ctx.Done(): + c.logger.Info().Msg("event stream context done, closing") + return + default: + } + stream, err := c.client.StreamEvents(c.ctx, &pb.Empty{}) if err != nil { c.logger.Warn().Err(err).Msg("failed to start event stream, retrying ...") diff --git a/internal/native/proxy.go b/internal/native/proxy.go index 74896e58..b5091a19 100644 --- a/internal/native/proxy.go +++ b/internal/native/proxy.go @@ -1,6 +1,7 @@ package native import ( + "context" "crypto/rand" "encoding/hex" "fmt" @@ -119,16 +120,15 @@ type NativeProxy struct { videoStreamListener net.Listener binaryPath string - client *GRPCClient - clientMu sync.RWMutex - cmd *cmdWrapper - logger *zerolog.Logger - ready chan struct{} - options *nativeProxyOptions - restartMu sync.Mutex - restarts uint - stopped bool - processWait chan error + client *GRPCClient + clientMu sync.RWMutex + cmd *cmdWrapper + logger *zerolog.Logger + ready chan struct{} + options *nativeProxyOptions + restartMu sync.Mutex + restarts uint + stopped bool } // NewNativeProxy creates a new NativeProxy that spawns a separate process @@ -149,7 +149,6 @@ func NewNativeProxy(opts NativeOptions) (*NativeProxy, error) { logger: nativeLogger, ready: make(chan struct{}), options: proxyOptions, - processWait: make(chan error, 1), restarts: 0, } @@ -308,7 +307,7 @@ func (p *NativeProxy) setUpGRPCClient() error { } // Start monitoring process for crashes - go p.monitorProcess() + go p.monitorProcess(client.getContext()) return nil } @@ -366,8 +365,14 @@ func (p *NativeProxy) Start() error { } // monitorProcess monitors the native process and restarts it if it crashes -func (p *NativeProxy) monitorProcess() { +func (p *NativeProxy) monitorProcess(ctx context.Context) { for { + select { + case <-ctx.Done(): + return + default: + } + p.restartMu.Lock() cmd := p.cmd stopped := p.stopped @@ -382,8 +387,11 @@ func (p *NativeProxy) monitorProcess() { } err := cmd.Wait() + // check if the context is done + // if yes, it means that there should be already one restart in progress select { - case p.processWait <- err: + case <-ctx.Done(): + return default: }