From 4a84bff8dc4fca5270c93afc3bca5eea6be2a393 Mon Sep 17 00:00:00 2001 From: Omar Nasser Date: Sat, 9 Aug 2025 18:34:44 +0300 Subject: [PATCH 1/3] Replace ListerWatcher with ListerWatcherWithContext --- internal/store/builder.go | 7 ++++--- internal/store/certificatesigningrequest.go | 6 +++--- internal/store/clusterrole.go | 6 +++--- internal/store/clusterrolebinding.go | 6 +++--- internal/store/configmap.go | 6 +++--- internal/store/cronjob.go | 6 +++--- internal/store/daemonset.go | 6 +++--- internal/store/deployment.go | 6 +++--- internal/store/endpoint.go | 6 +++--- internal/store/endpointslice.go | 6 +++--- internal/store/horizontalpodautoscaler.go | 6 +++--- internal/store/ingress.go | 6 +++--- internal/store/ingressclass.go | 6 +++--- internal/store/job.go | 6 +++--- internal/store/lease.go | 6 +++--- internal/store/limitrange.go | 6 +++--- internal/store/mutatingwebhookconfiguration.go | 6 +++--- internal/store/namespace.go | 6 +++--- internal/store/networkpolicy.go | 6 +++--- internal/store/node.go | 6 +++--- internal/store/persistentvolume.go | 6 +++--- internal/store/persistentvolumeclaim.go | 6 +++--- internal/store/pod.go | 6 +++--- internal/store/poddisruptionbudget.go | 6 +++--- internal/store/replicaset.go | 6 +++--- internal/store/replicationcontroller.go | 6 +++--- internal/store/resourcequota.go | 6 +++--- internal/store/role.go | 6 +++--- internal/store/rolebinding.go | 6 +++--- internal/store/secret.go | 6 +++--- internal/store/service.go | 6 +++--- internal/store/serviceaccount.go | 6 +++--- internal/store/statefulset.go | 6 +++--- internal/store/storageclass.go | 6 +++--- internal/store/validatingwebhookconfiguration.go | 6 +++--- internal/store/volumeattachment.go | 6 +++--- pkg/builder/types/interfaces.go | 2 +- 37 files changed, 110 insertions(+), 109 deletions(-) diff --git a/internal/store/builder.go b/internal/store/builder.go index 8cdf11dcf2..e82f856f8a 100644 --- a/internal/store/builder.go +++ b/internal/store/builder.go @@ -516,12 +516,13 @@ func (b *Builder) buildIngressClassStores() []cache.Store { func (b *Builder) buildStores( metricFamilies []generator.FamilyGenerator, expectedType interface{}, - listWatchFunc func(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher, + listWatchWithContextFunc func(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext, useAPIServerCache bool, objectLimit int64, ) []cache.Store { metricFamilies = generator.FilterFamilyGenerators(b.familyGeneratorFilter, metricFamilies) composedMetricGenFuncs := generator.ComposeMetricGenFuncs(metricFamilies) familyHeaders := generator.ExtractMetricFamilyHeaders(metricFamilies) + var listerWatcher func(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher if b.namespaces.IsAllNamespaces() { store := metricsstore.NewMetricsStore( @@ -531,7 +532,7 @@ func (b *Builder) buildStores( if b.fieldSelectorFilter != "" { klog.InfoS("FieldSelector is used", "fieldSelector", b.fieldSelectorFilter) } - listWatcher := listWatchFunc(b.kubeClient, v1.NamespaceAll, b.fieldSelectorFilter) + listWatcher := listerWatcher(b.kubeClient, v1.NamespaceAll, b.fieldSelectorFilter) b.startReflector(expectedType, store, listWatcher, useAPIServerCache, objectLimit) return []cache.Store{store} } @@ -545,7 +546,7 @@ func (b *Builder) buildStores( if b.fieldSelectorFilter != "" { klog.InfoS("FieldSelector is used", "fieldSelector", b.fieldSelectorFilter) } - listWatcher := listWatchFunc(b.kubeClient, ns, b.fieldSelectorFilter) + listWatcher := listerWatcher(b.kubeClient, ns, b.fieldSelectorFilter) b.startReflector(expectedType, store, listWatcher, useAPIServerCache, objectLimit) stores = append(stores, store) } diff --git a/internal/store/certificatesigningrequest.go b/internal/store/certificatesigningrequest.go index ea287e1490..da1a1a72dc 100644 --- a/internal/store/certificatesigningrequest.go +++ b/internal/store/certificatesigningrequest.go @@ -153,12 +153,12 @@ func wrapCSRFunc(f func(*certv1.CertificateSigningRequest) *metric.Family) func( } } -func createCSRListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcher { +func createCSRListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { return kubeClient.CertificatesV1().CertificateSigningRequests().List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { return kubeClient.CertificatesV1().CertificateSigningRequests().Watch(context.TODO(), opts) }, } diff --git a/internal/store/clusterrole.go b/internal/store/clusterrole.go index 374a48a10e..7c9a50b914 100644 --- a/internal/store/clusterrole.go +++ b/internal/store/clusterrole.go @@ -138,12 +138,12 @@ func clusterRoleMetricFamilies(allowAnnotationsList, allowLabelsList []string) [ } } -func createClusterRoleListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcher { +func createClusterRoleListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { return kubeClient.RbacV1().ClusterRoles().List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { return kubeClient.RbacV1().ClusterRoles().Watch(context.TODO(), opts) }, } diff --git a/internal/store/clusterrolebinding.go b/internal/store/clusterrolebinding.go index 302a6b40f4..3be0114161 100644 --- a/internal/store/clusterrolebinding.go +++ b/internal/store/clusterrolebinding.go @@ -140,12 +140,12 @@ func clusterRoleBindingMetricFamilies(allowAnnotationsList, allowLabelsList []st } } -func createClusterRoleBindingListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcher { +func createClusterRoleBindingListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { return kubeClient.RbacV1().ClusterRoleBindings().List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { return kubeClient.RbacV1().ClusterRoleBindings().Watch(context.TODO(), opts) }, } diff --git a/internal/store/configmap.go b/internal/store/configmap.go index 7fc7f246b0..97c58c0d16 100644 --- a/internal/store/configmap.go +++ b/internal/store/configmap.go @@ -134,13 +134,13 @@ func configMapMetricFamilies(allowAnnotationsList, allowLabelsList []string) []g } } -func createConfigMapListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createConfigMapListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector return kubeClient.CoreV1().ConfigMaps(ns).List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector return kubeClient.CoreV1().ConfigMaps(ns).Watch(context.TODO(), opts) }, diff --git a/internal/store/cronjob.go b/internal/store/cronjob.go index eae698fafb..4bb0539127 100644 --- a/internal/store/cronjob.go +++ b/internal/store/cronjob.go @@ -338,13 +338,13 @@ func wrapCronJobFunc(f func(*batchv1.CronJob) *metric.Family) func(interface{}) } } -func createCronJobListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createCronJobListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector return kubeClient.BatchV1().CronJobs(ns).List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector return kubeClient.BatchV1().CronJobs(ns).Watch(context.TODO(), opts) }, diff --git a/internal/store/daemonset.go b/internal/store/daemonset.go index 2ef2bb0c4b..1ff9f55ea2 100644 --- a/internal/store/daemonset.go +++ b/internal/store/daemonset.go @@ -284,13 +284,13 @@ func wrapDaemonSetFunc(f func(*v1.DaemonSet) *metric.Family) func(interface{}) * } } -func createDaemonSetListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createDaemonSetListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector return kubeClient.AppsV1().DaemonSets(ns).List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector return kubeClient.AppsV1().DaemonSets(ns).Watch(context.TODO(), opts) }, diff --git a/internal/store/deployment.go b/internal/store/deployment.go index 7f902b0c16..1d65a2af22 100644 --- a/internal/store/deployment.go +++ b/internal/store/deployment.go @@ -344,13 +344,13 @@ func wrapDeploymentFunc(f func(*v1.Deployment) *metric.Family) func(interface{}) } } -func createDeploymentListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createDeploymentListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector return kubeClient.AppsV1().Deployments(ns).List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector return kubeClient.AppsV1().Deployments(ns).Watch(context.TODO(), opts) }, diff --git a/internal/store/endpoint.go b/internal/store/endpoint.go index fddfa4fde5..1551b138d0 100644 --- a/internal/store/endpoint.go +++ b/internal/store/endpoint.go @@ -199,13 +199,13 @@ func wrapEndpointFunc(f func(*v1.Endpoints) *metric.Family) func(interface{}) *m } } -func createEndpointsListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createEndpointsListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector return kubeClient.CoreV1().Endpoints(ns).List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector return kubeClient.CoreV1().Endpoints(ns).Watch(context.TODO(), opts) }, diff --git a/internal/store/endpointslice.go b/internal/store/endpointslice.go index 6f5eeae751..a687dd0757 100644 --- a/internal/store/endpointslice.go +++ b/internal/store/endpointslice.go @@ -255,13 +255,13 @@ func wrapEndpointSliceFunc(f func(*discoveryv1.EndpointSlice) *metric.Family) fu } } -func createEndpointSliceListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createEndpointSliceListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector return kubeClient.DiscoveryV1().EndpointSlices(ns).List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector return kubeClient.DiscoveryV1().EndpointSlices(ns).Watch(context.TODO(), opts) }, diff --git a/internal/store/horizontalpodautoscaler.go b/internal/store/horizontalpodautoscaler.go index d0bc3f28b9..646e3202d5 100644 --- a/internal/store/horizontalpodautoscaler.go +++ b/internal/store/horizontalpodautoscaler.go @@ -83,13 +83,13 @@ func wrapHPAFunc(f func(*autoscaling.HorizontalPodAutoscaler) *metric.Family) fu } } -func createHPAListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createHPAListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector return kubeClient.AutoscalingV2().HorizontalPodAutoscalers(ns).List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector return kubeClient.AutoscalingV2().HorizontalPodAutoscalers(ns).Watch(context.TODO(), opts) }, diff --git a/internal/store/ingress.go b/internal/store/ingress.go index 00edd6f181..317c01244f 100644 --- a/internal/store/ingress.go +++ b/internal/store/ingress.go @@ -223,13 +223,13 @@ func wrapIngressFunc(f func(*networkingv1.Ingress) *metric.Family) func(interfac } } -func createIngressListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createIngressListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector return kubeClient.NetworkingV1().Ingresses(ns).List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector return kubeClient.NetworkingV1().Ingresses(ns).Watch(context.TODO(), opts) }, diff --git a/internal/store/ingressclass.go b/internal/store/ingressclass.go index 0724c9110a..0ae2b6ab70 100644 --- a/internal/store/ingressclass.go +++ b/internal/store/ingressclass.go @@ -134,12 +134,12 @@ func wrapIngressClassFunc(f func(*networkingv1.IngressClass) *metric.Family) fun } } -func createIngressClassListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcher { +func createIngressClassListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { return kubeClient.NetworkingV1().IngressClasses().List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { return kubeClient.NetworkingV1().IngressClasses().Watch(context.TODO(), opts) }, } diff --git a/internal/store/job.go b/internal/store/job.go index bac4806384..c371092d99 100644 --- a/internal/store/job.go +++ b/internal/store/job.go @@ -440,13 +440,13 @@ func wrapJobFunc(f func(*v1batch.Job) *metric.Family) func(interface{}) *metric. } } -func createJobListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createJobListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector return kubeClient.BatchV1().Jobs(ns).List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector return kubeClient.BatchV1().Jobs(ns).Watch(context.TODO(), opts) }, diff --git a/internal/store/lease.go b/internal/store/lease.go index 3df4251da0..e915bdc11d 100644 --- a/internal/store/lease.go +++ b/internal/store/lease.go @@ -116,13 +116,13 @@ func wrapLeaseFunc(f func(*coordinationv1.Lease) *metric.Family) func(interface{ } } -func createLeaseListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createLeaseListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector return kubeClient.CoordinationV1().Leases(ns).List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector return kubeClient.CoordinationV1().Leases(ns).Watch(context.TODO(), opts) }, diff --git a/internal/store/limitrange.go b/internal/store/limitrange.go index 1be7c1c370..45340a20fb 100644 --- a/internal/store/limitrange.go +++ b/internal/store/limitrange.go @@ -129,13 +129,13 @@ func wrapLimitRangeFunc(f func(*v1.LimitRange) *metric.Family) func(interface{}) } } -func createLimitRangeListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createLimitRangeListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector return kubeClient.CoreV1().LimitRanges(ns).List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector return kubeClient.CoreV1().LimitRanges(ns).Watch(context.TODO(), opts) }, diff --git a/internal/store/mutatingwebhookconfiguration.go b/internal/store/mutatingwebhookconfiguration.go index 8380346ace..5f1d0da466 100644 --- a/internal/store/mutatingwebhookconfiguration.go +++ b/internal/store/mutatingwebhookconfiguration.go @@ -111,12 +111,12 @@ var ( } ) -func createMutatingWebhookConfigurationListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcher { +func createMutatingWebhookConfigurationListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { return kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { return kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().Watch(context.TODO(), opts) }, } diff --git a/internal/store/namespace.go b/internal/store/namespace.go index e46c73cf28..1908d0ed0f 100644 --- a/internal/store/namespace.go +++ b/internal/store/namespace.go @@ -176,12 +176,12 @@ func wrapNamespaceFunc(f func(*v1.Namespace) *metric.Family) func(interface{}) * } } -func createNamespaceListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcher { +func createNamespaceListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { return kubeClient.CoreV1().Namespaces().List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { return kubeClient.CoreV1().Namespaces().Watch(context.TODO(), opts) }, } diff --git a/internal/store/networkpolicy.go b/internal/store/networkpolicy.go index 7d546b8c29..92c0b65000 100644 --- a/internal/store/networkpolicy.go +++ b/internal/store/networkpolicy.go @@ -156,13 +156,13 @@ func wrapNetworkPolicyFunc(f func(*networkingv1.NetworkPolicy) *metric.Family) f } } -func createNetworkPolicyListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createNetworkPolicyListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector return kubeClient.NetworkingV1().NetworkPolicies(ns).List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector return kubeClient.NetworkingV1().NetworkPolicies(ns).Watch(context.TODO(), opts) }, diff --git a/internal/store/node.go b/internal/store/node.go index 7bd6e4aa8f..978b6488bf 100644 --- a/internal/store/node.go +++ b/internal/store/node.go @@ -520,12 +520,12 @@ func wrapNodeFunc(f func(*v1.Node) *metric.Family) func(interface{}) *metric.Fam } } -func createNodeListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcher { +func createNodeListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { return kubeClient.CoreV1().Nodes().List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { return kubeClient.CoreV1().Nodes().Watch(context.TODO(), opts) }, } diff --git a/internal/store/persistentvolume.go b/internal/store/persistentvolume.go index c24ea76da1..7b02696870 100644 --- a/internal/store/persistentvolume.go +++ b/internal/store/persistentvolume.go @@ -76,12 +76,12 @@ func wrapPersistentVolumeFunc(f func(*v1.PersistentVolume) *metric.Family) func( } } -func createPersistentVolumeListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcher { +func createPersistentVolumeListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { return kubeClient.CoreV1().PersistentVolumes().List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { return kubeClient.CoreV1().PersistentVolumes().Watch(context.TODO(), opts) }, } diff --git a/internal/store/persistentvolumeclaim.go b/internal/store/persistentvolumeclaim.go index a8f7cdedc2..033dd386fd 100644 --- a/internal/store/persistentvolumeclaim.go +++ b/internal/store/persistentvolumeclaim.go @@ -282,13 +282,13 @@ func wrapPersistentVolumeClaimFunc(f func(*v1.PersistentVolumeClaim) *metric.Fam } } -func createPersistentVolumeClaimListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createPersistentVolumeClaimListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector return kubeClient.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector return kubeClient.CoreV1().PersistentVolumeClaims(ns).Watch(context.TODO(), opts) }, diff --git a/internal/store/pod.go b/internal/store/pod.go index d4744dbb2d..e2d89549f0 100644 --- a/internal/store/pod.go +++ b/internal/store/pod.go @@ -1819,13 +1819,13 @@ func wrapPodFunc(f func(*v1.Pod) *metric.Family) func(interface{}) *metric.Famil } } -func createPodListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createPodListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector return kubeClient.CoreV1().Pods(ns).List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector return kubeClient.CoreV1().Pods(ns).Watch(context.TODO(), opts) }, diff --git a/internal/store/poddisruptionbudget.go b/internal/store/poddisruptionbudget.go index c4fa0cb5b0..3e1f09cbb9 100644 --- a/internal/store/poddisruptionbudget.go +++ b/internal/store/poddisruptionbudget.go @@ -202,13 +202,13 @@ func wrapPodDisruptionBudgetFunc(f func(*policyv1.PodDisruptionBudget) *metric.F } } -func createPodDisruptionBudgetListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createPodDisruptionBudgetListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector return kubeClient.PolicyV1().PodDisruptionBudgets(ns).List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector return kubeClient.PolicyV1().PodDisruptionBudgets(ns).Watch(context.TODO(), opts) }, diff --git a/internal/store/replicaset.go b/internal/store/replicaset.go index 192299ff6d..67cd70d307 100644 --- a/internal/store/replicaset.go +++ b/internal/store/replicaset.go @@ -270,13 +270,13 @@ func wrapReplicaSetFunc(f func(*v1.ReplicaSet) *metric.Family) func(interface{}) } } -func createReplicaSetListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createReplicaSetListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector return kubeClient.AppsV1().ReplicaSets(ns).List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector return kubeClient.AppsV1().ReplicaSets(ns).Watch(context.TODO(), opts) }, diff --git a/internal/store/replicationcontroller.go b/internal/store/replicationcontroller.go index f29b1a9226..6fb37fe3be 100644 --- a/internal/store/replicationcontroller.go +++ b/internal/store/replicationcontroller.go @@ -226,13 +226,13 @@ func wrapReplicationControllerFunc(f func(*v1.ReplicationController) *metric.Fam } } -func createReplicationControllerListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createReplicationControllerListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector return kubeClient.CoreV1().ReplicationControllers(ns).List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector return kubeClient.CoreV1().ReplicationControllers(ns).Watch(context.TODO(), opts) }, diff --git a/internal/store/resourcequota.go b/internal/store/resourcequota.go index 6fc017cfc6..ea8008bac5 100644 --- a/internal/store/resourcequota.go +++ b/internal/store/resourcequota.go @@ -154,13 +154,13 @@ func wrapResourceQuotaFunc(f func(*v1.ResourceQuota) *metric.Family) func(interf } } -func createResourceQuotaListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createResourceQuotaListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector return kubeClient.CoreV1().ResourceQuotas(ns).List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector return kubeClient.CoreV1().ResourceQuotas(ns).Watch(context.TODO(), opts) }, diff --git a/internal/store/role.go b/internal/store/role.go index 1ba5707c92..c0eb1ccd20 100644 --- a/internal/store/role.go +++ b/internal/store/role.go @@ -138,13 +138,13 @@ func roleMetricFamilies(allowAnnotationsList, allowLabelsList []string) []genera } } -func createRoleListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createRoleListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector return kubeClient.RbacV1().Roles(ns).List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector return kubeClient.RbacV1().Roles(ns).Watch(context.TODO(), opts) }, diff --git a/internal/store/rolebinding.go b/internal/store/rolebinding.go index f371581dfe..10af16aa5e 100644 --- a/internal/store/rolebinding.go +++ b/internal/store/rolebinding.go @@ -140,13 +140,13 @@ func roleBindingMetricFamilies(allowAnnotationsList, allowLabelsList []string) [ } } -func createRoleBindingListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createRoleBindingListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector return kubeClient.RbacV1().RoleBindings(ns).List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector return kubeClient.RbacV1().RoleBindings(ns).Watch(context.TODO(), opts) }, diff --git a/internal/store/secret.go b/internal/store/secret.go index 644cbb7798..f85b866b63 100644 --- a/internal/store/secret.go +++ b/internal/store/secret.go @@ -218,13 +218,13 @@ func wrapSecretFunc(f func(*v1.Secret) *metric.Family) func(interface{}) *metric } } -func createSecretListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createSecretListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector return kubeClient.CoreV1().Secrets(ns).List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector return kubeClient.CoreV1().Secrets(ns).Watch(context.TODO(), opts) }, diff --git a/internal/store/service.go b/internal/store/service.go index e4a8282e58..f82c3fb31e 100644 --- a/internal/store/service.go +++ b/internal/store/service.go @@ -201,13 +201,13 @@ func wrapSvcFunc(f func(*v1.Service) *metric.Family) func(interface{}) *metric.F } } -func createServiceListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createServiceListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector return kubeClient.CoreV1().Services(ns).List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector return kubeClient.CoreV1().Services(ns).Watch(context.TODO(), opts) }, diff --git a/internal/store/serviceaccount.go b/internal/store/serviceaccount.go index f7b7f9394f..322a17564f 100644 --- a/internal/store/serviceaccount.go +++ b/internal/store/serviceaccount.go @@ -235,13 +235,13 @@ func wrapServiceAccountFunc(f func(*v1.ServiceAccount) *metric.Family) func(inte } } -func createServiceAccountListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createServiceAccountListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector return kubeClient.CoreV1().ServiceAccounts(ns).List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector return kubeClient.CoreV1().ServiceAccounts(ns).Watch(context.TODO(), opts) }, diff --git a/internal/store/statefulset.go b/internal/store/statefulset.go index 2cb6329d7c..23722f2a9e 100644 --- a/internal/store/statefulset.go +++ b/internal/store/statefulset.go @@ -338,13 +338,13 @@ func wrapStatefulSetFunc(f func(*v1.StatefulSet) *metric.Family) func(interface{ } } -func createStatefulSetListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher { +func createStatefulSetListWatch(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector return kubeClient.AppsV1().StatefulSets(ns).List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector return kubeClient.AppsV1().StatefulSets(ns).Watch(context.TODO(), opts) }, diff --git a/internal/store/storageclass.go b/internal/store/storageclass.go index 15c76d7965..40d130d384 100644 --- a/internal/store/storageclass.go +++ b/internal/store/storageclass.go @@ -146,12 +146,12 @@ func wrapStorageClassFunc(f func(*storagev1.StorageClass) *metric.Family) func(i } } -func createStorageClassListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcher { +func createStorageClassListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { return kubeClient.StorageV1().StorageClasses().List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { return kubeClient.StorageV1().StorageClasses().Watch(context.TODO(), opts) }, } diff --git a/internal/store/validatingwebhookconfiguration.go b/internal/store/validatingwebhookconfiguration.go index 070daba51a..99c5df6826 100644 --- a/internal/store/validatingwebhookconfiguration.go +++ b/internal/store/validatingwebhookconfiguration.go @@ -111,12 +111,12 @@ var ( } ) -func createValidatingWebhookConfigurationListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcher { +func createValidatingWebhookConfigurationListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { return kubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations().List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { return kubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations().Watch(context.TODO(), opts) }, } diff --git a/internal/store/volumeattachment.go b/internal/store/volumeattachment.go index 22cdd8e963..d6aebe3d50 100644 --- a/internal/store/volumeattachment.go +++ b/internal/store/volumeattachment.go @@ -167,12 +167,12 @@ func wrapVolumeAttachmentFunc(f func(*storagev1.VolumeAttachment) *metric.Family } } -func createVolumeAttachmentListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcher { +func createVolumeAttachmentListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { return kubeClient.StorageV1().VolumeAttachments().List(context.TODO(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { return kubeClient.StorageV1().VolumeAttachments().Watch(context.TODO(), opts) }, } diff --git a/pkg/builder/types/interfaces.go b/pkg/builder/types/interfaces.go index bc13ca3c3e..c1586e2ef9 100644 --- a/pkg/builder/types/interfaces.go +++ b/pkg/builder/types/interfaces.go @@ -56,7 +56,7 @@ type BuilderInterface interface { // BuildStoresFunc function signature that is used to return a list of cache.Store type BuildStoresFunc func(metricFamilies []generator.FamilyGenerator, expectedType interface{}, - listWatchFunc func(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher, + listWatchWithContextFunc func(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext, useAPIServerCache bool, limit int64, ) []cache.Store From aad019eafed64618c0630fa461f477833c9adf64 Mon Sep 17 00:00:00 2001 From: Omar Nasser Date: Sat, 9 Aug 2025 20:18:25 +0300 Subject: [PATCH 2/3] Fix unused ctx --- internal/store/builder.go | 8 +++++--- internal/store/certificatesigningrequest.go | 4 ++-- internal/store/clusterrole.go | 4 ++-- internal/store/clusterrolebinding.go | 4 ++-- internal/store/configmap.go | 4 ++-- internal/store/cronjob.go | 4 ++-- internal/store/daemonset.go | 4 ++-- internal/store/deployment.go | 4 ++-- internal/store/endpoint.go | 4 ++-- internal/store/endpointslice.go | 4 ++-- internal/store/horizontalpodautoscaler.go | 4 ++-- internal/store/ingress.go | 4 ++-- internal/store/ingressclass.go | 4 ++-- internal/store/job.go | 4 ++-- internal/store/lease.go | 4 ++-- internal/store/limitrange.go | 4 ++-- internal/store/mutatingwebhookconfiguration.go | 4 ++-- internal/store/namespace.go | 4 ++-- internal/store/networkpolicy.go | 4 ++-- internal/store/node.go | 4 ++-- internal/store/persistentvolume.go | 4 ++-- internal/store/persistentvolumeclaim.go | 4 ++-- internal/store/pod.go | 4 ++-- internal/store/poddisruptionbudget.go | 4 ++-- internal/store/replicaset.go | 4 ++-- internal/store/replicationcontroller.go | 4 ++-- internal/store/resourcequota.go | 4 ++-- internal/store/role.go | 4 ++-- internal/store/rolebinding.go | 4 ++-- internal/store/secret.go | 4 ++-- internal/store/service.go | 4 ++-- internal/store/serviceaccount.go | 4 ++-- internal/store/statefulset.go | 4 ++-- internal/store/storageclass.go | 4 ++-- internal/store/validatingwebhookconfiguration.go | 4 ++-- internal/store/volumeattachment.go | 4 ++-- pkg/app/server_test.go | 11 ++++------- pkg/builder/builder_test.go | 2 +- pkg/builder/types/interfaces.go | 2 +- pkg/customresource/registry_factory.go | 2 +- pkg/customresourcestate/custom_resource_metrics.go | 7 +++---- 41 files changed, 85 insertions(+), 87 deletions(-) diff --git a/internal/store/builder.go b/internal/store/builder.go index e82f856f8a..7100577997 100644 --- a/internal/store/builder.go +++ b/internal/store/builder.go @@ -558,7 +558,7 @@ func (b *Builder) buildStores( func (b *Builder) buildCustomResourceStores(resourceName string, metricFamilies []generator.FamilyGenerator, expectedType interface{}, - listWatchFunc func(customResourceClient interface{}, ns string, fieldSelector string) cache.ListerWatcher, + listWatchWithContextFunc func(customResourceClient interface{}, ns string, fieldSelector string) cache.ListerWatcherWithContext, useAPIServerCache bool, objectLimit int64, ) []cache.Store { metricFamilies = generator.FilterFamilyGenerators(b.familyGeneratorFilter, metricFamilies) @@ -566,6 +566,8 @@ func (b *Builder) buildCustomResourceStores(resourceName string, familyHeaders := generator.ExtractMetricFamilyHeaders(metricFamilies) + var listerWatcher func(customResourceClient interface{}, ns string, fieldSelector string) cache.ListerWatcher + gvr, err := util.GVRFromType(resourceName, expectedType) if err != nil { klog.ErrorS(err, "Failed to get GVR from type", "resourceName", resourceName, "expectedType", expectedType) @@ -590,7 +592,7 @@ func (b *Builder) buildCustomResourceStores(resourceName string, if b.fieldSelectorFilter != "" { klog.InfoS("FieldSelector is used", "fieldSelector", b.fieldSelectorFilter) } - listWatcher := listWatchFunc(customResourceClient, v1.NamespaceAll, b.fieldSelectorFilter) + listWatcher := listerWatcher(customResourceClient, v1.NamespaceAll, b.fieldSelectorFilter) b.startReflector(expectedType, store, listWatcher, useAPIServerCache, objectLimit) return []cache.Store{store} } @@ -602,7 +604,7 @@ func (b *Builder) buildCustomResourceStores(resourceName string, composedMetricGenFuncs, ) klog.InfoS("FieldSelector is used", "fieldSelector", b.fieldSelectorFilter) - listWatcher := listWatchFunc(customResourceClient, ns, b.fieldSelectorFilter) + listWatcher := listerWatcher(customResourceClient, ns, b.fieldSelectorFilter) b.startReflector(expectedType, store, listWatcher, useAPIServerCache, objectLimit) stores = append(stores, store) } diff --git a/internal/store/certificatesigningrequest.go b/internal/store/certificatesigningrequest.go index da1a1a72dc..452b0517fc 100644 --- a/internal/store/certificatesigningrequest.go +++ b/internal/store/certificatesigningrequest.go @@ -156,10 +156,10 @@ func wrapCSRFunc(f func(*certv1.CertificateSigningRequest) *metric.Family) func( func createCSRListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { - return kubeClient.CertificatesV1().CertificateSigningRequests().List(context.TODO(), opts) + return kubeClient.CertificatesV1().CertificateSigningRequests().List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { - return kubeClient.CertificatesV1().CertificateSigningRequests().Watch(context.TODO(), opts) + return kubeClient.CertificatesV1().CertificateSigningRequests().Watch(ctx, opts) }, } } diff --git a/internal/store/clusterrole.go b/internal/store/clusterrole.go index 7c9a50b914..0108513303 100644 --- a/internal/store/clusterrole.go +++ b/internal/store/clusterrole.go @@ -141,10 +141,10 @@ func clusterRoleMetricFamilies(allowAnnotationsList, allowLabelsList []string) [ func createClusterRoleListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { - return kubeClient.RbacV1().ClusterRoles().List(context.TODO(), opts) + return kubeClient.RbacV1().ClusterRoles().List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { - return kubeClient.RbacV1().ClusterRoles().Watch(context.TODO(), opts) + return kubeClient.RbacV1().ClusterRoles().Watch(ctx, opts) }, } } diff --git a/internal/store/clusterrolebinding.go b/internal/store/clusterrolebinding.go index 3be0114161..1adc40191f 100644 --- a/internal/store/clusterrolebinding.go +++ b/internal/store/clusterrolebinding.go @@ -143,10 +143,10 @@ func clusterRoleBindingMetricFamilies(allowAnnotationsList, allowLabelsList []st func createClusterRoleBindingListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { - return kubeClient.RbacV1().ClusterRoleBindings().List(context.TODO(), opts) + return kubeClient.RbacV1().ClusterRoleBindings().List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { - return kubeClient.RbacV1().ClusterRoleBindings().Watch(context.TODO(), opts) + return kubeClient.RbacV1().ClusterRoleBindings().Watch(ctx, opts) }, } } diff --git a/internal/store/configmap.go b/internal/store/configmap.go index 97c58c0d16..66935516b3 100644 --- a/internal/store/configmap.go +++ b/internal/store/configmap.go @@ -138,11 +138,11 @@ func createConfigMapListWatch(kubeClient clientset.Interface, ns string, fieldSe return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().ConfigMaps(ns).List(context.TODO(), opts) + return kubeClient.CoreV1().ConfigMaps(ns).List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().ConfigMaps(ns).Watch(context.TODO(), opts) + return kubeClient.CoreV1().ConfigMaps(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/cronjob.go b/internal/store/cronjob.go index 4bb0539127..bdbba39e76 100644 --- a/internal/store/cronjob.go +++ b/internal/store/cronjob.go @@ -342,11 +342,11 @@ func createCronJobListWatch(kubeClient clientset.Interface, ns string, fieldSele return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.BatchV1().CronJobs(ns).List(context.TODO(), opts) + return kubeClient.BatchV1().CronJobs(ns).List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.BatchV1().CronJobs(ns).Watch(context.TODO(), opts) + return kubeClient.BatchV1().CronJobs(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/daemonset.go b/internal/store/daemonset.go index 1ff9f55ea2..196f5d4d00 100644 --- a/internal/store/daemonset.go +++ b/internal/store/daemonset.go @@ -288,11 +288,11 @@ func createDaemonSetListWatch(kubeClient clientset.Interface, ns string, fieldSe return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.AppsV1().DaemonSets(ns).List(context.TODO(), opts) + return kubeClient.AppsV1().DaemonSets(ns).List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.AppsV1().DaemonSets(ns).Watch(context.TODO(), opts) + return kubeClient.AppsV1().DaemonSets(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/deployment.go b/internal/store/deployment.go index 1d65a2af22..302a5c41c3 100644 --- a/internal/store/deployment.go +++ b/internal/store/deployment.go @@ -348,11 +348,11 @@ func createDeploymentListWatch(kubeClient clientset.Interface, ns string, fieldS return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.AppsV1().Deployments(ns).List(context.TODO(), opts) + return kubeClient.AppsV1().Deployments(ns).List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.AppsV1().Deployments(ns).Watch(context.TODO(), opts) + return kubeClient.AppsV1().Deployments(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/endpoint.go b/internal/store/endpoint.go index 1551b138d0..5388e045dc 100644 --- a/internal/store/endpoint.go +++ b/internal/store/endpoint.go @@ -203,11 +203,11 @@ func createEndpointsListWatch(kubeClient clientset.Interface, ns string, fieldSe return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().Endpoints(ns).List(context.TODO(), opts) + return kubeClient.CoreV1().Endpoints(ns).List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().Endpoints(ns).Watch(context.TODO(), opts) + return kubeClient.CoreV1().Endpoints(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/endpointslice.go b/internal/store/endpointslice.go index a687dd0757..a70d47cc85 100644 --- a/internal/store/endpointslice.go +++ b/internal/store/endpointslice.go @@ -259,11 +259,11 @@ func createEndpointSliceListWatch(kubeClient clientset.Interface, ns string, fie return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.DiscoveryV1().EndpointSlices(ns).List(context.TODO(), opts) + return kubeClient.DiscoveryV1().EndpointSlices(ns).List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.DiscoveryV1().EndpointSlices(ns).Watch(context.TODO(), opts) + return kubeClient.DiscoveryV1().EndpointSlices(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/horizontalpodautoscaler.go b/internal/store/horizontalpodautoscaler.go index 646e3202d5..afc49e2e37 100644 --- a/internal/store/horizontalpodautoscaler.go +++ b/internal/store/horizontalpodautoscaler.go @@ -87,11 +87,11 @@ func createHPAListWatch(kubeClient clientset.Interface, ns string, fieldSelector return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.AutoscalingV2().HorizontalPodAutoscalers(ns).List(context.TODO(), opts) + return kubeClient.AutoscalingV2().HorizontalPodAutoscalers(ns).List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.AutoscalingV2().HorizontalPodAutoscalers(ns).Watch(context.TODO(), opts) + return kubeClient.AutoscalingV2().HorizontalPodAutoscalers(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/ingress.go b/internal/store/ingress.go index 317c01244f..560030975e 100644 --- a/internal/store/ingress.go +++ b/internal/store/ingress.go @@ -227,11 +227,11 @@ func createIngressListWatch(kubeClient clientset.Interface, ns string, fieldSele return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.NetworkingV1().Ingresses(ns).List(context.TODO(), opts) + return kubeClient.NetworkingV1().Ingresses(ns).List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.NetworkingV1().Ingresses(ns).Watch(context.TODO(), opts) + return kubeClient.NetworkingV1().Ingresses(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/ingressclass.go b/internal/store/ingressclass.go index 0ae2b6ab70..7b4bea2d80 100644 --- a/internal/store/ingressclass.go +++ b/internal/store/ingressclass.go @@ -137,10 +137,10 @@ func wrapIngressClassFunc(f func(*networkingv1.IngressClass) *metric.Family) fun func createIngressClassListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { - return kubeClient.NetworkingV1().IngressClasses().List(context.TODO(), opts) + return kubeClient.NetworkingV1().IngressClasses().List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { - return kubeClient.NetworkingV1().IngressClasses().Watch(context.TODO(), opts) + return kubeClient.NetworkingV1().IngressClasses().Watch(ctx, opts) }, } } diff --git a/internal/store/job.go b/internal/store/job.go index c371092d99..0240019045 100644 --- a/internal/store/job.go +++ b/internal/store/job.go @@ -444,11 +444,11 @@ func createJobListWatch(kubeClient clientset.Interface, ns string, fieldSelector return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.BatchV1().Jobs(ns).List(context.TODO(), opts) + return kubeClient.BatchV1().Jobs(ns).List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.BatchV1().Jobs(ns).Watch(context.TODO(), opts) + return kubeClient.BatchV1().Jobs(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/lease.go b/internal/store/lease.go index e915bdc11d..eac54d56ab 100644 --- a/internal/store/lease.go +++ b/internal/store/lease.go @@ -120,11 +120,11 @@ func createLeaseListWatch(kubeClient clientset.Interface, ns string, fieldSelect return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoordinationV1().Leases(ns).List(context.TODO(), opts) + return kubeClient.CoordinationV1().Leases(ns).List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoordinationV1().Leases(ns).Watch(context.TODO(), opts) + return kubeClient.CoordinationV1().Leases(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/limitrange.go b/internal/store/limitrange.go index 45340a20fb..a717e687e2 100644 --- a/internal/store/limitrange.go +++ b/internal/store/limitrange.go @@ -133,11 +133,11 @@ func createLimitRangeListWatch(kubeClient clientset.Interface, ns string, fieldS return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().LimitRanges(ns).List(context.TODO(), opts) + return kubeClient.CoreV1().LimitRanges(ns).List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().LimitRanges(ns).Watch(context.TODO(), opts) + return kubeClient.CoreV1().LimitRanges(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/mutatingwebhookconfiguration.go b/internal/store/mutatingwebhookconfiguration.go index 5f1d0da466..4d8a604ed8 100644 --- a/internal/store/mutatingwebhookconfiguration.go +++ b/internal/store/mutatingwebhookconfiguration.go @@ -114,10 +114,10 @@ var ( func createMutatingWebhookConfigurationListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { - return kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().List(context.TODO(), opts) + return kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { - return kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().Watch(context.TODO(), opts) + return kubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations().Watch(ctx, opts) }, } } diff --git a/internal/store/namespace.go b/internal/store/namespace.go index 1908d0ed0f..48cf5e7efd 100644 --- a/internal/store/namespace.go +++ b/internal/store/namespace.go @@ -179,10 +179,10 @@ func wrapNamespaceFunc(f func(*v1.Namespace) *metric.Family) func(interface{}) * func createNamespaceListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { - return kubeClient.CoreV1().Namespaces().List(context.TODO(), opts) + return kubeClient.CoreV1().Namespaces().List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { - return kubeClient.CoreV1().Namespaces().Watch(context.TODO(), opts) + return kubeClient.CoreV1().Namespaces().Watch(ctx, opts) }, } } diff --git a/internal/store/networkpolicy.go b/internal/store/networkpolicy.go index 92c0b65000..c36c451dc9 100644 --- a/internal/store/networkpolicy.go +++ b/internal/store/networkpolicy.go @@ -160,11 +160,11 @@ func createNetworkPolicyListWatch(kubeClient clientset.Interface, ns string, fie return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.NetworkingV1().NetworkPolicies(ns).List(context.TODO(), opts) + return kubeClient.NetworkingV1().NetworkPolicies(ns).List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.NetworkingV1().NetworkPolicies(ns).Watch(context.TODO(), opts) + return kubeClient.NetworkingV1().NetworkPolicies(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/node.go b/internal/store/node.go index 978b6488bf..85f1e579fd 100644 --- a/internal/store/node.go +++ b/internal/store/node.go @@ -523,10 +523,10 @@ func wrapNodeFunc(f func(*v1.Node) *metric.Family) func(interface{}) *metric.Fam func createNodeListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { - return kubeClient.CoreV1().Nodes().List(context.TODO(), opts) + return kubeClient.CoreV1().Nodes().List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { - return kubeClient.CoreV1().Nodes().Watch(context.TODO(), opts) + return kubeClient.CoreV1().Nodes().Watch(ctx, opts) }, } } diff --git a/internal/store/persistentvolume.go b/internal/store/persistentvolume.go index 7b02696870..d6a1a92723 100644 --- a/internal/store/persistentvolume.go +++ b/internal/store/persistentvolume.go @@ -79,10 +79,10 @@ func wrapPersistentVolumeFunc(f func(*v1.PersistentVolume) *metric.Family) func( func createPersistentVolumeListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { - return kubeClient.CoreV1().PersistentVolumes().List(context.TODO(), opts) + return kubeClient.CoreV1().PersistentVolumes().List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { - return kubeClient.CoreV1().PersistentVolumes().Watch(context.TODO(), opts) + return kubeClient.CoreV1().PersistentVolumes().Watch(ctx, opts) }, } } diff --git a/internal/store/persistentvolumeclaim.go b/internal/store/persistentvolumeclaim.go index 033dd386fd..4fee337ea1 100644 --- a/internal/store/persistentvolumeclaim.go +++ b/internal/store/persistentvolumeclaim.go @@ -286,11 +286,11 @@ func createPersistentVolumeClaimListWatch(kubeClient clientset.Interface, ns str return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), opts) + return kubeClient.CoreV1().PersistentVolumeClaims(ns).List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().PersistentVolumeClaims(ns).Watch(context.TODO(), opts) + return kubeClient.CoreV1().PersistentVolumeClaims(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/pod.go b/internal/store/pod.go index e2d89549f0..8e0bd231be 100644 --- a/internal/store/pod.go +++ b/internal/store/pod.go @@ -1823,11 +1823,11 @@ func createPodListWatch(kubeClient clientset.Interface, ns string, fieldSelector return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().Pods(ns).List(context.TODO(), opts) + return kubeClient.CoreV1().Pods(ns).List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().Pods(ns).Watch(context.TODO(), opts) + return kubeClient.CoreV1().Pods(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/poddisruptionbudget.go b/internal/store/poddisruptionbudget.go index 3e1f09cbb9..378e9c9ada 100644 --- a/internal/store/poddisruptionbudget.go +++ b/internal/store/poddisruptionbudget.go @@ -206,11 +206,11 @@ func createPodDisruptionBudgetListWatch(kubeClient clientset.Interface, ns strin return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.PolicyV1().PodDisruptionBudgets(ns).List(context.TODO(), opts) + return kubeClient.PolicyV1().PodDisruptionBudgets(ns).List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.PolicyV1().PodDisruptionBudgets(ns).Watch(context.TODO(), opts) + return kubeClient.PolicyV1().PodDisruptionBudgets(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/replicaset.go b/internal/store/replicaset.go index 67cd70d307..05b686ccc8 100644 --- a/internal/store/replicaset.go +++ b/internal/store/replicaset.go @@ -274,11 +274,11 @@ func createReplicaSetListWatch(kubeClient clientset.Interface, ns string, fieldS return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.AppsV1().ReplicaSets(ns).List(context.TODO(), opts) + return kubeClient.AppsV1().ReplicaSets(ns).List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.AppsV1().ReplicaSets(ns).Watch(context.TODO(), opts) + return kubeClient.AppsV1().ReplicaSets(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/replicationcontroller.go b/internal/store/replicationcontroller.go index 6fb37fe3be..6c8ef5bece 100644 --- a/internal/store/replicationcontroller.go +++ b/internal/store/replicationcontroller.go @@ -230,11 +230,11 @@ func createReplicationControllerListWatch(kubeClient clientset.Interface, ns str return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().ReplicationControllers(ns).List(context.TODO(), opts) + return kubeClient.CoreV1().ReplicationControllers(ns).List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().ReplicationControllers(ns).Watch(context.TODO(), opts) + return kubeClient.CoreV1().ReplicationControllers(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/resourcequota.go b/internal/store/resourcequota.go index ea8008bac5..40dfb8c1be 100644 --- a/internal/store/resourcequota.go +++ b/internal/store/resourcequota.go @@ -158,11 +158,11 @@ func createResourceQuotaListWatch(kubeClient clientset.Interface, ns string, fie return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().ResourceQuotas(ns).List(context.TODO(), opts) + return kubeClient.CoreV1().ResourceQuotas(ns).List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().ResourceQuotas(ns).Watch(context.TODO(), opts) + return kubeClient.CoreV1().ResourceQuotas(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/role.go b/internal/store/role.go index c0eb1ccd20..e1f2267fb1 100644 --- a/internal/store/role.go +++ b/internal/store/role.go @@ -142,11 +142,11 @@ func createRoleListWatch(kubeClient clientset.Interface, ns string, fieldSelecto return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.RbacV1().Roles(ns).List(context.TODO(), opts) + return kubeClient.RbacV1().Roles(ns).List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.RbacV1().Roles(ns).Watch(context.TODO(), opts) + return kubeClient.RbacV1().Roles(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/rolebinding.go b/internal/store/rolebinding.go index 10af16aa5e..4327a4d52c 100644 --- a/internal/store/rolebinding.go +++ b/internal/store/rolebinding.go @@ -144,11 +144,11 @@ func createRoleBindingListWatch(kubeClient clientset.Interface, ns string, field return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.RbacV1().RoleBindings(ns).List(context.TODO(), opts) + return kubeClient.RbacV1().RoleBindings(ns).List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.RbacV1().RoleBindings(ns).Watch(context.TODO(), opts) + return kubeClient.RbacV1().RoleBindings(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/secret.go b/internal/store/secret.go index f85b866b63..0f49935b8b 100644 --- a/internal/store/secret.go +++ b/internal/store/secret.go @@ -222,11 +222,11 @@ func createSecretListWatch(kubeClient clientset.Interface, ns string, fieldSelec return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().Secrets(ns).List(context.TODO(), opts) + return kubeClient.CoreV1().Secrets(ns).List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().Secrets(ns).Watch(context.TODO(), opts) + return kubeClient.CoreV1().Secrets(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/service.go b/internal/store/service.go index f82c3fb31e..bcdb59977a 100644 --- a/internal/store/service.go +++ b/internal/store/service.go @@ -205,11 +205,11 @@ func createServiceListWatch(kubeClient clientset.Interface, ns string, fieldSele return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().Services(ns).List(context.TODO(), opts) + return kubeClient.CoreV1().Services(ns).List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().Services(ns).Watch(context.TODO(), opts) + return kubeClient.CoreV1().Services(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/serviceaccount.go b/internal/store/serviceaccount.go index 322a17564f..4f8529a188 100644 --- a/internal/store/serviceaccount.go +++ b/internal/store/serviceaccount.go @@ -239,11 +239,11 @@ func createServiceAccountListWatch(kubeClient clientset.Interface, ns string, fi return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().ServiceAccounts(ns).List(context.TODO(), opts) + return kubeClient.CoreV1().ServiceAccounts(ns).List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.CoreV1().ServiceAccounts(ns).Watch(context.TODO(), opts) + return kubeClient.CoreV1().ServiceAccounts(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/statefulset.go b/internal/store/statefulset.go index 23722f2a9e..3268ea7925 100644 --- a/internal/store/statefulset.go +++ b/internal/store/statefulset.go @@ -342,11 +342,11 @@ func createStatefulSetListWatch(kubeClient clientset.Interface, ns string, field return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector - return kubeClient.AppsV1().StatefulSets(ns).List(context.TODO(), opts) + return kubeClient.AppsV1().StatefulSets(ns).List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector - return kubeClient.AppsV1().StatefulSets(ns).Watch(context.TODO(), opts) + return kubeClient.AppsV1().StatefulSets(ns).Watch(ctx, opts) }, } } diff --git a/internal/store/storageclass.go b/internal/store/storageclass.go index 40d130d384..bdd0fc16cd 100644 --- a/internal/store/storageclass.go +++ b/internal/store/storageclass.go @@ -149,10 +149,10 @@ func wrapStorageClassFunc(f func(*storagev1.StorageClass) *metric.Family) func(i func createStorageClassListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { - return kubeClient.StorageV1().StorageClasses().List(context.TODO(), opts) + return kubeClient.StorageV1().StorageClasses().List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { - return kubeClient.StorageV1().StorageClasses().Watch(context.TODO(), opts) + return kubeClient.StorageV1().StorageClasses().Watch(ctx, opts) }, } } diff --git a/internal/store/validatingwebhookconfiguration.go b/internal/store/validatingwebhookconfiguration.go index 99c5df6826..9ebc814cc2 100644 --- a/internal/store/validatingwebhookconfiguration.go +++ b/internal/store/validatingwebhookconfiguration.go @@ -114,10 +114,10 @@ var ( func createValidatingWebhookConfigurationListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { - return kubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations().List(context.TODO(), opts) + return kubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations().List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { - return kubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations().Watch(context.TODO(), opts) + return kubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations().Watch(ctx, opts) }, } } diff --git a/internal/store/volumeattachment.go b/internal/store/volumeattachment.go index d6aebe3d50..0f3783633d 100644 --- a/internal/store/volumeattachment.go +++ b/internal/store/volumeattachment.go @@ -170,10 +170,10 @@ func wrapVolumeAttachmentFunc(f func(*storagev1.VolumeAttachment) *metric.Family func createVolumeAttachmentListWatch(kubeClient clientset.Interface, _ string, _ string) cache.ListerWatcherWithContext { return &cache.ListWatch{ ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { - return kubeClient.StorageV1().VolumeAttachments().List(context.TODO(), opts) + return kubeClient.StorageV1().VolumeAttachments().List(ctx, opts) }, WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { - return kubeClient.StorageV1().VolumeAttachments().Watch(context.TODO(), opts) + return kubeClient.StorageV1().VolumeAttachments().Watch(ctx, opts) }, } } diff --git a/pkg/app/server_test.go b/pkg/app/server_test.go index afc7d82e03..eb4fee3bfb 100644 --- a/pkg/app/server_test.go +++ b/pkg/app/server_test.go @@ -627,9 +627,6 @@ func TestCustomResourceExtension(t *testing.T) { customResourceClients[f.Name()] = customResourceClient } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - reg := prometheus.NewRegistry() builder := store.NewBuilder() builder.WithCustomResourceStoreFactories(factories...) @@ -659,7 +656,7 @@ func TestCustomResourceExtension(t *testing.T) { }) handler := metricshandler.New(&options.Options{}, kubeClient, builder, false) - handler.ConfigureSharding(ctx, 0, 1) + handler.ConfigureSharding(builder.ctx, 0, 1) // Wait for caches to fill time.Sleep(time.Second) @@ -968,14 +965,14 @@ func (f *fooFactory) ExpectedType() interface{} { return &samplev1alpha1.Foo{} } -func (f *fooFactory) ListWatch(customResourceClient interface{}, ns string, fieldSelector string) cache.ListerWatcher { +func (f *fooFactory) ListWatch(customResourceClient interface{}, ns string, fieldSelector string) cache.ListerWatcherWithContext { client := customResourceClient.(*samplefake.Clientset) return &cache.ListWatch{ - ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { opts.FieldSelector = fieldSelector return client.SamplecontrollerV1alpha1().Foos(ns).List(context.Background(), opts) }, - WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { opts.FieldSelector = fieldSelector return client.SamplecontrollerV1alpha1().Foos(ns).Watch(context.Background(), opts) }, diff --git a/pkg/builder/builder_test.go b/pkg/builder/builder_test.go index 4ecd59a29e..cde6f5493e 100644 --- a/pkg/builder/builder_test.go +++ b/pkg/builder/builder_test.go @@ -65,7 +65,7 @@ func TestBuilderWithCustomStore(t *testing.T) { func customStore(_ []generator.FamilyGenerator, _ interface{}, - _ func(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher, + _ func(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcherWithContext, _ bool, _ int64, ) []cache.Store { diff --git a/pkg/builder/types/interfaces.go b/pkg/builder/types/interfaces.go index c1586e2ef9..a9d31bc913 100644 --- a/pkg/builder/types/interfaces.go +++ b/pkg/builder/types/interfaces.go @@ -64,7 +64,7 @@ type BuildStoresFunc func(metricFamilies []generator.FamilyGenerator, type BuildCustomResourceStoresFunc func(resourceName string, metricFamilies []generator.FamilyGenerator, expectedType interface{}, - listWatchFunc func(customResourceClient interface{}, ns string, fieldSelector string) cache.ListerWatcher, + listWatchFunc func(customResourceClient interface{}, ns string, fieldSelector string) cache.ListerWatcherWithContext, useAPIServerCache bool, limit int64, ) []cache.Store diff --git a/pkg/customresource/registry_factory.go b/pkg/customresource/registry_factory.go index 2edc0fe1eb..5650b62268 100644 --- a/pkg/customresource/registry_factory.go +++ b/pkg/customresource/registry_factory.go @@ -114,5 +114,5 @@ type RegistryFactory interface { // }, // } // } - ListWatch(customResourceClient interface{}, ns string, fieldSelector string) cache.ListerWatcher + ListWatch(customResourceClient interface{}, ns string, fieldSelector string) cache.ListerWatcherWithContext } diff --git a/pkg/customresourcestate/custom_resource_metrics.go b/pkg/customresourcestate/custom_resource_metrics.go index 9ca2700835..ed707489cd 100644 --- a/pkg/customresourcestate/custom_resource_metrics.go +++ b/pkg/customresourcestate/custom_resource_metrics.go @@ -90,15 +90,14 @@ func (s customResourceMetrics) ExpectedType() interface{} { return &u } -func (s customResourceMetrics) ListWatch(customResourceClient interface{}, ns string, fieldSelector string) cache.ListerWatcher { +func (s customResourceMetrics) ListWatch(customResourceClient interface{}, ns string, fieldSelector string) cache.ListerWatcherWithContext { api := customResourceClient.(dynamic.NamespaceableResourceInterface).Namespace(ns) - ctx := context.Background() return &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + ListWithContextFunc: func(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { options.FieldSelector = fieldSelector return api.List(ctx, options) }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + WatchFuncWithContext: func(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) { options.FieldSelector = fieldSelector return api.Watch(ctx, options) }, From a9067ff3e395ed987ef5f3da4cd8429a6f2f2400 Mon Sep 17 00:00:00 2001 From: Omar Nasser Date: Mon, 11 Aug 2025 19:18:47 +0300 Subject: [PATCH 3/3] Convert ListerWatcher to ListerWatcherWithContext --- internal/store/builder.go | 25 +++++++++++-------------- pkg/sharding/listwatch.go | 17 +++++++++-------- pkg/watch/watch.go | 15 ++++++++------- 3 files changed, 28 insertions(+), 29 deletions(-) diff --git a/internal/store/builder.go b/internal/store/builder.go index 7100577997..8c364ce375 100644 --- a/internal/store/builder.go +++ b/internal/store/builder.go @@ -522,7 +522,6 @@ func (b *Builder) buildStores( metricFamilies = generator.FilterFamilyGenerators(b.familyGeneratorFilter, metricFamilies) composedMetricGenFuncs := generator.ComposeMetricGenFuncs(metricFamilies) familyHeaders := generator.ExtractMetricFamilyHeaders(metricFamilies) - var listerWatcher func(kubeClient clientset.Interface, ns string, fieldSelector string) cache.ListerWatcher if b.namespaces.IsAllNamespaces() { store := metricsstore.NewMetricsStore( @@ -532,8 +531,8 @@ func (b *Builder) buildStores( if b.fieldSelectorFilter != "" { klog.InfoS("FieldSelector is used", "fieldSelector", b.fieldSelectorFilter) } - listWatcher := listerWatcher(b.kubeClient, v1.NamespaceAll, b.fieldSelectorFilter) - b.startReflector(expectedType, store, listWatcher, useAPIServerCache, objectLimit) + listWatcherWithContext := listWatchWithContextFunc(b.kubeClient, v1.NamespaceAll, b.fieldSelectorFilter) + b.startReflector(expectedType, store, listWatcherWithContext, useAPIServerCache, objectLimit) return []cache.Store{store} } @@ -546,8 +545,8 @@ func (b *Builder) buildStores( if b.fieldSelectorFilter != "" { klog.InfoS("FieldSelector is used", "fieldSelector", b.fieldSelectorFilter) } - listWatcher := listerWatcher(b.kubeClient, ns, b.fieldSelectorFilter) - b.startReflector(expectedType, store, listWatcher, useAPIServerCache, objectLimit) + listWatcherWithContext := listWatchWithContextFunc(b.kubeClient, ns, b.fieldSelectorFilter) + b.startReflector(expectedType, store, listWatcherWithContext, useAPIServerCache, objectLimit) stores = append(stores, store) } @@ -566,8 +565,6 @@ func (b *Builder) buildCustomResourceStores(resourceName string, familyHeaders := generator.ExtractMetricFamilyHeaders(metricFamilies) - var listerWatcher func(customResourceClient interface{}, ns string, fieldSelector string) cache.ListerWatcher - gvr, err := util.GVRFromType(resourceName, expectedType) if err != nil { klog.ErrorS(err, "Failed to get GVR from type", "resourceName", resourceName, "expectedType", expectedType) @@ -592,8 +589,8 @@ func (b *Builder) buildCustomResourceStores(resourceName string, if b.fieldSelectorFilter != "" { klog.InfoS("FieldSelector is used", "fieldSelector", b.fieldSelectorFilter) } - listWatcher := listerWatcher(customResourceClient, v1.NamespaceAll, b.fieldSelectorFilter) - b.startReflector(expectedType, store, listWatcher, useAPIServerCache, objectLimit) + listWatcherWithContext := listWatchWithContextFunc(customResourceClient, v1.NamespaceAll, b.fieldSelectorFilter) + b.startReflector(expectedType, store, listWatcherWithContext, useAPIServerCache, objectLimit) return []cache.Store{store} } @@ -604,8 +601,8 @@ func (b *Builder) buildCustomResourceStores(resourceName string, composedMetricGenFuncs, ) klog.InfoS("FieldSelector is used", "fieldSelector", b.fieldSelectorFilter) - listWatcher := listerWatcher(customResourceClient, ns, b.fieldSelectorFilter) - b.startReflector(expectedType, store, listWatcher, useAPIServerCache, objectLimit) + listWatcherWithContext := listWatchWithContextFunc(customResourceClient, ns, b.fieldSelectorFilter) + b.startReflector(expectedType, store, listWatcherWithContext, useAPIServerCache, objectLimit) stores = append(stores, store) } @@ -617,12 +614,12 @@ func (b *Builder) buildCustomResourceStores(resourceName string, func (b *Builder) startReflector( expectedType interface{}, store cache.Store, - listWatcher cache.ListerWatcher, + listWatcherWithContext cache.ListerWatcherWithContext, useAPIServerCache bool, objectLimit int64, ) { - instrumentedListWatch := watch.NewInstrumentedListerWatcher(listWatcher, b.listWatchMetrics, reflect.TypeOf(expectedType).String(), useAPIServerCache, objectLimit) - reflector := cache.NewReflectorWithOptions(sharding.NewShardedListWatch(b.shard, b.totalShards, instrumentedListWatch), expectedType, store, cache.ReflectorOptions{ResyncPeriod: 0}) + instrumentedListWatchWithContext := watch.NewInstrumentedListerWatcher(listWatcherWithContext, b.listWatchMetrics, reflect.TypeOf(expectedType).String(), useAPIServerCache, objectLimit) + reflector := cache.NewReflectorWithOptions(sharding.NewShardedListWatch(b.shard, b.totalShards, instrumentedListWatchWithContext), expectedType, store, cache.ReflectorOptions{ResyncPeriod: 0}) if cr, ok := expectedType.(*unstructured.Unstructured); ok { go reflector.Run((*b.GVKToReflectorStopChanMap)[cr.GroupVersionKind().String()]) } else { diff --git a/pkg/sharding/listwatch.go b/pkg/sharding/listwatch.go index 99ca091a8c..05e9a714e7 100644 --- a/pkg/sharding/listwatch.go +++ b/pkg/sharding/listwatch.go @@ -17,6 +17,7 @@ limitations under the License. package sharding import ( + "context" "hash/fnv" jump "github.com/dgryski/go-jump" @@ -29,23 +30,23 @@ import ( type shardedListWatch struct { sharding *sharding - lw cache.ListerWatcher + lwc cache.ListerWatcherWithContext } // NewShardedListWatch returns a new shardedListWatch via the cache.ListerWatcher interface. // In the case of no sharding needed, it returns the provided cache.ListerWatcher -func NewShardedListWatch(shard int32, totalShards int, lw cache.ListerWatcher) cache.ListerWatcher { +func NewShardedListWatch(shard int32, totalShards int, lwc cache.ListerWatcherWithContext) cache.ListerWatcherWithContext { // This is an "optimization" as this configuration means no sharding is to // be performed. if shard == 0 && totalShards == 1 { - return lw + return lwc } - return &shardedListWatch{sharding: &sharding{shard: shard, totalShards: totalShards}, lw: lw} + return &shardedListWatch{sharding: &sharding{shard: shard, totalShards: totalShards}, lwc: lwc} } -func (s *shardedListWatch) List(options metav1.ListOptions) (runtime.Object, error) { - list, err := s.lw.List(options) +func (s *shardedListWatch) ListWithContext(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { + list, err := s.lwc.ListWithContext(ctx, options) if err != nil { return nil, err } @@ -74,8 +75,8 @@ func (s *shardedListWatch) List(options metav1.ListOptions) (runtime.Object, err return res, nil } -func (s *shardedListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) { - w, err := s.lw.Watch(options) +func (s *shardedListWatch) WatchWithContext(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) { + w, err := s.lwc.WatchWithContext(ctx, options) if err != nil { return nil, err } diff --git a/pkg/watch/watch.go b/pkg/watch/watch.go index d65b8a3d74..07f894eed5 100644 --- a/pkg/watch/watch.go +++ b/pkg/watch/watch.go @@ -17,6 +17,7 @@ limitations under the License. package watch import ( + "context" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "k8s.io/apimachinery/pkg/api/meta" @@ -73,7 +74,7 @@ func NewListWatchMetrics(r prometheus.Registerer) *ListWatchMetrics { // InstrumentedListerWatcher provides the kube_state_metrics_watch_total metric // with a cache.ListerWatcher obj and the related resource. type InstrumentedListerWatcher struct { - lw cache.ListerWatcher + lwc cache.ListerWatcherWithContext metrics *ListWatchMetrics resource string useAPIServerCache bool @@ -81,9 +82,9 @@ type InstrumentedListerWatcher struct { } // NewInstrumentedListerWatcher returns a new InstrumentedListerWatcher. -func NewInstrumentedListerWatcher(lw cache.ListerWatcher, metrics *ListWatchMetrics, resource string, useAPIServerCache bool, limit int64) cache.ListerWatcher { +func NewInstrumentedListerWatcher(lwc cache.ListerWatcherWithContext, metrics *ListWatchMetrics, resource string, useAPIServerCache bool, limit int64) cache.ListerWatcherWithContext { return &InstrumentedListerWatcher{ - lw: lw, + lwc: lwc, metrics: metrics, resource: resource, useAPIServerCache: useAPIServerCache, @@ -95,7 +96,7 @@ func NewInstrumentedListerWatcher(lw cache.ListerWatcher, metrics *ListWatchMetr // / counters based on the outcome of the List operation it instruments. // It supports setting object limits, this means if it is set it will only list and process // n objects of the same resource type. -func (i *InstrumentedListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) { +func (i *InstrumentedListerWatcher) ListWithContext(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { if i.useAPIServerCache { options.ResourceVersion = "0" @@ -106,7 +107,7 @@ func (i *InstrumentedListerWatcher) List(options metav1.ListOptions) (runtime.Ob i.metrics.ListObjectsLimit.WithLabelValues(i.resource).Set(float64(i.limit)) } - res, err := i.lw.List(options) + res, err := i.lwc.ListWithContext(ctx, options) if err != nil { i.metrics.ListRequestsTotal.WithLabelValues("error", i.resource).Inc() @@ -134,8 +135,8 @@ func (i *InstrumentedListerWatcher) List(options metav1.ListOptions) (runtime.Ob // Watch is a wrapper func around the cache.ListerWatcher.Watch func. It increases the success/error // counters based on the outcome of the Watch operation it instruments. -func (i *InstrumentedListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) { - res, err := i.lw.Watch(options) +func (i *InstrumentedListerWatcher) WatchWithContext(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) { + res, err := i.lwc.WatchWithContext(ctx, options) if err != nil { i.metrics.WatchRequestsTotal.WithLabelValues("error", i.resource).Inc() return nil, err