diff --git a/pom.xml b/pom.xml index cc226b5..c105b7a 100644 --- a/pom.xml +++ b/pom.xml @@ -12,6 +12,8 @@ 1.3.6.RELEASE -Xdoclint:none 2.10.0 + 0.11.0.3 + 2.11 -Xdoclint:none @@ -60,18 +62,21 @@ org.apache.kafka - kafka_2.9.2 - 0.8.2.2 + kafka_${kafka.scala.version} + ${kafka.version} + + + org.apache.kafka + kafka-clients + ${kafka.version} org.freemarker freemarker - 2.3.23 org.springframework.retry spring-retry - 1.1.3.RELEASE @@ -123,7 +128,6 @@ junit junit - 4.11 test @@ -137,13 +141,11 @@ org.codehaus.groovy groovy-all - 2.4.6 test org.spockframework spock-core - 1.0-groovy-2.4 test @@ -152,13 +154,6 @@ 0.7.1 test - - - org.apache.kafka - kafka-clients - 0.8.2.2 - test - diff --git a/src/main/java/com/homeadvisor/kafdrop/service/CuratorKafkaMonitor.java b/src/main/java/com/homeadvisor/kafdrop/service/CuratorKafkaMonitor.java index 84df01b..4cf24ff 100644 --- a/src/main/java/com/homeadvisor/kafdrop/service/CuratorKafkaMonitor.java +++ b/src/main/java/com/homeadvisor/kafdrop/service/CuratorKafkaMonitor.java @@ -24,13 +24,14 @@ import com.homeadvisor.kafdrop.model.*; import com.homeadvisor.kafdrop.util.BrokerChannel; import com.homeadvisor.kafdrop.util.Version; -import kafka.api.ConsumerMetadataRequest; +import kafka.api.GroupCoordinatorRequest; import kafka.api.PartitionOffsetRequestInfo; -import kafka.cluster.Broker; +import kafka.cluster.BrokerEndPoint; import kafka.common.ErrorMapping; import kafka.common.TopicAndPartition; import kafka.javaapi.*; import kafka.network.BlockingChannel; +import kafka.server.ConfigType; import kafka.utils.ZKGroupDirs; import kafka.utils.ZKGroupTopicDirs; import kafka.utils.ZkUtils; @@ -128,7 +129,7 @@ public void start() throws Exception }); brokerPathCache.start(StartMode.POST_INITIALIZED_EVENT); - topicConfigPathCache = new PathChildrenCache(curatorFramework, ZkUtils.TopicConfigPath(), true); + topicConfigPathCache = new PathChildrenCache(curatorFramework, ZkUtils.getEntityConfigPath(ConfigType.Topic()), true); topicConfigPathCache.getListenable().addListener((f, e) -> { if (e.getType() == PathChildrenCacheEvent.Type.INITIALIZED) { @@ -392,7 +393,7 @@ public TopicVO parseZkTopic(ChildData input) objectMapper.reader(TopicRegistrationVO.class).readValue(input.getData()); topic.setConfig( - Optional.ofNullable(topicConfigPathCache.getCurrentData(ZkUtils.TopicConfigPath() + "/" + topic.getName())) + Optional.ofNullable(topicConfigPathCache.getCurrentData(ZkUtils.getEntityConfigPath(ConfigType.Topic()) + "/" + topic.getName())) .map(this::readTopicConfig) .orElse(Collections.emptyMap())); @@ -437,7 +438,7 @@ private Map getTopicMetadata(BlockingChannel channel, String... channel.send(request); final kafka.api.TopicMetadataResponse underlyingResponse = - kafka.api.TopicMetadataResponse.readFrom(channel.receive().buffer()); + kafka.api.TopicMetadataResponse.readFrom(channel.receive().payload()); LOG.debug("Received topic metadata response: {}", underlyingResponse); @@ -453,7 +454,7 @@ private TopicVO processTopicMetadata(TopicMetadata tmd) TopicVO topic = new TopicVO(tmd.topic()); topic.setConfig( - Optional.ofNullable(topicConfigPathCache.getCurrentData(ZkUtils.TopicConfigPath() + "/" + topic.getName())) + Optional.ofNullable(topicConfigPathCache.getCurrentData(ZkUtils.getEntityConfigPath(ConfigType.Topic()) + "/" + topic.getName())) .map(this::readTopicConfig) .orElse(Collections.emptyMap())); @@ -482,7 +483,7 @@ private TopicPartitionVO parsePartitionMetadata(String topic, PartitionMetadata private List getIsr(String topic, PartitionMetadata pmd) { - return pmd.isr().stream().map(Broker::id).collect(Collectors.toList()); + return pmd.isr().stream().map(BrokerEndPoint::id).collect(Collectors.toList()); } private Map readTopicConfig(ChildData d) @@ -741,14 +742,14 @@ private Map getConsumerOffsets(BlockingChannel channel, channel.send(request.underlying()); final kafka.api.OffsetFetchResponse underlyingResponse = - kafka.api.OffsetFetchResponse.readFrom(channel.receive().buffer()); + kafka.api.OffsetFetchResponse.readFrom(channel.receive().payload()); LOG.debug("Received consumer offset response: {}", underlyingResponse); OffsetFetchResponse response = new OffsetFetchResponse(underlyingResponse); return response.offsets().entrySet().stream() - .filter(entry -> entry.getValue().error() == ErrorMapping.NoError()) + .filter(entry -> entry.getValue().error().code() == ErrorMapping.NoError()) .collect(Collectors.toMap(entry -> entry.getKey().partition(), entry -> entry.getValue().offset())); } @@ -766,14 +767,14 @@ private Integer offsetManagerBroker(String groupId) private Integer offsetManagerBroker(BlockingChannel channel, String groupId) { - final ConsumerMetadataRequest request = - new ConsumerMetadataRequest(groupId, (short) 0, 0, clientId()); + final GroupCoordinatorRequest request = + new GroupCoordinatorRequest(groupId, (short) 0, 0, clientId()); LOG.debug("Sending consumer metadata request: {}", request); channel.send(request); - ConsumerMetadataResponse response = - ConsumerMetadataResponse.readFrom(channel.receive().buffer()); + GroupCoordinatorResponse response = + GroupCoordinatorResponse.readFrom(channel.receive().payload()); LOG.debug("Received consumer metadata response: {}", response); @@ -866,7 +867,7 @@ private OffsetResponse sendOffsetRequest(Integer brokerId, TopicVO topic, { channel.send(offsetRequest.underlying()); final kafka.api.OffsetResponse underlyingResponse = - kafka.api.OffsetResponse.readFrom(channel.receive().buffer()); + kafka.api.OffsetResponse.readFrom(channel.receive().payload()); LOG.debug("Received offset response: {}", underlyingResponse);