diff --git a/internal/native/grpc_client.go b/internal/native/grpc_client.go index c28ceb6c..4ea7bcb2 100644 --- a/internal/native/grpc_client.go +++ b/internal/native/grpc_client.go @@ -20,6 +20,9 @@ import ( // GRPCClient wraps the gRPC client for the native service type GRPCClient struct { + ctx context.Context + cancel context.CancelFunc + conn *grpc.ClientConn client pb.NativeServiceClient logger *zerolog.Logger @@ -58,7 +61,11 @@ func NewGRPCClient(opts grpcClientOptions) (*GRPCClient, error) { client := pb.NewNativeServiceClient(conn) + ctx, cancel := context.WithCancel(context.Background()) + grpcClient := &GRPCClient{ + ctx: ctx, + cancel: cancel, conn: conn, client: client, logger: opts.Logger, @@ -83,8 +90,8 @@ func (c *GRPCClient) handleEventStream(stream pb.NativeService_StreamEventsClien c.eventM.Unlock() }() - logger := *c.logger for { + logger := c.logger.With().Interface("stream", stream).Logger() if stream == nil { logger.Error().Msg("event stream is nil") break @@ -128,8 +135,7 @@ func (c *GRPCClient) startEventStream() { } c.closeM.Unlock() - ctx := context.Background() - stream, err := c.client.StreamEvents(ctx, &pb.Empty{}) + stream, err := c.client.StreamEvents(c.ctx, &pb.Empty{}) if err != nil { c.logger.Warn().Err(err).Msg("failed to start event stream, retrying ...") time.Sleep(5 * time.Second) @@ -161,7 +167,7 @@ func (c *GRPCClient) checkIsReady(ctx context.Context) error { // WaitReady waits for the gRPC connection to be ready func (c *GRPCClient) WaitReady() error { - ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + ctx, cancel := context.WithTimeout(c.ctx, 60*time.Second) defer cancel() prevState := connectivity.Idle @@ -243,6 +249,9 @@ func (c *GRPCClient) Close() error { } c.closed = true + // cancel all ongoing operations + c.cancel() + close(c.eventDone) c.eventM.Lock()