Skip to content

Commit c0c4b26

Browse files
jparsaiJayendra Parsai
andauthored
feat: Populate cluster cache info in principal (#513)
Signed-off-by: Jayendra Parsai <jparsai@redhat.com> Co-authored-by: Jayendra Parsai <jparsai@jparsai-thinkpadp1gen4i.remote.csb>
1 parent edfedbd commit c0c4b26

27 files changed

+1019
-205
lines changed

agent/agent.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"sync/atomic"
2323
"time"
2424

25+
"github.com/argoproj-labs/argocd-agent/internal/argocd/cluster"
2526
"github.com/argoproj-labs/argocd-agent/internal/backend"
2627
kubeapp "github.com/argoproj-labs/argocd-agent/internal/backend/kubernetes/application"
2728
kubeappproject "github.com/argoproj-labs/argocd-agent/internal/backend/kubernetes/appproject"
@@ -47,6 +48,8 @@ import (
4748

4849
appCache "github.com/argoproj-labs/argocd-agent/internal/cache"
4950
"github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
51+
cacheutil "github.com/argoproj/argo-cd/v3/util/cache"
52+
appstatecache "github.com/argoproj/argo-cd/v3/util/cache/appstate"
5053
ty "k8s.io/apimachinery/pkg/types"
5154
)
5255

@@ -94,6 +97,9 @@ type Agent struct {
9497

9598
// enableResourceProxy determines if the agent should proxy resources to the principal
9699
enableResourceProxy bool
100+
101+
cacheRefreshInterval time.Duration
102+
clusterCache *appstatecache.Cache
97103
}
98104

99105
const defaultQueueName = "default"
@@ -273,6 +279,13 @@ func NewAgent(ctx context.Context, client *kube.KubernetesClient, namespace stri
273279
connMap: map[string]connectionEntry{},
274280
}
275281

282+
clusterCache, err := cluster.NewClusterCacheInstance(ctx, client.Clientset,
283+
a.namespace, a.redisProxyMsgHandler.redisAddress, cacheutil.RedisCompressionGZip)
284+
if err != nil {
285+
return nil, fmt.Errorf("failed to create cluster cache instance: %v", err)
286+
}
287+
a.clusterCache = clusterCache
288+
276289
return a, nil
277290
}
278291

@@ -353,6 +366,23 @@ func (a *Agent) Start(ctx context.Context) error {
353366
go http.ListenAndServe(healthzAddr, nil)
354367
}
355368

369+
// Start the background process of periodic sync of cluster cache info.
370+
// This will send periodic updates of Application, Resource and API counts to principal.
371+
if a.mode == types.AgentModeManaged {
372+
go func() {
373+
ticker := time.NewTicker(a.cacheRefreshInterval)
374+
defer ticker.Stop()
375+
for {
376+
select {
377+
case <-ticker.C:
378+
a.addClusterCacheInfoUpdateToQueue()
379+
case <-a.context.Done():
380+
return
381+
}
382+
}
383+
}()
384+
}
385+
356386
if a.remote != nil {
357387
a.remote.SetClientMode(a.mode)
358388
// TODO: Right now, maintainConnection always returns nil. Revisit

agent/agent_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import (
2929

3030
func newAgent(t *testing.T) *Agent {
3131
t.Helper()
32-
kubec := kube.NewKubernetesFakeClientWithApps()
32+
kubec := kube.NewKubernetesFakeClientWithApps("argocd")
3333
remote, err := client.NewRemote("127.0.0.1", 8080)
3434
require.NoError(t, err)
3535
agent, err := NewAgent(context.TODO(), kubec, "argocd", WithRemote(remote))
@@ -38,7 +38,7 @@ func newAgent(t *testing.T) *Agent {
3838
}
3939

4040
func Test_NewAgent(t *testing.T) {
41-
kubec := kube.NewKubernetesFakeClientWithApps()
41+
kubec := kube.NewKubernetesFakeClientWithApps("agent")
4242
agent, err := NewAgent(context.TODO(), kubec, "agent", WithRemote(&client.Remote{}))
4343
require.NotNil(t, agent)
4444
require.NoError(t, err)

agent/inbound.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,7 @@ func (a *Agent) deleteApplication(app *v1alpha1.Application) error {
457457
return err
458458
}
459459

460-
if a.mode == types.AgentModeManaged && err == nil {
460+
if a.mode == types.AgentModeManaged {
461461
appCache.DeleteApplicationSpec(app.UID, logCtx)
462462
}
463463

agent/inbound_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func Test_ProcessIncomingAppWithUIDMismatch(t *testing.T) {
103103
Name: "owner",
104104
UID: "uid-1",
105105
},
106-
},
106+
},
107107
}}
108108

109109
// oldApp is the app that is already present on the agent

agent/options.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package agent
1616

1717
import (
1818
"fmt"
19+
"time"
1920

2021
"github.com/argoproj-labs/argocd-agent/pkg/client"
2122
"github.com/argoproj-labs/argocd-agent/pkg/types"
@@ -99,3 +100,10 @@ func WithEnableResourceProxy(enable bool) AgentOption {
99100
return nil
100101
}
101102
}
103+
104+
func WithCacheRefreshInterval(interval time.Duration) AgentOption {
105+
return func(o *Agent) error {
106+
o.cacheRefreshInterval = interval
107+
return nil
108+
}
109+
}

agent/outbound.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@
1515
package agent
1616

1717
import (
18+
"errors"
19+
1820
"github.com/argoproj-labs/argocd-agent/internal/event"
1921
"github.com/argoproj-labs/argocd-agent/internal/logging/logfields"
2022
"github.com/argoproj-labs/argocd-agent/internal/resources"
2123
"github.com/argoproj-labs/argocd-agent/pkg/types"
2224
"github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
25+
cacheutil "github.com/argoproj/argo-cd/v3/util/cache"
2326
"github.com/sirupsen/logrus"
2427
corev1 "k8s.io/api/core/v1"
2528
)
@@ -261,3 +264,43 @@ func (a *Agent) addAppProjectDeletionToQueue(appProject *v1alpha1.AppProject) {
261264
q.Add(a.emitter.AppProjectEvent(event.Delete, appProject))
262265
logCtx.WithField(logfields.SendQueueLen, q.Len()).Debugf("Added appProject delete event to send queue")
263266
}
267+
268+
// addClusterCacheInfoUpdateToQueue processes a cluster cache info update event
269+
// and puts it in the send queue.
270+
func (a *Agent) addClusterCacheInfoUpdateToQueue() {
271+
logCtx := log().WithFields(logrus.Fields{
272+
"event": "addClusterCacheInfoUpdateToQueue",
273+
})
274+
275+
clusterServer := "https://kubernetes.default.svc"
276+
clusterInfo := &v1alpha1.ClusterInfo{}
277+
278+
// Get the updated cluster info from agent's cache.
279+
if err := a.clusterCache.GetClusterInfo(clusterServer, clusterInfo); err != nil {
280+
if !errors.Is(err, cacheutil.ErrCacheMiss) {
281+
logCtx.WithError(err).Errorf("Failed to get cluster info from cache")
282+
}
283+
return
284+
}
285+
286+
// Send the event to principal to update the cluster cache info.
287+
q := a.queues.SendQ(defaultQueueName)
288+
if q != nil {
289+
clusterInfoEvent := a.emitter.ClusterCacheInfoUpdateEvent(event.ClusterCacheInfoUpdate, &event.ClusterCacheInfo{
290+
ApplicationsCount: clusterInfo.ApplicationsCount,
291+
APIsCount: clusterInfo.CacheInfo.APIsCount,
292+
ResourcesCount: clusterInfo.CacheInfo.ResourcesCount,
293+
})
294+
295+
q.Add(clusterInfoEvent)
296+
logCtx.WithFields(logrus.Fields{
297+
"sendq_len": q.Len(),
298+
"sendq_name": defaultQueueName,
299+
"applicationsCount": clusterInfo.ApplicationsCount,
300+
"apisCount": clusterInfo.CacheInfo.APIsCount,
301+
"resourcesCount": clusterInfo.CacheInfo.ResourcesCount,
302+
}).Infof("Added ClusterCacheInfoUpdate event to send queue")
303+
} else {
304+
logCtx.Error("Default queue not found, unable to send ClusterCacheInfoUpdate event")
305+
}
306+
}

agent/outbound_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
11
package agent
22

33
import (
4+
"context"
45
"testing"
56

7+
"github.com/alicebob/miniredis/v2"
8+
"github.com/argoproj-labs/argocd-agent/internal/argocd/cluster"
69
"github.com/argoproj-labs/argocd-agent/internal/event"
10+
"github.com/argoproj-labs/argocd-agent/pkg/client"
711
"github.com/argoproj-labs/argocd-agent/pkg/types"
12+
"github.com/argoproj-labs/argocd-agent/test/fake/kube"
813
"github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
14+
cacheutil "github.com/argoproj/argo-cd/v3/util/cache"
915
"github.com/stretchr/testify/assert"
1016
"github.com/stretchr/testify/require"
1117
corev1 "k8s.io/api/core/v1"
@@ -371,3 +377,45 @@ func Test_addAppProjectDeletionToQueue(t *testing.T) {
371377
require.False(t, a.projectManager.IsManaged("test-project"))
372378
})
373379
}
380+
381+
func Test_addClusterCacheInfoUpdateToQueue(t *testing.T) {
382+
miniRedis, err := miniredis.Run()
383+
require.NoError(t, err)
384+
require.NotNil(t, miniRedis)
385+
defer miniRedis.Close()
386+
387+
kubec := kube.NewKubernetesFakeClientWithApps("argocd")
388+
remote, err := client.NewRemote("127.0.0.1", 8080)
389+
require.NoError(t, err)
390+
391+
a, err := NewAgent(context.TODO(), kubec, "argocd", WithRemote(remote), WithRedisHost(miniRedis.Addr()))
392+
require.NoError(t, err)
393+
394+
a.remote.SetClientID("agent")
395+
a.emitter = event.NewEventSource("principal")
396+
397+
// First populate the cache with dummy data
398+
clusterMgr, err := cluster.NewManager(a.context, a.namespace, miniRedis.Addr(), cacheutil.RedisCompressionGZip, a.kubeClient.Clientset)
399+
require.NoError(t, err)
400+
err = clusterMgr.MapCluster("test-agent", &v1alpha1.Cluster{
401+
Name: "test-cluster",
402+
Server: "https://kubernetes.default.svc",
403+
})
404+
require.NoError(t, err)
405+
406+
// Set dummy cluster cache stats
407+
clusterMgr.SetClusterCacheStats(&event.ClusterCacheInfo{
408+
ApplicationsCount: 5,
409+
APIsCount: 10,
410+
ResourcesCount: 100,
411+
}, "test-agent")
412+
413+
// Should not have an event in queue
414+
require.Equal(t, 0, a.queues.SendQ(defaultQueueName).Len())
415+
416+
// Add a cluster cache info update event to the queue
417+
a.addClusterCacheInfoUpdateToQueue()
418+
419+
// Should have an event in queue
420+
require.Equal(t, 1, a.queues.SendQ(defaultQueueName).Len())
421+
}

cmd/argocd-agent/agent.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ func NewAgentRunCommand() *cobra.Command {
6767
// Time interval for agent to principal ping
6868
// Ex: "30m", "1h" or "1h20m10s". Valid time units are "s", "m", "h".
6969
keepAlivePingInterval time.Duration
70+
71+
// Time interval for agent to refresh cluster cache info in principal
72+
cacheRefreshInterval time.Duration
7073
)
7174
command := &cobra.Command{
7275
Use: "agent",
@@ -174,6 +177,7 @@ func NewAgentRunCommand() *cobra.Command {
174177
agentOpts = append(agentOpts, agent.WithRedisPassword(redisPassword))
175178

176179
agentOpts = append(agentOpts, agent.WithEnableResourceProxy(enableResourceProxy))
180+
agentOpts = append(agentOpts, agent.WithCacheRefreshInterval(cacheRefreshInterval))
177181

178182
if metricsPort > 0 {
179183
agentOpts = append(agentOpts, agent.WithMetricsPort(metricsPort))
@@ -263,6 +267,9 @@ func NewAgentRunCommand() *cobra.Command {
263267
command.Flags().BoolVar(&enableResourceProxy, "enable-resource-proxy",
264268
env.BoolWithDefault("ARGOCD_AGENT_ENABLE_RESOURCE_PROXY", true),
265269
"Enable resource proxy")
270+
command.Flags().DurationVar(&cacheRefreshInterval, "cache-refresh-interval",
271+
env.DurationWithDefault("ARGOCD_AGENT_CACHE_REFRESH_INTERVAL", nil, 10*time.Second),
272+
"Interval to refresh cluster cache info in principal")
266273

267274
command.Flags().StringVar(&kubeConfig, "kubeconfig", "", "Path to a kubeconfig file to use")
268275
command.Flags().StringVar(&kubeContext, "kubecontext", "", "Override the default kube context")

0 commit comments

Comments
 (0)