Skip to content

Provide real-time service metrics #1874

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
3 changes: 1 addition & 2 deletions docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ Metrics options:

To enable kube-router metrics, start kube-router with `--metrics-port` and provide a port over 0

Metrics is generally exported at the same rate as the sync period for each service.
Metrics are generally exported at the same rate as the sync period for each service. Service metrics are exported in real time.

Unless otherwise specified, the default values are:

* iptables-sync-period - `1 min`
* ipvs-sync-period - `1 min`
* routes-sync-period - `1 min`

By enabling
Expand Down
133 changes: 23 additions & 110 deletions pkg/controllers/proxy/network_services_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

Expand Down Expand Up @@ -115,7 +116,8 @@ type NetworkServicesController struct {
krNode utils.NodeAware
syncPeriod time.Duration
mu sync.Mutex
serviceMap serviceInfoMap
serviceMap atomic.Pointer[serviceInfoMap]
serviceMetricsMap atomic.Pointer[metricsServiceMap]
endpointsMap endpointSliceInfoMap
podCidr string
excludedCidrs []net.IPNet
Expand All @@ -125,7 +127,6 @@ type NetworkServicesController struct {
client kubernetes.Interface
nodeportBindOnAllIP bool
MetricsEnabled bool
metricsMap map[string][]string
ln LinuxNetworking
readyForUpdates bool
ProxyFirewallSetup *sync.Cond
Expand Down Expand Up @@ -373,7 +374,7 @@ func (nsc *NetworkServicesController) Run(healthChan chan<- *healthcheck.Control
// and we don't want to duplicate the effort, so this is a slimmer version of doSync()
klog.V(1).Info("Performing requested sync of ipvs services")
nsc.mu.Lock()
err = nsc.syncIpvsServices(nsc.serviceMap, nsc.endpointsMap)
err = nsc.syncIpvsServices(nsc.getServiceMap(), nsc.endpointsMap)
if err != nil {
klog.Errorf("error during ipvs sync in network service controller. Error: %v", err)
}
Expand Down Expand Up @@ -420,26 +421,19 @@ func (nsc *NetworkServicesController) doSync() error {
klog.Errorf("Failed to do add masquerade rule in POSTROUTING chain of nat table due to: %s", err.Error())
}

nsc.serviceMap = nsc.buildServicesInfo()
nsc.setServiceMap(nsc.buildServicesInfo())
nsc.endpointsMap = nsc.buildEndpointSliceInfo()
err = nsc.syncHairpinIptablesRules()
if err != nil {
klog.Errorf("Error syncing hairpin iptables rules: %s", err.Error())
}

err = nsc.syncIpvsServices(nsc.serviceMap, nsc.endpointsMap)
err = nsc.syncIpvsServices(nsc.getServiceMap(), nsc.endpointsMap)
if err != nil {
klog.Errorf("Error syncing IPVS services: %s", err.Error())
return err
}

if nsc.MetricsEnabled {
err = nsc.publishMetrics(nsc.serviceMap)
if err != nil {
klog.Errorf("Error publishing metrics: %v", err)
return err
}
}
return nil
}

Expand Down Expand Up @@ -725,87 +719,6 @@ func (nsc *NetworkServicesController) syncIpvsFirewall() error {
return nil
}

// publishMetrics exports per-service IPVS statistics as Prometheus metrics.
//
// It lists the IPVS services through nsc.ln.ipvsGetServices() and, for every entry of
// serviceInfoMap, looks for IPVS services whose protocol and port match the entry and
// whose address equals either the entry's cluster IP or this node's primary IP. For each
// match the IPVS stats are published under the label values (namespace, name, VIP,
// protocol, port), and the label values are remembered in nsc.metricsMap keyed by
// generateIPPortID so the same series can be removed when the service is deleted.
//
// The time spent is logged and, when metrics are enabled, observed on
// metrics.ControllerIpvsMetricsExportTime.
//
// It returns an error only when listing the IPVS services fails.
func (nsc *NetworkServicesController) publishMetrics(serviceInfoMap serviceInfoMap) error {
	start := time.Now()
	defer func() {
		endTime := time.Since(start)
		klog.V(2).Infof("Publishing IPVS metrics took %v", endTime)
		if nsc.MetricsEnabled {
			metrics.ControllerIpvsMetricsExportTime.Observe(endTime.Seconds())
		}
	}()

	ipvsSvcs, err := nsc.ln.ipvsGetServices()
	if err != nil {
		return errors.New("Failed to list IPVS services: " + err.Error())
	}

	klog.V(1).Info("Publishing IPVS metrics")

	// The IPVS service count does not depend on which services match below, so publish it
	// once here; otherwise the gauge would go stale whenever nothing matches.
	metrics.ControllerIpvsServices.Set(float64(len(ipvsSvcs)))

	// NOTE(review): assumes GetPrimaryNodeIP() is a plain getter whose value is stable for
	// the duration of this call, so it is resolved once instead of per comparison — confirm.
	nodeIP := nsc.krNode.GetPrimaryNodeIP().String()

	for _, svc := range serviceInfoMap {
		// Everything derived only from svc is computed once per service rather than once
		// per (service, IPVS service) pair.
		uPort, err := safecast.ToUint16(svc.port)
		if err != nil {
			// A port that does not fit in uint16 cannot be compared against an IPVS
			// service port, so skip the service instead of comparing a bogus value.
			klog.Errorf("failed to convert port %d to uint16: %v", svc.port, err)
			continue
		}
		protocol := convertSvcProtoToSysCallProto(svc.protocol)
		clusterIP := svc.clusterIP.String()
		port := strconv.Itoa(svc.port)

		for _, ipvsSvc := range ipvsSvcs {
			if protocol != ipvsSvc.Protocol || uPort != ipvsSvc.Port {
				continue
			}

			// The cluster IP is checked before the node IP, so it wins if both are equal.
			var svcVip string
			switch ipvsSvc.Address.String() {
			case clusterIP:
				svcVip = clusterIP
			case nodeIP:
				svcVip = nodeIP
			default:
				continue
			}

			klog.V(3).Infof("Publishing metrics for %s/%s (%s:%d/%s)",
				svc.namespace, svc.name, svcVip, svc.port, svc.protocol)

			labelValues := []string{
				svc.namespace,
				svc.name,
				svcVip,
				svc.protocol,
				port,
			}

			key := generateIPPortID(svcVip, svc.protocol, port)
			nsc.metricsMap[key] = labelValues
			// these same metrics should be deleted when the service is deleted.
			metrics.ServiceBpsIn.WithLabelValues(labelValues...).Set(float64(ipvsSvc.Stats.BPSIn))
			metrics.ServiceBpsOut.WithLabelValues(labelValues...).Set(float64(ipvsSvc.Stats.BPSOut))
			metrics.ServiceBytesIn.WithLabelValues(labelValues...).Set(float64(ipvsSvc.Stats.BytesIn))
			metrics.ServiceBytesOut.WithLabelValues(labelValues...).Set(float64(ipvsSvc.Stats.BytesOut))
			metrics.ServiceCPS.WithLabelValues(labelValues...).Set(float64(ipvsSvc.Stats.CPS))
			metrics.ServicePacketsIn.WithLabelValues(labelValues...).Set(float64(ipvsSvc.Stats.PacketsIn))
			metrics.ServicePacketsOut.WithLabelValues(labelValues...).Set(float64(ipvsSvc.Stats.PacketsOut))
			metrics.ServicePpsIn.WithLabelValues(labelValues...).Set(float64(ipvsSvc.Stats.PPSIn))
			metrics.ServicePpsOut.WithLabelValues(labelValues...).Set(float64(ipvsSvc.Stats.PPSOut))
			metrics.ServiceTotalConn.WithLabelValues(labelValues...).Set(float64(ipvsSvc.Stats.Connections))
		}
	}
	return nil
}

// OnEndpointsUpdate handle change in endpoints update from the API server
func (nsc *NetworkServicesController) OnEndpointsUpdate(es *discovery.EndpointSlice) {

Expand Down Expand Up @@ -845,7 +758,7 @@ func (nsc *NetworkServicesController) OnEndpointsUpdate(es *discovery.EndpointSl

if !endpointsMapsEquivalent(newEndpointsMap, nsc.endpointsMap) {
nsc.endpointsMap = newEndpointsMap
nsc.serviceMap = newServiceMap
nsc.setServiceMap(newServiceMap)
klog.V(1).Infof("Syncing IPVS services sync for update to endpoint: %s/%s", es.Namespace, es.Name)
nsc.sync(synctypeIpvs)
} else {
Expand Down Expand Up @@ -881,9 +794,10 @@ func (nsc *NetworkServicesController) OnServiceUpdate(svc *v1.Service) {
newServiceMap := nsc.buildServicesInfo()
newEndpointsMap := nsc.buildEndpointSliceInfo()

if len(newServiceMap) != len(nsc.serviceMap) || !reflect.DeepEqual(newServiceMap, nsc.serviceMap) {
oldServiceMap := nsc.getServiceMap()
if len(newServiceMap) != len(oldServiceMap) || !reflect.DeepEqual(newServiceMap, oldServiceMap) {
nsc.endpointsMap = newEndpointsMap
nsc.serviceMap = newServiceMap
nsc.setServiceMap(newServiceMap)
klog.V(1).Infof("Syncing IPVS services sync on update to service: %s/%s", svc.Namespace, svc.Name)
nsc.sync(synctypeIpvs)
} else {
Expand Down Expand Up @@ -1270,7 +1184,7 @@ func (nsc *NetworkServicesController) syncHairpinIptablesRules() error {
ipv6RulesNeeded := make(map[string][]string)

// Generate the rules that we need
for svcName, svcInfo := range nsc.serviceMap {
for svcName, svcInfo := range nsc.getServiceMap() {
if nsc.globalHairpin || svcInfo.hairpin {
// If this service doesn't have any active & local endpoints on this node, then skip it as only local
// endpoints matter for hairpinning
Expand Down Expand Up @@ -2024,23 +1938,13 @@ func NewNetworkServicesController(clientset kubernetes.Interface,
return nil, err
}

nsc := NetworkServicesController{ln: ln, ipsetMutex: ipsetMutex, metricsMap: make(map[string][]string),
nsc := NetworkServicesController{ln: ln, ipsetMutex: ipsetMutex,
fwMarkMap: map[uint32]string{}}

if config.MetricsEnabled {
// Register the metrics for this controller
metrics.DefaultRegisterer.MustRegister(metrics.ControllerIpvsServices)
metrics.DefaultRegisterer.MustRegister(&nsc)
metrics.DefaultRegisterer.MustRegister(metrics.ControllerIpvsServicesSyncTime)
metrics.DefaultRegisterer.MustRegister(metrics.ServiceBpsIn)
metrics.DefaultRegisterer.MustRegister(metrics.ServiceBpsOut)
metrics.DefaultRegisterer.MustRegister(metrics.ServiceBytesIn)
metrics.DefaultRegisterer.MustRegister(metrics.ServiceBytesOut)
metrics.DefaultRegisterer.MustRegister(metrics.ServiceCPS)
metrics.DefaultRegisterer.MustRegister(metrics.ServicePacketsIn)
metrics.DefaultRegisterer.MustRegister(metrics.ServicePacketsOut)
metrics.DefaultRegisterer.MustRegister(metrics.ServicePpsIn)
metrics.DefaultRegisterer.MustRegister(metrics.ServicePpsOut)
metrics.DefaultRegisterer.MustRegister(metrics.ServiceTotalConn)
nsc.MetricsEnabled = true
}

Expand All @@ -2050,7 +1954,7 @@ func NewNetworkServicesController(clientset kubernetes.Interface,
nsc.gracefulTermination = config.IpvsGracefulTermination
nsc.globalHairpin = config.GlobalHairpinMode

nsc.serviceMap = make(serviceInfoMap)
nsc.setServiceMap(make(serviceInfoMap))
nsc.endpointsMap = make(endpointSliceInfoMap)
nsc.client = clientset

Expand Down Expand Up @@ -2144,3 +2048,12 @@ func NewNetworkServicesController(clientset kubernetes.Interface,

return &nsc, nil
}

// setServiceMap atomically stores a pointer to serviceMap as the controller's current
// service map and then resets the stored service-metrics map pointer to nil.
func (nsc *NetworkServicesController) setServiceMap(serviceMap serviceInfoMap) {
	latest := &serviceMap
	nsc.serviceMap.Store(latest)

	// NOTE(review): presumably the nil store discards metrics state derived from the
	// previous service map so it is rebuilt on next use — confirm against the reader.
	nsc.serviceMetricsMap.Store(nil)
}

// getServiceMap returns the service map most recently stored by setServiceMap.
//
// If nothing has been stored yet, the atomic pointer is still nil; in that case a nil
// serviceInfoMap is returned instead of dereferencing the nil pointer. A nil map can be
// ranged over and passed to len, but must not be written to.
func (nsc *NetworkServicesController) getServiceMap() serviceInfoMap {
	current := nsc.serviceMap.Load()
	if current == nil {
		return nil
	}
	return *current
}
14 changes: 7 additions & 7 deletions pkg/controllers/proxy/network_services_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ var _ = Describe("NetworkServicesController", func() {
waitForListerWithTimeoutG(nsc.svcLister, time.Second*10)
waitForListerWithTimeoutG(nsc.epSliceLister, time.Second*10)

nsc.serviceMap = nsc.buildServicesInfo()
nsc.setServiceMap(nsc.buildServicesInfo())
nsc.endpointsMap = nsc.buildEndpointSliceInfo()
})
Context("service no endpoints with externalIPs", func() {
Expand Down Expand Up @@ -185,7 +185,7 @@ var _ = Describe("NetworkServicesController", func() {
schedFlags{})
_, fooSvc2, _ = lnm.ipvsAddService(lnm.ipvsSvcs, net.ParseIP("5.6.7.8"), 6, 5678, false, 0, "rr",
schedFlags{true, true, false})
syncErr = nsc.syncIpvsServices(nsc.serviceMap, nsc.endpointsMap)
syncErr = nsc.syncIpvsServices(nsc.getServiceMap(), nsc.endpointsMap)
})
It("Should have called syncIpvsServices OK", func() {
Expect(syncErr).To(Succeed())
Expand All @@ -204,7 +204,7 @@ var _ = Describe("NetworkServicesController", func() {
Expect(
mockedLinuxNetworking.setupRoutesForExternalIPForDSRCalls()).To(
ContainElement(
struct{ In1 serviceInfoMap }{In1: nsc.serviceMap}))
struct{ In1 serviceInfoMap }{In1: nsc.getServiceMap()}))
})
It("Should have called ipAddrAdd for ClusterIP and ExternalIPs", func() {
Expect((func() []string {
Expand Down Expand Up @@ -267,7 +267,7 @@ var _ = Describe("NetworkServicesController", func() {
}
})
JustBeforeEach(func() {
syncErr = nsc.syncIpvsServices(nsc.serviceMap, nsc.endpointsMap)
syncErr = nsc.syncIpvsServices(nsc.getServiceMap(), nsc.endpointsMap)
})
It("Should have called syncIpvsServices OK", func() {
Expect(syncErr).To(Succeed())
Expand Down Expand Up @@ -334,7 +334,7 @@ var _ = Describe("NetworkServicesController", func() {
}
})
JustBeforeEach(func() {
syncErr = nsc.syncIpvsServices(nsc.serviceMap, nsc.endpointsMap)
syncErr = nsc.syncIpvsServices(nsc.getServiceMap(), nsc.endpointsMap)
})
It("Should have called syncIpvsServices OK", func() {
Expect(syncErr).To(Succeed())
Expand Down Expand Up @@ -395,7 +395,7 @@ var _ = Describe("NetworkServicesController", func() {
}
})
JustBeforeEach(func() {
syncErr = nsc.syncIpvsServices(nsc.serviceMap, nsc.endpointsMap)
syncErr = nsc.syncIpvsServices(nsc.getServiceMap(), nsc.endpointsMap)
})
It("Should have called syncIpvsServices OK", func() {
Expect(syncErr).To(Succeed())
Expand Down Expand Up @@ -463,7 +463,7 @@ var _ = Describe("NetworkServicesController", func() {
}
})
JustBeforeEach(func() {
syncErr = nsc.syncIpvsServices(nsc.serviceMap, nsc.endpointsMap)
syncErr = nsc.syncIpvsServices(nsc.getServiceMap(), nsc.endpointsMap)
})
It("Should have called syncIpvsServices OK", func() {
Expect(syncErr).To(Succeed())
Expand Down
Loading