diff --git a/mqtt-broker/pom.xml b/mqtt-broker/pom.xml index 2293b02ff..6e93eb653 100644 --- a/mqtt-broker/pom.xml +++ b/mqtt-broker/pom.xml @@ -20,7 +20,7 @@ pulsar-protocol-handler-mqtt-parent io.streamnative.pulsar.handlers - 3.4.0-SNAPSHOT + 4.2.0-SNAPSHOT 4.0.0 pulsar-protocol-handler-mqtt diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/rest/MQTTAdditionalServlet.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/rest/MQTTAdditionalServlet.java index dd37d580d..1472a23ee 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/rest/MQTTAdditionalServlet.java +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/rest/MQTTAdditionalServlet.java @@ -16,7 +16,6 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithPulsarService; import org.apache.pulsar.common.configuration.PulsarConfiguration; -import org.eclipse.jetty.servlet.ServletHolder; /** * MQTT additional servlet. @@ -36,8 +35,8 @@ public String getBasePath() { } @Override - public ServletHolder getServletHolder() { - return new ServletHolder(new MQTTServiceServlet(pulsarService)); + public Object getServletInstance() { + return new MQTTServiceServlet(pulsarService); } @Override diff --git a/mqtt-common/pom.xml b/mqtt-common/pom.xml index 707ea52b6..d1ef745ef 100644 --- a/mqtt-common/pom.xml +++ b/mqtt-common/pom.xml @@ -20,7 +20,7 @@ pulsar-protocol-handler-mqtt-parent io.streamnative.pulsar.handlers - 3.4.0-SNAPSHOT + 4.2.0-SNAPSHOT 4.0.0 pulsar-protocol-handler-mqtt-common diff --git a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/event/PulsarTopicChangeListener.java b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/event/PulsarTopicChangeListener.java index ffbdba8c7..961bf64a2 100644 --- a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/event/PulsarTopicChangeListener.java +++ b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/event/PulsarTopicChangeListener.java @@ -15,7 +15,7 @@ import io.streamnative.pulsar.handlers.mqtt.common.utils.EventParserUtils; import java.util.regex.Pattern; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; diff --git a/mqtt-proxy/pom.xml b/mqtt-proxy/pom.xml index 47fdb6df7..807060904 100644 --- a/mqtt-proxy/pom.xml +++ b/mqtt-proxy/pom.xml @@ -20,7 +20,7 @@ pulsar-protocol-handler-mqtt-parent io.streamnative.pulsar.handlers - 3.4.0-SNAPSHOT + 4.2.0-SNAPSHOT 4.0.0 pulsar-protocol-handler-mqtt-proxy diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/authentication/mtls/AuthenticationProviderMTls.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/authentication/mtls/AuthenticationProviderMTls.java index 1095e3769..70acda530 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/authentication/mtls/AuthenticationProviderMTls.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/authentication/mtls/AuthenticationProviderMTls.java @@ -39,8 +39,8 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.codec.binary.Hex; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.lang.text.StrBuilder; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.text.StrBuilder; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationProvider; diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/WebService.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/WebService.java index 70a3e3c1c..edb1e93b9 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/WebService.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/WebService.java @@ -14,7 +14,6 @@ package io.streamnative.pulsar.handlers.mqtt.proxy.web; import io.prometheus.client.CollectorRegistry; -import io.prometheus.client.jetty.JettyStatisticsCollector; import io.streamnative.pulsar.handlers.mqtt.common.MQTTCommonConfiguration; import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyService; import io.streamnative.pulsar.handlers.mqtt.proxy.impl.MQTTProxyException; @@ -32,26 +31,25 @@ import org.apache.pulsar.broker.web.UnrecognizedPropertyExceptionMapper; import org.apache.pulsar.broker.web.WebExecutorThreadPool; import org.apache.pulsar.common.util.PulsarSslFactory; +import org.apache.pulsar.jetty.metrics.JettyStatisticsCollector; +import org.eclipse.jetty.ee8.servlet.ServletContextHandler; +import org.eclipse.jetty.ee8.servlet.ServletHolder; import org.eclipse.jetty.server.ConnectionFactory; -import org.eclipse.jetty.server.ConnectionLimit; import org.eclipse.jetty.server.ForwardedRequestCustomizer; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConnectionFactory; +import org.eclipse.jetty.server.NetworkConnectionLimit; import org.eclipse.jetty.server.ProxyConnectionFactory; -import org.eclipse.jetty.server.RequestLog; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.server.handler.ContextHandlerCollection; import org.eclipse.jetty.server.handler.DefaultHandler; -import org.eclipse.jetty.server.handler.HandlerCollection; -import org.eclipse.jetty.server.handler.RequestLogHandler; +import org.eclipse.jetty.server.handler.QoSHandler; import org.eclipse.jetty.server.handler.ResourceHandler; import org.eclipse.jetty.server.handler.StatisticsHandler; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; -import org.eclipse.jetty.util.resource.Resource; +import org.eclipse.jetty.util.resource.ResourceFactory; import org.glassfish.jersey.media.multipart.MultiPartFeature; import org.glassfish.jersey.server.ResourceConfig; import org.glassfish.jersey.servlet.ServletContainer; @@ -99,7 +97,7 @@ public WebService(MQTTProxyService proxyService) { config.getHttpServerThreadPoolQueueSize()); this.server = new Server(webServiceExecutor); if (config.getMaxHttpServerConnections() > 0) { - server.addBean(new ConnectionLimit(config.getMaxHttpServerConnections(), server)); + server.addBean(new NetworkConnectionLimit(config.getMaxHttpServerConnections(), server)); } List connectors = new ArrayList<>(); @@ -176,14 +174,15 @@ public void addServlet(String path, ServletHolder servletHolder, boolean require if (attributeMap != null) { attributeMap.forEach(servletContextHandler::setAttribute); } - handlers.add(servletContextHandler); + handlers.add(servletContextHandler.get()); } public void addStaticResources(String basePath, String resourcePath) { ContextHandler capHandler = new ContextHandler(); capHandler.setContextPath(basePath); ResourceHandler resHandler = new ResourceHandler(); - resHandler.setBaseResource(Resource.newClassPathResource(resourcePath)); + ResourceFactory resourceFactory = ResourceFactory.root(); + resHandler.setBaseResource(resourceFactory.newClassLoaderResource(resourcePath, true)); resHandler.setEtags(true); resHandler.setCacheControl(WebService.HANDLER_CACHE_CONTROL); capHandler.setHandler(resHandler); @@ -192,19 +191,15 @@ public void addStaticResources(String basePath, String resourcePath) { public void start() throws MQTTProxyException { try { - RequestLogHandler requestLogHandler = new RequestLogHandler(); - RequestLog requestLogger = JettyRequestLogFactory.createRequestLogger(false, server); - requestLogHandler.setRequestLog(requestLogger); - handlers.add(0, new ContextHandlerCollection()); - handlers.add(requestLogHandler); + server.setRequestLog(JettyRequestLogFactory.createRequestLogger(false, server)); ContextHandlerCollection contexts = new ContextHandlerCollection(); - contexts.setHandlers(handlers.toArray(new Handler[handlers.size()])); + contexts.setHandlers(handlers); Handler handlerForContexts = GzipHandlerUtil.wrapWithGzipHandler(contexts, config.getHttpServerGzipCompressionExcludedPaths()); - HandlerCollection handlerCollection = new HandlerCollection(); - handlerCollection.setHandlers(new Handler[] {handlerForContexts, new DefaultHandler(), requestLogHandler}); + Handler.Collection handlerCollection = new Handler.Sequence(); + handlerCollection.setHandlers(handlerForContexts, new DefaultHandler()); // Metrics handler StatisticsHandler stats = new StatisticsHandler(); @@ -216,7 +211,14 @@ public void start() throws MQTTProxyException { // Already registered. Eg: in unit tests } - server.setHandler(stats); + Handler serverHandler = stats; + if (config.getMaxConcurrentHttpRequests() > 0) { + QoSHandler qoSHandler = new QoSHandler(serverHandler); + qoSHandler.setMaxRequestCount(config.getMaxConcurrentHttpRequests()); + serverHandler = qoSHandler; + } + server.setHandler(serverHandler); + server.start(); if (httpConnector != null) { diff --git a/pom.xml b/pom.xml index a8656297a..500f4e089 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> io.streamnative.pulsar.handlers pulsar-protocol-handler-mqtt-parent - 3.4.0-SNAPSHOT + 4.2.0-SNAPSHOT StreamNative :: Pulsar Protocol Handler :: MoP Parent Parent for MQTT on Pulsar implemented using Pulsar Protocol Handler. diff --git a/tests/pom.xml b/tests/pom.xml index d036d25f4..d2ef8e747 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -20,7 +20,7 @@ pulsar-protocol-handler-mqtt-parent io.streamnative.pulsar.handlers - 3.4.0-SNAPSHOT + 4.2.0-SNAPSHOT 4.0.0 pulsar-protocol-handler-mqtt-tests