Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

Commit 6108953

Browse files
eolivelligaoran10
authored andcommitted
[transactions] Implement KIP-664 - DESCRIBE_TRANSACTIONS (#77)
(cherry picked from commit 1f2fe99)
1 parent 9d440e6 commit 6108953

File tree

4 files changed

+123
-112
lines changed

4 files changed

+123
-112
lines changed

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
328328
case LIST_TRANSACTIONS:
329329
handleListTransactionsRequest(kafkaHeaderAndRequest, responseFuture);
330330
break;
331+
case DESCRIBE_TRANSACTIONS:
332+
handleDescribeTransactionsRequest(kafkaHeaderAndRequest, responseFuture);
333+
break;
331334
case DELETE_GROUPS:
332335
handleDeleteGroupsRequest(kafkaHeaderAndRequest, responseFuture);
333336
break;
@@ -589,6 +592,9 @@ protected void handleError(KafkaHeaderAndRequest kafkaHeaderAndRequest,
589592
protected abstract void
590593
handleListTransactionsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture<AbstractResponse> response);
591594

595+
protected abstract void
596+
handleDescribeTransactionsRequest(KafkaHeaderAndRequest listGroups, CompletableFuture<AbstractResponse> response);
597+
592598
protected abstract void
593599
handleDeleteGroupsRequest(KafkaHeaderAndRequest deleteGroups, CompletableFuture<AbstractResponse> response);
594600

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@
117117
import org.apache.kafka.common.message.DescribeClusterResponseData;
118118
import org.apache.kafka.common.message.DescribeConfigsRequestData;
119119
import org.apache.kafka.common.message.DescribeConfigsResponseData;
120+
import org.apache.kafka.common.message.DescribeTransactionsResponseData;
120121
import org.apache.kafka.common.message.EndTxnRequestData;
121122
import org.apache.kafka.common.message.EndTxnResponseData;
122123
import org.apache.kafka.common.message.FetchRequestData;
@@ -161,6 +162,8 @@
161162
import org.apache.kafka.common.requests.DescribeConfigsRequest;
162163
import org.apache.kafka.common.requests.DescribeConfigsResponse;
163164
import org.apache.kafka.common.requests.DescribeGroupsRequest;
165+
import org.apache.kafka.common.requests.DescribeTransactionsRequest;
166+
import org.apache.kafka.common.requests.DescribeTransactionsResponse;
164167
import org.apache.kafka.common.requests.EndTxnRequest;
165168
import org.apache.kafka.common.requests.EndTxnResponse;
166169
import org.apache.kafka.common.requests.FetchRequest;
@@ -2043,6 +2046,16 @@ protected void handleListTransactionsRequest(KafkaHeaderAndRequest listTransacti
20432046
resultFuture.complete(new ListTransactionsResponse(listResult));
20442047
}
20452048

2049+
@Override
2050+
protected void handleDescribeTransactionsRequest(KafkaHeaderAndRequest listGroups,
2051+
CompletableFuture<AbstractResponse> response) {
2052+
checkArgument(listGroups.getRequest() instanceof DescribeTransactionsRequest);
2053+
DescribeTransactionsRequest request = (DescribeTransactionsRequest) listGroups.getRequest();
2054+
DescribeTransactionsResponseData describeResult = getTransactionCoordinator()
2055+
.handleDescribeTransactions(request.data().transactionalIds());
2056+
response.complete(new DescribeTransactionsResponse(describeResult));
2057+
}
2058+
20462059
@Override
20472060
protected void handleDeleteGroupsRequest(KafkaHeaderAndRequest deleteGroups,
20482061
CompletableFuture<AbstractResponse> resultFuture) {

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package io.streamnative.pulsar.handlers.kop.coordinator.transaction;
1515

16+
import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.DEAD;
1617
import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.ONGOING;
1718
import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.PREPARE_ABORT;
1819
import static io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState.PREPARE_COMMIT;
@@ -53,6 +54,7 @@
5354
import org.apache.commons.lang3.StringUtils;
5455
import org.apache.kafka.common.TopicPartition;
5556
import org.apache.kafka.common.internals.Topic;
57+
import org.apache.kafka.common.message.DescribeTransactionsResponseData;
5658
import org.apache.kafka.common.message.ListTransactionsResponseData;
5759
import org.apache.kafka.common.protocol.Errors;
5860
import org.apache.kafka.common.record.RecordBatch;
@@ -233,6 +235,76 @@ public ListTransactionsResponseData handleListTransactions(List<String> filtered
233235
return this.txnManager.listTransactionStates(filteredProducerIds, filteredStates);
234236
}
235237

238+
public DescribeTransactionsResponseData handleDescribeTransactions(List<String> transactionalIds) {
239+
DescribeTransactionsResponseData response = new DescribeTransactionsResponseData();
240+
if (transactionalIds != null) {
241+
transactionalIds.forEach(transactionalId -> {
242+
DescribeTransactionsResponseData.TransactionState transactionState =
243+
handleDescribeTransactions(transactionalId);
244+
response.transactionStates().add(transactionState);
245+
});
246+
}
247+
return response;
248+
}
249+
250+
private DescribeTransactionsResponseData.TransactionState handleDescribeTransactions(String transactionalId) {
251+
// https://github.com/apache/kafka/blob/915991445fde106d02e61a70425ae2601c813db0/core/
252+
// src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala#L270
253+
if (transactionalId == null) {
254+
throw new IllegalArgumentException("Invalid null transactionalId");
255+
}
256+
257+
DescribeTransactionsResponseData.TransactionState transactionState =
258+
new DescribeTransactionsResponseData.TransactionState()
259+
.setTransactionalId(transactionalId);
260+
261+
if (!isActive.get()) {
262+
transactionState.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
263+
} else if (transactionalId.isEmpty()) {
264+
transactionState.setErrorCode(Errors.INVALID_REQUEST.code());
265+
} else {
266+
Either<Errors, Optional<CoordinatorEpochAndTxnMetadata>> tState =
267+
txnManager.getTransactionState(transactionalId);
268+
if (tState.isLeft()) {
269+
transactionState.setErrorCode(tState.getLeft().code());
270+
} else {
271+
Optional<CoordinatorEpochAndTxnMetadata> right = tState.getRight();
272+
if (!right.isPresent()) {
273+
transactionState.setErrorCode(Errors.TRANSACTIONAL_ID_NOT_FOUND.code());
274+
} else {
275+
CoordinatorEpochAndTxnMetadata coordinatorEpochAndMetadata = right.get();
276+
TransactionMetadata txnMetadata = coordinatorEpochAndMetadata.getTransactionMetadata();
277+
txnMetadata.inLock(() -> {
278+
if (txnMetadata.getState() == DEAD) {
279+
// The transaction state is being expired, so ignore it
280+
transactionState.setErrorCode(Errors.TRANSACTIONAL_ID_NOT_FOUND.code());
281+
} else {
282+
txnMetadata.getTopicPartitions().forEach(topicPartition -> {
283+
var topicData = transactionState.topics().find(topicPartition.topic());
284+
if (topicData == null) {
285+
topicData = new DescribeTransactionsResponseData.TopicData()
286+
.setTopic(topicPartition.topic());
287+
transactionState.topics().add(topicData);
288+
}
289+
topicData.partitions().add(topicPartition.partition());
290+
});
291+
292+
transactionState
293+
.setErrorCode(Errors.NONE.code())
294+
.setProducerId(txnMetadata.getProducerId())
295+
.setProducerEpoch(txnMetadata.getProducerEpoch())
296+
.setTransactionState(txnMetadata.getState().toAdminState().toString())
297+
.setTransactionTimeoutMs(txnMetadata.getTxnTimeoutMs())
298+
.setTransactionStartTimeMs(txnMetadata.getTxnStartTimestamp());
299+
}
300+
return null;
301+
});
302+
}
303+
}
304+
}
305+
return transactionState;
306+
}
307+
236308
@Data
237309
@EqualsAndHashCode
238310
@AllArgsConstructor

tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java

Lines changed: 32 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.apache.kafka.clients.admin.ListTransactionsOptions;
5757
import org.apache.kafka.clients.admin.ListTransactionsResult;
5858
import org.apache.kafka.clients.admin.NewTopic;
59+
import org.apache.kafka.clients.admin.TransactionDescription;
5960
import org.apache.kafka.clients.admin.TransactionListing;
6061
import org.apache.kafka.clients.consumer.ConsumerConfig;
6162
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -87,6 +88,8 @@
8788
@Slf4j
8889
public class TransactionTest extends KopProtocolHandlerTestBase {
8990

91+
private static final int TRANSACTION_TIMEOUT_CONFIG_VALUE = 600 * 1000;
92+
9093
protected void setupTransactions() {
9194
this.conf.setDefaultNumberOfNamespaceBundles(4);
9295
this.conf.setOffsetsTopicNumPartitions(10);
@@ -1162,7 +1165,7 @@ private KafkaProducer<Integer, String> buildTransactionProducer(String transacti
11621165
producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, txTimeout);
11631166
} else {
11641167
// very long time-out
1165-
producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 600 * 1000);
1168+
producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, TRANSACTION_TIMEOUT_CONFIG_VALUE);
11661169
}
11671170
producerProps.put(CLIENT_ID_CONFIG, "dummy_client_" + UUID.randomUUID());
11681171
addCustomizeProps(producerProps);
@@ -1491,10 +1494,10 @@ public void testAbortedTxEventuallyPurged() throws Exception {
14911494
}
14921495
}
14931496

1494-
@Test(timeOut = 100000 * 30)
1495-
public void testListTransactions() throws Exception {
1497+
@Test(timeOut = 1000 * 30)
1498+
public void testListAndDescribeTransactions() throws Exception {
14961499

1497-
String topicName = "testListTransactions";
1500+
String topicName = "testListAndDescribeTransactions";
14981501
String transactionalId = "myProducer_" + UUID.randomUUID();
14991502

15001503
@Cleanup
@@ -1599,116 +1602,33 @@ private static void assertTransactionState(AdminClient kafkaAdmin, String transa
15991602
.findFirst()
16001603
.get();
16011604
assertEquals(transactionState, transactionListing.state());
1602-
}
1603-
1604-
@Test(timeOut = 100000 * 30)
1605-
public void testListTransactions() throws Exception {
1606-
1607-
String topicName = "testListTransactions";
1608-
String transactionalId = "myProducer_" + UUID.randomUUID();
1609-
1610-
@Cleanup
1611-
KafkaProducer<Integer, String> producer = buildTransactionProducer(transactionalId);
1612-
@Cleanup
1613-
AdminClient kafkaAdmin = AdminClient.create(newKafkaAdminClientProperties());
1614-
1615-
producer.initTransactions();
1616-
producer.beginTransaction();
1617-
assertTransactionState(kafkaAdmin, transactionalId,
1618-
org.apache.kafka.clients.admin.TransactionState.EMPTY);
1619-
producer.send(new ProducerRecord<>(topicName, 1, "bar")).get();
1620-
producer.flush();
1621-
1622-
ListTransactionsResult listTransactionsResult = kafkaAdmin.listTransactions();
1623-
listTransactionsResult.all().get().forEach(t -> {
1624-
log.info("Found transactionalId: {} {} {}",
1625-
t.transactionalId(),
1626-
t.producerId(),
1627-
t.state());
1628-
});
1629-
assertTransactionState(kafkaAdmin, transactionalId,
1630-
org.apache.kafka.clients.admin.TransactionState.ONGOING);
1631-
Awaitility.await().untilAsserted(() -> {
1632-
assertTransactionState(kafkaAdmin, transactionalId,
1633-
org.apache.kafka.clients.admin.TransactionState.ONGOING);
1634-
});
1635-
producer.commitTransaction();
1636-
Awaitility.await().untilAsserted(() -> {
1637-
assertTransactionState(kafkaAdmin, transactionalId,
1638-
org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT);
1639-
});
1640-
producer.beginTransaction();
1641-
1642-
assertTransactionState(kafkaAdmin, transactionalId,
1643-
org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT);
1644-
1645-
producer.send(new ProducerRecord<>(topicName, 1, "bar")).get();
1646-
producer.flush();
1647-
producer.abortTransaction();
1648-
Awaitility.await().untilAsserted(() -> {
1649-
assertTransactionState(kafkaAdmin, transactionalId,
1650-
org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT);
1651-
});
1652-
producer.close();
1653-
assertTransactionState(kafkaAdmin, transactionalId,
1654-
org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT);
1655-
}
16561605

1657-
private static void assertTransactionState(AdminClient kafkaAdmin, String transactionalId,
1658-
org.apache.kafka.clients.admin.TransactionState transactionState)
1659-
throws Exception {
1660-
ListTransactionsResult listTransactionsResult = kafkaAdmin.listTransactions();
1661-
Collection<TransactionListing> transactionListings = listTransactionsResult.all().get();
1662-
transactionListings.forEach(t -> {
1663-
log.info("Found transactionalId: {} {} {}",
1664-
t.transactionalId(),
1665-
t.producerId(),
1666-
t.state());
1667-
});
1668-
TransactionListing transactionListing = transactionListings
1669-
.stream()
1670-
.filter(t -> t.transactionalId().equals(transactionalId))
1671-
.findFirst()
1672-
.get();
1673-
assertEquals(transactionState, transactionListing.state());
1674-
1675-
// filter for the same state
1676-
ListTransactionsOptions optionFilterState = new ListTransactionsOptions()
1677-
.filterStates(Collections.singleton(transactionState));
1678-
listTransactionsResult = kafkaAdmin.listTransactions(optionFilterState);
1679-
transactionListings = listTransactionsResult.all().get();
1680-
transactionListing = transactionListings
1681-
.stream()
1682-
.filter(t -> t.transactionalId().equals(transactionalId))
1683-
.findFirst()
1684-
.get();
1685-
assertEquals(transactionState, transactionListing.state());
1686-
1687-
1688-
// filter for the same producer id
1689-
ListTransactionsOptions optionFilterProducer = new ListTransactionsOptions()
1690-
.filterProducerIds(Collections.singleton(transactionListing.producerId()));
1691-
listTransactionsResult = kafkaAdmin.listTransactions(optionFilterProducer);
1692-
transactionListings = listTransactionsResult.all().get();
1693-
transactionListing = transactionListings
1694-
.stream()
1695-
.filter(t -> t.transactionalId().equals(transactionalId))
1696-
.findFirst()
1697-
.get();
1698-
assertEquals(transactionState, transactionListing.state());
1606+
Map<String, TransactionDescription> map =
1607+
kafkaAdmin.describeTransactions(Collections.singleton(transactionalId))
1608+
.all().get();
1609+
assertEquals(1, map.size());
1610+
TransactionDescription transactionDescription = map.get(transactionalId);
1611+
log.info("transactionDescription {}", transactionDescription);
1612+
assertNotNull(transactionDescription);
1613+
assertEquals(transactionDescription.state(), transactionState);
1614+
assertTrue(transactionDescription.producerEpoch() >= 0);
1615+
assertEquals(TRANSACTION_TIMEOUT_CONFIG_VALUE, transactionDescription.transactionTimeoutMs());
1616+
assertTrue(transactionDescription.transactionStartTimeMs().isPresent());
1617+
assertTrue(transactionDescription.coordinatorId() >= 0);
1618+
1619+
switch (transactionState) {
1620+
case EMPTY:
1621+
case COMPLETE_COMMIT:
1622+
case COMPLETE_ABORT:
1623+
assertEquals(0, transactionDescription.topicPartitions().size());
1624+
break;
1625+
case ONGOING:
1626+
assertEquals(1, transactionDescription.topicPartitions().size());
1627+
break;
1628+
default:
1629+
fail("unhandled " + transactionState);
1630+
}
16991631

1700-
// filter for the same producer id and state
1701-
ListTransactionsOptions optionFilterProducerAndState = new ListTransactionsOptions()
1702-
.filterStates(Collections.singleton(transactionState))
1703-
.filterProducerIds(Collections.singleton(transactionListing.producerId()));
1704-
listTransactionsResult = kafkaAdmin.listTransactions(optionFilterProducerAndState);
1705-
transactionListings = listTransactionsResult.all().get();
1706-
transactionListing = transactionListings
1707-
.stream()
1708-
.filter(t -> t.transactionalId().equals(transactionalId))
1709-
.findFirst()
1710-
.get();
1711-
assertEquals(transactionState, transactionListing.state());
17121632
}
17131633

17141634
/**

0 commit comments

Comments
 (0)