diff --git a/kaas_local_jaas.conf b/kaas_local_jaas.conf
new file mode 100644
index 0000000..6dc2616
--- /dev/null
+++ b/kaas_local_jaas.conf
@@ -0,0 +1,11 @@
+Client {
+ org.apache.zookeeper.server.auth.DigestLoginModule required
+ username="xyz"
+ password="xyz";
+};
+
+KafkaClient {
+ org.apache.kafka.common.security.plain.PlainLoginModule required
+ username="xyz"
+ password="xyz";
+};
\ No newline at end of file
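
For reference, a minimal sketch of a 0.11 kafka-clients consumer that authenticates through the KafkaClient section above. The broker address, group id, and deserializers are assumptions rather than part of this patch, and the JVM must be pointed at this file via java.security.auth.login.config (KafDrop.java below sets that property at startup).

   import java.util.Properties;
   import org.apache.kafka.clients.consumer.KafkaConsumer;

   public class SaslPlainExample
   {
      public static void main(String[] args)
      {
         // Assumes -Djava.security.auth.login.config=kaas_local_jaas.conf was passed,
         // or that KafDrop set the property programmatically at startup.
         Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
         props.put("group.id", "kafdrop-example");         // assumed group id
         props.put("security.protocol", "SASL_PLAINTEXT");
         props.put("sasl.mechanism", "PLAIN");
         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props))
         {
            consumer.listTopics().keySet().forEach(System.out::println);
         }
      }
   }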
diff --git a/pom.xml b/pom.xml
index 1e73b39..2003600 100644
--- a/pom.xml
+++ b/pom.xml
@@ -74,6 +74,18 @@
         <artifactId>kafka_2.9.2</artifactId>
         <version>0.8.2.2</version>
+
+      <dependency>
+         <groupId>org.projectlombok</groupId>
+         <artifactId>lombok</artifactId>
+         <version>1.16.6</version>
+      </dependency>
+
+      <dependency>
+         <groupId>org.apache.kafka</groupId>
+         <artifactId>kafka-clients</artifactId>
+         <version>0.11.0.2</version>
+      </dependency>
      <dependency>
         <groupId>org.freemarker</groupId>
         <artifactId>freemarker</artifactId>
@@ -197,7 +209,7 @@
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
-           <version>2.3.2</version>
+           <version>3.7.0</version>
            <configuration>
               <source>1.8</source>
               <target>1.8</target>
diff --git a/src/main/java/com/homeadvisor/kafdrop/KafDrop.java b/src/main/java/com/homeadvisor/kafdrop/KafDrop.java
index 47ed3de..ad66669 100644
--- a/src/main/java/com/homeadvisor/kafdrop/KafDrop.java
+++ b/src/main/java/com/homeadvisor/kafdrop/KafDrop.java
@@ -21,6 +21,7 @@
import com.google.common.base.Throwables;
import com.homeadvisor.kafdrop.config.ini.IniFilePropertySource;
import com.homeadvisor.kafdrop.config.ini.IniFileReader;
+import com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.Banner;
@@ -36,7 +37,12 @@
import org.springframework.web.servlet.config.annotation.ContentNegotiationConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;
-import java.io.*;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
import java.util.Objects;
import java.util.stream.Stream;
@@ -83,17 +89,15 @@ public void onApplicationEvent(ApplicationEnvironmentPreparedEvent event)
catch (Exception ex)
{
System.err.println("Unable to set up logging.dir from logging.file " + loggingFile + ": " +
- Throwables.getStackTraceAsString(ex));
+ Throwables.getStackTraceAsString(ex));
}
}
if (environment.containsProperty("debug") &&
- !"false".equalsIgnoreCase(environment.getProperty("debug", String.class)))
+ !"false".equalsIgnoreCase(environment.getProperty("debug", String.class)))
{
System.setProperty(PROP_SPRING_BOOT_LOG_LEVEL, "DEBUG");
}
-
}
-
}
private static class EnvironmentSetupListener implements ApplicationListener<ApplicationEnvironmentPreparedEvent>, Ordered
@@ -107,19 +111,41 @@ public int getOrder()
return Ordered.HIGHEST_PRECEDENCE + 10;
}
- @Override
- public void onApplicationEvent(ApplicationEnvironmentPreparedEvent event)
- {
- final ConfigurableEnvironment environment = event.getEnvironment();
- if (environment.containsProperty(SM_CONFIG_DIR))
- {
- Stream.of("kafdrop", "global")
- .map(name -> readProperties(environment, name))
- .filter(Objects::nonNull)
- .forEach(iniPropSource -> environment.getPropertySources()
- .addBefore("applicationConfigurationProperties", iniPropSource));
- }
- }
+ @Override
+ public void onApplicationEvent(ApplicationEnvironmentPreparedEvent event)
+ {
+ final ConfigurableEnvironment environment = event.getEnvironment();
+
+ LOG.info("Initializing JAAS config");
+ String env = environment.getProperty("kafka.env");
+ // Default to an unsecured connection when the property is absent,
+ // avoiding an NPE from unboxing a null Boolean.
+ boolean isSecured = Boolean.TRUE.equals(environment.getProperty("kafka.isSecured", Boolean.class));
+ LOG.info("env: {}, secured kafka: {}", env, isSecured);
+ if (isSecured && Strings.isNullOrEmpty(env)) {
+ throw new RuntimeException("'kafka.env' cannot be empty when connecting to secured Kafka.");
+ }
+
+ if (isSecured) {
+ if (env.equalsIgnoreCase("stage") || env.equalsIgnoreCase("prod") || env.equalsIgnoreCase("local")) {
+ String path = environment.getProperty("user.dir") + "/kaas_" + env.toLowerCase() + "_jaas.conf";
+ LOG.info("JAAS config path: {}", path);
+ System.setProperty("java.security.auth.login.config", path);
+ }
+ else {
+ throw new RuntimeException("Unable to identify env. Set 'kafka.env' to 'stage', 'prod', or 'local'.");
+ }
+ }
+
+ if (environment.containsProperty(SM_CONFIG_DIR)) {
+ Stream.of("kafdrop", "global")
+ .map(name -> readProperties(environment, name))
+ .filter(Objects::nonNull)
+ .forEach(iniPropSource -> environment.getPropertySources()
+ .addBefore("applicationConfigurationProperties", iniPropSource));
+ }
+ }
private IniFilePropertySource readProperties(Environment environment, String name)
{
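
The JAAS file is resolved against the process working directory (user.dir). A distilled sketch of the lookup above, with an illustrative directory:

   // Mirrors the path construction in onApplicationEvent (not part of the patch).
   static String jaasPath(String userDir, String env)
   {
      return userDir + "/kaas_" + env.toLowerCase() + "_jaas.conf";
   }
   // jaasPath("/opt/kafdrop", "LOCAL") returns "/opt/kafdrop/kaas_local_jaas.conf"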
diff --git a/src/main/java/com/homeadvisor/kafdrop/config/KafkaConfiguration.java b/src/main/java/com/homeadvisor/kafdrop/config/KafkaConfiguration.java
new file mode 100644
index 0000000..5a78edb
--- /dev/null
+++ b/src/main/java/com/homeadvisor/kafdrop/config/KafkaConfiguration.java
@@ -0,0 +1,22 @@
+package com.homeadvisor.kafdrop.config;
+
+import lombok.*;
+import org.springframework.boot.context.properties.*;
+import org.springframework.stereotype.*;
+
+/**
+ * Created by Satendra Sahu on 9/26/18
+ */
+@Component
+@ConfigurationProperties(prefix = "kafka")
+@Data
+public class KafkaConfiguration
+{
+ private String env = "local";
+ private String brokerConnect;
+ private Boolean isSecured = false;
+ private String keyDeserializer;
+ private String valueDeserializer;
+ private String saslMechanism;
+ private String securityProtocol;
+}
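
Spring Boot binds these fields from the kafka.* property namespace (kafka.brokerConnect, kafka.isSecured, and so on). A hypothetical helper, not part of this patch, sketching how the fields could feed kafka-clients consumer settings:

   // Hypothetical mapping helper; the property keys are standard 0.11 consumer configs.
   static java.util.Properties toConsumerProperties(KafkaConfiguration config)
   {
      java.util.Properties props = new java.util.Properties();
      props.put("bootstrap.servers", config.getBrokerConnect());
      props.put("key.deserializer", config.getKeyDeserializer());
      props.put("value.deserializer", config.getValueDeserializer());
      if (Boolean.TRUE.equals(config.getIsSecured()))
      {
         props.put("security.protocol", config.getSecurityProtocol()); // e.g. SASL_PLAINTEXT
         props.put("sasl.mechanism", config.getSaslMechanism());       // e.g. PLAIN
      }
      return props;
   }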
diff --git a/src/main/java/com/homeadvisor/kafdrop/controller/ClusterController.java b/src/main/java/com/homeadvisor/kafdrop/controller/ClusterController.java
index 6ed7e4b..7af3a08 100644
--- a/src/main/java/com/homeadvisor/kafdrop/controller/ClusterController.java
+++ b/src/main/java/com/homeadvisor/kafdrop/controller/ClusterController.java
@@ -28,10 +28,16 @@
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
-import org.springframework.web.bind.annotation.*;
+import org.springframework.web.bind.annotation.ExceptionHandler;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.ResponseStatus;
import java.util.Collections;
import java.util.List;
@@ -112,4 +118,10 @@ public static class ClusterInfoVO
public List<BrokerVO> brokers;
public List<TopicVO> topics;
}
+
+ @ResponseStatus(HttpStatus.OK)
+ @RequestMapping("/health_check")
+ public void healthCheck()
+ {
+ }
}
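
The new endpoint answers 200 with an empty body, suitable for load-balancer probes. A quick client-side check, assuming Kafdrop's default host and port:

   // Hypothetical probe; host and port are assumptions.
   int status = new org.springframework.web.client.RestTemplate()
         .getForEntity("http://localhost:9000/health_check", Void.class)
         .getStatusCode().value(); // expected: 200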
diff --git a/src/main/java/com/homeadvisor/kafdrop/model/BrokerVO.java b/src/main/java/com/homeadvisor/kafdrop/model/BrokerVO.java
index 7e8ba7b..660a552 100644
--- a/src/main/java/com/homeadvisor/kafdrop/model/BrokerVO.java
+++ b/src/main/java/com/homeadvisor/kafdrop/model/BrokerVO.java
@@ -18,88 +18,113 @@
package com.homeadvisor.kafdrop.model;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Date;
+@JsonIgnoreProperties(ignoreUnknown = true)
public class BrokerVO
{
private int id;
private String host;
+ private String[] endpoints;
private int port;
private int jmxPort;
private int version;
private boolean controller;
private Date timestamp;
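+ // Newer broker registrations advertise endpoints such as "PLAINTEXT://host:9092";
+ // host and port are derived from the first endpoint when not set directly.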
+ public void setEndpoints(String[] endpoints)
+ {
+ this.endpoints = endpoints;
+ if (host == null)
+ {
+ String[] hostPort = endpoints[0].split("://")[1].split(":");
+ this.host = hostPort[0];
+ this.port = Integer.parseInt(hostPort[1]);
+ }
+ }
+
+ public String[] getEndpoints()
+ {
+ return this.endpoints;
+ }
+
public int getId()
{
- return id;
+ return id;
}
public void setId(int id)
{
- this.id = id;
+ this.id = id;
}
public String getHost()
{
- return host;
+ return host;
}
public void setHost(String host)
{
- this.host = host;
+ if (host != null)
+ {
+ this.host = host;
+ }
}
public int getPort()
{
- return port;
+ return port;
}
public void setPort(int port)
{
- this.port = port;
+ if (port > 0)
+ {
+ this.port = port;
+ }
}
public int getJmxPort()
{
- return jmxPort;
+ return jmxPort;
}
@JsonProperty("jmx_port")
public void setJmxPort(int jmxPort)
{
- this.jmxPort = jmxPort;
+ this.jmxPort = jmxPort;
}
public int getVersion()
{
- return version;
+ return version;
}
public void setVersion(int version)
{
- this.version = version;
+ this.version = version;
}
public Date getTimestamp()
{
- return timestamp;
+ return timestamp;
}
public void setTimestamp(Date timestamp)
{
- this.timestamp = timestamp;
+ this.timestamp = timestamp;
}
public boolean isController()
{
- return controller;
+ return controller;
}
public void setController(boolean controller)
{
- this.controller = controller;
+ this.controller = controller;
}
}
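
setEndpoints above derives host and port from the first advertised endpoint when the legacy fields are absent. An illustrative registration value:

   BrokerVO broker = new BrokerVO();
   broker.setEndpoints(new String[]{"PLAINTEXT://broker-1:9092"}); // made-up endpoint
   // The parser splits on "://" and ":", so broker.getHost() returns "broker-1"
   // and broker.getPort() returns 9092.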
diff --git a/src/main/java/com/homeadvisor/kafdrop/model/TopicVO.java b/src/main/java/com/homeadvisor/kafdrop/model/TopicVO.java
index e48f0a5..92b184d 100644
--- a/src/main/java/com/homeadvisor/kafdrop/model/TopicVO.java
+++ b/src/main/java/com/homeadvisor/kafdrop/model/TopicVO.java
@@ -19,7 +19,7 @@
package com.homeadvisor.kafdrop.model;
import java.util.*;
-import java.util.stream.Collectors;
+import java.util.stream.*;
public class TopicVO implements Comparable<TopicVO>
{
@@ -30,119 +30,125 @@ public class TopicVO implements Comparable
// partition state
// delete supported?
-
public TopicVO(String name)
{
- this.name = name;
+ this.name = name;
}
public String getName()
{
- return name;
+ return name;
}
public void setName(String name)
{
- this.name = name;
+ this.name = name;
}
public Map<String, String> getConfig()
{
- return config;
+ return config;
}
public void setConfig(Map<String, String> config)
{
- this.config = config;
+ this.config = config;
+ }
+
+ public Map<Integer, TopicPartitionVO> getPartitionMap()
+ {
+ return partitions;
}
public Collection<TopicPartitionVO> getPartitions()
{
- return partitions.values();
+ return partitions.values();
}
public Optional<TopicPartitionVO> getPartition(int partitionId)
{
- return Optional.ofNullable(partitions.get(partitionId));
+ return Optional.ofNullable(partitions.get(partitionId));
}
public Collection<TopicPartitionVO> getLeaderPartitions(int brokerId)
{
- return partitions.values().stream()
- .filter(tp -> tp.getLeader() != null && tp.getLeader().getId() == brokerId)
- .collect(Collectors.toList());
+ return partitions.values().stream()
+ .filter(tp -> tp.getLeader() != null && tp.getLeader().getId() == brokerId)
+ .collect(Collectors.toList());
}
public Collection<TopicPartitionVO> getUnderReplicatedPartitions()
{
- return partitions.values().stream()
- .filter(TopicPartitionVO::isUnderReplicated)
- .collect(Collectors.toList());
+ return partitions.values().stream()
+ .filter(TopicPartitionVO::isUnderReplicated)
+ .collect(Collectors.toList());
}
public void setPartitions(Map<Integer, TopicPartitionVO> partitions)
{
- this.partitions = partitions;
+ this.partitions = partitions;
}
/**
- * Returns the total number of messages published to the topic, ever
- * @return
- */
+ * Returns the total number of messages published to the topic, ever.
+ * @return the all-time message count, summed across all partitions
+ */
public long getTotalSize()
{
- return partitions.values().stream()
- .map(TopicPartitionVO::getSize)
- .reduce(0L, Long::sum);
+ return partitions.values().stream()
+ .map(TopicPartitionVO::getSize)
+ .reduce(0L, Long::sum);
}
/**
- * Returns the total number of messages available to consume from the topic.
- * @return
- */
+ * Returns the total number of messages available to consume from the topic.
+ * @return the number of messages currently available to consume, summed across all partitions
+ */
public long getAvailableSize()
{
- return partitions.values().stream()
- .map(p -> p.getSize() - p.getFirstOffset())
- .reduce(0L, Long::sum);
+ return partitions.values().stream()
+ .map(p -> p.getSize() - p.getFirstOffset())
+ .reduce(0L, Long::sum);
}
public double getPreferredReplicaPercent()
{
- long preferredLeaderCount = partitions.values().stream()
- .filter(TopicPartitionVO::isLeaderPreferred)
- .count();
- return ((double) preferredLeaderCount) / ((double)partitions.size());
+ long preferredLeaderCount = partitions.values().stream()
+ .filter(TopicPartitionVO::isLeaderPreferred)
+ .count();
+ return ((double) preferredLeaderCount) / ((double) partitions.size());
}
public void addPartition(TopicPartitionVO partition)
{
- partitions.put(partition.getId(), partition);
+ partitions.put(partition.getId(), partition);
}
@Override
public int compareTo(TopicVO that)
{
- return this.name.compareTo(that.name);
+ return this.name.compareTo(that.name);
}
@Override
public boolean equals(Object o)
{
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o)
+ { return true; }
+ if (o == null || getClass() != o.getClass())
+ { return false; }
- TopicVO that = (TopicVO) o;
+ TopicVO that = (TopicVO) o;
- if (!name.equals(that.name)) return false;
+ if (!name.equals(that.name))
+ { return false; }
- return true;
+ return true;
}
@Override
public int hashCode()
{
- return name.hashCode();
+ return name.hashCode();
}
-
}
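
The size accessors reduce over partitions: total size is the sum of latest offsets, and available size is the sum of (size - firstOffset). A small worked example, assuming the long-valued TopicPartitionVO setters used elsewhere in this diff:

   TopicVO topic = new TopicVO("example-topic"); // illustrative name
   TopicPartitionVO p0 = new TopicPartitionVO(0);
   p0.setSize(100L);       // latest offset
   p0.setFirstOffset(40L); // earliest retained offset
   topic.addPartition(p0);
   // topic.getTotalSize() == 100; topic.getAvailableSize() == 100 - 40 == 60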
diff --git a/src/main/java/com/homeadvisor/kafdrop/service/CuratorKafkaMonitor.java b/src/main/java/com/homeadvisor/kafdrop/service/CuratorKafkaMonitor.java
index 84df01b..f6e03ab 100644
--- a/src/main/java/com/homeadvisor/kafdrop/service/CuratorKafkaMonitor.java
+++ b/src/main/java/com/homeadvisor/kafdrop/service/CuratorKafkaMonitor.java
@@ -18,45 +18,44 @@
package com.homeadvisor.kafdrop.service;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
+import com.fasterxml.jackson.databind.*;
+import com.google.common.base.*;
+import com.google.common.collect.*;
import com.homeadvisor.kafdrop.model.*;
-import com.homeadvisor.kafdrop.util.BrokerChannel;
-import com.homeadvisor.kafdrop.util.Version;
-import kafka.api.ConsumerMetadataRequest;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.cluster.Broker;
-import kafka.common.ErrorMapping;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.*;
-import kafka.network.BlockingChannel;
-import kafka.utils.ZKGroupDirs;
-import kafka.utils.ZKGroupTopicDirs;
-import kafka.utils.ZkUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.curator.framework.CuratorFramework;
+import com.homeadvisor.kafdrop.util.*;
+import javax.annotation.*;
+import kafka.api.*;
+import kafka.cluster.*;
+import kafka.common.*;
+import kafka.javaapi.ConsumerMetadataResponse;
+import kafka.javaapi.OffsetFetchRequest;
+import kafka.javaapi.OffsetFetchResponse;
+import kafka.javaapi.OffsetRequest;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.TopicMetadata;
+import kafka.network.*;
+import kafka.utils.*;
+import org.apache.commons.lang.*;
+import org.apache.curator.framework.*;
import org.apache.curator.framework.recipes.cache.*;
-import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.retry.backoff.FixedBackOffPolicy;
-import org.springframework.retry.policy.SimpleRetryPolicy;
-import org.springframework.retry.support.RetryTemplate;
-import org.springframework.stereotype.Service;
-
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
-import java.io.IOException;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache.*;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.*;
+import org.slf4j.*;
+import org.springframework.beans.factory.annotation.*;
+import org.springframework.retry.backoff.*;
+import org.springframework.retry.policy.*;
+import org.springframework.retry.support.*;
+import org.springframework.stereotype.*;
+
+import java.io.*;
import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.ForkJoinTask;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-import java.util.stream.LongStream;
-import java.util.stream.Stream;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+import java.util.stream.*;
@Service
public class CuratorKafkaMonitor implements KafkaMonitor
@@ -85,845 +84,860 @@ public class CuratorKafkaMonitor implements KafkaMonitor
@Autowired
private CuratorKafkaMonitorProperties properties;
+ @Autowired
+ private KafkaHighLevelConsumer kafkaHighLevelConsumer;
private Version kafkaVersion;
private RetryTemplate retryTemplate;
@PostConstruct
- public void start() throws Exception
- {
- try
- {
- kafkaVersion = new Version(properties.getKafkaVersion());
- }
- catch (Exception ex)
- {
- throw new IllegalStateException("Invalid kafka version: " + properties.getKafkaVersion(), ex);
- }
-
- threadPool = new ForkJoinPool(properties.getThreadPoolSize());
-
- FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
- backOffPolicy.setBackOffPeriod(properties.getRetry().getBackoffMillis());
-
- final SimpleRetryPolicy retryPolicy =
- new SimpleRetryPolicy(properties.getRetry().getMaxAttempts(),
- ImmutableMap.of(InterruptedException.class, false,
- Exception.class, true));
-
- retryTemplate = new RetryTemplate();
- retryTemplate.setBackOffPolicy(backOffPolicy);
- retryTemplate.setRetryPolicy(retryPolicy);
-
- cacheInitCounter.set(4);
-
- brokerPathCache = new PathChildrenCache(curatorFramework, ZkUtils.BrokerIdsPath(), true);
- brokerPathCache.getListenable().addListener(new BrokerListener());
- brokerPathCache.getListenable().addListener((f, e) -> {
- if (e.getType() == PathChildrenCacheEvent.Type.INITIALIZED)
- {
- cacheInitCounter.decrementAndGet();
- LOG.info("Broker cache initialized");
- }
- });
- brokerPathCache.start(StartMode.POST_INITIALIZED_EVENT);
-
- topicConfigPathCache = new PathChildrenCache(curatorFramework, ZkUtils.TopicConfigPath(), true);
- topicConfigPathCache.getListenable().addListener((f, e) -> {
- if (e.getType() == PathChildrenCacheEvent.Type.INITIALIZED)
- {
- cacheInitCounter.decrementAndGet();
- LOG.info("Topic configuration cache initialized");
- }
- });
- topicConfigPathCache.start(StartMode.POST_INITIALIZED_EVENT);
-
- topicTreeCache = new TreeCache(curatorFramework, ZkUtils.BrokerTopicsPath());
- topicTreeCache.getListenable().addListener((client, event) -> {
- if (event.getType() == TreeCacheEvent.Type.INITIALIZED)
- {
- cacheInitCounter.decrementAndGet();
- LOG.info("Topic tree cache initialized");
- }
- });
- topicTreeCache.start();
-
- consumerTreeCache = new TreeCache(curatorFramework, ZkUtils.ConsumersPath());
- consumerTreeCache.getListenable().addListener((client, event) -> {
- if (event.getType() == TreeCacheEvent.Type.INITIALIZED)
- {
- cacheInitCounter.decrementAndGet();
- LOG.info("Consumer tree cache initialized");
- }
- });
- consumerTreeCache.start();
-
- controllerNodeCache = new NodeCache(curatorFramework, ZkUtils.ControllerPath());
- controllerNodeCache.getListenable().addListener(this::updateController);
- controllerNodeCache.start(true);
- updateController();
+ public void start()
+ throws Exception
+ {
+ try
+ {
+ kafkaVersion = new Version(properties.getKafkaVersion());
+ }
+ catch (Exception ex)
+ {
+ throw new IllegalStateException("Invalid kafka version: " + properties.getKafkaVersion(), ex);
+ }
+
+ threadPool = new ForkJoinPool(properties.getThreadPoolSize());
+
+ FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
+ backOffPolicy.setBackOffPeriod(properties.getRetry().getBackoffMillis());
+
+ final SimpleRetryPolicy retryPolicy =
+ new SimpleRetryPolicy(properties.getRetry().getMaxAttempts(),
+ ImmutableMap.of(InterruptedException.class, false,
+ Exception.class, true));
+
+ retryTemplate = new RetryTemplate();
+ retryTemplate.setBackOffPolicy(backOffPolicy);
+ retryTemplate.setRetryPolicy(retryPolicy);
+
+ cacheInitCounter.set(4);
+
+ brokerPathCache = new PathChildrenCache(curatorFramework, ZkUtils.BrokerIdsPath(), true);
+ brokerPathCache.getListenable().addListener(new BrokerListener());
+ brokerPathCache.getListenable().addListener((f, e) -> {
+ if (e.getType() == PathChildrenCacheEvent.Type.INITIALIZED)
+ {
+ cacheInitCounter.decrementAndGet();
+ LOG.info("Broker cache initialized");
+ }
+ });
+ brokerPathCache.start(StartMode.POST_INITIALIZED_EVENT);
+
+ topicConfigPathCache = new PathChildrenCache(curatorFramework, ZkUtils.TopicConfigPath(), true);
+ topicConfigPathCache.getListenable().addListener((f, e) -> {
+ if (e.getType() == PathChildrenCacheEvent.Type.INITIALIZED)
+ {
+ cacheInitCounter.decrementAndGet();
+ LOG.info("Topic configuration cache initialized");
+ }
+ });
+ topicConfigPathCache.start(StartMode.POST_INITIALIZED_EVENT);
+
+ topicTreeCache = new TreeCache(curatorFramework, ZkUtils.BrokerTopicsPath());
+ topicTreeCache.getListenable().addListener((client, event) -> {
+ if (event.getType() == TreeCacheEvent.Type.INITIALIZED)
+ {
+ cacheInitCounter.decrementAndGet();
+ LOG.info("Topic tree cache initialized");
+ }
+ });
+ topicTreeCache.start();
+
+ consumerTreeCache = new TreeCache(curatorFramework, ZkUtils.ConsumersPath());
+ consumerTreeCache.getListenable().addListener((client, event) -> {
+ if (event.getType() == TreeCacheEvent.Type.INITIALIZED)
+ {
+ cacheInitCounter.decrementAndGet();
+ LOG.info("Consumer tree cache initialized");
+ }
+ });
+ consumerTreeCache.start();
+
+ controllerNodeCache = new NodeCache(curatorFramework, ZkUtils.ControllerPath());
+ controllerNodeCache.getListenable().addListener(this::updateController);
+ controllerNodeCache.start(true);
+ updateController();
}
private String clientId()
{
- return properties.getClientId();
+ return properties.getClientId();
+ }
+
+ @Override
+ public Version getKafkaVersion()
+ {
+ return kafkaVersion;
}
private void updateController()
{
- Optional.ofNullable(controllerNodeCache.getCurrentData())
- .map(data -> {
- try
- {
- Map controllerData = objectMapper.reader(Map.class).readValue(data.getData());
- return (Integer) controllerData.get("brokerid");
- }
- catch (IOException e)
- {
- LOG.error("Unable to read controller data", e);
- return null;
- }
- })
- .ifPresent(this::updateController);
+ Optional.ofNullable(controllerNodeCache.getCurrentData())
+ .map(data -> {
+ try
+ {
+ Map controllerData = objectMapper.reader(Map.class).readValue(data.getData());
+ return (Integer) controllerData.get("brokerid");
+ }
+ catch (IOException e)
+ {
+ LOG.error("Unable to read controller data", e);
+ return null;
+ }
+ })
+ .ifPresent(this::updateController);
}
private void updateController(int brokerId)
{
- brokerCache.values()
- .forEach(broker -> broker.setController(broker.getId() == brokerId));
+ brokerCache.values()
+ .forEach(broker -> broker.setController(broker.getId() == brokerId));
}
private void validateInitialized()
{
- if (cacheInitCounter.get() > 0)
- {
- throw new NotInitializedException();
- }
+ if (cacheInitCounter.get() > 0)
+ {
+ throw new NotInitializedException();
+ }
}
-
@PreDestroy
- public void stop() throws IOException
+ public void stop()
+ throws IOException
{
- consumerTreeCache.close();
- topicConfigPathCache.close();
- brokerPathCache.close();
- controllerNodeCache.close();
+ consumerTreeCache.close();
+ topicConfigPathCache.close();
+ brokerPathCache.close();
+ controllerNodeCache.close();
}
private int brokerId(ChildData input)
{
- return Integer.parseInt(StringUtils.substringAfter(input.getPath(), ZkUtils.BrokerIdsPath() + "/"));
+ return Integer.parseInt(StringUtils.substringAfter(input.getPath(), ZkUtils.BrokerIdsPath() + "/"));
}
private BrokerVO addBroker(BrokerVO broker)
{
- final BrokerVO oldBroker = brokerCache.put(broker.getId(), broker);
- LOG.info("Kafka broker {} was {}", broker.getId(), oldBroker == null ? "added" : "updated");
- return oldBroker;
+ final BrokerVO oldBroker = brokerCache.put(broker.getId(), broker);
+ LOG.info("Kafka broker {} was {}", broker.getId(), oldBroker == null ? "added" : "updated");
+ return oldBroker;
}
private BrokerVO removeBroker(int brokerId)
{
- final BrokerVO broker = brokerCache.remove(brokerId);
- LOG.info("Kafka broker {} was removed", broker.getId());
- return broker;
+ final BrokerVO broker = brokerCache.remove(brokerId);
+ LOG.info("Kafka broker {} was removed", broker.getId());
+ return broker;
}
@Override
public List<BrokerVO> getBrokers()
{
- validateInitialized();
- return brokerCache.values().stream().collect(Collectors.toList());
+ validateInitialized();
+ return brokerCache.values().stream().collect(Collectors.toList());
}
@Override
public Optional<BrokerVO> getBroker(int id)
{
- validateInitialized();
- return Optional.ofNullable(brokerCache.get(id));
+ validateInitialized();
+ return Optional.ofNullable(brokerCache.get(id));
}
private BrokerChannel brokerChannel(Integer brokerId)
{
- if (brokerId == null)
- {
- brokerId = randomBroker();
- if (brokerId == null)
- {
- throw new BrokerNotFoundException("No brokers available to select from");
- }
- }
+ if (brokerId == null)
+ {
+ brokerId = randomBroker();
+ if (brokerId == null)
+ {
+ throw new BrokerNotFoundException("No brokers available to select from");
+ }
+ }
- Integer finalBrokerId = brokerId;
- BrokerVO broker = getBroker(brokerId)
- .orElseThrow(() -> new BrokerNotFoundException("Broker " + finalBrokerId + " is not available"));
+ Integer finalBrokerId = brokerId;
+ BrokerVO broker = getBroker(brokerId)
+ .orElseThrow(() -> new BrokerNotFoundException("Broker " + finalBrokerId + " is not available"));
- return BrokerChannel.forBroker(broker.getHost(), broker.getPort());
+ return BrokerChannel.forBroker(broker.getHost(), broker.getPort());
}
private Integer randomBroker()
{
- if (brokerCache.size() > 0)
- {
- List<Integer> brokerIds = brokerCache.keySet().stream().collect(Collectors.toList());
- Collections.shuffle(brokerIds);
- return brokerIds.get(0);
- }
- else
- {
- return null;
- }
+ if (brokerCache.size() > 0)
+ {
+ List<Integer> brokerIds = brokerCache.keySet().stream().collect(Collectors.toList());
+ Collections.shuffle(brokerIds);
+ return brokerIds.get(0);
+ } else
+ {
+ return null;
+ }
}
public ClusterSummaryVO getClusterSummary()
{
- return getClusterSummary(getTopics());
+ return getClusterSummary(getTopics());
}
@Override
- public ClusterSummaryVO getClusterSummary(Collection<TopicVO> topics) {
- final ClusterSummaryVO topicSummary = topics.stream()
- .map(topic -> {
- ClusterSummaryVO summary = new ClusterSummaryVO();
- summary.setPartitionCount(topic.getPartitions().size());
- summary.setUnderReplicatedCount(topic.getUnderReplicatedPartitions().size());
- summary.setPreferredReplicaPercent(topic.getPreferredReplicaPercent());
- topic.getPartitions()
- .forEach(partition -> {
- if (partition.getLeader() != null) {
- summary.addBrokerLeaderPartition(partition.getLeader().getId());
- }
- if (partition.getPreferredLeader() != null) {
- summary.addBrokerPreferredLeaderPartition(partition.getPreferredLeader().getId());
- }
- partition.getReplicas()
- .forEach(replica -> summary.addExpectedBrokerId(replica.getId()));
- });
- return summary;
- })
- .reduce((s1, s2) -> {
- s1.setPartitionCount(s1.getPartitionCount() + s2.getPartitionCount());
- s1.setUnderReplicatedCount(s1.getUnderReplicatedCount() + s2.getUnderReplicatedCount());
- s1.setPreferredReplicaPercent(s1.getPreferredReplicaPercent() + s2.getPreferredReplicaPercent());
- s2.getBrokerLeaderPartitionCount().forEach(s1::addBrokerLeaderPartition);
- s2.getBrokerPreferredLeaderPartitionCount().forEach(s1::addBrokerPreferredLeaderPartition);
- return s1;
- })
- .orElseGet(ClusterSummaryVO::new);
- topicSummary.setTopicCount(topics.size());
- topicSummary.setPreferredReplicaPercent(topicSummary.getPreferredReplicaPercent() / topics.size());
- return topicSummary;
+ public ClusterSummaryVO getClusterSummary(Collection<TopicVO> topics)
+ {
+ final ClusterSummaryVO topicSummary = topics.stream()
+ .map(topic -> {
+ ClusterSummaryVO summary = new ClusterSummaryVO();
+ summary.setPartitionCount(topic.getPartitions().size());
+ summary.setUnderReplicatedCount(topic.getUnderReplicatedPartitions().size());
+ summary.setPreferredReplicaPercent(topic.getPreferredReplicaPercent());
+ topic.getPartitions()
+ .forEach(partition -> {
+ if (partition.getLeader() != null)
+ {
+ summary.addBrokerLeaderPartition(partition.getLeader().getId());
+ }
+ if (partition.getPreferredLeader() != null)
+ {
+ summary.addBrokerPreferredLeaderPartition(partition.getPreferredLeader().getId());
+ }
+ partition.getReplicas()
+ .forEach(replica -> summary.addExpectedBrokerId(replica.getId()));
+ });
+ return summary;
+ })
+ .reduce((s1, s2) -> {
+ s1.setPartitionCount(s1.getPartitionCount() + s2.getPartitionCount());
+ s1.setUnderReplicatedCount(s1.getUnderReplicatedCount() + s2.getUnderReplicatedCount());
+ s1.setPreferredReplicaPercent(s1.getPreferredReplicaPercent() + s2.getPreferredReplicaPercent());
+ s2.getBrokerLeaderPartitionCount().forEach(s1::addBrokerLeaderPartition);
+ s2.getBrokerPreferredLeaderPartitionCount().forEach(s1::addBrokerPreferredLeaderPartition);
+ return s1;
+ })
+ .orElseGet(ClusterSummaryVO::new);
+ topicSummary.setTopicCount(topics.size());
+ topicSummary.setPreferredReplicaPercent(topicSummary.getPreferredReplicaPercent() / topics.size());
+ return topicSummary;
}
@Override
public List<TopicVO> getTopics()
{
- validateInitialized();
- return getTopicMetadata().values().stream()
- .sorted(Comparator.comparing(TopicVO::getName))
- .collect(Collectors.toList());
+ validateInitialized();
+ return getTopicMetadata().values().stream()
+ .sorted(Comparator.comparing(TopicVO::getName))
+ .collect(Collectors.toList());
}
@Override
public Optional<TopicVO> getTopic(String topic)
{
- validateInitialized();
- final Optional<TopicVO> topicVO = Optional.ofNullable(getTopicMetadata(topic).get(topic));
- topicVO.ifPresent(
- vo -> {
- getTopicPartitionSizes(vo, kafka.api.OffsetRequest.LatestTime())
- .entrySet()
- .forEach(entry -> vo.getPartition(entry.getKey()).ifPresent(p -> p.setSize(entry.getValue())));
- getTopicPartitionSizes(vo, kafka.api.OffsetRequest.EarliestTime())
- .entrySet()
- .forEach(entry -> vo.getPartition(entry.getKey()).ifPresent(p -> p.setFirstOffset(entry.getValue())));
- }
- );
- return topicVO;
+ validateInitialized();
+ final Optional<TopicVO> topicVO = Optional.ofNullable(getTopicMetadata(topic).get(topic));
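+ // Brokers newer than 0.8.2 report partition sizes through the new consumer API;
+ // older brokers fall back to the legacy offset requests below.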
+ if (kafkaVersion.compareTo(new Version(0, 8, 2)) > 0)
+ {
+ topicVO.ifPresent(vo -> {
+ vo.setPartitions(getTopicPartitionSizes(vo));
+ });
+ } else
+ {
+ topicVO.ifPresent(
+ vo -> {
+ getTopicPartitionSizes(vo, kafka.api.OffsetRequest.LatestTime())
+ .entrySet()
+ .forEach(entry -> vo.getPartition(entry.getKey()).ifPresent(p -> p.setSize(entry.getValue())));
+ getTopicPartitionSizes(vo, kafka.api.OffsetRequest.EarliestTime())
+ .entrySet()
+ .forEach(entry -> vo.getPartition(entry.getKey()).ifPresent(p -> p.setFirstOffset(entry.getValue())));
+ }
+ );
+ }
+ return topicVO;
}
private Map<String, TopicVO> getTopicMetadata(String... topics)
{
- if (kafkaVersion.compareTo(new Version(0, 9, 0)) >= 0)
- {
- return retryTemplate.execute(
- context -> brokerChannel(null)
- .execute(channel -> getTopicMetadata(channel, topics)));
- }
- else
- {
- Stream<String> topicStream;
- if (topics == null || topics.length == 0)
- {
- topicStream =
- Optional.ofNullable(
- topicTreeCache.getCurrentChildren(ZkUtils.BrokerTopicsPath()))
- .map(Map::keySet)
- .map(Collection::stream)
- .orElse(Stream.empty());
- }
- else
- {
- topicStream = Stream.of(topics);
- }
-
- return topicStream
- .map(this::getTopicZkData)
- .filter(Objects::nonNull)
- .collect(Collectors.toMap(TopicVO::getName, topic -> topic));
- }
+ if (kafkaVersion.compareTo(new Version(0, 9, 0)) >= 0)
+ {
+ return retryTemplate.execute(
+ context -> brokerChannel(null)
+ .execute(channel -> getTopicMetadata(channel, topics)));
+ } else
+ {
+ Stream<String> topicStream;
+ if (topics == null || topics.length == 0)
+ {
+ topicStream =
+ Optional.ofNullable(
+ topicTreeCache.getCurrentChildren(ZkUtils.BrokerTopicsPath()))
+ .map(Map::keySet)
+ .map(Collection::stream)
+ .orElse(Stream.empty());
+ } else
+ {
+ topicStream = Stream.of(topics);
+ }
+
+ return topicStream
+ .map(this::getTopicZkData)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toMap(TopicVO::getName, topic -> topic));
+ }
}
private TopicVO getTopicZkData(String topic)
{
- return Optional.ofNullable(topicTreeCache.getCurrentData(ZkUtils.getTopicPath(topic)))
- .map(this::parseZkTopic)
- .orElse(null);
+ return Optional.ofNullable(topicTreeCache.getCurrentData(ZkUtils.getTopicPath(topic)))
+ .map(this::parseZkTopic)
+ .orElse(null);
}
public TopicVO parseZkTopic(ChildData input)
{
- try
- {
- final TopicVO topic = new TopicVO(StringUtils.substringAfterLast(input.getPath(), "/"));
+ try
+ {
+ final TopicVO topic = new TopicVO(StringUtils.substringAfterLast(input.getPath(), "/"));
- final TopicRegistrationVO topicRegistration =
- objectMapper.reader(TopicRegistrationVO.class).readValue(input.getData());
+ final TopicRegistrationVO topicRegistration =
+ objectMapper.reader(TopicRegistrationVO.class).readValue(input.getData());
- topic.setConfig(
- Optional.ofNullable(topicConfigPathCache.getCurrentData(ZkUtils.TopicConfigPath() + "/" + topic.getName()))
- .map(this::readTopicConfig)
- .orElse(Collections.emptyMap()));
+ topic.setConfig(
+ Optional.ofNullable(topicConfigPathCache.getCurrentData(ZkUtils.TopicConfigPath() + "/" + topic.getName()))
+ .map(this::readTopicConfig)
+ .orElse(Collections.emptyMap()));
- for (Map.Entry<Integer, List<Integer>> entry : topicRegistration.getReplicas().entrySet())
- {
- final int partitionId = entry.getKey();
- final List<Integer> partitionBrokerIds = entry.getValue();
+ for (Map.Entry<Integer, List<Integer>> entry : topicRegistration.getReplicas().entrySet())
+ {
+ final int partitionId = entry.getKey();
+ final List<Integer> partitionBrokerIds = entry.getValue();
- final TopicPartitionVO partition = new TopicPartitionVO(partitionId);
+ final TopicPartitionVO partition = new TopicPartitionVO(partitionId);
- final Optional<TopicPartitionStateVO> partitionState = partitionState(topic.getName(), partition.getId());
+ final Optional<TopicPartitionStateVO> partitionState = partitionState(topic.getName(), partition.getId());
- partitionBrokerIds.stream()
- .map(brokerId -> {
- TopicPartitionVO.PartitionReplica replica = new TopicPartitionVO.PartitionReplica();
- replica.setId(brokerId);
- replica.setInService(partitionState.map(ps -> ps.getIsr().contains(brokerId)).orElse(false));
- replica.setLeader(partitionState.map(ps -> brokerId == ps.getLeader()).orElse(false));
- return replica;
- })
- .forEach(partition::addReplica);
+ partitionBrokerIds.stream()
+ .map(brokerId -> {
+ TopicPartitionVO.PartitionReplica replica = new TopicPartitionVO.PartitionReplica();
+ replica.setId(brokerId);
+ replica.setInService(partitionState.map(ps -> ps.getIsr().contains(brokerId)).orElse(false));
+ replica.setLeader(partitionState.map(ps -> brokerId == ps.getLeader()).orElse(false));
+ return replica;
+ })
+ .forEach(partition::addReplica);
- topic.addPartition(partition);
- }
+ topic.addPartition(partition);
+ }
- // todo: get partition sizes here as single bulk request?
+ // todo: get partition sizes here as single bulk request?
- return topic;
- }
- catch (IOException e)
- {
- throw Throwables.propagate(e);
- }
+ return topic;
+ }
+ catch (IOException e)
+ {
+ throw Throwables.propagate(e);
+ }
}
private Map<String, TopicVO> getTopicMetadata(BlockingChannel channel, String... topics)
{
- final TopicMetadataRequest request =
- new TopicMetadataRequest((short) 0, 0, clientId(), Arrays.asList(topics));
-
- LOG.debug("Sending topic metadata request: {}", request);
-
- channel.send(request);
- final kafka.api.TopicMetadataResponse underlyingResponse =
- kafka.api.TopicMetadataResponse.readFrom(channel.receive().buffer());
-
- LOG.debug("Received topic metadata response: {}", underlyingResponse);
-
- TopicMetadataResponse response = new TopicMetadataResponse(underlyingResponse);
- return response.topicsMetadata().stream()
- .filter(tmd -> tmd.errorCode() == ErrorMapping.NoError())
- .map(this::processTopicMetadata)
- .collect(Collectors.toMap(TopicVO::getName, t -> t));
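+ // Topic metadata now comes from the high-level 0.11 consumer instead of a raw
+ // TopicMetadataRequest over a BlockingChannel.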
+ return kafkaHighLevelConsumer.getTopicsInfo(topics);
}
private TopicVO processTopicMetadata(TopicMetadata tmd)
{
- TopicVO topic = new TopicVO(tmd.topic());
+ TopicVO topic = new TopicVO(tmd.topic());
- topic.setConfig(
- Optional.ofNullable(topicConfigPathCache.getCurrentData(ZkUtils.TopicConfigPath() + "/" + topic.getName()))
- .map(this::readTopicConfig)
- .orElse(Collections.emptyMap()));
+ topic.setConfig(
+ Optional.ofNullable(topicConfigPathCache.getCurrentData(ZkUtils.TopicConfigPath() + "/" + topic.getName()))
+ .map(this::readTopicConfig)
+ .orElse(Collections.emptyMap()));
- topic.setPartitions(
- tmd.partitionsMetadata().stream()
- .map((pmd) -> parsePartitionMetadata(tmd.topic(), pmd))
- .collect(Collectors.toMap(TopicPartitionVO::getId, p -> p))
- );
- return topic;
+ topic.setPartitions(
+ tmd.partitionsMetadata().stream()
+ .map((pmd) -> parsePartitionMetadata(tmd.topic(), pmd))
+ .collect(Collectors.toMap(TopicPartitionVO::getId, p -> p))
+ );
+ return topic;
}
private TopicPartitionVO parsePartitionMetadata(String topic, PartitionMetadata pmd)
{
- TopicPartitionVO partition = new TopicPartitionVO(pmd.partitionId());
- if (pmd.leader() != null)
- {
- partition.addReplica(new TopicPartitionVO.PartitionReplica(pmd.leader().id(), true, true));
- }
+ TopicPartitionVO partition = new TopicPartitionVO(pmd.partitionId());
+ if (pmd.leader() != null)
+ {
+ partition.addReplica(new TopicPartitionVO.PartitionReplica(pmd.leader().id(), true, true));
+ }
- final List<Integer> isr = getIsr(topic, pmd);
- pmd.replicas().stream()
- .map(replica -> new TopicPartitionVO.PartitionReplica(replica.id(), isr.contains(replica.id()), false))
- .forEach(partition::addReplica);
- return partition;
+ final List<Integer> isr = getIsr(topic, pmd);
+ pmd.replicas().stream()
+ .map(replica -> new TopicPartitionVO.PartitionReplica(replica.id(), isr.contains(replica.id()), false))
+ .forEach(partition::addReplica);
+ return partition;
}
private List<Integer> getIsr(String topic, PartitionMetadata pmd)
{
- return pmd.isr().stream().map(Broker::id).collect(Collectors.toList());
+ return pmd.isr().stream().map(Broker::id).collect(Collectors.toList());
}
private Map<String, String> readTopicConfig(ChildData d)
{
- try
- {
- final Map configData = objectMapper.reader(Map.class).readValue(d.getData());
- return (Map<String, String>) configData.get("config");
- }
- catch (IOException e)
- {
- throw Throwables.propagate(e);
- }
+ try
+ {
+ final Map configData = objectMapper.reader(Map.class).readValue(d.getData());
+ return (Map<String, String>) configData.get("config");
+ }
+ catch (IOException e)
+ {
+ throw Throwables.propagate(e);
+ }
}
-
private Optional<TopicPartitionStateVO> partitionState(String topicName, int partitionId)
- throws IOException
+ throws IOException
{
- final Optional<byte[]> partitionData = Optional.ofNullable(topicTreeCache.getCurrentData(
- ZkUtils.getTopicPartitionLeaderAndIsrPath(topicName, partitionId)))
- .map(ChildData::getData);
- if (partitionData.isPresent())
- {
- return Optional.ofNullable(objectMapper.reader(TopicPartitionStateVO.class).readValue(partitionData.get()));
- }
- else
- {
- return Optional.empty();
- }
+ final Optional<byte[]> partitionData = Optional.ofNullable(topicTreeCache.getCurrentData(
+ ZkUtils.getTopicPartitionLeaderAndIsrPath(topicName, partitionId)))
+ .map(ChildData::getData);
+ if (partitionData.isPresent())
+ {
+ return Optional.ofNullable(objectMapper.reader(TopicPartitionStateVO.class).readValue(partitionData.get()));
+ } else
+ {
+ return Optional.empty();
+ }
}
@Override
public List<ConsumerVO> getConsumers()
{
- validateInitialized();
- return getConsumerStream(null).collect(Collectors.toList());
+ validateInitialized();
+ return getConsumerStream(null).collect(Collectors.toList());
}
@Override
public List<ConsumerVO> getConsumers(final TopicVO topic)
{
- validateInitialized();
- return getConsumerStream(topic)
- .filter(consumer -> consumer.getTopic(topic.getName()) != null)
- .collect(Collectors.toList());
+ validateInitialized();
+ return getConsumerStream(topic)
+ .filter(consumer -> consumer.getTopic(topic.getName()) != null)
+ .collect(Collectors.toList());
}
@Override
public List<ConsumerVO> getConsumers(final String topic)
{
- return getConsumers(getTopic(topic).get());
+ return getConsumers(getTopic(topic).get());
}
private Stream<ConsumerVO> getConsumerStream(TopicVO topic)
{
- return consumerTreeCache.getCurrentChildren(ZkUtils.ConsumersPath()).keySet().stream()
- .map(g -> getConsumerByTopic(g, topic))
- .filter(Optional::isPresent)
- .map(Optional::get)
- .sorted(Comparator.comparing(ConsumerVO::getGroupId));
+ return consumerTreeCache.getCurrentChildren(ZkUtils.ConsumersPath()).keySet().stream()
+ .map(g -> getConsumerByTopic(g, topic))
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .sorted(Comparator.comparing(ConsumerVO::getGroupId));
}
@Override
public Optional<ConsumerVO> getConsumer(String groupId)
{
- validateInitialized();
- return getConsumerByTopic(groupId, null);
+ validateInitialized();
+ return getConsumerByTopic(groupId, null);
}
@Override
public Optional<ConsumerVO> getConsumerByTopicName(String groupId, String topicName)
{
- return getConsumerByTopic(groupId, Optional.of(topicName).flatMap(this::getTopic).orElse(null));
+ return getConsumerByTopic(groupId, Optional.of(topicName).flatMap(this::getTopic).orElse(null));
}
@Override
public Optional<ConsumerVO> getConsumerByTopic(String groupId, TopicVO topic)
{
- final ConsumerVO consumer = new ConsumerVO(groupId);
- final ZKGroupDirs groupDirs = new ZKGroupDirs(groupId);
-
- if (consumerTreeCache.getCurrentData(groupDirs.consumerGroupDir()) == null) return Optional.empty();
-
- // todo: get number of threads in each instance (subscription -> topic -> # threads)
- Optional.ofNullable(consumerTreeCache.getCurrentChildren(groupDirs.consumerRegistryDir()))
- .ifPresent(
- children ->
- children.keySet().stream()
- .map(id -> readConsumerRegistration(groupDirs, id))
- .forEach(consumer::addActiveInstance));
-
- Stream<String> topicStream = null;
-
- if (topic != null)
- {
- if (consumerTreeCache.getCurrentData(groupDirs.consumerGroupDir() + "/owners/" + topic.getName()) != null)
- {
- topicStream = Stream.of(topic.getName());
- }
- else
- {
- topicStream = Stream.empty();
- }
- }
- else
- {
- topicStream = Optional.ofNullable(
- consumerTreeCache.getCurrentChildren(groupDirs.consumerGroupDir() + "/owners"))
- .map(Map::keySet)
- .map(Collection::stream)
- .orElse(Stream.empty());
- }
-
- topicStream
- .map(ConsumerTopicVO::new)
- .forEach(consumerTopic -> {
- getConsumerPartitionStream(groupId, consumerTopic.getTopic(), topic)
- .forEach(consumerTopic::addOffset);
- consumer.addTopic(consumerTopic);
- });
-
- return Optional.of(consumer);
+ final ConsumerVO consumer = new ConsumerVO(groupId);
+ final ZKGroupDirs groupDirs = new ZKGroupDirs(groupId);
+
+ if (consumerTreeCache.getCurrentData(groupDirs.consumerGroupDir()) == null)
+ { return Optional.empty(); }
+
+ // todo: get number of threads in each instance (subscription -> topic -> # threads)
+ Optional.ofNullable(consumerTreeCache.getCurrentChildren(groupDirs.consumerRegistryDir()))
+ .ifPresent(
+ children ->
+ children.keySet().stream()
+ .map(id -> readConsumerRegistration(groupDirs, id))
+ .forEach(consumer::addActiveInstance));
+
+ Stream<String> topicStream = null;
+
+ if (topic != null)
+ {
+ if (consumerTreeCache.getCurrentData(groupDirs.consumerGroupDir() + "/owners/" + topic.getName()) != null)
+ {
+ topicStream = Stream.of(topic.getName());
+ } else
+ {
+ topicStream = Stream.empty();
+ }
+ } else
+ {
+ topicStream = Optional.ofNullable(
+ consumerTreeCache.getCurrentChildren(groupDirs.consumerGroupDir() + "/owners"))
+ .map(Map::keySet)
+ .map(Collection::stream)
+ .orElse(Stream.empty());
+ }
+
+ topicStream
+ .map(ConsumerTopicVO::new)
+ .forEach(consumerTopic -> {
+ getConsumerPartitionStream(groupId, consumerTopic.getTopic(), topic)
+ .forEach(consumerTopic::addOffset);
+ consumer.addTopic(consumerTopic);
+ });
+
+ return Optional.of(consumer);
+ }
+
+ @Override
+ public List<MessageVO> getMessages(TopicPartition topicPartition, long offset, long count)
+ {
+
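+ // Fetch up to `count` records starting at `offset` via the 0.11 consumer API and
+ // map each onto Kafdrop's MessageVO; record headers are surfaced through the
+ // compressionCodec field here.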
+ List<ConsumerRecord<String, String>> records = kafkaHighLevelConsumer.getLatestRecords(topicPartition, offset, count);
+ List<MessageVO> messageVOS = Lists.newArrayList();
+ for (ConsumerRecord<String, String> record : records)
+ {
+ MessageVO messageVo = new MessageVO();
+ messageVo.setKey(record.key());
+ messageVo.setMessage(record.value());
+ messageVo.setChecksum(record.checksum());
+ messageVo.setCompressionCodec(record.headers().toString());
+ messageVo.setValid(true);
+
+ messageVOS.add(messageVo);
+ }
+ return messageVOS;
}
private ConsumerRegistrationVO readConsumerRegistration(ZKGroupDirs groupDirs, String id)
{
- try
- {
- ChildData data = consumerTreeCache.getCurrentData(groupDirs.consumerRegistryDir() + "/" + id);
- final Map consumerData = objectMapper.reader(Map.class).readValue(data.getData());
- Map<String, Integer> subscriptions = (Map<String, Integer>) consumerData.get("subscription");
+ try
+ {
+ ChildData data = consumerTreeCache.getCurrentData(groupDirs.consumerRegistryDir() + "/" + id);
+ final Map consumerData = objectMapper.reader(Map.class).readValue(data.getData());
+ Map<String, Integer> subscriptions = (Map<String, Integer>) consumerData.get("subscription");
- ConsumerRegistrationVO vo = new ConsumerRegistrationVO(id);
- vo.setSubscriptions(subscriptions);
- return vo;
- }
- catch (IOException ex)
- {
- throw Throwables.propagate(ex);
- }
+ ConsumerRegistrationVO vo = new ConsumerRegistrationVO(id);
+ vo.setSubscriptions(subscriptions);
+ return vo;
+ }
+ catch (IOException ex)
+ {
+ throw Throwables.propagate(ex);
+ }
}
private Stream<ConsumerPartitionVO> getConsumerPartitionStream(String groupId,
- String topicName,
- TopicVO topicOpt)
+ String topicName,
+ TopicVO topicOpt)
{
- ZKGroupTopicDirs groupTopicDirs = new ZKGroupTopicDirs(groupId, topicName);
+ ZKGroupTopicDirs groupTopicDirs = new ZKGroupTopicDirs(groupId, topicName);
- if (topicOpt == null || topicOpt.getName().equals(topicName))
- {
- topicOpt = getTopic(topicName).orElse(null);
- }
+ if (topicOpt == null || topicOpt.getName().equals(topicName))
+ {
+ topicOpt = getTopic(topicName).orElse(null);
+ }
- if (topicOpt != null)
- {
- final TopicVO topic = topicOpt;
+ if (topicOpt != null)
+ {
+ final TopicVO topic = topicOpt;
- Map<Integer, Long> consumerOffsets = getConsumerOffsets(groupId, topic);
+ Map<Integer, Long> consumerOffsets = getConsumerOffsets(groupId, topic);
- return topic.getPartitions().stream()
- .map(partition -> {
- int partitionId = partition.getId();
+ return topic.getPartitions().stream()
+ .map(partition -> {
+ int partitionId = partition.getId();
- final ConsumerPartitionVO consumerPartition = new ConsumerPartitionVO(groupId, topicName, partitionId);
- consumerPartition.setOwner(
- Optional.ofNullable(
- consumerTreeCache.getCurrentData(groupTopicDirs.consumerOwnerDir() + "/" + partitionId))
- .map(data -> new String(data.getData()))
- .orElse(null));
+ final ConsumerPartitionVO consumerPartition = new ConsumerPartitionVO(groupId, topicName, partitionId);
+ consumerPartition.setOwner(
+ Optional.ofNullable(
+ consumerTreeCache.getCurrentData(groupTopicDirs.consumerOwnerDir() + "/" + partitionId))
+ .map(data -> new String(data.getData()))
+ .orElse(null));
- consumerPartition.setOffset(consumerOffsets.getOrDefault(partitionId, -1L));
+ consumerPartition.setOffset(consumerOffsets.getOrDefault(partitionId, -1L));
- final Optional<TopicPartitionVO> topicPartition = topic.getPartition(partitionId);
- consumerPartition.setSize(topicPartition.map(TopicPartitionVO::getSize).orElse(-1L));
- consumerPartition.setFirstOffset(topicPartition.map(TopicPartitionVO::getFirstOffset).orElse(-1L));
+ final Optional<TopicPartitionVO> topicPartition = topic.getPartition(partitionId);
+ consumerPartition.setSize(topicPartition.map(TopicPartitionVO::getSize).orElse(-1L));
+ consumerPartition.setFirstOffset(topicPartition.map(TopicPartitionVO::getFirstOffset).orElse(-1L));
- return consumerPartition;
- });
- }
- else
- {
- return Stream.empty();
- }
+ return consumerPartition;
+ });
+ } else
+ {
+ return Stream.empty();
+ }
}
private Map<Integer, Long> getConsumerOffsets(String groupId, TopicVO topic)
{
- try
- {
- // Kafka doesn't really give us an indication of whether a consumer is
- // using Kafka or Zookeeper based offset tracking. So look up the offsets
- // for both and assume that the largest offset is the correct one.
-
- ForkJoinTask