39
39
import org .bson .BsonNull ;
40
40
import org .bson .BsonString ;
41
41
import org .bson .Document ;
42
+ import org .junit .Ignore ;
42
43
import org .junit .Test ;
43
44
44
45
import java .util .ArrayList ;
46
+ import java .util .HashSet ;
45
47
import java .util .List ;
48
+ import java .util .Optional ;
49
+ import java .util .Set ;
46
50
import java .util .concurrent .BlockingQueue ;
47
51
import java .util .concurrent .CountDownLatch ;
48
- import java .util .concurrent .SynchronousQueue ;
52
+ import java .util .concurrent .LinkedBlockingQueue ;
49
53
import java .util .concurrent .TimeUnit ;
50
54
51
55
import static com .mongodb .ClusterFixture .configureFailPoint ;
57
61
import static com .mongodb .client .Fixture .getMongoClientSettingsBuilder ;
58
62
import static java .lang .String .format ;
59
63
import static java .util .Arrays .asList ;
64
+ import static java .util .Collections .singleton ;
60
65
import static java .util .Collections .synchronizedList ;
61
66
import static java .util .concurrent .TimeUnit .MILLISECONDS ;
62
67
import static java .util .concurrent .TimeUnit .NANOSECONDS ;
@@ -168,11 +173,12 @@ public void serverDescriptionChanged(final ServerDescriptionChangedEvent event)
168
173
* <a href="https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring-tests.rst#connection-pool-management">Connection Pool Management</a>.
169
174
*/
170
175
@ Test
176
+ @ Ignore
171
177
@ SuppressWarnings ("try" )
172
178
public void testConnectionPoolManagement () throws InterruptedException {
173
179
assumeTrue (serverVersionAtLeast (4 , 3 ));
174
180
assumeFalse (isServerlessTest ());
175
- BlockingQueue <Object > events = new SynchronousQueue <>(true );
181
+ BlockingQueue <Object > events = new LinkedBlockingQueue <>();
176
182
ServerMonitorListener serverMonitorListener = new ServerMonitorListener () {
177
183
@ Override
178
184
public void serverHeartbeatSucceeded (final ServerHeartbeatSucceededEvent event ) {
@@ -209,7 +215,7 @@ public void connectionPoolCleared(final ConnectionPoolClearedEvent event) {
209
215
/* Note that ServerHeartbeatSucceededEvent type is sometimes allowed but never required.
210
216
* This is because DefaultServerMonitor does not send such events in situations when a server check happens as part
211
217
* of a connection handshake. */
212
- assertPoll (events , ServerHeartbeatSucceededEvent .class , ConnectionPoolReadyEvent .class );
218
+ assertPoll (events , ServerHeartbeatSucceededEvent .class , singleton ( ConnectionPoolReadyEvent .class ) );
213
219
configureFailPoint (new BsonDocument ()
214
220
.append ("configureFailPoint" , new BsonString ("failCommand" ))
215
221
.append ("mode" , new BsonDocument ()
@@ -218,9 +224,9 @@ public void connectionPoolCleared(final ConnectionPoolClearedEvent event) {
218
224
.append ("failCommands" , new BsonArray (asList (new BsonString ("isMaster" ), new BsonString ("hello" ))))
219
225
.append ("errorCode" , new BsonInt32 (1234 ))
220
226
.append ("appName" , new BsonString (appName ))));
221
- assertPoll (events , ServerHeartbeatSucceededEvent .class , ServerHeartbeatFailedEvent . class );
222
- assertPoll ( events , null , ConnectionPoolClearedEvent .class );
223
- assertPoll (events , ServerHeartbeatSucceededEvent .class , ConnectionPoolReadyEvent .class );
227
+ assertPoll (events , ServerHeartbeatSucceededEvent .class ,
228
+ new HashSet <>( asList ( ServerHeartbeatFailedEvent . class , ConnectionPoolClearedEvent .class )) );
229
+ assertPoll (events , null , new HashSet <>( asList ( ServerHeartbeatSucceededEvent .class , ConnectionPoolReadyEvent .class )) );
224
230
} finally {
225
231
disableFailPoint ("failCommand" );
226
232
}
@@ -268,13 +274,14 @@ public void monitorsSleepAtLeastMinHeartbeatFreqencyMSBetweenChecks() {
268
274
}
269
275
}
270
276
271
- private static void assertPoll (final BlockingQueue <?> queue , @ Nullable final Class <?> allowed , final Class <?> required )
277
+ private static void assertPoll (final BlockingQueue <?> queue , @ Nullable final Class <?> allowed , final Set < Class <?> > required )
272
278
throws InterruptedException {
273
279
assertPoll (queue , allowed , required , Timeout .startNow (TEST_WAIT_TIMEOUT_MILLIS , MILLISECONDS ));
274
280
}
275
281
276
- private static void assertPoll (final BlockingQueue <?> queue , @ Nullable final Class <?> allowed , final Class <?> required ,
282
+ private static void assertPoll (final BlockingQueue <?> queue , @ Nullable final Class <?> allowed , final Set < Class <?> > required ,
277
283
final Timeout timeout ) throws InterruptedException {
284
+ Set <Class <?>> encountered = new HashSet <>();
278
285
while (true ) {
279
286
Object element ;
280
287
if (timeout .isImmediate ()) {
@@ -286,22 +293,31 @@ private static void assertPoll(final BlockingQueue<?> queue, @Nullable final Cla
286
293
}
287
294
if (element != null ) {
288
295
if (LOGGER .isInfoEnabled ()) {
289
- LOGGER .info ("Polled " + element . toString () );
296
+ LOGGER .info ("Polled " + element );
290
297
}
291
298
Class <?> elementClass = element .getClass ();
292
- if (required .isAssignableFrom (elementClass )) {
299
+ if (findAssignable (elementClass , required )
300
+ .map (found -> {
301
+ encountered .add (found );
302
+ return encountered .equals (required );
303
+ }).orElseGet (() -> {
304
+ assertTrue (String .format ("allowed %s, required %s, actual %s" , allowed , required , elementClass ),
305
+ allowed != null && allowed .isAssignableFrom (elementClass ));
306
+ return false ;
307
+ })) {
293
308
return ;
294
- } else {
295
- assertTrue (String .format ("allowed %s, required %s, actual %s" , allowed , required , elementClass ),
296
- allowed != null && allowed .isAssignableFrom (elementClass ));
297
309
}
298
310
}
299
311
if (timeout .expired ()) {
300
- fail (" required " + required );
312
+ fail (String . format ( "encountered %s, required %s" , encountered , required ) );
301
313
}
302
314
}
303
315
}
304
316
317
+ private static Optional <Class <?>> findAssignable (final Class <?> from , final Set <Class <?>> toAnyOf ) {
318
+ return toAnyOf .stream ().filter (to -> to .isAssignableFrom (from )).findAny ();
319
+ }
320
+
305
321
private static <E > void put (final BlockingQueue <E > q , final E e ) {
306
322
try {
307
323
q .put (e );
0 commit comments