Skip to content

Commit 2b05e3f

Browse files
author
Pirazzini Lorenzo
committed
[#19] Cached and lazy strategies
1 parent 96ea262 commit 2b05e3f

File tree

16 files changed

+97
-34
lines changed

16 files changed

+97
-34
lines changed

common/src/main/scala/it/agilelab/darwin/common/Connector.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,11 @@ abstract class Connector(config: Config) extends Serializable {
2727
*/
2828
def insert(schemas: Seq[(Long, Schema)]): Unit
2929

30+
/**
31+
* Retrieves a single schema using its ID from the storage.
32+
*
33+
* @param id the ID of the schema
34+
* @return an option that is empty if no schema was found for the ID or defined if a schema was found
35+
*/
3036
def findSchema(id: Long): Option[Schema]
3137
}

core/src/main/scala/it/agilelab/darwin/manager/AvroSchemaManager.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,11 @@ import org.apache.avro.{Schema, SchemaNormalization}
88
import it.agilelab.darwin.manager.util.ByteArrayUtils._
99
import scala.collection.JavaConverters._
1010

11+
/**
12+
* The main entry point of the Darwin library.
13+
* An instance of AvroSchemaManager should ALWAYS be obtained through the AvroSchemaManagerFactory.
14+
* The manager is responsible for schemas registration, retrieval and updates.
15+
*/
1116
trait AvroSchemaManager extends Logging {
1217
private val V1_HEADER = Array[Byte](0xC3.toByte, 0x01.toByte)
1318
private val ID_SIZE = 8

core/src/main/scala/it/agilelab/darwin/manager/AvroSchemaManagerFactory.scala

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,24 @@ import it.agilelab.darwin.common.Logging
55
import it.agilelab.darwin.manager.exception.ConnectorNotFoundException
66
import it.agilelab.darwin.manager.util.ConfigUtil
77

8+
/**
9+
* Factory used to obtain the desired implementation of AvroSchemaManager.
10+
* First of all the initialize method should be called passing the configuration (it will return an instance of
11+
* AvroSchemaManager. Then, the same instance can be retrieved using the getInstance method without passing the
12+
* configuration anymore.
13+
*/
814
object AvroSchemaManagerFactory extends Logging {
915

1016
private var _instance: AvroSchemaManager = _
1117

1218
/**
13-
* Returns an instance of AvroSchemaManager that can be used to register schemas.
19+
* Returns an instance of AvroSchemaManager that can be used to register and retrieve schemas.
1420
*
1521
* @param config the Config that is passed to the connector
1622
* @return an instance of AvroSchemaManager
1723
*/
1824
@throws[ConnectorNotFoundException]
19-
def getInstance(config: Config): AvroSchemaManager = {
25+
def initialize(config: Config): AvroSchemaManager = {
2026
synchronized {
2127
if (_instance == null) {
2228
log.debug("creating instance of AvroSchemaManager")
@@ -33,6 +39,12 @@ object AvroSchemaManagerFactory extends Logging {
3339
}
3440
}
3541

42+
/**
43+
* Returns the initialized instance of AvroSchemaManager that can be used to register and retrieve schemas.
44+
* The instance must be created once using the initialize method passing a configuration before calling this method.
45+
*
46+
* @return the initialized instance of AvroSchemaManager
47+
*/
3648
def getInstance: AvroSchemaManager = {
3749
synchronized {
3850
if (_instance == null) {

core/src/main/scala/it/agilelab/darwin/manager/CachedAvroSchemaManager.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@ package it.agilelab.darwin.manager
33
import java.util.concurrent.atomic.AtomicReference
44
import org.apache.avro.Schema
55

6+
/**
7+
* Implementation of AvroSchemaManager that defines a cache where the storage data is loaded, in order to reduce the
8+
* number of accesses to the storage.
9+
*/
610
trait CachedAvroSchemaManager extends AvroSchemaManager {
711
protected val _cache: AtomicReference[Option[AvroSchemaCache]] = new AtomicReference[Option[AvroSchemaCache]](None)
812

@@ -23,7 +27,7 @@ trait CachedAvroSchemaManager extends AvroSchemaManager {
2327
* Throws an exception if the cache wasn't already loaded (the getInstance method must always be used to
2428
* initialize the cache using the required configuration).
2529
*/
26-
def reload(): AvroSchemaManager = {
30+
override def reload(): AvroSchemaManager = {
2731
log.debug("reloading cache...")
2832
_cache.set(Some(AvroSchemaCacheFingerprint(connector.fullLoad())))
2933
log.debug("cache reloaded")

core/src/main/scala/it/agilelab/darwin/manager/CachedEagerAvroSchemaManager.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,10 @@ import com.typesafe.config.Config
44
import org.apache.avro.Schema
55

66
/**
7-
* The main entry point to this library.
8-
* N.B.: each method all on this object must always be AFTER the initialization, performed invoking the initialize
9-
* method.
7+
* Implementation of CachedAvroSchemaManager that loads all the schemas into the cache at startup and doesn't
8+
* perform any other accesses to the storage: each retrieve is performed onto the cache.
109
*/
11-
case class CachedEagerAvroSchemaManager(override val config: Config) extends CachedAvroSchemaManager {
10+
case class CachedEagerAvroSchemaManager private[darwin](override val config: Config) extends CachedAvroSchemaManager {
1211

1312
override def getSchema(id: Long): Option[Schema] = cache.getSchema(id)
1413
}

core/src/main/scala/it/agilelab/darwin/manager/CachedLazyAvroSchemaManager.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@ package it.agilelab.darwin.manager
22
import com.typesafe.config.Config
33
import org.apache.avro.Schema
44

5-
case class CachedLazyAvroSchemaManager(override val config: Config) extends CachedAvroSchemaManager {
5+
/**
6+
* Implementation of CachedAvroSchemaManager that loads all the schemas into the cache at startup and perform
7+
* all the retrieves onto the cache; an access to the storage is performed only if there is a cache miss.
8+
*/
9+
case class CachedLazyAvroSchemaManager private[darwin](override val config: Config) extends CachedAvroSchemaManager {
610

711
override def getSchema(id: Long): Option[Schema] = {
812
cache.getSchema(id).orElse{

core/src/main/scala/it/agilelab/darwin/manager/LazyAvroSchemaManager.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@ package it.agilelab.darwin.manager
33
import com.typesafe.config.Config
44
import org.apache.avro.Schema
55

6-
case class LazyAvroSchemaManager(override val config: Config) extends AvroSchemaManager {
6+
/**
7+
* Implementation of AvroSchemaManager that performs all the operations directly on the storage (retrievals and
8+
* insertions).
9+
*/
10+
case class LazyAvroSchemaManager private[darwin](override val config: Config) extends AvroSchemaManager {
711

812
override def getSchema(id: Long): Option[Schema] = connector.findSchema(id)
913

hbase/src/main/scala/it/agilelab/darwin/connector/hbase/HBaseConnector.scala

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import org.apache.avro.Schema.Parser
77
import org.apache.commons.io.IOUtils
88
import org.apache.hadoop.conf.Configuration
99
import org.apache.hadoop.fs.Path
10-
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Result}
10+
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, Put, Result}
1111
import org.apache.hadoop.hbase.security.User
1212
import org.apache.hadoop.hbase.util.Bytes
1313
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
@@ -126,8 +126,14 @@ case class HBaseConnector(config: Config) extends Connector(config) with Logging
126126
log.debug(s"insertion of schemas into $NAMESPACE_STRING:$TABLE_NAME_STRING successful")
127127
}
128128

129-
override def findSchema(id: Long): Option[Schema] = ???
129+
override def findSchema(id: Long): Option[Schema] = {
130+
log.debug(s"loading a schema with id = $id from table $NAMESPACE_STRING:$TABLE_NAME_STRING")
131+
val get: Get = new Get(Bytes.toBytes(id))
132+
get.addColumn(CF, QUALIFIER)
133+
val result: Result = connection.getTable(TABLE_NAME).get(get)
134+
val value: Option[Array[Byte]] = Option(result.getValue(CF, QUALIFIER))
135+
val schema: Option[Schema] = value.map(v => parser.parse(Bytes.toString(v)))
136+
log.debug(s"$schema loaded from HBase for id = $id")
137+
schema
138+
}
130139
}
131-
132-
133-

hbase/src/test/scala/it/agilelab/darwin/connector/hbase/HBaseConnectorSuite.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ class HBaseConnectorSuite extends FlatSpec with Matchers {
2121
val loaded: Seq[(Long, Schema)] = connector.fullLoad()
2222
assert(loaded.size == schemas.size)
2323
assert(loaded.forall(schemas.contains))
24+
val schema = connector.findSchema(loaded.head._1)
25+
assert(schema.isDefined)
26+
assert(schema.get == loaded.head._2)
27+
val noSchema = connector.findSchema(-1L)
28+
assert(noSchema.isEmpty)
2429
}
2530

2631
}

mock-application/src/test/java/it/agilelab/darwin/app/mock/JavaApplicationTest.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,30 +7,33 @@
77
import it.agilelab.darwin.manager.AvroSchemaManagerFactory;
88
import org.apache.avro.Schema;
99
import org.apache.avro.reflect.ReflectData;
10+
import org.junit.jupiter.api.Test;
1011
import org.reflections.Reflections;
1112
import scala.collection.JavaConversions;
1213

13-
import java.util.ArrayList;
14-
import java.util.List;
15-
import java.util.Set;
14+
import java.util.*;
1615

17-
class JavaApplicationTest {
1816

19-
AvroSchemaManager manager = AvroSchemaManagerFactory.getInstance(ConfigFactory.empty());
17+
class JavaApplicationTest {
2018

19+
@Test
2120
void mainTest() {
21+
Map<String, Object> configMap = new HashMap<>();
22+
configMap.put("type", "cached_eager");
23+
AvroSchemaManager manager = AvroSchemaManagerFactory.initialize(ConfigFactory.parseMap(configMap));
24+
2225
List<Schema> schemas = new ArrayList<>();
2326
Schema s = ReflectData.get().getSchema(OneField.class);
2427
schemas.add(s);
2528
manager.getSchema(0L);
26-
27-
AvroSchemaManagerFactory.getInstance(ConfigFactory.empty()).registerAll(JavaConversions.asScalaBuffer(schemas));
29+
AvroSchemaManagerFactory.getInstance().registerAll(JavaConversions.asScalaBuffer(schemas));
2830

2931
long id = manager.getId(schemas.get(0));
3032
assert(manager.getSchema(id).isDefined());
3133
assert (schemas.get(0) == manager.getSchema(id).get());
3234
}
3335

36+
@Test
3437
void reflectionTest() {
3538
Reflections reflections = new Reflections("it.agilelab.darwin.app.mock.classes");
3639

0 commit comments

Comments
 (0)