Skip to content

Commit 6a22b8a

Browse files
czeslavorainest
andauthored
[Backport release/2.5.x] fix: make status queue not block when no sub exists (#4267) (#4280)
Co-authored-by: Travis Raines <571832+rainest@users.noreply.github.com>
1 parent aaf41c5 commit 6a22b8a

File tree

8 files changed

+189
-14
lines changed

8 files changed

+189
-14
lines changed

CHANGELOG.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,20 @@
5050
- [0.0.5](#005)
5151
- [0.0.4 and prior](#004-and-prior)
5252

53+
## [2.5.1]
54+
55+
> Release date: TBD
56+
57+
#### Fixed
58+
59+
- Fixed a bug where the controller sync loop would get stuck when a number of
60+
updates for one of Gateway API resources kinds (`HTTPRoute`, `TCPRoute`,
61+
`UDPRoute`, `TLSRoute`) exceeded 8192. This was caused by the fact that the
62+
controller was using a fixed-size buffer to store updates for each resource
63+
kind and there were no consumers for the updates. The sending was blocked
64+
after a buffer got full, resulting in a deadlock.
65+
[#4267](https://github.com/Kong/kubernetes-ingress-controller/pull/4267)
66+
5367
## [2.5.0]
5468

5569
> Release date: 2022-07-11

internal/controllers/gateway/httproute_controller.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"k8s.io/apimachinery/pkg/api/errors"
1010
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1111
"k8s.io/apimachinery/pkg/runtime"
12+
"k8s.io/apimachinery/pkg/runtime/schema"
1213
"k8s.io/apimachinery/pkg/types"
1314
ctrl "sigs.k8s.io/controller-runtime"
1415
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -21,6 +22,7 @@ import (
2122
gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
2223

2324
"github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane"
25+
"github.com/kong/kubernetes-ingress-controller/v2/internal/util/kubernetes/object/status"
2426
)
2527

2628
// -----------------------------------------------------------------------------
@@ -34,6 +36,7 @@ type HTTPRouteReconciler struct {
3436
Log logr.Logger
3537
Scheme *runtime.Scheme
3638
DataplaneClient *dataplane.KongClient
39+
StatusQueue *status.Queue
3740
}
3841

3942
// SetupWithManager sets up the controller with the Manager.
@@ -76,6 +79,19 @@ func (r *HTTPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {
7679
return err
7780
}
7881

82+
if r.StatusQueue != nil {
83+
if err := c.Watch(
84+
&source.Channel{Source: r.StatusQueue.Subscribe(schema.GroupVersionKind{
85+
Group: gatewayv1alpha2.GroupVersion.Group,
86+
Version: gatewayv1alpha2.GroupVersion.Version,
87+
Kind: "TLSRoute",
88+
})},
89+
&handler.EnqueueRequestForObject{},
90+
); err != nil {
91+
return err
92+
}
93+
}
94+
7995
// because of the additional burden of having to manage reference data-plane
8096
// configurations for HTTPRoute objects in the underlying Kong Gateway, we
8197
// simply reconcile ALL HTTPRoute objects. This allows us to drop the backend
@@ -223,8 +239,8 @@ func (r *HTTPRouteReconciler) listHTTPRoutesForGateway(obj client.Object) []reco
223239
// HTTPRoute Controller - Reconciliation
224240
// -----------------------------------------------------------------------------
225241

226-
//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=httproutes,verbs=get;list;watch
227-
//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=httproutes/status,verbs=get;update
242+
// +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=httproutes,verbs=get;list;watch
243+
// +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=httproutes/status,verbs=get;update
228244

229245
// Reconcile is part of the main kubernetes reconciliation loop which aims to
230246
// move the current state of the cluster closer to the desired state.

internal/controllers/gateway/tcproute_controller.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"k8s.io/apimachinery/pkg/api/errors"
1010
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1111
"k8s.io/apimachinery/pkg/runtime"
12+
"k8s.io/apimachinery/pkg/runtime/schema"
1213
"k8s.io/apimachinery/pkg/types"
1314
ctrl "sigs.k8s.io/controller-runtime"
1415
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -21,6 +22,7 @@ import (
2122
gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
2223

2324
"github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane"
25+
"github.com/kong/kubernetes-ingress-controller/v2/internal/util/kubernetes/object/status"
2426
)
2527

2628
// -----------------------------------------------------------------------------
@@ -34,6 +36,7 @@ type TCPRouteReconciler struct {
3436
Log logr.Logger
3537
Scheme *runtime.Scheme
3638
DataplaneClient *dataplane.KongClient
39+
StatusQueue *status.Queue
3740
}
3841

3942
// SetupWithManager sets up the controller with the Manager.
@@ -76,6 +79,19 @@ func (r *TCPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {
7679
return err
7780
}
7881

82+
if r.StatusQueue != nil {
83+
if err := c.Watch(
84+
&source.Channel{Source: r.StatusQueue.Subscribe(schema.GroupVersionKind{
85+
Group: gatewayv1alpha2.GroupVersion.Group,
86+
Version: gatewayv1alpha2.GroupVersion.Version,
87+
Kind: "TCPRoute",
88+
})},
89+
&handler.EnqueueRequestForObject{},
90+
); err != nil {
91+
return err
92+
}
93+
}
94+
7995
// because of the additional burden of having to manage reference data-plane
8096
// configurations for TCPRoute objects in the underlying Kong Gateway, we
8197
// simply reconcile ALL TCPRoute objects. This allows us to drop the backend
@@ -223,8 +239,8 @@ func (r *TCPRouteReconciler) listTCPRoutesForGateway(obj client.Object) []reconc
223239
// TCPRoute Controller - Reconciliation
224240
// -----------------------------------------------------------------------------
225241

226-
//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=tcproutes,verbs=get;list;watch
227-
//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=tcproutes/status,verbs=get;update
242+
// +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=tcproutes,verbs=get;list;watch
243+
// +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=tcproutes/status,verbs=get;update
228244

229245
// Reconcile is part of the main kubernetes reconciliation loop which aims to
230246
// move the current state of the cluster closer to the desired state.

internal/controllers/gateway/tlsroute_controller.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"k8s.io/apimachinery/pkg/api/errors"
1010
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1111
"k8s.io/apimachinery/pkg/runtime"
12+
"k8s.io/apimachinery/pkg/runtime/schema"
1213
"k8s.io/apimachinery/pkg/types"
1314
ctrl "sigs.k8s.io/controller-runtime"
1415
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -21,6 +22,7 @@ import (
2122
gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
2223

2324
"github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane"
25+
"github.com/kong/kubernetes-ingress-controller/v2/internal/util/kubernetes/object/status"
2426
)
2527

2628
// -----------------------------------------------------------------------------
@@ -34,6 +36,7 @@ type TLSRouteReconciler struct {
3436
Log logr.Logger
3537
Scheme *runtime.Scheme
3638
DataplaneClient *dataplane.KongClient
39+
StatusQueue *status.Queue
3740
}
3841

3942
// SetupWithManager sets up the controller with the Manager.
@@ -76,6 +79,19 @@ func (r *TLSRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {
7679
return err
7780
}
7881

82+
if r.StatusQueue != nil {
83+
if err := c.Watch(
84+
&source.Channel{Source: r.StatusQueue.Subscribe(schema.GroupVersionKind{
85+
Group: gatewayv1alpha2.GroupVersion.Group,
86+
Version: gatewayv1alpha2.GroupVersion.Version,
87+
Kind: "TLSRoute",
88+
})},
89+
&handler.EnqueueRequestForObject{},
90+
); err != nil {
91+
return err
92+
}
93+
}
94+
7995
// because of the additional burden of having to manage reference data-plane
8096
// configurations for TLSRoute objects in the underlying Kong Gateway, we
8197
// simply reconcile ALL TLSRoute objects. This allows us to drop the backend
@@ -223,8 +239,8 @@ func (r *TLSRouteReconciler) listTLSRoutesForGateway(obj client.Object) []reconc
223239
// TLSRoute Controller - Reconciliation
224240
// -----------------------------------------------------------------------------
225241

226-
//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=tlsroutes,verbs=get;list;watch
227-
//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=tlsroutes/status,verbs=get;update
242+
// +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=tlsroutes,verbs=get;list;watch
243+
// +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=tlsroutes/status,verbs=get;update
228244

229245
// Reconcile is part of the main kubernetes reconciliation loop which aims to
230246
// move the current state of the cluster closer to the desired state.

internal/controllers/gateway/udproute_controller.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"k8s.io/apimachinery/pkg/api/errors"
1010
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1111
"k8s.io/apimachinery/pkg/runtime"
12+
"k8s.io/apimachinery/pkg/runtime/schema"
1213
"k8s.io/apimachinery/pkg/types"
1314
ctrl "sigs.k8s.io/controller-runtime"
1415
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -21,6 +22,7 @@ import (
2122
gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
2223

2324
"github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane"
25+
"github.com/kong/kubernetes-ingress-controller/v2/internal/util/kubernetes/object/status"
2426
)
2527

2628
// -----------------------------------------------------------------------------
@@ -34,6 +36,7 @@ type UDPRouteReconciler struct {
3436
Log logr.Logger
3537
Scheme *runtime.Scheme
3638
DataplaneClient *dataplane.KongClient
39+
StatusQueue *status.Queue
3740
}
3841

3942
// SetupWithManager sets up the controller with the Manager.
@@ -76,6 +79,19 @@ func (r *UDPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error {
7679
return err
7780
}
7881

82+
if r.StatusQueue != nil {
83+
if err := c.Watch(
84+
&source.Channel{Source: r.StatusQueue.Subscribe(schema.GroupVersionKind{
85+
Group: gatewayv1alpha2.GroupVersion.Group,
86+
Version: gatewayv1alpha2.GroupVersion.Version,
87+
Kind: "UDPRoute",
88+
})},
89+
&handler.EnqueueRequestForObject{},
90+
); err != nil {
91+
return err
92+
}
93+
}
94+
7995
// because of the additional burden of having to manage reference data-plane
8096
// configurations for UDPRoute objects in the underlying Kong Gateway, we
8197
// simply reconcile ALL UDPRoute objects. This allows us to drop the backend
@@ -223,8 +239,8 @@ func (r *UDPRouteReconciler) listUDPRoutesForGateway(obj client.Object) []reconc
223239
// UDPRoute Controller - Reconciliation
224240
// -----------------------------------------------------------------------------
225241

226-
//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=udproutes,verbs=get;list;watch
227-
//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=udproutes/status,verbs=get;update
242+
// +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=udproutes,verbs=get;list;watch
243+
// +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=udproutes/status,verbs=get;update
228244

229245
// Reconcile is part of the main kubernetes reconciliation loop which aims to
230246
// move the current state of the cluster closer to the desired state.

internal/manager/controllerdef.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,7 @@ func setupControllers(
312312
Log: ctrl.Log.WithName("controllers").WithName("HTTPRoute"),
313313
Scheme: mgr.GetScheme(),
314314
DataplaneClient: dataplaneClient,
315+
StatusQueue: kubernetesStatusQueue,
315316
},
316317
},
317318
{
@@ -328,6 +329,7 @@ func setupControllers(
328329
Log: ctrl.Log.WithName("controllers").WithName("UDPRoute"),
329330
Scheme: mgr.GetScheme(),
330331
DataplaneClient: dataplaneClient,
332+
StatusQueue: kubernetesStatusQueue,
331333
},
332334
},
333335
{
@@ -344,6 +346,7 @@ func setupControllers(
344346
Log: ctrl.Log.WithName("controllers").WithName("TCPRoute"),
345347
Scheme: mgr.GetScheme(),
346348
DataplaneClient: dataplaneClient,
349+
StatusQueue: kubernetesStatusQueue,
347350
},
348351
},
349352
{
@@ -360,6 +363,7 @@ func setupControllers(
360363
Log: ctrl.Log.WithName("controllers").WithName("TLSRoute"),
361364
Scheme: mgr.GetScheme(),
362365
DataplaneClient: dataplaneClient,
366+
StatusQueue: kubernetesStatusQueue,
363367
},
364368
},
365369
}

internal/util/kubernetes/object/status/queue.go

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,11 @@ func NewQueue() *Queue {
4646
// Publish emits a GenericEvent for the provided objects that indicates to
4747
// subscribers that the status of that object needs to be updated.
4848
func (q *Queue) Publish(obj client.Object) {
49-
ch := q.getChanForKind(obj.GetObjectKind().GroupVersionKind())
49+
ch, ok := q.getChanForKind(obj.GetObjectKind().GroupVersionKind())
50+
if !ok {
51+
// There's no subscriber for this object kind - nothing to do.
52+
return
53+
}
5054
ch <- event.GenericEvent{Object: obj}
5155
}
5256

@@ -59,20 +63,32 @@ func (q *Queue) Publish(obj client.Object) {
5963
// be duplicated and each subscriber will receive events on a first come first
6064
// serve basis.
6165
func (q *Queue) Subscribe(gvk schema.GroupVersionKind) chan event.GenericEvent {
62-
return q.getChanForKind(gvk)
66+
return q.getOrCreateChanForKind(gvk)
6367
}
6468

6569
// ----------------------------------------------------------------------------
6670
// Queue - Private Methods
6771
// ----------------------------------------------------------------------------
6872

69-
func (q *Queue) getChanForKind(gvk schema.GroupVersionKind) chan event.GenericEvent {
73+
// getOrCreateChanForKind returns the subscription channel for the provided object GVK.
74+
// If the channel does not exist, it will be created.
75+
func (q *Queue) getOrCreateChanForKind(gvk schema.GroupVersionKind) chan event.GenericEvent {
7076
q.lock.Lock()
7177
defer q.lock.Unlock()
7278
ch, ok := q.channels[gvk.String()]
73-
if !ok { // if there's no channel built for this kind yet, make it
79+
if !ok {
80+
// If there's no channel built for this kind yet, make it.
7481
ch = make(chan event.GenericEvent, defaultBufferSize)
7582
q.channels[gvk.String()] = ch
7683
}
7784
return ch
7885
}
86+
87+
// getChanForKind returns the subscription channel for the provided object GVK.
88+
// The second return value indicates whether the channel exists.
89+
func (q *Queue) getChanForKind(gvk schema.GroupVersionKind) (chan event.GenericEvent, bool) {
90+
q.lock.RLock()
91+
defer q.lock.RUnlock()
92+
ch, ok := q.channels[gvk.String()]
93+
return ch, ok
94+
}

0 commit comments

Comments
 (0)