Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2024, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -115,18 +115,24 @@ abstract class WindowUpdateSender {
* the caller wants to buffer.
*/
boolean canBufferUnprocessedBytes(int len) {
return !checkWindowSizeExceeded(unprocessed.addAndGet(len));
long buffered, processed;
// get received before unprocessed in order to avoid counting
// unprocessed bytes that might get unbuffered asynchronously
// twice.
processed = received.get();
buffered = unprocessed.addAndGet(len);
return !checkWindowSizeExceeded(processed, buffered);
}

// adds the provided amount to the amount of already
// received and processed bytes and checks whether the
// processed and processed bytes and checks whether the
// flow control window is exceeded. If so, take
// corrective actions and return true.
private boolean checkWindowSizeExceeded(long len) {
private boolean checkWindowSizeExceeded(long processed, long len) {
// because windowSize is bound by Integer.MAX_VALUE
// we will never reach the point where received.get() + len
// could overflow
long rcv = received.get() + len;
long rcv = processed + len;
return rcv > windowSize && windowSizeExceeded(rcv);
}

Expand All @@ -141,6 +147,7 @@ private boolean checkWindowSizeExceeded(long len) {
* @param delta the amount of processed bytes to release
*/
void processed(int delta) {
assert delta >= 0 : delta;
long rest = unprocessed.addAndGet(-delta);
assert rest >= 0;
update(delta);
Expand All @@ -164,6 +171,7 @@ void processed(int delta) {
* @return the amount of remaining unprocessed bytes
*/
long released(int delta) {
assert delta >= 0 : delta;
long rest = unprocessed.addAndGet(-delta);
assert rest >= 0;
return rest;
Expand Down Expand Up @@ -192,7 +200,7 @@ void update(int delta) {
synchronized (this) {
int tosend = (int)Math.min(received.get(), Integer.MAX_VALUE);
if (tosend > limit) {
received.getAndAdd(-tosend);
received.addAndGet(-tosend);
sendWindowUpdate(tosend);
}
}
Expand Down
8 changes: 6 additions & 2 deletions test/jdk/java/net/httpclient/http2/StreamFlowControlTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2024, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -35,6 +35,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.ProtocolException;
import java.net.URI;
Expand Down Expand Up @@ -357,7 +358,9 @@ public void handle(Http2TestExchange t) throws IOException {
// to wait for the connection window
fct.conn.obtainConnectionWindow(resp.length);
} catch (InterruptedException ie) {
// ignore and continue...
var ioe = new InterruptedIOException(ie.toString());
ioe.initCause(ie);
throw ioe;
}
}
try {
Expand All @@ -366,6 +369,7 @@ public void handle(Http2TestExchange t) throws IOException {
if (t instanceof FCHttp2TestExchange fct) {
fct.conn.updateConnectionWindow(resp.length);
}
throw x;
}
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2024, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -239,7 +239,7 @@ public static abstract class HttpTestExchange implements AutoCloseable {
public abstract OutputStream getResponseBody();
public abstract HttpTestRequestHeaders getRequestHeaders();
public abstract HttpTestResponseHeaders getResponseHeaders();
public abstract void sendResponseHeaders(int code, int contentLength) throws IOException;
public abstract void sendResponseHeaders(int code, long contentLength) throws IOException;
public abstract URI getRequestURI();
public abstract String getRequestMethod();
public abstract void close();
Expand Down Expand Up @@ -292,7 +292,7 @@ public HttpTestResponseHeaders getResponseHeaders() {
return HttpTestResponseHeaders.of(exchange.getResponseHeaders());
}
@Override
public void sendResponseHeaders(int code, int contentLength) throws IOException {
public void sendResponseHeaders(int code, long contentLength) throws IOException {
if (contentLength == 0) contentLength = -1;
else if (contentLength < 0) contentLength = 0;
exchange.sendResponseHeaders(code, contentLength);
Expand Down Expand Up @@ -355,7 +355,7 @@ public HttpTestResponseHeaders getResponseHeaders() {
return HttpTestResponseHeaders.of(exchange.getResponseHeaders());
}
@Override
public void sendResponseHeaders(int code, int contentLength) throws IOException {
public void sendResponseHeaders(int code, long contentLength) throws IOException {
if (contentLength == 0) contentLength = -1;
else if (contentLength < 0) contentLength = 0;
exchange.sendResponseHeaders(code, contentLength);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2024, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -63,22 +63,28 @@ synchronized void updateWindow(int update) {
}

void waitForWindow(int demand) throws InterruptedException {
// first wait for the connection window
conn.obtainConnectionWindow(demand);
// now wait for the stream window
// first wait for the stream window
waitForStreamWindow(demand);
// now wait for the connection window
conn.obtainConnectionWindow(demand);
}

public void waitForStreamWindow(int demand) throws InterruptedException {
synchronized (this) {
while (demand > 0) {
int n = Math.min(demand, window);
demand -= n;
window -= n;
if (demand > 0) {
wait();
public void waitForStreamWindow(int amount) throws InterruptedException {
int demand = amount;
try {
synchronized (this) {
while (amount > 0) {
int n = Math.min(amount, window);
amount -= n;
window -= n;
if (amount > 0) {
wait();
}
}
}
} catch (Throwable t) {
window += (demand - amount);
throw t;
}
}

Expand Down
Loading