Skip to content

Commit bb32f75

Browse files
Segment Replication - Remove redundant replica doc parsing on writes. (#7279) (#7281)
This change removes unnecessary doc parsing currently performed on replicas by updating applyIndexOperationOnReplicas to pass a doc id from the primary. (cherry picked from commit 66e49a6) Signed-off-by: Marc Handalian <handalm@amazon.com> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent c1b986f commit bb32f75

File tree

8 files changed

+78
-1
lines changed

8 files changed

+78
-1
lines changed

server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -857,6 +857,7 @@ private static Engine.Result performOpOnReplica(
857857
indexRequest.routing()
858858
);
859859
result = replica.applyIndexOperationOnReplica(
860+
primaryResponse.getId(),
860861
primaryResponse.getSeqNo(),
861862
primaryResponse.getPrimaryTerm(),
862863
primaryResponse.getVersion(),

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -916,13 +916,31 @@ public Engine.IndexResult applyIndexOperationOnPrimary(
916916
}
917917

918918
public Engine.IndexResult applyIndexOperationOnReplica(
919+
String id,
919920
long seqNo,
920921
long opPrimaryTerm,
921922
long version,
922923
long autoGeneratedTimeStamp,
923924
boolean isRetry,
924925
SourceToParse sourceToParse
925926
) throws IOException {
927+
if (indexSettings.isSegRepEnabled()) {
928+
Engine.Index index = new Engine.Index(
929+
new Term(IdFieldMapper.NAME, Uid.encodeId(id)),
930+
new ParsedDocument(null, null, id, null, null, sourceToParse.source(), sourceToParse.getXContentType(), null),
931+
seqNo,
932+
opPrimaryTerm,
933+
version,
934+
null,
935+
Engine.Operation.Origin.REPLICA,
936+
System.nanoTime(),
937+
autoGeneratedTimeStamp,
938+
isRetry,
939+
UNASSIGNED_SEQ_NO,
940+
0
941+
);
942+
return getEngine().index(index);
943+
}
926944
return applyIndexOperation(
927945
getEngine(),
928946
seqNo,
@@ -1133,6 +1151,21 @@ public Engine.DeleteResult applyDeleteOperationOnPrimary(
11331151
}
11341152

11351153
public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long opPrimaryTerm, long version, String id) throws IOException {
1154+
if (indexSettings.isSegRepEnabled()) {
1155+
final Engine.Delete delete = new Engine.Delete(
1156+
id,
1157+
new Term(IdFieldMapper.NAME, Uid.encodeId(id)),
1158+
seqNo,
1159+
opPrimaryTerm,
1160+
version,
1161+
null,
1162+
Engine.Operation.Origin.REPLICA,
1163+
System.nanoTime(),
1164+
UNASSIGNED_SEQ_NO,
1165+
0
1166+
);
1167+
return getEngine().delete(delete);
1168+
}
11361169
return applyDeleteOperation(
11371170
getEngine(),
11381171
seqNo,

server/src/test/java/org/opensearch/index/replication/RecoveryDuringReplicationTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ public void testRecoveryToReplicaThatReceivedExtraDocument() throws Exception {
152152
final IndexShard remainingReplica = shards.getReplicas().get(1);
153153
// slip the extra document into the replica
154154
remainingReplica.applyIndexOperationOnReplica(
155+
"id",
155156
remainingReplica.getLocalCheckpoint() + 1,
156157
remainingReplica.getOperationPrimaryTerm(),
157158
1,

server/src/test/java/org/opensearch/index/shard/IndexShardTests.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@
169169
import java.util.Locale;
170170
import java.util.Map;
171171
import java.util.Set;
172+
import java.util.UUID;
172173
import java.util.concurrent.BrokenBarrierException;
173174
import java.util.concurrent.CountDownLatch;
174175
import java.util.concurrent.CyclicBarrier;
@@ -2236,6 +2237,7 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
22362237
shard.applyDeleteOperationOnReplica(1, primaryTerm, 2, "id");
22372238
shard.getEngine().rollTranslogGeneration(); // isolate the delete in it's own generation
22382239
shard.applyIndexOperationOnReplica(
2240+
UUID.randomUUID().toString(),
22392241
0,
22402242
primaryTerm,
22412243
1,
@@ -2244,6 +2246,7 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
22442246
new SourceToParse(shard.shardId().getIndexName(), "id", new BytesArray("{}"), XContentType.JSON)
22452247
);
22462248
shard.applyIndexOperationOnReplica(
2249+
UUID.randomUUID().toString(),
22472250
3,
22482251
primaryTerm,
22492252
3,
@@ -2254,6 +2257,7 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
22542257
// Flushing a new commit with local checkpoint=1 allows to skip the translog gen #1 in recovery.
22552258
shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
22562259
shard.applyIndexOperationOnReplica(
2260+
UUID.randomUUID().toString(),
22572261
2,
22582262
primaryTerm,
22592263
3,
@@ -2262,6 +2266,7 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException {
22622266
new SourceToParse(shard.shardId().getIndexName(), "id-2", new BytesArray("{}"), XContentType.JSON)
22632267
);
22642268
shard.applyIndexOperationOnReplica(
2269+
UUID.randomUUID().toString(),
22652270
5,
22662271
primaryTerm,
22672272
1,
@@ -2409,6 +2414,7 @@ public void testRecoverFromStoreWithNoOps() throws IOException {
24092414
updateMappings(otherShard, shard.indexSettings().getIndexMetadata());
24102415
SourceToParse sourceToParse = new SourceToParse(shard.shardId().getIndexName(), "1", new BytesArray("{}"), XContentType.JSON);
24112416
otherShard.applyIndexOperationOnReplica(
2417+
UUID.randomUUID().toString(),
24122418
1,
24132419
otherShard.getOperationPrimaryTerm(),
24142420
1,
@@ -2536,6 +2542,7 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception {
25362542
final String indexName = shard.shardId().getIndexName();
25372543
// Index #0, index #1
25382544
shard.applyIndexOperationOnReplica(
2545+
UUID.randomUUID().toString(),
25392546
0,
25402547
primaryTerm,
25412548
1,
@@ -2546,6 +2553,7 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception {
25462553
flushShard(shard);
25472554
shard.updateGlobalCheckpointOnReplica(0, "test"); // stick the global checkpoint here.
25482555
shard.applyIndexOperationOnReplica(
2556+
UUID.randomUUID().toString(),
25492557
1,
25502558
primaryTerm,
25512559
1,
@@ -2558,6 +2566,7 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception {
25582566
shard.getEngine().rollTranslogGeneration();
25592567
shard.markSeqNoAsNoop(1, primaryTerm, "test");
25602568
shard.applyIndexOperationOnReplica(
2569+
UUID.randomUUID().toString(),
25612570
2,
25622571
primaryTerm,
25632572
1,
@@ -3948,6 +3957,7 @@ private Result indexOnReplicaWithGaps(final IndexShard indexShard, final int ope
39483957
XContentType.JSON
39493958
);
39503959
indexShard.applyIndexOperationOnReplica(
3960+
UUID.randomUUID().toString(),
39513961
i,
39523962
indexShard.getOperationPrimaryTerm(),
39533963
1,
@@ -4577,6 +4587,7 @@ public void testDoNotTrimCommitsWhenOpenReadOnlyEngine() throws Exception {
45774587
seqNo++; // create gaps in sequence numbers
45784588
}
45794589
shard.applyIndexOperationOnReplica(
4590+
UUID.randomUUID().toString(),
45804591
seqNo,
45814592
shard.getOperationPrimaryTerm(),
45824593
1,

server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import org.opensearch.index.replication.TestReplicationSource;
3939
import org.opensearch.index.store.Store;
4040
import org.opensearch.index.store.StoreFileMetadata;
41+
import org.opensearch.index.translog.SnapshotMatchers;
42+
import org.opensearch.index.translog.Translog;
4143
import org.opensearch.indices.recovery.RecoverySettings;
4244
import org.opensearch.indices.recovery.RecoveryTarget;
4345
import org.opensearch.indices.replication.CheckpointInfoResponse;
@@ -176,6 +178,7 @@ public void testSegmentReplication_Index_Update_Delete() throws Exception {
176178
shards.index(new IndexRequest(index.getName()).id(String.valueOf(i)).source("{\"foo\": \"bar\"}", XContentType.JSON));
177179
}
178180

181+
assertEqualTranslogOperations(shards, primaryShard);
179182
primaryShard.refresh("Test");
180183
replicateSegments(primaryShard, shards.getReplicas());
181184

@@ -189,7 +192,7 @@ public void testSegmentReplication_Index_Update_Delete() throws Exception {
189192
);
190193
}
191194
}
192-
195+
assertEqualTranslogOperations(shards, primaryShard);
193196
primaryShard.refresh("Test");
194197
replicateSegments(primaryShard, shards.getReplicas());
195198
shards.assertAllEqual(numDocs);
@@ -204,6 +207,7 @@ public void testSegmentReplication_Index_Update_Delete() throws Exception {
204207
shards.delete(new DeleteRequest(index.getName()).id(String.valueOf(i)));
205208
}
206209
}
210+
assertEqualTranslogOperations(shards, primaryShard);
207211
primaryShard.refresh("Test");
208212
replicateSegments(primaryShard, shards.getReplicas());
209213
final List<DocIdSeqNoAndSource> docsAfterDelete = getDocIdAndSeqNos(primaryShard);
@@ -753,6 +757,7 @@ public void testNRTReplicaPromotedAsPrimary() throws Exception {
753757
for (IndexShard shard : shards.getReplicas()) {
754758
assertDocCounts(shard, numDocs, numDocs);
755759
}
760+
assertEqualTranslogOperations(shards, oldPrimary);
756761

757762
// 2. Create ops that are in the replica's xlog, not in the index.
758763
// index some more into both but don't replicate. replica will have only numDocs searchable, but should have totalDocs
@@ -761,6 +766,7 @@ public void testNRTReplicaPromotedAsPrimary() throws Exception {
761766
final int totalDocs = numDocs + additonalDocs;
762767

763768
assertDocCounts(oldPrimary, totalDocs, totalDocs);
769+
assertEqualTranslogOperations(shards, oldPrimary);
764770
for (IndexShard shard : shards.getReplicas()) {
765771
assertDocCounts(shard, totalDocs, numDocs);
766772
}
@@ -1083,4 +1089,20 @@ private void assertEqualCommittedSegments(IndexShard primary, IndexShard... repl
10831089
assertTrue(diff.missing.isEmpty());
10841090
}
10851091
}
1092+
1093+
private void assertEqualTranslogOperations(ReplicationGroup shards, IndexShard primaryShard) throws IOException {
1094+
try (final Translog.Snapshot snapshot = getTranslog(primaryShard).newSnapshot()) {
1095+
List<Translog.Operation> operations = new ArrayList<>();
1096+
Translog.Operation op;
1097+
while ((op = snapshot.next()) != null) {
1098+
final Translog.Operation newOp = op;
1099+
operations.add(newOp);
1100+
}
1101+
for (IndexShard replica : shards.getReplicas()) {
1102+
try (final Translog.Snapshot replicaSnapshot = getTranslog(replica).newSnapshot()) {
1103+
assertThat(replicaSnapshot, SnapshotMatchers.containsOperationsInAnyOrder(operations));
1104+
}
1105+
}
1106+
}
1107+
}
10861108
}

server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import java.util.Collections;
6666
import java.util.List;
6767
import java.util.Optional;
68+
import java.util.UUID;
6869
import java.util.concurrent.ArrayBlockingQueue;
6970
import java.util.concurrent.BlockingQueue;
7071
import java.util.concurrent.CyclicBarrier;
@@ -182,6 +183,7 @@ private SeqNoStats populateRandomData(IndexShard shard) throws IOException {
182183
Randomness.shuffle(seqNos);
183184
for (long seqNo : seqNos) {
184185
shard.applyIndexOperationOnReplica(
186+
UUID.randomUUID().toString(),
185187
seqNo,
186188
1,
187189
shard.getOperationPrimaryTerm(),

server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import java.util.HashMap;
7979
import java.util.List;
8080
import java.util.Map;
81+
import java.util.UUID;
8182
import java.util.concurrent.CountDownLatch;
8283
import java.util.concurrent.Future;
8384
import java.util.concurrent.atomic.AtomicReference;
@@ -181,6 +182,7 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception {
181182
orgReplica.flush(new FlushRequest().force(true)); // isolate delete#1 in its own translog generation and lucene segment
182183
// index #0
183184
orgReplica.applyIndexOperationOnReplica(
185+
UUID.randomUUID().toString(),
184186
0,
185187
primaryTerm,
186188
1,
@@ -190,6 +192,7 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception {
190192
);
191193
// index #3
192194
orgReplica.applyIndexOperationOnReplica(
195+
UUID.randomUUID().toString(),
193196
3,
194197
primaryTerm,
195198
1,
@@ -201,6 +204,7 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception {
201204
orgReplica.flush(new FlushRequest().force(true).waitIfOngoing(true));
202205
// index #2
203206
orgReplica.applyIndexOperationOnReplica(
207+
UUID.randomUUID().toString(),
204208
2,
205209
primaryTerm,
206210
1,
@@ -212,6 +216,7 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception {
212216
orgReplica.updateGlobalCheckpointOnReplica(3L, "test");
213217
// index #5 -> force NoOp #4.
214218
orgReplica.applyIndexOperationOnReplica(
219+
UUID.randomUUID().toString(),
215220
5,
216221
primaryTerm,
217222
1,

test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@
151151
import java.util.List;
152152
import java.util.Map;
153153
import java.util.Set;
154+
import java.util.UUID;
154155
import java.util.concurrent.CountDownLatch;
155156
import java.util.concurrent.ExecutionException;
156157
import java.util.concurrent.TimeUnit;
@@ -1122,6 +1123,7 @@ protected Engine.IndexResult indexDoc(IndexShard shard, String id, String source
11221123
final long seqNo = shard.seqNoStats().getMaxSeqNo() + 1;
11231124
shard.advanceMaxSeqNoOfUpdatesOrDeletes(seqNo); // manually replicate max_seq_no_of_updates
11241125
result = shard.applyIndexOperationOnReplica(
1126+
UUID.randomUUID().toString(),
11251127
seqNo,
11261128
shard.getOperationPrimaryTerm(),
11271129
0,

0 commit comments

Comments
 (0)