Skip to content

Commit 75db89d

Browse files
authored
Feature/improve configurations and error handling (#20)
* set scalac flags * refactoring and custom exception
1 parent ee65678 commit 75db89d

File tree

10 files changed

+80
-58
lines changed

10 files changed

+80
-58
lines changed

KaaSchemaRegistry/src/main/scala/com/davideicardi/kaa/KaaSchemaRegistry.scala

Lines changed: 25 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -5,10 +5,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
55
import org.apache.avro.SchemaNormalization
66
import org.apache.kafka.clients.consumer.KafkaConsumer
77
import java.util.{Collections, Properties, UUID}
8-
import java.util.concurrent.Executors
9-
import java.util.concurrent.ExecutorService
108
import java.time.{Duration => JavaDuration}
11-
import collection.JavaConverters._
129
import com.github.blemale.scaffeine.{ Cache, Scaffeine }
1310
import org.apache.kafka.clients.producer.ProducerConfig
1411
import org.apache.kafka.clients.consumer.ConsumerConfig
@@ -20,6 +17,7 @@ import java.util.concurrent.atomic.AtomicBoolean
2017
import scala.concurrent.Future
2118
import scala.concurrent.ExecutionContext
2219
import scala.concurrent.Await
20+
import com.davideicardi.kaa.utils.RetryConfig
2321

2422
object KaaSchemaRegistry {
2523
val DEFAULT_TOPIC_NAME = "schemas-v1"
@@ -30,20 +28,18 @@ class KaaSchemaRegistry(
3028
topic: String = KaaSchemaRegistry.DEFAULT_TOPIC_NAME,
3129
cliendId: String = "KaaSchemaRegistry",
3230
pollInterval: Duration = 5.second,
33-
getRetries: Int = 5,
34-
getRetryDelay: Duration = 2.second
31+
getRetry: RetryConfig = RetryConfig(5, 2.second)
3532
) extends SchemaRegistry {
3633

37-
// TODO Eval to put this code inside an "init" function instead of here in the constructor
38-
private val producer: KafkaProducer[java.lang.Long, String]
39-
= new KafkaProducer(createProducerConfig(), new LongSerializer(), new StringSerializer())
40-
private val consumer: KafkaConsumer[java.lang.Long, String]
41-
= new KafkaConsumer(createConsumerConfig(), new LongDeserializer(), new StringDeserializer())
42-
private var executor: ExecutorService = null
34+
implicit private val ec = ExecutionContext.global
35+
36+
private val producer = createProducer()
37+
private val consumer = createConsumer()
4338
private val cache: Cache[Long, String] = Scaffeine().build[Long, String]()
4439
private val stopping = new AtomicBoolean(false)
45-
implicit private val ec = ExecutionContext.global
46-
private val subscriber = Future {
40+
private val startConsumerFuture = startConsumer()
41+
42+
private def startConsumer(): Future[Unit] = Future {
4743
consumer.subscribe(Collections.singletonList(topic))
4844
val jPollInterval = JavaDuration.ofMillis(pollInterval.toMillis)
4945
while (!stopping.get()) {
@@ -57,13 +53,14 @@ class KaaSchemaRegistry(
5753
consumer.close();
5854
}
5955

60-
// TODO eval if shutdown is called properly
6156
def shutdown(): Unit = {
6257
stopping.set(true)
63-
Await.result(subscriber, 10.seconds)
58+
Await.result(startConsumerFuture, 10.seconds)
6459
}
6560

6661
override def put(schema: Schema): SchemaId = {
62+
if (stopping.get()) throw new UnsupportedOperationException("KaaSchemaRegistry is not available")
63+
6764
val fingerprint = SchemaNormalization.parsingFingerprint64(schema)
6865

6966
if (cache.getIfPresent(fingerprint).isEmpty) {
@@ -75,28 +72,36 @@ class KaaSchemaRegistry(
7572
}
7673

7774
override def get(id: SchemaId): Option[Schema] = {
78-
Retry.retryIfNone(getRetries, getRetryDelay) {
75+
Retry.retryIfNone(getRetry) {
7976
cache.getIfPresent(id.value)
8077
.map(new Schema.Parser().parse)
8178
}
8279
}
8380

84-
def createConsumerConfig(): Properties = {
81+
protected def createConsumer() = {
82+
new KafkaConsumer(consumerProps(), new LongDeserializer(), new StringDeserializer())
83+
}
84+
85+
protected def consumerProps(): Properties = {
8586
val props = new Properties()
8687
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
8788
props.put(ConsumerConfig.CLIENT_ID_CONFIG, cliendId)
8889
props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
8990
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
9091
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
91-
// TODO study if we need other default properties and allow to extend this from outside
92+
9293
props
9394
}
9495

95-
def createProducerConfig(): Properties = {
96+
protected def createProducer() = {
97+
new KafkaProducer(producerProps(), new LongSerializer(), new StringSerializer())
98+
}
99+
100+
protected def producerProps(): Properties = {
96101
val props = new Properties()
97102
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
98103
props.put(ProducerConfig.CLIENT_ID_CONFIG, cliendId)
99-
// TODO study if we need other default properties and allow to extend this from outside
104+
100105
props
101106
}
102107
}
Lines changed: 3 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -1,18 +1,7 @@
11
package com.davideicardi.kaa
22

3-
import org.apache.avro.Schema
4-
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
5-
import org.apache.avro.SchemaNormalization
6-
import org.apache.kafka.clients.consumer.KafkaConsumer
7-
import java.util.{Collections, Properties, UUID, Optional}
8-
import java.util.concurrent.Executors
9-
import java.util.concurrent.ExecutorService
3+
import java.util.{Collections, Properties, Optional}
104
import scala.jdk.CollectionConverters._
11-
import com.github.blemale.scaffeine.{ Cache, Scaffeine }
12-
import org.apache.kafka.clients.producer.ProducerConfig
13-
import org.apache.kafka.clients.consumer.ConsumerConfig
14-
import org.apache.kafka.common.serialization.{LongDeserializer, StringDeserializer}
15-
import org.apache.kafka.common.serialization.{LongSerializer, StringSerializer}
165
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
176
import org.apache.kafka.common.config.TopicConfig
187

@@ -32,7 +21,7 @@ class KaaSchemaRegistryAdmin(
3221
)
3322
newTopic.configs(newTopicsConfigs.asJava)
3423

35-
adminClient.createTopics(Collections.singletonList(newTopic)).all().get()
24+
val _ = adminClient.createTopics(Collections.singletonList(newTopic)).all().get()
3625
}
3726

3827
def topicExists(): Boolean = {
@@ -41,6 +30,6 @@ class KaaSchemaRegistryAdmin(
4130
}
4231

4332
def deleteTopic(): Unit = {
44-
adminClient.deleteTopics(Collections.singletonList(topic)).all().get()
33+
val _ = adminClient.deleteTopics(Collections.singletonList(topic)).all().get()
4534
}
4635
}

KaaSchemaRegistry/src/main/scala/com/davideicardi/kaa/SchemaRegistry.scala

Lines changed: 0 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -1,18 +1,6 @@
11
package com.davideicardi.kaa
22

33
import org.apache.avro.Schema
4-
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
5-
import org.apache.avro.SchemaNormalization
6-
import org.apache.kafka.clients.consumer.KafkaConsumer
7-
import java.util.{Collections, Properties}
8-
import java.util.concurrent.Executors
9-
import java.util.concurrent.ExecutorService
10-
import collection.JavaConverters._
11-
import com.github.blemale.scaffeine.{ Cache, Scaffeine }
12-
import org.apache.kafka.clients.producer.ProducerConfig
13-
import org.apache.kafka.clients.consumer.ConsumerConfig
14-
import org.apache.kafka.common.serialization.{LongDeserializer, StringDeserializer}
15-
import org.apache.kafka.common.serialization.{LongSerializer, StringSerializer}
164

175
trait SchemaRegistry {
186
def put(schema: Schema): SchemaId

KaaSchemaRegistry/src/main/scala/com/davideicardi/kaa/avro/AvroSingleObjectEncoding.scala

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@ package com.davideicardi.kaa.avro
33
import java.nio.{ByteBuffer, ByteOrder}
44

55
import com.davideicardi.kaa.SchemaId
6+
import com.davideicardi.kaa.InvalidSchemaException
67

78
/**
89
* https://avro.apache.org/docs/current/spec.html#single_object_encoding
@@ -24,7 +25,7 @@ object AvroSingleObjectEncoding {
2425
SchemaId(id) -> data
2526
}
2627
else {
27-
throw new Exception(s"Byte array is not in correct format. First ${V1_HEADER.length} bytes are not equal" +
28+
throw new InvalidSchemaException(s"Byte array is not in correct format. First ${V1_HEADER.length} bytes are not equal" +
2829
s" to ${V1_HEADER.mkString("[", ", ", "]")}")
2930
}
3031
}

KaaSchemaRegistry/src/main/scala/com/davideicardi/kaa/avro/AvroSingleObjectSerializer.scala

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@ package com.davideicardi.kaa.avro
22

33
import com.sksamuel.avro4s._
44
import com.davideicardi.kaa.SchemaRegistry
5+
import com.davideicardi.kaa.SchemaNotFoundException
56

67
class AvroSingleObjectSerializer[T >: Null : SchemaFor : Encoder : Decoder]
78
(schemaRegistry: SchemaRegistry){
@@ -18,7 +19,7 @@ class AvroSingleObjectSerializer[T >: Null : SchemaFor : Encoder : Decoder]
1819
def deserialize(bytes: Array[Byte]): T = {
1920
val (schemaId, serialized) = AvroSingleObjectEncoding.decode(bytes)
2021
val schema = schemaRegistry.get(schemaId)
21-
.getOrElse(throw new Exception(s"Schema $schemaId not found in registry"))
22+
.getOrElse(throw new SchemaNotFoundException(s"Schema $schemaId not found in registry"))
2223

2324
binarySerializer.read(serialized, schema)
2425
}
Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,6 @@
1+
package com.davideicardi.kaa
2+
3+
4+
class InvalidSchemaException(s:String) extends Exception(s){}
5+
6+
class SchemaNotFoundException(s:String) extends Exception(s){}

KaaSchemaRegistry/src/main/scala/com/davideicardi/kaa/utils/Retry.scala

Lines changed: 10 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -5,19 +5,24 @@ import scala.concurrent._
55

66
object Retry {
77
@annotation.tailrec
8-
def retryIfNone[T](count: Int, delay: Duration)(fn: => Option[T]): Option[T] = {
8+
def retryIfNone[T](retryConfig: RetryConfig)(fn: => Option[T]): Option[T] = {
99
fn match {
1010
case Some(x) => Some(x)
11-
case None if count > 1 => {
12-
Sleep.sleep(delay.toMillis)
13-
println("retrying...")
14-
retryIfNone(count - 1, delay)(fn)
11+
case None if retryConfig.count > 1 => {
12+
Sleep.sleep(retryConfig.delay.toMillis)
13+
retryIfNone(retryConfig.copy(count = retryConfig.count - 1))(fn)
1514
}
1615
case None => None
1716
}
1817
}
1918
}
2019

20+
object RetryConfig {
21+
val NoRetry = RetryConfig(0, 0.second)
22+
}
23+
24+
case class RetryConfig(count: Int, delay: Duration) {}
25+
2126
object Sleep {
2227
def sleep(millis: Long): Unit =
2328
try {

KaaSchemaRegistry/src/test/scala/kaa/darwin/AvroSingleObjectSerializerSpec.scala

Lines changed: 0 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -36,9 +36,6 @@ class AvroSingleObjectSerializerSpec extends AnyFlatSpec with should.Matchers {
3636
bin should be (binarySerializer.write(dragonite))
3737
}
3838

39-
// TODO add tests to verify that it is backward and forward compatible
40-
// add tests to verify what's happening when schema is not found
41-
4239
object AvroUtils {
4340
def calcFingerprint(schema: Schema): SchemaId = {
4441
SchemaId(SchemaNormalization.parsingFingerprint64(schema))

SampleApp/src/main/scala/sampleApp.scala

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,5 @@
11
import com.davideicardi.kaa.avro.AvroSingleObjectSerializer
22
import com.davideicardi.kaa.KaaSchemaRegistry
3-
import scala.tools.nsc.doc.html.HtmlTags.Sup
43
import com.davideicardi.kaa.KaaSchemaRegistryAdmin
54
object SampleApp {
65
def main(args: Array[String]): Unit = {

build.sbt

Lines changed: 32 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,38 @@ import xerial.sbt.Sonatype._
33
// Common settings
44
organization in ThisBuild := "com.davideicardi"
55
scalaVersion in ThisBuild := "2.13.3"
6-
scalacOptions in ThisBuild += "-deprecation"
6+
scalacOptions in ThisBuild ++= Seq(
7+
"-deprecation", // Emit warning and location for usages of deprecated APIs.
8+
"-explaintypes", // Explain type errors in more detail.
9+
"-feature", // Emit warning and location for usages of features that should be imported explicitly.
10+
"-unchecked", // Enable additional warnings where generated code depends on assumptions.
11+
"-Xcheckinit", // Wrap field accessors to throw an exception on uninitialized access.
12+
"-Xfatal-warnings", // Fail the compilation if there are any warnings.
13+
"-Xlint:adapted-args", // Warn if an argument list is modified to match the receiver.
14+
"-Xlint:constant", // Evaluation of a constant arithmetic expression results in an error.
15+
"-Xlint:delayedinit-select", // Selecting member of DelayedInit.
16+
"-Xlint:doc-detached", // A Scaladoc comment appears to be detached from its element.
17+
"-Xlint:inaccessible", // Warn about inaccessible types in method signatures.
18+
"-Xlint:infer-any", // Warn when a type argument is inferred to be `Any`.
19+
"-Xlint:missing-interpolator", // A string literal appears to be missing an interpolator id.
20+
"-Xlint:nullary-unit", // Warn when nullary methods return Unit.
21+
"-Xlint:option-implicit", // Option.apply used implicit view.
22+
"-Xlint:package-object-classes", // Class or object defined in package object.
23+
"-Xlint:poly-implicit-overload", // Parameterized overloaded implicit methods are not visible as view bounds.
24+
"-Xlint:private-shadow", // A private field (or class parameter) shadows a superclass field.
25+
"-Xlint:stars-align", // Pattern sequence wildcard must align with sequence component.
26+
"-Xlint:type-parameter-shadow", // A local type parameter shadows a type already in scope.
27+
"-Ywarn-dead-code", // Warn when dead code is identified.
28+
"-Ywarn-extra-implicit", // Warn when more than one implicit parameter section is defined.
29+
"-Ywarn-numeric-widen", // Warn when numerics are widened.
30+
"-Ywarn-unused:implicits", // Warn if an implicit parameter is unused.
31+
"-Ywarn-unused:imports", // Warn if an import selector is not referenced.
32+
"-Ywarn-unused:locals", // Warn if a local definition is unused.
33+
"-Ywarn-unused:params", // Warn if a value parameter is unused.
34+
"-Ywarn-unused:patvars", // Warn if a variable bound in a pattern is unused.
35+
"-Ywarn-unused:privates", // Warn if a private member is unused.
36+
"-Ywarn-value-discard", // Warn when non-Unit expression results are unused.
37+
)
738

839
// sbt-dynver version settings
940
dynverSonatypeSnapshots in ThisBuild := true

0 commit comments

Comments
 (0)