diff --git a/README.md b/README.md
index 8fbf748c289..92b0ce3a9d4 100644
--- a/README.md
+++ b/README.md
@@ -148,7 +148,8 @@ Detailed config refer to [Install HertzBeat via Docker](https://hertzbeat.apache
manager-host: ${MANAGER_HOST:127.0.0.1}
manager-port: ${MANAGER_PORT:1158}
```
- - If you need MySQL, OceanBase, Oracle, or DB2 monitoring with external JDBC drivers from `ext-lib`, use the JVM collector package.
+ - If you do not provide JDBC drivers in `ext-lib`, MySQL, MariaDB, and OceanBase can use the built-in query engine and run on the native collector package as well. TiDB follows the same rule for its SQL query metric set.
+ - If `mysql-connector-j` is present in `ext-lib`, the built-in server collector or JVM collector automatically prefers JDBC after restart for MySQL, MariaDB, and OceanBase. TiDB follows the same rule for its SQL query metric set, while its HTTP metrics are unchanged. Oracle and DB2 still require the JVM collector package because they depend on external JDBC drivers.
- Run `$ ./bin/startup.sh ` or `bin/startup.bat` for the JVM collector package. Run `$ ./bin/startup.sh ` for Linux or macOS native collector packages, and `bin\\startup.bat` for the Windows native collector package.
- Access `http://localhost:1157` and you will see the registered new collector in dashboard
diff --git a/README_CN.md b/README_CN.md
index fda8d7ca640..80058e8f7b4 100644
--- a/README_CN.md
+++ b/README_CN.md
@@ -145,7 +145,8 @@
manager-host: ${MANAGER_HOST:127.0.0.1}
manager-port: ${MANAGER_PORT:1158}
```
- - 如果需要通过 `ext-lib` 加载 MySQL、OceanBase、Oracle、DB2 等外置 JDBC 驱动,请使用 JVM 采集器安装包。
+ - 如果没有在 `ext-lib` 中提供 JDBC 驱动,MySQL、MariaDB、OceanBase 可以直接使用内置查询引擎,也可以使用 Native 采集器安装包;TiDB 的 SQL 查询指标也遵循同样规则。
+ - 如果在 `ext-lib` 中放入了 `mysql-connector-j`,主程序内置采集器或 JVM 采集器会在重启后自动优先走 JDBC;这一点现在适用于 MySQL、MariaDB、OceanBase,TiDB 的 SQL 查询指标也遵循同样规则,而它的 HTTP 指标不受影响。Oracle、DB2 仍然必须使用 JVM 采集器安装包,因为它们依赖外置 JDBC 驱动。
- JVM 采集器安装包使用 `$ ./bin/startup.sh ` 或 `bin/startup.bat` 启动。Linux 或 macOS 的 Native 采集器安装包使用 `$ ./bin/startup.sh ` 启动,Windows 的 Native 采集器安装包使用 `bin\\startup.bat` 启动
- 浏览器访问主 HertzBeat 服务 `http://localhost:1157` 查看概览页面即可看到注册上来的新采集器
diff --git a/README_JP.md b/README_JP.md
index fdb2d3f799f..710b582aa4e 100644
--- a/README_JP.md
+++ b/README_JP.md
@@ -148,7 +148,8 @@
- `mode: ${MODE:public}`:実行モード(パブリッククラスタまたはプライベートクラウドエッジ)。
- `manager-host: ${MANAGER_HOST:127.0.0.1}`:メインhertzbeatサーバーのIP。
- `manager-port: ${MANAGER_PORT:1158}`:メインhertzbeatサーバポート。
- - `ext-lib` で MySQL、OceanBase、Oracle、DB2 などの外部 JDBC ドライバーを読み込む必要がある場合は、JVM コレクターのインストールパッケージを使用してください。
+ - `ext-lib` に JDBC ドライバーを置かない場合、MySQL、MariaDB、OceanBase は組み込みのクエリエンジンを使って Native コレクターパッケージでも監視できます。TiDB も SQL クエリのメトリクスセットについては同じルールです。
+ - `ext-lib` に `mysql-connector-j` を置いた場合は、再起動後に組み込みサーバーコレクターまたは JVM コレクターが MySQL、MariaDB、OceanBase で自動的に JDBC を優先します。TiDB も SQL クエリのメトリクスセットについては同じルールで、HTTP メトリクスは影響を受けません。Oracle と DB2 は引き続き外部 JDBC ドライバーに依存するため、JVM コレクターパッケージを使用してください。
- JVM コレクターのインストールパッケージは `$ ./bin/startup.sh` または `bin/startup.bat`、Linux/macOS の Native コレクターパッケージは `$ ./bin/startup.sh`、Windows の Native コレクターパッケージは `bin\\startup.bat` で起動します。
- メインの HertzBeat サービス `http://localhost:1157` にアクセスすると、登録された新しいコレクターを確認できます。
diff --git a/hertzbeat-collector/hertzbeat-collector-basic/pom.xml b/hertzbeat-collector/hertzbeat-collector-basic/pom.xml
index de107950e41..63c88e262df 100644
--- a/hertzbeat-collector/hertzbeat-collector-basic/pom.xml
+++ b/hertzbeat-collector/hertzbeat-collector-basic/pom.xml
@@ -67,12 +67,6 @@
commons-net
commons-net
-
-
- com.mysql
- mysql-connector-j
- provided
-
com.clickhouse
diff --git a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/database/JdbcCommonCollect.java b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/database/JdbcCommonCollect.java
index 42529c72842..76e9421a6a2 100644
--- a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/database/JdbcCommonCollect.java
+++ b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/database/JdbcCommonCollect.java
@@ -20,7 +20,6 @@
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
-import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
@@ -34,6 +33,9 @@
import org.apache.hertzbeat.collector.collect.common.cache.CacheIdentifier;
import org.apache.hertzbeat.collector.collect.common.cache.GlobalConnectionCache;
import org.apache.hertzbeat.collector.collect.common.cache.JdbcConnect;
+import org.apache.hertzbeat.collector.collect.database.query.JdbcQueryExecutor;
+import org.apache.hertzbeat.collector.collect.database.query.JdbcQueryExecutorRegistry;
+import org.apache.hertzbeat.collector.collect.database.query.JdbcQueryRowSet;
import org.apache.hertzbeat.collector.collect.common.ssh.SshTunnelHelper;
import org.apache.hertzbeat.collector.constants.CollectorConstants;
import org.apache.hertzbeat.collector.dispatch.DispatchConstants;
@@ -205,31 +207,26 @@ public void preCheck(Metrics metrics) throws IllegalArgumentException {
public void collect(CollectRep.MetricsData.Builder builder, Metrics metrics) {
long startTime = System.currentTimeMillis();
JdbcProtocol jdbcProtocol = metrics.getJdbc();
- SshTunnel sshTunnel = jdbcProtocol.getSshTunnel();
-
int timeout = CollectUtil.getTimeout(jdbcProtocol.getTimeout());
boolean reuseConnection = Boolean.parseBoolean(jdbcProtocol.getReuseConnection());
- Statement statement = null;
- String databaseUrl;
try {
- if (sshTunnel != null && Boolean.parseBoolean(sshTunnel.getEnable())) {
- int localPort = SshTunnelHelper.localPortForward(sshTunnel, jdbcProtocol.getHost(), jdbcProtocol.getPort());
- databaseUrl = constructDatabaseUrl(jdbcProtocol, "localhost", String.valueOf(localPort));
- } else {
- databaseUrl = constructDatabaseUrl(jdbcProtocol, jdbcProtocol.getHost(), jdbcProtocol.getPort());
- }
-
- statement = getConnection(jdbcProtocol.getUsername(),
- jdbcProtocol.getPassword(), databaseUrl, timeout, reuseConnection);
switch (jdbcProtocol.getQueryType()) {
- case QUERY_TYPE_ONE_ROW -> queryOneRow(statement, jdbcProtocol.getSql(), metrics.getAliasFields(), builder, startTime);
- case QUERY_TYPE_MULTI_ROW -> queryMultiRow(statement, jdbcProtocol.getSql(), metrics.getAliasFields(), builder, startTime);
- case QUERY_TYPE_COLUMNS -> queryOneRowByMatchTwoColumns(statement, jdbcProtocol.getSql(), metrics.getAliasFields(), builder, startTime);
- case RUN_SCRIPT -> {
- Connection connection = statement.getConnection();
- FileSystemResource rc = new FileSystemResource(jdbcProtocol.getSql());
- ScriptUtils.executeSqlScript(connection, rc);
+ case QUERY_TYPE_ONE_ROW -> {
+ try (JdbcQueryRowSet rowSet = executeQuery(metrics, timeout, reuseConnection, 1)) {
+ queryOneRow(rowSet, metrics.getAliasFields(), builder, startTime);
+ }
+ }
+ case QUERY_TYPE_MULTI_ROW -> {
+ try (JdbcQueryRowSet rowSet = executeQuery(metrics, timeout, reuseConnection, 1000)) {
+ queryMultiRow(rowSet, metrics.getAliasFields(), builder, startTime);
+ }
}
+ case QUERY_TYPE_COLUMNS -> {
+ try (JdbcQueryRowSet rowSet = executeQuery(metrics, timeout, reuseConnection, 1000)) {
+ queryOneRowByMatchTwoColumns(rowSet, metrics.getAliasFields(), builder, startTime);
+ }
+ }
+ case RUN_SCRIPT -> runScript(metrics, timeout, reuseConnection);
default -> {
builder.setCode(CollectRep.Code.FAIL);
builder.setMsg("Not support database query type: " + jdbcProtocol.getQueryType());
@@ -261,23 +258,6 @@ public void collect(CollectRep.MetricsData.Builder builder, Metrics metrics) {
log.error("Jdbc error: {}.", errorMessage, e);
builder.setCode(CollectRep.Code.FAIL);
builder.setMsg("Query Error: " + errorMessage);
- } finally {
- if (statement != null) {
- Connection connection = null;
- try {
- connection = statement.getConnection();
- statement.close();
- } catch (Exception e) {
- log.error("Jdbc close statement error: {}", e.getMessage());
- }
- try {
- if (!reuseConnection && connection != null) {
- connection.close();
- }
- } catch (Exception e) {
- log.error("Jdbc close connection error: {}", e.getMessage());
- }
- }
}
}
@@ -286,6 +266,52 @@ public String supportProtocol() {
return DispatchConstants.PROTOCOL_JDBC;
}
+ private JdbcQueryRowSet executeQuery(Metrics metrics, int timeout, boolean reuseConnection, int maxRows) throws Exception {
+ Optional executor = JdbcQueryExecutorRegistry.resolve(metrics);
+ if (executor.isPresent()) {
+ return executor.get().executeQuery(metrics, timeout, maxRows);
+ }
+ return executeJdbcQuery(metrics.getJdbc(), timeout, reuseConnection, maxRows);
+ }
+
+ private JdbcQueryRowSet executeJdbcQuery(JdbcProtocol jdbcProtocol, int timeout, boolean reuseConnection,
+ int maxRows) throws Exception {
+ Statement statement = null;
+ try {
+ String databaseUrl = resolveDatabaseUrl(jdbcProtocol);
+ statement = getConnection(jdbcProtocol.getUsername(),
+ jdbcProtocol.getPassword(), databaseUrl, timeout, reuseConnection);
+ statement.setMaxRows(maxRows);
+ return new ResultSetJdbcQueryRowSet(statement, statement.executeQuery(jdbcProtocol.getSql()), reuseConnection);
+ } catch (Exception exception) {
+ closeStatementAndConnection(statement, reuseConnection);
+ throw exception;
+ }
+ }
+
+ private void runScript(Metrics metrics, int timeout, boolean reuseConnection) throws Exception {
+ JdbcProtocol jdbcProtocol = metrics.getJdbc();
+ Statement statement = null;
+ try {
+ String databaseUrl = resolveDatabaseUrl(jdbcProtocol);
+ statement = getConnection(jdbcProtocol.getUsername(),
+ jdbcProtocol.getPassword(), databaseUrl, timeout, reuseConnection);
+ Connection connection = statement.getConnection();
+ FileSystemResource rc = new FileSystemResource(jdbcProtocol.getSql());
+ ScriptUtils.executeSqlScript(connection, rc);
+ } finally {
+ closeStatementAndConnection(statement, reuseConnection);
+ }
+ }
+
+ private String resolveDatabaseUrl(JdbcProtocol jdbcProtocol) throws Exception {
+ SshTunnel sshTunnel = jdbcProtocol.getSshTunnel();
+ if (sshTunnel != null && Boolean.parseBoolean(sshTunnel.getEnable())) {
+ int localPort = SshTunnelHelper.localPortForward(sshTunnel, jdbcProtocol.getHost(), jdbcProtocol.getPort());
+ return constructDatabaseUrl(jdbcProtocol, "localhost", String.valueOf(localPort));
+ }
+ return constructDatabaseUrl(jdbcProtocol, jdbcProtocol.getHost(), jdbcProtocol.getPort());
+ }
private Statement getConnection(String username, String password, String url, Integer timeout, boolean reuseConnection) throws Exception {
CacheIdentifier identifier = CacheIdentifier.builder()
@@ -343,29 +369,25 @@ private Statement getConnection(String username, String password, String url, In
* query metrics:one tow three four
* query sql:select one, tow, three, four from book limit 1;
*
- * @param statement statement
- * @param sql sql
+ * @param rowSet row set
* @param columns query metrics field list
* @throws Exception when error happen
*/
- private void queryOneRow(Statement statement, String sql, List columns,
+ private void queryOneRow(JdbcQueryRowSet rowSet, List columns,
CollectRep.MetricsData.Builder builder, long startTime) throws Exception {
- statement.setMaxRows(1);
- try (ResultSet resultSet = statement.executeQuery(sql)) {
- if (resultSet.next()) {
- CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
- for (String column : columns) {
- if (CollectorConstants.RESPONSE_TIME.equals(column)) {
- long time = System.currentTimeMillis() - startTime;
- valueRowBuilder.addColumn(String.valueOf(time));
- } else {
- String value = resultSet.getString(column);
- value = value == null ? CommonConstants.NULL_VALUE : value;
- valueRowBuilder.addColumn(value);
- }
+ if (rowSet.next()) {
+ CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
+ for (String column : columns) {
+ if (CollectorConstants.RESPONSE_TIME.equals(column)) {
+ long time = System.currentTimeMillis() - startTime;
+ valueRowBuilder.addColumn(String.valueOf(time));
+ } else {
+ String value = rowSet.getString(column);
+ value = value == null ? CommonConstants.NULL_VALUE : value;
+ valueRowBuilder.addColumn(value);
}
- builder.addValueRow(valueRowBuilder.build());
}
+ builder.addValueRow(valueRowBuilder.build());
}
}
@@ -380,33 +402,30 @@ private void queryOneRow(Statement statement, String sql, List columns,
* three - value3
* four - value4
*
- * @param statement statement
- * @param sql sql
+ * @param rowSet row set
* @param columns query metrics field list
* @throws Exception when error happen
*/
- private void queryOneRowByMatchTwoColumns(Statement statement, String sql, List columns,
+ private void queryOneRowByMatchTwoColumns(JdbcQueryRowSet rowSet, List columns,
CollectRep.MetricsData.Builder builder, long startTime) throws Exception {
- try (ResultSet resultSet = statement.executeQuery(sql)) {
- HashMap values = new HashMap<>(columns.size());
- while (resultSet.next()) {
- if (resultSet.getString(1) != null) {
- values.put(resultSet.getString(1).toLowerCase().trim(), resultSet.getString(2));
- }
+ HashMap values = new HashMap<>(columns.size());
+ while (rowSet.next()) {
+ if (rowSet.getString(1) != null) {
+ values.put(rowSet.getString(1).toLowerCase().trim(), rowSet.getString(2));
}
- CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
- for (String column : columns) {
- if (CollectorConstants.RESPONSE_TIME.equals(column)) {
- long time = System.currentTimeMillis() - startTime;
- valueRowBuilder.addColumn(String.valueOf(time));
- } else {
- String value = values.get(column.toLowerCase());
- value = value == null ? CommonConstants.NULL_VALUE : value;
- valueRowBuilder.addColumn(value);
- }
+ }
+ CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
+ for (String column : columns) {
+ if (CollectorConstants.RESPONSE_TIME.equals(column)) {
+ long time = System.currentTimeMillis() - startTime;
+ valueRowBuilder.addColumn(String.valueOf(time));
+ } else {
+ String value = values.get(column.toLowerCase());
+ value = value == null ? CommonConstants.NULL_VALUE : value;
+ valueRowBuilder.addColumn(value);
}
- builder.addValueRow(valueRowBuilder.build());
}
+ builder.addValueRow(valueRowBuilder.build());
}
/**
@@ -416,28 +435,45 @@ private void queryOneRowByMatchTwoColumns(Statement statement, String sql, List<
* query sql:select one, tow, three, four from book;
* and return multi row record mapping with the metrics
*
- * @param statement statement
- * @param sql sql
+ * @param rowSet row set
* @param columns query metrics field list
* @throws Exception when error happen
*/
- private void queryMultiRow(Statement statement, String sql, List columns,
+ private void queryMultiRow(JdbcQueryRowSet rowSet, List columns,
CollectRep.MetricsData.Builder builder, long startTime) throws Exception {
- try (ResultSet resultSet = statement.executeQuery(sql)) {
- while (resultSet.next()) {
- CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
- for (String column : columns) {
- if (CollectorConstants.RESPONSE_TIME.equals(column)) {
- long time = System.currentTimeMillis() - startTime;
- valueRowBuilder.addColumn(String.valueOf(time));
- } else {
- String value = resultSet.getString(column);
- value = value == null ? CommonConstants.NULL_VALUE : value;
- valueRowBuilder.addColumn(value);
- }
+ while (rowSet.next()) {
+ CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder();
+ for (String column : columns) {
+ if (CollectorConstants.RESPONSE_TIME.equals(column)) {
+ long time = System.currentTimeMillis() - startTime;
+ valueRowBuilder.addColumn(String.valueOf(time));
+ } else {
+ String value = rowSet.getString(column);
+ value = value == null ? CommonConstants.NULL_VALUE : value;
+ valueRowBuilder.addColumn(value);
}
- builder.addValueRow(valueRowBuilder.build());
}
+ builder.addValueRow(valueRowBuilder.build());
+ }
+ }
+
+ private void closeStatementAndConnection(Statement statement, boolean reuseConnection) {
+ if (statement == null) {
+ return;
+ }
+ Connection connection = null;
+ try {
+ connection = statement.getConnection();
+ statement.close();
+ } catch (Exception exception) {
+ log.error("Jdbc close statement error: {}", exception.getMessage());
+ }
+ try {
+ if (!reuseConnection && connection != null) {
+ connection.close();
+ }
+ } catch (Exception exception) {
+ log.error("Jdbc close connection error: {}", exception.getMessage());
}
}
@@ -548,4 +584,53 @@ private String constructDatabaseUrl(JdbcProtocol jdbcProtocol, String host, Stri
default -> throw new IllegalArgumentException("Not support database platform: " + jdbcProtocol.getPlatform());
};
}
+
+ private static final class ResultSetJdbcQueryRowSet implements JdbcQueryRowSet {
+
+ private final Statement statement;
+ private final java.sql.ResultSet resultSet;
+ private final boolean reuseConnection;
+
+ private ResultSetJdbcQueryRowSet(Statement statement, java.sql.ResultSet resultSet, boolean reuseConnection) {
+ this.statement = statement;
+ this.resultSet = resultSet;
+ this.reuseConnection = reuseConnection;
+ }
+
+ @Override
+ public boolean next() throws Exception {
+ return resultSet.next();
+ }
+
+ @Override
+ public String getString(String column) throws Exception {
+ return resultSet.getString(column);
+ }
+
+ @Override
+ public String getString(int index) throws Exception {
+ return resultSet.getString(index);
+ }
+
+ @Override
+ public void close() throws Exception {
+ Connection connection = null;
+ try {
+ connection = statement.getConnection();
+ } catch (Exception ignored) {
+ // ignore
+ }
+ try {
+ resultSet.close();
+ } finally {
+ try {
+ statement.close();
+ } finally {
+ if (!reuseConnection && connection != null) {
+ connection.close();
+ }
+ }
+ }
+ }
+ }
}
diff --git a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/database/query/JdbcQueryExecutor.java b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/database/query/JdbcQueryExecutor.java
new file mode 100644
index 00000000000..3d3c9076da3
--- /dev/null
+++ b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/database/query/JdbcQueryExecutor.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.collector.collect.database.query;
+
+import org.apache.hertzbeat.common.entity.job.Metrics;
+
+/**
+ * Adapter point for replacing only the SQL query execution part of JdbcCommonCollect.
+ */
+public interface JdbcQueryExecutor {
+
+ boolean supports(Metrics metrics);
+
+ JdbcQueryRowSet executeQuery(Metrics metrics, int timeout, int maxRows) throws Exception;
+}
diff --git a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/database/query/JdbcQueryExecutorRegistry.java b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/database/query/JdbcQueryExecutorRegistry.java
new file mode 100644
index 00000000000..bb9f1c0baa6
--- /dev/null
+++ b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/database/query/JdbcQueryExecutorRegistry.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.collector.collect.database.query;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.hertzbeat.common.entity.job.Metrics;
+
+/**
+ * Static registry used by JdbcCommonCollect to discover optional query executors.
+ */
+public final class JdbcQueryExecutorRegistry {
+
+ private static final List EXECUTORS = new CopyOnWriteArrayList<>();
+
+ private JdbcQueryExecutorRegistry() {
+ }
+
+ public static void register(JdbcQueryExecutor executor) {
+ if (executor == null || EXECUTORS.contains(executor)) {
+ return;
+ }
+ EXECUTORS.add(executor);
+ }
+
+ public static void unregister(JdbcQueryExecutor executor) {
+ if (executor == null) {
+ return;
+ }
+ EXECUTORS.remove(executor);
+ }
+
+ public static Optional resolve(Metrics metrics) {
+ return EXECUTORS.stream()
+ .filter(executor -> executor.supports(metrics))
+ .findFirst();
+ }
+}
diff --git a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/database/query/JdbcQueryRowSet.java b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/database/query/JdbcQueryRowSet.java
new file mode 100644
index 00000000000..a93cef11d77
--- /dev/null
+++ b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/database/query/JdbcQueryRowSet.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.collector.collect.database.query;
+
+/**
+ * A minimal row cursor abstraction shared by JDBC and R2DBC-backed database queries.
+ */
+public interface JdbcQueryRowSet extends AutoCloseable {
+
+ boolean next() throws Exception;
+
+ String getString(String column) throws Exception;
+
+ String getString(int index) throws Exception;
+
+ @Override
+ void close() throws Exception;
+}
diff --git a/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/collect/database/JdbcCommonCollectTest.java b/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/collect/database/JdbcCommonCollectTest.java
index 129f7e66331..88cc7adcb67 100644
--- a/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/collect/database/JdbcCommonCollectTest.java
+++ b/hertzbeat-collector/hertzbeat-collector-basic/src/test/java/org/apache/hertzbeat/collector/collect/database/JdbcCommonCollectTest.java
@@ -26,10 +26,11 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.util.LinkedHashMap;
+import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
/**
@@ -93,37 +94,6 @@ void collect() {
CollectRep.MetricsData.Builder builder = CollectRep.MetricsData.newBuilder();
jdbcCommonCollect.collect(builder, metrics);
});
-
- String[] platforms = new String[]{
- "mysql", "mariadb",
- "postgresql",
- "clickhouse",
- "sqlserver",
- "oracle",
- "dm"
- };
- for (String platform : platforms) {
- JdbcProtocol jdbc = new JdbcProtocol();
- jdbc.setPlatform(platform);
-
- Metrics metrics = new Metrics();
- metrics.setJdbc(jdbc);
-
- CollectRep.MetricsData.Builder builder = CollectRep.MetricsData.newBuilder();
- jdbcCommonCollect.collect(builder, metrics);
- assertNotEquals(builder.getMsg(), "Query Error: Not support database platform: " + platform);
- }
- // invalid platform
- JdbcProtocol jdbc = new JdbcProtocol();
- jdbc.setPlatform("invalid");
-
- Metrics metrics = new Metrics();
- metrics.setJdbc(jdbc);
-
- CollectRep.MetricsData.Builder builder = CollectRep.MetricsData.newBuilder();
- jdbcCommonCollect.collect(builder, metrics);
- assertEquals(builder.getCode(), CollectRep.Code.FAIL);
- assertEquals(builder.getMsg(), "Query Error: Not support database platform: invalid");
}
@Test
@@ -151,13 +121,10 @@ void testUrlPassThrough() {
.database("test")
.url(originalUrl)
.build();
-
- // Use reflection to call constructDatabaseUrl method
- Method constructMethod = JdbcCommonCollect.class.getDeclaredMethod("constructDatabaseUrl", JdbcProtocol.class, String.class, String.class);
- constructMethod.setAccessible(true);
- String processedUrl = (String) constructMethod.invoke(jdbcCollect, jdbcProtocol, "localhost", "3306");
+
+ String processedUrl = constructDatabaseUrl(jdbcCollect, jdbcProtocol, "localhost", "3306");
// Verify that the processed URL is the same as the original URL
- assertEquals(originalUrl, processedUrl,
+ assertEquals(originalUrl, processedUrl,
"URL should be passed through without modification: " + originalUrl);
} catch (Exception e) {
System.out.println("URL rejected by security validation: " + originalUrl + ", reason: " + e.getMessage());
@@ -165,6 +132,39 @@ void testUrlPassThrough() {
}
}
+ @Test
+ void testConstructDatabaseUrlByPlatform() throws Exception {
+ Map expectedUrls = new LinkedHashMap<>();
+ expectedUrls.put("mysql", "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false");
+ expectedUrls.put("mariadb", "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false");
+ expectedUrls.put("postgresql", "jdbc:postgresql://localhost:3306/test");
+ expectedUrls.put("clickhouse", "jdbc:clickhouse://localhost:3306/test");
+ expectedUrls.put("sqlserver", "jdbc:sqlserver://localhost:3306;DatabaseName=test;trustServerCertificate=true;");
+ expectedUrls.put("oracle", "jdbc:oracle:thin:@localhost:3306/test");
+ expectedUrls.put("dm", "jdbc:dm://localhost:3306");
+
+ for (Map.Entry entry : expectedUrls.entrySet()) {
+ JdbcProtocol jdbcProtocol = JdbcProtocol.builder()
+ .platform(entry.getKey())
+ .database("test")
+ .build();
+
+ assertEquals(entry.getValue(), constructDatabaseUrl(jdbcCommonCollect, jdbcProtocol, "localhost", "3306"));
+ }
+ }
+
+ @Test
+ void testConstructDatabaseUrlRejectsUnsupportedPlatform() {
+ JdbcProtocol jdbcProtocol = JdbcProtocol.builder()
+ .platform("invalid")
+ .database("test")
+ .build();
+
+ IllegalArgumentException exception = assertThrows(IllegalArgumentException.class,
+ () -> constructDatabaseUrl(jdbcCommonCollect, jdbcProtocol, "localhost", "3306"));
+ assertEquals("Not support database platform: invalid", exception.getMessage());
+ }
+
@Test
void testConstructDatabaseUrlSecurityInterception() {
JdbcCommonCollect jdbcCollect = new JdbcCommonCollect();
@@ -197,15 +197,28 @@ void testConstructDatabaseUrlSecurityInterception() {
.build();
assertThrows(Exception.class, () -> {
- try {
- Method constructMethod = JdbcCommonCollect.class.getDeclaredMethod("constructDatabaseUrl", JdbcProtocol.class, String.class, String.class);
- constructMethod.setAccessible(true);
- constructMethod.invoke(jdbcCollect, jdbcProtocol, "localhost", "3306");
- } catch (InvocationTargetException e) {
- throw e.getCause();
- }
+ constructDatabaseUrl(jdbcCollect, jdbcProtocol, "localhost", "3306");
}, "Malicious URL should be blocked: " + maliciousUrl);
}
}
+ private String constructDatabaseUrl(JdbcCommonCollect jdbcCollect, JdbcProtocol jdbcProtocol,
+ String host, String port) throws Exception {
+ try {
+ Method constructMethod = JdbcCommonCollect.class
+ .getDeclaredMethod("constructDatabaseUrl", JdbcProtocol.class, String.class, String.class);
+ constructMethod.setAccessible(true);
+ return (String) constructMethod.invoke(jdbcCollect, jdbcProtocol, host, port);
+ } catch (InvocationTargetException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof Exception exception) {
+ throw exception;
+ }
+ if (cause instanceof Error error) {
+ throw error;
+ }
+ throw new RuntimeException(cause);
+ }
+ }
+
}
diff --git a/hertzbeat-collector/hertzbeat-collector-collector/pom.xml b/hertzbeat-collector/hertzbeat-collector-collector/pom.xml
index e4f32ac721f..5fcca5107d9 100644
--- a/hertzbeat-collector/hertzbeat-collector-collector/pom.xml
+++ b/hertzbeat-collector/hertzbeat-collector-collector/pom.xml
@@ -48,6 +48,12 @@
${hertzbeat.version}
+
+ org.apache.hertzbeat
+ hertzbeat-collector-mysql-r2dbc
+ ${hertzbeat.version}
+
+
org.apache.hertzbeat
@@ -99,6 +105,23 @@
io.micrometer
micrometer-registry-prometheus
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ com.mysql
+ mysql-connector-j
+ test
+
+
+ org.testcontainers
+ testcontainers
+ ${testcontainers.version}
+ test
+
@@ -365,14 +388,6 @@
apache-hertzbeat-collector-native-${hzb.version}
-
-
- src/main/resources
-
- META-INF/services/org.apache.hertzbeat.collector.collect.AbstractCollect
-
-
-
org.apache.maven.plugins
diff --git a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/collect/database/mysql/MysqlCollectorProperties.java b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/collect/database/mysql/MysqlCollectorProperties.java
new file mode 100644
index 00000000000..ac34a37ab84
--- /dev/null
+++ b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/collect/database/mysql/MysqlCollectorProperties.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.collector.collect.database.mysql;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+/**
+ * Collector-side MySQL query engine routing.
+ */
+@Getter
+@Setter
+@ConfigurationProperties(prefix = "hertzbeat.collector.mysql")
+public class MysqlCollectorProperties {
+
+ private QueryEngine queryEngine = QueryEngine.AUTO;
+
+ public QueryEngine resolveQueryEngine(boolean mysqlJdbcDriverAvailable) {
+ if (queryEngine == QueryEngine.AUTO) {
+ return mysqlJdbcDriverAvailable ? QueryEngine.JDBC : QueryEngine.R2DBC;
+ }
+ return queryEngine;
+ }
+
+ public boolean useR2dbc(boolean mysqlJdbcDriverAvailable) {
+ return resolveQueryEngine(mysqlJdbcDriverAvailable) == QueryEngine.R2DBC;
+ }
+
+ /**
+ * Supported collector-side query engines.
+ */
+ public enum QueryEngine {
+ AUTO,
+ JDBC,
+ R2DBC
+ }
+}
diff --git a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/collect/database/mysql/MysqlJdbcDriverAvailability.java b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/collect/database/mysql/MysqlJdbcDriverAvailability.java
new file mode 100644
index 00000000000..8c1466aedeb
--- /dev/null
+++ b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/collect/database/mysql/MysqlJdbcDriverAvailability.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.collector.collect.database.mysql;
+
+import java.net.URL;
+import java.security.CodeSource;
+import java.util.Locale;
+import org.springframework.stereotype.Component;
+import org.springframework.util.ClassUtils;
+import org.springframework.util.StringUtils;
+
+/**
+ * Detects whether a MySQL JDBC driver is available from the external ext-lib path.
+ */
+@Component
+public class MysqlJdbcDriverAvailability {
+
+ private static final String[] MYSQL_DRIVER_CLASSES = {
+ "com.mysql.cj.jdbc.Driver",
+ "com.mysql.jdbc.Driver"
+ };
+
+ public boolean hasMysqlJdbcDriver() {
+ ClassLoader classLoader = ClassUtils.getDefaultClassLoader();
+ for (String driverClass : MYSQL_DRIVER_CLASSES) {
+ if (!ClassUtils.isPresent(driverClass, classLoader)) {
+ continue;
+ }
+ try {
+ if (isExternalExtLibDriver(ClassUtils.forName(driverClass, classLoader))) {
+ return true;
+ }
+ } catch (ClassNotFoundException ignored) {
+ // Race-free enough for runtime detection: keep probing other known driver class names.
+ }
+ }
+ return false;
+ }
+
+ boolean isExternalExtLibDriver(Class> driverClass) {
+ String location = resolveLocation(driverClass);
+ return isExtLibLocation(location);
+ }
+
+ static boolean isExtLibLocation(String location) {
+ if (!StringUtils.hasText(location)) {
+ return false;
+ }
+ String normalized = location
+ .replace('\\', '/')
+ .toLowerCase(Locale.ROOT);
+ return normalized.contains("/ext-lib/");
+ }
+
+ private String resolveLocation(Class> driverClass) {
+ CodeSource codeSource = driverClass.getProtectionDomain().getCodeSource();
+ if (codeSource != null && codeSource.getLocation() != null) {
+ return codeSource.getLocation().toExternalForm();
+ }
+ String resourceName = ClassUtils.convertClassNameToResourcePath(driverClass.getName()) + ".class";
+ ClassLoader classLoader = driverClass.getClassLoader();
+ URL resource = classLoader != null ? classLoader.getResource(resourceName) : ClassLoader.getSystemResource(resourceName);
+ return resource != null ? resource.toExternalForm() : null;
+ }
+}
diff --git a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/collect/database/mysql/MysqlR2dbcJdbcQueryExecutor.java b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/collect/database/mysql/MysqlR2dbcJdbcQueryExecutor.java
new file mode 100644
index 00000000000..55837095c60
--- /dev/null
+++ b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/collect/database/mysql/MysqlR2dbcJdbcQueryExecutor.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.collector.collect.database.mysql;
+
+import java.net.URI;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.hertzbeat.collector.collect.common.ssh.SshTunnelHelper;
+import org.apache.hertzbeat.collector.collect.database.query.JdbcQueryExecutor;
+import org.apache.hertzbeat.collector.collect.database.query.JdbcQueryExecutorRegistry;
+import org.apache.hertzbeat.collector.collect.database.query.JdbcQueryRowSet;
+import org.apache.hertzbeat.collector.mysql.r2dbc.MysqlQueryExecutor;
+import org.apache.hertzbeat.collector.mysql.r2dbc.QueryOptions;
+import org.apache.hertzbeat.collector.mysql.r2dbc.QueryResult;
+import org.apache.hertzbeat.common.entity.job.Metrics;
+import org.apache.hertzbeat.common.entity.job.SshTunnel;
+import org.apache.hertzbeat.common.entity.job.protocol.JdbcProtocol;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
+
+/**
+ * MySQL-compatible query-only adapter that lets JdbcCommonCollect execute read-only queries through the built-in
+ * R2DBC path when no MySQL JDBC driver is present.
+ */
+@Component
+public class MysqlR2dbcJdbcQueryExecutor implements JdbcQueryExecutor, InitializingBean, DisposableBean {
+
+ private static final String QUERY_TYPE_ONE_ROW = "oneRow";
+ private static final String QUERY_TYPE_MULTI_ROW = "multiRow";
+ private static final String QUERY_TYPE_COLUMNS = "columns";
+
+ private final MysqlCollectorProperties properties;
+ private final MysqlQueryExecutor mysqlQueryExecutor;
+ private final MysqlJdbcDriverAvailability mysqlJdbcDriverAvailability;
+
+ public MysqlR2dbcJdbcQueryExecutor(MysqlCollectorProperties properties,
+ MysqlQueryExecutor mysqlQueryExecutor,
+ MysqlJdbcDriverAvailability mysqlJdbcDriverAvailability) {
+ this.properties = properties;
+ this.mysqlQueryExecutor = mysqlQueryExecutor;
+ this.mysqlJdbcDriverAvailability = mysqlJdbcDriverAvailability;
+ }
+
+ @Override
+ public boolean supports(Metrics metrics) {
+ if (metrics == null || metrics.getJdbc() == null) {
+ return false;
+ }
+ JdbcProtocol jdbcProtocol = metrics.getJdbc();
+ if (!isMysqlCompatiblePlatform(jdbcProtocol.getPlatform())) {
+ return false;
+ }
+ String queryType = jdbcProtocol.getQueryType();
+ return properties.useR2dbc(mysqlJdbcDriverAvailability.hasMysqlJdbcDriver())
+ && (QUERY_TYPE_ONE_ROW.equals(queryType)
+ || QUERY_TYPE_MULTI_ROW.equals(queryType)
+ || QUERY_TYPE_COLUMNS.equals(queryType));
+ }
+
+ private boolean isMysqlCompatiblePlatform(String platform) {
+ return "mysql".equalsIgnoreCase(platform) || "mariadb".equalsIgnoreCase(platform);
+ }
+
+ @Override
+ public JdbcQueryRowSet executeQuery(Metrics metrics, int timeout, int maxRows) {
+ JdbcProtocol jdbcProtocol = metrics.getJdbc();
+ QueryOptions options = buildQueryOptions(jdbcProtocol, timeout, maxRows);
+ QueryResult queryResult = mysqlQueryExecutor.execute(jdbcProtocol.getSql(), options);
+ if (queryResult.hasError()) {
+ throw new IllegalStateException("R2DBC MySQL query failed: " + queryResult.getError());
+ }
+ return new QueryResultRowSet(queryResult);
+ }
+
+ @Override
+ public void afterPropertiesSet() {
+ JdbcQueryExecutorRegistry.register(this);
+ }
+
+ @Override
+ public void destroy() {
+ JdbcQueryExecutorRegistry.unregister(this);
+ }
+
+ private QueryOptions buildQueryOptions(JdbcProtocol jdbcProtocol, int timeout, int maxRows) {
+ MysqlTarget target = resolveTarget(jdbcProtocol);
+ SshTunnel sshTunnel = jdbcProtocol.getSshTunnel();
+ String host = target.host();
+ int port = target.port();
+ if (sshTunnel != null && Boolean.parseBoolean(sshTunnel.getEnable())) {
+ try {
+ int localPort = SshTunnelHelper.localPortForward(sshTunnel, host, String.valueOf(port));
+ host = "127.0.0.1";
+ port = localPort;
+ } catch (Exception exception) {
+ throw new IllegalStateException("R2DBC MySQL query adapter failed to establish SSH tunnel", exception);
+ }
+ }
+ return QueryOptions.builder()
+ .host(host)
+ .port(port)
+ .username(jdbcProtocol.getUsername())
+ .password(jdbcProtocol.getPassword())
+ .database(target.database())
+ .schema(target.database())
+ .timeout(Duration.ofMillis(timeout))
+ .maxRows(maxRows)
+ .fetchSize(256)
+ .readOnly(true)
+ .build();
+ }
+
+ private MysqlTarget resolveTarget(JdbcProtocol jdbcProtocol) {
+ if (StringUtils.hasText(jdbcProtocol.getUrl())) {
+ return parseJdbcUrl(jdbcProtocol.getUrl(), jdbcProtocol.getDatabase());
+ }
+ if (!StringUtils.hasText(jdbcProtocol.getHost()) || !StringUtils.hasText(jdbcProtocol.getPort())) {
+ throw new IllegalArgumentException("R2DBC MySQL query adapter requires host/port or a jdbc:mysql URL");
+ }
+ return new MysqlTarget(jdbcProtocol.getHost(), Integer.parseInt(jdbcProtocol.getPort()), jdbcProtocol.getDatabase());
+ }
+
+ private MysqlTarget parseJdbcUrl(String url, String fallbackDatabase) {
+ String trimmed = url.trim();
+ if (!(trimmed.startsWith("jdbc:mysql://") || trimmed.startsWith("jdbc:mariadb://"))) {
+ throw new IllegalArgumentException("R2DBC MySQL query adapter only supports jdbc:mysql:// or jdbc:mariadb:// URLs");
+ }
+ URI uri = URI.create(trimmed.substring("jdbc:".length()));
+ String host = uri.getHost();
+ int port = uri.getPort() > 0 ? uri.getPort() : 3306;
+ if (!StringUtils.hasText(host)) {
+ throw new IllegalArgumentException("R2DBC MySQL query adapter URL must include a host");
+ }
+ String path = uri.getPath();
+ String database = StringUtils.hasText(path) && path.length() > 1 ? path.substring(1) : fallbackDatabase;
+ return new MysqlTarget(host, port, database);
+ }
+
+ private record MysqlTarget(String host, int port, String database) {
+ }
+
+ private static final class QueryResultRowSet implements JdbcQueryRowSet {
+
+ private final List> rows;
+ private final Map columnIndexMap;
+ private int currentIndex = -1;
+
+ private QueryResultRowSet(QueryResult queryResult) {
+ this.rows = queryResult.getRows();
+ this.columnIndexMap = buildColumnIndexMap(queryResult.getColumns());
+ }
+
+ @Override
+ public boolean next() {
+ currentIndex++;
+ return currentIndex < rows.size();
+ }
+
+ @Override
+ public String getString(String column) {
+ Integer index = columnIndexMap.get(column.toLowerCase(Locale.ROOT));
+ if (index == null) {
+ throw new IllegalArgumentException("Column not found in R2DBC MySQL result: " + column);
+ }
+ return getString(index + 1);
+ }
+
+ @Override
+ public String getString(int index) {
+ if (currentIndex < 0 || currentIndex >= rows.size()) {
+ throw new IllegalStateException("R2DBC MySQL result cursor is not positioned on a row");
+ }
+ int zeroBased = index - 1;
+ List row = rows.get(currentIndex);
+ if (zeroBased < 0 || zeroBased >= row.size()) {
+ throw new IllegalArgumentException("Column index out of bounds in R2DBC MySQL result: " + index);
+ }
+ return row.get(zeroBased);
+ }
+
+ @Override
+ public void close() {
+ // QueryResult is fully materialized, so there is nothing left to close here.
+ }
+
+ private static Map buildColumnIndexMap(List columns) {
+ Map indexMap = new HashMap<>(columns.size());
+ for (int index = 0; index < columns.size(); index++) {
+ indexMap.put(columns.get(index).toLowerCase(Locale.ROOT), index);
+ }
+ return indexMap;
+ }
+ }
+}
diff --git a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/collect/strategy/CollectStrategyFactory.java b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/collect/strategy/CollectStrategyFactory.java
index 4c6517ab905..28b9d2db88e 100644
--- a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/collect/strategy/CollectStrategyFactory.java
+++ b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/collect/strategy/CollectStrategyFactory.java
@@ -20,6 +20,7 @@
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;
+import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.collector.collect.AbstractCollect;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Configuration;
@@ -29,6 +30,7 @@
/**
* Specific metrics collection factory
*/
+@Slf4j
@Configuration
@Order(value = Ordered.HIGHEST_PRECEDENCE + 1)
public class CollectStrategyFactory implements CommandLineRunner {
@@ -49,10 +51,18 @@ public static AbstractCollect invoke(String protocol) {
@Override
public void run(String... args) throws Exception {
+ COLLECT_STRATEGY.clear();
// spi load and registry protocol and collect instance
ServiceLoader loader = ServiceLoader.load(AbstractCollect.class, AbstractCollect.class.getClassLoader());
for (AbstractCollect collect : loader) {
COLLECT_STRATEGY.put(collect.supportProtocol(), collect);
}
+ if (COLLECT_STRATEGY.isEmpty()) {
+ throw new IllegalStateException(
+ "No collect strategies were registered. "
+ + "Verify META-INF/services/org.apache.hertzbeat.collector.collect.AbstractCollect "
+ + "is present on the runtime classpath.");
+ }
+ log.info("Registered {} collect strategies: {}", COLLECT_STRATEGY.size(), COLLECT_STRATEGY.keySet());
}
}
diff --git a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/nativex/NativeCollectorDefaults.java b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/nativex/NativeCollectorDefaults.java
index 5f97c36ecbf..1926c48cfb8 100644
--- a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/nativex/NativeCollectorDefaults.java
+++ b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/nativex/NativeCollectorDefaults.java
@@ -22,16 +22,18 @@
import org.springframework.core.NativeDetector;
/**
- * Applies the native-only collector defaults without forking {@code application.yml}.
+ * Applies collector defaults without forking {@code application.yml}.
*/
public final class NativeCollectorDefaults {
static final String AUTOCONFIGURE_EXCLUDE_PROPERTY = "spring.autoconfigure.exclude";
- static final String NATIVE_AUTOCONFIGURE_EXCLUDES = String.join(",",
+ static final String JVM_AUTOCONFIGURE_EXCLUDES = String.join(",",
"org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration",
"org.springframework.boot.autoconfigure.data.mongo.MongoDataAutoConfiguration",
"org.springframework.boot.jdbc.autoconfigure.DataSourceAutoConfiguration",
- "org.springframework.boot.hibernate.autoconfigure.HibernateJpaAutoConfiguration",
+ "org.springframework.boot.hibernate.autoconfigure.HibernateJpaAutoConfiguration");
+ static final String NATIVE_AUTOCONFIGURE_EXCLUDES = String.join(",",
+ JVM_AUTOCONFIGURE_EXCLUDES,
"org.springframework.boot.data.jpa.autoconfigure.DataJpaRepositoriesAutoConfiguration",
"org.springframework.boot.jdbc.autoconfigure.DataSourceInitializationAutoConfiguration",
"org.springframework.boot.jdbc.autoconfigure.DataSourceTransactionManagerAutoConfiguration",
@@ -43,16 +45,12 @@ private NativeCollectorDefaults() {
}
public static void applyTo(SpringApplication application) {
- Map defaultProperties = defaultProperties(NativeDetector.inNativeImage());
- if (!defaultProperties.isEmpty()) {
- application.setDefaultProperties(defaultProperties);
- }
+ application.setDefaultProperties(defaultProperties(NativeDetector.inNativeImage()));
}
static Map defaultProperties(boolean nativeImage) {
- if (!nativeImage) {
- return Map.of();
- }
- return Map.of(AUTOCONFIGURE_EXCLUDE_PROPERTY, NATIVE_AUTOCONFIGURE_EXCLUDES);
+ return Map.of(
+ AUTOCONFIGURE_EXCLUDE_PROPERTY,
+ nativeImage ? NATIVE_AUTOCONFIGURE_EXCLUDES : JVM_AUTOCONFIGURE_EXCLUDES);
}
}
diff --git a/hertzbeat-collector/hertzbeat-collector-collector/src/main/resources/application.yml b/hertzbeat-collector/hertzbeat-collector-collector/src/main/resources/application.yml
index 9615e547b94..03702ef5627 100644
--- a/hertzbeat-collector/hertzbeat-collector-collector/src/main/resources/application.yml
+++ b/hertzbeat-collector/hertzbeat-collector-collector/src/main/resources/application.yml
@@ -27,9 +27,6 @@ spring:
timeout-per-shutdown-phase: 10s
jackson:
default-property-inclusion: ALWAYS
- # need to disable spring boot mongodb auto config, or default mongodb connection tried and failed...
- autoconfigure:
- exclude: org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration, org.springframework.boot.autoconfigure.data.mongo.MongoDataAutoConfiguration, org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration, org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration
management:
endpoints:
web:
@@ -77,6 +74,13 @@ common:
type: netty
hertzbeat:
+ collector:
+ mysql:
+ # MySQL-compatible query engine routing for MySQL, MariaDB, OceanBase, and TiDB SQL metrics.
+ # auto : prefer JDBC only when mysql-connector-j is available from ext-lib, otherwise use the built-in query engine
+ # jdbc : always use JDBC
+ # r2dbc : always use the built-in query engine
+ query-engine: ${HERTZBEAT_COLLECTOR_MYSQL_QUERY_ENGINE:auto}
# Optional virtual-thread overrides. Remove this whole block to use built-in defaults.
vthreads:
enabled: true
diff --git a/hertzbeat-collector/hertzbeat-collector-collector/src/test/java/org/apache/hertzbeat/collector/collect/database/mysql/MariadbJdbcQueryAdapterTemplateIntegrationTest.java b/hertzbeat-collector/hertzbeat-collector-collector/src/test/java/org/apache/hertzbeat/collector/collect/database/mysql/MariadbJdbcQueryAdapterTemplateIntegrationTest.java
new file mode 100644
index 00000000000..1ac654969d0
--- /dev/null
+++ b/hertzbeat-collector/hertzbeat-collector-collector/src/test/java/org/apache/hertzbeat/collector/collect/database/mysql/MariadbJdbcQueryAdapterTemplateIntegrationTest.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.collector.collect.database.mysql;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hertzbeat.collector.collect.strategy.CollectStrategyFactory;
+import org.apache.hertzbeat.collector.dispatch.CollectDataDispatch;
+import org.apache.hertzbeat.collector.dispatch.MetricsCollect;
+import org.apache.hertzbeat.collector.mysql.r2dbc.MysqlR2dbcConnectionFactoryProvider;
+import org.apache.hertzbeat.collector.mysql.r2dbc.MysqlR2dbcQueryExecutor;
+import org.apache.hertzbeat.collector.mysql.r2dbc.ResultSetMapper;
+import org.apache.hertzbeat.collector.mysql.r2dbc.SqlGuard;
+import org.apache.hertzbeat.collector.timer.WheelTimerTask;
+import org.apache.hertzbeat.common.constants.CommonConstants;
+import org.apache.hertzbeat.common.entity.job.Job;
+import org.apache.hertzbeat.common.entity.job.Metrics;
+import org.apache.hertzbeat.common.entity.job.protocol.JdbcProtocol;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+import org.apache.hertzbeat.common.timer.Timeout;
+import org.apache.hertzbeat.common.util.JsonUtil;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.DynamicTest;
+import org.junit.jupiter.api.TestFactory;
+import org.junit.jupiter.api.TestInstance;
+import org.testcontainers.DockerClientFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+import org.yaml.snakeyaml.Yaml;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+class MariadbJdbcQueryAdapterTemplateIntegrationTest {
+
+ private static final String TEST_DATABASE = "hzb";
+ private static final String TEST_USERNAME = "test";
+ private static final String TEST_PASSWORD = "test123";
+ private static final String ROOT_PASSWORD = "root123";
+
+ private GenericContainer> container;
+ private MysqlR2dbcJdbcQueryExecutor jdbcQueryExecutor;
+ private List mariadbTemplateMetrics;
+
+ @BeforeAll
+ void setUp() throws Exception {
+ Assumptions.assumeTrue(DockerClientFactory.instance().isDockerAvailable(), "Docker is required for integration tests");
+ new CollectStrategyFactory().run();
+ container = new GenericContainer<>(DockerImageName.parse("mariadb:11.4"))
+ .withExposedPorts(3306)
+ .withEnv("MARIADB_DATABASE", TEST_DATABASE)
+ .withEnv("MARIADB_USER", TEST_USERNAME)
+ .withEnv("MARIADB_PASSWORD", TEST_PASSWORD)
+ .withEnv("MARIADB_ROOT_PASSWORD", ROOT_PASSWORD)
+ .waitingFor(Wait.forListeningPort());
+ container.start();
+ awaitTcpLoginReady(container, TEST_USERNAME, TEST_PASSWORD, TEST_DATABASE);
+ initMonitoringData(container);
+
+ MysqlCollectorProperties properties = new MysqlCollectorProperties();
+ properties.setQueryEngine(MysqlCollectorProperties.QueryEngine.R2DBC);
+ jdbcQueryExecutor = new MysqlR2dbcJdbcQueryExecutor(
+ properties,
+ new MysqlR2dbcQueryExecutor(
+ new MysqlR2dbcConnectionFactoryProvider(),
+ new ResultSetMapper(),
+ new SqlGuard()),
+ new MysqlJdbcDriverAvailability());
+ jdbcQueryExecutor.afterPropertiesSet();
+ mariadbTemplateMetrics = loadMariadbTemplate().getMetrics();
+ }
+
+ @AfterAll
+ void tearDown() throws Exception {
+ if (jdbcQueryExecutor != null) {
+ jdbcQueryExecutor.destroy();
+ }
+ if (container != null) {
+ container.stop();
+ }
+ }
+
+ @TestFactory
+ Stream shouldCollectOfficialMariadbTemplateThroughJdbcQueryAdapter() {
+ return mariadbTemplateMetrics.stream()
+ .map(templateMetric -> DynamicTest.dynamicTest(templateMetric.getName(),
+ () -> verifyTemplateMetric(templateMetric)));
+ }
+
+ private void verifyTemplateMetric(Metrics templateMetric) throws Exception {
+ Metrics metric = materializeMetric(templateMetric);
+ if ("process_state".equals(metric.getName())) {
+ startBackgroundSleepQuery(container);
+ }
+ if ("slow_sql".equals(metric.getName())) {
+ generateSlowQuery(container);
+ }
+ CollectRep.MetricsData metricsData = collect(metric);
+ assertEquals(CollectRep.Code.SUCCESS, metricsData.getCode(),
+ () -> metric.getName() + " failed: " + metricsData.getMsg());
+ assertEquals(metric.getFields().size(), metricsData.getFieldsCount(),
+ () -> metric.getName() + " fields should still be produced by the original parser");
+ if ("columns".equals(metric.getJdbc().getQueryType())) {
+ assertEquals(1, metricsData.getValuesCount(), () -> metric.getName() + " should keep the original single-row shape");
+ }
+ if ("basic".equals(metric.getName())) {
+ assertTrue(metricsData.getValuesCount() > 0);
+ assertNotNull(metricsData.getValues().getFirst().getColumns(0));
+ assertTrue(!Objects.equals(CommonConstants.NULL_VALUE, metricsData.getValues().getFirst().getColumns(0)),
+ "basic.version should be collected through the adapted query path");
+ }
+ if ("process_state".equals(metric.getName()) || "slow_sql".equals(metric.getName())) {
+ assertTrue(metricsData.getValuesCount() > 0, () -> metric.getName() + " should return at least one row");
+ }
+ }
+
+ private CollectRep.MetricsData collect(Metrics metric) {
+ Job job = Job.builder()
+ .monitorId(1L)
+ .tenantId(1L)
+ .app("mariadb")
+ .defaultInterval(600L)
+ .metadata(new HashMap<>(0))
+ .labels(new HashMap<>(0))
+ .annotations(new HashMap<>(0))
+ .configmap(new ArrayList<>(0))
+ .metrics(new ArrayList<>(List.of(metric)))
+ .build();
+ WheelTimerTask timerTask = new WheelTimerTask(job, timeout -> {
+ });
+ CapturingCollectDataDispatch collectDataDispatch = new CapturingCollectDataDispatch();
+ MetricsCollect metricsCollect = new MetricsCollect(
+ metric,
+ new StubTimeout(timerTask),
+ collectDataDispatch,
+ "collector-test",
+ List.of());
+ metricsCollect.run();
+ assertNotNull(collectDataDispatch.metricsData, metric.getName() + " should dispatch metrics data");
+ return collectDataDispatch.metricsData;
+ }
+
+ private Metrics materializeMetric(Metrics templateMetric) {
+ Metrics metric = JsonUtil.fromJson(JsonUtil.toJson(templateMetric), Metrics.class);
+ JdbcProtocol jdbcProtocol = metric.getJdbc();
+ jdbcProtocol.setHost(container.getHost());
+ jdbcProtocol.setPort(String.valueOf(container.getMappedPort(3306)));
+ jdbcProtocol.setUsername(TEST_USERNAME);
+ jdbcProtocol.setPassword(TEST_PASSWORD);
+ jdbcProtocol.setTimeout(String.valueOf(Duration.ofSeconds(8).toMillis()));
+ jdbcProtocol.setReuseConnection("false");
+ jdbcProtocol.setUrl(null);
+ jdbcProtocol.setSshTunnel(null);
+ if (jdbcProtocol.getDatabase() == null || jdbcProtocol.getDatabase().contains("^_^")) {
+ jdbcProtocol.setDatabase(TEST_DATABASE);
+ }
+ if (metric.getAliasFields() == null || metric.getAliasFields().isEmpty()) {
+ metric.setAliasFields(metric.getFields().stream().map(Metrics.Field::getField).collect(Collectors.toList()));
+ }
+ return metric;
+ }
+
+ private Job loadMariadbTemplate() throws IOException {
+ Path template = Path.of("..", "..", "hertzbeat-manager", "src", "main", "resources", "define", "app-mariadb.yml")
+ .toAbsolutePath()
+ .normalize();
+ Yaml yaml = new Yaml();
+ try (Reader reader = Files.newBufferedReader(template)) {
+ return yaml.loadAs(reader, Job.class);
+ }
+ }
+
+ private void initMonitoringData(GenericContainer> mariaDb) throws Exception {
+ execRoot(mariaDb,
+ "GRANT SELECT ON mysql.* TO '" + TEST_USERNAME + "'@'%';"
+ + " GRANT PROCESS ON *.* TO '" + TEST_USERNAME + "'@'%';"
+ + " SET GLOBAL log_output='TABLE';"
+ + " SET GLOBAL slow_query_log='ON';"
+ + " SET GLOBAL long_query_time=0;"
+ + " FLUSH PRIVILEGES;");
+ generateSlowQuery(mariaDb);
+ }
+
+ private void generateSlowQuery(GenericContainer> mariaDb) throws Exception {
+ execUser(mariaDb, TEST_DATABASE, "SELECT SLEEP(0.2);");
+ Thread.sleep(300);
+ }
+
+ private void startBackgroundSleepQuery(GenericContainer> mariaDb) throws Exception {
+ String command = String.join(" ",
+ "CLIENT=$(command -v mysql || command -v mariadb)",
+ "&&",
+ "nohup sh -lc",
+ "'$CLIENT --protocol=TCP -h127.0.0.1 -P3306",
+ "-u" + TEST_USERNAME,
+ "-p" + TEST_PASSWORD,
+ TEST_DATABASE,
+ "-e",
+ "\"SELECT SLEEP(15)\" >/tmp/process-state.log 2>&1'",
+ ">/dev/null 2>&1 &");
+ mariaDb.execInContainer("sh", "-lc", command);
+ Thread.sleep(500);
+ }
+
+ private void awaitTcpLoginReady(GenericContainer> mariaDb, String username, String password, String database) throws Exception {
+ long deadline = System.currentTimeMillis() + Duration.ofSeconds(30).toMillis();
+ while (System.currentTimeMillis() < deadline) {
+ try {
+ var result = mariaDb.execInContainer("sh", "-lc", mysqlCliCommand(username, password, database, "SELECT 1"));
+ if (result.getExitCode() == 0) {
+ return;
+ }
+ } catch (Exception ignored) {
+ // Wait for the MariaDB entrypoint to finish bootstrapping and switch to the final TCP listener.
+ }
+ Thread.sleep(1000);
+ }
+ throw new IllegalStateException("Timed out waiting for MariaDB TCP login to become ready");
+ }
+
+ private void execRoot(GenericContainer> mariaDb, String sql) throws Exception {
+ var result = mariaDb.execInContainer("sh", "-lc", mysqlCliCommand("root", ROOT_PASSWORD, "mysql", sql));
+ if (result.getExitCode() != 0) {
+ throw new IllegalStateException("root mysql command failed: " + result.getStderr());
+ }
+ }
+
+ private void execUser(GenericContainer> mariaDb, String database, String sql) throws Exception {
+ var result = mariaDb.execInContainer("sh", "-lc", mysqlCliCommand(TEST_USERNAME, TEST_PASSWORD, database, sql));
+ if (result.getExitCode() != 0) {
+ throw new IllegalStateException("user mysql command failed: " + result.getStderr());
+ }
+ }
+
+ private String mysqlCliCommand(String username, String password, String database, String sql) {
+ return String.join(" ",
+ "CLIENT=$(command -v mysql || command -v mariadb)",
+ "&&",
+ "$CLIENT --protocol=TCP -h127.0.0.1 -P3306",
+ "-u" + username,
+ "-p" + password,
+ database,
+ "-e",
+ "\"" + sql.replace("\"", "\\\"") + "\"");
+ }
+
+ private static final class CapturingCollectDataDispatch implements CollectDataDispatch {
+
+ private CollectRep.MetricsData metricsData;
+
+ @Override
+ public void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.MetricsData metricsData) {
+ this.metricsData = metricsData;
+ }
+
+ @Override
+ public void dispatchCollectData(Timeout timeout, Metrics metrics, List metricsDataList) {
+ if (metricsDataList != null && !metricsDataList.isEmpty()) {
+ this.metricsData = metricsDataList.getFirst();
+ }
+ }
+ }
+
+ private record StubTimeout(WheelTimerTask wheelTimerTask) implements Timeout {
+
+ @Override
+ public org.apache.hertzbeat.common.timer.Timer timer() {
+ return null;
+ }
+
+ @Override
+ public org.apache.hertzbeat.common.timer.TimerTask task() {
+ return wheelTimerTask;
+ }
+
+ @Override
+ public boolean isExpired() {
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public boolean cancel() {
+ return false;
+ }
+ }
+}
diff --git a/hertzbeat-collector/hertzbeat-collector-collector/src/test/java/org/apache/hertzbeat/collector/collect/database/mysql/MysqlJdbcDriverAvailabilityTest.java b/hertzbeat-collector/hertzbeat-collector-collector/src/test/java/org/apache/hertzbeat/collector/collect/database/mysql/MysqlJdbcDriverAvailabilityTest.java
new file mode 100644
index 00000000000..9c5cabe8428
--- /dev/null
+++ b/hertzbeat-collector/hertzbeat-collector-collector/src/test/java/org/apache/hertzbeat/collector/collect/database/mysql/MysqlJdbcDriverAvailabilityTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.collector.collect.database.mysql;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.junit.jupiter.api.Test;
+
+class MysqlJdbcDriverAvailabilityTest {
+
+ @Test
+ void shouldTreatOnlyExtLibLocationsAsAutoJdbcSignal() {
+ assertTrue(MysqlJdbcDriverAvailability.isExtLibLocation("/opt/hertzbeat/ext-lib/mysql-connector-j-9.0.0.jar"));
+ assertTrue(MysqlJdbcDriverAvailability.isExtLibLocation("file:/C:/hertzbeat/ext-lib/mysql-connector-j-9.0.0.jar"));
+ assertFalse(MysqlJdbcDriverAvailability.isExtLibLocation("/Users/dev/.m2/repository/com/mysql/mysql-connector-j/9.0.0/mysql-connector-j-9.0.0.jar"));
+ assertFalse(MysqlJdbcDriverAvailability.isExtLibLocation(null));
+ }
+
+ @Test
+ void shouldIgnoreTestClasspathMysqlDriverWhenItIsNotFromExtLib() {
+ MysqlJdbcDriverAvailability availability = new MysqlJdbcDriverAvailability();
+
+ assertFalse(availability.hasMysqlJdbcDriver());
+ }
+}
diff --git a/hertzbeat-collector/hertzbeat-collector-collector/src/test/java/org/apache/hertzbeat/collector/collect/database/mysql/MysqlJdbcQueryAdapterCompatibilityIntegrationTest.java b/hertzbeat-collector/hertzbeat-collector-collector/src/test/java/org/apache/hertzbeat/collector/collect/database/mysql/MysqlJdbcQueryAdapterCompatibilityIntegrationTest.java
new file mode 100644
index 00000000000..38d7265ddb8
--- /dev/null
+++ b/hertzbeat-collector/hertzbeat-collector-collector/src/test/java/org/apache/hertzbeat/collector/collect/database/mysql/MysqlJdbcQueryAdapterCompatibilityIntegrationTest.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.collector.collect.database.mysql;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hertzbeat.collector.collect.strategy.CollectStrategyFactory;
+import org.apache.hertzbeat.collector.dispatch.CollectDataDispatch;
+import org.apache.hertzbeat.collector.dispatch.MetricsCollect;
+import org.apache.hertzbeat.collector.mysql.r2dbc.MysqlR2dbcConnectionFactoryProvider;
+import org.apache.hertzbeat.collector.mysql.r2dbc.MysqlR2dbcQueryExecutor;
+import org.apache.hertzbeat.collector.mysql.r2dbc.ResultSetMapper;
+import org.apache.hertzbeat.collector.mysql.r2dbc.SqlGuard;
+import org.apache.hertzbeat.collector.timer.WheelTimerTask;
+import org.apache.hertzbeat.common.constants.CommonConstants;
+import org.apache.hertzbeat.common.entity.job.Job;
+import org.apache.hertzbeat.common.entity.job.Metrics;
+import org.apache.hertzbeat.common.entity.job.protocol.JdbcProtocol;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+import org.apache.hertzbeat.common.timer.Timeout;
+import org.apache.hertzbeat.common.util.JsonUtil;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.DynamicTest;
+import org.junit.jupiter.api.TestFactory;
+import org.junit.jupiter.api.TestInstance;
+import org.testcontainers.DockerClientFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+import org.yaml.snakeyaml.Yaml;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+class MysqlJdbcQueryAdapterCompatibilityIntegrationTest {
+
+ private static final String TEST_DATABASE = "hzb";
+ private static final String TEST_USERNAME = "test";
+ private static final String TEST_PASSWORD = "test123";
+ private static final String ROOT_PASSWORD = "root123";
+ private static final Set REPRESENTATIVE_TEMPLATE_METRICS = Set.of("basic", "process_state");
+
+ private List representativeTemplateMetrics;
+
+ @BeforeAll
+ void setUp() throws Exception {
+ Assumptions.assumeTrue(DockerClientFactory.instance().isDockerAvailable(), "Docker is required for integration tests");
+ new CollectStrategyFactory().run();
+ representativeTemplateMetrics = loadMysqlTemplate().getMetrics().stream()
+ .filter(metric -> REPRESENTATIVE_TEMPLATE_METRICS.contains(metric.getName()))
+ .collect(Collectors.toList());
+ }
+
+ @TestFactory
+ Stream shouldCollectRepresentativeTemplateMetricsAcrossCompatibilityMatrix() {
+ return Stream.of(
+ new DatabaseTarget("mysql-5.7.44", DockerImageName.parse("mysql:5.7.44"), false),
+ new DatabaseTarget("mysql-8.0.36", DockerImageName.parse("mysql:8.0.36"), false),
+ new DatabaseTarget("mariadb-11.4", DockerImageName.parse("mariadb:11.4"), true))
+ .map(target -> DynamicTest.dynamicTest(target.name(), () -> verifyRepresentativeMetrics(target)));
+ }
+
+ private void verifyRepresentativeMetrics(DatabaseTarget target) throws Exception {
+ MysqlCollectorProperties properties = new MysqlCollectorProperties();
+ properties.setQueryEngine(MysqlCollectorProperties.QueryEngine.R2DBC);
+ MysqlR2dbcJdbcQueryExecutor jdbcQueryExecutor = new MysqlR2dbcJdbcQueryExecutor(
+ properties,
+ new MysqlR2dbcQueryExecutor(
+ new MysqlR2dbcConnectionFactoryProvider(),
+ new ResultSetMapper(),
+ new SqlGuard()),
+ new MysqlJdbcDriverAvailability());
+ try (GenericContainer> container = createContainer(target)) {
+ jdbcQueryExecutor.afterPropertiesSet();
+ container.start();
+ awaitTcpLoginReady(container, TEST_USERNAME, TEST_PASSWORD, TEST_DATABASE);
+ initMonitoringData(container);
+
+ for (Metrics templateMetric : representativeTemplateMetrics) {
+ Metrics metric = materializeMetric(templateMetric, container);
+ if ("process_state".equals(metric.getName())) {
+ startBackgroundSleepQuery(container);
+ }
+ CollectRep.MetricsData metricsData = collect(metric);
+ assertEquals(CollectRep.Code.SUCCESS, metricsData.getCode(),
+ () -> target.name() + " " + metric.getName() + " failed: " + metricsData.getMsg());
+ assertEquals(metric.getFields().size(), metricsData.getFieldsCount(),
+ () -> target.name() + " " + metric.getName() + " should keep the original parser output shape");
+ if ("basic".equals(metric.getName())) {
+ assertTrue(metricsData.getValuesCount() > 0, () -> target.name() + " basic should return data");
+ assertNotNull(metricsData.getValues().getFirst().getColumns(0));
+ assertTrue(!Objects.equals(CommonConstants.NULL_VALUE, metricsData.getValues().getFirst().getColumns(0)),
+ () -> target.name() + " basic.version should be collected");
+ }
+ if ("process_state".equals(metric.getName())) {
+ assertTrue(metricsData.getValuesCount() > 0,
+ () -> target.name() + " process_state should return at least one grouped state row");
+ }
+ }
+ } finally {
+ jdbcQueryExecutor.destroy();
+ }
+ }
+
+ private GenericContainer> createContainer(DatabaseTarget target) {
+ GenericContainer> container = new GenericContainer<>(target.image())
+ .withExposedPorts(3306)
+ .waitingFor(Wait.forListeningPort());
+ if (target.mariaDb()) {
+ return container.withEnv("MARIADB_DATABASE", TEST_DATABASE)
+ .withEnv("MARIADB_USER", TEST_USERNAME)
+ .withEnv("MARIADB_PASSWORD", TEST_PASSWORD)
+ .withEnv("MARIADB_ROOT_PASSWORD", ROOT_PASSWORD);
+ }
+ return container.withEnv("MYSQL_DATABASE", TEST_DATABASE)
+ .withEnv("MYSQL_USER", TEST_USERNAME)
+ .withEnv("MYSQL_PASSWORD", TEST_PASSWORD)
+ .withEnv("MYSQL_ROOT_PASSWORD", ROOT_PASSWORD);
+ }
+
+ private CollectRep.MetricsData collect(Metrics metric) {
+ Job job = Job.builder()
+ .monitorId(1L)
+ .tenantId(1L)
+ .app("mysql")
+ .defaultInterval(600L)
+ .metadata(new HashMap<>(0))
+ .labels(new HashMap<>(0))
+ .annotations(new HashMap<>(0))
+ .configmap(new ArrayList<>(0))
+ .metrics(new ArrayList<>(List.of(metric)))
+ .build();
+ WheelTimerTask timerTask = new WheelTimerTask(job, timeout -> {
+ });
+ CapturingCollectDataDispatch collectDataDispatch = new CapturingCollectDataDispatch();
+ MetricsCollect metricsCollect = new MetricsCollect(
+ metric,
+ new StubTimeout(timerTask),
+ collectDataDispatch,
+ "collector-test",
+ List.of());
+ metricsCollect.run();
+ return collectDataDispatch.metricsData;
+ }
+
+ private Metrics materializeMetric(Metrics templateMetric, GenericContainer> container) {
+ Metrics metric = JsonUtil.fromJson(JsonUtil.toJson(templateMetric), Metrics.class);
+ JdbcProtocol jdbcProtocol = metric.getJdbc();
+ jdbcProtocol.setHost(container.getHost());
+ jdbcProtocol.setPort(String.valueOf(container.getMappedPort(3306)));
+ jdbcProtocol.setUsername(TEST_USERNAME);
+ jdbcProtocol.setPassword(TEST_PASSWORD);
+ jdbcProtocol.setTimeout(String.valueOf(Duration.ofSeconds(8).toMillis()));
+ jdbcProtocol.setReuseConnection("false");
+ jdbcProtocol.setUrl(null);
+ jdbcProtocol.setSshTunnel(null);
+ if (jdbcProtocol.getDatabase() == null || jdbcProtocol.getDatabase().contains("^_^")) {
+ jdbcProtocol.setDatabase(TEST_DATABASE);
+ }
+ if (metric.getAliasFields() == null || metric.getAliasFields().isEmpty()) {
+ metric.setAliasFields(metric.getFields().stream().map(Metrics.Field::getField).collect(Collectors.toList()));
+ }
+ return metric;
+ }
+
+ private Job loadMysqlTemplate() throws IOException {
+ Path template = Path.of("..", "..", "hertzbeat-manager", "src", "main", "resources", "define", "app-mysql.yml")
+ .toAbsolutePath()
+ .normalize();
+ Yaml yaml = new Yaml();
+ try (Reader reader = Files.newBufferedReader(template)) {
+ return yaml.loadAs(reader, Job.class);
+ }
+ }
+
+ private void initMonitoringData(GenericContainer> mysql) throws Exception {
+ execRoot(mysql,
+ "GRANT SELECT ON mysql.* TO '" + TEST_USERNAME + "'@'%';"
+ + " GRANT PROCESS ON *.* TO '" + TEST_USERNAME + "'@'%';"
+ + " SET GLOBAL log_output='TABLE';"
+ + " SET GLOBAL slow_query_log='ON';"
+ + " SET GLOBAL long_query_time=0;"
+ + " FLUSH PRIVILEGES;");
+ }
+
+ private void startBackgroundSleepQuery(GenericContainer> mysql) throws Exception {
+ String command = String.join(" ",
+ "CLIENT=$(command -v mysql || command -v mariadb)",
+ "&&",
+ "nohup sh -lc",
+ "'$CLIENT --protocol=TCP -h127.0.0.1 -P3306",
+ "-u" + TEST_USERNAME,
+ "-p" + TEST_PASSWORD,
+ TEST_DATABASE,
+ "-e",
+ "\"SELECT SLEEP(15)\" >/tmp/process-state.log 2>&1'",
+ ">/dev/null 2>&1 &");
+ mysql.execInContainer("sh", "-lc", command);
+ Thread.sleep(500);
+ }
+
+ private void awaitTcpLoginReady(GenericContainer> mysql, String username, String password, String database) throws Exception {
+ long deadline = System.currentTimeMillis() + Duration.ofSeconds(30).toMillis();
+ while (System.currentTimeMillis() < deadline) {
+ try {
+ var result = mysql.execInContainer("sh", "-lc", mysqlCliCommand(username, password, database, "SELECT 1"));
+ if (result.getExitCode() == 0) {
+ return;
+ }
+ } catch (Exception ignored) {
+ // Wait for the database entrypoint to finish bootstrapping and switch to the final TCP listener.
+ }
+ Thread.sleep(1000);
+ }
+ throw new IllegalStateException("Timed out waiting for MySQL-compatible TCP login to become ready");
+ }
+
+ private void execRoot(GenericContainer> mysql, String sql) throws Exception {
+ var result = mysql.execInContainer("sh", "-lc", mysqlCliCommand("root", ROOT_PASSWORD, "mysql", sql));
+ if (result.getExitCode() != 0) {
+ throw new IllegalStateException("root mysql command failed: " + result.getStderr());
+ }
+ }
+
+ private String mysqlCliCommand(String username, String password, String database, String sql) {
+ return String.join(" ",
+ "CLIENT=$(command -v mysql || command -v mariadb)",
+ "&&",
+ "$CLIENT --protocol=TCP -h127.0.0.1 -P3306",
+ "-u" + username,
+ "-p" + password,
+ database,
+ "-e",
+ "\"" + sql.replace("\"", "\\\"") + "\"");
+ }
+
+ private record DatabaseTarget(String name, DockerImageName image, boolean mariaDb) {
+ }
+
+ private static final class CapturingCollectDataDispatch implements CollectDataDispatch {
+
+ private CollectRep.MetricsData metricsData;
+
+ @Override
+ public void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.MetricsData metricsData) {
+ this.metricsData = metricsData;
+ }
+
+ @Override
+ public void dispatchCollectData(Timeout timeout, Metrics metrics, List metricsDataList) {
+ if (metricsDataList != null && !metricsDataList.isEmpty()) {
+ this.metricsData = metricsDataList.getFirst();
+ }
+ }
+ }
+
+ private record StubTimeout(WheelTimerTask wheelTimerTask) implements Timeout {
+
+ @Override
+ public org.apache.hertzbeat.common.timer.Timer timer() {
+ return null;
+ }
+
+ @Override
+ public org.apache.hertzbeat.common.timer.TimerTask task() {
+ return wheelTimerTask;
+ }
+
+ @Override
+ public boolean isExpired() {
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public boolean cancel() {
+ return false;
+ }
+ }
+}
diff --git a/hertzbeat-collector/hertzbeat-collector-collector/src/test/java/org/apache/hertzbeat/collector/collect/database/mysql/MysqlJdbcQueryAdapterTemplateIntegrationTest.java b/hertzbeat-collector/hertzbeat-collector-collector/src/test/java/org/apache/hertzbeat/collector/collect/database/mysql/MysqlJdbcQueryAdapterTemplateIntegrationTest.java
new file mode 100644
index 00000000000..3d91c7107d6
--- /dev/null
+++ b/hertzbeat-collector/hertzbeat-collector-collector/src/test/java/org/apache/hertzbeat/collector/collect/database/mysql/MysqlJdbcQueryAdapterTemplateIntegrationTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.collector.collect.database.mysql;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hertzbeat.collector.collect.strategy.CollectStrategyFactory;
+import org.apache.hertzbeat.collector.dispatch.CollectDataDispatch;
+import org.apache.hertzbeat.collector.dispatch.MetricsCollect;
+import org.apache.hertzbeat.collector.mysql.r2dbc.MysqlR2dbcConnectionFactoryProvider;
+import org.apache.hertzbeat.collector.mysql.r2dbc.MysqlR2dbcQueryExecutor;
+import org.apache.hertzbeat.collector.mysql.r2dbc.ResultSetMapper;
+import org.apache.hertzbeat.collector.mysql.r2dbc.SqlGuard;
+import org.apache.hertzbeat.collector.timer.WheelTimerTask;
+import org.apache.hertzbeat.common.constants.CommonConstants;
+import org.apache.hertzbeat.common.entity.job.Job;
+import org.apache.hertzbeat.common.entity.job.Metrics;
+import org.apache.hertzbeat.common.entity.job.protocol.JdbcProtocol;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+import org.apache.hertzbeat.common.timer.Timeout;
+import org.apache.hertzbeat.common.util.JsonUtil;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.DynamicTest;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestFactory;
+import org.testcontainers.DockerClientFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+import org.yaml.snakeyaml.Yaml;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+class MysqlJdbcQueryAdapterTemplateIntegrationTest {
+
+ private static final String TEST_DATABASE = "hzb";
+ private static final String TEST_USERNAME = "test";
+ private static final String TEST_PASSWORD = "test123";
+ private static final String ROOT_PASSWORD = "root123";
+
+ private GenericContainer> container;
+ private MysqlR2dbcJdbcQueryExecutor jdbcQueryExecutor;
+ private List mysqlTemplateMetrics;
+
+ @BeforeAll
+ void setUp() throws Exception {
+ Assumptions.assumeTrue(DockerClientFactory.instance().isDockerAvailable(), "Docker is required for integration tests");
+ new CollectStrategyFactory().run();
+ container = new GenericContainer<>(DockerImageName.parse("mysql:8.0.36"))
+ .withExposedPorts(3306)
+ .withEnv("MYSQL_DATABASE", TEST_DATABASE)
+ .withEnv("MYSQL_USER", TEST_USERNAME)
+ .withEnv("MYSQL_PASSWORD", TEST_PASSWORD)
+ .withEnv("MYSQL_ROOT_PASSWORD", ROOT_PASSWORD)
+ .waitingFor(Wait.forListeningPort());
+ container.start();
+ awaitTcpLoginReady(container, TEST_USERNAME, TEST_PASSWORD, TEST_DATABASE);
+ initMonitoringData(container);
+
+ MysqlCollectorProperties properties = new MysqlCollectorProperties();
+ properties.setQueryEngine(MysqlCollectorProperties.QueryEngine.R2DBC);
+ jdbcQueryExecutor = new MysqlR2dbcJdbcQueryExecutor(
+ properties,
+ new MysqlR2dbcQueryExecutor(
+ new MysqlR2dbcConnectionFactoryProvider(),
+ new ResultSetMapper(),
+ new SqlGuard()),
+ new MysqlJdbcDriverAvailability());
+ jdbcQueryExecutor.afterPropertiesSet();
+ mysqlTemplateMetrics = loadMysqlTemplate().getMetrics();
+ }
+
+ @AfterAll
+ void tearDown() throws Exception {
+ if (jdbcQueryExecutor != null) {
+ jdbcQueryExecutor.destroy();
+ }
+ if (container != null) {
+ container.stop();
+ }
+ }
+
+ @TestFactory
+ Stream shouldCollectOfficialMysqlTemplateThroughJdbcQueryAdapter() {
+ return mysqlTemplateMetrics.stream()
+ .map(templateMetric -> DynamicTest.dynamicTest(templateMetric.getName(),
+ () -> verifyTemplateMetric(templateMetric)));
+ }
+
+ private void verifyTemplateMetric(Metrics templateMetric) throws Exception {
+ Metrics metric = materializeMetric(templateMetric);
+ if ("process_state".equals(metric.getName())) {
+ startBackgroundSleepQuery(container);
+ }
+ if ("slow_sql".equals(metric.getName())) {
+ generateSlowQuery(container);
+ }
+ CollectRep.MetricsData metricsData = collect(metric);
+ assertEquals(CollectRep.Code.SUCCESS, metricsData.getCode(),
+ () -> metric.getName() + " failed: " + metricsData.getMsg());
+ assertEquals(metric.getFields().size(), metricsData.getFieldsCount(),
+ () -> metric.getName() + " fields should still be produced by the original parser");
+ if ("columns".equals(metric.getJdbc().getQueryType())) {
+ assertEquals(1, metricsData.getValuesCount(), () -> metric.getName() + " should keep the original single-row shape");
+ }
+ if ("basic".equals(metric.getName())) {
+ assertTrue(metricsData.getValuesCount() > 0);
+ assertNotNull(metricsData.getValues().getFirst().getColumns(0));
+ assertTrue(!Objects.equals(CommonConstants.NULL_VALUE, metricsData.getValues().getFirst().getColumns(0)),
+ "basic.version should be collected through the adapted query path");
+ }
+ if ("process_state".equals(metric.getName())
+ || "slow_sql".equals(metric.getName())
+ || "account_expiry".equals(metric.getName())) {
+ assertTrue(metricsData.getValuesCount() > 0, () -> metric.getName() + " should return at least one row");
+ }
+ }
+
+ private CollectRep.MetricsData collect(Metrics metric) {
+ Job job = Job.builder()
+ .monitorId(1L)
+ .tenantId(1L)
+ .app("mysql")
+ .defaultInterval(600L)
+ .metadata(new HashMap<>(0))
+ .labels(new HashMap<>(0))
+ .annotations(new HashMap<>(0))
+ .configmap(new ArrayList<>(0))
+ .metrics(new ArrayList<>(List.of(metric)))
+ .build();
+ WheelTimerTask timerTask = new WheelTimerTask(job, timeout -> {
+ });
+ CapturingCollectDataDispatch collectDataDispatch = new CapturingCollectDataDispatch();
+ MetricsCollect metricsCollect = new MetricsCollect(
+ metric,
+ new StubTimeout(timerTask),
+ collectDataDispatch,
+ "collector-test",
+ List.of());
+ metricsCollect.run();
+ assertNotNull(collectDataDispatch.metricsData, metric.getName() + " should dispatch metrics data");
+ return collectDataDispatch.metricsData;
+ }
+
+ private Metrics materializeMetric(Metrics templateMetric) {
+ Metrics metric = JsonUtil.fromJson(JsonUtil.toJson(templateMetric), Metrics.class);
+ JdbcProtocol jdbcProtocol = metric.getJdbc();
+ jdbcProtocol.setHost(container.getHost());
+ jdbcProtocol.setPort(String.valueOf(container.getMappedPort(3306)));
+ jdbcProtocol.setUsername(TEST_USERNAME);
+ jdbcProtocol.setPassword(TEST_PASSWORD);
+ jdbcProtocol.setTimeout(String.valueOf(Duration.ofSeconds(8).toMillis()));
+ jdbcProtocol.setReuseConnection("false");
+ jdbcProtocol.setUrl(null);
+ jdbcProtocol.setSshTunnel(null);
+ if (jdbcProtocol.getDatabase() == null || jdbcProtocol.getDatabase().contains("^_^")) {
+ jdbcProtocol.setDatabase(TEST_DATABASE);
+ }
+ if (metric.getAliasFields() == null || metric.getAliasFields().isEmpty()) {
+ metric.setAliasFields(metric.getFields().stream().map(Metrics.Field::getField).collect(Collectors.toList()));
+ }
+ return metric;
+ }
+
+ private Job loadMysqlTemplate() throws IOException {
+ Path template = Path.of("..", "..", "hertzbeat-manager", "src", "main", "resources", "define", "app-mysql.yml")
+ .toAbsolutePath()
+ .normalize();
+ Yaml yaml = new Yaml();
+ try (Reader reader = Files.newBufferedReader(template)) {
+ return yaml.loadAs(reader, Job.class);
+ }
+ }
+
+ private void initMonitoringData(GenericContainer> mysql) throws Exception {
+ execRoot(mysql,
+ "GRANT SELECT ON mysql.* TO '" + TEST_USERNAME + "'@'%';"
+ + " GRANT PROCESS ON *.* TO '" + TEST_USERNAME + "'@'%';"
+ + " SET GLOBAL log_output='TABLE';"
+ + " SET GLOBAL slow_query_log='ON';"
+ + " SET GLOBAL long_query_time=0;"
+ + " FLUSH PRIVILEGES;");
+ generateSlowQuery(mysql);
+ }
+
+ private void generateSlowQuery(GenericContainer> mysql) throws Exception {
+ execUser(mysql, TEST_DATABASE, "SELECT SLEEP(0.2);");
+ Thread.sleep(300);
+ }
+
+ private void startBackgroundSleepQuery(GenericContainer> mysql) throws Exception {
+ String command = String.join(" ",
+ "CLIENT=$(command -v mysql || command -v mariadb)",
+ "&&",
+ "nohup sh -lc",
+ "'$CLIENT --protocol=TCP -h127.0.0.1 -P3306",
+ "-u" + TEST_USERNAME,
+ "-p" + TEST_PASSWORD,
+ TEST_DATABASE,
+ "-e",
+ "\"SELECT SLEEP(15)\" >/tmp/process-state.log 2>&1'",
+ ">/dev/null 2>&1 &");
+ mysql.execInContainer("sh", "-lc", command);
+ Thread.sleep(500);
+ }
+
+ private void awaitTcpLoginReady(GenericContainer> mysql, String username, String password, String database) throws Exception {
+ long deadline = System.currentTimeMillis() + Duration.ofSeconds(30).toMillis();
+ while (System.currentTimeMillis() < deadline) {
+ try {
+ var result = mysql.execInContainer("sh", "-lc", mysqlCliCommand(username, password, database, "SELECT 1"));
+ if (result.getExitCode() == 0) {
+ return;
+ }
+ } catch (Exception ignored) {
+ // Wait for the MySQL entrypoint to finish bootstrapping and switch to the final TCP listener.
+ }
+ Thread.sleep(1000);
+ }
+ throw new IllegalStateException("Timed out waiting for MySQL TCP login to become ready");
+ }
+
+ private void execRoot(GenericContainer> mysql, String sql) throws Exception {
+ var result = mysql.execInContainer("sh", "-lc", mysqlCliCommand("root", ROOT_PASSWORD, "mysql", sql));
+ if (result.getExitCode() != 0) {
+ throw new IllegalStateException("root mysql command failed: " + result.getStderr());
+ }
+ }
+
+ private void execUser(GenericContainer> mysql, String database, String sql) throws Exception {
+ var result = mysql.execInContainer("sh", "-lc", mysqlCliCommand(TEST_USERNAME, TEST_PASSWORD, database, sql));
+ if (result.getExitCode() != 0) {
+ throw new IllegalStateException("user mysql command failed: " + result.getStderr());
+ }
+ }
+
+ private String mysqlCliCommand(String username, String password, String database, String sql) {
+ return String.join(" ",
+ "CLIENT=$(command -v mysql || command -v mariadb)",
+ "&&",
+ "$CLIENT --protocol=TCP -h127.0.0.1 -P3306",
+ "-u" + username,
+ "-p" + password,
+ database,
+ "-e",
+ "\"" + sql.replace("\"", "\\\"") + "\"");
+ }
+
+ private static final class CapturingCollectDataDispatch implements CollectDataDispatch {
+
+ private CollectRep.MetricsData metricsData;
+
+ @Override
+ public void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.MetricsData metricsData) {
+ this.metricsData = metricsData;
+ }
+
+ @Override
+ public void dispatchCollectData(Timeout timeout, Metrics metrics, List metricsDataList) {
+ if (metricsDataList != null && !metricsDataList.isEmpty()) {
+ this.metricsData = metricsDataList.getFirst();
+ }
+ }
+ }
+
+ private record StubTimeout(WheelTimerTask wheelTimerTask) implements Timeout {
+
+ @Override
+ public org.apache.hertzbeat.common.timer.Timer timer() {
+ return null;
+ }
+
+ @Override
+ public org.apache.hertzbeat.common.timer.TimerTask task() {
+ return wheelTimerTask;
+ }
+
+ @Override
+ public boolean isExpired() {
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public boolean cancel() {
+ return false;
+ }
+ }
+}
diff --git a/hertzbeat-collector/hertzbeat-collector-collector/src/test/java/org/apache/hertzbeat/collector/collect/database/mysql/MysqlJdbcQueryParityIntegrationTest.java b/hertzbeat-collector/hertzbeat-collector-collector/src/test/java/org/apache/hertzbeat/collector/collect/database/mysql/MysqlJdbcQueryParityIntegrationTest.java
new file mode 100644
index 00000000000..650e6e92c5c
--- /dev/null
+++ b/hertzbeat-collector/hertzbeat-collector-collector/src/test/java/org/apache/hertzbeat/collector/collect/database/mysql/MysqlJdbcQueryParityIntegrationTest.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.collector.collect.database.mysql;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.lang.reflect.Field;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hertzbeat.collector.collect.strategy.CollectStrategyFactory;
+import org.apache.hertzbeat.collector.collect.database.query.JdbcQueryExecutorRegistry;
+import org.apache.hertzbeat.collector.dispatch.CollectDataDispatch;
+import org.apache.hertzbeat.collector.dispatch.MetricsCollect;
+import org.apache.hertzbeat.collector.mysql.r2dbc.MysqlR2dbcConnectionFactoryProvider;
+import org.apache.hertzbeat.collector.mysql.r2dbc.MysqlR2dbcQueryExecutor;
+import org.apache.hertzbeat.collector.mysql.r2dbc.ResultSetMapper;
+import org.apache.hertzbeat.collector.mysql.r2dbc.SqlGuard;
+import org.apache.hertzbeat.collector.timer.WheelTimerTask;
+import org.apache.hertzbeat.common.entity.job.Job;
+import org.apache.hertzbeat.common.entity.job.Metrics;
+import org.apache.hertzbeat.common.entity.job.protocol.JdbcProtocol;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+import org.apache.hertzbeat.common.timer.Timeout;
+import org.apache.hertzbeat.common.util.JsonUtil;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.DynamicTest;
+import org.junit.jupiter.api.TestFactory;
+import org.junit.jupiter.api.TestInstance;
+import org.testcontainers.DockerClientFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+import org.yaml.snakeyaml.Yaml;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+class MysqlJdbcQueryParityIntegrationTest {
+
+ private static final String TEST_DATABASE = "hzb";
+ private static final String TEST_USERNAME = "test";
+ private static final String TEST_PASSWORD = "test123";
+ private static final String ROOT_PASSWORD = "root123";
+ private static final String PARITY_TABLE = "collector_parity_metrics";
+
+ private Metrics basicTemplateMetric;
+
+ @BeforeAll
+ void setUp() throws Exception {
+ Assumptions.assumeTrue(DockerClientFactory.instance().isDockerAvailable(), "Docker is required for integration tests");
+ new CollectStrategyFactory().run();
+ assertDoesNotThrow(() -> Class.forName("com.mysql.cj.jdbc.Driver"));
+ basicTemplateMetric = loadMysqlTemplate().getMetrics().stream()
+ .filter(metric -> "basic".equals(metric.getName()))
+ .findFirst()
+ .orElseThrow(() -> new IllegalStateException("Unable to locate the basic metric in app-mysql.yml"));
+ }
+
+ @AfterEach
+ void clearRegisteredExecutors() throws Exception {
+ Field executorsField = JdbcQueryExecutorRegistry.class.getDeclaredField("EXECUTORS");
+ executorsField.setAccessible(true);
+ @SuppressWarnings("unchecked")
+ CopyOnWriteArrayList
+
+ org.apache.maven.plugins
+ maven-enforcer-plugin
+ 3.5.0
+
+
+ require-java-25
+ validate
+
+ enforce
+
+
+
+
+ [25,)
+ Apache HertzBeat now requires JDK 25 or newer. Please switch JAVA_HOME before building or testing.
+
+
+
+
+
+
org.apache.maven.plugins
diff --git a/script/application.yml b/script/application.yml
index ecebc66681f..90c0ea4bb8e 100644
--- a/script/application.yml
+++ b/script/application.yml
@@ -336,6 +336,13 @@ grafana:
password: admin
hertzbeat:
+ collector:
+ mysql:
+ # MySQL-compatible query engine routing for MySQL, MariaDB, OceanBase, and TiDB SQL metrics.
+ # auto : prefer JDBC only when mysql-connector-j is available from ext-lib, otherwise use the built-in query engine
+ # jdbc : always use JDBC
+ # r2dbc : always use the built-in query engine
+ query-engine: ${HERTZBEAT_COLLECTOR_MYSQL_QUERY_ENGINE:auto}
# Optional virtual-thread overrides. Remove this whole block to use built-in defaults.
vthreads:
enabled: true
diff --git a/script/docker-compose/README.md b/script/docker-compose/README.md
index 58a90a98bca..62b6daf2e4f 100644
--- a/script/docker-compose/README.md
+++ b/script/docker-compose/README.md
@@ -2,9 +2,15 @@
Suggest the [HertzBeat + GreptimeDB + Postgresql Solution](hertzbeat-postgresql-greptimedb) for the best performance and stability.
+Notes:
+
+- MySQL, MariaDB, OceanBase, and TiDB SQL query metrics can use the built-in MySQL-compatible query engine without `mysql-connector-j`.
+- If you place `mysql-connector-j` in `ext-lib`, HertzBeat prefers JDBC after restart.
+- Oracle and DB2 still require external JDBC driver jars in `ext-lib`.
+
+
- Use Postgresql + GreptimeDB as HertzBeat dependent storage -> [HertzBeat+PostgreSQL+GreptimeDB Solution](hertzbeat-postgresql-greptimedb)
- Use Postgresql + VictoriaMetrics as HertzBeat dependent storage -> [HertzBeat+PostgreSQL+VictoriaMetrics Solution](hertzbeat-postgresql-victoria-metrics)
- Use Mysql + VictoriaMetrics as HertzBeat dependent storage -> [HertzBeat+Mysql+VictoriaMetrics Solution](hertzbeat-mysql-victoria-metrics)
- Use Mysql + IoTDB as HertzBeat dependent storage -> [HertzBeat+Mysql+IoTDB Solution](hertzbeat-mysql-iotdb)
- Use Mysql + Tdengine as HertzBeat dependent storage -> [HertzBeat+Mysql+Tdengine Solution](hertzbeat-mysql-tdengine)
-
diff --git a/script/docker-compose/hertzbeat-mysql-iotdb/README.md b/script/docker-compose/hertzbeat-mysql-iotdb/README.md
index 681e5f863f5..59ed05e9c1b 100644
--- a/script/docker-compose/hertzbeat-mysql-iotdb/README.md
+++ b/script/docker-compose/hertzbeat-mysql-iotdb/README.md
@@ -17,10 +17,11 @@
1. Download the hertzbeat-docker-compose installation deployment script file
The script file is located in `script/docker-compose/hertzbeat-mysql-iotdb` link [script/docker-compose](https://github.com/apache/hertzbeat/tree/master/script/docker-compose/ hertzbeat-mysql-iotdb)
-2. Add MYSQL jdbc driver jar
+2. Optional: add external JDBC driver jars to `ext-lib`
- Download the MYSQL jdbc driver jar package, such as mysql-connector-java-8.0.25.jar. https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.25.zip
- Copy the jar package to the ext-lib directory.
+ MySQL-compatible monitoring can use the built-in query engine directly, so `mysql-connector-j` is optional.
+ If you want HertzBeat to prefer JDBC after restart, place `mysql-connector-j` in `ext-lib`.
+ Oracle and DB2 still require external JDBC jars in `ext-lib`.
3. Enter the deployment script docker-compose directory, execute
diff --git a/script/docker-compose/hertzbeat-mysql-iotdb/README_CN.md b/script/docker-compose/hertzbeat-mysql-iotdb/README_CN.md
index dabeed00e6b..3bfe5b8f758 100644
--- a/script/docker-compose/hertzbeat-mysql-iotdb/README_CN.md
+++ b/script/docker-compose/hertzbeat-mysql-iotdb/README_CN.md
@@ -19,9 +19,10 @@
1. 下载hertzbeat-docker-compose安装部署脚本文件
脚本文件位于代码仓库下`script/docker-compose/hertzbeat-mysql-iotdb` 链接 [script/docker-compose](https://github.com/apache/hertzbeat/tree/master/script/docker-compose/hertzbeat-mysql-iotdb)
-2. 添加 MYSQL jdbc 驱动 jar
- 下载 MYSQL jdbc driver jar, 例如 mysql-connector-java-8.0.25.jar. https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.25.zip
- 将此 jar 包拷贝放入 ext-lib 目录下.
+2. 可选:向 `ext-lib` 添加外部 JDBC 驱动 jar
+ MySQL 兼容监控现在可以直接使用内置查询引擎,所以 `mysql-connector-j` 不是必需项。
+ 如果你希望 HertzBeat 在重启后优先走 JDBC,可以把 `mysql-connector-j` 放到 `ext-lib`。
+ Oracle、DB2 这类场景仍然需要把外部 JDBC 驱动放到 `ext-lib`。
3. 进入部署脚本 docker-compose 目录, 执行
diff --git a/script/docker-compose/hertzbeat-mysql-iotdb/conf/application.yml b/script/docker-compose/hertzbeat-mysql-iotdb/conf/application.yml
index c4c9d80797e..33b0537468e 100644
--- a/script/docker-compose/hertzbeat-mysql-iotdb/conf/application.yml
+++ b/script/docker-compose/hertzbeat-mysql-iotdb/conf/application.yml
@@ -236,6 +236,13 @@ grafana:
password: admin
hertzbeat:
+ collector:
+ mysql:
+ # MySQL-compatible query engine routing for MySQL, MariaDB, OceanBase, and TiDB SQL metrics.
+ # auto : prefer JDBC only when mysql-connector-j is available from ext-lib, otherwise use the built-in query engine
+ # jdbc : always use JDBC
+ # r2dbc : always use the built-in query engine
+ query-engine: ${HERTZBEAT_COLLECTOR_MYSQL_QUERY_ENGINE:auto}
# Optional virtual-thread overrides. Remove this whole block to use built-in defaults.
vthreads:
enabled: true
diff --git a/script/docker-compose/hertzbeat-mysql-iotdb/docker-compose.yaml b/script/docker-compose/hertzbeat-mysql-iotdb/docker-compose.yaml
index 9a1af1589e8..472cca7f2c8 100644
--- a/script/docker-compose/hertzbeat-mysql-iotdb/docker-compose.yaml
+++ b/script/docker-compose/hertzbeat-mysql-iotdb/docker-compose.yaml
@@ -68,6 +68,7 @@ services:
hostname: hertzbeat
restart: always
environment:
+ HERTZBEAT_COLLECTOR_MYSQL_QUERY_ENGINE: auto
TZ: Asia/Shanghai
LANG: zh_CN.UTF-8
depends_on:
diff --git a/script/docker-compose/hertzbeat-mysql-iotdb/ext-lib/README b/script/docker-compose/hertzbeat-mysql-iotdb/ext-lib/README
index 5898fde6b91..7e270434ca3 100644
--- a/script/docker-compose/hertzbeat-mysql-iotdb/ext-lib/README
+++ b/script/docker-compose/hertzbeat-mysql-iotdb/ext-lib/README
@@ -13,9 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-Please move external libs to this folder like:
+Please move external libs to this folder only when you need external JDBC jars, for example:
ojdbc8-21.5.0.0.jar
orai18n-21.5.0.0.jar
mysql-connector-java-8.0.30.jar
+Notes:
+
+- MySQL, MariaDB, OceanBase, and TiDB SQL query metrics can use the built-in MySQL-compatible query engine without `mysql-connector-j`.
+- If `mysql-connector-j` is present here, HertzBeat prefers JDBC after restart.
+- Oracle and DB2 still require external JDBC jars in `ext-lib`.
diff --git a/script/docker-compose/hertzbeat-mysql-tdengine/README.md b/script/docker-compose/hertzbeat-mysql-tdengine/README.md
index c5bd858883a..51278996514 100644
--- a/script/docker-compose/hertzbeat-mysql-tdengine/README.md
+++ b/script/docker-compose/hertzbeat-mysql-tdengine/README.md
@@ -17,10 +17,11 @@
1. Download the hertzbeat-docker-compose installation deployment script file
The script file is located in `script/docker-compose/hertzbeat-mysql-tdengine` link [script/docker-compose](https://github.com/apache/hertzbeat/tree/master/script/docker-compose/hertzbeat-mysql-tdengine)
-2. Add MYSQL jdbc driver jar
+2. Optional: add external JDBC driver jars to `ext-lib`
- Download the MYSQL jdbc driver jar package, such as mysql-connector-java-8.0.25.jar. https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.25.zip
- Copy the jar package to the ext-lib directory.
+ MySQL-compatible monitoring can use the built-in query engine directly, so `mysql-connector-j` is optional.
+ If you want HertzBeat to prefer JDBC after restart, place `mysql-connector-j` in `ext-lib`.
+ Oracle and DB2 still require external JDBC jars in `ext-lib`.
3. Enter the deployment script docker-compose directory, execute
diff --git a/script/docker-compose/hertzbeat-mysql-tdengine/README_CN.md b/script/docker-compose/hertzbeat-mysql-tdengine/README_CN.md
index a45144cdae2..ccc8c0f6a85 100644
--- a/script/docker-compose/hertzbeat-mysql-tdengine/README_CN.md
+++ b/script/docker-compose/hertzbeat-mysql-tdengine/README_CN.md
@@ -19,9 +19,10 @@
1. 下载hertzbeat-docker-compose安装部署脚本文件
脚本文件位于代码仓库下`script/docker-compose/hertzbeat-mysql-tdengine` 链接 [script/docker-compose](https://github.com/apache/hertzbeat/tree/master/script/docker-compose/hertzbeat-mysql-tdengine)
-2. 添加 MYSQL jdbc 驱动 jar
- 下载 MYSQL jdbc driver jar, 例如 mysql-connector-java-8.0.25.jar. https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.25.zip
- 将此 jar 包拷贝放入 ext-lib 目录下.
+2. 可选:向 `ext-lib` 添加外部 JDBC 驱动 jar
+ MySQL 兼容监控现在可以直接使用内置查询引擎,所以 `mysql-connector-j` 不是必需项。
+ 如果你希望 HertzBeat 在重启后优先走 JDBC,可以把 `mysql-connector-j` 放到 `ext-lib`。
+ Oracle、DB2 这类场景仍然需要把外部 JDBC 驱动放到 `ext-lib`。
3. 进入部署脚本 docker-compose 目录, 执行
diff --git a/script/docker-compose/hertzbeat-mysql-tdengine/conf/application.yml b/script/docker-compose/hertzbeat-mysql-tdengine/conf/application.yml
index 8a6dfe1528f..8d795ed6434 100644
--- a/script/docker-compose/hertzbeat-mysql-tdengine/conf/application.yml
+++ b/script/docker-compose/hertzbeat-mysql-tdengine/conf/application.yml
@@ -233,6 +233,13 @@ grafana:
password: admin
hertzbeat:
+ collector:
+ mysql:
+ # MySQL-compatible query engine routing for MySQL, MariaDB, OceanBase, and TiDB SQL metrics.
+ # auto : prefer JDBC only when mysql-connector-j is available from ext-lib, otherwise use the built-in query engine
+ # jdbc : always use JDBC
+ # r2dbc : always use the built-in query engine
+ query-engine: ${HERTZBEAT_COLLECTOR_MYSQL_QUERY_ENGINE:auto}
# Optional virtual-thread overrides. Remove this whole block to use built-in defaults.
vthreads:
enabled: true
diff --git a/script/docker-compose/hertzbeat-mysql-tdengine/docker-compose.yaml b/script/docker-compose/hertzbeat-mysql-tdengine/docker-compose.yaml
index 33afa63dadd..83e151bb550 100644
--- a/script/docker-compose/hertzbeat-mysql-tdengine/docker-compose.yaml
+++ b/script/docker-compose/hertzbeat-mysql-tdengine/docker-compose.yaml
@@ -67,6 +67,7 @@ services:
hostname: hertzbeat
restart: always
environment:
+ HERTZBEAT_COLLECTOR_MYSQL_QUERY_ENGINE: auto
TZ: Asia/Shanghai
LANG: zh_CN.UTF-8
depends_on:
diff --git a/script/docker-compose/hertzbeat-mysql-tdengine/ext-lib/README b/script/docker-compose/hertzbeat-mysql-tdengine/ext-lib/README
index 5898fde6b91..7e270434ca3 100644
--- a/script/docker-compose/hertzbeat-mysql-tdengine/ext-lib/README
+++ b/script/docker-compose/hertzbeat-mysql-tdengine/ext-lib/README
@@ -13,9 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-Please move external libs to this folder like:
+Please move external libs to this folder only when you need external JDBC jars, for example:
ojdbc8-21.5.0.0.jar
orai18n-21.5.0.0.jar
mysql-connector-java-8.0.30.jar
+Notes:
+
+- MySQL, MariaDB, OceanBase, and TiDB SQL query metrics can use the built-in MySQL-compatible query engine without `mysql-connector-j`.
+- If `mysql-connector-j` is present here, HertzBeat prefers JDBC after restart.
+- Oracle and DB2 still require external JDBC jars in `ext-lib`.
diff --git a/script/docker-compose/hertzbeat-mysql-victoria-metrics/README.md b/script/docker-compose/hertzbeat-mysql-victoria-metrics/README.md
index b2801a0e0f0..0b05f4750b5 100644
--- a/script/docker-compose/hertzbeat-mysql-victoria-metrics/README.md
+++ b/script/docker-compose/hertzbeat-mysql-victoria-metrics/README.md
@@ -17,10 +17,11 @@
1. Download the hertzbeat-docker-compose installation deployment script file
The script file is located in `script/docker-compose/hertzbeat-mysql-victoria-metrics` link [script/docker-compose](https://github.com/apache/hertzbeat/tree/master/script/docker-compose/hertzbeat-mysql-victoria-metrics)
-2. Add MYSQL jdbc driver jar
+2. Optional: add external JDBC driver jars to `ext-lib`
- Download the MYSQL jdbc driver jar package, such as mysql-connector-java-8.0.25.jar. https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.25.zip
- Copy the jar package to the ext-lib directory.
+ MySQL-compatible monitoring can use the built-in query engine directly, so `mysql-connector-j` is optional.
+ If you want HertzBeat to prefer JDBC after restart, place `mysql-connector-j` in `ext-lib`.
+ Oracle and DB2 still require external JDBC jars in `ext-lib`.
3. Enter the deployment script docker-compose directory, execute
diff --git a/script/docker-compose/hertzbeat-mysql-victoria-metrics/README_CN.md b/script/docker-compose/hertzbeat-mysql-victoria-metrics/README_CN.md
index 6c8e91bc4c0..3ee6c4942af 100644
--- a/script/docker-compose/hertzbeat-mysql-victoria-metrics/README_CN.md
+++ b/script/docker-compose/hertzbeat-mysql-victoria-metrics/README_CN.md
@@ -19,9 +19,10 @@
1. 下载hertzbeat-docker-compose安装部署脚本文件
脚本文件位于代码仓库下`script/docker-compose/hertzbeat-mysql-victoria-metrics` 链接 [script/docker-compose](https://github.com/apache/hertzbeat/tree/master/script/docker-compose/hertzbeat-mysql-victoria-metrics)
-2. 添加 MYSQL jdbc 驱动 jar
- 下载 MYSQL jdbc driver jar, 例如 mysql-connector-java-8.0.25.jar. https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.25.zip
- 将此 jar 包拷贝放入 ext-lib 目录下.
+2. 可选:向 `ext-lib` 添加外部 JDBC 驱动 jar
+ MySQL 兼容监控现在可以直接使用内置查询引擎,所以 `mysql-connector-j` 不是必需项。
+ 如果你希望 HertzBeat 在重启后优先走 JDBC,可以把 `mysql-connector-j` 放到 `ext-lib`。
+ Oracle、DB2 这类场景仍然需要把外部 JDBC 驱动放到 `ext-lib`。
3. 进入部署脚本 docker-compose 目录, 执行
diff --git a/script/docker-compose/hertzbeat-mysql-victoria-metrics/conf/application.yml b/script/docker-compose/hertzbeat-mysql-victoria-metrics/conf/application.yml
index d9118c6c560..f92abb9f989 100644
--- a/script/docker-compose/hertzbeat-mysql-victoria-metrics/conf/application.yml
+++ b/script/docker-compose/hertzbeat-mysql-victoria-metrics/conf/application.yml
@@ -236,6 +236,13 @@ grafana:
password: admin
hertzbeat:
+ collector:
+ mysql:
+ # MySQL-compatible query engine routing for MySQL, MariaDB, OceanBase, and TiDB SQL metrics.
+ # auto : prefer JDBC only when mysql-connector-j is available from ext-lib, otherwise use the built-in query engine
+ # jdbc : always use JDBC
+ # r2dbc : always use the built-in query engine
+ query-engine: ${HERTZBEAT_COLLECTOR_MYSQL_QUERY_ENGINE:auto}
# Optional virtual-thread overrides. Remove this whole block to use built-in defaults.
vthreads:
enabled: true
diff --git a/script/docker-compose/hertzbeat-mysql-victoria-metrics/docker-compose.yaml b/script/docker-compose/hertzbeat-mysql-victoria-metrics/docker-compose.yaml
index da4188bd442..acebf9889b6 100644
--- a/script/docker-compose/hertzbeat-mysql-victoria-metrics/docker-compose.yaml
+++ b/script/docker-compose/hertzbeat-mysql-victoria-metrics/docker-compose.yaml
@@ -67,6 +67,7 @@ services:
hostname: hertzbeat
restart: always
environment:
+ HERTZBEAT_COLLECTOR_MYSQL_QUERY_ENGINE: auto
TZ: Asia/Shanghai
LANG: zh_CN.UTF-8
depends_on:
diff --git a/script/docker-compose/hertzbeat-mysql-victoria-metrics/ext-lib/README b/script/docker-compose/hertzbeat-mysql-victoria-metrics/ext-lib/README
index 5898fde6b91..7e270434ca3 100644
--- a/script/docker-compose/hertzbeat-mysql-victoria-metrics/ext-lib/README
+++ b/script/docker-compose/hertzbeat-mysql-victoria-metrics/ext-lib/README
@@ -13,9 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-Please move external libs to this folder like:
+Please move external libs to this folder only when you need external JDBC jars, for example:
ojdbc8-21.5.0.0.jar
orai18n-21.5.0.0.jar
mysql-connector-java-8.0.30.jar
+Notes:
+
+- MySQL, MariaDB, OceanBase, and TiDB SQL query metrics can use the built-in MySQL-compatible query engine without `mysql-connector-j`.
+- If `mysql-connector-j` is present here, HertzBeat prefers JDBC after restart.
+- Oracle and DB2 still require external JDBC jars in `ext-lib`.
diff --git a/script/docker-compose/hertzbeat-postgresql-greptimedb/README.md b/script/docker-compose/hertzbeat-postgresql-greptimedb/README.md
index 0f4faa8def4..82e8291811f 100644
--- a/script/docker-compose/hertzbeat-postgresql-greptimedb/README.md
+++ b/script/docker-compose/hertzbeat-postgresql-greptimedb/README.md
@@ -18,7 +18,13 @@
The script file is located in `script/docker-compose/hertzbeat-postgresql-greptimedb` link [script/docker-compose](https://github.com/apache/hertzbeat/tree/master/script/docker-compose/hertzbeat-postgresql-greptimedb)
-2. Enter the deployment script docker-compose directory, execute
+2. Optional: add external JDBC driver jars to `ext-lib`
+
+ MySQL-compatible monitoring can use the built-in query engine directly, so `mysql-connector-j` is optional.
+ If you want HertzBeat to prefer JDBC after restart, place `mysql-connector-j` in `ext-lib`.
+ Oracle and DB2 still require external JDBC jars in `ext-lib`.
+
+3. Enter the deployment script docker-compose directory, execute
`docker compose up -d`
diff --git a/script/docker-compose/hertzbeat-postgresql-greptimedb/README_CN.md b/script/docker-compose/hertzbeat-postgresql-greptimedb/README_CN.md
index 5815eb70050..60347ce8732 100644
--- a/script/docker-compose/hertzbeat-postgresql-greptimedb/README_CN.md
+++ b/script/docker-compose/hertzbeat-postgresql-greptimedb/README_CN.md
@@ -20,7 +20,12 @@
脚本文件位于代码仓库下`script/docker-compose/hertzbeat-postgresql-greptimedb` 链接 [script/docker-compose](https://github.com/apache/hertzbeat/tree/master/script/docker-compose/hertzbeat-postgresql-greptimedb)
-2. 进入部署脚本 docker-compose 目录, 执行
+2. 可选:向 `ext-lib` 添加外部 JDBC 驱动 jar
+ MySQL 兼容监控现在可以直接使用内置查询引擎,所以 `mysql-connector-j` 不是必需项。
+ 如果你希望 HertzBeat 在重启后优先走 JDBC,可以把 `mysql-connector-j` 放到 `ext-lib`。
+ Oracle、DB2 这类场景仍然需要把外部 JDBC 驱动放到 `ext-lib`。
+
+3. 进入部署脚本 docker-compose 目录, 执行
`docker compose up -d`
diff --git a/script/docker-compose/hertzbeat-postgresql-greptimedb/conf/application.yml b/script/docker-compose/hertzbeat-postgresql-greptimedb/conf/application.yml
index 0a1feb682dc..6cc14900303 100644
--- a/script/docker-compose/hertzbeat-postgresql-greptimedb/conf/application.yml
+++ b/script/docker-compose/hertzbeat-postgresql-greptimedb/conf/application.yml
@@ -233,6 +233,13 @@ grafana:
password: admin
hertzbeat:
+ collector:
+ mysql:
+ # MySQL-compatible query engine routing for MySQL, MariaDB, OceanBase, and TiDB SQL metrics.
+ # auto : prefer JDBC only when mysql-connector-j is available from ext-lib, otherwise use the built-in query engine
+ # jdbc : always use JDBC
+ # r2dbc : always use the built-in query engine
+ query-engine: ${HERTZBEAT_COLLECTOR_MYSQL_QUERY_ENGINE:auto}
# Optional virtual-thread overrides. Remove this whole block to use built-in defaults.
vthreads:
enabled: true
diff --git a/script/docker-compose/hertzbeat-postgresql-greptimedb/docker-compose.yaml b/script/docker-compose/hertzbeat-postgresql-greptimedb/docker-compose.yaml
index be1603c409d..2c3782607ba 100644
--- a/script/docker-compose/hertzbeat-postgresql-greptimedb/docker-compose.yaml
+++ b/script/docker-compose/hertzbeat-postgresql-greptimedb/docker-compose.yaml
@@ -87,6 +87,7 @@ services:
hostname: hertzbeat
restart: always
environment:
+ HERTZBEAT_COLLECTOR_MYSQL_QUERY_ENGINE: auto
TZ: Asia/Shanghai
LANG: zh_CN.UTF-8
depends_on:
diff --git a/script/docker-compose/hertzbeat-postgresql-greptimedb/ext-lib/README b/script/docker-compose/hertzbeat-postgresql-greptimedb/ext-lib/README
index 5898fde6b91..7e270434ca3 100644
--- a/script/docker-compose/hertzbeat-postgresql-greptimedb/ext-lib/README
+++ b/script/docker-compose/hertzbeat-postgresql-greptimedb/ext-lib/README
@@ -13,9 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-Please move external libs to this folder like:
+Please move external libs to this folder only when you need external JDBC jars, for example:
ojdbc8-21.5.0.0.jar
orai18n-21.5.0.0.jar
mysql-connector-java-8.0.30.jar
+Notes:
+
+- MySQL, MariaDB, OceanBase, and TiDB SQL query metrics can use the built-in MySQL-compatible query engine without `mysql-connector-j`.
+- If `mysql-connector-j` is present here, HertzBeat prefers JDBC after restart.
+- Oracle and DB2 still require external JDBC jars in `ext-lib`.
diff --git a/script/docker-compose/hertzbeat-postgresql-victoria-metrics/README.md b/script/docker-compose/hertzbeat-postgresql-victoria-metrics/README.md
index 835c280e5e9..a999460c94a 100644
--- a/script/docker-compose/hertzbeat-postgresql-victoria-metrics/README.md
+++ b/script/docker-compose/hertzbeat-postgresql-victoria-metrics/README.md
@@ -18,7 +18,13 @@
The script file is located in `script/docker-compose/hertzbeat-postgresql-victoria-metrics` link [script/docker-compose](https://github.com/apache/hertzbeat/tree/master/script/docker-compose/hertzbeat-postgresql-victoria-metrics)
-2. Enter the deployment script docker-compose directory, execute
+2. Optional: add external JDBC driver jars to `ext-lib`
+
+ MySQL-compatible monitoring can use the built-in query engine directly, so `mysql-connector-j` is optional.
+ If you want HertzBeat to prefer JDBC after restart, place `mysql-connector-j` in `ext-lib`.
+ Oracle and DB2 still require external JDBC jars in `ext-lib`.
+
+3. Enter the deployment script docker-compose directory, execute
`docker compose up -d`
diff --git a/script/docker-compose/hertzbeat-postgresql-victoria-metrics/README_CN.md b/script/docker-compose/hertzbeat-postgresql-victoria-metrics/README_CN.md
index c0e2cf9915f..dc9c386147c 100644
--- a/script/docker-compose/hertzbeat-postgresql-victoria-metrics/README_CN.md
+++ b/script/docker-compose/hertzbeat-postgresql-victoria-metrics/README_CN.md
@@ -20,7 +20,12 @@
脚本文件位于代码仓库下`script/docker-compose/hertzbeat-postgre-victoria-metrics` 链接 [script/docker-compose](https://github.com/apache/hertzbeat/tree/master/script/docker-compose/hertzbeat-postgresql-victoria-metrics)
-2. 进入部署脚本 docker-compose 目录, 执行
+2. 可选:向 `ext-lib` 添加外部 JDBC 驱动 jar
+ MySQL 兼容监控现在可以直接使用内置查询引擎,所以 `mysql-connector-j` 不是必需项。
+ 如果你希望 HertzBeat 在重启后优先走 JDBC,可以把 `mysql-connector-j` 放到 `ext-lib`。
+ Oracle、DB2 这类场景仍然需要把外部 JDBC 驱动放到 `ext-lib`。
+
+3. 进入部署脚本 docker-compose 目录, 执行
`docker compose up -d`
diff --git a/script/docker-compose/hertzbeat-postgresql-victoria-metrics/conf/application.yml b/script/docker-compose/hertzbeat-postgresql-victoria-metrics/conf/application.yml
index ed4c59bcbb4..3108f5b9944 100644
--- a/script/docker-compose/hertzbeat-postgresql-victoria-metrics/conf/application.yml
+++ b/script/docker-compose/hertzbeat-postgresql-victoria-metrics/conf/application.yml
@@ -235,6 +235,13 @@ grafana:
password: admin
hertzbeat:
+ collector:
+ mysql:
+ # MySQL-compatible query engine routing for MySQL, MariaDB, OceanBase, and TiDB SQL metrics.
+ # auto : prefer JDBC only when mysql-connector-j is available from ext-lib, otherwise use the built-in query engine
+ # jdbc : always use JDBC
+ # r2dbc : always use the built-in query engine
+ query-engine: ${HERTZBEAT_COLLECTOR_MYSQL_QUERY_ENGINE:auto}
# Optional virtual-thread overrides. Remove this whole block to use built-in defaults.
vthreads:
enabled: true
diff --git a/script/docker-compose/hertzbeat-postgresql-victoria-metrics/docker-compose.yaml b/script/docker-compose/hertzbeat-postgresql-victoria-metrics/docker-compose.yaml
index fa28d86ea79..fda39244719 100644
--- a/script/docker-compose/hertzbeat-postgresql-victoria-metrics/docker-compose.yaml
+++ b/script/docker-compose/hertzbeat-postgresql-victoria-metrics/docker-compose.yaml
@@ -70,6 +70,7 @@ services:
hostname: hertzbeat
restart: always
environment:
+ HERTZBEAT_COLLECTOR_MYSQL_QUERY_ENGINE: auto
TZ: Asia/Shanghai
LANG: zh_CN.UTF-8
depends_on:
diff --git a/script/docker-compose/hertzbeat-postgresql-victoria-metrics/ext-lib/README b/script/docker-compose/hertzbeat-postgresql-victoria-metrics/ext-lib/README
index 5898fde6b91..7e270434ca3 100644
--- a/script/docker-compose/hertzbeat-postgresql-victoria-metrics/ext-lib/README
+++ b/script/docker-compose/hertzbeat-postgresql-victoria-metrics/ext-lib/README
@@ -13,9 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-Please move external libs to this folder like:
+Please move external libs to this folder only when you need external JDBC jars, for example:
ojdbc8-21.5.0.0.jar
orai18n-21.5.0.0.jar
mysql-connector-java-8.0.30.jar
+Notes:
+
+- MySQL, MariaDB, OceanBase, and TiDB SQL query metrics can use the built-in MySQL-compatible query engine without `mysql-connector-j`.
+- If `mysql-connector-j` is present here, HertzBeat prefers JDBC after restart.
+- Oracle and DB2 still require external JDBC jars in `ext-lib`.
diff --git a/script/ext-lib/README b/script/ext-lib/README
index 0afb3b3bbec..30800cd2328 100644
--- a/script/ext-lib/README
+++ b/script/ext-lib/README
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-Please move external libs to this folder for JVM-based server / collector packages, for example:
+Please move external libs to this folder only when you need JVM runtime extension, for example:
ojdbc8-21.5.0.0.jar
orai18n-21.5.0.0.jar
@@ -23,5 +23,7 @@ jcc-11.5.9.0.jar
Note:
- `ext-lib` is loaded by the JVM server package and the JVM collector package.
+- MySQL, MariaDB, OceanBase, and TiDB SQL query metrics can use the built-in MySQL-compatible query engine without `mysql-connector-j`.
+- If `mysql-connector-j` is present here, the JVM server package or JVM collector package prefers JDBC after restart.
- The native collector package does not support loading external JDBC driver jars from `ext-lib` at runtime.
-- If you need MySQL, OceanBase, Oracle, or DB2 monitoring with external JDBC drivers, use the JVM collector package.
+- If you need Oracle or DB2 monitoring, or you explicitly want the JDBC path for MySQL-compatible monitoring, use the JVM collector package.