Skip to content

WIP: gprc interceptor #19005

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/
package org.opensearch.transport.grpc;

import io.grpc.ServerInterceptor;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.network.NetworkService;
Expand All @@ -28,6 +29,8 @@
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.AuxTransport;
import org.opensearch.transport.client.Client;
import org.opensearch.transport.grpc.interceptor.GrpcInterceptorProvider;
import org.opensearch.transport.grpc.interceptor.OrderedGrpcInterceptor;
import org.opensearch.transport.grpc.proto.request.search.query.AbstractQueryBuilderProtoUtils;
import org.opensearch.transport.grpc.proto.request.search.query.QueryBuilderProtoConverter;
import org.opensearch.transport.grpc.proto.request.search.query.QueryBuilderProtoConverterRegistry;
Expand All @@ -36,11 +39,7 @@
import org.opensearch.transport.grpc.ssl.SecureNetty4GrpcServerTransport;
import org.opensearch.watcher.ResourceWatcherService;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.function.Supplier;

import io.grpc.BindableService;
Expand Down Expand Up @@ -69,6 +68,7 @@ public final class GrpcPlugin extends Plugin implements NetworkPlugin, Extensibl
private final List<QueryBuilderProtoConverter> queryConverters = new ArrayList<>();
private QueryBuilderProtoConverterRegistry queryRegistry;
private AbstractQueryBuilderProtoUtils queryUtils;
private final List<ServerInterceptor> serverInterceptors = new ArrayList<>();

/**
* Creates a new GrpcPlugin instance.
Expand All @@ -88,6 +88,17 @@ public void loadExtensions(ExtensiblePlugin.ExtensionLoader loader) {
if (extensions != null) {
queryConverters.addAll(extensions);
}
List<GrpcInterceptorProvider> providers = loader.loadExtensions(GrpcInterceptorProvider.class);
if (providers != null) {
List<OrderedGrpcInterceptor> orderedList = new ArrayList<>();
for (GrpcInterceptorProvider provider : providers) {
orderedList.addAll(provider.getOrderedGrpcInterceptors());
}
orderedList.sort(Comparator.comparingInt(OrderedGrpcInterceptor::getOrder));
for (OrderedGrpcInterceptor ordered : orderedList) {
serverInterceptors.add(ordered.getInterceptor());
}
}
}

/**
Expand Down Expand Up @@ -148,7 +159,7 @@ public Map<String, Supplier<AuxTransport>> getAuxTransports(
);
return Collections.singletonMap(
GRPC_TRANSPORT_SETTING_KEY,
() -> new Netty4GrpcServerTransport(settings, grpcServices, networkService)
() -> new Netty4GrpcServerTransport(settings, grpcServices, networkService, serverInterceptors)
);
}

Expand Down Expand Up @@ -191,7 +202,7 @@ public Map<String, Supplier<AuxTransport>> getSecureAuxTransports(
);
return Collections.singletonMap(
GRPC_SECURE_TRANSPORT_SETTING_KEY,
() -> new SecureNetty4GrpcServerTransport(settings, grpcServices, networkService, secureAuxTransportSettingsProvider)
() -> new SecureNetty4GrpcServerTransport(settings, grpcServices, networkService, secureAuxTransportSettingsProvider, serverInterceptors)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.transport.grpc;

import io.grpc.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.network.NetworkService;
Expand Down Expand Up @@ -37,8 +38,6 @@
import java.util.function.Function;
import java.util.function.UnaryOperator;

import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.netty.shaded.io.netty.channel.EventLoopGroup;
import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup;
Expand Down Expand Up @@ -195,6 +194,7 @@ public class Netty4GrpcServerTransport extends AuxTransport {
private final int nettyEventLoopThreads;
private final long maxInboundMessageSize;
private final long maxConcurrentConnectionCalls;
private final List<ServerInterceptor> serverInterceptors;
private final TimeValue maxConnectionAge;
private final TimeValue maxConnectionIdle;
private final TimeValue keepAliveTimeout;
Expand All @@ -210,10 +210,11 @@ public class Netty4GrpcServerTransport extends AuxTransport {
* @param services the gRPC compatible services to be registered with the server.
* @param networkService the bind/publish addresses.
*/
public Netty4GrpcServerTransport(Settings settings, List<BindableService> services, NetworkService networkService) {
public Netty4GrpcServerTransport(Settings settings, List<BindableService> services, NetworkService networkService, List<ServerInterceptor> serverInterceptors) {
logger.debug("Initializing Netty4GrpcServerTransport with settings = {}", settings);
this.settings = Objects.requireNonNull(settings);
this.services = Objects.requireNonNull(services);
this.serverInterceptors = Objects.requireNonNull(serverInterceptors);
this.networkService = Objects.requireNonNull(networkService);
final List<String> grpcBindHost = SETTING_GRPC_BIND_HOST.get(settings);
this.bindHosts = (grpcBindHost.isEmpty() ? NetworkService.GLOBAL_NETWORK_BIND_HOST_SETTING.get(settings) : grpcBindHost).toArray(
Expand Down Expand Up @@ -368,6 +369,10 @@ private TransportAddress bindAddress(InetAddress hostAddress, PortsRange portRan
.addService(new HealthStatusManager().getHealthService())
.addService(ProtoReflectionService.newInstance());

for (ServerInterceptor interceptor : serverInterceptors) {
serverBuilder.intercept(interceptor);
}

for (UnaryOperator<NettyServerBuilder> op : serverBuilderConfigs) {
op.apply(serverBuilder);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.transport.grpc.interceptor;

import java.util.List;

public interface GrpcInterceptorProvider {
List<OrderedGrpcInterceptor> getOrderedGrpcInterceptors();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.transport.grpc.interceptor;

import io.grpc.ServerInterceptor;

public interface OrderedGrpcInterceptor {
int getOrder(); // Lower values = higher priority
ServerInterceptor getInterceptor();
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.transport.grpc.ssl;

import io.grpc.ServerInterceptor;
import org.opensearch.common.network.NetworkService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -79,9 +80,10 @@ public SecureNetty4GrpcServerTransport(
Settings settings,
List<BindableService> services,
NetworkService networkService,
SecureAuxTransportSettingsProvider secureTransportSettingsProvider
SecureAuxTransportSettingsProvider secureTransportSettingsProvider,
List<ServerInterceptor> serverInterceptors
) {
super(settings, services, networkService);
super(settings, services, networkService, serverInterceptors);
this.port = SecureNetty4GrpcServerTransport.SETTING_GRPC_SECURE_PORT.get(settings);
this.portSettingKey = SecureNetty4GrpcServerTransport.SETTING_GRPC_SECURE_PORT.getKey();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.hamcrest.MatcherAssert;
import org.junit.Before;

import java.util.ArrayList;
import java.util.List;

import io.grpc.BindableService;
Expand All @@ -37,7 +38,7 @@ public void setup() {
}

public void testBasicStartAndStop() {
try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(createSettings(), services, networkService)) {
try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(createSettings(), services, networkService, new ArrayList<>())) {
transport.start();

MatcherAssert.assertThat(transport.getBoundAddress().boundAddresses(), not(emptyArray()));
Expand All @@ -48,7 +49,7 @@ public void testBasicStartAndStop() {
}

public void testGrpcTransportHealthcheck() {
try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(createSettings(), services, networkService)) {
try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(createSettings(), services, networkService, new ArrayList<>())) {
transport.start();
final TransportAddress remoteAddress = randomFrom(transport.getBoundAddress().boundAddresses());
try (NettyGrpcClient client = new NettyGrpcClient.Builder().setAddress(remoteAddress).build()) {
Expand All @@ -61,7 +62,7 @@ public void testGrpcTransportHealthcheck() {
}

public void testGrpcTransportListServices() {
try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(createSettings(), services, networkService)) {
try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(createSettings(), services, networkService, new ArrayList<>())) {
transport.start();
final TransportAddress remoteAddress = randomFrom(transport.getBoundAddress().boundAddresses());
try (NettyGrpcClient client = new NettyGrpcClient.Builder().setAddress(remoteAddress).build()) {
Expand All @@ -77,7 +78,7 @@ public void testWithCustomPort() {
// Create settings with a specific port
Settings settings = Settings.builder().put(Netty4GrpcServerTransport.SETTING_GRPC_PORT.getKey(), "9000-9010").build();

try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService)) {
try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, new ArrayList<>())) {
transport.start();

MatcherAssert.assertThat(transport.getBoundAddress().boundAddresses(), not(emptyArray()));
Expand All @@ -96,7 +97,7 @@ public void testWithCustomPublishPort() {
.put(Netty4GrpcServerTransport.SETTING_GRPC_PUBLISH_PORT.getKey(), 9000)
.build();

try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService)) {
try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, new ArrayList<>())) {
transport.start();

MatcherAssert.assertThat(transport.getBoundAddress().boundAddresses(), not(emptyArray()));
Expand All @@ -115,7 +116,7 @@ public void testWithCustomHost() {
.put(Netty4GrpcServerTransport.SETTING_GRPC_HOST.getKey(), "127.0.0.1")
.build();

try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService)) {
try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, new ArrayList<>())) {
transport.start();

MatcherAssert.assertThat(transport.getBoundAddress().boundAddresses(), not(emptyArray()));
Expand All @@ -138,7 +139,7 @@ public void testWithCustomBindHost() {
.put(Netty4GrpcServerTransport.SETTING_GRPC_BIND_HOST.getKey(), "127.0.0.1")
.build();

try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService)) {
try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, new ArrayList<>())) {
transport.start();

MatcherAssert.assertThat(transport.getBoundAddress().boundAddresses(), not(emptyArray()));
Expand All @@ -161,7 +162,7 @@ public void testWithCustomPublishHost() {
.put(Netty4GrpcServerTransport.SETTING_GRPC_PUBLISH_HOST.getKey(), "127.0.0.1")
.build();

try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService)) {
try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, new ArrayList<>())) {
transport.start();

MatcherAssert.assertThat(transport.getBoundAddress().boundAddresses(), not(emptyArray()));
Expand All @@ -184,7 +185,7 @@ public void testWithCustomWorkerCount() {
.put(Netty4GrpcServerTransport.SETTING_GRPC_WORKER_COUNT.getKey(), 4)
.build();

try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService)) {
try (Netty4GrpcServerTransport transport = new Netty4GrpcServerTransport(settings, services, networkService, new ArrayList<>())) {
transport.start();

MatcherAssert.assertThat(transport.getBoundAddress().boundAddresses(), not(emptyArray()));
Expand Down
Loading