Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.assertNull;
import static org.testng.AssertJUnit.assertTrue;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -43,10 +42,6 @@
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.Logger;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand All @@ -57,6 +52,7 @@
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.utils.TestLogAppender;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
Expand Down Expand Up @@ -480,18 +476,12 @@ public void testCreateNamespaceEventsSystemTopicFactoryException() throws Except
@Test
public void testPrepareInitPoliciesCacheAsyncThrowExceptionAfterCreateReader() throws Exception {
// catch the log output in SystemTopicBasedTopicPoliciesService
Logger logger = (Logger) LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class);
List<String> logMessages = new ArrayList<>();
AbstractAppender appender = new AbstractAppender("TestAppender", null, null) {
@Override
public void append(LogEvent event) {
logMessages.add(event.getMessage().getFormattedMessage());
}
};
appender.start();
logger.addAppender(appender);
@Cleanup
TestLogAppender testLogAppender =
TestLogAppender.create(Optional.of("SystemTopicBasedTopicPoliciesService"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this would attach to the root logger since SystemTopicBasedTopicPoliciesService wouldn't be found. The logger name is the full class name.
Please improve TestLogAppender and add a .create(Class<?> loggerNameClass) method to simplify the creation. It could delegate to create(Optional.of(loggerNameClass.getName())).

Copy link
Contributor Author

@TakaHiR07 TakaHiR07 Dec 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace to TestLogAppender testLogAppender = TestLogAppender.create(Optional.of(SystemTopicBasedTopicPoliciesService.class.getName())); ?

Copy link
Member

@lhotari lhotari Dec 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would work, but please modify TestLogAppenderand add a new method there (.create(Class<?> loggerNameClass)). That way it will be cleaner. TestLogAppender can be improved in PRs when there are new use cases for improvements.


// create namespace-5 and topic
pulsar.getTopicPoliciesService().close();
SystemTopicBasedTopicPoliciesService spyService =
Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
FieldUtils.writeField(pulsar, "topicPoliciesService", spyService, true);
Expand Down Expand Up @@ -527,8 +517,8 @@ public void append(LogEvent event) {
reader.close();
log.info("successfully close spy reader");
Awaitility.await().untilAsserted(() -> {
boolean logFound = logMessages.stream()
.anyMatch(msg -> msg.contains("Closing the topic policies reader for"));
boolean logFound = testLogAppender.getEvents().stream().anyMatch(logEvent ->
logEvent.getMessage().toString().contains("Closing the topic policies reader for"));
assertTrue(logFound);
});

Expand Down Expand Up @@ -565,39 +555,29 @@ public void append(LogEvent event) {
// make sure not do cleanPoliciesCacheInitMap() twice
// totally trigger prepareInitPoliciesCacheAsync() twice, so the time of cleanPoliciesCacheInitMap() is 2.
// in previous code, the time would be 3
boolean logFound = logMessages.stream()
.anyMatch(msg -> msg.contains("Failed to create reader on __change_events topic"));
boolean logFound = testLogAppender.getEvents().stream().anyMatch(logEvent ->
logEvent.getMessage().toString().contains("Failed to create reader on __change_events topic"));
assertFalse(logFound);
boolean logFound2 = logMessages.stream()
.anyMatch(msg -> msg.contains("Failed to check the move events for the system topic"));
boolean logFound2 = testLogAppender.getEvents().stream().anyMatch(logEvent ->
logEvent.getMessage().toString().contains("Failed to check the move events for the system topic"));
assertTrue(logFound2);
verify(spyService, times(2)).cleanPoliciesCacheInitMap(any(), anyBoolean());

// make sure not occur Recursive update
boolean logFound3 = logMessages.stream()
.anyMatch(msg -> msg.contains("Recursive update"));
boolean logFound3 = testLogAppender.getEvents().stream().anyMatch(logEvent ->
logEvent.getMessage().toString().contains("Recursive update"));
assertFalse(logFound3);

// clean log appender
appender.stop();
logger.removeAppender(appender);
}

@Test
public void testPrepareInitPoliciesCacheAsyncThrowExceptionInCreateReader() throws Exception {
// catch the log output in SystemTopicBasedTopicPoliciesService
Logger logger = (Logger) LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class);
List<String> logMessages = new ArrayList<>();
AbstractAppender appender = new AbstractAppender("TestAppender", null, null) {
@Override
public void append(LogEvent event) {
logMessages.add(event.getMessage().getFormattedMessage());
}
};
appender.start();
logger.addAppender(appender);
@Cleanup
TestLogAppender testLogAppender =
TestLogAppender.create(Optional.of("SystemTopicBasedTopicPoliciesService"));

// create namespace-5 and topic
pulsar.getTopicPoliciesService().close();
SystemTopicBasedTopicPoliciesService spyService =
Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
FieldUtils.writeField(pulsar, "topicPoliciesService", spyService, true);
Expand Down Expand Up @@ -644,17 +624,13 @@ public void append(LogEvent event) {

// make sure not do cleanPoliciesCacheInitMap() twice
// totally trigger prepareInitPoliciesCacheAsync() once, so the time of cleanPoliciesCacheInitMap() is 1.
boolean logFound = logMessages.stream()
.anyMatch(msg -> msg.contains("Failed to create reader on __change_events topic"));
boolean logFound = testLogAppender.getEvents().stream().anyMatch(logEvent ->
logEvent.getMessage().toString().contains("Failed to create reader on __change_events topic"));
assertTrue(logFound);
boolean logFound2 = logMessages.stream()
.anyMatch(msg -> msg.contains("Failed to check the move events for the system topic")
|| msg.contains("Failed to read event from the system topic"));
boolean logFound2 = testLogAppender.getEvents().stream().anyMatch(logEvent ->
logEvent.getMessage().toString().contains("Failed to check the move events for the system topic")
|| logEvent.getMessage().toString().contains("Failed to read event from the system topic"));
assertFalse(logFound2);
verify(spyService, times(1)).cleanPoliciesCacheInitMap(any(), anyBoolean());

// clean log appender
appender.stop();
logger.removeAppender(appender);
}
}
Loading