Skip to content

Commit 8a65879

Browse files
authored
Refactor correlation ID handling in notifications to use normalization and decoding functions (#303)
1 parent 53f659a commit 8a65879

File tree

3 files changed

+71
-25
lines changed

3 files changed

+71
-25
lines changed

.vscode/launch.json

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,7 @@
3737
"envFile": "${workspaceFolder}/.env",
3838
"args": [
3939
"test",
40-
"catalog-cache",
41-
"is-cached",
42-
"--file_path=/Users/cjlapao/Downloads",
43-
"--target_path=dropbox/test_machine/macos",
44-
"--target_filename=21de185744bf519e687cdf12f62b1c741371cdfa5e747b029056710e5b8c57fe-1.pvm"
40+
"catalog-providers"
4541
]
4642
},
4743
{

src/notifications/main.go

Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func (p *NotificationService) SetContext(ctx basecontext.ApiContext) *Notificati
101101

102102
func (p *NotificationService) ResetCounters(correlationId string) {
103103
if correlationId != "" {
104-
delete(p.progressCounters, correlationId)
104+
delete(p.progressCounters, normalizeCorrelationID(correlationId))
105105
}
106106
}
107107

@@ -172,10 +172,11 @@ func (p *NotificationService) updateProgressTracker(tracker *ProgressTracker, pr
172172

173173
func (p *NotificationService) NotifyProgress(correlationId string, prefix string, progress float64) {
174174
msg := NewProgressNotificationMessage(correlationId, prefix, progress)
175+
encodedID := msg.CorrelationId()
175176

176177
// Create or update progress tracker
177178
p.mu.Lock()
178-
tracker, exists := p.activeProgress[correlationId]
179+
tracker, exists := p.activeProgress[encodedID]
179180
if !exists {
180181
tracker = &ProgressTracker{
181182
StartTime: time.Now(),
@@ -184,23 +185,28 @@ func (p *NotificationService) NotifyProgress(correlationId string, prefix string
184185
RateSamples: make([]RateSample, 0, 60),
185186
TotalSize: msg.totalSize, // Make sure we capture the total size
186187
}
187-
p.activeProgress[correlationId] = tracker
188+
p.activeProgress[encodedID] = tracker
188189
}
189-
p.mu.Unlock()
190190

191-
p.updateProgressTracker(tracker, progress, msg.currentSize)
191+
currentSize := msg.currentSize
192+
if currentSize == 0 {
193+
currentSize = tracker.CurrentSize
194+
}
195+
p.updateProgressTracker(tracker, progress, currentSize)
192196

193197
if progress >= 100 {
194198
msg.Close()
195199
tracker.IsComplete = true
196200
}
201+
p.mu.Unlock()
197202

198203
p.Notify(msg)
199204
}
200205

201206
func (p *NotificationService) FinishProgress(correlationId string, prefix string) {
207+
encodedID := normalizeCorrelationID(correlationId)
202208
p.mu.Lock()
203-
if tracker, exists := p.activeProgress[correlationId]; exists {
209+
if tracker, exists := p.activeProgress[encodedID]; exists {
204210
tracker.CurrentProgress = 100
205211
tracker.LastUpdateTime = time.Now()
206212
tracker.IsComplete = true
@@ -353,22 +359,30 @@ func (p *NotificationService) CleanupNotifications(correlationId string) {
353359
return
354360
}
355361

362+
encodedID := normalizeCorrelationID(correlationId)
363+
p.cleanupNotificationsByEncodedID(encodedID)
364+
p.ctx.LogDebugf("Cleaned up notifications for correlation ID: %s", correlationId)
365+
}
366+
367+
func (p *NotificationService) cleanupNotificationsByEncodedID(encodedID string) {
368+
if encodedID == "" {
369+
return
370+
}
371+
356372
// Remove from active progress tracking
357373
p.mu.Lock()
358-
delete(p.activeProgress, correlationId)
374+
delete(p.activeProgress, encodedID)
359375
p.mu.Unlock()
360376

361377
// Reset previous message if it was for this correlation ID
362-
if p.previousMessage.correlationId == correlationId {
378+
if p.previousMessage.correlationId == encodedID {
363379
p.previousMessage = NotificationMessage{}
364380
}
365381

366382
// Reset current message if it was for this correlation ID
367-
if p.CurrentMessage.correlationId == correlationId {
383+
if p.CurrentMessage.correlationId == encodedID {
368384
p.CurrentMessage = NotificationMessage{}
369385
}
370-
371-
p.ctx.LogDebugf("Cleaned up notifications for correlation ID: %s", correlationId)
372386
}
373387

374388
// GetActiveProgressCount returns the number of active progress notifications
@@ -391,18 +405,20 @@ func (p *NotificationService) GetActiveProgressIDs() []string {
391405

392406
// IsProgressActive checks if a progress notification is active for the given correlation ID
393407
func (p *NotificationService) IsProgressActive(correlationId string) bool {
408+
encodedID := normalizeCorrelationID(correlationId)
394409
p.mu.RLock()
395410
defer p.mu.RUnlock()
396-
_, exists := p.activeProgress[correlationId]
411+
_, exists := p.activeProgress[encodedID]
397412
return exists
398413
}
399414

400415
// GetProgressStatus returns the current progress status for a given correlation ID
401416
// Returns progress percentage and whether the progress exists
402417
func (p *NotificationService) GetProgressStatus(correlationId string) (float64, bool) {
418+
encodedID := normalizeCorrelationID(correlationId)
403419
p.mu.RLock()
404420
defer p.mu.RUnlock()
405-
if tracker, exists := p.activeProgress[correlationId]; exists {
421+
if tracker, exists := p.activeProgress[encodedID]; exists {
406422
return tracker.CurrentProgress, true
407423
}
408424
return 0, false
@@ -425,26 +441,32 @@ func (p *NotificationService) CleanupStaleProgress(staleDuration time.Duration)
425441

426442
// Then cleanup each ID (this will acquire write lock)
427443
for _, id := range idsToCleanup {
428-
p.ctx.LogDebugf("Cleaning up stale progress for correlation ID: %s", id)
429-
p.CleanupNotifications(id)
444+
decodedID, err := decodeCorrelationID(id)
445+
if err != nil || decodedID == "" {
446+
decodedID = id
447+
}
448+
p.ctx.LogDebugf("Cleaning up stale progress for correlation ID: %s", decodedID)
449+
p.cleanupNotificationsByEncodedID(id)
430450
}
431451
}
432452

433453
// GetProgressDuration returns the duration since the progress started
434454
func (p *NotificationService) GetProgressDuration(correlationId string) (time.Duration, bool) {
455+
encodedID := normalizeCorrelationID(correlationId)
435456
p.mu.RLock()
436457
defer p.mu.RUnlock()
437-
if tracker, exists := p.activeProgress[correlationId]; exists {
458+
if tracker, exists := p.activeProgress[encodedID]; exists {
438459
return time.Since(tracker.StartTime), true
439460
}
440461
return 0, false
441462
}
442463

443464
// GetProgressRate calculates transfer and progress rates for a given correlation ID
444465
func (p *NotificationService) GetProgressRate(correlationId string) (*ProgressRate, bool) {
466+
encodedID := normalizeCorrelationID(correlationId)
445467
p.mu.RLock()
446468
defer p.mu.RUnlock()
447-
tracker, exists := p.activeProgress[correlationId]
469+
tracker, exists := p.activeProgress[encodedID]
448470
if !exists || tracker.TotalSize <= 0 {
449471
return nil, false
450472
}
@@ -465,9 +487,10 @@ func (p *NotificationService) GetProgressRate(correlationId string) (*ProgressRa
465487

466488
// PredictTimeRemaining estimates the time remaining based on recent progress
467489
func (p *NotificationService) PredictTimeRemaining(correlationId string) (time.Duration, bool) {
490+
encodedID := normalizeCorrelationID(correlationId)
468491
p.mu.RLock()
469492
defer p.mu.RUnlock()
470-
tracker, exists := p.activeProgress[correlationId]
493+
tracker, exists := p.activeProgress[encodedID]
471494
if !exists || tracker.TotalSize <= 0 {
472495
return 0, false
473496
}

src/notifications/notification_message.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,33 @@ import (
55
"time"
66
)
77

8+
func normalizeCorrelationID(id string) string {
9+
if id == "" {
10+
return ""
11+
}
12+
13+
decoded, err := base64.StdEncoding.DecodeString(id)
14+
if err == nil && base64.StdEncoding.EncodeToString(decoded) == id {
15+
// Already encoded
16+
return id
17+
}
18+
19+
return base64.StdEncoding.EncodeToString([]byte(id))
20+
}
21+
22+
func decodeCorrelationID(id string) (string, error) {
23+
if id == "" {
24+
return "", nil
25+
}
26+
27+
decoded, err := base64.StdEncoding.DecodeString(id)
28+
if err != nil {
29+
return "", err
30+
}
31+
32+
return string(decoded), nil
33+
}
34+
835
type NotificationMessage struct {
936
correlationId string
1037
Message string
@@ -27,7 +54,7 @@ func NewNotificationMessage(message string, level NotificationMessageLevel) *Not
2754
}
2855

2956
func NewProgressNotificationMessage(correlationId string, message string, progress float64) *NotificationMessage {
30-
cid := base64.StdEncoding.EncodeToString([]byte(correlationId))
57+
cid := normalizeCorrelationID(correlationId)
3158
return &NotificationMessage{
3259
correlationId: cid,
3360
Message: message,
@@ -42,7 +69,7 @@ func (nm *NotificationMessage) String() string {
4269
}
4370

4471
func (nm *NotificationMessage) SetCorrelationId(id string) *NotificationMessage {
45-
nm.correlationId = id
72+
nm.correlationId = normalizeCorrelationID(id)
4673
return nm
4774
}
4875

0 commit comments

Comments
 (0)