Skip to content

Commit

Permalink
KAFKA-18575: Transaction Version 2 doesn't correctly handle race con…
Browse files Browse the repository at this point in the history
…dition with completing and new transaction(#18604)

There is a subtle race condition with transactions V2: the previous transaction may still be completing when we check whether we need to add a partition, but has completed by the time the request reaches the coordinator.

One approach was to remove the verification for TV2 and just check the epoch on write, but a simpler one is to return a concurrent-transactions error (ConcurrentTransactionsException) from the partition leader before attempting to add the partition. I've done this and added a test for this behavior.

Locally, I reproduced the race by adding a 1 second sleep when handling the WriteTxnMarkersRequest and a 2 second delay before adding the partition to the AddPartitionsToTxnManager. Without this change, the race happened on every second transaction as the first one completed. With this change, the error went away.

As a followup, we may want to clean up some of the code and comments with respect to verification as the code is used by both TV0 + verification and TV2. But that doesn't need to complete for 4.0. This does :)

Reviewers: Jeff Kim <[email protected]>, Artem Livshits <[email protected]>, Calvin Liu <[email protected]>
  • Loading branch information
jolshan authored Jan 22, 2025
1 parent 410065a commit 94a1bfb
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 2 deletions.
9 changes: 7 additions & 2 deletions core/src/main/scala/kafka/log/UnifiedLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
*/
def hasOngoingTransaction(producerId: Long, producerEpoch: Short): Boolean = lock synchronized {
  // Only a producer entry that currently has an open transaction (a first offset is recorded) is of interest;
  // a missing entry or an entry without an open transaction yields false.
  Option(producerStateManager.activeProducers.get(producerId))
    .filter(_.currentTxnFirstOffset.isPresent)
    .exists { openTxnEntry =>
      // With transactions V2, a request carrying a newer epoch than the one that owns the open transaction
      // most likely means the previous transaction is still being completed. Fail fast with
      // ConcurrentTransactionsException until that transaction finishes.
      if (openTxnEntry.producerEpoch() < producerEpoch)
        throw new ConcurrentTransactionsException("The producer attempted to update a transaction " +
          "while another concurrent operation on the same transaction was ongoing.")
      // The transaction counts as ongoing for the caller only when the epochs match exactly.
      openTxnEntry.producerEpoch() == producerEpoch
    }
}

Expand Down Expand Up @@ -1030,7 +1035,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// transaction is completed or aborted. We can guarantee the transaction coordinator knows about the transaction given step 1 and that the transaction is still
// ongoing. If the transaction is expected to be ongoing, we will not set a VerificationGuard. If the transaction is aborted, hasOngoingTransaction is false and
// requestVerificationGuard is the sentinel, so we will throw an error. A subsequent produce request (retry) should create verification state and return to phase 1.
if (batch.isTransactional && !hasOngoingTransaction(batch.producerId, batch.producerEpoch()) && batchMissingRequiredVerification(batch, requestVerificationGuard))
if (batch.isTransactional && !batch.isControlBatch && !hasOngoingTransaction(batch.producerId, batch.producerEpoch()) && batchMissingRequiredVerification(batch, requestVerificationGuard))
throw new InvalidTxnStateException("Record was not part of an ongoing transaction")
}

Expand All @@ -1051,7 +1056,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}

// Returns true when transaction verification is enabled in the producer state manager config and
// verificationGuard(batch.producerId).verify(requestVerificationGuard) returns false, i.e. the guard
// supplied with the request does not verify against the guard looked up for the batch's producer id.
// NOTE(review): the two consecutive lines below both start with the same transactionVerificationEnabled()
// check and differ only in the trailing `!batch.isControlBatch` term; they look like the before/after
// versions of a single line from a diff whose +/- markers were lost. Confirm which one is current.
private def batchMissingRequiredVerification(batch: MutableRecordBatch, requestVerificationGuard: VerificationGuard): Boolean = {
producerStateManager.producerStateManagerConfig().transactionVerificationEnabled() && !batch.isControlBatch &&
producerStateManager.producerStateManagerConfig().transactionVerificationEnabled() &&
!verificationGuard(batch.producerId).verify(requestVerificationGuard)
}

Expand Down
36 changes: 36 additions & 0 deletions core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4020,6 +4020,42 @@ class UnifiedLogTest {
log.appendAsLeader(transactionalRecords, leaderEpoch = 0, verificationGuard = verificationGuard)
}

@Test
def testPreviousTransactionOngoing(): Unit = {
  // Scenario: a transaction at epoch N is still open on the log when verification is requested for
  // epoch N + 1. The request must be rejected until the end marker for epoch N has been appended.
  val producerId = 23L
  val producerEpoch = 1.toShort
  val nextEpoch = (producerEpoch + 1).toShort

  val psmConfig = new ProducerStateManagerConfig(86400000, true)
  val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
  val log = createLog(logDir, logConfig, producerStateManagerConfig = psmConfig)

  // Start verification for the first epoch; a real (non-sentinel) guard is expected.
  val verificationGuard = log.maybeStartTransactionVerification(producerId, 0, producerEpoch)
  assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)

  // Open a transaction at the first epoch by appending transactional records with that guard.
  val txnRecords = MemoryRecords.withTransactionalRecords(
    Compression.NONE,
    producerId,
    producerEpoch,
    0,
    new SimpleRecord("1".getBytes),
    new SimpleRecord("2".getBytes)
  )
  log.appendAsLeader(txnRecords, origin = AppendOrigin.CLIENT, leaderEpoch = 0, verificationGuard = verificationGuard)

  // While that transaction is open, verification for the bumped epoch must fail.
  assertThrows(
    classOf[ConcurrentTransactionsException],
    () => log.maybeStartTransactionVerification(producerId, 0, nextEpoch)
  )
  // Re-checks the guard obtained for the first epoch (same value as asserted above).
  assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)

  // Complete the first transaction with a COMMIT marker written by the coordinator.
  val commitMarker = MemoryRecords.withEndTransactionMarker(
    producerId,
    producerEpoch,
    new EndTransactionMarker(ControlRecordType.COMMIT, 0)
  )
  log.appendAsLeader(commitMarker, origin = AppendOrigin.COORDINATOR, leaderEpoch = 0)

  // With the previous transaction finished, verification for the bumped epoch now succeeds.
  val verificationGuard2 = log.maybeStartTransactionVerification(producerId, 0, nextEpoch)
  assertNotEquals(VerificationGuard.SENTINEL, verificationGuard2)
}

@Test
def testRecoveryPointNotIncrementedOnProducerStateSnapshotFlushFailure(): Unit = {
val logConfig = LogTestUtils.createLogConfig()
Expand Down

0 comments on commit 94a1bfb

Please sign in to comment.