@@ -157,14 +157,13 @@ class Stream<T> extends ExchangeImpl<T> {
157
157
158
158
// send lock: prevent sending DataFrames after reset occurred.
159
159
private final Object sendLock = new Object ();
160
-
161
160
/**
162
161
* A reference to this Stream's connection Send Window controller. The
163
162
* stream MUST acquire the appropriate amount of Send Window before
164
163
* sending any data. Will be null for PushStreams, as they cannot send data.
165
164
*/
166
165
private final WindowController windowController ;
167
- private final WindowUpdateSender windowUpdater ;
166
+ private final WindowUpdateSender streamWindowUpdater ;
168
167
169
168
@ Override
170
169
HttpConnection connection () {
@@ -203,7 +202,8 @@ private void schedule() {
203
202
int size = Utils .remaining (dsts , Integer .MAX_VALUE );
204
203
if (size == 0 && finished ) {
205
204
inputQ .remove ();
206
- connection .ensureWindowUpdated (df ); // must update connection window
205
+ // consumed will not be called
206
+ connection .releaseUnconsumed (df ); // must update connection window
207
207
Log .logTrace ("responseSubscriber.onComplete" );
208
208
if (debug .on ()) debug .log ("incoming: onComplete" );
209
209
sched .stop ();
@@ -219,7 +219,11 @@ private void schedule() {
219
219
try {
220
220
subscriber .onNext (dsts );
221
221
} catch (Throwable t ) {
222
- connection .dropDataFrame (df ); // must update connection window
222
+ // Data frames that have been added to the inputQ
223
+ // must be released using releaseUnconsumed() to
224
+ // account for the amount of unprocessed bytes
225
+ // tracked by the connection.windowUpdater.
226
+ connection .releaseUnconsumed (df );
223
227
throw t ;
224
228
}
225
229
if (consumed (df )) {
@@ -271,8 +275,12 @@ private void schedule() {
271
275
private void drainInputQueue () {
272
276
Http2Frame frame ;
273
277
while ((frame = inputQ .poll ()) != null ) {
274
- if (frame instanceof DataFrame ) {
275
- connection .dropDataFrame ((DataFrame )frame );
278
+ if (frame instanceof DataFrame df ) {
279
+ // Data frames that have been added to the inputQ
280
+ // must be released using releaseUnconsumed() to
281
+ // account for the amount of unprocessed bytes
282
+ // tracked by the connection.windowUpdater.
283
+ connection .releaseUnconsumed (df );
276
284
}
277
285
}
278
286
}
@@ -298,12 +306,13 @@ private boolean consumed(DataFrame df) {
298
306
boolean endStream = df .getFlag (DataFrame .END_STREAM );
299
307
if (len == 0 ) return endStream ;
300
308
301
- connection .windowUpdater .update (len );
302
-
309
+ connection .windowUpdater .processed (len );
303
310
if (!endStream ) {
311
+ streamWindowUpdater .processed (len );
312
+ } else {
304
313
// Don't send window update on a stream which is
305
314
// closed or half closed.
306
- windowUpdater . update (len );
315
+ streamWindowUpdater . released (len );
307
316
}
308
317
309
318
// true: end of stream; false: more data coming
@@ -373,8 +382,21 @@ public String toString() {
373
382
}
374
383
375
384
private void receiveDataFrame (DataFrame df ) {
376
- inputQ .add (df );
377
- sched .runOrSchedule ();
385
+ try {
386
+ int len = df .payloadLength ();
387
+ if (len > 0 ) {
388
+ // we return from here if the connection is being closed.
389
+ if (!connection .windowUpdater .canBufferUnprocessedBytes (len )) return ;
390
+ // we return from here if the stream is being closed.
391
+ if (closed || !streamWindowUpdater .canBufferUnprocessedBytes (len )) {
392
+ connection .releaseUnconsumed (df );
393
+ return ;
394
+ }
395
+ }
396
+ inputQ .add (df );
397
+ } finally {
398
+ sched .runOrSchedule ();
399
+ }
378
400
}
379
401
380
402
/** Handles a RESET frame. RESET is always handled inline in the queue. */
@@ -452,7 +474,7 @@ CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
452
474
this .responseHeadersBuilder = new HttpHeadersBuilder ();
453
475
this .rspHeadersConsumer = new HeadersConsumer ();
454
476
this .requestPseudoHeaders = createPseudoHeaders (request );
455
- this .windowUpdater = new StreamWindowUpdateSender (connection );
477
+ this .streamWindowUpdater = new StreamWindowUpdateSender (connection );
456
478
}
457
479
458
480
private boolean checkRequestCancelled () {
@@ -486,6 +508,8 @@ void incoming(Http2Frame frame) throws IOException {
486
508
if (debug .on ()) {
487
509
debug .log ("request cancelled or stream closed: dropping data frame" );
488
510
}
511
+ // Data frames that have not been added to the inputQ
512
+ // can be released using dropDataFrame
489
513
connection .dropDataFrame (df );
490
514
} else {
491
515
receiveDataFrame (df );
@@ -1365,12 +1389,18 @@ void cancel(IOException cause) {
1365
1389
1366
1390
@ Override
1367
1391
void onProtocolError (final IOException cause ) {
1392
+ onProtocolError (cause , ResetFrame .PROTOCOL_ERROR );
1393
+ }
1394
+
1395
+ void onProtocolError (final IOException cause , int code ) {
1368
1396
if (debug .on ()) {
1369
- debug .log ("cancelling exchange on stream %d due to protocol error: %s" , streamid , cause .getMessage ());
1397
+ debug .log ("cancelling exchange on stream %d due to protocol error [%s]: %s" ,
1398
+ streamid , ErrorFrame .stringForCode (code ),
1399
+ cause .getMessage ());
1370
1400
}
1371
1401
Log .logError ("cancelling exchange on stream {0} due to protocol error: {1}\n " , streamid , cause );
1372
1402
// send a RESET frame and close the stream
1373
- cancelImpl (cause , ResetFrame . PROTOCOL_ERROR );
1403
+ cancelImpl (cause , code );
1374
1404
}
1375
1405
1376
1406
void connectionClosing (Throwable cause ) {
@@ -1666,6 +1696,13 @@ String dbgString() {
1666
1696
return dbgString = dbg ;
1667
1697
}
1668
1698
}
1699
+
1700
+ @ Override
1701
+ protected boolean windowSizeExceeded (long received ) {
1702
+ onProtocolError (new ProtocolException ("stream %s flow control window exceeded"
1703
+ .formatted (streamid )), ResetFrame .FLOW_CONTROL_ERROR );
1704
+ return true ;
1705
+ }
1669
1706
}
1670
1707
1671
1708
/**
0 commit comments