From 28aaa8d79b30ffe2d5dc0c01681538d2701c10cb Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Thu, 2 Oct 2025 20:44:37 +0200 Subject: [PATCH 1/9] Convert BytesTransportResponse when proxying response from/to local node execution, to a BytesTransportResponse as opposed to materializing the response object on heap. When a proxy node acts as a proxy to query its local data, and the coordinating node is on a different version than the proxy node, the response will fail to deserialize in the coord node because it was written with the version of the proxy node as opposed to that of the coord (target) node. This is because DirectResponseChannel does not read and write such response, which would lead to it being converted to the right format. This commit attempts to fix this problem by tracking the version used to write the response, and conditionally converting it in the ProxyRequestHandler. --- .../SearchQueryThenFetchAsyncAction.java | 5 +++- .../transport/BytesTransportResponse.java | 24 +++++++++++++++++++ .../transport/TransportActionProxy.java | 9 +++++++ 3 files changed, 37 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index a8f22eb1cc572..65f6ef090445a 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -845,7 +845,10 @@ void onShardDone() { out.close(); } } - ActionListener.respondAndRelease(channelListener, new BytesTransportResponse(out.moveToBytesReference())); + ActionListener.respondAndRelease( + channelListener, + new BytesTransportResponse(out.moveToBytesReference(), out.getTransportVersion(), namedWriteableRegistry) + ); } private void maybeFreeContext( diff --git a/server/src/main/java/org/elasticsearch/transport/BytesTransportResponse.java b/server/src/main/java/org/elasticsearch/transport/BytesTransportResponse.java index 571d0d4008e24..b48cd6210921c 100644 --- a/server/src/main/java/org/elasticsearch/transport/BytesTransportResponse.java +++ b/server/src/main/java/org/elasticsearch/transport/BytesTransportResponse.java @@ -9,7 +9,11 @@ package org.elasticsearch.transport; +import org.elasticsearch.TransportVersion; import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import java.io.IOException; @@ -20,9 +24,29 @@ public class BytesTransportResponse extends TransportResponse implements BytesTransportMessage { private final ReleasableBytesReference bytes; + private final TransportVersion version; + private final NamedWriteableRegistry namedWriteableRegistry; public BytesTransportResponse(ReleasableBytesReference bytes) { this.bytes = bytes; + this.version = null; + this.namedWriteableRegistry = null; + } + + public BytesTransportResponse(ReleasableBytesReference bytes, TransportVersion version, NamedWriteableRegistry namedWriteableRegistry) { + this.bytes = bytes; + this.version = version; + this.namedWriteableRegistry = namedWriteableRegistry; + } + + public boolean mustConvertResponseForVersion(TransportVersion targetVersion) { + return version != null && version.equals(targetVersion) == false; + } + + public StreamInput streamInput() throws IOException { + NamedWriteableAwareStreamInput in = new NamedWriteableAwareStreamInput(bytes().streamInput(), namedWriteableRegistry); + in.setTransportVersion(version); + return in; } @Override diff --git a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java index bc5dab2074a6e..7fe3c048d6485 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java @@ -18,6 +18,7 @@ import org.elasticsearch.tasks.TaskId; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Map; import java.util.concurrent.Executor; import java.util.function.Function; @@ -62,6 +63,14 @@ public Executor executor() { @Override public void handleResponse(TransportResponse response) { + if (response instanceof BytesTransportResponse btr && btr.mustConvertResponseForVersion(channel.getVersion())) { + try { + StreamInput streamInput = btr.streamInput(); + response = read(streamInput); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } channel.sendResponse(response); } From a8b04f27d8789e29826df236cd54853a7cda52de Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Thu, 2 Oct 2025 21:23:54 +0200 Subject: [PATCH 2/9] Update docs/changelog/135873.yaml --- docs/changelog/135873.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/135873.yaml diff --git a/docs/changelog/135873.yaml b/docs/changelog/135873.yaml new file mode 100644 index 0000000000000..81f9ea856bf03 --- /dev/null +++ b/docs/changelog/135873.yaml @@ -0,0 +1,5 @@ +pr: 135873 +summary: Convert `BytesTransportResponse` when proxying response from/to local node +area: "Network, Search" +type: bug +issues: [] From 4046fa3583f607d90dc55094edfd091714a85fdd Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Thu, 2 Oct 2025 23:01:30 +0200 Subject: [PATCH 3/9] iter --- docs/changelog/135873.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/changelog/135873.yaml b/docs/changelog/135873.yaml index 81f9ea856bf03..11311425296d7 100644 --- a/docs/changelog/135873.yaml +++ b/docs/changelog/135873.yaml @@ -1,5 +1,5 @@ pr: 135873 summary: Convert `BytesTransportResponse` when proxying response from/to local node -area: "Network, Search" +area: "Network" type: bug issues: [] From 02afc14d6c6c800b7c4da97172e8bd61ca7cd258 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Fri, 3 Oct 2025 12:06:33 +0200 Subject: [PATCH 4/9] iter --- .../SearchQueryThenFetchAsyncAction.java | 10 ++- .../action/search/SearchTransportService.java | 82 ++++++++++++++++--- .../TransportOpenPointInTimeAction.java | 8 +- .../action/search/TransportSearchAction.java | 2 +- .../transport/BytesTransportResponse.java | 24 +++--- .../transport/TransportActionProxy.java | 30 +++++-- .../transport/TransportActionProxyTests.java | 23 ++++-- .../ClearCcrRestoreSessionAction.java | 26 ++++-- 8 files changed, 153 insertions(+), 52 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 65f6ef090445a..4aced264baa61 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -607,7 +607,13 @@ static void registerNodeSearchAction( } } ); - TransportActionProxy.registerProxyAction(transportService, NODE_SEARCH_ACTION_NAME, true, NodeQueryResponse::new); + TransportActionProxy.registerProxyAction( + transportService, + NODE_SEARCH_ACTION_NAME, + true, + NodeQueryResponse::new, + namedWriteableRegistry + ); } private static void releaseLocalContext( @@ -847,7 +853,7 @@ void onShardDone() { } ActionListener.respondAndRelease( channelListener, - new BytesTransportResponse(out.moveToBytesReference(), out.getTransportVersion(), namedWriteableRegistry) + new BytesTransportResponse(out.moveToBytesReference(), out.getTransportVersion()) ); } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 1e0fa28889c97..fc885f7562a80 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -21,6 +21,7 @@ import org.elasticsearch.client.internal.OriginSettingClient; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -384,7 +385,11 @@ public void writeTo(StreamOutput out) throws IOException { } } - public static void registerRequestHandler(TransportService transportService, SearchService searchService) { + public static void registerRequestHandler( + TransportService transportService, + SearchService searchService, + NamedWriteableRegistry namedWriteableRegistry + ) { final TransportRequestHandler freeContextHandler = (request, channel, task) -> { logger.trace("releasing search context [{}]", request.id()); boolean freed = searchService.freeReaderContext(request.id()); @@ -401,7 +406,8 @@ public static void registerRequestHandler(TransportService transportService, Sea transportService, FREE_CONTEXT_SCROLL_ACTION_NAME, false, - SearchFreeContextResponse::readFrom + SearchFreeContextResponse::readFrom, + namedWriteableRegistry ); // TODO: remove this handler once the lowest compatible version stops using it @@ -411,7 +417,13 @@ public static void registerRequestHandler(TransportService transportService, Sea OriginalIndices.readOriginalIndices(in); return res; }, freeContextHandler); - TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME, false, SearchFreeContextResponse::readFrom); + TransportActionProxy.registerProxyAction( + transportService, + FREE_CONTEXT_ACTION_NAME, + false, + SearchFreeContextResponse::readFrom, + namedWriteableRegistry + ); transportService.registerRequestHandler( CLEAR_SCROLL_CONTEXTS_ACTION_NAME, @@ -426,7 +438,8 @@ public static void registerRequestHandler(TransportService transportService, Sea transportService, CLEAR_SCROLL_CONTEXTS_ACTION_NAME, false, - (in) -> ActionResponse.Empty.INSTANCE + (in) -> ActionResponse.Empty.INSTANCE, + namedWriteableRegistry ); transportService.registerRequestHandler( @@ -435,7 +448,7 @@ public static void registerRequestHandler(TransportService transportService, Sea ShardSearchRequest::new, (request, channel, task) -> searchService.executeDfsPhase(request, (SearchShardTask) task, new ChannelActionListener<>(channel)) ); - TransportActionProxy.registerProxyAction(transportService, DFS_ACTION_NAME, true, DfsSearchResult::new); + TransportActionProxy.registerProxyAction(transportService, DFS_ACTION_NAME, true, DfsSearchResult::new, namedWriteableRegistry); transportService.registerRequestHandler( QUERY_ACTION_NAME, @@ -451,7 +464,8 @@ public static void registerRequestHandler(TransportService transportService, Sea transportService, QUERY_ACTION_NAME, true, - (request) -> ((ShardSearchRequest) request).numberOfShards() == 1 ? QueryFetchSearchResult::new : QuerySearchResult::new + (request) -> ((ShardSearchRequest) request).numberOfShards() == 1 ? QueryFetchSearchResult::new : QuerySearchResult::new, + namedWriteableRegistry ); transportService.registerRequestHandler( @@ -465,7 +479,13 @@ public static void registerRequestHandler(TransportService transportService, Sea channel.getVersion() ) ); - TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, true, QuerySearchResult::new); + TransportActionProxy.registerProxyAction( + transportService, + QUERY_ID_ACTION_NAME, + true, + QuerySearchResult::new, + namedWriteableRegistry + ); transportService.registerRequestHandler( QUERY_SCROLL_ACTION_NAME, @@ -478,7 +498,13 @@ public static void registerRequestHandler(TransportService transportService, Sea channel.getVersion() ) ); - TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, true, ScrollQuerySearchResult::new); + TransportActionProxy.registerProxyAction( + transportService, + QUERY_SCROLL_ACTION_NAME, + true, + ScrollQuerySearchResult::new, + namedWriteableRegistry + ); transportService.registerRequestHandler( QUERY_FETCH_SCROLL_ACTION_NAME, @@ -490,7 +516,13 @@ public static void registerRequestHandler(TransportService transportService, Sea new ChannelActionListener<>(channel) ) ); - TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_SCROLL_ACTION_NAME, true, ScrollQueryFetchSearchResult::new); + TransportActionProxy.registerProxyAction( + transportService, + QUERY_FETCH_SCROLL_ACTION_NAME, + true, + ScrollQueryFetchSearchResult::new, + namedWriteableRegistry + ); final TransportRequestHandler rankShardFeatureRequest = (request, channel, task) -> searchService .executeRankFeaturePhase(request, (SearchShardTask) task, new ChannelActionListener<>(channel)); @@ -500,7 +532,13 @@ public static void registerRequestHandler(TransportService transportService, Sea RankFeatureShardRequest::new, rankShardFeatureRequest ); - TransportActionProxy.registerProxyAction(transportService, RANK_FEATURE_SHARD_ACTION_NAME, true, RankFeatureResult::new); + TransportActionProxy.registerProxyAction( + transportService, + RANK_FEATURE_SHARD_ACTION_NAME, + true, + RankFeatureResult::new, + namedWriteableRegistry + ); final TransportRequestHandler shardFetchRequestHandler = (request, channel, task) -> searchService .executeFetchPhase(request, (SearchShardTask) task, new ChannelActionListener<>(channel)); @@ -510,7 +548,13 @@ public static void registerRequestHandler(TransportService transportService, Sea ShardFetchRequest::new, shardFetchRequestHandler ); - TransportActionProxy.registerProxyAction(transportService, FETCH_ID_SCROLL_ACTION_NAME, true, FetchSearchResult::new); + TransportActionProxy.registerProxyAction( + transportService, + FETCH_ID_SCROLL_ACTION_NAME, + true, + FetchSearchResult::new, + namedWriteableRegistry + ); transportService.registerRequestHandler( FETCH_ID_ACTION_NAME, @@ -520,7 +564,13 @@ public static void registerRequestHandler(TransportService transportService, Sea ShardFetchSearchRequest::new, shardFetchRequestHandler ); - TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, true, FetchSearchResult::new); + TransportActionProxy.registerProxyAction( + transportService, + FETCH_ID_ACTION_NAME, + true, + FetchSearchResult::new, + namedWriteableRegistry + ); transportService.registerRequestHandler( QUERY_CAN_MATCH_NODE_NAME, @@ -528,7 +578,13 @@ public static void registerRequestHandler(TransportService transportService, Sea CanMatchNodeRequest::new, (request, channel, task) -> searchService.canMatch(request, new ChannelActionListener<>(channel)) ); - TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NODE_NAME, true, CanMatchNodeResponse::new); + TransportActionProxy.registerProxyAction( + transportService, + QUERY_CAN_MATCH_NODE_NAME, + true, + CanMatchNodeResponse::new, + namedWriteableRegistry + ); } private static Executor buildFreeContextExecutor(TransportService transportService) { diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java index 1038308bc6bf3..f9ae44c473014 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportOpenPointInTimeAction.java @@ -94,7 +94,13 @@ public TransportOpenPointInTimeAction( ShardOpenReaderRequest::new, new ShardOpenReaderRequestHandler() ); - TransportActionProxy.registerProxyAction(transportService, OPEN_SHARD_READER_CONTEXT_NAME, false, ShardOpenReaderResponse::new); + TransportActionProxy.registerProxyAction( + transportService, + OPEN_SHARD_READER_CONTEXT_NAME, + false, + ShardOpenReaderResponse::new, + namedWriteableRegistry + ); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index cd8c561a4ad75..3385f53c9ed51 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -196,7 +196,7 @@ public TransportSearchAction( this.searchPhaseController = searchPhaseController; this.searchTransportService = searchTransportService; this.remoteClusterService = searchTransportService.getRemoteClusterService(); - SearchTransportService.registerRequestHandler(transportService, searchService); + SearchTransportService.registerRequestHandler(transportService, searchService, namedWriteableRegistry); SearchQueryThenFetchAsyncAction.registerNodeSearchAction( searchTransportService, searchService, diff --git a/server/src/main/java/org/elasticsearch/transport/BytesTransportResponse.java b/server/src/main/java/org/elasticsearch/transport/BytesTransportResponse.java index b48cd6210921c..227d4bd995bb5 100644 --- a/server/src/main/java/org/elasticsearch/transport/BytesTransportResponse.java +++ b/server/src/main/java/org/elasticsearch/transport/BytesTransportResponse.java @@ -11,8 +11,6 @@ import org.elasticsearch.TransportVersion; import org.elasticsearch.common.bytes.ReleasableBytesReference; -import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; -import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -25,28 +23,26 @@ public class BytesTransportResponse extends TransportResponse implements BytesTr private final ReleasableBytesReference bytes; private final TransportVersion version; - private final NamedWriteableRegistry namedWriteableRegistry; - public BytesTransportResponse(ReleasableBytesReference bytes) { - this.bytes = bytes; - this.version = null; - this.namedWriteableRegistry = null; - } - - public BytesTransportResponse(ReleasableBytesReference bytes, TransportVersion version, NamedWriteableRegistry namedWriteableRegistry) { + public BytesTransportResponse(ReleasableBytesReference bytes, TransportVersion version) { this.bytes = bytes; this.version = version; - this.namedWriteableRegistry = namedWriteableRegistry; } + /** + * Does the binary response need conversion before being sent to the provided target version? + */ public boolean mustConvertResponseForVersion(TransportVersion targetVersion) { return version != null && version.equals(targetVersion) == false; } + /** + * Returns a {@link StreamInput} configured to read the underlying bytes that this response holds. + */ public StreamInput streamInput() throws IOException { - NamedWriteableAwareStreamInput in = new NamedWriteableAwareStreamInput(bytes().streamInput(), namedWriteableRegistry); - in.setTransportVersion(version); - return in; + StreamInput streamInput = bytes.streamInput(); + streamInput.setTransportVersion(version); + return streamInput; } @Override diff --git a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java index 7fe3c048d6485..cf907dbe945ac 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java @@ -9,6 +9,8 @@ package org.elasticsearch.transport; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -37,15 +39,18 @@ private static class ProxyRequestHandler> responseFunction; + private final NamedWriteableRegistry namedWriteableRegistry; ProxyRequestHandler( TransportService service, String action, - Function> responseFunction + Function> responseFunction, + NamedWriteableRegistry namedWriteableRegistry ) { this.service = service; this.action = action; this.responseFunction = responseFunction; + this.namedWriteableRegistry = namedWriteableRegistry; } @Override @@ -63,10 +68,16 @@ public Executor executor() { @Override public void handleResponse(TransportResponse response) { + // This is a short term solution to ensure data node responses for batched search go back to the coordinating + // node in the expected format when a proxy data node proxies the request to itself. The response would otherwise + // be sent directly via DirectResponseChannel, skipping the read and write step that this handler normally performs. if (response instanceof BytesTransportResponse btr && btr.mustConvertResponseForVersion(channel.getVersion())) { try { - StreamInput streamInput = btr.streamInput(); - response = read(streamInput); + NamedWriteableAwareStreamInput in = new NamedWriteableAwareStreamInput( + btr.streamInput(), + namedWriteableRegistry + ); + response = responseFunction.apply(wrappedRequest).read(in); } catch (IOException e) { throw new UncheckedIOException(e); } @@ -82,7 +93,7 @@ public void handleException(TransportException exp) { @Override public TransportResponse read(StreamInput in) throws IOException { if (in.getTransportVersion().equals(channel.getVersion()) && in.supportReadAllToReleasableBytesReference()) { - return new BytesTransportResponse(in.readAllToReleasableBytesReference()); + return new BytesTransportResponse(in.readAllToReleasableBytesReference(), channel.getVersion()); } else { return responseFunction.apply(wrappedRequest).read(in); } @@ -153,7 +164,9 @@ public static void registerProxyActionWithDynamicResponseType( TransportService service, String action, boolean cancellable, - Function> responseFunction + Function> responseFunction, + NamedWriteableRegistry namedWriteableRegistry + ) { RequestHandlerRegistry requestHandler = service.getRequestHandler(action); service.registerRequestHandler( @@ -164,7 +177,7 @@ public static void registerProxyActionWithDynamicResponseType( in -> cancellable ? new CancellableProxyRequest<>(in, requestHandler::newRequest) : new ProxyRequest<>(in, requestHandler::newRequest), - new ProxyRequestHandler<>(service, action, responseFunction) + new ProxyRequestHandler<>(service, action, responseFunction, namedWriteableRegistry) ); } @@ -176,9 +189,10 @@ public static void registerProxyAction( TransportService service, String action, boolean cancellable, - Writeable.Reader reader + Writeable.Reader reader, + NamedWriteableRegistry namedWriteableRegistry ) { - registerProxyActionWithDynamicResponseType(service, action, cancellable, request -> reader); + registerProxyActionWithDynamicResponseType(service, action, cancellable, request -> reader, namedWriteableRegistry); } private static final String PROXY_ACTION_PREFIX = "internal:transport/proxy/"; diff --git a/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java b/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java index e1a525cab3f52..bf636221424e1 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.VersionInformation; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; @@ -110,6 +111,7 @@ private MockTransportService buildService(VersionInformation version, TransportV } public void testSendMessage() throws InterruptedException { + NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); serviceA.registerRequestHandler( "internal:test", EsExecutors.DIRECT_EXECUTOR_SERVICE, @@ -123,7 +125,7 @@ public void testSendMessage() throws InterruptedException { } ); final boolean cancellable = randomBoolean(); - TransportActionProxy.registerProxyAction(serviceA, "internal:test", cancellable, SimpleTestResponse::new); + TransportActionProxy.registerProxyAction(serviceA, "internal:test", cancellable, SimpleTestResponse::new, namedWriteableRegistry); AbstractSimpleTransportTestCase.connectToNode(serviceA, nodeB); serviceB.registerRequestHandler( @@ -139,7 +141,7 @@ public void testSendMessage() throws InterruptedException { assertThat(response.hasReferences(), equalTo(false)); } ); - TransportActionProxy.registerProxyAction(serviceB, "internal:test", cancellable, SimpleTestResponse::new); + TransportActionProxy.registerProxyAction(serviceB, "internal:test", cancellable, SimpleTestResponse::new, namedWriteableRegistry); AbstractSimpleTransportTestCase.connectToNode(serviceB, nodeC); serviceC.registerRequestHandler( "internal:test", @@ -155,7 +157,7 @@ public void testSendMessage() throws InterruptedException { } ); - TransportActionProxy.registerProxyAction(serviceC, "internal:test", cancellable, SimpleTestResponse::new); + TransportActionProxy.registerProxyAction(serviceC, "internal:test", cancellable, SimpleTestResponse::new, namedWriteableRegistry); // Node A -> Node B -> Node C: different versions - serialize the response { final List responses = Collections.synchronizedList(new ArrayList<>()); @@ -277,7 +279,13 @@ public void testSendLocalRequest() throws Exception { latch.countDown(); } }); - TransportActionProxy.registerProxyAction(serviceB, "internal:test", cancellable, SimpleTestResponse::new); + TransportActionProxy.registerProxyAction( + serviceB, + "internal:test", + cancellable, + SimpleTestResponse::new, + new NamedWriteableRegistry(Collections.emptyList()) + ); AbstractSimpleTransportTestCase.connectToNode(serviceA, nodeB); // Node A -> Proxy Node B (Local execution) @@ -324,6 +332,7 @@ public void handleException(TransportException exp) { } public void testException() throws InterruptedException { + NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); boolean cancellable = randomBoolean(); serviceA.registerRequestHandler( "internal:test", @@ -335,7 +344,7 @@ public void testException() throws InterruptedException { channel.sendResponse(response); } ); - TransportActionProxy.registerProxyAction(serviceA, "internal:test", cancellable, SimpleTestResponse::new); + TransportActionProxy.registerProxyAction(serviceA, "internal:test", cancellable, SimpleTestResponse::new, namedWriteableRegistry); AbstractSimpleTransportTestCase.connectToNode(serviceA, nodeB); serviceB.registerRequestHandler( @@ -348,7 +357,7 @@ public void testException() throws InterruptedException { channel.sendResponse(response); } ); - TransportActionProxy.registerProxyAction(serviceB, "internal:test", cancellable, SimpleTestResponse::new); + TransportActionProxy.registerProxyAction(serviceB, "internal:test", cancellable, SimpleTestResponse::new, namedWriteableRegistry); AbstractSimpleTransportTestCase.connectToNode(serviceB, nodeC); serviceC.registerRequestHandler( "internal:test", @@ -358,7 +367,7 @@ public void testException() throws InterruptedException { throw new ElasticsearchException("greetings from TS_C"); } ); - TransportActionProxy.registerProxyAction(serviceC, "internal:test", cancellable, SimpleTestResponse::new); + TransportActionProxy.registerProxyAction(serviceC, "internal:test", cancellable, SimpleTestResponse::new, namedWriteableRegistry); CountDownLatch latch = new CountDownLatch(1); serviceA.sendRequest( diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java index 66210d43f2f7a..e586ed8679062 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.RemoteClusterActionType; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.tasks.Task; @@ -51,7 +52,8 @@ private TransportDeleteCcrRestoreSessionAction( String actionName, ActionFilters actionFilters, TransportService transportService, - CcrRestoreSourceService ccrRestoreService + CcrRestoreSourceService ccrRestoreService, + NamedWriteableRegistry namedWriteableRegistry ) { super( actionName, @@ -60,7 +62,13 @@ private TransportDeleteCcrRestoreSessionAction( ClearCcrRestoreSessionRequest::new, transportService.getThreadPool().executor(ThreadPool.Names.GENERIC) ); - TransportActionProxy.registerProxyAction(transportService, actionName, false, in -> ActionResponse.Empty.INSTANCE); + TransportActionProxy.registerProxyAction( + transportService, + actionName, + false, + in -> ActionResponse.Empty.INSTANCE, + namedWriteableRegistry + ); this.ccrRestoreService = ccrRestoreService; } @@ -80,16 +88,22 @@ public static class InternalTransportAction extends TransportDeleteCcrRestoreSes public InternalTransportAction( ActionFilters actionFilters, TransportService transportService, - CcrRestoreSourceService ccrRestoreService + CcrRestoreSourceService ccrRestoreService, + NamedWriteableRegistry namedWriteableRegistry ) { - super(INTERNAL_NAME, actionFilters, transportService, ccrRestoreService); + super(INTERNAL_NAME, actionFilters, transportService, ccrRestoreService, namedWriteableRegistry); } } public static class TransportAction extends TransportDeleteCcrRestoreSessionAction { @Inject - public TransportAction(ActionFilters actionFilters, TransportService transportService, CcrRestoreSourceService ccrRestoreService) { - super(NAME, actionFilters, transportService, ccrRestoreService); + public TransportAction( + ActionFilters actionFilters, + TransportService transportService, + CcrRestoreSourceService ccrRestoreService, + NamedWriteableRegistry namedWriteableRegistry + ) { + super(NAME, actionFilters, transportService, ccrRestoreService, namedWriteableRegistry); } @Override From 436d394b89f95d00896df8fafce56b1722adaf28 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Fri, 3 Oct 2025 12:08:51 +0200 Subject: [PATCH 5/9] iter --- .../org/elasticsearch/transport/BytesTransportResponse.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/BytesTransportResponse.java b/server/src/main/java/org/elasticsearch/transport/BytesTransportResponse.java index 227d4bd995bb5..528199b335373 100644 --- a/server/src/main/java/org/elasticsearch/transport/BytesTransportResponse.java +++ b/server/src/main/java/org/elasticsearch/transport/BytesTransportResponse.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import java.io.IOException; +import java.util.Objects; /** * A specialized, bytes only response, that can potentially be optimized on the network layer. @@ -26,14 +27,14 @@ public class BytesTransportResponse extends TransportResponse implements BytesTr public BytesTransportResponse(ReleasableBytesReference bytes, TransportVersion version) { this.bytes = bytes; - this.version = version; + this.version = Objects.requireNonNull(version); } /** * Does the binary response need conversion before being sent to the provided target version? */ public boolean mustConvertResponseForVersion(TransportVersion targetVersion) { - return version != null && version.equals(targetVersion) == false; + return version.equals(targetVersion) == false; } /** From 946a6fa3f865089cb94e0ccdc17203403a8cb70e Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Fri, 3 Oct 2025 12:18:39 +0200 Subject: [PATCH 6/9] iter --- .../GetCcrRestoreFileChunkAction.java | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java index 1fb4fb6f0fb72..035eb88fb11b8 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.BigArrays; @@ -64,7 +65,8 @@ private TransportGetCcrRestoreFileChunkAction( BigArrays bigArrays, TransportService transportService, ActionFilters actionFilters, - CcrRestoreSourceService restoreSourceService + CcrRestoreSourceService restoreSourceService, + NamedWriteableRegistry namedWriteableRegistry ) { super( actionName, @@ -73,7 +75,13 @@ private TransportGetCcrRestoreFileChunkAction( GetCcrRestoreFileChunkRequest::new, transportService.getThreadPool().executor(ThreadPool.Names.GENERIC) ); - TransportActionProxy.registerProxyAction(transportService, actionName, false, GetCcrRestoreFileChunkResponse::new); + TransportActionProxy.registerProxyAction( + transportService, + actionName, + false, + GetCcrRestoreFileChunkResponse::new, + namedWriteableRegistry + ); this.restoreSourceService = restoreSourceService; this.bigArrays = bigArrays; } @@ -111,9 +119,10 @@ public InternalTransportAction( BigArrays bigArrays, TransportService transportService, ActionFilters actionFilters, - CcrRestoreSourceService restoreSourceService + CcrRestoreSourceService restoreSourceService, + NamedWriteableRegistry namedWriteableRegistry ) { - super(INTERNAL_NAME, bigArrays, transportService, actionFilters, restoreSourceService); + super(INTERNAL_NAME, bigArrays, transportService, actionFilters, restoreSourceService, namedWriteableRegistry); } } @@ -123,9 +132,10 @@ public TransportAction( BigArrays bigArrays, TransportService transportService, ActionFilters actionFilters, - CcrRestoreSourceService restoreSourceService + CcrRestoreSourceService restoreSourceService, + NamedWriteableRegistry namedWriteableRegistry ) { - super(NAME, bigArrays, transportService, actionFilters, restoreSourceService); + super(NAME, bigArrays, transportService, actionFilters, restoreSourceService, namedWriteableRegistry); } @Override From 01405da5064533268362aa31e56bedaeeee29397 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Fri, 3 Oct 2025 12:22:29 +0200 Subject: [PATCH 7/9] iter --- .../ClearCcrRestoreSessionActionTests.java | 22 ++++++++++++++++--- .../GetCcrRestoreFileChunkActionTests.java | 14 +++++++++--- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionActionTests.java index 540a6a8f7bcb5..f2bdda160bbf6 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionActionTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; @@ -23,6 +24,8 @@ import org.elasticsearch.xpack.core.security.authz.privilege.ClusterPrivilegeResolver; import org.elasticsearch.xpack.core.security.authz.privilege.IndexPrivilege; +import java.util.Collections; + import static org.hamcrest.Matchers.arrayContaining; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -54,21 +57,29 @@ public void testPrivilegeForActions() { } public void testActionNames() { + NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); final ActionFilters actionFilters = mock(ActionFilters.class); final TransportService transportService = MockUtils.setupTransportServiceWithThreadpoolExecutor(); final CcrRestoreSourceService ccrRestoreSourceService = mock(CcrRestoreSourceService.class); - final var action = new ClearCcrRestoreSessionAction.TransportAction(actionFilters, transportService, ccrRestoreSourceService); + final var action = new ClearCcrRestoreSessionAction.TransportAction( + actionFilters, + transportService, + ccrRestoreSourceService, + namedWriteableRegistry + ); assertThat(action.actionName, equalTo(ClearCcrRestoreSessionAction.NAME)); final var internalAction = new ClearCcrRestoreSessionAction.InternalTransportAction( actionFilters, transportService, - ccrRestoreSourceService + ccrRestoreSourceService, + namedWriteableRegistry ); assertThat(internalAction.actionName, equalTo(ClearCcrRestoreSessionAction.INTERNAL_NAME)); } public void testRequestedShardIdMustBeConsistentWithSessionShardId() { + NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); final ActionFilters actionFilters = mock(ActionFilters.class); final TransportService transportService = MockUtils.setupTransportServiceWithThreadpoolExecutor(); final CcrRestoreSourceService ccrRestoreSourceService = mock(CcrRestoreSourceService.class); @@ -87,7 +98,12 @@ public void testRequestedShardIdMustBeConsistentWithSessionShardId() { } }).when(ccrRestoreSourceService).ensureSessionShardIdConsistency(anyString(), any()); - final var action = new ClearCcrRestoreSessionAction.TransportAction(actionFilters, transportService, ccrRestoreSourceService); + final var action = new ClearCcrRestoreSessionAction.TransportAction( + actionFilters, + transportService, + ccrRestoreSourceService, + namedWriteableRegistry + ); final String sessionUUID = UUIDs.randomBase64UUID(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkActionTests.java index 61866dbf2029f..1b938b9150c6f 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkActionTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; @@ -27,6 +28,8 @@ import org.elasticsearch.xpack.core.security.authz.privilege.ClusterPrivilegeResolver; import org.elasticsearch.xpack.core.security.authz.privilege.IndexPrivilege; +import java.util.Collections; + import static org.hamcrest.Matchers.arrayContaining; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -58,6 +61,7 @@ public void testPrivilegeForActions() { } public void testActionNames() { + NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); final ActionFilters actionFilters = mock(ActionFilters.class); final BigArrays bigArrays = mock(BigArrays.class); final TransportService transportService = MockUtils.setupTransportServiceWithThreadpoolExecutor(); @@ -67,7 +71,8 @@ public void testActionNames() { bigArrays, transportService, actionFilters, - ccrRestoreSourceService + ccrRestoreSourceService, + namedWriteableRegistry ); assertThat(action.actionName, equalTo(GetCcrRestoreFileChunkAction.NAME)); @@ -75,12 +80,14 @@ public void testActionNames() { bigArrays, transportService, actionFilters, - ccrRestoreSourceService + ccrRestoreSourceService, + namedWriteableRegistry ); assertThat(internalAction.actionName, equalTo(GetCcrRestoreFileChunkAction.INTERNAL_NAME)); } public void testRequestedShardIdMustBeConsistentWithSessionShardId() { + NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList()); final ActionFilters actionFilters = mock(ActionFilters.class); final BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), ByteSizeValue.ofBytes(1024)); final TransportService transportService = MockUtils.setupTransportServiceWithThreadpoolExecutor(); @@ -108,7 +115,8 @@ public void testRequestedShardIdMustBeConsistentWithSessionShardId() { bigArrays, transportService, actionFilters, - ccrRestoreSourceService + ccrRestoreSourceService, + namedWriteableRegistry ); final String expectedFileName = randomAlphaOfLengthBetween(3, 12); From cbc8e21cd74fc3c9f703cfa978e6c575ea23a03f Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Fri, 3 Oct 2025 13:09:43 +0200 Subject: [PATCH 8/9] Update server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java Co-authored-by: David Turner --- .../java/org/elasticsearch/transport/TransportActionProxy.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java index cf907dbe945ac..12c65375cb6c0 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java @@ -93,7 +93,7 @@ public void handleException(TransportException exp) { @Override public TransportResponse read(StreamInput in) throws IOException { if (in.getTransportVersion().equals(channel.getVersion()) && in.supportReadAllToReleasableBytesReference()) { - return new BytesTransportResponse(in.readAllToReleasableBytesReference(), channel.getVersion()); + return new BytesTransportResponse(in.readAllToReleasableBytesReference(), in.getTransportVersion()); } else { return responseFunction.apply(wrappedRequest).read(in); } From 52c782587c334fc79287236fb88d28dbfb5b3ab3 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Fri, 3 Oct 2025 14:29:08 +0200 Subject: [PATCH 9/9] iter --- .../transport/TransportActionProxy.java | 15 +- .../transport/TransportActionProxyTests.java | 159 ++++++++++++++++++ 2 files changed, 170 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java index 12c65375cb6c0..d53ab209c3960 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java @@ -72,17 +72,24 @@ public void handleResponse(TransportResponse response) { // node in the expected format when a proxy data node proxies the request to itself. The response would otherwise // be sent directly via DirectResponseChannel, skipping the read and write step that this handler normally performs. if (response instanceof BytesTransportResponse btr && btr.mustConvertResponseForVersion(channel.getVersion())) { - try { + try ( NamedWriteableAwareStreamInput in = new NamedWriteableAwareStreamInput( btr.streamInput(), namedWriteableRegistry - ); - response = responseFunction.apply(wrappedRequest).read(in); + ) + ) { + TransportResponse convertedResponse = responseFunction.apply(wrappedRequest).read(in); + try { + channel.sendResponse(convertedResponse); + } finally { + convertedResponse.decRef(); + } } catch (IOException e) { throw new UncheckedIOException(e); } + } else { + channel.sendResponse(response); } - channel.sendResponse(response); } @Override diff --git a/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java b/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java index bf636221424e1..c4146634f10c6 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.VersionInformation; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; @@ -283,6 +284,158 @@ public void testSendLocalRequest() throws Exception { serviceB, "internal:test", cancellable, + // For a proxy node proxying to itself, the response is sent directly, without it being read by the proxy layer + r -> { throw new AssertionError(); }, + new NamedWriteableRegistry(Collections.emptyList()) + ); + AbstractSimpleTransportTestCase.connectToNode(serviceA, nodeB); + + // Node A -> Proxy Node B (Local execution) + serviceA.sendRequest( + nodeB, + TransportActionProxy.getProxyAction("internal:test"), + TransportActionProxy.wrapRequest(nodeB, new SimpleTestRequest("TS_A", cancellable)), // Request + new TransportResponseHandler() { + @Override + public SimpleTestResponse read(StreamInput in) throws IOException { + return new SimpleTestResponse(in); + } + + @Override + public Executor executor() { + return TransportResponseHandler.TRANSPORT_WORKER; + } + + @Override + public void handleResponse(SimpleTestResponse response) { + try { + assertEquals("TS_B", response.targetNode); + } finally { + latch.countDown(); + } + } + + @Override + public void handleException(TransportException exp) { + try { + throw new AssertionError(exp); + } finally { + latch.countDown(); + } + } + } + ); + latch.await(); + + final var responseInstance = response.get(); + assertThat(responseInstance, notNullValue()); + responseInstance.decRef(); + assertBusy(() -> assertThat(responseInstance.hasReferences(), equalTo(false))); + } + + public void testSendLocalRequestBytesTransportResponseSameVersion() throws Exception { + final AtomicReference response = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(2); + + final boolean cancellable = randomBoolean(); + serviceB.registerRequestHandler("internal:test", randomExecutor(threadPool), SimpleTestRequest::new, (request, channel, task) -> { + try { + assertThat(task instanceof CancellableTask, equalTo(cancellable)); + assertEquals(request.sourceNode, "TS_A"); + + SimpleTestResponse tsB = new SimpleTestResponse("TS_B"); + try (RecyclerBytesStreamOutput out = serviceB.newNetworkBytesStream()) { + out.setTransportVersion(transportVersion1); + tsB.writeTo(out); + // simulate what happens in SearchQueryThenFetchAsyncAction with NodeQueryResponse + final BytesTransportResponse responseB = new BytesTransportResponse(out.moveToBytesReference(), transportVersion1); + channel.sendResponse(responseB); + response.set(responseB); + } finally { + tsB.decRef(); + } + } finally { + latch.countDown(); + } + }); + TransportActionProxy.registerProxyAction(serviceB, "internal:test", cancellable, in -> { + throw new AssertionError("read should not be called for local proxying when versions align"); + }, new NamedWriteableRegistry(Collections.emptyList())); + AbstractSimpleTransportTestCase.connectToNode(serviceC, nodeB); + + // Node C -> Proxy Node B (Local execution) + serviceC.sendRequest( + nodeB, + TransportActionProxy.getProxyAction("internal:test"), + TransportActionProxy.wrapRequest(nodeB, new SimpleTestRequest("TS_A", cancellable)), // Request + new TransportResponseHandler() { + @Override + public SimpleTestResponse read(StreamInput in) throws IOException { + return new SimpleTestResponse(in); + } + + @Override + public Executor executor() { + return TransportResponseHandler.TRANSPORT_WORKER; + } + + @Override + public void handleResponse(SimpleTestResponse response) { + try { + assertEquals("TS_B", response.targetNode); + } finally { + latch.countDown(); + } + } + + @Override + public void handleException(TransportException exp) { + try { + throw new AssertionError(exp); + } finally { + latch.countDown(); + } + } + } + ); + latch.await(); + + final var responseInstance = response.get(); + assertThat(responseInstance, notNullValue()); + responseInstance.decRef(); + assertBusy(() -> assertThat(responseInstance.hasReferences(), equalTo(false))); + } + + public void testSendLocalRequestBytesTransportResponseDifferentVersions() throws Exception { + final AtomicReference response = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(2); + + final boolean cancellable = randomBoolean(); + serviceB.registerRequestHandler("internal:test", randomExecutor(threadPool), SimpleTestRequest::new, (request, channel, task) -> { + try { + assertThat(task instanceof CancellableTask, equalTo(cancellable)); + assertEquals(request.sourceNode, "TS_A"); + + SimpleTestResponse tsB = new SimpleTestResponse("TS_B"); + try (RecyclerBytesStreamOutput out = serviceB.newNetworkBytesStream()) { + out.setTransportVersion(transportVersion1); + tsB.writeTo(out); + // simulate what happens in SearchQueryThenFetchAsyncAction with NodeQueryResponse + final BytesTransportResponse responseB = new BytesTransportResponse(out.moveToBytesReference(), transportVersion1); + channel.sendResponse(responseB); + response.set(responseB); + } finally { + tsB.decRef(); + } + } finally { + latch.countDown(); + } + }); + TransportActionProxy.registerProxyAction( + serviceB, + "internal:test", + cancellable, + // this is called by the conversion layer in ProxyRequestHandler SimpleTestResponse::new, new NamedWriteableRegistry(Collections.emptyList()) ); @@ -459,11 +612,17 @@ protected void closeInternal() {} SimpleTestResponse(StreamInput in) throws IOException { this.targetNode = in.readString(); + if (in.getTransportVersion().supports(transportVersion1)) { + in.readBoolean(); + } } @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(targetNode); + if (out.getTransportVersion().supports(transportVersion1)) { + out.writeBoolean(true); + } } @Override