Restructure to ensure we're releasing locks

We were holding locks too long and doing more structure copying than optimal.
This commit is contained in:
Marc Brooks 2025-11-10 17:18:04 -06:00
parent af8bfc702c
commit b6c1e49803
No known key found for this signature in database
GPG Key ID: 583A6AF2D6AE1DC6
8 changed files with 205 additions and 149 deletions

View File

@ -7,6 +7,7 @@ import (
"strconv"
"sync"
"github.com/guregu/null/v6"
"github.com/jetkvm/kvm/internal/confparser"
"github.com/jetkvm/kvm/internal/logging"
"github.com/jetkvm/kvm/internal/network/types"
@ -254,6 +255,16 @@ func LoadConfig() {
loadedConfig.KeyboardLayout = "en-US"
}
// fixup old lldp modes enabled and all are now rx_and_tx
if loadedConfig.NetworkConfig.LLDPMode.String == "enabled" || loadedConfig.NetworkConfig.LLDPMode.String == "all" {
loadedConfig.NetworkConfig.LLDPMode = null.StringFrom("rx_and_tx")
}
// fixup old lldp mode basic is now rx_only
if loadedConfig.NetworkConfig.LLDPMode.String == "basic" {
loadedConfig.NetworkConfig.LLDPMode = null.StringFrom("rx_only")
}
config = &loadedConfig
logging.GetRootLogger().UpdateLogLevel(config.DefaultLogLevel)

View File

@ -39,7 +39,7 @@ type testNetworkConfig struct {
IPv6Mode null.String `json:"ipv6_mode" one_of:"slaac,dhcpv6,slaac_and_dhcpv6,static,link_local,disabled" default:"slaac"`
IPv6Static *testIPv6StaticConfig `json:"ipv6_static,omitempty" required_if:"IPv6Mode=static"`
LLDPMode null.String `json:"lldp_mode,omitempty" one_of:"disabled,basic,all" default:"basic"`
LLDPMode null.String `json:"lldp_mode,omitempty" one_of:"disabled,enabled,rx_only,tx_only,rx_and_tx,basic,all" default:"rx_and_tx"`
LLDPTxTLVs []string `json:"lldp_tx_tlvs,omitempty" one_of:"chassis,port,system,vlan" default:"chassis,port,system,vlan"`
MDNSMode null.String `json:"mdns_mode,omitempty" one_of:"disabled,auto,ipv4_only,ipv6_only" default:"auto"`
TimeSyncMode null.String `json:"time_sync_mode,omitempty" one_of:"ntp_only,ntp_and_http,http_only,custom" default:"ntp_and_http"`

View File

@ -14,16 +14,18 @@ import (
var defaultLogger = logging.GetSubsystemLogger("lldp")
type RunningState struct {
Enabled bool
Running bool
Logger *zerolog.Logger
TPacket *afpacket.TPacket
Ctx context.Context
Cancel context.CancelFunc
}
type LLDP struct {
mu sync.RWMutex
l *zerolog.Logger
tPacketRx *afpacket.TPacket
tPacketTx *afpacket.TPacket
pktSourceRx *gopacket.PacketSource
enableRx bool
enableTx bool
l *zerolog.Logger
interfaceName string
advertiseOptions *AdvertiseOptions
@ -32,15 +34,11 @@ type LLDP struct {
neighbors map[neighborCacheKey]Neighbor
neighborsMu sync.RWMutex
// State tracking
txRunning bool
txCtx context.Context
txCancel context.CancelFunc
Tx *RunningState
Rx *RunningState
rxRunning bool
pktSourceRx *gopacket.PacketSource
rxWaitGroup *sync.WaitGroup
rxCtx context.Context
rxCancel context.CancelFunc
}
type AdvertiseOptions struct {
@ -74,24 +72,32 @@ func NewLLDP(opts *Options) *LLDP {
return &LLDP{
interfaceName: opts.InterfaceName,
advertiseOptions: opts.AdvertiseOptions,
enableRx: opts.EnableRx,
enableTx: opts.EnableTx,
rxWaitGroup: &sync.WaitGroup{},
l: opts.Logger,
neighbors: make(map[neighborCacheKey]Neighbor),
onChange: opts.OnChange,
Rx: &RunningState{
Enabled: opts.EnableRx,
},
Tx: &RunningState{
Enabled: opts.EnableTx,
},
rxWaitGroup: &sync.WaitGroup{},
l: opts.Logger,
neighbors: make(map[neighborCacheKey]Neighbor),
onChange: opts.OnChange,
}
}
func (l *LLDP) Start() error {
if l.enableRx {
l.mu.RLock()
rxEnabled, txEnabled := l.Rx.Enabled, l.Tx.Enabled
l.mu.RUnlock()
if rxEnabled {
if err := l.startRx(); err != nil {
return fmt.Errorf("failed to start RX: %w", err)
}
}
// Start TX if enabled
if l.enableTx {
if txEnabled {
if err := l.startTx(); err != nil {
return fmt.Errorf("failed to start TX: %w", err)
}
@ -103,25 +109,22 @@ func (l *LLDP) Start() error {
// StartRx starts the LLDP receiver if not already running
func (l *LLDP) startRx() error {
l.mu.RLock()
running := l.rxRunning
enabled := l.enableRx
running := l.Rx.Running
enabled := l.Rx.Enabled
l.mu.RUnlock()
if running || !enabled {
return nil
}
if err := l.setUpCapture(); err != nil {
return fmt.Errorf("failed to set up capture: %w", err)
}
return l.startCapture()
}
// SetAdvertiseOptions updates the advertise options and resends LLDP packets if TX is running
func (l *LLDP) SetAdvertiseOptions(opts *AdvertiseOptions) error {
l.mu.Lock()
txRunning := l.txRunning
logger := l.l
txRunning := l.Tx.Running
l.advertiseOptions = opts
l.mu.Unlock()
@ -130,7 +133,7 @@ func (l *LLDP) SetAdvertiseOptions(opts *AdvertiseOptions) error {
if err := l.sendTxPackets(); err != nil {
return fmt.Errorf("failed to resend LLDP packet with new options: %w", err)
}
l.l.Info().Msg("advertise options changed, resent LLDP packet")
logger.Info().Msg("advertise options changed, resent LLDP packet")
}
return nil
@ -138,8 +141,8 @@ func (l *LLDP) SetAdvertiseOptions(opts *AdvertiseOptions) error {
func (l *LLDP) SetRxAndTx(rx, tx bool) error {
l.mu.Lock()
l.enableRx = rx
l.enableTx = tx
l.Rx.Enabled = rx
l.Tx.Enabled = tx
l.mu.Unlock()
// if rx is enabled, start the RX

View File

@ -2,6 +2,7 @@ package lldp
import (
"context"
"errors"
"fmt"
"io"
"net"
@ -10,6 +11,7 @@ import (
"time"
"github.com/google/gopacket"
"github.com/google/gopacket/afpacket"
"github.com/google/gopacket/layers"
"github.com/rs/zerolog"
)
@ -30,21 +32,7 @@ var multicastAddrs = []string{
"01:00:0C:CC:CC:CC",
}
func (l *LLDP) setUpCapture() error {
l.mu.Lock()
defer l.mu.Unlock()
if l.tPacketRx != nil {
return nil
}
logger := l.l.With().Str("interface", l.interfaceName).Logger()
tPacketRx, err := afPacketNewTPacket(l.interfaceName)
if err != nil {
return err
}
logger.Info().Msg("created TPacketRx")
func (l *LLDP) setUpPacketSourceUnderLock(tPacket *afpacket.TPacket, logger *zerolog.Logger) (*gopacket.PacketSource, error) {
// set up multicast addresses
// otherwise the kernel might discard the packets
// another workaround would be to enable promiscuous mode but that's too tricky
@ -72,23 +60,30 @@ func (l *LLDP) setUpCapture() error {
Msg("added multicast address")
}
if err = tPacketRx.SetBPF(bpfFilter); err != nil {
if err := tPacket.SetBPF(bpfFilter); err != nil {
logger.Error().
Err(err).
Msg("unable to set BPF filter")
tPacketRx.Close()
return err
return nil, err
}
logger.Info().Msg("BPF filter set")
l.pktSourceRx = gopacket.NewPacketSource(tPacketRx, layers.LayerTypeEthernet)
l.tPacketRx = tPacketRx
return nil
return gopacket.NewPacketSource(tPacket, layers.LayerTypeEthernet), nil
}
func (l *LLDP) doCapture(logger *zerolog.Logger) {
if l.pktSourceRx == nil || l.rxCtx == nil {
l.mu.Lock()
l.Rx.Running = true
ctx := l.Rx.Ctx
l.mu.Unlock()
defer func() {
l.mu.Lock()
l.Rx.Running = false
l.mu.Unlock()
}()
if l.pktSourceRx == nil || ctx == nil {
logger.Error().Msg("packet source or RX context not initialized")
return
}
@ -101,7 +96,7 @@ func (l *LLDP) doCapture(logger *zerolog.Logger) {
for {
// check if the context is done before blocking call
select {
case <-l.rxCtx.Done():
case <-ctx.Done():
logger.Info().Msg("RX context cancelled")
return
default:
@ -109,7 +104,6 @@ func (l *LLDP) doCapture(logger *zerolog.Logger) {
logger.Trace().Msg("waiting for next packet")
packet, err := l.pktSourceRx.NextPacket()
logger.Trace().Interface("packet", packet).Err(err).Msg("got next packet")
if err != nil {
logger.Error().
@ -117,16 +111,22 @@ func (l *LLDP) doCapture(logger *zerolog.Logger) {
Msg("error getting next packet")
// Immediately break for known unrecoverable errors
if err == io.EOF || err == io.ErrUnexpectedEOF ||
err == io.ErrNoProgress || err == io.ErrClosedPipe || err == io.ErrShortBuffer ||
err == syscall.EBADF ||
if errors.Is(err, io.EOF) ||
errors.Is(err, io.ErrUnexpectedEOF) ||
errors.Is(err, io.ErrNoProgress) ||
errors.Is(err, io.ErrClosedPipe) ||
errors.Is(err, io.ErrShortBuffer) ||
errors.Is(err, syscall.EBADF) ||
strings.Contains(err.Error(), "use of closed file") {
logger.Error().Msg("unrecoverable error, stopping capture")
return
}
continue
}
logger.Trace().Interface("packet", packet).Msg("got next packet")
if err := l.handlePacket(packet, logger); err != nil {
logger.Error().
Err(err).
@ -136,30 +136,52 @@ func (l *LLDP) doCapture(logger *zerolog.Logger) {
}
}
func (l *LLDP) startCapture() error {
func (l *LLDP) prepareCapture() (bool, error) {
l.mu.Lock()
defer l.mu.Unlock()
if l.rxRunning {
return nil // Already running
if l.Rx.Running {
l.l.Debug().Msg("LLDP receiver already running")
return false, nil // Already running
}
if l.tPacketRx == nil {
return fmt.Errorf("AFPacket not initialized")
}
if l.pktSourceRx == nil {
return fmt.Errorf("packet source not initialized")
}
logger := l.l.With().Str("interface", l.interfaceName).Logger()
logger := l.l.With().Str("interface", l.interfaceName).Str("direction", "rx").Logger()
logger.Info().Msg("starting capture LLDP ethernet frames")
// Create a new context for this instance
l.rxCtx, l.rxCancel = context.WithCancel(context.Background())
l.rxRunning = true
tPacket, err := afPacketNewTPacket(l.interfaceName)
if err != nil {
logger.Error().Err(err).Msg("could not create TPacket instance for receiving LLDP packets")
return false, err
}
go l.doCapture(&logger)
l.pktSourceRx, err = l.setUpPacketSourceUnderLock(tPacket, &logger)
if err != nil {
logger.Error().Err(err).Msg("could not create packet source for receiving LLDP packets")
tPacket.Close()
return false, err
}
runningState := &RunningState{Enabled: true, Logger: &logger}
runningState.Ctx, runningState.Cancel = context.WithCancel(context.Background())
runningState.TPacket = tPacket
l.Rx = runningState
logger.Debug().Msg("created packet source for receiving LLDP packets")
return true, nil
}
func (l *LLDP) startCapture() error {
// set up the logger and context (inside a lock)
startRunning, err := l.prepareCapture()
if err != nil {
return err
}
if startRunning {
logger := l.Rx.Logger
go l.doCapture(logger)
}
return nil
}
@ -182,7 +204,7 @@ func (l *LLDP) handlePacket(packet gopacket.Packet, logger *zerolog.Logger) erro
lldpRaw := packet.Layer(layers.LayerTypeLinkLayerDiscovery)
if lldpRaw != nil {
l.l.Trace().Hex("packet", packet.Data()).Msg("received LLDP frame")
logger.Trace().Hex("packet", packet.Data()).Msg("received LLDP frame")
lldpInfo := packet.Layer(layers.LayerTypeLinkLayerDiscoveryInfo)
if lldpInfo == nil {
@ -198,7 +220,7 @@ func (l *LLDP) handlePacket(packet gopacket.Packet, logger *zerolog.Logger) erro
cdpRaw := packet.Layer(layers.LayerTypeCiscoDiscovery)
if cdpRaw != nil {
l.l.Trace().Hex("packet", packet.Data()).Msg("received CDP frame")
logger.Trace().Hex("packet", packet.Data()).Msg("received CDP frame")
cdpInfo := packet.Layer(layers.LayerTypeCiscoDiscoveryInfo)
if cdpInfo == nil {
@ -335,32 +357,33 @@ func (l *LLDP) stopCapture() error {
l.mu.Lock()
defer l.mu.Unlock()
if !l.rxRunning {
if !l.Rx.Running {
return nil // Already stopped
}
logger := l.l.With().Str("interface", l.interfaceName).Logger()
logger := l.Rx.Logger
rxCancel := l.Rx.Cancel
tPacket := l.Rx.TPacket
logger.Info().Msg("stopping LLDP receiver")
// Cancel context to signal stop
rxCancel := l.rxCancel
if rxCancel != nil {
l.Rx.Cancel = nil // so we don't cancel again
rxCancel()
l.rxCancel = nil
logger.Info().Msg("cancelled RX context, waiting for goroutine to finish")
}
// stop the TPacketRx
go func() {
if l.tPacketRx == nil {
if tPacket == nil {
return
}
// write an empty packet to the TPacketRx to interrupt the blocking read
// it's a shitty workaround until https://github.com/google/gopacket/pull/777 is merged,
// or we have a better solution, see https://github.com/google/gopacket/issues/1064
_ = l.tPacketRx.WritePacketData([]byte{})
_ = tPacket.WritePacketData([]byte{})
}()
// wait for the goroutine to finish
@ -368,19 +391,20 @@ func (l *LLDP) stopCapture() error {
l.rxWaitGroup.Wait()
logger.Info().Dur("duration", time.Since(start)).Msg("RX goroutine finished")
l.rxRunning = false
if l.tPacketRx != nil {
logger.Info().Msg("closing TPacketRx")
l.tPacketRx.Close()
l.tPacketRx = nil
}
l.Rx.Running = false
// close the packet source first because it's constructed with the Rx.TPacket
if l.pktSourceRx != nil {
logger.Info().Msg("closing packet source")
l.pktSourceRx = nil
}
if tPacket != nil {
logger.Info().Msg("closing RX TPacket")
tPacket.Close()
l.Rx.TPacket = nil
}
return nil
}

View File

@ -9,7 +9,6 @@ import (
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/rs/zerolog"
)
var (
@ -17,14 +16,10 @@ var (
lldpEtherType = layers.EthernetTypeLinkLayerDiscovery
)
func (l *LLDP) toPayloadValues() []layers.LinkLayerDiscoveryValue {
func (l *LLDP) toPayloadValues(opts *AdvertiseOptions) []layers.LinkLayerDiscoveryValue {
// See also: layers.LinkLayerDiscovery.SerializeTo()
r := []layers.LinkLayerDiscoveryValue{}
l.mu.RLock()
opts := l.advertiseOptions
l.mu.RUnlock()
if opts == nil {
return r
}
@ -74,35 +69,31 @@ func (l *LLDP) toPayloadValues() []layers.LinkLayerDiscoveryValue {
func (l *LLDP) setUpTx() error {
l.mu.Lock()
defer l.mu.Unlock()
// Check if already set up (double-check pattern to prevent duplicate setup)
if l.tPacketTx != nil {
if l.Tx.TPacket != nil {
return nil
}
logger := l.l.With().Str("interface", l.interfaceName).Logger()
tPacketTx, err := afPacketNewTPacket(l.interfaceName)
logger := l.l.With().Str("interface", l.interfaceName).Str("direction", "tx").Logger()
txState := &RunningState{Enabled: true, Logger: &logger}
txState.Ctx, txState.Cancel = context.WithCancel(context.Background())
var err error
txState.TPacket, err = afPacketNewTPacket(l.interfaceName)
if err != nil {
return err
}
logger.Info().Msg("created TPacket instance for sending LLDP packets")
l.tPacketTx = tPacketTx
l.Tx = txState
return nil
}
func (l *LLDP) sendTxPackets() error {
l.mu.RLock()
defer l.mu.RUnlock()
logger := l.l.With().Str("interface", l.interfaceName).Logger()
iface, err := net.InterfaceByName(l.interfaceName)
func (l *LLDP) generateTxPackets(interfaceName string, opts *AdvertiseOptions) (gopacket.SerializeBuffer, error) {
iface, err := net.InterfaceByName(interfaceName)
if err != nil {
return err
}
if l.tPacketTx == nil {
return fmt.Errorf("AFPacket not initialized")
return nil, err
}
// create payload
@ -122,46 +113,76 @@ func (l *LLDP) sendTxPackets() error {
ID: []byte(iface.Name),
},
TTL: uint16(3600),
Values: l.toPayloadValues(),
Values: l.toPayloadValues(opts),
}
buf := gopacket.NewSerializeBuffer()
if err := gopacket.SerializeLayers(buf, gopacket.SerializeOptions{
err = gopacket.SerializeLayers(buf, gopacket.SerializeOptions{
FixLengths: true,
ComputeChecksums: true,
}, &ethFrame, &lldpFrame); err != nil {
l.l.Error().Err(err).Msg("unable to serialize packet")
}, &ethFrame, &lldpFrame)
if err != nil {
return nil, err
}
return buf, nil
}
func (l *LLDP) sendTxPackets() error {
l.mu.RLock()
interfaceName := l.interfaceName
opts := l.advertiseOptions
logger := l.Tx.Logger
tPacket := l.Tx.TPacket
l.mu.RUnlock()
if tPacket == nil {
logger.Error().Msg("AFPacket not initialized for Tx")
return fmt.Errorf("AFPacket not initialized for Tx")
}
logger.Trace().Msg("generating LLDP Tx packets")
buf, err := l.generateTxPackets(interfaceName, opts)
if err != nil {
logger.Error().Err(err).Msg("unable to serialize packet")
return err
}
logger.Trace().Hex("packet", buf.Bytes()).Msg("sending LLDP packet")
start := time.Now()
// send packet
if err := l.tPacketTx.WritePacketData(buf.Bytes()); err != nil {
l.l.Error().Err(err).Msg("unable to send packet")
if err := tPacket.WritePacketData(buf.Bytes()); err != nil {
logger.Error().Err(err).Msg("unable to send packet")
return err
}
logger.Info().Dur("duration", time.Since(start)).Msg("sent LLDP packet")
return nil
}
const txInterval = 30 * time.Second // Standard LLDP transmission interval
func (l *LLDP) doSendPeriodically(logger *zerolog.Logger, txCtx context.Context) {
func (l *LLDP) doSendPeriodically() {
l.mu.Lock()
l.txRunning = true
l.Tx.Running = true
logger := l.Tx.Logger
txCtx := l.Tx.Ctx
l.mu.Unlock()
defer func() {
l.mu.Lock()
l.txRunning = false
l.Tx.Running = false
l.mu.Unlock()
}()
logger.Info().Msg("starting LLDP transmitter")
ticker := time.NewTicker(txInterval)
defer ticker.Stop()
// Send initial packet immediately
logger.Trace().Msg("sending initial LLDP packet")
if err := l.sendTxPackets(); err != nil {
logger.Error().Err(err).Msg("error sending initial LLDP packet")
}
@ -169,6 +190,7 @@ func (l *LLDP) doSendPeriodically(logger *zerolog.Logger, txCtx context.Context)
for {
select {
case <-ticker.C:
logger.Trace().Msg("time to send periodic LLDP packet")
if err := l.sendTxPackets(); err != nil {
logger.Error().Err(err).Msg("error sending LLDP packet")
}
@ -181,50 +203,45 @@ func (l *LLDP) doSendPeriodically(logger *zerolog.Logger, txCtx context.Context)
func (l *LLDP) startTx() error {
l.mu.RLock()
running := l.txRunning
enabled := l.enableTx
cancel := l.txCancel
logger := l.l
running := l.Tx.Running
enabled := l.Tx.Enabled
previousCancel := l.Tx.Cancel
l.mu.RUnlock()
if running || !enabled {
logger.Trace().Bool("running", running).Bool("enabled", enabled).Msg("alrady running Tx or not enabled")
return nil
}
if cancel != nil {
cancel()
if previousCancel != nil {
logger.Trace().Msg("stopping previous Tx context before starting new one")
previousCancel()
}
l.mu.Lock()
l.txCtx, l.txCancel = context.WithCancel(context.Background())
l.mu.Unlock()
if err := l.setUpTx(); err != nil {
return fmt.Errorf("failed to set up TX: %w", err)
}
logger := l.l.With().Str("interface", l.interfaceName).Logger()
logger.Info().Msg("starting LLDP transmitter")
go l.doSendPeriodically(&logger, l.txCtx)
go l.doSendPeriodically()
return nil
}
func (l *LLDP) stopTx() error {
l.mu.Lock()
if !l.txRunning {
if !l.Tx.Running {
l.mu.Unlock()
return nil // Already stopped
}
logger := l.l.With().Str("interface", l.interfaceName).Logger()
logger.Info().Msg("stopping LLDP transmitter")
// Cancel context to signal stop
txCancel := l.txCancel
l.txRunning = false
txCancel := l.Tx.Cancel
l.Tx.Running = false
l.mu.Unlock()
l.Tx.Logger.Info().Msg("stopping LLDP transmitter")
// Cancel context (goroutine will handle cleanup)
if txCancel != nil {
txCancel()

View File

@ -42,7 +42,7 @@ type NetworkConfig struct {
IPv6Mode null.String `json:"ipv6_mode,omitempty" one_of:"slaac,dhcpv6,slaac_and_dhcpv6,static,link_local,disabled" default:"slaac"`
IPv6Static *IPv6StaticConfig `json:"ipv6_static,omitempty" required_if:"IPv6Mode=static"`
LLDPMode null.String `json:"lldp_mode,omitempty" one_of:"disabled,rx_only,tx_only,rx_and_tx,basic,all" default:"rx_and_tx"`
LLDPMode null.String `json:"lldp_mode,omitempty" one_of:"disabled,enabled,rx_only,tx_only,rx_and_tx,basic,all" default:"rx_and_tx"`
LLDPTxTLVs []string `json:"lldp_tx_tlvs,omitempty" one_of:"chassis,port,system,vlan" default:"chassis,port,system,vlan"`
MDNSMode null.String `json:"mdns_mode,omitempty" one_of:"disabled,auto,ipv4_only,ipv6_only" default:"auto"`
TimeSyncMode null.String `json:"time_sync_mode,omitempty" one_of:"ntp_only,ntp_and_http,http_only,custom" default:"ntp_and_http"`
@ -54,7 +54,7 @@ type NetworkConfig struct {
}
func (c *NetworkConfig) ShouldEnableLLDPTransmit() bool {
// backwards compatibility: `basic` mode will be `rx_only` due to privacy concerns
// backwards compatibility: `basic` mode will be `rx_only` due to privacy concerns, `enabled` and `all` will enable transmit
return c.LLDPMode.String != "rx_only" && c.LLDPMode.String != "disabled" && c.LLDPMode.String != "basic"
}

View File

@ -211,6 +211,7 @@ func initNetwork() error {
},
Logger: networkLogger,
})
if err := lldpService.Start(); err != nil {
networkLogger.Error().Err(err).Msg("failed to start LLDP service")
}

View File

@ -414,7 +414,7 @@ func (nm *NetlinkManager) reconcileDefaultRoute(link *Link, expected map[string]
// add remaining expected default routes
for _, gateway := range expected {
nm.logger.Warn().Str("gateway", gateway.String()).Msg("adding default route")
nm.logger.Info().Str("gateway", gateway.String()).Msg("adding default route")
route := &netlink.Route{
Dst: &ipv4DefaultRoute,