Skip to content

Commit fa478d6

Browse files
committed
Refactored file upload: generalized HttpGatewayAcceptor file-download
1 parent a281dff commit fa478d6

File tree

9 files changed

+371
-265
lines changed

9 files changed

+371
-265
lines changed

services-api/src/main/java/io/scalecube/services/api/ServiceMessage.java

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.scalecube.services.api;
22

3+
import java.lang.reflect.Array;
4+
import java.util.Collection;
35
import java.util.Collections;
46
import java.util.HashMap;
57
import java.util.Map;
@@ -50,15 +52,25 @@ private ServiceMessage(Builder builder) {
5052
}
5153

5254
/**
53-
* Instantiates new message with the same data and headers as at given message.
55+
* Instantiates new message builder with the same data and headers as at given message.
5456
*
5557
* @param message the message to be copied
56-
* @return a new message, with the same data and headers
58+
* @return message builder
5759
*/
5860
public static Builder from(ServiceMessage message) {
5961
return ServiceMessage.builder().data(message.data()).headers(message.headers());
6062
}
6163

64+
/**
65+
* Instantiates new message builder with the headers as at given message.
66+
*
67+
* @param headers the headers to be copied
68+
* @return message builder
69+
*/
70+
public static Builder from(Map<String, String> headers) {
71+
return ServiceMessage.builder().headers(headers);
72+
}
73+
6274
/**
6375
* Instantiates new message with error qualifier for given error type and specified error code and
6476
* message.
@@ -81,7 +93,7 @@ public static ServiceMessage error(
8193
/**
8294
* Instantiates new empty message builder.
8395
*
84-
* @return new builder
96+
* @return new message builder
8597
*/
8698
public static Builder builder() {
8799
return new Builder();
@@ -196,7 +208,7 @@ public int errorType() {
196208
/**
197209
* Returns request method header.
198210
*
199-
* @return request method, or null if such header doesn't exist.
211+
* @return request method, or null if does not exist
200212
*/
201213
public String requestMethod() {
202214
return headers.get(HEADER_REQUEST_METHOD);
@@ -205,20 +217,43 @@ public String requestMethod() {
205217
/**
206218
* Returns upload filename header.
207219
*
208-
* @return upload filename, or null if such header doesn't exist.
220+
* @return request method, or null if does not exist
209221
*/
210222
public String uploadFilename() {
211223
return headers.get(HEADER_UPLOAD_FILENAME);
212224
}
213225

214226
@Override
215227
public String toString() {
216-
return new StringJoiner(", ", "ServiceMessage" + "[", "]")
217-
.add("headers(" + headers.size() + ")")
218-
.add("data=" + data)
228+
return new StringJoiner(", ", ServiceMessage.class.getSimpleName() + "[", "]")
229+
.add("headers[" + headers.size() + "]")
230+
.add("data=" + toString(data))
219231
.toString();
220232
}
221233

234+
public static String toString(Object request) {
235+
if (request == null) {
236+
return "null";
237+
}
238+
// Handle arrays
239+
if (request.getClass().isArray()) {
240+
return request.getClass().getComponentType().getSimpleName()
241+
+ "["
242+
+ Array.getLength(request)
243+
+ "]";
244+
}
245+
// Handle collections
246+
if (request instanceof Collection<?> collection) {
247+
return collection.getClass().getSimpleName() + "[" + collection.size() + "]";
248+
}
249+
// Handle maps
250+
if (request instanceof Map<?, ?> map) {
251+
return map.getClass().getSimpleName() + "[" + map.size() + "]";
252+
}
253+
// Fallback
254+
return String.valueOf(request);
255+
}
256+
222257
public static class Builder {
223258

224259
private final Map<String, String> headers = new HashMap<>(2);

services-api/src/main/java/io/scalecube/services/methods/ServiceMethodInvoker.java

Lines changed: 10 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,8 @@
88
import io.scalecube.services.exceptions.ForbiddenException;
99
import io.scalecube.services.exceptions.ServiceProviderErrorMapper;
1010
import io.scalecube.services.transport.api.ServiceMessageDataDecoder;
11-
import java.lang.reflect.Array;
1211
import java.lang.reflect.InvocationTargetException;
1312
import java.lang.reflect.Method;
14-
import java.util.Collection;
1513
import java.util.Map;
1614
import java.util.Objects;
1715
import org.reactivestreams.Publisher;
@@ -73,7 +71,7 @@ public Mono<ServiceMessage> invokeOne(ServiceMessage message) {
7371
logger.debug(
7472
"[{}] request: {}, response: {}",
7573
message.qualifier(),
76-
toString(request),
74+
ServiceMessage.toString(request),
7775
response);
7876
}
7977
})
@@ -83,7 +81,7 @@ public Mono<ServiceMessage> invokeOne(ServiceMessage message) {
8381
logger.error(
8482
"[{}][error] request: {}",
8583
message.qualifier(),
86-
toString(request),
84+
ServiceMessage.toString(request),
8785
ex);
8886
}
8987
});
@@ -119,14 +117,16 @@ public Flux<ServiceMessage> invokeMany(ServiceMessage message) {
119117
logger.debug(
120118
"[{}][subscribe] request: {}",
121119
message.qualifier(),
122-
toString(request));
120+
ServiceMessage.toString(request));
123121
}
124122
})
125123
.doOnComplete(
126124
() -> {
127125
if (logger != null && logger.isDebugEnabled()) {
128126
logger.debug(
129-
"[{}][complete] request: {}", message.qualifier(), toString(request));
127+
"[{}][complete] request: {}",
128+
message.qualifier(),
129+
ServiceMessage.toString(request));
130130
}
131131
})
132132
.doOnError(
@@ -135,7 +135,7 @@ public Flux<ServiceMessage> invokeMany(ServiceMessage message) {
135135
logger.error(
136136
"[{}][error] request: {}",
137137
message.qualifier(),
138-
toString(request),
138+
ServiceMessage.toString(request),
139139
ex);
140140
}
141141
});
@@ -181,7 +181,7 @@ public Flux<ServiceMessage> invokeBidirectional(Publisher<ServiceMessage> publis
181181
logger.debug(
182182
"[{}][subscribe] request: {}",
183183
qualifier,
184-
toString(request));
184+
ServiceMessage.toString(request));
185185
}
186186
})
187187
.doOnComplete(
@@ -190,7 +190,7 @@ public Flux<ServiceMessage> invokeBidirectional(Publisher<ServiceMessage> publis
190190
logger.debug(
191191
"[{}][complete] request: {}",
192192
qualifier,
193-
toString(request));
193+
ServiceMessage.toString(request));
194194
}
195195
})
196196
.doOnError(
@@ -199,7 +199,7 @@ public Flux<ServiceMessage> invokeBidirectional(Publisher<ServiceMessage> publis
199199
logger.error(
200200
"[{}][error] request: {}",
201201
qualifier,
202-
toString(request),
202+
ServiceMessage.toString(request),
203203
ex);
204204
}
205205
})
@@ -322,27 +322,4 @@ private Mono<Principal> mapPrincipal(RequestContext context) {
322322
return Mono.defer(() -> principalMapper.map(context))
323323
.switchIfEmpty(Mono.just(context.principal()));
324324
}
325-
326-
private static String toString(Object request) {
327-
if (request == null) {
328-
return "null";
329-
}
330-
// Handle arrays
331-
if (request.getClass().isArray()) {
332-
return request.getClass().getComponentType().getSimpleName()
333-
+ "["
334-
+ Array.getLength(request)
335-
+ "]";
336-
}
337-
// Handle collections
338-
if (request instanceof Collection<?> collection) {
339-
return collection.getClass().getSimpleName() + "[" + collection.size() + "]";
340-
}
341-
// Handle maps
342-
if (request instanceof Map<?, ?> map) {
343-
return map.getClass().getSimpleName() + "[" + map.size() + "]";
344-
}
345-
// Fallback
346-
return String.valueOf(request);
347-
}
348325
}

services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGatewayAcceptor.java

Lines changed: 20 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,11 @@
2020
import io.netty.handler.codec.http.multipart.HttpData;
2121
import io.scalecube.services.ServiceCall;
2222
import io.scalecube.services.ServiceReference;
23-
import io.scalecube.services.api.DynamicQualifier;
2423
import io.scalecube.services.api.ErrorData;
2524
import io.scalecube.services.api.ServiceMessage;
2625
import io.scalecube.services.api.ServiceMessage.Builder;
2726
import io.scalecube.services.exceptions.BadRequestException;
28-
import io.scalecube.services.exceptions.ServiceException;
27+
import io.scalecube.services.exceptions.InternalServiceException;
2928
import io.scalecube.services.exceptions.ServiceProviderErrorMapper;
3029
import io.scalecube.services.files.FileChannelFlux;
3130
import io.scalecube.services.registry.api.ServiceRegistry;
@@ -40,9 +39,9 @@
4039
import org.reactivestreams.Publisher;
4140
import org.slf4j.Logger;
4241
import org.slf4j.LoggerFactory;
42+
import reactor.core.Exceptions;
4343
import reactor.core.publisher.Flux;
4444
import reactor.core.publisher.Mono;
45-
import reactor.core.publisher.Signal;
4645
import reactor.netty.http.server.HttpServerRequest;
4746
import reactor.netty.http.server.HttpServerResponse;
4847

@@ -281,82 +280,36 @@ private Mono<Void> handleFileDownloadRequest(
281280
.requestMany(message)
282281
.switchOnFirst(
283282
(signal, flux) -> {
284-
final var qualifier = message.qualifier();
285-
final var map =
286-
DynamicQualifier.from("v1/endpoints/:endpointId/files/:name")
287-
.matchQualifier(qualifier);
288-
if (map == null) {
289-
throw new RuntimeException("Wrong qualifier: " + qualifier);
283+
if (signal.hasError()) {
284+
throw Exceptions.propagate(signal.getThrowable());
290285
}
291286

292-
final var filename = map.get("name");
293-
final var statusCode = toStatusCode(signal);
287+
if (!signal.hasValue()) {
288+
throw new InternalServiceException("File stream is missing or invalid");
289+
}
294290

295-
if (statusCode != HttpResponseStatus.OK.code()) {
296-
return response
297-
.status(statusCode)
298-
.sendString(Mono.just(errorMessage(statusCode, filename)))
299-
.then();
291+
final var downloadMessage = signal.get();
292+
if (downloadMessage.isError()) {
293+
return error(response, downloadMessage);
300294
}
301295

302-
final Flux<ByteBuf> responseFlux =
303-
flux.map(
304-
sm -> {
305-
if (sm.isError()) {
306-
throw new RuntimeException("File stream was interrupted");
307-
}
308-
return sm.data();
309-
});
296+
downloadMessage.headers().forEach(response::header);
310297

311298
return response
312-
.header("Content-Type", "application/octet-stream")
313-
.header("Content-Disposition", "attachment; filename=" + filename)
314-
.send(responseFlux)
299+
.send(
300+
flux.skip(1)
301+
.map(
302+
sm -> {
303+
if (sm.isError()) {
304+
throw new RuntimeException("File stream was interrupted");
305+
}
306+
return sm.data();
307+
}))
315308
.then();
316309
})
317310
.then();
318311
}
319312

320-
private static int toStatusCode(Signal<? extends ServiceMessage> signal) {
321-
if (signal.hasError()) {
322-
return toStatusCode(signal.getThrowable());
323-
}
324-
325-
if (!signal.hasValue()) {
326-
return HttpResponseStatus.NO_CONTENT.code();
327-
}
328-
329-
return toStatusCode(signal.get());
330-
}
331-
332-
private static int toStatusCode(Throwable throwable) {
333-
if (throwable instanceof ServiceException e) {
334-
return e.errorCode();
335-
} else {
336-
return HttpResponseStatus.INTERNAL_SERVER_ERROR.code();
337-
}
338-
}
339-
340-
private static int toStatusCode(ServiceMessage serviceMessage) {
341-
if (serviceMessage == null || !serviceMessage.hasData()) {
342-
return HttpResponseStatus.NO_CONTENT.code();
343-
}
344-
345-
if (serviceMessage.isError()) {
346-
return HttpResponseStatus.INTERNAL_SERVER_ERROR.code();
347-
}
348-
349-
return HttpResponseStatus.OK.code();
350-
}
351-
352-
private static String errorMessage(int statusCode, String fileName) {
353-
if (statusCode == 500) {
354-
return "File not found: " + fileName;
355-
} else {
356-
return HttpResponseStatus.valueOf(statusCode).reasonPhrase();
357-
}
358-
}
359-
360313
private static Flux<byte[]> createFileFlux(HttpData httpData) {
361314
try {
362315
if (httpData.isInMemory()) {

0 commit comments

Comments
 (0)