Skip to content

Commit 45d5b14

Browse files
defer: change to webflux and fully non-blocking
1 parent 1c23225 commit 45d5b14

File tree

2 files changed

+151
-73
lines changed

2 files changed

+151
-73
lines changed

defer/server/build.gradle

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,12 @@ repositories {
2626

2727

2828
dependencies {
29-
compile('org.springframework.boot:spring-boot-starter-web')
29+
compile('org.springframework.boot:spring-boot-starter-webflux')
30+
// compile('org.springframework.boot:spring-boot-starter-web')
31+
3032
compile "com.graphql-java:graphql-java:2018-09-16T01-27-46-27a6e44"
3133
compile 'com.google.guava:guava:26.0-jre'
3234
testCompile('org.springframework.boot:spring-boot-starter-test')
35+
// testImplementation('io.projectreactor:reactor-test')
36+
3337
}

defer/server/src/main/java/com/graphqljava/defer/GraphQLController.java

Lines changed: 146 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,23 @@
1313
import org.slf4j.Logger;
1414
import org.slf4j.LoggerFactory;
1515
import org.springframework.beans.factory.annotation.Autowired;
16+
import org.springframework.core.io.buffer.DataBuffer;
17+
import org.springframework.core.io.buffer.DataBufferFactory;
18+
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
19+
import org.springframework.http.HttpHeaders;
20+
import org.springframework.http.HttpStatus;
1621
import org.springframework.http.MediaType;
22+
import org.springframework.http.server.reactive.ServerHttpResponse;
1723
import org.springframework.web.bind.annotation.CrossOrigin;
1824
import org.springframework.web.bind.annotation.RequestBody;
1925
import org.springframework.web.bind.annotation.RequestMapping;
2026
import org.springframework.web.bind.annotation.RequestMethod;
2127
import org.springframework.web.bind.annotation.RestController;
28+
import reactor.core.publisher.Flux;
29+
import reactor.core.publisher.Mono;
2230

23-
import javax.servlet.AsyncContext;
24-
import javax.servlet.http.HttpServletRequest;
25-
import javax.servlet.http.HttpServletResponse;
2631
import java.io.IOException;
27-
import java.io.PrintWriter;
32+
import java.nio.charset.StandardCharsets;
2833
import java.util.LinkedHashMap;
2934
import java.util.Map;
3035

@@ -45,14 +50,14 @@ public class GraphQLController {
4550

4651
@RequestMapping(value = "/test", method = RequestMethod.GET, produces = MediaType.APPLICATION_JSON_VALUE)
4752
@CrossOrigin
48-
public void graphql(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws IOException {
53+
public Mono<Void> graphql(ServerHttpResponse serverHttpResponse) throws IOException {
4954
ImmutableMap<String, Object> body = ImmutableMap.of("query", "{books{title author comments @defer {user text}}}");
50-
graphql(body, httpServletRequest, httpServletResponse);
55+
return graphql(body, serverHttpResponse);
5156
}
5257

5358
@RequestMapping(value = "/graphql", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE)
5459
@CrossOrigin
55-
public void graphql(@RequestBody Map<String, Object> body, HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws IOException {
60+
public Mono<Void> graphql(@RequestBody Map<String, Object> body, ServerHttpResponse serverHttpResponse) throws IOException {
5661
String query = (String) body.get("query");
5762
if (query == null) {
5863
query = "";
@@ -69,94 +74,163 @@ public void graphql(@RequestBody Map<String, Object> body, HttpServletRequest ht
6974
ExecutionResult executionResult = graphql.execute(executionInput);
7075
Map<Object, Object> extensions = executionResult.getExtensions();
7176
if (extensions != null && extensions.containsKey(GraphQL.DEFERRED_RESULTS)) {
72-
handleDeferResponse(httpServletRequest, httpServletResponse, executionResult, extensions);
77+
return handleDeferResponse(serverHttpResponse, executionResult, extensions);
7378
} else {
74-
handleNormalResponse(httpServletResponse, executionResult);
79+
return handleNormalResponse(serverHttpResponse, executionResult);
7580
}
7681
}
7782

78-
private void handleDeferResponse(HttpServletRequest httpServletRequest,
79-
HttpServletResponse httpServletResponse,
80-
ExecutionResult executionResult,
81-
Map<Object, Object> extensions) {
82-
AsyncContext asyncContext = httpServletRequest.startAsync();
83-
asyncContext.start(() -> {
84-
Publisher<DeferredExecutionResult> deferredResults = (Publisher<DeferredExecutionResult>) extensions.get(GraphQL.DEFERRED_RESULTS);
85-
try {
86-
sendDeferResponse(asyncContext, httpServletRequest, httpServletResponse, executionResult, deferredResults);
87-
} catch (IOException e) {
88-
e.printStackTrace();
89-
}
90-
});
91-
83+
private Mono<Void> handleDeferResponse(ServerHttpResponse serverHttpResponse,
84+
ExecutionResult executionResult,
85+
Map<Object, Object> extensions) {
86+
Publisher<DeferredExecutionResult> deferredResults = (Publisher<DeferredExecutionResult>) extensions.get(GraphQL.DEFERRED_RESULTS);
87+
try {
88+
return sendDeferResponse(serverHttpResponse, executionResult, deferredResults);
89+
} catch (Exception e) {
90+
e.printStackTrace();
91+
throw new RuntimeException(e);
92+
}
9293
}
9394

94-
private void handleNormalResponse(HttpServletResponse httpServletResponse, ExecutionResult executionResult) throws IOException {
95+
private Mono<Void> handleNormalResponse(ServerHttpResponse serverHttpResponse, ExecutionResult executionResult) throws IOException {
9596
Map<String, Object> result = executionResult.toSpecification();
96-
httpServletResponse.setStatus(HttpServletResponse.SC_OK);
97-
httpServletResponse.setCharacterEncoding("UTF-8");
98-
httpServletResponse.setContentType("application/json");
99-
httpServletResponse.setHeader("Access-Control-Allow-Origin", "*");
97+
serverHttpResponse.setStatusCode(HttpStatus.OK);
98+
HttpHeaders headers = serverHttpResponse.getHeaders();
99+
headers.setContentType(MediaType.APPLICATION_JSON_UTF8);
100100
String body = objectMapper.writeValueAsString(result);
101-
PrintWriter writer = httpServletResponse.getWriter();
102-
writer.write(body);
103-
writer.close();
101+
return serverHttpResponse.writeWith(strToDataBuffer(body));
102+
// PrintWriter writer = httpServletResponse.getWriter();
103+
// writer.write(body);
104+
// writer.close();
104105

105106
}
106107

107-
private void sendDeferResponse(AsyncContext asyncContext, HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, ExecutionResult executionResult, Publisher<DeferredExecutionResult> deferredResults) throws IOException {
108-
httpServletResponse.setStatus(HttpServletResponse.SC_OK);
109-
httpServletResponse.setCharacterEncoding("UTF-8");
110-
httpServletResponse.setContentType("multipart/mixed; boundary=\"-\"");
111-
httpServletResponse.setHeader("Access-Control-Allow-Origin", "*");
112-
httpServletResponse.setHeader("Transfer-Encoding", "chunked");
113-
httpServletResponse.setHeader("Connection", "keep-alive");
114-
PrintWriter writer = httpServletResponse.getWriter();
108+
private Mono<Void> sendDeferResponse(ServerHttpResponse serverHttpResponse, ExecutionResult executionResult, Publisher<DeferredExecutionResult> deferredResults) {
109+
serverHttpResponse.setStatusCode(HttpStatus.OK);
110+
HttpHeaders headers = serverHttpResponse.getHeaders();
111+
headers.set("Content-Type", "multipart/mixed; boundary=\"-\"");
112+
headers.set("transfer-encoding", "chunked");
113+
// return message.headers().contains(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED, true);
114+
headers.set("Connection", "keep-alive");
115115

116-
DeferPart deferPart = new DeferPart(executionResult.toSpecification());
117-
writer.append(CRLF).append("---").append(CRLF);
118-
String body = deferPart.write();
119-
writer.write(body);
120-
httpServletResponse.flushBuffer();
121116

122-
deferredResults.subscribe(new Subscriber<DeferredExecutionResult>() {
117+
DataBufferFactory dataBufferFactory = serverHttpResponse.bufferFactory();
123118

124-
Subscription subscription;
125119

126-
@Override
127-
public void onSubscribe(Subscription s) {
128-
subscription = s;
129-
subscription.request(10);
130-
}
120+
// serverHttpResponse.writeAndFlushWith(Mono.just(firstDataBuffer)).subscribe(aVoid -> {
121+
// System.out.println("done FIRST");
122+
// }, throwable -> {
123+
// throwable.printStackTrace();
124+
// }, () -> {
125+
// System.out.println("completed FIRST");
126+
// });
127+
128+
129+
// Flux<Mono<DataBuffer>> dataBufferFlux = Flux.from(deferredResults).map(deferredExecutionResult -> {
130+
// DeferPart deferPart = new DeferPart(executionResult.toSpecification());
131+
// StringBuilder builder = new StringBuilder();
132+
// String body = deferPart.write();
133+
// System.out.println("body:" + body);
134+
// builder.append(CRLF).append("---").append(CRLF);
135+
// builder.append(body);
136+
// Mono<DataBuffer> dataBuffer = strToDataBuffer(dataBufferFactory, builder.toString());
137+
// return dataBuffer;
138+
// });
139+
140+
// Flux<Mono<DataBuffer>> resultFlux = Flux.mergeSequential(Flux.just(firstDataBuffer), dataBufferFlux);
141+
// serverHttpResponse.writeAndFlushWith(resultFlux).subscribe(aVoid -> {
142+
// StringBuilder end = new StringBuilder();
143+
// end.append(CRLF).append("-----").append(CRLF);
144+
// serverHttpResponse.writeWith(strToDataBuffer(dataBufferFactory, end.toString()));
145+
// serverHttpResponse.setComplete();
146+
//
147+
// });
148+
149+
// serverHttpResponse.beforeCommit(() -> {
150+
// System.out.println("BEFORE COMMIT");
151+
// });
152+
153+
Flux<Mono<DataBuffer>> dataBufferFlux = Flux.create(monoFluxSink -> {
154+
155+
Mono<DataBuffer> firstDataBuffer = firstResult(executionResult);
156+
monoFluxSink.next(firstDataBuffer);
157+
158+
deferredResults.subscribe(new Subscriber<DeferredExecutionResult>() {
131159

132-
@Override
133-
public void onNext(DeferredExecutionResult executionResult) {
134-
try {
135-
DeferPart deferPart = new DeferPart(executionResult.toSpecification());
136-
String body = deferPart.write();
137-
writer.append(CRLF).append("---").append(CRLF);
138-
writer.write(body);
139-
httpServletResponse.flushBuffer();
160+
Subscription subscription;
161+
162+
@Override
163+
public void onSubscribe(Subscription s) {
164+
subscription = s;
140165
subscription.request(10);
141-
} catch (Exception e) {
142-
e.printStackTrace();
143166
}
144-
}
145167

146-
@Override
147-
public void onError(Throwable t) {
148-
t.printStackTrace();
149-
}
168+
@Override
169+
public void onNext(DeferredExecutionResult executionResult) {
170+
try {
171+
// DeferPart deferPart = new DeferPart(executionResult.toSpecification());
172+
// String body = deferPart.write();
173+
// writer.append(CRLF).append("---").append(CRLF);
174+
// writer.write(body);
175+
System.out.println("is comitted:" + serverHttpResponse.isCommitted());
176+
DeferPart deferPart = new DeferPart(executionResult.toSpecification());
177+
StringBuilder builder = new StringBuilder();
178+
String body = deferPart.write();
179+
System.out.println("body:" + body);
180+
builder.append(CRLF).append("---").append(CRLF);
181+
builder.append(body);
182+
Mono<DataBuffer> dataBuffer = strToDataBuffer(builder.toString());
183+
monoFluxSink.next(dataBuffer);
184+
} catch (Exception e) {
185+
e.printStackTrace();
186+
}
187+
}
188+
189+
@Override
190+
public void onError(Throwable t) {
191+
t.printStackTrace();
192+
}
193+
194+
@Override
195+
public void onComplete() {
196+
// writer.append(CRLF).append("-----").append(CRLF);
197+
// writer.close();
198+
// asyncContext.complete();
199+
System.out.println("END!!!");
200+
StringBuilder end = new StringBuilder();
201+
end.append(CRLF).append("-----").append(CRLF);
202+
Mono<DataBuffer> dataBuffer = strToDataBuffer(end.toString());
203+
monoFluxSink.next(dataBuffer);
204+
// serverHttpResponse.writeAndFlushWith(Mono.just().subscribe(aVoid -> {
205+
// System.out.println("done END");
206+
// }, throwable -> {
207+
// throwable.printStackTrace();
208+
// }, () -> {
209+
// System.out.println("completed END");
210+
// serverHttpResponse.setComplete();
211+
// });
212+
}
213+
});
150214

151-
@Override
152-
public void onComplete() {
153-
writer.append(CRLF).append("-----").append(CRLF);
154-
writer.close();
155-
asyncContext.complete();
156-
}
157215
});
158216

217+
return serverHttpResponse.writeAndFlushWith(dataBufferFlux);
218+
}
219+
220+
private Mono<DataBuffer> firstResult(ExecutionResult executionResult) {
221+
StringBuilder builder = new StringBuilder();
222+
builder.append(CRLF).append("---").append(CRLF);
223+
DeferPart deferPart = new DeferPart(executionResult.toSpecification());
224+
String body = deferPart.write();
225+
builder.append(body);
226+
Mono<DataBuffer> dataBufferMono = strToDataBuffer(body);
227+
return dataBufferMono;
228+
}
159229

230+
private Mono<DataBuffer> strToDataBuffer(String string) {
231+
byte[] bytes = string.getBytes(StandardCharsets.UTF_8);
232+
DefaultDataBufferFactory defaultDataBufferFactory = new DefaultDataBufferFactory();
233+
return Mono.just(defaultDataBufferFactory.wrap(bytes));
160234
}
161235

162236
private class DeferPart {

0 commit comments

Comments
 (0)