Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.scalecube.services.api;

import java.lang.reflect.Array;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -50,15 +52,25 @@ private ServiceMessage(Builder builder) {
}

/**
* Instantiates new message with the same data and headers as at given message.
* Instantiates new message builder with the same data and headers as at given message.
*
* @param message the message to be copied
* @return a new message, with the same data and headers
* @return message builder
*/
public static Builder from(ServiceMessage message) {
return ServiceMessage.builder().data(message.data()).headers(message.headers());
}

/**
* Instantiates new message builder with the headers as at given message.
*
* @param headers the headers to be copied
* @return message builder
*/
public static Builder from(Map<String, String> headers) {
return ServiceMessage.builder().headers(headers);
}

/**
* Instantiates new message with error qualifier for given error type and specified error code and
* message.
Expand All @@ -81,7 +93,7 @@ public static ServiceMessage error(
/**
* Instantiates new empty message builder.
*
* @return new builder
* @return new message builder
*/
public static Builder builder() {
return new Builder();
Expand Down Expand Up @@ -196,7 +208,7 @@ public int errorType() {
/**
* Returns request method header.
*
* @return request method, or null if such header doesn't exist.
* @return request method, or null if does not exist
*/
public String requestMethod() {
return headers.get(HEADER_REQUEST_METHOD);
Expand All @@ -205,20 +217,43 @@ public String requestMethod() {
/**
* Returns upload filename header.
*
* @return upload filename, or null if such header doesn't exist.
* @return request method, or null if does not exist
*/
public String uploadFilename() {
return headers.get(HEADER_UPLOAD_FILENAME);
}

@Override
public String toString() {
return new StringJoiner(", ", "ServiceMessage" + "[", "]")
.add("headers(" + headers.size() + ")")
.add("data=" + data)
return new StringJoiner(", ", ServiceMessage.class.getSimpleName() + "[", "]")
.add("headers[" + headers.size() + "]")
.add("data=" + toString(data))
.toString();
}

public static String toString(Object request) {
if (request == null) {
return "null";
}
// Handle arrays
if (request.getClass().isArray()) {
return request.getClass().getComponentType().getSimpleName()
+ "["
+ Array.getLength(request)
+ "]";
}
// Handle collections
if (request instanceof Collection<?> collection) {
return collection.getClass().getSimpleName() + "[" + collection.size() + "]";
}
// Handle maps
if (request instanceof Map<?, ?> map) {
return map.getClass().getSimpleName() + "[" + map.size() + "]";
}
// Fallback
return String.valueOf(request);
}

public static class Builder {

private final Map<String, String> headers = new HashMap<>(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@
import io.scalecube.services.exceptions.ForbiddenException;
import io.scalecube.services.exceptions.ServiceProviderErrorMapper;
import io.scalecube.services.transport.api.ServiceMessageDataDecoder;
import java.lang.reflect.Array;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -73,7 +71,7 @@ public Mono<ServiceMessage> invokeOne(ServiceMessage message) {
logger.debug(
"[{}] request: {}, response: {}",
message.qualifier(),
toString(request),
ServiceMessage.toString(request),
response);
}
})
Expand All @@ -83,7 +81,7 @@ public Mono<ServiceMessage> invokeOne(ServiceMessage message) {
logger.error(
"[{}][error] request: {}",
message.qualifier(),
toString(request),
ServiceMessage.toString(request),
ex);
}
});
Expand Down Expand Up @@ -119,14 +117,16 @@ public Flux<ServiceMessage> invokeMany(ServiceMessage message) {
logger.debug(
"[{}][subscribe] request: {}",
message.qualifier(),
toString(request));
ServiceMessage.toString(request));
}
})
.doOnComplete(
() -> {
if (logger != null && logger.isDebugEnabled()) {
logger.debug(
"[{}][complete] request: {}", message.qualifier(), toString(request));
"[{}][complete] request: {}",
message.qualifier(),
ServiceMessage.toString(request));
}
})
.doOnError(
Expand All @@ -135,7 +135,7 @@ public Flux<ServiceMessage> invokeMany(ServiceMessage message) {
logger.error(
"[{}][error] request: {}",
message.qualifier(),
toString(request),
ServiceMessage.toString(request),
ex);
}
});
Expand Down Expand Up @@ -181,7 +181,7 @@ public Flux<ServiceMessage> invokeBidirectional(Publisher<ServiceMessage> publis
logger.debug(
"[{}][subscribe] request: {}",
qualifier,
toString(request));
ServiceMessage.toString(request));
}
})
.doOnComplete(
Expand All @@ -190,7 +190,7 @@ public Flux<ServiceMessage> invokeBidirectional(Publisher<ServiceMessage> publis
logger.debug(
"[{}][complete] request: {}",
qualifier,
toString(request));
ServiceMessage.toString(request));
}
})
.doOnError(
Expand All @@ -199,7 +199,7 @@ public Flux<ServiceMessage> invokeBidirectional(Publisher<ServiceMessage> publis
logger.error(
"[{}][error] request: {}",
qualifier,
toString(request),
ServiceMessage.toString(request),
ex);
}
})
Expand Down Expand Up @@ -322,27 +322,4 @@ private Mono<Principal> mapPrincipal(RequestContext context) {
return Mono.defer(() -> principalMapper.map(context))
.switchIfEmpty(Mono.just(context.principal()));
}

private static String toString(Object request) {
if (request == null) {
return "null";
}
// Handle arrays
if (request.getClass().isArray()) {
return request.getClass().getComponentType().getSimpleName()
+ "["
+ Array.getLength(request)
+ "]";
}
// Handle collections
if (request instanceof Collection<?> collection) {
return collection.getClass().getSimpleName() + "[" + collection.size() + "]";
}
// Handle maps
if (request instanceof Map<?, ?> map) {
return map.getClass().getSimpleName() + "[" + map.size() + "]";
}
// Fallback
return String.valueOf(request);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@
import io.netty.handler.codec.http.multipart.HttpData;
import io.scalecube.services.ServiceCall;
import io.scalecube.services.ServiceReference;
import io.scalecube.services.api.DynamicQualifier;
import io.scalecube.services.api.ErrorData;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.api.ServiceMessage.Builder;
import io.scalecube.services.exceptions.BadRequestException;
import io.scalecube.services.exceptions.ServiceException;
import io.scalecube.services.exceptions.InternalServiceException;
import io.scalecube.services.exceptions.ServiceProviderErrorMapper;
import io.scalecube.services.files.FileChannelFlux;
import io.scalecube.services.registry.api.ServiceRegistry;
Expand All @@ -40,9 +39,9 @@
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;

Expand Down Expand Up @@ -281,82 +280,36 @@ private Mono<Void> handleFileDownloadRequest(
.requestMany(message)
.switchOnFirst(
(signal, flux) -> {
final var qualifier = message.qualifier();
final var map =
DynamicQualifier.from("v1/endpoints/:endpointId/files/:name")
.matchQualifier(qualifier);
if (map == null) {
throw new RuntimeException("Wrong qualifier: " + qualifier);
if (signal.hasError()) {
throw Exceptions.propagate(signal.getThrowable());
}

final var filename = map.get("name");
final var statusCode = toStatusCode(signal);
if (!signal.hasValue()) {
throw new InternalServiceException("File stream is missing or invalid");
}

if (statusCode != HttpResponseStatus.OK.code()) {
return response
.status(statusCode)
.sendString(Mono.just(errorMessage(statusCode, filename)))
.then();
final var downloadMessage = signal.get();
if (downloadMessage.isError()) {
return error(response, downloadMessage);
}

final Flux<ByteBuf> responseFlux =
flux.map(
sm -> {
if (sm.isError()) {
throw new RuntimeException("File stream was interrupted");
}
return sm.data();
});
downloadMessage.headers().forEach(response::header);

return response
.header("Content-Type", "application/octet-stream")
.header("Content-Disposition", "attachment; filename=" + filename)
.send(responseFlux)
.send(
flux.skip(1)
.map(
sm -> {
if (sm.isError()) {
throw new RuntimeException("File stream was interrupted");
}
return sm.data();
}))
.then();
})
.then();
}

private static int toStatusCode(Signal<? extends ServiceMessage> signal) {
if (signal.hasError()) {
return toStatusCode(signal.getThrowable());
}

if (!signal.hasValue()) {
return HttpResponseStatus.NO_CONTENT.code();
}

return toStatusCode(signal.get());
}

private static int toStatusCode(Throwable throwable) {
if (throwable instanceof ServiceException e) {
return e.errorCode();
} else {
return HttpResponseStatus.INTERNAL_SERVER_ERROR.code();
}
}

private static int toStatusCode(ServiceMessage serviceMessage) {
if (serviceMessage == null || !serviceMessage.hasData()) {
return HttpResponseStatus.NO_CONTENT.code();
}

if (serviceMessage.isError()) {
return HttpResponseStatus.INTERNAL_SERVER_ERROR.code();
}

return HttpResponseStatus.OK.code();
}

private static String errorMessage(int statusCode, String fileName) {
if (statusCode == 500) {
return "File not found: " + fileName;
} else {
return HttpResponseStatus.valueOf(statusCode).reasonPhrase();
}
}

private static Flux<byte[]> createFileFlux(HttpData httpData) {
try {
if (httpData.isInMemory()) {
Expand Down
Loading