Skip to content

Commit 85166b1

Browse files
Jayendra Parsaijparsai
authored andcommitted
feat: Populate cluster cache info in principal
Assisted by: Cursor Signed-off-by: Jayendra Parsai <jparsai@redhat.com>
1 parent bb90158 commit 85166b1

File tree

18 files changed

+1340
-154
lines changed

18 files changed

+1340
-154
lines changed

agent/agent.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,23 @@ func (a *Agent) Start(ctx context.Context) error {
353353
go http.ListenAndServe(healthzAddr, nil)
354354
}
355355

356+
// Start the background process of periodic sync of cluster cache info.
357+
// This will send periodic updates of Application, Resource and API counts to principal.
358+
if a.mode == types.AgentModeManaged {
359+
go func() {
360+
ticker := time.NewTicker(10 * time.Second)
361+
defer ticker.Stop()
362+
for {
363+
select {
364+
case <-ticker.C:
365+
a.addClusterCacheInfoUpdateToQueue()
366+
case <-a.context.Done():
367+
return
368+
}
369+
}
370+
}()
371+
}
372+
356373
if a.remote != nil {
357374
a.remote.SetClientMode(a.mode)
358375
// TODO: Right now, maintainConnection always returns nil. Revisit

agent/inbound.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,7 @@ func (a *Agent) deleteApplication(app *v1alpha1.Application) error {
448448
return err
449449
}
450450

451-
if a.mode == types.AgentModeManaged && err == nil {
451+
if a.mode == types.AgentModeManaged {
452452
appCache.DeleteApplicationSpec(app.UID, logCtx)
453453
}
454454

agent/outbound.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,22 @@
1515
package agent
1616

1717
import (
18+
"errors"
19+
20+
"github.com/argoproj-labs/argocd-agent/internal/argocd/cluster"
21+
"github.com/argoproj-labs/argocd-agent/internal/cache"
1822
"github.com/argoproj-labs/argocd-agent/internal/event"
1923
"github.com/argoproj-labs/argocd-agent/internal/logging/logfields"
2024
"github.com/argoproj-labs/argocd-agent/internal/resources"
2125
"github.com/argoproj-labs/argocd-agent/pkg/types"
2226
"github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
27+
cacheutil "github.com/argoproj/argo-cd/v3/util/cache"
2328
"github.com/sirupsen/logrus"
2429
corev1 "k8s.io/api/core/v1"
2530
)
2631

32+
var eventSentAfterZeroApps = false
33+
2734
// addAppCreationToQueue processes a new application event originating from the
2835
// AppInformer and puts it in the send queue.
2936
func (a *Agent) addAppCreationToQueue(app *v1alpha1.Application) {
@@ -261,3 +268,66 @@ func (a *Agent) addAppProjectDeletionToQueue(appProject *v1alpha1.AppProject) {
261268
q.Add(a.emitter.AppProjectEvent(event.Delete, appProject))
262269
logCtx.WithField(logfields.SendQueueLen, q.Len()).Debugf("Added appProject delete event to send queue")
263270
}
271+
272+
// addClusterCacheInfoUpdateToQueue processes a cluster cache info update event
273+
// and puts it in the send queue.
274+
func (a *Agent) addClusterCacheInfoUpdateToQueue() {
275+
logCtx := log().WithFields(logrus.Fields{
276+
"event": "addClusterCacheInfoUpdateToQueue",
277+
})
278+
279+
clusterServer := "https://kubernetes.default.svc"
280+
var clusterInfo *v1alpha1.ClusterInfo
281+
var err error
282+
283+
// Get the updated cluster info from agent's cache.
284+
clusterInfo, err = cluster.GetClusterInfo(a.context, a.kubeClient.Clientset, a.namespace, clusterServer, a.redisProxyMsgHandler.redisAddress, cacheutil.RedisCompressionGZip, cache.SourceAgent)
285+
if err != nil {
286+
if !errors.Is(err, cacheutil.ErrCacheMiss) {
287+
logCtx.WithError(err).Errorf("Failed to get cluster info from cache")
288+
}
289+
return
290+
}
291+
292+
// If the cluster does not have any applications, Argo CD does not update the cluster cache info.
293+
// Hence there is no need to send event to principal in this case, as information is same as last time.
294+
// However, after application count changed to 0, we need to send one last event to principal to share the information after application deletion.
295+
// For this, we use eventSentAfterZeroApps flag to track if the an event has been sent after application count changed to 0.
296+
297+
// If the cluster has applications, we reset the eventSentAfterZeroApps flag to false.
298+
if clusterInfo.ApplicationsCount != 0 {
299+
eventSentAfterZeroApps = false
300+
}
301+
302+
// If the one event has been sent after application count changed to 0, then do nothing.
303+
if eventSentAfterZeroApps {
304+
return
305+
}
306+
307+
// If the cluster has no applications, we set the eventSentAfterZeroApps flag to true.
308+
// and send one event to principal to update the cluster cache info.
309+
if clusterInfo.ApplicationsCount == 0 {
310+
eventSentAfterZeroApps = true
311+
}
312+
313+
// Send the event to principal to update the cluster cache info.
314+
q := a.queues.SendQ(defaultQueueName)
315+
if q != nil {
316+
clusterInfoEvent := a.emitter.ClusterCacheInfoUpdateEvent(event.ClusterCacheInfoUpdate, &event.ClusterCacheInfo{
317+
ApplicationsCount: clusterInfo.ApplicationsCount,
318+
APIsCount: clusterInfo.CacheInfo.APIsCount,
319+
ResourcesCount: clusterInfo.CacheInfo.ResourcesCount,
320+
})
321+
322+
q.Add(clusterInfoEvent)
323+
logCtx.WithFields(logrus.Fields{
324+
"sendq_len": q.Len(),
325+
"sendq_name": defaultQueueName,
326+
"applicationsCount": clusterInfo.ApplicationsCount,
327+
"apisCount": clusterInfo.CacheInfo.APIsCount,
328+
"resourcesCount": clusterInfo.CacheInfo.ResourcesCount,
329+
}).Infof("Added ClusterCacheInfoUpdate event to send queue")
330+
} else {
331+
logCtx.Error("Default queue not found, unable to send ClusterCacheInfoUpdate event")
332+
}
333+
}

agent/outbound_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,14 @@ package agent
33
import (
44
"testing"
55

6+
"github.com/alicebob/miniredis/v2"
7+
"github.com/argoproj-labs/argocd-agent/internal/argocd/cluster"
68
"github.com/argoproj-labs/argocd-agent/internal/event"
79
"github.com/argoproj-labs/argocd-agent/pkg/types"
10+
"github.com/argoproj/argo-cd/v3/common"
811
"github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
12+
cacheutil "github.com/argoproj/argo-cd/v3/util/cache"
13+
"github.com/google/uuid"
914
"github.com/stretchr/testify/assert"
1015
"github.com/stretchr/testify/require"
1116
corev1 "k8s.io/api/core/v1"
@@ -371,3 +376,49 @@ func Test_addAppProjectDeletionToQueue(t *testing.T) {
371376
require.False(t, a.projectManager.IsManaged("test-project"))
372377
})
373378
}
379+
380+
func Test_addClusterCacheInfoUpdateToQueue(t *testing.T) {
381+
a := newAgent(t)
382+
a.remote.SetClientID("agent")
383+
a.emitter = event.NewEventSource("principal")
384+
385+
miniRedis, err := miniredis.Run()
386+
require.NoError(t, err)
387+
require.NotNil(t, miniRedis)
388+
defer miniRedis.Close()
389+
err = WithRedisHost(miniRedis.Addr())(a)
390+
require.NoError(t, err)
391+
392+
// Create the required Redis secret
393+
secret := &corev1.Secret{
394+
ObjectMeta: v1.ObjectMeta{Name: common.RedisInitialCredentials},
395+
Data: map[string][]byte{common.RedisInitialCredentialsKey: []byte(uuid.NewString())},
396+
}
397+
_, err = a.kubeClient.Clientset.CoreV1().Secrets("argocd").Create(a.context, secret, v1.CreateOptions{})
398+
require.NoError(t, err)
399+
400+
// First populate the cache with dummy data
401+
clusterMgr, err := cluster.NewManager(a.context, a.namespace, miniRedis.Addr(), cacheutil.RedisCompressionGZip, a.kubeClient.Clientset)
402+
require.NoError(t, err)
403+
err = clusterMgr.MapCluster("test-agent", &v1alpha1.Cluster{
404+
Name: "test-cluster",
405+
Server: "https://kubernetes.default.svc",
406+
})
407+
require.NoError(t, err)
408+
409+
// Set dummy cluster cache stats
410+
clusterMgr.SetClusterCacheStats(&event.ClusterCacheInfo{
411+
ApplicationsCount: 5,
412+
APIsCount: 10,
413+
ResourcesCount: 100,
414+
}, "test-agent")
415+
416+
// Should not have an event in queue
417+
require.Equal(t, 0, a.queues.SendQ(defaultQueueName).Len())
418+
419+
// Add a cluster cache info update event to the queue
420+
a.addClusterCacheInfoUpdateToQueue()
421+
422+
// Should have an event in queue
423+
require.Equal(t, 1, a.queues.SendQ(defaultQueueName).Len())
424+
}

internal/argocd/cluster/cluster.go

Lines changed: 114 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,24 @@
1515
package cluster
1616

1717
import (
18+
"context"
19+
"errors"
1820
"fmt"
1921
"time"
2022

21-
"github.com/redis/go-redis/v9"
23+
"github.com/argoproj-labs/argocd-agent/internal/cache"
24+
"github.com/argoproj-labs/argocd-agent/internal/event"
25+
"github.com/sirupsen/logrus"
2226

23-
"github.com/argoproj/argo-cd/v3/common"
2427
appv1 "github.com/argoproj/argo-cd/v3/pkg/apis/application/v1alpha1"
2528
cacheutil "github.com/argoproj/argo-cd/v3/util/cache"
26-
appstatecache "github.com/argoproj/argo-cd/v3/util/cache/appstate"
2729
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30+
"k8s.io/client-go/kubernetes"
2831
)
2932

30-
// UpdateClusterConnectionInfo updates cluster info with connection state and time in mapped cluster
31-
func (m *Manager) UpdateClusterConnectionInfo(agentName, status string, t time.Time) {
33+
// SetAgentConnectionStatus updates cluster info with connection state and time in mapped cluster at principal.
34+
// This is called when the agent is connected or disconnected with the principal.
35+
func (m *Manager) SetAgentConnectionStatus(agentName, status appv1.ConnectionStatus, modifiedAt time.Time) {
3236
m.mutex.Lock()
3337
defer m.mutex.Unlock()
3438

@@ -39,67 +43,135 @@ func (m *Manager) UpdateClusterConnectionInfo(agentName, status string, t time.T
3943
return
4044
}
4145

42-
// Create Redis client and cache
43-
redisOptions := &redis.Options{Addr: m.redisAddress}
44-
45-
if err := common.SetOptionalRedisPasswordFromKubeConfig(m.ctx, m.kubeclient, m.namespace, redisOptions); err != nil {
46-
log().Errorf("Failed to fetch & set redis password for namespace %s: %v", m.namespace, err)
47-
}
48-
redisClient := redis.NewClient(redisOptions)
49-
50-
cache := appstatecache.NewCache(cacheutil.NewCache(
51-
cacheutil.NewRedisCache(redisClient, time.Minute, m.redisCompressionType)), time.Minute)
52-
5346
state := "disconnected"
5447
if status == appv1.ConnectionStatusSuccessful {
5548
state = "connected"
5649
}
5750

58-
// update the info
59-
if err := cache.SetClusterInfo(cluster.Server,
51+
// Update the cluster connection state and time in mapped cluster at principal.
52+
if err := m.setClusterInfo(cluster.Server, agentName, cluster.Name,
6053
&appv1.ClusterInfo{
6154
ConnectionState: appv1.ConnectionState{
6255
Status: status,
6356
Message: fmt.Sprintf("Agent: '%s' is %s with principal", agentName, state),
64-
ModifiedAt: &metav1.Time{Time: t}}},
65-
); err != nil {
66-
log().Errorf("Failed to update connection info in Cluster: '%s' mapped with Agent: '%s'. Error: %v", cluster.Name, agentName, err)
57+
ModifiedAt: &metav1.Time{Time: modifiedAt},
58+
},
59+
}); err != nil {
60+
log().Errorf("failed to refresh connection info in cluster: '%s' mapped with agent: '%s'. Error: %v", cluster.Name, agentName, err)
6761
return
6862
}
6963

7064
log().Infof("Updated connection status to '%s' in Cluster: '%s' mapped with Agent: '%s'", status, cluster.Name, agentName)
7165
}
7266

73-
// refreshClusterConnectionInfo gets latest cluster info from cache and re-saves it to avoid deletion of info
74-
// by ArgoCD after cache expiration time duration (i.e. 10 Minute)
75-
func (m *Manager) refreshClusterConnectionInfo() {
67+
// refreshClusterInfo gets latest cluster info from cache and re-saves it to avoid deletion of info
68+
// by Argo CD after cache expiration time duration (i.e. 10 minutes)
69+
func (m *Manager) refreshClusterInfo() {
7670
m.mutex.Lock()
7771
defer m.mutex.Unlock()
7872

79-
// iterate through all clusters
73+
// Iterate through all clusters.
8074
for agentName, cluster := range m.clusters {
81-
// Create Redis client and cache
82-
redisOptions := &redis.Options{Addr: m.redisAddress}
75+
clusterInfo, err := GetClusterInfo(m.ctx, m.kubeclient, m.namespace, cluster.Server, m.redisAddress, m.redisCompressionType, cache.SourcePrincipal)
76+
if err != nil {
77+
if !errors.Is(err, cacheutil.ErrCacheMiss) {
78+
log().Errorf("failed to get connection info from cluster: '%s' mapped with agent: '%s'. Error: %v", cluster.Name, agentName, err)
79+
}
80+
continue
81+
}
8382

84-
if err := common.SetOptionalRedisPasswordFromKubeConfig(m.ctx, m.kubeclient, m.namespace, redisOptions); err != nil {
85-
log().Errorf("Failed to fetch & set redis password for namespace %s: %v", m.namespace, err)
83+
// Re-save same info.
84+
if err := m.setClusterInfo(cluster.Server, agentName, cluster.Name, clusterInfo); err != nil {
85+
log().Errorf("failed to refresh connection info in cluster: '%s' mapped with agent: '%s'. Error: %v", cluster.Name, agentName, err)
86+
continue
8687
}
87-
redisClient := redis.NewClient(redisOptions)
88+
}
89+
}
8890

89-
cache := appstatecache.NewCache(cacheutil.NewCache(
90-
cacheutil.NewRedisCache(redisClient, time.Minute, m.redisCompressionType)), time.Minute)
91+
// SetClusterCacheStats updates cluster cache info with Application, Resource and API counts in principal.
92+
// This is called when principal receives clusterCacheInfoUpdate event from agent.
93+
func (m *Manager) SetClusterCacheStats(clusterInfo *event.ClusterCacheInfo, agentName string) error {
94+
m.mutex.Lock()
95+
defer m.mutex.Unlock()
9196

92-
// fetch latest info
93-
clusterInfo := &appv1.ClusterInfo{}
94-
if err := cache.GetClusterInfo(cluster.Server, clusterInfo); err != nil {
95-
log().Errorf("Failed to get connection info from Cluster: '%s' mapped with Agent: '%s'. Error: %v", cluster.Name, agentName, err)
96-
return
97-
}
97+
// Check if we have a mapping for the requested agent.
98+
cluster := m.mapping(agentName)
99+
if cluster == nil {
100+
log().Errorf("agent %s is not mapped to any cluster", agentName)
101+
return fmt.Errorf("agent %s is not mapped to any cluster", agentName)
102+
}
103+
104+
// Get existing cluster info to preserve cluster connection status
105+
existingClusterInfo, err := GetClusterInfo(m.ctx, m.kubeclient, m.namespace, cluster.Server, m.redisAddress, m.redisCompressionType, cache.SourcePrincipal)
106+
if err != nil && !errors.Is(err, cacheutil.ErrCacheMiss) {
107+
log().Errorf("failed to get existing cluster info for cluster: '%s' mapped with agent: '%s'. Error: %v", cluster.Name, agentName, err)
108+
// Continue with default values if we can't get existing info
109+
}
110+
111+
// Create new cluster info with cache stats
112+
newClusterInfo := &appv1.ClusterInfo{
113+
ApplicationsCount: clusterInfo.ApplicationsCount,
114+
CacheInfo: appv1.ClusterCacheInfo{
115+
APIsCount: clusterInfo.APIsCount,
116+
ResourcesCount: clusterInfo.ResourcesCount,
117+
},
118+
}
98119

99-
// re-save same info
100-
if err := cache.SetClusterInfo(cluster.Server, clusterInfo); err != nil {
101-
log().Errorf("Failed to refresh connection info in Cluster: '%s' mapped with Agent: '%s'. Error: %v", cluster.Name, agentName, err)
102-
return
120+
// Preserve existing cluster connection status
121+
if existingClusterInfo != nil {
122+
newClusterInfo.ConnectionState = existingClusterInfo.ConnectionState
123+
if existingClusterInfo.CacheInfo.LastCacheSyncTime != nil {
124+
newClusterInfo.CacheInfo.LastCacheSyncTime = existingClusterInfo.CacheInfo.LastCacheSyncTime
103125
}
104126
}
127+
128+
// Set the info in mapped cluster at principal.
129+
if err := m.setClusterInfo(cluster.Server, agentName, cluster.Name, newClusterInfo); err != nil {
130+
log().Errorf("failed to update cluster cache stats in cluster: '%s' mapped with agent: '%s'. Error: %v", cluster.Name, agentName, err)
131+
return err
132+
}
133+
134+
log().WithFields(logrus.Fields{
135+
"applicationsCount": clusterInfo.ApplicationsCount,
136+
"apisCount": clusterInfo.APIsCount,
137+
"resourcesCount": clusterInfo.ResourcesCount,
138+
"cluster": cluster.Name,
139+
"agent": agentName,
140+
}).Infof("Updated cluster cache stats in cluster.")
141+
142+
return nil
143+
}
144+
145+
// setClusterInfo saves the given ClusterInfo in the cache.
146+
func (m *Manager) setClusterInfo(clusterServer, agentName, clusterName string, clusterInfo *appv1.ClusterInfo) error {
147+
148+
// Get cluster cache instance from redis.
149+
clusterCache, err := cache.GetCacheInstance(m.ctx, m.kubeclient, m.namespace, m.redisAddress, m.redisCompressionType, cache.SourcePrincipal)
150+
if err != nil {
151+
return fmt.Errorf("failed to get cluster cache instance: %v", err)
152+
}
153+
154+
// Save the given cluster info in cache.
155+
if err := clusterCache.SetClusterInfo(clusterServer, clusterInfo); err != nil {
156+
return fmt.Errorf("failed to refresh connection info in cluster: '%s' mapped with agent: '%s': %v", clusterName, agentName, err)
157+
}
158+
return nil
159+
}
160+
161+
// GetClusterInfo retrieves the ClusterInfo for a given cluster from the cache.
162+
func GetClusterInfo(ctx context.Context, kubeclient kubernetes.Interface, namespace,
163+
clusterServer, redisAddress string, redisCompressionType cacheutil.RedisCompressionType, role string) (*appv1.ClusterInfo, error) {
164+
165+
// Get cluster cache instance from redis.
166+
clusterCache, err := cache.GetCacheInstance(ctx, kubeclient, namespace, redisAddress, redisCompressionType, role)
167+
if err != nil {
168+
return nil, fmt.Errorf("failed to get cluster cache instance: %v", err)
169+
}
170+
171+
clusterInfo := &appv1.ClusterInfo{}
172+
// Fetch the cluster info from cache.
173+
if err := clusterCache.GetClusterInfo(clusterServer, clusterInfo); err != nil {
174+
return nil, err
175+
}
176+
return clusterInfo, nil
105177
}

0 commit comments

Comments
 (0)