Skip to content

Commit 5ea66dd

Browse files
committed
add a new syncer for endpointSlices
1 parent f6616f9 commit 5ea66dd

File tree

2 files changed

+209
-0
lines changed

2 files changed

+209
-0
lines changed
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
package endpoints
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
discoveryv1 `k8s.io/api/discovery/v1`
7+
8+
"github.com/loft-sh/vcluster/pkg/mappings"
9+
"github.com/loft-sh/vcluster/pkg/patcher"
10+
"github.com/loft-sh/vcluster/pkg/pro"
11+
"github.com/loft-sh/vcluster/pkg/specialservices"
12+
"github.com/loft-sh/vcluster/pkg/syncer"
13+
"github.com/loft-sh/vcluster/pkg/syncer/synccontext"
14+
"github.com/loft-sh/vcluster/pkg/syncer/translator"
15+
syncertypes "github.com/loft-sh/vcluster/pkg/syncer/types"
16+
"github.com/loft-sh/vcluster/pkg/util/translate"
17+
corev1 "k8s.io/api/core/v1"
18+
kerrors "k8s.io/apimachinery/pkg/api/errors"
19+
"k8s.io/apimachinery/pkg/types"
20+
"k8s.io/klog/v2"
21+
ctrl "sigs.k8s.io/controller-runtime"
22+
"sigs.k8s.io/controller-runtime/pkg/client"
23+
)
24+
25+
func New(ctx *synccontext.RegisterContext) (syncertypes.Object, error) {
26+
mapper, err := ctx.Mappings.ByGVK(mappings.Endpoints())
27+
if err != nil {
28+
return nil, err
29+
}
30+
31+
return &endpointSliceSyncer{
32+
GenericTranslator: translator.NewGenericTranslator(ctx, "endpointslice", &discoveryv1.EndpointSlice{}, mapper),
33+
34+
excludedAnnotations: []string{
35+
"control-plane.alpha.kubernetes.io/leader",
36+
},
37+
}, nil
38+
}
39+
40+
type endpointSliceSyncer struct {
41+
syncertypes.GenericTranslator
42+
43+
excludedAnnotations []string
44+
}
45+
46+
var _ syncertypes.OptionsProvider = &endpointSliceSyncer{}
47+
48+
func (s *endpointSliceSyncer) Options() *syncertypes.Options {
49+
return &syncertypes.Options{
50+
ObjectCaching: true,
51+
}
52+
}
53+
54+
var _ syncertypes.Syncer = &endpointSliceSyncer{}
55+
56+
func (s *endpointSliceSyncer) Syncer() syncertypes.Sync[client.Object] {
57+
return syncer.ToGenericSyncer(s)
58+
}
59+
60+
func (s *endpointSliceSyncer) SyncToHost(ctx *synccontext.SyncContext, event *synccontext.SyncToHostEvent[*discoveryv1.EndpointSlice]) (ctrl.Result, error) {
61+
if event.HostOld != nil {
62+
return patcher.DeleteVirtualObject(ctx, event.Virtual, event.HostOld, "host object was deleted")
63+
}
64+
65+
pObj := s.translate(ctx, event.Virtual)
66+
err := pro.ApplyPatchesHostObject(ctx, nil, pObj, event.Virtual, ctx.Config.Sync.ToHost.EndpointSlices.Patches, false)
67+
if err != nil {
68+
return ctrl.Result{}, err
69+
}
70+
71+
return patcher.CreateHostObject(ctx, event.Virtual, pObj, s.EventRecorder(), false)
72+
}
73+
74+
func (s *endpointSliceSyncer) Sync(ctx *synccontext.SyncContext, event *synccontext.SyncEvent[*discoveryv1.EndpointSlice]) (_ ctrl.Result, retErr error) {
75+
patch, err := patcher.NewSyncerPatcher(ctx, event.Host, event.Virtual, patcher.TranslatePatches(ctx.Config.Sync.ToHost.EndpointSlices.Patches, false))
76+
if err != nil {
77+
return ctrl.Result{}, fmt.Errorf("new syncer patcher: %w", err)
78+
}
79+
defer func() {
80+
if err := patch.Patch(ctx, event.Host, event.Virtual); err != nil {
81+
retErr = errors.Join(retErr, err)
82+
}
83+
84+
if retErr != nil {
85+
s.EventRecorder().Eventf(event.Virtual, "Warning", "SyncError", "Error syncing: %v", retErr)
86+
}
87+
}()
88+
89+
err = s.translateUpdate(ctx, event.Host, event.Virtual)
90+
if err != nil {
91+
return ctrl.Result{}, err
92+
}
93+
94+
// bi-directional sync of annotations and labels
95+
event.Virtual.Annotations, event.Host.Annotations = translate.AnnotationsBidirectionalUpdate(event, s.excludedAnnotations...)
96+
event.Virtual.Labels, event.Host.Labels = translate.LabelsBidirectionalUpdate(event)
97+
98+
return ctrl.Result{}, nil
99+
}
100+
101+
func (s *endpointSliceSyncer) SyncToVirtual(ctx *synccontext.SyncContext, event *synccontext.SyncToVirtualEvent[*discoveryv1.EndpointSlice]) (_ ctrl.Result, retErr error) {
102+
// virtual object is not here anymore, so we delete
103+
return patcher.DeleteHostObject(ctx, event.Host, event.VirtualOld, "virtual object was deleted")
104+
}
105+
106+
var _ syncertypes.Starter = &endpointSliceSyncer{}
107+
108+
func (s *endpointSliceSyncer) ReconcileStart(ctx *synccontext.SyncContext, req ctrl.Request) (bool, error) {
109+
if req.NamespacedName == specialservices.DefaultKubernetesSvcKey {
110+
return true, nil
111+
}
112+
if specialservices.Default != nil {
113+
if _, ok := specialservices.Default.SpecialServicesToSync()[req.NamespacedName]; ok {
114+
return true, nil
115+
}
116+
}
117+
118+
svc := &corev1.Service{}
119+
err := ctx.VirtualClient.Get(ctx, types.NamespacedName{
120+
Namespace: req.Namespace,
121+
Name: req.Name,
122+
}, svc)
123+
if err != nil {
124+
if kerrors.IsNotFound(err) {
125+
return true, nil
126+
}
127+
128+
return true, err
129+
} else if svc.Spec.Selector != nil {
130+
// check if it was a managed endpointSlice object before and delete it
131+
endpointSlice := &discoveryv1.EndpointSlice{}
132+
err = ctx.PhysicalClient.Get(ctx, s.VirtualToHost(ctx, req.NamespacedName, nil), endpointSlice)
133+
if err != nil {
134+
if !kerrors.IsNotFound(err) {
135+
klog.Infof("Error retrieving endpointSliceList: %v", err)
136+
}
137+
138+
return true, nil
139+
}
140+
141+
// check if endpoints were created by us
142+
if endpointSlice.Annotations != nil && endpointSlice.Annotations[translate.NameAnnotation] != "" {
143+
// Deleting the endpointSlice is necessary here as some clusters would not correctly maintain
144+
// the endpointSlices if they were managed by us previously and now should be managed by Kubernetes.
145+
// In the worst case we would end up in a state where we have multiple endpoint slices pointing
146+
// to the same endpoints resulting in wrong DNS and cluster networking. Hence, deleting the previously
147+
// managed endpointSlices signals the Kubernetes controller to recreate the endpointSlices from the selector.
148+
klog.Infof("Refresh endpointSlice in physical cluster because they shouldn't be managed by vcluster anymore")
149+
err = ctx.PhysicalClient.Delete(ctx, endpointSlice)
150+
if err != nil {
151+
klog.Infof("Error deleting endpoints %s/%s: %v", endpointSlice.Namespace, endpointSlice.Name, err)
152+
return true, err
153+
}
154+
}
155+
156+
return true, nil
157+
}
158+
159+
// check if it was a Kubernetes managed endpointSlice object before and delete it
160+
endpointSlice := &discoveryv1.EndpointSlice{}
161+
err = ctx.PhysicalClient.Get(ctx, s.VirtualToHost(ctx, req.NamespacedName, nil), endpointSlice)
162+
if err == nil && (endpointSlice.Annotations == nil || endpointSlice.Annotations[translate.NameAnnotation] == "") {
163+
klog.Infof("Refresh endpointSlice in physical cluster because they should be managed by vCluster now")
164+
err = ctx.PhysicalClient.Delete(ctx, endpointSlice)
165+
if err != nil {
166+
klog.Infof("Error deleting endpointSlice %s/%s: %v", endpointSlice.Namespace, endpointSlice.Name, err)
167+
return true, err
168+
}
169+
}
170+
171+
return false, nil
172+
}
173+
174+
func (s *endpointSliceSyncer) ReconcileEnd() {}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package endpoints
2+
3+
import (
4+
"github.com/loft-sh/vcluster/pkg/mappings"
5+
"github.com/loft-sh/vcluster/pkg/syncer/synccontext"
6+
"github.com/loft-sh/vcluster/pkg/util/translate"
7+
discoveryv1 "k8s.io/api/discovery/v1"
8+
"k8s.io/apimachinery/pkg/types"
9+
"sigs.k8s.io/controller-runtime/pkg/client"
10+
)
11+
12+
func (s *endpointSliceSyncer) translate(ctx *synccontext.SyncContext, vObj client.Object) *discoveryv1.EndpointSlice {
13+
endpointSlice := translate.HostMetadata(vObj.(*discoveryv1.EndpointSlice), s.VirtualToHost(ctx, types.NamespacedName{Name: vObj.GetName(), Namespace: vObj.GetNamespace()}, vObj), s.excludedAnnotations...)
14+
s.translateSpec(ctx, endpointSlice)
15+
return endpointSlice
16+
}
17+
18+
func (s *endpointSliceSyncer) translateSpec(ctx *synccontext.SyncContext, endpointSlice *discoveryv1.EndpointSlice) {
19+
// translate the endpoints
20+
for i, ep := range endpointSlice.Endpoints {
21+
if ep.TargetRef != nil && ep.TargetRef.Kind == "Pod" {
22+
nameAndNamespace := mappings.VirtualToHost(ctx, ep.TargetRef.Name, ep.TargetRef.Namespace, mappings.Pods())
23+
endpointSlice.Endpoints[i].TargetRef.Name = nameAndNamespace.Name
24+
endpointSlice.Endpoints[i].TargetRef.Namespace = nameAndNamespace.Namespace
25+
}
26+
}
27+
}
28+
29+
func (s *endpointSliceSyncer) translateUpdate(ctx *synccontext.SyncContext, pObj, vObj *discoveryv1.EndpointSlice) error {
30+
// check endpointSlice.Endpoints
31+
translated := vObj.DeepCopy()
32+
s.translateSpec(ctx, translated)
33+
pObj.Endpoints = translated.Endpoints
34+
return nil
35+
}

0 commit comments

Comments
 (0)