Skip to content

Conversation

FrankYang0529
Copy link
Member

@FrankYang0529 FrankYang0529 commented Aug 18, 2025

This is followup PR for #19699.

  • Update TransactionLog#readTxnRecordValue to initialize
    TransactionMetadata with non-empty topic partitions
  • Update TxnTransitMetadata comment, because it's not immutable.

Reviewers: TengYao Chi kitingiao@gmail.com, Justine Olshan
jolshan@confluent.io, Kuan-Po Tseng brandboat@gmail.com, Chia-Ping
Tsai chia7712@gmail.com

Signed-off-by: PoAn Yang <payang@apache.org>
Copy link
Member

@brandboat brandboat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks for the cleanup

producerEpoch, lastProducerEpoch, txnTimeoutMs, TransactionState.PREPARE_COMMIT, util.Set.of(partition1, partition2), 0L, 0L, TransactionVersion.TV_2)
private val txnMetadata2 = new TransactionMetadata(transactionalId2, producerId2, producerId2, RecordBatch.NO_PRODUCER_ID,
producerEpoch, lastProducerEpoch, txnTimeoutMs, TransactionState.PREPARE_COMMIT, util.Set.of(partition1), 0L, 0L, TransactionVersion.TV_2)
private val txnMetadata1 = new TransactionMetadata(transactionalId1, producerId1, producerId1, RecordBatch.NO_PRODUCER_ID, producerEpoch, lastProducerEpoch, txnTimeoutMs, TransactionState.PREPARE_COMMIT, 0L, 0L, TransactionVersion.TV_2)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems those tests assume txnMetadata1 have some topicPartitions initially.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah good catch -- are we testing the loading path anywhere else? I think that might be a scenario where we start with partitions.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I missed this one. Fixed it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry so for when we load the partition in the non test code, we always call this add partitions method?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we always call this add partitions method?

yes

transactionMetadata.addPartitions(partitionsSchema.partitionIds

we could remove the usage of addPartitions from production code. for example:

        val state = TransactionState.fromId(value.transactionStatus)
        val tps: util.Set[TopicPartition] = if (!state.equals(TransactionState.EMPTY)) value.transactionPartitions
          .stream().flatMap(partitionsSchema => partitionsSchema.partitionIds().stream().map(id => new TopicPartition(partitionsSchema.topic(), id.intValue())))
          .collect(Collectors.toSet)
        else util.Set.of()
        Some(new TransactionMetadata(
          transactionalId,
          value.producerId,
          value.previousProducerId,
          value.nextProducerId,
          value.producerEpoch,
          RecordBatch.NO_PRODUCER_EPOCH,
          value.transactionTimeoutMs,
          state,
          tps,
          value.transactionStartTimestampMs,
          value.transactionLastUpdateTimestampMs,
          TransactionVersion.fromFeatureLevel(value.clientTransactionVersion)))

If the method removePartition could adopt copy-on-write policy, the inner member topicPartitions could be an immutable collection.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I don't know if this is particularly better or worse -- I don't have a strong opinion either way. Just wanted to confirm that we have a way we add partitions and do it the same way each time. 👍

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be acceptable to remove addPartitions in this PR; we can address removePartition in a subsequent change if it does not cause performance regression.

…ompletionHandlerTest

Signed-off-by: PoAn Yang <payang@apache.org>
Copy link
Contributor

@frankvicky frankvicky left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM assuming CI pass

@jolshan jolshan self-requested a review August 19, 2025 16:13
@github-actions github-actions bot added the small Small PRs label Aug 20, 2025
@FrankYang0529 FrankYang0529 requested a review from chia7712 August 20, 2025 16:07
@FrankYang0529 FrankYang0529 changed the title MINOR: remove topicPartitions from TransactionMetadata constructor MINOR: update TransactionLog#readTxnRecordValue to initialize TransactionMetadata with non-empty topic partitions Aug 21, 2025
Signed-off-by: PoAn Yang <payang@apache.org>

if (!transactionMetadata.state.equals(TransactionState.EMPTY))
value.transactionPartitions.forEach(partitionsSchema => {
transactionMetadata.addPartitions(partitionsSchema.partitionIds
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If removing addPartitions is too costly right now, could we avoid exposing it publicly and mark it VisibleForTesting instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the late reply. I add // VisibleForTesting for functions which are only used in testing code. Thanks.

@chia7712 chia7712 merged commit 5bbc421 into apache:trunk Aug 26, 2025
24 checks passed
@FrankYang0529 FrankYang0529 deleted the KAFKA-18884 branch August 26, 2025 06:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
ci-approved core Kafka Broker small Small PRs transactions Transactions and EOS
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants