// Source file: kvm/internal/lldp/rx.go (Go; originally 398 lines, 9.2 KiB).

package lldp
import (
	"context"
	"errors"
	"fmt"
	"io"
	"net"
	"strings"
	"syscall"
	"time"

	"github.com/google/gopacket"
	"github.com/google/gopacket/layers"
	"github.com/rs/zerolog"
)
// IFNAMSIZ mirrors the Linux IFNAMSIZ constant (interface-name buffer size).
// It is not referenced in this file; presumably other files in the package
// use it, so the name is kept as-is for compatibility.
const IFNAMSIZ = 16

// Fallback neighbor lifetimes, used when a received frame does not supply a
// usable TTL/holdtime of its own.
var (
	lldpDefaultTTL = 2 * time.Minute // 120 s
	cdpDefaultTTL  = 3 * time.Minute // 180 s
)

// multicastAddrs lists the destination MAC addresses that setUpCapture
// registers on the capture interface, so that discovery frames sent to them
// are not discarded by the kernel. They are registered in slice order.
var multicastAddrs = []string{
	// LLDP group addresses.
	"01:80:C2:00:00:00",
	"01:80:C2:00:00:03",
	"01:80:C2:00:00:0E",
	// Cisco CDP.
	"01:00:0C:CC:CC:CC",
}
// setUpCapture prepares packet reception on l.interfaceName. It does nothing
// and returns nil if a receive socket already exists.
//
// Steps:
//  1. open the receive socket with afPacketNewTPacket (its error is returned);
//  2. register every address in multicastAddrs on the interface via
//     addMulticastAddr — parse/registration failures are logged and skipped;
//  3. install bpfFilter — on failure the socket is closed and the error is
//     returned.
//
// On success l.tPacketRx and l.pktSourceRx (an Ethernet-decoding packet
// source) are populated. l.mu is held for the whole call.
func (l *LLDP) setUpCapture() error {
	l.mu.Lock()
	defer l.mu.Unlock()

	if l.tPacketRx != nil {
		return nil
	}

	lg := l.l.With().Str("interface", l.interfaceName).Logger()

	rx, openErr := afPacketNewTPacket(l.interfaceName)
	if openErr != nil {
		return openErr
	}
	lg.Info().Msg("created TPacketRx")

	// Register the discovery multicast MACs so the kernel does not drop
	// frames addressed to them. Promiscuous mode would also work, but is
	// considerably harder to manage correctly.
	for _, addr := range multicastAddrs {
		hw, parseErr := net.ParseMAC(addr)
		if parseErr != nil {
			lg.Error().
				Str("mac", addr).
				MACAddr("hwaddr", hw).
				Err(parseErr).
				Msg("unable to parse MAC address")
			continue
		}

		if joinErr := addMulticastAddr(l.interfaceName, hw); joinErr != nil {
			lg.Error().
				MACAddr("hwaddr", hw).
				Err(joinErr).
				Msg("unable to add multicast address")
			continue
		}

		lg.Info().
			MACAddr("hwaddr", hw).
			Msg("added multicast address")
	}

	if bpfErr := rx.SetBPF(bpfFilter); bpfErr != nil {
		lg.Error().
			Err(bpfErr).
			Msg("unable to set BPF filter")
		rx.Close()
		return bpfErr
	}
	lg.Info().Msg("BPF filter set")

	l.tPacketRx = rx
	l.pktSourceRx = gopacket.NewPacketSource(rx, layers.LayerTypeEthernet)
	return nil
}
// doCapture is the body of the RX goroutine launched by startCapture. It reads
// frames from the packet source until the RX context is cancelled, or until
// the source returns an error that isUnrecoverableRxError classifies as fatal.
// Each frame is passed to handlePacket. Errors from handlePacket are logged and
// the loop continues. Any other read error is logged and retried after a short,
// cancellable backoff.
//
// The caller must call l.rxWaitGroup.Add(1) before starting this goroutine
// (startCapture does). doCapture always calls Done when it returns, including
// on the early "not initialized" return.
//
// PacketSource.Packets() is deliberately not used: it is not reliable, can
// cause panics, and the upstream hasn't fixed it yet.
// TODO: use a channel to handle the packets.
func (l *LLDP) doCapture(logger *zerolog.Logger) {
	// Pause after a non-fatal read error, as gopacket's own
	// PacketSource.Packets loop does. Otherwise a persistent error would spin
	// a CPU core and flood the log.
	const errorBackoff = 5 * time.Millisecond

	defer l.rxWaitGroup.Done()

	// startCapture sets these under l.mu before starting this goroutine.
	// stopCapture only clears pktSourceRx after rxWaitGroup.Wait() has
	// observed this goroutine exit. Reading them once here is therefore safe.
	ctx, src := l.rxCtx, l.pktSourceRx
	if src == nil || ctx == nil {
		logger.Error().Msg("packet source or RX context not initialized")
		return
	}

	for {
		// Check for cancellation before entering the blocking read.
		select {
		case <-ctx.Done():
			logger.Info().Msg("RX context cancelled")
			return
		default:
		}

		logger.Trace().Msg("waiting for next packet")
		packet, err := src.NextPacket()
		logger.Trace().Interface("packet", packet).Err(err).Msg("got next packet")
		if err != nil {
			logger.Error().
				Err(err).
				Msg("error getting next packet")
			if isUnrecoverableRxError(err) {
				return
			}
			select {
			case <-ctx.Done():
				logger.Info().Msg("RX context cancelled")
				return
			case <-time.After(errorBackoff):
			}
			continue
		}

		if err := l.handlePacket(packet, logger); err != nil {
			logger.Error().
				Err(err).
				Msg("error handling packet")
		}
	}
}

// isUnrecoverableRxError reports whether a NextPacket error means the capture
// source is gone for good (EOF, closed descriptor, ...). In that case the RX
// loop should stop instead of retrying. Matching uses errors.Is, so sentinel
// errors wrapped by the os/net/afpacket layers (e.g. EBADF inside an
// *os.SyscallError) are still recognised.
func isUnrecoverableRxError(err error) bool {
	for _, target := range []error{
		io.EOF,
		io.ErrUnexpectedEOF,
		io.ErrNoProgress,
		io.ErrClosedPipe,
		io.ErrShortBuffer,
		syscall.EBADF,
		net.ErrClosed,
	} {
		if errors.Is(err, target) {
			return true
		}
	}
	// The runtime's "use of closed file" error (internal/poll) is not
	// exported, so it can only be recognised by its message.
	return strings.Contains(err.Error(), "use of closed file")
}

// startCapture launches the RX goroutine (doCapture) on the already prepared
// socket and packet source. It is a no-op if capture is already running. It
// returns an error if setUpCapture has not successfully run first.
func (l *LLDP) startCapture() error {
	l.mu.Lock()
	defer l.mu.Unlock()

	if l.rxRunning {
		return nil // Already running
	}
	if l.tPacketRx == nil {
		return fmt.Errorf("AFPacket not initialized")
	}
	if l.pktSourceRx == nil {
		return fmt.Errorf("packet source not initialized")
	}

	logger := l.l.With().Str("interface", l.interfaceName).Logger()
	logger.Info().Msg("starting capture LLDP ethernet frames")

	// Create a new context for this instance.
	l.rxCtx, l.rxCancel = context.WithCancel(context.Background())
	l.rxRunning = true

	// Register the goroutine *before* starting it. sync.WaitGroup requires a
	// positive Add to happen before a concurrent Wait. Otherwise stopCapture
	// could return (and close the socket) before doCapture had even started.
	l.rxWaitGroup.Add(1)
	go l.doCapture(&logger)
	return nil
}
// handlePacket decodes one captured frame and forwards it to handlePacketLLDP
// or handlePacketCDP, keyed by the frame's source MAC. Frames that carry
// neither protocol are ignored and yield nil.
//
// An error is returned in three cases:
//   - the frame has no link layer;
//   - the protocol's companion "info" layer is missing;
//   - a decoded layer does not have the expected concrete type.
//
// Types are checked rather than asserted blindly, because a panic on this
// goroutine would crash the whole process.
func (l *LLDP) handlePacket(packet gopacket.Packet, logger *zerolog.Logger) error {
	linkLayer := packet.LinkLayer()
	if linkLayer == nil {
		return fmt.Errorf("no link layer")
	}

	flow := linkLayer.LinkFlow()
	srcMac := flow.Src().String()
	dstMac := flow.Dst().String()
	logger.Trace().
		Str("src_mac", srcMac).
		Str("dst_mac", dstMac).
		Int("length", len(packet.Data())).
		Hex("data", packet.Data()).
		Msg("received packet")

	if lldpRaw := packet.Layer(layers.LayerTypeLinkLayerDiscovery); lldpRaw != nil {
		// Log through the logger we were given, like the rest of this function.
		logger.Trace().Hex("packet", packet.Data()).Msg("received LLDP frame")
		lldpInfo := packet.Layer(layers.LayerTypeLinkLayerDiscoveryInfo)
		if lldpInfo == nil {
			return fmt.Errorf("no LLDP info layer")
		}
		raw, ok := lldpRaw.(*layers.LinkLayerDiscovery)
		if !ok {
			return fmt.Errorf("unexpected LLDP layer type %T", lldpRaw)
		}
		info, ok := lldpInfo.(*layers.LinkLayerDiscoveryInfo)
		if !ok {
			return fmt.Errorf("unexpected LLDP info layer type %T", lldpInfo)
		}
		return l.handlePacketLLDP(srcMac, raw, info)
	}

	if cdpRaw := packet.Layer(layers.LayerTypeCiscoDiscovery); cdpRaw != nil {
		logger.Trace().Hex("packet", packet.Data()).Msg("received CDP frame")
		cdpInfo := packet.Layer(layers.LayerTypeCiscoDiscoveryInfo)
		if cdpInfo == nil {
			return fmt.Errorf("no CDP info layer")
		}
		raw, ok := cdpRaw.(*layers.CiscoDiscovery)
		if !ok {
			return fmt.Errorf("unexpected CDP layer type %T", cdpRaw)
		}
		info, ok := cdpInfo.(*layers.CiscoDiscoveryInfo)
		if !ok {
			return fmt.Errorf("unexpected CDP info layer type %T", cdpInfo)
		}
		return l.handlePacketCDP(srcMac, raw, info)
	}

	return nil
}
// capabilitiesToString lists the names of the capability flags set in
// capabilities, in IEEE 802.1AB bit order. The result is never nil, so it
// marshals to [] (not null) when no capability is set.
//
// Besides the original seven capabilities, this also reports station-only,
// C-VLAN, S-VLAN and TPMR, which gopacket decodes as well. Previously these
// were dropped silently.
func capabilitiesToString(capabilities layers.LLDPCapabilities) []string {
	flags := []struct {
		enabled bool
		name    string
	}{
		{capabilities.Other, "other"},
		{capabilities.Repeater, "repeater"},
		{capabilities.Bridge, "bridge"},
		{capabilities.WLANAP, "wlanap"},
		{capabilities.Router, "router"},
		{capabilities.Phone, "phone"},
		{capabilities.DocSis, "docsis"},
		{capabilities.StationOnly, "station_only"},
		{capabilities.CVLAN, "cvlan"},
		{capabilities.SVLAN, "svlan"},
		{capabilities.TMPR, "tpmr"}, // gopacket spells the field TMPR
	}

	capStr := make([]string, 0, len(flags))
	for _, f := range flags {
		if f.enabled {
			capStr = append(capStr, f.name)
		}
	}
	return capStr
}
// handlePacketLLDP converts a decoded LLDP frame into a Neighbor and records
// it in the neighbor table.
//
// mac is the frame's source MAC. raw supplies the TLV list and the mandatory
// Chassis ID / Port ID / TTL fields; info supplies the decoded optional TLVs.
// A field is filled in only when its TLV appears in raw.Values.
//
// Outcome:
//   - a TTL below one second (LLDP shutdown frames carry TTL 0) deletes the
//     neighbor;
//   - otherwise the neighbor is stored for the advertised TTL, or for
//     lldpDefaultTTL if the frame has no TTL TLV.
//
// The returned error is always nil.
func (l *LLDP) handlePacketLLDP(mac string, raw *layers.LinkLayerDiscovery, info *layers.LinkLayerDiscoveryInfo) error {
	// IEEE 802.1AB Chassis ID / Port ID subtypes whose ID bytes are binary
	// (a raw MAC, or address family + address) rather than text.
	const (
		chassisIDSubtypeMAC     = 4
		chassisIDSubtypeNetAddr = 5
		portIDSubtypeMAC        = 3
		portIDSubtypeNetAddr    = 4
	)

	n := newNeighbor(mac, NeighborSourceLLDP)
	ttl := lldpDefaultTTL
	hasOrgSpecific := false

	for _, v := range raw.Values {
		switch v.Type {
		case layers.LLDPTLVChassisID:
			n.ChassisID = lldpIDToString(
				raw.ChassisID.ID,
				raw.ChassisID.Subtype == chassisIDSubtypeMAC,
				raw.ChassisID.Subtype == chassisIDSubtypeNetAddr,
			)
			n.Values["chassis_id"] = n.ChassisID
		case layers.LLDPTLVPortID:
			n.PortID = lldpIDToString(
				raw.PortID.ID,
				raw.PortID.Subtype == portIDSubtypeMAC,
				raw.PortID.Subtype == portIDSubtypeNetAddr,
			)
			n.Values["port_id"] = n.PortID
		case layers.LLDPTLVPortDescription:
			n.PortDescription = info.PortDescription
			n.Values["port_description"] = n.PortDescription
		case layers.LLDPTLVSysName:
			n.SystemName = info.SysName
			n.Values["system_name"] = n.SystemName
		case layers.LLDPTLVSysDescription:
			n.SystemDescription = info.SysDescription
			n.Values["system_description"] = n.SystemDescription
		case layers.LLDPTLVMgmtAddress:
			if mgmtAddress := parseTlvMgmtAddress(v); mgmtAddress != nil {
				n.ManagementAddresses = append(
					n.ManagementAddresses,
					lldpMgmtAddressToSerializable(mgmtAddress),
				)
			}
		case layers.LLDPTLVSysCapabilities:
			n.Capabilities = capabilitiesToString(info.SysCapabilities.EnabledCap)
		case layers.LLDPTLVTTL:
			n.TTL = uint16(raw.TTL)
			ttl = time.Duration(n.TTL) * time.Second
			n.Values["ttl"] = fmt.Sprintf("%d", n.TTL)
		case layers.LLDPTLVOrgSpecific:
			// info.OrgTLVs already holds every org-specific TLV of the frame,
			// so copy them once after the loop instead of once per TLV.
			hasOrgSpecific = true
		}
	}

	if hasOrgSpecific {
		for _, org := range info.OrgTLVs {
			n.Values[fmt.Sprintf("org_specific_%d", org.OUI)] = string(org.Info)
		}
	}

	// A shutdown frame should carry TTL 0 and only the mandatory TLVs; the
	// mandatory-TLV check is intentionally skipped for now.
	if ttl < time.Second {
		l.deleteNeighbor(n)
		return nil
	}
	l.addNeighbor(n, ttl)
	return nil
}

// lldpIDToString renders the ID bytes of an LLDP Chassis ID or Port ID TLV.
//
//   - isMAC: the bytes are a raw hardware address, formatted as
//     aa:bb:cc:dd:ee:ff.
//   - isNetAddr: the first byte is an IANA address family (1 = IPv4,
//     2 = IPv6) followed by the address, which is formatted as an IP.
//   - anything else, or a malformed binary ID: the bytes are returned
//     unchanged as a string.
func lldpIDToString(id []byte, isMAC, isNetAddr bool) string {
	switch {
	case isMAC && len(id) > 0:
		return net.HardwareAddr(id).String()
	case isNetAddr && len(id) == 1+net.IPv4len && id[0] == 1:
		return net.IP(id[1:]).String()
	case isNetAddr && len(id) == 1+net.IPv6len && id[0] == 2:
		return net.IP(id[1:]).String()
	}
	return string(id)
}
// handlePacketCDP converts a decoded CDP frame into a Neighbor and records it
// in the neighbor table for the advertised holdtime.
//
// mac is the frame's source MAC. Only the fields mapped below are extracted
// (see TODO). A holdtime of 0 falls back to cdpDefaultTTL. Unlike LLDP, a
// CDP frame never deletes a neighbor. The returned error is always nil.
func (l *LLDP) handlePacketCDP(mac string, raw *layers.CiscoDiscovery, info *layers.CiscoDiscoveryInfo) error {
	// TODO: implement full CDP parsing
	n := newNeighbor(mac, NeighborSourceCDP)
	n.ChassisID = info.DeviceID
	n.PortID = info.PortID
	n.SystemName = info.SysName
	n.SystemDescription = info.Platform
	n.TTL = uint16(raw.TTL)

	// Any non-zero holdtime is honoured as advertised, including 1 s.
	ttl := cdpDefaultTTL
	if n.TTL > 0 {
		ttl = time.Duration(n.TTL) * time.Second
	}

	// Fill the same generic Values keys the LLDP handler uses, so consumers
	// can treat neighbors from both protocols alike.
	setValue := func(key, val string) {
		if val != "" {
			n.Values[key] = val
		}
	}
	setValue("chassis_id", n.ChassisID)
	setValue("port_id", n.PortID)
	setValue("system_name", n.SystemName)
	setValue("system_description", n.SystemDescription)
	n.Values["ttl"] = fmt.Sprintf("%d", n.TTL)

	for _, addr := range info.MgmtAddresses {
		addrFamily := "ipv4"
		if addr.To4() == nil {
			addrFamily = "ipv6"
		}
		n.ManagementAddresses = append(n.ManagementAddresses, ManagementAddress{
			AddressFamily:    addrFamily,
			Address:          addr.String(),
			InterfaceSubtype: "if_name",
			InterfaceNumber:  0,
			OID:              "",
		})
	}

	l.addNeighbor(n, ttl)
	return nil
}
// stopCapture stops the RX goroutine started by startCapture and releases the
// capture socket and packet source. It returns nil immediately when capture
// is not running, and currently always returns nil.
//
// Shutdown order:
//  1. cancel the RX context;
//  2. write an empty packet on the socket to interrupt a blocked read;
//  3. wait for the RX goroutine to exit;
//  4. close the socket and drop the packet source.
//
// NOTE(review): l.mu stays locked while waiting for the RX goroutine. If any
// code reached from handlePacket (e.g. addNeighbor/deleteNeighbor) also takes
// l.mu, this deadlocks — confirm they use a separate lock.
func (l *LLDP) stopCapture() error {
	// Upper bound on how long to wait for the wake-up write before closing
	// the socket regardless.
	const wakeTimeout = time.Second

	l.mu.Lock()
	defer l.mu.Unlock()

	if !l.rxRunning {
		return nil // Already stopped
	}

	logger := l.l.With().Str("interface", l.interfaceName).Logger()
	logger.Info().Msg("stopping LLDP receiver")

	// Cancel the context to signal the RX goroutine to stop.
	if rxCancel := l.rxCancel; rxCancel != nil {
		rxCancel()
		l.rxCancel = nil
		logger.Info().Msg("cancelled RX context, waiting for goroutine to finish")
	}

	// Snapshot the socket while holding the lock. The wake-up goroutine must
	// not read l.tPacketRx, because this function sets it to nil below.
	rx := l.tPacketRx
	wakeDone := make(chan struct{})
	go func() {
		defer close(wakeDone)
		if rx == nil {
			return
		}
		// Writing an empty packet interrupts the blocking read. This is a
		// workaround until https://github.com/google/gopacket/pull/777 is
		// merged, or a better solution exists; see
		// https://github.com/google/gopacket/issues/1064.
		rx.WritePacketData([]byte{})
	}()

	start := time.Now()
	l.rxWaitGroup.Wait()
	logger.Info().Dur("duration", time.Since(start)).Msg("RX goroutine finished")
	l.rxRunning = false

	// Don't close the socket while the wake-up write may still be using its
	// descriptor. Once closed, the fd number can be reused by an unrelated
	// file, and the write would land there.
	select {
	case <-wakeDone:
	case <-time.After(wakeTimeout):
		logger.Warn().Msg("wake-up write still pending, closing TPacketRx anyway")
	}

	if rx != nil {
		logger.Info().Msg("closing TPacketRx")
		rx.Close()
		l.tPacketRx = nil
	}
	if l.pktSourceRx != nil {
		logger.Info().Msg("closing packet source")
		l.pktSourceRx = nil
	}
	return nil
}
// stopRx shuts the receiver down and clears every discovered neighbor.
//
// If stopCapture fails, its error is returned as-is and the neighbor table is
// left untouched. Otherwise the table is flushed, and onChange is called with
// an empty, non-nil neighbor slice so listeners see the cleared state.
func (l *LLDP) stopRx() error {
	err := l.stopCapture()
	if err != nil {
		return err
	}

	l.flushNeighbors()
	none := make([]Neighbor, 0)
	l.onChange(none)
	return nil
}