Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.function.Function;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;
import reactor.netty.http.server.HttpServerFormDecoderProvider;
import reactor.netty.resources.LoopResources;

public class HttpGateway implements Gateway {
Expand All @@ -38,6 +39,7 @@ public class HttpGateway implements Gateway {
private final HttpGatewayAuthenticator authenticator;
private final boolean corsEnabled;
private final CorsConfigBuilder corsConfigBuilder;
private final Consumer<HttpServerFormDecoderProvider.Builder> formDecoderBuilder;

private DisposableServer server;
private LoopResources loopResources;
Expand All @@ -50,6 +52,7 @@ private HttpGateway(Builder builder) {
this.authenticator = builder.authenticator;
this.corsEnabled = builder.corsEnabled;
this.corsConfigBuilder = builder.corsConfigBuilder;
this.formDecoderBuilder = builder.formDecoderBuilder;
}

public static Builder builder() {
Expand Down Expand Up @@ -80,6 +83,12 @@ public Gateway start(ServiceCall call, ServiceRegistry serviceRegistry) {
.handle(
new HttpGatewayAcceptor(
callFactory.apply(call), serviceRegistry, errorMapper, authenticator))
.httpFormDecoder(
builder -> {
if (formDecoderBuilder != null) {
formDecoderBuilder.accept(builder);
}
})
.bind()
.toFuture()
.get();
Expand Down Expand Up @@ -129,6 +138,8 @@ public static class Builder {
.allowedRequestHeaders("*")
.exposeHeaders("*")
.maxAge(3600);
private Consumer<HttpServerFormDecoderProvider.Builder> formDecoderBuilder =
builder -> builder.maxSize(100 * 1024 * 1024);

private Builder() {}

Expand Down Expand Up @@ -196,6 +207,15 @@ public Builder corsConfigBuilder(Consumer<CorsConfigBuilder> consumer) {
return this;
}

public Consumer<HttpServerFormDecoderProvider.Builder> formDecoderBuilder() {
return formDecoderBuilder;
}

public Builder formDecoderBuilder(Consumer<HttpServerFormDecoderProvider.Builder> consumer) {
this.formDecoderBuilder = consumer;
return this;
}

public HttpGateway build() {
return new HttpGateway(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.multipart.FileUpload;
Expand Down Expand Up @@ -52,7 +51,6 @@ public class HttpGatewayAcceptor

private static final String ERROR_NAMESPACE = "io.scalecube.services.error";
private static final long MAX_SERVICE_MESSAGE_SIZE = 1024 * 1024;
private static final long MAX_FILE_UPLOAD_SIZE = 100 * 1024 * 1024;

private final ServiceCall serviceCall;
private final ServiceRegistry serviceRegistry;
Expand Down Expand Up @@ -104,35 +102,59 @@ private Mono<Void> handleFileUploadRequest(
Map<String, String> principal,
HttpServerRequest httpRequest,
HttpServerResponse httpResponse) {
return Mono.fromRunnable(() -> validateFileUploadRequest(httpRequest))
.thenMany(httpRequest.receiveForm())
.doOnNext(HttpGatewayAcceptor::doPostFileUploadCheck)
return httpRequest
.receiveForm()
.map(
data -> {
if (!(data instanceof FileUpload file)) {
throw new BadRequestException(
"Non file-upload part is not allowed, name=" + data.getName());
}
return file.retain();
})
.collectList()
.flatMap(
httpData ->
serviceCall
.requestBidirectional(
createFileFlux(httpData)
.map(
data ->
toMessage(
httpRequest,
builder -> {
final var filename =
((FileUpload) httpData).getFilename();
files -> {
if (files.size() != 1) {
return Mono.error(
new BadRequestException(
"Exactly one file-upload part is expected (received: "
+ files.size()
+ ")"));
}

final var fileUpload = files.get(0);
final var filename = fileUpload.getFilename();

return serviceCall
.requestBidirectional(
createFileFlux(fileUpload)
.map(
data ->
toMessage(
httpRequest,
builder ->
builder
.headers(principal)
.header(HEADER_UPLOAD_FILENAME, filename)
.data(data);
})))
.last()
.flatMap(
response ->
response.isError() // check error
? error(httpResponse, response)
: response.hasData() // check data
? ok(httpResponse, response)
: noContent(httpResponse))
.doFinally(signalType -> httpData.delete()))
.data(data))))
.last()
.flatMap(
response ->
response.isError() // check error
? error(httpResponse, response)
: response.hasData() // check data
? ok(httpResponse, response)
: noContent(httpResponse))
.doFinally(
signalType -> {
try {
fileUpload.delete();
} finally {
safestRelease(fileUpload);
}
});
})
.then()
.onErrorResume(ex -> error(httpResponse, errorMapper.toMessage(ERROR_NAMESPACE, ex)));
}
Expand All @@ -150,7 +172,11 @@ private Mono<Void> handleServiceRequest(
final var limit = MAX_SERVICE_MESSAGE_SIZE;
if (readableBytes >= limit) {
throw new BadRequestException(
"Service message is too large, size: " + readableBytes + ", limit: " + limit);
"Service message is too large (size: "
+ readableBytes
+ ", limit: "
+ limit
+ ")");
}
return reduce.writeBytes(byteBuf);
})
Expand Down Expand Up @@ -321,29 +347,4 @@ private static Flux<byte[]> createFileFlux(HttpData httpData) {
throw new RuntimeException(e);
}
}

private static void validateFileUploadRequest(HttpServerRequest httpRequest) {
final var contentLengthHeader =
httpRequest.requestHeaders().get(HttpHeaderNames.CONTENT_LENGTH);
final var contentLength =
contentLengthHeader != null ? Long.parseLong(contentLengthHeader) : -1L;
final var limit = MAX_FILE_UPLOAD_SIZE;
if (contentLength > limit) {
throw new BadRequestException(
"File upload is too large, size: " + contentLength + ", limit: " + limit);
}
}

private static void doPostFileUploadCheck(HttpData httpData) {
if (!(httpData instanceof FileUpload)) {
throw new BadRequestException("File upload is missing or invalid");
}
final long fileSize = httpData.length();
final var limit = MAX_FILE_UPLOAD_SIZE;
if (fileSize > limit) {
httpData.delete();
throw new BadRequestException(
"File upload is too large, size: " + fileSize + ", limit: " + limit);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Function;
import java.util.stream.Stream;
import okhttp3.MediaType;
import okhttp3.MultipartBody;
import okhttp3.OkHttpClient;
Expand All @@ -31,11 +33,15 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import reactor.core.publisher.Mono;

public class FileUploadTest {

private static final int MAX_SIZE = 15 * 1024 * 1024;
private static final int SIZE_OVER_LIMIT = MAX_SIZE << 1;

private static Microservices gateway;
private static Microservices microservices;
private static Address httpAddress;
Expand All @@ -56,7 +62,12 @@ static void beforeAll() {
.options(opts -> opts.metadata(serviceEndpoint)))
.transport(
() -> new RSocketServiceTransport().credentialsSupplier(credentialsSupplier))
.gateway(() -> HttpGateway.builder().id("HTTP").build()));
.gateway(
() ->
HttpGateway.builder()
.id("HTTP")
.formDecoderBuilder(builder -> builder.maxSize(MAX_SIZE))
.build()));

microservices =
Microservices.start(
Expand Down Expand Up @@ -85,9 +96,9 @@ static void afterAll() {
}
}

@ParameterizedTest(name = "Upload file of size {0} bytes")
@ValueSource(longs = {0, 64, 512, 1024, 1024 * 1024, 10 * 1024 * 1024})
public void uploadSuccessfully(long fileSize) throws Exception {
@ParameterizedTest(name = "Upload successfully, size: {0} bytes")
@ValueSource(longs = {0, 512, 1024, 1024 * 1024, 10 * 1024 * 1024})
void uploadSuccessfully(long fileSize) throws Exception {
final var client = new OkHttpClient();
final var file = generateFile(Files.createTempFile("export_report_", null), fileSize);

Expand All @@ -113,10 +124,11 @@ public void uploadSuccessfully(long fileSize) throws Exception {
}
}

@Test
public void uploadFailed() throws Exception {
@ParameterizedTest
@MethodSource("uploadFailedSource")
void uploadFailed(UploadFailedArgs args) throws Exception {
final var client = new OkHttpClient();
final var file = generateFile(Files.createTempFile("export_report_", null), 1024);
final var file = generateFile(Files.createTempFile("export_report_", null), args.fileSize());

final var body =
new MultipartBody.Builder()
Expand All @@ -135,8 +147,47 @@ public void uploadFailed() throws Exception {
assertEquals(
HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), response.code(), "response.code");
assertEquals(
"{\"errorCode\":500,\"errorMessage\":\"Upload report failed: %s\"}"
.formatted(file.getName()),
"{\"errorCode\":%s,\"errorMessage\":\"%s\"}"
.formatted(args.errorCode(), args.errorMessageFunc().apply(file)),
response.body().string(),
"response.body");
}
}

@Test
void onlySingleFileAllowed() throws Exception {
final var client = new OkHttpClient();

final var fileSize = 1024 * 1024;
final var file1 = generateFile(Files.createTempFile("export_report_1_", null), fileSize);
final var file2 = generateFile(Files.createTempFile("export_report_2_", null), fileSize);

final var body =
new MultipartBody.Builder()
.setType(MultipartBody.FORM)
.addFormDataPart(
"file",
file1.getName(),
RequestBody.create(file1, MediaType.get("application/text")))
.addFormDataPart(
"file",
file2.getName(),
RequestBody.create(file2, MediaType.get("application/text")))
.build();

final var request =
new Request.Builder()
.url("http://localhost:" + httpAddress.port() + "/v1/api/uploadReportError")
.post(body)
.build();

try (Response response = client.newCall(request).execute()) {
assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code(), "response.code");
assertEquals(
"{"
+ "\"errorCode\":400,"
+ "\"errorMessage\":\"Exactly one file-upload part is expected (received: 2)\""
+ "}",
response.body().string(),
"response.body");
}
Expand All @@ -152,4 +203,19 @@ private static void assertFilesEqual(Path expected, Path actual) {
throw new RuntimeException(e);
}
}

private static Stream<UploadFailedArgs> uploadFailedSource() {
return Stream.of(
new UploadFailedArgs(
1024 * 1024,
HttpResponseStatus.INTERNAL_SERVER_ERROR.code(),
file -> "Upload report failed: " + file.getName()),
new UploadFailedArgs(
SIZE_OVER_LIMIT,
HttpResponseStatus.INTERNAL_SERVER_ERROR.code(),
file -> "java.io.IOException: Size exceed allowed maximum capacity"));
}

private record UploadFailedArgs(
long fileSize, int errorCode, Function<File, String> errorMessageFunc) {}
}