Skip to content

Commit bfd25dd

Browse files
stIncMalerozza
authored andcommitted
Support calling QueryBatchCursor.close concurrently with other QueryBatchCursor methods (#765)
JAVA-4183
1 parent 357ce8b commit bfd25dd

22 files changed

+732
-201
lines changed

.evergreen/run-load-balancer-tests.sh

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,22 @@ echo $first
7474
--tests UnifiedTransactionsTest \
7575
--tests InitialDnsSeedlistDiscoveryTest
7676
second=$?
77+
echo $second
78+
79+
./gradlew -PjdkHome=/opt/java/${JDK} \
80+
-Dorg.mongodb.test.uri=${SINGLE_MONGOS_LB_URI} \
81+
-Dorg.mongodb.test.transaction.uri=${MULTI_MONGOS_LB_URI} \
82+
${GRADLE_EXTRA_VARS} --stacktrace --info --continue driver-core:test \
83+
--tests QueryBatchCursorFunctionalSpecification
84+
third=$?
85+
echo $third
7786

7887
if [ $first -ne 0 ]; then
7988
exit $first
8089
elif [ $second -ne 0 ]; then
8190
exit $second
91+
elif [ $third -ne 0 ]; then
92+
exit $third
8293
else
8394
exit 0
8495
fi

driver-core/src/main/com/mongodb/assertions/Assertions.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,11 +179,23 @@ public static boolean assertFalse(final boolean value) throws AssertionError {
179179

180180
/**
181181
* @throws AssertionError Always
182+
* @return Never completes normally. The return type is {@link AssertionError} to allow writing {@code throw fail()}.
183+
* This may be helpful in non-{@code void} methods.
182184
*/
183-
public static void fail() throws AssertionError {
185+
public static AssertionError fail() throws AssertionError {
184186
throw new AssertionError();
185187
}
186188

189+
/**
190+
* @param msg The failure message.
191+
* @throws AssertionError Always
192+
* @return Never completes normally. The return type is {@link AssertionError} to allow writing {@code throw fail("failure message")}.
193+
* This may be helpful in non-{@code void} methods.
194+
*/
195+
public static AssertionError fail(final String msg) throws AssertionError {
196+
throw new AssertionError(assertNotNull(msg));
197+
}
198+
187199
private Assertions() {
188200
}
189201
}

driver-core/src/main/com/mongodb/internal/async/AsyncBatchCursor.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,8 @@ public interface AsyncBatchCursor<T> extends Closeable {
6666
* To help making such code simpler, this method is required to be idempotent.
6767
* <p>
6868
* Another quirk is that this method is allowed to release resources "eventually",
69-
* i.e., not before (in the happens before order) returning.
69+
* i.e., not before (in the happens-before order) returning.
7070
* Nevertheless, {@link #isClosed()} called after (in the happens-before order) {@link #close()} must return {@code true}.
71-
*
72-
* @see #close()
7371
*/
7472
@Override
7573
void close();

driver-core/src/main/com/mongodb/internal/binding/ConnectionSource.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.mongodb.connection.ServerDescription;
2121
import com.mongodb.internal.connection.Connection;
2222
import com.mongodb.internal.session.SessionContext;
23+
import com.mongodb.lang.Nullable;
2324

2425
/**
2526
* A source of connections to a single MongoDB server.
@@ -42,8 +43,10 @@ public interface ConnectionSource extends ReferenceCounted {
4243
*
4344
* @since 3.6
4445
*/
46+
@Nullable
4547
SessionContext getSessionContext();
4648

49+
@Nullable
4750
ServerApi getServerApi();
4851

4952
/**

driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,9 @@ private MongoTimeoutException createTimeoutException(final Timeout timeout) {
333333
}
334334
}
335335

336-
336+
/**
337+
* Is package-access for the purpose of testing and must not be used for any other purpose outside of this class.
338+
*/
337339
ConcurrentPool<UsageTrackingInternalConnection> getPool() {
338340
return pool;
339341
}

driver-core/src/main/com/mongodb/internal/connection/DefaultServer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,9 @@ public void connect() {
209209
serverMonitor.connect();
210210
}
211211

212+
/**
213+
* Is package-access for the purpose of testing and must not be used for any other purpose outside of this class.
214+
*/
212215
ConnectionPool getConnectionPool() {
213216
return connectionPool;
214217
}

driver-core/src/main/com/mongodb/internal/connection/LoadBalancedServer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,13 @@ public void getConnectionAsync(final SingleResultCallback<AsyncConnection> callb
144144
});
145145
}
146146

147+
/**
148+
* Is package-access for the purpose of testing and must not be used for any other purpose outside of this class.
149+
*/
150+
ConnectionPool getConnectionPool() {
151+
return connectionPool;
152+
}
153+
147154
private class LoadBalancedServerProtocolExecutor implements ProtocolExecutor {
148155
@SuppressWarnings("unchecked")
149156
@Override

driver-core/src/main/com/mongodb/internal/operation/BatchCursor.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.mongodb.ServerAddress;
2020
import com.mongodb.ServerCursor;
2121
import com.mongodb.annotations.NotThreadSafe;
22+
import com.mongodb.lang.Nullable;
2223

2324
import java.io.Closeable;
2425
import java.util.Iterator;
@@ -37,6 +38,15 @@
3738
*/
3839
@NotThreadSafe
3940
public interface BatchCursor<T> extends Iterator<List<T>>, Closeable {
41+
/**
42+
* Despite this interface being {@linkplain NotThreadSafe non-thread-safe},
43+
* {@link #close()} is allowed to be called concurrently with any method of the cursor, including itself.
44+
* This is useful to cancel blocked {@link #hasNext()}, {@link #next()}.
45+
* This method is idempotent.
46+
* <p>
47+
* Another quirk is that this method is allowed to release resources "eventually",
48+
* i.e., not before (in the happens-before order) returning.
49+
*/
4050
@Override
4151
void close();
4252

@@ -85,6 +95,7 @@ public interface BatchCursor<T> extends Iterator<List<T>>, Closeable {
8595
*
8696
* @return ServerCursor
8797
*/
98+
@Nullable
8899
ServerCursor getServerCursor();
89100

90101
/**

driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.mongodb.internal.operation;
1818

19+
import java.util.concurrent.atomic.AtomicBoolean;
1920
import java.util.function.Function;
2021
import com.mongodb.MongoChangeStreamException;
2122
import com.mongodb.MongoException;
@@ -44,7 +45,7 @@ final class ChangeStreamBatchCursor<T> implements AggregateResponseBatchCursor<T
4445

4546
private AggregateResponseBatchCursor<RawBsonDocument> wrapped;
4647
private BsonDocument resumeToken;
47-
private volatile boolean closed;
48+
private final AtomicBoolean closed;
4849

4950
ChangeStreamBatchCursor(final ChangeStreamOperation<T> changeStreamOperation,
5051
final AggregateResponseBatchCursor<RawBsonDocument> wrapped,
@@ -56,6 +57,7 @@ final class ChangeStreamBatchCursor<T> implements AggregateResponseBatchCursor<T
5657
this.wrapped = wrapped;
5758
this.resumeToken = resumeToken;
5859
this.maxWireVersion = maxWireVersion;
60+
closed = new AtomicBoolean();
5961
}
6062

6163
AggregateResponseBatchCursor<RawBsonDocument> getWrapped() {
@@ -108,8 +110,7 @@ public List<T> apply(final AggregateResponseBatchCursor<RawBsonDocument> queryBa
108110

109111
@Override
110112
public void close() {
111-
if (!closed) {
112-
closed = true;
113+
if (!closed.getAndSet(true)) {
113114
wrapped.close();
114115
binding.release();
115116
}

0 commit comments

Comments
 (0)