diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java index 2f503e5512a22..5ef793f2156c6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java @@ -28,7 +28,6 @@ import static org.testng.AssertJUnit.assertNotNull; import static org.testng.AssertJUnit.assertNull; import static org.testng.AssertJUnit.assertTrue; -import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -43,10 +42,6 @@ import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.reflect.FieldUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.core.LogEvent; -import org.apache.logging.log4j.core.Logger; -import org.apache.logging.log4j.core.appender.AbstractAppender; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -57,6 +52,7 @@ import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicPolicies; +import org.apache.pulsar.utils.TestLogAppender; import org.assertj.core.api.Assertions; import org.awaitility.Awaitility; import org.mockito.Mockito; @@ -480,18 +476,12 @@ public void testCreateNamespaceEventsSystemTopicFactoryException() throws Except @Test public void testPrepareInitPoliciesCacheAsyncThrowExceptionAfterCreateReader() throws Exception { // catch the log output in SystemTopicBasedTopicPoliciesService - Logger logger = (Logger) LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class); - List logMessages = new ArrayList<>(); - AbstractAppender appender = new AbstractAppender("TestAppender", null, null) { - @Override - public void append(LogEvent event) { - logMessages.add(event.getMessage().getFormattedMessage()); - } - }; - appender.start(); - logger.addAppender(appender); + @Cleanup + TestLogAppender testLogAppender = + TestLogAppender.create(Optional.of("SystemTopicBasedTopicPoliciesService")); // create namespace-5 and topic + pulsar.getTopicPoliciesService().close(); SystemTopicBasedTopicPoliciesService spyService = Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar)); FieldUtils.writeField(pulsar, "topicPoliciesService", spyService, true); @@ -527,8 +517,8 @@ public void append(LogEvent event) { reader.close(); log.info("successfully close spy reader"); Awaitility.await().untilAsserted(() -> { - boolean logFound = logMessages.stream() - .anyMatch(msg -> msg.contains("Closing the topic policies reader for")); + boolean logFound = testLogAppender.getEvents().stream().anyMatch(logEvent -> + logEvent.getMessage().toString().contains("Closing the topic policies reader for")); assertTrue(logFound); }); @@ -565,39 +555,29 @@ public void append(LogEvent event) { // make sure not do cleanPoliciesCacheInitMap() twice // totally trigger prepareInitPoliciesCacheAsync() twice, so the time of cleanPoliciesCacheInitMap() is 2. // in previous code, the time would be 3 - boolean logFound = logMessages.stream() - .anyMatch(msg -> msg.contains("Failed to create reader on __change_events topic")); + boolean logFound = testLogAppender.getEvents().stream().anyMatch(logEvent -> + logEvent.getMessage().toString().contains("Failed to create reader on __change_events topic")); assertFalse(logFound); - boolean logFound2 = logMessages.stream() - .anyMatch(msg -> msg.contains("Failed to check the move events for the system topic")); + boolean logFound2 = testLogAppender.getEvents().stream().anyMatch(logEvent -> + logEvent.getMessage().toString().contains("Failed to check the move events for the system topic")); assertTrue(logFound2); verify(spyService, times(2)).cleanPoliciesCacheInitMap(any(), anyBoolean()); // make sure not occur Recursive update - boolean logFound3 = logMessages.stream() - .anyMatch(msg -> msg.contains("Recursive update")); + boolean logFound3 = testLogAppender.getEvents().stream().anyMatch(logEvent -> + logEvent.getMessage().toString().contains("Recursive update")); assertFalse(logFound3); - - // clean log appender - appender.stop(); - logger.removeAppender(appender); } @Test public void testPrepareInitPoliciesCacheAsyncThrowExceptionInCreateReader() throws Exception { // catch the log output in SystemTopicBasedTopicPoliciesService - Logger logger = (Logger) LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class); - List logMessages = new ArrayList<>(); - AbstractAppender appender = new AbstractAppender("TestAppender", null, null) { - @Override - public void append(LogEvent event) { - logMessages.add(event.getMessage().getFormattedMessage()); - } - }; - appender.start(); - logger.addAppender(appender); + @Cleanup + TestLogAppender testLogAppender = + TestLogAppender.create(Optional.of("SystemTopicBasedTopicPoliciesService")); // create namespace-5 and topic + pulsar.getTopicPoliciesService().close(); SystemTopicBasedTopicPoliciesService spyService = Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar)); FieldUtils.writeField(pulsar, "topicPoliciesService", spyService, true); @@ -644,17 +624,13 @@ public void append(LogEvent event) { // make sure not do cleanPoliciesCacheInitMap() twice // totally trigger prepareInitPoliciesCacheAsync() once, so the time of cleanPoliciesCacheInitMap() is 1. - boolean logFound = logMessages.stream() - .anyMatch(msg -> msg.contains("Failed to create reader on __change_events topic")); + boolean logFound = testLogAppender.getEvents().stream().anyMatch(logEvent -> + logEvent.getMessage().toString().contains("Failed to create reader on __change_events topic")); assertTrue(logFound); - boolean logFound2 = logMessages.stream() - .anyMatch(msg -> msg.contains("Failed to check the move events for the system topic") - || msg.contains("Failed to read event from the system topic")); + boolean logFound2 = testLogAppender.getEvents().stream().anyMatch(logEvent -> + logEvent.getMessage().toString().contains("Failed to check the move events for the system topic") + || logEvent.getMessage().toString().contains("Failed to read event from the system topic")); assertFalse(logFound2); verify(spyService, times(1)).cleanPoliciesCacheInitMap(any(), anyBoolean()); - - // clean log appender - appender.stop(); - logger.removeAppender(appender); } }