Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ lazy val `kafka-jdbc-connector` =
(project in file("."))
.settings(
name := "kafka-jdbc-connector",
version := "1.2.0",
version := "1.2.0-SNAPSHOT",
organization := "com.agoda",
scalaVersion := "2.11.7",
crossScalaVersions := Seq("2.11.7", "2.12.2"),
Expand Down
1 change: 0 additions & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import sbt._

object Dependencies {
private val ScalaTestV = "3.0.1"

private val LogBack = "ch.qos.logback" % "logback-classic" % "1.2.3"
private val ScalaLogging = "com.typesafe.scala-logging" %% "scala-logging" % "3.5.0"
private val KafkaConnectApi = "org.apache.kafka" % "connect-api" % "0.9.0.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import scala.util.{Failure, Success, Try}

class JdbcSourceConnector extends SourceConnector {
private val logger = LoggerFactory.getLogger(classOf[JdbcSourceConnector])

Class.forName("org.postgresql.Driver")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The driver should be loaded automatically, is it required to explicitly do this?

private var config: JdbcSourceConnectorConfig = _

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import scala.collection.immutable.IndexedSeq
*
* MySQL :: MySQL Server.
*
* PostgreSQL :: PostgreSQL Server.
*/
sealed abstract class DatabaseProduct(override val entryName: String) extends EnumEntry

Expand All @@ -21,4 +22,5 @@ object DatabaseProduct extends Enum[DatabaseProduct] {

case object MsSQL extends DatabaseProduct("Microsoft SQL Server")
case object MySQL extends DatabaseProduct("MySQL")
case object PostgreSQL extends DatabaseProduct("PostgreSQL")
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import java.sql.{Connection, PreparedStatement, ResultSet}

import com.agoda.kafka.connector.jdbc.JdbcSourceConnectorConstants
import com.agoda.kafka.connector.jdbc.models.DatabaseProduct
import com.agoda.kafka.connector.jdbc.models.DatabaseProduct.{MsSQL, MySQL}
import com.agoda.kafka.connector.jdbc.models.DatabaseProduct.{MsSQL, MySQL, PostgreSQL}
import com.agoda.kafka.connector.jdbc.models.Mode.IncrementingMode
import com.agoda.kafka.connector.jdbc.utils.DataConverter
import org.apache.kafka.connect.data.Schema
Expand Down Expand Up @@ -44,8 +44,9 @@ case class IdBasedDataService(databaseProduct: DatabaseProduct,

override def createPreparedStatement(connection: Connection): Try[PreparedStatement] = Try {
val preparedStatement = databaseProduct match {
case MsSQL => connection.prepareStatement(s"EXECUTE $storedProcedureName @$incrementingVariableName = ?, @$batchSizeVariableName = ?")
case MySQL => connection.prepareStatement(s"CALL $storedProcedureName (@$incrementingVariableName := ?, @$batchSizeVariableName := ?)")
case MsSQL => connection.prepareStatement(s"EXECUTE $storedProcedureName @$incrementingVariableName = ?, @$batchSizeVariableName = ?")
case MySQL => connection.prepareStatement(s"CALL $storedProcedureName (@$incrementingVariableName := ?, @$batchSizeVariableName := ?)")
case PostgreSQL => connection.prepareStatement(s"SELECT * from $storedProcedureName (?, ?)")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we pass arguments by name?

}
preparedStatement.setObject(1, incrementingOffset)
preparedStatement.setObject(2, batchSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import java.util.{Date, GregorianCalendar, TimeZone}

import com.agoda.kafka.connector.jdbc.JdbcSourceConnectorConstants
import com.agoda.kafka.connector.jdbc.models.DatabaseProduct
import com.agoda.kafka.connector.jdbc.models.DatabaseProduct.{MsSQL, MySQL}
import com.agoda.kafka.connector.jdbc.models.DatabaseProduct.{MsSQL, MySQL, PostgreSQL}
import com.agoda.kafka.connector.jdbc.models.Mode.TimestampMode
import com.agoda.kafka.connector.jdbc.utils.DataConverter
import org.apache.kafka.connect.data.Schema
Expand Down Expand Up @@ -43,8 +43,9 @@ case class TimeBasedDataService(databaseProduct: DatabaseProduct,

override def createPreparedStatement(connection: Connection): Try[PreparedStatement] = Try {
val preparedStatement = databaseProduct match {
case MsSQL => connection.prepareStatement(s"EXECUTE $storedProcedureName @$timestampVariableName = ?, @$batchSizeVariableName = ?")
case MySQL => connection.prepareStatement(s"CALL $storedProcedureName (@$timestampVariableName := ?, @$batchSizeVariableName := ?)")
case MsSQL => connection.prepareStatement(s"EXECUTE $storedProcedureName @$timestampVariableName = ?, @$batchSizeVariableName = ?")
case MySQL => connection.prepareStatement(s"CALL $storedProcedureName (@$timestampVariableName := ?, @$batchSizeVariableName := ?)")
case PostgreSQL => connection.prepareStatement(s"SELECT * from $storedProcedureName (?, ?)")
}
preparedStatement.setTimestamp(1, new Timestamp(timestampOffset), calendar)
preparedStatement.setObject(2, batchSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import java.util.{Date, GregorianCalendar, TimeZone}

import com.agoda.kafka.connector.jdbc.JdbcSourceConnectorConstants
import com.agoda.kafka.connector.jdbc.models.DatabaseProduct
import com.agoda.kafka.connector.jdbc.models.DatabaseProduct.{MsSQL, MySQL}
import com.agoda.kafka.connector.jdbc.models.DatabaseProduct.{MsSQL, MySQL, PostgreSQL}
import com.agoda.kafka.connector.jdbc.models.Mode.{IncrementingMode, TimestampMode}
import com.agoda.kafka.connector.jdbc.utils.DataConverter
import org.apache.kafka.connect.data.Schema
Expand Down Expand Up @@ -53,8 +53,9 @@ case class TimeIdBasedDataService(databaseProduct: DatabaseProduct,

override def createPreparedStatement(connection: Connection): Try[PreparedStatement] = Try {
val preparedStatement = databaseProduct match {
case MsSQL => connection.prepareStatement(s"EXECUTE $storedProcedureName @$timestampVariableName = ?, @$incrementingVariableName = ?, @$batchSizeVariableName = ?")
case MySQL => connection.prepareStatement(s"CALL $storedProcedureName (@$timestampVariableName := ?, @$incrementingVariableName := ?, @$batchSizeVariableName := ?)")
case MsSQL => connection.prepareStatement(s"EXECUTE $storedProcedureName @$timestampVariableName = ?, @$incrementingVariableName = ?, @$batchSizeVariableName = ?")
case MySQL => connection.prepareStatement(s"CALL $storedProcedureName (@$timestampVariableName := ?, @$incrementingVariableName := ?, @$batchSizeVariableName := ?)")
case PostgreSQL => connection.prepareStatement(s"SELECT * from $storedProcedureName (?, ?, ?)")
}
preparedStatement.setTimestamp(1, new Timestamp(timestampOffset), calendar)
preparedStatement.setObject(2, incrementingOffset)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
package com.agoda.kafka.connector.jdbc.models

import com.agoda.kafka.connector.jdbc.models.DatabaseProduct.{MsSQL, MySQL}
import com.agoda.kafka.connector.jdbc.models.DatabaseProduct.{
MsSQL,
MySQL,
PostgreSQL
}
import org.scalatest.{Matchers, WordSpec}

class DatabaseProductTest extends WordSpec with Matchers {
class DatabaseProductTest extends WordSpec with Matchers {

"module" should {
"convert DatabaseProduct to its string representation" in {
DatabaseProduct.MySQL.entryName shouldEqual "MySQL"
DatabaseProduct.MsSQL.entryName shouldEqual "Microsoft SQL Server"
DatabaseProduct.PostgreSQL.entryName shouldEqual "PostgreSQL"
}

"convert string to corresponding DatabaseProduct representation" in {
DatabaseProduct.withName("MySQL") shouldBe MySQL
DatabaseProduct.withName("Microsoft SQL Server") shouldBe MsSQL
DatabaseProduct.withName("PostgreSQL") shouldBe PostgreSQL
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.agoda.kafka.connector.jdbc.services
import java.sql.{Connection, PreparedStatement, ResultSet}

import com.agoda.kafka.connector.jdbc.JdbcSourceConnectorConstants
import com.agoda.kafka.connector.jdbc.models.DatabaseProduct.{MsSQL, MySQL}
import com.agoda.kafka.connector.jdbc.models.DatabaseProduct.{MsSQL, MySQL, PostgreSQL}
import com.agoda.kafka.connector.jdbc.models.Mode.IncrementingMode
import com.agoda.kafka.connector.jdbc.utils.DataConverter
import org.apache.kafka.connect.data.{Field, Schema, Struct}
Expand Down Expand Up @@ -50,6 +50,20 @@ class IdBasedDataServiceTest extends WordSpec with Matchers with MockitoSugar {
dataConverter = dataConverter
)

val idBasedDataServicePostgreSQL =
IdBasedDataService(
databaseProduct = PostgreSQL,
storedProcedureName = "stored-procedure",
batchSize = 100,
batchSizeVariableName = "batch-size-variable",
incrementingVariableName = "incrementing-variable",
incrementingOffset = 0L,
incrementingFieldName = "id",
topic = "id-based-data-topic",
keyFieldOpt = None,
dataConverter = dataConverter
)

"create correct prepared statement for Mssql" in {
val connection = mock[Connection]
val statement = mock[PreparedStatement]
Expand Down Expand Up @@ -80,6 +94,21 @@ class IdBasedDataServiceTest extends WordSpec with Matchers with MockitoSugar {
verify(statement).setObject(2, 100)
}

"create correct prepared statement for PostgreSQL" in {
val connection = mock[Connection]
val statement = mock[PreparedStatement]

when(connection.prepareStatement("SELECT * from stored-procedure (?, ?)")).thenReturn(statement)
doNothing().when(statement).setObject(1, 0L)
doNothing().when(statement).setObject(2, 100)

idBasedDataServicePostgreSQL.createPreparedStatement(connection)

verify(connection).prepareStatement("SELECT * from stored-procedure (?, ?)")
verify(statement).setObject(1, 0L)
verify(statement).setObject(2, 100)
}

"create correct string representation" in {
idBasedDataServiceMssql.toString shouldBe
s"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import java.sql._
import java.util.{GregorianCalendar, TimeZone}

import com.agoda.kafka.connector.jdbc.JdbcSourceConnectorConstants
import com.agoda.kafka.connector.jdbc.models.DatabaseProduct.{MsSQL, MySQL}
import com.agoda.kafka.connector.jdbc.models.DatabaseProduct.{MsSQL, MySQL, PostgreSQL}
import com.agoda.kafka.connector.jdbc.models.Mode.{IncrementingMode, TimestampMode}
import com.agoda.kafka.connector.jdbc.utils.DataConverter
import org.apache.kafka.connect.data.{Field, Schema, Struct}
Expand Down Expand Up @@ -54,6 +54,21 @@ class TimeBasedDataServiceTest extends WordSpec with Matchers with MockitoSugar
calendar = UTC_Calendar
)

val timeBasedDataServicePostgreSQL =
TimeBasedDataService(
databaseProduct = PostgreSQL,
storedProcedureName = "stored-procedure",
batchSize = 100,
batchSizeVariableName = "batch-size-variable",
timestampVariableName = "timestamp-variable",
timestampOffset = 0L,
timestampFieldName = "time",
topic = "time-based-data-topic",
keyFieldOpt = None,
dataConverter = dataConverter,
calendar = UTC_Calendar
)

val timestamp = new Timestamp(0L)

"create correct prepared statement for Mssql" in {
Expand Down Expand Up @@ -86,6 +101,21 @@ class TimeBasedDataServiceTest extends WordSpec with Matchers with MockitoSugar
verify(statement).setObject(2, 100)
}

"create correct prepared statement for PostgreSQL" in {
val connection = mock[Connection]
val statement = mock[PreparedStatement]

when(connection.prepareStatement("SELECT * from stored-procedure (?, ?)")).thenReturn(statement)
doNothing().when(statement).setTimestamp(1, timestamp, UTC_Calendar)
doNothing().when(statement).setObject(2, 100)

timeBasedDataServicePostgreSQL.createPreparedStatement(connection)

verify(connection).prepareStatement("SELECT * from stored-procedure (?, ?)")
verify(statement).setTimestamp(1, timestamp, UTC_Calendar)
verify(statement).setObject(2, 100)
}

"create correct string representation" in {
timeBasedDataServiceMssql.toString shouldBe
s"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import java.sql._
import java.util.{GregorianCalendar, TimeZone}

import com.agoda.kafka.connector.jdbc.JdbcSourceConnectorConstants
import com.agoda.kafka.connector.jdbc.models.DatabaseProduct.{MsSQL, MySQL}
import com.agoda.kafka.connector.jdbc.models.DatabaseProduct.{MsSQL, MySQL, PostgreSQL}
import com.agoda.kafka.connector.jdbc.models.Mode.{IncrementingMode, TimestampMode}
import com.agoda.kafka.connector.jdbc.utils.DataConverter
import org.apache.kafka.connect.data.{Field, Schema, Struct}
Expand Down Expand Up @@ -61,6 +61,24 @@ class TimeIdBasedDataServiceTest extends WordSpec with Matchers with MockitoSuga
calendar = UTC_CALENDAR
)

val timeIdBasedDataServicePostgresql =
TimeIdBasedDataService(
databaseProduct = PostgreSQL,
storedProcedureName = "stored-procedure",
batchSize = 100,
batchSizeVariableName = "batch-size-variable",
timestampVariableName = "timestamp-variable",
timestampOffset = 0L,
timestampFieldName = "time",
incrementingVariableName = "incrementing-variable",
incrementingOffset = 0L,
incrementingFieldName = "id",
topic = "time-id-based-data-topic",
keyFieldOpt = None,
dataConverter = dataConverter,
calendar = UTC_CALENDAR
)

val timestamp = new Timestamp(0L)

"create correct prepared statement for Mssql" in {
Expand Down Expand Up @@ -97,6 +115,23 @@ class TimeIdBasedDataServiceTest extends WordSpec with Matchers with MockitoSuga
verify(statement).setObject(3, 100)
}

"create correct prepared statement for PostgreSQL" in {
val connection = mock[Connection]
val statement = mock[PreparedStatement]

when(connection.prepareStatement("SELECT * from stored-procedure (?, ?, ?)")).thenReturn(statement)
doNothing().when(statement).setTimestamp(1, timestamp, UTC_CALENDAR)
doNothing().when(statement).setObject(2, 0L)
doNothing().when(statement).setObject(3, 100)

timeIdBasedDataServicePostgresql.createPreparedStatement(connection)

verify(connection).prepareStatement("SELECT * from stored-procedure (?, ?, ?)")
verify(statement).setTimestamp(1, timestamp, UTC_CALENDAR)
verify(statement).setObject(2, 0L)
verify(statement).setObject(3, 100)
}

"create correct string representation" in {
timeIdBasedDataServiceMssql.toString shouldBe
s"""
Expand Down