
Commit a9a7d05

feat: add compatibility with Spark 3.5
1 parent db25672 commit a9a7d05

7 files changed: +67 -57 lines changed

.devcontainer/Dockerfile

Lines changed: 4 additions & 4 deletions
```diff
@@ -35,11 +35,11 @@ ADD .devcontainer/hive-site.xml "/tmp/"
 # Install Java & related
 RUN curl -s "https://get.sdkman.io?rcupdate=false" | bash
 
-ARG JAVA_VERSION="8.0.402-amzn"
-ARG SCALA_VERSION="2.12.17"
+ENV JAVA_VERSION="17.0.13-amzn"
+ENV SCALA_VERSION="2.12.18"
 ENV SCALA_BINARY_VERSION="2.12"
-ARG SBT_VERSION="1.8.0"
-ARG SPARK_VERSION="3.3.1"
+ENV SBT_VERSION="1.10.6"
+ENV SPARK_VERSION="3.5.2"
 
 RUN source "${HOME}/.sdkman/bin/sdkman-init.sh" \
   && sdk install java "$JAVA_VERSION" \
```
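Switching the version pins from `ARG` to `ENV` keeps them available in the running devcontainer rather than only at image-build time — presumably so that tools run inside the container, such as `build.sbt` reading `SCALA_VERSION` and `SPARK_VERSION` via `System.getenv` (see the build.sbt diff below), resolve the same versions the image was built with.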

README.md

Lines changed: 1 addition & 1 deletion
```diff
@@ -10,10 +10,10 @@ The platform also includes a simple way to write unit and E2E tests.
 
 ## Changes from Original
 
+- Optimized to work with Spark 3.5.
 - Added compatibility with Hive Catalog (Input), MongoDB (Input/Output), Delta (Input/Output) and Iceberg (Input/Output).
 - Metrics and verification results are stored in memory for later use by other apps.
 - Files can be read from the same path as the job file using local notation ("./SOME_PATH").
-- Optimized to work with [AWS Glue](https://aws.amazon.com/glue/) 4.0.
 - JSON schema is simplified and uses pure *json-schema* with no extra elements. JSON schema validation is enforced.
 - Added more DQ check operators.
 - [Jinja compatible](https://jinja.palletsprojects.com/) templating engine instead of `org.apache.commons.text.StringSubstitutor`.
```

build.sbt

Lines changed: 51 additions & 46 deletions
```diff
@@ -19,15 +19,19 @@ scmInfo := Some(
 scalaVersion := Option(System.getenv("SCALA_VERSION")).getOrElse("2.12.19")
 
 val sparkVersion: Def.Initialize[String] = Def.setting {
-  Option(System.getenv("SPARK_VERSION")).getOrElse("3.3.1")
+  Option(System.getenv("SPARK_VERSION")).getOrElse("3.5.2")
+}
+
+val sparkShortVersion: Def.Initialize[String] = Def.setting {
+  sparkVersion.value.split('.').take(2).mkString(".")
 }
 
 val jacksonVersion: Def.Initialize[String] = Def.setting {
   Option(System.getenv("JACKSON_VERSION")).getOrElse("2.12.7")
 }
 
 val sparkTestVersion: Def.Initialize[String] = Def.setting {
-  "3.3.1_1.3.0"
+  "3.5.2_2.0.1"
 }
 
 // sbt-scalafix
@@ -42,12 +46,12 @@ Test / testOptions := Seq(
   )
 )
 
-lazy val excludeAvro = ExclusionRule(organization = "org.apache.avro", name = "avro")
-lazy val excludeSpark = ExclusionRule(organization = "org.apache.spark")
-lazy val excludeLog4j = ExclusionRule(organization = "org.apache.logging.log4j")
-lazy val excludeParquet = ExclusionRule(organization = "org.apache.parquet")
-lazy val excludeScalanlp = ExclusionRule(organization = "org.scalanlp")
-lazy val excludeJacksonCore = ExclusionRule(organization = "com.fasterxml.jackson.core")
+lazy val excludeAvro = ExclusionRule(organization = "org.apache.avro", name = "avro")
+lazy val excludeSpark = ExclusionRule(organization = "org.apache.spark")
+lazy val excludeLog4j = ExclusionRule(organization = "org.apache.logging.log4j")
+lazy val excludeParquet = ExclusionRule(organization = "org.apache.parquet")
+lazy val excludeScalanlp = ExclusionRule(organization = "org.scalanlp")
+lazy val excludeJacksonCore = ExclusionRule(organization = "com.fasterxml.jackson.core")
 lazy val excludeJacksonDataFormat = ExclusionRule(organization = "com.fasterxml.jackson.dataformat")
 lazy val excludeJacksonDataType = ExclusionRule(organization = "com.fasterxml.jackson.datatype")
 lazy val excludeJacksonModule = ExclusionRule(organization = "com.fasterxml.jackson.module")
@@ -63,52 +67,53 @@ libraryDependencies ++= Seq(
   "org.apache.spark" %% "spark-avro" % sparkVersion.value % "provided",
   "org.apache.spark" %% "spark-hadoop-cloud" % sparkVersion.value % "provided" excludeAll (excludeAWS),
   "com.holdenkarau" %% "spark-testing-base" % sparkTestVersion.value % "test" excludeAll (excludeSpark),
-  "com.github.scopt" %% "scopt" % "3.7.1",
-  "org.scala-lang" % "scala-library" % scalaVersion.value,
-  "com.typesafe.play" %% "play-json" % "2.10.5" excludeAll (excludeJacksonCore, excludeJacksonDataFormat, excludeJacksonDataType, excludeJacksonModule),
-  "com.fasterxml.jackson.core" % "jackson-annotations" % jacksonVersion.value,
-  "com.fasterxml.jackson.core" % "jackson-core" % jacksonVersion.value,
-  "com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion.value,
+  "com.github.scopt" %% "scopt" % "4.1.0",
+  "org.scala-lang" % "scala-library" % scalaVersion.value,
+  "com.typesafe.play" %% "play-json" % "2.10.6" excludeAll (excludeJacksonCore, excludeJacksonDataFormat, excludeJacksonDataType, excludeJacksonModule),
+  "com.fasterxml.jackson.core" % "jackson-annotations" % jacksonVersion.value,
+  "com.fasterxml.jackson.core" % "jackson-core" % jacksonVersion.value,
+  "com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion.value,
   "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % jacksonVersion.value,
   "com.fasterxml.jackson.dataformat" % "jackson-dataformat-yaml" % jacksonVersion.value,
   "com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % jacksonVersion.value,
   "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % jacksonVersion.value,
   "com.fasterxml.jackson.module" %% "jackson-module-scala" % jacksonVersion.value,
-  "com.hubspot.jinjava" % "jinjava" % "2.7.2" excludeAll (excludeJacksonCore, excludeJacksonDataFormat, excludeJacksonDataType, excludeJacksonModule),
-  "org.influxdb" % "influxdb-java" % "2.23",
-  "io.github.spark-redshift-community" %% "spark-redshift" % "6.2.0-spark_3.3" excludeAll (excludeAWS, excludeJacksonCore, excludeJacksonDataFormat, excludeJacksonDataType, excludeJacksonModule),
+  "com.hubspot.jinjava" % "jinjava" % "2.7.4" excludeAll (excludeJacksonCore, excludeJacksonDataFormat, excludeJacksonDataType, excludeJacksonModule),
+  "org.influxdb" % "influxdb-java" % "2.24",
+  "io.github.spark-redshift-community" %% "spark-redshift" % "6.3.0-spark_3.5" excludeAll (excludeAWS, excludeJacksonCore, excludeJacksonDataFormat, excludeJacksonDataType, excludeJacksonModule),
   "com.segment.analytics.java" % "analytics" % "2.1.1" % "provided",
-  "com.datastax.spark" %% "spark-cassandra-connector" % "3.5.0",
+  "com.datastax.spark" %% "spark-cassandra-connector" % "3.5.1",
   "com.redislabs" %% "spark-redis" % "3.1.0",
-  "org.apache.kafka" %% "kafka" % "3.7.0",
-  "za.co.absa" %% "abris" % "3.2.1" % "provided" excludeAll (excludeAvro, excludeSpark),
+  "org.apache.kafka" %% "kafka" % "3.9.0",
+  "za.co.absa" %% "abris" % "3.2.2" % "provided" excludeAll (excludeAvro, excludeSpark),
   "org.apache.hudi" %% "hudi-spark-bundle" % "0.10.0" % "provided",
-  "org.apache.parquet" % "parquet-avro" % "1.14.0" % "provided",
-  "com.amazon.deequ" % "deequ" % "2.0.7-spark-3.3" excludeAll (excludeSpark, excludeScalanlp),
-  "org.apache.avro" % "avro" % "1.11.3" % "provided",
-  "com.databricks" %% "spark-xml" % "0.18.0",
-  "com.outr" %% "hasher" % "1.2.2",
-  "org.mongodb.spark" %% "mongo-spark-connector" % "10.3.0",
-  "mysql" % "mysql-connector-java" % "8.0.33",
-  "org.apache.logging.log4j" % "log4j-api" % "2.23.1" % "provided",
-  "org.apache.logging.log4j" % "log4j-core" % "2.23.1" % "provided",
-  "org.apache.logging.log4j" % "log4j-slf4j-impl" % "2.23.1" % "provided",
-  "org.postgresql" % "postgresql" % "42.7.3",
-  "io.delta" %% "delta-core" % "2.4.0",
-  "io.vertx" % "vertx-json-schema" % "4.5.9" excludeAll (excludeJacksonCore, excludeJacksonDataFormat, excludeJacksonDataType, excludeJacksonModule),
-  "com.google.guava" % "guava" % "25.0-jre",
-  "org.apache.sedona" %% "sedona-spark-3.0" % "1.6.0" excludeAll (excludeSpark),
-  "org.datasyslab" % "geotools-wrapper" % "1.6.0-31.0" excludeAll (excludeSpark),
-  "com.amazonaws" % "aws-java-sdk-s3" % "1.12.767" excludeAll (excludeJacksonCore, excludeJacksonDataFormat, excludeJacksonDataType, excludeJacksonModule),
-  "com.amazonaws" % "aws-java-sdk-dynamodb" % "1.12.767" excludeAll (excludeJacksonCore, excludeJacksonDataFormat, excludeJacksonDataType, excludeJacksonModule),
-  "software.amazon.awssdk" % "dynamodb" % "2.26.30" excludeAll (excludeJacksonCore, excludeJacksonDataFormat, excludeJacksonDataType, excludeJacksonModule),
-  "software.amazon.awssdk" % "glue" % "2.26.30" excludeAll (excludeJacksonCore, excludeJacksonDataFormat, excludeJacksonDataType, excludeJacksonModule),
-  "software.amazon.awssdk" % "s3" % "2.26.30" excludeAll (excludeJacksonCore, excludeJacksonDataFormat, excludeJacksonDataType, excludeJacksonModule),
-  "software.amazon.awssdk" % "sts" % "2.26.30" excludeAll (excludeJacksonCore, excludeJacksonDataFormat, excludeJacksonDataType, excludeJacksonModule),
-  "org.apache.iceberg" %% "iceberg-spark-runtime-3.3" % "1.6.0" excludeAll (excludeJacksonCore, excludeJacksonDataFormat, excludeJacksonDataType, excludeJacksonModule),
-  "com.jayway.jsonpath" % "json-path" % "2.9.0" excludeAll (excludeJacksonCore, excludeJacksonDataFormat, excludeJacksonDataType, excludeJacksonModule),
-  "io.trino" % "trino-jdbc" % "453",
-  "com.syncron.amazonaws" % "simba-athena-jdbc-driver" % "2.1.5" from s"https://downloads.athena.us-east-1.amazonaws.com/drivers/JDBC/SimbaAthenaJDBC-2.1.5.1000/AthenaJDBC42-2.1.5.1000.jar"
+  "org.apache.parquet" % "parquet-avro" % "1.15.0" % "provided",
+  "com.amazon.deequ" % "deequ" % ("2.0.9-spark-" + sparkShortVersion.value) excludeAll (excludeSpark, excludeScalanlp),
+  "org.apache.avro" % "avro" % "1.12.0" % "provided",
+  "com.databricks" %% "spark-xml" % "0.18.0",
+  "com.outr" %% "hasher" % "1.2.2",
+  "org.mongodb.spark" %% "mongo-spark-connector" % "10.4.1",
+  "mysql" % "mysql-connector-java" % "8.0.33",
+  "org.apache.logging.log4j" % "log4j-api" % "2.24.3" % "provided",
+  "org.apache.logging.log4j" % "log4j-core" % "2.24.3" % "provided",
+  "org.apache.logging.log4j" % "log4j-slf4j-impl" % "2.24.3" % "provided",
+  "org.postgresql" % "postgresql" % "42.7.5",
+  "io.delta" %% "delta-core" % "2.4.0",
+  "io.vertx" % "vertx-json-schema" % "4.5.12" excludeAll (excludeJacksonCore, excludeJacksonDataFormat, excludeJacksonDataType, excludeJacksonModule),
+  "com.google.guava" % "guava" % "25.1-jre",
+  "org.apache.sedona" %% ("sedona-spark-" + sparkShortVersion.value) % "1.6.1" excludeAll (excludeSpark),
+  "org.datasyslab" % "geotools-wrapper" % "1.7.0-28.5" excludeAll (excludeSpark),
+  "com.amazonaws" % "aws-java-sdk-s3" % "1.12.780" excludeAll (excludeJacksonCore, excludeJacksonDataFormat, excludeJacksonDataType, excludeJacksonModule),
+  "com.amazonaws" % "aws-java-sdk-dynamodb" % "1.12.780" excludeAll (excludeJacksonCore, excludeJacksonDataFormat, excludeJacksonDataType, excludeJacksonModule),
+  "software.amazon.awssdk" % "dynamodb" % "2.30.15" excludeAll (excludeJacksonCore, excludeJacksonDataFormat, excludeJacksonDataType, excludeJacksonModule),
+  "software.amazon.awssdk" % "glue" % "2.30.15" excludeAll (excludeJacksonCore, excludeJacksonDataFormat, excludeJacksonDataType, excludeJacksonModule),
+  "software.amazon.awssdk" % "s3" % "2.30.15" excludeAll (excludeJacksonCore, excludeJacksonDataFormat, excludeJacksonDataType, excludeJacksonModule),
+  "software.amazon.awssdk" % "sts" % "2.30.15" excludeAll (excludeJacksonCore, excludeJacksonDataFormat, excludeJacksonDataType, excludeJacksonModule),
+  "org.apache.iceberg" %% ("iceberg-spark-runtime-" + sparkShortVersion.value) % "1.7.1" excludeAll (excludeJacksonCore, excludeJacksonDataFormat, excludeJacksonDataType, excludeJacksonModule),
+  "com.jayway.jsonpath" % "json-path" % "2.9.0" excludeAll (excludeJacksonCore, excludeJacksonDataFormat, excludeJacksonDataType, excludeJacksonModule),
+  "io.trino" % "trino-jdbc" % "470",
+  "com.syncron.amazonaws" % "simba-athena-jdbc-driver" % "2.1.5" from s"https://downloads.athena.us-east-1.amazonaws.com/drivers/JDBC/SimbaAthenaJDBC-2.1.5.1000/AthenaJDBC42-2.1.5.1000.jar",
+  "net.snowflake" % "snowflake-jdbc" % "3.22.0"
 )
 
 resolvers ++= Seq(
```

(In the second hunk, the replacement exclusion-rule lines appear to differ from the removed ones only in column alignment, which this plain-text capture does not preserve.)
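The new `sparkShortVersion` setting derives the Spark series ("major.minor") from the full version so that series-suffixed artifacts — Deequ (`2.0.9-spark-3.5`), Sedona (`sedona-spark-3.5`), Iceberg (`iceberg-spark-runtime-3.5`) — track `SPARK_VERSION` automatically. A minimal sketch of the derivation in plain Scala (values illustrative):

```scala
// Same derivation as build.sbt: keep only the first two version components.
val sparkVersion = sys.env.getOrElse("SPARK_VERSION", "3.5.2")
val sparkShortVersion = sparkVersion.split('.').take(2).mkString(".") // "3.5"

// Series-suffixed artifact coordinates are then assembled from it:
val deequVersion    = s"2.0.9-spark-$sparkShortVersion"           // "2.0.9-spark-3.5"
val icebergArtifact = s"iceberg-spark-runtime-$sparkShortVersion" // "iceberg-spark-runtime-3.5"
```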

project/plugins.sbt

Lines changed: 1 addition & 0 deletions
```diff
@@ -4,3 +4,4 @@ addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "1.0.0")
 addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.9.15")
 addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6")
 addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.11.0")
+addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.6.4")
```
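The sbt-updates plugin adds a `dependencyUpdates` task (run as `sbt dependencyUpdates`) that reports newer releases of the declared dependencies — useful for keeping the large dependency matrix in `build.sbt` current.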

src/main/scala/com/yotpo/metorikku/output/writers/redis/RedisOutputWriter.scala

Lines changed: 8 additions & 4 deletions
```diff
@@ -1,6 +1,7 @@
 package com.yotpo.metorikku.output.writers.redis
 
 import com.redislabs.provider.redis._
+import com.yotpo.metorikku.utils.FileUtils
 import com.yotpo.metorikku.configuration.job.output.Redis
 import com.yotpo.metorikku.output.Writer
 import com.yotpo.metorikku.output.WriterSessionRegistration
@@ -9,8 +10,6 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.SparkSession
 
-import scala.util.parsing.json.JSONObject
-
 object RedisOutputWriter extends WriterSessionRegistration {
   def addConfToSparkSession(sparkConf: SparkConf, redisConf: Redis): Unit = {
     sparkConf.set(s"redis.host", redisConf.host)
@@ -38,8 +37,13 @@ class RedisOutputWriter(props: Map[String, String], sparkSession: SparkSession)
       .na
       .fill("")
       .map(row =>
-        row.getAs[Any](redisOutputOptions.keyColumn).toString ->
-          JSONObject(row.getValuesMap(columns)).toString()
+        row.getAs[Any](redisOutputOptions.keyColumn).toString -> {
+          FileUtils
+            .getObjectMapperByExtension("json") match {
+            case Some(mapper) => mapper.writeValueAsString(row.getValuesMap(columns))
+            case _            => throw new IllegalStateException("JSON mapper not found")
+          }
+        }
       )
     log.info(s"Writing Dataframe into redis with key ${redisOutputOptions.keyColumn}")
     redisDF.sparkSession.sparkContext.toRedisKV(redisDF.toJavaRDD)
```
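The removed `scala.util.parsing.json.JSONObject` is no longer shipped with modern Scala distributions, so serialization now goes through the project's Jackson helper, `FileUtils.getObjectMapperByExtension`. A standalone sketch of the equivalent behavior, assuming `jackson-module-scala` on the classpath (already a declared dependency); the mapper construction here is an illustrative stand-in for the helper, not its actual implementation:

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Stand-in for FileUtils.getObjectMapperByExtension("json"): a Jackson mapper
// with Scala collection support registered, so Map[String, Any] serializes cleanly.
val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

// row.getValuesMap(columns) produces a Map[String, Any]; writeValueAsString
// renders the same JSON object that JSONObject(...).toString() used to build.
val json = mapper.writeValueAsString(Map("name" -> "alice", "score" -> 42))
// json == {"name":"alice","score":42}
```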

src/main/scala/com/yotpo/metorikku/test/StreamMockInput.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -13,7 +13,7 @@ class StreamMockInput(fileInput: File) extends File("", None, None, None, None)
 case class StreamMockInputReader(val name: String, fileInput: File) extends Reader {
   def read(sparkSession: SparkSession): DataFrame = {
     val df = fileInput.getReader(name).read(sparkSession)
-    implicit val encoder = RowEncoder(df.schema)
+    implicit val encoder = RowEncoder.encoderFor(df.schema)
     implicit val sqlContext = sparkSession.sqlContext
     val stream = MemoryStream[Row]
     stream.addData(df.collect())
```
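The `RowEncoder(df.schema)` factory no longer compiles against Spark 3.5; its replacement, `RowEncoder.encoderFor(schema)`, returns an `AgnosticEncoder[Row]`, which still satisfies the implicit `Encoder[Row]` that `MemoryStream[Row]` requires. A minimal sketch of the same pattern against a local Spark 3.5 session (schema and data are illustrative, not from the project):

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[*]").appName("mock").getOrCreate()
val schema = StructType(Seq(StructField("name", StringType)))
val df = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(Row("a"), Row("b"))),
  schema
)

// Spark 3.5: encoderFor replaces the removed RowEncoder(schema) factory.
implicit val encoder = RowEncoder.encoderFor(df.schema)
implicit val sqlContext = spark.sqlContext

val stream = MemoryStream[Row]   // picks up the implicit Encoder[Row] and SQLContext
stream.addData(df.collect())     // feed the static rows into the stream
val streamingDF = stream.toDF()  // a streaming DataFrame backed by those rows
```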
