Skip to content

Commit a2f6277

Browse files
Jayendra Parsai (jparsai)
authored and committed
feat: Populate cluster cache info in principal
Assisted by: Cursor Signed-off-by: Jayendra Parsai <jparsai@redhat.com>
1 parent 217bdb6 commit a2f6277

File tree

15 files changed

+870
-149
lines changed

15 files changed

+870
-149
lines changed

agent/agent.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,17 @@ func (a *Agent) Start(ctx context.Context) error {
353353
go http.ListenAndServe(healthzAddr, nil)
354354
}
355355

356+
// Start the background process of periodic sync of cluster cache info.
357+
// This will send periodic updates of Application, Resource and API counts to principal.
358+
if a.mode == types.AgentModeManaged {
359+
go func() {
360+
for {
361+
go a.addClusterCacheInfoUpdateToQueue("periodic_sync")
362+
time.Sleep(3 * time.Minute)
363+
}
364+
}()
365+
}
366+
356367
if a.remote != nil {
357368
a.remote.SetClientMode(a.mode)
358369
// TODO: Right now, maintainConnection always returns nil. Revisit

agent/inbound.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,12 @@ func (a *Agent) createApplication(incoming *v1alpha1.Application) (*v1alpha1.App
374374
return created, nil
375375
}
376376

377+
if a.mode == types.AgentModeManaged {
378+
// An application was just created, so trigger a cluster cache info update event
379+
// so that the principal can be informed of the cache update.
380+
go a.addClusterCacheInfoUpdateToQueue("application_created")
381+
}
382+
377383
return created, err
378384
}
379385

@@ -448,8 +454,11 @@ func (a *Agent) deleteApplication(app *v1alpha1.Application) error {
448454
return err
449455
}
450456

451-
if a.mode == types.AgentModeManaged && err == nil {
457+
if a.mode == types.AgentModeManaged {
452458
appCache.DeleteApplicationSpec(app.UID, logCtx)
459+
// The application was just deleted, so trigger a cluster cache info update event
460+
// so that the principal can be informed of the cache update.
461+
go a.addClusterCacheInfoUpdateToQueue("application_deleted")
453462
}
454463

455464
err = a.appManager.Unmanage(app.QualifiedName())

agent/outbound.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,16 @@
1515
package agent
1616

1717
import (
18+
"errors"
19+
"time"
20+
21+
"github.com/argoproj-labs/argocd-agent/internal/argocd/cluster"
1822
"github.com/argoproj-labs/argocd-agent/internal/event"
1923
"github.com/argoproj-labs/argocd-agent/internal/logging/logfields"
2024
"github.com/argoproj-labs/argocd-agent/internal/resources"
2125
"github.com/argoproj-labs/argocd-agent/pkg/types"
2226
"github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
27+
cacheutil "github.com/argoproj/argo-cd/v3/util/cache"
2328
"github.com/sirupsen/logrus"
2429
corev1 "k8s.io/api/core/v1"
2530
)
@@ -105,6 +110,13 @@ func (a *Agent) addAppUpdateToQueue(old *v1alpha1.Application, new *v1alpha1.App
105110
WithField(logfields.SendQueueLen, q.Len()).
106111
WithField(logfields.SendQueueName, defaultQueueName).
107112
Debugf("Added event of type %s to send queue", eventType)
113+
114+
// When sync status changed to Synced, trigger a cluster cache info update event
115+
// so that the principal can be informed of the cache update.
116+
if a.mode == types.AgentModeManaged && old.Status.Sync.Status != new.Status.Sync.Status &&
117+
new.Status.Sync.Status == v1alpha1.SyncStatusCodeSynced {
118+
go a.addClusterCacheInfoUpdateToQueue("application_synced")
119+
}
108120
}
109121

110122
// addAppDeletionToQueue processes an application delete event originating from
@@ -261,3 +273,50 @@ func (a *Agent) addAppProjectDeletionToQueue(appProject *v1alpha1.AppProject) {
261273
q.Add(a.emitter.AppProjectEvent(event.Delete, appProject))
262274
logCtx.WithField(logfields.SendQueueLen, q.Len()).Debugf("Added appProject delete event to send queue")
263275
}
276+
277+
// addClusterCacheInfoUpdateToQueue processes a cluster cache info update event
278+
// and puts it in the send queue.
279+
func (a *Agent) addClusterCacheInfoUpdateToQueue(reason string) {
280+
logCtx := log().WithFields(logrus.Fields{
281+
"event": "addClusterCacheInfoUpdateToQueue",
282+
"reason": reason,
283+
})
284+
285+
clusterServer := "https://kubernetes.default.svc"
286+
var clusterInfo *v1alpha1.ClusterInfo
287+
var err error
288+
289+
// If the reason is application_synced, application_created or application_deleted,
290+
// we wait for 20 seconds to ensure that updated info is sent to principal.
291+
// It is because the cluster cache is not updated immediately by Argo CD.
292+
// Instead it is updated in next clusterInfoUpdater cycle.
293+
if reason != "periodic_sync" {
294+
time.Sleep(20 * time.Second)
295+
}
296+
297+
// Get the updated cluster info from agent's cache.
298+
clusterInfo, err = cluster.GetClusterInfo(a.context, a.kubeClient.Clientset, a.namespace, clusterServer, a.redisProxyMsgHandler.redisAddress, cacheutil.RedisCompressionGZip)
299+
if err != nil {
300+
if !errors.Is(err, cacheutil.ErrCacheMiss) {
301+
logCtx.WithError(err).Errorf("Failed to get cluster info from cache")
302+
}
303+
return
304+
}
305+
306+
// Send the event to principal to update the cluster cache info.
307+
q := a.queues.SendQ(defaultQueueName)
308+
if q != nil {
309+
clusterInfoEvent := a.emitter.ClusterCacheInfoUpdateEvent(event.ClusterCacheInfoUpdate, clusterInfo)
310+
q.Add(clusterInfoEvent)
311+
logCtx.WithFields(logrus.Fields{
312+
"sendq_len": q.Len(),
313+
"sendq_name": defaultQueueName,
314+
"applicationsCount": clusterInfo.ApplicationsCount,
315+
"apisCount": clusterInfo.CacheInfo.APIsCount,
316+
"resourcesCount": clusterInfo.CacheInfo.ResourcesCount,
317+
"reason": reason,
318+
}).Infof("Added ClusterCacheInfoUpdate event to send queue")
319+
} else {
320+
logCtx.Error("Default queue not found, unable to send ClusterCacheInfoUpdate event")
321+
}
322+
}

agent/outbound_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,13 @@ package agent
33
import (
44
"testing"
55

6+
"github.com/alicebob/miniredis/v2"
7+
"github.com/argoproj-labs/argocd-agent/internal/argocd/cluster"
68
"github.com/argoproj-labs/argocd-agent/internal/event"
79
"github.com/argoproj-labs/argocd-agent/pkg/types"
10+
"github.com/argoproj/argo-cd/v3/common"
811
"github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
12+
cacheutil "github.com/argoproj/argo-cd/v3/util/cache"
913
"github.com/stretchr/testify/assert"
1014
"github.com/stretchr/testify/require"
1115
corev1 "k8s.io/api/core/v1"
@@ -371,3 +375,51 @@ func Test_addAppProjectDeletionToQueue(t *testing.T) {
371375
require.False(t, a.projectManager.IsManaged("test-project"))
372376
})
373377
}
378+
379+
func Test_addClusterCacheInfoUpdateToQueue(t *testing.T) {
380+
a := newAgent(t)
381+
a.remote.SetClientID("agent")
382+
a.emitter = event.NewEventSource("principal")
383+
384+
miniRedis, err := miniredis.Run()
385+
require.NoError(t, err)
386+
require.NotNil(t, miniRedis)
387+
defer miniRedis.Close()
388+
err = WithRedisHost(miniRedis.Addr())(a)
389+
require.NoError(t, err)
390+
391+
// Create the required Redis secret
392+
secret := &corev1.Secret{
393+
ObjectMeta: v1.ObjectMeta{Name: common.RedisInitialCredentials},
394+
Data: map[string][]byte{common.RedisInitialCredentialsKey: []byte("password123")},
395+
}
396+
_, err = a.kubeClient.Clientset.CoreV1().Secrets("argocd").Create(a.context, secret, v1.CreateOptions{})
397+
require.NoError(t, err)
398+
399+
// First populate the cache with dummy data
400+
clusterMgr, err := cluster.NewManager(a.context, a.namespace, miniRedis.Addr(), cacheutil.RedisCompressionGZip, a.kubeClient.Clientset)
401+
require.NoError(t, err)
402+
err = clusterMgr.MapCluster("test-agent", &v1alpha1.Cluster{
403+
Name: "test-cluster",
404+
Server: "https://kubernetes.default.svc",
405+
})
406+
require.NoError(t, err)
407+
408+
// Set dummy cluster cache stats
409+
clusterMgr.SetClusterCacheStats(v1alpha1.ClusterInfo{
410+
ApplicationsCount: 5,
411+
CacheInfo: v1alpha1.ClusterCacheInfo{
412+
APIsCount: 10,
413+
ResourcesCount: 100,
414+
},
415+
}, "test-agent")
416+
417+
// Should not have an event in queue
418+
require.Equal(t, 0, a.queues.SendQ(defaultQueueName).Len())
419+
420+
// Add a cluster cache info update event to the queue
421+
a.addClusterCacheInfoUpdateToQueue("periodic_sync")
422+
423+
// Should have an event in queue
424+
require.Equal(t, 1, a.queues.SendQ(defaultQueueName).Len())
425+
}

0 commit comments

Comments
 (0)