From 5546e7e4851ce57bc55b56d8b0161de25d782d14 Mon Sep 17 00:00:00 2001 From: namest504 Date: Thu, 2 Oct 2025 10:16:00 +0900 Subject: [PATCH 1/7] [improve][api] Add getRawMessage() method to TableView for accessing raw Pulsar message --- .../pulsar/client/impl/TableViewTest.java | 34 +++++++++++++++++++ .../apache/pulsar/client/api/TableView.java | 13 +++++++ .../pulsar/client/impl/TableViewImpl.java | 9 +++++ 3 files changed, 56 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java index 58622d7db19a2..857a9718e48a6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java @@ -624,4 +624,38 @@ public void testBuildTableViewWithMessagesAlwaysAvailable() throws Exception { future.get(3, TimeUnit.SECONDS); assertTrue(index.get() <= 0); } + + @Test + public void testGetRawMessage() throws Exception { + String topic = "persistent://public/default/testGetRawMessage"; + admin.topics().createNonPartitionedTopic(topic); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); + + String testKey = "key1"; + String testValue = "value1"; + producer.newMessage() + .key(testKey) + .value(testValue) + .property("myProp", "myValue") + .send(); + + @Cleanup + TableView tableView = pulsarClient.newTableView(Schema.STRING) + .topic(topic) + .create(); + + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> tableView.size() == 1); + + Message rawMessage = tableView.getRawMessage(testKey); + assertTrue(rawMessage != null, "Raw message should not be null for key: " + testKey); + assertEquals(rawMessage.getKey(), testKey); + assertEquals(new String(rawMessage.getData()), testValue); + assertEquals(rawMessage.getProperty("myProp"), "myValue"); + + Message missingMessage = tableView.getRawMessage("missingKey"); + assertTrue(missingMessage == null, "Raw message should be null for missing key"); + } + } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java index 767b8e1103fa6..6685f67e54f30 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java @@ -59,6 +59,19 @@ public interface TableView extends Closeable { */ T get(String key); + /** + * Returns the raw Pulsar {@link Message} object associated with the specified key. + * + *

This method allows access to the original Pulsar message containing raw payload bytes, + * message properties, message ID, and other metadata. It is useful for scenarios where + * clients require access to the entire message beyond the deserialized value {@code T} + * provided by {@link #get(String)}. + * + * @param key the key whose associated raw message is to be returned + * @return the raw {@link Message} object associated with the key, or {@code null} if no mapping exists + */ + Message getRawMessage(String key); + /** * Returns a Set view of the mappings contained in this map. * diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java index 17b49828eeced..d3ad9a9835bd0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java @@ -52,6 +52,7 @@ public class TableViewImpl implements TableView { private final TableViewConfigurationData conf; private final ConcurrentMap data; + private final ConcurrentMap> rawMessages; private final Map immutableData; private final CompletableFuture> reader; @@ -85,6 +86,7 @@ public class TableViewImpl implements TableView { this.conf = conf; this.isPersistentTopic = conf.getTopicName().startsWith(TopicDomain.persistent.toString()); this.data = new ConcurrentHashMap<>(); + this.rawMessages = new ConcurrentHashMap<>(); this.immutableData = Collections.unmodifiableMap(data); this.listeners = new ArrayList<>(); this.listenersMutex = new ReentrantLock(); @@ -144,6 +146,11 @@ public T get(String key) { return data.get(key); } + @Override + public Message getRawMessage(String key) { + return rawMessages.get(key); + } + @Override public Set> entrySet() { return immutableData.entrySet(); @@ -235,8 +242,10 @@ private void handleMessage(Message msg) { listenersMutex.lock(); if (null == cur) { data.remove(key); + rawMessages.remove(key); } else { data.put(key, cur); + rawMessages.put(key, msg); } for (BiConsumer listener : listeners) { From 520456790692a446810345ab702ee8fe3a070774 Mon Sep 17 00:00:00 2001 From: namest504 Date: Thu, 2 Oct 2025 15:27:44 +0900 Subject: [PATCH 2/7] [improve][impl] Fix TableViewImpl.handleMessage message lifecycle management by adding retain() before storing message to rawMessages - Prevent premature message release that causes getRawMessage to return null keys. - Ensure message reference count is balanced by retaining message before storing and releasing old message after replacement. - Keep message release in finally block for safety. --- .../pulsar/client/impl/TableViewImpl.java | 128 +++++++++--------- 1 file changed, 63 insertions(+), 65 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java index d3ad9a9835bd0..00abc58cff55a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java @@ -18,13 +18,12 @@ */ package org.apache.pulsar.client.impl; -import static org.apache.pulsar.common.topics.TopicCompactionStrategy.TABLE_VIEW_TAG; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.*; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.topics.TopicCompactionStrategy; + +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -32,19 +31,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; -import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.client.api.CryptoKeyReader; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.MessageIdAdv; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Reader; -import org.apache.pulsar.client.api.ReaderBuilder; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.TableView; -import org.apache.pulsar.client.api.TopicMessageId; -import org.apache.pulsar.common.naming.TopicDomain; -import org.apache.pulsar.common.topics.TopicCompactionStrategy; + +import static org.apache.pulsar.common.topics.TopicCompactionStrategy.TABLE_VIEW_TAG; @Slf4j public class TableViewImpl implements TableView { @@ -212,58 +200,68 @@ public void close() throws PulsarClientException { private void handleMessage(Message msg) { lastReadPositions.put(msg.getTopicName(), msg.getMessageId()); - try { - if (msg.hasKey()) { - String key = msg.getKey(); - T cur = msg.size() > 0 ? msg.getValue() : null; - if (log.isDebugEnabled()) { - log.debug("Applying message from topic {}. key={} value={}", - conf.getTopicName(), - key, - cur); - } - boolean update = true; - if (compactionStrategy != null) { - T prev = data.get(key); - update = !compactionStrategy.shouldKeepLeft(prev, cur); - if (!update) { - log.info("Skipped the message from topic {}. key={} value={} prev={}", - conf.getTopicName(), - key, - cur, - prev); - compactionStrategy.handleSkippedMessage(key, cur); - } - } + if (!msg.hasKey()) { + log.warn("Received message with no key, releasing."); + msg.release(); + checkAllFreshTask(msg); + return; + } - if (update) { - try { - listenersMutex.lock(); - if (null == cur) { - data.remove(key); - rawMessages.remove(key); - } else { - data.put(key, cur); - rawMessages.put(key, msg); - } + String key = msg.getKey(); + T cur = msg.size() > 0 ? msg.getValue() : null; + if (log.isDebugEnabled()) { + log.debug("Applying message from topic {}. key={} value={}", + conf.getTopicName(), + key, + cur); + } - for (BiConsumer listener : listeners) { - try { - listener.accept(key, cur); - } catch (Throwable t) { - log.error("Table view listener raised an exception", t); - } - } - } finally { - listenersMutex.unlock(); - } + boolean update = true; + if (compactionStrategy != null) { + T prev = data.get(key); + update = !compactionStrategy.shouldKeepLeft(prev, cur); + if (!update) { + log.info("Skipped the message from topic {}. key={} value={} prev={}", + conf.getTopicName(), + key, + cur, + prev); + compactionStrategy.handleSkippedMessage(key, cur); + msg.release(); + checkAllFreshTask(msg); + return; + } + } + + try { + listenersMutex.lock(); + Message oldRawMessage; + if (null == cur) { + data.remove(key); + oldRawMessage = rawMessages.remove(key); + msg.release(); + } else { + data.put(key, cur); + oldRawMessage = rawMessages.put(key, msg); + } + + if (oldRawMessage != null) { + oldRawMessage.release(); + } + + for (BiConsumer listener : listeners) { + try { + listener.accept(key, cur); + } catch (Throwable t) { + log.error("Table view listener raised an exception", t); } } - checkAllFreshTask(msg); } finally { - msg.release(); + listenersMutex.unlock(); } + + checkAllFreshTask(msg); } @Override From 8eda4b226c5ccecc5ced0d068bcc9cbc921ff2d3 Mon Sep 17 00:00:00 2001 From: namest504 Date: Wed, 15 Oct 2025 00:38:07 +0900 Subject: [PATCH 3/7] Refactor TableView to use builder for Message access --- .../pulsar/client/impl/TableViewTest.java | 22 +- .../apache/pulsar/client/api/TableView.java | 12 - .../pulsar/client/api/TableViewBuilder.java | 21 + .../client/impl/MessageTableViewImpl.java | 405 ++++++++++++++++++ .../client/impl/TableViewBuilderImpl.java | 21 +- .../pulsar/client/impl/TableViewImpl.java | 93 ++-- 6 files changed, 491 insertions(+), 83 deletions(-) create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageTableViewImpl.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java index 857a9718e48a6..f6c9507ea798a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java @@ -626,8 +626,8 @@ public void testBuildTableViewWithMessagesAlwaysAvailable() throws Exception { } @Test - public void testGetRawMessage() throws Exception { - String topic = "persistent://public/default/testGetRawMessage"; + public void testCreateForMessages() throws Exception { + String topic = "persistent://public/default/testCreateForMessages"; admin.topics().createNonPartitionedTopic(topic); @Cleanup @@ -642,20 +642,20 @@ public void testGetRawMessage() throws Exception { .send(); @Cleanup - TableView tableView = pulsarClient.newTableView(Schema.STRING) + TableView> tableView = pulsarClient.newTableViewBuilder(Schema.STRING) .topic(topic) - .create(); + .createForMessages(); Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> tableView.size() == 1); - Message rawMessage = tableView.getRawMessage(testKey); - assertTrue(rawMessage != null, "Raw message should not be null for key: " + testKey); - assertEquals(rawMessage.getKey(), testKey); - assertEquals(new String(rawMessage.getData()), testValue); - assertEquals(rawMessage.getProperty("myProp"), "myValue"); + Message message = tableView.get(testKey); + Assert.assertNotNull(message, "Message should not be null for key: " + testKey); + assertEquals(message.getKey(), testKey); + assertEquals(message.getValue(), testValue); + assertEquals(message.getProperty("myProp"), "myValue"); - Message missingMessage = tableView.getRawMessage("missingKey"); - assertTrue(missingMessage == null, "Raw message should be null for missing key"); + Message missingMessage = tableView.get("missingKey"); + Assert.assertNull(missingMessage, "Message should be null for missing key"); } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java index 6685f67e54f30..1ad8995f6af9f 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java @@ -59,18 +59,6 @@ public interface TableView extends Closeable { */ T get(String key); - /** - * Returns the raw Pulsar {@link Message} object associated with the specified key. - * - *

This method allows access to the original Pulsar message containing raw payload bytes, - * message properties, message ID, and other metadata. It is useful for scenarios where - * clients require access to the entire message beyond the deserialized value {@code T} - * provided by {@link #get(String)}. - * - * @param key the key whose associated raw message is to be returned - * @return the raw {@link Message} object associated with the key, or {@code null} if no mapping exists - */ - Message getRawMessage(String key); /** * Returns a Set view of the mappings contained in this map. diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java index 76b8ff4fbdac3..18252f4179b2d 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java @@ -76,6 +76,27 @@ public interface TableViewBuilder { */ CompletableFuture> createAsync(); + /** + * Finalize the creation of the {@link TableView} instance that contains the full {@link Message}. + * + *

This method will block until the tableView is created successfully or an exception is thrown. + * + * @return the {@link TableView} instance + * @throws PulsarClientException + * if the tableView creation fails + */ + TableView> createForMessages() throws PulsarClientException; + + /** + * Finalize the creation of the {@link TableView} instance that contains the full {@link Message} + * in asynchronous mode. + * + *

This method will return a {@link CompletableFuture} that can be used to access the instance when it's ready. + * + * @return the {@link TableView} instance + */ + CompletableFuture>> createForMessagesAsync(); + /** * Set the topic name of the {@link TableView}. * diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageTableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageTableViewImpl.java new file mode 100644 index 0000000000000..5d6c7c1c14f1d --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageTableViewImpl.java @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BiConsumer; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.CryptoKeyReader; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.ReaderBuilder; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TableView; +import org.apache.pulsar.client.api.TopicMessageId; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.topics.TopicCompactionStrategy; + +import static org.apache.pulsar.common.topics.TopicCompactionStrategy.TABLE_VIEW_TAG; + +@Slf4j +class MessageTableViewImpl implements TableView> { + + private final TableViewConfigurationData conf; + + private final ConcurrentMap> data; + private final Map> immutableData; + + private final CompletableFuture> reader; + + private final List>> listeners; + private final ReentrantLock listenersMutex; + private final boolean isPersistentTopic; + private TopicCompactionStrategy> compactionStrategy; + + private final ConcurrentHashMap, Map> pendingRefreshRequests; + + private final ConcurrentHashMap lastReadPositions; + + MessageTableViewImpl(PulsarClientImpl client, Schema schema, TableViewConfigurationData conf) { + this.conf = conf; + this.isPersistentTopic = conf.getTopicName().startsWith(TopicDomain.persistent.toString()); + this.data = new ConcurrentHashMap<>(); + this.immutableData = Collections.unmodifiableMap(data); + this.listeners = new ArrayList<>(); + this.listenersMutex = new ReentrantLock(); + this.compactionStrategy = + TopicCompactionStrategy.load(TABLE_VIEW_TAG, conf.getTopicCompactionStrategyClassName()); + this.pendingRefreshRequests = new ConcurrentHashMap<>(); + this.lastReadPositions = new ConcurrentHashMap<>(); + ReaderBuilder readerBuilder = client.newReader(schema) + .topic(conf.getTopicName()) + .startMessageId(MessageId.earliest) + .autoUpdatePartitions(true) + .autoUpdatePartitionsInterval((int) conf.getAutoUpdatePartitionsSeconds(), TimeUnit.SECONDS) + .poolMessages(true) + .subscriptionName(conf.getSubscriptionName()); + if (isPersistentTopic) { + readerBuilder.readCompacted(true); + } + + CryptoKeyReader cryptoKeyReader = conf.getCryptoKeyReader(); + if (cryptoKeyReader != null) { + readerBuilder.cryptoKeyReader(cryptoKeyReader); + } + + readerBuilder.cryptoFailureAction(conf.getCryptoFailureAction()); + + this.reader = readerBuilder.createAsync(); + } + + CompletableFuture>> start() { + return reader.thenCompose((reader) -> { + if (!isPersistentTopic) { + readTailMessages(reader); + return CompletableFuture.completedFuture(null); + } + return this.readAllExistingMessages(reader) + .thenRun(() -> readTailMessages(reader)); + }).thenApply(__ -> this); + } + + @Override + public int size() { + return data.size(); + } + + @Override + public boolean isEmpty() { + return data.isEmpty(); + } + + @Override + public boolean containsKey(String key) { + return data.containsKey(key); + } + + @Override + public Message get(String key) { + return data.get(key); + } + + + @Override + public Set>> entrySet() { + return immutableData.entrySet(); + } + + @Override + public Set keySet() { + return immutableData.keySet(); + } + + @Override + public Collection> values() { + return immutableData.values(); + } + + @Override + public void forEach(BiConsumer> action) { + data.forEach(action); + } + + @Override + public void listen(BiConsumer> action) { + try { + listenersMutex.lock(); + listeners.add(action); + } finally { + listenersMutex.unlock(); + } + } + + @Override + public void forEachAndListen(BiConsumer> action) { + try { + listenersMutex.lock(); + forEach(action); + listeners.add(action); + } finally { + listenersMutex.unlock(); + } + } + + @Override + public CompletableFuture closeAsync() { + return reader.thenCompose(Reader::closeAsync); + } + + @Override + public void close() throws PulsarClientException { + try { + closeAsync().get(); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); + } + } + + private void handleMessage(Message msg) { + lastReadPositions.put(msg.getTopicName(), msg.getMessageId()); + + if (!msg.hasKey()) { + log.warn("Received message with no key, releasing."); + msg.release(); + checkAllFreshTask(msg); + return; + } + + String key = msg.getKey(); + + Message cur = msg.size() > 0 ? msg : null; + + if (log.isDebugEnabled()) { + log.debug("Applying message from topic {}. key={} value={}", + conf.getTopicName(), + key, + cur); + } + + boolean update = true; + if (compactionStrategy != null) { + Message prev = data.get(key); + update = !compactionStrategy.shouldKeepLeft(prev, cur); + if (!update) { + log.info("Skipped the message from topic {}. key={} value={} prev={}", + conf.getTopicName(), + key, + cur, + prev); + compactionStrategy.handleSkippedMessage(key, cur); + msg.release(); + checkAllFreshTask(msg); + return; + } + } + + try { + listenersMutex.lock(); + if (null == cur) { + data.remove(key); + msg.release(); + } else { + data.put(key, cur); + } + + for (BiConsumer> listener : listeners) { + try { + listener.accept(key, cur); + } catch (Throwable t) { + log.error("Table view listener raised an exception", t); + } + } + } finally { + listenersMutex.unlock(); + } + + checkAllFreshTask(msg); + } + + @Override + public CompletableFuture refreshAsync() { + CompletableFuture completableFuture = new CompletableFuture<>(); + reader.thenCompose(reader -> getLastMessageIdOfNonEmptyTopics(reader).thenAccept(lastMessageIds -> { + if (lastMessageIds.isEmpty()) { + completableFuture.complete(null); + return; + } + pendingRefreshRequests.put(completableFuture, lastMessageIds); + filterReceivedMessages(lastMessageIds); + if (lastMessageIds.isEmpty()) { + pendingRefreshRequests.remove(completableFuture); + completableFuture.complete(null); + } + })).exceptionally(throwable -> { + completableFuture.completeExceptionally(throwable); + pendingRefreshRequests.remove(completableFuture); + return null; + }); + return completableFuture; + } + + @Override + public void refresh() throws PulsarClientException { + try { + refreshAsync().get(); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); + } + } + + private CompletableFuture readAllExistingMessages(Reader reader) { + long startTime = System.nanoTime(); + AtomicLong messagesRead = new AtomicLong(); + + CompletableFuture future = new CompletableFuture<>(); + getLastMessageIdOfNonEmptyTopics(reader).thenAccept(lastMessageIds -> { + if (lastMessageIds.isEmpty()) { + future.complete(null); + return; + } + readAllExistingMessages(reader, future, startTime, messagesRead, lastMessageIds); + }).exceptionally(ex -> { + future.completeExceptionally(ex); + return null; + }); + return future; + } + + private CompletableFuture> getLastMessageIdOfNonEmptyTopics(Reader reader) { + return reader.getLastMessageIdsAsync().thenApply(lastMessageIds -> { + Map lastMessageIdMap = new ConcurrentHashMap<>(); + lastMessageIds.forEach(topicMessageId -> { + if (((MessageIdAdv) topicMessageId).getEntryId() >= 0) { + lastMessageIdMap.put(topicMessageId.getOwnerTopic(), topicMessageId); + } + }); + return lastMessageIdMap; + }); + } + + private void filterReceivedMessages(Map lastMessageIds) { + lastMessageIds.forEach((partition, lastMessageId) -> { + MessageId messageId = lastReadPositions.get(partition); + if (messageId != null && lastMessageId.compareTo(messageId) <= 0) { + lastMessageIds.remove(partition); + } + }); + } + + private boolean checkFreshTask(Map maxMessageIds, CompletableFuture future, + MessageId messageId, String topicName) { + TopicMessageId maxMessageId = maxMessageIds.get(topicName); + if (maxMessageId != null && messageId.compareTo(maxMessageId) >= 0) { + maxMessageIds.remove(topicName); + } + if (maxMessageIds.isEmpty()) { + future.complete(null); + return true; + } else { + return false; + } + } + + private void checkAllFreshTask(Message msg) { + pendingRefreshRequests.forEach((future, maxMessageIds) -> { + String topicName = msg.getTopicName(); + MessageId messageId = msg.getMessageId(); + if (checkFreshTask(maxMessageIds, future, messageId, topicName)) { + pendingRefreshRequests.remove(future); + } + }); + } + + private void readAllExistingMessages(Reader reader, CompletableFuture future, long startTime, + AtomicLong messagesRead, Map maxMessageIds) { + reader.hasMessageAvailableAsync() + .thenAccept(hasMessage -> { + if (hasMessage) { + reader.readNextAsync() + .thenAccept(msg -> { + messagesRead.incrementAndGet(); + String topicName = msg.getTopicName(); + MessageId messageId = msg.getMessageId(); + handleMessage(msg); + if (!checkFreshTask(maxMessageIds, future, messageId, topicName)) { + readAllExistingMessages(reader, future, startTime, + messagesRead, maxMessageIds); + } + }).exceptionally(ex -> { + if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) { + log.info("Reader {} was closed while reading existing messages.", + reader.getTopic()); + } else { + log.warn("Reader {} was interrupted while reading existing messages. ", + reader.getTopic(), ex); + } + future.completeExceptionally(ex); + return null; + }); + } else { + long endTime = System.nanoTime(); + long durationMillis = TimeUnit.NANOSECONDS.toMillis(endTime - startTime); + log.info("Started table view for topic {} - Replayed {} messages in {} seconds", + reader.getTopic(), + messagesRead, + durationMillis / 1000.0); + future.complete(null); + } + }); + } + + private void readTailMessages(Reader reader) { + reader.readNextAsync() + .thenAccept(msg -> { + handleMessage(msg); + readTailMessages(reader); + }).exceptionally(ex -> { + if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) { + log.info("Reader {} was closed while reading tail messages.", reader.getTopic()); + pendingRefreshRequests.keySet().forEach(future -> { + pendingRefreshRequests.remove(future); + future.completeExceptionally(ex); + }); + } else { + try { + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + log.warn("Reader {} was interrupted while reading tail messages. " + + "Retrying..", reader.getTopic(), ex); + readTailMessages(reader); + } + return null; + }); + } +} \ No newline at end of file diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java index e0a47a70b1c8b..31967df1c3ffb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java @@ -24,12 +24,7 @@ import java.util.concurrent.TimeUnit; import lombok.NonNull; import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; -import org.apache.pulsar.client.api.CryptoKeyReader; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.TableView; -import org.apache.pulsar.client.api.TableViewBuilder; +import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; public class TableViewBuilderImpl implements TableViewBuilder { @@ -65,6 +60,20 @@ public CompletableFuture> createAsync() { return new TableViewImpl<>(client, schema, conf).start(); } + @Override + public TableView> createForMessages() throws PulsarClientException { + try { + return createForMessagesAsync().get(); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); + } + } + + @Override + public CompletableFuture>> createForMessagesAsync() { + return new MessageTableViewImpl<>(client, schema, conf).start(); + } + @Override public TableViewBuilder topic(String topic) { checkArgument(StringUtils.isNotBlank(topic), "topic cannot be blank"); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java index 00abc58cff55a..7a44d2b538e6c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java @@ -40,7 +40,7 @@ public class TableViewImpl implements TableView { private final TableViewConfigurationData conf; private final ConcurrentMap data; - private final ConcurrentMap> rawMessages; + private final Map immutableData; private final CompletableFuture> reader; @@ -74,7 +74,6 @@ public class TableViewImpl implements TableView { this.conf = conf; this.isPersistentTopic = conf.getTopicName().startsWith(TopicDomain.persistent.toString()); this.data = new ConcurrentHashMap<>(); - this.rawMessages = new ConcurrentHashMap<>(); this.immutableData = Collections.unmodifiableMap(data); this.listeners = new ArrayList<>(); this.listenersMutex = new ReentrantLock(); @@ -134,10 +133,6 @@ public T get(String key) { return data.get(key); } - @Override - public Message getRawMessage(String key) { - return rawMessages.get(key); - } @Override public Set> entrySet() { @@ -201,67 +196,57 @@ public void close() throws PulsarClientException { private void handleMessage(Message msg) { lastReadPositions.put(msg.getTopicName(), msg.getMessageId()); - if (!msg.hasKey()) { - log.warn("Received message with no key, releasing."); - msg.release(); - checkAllFreshTask(msg); - return; - } + try { + if (!msg.hasKey()) { + log.warn("Received message with no key."); + return; + } - String key = msg.getKey(); - T cur = msg.size() > 0 ? msg.getValue() : null; - if (log.isDebugEnabled()) { - log.debug("Applying message from topic {}. key={} value={}", - conf.getTopicName(), - key, - cur); - } + String key = msg.getKey(); + T cur = msg.size() > 0 ? msg.getValue() : null; - boolean update = true; - if (compactionStrategy != null) { - T prev = data.get(key); - update = !compactionStrategy.shouldKeepLeft(prev, cur); - if (!update) { - log.info("Skipped the message from topic {}. key={} value={} prev={}", + if (log.isDebugEnabled()) { + log.debug("Applying message from topic {}. key={} value={}", conf.getTopicName(), key, - cur, - prev); - compactionStrategy.handleSkippedMessage(key, cur); - msg.release(); - checkAllFreshTask(msg); - return; + cur); } - } - try { - listenersMutex.lock(); - Message oldRawMessage; - if (null == cur) { - data.remove(key); - oldRawMessage = rawMessages.remove(key); - msg.release(); - } else { - data.put(key, cur); - oldRawMessage = rawMessages.put(key, msg); + if (compactionStrategy != null) { + T prev = data.get(key); + if (!compactionStrategy.shouldKeepLeft(prev, cur)) { + log.info("Skipped the message from topic {}. key={} value={} prev={}", + conf.getTopicName(), + key, + cur, + prev); + compactionStrategy.handleSkippedMessage(key, cur); + return; + } } - if (oldRawMessage != null) { - oldRawMessage.release(); - } + try { + listenersMutex.lock(); + if (null == cur) { + data.remove(key); + } else { + data.put(key, cur); + } - for (BiConsumer listener : listeners) { - try { - listener.accept(key, cur); - } catch (Throwable t) { - log.error("Table view listener raised an exception", t); + for (BiConsumer listener : listeners) { + try { + listener.accept(key, cur); + } catch (Throwable t) { + log.error("Table view listener raised an exception", t); + } } + } finally { + listenersMutex.unlock(); } } finally { - listenersMutex.unlock(); + msg.release(); + checkAllFreshTask(msg); } - - checkAllFreshTask(msg); } @Override From d80eb49e83e3d099d4608257800afe9bddae1d83 Mon Sep 17 00:00:00 2001 From: namest504 Date: Mon, 20 Oct 2025 13:15:17 +0900 Subject: [PATCH 4/7] [improve][api] Add Builder Methods to Create Message-based TableView Signed-off-by: namest504 --- .../pulsar/client/api/TableViewBuilder.java | 41 ++ .../client/impl/AbstractTableViewImpl.java | 394 +++++++++++++++++ .../impl/MessageMapperTableViewImpl.java | 48 +++ .../client/impl/MessageTableViewImpl.java | 380 +---------------- .../client/impl/TableViewBuilderImpl.java | 17 +- .../pulsar/client/impl/TableViewImpl.java | 398 +----------------- 6 files changed, 512 insertions(+), 766 deletions(-) create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractTableViewImpl.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageMapperTableViewImpl.java diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java index 18252f4179b2d..64244a241d6cb 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java @@ -21,6 +21,8 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Function; + import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; @@ -97,6 +99,45 @@ public interface TableViewBuilder { */ CompletableFuture>> createForMessagesAsync(); + /** + * Creates a {@link TableView} instance where the values are the result of applying a user-defined + * `mapper` function to each message. + * + *

This provides a flexible way to create a key-value view over a topic, allowing users to extract data + * from the message payload, properties, and other metadata into a custom object {@code V}. + * + *

To get a view of the full {@link Message} objects, {@code java.util.function.Function.identity()} + * can be used as the mapper. + * + *

If the `mapper` function returns `null`, it is treated as a tombstone message, and the + * corresponding key will be removed from the `TableView`. + * + * @param mapper A function that takes a {@link Message} and returns a custom object of type {@code V}. + * @param The type of the value in the TableView. + * @return the {@link TableView} instance + * @throws PulsarClientException if the tableView creation fails + */ + TableView createMapped(Function, V> mapper) throws PulsarClientException; + + /** + * Creates a {@link TableView} instance in asynchronous mode where the values are the result of applying + * a user-defined `mapper` function to each message. + * + *

This provides a flexible way to create a key-value view over a topic, allowing users to extract data + * from the message payload, properties, and other metadata into a custom object {@code V}. + * + *

To get a view of the full {@link Message} objects, {@code java.util.function.Function.identity()} + * can be used as the mapper. + * + *

If the `mapper` function returns `null`, it is treated as a tombstone message, and the + * corresponding key will be removed from the `TableView`. + * + * @param mapper A function that takes a {@link Message} and returns a custom object of type {@code V}. + * @param The type of the value in the TableView. + * @return a future that can be used to access the {@link TableView} instance when it's ready + */ + CompletableFuture> createMappedAsync(Function, V> mapper); + /** * Set the topic name of the {@link TableView}. * diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractTableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractTableViewImpl.java new file mode 100644 index 0000000000000..011a42a7a2677 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractTableViewImpl.java @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; +import static org.apache.pulsar.common.topics.TopicCompactionStrategy.TABLE_VIEW_TAG; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BiConsumer; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.CryptoKeyReader; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.ReaderBuilder; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TableView; +import org.apache.pulsar.client.api.TopicMessageId; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.topics.TopicCompactionStrategy; + +@Slf4j +abstract class AbstractTableViewImpl implements TableView { + private final TableViewConfigurationData conf; + private final ConcurrentMap data; + private final Map immutableData; + private final CompletableFuture> reader; + private final List> listeners; + private final ReentrantLock listenersMutex; + private final boolean isPersistentTopic; + private TopicCompactionStrategy compactionStrategy; + /** + * Store the refresh tasks. When read to the position recording in the right map, + * then remove the position in the right map. If the right map is empty, complete the future in the left. + * There should be no timeout exception here, because the caller can only retry for TimeoutException. + * It will only be completed exceptionally when no more messages can be read. + */ + private final ConcurrentHashMap, Map> pendingRefreshRequests; + /** + * This map stored the read position of each partition. It is used for the following case: + *

+ * 1. Get last message ID. + * 2. Receive message p1-1:1, p2-1:1, p2-1:2, p3-1:1 + * 3. Receive response of step1 {|p1-1:1|p2-2:2|p3-3:6|} + * 4. No more messages are written to this topic. + * As a result, the refresh operation will never be completed. + *

+ */ + private final ConcurrentHashMap lastReadPositions; + AbstractTableViewImpl(PulsarClientImpl client, Schema schema, TableViewConfigurationData conf) { + this.conf = conf; + this.isPersistentTopic = conf.getTopicName().startsWith(TopicDomain.persistent.toString()); + this.data = new ConcurrentHashMap<>(); + this.immutableData = Collections.unmodifiableMap(data); + this.listeners = new ArrayList<>(); + this.listenersMutex = new ReentrantLock(); + this.compactionStrategy = + TopicCompactionStrategy.load(TABLE_VIEW_TAG, conf.getTopicCompactionStrategyClassName()); + this.pendingRefreshRequests = new ConcurrentHashMap<>(); + this.lastReadPositions = new ConcurrentHashMap<>(); + ReaderBuilder readerBuilder = client.newReader(schema) + .topic(conf.getTopicName()) + .startMessageId(MessageId.earliest) + .autoUpdatePartitions(true) + .autoUpdatePartitionsInterval((int) conf.getAutoUpdatePartitionsSeconds(), TimeUnit.SECONDS) + .poolMessages(true) + .subscriptionName(conf.getSubscriptionName()); + if (isPersistentTopic) { + readerBuilder.readCompacted(true); + } + CryptoKeyReader cryptoKeyReader = conf.getCryptoKeyReader(); + if (cryptoKeyReader != null) { + readerBuilder.cryptoKeyReader(cryptoKeyReader); + } + readerBuilder.cryptoFailureAction(conf.getCryptoFailureAction()); + this.reader = readerBuilder.createAsync(); + } + CompletableFuture> start() { + return reader.thenCompose((reader) -> { + if (!isPersistentTopic) { + readTailMessages(reader); + return CompletableFuture.completedFuture(null); + } + return this.readAllExistingMessages(reader) + .thenRun(() -> readTailMessages(reader)); + }).thenApply(__ -> this); + } + @Override + public int size() { + return data.size(); + } + @Override + public boolean isEmpty() { + return data.isEmpty(); + } + @Override + public boolean containsKey(String key) { + return data.containsKey(key); + } + @Override + public V get(String key) { + return data.get(key); + } + @Override + public Set> entrySet() { + return immutableData.entrySet(); + } + @Override + public Set keySet() { + return immutableData.keySet(); + } + @Override + public Collection values() { + return immutableData.values(); + } + @Override + public void forEach(BiConsumer action) { + data.forEach(action); + } + @Override + public void listen(BiConsumer action) { + try { + listenersMutex.lock(); + listeners.add(action); + } finally { + listenersMutex.unlock(); + } + } + @Override + public void forEachAndListen(BiConsumer action) { + // Ensure we iterate over all the existing entry _and_ start the listening from the exact next message + try { + listenersMutex.lock(); + // Execute the action over existing entries + forEach(action); + listeners.add(action); + } finally { + listenersMutex.unlock(); + } + } + @Override + public CompletableFuture closeAsync() { + return reader.thenCompose(Reader::closeAsync); + } + @Override + public void close() throws PulsarClientException { + try { + closeAsync().get(); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); + } + } + private void handleMessage(Message msg) { + lastReadPositions.put(msg.getTopicName(), msg.getMessageId()); + try { + if (msg.hasKey()) { + String key = msg.getKey(); + V cur = getValueIfPresent(msg); + if (log.isDebugEnabled()) { + log.debug("Applying message from topic {}. key={} value={}", + conf.getTopicName(), + key, + cur); + } + boolean update = true; + if (compactionStrategy != null) { + V prev = data.get(key); + update = !compactionStrategy.shouldKeepLeft(prev, cur); + if (!update) { + log.info("Skipped the message from topic {}. key={} value={} prev={}", + conf.getTopicName(), + key, + cur, + prev); + compactionStrategy.handleSkippedMessage(key, cur); + } + } + if (update) { + try { + listenersMutex.lock(); + if (null == cur) { + data.remove(key); + } else { + data.put(key, cur); + } + for (BiConsumer listener : listeners) { + try { + listener.accept(key, cur); + } catch (Throwable t) { + log.error("Table view listener raised an exception", t); + } + } + } finally { + listenersMutex.unlock(); + } + } + } + checkAllFreshTask(msg); + } finally { + maybeReleaseMessage(msg); + } + } + protected void maybeReleaseMessage(Message msg) { + msg.release(); + } + @SuppressWarnings("unchecked") + protected V getValueIfPresent(Message msg) { + return msg.size() > 0 ? (V) msg.getValue() : null; + } + @Override + public CompletableFuture refreshAsync() { + CompletableFuture completableFuture = new CompletableFuture<>(); + reader.thenCompose(reader -> getLastMessageIdOfNonEmptyTopics(reader).thenAccept(lastMessageIds -> { + if (lastMessageIds.isEmpty()) { + completableFuture.complete(null); + return; + } + // After get the response of lastMessageIds, put the future and result into `refreshMap` + // and then filter out partitions that has been read to the lastMessageID. + pendingRefreshRequests.put(completableFuture, lastMessageIds); + filterReceivedMessages(lastMessageIds); + // If there is no new messages, the refresh operation could be completed right now. + if (lastMessageIds.isEmpty()) { + pendingRefreshRequests.remove(completableFuture); + completableFuture.complete(null); + } + })).exceptionally(throwable -> { + completableFuture.completeExceptionally(throwable); + pendingRefreshRequests.remove(completableFuture); + return null; + }); + return completableFuture; + } + @Override + public void refresh() throws PulsarClientException { + try { + refreshAsync().get(); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); + } + } + private CompletableFuture readAllExistingMessages(Reader reader) { + long startTime = System.nanoTime(); + AtomicLong messagesRead = new AtomicLong(); + CompletableFuture future = new CompletableFuture<>(); + getLastMessageIdOfNonEmptyTopics(reader).thenAccept(lastMessageIds -> { + if (lastMessageIds.isEmpty()) { + future.complete(null); + return; + } + readAllExistingMessages(reader, future, startTime, messagesRead, lastMessageIds); + }).exceptionally(ex -> { + future.completeExceptionally(ex); + return null; + }); + return future; + } + private CompletableFuture> getLastMessageIdOfNonEmptyTopics(Reader reader) { + return reader.getLastMessageIdsAsync().thenApply(lastMessageIds -> { + Map lastMessageIdMap = new ConcurrentHashMap<>(); + lastMessageIds.forEach(topicMessageId -> { + if (((MessageIdAdv) topicMessageId).getEntryId() >= 0) { + lastMessageIdMap.put(topicMessageId.getOwnerTopic(), topicMessageId); + } // else: a negative entry id represents an empty topic so that we don't have to read messages from it + }); + return lastMessageIdMap; + }); + } + private void filterReceivedMessages(Map lastMessageIds) { + // The `lastMessageIds` and `readPositions` is concurrency-safe data types. + lastMessageIds.forEach((partition, lastMessageId) -> { + MessageId messageId = lastReadPositions.get(partition); + if (messageId != null && lastMessageId.compareTo(messageId) <= 0) { + lastMessageIds.remove(partition); + } + }); + } + private boolean checkFreshTask(Map maxMessageIds, CompletableFuture future, + MessageId messageId, String topicName) { + // The message received from multi-consumer/multi-reader is processed to TopicMessageImpl. + TopicMessageId maxMessageId = maxMessageIds.get(topicName); + // We need remove the partition from the maxMessageIds map + // once the partition has been read completely. + if (maxMessageId != null && messageId.compareTo(maxMessageId) >= 0) { + maxMessageIds.remove(topicName); + } + if (maxMessageIds.isEmpty()) { + future.complete(null); + return true; + } else { + return false; + } + } + private void checkAllFreshTask(Message msg) { + pendingRefreshRequests.forEach((future, maxMessageIds) -> { + String topicName = msg.getTopicName(); + MessageId messageId = msg.getMessageId(); + if (checkFreshTask(maxMessageIds, future, messageId, topicName)) { + pendingRefreshRequests.remove(future); + } + }); + } + private void readAllExistingMessages(Reader reader, CompletableFuture future, long startTime, + AtomicLong messagesRead, Map maxMessageIds) { + reader.hasMessageAvailableAsync() + .thenAccept(hasMessage -> { + if (hasMessage) { + reader.readNextAsync() + .thenAccept(msg -> { + messagesRead.incrementAndGet(); + String topicName = msg.getTopicName(); + MessageId messageId = msg.getMessageId(); + handleMessage(msg); + if (!checkFreshTask(maxMessageIds, future, messageId, topicName)) { + readAllExistingMessages(reader, future, startTime, + messagesRead, maxMessageIds); + } + }).exceptionally(ex -> { + if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) { + log.info("Reader {} was closed while reading existing messages.", + reader.getTopic()); + } else { + log.warn("Reader {} was interrupted while reading existing messages. ", + reader.getTopic(), ex); + } + future.completeExceptionally(ex); + return null; + }); + } else { + // Reached the end + long endTime = System.nanoTime(); + long durationMillis = TimeUnit.NANOSECONDS.toMillis(endTime - startTime); + log.info("Started table view for topic {} - Replayed {} messages in {} seconds", + reader.getTopic(), + messagesRead, + durationMillis / 1000.0); + future.complete(null); + } + }); + } + private void readTailMessages(Reader reader) { + reader.readNextAsync() + .thenAccept(msg -> { + handleMessage(msg); + readTailMessages(reader); + }).exceptionally(ex -> { + if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) { + log.info("Reader {} was closed while reading tail messages.", reader.getTopic()); + // Fail all refresh request when no more messages can be read. + pendingRefreshRequests.keySet().forEach(future -> { + pendingRefreshRequests.remove(future); + future.completeExceptionally(ex); + }); + } else { + // Retrying on the other exceptions such as NotConnectedException + try { + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + log.warn("Reader {} was interrupted while reading tail messages. " + + "Retrying..", reader.getTopic(), ex); + readTailMessages(reader); + } + return null; + }); + } +} \ No newline at end of file diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageMapperTableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageMapperTableViewImpl.java new file mode 100644 index 0000000000000..890135ab728b9 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageMapperTableViewImpl.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import java.util.function.Function; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; + +@Slf4j +public class MessageMapperTableViewImpl extends AbstractTableViewImpl { + + private final Function, V> mapper; + + MessageMapperTableViewImpl(PulsarClientImpl client, Schema schema, TableViewConfigurationData conf, + Function, V> mapper) { + super(client, schema, conf); + this.mapper = mapper; + } + + @Override + protected void maybeReleaseMessage(Message msg) { + // The message is passed to the user-defined mapper function. + // The user is responsible for releasing the message if needed. + // To be safe, we don't release the message here. + } + + @Override + protected V getValueIfPresent(Message msg) { + return msg.size() > 0 ? mapper.apply(msg) : null; + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageTableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageTableViewImpl.java index 5d6c7c1c14f1d..d5893ce1890bf 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageTableViewImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageTableViewImpl.java @@ -18,388 +18,26 @@ */ package org.apache.pulsar.client.impl; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantLock; -import java.util.function.BiConsumer; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Reader; -import org.apache.pulsar.client.api.ReaderBuilder; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.TableView; -import org.apache.pulsar.client.api.TopicMessageId; -import org.apache.pulsar.common.naming.TopicDomain; -import org.apache.pulsar.common.topics.TopicCompactionStrategy; - -import static org.apache.pulsar.common.topics.TopicCompactionStrategy.TABLE_VIEW_TAG; @Slf4j -class MessageTableViewImpl implements TableView> { - - private final TableViewConfigurationData conf; - - private final ConcurrentMap> data; - private final Map> immutableData; - - private final CompletableFuture> reader; - - private final List>> listeners; - private final ReentrantLock listenersMutex; - private final boolean isPersistentTopic; - private TopicCompactionStrategy> compactionStrategy; - - private final ConcurrentHashMap, Map> pendingRefreshRequests; - - private final ConcurrentHashMap lastReadPositions; - +public class MessageTableViewImpl extends AbstractTableViewImpl> { MessageTableViewImpl(PulsarClientImpl client, Schema schema, TableViewConfigurationData conf) { - this.conf = conf; - this.isPersistentTopic = conf.getTopicName().startsWith(TopicDomain.persistent.toString()); - this.data = new ConcurrentHashMap<>(); - this.immutableData = Collections.unmodifiableMap(data); - this.listeners = new ArrayList<>(); - this.listenersMutex = new ReentrantLock(); - this.compactionStrategy = - TopicCompactionStrategy.load(TABLE_VIEW_TAG, conf.getTopicCompactionStrategyClassName()); - this.pendingRefreshRequests = new ConcurrentHashMap<>(); - this.lastReadPositions = new ConcurrentHashMap<>(); - ReaderBuilder readerBuilder = client.newReader(schema) - .topic(conf.getTopicName()) - .startMessageId(MessageId.earliest) - .autoUpdatePartitions(true) - .autoUpdatePartitionsInterval((int) conf.getAutoUpdatePartitionsSeconds(), TimeUnit.SECONDS) - .poolMessages(true) - .subscriptionName(conf.getSubscriptionName()); - if (isPersistentTopic) { - readerBuilder.readCompacted(true); - } - - CryptoKeyReader cryptoKeyReader = conf.getCryptoKeyReader(); - if (cryptoKeyReader != null) { - readerBuilder.cryptoKeyReader(cryptoKeyReader); - } - - readerBuilder.cryptoFailureAction(conf.getCryptoFailureAction()); - - this.reader = readerBuilder.createAsync(); - } - - CompletableFuture>> start() { - return reader.thenCompose((reader) -> { - if (!isPersistentTopic) { - readTailMessages(reader); - return CompletableFuture.completedFuture(null); - } - return this.readAllExistingMessages(reader) - .thenRun(() -> readTailMessages(reader)); - }).thenApply(__ -> this); - } - - @Override - public int size() { - return data.size(); - } - - @Override - public boolean isEmpty() { - return data.isEmpty(); - } - - @Override - public boolean containsKey(String key) { - return data.containsKey(key); - } - - @Override - public Message get(String key) { - return data.get(key); - } - - - @Override - public Set>> entrySet() { - return immutableData.entrySet(); - } - - @Override - public Set keySet() { - return immutableData.keySet(); - } - - @Override - public Collection> values() { - return immutableData.values(); - } - - @Override - public void forEach(BiConsumer> action) { - data.forEach(action); + super(client, schema, conf); } @Override - public void listen(BiConsumer> action) { - try { - listenersMutex.lock(); - listeners.add(action); - } finally { - listenersMutex.unlock(); - } + protected void maybeReleaseMessage(Message msg) { + // don't release the message. Pooling of messages might have to be disabled in the client when using + // MessageTableViewImpl. } @Override - public void forEachAndListen(BiConsumer> action) { - try { - listenersMutex.lock(); - forEach(action); - listeners.add(action); - } finally { - listenersMutex.unlock(); - } - } - - @Override - public CompletableFuture closeAsync() { - return reader.thenCompose(Reader::closeAsync); - } - - @Override - public void close() throws PulsarClientException { - try { - closeAsync().get(); - } catch (Exception e) { - throw PulsarClientException.unwrap(e); - } - } - - private void handleMessage(Message msg) { - lastReadPositions.put(msg.getTopicName(), msg.getMessageId()); - - if (!msg.hasKey()) { - log.warn("Received message with no key, releasing."); - msg.release(); - checkAllFreshTask(msg); - return; - } - - String key = msg.getKey(); - - Message cur = msg.size() > 0 ? msg : null; - - if (log.isDebugEnabled()) { - log.debug("Applying message from topic {}. key={} value={}", - conf.getTopicName(), - key, - cur); - } - - boolean update = true; - if (compactionStrategy != null) { - Message prev = data.get(key); - update = !compactionStrategy.shouldKeepLeft(prev, cur); - if (!update) { - log.info("Skipped the message from topic {}. key={} value={} prev={}", - conf.getTopicName(), - key, - cur, - prev); - compactionStrategy.handleSkippedMessage(key, cur); - msg.release(); - checkAllFreshTask(msg); - return; - } - } - - try { - listenersMutex.lock(); - if (null == cur) { - data.remove(key); - msg.release(); - } else { - data.put(key, cur); - } - - for (BiConsumer> listener : listeners) { - try { - listener.accept(key, cur); - } catch (Throwable t) { - log.error("Table view listener raised an exception", t); - } - } - } finally { - listenersMutex.unlock(); - } - - checkAllFreshTask(msg); - } - - @Override - public CompletableFuture refreshAsync() { - CompletableFuture completableFuture = new CompletableFuture<>(); - reader.thenCompose(reader -> getLastMessageIdOfNonEmptyTopics(reader).thenAccept(lastMessageIds -> { - if (lastMessageIds.isEmpty()) { - completableFuture.complete(null); - return; - } - pendingRefreshRequests.put(completableFuture, lastMessageIds); - filterReceivedMessages(lastMessageIds); - if (lastMessageIds.isEmpty()) { - pendingRefreshRequests.remove(completableFuture); - completableFuture.complete(null); - } - })).exceptionally(throwable -> { - completableFuture.completeExceptionally(throwable); - pendingRefreshRequests.remove(completableFuture); - return null; - }); - return completableFuture; - } - - @Override - public void refresh() throws PulsarClientException { - try { - refreshAsync().get(); - } catch (Exception e) { - throw PulsarClientException.unwrap(e); - } - } - - private CompletableFuture readAllExistingMessages(Reader reader) { - long startTime = System.nanoTime(); - AtomicLong messagesRead = new AtomicLong(); - - CompletableFuture future = new CompletableFuture<>(); - getLastMessageIdOfNonEmptyTopics(reader).thenAccept(lastMessageIds -> { - if (lastMessageIds.isEmpty()) { - future.complete(null); - return; - } - readAllExistingMessages(reader, future, startTime, messagesRead, lastMessageIds); - }).exceptionally(ex -> { - future.completeExceptionally(ex); - return null; - }); - return future; - } - - private CompletableFuture> getLastMessageIdOfNonEmptyTopics(Reader reader) { - return reader.getLastMessageIdsAsync().thenApply(lastMessageIds -> { - Map lastMessageIdMap = new ConcurrentHashMap<>(); - lastMessageIds.forEach(topicMessageId -> { - if (((MessageIdAdv) topicMessageId).getEntryId() >= 0) { - lastMessageIdMap.put(topicMessageId.getOwnerTopic(), topicMessageId); - } - }); - return lastMessageIdMap; - }); - } - - private void filterReceivedMessages(Map lastMessageIds) { - lastMessageIds.forEach((partition, lastMessageId) -> { - MessageId messageId = lastReadPositions.get(partition); - if (messageId != null && lastMessageId.compareTo(messageId) <= 0) { - lastMessageIds.remove(partition); - } - }); - } - - private boolean checkFreshTask(Map maxMessageIds, CompletableFuture future, - MessageId messageId, String topicName) { - TopicMessageId maxMessageId = maxMessageIds.get(topicName); - if (maxMessageId != null && messageId.compareTo(maxMessageId) >= 0) { - maxMessageIds.remove(topicName); - } - if (maxMessageIds.isEmpty()) { - future.complete(null); - return true; - } else { - return false; - } - } - - private void checkAllFreshTask(Message msg) { - pendingRefreshRequests.forEach((future, maxMessageIds) -> { - String topicName = msg.getTopicName(); - MessageId messageId = msg.getMessageId(); - if (checkFreshTask(maxMessageIds, future, messageId, topicName)) { - pendingRefreshRequests.remove(future); - } - }); - } - - private void readAllExistingMessages(Reader reader, CompletableFuture future, long startTime, - AtomicLong messagesRead, Map maxMessageIds) { - reader.hasMessageAvailableAsync() - .thenAccept(hasMessage -> { - if (hasMessage) { - reader.readNextAsync() - .thenAccept(msg -> { - messagesRead.incrementAndGet(); - String topicName = msg.getTopicName(); - MessageId messageId = msg.getMessageId(); - handleMessage(msg); - if (!checkFreshTask(maxMessageIds, future, messageId, topicName)) { - readAllExistingMessages(reader, future, startTime, - messagesRead, maxMessageIds); - } - }).exceptionally(ex -> { - if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) { - log.info("Reader {} was closed while reading existing messages.", - reader.getTopic()); - } else { - log.warn("Reader {} was interrupted while reading existing messages. ", - reader.getTopic(), ex); - } - future.completeExceptionally(ex); - return null; - }); - } else { - long endTime = System.nanoTime(); - long durationMillis = TimeUnit.NANOSECONDS.toMillis(endTime - startTime); - log.info("Started table view for topic {} - Replayed {} messages in {} seconds", - reader.getTopic(), - messagesRead, - durationMillis / 1000.0); - future.complete(null); - } - }); - } - - private void readTailMessages(Reader reader) { - reader.readNextAsync() - .thenAccept(msg -> { - handleMessage(msg); - readTailMessages(reader); - }).exceptionally(ex -> { - if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) { - log.info("Reader {} was closed while reading tail messages.", reader.getTopic()); - pendingRefreshRequests.keySet().forEach(future -> { - pendingRefreshRequests.remove(future); - future.completeExceptionally(ex); - }); - } else { - try { - Thread.sleep(50); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - log.warn("Reader {} was interrupted while reading tail messages. " - + "Retrying..", reader.getTopic(), ex); - readTailMessages(reader); - } - return null; - }); + protected Message getValueIfPresent(Message msg) { + // return the message as the value for the table view + // if the payload is empty, the message is considered a tombstone message and it won't be preserved in the map + return msg.size() > 0 ? msg : null; } } \ No newline at end of file diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java index 31967df1c3ffb..2f6ae3d973214 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import lombok.NonNull; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.*; @@ -74,6 +75,20 @@ public CompletableFuture>> createForMessagesAsync() { return new MessageTableViewImpl<>(client, schema, conf).start(); } + @Override + public TableView createMapped(Function, V> mapper) throws PulsarClientException { + try { + return createMappedAsync(mapper).get(); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); + } + } + + @Override + public CompletableFuture> createMappedAsync(Function, V> mapper) { + return new MessageMapperTableViewImpl(client, schema, conf, mapper).start(); + } + @Override public TableViewBuilder topic(String topic) { checkArgument(StringUtils.isNotBlank(topic), "topic cannot be blank"); @@ -118,4 +133,4 @@ public TableViewBuilder cryptoFailureAction(ConsumerCryptoFailureAction actio conf.setCryptoFailureAction(action); return this; } -} +} \ No newline at end of file diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java index 7a44d2b538e6c..2a300b09608c7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java @@ -18,402 +18,12 @@ */ package org.apache.pulsar.client.impl; -import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.client.api.*; -import org.apache.pulsar.common.naming.TopicDomain; -import org.apache.pulsar.common.topics.TopicCompactionStrategy; +import org.apache.pulsar.client.api.Schema; -import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantLock; -import java.util.function.BiConsumer; - -import static org.apache.pulsar.common.topics.TopicCompactionStrategy.TABLE_VIEW_TAG; - -@Slf4j -public class TableViewImpl implements TableView { - - private final TableViewConfigurationData conf; - - private final ConcurrentMap data; - - private final Map immutableData; - - private final CompletableFuture> reader; - - private final List> listeners; - private final ReentrantLock listenersMutex; - private final boolean isPersistentTopic; - private TopicCompactionStrategy compactionStrategy; - - /** - * Store the refresh tasks. When read to the position recording in the right map, - * then remove the position in the right map. If the right map is empty, complete the future in the left. - * There should be no timeout exception here, because the caller can only retry for TimeoutException. - * It will only be completed exceptionally when no more messages can be read. - */ - private final ConcurrentHashMap, Map> pendingRefreshRequests; - - /** - * This map stored the read position of each partition. It is used for the following case: - *

- * 1. Get last message ID. - * 2. Receive message p1-1:1, p2-1:1, p2-1:2, p3-1:1 - * 3. Receive response of step1 {|p1-1:1|p2-2:2|p3-3:6|} - * 4. No more messages are written to this topic. - * As a result, the refresh operation will never be completed. - *

- */ - private final ConcurrentHashMap lastReadPositions; +public class TableViewImpl extends AbstractTableViewImpl { TableViewImpl(PulsarClientImpl client, Schema schema, TableViewConfigurationData conf) { - this.conf = conf; - this.isPersistentTopic = conf.getTopicName().startsWith(TopicDomain.persistent.toString()); - this.data = new ConcurrentHashMap<>(); - this.immutableData = Collections.unmodifiableMap(data); - this.listeners = new ArrayList<>(); - this.listenersMutex = new ReentrantLock(); - this.compactionStrategy = - TopicCompactionStrategy.load(TABLE_VIEW_TAG, conf.getTopicCompactionStrategyClassName()); - this.pendingRefreshRequests = new ConcurrentHashMap<>(); - this.lastReadPositions = new ConcurrentHashMap<>(); - ReaderBuilder readerBuilder = client.newReader(schema) - .topic(conf.getTopicName()) - .startMessageId(MessageId.earliest) - .autoUpdatePartitions(true) - .autoUpdatePartitionsInterval((int) conf.getAutoUpdatePartitionsSeconds(), TimeUnit.SECONDS) - .poolMessages(true) - .subscriptionName(conf.getSubscriptionName()); - if (isPersistentTopic) { - readerBuilder.readCompacted(true); - } - - CryptoKeyReader cryptoKeyReader = conf.getCryptoKeyReader(); - if (cryptoKeyReader != null) { - readerBuilder.cryptoKeyReader(cryptoKeyReader); - } - - readerBuilder.cryptoFailureAction(conf.getCryptoFailureAction()); - - this.reader = readerBuilder.createAsync(); - } - - CompletableFuture> start() { - return reader.thenCompose((reader) -> { - if (!isPersistentTopic) { - readTailMessages(reader); - return CompletableFuture.completedFuture(null); - } - return this.readAllExistingMessages(reader) - .thenRun(() -> readTailMessages(reader)); - }).thenApply(__ -> this); - } - - @Override - public int size() { - return data.size(); - } - - @Override - public boolean isEmpty() { - return data.isEmpty(); - } - - @Override - public boolean containsKey(String key) { - return data.containsKey(key); - } - - @Override - public T get(String key) { - return data.get(key); + super(client, schema, conf); } - - @Override - public Set> entrySet() { - return immutableData.entrySet(); - } - - @Override - public Set keySet() { - return immutableData.keySet(); - } - - @Override - public Collection values() { - return immutableData.values(); - } - - @Override - public void forEach(BiConsumer action) { - data.forEach(action); - } - - @Override - public void listen(BiConsumer action) { - try { - listenersMutex.lock(); - listeners.add(action); - } finally { - listenersMutex.unlock(); - } - } - - @Override - public void forEachAndListen(BiConsumer action) { - // Ensure we iterate over all the existing entry _and_ start the listening from the exact next message - try { - listenersMutex.lock(); - - // Execute the action over existing entries - forEach(action); - - listeners.add(action); - } finally { - listenersMutex.unlock(); - } - } - - @Override - public CompletableFuture closeAsync() { - return reader.thenCompose(Reader::closeAsync); - } - - @Override - public void close() throws PulsarClientException { - try { - closeAsync().get(); - } catch (Exception e) { - throw PulsarClientException.unwrap(e); - } - } - - private void handleMessage(Message msg) { - lastReadPositions.put(msg.getTopicName(), msg.getMessageId()); - - try { - if (!msg.hasKey()) { - log.warn("Received message with no key."); - return; - } - - String key = msg.getKey(); - T cur = msg.size() > 0 ? msg.getValue() : null; - - if (log.isDebugEnabled()) { - log.debug("Applying message from topic {}. key={} value={}", - conf.getTopicName(), - key, - cur); - } - - if (compactionStrategy != null) { - T prev = data.get(key); - if (!compactionStrategy.shouldKeepLeft(prev, cur)) { - log.info("Skipped the message from topic {}. key={} value={} prev={}", - conf.getTopicName(), - key, - cur, - prev); - compactionStrategy.handleSkippedMessage(key, cur); - return; - } - } - - try { - listenersMutex.lock(); - if (null == cur) { - data.remove(key); - } else { - data.put(key, cur); - } - - for (BiConsumer listener : listeners) { - try { - listener.accept(key, cur); - } catch (Throwable t) { - log.error("Table view listener raised an exception", t); - } - } - } finally { - listenersMutex.unlock(); - } - } finally { - msg.release(); - checkAllFreshTask(msg); - } - } - - @Override - public CompletableFuture refreshAsync() { - CompletableFuture completableFuture = new CompletableFuture<>(); - reader.thenCompose(reader -> getLastMessageIdOfNonEmptyTopics(reader).thenAccept(lastMessageIds -> { - if (lastMessageIds.isEmpty()) { - completableFuture.complete(null); - return; - } - // After get the response of lastMessageIds, put the future and result into `refreshMap` - // and then filter out partitions that has been read to the lastMessageID. - pendingRefreshRequests.put(completableFuture, lastMessageIds); - filterReceivedMessages(lastMessageIds); - // If there is no new messages, the refresh operation could be completed right now. - if (lastMessageIds.isEmpty()) { - pendingRefreshRequests.remove(completableFuture); - completableFuture.complete(null); - } - })).exceptionally(throwable -> { - completableFuture.completeExceptionally(throwable); - pendingRefreshRequests.remove(completableFuture); - return null; - }); - return completableFuture; - } - - @Override - public void refresh() throws PulsarClientException { - try { - refreshAsync().get(); - } catch (Exception e) { - throw PulsarClientException.unwrap(e); - } - } - - private CompletableFuture readAllExistingMessages(Reader reader) { - long startTime = System.nanoTime(); - AtomicLong messagesRead = new AtomicLong(); - - CompletableFuture future = new CompletableFuture<>(); - getLastMessageIdOfNonEmptyTopics(reader).thenAccept(lastMessageIds -> { - if (lastMessageIds.isEmpty()) { - future.complete(null); - return; - } - readAllExistingMessages(reader, future, startTime, messagesRead, lastMessageIds); - }).exceptionally(ex -> { - future.completeExceptionally(ex); - return null; - }); - return future; - } - - private CompletableFuture> getLastMessageIdOfNonEmptyTopics(Reader reader) { - return reader.getLastMessageIdsAsync().thenApply(lastMessageIds -> { - Map lastMessageIdMap = new ConcurrentHashMap<>(); - lastMessageIds.forEach(topicMessageId -> { - if (((MessageIdAdv) topicMessageId).getEntryId() >= 0) { - lastMessageIdMap.put(topicMessageId.getOwnerTopic(), topicMessageId); - } // else: a negative entry id represents an empty topic so that we don't have to read messages from it - }); - return lastMessageIdMap; - }); - } - - private void filterReceivedMessages(Map lastMessageIds) { - // The `lastMessageIds` and `readPositions` is concurrency-safe data types. - lastMessageIds.forEach((partition, lastMessageId) -> { - MessageId messageId = lastReadPositions.get(partition); - if (messageId != null && lastMessageId.compareTo(messageId) <= 0) { - lastMessageIds.remove(partition); - } - }); - } - - private boolean checkFreshTask(Map maxMessageIds, CompletableFuture future, - MessageId messageId, String topicName) { - // The message received from multi-consumer/multi-reader is processed to TopicMessageImpl. - TopicMessageId maxMessageId = maxMessageIds.get(topicName); - // We need remove the partition from the maxMessageIds map - // once the partition has been read completely. - if (maxMessageId != null && messageId.compareTo(maxMessageId) >= 0) { - maxMessageIds.remove(topicName); - } - if (maxMessageIds.isEmpty()) { - future.complete(null); - return true; - } else { - return false; - } - } - - private void checkAllFreshTask(Message msg) { - pendingRefreshRequests.forEach((future, maxMessageIds) -> { - String topicName = msg.getTopicName(); - MessageId messageId = msg.getMessageId(); - if (checkFreshTask(maxMessageIds, future, messageId, topicName)) { - pendingRefreshRequests.remove(future); - } - }); - } - - private void readAllExistingMessages(Reader reader, CompletableFuture future, long startTime, - AtomicLong messagesRead, Map maxMessageIds) { - reader.hasMessageAvailableAsync() - .thenAccept(hasMessage -> { - if (hasMessage) { - reader.readNextAsync() - .thenAccept(msg -> { - messagesRead.incrementAndGet(); - String topicName = msg.getTopicName(); - MessageId messageId = msg.getMessageId(); - handleMessage(msg); - if (!checkFreshTask(maxMessageIds, future, messageId, topicName)) { - readAllExistingMessages(reader, future, startTime, - messagesRead, maxMessageIds); - } - }).exceptionally(ex -> { - if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) { - log.info("Reader {} was closed while reading existing messages.", - reader.getTopic()); - } else { - log.warn("Reader {} was interrupted while reading existing messages. ", - reader.getTopic(), ex); - } - future.completeExceptionally(ex); - return null; - }); - } else { - // Reached the end - long endTime = System.nanoTime(); - long durationMillis = TimeUnit.NANOSECONDS.toMillis(endTime - startTime); - log.info("Started table view for topic {} - Replayed {} messages in {} seconds", - reader.getTopic(), - messagesRead, - durationMillis / 1000.0); - future.complete(null); - } - }); - } - - private void readTailMessages(Reader reader) { - reader.readNextAsync() - .thenAccept(msg -> { - handleMessage(msg); - readTailMessages(reader); - }).exceptionally(ex -> { - if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) { - log.info("Reader {} was closed while reading tail messages.", reader.getTopic()); - // Fail all refresh request when no more messages can be read. - pendingRefreshRequests.keySet().forEach(future -> { - pendingRefreshRequests.remove(future); - future.completeExceptionally(ex); - }); - } else { - // Retrying on the other exceptions such as NotConnectedException - try { - Thread.sleep(50); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - log.warn("Reader {} was interrupted while reading tail messages. " - + "Retrying..", reader.getTopic(), ex); - readTailMessages(reader); - } - return null; - }); - } -} +} \ No newline at end of file From 8ea6ba9b3b69f9937834145173fd28763ee014ac Mon Sep 17 00:00:00 2001 From: namest504 Date: Mon, 20 Oct 2025 13:15:25 +0900 Subject: [PATCH 5/7] [improve][api] Add Test Signed-off-by: namest504 --- .../pulsar/client/impl/TableViewTest.java | 86 +++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java index f6c9507ea798a..ed72aab66ea16 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java @@ -658,4 +658,90 @@ public void testCreateForMessages() throws Exception { Assert.assertNull(missingMessage, "Message should be null for missing key"); } + @Test + public void testCreateMapped() throws Exception { + String topic = "persistent://public/default/testCreateMapped"; + admin.topics().createNonPartitionedTopic(topic); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); + + @Cleanup + TableView tableView = pulsarClient.newTableViewBuilder(Schema.STRING) + .topic(topic) + .createMapped(m -> { + if (m.getValue().equals("delete-me")) { + return null; + } + return m.getValue() + ":" + m.getProperty("myProp"); + }); + + // Send a message to be mapped + String testKey = "key1"; + String testValue = "value1"; + producer.newMessage() + .key(testKey) + .value(testValue) + .property("myProp", "myValue") + .send(); + + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> tableView.size() == 1); + + String mappedValue = tableView.get(testKey); + assertEquals(mappedValue, "value1:myValue"); + + // Send another message to update the value + producer.newMessage() + .key(testKey) + .value("value2") + .property("myProp", "myValue2") + .send(); + + Awaitility.await().atMost(5, TimeUnit.SECONDS) + .until(() -> "value2:myValue2".equals(tableView.get(testKey))); + assertEquals(tableView.size(), 1); + + // Send a message that maps to null (tombstone) + producer.newMessage() + .key(testKey) + .value("delete-me") + .send(); + + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> tableView.size() == 0); + Assert.assertNull(tableView.get(testKey), "Value should be null after tombstone message"); + } + + @Test + public void testCreateMappedWithIdentityMapper() throws Exception { + String topic = "persistent://public/default/testCreateMappedWithIdentityMapper"; + admin.topics().createNonPartitionedTopic(topic); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); + + String testKey = "key1"; + String testValue = "value1"; + producer.newMessage() + .key(testKey) + .value(testValue) + .property("myProp", "myValue") + .send(); + + @Cleanup + TableView> tableView = pulsarClient.newTableViewBuilder(Schema.STRING) + .topic(topic) + .createMapped(java.util.function.Function.identity()); + + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> tableView.size() == 1); + + Message message = tableView.get(testKey); + Assert.assertNotNull(message, "Message should not be null for key: " + testKey); + assertEquals(message.getKey(), testKey); + assertEquals(message.getValue(), testValue); + assertEquals(message.getProperty("myProp"), "myValue"); + + Message missingMessage = tableView.get("missingKey"); + Assert.assertNull(missingMessage, "Message should be null for missing key"); + } + } From 0fa647e29aea49970aa8dd267d3cad1a8b6b5aa1 Mon Sep 17 00:00:00 2001 From: namest504 Date: Mon, 20 Oct 2025 13:20:03 +0900 Subject: [PATCH 6/7] [improve][api] Rename AbstractTableView Signed-off-by: namest504 --- .../pulsar/client/impl/TableViewTest.java | 39 +++++++++---------- ...leViewImpl.java => AbstractTableView.java} | 4 +- ...wImpl.java => MessageMapperTableView.java} | 6 +-- ...bleViewImpl.java => MessageTableView.java} | 4 +- .../{TableViewImpl.java => TableView.java} | 4 +- .../client/impl/TableViewBuilderImpl.java | 18 ++++----- .../pulsar/client/impl/TableViewImplTest.java | 3 +- 7 files changed, 38 insertions(+), 40 deletions(-) rename pulsar-client/src/main/java/org/apache/pulsar/client/impl/{AbstractTableViewImpl.java => AbstractTableView.java} (99%) rename pulsar-client/src/main/java/org/apache/pulsar/client/impl/{MessageMapperTableViewImpl.java => MessageMapperTableView.java} (85%) rename pulsar-client/src/main/java/org/apache/pulsar/client/impl/{MessageTableViewImpl.java => MessageTableView.java} (89%) rename pulsar-client/src/main/java/org/apache/pulsar/client/impl/{TableViewImpl.java => TableView.java} (85%) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java index ed72aab66ea16..fb7c846e1619b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java @@ -53,7 +53,6 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.TableView; import org.apache.pulsar.client.api.TopicMessageId; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; @@ -67,7 +66,7 @@ import org.testng.annotations.Test; /** - * Unit test for {@link org.apache.pulsar.client.impl.TableViewImpl}. + * Unit test for {@link TableView}. */ @Slf4j @Test(groups = "broker-impl") @@ -169,7 +168,7 @@ public void testRefreshAPI(int partition) throws Exception { } @Cleanup - TableView tv = pulsarClient.newTableView(Schema.BYTES) + org.apache.pulsar.client.api.TableView tv = pulsarClient.newTableView(Schema.BYTES) .topic(topic) .create(); // Verify refresh can handle the case when the topic is empty @@ -225,11 +224,11 @@ public void testRefreshTaskCanBeCompletedWhenReaderClosed() throws Exception { String topic2 = "persistent://public/default/testRefreshTaskCanBeCompletedWhenReaderClosed-2"; admin.topics().createNonPartitionedTopic(topic2); @Cleanup - TableView tv1 = pulsarClient.newTableView(Schema.BYTES) + org.apache.pulsar.client.api.TableView tv1 = pulsarClient.newTableView(Schema.BYTES) .topic(topic1) .create(); @Cleanup - TableView tv2 = pulsarClient.newTableView(Schema.BYTES) + org.apache.pulsar.client.api.TableView tv2 = pulsarClient.newTableView(Schema.BYTES) .topic(topic1) .create(); // 2. Slow down the rate of reading messages. @@ -269,7 +268,7 @@ public void testTableView() throws Exception { int count = 20; Set keys = this.publishMessages(topic, count, false); @Cleanup - TableView tv = pulsarClient.newTableViewBuilder(Schema.BYTES) + org.apache.pulsar.client.api.TableView tv = pulsarClient.newTableViewBuilder(Schema.BYTES) .topic(topic) .autoUpdatePartitionsInterval(60, TimeUnit.SECONDS) .create(); @@ -316,7 +315,7 @@ public void testNewTableView() throws Exception { admin.topics().createPartitionedTopic(topic, 2); Set keys = this.publishMessages(topic, 10, false); @Cleanup - TableView tv = pulsarClient.newTableView() + org.apache.pulsar.client.api.TableView tv = pulsarClient.newTableView() .topic(topic) .autoUpdatePartitionsInterval(60, TimeUnit.SECONDS) .create(); @@ -336,7 +335,7 @@ public void testTableViewUpdatePartitions(String topicDomain) throws Exception { // For non-persistent topic, this keys will never be received. Set keys = this.publishMessages(topic, count, false); @Cleanup - TableView tv = pulsarClient.newTableViewBuilder(Schema.BYTES) + org.apache.pulsar.client.api.TableView tv = pulsarClient.newTableViewBuilder(Schema.BYTES) .topic(topic) .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS) .create(); @@ -374,7 +373,7 @@ public void testPublishNullValue(String topicDomain) throws Exception { String topic = topicDomain + "://public/default/tableview-test-publish-null-value"; admin.topics().createPartitionedTopic(topic, 3); - final TableView tv = pulsarClient.newTableViewBuilder(Schema.STRING) + final org.apache.pulsar.client.api.TableView tv = pulsarClient.newTableViewBuilder(Schema.STRING) .topic(topic) .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS) .create(); @@ -397,7 +396,7 @@ public void testPublishNullValue(String topicDomain) throws Exception { tv.close(); @Cleanup - TableView tv1 = pulsarClient.newTableView(Schema.STRING) + org.apache.pulsar.client.api.TableView tv1 = pulsarClient.newTableView(Schema.STRING) .topic(topic) .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS) .create(); @@ -427,7 +426,7 @@ public void testAck(boolean partitionedTopic) throws Exception { } @Cleanup - TableView tv1 = pulsarClient.newTableViewBuilder(Schema.STRING) + org.apache.pulsar.client.api.TableView tv1 = pulsarClient.newTableViewBuilder(Schema.STRING) .topic(topic) .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS) .create(); @@ -476,7 +475,7 @@ public void testListen() throws Exception { } @Cleanup - TableView tv = pulsarClient.newTableViewBuilder(Schema.STRING) + org.apache.pulsar.client.api.TableView tv = pulsarClient.newTableViewBuilder(Schema.STRING) .topic(topic) .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS) .create(); @@ -521,7 +520,7 @@ public void testTableViewWithEncryptedMessages() throws Exception { // TableView can read them using the private key @Cleanup - TableView tv = pulsarClient.newTableViewBuilder(Schema.BYTES) + org.apache.pulsar.client.api.TableView tv = pulsarClient.newTableViewBuilder(Schema.BYTES) .topic(topic) .autoUpdatePartitionsInterval(60, TimeUnit.SECONDS) .defaultCryptoKeyReader("file:" + ECDSA_PRIVATE_KEY) @@ -540,7 +539,7 @@ public void testTableViewTailMessageReadRetry() throws Exception { String topic = "persistent://public/default/tableview-is-interrupted-test"; admin.topics().createNonPartitionedTopic(topic); @Cleanup - TableView tv = pulsarClient.newTableView(Schema.BYTES) + org.apache.pulsar.client.api.TableView tv = pulsarClient.newTableView(Schema.BYTES) .topic(topic) .autoUpdatePartitionsInterval(60, TimeUnit.SECONDS) .create(); @@ -609,12 +608,12 @@ public void testBuildTableViewWithMessagesAlwaysAvailable() throws Exception { return CompletableFuture.completedFuture(message); }); @Cleanup - TableViewImpl tableView = (TableViewImpl) pulsarClient.newTableView() + TableView tableView = (TableView) pulsarClient.newTableView() .topic(topic) .createAsync() .get(); - TableViewImpl mockTableView = spy(tableView); - Method readAllExistingMessagesMethod = TableViewImpl.class + TableView mockTableView = spy(tableView); + Method readAllExistingMessagesMethod = TableView.class .getDeclaredMethod("readAllExistingMessages", Reader.class); readAllExistingMessagesMethod.setAccessible(true); CompletableFuture> future = @@ -642,7 +641,7 @@ public void testCreateForMessages() throws Exception { .send(); @Cleanup - TableView> tableView = pulsarClient.newTableViewBuilder(Schema.STRING) + org.apache.pulsar.client.api.TableView> tableView = pulsarClient.newTableViewBuilder(Schema.STRING) .topic(topic) .createForMessages(); @@ -667,7 +666,7 @@ public void testCreateMapped() throws Exception { Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); @Cleanup - TableView tableView = pulsarClient.newTableViewBuilder(Schema.STRING) + org.apache.pulsar.client.api.TableView tableView = pulsarClient.newTableViewBuilder(Schema.STRING) .topic(topic) .createMapped(m -> { if (m.getValue().equals("delete-me")) { @@ -728,7 +727,7 @@ public void testCreateMappedWithIdentityMapper() throws Exception { .send(); @Cleanup - TableView> tableView = pulsarClient.newTableViewBuilder(Schema.STRING) + org.apache.pulsar.client.api.TableView> tableView = pulsarClient.newTableViewBuilder(Schema.STRING) .topic(topic) .createMapped(java.util.function.Function.identity()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractTableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractTableView.java similarity index 99% rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractTableViewImpl.java rename to pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractTableView.java index 011a42a7a2677..682199880df18 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractTableViewImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractTableView.java @@ -46,7 +46,7 @@ import org.apache.pulsar.common.topics.TopicCompactionStrategy; @Slf4j -abstract class AbstractTableViewImpl implements TableView { +abstract class AbstractTableView implements TableView { private final TableViewConfigurationData conf; private final ConcurrentMap data; private final Map immutableData; @@ -73,7 +73,7 @@ abstract class AbstractTableViewImpl implements TableView { *

*/ private final ConcurrentHashMap lastReadPositions; - AbstractTableViewImpl(PulsarClientImpl client, Schema schema, TableViewConfigurationData conf) { + AbstractTableView(PulsarClientImpl client, Schema schema, TableViewConfigurationData conf) { this.conf = conf; this.isPersistentTopic = conf.getTopicName().startsWith(TopicDomain.persistent.toString()); this.data = new ConcurrentHashMap<>(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageMapperTableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageMapperTableView.java similarity index 85% rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageMapperTableViewImpl.java rename to pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageMapperTableView.java index 890135ab728b9..4daf4a5829466 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageMapperTableViewImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageMapperTableView.java @@ -24,12 +24,12 @@ import org.apache.pulsar.client.api.Schema; @Slf4j -public class MessageMapperTableViewImpl extends AbstractTableViewImpl { +public class MessageMapperTableView extends AbstractTableView { private final Function, V> mapper; - MessageMapperTableViewImpl(PulsarClientImpl client, Schema schema, TableViewConfigurationData conf, - Function, V> mapper) { + MessageMapperTableView(PulsarClientImpl client, Schema schema, TableViewConfigurationData conf, + Function, V> mapper) { super(client, schema, conf); this.mapper = mapper; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageTableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageTableView.java similarity index 89% rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageTableViewImpl.java rename to pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageTableView.java index d5893ce1890bf..b801ce95c8545 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageTableViewImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageTableView.java @@ -23,8 +23,8 @@ import org.apache.pulsar.client.api.Schema; @Slf4j -public class MessageTableViewImpl extends AbstractTableViewImpl> { - MessageTableViewImpl(PulsarClientImpl client, Schema schema, TableViewConfigurationData conf) { +public class MessageTableView extends AbstractTableView> { + MessageTableView(PulsarClientImpl client, Schema schema, TableViewConfigurationData conf) { super(client, schema, conf); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableView.java similarity index 85% rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java rename to pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableView.java index 2a300b09608c7..8d6c747026752 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableView.java @@ -20,9 +20,9 @@ import org.apache.pulsar.client.api.Schema; -public class TableViewImpl extends AbstractTableViewImpl { +public class TableView extends AbstractTableView { - TableViewImpl(PulsarClientImpl client, Schema schema, TableViewConfigurationData conf) { + TableView(PulsarClientImpl client, Schema schema, TableViewConfigurationData conf) { super(client, schema, conf); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java index 2f6ae3d973214..4208db8eca1b3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java @@ -48,7 +48,7 @@ public TableViewBuilder loadConf(Map config) { } @Override - public TableView create() throws PulsarClientException { + public org.apache.pulsar.client.api.TableView create() throws PulsarClientException { try { return createAsync().get(); } catch (Exception e) { @@ -57,12 +57,12 @@ public TableView create() throws PulsarClientException { } @Override - public CompletableFuture> createAsync() { - return new TableViewImpl<>(client, schema, conf).start(); + public CompletableFuture> createAsync() { + return new TableView<>(client, schema, conf).start(); } @Override - public TableView> createForMessages() throws PulsarClientException { + public org.apache.pulsar.client.api.TableView> createForMessages() throws PulsarClientException { try { return createForMessagesAsync().get(); } catch (Exception e) { @@ -71,12 +71,12 @@ public TableView> createForMessages() throws PulsarClientException { } @Override - public CompletableFuture>> createForMessagesAsync() { - return new MessageTableViewImpl<>(client, schema, conf).start(); + public CompletableFuture>> createForMessagesAsync() { + return new MessageTableView<>(client, schema, conf).start(); } @Override - public TableView createMapped(Function, V> mapper) throws PulsarClientException { + public org.apache.pulsar.client.api.TableView createMapped(Function, V> mapper) throws PulsarClientException { try { return createMappedAsync(mapper).get(); } catch (Exception e) { @@ -85,8 +85,8 @@ public TableView createMapped(Function, V> mapper) throws Puls } @Override - public CompletableFuture> createMappedAsync(Function, V> mapper) { - return new MessageMapperTableViewImpl(client, schema, conf, mapper).start(); + public CompletableFuture> createMappedAsync(Function, V> mapper) { + return new MessageMapperTableView(client, schema, conf, mapper).start(); } @Override diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewImplTest.java index c1ab9ae6b62ee..477ac2debd499 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewImplTest.java @@ -24,7 +24,6 @@ import static org.testng.Assert.assertNotNull; import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.TableView; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -48,7 +47,7 @@ public void setup() { @Test public void testTableViewImpl() { data.setCryptoKeyReader(mock(CryptoKeyReader.class)); - TableView tableView = new TableViewImpl(client, Schema.BYTES, data); + org.apache.pulsar.client.api.TableView tableView = new TableView(client, Schema.BYTES, data); assertNotNull(tableView); } From 9e44fd723804a823b4a0efc5962c3bd35982cb76 Mon Sep 17 00:00:00 2001 From: namest504 Date: Mon, 20 Oct 2025 14:01:58 +0900 Subject: [PATCH 7/7] [improve][api] Remove createForMessages method for using createMapped method Signed-off-by: namest504 --- .../pulsar/client/impl/TableViewTest.java | 33 ------------------- .../pulsar/client/api/TableViewBuilder.java | 21 ------------ .../client/impl/TableViewBuilderImpl.java | 14 -------- 3 files changed, 68 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java index fb7c846e1619b..bd9d6a2895d0d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java @@ -624,39 +624,6 @@ public void testBuildTableViewWithMessagesAlwaysAvailable() throws Exception { assertTrue(index.get() <= 0); } - @Test - public void testCreateForMessages() throws Exception { - String topic = "persistent://public/default/testCreateForMessages"; - admin.topics().createNonPartitionedTopic(topic); - - @Cleanup - Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); - - String testKey = "key1"; - String testValue = "value1"; - producer.newMessage() - .key(testKey) - .value(testValue) - .property("myProp", "myValue") - .send(); - - @Cleanup - org.apache.pulsar.client.api.TableView> tableView = pulsarClient.newTableViewBuilder(Schema.STRING) - .topic(topic) - .createForMessages(); - - Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> tableView.size() == 1); - - Message message = tableView.get(testKey); - Assert.assertNotNull(message, "Message should not be null for key: " + testKey); - assertEquals(message.getKey(), testKey); - assertEquals(message.getValue(), testValue); - assertEquals(message.getProperty("myProp"), "myValue"); - - Message missingMessage = tableView.get("missingKey"); - Assert.assertNull(missingMessage, "Message should be null for missing key"); - } - @Test public void testCreateMapped() throws Exception { String topic = "persistent://public/default/testCreateMapped"; diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java index 64244a241d6cb..e8c138496e1d2 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java @@ -78,27 +78,6 @@ public interface TableViewBuilder { */ CompletableFuture> createAsync(); - /** - * Finalize the creation of the {@link TableView} instance that contains the full {@link Message}. - * - *

This method will block until the tableView is created successfully or an exception is thrown. - * - * @return the {@link TableView} instance - * @throws PulsarClientException - * if the tableView creation fails - */ - TableView> createForMessages() throws PulsarClientException; - - /** - * Finalize the creation of the {@link TableView} instance that contains the full {@link Message} - * in asynchronous mode. - * - *

This method will return a {@link CompletableFuture} that can be used to access the instance when it's ready. - * - * @return the {@link TableView} instance - */ - CompletableFuture>> createForMessagesAsync(); - /** * Creates a {@link TableView} instance where the values are the result of applying a user-defined * `mapper` function to each message. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java index 4208db8eca1b3..207ed354b3b69 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java @@ -61,20 +61,6 @@ public CompletableFuture> createAsync( return new TableView<>(client, schema, conf).start(); } - @Override - public org.apache.pulsar.client.api.TableView> createForMessages() throws PulsarClientException { - try { - return createForMessagesAsync().get(); - } catch (Exception e) { - throw PulsarClientException.unwrap(e); - } - } - - @Override - public CompletableFuture>> createForMessagesAsync() { - return new MessageTableView<>(client, schema, conf).start(); - } - @Override public org.apache.pulsar.client.api.TableView createMapped(Function, V> mapper) throws PulsarClientException { try {