-
Notifications
You must be signed in to change notification settings - Fork 14.6k
MINOR: update TransactionLog#readTxnRecordValue to initialize TransactionMetadata with non-empty topic partitions #20370
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Signed-off-by: PoAn Yang <payang@apache.org>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks for the cleanup
producerEpoch, lastProducerEpoch, txnTimeoutMs, TransactionState.PREPARE_COMMIT, util.Set.of(partition1, partition2), 0L, 0L, TransactionVersion.TV_2) | ||
private val txnMetadata2 = new TransactionMetadata(transactionalId2, producerId2, producerId2, RecordBatch.NO_PRODUCER_ID, | ||
producerEpoch, lastProducerEpoch, txnTimeoutMs, TransactionState.PREPARE_COMMIT, util.Set.of(partition1), 0L, 0L, TransactionVersion.TV_2) | ||
private val txnMetadata1 = new TransactionMetadata(transactionalId1, producerId1, producerId1, RecordBatch.NO_PRODUCER_ID, producerEpoch, lastProducerEpoch, txnTimeoutMs, TransactionState.PREPARE_COMMIT, 0L, 0L, TransactionVersion.TV_2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it seems those tests assume txnMetadata1
have some topicPartitions initially.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah good catch -- are we testing the loading path anywhere else? I think that might be a scenario where we start with partitions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I missed this one. Fixed it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry so for when we load the partition in the non test code, we always call this add partitions method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we always call this add partitions method?
yes
transactionMetadata.addPartitions(partitionsSchema.partitionIds |
we could remove the usage of addPartitions
from production code. for example:
val state = TransactionState.fromId(value.transactionStatus)
val tps: util.Set[TopicPartition] = if (!state.equals(TransactionState.EMPTY)) value.transactionPartitions
.stream().flatMap(partitionsSchema => partitionsSchema.partitionIds().stream().map(id => new TopicPartition(partitionsSchema.topic(), id.intValue())))
.collect(Collectors.toSet)
else util.Set.of()
Some(new TransactionMetadata(
transactionalId,
value.producerId,
value.previousProducerId,
value.nextProducerId,
value.producerEpoch,
RecordBatch.NO_PRODUCER_EPOCH,
value.transactionTimeoutMs,
state,
tps,
value.transactionStartTimestampMs,
value.transactionLastUpdateTimestampMs,
TransactionVersion.fromFeatureLevel(value.clientTransactionVersion)))
If the method removePartition
could adopt copy-on-write policy, the inner member topicPartitions
could be an immutable collection.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. I don't know if this is particularly better or worse -- I don't have a strong opinion either way. Just wanted to confirm that we have a way we add partitions and do it the same way each time. 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be acceptable to remove addPartitions in this PR; we can address removePartition in a subsequent change if it does not cause performance regression.
…ompletionHandlerTest Signed-off-by: PoAn Yang <payang@apache.org>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM assuming CI pass
Signed-off-by: PoAn Yang <payang@apache.org>
|
||
if (!transactionMetadata.state.equals(TransactionState.EMPTY)) | ||
value.transactionPartitions.forEach(partitionsSchema => { | ||
transactionMetadata.addPartitions(partitionsSchema.partitionIds |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If removing addPartitions
is too costly right now, could we avoid exposing it publicly and mark it VisibleForTesting
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the late reply. I add // VisibleForTesting
for functions which are only used in testing code. Thanks.
Signed-off-by: PoAn Yang <payang@apache.org>
This is followup PR for #19699.
TransactionMetadata with non-empty topic partitions
TxnTransitMetadata
comment, because it's not immutable.Reviewers: TengYao Chi kitingiao@gmail.com, Justine Olshan
jolshan@confluent.io, Kuan-Po Tseng brandboat@gmail.com, Chia-Ping
Tsai chia7712@gmail.com