From e14e0c5940c071dd57d759884c67b2b077e45d1d Mon Sep 17 00:00:00 2001 From: Goetz Lindenmaier Date: Tue, 19 Aug 2025 16:18:50 +0200 Subject: [PATCH] backport 06126361db1edb1d4c181a82952c1ac133a839f9 --- .../internal/net/http/WindowUpdateSender.java | 20 +++- .../http2/StreamFlowControlTest.java | 8 +- .../test/lib/common/HttpServerAdapters.java | 8 +- .../test/lib/http2/BodyOutputStream.java | 30 +++-- .../lib/http2/Http2TestServerConnection.java | 111 +++++++++++------- 5 files changed, 111 insertions(+), 66 deletions(-) diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java b/src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java index 3424ae1ccaa..996fa293542 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/WindowUpdateSender.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2025, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -115,18 +115,24 @@ abstract class WindowUpdateSender { * the caller wants to buffer. */ boolean canBufferUnprocessedBytes(int len) { - return !checkWindowSizeExceeded(unprocessed.addAndGet(len)); + long buffered, processed; + // get received before unprocessed in order to avoid counting + // unprocessed bytes that might get unbuffered asynchronously + // twice. + processed = received.get(); + buffered = unprocessed.addAndGet(len); + return !checkWindowSizeExceeded(processed, buffered); } // adds the provided amount to the amount of already - // received and processed bytes and checks whether the + // processed and processed bytes and checks whether the // flow control window is exceeded. If so, take // corrective actions and return true. - private boolean checkWindowSizeExceeded(long len) { + private boolean checkWindowSizeExceeded(long processed, long len) { // because windowSize is bound by Integer.MAX_VALUE // we will never reach the point where received.get() + len // could overflow - long rcv = received.get() + len; + long rcv = processed + len; return rcv > windowSize && windowSizeExceeded(rcv); } @@ -141,6 +147,7 @@ private boolean checkWindowSizeExceeded(long len) { * @param delta the amount of processed bytes to release */ void processed(int delta) { + assert delta >= 0 : delta; long rest = unprocessed.addAndGet(-delta); assert rest >= 0; update(delta); @@ -164,6 +171,7 @@ void processed(int delta) { * @return the amount of remaining unprocessed bytes */ long released(int delta) { + assert delta >= 0 : delta; long rest = unprocessed.addAndGet(-delta); assert rest >= 0; return rest; @@ -192,7 +200,7 @@ void update(int delta) { synchronized (this) { int tosend = (int)Math.min(received.get(), Integer.MAX_VALUE); if (tosend > limit) { - received.getAndAdd(-tosend); + received.addAndGet(-tosend); sendWindowUpdate(tosend); } } diff --git a/test/jdk/java/net/httpclient/http2/StreamFlowControlTest.java b/test/jdk/java/net/httpclient/http2/StreamFlowControlTest.java index e3fd4e1246b..06a50ed6f72 100644 --- a/test/jdk/java/net/httpclient/http2/StreamFlowControlTest.java +++ b/test/jdk/java/net/httpclient/http2/StreamFlowControlTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2024, 2025, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -35,6 +35,7 @@ import java.io.IOException; import java.io.InputStream; +import java.io.InterruptedIOException; import java.io.OutputStream; import java.net.ProtocolException; import java.net.URI; @@ -357,7 +358,9 @@ public void handle(Http2TestExchange t) throws IOException { // to wait for the connection window fct.conn.obtainConnectionWindow(resp.length); } catch (InterruptedException ie) { - // ignore and continue... + var ioe = new InterruptedIOException(ie.toString()); + ioe.initCause(ie); + throw ioe; } } try { @@ -366,6 +369,7 @@ public void handle(Http2TestExchange t) throws IOException { if (t instanceof FCHttp2TestExchange fct) { fct.conn.updateConnectionWindow(resp.length); } + throw x; } } } finally { diff --git a/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/common/HttpServerAdapters.java b/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/common/HttpServerAdapters.java index 3b9de333c93..d095bffcd20 100644 --- a/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/common/HttpServerAdapters.java +++ b/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/common/HttpServerAdapters.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018, 2024, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -239,7 +239,7 @@ public static abstract class HttpTestExchange implements AutoCloseable { public abstract OutputStream getResponseBody(); public abstract HttpTestRequestHeaders getRequestHeaders(); public abstract HttpTestResponseHeaders getResponseHeaders(); - public abstract void sendResponseHeaders(int code, int contentLength) throws IOException; + public abstract void sendResponseHeaders(int code, long contentLength) throws IOException; public abstract URI getRequestURI(); public abstract String getRequestMethod(); public abstract void close(); @@ -292,7 +292,7 @@ public HttpTestResponseHeaders getResponseHeaders() { return HttpTestResponseHeaders.of(exchange.getResponseHeaders()); } @Override - public void sendResponseHeaders(int code, int contentLength) throws IOException { + public void sendResponseHeaders(int code, long contentLength) throws IOException { if (contentLength == 0) contentLength = -1; else if (contentLength < 0) contentLength = 0; exchange.sendResponseHeaders(code, contentLength); @@ -355,7 +355,7 @@ public HttpTestResponseHeaders getResponseHeaders() { return HttpTestResponseHeaders.of(exchange.getResponseHeaders()); } @Override - public void sendResponseHeaders(int code, int contentLength) throws IOException { + public void sendResponseHeaders(int code, long contentLength) throws IOException { if (contentLength == 0) contentLength = -1; else if (contentLength < 0) contentLength = 0; exchange.sendResponseHeaders(code, contentLength); diff --git a/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/BodyOutputStream.java b/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/BodyOutputStream.java index db5778ec3d9..c6eee5afabf 100644 --- a/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/BodyOutputStream.java +++ b/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/BodyOutputStream.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016, 2024, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2016, 2025, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -63,22 +63,28 @@ synchronized void updateWindow(int update) { } void waitForWindow(int demand) throws InterruptedException { - // first wait for the connection window - conn.obtainConnectionWindow(demand); - // now wait for the stream window + // first wait for the stream window waitForStreamWindow(demand); + // now wait for the connection window + conn.obtainConnectionWindow(demand); } - public void waitForStreamWindow(int demand) throws InterruptedException { - synchronized (this) { - while (demand > 0) { - int n = Math.min(demand, window); - demand -= n; - window -= n; - if (demand > 0) { - wait(); + public void waitForStreamWindow(int amount) throws InterruptedException { + int demand = amount; + try { + synchronized (this) { + while (amount > 0) { + int n = Math.min(amount, window); + amount -= n; + window -= n; + if (amount > 0) { + wait(); + } } } + } catch (Throwable t) { + window += (demand - amount); + throw t; } } diff --git a/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestServerConnection.java b/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestServerConnection.java index 7c96e0e036c..059edf0cce7 100644 --- a/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestServerConnection.java +++ b/test/jdk/java/net/httpclient/lib/jdk/httpclient/test/lib/http2/Http2TestServerConnection.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2025, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -171,7 +171,7 @@ void fail(Throwable t) { Properties properties) throws IOException { - System.err.println("TestServer: New connection from " + socket); + System.err.println(server.name + ": New connection from " + socket); if (socket instanceof SSLSocket) { SSLSocket sslSocket = (SSLSocket)socket; @@ -218,10 +218,10 @@ private SettingsFrame getServerSettingProperties() { String prop = properties.getProperty(propPrefix + key); if (prop != null) { try { - System.err.println("TestServer: setting " + key + " property to: " + + System.err.println(server.name + ": setting " + key + " property to: " + prop); int num = Integer.parseInt(numS); - System.err.println("TestServer: num = " + num); + System.err.println(server.name + ": num = " + num); s.setParameter(num, Integer.parseInt(prop)); } catch (NumberFormatException e) {/* ignore errors */} } @@ -269,7 +269,7 @@ private void sendGoAway(final int error) throws IOException { } final GoAwayFrame frame = new GoAwayFrame(maxProcessedStreamId, error); outputQ.put(frame); - System.err.println("Sending GOAWAY frame " + frame + " from server connection " + this); + System.err.println(server.name + ": Sending GOAWAY frame " + frame + " from server connection " + this); } /** @@ -285,7 +285,7 @@ private PingRequest getNextRequest() { */ void handlePing(PingFrame ping) throws IOException { if (ping.streamid() != 0) { - System.err.println("Invalid ping received"); + System.err.println(server.name + ": Invalid ping received"); close(ErrorFrame.PROTOCOL_ERROR); return; } @@ -293,7 +293,7 @@ void handlePing(PingFrame ping) throws IOException { // did we send a Ping? PingRequest request = getNextRequest(); if (request == null) { - System.err.println("Invalid ping ACK received"); + System.err.println(server.name + ": Invalid ping ACK received"); close(ErrorFrame.PROTOCOL_ERROR); return; } else if (!Arrays.equals(request.pingData, ping.getData())) { @@ -356,7 +356,7 @@ void close(int error) { if (stopping) return; stopping = true; - System.err.printf("Server connection to %s stopping. %d streams\n", + System.err.printf(server.name + ": Server connection to %s stopping. %d streams\n", socket.getRemoteSocketAddress().toString(), streams.size()); streams.forEach((i, q) -> { q.orderlyClose(); @@ -376,16 +376,20 @@ private void readPreface() throws IOException { byte[] bytes = new byte[len]; int n = is.readNBytes(bytes, 0, len); if (Arrays.compare(clientPreface, bytes) != 0) { - System.err.printf("Invalid preface: read %d/%d bytes%n", n, len); - throw new IOException("Invalid preface: " + - new String(bytes, 0, len, ISO_8859_1)); + String msg = String.format("Invalid preface: read %s/%s bytes", n, len); + System.err.println(server.name + ": " + msg); + throw new IOException(msg +": \"" + + new String(bytes, 0, n, ISO_8859_1) + .replace("\r", "\\r") + .replace("\n", "\\n") + + "\""); } } Http1InitialRequest doUpgrade(Http1InitialRequest upgrade) throws IOException { String h2c = getHeader(upgrade.headers, "Upgrade"); if (h2c == null || !h2c.equals("h2c")) { - System.err.println("Server:HEADERS: " + upgrade); + System.err.println(server.name + ":HEADERS: " + upgrade); throw new IOException("Bad upgrade 1 " + h2c); } @@ -457,7 +461,7 @@ void run() throws Exception { socket.close(); return; } else { - System.err.println("Server:HEADERS: " + upgrade); + System.err.println(server.name + ":HEADERS: " + upgrade); throw new IOException("Bad upgrade 1 " + h2c); } } @@ -556,7 +560,7 @@ private void handleCommonFrame(Http2Frame f) throws IOException { outputQ.put(frame); return; } else if (f instanceof GoAwayFrame) { - System.err.println("Closing: "+ f.toString()); + System.err.println(server.name + ": Closing connection: "+ f.toString()); close(ErrorFrame.NO_ERROR); } else if (f instanceof PingFrame) { handlePing((PingFrame)f); @@ -649,7 +653,7 @@ void createPrimordialStream(Http1InitialRequest request) throws IOException { // skip processing the request if configured to do so final String connKey = connectionKey(); if (!shouldProcessNewHTTPRequest(connKey)) { - System.err.println("Rejecting primordial stream 1 and sending GOAWAY" + + System.err.println(server.name + ": Rejecting primordial stream 1 and sending GOAWAY" + " on server connection " + connKey + ", for request: " + path); sendGoAway(ErrorFrame.NO_ERROR); return; @@ -726,7 +730,7 @@ void createStream(HeaderFrame frame) throws IOException { final String connKey = connectionKey(); final String path = headers.firstValue(":path").orElse(""); if (!shouldProcessNewHTTPRequest(connKey)) { - System.err.println("Rejecting stream " + streamid + System.err.println(server.name + ": Rejecting stream " + streamid + " and sending GOAWAY on server connection " + connKey + ", for request: " + path); sendGoAway(ErrorFrame.NO_ERROR); @@ -764,17 +768,17 @@ void handleRequest(HttpHeaders headers, //System.out.println("scheme = " + scheme); String authority = headers.firstValue(":authority").orElse(""); //System.out.println("authority = " + authority); - System.err.printf("TestServer: %s %s\n", method, path); + System.err.printf(server.name + ": %s %s\n", method, path); int winsize = clientSettings.getParameter( SettingsFrame.INITIAL_WINDOW_SIZE); //System.err.println ("Stream window size = " + winsize); final InputStream bis; if (endStreamReceived && queue.size() == 0) { - System.err.println("Server: got END_STREAM for stream " + streamid); + System.err.println(server.name + ": got END_STREAM for stream " + streamid); bis = NullInputStream.INSTANCE; } else { - System.err.println("Server: creating input stream for stream " + streamid); + System.err.println(server.name + ": creating input stream for stream " + streamid); bis = new BodyInputStream(queue, streamid, this); } try (bis; @@ -802,7 +806,7 @@ headers, rspheadersBuilder, uri, bis, getSSLSession(), if (bos.closed) { Queue q = streams.get(streamid); if (q != null && (q.isClosed() || q.isClosing())) { - System.err.println("TestServer: Stream " + streamid + " closed: " + closed); + System.err.println(server.name + ": Stream " + streamid + " closed: " + closed); return; } } @@ -812,7 +816,7 @@ headers, rspheadersBuilder, uri, bis, getSSLSession(), // everything happens in the exchange from here. Hopefully will // return though. } catch (Throwable e) { - System.err.println("TestServer: handleRequest exception: " + e); + System.err.println(server.name + ": handleRequest exception: " + e); e.printStackTrace(); close(-1); } @@ -844,7 +848,7 @@ void readLoop() { while (!stopping) { Http2Frame frame = readFrameImpl(); if (frame == null) { - System.err.println("EOF reached on connection " + connectionKey() + System.err.println(server.name + ": EOF reached on connection " + connectionKey() + ", will no longer accept incoming frames"); closeIncoming(); return; @@ -865,7 +869,7 @@ void readLoop() { Queue q = streams.get(stream); if (frame.type() == HeadersFrame.TYPE) { if (q != null) { - System.err.println("HEADERS frame for existing stream! Error."); + System.err.println(server.name + ": HEADERS frame for existing stream! Error."); // TODO: close connection continue; } else { @@ -874,7 +878,8 @@ void readLoop() { // if we already sent a goaway, then don't create new streams with // higher stream ids. if (finalProcessedStreamId != -1 && streamId > finalProcessedStreamId) { - System.err.println(connectionKey() + " resetting stream " + streamId + System.err.println(server.name + ": " + connectionKey() + + " resetting stream " + streamId + " as REFUSED_STREAM"); final ResetFrame rst = new ResetFrame(streamId, REFUSED_STREAM); outputQ.put(rst); @@ -884,7 +889,7 @@ void readLoop() { } } else { if (q == null && !pushStreams.contains(stream)) { - System.err.printf("Non Headers frame received with"+ + System.err.printf(server.name + ": Non Headers frame received with"+ " non existing stream (%d) ", frame.streamid()); System.err.println(frame); continue; @@ -914,15 +919,15 @@ void readLoop() { } else if (isClientStreamId(stream) && stream < next) { // We may receive a reset on a client stream that has already // been closed. Just ignore it. - System.err.println("TestServer: received ResetFrame on closed stream: " + stream); + System.err.println(server.name + ": received ResetFrame on closed stream: " + stream); System.err.println(frame); } else if (isServerStreamId(stream) && stream < nextPush) { // We may receive a reset on a push stream that has already // been closed. Just ignore it. - System.err.println("TestServer: received ResetFrame on closed push stream: " + stream); + System.err.println(server.name + ": received ResetFrame on closed push stream: " + stream); System.err.println(frame); } else { - System.err.println("TestServer: Unexpected frame on: " + stream); + System.err.println(server.name + ": Unexpected frame on: " + stream); System.err.println(frame); throw new IOException("Unexpected frame"); } @@ -934,7 +939,7 @@ void readLoop() { } } catch (Throwable e) { if (!stopping) { - System.err.println("Http server reader thread shutdown"); + System.err.println(server.name + ": Http server reader thread shutdown"); e.printStackTrace(); } close(ErrorFrame.PROTOCOL_ERROR); @@ -1075,7 +1080,7 @@ void writeLoop() { : new ContinuationFrame(rh.streamid(), flags, list); if (Log.headers()) { // avoid too much chatter: log only if Log.headers() is enabled - System.err.println("TestServer writing " + hf); + System.err.println(server.name + ": writing " + hf); } writeFrame(hf); cont++; @@ -1085,7 +1090,7 @@ void writeLoop() { } else writeFrame(frame); } - System.err.println("TestServer: Connection writer stopping"); + System.err.println(server.name + ": Connection writer stopping " + connectionKey()); } catch (Throwable e) { e.printStackTrace(); /*close(); @@ -1137,7 +1142,7 @@ public void sendEndStream() throws IOException { ii.transferTo(oo); } catch (Throwable ex) { - System.err.printf("TestServer: pushing response error: %s\n", + System.err.printf(server.name + ": pushing response error: %s\n", ex.toString()); } finally { closeIgnore(ii); @@ -1300,7 +1305,7 @@ Http1InitialRequest readHttp1Request() throws IOException { } return new Http1InitialRequest(headers, buf); } catch (IOException e) { - System.err.println("TestServer: headers read: [ " + headers + " ]"); + System.err.println(server.name + ": headers read: [ " + headers + " ]"); throw e; } } @@ -1332,7 +1337,7 @@ void sendHttp1Response(int code, String msg, String... headers) throws IOExcepti } private void unexpectedFrame(Http2Frame frame) { - System.err.println("OOPS. Unexpected"); + System.err.println(server.name + ": OOPS. Unexpected"); assert false; } @@ -1372,19 +1377,41 @@ void registerStreamWindowUpdater(int streamid, Consumer r) { * @param amount */ public synchronized void obtainConnectionWindow(int amount) throws InterruptedException { - while (amount > 0) { - int n = Math.min(amount, sendWindow); - amount -= n; - sendWindow -= n; - if (amount > 0) - wait(); + int demand = amount; + try { + int waited = 0; + while (amount > 0) { + int n = Math.min(amount, sendWindow); + amount -= n; + sendWindow -= n; + if (amount > 0) { + // Do not include this print line on a version that does not have + // JDK-8337395 + System.err.printf("%s: blocked waiting for %s connection window, obtained %s%n", + server.name, amount, demand - amount); + waited++; + wait(); + } + } + if (waited > 0) { + // Do not backport this print line on a version that does not have + // JDK-8337395 + System.err.printf("%s: obtained %s connection window, remaining %s%n", + server.name, demand, sendWindow); + } + assert amount == 0; + } catch (Throwable t) { + sendWindow += (demand - amount); + throw t; } } public void updateConnectionWindow(int amount) { - System.out.printf("sendWindow (window=%s, amount=%s) is now: %s%n", - sendWindow, amount, sendWindow + amount); synchronized (this) { + // Do not backport this print line on a version that does not have + // JDK-8337395 + System.err.printf(server.name + ": update sendWindow (window=%s, amount=%s) is now: %s%n", + sendWindow, amount, sendWindow + amount); sendWindow += amount; notifyAll(); }