Skip to content

Commit 1fb7ed3

Browse files
authored
Enable the background thread to hand over a newly opened connection to a thread that is blocked by the max connecting limit (#805)
Backport JAVA-4316, JAVA-4346
1 parent 09d7d5a commit 1fb7ed3

File tree

4 files changed

+146
-41
lines changed

4 files changed

+146
-41
lines changed

driver-core/src/main/com/mongodb/internal/connection/ConcurrentPool.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -194,21 +194,18 @@ public void prune() {
194194

195195
/**
196196
* Try to populate this pool with items so that {@link #getCount()} is not smaller than {@code minSize}.
197-
* The {@code postCreate} action throwing a exception causes this method to stop and re-throw that exception.
197+
* The {@code initAndRelease} action throwing an exception causes this method to stop and re-throw that exception.
198198
*
199-
* @param initialize An action applied to non-{@code null} new items.
200-
* If an exception is thrown by the action, the action must treat the provided item as if obtained via
201-
* a {@link #get(long, TimeUnit) get…} method, {@linkplain #release(Object, boolean) releasing} it
202-
* if an exception is thrown; otherwise the action must not release the item.
199+
* @param initAndRelease An action applied to non-{@code null} new items.
200+
* If an exception is thrown by the action, the action must {@linkplain #release(Object, boolean) prune} the item.
201+
* Otherwise, the action must {@linkplain #release(Object) release} the item.
203202
*/
204-
public void ensureMinSize(final int minSize, final Consumer<T> initialize) {
203+
public void ensureMinSize(final int minSize, final Consumer<T> initAndRelease) {
205204
while (getCount() < minSize) {
206205
if (!acquirePermit(0, TimeUnit.MILLISECONDS)) {
207206
break;
208207
}
209-
T newItem = createNewAndReleasePermitIfFailure();
210-
initialize.accept(newItem);
211-
release(newItem);
208+
initAndRelease.accept(createNewAndReleasePermitIfFailure());
212209
}
213210
}
214211

driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java

Lines changed: 44 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373

7474
import static com.mongodb.assertions.Assertions.assertFalse;
7575
import static com.mongodb.assertions.Assertions.assertNotNull;
76+
import static com.mongodb.assertions.Assertions.assertNull;
7677
import static com.mongodb.assertions.Assertions.assertTrue;
7778
import static com.mongodb.assertions.Assertions.fail;
7879
import static com.mongodb.assertions.Assertions.isTrue;
@@ -198,7 +199,7 @@ public void getAsync(final SingleResultCallback<InternalConnection> callback) {
198199
LOGGER.trace(format("Pooled connection %s to server %s is not yet open",
199200
getId(connection), serverId));
200201
}
201-
openConcurrencyLimiter.openAsyncOrGetAvailable(connection, timeout, eventSendingCallback);
202+
openConcurrencyLimiter.openAsyncWithConcurrencyLimit(connection, timeout, eventSendingCallback);
202203
}
203204
}
204205
}));
@@ -358,7 +359,7 @@ public synchronized void run() {
358359
LOGGER.debug(format("Ensuring minimum pooled connections to %s", serverId.getAddress()));
359360
}
360361
pool.ensureMinSize(settings.getMinSize(), newConnection ->
361-
openConcurrencyLimiter.openImmediately(new PooledConnection(newConnection)));
362+
openConcurrencyLimiter.openImmediatelyAndTryHandOverOrRelease(new PooledConnection(newConnection)));
362363
}
363364
} catch (MongoInterruptedException | MongoTimeoutException e) {
364365
//complete the maintenance task
@@ -802,47 +803,56 @@ private final class OpenConcurrencyLimiter {
802803
}
803804

804805
PooledConnection openOrGetAvailable(final PooledConnection connection, final Timeout timeout) throws MongoTimeoutException {
805-
return openOrGetAvailable(connection, true, timeout);
806+
PooledConnection result = openWithConcurrencyLimit(connection, OpenWithConcurrencyLimitMode.TRY_GET_AVAILABLE, timeout);
807+
return assertNotNull(result);
806808
}
807809

808-
void openImmediately(final PooledConnection connection) throws MongoTimeoutException {
809-
PooledConnection result = openOrGetAvailable(connection, false, Timeout.immediate());
810-
assertTrue(result == connection);
810+
void openImmediatelyAndTryHandOverOrRelease(final PooledConnection connection) throws MongoTimeoutException {
811+
assertNull(openWithConcurrencyLimit(connection, OpenWithConcurrencyLimitMode.TRY_HAND_OVER_OR_RELEASE, Timeout.immediate()));
811812
}
812813

813814
/**
814815
* This method can be thought of as operating in two phases.
815816
* In the first phase it tries to synchronously acquire a permit to open the {@code connection}
816-
* or get a different {@linkplain PooledConnection#opened() opened} connection if {@code tryGetAvailable} is {@code true} and
817-
* one becomes available while waiting for a permit.
817+
* or get a different {@linkplain PooledConnection#opened() opened} connection if {@code mode} is
818+
* {@link OpenWithConcurrencyLimitMode#TRY_GET_AVAILABLE} and one becomes available while waiting for a permit.
818819
* The first phase has one of the following outcomes:
819820
* <ol>
820821
* <li>A {@link MongoTimeoutException} or a different {@link Exception} is thrown,
821822
* and the specified {@code connection} is {@linkplain PooledConnection#closeSilently() silently closed}.</li>
822823
* <li>An opened connection different from the specified one is returned,
823-
* and the specified {@code connection} is {@linkplain PooledConnection#closeSilently() silently closed}.</li>
824+
* and the specified {@code connection} is {@linkplain PooledConnection#closeSilently() silently closed}.
825+
* This outcome is possible only if {@code mode} is {@link OpenWithConcurrencyLimitMode#TRY_GET_AVAILABLE}.</li>
824826
* <li>A permit is acquired, {@link #connectionCreated(ConnectionPoolListener, ConnectionId)} is reported
825827
* and an attempt to open the specified {@code connection} is made. This is the second phase in which
826828
* the {@code connection} is {@linkplain PooledConnection#open() opened synchronously}.
827829
* The attempt to open the {@code connection} has one of the following outcomes
828-
* combined with releasing the acquired permit:</li>
830+
* combined with releasing the acquired permit:
829831
* <ol>
830832
* <li>An {@link Exception} is thrown
831833
* and the {@code connection} is {@linkplain PooledConnection#closeAndHandleOpenFailure() closed}.</li>
832-
* <li>The specified {@code connection}, which is now opened, is returned.</li>
834+
* <li>Else if the specified {@code connection} is opened successfully and
835+
* {@code mode} is {@link OpenWithConcurrencyLimitMode#TRY_HAND_OVER_OR_RELEASE},
836+
* then {@link #tryHandOverOrRelease(UsageTrackingInternalConnection)} is called and {@code null} is returned.</li>
837+
* <li>Else the specified {@code connection}, which is now opened, is returned.</li>
833838
* </ol>
839+
* </li>
834840
* </ol>
835841
*
836842
* @param timeout Applies only to the first phase.
837843
* @return An {@linkplain PooledConnection#opened() opened} connection which is
838-
* either the specified {@code connection} or a different one.
844+
* either the specified {@code connection},
845+
* or potentially a different one if {@code mode} is {@link OpenWithConcurrencyLimitMode#TRY_GET_AVAILABLE},
846+
* or {@code null} if {@code mode} is {@link OpenWithConcurrencyLimitMode#TRY_HAND_OVER_OR_RELEASE}.
839847
* @throws MongoTimeoutException If the first phase timed out.
840848
*/
841-
private PooledConnection openOrGetAvailable(
842-
final PooledConnection connection, final boolean tryGetAvailable, final Timeout timeout) throws MongoTimeoutException {
849+
@Nullable
850+
private PooledConnection openWithConcurrencyLimit(final PooledConnection connection, final OpenWithConcurrencyLimitMode mode,
851+
final Timeout timeout) throws MongoTimeoutException {
843852
PooledConnection availableConnection;
844853
try {//phase one
845-
availableConnection = acquirePermitOrGetAvailableOpenedConnection(tryGetAvailable, timeout);
854+
availableConnection = acquirePermitOrGetAvailableOpenedConnection(
855+
mode == OpenWithConcurrencyLimitMode.TRY_GET_AVAILABLE, timeout);
846856
} catch (RuntimeException e) {
847857
connection.closeSilently();
848858
throw e;
@@ -853,25 +863,32 @@ private PooledConnection openOrGetAvailable(
853863
} else {//acquired a permit, phase two
854864
try {
855865
connection.open();
866+
if (mode == OpenWithConcurrencyLimitMode.TRY_HAND_OVER_OR_RELEASE) {
867+
tryHandOverOrRelease(connection.wrapped);
868+
return null;
869+
} else {
870+
return connection;
871+
}
856872
} finally {
857873
releasePermit();
858874
}
859-
return connection;
860875
}
861876
}
862877

863878
/**
864-
* This method is similar to {@link #openOrGetAvailable(PooledConnection, boolean, Timeout)} with the following differences:
879+
* This method is similar to {@link #openWithConcurrencyLimit(PooledConnection, OpenWithConcurrencyLimitMode, Timeout)}
880+
* with the following differences:
865881
* <ul>
866-
* <li>It does not have the {@code tryGetAvailable} parameter and acts as if this parameter were {@code true}.</li>
882+
* <li>It does not have the {@code mode} parameter and acts as if this parameter were
883+
* {@link OpenWithConcurrencyLimitMode#TRY_GET_AVAILABLE}.</li>
867884
* <li>While the first phase is still synchronous, the {@code connection} is
868885
* {@linkplain PooledConnection#openAsync(SingleResultCallback) opened asynchronously} in the second phase.</li>
869886
* <li>Instead of returning a result or throwing an exception via Java {@code return}/{@code throw} statements,
870887
* it calls {@code callback.}{@link SingleResultCallback#onResult(Object, Throwable) onResult(result, failure)}
871888
* and passes either a {@link PooledConnection} or an {@link Exception}.</li>
872889
* </ul>
873890
*/
874-
void openAsyncOrGetAvailable(
891+
void openAsyncWithConcurrencyLimit(
875892
final PooledConnection connection, final Timeout timeout, final SingleResultCallback<InternalConnection> callback) {
876893
PooledConnection availableConnection;
877894
try {//phase one
@@ -1044,6 +1061,14 @@ private long awaitNanos(final Condition condition, final long timeoutNanos) thro
10441061
}
10451062
}
10461063

1064+
/**
1065+
* @see OpenConcurrencyLimiter#openWithConcurrencyLimit(PooledConnection, OpenWithConcurrencyLimitMode, Timeout)
1066+
*/
1067+
private enum OpenWithConcurrencyLimitMode {
1068+
TRY_GET_AVAILABLE,
1069+
TRY_HAND_OVER_OR_RELEASE
1070+
}
1071+
10471072
@NotThreadSafe
10481073
private static final class MutableReference<T> {
10491074
@Nullable
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
{
2+
"version": 1,
3+
"style": "integration",
4+
"description": "threads blocked by maxConnecting check out minPoolSize connections",
5+
"runOn": [
6+
{
7+
"minServerVersion": "4.4.0"
8+
}
9+
],
10+
"failPoint": {
11+
"configureFailPoint": "failCommand",
12+
"mode": "alwaysOn",
13+
"data": {
14+
"failCommands": [
15+
"isMaster",
16+
"hello"
17+
],
18+
"closeConnection": false,
19+
"blockConnection": true,
20+
"blockTimeMS": 500
21+
}
22+
},
23+
"poolOptions": {
24+
"minPoolSize": 2,
25+
"maxPoolSize": 3,
26+
"waitQueueTimeoutMS": 5000
27+
},
28+
"operations": [
29+
{
30+
"name": "start",
31+
"target": "thread1"
32+
},
33+
{
34+
"name": "start",
35+
"target": "thread2"
36+
},
37+
{
38+
"name": "wait",
39+
"ms": 200
40+
},
41+
{
42+
"name": "checkOut",
43+
"thread": "thread1"
44+
},
45+
{
46+
"name": "waitForEvent",
47+
"event": "ConnectionCreated",
48+
"count": 2
49+
},
50+
{
51+
"name": "checkOut",
52+
"thread": "thread2"
53+
},
54+
{
55+
"name": "waitForEvent",
56+
"event": "ConnectionCheckedOut",
57+
"count": 2
58+
}
59+
],
60+
"events": [
61+
{
62+
"type": "ConnectionCreated",
63+
"address": 42
64+
},
65+
{
66+
"type": "ConnectionCreated",
67+
"address": 42
68+
},
69+
{
70+
"type": "ConnectionCheckedOut",
71+
"address": 42
72+
},
73+
{
74+
"type": "ConnectionCheckedOut",
75+
"address": 42
76+
}
77+
],
78+
"ignore": [
79+
"ConnectionClosed",
80+
"ConnectionReady",
81+
"ConnectionPoolCreated",
82+
"ConnectionCheckOutStarted"
83+
]
84+
}

driver-core/src/test/unit/com/mongodb/internal/connection/ConcurrentPoolTest.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -141,37 +141,40 @@ public void testCloseItemOnReleaseAfterPoolClosed() {
141141
@Test
142142
public void testEnsureMinSize() {
143143
pool = new ConcurrentPool<TestCloseable>(3, new TestItemFactory());
144-
Consumer<TestCloseable> noInit = x -> {};
145-
pool.ensureMinSize(0, noInit);
144+
Consumer<TestCloseable> initAndRelease = connection -> pool.release(connection);
145+
pool.ensureMinSize(0, initAndRelease);
146146
assertEquals(0, pool.getAvailableCount());
147147

148-
pool.ensureMinSize(1, noInit);
148+
pool.ensureMinSize(1, initAndRelease);
149149
assertEquals(1, pool.getAvailableCount());
150150

151-
pool.ensureMinSize(1, noInit);
151+
pool.ensureMinSize(1, initAndRelease);
152152
assertEquals(1, pool.getAvailableCount());
153153

154154
pool.get();
155-
pool.ensureMinSize(1, noInit);
155+
pool.ensureMinSize(1, initAndRelease);
156156
assertEquals(0, pool.getAvailableCount());
157157

158-
pool.ensureMinSize(4, noInit);
158+
pool.ensureMinSize(4, initAndRelease);
159159
assertEquals(3, pool.getAvailableCount());
160160
}
161161

162162
@Test
163163
public void whenEnsuringMinSizeShouldNotInitializePooledItemIfNotRequested() {
164164
pool = new ConcurrentPool<TestCloseable>(3, new TestItemFactory());
165165

166-
pool.ensureMinSize(1, noInit -> {});
166+
pool.ensureMinSize(1, pool::release);
167167
assertFalse(pool.get().isInitialized());
168168
}
169169

170170
@Test
171171
public void whenEnsuringMinSizeShouldInitializePooledItemIfRequested() {
172172
pool = new ConcurrentPool<TestCloseable>(3, new TestItemFactory());
173173

174-
pool.ensureMinSize(1, TestCloseable.INIT_ACTION);
174+
pool.ensureMinSize(1, connection -> {
175+
connection.initialized = true;
176+
pool.release(connection);
177+
});
175178
assertTrue(pool.get().isInitialized());
176179
}
177180

@@ -180,7 +183,7 @@ public void testThatEnsuringMinSizeReleasesPermitIfCreateFails() {
180183
pool = new ConcurrentPool<TestCloseable>(1, new TestItemFactory(true));
181184

182185
try {
183-
pool.ensureMinSize(1, TestCloseable.INIT_ACTION);
186+
pool.ensureMinSize(1, ignore -> fail());
184187
fail();
185188
} catch (MongoException e) {
186189
// expected
@@ -252,10 +255,6 @@ public ConcurrentPool.Prune shouldPrune(final TestCloseable testCloseable) {
252255
}
253256

254257
static class TestCloseable implements Closeable {
255-
private static final Consumer<TestCloseable> INIT_ACTION = connection -> {
256-
connection.initialized = true;
257-
};
258-
259258
private boolean closed;
260259
private ConcurrentPool.Prune shouldPrune;
261260
private boolean initialized;

0 commit comments

Comments
 (0)