Skip to content

Commit b159927

Browse files
committed
Backport bd6152f5967107d7b32db9bcfa224fc07314f098
1 parent 4addb57 commit b159927

File tree

5 files changed

+182
-38
lines changed

5 files changed

+182
-38
lines changed

src/java.net.http/share/classes/jdk/internal/net/http/Stream.java

Lines changed: 57 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
import java.util.concurrent.Flow.Subscription;
4747
import java.util.concurrent.atomic.AtomicInteger;
4848
import java.util.concurrent.atomic.AtomicReference;
49+
import java.util.concurrent.locks.Lock;
50+
import java.util.concurrent.locks.ReentrantLock;
4951
import java.util.function.BiPredicate;
5052
import java.net.http.HttpClient;
5153
import java.net.http.HttpHeaders;
@@ -157,6 +159,10 @@ class Stream<T> extends ExchangeImpl<T> {
157159

158160
// send lock: prevent sending DataFrames after reset occurred.
159161
private final Object sendLock = new Object();
162+
// inputQ lock: methods that take from the inputQ
163+
// must not run concurrently.
164+
private final Lock inputQLock = new ReentrantLock();
165+
160166
/**
161167
* A reference to this Stream's connection Send Window controller. The
162168
* stream MUST acquire the appropriate amount of Send Window before
@@ -177,6 +183,8 @@ HttpConnection connection() {
177183
private void schedule() {
178184
boolean onCompleteCalled = false;
179185
HttpResponse.BodySubscriber<T> subscriber = responseSubscriber;
186+
// prevents drainInputQueue() from running concurrently
187+
inputQLock.lock();
180188
try {
181189
if (subscriber == null) {
182190
subscriber = responseSubscriber = pendingResponseSubscriber;
@@ -194,7 +202,7 @@ private void schedule() {
194202
handleReset(rf, subscriber);
195203
return;
196204
}
197-
DataFrame df = (DataFrame)frame;
205+
DataFrame df = (DataFrame) frame;
198206
boolean finished = df.getFlag(DataFrame.END_STREAM);
199207

200208
List<ByteBuffer> buffers = df.getData();
@@ -244,6 +252,7 @@ private void schedule() {
244252
} catch (Throwable throwable) {
245253
errorRef.compareAndSet(null, throwable);
246254
} finally {
255+
inputQLock.unlock();
247256
if (sched.isStopped()) drainInputQueue();
248257
}
249258

@@ -262,26 +271,36 @@ private void schedule() {
262271
} catch (Throwable x) {
263272
Log.logError("Subscriber::onError threw exception: {0}", t);
264273
} finally {
274+
// cancelImpl will eventually call drainInputQueue();
265275
cancelImpl(t);
266-
drainInputQueue();
267276
}
268277
}
269278
}
270279

271-
// must only be called from the scheduler schedule() loop.
272-
// ensure that all received data frames are accounted for
280+
// Called from the scheduler schedule() loop,
281+
// or after resetting the stream.
282+
// Ensures that all received data frames are accounted for
273283
// in the connection window flow control if the scheduler
274284
// is stopped before all the data is consumed.
285+
// The inputQLock is used to prevent concurrently taking
286+
// from the queue.
275287
private void drainInputQueue() {
276288
Http2Frame frame;
277-
while ((frame = inputQ.poll()) != null) {
278-
if (frame instanceof DataFrame df) {
279-
// Data frames that have been added to the inputQ
280-
// must be released using releaseUnconsumed() to
281-
// account for the amount of unprocessed bytes
282-
// tracked by the connection.windowUpdater.
283-
connection.releaseUnconsumed(df);
289+
// will wait until schedule() has finished taking
290+
// from the queue, if needed.
291+
inputQLock.lock();
292+
try {
293+
while ((frame = inputQ.poll()) != null) {
294+
if (frame instanceof DataFrame df) {
295+
// Data frames that have been added to the inputQ
296+
// must be released using releaseUnconsumed() to
297+
// account for the amount of unprocessed bytes
298+
// tracked by the connection.windowUpdater.
299+
connection.releaseUnconsumed(df);
300+
}
284301
}
302+
} finally {
303+
inputQLock.unlock();
285304
}
286305
}
287306

@@ -393,12 +412,35 @@ private void receiveDataFrame(DataFrame df) {
393412
return;
394413
}
395414
}
396-
inputQ.add(df);
415+
pushDataFrame(len, df);
397416
} finally {
398417
sched.runOrSchedule();
399418
}
400419
}
401420

421+
// Ensures that no data frame is pushed on the inputQ
422+
// after the stream is closed.
423+
// Changes to the `closed` boolean are guarded by the
424+
// stateLock. Contention should be low as only one
425+
// thread at a time adds to the inputQ, and
426+
// we can only contend when closing the stream.
427+
// Note that this method can run concurrently with
428+
// methods holding the inputQLock: that is OK.
429+
// The inputQLock is there to ensure that methods
430+
// taking from the queue are not running concurrently
431+
// with each others, but concurrently adding at the
432+
// end of the queue while peeking/polling at the head
433+
// is OK.
434+
private void pushDataFrame(int len, DataFrame df) {
435+
boolean closed = false;
436+
synchronized(this) {
437+
if (!(closed = this.closed)) {
438+
inputQ.add(df);
439+
}
440+
}
441+
if (closed && len > 0) connection.releaseUnconsumed(df);
442+
}
443+
402444
/** Handles a RESET frame. RESET is always handled inline in the queue. */
403445
private void receiveResetFrame(ResetFrame frame) {
404446
inputQ.add(frame);
@@ -1475,6 +1517,8 @@ void cancelImpl(final Throwable e, final int resetFrameErrCode) {
14751517
}
14761518
} catch (Throwable ex) {
14771519
Log.logError(ex);
1520+
} finally {
1521+
drainInputQueue();
14781522
}
14791523
}
14801524

@@ -1700,7 +1744,7 @@ String dbgString() {
17001744
@Override
17011745
protected boolean windowSizeExceeded(long received) {
17021746
onProtocolError(new ProtocolException("stream %s flow control window exceeded"
1703-
.formatted(streamid)), ResetFrame.FLOW_CONTROL_ERROR);
1747+
.formatted(streamid)), ResetFrame.FLOW_CONTROL_ERROR);
17041748
return true;
17051749
}
17061750
}

test/jdk/java/net/httpclient/http2/ConnectionFlowControlTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,11 @@ void test(String uri) throws Exception {
175175
var response = responses.get(keys[i]);
176176
String ckey = response.headers().firstValue("X-Connection-Key").get();
177177
if (label == null) label = ckey;
178-
assertEquals(ckey, label, "Unexpected key for " + query);
178+
if (i < max - 1) {
179+
// the connection window might be exceeded at i == max - 2, which
180+
// means that the last request could go on a new connection.
181+
assertEquals(ckey, label, "Unexpected key for " + query);
182+
}
179183
int wait = uri.startsWith("https://") ? 500 : 250;
180184
try (InputStream is = response.body()) {
181185
Thread.sleep(Utils.adjustTimeout(wait));

test/jdk/java/net/httpclient/http2/StreamFlowControlTest.java

Lines changed: 60 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
/*
2525
* @test
26-
* @bug 8342075
26+
* @bug 8342075 8343855
2727
* @library /test/lib /test/jdk/java/net/httpclient/lib
2828
* @build jdk.httpclient.test.lib.http2.Http2TestServer jdk.test.lib.net.SimpleSSLContext
2929
* jdk.httpclient.test.lib.common.TestServerConfigurator
@@ -56,6 +56,7 @@
5656
import javax.net.ssl.SSLContext;
5757
import javax.net.ssl.SSLSession;
5858

59+
import jdk.httpclient.test.lib.common.HttpServerAdapters.HttpHeadOrGetHandler;
5960
import jdk.httpclient.test.lib.common.HttpServerAdapters.HttpTestServer;
6061
import jdk.httpclient.test.lib.http2.BodyOutputStream;
6162
import jdk.httpclient.test.lib.http2.Http2Handler;
@@ -72,6 +73,7 @@
7273
import org.testng.annotations.DataProvider;
7374
import org.testng.annotations.Test;
7475

76+
import static java.util.concurrent.TimeUnit.NANOSECONDS;
7577
import static org.testng.Assert.assertEquals;
7678
import static org.testng.Assert.fail;
7779

@@ -95,6 +97,19 @@ public Object[][] variants() {
9597
};
9698
}
9799

100+
static void sleep(long wait) throws InterruptedException {
101+
if (wait <= 0) return;
102+
long remaining = Utils.adjustTimeout(wait);
103+
long start = System.nanoTime();
104+
while (remaining > 0) {
105+
Thread.sleep(remaining);
106+
long end = System.nanoTime();
107+
remaining = remaining - NANOSECONDS.toMillis(end - start);
108+
}
109+
System.out.printf("Waited %s ms%n",
110+
NANOSECONDS.toMillis(System.nanoTime() - start));
111+
}
112+
98113

99114
@Test(dataProvider = "variants")
100115
void test(String uri,
@@ -121,7 +136,7 @@ void test(String uri,
121136
CompletableFuture<String> sent = new CompletableFuture<>();
122137
responseSent.put(query, sent);
123138
HttpRequest request = HttpRequest.newBuilder(uriWithQuery)
124-
.POST(BodyPublishers.ofString("Hello there!"))
139+
.GET()
125140
.build();
126141
System.out.println("\nSending request:" + uriWithQuery);
127142
final HttpClient cc = client;
@@ -136,9 +151,9 @@ void test(String uri,
136151
// we have to pull to get the exception, but slow enough
137152
// so that DataFrames are buffered up to the point that
138153
// the window is exceeded...
139-
int wait = uri.startsWith("https://") ? 500 : 350;
154+
long wait = uri.startsWith("https://") ? 800 : 350;
140155
try (InputStream is = response.body()) {
141-
Thread.sleep(Utils.adjustTimeout(wait));
156+
sleep(wait);
142157
is.readAllBytes();
143158
}
144159
// we could fail here if we haven't waited long enough
@@ -187,7 +202,7 @@ void testAsync(String uri,
187202
CompletableFuture<String> sent = new CompletableFuture<>();
188203
responseSent.put(query, sent);
189204
HttpRequest request = HttpRequest.newBuilder(uriWithQuery)
190-
.POST(BodyPublishers.ofString("Hello there!"))
205+
.GET()
191206
.build();
192207
System.out.println("\nSending request:" + uriWithQuery);
193208
final HttpClient cc = client;
@@ -201,9 +216,9 @@ void testAsync(String uri,
201216
assertEquals(key, label, "Unexpected key for " + query);
202217
}
203218
sent.join();
204-
int wait = uri.startsWith("https://") ? 600 : 300;
219+
long wait = uri.startsWith("https://") ? 800 : 350;
205220
try (InputStream is = response.body()) {
206-
Thread.sleep(Utils.adjustTimeout(wait));
221+
sleep(wait);
207222
is.readAllBytes();
208223
}
209224
// we could fail here if we haven't waited long enough
@@ -269,7 +284,9 @@ public void setup() throws Exception {
269284
var https2TestServer = new Http2TestServer("localhost", true, sslContext);
270285
https2TestServer.addHandler(new Http2TestHandler(), "/https2/");
271286
this.https2TestServer = HttpTestServer.of(https2TestServer);
287+
this.https2TestServer.addHandler(new HttpHeadOrGetHandler(), "/https2/head/");
272288
https2URI = "https://" + this.https2TestServer.serverAuthority() + "/https2/x";
289+
String h2Head = "https://" + this.https2TestServer.serverAuthority() + "/https2/head/z";
273290

274291
// Override the default exchange supplier with a custom one to enable
275292
// particular test scenarios
@@ -278,6 +295,17 @@ public void setup() throws Exception {
278295

279296
this.http2TestServer.start();
280297
this.https2TestServer.start();
298+
299+
// warmup to eliminate delay due to SSL class loading and initialization.
300+
HttpClient client = HttpClient.newBuilder().executor(Executors.newCachedThreadPool()).sslContext(sslContext).build();
301+
try {
302+
var request = HttpRequest.newBuilder(URI.create(h2Head)).method("HEAD", BodyPublishers.noBody()).build();
303+
var resp = client.send(request, BodyHandlers.discarding());
304+
assertEquals(resp.statusCode(), 200);
305+
} finally {
306+
ExecutorService exec = (ExecutorService)client.executor().get();
307+
exec.shutdownNow();
308+
}
281309
}
282310

283311
@AfterTest
@@ -296,11 +324,19 @@ public void handle(Http2TestExchange t) throws IOException {
296324
OutputStream os = t.getResponseBody()) {
297325

298326
byte[] bytes = is.readAllBytes();
299-
System.out.println("Server " + t.getLocalAddress() + " received:\n"
300-
+ t.getRequestURI() + ": " + new String(bytes, StandardCharsets.UTF_8));
327+
if (bytes.length != 0) {
328+
System.out.println("Server " + t.getLocalAddress() + " received:\n"
329+
+ t.getRequestURI() + ": " + new String(bytes, StandardCharsets.UTF_8));
330+
} else {
331+
System.out.println("No request body for " + t.getRequestMethod());
332+
}
333+
301334
t.getResponseHeaders().setHeader("X-Connection-Key", t.getConnectionKey());
302335

303-
if (bytes.length == 0) bytes = "no request body!".getBytes(StandardCharsets.UTF_8);
336+
if (bytes.length == 0) {
337+
bytes = "no request body!"
338+
.repeat(100).getBytes(StandardCharsets.UTF_8);
339+
}
304340
int window = Integer.getInteger("jdk.httpclient.windowsize", 2 * 16 * 1024);
305341
final int maxChunkSize;
306342
if (t instanceof FCHttp2TestExchange fct) {
@@ -324,13 +360,22 @@ public void handle(Http2TestExchange t) throws IOException {
324360
// ignore and continue...
325361
}
326362
}
327-
((BodyOutputStream) os).writeUncontrolled(resp, 0, resp.length);
363+
try {
364+
((BodyOutputStream) os).writeUncontrolled(resp, 0, resp.length);
365+
} catch (IOException x) {
366+
if (t instanceof FCHttp2TestExchange fct) {
367+
fct.conn.updateConnectionWindow(resp.length);
368+
}
369+
}
370+
}
371+
} finally {
372+
if (t instanceof FCHttp2TestExchange fct) {
373+
fct.responseSent(query);
374+
} else {
375+
fail("Exchange is not %s but %s"
376+
.formatted(FCHttp2TestExchange.class.getName(), t.getClass().getName()));
328377
}
329378
}
330-
if (t instanceof FCHttp2TestExchange fct) {
331-
fct.responseSent(query);
332-
} else fail("Exchange is not %s but %s"
333-
.formatted(FCHttp2TestExchange.class.getName(), t.getClass().getName()));
334379
}
335380
}
336381

0 commit comments

Comments
 (0)