Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 9 additions & 13 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,18 @@ jobs:
runs-on: ubuntu-22.04
strategy:
matrix:
java: [11]
java: [11, 17]
scala: [2.13.16, 3.3.6]
flink: [1.18.1, 1.19.1]
flink: [1.19.3, 1.20.2]
sbt-module: ['flink-1-api', 'scala-api-common']
include:
- scala: 3.3.6
java: 17
flink: 1.20.0
sbt-module: 'flink-1-api'
- scala: 3.3.6
- scala: 2.13.16
java: 17
flink: 1.20.0
sbt-module: 'scala-api-common'
flink: 2.0.0
sbt-module: 'flink-2-api'
- scala: 3.3.6
java: 17
flink: 2.0.0
java: 21
flink: 2.0.0
sbt-module: 'flink-2-api'
env:
JAVA_OPTIONS: '--add-opens java.base/java.lang=ALL-UNNAMED'
Expand All @@ -47,7 +43,7 @@ jobs:
run: JAVA_OPTS=$JAVA_OPTIONS sbt "++ ${{ matrix.scala }} docs/mdoc"
- name: Run tests on examples
# always running on Scala 3.x version by default
if: ${{ !startsWith(matrix.flink, '1.18') && !startsWith(matrix.flink, '2.') }}
if: ${{ !startsWith(matrix.flink, '2.') }}
run: JAVA_OPTS=$JAVA_OPTIONS sbt -DflinkVersion1=${{ matrix.flink }} "project examples; test"
- name: Run tests on Flink API
run: JAVA_OPTS=$JAVA_OPTIONS sbt -DflinkVersion1=${{ matrix.flink }} -DflinkVersion2=${{ matrix.flink }} "++ ${{ matrix.scala }}; project ${{ matrix.sbt-module }}; test"
run: JAVA_OPTS=$JAVA_OPTIONS sbt -DflinkVersion1=${{ matrix.flink }} -DflinkVersion2=${{ matrix.flink }} "++ ${{ matrix.scala }}; project ${{ matrix.sbt-module }}; test"
12 changes: 8 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Global / excludeLintKeys := Set(git.useGitDescribe, crossScalaVersions)

lazy val rootScalaVersion = "3.3.6"
lazy val crossVersions = Seq("2.13.16", rootScalaVersion)
lazy val flinkVersion1 = System.getProperty("flinkVersion1", "1.20.1")
lazy val flinkVersion1 = System.getProperty("flinkVersion1", "1.20.2")
lazy val flinkVersion2 = System.getProperty("flinkVersion2", "2.0.0")

lazy val root = (project in file("."))
Expand Down Expand Up @@ -114,9 +114,12 @@ lazy val `scala-api-common` = (project in file("modules/flink-common-api"))
scalaVersion := rootScalaVersion,
crossScalaVersions := crossVersions,
libraryDependencies ++= Seq(
"org.apache.flink" % "flink-streaming-java" % flinkVersion1 % Provided,
"org.scalatest" %% "scalatest" % "3.2.19" % Test,
"ch.qos.logback" % "logback-classic" % "1.5.17" % Test
"org.apache.flink" % "flink-streaming-java" % flinkVersion1 % Provided,
"org.apache.flink" % "flink-test-utils" % flinkVersion1 % Test,
("org.apache.flink" % "flink-streaming-java" % flinkVersion1 % Test).classifier("tests"),
"org.typelevel" %% "cats-core" % "2.13.0" % Test,
"org.scalatest" %% "scalatest" % "3.2.19" % Test,
"ch.qos.logback" % "logback-classic" % "1.5.17" % Test
)
)

Expand Down Expand Up @@ -192,6 +195,7 @@ lazy val `examples` = (project in file("modules/examples"))
"ch.qos.logback" % "logback-classic" % "1.4.14" % Provided,
"org.apache.flink" % "flink-test-utils" % flinkVersion1 % Test,
"org.apache.flink" % "flink-streaming-java" % flinkVersion1 % Test classifier "tests",
"org.typelevel" %% "cats-core" % "2.13.0" % Test,
"org.scalatest" %% "scalatest" % "3.2.15" % Test
),
Compile / run := Defaults
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ import org.scalatest.time.Seconds
import org.scalatest.time.Span
import org.scalatest.time.Millis
import org.scalatest.Inspectors

import org.apache.flinkx.api.*
import org.apache.flinkx.api.serializers.*

import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
import org.apache.flink.streaming.api.operators.KeyedProcessOperator
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
Expand All @@ -26,17 +24,17 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.serialization.SerializerConfigImpl
import org.apache.flink.api.common.state.ReducingStateDescriptor
import org.apache.flink.util.Collector

import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters.*

class CustomTriggerTest extends AnyFlatSpec with Matchers with Inspectors:

it should "test custom trigger" in {
val cfg = ExecutionConfig()
val cfg = new SerializerConfigImpl()
val serializer = deriveTypeInformation[TestEvent].createSerializer(
cfg
)
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWStream}
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
import org.apache.flink.streaming.api.windowing.evictors.Evictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.Trigger
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
import org.apache.flink.util.Preconditions.checkNotNull
import org.apache.flinkx.api.ScalaStreamOps._
import org.apache.flinkx.api.function.util.{
ScalaAllWindowFunction,
ScalaAllWindowFunctionWrapper,
ScalaProcessAllWindowFunctionWrapper,
ScalaReduceFunction
}
import org.apache.flinkx.api.function.{AllWindowFunction, ProcessAllWindowFunction}
import org.apache.flink.streaming.api.windowing.evictors.Evictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.Trigger
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
import org.apache.flink.util.Preconditions.checkNotNull
import ScalaStreamOps._

/** A [[AllWindowedStream]] represents a data stream where the stream of elements is split into windows based on a
* [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]]. Window emission is triggered based on a
Expand Down
Loading