-
Notifications
You must be signed in to change notification settings - Fork 4.6k
credentials: implement file-based JWT Call Credentials (part 1 for A97) #8431
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 16 commits
6e63daa
3268ea5
b18a1f5
eb391af
d43893a
167b86e
439d28c
b36d4b6
26e0451
da2de8c
51ce34c
f87f1f2
15dd057
54cbbcb
9c5035d
ec915dc
1d95fa2
a797ed9
fd388d1
790a2d9
6713190
3f563eb
a38573b
12fedd5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
/* | ||
* | ||
* Copyright 2025 gRPC authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
* | ||
*/ | ||
|
||
// Package jwt implements JWT token file-based call credentials. | ||
// | ||
// This package provides support for A97 JWT Call Credentials, allowing gRPC | ||
// clients to authenticate using JWT tokens read from files. While originally | ||
// designed for xDS environments, these credentials are general-purpose. | ||
// | ||
// The credentials can be used directly in gRPC clients or configured via xDS. | ||
// | ||
// # Token Requirements | ||
// | ||
// JWT tokens must: | ||
// - Be valid, well-formed JWT tokens with header, payload, and signature | ||
// - Include an "exp" (expiration) claim | ||
// - Be readable from the specified file path | ||
// | ||
// # Considerations | ||
// | ||
// - Tokens are cached until expiration to avoid excessive file I/O | ||
// - Transport security is required (RequireTransportSecurity returns true) | ||
// - Errors in reading tokens or parsing JWTs will result in RPC UNAVAILABLE or | ||
// UNAUTHENTICATED errors | ||
// - These errors are cached and retried with exponential backoff. | ||
// | ||
// This implementation is originally intended for use in service mesh | ||
// environments like Istio where JWT tokens are provisioned and rotated by the | ||
// infrastructure. | ||
// | ||
// # Experimental | ||
// | ||
// Notice: All APIs in this package are experimental and may be removed in a | ||
// later release. | ||
package jwt | ||
easwars marked this conversation as resolved.
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,281 @@ | ||
/* | ||
* | ||
* Copyright 2025 gRPC authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
* | ||
*/ | ||
|
||
// Package jwt implements gRPC credentials using JWT tokens from files. | ||
package jwt | ||
|
||
import ( | ||
"context" | ||
"encoding/base64" | ||
"encoding/json" | ||
"fmt" | ||
"os" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
||
"google.golang.org/grpc/codes" | ||
"google.golang.org/grpc/credentials" | ||
"google.golang.org/grpc/internal/backoff" | ||
"google.golang.org/grpc/status" | ||
) | ||
|
||
// jwtClaims is the subset of the JWT payload that this package decodes. Only
// the "exp" claim is read; every other claim in the payload is ignored.
type jwtClaims struct {
	// Exp holds the value of the "exp" (expiration) claim. It stays zero when
	// the claim is absent from the payload.
	Exp int64 `json:"exp"`
}
|
||
// jwtTokenFileCallCreds provides JWT token-based PerRPCCredentials that reads | ||
// tokens from a file. | ||
// This implementation follows the A97 JWT Call Credentials specification. | ||
type jwtTokenFileCallCreds struct { | ||
tokenFilePath string | ||
backoffStrategy backoff.Strategy // Strategy when error occurs | ||
|
||
// Cached token data | ||
mu sync.RWMutex | ||
cachedToken string | ||
cachedExpiration time.Time // Slightly less than actual expiration time | ||
cachedError error // Error from last failed attempt | ||
retryAttempt int // Current retry attempt number | ||
nextRetryTime time.Time // When next retry is allowed | ||
|
||
// Pre-emptive refresh mutex | ||
refreshMu sync.Mutex | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The use of two locks complicates things significantly with lock ordering and the possibility of deadlocks. Can we avoid it either by using a single lock or by using other primitives like channels to signal that a pre-emptive refresh is ongoing? Your thoughts please. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think here it's not too bad wrt how the two locks are used. Onyl the preemptive goroutines are attempting to get both locks whilst the rest of the methods are only dealing with As an alternative, the The code is essentially: func (c *jwtTokenFileCallCreds) triggerPreemptiveRefresh() {
select {
case c.refreshCh <- struct{}{}:
default:
// Channel is full, refresh is already pending.
}
}
func (c *jwtTokenFileCallCreds) refreshWorker() {
for {
select {
case <-c.refreshCh:
// Re-check if refresh is still needed.
c.mu.RLock()
stillNeeded := c.needsPreemptiveRefreshLocked()
c.mu.RUnlock()
if stillNeeded {
// Force refresh even if the token is still valid.
_, _ = c.refreshTokenSync(true)
}
case <-c.closeCh:
return
}
}
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's a valid point. Even if it is a single goroutine, we cannot leak it, because our tests check for leaked goroutines and therefore unit tests will fail. Thanks for trying though. |
||
} | ||
|
||
// NewTokenFileCallCredentials creates PerRPCCredentials that reads JWT tokens | ||
// from the specified file path. | ||
func NewTokenFileCallCredentials(tokenFilePath string) (credentials.PerRPCCredentials, error) { | ||
if tokenFilePath == "" { | ||
return nil, fmt.Errorf("tokenFilePath cannot be empty") | ||
} | ||
|
||
return &jwtTokenFileCallCreds{ | ||
tokenFilePath: tokenFilePath, | ||
backoffStrategy: backoff.DefaultExponential, | ||
}, nil | ||
} | ||
|
||
// GetRequestMetadata gets the current request metadata, refreshing tokens if | ||
// required. This implementation follows the PerRPCCredentials interface. The | ||
// tokens will get automatically refreshed if they are about to expire or if | ||
// they haven't been loaded successfully yet. | ||
// If it's not possible to extract a token from the file, UNAVAILABLE is | ||
// returned. | ||
// If the token is extracted but invalid, then UNAUTHENTICATED is returned. | ||
// If errors are encountered, a backoff is applied before retrying. | ||
func (c *jwtTokenFileCallCreds) GetRequestMetadata(ctx context.Context, _ ...string) (map[string]string, error) { | ||
ri, _ := credentials.RequestInfoFromContext(ctx) | ||
if err := credentials.CheckSecurityLevel(ri.AuthInfo, credentials.PrivacyAndIntegrity); err != nil { | ||
return nil, fmt.Errorf("unable to transfer JWT token file PerRPCCredentials: %v", err) | ||
} | ||
|
||
// This may be delayed if the token needs to be refreshed from file. | ||
token, err := c.getToken() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return map[string]string{ | ||
"authorization": "Bearer " + token, | ||
}, nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. btw @easwars , should I be concerned with the allocations here? I could cache the string (I don't think it's safe to cache the map itself) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. FYI, I've cached it just in case |
||
} | ||
|
||
// RequireTransportSecurity indicates whether the credentials requires | ||
// transport security. | ||
func (c *jwtTokenFileCallCreds) RequireTransportSecurity() bool { | ||
return true | ||
} | ||
easwars marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// getToken returns a valid JWT token, reading from file if necessary. | ||
// Implements pre-emptive refresh and caches errors with backoff. | ||
func (c *jwtTokenFileCallCreds) getToken() (string, error) { | ||
c.mu.RLock() | ||
|
||
if c.isTokenValidLocked() { | ||
token := c.cachedToken | ||
shouldRefresh := c.needsPreemptiveRefreshLocked() | ||
c.mu.RUnlock() | ||
|
||
if shouldRefresh { | ||
c.triggerPreemptiveRefresh() | ||
} | ||
return token, nil | ||
} | ||
|
||
// If still within backoff period, return cached error to avoid repeated | ||
// file reads. | ||
if c.cachedError != nil && time.Now().Before(c.nextRetryTime) { | ||
err := c.cachedError | ||
c.mu.RUnlock() | ||
return "", err | ||
} | ||
|
||
c.mu.RUnlock() | ||
// Token is expired or missing or the retry backoff period has expired. | ||
// So we should refresh synchronously. | ||
// NOTE: refreshTokenSync itself acquires the write lock. | ||
return c.refreshTokenSync(false) | ||
} | ||
|
||
// isTokenValidLocked checks if the cached token is still valid. | ||
// Caller must hold c.mu.RLock(). | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please consider adding a Yeah, we used to do a lot of what you have done (basically express the constraint in the comment). But of late, we have been trying to the pattern that I've suggested instead and we feel it works better. Thanks. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice, I hadn't seen that pattern. Makes it far less error-prone! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Btw, have you considered having There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, I personally haven't considered it so far, because I tend to avoid RWLocks after being bitten by them once (in a bad way). But, I'm not opposed to it and it does sound more explicit when using RWLocks. |
||
func (c *jwtTokenFileCallCreds) isTokenValidLocked() bool { | ||
if c.cachedToken == "" { | ||
return false | ||
} | ||
return c.cachedExpiration.After(time.Now()) | ||
} | ||
|
||
// needsPreemptiveRefreshLocked checks if a pre-emptive refresh should be | ||
// triggered. | ||
// Returns true if the cached token is valid but expires within 1 minute. | ||
// We only trigger pre-emptive refresh for valid tokens - if the token is | ||
// invalid or expired, the next RPC will handle synchronous refresh instead. | ||
// Caller must hold c.mu.RLock(). | ||
func (c *jwtTokenFileCallCreds) needsPreemptiveRefreshLocked() bool { | ||
return c.isTokenValidLocked() && time.Until(c.cachedExpiration) < time.Minute | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we make a const for the one minute pre-emptive refresh trigger? |
||
} | ||
|
||
// triggerPreemptiveRefresh starts a background refresh if needed. | ||
// Multiple concurrent calls are safe - only one refresh will run at a time. | ||
// The refresh runs in a separate goroutine and does not block the caller. | ||
func (c *jwtTokenFileCallCreds) triggerPreemptiveRefresh() { | ||
go func() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be nice to not have to spawn goroutines when there is already a pre-emptive refresh request in progress, instead of spawning a goroutine and then realizing the same and not doing anything in the goroutine. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep. This can be addressed in the alternative I have proposed in the other comment by having a single worker goroutine (at the risk of that leaking if Close() is not called). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ack |
||
c.refreshMu.Lock() | ||
defer c.refreshMu.Unlock() | ||
|
||
// Re-check if refresh is still needed under mutex. | ||
c.mu.RLock() | ||
stillNeeded := c.needsPreemptiveRefreshLocked() | ||
c.mu.RUnlock() | ||
|
||
if !stillNeeded { | ||
return // Another goroutine already refreshed or token expired. | ||
} | ||
|
||
// Force refresh to read new token even if current one is still valid. | ||
_, _ = c.refreshTokenSync(true) | ||
}() | ||
} | ||
|
||
// refreshTokenSync reads a new token from the file and updates the cache. If | ||
// forceRefresh is true, bypasses the validity check of the currently | ||
// cached token and always reads from file. | ||
// This is used for pre-emptive refresh to ensure new tokens are loaded even | ||
// when the cached token is still valid. If forceRefresh is false, skips | ||
// file read when cached token is still valid, optimizing concurrent synchronous | ||
// refresh calls where one RPC may have already updated the cache while another | ||
// was waiting on the lock. | ||
func (c *jwtTokenFileCallCreds) refreshTokenSync(forceRefresh bool) (string, error) { | ||
c.mu.Lock() | ||
defer c.mu.Unlock() | ||
|
||
// Double-check under write lock but skip if preemptive refresh is | ||
// requested. | ||
if !forceRefresh && c.isTokenValidLocked() { | ||
return c.cachedToken, nil | ||
} | ||
|
||
tokenBytes, err := os.ReadFile(c.tokenFilePath) | ||
if err != nil { | ||
err = status.Errorf(codes.Unavailable, "failed to read token file %q: %v", c.tokenFilePath, err) | ||
c.setErrorWithBackoffLocked(err) | ||
return "", err | ||
} | ||
|
||
token := strings.TrimSpace(string(tokenBytes)) | ||
if token == "" { | ||
err := status.Errorf(codes.Unavailable, "token file %q is empty", c.tokenFilePath) | ||
c.setErrorWithBackoffLocked(err) | ||
return "", err | ||
} | ||
|
||
// Parse JWT to extract expiration. | ||
exp, err := c.extractExpiration(token) | ||
if err != nil { | ||
err = status.Errorf(codes.Unauthenticated, "failed to parse JWT from token file %q: %v", c.tokenFilePath, err) | ||
c.setErrorWithBackoffLocked(err) | ||
return "", err | ||
} | ||
|
||
// Success - clear any cached error and backoff state, update token cache. | ||
c.clearErrorAndBackoffLocked() | ||
c.cachedToken = token | ||
// Per RFC A97: consider token invalid if it expires within the next 30 | ||
// seconds to accommodate for clock skew and server processing time. | ||
c.cachedExpiration = exp.Add(-30 * time.Second) | ||
|
||
return token, nil | ||
} | ||
|
||
// extractExpiration parses the JWT token to extract the expiration time. | ||
func (c *jwtTokenFileCallCreds) extractExpiration(token string) (time.Time, error) { | ||
parts := strings.Split(token, ".") | ||
if len(parts) != 3 { | ||
return time.Time{}, fmt.Errorf("invalid JWT format: expected 3 parts, got %d", len(parts)) | ||
} | ||
|
||
payload := parts[1] | ||
// Add padding if necessary for base64 decoding. | ||
if m := len(payload) % 4; m != 0 { | ||
payload += strings.Repeat("=", 4-m) | ||
} | ||
|
||
payloadBytes, err := base64.URLEncoding.DecodeString(payload) | ||
if err != nil { | ||
return time.Time{}, fmt.Errorf("failed to decode JWT payload: %v", err) | ||
} | ||
|
||
var claims jwtClaims | ||
if err := json.Unmarshal(payloadBytes, &claims); err != nil { | ||
return time.Time{}, fmt.Errorf("failed to unmarshal JWT claims: %v", err) | ||
} | ||
|
||
if claims.Exp == 0 { | ||
return time.Time{}, fmt.Errorf("JWT token has no expiration claim") | ||
} | ||
|
||
expTime := time.Unix(claims.Exp, 0) | ||
|
||
// Check if token is already expired. | ||
if expTime.Before(time.Now()) { | ||
return time.Time{}, fmt.Errorf("JWT token is expired") | ||
} | ||
|
||
return expTime, nil | ||
} | ||
|
||
// setErrorWithBackoffLocked caches an error and calculates the next retry time | ||
// using exponential backoff. | ||
// Caller must hold c.mu write lock. | ||
func (c *jwtTokenFileCallCreds) setErrorWithBackoffLocked(err error) { | ||
c.cachedError = err | ||
c.retryAttempt++ | ||
backoffDelay := c.backoffStrategy.Backoff(c.retryAttempt - 1) | ||
c.nextRetryTime = time.Now().Add(backoffDelay) | ||
} | ||
|
||
// clearErrorAndBackoffLocked clears the cached error and resets backoff state. | ||
// Caller must hold c.mu write lock. | ||
func (c *jwtTokenFileCallCreds) clearErrorAndBackoffLocked() { | ||
c.cachedError = nil | ||
c.retryAttempt = 0 | ||
c.nextRetryTime = time.Time{} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: It looks like these two bullet points can/should be merged?