Commit fb7a5f4

[#106] Implement multi-connector (#107)
# New features and improvements

- New multi-connector which fetches from multiple "normal" connectors; see the related chapter in README.md
- APIs that consume InputStreams and fail now return a Left containing an empty byte array when the input stream supports marking
- Update sbt to 1.5.5

# Bug fixes

- Fix a bug that did not restore the byte buffer endianness in case id extraction failed
1 parent abfeb3b commit fb7a5f4

14 files changed: +730 -54 lines changed

14 files changed

+730
-54
lines changed

README.md

Lines changed: 53 additions & 7 deletions

@@ -46,7 +46,7 @@ In order to access to Darwin core functionalities add the core dependency to you
 #### sbt
 ```scala
 libraryDependencies += "it.agilelab" %% "darwin-core" % "1.2.1-SNAPSHOT"
-```
+```
 #### maven
 ```xml
 <dependency>
@@ -63,7 +63,7 @@ Then add the connector of your choice, either HBase:
 #### sbt
 ```scala
 libraryDependencies += "it.agilelab" %% "darwin-hbase-connector" % "1.2.1-SNAPSHOT"
-```
+```
 #### maven
 ```xml
 <dependency>
@@ -81,7 +81,7 @@ Or PostgreSql:
 
 ```scala
 libraryDependencies += "it.agilelab" %% "darwin-postgres-connector" % "1.2.1-SNAPSHOT"
-```
+```
 #### maven
 ```xml
 <dependency>
@@ -100,7 +100,7 @@ Or Rest
 
 ```scala
 libraryDependencies += "it.agilelab" %% "darwin-rest-connector" % "1.2.1-SNAPSHOT"
-```
+```
 #### maven
 ```xml
 <dependency>
@@ -122,7 +122,7 @@ Or Mock (only for test scenarios):
 
 ```scala
 libraryDependencies += "it.agilelab" %% "darwin-mock-connector" % "1.2.1-SNAPSHOT"
-```
+```
 #### maven
 ```xml
 <dependency>
@@ -141,7 +141,7 @@ Darwin can be used as a *facade* over confluent schema registry.
 
 ```scala
 libraryDependencies += "it.agilelab" %% "darwin-confluent-connector" % "1.2.1-SNAPSHOT"
-```
+```
 #### maven
 ```xml
 <dependency>
@@ -384,7 +384,6 @@ timeout = 5000
 ## REST
 
 The configuration keys managed by the `RestConnector` are:
--
 - **protocol**: http or https
 - **host**: the hostname where rest-server (or an http proxy) is deployed
 - **port**: the port where rest-server (or an http proxy) is listening
@@ -526,3 +525,50 @@ Here is an example of configuration:
 "resources": ["schemas/Apple.avsc", "schemas/Orange.avsc"]
 "mode": "permissive"
 ```
+
+----
+
+## Multi-Connector
+
+Multi-connector can connect to multiple connectors in a hierarchical order. It is useful when schemas are registered on different datastores (e.g. Confluent + HBase).
+
+You configure it in the following way:
+
+```
+darwin {
+  type = "lazy"
+  connector = "multi"
+  registrar = "hbase"
+  confluent-single-object-encoding: "confluent"
+  standard-single-object-encoding: ["hbase", "mongo"]
+  confluent {
+    endpoints: ["http://schema-registry-00:7777", "http://schema-registry-01:7777"]
+    max-cached-schemas: 1000
+  }
+  hbase {
+    isSecure: false
+    namespace: "DARWIN"
+    table: "REPOSITORY"
+    coreSite: "/etc/hadoop/conf/core-site.xml"
+    hbaseSite: "/etc/hadoop/conf/hbase-site.xml"
+  }
+  mongo {
+    username = "mongo"
+    password = "mongo"
+    host = ["localhost:12345"]
+    database = "test"
+    collection = "collection_test"
+    timeout = 5000
+  }
+}
+```
+
+When extracting the schema id, it checks whether the payload uses the "confluent" or the "standard" single-object encoding and extracts the id accordingly.
+Given the id, it then goes through the chain of connectors to find the schema: first the confluent-single-object-encoding connector, then
+the standard-single-object-encoding connectors **in order**.
+The first connector that matches is the one that will be used.
+
+In order to initialize the single connectors, a configuration is created by merging the specific part
+(i.e. hbase/mongo/confluent) with the outer layer: in case of duplicated entries, the more specific one wins.
+
+Registration of schemas is performed by the connector set as registrar.
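
To make the two header layouts concrete, here is a minimal, self-contained sketch of the dispatch step described above, using the magic bytes defined by the two encoders changed in this commit (0x00 plus a 4-byte id for the Confluent encoding, 0xC3 0x01 plus an 8-byte id for the standard one). The `ExtractedId` types and the `extractAnyId` helper are illustrative stand-ins, not the actual `MultiConnector` API:

```scala
import java.nio.{ ByteBuffer, ByteOrder }

// Illustrative types; the real MultiConnector internals may differ.
sealed trait ExtractedId
final case class ConfluentId(id: Int)  extends ExtractedId
final case class StandardId(id: Long) extends ExtractedId

// Magic bytes as defined by ConfluentSingleObjectEncoding (0x00) and
// AvroSingleObjectEncodingUtils (0xC3 0x01) in the diffs below.
def extractAnyId(bytes: Array[Byte], endianness: ByteOrder): Option[ExtractedId] = {
  val buf = ByteBuffer.wrap(bytes).order(endianness)
  if (bytes.length >= 5 && bytes(0) == 0x00.toByte) {
    buf.position(1)                  // skip the 1-byte Confluent header
    Some(ConfluentId(buf.getInt))
  } else if (bytes.length >= 10 && bytes(0) == 0xc3.toByte && bytes(1) == 0x01.toByte) {
    buf.position(2)                  // skip the 2-byte standard header
    Some(StandardId(buf.getLong))
  } else {
    None                             // not single-object encoded
  }
}
```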

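The merge rule above ("the more specific one wins") matches the fallback semantics of Typesafe Config, in which the README's examples are written; a small sketch of how a per-connector view could be derived under that assumption (illustrative only, not the connector's actual initialization code):

```scala
import com.typesafe.config.{ Config, ConfigFactory }

val darwin: Config = ConfigFactory.parseString(
  """
    |type = "lazy"
    |connector = "multi"
    |registrar = "hbase"
    |hbase { namespace: "DARWIN", table: "REPOSITORY" }
  """.stripMargin
)

// The hbase-specific block takes precedence; outer keys fill the gaps.
val hbaseView: Config = darwin.getConfig("hbase").withFallback(darwin)
assert(hbaseView.getString("table") == "REPOSITORY")  // from the specific part
assert(hbaseView.getString("registrar") == "hbase")   // inherited from the outer layer
```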
build.sbt

Lines changed: 12 additions & 2 deletions

@@ -7,7 +7,7 @@ import sbt.Keys.baseDirectory
  * See project/Dependencies.scala for the dependencies definitions.
  * See project/Versions.scala for the versions definitions.
  */
-dynverVTagPrefix in ThisBuild := false
+ThisBuild / dynverVTagPrefix := false
 
 lazy val root = Project("darwin", file("."))
   .settings(Settings.commonSettings: _*)
@@ -22,7 +22,8 @@ lazy val root = Project("darwin", file("."))
     mockApplication,
     restConnector,
     mongoConnector,
-    confluentConnector
+    confluentConnector,
+    multiConnector
   )
 
 lazy val core = Project("darwin-core", file("core"))
@@ -113,3 +114,12 @@ lazy val sparkApplication = Project("darwin-spark-application", file("spark-appl
   .settings(libraryDependencies ++= Dependencies.spark_app)
   .settings(crossScalaVersions := Seq(Versions.scala, Versions.scala_211))
   .settings(Settings.notPublishSettings)
+
+lazy val multiConnector = Project("darwin-multi-connector", file("multi-connector"))
+  .settings(Settings.commonSettings: _*)
+  .dependsOn(coreCommon)
+  .dependsOn(core)
+  .dependsOn(mockConnector % Test)
+  .dependsOn(confluentConnector % Test)
+  .settings(crossScalaVersions := Versions.crossScalaVersions)
+  .settings(libraryDependencies += Dependencies.scalatest)

common/src/main/scala/it/agilelab/darwin/common/compat/package.scala

Lines changed: 7 additions & 0 deletions

@@ -98,5 +98,12 @@ package object compat {
         case _ => self.asInstanceOf[Either[L, R1]]
       }
     }
+
+    def rightFlatMap[L1 >: L, R1](f: R => Either[L1, R1]): Either[L1, R1] = {
+      self match {
+        case Right(v) => f(v)
+        case _        => self.asInstanceOf[Either[L1, R1]]
+      }
+    }
   }
 }
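
The new `rightFlatMap` mirrors `flatMap` on a right-biased `Either`, keeping the cross-built sources working on Scala versions where `Either` is not right-biased. A usage sketch (the `parse` and `validate` helpers are hypothetical):

```scala
import it.agilelab.darwin.common.compat._

// Hypothetical helpers: parse a schema id, then require it to be positive.
def parse(s: String): Either[String, Long] =
  try Right(s.toLong)
  catch { case _: NumberFormatException => Left(s"not a number: $s") }

def validate(id: Long): Either[String, Long] =
  if (id > 0) Right(id) else Left(s"non-positive id: $id")

// rightFlatMap chains the two steps and short-circuits on the first Left.
parse("42").rightFlatMap(validate) // Right(42)
parse("-1").rightFlatMap(validate) // Left("non-positive id: -1")
parse("xy").rightFlatMap(validate) // Left("not a number: xy")
```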

common/src/main/scala/it/agilelab/darwin/manager/util/AvroSingleObjectEncodingUtils.scala

Lines changed: 20 additions & 13 deletions

@@ -1,18 +1,18 @@
 package it.agilelab.darwin.manager.util
 
-import java.io.{ InputStream, OutputStream }
-import java.nio.{ ByteBuffer, ByteOrder }
-import java.util
-
 import it.agilelab.darwin.common.DarwinConcurrentHashMap
 import it.agilelab.darwin.manager.exception.DarwinException
 import it.agilelab.darwin.manager.util.ByteArrayUtils._
 import org.apache.avro.Schema
 
+import java.io.{ InputStream, OutputStream }
+import java.nio.{ ByteBuffer, ByteOrder }
+import java.util
+
 object AvroSingleObjectEncodingUtils {
-  private val V1_HEADER = Array[Byte](0xc3.toByte, 0x01.toByte)
-  private val ID_SIZE = 8
-  private val HEADER_LENGTH = V1_HEADER.length + ID_SIZE
+  val V1_HEADER: Array[Byte] = Array[Byte](0xc3.toByte, 0x01.toByte)
+  private val ID_SIZE = 8
+  private val HEADER_LENGTH = V1_HEADER.length + ID_SIZE
 
   private val schemaMap = DarwinConcurrentHashMap.empty[Schema, Long]
 
@@ -180,10 +180,13 @@ object AvroSingleObjectEncodingUtils {
       buf.getLong
     } else {
       val lastEndianness = buf.order()
-      buf.order(endianness)
-      val toRet = buf.getLong
-      buf.order(lastEndianness)
-      toRet
+      try {
+        buf.order(endianness)
+        val toRet = buf.getLong
+        toRet
+      } finally {
+        buf.order(lastEndianness)
+      }
     }
   }
 
@@ -219,15 +222,19 @@ object AvroSingleObjectEncodingUtils {
         if (inputStream.markSupported()) {
           inputStream.reset()
           inputStream.mark(0)
+          Left(Array.emptyByteArray)
+        } else {
+          Left(buffer.slice(0, V1_HEADER.length))
         }
-        Left(buffer.slice(0, V1_HEADER.length))
       }
     } else {
       if (inputStream.markSupported()) {
         inputStream.reset()
         inputStream.mark(0)
+        Left(Array.emptyByteArray)
+      } else {
+        Left(buffer.slice(0, bytesReadMagicBytes))
       }
-      Left(buffer.slice(0, bytesReadMagicBytes))
     }
   }
 
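The endianness fix above is the usual restore-state-in-finally pattern: if the read throws (for example a `BufferUnderflowException` on a truncated payload), the buffer's byte order is still put back. A standalone sketch of the same pattern:

```scala
import java.nio.{ BufferUnderflowException, ByteBuffer, ByteOrder }

object EndiannessRestoreDemo extends App {
  // Read a long at the requested byte order, always restoring the
  // buffer's original order, even when the read throws.
  def readLongWithOrder(buf: ByteBuffer, endianness: ByteOrder): Long = {
    val lastEndianness = buf.order()
    try {
      buf.order(endianness)
      buf.getLong
    } finally {
      buf.order(lastEndianness)
    }
  }

  val buf = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN) // too short for a long
  try readLongWithOrder(buf, ByteOrder.LITTLE_ENDIAN)
  catch { case _: BufferUnderflowException => () }
  assert(buf.order() == ByteOrder.BIG_ENDIAN) // order survived the failure
}
```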

confluent/src/main/scala/it/agilelab/darwin/connector/confluent/ConfluentSingleObjectEncoding.scala → common/src/main/scala/it/agilelab/darwin/manager/util/ConfluentSingleObjectEncoding.scala

Lines changed: 19 additions & 13 deletions

@@ -1,17 +1,16 @@
-package it.agilelab.darwin.connector.confluent
-
-import java.io.{ InputStream, OutputStream }
-import java.nio.{ ByteBuffer, ByteOrder }
-import java.util
+package it.agilelab.darwin.manager.util
 
 import it.agilelab.darwin.common.DarwinConcurrentHashMap
 import it.agilelab.darwin.manager.exception.DarwinException
-import it.agilelab.darwin.manager.util.ByteArrayUtils
 import it.agilelab.darwin.manager.util.ByteArrayUtils.EnrichedInt
 import org.apache.avro.Schema
 
+import java.io.{ InputStream, OutputStream }
+import java.nio.{ ByteBuffer, ByteOrder }
+import java.util
+
 object ConfluentSingleObjectEncoding {
-  private val V1_HEADER = Array[Byte](0x00.toByte)
+  val V1_HEADER = Array[Byte](0x00.toByte)
   private val ID_SIZE = 4
   private val HEADER_LENGTH = V1_HEADER.length + ID_SIZE
 
@@ -183,10 +182,13 @@ object ConfluentSingleObjectEncoding {
       buf.getInt
     } else {
       val lastEndianness = buf.order()
-      buf.order(endianness)
-      val toRet = buf.getInt()
-      buf.order(lastEndianness)
-      toRet
+      try {
+        buf.order(endianness)
+        val toRet = buf.getInt()
+        toRet
+      } finally {
+        buf.order(lastEndianness)
+      }
     }
   }
 
@@ -222,15 +224,19 @@ object ConfluentSingleObjectEncoding {
         if (inputStream.markSupported()) {
           inputStream.reset()
           inputStream.mark(0)
+          Left(Array.emptyByteArray)
+        } else {
+          Left(buffer.slice(0, V1_HEADER.length))
         }
-        Left(buffer.slice(0, V1_HEADER.length))
       }
     } else {
      if (inputStream.markSupported()) {
         inputStream.reset()
         inputStream.mark(0)
+        Left(Array.emptyByteArray)
+      } else {
+        Left(buffer.slice(0, bytesReadMagicBytes))
       }
-      Left(buffer.slice(0, bytesReadMagicBytes))
     }
   }
 

common/src/test/scala/it/agilelab/darwin/manager/util/AvroSingleObjectEncodingUtilsSpec.scala

Lines changed: 2 additions & 2 deletions

@@ -65,15 +65,15 @@ abstract class AvroSingleObjectEncodingUtilsSpec(val endianness: ByteOrder) exte
   "extractId(InputStream)" should "return Left if the input stream has only one byte" in {
     val stream = new ByteArrayInputStream(Array(Random.nextInt().toByte))
     val id = AvroSingleObjectEncodingUtils.extractId(stream, endianness)
-    id.left.map(_.length == 1) should be(Left(true))
+    id.left.map(_.length == 0) should be(Left(true))
     stream.read() should not be (-1)
     stream.read() should be(-1)
   }
 
   "extractId(InputStream)" should "return Left if the input stream does not have the expected header" in {
     val stream = new ByteArrayInputStream(Array(0xc3.toByte, 0x02.toByte))
     val id = AvroSingleObjectEncodingUtils.extractId(stream, endianness)
-    id.left.map(_.sameElements(Array(0xc3.toByte, 0x02.toByte))) should be(Left(true))
+    id.left.map(_.length == 0) should be(Left(true))
     stream.read().toByte should be(0xc3.toByte)
     stream.read().toByte should be(0x02.toByte)
     stream.read() should be(-1)
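
These assertions pin down the new contract: on a mark-supporting stream a failed extraction now returns `Left` with an empty array and rewinds the stream, so the already-consumed bytes can be re-read from the stream itself rather than from the returned array. A sketch of caller-side handling under that contract:

```scala
import java.io.ByteArrayInputStream
import java.nio.ByteOrder

import it.agilelab.darwin.manager.util.AvroSingleObjectEncodingUtils

// A stream whose first bytes are not the single-object-encoding header.
val stream = new ByteArrayInputStream(Array(0xc3.toByte, 0x02.toByte))

AvroSingleObjectEncodingUtils.extractId(stream, ByteOrder.BIG_ENDIAN) match {
  case Right(id) =>
    println(s"single-object encoded, schema id: $id")
  case Left(consumed) if consumed.isEmpty =>
    // Mark-supporting stream: it was reset, so re-read the bytes from it.
    println(s"not single-object encoded, first byte: ${stream.read()}")
  case Left(consumed) =>
    // Non-markable stream: the consumed header bytes are handed back instead.
    println(s"not single-object encoded, consumed ${consumed.length} bytes")
}
```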
confluent/src/test/scala/it/agilelab/darwin/connector/confluent/ConfluentAvroSingleObjectEncodingSpec.scala → common/src/test/scala/it/agilelab/darwin/manager/util/ConfluentAvroSingleObjectEncodingSpec.scala

Lines changed: 11 additions & 12 deletions

@@ -1,19 +1,18 @@
-package it.agilelab.darwin.connector.confluent
-
-import java.io.{ ByteArrayInputStream, ByteArrayOutputStream }
-import java.nio.{ BufferUnderflowException, ByteBuffer, ByteOrder }
-import java.util
+package it.agilelab.darwin.manager.util
 
+import it.agilelab.darwin.common.compat._
 import it.agilelab.darwin.manager.util.ByteArrayUtils._
-import org.apache.avro.{ Schema, SchemaNormalization }
 import org.apache.avro.generic.{ GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord }
 import org.apache.avro.io.{ DecoderFactory, EncoderFactory }
 import org.apache.avro.util.ByteBufferInputStream
-
-import scala.util.Random
+import org.apache.avro.{ Schema, SchemaNormalization }
 import org.scalatest.flatspec.AnyFlatSpec
 import org.scalatest.matchers.should.Matchers
-import it.agilelab.darwin.common.compat._
+
+import java.io.{ ByteArrayInputStream, ByteArrayOutputStream }
+import java.nio.{ BufferUnderflowException, ByteBuffer, ByteOrder }
+import java.util
+import scala.util.Random
 
 abstract class ConfluentAvroSingleObjectEncodingSpec(val endianness: ByteOrder) extends AnyFlatSpec with Matchers {
   val sizeOfBuffer = 200
@@ -63,17 +62,17 @@ abstract class ConfluentAvroSingleObjectEncodingSpec(val endianness: ByteOrder)
   }
 
   "extractId(InputStream)" should "return Left if the input stream has only one byte" in {
-    val stream = new ByteArrayInputStream(Array(Random.nextInt().toByte))
+    val stream = new ByteArrayInputStream(Array((Random.nextInt(2048) + 1).toByte)) // scalastyle:ignore
     val id = ConfluentSingleObjectEncoding.extractId(stream, endianness)
-    id.left.map(_.length == 1) should be(Left(true))
+    id.left.map(_.length == 0) should be(Left(true))
     stream.read() should not be (-1)
     stream.read() should be(-1)
   }
 
   "extractId(InputStream)" should "return Left if the input stream does not have the expected header" in {
     val stream = new ByteArrayInputStream(Array(0x01.toByte))
     val id = ConfluentSingleObjectEncoding.extractId(stream, endianness)
-    id.left.map(_.sameElements(Array(0x01.toByte))) should be(Left(true))
+    id.left.map(_.length == 0) should be(Left(true))
     stream.read().toByte should be(0x01.toByte)
     stream.read() should be(-1)
   }

confluent/src/main/scala/it/agilelab/darwin/connector/confluent/ConfluentConnector.scala

Lines changed: 1 addition & 0 deletions

@@ -6,6 +6,7 @@ import it.agilelab.darwin.common.Connector
 import it.agilelab.darwin.common.compat._
 import it.agilelab.darwin.manager.SchemaPayloadPair
 import it.agilelab.darwin.manager.exception.DarwinException
+import it.agilelab.darwin.manager.util.ConfluentSingleObjectEncoding
 import org.apache.avro.Schema
 
 import java.io.{ IOException, InputStream, OutputStream }
Lines changed: 1 addition & 0 deletions

@@ -0,0 +1 @@
+it.agilelab.darwin.connector.multi.MultiConnectorCreator
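
This one-line file is, by all appearances, a `java.util.ServiceLoader` registration (a `META-INF/services` entry) that makes the new creator discoverable at runtime; the service interface name `it.agilelab.darwin.common.ConnectorCreator` in the sketch below is an assumption based on how the other connectors register themselves, and is not shown in this commit:

```scala
import java.util.ServiceLoader
import scala.collection.JavaConverters._

// Assumption: Darwin discovers connector factories via ServiceLoader, so the
// one-line META-INF/services entry above is what makes "multi" resolvable.
// The ConnectorCreator service interface name is assumed, not shown here.
val creators = ServiceLoader.load(classOf[it.agilelab.darwin.common.ConnectorCreator]).asScala
creators.foreach(creator => println(creator.getClass.getName))
// expected to include: it.agilelab.darwin.connector.multi.MultiConnectorCreator
```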
