Commit 75bb1f9

pip-451: Support Label-based Topic Subscription
1 parent ab65faa commit 75bb1f9

pip/pip-451.md

Lines changed: 255 additions & 0 deletions
@@ -0,0 +1,255 @@
# PIP-451: Support Label-based Topic Subscription

# Motivation

Currently, Apache Pulsar supports Pattern Subscription, which allows consumers to subscribe to multiple topics using regular expressions (regex). While powerful, regex-based subscription has several structural limitations in complex microservice architectures:

* Coupling: It couples the consumption logic with the topic naming convention. Changing business requirements often forces topic renaming, which is operationally expensive and risky.

* Flexibility: It is difficult to group semantically related but differently named topics (e.g., persistent://public/default/payment-core and persistent://public/legacy/billing-v1) into a single subscription without an intricate regex.

* Complexity: Complex regexes are hard to maintain and error-prone.

Label-based Subscription solves these issues by decoupling "Identity" (Topic Name) from "Attributes" (Labels). Users can attach Key-Value metadata (e.g., env=prod, dept=finance) to topics and subscribe by specifying a label selector.

# Goals

## In Scope
* Management: Allow attaching, updating, and removing Key-Value labels to/from Topics via the Admin API.
* Subscription: Allow Consumers to subscribe to topics matching specific Labels within specified Namespaces. Support cross-namespace subscription via an explicit namespace list in the Client API, avoiding the complexity of background metadata polling.

# High-Level Design
The design introduces a metadata-driven approach where labels are stored in TopicPolicies.
The Broker maintains an in-memory index to map labels to topics. The client utilizes a "Watch" mechanism to receive real-time updates when topics matching the labels are created or updated.
## Key points
* Storage: Labels are stored as `Map<String, String>` inside TopicPolicies.
* Indexing: The Broker maintains an In-Memory Inverted Index (`LabelKey -> LabelValue -> Set<Topic>`) per Namespace. This hierarchical structure ensures efficient lookups for Key-Value pairs without iterating through all topics.
* Discovery Protocol: We extend the CommandWatchTopicList protocol (PIP-179) to accept a `label_selector`.
* Client Implementation: The Client accepts a list of target namespaces and manages multiple watchers (one per namespace) to aggregate matching topics.

# Detailed Design

## Design & Implementation Details

### Storage

#### Topic Labels in TopicPolicies

Add a `customLabels` field to the TopicPolicies class. Since Topic Policies are propagated via the `__change_events` system topic, this ensures durability and consistency across brokers.
```java
public class TopicPolicies {
    // New field: Key-Value labels
    private Map<String, String> customLabels;
}
```

#### In-Memory Inverted Index for Labels-Topic Mapping

The SystemTopicBasedTopicPoliciesService will maintain a nested map structure per Namespace to support efficient Key-Value lookups.

**Data Structure**:

```java
// Map<Namespace, Map<LabelKey, Map<LabelValue, Set<TopicName>>>>
Map<String, Map<String, Map<String, Set<String>>>> labelTopicInvertedIndex;
```

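As a rough illustration of how such a nested map could be maintained and queried, the sketch below models the index for a single namespace. The class name `LabelIndexSketch` and its methods are hypothetical and not part of the proposal. It also assumes that an empty selector matches nothing, a case the proposal does not specify.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of a per-namespace inverted index: labelKey -> labelValue -> topics.
class LabelIndexSketch {
    private final Map<String, Map<String, Set<String>>> index = new HashMap<>();

    // Register a topic under every (key, value) pair it carries.
    void add(String topic, Map<String, String> labels) {
        labels.forEach((k, v) -> index
                .computeIfAbsent(k, x -> new HashMap<>())
                .computeIfAbsent(v, x -> new HashSet<>())
                .add(topic));
    }

    // Remove a topic from every (key, value) bucket, pruning buckets that become empty.
    void remove(String topic, Map<String, String> labels) {
        labels.forEach((k, v) -> {
            Map<String, Set<String>> byValue = index.get(k);
            Set<String> topics = byValue == null ? null : byValue.get(v);
            if (topics == null) {
                return;
            }
            topics.remove(topic);
            if (topics.isEmpty()) {
                byValue.remove(v);
            }
            if (byValue.isEmpty()) {
                index.remove(k);
            }
        });
    }

    // AND semantics: intersect the topic sets of every selector pair.
    // Assumption: an empty selector yields an empty result.
    Set<String> lookup(Map<String, String> selector) {
        Set<String> result = null;
        for (Map.Entry<String, String> e : selector.entrySet()) {
            Set<String> topics = index.getOrDefault(e.getKey(), Map.of())
                    .getOrDefault(e.getValue(), Set.of());
            if (result == null) {
                result = new HashSet<>(topics);
            } else {
                result.retainAll(topics);
            }
            if (result.isEmpty()) {
                break;
            }
        }
        return result == null ? Set.of() : result;
    }
}
```

Each lookup touches only the buckets named in the selector, so its cost depends on the size of those buckets rather than on the total number of topics in the namespace.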
### Topic Labels Observation and Notification Mechanism

To support real-time label change notifications, we will introduce a listener mechanism in TopicPoliciesService.

#### TopicPoliciesService Interface Changes

```java
public interface TopicPoliciesService {
    // ... Existing methods ...

    /**
     * Register a label change listener with the service.
     */
    void registerTopicLabelsListener(NamespaceName namespace, TopicLabelsListener listener);

    /**
     * Unregister a label change listener from the service.
     */
    void unregisterTopicLabelsListener(NamespaceName namespace, TopicLabelsListener listener);

    /**
     * Query the list of all Topics under the specified Namespace that match the label selector.
     */
    CompletableFuture<List<TopicName>> getTopicsByLabels(NamespaceName namespace, Map<String, String> labels);

    /**
     * Query all existing labels within a Namespace.
     */
    CompletableFuture<Map<String, Set<String>>> getAllLabels(NamespaceName namespace);
}
```

#### Add TopicLabelsListener Interface

```java
public interface TopicLabelsListener {
    /**
     * Triggered when the labels of a topic change.
     * @param topicName The affected topic.
     * @param allLabels The latest snapshot of all labels, or null if the topic is deleted.
     */
    void onLabelsUpdate(TopicName topicName, Map<String, String> allLabels);
}
```

#### Listener Management in TopicPoliciesService

The TopicPoliciesService will maintain a list of TopicLabelsListener instances and notify them when a policy update includes label changes.

#### Notification Logic in Broker
* Change Detection: When SystemTopicBasedTopicPoliciesService consumes a policy event, it compares oldLabels from the cache with newLabels from the event, then updates the in-memory inverted index (labelTopicInvertedIndex).

* Global Dispatch: If `!Objects.equals(oldLabels, newLabels)`, the service invokes `onLabelsUpdate(topicName, labels)` for all registered listeners.

* Watcher Evaluation: The TopicListService (registered as a listener) iterates through its TopicListWatcher instances and forwards the update to each of them.

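The change-detection and dispatch decision described above could look roughly like the following. This is a minimal sketch under stated assumptions, not the broker implementation: `LabelChangeSketch`, `onPolicyEvent`, and `topicsFor` are hypothetical names, the `cache` map stands in for the topic policies cache, and a `null` label map models a deleted topic.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for the change-detection step; not the actual broker code.
class LabelChangeSketch {
    // labelKey -> labelValue -> topics, for a single namespace.
    final Map<String, Map<String, Set<String>>> index = new HashMap<>();
    // Last known labels per topic (stand-in for the policies cache).
    final Map<String, Map<String, String>> cache = new HashMap<>();

    // Applies a policy event and returns true when listeners should be notified.
    // newLabels == null models a deleted topic.
    boolean onPolicyEvent(String topic, Map<String, String> newLabels) {
        Map<String, String> oldLabels = cache.get(topic);
        if (Objects.equals(oldLabels, newLabels)) {
            return false; // labels unchanged: no index update, no dispatch
        }
        if (oldLabels != null) {
            // Drop the topic from the buckets of its previous labels.
            oldLabels.forEach((k, v) -> {
                Map<String, Set<String>> byValue = index.get(k);
                Set<String> topics = byValue == null ? null : byValue.get(v);
                if (topics == null) {
                    return;
                }
                topics.remove(topic);
                if (topics.isEmpty()) {
                    byValue.remove(v);
                }
                if (byValue.isEmpty()) {
                    index.remove(k);
                }
            });
        }
        if (newLabels == null) {
            cache.remove(topic);
        } else {
            // Index the topic under its new labels and remember the snapshot.
            newLabels.forEach((k, v) -> index
                    .computeIfAbsent(k, x -> new HashMap<>())
                    .computeIfAbsent(v, x -> new HashSet<>())
                    .add(topic));
            cache.put(topic, Map.copyOf(newLabels));
        }
        return true;
    }

    Set<String> topicsFor(String key, String value) {
        return index.getOrDefault(key, Map.of()).getOrDefault(value, Set.of());
    }
}
```

Returning a boolean keeps the dispatch decision separate from the index update, so the caller can invoke the registered listeners only when the labels actually changed.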
#### TopicListWatcher State Machine

Each TopicListWatcher maintains a set of currently matched topics to perform state-based updates:

```java
public void onLabelsUpdate(TopicName topicName, @Nullable Map<String, String> allLabels) {
    // matchedTopics stores topic names as strings, so compare using the string form.
    String topic = topicName.toString();
    // allLabels can be null if the topic is deleted
    boolean wasMatching = matchedTopics.contains(topic);
    boolean matchesNow = allLabels != null && labelMatcher.matches(allLabels);

    List<String> newTopics = Collections.emptyList();
    List<String> deletedTopics = Collections.emptyList();

    if (!wasMatching && matchesNow) {
        // It did not match before, but now it matches (matches after adding or modifying labels).
        newTopics = Collections.singletonList(topic);
        matchedTopics.add(topic);
    } else if (wasMatching && !matchesNow) {
        // It matched before, but now it no longer matches (the Topic was deleted or its labels were modified).
        deletedTopics = Collections.singletonList(topic);
        matchedTopics.remove(topic);
    } else {
        // The match state is unchanged, so there is nothing to send.
        return;
    }

    String hash = TopicList.calculateHash(matchedTopics);
    topicListService.sendTopicListUpdate(id, hash, deletedTopics, newTopics);
}
```

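The `labelMatcher` used above is not defined in this proposal. One possible shape, assuming the AND semantics described for `topicsByLabel`, is sketched below. `LabelMatcherSketch` is a hypothetical name, and treating an empty selector as matching nothing is an assumption.

```java
import java.util.Map;

// Hypothetical matcher: a topic matches when it carries every selector pair (AND semantics).
class LabelMatcherSketch {
    private final Map<String, String> selector;

    LabelMatcherSketch(Map<String, String> selector) {
        this.selector = Map.copyOf(selector);
    }

    // Extra labels on the topic are ignored; only the selector pairs are checked.
    // Assumption: an empty selector matches nothing.
    boolean matches(Map<String, String> topicLabels) {
        return !selector.isEmpty()
                && selector.entrySet().stream()
                        .allMatch(e -> e.getValue().equals(topicLabels.get(e.getKey())));
    }
}
```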
### Client Implementation Details
The Client implementation acts as an orchestrator to support the "Cross-Namespace" requirement defined in topicsByLabel.

* Multi-Watcher Orchestration:

  When `subscribe()` is called, the Client iterates over the provided namespaces.
  For each namespace, it initiates a CommandWatchTopicList request.

* Aggregation:

  The Client maintains a unified view of matching topics.
  - New Topic Event: If any of the namespace watchers receives a NEW_TOPIC update (meaning a topic in that namespace matched the labels), the Client adds it to the list and creates a child consumer.
  - Deleted Topic Event: If a watcher receives a DELETED_TOPIC update (topic deleted or label removed), the Client closes the corresponding child consumer.

* Deterministic Scope:

  Unlike Regex subscription, which might require scanning metadata for matching namespaces, this design relies on the user providing the `Set<String>` of namespaces.
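
The aggregation step can be pictured with the following sketch. It is illustrative only: `TopicAggregatorSketch` is a hypothetical class, and the `log` entries stand in for creating and closing child consumers, which the real client would do asynchronously.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical aggregator that merges updates from several per-namespace watchers.
class TopicAggregatorSketch {
    // Unified view of all currently matched topics across namespaces.
    private final Set<String> subscribed = ConcurrentHashMap.newKeySet();
    // Stand-in for child consumer lifecycle actions.
    final List<String> log = new ArrayList<>();

    // Called whenever any namespace watcher reports new or deleted topics.
    synchronized void onWatcherUpdate(List<String> newTopics, List<String> deletedTopics) {
        for (String topic : deletedTopics) {
            if (subscribed.remove(topic)) {
                log.add("close " + topic); // close the corresponding child consumer
            }
        }
        for (String topic : newTopics) {
            if (subscribed.add(topic)) {
                log.add("open " + topic); // create a child consumer once per topic
            }
        }
    }

    Set<String> topics() {
        return Set.copyOf(subscribed);
    }
}
```

Because fully qualified topic names already include the namespace, a single set is enough to merge updates from all watchers without collisions.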
### Public API & CLI Changes

#### Set Custom Labels:

* CLI: `pulsar-admin topics set-custom-labels <topic-name> --labels "key1=value1,key2=value2"`
* REST API: `POST /admin/v2/topics/{tenant}/{namespace}/{topic}/custom-labels` with a JSON payload `{"labels": {"key1":"value1", "key2":"value2"}}`
* Action: Sets or updates custom labels for the specified topic. The broker (or admin client before sending) will validate that all provided keys (e.g., key1, key2) are present in the allowedCustomLabelKeys list defined in broker.conf. Invalid keys will result in an error. This operation will update the topic's policy and publish a change event to the system topic (`__change_events`) for that namespace.

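The validation described in the Action above might be implemented along these lines. This is a sketch under assumptions: `LabelValidationSketch` and `parseAndValidate` are hypothetical names, and the allowed-key set and the maximum value length are passed in as plain parameters rather than read from broker.conf.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical parser and validator for the --labels "key1=value1,key2=value2" argument.
class LabelValidationSketch {
    static Map<String, String> parseAndValidate(String arg, Set<String> allowedKeys, int maxValueLength) {
        Map<String, String> labels = new LinkedHashMap<>();
        for (String pair : arg.split(",")) {
            // Split on the first '=' only, so values may themselves contain '='.
            String[] kv = pair.split("=", 2);
            if (kv.length != 2 || kv[0].isBlank()) {
                throw new IllegalArgumentException("Malformed label: " + pair);
            }
            String key = kv[0].trim();
            String value = kv[1].trim();
            if (!allowedKeys.contains(key)) {
                throw new IllegalArgumentException("Label key not allowed: " + key);
            }
            if (value.length() > maxValueLength) {
                throw new IllegalArgumentException("Label value too long for key: " + key);
            }
            labels.put(key, value);
        }
        return labels;
    }
}
```

Rejecting the whole request on the first invalid pair means a topic's labels are never partially applied.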
#### Get Custom Labels:

* CLI: `pulsar-admin topics get-custom-labels <topic-name>`
* REST API: `GET /admin/v2/topics/{tenant}/{namespace}/{topic}/custom-labels`
* Action: Retrieves the currently set custom labels for the topic.

#### Remove Custom Labels:

* CLI:
  - `pulsar-admin topics remove-custom-labels <topic-name> --labels "key1,key2"` (to remove specific labels)
  - `pulsar-admin topics remove-custom-labels <topic-name> --all` (to remove all custom labels from the topic)
* REST API: `DELETE /admin/v2/topics/{tenant}/{namespace}/{topic}/custom-labels` with query parameters `keys=k1&keys=k2` or `all=true`.
* Action: Removes the specified custom labels or all custom labels from the topic. This also updates the topic policy.

#### Query topics associated with specific labels:

* CLI: `pulsar-admin topics list <namespace> --custom-labels "k1:v1,k2:v2"`
* REST API: `GET /admin/v2/topics/{tenant}/{namespace}` with the query parameter `custom-labels=k1:v1,k2:v2`

#### Get all existing labels within a namespace:

* CLI: `pulsar-admin namespaces list-topic-custom-labels <namespace>`
* REST API: `GET /admin/v2/topics/{tenant}/{namespace}/topicCustomLabels`

### Consumer API (Client)
Introduce `topicsByLabel` in the ConsumerBuilder.
```java
public interface ConsumerBuilder<T> {

    // ... existing methods ...

    /**
     * Subscribe to topics matching the given labels within specific namespaces.
     *
     * @param labels The Key-Value pairs that a topic must match (AND semantics).
     * @param namespaces The set of namespaces to look for topics.
     *                   If empty, defaults to the current consumer's namespace.
     */
    ConsumerBuilder<T> topicsByLabel(Map<String, String> labels, Set<String> namespaces);
}
```
### Binary protocol

Extend CommandWatchTopicList to include a label selector (`label_selector`), carried in the repeated `labels` field:
```protobuf
message CommandWatchTopicList {
    required uint64 request_id     = 1;
    required uint64 watcher_id     = 2;
    required string namespace      = 3;
    required string topics_pattern = 4;
    // Only present when the client reconnects:
    optional string topics_hash    = 5;

    // If not empty, the broker filters topics by these labels.
    // 'topics_pattern' should be ignored if this field is set.
    repeated KeyValue labels       = 6;
}
```

### Configuration Changes

* `maxCustomLabelValueLength=<integer>`
  - Description: The maximum character length for a custom label value.
  - Default: 128

# Security Considerations
* Authorization: Setting labels modifies Topic Policies, which requires update-policies permission on the Namespace or Topic.
* Filtering: When returning the list of topics matching a label via CommandWatchTopicList, the Broker must verify that the subscribing client has consume permission on each of those individual topics before including them in the response.

# Backward Compatibility
* Protocol: Old brokers will not understand the `label_selector` field in CommandWatchTopicList.
* Behavior: Old brokers might ignore the `label_selector` field and return all topics (acting as a simple Watch).
* Mitigation: The Client library must check the Broker's protocol version or capabilities. If the Broker does not support Label Subscription, the Client should throw a NotSupportedException rather than falling back to consuming all topics (which would be dangerous).
* API: This is a purely additive change to the Public API.

# Alternatives
Using topic properties to store labels: Because we need to look up the topic list by label (a reverse query), storing labels only in topic properties is insufficient, since it is difficult to collect all properties under a single namespace. Adding a secondary index of {labels}/{topic-list} in the metadata would resolve this, but it may introduce data consistency challenges. Therefore, this approach is not the preferred solution.

# Links

* Mailing List discussion thread:
* Mailing List voting thread:
