From b49d67c87d3e92fcd3c3e7787e2faf761606cd2f Mon Sep 17 00:00:00 2001 From: Siyuan Date: Thu, 6 Nov 2025 22:40:44 +0000 Subject: [PATCH] fix: RX goroutine cleanup --- internal/lldp/rx.go | 44 +++++++++++++++++++++++++++++--------------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/internal/lldp/rx.go b/internal/lldp/rx.go index a903ebbd..cf8fb543 100644 --- a/internal/lldp/rx.go +++ b/internal/lldp/rx.go @@ -87,23 +87,25 @@ func (l *LLDP) setUpCapture() error { return nil } -func (l *LLDP) doCapture(logger *zerolog.Logger, rxCtx context.Context) { - defer func() { - l.mu.Lock() - l.rxRunning = false - l.mu.Unlock() - - logger.Info().Msg("RX goroutine finished") - - l.rxWaitGroup.Done() - }() - +func (l *LLDP) doCapture(logger *zerolog.Logger) { l.rxWaitGroup.Add(1) + defer l.rxWaitGroup.Done() // TODO: use a channel to handle the packets // PacketSource.Packets() is not reliable and can cause panics and the upstream hasn't fixed it yet - for rxCtx.Err() == nil { + for { + // check if the context is done before blocking call + select { + case <-l.rxCtx.Done(): + logger.Info().Msg("RX context cancelled") + return + default: + } + + logger.Trace().Msg("waiting for next packet") packet, err := l.pktSourceRx.NextPacket() + logger.Trace().Interface("packet", packet).Err(err).Msg("got next packet") + if err != nil { logger.Error(). Err(err). @@ -152,9 +154,7 @@ func (l *LLDP) startCapture() error { l.rxCtx, l.rxCancel = context.WithCancel(context.Background()) l.rxRunning = true - // Capture context in closure - rxCtx := l.rxCtx - go l.doCapture(&logger, rxCtx) + go l.doCapture(&logger) return nil } @@ -363,11 +363,25 @@ func (l *LLDP) stopCapture() error { logger.Info().Msg("cancelled RX context, waiting for goroutine to finish") } + // stop the TPacketRx + go func() { + if l.tPacketRx == nil { + return + } + + // write an empty packet to the TPacketRx to interrupt the blocking read + // it's a shitty workaround until https://github.com/google/gopacket/pull/777 is merged, + // or we have a better solution, see https://github.com/google/gopacket/issues/1064 + l.tPacketRx.WritePacketData([]byte{}) + }() + // wait for the goroutine to finish start := time.Now() l.rxWaitGroup.Wait() logger.Info().Dur("duration", time.Since(start)).Msg("RX goroutine finished") + l.rxRunning = false + if l.tPacketRx != nil { logger.Info().Msg("closing TPacketRx") l.tPacketRx.Close()