Skip to content

Commit 6d7b656

Browse files
support aws s3 express zone behavior
1 parent 3dbe5a3 commit 6d7b656

10 files changed

+1001
-71
lines changed

api.go

+29-10
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,10 @@ import (
4040

4141
md5simd "github.com/minio/md5-simd"
4242
"github.com/minio/minio-go/v7/pkg/credentials"
43+
"github.com/minio/minio-go/v7/pkg/kvcache"
4344
"github.com/minio/minio-go/v7/pkg/s3utils"
4445
"github.com/minio/minio-go/v7/pkg/signer"
46+
"github.com/minio/minio-go/v7/pkg/singleflight"
4547
"golang.org/x/net/publicsuffix"
4648
)
4749

@@ -68,9 +70,11 @@ type Client struct {
6870
secure bool
6971

7072
// Needs allocation.
71-
httpClient *http.Client
72-
httpTrace *httptrace.ClientTrace
73-
bucketLocCache *bucketLocationCache
73+
httpClient *http.Client
74+
httpTrace *httptrace.ClientTrace
75+
bucketLocCache *kvcache.Cache[string, string]
76+
bucketSessionCache *kvcache.Cache[string, credentials.Value]
77+
credsGroup singleflight.Group[string, credentials.Value]
7478

7579
// Advanced functionality.
7680
isTraceEnabled bool
@@ -280,8 +284,11 @@ func privateNew(endpoint string, opts *Options) (*Client, error) {
280284
}
281285
clnt.region = opts.Region
282286

283-
// Instantiate bucket location cache.
284-
clnt.bucketLocCache = newBucketLocationCache()
287+
// Initialize bucket region cache.
288+
clnt.bucketLocCache = &kvcache.Cache[string, string]{}
289+
290+
// Initialize bucket session cache (s3 express).
291+
clnt.bucketSessionCache = &kvcache.Cache[string, credentials.Value]{}
285292

286293
// Introduce a new locked random seed.
287294
clnt.random = rand.New(&lockedRandSource{src: rand.NewSource(time.Now().UTC().UnixNano())})
@@ -818,14 +825,22 @@ func (c *Client) newRequest(ctx context.Context, method string, metadata request
818825
ctx = httptrace.WithClientTrace(ctx, c.httpTrace)
819826
}
820827

821-
// Initialize a new HTTP request for the method.
822-
req, err = http.NewRequestWithContext(ctx, method, targetURL.String(), nil)
828+
// make sure to de-dup calls to credential services, this reduces
829+
// the overall load to the endpoint generating credential service.
830+
value, err, _ := c.credsGroup.Do(metadata.bucketName, func() (credentials.Value, error) {
831+
if s3utils.IsS3ExpressBucket(metadata.bucketName) {
832+
return c.CreateSession(ctx, metadata.bucketName, SessionReadWrite)
833+
} else {
834+
// Get credentials from the configured credentials provider.
835+
return c.credsProvider.GetWithContext(c.CredContext())
836+
}
837+
})
823838
if err != nil {
824839
return nil, err
825840
}
826841

827-
// Get credentials from the configured credentials provider.
828-
value, err := c.credsProvider.GetWithContext(c.CredContext())
842+
// Initialize a new HTTP request for the method.
843+
req, err = http.NewRequestWithContext(ctx, method, targetURL.String(), nil)
829844
if err != nil {
830845
return nil, err
831846
}
@@ -837,6 +852,10 @@ func (c *Client) newRequest(ctx context.Context, method string, metadata request
837852
sessionToken = value.SessionToken
838853
)
839854

855+
if s3utils.IsS3ExpressBucket(metadata.bucketName) {
856+
req.Header.Set("x-amz-s3session-token", sessionToken)
857+
}
858+
840859
// Custom signer set then override the behavior.
841860
if c.overrideSignerType != credentials.SignatureDefault {
842861
signerType = c.overrideSignerType
@@ -971,7 +990,7 @@ func (c *Client) makeTargetURL(bucketName, objectName, bucketLocation string, is
971990
host = c.s3AccelerateEndpoint
972991
} else {
973992
// Do not change the host if the endpoint URL is a FIPS S3 endpoint or a S3 PrivateLink interface endpoint
974-
if !s3utils.IsAmazonFIPSEndpoint(*c.endpointURL) && !s3utils.IsAmazonPrivateLinkEndpoint(*c.endpointURL) {
993+
if !s3utils.IsAmazonFIPSEndpoint(*c.endpointURL) && !s3utils.IsAmazonPrivateLinkEndpoint(*c.endpointURL) && !s3utils.IsS3ExpressBucket(bucketName) {
975994
// Fetch new host based on the bucket location.
976995
host = getS3Endpoint(bucketLocation, c.s3DualstackEnabled)
977996
}

bucket-cache.go

-42
Original file line numberDiff line numberDiff line change
@@ -23,54 +23,12 @@ import (
2323
"net/http"
2424
"net/url"
2525
"path"
26-
"sync"
2726

2827
"github.com/minio/minio-go/v7/pkg/credentials"
2928
"github.com/minio/minio-go/v7/pkg/s3utils"
3029
"github.com/minio/minio-go/v7/pkg/signer"
3130
)
3231

33-
// bucketLocationCache - Provides simple mechanism to hold bucket
34-
// locations in memory.
35-
type bucketLocationCache struct {
36-
// mutex is used for handling the concurrent
37-
// read/write requests for cache.
38-
sync.RWMutex
39-
40-
// items holds the cached bucket locations.
41-
items map[string]string
42-
}
43-
44-
// newBucketLocationCache - Provides a new bucket location cache to be
45-
// used internally with the client object.
46-
func newBucketLocationCache() *bucketLocationCache {
47-
return &bucketLocationCache{
48-
items: make(map[string]string),
49-
}
50-
}
51-
52-
// Get - Returns a value of a given key if it exists.
53-
func (r *bucketLocationCache) Get(bucketName string) (location string, ok bool) {
54-
r.RLock()
55-
defer r.RUnlock()
56-
location, ok = r.items[bucketName]
57-
return
58-
}
59-
60-
// Set - Will persist a value into cache.
61-
func (r *bucketLocationCache) Set(bucketName, location string) {
62-
r.Lock()
63-
defer r.Unlock()
64-
r.items[bucketName] = location
65-
}
66-
67-
// Delete - Deletes a bucket name from cache.
68-
func (r *bucketLocationCache) Delete(bucketName string) {
69-
r.Lock()
70-
defer r.Unlock()
71-
delete(r.items, bucketName)
72-
}
73-
7432
// GetBucketLocation - get location for the bucket name from location cache, if not
7533
// fetch freshly by making a new request.
7634
func (c *Client) GetBucketLocation(ctx context.Context, bucketName string) (string, error) {

bucket-cache_test.go

+3-14
Original file line numberDiff line numberDiff line change
@@ -29,24 +29,13 @@ import (
2929
"testing"
3030

3131
"github.com/minio/minio-go/v7/pkg/credentials"
32+
"github.com/minio/minio-go/v7/pkg/kvcache"
3233
"github.com/minio/minio-go/v7/pkg/signer"
3334
)
3435

35-
// Test validates `newBucketLocationCache`.
36-
func TestNewBucketLocationCache(t *testing.T) {
37-
expectedBucketLocationcache := &bucketLocationCache{
38-
items: make(map[string]string),
39-
}
40-
actualBucketLocationCache := newBucketLocationCache()
41-
42-
if !reflect.DeepEqual(actualBucketLocationCache, expectedBucketLocationcache) {
43-
t.Errorf("Unexpected return value")
44-
}
45-
}
46-
47-
// Tests validate bucketLocationCache operations.
36+
// Tests validate kvCache operations.
4837
func TestBucketLocationCacheOps(t *testing.T) {
49-
testBucketLocationCache := newBucketLocationCache()
38+
testBucketLocationCache := &kvcache.Cache[string, string]{}
5039
expectedBucketName := "minio-bucket"
5140
expectedLocation := "us-east-1"
5241
testBucketLocationCache.Set(expectedBucketName, expectedLocation)

create-session.go

+175
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
/*
2+
* MinIO Go Library for Amazon S3 Compatible Cloud Storage
3+
* Copyright 2015-2025 MinIO, Inc.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package minio
19+
20+
import (
21+
"context"
22+
"encoding/xml"
23+
"errors"
24+
"net"
25+
"net/http"
26+
"net/url"
27+
"path"
28+
"time"
29+
30+
"github.com/minio/minio-go/v7/pkg/credentials"
31+
"github.com/minio/minio-go/v7/pkg/s3utils"
32+
"github.com/minio/minio-go/v7/pkg/signer"
33+
)
34+
35+
// SessionMode - session mode type there are only two types
36+
type SessionMode string
37+
38+
// Session constants
39+
const (
40+
SessionReadWrite SessionMode = "ReadWrite"
41+
SessionReadOnly SessionMode = "ReadOnly"
42+
)
43+
44+
type createSessionResult struct {
45+
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CreateSessionResult"`
46+
Credentials struct {
47+
AccessKey string `xml:"AccessKeyId" json:"accessKey,omitempty"`
48+
SecretKey string `xml:"SecretAccessKey" json:"secretKey,omitempty"`
49+
SessionToken string `xml:"SessionToken" json:"sessionToken,omitempty"`
50+
Expiration time.Time `xml:"Expiration" json:"expiration,omitempty"`
51+
} `xml:",omitempty"`
52+
}
53+
54+
// CreateSession - https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateSession.html
55+
// the returning credentials may be cached depending on the expiration of the original
56+
// credential, credentials will get renewed 10 secs earlier than when its gonna expire
57+
// allowing for some leeway in the renewal process.
58+
func (c *Client) CreateSession(ctx context.Context, bucketName string, sessionMode SessionMode) (cred credentials.Value, err error) {
59+
if err := s3utils.CheckValidBucketNameS3Express(bucketName); err != nil {
60+
return credentials.Value{}, err
61+
}
62+
63+
v, ok := c.bucketSessionCache.Get(bucketName)
64+
if ok && v.Expiration.After(time.Now().Add(10*time.Second)) {
65+
// Verify if the credentials will not expire
66+
// in another 10 seconds, if not we renew it again.
67+
return v, nil
68+
}
69+
70+
req, err := c.createSessionRequest(ctx, bucketName, sessionMode)
71+
if err != nil {
72+
return credentials.Value{}, err
73+
}
74+
75+
resp, err := c.do(req)
76+
defer closeResponse(resp)
77+
if err != nil {
78+
return credentials.Value{}, err
79+
}
80+
81+
credSession := &createSessionResult{}
82+
dec := xml.NewDecoder(resp.Body)
83+
if err = dec.Decode(credSession); err != nil {
84+
return credentials.Value{}, err
85+
}
86+
87+
defer c.bucketSessionCache.Set(bucketName, cred)
88+
89+
return credentials.Value{
90+
AccessKeyID: credSession.Credentials.AccessKey,
91+
SecretAccessKey: credSession.Credentials.SecretKey,
92+
SessionToken: credSession.Credentials.SessionToken,
93+
Expiration: credSession.Credentials.Expiration,
94+
}, nil
95+
}
96+
97+
// createSessionRequest - Wrapper creates a new CreateSession request.
98+
func (c *Client) createSessionRequest(ctx context.Context, bucketName string, sessionMode SessionMode) (*http.Request, error) {
99+
// Set location query.
100+
urlValues := make(url.Values)
101+
urlValues.Set("session", "")
102+
103+
// Set get bucket location always as path style.
104+
targetURL := *c.endpointURL
105+
106+
// as it works in makeTargetURL method from api.go file
107+
if h, p, err := net.SplitHostPort(targetURL.Host); err == nil {
108+
if targetURL.Scheme == "http" && p == "80" || targetURL.Scheme == "https" && p == "443" {
109+
targetURL.Host = h
110+
if ip := net.ParseIP(h); ip != nil && ip.To16() != nil {
111+
targetURL.Host = "[" + h + "]"
112+
}
113+
}
114+
}
115+
116+
isVirtualStyle := c.isVirtualHostStyleRequest(targetURL, bucketName)
117+
118+
var urlStr string
119+
120+
if isVirtualStyle {
121+
urlStr = c.endpointURL.Scheme + "://" + bucketName + "." + targetURL.Host + "/?session"
122+
} else {
123+
targetURL.Path = path.Join(bucketName, "") + "/"
124+
targetURL.RawQuery = urlValues.Encode()
125+
urlStr = targetURL.String()
126+
}
127+
128+
// Get a new HTTP request for the method.
129+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, urlStr, nil)
130+
if err != nil {
131+
return nil, err
132+
}
133+
134+
// Set UserAgent for the request.
135+
c.setUserAgent(req)
136+
137+
// Get credentials from the configured credentials provider.
138+
value, err := c.credsProvider.GetWithContext(c.CredContext())
139+
if err != nil {
140+
return nil, err
141+
}
142+
143+
var (
144+
signerType = value.SignerType
145+
accessKeyID = value.AccessKeyID
146+
secretAccessKey = value.SecretAccessKey
147+
sessionToken = value.SessionToken
148+
)
149+
150+
// Custom signer set then override the behavior.
151+
if c.overrideSignerType != credentials.SignatureDefault {
152+
signerType = c.overrideSignerType
153+
}
154+
155+
// If signerType returned by credentials helper is anonymous,
156+
// then do not sign regardless of signerType override.
157+
if value.SignerType == credentials.SignatureAnonymous {
158+
signerType = credentials.SignatureAnonymous
159+
}
160+
161+
if signerType.IsAnonymous() || signerType.IsV2() {
162+
return req, errors.New("Only signature v4 is supported for CreateSession() API")
163+
}
164+
165+
// Set sha256 sum for signature calculation only with signature version '4'.
166+
contentSha256 := emptySHA256Hex
167+
if c.secure {
168+
contentSha256 = unsignedPayload
169+
}
170+
171+
req.Header.Set("X-Amz-Content-Sha256", contentSha256)
172+
req.Header.Set("x-amz-create-session-mode", string(sessionMode))
173+
req = signer.SignV4(*req, accessKeyID, secretAccessKey, sessionToken, c.region)
174+
return req, nil
175+
}

0 commit comments

Comments
 (0)