Skip to content

Commit 2307833

Browse files
[refacto] TypeInformation.createSerializer(ExecutionConfig) should continue to work
1 parent c0ae030 commit 2307833

File tree

8 files changed

+8
-8
lines changed

8 files changed

+8
-8
lines changed

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/typeinfo/CaseClassTypeInfo.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ class CaseClassTypeInfo[T <: Product](
4949
override def createSerializer(config: SerializerConfig): TypeSerializer[T] = ser.duplicate()
5050

5151
// override modifier removed to satisfy both implementation requirement of Flink 1.x and removal in 2.x
52-
def createSerializer(config: ExecutionConfig): TypeSerializer[T] = null
52+
def createSerializer(config: ExecutionConfig): TypeSerializer[T] = ser.duplicate()
5353

5454
private val REGEX_INT_FIELD: String = "[0-9]+"
5555
private val REGEX_STR_FIELD: String = "[\\p{L}_\\$][\\p{L}\\p{Digit}_\\$]*"

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/typeinfo/CollectionTypeInformation.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ case class CollectionTypeInformation[T: ClassTag](serializer: TypeSerializer[T])
1212

1313
override def createSerializer(config: SerializerConfig): TypeSerializer[T] = serializer.duplicate()
1414
// override modifier removed to satisfy both implementation requirement of Flink 1.x and removal in 2.x
15-
def createSerializer(config: ExecutionConfig): TypeSerializer[T] = null
15+
def createSerializer(config: ExecutionConfig): TypeSerializer[T] = serializer.duplicate()
1616

1717
override def isBasicType: Boolean = false
1818
override def isTupleType: Boolean = false

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/typeinfo/CoproductTypeInformation.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ case class CoproductTypeInformation[T](c: Class[T], ser: TypeSerializer[T]) exte
99

1010
override def createSerializer(config: SerializerConfig): TypeSerializer[T] = ser.duplicate()
1111
// override modifier removed to satisfy both implementation requirement of Flink 1.x and removal in 2.x
12-
def createSerializer(config: ExecutionConfig): TypeSerializer[T] = null
12+
def createSerializer(config: ExecutionConfig): TypeSerializer[T] = ser.duplicate()
1313

1414
override def isBasicType: Boolean = false
1515
override def isTupleType: Boolean = false

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/typeinfo/EitherTypeInfo.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ class EitherTypeInfo[A, B, T <: Either[A, B]](
7070
}
7171

7272
// override modifier removed to satisfy both implementation requirement of Flink 1.x and removal in 2.x
73-
def createSerializer(config: ExecutionConfig): TypeSerializer[T] = null
73+
def createSerializer(config: ExecutionConfig): TypeSerializer[T] = createSerializer(config.getSerializerConfig)
7474

7575
override def equals(obj: Any): Boolean = {
7676
obj match {

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/typeinfo/MappedTypeInformation.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ case class MappedTypeInformation[A: ClassTag, B](mapper: TypeMapper[A, B], neste
1515
override def createSerializer(config: SerializerConfig): TypeSerializer[A] =
1616
new MappedSerializer(mapper, nested.createSerializer(config))
1717
// override modifier removed to satisfy both implementation requirement of Flink 1.x and removal in 2.x
18-
def createSerializer(config: ExecutionConfig): TypeSerializer[A] = null
18+
def createSerializer(config: ExecutionConfig): TypeSerializer[A] = createSerializer(config.getSerializerConfig)
1919

2020
override def isKeyType: Boolean = nested.isKeyType
2121
override def getTotalFields: Int = nested.getTotalFields

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/typeinfo/OptionTypeInfo.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ class OptionTypeInfo[A, T <: Option[A]](private val elemTypeInfo: TypeInformatio
7474
}
7575

7676
// override modifier removed to satisfy both implementation requirement of Flink 1.x and removal in 2.x
77-
def createSerializer(config: ExecutionConfig): TypeSerializer[T] = null
77+
def createSerializer(config: ExecutionConfig): TypeSerializer[T] = createSerializer(config.getSerializerConfig)
7878

7979
override def toString = s"Option[$elemTypeInfo]"
8080

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/typeinfo/SimpleTypeInformation.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ abstract class SimpleTypeInformation[T: ClassTag: TypeSerializer] extends TypeIn
1111

1212
override def createSerializer(config: SerializerConfig): TypeSerializer[T] = implicitly[TypeSerializer[T]].duplicate()
1313
// override modifier removed to satisfy both implementation requirement of Flink 1.x and removal in 2.x
14-
def createSerializer(config: ExecutionConfig): TypeSerializer[T] = null
14+
def createSerializer(config: ExecutionConfig): TypeSerializer[T] = implicitly[TypeSerializer[T]].duplicate()
1515

1616
override def isBasicType: Boolean = false
1717
override def isTupleType: Boolean = false

modules/flink-common-api/src/main/scala/org/apache/flinkx/api/typeinfo/UnitTypeInformation.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ class UnitTypeInformation extends TypeInformation[Unit] {
1010

1111
override def createSerializer(config: SerializerConfig): TypeSerializer[Unit] = new UnitSerializer()
1212
// override modifier removed to satisfy both implementation requirement of Flink 1.x and removal in 2.x
13-
def createSerializer(config: ExecutionConfig): TypeSerializer[Unit] = null
13+
def createSerializer(config: ExecutionConfig): TypeSerializer[Unit] = new UnitSerializer()
1414

1515
override def isKeyType: Boolean = false
1616
override def getTotalFields: Int = 0

0 commit comments

Comments
 (0)