diff --git a/services-api/src/main/java/io/scalecube/services/api/ServiceMessage.java b/services-api/src/main/java/io/scalecube/services/api/ServiceMessage.java index 30f8633c9..fffb0d91a 100644 --- a/services-api/src/main/java/io/scalecube/services/api/ServiceMessage.java +++ b/services-api/src/main/java/io/scalecube/services/api/ServiceMessage.java @@ -1,5 +1,7 @@ package io.scalecube.services.api; +import java.lang.reflect.Array; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -50,15 +52,25 @@ private ServiceMessage(Builder builder) { } /** - * Instantiates new message with the same data and headers as at given message. + * Instantiates new message builder with the same data and headers as at given message. * * @param message the message to be copied - * @return a new message, with the same data and headers + * @return message builder */ public static Builder from(ServiceMessage message) { return ServiceMessage.builder().data(message.data()).headers(message.headers()); } + /** + * Instantiates new message builder with the headers as at given message. + * + * @param headers the headers to be copied + * @return message builder + */ + public static Builder from(Map headers) { + return ServiceMessage.builder().headers(headers); + } + /** * Instantiates new message with error qualifier for given error type and specified error code and * message. @@ -81,7 +93,7 @@ public static ServiceMessage error( /** * Instantiates new empty message builder. * - * @return new builder + * @return new message builder */ public static Builder builder() { return new Builder(); @@ -196,7 +208,7 @@ public int errorType() { /** * Returns request method header. * - * @return request method, or null if such header doesn't exist. + * @return request method, or null if does not exist */ public String requestMethod() { return headers.get(HEADER_REQUEST_METHOD); @@ -205,7 +217,7 @@ public String requestMethod() { /** * Returns upload filename header. * - * @return upload filename, or null if such header doesn't exist. + * @return request method, or null if does not exist */ public String uploadFilename() { return headers.get(HEADER_UPLOAD_FILENAME); @@ -213,12 +225,35 @@ public String uploadFilename() { @Override public String toString() { - return new StringJoiner(", ", "ServiceMessage" + "[", "]") - .add("headers(" + headers.size() + ")") - .add("data=" + data) + return new StringJoiner(", ", ServiceMessage.class.getSimpleName() + "[", "]") + .add("headers[" + headers.size() + "]") + .add("data=" + toString(data)) .toString(); } + public static String toString(Object request) { + if (request == null) { + return "null"; + } + // Handle arrays + if (request.getClass().isArray()) { + return request.getClass().getComponentType().getSimpleName() + + "[" + + Array.getLength(request) + + "]"; + } + // Handle collections + if (request instanceof Collection collection) { + return collection.getClass().getSimpleName() + "[" + collection.size() + "]"; + } + // Handle maps + if (request instanceof Map map) { + return map.getClass().getSimpleName() + "[" + map.size() + "]"; + } + // Fallback + return String.valueOf(request); + } + public static class Builder { private final Map headers = new HashMap<>(2); diff --git a/services-api/src/main/java/io/scalecube/services/methods/ServiceMethodInvoker.java b/services-api/src/main/java/io/scalecube/services/methods/ServiceMethodInvoker.java index 431354992..d9bbcdc14 100644 --- a/services-api/src/main/java/io/scalecube/services/methods/ServiceMethodInvoker.java +++ b/services-api/src/main/java/io/scalecube/services/methods/ServiceMethodInvoker.java @@ -8,10 +8,8 @@ import io.scalecube.services.exceptions.ForbiddenException; import io.scalecube.services.exceptions.ServiceProviderErrorMapper; import io.scalecube.services.transport.api.ServiceMessageDataDecoder; -import java.lang.reflect.Array; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.util.Collection; import java.util.Map; import java.util.Objects; import org.reactivestreams.Publisher; @@ -73,7 +71,7 @@ public Mono invokeOne(ServiceMessage message) { logger.debug( "[{}] request: {}, response: {}", message.qualifier(), - toString(request), + ServiceMessage.toString(request), response); } }) @@ -83,7 +81,7 @@ public Mono invokeOne(ServiceMessage message) { logger.error( "[{}][error] request: {}", message.qualifier(), - toString(request), + ServiceMessage.toString(request), ex); } }); @@ -119,14 +117,16 @@ public Flux invokeMany(ServiceMessage message) { logger.debug( "[{}][subscribe] request: {}", message.qualifier(), - toString(request)); + ServiceMessage.toString(request)); } }) .doOnComplete( () -> { if (logger != null && logger.isDebugEnabled()) { logger.debug( - "[{}][complete] request: {}", message.qualifier(), toString(request)); + "[{}][complete] request: {}", + message.qualifier(), + ServiceMessage.toString(request)); } }) .doOnError( @@ -135,7 +135,7 @@ public Flux invokeMany(ServiceMessage message) { logger.error( "[{}][error] request: {}", message.qualifier(), - toString(request), + ServiceMessage.toString(request), ex); } }); @@ -181,7 +181,7 @@ public Flux invokeBidirectional(Publisher publis logger.debug( "[{}][subscribe] request: {}", qualifier, - toString(request)); + ServiceMessage.toString(request)); } }) .doOnComplete( @@ -190,7 +190,7 @@ public Flux invokeBidirectional(Publisher publis logger.debug( "[{}][complete] request: {}", qualifier, - toString(request)); + ServiceMessage.toString(request)); } }) .doOnError( @@ -199,7 +199,7 @@ public Flux invokeBidirectional(Publisher publis logger.error( "[{}][error] request: {}", qualifier, - toString(request), + ServiceMessage.toString(request), ex); } }) @@ -322,27 +322,4 @@ private Mono mapPrincipal(RequestContext context) { return Mono.defer(() -> principalMapper.map(context)) .switchIfEmpty(Mono.just(context.principal())); } - - private static String toString(Object request) { - if (request == null) { - return "null"; - } - // Handle arrays - if (request.getClass().isArray()) { - return request.getClass().getComponentType().getSimpleName() - + "[" - + Array.getLength(request) - + "]"; - } - // Handle collections - if (request instanceof Collection collection) { - return collection.getClass().getSimpleName() + "[" + collection.size() + "]"; - } - // Handle maps - if (request instanceof Map map) { - return map.getClass().getSimpleName() + "[" + map.size() + "]"; - } - // Fallback - return String.valueOf(request); - } } diff --git a/services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGatewayAcceptor.java b/services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGatewayAcceptor.java index acbb692d1..ffcc40275 100644 --- a/services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGatewayAcceptor.java +++ b/services-gateway/src/main/java/io/scalecube/services/gateway/http/HttpGatewayAcceptor.java @@ -20,12 +20,11 @@ import io.netty.handler.codec.http.multipart.HttpData; import io.scalecube.services.ServiceCall; import io.scalecube.services.ServiceReference; -import io.scalecube.services.api.DynamicQualifier; import io.scalecube.services.api.ErrorData; import io.scalecube.services.api.ServiceMessage; import io.scalecube.services.api.ServiceMessage.Builder; import io.scalecube.services.exceptions.BadRequestException; -import io.scalecube.services.exceptions.ServiceException; +import io.scalecube.services.exceptions.InternalServiceException; import io.scalecube.services.exceptions.ServiceProviderErrorMapper; import io.scalecube.services.files.FileChannelFlux; import io.scalecube.services.registry.api.ServiceRegistry; @@ -40,9 +39,9 @@ import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.Signal; import reactor.netty.http.server.HttpServerRequest; import reactor.netty.http.server.HttpServerResponse; @@ -281,82 +280,36 @@ private Mono handleFileDownloadRequest( .requestMany(message) .switchOnFirst( (signal, flux) -> { - final var qualifier = message.qualifier(); - final var map = - DynamicQualifier.from("v1/endpoints/:endpointId/files/:name") - .matchQualifier(qualifier); - if (map == null) { - throw new RuntimeException("Wrong qualifier: " + qualifier); + if (signal.hasError()) { + throw Exceptions.propagate(signal.getThrowable()); } - final var filename = map.get("name"); - final var statusCode = toStatusCode(signal); + if (!signal.hasValue()) { + throw new InternalServiceException("File stream is missing or invalid"); + } - if (statusCode != HttpResponseStatus.OK.code()) { - return response - .status(statusCode) - .sendString(Mono.just(errorMessage(statusCode, filename))) - .then(); + final var downloadMessage = signal.get(); + if (downloadMessage.isError()) { + return error(response, downloadMessage); } - final Flux responseFlux = - flux.map( - sm -> { - if (sm.isError()) { - throw new RuntimeException("File stream was interrupted"); - } - return sm.data(); - }); + downloadMessage.headers().forEach(response::header); return response - .header("Content-Type", "application/octet-stream") - .header("Content-Disposition", "attachment; filename=" + filename) - .send(responseFlux) + .send( + flux.skip(1) + .map( + sm -> { + if (sm.isError()) { + throw new RuntimeException("File stream was interrupted"); + } + return sm.data(); + })) .then(); }) .then(); } - private static int toStatusCode(Signal signal) { - if (signal.hasError()) { - return toStatusCode(signal.getThrowable()); - } - - if (!signal.hasValue()) { - return HttpResponseStatus.NO_CONTENT.code(); - } - - return toStatusCode(signal.get()); - } - - private static int toStatusCode(Throwable throwable) { - if (throwable instanceof ServiceException e) { - return e.errorCode(); - } else { - return HttpResponseStatus.INTERNAL_SERVER_ERROR.code(); - } - } - - private static int toStatusCode(ServiceMessage serviceMessage) { - if (serviceMessage == null || !serviceMessage.hasData()) { - return HttpResponseStatus.NO_CONTENT.code(); - } - - if (serviceMessage.isError()) { - return HttpResponseStatus.INTERNAL_SERVER_ERROR.code(); - } - - return HttpResponseStatus.OK.code(); - } - - private static String errorMessage(int statusCode, String fileName) { - if (statusCode == 500) { - return "File not found: " + fileName; - } else { - return HttpResponseStatus.valueOf(statusCode).reasonPhrase(); - } - } - private static Flux createFileFlux(HttpData httpData) { try { if (httpData.isInMemory()) { diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/files/FileDownloadTest.java b/services-gateway/src/test/java/io/scalecube/services/gateway/files/FileDownloadTest.java index a4f377dbf..5f19d917e 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/files/FileDownloadTest.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/files/FileDownloadTest.java @@ -5,6 +5,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.anyString; @@ -33,15 +34,16 @@ import java.net.http.HttpResponse; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; import java.time.Duration; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import reactor.core.Exceptions; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -119,170 +121,212 @@ void afterEach() { } } - @ParameterizedTest(name = "Download file of size {0} bytes") - @ValueSource(longs = {512, 1024, 1024 * 1024, 10 * 1024 * 1024}) - void testDownloadSuccessfully(long fileSize) throws IOException { - final var reportResponse = - serviceCall - .api(ReportService.class) - .exportReport(new ExportReportRequest().fileSize(fileSize)) - .block(TIMEOUT); - assertNotNull(reportResponse, "reportResponse"); - assertNotNull(reportResponse.reportPath(), "reportResponse.reportPath"); - assertTrue(reportResponse.reportPath().matches("^v1/endpoints/.*/files/.*$")); - - final var file = - downloadFile("http://localhost:" + httpAddress.port() + "/" + reportResponse.reportPath()); - final var list = Files.readAllLines(file); - - assertTrue(file.toFile().length() >= fileSize, "fileSize: " + file.toFile().length()); - for (String s : list) { - assertTrue(s.startsWith("export report @"), "line: " + s); + @Nested + class FileServiceTests { + + @ParameterizedTest(name = "Download file of size {0} bytes") + @ValueSource(longs = {0, 512, 1024, 1024 * 1024, 10 * 1024 * 1024}) + void testDownloadSuccessfully(long fileSize) throws Exception { + final var reportResponse = + serviceCall + .api(ReportService.class) + .exportReport(new ExportReportRequest().fileSize(fileSize)) + .block(TIMEOUT); + assertNotNull(reportResponse, "reportResponse"); + assertNotNull(reportResponse.reportPath(), "reportResponse.reportPath"); + assertTrue(reportResponse.reportPath().matches("^v1/endpoints/.*/files/.*$")); + + final var file = + downloadFile( + "http://localhost:" + httpAddress.port() + "/" + reportResponse.reportPath()); + assertTrue(Files.exists(file), "File does not exist"); + assertTrue(file.toFile().length() >= fileSize, "fileSize: " + file.toFile().length()); + for (String s : Files.readAllLines(file)) { + assertTrue(s.startsWith("export report @"), "line: " + s); + } } - } - @Test - void testFileExpired() throws InterruptedException { - final var ttl = 500; - final var reportResponse = - serviceCall - .api(ReportService.class) - .exportReport(new ExportReportRequest().ttl(ttl)) - .block(TIMEOUT); - assertNotNull(reportResponse, "reportResponse"); - assertNotNull(reportResponse.reportPath(), "reportResponse.reportPath"); - assertTrue(reportResponse.reportPath().matches("^v1/endpoints/.*/files/.*$")); + @Test + void testFileExpired() throws Exception { + final var ttl = 500; + final var reportResponse = + serviceCall + .api(ReportService.class) + .exportReport(new ExportReportRequest().ttl(ttl)) + .block(TIMEOUT); + assertNotNull(reportResponse, "reportResponse"); + assertNotNull(reportResponse.reportPath(), "reportResponse.reportPath"); + assertTrue(reportResponse.reportPath().matches("^v1/endpoints/.*/files/.*$")); - // Download file first time + // Download file first time - downloadFile("http://localhost:" + httpAddress.port() + "/" + reportResponse.reportPath()); + downloadFile("http://localhost:" + httpAddress.port() + "/" + reportResponse.reportPath()); - // Await file expiration + // Await file expiration - Thread.sleep(ttl * 3); + Thread.sleep(ttl * 3); - // Verify that file is expired + // Verify that file is expired - try { - downloadFile("http://localhost:" + httpAddress.port() + "/" + reportResponse.reportPath()); - fail("Expected exception"); - } catch (Exception e) { - final var cause = getRootCause(e); - assertInstanceOf(IOException.class, cause); - final var ex = (IOException) cause; - final var errorType = InternalServiceException.ERROR_TYPE; - final var message = ex.getMessage(); - assertTrue( - message.startsWith("No Content-Disposition header in response [" + errorType), - "message: " + message); + try { + downloadFile("http://localhost:" + httpAddress.port() + "/" + reportResponse.reportPath()); + fail("Expected exception"); + } catch (Exception e) { + final var cause = getRootCause(e); + assertInstanceOf(IOException.class, cause); + final var ex = (IOException) cause; + final var errorType = InternalServiceException.ERROR_TYPE; + final var message = ex.getMessage(); + assertTrue( + message.startsWith("No Content-Disposition header in response [" + errorType), + "message: " + message); + } } - } - @Test - void testWrongFile() { - StepVerifier.create(serviceCall.api(ReportService.class).exportReportWrongFile()) - .expectSubscription() - .verifyErrorSatisfies( - ex -> { - assertInstanceOf(InternalServiceException.class, ex, "exceptionType"); - final var serviceException = (InternalServiceException) ex; - assertEquals(500, serviceException.errorCode()); - assertTrue(serviceException.getMessage().startsWith("Wrong file: target")); - }); - } + @Test + void testWrongFile() { + StepVerifier.create(serviceCall.api(ReportService.class).exportReportWrongFile()) + .expectSubscription() + .verifyErrorSatisfies( + ex -> { + assertInstanceOf(InternalServiceException.class, ex, "exceptionType"); + final var serviceException = (InternalServiceException) ex; + assertEquals(500, serviceException.errorCode()); + assertTrue(serviceException.getMessage().startsWith("Wrong file: target")); + }); + } - @Test - void testFileNotFound() { - final var reportResponse = - serviceCall.api(ReportService.class).exportReport(new ExportReportRequest()).block(TIMEOUT); - assertNotNull(reportResponse, "reportResponse"); - assertNotNull(reportResponse.reportPath(), "reportResponse.reportPath"); - assertTrue(reportResponse.reportPath().matches("^v1/endpoints/.*/files/.*$")); - - final var reportPath = reportResponse.reportPath(); - final var s = reportPath.substring(reportPath.lastIndexOf("/")); - final var newReportPath = - reportPath.replace(s, "/file_must_not_be_found_" + System.currentTimeMillis()); - - try { - downloadFile("http://localhost:" + httpAddress.port() + "/" + newReportPath); - fail("Expected exception"); - } catch (Exception e) { - final var cause = getRootCause(e); - assertInstanceOf(IOException.class, cause); - final var ex = (IOException) cause; - final var errorType = InternalServiceException.ERROR_TYPE; - final var message = ex.getMessage(); - assertTrue( - message.startsWith("No Content-Disposition header in response [" + errorType), - "message: " + message); + @Test + void testFileNotFound() { + final var reportResponse = + serviceCall + .api(ReportService.class) + .exportReport(new ExportReportRequest()) + .block(TIMEOUT); + assertNotNull(reportResponse, "reportResponse"); + assertNotNull(reportResponse.reportPath(), "reportResponse.reportPath"); + assertTrue(reportResponse.reportPath().matches("^v1/endpoints/.*/files/.*$")); + + final var reportPath = reportResponse.reportPath(); + final var s = reportPath.substring(reportPath.lastIndexOf("/")); + final var newReportPath = + reportPath.replace(s, "/file_must_not_be_found_" + System.currentTimeMillis()); + + try { + downloadFile("http://localhost:" + httpAddress.port() + "/" + newReportPath); + fail("Expected exception"); + } catch (Exception e) { + final var cause = getRootCause(e); + assertInstanceOf(IOException.class, cause); + final var ex = (IOException) cause; + final var errorType = InternalServiceException.ERROR_TYPE; + final var message = ex.getMessage(); + assertTrue( + message.startsWith("No Content-Disposition header in response [" + errorType), + "message: " + message); + } } - } - @Test - void testWrongFilename() { - final var reportResponse = - serviceCall.api(ReportService.class).exportReport(new ExportReportRequest()).block(TIMEOUT); - assertNotNull(reportResponse, "reportResponse"); - assertNotNull(reportResponse.reportPath(), "reportResponse.reportPath"); - assertTrue(reportResponse.reportPath().matches("^v1/endpoints/.*/files/.*$")); - - final var reportPath = reportResponse.reportPath(); - final var s = reportPath.substring(reportPath.lastIndexOf("/")); - final var newReportPath = reportPath.replace(s, ""); - - try { - downloadFile("http://localhost:" + httpAddress.port() + "/" + newReportPath); - fail("Expected exception"); - } catch (Exception e) { - final var cause = getRootCause(e); - assertInstanceOf(IOException.class, cause); - final var ex = (IOException) cause; - final var errorType = ServiceUnavailableException.ERROR_TYPE; - final var message = ex.getMessage(); - assertTrue( - message.startsWith("No Content-Disposition header in response [" + errorType), - "message: " + message); + @Test + void testWrongFilename() { + final var reportResponse = + serviceCall + .api(ReportService.class) + .exportReport(new ExportReportRequest()) + .block(TIMEOUT); + assertNotNull(reportResponse, "reportResponse"); + assertNotNull(reportResponse.reportPath(), "reportResponse.reportPath"); + assertTrue(reportResponse.reportPath().matches("^v1/endpoints/.*/files/.*$")); + + final var reportPath = reportResponse.reportPath(); + final var s = reportPath.substring(reportPath.lastIndexOf("/")); + final var newReportPath = reportPath.replace(s, ""); + + try { + downloadFile("http://localhost:" + httpAddress.port() + "/" + newReportPath); + fail("Expected exception"); + } catch (Exception e) { + final var cause = getRootCause(e); + assertInstanceOf(IOException.class, cause); + final var ex = (IOException) cause; + final var errorType = ServiceUnavailableException.ERROR_TYPE; + final var message = ex.getMessage(); + assertTrue( + message.startsWith("No Content-Disposition header in response [" + errorType), + "message: " + message); + } } - } - @Test - void testAnotherWrongFilename() { - final var reportResponse = - serviceCall.api(ReportService.class).exportReport(new ExportReportRequest()).block(TIMEOUT); - assertNotNull(reportResponse, "reportResponse"); - assertNotNull(reportResponse.reportPath(), "reportResponse.reportPath"); - assertTrue(reportResponse.reportPath().matches("^v1/endpoints/.*/files/.*$")); - - final var reportPath = reportResponse.reportPath(); - final var s = reportPath.substring(reportPath.lastIndexOf("/")); - final var newReportPath = reportPath.replace(s, "/"); - - try { - downloadFile("http://localhost:" + httpAddress.port() + "/" + newReportPath); - fail("Expected exception"); - } catch (Exception e) { - final var cause = getRootCause(e); - assertInstanceOf(IOException.class, cause); - final var ex = (IOException) cause; - final var errorType = ServiceUnavailableException.ERROR_TYPE; - final var message = ex.getMessage(); - assertTrue( - message.startsWith("No Content-Disposition header in response [" + errorType), - "message: " + message); + @Test + void testAnotherWrongFilename() { + final var reportResponse = + serviceCall + .api(ReportService.class) + .exportReport(new ExportReportRequest()) + .block(TIMEOUT); + assertNotNull(reportResponse, "reportResponse"); + assertNotNull(reportResponse.reportPath(), "reportResponse.reportPath"); + assertTrue(reportResponse.reportPath().matches("^v1/endpoints/.*/files/.*$")); + + final var reportPath = reportResponse.reportPath(); + final var s = reportPath.substring(reportPath.lastIndexOf("/")); + final var newReportPath = reportPath.replace(s, "/"); + + try { + downloadFile("http://localhost:" + httpAddress.port() + "/" + newReportPath); + fail("Expected exception"); + } catch (Exception e) { + final var cause = getRootCause(e); + assertInstanceOf(IOException.class, cause); + final var ex = (IOException) cause; + final var errorType = ServiceUnavailableException.ERROR_TYPE; + final var message = ex.getMessage(); + assertTrue( + message.startsWith("No Content-Disposition header in response [" + errorType), + "message: " + message); + } } } - private static Path downloadFile(String reportUrl) { - try { - return HttpClient.newHttpClient() - .send( - HttpRequest.newBuilder().uri(URI.create(reportUrl)).build(), - HttpResponse.BodyHandlers.ofFileDownload(Path.of("target"), CREATE, WRITE)) - .body(); - } catch (Exception e) { - throw Exceptions.propagate(e); + @Nested + class DirectDownloadTests { + + @ParameterizedTest(name = "Fail on download: {0}") + @ValueSource( + strings = { + "immediateErrorOnDownload", + "emptyOnDownload", + "missingContentDispositionHeaderOnDownload" + }) + void failOnDownload(String path) { + final var exception = + assertThrows( + IOException.class, + () -> downloadFile("http://localhost:" + httpAddress.port() + "/v1/api/" + path)); + assertTrue(exception.getMessage().contains("No Content-Disposition header")); } + + @ParameterizedTest(name = "Download file of size {0} bytes") + @ValueSource(longs = {0, 512, 1024, 1024 * 1024, 10 * 1024 * 1024}) + void successfulDownload(long fileSize) throws Exception { + final var file = + downloadFile( + "http://localhost:" + httpAddress.port() + "/v1/api/successfulDownload/" + fileSize); + assertTrue(Files.exists(file), "File does not exist"); + assertTrue(file.toFile().length() >= fileSize, "fileSize: " + file.toFile().length()); + for (String s : Files.readAllLines(file)) { + assertTrue(s.startsWith("export report @"), "line: " + s); + } + } + } + + private static Path downloadFile(String reportUrl) throws Exception { + Path tmpPath = Paths.get(System.getProperty("java.io.tmpdir")); + return HttpClient.newHttpClient() + .send( + HttpRequest.newBuilder().uri(URI.create(reportUrl)).build(), + HttpResponse.BodyHandlers.ofFileDownload(tmpPath, CREATE, WRITE)) + .body(); } private static Throwable getRootCause(Throwable throwable) { diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/files/ReportService.java b/services-gateway/src/test/java/io/scalecube/services/gateway/files/ReportService.java index 22184075a..8fab07fee 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/files/ReportService.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/files/ReportService.java @@ -1,8 +1,11 @@ package io.scalecube.services.gateway.files; import io.scalecube.services.annotations.RequestType; +import io.scalecube.services.annotations.ResponseType; import io.scalecube.services.annotations.Service; import io.scalecube.services.annotations.ServiceMethod; +import io.scalecube.services.annotations.Tag; +import io.scalecube.services.api.ServiceMessage; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -22,4 +25,24 @@ public interface ReportService { @RequestType(byte[].class) @ServiceMethod Mono uploadReportError(Flux reportStream); + + @Tag(key = "Content-Type", value = "application/file") + @ResponseType(byte[].class) + @ServiceMethod + Flux immediateErrorOnDownload(); + + @Tag(key = "Content-Type", value = "application/file") + @ResponseType(byte[].class) + @ServiceMethod + Flux emptyOnDownload(); + + @Tag(key = "Content-Type", value = "application/file") + @ResponseType(byte[].class) + @ServiceMethod + Flux missingContentDispositionHeaderOnDownload(); + + @Tag(key = "Content-Type", value = "application/file") + @ResponseType(byte[].class) + @ServiceMethod("successfulDownload/:fileSize") + Flux successfulDownload(); } diff --git a/services-gateway/src/test/java/io/scalecube/services/gateway/files/ReportServiceImpl.java b/services-gateway/src/test/java/io/scalecube/services/gateway/files/ReportServiceImpl.java index ab43dff36..92ae3fba0 100644 --- a/services-gateway/src/test/java/io/scalecube/services/gateway/files/ReportServiceImpl.java +++ b/services-gateway/src/test/java/io/scalecube/services/gateway/files/ReportServiceImpl.java @@ -5,7 +5,9 @@ import io.scalecube.services.RequestContext; import io.scalecube.services.annotations.AfterConstruct; +import io.scalecube.services.api.ServiceMessage; import io.scalecube.services.files.AddFileRequest; +import io.scalecube.services.files.FileChannelFlux; import io.scalecube.services.files.FileService; import java.io.File; import java.io.IOException; @@ -85,6 +87,63 @@ public Mono uploadReportError(Flux reportStream) { }); } + @Override + public Flux immediateErrorOnDownload() { + return Flux.error(new RuntimeException("Immediate error on download")); + } + + @Override + public Flux emptyOnDownload() { + return Flux.empty(); + } + + @Override + public Flux missingContentDispositionHeaderOnDownload() { + return RequestContext.deferContextual() + .flatMapMany( + context -> { + final var headers = context.headers(); + + final var message = + ServiceMessage.from(headers) + // .header("Content-Type", "application/octet-stream") + // .header("Content-Disposition", "attachment; filename=" + name) + .build(); + + return Flux.just(message) + .concatWith( + Flux.just(new byte[0]) + .map(bytes -> ServiceMessage.from(headers).data(bytes).build())); + }); + } + + @Override + public Flux successfulDownload() { + return RequestContext.deferContextual() + .flatMapMany( + context -> { + final var fileSize = context.pathVar("fileSize", Long.class); + final var headers = context.headers(); + final File file; + try { + file = generateFile(Files.createTempFile("export_report_", null), fileSize); + } catch (IOException e) { + throw new RuntimeException(e); + } + + final var message = + ServiceMessage.from(headers) + .header("Content-Type", "application/octet-stream") + .header("Content-Disposition", "attachment; filename=" + file.getName()) + .build(); + + return Flux.just(message) + .concatWith( + FileChannelFlux.createFrom(file.toPath()) + .map(bytes -> ServiceMessage.from(headers).data(bytes).build())); + }); + } + public static File generateFile(final Path file, final long maxSize) throws IOException { String lineTemplate = "export report @ "; byte[] lineBytes; diff --git a/services/src/main/java/io/scalecube/services/files/FileService.java b/services/src/main/java/io/scalecube/services/files/FileService.java index ef2cb9d97..f04cdc8fb 100644 --- a/services/src/main/java/io/scalecube/services/files/FileService.java +++ b/services/src/main/java/io/scalecube/services/files/FileService.java @@ -5,18 +5,18 @@ import reactor.core.publisher.Mono; /** - * Service interface for adding files locally, those added files will be accessible by {@link + * Service interface for adding files locally. Added files will be accessible by {@link * FileStreamer}. Typical usage: client defines an app service with injected {@link FileService}, * client generates a file in the app service, then calls {@link #addFile(AddFileRequest)}, then * returns result (file path qualifier) all the way back to the caller of app service. On the caller - * side file path qualifier gets combined with http-gateway address, and then url for file streaming - * is ready. Then caller side has time to download a file until file gets expired. + * side file path qualifier gets combined with http-gateway address, and then url for file download + * is ready. */ @Service public interface FileService { /** - * Adding file and returning path qualifier for the added file. {@link AddFileRequest} must + * Adds a file and returning path qualifier for the added file. {@link AddFileRequest} must * contain {@code file} that exists, that is not directory, and must have valid path, another * parameter - {@code ttl} (optional) represents time after which file will be deleted. Returned * file path qualifier comes as: {@code v1/scalecube.endpoints/${microservices:id}/files/:name}, diff --git a/services/src/main/java/io/scalecube/services/files/FileServiceImpl.java b/services/src/main/java/io/scalecube/services/files/FileServiceImpl.java index cd6517b8d..e769bfe5f 100644 --- a/services/src/main/java/io/scalecube/services/files/FileServiceImpl.java +++ b/services/src/main/java/io/scalecube/services/files/FileServiceImpl.java @@ -3,6 +3,7 @@ import io.scalecube.services.Microservices; import io.scalecube.services.RequestContext; import io.scalecube.services.annotations.AfterConstruct; +import io.scalecube.services.api.ServiceMessage; import java.io.File; import java.io.FileNotFoundException; import java.nio.file.Files; @@ -90,17 +91,28 @@ public Mono addFile(AddFileRequest request) { } @Override - public Flux streamFile() { + public Flux streamFile() { return RequestContext.deferContextual() .flatMapMany( context -> { - final var name = context.pathVar("name"); - final var filePath = baseDir.resolve(name); - if (!isPathValid(filePath)) { - return Flux.error(new FileNotFoundException("File not found: " + name)); - } else { - return FileChannelFlux.createFrom(filePath, chunkSize); + final var headers = context.headers(); + final var filename = context.pathVar("filename"); + final var path = baseDir.resolve(filename); + + if (!isPathValid(path)) { + return Flux.error(new FileNotFoundException("File not found: " + filename)); } + + final var message = + ServiceMessage.from(headers) + .header("Content-Type", "application/octet-stream") + .header("Content-Disposition", "attachment; filename=" + filename) + .build(); + + return Flux.just(message) + .concatWith( + FileChannelFlux.createFrom(path, chunkSize) + .map(bytes -> ServiceMessage.from(headers).data(bytes).build())); }); } diff --git a/services/src/main/java/io/scalecube/services/files/FileStreamer.java b/services/src/main/java/io/scalecube/services/files/FileStreamer.java index 3e7cd17a0..704086abf 100644 --- a/services/src/main/java/io/scalecube/services/files/FileStreamer.java +++ b/services/src/main/java/io/scalecube/services/files/FileStreamer.java @@ -1,15 +1,17 @@ package io.scalecube.services.files; +import io.scalecube.services.annotations.ResponseType; import io.scalecube.services.annotations.RestMethod; import io.scalecube.services.annotations.Service; import io.scalecube.services.annotations.ServiceMethod; import io.scalecube.services.annotations.Tag; +import io.scalecube.services.api.ServiceMessage; import reactor.core.publisher.Flux; /** * System service interface for streaming files after they have been added locally with {@link * FileService#addFile(AddFileRequest)}. NOTE: this is system service interface, clients are not - * supposed to inject it into their app services and call it directly. + * supposed to use it directly. */ @Service(FileStreamer.NAMESPACE) public interface FileStreamer { @@ -18,6 +20,7 @@ public interface FileStreamer { @Tag(key = "Content-Type", value = "application/file") @RestMethod("GET") - @ServiceMethod("${microservices:id}/files/:name") - Flux streamFile(); + @ResponseType(byte[].class) + @ServiceMethod("${microservices:id}/files/:filename") + Flux streamFile(); }