Skip to content

Commit d7779ac

Browse files
committed
Fix high memory usage in ContextPropagationHelper
Prior to this commit, gh-1149 added support for cancellation detection at the transport level and the propagation of the CANCEL signal to data fetcher `Publisher`. `ContextPropagationHelper` stores in the context a `Sink` that emits the cancel signal. All upstream publishers are then subscribing to this and canceling themselves if they receive a signal. In the case of high field count queries + high request/sec services, this would create a significant memory overhead. This commit reduces the support to the essential parts: * avoid further calls to data fetchers if the request is canceled * cancel subscription publishers This means other in-flight data fetchers that are publisher-based are not canceled anymore. This should be a good compromise for high RPS See gh-1242
1 parent 7d1b5ce commit d7779ac

File tree

5 files changed

+38
-44
lines changed

5 files changed

+38
-44
lines changed

spring-graphql/src/main/java/org/springframework/graphql/execution/ContextDataFetcherDecorator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ public Object get(DataFetchingEnvironment env) throws Exception {
131131
value = ReactiveAdapterRegistryHelper.toMonoIfReactive(value);
132132

133133
if (value instanceof Mono<?> mono) {
134-
value = ContextPropagationHelper.bindCancelFrom(mono, graphQlContext).contextWrite(snapshot::updateContext).toFuture();
134+
value = mono.contextWrite(snapshot::updateContext).toFuture();
135135
}
136136

137137
return value;

spring-graphql/src/main/java/org/springframework/graphql/execution/ContextPropagationHelper.java

Lines changed: 22 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.graphql.execution;
1818

19+
import java.util.concurrent.atomic.AtomicBoolean;
20+
1921
import graphql.GraphQLContext;
2022
import io.micrometer.context.ContextSnapshot;
2123
import io.micrometer.context.ContextSnapshotFactory;
@@ -40,7 +42,9 @@ public abstract class ContextPropagationHelper {
4042

4143
private static final String CONTEXT_SNAPSHOT_FACTORY_KEY = ContextPropagationHelper.class.getName() + ".KEY";
4244

43-
private static final String CANCEL_PUBLISHER_KEY = ContextPropagationHelper.class.getName() + ".cancelled";
45+
private static final String CANCELED_KEY = ContextPropagationHelper.class.getName() + ".canceled";
46+
47+
private static final String CANCELED_PUBLISHER_KEY = ContextPropagationHelper.class.getName() + ".canceledPublisher";
4448

4549

4650
/**
@@ -120,50 +124,37 @@ public static ContextSnapshot captureFrom(GraphQLContext context) {
120124
}
121125

122126
/**
123-
* Create a publisher and store it into the given {@link GraphQLContext}.
124-
* This publisher can then be used to propagate cancel signals to upstream publishers.
127+
* Create an atomic boolean and store it into the given {@link GraphQLContext}.
128+
* This boolean value can then be checked by upstream publishers to know whether the request is canceled.
125129
* @param context the current GraphQL context
126-
* @since 1.3.5
130+
* @since 1.3.6
127131
*/
128-
public static Sinks.Empty<Void> createCancelPublisher(GraphQLContext context) {
129-
Sinks.Empty<Void> requestCancelled = Sinks.empty();
130-
context.put(CANCEL_PUBLISHER_KEY, requestCancelled.asMono());
131-
return requestCancelled;
132+
public static Runnable createCancelSignal(GraphQLContext context) {
133+
AtomicBoolean requestCancelled = new AtomicBoolean();
134+
Sinks.Empty<Void> cancelSignal = Sinks.empty();
135+
context.put(CANCELED_KEY, requestCancelled);
136+
context.put(CANCELED_PUBLISHER_KEY, cancelSignal.asMono());
137+
return () -> {
138+
requestCancelled.set(true);
139+
cancelSignal.tryEmitEmpty();
140+
};
132141
}
133142

134143
/**
135144
* Return {@code true} if the current request has been cancelled, {@code false} otherwise.
136-
* This checks whether a {@link #createCancelPublisher(GraphQLContext) cancellation publisher is present}
145+
* This checks whether a {@link #createCancelSignal(GraphQLContext) cancellation publisher is present}
137146
* in the given context and the cancel signal has fired already.
138147
* @param context the current GraphQL context
139148
* @since 1.4.0
140149
*/
141150
public static boolean isCancelled(GraphQLContext context) {
142-
Mono<Void> cancelSignal = context.get(CANCEL_PUBLISHER_KEY);
143-
if (cancelSignal != null) {
144-
return cancelSignal.toFuture().isDone();
151+
AtomicBoolean requestCancelled = context.get(CANCELED_KEY);
152+
if (requestCancelled != null) {
153+
return requestCancelled.get();
145154
}
146155
return false;
147156
}
148157

149-
/**
150-
* Bind the source {@link Mono} to the publisher from the given {@link GraphQLContext}.
151-
* The returned {@code Mono} will be cancelled when this publisher completes.
152-
* Subscribers must use the returned {@code Mono} instance.
153-
* @param source the source {@code Mono}
154-
* @param context the current GraphQL context
155-
* @param <T> the type of published elements
156-
* @return the new {@code Mono} that will be cancelled when notified
157-
* @since 1.3.5
158-
*/
159-
public static <T> Mono<T> bindCancelFrom(Mono<T> source, GraphQLContext context) {
160-
Mono<Void> cancelSignal = context.get(CANCEL_PUBLISHER_KEY);
161-
if (cancelSignal != null) {
162-
return source.takeUntilOther(cancelSignal);
163-
}
164-
return source;
165-
}
166-
167158
/**
168159
* Bind the source {@link Flux} to the publisher from the given {@link GraphQLContext}.
169160
* The returned {@code Flux} will be cancelled when this publisher completes.
@@ -175,7 +166,7 @@ public static <T> Mono<T> bindCancelFrom(Mono<T> source, GraphQLContext context)
175166
* @since 1.3.5
176167
*/
177168
public static <T> Flux<T> bindCancelFrom(Flux<T> source, GraphQLContext context) {
178-
Mono<Void> cancelSignal = context.get(CANCEL_PUBLISHER_KEY);
169+
Mono<Void> cancelSignal = context.get(CANCELED_PUBLISHER_KEY);
179170
if (cancelSignal != null) {
180171
return source.takeUntilOther(cancelSignal);
181172
}

spring-graphql/src/main/java/org/springframework/graphql/execution/DefaultExecutionGraphQlService.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import io.micrometer.context.ContextSnapshotFactory;
3131
import org.dataloader.DataLoaderRegistry;
3232
import reactor.core.publisher.Mono;
33-
import reactor.core.publisher.Sinks;
3433

3534
import org.springframework.graphql.ExecutionGraphQlRequest;
3635
import org.springframework.graphql.ExecutionGraphQlResponse;
@@ -93,13 +92,13 @@ public final Mono<ExecutionGraphQlResponse> execute(ExecutionGraphQlRequest requ
9392
factory.captureFrom(contextView).updateContext(graphQLContext);
9493

9594
ExecutionInput executionInputToUse = registerDataLoaders(executionInput);
96-
Sinks.Empty<Void> cancelPublisher = ContextPropagationHelper.createCancelPublisher(graphQLContext);
95+
Runnable cancelSignal = ContextPropagationHelper.createCancelSignal(graphQLContext);
9796

9897
return Mono.fromFuture(this.graphQlSource.graphQl().executeAsync(executionInputToUse))
9998
.onErrorResume((ex) -> ex instanceof GraphQLError, (ex) ->
10099
Mono.just(ExecutionResult.newExecutionResult().addError((GraphQLError) ex).build()))
101100
.map((result) -> new DefaultExecutionGraphQlResponse(executionInputToUse, result))
102-
.doOnCancel(cancelPublisher::tryEmitEmpty);
101+
.doOnCancel(cancelSignal::run);
103102
});
104103
}
105104

spring-graphql/src/test/java/org/springframework/graphql/execution/ContextDataFetcherDecoratorTests.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,10 @@
4242
import io.micrometer.context.ContextRegistry;
4343
import io.micrometer.context.ContextSnapshot;
4444
import io.micrometer.context.ContextSnapshotFactory;
45+
import org.junit.jupiter.api.Disabled;
4546
import org.junit.jupiter.api.Test;
4647
import reactor.core.publisher.Flux;
4748
import reactor.core.publisher.Mono;
48-
import reactor.core.publisher.Sinks;
4949
import reactor.test.StepVerifier;
5050

5151
import org.springframework.graphql.GraphQlSetup;
@@ -292,6 +292,7 @@ void trivialDataFetcherIsNotDecorated() {
292292
}
293293

294294
@Test
295+
@Disabled("until https://github.com/spring-projects/spring-graphql/issues/1171")
295296
void cancelMonoDataFetcherWhenRequestCancelled() {
296297
AtomicBoolean dataFetcherCancelled = new AtomicBoolean();
297298
GraphQL graphQl = GraphQlSetup.schemaContent(SCHEMA_CONTENT)
@@ -303,14 +304,15 @@ void cancelMonoDataFetcherWhenRequestCancelled() {
303304
.toGraphQl();
304305

305306
ExecutionInput input = ExecutionInput.newExecutionInput().query("{ greeting }").build();
306-
Sinks.Empty<Void> requestCancelled = ContextPropagationHelper.createCancelPublisher(input.getGraphQLContext());
307+
Runnable cancelSignal = ContextPropagationHelper.createCancelSignal(input.getGraphQLContext());
307308

308309
CompletableFuture<ExecutionResult> asyncResult = graphQl.executeAsync(input);
309-
requestCancelled.tryEmitEmpty();
310+
cancelSignal.run();
310311
await().atMost(Duration.ofSeconds(2)).until(dataFetcherCancelled::get);
311312
}
312313

313314
@Test
315+
@Disabled("until https://github.com/spring-projects/spring-graphql/issues/1171")
314316
void cancelFluxDataFetcherWhenRequestCancelled() {
315317
AtomicBoolean dataFetcherCancelled = new AtomicBoolean();
316318
GraphQL graphQl = GraphQlSetup.schemaContent(SCHEMA_CONTENT)
@@ -322,10 +324,10 @@ void cancelFluxDataFetcherWhenRequestCancelled() {
322324
.toGraphQl();
323325

324326
ExecutionInput input = ExecutionInput.newExecutionInput().query("{ greeting }").build();
325-
Sinks.Empty<Void> requestCancelled = ContextPropagationHelper.createCancelPublisher(input.getGraphQLContext());
327+
Runnable cancelSignal = ContextPropagationHelper.createCancelSignal(input.getGraphQLContext());
326328

327329
CompletableFuture<ExecutionResult> asyncResult = graphQl.executeAsync(input);
328-
requestCancelled.tryEmitEmpty();
330+
cancelSignal.run();
329331
await().atMost(Duration.ofSeconds(2)).until(dataFetcherCancelled::get);
330332
}
331333

@@ -336,8 +338,8 @@ void returnAbortExecutionForBlockingDataFetcherWhenRequestCancelled() throws Exc
336338
.toGraphQl();
337339

338340
ExecutionInput input = ExecutionInput.newExecutionInput().query("{ greeting }").build();
339-
Sinks.Empty<Void> requestCancelled = ContextPropagationHelper.createCancelPublisher(input.getGraphQLContext());
340-
requestCancelled.tryEmitEmpty();
341+
Runnable cancelSignal = ContextPropagationHelper.createCancelSignal(input.getGraphQLContext());
342+
cancelSignal.run();
341343
ExecutionResult result = graphQl.executeAsync(input).get();
342344

343345
assertThat(result.getErrors()).hasSize(1);
@@ -357,11 +359,11 @@ void cancelFluxDataFetcherSubscriptionWhenRequestCancelled() throws Exception {
357359
.toGraphQl();
358360

359361
ExecutionInput input = ExecutionInput.newExecutionInput().query("subscription { greetings }").build();
360-
Sinks.Empty<Void> requestCancelled = ContextPropagationHelper.createCancelPublisher(input.getGraphQLContext());
362+
Runnable cancelSignal = ContextPropagationHelper.createCancelSignal(input.getGraphQLContext());
361363

362364
ExecutionResult executionResult = graphQl.executeAsync(input).get();
363365
ResponseHelper.forSubscription(executionResult).subscribe();
364-
requestCancelled.tryEmitEmpty();
366+
cancelSignal.run();
365367

366368
await().atMost(Duration.ofSeconds(2)).until(dataFetcherCancelled::get);
367369
assertThat(dataFetcherCancelled).isTrue();

spring-graphql/src/test/java/org/springframework/graphql/execution/DefaultExecutionGraphQlServiceTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import graphql.ErrorType;
2424
import org.dataloader.DataLoaderRegistry;
25+
import org.junit.jupiter.api.Disabled;
2526
import org.junit.jupiter.api.Test;
2627
import reactor.core.publisher.Flux;
2728
import reactor.core.publisher.Mono;
@@ -82,6 +83,7 @@ void shouldHandleGraphQlErrors() {
8283
}
8384

8485
@Test
86+
@Disabled("until https://github.com/spring-projects/spring-graphql/issues/1171")
8587
void cancellationSupport() {
8688
AtomicBoolean cancelled = new AtomicBoolean();
8789
Mono<String> greetingMono = Mono.just("hi")

0 commit comments

Comments
 (0)