@@ -3,15 +3,107 @@ package messaging
3
3
import (
4
4
"fmt"
5
5
"strings"
6
-
7
- stan "github.com/nats-io/go-nats-streaming"
8
- "github.com/nats-io/go-nats-streaming/pb"
6
+ "time"
9
7
10
8
"github.com/netlify/netlify-commons/nconf"
9
+
10
+ "github.com/pkg/errors"
11
+
12
+ "github.com/nats-io/stan.go"
13
+ "github.com/nats-io/stan.go/pb"
14
+
15
+ "github.com/netlify/netlify-commons/discovery"
16
+ "github.com/sirupsen/logrus"
11
17
)
12
18
13
- func StartPoint (config * nconf.NatsConfig ) (stan.SubscriptionOption , error ) {
14
- switch v := strings .ToLower (config .StartPos ); v {
19
+ const (
20
+ NatsAuthMethodUser = "user"
21
+ NatsAuthMethodToken = "token"
22
+ NatsAuthMethodTLS = "tls"
23
+ )
24
+
25
+ type NatsAuth struct {
26
+ Method string `mapstructure:"method"`
27
+ User string `mapstructure:"user"`
28
+ Password string `mapstructure:"password"`
29
+ Token string `mapstructure:"token"`
30
+ }
31
+
32
+ type NatsConfig struct {
33
+ TLS * nconf.TLSConfig `mapstructure:"tls_conf"`
34
+ DiscoveryName string `mapstructure:"discovery_name" split_words:"true"`
35
+ Servers []string `mapstructure:"servers"`
36
+ Auth NatsAuth `mapstructure:"auth"`
37
+
38
+ // for streaming
39
+ ClusterID string `mapstructure:"cluster_id" split_words:"true"`
40
+ ClientID string `mapstructure:"client_id" split_words:"true"`
41
+ StartPos string `mapstructure:"start_pos" split_words:"true"`
42
+ }
43
+
44
+ type NatsClientConfig struct {
45
+ NatsConfig
46
+ Subject string `mapstructure:"command_subject"`
47
+ Group string `mapstructure:"command_group"`
48
+
49
+ // StartAt will configure where the client should resume the stream:
50
+ // - `all`: all the messages available
51
+ // - `last`: from where the client left off
52
+ // - `new`: all new messages for the client
53
+ // - `first`: from the first message available (default)
54
+ // - other: if it isn't one of the above fields, it will try and parse the param as a go duration (e.g. 30s, 1h)
55
+ StartAt string `mapstructure:"start_at"`
56
+ }
57
+
58
+ func (c * NatsConfig ) LoadServerNames () error {
59
+ if c .DiscoveryName == "" {
60
+ return nil
61
+ }
62
+
63
+ natsURLs := []string {}
64
+ endpoints , err := discovery .DiscoverEndpoints (c .DiscoveryName )
65
+ if err != nil {
66
+ return err
67
+ }
68
+
69
+ for _ , endpoint := range endpoints {
70
+ natsURLs = append (natsURLs , fmt .Sprintf ("nats://%s:%d" , endpoint .Target , endpoint .Port ))
71
+ }
72
+
73
+ c .Servers = natsURLs
74
+ return nil
75
+ }
76
+
77
+ // ServerString will build the proper string for nats connect
78
+ func (c * NatsConfig ) ServerString () string {
79
+ return strings .Join (c .Servers , "," )
80
+ }
81
+
82
+ func (c * NatsConfig ) Fields () logrus.Fields {
83
+ f := logrus.Fields {
84
+ "servers" : strings .Join (c .Servers , "," ),
85
+ }
86
+
87
+ if c .Auth .Method != "" {
88
+ f ["auth_method" ] = c .Auth .Method
89
+ }
90
+
91
+ if c .TLS != nil {
92
+ f ["ca_files" ] = strings .Join (c .TLS .CAFiles , "," )
93
+ f ["key_file" ] = c .TLS .KeyFile
94
+ f ["cert_file" ] = c .TLS .CertFile
95
+ }
96
+
97
+ if c .ClusterID != "" {
98
+ f ["client_id" ] = c .ClientID
99
+ f ["cluster_id" ] = c .ClusterID
100
+ }
101
+
102
+ return f
103
+ }
104
+
105
+ func (c * NatsConfig ) StartPoint () (stan.SubscriptionOption , error ) {
106
+ switch v := strings .ToLower (c .StartPos ); v {
15
107
case "all" :
16
108
return stan .DeliverAllAvailable (), nil
17
109
case "last" :
@@ -20,6 +112,11 @@ func StartPoint(config *nconf.NatsConfig) (stan.SubscriptionOption, error) {
20
112
return stan .StartAt (pb .StartPosition_NewOnly ), nil
21
113
case "" , "first" :
22
114
return stan .StartAt (pb .StartPosition_First ), nil
115
+ default :
116
+ dur , err := time .ParseDuration (v )
117
+ if err != nil {
118
+ return nil , errors .Wrap (err , "Failed to parse field as a duration" )
119
+ }
120
+ return stan .StartAtTimeDelta (dur ), nil
23
121
}
24
- return nil , fmt .Errorf ("Unknown start position '%s', possible values are all, last, new, first and ''" , config .StartPos )
25
122
}
0 commit comments