Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

Commit 9d440e6

Browse files
eolivelligaoran10
authored andcommitted
g[transactions] Implement KIP-664 listTransactions (#76)
(cherry picked from commit 5ef4a85)
1 parent 30f78c2 commit 9d440e6

File tree

1 file changed

+110
-0
lines changed

1 file changed

+110
-0
lines changed

tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionTest.java

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1601,6 +1601,116 @@ private static void assertTransactionState(AdminClient kafkaAdmin, String transa
16011601
assertEquals(transactionState, transactionListing.state());
16021602
}
16031603

1604+
@Test(timeOut = 100000 * 30)
1605+
public void testListTransactions() throws Exception {
1606+
1607+
String topicName = "testListTransactions";
1608+
String transactionalId = "myProducer_" + UUID.randomUUID();
1609+
1610+
@Cleanup
1611+
KafkaProducer<Integer, String> producer = buildTransactionProducer(transactionalId);
1612+
@Cleanup
1613+
AdminClient kafkaAdmin = AdminClient.create(newKafkaAdminClientProperties());
1614+
1615+
producer.initTransactions();
1616+
producer.beginTransaction();
1617+
assertTransactionState(kafkaAdmin, transactionalId,
1618+
org.apache.kafka.clients.admin.TransactionState.EMPTY);
1619+
producer.send(new ProducerRecord<>(topicName, 1, "bar")).get();
1620+
producer.flush();
1621+
1622+
ListTransactionsResult listTransactionsResult = kafkaAdmin.listTransactions();
1623+
listTransactionsResult.all().get().forEach(t -> {
1624+
log.info("Found transactionalId: {} {} {}",
1625+
t.transactionalId(),
1626+
t.producerId(),
1627+
t.state());
1628+
});
1629+
assertTransactionState(kafkaAdmin, transactionalId,
1630+
org.apache.kafka.clients.admin.TransactionState.ONGOING);
1631+
Awaitility.await().untilAsserted(() -> {
1632+
assertTransactionState(kafkaAdmin, transactionalId,
1633+
org.apache.kafka.clients.admin.TransactionState.ONGOING);
1634+
});
1635+
producer.commitTransaction();
1636+
Awaitility.await().untilAsserted(() -> {
1637+
assertTransactionState(kafkaAdmin, transactionalId,
1638+
org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT);
1639+
});
1640+
producer.beginTransaction();
1641+
1642+
assertTransactionState(kafkaAdmin, transactionalId,
1643+
org.apache.kafka.clients.admin.TransactionState.COMPLETE_COMMIT);
1644+
1645+
producer.send(new ProducerRecord<>(topicName, 1, "bar")).get();
1646+
producer.flush();
1647+
producer.abortTransaction();
1648+
Awaitility.await().untilAsserted(() -> {
1649+
assertTransactionState(kafkaAdmin, transactionalId,
1650+
org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT);
1651+
});
1652+
producer.close();
1653+
assertTransactionState(kafkaAdmin, transactionalId,
1654+
org.apache.kafka.clients.admin.TransactionState.COMPLETE_ABORT);
1655+
}
1656+
1657+
private static void assertTransactionState(AdminClient kafkaAdmin, String transactionalId,
1658+
org.apache.kafka.clients.admin.TransactionState transactionState)
1659+
throws Exception {
1660+
ListTransactionsResult listTransactionsResult = kafkaAdmin.listTransactions();
1661+
Collection<TransactionListing> transactionListings = listTransactionsResult.all().get();
1662+
transactionListings.forEach(t -> {
1663+
log.info("Found transactionalId: {} {} {}",
1664+
t.transactionalId(),
1665+
t.producerId(),
1666+
t.state());
1667+
});
1668+
TransactionListing transactionListing = transactionListings
1669+
.stream()
1670+
.filter(t -> t.transactionalId().equals(transactionalId))
1671+
.findFirst()
1672+
.get();
1673+
assertEquals(transactionState, transactionListing.state());
1674+
1675+
// filter for the same state
1676+
ListTransactionsOptions optionFilterState = new ListTransactionsOptions()
1677+
.filterStates(Collections.singleton(transactionState));
1678+
listTransactionsResult = kafkaAdmin.listTransactions(optionFilterState);
1679+
transactionListings = listTransactionsResult.all().get();
1680+
transactionListing = transactionListings
1681+
.stream()
1682+
.filter(t -> t.transactionalId().equals(transactionalId))
1683+
.findFirst()
1684+
.get();
1685+
assertEquals(transactionState, transactionListing.state());
1686+
1687+
1688+
// filter for the same producer id
1689+
ListTransactionsOptions optionFilterProducer = new ListTransactionsOptions()
1690+
.filterProducerIds(Collections.singleton(transactionListing.producerId()));
1691+
listTransactionsResult = kafkaAdmin.listTransactions(optionFilterProducer);
1692+
transactionListings = listTransactionsResult.all().get();
1693+
transactionListing = transactionListings
1694+
.stream()
1695+
.filter(t -> t.transactionalId().equals(transactionalId))
1696+
.findFirst()
1697+
.get();
1698+
assertEquals(transactionState, transactionListing.state());
1699+
1700+
// filter for the same producer id and state
1701+
ListTransactionsOptions optionFilterProducerAndState = new ListTransactionsOptions()
1702+
.filterStates(Collections.singleton(transactionState))
1703+
.filterProducerIds(Collections.singleton(transactionListing.producerId()));
1704+
listTransactionsResult = kafkaAdmin.listTransactions(optionFilterProducerAndState);
1705+
transactionListings = listTransactionsResult.all().get();
1706+
transactionListing = transactionListings
1707+
.stream()
1708+
.filter(t -> t.transactionalId().equals(transactionalId))
1709+
.findFirst()
1710+
.get();
1711+
assertEquals(transactionState, transactionListing.state());
1712+
}
1713+
16041714
/**
16051715
* Get the Kafka server address.
16061716
*/

0 commit comments

Comments
 (0)