feat: implement retry mechanism for log processing #136
@@ -34,6 +34,8 @@ type podEventLoggerOptions struct {
	logger      slog.Logger
	logDebounce time.Duration
	// maxRetries is the maximum number of retries for a log send failure.
	maxRetries int

	// The following fields are optional!
	namespaces []string
@@ -52,6 +54,10 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve
		opts.clock = quartz.NewReal()
	}

	if opts.maxRetries == 0 {
		opts.maxRetries = 10
	}

	logCh := make(chan agentLog, 512)
	ctx, cancelFunc := context.WithCancel(ctx)
	reporter := &podEventLogger{
@@ -75,6 +81,7 @@ func newPodEventLogger(ctx context.Context, opts podEventLoggerOptions) (*podEve
			logCache: logCache{
				logs: map[string][]agentsdk.Log{},
			},
			maxRetries: opts.maxRetries,
		},
	}
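As a side note on the new option: maxRetries is resolved with the usual zero-value-default pattern, so callers that leave it unset get 10 retries per agent token. Below is a minimal, self-contained sketch of that pattern; the options type is a stand-in for illustration, not the real podEventLoggerOptions.

package main

import "fmt"

// options is a stand-in for podEventLoggerOptions, reduced to the new field.
type options struct {
	maxRetries int
}

// withDefaults mirrors the defaulting added to newPodEventLogger: a zero
// value means "not set" and falls back to 10.
func withDefaults(opts options) options {
	if opts.maxRetries == 0 {
		opts.maxRetries = 10
	}
	return opts
}

func main() {
	fmt.Println(withDefaults(options{}).maxRetries)              // 10 (default)
	fmt.Println(withDefaults(options{maxRetries: 3}).maxRetries) // 3 (explicit)
}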
@@ -407,6 +414,11 @@ type logQueuer struct {
	loggerTTL time.Duration
	loggers   map[string]agentLoggerLifecycle
	logCache  logCache

	// retries maps agent tokens to their retry state for exponential backoff
	retries map[string]*retryState
	// maxRetries is the maximum number of retries for a log send failure.
	maxRetries int
}

func (l *logQueuer) work(ctx context.Context) {
@@ -427,87 +439,117 @@ func (l *logQueuer) work(ctx context.Context) {
	}
}

func (l *logQueuer) newLogger(ctx context.Context, log agentLog) (agentLoggerLifecycle, error) {
	client := agentsdk.New(l.coderURL)
	client.SetSessionToken(log.agentToken)
	logger := l.logger.With(slog.F("resource_name", log.resourceName))
	client.SDK.SetLogger(logger)

	_, err := client.PostLogSource(ctx, agentsdk.PostLogSourceRequest{
		ID:          sourceUUID,
		Icon:        "/icon/k8s.png",
		DisplayName: "Kubernetes",
	})
	if err != nil {
		// Posting the log source failed, which affects how logs appear.
		// We'll retry to ensure the log source is properly registered.
		logger.Error(ctx, "post log source", slog.Error(err))
		return agentLoggerLifecycle{}, err
	}

	ls := agentsdk.NewLogSender(logger)
	sl := ls.GetScriptLogger(sourceUUID)

	gracefulCtx, gracefulCancel := context.WithCancel(context.Background())

	// connect to Agent v2.0 API, since we don't need features added later.
	// This maximizes compatibility.
	arpc, err := client.ConnectRPC20(gracefulCtx)
	if err != nil {
		logger.Error(ctx, "drpc connect", slog.Error(err))
		gracefulCancel()
		return agentLoggerLifecycle{}, err
	}
	go func() {
		err := ls.SendLoop(gracefulCtx, arpc)
		// if the send loop exits on its own without the context
		// canceling, timeout the logger and force it to recreate.
		if err != nil && ctx.Err() == nil {
			l.loggerTimeout(log.agentToken)
		}
	}()

	closeTimer := l.clock.AfterFunc(l.loggerTTL, func() {
		logger.Info(ctx, "logger timeout firing")
		l.loggerTimeout(log.agentToken)
	})
	lifecycle := agentLoggerLifecycle{
		scriptLogger: sl,
		close: func() {
			defer arpc.DRPCConn().Close()
			defer client.SDK.HTTPClient.CloseIdleConnections()
			// We could be stopping for reasons other than the timeout. If
			// so, stop the timer.
			closeTimer.Stop()
			defer gracefulCancel()
			timeout := l.clock.AfterFunc(5*time.Second, gracefulCancel)
			defer timeout.Stop()
			logger.Info(ctx, "logger closing")

			if err := sl.Flush(gracefulCtx); err != nil {
				// ctx err
				logger.Warn(gracefulCtx, "timeout reached while flushing")
				return
			}

			if err := ls.WaitUntilEmpty(gracefulCtx); err != nil {
				// ctx err
				logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty")
			}
		},
	}
	lifecycle.closeTimer = closeTimer
	return lifecycle, nil
}

func (l *logQueuer) processLog(ctx context.Context, log agentLog) {
	l.mu.Lock()
	defer l.mu.Unlock()
	queuedLogs := l.logCache.push(log)

	queuedLogs := l.logCache.get(log.agentToken)
	if isAgentLogEmpty(log) {
		if queuedLogs == nil {
			return
		}
	} else {
		queuedLogs = l.logCache.push(log)
	}

	lgr, ok := l.loggers[log.agentToken]
	if !ok {
		client := agentsdk.New(l.coderURL)
		client.SetSessionToken(log.agentToken)
		logger := l.logger.With(slog.F("resource_name", log.resourceName))
		client.SDK.SetLogger(logger)

		_, err := client.PostLogSource(ctx, agentsdk.PostLogSourceRequest{
			ID:          sourceUUID,
			Icon:        "/icon/k8s.png",
			DisplayName: "Kubernetes",
		})
		if err != nil {
			// This shouldn't fail sending the log, as it only affects how they
			// appear.
			logger.Error(ctx, "post log source", slog.Error(err))
		// skip if we're in a retry cooldown window
		if rs := l.retries[log.agentToken]; rs != nil && rs.timer != nil {
			return
		}

		ls := agentsdk.NewLogSender(logger)
		sl := ls.GetScriptLogger(sourceUUID)

		gracefulCtx, gracefulCancel := context.WithCancel(context.Background())

		// connect to Agent v2.0 API, since we don't need features added later.
		// This maximizes compatibility.
		arpc, err := client.ConnectRPC20(gracefulCtx)
		var err error
		lgr, err = l.newLogger(ctx, log)
		if err != nil {
			logger.Error(ctx, "drpc connect", slog.Error(err))
			gracefulCancel()
			l.scheduleRetry(ctx, log.agentToken)
			return
		}
		go func() {
			err := ls.SendLoop(gracefulCtx, arpc)
			// if the send loop exits on its own without the context
			// canceling, timeout the logger and force it to recreate.
			if err != nil && ctx.Err() == nil {
				l.loggerTimeout(log.agentToken)
			}
		}()

		closeTimer := l.clock.AfterFunc(l.loggerTTL, func() {
			logger.Info(ctx, "logger timeout firing")
			l.loggerTimeout(log.agentToken)
		})
		lifecycle := agentLoggerLifecycle{
			scriptLogger: sl,
			close: func() {
				// We could be stopping for reasons other than the timeout. If
				// so, stop the timer.
				closeTimer.Stop()
				defer gracefulCancel()
				timeout := l.clock.AfterFunc(5*time.Second, gracefulCancel)
				defer timeout.Stop()
				logger.Info(ctx, "logger closing")

				if err := sl.Flush(gracefulCtx); err != nil {
					// ctx err
					logger.Warn(gracefulCtx, "timeout reached while flushing")
					return
				}

				if err := ls.WaitUntilEmpty(gracefulCtx); err != nil {
					// ctx err
					logger.Warn(gracefulCtx, "timeout reached while waiting for log queue to empty")
				}

				_ = arpc.DRPCConn().Close()
				client.SDK.HTTPClient.CloseIdleConnections()
			},
		}
		lifecycle.closeTimer = closeTimer
		l.loggers[log.agentToken] = lifecycle
		lgr = lifecycle
		l.loggers[log.agentToken] = lgr
	}

	lgr.resetCloseTimer(l.loggerTTL)
	_ = lgr.scriptLogger.Send(ctx, queuedLogs...)
	if len(queuedLogs) == 0 {
		return
	}
	if err := lgr.scriptLogger.Send(ctx, queuedLogs...); err != nil {
		l.scheduleRetry(ctx, log.agentToken)
		return
	}
	l.clearRetryLocked(log.agentToken)
	l.logCache.delete(log.agentToken)
}

@@ -516,8 +558,9 @@ func (l *logQueuer) processDelete(log agentLog) {
	lgr, ok := l.loggers[log.agentToken]
	if ok {
		delete(l.loggers, log.agentToken)
	}
	l.clearRetryLocked(log.agentToken)
	l.logCache.delete(log.agentToken)
	l.mu.Unlock()

	if ok {
@@ -549,6 +592,81 @@ func (l *agentLoggerLifecycle) resetCloseTimer(ttl time.Duration) {
	}
}

// retryState tracks exponential backoff for an agent token.
type retryState struct {
	delay      time.Duration
	timer      *quartz.Timer
	retryCount int
	exhausted  bool // prevent retry state recreation after max retries
}

func (l *logQueuer) scheduleRetry(ctx context.Context, token string) {
	if l.retries == nil {
		l.retries = make(map[string]*retryState)
	}

	rs := l.retries[token]

	if rs != nil && rs.exhausted {
		return
	}

	if rs == nil {
		rs = &retryState{delay: time.Second, retryCount: 0, exhausted: false}
		l.retries[token] = rs
	}

	rs.retryCount++

	// If we've reached the max retries, clear the retry state and delete the log cache.
	if rs.retryCount >= l.maxRetries {
		l.logger.Error(ctx, "max retries exceeded",
			slog.F("retryCount", rs.retryCount),
			slog.F("maxRetries", l.maxRetries))
		rs.exhausted = true

Review comment on this line: This will be kept in memory forever now right?
Reply: It will be kept in memory until the pod is deleted. https://github.com/coder/coder-logstream-kube/blob/kacpersaw/feat-log-queue/logger.go#L562

		if rs.timer != nil {
			rs.timer.Stop()
			rs.timer = nil
		}
		l.logCache.delete(token)
		return
	}

	if rs.timer != nil {
		return
	}

	l.logger.Info(ctx, "scheduling retry",
		slog.F("delay", rs.delay.String()),
		slog.F("retryCount", rs.retryCount))

	rs.timer = l.clock.AfterFunc(rs.delay, func() {
		l.mu.Lock()
		defer l.mu.Unlock()

		if cur := l.retries[token]; cur != nil && !cur.exhausted {
			cur.timer = nil
			l.q <- agentLog{op: opLog, agentToken: token}
		}
	})

	rs.delay *= 2
	if rs.delay > 30*time.Second {
		rs.delay = 30 * time.Second
	}
}

// clearRetryLocked clears the retry state for the given token.
// The caller must hold the mutex lock.
func (l *logQueuer) clearRetryLocked(token string) {
	if rs := l.retries[token]; rs != nil {
		if rs.timer != nil {
			rs.timer.Stop()
		}
		delete(l.retries, token)
	}
}
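For a concrete sense of the backoff this produces: as the diff reads, the first retry fires after 1s, the delay doubles after each scheduled retry and is capped at 30s, and with the default maxRetries of 10 only nine timers actually fire, because the tenth failed attempt trips the exhausted branch instead. A small, runnable sketch of that schedule (the constants are copied from the diff; everything else is illustrative):

package main

import (
	"fmt"
	"time"
)

func main() {
	const maxRetries = 10 // default from newPodEventLogger
	delay := time.Second  // initial delay from scheduleRetry
	var total time.Duration
	// Attempts 1..9 schedule a timer; the 10th increment reaches
	// rs.retryCount >= l.maxRetries and gives up instead.
	for attempt := 1; attempt < maxRetries; attempt++ {
		fmt.Printf("retry %d fires after %s\n", attempt, delay)
		total += delay
		delay *= 2
		if delay > 30*time.Second {
			delay = 30 * time.Second
		}
	}
	fmt.Println("cumulative wait before giving up:", total) // 2m31s
}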

func newColor(value ...color.Attribute) *color.Color {
	c := color.New(value...)
	c.EnableColor()

@@ -572,3 +690,15 @@ func (l *logCache) push(log agentLog) []agentsdk.Log {
func (l *logCache) delete(token string) {
	delete(l.logs, token)
}

func (l *logCache) get(token string) []agentsdk.Log {
	logs, ok := l.logs[token]
	if !ok {
		return nil
	}
	return logs
}

func isAgentLogEmpty(log agentLog) bool {
	return log.resourceName == "" && log.log.Output == "" && log.log.CreatedAt.IsZero()
}