Skip to content

Commit 82ac126

Browse files
Merge branch 'master' into chore-service-bot-update
2 parents b4e073e + e784c26 commit 82ac126

File tree

10 files changed

+589
-231
lines changed

10 files changed

+589
-231
lines changed

client/src/main/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClient.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@
2222
import com.google.common.cache.CacheLoader;
2323
import com.google.common.cache.LoadingCache;
2424
import com.google.common.collect.ImmutableMap;
25+
import io.confluent.kafka.schemaregistry.client.rest.entities.Association;
26+
import io.confluent.kafka.schemaregistry.client.rest.entities.LifecyclePolicy;
2527
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaRegistryServerVersion;
28+
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.AssociationCreateOrUpdateRequest;
29+
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.AssociationResponse;
2630
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest;
2731
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse;
2832
import io.confluent.kafka.schemaregistry.utils.QualifiedSubject;
@@ -1103,6 +1107,56 @@ public void close() throws IOException {
11031107
}
11041108
}
11051109

1110+
@Override
1111+
public AssociationResponse createAssociation(AssociationCreateOrUpdateRequest request)
1112+
throws IOException, RestClientException {
1113+
return restService.createAssociation(DEFAULT_REQUEST_PROPERTIES, null, false, request);
1114+
}
1115+
1116+
@Override
1117+
public AssociationResponse createOrUpdateAssociation(AssociationCreateOrUpdateRequest request)
1118+
throws IOException, RestClientException {
1119+
return restService.createOrUpdateAssociation(DEFAULT_REQUEST_PROPERTIES, null, false, request);
1120+
}
1121+
1122+
@Override
1123+
public List<Association> getAssociationsBySubject(String subject,
1124+
String resourceType, List<String> associationTypes, String lifecycle, int offset, int limit)
1125+
throws IOException, RestClientException {
1126+
LifecyclePolicy lifecyclePolicy = lifecycle != null ? LifecyclePolicy.valueOf(lifecycle) : null;
1127+
return restService.getAssociationsBySubject(
1128+
DEFAULT_REQUEST_PROPERTIES, subject, resourceType, associationTypes, lifecyclePolicy,
1129+
offset, limit);
1130+
}
1131+
1132+
@Override
1133+
public List<Association> getAssociationsByResourceId(String resourceId,
1134+
String resourceType, List<String> associationTypes, String lifecycle, int offset, int limit)
1135+
throws IOException, RestClientException {
1136+
LifecyclePolicy lifecyclePolicy = lifecycle != null ? LifecyclePolicy.valueOf(lifecycle) : null;
1137+
return restService.getAssociationsByResourceId(
1138+
DEFAULT_REQUEST_PROPERTIES, resourceId, resourceType, associationTypes, lifecyclePolicy,
1139+
offset, limit);
1140+
}
1141+
1142+
@Override
1143+
public List<Association> getAssociationsByResourceName(String resourceName,
1144+
String resourceNamespace, String resourceType, List<String> associationTypes,
1145+
String lifecycle, int offset, int limit) throws IOException, RestClientException {
1146+
LifecyclePolicy lifecyclePolicy = lifecycle != null ? LifecyclePolicy.valueOf(lifecycle) : null;
1147+
return restService.getAssociationsByResourceName(
1148+
DEFAULT_REQUEST_PROPERTIES, resourceName, resourceNamespace, resourceType, associationTypes,
1149+
lifecyclePolicy, offset, limit);
1150+
}
1151+
1152+
@Override
1153+
public void deleteAssociations(String resourceId, String resourceType,
1154+
List<String> associationTypes, boolean cascadeLifecycle)
1155+
throws IOException, RestClientException {
1156+
restService.deleteAssociations(
1157+
DEFAULT_REQUEST_PROPERTIES, resourceId, resourceType, associationTypes, cascadeLifecycle);
1158+
}
1159+
11061160
private void checkMissingSchemaCache(String subject, ParsedSchema schema, boolean normalize)
11071161
throws RestClientException {
11081162
if (missingSchemaCache.getIfPresent(

client/src/main/java/io/confluent/kafka/schemaregistry/client/MockSchemaRegistryClient.java

Lines changed: 83 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import java.util.TreeMap;
4545
import java.util.UUID;
4646
import java.util.concurrent.ConcurrentHashMap;
47+
import java.util.concurrent.CopyOnWriteArrayList;
4748
import java.util.concurrent.ExecutionException;
4849
import java.util.Arrays;
4950
import java.util.Objects;
@@ -98,6 +99,7 @@ public class MockSchemaRegistryClient implements SchemaRegistryClient {
9899
private final Map<String, List<Association>> subjectToAssocCache;
99100
private final Map<ResourceAndAssocType, Association> resourceAndAssocTypeCache;
100101
private final Map<String, List<Association>> resourceIdToAssocCache;
102+
private final Map<String, Map<String, List<Association>>> resourceNameToAssocCache;
101103
private final Map<String, String> modes;
102104
private final Map<String, AtomicInteger> ids;
103105
private final LoadingCache<Schema, ParsedSchema> parsedSchemaCache;
@@ -152,6 +154,7 @@ public MockSchemaRegistryClient(List<SchemaProvider> providers) {
152154
subjectToAssocCache = new ConcurrentHashMap<>();
153155
resourceAndAssocTypeCache = new ConcurrentHashMap<>();
154156
resourceIdToAssocCache = new ConcurrentHashMap<>();
157+
resourceNameToAssocCache = new ConcurrentHashMap<>();
155158
modes = new ConcurrentHashMap<>();
156159
ids = new ConcurrentHashMap<>();
157160
this.providers = providers != null && !providers.isEmpty()
@@ -1003,12 +1006,19 @@ private synchronized void createAssociationsHelper(AssociationCreateOrUpdateRequ
10031006

10041007
subjectToAssocCache.computeIfAbsent(
10051008
associationInRequest.getSubject(),
1006-
k -> new ArrayList<>()
1009+
k -> new CopyOnWriteArrayList<>()
10071010
).add(newAssociation);
10081011

10091012
resourceIdToAssocCache.computeIfAbsent(
10101013
request.getResourceId(),
1011-
k -> new ArrayList<>()
1014+
k -> new CopyOnWriteArrayList<>()
1015+
).add(newAssociation);
1016+
1017+
resourceNameToAssocCache.computeIfAbsent(
1018+
request.getResourceName(),
1019+
k -> new ConcurrentHashMap<>()
1020+
).computeIfAbsent(request.getResourceNamespace(),
1021+
k -> new CopyOnWriteArrayList<>()
10121022
).add(newAssociation);
10131023
}
10141024
}
@@ -1167,6 +1177,70 @@ public List<Association> getAssociationsByResourceId(String resourceId, String r
11671177
return filtered.subList(start, end);
11681178
}
11691179

1180+
public List<Association> getAssociationsByResourceName(String resourceName,
1181+
String resourceNamespace,
1182+
String resourceType,
1183+
List<String> associationTypes,
1184+
String lifecycle, int offset, int limit)
1185+
throws IOException, RestClientException {
1186+
if (resourceName == null || resourceName.isEmpty()) {
1187+
throw new RestClientException("Association parameters are invalid", 422, 42212);
1188+
}
1189+
if (lifecycle != null) {
1190+
try {
1191+
LifecyclePolicy.valueOf(lifecycle);
1192+
} catch (IllegalArgumentException e) {
1193+
throw new RestClientException("Association parameters are invalid", 422, 42212);
1194+
}
1195+
}
1196+
1197+
Map<String, List<Association>> namespaceMap = resourceNameToAssocCache.get(resourceName);
1198+
if (namespaceMap == null || namespaceMap.isEmpty()) {
1199+
return new ArrayList<>(); // Return empty list
1200+
}
1201+
1202+
List<Association> associations = new ArrayList<>();
1203+
// If resourceNamespace is null or "*", collect from all namespaces
1204+
if (resourceNamespace == null || WILDCARD.equals(resourceNamespace)) {
1205+
for (List<Association> assocList : namespaceMap.values()) {
1206+
associations.addAll(assocList);
1207+
}
1208+
} else {
1209+
// Get associations from specific namespace
1210+
List<Association> namespaceAssociations = namespaceMap.get(resourceNamespace);
1211+
if (namespaceAssociations != null) {
1212+
associations.addAll(namespaceAssociations);
1213+
}
1214+
}
1215+
1216+
if (associations.isEmpty()) {
1217+
return new ArrayList<>(); // Return empty list
1218+
}
1219+
1220+
List<Association> filtered = associations.stream()
1221+
.filter(association ->
1222+
resourceType == null || association.getResourceType().equals(resourceType))
1223+
.filter(association ->
1224+
associationTypes == null
1225+
|| associationTypes.isEmpty()
1226+
|| associationTypes.contains(association.getAssociationType()))
1227+
.filter(association ->
1228+
lifecycle == null || association.getLifecycle().toString().equals(lifecycle))
1229+
.collect(Collectors.toList());
1230+
1231+
// Apply pagination
1232+
int start = offset;
1233+
if (start > filtered.size()) {
1234+
start = filtered.size();
1235+
}
1236+
1237+
int end = start + limit;
1238+
if (limit <= 0 || end > filtered.size()) {
1239+
end = filtered.size();
1240+
}
1241+
return filtered.subList(start, end);
1242+
}
1243+
11701244
private void checkDeleteAssociation(Association association, boolean cascadeLifecycle)
11711245
throws RestClientException {
11721246
if (!cascadeLifecycle && association.getLifecycle() == LifecyclePolicy.STRONG
@@ -1185,6 +1259,13 @@ private void deleteAssociation(Association association, boolean cascadeLifecycle
11851259
deleteSubjectNoAssociationsCheck(null, subject, false);
11861260
deleteSubjectNoAssociationsCheck(null, subject, true);
11871261
}
1262+
resourceNameToAssocCache.computeIfPresent(association.getResourceName(), (k, map) -> {
1263+
map.computeIfPresent(association.getResourceNamespace(), (k2, list) -> {
1264+
list.remove(association);
1265+
return list.isEmpty() ? null : list;
1266+
});
1267+
return map.isEmpty() ? null : map;
1268+
});
11881269
resourceIdToAssocCache.computeIfPresent(resourceId, (k, list) -> {
11891270
list.remove(association);
11901271
return list.isEmpty() ? null : list;

client/src/main/java/io/confluent/kafka/schemaregistry/client/SchemaRegistryClient.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,16 @@ public default List<Association> getAssociationsByResourceId(String resourceId,
439439
throw new UnsupportedOperationException();
440440
}
441441

442+
public default List<Association> getAssociationsByResourceName(String resourceName,
443+
String resourceNamespace,
444+
String resourceType,
445+
List<String> associationTypes,
446+
String lifecycle,
447+
int offset, int limit)
448+
throws IOException, RestClientException {
449+
throw new UnsupportedOperationException();
450+
}
451+
442452
public default void deleteAssociations(String resourceId, String resourceType,
443453
List<String> associationTypes, boolean cascadeLifecycle)
444454
throws IOException, RestClientException {

client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/RestService.java

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.confluent.kafka.schemaregistry.client.rest.entities.Config;
3030
import io.confluent.kafka.schemaregistry.client.rest.entities.ContextId;
3131
import io.confluent.kafka.schemaregistry.client.rest.entities.ErrorMessage;
32+
import io.confluent.kafka.schemaregistry.client.rest.entities.LifecyclePolicy;
3233
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
3334
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaRegistryServerVersion;
3435
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaRegistryDeployment;
@@ -1964,6 +1965,94 @@ public AssociationBatchResponse createOrUpdateAssociations(
19641965
return response;
19651966
}
19661967

1968+
public List<Association> getAssociationsBySubject(
1969+
Map<String, String> requestProperties,
1970+
String subject, String resourceType, List<String> associationTypes,
1971+
LifecyclePolicy lifecycle, int offset, int limit)
1972+
throws IOException, RestClientException {
1973+
UriBuilder builder =
1974+
UriBuilder.fromPath("/associations/subjects/{subject}");
1975+
if (resourceType != null) {
1976+
builder.queryParam("resourceType", resourceType);
1977+
}
1978+
for (String associationType : associationTypes) {
1979+
builder.queryParam("associationType", associationType);
1980+
}
1981+
if (lifecycle != null) {
1982+
builder.queryParam("lifecycle", lifecycle.toString());
1983+
}
1984+
if (offset > 0) {
1985+
builder.queryParam("offset", offset);
1986+
}
1987+
if (limit >= 0) {
1988+
builder.queryParam("limit", limit);
1989+
}
1990+
String path = builder.build(subject).toString();
1991+
1992+
List<Association> response = httpRequest(path, "GET", null,
1993+
requestProperties, ASSOCIATIONS_RESPONSE_TYPE);
1994+
return response;
1995+
}
1996+
1997+
public List<Association> getAssociationsByResourceId(
1998+
Map<String, String> requestProperties,
1999+
String resourceId, String resourceType, List<String> associationTypes,
2000+
LifecyclePolicy lifecycle, int offset, int limit)
2001+
throws IOException, RestClientException {
2002+
UriBuilder builder =
2003+
UriBuilder.fromPath("/associations/resources/{resourceId}");
2004+
if (resourceType != null) {
2005+
builder.queryParam("resourceType", resourceType);
2006+
}
2007+
for (String associationType : associationTypes) {
2008+
builder.queryParam("associationType", associationType);
2009+
}
2010+
if (lifecycle != null) {
2011+
builder.queryParam("lifecycle", lifecycle.toString());
2012+
}
2013+
if (offset > 0) {
2014+
builder.queryParam("offset", offset);
2015+
}
2016+
if (limit >= 1) {
2017+
builder.queryParam("limit", limit);
2018+
}
2019+
String path = builder.build(resourceId).toString();
2020+
2021+
List<Association> response = httpRequest(path, "GET", null,
2022+
requestProperties, ASSOCIATIONS_RESPONSE_TYPE);
2023+
return response;
2024+
}
2025+
2026+
public List<Association> getAssociationsByResourceName(
2027+
Map<String, String> requestProperties,
2028+
String resourceName, String resourceNamespace,
2029+
String resourceType, List<String> associationTypes, LifecyclePolicy lifecycle,
2030+
int offset, int limit)
2031+
throws IOException, RestClientException {
2032+
UriBuilder builder =
2033+
UriBuilder.fromPath("/associations/resources/{resourceNamespace}/{resourceName}");
2034+
if (resourceType != null) {
2035+
builder.queryParam("resourceType", resourceType);
2036+
}
2037+
for (String associationType : associationTypes) {
2038+
builder.queryParam("associationType", associationType);
2039+
}
2040+
if (lifecycle != null) {
2041+
builder.queryParam("lifecycle", lifecycle.toString());
2042+
}
2043+
if (offset > 0) {
2044+
builder.queryParam("offset", offset);
2045+
}
2046+
if (limit >= 1) {
2047+
builder.queryParam("limit", limit);
2048+
}
2049+
String path = builder.build(resourceNamespace, resourceName).toString();
2050+
2051+
List<Association> response = httpRequest(path, "GET", null,
2052+
requestProperties, ASSOCIATIONS_RESPONSE_TYPE);
2053+
return response;
2054+
}
2055+
19672056
public void deleteAssociations(
19682057
Map<String, String> requestProperties,
19692058
String resourceId, String resourceType, List<String> associationTypes,

client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/requests/AssociationCreateOrUpdateRequest.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@
3434
@JsonIgnoreProperties(ignoreUnknown = true)
3535
public class AssociationCreateOrUpdateRequest {
3636

37-
private static final String DEFAULT_RESOURCE_TYPE = "topic";
38-
private static final String DEFAULT_ASSOCIATION_TYPE = "value";
37+
private static final String TOPIC_RESOURCE_TYPE = "topic";
38+
private static final String KEY_ASSOCIATION_TYPE = "key";
39+
private static final String VALUE_ASSOCIATION_TYPE = "value";
3940
private static final LifecyclePolicy DEFAULT_LIFECYCLE = LifecyclePolicy.STRONG;
4041

4142
private String resourceName;
@@ -138,16 +139,24 @@ public void validate() {
138139
throw new IllegalPropertyException("resourceId", "cannot be null or empty");
139140
}
140141
if (getResourceType() != null && !getResourceType().isEmpty()) {
141-
checkName(getResourceType(), "resourceType");
142+
if (!getResourceType().equals(TOPIC_RESOURCE_TYPE)) {
143+
throw new IllegalPropertyException(
144+
"resourceType", "must be '" + TOPIC_RESOURCE_TYPE + "'");
145+
}
142146
} else {
143-
setResourceType(DEFAULT_RESOURCE_TYPE);
147+
setResourceType(TOPIC_RESOURCE_TYPE);
144148
}
145149
for (AssociationCreateOrUpdateInfo info : getAssociations()) {
146150
checkSubject(info.getSubject());
147151
if (info.getAssociationType() != null && !info.getAssociationType().isEmpty()) {
148-
checkName(info.getAssociationType(), "associationType");
152+
if (!info.getAssociationType().equals(KEY_ASSOCIATION_TYPE)
153+
&& !info.getAssociationType().equals(VALUE_ASSOCIATION_TYPE)) {
154+
throw new IllegalPropertyException(
155+
"associationType",
156+
"must be either '" + KEY_ASSOCIATION_TYPE + "' or '" + VALUE_ASSOCIATION_TYPE + "'");
157+
}
149158
} else {
150-
info.setAssociationType(DEFAULT_ASSOCIATION_TYPE);
159+
info.setAssociationType(VALUE_ASSOCIATION_TYPE);
151160
}
152161
if (info.getLifecycle() == LifecyclePolicy.WEAK) {
153162
if (Boolean.TRUE.equals(info.getFrozen())) {

client/src/main/java/io/confluent/kafka/schemaregistry/utils/QualifiedSubject.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,32 @@ public static QualifiedSubject createFromUnqualified(String tenant, String unqua
227227
}
228228
}
229229

230+
/**
231+
* Creates a QualifiedSubject from the given tenant and unqualified context and subject.
232+
*
233+
* @param tenant the tenant
234+
* @param unqualifiedContext the context without a tenant prefix
235+
* @param unqualifiedSubject the subject without a tenant prefix
236+
* @return the QualifiedSubject, or null if the unqualified subject is null or invalid
237+
*/
238+
public static QualifiedSubject createFromUnqualified(
239+
String tenant, String unqualifiedContext, String unqualifiedSubject) {
240+
try {
241+
if (unqualifiedSubject == null) {
242+
return null;
243+
}
244+
if (unqualifiedContext != null
245+
&& !unqualifiedContext.isEmpty()
246+
&& !unqualifiedSubject.startsWith(CONTEXT_PREFIX)) {
247+
unqualifiedSubject = QualifiedSubject.normalizeContext(unqualifiedContext)
248+
+ unqualifiedSubject;
249+
}
250+
return QualifiedSubject.createFromUnqualified(tenant, unqualifiedSubject);
251+
} catch (IllegalArgumentException e) {
252+
return null;
253+
}
254+
}
255+
230256
/**
231257
* Returns the context for the given qualified subject.
232258
*

0 commit comments

Comments
 (0)