From 56947fa8d6126a2e89b318e38ba854bc10b8f741 Mon Sep 17 00:00:00 2001 From: Pierluigi Grillo Date: Sun, 4 Nov 2018 19:01:18 +0100 Subject: [PATCH 1/3] Add MySQL connector --- build.sbt | 8 +++ ...it.agilelab.darwin.common.ConnectorCreator | 1 + .../connector/mysql/ConfigurationKeys.scala | 9 +++ .../connector/mysql/MySQLConnection.scala | 25 ++++++++ .../connector/mysql/MySQLConnector.scala | 61 +++++++++++++++++++ .../mysql/MySQLConnectorCreator.scala | 8 +++ mysql/src/test/resources/mysql.properties | 4 ++ mysql/src/test/resources/mysqlmock.avsc | 28 +++++++++ .../darwin/connector/mysql/MySQL2Mock.scala | 3 + .../connector/mysql/MySQLConnectorSuite.scala | 38 ++++++++++++ .../darwin/connector/mysql/MySQLMock.scala | 6 ++ project/Dependencies.scala | 2 + 12 files changed, 193 insertions(+) create mode 100644 mysql/src/main/resources/META-INF/services/it.agilelab.darwin.common.ConnectorCreator create mode 100644 mysql/src/main/scala/it/agilelab/darwin/connector/mysql/ConfigurationKeys.scala create mode 100644 mysql/src/main/scala/it/agilelab/darwin/connector/mysql/MySQLConnection.scala create mode 100644 mysql/src/main/scala/it/agilelab/darwin/connector/mysql/MySQLConnector.scala create mode 100644 mysql/src/main/scala/it/agilelab/darwin/connector/mysql/MySQLConnectorCreator.scala create mode 100644 mysql/src/test/resources/mysql.properties create mode 100644 mysql/src/test/resources/mysqlmock.avsc create mode 100644 mysql/src/test/scala/it/agilelab/darwin/connector/mysql/MySQL2Mock.scala create mode 100644 mysql/src/test/scala/it/agilelab/darwin/connector/mysql/MySQLConnectorSuite.scala create mode 100644 mysql/src/test/scala/it/agilelab/darwin/connector/mysql/MySQLMock.scala diff --git a/build.sbt b/build.sbt index ec2afd4..6b06536 100644 --- a/build.sbt +++ b/build.sbt @@ -38,6 +38,14 @@ lazy val hbaseConnector = Project("darwin-hbase-connector", file("hbase")) .settings(crossScalaVersions := Versions.crossScalaVersions) .enablePlugins(JavaAppPackaging) +lazy val mysqlConnector = Project("darwin-mysql-connector", file("mysql")) + .settings(Settings.commonSettings:_*) + .dependsOn(coreCommon) + .settings(pgpPassphrase := Settings.pgpPass) + .settings(libraryDependencies ++= Dependencies.mysql_conn_dep) + .settings(crossScalaVersions := Versions.crossScalaVersions) + .enablePlugins(JavaAppPackaging) + lazy val postgresConnector = Project("darwin-postgres-connector", file("postgres")) .settings(Settings.commonSettings:_*) .dependsOn(coreCommon) diff --git a/mysql/src/main/resources/META-INF/services/it.agilelab.darwin.common.ConnectorCreator b/mysql/src/main/resources/META-INF/services/it.agilelab.darwin.common.ConnectorCreator new file mode 100644 index 0000000..014d69a --- /dev/null +++ b/mysql/src/main/resources/META-INF/services/it.agilelab.darwin.common.ConnectorCreator @@ -0,0 +1 @@ +it.agilelab.darwin.connector.mysql.MySQLConnectorCreator diff --git a/mysql/src/main/scala/it/agilelab/darwin/connector/mysql/ConfigurationKeys.scala b/mysql/src/main/scala/it/agilelab/darwin/connector/mysql/ConfigurationKeys.scala new file mode 100644 index 0000000..79bb8cc --- /dev/null +++ b/mysql/src/main/scala/it/agilelab/darwin/connector/mysql/ConfigurationKeys.scala @@ -0,0 +1,9 @@ +package it.agilelab.darwin.connector.mysql + +object ConfigurationKeys { + val TABLE : String = "table" + val HOST : String = "host" + val DATABASE : String = "db" + val USER : String = "username" + val PASSWORD : String = "password" +} diff --git a/mysql/src/main/scala/it/agilelab/darwin/connector/mysql/MySQLConnection.scala b/mysql/src/main/scala/it/agilelab/darwin/connector/mysql/MySQLConnection.scala new file mode 100644 index 0000000..bdfb562 --- /dev/null +++ b/mysql/src/main/scala/it/agilelab/darwin/connector/mysql/MySQLConnection.scala @@ -0,0 +1,25 @@ +package it.agilelab.darwin.connector.mysql + +import java.sql.{Connection, DriverManager} + +import com.typesafe.config.Config + +trait MySQLConnection { + + private var connectionUrl : String = "" + private val driverName : String = "org.mariadb.jdbc.Driver" + + protected def setConnectionConfig(config : Config) = { + val db = config.getString(ConfigurationKeys.DATABASE) + val host = config.getString(ConfigurationKeys.HOST) + val user = config.getString(ConfigurationKeys.USER) + val password = config.getString(ConfigurationKeys.PASSWORD) + connectionUrl = s"jdbc:mysql://$host/$db?user=$user&password=$password" + } + + protected def getConnection: Connection = { + Class.forName(driverName) + val connection: Connection = DriverManager.getConnection(connectionUrl) + connection + } +} diff --git a/mysql/src/main/scala/it/agilelab/darwin/connector/mysql/MySQLConnector.scala b/mysql/src/main/scala/it/agilelab/darwin/connector/mysql/MySQLConnector.scala new file mode 100644 index 0000000..6a468a4 --- /dev/null +++ b/mysql/src/main/scala/it/agilelab/darwin/connector/mysql/MySQLConnector.scala @@ -0,0 +1,61 @@ +package it.agilelab.darwin.connector.mysql + +import java.sql.ResultSet + +import com.typesafe.config.Config +import it.agilelab.darwin.common.Connector +import org.apache.avro.Schema +import org.apache.avro.Schema.Parser + +class MySQLConnector(config: Config) extends Connector(config) with MySQLConnection { + + private def parser: Parser = new Parser() + + private val DEFAULT_TABLENAME = "SCHEMA_REPOSITORY" + + val TABLE_NAME: String = if (config.hasPath(ConfigurationKeys.TABLE)) { + config.getString(ConfigurationKeys.TABLE) + } else { + DEFAULT_TABLENAME + } + + setConnectionConfig(config) + + override def fullLoad(): Seq[(Long, Schema)] = { + val connection = getConnection + var schemas: Seq[(Long, Schema)] = Seq.empty[(Long, Schema)] + val statement = connection.createStatement() + val resultSet: ResultSet = statement.executeQuery(s"select * from $TABLE_NAME") + + while (resultSet.next()) { + val id = resultSet.getLong("id") + val schema = parser.parse(resultSet.getString("schema")) + schemas = schemas :+ (id -> schema) + } + connection.close + schemas + } + + override def insert(schemas: Seq[(Long, Schema)]): Unit = { + val connection = getConnection + try { + connection.setAutoCommit(false) + schemas.foreach { case (id, schema) => + val insertSchemaPS = connection.prepareStatement(s"INSERT INTO $TABLE_NAME (`id`,`schema`) VALUES (?,?)") + insertSchemaPS.setLong(1, id) + insertSchemaPS.setString(2, schema.toString) + insertSchemaPS.executeUpdate() + insertSchemaPS.close() + } + connection.commit + } catch { + case e: Exception => { + connection.rollback + // e.printStackTrace + throw e // should re-throw? + } + } finally { + connection.close + } + } +} diff --git a/mysql/src/main/scala/it/agilelab/darwin/connector/mysql/MySQLConnectorCreator.scala b/mysql/src/main/scala/it/agilelab/darwin/connector/mysql/MySQLConnectorCreator.scala new file mode 100644 index 0000000..3097f5d --- /dev/null +++ b/mysql/src/main/scala/it/agilelab/darwin/connector/mysql/MySQLConnectorCreator.scala @@ -0,0 +1,8 @@ +package it.agilelab.darwin.connector.mysql + +import com.typesafe.config.Config +import it.agilelab.darwin.common.{Connector, ConnectorCreator} + +class MySQLConnectorCreator extends ConnectorCreator { + override def create(config: Config): Connector = new MySQLConnector(config) +} diff --git a/mysql/src/test/resources/mysql.properties b/mysql/src/test/resources/mysql.properties new file mode 100644 index 0000000..f96f864 --- /dev/null +++ b/mysql/src/test/resources/mysql.properties @@ -0,0 +1,4 @@ +host = localhost:3306 +db = mysqldb +username = root +password = mysqlpwd \ No newline at end of file diff --git a/mysql/src/test/resources/mysqlmock.avsc b/mysql/src/test/resources/mysqlmock.avsc new file mode 100644 index 0000000..edaefe9 --- /dev/null +++ b/mysql/src/test/resources/mysqlmock.avsc @@ -0,0 +1,28 @@ +{ + "type" : "record", + "name" : "MySQLMock", + "namespace" : "it.agilelab.darwin.connector.mysql", + "fields" : [ { + "name" : "one", + "type" : "int" + }, { + "name" : "two", + "type" : "string" + }, { + "name" : "three", + "type" : "long" + }, { + "name" : "four", + "type" : { + "type" : "record", + "name" : "MySQL2Mock", + "fields" : [ { + "name" : "one", + "type" : "boolean" + }, { + "name" : "two", + "type" : "long" + } ] + } + } ] +} diff --git a/mysql/src/test/scala/it/agilelab/darwin/connector/mysql/MySQL2Mock.scala b/mysql/src/test/scala/it/agilelab/darwin/connector/mysql/MySQL2Mock.scala new file mode 100644 index 0000000..936b825 --- /dev/null +++ b/mysql/src/test/scala/it/agilelab/darwin/connector/mysql/MySQL2Mock.scala @@ -0,0 +1,3 @@ +package it.agilelab.darwin.connector.mysql + +case class MySQL2Mock(one: Boolean, two: Long) diff --git a/mysql/src/test/scala/it/agilelab/darwin/connector/mysql/MySQLConnectorSuite.scala b/mysql/src/test/scala/it/agilelab/darwin/connector/mysql/MySQLConnectorSuite.scala new file mode 100644 index 0000000..8698d99 --- /dev/null +++ b/mysql/src/test/scala/it/agilelab/darwin/connector/mysql/MySQLConnectorSuite.scala @@ -0,0 +1,38 @@ +package it.agilelab.darwin.connector.mysql + +import com.typesafe.config.ConfigFactory +import it.agilelab.darwin.common.Connector +import org.apache.avro.{Schema, SchemaNormalization} +import org.scalatest.{FlatSpec, Matchers} +import org.apache.avro.reflect.ReflectData + + +class MySQLConnectorSuite extends FlatSpec with Matchers { + val connector: Connector = new MySQLConnectorCreator().create(ConfigFactory.load("mysql.properties")) + + "MySQLConnector" should "load all existing schemas" in { + connector.fullLoad() + } + + it should "insert and retrieve" in { + val outerSchema = new Schema.Parser().parse(getClass.getClassLoader.getResourceAsStream("mysqlmock.avsc")) + val innerSchema = outerSchema.getField("four").schema() + + val schemas = Seq(innerSchema, outerSchema) + .map(s => SchemaNormalization.parsingFingerprint64(s) -> s) + connector.insert(schemas) + val loaded: Seq[(Long, Schema)] = connector.fullLoad() + assert(loaded.size == schemas.size) + assert(loaded.forall(schemas.contains)) + } + + it should "check schemas" in { + val outerSchema: Schema = ReflectData.get().getSchema(classOf[MySQLMock]) + val innerSchema: Schema = ReflectData.get().getSchema(classOf[MySQL2Mock]) + + val loaded: Seq[Schema] = connector.fullLoad().map(s => s._2) + + assert(loaded.contains(outerSchema)) + assert(loaded.contains(innerSchema)) + } +} diff --git a/mysql/src/test/scala/it/agilelab/darwin/connector/mysql/MySQLMock.scala b/mysql/src/test/scala/it/agilelab/darwin/connector/mysql/MySQLMock.scala new file mode 100644 index 0000000..2b0c89c --- /dev/null +++ b/mysql/src/test/scala/it/agilelab/darwin/connector/mysql/MySQLMock.scala @@ -0,0 +1,6 @@ +package it.agilelab.darwin.connector.mysql + +case class MySQLMock(one: Int, + two: String, + three: Long, + four: MySQL2Mock ) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 6c59a3b..9e781fe 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -18,10 +18,12 @@ object Dependencies { val spark_core = "org.apache.spark" %% "spark-core" % "2.3.0" % "provided" val spark_sql = "org.apache.spark" %% "spark-sql" % "2.3.0" % "provided" val postgres_conn = "org.postgresql" % "postgresql" % "9.3-1100-jdbc4" + val mysql_conn = "org.mariadb.jdbc" % "mariadb-java-client" % "2.3.0" val core_deps = Seq(scalatest, avro, typesafe_config) val mock_app_dep = core_deps ++ Seq(avro4s, reflections) val hbase_conn_dep = core_deps ++ Seq(hbase_common, hbase_server, hadoop_common) val postgres_conn_dep = core_deps :+ postgres_conn val spark_app = mock_app_dep ++ Seq(spark_core, spark_sql, hbase_common) + val mysql_conn_dep = core_deps :+ mysql_conn } From ec19a780fc20a540b9c1f8c02189f400225a3107 Mon Sep 17 00:00:00 2001 From: Pierluigi Grillo Date: Sun, 4 Nov 2018 19:02:17 +0100 Subject: [PATCH 2/3] Docker configuration to test MySQL connector --- Dockerfile | 42 +++++++++++++++++++++++++++++++++++++ init-db-and-launch-tests.sh | 18 ++++++++++++++++ 2 files changed, 60 insertions(+) create mode 100644 Dockerfile create mode 100644 init-db-and-launch-tests.sh diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..90be771 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,42 @@ +FROM mariadb:10.3 + +# Install OpenJDK-8 +RUN apt-get update && \ + apt-get install -y openjdk-8-jdk && \ + apt-get clean; + +# Fix certificate issues +RUN apt-get update && \ + apt-get install ca-certificates-java && \ + apt-get clean && \ + update-ca-certificates -f; + +# Setup JAVA_HOME -- useful for docker commandline +ENV JAVA_HOME /usr/lib/jvm/java-8-openjdk-amd64/ +RUN export JAVA_HOME + +ENV SBT_VERSION 1.2.6 + +RUN apt-get update && \ + apt-get install -y curl && \ + apt-get clean + +RUN curl -L -o sbt-$SBT_VERSION.deb http://dl.bintray.com/sbt/debian/sbt-$SBT_VERSION.deb && \ + dpkg -i sbt-$SBT_VERSION.deb && \ + rm sbt-$SBT_VERSION.deb && \ + apt-get update && \ + apt-get install sbt && \ + sbt sbtVersion + +# Install Git +RUN apt-get update && \ + apt-get install -y git && \ + apt-get clean; + +RUN git clone https://github.com/pier485/darwin.git + +WORKDIR /darwin + +RUN chmod 700 /darwin/init-db-and-launch-tests.sh + +#CMD sbt darwin-mysql-connector/test \ No newline at end of file diff --git a/init-db-and-launch-tests.sh b/init-db-and-launch-tests.sh new file mode 100644 index 0000000..4241c79 --- /dev/null +++ b/init-db-and-launch-tests.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +RET=1 +while [[ RET -ne 0 ]]; do + echo "=> Waiting for confirmation of MariaDB service startup" + sleep 5 + mysql -uroot -p${MYSQL_ROOT_PASSWORD} -e "status" > /dev/null 2>&1 + RET=$? +done + +echo "Create database and table"; + +mysql -uroot -p${MYSQL_ROOT_PASSWORD} -e "CREATE DATABASE IF NOT EXISTS mysqldb;" +mysql -uroot -p${MYSQL_ROOT_PASSWORD} -e "USE mysqldb; CREATE TABLE \`SCHEMA_REPOSITORY\` (\`id\` bigint(20) DEFAULT NULL, \`schema\` text NOT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8;" + +echo "Launching darwin-mysql-connector tests" + +sbt darwin-mysql-connector/test \ No newline at end of file From eb6fdcbb7b91190b9a6f91cd70b7c496fce11f12 Mon Sep 17 00:00:00 2001 From: Pierluigi Grillo Date: Sun, 4 Nov 2018 19:20:47 +0100 Subject: [PATCH 3/3] Fix project branch in Dockerfile --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 90be771..c314a1b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -33,7 +33,7 @@ RUN apt-get update && \ apt-get install -y git && \ apt-get clean; -RUN git clone https://github.com/pier485/darwin.git +RUN git clone -b mysqlconnector https://github.com/pier485/darwin.git WORKDIR /darwin