Skip to content

Commit 6a0d032

Browse files
[refacto] Simplify unnecessary complex test in Flink 2
1 parent 2307833 commit 6a0d032

File tree

2 files changed

+5
-24
lines changed

2 files changed

+5
-24
lines changed

modules/flink-2-api/src/test/scala/org/apache/flinkx/api/StreamExecutionEnvironmentTest.scala

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ import org.apache.flink.util.FlinkRuntimeException
99
import org.scalatest.flatspec.AnyFlatSpec
1010
import org.scalatest.matchers.should.Matchers
1111

12-
import scala.util.{Failure, Success, Try}
13-
1412
class StreamExecutionEnvironmentTest extends AnyFlatSpec with Matchers with IntegrationTest {
1513

1614
it should "create a stream from a source" in {
@@ -26,32 +24,19 @@ class StreamExecutionEnvironmentTest extends AnyFlatSpec with Matchers with Inte
2624
}
2725

2826
it should "create a stream from a sequence" in {
29-
import org.apache.flinkx.api.serializers._
27+
import org.apache.flinkx.api.serializers.*
3028
val typeInfo = implicitly[TypeInformation[Long]]
3129

3230
val stream = env.fromSequence(1, 100)
3331

3432
stream.dataType shouldBe typeInfo
3533
}
3634

37-
"From Flink 1.19, TypeInformation.of(Class)" should "fail-fast trying to resolve Scala type" in {
38-
Try(Class.forName("org.apache.flink.configuration.PipelineOptions").getField("SERIALIZATION_CONFIG")) match {
39-
case Failure(_) => // Before Flink 1.19: no fail-fast, exception happens at execution
40-
implicit val typeInfo: TypeInformation[Option[Int]] = TypeInformation.of(classOf[Option[Int]])
41-
val stream = env.fromElements(Some(1), None, Some(100))
42-
val exception = intercept[UnsupportedOperationException] {
43-
stream.executeAndCollect(3)
44-
}
45-
exception.getMessage should startWith(
46-
"Generic types have been disabled in the ExecutionConfig and type scala.Option is treated as a generic type."
47-
)
48-
49-
case Success(_) => // From Flink 1.19: fail-fast at Scala type resolution
50-
val exception = intercept[FlinkRuntimeException] {
51-
TypeInformation.of(classOf[Option[Int]])
52-
}
53-
exception.getMessage should startWith("You are using a 'Class' to resolve 'scala.Option' Scala type.")
35+
"TypeInformation.of(Class)" should "fail-fast trying to resolve Scala type" in {
36+
val exception = intercept[FlinkRuntimeException] {
37+
TypeInformation.of(classOf[Option[Int]])
5438
}
39+
exception.getMessage should startWith("You are using a 'Class' to resolve 'scala.Option' Scala type.")
5540
}
5641

5742
// --------------------------------------------------------------------------

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/serializer/SeqSerializer.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,8 @@ package org.apache.flinkx.api.serializer
33
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshot}
44
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
55

6-
import scala.reflect.ClassTag
7-
86
class SeqSerializer[T](child: TypeSerializer[T], clazz: Class[T]) extends MutableSerializer[Seq[T]] {
97

10-
private implicit val classTag: ClassTag[T] = ClassTag(clazz)
11-
128
override val isImmutableType: Boolean = child.isImmutableType
139

1410
override def copy(from: Seq[T]): Seq[T] = {

0 commit comments

Comments
 (0)