Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
* Enumeration showing if a topic is persistent.
*/
public enum TopicDomain {
persistent("persistent"), non_persistent("non-persistent");
persistent("persistent"), non_persistent("non-persistent"),
topic("topic"), segment("segment");

private String value;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,12 @@ public <T> CompletableFuture<Producer<T>> createProducerAsync(ProducerConfigurat
return FutureUtil.failedFuture(
new PulsarClientException.InvalidTopicNameException("Invalid topic name: '" + topic + "'"));
}
if (isScalableDomain(topic)) {
return FutureUtil.failedFuture(
new PulsarClientException.InvalidTopicNameException(
"Scalable topic domains (topic://, segment://) require the V5 client SDK."
+ " Topic: '" + topic + "'"));
}

if (schema instanceof AutoProduceBytesSchema) {
AutoProduceBytesSchema autoProduceBytesSchema = (AutoProduceBytesSchema) schema;
Expand Down Expand Up @@ -577,6 +583,12 @@ public <T> CompletableFuture<Consumer<T>> subscribeAsync(ConsumerConfigurationDa
return FutureUtil.failedFuture(
new PulsarClientException.InvalidTopicNameException("Invalid topic name: '" + topic + "'"));
}
if (isScalableDomain(topic)) {
return FutureUtil.failedFuture(
new PulsarClientException.InvalidTopicNameException(
"Scalable topic domains (topic://, segment://) require the V5 client SDK."
+ " Topic: '" + topic + "'"));
}
}

if (isBlank(conf.getSubscriptionName())) {
Expand Down Expand Up @@ -742,6 +754,12 @@ public <T> CompletableFuture<Reader<T>> createReaderAsync(ReaderConfigurationDat
return FutureUtil.failedFuture(new PulsarClientException
.InvalidTopicNameException("Invalid topic name: '" + topic + "'"));
}
if (isScalableDomain(topic)) {
return FutureUtil.failedFuture(
new PulsarClientException.InvalidTopicNameException(
"Scalable topic domains (topic://, segment://) require the V5 client SDK."
+ " Topic: '" + topic + "'"));
}
}

if (conf.getStartMessageId() == null) {
Expand Down Expand Up @@ -1431,4 +1449,9 @@ public TransactionBuilder newTransaction() {
NameResolver<InetAddress> getNameResolver() {
return DnsResolverUtil.adaptToNameResolver(addressResolver);
}

private static boolean isScalableDomain(String topic) {
TopicName topicName = TopicName.get(topic);
return topicName.isScalable() || topicName.isSegment();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.scalable.HashRange;
import org.apache.pulsar.common.util.Codec;

/**
Expand All @@ -49,6 +51,10 @@ public class TopicName implements ServiceUnitId {

private final int partitionIndex;

// segment:// fields (null for non-segment domains)
private final HashRange segmentRange;
private final long segmentId;

private static final ConcurrentHashMap<String, TopicName> cache = new ConcurrentHashMap<>();

public static void clearIfReachedMaxCapacity(int maxCapacity) {
Expand Down Expand Up @@ -132,8 +138,12 @@ private TopicName(String completeTopicName) {

String rest = parts.get(1);

// Expected format: tenant/namespace/<localName>
parts = Splitter.on("/").limit(4).splitToList(rest);
// Scalable topic domains (topic://, segment://) only support the new format
// and local names may contain '/', so use limit(3) to keep the rest as localName.
boolean isScalableDomain = this.domain == TopicDomain.topic
|| this.domain == TopicDomain.segment;
int splitLimit = isScalableDomain ? 3 : 4;
parts = Splitter.on("/").limit(splitLimit).splitToList(rest);
if (parts.size() == 4) {
throw new IllegalArgumentException(
"V1 topic names (with cluster component) are no longer supported. "
Expand All @@ -142,7 +152,35 @@ private TopicName(String completeTopicName) {
} else if (parts.size() == 3) {
this.tenant = parts.get(0);
this.namespacePortion = parts.get(1);
this.localName = parts.get(2);
String rawLocalName = parts.get(2);

// For segment:// domains, split local name into parent topic name + descriptor
if (this.domain == TopicDomain.segment) {
int lastSlash = rawLocalName.lastIndexOf('/');
if (lastSlash <= 0) {
throw new IllegalArgumentException(
"Invalid segment topic name: local name must contain"
+ " '<parent-topic>/<hashStart>-<hashEnd>-<segmentId>'. Got: "
+ completeTopicName);
}
this.localName = rawLocalName.substring(0, lastSlash);
String descriptor = rawLocalName.substring(lastSlash + 1);
String[] descParts = descriptor.split("-");
if (descParts.length != 3) {
throw new IllegalArgumentException(
"Invalid segment descriptor: expected '<hexStart>-<hexEnd>-<segmentId>',"
+ " got: '" + descriptor + "'");
}
this.segmentRange = HashRange.of(
Integer.parseInt(descParts[0], 16),
Integer.parseInt(descParts[1], 16));
this.segmentId = Long.parseLong(descParts[2]);
} else {
this.localName = rawLocalName;
this.segmentRange = null;
this.segmentId = -1;
}

this.partitionIndex = getPartitionIndex(completeTopicName);
this.namespaceName = NamespaceName.get(tenant, namespacePortion);
} else {
Expand All @@ -157,12 +195,56 @@ private TopicName(String completeTopicName) {
} catch (NullPointerException e) {
throw new IllegalArgumentException("Invalid topic name: " + completeTopicName, e);
}
this.completeTopicName = String.format("%s://%s/%s/%s",
domain, tenant, namespacePortion, localName);
if (this.domain == TopicDomain.segment) {
this.completeTopicName = String.format("%s://%s/%s/%s/%s",
domain, tenant, namespacePortion, localName,
String.format("%04x-%04x-%d", segmentRange.start(), segmentRange.end(), segmentId));
} else {
this.completeTopicName = String.format("%s://%s/%s/%s",
domain, tenant, namespacePortion, localName);
}
}

public boolean isPersistent() {
return TopicDomain.persistent == domain;
return TopicDomain.persistent == domain || TopicDomain.topic == domain || TopicDomain.segment == domain;
}

public boolean isScalable() {
return TopicDomain.topic == domain;
}

public boolean isSegment() {
return TopicDomain.segment == domain;
}

/**
* Get the segment hash range for segment:// topics.
*
* @return the hash range, or empty for non-segment domains
*/
public Optional<HashRange> getSegmentRange() {
return Optional.ofNullable(segmentRange);
}

/**
* Get the segment ID for segment:// topics.
*
* @return the segment ID, or -1 for non-segment domains
*/
public long getSegmentId() {
return segmentId;
}

/**
* Get the segment descriptor string (e.g. "0000-7fff-1") for segment:// topics.
*
* @return the descriptor, or null for non-segment domains
*/
public String getSegmentDescriptor() {
if (segmentRange == null) {
return null;
}
return String.format("%04x-%04x-%d", segmentRange.start(), segmentRange.end(), segmentId);
}

/**
Expand Down Expand Up @@ -299,6 +381,14 @@ public String getRestPath(boolean includeDomain) {
public String getPersistenceNamingEncoding() {
// The convention is: domain://tenant/namespace/topic
// We want to persist in the order: tenant/namespace/domain/topic
// For segment topics, include the segment descriptor: tenant/namespace/segment/topic/descriptor
if (domain == TopicDomain.segment && segmentRange != null) {
return String.format("%s/%s/%s/%s/%04x-%04x-%d", tenant, namespacePortion, domain,
getEncodedLocalName(), segmentRange.start(), segmentRange.end(), segmentId);
}
if (domain == TopicDomain.topic) {
return String.format("%s/%s/%s", tenant, namespacePortion, getEncodedLocalName());
}
return String.format("%s/%s/%s/%s", tenant, namespacePortion, domain, getEncodedLocalName());
}

Expand All @@ -325,6 +415,13 @@ public static String fromPersistenceNamingEncoding(String mlName) {
localName = Codec.decode(parts.get(3));
return String.format("%s://%s/%s/%s", domain, tenant, namespacePortion, localName);
} else if (parts.size() == 5) {
if ("segment".equals(parts.get(2))) {
// Segment topic ML name: tenant/namespace/segment/topic/descriptor
tenant = parts.get(0);
namespacePortion = parts.get(1);
localName = Codec.decode(parts.get(3));
return String.format("segment://%s/%s/%s/%s", tenant, namespacePortion, localName, parts.get(4));
}
// Legacy V1 managed ledger name: tenant/cluster/namespace/domain/topic
// Convert to V2 format, dropping the cluster component
tenant = parts.get(0);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.scalable;

/**
* Represents an inclusive hash range [start, end] within a 16-bit hash space (0x0000-0xFFFF).
*/
public record HashRange(int start, int end) implements Comparable<HashRange> {

public static final int MIN_HASH = 0x0000;
public static final int MAX_HASH = 0xFFFF;

public HashRange {
if (start < MIN_HASH || start > MAX_HASH) {
throw new IllegalArgumentException("start must be in [0x0000, 0xFFFF], got: " + start);
}
if (end < MIN_HASH || end > MAX_HASH) {
throw new IllegalArgumentException("end must be in [0x0000, 0xFFFF], got: " + end);
}
if (end < start) {
throw new IllegalArgumentException("end must be >= start, got: [" + start + ", " + end + "]");
}
}

public static HashRange of(int start, int end) {
return new HashRange(start, end);
}

public static HashRange full() {
return new HashRange(MIN_HASH, MAX_HASH);
}

public boolean contains(int hash) {
return hash >= start && hash <= end;
}

public boolean contains(HashRange other) {
return start <= other.start && end >= other.end;
}

public boolean isAdjacentTo(HashRange other) {
return this.end + 1 == other.start || other.end + 1 == this.start;
}

public int size() {
return end - start + 1;
}

/**
* Split this range at the midpoint into two sub-ranges.
*
* @return array of two HashRange objects: [start, mid] and [mid+1, end]
* @throws IllegalStateException if the range cannot be split (size < 2)
*/
public HashRange[] split() {
if (size() < 2) {
throw new IllegalStateException("Cannot split range of size " + size());
}
int mid = start + (end - start) / 2;
return new HashRange[]{
new HashRange(start, mid),
new HashRange(mid + 1, end)
};
}

/**
* Merge this range with an adjacent range.
*
* @param other the adjacent range to merge with
* @return the merged range
* @throws IllegalArgumentException if ranges are not adjacent
*/
public HashRange merge(HashRange other) {
if (this.end + 1 == other.start) {
return new HashRange(this.start, other.end);
} else if (other.end + 1 == this.start) {
return new HashRange(other.start, this.end);
}
throw new IllegalArgumentException(
"Ranges are not adjacent: " + this + " and " + other);
}

@Override
public int compareTo(HashRange o) {
int result = Integer.compare(start, o.start);
if (result == 0) {
result = Integer.compare(end, o.end);
}
return result;
}

public String toHexString() {
return String.format("%04x-%04x", start, end);
}

public static HashRange fromHexString(String hex) {
String[] parts = hex.split("-", 2);
if (parts.length != 2) {
throw new IllegalArgumentException("Invalid hash range format: " + hex);
}
return new HashRange(Integer.parseInt(parts[0], 16), Integer.parseInt(parts[1], 16));
}

@Override
public String toString() {
return "[" + String.format("%04x", start) + ", " + String.format("%04x", end) + "]";
}
}
Loading
Loading