20
20
import java .io .InterruptedIOException ;
21
21
import java .net .SocketTimeoutException ;
22
22
import java .util .ArrayList ;
23
- import java .util .Arrays ;
24
23
import java .util .List ;
25
24
import java .util .concurrent .Executors ;
26
25
import java .util .concurrent .ScheduledExecutorService ;
31
30
import static com .mongodb .ClusterConnectionMode .Single ;
32
31
import static com .mongodb .ClusterType .ReplicaSet ;
33
32
import static com .mongodb .ClusterType .Sharded ;
33
+ import static com .mongodb .ClusterType .Unknown ;
34
34
import static com .mongodb .MongoAuthority .Type .Set ;
35
35
import static java .util .concurrent .TimeUnit .MILLISECONDS ;
36
36
import static org .bson .util .Assertions .isTrue ;
@@ -52,14 +52,19 @@ public class DBTCPConnector implements DBConnector {
52
52
53
53
private final MyPort _myPort = new MyPort ();
54
54
55
- private ServerSelector prefixedServerSelector ;
55
+ private final ClusterConnectionMode connectionMode ;
56
+
57
+ private ClusterType type = ClusterType .Unknown ;
58
+ private MongosHAServerSelector mongosHAServerSelector ;
56
59
57
60
/**
58
61
* @param mongo the Mongo instance
59
62
* @throws MongoException
60
63
*/
61
64
public DBTCPConnector ( Mongo mongo ) {
62
65
_mongo = mongo ;
66
+ connectionMode = _mongo .getAuthority ().getType () == Set || _mongo .getMongoOptions ().getRequiredReplicaSetName () != null ?
67
+ Multiple : Single ;
63
68
}
64
69
65
70
public void start () {
@@ -76,8 +81,7 @@ public void start() {
76
81
Clusters .create (clusterId ,
77
82
ClusterSettings .builder ()
78
83
.hosts (_mongo .getAuthority ().getServerAddresses ())
79
- .mode (_mongo .getAuthority ().getType () == Set || options .getRequiredReplicaSetName () != null ?
80
- Multiple : Single )
84
+ .mode (connectionMode )
81
85
.requiredReplicaSetName (_mongo .getMongoOptions ().getRequiredReplicaSetName ())
82
86
.build (),
83
87
ServerSettings .builder ()
@@ -326,7 +330,7 @@ private Response innerCall(final DB db, final DBCollection coll, final OutMessag
326
330
public ServerAddress getAddress () {
327
331
isTrue ("open" , !_closed );
328
332
ClusterDescription clusterDescription = getClusterDescription ();
329
- if (clusterDescription . getConnectionMode () == Single ) {
333
+ if (connectionMode == Single ) {
330
334
return clusterDescription .getAny ().get (0 ).getAddress ();
331
335
}
332
336
if (clusterDescription .getPrimaries ().isEmpty ()) {
@@ -362,16 +366,13 @@ public List<ServerAddress> getServerAddressList() {
362
366
363
367
public ReplicaSetStatus getReplicaSetStatus () {
364
368
isTrue ("open" , !_closed );
365
- ClusterDescription description = getClusterDescription ();
366
- return description .getType () == ReplicaSet &&
367
- description .getConnectionMode () == Multiple
368
- ? new ReplicaSetStatus (description ) : null ;
369
+ return getType () == ReplicaSet && connectionMode == Multiple ? new ReplicaSetStatus (getClusterDescription ()) : null ;
369
370
}
370
371
371
372
// This call can block if it's not yet known.
372
373
boolean isMongosConnection () {
373
374
isTrue ("open" , !_closed );
374
- return getClusterDescription (). getType () == Sharded ;
375
+ return getType () == Sharded ;
375
376
}
376
377
377
378
public String getConnectPoint (){
@@ -393,8 +394,7 @@ private boolean shouldRetryQuery(ReadPreference readPreference, final DBCollecti
393
394
if (readPreference .equals (ReadPreference .primary ())) {
394
395
return false ;
395
396
}
396
- ClusterDescription description = getClusterDescription ();
397
- return description .getConnectionMode () == Multiple && description .getType () == ReplicaSet ;
397
+ return connectionMode == Multiple && getType () == ReplicaSet ;
398
398
}
399
399
400
400
private ClusterDescription getClusterDescription () {
@@ -501,7 +501,7 @@ void requestEnsureConnection(){
501
501
if ( getPinnedRequestPortForThread () != null )
502
502
return ;
503
503
504
- setPinnedRequestPortForThread (getConnection (new ReadPreferenceServerSelector (ReadPreference .primary ())));
504
+ setPinnedRequestPortForThread (getConnection (createServerSelector (ReadPreference .primary ())));
505
505
}
506
506
507
507
private DBPort getConnection (final ServerSelector serverSelector ) {
@@ -551,23 +551,37 @@ void setPinnedRequestPortForThread(final DBPort port) {
551
551
private final ThreadLocal <PinnedRequestStatus > pinnedRequestStatusThreadLocal = new ThreadLocal <PinnedRequestStatus >();
552
552
}
553
553
554
- private ServerSelector createServerSelector (final ReadPreference readPref ) {
555
- return new CompositeServerSelector (Arrays .asList (getPrefixedServerSelector (),
556
- new ReadPreferenceServerSelector (readPref ),
557
- new LatencyMinimizingServerSelector (_mongo .getMongoOptions ()
558
- .acceptableLatencyDifferenceMS , MILLISECONDS )));
559
- }
560
-
561
- private synchronized ServerSelector getPrefixedServerSelector () {
562
- if (prefixedServerSelector == null ) {
563
- ClusterDescription clusterDescription = getClusterDescription ();
564
- if (clusterDescription .getConnectionMode () == Multiple && clusterDescription .getType () == Sharded ) {
565
- prefixedServerSelector = new MongosHAServerSelector ();
554
+ private ServerSelector createServerSelector (final ReadPreference readPreference ) {
555
+ if (connectionMode == Multiple ) {
556
+ List <ServerSelector > serverSelectorList = new ArrayList <ServerSelector >();
557
+ if (getType () == Sharded ) {
558
+ serverSelectorList .add (getMongosHAServerSelector ());
559
+ } else if (getType () == ReplicaSet ) {
560
+ serverSelectorList .add (new ReadPreferenceServerSelector (readPreference ));
566
561
} else {
567
- prefixedServerSelector = new NoOpServerSelector ( );
562
+ serverSelectorList . add ( new AnyServerSelector () );
568
563
}
564
+ serverSelectorList .add (new LatencyMinimizingServerSelector (_mongo .getMongoOptions ().acceptableLatencyDifferenceMS ,
565
+ MILLISECONDS ));
566
+ return new CompositeServerSelector (serverSelectorList );
567
+ } else {
568
+ return new AnyServerSelector ();
569
+ }
570
+ }
571
+
572
+ private synchronized ClusterType getType () {
573
+ if (type == Unknown ) {
574
+ type = getClusterDescription ().getType ();
575
+ }
576
+ return type ;
577
+ }
578
+
579
+ // There needs to be just one instance of this because it's stateful between requests
580
+ private synchronized MongosHAServerSelector getMongosHAServerSelector () {
581
+ if (mongosHAServerSelector == null ) {
582
+ mongosHAServerSelector = new MongosHAServerSelector ();
569
583
}
570
- return prefixedServerSelector ;
584
+ return mongosHAServerSelector ;
571
585
}
572
586
573
587
static class PinnedRequestStatus {
0 commit comments