@@ -188,10 +188,11 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
188
188
private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup ("kafka.log.remote" , "RemoteLogManager" );
189
189
190
190
// The endpoint for remote log metadata manager to connect to
191
- private Optional <Endpoint > endpoint = Optional .empty ();
191
+ private final Optional <Endpoint > endpoint ;
192
+ private final Timer remoteReadTimer ;
193
+
192
194
private boolean closed = false ;
193
195
194
- private final Timer remoteReadTimer ;
195
196
private volatile DelayedOperationPurgatory <DelayedRemoteListOffsets > delayedRemoteListOffsetsPurgatory ;
196
197
197
198
/**
@@ -1022,7 +1023,7 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, RemoteLogSegment
1022
1023
1023
1024
List <EpochEntry > epochEntries = getLeaderEpochEntries (log , segment .baseOffset (), nextSegmentBaseOffset );
1024
1025
Map <Integer , Long > segmentLeaderEpochs = new HashMap <>(epochEntries .size ());
1025
- epochEntries .forEach (entry -> segmentLeaderEpochs .put (entry .epoch , entry .startOffset ));
1026
+ epochEntries .forEach (entry -> segmentLeaderEpochs .put (entry .epoch () , entry .startOffset () ));
1026
1027
1027
1028
boolean isTxnIdxEmpty = segment .txnIndex ().isEmpty ();
1028
1029
RemoteLogSegmentMetadata copySegmentStartedRlsm = new RemoteLogSegmentMetadata (segmentId , segment .baseOffset (), endOffset ,
@@ -1083,7 +1084,7 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, RemoteLogSegment
1083
1084
1084
1085
// `epochEntries` cannot be empty, there is a pre-condition validation in RemoteLogSegmentMetadata
1085
1086
// constructor
1086
- int lastEpochInSegment = epochEntries .get (epochEntries .size () - 1 ).epoch ;
1087
+ int lastEpochInSegment = epochEntries .get (epochEntries .size () - 1 ).epoch () ;
1087
1088
copiedOffsetOption = Optional .of (new OffsetAndEpoch (endOffset , lastEpochInSegment ));
1088
1089
// Update the highest offset in remote storage for this partition's log so that the local log segments
1089
1090
// are not deleted before they are copied to remote storage.
@@ -1222,7 +1223,7 @@ private boolean deleteLogSegmentsDueToLeaderEpochCacheTruncation(EpochEntry earl
1222
1223
RemoteLogSegmentMetadata metadata )
1223
1224
throws RemoteStorageException , ExecutionException , InterruptedException {
1224
1225
boolean isSegmentDeleted = deleteRemoteLogSegment (metadata ,
1225
- ignored -> metadata .segmentLeaderEpochs ().keySet ().stream ().allMatch (epoch -> epoch < earliestEpochEntry .epoch ));
1226
+ ignored -> metadata .segmentLeaderEpochs ().keySet ().stream ().allMatch (epoch -> epoch < earliestEpochEntry .epoch () ));
1226
1227
if (isSegmentDeleted ) {
1227
1228
logger .info ("Deleted remote log segment {} due to leader-epoch-cache truncation. " +
1228
1229
"Current earliest-epoch-entry: {}, segment-end-offset: {} and segment-epochs: {}" ,
@@ -1391,7 +1392,7 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE
1391
1392
if (earliestEpochEntryOptional .isPresent ()) {
1392
1393
EpochEntry earliestEpochEntry = earliestEpochEntryOptional .get ();
1393
1394
Iterator <Integer > epochsToClean = remoteLeaderEpochs .stream ()
1394
- .filter (remoteEpoch -> remoteEpoch < earliestEpochEntry .epoch )
1395
+ .filter (remoteEpoch -> remoteEpoch < earliestEpochEntry .epoch () )
1395
1396
.iterator ();
1396
1397
1397
1398
List <RemoteLogSegmentMetadata > listOfSegmentsToBeCleaned = new ArrayList <>();
@@ -1647,11 +1648,11 @@ static NavigableMap<Integer, Long> buildFilteredLeaderEpochMap(NavigableMap<Inte
1647
1648
}
1648
1649
1649
1650
public FetchDataInfo read (RemoteStorageFetchInfo remoteStorageFetchInfo ) throws RemoteStorageException , IOException {
1650
- int fetchMaxBytes = remoteStorageFetchInfo .fetchMaxBytes ;
1651
- TopicPartition tp = remoteStorageFetchInfo .topicIdPartition .topicPartition ();
1652
- FetchRequest .PartitionData fetchInfo = remoteStorageFetchInfo .fetchInfo ;
1651
+ int fetchMaxBytes = remoteStorageFetchInfo .fetchMaxBytes () ;
1652
+ TopicPartition tp = remoteStorageFetchInfo .topicIdPartition () .topicPartition ();
1653
+ FetchRequest .PartitionData fetchInfo = remoteStorageFetchInfo .fetchInfo () ;
1653
1654
1654
- boolean includeAbortedTxns = remoteStorageFetchInfo .fetchIsolation == FetchIsolation .TXN_COMMITTED ;
1655
+ boolean includeAbortedTxns = remoteStorageFetchInfo .fetchIsolation () == FetchIsolation .TXN_COMMITTED ;
1655
1656
1656
1657
long offset = fetchInfo .fetchOffset ;
1657
1658
int maxBytes = Math .min (fetchMaxBytes , fetchInfo .maxBytes );
@@ -1703,14 +1704,14 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws
1703
1704
// An empty record is sent instead of an incomplete batch when
1704
1705
// - there is no minimum-one-message constraint and
1705
1706
// - the first batch size is more than maximum bytes that can be sent and
1706
- if (!remoteStorageFetchInfo .minOneMessage && firstBatchSize > maxBytes ) {
1707
+ if (!remoteStorageFetchInfo .minOneMessage () && firstBatchSize > maxBytes ) {
1707
1708
LOGGER .debug ("Returning empty record for offset {} in partition {} because the first batch size {} " +
1708
1709
"is greater than max fetch bytes {}" , offset , tp , firstBatchSize , maxBytes );
1709
1710
return new FetchDataInfo (new LogOffsetMetadata (offset ), MemoryRecords .EMPTY );
1710
1711
}
1711
1712
1712
1713
int updatedFetchSize =
1713
- remoteStorageFetchInfo .minOneMessage && firstBatchSize > maxBytes ? firstBatchSize : maxBytes ;
1714
+ remoteStorageFetchInfo .minOneMessage () && firstBatchSize > maxBytes ? firstBatchSize : maxBytes ;
1714
1715
1715
1716
ByteBuffer buffer = ByteBuffer .allocate (updatedFetchSize );
1716
1717
int remainingBytes = updatedFetchSize ;
@@ -1759,7 +1760,7 @@ private FetchDataInfo addAbortedTransactions(long startOffset,
1759
1760
1760
1761
OffsetIndex offsetIndex = indexCache .getIndexEntry (segmentMetadata ).offsetIndex ();
1761
1762
long upperBoundOffset = offsetIndex .fetchUpperBoundOffset (startOffsetPosition , fetchSize )
1762
- .map (position -> position . offset ).orElse (segmentMetadata .endOffset () + 1 );
1763
+ .map (OffsetPosition :: offset ).orElse (segmentMetadata .endOffset () + 1 );
1763
1764
1764
1765
final Set <FetchResponseData .AbortedTransaction > abortedTransactions = new HashSet <>();
1765
1766
@@ -1804,8 +1805,8 @@ private void collectAbortedTransactions(long startOffset,
1804
1805
if (txnIndexOpt .isPresent ()) {
1805
1806
TransactionIndex txnIndex = txnIndexOpt .get ();
1806
1807
TxnIndexSearchResult searchResult = txnIndex .collectAbortedTxns (startOffset , upperBoundOffset );
1807
- accumulator .accept (searchResult .abortedTransactions );
1808
- isSearchComplete = searchResult .isComplete ;
1808
+ accumulator .accept (searchResult .abortedTransactions () );
1809
+ isSearchComplete = searchResult .isComplete () ;
1809
1810
}
1810
1811
if (!isSearchComplete ) {
1811
1812
currentMetadataOpt = findNextSegmentWithTxnIndex (tp , currentMetadata .endOffset () + 1 , leaderEpochCache );
@@ -1833,8 +1834,8 @@ private void collectAbortedTransactionInLocalSegments(long startOffset,
1833
1834
TransactionIndex txnIndex = localLogSegments .next ().txnIndex ();
1834
1835
if (txnIndex != null ) {
1835
1836
TxnIndexSearchResult searchResult = txnIndex .collectAbortedTxns (startOffset , upperBoundOffset );
1836
- accumulator .accept (searchResult .abortedTransactions );
1837
- if (searchResult .isComplete ) {
1837
+ accumulator .accept (searchResult .abortedTransactions () );
1838
+ if (searchResult .isComplete () ) {
1838
1839
return ;
1839
1840
}
1840
1841
}
@@ -1875,9 +1876,9 @@ Optional<RemoteLogSegmentMetadata> findNextSegmentWithTxnIndex(TopicPartition tp
1875
1876
}
1876
1877
int initialEpoch = initialEpochOpt .getAsInt ();
1877
1878
for (EpochEntry epochEntry : leaderEpochCache .epochEntries ()) {
1878
- if (epochEntry .epoch >= initialEpoch ) {
1879
- long startOffset = Math .max (epochEntry .startOffset , offset );
1880
- Optional <RemoteLogSegmentMetadata > metadataOpt = fetchNextSegmentWithTxnIndex (tp , epochEntry .epoch , startOffset );
1879
+ if (epochEntry .epoch () >= initialEpoch ) {
1880
+ long startOffset = Math .max (epochEntry .startOffset () , offset );
1881
+ Optional <RemoteLogSegmentMetadata > metadataOpt = fetchNextSegmentWithTxnIndex (tp , epochEntry .epoch () , startOffset );
1881
1882
if (metadataOpt .isPresent ()) {
1882
1883
return metadataOpt ;
1883
1884
}
@@ -1906,7 +1907,7 @@ OffsetAndEpoch findHighestRemoteOffset(TopicIdPartition topicIdPartition, Unifie
1906
1907
LeaderEpochFileCache leaderEpochCache = log .leaderEpochCache ();
1907
1908
Optional <EpochEntry > maybeEpochEntry = leaderEpochCache .latestEntry ();
1908
1909
while (offsetAndEpoch == null && maybeEpochEntry .isPresent ()) {
1909
- int epoch = maybeEpochEntry .get ().epoch ;
1910
+ int epoch = maybeEpochEntry .get ().epoch () ;
1910
1911
Optional <Long > highestRemoteOffsetOpt =
1911
1912
remoteLogMetadataManagerPlugin .get ().highestOffsetForEpoch (topicIdPartition , epoch );
1912
1913
if (highestRemoteOffsetOpt .isPresent ()) {
@@ -1935,7 +1936,7 @@ long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throw
1935
1936
Optional <Long > logStartOffset = Optional .empty ();
1936
1937
LeaderEpochFileCache leaderEpochCache = log .leaderEpochCache ();
1937
1938
OptionalInt earliestEpochOpt = leaderEpochCache .earliestEntry ()
1938
- .map (epochEntry -> OptionalInt .of (epochEntry .epoch ))
1939
+ .map (epochEntry -> OptionalInt .of (epochEntry .epoch () ))
1939
1940
.orElseGet (OptionalInt ::empty );
1940
1941
while (logStartOffset .isEmpty () && earliestEpochOpt .isPresent ()) {
1941
1942
Iterator <RemoteLogSegmentMetadata > iterator =
0 commit comments