Skip to content

Commit 2e9c863

Browse files
authored
Add EXTERNAL SASL configuration (#209)
* Add EXTERNAL SASL configuration Fixes #207 Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
1 parent 4e22a3d commit 2e9c863

File tree

8 files changed

+104
-21
lines changed

8 files changed

+104
-21
lines changed

README.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ Go client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-serv
2121
* [Multi hosts](#multi-hosts)
2222
* [Load Balancer](#load-balancer)
2323
* [TLS](#tls)
24+
* [Sasl Mechanisms](#sasl-mechanisms)
2425
* [Streams](#streams)
2526
* [Statistics](#streams-statistics)
2627
* [Publish messages](#publish-messages)
@@ -175,6 +176,29 @@ env, err := stream.NewEnvironment(
175176
)
176177
```
177178

179+
### Sasl Mechanisms
180+
181+
To configure SASL you need to set the `SaslMechanism` parameter `Environment.SetSaslConfiguration`:
182+
```golang
183+
cfg := new(tls.Config)
184+
cfg.ServerName = "my_server_name"
185+
cfg.RootCAs = x509.NewCertPool()
186+
187+
if ca, err := os.ReadFile("certs/ca_certificate.pem"); err == nil {
188+
cfg.RootCAs.AppendCertsFromPEM(ca)
189+
}
190+
191+
if cert, err := tls.LoadX509KeyPair("certs/client/cert.pem", "certs/client/key.pem"); err == nil {
192+
cfg.Certificates = append(cfg.Certificates, cert)
193+
}
194+
195+
env, err := stream.NewEnvironment(stream.NewEnvironmentOptions().
196+
SetUri("rabbitmq-stream+tls://my_server_name:5551/").
197+
IsTLS(true).
198+
SetSaslConfiguration(stream.SaslConfigurationExternal). // SASL EXTERNAL
199+
SetTLSConfig(cfg))
200+
```
201+
178202
### Streams
179203

180204
To define streams you need to use the the `environment` interfaces `DeclareStream` and `DeleteStream`.

go.sum

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo
1111
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
1212
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
1313
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
14+
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 h1:p104kn46Q8WdvHunIJ9dAyjPVtrBPhSr3KT2yUst43I=
1415
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
1516
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
1617
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
@@ -32,6 +33,7 @@ github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8
3233
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
3334
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
3435
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
36+
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE=
3537
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
3638
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
3739
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
@@ -151,6 +153,7 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn
151153
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
152154
golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E=
153155
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
156+
golang.org/x/tools v0.2.0 h1:G6AHpWxTMGY1KyEYoAQ5WTtIekUUvDNjan3ugu60JvE=
154157
golang.org/x/tools v0.2.0/go.mod h1:y4OqIKeOV/fWJetJ8bXPU1sEVniLMIyDAZWeHdV+NTA=
155158
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
156159
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

pkg/amqp/decode.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ type unmarshaler interface {
4242
//
4343
// If i implements unmarshaler, i.unmarshal() will be called.
4444
//
45-
// Pointers to primitive types will be decoded via the appropriate read[Type] function.
45+
// Pointers to primitive types will be decoded via the appropriate read[type] function.
4646
//
47-
// If i is a pointer to a pointer (**Type), it will be dereferenced and a new instance
47+
// If it is a pointer to a pointer (**Type), it will be dereferenced and a new instance
4848
// of (*Type) is allocated via reflection.
4949
//
5050
// Common map types (map[string]string, map[Symbol]interface{}, and

pkg/stream/client.go

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,22 @@ import (
1414
"time"
1515
)
1616

17+
// SaslConfiguration see
18+
//
19+
// SaslConfigurationPlain = "PLAIN"
20+
// SaslConfigurationExternal = "EXTERNAL"
21+
//
22+
23+
type SaslConfiguration struct {
24+
Mechanism string
25+
}
26+
27+
func newSaslConfigurationDefault() *SaslConfiguration {
28+
return &SaslConfiguration{
29+
Mechanism: SaslConfigurationPlain,
30+
}
31+
}
32+
1733
type TuneState struct {
1834
requestedMaxFrameSize int
1935
requestedHeartbeat int
@@ -42,13 +58,14 @@ type Client struct {
4258
coordinator *Coordinator
4359
broker *Broker
4460
tcpParameters *TCPParameters
61+
saslConfiguration *SaslConfiguration
4562

4663
mutex *sync.Mutex
4764
metadataListener metadataListener
4865
lastHeartBeat HeartBeat
4966
}
5067

51-
func newClient(connectionName string, broker *Broker, tcpParameters *TCPParameters) *Client {
68+
func newClient(connectionName string, broker *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration) *Client {
5269
var clientBroker = broker
5370
if broker == nil {
5471
clientBroker = newBrokerDefault()
@@ -57,10 +74,15 @@ func newClient(connectionName string, broker *Broker, tcpParameters *TCPParamete
5774
tcpParameters = newTCPParameterDefault()
5875
}
5976

77+
if saslConfiguration == nil {
78+
saslConfiguration = newSaslConfigurationDefault()
79+
}
80+
6081
c := &Client{
6182
coordinator: NewCoordinator(),
6283
broker: clientBroker,
6384
tcpParameters: tcpParameters,
85+
saslConfiguration: saslConfiguration,
6486
destructor: &sync.Once{},
6587
mutex: &sync.Mutex{},
6688
clientProperties: ClientProperties{items: make(map[string]string)},
@@ -224,10 +246,15 @@ func (c *Client) authenticate(user string, password string) error {
224246
}
225247
saslMechanism := ""
226248
for i := 0; i < len(saslMechanisms); i++ {
227-
if saslMechanisms[i] == "PLAIN" {
228-
saslMechanism = "PLAIN"
249+
if saslMechanisms[i] == c.saslConfiguration.Mechanism {
250+
saslMechanism = c.saslConfiguration.Mechanism
251+
break
229252
}
230253
}
254+
if saslMechanism == "" {
255+
return fmt.Errorf("no matching mechanism found")
256+
}
257+
231258
response := unicodeNull + user + unicodeNull + password
232259
saslResponse := []byte(response)
233260
return c.sendSaslAuthenticate(saslMechanism, saslResponse)

pkg/stream/constants.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,3 +194,8 @@ func lookUpCommand(command uint16) string {
194194

195195
return constLookup[command]
196196
}
197+
198+
const (
199+
SaslConfigurationPlain = "PLAIN"
200+
SaslConfigurationExternal = "EXTERNAL"
201+
)

pkg/stream/coordinator_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ var _ = Describe("Coordinator", func() {
1414
client *Client
1515
)
1616
BeforeEach(func() {
17-
client = newClient("test-client", nil, nil)
17+
client = newClient("test-client", nil, nil, nil)
1818

1919
})
2020
AfterEach(func() {
@@ -131,7 +131,7 @@ var _ = Describe("Coordinator", func() {
131131
client *Client
132132
)
133133
BeforeEach(func() {
134-
client = newClient("test-client", nil, nil)
134+
client = newClient("test-client", nil, nil, nil)
135135

136136
})
137137
AfterEach(func() {
@@ -185,7 +185,7 @@ var _ = Describe("Coordinator", func() {
185185
client *Client
186186
)
187187
BeforeEach(func() {
188-
client = newClient("test-client", nil, nil)
188+
client = newClient("test-client", nil, nil, nil)
189189

190190
})
191191
AfterEach(func() {

pkg/stream/environment.go

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func NewEnvironment(options *EnvironmentOptions) (*Environment, error) {
2525
if options == nil {
2626
options = NewEnvironmentOptions()
2727
}
28-
client := newClient("go-stream-locator", nil, options.TCPParameters)
28+
client := newClient("go-stream-locator", nil, options.TCPParameters, options.SaslConfiguration)
2929
defer func(client *Client) {
3030
err := client.Close()
3131
if err != nil {
@@ -38,6 +38,12 @@ func NewEnvironment(options *EnvironmentOptions) (*Environment, error) {
3838
return nil, fmt.Errorf(" MaxConsumersPerClient and MaxProducersPerClient must be between 1 and 254")
3939
}
4040

41+
if options.SaslConfiguration != nil {
42+
if options.SaslConfiguration.Mechanism != SaslConfigurationPlain && options.SaslConfiguration.Mechanism != SaslConfigurationExternal {
43+
return nil, fmt.Errorf("SaslConfiguration mechanism must be PLAIN or EXTERNAL")
44+
}
45+
}
46+
4147
if len(options.ConnectionParameters) == 0 {
4248
options.ConnectionParameters = []*Broker{newBrokerDefault()}
4349
}
@@ -76,7 +82,7 @@ func NewEnvironment(options *EnvironmentOptions) (*Environment, error) {
7682
}
7783
func (env *Environment) newReconnectClient() (*Client, error) {
7884
broker := env.options.ConnectionParameters[0]
79-
client := newClient("go-stream-locator", broker, env.options.TCPParameters)
85+
client := newClient("go-stream-locator", broker, env.options.TCPParameters, env.options.SaslConfiguration)
8086

8187
err := client.connect()
8288
tentatives := 1
@@ -86,7 +92,8 @@ func (env *Environment) newReconnectClient() (*Client, error) {
8692
time.Sleep(time.Duration(tentatives) * time.Second)
8793
rand.Seed(time.Now().UnixNano())
8894
n := rand.Intn(len(env.options.ConnectionParameters))
89-
client = newClient("stream-locator", env.options.ConnectionParameters[n], env.options.TCPParameters)
95+
client = newClient("stream-locator", env.options.ConnectionParameters[n], env.options.TCPParameters,
96+
env.options.SaslConfiguration)
9097
tentatives = tentatives + 1
9198
err = client.connect()
9299

@@ -261,6 +268,7 @@ func (env *Environment) IsClosed() bool {
261268
type EnvironmentOptions struct {
262269
ConnectionParameters []*Broker
263270
TCPParameters *TCPParameters
271+
SaslConfiguration *SaslConfiguration
264272
MaxProducersPerClient int
265273
MaxConsumersPerClient int
266274
AddressResolver *AddressResolver
@@ -272,6 +280,7 @@ func NewEnvironmentOptions() *EnvironmentOptions {
272280
MaxConsumersPerClient: 1,
273281
ConnectionParameters: []*Broker{},
274282
TCPParameters: newTCPParameterDefault(),
283+
SaslConfiguration: newSaslConfigurationDefault(),
275284
}
276285
}
277286

@@ -328,6 +337,14 @@ func (envOptions *EnvironmentOptions) SetVHost(vhost string) *EnvironmentOptions
328337
return envOptions
329338
}
330339

340+
func (envOptions *EnvironmentOptions) SetSaslConfiguration(value string) *EnvironmentOptions {
341+
if envOptions.SaslConfiguration == nil {
342+
envOptions.SaslConfiguration = newSaslConfigurationDefault()
343+
}
344+
envOptions.SaslConfiguration.Mechanism = value
345+
return envOptions
346+
}
347+
331348
func (envOptions *EnvironmentOptions) SetTLSConfig(config *tls.Config) *EnvironmentOptions {
332349
if envOptions.TCPParameters == nil {
333350
envOptions.TCPParameters = newTCPParameterDefault()
@@ -506,7 +523,7 @@ func (c *Client) maybeCleanConsumers(streamName string) {
506523
}
507524
}
508525

509-
func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCPParameters, streamName string,
526+
func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, streamName string,
510527
options *ProducerOptions) (*Producer, error) {
511528
cc.mutex.Lock()
512529
defer cc.mutex.Unlock()
@@ -521,7 +538,7 @@ func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCP
521538
}
522539

523540
if clientResult == nil {
524-
clientResult = cc.newClientForProducer(leader, tcpParameters)
541+
clientResult = cc.newClientForProducer(leader, tcpParameters, saslConfiguration)
525542
}
526543

527544
err := clientResult.connect()
@@ -538,7 +555,7 @@ func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCP
538555
if err != nil {
539556
return nil, err
540557
}
541-
clientResult = cc.newClientForProducer(leader, tcpParameters)
558+
clientResult = cc.newClientForProducer(leader, tcpParameters, saslConfiguration)
542559
err = clientResult.connect()
543560
if err != nil {
544561
return nil, err
@@ -555,8 +572,8 @@ func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCP
555572
return producer, nil
556573
}
557574

558-
func (cc *environmentCoordinator) newClientForProducer(leader *Broker, tcpParameters *TCPParameters) *Client {
559-
clientResult := newClient("go-stream-producer", leader, tcpParameters)
575+
func (cc *environmentCoordinator) newClientForProducer(leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration) *Client {
576+
clientResult := newClient("go-stream-producer", leader, tcpParameters, saslConfiguration)
560577
chMeta := make(chan metaDataUpdateEvent, 1)
561578
clientResult.metadataListener = chMeta
562579
go func(ch <-chan metaDataUpdateEvent, cl *Client) {
@@ -575,7 +592,7 @@ func (cc *environmentCoordinator) newClientForProducer(leader *Broker, tcpParame
575592
return clientResult
576593
}
577594

578-
func (cc *environmentCoordinator) newConsumer(leader *Broker, tcpParameters *TCPParameters,
595+
func (cc *environmentCoordinator) newConsumer(leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration,
579596
streamName string, messagesHandler MessagesHandler,
580597
options *ConsumerOptions) (*Consumer, error) {
581598
cc.mutex.Lock()
@@ -591,7 +608,7 @@ func (cc *environmentCoordinator) newConsumer(leader *Broker, tcpParameters *TCP
591608
}
592609

593610
if clientResult == nil {
594-
clientResult = newClient("go-stream-consumer", leader, tcpParameters)
611+
clientResult = newClient("go-stream-consumer", leader, tcpParameters, saslConfiguration)
595612
chMeta := make(chan metaDataUpdateEvent)
596613
clientResult.metadataListener = chMeta
597614
go func(ch <-chan metaDataUpdateEvent, cl *Client) {
@@ -675,8 +692,8 @@ func (ps *producersEnvironment) newProducer(clientLocator *Client, streamName st
675692
}
676693
leader.cloneFrom(clientLocator.broker, resolver)
677694

678-
producer, err := ps.producersCoordinator[coordinatorKey].newProducer(leader, clientLocator.tcpParameters, streamName,
679-
options)
695+
producer, err := ps.producersCoordinator[coordinatorKey].newProducer(leader, clientLocator.tcpParameters,
696+
clientLocator.saslConfiguration, streamName, options)
680697
if err != nil {
681698
return nil, err
682699
}
@@ -740,7 +757,7 @@ func (ps *consumersEnvironment) NewSubscriber(clientLocator *Client, streamName
740757
}
741758
consumerBroker.cloneFrom(clientLocator.broker, resolver)
742759
consumer, err := ps.consumersCoordinator[coordinatorKey].
743-
newConsumer(consumerBroker, clientLocator.tcpParameters, streamName, messagesHandler, consumerOptions)
760+
newConsumer(consumerBroker, clientLocator.tcpParameters, clientLocator.saslConfiguration, streamName, messagesHandler, consumerOptions)
744761
if err != nil {
745762
return nil, err
746763
}

pkg/stream/environment_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,13 @@ var _ = Describe("Environment test", func() {
343343
Expect(err).To(HaveOccurred())
344344
})
345345

346+
It("Fail to set SetSaslConfiguration", func() {
347+
_, err := NewEnvironment(NewEnvironmentOptions().
348+
SetSaslConfiguration("IS_NOT_VALID").
349+
IsTLS(true))
350+
Expect(err).To(HaveOccurred())
351+
})
352+
346353
It("Set TCP parameters", func() {
347354
// weak test, atm I don't have other ways to test these values
348355
// just validate that the connection still works

0 commit comments

Comments
 (0)