diff --git a/controllers/clustercache/cluster_accessor.go b/controllers/clustercache/cluster_accessor.go index 0bbc92f2380d..01ca1831f2ea 100644 --- a/controllers/clustercache/cluster_accessor.go +++ b/controllers/clustercache/cluster_accessor.go @@ -191,6 +191,10 @@ type clusterAccessorLockedConnectionState struct { // all typed objects except the ones for which caching has been disabled via DisableFor. cachedClient client.Client + // uncachedClient to communicate with the workload cluster. + // It performs live GET/LIST calls directly against the API server with no caching. + uncachedClient client.Client + // cache is the cache used by the client. // It manages informers that have been created e.g. by adding indexes to the cache, // Get & List calls from the client or via the Watch method of the clusterAccessor. @@ -297,11 +301,12 @@ func (ca *clusterAccessor) Connect(ctx context.Context) (retErr error) { consecutiveFailures: 0, } ca.lockedState.connection = &clusterAccessorLockedConnectionState{ - restConfig: connection.RESTConfig, - restClient: connection.RESTClient, - cachedClient: connection.CachedClient, - cache: connection.Cache, - watches: sets.Set[string]{}, + restConfig: connection.RESTConfig, + restClient: connection.RESTClient, + cachedClient: connection.CachedClient, + uncachedClient: connection.UncachedClient, + cache: connection.Cache, + watches: sets.Set[string]{}, } return nil @@ -407,6 +412,18 @@ func (ca *clusterAccessor) GetReader(ctx context.Context) (client.Reader, error) return ca.lockedState.connection.cachedClient, nil } +// GetUncachedClient returns a live (uncached) client for the given cluster. +func (ca *clusterAccessor) GetUncachedClient(ctx context.Context) (client.Client, error) { + ca.rLock(ctx) + defer ca.rUnlock(ctx) + + if ca.lockedState.connection == nil { + return nil, errors.Wrapf(ErrClusterNotConnected, "error getting uncached client") + } + + return ca.lockedState.connection.uncachedClient, nil +} + func (ca *clusterAccessor) GetRESTConfig(ctx context.Context) (*rest.Config, error) { ca.rLock(ctx) defer ca.rUnlock(ctx) diff --git a/controllers/clustercache/cluster_accessor_client.go b/controllers/clustercache/cluster_accessor_client.go index 7e5ae2484626..8cc9a0a6340c 100644 --- a/controllers/clustercache/cluster_accessor_client.go +++ b/controllers/clustercache/cluster_accessor_client.go @@ -42,10 +42,11 @@ import ( ) type createConnectionResult struct { - RESTConfig *rest.Config - RESTClient *rest.RESTClient - CachedClient client.Client - Cache *stoppableCache + RESTConfig *rest.Config + RESTClient *rest.RESTClient + CachedClient client.Client + UncachedClient client.Client + Cache *stoppableCache } func (ca *clusterAccessor) createConnection(ctx context.Context) (*createConnectionResult, error) { @@ -97,6 +98,12 @@ func (ca *clusterAccessor) createConnection(ctx context.Context) (*createConnect if err != nil { return nil, errors.Wrapf(err, "error creating HTTP client and mapper (using in-cluster config)") } + + log.V(6).Info(fmt.Sprintf("Creating uncached client with updated REST config with host %q", restConfig.Host)) + uncachedClient, err = createUncachedClient(ca.config.Scheme, restConfig, httpClient, mapper) + if err != nil { + return nil, errors.Wrapf(err, "error creating uncached client (using in-cluster config)") + } } log.V(6).Info("Creating cached client and cache") @@ -106,10 +113,11 @@ func (ca *clusterAccessor) createConnection(ctx context.Context) (*createConnect } return &createConnectionResult{ - RESTConfig: restConfig, - RESTClient: restClient, - CachedClient: cachedClient, - Cache: cache, + RESTConfig: restConfig, + RESTClient: restClient, + CachedClient: cachedClient, + UncachedClient: uncachedClient, + Cache: cache, }, nil } @@ -208,7 +216,7 @@ func createUncachedClient(scheme *runtime.Scheme, config *rest.Config, httpClien return nil, errors.Wrapf(err, "error creating uncached client") } - return uncachedClient, nil + return newClientWithTimeout(uncachedClient, config.Timeout), nil } // createCachedClient creates a cached client for the given cluster, based on the rest.Config. diff --git a/controllers/clustercache/cluster_accessor_test.go b/controllers/clustercache/cluster_accessor_test.go index bfe5acdf1d88..c3fb117b69f1 100644 --- a/controllers/clustercache/cluster_accessor_test.go +++ b/controllers/clustercache/cluster_accessor_test.go @@ -76,8 +76,13 @@ func TestConnect(t *testing.T) { }, nil) accessor := newClusterAccessor(context.Background(), clusterKey, config) + // Before connect, getting the uncached client should fail with ErrClusterNotConnected + _, err := accessor.GetUncachedClient(ctx) + g.Expect(err).To(HaveOccurred()) + g.Expect(errors.Is(err, ErrClusterNotConnected)).To(BeTrue()) + // Connect when kubeconfig Secret doesn't exist (should fail) - err := accessor.Connect(ctx) + err = accessor.Connect(ctx) g.Expect(err).To(HaveOccurred()) g.Expect(err.Error()).To(Equal("error creating REST config: error getting kubeconfig secret: Secret \"test-cluster-kubeconfig\" not found")) g.Expect(accessor.Connected(ctx)).To(BeFalse()) @@ -136,6 +141,16 @@ func TestConnect(t *testing.T) { g.Expect(accessor.lockedState.healthChecking.lastProbeSuccessTime.IsZero()).To(BeFalse()) g.Expect(accessor.lockedState.healthChecking.consecutiveFailures).To(Equal(0)) + // After connect, getting the uncached client should succeed + r, err := accessor.GetUncachedClient(ctx) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(r).ToNot(BeNil()) + + // List Nodes via the uncached client + nodeListUncached := &corev1.NodeList{} + g.Expect(r.List(ctx, nodeListUncached)).To(Succeed()) + g.Expect(nodeListUncached.Items).To(BeEmpty()) + // Get client and test Get & List c, err := accessor.GetClient(ctx) g.Expect(err).ToNot(HaveOccurred()) @@ -150,6 +165,11 @@ func TestConnect(t *testing.T) { // Disconnect accessor.Disconnect(ctx) g.Expect(accessor.Connected(ctx)).To(BeFalse()) + + // After disconnect, getting the uncached client should fail with ErrClusterNotConnected + _, err = accessor.GetUncachedClient(ctx) + g.Expect(err).To(HaveOccurred()) + g.Expect(errors.Is(err, ErrClusterNotConnected)).To(BeTrue()) } func TestDisconnect(t *testing.T) { diff --git a/controllers/clustercache/cluster_cache.go b/controllers/clustercache/cluster_cache.go index 06d64f39863f..3d35562adac1 100644 --- a/controllers/clustercache/cluster_cache.go +++ b/controllers/clustercache/cluster_cache.go @@ -134,6 +134,10 @@ type ClusterCache interface { // If there is no connection to the workload cluster ErrClusterNotConnected will be returned. GetReader(ctx context.Context, cluster client.ObjectKey) (client.Reader, error) + // GetUncachedClient returns a live (uncached) client for the given cluster. + // If there is no connection to the workload cluster ErrClusterNotConnected will be returned. + GetUncachedClient(ctx context.Context, cluster client.ObjectKey) (client.Client, error) + // GetRESTConfig returns a REST config for the given cluster. // If there is no connection to the workload cluster ErrClusterNotConnected will be returned. GetRESTConfig(ctx context.Context, cluster client.ObjectKey) (*rest.Config, error) @@ -392,6 +396,16 @@ func (cc *clusterCache) GetReader(ctx context.Context, cluster client.ObjectKey) return accessor.GetReader(ctx) } +// GetUncachedClient returns a live (uncached) client for the given cluster. +// If there is no connection to the workload cluster ErrClusterNotConnected will be returned. +func (cc *clusterCache) GetUncachedClient(ctx context.Context, cluster client.ObjectKey) (client.Client, error) { + accessor := cc.getClusterAccessor(cluster) + if accessor == nil { + return nil, errors.Wrapf(ErrClusterNotConnected, "error getting uncached client") + } + return accessor.GetUncachedClient(ctx) +} + func (cc *clusterCache) GetRESTConfig(ctx context.Context, cluster client.ObjectKey) (*rest.Config, error) { accessor := cc.getClusterAccessor(cluster) if accessor == nil { diff --git a/controllers/clustercache/cluster_cache_fake.go b/controllers/clustercache/cluster_cache_fake.go index c8ab7f283797..f4f44b3da097 100644 --- a/controllers/clustercache/cluster_cache_fake.go +++ b/controllers/clustercache/cluster_cache_fake.go @@ -32,8 +32,9 @@ func NewFakeClusterCache(workloadClient client.Client, clusterKey client.ObjectK testCacheTracker.clusterAccessors[clusterKey] = &clusterAccessor{ lockedState: clusterAccessorLockedState{ connection: &clusterAccessorLockedConnectionState{ - cachedClient: workloadClient, - watches: sets.Set[string]{}.Insert(watchObjects...), + cachedClient: workloadClient, + uncachedClient: workloadClient, + watches: sets.Set[string]{}.Insert(watchObjects...), }, healthChecking: clusterAccessorLockedHealthCheckingState{ lastProbeTime: time.Now(),