Skip to content

Commit 057d276

Browse files
[refacto] Drop Flink 1.18 support to reunify most of Flink 1 & 2 code (#275)
* [refacto] Drop Flink 1.18 support to reunify most of Flink 1 & 2 codebase * [refacto] Remove LowPrioImplicits left over * [refacto] Fix CI * [refacto] Remove SerializerSnapshotTest left over * [refacto] Fix missing conversions of TypeSerializerSnapshot.resolveSchemaCompatibility() * [refacto] Remove LowPrioImplicits left over * [refacto] Clean some imports * [refacto] CI build flink 2 using java 17 and 21 * [refacto] Clean LowPrioImplicits and serializers * [refacto] TypeInformation.createSerializer(ExecutionConfig) should continue to work * [refacto] Simplify unnecessary complex test in Flink 2 * [refacto] Fix import * [refacto] small improvements on stream accessors
1 parent 7613bc8 commit 057d276

File tree

147 files changed

+1310
-4201
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

147 files changed

+1310
-4201
lines changed

.github/workflows/ci.yml

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,22 +14,18 @@ jobs:
1414
runs-on: ubuntu-22.04
1515
strategy:
1616
matrix:
17-
java: [11]
17+
java: [11, 17]
1818
scala: [2.13.16, 3.3.6]
19-
flink: [1.18.1, 1.19.1]
19+
flink: [1.19.3, 1.20.2]
2020
sbt-module: ['flink-1-api', 'scala-api-common']
2121
include:
22-
- scala: 3.3.6
23-
java: 17
24-
flink: 1.20.0
25-
sbt-module: 'flink-1-api'
26-
- scala: 3.3.6
22+
- scala: 2.13.16
2723
java: 17
28-
flink: 1.20.0
29-
sbt-module: 'scala-api-common'
24+
flink: 2.0.0
25+
sbt-module: 'flink-2-api'
3026
- scala: 3.3.6
31-
java: 17
32-
flink: 2.0.0
27+
java: 21
28+
flink: 2.0.0
3329
sbt-module: 'flink-2-api'
3430
env:
3531
JAVA_OPTIONS: '--add-opens java.base/java.lang=ALL-UNNAMED'
@@ -47,7 +43,7 @@ jobs:
4743
run: JAVA_OPTS=$JAVA_OPTIONS sbt "++ ${{ matrix.scala }} docs/mdoc"
4844
- name: Run tests on examples
4945
# always running on Scala 3.x version by default
50-
if: ${{ !startsWith(matrix.flink, '1.18') && !startsWith(matrix.flink, '2.') }}
46+
if: ${{ !startsWith(matrix.flink, '2.') }}
5147
run: JAVA_OPTS=$JAVA_OPTIONS sbt -DflinkVersion1=${{ matrix.flink }} "project examples; test"
5248
- name: Run tests on Flink API
53-
run: JAVA_OPTS=$JAVA_OPTIONS sbt -DflinkVersion1=${{ matrix.flink }} -DflinkVersion2=${{ matrix.flink }} "++ ${{ matrix.scala }}; project ${{ matrix.sbt-module }}; test"
49+
run: JAVA_OPTS=$JAVA_OPTIONS sbt -DflinkVersion1=${{ matrix.flink }} -DflinkVersion2=${{ matrix.flink }} "++ ${{ matrix.scala }}; project ${{ matrix.sbt-module }}; test"

build.sbt

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ Global / excludeLintKeys := Set(git.useGitDescribe, crossScalaVersions)
66

77
lazy val rootScalaVersion = "3.3.6"
88
lazy val crossVersions = Seq("2.13.16", rootScalaVersion)
9-
lazy val flinkVersion1 = System.getProperty("flinkVersion1", "1.20.1")
9+
lazy val flinkVersion1 = System.getProperty("flinkVersion1", "1.20.2")
1010
lazy val flinkVersion2 = System.getProperty("flinkVersion2", "2.0.0")
1111

1212
lazy val root = (project in file("."))
@@ -114,9 +114,12 @@ lazy val `scala-api-common` = (project in file("modules/flink-common-api"))
114114
scalaVersion := rootScalaVersion,
115115
crossScalaVersions := crossVersions,
116116
libraryDependencies ++= Seq(
117-
"org.apache.flink" % "flink-streaming-java" % flinkVersion1 % Provided,
118-
"org.scalatest" %% "scalatest" % "3.2.19" % Test,
119-
"ch.qos.logback" % "logback-classic" % "1.5.17" % Test
117+
"org.apache.flink" % "flink-streaming-java" % flinkVersion1 % Provided,
118+
"org.apache.flink" % "flink-test-utils" % flinkVersion1 % Test,
119+
("org.apache.flink" % "flink-streaming-java" % flinkVersion1 % Test).classifier("tests"),
120+
"org.typelevel" %% "cats-core" % "2.13.0" % Test,
121+
"org.scalatest" %% "scalatest" % "3.2.19" % Test,
122+
"ch.qos.logback" % "logback-classic" % "1.5.17" % Test
120123
)
121124
)
122125

@@ -192,6 +195,7 @@ lazy val `examples` = (project in file("modules/examples"))
192195
"ch.qos.logback" % "logback-classic" % "1.4.14" % Provided,
193196
"org.apache.flink" % "flink-test-utils" % flinkVersion1 % Test,
194197
"org.apache.flink" % "flink-streaming-java" % flinkVersion1 % Test classifier "tests",
198+
"org.typelevel" %% "cats-core" % "2.13.0" % Test,
195199
"org.scalatest" %% "scalatest" % "3.2.15" % Test
196200
),
197201
Compile / run := Defaults

modules/examples/src/test/scala/org/example/CustomTriggerTests.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,8 @@ import org.scalatest.time.Seconds
88
import org.scalatest.time.Span
99
import org.scalatest.time.Millis
1010
import org.scalatest.Inspectors
11-
1211
import org.apache.flinkx.api.*
1312
import org.apache.flinkx.api.serializers.*
14-
1513
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
1614
import org.apache.flink.streaming.api.operators.KeyedProcessOperator
1715
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
@@ -26,17 +24,17 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
2624
import org.apache.flink.streaming.api.watermark.Watermark
2725
import org.apache.flink.api.common.typeinfo.TypeInformation
2826
import org.apache.flink.api.common.ExecutionConfig
27+
import org.apache.flink.api.common.serialization.SerializerConfigImpl
2928
import org.apache.flink.api.common.state.ReducingStateDescriptor
3029
import org.apache.flink.util.Collector
3130

3231
import java.util.concurrent.TimeUnit
33-
3432
import scala.collection.JavaConverters.*
3533

3634
class CustomTriggerTest extends AnyFlatSpec with Matchers with Inspectors:
3735

3836
it should "test custom trigger" in {
39-
val cfg = ExecutionConfig()
37+
val cfg = new SerializerConfigImpl()
4038
val serializer = deriveTypeInformation[TestEvent].createSerializer(
4139
cfg
4240
)

modules/flink-1-api/src/main/scala-2/org/apache/flinkx/api/LowPrioImplicits.scala

Lines changed: 0 additions & 75 deletions
This file was deleted.

modules/flink-1-api/src/main/scala-3/org/apache/flinkx/api/LowPrioImplicits.scala

Lines changed: 0 additions & 85 deletions
This file was deleted.

modules/flink-1-api/src/main/scala/org/apache/flinkx/api/AllWindowedStream.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,20 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
66
import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWStream}
77
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
88
import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
9+
import org.apache.flink.streaming.api.windowing.evictors.Evictor
10+
import org.apache.flink.streaming.api.windowing.time.Time
11+
import org.apache.flink.streaming.api.windowing.triggers.Trigger
12+
import org.apache.flink.streaming.api.windowing.windows.Window
13+
import org.apache.flink.util.Collector
14+
import org.apache.flink.util.Preconditions.checkNotNull
15+
import org.apache.flinkx.api.ScalaStreamOps._
916
import org.apache.flinkx.api.function.util.{
1017
ScalaAllWindowFunction,
1118
ScalaAllWindowFunctionWrapper,
1219
ScalaProcessAllWindowFunctionWrapper,
1320
ScalaReduceFunction
1421
}
1522
import org.apache.flinkx.api.function.{AllWindowFunction, ProcessAllWindowFunction}
16-
import org.apache.flink.streaming.api.windowing.evictors.Evictor
17-
import org.apache.flink.streaming.api.windowing.time.Time
18-
import org.apache.flink.streaming.api.windowing.triggers.Trigger
19-
import org.apache.flink.streaming.api.windowing.windows.Window
20-
import org.apache.flink.util.Collector
21-
import org.apache.flink.util.Preconditions.checkNotNull
22-
import ScalaStreamOps._
2323

2424
/** A [[AllWindowedStream]] represents a data stream where the stream of elements is split into windows based on a
2525
* [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]]. Window emission is triggered based on a

0 commit comments

Comments
 (0)