diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java index 58622d7db19a2..bd9d6a2895d0d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java @@ -53,7 +53,6 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.TableView; import org.apache.pulsar.client.api.TopicMessageId; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; @@ -67,7 +66,7 @@ import org.testng.annotations.Test; /** - * Unit test for {@link org.apache.pulsar.client.impl.TableViewImpl}. + * Unit test for {@link TableView}. */ @Slf4j @Test(groups = "broker-impl") @@ -169,7 +168,7 @@ public void testRefreshAPI(int partition) throws Exception { } @Cleanup - TableView tv = pulsarClient.newTableView(Schema.BYTES) + org.apache.pulsar.client.api.TableView tv = pulsarClient.newTableView(Schema.BYTES) .topic(topic) .create(); // Verify refresh can handle the case when the topic is empty @@ -225,11 +224,11 @@ public void testRefreshTaskCanBeCompletedWhenReaderClosed() throws Exception { String topic2 = "persistent://public/default/testRefreshTaskCanBeCompletedWhenReaderClosed-2"; admin.topics().createNonPartitionedTopic(topic2); @Cleanup - TableView tv1 = pulsarClient.newTableView(Schema.BYTES) + org.apache.pulsar.client.api.TableView tv1 = pulsarClient.newTableView(Schema.BYTES) .topic(topic1) .create(); @Cleanup - TableView tv2 = pulsarClient.newTableView(Schema.BYTES) + org.apache.pulsar.client.api.TableView tv2 = pulsarClient.newTableView(Schema.BYTES) .topic(topic1) .create(); // 2. Slow down the rate of reading messages. @@ -269,7 +268,7 @@ public void testTableView() throws Exception { int count = 20; Set keys = this.publishMessages(topic, count, false); @Cleanup - TableView tv = pulsarClient.newTableViewBuilder(Schema.BYTES) + org.apache.pulsar.client.api.TableView tv = pulsarClient.newTableViewBuilder(Schema.BYTES) .topic(topic) .autoUpdatePartitionsInterval(60, TimeUnit.SECONDS) .create(); @@ -316,7 +315,7 @@ public void testNewTableView() throws Exception { admin.topics().createPartitionedTopic(topic, 2); Set keys = this.publishMessages(topic, 10, false); @Cleanup - TableView tv = pulsarClient.newTableView() + org.apache.pulsar.client.api.TableView tv = pulsarClient.newTableView() .topic(topic) .autoUpdatePartitionsInterval(60, TimeUnit.SECONDS) .create(); @@ -336,7 +335,7 @@ public void testTableViewUpdatePartitions(String topicDomain) throws Exception { // For non-persistent topic, this keys will never be received. Set keys = this.publishMessages(topic, count, false); @Cleanup - TableView tv = pulsarClient.newTableViewBuilder(Schema.BYTES) + org.apache.pulsar.client.api.TableView tv = pulsarClient.newTableViewBuilder(Schema.BYTES) .topic(topic) .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS) .create(); @@ -374,7 +373,7 @@ public void testPublishNullValue(String topicDomain) throws Exception { String topic = topicDomain + "://public/default/tableview-test-publish-null-value"; admin.topics().createPartitionedTopic(topic, 3); - final TableView tv = pulsarClient.newTableViewBuilder(Schema.STRING) + final org.apache.pulsar.client.api.TableView tv = pulsarClient.newTableViewBuilder(Schema.STRING) .topic(topic) .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS) .create(); @@ -397,7 +396,7 @@ public void testPublishNullValue(String topicDomain) throws Exception { tv.close(); @Cleanup - TableView tv1 = pulsarClient.newTableView(Schema.STRING) + org.apache.pulsar.client.api.TableView tv1 = pulsarClient.newTableView(Schema.STRING) .topic(topic) .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS) .create(); @@ -427,7 +426,7 @@ public void testAck(boolean partitionedTopic) throws Exception { } @Cleanup - TableView tv1 = pulsarClient.newTableViewBuilder(Schema.STRING) + org.apache.pulsar.client.api.TableView tv1 = pulsarClient.newTableViewBuilder(Schema.STRING) .topic(topic) .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS) .create(); @@ -476,7 +475,7 @@ public void testListen() throws Exception { } @Cleanup - TableView tv = pulsarClient.newTableViewBuilder(Schema.STRING) + org.apache.pulsar.client.api.TableView tv = pulsarClient.newTableViewBuilder(Schema.STRING) .topic(topic) .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS) .create(); @@ -521,7 +520,7 @@ public void testTableViewWithEncryptedMessages() throws Exception { // TableView can read them using the private key @Cleanup - TableView tv = pulsarClient.newTableViewBuilder(Schema.BYTES) + org.apache.pulsar.client.api.TableView tv = pulsarClient.newTableViewBuilder(Schema.BYTES) .topic(topic) .autoUpdatePartitionsInterval(60, TimeUnit.SECONDS) .defaultCryptoKeyReader("file:" + ECDSA_PRIVATE_KEY) @@ -540,7 +539,7 @@ public void testTableViewTailMessageReadRetry() throws Exception { String topic = "persistent://public/default/tableview-is-interrupted-test"; admin.topics().createNonPartitionedTopic(topic); @Cleanup - TableView tv = pulsarClient.newTableView(Schema.BYTES) + org.apache.pulsar.client.api.TableView tv = pulsarClient.newTableView(Schema.BYTES) .topic(topic) .autoUpdatePartitionsInterval(60, TimeUnit.SECONDS) .create(); @@ -609,12 +608,12 @@ public void testBuildTableViewWithMessagesAlwaysAvailable() throws Exception { return CompletableFuture.completedFuture(message); }); @Cleanup - TableViewImpl tableView = (TableViewImpl) pulsarClient.newTableView() + TableView tableView = (TableView) pulsarClient.newTableView() .topic(topic) .createAsync() .get(); - TableViewImpl mockTableView = spy(tableView); - Method readAllExistingMessagesMethod = TableViewImpl.class + TableView mockTableView = spy(tableView); + Method readAllExistingMessagesMethod = TableView.class .getDeclaredMethod("readAllExistingMessages", Reader.class); readAllExistingMessagesMethod.setAccessible(true); CompletableFuture> future = @@ -624,4 +623,91 @@ public void testBuildTableViewWithMessagesAlwaysAvailable() throws Exception { future.get(3, TimeUnit.SECONDS); assertTrue(index.get() <= 0); } + + @Test + public void testCreateMapped() throws Exception { + String topic = "persistent://public/default/testCreateMapped"; + admin.topics().createNonPartitionedTopic(topic); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); + + @Cleanup + org.apache.pulsar.client.api.TableView tableView = pulsarClient.newTableViewBuilder(Schema.STRING) + .topic(topic) + .createMapped(m -> { + if (m.getValue().equals("delete-me")) { + return null; + } + return m.getValue() + ":" + m.getProperty("myProp"); + }); + + // Send a message to be mapped + String testKey = "key1"; + String testValue = "value1"; + producer.newMessage() + .key(testKey) + .value(testValue) + .property("myProp", "myValue") + .send(); + + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> tableView.size() == 1); + + String mappedValue = tableView.get(testKey); + assertEquals(mappedValue, "value1:myValue"); + + // Send another message to update the value + producer.newMessage() + .key(testKey) + .value("value2") + .property("myProp", "myValue2") + .send(); + + Awaitility.await().atMost(5, TimeUnit.SECONDS) + .until(() -> "value2:myValue2".equals(tableView.get(testKey))); + assertEquals(tableView.size(), 1); + + // Send a message that maps to null (tombstone) + producer.newMessage() + .key(testKey) + .value("delete-me") + .send(); + + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> tableView.size() == 0); + Assert.assertNull(tableView.get(testKey), "Value should be null after tombstone message"); + } + + @Test + public void testCreateMappedWithIdentityMapper() throws Exception { + String topic = "persistent://public/default/testCreateMappedWithIdentityMapper"; + admin.topics().createNonPartitionedTopic(topic); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); + + String testKey = "key1"; + String testValue = "value1"; + producer.newMessage() + .key(testKey) + .value(testValue) + .property("myProp", "myValue") + .send(); + + @Cleanup + org.apache.pulsar.client.api.TableView> tableView = pulsarClient.newTableViewBuilder(Schema.STRING) + .topic(topic) + .createMapped(java.util.function.Function.identity()); + + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> tableView.size() == 1); + + Message message = tableView.get(testKey); + Assert.assertNotNull(message, "Message should not be null for key: " + testKey); + assertEquals(message.getKey(), testKey); + assertEquals(message.getValue(), testValue); + assertEquals(message.getProperty("myProp"), "myValue"); + + Message missingMessage = tableView.get("missingKey"); + Assert.assertNull(missingMessage, "Message should be null for missing key"); + } + } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java index 767b8e1103fa6..1ad8995f6af9f 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java @@ -59,6 +59,7 @@ public interface TableView extends Closeable { */ T get(String key); + /** * Returns a Set view of the mappings contained in this map. * diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java index 76b8ff4fbdac3..e8c138496e1d2 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java @@ -21,6 +21,8 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Function; + import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; @@ -76,6 +78,45 @@ public interface TableViewBuilder { */ CompletableFuture> createAsync(); + /** + * Creates a {@link TableView} instance where the values are the result of applying a user-defined + * `mapper` function to each message. + * + *

This provides a flexible way to create a key-value view over a topic, allowing users to extract data + * from the message payload, properties, and other metadata into a custom object {@code V}. + * + *

To get a view of the full {@link Message} objects, {@code java.util.function.Function.identity()} + * can be used as the mapper. + * + *

If the `mapper` function returns `null`, it is treated as a tombstone message, and the + * corresponding key will be removed from the `TableView`. + * + * @param mapper A function that takes a {@link Message} and returns a custom object of type {@code V}. + * @param The type of the value in the TableView. + * @return the {@link TableView} instance + * @throws PulsarClientException if the tableView creation fails + */ + TableView createMapped(Function, V> mapper) throws PulsarClientException; + + /** + * Creates a {@link TableView} instance in asynchronous mode where the values are the result of applying + * a user-defined `mapper` function to each message. + * + *

This provides a flexible way to create a key-value view over a topic, allowing users to extract data + * from the message payload, properties, and other metadata into a custom object {@code V}. + * + *

To get a view of the full {@link Message} objects, {@code java.util.function.Function.identity()} + * can be used as the mapper. + * + *

If the `mapper` function returns `null`, it is treated as a tombstone message, and the + * corresponding key will be removed from the `TableView`. + * + * @param mapper A function that takes a {@link Message} and returns a custom object of type {@code V}. + * @param The type of the value in the TableView. + * @return a future that can be used to access the {@link TableView} instance when it's ready + */ + CompletableFuture> createMappedAsync(Function, V> mapper); + /** * Set the topic name of the {@link TableView}. * diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractTableView.java similarity index 93% rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java rename to pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractTableView.java index 17b49828eeced..682199880df18 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractTableView.java @@ -17,7 +17,6 @@ * under the License. */ package org.apache.pulsar.client.impl; - import static org.apache.pulsar.common.topics.TopicCompactionStrategy.TABLE_VIEW_TAG; import java.util.ArrayList; import java.util.Collection; @@ -47,20 +46,15 @@ import org.apache.pulsar.common.topics.TopicCompactionStrategy; @Slf4j -public class TableViewImpl implements TableView { - +abstract class AbstractTableView implements TableView { private final TableViewConfigurationData conf; - - private final ConcurrentMap data; - private final Map immutableData; - + private final ConcurrentMap data; + private final Map immutableData; private final CompletableFuture> reader; - - private final List> listeners; + private final List> listeners; private final ReentrantLock listenersMutex; private final boolean isPersistentTopic; - private TopicCompactionStrategy compactionStrategy; - + private TopicCompactionStrategy compactionStrategy; /** * Store the refresh tasks. When read to the position recording in the right map, * then remove the position in the right map. If the right map is empty, complete the future in the left. @@ -68,7 +62,6 @@ public class TableViewImpl implements TableView { * It will only be completed exceptionally when no more messages can be read. */ private final ConcurrentHashMap, Map> pendingRefreshRequests; - /** * This map stored the read position of each partition. It is used for the following case: *

@@ -80,8 +73,7 @@ public class TableViewImpl implements TableView { *

*/ private final ConcurrentHashMap lastReadPositions; - - TableViewImpl(PulsarClientImpl client, Schema schema, TableViewConfigurationData conf) { + AbstractTableView(PulsarClientImpl client, Schema schema, TableViewConfigurationData conf) { this.conf = conf; this.isPersistentTopic = conf.getTopicName().startsWith(TopicDomain.persistent.toString()); this.data = new ConcurrentHashMap<>(); @@ -102,18 +94,14 @@ public class TableViewImpl implements TableView { if (isPersistentTopic) { readerBuilder.readCompacted(true); } - CryptoKeyReader cryptoKeyReader = conf.getCryptoKeyReader(); if (cryptoKeyReader != null) { readerBuilder.cryptoKeyReader(cryptoKeyReader); } - readerBuilder.cryptoFailureAction(conf.getCryptoFailureAction()); - this.reader = readerBuilder.createAsync(); } - - CompletableFuture> start() { + CompletableFuture> start() { return reader.thenCompose((reader) -> { if (!isPersistentTopic) { readTailMessages(reader); @@ -123,49 +111,40 @@ CompletableFuture> start() { .thenRun(() -> readTailMessages(reader)); }).thenApply(__ -> this); } - @Override public int size() { return data.size(); } - @Override public boolean isEmpty() { return data.isEmpty(); } - @Override public boolean containsKey(String key) { return data.containsKey(key); } - @Override - public T get(String key) { + public V get(String key) { return data.get(key); } - @Override - public Set> entrySet() { + public Set> entrySet() { return immutableData.entrySet(); } - @Override public Set keySet() { return immutableData.keySet(); } - @Override - public Collection values() { + public Collection values() { return immutableData.values(); } - @Override - public void forEach(BiConsumer action) { + public void forEach(BiConsumer action) { data.forEach(action); } - @Override - public void listen(BiConsumer action) { + public void listen(BiConsumer action) { try { listenersMutex.lock(); listeners.add(action); @@ -173,27 +152,22 @@ public void listen(BiConsumer action) { listenersMutex.unlock(); } } - @Override - public void forEachAndListen(BiConsumer action) { + public void forEachAndListen(BiConsumer action) { // Ensure we iterate over all the existing entry _and_ start the listening from the exact next message try { listenersMutex.lock(); - // Execute the action over existing entries forEach(action); - listeners.add(action); } finally { listenersMutex.unlock(); } } - @Override public CompletableFuture closeAsync() { return reader.thenCompose(Reader::closeAsync); } - @Override public void close() throws PulsarClientException { try { @@ -202,23 +176,21 @@ public void close() throws PulsarClientException { throw PulsarClientException.unwrap(e); } } - private void handleMessage(Message msg) { lastReadPositions.put(msg.getTopicName(), msg.getMessageId()); try { if (msg.hasKey()) { String key = msg.getKey(); - T cur = msg.size() > 0 ? msg.getValue() : null; + V cur = getValueIfPresent(msg); if (log.isDebugEnabled()) { log.debug("Applying message from topic {}. key={} value={}", conf.getTopicName(), key, cur); } - boolean update = true; if (compactionStrategy != null) { - T prev = data.get(key); + V prev = data.get(key); update = !compactionStrategy.shouldKeepLeft(prev, cur); if (!update) { log.info("Skipped the message from topic {}. key={} value={} prev={}", @@ -229,7 +201,6 @@ private void handleMessage(Message msg) { compactionStrategy.handleSkippedMessage(key, cur); } } - if (update) { try { listenersMutex.lock(); @@ -238,8 +209,7 @@ private void handleMessage(Message msg) { } else { data.put(key, cur); } - - for (BiConsumer listener : listeners) { + for (BiConsumer listener : listeners) { try { listener.accept(key, cur); } catch (Throwable t) { @@ -253,10 +223,16 @@ private void handleMessage(Message msg) { } checkAllFreshTask(msg); } finally { - msg.release(); + maybeReleaseMessage(msg); } } - + protected void maybeReleaseMessage(Message msg) { + msg.release(); + } + @SuppressWarnings("unchecked") + protected V getValueIfPresent(Message msg) { + return msg.size() > 0 ? (V) msg.getValue() : null; + } @Override public CompletableFuture refreshAsync() { CompletableFuture completableFuture = new CompletableFuture<>(); @@ -281,7 +257,6 @@ public CompletableFuture refreshAsync() { }); return completableFuture; } - @Override public void refresh() throws PulsarClientException { try { @@ -290,11 +265,9 @@ public void refresh() throws PulsarClientException { throw PulsarClientException.unwrap(e); } } - private CompletableFuture readAllExistingMessages(Reader reader) { long startTime = System.nanoTime(); AtomicLong messagesRead = new AtomicLong(); - CompletableFuture future = new CompletableFuture<>(); getLastMessageIdOfNonEmptyTopics(reader).thenAccept(lastMessageIds -> { if (lastMessageIds.isEmpty()) { @@ -308,7 +281,6 @@ private CompletableFuture readAllExistingMessages(Reader reader) { }); return future; } - private CompletableFuture> getLastMessageIdOfNonEmptyTopics(Reader reader) { return reader.getLastMessageIdsAsync().thenApply(lastMessageIds -> { Map lastMessageIdMap = new ConcurrentHashMap<>(); @@ -320,7 +292,6 @@ private CompletableFuture> getLastMessageIdOfNonEmpt return lastMessageIdMap; }); } - private void filterReceivedMessages(Map lastMessageIds) { // The `lastMessageIds` and `readPositions` is concurrency-safe data types. lastMessageIds.forEach((partition, lastMessageId) -> { @@ -330,7 +301,6 @@ private void filterReceivedMessages(Map lastMessageIds) } }); } - private boolean checkFreshTask(Map maxMessageIds, CompletableFuture future, MessageId messageId, String topicName) { // The message received from multi-consumer/multi-reader is processed to TopicMessageImpl. @@ -347,7 +317,6 @@ private boolean checkFreshTask(Map maxMessageIds, Comple return false; } } - private void checkAllFreshTask(Message msg) { pendingRefreshRequests.forEach((future, maxMessageIds) -> { String topicName = msg.getTopicName(); @@ -357,7 +326,6 @@ private void checkAllFreshTask(Message msg) { } }); } - private void readAllExistingMessages(Reader reader, CompletableFuture future, long startTime, AtomicLong messagesRead, Map maxMessageIds) { reader.hasMessageAvailableAsync() @@ -396,7 +364,6 @@ private void readAllExistingMessages(Reader reader, CompletableFuture f } }); } - private void readTailMessages(Reader reader) { reader.readNextAsync() .thenAccept(msg -> { @@ -424,4 +391,4 @@ private void readTailMessages(Reader reader) { return null; }); } -} +} \ No newline at end of file diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageMapperTableView.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageMapperTableView.java new file mode 100644 index 0000000000000..4daf4a5829466 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageMapperTableView.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import java.util.function.Function; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; + +@Slf4j +public class MessageMapperTableView extends AbstractTableView { + + private final Function, V> mapper; + + MessageMapperTableView(PulsarClientImpl client, Schema schema, TableViewConfigurationData conf, + Function, V> mapper) { + super(client, schema, conf); + this.mapper = mapper; + } + + @Override + protected void maybeReleaseMessage(Message msg) { + // The message is passed to the user-defined mapper function. + // The user is responsible for releasing the message if needed. + // To be safe, we don't release the message here. + } + + @Override + protected V getValueIfPresent(Message msg) { + return msg.size() > 0 ? mapper.apply(msg) : null; + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageTableView.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageTableView.java new file mode 100644 index 0000000000000..b801ce95c8545 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageTableView.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; + +@Slf4j +public class MessageTableView extends AbstractTableView> { + MessageTableView(PulsarClientImpl client, Schema schema, TableViewConfigurationData conf) { + super(client, schema, conf); + } + + @Override + protected void maybeReleaseMessage(Message msg) { + // don't release the message. Pooling of messages might have to be disabled in the client when using + // MessageTableViewImpl. + } + + @Override + protected Message getValueIfPresent(Message msg) { + // return the message as the value for the table view + // if the payload is empty, the message is considered a tombstone message and it won't be preserved in the map + return msg.size() > 0 ? msg : null; + } +} \ No newline at end of file diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableView.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableView.java new file mode 100644 index 0000000000000..8d6c747026752 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableView.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import org.apache.pulsar.client.api.Schema; + +public class TableView extends AbstractTableView { + + TableView(PulsarClientImpl client, Schema schema, TableViewConfigurationData conf) { + super(client, schema, conf); + } + +} \ No newline at end of file diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java index e0a47a70b1c8b..207ed354b3b69 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java @@ -22,14 +22,10 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import lombok.NonNull; import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; -import org.apache.pulsar.client.api.CryptoKeyReader; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.TableView; -import org.apache.pulsar.client.api.TableViewBuilder; +import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; public class TableViewBuilderImpl implements TableViewBuilder { @@ -52,7 +48,7 @@ public TableViewBuilder loadConf(Map config) { } @Override - public TableView create() throws PulsarClientException { + public org.apache.pulsar.client.api.TableView create() throws PulsarClientException { try { return createAsync().get(); } catch (Exception e) { @@ -61,8 +57,22 @@ public TableView create() throws PulsarClientException { } @Override - public CompletableFuture> createAsync() { - return new TableViewImpl<>(client, schema, conf).start(); + public CompletableFuture> createAsync() { + return new TableView<>(client, schema, conf).start(); + } + + @Override + public org.apache.pulsar.client.api.TableView createMapped(Function, V> mapper) throws PulsarClientException { + try { + return createMappedAsync(mapper).get(); + } catch (Exception e) { + throw PulsarClientException.unwrap(e); + } + } + + @Override + public CompletableFuture> createMappedAsync(Function, V> mapper) { + return new MessageMapperTableView(client, schema, conf, mapper).start(); } @Override @@ -109,4 +119,4 @@ public TableViewBuilder cryptoFailureAction(ConsumerCryptoFailureAction actio conf.setCryptoFailureAction(action); return this; } -} +} \ No newline at end of file diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewImplTest.java index c1ab9ae6b62ee..477ac2debd499 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TableViewImplTest.java @@ -24,7 +24,6 @@ import static org.testng.Assert.assertNotNull; import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.TableView; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -48,7 +47,7 @@ public void setup() { @Test public void testTableViewImpl() { data.setCryptoKeyReader(mock(CryptoKeyReader.class)); - TableView tableView = new TableViewImpl(client, Schema.BYTES, data); + org.apache.pulsar.client.api.TableView tableView = new TableView(client, Schema.BYTES, data); assertNotNull(tableView); }