From 621be3c1d9a37aead171261813f3760c09b9fc93 Mon Sep 17 00:00:00 2001 From: Siyuan Date: Thu, 6 Nov 2025 21:54:56 +0000 Subject: [PATCH] fix: PR issues --- internal/lldp/lldp.go | 13 ++++++---- internal/lldp/neigh.go | 20 +++++++++------ internal/lldp/rx.go | 56 ++++++++++++++++++++---------------------- internal/lldp/tx.go | 6 ++--- 4 files changed, 50 insertions(+), 45 deletions(-) diff --git a/internal/lldp/lldp.go b/internal/lldp/lldp.go index e9c24986..3c693707 100644 --- a/internal/lldp/lldp.go +++ b/internal/lldp/lldp.go @@ -30,15 +30,17 @@ type LLDP struct { advertiseOptions *AdvertiseOptions onChange func(neighbors []Neighbor) - neighbors *ttlcache.Cache[string, Neighbor] + neighbors *ttlcache.Cache[neighborCacheKey, Neighbor] // State tracking - rxRunning bool txRunning bool txCtx context.Context txCancel context.CancelFunc - rxCtx context.Context - rxCancel context.CancelFunc + + rxRunning bool + rxWaitGroup *sync.WaitGroup + rxCtx context.Context + rxCancel context.CancelFunc } type AdvertiseOptions struct { @@ -72,8 +74,9 @@ func NewLLDP(opts *Options) *LLDP { advertiseOptions: opts.AdvertiseOptions, enableRx: opts.EnableRx, enableTx: opts.EnableTx, + rxWaitGroup: &sync.WaitGroup{}, l: opts.Logger, - neighbors: ttlcache.New(ttlcache.WithTTL[string, Neighbor](1 * time.Hour)), + neighbors: ttlcache.New(ttlcache.WithTTL[neighborCacheKey, Neighbor](1 * time.Hour)), onChange: opts.OnChange, } } diff --git a/internal/lldp/neigh.go b/internal/lldp/neigh.go index b3460738..3a951c64 100644 --- a/internal/lldp/neigh.go +++ b/internal/lldp/neigh.go @@ -1,7 +1,6 @@ package lldp import ( - "fmt" "time" ) @@ -27,22 +26,28 @@ type Neighbor struct { Values map[string]string `json:"values"` } -func (n *Neighbor) cacheKey() string { - return fmt.Sprintf("%s-%s", n.Mac, n.Source) +type neighborCacheKey struct { + mac string + source string +} + +func (n *Neighbor) cacheKey() neighborCacheKey { + return neighborCacheKey{mac: n.Mac, source: n.Source} } func (l *LLDP) addNeighbor(neighbor *Neighbor, ttl time.Duration) { logger := l.l.With(). + Str("source", neighbor.Source). Str("mac", neighbor.Mac). Interface("neighbor", neighbor). Logger() key := neighbor.cacheKey() - current_neigh := l.neighbors.Get(key) - if current_neigh != nil { - current_source := current_neigh.Value().Source - if current_source == "lldp" && neighbor.Source != "lldp" { + currentNeighbor := l.neighbors.Get(key) + if currentNeighbor != nil { + currentSource := currentNeighbor.Value().Source + if currentSource == "lldp" && neighbor.Source != "lldp" { logger.Info().Msg("skip updating neighbor, as LLDP has higher priority") return } @@ -56,6 +61,7 @@ func (l *LLDP) addNeighbor(neighbor *Neighbor, ttl time.Duration) { func (l *LLDP) deleteNeighbor(neighbor *Neighbor) { logger := l.l.With(). + Str("source", neighbor.Source). Str("mac", neighbor.Mac). Logger() diff --git a/internal/lldp/rx.go b/internal/lldp/rx.go index 7ccb1033..a903ebbd 100644 --- a/internal/lldp/rx.go +++ b/internal/lldp/rx.go @@ -92,46 +92,40 @@ func (l *LLDP) doCapture(logger *zerolog.Logger, rxCtx context.Context) { l.mu.Lock() l.rxRunning = false l.mu.Unlock() + + logger.Info().Msg("RX goroutine finished") + + l.rxWaitGroup.Done() }() + l.rxWaitGroup.Add(1) + // TODO: use a channel to handle the packets // PacketSource.Packets() is not reliable and can cause panics and the upstream hasn't fixed it yet for rxCtx.Err() == nil { - if l.pktSourceRx == nil || l.tPacketRx == nil { - logger.Error().Msg("packet source or TPacketRx not initialized") - break - } - packet, err := l.pktSourceRx.NextPacket() - if err == nil { - if handleErr := l.handlePacket(packet, logger); handleErr != nil { - logger.Error(). - Err(handleErr). - Msg("error handling packet") + if err != nil { + logger.Error(). + Err(err). + Msg("error getting next packet") + + // Immediately break for known unrecoverable errors + if err == io.EOF || err == io.ErrUnexpectedEOF || + err == io.ErrNoProgress || err == io.ErrClosedPipe || err == io.ErrShortBuffer || + err == syscall.EBADF || + strings.Contains(err.Error(), "use of closed file") { + return } + continue } - // Immediately retry for temporary network errors and EAGAIN - // temporary has been deprecated and most cases are timeouts - if nerr, ok := err.(net.Error); ok && nerr.Timeout() { + if err := l.handlePacket(packet, logger); err != nil { + logger.Error(). + Err(err). + Msg("error handling packet") continue } - if err == syscall.EAGAIN { - continue - } - - // Immediately break for known unrecoverable errors - if err == io.EOF || err == io.ErrUnexpectedEOF || - err == io.ErrNoProgress || err == io.ErrClosedPipe || err == io.ErrShortBuffer || - err == syscall.EBADF || - strings.Contains(err.Error(), "use of closed file") { - break - } - - logger.Error(). - Err(err). - Msg("error receiving LLDP packet") } } @@ -369,8 +363,10 @@ func (l *LLDP) stopCapture() error { logger.Info().Msg("cancelled RX context, waiting for goroutine to finish") } - // Wait a bit for goroutine to finish - time.Sleep(500 * time.Millisecond) + // wait for the goroutine to finish + start := time.Now() + l.rxWaitGroup.Wait() + logger.Info().Dur("duration", time.Since(start)).Msg("RX goroutine finished") if l.tPacketRx != nil { logger.Info().Msg("closing TPacketRx") diff --git a/internal/lldp/tx.go b/internal/lldp/tx.go index 745414ca..658c9d4c 100644 --- a/internal/lldp/tx.go +++ b/internal/lldp/tx.go @@ -60,10 +60,10 @@ var ( func toLLDPCapabilitiesBytes(capabilities []string) uint16 { r := uint16(0) for _, capability := range capabilities { - if _, ok := capabilityMap[capability]; !ok { - continue + mask, ok := capabilityMap[capability] + if ok { + r |= mask } - r |= capabilityMap[capability] } return r }