@@ -19,10 +19,12 @@ import (
19
19
"fmt"
20
20
"os"
21
21
"reflect"
22
+ "strings"
22
23
"sync"
23
24
"testing"
24
25
"time"
25
26
27
+ "github.com/prometheus/client_golang/prometheus/testutil"
26
28
"github.com/stretchr/testify/require"
27
29
"go.uber.org/zap"
28
30
"go.uber.org/zap/zaptest"
@@ -80,6 +82,184 @@ func TestNewWatcherCancel(t *testing.T) {
80
82
}
81
83
}
82
84
85
+ func TestNewWatcherCountGauge (t * testing.T ) {
86
+ expectWatchGauge := func (watchers int ) {
87
+ expected := fmt .Sprintf (`# HELP etcd_debugging_mvcc_watcher_total Total number of watchers.
88
+ # TYPE etcd_debugging_mvcc_watcher_total gauge
89
+ etcd_debugging_mvcc_watcher_total %d
90
+ ` , watchers )
91
+ err := testutil .CollectAndCompare (watcherGauge , strings .NewReader (expected ), "etcd_debugging_mvcc_watcher_total" )
92
+ if err != nil {
93
+ t .Error (err )
94
+ }
95
+ }
96
+
97
+ t .Run ("regular watch" , func (t * testing.T ) {
98
+ b , tmpPath := betesting .NewDefaultTmpBackend (t )
99
+ s := newWatchableStore (zap .NewExample (), b , & lease.FakeLessor {}, StoreConfig {})
100
+ defer func () {
101
+ s .store .Close ()
102
+ os .Remove (tmpPath )
103
+ }()
104
+
105
+ // watcherGauge is a package variable and its value may change depending on
106
+ // the execution of other tests
107
+ initialGaugeState := int (testutil .ToFloat64 (watcherGauge ))
108
+
109
+ testKey := []byte ("foo" )
110
+ testValue := []byte ("bar" )
111
+ s .Put (testKey , testValue , lease .NoLease )
112
+
113
+ // we expect the gauge state to still be in its initial state
114
+ expectWatchGauge (initialGaugeState )
115
+
116
+ w := s .NewWatchStream ()
117
+ defer w .Close ()
118
+
119
+ wt , _ := w .Watch (0 , testKey , nil , 0 )
120
+
121
+ // after creating watch, the gauge state should have increased
122
+ expectWatchGauge (initialGaugeState + 1 )
123
+
124
+ if err := w .Cancel (wt ); err != nil {
125
+ t .Error (err )
126
+ }
127
+
128
+ // after cancelling watch, the gauge state should have decreased
129
+ expectWatchGauge (initialGaugeState )
130
+
131
+ w .Cancel (wt )
132
+
133
+ // cancelling the watch twice shouldn't decrement the counter twice
134
+ expectWatchGauge (initialGaugeState )
135
+ })
136
+
137
+ t .Run ("compacted watch" , func (t * testing.T ) {
138
+ b , tmpPath := betesting .NewDefaultTmpBackend (t )
139
+ s := newWatchableStore (zap .NewExample (), b , & lease.FakeLessor {}, StoreConfig {})
140
+ defer func () {
141
+ s .store .Close ()
142
+ os .Remove (tmpPath )
143
+ }()
144
+
145
+ // watcherGauge is a package variable and its value may change depending on
146
+ // the execution of other tests
147
+ initialGaugeState := int (testutil .ToFloat64 (watcherGauge ))
148
+
149
+ testKey := []byte ("foo" )
150
+ testValue := []byte ("bar" )
151
+
152
+ s .Put (testKey , testValue , lease .NoLease )
153
+ rev := s .Put (testKey , testValue , lease .NoLease )
154
+
155
+ // compact up to the revision of the key we just put
156
+ _ , err := s .Compact (traceutil .TODO (), rev )
157
+ if err != nil {
158
+ t .Error (err )
159
+ }
160
+
161
+ // we expect the gauge state to still be in its initial state
162
+ expectWatchGauge (initialGaugeState )
163
+
164
+ w := s .NewWatchStream ()
165
+ defer w .Close ()
166
+
167
+ wt , _ := w .Watch (0 , testKey , nil , rev - 1 )
168
+
169
+ // wait for the watcher to be marked as compacted
170
+ select {
171
+ case resp := <- w .Chan ():
172
+ if resp .CompactRevision == 0 {
173
+ t .Errorf ("resp.Compacted = %v, want %v" , resp .CompactRevision , rev )
174
+ }
175
+ case <- time .After (time .Second ):
176
+ t .Fatalf ("failed to receive response (timeout)" )
177
+ }
178
+
179
+ // after creating watch, the gauge state should have increased
180
+ expectWatchGauge (initialGaugeState + 1 )
181
+
182
+ if err := w .Cancel (wt ); err != nil {
183
+ t .Error (err )
184
+ }
185
+
186
+ // after cancelling watch, the gauge state should have decreased
187
+ expectWatchGauge (initialGaugeState )
188
+
189
+ w .Cancel (wt )
190
+
191
+ // cancelling the watch twice shouldn't decrement the counter twice
192
+ expectWatchGauge (initialGaugeState )
193
+ })
194
+
195
+ t .Run ("compacted watch, close/cancel race" , func (t * testing.T ) {
196
+ b , tmpPath := betesting .NewDefaultTmpBackend (t )
197
+ s := newWatchableStore (zap .NewExample (), b , & lease.FakeLessor {}, StoreConfig {})
198
+ defer func () {
199
+ s .store .Close ()
200
+ os .Remove (tmpPath )
201
+ }()
202
+
203
+ // watcherGauge is a package variable and its value may change depending on
204
+ // the execution of other tests
205
+ initialGaugeState := int (testutil .ToFloat64 (watcherGauge ))
206
+
207
+ testKey := []byte ("foo" )
208
+ testValue := []byte ("bar" )
209
+
210
+ s .Put (testKey , testValue , lease .NoLease )
211
+ rev := s .Put (testKey , testValue , lease .NoLease )
212
+
213
+ // compact up to the revision of the key we just put
214
+ _ , err := s .Compact (traceutil .TODO (), rev )
215
+ if err != nil {
216
+ t .Error (err )
217
+ }
218
+
219
+ // we expect the gauge state to still be in its initial state
220
+ expectWatchGauge (initialGaugeState )
221
+
222
+ w := s .NewWatchStream ()
223
+
224
+ wt , _ := w .Watch (0 , testKey , nil , rev - 1 )
225
+
226
+ // wait for the watcher to be marked as compacted
227
+ select {
228
+ case resp := <- w .Chan ():
229
+ if resp .CompactRevision == 0 {
230
+ t .Errorf ("resp.Compacted = %v, want %v" , resp .CompactRevision , rev )
231
+ }
232
+ case <- time .After (time .Second ):
233
+ t .Fatalf ("failed to receive response (timeout)" )
234
+ }
235
+
236
+ // after creating watch, the gauge state should have increased
237
+ expectWatchGauge (initialGaugeState + 1 )
238
+
239
+ // now race cancelling and closing the watcher and watch stream.
240
+ // in rare scenarios the watcher cancel function can be invoked
241
+ // multiple times, leading to a potentially negative gauge state,
242
+ // see: https://github.com/etcd-io/etcd/issues/19577
243
+ wg := sync.WaitGroup {}
244
+ wg .Add (2 )
245
+
246
+ go func () {
247
+ w .Cancel (wt )
248
+ wg .Done ()
249
+ }()
250
+
251
+ go func () {
252
+ w .Close ()
253
+ wg .Done ()
254
+ }()
255
+
256
+ wg .Wait ()
257
+
258
+ // the gauge should be decremented to its original state
259
+ expectWatchGauge (initialGaugeState )
260
+ })
261
+ }
262
+
83
263
// TestCancelUnsynced tests if running CancelFunc removes watchers from unsynced.
84
264
func TestCancelUnsynced (t * testing.T ) {
85
265
b , tmpPath := betesting .NewDefaultTmpBackend (t )
0 commit comments